/*Return schema for input column(s) to the UDAF, schema being built using StructType*/=> public StructType inputSchema()/*Return schema of aggregation buffer, schema being built using StructType */=> public StructType bufferSchema()/*DataType of final aggregation result*/=> public DataType dataType()/*Initialize aggregation buffer*/=> public void initialize(MutableAggregationBuffer buffer)/*Update aggregation buffer for each of the untyped view (Row) of an input object*/=> public void update(MutableAggregationBuffer buffer, Row row)/*Update current aggregation buffer with a partially aggregated buffer*/=> public void merge(MutableAggregationBuffer buffer, Row buffer)/*Evaluate final aggregation buffer and return the evaluated value of DataType declared earlier */=> public Object evaluate(Row buffer)
除了覆盖上述方法外,还可以始终声明其他字段(在UDAF构造函数中使用可选的初始化)和自定义UDAF类中的其他方法,以便在覆盖方法中使用它们以实现聚合目标 。
在使用UDAF之前,必须先在Spark框架中注册相同的实例:
spark.udf.register('sampleUDAF, new SampleUDAF());
注册后,可以在Spark SQL查询中使用UDAF来聚合整个数据集/数据框或数据集/数据框中的记录组(通过一列或多列分组) 。除了直接在Spark SQL查询中使用外,还可以通过数据框/数据集聚合API(例如" agg")使用UDAF 。
【在Apache Spark中执行聚合的五种方法】UDAF虽然是定义自定义聚合的一种流行方法,但是当在聚合缓冲区中使用复杂的数据类型(数组或映射)时,会遇到性能问题 。这是由于以下事实:在UDAF中的每次更新操作期间,对于复杂的数据类型,将scala数据类型(用户特定)转换为相应的催化剂数据类型(催化剂内部数据类型)(反之亦然)变得非常昂贵 。从内存和计算的角度来看,此成本都更高 。
数据集的聚合器:聚合器是对数据集执行聚合的最新方法,类似于UDAF,它也基于聚合缓冲区的概念以及在该缓冲区上运行的一组方法 。但是,聚合器进行聚合的方式称为类型化聚合,因为它涉及对各种类型的对象进行操作/使用各种类型的对象进行操作 。聚合器的输入,聚合缓冲区和最终的聚合输出(从缓冲区派生)都是具有相应Spark编码器的某些类型的对象 。用户可以通过使用为IN定义的类型(输入记录类型)扩展抽象的通用'Aggregator <IN,BUF,OUT>'类(在包'org.apache.spark.sql.expressions中提供)来定义自己的自定义Aggregator 。,为BUF(聚合缓冲区)定义的类型和为OUT(输出记录类型)定义的类型,以及在基类中重写以下方法的实现:
/* return Encoder for aggregation buffer of type BUF. This is required for buffer ser/deser during shuffling or disk spilling */=> public Encoder<BUF> bufferEncoder()/* return Encoder for output object of type OUT after aggregation is performed */=> public Encoder<OUT> outputEncoder()/* return updated aggregation buffer object of type BUF after aggregating the existing buffer object of type BUF with the input object of type IN*/=> public BUF reduce(BUF buffer, IN input) ()/* return updated aggregation buffer of type BUF after merging two partially aggregated buffer objects of type BUF */=> public BUF merge(BUF buffer1, BUF buffer2)/* return output object of type OUT from evaluation ofaggregation buffer of type BUF */=> public OUT finish(BUF arg0)/* return buffer object of type BUF after initializing the same */=> public BUF zero()
由于Aggregator本机支持将聚合缓冲区作为对象,因此它是高效的,并且不需要与从Scala类型转换为催化剂类型(反之亦然)相关的不必要的开销(与UDAF一样) 。同样,聚合器的聚合方式在编写聚合逻辑时提供了更多的灵活性和编程的美感 。聚合器也已集成到无类型聚合流中,以支持SQL,例如即将发布的版本中的查询 。
预定义的聚合功能:Spark提供了各种预构建的聚合功能,可用于分布式数据收集的数据框或数据集表示形式 。这些预先构建的函数可以在SPARK SQL查询表达式中使用,也可以与为Dataframe或Dataset定义的聚合API一起使用 。在org.apache.spark.sql包中,所有预先构建的聚合函数都定义为"函数"类的静态方法 。带下划线的链接可以列出所有这些功能的列表 。
预定义的聚合函数经过高度优化,在大多数情况下可以直接与Spark tungusten格式一起使用 。因此,如果" functions"类中存在预先构建的聚合函数,则Spark程序员应始终偏向于使用它们 。万一那里没有所需的聚合函数,那么只有一个可以诉诸于编写自定义聚合函数 。
如果您对Spark Aggregation框架有更多查询,请随时在评论部分提问 。
推荐阅读
- 不可思议的神秘文化
- 独立服务器与云主机:性能大比拼
- 古道汇集八方客,正在迅速消失的茶马古道马帮
- 足球经理|培养职业精神在大学语文教育中的意义
- 外星有生物存在吗 外星来物寄生人类
- 脸部去角质的正确方法是什么?
- 护肤品使用步骤是什么?
- 化淡妆基本步骤 学会这个轻松化淡妆
- 祛眼袋,试试这个神奇的小方法
- 蛇用什么辨别气味 蛇最怕什么气味