2、Dask函数功能与上面一样,所以我们把代码整合在一起:
import dask.dataframe as dd from dask.distributed import Client import timedef extraction():path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"df_trips = dd.read_parquet(path1)path2 = "taxi+_zone_lookup.parquet"df_zone = dd.read_parquet(path2)return df_trips, df_zonedef transformation(df_trips, df_zone):df_trips = mean_test_speed_dask(df_trips)df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")df = df[["Borough", "Zone", "trip_distance"]]df = get_Queens_test_speed_dask(df)df = round_column(df, "trip_distance", 2)df = rename_column(df, "trip_distance", "mean_trip_distance")df = sort_by_columns_desc(df, "mean_trip_distance")return dfdef loading_into_parquet(df_dask):df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")def mean_test_speed_dask(df_dask):df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})return df_daskdef get_Queens_test_speed_dask(df_dask):df_dask = df_dask[df_dask["Borough"] == "Queens"]return df_daskdef round_column(df, column, to_round):df[column] = df[column].round(to_round)return dfdef rename_column(df, column_old, column_new):df = df.rename(columns={column_old: column_new})return dfdef sort_by_columns_desc(df, column):df = df.sort_values(column, ascending=False)return dfdef main():print("Starting ETL for Dask")start_time = time.perf_counter()client = Client() # Start Dask Clientdf_trips, df_zone = extraction()end_extract = time.perf_counter()time_extract = end_extract - start_timeprint(f"Extraction Parquet end in {round(time_extract, 5)} seconds")print("Transforming...")df = transformation(df_trips, df_zone)end_transform = time.perf_counter()time_transformation = time.perf_counter() - end_extractprint(f"Transformation end in {round(time_transformation, 5)} seconds")print("Loading...")loading_into_parquet(df)load_transformation = time.perf_counter() - end_transformprint(f"Loading end in {round(load_transformation, 5)} seconds")print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")client.close() # Close Dask Clientif __name__ == "__main__":main()
测试结果对比1、小数据集我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的 。
下面是每个库运行五次的结果:
Polars
文章插图
Dask
文章插图
2、中等数据集我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多 。
Polars
文章插图
Dask
文章插图
3、大数据集我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理 。
Polars
文章插图
Dask
文章插图
总结从结果中可以看出,Polars和Dask都可以使用惰性求值 。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;
可以看到这两个库都非常擅长处理中等规模的数据集 。
由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次) 。
文章插图
Polars在小型数据集和中型数据集的测试中都取得了胜利 。但是,Dask在大型数据集上的平均时间性能为26秒 。
这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源” 。
文章插图
上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好 。
推荐阅读
- 三位数除以两位数的除法计算
- 3年零存整取利息怎么算
- 教师退休金如何计算
- 2022年退休金如何计算公式
- BIOS与CMOS的区别与作用
- 样本量计算软件 样本量计算公式
- 营业收入利润率公式~~营业利润率怎么计算啊?谢谢了?
- 残保金怎么计提计算的
- 汽油密度计算公式 汽油密度
- 满堂脚手架怎么计算工程量 满堂脚手架