在 Python 中,多进程通过 multiprocessing 模块实现,它克服了 GIL(全局解释器锁)的限制,可以充分利用多核 CPU,特别适合 CPU 密集型任务。
创建进程
通过 multiprocessing.Process 创建进程。
class Process(  
group: None = None,  
target: ((...) -> object) | None = None,  
name: str | None = None,  
args: Iterable[Any] = (),  
kwargs: Mapping[str, Any] = {},  
*,  
daemon: bool | None = None  
)
参数说明:
| 参数 | 类型 | 默认值 | 说明 | 
|---|---|---|---|
group | NoneType | None | 保留参数,应始终为 None(为未来扩展保留) | 
target | callable | None | 目标函数,子进程启动时调用的可调用对象 | 
name | str | None | 进程名称,用于标识进程的字符串(自动生成格式:Process-1,2,3…) | 
args | tuple | () | 位置参数,传递给目标函数的参数元组 | 
kwargs | dict | {} | 关键字参数,传递给目标函数的关键字参数字典 | 
daemon | bool | None | 守护标志,设置为 True 时创建守护进程(主进程退出时自动终止) | 
使用方法:
def worker(task):
    print(f"Worker PID: {os.getpid()} with task: {task}")
    # 完成相关任务
p = multiprocessing.Process(target=worker, args=(f"running {i} task",)) # 当参数为一个时,最后要打逗号
def say(name, task):
    print(f"say PID: {os.getpid()} with name: {name}, task: {task}")
    # 完成相关任务
p = multiprocessing.Process(target=say, args=("p1", "task1"))
进程对象 p 的属性和方法
核心方法:
| 方法 | 说明 | 
|---|---|
start() | 启动进程(创建子进程并调用其 run() 方法) | 
run() | 实际执行目标函数的方法(通常不需要直接调用) | 
join(timeout) | 阻塞当前进程,直到子进程结束(timeout:最长等待秒数,None 表示无限等待) | 
terminate() | 强制终止进程(立即终止,不执行清理操作) | 
kill() | 同 terminate()(在 Unix 上使用 SIGKILL 信号) | 
close() | 关闭进程对象并释放所有资源(只能在未启动或已终止的进程上调用) | 
重要属性:
| 属性 | 类型 | 说明 | 
|---|---|---|
name | str | 进程名称(可读写) | 
daemon | bool | 守护标志(必须在 start() 前设置) | 
pid | int | 进程 ID(启动后才有值) | 
exitcode | int | 进程退出代码: • None:进程仍在运行 • 0:成功退出 • >0:错误退出 • <0:被信号终止  | 
is_alive() | method | 检查进程是否仍在运行(返回布尔值) | 
authkey | bytes | 进程的身份验证密钥(用于安全连接) | 
sentinel | int | 系统对象的数字句柄(可用于等待进程结束) | 
进程间通信
Queue
multiprocessing.Queue 是 Python 多进程编程中用于进程间通信(IPC)的关键组件,它提供了线程安全的 FIFO(先进先出)队列实现,允许多个进程安全地交换数据。
class Queue(  
maxsize: int = 0,  
*,  
ctx: Any = ...  
)
参数说明:
| 参数 | 类型 | 默认值 | 说明 | 
|---|---|---|---|
maxsize | int | 0 | 队列的最大容量: • 0 或负数:无限大小(直到内存耗尽) • 正整数:指定队列容量  | 
ctx | Context | None | 多进程上下文(通常使用默认值) | 
Queue 核心方法
数据操作:
| 方法 | 说明 | 
|---|---|
q.put(item[, block[, timeout]]) | 将 item 放入队列: • block=True(默认):队列满时阻塞• block=False:队列满时立即引发 queue.Full 异常• timeout:阻塞的最长时间(秒) | 
q.get([block[, timeout]]) | 从队列获取并移除一个项目: • block=True(默认):队列空时阻塞• block=False:队列空时立即引发 queue.Empty 异常• timeout:阻塞的最长时间(秒) | 
q.put_nowait(item) | 等同于 q.put(item, block=False) | 
q.get_nowait() | 等同于 q.get(block=False) | 
状态检查:
| 方法 | 说明 | 
|---|---|
q.empty() | 检查队列是否为空(不可靠:结果可能被其他进程改变) | 
q.full() | 检查队列是否已满(不可靠:结果可能被其他进程改变) | 
q.qsize() | 返回队列的近似大小(不可靠:多进程环境下不保证精确) | 
队列管理:
| 方法 | 说明 | 
|---|---|
q.close() | 关闭队列,不再接受新项目 | 
q.join_thread() | 等待后台线程完成工作(队列使用后台线程传输数据) | 
q.cancel_join_thread() | 防止 join_thread() 阻塞(进程退出时自动调用) | 
使用示例:
生产者-消费者模型
import multiprocessing
import random
import time
def producer(q, name):
    for i in range(5):
        item = f"{name}-{i}"
        time.sleep(random.uniform(1, 2)) # 设置随机延迟
        print(f"[producer {name}] Put: {item}")
        q.put(item)
def consumer(q, name):
    while True:
        item = q.get() # 阻塞式获取
        if item is None:
            print(f"[consumer {name}] received termination signal")
            break
        time.sleep(random.uniform(0.5, 1.5)) # 模拟处理时间
        print(f"[consumer {name}] received item: {item}")
if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=3)
    producers = [
        multiprocessing.Process(target=producer, args=(q, "P1")),
        multiprocessing.Process(target=producer, args=(q, "P2"))
    ]
    consumers = [
        multiprocessing.Process(target=consumer, args=(q, "C1")),
        multiprocessing.Process(target=consumer, args=(q, "C2"))
    ]
    # 启动生产者进程
    for p in producers:
        p.start()
    # 启动消费者进程
    for c in consumers:
        c.start()
    # 等待生产者完成
    for p in producers:
        p.join()
    # 向每个消费者发送结束信号
    for _ in consumers:
        q.put(None)
    # 等待消费者结束
    for c in consumers:
        c.join()
    print("All processes completed")
Pipe
multiprocessing.Pipe() 是 Python 多进程编程中用于进程间通信(IPC)的核心工具,它创建了一个双向通信通道,允许两个进程高效地交换数据。下面将全面解析其参数和使用方法。
variable) def Pipe(duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]
duplexTrue:创建双向管道(两端均可收发);False:创建单向管道(前一个只能接收,后一个只能发送)。
Pipe 核心方法
方法:
| 方法 | 说明 | 
|---|---|
send(obj) | 发送 Python 对象(自动序列化) | 
recv() | 接收 Python 对象(阻塞直到数据到达) | 
send_bytes(buffer[, offset[, size]]) | 发送字节数据(避免序列化开销) | 
recv_bytes([maxlength]) | 接收字节数据(返回完整字节对象) | 
recv_bytes_into(buffer[, offset]) | 接收字节数据到指定缓冲区 | 
poll([timeout]) | 检查是否有数据可读(timeout:等待秒数,None 无限等待) | 
close() | 关闭连接 | 
fileno() | 返回连接使用的文件描述符(用于 select/poll) | 
属性:
| 方法/属性 | 说明 | 
|---|---|
closed | 只读属性,检查连接是否已关闭 | 
writable | 只读属性,检查连接是否可写 | 
readable | 只读属性,检查连接是否可读 | 
双向通信使用示例:
import multiprocessing
import os
def worker(conn):
    print(f"worker pid: {os.getpid()}")
    message = conn.recv() # 接收父进程的消息
    print(f"workder received: {message}")
    conn.send("message from worker") # 向父进程发送消息
    conn.close() # 关闭连接
if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn, ))
    p.start()
    parent_conn.send("message from parent") # 向子进程发送消息
    response = parent_conn.recv() # 接收子进程的消息
    print(f"parent received response: {response}")
    parent_conn.close() # 关闭连接
    p.join() # 等待子进程结束
			
                            
Comments NOTHING