请稍候,加载中....

multiprocessing模块

multiprocessing模块主要提供Process、Queue、Pipe、Pool等对象支持

Process-多进程

from multiprocessing import Process

创建Process实例

p = Process(
    ['group=None', 'target=None', 'name=None', 'args=()', 'kwargs={}', '*', 'daemon=None'],
)

Process实例参数说明

参数名称 说明
group 未使用
target 进程调用对象
args 传递给target的参数组成的元组类型
kwargs 传递给target的关键字参数
name 是为进程指定描述性名称
daemon 是否后台进程,后台进程在父进程终止后也会终止,后台进程禁止创建子进程

Process实例属性

属性名 说明
authkey 进程的身份验证键。默认由os.urandom生成。用于提供底层进程间通信安全
daemon 指示进程是否是后台进程
exitcode 进程退出状态码
name 进程名称
pid 进程ID

Process实例方法

方法名称 说明
is_alive() 进程是否仍在运行
join() 等待进程终止
run() 进程启动运行方法,默认会调用target。通过子类重载可以重写该方法
start() 启动进程
terminate() 强制终止进程,并且不进行任何清理动作。如果该进程创建了子进程,这些子进程都会变为僵尸进程。

进程间队列通信Queue对象

导入Queue

from multiprocessing import Queue

实例化Queue

q = Queue([maxsize])

maxsize - 队列最大项数

实例对象q具有的方法

方法名 说明
cancel_join_thread() 不会在进程退出后自动连接后台线程,防止join_thread阻塞
close() 关闭队列,防止队列加入更多数据。该方法不影响消费者端
empty() 队列是否为空,True为空。如果存在进程写入时,该值不可靠
full() 队列是否已满, True为满。同样存在不可靠情形
get(block=True, timeout=0) 如果队列为空,将阻塞到有可用项为止,如果指定timeout,阻塞timeout秒后抛出Empty异常
get_nowait() 相当于get(False)
join_thread() 连接队列的后台线程。此方法用于在close()之后,等待所有队列项被消耗
put( item, block=True,timeout=0) 向对列中添加项,如果对列已满,则阻塞到队列有空间可用为止。如果block设置为False,则会抛出Full异常,如果指定了timeout,则会等待timeout秒,然后抛出Full错误
put_nowait() 相当于put(item, False)
qsize() 返回目前队列中的数量。同样,结果不可靠,该函数在某些系统不可用,比如windows

 

JoinableQueue队列对象

该队列允许消费者通知项的生产者,项已被处理

该对象除了拥有Queue对象方法以外,还有以下方法

task_done() 消费者发出信号,表示该项已处理
join() 生产者使用此方法阻塞,直到队列中所有项被处理(每个项都被task_done)

 

Pipe管道对象

管道是另一种在进程之间传递数据的途径

实例化Pipe对象

p1, p2 = Pipe(duplex=True)

实例后,会返回两个multiprocessing.connection.Connection对象,代表管道两端的连接对象,默认情况下,p1,p2都可以接受与发送数据,如果把duplex设为False,那么p1只能接受,p2只能发送

Connection对象方法

方法名称 说明
close() 关闭连接
fileno() 连接的整数文件描述符
poll(timeout=0) 如果连接上有数据可用,返回True,timeout指定等待时间,默认立即返回结果,如果timeout=None,将无限期等待直到有数据可用
recv() 接受send()方法返回的对象
recv_bytes([maxlength]) 接受send_bytes()方法发送的完整的字节数据,maxlength接受最大字节数。如果进入的消息超过最大最大消息值,将引发IOError,并且无法读取。如果连接一段已经关闭,不存在任何数据,引发EOFError错误
recv_bytes_into(buffer[,offset]) 接受一条消息,并保存到buffer对象中,该对象支持可写入缓冲区接口。offset指定缓冲区放置消息处的位移。返回值为收到的字节数。如果消息长度超出缓冲区空间,引发BufferTooShort异常
send(obj) 通过连接发送对象,obj可以是与序列化兼容的任意对象
send_bytes(buffer, offset, size) 通过连接发送字节到缓冲区
buffer - 支持缓冲区的对象
offset - 缓冲区中字节偏移量
size - 要发送的字节数

 

进程池对象Pool

进程池可以方便的管理进程

from multiprocessing import Pool

创建进程池对象实例

P = Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
参数名称 说明
processes 创建的进程数,如果此参数省略,使用cpu_count()值
initializer 每个工作进程启动要调用的对象
initargs 传递给initializer的参数,元组类型
maxtasksperchild 每个子进程完成最多任务数,以便释放资源。如果为None,则与Pool寿命一致

实例对象P支持的方法

方法名 说明
apply(func[,args[,kwargs]]) 在一个池中执行func,args, kwargs为传递给func的参数,此方法只会在一个进程中执行func,在返回结果前会阻塞
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) 异步执行func,并返回一个AsyncResult类型结果对象,当结果对象可用时,如果指定了callback,那么会调用callback回调函数,该函数不应当存在阻塞
map(funciterable[, chunksize]) 将可调用对象应用给iterable中所有的项,以列表形式返回结果,通过将iterable分割成多个块同时执行, chunksize指定每块中的项数
map_async(funciterable[, chunksize[, callback[, error_callback]]]) 同map,只是异步返回结果
imap(funciterable[, chunksize]) 同map,只是返回一个迭代器而不是列表
imap_unordered(funciterable[, chunksize]) 同imap,只是数据返回是无序的
starmap(funciterable[, chunksize]) 同map,只是iterable的每一项必须是可以解包的项
starmap_async(funciterable[, chunksize[, callback[, error_callback]]]) 同startmap,异步执行
close() 关闭进程池,如果存在挂起操作,将在进程终止前结束
terminate() 立即终止所有进程,不会进行任何清理工作或者结束任何挂起工作
join() 等待所有工作进程退出。在close()或者terminate()之后调用

 

AsyncResult类

apply_async与map_async都会返回AsyncResult实例

AsyncResult实例对象方法

方法名 说明
get([timeout]) 返回结果,等待结果到达,timeout指定在timeout秒内,如果没有等到结果,引发TimeoutError异常。如果远程操作中引发了异常,将在调用此方法时再次引发
ready() 如果调用完成,返回True
successful() 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,将引发AssertionError异常
wait([timeout]) 等待结果可用,如果指定了timeout,将等待timeout秒。

 


Python学习手册-