PySpark 数据处理:用 Python 操作 Spark
PySpark 数据处理:用 Python 操作 Spark
为什么选择 PySpark?
Apache Spark 是一个快速、通用的分布式计算引擎,而 PySpark 是它的 Python API。它允许你在 Python 环境中轻松处理大规模数据集,结合了 Spark 的分布式计算能力和 Python 的简洁语法。无论你是数据分析师、数据工程师还是机器学习从业者,PySpark 都能帮你高效完成数据清洗、转换和分析任务。
本教程将从零开始,带你掌握 PySpark 的核心操作,快速上手大数据处理。
环境准备
在使用 PySpark 前,你需要安装并配置环境。推荐使用以下两种方式之一:
方式一:使用 pip 安装
pip install pyspark
如果你还需要处理结构化数据并存储为 Parquet 等格式,建议同时安装 pandas 和 pyarrow:
pip install pandas pyarrow
方式二:通过 Docker 或本地 Spark 集群
如果处理超大规模数据,可以搭建 Spark 集群。但本教程中的示例在单机模式下即可运行,只需安装 pyspark 包。
安装完成后,可以在 Python 或 Jupyter Notebook 中导入并验证:
import pyspark
print(pyspark.__version__)
启动 SparkSession
SparkSession 是 PySpark 的入口点,从 Spark 2.0 开始替代了旧的 SQLContext 和 HiveContext。它封装了 Spark 功能,用于创建 DataFrame、注册临时视图等。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("PySpark数据处理教程") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
appName 用于标识你的应用,getOrCreate 会获取已有 Session 或创建新 Session。单机模式下无需额外配置。
理解核心数据结构
PySpark 提供三种主要的数据抽象:
- RDD(弹性分布式数据集):低层 API,适合非结构化数据或需要精细控制的操作。
- DataFrame:以命名列组织的分布式数据集合,类似于关系型数据库的表或 pandas 的 DataFrame。它优化了执行计划,性能更好。
- Dataset:强类型的 DataFrame,在 Scala 和 Java 中可用,PySpark 中 DataFrame 本质上就是 Dataset[Row]。
本教程将重点介绍 DataFrame,因为它是数据处理中最常用的接口。
创建 DataFrame
DataFrame 可以从多种数据源创建:CSV、JSON、Parquet、数据库或 Python 列表等。
从列表创建
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
df.show()
从 CSV 文件创建
假设有一个 people.csv 文件,内容如下:
Name,Age,City
Alice,25,New York
Bob,30,San Francisco
Charlie,35,Los Angeles
加载并推断数据类型:
df_csv = spark.read.csv("people.csv", header=True, inferSchema=True)
df_csv.show()
# 也可以指定模式(Schema),提高性能
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("Name", StringType(), True),
StructField("Age", IntegerType(), True),
StructField("City", StringType(), True)
])
df_csv = spark.read.csv("people.csv", header=True, schema=schema)
从 JSON、Parquet 等格式加载
df_json = spark.read.json("data.json")
df_parquet = spark.read.parquet("data.parquet")
查看数据基本信息
DataFrame 提供了类似 SQL 的探索方法:
df.show(n):显示前 n 行(默认 20)df.printSchema():打印列名和数据类型df.columns:返回列名列表df.describe().show():数值列的统计摘要df.count():返回行数
df.printSchema()
# root
# |-- Name: string (nullable = true)
# |-- Age: long (nullable = true)
# |-- City: string (nullable = true)
df.describe(["Age"]).show()
基本数据操作
PySpark DataFrame API 支持两种风格:类 SQL 的 DSL 和 原生 SQL。
选择列(Select)
df.select("Name", "Age").show()
# 或使用 col 函数进行更复杂的操作
from pyspark.sql.functions import col
df.select(col("Name"), col("Age") + 1).show()
过滤行(Filter / Where)
df.filter(df.Age > 30).show()
# 等价写法
df.where("Age > 30").show()
# 多条件
df.filter((df.Age > 25) & (df.City == "New York")).show()
新增列(withColumn)
df.withColumn("Age_after_5_years", df.Age + 5).show()
# 使用函数表达式
from pyspark.sql.functions import upper, col
df.withColumn("Upper_Name", upper(col("Name"))).show()
重命名列
df.withColumnRenamed("Name", "Full_Name").show()
删除列
df.drop("City").show()
排序
df.sort("Age", ascending=False).show()
# 或使用 orderBy
df.orderBy(df.Age.desc()).show()
聚合操作
分组聚合是数据分析的核心。PySpark 提供强大的聚合函数。
分组计数
df.groupBy("City").count().show()
聚合函数(agg)
from pyspark.sql.functions import avg, max, min, sum
df.groupBy("City").agg(
avg("Age").alias("avg_age"),
max("Age").alias("max_age")
).show()
不分组直接聚合整个 DataFrame
df.agg(avg("Age").alias("avg_age_all")).show()
处理缺失值
现实数据常包含缺失值,PySpark 提供了多种处理方式。
# 丢弃包含任何空值的行
df.na.drop().show()
# 丢弃特定列中有空值的行
df.na.drop(subset=["Age"]).show()
# 填充缺失值
df.na.fill({"Age": 0, "City": "unknown"}).show()
使用 SQL 语句操作
如果你更熟悉 SQL,可以直接将 DataFrame 注册为临时视图,然后使用 SQL 查询。
df.createOrReplaceTempView("people")
sql_result = spark.sql("""
SELECT City, AVG(Age) as avg_age
FROM people
WHERE Age > 25
GROUP BY City
""")
sql_result.show()
与 Pandas 互操作
PySpark 和 pandas 可以无缝转换,方便在分布式计算和单机探索之间切换。
DataFrame 转 Pandas
pandas_df = df.toPandas()
print(pandas_df.head())
注意:
toPandas()会将所有数据收集到 Driver 节点内存,仅适用于小数据预览。
Pandas 转 DataFrame
import pandas as pd
pandas_data = pd.DataFrame({"Name": ["Dave"], "Age": [40], "City": ["Chicago"]})
df_from_pandas = spark.createDataFrame(pandas_data)
df_from_pandas.show()
利用 PySpark 的 Pandas API(Pandas on Spark) 从 Spark 3.2 开始,可以直接使用类似 pandas 的 API 来操作分布式数据,代码几乎无需修改。
import pyspark.pandas as ps
ps_df = ps.DataFrame(pandas_data)
print(ps_df.head())
实战示例:用户行为日志分析
假设我们有一份用户访问日志 logs.json,数据格式如下(每行一个 JSON 对象):
{"user_id": "u1", "event": "click", "timestamp": "2024-01-01 10:00:00"}
{"user_id": "u2", "event": "view", "timestamp": "2024-01-01 10:01:00"}
...
目标:统计每小时的独立用户访问量(UV)。
# 1. 加载数据
log_df = spark.read.json("logs.json")
log_df.printSchema()
# 2. 提取小时
from pyspark.sql.functions import hour, to_timestamp, col
log_df = log_df.withColumn(
"hour",
hour(to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
)
# 3. 按小时聚合 UV
uv_per_hour = log_df.groupBy("hour").agg(
spark_apply_unique_count("user_id").alias("uv")
)
uv_per_hour.sort("hour").show()
注意:Spark 没有内置的“countDistinct”作为 agg 函数参数?实际使用
from pyspark.sql.functions import countDistinct,然后用countDistinct("user_id")即可。
写入处理结果
处理完成后,可将结果保存为多种格式。
# 保存为 CSV(单文件或多分区)
uv_per_hour.write.csv("output/uv_per_hour", header=True)
# 保存为 Parquet(推荐,压缩率高,读取快)
uv_per_hour.write.parquet("output/uv_per_hour.parquet")
# 保存为 JSON
uv_per_hour.write.json("output/uv_per_hour.json")
# 写入到数据库(需要 JDBC 驱动)
# jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
# uv_per_hour.write.format("jdbc").option("url", jdbc_url) \
# .option("dbtable", "user_uv").option("user", "user").option("password", "pass").save()
写入时可以通过 .mode("overwrite") 或 .mode("append") 控制行为。
性能优化提示
- 使用 Parquet 列式存储:比 CSV/JSON 更快更省空间。
- 调整分区数:
df.repartition(10)可改变分区,平衡任务。 - 缓存常用数据:
df.cache()或df.persist()将数据保存到内存/磁盘,避免重复计算。 - 避免 UDF,多用内置函数:内置函数经过 Catalyst 优化,性能远胜 Python UDF。
- 用
broadcast优化 Join:小表与大表连接时广播小表。 - 合理设置 Executor 内存和核数:在真实集群中需调整
spark-submit参数。
关闭 SparkSession
在脚本结束时,应关闭 Session 释放资源:
spark.stop()
总结
通过本教程,你已经学会了:
- 搭建 PySpark 环境并启动 SparkSession
- 创建和查看 DataFrame
- 进行选择、过滤、聚合等常见数据操作
- 用 SQL 查询数据
- 与 Pandas 自由转换
- 将结果写入外部存储
PySpark 是处理大规模数据集的利器,继续练习这些操作,你就能轻松应对 TB 级数据的挑战。接下来可以探索 Spark MLlib 进行机器学习或 Structured Streaming 实现实时计算。