multiprocessing模块主要提供Process、Queue、Pipe、Pool等对象支持
Process-多进程
from multiprocessing import Process
创建Process实例
p = Process(
['group=None', 'target=None', 'name=None', 'args=()', 'kwargs={}', '*', 'daemon=None'],
)
Process实例参数说明
参数名称 | 说明 |
group | 未使用 |
target | 进程调用对象 |
args | 传递给target的参数组成的元组类型 |
kwargs | 传递给target的关键字参数 |
name | 是为进程指定描述性名称 |
daemon | 是否后台进程,后台进程在父进程终止后也会终止,后台进程禁止创建子进程 |
Process实例属性
属性名 | 说明 |
authkey | 进程的身份验证键。默认由os.urandom生成。用于提供底层进程间通信安全 |
daemon | 指示进程是否是后台进程 |
exitcode | 进程退出状态码 |
name | 进程名称 |
pid | 进程ID |
Process实例方法
方法名称 | 说明 |
is_alive() | 进程是否仍在运行 |
join() | 等待进程终止 |
run() | 进程启动运行方法,默认会调用target。通过子类重载可以重写该方法 |
start() | 启动进程 |
terminate() | 强制终止进程,并且不进行任何清理动作。如果该进程创建了子进程,这些子进程都会变为僵尸进程。 |
进程间队列通信Queue对象
导入Queue
from multiprocessing import Queue
实例化Queue
q = Queue([maxsize])
maxsize - 队列最大项数
实例对象q具有的方法
方法名 | 说明 |
cancel_join_thread() | 不会在进程退出后自动连接后台线程,防止join_thread阻塞 |
close() | 关闭队列,防止队列加入更多数据。该方法不影响消费者端 |
empty() | 队列是否为空,True为空。如果存在进程写入时,该值不可靠 |
full() | 队列是否已满, True为满。同样存在不可靠情形 |
get(block=True, timeout=0) | 如果队列为空,将阻塞到有可用项为止,如果指定timeout,阻塞timeout秒后抛出Empty异常 |
get_nowait() | 相当于get(False) |
join_thread() | 连接队列的后台线程。此方法用于在close()之后,等待所有队列项被消耗 |
put( item, block=True,timeout=0) | 向对列中添加项,如果对列已满,则阻塞到队列有空间可用为止。如果block设置为False,则会抛出Full异常,如果指定了timeout,则会等待timeout秒,然后抛出Full错误 |
put_nowait() | 相当于put(item, False) |
qsize() | 返回目前队列中的数量。同样,结果不可靠,该函数在某些系统不可用,比如windows |
JoinableQueue队列对象
该队列允许消费者通知项的生产者,项已被处理
该对象除了拥有Queue对象方法以外,还有以下方法
task_done() | 消费者发出信号,表示该项已处理 |
join() | 生产者使用此方法阻塞,直到队列中所有项被处理(每个项都被task_done) |
Pipe管道对象
管道是另一种在进程之间传递数据的途径
实例化Pipe对象
p1, p2 = Pipe(duplex=True)
实例后,会返回两个multiprocessing.connection.Connection
对象,代表管道两端的连接对象,默认情况下,p1,p2都可以接受与发送数据,如果把duplex设为False,那么p1只能接受,p2只能发送
Connection对象方法
方法名称 | 说明 |
close() | 关闭连接 |
fileno() | 连接的整数文件描述符 |
poll(timeout=0) | 如果连接上有数据可用,返回True,timeout指定等待时间,默认立即返回结果,如果timeout=None,将无限期等待直到有数据可用 |
recv() | 接受send()方法返回的对象 |
recv_bytes([maxlength]) | 接受send_bytes()方法发送的完整的字节数据,maxlength接受最大字节数。如果进入的消息超过最大最大消息值,将引发IOError,并且无法读取。如果连接一段已经关闭,不存在任何数据,引发EOFError错误 |
recv_bytes_into(buffer[,offset]) | 接受一条消息,并保存到buffer对象中,该对象支持可写入缓冲区接口。offset指定缓冲区放置消息处的位移。返回值为收到的字节数。如果消息长度超出缓冲区空间,引发BufferTooShort异常 |
send(obj) | 通过连接发送对象,obj可以是与序列化兼容的任意对象 |
send_bytes(buffer, offset, size) | 通过连接发送字节到缓冲区 buffer - 支持缓冲区的对象 offset - 缓冲区中字节偏移量 size - 要发送的字节数 |
进程池对象Pool
进程池可以方便的管理进程
from multiprocessing import Pool
创建进程池对象实例
P = Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
参数名称 | 说明 |
processes | 创建的进程数,如果此参数省略,使用cpu_count()值 |
initializer | 每个工作进程启动要调用的对象 |
initargs | 传递给initializer的参数,元组类型 |
maxtasksperchild | 每个子进程完成最多任务数,以便释放资源。如果为None,则与Pool寿命一致 |
实例对象P支持的方法
方法名 | 说明 |
apply(func[,args[,kwargs]]) | 在一个池中执行func,args, kwargs为传递给func的参数,此方法只会在一个进程中执行func,在返回结果前会阻塞 |
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) | 异步执行func,并返回一个AsyncResult类型结果对象,当结果对象可用时,如果指定了callback,那么会调用callback回调函数,该函数不应当存在阻塞 |
map (func, iterable[, chunksize]) |
将可调用对象应用给iterable中所有的项,以列表形式返回结果,通过将iterable分割成多个块同时执行, chunksize指定每块中的项数 |
map_async (func, iterable[, chunksize[, callback[, error_callback]]]) |
同map,只是异步返回结果 |
imap (func, iterable[, chunksize]) |
同map,只是返回一个迭代器而不是列表 |
imap_unordered (func, iterable[, chunksize]) |
同imap,只是数据返回是无序的 |
starmap (func, iterable[, chunksize]) |
同map,只是iterable的每一项必须是可以解包的项 |
starmap_async (func, iterable[, chunksize[, callback[, error_callback]]]) |
同startmap,异步执行 |
close() | 关闭进程池,如果存在挂起操作,将在进程终止前结束 |
terminate () |
立即终止所有进程,不会进行任何清理工作或者结束任何挂起工作 |
join() | 等待所有工作进程退出。在close()或者terminate ()之后调用 |
AsyncResult类
apply_async与map_async都会返回AsyncResult实例
AsyncResult实例对象方法
方法名 | 说明 |
get([timeout]) | 返回结果,等待结果到达,timeout指定在timeout秒内,如果没有等到结果,引发TimeoutError异常。如果远程操作中引发了异常,将在调用此方法时再次引发 |
ready() | 如果调用完成,返回True |
successful() | 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,将引发AssertionError异常 |
wait([timeout]) | 等待结果可用,如果指定了timeout,将等待timeout秒。 |
讨论区