From 3a6ae1bd41cf36282a66ffa667acd4feb8b5998b Mon Sep 17 00:00:00 2001 From: Ruud Date: Sat, 11 Feb 2012 15:30:24 +0100 Subject: [PATCH] Scheduler fix --- couchpotato/core/_base/scheduler/main.py | 10 +- libs/apscheduler/__init__.py | 3 + libs/apscheduler/events.py | 64 +++ libs/apscheduler/expressions.py | 176 ------- libs/apscheduler/fields.py | 92 ---- libs/apscheduler/job.py | 134 +++++ libs/apscheduler/jobstores/__init__.py | 0 libs/apscheduler/jobstores/base.py | 25 + libs/apscheduler/jobstores/mongodb_store.py | 84 +++ libs/apscheduler/jobstores/ram_store.py | 25 + libs/apscheduler/jobstores/shelve_store.py | 65 +++ libs/apscheduler/jobstores/sqlalchemy_store.py | 87 ++++ libs/apscheduler/scheduler.py | 684 +++++++++++++++---------- libs/apscheduler/threadpool.py | 133 +++++ libs/apscheduler/triggers.py | 171 ------- libs/apscheduler/triggers/__init__.py | 3 + libs/apscheduler/triggers/cron/__init__.py | 142 +++++ libs/apscheduler/triggers/cron/expressions.py | 178 +++++++ libs/apscheduler/triggers/cron/fields.py | 99 ++++ libs/apscheduler/triggers/interval.py | 39 ++ libs/apscheduler/triggers/simple.py | 17 + libs/apscheduler/util.py | 165 +++++- 22 files changed, 1672 insertions(+), 724 deletions(-) create mode 100644 libs/apscheduler/events.py delete mode 100644 libs/apscheduler/expressions.py delete mode 100644 libs/apscheduler/fields.py create mode 100644 libs/apscheduler/job.py create mode 100644 libs/apscheduler/jobstores/__init__.py create mode 100644 libs/apscheduler/jobstores/base.py create mode 100644 libs/apscheduler/jobstores/mongodb_store.py create mode 100644 libs/apscheduler/jobstores/ram_store.py create mode 100644 libs/apscheduler/jobstores/shelve_store.py create mode 100644 libs/apscheduler/jobstores/sqlalchemy_store.py create mode 100644 libs/apscheduler/threadpool.py delete mode 100644 libs/apscheduler/triggers.py create mode 100644 libs/apscheduler/triggers/__init__.py create mode 100644 libs/apscheduler/triggers/cron/__init__.py create 
mode 100644 libs/apscheduler/triggers/cron/expressions.py create mode 100644 libs/apscheduler/triggers/cron/fields.py create mode 100644 libs/apscheduler/triggers/interval.py create mode 100644 libs/apscheduler/triggers/simple.py diff --git a/couchpotato/core/_base/scheduler/main.py b/couchpotato/core/_base/scheduler/main.py index 8adaa05..896e1ca 100644 --- a/couchpotato/core/_base/scheduler/main.py +++ b/couchpotato/core/_base/scheduler/main.py @@ -15,8 +15,7 @@ class Scheduler(Plugin): def __init__(self): - sl = logging.getLogger('apscheduler.scheduler') - sl.disabled = True + logging.getLogger('apscheduler').setLevel(logging.WARNING) addEvent('schedule.cron', self.cron) addEvent('schedule.interval', self.interval) @@ -55,7 +54,7 @@ class Scheduler(Plugin): try: self.remove(identifier) interval = self.intervals[identifier] - job = self.sched.add_interval_job(interval['handle'], hours = interval['hours'], minutes = interval['minutes'], seconds = interval['seconds'], repeat = interval['repeat']) + job = self.sched.add_interval_job(interval['handle'], hours = interval['hours'], minutes = interval['minutes'], seconds = interval['seconds']) interval['job'] = job except ValueError, e: log.error("Failed adding interval cronjob: %s" % e) @@ -88,13 +87,12 @@ class Scheduler(Plugin): 'minute': minute, } - def interval(self, identifier = '', handle = None, hours = 0, minutes = 0, seconds = 0, repeat = 0): - log.info('Scheduling %s, interval: hours = %s, minutes = %s, seconds = %s, repeat = %s' % (identifier, hours, minutes, seconds, repeat)) + def interval(self, identifier = '', handle = None, hours = 0, minutes = 0, seconds = 0): + log.info('Scheduling %s, interval: hours = %s, minutes = %s, seconds = %s' % (identifier, hours, minutes, seconds)) self.remove(identifier) self.intervals[identifier] = { 'handle': handle, - 'repeat': repeat, 'hours': hours, 'minutes': minutes, 'seconds': seconds, diff --git a/libs/apscheduler/__init__.py b/libs/apscheduler/__init__.py index 
e69de29..a55959f 100644 --- a/libs/apscheduler/__init__.py +++ b/libs/apscheduler/__init__.py @@ -0,0 +1,3 @@ +version_info = (2, 0, 2) +version = '.'.join(str(n) for n in version_info[:3]) +release = version + ''.join(str(n) for n in version_info[3:]) diff --git a/libs/apscheduler/events.py b/libs/apscheduler/events.py new file mode 100644 index 0000000..80bde8e --- /dev/null +++ b/libs/apscheduler/events.py @@ -0,0 +1,64 @@ +__all__ = ('EVENT_SCHEDULER_START', 'EVENT_SCHEDULER_SHUTDOWN', + 'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', + 'EVENT_JOBSTORE_JOB_ADDED', 'EVENT_JOBSTORE_JOB_REMOVED', + 'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', + 'EVENT_ALL', 'SchedulerEvent', 'JobStoreEvent', 'JobEvent') + + +EVENT_SCHEDULER_START = 1 # The scheduler was started +EVENT_SCHEDULER_SHUTDOWN = 2 # The scheduler was shut down +EVENT_JOBSTORE_ADDED = 4 # A job store was added to the scheduler +EVENT_JOBSTORE_REMOVED = 8 # A job store was removed from the scheduler +EVENT_JOBSTORE_JOB_ADDED = 16 # A job was added to a job store +EVENT_JOBSTORE_JOB_REMOVED = 32 # A job was removed from a job store +EVENT_JOB_EXECUTED = 64 # A job was executed successfully +EVENT_JOB_ERROR = 128 # A job raised an exception during execution +EVENT_JOB_MISSED = 256 # A job's execution was missed +EVENT_ALL = (EVENT_SCHEDULER_START | EVENT_SCHEDULER_SHUTDOWN | + EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED | + EVENT_JOBSTORE_JOB_ADDED | EVENT_JOBSTORE_JOB_REMOVED | + EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED) + + +class SchedulerEvent(object): + """ + An event that concerns the scheduler itself. + + :var code: the type code of this event + """ + def __init__(self, code): + self.code = code + + +class JobStoreEvent(SchedulerEvent): + """ + An event that concerns job stores. 
+ + :var alias: the alias of the job store involved + :var job: the new job if a job was added + """ + def __init__(self, code, alias, job=None): + SchedulerEvent.__init__(self, code) + self.alias = alias + if job: + self.job = job + + +class JobEvent(SchedulerEvent): + """ + An event that concerns the execution of individual jobs. + + :var job: the job instance in question + :var scheduled_run_time: the time when the job was scheduled to be run + :var retval: the return value of the successfully executed job + :var exception: the exception raised by the job + :var traceback: the traceback object associated with the exception + """ + def __init__(self, code, job, scheduled_run_time, retval=None, + exception=None, traceback=None): + SchedulerEvent.__init__(self, code) + self.job = job + self.scheduled_run_time = scheduled_run_time + self.retval = retval + self.exception = exception + self.traceback = traceback diff --git a/libs/apscheduler/expressions.py b/libs/apscheduler/expressions.py deleted file mode 100644 index d19754d..0000000 --- a/libs/apscheduler/expressions.py +++ /dev/null @@ -1,176 +0,0 @@ -""" -This module contains the expressions applicable for CronTrigger's fields. 
-""" -from calendar import monthrange -import re - -from apscheduler.util import asint - -__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression', - 'WeekdayPositionExpression') - -WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] - - -class AllExpression(object): - value_re = re.compile(r'\*(?:/(?P\d+))?$') - - def __init__(self, step=None): - self.step = asint(step) - if self.step == 0: - raise ValueError('Increment must be higher than 0') - - def get_next_value(self, date, field): - start = field.get_value(date) - minval = field.get_min(date) - maxval = field.get_max(date) - start = max(start, minval) - - if not self.step: - next = start - else: - distance_to_next = (self.step - (start - minval)) % self.step - next = start + distance_to_next - - if next <= maxval: - return next - - def __str__(self): - if self.step: - return '*/%d' % self.step - return '*' - - def __repr__(self): - return "%s(%s)" % (self.__class__.__name__, self.step) - - -class RangeExpression(AllExpression): - value_re = re.compile( - r'(?P\d+)(?:-(?P\d+))?(?:/(?P\d+))?$') - - def __init__(self, first, last=None, step=None): - AllExpression.__init__(self, step) - first = asint(first) - last = asint(last) - if last is None and step is None: - last = first - if last is not None and first > last: - raise ValueError('The minimum value in a range must not be ' - 'higher than the maximum') - self.first = first - self.last = last - - def get_next_value(self, date, field): - start = field.get_value(date) - minval = field.get_min(date) - maxval = field.get_max(date) - - # Apply range limits - minval = max(minval, self.first) - if self.last is not None: - maxval = min(maxval, self.last) - start = max(start, minval) - - if not self.step: - next = start - else: - distance_to_next = (self.step - (start - minval)) % self.step - next = start + distance_to_next - - if next <= maxval: - return next - - def __str__(self): - if self.last != self.first and self.last is not None: - 
range = '%d-%d' % (self.first, self.last) - else: - range = str(self.first) - - if self.step: - return '%s/%d' % (range, self.step) - return range - - def __repr__(self): - args = [str(self.first)] - if self.last != self.first and self.last is not None or self.step: - args.append(str(self.last)) - if self.step: - args.append(str(self.step)) - return "%s(%s)" % (self.__class__.__name__, ', '.join(args)) - - -class WeekdayRangeExpression(RangeExpression): - value_re = re.compile(r'(?P[a-z]+)(?:-(?P[a-z]+))?', - re.IGNORECASE) - - def __init__(self, first, last=None): - try: - first_num = WEEKDAYS.index(first.lower()) - except ValueError: - raise ValueError('Invalid weekday name "%s"' % first) - - if last: - try: - last_num = WEEKDAYS.index(last.lower()) - except ValueError: - raise ValueError('Invalid weekday name "%s"' % last) - else: - last_num = None - - RangeExpression.__init__(self, first_num, last_num) - - def __str__(self): - if self.last != self.first and self.last is not None: - return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last]) - return WEEKDAYS[self.first] - - def __repr__(self): - args = ["'%s'" % WEEKDAYS[self.first]] - if self.last != self.first and self.last is not None: - args.append("'%s'" % WEEKDAYS[self.last]) - return "%s(%s)" % (self.__class__.__name__, ', '.join(args)) - - -class WeekdayPositionExpression(AllExpression): - options = ['1st', '2nd', '3rd', '4th', '5th', 'last'] - value_re = re.compile(r'(?P%s) +(?P(?:\d+|\w+))' - % '|'.join(options), re.IGNORECASE) - - def __init__(self, option_name, weekday_name): - try: - self.option_num = self.options.index(option_name.lower()) - except ValueError: - raise ValueError('Invalid weekday position "%s"' % option_name) - - try: - self.weekday = WEEKDAYS.index(weekday_name.lower()) - except ValueError: - raise ValueError('Invalid weekday name "%s"' % weekday_name) - - def get_next_value(self, date, field): - # Figure out the weekday of the month's first day and the number - # of days in that 
month - first_day_wday, last_day = monthrange(date.year, date.month) - - # Calculate which day of the month is the first of the target weekdays - first_hit_day = self.weekday - first_day_wday + 1 - if first_hit_day <= 0: - first_hit_day += 7 - - # Calculate what day of the month the target weekday would be - if self.option_num < 5: - target_day = first_hit_day + self.option_num * 7 - else: - target_day = first_hit_day + ((last_day - first_hit_day) / 7) * 7 - - if target_day <= last_day and target_day >= date.day: - return target_day - - def __str__(self): - return '%s %s' % (self.options[self.option_num], - WEEKDAYS[self.weekday]) - - def __repr__(self): - return "%s('%s', '%s')" % (self.__class__.__name__, - self.options[self.option_num], - WEEKDAYS[self.weekday]) diff --git a/libs/apscheduler/fields.py b/libs/apscheduler/fields.py deleted file mode 100644 index ebc35f5..0000000 --- a/libs/apscheduler/fields.py +++ /dev/null @@ -1,92 +0,0 @@ -""" -Fields represent :class:`~apscheduler.triggers.CronTrigger` options which map -to :class:`~datetime.datetime` fields. 
-""" -from calendar import monthrange - -from apscheduler.expressions import * - -__all__ = ('BaseField', 'WeekField', 'DayOfMonthField', 'DayOfWeekField') - -MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, - 'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0} -MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day:': 31, 'week': 53, - 'day_of_week': 6, 'hour': 23, 'minute': 59, 'second': 59} - -class BaseField(object): - REAL = True - COMPILERS = [AllExpression, RangeExpression] - - def __init__(self, name, exprs): - self.name = name - self.compile_expressions(exprs) - - def get_min(self, dateval): - return MIN_VALUES[self.name] - - def get_max(self, dateval): - return MAX_VALUES[self.name] - - def get_value(self, dateval): - return getattr(dateval, self.name) - - def get_next_value(self, dateval): - smallest = None - for expr in self.expressions: - value = expr.get_next_value(dateval, self) - if smallest is None or (value is not None and value < smallest): - smallest = value - - return smallest - - def compile_expressions(self, exprs): - self.expressions = [] - - # Split a comma-separated expression list, if any - exprs = str(exprs).strip() - if ',' in exprs: - for expr in exprs.split(','): - self.compile_expression(expr) - else: - self.compile_expression(exprs) - - def compile_expression(self, expr): - for compiler in self.COMPILERS: - match = compiler.value_re.match(expr) - if match: - compiled_expr = compiler(**match.groupdict()) - self.expressions.append(compiled_expr) - return - - raise ValueError('Unrecognized expression "%s" for field "%s"' % - (expr, self.name)) - - def __str__(self): - expr_strings = (str(e) for e in self.expressions) - return ','.join(expr_strings) - - def __repr__(self): - return "%s('%s', '%s')" % (self.__class__.__name__, self.name, - str(self)) - - -class WeekField(BaseField): - REAL = False - - def get_value(self, dateval): - return dateval.isocalendar()[1] - - -class DayOfMonthField(BaseField): - COMPILERS = 
BaseField.COMPILERS + [WeekdayPositionExpression] - - def get_max(self, dateval): - return monthrange(dateval.year, dateval.month)[1] - - -class DayOfWeekField(BaseField): - REAL = False - COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression] - - def get_value(self, dateval): - return dateval.weekday() diff --git a/libs/apscheduler/job.py b/libs/apscheduler/job.py new file mode 100644 index 0000000..868e723 --- /dev/null +++ b/libs/apscheduler/job.py @@ -0,0 +1,134 @@ +""" +Jobs represent scheduled tasks. +""" + +from threading import Lock +from datetime import timedelta + +from apscheduler.util import to_unicode, ref_to_obj, get_callable_name,\ + obj_to_ref + + +class MaxInstancesReachedError(Exception): + pass + + +class Job(object): + """ + Encapsulates the actual Job along with its metadata. Job instances + are created by the scheduler when adding jobs, and it should not be + directly instantiated. + + :param trigger: trigger that determines the execution times + :param func: callable to call when the trigger is triggered + :param args: list of positional arguments to call func with + :param kwargs: dict of keyword arguments to call func with + :param name: name of the job (optional) + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :param coalesce: run once instead of many times if the scheduler determines + that the job should be run more than once in succession + :param max_runs: maximum number of times this job is allowed to be + triggered + :param max_instances: maximum number of concurrently running + instances allowed for this job + """ + id = None + next_run_time = None + + def __init__(self, trigger, func, args, kwargs, misfire_grace_time, + coalesce, name=None, max_runs=None, max_instances=1): + if not trigger: + raise ValueError('The trigger must not be None') + if not hasattr(func, '__call__'): + raise TypeError('func must be callable') + if not hasattr(args, '__getitem__'): + raise 
TypeError('args must be a list-like object') + if not hasattr(kwargs, '__getitem__'): + raise TypeError('kwargs must be a dict-like object') + if misfire_grace_time <= 0: + raise ValueError('misfire_grace_time must be a positive value') + if max_runs is not None and max_runs <= 0: + raise ValueError('max_runs must be a positive value') + if max_instances <= 0: + raise ValueError('max_instances must be a positive value') + + self._lock = Lock() + + self.trigger = trigger + self.func = func + self.args = args + self.kwargs = kwargs + self.name = to_unicode(name or get_callable_name(func)) + self.misfire_grace_time = misfire_grace_time + self.coalesce = coalesce + self.max_runs = max_runs + self.max_instances = max_instances + self.runs = 0 + self.instances = 0 + + def compute_next_run_time(self, now): + if self.runs == self.max_runs: + self.next_run_time = None + else: + self.next_run_time = self.trigger.get_next_fire_time(now) + + return self.next_run_time + + def get_run_times(self, now): + """ + Computes the scheduled run times between ``next_run_time`` and ``now``. 
+ """ + run_times = [] + run_time = self.next_run_time + increment = timedelta(microseconds=1) + while ((not self.max_runs or self.runs < self.max_runs) and + run_time and run_time <= now): + run_times.append(run_time) + run_time = self.trigger.get_next_fire_time(run_time + increment) + + return run_times + + def add_instance(self): + self._lock.acquire() + try: + if self.instances == self.max_instances: + raise MaxInstancesReachedError + self.instances += 1 + finally: + self._lock.release() + + def remove_instance(self): + self._lock.acquire() + try: + assert self.instances > 0, 'Already at 0 instances' + self.instances -= 1 + finally: + self._lock.release() + + def __getstate__(self): + # Prevents the unwanted pickling of transient or unpicklable variables + state = self.__dict__.copy() + state.pop('instances', None) + state.pop('func', None) + state.pop('_lock', None) + state['func_ref'] = obj_to_ref(self.func) + return state + + def __setstate__(self, state): + state['instances'] = 0 + state['func'] = ref_to_obj(state.pop('func_ref')) + state['_lock'] = Lock() + self.__dict__ = state + + def __eq__(self, other): + if isinstance(other, Job): + return self.id is not None and other.id == self.id or self is other + return NotImplemented + + def __repr__(self): + return '' % (self.name, repr(self.trigger)) + + def __str__(self): + return '%s (trigger: %s, next run at: %s)' % (self.name, + str(self.trigger), str(self.next_run_time)) diff --git a/libs/apscheduler/jobstores/__init__.py b/libs/apscheduler/jobstores/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/libs/apscheduler/jobstores/base.py b/libs/apscheduler/jobstores/base.py new file mode 100644 index 0000000..f0a16dd --- /dev/null +++ b/libs/apscheduler/jobstores/base.py @@ -0,0 +1,25 @@ +""" +Abstract base class that provides the interface needed by all job stores. +Job store methods are also documented here. 
+""" + + +class JobStore(object): + def add_job(self, job): + """Adds the given job from this store.""" + raise NotImplementedError + + def update_job(self, job): + """Persists the running state of the given job.""" + raise NotImplementedError + + def remove_job(self, job): + """Removes the given jobs from this store.""" + raise NotImplementedError + + def load_jobs(self): + """Loads jobs from this store into memory.""" + raise NotImplementedError + + def close(self): + """Frees any resources still bound to this job store.""" diff --git a/libs/apscheduler/jobstores/mongodb_store.py b/libs/apscheduler/jobstores/mongodb_store.py new file mode 100644 index 0000000..3f522c2 --- /dev/null +++ b/libs/apscheduler/jobstores/mongodb_store.py @@ -0,0 +1,84 @@ +""" +Stores jobs in a MongoDB database. +""" +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + from bson.binary import Binary + from pymongo.connection import Connection +except ImportError: # pragma: nocover + raise ImportError('MongoDBJobStore requires PyMongo installed') + +logger = logging.getLogger(__name__) + + +class MongoDBJobStore(JobStore): + def __init__(self, database='apscheduler', collection='jobs', + connection=None, pickle_protocol=pickle.HIGHEST_PROTOCOL, + **connect_args): + self.jobs = [] + self.pickle_protocol = pickle_protocol + + if not database: + raise ValueError('The "database" parameter must not be empty') + if not collection: + raise ValueError('The "collection" parameter must not be empty') + + if connection: + self.connection = connection + else: + self.connection = Connection(**connect_args) + + self.collection = self.connection[database][collection] + + def add_job(self, job): + job_dict = job.__getstate__() + job_dict['trigger'] = Binary(pickle.dumps(job.trigger, + self.pickle_protocol)) + job_dict['args'] = Binary(pickle.dumps(job.args, + 
self.pickle_protocol)) + job_dict['kwargs'] = Binary(pickle.dumps(job.kwargs, + self.pickle_protocol)) + job.id = self.collection.insert(job_dict) + self.jobs.append(job) + + def remove_job(self, job): + self.collection.remove(job.id) + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + for job_dict in self.collection.find(): + try: + job = Job.__new__(Job) + job_dict['id'] = job_dict.pop('_id') + job_dict['trigger'] = pickle.loads(job_dict['trigger']) + job_dict['args'] = pickle.loads(job_dict['args']) + job_dict['kwargs'] = pickle.loads(job_dict['kwargs']) + job.__setstate__(job_dict) + jobs.append(job) + except Exception: + job_name = job_dict.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + self.jobs = jobs + + def update_job(self, job): + spec = {'_id': job.id} + document = {'$set': {'next_run_time': job.next_run_time}, + '$inc': {'runs': 1}} + self.collection.update(spec, document) + + def close(self): + self.connection.disconnect() + + def __repr__(self): + connection = self.collection.database.connection + return '<%s (connection=%s)>' % (self.__class__.__name__, connection) diff --git a/libs/apscheduler/jobstores/ram_store.py b/libs/apscheduler/jobstores/ram_store.py new file mode 100644 index 0000000..85091fe --- /dev/null +++ b/libs/apscheduler/jobstores/ram_store.py @@ -0,0 +1,25 @@ +""" +Stores jobs in an array in RAM. Provides no persistence support. 
+""" + +from apscheduler.jobstores.base import JobStore + + +class RAMJobStore(JobStore): + def __init__(self): + self.jobs = [] + + def add_job(self, job): + self.jobs.append(job) + + def update_job(self, job): + pass + + def remove_job(self, job): + self.jobs.remove(job) + + def load_jobs(self): + pass + + def __repr__(self): + return '<%s>' % (self.__class__.__name__) diff --git a/libs/apscheduler/jobstores/shelve_store.py b/libs/apscheduler/jobstores/shelve_store.py new file mode 100644 index 0000000..87c95f8 --- /dev/null +++ b/libs/apscheduler/jobstores/shelve_store.py @@ -0,0 +1,65 @@ +""" +Stores jobs in a file governed by the :mod:`shelve` module. +""" + +import shelve +import pickle +import random +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job +from apscheduler.util import itervalues + +logger = logging.getLogger(__name__) + + +class ShelveJobStore(JobStore): + MAX_ID = 1000000 + + def __init__(self, path, pickle_protocol=pickle.HIGHEST_PROTOCOL): + self.jobs = [] + self.path = path + self.pickle_protocol = pickle_protocol + self.store = shelve.open(path, 'c', self.pickle_protocol) + + def _generate_id(self): + id = None + while not id: + id = str(random.randint(1, self.MAX_ID)) + if not id in self.store: + return id + + def add_job(self, job): + job.id = self._generate_id() + self.jobs.append(job) + self.store[job.id] = job.__getstate__() + + def update_job(self, job): + job_dict = self.store[job.id] + job_dict['next_run_time'] = job.next_run_time + job_dict['runs'] = job.runs + self.store[job.id] = job_dict + + def remove_job(self, job): + del self.store[job.id] + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + for job_dict in itervalues(self.store): + try: + job = Job.__new__(Job) + job.__setstate__(job_dict) + jobs.append(job) + except Exception: + job_name = job_dict.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + + self.jobs = jobs + + def 
close(self): + self.store.close() + + def __repr__(self): + return '<%s (path=%s)>' % (self.__class__.__name__, self.path) diff --git a/libs/apscheduler/jobstores/sqlalchemy_store.py b/libs/apscheduler/jobstores/sqlalchemy_store.py new file mode 100644 index 0000000..41ed4c7 --- /dev/null +++ b/libs/apscheduler/jobstores/sqlalchemy_store.py @@ -0,0 +1,87 @@ +""" +Stores jobs in a database table using SQLAlchemy. +""" +import pickle +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job + +try: + from sqlalchemy import * +except ImportError: # pragma: nocover + raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed') + +logger = logging.getLogger(__name__) + + +class SQLAlchemyJobStore(JobStore): + def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', + metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL): + self.jobs = [] + self.pickle_protocol = pickle_protocol + + if engine: + self.engine = engine + elif url: + self.engine = create_engine(url) + else: + raise ValueError('Need either "engine" or "url" defined') + + self.jobs_t = Table(tablename, metadata or MetaData(), + Column('id', Integer, + Sequence(tablename + '_id_seq', optional=True), + primary_key=True), + Column('trigger', PickleType(pickle_protocol, mutable=False), + nullable=False), + Column('func_ref', String(1024), nullable=False), + Column('args', PickleType(pickle_protocol, mutable=False), + nullable=False), + Column('kwargs', PickleType(pickle_protocol, mutable=False), + nullable=False), + Column('name', Unicode(1024)), + Column('misfire_grace_time', Integer, nullable=False), + Column('coalesce', Boolean, nullable=False), + Column('max_runs', Integer), + Column('max_instances', Integer), + Column('next_run_time', DateTime, nullable=False), + Column('runs', BigInteger)) + + self.jobs_t.create(self.engine, True) + + def add_job(self, job): + job_dict = job.__getstate__() + result = 
self.engine.execute(self.jobs_t.insert().values(**job_dict)) + job.id = result.inserted_primary_key[0] + self.jobs.append(job) + + def remove_job(self, job): + delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id) + self.engine.execute(delete) + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + for row in self.engine.execute(select([self.jobs_t])): + try: + job = Job.__new__(Job) + job_dict = dict(row.items()) + job.__setstate__(job_dict) + jobs.append(job) + except Exception: + job_name = job_dict.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + self.jobs = jobs + + def update_job(self, job): + job_dict = job.__getstate__() + update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\ + values(next_run_time=job_dict['next_run_time'], + runs=job_dict['runs']) + self.engine.execute(update) + + def close(self): + self.engine.dispose() + + def __repr__(self): + return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url) diff --git a/libs/apscheduler/scheduler.py b/libs/apscheduler/scheduler.py index cc50f49..50769e4 100644 --- a/libs/apscheduler/scheduler.py +++ b/libs/apscheduler/scheduler.py @@ -1,83 +1,32 @@ """ -This module is the main part of the library, and is the only module that -regular users should be concerned with. +This module is the main part of the library. It houses the Scheduler class +and related exceptions. 
""" + from threading import Thread, Event, Lock from datetime import datetime, timedelta from logging import getLogger import os +import sys -from apscheduler.util import time_difference, asbool -from apscheduler.triggers import DateTrigger, IntervalTrigger, CronTrigger - +from apscheduler.util import * +from apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger +from apscheduler.jobstores.ram_store import RAMJobStore +from apscheduler.job import Job, MaxInstancesReachedError +from apscheduler.events import * +from apscheduler.threadpool import ThreadPool logger = getLogger(__name__) -class Job(object): - """ - Represents a task scheduled in the scheduler. - """ - - def __init__(self, trigger, func, args, kwargs): - self.thread = None - self.trigger = trigger - self.func = func - self.args = args - self.kwargs = kwargs - if hasattr(func, '__name__'): - self.name = func.__name__ - else: - self.name = str(func) - - def run(self): - """ - Starts the execution of this job in a separate thread. - """ - if (self.thread and self.thread.isAlive()): - logger.info('Skipping run of job %s (previously triggered ' - 'instance is still running)', self) - else: - self.thread = Thread(target=self.run_in_thread) - self.thread.setDaemon(False) - self.thread.start() - - def run_in_thread(self): - """ - Runs the associated callable. - This method is executed in a dedicated thread. - """ - try: - self.func(*self.args, **self.kwargs) - except: - logger.exception('Error executing job "%s"', self) - raise - - def __str__(self): - return '%s: %s' % (self.name, repr(self.trigger)) - - def __repr__(self): - return '%s(%s, %s)' % (self.__class__.__name__, self.name, - repr(self.trigger)) - - -class SchedulerShutdownError(Exception): - """ - Thrown when attempting to use the scheduler after - it's been shut down. 
- """ - - def __init__(self): - Exception.__init__(self, 'Scheduler has already been shut down') - - class SchedulerAlreadyRunningError(Exception): """ - Thrown when attempting to start the scheduler, but it's already running. + Raised when attempting to start or configure the scheduler when it's + already running. """ - def __init__(self): - Exception.__init__(self, 'Scheduler is already running') + def __str__(self): + return 'Scheduler is already running' class Scheduler(object): @@ -86,141 +35,265 @@ class Scheduler(object): their execution. """ - stopped = False - thread = None - misfire_grace_time = 1 - daemonic = True + _stopped = False + _thread = None - def __init__(self, **config): - self.jobs = [] - self.jobs_lock = Lock() - self.wakeup = Event() - self.configure(config) + def __init__(self, gconfig={}, **options): + self._wakeup = Event() + self._jobstores = {} + self._jobstores_lock = Lock() + self._listeners = [] + self._listeners_lock = Lock() + self._pending_jobs = [] + self.configure(gconfig, **options) - def configure(self, config): + def configure(self, gconfig={}, **options): """ - Updates the configuration with the given options. + Reconfigures the scheduler with the given options. Can only be done + when the scheduler isn't running. 
""" - for key, val in config.items(): - if key.startswith('apscheduler.'): - key = key[12:] - if key == 'misfire_grace_time': - self.misfire_grace_time = int(val) - elif key == 'daemonic': - self.daemonic = asbool(val) + if self.running: + raise SchedulerAlreadyRunningError + + # Set general options + config = combine_opts(gconfig, 'apscheduler.', options) + self.misfire_grace_time = int(config.pop('misfire_grace_time', 1)) + self.coalesce = asbool(config.pop('coalesce', True)) + self.daemonic = asbool(config.pop('daemonic', True)) + + # Configure the thread pool + if 'threadpool' in config: + self._threadpool = maybe_ref(config['threadpool']) + else: + threadpool_opts = combine_opts(config, 'threadpool.') + self._threadpool = ThreadPool(**threadpool_opts) + + # Configure job stores + jobstore_opts = combine_opts(config, 'jobstore.') + jobstores = {} + for key, value in jobstore_opts.items(): + store_name, option = key.split('.', 1) + opts_dict = jobstores.setdefault(store_name, {}) + opts_dict[option] = value + + for alias, opts in jobstores.items(): + classname = opts.pop('class') + cls = maybe_ref(classname) + jobstore = cls(**opts) + self.add_jobstore(jobstore, alias, True) def start(self): """ Starts the scheduler in a new thread. 
""" - if self.thread and self.thread.isAlive(): + if self.running: raise SchedulerAlreadyRunningError - - self.stopped = False - self.thread = Thread(target=self.run, name='APScheduler') - self.thread.setDaemon(self.daemonic) - self.thread.start() - logger.info('Scheduler started') - def shutdown(self, timeout=0): + # Create a RAMJobStore as the default if there is no default job store + if not 'default' in self._jobstores: + self.add_jobstore(RAMJobStore(), 'default', True) + + # Schedule all pending jobs + for job, jobstore in self._pending_jobs: + self._real_add_job(job, jobstore, False) + del self._pending_jobs[:] + + self._stopped = False + self._thread = Thread(target=self._main_loop, name='APScheduler') + self._thread.setDaemon(self.daemonic) + self._thread.start() + + def shutdown(self, wait=True, shutdown_threadpool=True): """ Shuts down the scheduler and terminates the thread. - Does not terminate any currently running jobs. + Does not interrupt any currently running jobs. - :param timeout: time (in seconds) to wait for the scheduler thread to - terminate, 0 to wait forever, None to skip waiting + :param wait: ``True`` to wait until all currently executing jobs have + finished (if ``shutdown_threadpool`` is also ``True``) + :param shutdown_threadpool: ``True`` to shut down the thread pool """ - if self.stopped or not self.thread.isAlive(): + if not self.running: return - logger.info('Scheduler shutting down') - self.stopped = True - self.wakeup.set() - if timeout is not None: - self.thread.join(timeout) - self.jobs = [] + self._stopped = True + self._wakeup.set() + + # Shut down the thread pool + if shutdown_threadpool: + self._threadpool.shutdown(wait) + + # Wait until the scheduler thread terminates + self._thread.join() - def cron_schedule(self, year='*', month='*', day='*', week='*', - day_of_week='*', hour='*', minute='*', second='*', - args=None, kwargs=None): + @property + def running(self): + return not self._stopped and self._thread and 
self._thread.isAlive() + + def add_jobstore(self, jobstore, alias, quiet=False): """ - Decorator that causes its host function to be scheduled - according to the given parameters. - This decorator does not wrap its host function. - The scheduled function will be called without any arguments. - See :meth:`add_cron_job` for more information. + Adds a job store to this scheduler. + + :param jobstore: job store to be added + :param alias: alias for the job store + :param quiet: True to suppress scheduler thread wakeup + :type jobstore: instance of + :class:`~apscheduler.jobstores.base.JobStore` + :type alias: str """ - def inner(func): - self.add_cron_job(func, year, month, day, week, day_of_week, hour, - minute, second, args, kwargs) - return func - return inner + self._jobstores_lock.acquire() + try: + if alias in self._jobstores: + raise KeyError('Alias "%s" is already in use' % alias) + self._jobstores[alias] = jobstore + jobstore.load_jobs() + finally: + self._jobstores_lock.release() + + # Notify listeners that a new job store has been added + self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias)) - def interval_schedule(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, - start_date=None, repeat=0, args=None, kwargs=None): + # Notify the scheduler so it can scan the new job store for jobs + if not quiet: + self._wakeup.set() + + def remove_jobstore(self, alias): """ - Decorator that causes its host function to be scheduled - for execution on specified intervals. - This decorator does not wrap its host function. - The scheduled function will be called without any arguments. - Note that the default repeat value is 0, which means to repeat forever. - See :meth:`add_delayed_job` for more information. + Removes the job store by the given alias from this scheduler. 
+ + :type alias: str """ - def inner(func): - self.add_interval_job(func, weeks, days, hours, minutes, seconds, - start_date, repeat, args, kwargs) - return func - return inner + self._jobstores_lock.acquire() + try: + try: + del self._jobstores[alias] + except KeyError: + raise KeyError('No such job store: %s' % alias) + finally: + self._jobstores_lock.release() - def _add_job(self, trigger, func, args, kwargs): + # Notify listeners that a job store has been removed + self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias)) + + def add_listener(self, callback, mask=EVENT_ALL): """ - Adds a Job to the job list and notifies the scheduler thread. + Adds a listener for scheduler events. When a matching event occurs, + ``callback`` is executed with the event object as its sole argument. + If the ``mask`` parameter is not provided, the callback will receive + events of all types. - :param trigger: trigger for the given callable - :param args: list of positional arguments to call func with - :param kwargs: dict of keyword arguments to call func with - :return: the scheduled job - :rtype: Job + :param callback: any callable that takes one argument + :param mask: bitmask that indicates which events should be listened to """ - if self.stopped: - raise SchedulerShutdownError - if not hasattr(func, '__call__'): - raise TypeError('func must be callable') + self._listeners_lock.acquire() + try: + self._listeners.append((callback, mask)) + finally: + self._listeners_lock.release() + + def remove_listener(self, callback): + """ + Removes a previously added event listener. 
+ """ + self._listeners_lock.acquire() + try: + for i, (cb, _) in enumerate(self._listeners): + if callback == cb: + del self._listeners[i] + finally: + self._listeners_lock.release() + + def _notify_listeners(self, event): + self._listeners_lock.acquire() + try: + listeners = tuple(self._listeners) + finally: + self._listeners_lock.release() - if args is None: - args = [] - if kwargs is None: - kwargs = {} + for cb, mask in listeners: + if event.code & mask: + try: + cb(event) + except: + logger.exception('Error notifying listener') - job = Job(trigger, func, args, kwargs) - self.jobs_lock.acquire() + def _real_add_job(self, job, jobstore, wakeup): + job.compute_next_run_time(datetime.now()) + if not job.next_run_time: + raise ValueError('Not adding job since it would never be run') + + self._jobstores_lock.acquire() try: - self.jobs.append(job) + try: + store = self._jobstores[jobstore] + except KeyError: + raise KeyError('No such job store: %s' % jobstore) + store.add_job(job) finally: - self.jobs_lock.release() - logger.info('Added job "%s"', job) + self._jobstores_lock.release() + + # Notify listeners that a new job has been added + event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job) + self._notify_listeners(event) + + logger.info('Added job "%s" to job store "%s"', job, jobstore) # Notify the scheduler about the new job - self.wakeup.set() + if wakeup: + self._wakeup.set() + def add_job(self, trigger, func, args, kwargs, jobstore='default', + **options): + """ + Adds the given job to the job list and notifies the scheduler thread. 
+ + :param trigger: alias of the job store to store the job in + :param func: callable to run at the given time + :param args: list of positional arguments to call func with + :param kwargs: dict of keyword arguments to call func with + :param jobstore: alias of the job store to store the job in + :rtype: :class:`~apscheduler.job.Job` + """ + job = Job(trigger, func, args or [], kwargs or {}, + options.pop('misfire_grace_time', self.misfire_grace_time), + options.pop('coalesce', self.coalesce), **options) + if not self.running: + self._pending_jobs.append((job, jobstore)) + logger.info('Adding job tentatively -- it will be properly ' + 'scheduled when the scheduler starts') + else: + self._real_add_job(job, jobstore, True) return job - def add_date_job(self, func, date, args=None, kwargs=None): + def _remove_job(self, job, alias, jobstore): + jobstore.remove_job(job) + + # Notify listeners that a job has been removed + event = JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job) + self._notify_listeners(event) + + logger.info('Removed job "%s"', job) + + def add_date_job(self, func, date, args=None, kwargs=None, **options): """ - Adds a job to be completed on a specific date and time. + Schedules a job to be completed on a specific date and time. 
- :param func: callable to run - :param args: positional arguments to call func with - :param kwargs: keyword arguments to call func with + :param func: callable to run at the given time + :param date: the date/time to run the job at + :param name: name of the job + :param jobstore: stored the job in the named (or given) job store + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :type date: :class:`datetime.date` + :rtype: :class:`~apscheduler.job.Job` """ - trigger = DateTrigger(date) - return self._add_job(trigger, func, args, kwargs) + trigger = SimpleTrigger(date) + return self.add_job(trigger, func, args, kwargs, **options) def add_interval_job(self, func, weeks=0, days=0, hours=0, minutes=0, - seconds=0, start_date=None, repeat=0, args=None, - kwargs=None): + seconds=0, start_date=None, args=None, kwargs=None, + **options): """ - Adds a job to be completed on specified intervals. + Schedules a job to be completed on specified intervals. 
:param func: callable to run :param weeks: number of weeks to wait @@ -230,25 +303,29 @@ class Scheduler(object): :param seconds: number of seconds to wait :param start_date: when to first execute the job and start the counter (default is after the given interval) - :param repeat: number of times the job will be run (0 = repeat - indefinitely) :param args: list of positional arguments to call func with :param kwargs: dict of keyword arguments to call func with + :param name: name of the job + :param jobstore: alias of the job store to add the job to + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run + :rtype: :class:`~apscheduler.job.Job` """ interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds) - trigger = IntervalTrigger(interval, repeat, start_date) - return self._add_job(trigger, func, args, kwargs) + trigger = IntervalTrigger(interval, start_date) + return self.add_job(trigger, func, args, kwargs, **options) - def add_cron_job(self, func, year='*', month='*', day='*', week='*', - day_of_week='*', hour='*', minute='*', second='*', - args=None, kwargs=None): + def add_cron_job(self, func, year=None, month=None, day=None, week=None, + day_of_week=None, hour=None, minute=None, second=None, + start_date=None, args=None, kwargs=None, **options): """ - Adds a job to be completed on times that match the given expressions. + Schedules a job to be completed on times that match the given + expressions. 
:param func: callable to run :param year: year to run on - :param month: month to run on (0 = January) + :param month: month to run on :param day: day of month to run on :param week: week of the year to run on :param day_of_week: weekday to run on (0 = Monday) @@ -256,152 +333,227 @@ class Scheduler(object): :param second: second to run on :param args: list of positional arguments to call func with :param kwargs: dict of keyword arguments to call func with + :param name: name of the job + :param jobstore: alias of the job store to add the job to + :param misfire_grace_time: seconds after the designated run time that + the job is still allowed to be run :return: the scheduled job - :rtype: Job + :rtype: :class:`~apscheduler.job.Job` """ trigger = CronTrigger(year=year, month=month, day=day, week=week, day_of_week=day_of_week, hour=hour, - minute=minute, second=second) - return self._add_job(trigger, func, args, kwargs) + minute=minute, second=second, + start_date=start_date) + return self.add_job(trigger, func, args, kwargs, **options) + + def cron_schedule(self, **options): + """ + Decorator version of :meth:`add_cron_job`. + This decorator does not wrap its host function. + Unscheduling decorated functions is possible by passing the ``job`` + attribute of the scheduled function to :meth:`unschedule_job`. + """ + def inner(func): + func.job = self.add_cron_job(func, **options) + return func + return inner + + def interval_schedule(self, **options): + """ + Decorator version of :meth:`add_interval_job`. + This decorator does not wrap its host function. + Unscheduling decorated functions is possible by passing the ``job`` + attribute of the scheduled function to :meth:`unschedule_job`. + """ + def inner(func): + func.job = self.add_interval_job(func, **options) + return func + return inner - def is_job_active(self, job): + def get_jobs(self): """ - Determines if the given job is still on the job list. + Returns a list of all scheduled jobs. 
- :return: True if the job is still active, False if not + :return: list of :class:`~apscheduler.job.Job` objects """ - self.jobs_lock.acquire() + self._jobstores_lock.acquire() try: - return job in self.jobs + jobs = [] + for jobstore in itervalues(self._jobstores): + jobs.extend(jobstore.jobs) + return jobs finally: - self.jobs_lock.release() + self._jobstores_lock.release() def unschedule_job(self, job): """ - Removes a job, preventing it from being fired any more. + Removes a job, preventing it from being run any more. """ - self.jobs_lock.acquire() + self._jobstores_lock.acquire() try: - self.jobs.remove(job) + for alias, jobstore in iteritems(self._jobstores): + if job in list(jobstore.jobs): + self._remove_job(job, alias, jobstore) + return finally: - self.jobs_lock.release() - logger.info('Removed job "%s"', job) - self.wakeup.set() + self._jobstores_lock.release() + + raise KeyError('Job "%s" is not scheduled in any job store' % job) def unschedule_func(self, func): """ Removes all jobs that would execute the given function. """ - self.jobs_lock.acquire() + found = False + self._jobstores_lock.acquire() try: - remove_list = [job for job in self.jobs if job.func == func] - for job in remove_list: - self.jobs.remove(job) - logger.info('Removed job "%s"', job) + for alias, jobstore in iteritems(self._jobstores): + for job in list(jobstore.jobs): + if job.func == func: + self._remove_job(job, alias, jobstore) + found = True finally: - self.jobs_lock.release() + self._jobstores_lock.release() - # Have the scheduler calculate a new wakeup time - self.wakeup.set() + if not found: + raise KeyError('The given function is not scheduled in this ' + 'scheduler') - def dump_jobs(self): + def print_jobs(self, out=None): """ - Gives a textual listing of all jobs currently scheduled on this + Prints out a textual listing of all jobs currently scheduled on this scheduler. 
- :rtype: str + :param out: a file-like object to print to (defaults to **sys.stdout** + if nothing is given) """ + out = out or sys.stdout job_strs = [] - now = datetime.now() - self.jobs_lock.acquire() + self._jobstores_lock.acquire() try: - for job in self.jobs: - next_fire_time = job.trigger.get_next_fire_time(now) - job_str = '%s (next fire time: %s)' % (str(job), - next_fire_time) - job_strs.append(job_str) + for alias, jobstore in iteritems(self._jobstores): + job_strs.append('Jobstore %s:' % alias) + if jobstore.jobs: + for job in jobstore.jobs: + job_strs.append(' %s' % job) + else: + job_strs.append(' No scheduled jobs') finally: - self.jobs_lock.release() + self._jobstores_lock.release() - if job_strs: - return os.linesep.join(job_strs) - return 'No jobs currently scheduled.' + out.write(os.linesep.join(job_strs) + os.linesep) - def _get_next_wakeup_time(self, now): + def _run_job(self, job, run_times): """ - Determines the time of the next job execution, and removes finished - jobs. - - :param now: the result of datetime.now(), generated elsewhere for - consistency. + Acts as a harness that runs the actual job code in a thread. """ - next_wakeup = None - finished_jobs = [] - - self.jobs_lock.acquire() - try: - for job in self.jobs: - next_run = job.trigger.get_next_fire_time(now) - if next_run is None: - finished_jobs.append(job) - elif next_run and (next_wakeup is None or \ - next_run < next_wakeup): - next_wakeup = next_run - - # Clear out any finished jobs - for job in finished_jobs: - self.jobs.remove(job) - logger.info('Removed finished job "%s"', job) - finally: - self.jobs_lock.release() - - return next_wakeup - - def _get_current_jobs(self): - """ - Determines which jobs should be executed right now. 
- """ - current_jobs = [] - now = datetime.now() - start = now - timedelta(seconds=self.misfire_grace_time) - - self.jobs_lock.acquire() + for run_time in run_times: + # See if the job missed its run time window, and handle possible + # misfires accordingly + difference = datetime.now() - run_time + grace_time = timedelta(seconds=job.misfire_grace_time) + if difference > grace_time: + # Notify listeners about a missed run + event = JobEvent(EVENT_JOB_MISSED, job, run_time) + self._notify_listeners(event) + logger.warning('Run time of job "%s" was missed by %s', + job, difference) + else: + try: + job.add_instance() + except MaxInstancesReachedError: + event = JobEvent(EVENT_JOB_MISSED, job, run_time) + self._notify_listeners(event) + logger.warning('Execution of job "%s" skipped: ' + 'maximum number of running instances ' + 'reached (%d)', job, job.max_instances) + break + + logger.info('Running job "%s" (scheduled at %s)', job, + run_time) + + try: + retval = job.func(*job.args, **job.kwargs) + except: + # Notify listeners about the exception + exc, tb = sys.exc_info()[1:] + event = JobEvent(EVENT_JOB_ERROR, job, run_time, + exception=exc, traceback=tb) + self._notify_listeners(event) + + logger.exception('Job "%s" raised an exception', job) + else: + # Notify listeners about successful execution + event = JobEvent(EVENT_JOB_EXECUTED, job, run_time, + retval=retval) + self._notify_listeners(event) + + logger.info('Job "%s" executed successfully', job) + + job.remove_instance() + + # If coalescing is enabled, don't attempt any further runs + if job.coalesce: + break + + def _process_jobs(self, now): + """ + Iterates through jobs in every jobstore, starts pending jobs + and figures out the next wakeup time. 
+ """ + next_wakeup_time = None + self._jobstores_lock.acquire() try: - for job in self.jobs: - next_run = job.trigger.get_next_fire_time(start) - if next_run: - time_diff = time_difference(now, next_run) - if next_run < now and time_diff <= self.misfire_grace_time: - current_jobs.append(job) + for alias, jobstore in iteritems(self._jobstores): + for job in tuple(jobstore.jobs): + run_times = job.get_run_times(now) + if run_times: + self._threadpool.submit(self._run_job, job, run_times) + + # Increase the job's run count + if job.coalesce: + job.runs += 1 + else: + job.runs += len(run_times) + + # Update the job, but don't keep finished jobs around + if job.compute_next_run_time(now + timedelta(microseconds=1)): + jobstore.update_job(job) + else: + self._remove_job(job, alias, jobstore) + + if not next_wakeup_time: + next_wakeup_time = job.next_run_time + elif job.next_run_time: + next_wakeup_time = min(next_wakeup_time, + job.next_run_time) + return next_wakeup_time finally: - self.jobs_lock.release() + self._jobstores_lock.release() - return current_jobs + def _main_loop(self): + """Executes jobs on schedule.""" - def run(self): - """ - Runs the main loop of the scheduler. 
- """ - self.wakeup.clear() - while not self.stopped: - # Execute any jobs scheduled to be run right now - for job in self._get_current_jobs(): - logger.debug('Executing job "%s"', job) - job.run() + logger.info('Scheduler started') + self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) - # Figure out when the next job should be run, and - # adjust the wait time accordingly + self._wakeup.clear() + while not self._stopped: + logger.debug('Looking for jobs to run') now = datetime.now() - next_wakeup_time = self._get_next_wakeup_time(now) + next_wakeup_time = self._process_jobs(now) # Sleep until the next job is scheduled to be run, - # or a new job is added, or the scheduler is stopped + # a new job is added or the scheduler is stopped if next_wakeup_time is not None: wait_seconds = time_difference(next_wakeup_time, now) logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) - self.wakeup.wait(wait_seconds) + self._wakeup.wait(wait_seconds) else: logger.debug('No jobs; waiting until a job is added') - self.wakeup.wait() - self.wakeup.clear() + self._wakeup.wait() + self._wakeup.clear() + + logger.info('Scheduler has been shut down') + self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) diff --git a/libs/apscheduler/threadpool.py b/libs/apscheduler/threadpool.py new file mode 100644 index 0000000..8ec47da --- /dev/null +++ b/libs/apscheduler/threadpool.py @@ -0,0 +1,133 @@ +""" +Generic thread pool class. Modeled after Java's ThreadPoolExecutor. +Please note that this ThreadPool does *not* fully implement the PEP 3148 +ThreadPool! 
+""" + +from threading import Thread, Lock, currentThread +from weakref import ref +import logging +import atexit + +try: + from queue import Queue, Empty +except ImportError: + from Queue import Queue, Empty + +logger = logging.getLogger(__name__) +_threadpools = set() + + +# Worker threads are daemonic in order to let the interpreter exit without +# an explicit shutdown of the thread pool. The following trick is necessary +# to allow worker threads to finish cleanly. +def _shutdown_all(): + for pool_ref in tuple(_threadpools): + pool = pool_ref() + if pool: + pool.shutdown() + +atexit.register(_shutdown_all) + + +class ThreadPool(object): + def __init__(self, core_threads=0, max_threads=20, keepalive=1): + """ + :param core_threads: maximum number of persistent threads in the pool + :param max_threads: maximum number of total threads in the pool + :param thread_class: callable that creates a Thread object + :param keepalive: seconds to keep non-core worker threads waiting + for new tasks + """ + self.core_threads = core_threads + self.max_threads = max(max_threads, core_threads, 1) + self.keepalive = keepalive + self._queue = Queue() + self._threads_lock = Lock() + self._threads = set() + self._shutdown = False + + _threadpools.add(ref(self)) + logger.info('Started thread pool with %d core threads and %s maximum ' + 'threads', core_threads, max_threads or 'unlimited') + + def _adjust_threadcount(self): + self._threads_lock.acquire() + try: + if self.num_threads < self.max_threads: + self._add_thread(self.num_threads < self.core_threads) + finally: + self._threads_lock.release() + + def _add_thread(self, core): + t = Thread(target=self._run_jobs, args=(core,)) + t.setDaemon(True) + t.start() + self._threads.add(t) + + def _run_jobs(self, core): + logger.debug('Started worker thread') + block = True + timeout = None + if not core: + block = self.keepalive > 0 + timeout = self.keepalive + + while True: + try: + func, args, kwargs = self._queue.get(block, timeout) + 
except Empty: + break + + if self._shutdown: + break + + try: + func(*args, **kwargs) + except: + logger.exception('Error in worker thread') + + self._threads_lock.acquire() + self._threads.remove(currentThread()) + self._threads_lock.release() + + logger.debug('Exiting worker thread') + + @property + def num_threads(self): + return len(self._threads) + + def submit(self, func, *args, **kwargs): + if self._shutdown: + raise RuntimeError('Cannot schedule new tasks after shutdown') + + self._queue.put((func, args, kwargs)) + self._adjust_threadcount() + + def shutdown(self, wait=True): + if self._shutdown: + return + + logging.info('Shutting down thread pool') + self._shutdown = True + _threadpools.remove(ref(self)) + + self._threads_lock.acquire() + for _ in range(self.num_threads): + self._queue.put((None, None, None)) + self._threads_lock.release() + + if wait: + self._threads_lock.acquire() + threads = tuple(self._threads) + self._threads_lock.release() + for thread in threads: + thread.join() + + def __repr__(self): + if self.max_threads: + threadcount = '%d/%d' % (self.num_threads, self.max_threads) + else: + threadcount = '%d' % self.num_threads + + return '' % (id(self), threadcount) diff --git a/libs/apscheduler/triggers.py b/libs/apscheduler/triggers.py deleted file mode 100644 index 9886b2b..0000000 --- a/libs/apscheduler/triggers.py +++ /dev/null @@ -1,171 +0,0 @@ -""" -Triggers determine the times when a job should be executed. 
-""" -from datetime import datetime, timedelta -from math import ceil - -from apscheduler.fields import * -from apscheduler.util import * - -__all__ = ('CronTrigger', 'DateTrigger', 'IntervalTrigger') - - -class CronTrigger(object): - FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour', - 'minute', 'second') - FIELDS_MAP = {'year': BaseField, - 'month': BaseField, - 'week': WeekField, - 'day': DayOfMonthField, - 'day_of_week': DayOfWeekField, - 'hour': BaseField, - 'minute': BaseField, - 'second': BaseField} - - def __init__(self, **values): - self.fields = [] - for field_name in self.FIELD_NAMES: - exprs = values.get(field_name) or '*' - field_class = self.FIELDS_MAP[field_name] - field = field_class(field_name, exprs) - self.fields.append(field) - - def _increment_field_value(self, dateval, fieldnum): - """ - Increments the designated field and resets all less significant fields - to their minimum values. - - :type dateval: datetime - :type fieldnum: int - :type amount: int - :rtype: tuple - :return: a tuple containing the new date, and the number of the field - that was actually incremented - """ - i = 0 - values = {} - while i < len(self.fields): - field = self.fields[i] - if not field.REAL: - if i == fieldnum: - fieldnum -= 1 - i -= 1 - else: - i += 1 - continue - - if i < fieldnum: - values[field.name] = field.get_value(dateval) - i += 1 - elif i > fieldnum: - values[field.name] = field.get_min(dateval) - i += 1 - else: - value = field.get_value(dateval) - maxval = field.get_max(dateval) - if value == maxval: - fieldnum -= 1 - i -= 1 - else: - values[field.name] = value + 1 - i += 1 - - return datetime(**values), fieldnum - - def _set_field_value(self, dateval, fieldnum, new_value): - values = {} - for i, field in enumerate(self.fields): - if field.REAL: - if i < fieldnum: - values[field.name] = field.get_value(dateval) - elif i > fieldnum: - values[field.name] = field.get_min(dateval) - else: - values[field.name] = new_value - - return 
datetime(**values) - - def get_next_fire_time(self, start_date): - next_date = datetime_ceil(start_date) - fieldnum = 0 - while 0 <= fieldnum < len(self.fields): - field = self.fields[fieldnum] - curr_value = field.get_value(next_date) - next_value = field.get_next_value(next_date) - - if next_value is None: - # No valid value was found - next_date, fieldnum = self._increment_field_value(next_date, - fieldnum - 1) - elif next_value > curr_value: - # A valid, but higher than the starting value, was found - if field.REAL: - next_date = self._set_field_value(next_date, fieldnum, - next_value) - fieldnum += 1 - else: - next_date, fieldnum = self._increment_field_value(next_date, - fieldnum) - else: - # A valid value was found, no changes necessary - fieldnum += 1 - - if fieldnum >= 0: - return next_date - - def __repr__(self): - field_reprs = ("%s='%s'" % (f.name, str(f)) for f in self.fields - if str(f) != '*') - return '%s(%s)' % (self.__class__.__name__, ', '.join(field_reprs)) - - -class DateTrigger(object): - def __init__(self, run_date): - self.run_date = convert_to_datetime(run_date) - - def get_next_fire_time(self, start_date): - if self.run_date >= start_date: - return self.run_date - - def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, repr(self.run_date)) - - -class IntervalTrigger(object): - def __init__(self, interval, repeat, start_date=None): - if not isinstance(interval, timedelta): - raise TypeError('interval must be a timedelta') - if repeat < 0: - raise ValueError('Illegal value for repeat; expected >= 0, ' - 'received %s' % repeat) - - self.interval = interval - self.interval_length = timedelta_seconds(self.interval) - if self.interval_length == 0: - self.interval = timedelta(seconds=1) - self.interval_length = 1 - self.repeat = repeat - if start_date is None: - self.first_fire_date = datetime.now() + self.interval - else: - self.first_fire_date = convert_to_datetime(start_date) - self.first_fire_date -= timedelta(microseconds=\ - 
# Reconstructed from the patch hunk for libs/apscheduler/triggers/cron/__init__.py.
class CronTrigger(object):
    """
    Trigger that fires on cron-like schedules.

    Keyword arguments name the schedule fields listed in ``FIELD_NAMES``
    (each value is a cron expression string or a number); the optional
    ``start_date`` keyword limits firing to that date/time or later.
    """

    FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour',
                   'minute', 'second')
    FIELDS_MAP = {'year': BaseField,
                  'month': BaseField,
                  'week': WeekField,
                  'day': DayOfMonthField,
                  'day_of_week': DayOfWeekField,
                  'hour': BaseField,
                  'minute': BaseField,
                  'second': BaseField}

    def __init__(self, **values):
        self.start_date = values.pop('start_date', None)
        if self.start_date:
            self.start_date = convert_to_datetime(self.start_date)

        # Yank out all None valued fields so they behave as if omitted
        for key, value in list(iteritems(values)):
            if value is None:
                del values[key]

        self.fields = []
        assign_defaults = False
        for field_name in self.FIELD_NAMES:
            if field_name in values:
                exprs = values.pop(field_name)
                is_default = False
                # Once the least significant user-specified field has been
                # consumed, the remaining (less significant) fields fall back
                # to DEFAULT_VALUES instead of the '*' wildcard.
                assign_defaults = not values
            elif assign_defaults:
                exprs = DEFAULT_VALUES[field_name]
                is_default = True
            else:
                exprs = '*'
                is_default = True

            field_class = self.FIELDS_MAP[field_name]
            field = field_class(field_name, exprs, is_default)
            self.fields.append(field)

    def _increment_field_value(self, dateval, fieldnum):
        """
        Increments the designated field and resets all less significant fields
        to their minimum values.

        :type dateval: datetime
        :type fieldnum: int
        :rtype: tuple
        :return: a tuple containing the new date, and the number of the field
                 that was actually incremented
        """
        i = 0
        values = {}
        while i < len(self.fields):
            field = self.fields[i]
            if not field.REAL:
                # Non-real fields (week, day_of_week) are not datetime
                # constructor arguments; incrementing one means carrying
                # into the previous real field instead.
                if i == fieldnum:
                    fieldnum -= 1
                    i -= 1
                else:
                    i += 1
                continue

            if i < fieldnum:
                # More significant than the target field: keep current value
                values[field.name] = field.get_value(dateval)
                i += 1
            elif i > fieldnum:
                # Less significant than the target field: reset to minimum
                values[field.name] = field.get_min(dateval)
                i += 1
            else:
                value = field.get_value(dateval)
                maxval = field.get_max(dateval)
                if value == maxval:
                    # Overflow: carry into the next more significant field
                    fieldnum -= 1
                    i -= 1
                else:
                    values[field.name] = value + 1
                    i += 1

        return datetime(**values), fieldnum

    def _set_field_value(self, dateval, fieldnum, new_value):
        # Build a new datetime with field ``fieldnum`` set to ``new_value``
        # and every less significant real field reset to its minimum.
        values = {}
        for i, field in enumerate(self.fields):
            if field.REAL:
                if i < fieldnum:
                    values[field.name] = field.get_value(dateval)
                elif i > fieldnum:
                    values[field.name] = field.get_min(dateval)
                else:
                    values[field.name] = new_value

        return datetime(**values)

    def get_next_fire_time(self, start_date):
        """
        Returns the next datetime >= ``start_date`` that matches every field,
        or None if no such time exists (fieldnum underflows below 0).
        """
        if self.start_date:
            start_date = max(start_date, self.start_date)
        next_date = datetime_ceil(start_date)
        fieldnum = 0
        while 0 <= fieldnum < len(self.fields):
            field = self.fields[fieldnum]
            curr_value = field.get_value(next_date)
            next_value = field.get_next_value(next_date)

            if next_value is None:
                # No valid value was found within this field; carry over
                next_date, fieldnum = self._increment_field_value(next_date,
                                                                  fieldnum - 1)
            elif next_value > curr_value:
                # A valid, but higher than the starting value, was found
                if field.REAL:
                    next_date = self._set_field_value(next_date, fieldnum,
                                                      next_value)
                    fieldnum += 1
                else:
                    next_date, fieldnum = self._increment_field_value(next_date,
                                                                      fieldnum)
            else:
                # A valid value was found, no changes necessary
                fieldnum += 1

        if fieldnum >= 0:
            return next_date

    def __str__(self):
        options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
                   if not f.is_default]
        return 'cron[%s]' % (', '.join(options))

    def __repr__(self):
        options = ["%s='%s'" % (f.name, str(f)) for f in self.fields
                   if not f.is_default]
        if self.start_date:
            options.append("start_date='%s'" % self.start_date.isoformat(' '))
        return '<%s (%s)>' % (self.__class__.__name__, ', '.join(options))
# Reconstructed from the patch hunk for
# libs/apscheduler/triggers/cron/expressions.py.
WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']


class AllExpression(object):
    """Matches every value ('*'), optionally with a step ('*/step')."""

    # BUGFIX: the named regex groups had been stripped from the patch text;
    # they are required because BaseField.compile_expression instantiates
    # the expression class with **match.groupdict().
    value_re = re.compile(r'\*(?:/(?P<step>\d+))?$')

    def __init__(self, step=None):
        self.step = asint(step)
        if self.step == 0:
            raise ValueError('Increment must be higher than 0')

    def get_next_value(self, date, field):
        start = field.get_value(date)
        minval = field.get_min(date)
        maxval = field.get_max(date)
        start = max(start, minval)

        if not self.step:
            next = start
        else:
            # Round up to the next multiple of step counted from minval
            distance_to_next = (self.step - (start - minval)) % self.step
            next = start + distance_to_next

        if next <= maxval:
            return next

    def __str__(self):
        if self.step:
            return '*/%d' % self.step
        return '*'

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self.step)


class RangeExpression(AllExpression):
    """Matches a value or range: 'first', 'first-last', optionally '/step'."""

    value_re = re.compile(
        r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$')

    def __init__(self, first, last=None, step=None):
        AllExpression.__init__(self, step)
        first = asint(first)
        last = asint(last)
        if last is None and step is None:
            # A single value with no step means an exact match
            last = first
        if last is not None and first > last:
            raise ValueError('The minimum value in a range must not be '
                             'higher than the maximum')
        self.first = first
        self.last = last

    def get_next_value(self, date, field):
        start = field.get_value(date)
        minval = field.get_min(date)
        maxval = field.get_max(date)

        # Apply range limits
        minval = max(minval, self.first)
        if self.last is not None:
            maxval = min(maxval, self.last)
        start = max(start, minval)

        if not self.step:
            next = start
        else:
            distance_to_next = (self.step - (start - minval)) % self.step
            next = start + distance_to_next

        if next <= maxval:
            return next

    def __str__(self):
        if self.last != self.first and self.last is not None:
            range = '%d-%d' % (self.first, self.last)
        else:
            range = str(self.first)

        if self.step:
            return '%s/%d' % (range, self.step)
        return range

    def __repr__(self):
        args = [str(self.first)]
        if self.last != self.first and self.last is not None or self.step:
            args.append(str(self.last))
        if self.step:
            args.append(str(self.step))
        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))


class WeekdayRangeExpression(RangeExpression):
    """Matches weekday names or name ranges: 'mon', 'mon-fri'."""

    value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?',
                          re.IGNORECASE)

    def __init__(self, first, last=None):
        try:
            first_num = WEEKDAYS.index(first.lower())
        except ValueError:
            raise ValueError('Invalid weekday name "%s"' % first)

        if last:
            try:
                last_num = WEEKDAYS.index(last.lower())
            except ValueError:
                raise ValueError('Invalid weekday name "%s"' % last)
        else:
            last_num = None

        RangeExpression.__init__(self, first_num, last_num)

    def __str__(self):
        if self.last != self.first and self.last is not None:
            return '%s-%s' % (WEEKDAYS[self.first], WEEKDAYS[self.last])
        return WEEKDAYS[self.first]

    def __repr__(self):
        args = ["'%s'" % WEEKDAYS[self.first]]
        if self.last != self.first and self.last is not None:
            args.append("'%s'" % WEEKDAYS[self.last])
        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))


class WeekdayPositionExpression(AllExpression):
    """Matches the n-th (or last) given weekday of the month: '2nd fri'."""

    options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
    value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))'
                          % '|'.join(options), re.IGNORECASE)

    def __init__(self, option_name, weekday_name):
        try:
            self.option_num = self.options.index(option_name.lower())
        except ValueError:
            raise ValueError('Invalid weekday position "%s"' % option_name)

        try:
            self.weekday = WEEKDAYS.index(weekday_name.lower())
        except ValueError:
            raise ValueError('Invalid weekday name "%s"' % weekday_name)

    def get_next_value(self, date, field):
        # Figure out the weekday of the month's first day and the number
        # of days in that month
        first_day_wday, last_day = monthrange(date.year, date.month)

        # Calculate which day of the month is the first of the target weekdays
        first_hit_day = self.weekday - first_day_wday + 1
        if first_hit_day <= 0:
            first_hit_day += 7

        # Calculate what day of the month the target weekday would be
        if self.option_num < 5:
            target_day = first_hit_day + self.option_num * 7
        else:
            # BUGFIX: floor division; plain '/' yields a float (and thus a
            # wrong 'last <weekday>' result) on Python 3. Identical on
            # Python 2 where both operands are ints.
            target_day = first_hit_day + ((last_day - first_hit_day) // 7) * 7

        if target_day <= last_day and target_day >= date.day:
            return target_day

    def __str__(self):
        return '%s %s' % (self.options[self.option_num],
                          WEEKDAYS[self.weekday])

    def __repr__(self):
        return "%s('%s', '%s')" % (self.__class__.__name__,
                                   self.options[self.option_num],
                                   WEEKDAYS[self.weekday])
# Reconstructed from the patch hunk for
# libs/apscheduler/triggers/cron/fields.py.
MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1,
              'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0}
# BUGFIX: the 'day' key was misspelled as 'day:' in the patch; FIELD_NAMES,
# MIN_VALUES and DEFAULT_VALUES all use 'day', so BaseField.get_max would
# raise KeyError for the day field.
MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day': 31, 'week': 53,
              'day_of_week': 6, 'hour': 23, 'minute': 59, 'second': 59}
DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*',
                  'day_of_week': '*', 'hour': 0, 'minute': 0, 'second': 0}


class BaseField(object):
    """One CronTrigger field, holding one or more compiled expressions."""

    # REAL fields map 1:1 onto datetime constructor arguments; week and
    # day_of_week are derived values and override this.
    REAL = True
    COMPILERS = [AllExpression, RangeExpression]

    def __init__(self, name, exprs, is_default=False):
        self.name = name
        self.is_default = is_default
        self.compile_expressions(exprs)

    def get_min(self, dateval):
        return MIN_VALUES[self.name]

    def get_max(self, dateval):
        return MAX_VALUES[self.name]

    def get_value(self, dateval):
        return getattr(dateval, self.name)

    def get_next_value(self, dateval):
        # The earliest match among all alternative expressions wins
        smallest = None
        for expr in self.expressions:
            value = expr.get_next_value(dateval, self)
            if smallest is None or (value is not None and value < smallest):
                smallest = value

        return smallest

    def compile_expressions(self, exprs):
        self.expressions = []

        # Split a comma-separated expression list, if any
        exprs = str(exprs).strip()
        if ',' in exprs:
            for expr in exprs.split(','):
                self.compile_expression(expr)
        else:
            self.compile_expression(exprs)

    def compile_expression(self, expr):
        # The first compiler whose regex matches wins; constructor keyword
        # arguments come from the pattern's named groups.
        for compiler in self.COMPILERS:
            match = compiler.value_re.match(expr)
            if match:
                compiled_expr = compiler(**match.groupdict())
                self.expressions.append(compiled_expr)
                return

        raise ValueError('Unrecognized expression "%s" for field "%s"' %
                         (expr, self.name))

    def __str__(self):
        expr_strings = (str(e) for e in self.expressions)
        return ','.join(expr_strings)

    def __repr__(self):
        return "%s('%s', '%s')" % (self.__class__.__name__, self.name,
                                   str(self))


class WeekField(BaseField):
    # Derived from the date, not a datetime constructor argument
    REAL = False

    def get_value(self, dateval):
        return dateval.isocalendar()[1]


class DayOfMonthField(BaseField):
    COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression]

    def get_max(self, dateval):
        # Day of month depends on the actual month and year (28-31)
        return monthrange(dateval.year, dateval.month)[1]


class DayOfWeekField(BaseField):
    REAL = False
    COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]

    def get_value(self, dateval):
        return dateval.weekday()


# --- libs/apscheduler/triggers/interval.py ---

class IntervalTrigger(object):
    """Trigger that fires every ``interval``, starting from ``start_date``."""

    def __init__(self, interval, start_date=None):
        if not isinstance(interval, timedelta):
            raise TypeError('interval must be a timedelta')

        self.interval = interval
        self.interval_length = timedelta_seconds(self.interval)
        if self.interval_length == 0:
            # A zero-length interval would make get_next_fire_time divide
            # by zero; fall back to one second.
            self.interval = timedelta(seconds=1)
            self.interval_length = 1

        # (The original converted a supplied start_date twice; the call is
        # idempotent, so converting once here is behaviorally identical.)
        if start_date is None:
            self.start_date = datetime.now() + self.interval
        else:
            self.start_date = convert_to_datetime(start_date)

    def get_next_fire_time(self, start_date):
        if start_date < self.start_date:
            return self.start_date

        # Jump forward to the next interval boundary at or after start_date
        timediff_seconds = timedelta_seconds(start_date - self.start_date)
        next_interval_num = int(ceil(timediff_seconds / self.interval_length))
        return self.start_date + self.interval * next_interval_num

    def __str__(self):
        return 'interval[%s]' % str(self.interval)

    def __repr__(self):
        return "<%s (interval=%s, start_date=%s)>" % (
            self.__class__.__name__, repr(self.interval),
            repr(self.start_date))
# Reconstructed from the patch hunks for libs/apscheduler/triggers/simple.py
# and libs/apscheduler/util.py.
class SimpleTrigger(object):
    """One-shot trigger that fires exactly once, on ``run_date``."""

    def __init__(self, run_date):
        self.run_date = convert_to_datetime(run_date)

    def get_next_fire_time(self, start_date):
        # Fires only while run_date has not yet passed; None afterwards
        if self.run_date >= start_date:
            return self.run_date

    def __str__(self):
        return 'date[%s]' % str(self.run_date)

    def __repr__(self):
        return '<%s (run_date=%s)>' % (
            self.__class__.__name__, repr(self.run_date))


# --- libs/apscheduler/util.py ---

# BUGFIX: the '(?P<name>...)' group names had been stripped from the patch
# text; they are required because convert_to_datetime builds the result as
# datetime(**m.groupdict()).
_DATE_REGEX = re.compile(
    r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
    r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
    r'(?:\.(?P<microsecond>\d{1,6}))?)?')


def convert_to_datetime(input):
    """
    Converts the given object to a datetime object, if possible.
    If an actual datetime object is passed, it is returned unmodified.
    If the input is a string, it is parsed as a datetime.

    Date strings are accepted in three different forms: date only (Y-m-d),
    date with time (Y-m-d H:M:S) or with date+time with microseconds
    (Y-m-d H:M:S.micro).

    :rtype: datetime
    """
    # NOTE: the parameter name shadows the builtin ``input``; kept as-is for
    # backward compatibility with keyword callers.
    if isinstance(input, datetime):
        return input
    elif isinstance(input, date):
        return datetime.fromordinal(input.toordinal())
    elif isinstance(input, str):
        m = _DATE_REGEX.match(input)
        if not m:
            raise ValueError('Invalid date string')
        # Optional groups that did not match default to 0 (midnight).
        # NOTE(review): '.5' parses as 5 microseconds, not 500000 — matches
        # the upstream behavior of this APScheduler version.
        values = dict((k, int(v or 0)) for k, v in m.groupdict().items())
        return datetime(**values)
    raise TypeError('Unsupported input type: %s' % type(input))


def time_difference(date1, date2):
    """
    Returns the time difference in seconds between the given two
    datetime objects (date1 - date2), including fractional seconds.

    :type date1: datetime
    :type date2: datetime
    :rtype: float
    """
    later = mktime(date1.timetuple()) + date1.microsecond / 1000000.0
    earlier = mktime(date2.timetuple()) + date2.microsecond / 1000000.0
    return later - earlier


def datetime_ceil(dateval):
    """
    Rounds the given datetime object upwards to the nearest whole second.

    :type dateval: datetime
    :rtype: datetime
    """
    if dateval.microsecond > 0:
        return dateval + timedelta(seconds=1,
                                   microseconds=-dateval.microsecond)
    return dateval


def combine_opts(global_config, prefix, local_config=None):
    """
    Returns a subdictionary from keys and values of ``global_config`` where
    the key starts with the given prefix, combined with options from
    local_config. The keys in the subdictionary have the prefix removed.

    :type global_config: dict
    :type prefix: str
    :type local_config: dict
    :rtype: dict
    """
    # BUGFIX: None sentinel instead of a mutable default argument; behavior
    # is unchanged for all callers since local_config was never mutated.
    prefixlen = len(prefix)
    subconf = {}
    for key, value in global_config.items():
        if key.startswith(prefix):
            key = key[prefixlen:]
            subconf[key] = value
    subconf.update(local_config or {})
    return subconf


def get_callable_name(func):
    """
    Returns the best available display name for the given function/callable.
    """
    # Bound methods carry __self__ (im_self on old Python 2 versions)
    f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)

    if f_self and hasattr(func, '__name__'):
        if isinstance(f_self, type):
            # class method
            return '%s.%s' % (f_self.__name__, func.__name__)
        # bound method
        return '%s.%s' % (f_self.__class__.__name__, func.__name__)

    if hasattr(func, '__call__'):
        if hasattr(func, '__name__'):
            # function, unbound method or a class with a __call__ method
            return func.__name__
        # instance of a class with a __call__ method
        return func.__class__.__name__

    raise TypeError('Unable to determine a name for %s -- '
                    'maybe it is not a callable?' % repr(func))


def obj_to_ref(obj):
    """
    Returns the path to the given object as a 'module:name' reference.
    """
    ref = '%s:%s' % (obj.__module__, get_callable_name(obj))
    try:
        # Round-trip the reference to verify it resolves back to obj
        obj2 = ref_to_obj(ref)
        if obj != obj2:
            raise ValueError
    except Exception:
        raise ValueError('Cannot determine the reference to %s' % repr(obj))

    return ref


def ref_to_obj(ref):
    """
    Returns the object pointed to by ``ref`` ('module:dotted.name').
    """
    if not isinstance(ref, basestring):
        raise TypeError('References must be strings')
    if ':' not in ref:
        raise ValueError('Invalid reference')

    modulename, rest = ref.split(':', 1)
    try:
        obj = __import__(modulename)
    except ImportError:
        raise LookupError('Error resolving reference %s: '
                          'could not import module' % ref)

    try:
        # __import__ returns the top-level package; walk down to the target
        # module, then through the attribute path.
        for name in modulename.split('.')[1:] + rest.split('.'):
            obj = getattr(obj, name)
        return obj
    except Exception:
        raise LookupError('Error resolving reference %s: '
                          'error looking up object' % ref)


def maybe_ref(ref):
    """
    Returns the object that the given reference points to, if it is indeed
    a reference. If it is not a reference, the object is returned as-is.
    """
    if not isinstance(ref, str):
        return ref
    return ref_to_obj(ref)


def to_unicode(string, encoding='ascii'):
    """
    Safely converts a string to a unicode representation on any
    Python version.
    """
    if hasattr(string, 'decode'):
        return string.decode(encoding, 'ignore')
    return string  # pragma: nocover


# Python 2/3 compatibility shims
if sys.version_info < (3, 0):  # pragma: nocover
    iteritems = lambda d: d.iteritems()
    itervalues = lambda d: d.itervalues()
    xrange = xrange
    basestring = basestring
else:  # pragma: nocover
    iteritems = lambda d: d.items()
    itervalues = lambda d: d.values()
    xrange = range
    basestring = str