diff --git a/CHANGES.md b/CHANGES.md
index 3114da9..c851873 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,4 +1,16 @@
-### 0.23.17 (2020-04-12 12:40:00 UTC)
+### 0.23.18 (2020-05-03 23:10:00 UTC)
+
+* Change allow Python 3.8.10 and 3.9.5
+* Remove PiSexy provider
+* Fix refreshShow, prevent another refresh of show if already in queue and not forced
+* Fix webapi set scene season
+* Fix set path in all_tests for py2
+* Fix webapi exception if no backlog was done before (CMD_SickGearCheckScheduler)
+* Change webapi don't allow setting of scene numbers when show hasn't activated scene numbering
+* Add webapi unit tests
+
+
+### 0.23.17 (2020-04-12 12:40:00 UTC)
* Update UnRar for Windows 6.00 to 6.01 x64
diff --git a/gui/slick/images/providers/pisexy.png b/gui/slick/images/providers/pisexy.png
deleted file mode 100644
index 20674eb..0000000
Binary files a/gui/slick/images/providers/pisexy.png and /dev/null differ
diff --git a/sickbeard/providers/__init__.py b/sickbeard/providers/__init__.py
index 0956811..9910d7d 100755
--- a/sickbeard/providers/__init__.py
+++ b/sickbeard/providers/__init__.py
@@ -40,7 +40,7 @@ __all__ = [
'custom01', 'custom11', 'ettv', 'eztv', 'fano', 'filelist', 'funfile', 'grabtheinfo',
'hdbits', 'hdme', 'hdspace', 'hdtorrents',
'immortalseed', 'iptorrents', 'limetorrents', 'magnetdl', 'milkie', 'morethan', 'nebulance', 'ncore', 'nyaa',
- 'pisexy', 'pretome', 'privatehd', 'ptf',
+ 'pretome', 'privatehd', 'ptf',
'rarbg', 'revtt', 'scenehd', 'scenetime', 'shazbat', 'showrss', 'skytorrents', 'snowfl', 'speedcd',
'thepiratebay', 'torlock', 'torrentday', 'torrenting', 'torrentleech', 'tvchaosuk',
'xspeeds', 'zooqle',
diff --git a/sickbeard/providers/pisexy.py b/sickbeard/providers/pisexy.py
deleted file mode 100644
index ecbfcf3..0000000
--- a/sickbeard/providers/pisexy.py
+++ /dev/null
@@ -1,133 +0,0 @@
-# This file is part of Sick Beard.
-#
-# Sick Beard is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Sick Beard is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Sick Beard. If not, see .
-
-import re
-import traceback
-
-from . import generic
-from .. import logger
-from ..helpers import try_int
-from bs4_parser import BS4Parser
-
-from _23 import unidecode
-from six import iteritems, string_types
-
-
-class PiSexyProvider(generic.TorrentProvider):
-
- def __init__(self):
- generic.TorrentProvider.__init__(self, 'PiSexy')
-
- self.url_base = 'https://pisexy.me/'
- self.urls = {'config_provider_home_uri': self.url_base,
- 'login': self.url_base + 'takelogin.php',
- 'search': self.url_base + 'browseall.php?search=%s'}
-
- self.url = self.urls['config_provider_home_uri']
-
- self.username, self.password, self.freeleech, self.minseed, self.minleech = 5 * [None]
-
- def _authorised(self, **kwargs):
-
- return super(PiSexyProvider, self)._authorised(
- logged_in=(lambda y=None: self.has_all_cookies(['uid', 'pass', 'pcode'])))
-
- def _search_provider(self, search_params, **kwargs):
-
- results = []
- if not self._authorised():
- return results
-
- items = {'Cache': [], 'Season': [], 'Episode': [], 'Propers': []}
-
- rc = dict([(k, re.compile('(?i)' + v)) for (k, v) in iteritems({
- 'get': r'info.php\?id', 'cats': 'cat=(?:0|50[12])', 'filter': 'free',
- 'title': r'Download\s([^"\']+)', 'seeders': r'(^\d+)', 'leechers': r'(\d+)$'})])
- for mode in search_params:
- for search_string in search_params[mode]:
- search_string = unidecode(search_string)
- search_url = self.urls['search'] % search_string
-
- html = self.get_url(search_url)
- if self.should_skip():
- return results
-
- cnt = len(items[mode])
- try:
- if not html or self._has_no_results(html):
- raise generic.HaltParseException
-
- with BS4Parser(html, parse_only=dict(table={'class': 'listor'})) as tbl:
- tbl_rows = [] if not tbl else tbl.find_all('tr')
-
- if 2 > len(tbl_rows):
- raise generic.HaltParseException
-
- head = None
- for tr in tbl_rows[1:]:
- cells = tr.find_all('td')
- if 5 > len(cells):
- continue
- try:
- head = head if None is not head else self._header_row(tr, {'seed': r'(?:see/lee|seed)'})
- seeders, leechers = 2 * [cells[head['seed']].get_text().strip()]
- seeders, leechers = [try_int(n) for n in [
- rc['seeders'].findall(seeders)[0], rc['leechers'].findall(leechers)[0]]]
- if not tr.find('a', href=rc['cats']) or self._reject_item(
- seeders, leechers, self.freeleech and not tr.find('img', src=rc['filter'])):
- continue
-
- info = tr.find('a', href=rc['get'])
- tag = tr.find('a', alt=rc['title']) or tr.find('a', title=rc['title'])
- title = tag and rc['title'].findall(str(tag))
- title = title and title[0]
- if not isinstance(title, string_types) or 10 > len(title):
- title = (rc['title'].sub(r'\1', info.attrs.get('title', ''))
- or info.get_text()).strip()
- if (10 > len(title)) or (4 > len(re.sub(r'[^.\-\s]', '', title))):
- continue
- size = cells[head['size']].get_text().strip()
- download_url = self._link(info['href'])
- except (AttributeError, TypeError, ValueError, KeyError, IndexError):
- continue
-
- if title and download_url:
- items[mode].append((title, download_url, seeders, self._bytesizer(size)))
-
- except generic.HaltParseException:
- pass
- except (BaseException, Exception):
- logger.log(u'Failed to parse. Traceback: %s' % traceback.format_exc(), logger.ERROR)
-
- self._log_search(mode, len(items[mode]) - cnt, search_url)
-
- results = self._sort_seeding(mode, results + items[mode])
-
- return results
-
- def get_data(self, url):
- result = None
- html = self.get_url(url, timeout=90)
- if self.should_skip():
- return result
-
- try:
- result = self._link(re.findall(r'(?i)"([^"]*?download\.php[^"]+?&(?!pimp)[^"]*)"', html)[0])
- except IndexError:
- logger.log('Failed no torrent in response', logger.DEBUG)
- return result
-
-
-provider = PiSexyProvider()
diff --git a/sickbeard/show_queue.py b/sickbeard/show_queue.py
index 5543b18..8cb1353 100644
--- a/sickbeard/show_queue.py
+++ b/sickbeard/show_queue.py
@@ -299,7 +299,7 @@ class ShowQueue(generic_queue.GenericQueue):
:return:
:rtype: QueueItemRefresh
"""
- if self.isBeingRefreshed(show_obj) and not force:
+ if (self.isBeingRefreshed(show_obj) or self.isInRefreshQueue(show_obj)) and not force:
raise exceptions_helper.CantRefreshException('This show is already being refreshed, not refreshing again.')
if ((not after_update and self.isBeingUpdated(show_obj)) or self.isInUpdateQueue(show_obj)) and not force:
diff --git a/sickbeard/webapi.py b/sickbeard/webapi.py
index 7fb0e2f..8cd4ea4 100644
--- a/sickbeard/webapi.py
+++ b/sickbeard/webapi.py
@@ -2084,7 +2084,7 @@ class CMD_SickGearCheckScheduler(ApiCall):
nextBacklog = sickbeard.backlog_search_scheduler.next_run().strftime(dateFormat)
data = {"backlog_is_paused": int(backlogPaused), "backlog_is_running": int(backlogRunning),
- "last_backlog": _ordinal_to_dateForm(sql_result[0]["last_backlog"]),
+ "last_backlog": (0 < len(sql_result) and _ordinal_to_dateForm(sql_result[0]["last_backlog"])) or '',
"next_backlog": nextBacklog}
return _responds(RESULT_SUCCESS, data)
@@ -2755,8 +2755,14 @@ class CMD_SickGearSetSceneNumber(ApiCall):
def run(self):
""" saving scene numbers """
+ show_obj = helpers.find_show_by_id({self.tvid: self.prodid})
+ if not show_obj:
+ return _responds(RESULT_FAILURE, msg="Can't find show")
+ if not show_obj.is_scene:
+ return _responds(RESULT_FAILURE, msg="Show scene numbering disabled")
+
result = set_scene_numbering_helper(self.tvid, self.prodid, self.forSeason, self.forEpisode,
- self.forAbsolute, self.sceneSeason, self.sceneEpisode, self.sceneEpisode)
+ self.forAbsolute, self.sceneSeason, self.sceneEpisode, self.sceneAbsolute)
if not result['success']:
return _responds(RESULT_FAILURE, result)
@@ -4488,11 +4494,24 @@ class CMD_SickGearShows(ApiCall):
if None is not self.paused and bool(self.paused) != bool(cur_show_obj.paused):
continue
+ genreList = []
+ if cur_show_obj.genre:
+ genreListTmp = cur_show_obj.genre.split("|")
+ for genre in genreListTmp:
+ if genre:
+ genreList.append(genre)
+ anyQualities, bestQualities = _mapQuality(cur_show_obj.quality)
+
showDict = {
"paused": cur_show_obj.paused,
"quality": _get_quality_string(cur_show_obj.quality),
"language": cur_show_obj.lang,
"air_by_date": cur_show_obj.air_by_date,
+ "airs": cur_show_obj.airs,
+ "flatten_folders": cur_show_obj.flatten_folders,
+ "genre": genreList,
+ "location": cur_show_obj._location,
+ "quality_details": {"initial": anyQualities, "archive": bestQualities},
"sports": cur_show_obj.sports,
"anime": cur_show_obj.anime,
"indexerid": cur_show_obj.prodid,
diff --git a/sickgear.py b/sickgear.py
index f538dee..61a6f5b 100755
--- a/sickgear.py
+++ b/sickgear.py
@@ -41,7 +41,7 @@ warnings.filterwarnings('ignore', module=r'.*zoneinfo.*', message='.*file or dir
warnings.filterwarnings('ignore', message='.*deprecated in cryptography.*')
versions = [((2, 7, 9), (2, 7, 18)), ((3, 9, 0), (3, 9, 2)),
- ((3, 7, 1), (3, 8, 9)), ((3, 9, 4), (3, 9, 4))] # inclusive version ranges
+ ((3, 7, 1), (3, 8, 10)), ((3, 9, 4), (3, 9, 5))] # inclusive version ranges
if not any(list(map(lambda v: v[0] <= sys.version_info[:3] <= v[1], versions))) and not int(os.environ.get('PYT', 0)):
print('Python %s.%s.%s detected.' % sys.version_info[:3])
print('Sorry, SickGear requires a Python version %s' % ', '.join(map(
diff --git a/tests/all_tests.py b/tests/all_tests.py
index 8e19db3..94df834 100644
--- a/tests/all_tests.py
+++ b/tests/all_tests.py
@@ -28,7 +28,8 @@ if '__main__' == __name__:
import unittest
import os
- sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'lib')))
+ sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
+ 'lib')))
test_file_strings = [x for x in glob.glob('*_tests.py') if x not in __file__]
module_strings = [file_string[0:len(file_string) - 3] for file_string in test_file_strings]
suites = [unittest.defaultTestLoader.loadTestsFromName(file_string) for file_string in module_strings]
diff --git a/tests/test_lib.py b/tests/test_lib.py
index b19ac31..e517d4d 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -69,6 +69,11 @@ def create_test_cache_folder():
os.mkdir(sickbeard.CACHE_DIR)
+def remove_test_cache_folder():
+ if os.path.isdir(sickbeard.CACHE_DIR):
+ shutil.rmtree(sickbeard.CACHE_DIR, ignore_errors=True)
+
+
# call env functions at appropriate time during sickbeard var setup
# =================
@@ -100,6 +105,7 @@ sickbeard.logger.sb_log_instance.init_logging(False)
sickbeard.CACHE_DIR = os.path.join(TESTDIR, 'cache')
sickbeard.ZONEINFO_DIR = os.path.join(TESTDIR, 'cache', 'zoneinfo')
create_test_cache_folder()
+sickbeard.GUI_NAME = 'slick'
# =================
@@ -128,6 +134,7 @@ sickbeard.tv.TVEpisode.specify_episode = _fake_specify_ep
# =================
class SickbeardTestDBCase(unittest.TestCase):
def setUp(self):
+ create_test_cache_folder()
sickbeard.showList = []
sickbeard.showDict = {}
setup_test_db()
@@ -135,6 +142,7 @@ class SickbeardTestDBCase(unittest.TestCase):
setup_test_show_dir()
def tearDown(self):
+ remove_test_cache_folder()
sickbeard.showList = []
sickbeard.showDict = {}
teardown_test_db()
@@ -144,9 +152,9 @@ class SickbeardTestDBCase(unittest.TestCase):
class TestDBConnection(db.DBConnection, object):
- def __init__(self, db_file_name=TESTDBNAME):
+ def __init__(self, db_file_name=TESTDBNAME, row_type=None):
db_file_name = os.path.join(TESTDIR, db_file_name)
- super(TestDBConnection, self).__init__(db_file_name)
+ super(TestDBConnection, self).__init__(db_file_name, row_type=row_type)
class TestCacheDBConnection(TestDBConnection, object):
diff --git a/tests/webapi_tests.py b/tests/webapi_tests.py
new file mode 100644
index 0000000..f00813c
--- /dev/null
+++ b/tests/webapi_tests.py
@@ -0,0 +1,842 @@
+# coding=UTF-8
+# Author:
+# URL: https://github.com/SickGear/SickGear
+#
+# This file is part of SickGear.
+#
+# SickGear is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# SickGear is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with SickGear. If not, see .
+from __future__ import print_function
+import datetime
+import itertools
+import os
+import unittest
+import warnings
+import sys
+import test_lib as test
+
+warnings.filterwarnings('ignore', module=r'.*ssl_.*', message='.*SSLContext object.*')
+
+import sickbeard
+from exceptions_helper import ex
+from sickbeard.classes import SearchResult
+from sickbeard.common import Quality, ARCHIVED, DOWNLOADED, WANTED, UNAIRED, SNATCHED, SNATCHED_PROPER, SNATCHED_BEST, \
+ SNATCHED_ANY, SUBTITLED, statusStrings, UNKNOWN
+from sickbeard.event_queue import Events
+from sickbeard.tv import TVEpisode, TVShow
+from sickbeard.webserveInit import WebServer
+from sickbeard import webapi, scheduler, search_backlog, search_queue, show_queue, history, db
+from sickbeard.scene_numbering import set_scene_numbering_helper
+from lib import requests
+from six import integer_types, iteritems, iterkeys, itervalues, string_types
+
+# noinspection PyUnreachableCode
+if False:
+ from typing import Any, AnyStr
+
+NoneType = type(None)
+today = datetime.date.today()
+last_week = today - datetime.timedelta(days=5)
+old_date = today - datetime.timedelta(days=14)
+future = today + datetime.timedelta(days=1)
+far_future = today + datetime.timedelta(days=30)
+
+if 'win32' == sys.platform:
+ root_folder_tests = [
+ # root_dirs, path, expected
+ ('1|C:\\dir', 'C:\\folder', None),
+ ('1|c:\\dir', 'c:\\dir', 'c:\\dir'),
+ ('1|c:\\dir2', 'c:\\dir2\\dir', 'c:\\dir2'),
+ ('1|c:\\tv_complete|c:\\tv', 'c:\\tv', 'c:\\tv')
+ ]
+else:
+ root_folder_tests = [
+ # root_dirs, path, expected
+ ('1|~/dir', '~/dir/dir', '~/dir'),
+ ('1|/mnt/hdd/dir', '/mnt/hdd/folder', None),
+ ('1|/mnt/hdd/dir', '/mnt/hdd/dir', '/mnt/hdd/dir'),
+ ('1|/mnt/hdd/dir2', '/mnt/hdd/dir2/dir', '/mnt/hdd/dir2'),
+ ('1|/mnt/hdd/tv_complete|/mnt/hdd/tv', '/mnt/hdd/tv', '/mnt/hdd/tv')
+ ]
+
+test_shows = [
+ {'tvid': 1, 'prodid': 1234, 'name': 'Test Show', 'runtime': 45, 'airs': 'Mondays, 00:45', 'imdbid': 'tt1234567',
+ '_location': r'C:\series\show dir', 'network': 'Network', 'overview': 'Overview text', 'status': 'Continuing',
+ 'quality_init': [], 'quality_upgrade': [],
+ 'episodes': {
+ 1: {
+ 1: {'name': 'ep1', 'status': Quality.compositeStatus(DOWNLOADED, Quality.HDWEBDL),
+ 'airdate': old_date, 'description': 'ep1 description'},
+ 2: {'name': 'ep2', 'status': WANTED, 'airdate': last_week, 'description': 'ep2 description'},
+ 3: {'name': 'ep3', 'status': WANTED, 'airdate': today, 'description': 'ep3 description'},
+ 4: {'name': 'ep4', 'status': UNAIRED, 'airdate': future, 'description': 'ep4 description'},
+ 5: {'name': 'ep5', 'status': UNAIRED, 'airdate': far_future, 'description': 'ep5 description'},
+ }
+ }
+ },
+ {'tvid': 1, 'prodid': 5678, 'name': 'Test Show 2', 'runtime': 45, 'airs': 'Tuesdays, 22:15', 'imdbid': 'tt7775567',
+ '_location': r'C:\series\show 2', 'network': 'Network 2', 'overview': 'Overview text 2', 'status': 'Continuing',
+ 'quality_init': [Quality.HDTV, Quality.FULLHDWEBDL], 'quality_upgrade': [Quality.FULLHDWEBDL],
+ 'episodes': {
+ 1: {
+ 1: {'name': 'new ep1', 'status': UNAIRED, 'airdate': far_future, 'description': 'ep1 description'},
+ 2: {'name': 'new ep2', 'status': UNAIRED, 'airdate': far_future + datetime.timedelta(days=7),
+ 'description': 'ep2 description'},
+ 3: {'name': 'new ep3', 'status': UNAIRED, 'airdate': far_future + datetime.timedelta(days=14),
+ 'description': 'ep3 description'},
+ 4: {'name': 'new ep4', 'status': UNAIRED, 'airdate': far_future + datetime.timedelta(days=28),
+ 'description': 'ep4 description'},
+ 5: {'name': 'new ep5', 'status': UNAIRED, 'description': 'ep5 description'},
+ }
+ }
+ },
+]
+
+
+def fake_action(*args, **kwargs):
+ pass
+
+
+class WebAPICase(test.SickbeardTestDBCase):
+ webserver = None
+ instance = None
+
+ def __init__(self, *args, **kwargs):
+ super(WebAPICase, self).__init__(*args, **kwargs)
+ self.org_mass_action = None
+ self.show_save_db = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(WebAPICase, cls).setUpClass()
+ # web server options
+ sickbeard.WEB_PORT = 8080
+ cls.web_options = dict(
+ host='127.0.0.1',
+ port=sickbeard.WEB_PORT,
+ web_root=None,
+ data_root=os.path.join(sickbeard.PROG_DIR, 'gui', sickbeard.GUI_NAME),
+ log_dir=sickbeard.LOG_DIR,
+ username=sickbeard.WEB_USERNAME,
+ password=sickbeard.WEB_PASSWORD,
+ handle_reverse_proxy=sickbeard.HANDLE_REVERSE_PROXY,
+ enable_https=False,
+ https_cert=None,
+ https_key=None,
+ )
+ # start web server
+ try:
+ # used to check if existing SG instances have been started
+ sickbeard.helpers.wait_for_free_port(
+ sickbeard.WEB_IPV6 and '::1' or cls.web_options['host'], cls.web_options['port'])
+
+ cls.webserver = WebServer(options=cls.web_options)
+ cls.webserver.start()
+ # wait for server thread to be started
+ cls.webserver.wait_server_start()
+ cls.webserver.switch_handlers()
+ sickbeard.started = True
+ sickbeard.API_KEYS = [['unit test key', '1234567890']]
+ sickbeard.USE_API = True
+ except (BaseException, Exception) as e:
+ print('Failed to start WebServer: %s' % ex(e))
+
+ @classmethod
+ def tearDownClass(cls):
+ super(WebAPICase, cls).tearDownClass()
+ # shutdown web server
+ if cls.webserver:
+ cls.webserver.shut_down()
+ try:
+ cls.webserver.join(10)
+ except (BaseException, Exception):
+ pass
+ if cls.instance:
+ super(WebAPICase, cls.instance).tearDown()
+
+ def setUp(self):
+ self.reset_show_data = False
+ self.org_mass_action = None
+ self.show_save_db = None
+ if not WebAPICase.instance:
+ WebAPICase.instance = self
+ super(WebAPICase, self).setUp()
+ sickbeard.events = Events(None)
+ sickbeard.show_queue_scheduler = scheduler.Scheduler(
+ show_queue.ShowQueue(),
+ cycleTime=datetime.timedelta(seconds=3),
+ threadName='SHOWQUEUE')
+ sickbeard.search_queue_scheduler = scheduler.Scheduler(
+ search_queue.SearchQueue(),
+ cycleTime=datetime.timedelta(seconds=3),
+ threadName='SEARCHQUEUE')
+ sickbeard.backlog_search_scheduler = search_backlog.BacklogSearchScheduler(
+ search_backlog.BacklogSearcher(),
+ cycleTime=datetime.timedelta(minutes=60),
+ run_delay=datetime.timedelta(minutes=60),
+ threadName='BACKLOG')
+ sickbeard.indexermapper.indexer_list = [i for i in sickbeard.indexers.indexer_api.TVInfoAPI().all_sources]
+ for root_dirs, path, expected in root_folder_tests:
+ sickbeard.ROOT_DIRS = root_dirs
+ for cur_show in test_shows:
+ show_obj = TVShow(cur_show['tvid'], cur_show['prodid'])
+ for k, v in iteritems(cur_show):
+ if k in ('tvid', 'prodid', 'episodes', 'quality_init', 'quality_upgrade'):
+ continue
+ if '_%s' % k in show_obj.__dict__:
+ show_obj.__dict__['_%s' % k] = v
+ elif k in show_obj.__dict__:
+ show_obj.__dict__[k] = v
+ if 'quality_init' in cur_show and cur_show['quality_init']:
+ show_obj.quality = Quality.combineQualities(cur_show['quality_init'],
+ cur_show.get('quality_upgrade', []))
+ show_obj.dirty = True
+
+ show_obj.save_to_db(True)
+ sickbeard.showList.append(show_obj)
+ sickbeard.showDict.update({show_obj.sid_int: show_obj})
+
+ for season, eps in iteritems(cur_show['episodes']):
+ for ep, data in iteritems(eps):
+ ep_obj = TVEpisode(show_obj, season, ep)
+ for k, v in iteritems(data):
+ if '_%s' % k in ep_obj.__dict__:
+ ep_obj.__dict__['_%s' % k] = v
+ elif k in ep_obj.__dict__:
+ ep_obj.__dict__[k] = v
+ show_obj.sxe_ep_obj.setdefault(season, {})[ep] = ep_obj
+ ep_obj.save_to_db(True)
+ status, quality = Quality.splitCompositeStatus(ep_obj.status)
+ if status in (DOWNLOADED, SNATCHED):
+ s_r = SearchResult([ep_obj])
+ s_r.show_obj, s_r.quality, s_r.provider, s_r.name = \
+ show_obj, quality, None, '%s.S%sE%s.group' % (
+ show_obj.name, ep_obj.season, ep_obj.episode)
+ history.log_snatch(s_r)
+ if DOWNLOADED == status:
+ history.log_download(ep_obj, '%s.S%sE%s.group.mkv' % (
+ show_obj.name, ep_obj.season, ep_obj.episode), quality, 'group')
+
+ def tearDown(self):
+ if None is not self.org_mass_action:
+ db.DBConnection.mass_action = self.org_mass_action
+ if None is not self.show_save_db:
+ sickbeard.tv.TVShow.save_to_db = self.show_save_db
+ sickbeard.show_queue_scheduler.action.queue = []
+ sickbeard.search_queue_scheduler.action.queue = []
+ if self.reset_show_data:
+ for cur_show in test_shows:
+ show_obj = sickbeard.helpers.find_show_by_id({cur_show['tvid']: cur_show['prodid']})
+ if 'quality_init' in cur_show and cur_show['quality_init']:
+ show_obj.quality = Quality.combineQualities(cur_show['quality_init'],
+ cur_show.get('quality_upgrade', []))
+ else:
+ show_obj.quality = int(sickbeard.QUALITY_DEFAULT)
+ show_obj.upgrade_once = int(cur_show.get('upgrade_once', 0))
+ show_obj.scene = int(cur_show.get('scene', 0))
+ show_obj.save_to_db()
+ for season, data in iteritems(cur_show['episodes']):
+ for ep_nb, cur_ep in iteritems(data):
+ ep_obj = show_obj.get_episode(season, ep_nb)
+ ep_obj.status = cur_ep.get('status')
+ ep_obj.save_to_db()
+ set_scene_numbering_helper(
+ cur_show['tvid'], cur_show['prodid'], season, ep_nb,
+ scene_season=cur_ep.get('scene_season'), scene_episode=cur_ep.get('scene_episode'))
+
+ @staticmethod
+ def _request_from_api(cmd, params=None):
+ param = {'cmd': webapi._functionMaper_reversed[cmd]}
+ if isinstance(params, dict):
+ param.update(params)
+ return requests.get('http://127.0.0.1:%s/api/%s' % (sickbeard.WEB_PORT, sickbeard.API_KEYS[0][1]),
+ params=param).json()
+
+ @staticmethod
+ def _check_types(data, fields):
+ result, msg = True, []
+ missing_list, wrong_type = [], {}
+ for f in fields:
+ if f[0] not in data:
+ missing_list.append(f[0])
+ result = False
+ elif not isinstance(data[f[0]], f[1]):
+ wrong_type[f[0]] = 'Expected: %s, Got: %s' % (type(data[f[0]]), str(f[1]))
+ result = False
+ if missing_list:
+ msg.append('Missing fields: %s' % ', '.join(missing_list))
+ if wrong_type:
+ msg.append('Wrong field type: %s' % ', '.join(['%s: %s' % (k, v) for k, v in iteritems(wrong_type)]))
+ return result, ('', ', %s' % ', '.join(msg))[0 < len(msg)]
+
+ def _check_success_base_response(self, data, endpoint, message='', data_type=dict):
+ # type: (Any, Any, AnyStr, type) -> None
+ r, msg = self._check_types(data, [('result', string_types), ('data', data_type), ('message', string_types)])
+ self.assertTrue(r, msg='Failed command: %s%s' % (webapi._functionMaper_reversed[endpoint], msg))
+ self.assertEqual(data['result'], 'success')
+ self.assertEqual(data['message'], message)
+
+ def _check_episode_data(self, data, endpoint):
+ r, msg = self._check_types(
+ data,
+ [('absolute_number', integer_types), ('airdate', string_types), ('description', string_types),
+ ('name', string_types), ('quality', string_types), ('scene_absolute_number', integer_types),
+ ('scene_episode', integer_types), ('scene_season', integer_types), ('status', string_types),
+ ('timezone', (NoneType, string_types))]
+ )
+ self.assertTrue(r, msg='Failed command: %s - data episode dict%s' % (
+ webapi._functionMaper_reversed[endpoint], msg))
+
+ def test_sg(self):
+ data = self._request_from_api(webapi.CMD_SickGear)
+ r, msg = self._check_types(data, [('data', dict), ('message', string_types), ('result', string_types)])
+ self.assertTrue(r, msg='Failed command: %s%s' % (webapi._functionMaper_reversed[webapi.CMD_SickGear], msg))
+ r, msg = self._check_types(data['data'], [('api_commands', list), ('api_version', integer_types),
+ ('fork', string_types), ('sb_version', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGear], msg))
+ needed_ele = [(k, string_types) for k in iterkeys(webapi._functionMaper) if 'listcommands' != k]
+ r = all(v[0] in data['data']['api_commands'] for v in needed_ele)
+ if not r:
+ i = list(set(n[0] for n in needed_ele) - set(data['data']['api_commands']))
+ else:
+ i = []
+ self.assertTrue(r, msg='Failed command: %s - api_commands list%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGear], ('', ', missing: %s' % ','.join(i))[0 < len(i)]))
+
+ def _check_show_fields(self, data, endpoint, include_season_list=True):
+ r, msg = self._check_types(
+ data,
+ [('air_by_date', integer_types), ('airs', string_types), ('anime', integer_types), ('cache', dict),
+ ('classification', string_types), ('flatten_folders', integer_types), ('genre', list),
+ ('global_exclude_ignore', string_types), ('global_exclude_require', string_types), ('ids', dict),
+ ('ignorewords', string_types), ('imdb_id', string_types), ('indexer', integer_types),
+ ('indexerid', integer_types), ('language', string_types), ('location', string_types),
+ ('network', string_types), ('next_ep_airdate', string_types), ('paused', integer_types),
+ ('prune', integer_types), ('quality', string_types), ('quality_details', dict),
+ ('requirewords', string_types), ('runtime', integer_types), ('scenenumbering', bool),
+ ('show_name', string_types), ('sports', integer_types), ('startyear', integer_types),
+ ('status', string_types), ('subtitles', integer_types), ('tag', string_types),
+ ('timezone', (NoneType, string_types)), ('tvrage_id', integer_types),
+ ('tvrage_name', string_types), ('upgrade_once', integer_types)] +
+ ([], [('season_list', list)])[include_season_list]
+ )
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[endpoint], msg))
+ r, msg = self._check_types(data['ids'],
+ [('%s' % k, integer_types) for k in sickbeard.indexermapper.indexer_list])
+ self.assertTrue(r, msg='Failed shows "ids" check: %s' % msg)
+ r, msg = self._check_types(
+ data['quality_details'],
+ [('archive', list), ('initial', list)]
+ )
+ self.assertTrue(r, msg='Failed shows "quality_details" check: %s' % msg)
+
+ def test_show(self):
+ # test not found
+ data = self._request_from_api(webapi.CMD_SickGearShow, params={'indexer': 1, 'indexerid': 98765})
+ r, msg = self._check_types(data, [('result', string_types), ('data', dict), ('message', string_types)])
+ self.assertTrue(r, msg='Failed command: %s%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShow], msg))
+ self.assertEqual(data['result'], 'failure')
+ self.assertEqual(data['message'], 'Show not found')
+ # test existing show
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShow,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShow)
+ self._check_show_fields(data['data'], webapi.CMD_SickGearShow)
+
+ def test_seasons(self):
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowSeasons,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShowSeasons)
+ for season, eps in iteritems(cur_show['episodes']):
+ r, msg = self._check_types(data['data'], [('%s' % season, dict)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowSeasons], msg))
+ for cur_ep in iterkeys(eps):
+ r, msg = self._check_types(data['data']['%s' % season], [('%s' % cur_ep, dict)])
+ self.assertTrue(r, msg='Failed command: %s - data season dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowSeasons], msg))
+ self._check_episode_data(data['data']['%s' % season]['%s' % cur_ep], webapi.CMD_SickGearShowSeasons)
+
+ def test_coming_episodes(self):
+ data = self._request_from_api(webapi.CMD_SickGearComingEpisodes)
+ self._check_success_base_response(data, webapi.CMD_SickGearComingEpisodes)
+ r, msg = self._check_types(data['data'], [('later', list), ('missed', list), ('soon', list), ('today', list)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearComingEpisodes], msg))
+ self.assertTrue(all(0 < len(data['data'][t]) for t in ('later', 'missed', 'soon', 'today')),
+ msg='Not all categories returned')
+ for t_p in ('later', 'missed', 'soon', 'today'):
+ for cur_ep in data['data'][t_p]:
+ r, msg = self._check_types(
+ cur_ep,
+ [('airdate', string_types), ('airs', string_types), ('data_network', string_types),
+ ('data_show_name', string_types), ('ep_name', string_types), ('ep_plot', string_types),
+ ('episode', integer_types), ('ids', dict), ('local_datetime', string_types),
+ ('network', string_types), ('parsed_datetime', string_types), ('paused', integer_types),
+ ('prod_id', integer_types), ('quality', string_types), ('runtime', integer_types),
+ ('season', integer_types), ('show_name', string_types), ('show_status', string_types),
+ ('status', integer_types), ('status_str', string_types), ('timezone', (NoneType, string_types)),
+ ('tv_id', integer_types), ('tvdbid', integer_types), ('weekday', integer_types)]
+ )
+ self.assertTrue(r, msg='Failed command: %s - data %s dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearComingEpisodes], t_p, msg))
+ r, msg = self._check_types(cur_ep['ids'],
+ [('%s' % k, integer_types) for k in sickbeard.indexermapper.indexer_list])
+ self.assertTrue(r, msg='Failed %s "ids" check: %s' % (t_p, msg))
+
+ def test_all_shows(self):
+ data = self._request_from_api(webapi.CMD_SickGearShows)
+ self._check_success_base_response(data, webapi.CMD_SickGearShows)
+ for show_id, cur_show in iteritems(data['data']):
+ self._check_show_fields(cur_show, webapi.CMD_SickGearShows, include_season_list=False)
+
+ def test_episode(self):
+ # not found episode
+ data = self._request_from_api(webapi.CMD_SickGearEpisode,
+ params={'indexer': 1, 'indexerid': 1234, 'season': 10, 'episode': 11})
+ r, msg = self._check_types(data, [('result', string_types), ('data', dict), ('message', string_types)])
+ self.assertTrue(r, msg='Failed command: %s%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearEpisode], msg))
+ self.assertEqual(data['result'], 'error')
+ self.assertEqual(data['message'], 'Episode not found')
+ # found episode
+ for cur_show in test_shows:
+ for season, eps in iteritems(cur_show['episodes']):
+ for cur_ep in iterkeys(eps):
+ data = self._request_from_api(webapi.CMD_SickGearEpisode,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid'],
+ 'season': season, 'episode': cur_ep})
+ self._check_success_base_response(data, webapi.CMD_SickGearEpisode)
+ r, msg = self._check_types(
+ data['data'],
+ [('absolute_number', integer_types), ('airdate', string_types), ('description', string_types),
+ ('file_size', integer_types), ('file_size_human', string_types), ('location', string_types),
+ ('name', string_types), ('quality', string_types), ('release_name', string_types),
+ ('scene_absolute_number', (NoneType, integer_types)),
+ ('scene_episode', (NoneType, integer_types)), ('scene_season', (NoneType, integer_types)),
+ ('status', string_types), ('subtitles', string_types), ('timezone', (NoneType, string_types))]
+ )
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearEpisode], msg))
+
+ def test_shutdown(self):
+ data = self._request_from_api(webapi.CMD_SickGearShutdown)
+ r, msg = self._check_types(data, [('data', dict), ('message', string_types), ('result', string_types)])
+ self.assertTrue(r, msg='basic test failed for shutdown')
+ self.assertEqual(data['message'], 'SickGear is shutting down...')
+ self.assertEqual(data['result'], 'success')
+
+ def test_restart(self):
+ data = self._request_from_api(webapi.CMD_SickGearRestart)
+ r, msg = self._check_types(data, [('data', dict), ('message', string_types), ('result', string_types)])
+ self.assertTrue(r, msg='basic test failed for shutdown')
+ self.assertEqual(data['message'], 'SickGear is restarting...')
+ self.assertEqual(data['result'], 'success')
+
+ def test_ping(self):
+ data = self._request_from_api(webapi.CMD_SickGearPing)
+ r, msg = self._check_types(data, [('data', dict), ('message', string_types), ('result', string_types)])
+ self.assertTrue(r, msg='basic test failed for shutdown')
+ self.assertEqual(data['message'], 'Pong')
+ self.assertEqual(data['result'], 'success')
+
+ def test_get_indexers(self):
+ data = self._request_from_api(webapi.CMD_SickGearGetIndexers)
+ self._check_success_base_response(data, webapi.CMD_SickGearGetIndexers)
+ r, msg = self._check_types(data['data'], [('%s' % k, dict) for k in sickbeard.indexermapper.indexer_list])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearGetIndexers], msg))
+ for i in sickbeard.indexermapper.indexer_list:
+ r, msg = self._check_types(
+ data['data']['%s' % i],
+ [('id', integer_types), ('main_url', string_types), ('name', string_types), ('searchable', bool),
+ ('show_url', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data %s dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearGetIndexers], i, msg))
+
+ def test_get_seasonlist(self):
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowSeasonList,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShowSeasonList, data_type=list)
+ r = all(isinstance(v, integer_types) for v in data['data'])
+ self.assertTrue(r, msg='Failed command: %s - data dict incorrect type' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowSeasonList]))
+
+ def test_get_episodelist(self):
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowSeasons,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShowSeasons)
+ r, msg = self._check_types(data['data'], [('%s' % i, dict) for i in iterkeys(cur_show['episodes'])])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowSeasons], msg))
+ for season, eps in iteritems(cur_show['episodes']):
+ r, msg = self._check_types(data['data'], [('%s' % season, dict)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowSeasons], msg))
+ for cur_ep in iterkeys(eps):
+ r, msg = self._check_types(data['data']['%s' % season], [('%s' % cur_ep, dict)])
+ self.assertTrue(r, msg='Failed command: %s - data season dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowSeasons], msg))
+ self._check_episode_data(data['data']['%s' % season]['%s' % cur_ep], webapi.CMD_SickGearShowSeasons)
+
+ def test_get_require_words(self):
+ # global
+ data = self._request_from_api(webapi.CMD_SickGearListRequireWords)
+ self._check_success_base_response(data, webapi.CMD_SickGearListRequireWords, message='Global require word list')
+ r, msg = self._check_types(data['data'], [('require words', list), ('type', string_types), ('use regex', bool)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearListRequireWords], msg))
+ # show based
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearListRequireWords,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearListRequireWords,
+ message='%s: require word list' % cur_show['name'])
+ r, msg = self._check_types(
+ data['data'],
+ [('require words', list), ('type', string_types), ('use regex', bool),
+ ('global exclude require', list), ('indexer', integer_types), ('indexerid', integer_types),
+ ('show name', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearListRequireWords], msg))
+
+ def test_get_ignore_words(self):
+ # global
+ data = self._request_from_api(webapi.CMD_SickGearListIgnoreWords)
+ self._check_success_base_response(data, webapi.CMD_SickGearListIgnoreWords, message='Global ignore word list')
+ r, msg = self._check_types(data['data'], [('ignore words', list), ('type', string_types), ('use regex', bool)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearListIgnoreWords], msg))
+ # show based
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearListIgnoreWords,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearListIgnoreWords,
+ message='%s: ignore word list' % cur_show['name'])
+ r, msg = self._check_types(
+ data['data'],
+ [('ignore words', list), ('type', string_types), ('use regex', bool),
+ ('global exclude ignore', list), ('indexer', integer_types), ('indexerid', integer_types),
+ ('show name', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearListIgnoreWords], msg))
+
+ def test_get_search_queue(self):
+ data = self._request_from_api(webapi.CMD_SickGearSearchQueue)
+ self._check_success_base_response(data, webapi.CMD_SickGearSearchQueue)
+ r, msg = self._check_types(
+ data['data'],
+ [(t, list) for t in ('backlog', 'failed', 'manual', 'proper')] + [('recent', integer_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearSearchQueue], msg))
+
+ def test_get_system_default(self):
+ data = self._request_from_api(webapi.CMD_SickGearGetDefaults)
+ self._check_success_base_response(data, webapi.CMD_SickGearGetDefaults)
+ r, msg = self._check_types(
+ data['data'],
+ [('archive', list), ('flatten_folders', integer_types), ('future_show_paused', integer_types),
+ ('initial', list), ('status', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearGetDefaults], msg))
+
+ def test_get_all_qualities(self):
+ data = self._request_from_api(webapi.CMD_SickGearGetQualities)
+ self._check_success_base_response(data, webapi.CMD_SickGearGetQualities)
+ r, msg = self._check_types(data['data'], [(q, integer_types) for q in iterkeys(webapi.quality_map)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearGetQualities], msg))
+
+ def test_get_human_qualities(self):
+ data = self._request_from_api(webapi.CMD_SickGearGetqualityStrings)
+ self._check_success_base_response(data, webapi.CMD_SickGearGetqualityStrings)
+ r, msg = self._check_types(data['data'], [('%s' % q, string_types) for q in iterkeys(Quality.qualityStrings)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearGetqualityStrings], msg))
+
+ def test_get_scene_qualities(self):
+ # global
+ data = self._request_from_api(webapi.CMD_SickGearExceptions)
+ self._check_success_base_response(data, webapi.CMD_SickGearExceptions)
+ r = all(isinstance(e, string_types) for e in data['data'])
+ self.assertTrue(r, msg='Failed command: %s - data dict' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearExceptions]))
+ # show specific
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearExceptions,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearExceptions, data_type=list)
+ r = all(isinstance(e, string_types) for e in data['data'])
+ self.assertTrue(r, msg='Failed command: %s - data dict' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearExceptions]))
+
+ def test_history(self):
+ data = self._request_from_api(webapi.CMD_SickGearHistory)
+ self._check_success_base_response(data, webapi.CMD_SickGearHistory, data_type=list)
+ for cur_show in data['data']:
+ self.assertTrue(isinstance(cur_show, dict), msg='wrong type')
+ r, msg = self._check_types(
+ cur_show,
+ [('date', string_types), ('episode', integer_types), ('indexer', integer_types),
+ ('indexerid', integer_types), ('provider', string_types), ('quality', string_types),
+ ('resource', string_types), ('resource_path', string_types), ('season', integer_types),
+ ('show_name', string_types), ('status', string_types), ('tvdbid', integer_types),
+ ('version', integer_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearHistory], msg))
+
+ def test_shows_stats(self):
+ data = self._request_from_api(webapi.CMD_SickGearShowsStats)
+ self._check_success_base_response(data, webapi.CMD_SickGearShowsStats)
+ r, msg = self._check_types(
+ data['data'],
+ [('ep_downloaded', integer_types), ('ep_total', integer_types), ('shows_active', integer_types),
+ ('shows_total', integer_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowsStats], msg))
+
+ def test_show_stats(self):
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowStats,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShowStats)
+ r, msg = self._check_types(
+ data['data'],
+ [(statusStrings.statusStrings[status].lower().replace(" ", "_").replace("(", "").replace(
+ ")", ""), integer_types) for status in statusStrings.statusStrings
+ if status not in SNATCHED_ANY + [UNKNOWN, DOWNLOADED]] + [('downloaded', dict), ('snatched', dict)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowStats], msg))
+ for s in ('downloaded', 'snatched'):
+ r, msg = self._check_types(
+ data['data'][s],
+ [(t.lower().replace(" ", "_").replace("(", "").replace(")", ""), integer_types)
+ for k, t in iteritems(Quality.qualityStrings) if Quality.NONE != k])
+ self.assertTrue(r, msg='Failed command: %s - data dict - %s:%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowStats], s, msg))
+
+ def test_get_root_dirs(self):
+ data = self._request_from_api(webapi.CMD_SickGearGetRootDirs)
+ self._check_success_base_response(data, webapi.CMD_SickGearGetRootDirs, data_type=list)
+ for r_d in data['data']:
+ r, msg = self._check_types(r_d, [('default', integer_types), ('location', string_types),
+ ('valid', integer_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearListIgnoreWords], msg))
+
+ def test_get_schedules(self):
+ data = self._request_from_api(webapi.CMD_SickGearCheckScheduler)
+ self._check_success_base_response(data, webapi.CMD_SickGearCheckScheduler)
+ r, msg = self._check_types(
+ data['data'],
+ [('backlog_is_paused', integer_types), ('backlog_is_running', integer_types),
+ ('last_backlog', string_types), ('next_backlog', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearCheckScheduler], msg))
+
+ def test_do_show_update(self):
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowUpdate,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShowUpdate,
+ message='%s has queued to be updated' % cur_show['name'])
+ # check that duplicate adding fails
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowUpdate,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ r, msg = self._check_types(data, [('data', dict), ('message', string_types), ('result', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowUpdate], msg))
+ self.assertTrue('Unable to update %s.' % cur_show['name'] in data['message'], msg='Wrong failure message')
+
+ def test_do_show_refresh(self):
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowRefresh,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ self._check_success_base_response(data, webapi.CMD_SickGearShowUpdate,
+ message='%s has queued to be refreshed' % cur_show['name'])
+ # check that duplicate adding fails
+ for cur_show in test_shows:
+ data = self._request_from_api(webapi.CMD_SickGearShowRefresh,
+ params={'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']})
+ r, msg = self._check_types(data, [('data', dict), ('message', string_types), ('result', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearShowRefresh], msg))
+ self.assertTrue('Unable to refresh %s.' % cur_show['name'] in data['message'], msg='Wrong failure message')
+
+ def test_pause_show(self):
+ self.reset_show_data = True
+ self.show_save_db = sickbeard.tv.TVShow.save_to_db
+ sickbeard.tv.TVShow.save_to_db = fake_action
+ for set_pause in (None, 0, 1, 0):
+ for cur_show in test_shows:
+ params = {'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']}
+ if None is not set_pause:
+ params.update({'pause': set_pause})
+ data = self._request_from_api(webapi.CMD_SickGearShowPause, params=params)
+ self._check_success_base_response(data, webapi.CMD_SickGearShowPause,
+ message='%s has been %spaused' % (
+ cur_show['name'], ('', 'un')[set_pause in (None, 0, False)]))
+
+ def test_set_scene_numbering(self):
+ self.reset_show_data = True
+ self.show_save_db = sickbeard.tv.TVShow.save_to_db
+ sickbeard.tv.TVShow.save_to_db = fake_action
+ for set_scene in (1, 0):
+ for cur_show in test_shows:
+ params = {'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']}
+ if None is not set_scene:
+ params.update({'activate': set_scene})
+ data = self._request_from_api(webapi.CMD_SickGearActivateSceneNumber, params=params)
+ self._check_success_base_response(data, webapi.CMD_SickGearActivateSceneNumber,
+ message='Scene Numbering %sactivated' % (
+ ('', 'de')[set_scene in (None, 0, False)]))
+ r, msg = self._check_types(
+ data['data'],
+ [('indexer', integer_types), ('indexerid', integer_types), ('scenenumbering', bool),
+ ('show_name', string_types)])
+ self.assertTrue(r, msg='Failed command: %s - data dict%s' % (
+ webapi._functionMaper_reversed[webapi.CMD_SickGearActivateSceneNumber], msg))
+ self.assertTrue(data['data']['scenenumbering'] == bool(set_scene))
+ self.assertTrue(data['data']['show_name'] == cur_show['name'])
+
+ def test_set_show_quality(self):
+ self.reset_show_data = True
+ self.show_save_db = sickbeard.tv.TVShow.save_to_db
+ sickbeard.tv.TVShow.save_to_db = fake_action
+ for set_quality in [
+ {'init': [Quality.SDTV],
+ 'upgrade': [],
+ 'upgrade_once': 0},
+ {'init': [Quality.SDTV],
+ 'upgrade': [],
+ 'upgrade_once': 1},
+ {'init': [Quality.SDTV],
+ 'upgrade': [Quality.FULLHDWEBDL, Quality.FULLHDBLURAY],
+ 'upgrade_once': 0},
+ {'init': [Quality.SDTV],
+ 'upgrade': [Quality.FULLHDWEBDL, Quality.FULLHDBLURAY],
+ 'upgrade_once': 1},
+ {'init': [Quality.SDTV, Quality.SDDVD, Quality.HDTV],
+ 'upgrade': [],
+ 'upgrade_once': 0},
+ {'init': [Quality.SDTV, Quality.SDDVD, Quality.HDTV],
+ 'upgrade': [],
+ 'upgrade_once': 1},
+ {'init': [Quality.SDTV, Quality.SDDVD, Quality.HDTV],
+ 'upgrade': [Quality.SDDVD, Quality.HDWEBDL],
+ 'upgrade_once': 0},
+ {'init': [Quality.SDTV, Quality.SDDVD, Quality.HDTV],
+ 'upgrade': [Quality.SDDVD, Quality.HDWEBDL],
+ 'upgrade_once': 1},
+ ]:
+ for cur_show in test_shows:
+ show_obj = sickbeard.helpers.find_show_by_id({cur_show['tvid']: cur_show['prodid']})
+ params = {'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']}
+ for t in ('init', 'upgrade', 'upgrade_once'):
+ if set_quality[t]:
+ params.update({t: set_quality[t]})
+ data = self._request_from_api(webapi.CMD_SickGearShowSetQuality, params=params)
+ self._check_success_base_response(
+ data,
+ webapi.CMD_SickGearShowSetQuality,
+ message='%s quality has been changed to %s' % (
+ cur_show['name'], webapi._get_quality_string(show_obj.quality)))
+ self.assertEqual(show_obj.upgrade_once, int(set_quality['upgrade_once']))
+
+ def test_set_show_scene_numbers(self):
+ self.reset_show_data = True
+ self.show_save_db = sickbeard.tv.TVShow.save_to_db
+ sickbeard.tv.TVShow.save_to_db = fake_action
+ for set_numbers in [
+ {'forSeason': 1,
+ 'forEpisode': 1,
+ 'forAbsolute': None,
+ 'sceneSeason': 1,
+ 'sceneEpisode': 2,
+ 'sceneAbsolute': None,
+ },
+ ]:
+ for cur_show in test_shows:
+ params = {'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid']}
+ set_scene_params = params.copy()
+ set_scene_params.update({'activate': 1})
+ data = self._request_from_api(webapi.CMD_SickGearActivateSceneNumber,
+ params=set_scene_params)
+ self._check_success_base_response(data, webapi.CMD_SickGearActivateSceneNumber,
+ message='Scene Numbering activated')
+ for t in ('forSeason', 'forEpisode', 'forAbsolute', 'sceneSeason', 'sceneEpisode', 'sceneAbsolute'):
+ if set_numbers[t]:
+ params.update({t: set_numbers[t]})
+ data = self._request_from_api(webapi.CMD_SickGearSetSceneNumber, params=params)
+ self._check_success_base_response(data, webapi.CMD_SickGearSetSceneNumber)
+ self._check_types(
+ data['data'],
+ [(t, integer_types) for t in ('forSeason', 'forEpisode', 'sceneSeason', 'sceneEpisode')] +
+ [('success', bool)])
+ for t in ('forSeason', 'forEpisode', 'sceneSeason', 'sceneEpisode'):
+ self.assertEqual(data['data'][t], set_numbers[t])
+
+ def test_set_episode_status(self):
+ self.org_mass_action = db.DBConnection.mass_action
+ db.DBConnection.mass_action = fake_action
+ self.reset_show_data = True
+ failed_msg = 'Failed to set all or some status. Check data.'
+ success_msg = 'All status set successfully.'
+ for cur_quality_str, cur_quality in itertools.chain(iteritems(webapi.quality_map), iteritems({'None': None})):
+ for cur_value, cur_status in iteritems(statusStrings.statusStrings):
+ if (cur_quality and cur_value not in (SNATCHED, DOWNLOADED, ARCHIVED)) or \
+ (None is cur_quality and SNATCHED == cur_value):
+ continue
+ cur_status = cur_status.lower()
+ # print('Testing setting episode status to: %s %s' % (cur_status, cur_quality_str))
+ if cur_value in (UNKNOWN, UNAIRED, SNATCHED_PROPER, SNATCHED_BEST, SUBTITLED):
+ continue
+ for cur_show in test_shows:
+ for season, eps in iteritems(cur_show['episodes']):
+ for ep_nb, cur_ep in iteritems(eps):
+ ep_obj = sickbeard.helpers.find_show_by_id({cur_show['tvid']: cur_show['prodid']}).\
+ get_episode(season, ep_nb)
+ params = {'indexer': cur_show['tvid'], 'indexerid': cur_show['prodid'], 'season': season,
+ 'episode': ep_nb, 'status': cur_status}
+ if cur_quality:
+ params.update({'quality': cur_quality_str})
+ old_status = ep_obj.status
+ status, quality = Quality.splitCompositeStatus(ep_obj.status)
+ expect_fail = UNAIRED == status or (DOWNLOADED == status and not cur_quality)
+ expected_msg = (success_msg, failed_msg)[expect_fail]
+ data = self._request_from_api(webapi.CMD_SickGearEpisodeSetStatus, params=params)
+ r, msg = self._check_types(data,
+ [('result', string_types), ('data', (dict, list)[expect_fail]),
+ ('message', string_types)])
+ self.assertTrue(r, msg=msg)
+ self.assertTrue(data['message'].startswith(expected_msg))
+ # reset status
+ ep_obj.status = old_status
+ # ep_obj.save_to_db()
+
+
+if __name__ == '__main__':
+ print('==================')
+ print('STARTING - WebAPI TESTS')
+ print('==================')
+ print('######################################################################')
+ suite = unittest.TestLoader().loadTestsFromTestCase(WebAPICase)
+ unittest.TextTestRunner(verbosity=2).run(suite)