From b8f78e311d3d41a1c5c539b08bd1e21cef749ec5 Mon Sep 17 00:00:00 2001 From: Ruud Date: Fri, 22 Nov 2013 15:38:33 +0100 Subject: [PATCH] Update scheduler module --- couchpotato/core/_base/scheduler/main.py | 2 +- libs/apscheduler/__init__.py | 4 +- libs/apscheduler/job.py | 31 +++++---- libs/apscheduler/jobstores/ram_store.py | 2 +- libs/apscheduler/jobstores/redis_store.py | 91 ++++++++++++++++++++++++++ libs/apscheduler/jobstores/shelve_store.py | 5 +- libs/apscheduler/jobstores/sqlalchemy_store.py | 18 +++-- libs/apscheduler/scheduler.py | 70 ++++++++++++++++---- libs/apscheduler/triggers/cron/__init__.py | 16 +++-- libs/apscheduler/triggers/cron/expressions.py | 18 ++++- libs/apscheduler/triggers/cron/fields.py | 3 +- libs/apscheduler/util.py | 10 +-- 12 files changed, 216 insertions(+), 54 deletions(-) create mode 100644 libs/apscheduler/jobstores/redis_store.py diff --git a/couchpotato/core/_base/scheduler/main.py b/couchpotato/core/_base/scheduler/main.py index 2c97e1b..87b0533 100644 --- a/couchpotato/core/_base/scheduler/main.py +++ b/couchpotato/core/_base/scheduler/main.py @@ -31,8 +31,8 @@ class Scheduler(Plugin): pass def doShutdown(self): - super(Scheduler, self).doShutdown() self.stop() + return super(Scheduler, self).doShutdown() def stop(self): if self.started: diff --git a/libs/apscheduler/__init__.py b/libs/apscheduler/__init__.py index a55959f..d93e1b3 100644 --- a/libs/apscheduler/__init__.py +++ b/libs/apscheduler/__init__.py @@ -1,3 +1,3 @@ -version_info = (2, 0, 2) +version_info = (2, 1, 1) version = '.'.join(str(n) for n in version_info[:3]) -release = version + ''.join(str(n) for n in version_info[3:]) +release = '.'.join(str(n) for n in version_info) diff --git a/libs/apscheduler/job.py b/libs/apscheduler/job.py index 868e723..cfc09a2 100644 --- a/libs/apscheduler/job.py +++ b/libs/apscheduler/job.py @@ -16,22 +16,25 @@ class MaxInstancesReachedError(Exception): class Job(object): """ Encapsulates the actual Job along with its metadata. Job instances - are created by the scheduler when adding jobs, and it should not be - directly instantiated. - - :param trigger: trigger that determines the execution times - :param func: callable to call when the trigger is triggered - :param args: list of positional arguments to call func with - :param kwargs: dict of keyword arguments to call func with - :param name: name of the job (optional) - :param misfire_grace_time: seconds after the designated run time that + are created by the scheduler when adding jobs, and should not be + directly instantiated. These options can be set when adding jobs + to the scheduler (see :ref:`job_options`). + + :var trigger: trigger that determines the execution times + :var func: callable to call when the trigger is triggered + :var args: list of positional arguments to call func with + :var kwargs: dict of keyword arguments to call func with + :var name: name of the job + :var misfire_grace_time: seconds after the designated run time that the job is still allowed to be run - :param coalesce: run once instead of many times if the scheduler determines + :var coalesce: run once instead of many times if the scheduler determines that the job should be run more than once in succession - :param max_runs: maximum number of times this job is allowed to be + :var max_runs: maximum number of times this job is allowed to be triggered - :param max_instances: maximum number of concurrently running + :var max_instances: maximum number of concurrently running instances allowed for this job + :var runs: number of times this job has been triggered + :var instances: number of concurrently running instances of this job """ id = None next_run_time = None @@ -130,5 +133,5 @@ class Job(object): return '' % (self.name, repr(self.trigger)) def __str__(self): - return '%s (trigger: %s, next run at: %s)' % (self.name, - str(self.trigger), str(self.next_run_time)) + return '%s (trigger: %s, next run at: %s)' % ( + self.name, str(self.trigger), str(self.next_run_time)) diff --git a/libs/apscheduler/jobstores/ram_store.py b/libs/apscheduler/jobstores/ram_store.py index 85091fe..60458fb 100644 --- a/libs/apscheduler/jobstores/ram_store.py +++ b/libs/apscheduler/jobstores/ram_store.py @@ -8,7 +8,7 @@ from apscheduler.jobstores.base import JobStore class RAMJobStore(JobStore): def __init__(self): self.jobs = [] - + def add_job(self, job): self.jobs.append(job) diff --git a/libs/apscheduler/jobstores/redis_store.py b/libs/apscheduler/jobstores/redis_store.py new file mode 100644 index 0000000..5eabf4b --- /dev/null +++ b/libs/apscheduler/jobstores/redis_store.py @@ -0,0 +1,91 @@ +""" +Stores jobs in a Redis database. +""" +from uuid import uuid4 +from datetime import datetime +import logging + +from apscheduler.jobstores.base import JobStore +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + from redis import StrictRedis +except ImportError: # pragma: nocover + raise ImportError('RedisJobStore requires redis installed') + +try: + long = long +except NameError: + long = int + +logger = logging.getLogger(__name__) + + +class RedisJobStore(JobStore): + def __init__(self, db=0, key_prefix='jobs.', + pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args): + self.jobs = [] + self.pickle_protocol = pickle_protocol + self.key_prefix = key_prefix + + if db is None: + raise ValueError('The "db" parameter must not be empty') + if not key_prefix: + raise ValueError('The "key_prefix" parameter must not be empty') + + self.redis = StrictRedis(db=db, **connect_args) + + def add_job(self, job): + job.id = str(uuid4()) + job_state = job.__getstate__() + job_dict = { + 'job_state': pickle.dumps(job_state, self.pickle_protocol), + 'runs': '0', + 'next_run_time': job_state.pop('next_run_time').isoformat()} + self.redis.hmset(self.key_prefix + job.id, job_dict) + self.jobs.append(job) + + def remove_job(self, job): + self.redis.delete(self.key_prefix + job.id) + self.jobs.remove(job) + + def load_jobs(self): + jobs = [] + keys = self.redis.keys(self.key_prefix + '*') + pipeline = self.redis.pipeline() + for key in keys: + pipeline.hgetall(key) + results = pipeline.execute() + + for job_dict in results: + job_state = {} + try: + job = Job.__new__(Job) + job_state = pickle.loads(job_dict['job_state'.encode()]) + job_state['runs'] = long(job_dict['runs'.encode()]) + dateval = job_dict['next_run_time'.encode()].decode() + job_state['next_run_time'] = datetime.strptime( + dateval, '%Y-%m-%dT%H:%M:%S') + job.__setstate__(job_state) + jobs.append(job) + except Exception: + job_name = job_state.get('name', '(unknown)') + logger.exception('Unable to restore job "%s"', job_name) + self.jobs = jobs + + def update_job(self, job): + attrs = { + 'next_run_time': job.next_run_time.isoformat(), + 'runs': job.runs} + self.redis.hmset(self.key_prefix + job.id, attrs) + + def close(self): + self.redis.connection_pool.disconnect() + + def __repr__(self): + return '<%s>' % self.__class__.__name__ diff --git a/libs/apscheduler/jobstores/shelve_store.py b/libs/apscheduler/jobstores/shelve_store.py index 87c95f8..bd68333 100644 --- a/libs/apscheduler/jobstores/shelve_store.py +++ b/libs/apscheduler/jobstores/shelve_store.py @@ -32,17 +32,20 @@ class ShelveJobStore(JobStore): def add_job(self, job): job.id = self._generate_id() - self.jobs.append(job) self.store[job.id] = job.__getstate__() + self.store.sync() + self.jobs.append(job) def update_job(self, job): job_dict = self.store[job.id] job_dict['next_run_time'] = job.next_run_time job_dict['runs'] = job.runs self.store[job.id] = job_dict + self.store.sync() def remove_job(self, job): del self.store[job.id] + self.store.sync() self.jobs.remove(job) def load_jobs(self): diff --git a/libs/apscheduler/jobstores/sqlalchemy_store.py b/libs/apscheduler/jobstores/sqlalchemy_store.py index 41ed4c7..5b64a35 100644 --- a/libs/apscheduler/jobstores/sqlalchemy_store.py +++ b/libs/apscheduler/jobstores/sqlalchemy_store.py @@ -4,6 +4,8 @@ Stores jobs in a database table using SQLAlchemy. import pickle import logging +import sqlalchemy + from apscheduler.jobstores.base import JobStore from apscheduler.job import Job @@ -28,17 +30,19 @@ class SQLAlchemyJobStore(JobStore): else: raise ValueError('Need either "engine" or "url" defined') - self.jobs_t = Table(tablename, metadata or MetaData(), + if sqlalchemy.__version__ < '0.7': + pickle_coltype = PickleType(pickle_protocol, mutable=False) + else: + pickle_coltype = PickleType(pickle_protocol) + self.jobs_t = Table( + tablename, metadata or MetaData(), Column('id', Integer, Sequence(tablename + '_id_seq', optional=True), primary_key=True), - Column('trigger', PickleType(pickle_protocol, mutable=False), - nullable=False), + Column('trigger', pickle_coltype, nullable=False), Column('func_ref', String(1024), nullable=False), - Column('args', PickleType(pickle_protocol, mutable=False), - nullable=False), - Column('kwargs', PickleType(pickle_protocol, mutable=False), - nullable=False), + Column('args', pickle_coltype, nullable=False), + Column('kwargs', pickle_coltype, nullable=False), Column('name', Unicode(1024)), Column('misfire_grace_time', Integer, nullable=False), Column('coalesce', Boolean, nullable=False), diff --git a/libs/apscheduler/scheduler.py b/libs/apscheduler/scheduler.py index 50769e4..d6afcad 100644 --- a/libs/apscheduler/scheduler.py +++ b/libs/apscheduler/scheduler.py @@ -35,7 +35,7 @@ class Scheduler(object): their execution. """ - _stopped = False + _stopped = True _thread = None def __init__(self, gconfig={}, **options): @@ -60,6 +60,7 @@ class Scheduler(object): self.misfire_grace_time = int(config.pop('misfire_grace_time', 1)) self.coalesce = asbool(config.pop('coalesce', True)) self.daemonic = asbool(config.pop('daemonic', True)) + self.standalone = asbool(config.pop('standalone', False)) # Configure the thread pool if 'threadpool' in config: @@ -85,6 +86,12 @@ class Scheduler(object): def start(self): """ Starts the scheduler in a new thread. + + In threaded mode (the default), this method will return immediately + after starting the scheduler thread. + + In standalone mode, this method will block until there are no more + scheduled jobs. """ if self.running: raise SchedulerAlreadyRunningError @@ -99,11 +106,15 @@ class Scheduler(object): del self._pending_jobs[:] self._stopped = False - self._thread = Thread(target=self._main_loop, name='APScheduler') - self._thread.setDaemon(self.daemonic) - self._thread.start() + if self.standalone: + self._main_loop() + else: + self._thread = Thread(target=self._main_loop, name='APScheduler') + self._thread.setDaemon(self.daemonic) + self._thread.start() - def shutdown(self, wait=True, shutdown_threadpool=True): + def shutdown(self, wait=True, shutdown_threadpool=True, + close_jobstores=True): """ Shuts down the scheduler and terminates the thread. Does not interrupt any currently running jobs. @@ -111,6 +122,7 @@ class Scheduler(object): :param wait: ``True`` to wait until all currently executing jobs have finished (if ``shutdown_threadpool`` is also ``True``) :param shutdown_threadpool: ``True`` to shut down the thread pool + :param close_jobstores: ``True`` to close all job stores after shutdown """ if not self.running: return @@ -123,11 +135,19 @@ class Scheduler(object): self._threadpool.shutdown(wait) # Wait until the scheduler thread terminates - self._thread.join() + if self._thread: + self._thread.join() + + # Close all job stores + if close_jobstores: + for jobstore in itervalues(self._jobstores): + jobstore.close() @property def running(self): - return not self._stopped and self._thread and self._thread.isAlive() + thread_alive = self._thread and self._thread.isAlive() + standalone = getattr(self, 'standalone', False) + return not self._stopped and (standalone or thread_alive) def add_jobstore(self, jobstore, alias, quiet=False): """ @@ -156,21 +176,25 @@ class Scheduler(object): if not quiet: self._wakeup.set() - def remove_jobstore(self, alias): + def remove_jobstore(self, alias, close=True): """ Removes the job store by the given alias from this scheduler. + :param close: ``True`` to close the job store after removing it :type alias: str """ self._jobstores_lock.acquire() try: - try: - del self._jobstores[alias] - except KeyError: + jobstore = self._jobstores.pop(alias) + if not jobstore: raise KeyError('No such job store: %s' % alias) finally: self._jobstores_lock.release() + # Close the job store if requested + if close: + jobstore.close() + # Notify listeners that a job store has been removed self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias)) @@ -245,8 +269,10 @@ class Scheduler(object): **options): """ Adds the given job to the job list and notifies the scheduler thread. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). - :param trigger: alias of the job store to store the job in + :param trigger: trigger that determines when ``func`` is called :param func: callable to run at the given time :param args: list of positional arguments to call func with :param kwargs: dict of keyword arguments to call func with @@ -276,6 +302,8 @@ class Scheduler(object): def add_date_job(self, func, date, args=None, kwargs=None, **options): """ Schedules a job to be completed on a specific date and time. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). :param func: callable to run at the given time :param date: the date/time to run the job at @@ -294,6 +322,8 @@ class Scheduler(object): **options): """ Schedules a job to be completed on specified intervals. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). :param func: callable to run :param weeks: number of weeks to wait @@ -322,6 +352,8 @@ class Scheduler(object): """ Schedules a job to be completed on times that match the given expressions. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). :param func: callable to run :param year: year to run on @@ -352,6 +384,8 @@ class Scheduler(object): This decorator does not wrap its host function. Unscheduling decorated functions is possible by passing the ``job`` attribute of the scheduled function to :meth:`unschedule_job`. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). """ def inner(func): func.job = self.add_cron_job(func, **options) @@ -364,6 +398,8 @@ class Scheduler(object): This decorator does not wrap its host function. Unscheduling decorated functions is possible by passing the ``job`` attribute of the scheduled function to :meth:`unschedule_job`. + Any extra keyword arguments are passed along to the constructor of the + :class:`~apscheduler.job.Job` class (see :ref:`job_options`). """ def inner(func): func.job = self.add_interval_job(func, **options) @@ -517,7 +553,8 @@ class Scheduler(object): job.runs += len(run_times) # Update the job, but don't keep finished jobs around - if job.compute_next_run_time(now + timedelta(microseconds=1)): + if job.compute_next_run_time( + now + timedelta(microseconds=1)): jobstore.update_job(job) else: self._remove_job(job, alias, jobstore) @@ -550,10 +587,15 @@ class Scheduler(object): logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) self._wakeup.wait(wait_seconds) + self._wakeup.clear() + elif self.standalone: + logger.debug('No jobs left; shutting down scheduler') + self.shutdown() + break else: logger.debug('No jobs; waiting until a job is added') self._wakeup.wait() - self._wakeup.clear() + self._wakeup.clear() logger.info('Scheduler has been shut down') self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) diff --git a/libs/apscheduler/triggers/cron/__init__.py b/libs/apscheduler/triggers/cron/__init__.py index 763edb1..9e69f72 100644 --- a/libs/apscheduler/triggers/cron/__init__.py +++ b/libs/apscheduler/triggers/cron/__init__.py @@ -21,8 +21,10 @@ class CronTrigger(object): if self.start_date: self.start_date = convert_to_datetime(self.start_date) - # Yank out all None valued fields + # Check field names and yank out all None valued fields for key, value in list(iteritems(values)): + if key not in self.FIELD_NAMES: + raise TypeError('Invalid field name: %s' % key) if value is None: del values[key] @@ -111,17 +113,17 @@ class CronTrigger(object): if next_value is None: # No valid value was found - next_date, fieldnum = self._increment_field_value(next_date, - fieldnum - 1) + next_date, fieldnum = self._increment_field_value( + next_date, fieldnum - 1) elif next_value > curr_value: # A valid, but higher than the starting value, was found if field.REAL: - next_date = self._set_field_value(next_date, fieldnum, - next_value) + next_date = self._set_field_value( + next_date, fieldnum, next_value) fieldnum += 1 else: - next_date, fieldnum = self._increment_field_value(next_date, - fieldnum) + next_date, fieldnum = self._increment_field_value( + next_date, fieldnum) else: # A valid value was found, no changes necessary fieldnum += 1 diff --git a/libs/apscheduler/triggers/cron/expressions.py b/libs/apscheduler/triggers/cron/expressions.py index 018c7a3..b5d2919 100644 --- a/libs/apscheduler/triggers/cron/expressions.py +++ b/libs/apscheduler/triggers/cron/expressions.py @@ -8,7 +8,7 @@ import re from apscheduler.util import asint __all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression', - 'WeekdayPositionExpression') + 'WeekdayPositionExpression', 'LastDayOfMonthExpression') WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] @@ -176,3 +176,19 @@ class WeekdayPositionExpression(AllExpression): return "%s('%s', '%s')" % (self.__class__.__name__, self.options[self.option_num], WEEKDAYS[self.weekday]) + + +class LastDayOfMonthExpression(AllExpression): + value_re = re.compile(r'last', re.IGNORECASE) + + def __init__(self): + pass + + def get_next_value(self, date, field): + return monthrange(date.year, date.month)[1] + + def __str__(self): + return 'last' + + def __repr__(self): + return "%s()" % self.__class__.__name__ diff --git a/libs/apscheduler/triggers/cron/fields.py b/libs/apscheduler/triggers/cron/fields.py index ef970cc..be5e5e3 100644 --- a/libs/apscheduler/triggers/cron/fields.py +++ b/libs/apscheduler/triggers/cron/fields.py @@ -85,7 +85,8 @@ class WeekField(BaseField): class DayOfMonthField(BaseField): - COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression] + COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression, + LastDayOfMonthExpression] def get_max(self, dateval): return monthrange(dateval.year, dateval.month)[1] diff --git a/libs/apscheduler/util.py b/libs/apscheduler/util.py index a49aaed..dcede4c 100644 --- a/libs/apscheduler/util.py +++ b/libs/apscheduler/util.py @@ -6,7 +6,6 @@ from datetime import date, datetime, timedelta from time import mktime import re import sys -from types import MethodType __all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds', 'time_difference', 'datetime_ceil', 'combine_opts', @@ -64,7 +63,7 @@ def convert_to_datetime(input): return input elif isinstance(input, date): return datetime.fromordinal(input.toordinal()) - elif isinstance(input, str): + elif isinstance(input, basestring): m = _DATE_REGEX.match(input) if not m: raise ValueError('Invalid date string') @@ -109,7 +108,7 @@ def datetime_ceil(dateval): """ if dateval.microsecond > 0: return dateval + timedelta(seconds=1, - microseconds= -dateval.microsecond) + microseconds=-dateval.microsecond) return dateval @@ -143,7 +142,8 @@ def get_callable_name(func): if f_self and hasattr(func, '__name__'): if isinstance(f_self, type): # class method - return '%s.%s' % (f_self.__name__, func.__name__) + clsname = getattr(f_self, '__qualname__', None) or f_self.__name__ + return '%s.%s' % (clsname, func.__name__) # bound method return '%s.%s' % (f_self.__class__.__name__, func.__name__) @@ -169,7 +169,7 @@ def obj_to_ref(obj): raise ValueError except Exception: raise ValueError('Cannot determine the reference to %s' % repr(obj)) - + return ref