Spark入门指南:从基础概念到实践应用全解析( 五 )


  • 从外部存储系统 。
  • 从其他RDD 。
  • 由一个已经存在的 Scala 集合创建 。
(1) 从外部存储系统
由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")(2) 从其他RDD
通过已有的 RDD 经过算子转换生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))(3) 由一个已经存在的 Scala 集合创建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))或者val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))其实makeRDD 方法底层调用了 parallelize 方法:
4.RDD 缓存机制RDD 缓存是在内存存储RDD计算结果的一种优化技术 。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能 。
要持久化一个RDD,只要调用其cache()或者persist()方法即可 。在该RDD第一次被计算出来时,就会直接缓存在每个节点中 。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition 。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)rdd2.cache //缓存/持久化rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了需要注意的是,在触发action的时候,才会去执行持久化 。
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据持久化到内存中 。
如果需要从内存中去除缓存 , 那么可以使用unpersist()方法 。
rdd.persist(StorageLevel.MEMORY_ONLY)rdd.unpersist()5.存储级别RDD存储级别主要有以下几种 。
级别
使用空间
CPU时间
是否在内存中
是否在磁盘上
备注
MEMORY_ONLY




使用未序列化的Java对象格式 , 将数据保存在内存中 。如果内存不够存放所有的数据,则数据可能就不会进行持久化 。
MEMORY_ONLY_2




数据存2份
MEMORY_ONLY_SER




基本含义同MEMORY_ONLY 。唯一的区别是,会将RDD中的数据进行序列化 。这种方式更加节省内存
MEMORY_ONLY_SER_2




数据序列化,数据存2份
MEMORY_AND_DISK

中等
部分
部分
如果数据在内存中放不下,则溢写到磁盘
MEMORY_AND_DISK_2

中等
部分
部分
数据存2份
MEMORY_AND_DISK_SER


部分
部分
基本含义同MEMORY_AND_DISK 。唯一的区别是 , 会将RDD中的数据进行序列化
MEMORY_AND_DISK_SER_2


部分
部分
数据存2份
DISK_ONLY




使用未序列化的Java对象格式 , 将数据全部写入磁盘文件中 。
DISK_ONLY_2




数据存2份
OFF_HEAP
 
 
 
 
这个目前是试验型选项,类似MEMORY_ONLY_SER , 但是数据是存储在堆外内存的 。
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上 。
这种基于副本的持久化机制主要用于进行容错 。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了 , 那么后续对RDD计算时还可以使用该数据在其他节点上的副本 。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了 。


推荐阅读