python任务调度之schedule
2021-02-02 04:14
标签:strong not 创建 注解 定时调度 enter seconds nal tee 本文通过开源项目schedule来学习定时任务如何工作 先来看下做做提供的一个例子 注解 可看到有三个类CancelJob、Scheduler、Job,对源码的分析也将围绕这三个类展开 可以看到这是一个空类,这个类的作用是当job执行函数返回一个CancelJob类型的对象时,执行完之后就会被Schedule移除,简单说就是只会执行一次 Scheduler类源码 Scheduler作用 就是在job执行的时候执行它 Job是整个定时任务的核心. 主要功能就是根据创建Job时的参数,得到下一次运行的时间. 代码如下,稍微有点长(会省略部分代码,可以看源码): 参数的含义: 通过学习schedule,可以看到实现一个基础的任务定时调度就是根据job的配置计算执行时间和执行job. 代码里我认为比较好的地方有: 的确需要好好学习下 python任务调度之schedule 标签:strong not 创建 注解 定时调度 enter seconds nal tee 原文地址:https://www.cnblogs.com/whiteBear/p/12811491.html
schedule简介
import schedule
import time
def job():
print("I‘m working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
schedule源码学习
CancelJob
class CancelJob(object):
pass
Scheduler
这里为使代码精简、紧凑,删除了注释class Scheduler(object):
def __init__(self):
self.jobs = []
def run_pending(self):
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
self._run_job(job)
def run_all(self, delay_seconds=0):
for job in self.jobs:
self._run_job(job)
time.sleep(delay_seconds)
def clear(self):
del self.jobs[:]
def cancel_job(self, job):
try:
self.jobs.remove(job)
except ValueError:
pass
def every(self, interval=1):
job = Job(interval)
self.jobs.append(job)
return job
def _run_job(self, job):
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)
@property
def next_run(self):
if not self.jobs:
return None
return min(self.jobs).next_run
@property
def idle_seconds(self):
return (self.next_run - datetime.datetime.now()).total_seconds()
函数解释
Job
class Job(object):
def __init__(self, interval):
self.interval = interval # pause interval * unit between runs
self.job_func = None # the job job_func to run
self.unit = None # time units, e.g. ‘minutes‘, ‘hours‘, ...
self.at_time = None # optional time at which this job runs
self.last_run = None # datetime of the last run
self.next_run = None # datetime of the next run
self.period = None # timedelta between runs, only valid for
self.start_day = None # Specific day of the week to start on
def __lt__(self, other):
return self.next_run = self.next_run
def run(self):
logger.info(‘Running job %s‘, self)
ret = self.job_func()
self.last_run = datetime.datetime.now()
self._schedule_next_run()
return ret
def _schedule_next_run(self):
assert self.unit in (‘seconds‘, ‘minutes‘, ‘hours‘, ‘days‘, ‘weeks‘)
self.period = datetime.timedelta(**{self.unit: self.interval})
self.next_run = datetime.datetime.now() + self.period
#还有很多....
各种方法
学习总结
转载:https://zhuanlan.zhihu.com/p/23086148