PySpark 大数据分析:DataFrame 与 Spark SQL

FreeGuideOnline 最新 2026-06-12

Apache Spark 大数据处理快速入门:使用 PySpark DataFrame 与 Spark SQL

欢迎来到本教程!如果你正在寻找一种高效、易用的方式来处理海量数据,那么 Apache Spark 绝对是你的不二之选。本教程将带你从零开始,使用 Python 语言(PySpark)掌握 Spark 的核心数据处理能力,重点会放在 DataFrame API 和 Spark SQL 上。无需预先具备大数据背景,只要你有基本的 Python 知识,就能跟着我们一起完成你的第一个分布式数据分析任务。

为什么选择 Apache Spark?

在开始写代码之前,我们先花一分钟理解一下 Spark 到底是什么,以及它为什么如此流行。简单来说,Apache Spark 是一个分布式计算引擎,它能够将数据分片存储在集群的多台机器上,并且并行地执行计算任务。这意味着你可以在几秒钟内处理原本需要数小时才能完成的 TB 级数据集。

相比于传统的 MapReduce,Spark 最大的优势在于内存计算统一的编程模型。它提供了面向多种场景的库:Spark SQL(结构化数据)、Spark Streaming(流处理)、MLlib(机器学习)和 GraphX(图计算)。而我们今天的主角——DataFrame 和 Spark SQL——就是 Spark SQL 模块中的核心抽象,它们让你可以像使用 Excel 透视表或 SQL 数据库一样,用声明式的方式操纵分布式数据集。

环境准备:启动你的第一个 PySpark 会话

首先,你需要安装 PySpark。推荐使用 pip 安装:

pip install pyspark

安装完成后,打开你的 Python 交互环境或 Jupyter Notebook,创建一个 SparkSession。这是任何 Spark 应用的入口点。

from pyspark.sql import SparkSession

# 创建一个名为 "MyFirstSparkApp" 的 SparkSession
spark = SparkSession.builder \
    .appName("MyFirstSparkApp") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# 检查 Spark 版本
print(f"Spark 版本: {spark.version}")

运行以上代码,如果没有报错,恭喜你!你的本地 Spark 环境已经就绪。spark 这个对象就是我们接下来所有操作的起点。

初识 DataFrame:分布式的表格数据

在 Spark 中,DataFrame 是一个带有模式(Schema)的分布式数据集合,在概念上非常类似于关系型数据库的表或者 pandas 的 DataFrame。你可以把它想象成一张巨大的表格,每一列都有明确的名称和数据类型,而数据本身可以分布在成百上千台机器上,但你操作它的代码就像在操作一个本地对象一样简单。

创建 DataFrame

创建 DataFrame 最常用的方式有三种:从现有数据结构创建、从外部文件读取、或者从 RDD 转换。我们先从最简单的开始。

1. 从 Python 集合创建

# 从包含元组的列表创建
data = [("Alice", 25, "Engineer"), ("Bob", 30, "Designer"), ("Cathy", 28, "Data Scientist")]
columns = ["Name", "Age", "Profession"]
df = spark.createDataFrame(data, schema=columns)

# 查看 DataFrame 内容
df.show()

输出:

+-----+---+--------------+
| Name|Age|    Profession|
+-----+---+--------------+
|Alice| 25|      Engineer|
|  Bob| 30|      Designer|
|Cathy| 28|Data Scientist|
+-----+---+--------------+

2. 从外部文件读取

Spark 支持读取多种数据源,如 CSV、JSON、Parquet 等。这里我们以一个简单的 CSV 文件为例(假设当前目录下有一个 employees.csv 文件,内容与上面相同)。

# 读取 CSV 文件,指定第一行为列名
df_csv = spark.read.csv("employees.csv", header=True, inferSchema=True)
df_csv.printSchema()
df_csv.show()

inferSchema=True 会让 Spark 自动推断每一列的数据类型,免去你手动定义的麻烦。

DataFrame 基本操作

现在我们有了数据,就来看看 DataFrame 提供的一些核心操作方法。这些方法分为转换(Transformation)行动(Action)。转换是惰性求值的,只有当你调用行动操作时,Spark 才会真正开始计算。

查看数据

  • show(n):显示前 n 行。
  • printSchema():打印列名和数据类型。
  • columns:返回列名列表。
  • describe():计算数值列的统计信息。
df.describe(["Age"]).show()

选择和过滤列

使用 selectfilter(或 where):

# 选择姓名和年龄两列
df.select("Name", "Age").show()

# 过滤出年龄大于 25 的记录
df.filter(df.Age > 25).show()
# 或者用字符串形式的条件表达式
df.filter("Age > 25").show()

添加新列

使用 withColumn 可以添加新列或替换已有列:

# 添加一个“五年后年龄”的列
df.withColumn("AgeIn5Years", df.Age + 5).show()

重命名列

df.withColumnRenamed("Name", "FullName").show()

分组和聚合

分组聚合是数据分析的核心,Spark DataFrame 提供了一套与 SQL 非常相似的 API。

# 按职业分类,计算平均年龄
df.groupBy("Profession").avg("Age").show()

# 使用 agg 函数进行多个聚合
from pyspark.sql.functions import min, max, count
df.groupBy("Profession").agg(
    min("Age").alias("min_age"),
    max("Age").alias("max_age"),
    count("*").alias("count")
).show()

排序

# 按年龄降序排列
df.orderBy(df.Age.desc()).show()

使用 SQL 函数

Spark SQL 提供了大量的内置函数,你可以从 pyspark.sql.functions 导入它们,用来做字符串处理、日期运算、数学计算等。

from pyspark.sql.functions import upper, length

df.select(upper(df.Name), length(df.Name).alias("name_length")).show()

Spark SQL:用 SQL 查询分布式数据

如果你熟悉传统的 SQL,你会爱上 Spark SQL。它允许你直接在 DataFrame 上注册临时视图,然后使用标准 SQL 语句进行查询,极大降低了学习成本。

注册临时视图

调用 createOrReplaceTempView 方法将一个 DataFrame 注册为一张临时的 SQL 表。

df.createOrReplaceTempView("employees")

执行 SQL 查询

注册完成后,就可以使用 spark.sql 来执行 SQL 语句了。

result = spark.sql("""
    SELECT Profession, AVG(Age) as avg_age, COUNT(*) as cnt
    FROM employees
    WHERE Age > 25
    GROUP BY Profession
    ORDER BY avg_age DESC
""")
result.show()

你会得到与前面分组聚合完全相同的结果,但表达方式更加简洁直观。Spark SQL 的引擎会自动将 SQL 查询翻译成底层的 RDD 操作,并进行优化。

混合使用 API 和 SQL

你不需要在两种方式之间二选一,完全可以混用。例如,你可以先用 DataFrame API 对数据进行清洗和转换,然后注册成表,最后用 SQL 完成复杂的联表查询。这种灵活性让 Spark 能够适应各种场景。

实战案例:分析电商订单数据

让我们通过一个稍微真实一点的例子,把学到的知识串联起来。假设我们有三个 CSV 文件:customers.csv(客户信息)、orders.csv(订单信息)、products.csv(产品信息)。我们的目标是找出每个客户的总消费金额,并列出消费最高的前 5 名。

1. 加载数据

customers = spark.read.csv("customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)
products = spark.read.csv("products.csv", header=True, inferSchema=True)

customers.createOrReplaceTempView("customers")
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")

2. 关联查询并计算总额

使用 SQL 将订单表与产品表关联获取单价,再按客户分组求和。

top_customers_sql = spark.sql("""
    SELECT c.customer_id, c.name, SUM(p.price * o.quantity) AS total_spent
    FROM orders o
    JOIN products p ON o.product_id = p.id
    JOIN customers c ON o.customer_id = c.id
    GROUP BY c.customer_id, c.name
    ORDER BY total_spent DESC
    LIMIT 5
""")
top_customers_sql.show()

如果更喜欢 DataFrame API,也可以这样写:

from pyspark.sql.functions import sum as _sum

joined = orders.join(products, orders.product_id == products.id) \
               .join(customers, orders.customer_id == customers.id)

top_customers_df = joined.groupBy("customer_id", "name") \
    .agg(_sum(products.price * orders.quantity).alias("total_spent")) \
    .orderBy("total_spent", ascending=False) \
    .limit(5)

top_customers_df.show()

两种方式得到的结果完全一致,你可以根据自己的喜好选择。

3. 保存结果

处理完的数据通常需要写回文件系统或数据库。Spark 支持多种输出格式。

# 保存为单个 CSV 文件(注意:在分布式环境下会生成一个文件夹,而不是单个文件)
top_customers_df.coalesce(1).write.csv("output/top_customers.csv", header=True, mode="overwrite")

# 保存为 Parquet 格式(列式存储,更高效)
top_customers_df.write.parquet("output/top_customers.parquet", mode="overwrite")

常见问题与排错

Q: 我的数据量很大,show() 会卡住吗?

A: show() 默认只显示前 20 行,即使你的数据有上亿条,它也只会抓取少量的样本,不会引起性能问题。

Q: 为什么修改 DataFrame 后,原来的变量并没有变?

A: DataFrame 是不可变的。所有的转换操作(如 withColumnfilter)都会返回一个新的 DataFrame,你需要将它赋给一个新变量。

Q: 什么时候用 DataFrame API,什么时候用 Spark SQL?

A: 这主要取决于团队习惯和任务类型。DataFrame API 可以更好地利用类型安全性,适合 ETL 管道;Spark SQL 则更适合临时查询和数据分析师。它们在性能上没有本质区别,因为底层执行计划都是相同的。

Q: 我的 PySpark 脚本报错了 OutOfMemoryError,怎么办?

A: 这通常是因为数据倾斜或内存配置不足。可以尝试增加 executor 内存(--conf spark.executor.memory=4g),或者对数据进行重分区(repartition),避免单个分区数据过大。

下一步学什么?

恭喜你完成了 PySpark 的第一次实战!现在你已经能够:

  • 创建 DataFrame 并读取外部数据
  • 使用选择和过滤、分组聚合、排序等基本操作
  • 用 SQL 查询 DataFrame
  • 将分析结果写入外部存储

这只是 Spark 强大功能的冰山一角。如果你想继续深入,建议学习以下主题:

  • 数据清洗:处理缺失值、重复值,使用 UDF(用户自定义函数)
  • Spark 性能优化:理解 Catalyst 优化器、钨丝计划、分区与缓存
  • Spark MLlib:在分布式数据上应用机器学习算法
  • Spark Streaming:处理实时数据流
  • 连接外部数据源:如 Hive、JDBC、Kafka 等

希望本教程能为你打开大数据世界的大门。开始用 Spark 处理你那堆曾经让电脑卡死的数据吧,现在你会发现,它们只不过是几行代码的事儿。