You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

380 lines
11 KiB

# Copyright 2015 The Tornado Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Asynchronous queues for coroutines. These classes are very similar
to those provided in the standard library's `asyncio package
<https://docs.python.org/3/library/asyncio-queue.html>`_.
.. warning::
Unlike the standard library's `queue` module, the classes defined here
are *not* thread-safe. To use these queues from another thread,
use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
before calling any queue methods.
"""
from __future__ import absolute_import, division, print_function
import collections
import heapq
Change core system to improve performance and facilitate multi TV info sources. Change migrate core objects TVShow and TVEpisode and everywhere that these objects affect. Add message to logs and disable ui backlog buttons when no media provider has active and/or scheduled searching enabled. Change views for py3 compat. Change set default runtime of 5 mins if none is given for layout Day by Day. Add OpenSubtitles authentication support to config/Subtitles/Subtitles Plugin. Add &#34;Enforce media hash match&#34; to config/Subtitles Plugin/Opensubtitles for accurate subs if enabled, but if disabled, search failures will fallback to use less reliable subtitle results. Add Apprise 0.8.0 (6aa52c3). Add hachoir_py3 3.0a6 (5b9e05a). Add sgmllib3k 1.0.0 Update soupsieve 1.9.1 (24859cc) to soupsieve_py2 1.9.5 (6a38398) Add soupsieve_py3 2.0.0.dev (69194a2). Add Tornado_py3 Web Server 6.0.3 (ff985fe). Add xmlrpclib_to 0.1.1 (c37db9e). Remove ancient Growl lib 0.1 Remove xmltodict library. Change requirements.txt for Cheetah3 to minimum 3.2.4 Change update sabToSickBeard. Change update autoProcessTV. Change remove Twitter notifier. Update NZBGet Process Media extension, SickGear-NG 1.7 → 2.4 Update Kodi addon 1.0.3 → 1.0.4 Update ADBA for py3. Update Beautiful Soup 4.8.0 (r526) to 4.8.1 (r531). Update Send2Trash 1.3.0 (a568370) to 1.5.0 (66afce7). Update soupsieve 1.9.1 (24859cc) to 1.9.5 (6a38398). Change use GNTP (Growl Notification Transport Protocol) from Apprise. Change add multi host support to Growl notifier. Fix Growl notifier when using empty password. Change update links for Growl notifications. Change deprecate confg/Notifications/Growl password field as these are now stored with host setting. Fix prevent infinite memoryError from a particular jpg data structure. Change subliminal for py3. Change enzyme for py3. Change browser_ua for py3. Change feedparser for py3 (sgmlib is no longer available on py3 as standardlib so added ext lib) Fix Guessit. Fix parse_xml for py3. Fix name parser with multi eps for py3. Fix tvdb_api fixes for py3 (search show). Fix config/media process to only display &#34;pattern is invalid&#34; qtip on &#34;Episode naming&#34; tab if the associated field is actually visible. Also, if the field becomes hidden due to a setting change, hide any previously displayed qtip. Note for Javascript::getelementbyid (or $(&#39;tag[id=&#34;&lt;name&gt;&#34;&#39;)) is required when an id is being searched in the dom due to &#34;:&#34; used in a shows id name. Change download anidb xml files to main cache folder and use adba lib folder as a last resort. Change create get anidb show groups as centralised helper func and consolidate dupe code. Change move anidb related functions to newly renamed anime.py (from blacklistandwhitelist.py). Change str encode hex no longer exits in py3, use codecs.encode(...) instead. Change fix b64decode on py3 returns bytestrings. Change use binary read when downloading log file via browser to prevent any encoding issues. Change add case insensitive ordering to anime black/whitelist. Fix anime groups list not excluding whitelisted stuff. Change add Windows utf8 fix ... see: ytdl-org/youtube-dl#820 Change if no qualities are wanted, exit manual search thread. Fix keepalive for py3 process media. Change add a once a month update of tvinfo show mappings to the daily updater. Change autocorrect ids of new shows by updating from -8 to 31 days of the airdate of episode one. Add next run time to Manage/Show Tasks/Daily show update. Change when fetching imdb data, if imdb id is an episode id then try to find and use real show id. Change delete diskcache db in imdbpie when value error (due to change in Python version). Change during startup, cleanup any _cleaner.pyc/o to prevent issues when switching python versions. Add .pyc cleaner if python version is switched. Change replace deprecated gettz_db_metadata() and gettz. Change rebrand &#34;SickGear PostProcessing script&#34; to &#34;SickGear Process Media extension&#34;. Change improve setup guide to use the NZBGet version to minimise displayed text based on version. Change NZBGet versions prior to v17 now told to upgrade as those version are no longer supported - code has actually exit on start up for some time but docs were outdated. Change comment out code and unused option sg_base_path. Change supported Python version 2.7.9-2.7.18 inclusive expanded to 3.7.1-3.8.1 inclusive. Change pidfile creation under Linux 0o644. Make logger accept lists to output continuously using the log_lock instead of split up by other processes. Fix long path issues with Windows process media.
6 years ago
from tornado_py2 import gen, ioloop
from tornado_py2.concurrent import Future, future_set_result_unless_cancelled
from tornado_py2.locks import Event
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
class QueueEmpty(Exception):
"""Raised by `.Queue.get_nowait` when the queue has no items."""
pass
class QueueFull(Exception):
"""Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
pass
def _set_timeout(future, timeout):
if timeout:
def on_timeout():
if not future.done():
future.set_exception(gen.TimeoutError())
io_loop = ioloop.IOLoop.current()
timeout_handle = io_loop.add_timeout(timeout, on_timeout)
future.add_done_callback(
lambda _: io_loop.remove_timeout(timeout_handle))
class _QueueIterator(object):
def __init__(self, q):
self.q = q
def __anext__(self):
return self.q.get()
class Queue(object):
"""Coordinate producer and consumer coroutines.
If maxsize is 0 (the default) the queue size is unbounded.
.. testcode::
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue(maxsize=2)
async def consumer():
async for item in q:
try:
print('Doing work on %s' % item)
await gen.sleep(0.01)
finally:
q.task_done()
async def producer():
for item in range(5):
await q.put(item)
print('Put %s' % item)
async def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
await producer() # Wait for producer to put all tasks.
await q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)
.. testoutput::
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done
In versions of Python without native coroutines (before 3.5),
``consumer()`` could be written as::
@gen.coroutine
def consumer():
while True:
item = yield q.get()
try:
print('Doing work on %s' % item)
yield gen.sleep(0.01)
finally:
q.task_done()
.. versionchanged:: 4.3
Added ``async for`` support in Python 3.5.
"""
def __init__(self, maxsize=0):
if maxsize is None:
raise TypeError("maxsize can't be None")
if maxsize < 0:
raise ValueError("maxsize can't be negative")
self._maxsize = maxsize
self._init()
self._getters = collections.deque([]) # Futures.
self._putters = collections.deque([]) # Pairs of (item, Future).
self._unfinished_tasks = 0
self._finished = Event()
self._finished.set()
@property
def maxsize(self):
"""Number of items allowed in the queue."""
return self._maxsize
def qsize(self):
"""Number of items in the queue."""
return len(self._queue)
def empty(self):
return not self._queue
def full(self):
if self.maxsize == 0:
return False
else:
return self.qsize() >= self.maxsize
def put(self, item, timeout=None):
"""Put an item into the queue, perhaps waiting until there is room.
Returns a Future, which raises `tornado.util.TimeoutError` after a
timeout.
``timeout`` may be a number denoting a time (on the same
scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
`datetime.timedelta` object for a deadline relative to the
current time.
"""
future = Future()
try:
self.put_nowait(item)
except QueueFull:
self._putters.append((item, future))
_set_timeout(future, timeout)
else:
future.set_result(None)
return future
def put_nowait(self, item):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise `QueueFull`.
"""
self._consume_expired()
if self._getters:
assert self.empty(), "queue non-empty, why are getters waiting?"
getter = self._getters.popleft()
self.__put_internal(item)
future_set_result_unless_cancelled(getter, self._get())
elif self.full():
raise QueueFull
else:
self.__put_internal(item)
def get(self, timeout=None):
"""Remove and return an item from the queue.
Returns a Future which resolves once an item is available, or raises
`tornado.util.TimeoutError` after a timeout.
``timeout`` may be a number denoting a time (on the same
scale as `tornado.ioloop.IOLoop.time`, normally `time.time`), or a
`datetime.timedelta` object for a deadline relative to the
current time.
"""
future = Future()
try:
future.set_result(self.get_nowait())
except QueueEmpty:
self._getters.append(future)
_set_timeout(future, timeout)
return future
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Return an item if one is immediately available, else raise
`QueueEmpty`.
"""
self._consume_expired()
if self._putters:
assert self.full(), "queue not full, why are putters waiting?"
item, putter = self._putters.popleft()
self.__put_internal(item)
future_set_result_unless_cancelled(putter, None)
return self._get()
elif self.qsize():
return self._get()
else:
raise QueueEmpty
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each `.get` used to fetch a task, a
subsequent call to `.task_done` tells the queue that the processing
on the task is complete.
If a `.join` is blocking, it resumes when all items have been
processed; that is, when every `.put` is matched by a `.task_done`.
Raises `ValueError` if called more times than `.put`.
"""
if self._unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
self._unfinished_tasks -= 1
if self._unfinished_tasks == 0:
self._finished.set()
def join(self, timeout=None):
"""Block until all items in the queue are processed.
Returns a Future, which raises `tornado.util.TimeoutError` after a
timeout.
"""
return self._finished.wait(timeout)
def __aiter__(self):
return _QueueIterator(self)
# These three are overridable in subclasses.
def _init(self):
self._queue = collections.deque()
def _get(self):
return self._queue.popleft()
def _put(self, item):
self._queue.append(item)
# End of the overridable methods.
def __put_internal(self, item):
self._unfinished_tasks += 1
self._finished.clear()
self._put(item)
def _consume_expired(self):
# Remove timed-out waiters.
while self._putters and self._putters[0][1].done():
self._putters.popleft()
while self._getters and self._getters[0].done():
self._getters.popleft()
def __repr__(self):
return '<%s at %s %s>' % (
type(self).__name__, hex(id(self)), self._format())
def __str__(self):
return '<%s %s>' % (type(self).__name__, self._format())
def _format(self):
result = 'maxsize=%r' % (self.maxsize, )
if getattr(self, '_queue', None):
result += ' queue=%r' % self._queue
if self._getters:
result += ' getters[%s]' % len(self._getters)
if self._putters:
result += ' putters[%s]' % len(self._putters)
if self._unfinished_tasks:
result += ' tasks=%s' % self._unfinished_tasks
return result
class PriorityQueue(Queue):
"""A `.Queue` that retrieves entries in priority order, lowest first.
Entries are typically tuples like ``(priority number, data)``.
.. testcode::
from tornado.queues import PriorityQueue
q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
.. testoutput::
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')
"""
def _init(self):
self._queue = []
def _put(self, item):
heapq.heappush(self._queue, item)
def _get(self):
return heapq.heappop(self._queue)
class LifoQueue(Queue):
"""A `.Queue` that retrieves the most recently put items first.
.. testcode::
from tornado.queues import LifoQueue
q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
.. testoutput::
1
2
3
"""
def _init(self):
self._queue = []
def _put(self, item):
self._queue.append(item)
def _get(self):
return self._queue.pop()