Spark入门指南:从基础概念到实践应用全解析( 九 )

  • StructType (fields):表示一个拥有 StructFields (fields) 序列结构的值 。
  • StructField (name, dataType, nullable):代表 StructType 中的一个字段,字段的名字通过 name 指定 , dataType 指定 field 的数据类型 , nullable 表示字段的值是否有 null 值 。
  • 3.DataFrameDataFrame 是 Spark 中用于处理结构化数据的一种数据结构 。它类似于关系数据库中的表,具有行和列 。每一列都有一个名称和一个类型,每一行都是一条记录 。
    DataFrame 支持多种数据源,包括结构化数据文件、Hive 表、外部数据库和现有的 RDD 。它提供了丰富的操作,包括筛选、聚合、分组、排序等 。
    DataFrame 的优点在于它提供了一种高级的抽象,使得用户可以使用类似于 SQL 的语言进行数据处理,而无需关心底层的实现细节 。此外 , Spark 会自动对 DataFrame 进行优化,以提高查询性能 。
    下面是一个使用DataFrame的代码例子:
    import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()import spark.implicits._val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/Seq(("Alice", 25),("Bob", 30),("Charlie", 35))val df = data.toDF("name", "age")df.show()在这个示例中,我们首先创建了一个 SparkSession 对象,然后使用 toDF 方法将一个序列转换为 DataFrame 。最后,我们使用 show 方法来显示 DataFrame 的内容 。
    4.创建 DataFrame在 Scala 中 , 可以通过以下几种方式创建 DataFrame:
    从现有的 RDD 转换而来 。例如:
    import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()import spark.implicits._case class Person(name: String, age: Int)val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))val df = rdd.toDF()df.show()从外部数据源读取 。例如 , 从 JSON 文件中读取数据并创建 DataFrame:
    import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()val df = spark.read.json("path/to/json/file")df.show()通过编程方式创建 。例如 , 使用 createDataFrame 方法:
    import org.apache.spark.sql.{Row, SparkSession}import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()val schema = StructType(List(StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true)))val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/Seq(Row("Alice", 25), Row("Bob", 30))val rdd = spark.sparkContext.parallelize(data)val df = spark.createDataFrame(rdd, schema)df.show()5.DSL & SQL在 Spark 中 , 可以使用两种方式对 DataFrame 进行查询:「DSL(Domain-Specific Language)」和「 SQL」 。
    DSL 是一种特定领域语言,它提供了一组用于操作 DataFrame 的方法 。例如,下面是一个使用 DSL 进行查询的例子:
    import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.select("name", "age").filter($"age" > 25).show()SQL 是一种结构化查询语言,它用于管理关系数据库系统 。在 Spark 中,可以使用 SQL 对 DataFrame 进行查询 。例如 , 下面是一个使用 SQL 进行查询的例子:
    import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.createOrReplaceTempView("people")spark.sql("SELECT name, age FROM people WHERE age > 25").show()DSL 和 SQL 的区别在于语法和风格 。DSL 使用方法调用链来构建查询,而 SQL 使用声明式语言来描述查询 。选择哪种方式取决于个人喜好和使用场景 。
    6.Spark SQL 数据源Spark SQL 支持多种数据源,包括 Parquet、JSON、CSV、JDBC、Hive 等 。
    下面是示例代码:
    import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()// Parquetval df = spark.read.parquet("path/to/parquet/file")// JSON val df = spark.read.json("path/to/json/file")// CSVval df = spark.read.option("header", "true").csv("path/to/csv/file")// JDBCval df = spark.read.format("jdbc").option("url", "jdbc:MySQL://host:port/database").option("dbtable", "table").option("user", "username").option("password", "password").load()df.show()


    推荐阅读