Python 多进程 multiprocessing:绕过 GIL

FreeGuideOnline 最新 2026-06-16

为什么需要多进程:认识 Python 的全局解释器锁(GIL)

Python 因为其简单易用和丰富的生态,成为数据处理、Web 开发等领域的首选语言。然而,当我们需要进行 CPU 密集型计算时,一个隐藏的限制会浮出水面——全局解释器锁(GIL)

GIL 是 CPython(最主流的 Python 解释器)中的一个互斥锁,它确保同一时刻只有一个线程能够执行 Python 字节码。这意味着:

  • 即使你创建了多个线程,它们也无法真正并行执行 Python 代码;
  • 对于 I/O 密集型任务(网络请求、文件读写等),多线程仍然有效,因为 GIL 会在等待 I/O 时释放;
  • 但对于 CPU 密集型任务(数学计算、图像处理、大规模循环等),多线程几乎无法带来性能提升,甚至可能因为线程切换而变慢。

这时,多进程(multiprocessing) 就成了绕过 GIL 的首选方案。每个进程拥有独立的 Python 解释器和独立的内存空间,它们可以真正地并行执行,充分利用多核 CPU 的优势。

multiprocessing 模块快速上手

Python 标准库中的 multiprocessing 模块提供了和 threading 十分相似的 API,让我们可以用几乎相同的方式编写并发程序,却能享受到多核并行的能力。

创建并启动一个进程

最基础的方式是实例化 Process 类,传入一个目标函数。

import multiprocessing
import time

def worker(name):
    print(f"进程 {name} 开始")
    time.sleep(2)
    print(f"进程 {name} 结束")

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=('一号',))
    p.start()           # 启动进程
    p.join()            # 等待进程结束
    print("主进程退出")

要点说明

  • 将进程逻辑放在 if __name__ == '__main__': 保护中,是 Windows 平台下的必须要求(避免递归创建子进程)。
  • start() 开始进程活动,join() 会阻塞主进程直到子进程结束。

通过继承 Process 类创建进程

你也可以定义一个继承自 Process 的类,并重写 run() 方法:

class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"自定义进程 {self.name} 正在运行")

if __name__ == '__main__':
    p = MyProcess('TaskA')
    p.start()
    p.join()

两种方式本质相同,选择适合自己代码风格的那一种即可。

使用进程池(Pool)批量管理任务

当需要创建大量进程时,频繁地创建和销毁进程会带来不小的开销。Pool(进程池)可以预先创建固定数量的进程,并把任务分配给他们执行,大大提高效率。

基本用法:apply, apply_async, map

import multiprocessing
import time

def square(x):
    time.sleep(0.5)   # 模拟耗时计算
    return x * x

if __name__ == '__main__':
    # 创建一个包含4个进程的池
    with multiprocessing.Pool(processes=4) as pool:
        # 同步调用:一次执行一个任务,会阻塞
        result1 = pool.apply(square, (10,))
        print(f"apply 结果: {result1}")

        # 异步调用:返回 AsyncResult 对象,可以用 get() 获取结果
        async_result = pool.apply_async(square, (8,))
        print(f"apply_async 结果: {async_result.get()}")

        # map 方法:将函数应用于可迭代对象的每个元素,并行工作
        numbers = [1, 2, 3, 4, 5]
        results = pool.map(square, numbers)
        print(f"map 结果: {results}")

        # imap 是按顺序返回结果的迭代器(惰性)
        # starmap 支持多个参数

使用 with 语句 会自动调用 close()terminate(),保证资源被正确释放。

异步回调

apply_async 还支持回调函数,任务完成后会自动调用。

def callback_func(result):
    print(f"回调得到结果: {result}")

pool.apply_async(square, (7,), callback=callback_func)

进程间通信(IPC)

不同进程有独立的内存空间,默认情况下无法直接共享数据。multiprocessing 提供了多种通信方式。

Queue(队列)

Queue 是多进程安全的队列,适用于生产者-消费者模式。

import multiprocessing

def producer(q):
    for i in range(5):
        q.put(f'数据 {i}')
        print(f'生产: 数据 {i}')
    q.put(None)   # 发送结束信号

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f'消费: {item}')

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Pipe(管道)

管道提供一对连接对象(conn1, conn2),默认是双工的。

import multiprocessing

def sender(conn):
    conn.send("来自发送方的消息")
    response = conn.recv()
    print(f"发送方收到回复: {response}")

def receiver(conn):
    msg = conn.recv()
    print(f"接收方收到: {msg}")
    conn.send("确认收到")

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

共享内存:在多进程间共享数据

对于一些简单的数值或数组,可以使用 ValueArray 在共享内存中创建对象,从而在多个进程间直接访问和修改。

import multiprocessing

def increment(n, a):
    n.value += 1
    for i in range(len(a)):
        a[i] += 1

if __name__ == '__main__':
    num = multiprocessing.Value('i', 0)          # 'i' 表示整数
    arr = multiprocessing.Array('i', [0, 1, 2])  # 整型数组

    p1 = multiprocessing.Process(target=increment, args=(num, arr))
    p2 = multiprocessing.Process(target=increment, args=(num, arr))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(f"num = {num.value}, arr = {arr[:]}")

然而,直接使用共享内存修改数据 不是原子操作,必须使用锁来避免竞争条件。

同步机制:锁(Lock)

multiprocessing.Lock 可以确保同一时刻只有一个进程访问临界区。

import multiprocessing

def worker(lock, counter):
    for _ in range(1000):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)
    processes = [
        multiprocessing.Process(target=worker, args=(lock, counter))
        for _ in range(10)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"最终计数: {counter.value}")

此外,与线程模块类似,还有 RLock, Semaphore, Event, Condition 等同步原语。

实战:绕过 GIL 对比线程与进程

我们来看一个 CPU 密集型的例子:计算斐波那契数列(递归实现,纯 Python 运算)。

import time
import threading
import multiprocessing

def fib(n):
    if n <= 1:
        return n
    return fib(n-1) + fib(n-2)

def run_sequential(n, count):
    start = time.time()
    for _ in range(count):
        fib(n)
    return time.time() - start

def run_threads(n, count):
    start = time.time()
    threads = []
    for _ in range(count):
        t = threading.Thread(target=fib, args=(n,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

def run_processes(n, count):
    start = time.time()
    processes = []
    for _ in range(count):
        p = multiprocessing.Process(target=fib, args=(n,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    return time.time() - start

if __name__ == '__main__':
    N = 35
    TASKS = 8
    print(f"顺序执行 {TASKS} 次 fib({N}): {run_sequential(N, TASKS):.2f}s")
    print(f"多线程    {TASKS} 次 fib({N}): {run_threads(N, TASKS):.2f}s")
    print(f"多进程    {TASKS} 次 fib({N}): {run_processes(N, TASKS):.2f}s")

在 8 核机器上的典型输出:

顺序执行 8 次 fib(35): 20.50s
多线程    8 次 fib(35): 20.80s   (几乎无提升,甚至更慢)
多进程    8 次 fib(35): 3.30s    (约 6.2 倍加速)

结论一目了然:CPU 密集型任务使用多进程能获得接近线性的加速,而多线程则受困于 GIL,无法利用多核。

注意事项与常见问题

  1. if __name__ == '__main__' 不可省略
    在 Windows 和 macOS(使用 spawn 模式)下,必须将进程启动代码放在该保护中,否则会无限递归创建子进程。

  2. 谨慎传递大对象
    通过参数或队列传递数据时,对象会被序列化(pickle)。大量数据的序列化与反序列化会成为性能瓶颈。可考虑使用共享内存 Array 或将数据放在文件/数据库中。

  3. 选择正确的启动方法
    Python 提供了 fork(Unix)、spawnforkserver 三种启动方式。fork 速度快但可能引发死锁或资源泄漏;spawn 更安全但启动慢。可以通过 multiprocessing.set_start_method('spawn') 设置。

  4. 异常处理
    子进程中抛出的异常默认不会影响父进程,需要在子进程内部妥善捕获和处理,或者通过队列传递异常信息。

  5. 优雅关闭进程
    调用 terminate() 会强制结束进程,可能造成资源未释放。尽量设计任务完成自然退出,或使用 Event/Queue 发送退出信号。

小结

  • GIL 是 CPython 多线程并行的主要障碍,多进程可以完美绕开它。
  • multiprocessing 模块提供了类线程的 API,包括 Process, Pool, Queue, Pipe, Value, Array 和同步原语。
  • 对于 I/O 密集型任务,优先使用 asyncio 或线程;对于 CPU 密集型任务,毫不犹豫选择多进程。
  • 注意跨平台兼容性、数据序列化开销以及进程的优雅启停。

掌握了 multiprocessing,你就可以充分利用服务器的全部 CPU 核心,让 Python 程序在计算密集型场景下焕发强劲的性能。