Spark入门指南:从基础概念到实践应用全解析( 六 )


6.RDD的血缘关系血缘关系是指 RDD 之间的依赖关系 。当你对一个 RDD 执行转换操作时,Spark 会生成一个新的 RDD , 并记录这两个 RDD 之间的依赖关系 。这种依赖关系就是血缘关系 。
血缘关系可以帮助 Spark 在发生故障时恢复数据 。当一个分区丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个 RDD 。
血缘关系还可以帮助 Spark 优化计算过程 。Spark 可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销 。
我们可以执行toDebugString打印RDD的依赖关系 。
下面是一个简单的例子:
val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")val sc = new SparkContext(conf)val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/sc.parallelize(List(1, 2, 3, 4, 5))val mappedData = data.map(x => x + 1)val filteredData = mappedData.filter(x => x % 2 == 0)println(filteredData.toDebugString)在这个例子中,我们首先创建了一个包含 5 个元素的 RDD , 并对它执行了两个转换操作:map 和 filter 。然后,我们使用 toDebugString 方法打印了最终 RDD 的血缘关系 。
运行这段代码后,你会看到类似下面的输出:
(2) MapPartitionsRDD[2] at filter at <console>:26 [] |MapPartitionsRDD[1] at map at <console>:24 [] |ParallelCollectionRDD[0] at parallelize at <console>:22 []这个输出表示最终的 RDD 是通过两个转换操作(map 和 filter)从原始的 ParallelCollectionRDD 转换而来的 。
六、CheckPointCheckPoint可以将RDD从其依赖关系中抽出来 , 保存到可靠的存储系统(例如HDFS,S3等),即它可以将数据和元数据保存到检查指向目录中 。因此,在程序发生崩溃的时候 , Spark可以恢复此数据 , 并从停止的任何地方开始 。
CheckPoint分为两类:

  • 高可用CheckPoint:容错性优先 。这种类型的检查点可确保数据永久存储,如存储在HDFS或其他分布式文件系统上 。这也意味着数据通常会在网络中复制 , 这会降低检查点的运行速度 。
  • 本地CheckPoint:性能优先 。RDD持久保存到执行程序中的本地文件系统 。因此,数据写得更快,但本地文件系统也不是完全可靠的 , 一旦数据丢失 , 工作将无法恢复 。
开发人员可以使用RDD.checkpoint()方法来设置检查点 。在使用检查点之前,必须使用SparkContext.setCheckpointDir(directory: String)方法设置检查点目录 。
下面是一个简单的例子:
import org.apache.spark.{SparkConf, SparkContext}object CheckpointExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")val sc = new SparkContext(conf)// 设置 checkpoint 目录sc.setCheckpointDir("/tmp/checkpoint")val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/sc.parallelize(List(1, 2, 3, 4, 5))val mappedData = data.map(x => x + 1)val filteredData = mappedData.filter(x => x % 2 == 0)// 对 RDD 进行 checkpointfilteredData.checkpoint()// 触发 checkpointfilteredData.count()}}RDD的检查点机制就好比Hadoop将中间计算值存储到磁盘,即使计算中出现了故障,我们也可以轻松地从中恢复 。通过对 RDD 启动检查点机制可以实现容错和高可用 。
Persist VS CheckPoint
  • 位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中),而 Checkpoint 可以保存数据到 HDFS 这类可靠的存储上 。
  • 生命周期:Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法,而 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除 。CheckPoint将RDD持久化到HDFS或本地文件夹,如果不被手动remove掉,是一直存在的,也就是说可以被下一个driver使用,而Persist不能被其他dirver使用 。
七、Spark-Submit1.详细参数说明 参数名
参数说明
—master
master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local 。具体指可参考下面关于Master_URL的列表
—deploy-mode
在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
—class
应用程序的主类,仅针对 java 或 scala 应用


推荐阅读