信号量
事件:
用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True
#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf from multiprocessing import Event,Process import random import time def cars(a,i): if not a.is_set(): print('car%s在等待'%i) a.wait() print('\033[31mcar%s通过\033[0m' % i) def f(a): while True: if a.is_set(): a.clear() print('\033[31m红灯亮了\033[0m') else: a.set() print('\033[32m绿灯亮了\033[0m') time.sleep(2) if __name__ == '__main__': a = Event() p = Process(target=f,args=(a,)) p.start() for i in range(20): car = Process(target=cars,args=(a,i)) car.start() time.sleep(random.random()) |
事件/红绿灯实例
四、进程间通信---队列和管道
队列Queue:适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息。
通过队列实现了 主进程与子进程的通信 子进程与子进程之间的通信
q=Queue(10) #实例化一个对象,允许队列对多10个元素
q.put() #放入队列
q.get() #从队列中取出
假设现在有一个队伍,队伍里最多只能站5个人,但是有15个人想要进去
#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf from multiprocessing import Process from multiprocessing import Queue def getin(q): #进入队伍的子进程 for i in range(15): q.put(i) # print(q) def getout(q): #离开队伍的子进程 for i in range(6): print(q.get()) if __name__=='__main__': q=Queue(5) #队伍内最多可以容纳的人数 p=Process(target=getin,args=(q,)) #进入队伍的进程 p.start() p2=Process(target=getout,args=(q,)) #离开队伍的进程 p2.start() |
队列实例
管道(Pipes)
#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: caoyf from multiprocessing import Process,Pipe,Manager,Lock import time import random # 管道 进程之间创建的一条管道,默认是全双工模式,两头都可以进和出, # 注意 必须在产生Process对象之前产生管道 # 如果在Pipe括号里面填写False后就变成了单双工, # 左边的只能收,右边的只能发,recv(接收),send(发送) #如果没有消息可以接收,recv会一直阻塞,如果连接的另外一段关闭后, #recv会抛出EOFError错误 # close 关闭连接 #下面的实例是在Pipe的括号里填写和不填写False的区别 # from multiprocessing import Process,Pipe # def func(pro): # pro.send('hello') # pro.close() # # if __name__=='__main__': # con,pro = Pipe(False) # p = Process(target=func,args=(pro,)) # p.start() # print(con.recv()) # p.join() # 模拟recv阻塞情况 # def func(con,pro): # con.close() # while 1: # try: # print(pro.recv()) # except EOFError: # pro.close() # break # # # if __name__=='__main__': # con,pro = Pipe() # p = Process(target=func,args=(con,pro,)) # p.start() # pro.close() # con.send('aaaaa') # con.close() # p.join() # 利用管道实现生产者和消费者 # def sc(con,pro,name,food): # con.close() # for i in range(5): # time.sleep(random.random()) # f = '%s生产了%s%s'%(name,food,i) # print(f) # pro.send(f) # def xf(con,pro,name): # pro.close() # while 1: # try: # baozi = con.recv() # print('%s消费了%s'%(name,baozi)) # except EOFError: # break # if __name__=='__main__': # con,pro = Pipe() # p1 = Process(target=sc,args=(con,pro,'caoyf','包子')) # c1 = Process(target=xf,args=(con,pro,'zhoaf')) # p1.start() # c1.start() # con.close() # pro.close() # p1.join() |
管道
数据共享:
队列和管道只是实现了数据的传递,还没有实现数据的共享,如实现数据共享,就要用到Managers( 注:进程间通信应该尽量避免使用共享数据的方式 )
from multiprocessing import Process,Manager import os def f(dict1,list1): dict1[os.getpid()] = os.getpid() # 往字典里放当前PID list1.append(os.getpid()) # 往列表里放当前PID print(list1) if __name__ == "__main__": with Manager() as manager: d = manager.dict() #生成一个字典,可在多个进程间共享和传递 l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递 p_list = [] for i in range(10): p = Process(target=f,args=(d,l)) p.start() p_list.append(p) # 存进程列表 for res in p_list: res.join() print('\n%s' %d) #若要保证数据安全,需要加锁lock=Lock() |
进程池
对于需要使用几个甚至十几个进程时,我们使用Process还是比较方便的,但是如果要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即现在要讲的进程池,能够将众多进程放在一起,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程
Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去
apply_async:异步,串行
apply:同步,并行
close():关闭pool,不能再添加新的任务
import os import time import random from multiprocessing import Pool from multiprocessing import Process def func(i): i += 1 if __name__ == '__main__': p = Pool(5) # 创建了5个进程 start = time.time() p.map(func,range(1000)) p.close() # 是不允许再向进程池中添加任务 p.join() #阻塞等待 执行进程池中的所有任务直到执行结束 print(time.time() - start) start = time.time() l = [] for i in range(1000): p = Process(target=func,args=(i,)) # 创建了一百个进程 p.start() l.append(p) [i.join() for i in l] print(time.time() - start) |
回调函数:
import os import time from multiprocessing import Pool # 参数 概念 回调函数 def func(i): # 多进程中的io多,分出去一部分 print('子进程%s:%s'%(i,os.getpid())) return i*'*' def call(arg): # 回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值 print('回调 :',os.getpid()) print(arg) if __name__ == '__main__': print('主进程',os.getpid()) p = Pool(5) for i in range(10): p.apply_async(func,args=(i,),callback=call) #callback 回调函数 :主进程执行 参数是子进程执行的函数的返回值 p.close() p.join() |
上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理。