Browse Source

Refactor RSS to fit the rest of the threads

tags/3.1.0RC2
Safihre 5 years ago
parent
commit
82b3f210f6
  1. 13
      sabnzbd/__init__.py
  2. 18
      sabnzbd/interface.py
  3. 101
      sabnzbd/rss.py
  4. 8
      sabnzbd/scheduler.py
  5. 4
      tests/test_rss.py

13
sabnzbd/__init__.py

@ -126,6 +126,7 @@ NzbQueue: sabnzbd.nzbqueue.NzbQueue
URLGrabber: sabnzbd.urlgrabber.URLGrabber
DirScanner: sabnzbd.dirscanner.DirScanner
BPSMeter: sabnzbd.bpsmeter.BPSMeter
RSSReader: sabnzbd.rss.RSSReader
# Regular constants
START = datetime.datetime.now()
@ -325,8 +326,6 @@ def initialize(pause_downloader=False, clean_up=False, evaluate_schedules=False,
pause_downloader = True
# Initialize threads
rss.init()
sabnzbd.ArticleCache = sabnzbd.articlecache.ArticleCache()
sabnzbd.BPSMeter = sabnzbd.bpsmeter.BPSMeter()
sabnzbd.NzbQueue = sabnzbd.nzbqueue.NzbQueue()
@ -337,6 +336,7 @@ def initialize(pause_downloader=False, clean_up=False, evaluate_schedules=False,
sabnzbd.DirScanner = sabnzbd.dirscanner.DirScanner()
sabnzbd.Rating = sabnzbd.rating.Rating()
sabnzbd.URLGrabber = sabnzbd.urlgrabber.URLGrabber()
sabnzbd.RSSReader = sabnzbd.rss.RSSReader()
sabnzbd.NzbQueue.read_queue(repair)
scheduler.init()
@ -394,7 +394,12 @@ def halt():
sabnzbd.directunpacker.abort_all()
rss.stop()
logging.debug("Stopping RSSReader")
sabnzbd.RSSReader.stop()
try:
sabnzbd.RSSReader.join()
except:
pass
logging.debug("Stopping URLGrabber")
sabnzbd.URLGrabber.stop()
@ -578,10 +583,10 @@ def save_state():
sabnzbd.ArticleCache.flush_articles()
sabnzbd.NzbQueue.save()
sabnzbd.BPSMeter.save()
rss.save()
sabnzbd.Rating.save()
sabnzbd.DirScanner.save()
sabnzbd.PostProcessor.save()
sabnzbd.RSSReader.save()
def pause_all():

18
sabnzbd/interface.py

@ -1759,7 +1759,7 @@ class ConfigRss:
active_feed = kwargs.get("feed", "")
conf["active_feed"] = active_feed
conf["rss"] = rss
conf["rss_next"] = time.strftime(time_format("%H:%M"), time.localtime(sabnzbd.rss.next_run()))
conf["rss_next"] = time.strftime(time_format("%H:%M"), time.localtime(sabnzbd.RSSReader.next_run))
if active_feed:
readout = bool(self.__refresh_readout)
@ -1769,7 +1769,7 @@ class ConfigRss:
self.__refresh_force = False
self.__refresh_ignore = False
if self.__evaluate:
msg = sabnzbd.rss.run_feed(
msg = sabnzbd.RSSReader.run_feed(
active_feed,
download=self.__refresh_download,
force=self.__refresh_force,
@ -1780,7 +1780,7 @@ class ConfigRss:
msg = ""
self.__evaluate = False
if readout:
sabnzbd.rss.save()
sabnzbd.RSSReader.save()
self.__last_msg = msg
else:
msg = self.__last_msg
@ -1883,7 +1883,7 @@ class ConfigRss:
config.ConfigRSS(feed, kwargs)
# Clear out any existing reference to this feed name
# Otherwise first-run detection can fail
sabnzbd.rss.clear_feed(feed)
sabnzbd.RSSReader.clear_feed(feed)
config.save_config()
self.__refresh_readout = feed
self.__refresh_download = False
@ -1939,7 +1939,7 @@ class ConfigRss:
kwargs["section"] = "rss"
kwargs["keyword"] = kwargs.get("feed")
del_from_section(kwargs)
sabnzbd.rss.clear_feed(kwargs.get("feed"))
sabnzbd.RSSReader.clear_feed(kwargs.get("feed"))
raise Raiser(self.__root)
@secured_expose(check_api_key=True, check_configlock=True)
@ -1975,7 +1975,7 @@ class ConfigRss:
@secured_expose(check_api_key=True, check_configlock=True)
def clean_rss_jobs(self, *args, **kwargs):
""" Remove processed RSS jobs from UI """
sabnzbd.rss.clear_downloaded(kwargs["feed"])
sabnzbd.RSSReader.clear_downloaded(kwargs["feed"])
self.__evaluate = True
raise rssRaiser(self.__root, kwargs)
@ -2010,7 +2010,7 @@ class ConfigRss:
feed = kwargs.get("feed")
url = kwargs.get("url")
nzbname = kwargs.get("nzbname")
att = sabnzbd.rss.lookup_url(feed, url)
att = sabnzbd.RSSReader.lookup_url(feed, url)
if att:
pp = att.get("pp")
cat = att.get("cat")
@ -2020,7 +2020,7 @@ class ConfigRss:
if url:
sabnzbd.add_url(url, pp, script, cat, prio, nzbname)
# Need to pass the title instead
sabnzbd.rss.flag_downloaded(feed, url)
sabnzbd.RSSReader.flag_downloaded(feed, url)
raise rssRaiser(self.__root, kwargs)
@secured_expose(check_api_key=True, check_configlock=True)
@ -2655,7 +2655,7 @@ def GetRssLog(feed):
return job
jobs = list(sabnzbd.rss.show_result(feed).values())
jobs = sabnzbd.RSSReader.show_result(feed).values()
good, bad, done = ([], [], [])
for job in jobs:
if job["status"][0] == "G":

101
sabnzbd/rss.py

@ -35,93 +35,12 @@ import sabnzbd.emailer as emailer
import feedparser
__RSS = None # Global pointer to RSS-scanner instance
##############################################################################
# Wrapper functions
##############################################################################
def init():
global __RSS
__RSS = RSSQueue()
def stop():
global __RSS
if __RSS:
__RSS.stop()
try:
__RSS.join()
except:
pass
def run_feed(feed, download, ignoreFirst=False, force=False, readout=True):
global __RSS
if __RSS:
return __RSS.run_feed(feed, download, ignoreFirst, force=force, readout=readout)
def show_result(feed):
global __RSS
if __RSS:
return __RSS.show_result(feed)
def flag_downloaded(feed, fid):
global __RSS
if __RSS:
__RSS.flag_downloaded(feed, fid)
def lookup_url(feed, fid):
global __RSS
if __RSS:
return __RSS.lookup_url(feed, fid)
def run_method():
global __RSS
if __RSS:
return __RSS.run()
else:
return None
def next_run(t=None):
global __RSS
if __RSS:
if t:
__RSS.next_run = t
else:
return __RSS.next_run
else:
return time.time()
def save():
global __RSS
if __RSS:
__RSS.save()
def clear_feed(feed):
global __RSS
if __RSS:
__RSS.clear_feed(feed)
def clear_downloaded(feed):
global __RSS
if __RSS:
__RSS.clear_downloaded(feed)
##############################################################################
def notdefault(item):
""" Return True if not 'Default|''|*' """
return bool(item) and str(item).lower() not in ("default", "*", "", str(DEFAULT_PRIORITY))
@ -161,13 +80,13 @@ def remove_obsolete(jobs, new_jobs):
del jobs[old]
LOCK = threading.RLock()
RSS_LOCK = threading.RLock()
_RE_SP = re.compile(r"s*(\d+)[ex](\d+)", re.I)
_RE_SIZE1 = re.compile(r"Size:\s*(\d+\.\d+\s*[KMG]{0,1})B\W*", re.I)
_RE_SIZE2 = re.compile(r"\W*(\d+\.\d+\s*[KMG]{0,1})B\W*", re.I)
class RSSQueue:
class RSSReader:
def __init__(self):
self.jobs = {}
self.next_run = time.time()
@ -211,7 +130,7 @@ class RSSQueue:
def stop(self):
self.shutdown = True
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def run_feed(self, feed=None, download=False, ignoreFirst=False, force=False, readout=True):
""" Run the query for one URI and apply filters """
self.shutdown = False
@ -572,7 +491,7 @@ class RSSQueue:
self.save()
logging.info("Finished scheduled RSS read-outs")
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def show_result(self, feed):
if feed in self.jobs:
try:
@ -582,16 +501,16 @@ class RSSQueue:
else:
return {}
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def save(self):
sabnzbd.save_admin(self.jobs, RSS_FILE_NAME)
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def delete(self, feed):
if feed in self.jobs:
del self.jobs[feed]
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def flag_downloaded(self, feed, fid):
if feed in self.jobs:
lst = self.jobs[feed]
@ -600,7 +519,7 @@ class RSSQueue:
lst[link]["status"] = "D"
lst[link]["time_downloaded"] = time.localtime()
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def lookup_url(self, feed, url):
if url and feed in self.jobs:
lst = self.jobs[feed]
@ -609,13 +528,13 @@ class RSSQueue:
return lst[link]
return None
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def clear_feed(self, feed):
# Remove any previous references to this feed name, and start fresh
if feed in self.jobs:
del self.jobs[feed]
@synchronized(LOCK)
@synchronized(RSS_LOCK)
def clear_downloaded(self, feed):
# Mark downloaded jobs, so that they won't be displayed any more.
if feed in self.jobs:

8
sabnzbd/scheduler.py

@ -176,11 +176,11 @@ def init():
interval = cfg.rss_rate()
delay = random.randint(0, interval - 1)
logging.debug("Scheduling RSS interval task every %s min (delay=%s)", interval, delay)
sabnzbd.rss.next_run(time.time() + delay * 60)
sabnzbd.RSSReader.next_run = time.time() + delay * 60
__SCHED.add_interval_task(
sabnzbd.rss.run_method, "RSS", delay * 60, interval * 60, kronos.method.sequential, None, None
sabnzbd.RSSReader.run, "RSS", delay * 60, interval * 60, kronos.method.sequential, None, None
)
__SCHED.add_single_task(sabnzbd.rss.run_method, "RSS", 15, kronos.method.sequential, None, None)
__SCHED.add_single_task(sabnzbd.RSSReader.run, "RSS", 15, kronos.method.sequential, None, None)
if cfg.version_check():
# Check for new release, once per week on random time
@ -496,7 +496,7 @@ def plan_server(action, parms, interval):
def force_rss():
""" Add a one-time RSS scan, one second from now """
__SCHED.add_single_task(sabnzbd.rss.run_method, "RSS", 1, kronos.method.sequential, None, None)
__SCHED.add_single_task(sabnzbd.RSSReader.run, "RSS", 1, kronos.method.sequential, None, None)
# Scheduler Guarding system

4
tests/test_rss.py

@ -52,7 +52,7 @@ class TestRSS:
self.setup_rss(feed_name, "https://sabnzbd.org/tests/rss_newznab_test.xml")
# Start the RSS reader
rss_obj = rss.RSSQueue()
rss_obj = rss.RSSReader()
rss_obj.run_feed(feed_name)
# Is the feed processed?
@ -79,7 +79,7 @@ class TestRSS:
self.setup_rss(feed_name, "https://sabnzbd.org/tests/rss_nzedb_test.xml")
# Start the RSS reader
rss_obj = rss.RSSQueue()
rss_obj = rss.RSSReader()
rss_obj.run_feed(feed_name)
# Is the feed processed?

Loading…
Cancel
Save