PySpark 数据处理:用 Python 操作 Spark

FreeGuideOnline 最新 2026-06-17

PySpark 数据处理:用 Python 操作 Spark

为什么选择 PySpark?

Apache Spark 是一个快速、通用的分布式计算引擎,而 PySpark 是它的 Python API。它允许你在 Python 环境中轻松处理大规模数据集,结合了 Spark 的分布式计算能力和 Python 的简洁语法。无论你是数据分析师、数据工程师还是机器学习从业者,PySpark 都能帮你高效完成数据清洗、转换和分析任务。

本教程将从零开始,带你掌握 PySpark 的核心操作,快速上手大数据处理。

环境准备

在使用 PySpark 前,你需要安装并配置环境。推荐使用以下两种方式之一:

方式一:使用 pip 安装

pip install pyspark

如果你还需要处理结构化数据并存储为 Parquet 等格式,建议同时安装 pandas 和 pyarrow:

pip install pandas pyarrow

方式二:通过 Docker 或本地 Spark 集群 如果处理超大规模数据,可以搭建 Spark 集群。但本教程中的示例在单机模式下即可运行,只需安装 pyspark 包。

安装完成后,可以在 Python 或 Jupyter Notebook 中导入并验证:

import pyspark
print(pyspark.__version__)

启动 SparkSession

SparkSession 是 PySpark 的入口点,从 Spark 2.0 开始替代了旧的 SQLContext 和 HiveContext。它封装了 Spark 功能,用于创建 DataFrame、注册临时视图等。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark数据处理教程") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

appName 用于标识你的应用,getOrCreate 会获取已有 Session 或创建新 Session。单机模式下无需额外配置。

理解核心数据结构

PySpark 提供三种主要的数据抽象:

  • RDD(弹性分布式数据集):低层 API,适合非结构化数据或需要精细控制的操作。
  • DataFrame:以命名列组织的分布式数据集合,类似于关系型数据库的表或 pandas 的 DataFrame。它优化了执行计划,性能更好。
  • Dataset:强类型的 DataFrame,在 Scala 和 Java 中可用,PySpark 中 DataFrame 本质上就是 Dataset[Row]。

本教程将重点介绍 DataFrame,因为它是数据处理中最常用的接口。

创建 DataFrame

DataFrame 可以从多种数据源创建:CSV、JSON、Parquet、数据库或 Python 列表等。

从列表创建

data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
df.show()

从 CSV 文件创建

假设有一个 people.csv 文件,内容如下:

Name,Age,City
Alice,25,New York
Bob,30,San Francisco
Charlie,35,Los Angeles

加载并推断数据类型:

df_csv = spark.read.csv("people.csv", header=True, inferSchema=True)
df_csv.show()
# 也可以指定模式(Schema),提高性能
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("City", StringType(), True)
])
df_csv = spark.read.csv("people.csv", header=True, schema=schema)

从 JSON、Parquet 等格式加载

df_json = spark.read.json("data.json")
df_parquet = spark.read.parquet("data.parquet")

查看数据基本信息

DataFrame 提供了类似 SQL 的探索方法:

  • df.show(n):显示前 n 行(默认 20)
  • df.printSchema():打印列名和数据类型
  • df.columns:返回列名列表
  • df.describe().show():数值列的统计摘要
  • df.count():返回行数
df.printSchema()
# root
#  |-- Name: string (nullable = true)
#  |-- Age: long (nullable = true)
#  |-- City: string (nullable = true)

df.describe(["Age"]).show()

基本数据操作

PySpark DataFrame API 支持两种风格:类 SQL 的 DSL原生 SQL

选择列(Select)

df.select("Name", "Age").show()
# 或使用 col 函数进行更复杂的操作
from pyspark.sql.functions import col
df.select(col("Name"), col("Age") + 1).show()

过滤行(Filter / Where)

df.filter(df.Age > 30).show()
# 等价写法
df.where("Age > 30").show()
# 多条件
df.filter((df.Age > 25) & (df.City == "New York")).show()

新增列(withColumn)

df.withColumn("Age_after_5_years", df.Age + 5).show()
# 使用函数表达式
from pyspark.sql.functions import upper, col
df.withColumn("Upper_Name", upper(col("Name"))).show()

重命名列

df.withColumnRenamed("Name", "Full_Name").show()

删除列

df.drop("City").show()

排序

df.sort("Age", ascending=False).show()
# 或使用 orderBy
df.orderBy(df.Age.desc()).show()

聚合操作

分组聚合是数据分析的核心。PySpark 提供强大的聚合函数。

分组计数

df.groupBy("City").count().show()

聚合函数(agg)

from pyspark.sql.functions import avg, max, min, sum
df.groupBy("City").agg(
    avg("Age").alias("avg_age"),
    max("Age").alias("max_age")
).show()

不分组直接聚合整个 DataFrame

df.agg(avg("Age").alias("avg_age_all")).show()

处理缺失值

现实数据常包含缺失值,PySpark 提供了多种处理方式。

# 丢弃包含任何空值的行
df.na.drop().show()
# 丢弃特定列中有空值的行
df.na.drop(subset=["Age"]).show()
# 填充缺失值
df.na.fill({"Age": 0, "City": "unknown"}).show()

使用 SQL 语句操作

如果你更熟悉 SQL,可以直接将 DataFrame 注册为临时视图,然后使用 SQL 查询。

df.createOrReplaceTempView("people")
sql_result = spark.sql("""
    SELECT City, AVG(Age) as avg_age
    FROM people
    WHERE Age > 25
    GROUP BY City
""")
sql_result.show()

与 Pandas 互操作

PySpark 和 pandas 可以无缝转换,方便在分布式计算和单机探索之间切换。

DataFrame 转 Pandas

pandas_df = df.toPandas()
print(pandas_df.head())

注意:toPandas() 会将所有数据收集到 Driver 节点内存,仅适用于小数据预览。

Pandas 转 DataFrame

import pandas as pd
pandas_data = pd.DataFrame({"Name": ["Dave"], "Age": [40], "City": ["Chicago"]})
df_from_pandas = spark.createDataFrame(pandas_data)
df_from_pandas.show()

利用 PySpark 的 Pandas API(Pandas on Spark) 从 Spark 3.2 开始,可以直接使用类似 pandas 的 API 来操作分布式数据,代码几乎无需修改。

import pyspark.pandas as ps
ps_df = ps.DataFrame(pandas_data)
print(ps_df.head())

实战示例:用户行为日志分析

假设我们有一份用户访问日志 logs.json,数据格式如下(每行一个 JSON 对象):

{"user_id": "u1", "event": "click", "timestamp": "2024-01-01 10:00:00"}
{"user_id": "u2", "event": "view", "timestamp": "2024-01-01 10:01:00"}
...

目标:统计每小时的独立用户访问量(UV)。

# 1. 加载数据
log_df = spark.read.json("logs.json")
log_df.printSchema()

# 2. 提取小时
from pyspark.sql.functions import hour, to_timestamp, col
log_df = log_df.withColumn(
    "hour",
    hour(to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
)

# 3. 按小时聚合 UV
uv_per_hour = log_df.groupBy("hour").agg(
    spark_apply_unique_count("user_id").alias("uv")
)
uv_per_hour.sort("hour").show()

注意:Spark 没有内置的“countDistinct”作为 agg 函数参数?实际使用 from pyspark.sql.functions import countDistinct,然后用 countDistinct("user_id") 即可。

写入处理结果

处理完成后,可将结果保存为多种格式。

# 保存为 CSV(单文件或多分区)
uv_per_hour.write.csv("output/uv_per_hour", header=True)

# 保存为 Parquet(推荐,压缩率高,读取快)
uv_per_hour.write.parquet("output/uv_per_hour.parquet")

# 保存为 JSON
uv_per_hour.write.json("output/uv_per_hour.json")

# 写入到数据库(需要 JDBC 驱动)
# jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
# uv_per_hour.write.format("jdbc").option("url", jdbc_url) \
#     .option("dbtable", "user_uv").option("user", "user").option("password", "pass").save()

写入时可以通过 .mode("overwrite").mode("append") 控制行为。

性能优化提示

  • 使用 Parquet 列式存储:比 CSV/JSON 更快更省空间。
  • 调整分区数df.repartition(10) 可改变分区,平衡任务。
  • 缓存常用数据df.cache()df.persist() 将数据保存到内存/磁盘,避免重复计算。
  • 避免 UDF,多用内置函数:内置函数经过 Catalyst 优化,性能远胜 Python UDF。
  • broadcast 优化 Join:小表与大表连接时广播小表。
  • 合理设置 Executor 内存和核数:在真实集群中需调整 spark-submit 参数。

关闭 SparkSession

在脚本结束时,应关闭 Session 释放资源:

spark.stop()

总结

通过本教程,你已经学会了:

  • 搭建 PySpark 环境并启动 SparkSession
  • 创建和查看 DataFrame
  • 进行选择、过滤、聚合等常见数据操作
  • 用 SQL 查询数据
  • 与 Pandas 自由转换
  • 将结果写入外部存储

PySpark 是处理大规模数据集的利器,继续练习这些操作,你就能轻松应对 TB 级数据的挑战。接下来可以探索 Spark MLlib 进行机器学习或 Structured Streaming 实现实时计算。