|
|
@ -1,83 +1,32 @@ |
|
|
|
""" |
|
|
|
This module is the main part of the library, and is the only module that |
|
|
|
regular users should be concerned with. |
|
|
|
This module is the main part of the library. It houses the Scheduler class |
|
|
|
and related exceptions. |
|
|
|
""" |
|
|
|
|
|
|
|
from threading import Thread, Event, Lock |
|
|
|
from datetime import datetime, timedelta |
|
|
|
from logging import getLogger |
|
|
|
import os |
|
|
|
import sys |
|
|
|
|
|
|
|
from apscheduler.util import time_difference, asbool |
|
|
|
from apscheduler.triggers import DateTrigger, IntervalTrigger, CronTrigger |
|
|
|
|
|
|
|
from apscheduler.util import * |
|
|
|
from apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger |
|
|
|
from apscheduler.jobstores.ram_store import RAMJobStore |
|
|
|
from apscheduler.job import Job, MaxInstancesReachedError |
|
|
|
from apscheduler.events import * |
|
|
|
from apscheduler.threadpool import ThreadPool |
|
|
|
|
|
|
|
logger = getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
class Job(object): |
|
|
|
""" |
|
|
|
Represents a task scheduled in the scheduler. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self, trigger, func, args, kwargs): |
|
|
|
self.thread = None |
|
|
|
self.trigger = trigger |
|
|
|
self.func = func |
|
|
|
self.args = args |
|
|
|
self.kwargs = kwargs |
|
|
|
if hasattr(func, '__name__'): |
|
|
|
self.name = func.__name__ |
|
|
|
else: |
|
|
|
self.name = str(func) |
|
|
|
|
|
|
|
def run(self): |
|
|
|
""" |
|
|
|
Starts the execution of this job in a separate thread. |
|
|
|
""" |
|
|
|
if (self.thread and self.thread.isAlive()): |
|
|
|
logger.info('Skipping run of job %s (previously triggered ' |
|
|
|
'instance is still running)', self) |
|
|
|
else: |
|
|
|
self.thread = Thread(target=self.run_in_thread) |
|
|
|
self.thread.setDaemon(False) |
|
|
|
self.thread.start() |
|
|
|
|
|
|
|
def run_in_thread(self): |
|
|
|
""" |
|
|
|
Runs the associated callable. |
|
|
|
This method is executed in a dedicated thread. |
|
|
|
""" |
|
|
|
try: |
|
|
|
self.func(*self.args, **self.kwargs) |
|
|
|
except: |
|
|
|
logger.exception('Error executing job "%s"', self) |
|
|
|
raise |
|
|
|
|
|
|
|
def __str__(self): |
|
|
|
return '%s: %s' % (self.name, repr(self.trigger)) |
|
|
|
|
|
|
|
def __repr__(self): |
|
|
|
return '%s(%s, %s)' % (self.__class__.__name__, self.name, |
|
|
|
repr(self.trigger)) |
|
|
|
|
|
|
|
|
|
|
|
class SchedulerShutdownError(Exception): |
|
|
|
""" |
|
|
|
Thrown when attempting to use the scheduler after |
|
|
|
it's been shut down. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
Exception.__init__(self, 'Scheduler has already been shut down') |
|
|
|
|
|
|
|
|
|
|
|
class SchedulerAlreadyRunningError(Exception): |
|
|
|
""" |
|
|
|
Thrown when attempting to start the scheduler, but it's already running. |
|
|
|
Raised when attempting to start or configure the scheduler when it's |
|
|
|
already running. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
Exception.__init__(self, 'Scheduler is already running') |
|
|
|
def __str__(self): |
|
|
|
return 'Scheduler is already running' |
|
|
|
|
|
|
|
|
|
|
|
class Scheduler(object): |
|
|
@ -86,141 +35,265 @@ class Scheduler(object): |
|
|
|
their execution. |
|
|
|
""" |
|
|
|
|
|
|
|
stopped = False |
|
|
|
thread = None |
|
|
|
misfire_grace_time = 1 |
|
|
|
daemonic = True |
|
|
|
_stopped = False |
|
|
|
_thread = None |
|
|
|
|
|
|
|
def __init__(self, **config): |
|
|
|
self.jobs = [] |
|
|
|
self.jobs_lock = Lock() |
|
|
|
self.wakeup = Event() |
|
|
|
self.configure(config) |
|
|
|
def __init__(self, gconfig={}, **options): |
|
|
|
self._wakeup = Event() |
|
|
|
self._jobstores = {} |
|
|
|
self._jobstores_lock = Lock() |
|
|
|
self._listeners = [] |
|
|
|
self._listeners_lock = Lock() |
|
|
|
self._pending_jobs = [] |
|
|
|
self.configure(gconfig, **options) |
|
|
|
|
|
|
|
def configure(self, config): |
|
|
|
def configure(self, gconfig={}, **options): |
|
|
|
""" |
|
|
|
Updates the configuration with the given options. |
|
|
|
Reconfigures the scheduler with the given options. Can only be done |
|
|
|
when the scheduler isn't running. |
|
|
|
""" |
|
|
|
for key, val in config.items(): |
|
|
|
if key.startswith('apscheduler.'): |
|
|
|
key = key[12:] |
|
|
|
if key == 'misfire_grace_time': |
|
|
|
self.misfire_grace_time = int(val) |
|
|
|
elif key == 'daemonic': |
|
|
|
self.daemonic = asbool(val) |
|
|
|
if self.running: |
|
|
|
raise SchedulerAlreadyRunningError |
|
|
|
|
|
|
|
# Set general options |
|
|
|
config = combine_opts(gconfig, 'apscheduler.', options) |
|
|
|
self.misfire_grace_time = int(config.pop('misfire_grace_time', 1)) |
|
|
|
self.coalesce = asbool(config.pop('coalesce', True)) |
|
|
|
self.daemonic = asbool(config.pop('daemonic', True)) |
|
|
|
|
|
|
|
# Configure the thread pool |
|
|
|
if 'threadpool' in config: |
|
|
|
self._threadpool = maybe_ref(config['threadpool']) |
|
|
|
else: |
|
|
|
threadpool_opts = combine_opts(config, 'threadpool.') |
|
|
|
self._threadpool = ThreadPool(**threadpool_opts) |
|
|
|
|
|
|
|
# Configure job stores |
|
|
|
jobstore_opts = combine_opts(config, 'jobstore.') |
|
|
|
jobstores = {} |
|
|
|
for key, value in jobstore_opts.items(): |
|
|
|
store_name, option = key.split('.', 1) |
|
|
|
opts_dict = jobstores.setdefault(store_name, {}) |
|
|
|
opts_dict[option] = value |
|
|
|
|
|
|
|
for alias, opts in jobstores.items(): |
|
|
|
classname = opts.pop('class') |
|
|
|
cls = maybe_ref(classname) |
|
|
|
jobstore = cls(**opts) |
|
|
|
self.add_jobstore(jobstore, alias, True) |
|
|
|
|
|
|
|
def start(self): |
|
|
|
""" |
|
|
|
Starts the scheduler in a new thread. |
|
|
|
""" |
|
|
|
if self.thread and self.thread.isAlive(): |
|
|
|
if self.running: |
|
|
|
raise SchedulerAlreadyRunningError |
|
|
|
|
|
|
|
self.stopped = False |
|
|
|
self.thread = Thread(target=self.run, name='APScheduler') |
|
|
|
self.thread.setDaemon(self.daemonic) |
|
|
|
self.thread.start() |
|
|
|
logger.info('Scheduler started') |
|
|
|
|
|
|
|
def shutdown(self, timeout=0): |
|
|
|
# Create a RAMJobStore as the default if there is no default job store |
|
|
|
if not 'default' in self._jobstores: |
|
|
|
self.add_jobstore(RAMJobStore(), 'default', True) |
|
|
|
|
|
|
|
# Schedule all pending jobs |
|
|
|
for job, jobstore in self._pending_jobs: |
|
|
|
self._real_add_job(job, jobstore, False) |
|
|
|
del self._pending_jobs[:] |
|
|
|
|
|
|
|
self._stopped = False |
|
|
|
self._thread = Thread(target=self._main_loop, name='APScheduler') |
|
|
|
self._thread.setDaemon(self.daemonic) |
|
|
|
self._thread.start() |
|
|
|
|
|
|
|
def shutdown(self, wait=True, shutdown_threadpool=True): |
|
|
|
""" |
|
|
|
Shuts down the scheduler and terminates the thread. |
|
|
|
Does not terminate any currently running jobs. |
|
|
|
Does not interrupt any currently running jobs. |
|
|
|
|
|
|
|
:param timeout: time (in seconds) to wait for the scheduler thread to |
|
|
|
terminate, 0 to wait forever, None to skip waiting |
|
|
|
:param wait: ``True`` to wait until all currently executing jobs have |
|
|
|
finished (if ``shutdown_threadpool`` is also ``True``) |
|
|
|
:param shutdown_threadpool: ``True`` to shut down the thread pool |
|
|
|
""" |
|
|
|
if self.stopped or not self.thread.isAlive(): |
|
|
|
if not self.running: |
|
|
|
return |
|
|
|
|
|
|
|
logger.info('Scheduler shutting down') |
|
|
|
self.stopped = True |
|
|
|
self.wakeup.set() |
|
|
|
if timeout is not None: |
|
|
|
self.thread.join(timeout) |
|
|
|
self.jobs = [] |
|
|
|
self._stopped = True |
|
|
|
self._wakeup.set() |
|
|
|
|
|
|
|
# Shut down the thread pool |
|
|
|
if shutdown_threadpool: |
|
|
|
self._threadpool.shutdown(wait) |
|
|
|
|
|
|
|
# Wait until the scheduler thread terminates |
|
|
|
self._thread.join() |
|
|
|
|
|
|
|
def cron_schedule(self, year='*', month='*', day='*', week='*', |
|
|
|
day_of_week='*', hour='*', minute='*', second='*', |
|
|
|
args=None, kwargs=None): |
|
|
|
@property |
|
|
|
def running(self): |
|
|
|
return not self._stopped and self._thread and self._thread.isAlive() |
|
|
|
|
|
|
|
def add_jobstore(self, jobstore, alias, quiet=False): |
|
|
|
""" |
|
|
|
Decorator that causes its host function to be scheduled |
|
|
|
according to the given parameters. |
|
|
|
This decorator does not wrap its host function. |
|
|
|
The scheduled function will be called without any arguments. |
|
|
|
See :meth:`add_cron_job` for more information. |
|
|
|
Adds a job store to this scheduler. |
|
|
|
|
|
|
|
:param jobstore: job store to be added |
|
|
|
:param alias: alias for the job store |
|
|
|
:param quiet: True to suppress scheduler thread wakeup |
|
|
|
:type jobstore: instance of |
|
|
|
:class:`~apscheduler.jobstores.base.JobStore` |
|
|
|
:type alias: str |
|
|
|
""" |
|
|
|
def inner(func): |
|
|
|
self.add_cron_job(func, year, month, day, week, day_of_week, hour, |
|
|
|
minute, second, args, kwargs) |
|
|
|
return func |
|
|
|
return inner |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
if alias in self._jobstores: |
|
|
|
raise KeyError('Alias "%s" is already in use' % alias) |
|
|
|
self._jobstores[alias] = jobstore |
|
|
|
jobstore.load_jobs() |
|
|
|
finally: |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
# Notify listeners that a new job store has been added |
|
|
|
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias)) |
|
|
|
|
|
|
|
def interval_schedule(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, |
|
|
|
start_date=None, repeat=0, args=None, kwargs=None): |
|
|
|
# Notify the scheduler so it can scan the new job store for jobs |
|
|
|
if not quiet: |
|
|
|
self._wakeup.set() |
|
|
|
|
|
|
|
def remove_jobstore(self, alias): |
|
|
|
""" |
|
|
|
Decorator that causes its host function to be scheduled |
|
|
|
for execution on specified intervals. |
|
|
|
This decorator does not wrap its host function. |
|
|
|
The scheduled function will be called without any arguments. |
|
|
|
Note that the default repeat value is 0, which means to repeat forever. |
|
|
|
See :meth:`add_delayed_job` for more information. |
|
|
|
Removes the job store by the given alias from this scheduler. |
|
|
|
|
|
|
|
:type alias: str |
|
|
|
""" |
|
|
|
def inner(func): |
|
|
|
self.add_interval_job(func, weeks, days, hours, minutes, seconds, |
|
|
|
start_date, repeat, args, kwargs) |
|
|
|
return func |
|
|
|
return inner |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
try: |
|
|
|
del self._jobstores[alias] |
|
|
|
except KeyError: |
|
|
|
raise KeyError('No such job store: %s' % alias) |
|
|
|
finally: |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
def _add_job(self, trigger, func, args, kwargs): |
|
|
|
# Notify listeners that a job store has been removed |
|
|
|
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias)) |
|
|
|
|
|
|
|
def add_listener(self, callback, mask=EVENT_ALL): |
|
|
|
""" |
|
|
|
Adds a Job to the job list and notifies the scheduler thread. |
|
|
|
Adds a listener for scheduler events. When a matching event occurs, |
|
|
|
``callback`` is executed with the event object as its sole argument. |
|
|
|
If the ``mask`` parameter is not provided, the callback will receive |
|
|
|
events of all types. |
|
|
|
|
|
|
|
:param trigger: trigger for the given callable |
|
|
|
:param args: list of positional arguments to call func with |
|
|
|
:param kwargs: dict of keyword arguments to call func with |
|
|
|
:return: the scheduled job |
|
|
|
:rtype: Job |
|
|
|
:param callback: any callable that takes one argument |
|
|
|
:param mask: bitmask that indicates which events should be listened to |
|
|
|
""" |
|
|
|
if self.stopped: |
|
|
|
raise SchedulerShutdownError |
|
|
|
if not hasattr(func, '__call__'): |
|
|
|
raise TypeError('func must be callable') |
|
|
|
self._listeners_lock.acquire() |
|
|
|
try: |
|
|
|
self._listeners.append((callback, mask)) |
|
|
|
finally: |
|
|
|
self._listeners_lock.release() |
|
|
|
|
|
|
|
def remove_listener(self, callback): |
|
|
|
""" |
|
|
|
Removes a previously added event listener. |
|
|
|
""" |
|
|
|
self._listeners_lock.acquire() |
|
|
|
try: |
|
|
|
for i, (cb, _) in enumerate(self._listeners): |
|
|
|
if callback == cb: |
|
|
|
del self._listeners[i] |
|
|
|
finally: |
|
|
|
self._listeners_lock.release() |
|
|
|
|
|
|
|
def _notify_listeners(self, event): |
|
|
|
self._listeners_lock.acquire() |
|
|
|
try: |
|
|
|
listeners = tuple(self._listeners) |
|
|
|
finally: |
|
|
|
self._listeners_lock.release() |
|
|
|
|
|
|
|
if args is None: |
|
|
|
args = [] |
|
|
|
if kwargs is None: |
|
|
|
kwargs = {} |
|
|
|
for cb, mask in listeners: |
|
|
|
if event.code & mask: |
|
|
|
try: |
|
|
|
cb(event) |
|
|
|
except: |
|
|
|
logger.exception('Error notifying listener') |
|
|
|
|
|
|
|
job = Job(trigger, func, args, kwargs) |
|
|
|
self.jobs_lock.acquire() |
|
|
|
def _real_add_job(self, job, jobstore, wakeup): |
|
|
|
job.compute_next_run_time(datetime.now()) |
|
|
|
if not job.next_run_time: |
|
|
|
raise ValueError('Not adding job since it would never be run') |
|
|
|
|
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
self.jobs.append(job) |
|
|
|
try: |
|
|
|
store = self._jobstores[jobstore] |
|
|
|
except KeyError: |
|
|
|
raise KeyError('No such job store: %s' % jobstore) |
|
|
|
store.add_job(job) |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
logger.info('Added job "%s"', job) |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
# Notify listeners that a new job has been added |
|
|
|
event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job) |
|
|
|
self._notify_listeners(event) |
|
|
|
|
|
|
|
logger.info('Added job "%s" to job store "%s"', job, jobstore) |
|
|
|
|
|
|
|
# Notify the scheduler about the new job |
|
|
|
self.wakeup.set() |
|
|
|
if wakeup: |
|
|
|
self._wakeup.set() |
|
|
|
|
|
|
|
def add_job(self, trigger, func, args, kwargs, jobstore='default', |
|
|
|
**options): |
|
|
|
""" |
|
|
|
Adds the given job to the job list and notifies the scheduler thread. |
|
|
|
|
|
|
|
:param trigger: alias of the job store to store the job in |
|
|
|
:param func: callable to run at the given time |
|
|
|
:param args: list of positional arguments to call func with |
|
|
|
:param kwargs: dict of keyword arguments to call func with |
|
|
|
:param jobstore: alias of the job store to store the job in |
|
|
|
:rtype: :class:`~apscheduler.job.Job` |
|
|
|
""" |
|
|
|
job = Job(trigger, func, args or [], kwargs or {}, |
|
|
|
options.pop('misfire_grace_time', self.misfire_grace_time), |
|
|
|
options.pop('coalesce', self.coalesce), **options) |
|
|
|
if not self.running: |
|
|
|
self._pending_jobs.append((job, jobstore)) |
|
|
|
logger.info('Adding job tentatively -- it will be properly ' |
|
|
|
'scheduled when the scheduler starts') |
|
|
|
else: |
|
|
|
self._real_add_job(job, jobstore, True) |
|
|
|
return job |
|
|
|
|
|
|
|
def add_date_job(self, func, date, args=None, kwargs=None): |
|
|
|
def _remove_job(self, job, alias, jobstore): |
|
|
|
jobstore.remove_job(job) |
|
|
|
|
|
|
|
# Notify listeners that a job has been removed |
|
|
|
event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job) |
|
|
|
self._notify_listeners(event) |
|
|
|
|
|
|
|
logger.info('Removed job "%s"', job) |
|
|
|
|
|
|
|
def add_date_job(self, func, date, args=None, kwargs=None, **options): |
|
|
|
""" |
|
|
|
Adds a job to be completed on a specific date and time. |
|
|
|
Schedules a job to be completed on a specific date and time. |
|
|
|
|
|
|
|
:param func: callable to run |
|
|
|
:param args: positional arguments to call func with |
|
|
|
:param kwargs: keyword arguments to call func with |
|
|
|
:param func: callable to run at the given time |
|
|
|
:param date: the date/time to run the job at |
|
|
|
:param name: name of the job |
|
|
|
:param jobstore: stored the job in the named (or given) job store |
|
|
|
:param misfire_grace_time: seconds after the designated run time that |
|
|
|
the job is still allowed to be run |
|
|
|
:type date: :class:`datetime.date` |
|
|
|
:rtype: :class:`~apscheduler.job.Job` |
|
|
|
""" |
|
|
|
trigger = DateTrigger(date) |
|
|
|
return self._add_job(trigger, func, args, kwargs) |
|
|
|
trigger = SimpleTrigger(date) |
|
|
|
return self.add_job(trigger, func, args, kwargs, **options) |
|
|
|
|
|
|
|
def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0, |
|
|
|
seconds=0, start_date=None, repeat=0, args=None, |
|
|
|
kwargs=None): |
|
|
|
seconds=0, start_date=None, args=None, kwargs=None, |
|
|
|
**options): |
|
|
|
""" |
|
|
|
Adds a job to be completed on specified intervals. |
|
|
|
Schedules a job to be completed on specified intervals. |
|
|
|
|
|
|
|
:param func: callable to run |
|
|
|
:param weeks: number of weeks to wait |
|
|
@ -230,25 +303,29 @@ class Scheduler(object): |
|
|
|
:param seconds: number of seconds to wait |
|
|
|
:param start_date: when to first execute the job and start the |
|
|
|
counter (default is after the given interval) |
|
|
|
:param repeat: number of times the job will be run (0 = repeat |
|
|
|
indefinitely) |
|
|
|
:param args: list of positional arguments to call func with |
|
|
|
:param kwargs: dict of keyword arguments to call func with |
|
|
|
:param name: name of the job |
|
|
|
:param jobstore: alias of the job store to add the job to |
|
|
|
:param misfire_grace_time: seconds after the designated run time that |
|
|
|
the job is still allowed to be run |
|
|
|
:rtype: :class:`~apscheduler.job.Job` |
|
|
|
""" |
|
|
|
interval = timedelta(weeks=weeks, days=days, hours=hours, |
|
|
|
minutes=minutes, seconds=seconds) |
|
|
|
trigger = IntervalTrigger(interval, repeat, start_date) |
|
|
|
return self._add_job(trigger, func, args, kwargs) |
|
|
|
trigger = IntervalTrigger(interval, start_date) |
|
|
|
return self.add_job(trigger, func, args, kwargs, **options) |
|
|
|
|
|
|
|
def add_cron_job(self, func, year='*', month='*', day='*', week='*', |
|
|
|
day_of_week='*', hour='*', minute='*', second='*', |
|
|
|
args=None, kwargs=None): |
|
|
|
def add_cron_job(self, func, year=None, month=None, day=None, week=None, |
|
|
|
day_of_week=None, hour=None, minute=None, second=None, |
|
|
|
start_date=None, args=None, kwargs=None, **options): |
|
|
|
""" |
|
|
|
Adds a job to be completed on times that match the given expressions. |
|
|
|
Schedules a job to be completed on times that match the given |
|
|
|
expressions. |
|
|
|
|
|
|
|
:param func: callable to run |
|
|
|
:param year: year to run on |
|
|
|
:param month: month to run on (0 = January) |
|
|
|
:param month: month to run on |
|
|
|
:param day: day of month to run on |
|
|
|
:param week: week of the year to run on |
|
|
|
:param day_of_week: weekday to run on (0 = Monday) |
|
|
@ -256,152 +333,227 @@ class Scheduler(object): |
|
|
|
:param second: second to run on |
|
|
|
:param args: list of positional arguments to call func with |
|
|
|
:param kwargs: dict of keyword arguments to call func with |
|
|
|
:param name: name of the job |
|
|
|
:param jobstore: alias of the job store to add the job to |
|
|
|
:param misfire_grace_time: seconds after the designated run time that |
|
|
|
the job is still allowed to be run |
|
|
|
:return: the scheduled job |
|
|
|
:rtype: Job |
|
|
|
:rtype: :class:`~apscheduler.job.Job` |
|
|
|
""" |
|
|
|
trigger = CronTrigger(year=year, month=month, day=day, week=week, |
|
|
|
day_of_week=day_of_week, hour=hour, |
|
|
|
minute=minute, second=second) |
|
|
|
return self._add_job(trigger, func, args, kwargs) |
|
|
|
minute=minute, second=second, |
|
|
|
start_date=start_date) |
|
|
|
return self.add_job(trigger, func, args, kwargs, **options) |
|
|
|
|
|
|
|
def cron_schedule(self, **options): |
|
|
|
""" |
|
|
|
Decorator version of :meth:`add_cron_job`. |
|
|
|
This decorator does not wrap its host function. |
|
|
|
Unscheduling decorated functions is possible by passing the ``job`` |
|
|
|
attribute of the scheduled function to :meth:`unschedule_job`. |
|
|
|
""" |
|
|
|
def inner(func): |
|
|
|
func.job = self.add_cron_job(func, **options) |
|
|
|
return func |
|
|
|
return inner |
|
|
|
|
|
|
|
def interval_schedule(self, **options): |
|
|
|
""" |
|
|
|
Decorator version of :meth:`add_interval_job`. |
|
|
|
This decorator does not wrap its host function. |
|
|
|
Unscheduling decorated functions is possible by passing the ``job`` |
|
|
|
attribute of the scheduled function to :meth:`unschedule_job`. |
|
|
|
""" |
|
|
|
def inner(func): |
|
|
|
func.job = self.add_interval_job(func, **options) |
|
|
|
return func |
|
|
|
return inner |
|
|
|
|
|
|
|
def is_job_active(self, job): |
|
|
|
def get_jobs(self): |
|
|
|
""" |
|
|
|
Determines if the given job is still on the job list. |
|
|
|
Returns a list of all scheduled jobs. |
|
|
|
|
|
|
|
:return: True if the job is still active, False if not |
|
|
|
:return: list of :class:`~apscheduler.job.Job` objects |
|
|
|
""" |
|
|
|
self.jobs_lock.acquire() |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
return job in self.jobs |
|
|
|
jobs = [] |
|
|
|
for jobstore in itervalues(self._jobstores): |
|
|
|
jobs.extend(jobstore.jobs) |
|
|
|
return jobs |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
def unschedule_job(self, job): |
|
|
|
""" |
|
|
|
Removes a job, preventing it from being fired any more. |
|
|
|
Removes a job, preventing it from being run any more. |
|
|
|
""" |
|
|
|
self.jobs_lock.acquire() |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
self.jobs.remove(job) |
|
|
|
for alias, jobstore in iteritems(self._jobstores): |
|
|
|
if job in list(jobstore.jobs): |
|
|
|
self._remove_job(job, alias, jobstore) |
|
|
|
return |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
logger.info('Removed job "%s"', job) |
|
|
|
self.wakeup.set() |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
raise KeyError('Job "%s" is not scheduled in any job store' % job) |
|
|
|
|
|
|
|
def unschedule_func(self, func): |
|
|
|
""" |
|
|
|
Removes all jobs that would execute the given function. |
|
|
|
""" |
|
|
|
self.jobs_lock.acquire() |
|
|
|
found = False |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
remove_list = [job for job in self.jobs if job.func == func] |
|
|
|
for job in remove_list: |
|
|
|
self.jobs.remove(job) |
|
|
|
logger.info('Removed job "%s"', job) |
|
|
|
for alias, jobstore in iteritems(self._jobstores): |
|
|
|
for job in list(jobstore.jobs): |
|
|
|
if job.func == func: |
|
|
|
self._remove_job(job, alias, jobstore) |
|
|
|
found = True |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
# Have the scheduler calculate a new wakeup time |
|
|
|
self.wakeup.set() |
|
|
|
if not found: |
|
|
|
raise KeyError('The given function is not scheduled in this ' |
|
|
|
'scheduler') |
|
|
|
|
|
|
|
def dump_jobs(self): |
|
|
|
def print_jobs(self, out=None): |
|
|
|
""" |
|
|
|
Gives a textual listing of all jobs currently scheduled on this |
|
|
|
Prints out a textual listing of all jobs currently scheduled on this |
|
|
|
scheduler. |
|
|
|
|
|
|
|
:rtype: str |
|
|
|
:param out: a file-like object to print to (defaults to **sys.stdout** |
|
|
|
if nothing is given) |
|
|
|
""" |
|
|
|
out = out or sys.stdout |
|
|
|
job_strs = [] |
|
|
|
now = datetime.now() |
|
|
|
self.jobs_lock.acquire() |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
for job in self.jobs: |
|
|
|
next_fire_time = job.trigger.get_next_fire_time(now) |
|
|
|
job_str = '%s (next fire time: %s)' % (str(job), |
|
|
|
next_fire_time) |
|
|
|
job_strs.append(job_str) |
|
|
|
for alias, jobstore in iteritems(self._jobstores): |
|
|
|
job_strs.append('Jobstore %s:' % alias) |
|
|
|
if jobstore.jobs: |
|
|
|
for job in jobstore.jobs: |
|
|
|
job_strs.append(' %s' % job) |
|
|
|
else: |
|
|
|
job_strs.append(' No scheduled jobs') |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
if job_strs: |
|
|
|
return os.linesep.join(job_strs) |
|
|
|
return 'No jobs currently scheduled.' |
|
|
|
out.write(os.linesep.join(job_strs) + os.linesep) |
|
|
|
|
|
|
|
def _get_next_wakeup_time(self, now): |
|
|
|
def _run_job(self, job, run_times): |
|
|
|
""" |
|
|
|
Determines the time of the next job execution, and removes finished |
|
|
|
jobs. |
|
|
|
|
|
|
|
:param now: the result of datetime.now(), generated elsewhere for |
|
|
|
consistency. |
|
|
|
Acts as a harness that runs the actual job code in a thread. |
|
|
|
""" |
|
|
|
next_wakeup = None |
|
|
|
finished_jobs = [] |
|
|
|
|
|
|
|
self.jobs_lock.acquire() |
|
|
|
try: |
|
|
|
for job in self.jobs: |
|
|
|
next_run = job.trigger.get_next_fire_time(now) |
|
|
|
if next_run is None: |
|
|
|
finished_jobs.append(job) |
|
|
|
elif next_run and (next_wakeup is None or \ |
|
|
|
next_run < next_wakeup): |
|
|
|
next_wakeup = next_run |
|
|
|
|
|
|
|
# Clear out any finished jobs |
|
|
|
for job in finished_jobs: |
|
|
|
self.jobs.remove(job) |
|
|
|
logger.info('Removed finished job "%s"', job) |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
|
|
|
|
return next_wakeup |
|
|
|
|
|
|
|
def _get_current_jobs(self): |
|
|
|
""" |
|
|
|
Determines which jobs should be executed right now. |
|
|
|
""" |
|
|
|
current_jobs = [] |
|
|
|
now = datetime.now() |
|
|
|
start = now - timedelta(seconds=self.misfire_grace_time) |
|
|
|
|
|
|
|
self.jobs_lock.acquire() |
|
|
|
for run_time in run_times: |
|
|
|
# See if the job missed its run time window, and handle possible |
|
|
|
# misfires accordingly |
|
|
|
difference = datetime.now() - run_time |
|
|
|
grace_time = timedelta(seconds=job.misfire_grace_time) |
|
|
|
if difference > grace_time: |
|
|
|
# Notify listeners about a missed run |
|
|
|
event = JobEvent(EVENT_JOB_MISSED, job, run_time) |
|
|
|
self._notify_listeners(event) |
|
|
|
logger.warning('Run time of job "%s" was missed by %s', |
|
|
|
job, difference) |
|
|
|
else: |
|
|
|
try: |
|
|
|
job.add_instance() |
|
|
|
except MaxInstancesReachedError: |
|
|
|
event = JobEvent(EVENT_JOB_MISSED, job, run_time) |
|
|
|
self._notify_listeners(event) |
|
|
|
logger.warning('Execution of job "%s" skipped: ' |
|
|
|
'maximum number of running instances ' |
|
|
|
'reached (%d)', job, job.max_instances) |
|
|
|
break |
|
|
|
|
|
|
|
logger.info('Running job "%s" (scheduled at %s)', job, |
|
|
|
run_time) |
|
|
|
|
|
|
|
try: |
|
|
|
retval = job.func(*job.args, **job.kwargs) |
|
|
|
except: |
|
|
|
# Notify listeners about the exception |
|
|
|
exc, tb = sys.exc_info()[1:] |
|
|
|
event = JobEvent(EVENT_JOB_ERROR, job, run_time, |
|
|
|
exception=exc, traceback=tb) |
|
|
|
self._notify_listeners(event) |
|
|
|
|
|
|
|
logger.exception('Job "%s" raised an exception', job) |
|
|
|
else: |
|
|
|
# Notify listeners about successful execution |
|
|
|
event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, |
|
|
|
retval=retval) |
|
|
|
self._notify_listeners(event) |
|
|
|
|
|
|
|
logger.info('Job "%s" executed successfully', job) |
|
|
|
|
|
|
|
job.remove_instance() |
|
|
|
|
|
|
|
# If coalescing is enabled, don't attempt any further runs |
|
|
|
if job.coalesce: |
|
|
|
break |
|
|
|
|
|
|
|
def _process_jobs(self, now): |
|
|
|
""" |
|
|
|
Iterates through jobs in every jobstore, starts pending jobs |
|
|
|
and figures out the next wakeup time. |
|
|
|
""" |
|
|
|
next_wakeup_time = None |
|
|
|
self._jobstores_lock.acquire() |
|
|
|
try: |
|
|
|
for job in self.jobs: |
|
|
|
next_run = job.trigger.get_next_fire_time(start) |
|
|
|
if next_run: |
|
|
|
time_diff = time_difference(now, next_run) |
|
|
|
if next_run < now and time_diff <= self.misfire_grace_time: |
|
|
|
current_jobs.append(job) |
|
|
|
for alias, jobstore in iteritems(self._jobstores): |
|
|
|
for job in tuple(jobstore.jobs): |
|
|
|
run_times = job.get_run_times(now) |
|
|
|
if run_times: |
|
|
|
self._threadpool.submit(self._run_job, job, run_times) |
|
|
|
|
|
|
|
# Increase the job's run count |
|
|
|
if job.coalesce: |
|
|
|
job.runs += 1 |
|
|
|
else: |
|
|
|
job.runs += len(run_times) |
|
|
|
|
|
|
|
# Update the job, but don't keep finished jobs around |
|
|
|
if job.compute_next_run_time(now + timedelta(microseconds=1)): |
|
|
|
jobstore.update_job(job) |
|
|
|
else: |
|
|
|
self._remove_job(job, alias, jobstore) |
|
|
|
|
|
|
|
if not next_wakeup_time: |
|
|
|
next_wakeup_time = job.next_run_time |
|
|
|
elif job.next_run_time: |
|
|
|
next_wakeup_time = min(next_wakeup_time, |
|
|
|
job.next_run_time) |
|
|
|
return next_wakeup_time |
|
|
|
finally: |
|
|
|
self.jobs_lock.release() |
|
|
|
self._jobstores_lock.release() |
|
|
|
|
|
|
|
return current_jobs |
|
|
|
def _main_loop(self): |
|
|
|
"""Executes jobs on schedule.""" |
|
|
|
|
|
|
|
def run(self): |
|
|
|
""" |
|
|
|
Runs the main loop of the scheduler. |
|
|
|
""" |
|
|
|
self.wakeup.clear() |
|
|
|
while not self.stopped: |
|
|
|
# Execute any jobs scheduled to be run right now |
|
|
|
for job in self._get_current_jobs(): |
|
|
|
logger.debug('Executing job "%s"', job) |
|
|
|
job.run() |
|
|
|
logger.info('Scheduler started') |
|
|
|
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) |
|
|
|
|
|
|
|
# Figure out when the next job should be run, and |
|
|
|
# adjust the wait time accordingly |
|
|
|
self._wakeup.clear() |
|
|
|
while not self._stopped: |
|
|
|
logger.debug('Looking for jobs to run') |
|
|
|
now = datetime.now() |
|
|
|
next_wakeup_time = self._get_next_wakeup_time(now) |
|
|
|
next_wakeup_time = self._process_jobs(now) |
|
|
|
|
|
|
|
# Sleep until the next job is scheduled to be run, |
|
|
|
# or a new job is added, or the scheduler is stopped |
|
|
|
# a new job is added or the scheduler is stopped |
|
|
|
if next_wakeup_time is not None: |
|
|
|
wait_seconds = time_difference(next_wakeup_time, now) |
|
|
|
logger.debug('Next wakeup is due at %s (in %f seconds)', |
|
|
|
next_wakeup_time, wait_seconds) |
|
|
|
self.wakeup.wait(wait_seconds) |
|
|
|
self._wakeup.wait(wait_seconds) |
|
|
|
else: |
|
|
|
logger.debug('No jobs; waiting until a job is added') |
|
|
|
self.wakeup.wait() |
|
|
|
self.wakeup.clear() |
|
|
|
self._wakeup.wait() |
|
|
|
self._wakeup.clear() |
|
|
|
|
|
|
|
logger.info('Scheduler has been shut down') |
|
|
|
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) |
|
|
|