8 种 Python 定时任务的解决方案(1)

上一篇 / 下一篇  2021-09-26 10:17:12

  在日常工作中,我们常常会用到需要周期性执行的任务,一种方式是采/用 Linux 系统自带的 crond 结合命令行实现,另外一种方式是直接使用Python
  最近我整理了一下 Python 定时任务的实现方式,内容较长,建议收藏后学习,梳理不易,有所收获,点赞支持。
  我们开始学习吧!
  1. 利用while True: + sleep()实现定时任务
  位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。
  基于这样的特性我们可以通过while死循环+sleep()的方式实现简单的定时任务。
  代码示例:
  import datetime  
  import time  
  def time_printer():  
      now = datetime.datetime.now()  
      ts = now.strftime('%Y-%m-%d %H:%M:%S')  
      print('do func time :', ts)  
  def loop_monitor():  
      while True:  
          time_printer()  
          time.sleep(5)  # 暂停5秒  
  if __name__ == "__main__":  
      loop_monitor() 
  主要缺点:
  · 只能设定间隔,不能指定具体的时间,比如每天早上8:00
  · sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
  2. 使用Timeloop库运行定时任务
  Timeloop是一个库,可用于运行多周期任务。这是一个简单的库,它使用decorator模式在线程中运行标记函数。
  示例代码:
  import time  
  from timeloop import Timeloop  
  from datetime import timedelta  
  tl = Timeloop()  
  @tl.job(interval=timedelta(seconds=2))  
  def sample_job_every_2s():  
      print "2s job current time : {}".format(time.ctime())  
  @tl.job(interval=timedelta(seconds=5))  
  def sample_job_every_5s():  
      print "5s job current time : {}".format(time.ctime())  
  @tl.job(interval=timedelta(seconds=10))  
  def sample_job_every_10s():  
      print "10s job current time : {}".format(time.ctime()) 
  3. 利用threading.Timer实现定时任务
  threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。
  Timer(interval, function, args=[ ], kwargs={ })
  · interval: 指定的时间
  · function: 要执行的方法
  · args/kwargs: 方法的参数
  代码示例
  备注:Timer只能执行一次,这里需要循环调用,否则只能执行一次
  4. 利用内置模块sched实现定时任务
  sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
  class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。
  代码示例:
  import datetime  
  import time  
  import sched  
  def time_printer():  
      now = datetime.datetime.now()  
      ts = now.strftime('%Y-%m-%d %H:%M:%S')  
      print('do func time :', ts)  
      loop_monitor()  
  def loop_monitor():  
      s = sched.scheduler(time.time, time.sleep)  # 生成调度器  
      s.enter(5, 1, time_printer, ())  
      s.run()  
  if __name__ == "__main__":  
      loop_monitor() 
  scheduler对象主要方法:
  · enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
  · cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
  · run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
  个人点评:比threading.Timer更好,不需要循环调用。
  5. 利用调度模块schedule实现定时任务
  schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。
  先来看代码,是不是不看文档就能明白什么意思?
  import schedule  
  import time  
  def job():  
      print("I'm working...") 
  schedule.every(10).seconds.do(job)  
  schedule.every(10).minutes.do(job)  
  schedule.every().hour.do(job)  
  schedule.every().day.at("10:30").do(job)  
  schedule.every(5).to(10).minutes.do(job)  
  schedule.every().monday.do(job)  
  schedule.every().wednesday.at("13:15").do(job)  
  schedule.every().minute.at(":17").do(job)  
  while True:  
      schedule.run_pending()  
      time.sleep(1) 
  装饰器:通过 @repeat() 装饰静态方法
  import time  
  from schedule import every, repeat, run_pending  
  @repeat(every().second)  
  def job():  
      print('working...')  
  while True:  
      run_pending()  
      time.sleep(1) 
  传递参数:
  import schedule  
  def greet(name):  
      print('Hello', name)  
  schedule.every(2).seconds.do(greet, name='Alice')  
  schedule.every(4).seconds.do(greet, name='Bob')  
  while True:  
      schedule.run_pending()
  装饰器同样能传递参数:
  from schedule import every, repeat, run_pending  
  @repeat(every().second, 'World')  
  @repeat(every().minute, 'Mars')  
  def hello(planet):  
      print('Hello', planet)  
  while True:  
      run_pending() 
  取消任务:
  import schedule  
  i = 0  
  def some_task():  
      global i  
      i += 1  
      print(i)  
      if i == 10:  
          schedule.cancel_job(job)  
          print('cancel job')  
          exit(0)  
  job = schedule.every().second.do(some_task)  
  while True:  
      schedule.run_pending() 
  运行一次任务:
  import time  
  import schedule  
  def job_that_executes_once():  
      print('Hello')  
      return schedule.CancelJob  
  schedule.every().minute.at(':34').do(job_that_executes_once)  
  while True:  
      schedule.run_pending()  
      time.sleep(1) 
  根据标签检索任务:
  # 检索所有任务:schedule.get_jobs()  
  import schedule  
  def greet(name):  
      print('Hello {}'.format(name))  
  schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')  
  schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')  
  schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')  
  schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')  
  friends = schedule.get_jobs('friend')  
  print(friends) 
  根据标签取消任务:
  # 取消所有任务:schedule.clear()  
  import schedule  
  def greet(name):  
      print('Hello {}'.format(name))  
      if name == 'Cancel':  
          schedule.clear('second-tasks')  
          print('cancel second-tasks')  
  schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')  
  schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')  
  schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')  
  schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')  
  while True:  
      schedule.run_pending() 
  运行任务到某时间:
  import schedule  
  from datetime import datetime, timedelta, time  
  def job():  
      print('working...')  
  schedule.every().second.until('23:59').do(job)  # 今天23:59停止  
  schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止  
  schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止  
  schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止  
  schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止  
  while True:  
      schedule.run_pending() 
  马上运行所有任务(主要用于测试):
  import schedule  
  def job():  
      print('working...')  
  def job1():  
      print('Hello...')  
  schedule.every().monday.at('12:40').do(job)  
  schedule.every().tuesday.at('16:40').do(job1)  
  schedule.run_all()  
  schedule.run_all(delay_seconds=3)  # 任务间延迟3秒 
  并行运行:使用 Python 内置队列实现:
  import threading  
  import time  
  import schedule  
  def job1():  
      print("I'm running on thread %s" % threading.current_thread())  
  def job2():  
      print("I'm running on thread %s" % threading.current_thread())  
  def job3():  
      print("I'm running on thread %s" % threading.current_thread())  
  def run_threaded(job_func):  
      job_thread = threading.Thread(target=job_func)  
      job_thread.start()  
  schedule.every(10).seconds.do(run_threaded, job1)  
  schedule.every(10).seconds.do(run_threaded, job2)  
  schedule.every(10).seconds.do(run_threaded, job3) 
  while True:  
      schedule.run_pending()  
      time.sleep(1) 

TAG: 软件开发 Python

 

评分:0

我来说两句

Open Toolbar