multiprocessing

Tecy 发布于 24 天前 66 次阅读


Python 中,多进程通过 multiprocessing 模块实现,它克服了 GIL(全局解释器锁)的限制,可以充分利用多核 CPU,特别适合 CPU 密集型任务。

创建进程

通过 multiprocessing.Process 创建进程。

class Process(  
group: None = None,  
target: ((...) -> object) | None = None,  
name: str | None = None,  
args: Iterable[Any] = (),  
kwargs: Mapping[str, Any] = {},  
*,  
daemon: bool | None = None  
)

参数说明:

参数类型默认值说明
groupNoneTypeNone保留参数,应始终为 None(为未来扩展保留)
targetcallableNone目标函数,子进程启动时调用的可调用对象
namestrNone进程名称,用于标识进程的字符串(自动生成格式:Process-1,2,3…)
argstuple()位置参数,传递给目标函数的参数元组
kwargsdict{}关键字参数,传递给目标函数的关键字参数字典
daemonboolNone守护标志,设置为 True 时创建守护进程(主进程退出时自动终止)

使用方法:

def worker(task):
    print(f"Worker PID: {os.getpid()} with task: {task}")
    # 完成相关任务

p = multiprocessing.Process(target=worker, args=(f"running {i} task",)) # 当参数为一个时,最后要打逗号

def say(name, task):
    print(f"say PID: {os.getpid()} with name: {name}, task: {task}")
    # 完成相关任务
p = multiprocessing.Process(target=say, args=("p1", "task1"))

进程对象 p 的属性和方法

核心方法:

方法说明
start()启动进程(创建子进程并调用其 run() 方法)
run()实际执行目标函数的方法(通常不需要直接调用)
join(timeout)阻塞当前进程,直到子进程结束(timeout:最长等待秒数,None 表示无限等待)
terminate()强制终止进程(立即终止,不执行清理操作)
kill()同 terminate()(在 Unix 上使用 SIGKILL 信号)
close()关闭进程对象并释放所有资源(只能在未启动或已终止的进程上调用)

重要属性:

属性类型说明
namestr进程名称(可读写)
daemonbool守护标志(必须在 start() 前设置)
pidint进程 ID(启动后才有值)
exitcodeint进程退出代码
• None:进程仍在运行
• 0:成功退出
• >0:错误退出
• <0:被信号终止
is_alive()method检查进程是否仍在运行(返回布尔值)
authkeybytes进程的身份验证密钥(用于安全连接)
sentinelint系统对象的数字句柄(可用于等待进程结束)

进程间通信

Queue

multiprocessing.Queue 是 Python 多进程编程中用于进程间通信(IPC)的关键组件,它提供了线程安全的 FIFO(先进先出)队列实现,允许多个进程安全地交换数据。

class Queue(  
maxsize: int = 0,  
*,  
ctx: Any = ...  
)

参数说明:

参数类型默认值说明
maxsizeint0队列的最大容量:
• 0 或负数:无限大小(直到内存耗尽)
• 正整数:指定队列容量
ctxContextNone多进程上下文(通常使用默认值)

Queue 核心方法

数据操作:

方法说明
q.put(item[, block[, timeout]])将 item 放入队列:
• block=True(默认):队列满时阻塞
• block=False:队列满时立即引发 queue.Full 异常
• timeout:阻塞的最长时间(秒)
q.get([block[, timeout]])从队列获取并移除一个项目:
• block=True(默认):队列空时阻塞
• block=False:队列空时立即引发 queue.Empty 异常
• timeout:阻塞的最长时间(秒)
q.put_nowait(item)等同于 q.put(item, block=False)
q.get_nowait()等同于 q.get(block=False)

状态检查:

方法说明
q.empty()检查队列是否为空(不可靠:结果可能被其他进程改变)
q.full()检查队列是否已满(不可靠:结果可能被其他进程改变)
q.qsize()返回队列的近似大小(不可靠:多进程环境下不保证精确)

队列管理:

方法说明
q.close()关闭队列,不再接受新项目
q.join_thread()等待后台线程完成工作(队列使用后台线程传输数据)
q.cancel_join_thread()防止 join_thread() 阻塞(进程退出时自动调用)

使用示例:
生产者-消费者模型

import multiprocessing
import random
import time

def producer(q, name):
    for i in range(5):
        item = f"{name}-{i}"
        time.sleep(random.uniform(1, 2)) # 设置随机延迟
        print(f"[producer {name}] Put: {item}")
        q.put(item)


def consumer(q, name):
    while True:
        item = q.get() # 阻塞式获取
        if item is None:
            print(f"[consumer {name}] received termination signal")
            break

        time.sleep(random.uniform(0.5, 1.5)) # 模拟处理时间
        print(f"[consumer {name}] received item: {item}")

if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=3)

    producers = [
        multiprocessing.Process(target=producer, args=(q, "P1")),
        multiprocessing.Process(target=producer, args=(q, "P2"))
    ]

    consumers = [
        multiprocessing.Process(target=consumer, args=(q, "C1")),
        multiprocessing.Process(target=consumer, args=(q, "C2"))
    ]

    # 启动生产者进程
    for p in producers:
        p.start()

    # 启动消费者进程
    for c in consumers:
        c.start()

    # 等待生产者完成
    for p in producers:
        p.join()

    # 向每个消费者发送结束信号
    for _ in consumers:
        q.put(None)

    # 等待消费者结束
    for c in consumers:
        c.join()

    print("All processes completed")

Pipe

multiprocessing.Pipe() 是 Python 多进程编程中用于进程间通信(IPC)的核心工具,它创建了一个双向通信通道,允许两个进程高效地交换数据。下面将全面解析其参数和使用方法。

variable) def Pipe(duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]
  • duplex True:创建双向管道(两端均可收发);False:创建单向管道(前一个只能接收,后一个只能发送)。

Pipe 核心方法

方法:

方法说明
send(obj)发送 Python 对象(自动序列化)
recv()接收 Python 对象(阻塞直到数据到达)
send_bytes(buffer[, offset[, size]])发送字节数据(避免序列化开销)
recv_bytes([maxlength])接收字节数据(返回完整字节对象)
recv_bytes_into(buffer[, offset])接收字节数据到指定缓冲区
poll([timeout])检查是否有数据可读(timeout:等待秒数,None 无限等待)
close()关闭连接
fileno()返回连接使用的文件描述符(用于 select/poll)

属性:

方法/属性说明
closed只读属性,检查连接是否已关闭
writable只读属性,检查连接是否可写
readable只读属性,检查连接是否可读

双向通信使用示例:

import multiprocessing
import os

def worker(conn):
    print(f"worker pid: {os.getpid()}")

    message = conn.recv() # 接收父进程的消息

    print(f"workder received: {message}")

    conn.send("message from worker") # 向父进程发送消息

    conn.close() # 关闭连接


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=worker, args=(child_conn, ))
    p.start()

    parent_conn.send("message from parent") # 向子进程发送消息

    response = parent_conn.recv() # 接收子进程的消息

    print(f"parent received response: {response}")

    parent_conn.close() # 关闭连接

    p.join() # 等待子进程结束