You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
381 lines
13 KiB
381 lines
13 KiB
# axel.py
|
|
#
|
|
# Copyright (C) 2010 Adrian Cristea adrian dot cristea at gmail dotcom
|
|
# Edits by Ruud Burger
|
|
#
|
|
# Based on an idea by Peter Thatcher, found on
|
|
# http://www.valuedlessons.com/2008/04/events-in-python.html
|
|
#
|
|
# This module is part of Axel and is released under
|
|
# the MIT License: http://www.opensource.org/licenses/mit-license.php
|
|
#
|
|
# Source: http://pypi.python.org/pypi/axel
|
|
# Docs: http://packages.python.org/axel
|
|
|
|
from Queue import Empty, Queue
|
|
import hashlib
|
|
import sys
|
|
import threading
|
|
|
|
from couchpotato.core.helpers.variable import natsortKey
|
|
|
|
|
|
class Event(object):
|
|
"""
|
|
Event object inspired by C# events. Handlers can be registered and
|
|
unregistered using += and -= operators. Execution and result are
|
|
influenced by the arguments passed to the constructor and += method.
|
|
|
|
from axel import Event
|
|
|
|
event = Event()
|
|
def on_event(*args, **kwargs):
|
|
return (args, kwargs)
|
|
|
|
event += on_event # handler registration
|
|
print(event(10, 20, y=30))
|
|
>> ((True, ((10, 20), {'y': 30}), <function on_event at 0x00BAA270>),)
|
|
|
|
event -= on_event # handler is unregistered
|
|
print(event(10, 20, y=30))
|
|
>> None
|
|
|
|
class Mouse(object):
|
|
def __init__(self):
|
|
self.click = Event(self)
|
|
self.click += self.on_click # handler registration
|
|
|
|
def on_click(self, sender, *args, **kwargs):
|
|
assert isinstance(sender, Mouse), 'Wrong sender'
|
|
return (args, kwargs)
|
|
|
|
mouse = Mouse()
|
|
print(mouse.click(10, 20))
|
|
>> ((True, ((10, 20), {}),
|
|
>> <bound method Mouse.on_click of <__main__.Mouse object at 0x00B6F470>>),)
|
|
|
|
mouse.click -= mouse.on_click # handler is unregistered
|
|
print(mouse.click(10, 20))
|
|
>> None
|
|
"""
|
|
|
|
def __init__(self, name = None, sender = None, asynch = False, exc_info = False,
|
|
lock = None, threads = 3, traceback = False):
|
|
""" Creates an event
|
|
|
|
asynch
|
|
if True handler's are executes asynchronous
|
|
exc_info
|
|
if True, result will contain sys.exc_info()[:2] on error
|
|
lock
|
|
threading.RLock used to synchronize execution
|
|
sender
|
|
event's sender. The sender is passed as the first argument to the
|
|
handler, only if is not None. For this case the handler must have
|
|
a placeholder in the arguments to receive the sender
|
|
threads
|
|
maximum number of threads that will be started
|
|
traceback
|
|
if True, the execution result will contain sys.exc_info()
|
|
on error. exc_info must be also True to get the traceback
|
|
|
|
hash = self.hash(handler)
|
|
|
|
Handlers are stored in a dictionary that has as keys the handler's hash
|
|
handlers = {
|
|
hash : (handler, memoize, timeout),
|
|
hash : (handler, memoize, timeout), ...
|
|
}
|
|
The execution result is cached using the following structure
|
|
memoize = {
|
|
hash : ((args, kwargs, result), (args, kwargs, result), ...),
|
|
hash : ((args, kwargs, result), ...), ...
|
|
}
|
|
The execution result is returned as a tuple having this structure
|
|
exec_result = (
|
|
(True, result, handler), # on success
|
|
(False, error_info, handler), # on error
|
|
(None, None, handler), ... # asynchronous execution
|
|
)
|
|
"""
|
|
self.name = name
|
|
self.asynchronous = asynch
|
|
self.exc_info = exc_info
|
|
self.lock = lock
|
|
self.sender = sender
|
|
self.threads = threads
|
|
self.traceback = traceback
|
|
self.handlers = {}
|
|
self.memoize = {}
|
|
|
|
def hash(self, handler):
|
|
return hashlib.md5(str(handler)).hexdigest()
|
|
|
|
def handle(self, handler, priority = 0):
|
|
""" Registers a handler. The handler can be transmitted together
|
|
with two arguments as a list or dictionary. The arguments are:
|
|
|
|
memoize
|
|
if True, the execution result will be cached in self.memoize
|
|
timeout
|
|
will allocate a predefined time interval for the execution
|
|
|
|
If arguments are provided as a list, they are considered to have
|
|
this sequence: (handler, memoize, timeout)
|
|
|
|
Examples:
|
|
event += handler
|
|
event += (handler, True, 1.5)
|
|
event += {'handler':handler, 'memoize':True, 'timeout':1.5}
|
|
"""
|
|
handler_, memoize, timeout = self._extract(handler)
|
|
self.handlers['%s.%s' % (priority, self.hash(handler_))] = (handler_, memoize, timeout)
|
|
return self
|
|
|
|
def unhandle(self, handler):
|
|
""" Unregisters a handler """
|
|
handler_, memoize, timeout = self._extract(handler)
|
|
key = self.hash(handler_)
|
|
if not key in self.handlers:
|
|
raise ValueError('Handler "%s" was not found' % str(handler_))
|
|
del self.handlers[key]
|
|
return self
|
|
|
|
def fire(self, *args, **kwargs):
|
|
""" Stores all registered handlers in a queue for processing """
|
|
self.queue = Queue()
|
|
result = {}
|
|
|
|
if self.handlers:
|
|
|
|
max_threads = 1 if kwargs.get('event_order_lock') else self._threads()
|
|
|
|
# Set global result
|
|
def add_to(key, value):
|
|
result[key] = value
|
|
kwargs['event_add_to_result'] = add_to
|
|
|
|
for i in range(max_threads):
|
|
t = threading.Thread(target = self._execute,
|
|
args = args, kwargs = kwargs)
|
|
t.daemon = True
|
|
t.start()
|
|
|
|
handler_keys = self.handlers.keys()
|
|
handler_keys.sort(key = natsortKey)
|
|
|
|
for handler in handler_keys:
|
|
self.queue.put(handler)
|
|
|
|
if self.asynchronous:
|
|
handler_, memoize, timeout = self.handlers[handler]
|
|
result[handler] = (None, None, handler_)
|
|
|
|
if not self.asynchronous:
|
|
self.queue.join()
|
|
|
|
return result
|
|
|
|
def count(self):
|
|
""" Returns the count of registered handlers """
|
|
return len(self.handlers)
|
|
|
|
def clear(self):
|
|
""" Discards all registered handlers and cached results """
|
|
self.handlers.clear()
|
|
self.memoize.clear()
|
|
|
|
def _execute(self, *args, **kwargs):
|
|
|
|
# Remove get and set from kwargs
|
|
add_to_result = kwargs.get('event_add_to_result')
|
|
del kwargs['event_add_to_result']
|
|
|
|
# Get and remove order lock
|
|
order_lock = kwargs.get('event_order_lock')
|
|
try: del kwargs['event_order_lock']
|
|
except: pass
|
|
|
|
# Get and remove return on first
|
|
return_on_result = kwargs.get('event_return_on_result')
|
|
try: del kwargs['event_return_on_result']
|
|
except: pass
|
|
|
|
got_results = False
|
|
|
|
""" Executes all handlers stored in the queue """
|
|
while True:
|
|
|
|
try:
|
|
h_ = self.queue.get(timeout = 2)
|
|
handler, memoize, timeout = self.handlers[h_]
|
|
|
|
if return_on_result and got_results:
|
|
|
|
if not self.asynchronous:
|
|
self.queue.task_done()
|
|
|
|
continue
|
|
|
|
if order_lock:
|
|
order_lock.acquire()
|
|
|
|
try:
|
|
r = self._memoize(memoize, timeout, handler, *args, **kwargs)
|
|
if not self.asynchronous:
|
|
if not return_on_result or (return_on_result and r[1] is not None):
|
|
add_to_result(h_, tuple(r))
|
|
got_results = True
|
|
|
|
except Exception:
|
|
if not self.asynchronous:
|
|
add_to_result(h_, (False, self._error(sys.exc_info()),
|
|
handler))
|
|
else:
|
|
self.error_handler(sys.exc_info())
|
|
finally:
|
|
|
|
if not self.asynchronous:
|
|
self.queue.task_done()
|
|
|
|
if order_lock:
|
|
order_lock.release()
|
|
|
|
if self.queue.empty():
|
|
raise Empty
|
|
|
|
except Empty:
|
|
break
|
|
|
|
def _extract(self, queue_item):
|
|
""" Extracts a handler and handler's arguments that can be provided
|
|
as list or dictionary. If arguments are provided as list, they are
|
|
considered to have this sequence: (handler, memoize, timeout)
|
|
Examples:
|
|
event += handler
|
|
event += (handler, True, 1.5)
|
|
event += {'handler':handler, 'memoize':True, 'timeout':1.5}
|
|
"""
|
|
assert queue_item, 'Invalid list of arguments'
|
|
handler = None
|
|
memoize = False
|
|
timeout = 0
|
|
|
|
if not isinstance(queue_item, (list, tuple, dict)):
|
|
handler = queue_item
|
|
elif isinstance(queue_item, (list, tuple)):
|
|
if len(queue_item) == 3:
|
|
handler, memoize, timeout = queue_item
|
|
elif len(queue_item) == 2:
|
|
handler, memoize, = queue_item
|
|
elif len(queue_item) == 1:
|
|
handler = queue_item
|
|
elif isinstance(queue_item, dict):
|
|
handler = queue_item.get('handler')
|
|
memoize = queue_item.get('memoize', False)
|
|
timeout = queue_item.get('timeout', 0)
|
|
return (handler, bool(memoize), float(timeout))
|
|
|
|
def _memoize(self, memoize, timeout, handler, *args, **kwargs):
|
|
""" Caches the execution result of successful executions
|
|
hash = self.hash(handler)
|
|
memoize = {
|
|
hash : ((args, kwargs, result), (args, kwargs, result), ...),
|
|
hash : ((args, kwargs, result), ...), ...
|
|
}
|
|
"""
|
|
if not isinstance(handler, Event) and self.sender is not None:
|
|
args = list(args)[:]
|
|
args.insert(0, self.sender)
|
|
|
|
if not memoize:
|
|
if timeout <= 0: #no time restriction
|
|
result = [True, handler(*args, **kwargs), handler]
|
|
return result
|
|
|
|
result = self._timeout(timeout, handler, *args, **kwargs)
|
|
if isinstance(result, tuple) and len(result) == 3:
|
|
if isinstance(result[1], Exception): #error occurred
|
|
return [False, self._error(result), handler]
|
|
return [True, result, handler]
|
|
else:
|
|
hash_ = self.hash(handler)
|
|
if hash_ in self.memoize:
|
|
for args_, kwargs_, result in self.memoize[hash_]:
|
|
if args_ == args and kwargs_ == kwargs:
|
|
return [True, result, handler]
|
|
|
|
if timeout <= 0: #no time restriction
|
|
result = handler(*args, **kwargs)
|
|
else:
|
|
result = self._timeout(timeout, handler, *args, **kwargs)
|
|
if isinstance(result, tuple) and len(result) == 3:
|
|
if isinstance(result[1], Exception): #error occurred
|
|
return [False, self._error(result), handler]
|
|
|
|
lock = threading.RLock()
|
|
lock.acquire()
|
|
try:
|
|
if hash_ not in self.memoize:
|
|
self.memoize[hash_] = []
|
|
self.memoize[hash_].append((args, kwargs, result))
|
|
return [True, result, handler]
|
|
finally:
|
|
lock.release()
|
|
|
|
|
|
def _timeout(self, timeout, handler, *args, **kwargs):
|
|
""" Controls the time allocated for the execution of a method """
|
|
t = spawn_thread(target = handler, args = args, kwargs = kwargs)
|
|
t.daemon = True
|
|
t.start()
|
|
t.join(timeout)
|
|
|
|
if not t.is_alive():
|
|
if t.exc_info:
|
|
return t.exc_info
|
|
return t.result
|
|
else:
|
|
try:
|
|
msg = '[%s] Execution was forcefully terminated'
|
|
raise RuntimeError(msg % t.name)
|
|
except:
|
|
return sys.exc_info()
|
|
|
|
def _threads(self):
|
|
""" Calculates maximum number of threads that will be started """
|
|
if self.threads < len(self.handlers):
|
|
return self.threads
|
|
return len(self.handlers)
|
|
|
|
def _error(self, exc_info):
|
|
""" Retrieves the error info """
|
|
if self.exc_info:
|
|
if self.traceback:
|
|
return exc_info
|
|
return exc_info[:2]
|
|
return exc_info[1]
|
|
|
|
__iadd__ = handle
|
|
__isub__ = unhandle
|
|
__call__ = fire
|
|
__len__ = count
|
|
|
|
class spawn_thread(threading.Thread):
|
|
""" Spawns a new thread and returns the execution result """
|
|
|
|
def __init__(self, target, args = (), kwargs = {}, default = None):
|
|
threading.Thread.__init__(self)
|
|
self._target = target
|
|
self._args = args
|
|
self._kwargs = kwargs
|
|
self.result = default
|
|
self.exc_info = None
|
|
|
|
def run(self):
|
|
try:
|
|
self.result = self._target(*self._args, **self._kwargs)
|
|
except:
|
|
self.exc_info = sys.exc_info()
|
|
finally:
|
|
del self._target, self._args, self._kwargs
|
|
|