在 Python
中,多进程通过 multiprocessing
模块实现,它克服了 GIL(全局解释器锁)的限制,可以充分利用多核 CPU,特别适合 CPU 密集型任务。
创建进程
通过 multiprocessing.Process
创建进程。
class Process(
group: None = None,
target: ((...) -> object) | None = None,
name: str | None = None,
args: Iterable[Any] = (),
kwargs: Mapping[str, Any] = {},
*,
daemon: bool | None = None
)
参数说明:
参数 | 类型 | 默认值 | 说明 |
---|---|---|---|
group | NoneType | None | 保留参数,应始终为 None(为未来扩展保留) |
target | callable | None | 目标函数,子进程启动时调用的可调用对象 |
name | str | None | 进程名称,用于标识进程的字符串(自动生成格式:Process-1,2,3…) |
args | tuple | () | 位置参数,传递给目标函数的参数元组 |
kwargs | dict | {} | 关键字参数,传递给目标函数的关键字参数字典 |
daemon | bool | None | 守护标志,设置为 True 时创建守护进程(主进程退出时自动终止) |
使用方法:
def worker(task):
print(f"Worker PID: {os.getpid()} with task: {task}")
# 完成相关任务
p = multiprocessing.Process(target=worker, args=(f"running {i} task",)) # 当参数为一个时,最后要打逗号
def say(name, task):
print(f"say PID: {os.getpid()} with name: {name}, task: {task}")
# 完成相关任务
p = multiprocessing.Process(target=say, args=("p1", "task1"))
进程对象 p
的属性和方法
核心方法:
方法 | 说明 |
---|---|
start() | 启动进程(创建子进程并调用其 run() 方法) |
run() | 实际执行目标函数的方法(通常不需要直接调用) |
join(timeout) | 阻塞当前进程,直到子进程结束(timeout:最长等待秒数,None 表示无限等待) |
terminate() | 强制终止进程(立即终止,不执行清理操作) |
kill() | 同 terminate()(在 Unix 上使用 SIGKILL 信号) |
close() | 关闭进程对象并释放所有资源(只能在未启动或已终止的进程上调用) |
重要属性:
属性 | 类型 | 说明 |
---|---|---|
name | str | 进程名称(可读写) |
daemon | bool | 守护标志(必须在 start() 前设置) |
pid | int | 进程 ID(启动后才有值) |
exitcode | int | 进程退出代码: • None:进程仍在运行 • 0:成功退出 • >0:错误退出 • <0:被信号终止 |
is_alive() | method | 检查进程是否仍在运行(返回布尔值) |
authkey | bytes | 进程的身份验证密钥(用于安全连接) |
sentinel | int | 系统对象的数字句柄(可用于等待进程结束) |
进程间通信
Queue
multiprocessing.Queue
是 Python 多进程编程中用于进程间通信(IPC)的关键组件,它提供了线程安全的 FIFO(先进先出)队列实现,允许多个进程安全地交换数据。
class Queue(
maxsize: int = 0,
*,
ctx: Any = ...
)
参数说明:
参数 | 类型 | 默认值 | 说明 |
---|---|---|---|
maxsize | int | 0 | 队列的最大容量: • 0 或负数:无限大小(直到内存耗尽) • 正整数:指定队列容量 |
ctx | Context | None | 多进程上下文(通常使用默认值) |
Queue 核心方法
数据操作:
方法 | 说明 |
---|---|
q.put(item[, block[, timeout]]) | 将 item 放入队列: • block=True (默认):队列满时阻塞• block=False :队列满时立即引发 queue.Full 异常• timeout :阻塞的最长时间(秒) |
q.get([block[, timeout]]) | 从队列获取并移除一个项目: • block=True (默认):队列空时阻塞• block=False :队列空时立即引发 queue.Empty 异常• timeout :阻塞的最长时间(秒) |
q.put_nowait(item) | 等同于 q.put(item, block=False) |
q.get_nowait() | 等同于 q.get(block=False) |
状态检查:
方法 | 说明 |
---|---|
q.empty() | 检查队列是否为空(不可靠:结果可能被其他进程改变) |
q.full() | 检查队列是否已满(不可靠:结果可能被其他进程改变) |
q.qsize() | 返回队列的近似大小(不可靠:多进程环境下不保证精确) |
队列管理:
方法 | 说明 |
---|---|
q.close() | 关闭队列,不再接受新项目 |
q.join_thread() | 等待后台线程完成工作(队列使用后台线程传输数据) |
q.cancel_join_thread() | 防止 join_thread() 阻塞(进程退出时自动调用) |
使用示例:
生产者-消费者模型
import multiprocessing
import random
import time
def producer(q, name):
for i in range(5):
item = f"{name}-{i}"
time.sleep(random.uniform(1, 2)) # 设置随机延迟
print(f"[producer {name}] Put: {item}")
q.put(item)
def consumer(q, name):
while True:
item = q.get() # 阻塞式获取
if item is None:
print(f"[consumer {name}] received termination signal")
break
time.sleep(random.uniform(0.5, 1.5)) # 模拟处理时间
print(f"[consumer {name}] received item: {item}")
if __name__ == '__main__':
q = multiprocessing.Queue(maxsize=3)
producers = [
multiprocessing.Process(target=producer, args=(q, "P1")),
multiprocessing.Process(target=producer, args=(q, "P2"))
]
consumers = [
multiprocessing.Process(target=consumer, args=(q, "C1")),
multiprocessing.Process(target=consumer, args=(q, "C2"))
]
# 启动生产者进程
for p in producers:
p.start()
# 启动消费者进程
for c in consumers:
c.start()
# 等待生产者完成
for p in producers:
p.join()
# 向每个消费者发送结束信号
for _ in consumers:
q.put(None)
# 等待消费者结束
for c in consumers:
c.join()
print("All processes completed")
Pipe
multiprocessing.Pipe()
是 Python 多进程编程中用于进程间通信(IPC)的核心工具,它创建了一个双向通信通道,允许两个进程高效地交换数据。下面将全面解析其参数和使用方法。
variable) def Pipe(duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]
duplex
True
:创建双向管道(两端均可收发);False
:创建单向管道(前一个只能接收,后一个只能发送)。
Pipe 核心方法
方法:
方法 | 说明 |
---|---|
send(obj) | 发送 Python 对象(自动序列化) |
recv() | 接收 Python 对象(阻塞直到数据到达) |
send_bytes(buffer[, offset[, size]]) | 发送字节数据(避免序列化开销) |
recv_bytes([maxlength]) | 接收字节数据(返回完整字节对象) |
recv_bytes_into(buffer[, offset]) | 接收字节数据到指定缓冲区 |
poll([timeout]) | 检查是否有数据可读(timeout:等待秒数,None 无限等待) |
close() | 关闭连接 |
fileno() | 返回连接使用的文件描述符(用于 select/poll) |
属性:
方法/属性 | 说明 |
---|---|
closed | 只读属性,检查连接是否已关闭 |
writable | 只读属性,检查连接是否可写 |
readable | 只读属性,检查连接是否可读 |
双向通信使用示例:
import multiprocessing
import os
def worker(conn):
print(f"worker pid: {os.getpid()}")
message = conn.recv() # 接收父进程的消息
print(f"workder received: {message}")
conn.send("message from worker") # 向父进程发送消息
conn.close() # 关闭连接
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=worker, args=(child_conn, ))
p.start()
parent_conn.send("message from parent") # 向子进程发送消息
response = parent_conn.recv() # 接收子进程的消息
print(f"parent received response: {response}")
parent_conn.close() # 关闭连接
p.join() # 等待子进程结束
Comments NOTHING