You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

381 lines
13 KiB

# axel.py
#
# Copyright (C) 2010 Adrian Cristea adrian dot cristea at gmail dotcom
# Edits by Ruud Burger
#
# Based on an idea by Peter Thatcher, found on
# http://www.valuedlessons.com/2008/04/events-in-python.html
#
# This module is part of Axel and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Source: http://pypi.python.org/pypi/axel
# Docs: http://packages.python.org/axel
from Queue import Empty, Queue
import hashlib
import sys
import threading
from couchpotato.core.helpers.variable import natsortKey
class Event(object):
"""
Event object inspired by C# events. Handlers can be registered and
unregistered using += and -= operators. Execution and result are
influenced by the arguments passed to the constructor and += method.
from axel import Event
event = Event()
def on_event(*args, **kwargs):
return (args, kwargs)
event += on_event # handler registration
print(event(10, 20, y=30))
>> ((True, ((10, 20), {'y': 30}), <function on_event at 0x00BAA270>),)
event -= on_event # handler is unregistered
print(event(10, 20, y=30))
>> None
class Mouse(object):
def __init__(self):
self.click = Event(self)
self.click += self.on_click # handler registration
def on_click(self, sender, *args, **kwargs):
assert isinstance(sender, Mouse), 'Wrong sender'
return (args, kwargs)
mouse = Mouse()
print(mouse.click(10, 20))
>> ((True, ((10, 20), {}),
>> <bound method Mouse.on_click of <__main__.Mouse object at 0x00B6F470>>),)
mouse.click -= mouse.on_click # handler is unregistered
print(mouse.click(10, 20))
>> None
"""
def __init__(self, name = None, sender = None, asynch = False, exc_info = False,
lock = None, threads = 3, traceback = False):
""" Creates an event
asynch
if True handler's are executes asynchronous
exc_info
if True, result will contain sys.exc_info()[:2] on error
lock
threading.RLock used to synchronize execution
sender
event's sender. The sender is passed as the first argument to the
handler, only if is not None. For this case the handler must have
a placeholder in the arguments to receive the sender
threads
maximum number of threads that will be started
traceback
if True, the execution result will contain sys.exc_info()
on error. exc_info must be also True to get the traceback
hash = self.hash(handler)
Handlers are stored in a dictionary that has as keys the handler's hash
handlers = {
hash : (handler, memoize, timeout),
hash : (handler, memoize, timeout), ...
}
The execution result is cached using the following structure
memoize = {
hash : ((args, kwargs, result), (args, kwargs, result), ...),
hash : ((args, kwargs, result), ...), ...
}
The execution result is returned as a tuple having this structure
exec_result = (
(True, result, handler), # on success
(False, error_info, handler), # on error
(None, None, handler), ... # asynchronous execution
)
"""
self.name = name
self.asynchronous = asynch
self.exc_info = exc_info
self.lock = lock
self.sender = sender
self.threads = threads
self.traceback = traceback
self.handlers = {}
self.memoize = {}
def hash(self, handler):
return hashlib.md5(str(handler)).hexdigest()
def handle(self, handler, priority = 0):
""" Registers a handler. The handler can be transmitted together
with two arguments as a list or dictionary. The arguments are:
memoize
if True, the execution result will be cached in self.memoize
timeout
will allocate a predefined time interval for the execution
If arguments are provided as a list, they are considered to have
this sequence: (handler, memoize, timeout)
Examples:
event += handler
event += (handler, True, 1.5)
event += {'handler':handler, 'memoize':True, 'timeout':1.5}
"""
handler_, memoize, timeout = self._extract(handler)
self.handlers['%s.%s' % (priority, self.hash(handler_))] = (handler_, memoize, timeout)
return self
def unhandle(self, handler):
""" Unregisters a handler """
handler_, memoize, timeout = self._extract(handler)
key = self.hash(handler_)
if not key in self.handlers:
raise ValueError('Handler "%s" was not found' % str(handler_))
del self.handlers[key]
return self
def fire(self, *args, **kwargs):
""" Stores all registered handlers in a queue for processing """
self.queue = Queue()
result = {}
if self.handlers:
max_threads = 1 if kwargs.get('event_order_lock') else self._threads()
# Set global result
def add_to(key, value):
result[key] = value
kwargs['event_add_to_result'] = add_to
for i in range(max_threads):
t = threading.Thread(target = self._execute,
args = args, kwargs = kwargs)
t.daemon = True
t.start()
handler_keys = self.handlers.keys()
handler_keys.sort(key = natsortKey)
for handler in handler_keys:
self.queue.put(handler)
if self.asynchronous:
handler_, memoize, timeout = self.handlers[handler]
result[handler] = (None, None, handler_)
if not self.asynchronous:
self.queue.join()
return result
def count(self):
""" Returns the count of registered handlers """
return len(self.handlers)
def clear(self):
""" Discards all registered handlers and cached results """
self.handlers.clear()
self.memoize.clear()
def _execute(self, *args, **kwargs):
# Remove get and set from kwargs
add_to_result = kwargs.get('event_add_to_result')
del kwargs['event_add_to_result']
# Get and remove order lock
order_lock = kwargs.get('event_order_lock')
try: del kwargs['event_order_lock']
except: pass
# Get and remove return on first
return_on_result = kwargs.get('event_return_on_result')
try: del kwargs['event_return_on_result']
except: pass
got_results = False
""" Executes all handlers stored in the queue """
while True:
try:
h_ = self.queue.get(timeout = 2)
handler, memoize, timeout = self.handlers[h_]
if return_on_result and got_results:
if not self.asynchronous:
self.queue.task_done()
continue
if order_lock:
order_lock.acquire()
try:
r = self._memoize(memoize, timeout, handler, *args, **kwargs)
if not self.asynchronous:
if not return_on_result or (return_on_result and r[1] is not None):
add_to_result(h_, tuple(r))
got_results = True
except Exception:
if not self.asynchronous:
add_to_result(h_, (False, self._error(sys.exc_info()),
handler))
else:
self.error_handler(sys.exc_info())
finally:
if not self.asynchronous:
self.queue.task_done()
if order_lock:
order_lock.release()
if self.queue.empty():
raise Empty
except Empty:
break
def _extract(self, queue_item):
""" Extracts a handler and handler's arguments that can be provided
as list or dictionary. If arguments are provided as list, they are
considered to have this sequence: (handler, memoize, timeout)
Examples:
event += handler
event += (handler, True, 1.5)
event += {'handler':handler, 'memoize':True, 'timeout':1.5}
"""
assert queue_item, 'Invalid list of arguments'
handler = None
memoize = False
timeout = 0
if not isinstance(queue_item, (list, tuple, dict)):
handler = queue_item
elif isinstance(queue_item, (list, tuple)):
if len(queue_item) == 3:
handler, memoize, timeout = queue_item
elif len(queue_item) == 2:
handler, memoize, = queue_item
elif len(queue_item) == 1:
handler = queue_item
elif isinstance(queue_item, dict):
handler = queue_item.get('handler')
memoize = queue_item.get('memoize', False)
timeout = queue_item.get('timeout', 0)
return (handler, bool(memoize), float(timeout))
def _memoize(self, memoize, timeout, handler, *args, **kwargs):
""" Caches the execution result of successful executions
hash = self.hash(handler)
memoize = {
hash : ((args, kwargs, result), (args, kwargs, result), ...),
hash : ((args, kwargs, result), ...), ...
}
"""
if not isinstance(handler, Event) and self.sender is not None:
args = list(args)[:]
args.insert(0, self.sender)
if not memoize:
if timeout <= 0: #no time restriction
result = [True, handler(*args, **kwargs), handler]
return result
result = self._timeout(timeout, handler, *args, **kwargs)
if isinstance(result, tuple) and len(result) == 3:
if isinstance(result[1], Exception): #error occurred
return [False, self._error(result), handler]
return [True, result, handler]
else:
hash_ = self.hash(handler)
if hash_ in self.memoize:
for args_, kwargs_, result in self.memoize[hash_]:
if args_ == args and kwargs_ == kwargs:
return [True, result, handler]
if timeout <= 0: #no time restriction
result = handler(*args, **kwargs)
else:
result = self._timeout(timeout, handler, *args, **kwargs)
if isinstance(result, tuple) and len(result) == 3:
if isinstance(result[1], Exception): #error occurred
return [False, self._error(result), handler]
lock = threading.RLock()
lock.acquire()
try:
if hash_ not in self.memoize:
self.memoize[hash_] = []
self.memoize[hash_].append((args, kwargs, result))
return [True, result, handler]
finally:
lock.release()
def _timeout(self, timeout, handler, *args, **kwargs):
""" Controls the time allocated for the execution of a method """
t = spawn_thread(target = handler, args = args, kwargs = kwargs)
t.daemon = True
t.start()
t.join(timeout)
if not t.is_alive():
if t.exc_info:
return t.exc_info
return t.result
else:
try:
msg = '[%s] Execution was forcefully terminated'
raise RuntimeError(msg % t.name)
except:
return sys.exc_info()
def _threads(self):
""" Calculates maximum number of threads that will be started """
if self.threads < len(self.handlers):
return self.threads
return len(self.handlers)
def _error(self, exc_info):
""" Retrieves the error info """
if self.exc_info:
if self.traceback:
return exc_info
return exc_info[:2]
return exc_info[1]
__iadd__ = handle
__isub__ = unhandle
__call__ = fire
__len__ = count
class spawn_thread(threading.Thread):
""" Spawns a new thread and returns the execution result """
def __init__(self, target, args = (), kwargs = {}, default = None):
threading.Thread.__init__(self)
self._target = target
self._args = args
self._kwargs = kwargs
self.result = default
self.exc_info = None
def run(self):
try:
self.result = self._target(*self._args, **self._kwargs)
except:
self.exc_info = sys.exc_info()
finally:
del self._target, self._args, self._kwargs