进程 (process)
进程是对各种资源管理的集合,包含对各种资源的调用、内存的管理、网络接口的调用 进程要操作 CPU 必须先启动一个线程,启动一个进程的时候会自动创建一个线程,进程里的第一个线程就是主线程 程序执行的实例 有唯一的进程标识符(pid)
multiprossing 模块
启动进程
示例:
import multiprocessing
import time
def process_run(n):
time.sleep(1)
print('process', n)
for i in range(10):
p = multiprocessing.Process(target=process_run, args=(i, ))
p.start()
所有进程都是由父进程启动的
示例:
import multiprocessing
import os
def show_info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id', os.getpid())
print('\n\n')
def f(name):
show_info('function f')
print(name)
if __name__ == '__main__':
show_info('main process line')
p = multiprocessing.Process(target=f, args=('children process', ))
p.start()
进程间通信
线程间共享内存空间,进程间只能通过其他方法进行通信
Queue
注意这个 Queue 不同于 queue.Queue Queue type using a pipe, buffer and thread 两个进程的 Queue 并不是同一个,而是将数据 pickle 后传给另一个进程的 Queue 用于父进程与子进程之间的通信或同一父进程的子进程之间通信
示例:
from multiprocessing import Process, Queue
def p_put(*args):
q.put(args)
print('Has put %s' % args)
def p_get(*args):
print('%s wait to get...' % args)
print(q.get())
print('%s got it' % args)
q = Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()
输出结果: Has put p1 p2 wait to get... ('p1',) p2 got it
换成 queue 示例:
from multiprocessing import Process
import queue
def p_put(*args):
q.put(args)
print('Has put %s' % args)
def p_get(*args):
print('%s wait to get...' % args)
print(q.get())
print('%s got it' % args)
q = queue.Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()
输出结果: Has put p1 p2 wait to get...
由于父进程启动子进程时是复制一份,所以每个子进程里也有一个空的队列,但是这些队列数据独立,所以 get
时会阻塞
Pipe
Pipe(管道) 是通过 socket 进行进程间通信的 所以步骤与建立 socket 连接相似: 建立连接、发送/接收数据(一端发送另一端不接受就会阻塞)、关闭连接 示例:
from multiprocessing import Pipe, Process
def f(conn):
conn.send('send by child')
print('child recv:', conn.recv())
conn.close()
parent_conn, child_conn = Pipe() # 获得 Pipe 连接的两端
p = Process(target=f, args=(child_conn, ))
p.start()
print('parent recv:', parent_conn.recv())
parent_conn.send('send by parent')
p.join()
输出结果: parent recv: send by child child recv: send by parent
进程间数据共享
Manager
Manager 实现的是进程间共享数据 支持的可共享数据类型:
list
dict
Value
Array
Namespace
Queue queue.Queue
JoinableQueue queue.Queue
Event threading.Event
Lock threading.Lock
RLock threading.RLock
Semaphore threading.Semaphore
BoundedSemaphore threading.BoundedSemaphore
Condition threading.Condition
Barrier threading.Barrier
Pool pool.Pool
示例:
from multiprocessing import Manager, Process
import os
def func():
m_dict['key'] = 'value'
m_list.append(os.getpid())
manager = Manager()
m_dict = manager.dict()
m_list = manager.list()
p_list = []
for i in range(10):
p = Process(target=func)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(m_list)
print(m_dict)
进程锁
打印时可能会出错,加锁可以避免 示例:
from multiprocessing import Lock, Process
def foo(n, l):
l.acquire()
print('hello world', n)
l.release()
lock = Lock()
for i in range(100):
process = Process(target=foo, args=(i, lock))
process.start()
进程池 (pool)
同一时间最多有几个进程在 CPU 上运行 示例:
from multiprocessing import Pool
import time
import os
def foo(n):
time.sleep(1)
print('In process', n, os.getpid())
return n
def bar(*args):
print('>>done: ', args, os.getpid())
pool = Pool(processes=3)
print('主进程: ', os.getpid())
for i in range(10):
# pool.apply(func=foo, args=(i, ))
pool.apply_async(func=foo, args=(i, ), callback=bar)
print('end')
pool.close()
pool.join()
从程序运行过程中可以看出:同一时间最多只有3个进程在运行,类似于线程中的信号量 主进程在执行 callback 函数
注意
1. pool.apply(func=foo, args=(i, ))
是串行执行 pool.apply_async(func=foo, args=(i, ), callback=bar)
是并行执行 2. callback 函数会以 target 函数返回结果为参数,在 target 函数执行结束之后执行 callback 函数是主进程调用的 3. 如果不执行 join,程序会在主进程执行完成之后直接结束,不会等待子进程执行完成 Pool.join()
必须在 Pool.close()
之后执行,否则会报错:ValueError: Pool is still running