PySpark 大数据分析:DataFrame 与 Spark SQL
Apache Spark 大数据处理快速入门:使用 PySpark DataFrame 与 Spark SQL
欢迎来到本教程!如果你正在寻找一种高效、易用的方式来处理海量数据,那么 Apache Spark 绝对是你的不二之选。本教程将带你从零开始,使用 Python 语言(PySpark)掌握 Spark 的核心数据处理能力,重点会放在 DataFrame API 和 Spark SQL 上。无需预先具备大数据背景,只要你有基本的 Python 知识,就能跟着我们一起完成你的第一个分布式数据分析任务。
为什么选择 Apache Spark?
在开始写代码之前,我们先花一分钟理解一下 Spark 到底是什么,以及它为什么如此流行。简单来说,Apache Spark 是一个分布式计算引擎,它能够将数据分片存储在集群的多台机器上,并且并行地执行计算任务。这意味着你可以在几秒钟内处理原本需要数小时才能完成的 TB 级数据集。
相比于传统的 MapReduce,Spark 最大的优势在于内存计算和统一的编程模型。它提供了面向多种场景的库:Spark SQL(结构化数据)、Spark Streaming(流处理)、MLlib(机器学习)和 GraphX(图计算)。而我们今天的主角——DataFrame 和 Spark SQL——就是 Spark SQL 模块中的核心抽象,它们让你可以像使用 Excel 透视表或 SQL 数据库一样,用声明式的方式操纵分布式数据集。
环境准备:启动你的第一个 PySpark 会话
首先,你需要安装 PySpark。推荐使用 pip 安装:
pip install pyspark
安装完成后,打开你的 Python 交互环境或 Jupyter Notebook,创建一个 SparkSession。这是任何 Spark 应用的入口点。
from pyspark.sql import SparkSession
# 创建一个名为 "MyFirstSparkApp" 的 SparkSession
spark = SparkSession.builder \
.appName("MyFirstSparkApp") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# 检查 Spark 版本
print(f"Spark 版本: {spark.version}")
运行以上代码,如果没有报错,恭喜你!你的本地 Spark 环境已经就绪。spark 这个对象就是我们接下来所有操作的起点。
初识 DataFrame:分布式的表格数据
在 Spark 中,DataFrame 是一个带有模式(Schema)的分布式数据集合,在概念上非常类似于关系型数据库的表或者 pandas 的 DataFrame。你可以把它想象成一张巨大的表格,每一列都有明确的名称和数据类型,而数据本身可以分布在成百上千台机器上,但你操作它的代码就像在操作一个本地对象一样简单。
创建 DataFrame
创建 DataFrame 最常用的方式有三种:从现有数据结构创建、从外部文件读取、或者从 RDD 转换。我们先从最简单的开始。
1. 从 Python 集合创建
# 从包含元组的列表创建
data = [("Alice", 25, "Engineer"), ("Bob", 30, "Designer"), ("Cathy", 28, "Data Scientist")]
columns = ["Name", "Age", "Profession"]
df = spark.createDataFrame(data, schema=columns)
# 查看 DataFrame 内容
df.show()
输出:
+-----+---+--------------+
| Name|Age| Profession|
+-----+---+--------------+
|Alice| 25| Engineer|
| Bob| 30| Designer|
|Cathy| 28|Data Scientist|
+-----+---+--------------+
2. 从外部文件读取
Spark 支持读取多种数据源,如 CSV、JSON、Parquet 等。这里我们以一个简单的 CSV 文件为例(假设当前目录下有一个 employees.csv 文件,内容与上面相同)。
# 读取 CSV 文件,指定第一行为列名
df_csv = spark.read.csv("employees.csv", header=True, inferSchema=True)
df_csv.printSchema()
df_csv.show()
inferSchema=True 会让 Spark 自动推断每一列的数据类型,免去你手动定义的麻烦。
DataFrame 基本操作
现在我们有了数据,就来看看 DataFrame 提供的一些核心操作方法。这些方法分为转换(Transformation)和行动(Action)。转换是惰性求值的,只有当你调用行动操作时,Spark 才会真正开始计算。
查看数据
show(n):显示前 n 行。printSchema():打印列名和数据类型。columns:返回列名列表。describe():计算数值列的统计信息。
df.describe(["Age"]).show()
选择和过滤列
使用 select 和 filter(或 where):
# 选择姓名和年龄两列
df.select("Name", "Age").show()
# 过滤出年龄大于 25 的记录
df.filter(df.Age > 25).show()
# 或者用字符串形式的条件表达式
df.filter("Age > 25").show()
添加新列
使用 withColumn 可以添加新列或替换已有列:
# 添加一个“五年后年龄”的列
df.withColumn("AgeIn5Years", df.Age + 5).show()
重命名列
df.withColumnRenamed("Name", "FullName").show()
分组和聚合
分组聚合是数据分析的核心,Spark DataFrame 提供了一套与 SQL 非常相似的 API。
# 按职业分类,计算平均年龄
df.groupBy("Profession").avg("Age").show()
# 使用 agg 函数进行多个聚合
from pyspark.sql.functions import min, max, count
df.groupBy("Profession").agg(
min("Age").alias("min_age"),
max("Age").alias("max_age"),
count("*").alias("count")
).show()
排序
# 按年龄降序排列
df.orderBy(df.Age.desc()).show()
使用 SQL 函数
Spark SQL 提供了大量的内置函数,你可以从 pyspark.sql.functions 导入它们,用来做字符串处理、日期运算、数学计算等。
from pyspark.sql.functions import upper, length
df.select(upper(df.Name), length(df.Name).alias("name_length")).show()
Spark SQL:用 SQL 查询分布式数据
如果你熟悉传统的 SQL,你会爱上 Spark SQL。它允许你直接在 DataFrame 上注册临时视图,然后使用标准 SQL 语句进行查询,极大降低了学习成本。
注册临时视图
调用 createOrReplaceTempView 方法将一个 DataFrame 注册为一张临时的 SQL 表。
df.createOrReplaceTempView("employees")
执行 SQL 查询
注册完成后,就可以使用 spark.sql 来执行 SQL 语句了。
result = spark.sql("""
SELECT Profession, AVG(Age) as avg_age, COUNT(*) as cnt
FROM employees
WHERE Age > 25
GROUP BY Profession
ORDER BY avg_age DESC
""")
result.show()
你会得到与前面分组聚合完全相同的结果,但表达方式更加简洁直观。Spark SQL 的引擎会自动将 SQL 查询翻译成底层的 RDD 操作,并进行优化。
混合使用 API 和 SQL
你不需要在两种方式之间二选一,完全可以混用。例如,你可以先用 DataFrame API 对数据进行清洗和转换,然后注册成表,最后用 SQL 完成复杂的联表查询。这种灵活性让 Spark 能够适应各种场景。
实战案例:分析电商订单数据
让我们通过一个稍微真实一点的例子,把学到的知识串联起来。假设我们有三个 CSV 文件:customers.csv(客户信息)、orders.csv(订单信息)、products.csv(产品信息)。我们的目标是找出每个客户的总消费金额,并列出消费最高的前 5 名。
1. 加载数据
customers = spark.read.csv("customers.csv", header=True, inferSchema=True)
orders = spark.read.csv("orders.csv", header=True, inferSchema=True)
products = spark.read.csv("products.csv", header=True, inferSchema=True)
customers.createOrReplaceTempView("customers")
orders.createOrReplaceTempView("orders")
products.createOrReplaceTempView("products")
2. 关联查询并计算总额
使用 SQL 将订单表与产品表关联获取单价,再按客户分组求和。
top_customers_sql = spark.sql("""
SELECT c.customer_id, c.name, SUM(p.price * o.quantity) AS total_spent
FROM orders o
JOIN products p ON o.product_id = p.id
JOIN customers c ON o.customer_id = c.id
GROUP BY c.customer_id, c.name
ORDER BY total_spent DESC
LIMIT 5
""")
top_customers_sql.show()
如果更喜欢 DataFrame API,也可以这样写:
from pyspark.sql.functions import sum as _sum
joined = orders.join(products, orders.product_id == products.id) \
.join(customers, orders.customer_id == customers.id)
top_customers_df = joined.groupBy("customer_id", "name") \
.agg(_sum(products.price * orders.quantity).alias("total_spent")) \
.orderBy("total_spent", ascending=False) \
.limit(5)
top_customers_df.show()
两种方式得到的结果完全一致,你可以根据自己的喜好选择。
3. 保存结果
处理完的数据通常需要写回文件系统或数据库。Spark 支持多种输出格式。
# 保存为单个 CSV 文件(注意:在分布式环境下会生成一个文件夹,而不是单个文件)
top_customers_df.coalesce(1).write.csv("output/top_customers.csv", header=True, mode="overwrite")
# 保存为 Parquet 格式(列式存储,更高效)
top_customers_df.write.parquet("output/top_customers.parquet", mode="overwrite")
常见问题与排错
Q: 我的数据量很大,show() 会卡住吗?
A: show() 默认只显示前 20 行,即使你的数据有上亿条,它也只会抓取少量的样本,不会引起性能问题。
Q: 为什么修改 DataFrame 后,原来的变量并没有变?
A: DataFrame 是不可变的。所有的转换操作(如 withColumn、filter)都会返回一个新的 DataFrame,你需要将它赋给一个新变量。
Q: 什么时候用 DataFrame API,什么时候用 Spark SQL?
A: 这主要取决于团队习惯和任务类型。DataFrame API 可以更好地利用类型安全性,适合 ETL 管道;Spark SQL 则更适合临时查询和数据分析师。它们在性能上没有本质区别,因为底层执行计划都是相同的。
Q: 我的 PySpark 脚本报错了 OutOfMemoryError,怎么办?
A: 这通常是因为数据倾斜或内存配置不足。可以尝试增加 executor 内存(--conf spark.executor.memory=4g),或者对数据进行重分区(repartition),避免单个分区数据过大。
下一步学什么?
恭喜你完成了 PySpark 的第一次实战!现在你已经能够:
- 创建 DataFrame 并读取外部数据
- 使用选择和过滤、分组聚合、排序等基本操作
- 用 SQL 查询 DataFrame
- 将分析结果写入外部存储
这只是 Spark 强大功能的冰山一角。如果你想继续深入,建议学习以下主题:
- 数据清洗:处理缺失值、重复值,使用 UDF(用户自定义函数)
- Spark 性能优化:理解 Catalyst 优化器、钨丝计划、分区与缓存
- Spark MLlib:在分布式数据上应用机器学习算法
- Spark Streaming:处理实时数据流
- 连接外部数据源:如 Hive、JDBC、Kafka 等
希望本教程能为你打开大数据世界的大门。开始用 Spark 处理你那堆曾经让电脑卡死的数据吧,现在你会发现,它们只不过是几行代码的事儿。