Python异步IO编程的进程/线程通信实现

发表于:2023-8-09 09:43

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:你的老师父    来源:今日头条

  一、 Python 中线程间通信的实现方式
  共享变量
  共享变量是多个线程可以共同访问的变量。在Python中,可以使用threading模块中的Lock对象来确保线程安全,避免多个线程同时访问同一个变量而导致的数据竞争问题。
  下面是一个使用共享变量进行线程间通信的示例代码:
  import threading
  # 共享变量
  count = 0
  lock = threading.Lock()
  # 线程函数
  def increment():
      global count
      for i in range(1000000):
          lock.acquire()
          count += 1
          lock.release()
  # 创建线程
  t1 = threading.Thread(target=increment)
  t2 = threading.Thread(target=increment)
  # 启动线程
  t1.start()
  t2.start()
  # 等待线程结束
  t1.join()
  t2.join()
  # 输出结果
  print("count = ", count)
  在上面的代码中,我们创建了两个线程,它们都会执行increment函数,该函数会将count变量增加1000000次。由于多个线程可能同时访问count变量,因此我们需要使用Lock对象来确保线程安全。每当一个线程需要访问count变量时,它必须先获取锁,然后执行相应的操作,最后释放锁,以便其他线程可以继续访问count变量。
  事件(Event)
  事件是一种线程间通信机制,它可以用于线程之间的通知和等待。一个线程可以设置事件,另外一个线程可以等待该事件的触发。
  在Python中,可以使用threading模块中的Event对象来实现事件。Event对象有两个方法:set和wait。当一个线程调用set方法时,它会将事件设置为已触发状态,所有等待该事件的线程都会被唤醒;当一个线程调用wait方法时,如果事件已经被设置为已触发状态,它会立即返回;否则,它会阻塞等待事件的触发。
  下面是一个使用事件进行线程间通信的示例代码:
  import threading
  # 事件对象
  event = threading.Event()
  # 线程函数1
  def wait_event():
      print("waiting for event...")
      event.wait()
      print("event has been set.")
  # 线程函数2
  def set_event():
      print("setting event...")
      event.set()
  # 创建线程
  t1 = threading.Thread(target=wait_event)
  t2 = threading.Thread(target=set_event)
  # 启动线程
  t1.start()
  t2.start()
  # 等待线程结束
  t1.join()
  t2.join()
  在上面的代码中,我们创建了两个线程,一个线程会等待事件的触发,另一个线程会设置事件。当set_event函数被调用时,它会将事件设置为已触发状态,然后wait_event函数会被唤醒,输出"event has been set."。在这个示例中,我们没有使用Lock对象来确保线程安全,因为事件对象内部已经使用了锁来实现线程安全。
  queue 模块中的队列
  queue 模块中的队列是一种先进先出(FIFO)的数据结构,用于实现多个线程之间的通信。在 Python 中,可以使用 queue 模块中的 Queue 类来创建队列。
  queue 模块中的队列类型分为两种:内存队列和文件队列。
  1、文件队列
  文件队列是一种使用文件作为队列的存储方式,可以用于在不同计算机之间传输数据。在 Python 中,可以使用 queue 模块中的 FileQueue 类来创建文件队列。
  下面是一个使用文件队列实现线程间通信的示例:
  import queue
  import threading
  def producer(q):
      for i in range(5):
          q.put(i)
          print(f'Produced {i}')
      q.put(None)
  def consumer(q):
      while True:
          item = q.get()
          if item is None:
              break
          print(f'Consumed {item}')
  if __name__ == '__main__':
      q = queue.FileQueue('queue.txt')
      t1 = threading.Thread(target=producer, args=(q,))
      t2 = threading.Thread(target=consumer, args=(q,))
      t1.start()
      t2.start()
      t1.join()
      t2.join()
  在上述代码中,创建了两个线程 t1 和 t2,t1 向文件队列中写入数据,t2 从文件队列中读取并打印数据。
  2、内存队列
  内存队列是一种使用内存作为队列的存储方式,可以用于在同一台计算机上的进程间通信。在 Python 中,可以使用 queue 模块中的 Queue 类来创建内存队列。
  下面是一个使用内存队列实现线程间通信的示例:
  import queue
  import threading
  def producer(q):
      for i in range(5):
          q.put(i)
          print(f'Produced {i}')
  def consumer(q):
      while True:
          item = q.get()
          if item is None:
              break
          print(f'Consumed {item}')
  if __name__ == '__main__':
      q = queue.Queue()
      t1 = threading.Thread(target=producer, args=(q,))
      t2 = threading.Thread(target=consumer, args=(q,))
      t1.start()
      t2.start()
      t1.join()
      q.put(None)
      t2.join()
  在上述代码中,创建了两个线程 t1 和 t2,t1 向内存队列中写入数据,t2 从内存队列中读取并打印数据。
  二、Python 中进程间通信的实现方式
  在 Python 中,进程间通信可以使用多种方式实现,例如:
  ·管道(Pipe)
  · 队列(Queue)
  · 共享内存(Shared Memory)
  · 套接字(Socket)
  下面将详细介绍这些方式。
  管道的使用及其类型
  管道是一种基于内存的通信机制,用于实现两个进程之间的通信。在 Python 中,可以使用 multiprocessing 模块中的 Pipe 类来创建管道。
  管道类型分为两种:匿名管道和命名管道。
  1、匿名管道
  匿名管道是一种临时的管道,没有名字,只能用于父进程和其创建的子进程之间的通信。匿名管道是双向的,可以同时进行读写操作。
  下面是一个使用匿名管道实现进程间通信的示例:
  import multiprocessing
  def sender(conn):
      conn.send('Hello, receiver!')
      conn.close()
  def receiver(conn):
      msg = conn.recv()
      print(msg)
      conn.close()
  if __name__ == '__main__':
      parent_conn, child_conn = multiprocessing.Pipe()
      p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
      p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
      p1.start()
      p2.start()
      p1.join()
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向 p2 发送消息,p2 接收并打印消息。
  2、命名管道
  命名管道是一种持久的管道,有一个名字,可以用于任意进程之间的通信。在 Python 中,可以使用 os.mkfifo 函数来创建命名管道。
  下面是一个使用命名管道实现进程间通信的示例:
  import os
  fifo_path = 'fifo_test'
  def sender():
      with open(fifo_path, 'w') as f:
          f.write('Hello, receiver!')
  def receiver():
      with open(fifo_path, 'r') as f:
          msg = f.read()
          print(msg)
  if __name__ == '__main__':
      if not os.path.exists(fifo_path):
          os.mkfifo(fifo_path)
      p1 = multiprocessing.Process(target=sender)
      p2 = multiprocessing.Process(target=receiver)
      p1.start()
      p2.start()
      p1.join()
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向命名管道中写入消息,p2 从命名管道中读取并打印消息。
  multiprocessing 模块中队列的使用及其类型
  multiprocessing 模块中的队列是一种多进程通信机制,可以用于实现多个进程之间的数据传输。在 Python 中,可以使用 multiprocessing 模块中的 Queue 类来创建队列。
  multiprocessing 模块中的队列类型分为两种:普通队列和优先级队列。
  普通队列
  普通队列是一种先进先出(FIFO)的队列,可以用于在同一台计算机上的进程间通信。在 Python 中,可以使用 multiprocessing 模块中的 Queue 类来创建普通队列。
  下面是一个使用普通队列实现进程间通信的示例:
  import multiprocessing
  def producer(q):
      for i in range(5):
          q.put(i)
          print(f'Produced {i}')
  def consumer(q):
      while True:
          item = q.get()
          if item is None:
              break
          print(f'Consumed {item}')
  if __name__ == '__main__':
      q = multiprocessing.Queue()
      p1 = multiprocessing.Process(target=producer, args=(q,))
      p2 = multiprocessing.Process(target=consumer, args=(q,))
      p1.start()
      p2.start()
      p1.join()
      q.put(None)
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向普通队列中写入数据,p2 从普通队列中读取并打印数据。
  优先级队列
  优先级队列是一种根据元素优先级排序的队列,可以用于在同一台计算机上的进程间通信。在 Python 中,可以使用 multiprocessing 模块中的 PriorityQueue 类来创建优先级队列。
  下面是一个使用优先级队列实现进程间通信的示例:
  import multiprocessing
  def producer(q):
      q.put((1, 'high-priority message'))
      q.put((2, 'low-priority message'))
      print('Messages sent')
  def consumer(q):
      while True:
          item = q.get()
          if item is None:
              break
          print(f'Consumed {item[1]} with priority {item[0]}')
  if __name__ == '__main__':
      q = multiprocessing.PriorityQueue()
      p1 = multiprocessing.Process(target=producer, args=(q,))
      p2 = multiprocessing.Process(target=consumer, args=(q,))
      p1.start()
      p2.start()
      p1.join()
      q.put(None)
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向优先级队列中写入数据,其中一个消息的优先级高于另一个消息,p2 从优先级队列中读取并打印数据。
  以上就是 Python 中文件队列、内存队列、普通队列和优先级队列在线程和进程间通信的方式的完整代码示例。需要注意的是,在使用队列进行线程间或进程间通信时,需要进行同步和互斥操作,以避免数据竞争和其他并发问题。因此,在使用队列进行线程间或进程间通信时,需要仔细设计和实现代码,确保程序的正确性和稳定性。
  共享内存的使用及其类型
  共享内存是一种多个进程共享同一块内存的通信机制,可以用于实现多个进程之间的高效通信。在 Python 中,可以使用 multiprocessing 模块中的 Value 和 Array 类来创建共享内存。
  共享内存类型分为两种:基本类型和数组类型。
  1、基本类型
  基本类型是指 Python 中的基本数据类型,例如整数、浮点数等。在共享内存中,可以使用 Value 类来创建基本类型的共享内存。
  下面是一个使用基本类型共享内存实现进程间通信的示例:
  import multiprocessing
  def sender(value):
      value.value = 1
  def receiver(value):
      print(value.value)
  if __name__ == '__main__':
      value = multiprocessing.Value('i', 0)
      p1 = multiprocessing.Process(target=sender, args=(value,))
      p2 = multiprocessing.Process(target=receiver, args=(value,))
      p1.start()
      p2.start()
      p1.join()
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向共享内存中写入整数值,p2 从共享内存中读取并打印整数值。
  2、数组类型
  数组类型是指 Python 中的数组,可以使用 Array 类来创建数组类型的共享内存。
  下面是一个使用数组类型共享内存实现进程间通信的示例:
  import multiprocessing
  def sender(arr):
      arr[0] = 1
  def receiver(arr):
      print(arr[:])
  if __name__ == '__main__':
      arr = multiprocessing.Array('i', range(10))
      p1 = multiprocessing.Process(target=sender, args=(arr,))
      p2 = multiprocessing.Process(target=receiver, args=(arr,))
      p1.start()
      p2.start()
      p1.join()
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向共享内存中写入整数数组,p2 从共享内存中读取并打印整数数组。
  套接字的使用及其类型
  套接字是一种网络通信机制,可以用于不同计算机之间的进程通信。在 Python 中,可以使用 socket 模块来创建套接字。
  套接字类型分为两种:流套接字和数据报套接字。
  1、流套接字
  流套接字是一种基于 TCP 协议的套接字,可以实现可靠的面向连接的数据传输,适用于大量数据传输和长时间连接。在 Python 中,可以使用 socket 模块中的 socket 类来创建流套接字。
  下面是一个使用流套接字实现进程间通信的示例:
  import socket
  HOST = 'localhost'
  PORT = 5000
  def sender():
      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
          s.connect((HOST, PORT))
          s.sendall(b'Hello, receiver!')
  def receiver():
      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
          s.bind((HOST, PORT))
          s.listen()
          conn, addr = s.accept()
          with conn:
              data = conn.recv(1024)
              print(data)
  if __name__ == '__main__':
      p1 = multiprocessing.Process(target=sender)
      p2 = multiprocessing.Process(target=receiver)
      p1.start()
      p2.start()
      p1.join()
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向流套接字中写入消息,p2 从流套接字中读取并打印消息。
  2、数据报套接字
  数据报套接字是一种基于 UDP 协议的套接字,可以实现无连接的数据传输,适用于少量数据传输和短时间连接。在 Python 中,可以使用 socket 模块中的 socket 类来创建数据报套接字。
  下面是一个使用数据报套接字实现进程间通信的示例:
  import socket
  HOST = 'localhost'
  PORT = 5000
  def sender():
      with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
          s.sendto(b'Hello, receiver!', (HOST, PORT))
  def receiver():
      with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
          s.bind((HOST, PORT))
          data, addr = s.recvfrom(1024)
          print(data)
  if __name__ == '__main__':
      p1 = multiprocessing.Process(target=sender)
      p2 = multiprocessing.Process(target=receiver)
      p1.start()
      p2.start()
      p1.join()
      p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向数据报套接字中写入消息,p2 从数据报套接字中读取并打印消息。
  手动实现进程间通信
  除了使用 Python 提供的多进程通信机制之外,还可以手动实现进程间通信。在 Python 中,可以使用共享内存和信号量来手动实现进程间通信。
  下面是一个使用共享内存和信号量手动实现进程间通信的示例:
  import multiprocessing
  import mmap
  import os
  import signal
  import time
  def sender(data, sem):
      time.sleep(1)
      sem.acquire()
      data.seek(0)
      data.write(b'Hello, receiver!')
      sem.release()
  def receiver(data, sem):
      sem.acquire()
      data.seek(0)
      print(data.read())
      sem.release()
  if __name__ == '__main__':
      with multiprocessing.shared_memory() as mem:
          with mmap.mmap(mem.fd, mem.size) as data:
              data.write(b'\0' * mem.size)
              sem = multiprocessing.Semaphore(1)
              p1 = multiprocessing.Process(target=sender, args=(data, sem))
              p2 = multiprocessing.Process(target=receiver, args=(data, sem))
              p1.start()
              p2.start()
              p1.join()
              p2.join()
  在上述代码中,创建了两个进程 p1 和 p2,p1 向共享内存中写入消息,p2 从共享内存中读取并打印消息。使用信号量来保证共享内存的互斥访问。
  总结
  本文介绍了 Python 中常用的多线程和进程通信机制。这些机制可以满足不同线程间的数据传输需要,应根据具体场景选择合适的通信机制。
  本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号