Comparing the Data Processing Performance of the Parallel Computing Frameworks Polars and Dask

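After Pandas 2.0 was released, we published several benchmark articles. This time we look beyond Pandas and compare two commonly used parallel DataFrame frameworks, both designed for processing large datasets.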

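In this article we use two similar scripts to run an extract, transform, load (ETL) process.
Test content
The two scripts mainly do the following:
Extract data from two parquet files. For the small dataset, the variable path1 is "yellow_tripdata/yellow_tripdata_2014-01"; for the medium dataset, path1 is "yellow_tripdata/yellow_tripdata"; for the large dataset, path1 is "yellow_tripdata/yellow_tripdata*.parquet".
Transform the data: a) join the two DataFrames, b) compute the mean trip distance per PULocationID, c) keep only the rows that match a condition, d) round the values from step b to 2 decimal places, e) rename the column "trip_distance" to "mean_trip_distance", f) sort by the column "mean_trip_distance".
Save the final result to a new file.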
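To make the transformation steps concrete, here is a minimal sketch in plain Python with no DataFrame library. The rows, the zone names, and the function name `transform` are hypothetical toy values chosen for illustration; they are not taken from the benchmark data or from the scripts. The sketch computes the mean before the join, which is the order the Polars script uses.

```python
from collections import defaultdict

# Hypothetical toy rows standing in for the trip and zone-lookup tables.
trips = [
    {"PULocationID": 1, "trip_distance": 1.0},
    {"PULocationID": 1, "trip_distance": 2.0},
    {"PULocationID": 1, "trip_distance": 2.0},
    {"PULocationID": 2, "trip_distance": 10.0},
    {"PULocationID": 3, "trip_distance": 4.0},
    {"PULocationID": 3, "trip_distance": 5.0},
]
zones = [
    {"LocationID": 1, "Borough": "Queens", "Zone": "Astoria"},
    {"LocationID": 2, "Borough": "Manhattan", "Zone": "Midtown"},
    {"LocationID": 3, "Borough": "Queens", "Zone": "Jamaica"},
]


def transform(trips, zones, borough="Queens"):
    # b) mean trip distance per pickup location
    grouped = defaultdict(list)
    for row in trips:
        grouped[row["PULocationID"]].append(row["trip_distance"])
    means = {loc: sum(vals) / len(vals) for loc, vals in grouped.items()}

    # a) inner join with the zone lookup (PULocationID == LocationID)
    joined = [
        {"Borough": z["Borough"], "Zone": z["Zone"],
         "trip_distance": means[z["LocationID"]]}
        for z in zones
        if z["LocationID"] in means
    ]

    # c) keep only the rows for one borough
    filtered = [r for r in joined if r["Borough"] == borough]

    # d) round to 2 decimals and e) rename the column
    renamed = [
        {"Borough": r["Borough"], "Zone": r["Zone"],
         "mean_trip_distance": round(r["trip_distance"], 2)}
        for r in filtered
    ]

    # f) sort by the mean, largest first
    return sorted(renamed, key=lambda r: r["mean_trip_distance"], reverse=True)


result = transform(trips, zones)
for row in result:
    print(row)
```

Aggregating before the join keeps the joined table small (one row per pickup location), which is presumably why the script orders the steps this way.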
Script 1: Polars
Loading and reading the data
def extraction():
    """Extract two datasets from parquet files"""
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = pl_read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2)
    return df_trips, df_zone


def pl_read_parquet(path):
    """Lazily scan a parquet file into a Polars LazyFrame"""
    # scan_parquet only builds a query plan; no data is read at this point
    df = pl.scan_parquet(path)
    return df

Transformation functions
def transformation(df_trips, df_zone):
    """Proceed to several transformations"""
    # Aggregate first so that the join only handles one row per PULocationID
    df_trips = mean_test_speed_pl(df_trips)
    df = df_trips.join(
        df_zone,
        how="inner",
        left_on="PULocationID",
        right_on="LocationID",
    )
    df = df.select(["Borough", "Zone", "trip_distance"])
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df


def mean_test_speed_pl(df_pl):
    """Getting Mean per PULocationID"""
    # Note: newer Polars releases rename groupby to group_by
    df_pl = df_pl.groupby("PULocationID").agg(pl.col(["trip_distance"]).mean())
    return df_pl


def get_Queens_test_speed_pd(df_pl):
    """Only getting Borough in Queens"""
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl


def round_column(df, column, to_round):
    """Round numbers on columns"""
    df = df.with_columns(pl.col(column).round(to_round))
    return df


def rename_column(df, column_old, column_new):
    """Renaming columns"""
    df = df.rename({column_old: column_new})
    return df


def sort_by_columns_desc(df, column):
    """Sort by column"""
    df = df.sort(column, descending=True)
    return df

Saving
def loading_into_parquet(df_pl):
    """Save dataframe in parquet"""
    # collect() executes the lazy query plan; the result is then written to disk
    df_pl.collect(streaming=True).write_parquet("yellow_tripdata_pl.parquet")

Other code
import polars as pl
import time


def pl_read_parquet(path):
    """Lazily scan a parquet file into a Polars LazyFrame"""
    df = pl.scan_parquet(path)
    return df


def mean_test_speed_pl(df_pl):
    """Getting Mean per PULocationID"""
    df_pl = df_pl.groupby("PULocationID").agg(pl.col(["trip_distance"]).mean())
    return df_pl


def get_Queens_test_speed_pd(df_pl):
    """Only getting Borough in Queens"""
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl


def round_column(df, column, to_round):
    """Round numbers on columns"""
    df = df.with_columns(pl.col(column).round(to_round))
    return df


def rename_column(df, column_old, column_new):
    """Renaming columns"""
    df = df.rename({column_old: column_new})
    return df


def sort_by_columns_desc(df, column):
    """Sort by column"""
    df = df.sort(column, descending=True)
    return df


def main():
    print("Starting ETL for Polars")
    start_time = time.perf_counter()

    print("Extracting...")
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")

    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = end_transform - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")

    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")

    print(f"End ETL for Polars in {str(time.perf_counter() - start_time)}")


if __name__ == "__main__":
    main()
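The per-stage timing in main() follows one pattern: take a time.perf_counter() reading before and after each stage and print the difference. As a hedged sketch, that pattern can be wrapped in a small context manager. The helper name `timed`, the `timings` dictionary, and the stand-in workloads are assumptions made for this illustration and are not part of the original script.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock duration of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}

# Stand-in workloads; in the real script these would be the ETL stages.
with timed("extract", timings):
    data = list(range(1000))

with timed("transform", timings):
    total = sum(x * x for x in data)

for label, seconds in timings.items():
    print(f"{label} finished in {round(seconds, 5)} seconds")
```

One thing to keep in mind when reading the stage timings of the Polars script: scan_parquet and the chained transformations are lazy, so the extraction and transformation stages mostly measure building the query plan, while the actual reading and computation happen when collect() is called in the loading stage.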

