Spark入门指南:从基础概念到实践应用全解析(14)

5.PV,UV统计下面是用Structured Streaming实现PV,UV统计的例子,我们来感受实战下:
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions._object PVUVExample {def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()import spark.implicits._// 假设我们有一个包含用户ID和访问的URL的输入流val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/lines.as[String].map(line => {val parts = line.split(",")(parts(0), parts(1))}).toDF("user", "url")// 计算PVval pv = data.groupBy("url").count().withColumnRenamed("count", "pv")val pvQuery = pv.writeStream.outputMode("complete").format("console").start()// 计算UVval uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")val uvQuery = uv.writeStream.outputMode("complete").format("console").start()pvQuery.awaitTermination()uvQuery.awaitTermination()}}这段代码演示了如何使用Structured Streaming对数据进行PV和UV统计 。它首先从一个socket源读取数据,然后使用groupBy和count对数据进行PV统计,最后使用dropDuplicates、groupBy和count对数据进行UV统计 。
假设我们在本地启动了一个socket服务器 , 并向其发送以下数据:
user1,http://example.com/page1user2,http://example.com/page1user1,http://example.com/page2user3,http://example.com/page1user2,http://example.com/page2user3,http://example.com/page2那么程序将输出以下结果:
-------------------------------------------Batch: 0-------------------------------------------+--------------------+---+|url| pv|+--------------------+---+|http://example.co...|3||http://example.co...|3|+--------------------+---+-------------------------------------------Batch: 0-------------------------------------------+--------------------+---+|url| uv|+--------------------+---+|http://example.co...|2||http://example.co...|3|+--------------------+---+总结在此,我们对Spark的基本概念、使用方式以及部分原理进行了简单的介绍 。Spark以其强大的处理能力和灵活性,已经成为大数据处理领域的一个重要工具 。然而,这只是冰山一角 。Spark的世界里还有许多深度和广度等待着我们去探索 。
作为初学者,你可能会觉得这个领域庞大且复杂 。但请记住 , 每个都是从初学者开始的 。不断的学习和实践,你将能够更好的理解和掌握Spark,并将其应用于解决实际问题 。这篇文章可能不能涵盖所有的知识点,但我希望它能带给你收获和思考 。




推荐阅读