Python并发编程之进程

发表于:2018-4-02 10:46

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:曹艳飞    来源:博客园

  信号量
  事件:
  用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
  事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
  clear:将“Flag”设置为False
  set:将“Flag”设置为True
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author: caoyf
from multiprocessing import Event,Process
import random
import time
def cars(a,i):
if not a.is_set():
print('car%s在等待'%i)
a.wait()
print('\033[31mcar%s通过\033[0m' % i)
def f(a):
while True:
if a.is_set():
a.clear()
print('\033[31m红灯亮了\033[0m')
else:
a.set()
print('\033[32m绿灯亮了\033[0m')
time.sleep(2)
if __name__ == '__main__':
a = Event()
p = Process(target=f,args=(a,))
p.start()
for i in range(20):
car = Process(target=cars,args=(a,i))
car.start()
time.sleep(random.random())
  事件/红绿灯实例
  四、进程间通信---队列和管道
  队列Queue:适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息。
  通过队列实现了 主进程与子进程的通信 子进程与子进程之间的通信
  q=Queue(10)     #实例化一个对象,允许队列对多10个元素
  q.put()         #放入队列
  q.get()         #从队列中取出
  假设现在有一个队伍,队伍里最多只能站5个人,但是有15个人想要进去
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author: caoyf
from multiprocessing import Process
from multiprocessing import Queue
def getin(q):     #进入队伍的子进程
for i in range(15):
q.put(i)
# print(q)
def getout(q):    #离开队伍的子进程
for i in range(6):
print(q.get())
if __name__=='__main__':
q=Queue(5)      #队伍内最多可以容纳的人数
p=Process(target=getin,args=(q,))     #进入队伍的进程
p.start()
p2=Process(target=getout,args=(q,))   #离开队伍的进程
p2.start()
  队列实例
  管道(Pipes)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#Author: caoyf
from multiprocessing import Process,Pipe,Manager,Lock
import time
import  random
# 管道 进程之间创建的一条管道,默认是全双工模式,两头都可以进和出,
# 注意 必须在产生Process对象之前产生管道
# 如果在Pipe括号里面填写False后就变成了单双工,
# 左边的只能收,右边的只能发,recv(接收),send(发送)
#如果没有消息可以接收,recv会一直阻塞,如果连接的另外一段关闭后,
#recv会抛出EOFError错误
# close 关闭连接
#下面的实例是在Pipe的括号里填写和不填写False的区别
# from multiprocessing import Process,Pipe
# def func(pro):
#     pro.send('hello')
#     pro.close()
#
# if __name__=='__main__':
#     con,pro =  Pipe(False)
#     p = Process(target=func,args=(pro,))
#     p.start()
#     print(con.recv())
#     p.join()
# 模拟recv阻塞情况
# def func(con,pro):
#     con.close()
#     while 1:
#         try:
#             print(pro.recv())
#         except EOFError:
#             pro.close()
#             break
#
#
# if __name__=='__main__':
#     con,pro =  Pipe()
#     p = Process(target=func,args=(con,pro,))
#     p.start()
#     pro.close()
#     con.send('aaaaa')
#     con.close()
#     p.join()
# 利用管道实现生产者和消费者
# def sc(con,pro,name,food):
#     con.close()
#     for i in range(5):
#         time.sleep(random.random())
#         f = '%s生产了%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
# def xf(con,pro,name):
#     pro.close()
#     while 1:
#         try:
#             baozi = con.recv()
#             print('%s消费了%s'%(name,baozi))
#         except EOFError:
#             break
# if __name__=='__main__':
#     con,pro = Pipe()
#     p1 = Process(target=sc,args=(con,pro,'caoyf','包子'))
#     c1 = Process(target=xf,args=(con,pro,'zhoaf'))
#     p1.start()
#     c1.start()
#     con.close()
#     pro.close()
#     p1.join()
  管道
  数据共享:
  队列和管道只是实现了数据的传递,还没有实现数据的共享,如实现数据共享,就要用到Managers( 注:进程间通信应该尽量避免使用共享数据的方式 )
from multiprocessing import Process,Manager
import os
def f(dict1,list1):
dict1[os.getpid()] = os.getpid()            # 往字典里放当前PID
list1.append(os.getpid())                   # 往列表里放当前PID
print(list1)
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()                       #生成一个字典,可在多个进程间共享和传递
l = manager.list(range(5))               #生成一个列表,可在多个进程间共享和传递
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)                     # 存进程列表
for res in p_list:
res.join()
print('\n%s' %d)                                        #若要保证数据安全,需要加锁lock=Lock()
  进程池
  对于需要使用几个甚至十几个进程时,我们使用Process还是比较方便的,但是如果要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即现在要讲的进程池,能够将众多进程放在一起,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程
  Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去
  apply_async:异步,串行
  apply:同步,并行
  close():关闭pool,不能再添加新的任务
import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
i += 1
if __name__ == '__main__':
p = Pool(5)          # 创建了5个进程
start = time.time()
p.map(func,range(1000))
p.close()                        # 是不允许再向进程池中添加任务
p.join()                        #阻塞等待 执行进程池中的所有任务直到执行结束
print(time.time() - start)
start = time.time()
l = []
for i in range(1000):
p = Process(target=func,args=(i,))  # 创建了一百个进程
p.start()
l.append(p)
[i.join() for i in l]
print(time.time() - start)
  回调函数:
import os
import time
from multiprocessing import Pool
# 参数 概念 回调函数
def func(i):    # 多进程中的io多,分出去一部分
print('子进程%s:%s'%(i,os.getpid()))
return i*'*'
def call(arg):   # 回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值
print('回调 :',os.getpid())
print(arg)
if __name__ == '__main__':
print('主进程',os.getpid())
p = Pool(5)
for i in range(10):
p.apply_async(func,args=(i,),callback=call)  #callback 回调函数 :主进程执行 参数是子进程执行的函数的返回值
p.close()
p.join()

上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理。
22/2<12
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号