
Update scheduler module

pull/2499/merge
Ruud, 12 years ago
commit b8f78e311d
Changed files (lines changed):

  1. couchpotato/core/_base/scheduler/main.py (2)
  2. libs/apscheduler/__init__.py (4)
  3. libs/apscheduler/job.py (31)
  4. libs/apscheduler/jobstores/redis_store.py (91)
  5. libs/apscheduler/jobstores/shelve_store.py (5)
  6. libs/apscheduler/jobstores/sqlalchemy_store.py (18)
  7. libs/apscheduler/scheduler.py (60)
  8. libs/apscheduler/triggers/cron/__init__.py (16)
  9. libs/apscheduler/triggers/cron/expressions.py (18)
  10. libs/apscheduler/triggers/cron/fields.py (3)
  11. libs/apscheduler/util.py (8)

couchpotato/core/_base/scheduler/main.py (2 lines changed)

@@ -31,8 +31,8 @@ class Scheduler(Plugin):
             pass

     def doShutdown(self):
-        super(Scheduler, self).doShutdown()
         self.stop()
+        return super(Scheduler, self).doShutdown()

     def stop(self):
         if self.started:

libs/apscheduler/__init__.py (4 lines changed)

@@ -1,3 +1,3 @@
-version_info = (2, 0, 2)
+version_info = (2, 1, 1)
 version = '.'.join(str(n) for n in version_info[:3])
-release = version + ''.join(str(n) for n in version_info[3:])
+release = '.'.join(str(n) for n in version_info)

libs/apscheduler/job.py (31 lines changed)

@@ -16,22 +16,25 @@ class MaxInstancesReachedError(Exception):
 class Job(object):
     """
     Encapsulates the actual Job along with its metadata. Job instances
-    are created by the scheduler when adding jobs, and it should not be
-    directly instantiated.
+    are created by the scheduler when adding jobs, and should not be
+    directly instantiated. These options can be set when adding jobs
+    to the scheduler (see :ref:`job_options`).

-    :param trigger: trigger that determines the execution times
-    :param func: callable to call when the trigger is triggered
-    :param args: list of positional arguments to call func with
-    :param kwargs: dict of keyword arguments to call func with
-    :param name: name of the job (optional)
-    :param misfire_grace_time: seconds after the designated run time that
+    :var trigger: trigger that determines the execution times
+    :var func: callable to call when the trigger is triggered
+    :var args: list of positional arguments to call func with
+    :var kwargs: dict of keyword arguments to call func with
+    :var name: name of the job
+    :var misfire_grace_time: seconds after the designated run time that
         the job is still allowed to be run
-    :param coalesce: run once instead of many times if the scheduler determines
+    :var coalesce: run once instead of many times if the scheduler determines
         that the job should be run more than once in succession
-    :param max_runs: maximum number of times this job is allowed to be
+    :var max_runs: maximum number of times this job is allowed to be
         triggered
-    :param max_instances: maximum number of concurrently running
+    :var max_instances: maximum number of concurrently running
         instances allowed for this job
+    :var runs: number of times this job has been triggered
+    :var instances: number of concurrently running instances of this job
     """
     id = None
     next_run_time = None

@@ -130,5 +133,5 @@ class Job(object):
         return '<Job (name=%s, trigger=%s)>' % (self.name, repr(self.trigger))

     def __str__(self):
-        return '%s (trigger: %s, next run at: %s)' % (self.name,
-            str(self.trigger), str(self.next_run_time))
+        return '%s (trigger: %s, next run at: %s)' % (
+            self.name, str(self.trigger), str(self.next_run_time))
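
The options documented above as :var: attributes are the same keyword arguments the scheduler's add_* methods forward to the Job constructor. A minimal sketch of how they would typically be passed (the job function and option values are illustrative, not part of this commit):

    from apscheduler.scheduler import Scheduler

    def sync_library():
        print('checking for new releases')

    sched = Scheduler()
    # Extra keyword arguments to add_interval_job() end up on the Job instance.
    sched.add_interval_job(sync_library, minutes=30,
                           misfire_grace_time=60,  # still run if up to 60s late
                           coalesce=True,          # collapse a missed backlog into one run
                           max_instances=1)        # never run two copies concurrently
    sched.start()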

libs/apscheduler/jobstores/redis_store.py (91 lines changed, new file)

@@ -0,0 +1,91 @@
+"""
+Stores jobs in a Redis database.
+"""
+from uuid import uuid4
+from datetime import datetime
+import logging
+
+from apscheduler.jobstores.base import JobStore
+from apscheduler.job import Job
+
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: nocover
+    import pickle
+
+try:
+    from redis import StrictRedis
+except ImportError:  # pragma: nocover
+    raise ImportError('RedisJobStore requires redis installed')
+
+try:
+    long = long
+except NameError:
+    long = int
+
+logger = logging.getLogger(__name__)
+
+
+class RedisJobStore(JobStore):
+    def __init__(self, db=0, key_prefix='jobs.',
+                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
+        self.jobs = []
+        self.pickle_protocol = pickle_protocol
+        self.key_prefix = key_prefix
+
+        if db is None:
+            raise ValueError('The "db" parameter must not be empty')
+        if not key_prefix:
+            raise ValueError('The "key_prefix" parameter must not be empty')
+
+        self.redis = StrictRedis(db=db, **connect_args)
+
+    def add_job(self, job):
+        job.id = str(uuid4())
+        job_state = job.__getstate__()
+        job_dict = {
+            'job_state': pickle.dumps(job_state, self.pickle_protocol),
+            'runs': '0',
+            'next_run_time': job_state.pop('next_run_time').isoformat()}
+        self.redis.hmset(self.key_prefix + job.id, job_dict)
+        self.jobs.append(job)
+
+    def remove_job(self, job):
+        self.redis.delete(self.key_prefix + job.id)
+        self.jobs.remove(job)
+
+    def load_jobs(self):
+        jobs = []
+        keys = self.redis.keys(self.key_prefix + '*')
+        pipeline = self.redis.pipeline()
+        for key in keys:
+            pipeline.hgetall(key)
+        results = pipeline.execute()
+        for job_dict in results:
+            job_state = {}
+            try:
+                job = Job.__new__(Job)
+                job_state = pickle.loads(job_dict['job_state'.encode()])
+                job_state['runs'] = long(job_dict['runs'.encode()])
+                dateval = job_dict['next_run_time'.encode()].decode()
+                job_state['next_run_time'] = datetime.strptime(
+                    dateval, '%Y-%m-%dT%H:%M:%S')
+                job.__setstate__(job_state)
+                jobs.append(job)
+            except Exception:
+                job_name = job_state.get('name', '(unknown)')
+                logger.exception('Unable to restore job "%s"', job_name)
+        self.jobs = jobs
+
+    def update_job(self, job):
+        attrs = {
+            'next_run_time': job.next_run_time.isoformat(),
+            'runs': job.runs}
+        self.redis.hmset(self.key_prefix + job.id, attrs)
+
+    def close(self):
+        self.redis.connection_pool.disconnect()
+
+    def __repr__(self):
+        return '<%s>' % self.__class__.__name__
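
The new RedisJobStore pickles each job into a Redis hash keyed '<key_prefix><job id>', so scheduled jobs survive a restart. A small usage sketch, assuming a Redis server is reachable with default connection settings (the db number and prefix below are illustrative):

    from apscheduler.scheduler import Scheduler
    from apscheduler.jobstores.redis_store import RedisJobStore

    sched = Scheduler()
    # Any extra keyword arguments (host, port, password, ...) go to StrictRedis().
    sched.add_jobstore(RedisJobStore(db=2, key_prefix='example.jobs.'), 'redis')
    sched.start()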

libs/apscheduler/jobstores/shelve_store.py (5 lines changed)

@@ -32,17 +32,20 @@ class ShelveJobStore(JobStore):
     def add_job(self, job):
         job.id = self._generate_id()
-        self.jobs.append(job)
         self.store[job.id] = job.__getstate__()
+        self.store.sync()
+        self.jobs.append(job)

     def update_job(self, job):
         job_dict = self.store[job.id]
         job_dict['next_run_time'] = job.next_run_time
         job_dict['runs'] = job.runs
         self.store[job.id] = job_dict
+        self.store.sync()

     def remove_job(self, job):
         del self.store[job.id]
+        self.store.sync()
         self.jobs.remove(job)

     def load_jobs(self):

libs/apscheduler/jobstores/sqlalchemy_store.py (18 lines changed)

@@ -4,6 +4,8 @@ Stores jobs in a database table using SQLAlchemy.
 import pickle
 import logging

+import sqlalchemy
+
 from apscheduler.jobstores.base import JobStore
 from apscheduler.job import Job

@@ -28,17 +30,19 @@ class SQLAlchemyJobStore(JobStore):
         else:
             raise ValueError('Need either "engine" or "url" defined')

-        self.jobs_t = Table(tablename, metadata or MetaData(),
+        if sqlalchemy.__version__ < '0.7':
+            pickle_coltype = PickleType(pickle_protocol, mutable=False)
+        else:
+            pickle_coltype = PickleType(pickle_protocol)
+        self.jobs_t = Table(
+            tablename, metadata or MetaData(),
             Column('id', Integer,
                    Sequence(tablename + '_id_seq', optional=True),
                    primary_key=True),
-            Column('trigger', PickleType(pickle_protocol, mutable=False),
-                   nullable=False),
+            Column('trigger', pickle_coltype, nullable=False),
             Column('func_ref', String(1024), nullable=False),
-            Column('args', PickleType(pickle_protocol, mutable=False),
-                   nullable=False),
-            Column('kwargs', PickleType(pickle_protocol, mutable=False),
-                   nullable=False),
+            Column('args', pickle_coltype, nullable=False),
+            Column('kwargs', pickle_coltype, nullable=False),
             Column('name', Unicode(1024)),
             Column('misfire_grace_time', Integer, nullable=False),
             Column('coalesce', Boolean, nullable=False),
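
The version check above keeps the bundled store working across SQLAlchemy releases: on versions before 0.7 the store still passes mutable=False to PickleType explicitly, while newer versions already default to immutable pickled columns. A brief usage sketch (the SQLite URL is illustrative):

    from apscheduler.scheduler import Scheduler
    from apscheduler.jobstores.sqlalchemy_store import SQLAlchemyJobStore

    sched = Scheduler()
    # Either 'url' or 'engine' must be supplied; the jobs table is created on demand.
    sched.add_jobstore(SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'), 'default')
    sched.start()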

libs/apscheduler/scheduler.py (60 lines changed)

@@ -35,7 +35,7 @@ class Scheduler(object):
     their execution.
     """

-    _stopped = False
+    _stopped = True
     _thread = None

     def __init__(self, gconfig={}, **options):

@@ -60,6 +60,7 @@
         self.misfire_grace_time = int(config.pop('misfire_grace_time', 1))
         self.coalesce = asbool(config.pop('coalesce', True))
         self.daemonic = asbool(config.pop('daemonic', True))
+        self.standalone = asbool(config.pop('standalone', False))

         # Configure the thread pool
         if 'threadpool' in config:

@@ -85,6 +86,12 @@
     def start(self):
         """
         Starts the scheduler in a new thread.
+
+        In threaded mode (the default), this method will return immediately
+        after starting the scheduler thread.
+
+        In standalone mode, this method will block until there are no more
+        scheduled jobs.
         """
         if self.running:
             raise SchedulerAlreadyRunningError

@@ -99,11 +106,15 @@
         del self._pending_jobs[:]

         self._stopped = False
-        self._thread = Thread(target=self._main_loop, name='APScheduler')
-        self._thread.setDaemon(self.daemonic)
-        self._thread.start()
+        if self.standalone:
+            self._main_loop()
+        else:
+            self._thread = Thread(target=self._main_loop, name='APScheduler')
+            self._thread.setDaemon(self.daemonic)
+            self._thread.start()

-    def shutdown(self, wait=True, shutdown_threadpool=True):
+    def shutdown(self, wait=True, shutdown_threadpool=True,
+                 close_jobstores=True):
         """
         Shuts down the scheduler and terminates the thread.
         Does not interrupt any currently running jobs.
@@ -111,6 +122,7 @@
         :param wait: ``True`` to wait until all currently executing jobs have
                      finished (if ``shutdown_threadpool`` is also ``True``)
         :param shutdown_threadpool: ``True`` to shut down the thread pool
+        :param close_jobstores: ``True`` to close all job stores after shutdown
         """
         if not self.running:
             return

@@ -123,11 +135,19 @@
             self._threadpool.shutdown(wait)

         # Wait until the scheduler thread terminates
-        self._thread.join()
+        if self._thread:
+            self._thread.join()
+
+        # Close all job stores
+        if close_jobstores:
+            for jobstore in itervalues(self._jobstores):
+                jobstore.close()

     @property
     def running(self):
-        return not self._stopped and self._thread and self._thread.isAlive()
+        thread_alive = self._thread and self._thread.isAlive()
+        standalone = getattr(self, 'standalone', False)
+        return not self._stopped and (standalone or thread_alive)

     def add_jobstore(self, jobstore, alias, quiet=False):
         """

@@ -156,21 +176,25 @@
         if not quiet:
             self._wakeup.set()

-    def remove_jobstore(self, alias):
+    def remove_jobstore(self, alias, close=True):
         """
         Removes the job store by the given alias from this scheduler.

+        :param close: ``True`` to close the job store after removing it
         :type alias: str
         """
         self._jobstores_lock.acquire()
         try:
-            try:
-                del self._jobstores[alias]
-            except KeyError:
+            jobstore = self._jobstores.pop(alias)
+            if not jobstore:
                 raise KeyError('No such job store: %s' % alias)
         finally:
             self._jobstores_lock.release()

+        # Close the job store if requested
+        if close:
+            jobstore.close()
+
         # Notify listeners that a job store has been removed
         self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias))
@@ -245,8 +269,10 @@
                 **options):
         """
         Adds the given job to the job list and notifies the scheduler thread.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).

-        :param trigger: alias of the job store to store the job in
+        :param trigger: trigger that determines when ``func`` is called
         :param func: callable to run at the given time
         :param args: list of positional arguments to call func with
         :param kwargs: dict of keyword arguments to call func with

@@ -276,6 +302,8 @@
     def add_date_job(self, func, date, args=None, kwargs=None, **options):
         """
         Schedules a job to be completed on a specific date and time.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).

         :param func: callable to run at the given time
         :param date: the date/time to run the job at

@@ -294,6 +322,8 @@
                          **options):
         """
         Schedules a job to be completed on specified intervals.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).

         :param func: callable to run
         :param weeks: number of weeks to wait

@@ -322,6 +352,8 @@
         """
         Schedules a job to be completed on times that match the given
         expressions.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).

         :param func: callable to run
         :param year: year to run on

@@ -352,6 +384,8 @@
         This decorator does not wrap its host function.
         Unscheduling decorated functions is possible by passing the ``job``
         attribute of the scheduled function to :meth:`unschedule_job`.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
         """
         def inner(func):
             func.job = self.add_cron_job(func, **options)

@@ -364,6 +398,8 @@
         This decorator does not wrap its host function.
         Unscheduling decorated functions is possible by passing the ``job``
         attribute of the scheduled function to :meth:`unschedule_job`.
+        Any extra keyword arguments are passed along to the constructor of the
+        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
         """
         def inner(func):
             func.job = self.add_interval_job(func, **options)
@@ -517,7 +553,8 @@
                     job.runs += len(run_times)

                     # Update the job, but don't keep finished jobs around
-                    if job.compute_next_run_time(now + timedelta(microseconds=1)):
+                    if job.compute_next_run_time(
+                            now + timedelta(microseconds=1)):
                         jobstore.update_job(job)
                     else:
                         self._remove_job(job, alias, jobstore)

@@ -550,6 +587,11 @@
                 logger.debug('Next wakeup is due at %s (in %f seconds)',
                              next_wakeup_time, wait_seconds)
                 self._wakeup.wait(wait_seconds)
+                self._wakeup.clear()
+            elif self.standalone:
+                logger.debug('No jobs left; shutting down scheduler')
+                self.shutdown()
+                break
             else:
                 logger.debug('No jobs; waiting until a job is added')
                 self._wakeup.wait()
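
Taken together, the scheduler changes above add a standalone mode: with standalone=True the main loop runs in the calling thread instead of a spawned one, and the scheduler shuts itself down once no jobs remain. A minimal sketch (the job and interval are illustrative):

    from apscheduler.scheduler import Scheduler

    def backup():
        print('running backup')

    sched = Scheduler(standalone=True)
    sched.add_interval_job(backup, hours=1)

    # In standalone mode start() blocks here until no scheduled jobs are left.
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        pass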

libs/apscheduler/triggers/cron/__init__.py (16 lines changed)

@@ -21,8 +21,10 @@ class CronTrigger(object):
         if self.start_date:
             self.start_date = convert_to_datetime(self.start_date)

-        # Yank out all None valued fields
+        # Check field names and yank out all None valued fields
         for key, value in list(iteritems(values)):
+            if key not in self.FIELD_NAMES:
+                raise TypeError('Invalid field name: %s' % key)
             if value is None:
                 del values[key]

@@ -111,17 +113,17 @@
             if next_value is None:
                 # No valid value was found
-                next_date, fieldnum = self._increment_field_value(next_date,
-                                                                  fieldnum - 1)
+                next_date, fieldnum = self._increment_field_value(
+                    next_date, fieldnum - 1)
             elif next_value > curr_value:
                 # A valid, but higher than the starting value, was found
                 if field.REAL:
-                    next_date = self._set_field_value(next_date, fieldnum,
-                                                      next_value)
+                    next_date = self._set_field_value(
+                        next_date, fieldnum, next_value)
                     fieldnum += 1
                 else:
-                    next_date, fieldnum = self._increment_field_value(next_date,
-                                                                      fieldnum)
+                    next_date, fieldnum = self._increment_field_value(
+                        next_date, fieldnum)
             else:
                 # A valid value was found, no changes necessary
                 fieldnum += 1
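
With the new field-name check, a mistyped field passed to CronTrigger now fails loudly instead of being quietly ignored. A short sketch of the difference (the trigger values are illustrative):

    from apscheduler.triggers.cron import CronTrigger

    CronTrigger(day_of_week='mon', hour=3)   # valid field names, accepted as before
    CronTrigger(dayofweek='mon')             # now raises TypeError: Invalid field name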

libs/apscheduler/triggers/cron/expressions.py (18 lines changed)

@@ -8,7 +8,7 @@ import re
 from apscheduler.util import asint

 __all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
-           'WeekdayPositionExpression')
+           'WeekdayPositionExpression', 'LastDayOfMonthExpression')

 WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']

@@ -176,3 +176,19 @@ class WeekdayPositionExpression(AllExpression):
         return "%s('%s', '%s')" % (self.__class__.__name__,
                                    self.options[self.option_num],
                                    WEEKDAYS[self.weekday])
+
+
+class LastDayOfMonthExpression(AllExpression):
+    value_re = re.compile(r'last', re.IGNORECASE)
+
+    def __init__(self):
+        pass
+
+    def get_next_value(self, date, field):
+        return monthrange(date.year, date.month)[1]
+
+    def __str__(self):
+        return 'last'
+
+    def __repr__(self):
+        return "%s()" % self.__class__.__name__

libs/apscheduler/triggers/cron/fields.py (3 lines changed)

@@ -85,7 +85,8 @@ class WeekField(BaseField):

 class DayOfMonthField(BaseField):
-    COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression]
+    COMPILERS = BaseField.COMPILERS + [WeekdayPositionExpression,
+                                       LastDayOfMonthExpression]

     def get_max(self, dateval):
         return monthrange(dateval.year, dateval.month)[1]
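
Registering LastDayOfMonthExpression with DayOfMonthField is what makes the new 'last' value usable from the cron API. A small sketch (the job and times are illustrative):

    from apscheduler.scheduler import Scheduler

    def monthly_cleanup():
        print('cleaning up')

    sched = Scheduler()
    # 'last' resolves to 28-31 via monthrange(), so this fires on the final day
    # of every month at 02:30.
    sched.add_cron_job(monthly_cleanup, day='last', hour=2, minute=30)
    sched.start()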

libs/apscheduler/util.py (8 lines changed)

@@ -6,7 +6,6 @@ from datetime import date, datetime, timedelta
 from time import mktime
 import re
 import sys
-from types import MethodType

 __all__ = ('asint', 'asbool', 'convert_to_datetime', 'timedelta_seconds',
            'time_difference', 'datetime_ceil', 'combine_opts',

@@ -64,7 +63,7 @@ def convert_to_datetime(input):
         return input
     elif isinstance(input, date):
         return datetime.fromordinal(input.toordinal())
-    elif isinstance(input, str):
+    elif isinstance(input, basestring):
         m = _DATE_REGEX.match(input)
         if not m:
             raise ValueError('Invalid date string')

@@ -109,7 +108,7 @@ def datetime_ceil(dateval):
     """
     if dateval.microsecond > 0:
         return dateval + timedelta(seconds=1,
-                                   microseconds= -dateval.microsecond)
+                                   microseconds=-dateval.microsecond)

     return dateval

@@ -143,7 +142,8 @@ def get_callable_name(func):
     if f_self and hasattr(func, '__name__'):
         if isinstance(f_self, type):
             # class method
-            return '%s.%s' % (f_self.__name__, func.__name__)
+            clsname = getattr(f_self, '__qualname__', None) or f_self.__name__
+            return '%s.%s' % (clsname, func.__name__)

         # bound method
         return '%s.%s' % (f_self.__class__.__name__, func.__name__)
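
Switching the type check to basestring means convert_to_datetime() now also accepts unicode date strings under Python 2, not just byte strings. A quick sketch, assuming the 'YYYY-MM-DD[ HH:MM:SS]' format matched by _DATE_REGEX:

    from apscheduler.util import convert_to_datetime

    convert_to_datetime('2013-06-01 12:30:00')   # byte string, worked before
    convert_to_datetime(u'2013-06-01')           # unicode string, accepted now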
