Python 实现定时任务的八种方案!(2)

发表于:2021-11-29 09:33

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:佚名    来源:菜鸟学Python

  利用任务框架 APScheduler 实现定时任务
  APScheduler[5](advanceded python scheduler)基于 Quartz 的一个 Python 定时任务框架,实现了 Quartz 的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个 Python 定时任务系统。
  它有以下三个特点:
  ·类似于 Liunx Cron 的调度程序(可选的开始/结束时间)
  · 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)
  · 一次性执行任务(在设定的日期/时间运行一次任务)
  APScheduler 有四种组成部分:
  · 触发器 (trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
  ·  作业存储 (job store) 存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
  · 执行器 (executor) 处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
  · 调度器 (scheduler) 是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。 通过配置 executor、jobstore、trigger,使用线程池 (ThreadPoolExecutor 默认值 20) 或进程池 (ProcessPoolExecutor 默认值 5) 并且默认最多 3 个 (max_instances) 任务实例同时运行,实现对 job 的增删改查等调度控制。
  示例代码:
  from apscheduler.schedulers.blocking import BlockingScheduler  
  from datetime import datetime  
  # 输出时间  
  def job():  
      print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))  
  # BlockingScheduler  
  sched = BlockingScheduler()  
  sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')  
  sched.start()  
  APScheduler 中的重要概念
  Job 作业
  Job 作为 APScheduler 最小执行单位。创建 Job 时指定执行的函数,函数中所需参数,Job 执行时的一些设置信息。
  构建说明:
   id:指定作业的唯一 ID
   name:指定作业的名字
   trigger:apscheduler 定义的触发器,用于确定 Job 的执行时间,根据设置的 trigger 规则,计算得到下次执行此 job 的时间, 满足时将会执行
   executor:apscheduler 定义的执行器,job 创建时设置执行器的名字,根据字符串你名字到 scheduler 获取到执行此 job 的 执行器,执行 job 指定的函数
   max_instances:执行此 job 的最大实例数,executor 执行 job 时,根据 job 的 id 来计算执行次数,根据设置的最大实例数来确定是否可执行
   next_run_time:Job 下次的执行时间,创建 Job 时可以指定一个时间 [datetime], 不指定的话则默认根据 trigger 获取触发时间
   misfire_grace_time:Job 的延迟执行时间,例如 Job 的计划执行时间是 21:00:00,但因服务重启或其他原因导致 21:00:31 才执行,如果设置此 key 为 40, 则该 job 会继续执行,否则将会丢弃此 job
   coalesce:Job 是否合并执行,是一个 bool 值。例如 scheduler 停止 20s 后重启启动,而 job 的触发器设置为 5s 执行一次,因此此 job 错过了 4 个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
   func:Job 执行的函数
   args:Job 执行函数需要的位置参数
   kwargs:Job 执行函数需要的关键字参数
  Trigger 触发器
  Trigger 绑定到 Job,在 scheduler 调度筛选 Job 时,根据触发器的规则计算出 Job 的触发时间,然后与当前时间比较确定此 Job 是否会被执行,总之就是根据 trigger 规则计算出下一个执行时间。
  目前 APScheduler 支持触发器:
  ·指定时间的 DateTrigger
  · 指定间隔时间的 IntervalTrigger
  · 像 Linux 的 crontab 一样的 CronTrigger。
  触发器参数:date
  date 定时,作业只执行一次。
  ·run_date (datetime|str) – the date/time to run the job at
  · timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already 
  sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])  
  sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])  
  触发器参数:interval
  interval 间隔调度
  · weeks (int) – 间隔几周
  · days (int) – 间隔几天
  · hours (int) – 间隔几小时
  · minutes (int) – 间隔几分钟
  · seconds (int) – 间隔多少秒
  · start_date (datetime|str) – 开始日期
  · end_date (datetime|str) – 结束日期
  · timezone (datetime.tzinfo|str) – 时区 
  sched.add_job(job_function, 'interval', hours=2) 
  触发器参数:cron
  cron 调度
   (int|str) 表示参数既可以是 int 类型,也可以是 str 类型
   (datetime | str) 表示参数既可以是 datetime 类型,也可以是 str 类型
   year (int|str) – 4-digit year -(表示四位数的年份,如 2008 年)
   month (int|str) – month (1-12) -(表示取值范围为 1-12 月)
   day (int|str) – day of the (1-31) -(表示取值范围为 1-31 日)
   week (int|str) – ISO week (1-53) -(格里历 2006 年 12 月 31 日可以写成 2006 年-W52-7(扩展形式)或 2006W527(紧凑形式))
   day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第几天,既可以用 0-6 表示也可以用其英语缩写表示)
   hour (int|str) – hour (0-23) – (表示取值范围为 0-23 时)
   minute (int|str) – minute (0-59) – (表示取值范围为 0-59 分)
   second (int|str) – second (0-59) – (表示取值范围为 0-59 秒)
   start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示开始时间)
   end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示结束时间)
   timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)
  CronTrigger 可用的表达式:

  # 6-8,11-12 月第三个周五 00:00, 01:00, 02:00, 03:00 运行  
  sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')  
  # 每周一到周五运行 直到 2024-05-30 00:00:00  
  sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30' 
  Executor 执行器
  Executor 在 scheduler 中初始化,另外也可通过 scheduler 的 add_executor 动态添加 Executor。每个 executor 都会绑定一个 alias,这个作为唯一标识绑定到 Job,在实际执行时会根据 Job 绑定的 executor 找到实际的执行器对象,然后根据执行器对象执行 Job。Executor 的种类会根据不同的调度来选择,如果选择 AsyncIO 作为调度的库,那么选择 AsyncIOExecutor,如果选择 tornado 作为调度的库,选择 TornadoExecutor,如果选择启动进程作为调度,选择 ThreadPoolExecutor 或者 ProcessPoolExecutor 都可以。Executor 的选择需要根据实际的 scheduler 来选择不同的执行器。目前 APScheduler 支持的 Executor:
  ·executors.asyncio:同步 io,阻塞
  ·  executors.gevent:io 多路复用,非阻塞
  ·  executors.pool: 线程 ThreadPoolExecutor 和进程 ProcessPoolExecutor
  ·  executors.twisted:基于事件驱动
  Jobstore 作业存储
  Jobstore 在 scheduler 中初始化,另外也可通过 scheduler 的 add_jobstore 动态添加 Jobstore。每个 jobstore 都会绑定一个 alias,scheduler 在 Add Job 时,根据指定的 jobstore 在 scheduler 中找到相应的 jobstore,并将 job 添加到 jobstore 中。作业存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler 支持的任务存储器有:
   jobstores.memory:内存
   jobstores.mongodb:存储在 mongodb
   jobstores.redis:存储在 redis
   jobstores.rethinkdb:存储在 rethinkdb
   jobstores.sqlalchemy:支持 sqlalchemy 的数据库如 mysql,sqlite 等
   jobstores.zookeeper:zookeeper
  不同的任务存储器可以在调度器的配置中进行配置(见调度器)
  Event 事件
  Event 是 APScheduler 在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些 Event 时,做一些具体的操作。常见的比如。Job 执行异常事件 EVENT_JOB_ERROR。Job 执行时间错过事件 EVENT_JOB_MISSED。
  目前 APScheduler 定义的 Event:
   EVENT_SCHEDULER_STARTED
   EVENT_SCHEDULER_START
   EVENT_SCHEDULER_SHUTDOWN
   EVENT_SCHEDULER_PAUSED
   EVENT_SCHEDULER_RESUMED
   EVENT_EXECUTOR_ADDED
   EVENT_EXECUTOR_REMOVED
   EVENT_JOBSTORE_ADDED
   EVENT_JOBSTORE_REMOVED
   EVENT_ALL_JOBS_REMOVED
   EVENT_JOB_ADDED
   EVENT_JOB_REMOVED
   EVENT_JOB_MODIFIED
   EVENT_JOB_EXECUTED
   EVENT_JOB_ERROR
   EVENT_JOB_MISSED
   EVENT_JOB_SUBMITTED
   EVENT_JOB_MAX_INSTANCES
  Listener 表示用户自定义监听的一些 Event,比如当 Job 触发了 EVENT_JOB_MISSED 事件时可以根据需求做一些其他处理。
  调度器
  Scheduler 是 APScheduler 的核心,所有相关组件通过其定义。scheduler 启动之后,将开始按照配置的任务进行调度。除了依据所有定义 Job 的 trigger 生成的将要调度时间唤醒调度之外。当发生 Job 信息变更时也会触发调度。
  APScheduler 支持的调度器方式如下,比较常用的为 BlockingScheduler 和 BackgroundScheduler
  ·  BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程,不能立即返回。
  ·  BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞。
  ·  AsyncIOScheduler:适用于使用了 asyncio 模块的应用程序。
  ·  GeventScheduler:适用于使用 gevent 模块的应用程序。
  ·  TwistedScheduler:适用于构建 Twisted 的应用程序。
  ·  QtScheduler:适用于构建 Qt 的应用程序。
  Scheduler 的工作流程
  Scheduler 添加 job 流程:
  Scheduler 调度流程:

  本文内容不用于商业目的,如涉及知识产权问题,请权利人联系51Testing小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号