Spark入门指南:从基础概念到实践应用全解析( 七 )


—name
应用程序的名称
—jars
用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
—packages
包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
—exclude-packages
为了避免冲突 而指定不包含的 package
—repositories
远程 repository
—conf PROP=VALUE
指定 spark 配置属性的值,例如 -conf spark.executor.extraJavaOptinotallow=”-XX:MaxPermSize=256m”
—properties-file
加载的配置文件 , 默认为 conf/spark-defaults.conf
—driver-memory
Driver内存,默认 1G
—driver-java-options
传给 driver 的额外的 Java 选项
—driver-library-path
传给 driver 的额外的库路径
—driver-class-path
传给 driver 的额外的类路径
—driver-cores
Driver 的核数 , 默认是1 。在 yarn 或者 standalone 下使用
—executor-memory
每个 executor 的内存 , 默认是1G
—total-executor-cores
所有 executor 总共的核数 。仅仅在 mesos 或者 standalone 下使用
—num-executors
启动的 executor 数量 。默认为2 。在 yarn 下使用
—executor-core
每个 executor 的核数 。在yarn或者standalone下使用
2.Master_URL的值 Master URL
含义
local
使用1个worker线程在本地运行Spark应用程序
local[K]
使用K个worker线程在本地运行Spark应用程序
local[*]
使用所有剩余worker线程在本地运行Spark应用程序
spark://HOST:PORT
连接到Spark Standalone集群,以便在该集群上运行Spark应用程序
mesos://HOST:PORT
连接到Mesos集群 , 以便在该集群上运行Spark应用程序
yarn-client
以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运行 。
yarn-cluster
以cluster方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义 , 该方式driver也在集群中运行 。
八、Spark 共享变量一般情况下 , 当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本 。
这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序 。通常跨任务的读写变量是低效的,所以,Spark提供了两种共享变量:「广播变量(broadcast variable)」和「累加器(accumulator)」 。
1.广播变量广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝 。说白了其实就是共享变量 。
如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本 。如果使用广播变量在每个Executor中只有一份Driver端的变量副本 。
一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建 。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
import org.apache.spark.{SparkConf, SparkContext}object BroadcastExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")val sc = new SparkContext(conf)val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/sc.parallelize(List(1, 2, 3, 4, 5))// 创建一个广播变量val factor = sc.broadcast(2)// 使用广播变量val result = data.map(x => x * factor.value)result.collect().foreach(println)}}广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上 。另外 , 为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改 。
2.累加器累加器是一种只能通过关联操作进行“加”操作的变量 , 因此它能够高效的应用于并行操作中 。它们能够用来实现counters和sums 。
一个累加器可以通过调用SparkContext.accumulator(v)方法从一个初始变量v中创建 。运行在集群上的任务可以通过add方法或者使用+=操作来给它加值 。然而,它们无法读取这个值 。只有驱动程序可以使用value方法来读取累加器的值 。
示例代码如下:
import org.apache.spark.{SparkConf, SparkContext}object AccumulatorExample {def main(args: Array[String]) {val conf = new SparkConf().setAppName("AccumulatorExample")val sc = new SparkContext(conf)val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value) // 输出 10}}


推荐阅读