多线程处理——测试工程师Python开发实战(09)

发表于:2023-8-08 09:34

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:胡通    来源:51Testing软件测试网原创

#
Python
  4.5  多线程处理
  4.5.1  线程的含义
  线程是操作系统进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每个线程并行执行不同的任务,当中每一个线程共享当前进程的资源。
  Python由于存在全局解释器锁,任意时刻都只有一个线程在运行代码,致使多线程不能充分利用计算机多核的特性。如果程序是CPU密集型的,使用Python的多线程确实无法提升程序的效率,如果程序是IO密集型的,则可以使用Python的多线程提高程序的整体效率。也就是说,计算密集型的程序用多进程,如大数据计算;IO密集型的程序用多线程,如文件读写、网络数据传输。
  注意
  并发指任务数大于CPU核数,通过操作系统的各种任务调度算法,实现用多任务同时执行,实际上总有一些任务不在执行,因为切换任务的速度相当快,所以看上去是同时执行的。
  并行指任务数小于或等于CPU核数,即任务真的是一起执行的。
  提示
  当一个程序启动时,就有一个进程被操作系统创建,与此同时一个线程也立刻运行,该线程通常叫作程序的主线程。因为它是程序开始时就执行的,如果我们需要再创建线程,那么再创建的线程就是这个主线程的子线程。
  使用threading库和ThreadPoolExecutor类创建的线程均为子线程。
  4.5.2  线程的使用
  Python提供了threading库来实现多线程,threading库提供了Thread类,用于创建线程对象,创建方式如下:
t = threading.Thread(name, target, *args, **kwargs)?
  其中,参数name表示线程名称,target表示线程函数,args元组用于给线程函数传参,kwargs字典用于给线程函数传参。Thread类中的常用方法如表4-11所示。
表4-11  Thread类中的常用方法
  利用threading库来实现多线程有如下两种方式。
  (1)创建一个threading.Thread实例,并传入一个初始化函数对象作为线程执行的入口。
  (2)继承threading.Thread类,并重写run()方法。
  第一种方式是,我们通过创建Thread的实例,并传递一个函数,如代码清单4-6所示。
# -*- coding: utf-8 -*-
# @Time : 2022/2/19 10:20 上午
# @Project : threadDemo
# @File : threadUtil1.py
# @Author : hutong
# @Describe: 微信公众号:大话性能
# @Version: Python3.9.8
?
from threading import Thread,currentThread
from time import sleep
 
def fun_thread(sec,tname):
    '''线程函数,用于修改线程的名称'''
    print("启动线程-->",currentThread().getName(),":",currentThread().is_alive())
    print("setName修改线程名称")
    currentThread().setName(tname)
    sleep(sec)
    print("{}线程结束".format(currentThread().getName()))
?
if __name__ == '__main__':
threads = []  # 维护线程
for i in range(3):
        t = Thread(target=fun_thread, name="thread-%d"%i, 
               args=(3,"My"+str(i)+"Thread"))
        threads.append(t)
        t.start()
for t in threads:
        t.join()?
代码清单4-6  threadUtil1
  注意,在定义多线程传递参数的时候,如果只有一个参数,则这个参数后一定要加上逗号,例如args=(i, );如果有两个或者以上的参数,则不用在最后一个参数后加上逗号,例如args=(i, j)。
  第二种方式是,我们通过继承Thread类,并运行Thread类中的init()方法来获取父类属性,并重写run()方法。在线程启动后,程序将自动执行run()方法,如代码清单4-7所示。
# -*- coding: utf-8 -*-
# @Time : 2022/2/19 10:25 上午
# @Project : threadDemo
# @File : threadUtil2.py
# @Author : hutong
# @Describe: 微信公众号:大话性能
# @Version: Python3.9.8
?
import threading
from time import sleep, ctime
 
loops = (4, 2)
class MyThread(threading.Thread):
    def __init__(self, target, args):
        super().__init__()
        self.target = target
        self.args = args
    def run(self):
        self.target(*self.args)
def loop(nloop, nsec):
    print(ctime(), 'start loop', nloop)
    sleep(nsec)
    print(ctime(), 'end loop', nloop)
def main():
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = MyThread(loop,(i, loops[i]))
        threads.append(t)
    for i in nloops:
        threads[i].start()
    for i in nloops:
        threads[i].join()
if __name__ == "__main__":
    main()
代码清单4-7  threadUtil2
  4.5.3  线程池的使用
  因为创建线程系统需要分配资源,终止线程系统需要回收资源,所以如果可以重用线程,则可以省去创建/终止线程的开销以提升性能。线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,线程并不会死亡,而是返回线程池中变成空闲状态,等待执行下一个函数。
  Python为我们提供了ThreadPoolExecutor类来实现线程池,通常适用场景为突发大量请求或需要大量线程来完成任务,但实际任务处理时间较短。
  线程池的基类是concurrent.futures库中的Executor类,Executor类提供了两个子类,即ThreadPoolExecutor类和ProcessPoolExecutor类,其中ThreadPoolExecutor类用于创建线程池,ProcessPoolExecutor类用于创建进程池。
  如果使用线程池/进程池来管理并发编程,那么只要将相应的task()函数提交给线程池/进程池,剩下的事情就由线程池/进程池来完成。
  Executor类的常用方法如表4-12所示。
表4-12  Executor类的常用方法
  将task()函数通过submit()方法提交给线程池后,submit()方法会返回一个Future对象,Future类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此线程执行的函数相当于一个“将来完成”的任务,所以Python使用Future来表示。Future类的常用方法如表4-13所示。
表4-13  Future类的常用方法
  使用线程池来执行线程任务的步骤如下。
  (1)调用ThreadPoolExecutor类的构造器创建一个线程池。
  (2)定义一个普通函数作为线程任务。
  (3)调用ThreadPoolExecutor对象的submit()方法来提交线程任务。
  (4)当不想提交任何任务时,调用ThreadPoolExecutor对象的shutdown()方法来关闭线程池。
  ThreadPoolExecutor类使用示例如代码清单4-8所示。
# -*- coding: utf-8 -*-
# @Time : 2022/2/19 10:29 上午
# @Project : threadDemo
# @File : threadUtil3.py
# @Author : hutong
# @Describe: 微信公众号:大话性能
# @Version: Python3.9.8
?
from concurrent.futures import ThreadPoolExecutor
import os,time,random
?
def task(n):
    print(f"子线程:{os.getpid()}正在执行")
    time.sleep(random.randint(1,3))  # 模拟任务执行时间
    return n**2
?
if __name__ == '__main__':
    thread_pool = ThreadPoolExecutor(max_workers=4)  # 设置线程池大小
    futures = []
    for i in range(1,10):
        future = thread_pool.submit(task,i)  # 开启10个任务
        futures.append(future)
    thread_pool.shutdown(True)  # 关闭线程池,并等待任务结束
?
    for future in futures:
        print(future.result())  # 循环取出任务执行后的结果?
代码清单4-8  threadUtil3
  多线程共享变量的问题
  多线程最大的特点就是线程之间可以共享数据,由于线程的执行是无序的,若多线程同时更改一个变量,使用同样的资源,共享数据时常常会出现死锁、数据错乱等情况。
  解决以上问题的方法有如下两种。
  (1)通过线程锁。threading库提供了Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程就使用不了这个变量了,直到当前线程处理完成,释放了锁,其他线程才能使用这个变量进行处理。也就是说,访问某个资源之前,用Lock.acquire()锁住资源,访问之后,用Lock.release()释放资源。
  (2)通过ThreadLocal。当不想将变量共享给其他线程时,我们可以使用局部变量,但在函数中定义局部变量会使得变量在函数之间传递特别复杂。ThreadLocal非常厉害,它解决了全局变量需要加锁,而局部变量传递复杂的问题。通过在线程中定义:
local_var = threading.local()?
  此时,local_var就变成了一个全局变量,但local_var只在该线程中为全局变量,对其他线程来说local_var是局部变量,其他线程不可修改。示例如下:
def process_thread(name): # 绑定ThreadLocal的student 
   local_var.student = name?
  这时,student属性只有该线程可以修改,其他线程不可以修改。
  4.5.4  高级用法
  下面我们介绍两种线程的高级用法。
  1.多线程返回执行结果
  多数情况下,使用threading库创建线程后,需要知道线程什么时候返回,或者返回的值是多少。此时我们可以使用类似callback的方式得到线程的返回结果。
  首先,定义一个Thread类的子类,传入线程执行结束后需要调用的方法,并重写run()方法,返回前调用传入的callback()方法。示例如下:
import threading
import time
?
class WorkerThread(threading.Thread):
    def __init__(self, callback):
        super(WorkerThread, self).__init__()
        self.callback = callback
?
    def run(self):
        time.sleep(5)
        self.callback(5)?
  其中,run()方法用sleep()方法模拟耗时操作,并在返回前调用传入的callback()方法。然后,在主线程中,新建WorkerThread类,传入线程结束后需要调用的callback()方法:
from worker_thead import WorkerThread
def callback(result):
    print('线程返回结果:%d' % result)
?
print('程序运行……')
worker = WorkerThread(callback)
worker.start()
worker.join()
print('程序结束……')?
  另外,如果我们采用的是线程池模式,那么可以使用Future类的as_completed()方法判断任务是否完成,并通过Future类的result()方法获取多线程的执行结果。
  2.回调函数的使用
  当我们使用线程池时,通过submit()方法提交任务future,当future调用result()方法,会阻塞当前主线程,等到所有线程完成任务后,该阻塞才会解除。如果我们不想让result()方法将线程阻塞,那么可以使用Future类的add_done_callback()方法来添加回调函数,当线程任务结束后,程序会自动触发该回调函数,并将future的结果作为参数传给回调函数,我们可以直接在回调函数内打印结果。示例如下:
from concurrent.futures import ThreadPoolExecutor
import os,time,random
?
def task(n):
    print(f"子线程:{os.getpid()}正在执行")
    time.sleep(random.randint(1,3))
    return n**2
?
def result_back(res):
    print(res.result())  # 打印任务运行的结果(不需要等待其他线程任务完成)
?
if __name__ == '__main__':
    thread_pool = ThreadPoolExecutor(max_workers=4)
    for i in range(1,10):
        future = thread_pool.submit(task,i)
        future.add_done_callback(result_back)  # 设置回调函数
    thread_pool.shutdown(True)  # 关闭线程池?
版权声明:51Testing软件测试网获得作者授权连载本书部分章节。
任何个人或单位未获得明确的书面许可,不得对本文内容复制、转载或进行镜像,否则将追究法律责
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号