文章插图
聚合是数据分析任务中广泛使用的运算符,Spark为此提供了坚实的框架 。以下是使用Spark可以针对大数据进行聚合的五种不同方式 。
RDD上的GroupByKey或ReduceByKey转换:RDD是Spark中分布式数据收集的最早表示,其中数据通过" T"类型的任意JAVA对象表示 。RDD上的聚合与map-reduce框架中的reduce概念相似,在reduce中,reduce函数(作用于两个输入记录以生成聚合记录)是聚合的关键 。使用RDD时,聚合可以通过GroupByKey或ReduceByKey转换来执行,但是,这些转换仅限于Pair RDD(元组对象的集合,每个元组都由类型为" K"的键对象和类型为" V"的值对象组成)。
在通过GroupByKey进行聚合的情况下,转换会导致元组对象具有键对象和针对该键对象的所有值对象的集合 。因此,之后需要应用一个映射器(通过map,maptoPair或mAppartitions进行映射转换),以便将每个Tuple对象的值对象的集合减少为一个聚合的值对象 。
文章插图
> Aggregation on a Pair RDD (with 2 partitions) via GroupByKey followed via either of map, maptopair
映射程序(例如map,maptoPair和mappartitions转换)包含聚合函数,以将类型为" V"的值对象的集合减少为类型为" U"的聚合对象 。聚合函数可以是任意函数,不需要遵循关联性或交换性状 。GroupByKey转换具有三种风格,它们因应用GroupByKey转换而在RDD的分区规范上有所不同 。GroupByKey可以总结为:
GroupByKey (PairRDD<K,V>) => PairRDD<K,Iterator<V>> Map (PairRDD<K,Iterator<V>>) => PairRDD<K,U>如果通过ReduceByKey进行聚合,则转换将直接导致具有键对象和针对该键对象的聚合对象的元组对象 。与GroupByKey一样,在ReduceByKey之后不需要映射器 。ReduceByKey转换采用关联和可交换的聚合函数,以便在跨分区聚合记录之前,可以在本地聚合位于同一分区的记录 。同样,聚合函数接受两个说类型为" V"的值对象,并返回一个类型为" V"的对象 。与GroupByKey相似,ReduceByKey转换也具有三种风格,它们的区别在于通过应用ReduceByKey转换而导致的RDD分区规范 。ReduceByKey可以总结为:
ReduceByKey(PairRDD<K,V>, Function<V,V,V>) => PairRDD<K,V>在GroupByKey和ReduceByKey中,前者更通用,可以与任何聚合函数一起使用,而后者则更有效,但仅适用于前面所述的一类聚合函数 。
RDD或数据集上的Mappartitions:如先前博客中所述,Mappartitions是功能强大的窄转换之一,可在RDD和Dataset(Spark中的数据表示)上使用,以明智地执行各种操作 。这样的操作之一也包括聚合 。但是,唯一需要满足的条件是,属于相同分组关键字的记录应位于单个分区中 。在涉及分组密钥的混排操作中实现的RDD或数据集(要聚合)中可以隐式满足此条件 。同样,可以通过首先基于分组密钥对RDD或数据集进行重新分区来明确实现该条件 。
在用于典型聚合流的mappartitions内,必须首先实例化一个Hashmap,将Hashmap与相应的分组键相对应地存储聚合的Value Objects 。然后,在迭代基础分区的数据收集时,将重复更新此Hashmap 。最后,返回包含在映射中的聚合值/对象(可选以及关联的分组键)的迭代器 。
由于基于Mappartitions的聚合涉及将Hashmap保留在内存中以保存键和聚合的Value对象,因此,如果大量唯一分组键驻留在基础分区中,则Hashmap将需要大量堆内存,因此可能导致 相应执行程序的内存不足终止的风险 。从此以后,不应该歪曲跨分区的分组密钥分配,否则会由于过度提供执行程序内存来处理偏斜而导致执行程序内存浪费 。此外,由于需要基于堆内存的聚合哈希图,因此与Spark中的专用聚合运算符相比,对内存的相对内存分配更多,但是如果内存不是约束,则基于Mappartitions的聚合可以提供良好的性能提升 。
用于数据帧或数据集的UDAF:与上述方法不同,UDAF基于聚合缓冲区的概念以及在此缓冲区上运行的一组方法来实现聚合 。
文章插图
> Aggregation buffer based aggregation flow in Spark (for Datasets and Dataframe)
到目前为止,UDAF是为Spark中的分布式数据收集的Dataframe或Dataset表示编写聚合逻辑的最常用方法 。UDAF在数据收集的无类型视图上工作,在该视图中,数据记录被视为(表的)一行,其架构定义了该行中每一列的类型和可空性 。通过扩展包" org.Apache.spark.sql.expressions"中存在的" UserDefinedAggregationFunction"类并覆盖基类中以下方法的实现,可以在Spark中创建UDAF:
推荐阅读
- 不可思议的神秘文化
- 独立服务器与云主机:性能大比拼
- 古道汇集八方客,正在迅速消失的茶马古道马帮
- 足球经理|培养职业精神在大学语文教育中的意义
- 外星有生物存在吗 外星来物寄生人类
- 脸部去角质的正确方法是什么?
- 护肤品使用步骤是什么?
- 化淡妆基本步骤 学会这个轻松化淡妆
- 祛眼袋,试试这个神奇的小方法
- 蛇用什么辨别气味 蛇最怕什么气味