Python 多进程 multiprocessing:绕过 GIL
为什么需要多进程:认识 Python 的全局解释器锁(GIL)
Python 因为其简单易用和丰富的生态,成为数据处理、Web 开发等领域的首选语言。然而,当我们需要进行 CPU 密集型计算时,一个隐藏的限制会浮出水面——全局解释器锁(GIL)。
GIL 是 CPython(最主流的 Python 解释器)中的一个互斥锁,它确保同一时刻只有一个线程能够执行 Python 字节码。这意味着:
- 即使你创建了多个线程,它们也无法真正并行执行 Python 代码;
- 对于 I/O 密集型任务(网络请求、文件读写等),多线程仍然有效,因为 GIL 会在等待 I/O 时释放;
- 但对于 CPU 密集型任务(数学计算、图像处理、大规模循环等),多线程几乎无法带来性能提升,甚至可能因为线程切换而变慢。
这时,多进程(multiprocessing) 就成了绕过 GIL 的首选方案。每个进程拥有独立的 Python 解释器和独立的内存空间,它们可以真正地并行执行,充分利用多核 CPU 的优势。
multiprocessing 模块快速上手
Python 标准库中的 multiprocessing 模块提供了和 threading 十分相似的 API,让我们可以用几乎相同的方式编写并发程序,却能享受到多核并行的能力。
创建并启动一个进程
最基础的方式是实例化 Process 类,传入一个目标函数。
import multiprocessing
import time
def worker(name):
print(f"进程 {name} 开始")
time.sleep(2)
print(f"进程 {name} 结束")
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=('一号',))
p.start() # 启动进程
p.join() # 等待进程结束
print("主进程退出")
要点说明:
- 将进程逻辑放在
if __name__ == '__main__':保护中,是 Windows 平台下的必须要求(避免递归创建子进程)。 start()开始进程活动,join()会阻塞主进程直到子进程结束。
通过继承 Process 类创建进程
你也可以定义一个继承自 Process 的类,并重写 run() 方法:
class MyProcess(multiprocessing.Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f"自定义进程 {self.name} 正在运行")
if __name__ == '__main__':
p = MyProcess('TaskA')
p.start()
p.join()
两种方式本质相同,选择适合自己代码风格的那一种即可。
使用进程池(Pool)批量管理任务
当需要创建大量进程时,频繁地创建和销毁进程会带来不小的开销。Pool(进程池)可以预先创建固定数量的进程,并把任务分配给他们执行,大大提高效率。
基本用法:apply, apply_async, map
import multiprocessing
import time
def square(x):
time.sleep(0.5) # 模拟耗时计算
return x * x
if __name__ == '__main__':
# 创建一个包含4个进程的池
with multiprocessing.Pool(processes=4) as pool:
# 同步调用:一次执行一个任务,会阻塞
result1 = pool.apply(square, (10,))
print(f"apply 结果: {result1}")
# 异步调用:返回 AsyncResult 对象,可以用 get() 获取结果
async_result = pool.apply_async(square, (8,))
print(f"apply_async 结果: {async_result.get()}")
# map 方法:将函数应用于可迭代对象的每个元素,并行工作
numbers = [1, 2, 3, 4, 5]
results = pool.map(square, numbers)
print(f"map 结果: {results}")
# imap 是按顺序返回结果的迭代器(惰性)
# starmap 支持多个参数
使用 with 语句 会自动调用 close() 和 terminate(),保证资源被正确释放。
异步回调
apply_async 还支持回调函数,任务完成后会自动调用。
def callback_func(result):
print(f"回调得到结果: {result}")
pool.apply_async(square, (7,), callback=callback_func)
进程间通信(IPC)
不同进程有独立的内存空间,默认情况下无法直接共享数据。multiprocessing 提供了多种通信方式。
Queue(队列)
Queue 是多进程安全的队列,适用于生产者-消费者模式。
import multiprocessing
def producer(q):
for i in range(5):
q.put(f'数据 {i}')
print(f'生产: 数据 {i}')
q.put(None) # 发送结束信号
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'消费: {item}')
if __name__ == '__main__':
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
Pipe(管道)
管道提供一对连接对象(conn1, conn2),默认是双工的。
import multiprocessing
def sender(conn):
conn.send("来自发送方的消息")
response = conn.recv()
print(f"发送方收到回复: {response}")
def receiver(conn):
msg = conn.recv()
print(f"接收方收到: {msg}")
conn.send("确认收到")
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
共享内存:在多进程间共享数据
对于一些简单的数值或数组,可以使用 Value 和 Array 在共享内存中创建对象,从而在多个进程间直接访问和修改。
import multiprocessing
def increment(n, a):
n.value += 1
for i in range(len(a)):
a[i] += 1
if __name__ == '__main__':
num = multiprocessing.Value('i', 0) # 'i' 表示整数
arr = multiprocessing.Array('i', [0, 1, 2]) # 整型数组
p1 = multiprocessing.Process(target=increment, args=(num, arr))
p2 = multiprocessing.Process(target=increment, args=(num, arr))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"num = {num.value}, arr = {arr[:]}")
然而,直接使用共享内存修改数据 不是原子操作,必须使用锁来避免竞争条件。
同步机制:锁(Lock)
multiprocessing.Lock 可以确保同一时刻只有一个进程访问临界区。
import multiprocessing
def worker(lock, counter):
for _ in range(1000):
with lock:
counter.value += 1
if __name__ == '__main__':
lock = multiprocessing.Lock()
counter = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=worker, args=(lock, counter))
for _ in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"最终计数: {counter.value}")
此外,与线程模块类似,还有 RLock, Semaphore, Event, Condition 等同步原语。
实战:绕过 GIL 对比线程与进程
我们来看一个 CPU 密集型的例子:计算斐波那契数列(递归实现,纯 Python 运算)。
import time
import threading
import multiprocessing
def fib(n):
if n <= 1:
return n
return fib(n-1) + fib(n-2)
def run_sequential(n, count):
start = time.time()
for _ in range(count):
fib(n)
return time.time() - start
def run_threads(n, count):
start = time.time()
threads = []
for _ in range(count):
t = threading.Thread(target=fib, args=(n,))
threads.append(t)
t.start()
for t in threads:
t.join()
return time.time() - start
def run_processes(n, count):
start = time.time()
processes = []
for _ in range(count):
p = multiprocessing.Process(target=fib, args=(n,))
processes.append(p)
p.start()
for p in processes:
p.join()
return time.time() - start
if __name__ == '__main__':
N = 35
TASKS = 8
print(f"顺序执行 {TASKS} 次 fib({N}): {run_sequential(N, TASKS):.2f}s")
print(f"多线程 {TASKS} 次 fib({N}): {run_threads(N, TASKS):.2f}s")
print(f"多进程 {TASKS} 次 fib({N}): {run_processes(N, TASKS):.2f}s")
在 8 核机器上的典型输出:
顺序执行 8 次 fib(35): 20.50s
多线程 8 次 fib(35): 20.80s (几乎无提升,甚至更慢)
多进程 8 次 fib(35): 3.30s (约 6.2 倍加速)
结论一目了然:CPU 密集型任务使用多进程能获得接近线性的加速,而多线程则受困于 GIL,无法利用多核。
注意事项与常见问题
-
if __name__ == '__main__'不可省略
在 Windows 和 macOS(使用spawn模式)下,必须将进程启动代码放在该保护中,否则会无限递归创建子进程。 -
谨慎传递大对象
通过参数或队列传递数据时,对象会被序列化(pickle)。大量数据的序列化与反序列化会成为性能瓶颈。可考虑使用共享内存Array或将数据放在文件/数据库中。 -
选择正确的启动方法
Python 提供了fork(Unix)、spawn和forkserver三种启动方式。fork速度快但可能引发死锁或资源泄漏;spawn更安全但启动慢。可以通过multiprocessing.set_start_method('spawn')设置。 -
异常处理
子进程中抛出的异常默认不会影响父进程,需要在子进程内部妥善捕获和处理,或者通过队列传递异常信息。 -
优雅关闭进程
调用terminate()会强制结束进程,可能造成资源未释放。尽量设计任务完成自然退出,或使用Event/Queue发送退出信号。
小结
- GIL 是 CPython 多线程并行的主要障碍,多进程可以完美绕开它。
multiprocessing模块提供了类线程的 API,包括Process,Pool,Queue,Pipe,Value,Array和同步原语。- 对于 I/O 密集型任务,优先使用
asyncio或线程;对于 CPU 密集型任务,毫不犹豫选择多进程。 - 注意跨平台兼容性、数据序列化开销以及进程的优雅启停。
掌握了 multiprocessing,你就可以充分利用服务器的全部 CPU 核心,让 Python 程序在计算密集型场景下焕发强劲的性能。