You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

91 lines
2.7 KiB

"""
Stores jobs in a Redis database.
"""
from uuid import uuid4
from datetime import datetime
import logging
from apscheduler.jobstores.base import JobStore
from apscheduler.job import Job
try:
import cPickle as pickle
except ImportError: # pragma: nocover
import pickle
try:
from redis import StrictRedis
except ImportError: # pragma: nocover
raise ImportError('RedisJobStore requires redis installed')
try:
long = long
except NameError:
long = int
logger = logging.getLogger(__name__)
class RedisJobStore(JobStore):
def __init__(self, db=0, key_prefix='jobs.',
pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
self.jobs = []
self.pickle_protocol = pickle_protocol
self.key_prefix = key_prefix
if db is None:
raise ValueError('The "db" parameter must not be empty')
if not key_prefix:
raise ValueError('The "key_prefix" parameter must not be empty')
self.redis = StrictRedis(db=db, **connect_args)
def add_job(self, job):
job.id = str(uuid4())
job_state = job.__getstate__()
job_dict = {
'job_state': pickle.dumps(job_state, self.pickle_protocol),
'runs': '0',
'next_run_time': job_state.pop('next_run_time').isoformat()}
self.redis.hmset(self.key_prefix + job.id, job_dict)
self.jobs.append(job)
def remove_job(self, job):
self.redis.delete(self.key_prefix + job.id)
self.jobs.remove(job)
def load_jobs(self):
jobs = []
keys = self.redis.keys(self.key_prefix + '*')
pipeline = self.redis.pipeline()
for key in keys:
pipeline.hgetall(key)
results = pipeline.execute()
for job_dict in results:
job_state = {}
try:
job = Job.__new__(Job)
job_state = pickle.loads(job_dict['job_state'.encode()])
job_state['runs'] = long(job_dict['runs'.encode()])
dateval = job_dict['next_run_time'.encode()].decode()
job_state['next_run_time'] = datetime.strptime(
dateval, '%Y-%m-%dT%H:%M:%S')
job.__setstate__(job_state)
jobs.append(job)
except Exception:
job_name = job_state.get('name', '(unknown)')
logger.exception('Unable to restore job "%s"', job_name)
self.jobs = jobs
def update_job(self, job):
attrs = {
'next_run_time': job.next_run_time.isoformat(),
'runs': job.runs}
self.redis.hmset(self.key_prefix + job.id, attrs)
def close(self):
self.redis.connection_pool.disconnect()
def __repr__(self):
return '<%s>' % self.__class__.__name__