Browse Source

Scheduler refactor and add additional typing

pull/1628/head
Safihre 5 years ago
parent
commit
30654af261
  1. 5
      SABnzbd.py
  2. 2
      interfaces/Config/templates/config_scheduling.tmpl
  3. 51
      sabnzbd/__init__.py
  4. 13
      sabnzbd/api.py
  5. 27
      sabnzbd/bpsmeter.py
  6. 1
      sabnzbd/dirscanner.py
  7. 3
      sabnzbd/downloader.py
  8. 26
      sabnzbd/interface.py
  9. 4
      sabnzbd/nzbqueue.py
  10. 11
      sabnzbd/osxmenu.py
  11. 10
      sabnzbd/postproc.py
  12. 9
      sabnzbd/sabtray.py
  13. 7
      sabnzbd/sabtraylinux.py
  14. 341
      sabnzbd/scheduler.py
  15. 3
      sabnzbd/urlgrabber.py
  16. 66
      sabnzbd/utils/kronos.py

5
SABnzbd.py

@ -69,7 +69,6 @@ from sabnzbd.misc import (
)
from sabnzbd.filesystem import get_ext, real_path, long_path, globber_full, remove_file
from sabnzbd.panic import panic_tmpl, panic_port, panic_host, panic, launch_a_browser
import sabnzbd.scheduler as scheduler
import sabnzbd.config as config
import sabnzbd.cfg
import sabnzbd.downloader
@ -1232,7 +1231,7 @@ def main():
if autobrowser is not None:
sabnzbd.cfg.autobrowser.set(autobrowser)
sabnzbd.initialize(pause, clean_up, evaluate_schedules=True, repair=repair)
sabnzbd.initialize(pause, clean_up, repair=repair)
os.chdir(sabnzbd.DIR_PROG)
@ -1512,7 +1511,7 @@ def main():
# Keep OS awake (if needed)
sabnzbd.keep_awake()
# Restart scheduler (if needed)
scheduler.restart()
sabnzbd.Scheduler.restart(plan_restart=False)
# Save config (if needed)
config.save_config()
# Check the threads

2
interfaces/Config/templates/config_scheduling.tmpl

@ -50,7 +50,7 @@ else:
<select name="action" id="action">
<optgroup label="$T('sch-action')">
<!--#for $action in $actions#-->
<option value="$action" data-action="" data-noarg="<!--#if $action is 'speedlimit' then 0 else 1#-->">$actions_lng[$action]</option>
<option value="$action" data-action="" data-noarg="<!--#if $action == 'speedlimit' then 0 else 1#-->">$actions_lng[$action]</option>
<!--#end for#-->
</optgroup>
<optgroup label="$T('cmenu-servers')">

51
sabnzbd/__init__.py

@ -127,6 +127,7 @@ URLGrabber: sabnzbd.urlgrabber.URLGrabber
DirScanner: sabnzbd.dirscanner.DirScanner
BPSMeter: sabnzbd.bpsmeter.BPSMeter
RSSReader: sabnzbd.rss.RSSReader
Scheduler: sabnzbd.scheduler.Scheduler
# Regular constants
START = datetime.datetime.now()
@ -228,7 +229,7 @@ def get_db_connection(thread_index=0):
@synchronized(INIT_LOCK)
def initialize(pause_downloader=False, clean_up=False, evaluate_schedules=False, repair=0):
def initialize(pause_downloader=False, clean_up=False, repair=0):
if sabnzbd.__INITIALIZED__:
return False
@ -290,25 +291,12 @@ def initialize(pause_downloader=False, clean_up=False, evaluate_schedules=False,
lang.set_language(cfg.language())
sabnzbd.api.clear_trans_cache()
# Set end-of-queue action
sabnzbd.change_queue_complete_action(cfg.queue_complete(), new=False)
# One time conversion "speedlimit" in schedules.
if not cfg.sched_converted():
schedules = cfg.schedules()
newsched = []
for sched in schedules:
if "speedlimit" in sched:
newsched.append(re.sub(r"(speedlimit \d+)$", r"\1K", sched))
else:
newsched.append(sched)
cfg.schedules.set(newsched)
cfg.sched_converted.set(1)
# Second time schedule conversion
if cfg.sched_converted() != 2:
cfg.schedules.set(["%s %s" % (1, schedule) for schedule in cfg.schedules()])
cfg.sched_converted.set(2)
config.save_config()
# Set cache limit
if not cfg.cache_limit():
cfg.cache_limit.set(misc.get_cache_limit())
# Convert auto-sort
if cfg.auto_sort() == "0":
@ -337,21 +325,16 @@ def initialize(pause_downloader=False, clean_up=False, evaluate_schedules=False,
sabnzbd.Rating = sabnzbd.rating.Rating()
sabnzbd.URLGrabber = sabnzbd.urlgrabber.URLGrabber()
sabnzbd.RSSReader = sabnzbd.rss.RSSReader()
sabnzbd.NzbQueue.read_queue(repair)
sabnzbd.Scheduler = sabnzbd.scheduler.Scheduler()
scheduler.init()
if evaluate_schedules:
scheduler.analyse(pause_downloader)
# Set cache limit
if not cfg.cache_limit() or (cfg.cache_limit() in ("200M", "450M") and (sabnzbd.WIN32 or sabnzbd.DARWIN)):
cfg.cache_limit.set(misc.get_cache_limit())
# Run startup tasks
sabnzbd.NzbQueue.read_queue(repair)
sabnzbd.Scheduler.analyse(pause_downloader)
sabnzbd.ArticleCache.new_limit(cfg.cache_limit.get_int())
logging.info("All processes started")
sabnzbd.RESTART_REQ = False
sabnzbd.__INITIALIZED__ = True
return True
@synchronized(INIT_LOCK)
@ -369,7 +352,8 @@ def start():
logging.debug("Starting decoders")
sabnzbd.Decoder.start()
scheduler.start()
logging.debug("Starting scheduler")
sabnzbd.Scheduler.start()
logging.debug("Starting dirscanner")
sabnzbd.DirScanner.start()
@ -452,7 +436,8 @@ def halt():
# Since all warm-restarts have been removed, it's not longer
# needed to stop the scheduler.
# We must tell the scheduler to deactivate.
scheduler.abort()
logging.debug("Terminating scheduler")
sabnzbd.Scheduler.abort()
logging.info("All processes stopped")
@ -520,7 +505,7 @@ def guard_quota_size():
def guard_quota_dp():
""" Callback for change of quota_day or quota_period """
scheduler.restart(force=True)
sabnzbd.Scheduler.restart()
def guard_language():
@ -1045,13 +1030,13 @@ def check_all_tasks():
if not sabnzbd.Rating.is_alive():
logging.info("Restarting crashed rating")
sabnzbd.Rating.__init__()
if not sabnzbd.scheduler.sched_check():
if not sabnzbd.Scheduler.is_alive():
logging.info("Restarting crashed scheduler")
sabnzbd.scheduler.init()
sabnzbd.Scheduler.restart()
sabnzbd.Downloader.unblock_all()
# Check one-shot pause
sabnzbd.scheduler.pause_check()
sabnzbd.Scheduler.pause_check()
# Check (and terminate) idle jobs
sabnzbd.NzbQueue.stop_idle_jobs()

13
sabnzbd/api.py

@ -50,7 +50,6 @@ from sabnzbd.constants import (
)
import sabnzbd.config as config
import sabnzbd.cfg as cfg
import sabnzbd.scheduler as scheduler
from sabnzbd.skintext import SKIN_TEXT
from sabnzbd.utils.pathbrowser import folders_at_path
from sabnzbd.utils.getperformance import getcpu
@ -574,14 +573,14 @@ def _api_addurl(name, output, kwargs):
def _api_pause(name, output, kwargs):
""" API: accepts output """
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.Downloader.pause()
return report(output)
def _api_resume(name, output, kwargs):
""" API: accepts output """
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.unpause_all()
return report(output)
@ -705,7 +704,7 @@ def _api_pause_pp(name, output, kwargs):
def _api_rss_now(name, output, kwargs):
""" API: accepts output """
# Run RSS scan async, because it can take a long time
scheduler.force_rss()
sabnzbd.Scheduler.force_rss()
return report(output)
@ -843,7 +842,7 @@ def _api_config_set_colorscheme(output, kwargs):
def _api_config_set_pause(output, kwargs):
""" API: accepts output, value(=pause interval) """
value = kwargs.get("value")
scheduler.plan_resume(int_conv(value))
sabnzbd.Scheduler.plan_resume(int_conv(value))
return report(output)
@ -1597,14 +1596,14 @@ def build_header(webdir="", output=None, trans_functions=True):
header["darwin"] = sabnzbd.DARWIN
header["power_options"] = sabnzbd.WIN32 or sabnzbd.DARWIN or sabnzbd.LINUX_POWER
header["pp_pause_event"] = sabnzbd.scheduler.pp_pause_event()
header["pp_pause_event"] = sabnzbd.Scheduler.pp_pause_event
header["apikey"] = cfg.api_key()
header["new_release"], header["new_rel_url"] = sabnzbd.NEW_VERSION
header["version"] = sabnzbd.__version__
header["paused"] = bool(sabnzbd.Downloader.paused or sabnzbd.Downloader.postproc)
header["pause_int"] = scheduler.pause_int()
header["pause_int"] = sabnzbd.Scheduler.pause_int()
header["paused_all"] = sabnzbd.PAUSED_ALL
header["diskspace1"] = "%.2f" % diskspace_info["download_dir"][1]

27
sabnzbd/bpsmeter.py

@ -22,6 +22,7 @@ sabnzbd.bpsmeter - bpsmeter
import time
import logging
import re
from typing import List, Dict
import sabnzbd
from sabnzbd.constants import BYTES_FILE_NAME, KIBI
@ -94,20 +95,20 @@ class BPSMeter:
self.speed_log_time = t
self.last_update = t
self.bps = 0.0
self.bps_list = []
self.bps_list: List[int] = []
self.bps_list_max = 275
self.day_total = {}
self.week_total = {}
self.month_total = {}
self.grand_total = {}
self.day_total: Dict[str, int] = {}
self.week_total: Dict[str, int] = {}
self.month_total: Dict[str, int] = {}
self.grand_total: Dict[str, int] = {}
self.timeline_total = {}
self.timeline_total: Dict[str, Dict[str, int]] = {}
self.day_label = time.strftime("%Y-%m-%d")
self.end_of_day = tomorrow(t) # Time that current day will end
self.end_of_week = next_week(t) # Time that current day will end
self.end_of_month = next_month(t) # Time that current month will end
self.day_label: str = time.strftime("%Y-%m-%d")
self.end_of_day: float = tomorrow(t) # Time that current day will end
self.end_of_week: float = next_week(t) # Time that current day will end
self.end_of_month: float = next_month(t) # Time that current month will end
self.q_day = 1 # Day of quota reset
self.q_period = "m" # Daily/Weekly/Monthly quota = d/w/m
self.quota = self.left = 0.0 # Quota and remaining quota
@ -119,7 +120,8 @@ class BPSMeter:
def save(self):
""" Save admin to disk """
data = (
sabnzbd.save_admin(
(
self.last_update,
self.grand_total,
self.day_total,
@ -132,8 +134,9 @@ class BPSMeter:
self.left,
self.q_time,
self.timeline_total,
),
BYTES_FILE_NAME,
)
sabnzbd.save_admin(data, BYTES_FILE_NAME)
def defaults(self):
""" Get the latest data from the database and assign to a fake server """

1
sabnzbd/dirscanner.py

@ -102,7 +102,6 @@ class DirScanner(threading.Thread):
def stop(self):
""" Stop the dir scanner """
logging.info("Dirscanner shutting down")
self.shutdown = True
def save(self):

3
sabnzbd/downloader.py

@ -35,7 +35,6 @@ from sabnzbd.newswrapper import NewsWrapper, request_server_info
import sabnzbd.notifier
import sabnzbd.config as config
import sabnzbd.cfg as cfg
import sabnzbd.scheduler
from sabnzbd.misc import from_units, nntp_to_msg, int_conv
from sabnzbd.utils.happyeyeballs import happyeyeballs
@ -869,7 +868,7 @@ class Downloader(Thread):
stamp = time.time() + 60.0 * interval
self._timers[server.id].append(stamp)
if interval:
sabnzbd.scheduler.plan_server(self.trigger_server, [server.id, stamp], interval)
sabnzbd.Scheduler.plan_server(self.trigger_server, [server.id, stamp], interval)
@synchronized(TIMER_LOCK)
def trigger_server(self, server_id, timestamp):

26
sabnzbd/interface.py

@ -37,7 +37,6 @@ from Cheetah.Template import Template
import sabnzbd
import sabnzbd.rss
import sabnzbd.scheduler as scheduler
from sabnzbd.misc import (
to_units,
from_units,
@ -78,11 +77,6 @@ from sabnzbd.api import (
)
##############################################################################
# Global constants
##############################################################################
##############################################################################
# Security functions
##############################################################################
def secured_expose(wrap_func=None, check_configlock=False, check_api_key=False):
@ -423,13 +417,13 @@ class MainPage:
@secured_expose(check_api_key=True)
def pause(self, **kwargs):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.Downloader.pause()
raise Raiser(self.__root)
@secured_expose(check_api_key=True)
def resume(self, **kwargs):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.unpause_all()
raise Raiser(self.__root)
@ -963,13 +957,13 @@ class QueuePage:
@secured_expose(check_api_key=True)
def pause(self, **kwargs):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.Downloader.pause()
raise queueRaiser(self.__root, kwargs)
@secured_expose(check_api_key=True)
def resume(self, **kwargs):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.unpause_all()
raise queueRaiser(self.__root, kwargs)
@ -1811,7 +1805,7 @@ class ConfigRss:
""" Save changed RSS automatic readout rate """
cfg.rss_rate.set(kwargs.get("rss_rate"))
config.save_config()
scheduler.restart()
sabnzbd.Scheduler.restart()
raise rssRaiser(self.__root, kwargs)
@secured_expose(check_api_key=True, check_configlock=True)
@ -2026,7 +2020,7 @@ class ConfigRss:
@secured_expose(check_api_key=True, check_configlock=True)
def rss_now(self, *args, **kwargs):
""" Run an automatic RSS run now """
scheduler.force_rss()
sabnzbd.Scheduler.force_rss()
raise rssRaiser(self.__root, kwargs)
@ -2105,7 +2099,7 @@ class ConfigScheduling:
snum = 1
conf["schedlines"] = []
conf["taskinfo"] = []
for ev in scheduler.sort_schedules(all_events=False):
for ev in sabnzbd.scheduler.sort_schedules(all_events=False):
line = ev[3]
conf["schedlines"].append(line)
try:
@ -2221,7 +2215,7 @@ class ConfigScheduling:
cfg.schedules.set(sched)
config.save_config()
scheduler.restart(force=True)
sabnzbd.Scheduler.restart()
raise Raiser(self.__root)
@secured_expose(check_api_key=True, check_configlock=True)
@ -2232,7 +2226,7 @@ class ConfigScheduling:
schedules.remove(line)
cfg.schedules.set(schedules)
config.save_config()
scheduler.restart(force=True)
sabnzbd.Scheduler.restart()
raise Raiser(self.__root)
@secured_expose(check_api_key=True, check_configlock=True)
@ -2249,7 +2243,7 @@ class ConfigScheduling:
break
cfg.schedules.set(schedules)
config.save_config()
scheduler.restart(force=True)
sabnzbd.Scheduler.restart()
raise Raiser(self.__root)

4
sabnzbd/nzbqueue.py

@ -322,7 +322,7 @@ class NzbQueue:
if nzo.nzo_id:
nzo.deleted = False
priority = nzo.priority
if sabnzbd.scheduler.analyse(False, priority):
if sabnzbd.Scheduler.analyse(False, priority):
nzo.status = Status.PAUSED
self.__nzo_table[nzo.nzo_id] = nzo
@ -615,7 +615,7 @@ class NzbQueue:
return nzo_id_pos1
nzo.set_priority(priority)
if sabnzbd.scheduler.analyse(False, priority) and nzo.status in (
if sabnzbd.Scheduler.analyse(False, priority) and nzo.status in (
Status.CHECKING,
Status.DOWNLOADING,
Status.QUEUED,

11
sabnzbd/osxmenu.py

@ -42,7 +42,6 @@ import sabnzbd.notifier as notifier
from sabnzbd.api import fast_queue
import sabnzbd.config as config
import sabnzbd.scheduler as scheduler
import sabnzbd.downloader
status_icons = {
@ -507,8 +506,8 @@ class SABnzbdDelegate(NSObject):
if paused:
self.state = T("Paused")
if sabnzbd.scheduler.pause_int() != "0":
self.setMenuTitle_("\n\n%s\n" % (sabnzbd.scheduler.pause_int()))
if sabnzbd.Scheduler.pause_int() != "0":
self.setMenuTitle_("\n\n%s\n" % (sabnzbd.Scheduler.pause_int()))
else:
self.setMenuTitle_("")
elif bytes_left > 0:
@ -750,18 +749,18 @@ class SABnzbdDelegate(NSObject):
minutes = int(sender.representedObject())
# logging.info("[osx] pause for %s" % (minutes))
if minutes:
scheduler.plan_resume(minutes)
sabnzbd.Scheduler.plan_resume(minutes)
else:
sabnzbd.Downloader.pause()
def resumeAction_(self, sender):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
def watchedFolderAction_(self, sender):
sabnzbd.DirScanner.scan()
def rssAction_(self, sender):
scheduler.force_rss()
sabnzbd.Scheduler.force_rss()
def openFolderAction_(self, sender):
folder2open = sender.representedObject()

10
sabnzbd/postproc.py

@ -25,7 +25,7 @@ import functools
import time
import re
import queue
from typing import List
from typing import List, Optional
import sabnzbd
from sabnzbd.newsunpack import (
@ -108,10 +108,10 @@ class PostProcessor(Thread):
self.load()
# Fast-queue for jobs already finished by DirectUnpack
self.fast_queue: queue.Queue[NzbObject] = queue.Queue()
self.fast_queue: queue.Queue[Optional[NzbObject]] = queue.Queue()
# Regular queue for jobs that might need more attention
self.slow_queue: queue.Queue[NzbObject] = queue.Queue()
self.slow_queue: queue.Queue[Optional[NzbObject]] = queue.Queue()
# Load all old jobs
for nzo in self.history_queue:
@ -174,7 +174,7 @@ class PostProcessor(Thread):
self.save()
sabnzbd.history_updated()
def remove(self, nzo):
def remove(self, nzo: NzbObject):
""" Remove given nzo from the queue """
try:
self.history_queue.remove(nzo)
@ -293,7 +293,7 @@ class PostProcessor(Thread):
sabnzbd.Downloader.resume_from_postproc()
def process_job(nzo):
def process_job(nzo: NzbObject):
""" Process one job """
start = time.time()

9
sabnzbd/sabtray.py

@ -26,7 +26,6 @@ from time import sleep
import sabnzbd
from sabnzbd.panic import launch_a_browser
import sabnzbd.api as api
import sabnzbd.scheduler as scheduler
import sabnzbd.cfg as cfg
from sabnzbd.misc import to_units
@ -144,7 +143,7 @@ class SABTrayThread(SysTrayIconThread):
def pausefor(self, minutes):
""" Need function for each pause-timer """
scheduler.plan_resume(minutes)
sabnzbd.Scheduler.plan_resume(minutes)
def pausefor5min(self, icon):
self.pausefor(5)
@ -171,7 +170,7 @@ class SABTrayThread(SysTrayIconThread):
def rss(self, icon):
self.hover_text = T("Read all RSS feeds")
scheduler.force_rss()
sabnzbd.Scheduler.force_rss()
def nologin(self, icon):
sabnzbd.cfg.username.set("")
@ -192,9 +191,9 @@ class SABTrayThread(SysTrayIconThread):
sabnzbd.shutdown_program()
def pause(self):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.Downloader.pause()
def resume(self):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.unpause_all()

7
sabnzbd/sabtraylinux.py

@ -42,7 +42,6 @@ from os.path import abspath
import sabnzbd
from sabnzbd.panic import launch_a_browser
import sabnzbd.api as api
import sabnzbd.scheduler as scheduler
import sabnzbd.cfg as cfg
from sabnzbd.misc import to_units
@ -194,12 +193,12 @@ class StatusIcon(Thread):
sabnzbd.shutdown_program()
def pause(self):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.Downloader.pause()
def resume(self):
scheduler.plan_resume(0)
sabnzbd.Scheduler.plan_resume(0)
sabnzbd.unpause_all()
def rss(self, icon):
scheduler.force_rss()
sabnzbd.Scheduler.force_rss()

341
sabnzbd/scheduler.py

@ -34,36 +34,47 @@ import sabnzbd.cfg as cfg
from sabnzbd.constants import LOW_PRIORITY, NORMAL_PRIORITY, HIGH_PRIORITY
__SCHED: Optional[kronos.ThreadedScheduler] = None # Global pointer to Scheduler instance
SCHEDULE_GUARD_FLAG = False
PP_PAUSE_EVENT = False
def schedule_guard():
""" Set flag for scheduler restart """
global SCHEDULE_GUARD_FLAG
SCHEDULE_GUARD_FLAG = True
def pp_pause():
sabnzbd.PostProcessor.paused = True
def pp_resume():
sabnzbd.PostProcessor.paused = False
def pp_pause_event():
return PP_PAUSE_EVENT
class Scheduler:
def __init__(self):
self.scheduler = kronos.ThreadedScheduler()
self.pause_end: Optional[float] = None # Moment when pause will end
self.restart_scheduler = False
self.pp_pause_event = False
self.load_schedules()
def start(self):
""" Start the scheduler """
self.scheduler.start()
def stop(self):
""" Stop the scheduler, destroy instance """
logging.debug("Stopping scheduler")
self.scheduler.stop()
def init():
""" Create the scheduler and set all required events """
global __SCHED
def restart(self, plan_restart=True):
""" Stop and start scheduler """
if plan_restart:
self.restart_scheduler = True
elif self.restart_scheduler:
logging.debug("Restarting scheduler")
self.restart_scheduler = False
self.scheduler.stop()
self.scheduler.start()
self.analyse(sabnzbd.Downloader.paused)
self.load_schedules()
def abort(self):
"""Emergency stop, just set the running attribute false so we don't
have to wait the full scheduler-check cycle before it really stops"""
self.scheduler.running = False
def is_alive(self):
""" Thread-like check if we are doing fine """
if self.scheduler.thread:
return self.scheduler.thread.is_alive()
return False
reset_guardian()
__SCHED = kronos.ThreadedScheduler()
def load_schedules(self):
rss_planned = False
for schedule in cfg.schedules():
@ -95,7 +106,7 @@ def init():
d = list(range(1, 8))
if action_name == "resume":
action = scheduled_resume
action = self.scheduled_resume
arguments = []
elif action_name == "pause":
action = sabnzbd.Downloader.pause
@ -163,24 +174,19 @@ def init():
continue
if enabled == "1":
logging.debug("Scheduling %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)
__SCHED.add_daytime_task(action, action_name, d, None, (h, m), kronos.method.sequential, arguments, None)
logging.info("Scheduling %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)
self.scheduler.add_daytime_task(action, action_name, d, None, (h, m), args=arguments)
else:
logging.debug("Skipping %s(%s) on days %s at %02d:%02d", action_name, arguments, d, h, m)
# Set Guardian interval to 30 seconds
__SCHED.add_interval_task(sched_guardian, "Guardian", 15, 30, kronos.method.sequential, None, None)
# Set RSS check interval
if not rss_planned:
interval = cfg.rss_rate()
delay = random.randint(0, interval - 1)
logging.debug("Scheduling RSS interval task every %s min (delay=%s)", interval, delay)
logging.info("Scheduling RSS interval task every %s min (delay=%s)", interval, delay)
sabnzbd.RSSReader.next_run = time.time() + delay * 60
__SCHED.add_interval_task(
sabnzbd.RSSReader.run, "RSS", delay * 60, interval * 60, kronos.method.sequential, None, None
)
__SCHED.add_single_task(sabnzbd.RSSReader.run, "RSS", 15, kronos.method.sequential, None, None)
self.scheduler.add_interval_task(sabnzbd.RSSReader.run, "RSS", delay * 60, interval * 60)
self.scheduler.add_single_task(sabnzbd.RSSReader.run, "RSS", 15)
if cfg.version_check():
# Check for new release, once per week on random time
@ -188,144 +194,31 @@ def init():
h = random.randint(0, 23)
d = (random.randint(1, 7),)
logging.debug("Scheduling VersionCheck on day %s at %s:%s", d[0], h, m)
__SCHED.add_daytime_task(
sabnzbd.misc.check_latest_version, "VerCheck", d, None, (h, m), kronos.method.sequential, [], None
)
logging.info("Scheduling VersionCheck on day %s at %s:%s", d[0], h, m)
self.scheduler.add_daytime_task(sabnzbd.misc.check_latest_version, "VerCheck", d, None, (h, m))
action, hour, minute = sabnzbd.BPSMeter.get_quota()
if action:
logging.info("Setting schedule for quota check daily at %s:%s", hour, minute)
__SCHED.add_daytime_task(
action, "quota_reset", list(range(1, 8)), None, (hour, minute), kronos.method.sequential, [], None
)
self.scheduler.add_daytime_task(action, "quota_reset", list(range(1, 8)), None, (hour, minute))
if sabnzbd.misc.int_conv(cfg.history_retention()) > 0:
logging.info("Setting schedule for midnight auto history-purge")
__SCHED.add_daytime_task(
sabnzbd.database.midnight_history_purge,
"midnight_history_purge",
list(range(1, 8)),
None,
(0, 0),
kronos.method.sequential,
[],
None,
self.scheduler.add_daytime_task(
sabnzbd.database.midnight_history_purge, "midnight_history_purge", list(range(1, 8)), None, (0, 0)
)
logging.info("Setting schedule for midnight BPS reset")
__SCHED.add_daytime_task(
sabnzbd.BPSMeter.midnight,
"midnight_bps",
list(range(1, 8)),
None,
(0, 0),
kronos.method.sequential,
[],
None,
)
self.scheduler.add_daytime_task(sabnzbd.BPSMeter.midnight, "midnight_bps", list(range(1, 8)), None, (0, 0))
# Subscribe to special schedule changes
cfg.rss_rate.callback(schedule_guard)
def start():
""" Start the scheduler """
global __SCHED
if __SCHED:
logging.debug("Starting scheduler")
__SCHED.start()
cfg.rss_rate.callback(self.scheduler_restart_guard)
def restart(force=False):
""" Stop and start scheduler """
global SCHEDULE_GUARD_FLAG
if force:
SCHEDULE_GUARD_FLAG = True
else:
if SCHEDULE_GUARD_FLAG:
SCHEDULE_GUARD_FLAG = False
stop()
analyse(sabnzbd.Downloader.paused)
init()
start()
def stop():
""" Stop the scheduler, destroy instance """
global __SCHED
if __SCHED:
logging.debug("Stopping scheduler")
try:
__SCHED.stop()
except IndexError:
pass
del __SCHED
__SCHED = None
def abort():
""" Emergency stop, just set the running attribute false """
global __SCHED
if __SCHED:
logging.debug("Terminating scheduler")
__SCHED.running = False
def sort_schedules(all_events, now=None):
"""Sort the schedules, based on order of happening from now
`all_events=True`: Return an event for each active day
`all_events=False`: Return only first occurring event of the week
`now` : for testing: simulated localtime()
"""
day_min = 24 * 60
week_min = 7 * day_min
events = []
now = now or time.localtime()
now_hm = now[3] * 60 + now[4]
now = now[6] * day_min + now_hm
for schedule in cfg.schedules():
parms = None
try:
# Note: the last parameter can have spaces (category name)!
enabled, m, h, dd, action, parms = schedule.split(None, 5)
except:
try:
enabled, m, h, dd, action = schedule.split(None, 4)
except:
continue # Bad schedule, ignore
action = action.strip()
if dd == "*":
dd = "1234567"
if not dd.isdigit():
continue # Bad schedule, ignore
for d in dd:
then = (int(d) - 1) * day_min + int(h) * 60 + int(m)
dif = then - now
if all_events and dif < 0:
# Expired event will occur again after a week
dif = dif + week_min
events.append((dif, action, parms, schedule, enabled))
if not all_events:
break
events.sort(key=lambda x: x[0])
return events
def analyse(was_paused=False, priority=None):
def analyse(self, was_paused=False, priority=None):
"""Determine what pause/resume state we would have now.
'priority': evaluate only effect for given priority, return True for paused
"""
global PP_PAUSE_EVENT
PP_PAUSE_EVENT = False
self.pp_pause_event = False
paused = None
paused_all = False
pause_post = False
@ -351,16 +244,16 @@ def analyse(was_paused=False, priority=None):
paused = True
elif action == "pause_all":
paused_all = True
PP_PAUSE_EVENT = True
self.pp_pause_event = True
elif action == "resume":
paused = False
paused_all = False
elif action == "pause_post":
pause_post = True
PP_PAUSE_EVENT = True
self.pp_pause_event = True
elif action == "resume_post":
pause_post = False
PP_PAUSE_EVENT = True
self.pp_pause_event = True
elif action == "speedlimit" and value is not None:
speedlimit = ev[2]
elif action == "pause_all_low":
@ -425,51 +318,43 @@ def analyse(was_paused=False, priority=None):
pass
config.save_config()
def scheduler_restart_guard(self):
""" Set flag for scheduler restart """
self.restart_scheduler = True
# Support for single shot pause (=delayed resume)
__PAUSE_END = None # Moment when pause will end
def scheduled_resume():
def scheduled_resume(self):
""" Scheduled resume, only when no oneshot resume is active """
global __PAUSE_END
if __PAUSE_END is None:
if self.pause_end is None:
sabnzbd.unpause_all()
def __oneshot_resume(when):
def __oneshot_resume(self, when):
"""Called by delayed resume schedule
Only resumes if call comes at the planned time
"""
global __PAUSE_END
if __PAUSE_END is not None and (when > __PAUSE_END - 5) and (when < __PAUSE_END + 55):
__PAUSE_END = None
if self.pause_end is not None and (when > self.pause_end - 5) and (when < self.pause_end + 55):
self.pause_end = None
logging.debug("Resume after pause-interval")
sabnzbd.unpause_all()
else:
logging.debug("Ignoring cancelled resume")
def plan_resume(interval):
def plan_resume(self, interval):
""" Set a scheduled resume after the interval """
global __SCHED, __PAUSE_END
if interval > 0:
__PAUSE_END = time.time() + (interval * 60)
logging.debug("Schedule resume at %s", __PAUSE_END)
__SCHED.add_single_task(__oneshot_resume, "", interval * 60, kronos.method.sequential, [__PAUSE_END], None)
self.pause_end = time.time() + (interval * 60)
logging.debug("Schedule resume at %s", self.pause_end)
self.scheduler.add_single_task(self.__oneshot_resume, "", interval * 60, args=[self.pause_end])
sabnzbd.Downloader.pause()
else:
__PAUSE_END = None
self.pause_end = None
sabnzbd.unpause_all()
def pause_int():
def pause_int(self) -> str:
""" Return minutes:seconds until pause ends """
global __PAUSE_END
if __PAUSE_END is None:
if self.pause_end is None:
return "0"
else:
val = __PAUSE_END - time.time()
val = self.pause_end - time.time()
if val < 0:
sign = "-"
val = abs(val)
@ -479,50 +364,70 @@ def pause_int():
sec = int(val - mins * 60)
return "%s%d:%02d" % (sign, mins, sec)
def pause_check():
def pause_check(self):
""" Unpause when time left is negative, compensate for missed schedule """
global __PAUSE_END
if __PAUSE_END is not None and (__PAUSE_END - time.time()) < 0:
__PAUSE_END = None
if self.pause_end is not None and (self.pause_end - time.time()) < 0:
self.pause_end = None
logging.debug("Force resume, negative timer")
sabnzbd.unpause_all()
def plan_server(action, parms, interval):
def plan_server(self, action, parms, interval):
""" Plan to re-activate server after 'interval' minutes """
__SCHED.add_single_task(action, "", interval * 60, kronos.method.sequential, parms, None)
self.scheduler.add_single_task(action, "", interval * 60, args=parms)
def force_rss():
def force_rss(self):
""" Add a one-time RSS scan, one second from now """
__SCHED.add_single_task(sabnzbd.RSSReader.run, "RSS", 1, kronos.method.sequential, None, None)
self.scheduler.add_single_task(sabnzbd.RSSReader.run, "RSS", 1)
# Scheduler Guarding system
# Each check sets the guardian flag False
# Each successful scheduled check sets the flag
# If 4 consecutive checks fail, the scheduler is assumed to have crashed
def pp_pause():
sabnzbd.PostProcessor.paused = True
def pp_resume():
sabnzbd.PostProcessor.paused = False
__SCHED_GUARDIAN = False
__SCHED_GUARDIAN_CNT = 0
def sort_schedules(all_events, now=None):
"""Sort the schedules, based on order of happening from now
`all_events=True`: Return an event for each active day
`all_events=False`: Return only first occurring event of the week
`now` : for testing: simulated localtime()
"""
def reset_guardian():
global __SCHED_GUARDIAN, __SCHED_GUARDIAN_CNT
__SCHED_GUARDIAN = False
__SCHED_GUARDIAN_CNT = 0
day_min = 24 * 60
week_min = 7 * day_min
events = []
now = now or time.localtime()
now_hm = now[3] * 60 + now[4]
now = now[6] * day_min + now_hm
def sched_guardian():
global __SCHED_GUARDIAN, __SCHED_GUARDIAN_CNT
__SCHED_GUARDIAN = True
for schedule in cfg.schedules():
parms = None
try:
# Note: the last parameter can have spaces (category name)!
enabled, m, h, dd, action, parms = schedule.split(None, 5)
except:
try:
enabled, m, h, dd, action = schedule.split(None, 4)
except:
continue # Bad schedule, ignore
action = action.strip()
if dd == "*":
dd = "1234567"
if not dd.isdigit():
continue # Bad schedule, ignore
for d in dd:
then = (int(d) - 1) * day_min + int(h) * 60 + int(m)
dif = then - now
if all_events and dif < 0:
# Expired event will occur again after a week
dif = dif + week_min
events.append((dif, action, parms, schedule, enabled))
if not all_events:
break
def sched_check():
global __SCHED_GUARDIAN, __SCHED_GUARDIAN_CNT
if not __SCHED_GUARDIAN:
__SCHED_GUARDIAN_CNT += 1
return __SCHED_GUARDIAN_CNT < 4
reset_guardian()
return True
events.sort(key=lambda x: x[0])
return events

3
sabnzbd/urlgrabber.py

@ -84,14 +84,11 @@ class URLGrabber(Thread):
self.queue.put((url, future_nzo))
def stop(self):
logging.info("URLGrabber shutting down")
self.shutdown = True
self.add(None, None)
def run(self):
logging.info("URLGrabber starting up")
self.shutdown = False
while not self.shutdown:
(url, future_nzo) = self.queue.get()

66
sabnzbd/utils/kronos.py

@ -42,9 +42,11 @@ The version in Turbogears is based on the original stand-alone Kronos.
This is open-source software, released under the MIT Software License:
http://www.opensource.org/licenses/mit-license.php
Adapted to work on Python 3 by the SABnzbd-Team.
"""
__version__ = "2.0"
__version__ = "2.1"
__all__ = [
"DayTaskRescheduler",
@ -66,20 +68,15 @@ __all__ = [
"ThreadedTaskMixin",
"ThreadedWeekdayTask",
"WeekdayTask",
"add_interval_task",
"add_monthday_task",
"add_single_task",
"add_weekday_task",
"cancel",
"method",
]
import os
import sys
import sched
import time
import weakref
import logging
import threading
class method:
@ -121,7 +118,9 @@ class Scheduler:
def _release_lock(self):
pass
def add_interval_task(self, action, taskname, initialdelay, interval, processmethod, args, kw):
def add_interval_task(
self, action, taskname, initialdelay, interval, processmethod=method.sequential, args=None, kw=None
):
"""Add a new Interval Task to the schedule.
A very short initialdelay or one of zero cannot be honored, you will
@ -148,7 +147,7 @@ class Scheduler:
self.schedule_task(task, initialdelay)
return task
def add_single_task(self, action, taskname, initialdelay, processmethod, args, kw):
def add_single_task(self, action, taskname, initialdelay, processmethod=method.sequential, args=None, kw=None):
"""Add a new task to the scheduler that will only be executed once."""
if initialdelay < 0:
raise ValueError("Delay must be >0")
@ -169,7 +168,9 @@ class Scheduler:
self.schedule_task(task, initialdelay)
return task
def add_daytime_task(self, action, taskname, weekdays, monthdays, timeonday, processmethod, args, kw):
def add_daytime_task(
self, action, taskname, weekdays, monthdays, timeonday, processmethod=method.sequential, args=None, kw=None
):
"""Add a new Day Task (Weekday or Monthday) to the schedule."""
if weekdays and monthdays:
raise ValueError("You can only specify weekdays or monthdays, " "not both")
@ -250,8 +251,6 @@ class Scheduler:
"""Cancel given scheduled task."""
self.sched.cancel(task.event)
if sys.version_info >= (2, 6):
# code for sched module of python 2.6+
def _getqueuetoptime(self):
try:
return self.sched._queue[0].time
@ -261,24 +260,14 @@ class Scheduler:
def _clearschedqueue(self):
self.sched._queue[:] = []
else:
# code for sched module of python 2.5 and older
def _getqueuetoptime(self):
try:
return self.sched.queue[0][0]
except IndexError:
return 0.0
def _clearschedqueue(self):
self.sched.queue[:] = []
def _run(self):
# Low-level run method to do the actual scheduling loop.
self.running = True
while self.running:
try:
self.sched.run()
except Exception as x:
logging.error("ERROR DURING SCHEDULER EXECUTION %s" % str(x), exc_info=True)
logging.error("Error during scheduler execution: %s" % str(x), exc_info=True)
# queue is empty; sleep a short while before checking again
if self.running:
time.sleep(5)
@ -312,7 +301,7 @@ class Task:
def handle_exception(self, exc):
"""Handle any exception that occured during task execution."""
logging.error("ERROR DURING SCHEDULER EXECUTION %s" % str(exc), exc_info=True)
logging.error("Error during scheduler execution: %s" % str(exc), exc_info=True)
class SingleTask(Task):
@ -414,10 +403,7 @@ class MonthdayTask(DayTaskRescheduler, Task):
self.action(*self.args, **self.kw)
try:
import threading
class ThreadedScheduler(Scheduler):
class ThreadedScheduler(Scheduler):
"""A Scheduler that runs in its own thread."""
def __init__(self):
@ -447,7 +433,8 @@ try:
"""Release the lock on th ethread's task queue."""
self._lock.release()
class ThreadedTaskMixin:
class ThreadedTaskMixin:
"""A mixin class to make a Task execute in a separate thread."""
def __call__(self, schedulerref):
@ -457,35 +444,34 @@ try:
def threadedcall(self):
# This method is run within its own thread, so we have to
# do the execute() call and exception handling here.
# # do the execute() call and exception handling here.
try:
self.execute()
except Exception as x:
self.handle_exception(x)
class ThreadedIntervalTask(ThreadedTaskMixin, IntervalTask):
class ThreadedIntervalTask(ThreadedTaskMixin, IntervalTask):
"""Interval Task that executes in its own thread."""
pass
class ThreadedSingleTask(ThreadedTaskMixin, SingleTask):
class ThreadedSingleTask(ThreadedTaskMixin, SingleTask):
"""Single Task that executes in its own thread."""
pass
class ThreadedWeekdayTask(ThreadedTaskMixin, WeekdayTask):
class ThreadedWeekdayTask(ThreadedTaskMixin, WeekdayTask):
"""Weekday Task that executes in its own thread."""
pass
class ThreadedMonthdayTask(ThreadedTaskMixin, MonthdayTask):
"""Monthday Task that executes in its own thread."""
pass
class ThreadedMonthdayTask(ThreadedTaskMixin, MonthdayTask):
"""Monthday Task that executes in its own thread."""
except ImportError:
# threading is not available
pass

Loading…
Cancel
Save