Spark入门指南:从基础概念到实践应用全解析(13)


Structured Streaming 同样支持 DSL 和 SQL 语法 。
DSL 语法:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()import spark.implicits._val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()SQL 语法:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()lines.createOrReplaceTempView("lines")val wordCounts = spark.sql("""|SELECT value, COUNT(*) as count|FROM (|SELECT explode(split(value, ' ')) as value|FROM lines|)|GROUP BY value""".stripMargin)val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()1.SourceStructured Streaming 支持多种输入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等 。下面是一个使用 Scala 语言从 Kafka 中读取数据的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()// 订阅一个主题val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]2.OutputStructured Streaming 支持多种输出方式,包括控制台输出、内存输出、文件输出、数据源输出等 。下面是将数据写入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()// 从 socket 中读取数据val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 将数据写入到 Parquet 文件中lines.writeStream.format("parquet").option("path", "path/to/output/dir").option("checkpointLocation", "path/to/checkpoint/dir").start()3.Output Mode每当结果表更新时,我们都希望将更改后的结果行写入外部接收器 。
Output mode 指定了数据写入输出接收器的方式 。Structured Streaming 支持以下三种 output mode:
Output Mode
描述
Append
只将流 DataFrame/Dataset 中的新行写入接收器 。
Complete
每当有更新时,将流 DataFrame/Dataset 中的所有行写入接收器 。
Update
每当有更新时 , 只将流 DataFrame/Dataset 中更新的行写入接收器 。
4.Output SinkOutput sink 指定了数据写入的位置 。Structured Streaming 支持多种输出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制台接收器和内存接收器等 。下面是一些使用 Scala 语言将数据写入到不同输出接收器中的例子:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()// 从 socket 中读取数据val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 将数据写入到 Parquet 文件中lines.writeStream.format("parquet").option("path", "path/to/output/dir").option("checkpointLocation", "path/to/checkpoint/dir").start()// 将数据写入到 Kafka 中//selectExpr 是一个 DataFrame 的转换操作 , 它允许你使用 SQL 表达式来选择 DataFrame 中的列 。//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示选择 key 和 value 列,并将它们的类型转换为字符串类型 。//这是因为 Kafka 接收器要求数据必须是字符串类型或二进制类型 。lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").start()// 将数据写入到控制台中lines.writeStream.format("console").start()// 将数据写入到内存中lines.writeStream.format("memory").queryName("tableName").start()


推荐阅读