Apache Spark 大数据:RDD、DataFrame 与集群计算

FreeGuideOnline 最新 2026-06-17

Apache Spark 大数据:RDD、DataFrame 与集群计算

1. 什么是 Apache Spark?

Apache Spark 是一个开源的统一分析引擎,专为大规模数据处理而设计。它提供了比 Hadoop MapReduce 快上百倍的内存计算能力,同时支持批处理、交互式查询、流处理和机器学习等多种工作负载。Spark 的核心抽象是弹性分布式数据集(RDD),以及更高层的 DataFrame / Dataset API,让开发者用简洁的代码完成复杂的分布式计算。

1.1 为什么选择 Spark

  • 内存计算:中间结果缓存于内存,避免反复读写磁盘。
  • 多语言支持:提供 Scala、Java、Python 和 R API。
  • 统一栈:Spark SQL、Spark Streaming、MLlib(机器学习)、GraphX(图计算)可以无缝组合,在一个应用中完成端到端分析。
  • 丰富的数据源连接:轻松读写 HDFS、S3、HBase、Cassandra、关系数据库、Parquet、Avro 等。

1.2 Spark 与 Hadoop MapReduce 的对比

特性 MapReduce Spark
中间结果存储 磁盘 内存(可溢写到磁盘)
迭代计算(如机器学习) 慢(多次 MapReduce) 快(内存缓存)
编程模型 仅 Map 与 Reduce 丰富的转换与行动算符
实时流处理 不支持 Spark Streaming / Structured Streaming
SQL 查询 Hive on MR Spark SQL(性能提升数倍)

2. 集群架构与执行模型

理解 Spark 的集群计算方式,是编写高效应用的基础。

2.1 集群组件

Spark 应用程序以独立进程集合的形式运行在集群上,由驱动程序(Driver)中的 SparkContext 协调。

  • Driver Program:运行用户的 main 函数,创建 SparkContext,负责将应用拆分为任务并调度执行。
  • Cluster Manager:管理集群资源的框架,可以是 Spark 自带的独立管理器、YARN、Kubernetes 或 Mesos。
  • Worker Node(Executor):实际执行任务的进程,运行在集群中的每台机器上,存储数据块并提供计算能力。

2.2 作业、阶段与任务

  • 作业(Job):每次 action 操作(如 count()collect())触发一个 Job。
  • 阶段(Stage):遇到宽依赖(如 groupByKey)时划分 Stage,窄依赖的 transformation 会被合并到一个 Stage 中。
  • 任务(Task):Stage 内的最小执行单元,每个分区对应一个 Task,由 Executor 并发执行。

2.3 惰性求值与 DAG 调度

Spark 中的所有转换操作都是惰性的,只有遇到行动操作时才会实际执行计算。Driver 会将所有转换构建成一个有向无环图(DAG),然后划分 Stage,优化执行计划,最后将任务发送到集群执行。这也是性能优化的关键:避免过早触发 Action,减少不必要的数据移动

3. 核心抽象:弹性分布式数据集 (RDD)

RDD 是 Spark 最基础的抽象,代表一个不可变的、分区的数据集合,可以并行操作。

3.1 RDD 的特性

  • 分区:数据被逻辑分片,分布于集群节点上。
  • 不可变性:RDD 一经创建,无法直接修改,所有 transformations 均返回新 RDD。
  • 弹性:发生数据丢失时,可通过血统(lineage)信息重新计算。
  • 可存储:可持久化到内存或磁盘。

3.2 创建 RDD

# 从集合创建
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# 从外部存储读取(如文本文件)
lines = spark.sparkContext.textFile("hdfs://path/to/file.txt")

3.3 转换与行动

  • 转换(Transformation):返回新 RDD,惰性执行。如 map()filter()flatMap()distinct()groupByKey()reduceByKey()sortBy()
  • 行动(Action):触发计算并返回值到 Driver 或输出到存储。如 collect()count()take()saveAsTextFile()foreach()
# 示例:单词计数
text_rdd = spark.sparkContext.textFile("words.txt")
words = text_rdd.flatMap(lambda line: line.split(" "))
word_pairs = words.map(lambda word: (word, 1))
counts = word_pairs.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("output")

3.4 持久化(缓存)

对于会被多次使用的 RDD,应调用 rdd.cache()rdd.persist(StorageLevel.MEMORY_AND_DISK) 将其缓存,避免重复计算。存储级别包括内存、磁盘、序列化等选项。

4. 高层抽象:DataFrame

DataFrame 是构建在 RDD 之上的结构化数据抽象,类似于关系数据库中的表或 Pandas 中的 DataFrame,拥有列名和行。DataFrame 利用 Spark SQL 的 Catalyst 优化器,能生成比手写 RDD 转换更高效的查询计划。

4.1 创建 DataFrame

# 从现有 RDD + Schema 创建
df = spark.createDataFrame(rdd, schema=["col1", "col2"])

# 从 JSON、CSV、Parquet 等文件直接读取
df = spark.read.json("path/to/file.json")
df = spark.read.parquet("path/to/file.parquet")

# 通过 Spark SQL 查询已有 DataFrame
df.createOrReplaceTempView("my_table")
result = spark.sql("SELECT col1, COUNT(*) FROM my_table GROUP BY col1")

4.2 DataFrame 操作

DataFrame 提供声明式 API,支持类 SQL 的操作:

  • 列选择df.select("name", "age")
  • 过滤df.filter(df.age > 18)
  • 聚合df.groupBy("department").count()
  • 排序df.orderBy("salary")
  • 连接df1.join(df2, "id")
# 示例:计算平均年龄
from pyspark.sql.functions import avg
df.groupBy("city").agg(avg("age").alias("avg_age")).show()

4.3 DataFrame 与 RDD 的比较

维度 RDD DataFrame
数据格式 任意 Scala/Java/Python 对象 结构化的行对象,有 Schema
串行化/反串行化 使用 Java 串行化或 Kryo Spark 内部 Tungsten 二进制格式,高效内存管理
执行优化 无优化,直接执行 DAG Catalyst 优化器(谓词下推、列剪枝等)
编程风格 函数式,较底层的 API 声明式,跨语言一致性好
推荐场景 需要底层控制和类型安全时 绝大多数结构化数据分析任务

5. 集群计算实践与优化要点

5.1 并行度与分区

合理的分区数是性能的关键。通常分区数应设为 2-3 倍的集群 CPU 核心总数。

  • 读取文件时可通过 spark.sql.files.maxPartitionBytes 调整分区大小。
  • 在 Shuffle 后,可通过 spark.sql.shuffle.partitions(默认 200)配置分区数。
  • 使用 repartition()coalesce() 手动调整分区数量。

5.2 数据倾斜

groupByKeyjoin 时,若某些 Key 的数据量远超其他 Key,会导致部分任务耗时过长。

解决方案

  • 加盐处理:在 Key 后追加随机前缀,使数据分散,汇总时再去掉前缀。
  • 广播连接:当小表大小适合内存时,使用 broadcast hint 避免 Shuffle。
  • 自定义分区器,根据实际数据分布设计分区逻辑。

5.3 内存管理与序列化

  • 优先使用内存序列化(MEMORY_ONLY_SER)或内存+磁盘序列化缓存,可节省内存并提升速度。
  • 使用 Kryo 序列化 代替默认 Java 序列化(通过 spark.serializer org.apache.spark.serializer.KryoSerializer 设置)。
  • 在资源充足时,提升 spark.memory.fraction 的比例,给执行内存更多空间。

5.4 选择合适的 API

  • 对于结构清晰、非自定义数据类型的处理,优先使用 DataFrame/Dataset,可获得 Catalyst 优化和 Tungsten 二进制执行。
  • 当需要处理非结构化数据、复杂自定义逻辑或需要进程级别的控制时,可降级使用 RDD。
  • 尽量使用内置函数而非 UDF(用户自定义函数),因为 Catalyst 能优化内置函数,而 Python UDF 需要在不同进程间序列化数据,开销较大。

6. 完整示例:日志分析流水线

以下展示用 DataFrame 从 JSON 日志文件计算每个 HTTP 状态码的请求次数,并按降序排列。

from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

spark = SparkSession.builder \
    .appName("Log Analysis") \
    .getOrCreate()

logs = spark.read.json("hdfs://logs/2025-01-01.json")
status_counts = logs.groupBy("status") \
                    .count() \
                    .orderBy(desc("count"))
status_counts.show()
spark.stop()

该作业在集群上会自动划分 Task,利用内存缓存中间 Shuffle 数据,结合 Catalyst 优化执行计划,处理 TB 级数据时依然保持稳定。

7. 总结

Apache Spark 通过 RDD 提供了低层次的分布式数据处理抽象,而 DataFrame 进一步带来了结构化数据的易用性和性能优化。掌握二者的区别和适用场景,理解集群执行模型、分区策略、数据倾斜处理,是成为 Spark 专业开发者的必经之路。从简单的单词计数到复杂的 ETL 和机器学习管道,Spark 始终是处理海量数据的高效选择。