committed by
JackDandy
20 changed files with 1538 additions and 27 deletions
@ -0,0 +1,22 @@ |
|||||
|
# coding=utf-8 |
||||
|
# |
||||
|
# This file is part of SickGear. |
||||
|
# |
||||
|
# SickGear is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# SickGear is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU General Public License |
||||
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>. |
||||
|
import sys |
||||
|
|
||||
|
if 2 == sys.version_info[0]: |
||||
|
from .py2 import * |
||||
|
else: |
||||
|
from .py3 import * |
@ -0,0 +1,22 @@ |
|||||
|
import re |
||||
|
import sys |
||||
|
import threading |
||||
|
|
||||
|
if 2 == sys.version_info[0]: |
||||
|
# noinspection PyProtectedMember |
||||
|
from .futures.thread import _WorkItem |
||||
|
else: |
||||
|
# noinspection PyCompatibility,PyProtectedMember |
||||
|
from concurrent.futures.thread import _WorkItem |
||||
|
|
||||
|
|
||||
|
class GenericWorkItem(_WorkItem): |
||||
|
|
||||
|
number_regex = re.compile(r'(_\d+)$') |
||||
|
|
||||
|
def _set_thread_name(self): |
||||
|
try: |
||||
|
ct = threading.currentThread() |
||||
|
ct.name = '%s^WEB%s' % (self.args[0].__class__.__name__.upper(), self.number_regex.search(ct.name).group(1)) |
||||
|
except (BaseException, Exception): |
||||
|
pass |
@ -0,0 +1,23 @@ |
|||||
|
# Copyright 2009 Brian Quinlan. All Rights Reserved. |
||||
|
# Licensed to PSF under a Contributor Agreement. |
||||
|
|
||||
|
"""Execute computations asynchronously using threads or processes.""" |
||||
|
|
||||
|
__author__ = 'Brian Quinlan (brian@sweetapp.com)' |
||||
|
|
||||
|
from ._base import (FIRST_COMPLETED, |
||||
|
FIRST_EXCEPTION, |
||||
|
ALL_COMPLETED, |
||||
|
CancelledError, |
||||
|
TimeoutError, |
||||
|
Future, |
||||
|
Executor, |
||||
|
wait, |
||||
|
as_completed) |
||||
|
from .thread import ThreadPoolExecutor |
||||
|
|
||||
|
try: |
||||
|
from .process import ProcessPoolExecutor |
||||
|
except ImportError: |
||||
|
# some platforms don't have multiprocessing |
||||
|
pass |
@ -0,0 +1,673 @@ |
|||||
|
# Copyright 2009 Brian Quinlan. All Rights Reserved. |
||||
|
# Licensed to PSF under a Contributor Agreement. |
||||
|
|
||||
|
import collections |
||||
|
import logging |
||||
|
import threading |
||||
|
import itertools |
||||
|
import time |
||||
|
import types |
||||
|
|
||||
|
__author__ = 'Brian Quinlan (brian@sweetapp.com)' |
||||
|
|
||||
|
FIRST_COMPLETED = 'FIRST_COMPLETED' |
||||
|
FIRST_EXCEPTION = 'FIRST_EXCEPTION' |
||||
|
ALL_COMPLETED = 'ALL_COMPLETED' |
||||
|
_AS_COMPLETED = '_AS_COMPLETED' |
||||
|
|
||||
|
# Possible future states (for internal use by the futures package). |
||||
|
PENDING = 'PENDING' |
||||
|
RUNNING = 'RUNNING' |
||||
|
# The future was cancelled by the user... |
||||
|
CANCELLED = 'CANCELLED' |
||||
|
# ...and _Waiter.add_cancelled() was called by a worker. |
||||
|
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' |
||||
|
FINISHED = 'FINISHED' |
||||
|
|
||||
|
_FUTURE_STATES = [ |
||||
|
PENDING, |
||||
|
RUNNING, |
||||
|
CANCELLED, |
||||
|
CANCELLED_AND_NOTIFIED, |
||||
|
FINISHED |
||||
|
] |
||||
|
|
||||
|
_STATE_TO_DESCRIPTION_MAP = { |
||||
|
PENDING: "pending", |
||||
|
RUNNING: "running", |
||||
|
CANCELLED: "cancelled", |
||||
|
CANCELLED_AND_NOTIFIED: "cancelled", |
||||
|
FINISHED: "finished" |
||||
|
} |
||||
|
|
||||
|
# Logger for internal use by the futures package. |
||||
|
LOGGER = logging.getLogger("concurrent.futures") |
||||
|
|
||||
|
class Error(Exception): |
||||
|
"""Base class for all future-related exceptions.""" |
||||
|
pass |
||||
|
|
||||
|
class CancelledError(Error): |
||||
|
"""The Future was cancelled.""" |
||||
|
pass |
||||
|
|
||||
|
class TimeoutError(Error): |
||||
|
"""The operation exceeded the given deadline.""" |
||||
|
pass |
||||
|
|
||||
|
class _Waiter(object): |
||||
|
"""Provides the event that wait() and as_completed() block on.""" |
||||
|
def __init__(self): |
||||
|
self.event = threading.Event() |
||||
|
self.finished_futures = [] |
||||
|
|
||||
|
def add_result(self, future): |
||||
|
self.finished_futures.append(future) |
||||
|
|
||||
|
def add_exception(self, future): |
||||
|
self.finished_futures.append(future) |
||||
|
|
||||
|
def add_cancelled(self, future): |
||||
|
self.finished_futures.append(future) |
||||
|
|
||||
|
class _AsCompletedWaiter(_Waiter): |
||||
|
"""Used by as_completed().""" |
||||
|
|
||||
|
def __init__(self): |
||||
|
super(_AsCompletedWaiter, self).__init__() |
||||
|
self.lock = threading.Lock() |
||||
|
|
||||
|
def add_result(self, future): |
||||
|
with self.lock: |
||||
|
super(_AsCompletedWaiter, self).add_result(future) |
||||
|
self.event.set() |
||||
|
|
||||
|
def add_exception(self, future): |
||||
|
with self.lock: |
||||
|
super(_AsCompletedWaiter, self).add_exception(future) |
||||
|
self.event.set() |
||||
|
|
||||
|
def add_cancelled(self, future): |
||||
|
with self.lock: |
||||
|
super(_AsCompletedWaiter, self).add_cancelled(future) |
||||
|
self.event.set() |
||||
|
|
||||
|
class _FirstCompletedWaiter(_Waiter): |
||||
|
"""Used by wait(return_when=FIRST_COMPLETED).""" |
||||
|
|
||||
|
def add_result(self, future): |
||||
|
super(_FirstCompletedWaiter, self).add_result(future) |
||||
|
self.event.set() |
||||
|
|
||||
|
def add_exception(self, future): |
||||
|
super(_FirstCompletedWaiter, self).add_exception(future) |
||||
|
self.event.set() |
||||
|
|
||||
|
def add_cancelled(self, future): |
||||
|
super(_FirstCompletedWaiter, self).add_cancelled(future) |
||||
|
self.event.set() |
||||
|
|
||||
|
class _AllCompletedWaiter(_Waiter): |
||||
|
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" |
||||
|
|
||||
|
def __init__(self, num_pending_calls, stop_on_exception): |
||||
|
self.num_pending_calls = num_pending_calls |
||||
|
self.stop_on_exception = stop_on_exception |
||||
|
self.lock = threading.Lock() |
||||
|
super(_AllCompletedWaiter, self).__init__() |
||||
|
|
||||
|
def _decrement_pending_calls(self): |
||||
|
with self.lock: |
||||
|
self.num_pending_calls -= 1 |
||||
|
if not self.num_pending_calls: |
||||
|
self.event.set() |
||||
|
|
||||
|
def add_result(self, future): |
||||
|
super(_AllCompletedWaiter, self).add_result(future) |
||||
|
self._decrement_pending_calls() |
||||
|
|
||||
|
def add_exception(self, future): |
||||
|
super(_AllCompletedWaiter, self).add_exception(future) |
||||
|
if self.stop_on_exception: |
||||
|
self.event.set() |
||||
|
else: |
||||
|
self._decrement_pending_calls() |
||||
|
|
||||
|
def add_cancelled(self, future): |
||||
|
super(_AllCompletedWaiter, self).add_cancelled(future) |
||||
|
self._decrement_pending_calls() |
||||
|
|
||||
|
class _AcquireFutures(object): |
||||
|
"""A context manager that does an ordered acquire of Future conditions.""" |
||||
|
|
||||
|
def __init__(self, futures): |
||||
|
self.futures = sorted(futures, key=id) |
||||
|
|
||||
|
def __enter__(self): |
||||
|
for future in self.futures: |
||||
|
future._condition.acquire() |
||||
|
|
||||
|
def __exit__(self, *args): |
||||
|
for future in self.futures: |
||||
|
future._condition.release() |
||||
|
|
||||
|
def _create_and_install_waiters(fs, return_when): |
||||
|
if return_when == _AS_COMPLETED: |
||||
|
waiter = _AsCompletedWaiter() |
||||
|
elif return_when == FIRST_COMPLETED: |
||||
|
waiter = _FirstCompletedWaiter() |
||||
|
else: |
||||
|
pending_count = sum( |
||||
|
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) |
||||
|
|
||||
|
if return_when == FIRST_EXCEPTION: |
||||
|
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) |
||||
|
elif return_when == ALL_COMPLETED: |
||||
|
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) |
||||
|
else: |
||||
|
raise ValueError("Invalid return condition: %r" % return_when) |
||||
|
|
||||
|
for f in fs: |
||||
|
f._waiters.append(waiter) |
||||
|
|
||||
|
return waiter |
||||
|
|
||||
|
|
||||
|
def _yield_finished_futures(fs, waiter, ref_collect): |
||||
|
""" |
||||
|
Iterate on the list *fs*, yielding finished futures one by one in |
||||
|
reverse order. |
||||
|
Before yielding a future, *waiter* is removed from its waiters |
||||
|
and the future is removed from each set in the collection of sets |
||||
|
*ref_collect*. |
||||
|
|
||||
|
The aim of this function is to avoid keeping stale references after |
||||
|
the future is yielded and before the iterator resumes. |
||||
|
""" |
||||
|
while fs: |
||||
|
f = fs[-1] |
||||
|
for futures_set in ref_collect: |
||||
|
futures_set.remove(f) |
||||
|
with f._condition: |
||||
|
f._waiters.remove(waiter) |
||||
|
del f |
||||
|
# Careful not to keep a reference to the popped value |
||||
|
yield fs.pop() |
||||
|
|
||||
|
|
||||
|
def as_completed(fs, timeout=None): |
||||
|
"""An iterator over the given futures that yields each as it completes. |
||||
|
|
||||
|
Args: |
||||
|
fs: The sequence of Futures (possibly created by different Executors) to |
||||
|
iterate over. |
||||
|
timeout: The maximum number of seconds to wait. If None, then there |
||||
|
is no limit on the wait time. |
||||
|
|
||||
|
Returns: |
||||
|
An iterator that yields the given Futures as they complete (finished or |
||||
|
cancelled). If any given Futures are duplicated, they will be returned |
||||
|
once. |
||||
|
|
||||
|
Raises: |
||||
|
TimeoutError: If the entire result iterator could not be generated |
||||
|
before the given timeout. |
||||
|
""" |
||||
|
if timeout is not None: |
||||
|
end_time = timeout + time.time() |
||||
|
|
||||
|
fs = set(fs) |
||||
|
total_futures = len(fs) |
||||
|
with _AcquireFutures(fs): |
||||
|
finished = set( |
||||
|
f for f in fs |
||||
|
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) |
||||
|
pending = fs - finished |
||||
|
waiter = _create_and_install_waiters(fs, _AS_COMPLETED) |
||||
|
finished = list(finished) |
||||
|
try: |
||||
|
for f in _yield_finished_futures(finished, waiter, |
||||
|
ref_collect=(fs,)): |
||||
|
f = [f] |
||||
|
yield f.pop() |
||||
|
|
||||
|
while pending: |
||||
|
if timeout is None: |
||||
|
wait_timeout = None |
||||
|
else: |
||||
|
wait_timeout = end_time - time.time() |
||||
|
if wait_timeout < 0: |
||||
|
raise TimeoutError( |
||||
|
'%d (of %d) futures unfinished' % ( |
||||
|
len(pending), total_futures)) |
||||
|
|
||||
|
waiter.event.wait(wait_timeout) |
||||
|
|
||||
|
with waiter.lock: |
||||
|
finished = waiter.finished_futures |
||||
|
waiter.finished_futures = [] |
||||
|
waiter.event.clear() |
||||
|
|
||||
|
# reverse to keep finishing order |
||||
|
finished.reverse() |
||||
|
for f in _yield_finished_futures(finished, waiter, |
||||
|
ref_collect=(fs, pending)): |
||||
|
f = [f] |
||||
|
yield f.pop() |
||||
|
|
||||
|
finally: |
||||
|
# Remove waiter from unfinished futures |
||||
|
for f in fs: |
||||
|
with f._condition: |
||||
|
f._waiters.remove(waiter) |
||||
|
|
||||
|
DoneAndNotDoneFutures = collections.namedtuple( |
||||
|
'DoneAndNotDoneFutures', 'done not_done') |
||||
|
def wait(fs, timeout=None, return_when=ALL_COMPLETED): |
||||
|
"""Wait for the futures in the given sequence to complete. |
||||
|
|
||||
|
Args: |
||||
|
fs: The sequence of Futures (possibly created by different Executors) to |
||||
|
wait upon. |
||||
|
timeout: The maximum number of seconds to wait. If None, then there |
||||
|
is no limit on the wait time. |
||||
|
return_when: Indicates when this function should return. The options |
||||
|
are: |
||||
|
|
||||
|
FIRST_COMPLETED - Return when any future finishes or is |
||||
|
cancelled. |
||||
|
FIRST_EXCEPTION - Return when any future finishes by raising an |
||||
|
exception. If no future raises an exception |
||||
|
then it is equivalent to ALL_COMPLETED. |
||||
|
ALL_COMPLETED - Return when all futures finish or are cancelled. |
||||
|
|
||||
|
Returns: |
||||
|
A named 2-tuple of sets. The first set, named 'done', contains the |
||||
|
futures that completed (is finished or cancelled) before the wait |
||||
|
completed. The second set, named 'not_done', contains uncompleted |
||||
|
futures. |
||||
|
""" |
||||
|
with _AcquireFutures(fs): |
||||
|
done = set(f for f in fs |
||||
|
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) |
||||
|
not_done = set(fs) - done |
||||
|
|
||||
|
if (return_when == FIRST_COMPLETED) and done: |
||||
|
return DoneAndNotDoneFutures(done, not_done) |
||||
|
elif (return_when == FIRST_EXCEPTION) and done: |
||||
|
if any(f for f in done |
||||
|
if not f.cancelled() and f.exception() is not None): |
||||
|
return DoneAndNotDoneFutures(done, not_done) |
||||
|
|
||||
|
if len(done) == len(fs): |
||||
|
return DoneAndNotDoneFutures(done, not_done) |
||||
|
|
||||
|
waiter = _create_and_install_waiters(fs, return_when) |
||||
|
|
||||
|
waiter.event.wait(timeout) |
||||
|
for f in fs: |
||||
|
with f._condition: |
||||
|
f._waiters.remove(waiter) |
||||
|
|
||||
|
done.update(waiter.finished_futures) |
||||
|
return DoneAndNotDoneFutures(done, set(fs) - done) |
||||
|
|
||||
|
class Future(object): |
||||
|
"""Represents the result of an asynchronous computation.""" |
||||
|
|
||||
|
def __init__(self): |
||||
|
"""Initializes the future. Should not be called by clients.""" |
||||
|
self._condition = threading.Condition() |
||||
|
self._state = PENDING |
||||
|
self._result = None |
||||
|
self._exception = None |
||||
|
self._traceback = None |
||||
|
self._waiters = [] |
||||
|
self._done_callbacks = [] |
||||
|
|
||||
|
def _invoke_callbacks(self): |
||||
|
for callback in self._done_callbacks: |
||||
|
try: |
||||
|
callback(self) |
||||
|
except Exception: |
||||
|
LOGGER.exception('exception calling callback for %r', self) |
||||
|
except BaseException: |
||||
|
# Explicitly let all other new-style exceptions through so |
||||
|
# that we can catch all old-style exceptions with a simple |
||||
|
# "except:" clause below. |
||||
|
# |
||||
|
# All old-style exception objects are instances of |
||||
|
# types.InstanceType, but "except types.InstanceType:" does |
||||
|
# not catch old-style exceptions for some reason. Thus, the |
||||
|
# only way to catch all old-style exceptions without catching |
||||
|
# any new-style exceptions is to filter out the new-style |
||||
|
# exceptions, which all derive from BaseException. |
||||
|
raise |
||||
|
except: |
||||
|
# Because of the BaseException clause above, this handler only |
||||
|
# executes for old-style exception objects. |
||||
|
LOGGER.exception('exception calling callback for %r', self) |
||||
|
|
||||
|
def __repr__(self): |
||||
|
with self._condition: |
||||
|
if self._state == FINISHED: |
||||
|
if self._exception: |
||||
|
return '<%s at %#x state=%s raised %s>' % ( |
||||
|
self.__class__.__name__, |
||||
|
id(self), |
||||
|
_STATE_TO_DESCRIPTION_MAP[self._state], |
||||
|
self._exception.__class__.__name__) |
||||
|
else: |
||||
|
return '<%s at %#x state=%s returned %s>' % ( |
||||
|
self.__class__.__name__, |
||||
|
id(self), |
||||
|
_STATE_TO_DESCRIPTION_MAP[self._state], |
||||
|
self._result.__class__.__name__) |
||||
|
return '<%s at %#x state=%s>' % ( |
||||
|
self.__class__.__name__, |
||||
|
id(self), |
||||
|
_STATE_TO_DESCRIPTION_MAP[self._state]) |
||||
|
|
||||
|
def cancel(self): |
||||
|
"""Cancel the future if possible. |
||||
|
|
||||
|
Returns True if the future was cancelled, False otherwise. A future |
||||
|
cannot be cancelled if it is running or has already completed. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
if self._state in [RUNNING, FINISHED]: |
||||
|
return False |
||||
|
|
||||
|
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
||||
|
return True |
||||
|
|
||||
|
self._state = CANCELLED |
||||
|
self._condition.notify_all() |
||||
|
|
||||
|
self._invoke_callbacks() |
||||
|
return True |
||||
|
|
||||
|
def cancelled(self): |
||||
|
"""Return True if the future was cancelled.""" |
||||
|
with self._condition: |
||||
|
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] |
||||
|
|
||||
|
def running(self): |
||||
|
"""Return True if the future is currently executing.""" |
||||
|
with self._condition: |
||||
|
return self._state == RUNNING |
||||
|
|
||||
|
def done(self): |
||||
|
"""Return True of the future was cancelled or finished executing.""" |
||||
|
with self._condition: |
||||
|
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] |
||||
|
|
||||
|
def __get_result(self): |
||||
|
if self._exception: |
||||
|
if isinstance(self._exception, types.InstanceType): |
||||
|
# The exception is an instance of an old-style class, which |
||||
|
# means type(self._exception) returns types.ClassType instead |
||||
|
# of the exception's actual class type. |
||||
|
exception_type = self._exception.__class__ |
||||
|
else: |
||||
|
exception_type = type(self._exception) |
||||
|
raise exception_type, self._exception, self._traceback |
||||
|
else: |
||||
|
return self._result |
||||
|
|
||||
|
def add_done_callback(self, fn): |
||||
|
"""Attaches a callable that will be called when the future finishes. |
||||
|
|
||||
|
Args: |
||||
|
fn: A callable that will be called with this future as its only |
||||
|
argument when the future completes or is cancelled. The callable |
||||
|
will always be called by a thread in the same process in which |
||||
|
it was added. If the future has already completed or been |
||||
|
cancelled then the callable will be called immediately. These |
||||
|
callables are called in the order that they were added. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: |
||||
|
self._done_callbacks.append(fn) |
||||
|
return |
||||
|
fn(self) |
||||
|
|
||||
|
def result(self, timeout=None): |
||||
|
"""Return the result of the call that the future represents. |
||||
|
|
||||
|
Args: |
||||
|
timeout: The number of seconds to wait for the result if the future |
||||
|
isn't done. If None, then there is no limit on the wait time. |
||||
|
|
||||
|
Returns: |
||||
|
The result of the call that the future represents. |
||||
|
|
||||
|
Raises: |
||||
|
CancelledError: If the future was cancelled. |
||||
|
TimeoutError: If the future didn't finish executing before the given |
||||
|
timeout. |
||||
|
Exception: If the call raised then that exception will be raised. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
||||
|
raise CancelledError() |
||||
|
elif self._state == FINISHED: |
||||
|
return self.__get_result() |
||||
|
|
||||
|
self._condition.wait(timeout) |
||||
|
|
||||
|
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
||||
|
raise CancelledError() |
||||
|
elif self._state == FINISHED: |
||||
|
return self.__get_result() |
||||
|
else: |
||||
|
raise TimeoutError() |
||||
|
|
||||
|
def exception_info(self, timeout=None): |
||||
|
"""Return a tuple of (exception, traceback) raised by the call that the |
||||
|
future represents. |
||||
|
|
||||
|
Args: |
||||
|
timeout: The number of seconds to wait for the exception if the |
||||
|
future isn't done. If None, then there is no limit on the wait |
||||
|
time. |
||||
|
|
||||
|
Returns: |
||||
|
The exception raised by the call that the future represents or None |
||||
|
if the call completed without raising. |
||||
|
|
||||
|
Raises: |
||||
|
CancelledError: If the future was cancelled. |
||||
|
TimeoutError: If the future didn't finish executing before the given |
||||
|
timeout. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
||||
|
raise CancelledError() |
||||
|
elif self._state == FINISHED: |
||||
|
return self._exception, self._traceback |
||||
|
|
||||
|
self._condition.wait(timeout) |
||||
|
|
||||
|
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: |
||||
|
raise CancelledError() |
||||
|
elif self._state == FINISHED: |
||||
|
return self._exception, self._traceback |
||||
|
else: |
||||
|
raise TimeoutError() |
||||
|
|
||||
|
def exception(self, timeout=None): |
||||
|
"""Return the exception raised by the call that the future represents. |
||||
|
|
||||
|
Args: |
||||
|
timeout: The number of seconds to wait for the exception if the |
||||
|
future isn't done. If None, then there is no limit on the wait |
||||
|
time. |
||||
|
|
||||
|
Returns: |
||||
|
The exception raised by the call that the future represents or None |
||||
|
if the call completed without raising. |
||||
|
|
||||
|
Raises: |
||||
|
CancelledError: If the future was cancelled. |
||||
|
TimeoutError: If the future didn't finish executing before the given |
||||
|
timeout. |
||||
|
""" |
||||
|
return self.exception_info(timeout)[0] |
||||
|
|
||||
|
# The following methods should only be used by Executors and in tests. |
||||
|
def set_running_or_notify_cancel(self): |
||||
|
"""Mark the future as running or process any cancel notifications. |
||||
|
|
||||
|
Should only be used by Executor implementations and unit tests. |
||||
|
|
||||
|
If the future has been cancelled (cancel() was called and returned |
||||
|
True) then any threads waiting on the future completing (though calls |
||||
|
to as_completed() or wait()) are notified and False is returned. |
||||
|
|
||||
|
If the future was not cancelled then it is put in the running state |
||||
|
(future calls to running() will return True) and True is returned. |
||||
|
|
||||
|
This method should be called by Executor implementations before |
||||
|
executing the work associated with this future. If this method returns |
||||
|
False then the work should not be executed. |
||||
|
|
||||
|
Returns: |
||||
|
False if the Future was cancelled, True otherwise. |
||||
|
|
||||
|
Raises: |
||||
|
RuntimeError: if this method was already called or if set_result() |
||||
|
or set_exception() was called. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
if self._state == CANCELLED: |
||||
|
self._state = CANCELLED_AND_NOTIFIED |
||||
|
for waiter in self._waiters: |
||||
|
waiter.add_cancelled(self) |
||||
|
# self._condition.notify_all() is not necessary because |
||||
|
# self.cancel() triggers a notification. |
||||
|
return False |
||||
|
elif self._state == PENDING: |
||||
|
self._state = RUNNING |
||||
|
return True |
||||
|
else: |
||||
|
LOGGER.critical('Future %s in unexpected state: %s', |
||||
|
id(self), |
||||
|
self._state) |
||||
|
raise RuntimeError('Future in unexpected state') |
||||
|
|
||||
|
def set_result(self, result): |
||||
|
"""Sets the return value of work associated with the future. |
||||
|
|
||||
|
Should only be used by Executor implementations and unit tests. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
self._result = result |
||||
|
self._state = FINISHED |
||||
|
for waiter in self._waiters: |
||||
|
waiter.add_result(self) |
||||
|
self._condition.notify_all() |
||||
|
self._invoke_callbacks() |
||||
|
|
||||
|
def set_exception_info(self, exception, traceback): |
||||
|
"""Sets the result of the future as being the given exception |
||||
|
and traceback. |
||||
|
|
||||
|
Should only be used by Executor implementations and unit tests. |
||||
|
""" |
||||
|
with self._condition: |
||||
|
self._exception = exception |
||||
|
self._traceback = traceback |
||||
|
self._state = FINISHED |
||||
|
for waiter in self._waiters: |
||||
|
waiter.add_exception(self) |
||||
|
self._condition.notify_all() |
||||
|
self._invoke_callbacks() |
||||
|
|
||||
|
def set_exception(self, exception): |
||||
|
"""Sets the result of the future as being the given exception. |
||||
|
|
||||
|
Should only be used by Executor implementations and unit tests. |
||||
|
""" |
||||
|
self.set_exception_info(exception, None) |
||||
|
|
||||
|
class Executor(object): |
||||
|
"""This is an abstract base class for concrete asynchronous executors.""" |
||||
|
|
||||
|
def submit(self, fn, *args, **kwargs): |
||||
|
"""Submits a callable to be executed with the given arguments. |
||||
|
|
||||
|
Schedules the callable to be executed as fn(*args, **kwargs) and returns |
||||
|
a Future instance representing the execution of the callable. |
||||
|
|
||||
|
Returns: |
||||
|
A Future representing the given call. |
||||
|
""" |
||||
|
raise NotImplementedError() |
||||
|
|
||||
|
def map(self, fn, *iterables, **kwargs): |
||||
|
"""Returns an iterator equivalent to map(fn, iter). |
||||
|
|
||||
|
Args: |
||||
|
fn: A callable that will take as many arguments as there are |
||||
|
passed iterables. |
||||
|
timeout: The maximum number of seconds to wait. If None, then there |
||||
|
is no limit on the wait time. |
||||
|
|
||||
|
Returns: |
||||
|
An iterator equivalent to: map(func, *iterables) but the calls may |
||||
|
be evaluated out-of-order. |
||||
|
|
||||
|
Raises: |
||||
|
TimeoutError: If the entire result iterator could not be generated |
||||
|
before the given timeout. |
||||
|
Exception: If fn(*args) raises for any values. |
||||
|
""" |
||||
|
timeout = kwargs.get('timeout') |
||||
|
if timeout is not None: |
||||
|
end_time = timeout + time.time() |
||||
|
|
||||
|
fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] |
||||
|
|
||||
|
# Yield must be hidden in closure so that the futures are submitted |
||||
|
# before the first iterator value is required. |
||||
|
def result_iterator(): |
||||
|
try: |
||||
|
# reverse to keep finishing order |
||||
|
fs.reverse() |
||||
|
while fs: |
||||
|
# Careful not to keep a reference to the popped future |
||||
|
if timeout is None: |
||||
|
yield fs.pop().result() |
||||
|
else: |
||||
|
yield fs.pop().result(end_time - time.time()) |
||||
|
finally: |
||||
|
for future in fs: |
||||
|
future.cancel() |
||||
|
return result_iterator() |
||||
|
|
||||
|
def shutdown(self, wait=True): |
||||
|
"""Clean-up the resources associated with the Executor. |
||||
|
|
||||
|
It is safe to call this method several times. Otherwise, no other |
||||
|
methods can be called after this one. |
||||
|
|
||||
|
Args: |
||||
|
wait: If True then shutdown will not return until all running |
||||
|
futures have finished executing and the resources used by the |
||||
|
executor have been reclaimed. |
||||
|
""" |
||||
|
pass |
||||
|
|
||||
|
def __enter__(self): |
||||
|
return self |
||||
|
|
||||
|
def __exit__(self, exc_type, exc_val, exc_tb): |
||||
|
self.shutdown(wait=True) |
||||
|
return False |
||||
|
|
||||
|
|
||||
|
class BrokenExecutor(RuntimeError): |
||||
|
""" |
||||
|
Raised when a executor has become non-functional after a severe failure. |
||||
|
""" |
@ -0,0 +1,363 @@ |
|||||
|
# Copyright 2009 Brian Quinlan. All Rights Reserved. |
||||
|
# Licensed to PSF under a Contributor Agreement. |
||||
|
|
||||
|
"""Implements ProcessPoolExecutor. |
||||
|
|
||||
|
The follow diagram and text describe the data-flow through the system: |
||||
|
|
||||
|
|======================= In-process =====================|== Out-of-process ==| |
||||
|
|
||||
|
+----------+ +----------+ +--------+ +-----------+ +---------+ |
||||
|
| | => | Work Ids | => | | => | Call Q | => | | |
||||
|
| | +----------+ | | +-----------+ | | |
||||
|
| | | ... | | | | ... | | | |
||||
|
| | | 6 | | | | 5, call() | | | |
||||
|
| | | 7 | | | | ... | | | |
||||
|
| Process | | ... | | Local | +-----------+ | Process | |
||||
|
| Pool | +----------+ | Worker | | #1..n | |
||||
|
| Executor | | Thread | | | |
||||
|
| | +----------- + | | +-----------+ | | |
||||
|
| | <=> | Work Items | <=> | | <= | Result Q | <= | | |
||||
|
| | +------------+ | | +-----------+ | | |
||||
|
| | | 6: call() | | | | ... | | | |
||||
|
| | | future | | | | 4, result | | | |
||||
|
| | | ... | | | | 3, except | | | |
||||
|
+----------+ +------------+ +--------+ +-----------+ +---------+ |
||||
|
|
||||
|
Executor.submit() called: |
||||
|
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict |
||||
|
- adds the id of the _WorkItem to the "Work Ids" queue |
||||
|
|
||||
|
Local worker thread: |
||||
|
- reads work ids from the "Work Ids" queue and looks up the corresponding |
||||
|
WorkItem from the "Work Items" dict: if the work item has been cancelled then |
||||
|
it is simply removed from the dict, otherwise it is repackaged as a |
||||
|
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" |
||||
|
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because |
||||
|
calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). |
||||
|
- reads _ResultItems from "Result Q", updates the future stored in the |
||||
|
"Work Items" dict and deletes the dict entry |
||||
|
|
||||
|
Process #1..n: |
||||
|
- reads _CallItems from "Call Q", executes the calls, and puts the resulting |
||||
|
_ResultItems in "Request Q" |
||||
|
""" |
||||
|
|
||||
|
import atexit |
||||
|
from . import _base |
||||
|
import Queue as queue |
||||
|
import multiprocessing |
||||
|
import threading |
||||
|
import weakref |
||||
|
import sys |
||||
|
|
||||
|
__author__ = 'Brian Quinlan (brian@sweetapp.com)' |
||||
|
|
||||
|
# Workers are created as daemon threads and processes. This is done to allow the |
||||
|
# interpreter to exit when there are still idle processes in a |
||||
|
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, |
||||
|
# allowing workers to die with the interpreter has two undesirable properties: |
||||
|
# - The workers would still be running during interpretor shutdown, |
||||
|
# meaning that they would fail in unpredictable ways. |
||||
|
# - The workers could be killed while evaluating a work item, which could |
||||
|
# be bad if the callable being evaluated has external side-effects e.g. |
||||
|
# writing to a file. |
||||
|
# |
||||
|
# To work around this problem, an exit handler is installed which tells the |
||||
|
# workers to exit when their work queues are empty and then waits until the |
||||
|
# threads/processes finish. |
||||
|
|
||||
|
_threads_queues = weakref.WeakKeyDictionary() |
||||
|
_shutdown = False |
||||
|
|
||||
|
def _python_exit(): |
||||
|
global _shutdown |
||||
|
_shutdown = True |
||||
|
items = list(_threads_queues.items()) if _threads_queues else () |
||||
|
for t, q in items: |
||||
|
q.put(None) |
||||
|
for t, q in items: |
||||
|
t.join(sys.maxint) |
||||
|
|
||||
|
# Controls how many more calls than processes will be queued in the call queue. |
||||
|
# A smaller number will mean that processes spend more time idle waiting for |
||||
|
# work while a larger number will make Future.cancel() succeed less frequently |
||||
|
# (Futures in the call queue cannot be cancelled). |
||||
|
EXTRA_QUEUED_CALLS = 1 |
||||
|
|
||||
|
class _WorkItem(object): |
||||
|
def __init__(self, future, fn, args, kwargs): |
||||
|
self.future = future |
||||
|
self.fn = fn |
||||
|
self.args = args |
||||
|
self.kwargs = kwargs |
||||
|
|
||||
|
class _ResultItem(object): |
||||
|
def __init__(self, work_id, exception=None, result=None): |
||||
|
self.work_id = work_id |
||||
|
self.exception = exception |
||||
|
self.result = result |
||||
|
|
||||
|
class _CallItem(object): |
||||
|
def __init__(self, work_id, fn, args, kwargs): |
||||
|
self.work_id = work_id |
||||
|
self.fn = fn |
||||
|
self.args = args |
||||
|
self.kwargs = kwargs |
||||
|
|
||||
|
def _process_worker(call_queue, result_queue): |
||||
|
"""Evaluates calls from call_queue and places the results in result_queue. |
||||
|
|
||||
|
This worker is run in a separate process. |
||||
|
|
||||
|
Args: |
||||
|
call_queue: A multiprocessing.Queue of _CallItems that will be read and |
||||
|
evaluated by the worker. |
||||
|
result_queue: A multiprocessing.Queue of _ResultItems that will written |
||||
|
to by the worker. |
||||
|
shutdown: A multiprocessing.Event that will be set as a signal to the |
||||
|
worker that it should exit when call_queue is empty. |
||||
|
""" |
||||
|
while True: |
||||
|
call_item = call_queue.get(block=True) |
||||
|
if call_item is None: |
||||
|
# Wake up queue management thread |
||||
|
result_queue.put(None) |
||||
|
return |
||||
|
try: |
||||
|
r = call_item.fn(*call_item.args, **call_item.kwargs) |
||||
|
except: |
||||
|
e = sys.exc_info()[1] |
||||
|
result_queue.put(_ResultItem(call_item.work_id, |
||||
|
exception=e)) |
||||
|
else: |
||||
|
result_queue.put(_ResultItem(call_item.work_id, |
||||
|
result=r)) |
||||
|
|
||||
|
def _add_call_item_to_queue(pending_work_items, |
||||
|
work_ids, |
||||
|
call_queue): |
||||
|
"""Fills call_queue with _WorkItems from pending_work_items. |
||||
|
|
||||
|
This function never blocks. |
||||
|
|
||||
|
Args: |
||||
|
pending_work_items: A dict mapping work ids to _WorkItems e.g. |
||||
|
{5: <_WorkItem...>, 6: <_WorkItem...>, ...} |
||||
|
work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids |
||||
|
are consumed and the corresponding _WorkItems from |
||||
|
pending_work_items are transformed into _CallItems and put in |
||||
|
call_queue. |
||||
|
call_queue: A multiprocessing.Queue that will be filled with _CallItems |
||||
|
derived from _WorkItems. |
||||
|
""" |
||||
|
while True: |
||||
|
if call_queue.full(): |
||||
|
return |
||||
|
try: |
||||
|
work_id = work_ids.get(block=False) |
||||
|
except queue.Empty: |
||||
|
return |
||||
|
else: |
||||
|
work_item = pending_work_items[work_id] |
||||
|
|
||||
|
if work_item.future.set_running_or_notify_cancel(): |
||||
|
call_queue.put(_CallItem(work_id, |
||||
|
work_item.fn, |
||||
|
work_item.args, |
||||
|
work_item.kwargs), |
||||
|
block=True) |
||||
|
else: |
||||
|
del pending_work_items[work_id] |
||||
|
continue |
||||
|
|
||||
|
def _queue_management_worker(executor_reference, |
||||
|
processes, |
||||
|
pending_work_items, |
||||
|
work_ids_queue, |
||||
|
call_queue, |
||||
|
result_queue): |
||||
|
"""Manages the communication between this process and the worker processes. |
||||
|
|
||||
|
This function is run in a local thread. |
||||
|
|
||||
|
Args: |
||||
|
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns |
||||
|
this thread. Used to determine if the ProcessPoolExecutor has been |
||||
|
garbage collected and that this function can exit. |
||||
|
process: A list of the multiprocessing.Process instances used as |
||||
|
workers. |
||||
|
pending_work_items: A dict mapping work ids to _WorkItems e.g. |
||||
|
{5: <_WorkItem...>, 6: <_WorkItem...>, ...} |
||||
|
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). |
||||
|
call_queue: A multiprocessing.Queue that will be filled with _CallItems |
||||
|
derived from _WorkItems for processing by the process workers. |
||||
|
result_queue: A multiprocessing.Queue of _ResultItems generated by the |
||||
|
process workers. |
||||
|
""" |
||||
|
nb_shutdown_processes = [0] |
||||
|
def shutdown_one_process(): |
||||
|
"""Tell a worker to terminate, which will in turn wake us again""" |
||||
|
call_queue.put(None) |
||||
|
nb_shutdown_processes[0] += 1 |
||||
|
while True: |
||||
|
_add_call_item_to_queue(pending_work_items, |
||||
|
work_ids_queue, |
||||
|
call_queue) |
||||
|
|
||||
|
result_item = result_queue.get(block=True) |
||||
|
if result_item is not None: |
||||
|
work_item = pending_work_items[result_item.work_id] |
||||
|
del pending_work_items[result_item.work_id] |
||||
|
|
||||
|
if result_item.exception: |
||||
|
work_item.future.set_exception(result_item.exception) |
||||
|
else: |
||||
|
work_item.future.set_result(result_item.result) |
||||
|
# Delete references to object. See issue16284 |
||||
|
del work_item |
||||
|
# Check whether we should start shutting down. |
||||
|
executor = executor_reference() |
||||
|
# No more work items can be added if: |
||||
|
# - The interpreter is shutting down OR |
||||
|
# - The executor that owns this worker has been collected OR |
||||
|
# - The executor that owns this worker has been shutdown. |
||||
|
if _shutdown or executor is None or executor._shutdown_thread: |
||||
|
# Since no new work items can be added, it is safe to shutdown |
||||
|
# this thread if there are no pending work items. |
||||
|
if not pending_work_items: |
||||
|
while nb_shutdown_processes[0] < len(processes): |
||||
|
shutdown_one_process() |
||||
|
# If .join() is not called on the created processes then |
||||
|
# some multiprocessing.Queue methods may deadlock on Mac OS |
||||
|
# X. |
||||
|
for p in processes: |
||||
|
p.join() |
||||
|
call_queue.close() |
||||
|
return |
||||
|
del executor |
||||
|
|
||||
|
_system_limits_checked = False |
||||
|
_system_limited = None |
||||
|
def _check_system_limits(): |
||||
|
global _system_limits_checked, _system_limited |
||||
|
if _system_limits_checked: |
||||
|
if _system_limited: |
||||
|
raise NotImplementedError(_system_limited) |
||||
|
_system_limits_checked = True |
||||
|
try: |
||||
|
import os |
||||
|
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") |
||||
|
except (AttributeError, ValueError): |
||||
|
# sysconf not available or setting not available |
||||
|
return |
||||
|
if nsems_max == -1: |
||||
|
# indetermine limit, assume that limit is determined |
||||
|
# by available memory only |
||||
|
return |
||||
|
if nsems_max >= 256: |
||||
|
# minimum number of semaphores available |
||||
|
# according to POSIX |
||||
|
return |
||||
|
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max |
||||
|
raise NotImplementedError(_system_limited) |
||||
|
|
||||
|
|
||||
|
class ProcessPoolExecutor(_base.Executor): |
||||
|
def __init__(self, max_workers=None): |
||||
|
"""Initializes a new ProcessPoolExecutor instance. |
||||
|
|
||||
|
Args: |
||||
|
max_workers: The maximum number of processes that can be used to |
||||
|
execute the given calls. If None or not given then as many |
||||
|
worker processes will be created as the machine has processors. |
||||
|
""" |
||||
|
_check_system_limits() |
||||
|
|
||||
|
if max_workers is None: |
||||
|
self._max_workers = multiprocessing.cpu_count() |
||||
|
else: |
||||
|
if max_workers <= 0: |
||||
|
raise ValueError("max_workers must be greater than 0") |
||||
|
|
||||
|
self._max_workers = max_workers |
||||
|
|
||||
|
# Make the call queue slightly larger than the number of processes to |
||||
|
# prevent the worker processes from idling. But don't make it too big |
||||
|
# because futures in the call queue cannot be cancelled. |
||||
|
self._call_queue = multiprocessing.Queue(self._max_workers + |
||||
|
EXTRA_QUEUED_CALLS) |
||||
|
self._result_queue = multiprocessing.Queue() |
||||
|
self._work_ids = queue.Queue() |
||||
|
self._queue_management_thread = None |
||||
|
self._processes = set() |
||||
|
|
||||
|
# Shutdown is a two-step process. |
||||
|
self._shutdown_thread = False |
||||
|
self._shutdown_lock = threading.Lock() |
||||
|
self._queue_count = 0 |
||||
|
self._pending_work_items = {} |
||||
|
|
||||
|
def _start_queue_management_thread(self): |
||||
|
# When the executor gets lost, the weakref callback will wake up |
||||
|
# the queue management thread. |
||||
|
def weakref_cb(_, q=self._result_queue): |
||||
|
q.put(None) |
||||
|
if self._queue_management_thread is None: |
||||
|
self._queue_management_thread = threading.Thread( |
||||
|
target=_queue_management_worker, |
||||
|
args=(weakref.ref(self, weakref_cb), |
||||
|
self._processes, |
||||
|
self._pending_work_items, |
||||
|
self._work_ids, |
||||
|
self._call_queue, |
||||
|
self._result_queue)) |
||||
|
self._queue_management_thread.daemon = True |
||||
|
self._queue_management_thread.start() |
||||
|
_threads_queues[self._queue_management_thread] = self._result_queue |
||||
|
|
||||
|
def _adjust_process_count(self): |
||||
|
for _ in range(len(self._processes), self._max_workers): |
||||
|
p = multiprocessing.Process( |
||||
|
target=_process_worker, |
||||
|
args=(self._call_queue, |
||||
|
self._result_queue)) |
||||
|
p.start() |
||||
|
self._processes.add(p) |
||||
|
|
||||
|
def submit(self, fn, *args, **kwargs): |
||||
|
with self._shutdown_lock: |
||||
|
if self._shutdown_thread: |
||||
|
raise RuntimeError('cannot schedule new futures after shutdown') |
||||
|
|
||||
|
f = _base.Future() |
||||
|
w = _WorkItem(f, fn, args, kwargs) |
||||
|
|
||||
|
self._pending_work_items[self._queue_count] = w |
||||
|
self._work_ids.put(self._queue_count) |
||||
|
self._queue_count += 1 |
||||
|
# Wake up queue management thread |
||||
|
self._result_queue.put(None) |
||||
|
|
||||
|
self._start_queue_management_thread() |
||||
|
self._adjust_process_count() |
||||
|
return f |
||||
|
submit.__doc__ = _base.Executor.submit.__doc__ |
||||
|
|
||||
|
def shutdown(self, wait=True): |
||||
|
with self._shutdown_lock: |
||||
|
self._shutdown_thread = True |
||||
|
if self._queue_management_thread: |
||||
|
# Wake up queue management thread |
||||
|
self._result_queue.put(None) |
||||
|
if wait: |
||||
|
self._queue_management_thread.join(sys.maxint) |
||||
|
# To reduce the risk of openning too many files, remove references to |
||||
|
# objects that use file descriptors. |
||||
|
self._queue_management_thread = None |
||||
|
self._call_queue = None |
||||
|
self._result_queue = None |
||||
|
self._processes = None |
||||
|
shutdown.__doc__ = _base.Executor.shutdown.__doc__ |
||||
|
|
||||
|
atexit.register(_python_exit) |
@ -0,0 +1,207 @@ |
|||||
|
# Copyright 2009 Brian Quinlan. All Rights Reserved. |
||||
|
# Licensed to PSF under a Contributor Agreement. |
||||
|
|
||||
|
"""Implements ThreadPoolExecutor.""" |
||||
|
|
||||
|
import atexit |
||||
|
from six import PY2 |
||||
|
if PY2: |
||||
|
from . import _base |
||||
|
else: |
||||
|
from concurrent.futures import _base |
||||
|
import itertools |
||||
|
import Queue as queue |
||||
|
import threading |
||||
|
import weakref |
||||
|
import sys |
||||
|
|
||||
|
try: |
||||
|
from multiprocessing import cpu_count |
||||
|
except ImportError: |
||||
|
# some platforms don't have multiprocessing |
||||
|
def cpu_count(): |
||||
|
return None |
||||
|
|
||||
|
__author__ = 'Brian Quinlan (brian@sweetapp.com)' |
||||
|
|
||||
|
# Workers are created as daemon threads. This is done to allow the interpreter |
||||
|
# to exit when there are still idle threads in a ThreadPoolExecutor's thread |
||||
|
# pool (i.e. shutdown() was not called). However, allowing workers to die with |
||||
|
# the interpreter has two undesirable properties: |
||||
|
# - The workers would still be running during interpretor shutdown, |
||||
|
# meaning that they would fail in unpredictable ways. |
||||
|
# - The workers could be killed while evaluating a work item, which could |
||||
|
# be bad if the callable being evaluated has external side-effects e.g. |
||||
|
# writing to a file. |
||||
|
# |
||||
|
# To work around this problem, an exit handler is installed which tells the |
||||
|
# workers to exit when their work queues are empty and then waits until the |
||||
|
# threads finish. |
||||
|
|
||||
|
_threads_queues = weakref.WeakKeyDictionary() |
||||
|
_shutdown = False |
||||
|
|
||||
|
def _python_exit(): |
||||
|
global _shutdown |
||||
|
_shutdown = True |
||||
|
items = list(_threads_queues.items()) if _threads_queues else () |
||||
|
for t, q in items: |
||||
|
q.put(None) |
||||
|
for t, q in items: |
||||
|
t.join(sys.maxint) |
||||
|
|
||||
|
atexit.register(_python_exit) |
||||
|
|
||||
|
class _WorkItem(object): |
||||
|
def __init__(self, future, fn, args, kwargs): |
||||
|
self.future = future |
||||
|
self.fn = fn |
||||
|
self.args = args |
||||
|
self.kwargs = kwargs |
||||
|
|
||||
|
def run(self): |
||||
|
if not self.future.set_running_or_notify_cancel(): |
||||
|
return |
||||
|
|
||||
|
try: |
||||
|
result = self.fn(*self.args, **self.kwargs) |
||||
|
except: |
||||
|
e, tb = sys.exc_info()[1:] |
||||
|
self.future.set_exception_info(e, tb) |
||||
|
else: |
||||
|
self.future.set_result(result) |
||||
|
|
||||
|
def _worker(executor_reference, work_queue, initializer, initargs): |
||||
|
if initializer is not None: |
||||
|
try: |
||||
|
initializer(*initargs) |
||||
|
except BaseException: |
||||
|
_base.LOGGER.critical('Exception in initializer:', exc_info=True) |
||||
|
executor = executor_reference() |
||||
|
if executor is not None: |
||||
|
executor._initializer_failed() |
||||
|
return |
||||
|
try: |
||||
|
while True: |
||||
|
work_item = work_queue.get(block=True) |
||||
|
if work_item is not None: |
||||
|
work_item.run() |
||||
|
# Delete references to object. See issue16284 |
||||
|
del work_item |
||||
|
|
||||
|
# attempt to increment idle count |
||||
|
executor = executor_reference() |
||||
|
if executor is not None: |
||||
|
executor._idle_semaphore.release() |
||||
|
del executor |
||||
|
continue |
||||
|
executor = executor_reference() |
||||
|
# Exit if: |
||||
|
# - The interpreter is shutting down OR |
||||
|
# - The executor that owns the worker has been collected OR |
||||
|
# - The executor that owns the worker has been shutdown. |
||||
|
if _shutdown or executor is None or executor._shutdown: |
||||
|
# Notice other workers |
||||
|
work_queue.put(None) |
||||
|
return |
||||
|
del executor |
||||
|
except: |
||||
|
_base.LOGGER.critical('Exception in worker', exc_info=True) |
||||
|
|
||||
|
|
||||
|
class BrokenThreadPool(_base.BrokenExecutor): |
||||
|
""" |
||||
|
Raised when a worker thread in a ThreadPoolExecutor failed initializing. |
||||
|
""" |
||||
|
|
||||
|
|
||||
|
class ThreadPoolExecutor(_base.Executor): |
||||
|
|
||||
|
# Used to assign unique thread names when thread_name_prefix is not supplied. |
||||
|
_counter = itertools.count().next |
||||
|
|
||||
|
def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()): |
||||
|
"""Initializes a new ThreadPoolExecutor instance. |
||||
|
|
||||
|
Args: |
||||
|
max_workers: The maximum number of threads that can be used to |
||||
|
execute the given calls. |
||||
|
thread_name_prefix: An optional name prefix to give our threads. |
||||
|
""" |
||||
|
if max_workers is None: |
||||
|
# Use this number because ThreadPoolExecutor is often |
||||
|
# used to overlap I/O instead of CPU work. |
||||
|
max_workers = (cpu_count() or 1) * 5 |
||||
|
if max_workers <= 0: |
||||
|
raise ValueError("max_workers must be greater than 0") |
||||
|
|
||||
|
self._max_workers = max_workers |
||||
|
self._initializer = initializer |
||||
|
self._initargs = initargs |
||||
|
self._work_queue = queue.Queue() |
||||
|
self._idle_semaphore = threading.Semaphore(0) |
||||
|
self._threads = set() |
||||
|
self._broken = False |
||||
|
self._shutdown = False |
||||
|
self._shutdown_lock = threading.Lock() |
||||
|
self._thread_name_prefix = (thread_name_prefix or |
||||
|
("ThreadPoolExecutor-%d" % self._counter())) |
||||
|
|
||||
|
def submit(self, fn, *args, **kwargs): |
||||
|
with self._shutdown_lock: |
||||
|
if self._broken: |
||||
|
raise BrokenThreadPool(self._broken) |
||||
|
if self._shutdown: |
||||
|
raise RuntimeError('cannot schedule new futures after shutdown') |
||||
|
|
||||
|
f = _base.Future() |
||||
|
w = _WorkItem(f, fn, args, kwargs) |
||||
|
|
||||
|
self._work_queue.put(w) |
||||
|
self._adjust_thread_count() |
||||
|
return f |
||||
|
submit.__doc__ = _base.Executor.submit.__doc__ |
||||
|
|
||||
|
def _adjust_thread_count(self): |
||||
|
# if idle threads are available, don't spin new threads |
||||
|
if self._idle_semaphore.acquire(False): |
||||
|
return |
||||
|
|
||||
|
# When the executor gets lost, the weakref callback will wake up |
||||
|
# the worker threads. |
||||
|
def weakref_cb(_, q=self._work_queue): |
||||
|
q.put(None) |
||||
|
|
||||
|
num_threads = len(self._threads) |
||||
|
if num_threads < self._max_workers: |
||||
|
thread_name = '%s_%d' % (self._thread_name_prefix or self, |
||||
|
num_threads) |
||||
|
t = threading.Thread(name=thread_name, target=_worker, |
||||
|
args=(weakref.ref(self, weakref_cb), |
||||
|
self._work_queue, self._initializer, self._initargs)) |
||||
|
t.daemon = True |
||||
|
t.start() |
||||
|
self._threads.add(t) |
||||
|
_threads_queues[t] = self._work_queue |
||||
|
|
||||
|
def _initializer_failed(self): |
||||
|
with self._shutdown_lock: |
||||
|
self._broken = ('A thread initializer failed, the thread pool ' |
||||
|
'is not usable anymore') |
||||
|
# Drain work queue and mark pending futures failed |
||||
|
while True: |
||||
|
try: |
||||
|
work_item = self._work_queue.get_nowait() |
||||
|
except queue.Empty: |
||||
|
break |
||||
|
if work_item is not None: |
||||
|
work_item.future.set_exception(BrokenThreadPool(self._broken)) |
||||
|
|
||||
|
def shutdown(self, wait=True): |
||||
|
with self._shutdown_lock: |
||||
|
self._shutdown = True |
||||
|
self._work_queue.put(None) |
||||
|
if wait: |
||||
|
for t in self._threads: |
||||
|
t.join(sys.maxint) |
||||
|
shutdown.__doc__ = _base.Executor.shutdown.__doc__ |
@ -0,0 +1,55 @@ |
|||||
|
# coding=utf-8 |
||||
|
# |
||||
|
# This file is part of SickGear. |
||||
|
# |
||||
|
# SickGear is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# SickGear is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU General Public License |
||||
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>. |
||||
|
|
||||
|
# noinspection PyUnresolvedReferences |
||||
|
import sys |
||||
|
|
||||
|
# noinspection PyProtectedMember |
||||
|
from .futures.thread import _base, BrokenThreadPool, ThreadPoolExecutor |
||||
|
|
||||
|
from .base import * |
||||
|
|
||||
|
|
||||
|
class SgWorkItem(GenericWorkItem): |
||||
|
|
||||
|
def run(self): |
||||
|
if self.future.set_running_or_notify_cancel(): |
||||
|
try: |
||||
|
self._set_thread_name() |
||||
|
result = self.fn(*self.args, **self.kwargs) |
||||
|
except (BaseException, Exception): |
||||
|
e, tb = sys.exc_info()[1:] |
||||
|
self.future.set_exception_info(e, tb) |
||||
|
else: |
||||
|
self.future.set_result(result) |
||||
|
|
||||
|
|
||||
|
class SgThreadPoolExecutor(ThreadPoolExecutor): |
||||
|
|
||||
|
def submit(self, fn, *args, **kwargs): |
||||
|
with self._shutdown_lock: |
||||
|
if self._broken: |
||||
|
raise BrokenThreadPool(self._broken) |
||||
|
if self._shutdown: |
||||
|
raise RuntimeError('cannot schedule new futures after shutdown') |
||||
|
|
||||
|
f = _base.Future() |
||||
|
w = SgWorkItem(f, fn, args, kwargs) |
||||
|
|
||||
|
self._work_queue.put(w) |
||||
|
self._adjust_thread_count() |
||||
|
return f |
@ -0,0 +1,68 @@ |
|||||
|
# coding=utf-8 |
||||
|
# |
||||
|
# This file is part of SickGear. |
||||
|
# |
||||
|
# SickGear is free software: you can redistribute it and/or modify |
||||
|
# it under the terms of the GNU General Public License as published by |
||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||
|
# (at your option) any later version. |
||||
|
# |
||||
|
# SickGear is distributed in the hope that it will be useful, |
||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||
|
# GNU General Public License for more details. |
||||
|
# |
||||
|
# You should have received a copy of the GNU General Public License |
||||
|
# along with SickGear. If not, see <http://www.gnu.org/licenses/>. |
||||
|
|
||||
|
# noinspection PyCompatibility |
||||
|
from concurrent.futures import ThreadPoolExecutor |
||||
|
# noinspection PyCompatibility,PyProtectedMember,PyUnresolvedReferences |
||||
|
from concurrent.futures.thread import _base, _shutdown, BrokenThreadPool |
||||
|
|
||||
|
from .base import * |
||||
|
|
||||
|
|
||||
|
class SgWorkItem(GenericWorkItem): |
||||
|
|
||||
|
def run(self): |
||||
|
if self.future.set_running_or_notify_cancel(): |
||||
|
try: |
||||
|
self._set_thread_name() |
||||
|
result = self.fn(*self.args, **self.kwargs) |
||||
|
except BaseException as exc: |
||||
|
self.future.set_exception(exc) |
||||
|
# Break a reference cycle with the exception 'exc' |
||||
|
self = None |
||||
|
else: |
||||
|
self.future.set_result(result) |
||||
|
|
||||
|
|
||||
|
class SgThreadPoolExecutor(ThreadPoolExecutor): |
||||
|
def submit(*args, **kwargs): |
||||
|
if 2 <= len(args): |
||||
|
self, fn, *args = args |
||||
|
elif not args: |
||||
|
raise TypeError('descriptor \'submit\' of \'ThreadPoolExecutor\' object needs an argument') |
||||
|
elif 'fn' in kwargs: |
||||
|
fn = kwargs.pop('fn') |
||||
|
self, *args = args |
||||
|
import warnings |
||||
|
warnings.warn('Passing \'fn\' as keyword argument is deprecated', DeprecationWarning, stacklevel=2) |
||||
|
else: |
||||
|
raise TypeError('submit expected at least 1 positional argument, got %d' % (len(args) - 1)) |
||||
|
|
||||
|
with self._shutdown_lock: |
||||
|
if self._broken: |
||||
|
raise BrokenThreadPool(self._broken) |
||||
|
if self._shutdown: |
||||
|
raise RuntimeError('cannot schedule new futures after shutdown') |
||||
|
if _shutdown: |
||||
|
raise RuntimeError('cannot schedule new futures after interpreter shutdown') |
||||
|
|
||||
|
f = _base.Future() |
||||
|
w = SgWorkItem(f, fn, args, kwargs) |
||||
|
|
||||
|
self._work_queue.put(w) |
||||
|
self._adjust_thread_count() |
||||
|
return f |
Loading…
Reference in new issue