Browse Source

Axel python 2.5 fix

pull/1/merge
Ruud 14 years ago
parent
commit
b59c28d4a3
  1. 4
      libs/axl/__init__.py
  2. 148
      libs/axl/axel.py

4
libs/axel/__init__.py → libs/axl/__init__.py

@ -8,6 +8,6 @@
import inspect
from .axel import *
__all__ = sorted(name for name, obj in locals().items()
if not (name.startswith('_') or inspect.ismodule(obj)))
if not (name.startswith('_') or inspect.ismodule(obj)))
__all__.append('axel')
del inspect
del inspec

148
libs/axel/axel.py → libs/axl/axel.py

@ -52,8 +52,8 @@ class Event(object):
>> None
"""
def __init__(self, sender=None, asynch=False, exc_info=False,
lock=None, threads=3, traceback=False):
def __init__(self, sender = None, asynch = False, exc_info = False,
lock = None, threads = 3, traceback = False):
""" Creates an event
asynch
@ -90,13 +90,13 @@ class Event(object):
(False, error_info, handler), # on error
(None, None, handler), ... # asynchronous execution
)
"""
"""
self.asynchronous = asynch
self.exc_info = exc_info
self.lock = lock
self.sender = sender
self.threads = threads
self.traceback = traceback
self.traceback = traceback
self.handlers = {}
self.memoize = {}
@ -116,13 +116,13 @@ class Event(object):
event += handler
event += (handler, True, 1.5)
event += {'handler':handler, 'memoize':True, 'timeout':1.5}
"""
"""
handler_, memoize, timeout = self._extract(handler)
self.handlers[hash(handler_)] = (handler_, memoize, timeout)
return self
return self
def unhandle(self, handler):
""" Unregisters a handler """
""" Unregisters a handler """
handler_, memoize, timeout = self._extract(handler)
key = hash(handler_)
if not key in self.handlers:
@ -131,26 +131,26 @@ class Event(object):
return self
def fire(self, *args, **kwargs):
""" Stores all registered handlers in a queue for processing """
""" Stores all registered handlers in a queue for processing """
self.queue = Queue.Queue()
self.result = []
self.result = []
if self.handlers:
max_threads = self._threads()
max_threads = self._threads()
for i in range(max_threads):
t = threading.Thread(target=self._execute,
args=args, kwargs=kwargs)
t = threading.Thread(target = self._execute,
args = args, kwargs = kwargs)
t.daemon = True
t.start()
for handler in self.handlers:
self.queue.put(handler)
if self.asynchronous:
handler_, memoize, timeout = self.handlers[handler]
self.result.append((None, None, handler_))
self.result.append((None, None, handler_))
if not self.asynchronous:
self.queue.join()
@ -159,40 +159,43 @@ class Event(object):
def count(self):
""" Returns the count of registered handlers """
return len(self.handlers)
def clear(self):
""" Discards all registered handlers and cached results """
self.handlers.clear()
self.memoize.clear()
def _execute(self, *args, **kwargs):
""" Executes all handlers stored in the queue """
""" Executes all handlers stored in the queue """
while True:
try:
handler, memoize, timeout = self.handlers[self.queue.get()]
if isinstance(self.lock, threading._RLock):
self.lock.acquire() #synchronization
try:
r = self._memoize(memoize, timeout, handler, *args, **kwargs)
if not self.asynchronous:
self.result.append(tuple(r))
except Exception as err:
if not self.asynchronous:
self.result.append((False, self._error(sys.exc_info()),
handler))
except Exception:
if not self.asynchronous:
self.result.append((False, self._error(sys.exc_info()),
handler))
finally:
if isinstance(self.lock, threading._RLock):
self.lock.release()
if not self.asynchronous:
self.lock.release()
if not self.asynchronous:
self.queue.task_done()
if self.queue.empty():
raise Queue.Empty
except Queue.Empty:
break
def _extract(self, queue_item):
""" Extracts a handler and handler's arguments that can be provided
as list or dictionary. If arguments are provided as list, they are
@ -201,27 +204,27 @@ class Event(object):
event += handler
event += (handler, True, 1.5)
event += {'handler':handler, 'memoize':True, 'timeout':1.5}
"""
"""
assert queue_item, 'Invalid list of arguments'
handler = None
memoize = False
timeout = 0
handler = None
memoize = False
timeout = 0
if not isinstance(queue_item, (list, tuple, dict)):
handler = queue_item
elif isinstance(queue_item, (list, tuple)):
if len(queue_item) == 3:
handler, memoize, timeout = queue_item
elif len(queue_item) == 2:
handler, memoize, = queue_item
handler, memoize, = queue_item
elif len(queue_item) == 1:
handler = queue_item
handler = queue_item
elif isinstance(queue_item, dict):
handler = queue_item.get('handler')
memoize = queue_item.get('memoize', False)
timeout = queue_item.get('timeout', 0)
handler = queue_item.get('handler')
memoize = queue_item.get('memoize', False)
timeout = queue_item.get('timeout', 0)
return (handler, bool(memoize), float(timeout))
def _memoize(self, memoize, timeout, handler, *args, **kwargs):
""" Caches the execution result of successful executions
hash = hash(handler)
@ -229,35 +232,35 @@ class Event(object):
hash : ((args, kwargs, result), (args, kwargs, result), ...),
hash : ((args, kwargs, result), ...), ...
}
"""
"""
if not isinstance(handler, Event) and self.sender is not None:
args = list(args)[:]
args.insert(0, self.sender)
if not memoize:
if timeout <= 0: #no time restriction
return [True, handler(*args, **kwargs), handler]
result = self._timeout(timeout, handler, *args, **kwargs)
if isinstance(result, tuple) and len(result) == 3:
if isinstance(result, tuple) and len(result) == 3:
if isinstance(result[1], Exception): #error occurred
return [False, self._error(result), handler]
return [True, result, handler]
return [False, self._error(result), handler]
return [True, result, handler]
else:
hash_ = hash(handler)
hash_ = hash(handler)
if hash_ in self.memoize:
for args_, kwargs_, result in self.memoize[hash_]:
if args_ == args and kwargs_ == kwargs:
if args_ == args and kwargs_ == kwargs:
return [True, result, handler]
if timeout <= 0: #no time restriction
result = handler(*args, **kwargs)
else:
result = self._timeout(timeout, handler, *args, **kwargs)
else:
result = self._timeout(timeout, handler, *args, **kwargs)
if isinstance(result, tuple) and len(result) == 3:
if isinstance(result[1], Exception): #error occurred
return [False, self._error(result), handler]
lock = threading.RLock()
lock.acquire()
try:
@ -268,32 +271,33 @@ class Event(object):
finally:
lock.release()
def _timeout(self, timeout, handler, *args, **kwargs):
""" Controls the time allocated for the execution of a method """
t = spawn_thread(target=handler, args=args, kwargs=kwargs)
""" Controls the time allocated for the execution of a method """
t = spawn_thread(target = handler, args = args, kwargs = kwargs)
t.daemon = True
t.start()
t.join(timeout)
if not t.is_alive():
if t.exc_info:
return t.exc_info
return t.result
else:
try:
msg = '[%s] Execution was forcefully terminated'
msg = '[%s] Execution was forcefully terminated'
raise RuntimeError(msg % t.name)
except:
return sys.exc_info()
def _threads(self):
""" Calculates maximum number of threads that will be started """
""" Calculates maximum number of threads that will be started """
if self.threads < len(self.handlers):
return self.threads
return len(self.handlers)
def _error(self, exc_info):
""" Retrieves the error info """
def _error(self, exc_info):
""" Retrieves the error info """
if self.exc_info:
if self.traceback:
return exc_info
@ -303,23 +307,23 @@ class Event(object):
__iadd__ = handle
__isub__ = unhandle
__call__ = fire
__len__ = count
__len__ = count
class spawn_thread(threading.Thread):
class spawn_thread(threading.Thread):
""" Spawns a new thread and returns the execution result """
def __init__(self, target, args=(), kwargs={}, default=None):
threading.Thread.__init__(self)
def __init__(self, target, args = (), kwargs = {}, default = None):
threading.Thread.__init__(self)
self._target = target
self._args = args
self._kwargs = kwargs
self.result = default
self.exc_info = None
self.result = default
self.exc_info = None
def run(self):
try:
self.result = self._target(*self._args, **self._kwargs)
self.result = self._target(*self._args, **self._kwargs)
except:
self.exc_info = sys.exc_info()
finally:
del self._target, self._args, self._kwargs
del self._target, self._args, self._kwargs
Loading…
Cancel
Save