diff --git a/interfaces/Config/templates/config_sorting.tmpl b/interfaces/Config/templates/config_sorting.tmpl
index 81ceced..868b32b 100644
--- a/interfaces/Config/templates/config_sorting.tmpl
+++ b/interfaces/Config/templates/config_sorting.tmpl
@@ -96,6 +96,26 @@
diff --git a/requirements.txt b/requirements.txt
index cf802a5..1c59616 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,6 +9,7 @@ portend
chardet
notify2
puremagic
+guessit>=3.1.0
# Windows system integration
pywin32>=227; sys_platform == 'win32'
diff --git a/sabnzbd/cfg.py b/sabnzbd/cfg.py
index cc54c57..65d1f44 100644
--- a/sabnzbd/cfg.py
+++ b/sabnzbd/cfg.py
@@ -231,13 +231,12 @@ rating_filter_pause_keywords = OptionStr("misc", "rating_filter_pause_keywords")
##############################################################################
enable_tv_sorting = OptionBool("misc", "enable_tv_sorting", False)
tv_sort_string = OptionStr("misc", "tv_sort_string")
-tv_sort_countries = OptionNumber("misc", "tv_sort_countries", 1)
tv_categories = OptionList("misc", "tv_categories", "")
enable_movie_sorting = OptionBool("misc", "enable_movie_sorting", False)
movie_sort_string = OptionStr("misc", "movie_sort_string")
movie_sort_extra = OptionStr("misc", "movie_sort_extra", "-cd%1", strip=False)
-movie_extra_folders = OptionBool("misc", "movie_extra_folder", False)
+movie_extra_folder = OptionBool("misc", "movie_extra_folder", False)
movie_categories = OptionList("misc", "movie_categories", ["movies"])
enable_date_sorting = OptionBool("misc", "enable_date_sorting", False)
@@ -296,6 +295,7 @@ rss_odd_titles = OptionList("misc", "rss_odd_titles", ["nzbindex.nl/", "nzbindex
req_completion_rate = OptionNumber("misc", "req_completion_rate", 100.2, 100, 200)
selftest_host = OptionStr("misc", "selftest_host", "self-test.sabnzbd.org")
movie_rename_limit = OptionStr("misc", "movie_rename_limit", "100M")
+episode_rename_limit = OptionStr("misc", "episode_rename_limit", "20M")
size_limit = OptionStr("misc", "size_limit", "0")
show_sysload = OptionNumber("misc", "show_sysload", 2, 0, 2)
history_limit = OptionNumber("misc", "history_limit", 10, 0)
diff --git a/sabnzbd/constants.py b/sabnzbd/constants.py
index 3cc515b..c2d62e8 100644
--- a/sabnzbd/constants.py
+++ b/sabnzbd/constants.py
@@ -123,25 +123,10 @@ CHEETAH_DIRECTIVES = {"directiveStartToken": "
IGNORED_FOLDERS = ("@eaDir", ".appleDouble")
-# (MATCHER, [EXTRA, MATCHERS])
-series_match = [
- (compile(r"( [sS]|[\d]+)x(\d+)"), [compile(r"^[-\.]+([sS]|[\d])+x(\d+)"), compile(r"^[-\.](\d+)")]), # 1x01
- (
- compile(r"[Ss](\d+)[\.\-]?[Ee](\d+)"), # S01E01
- [compile(r"^[-\.]+[Ss](\d+)[\.\-]?[Ee](\d+)"), compile(r"^[-\.](\d+)")],
- ),
- (compile(r"[ \-_\.](\d)(\d{2,2})[ \-_\.]"), []), # .101. / _101_ / etc.
- (compile(r"[ \-_\.](\d)(\d{2,2})$"), []), # .101 at end of title
+EXCLUDED_GUESSIT_PROPERTIES = [
+ "part",
]
-date_match = [r"(\d{4})\W(\d{1,2})\W(\d{1,2})", r"(\d{1,2})\W(\d{1,2})\W(\d{4})"] # 2008-10-16 # 10.16.2008
-
-year_match = r"[\W]([1|2]\d{3})([^\w]|$)" # Something '(YYYY)' or '.YYYY.' or ' YYYY '
-
-sample_match = r"((^|[\W_])(sample|proof))" # something-sample or something-proof
-
-resolution_match = r"(^|[\W_])((240|360|480|540|576|720|900|1080|1440|2160|4320)[piP])([\W_]|$)" # 576i, 720p, 1080P
-
class Status:
IDLE = "Idle" # Q: Nothing in the queue
diff --git a/sabnzbd/database.py b/sabnzbd/database.py
index 07c5cd3..18c549f 100644
--- a/sabnzbd/database.py
+++ b/sabnzbd/database.py
@@ -271,8 +271,8 @@ class HistoryDB:
if to_keep > 0:
logging.info("Removing all but last %s completed jobs from history", to_keep)
return self.execute(
- """DELETE FROM history WHERE status = ? AND id NOT IN (
- SELECT id FROM history WHERE status = ? ORDER BY completed DESC LIMIT ?
+ """DELETE FROM history WHERE status = ? AND id NOT IN (
+ SELECT id FROM history WHERE status = ? ORDER BY completed DESC LIMIT ?
)""",
(Status.COMPLETED, Status.COMPLETED, to_keep),
save=True,
@@ -346,9 +346,8 @@ class HistoryDB:
def have_episode(self, series, season, episode):
"""Check whether History contains this series episode"""
total = 0
- series = series.lower().replace(".", " ").replace("_", " ").replace(" ", " ")
if series and season and episode:
- pattern = "%s/%s/%s" % (series, season, episode)
+ pattern = "%s/%s/%s" % (series.lower(), season, episode)
if self.execute(
"""SELECT COUNT(*) FROM History WHERE series = ? AND STATUS != ?""", (pattern, Status.FAILED)
):
@@ -477,7 +476,7 @@ def build_history_info(nzo, workdir_complete="", postproc_time=0, script_output=
# Analyze series info only when job is finished
series = ""
if series_info:
- seriesname, season, episode, _ = sabnzbd.newsunpack.analyse_show(nzo.final_name)
+ seriesname, season, episode = sabnzbd.newsunpack.analyse_show(nzo.final_name)[:3]
if seriesname and season and episode:
series = "%s/%s/%s" % (seriesname.lower(), season, episode)
diff --git a/sabnzbd/deobfuscate_filenames.py b/sabnzbd/deobfuscate_filenames.py
old mode 100755
new mode 100644
diff --git a/sabnzbd/interface.py b/sabnzbd/interface.py
index 2b86885..f7a8c88 100644
--- a/sabnzbd/interface.py
+++ b/sabnzbd/interface.py
@@ -34,6 +34,7 @@ from random import randint
from xml.sax.saxutils import escape
from Cheetah.Template import Template
from typing import Optional, Callable, Union
+from guessit.api import properties as guessit_properties
import sabnzbd
import sabnzbd.rss
@@ -71,7 +72,7 @@ from sabnzbd.utils.diskspeed import diskspeedmeasure
from sabnzbd.utils.getperformance import getpystone
from sabnzbd.utils.internetspeed import internetspeed
import sabnzbd.utils.ssdp
-from sabnzbd.constants import DEF_STDCONFIG, DEFAULT_PRIORITY, CHEETAH_DIRECTIVES
+from sabnzbd.constants import DEF_STDCONFIG, DEFAULT_PRIORITY, CHEETAH_DIRECTIVES, EXCLUDED_GUESSIT_PROPERTIES
from sabnzbd.lang import list_languages
from sabnzbd.api import (
list_scripts,
@@ -924,6 +925,7 @@ SPECIAL_VALUE_LIST = (
"downloader_sleep_time",
"size_limit",
"movie_rename_limit",
+ "episode_rename_limit",
"nomedia_marker",
"max_url_retries",
"req_completion_rate",
@@ -1897,6 +1899,9 @@ class ConfigSorting:
for kw in SORT_LIST:
conf[kw] = config.get_config("misc", kw)()
conf["categories"] = list_cats(False)
+ conf["guessit_properties"] = tuple(
+ prop for prop in guessit_properties().keys() if prop not in EXCLUDED_GUESSIT_PROPERTIES
+ )
template = Template(
file=os.path.join(sabnzbd.WEB_DIR_CONFIG, "config_sorting.tmpl"),
diff --git a/sabnzbd/newsunpack.py b/sabnzbd/newsunpack.py
index 7162eec..437a26e 100644
--- a/sabnzbd/newsunpack.py
+++ b/sabnzbd/newsunpack.py
@@ -28,6 +28,7 @@ import time
import zlib
import shutil
import functools
+from typing import Tuple
import sabnzbd
from sabnzbd.encoding import platform_btou, correct_unknown_encoding, ubtou
@@ -2297,16 +2298,18 @@ def crc_calculate(path):
return b"%08x" % (crc & 0xFFFFFFFF)
-def analyse_show(name):
+def analyse_show(name: str) -> Tuple[str, str, str, str, bool]:
"""Do a quick SeasonSort check and return basic facts"""
- job = SeriesSorter(None, name, None, None)
- job.match(force=True)
- if job.is_match():
+ job = SeriesSorter(None, name, None, None, force=True)
+ if job.matched:
job.get_values()
- info = job.show_info
- show_name = info.get("show_name", "").replace(".", " ").replace("_", " ")
- show_name = show_name.replace(" ", " ")
- return show_name, info.get("season_num", ""), info.get("episode_num", ""), info.get("ep_name", "")
+ return (
+ job.info.get("title", ""),
+ job.info.get("season_num", ""),
+ job.info.get("episode_num", ""),
+ job.info.get("ep_name", ""),
+ job.is_proper(),
+ )
def pre_queue(nzo: NzbObject, pp, cat):
@@ -2334,7 +2337,7 @@ def pre_queue(nzo: NzbObject, pp, cat):
str(nzo.bytes),
" ".join(nzo.groups),
]
- command.extend(analyse_show(nzo.final_name_with_password))
+ command.extend(analyse_show(nzo.final_name_with_password)[:4])
command = [fix(arg) for arg in command]
# Fields not in the NZO directly
diff --git a/sabnzbd/nzbstuff.py b/sabnzbd/nzbstuff.py
index 4d748f4..4dc494b 100644
--- a/sabnzbd/nzbstuff.py
+++ b/sabnzbd/nzbstuff.py
@@ -94,7 +94,6 @@ RE_SUBJECT_FILENAME_QUOTES = re.compile(r'"([^"]*)"')
# Otherwise something that looks like a filename
RE_SUBJECT_BASIC_FILENAME = re.compile(r"([\w\-+()'\s.,]+\.[A-Za-z0-9]{2,4})[^A-Za-z0-9]")
RE_RAR = re.compile(r"(\.rar|\.r\d\d|\.s\d\d|\.t\d\d|\.u\d\d|\.v\d\d)$", re.I)
-RE_PROPER = re.compile(r"(^|[\. _-])(PROPER|REAL|REPACK)([\. _-]|$)")
##############################################################################
@@ -1973,39 +1972,38 @@ class NzbObject(TryList):
no_series_dupes = cfg.no_series_dupes()
series_propercheck = cfg.series_propercheck()
- # abort logic if dupe check is off for both nzb+series
+ # Abort if dupe check is off for both nzb and series
if not no_dupes and not no_series_dupes:
return False, False
series = False
res = False
- history_db = HistoryDB()
- # dupe check off nzb contents
- if no_dupes:
- res = history_db.have_name_or_md5sum(self.final_name, self.md5sum)
- logging.debug(
- "Dupe checking NZB in history: filename=%s, md5sum=%s, result=%s", self.filename, self.md5sum, res
- )
- if not res and cfg.backup_for_duplicates():
- res = sabnzbd.backup_exists(self.filename)
- logging.debug("Dupe checking NZB against backup: filename=%s, result=%s", self.filename, res)
- # dupe check off nzb filename
- if not res and no_series_dupes:
- series, season, episode, misc = sabnzbd.newsunpack.analyse_show(self.final_name)
- if RE_PROPER.match(misc) and series_propercheck:
- logging.debug("Dupe checking series+season+ep in history aborted due to PROPER/REAL/REPACK found")
- else:
- res = history_db.have_episode(series, season, episode)
+ with HistoryDB() as history_db:
+ # Dupe check off nzb contents
+ if no_dupes:
+ res = history_db.have_name_or_md5sum(self.final_name, self.md5sum)
logging.debug(
- "Dupe checking series+season+ep in history: series=%s, season=%s, episode=%s, result=%s",
- series,
- season,
- episode,
- res,
+ "Dupe checking NZB in history: filename=%s, md5sum=%s, result=%s", self.filename, self.md5sum, res
)
+ if not res and cfg.backup_for_duplicates():
+ res = sabnzbd.backup_exists(self.filename)
+ logging.debug("Dupe checking NZB against backup: filename=%s, result=%s", self.filename, res)
+ # Dupe check off nzb filename
+ if not res and no_series_dupes:
+ series, season, episode, _, is_proper = sabnzbd.newsunpack.analyse_show(self.final_name)
+ if is_proper and series_propercheck:
+ logging.debug("Dupe checking series+season+ep in history aborted due to PROPER/REAL/REPACK found")
+ else:
+ res = history_db.have_episode(series, season, episode)
+ logging.debug(
+ "Dupe checking series+season+ep in history: series=%s, season=%s, episode=%s, result=%s",
+ series,
+ season,
+ episode,
+ res,
+ )
- history_db.close()
return res, series
def is_gone(self):
diff --git a/sabnzbd/postproc.py b/sabnzbd/postproc.py
index 0f63737..bfc5dca 100644
--- a/sabnzbd/postproc.py
+++ b/sabnzbd/postproc.py
@@ -65,13 +65,12 @@ from sabnzbd.filesystem import (
get_filename,
)
from sabnzbd.nzbstuff import NzbObject
-from sabnzbd.sorting import Sorter
+from sabnzbd.sorting import Sorter, is_sample, move_to_parent_directory
from sabnzbd.constants import (
REPAIR_PRIORITY,
FORCE_PRIORITY,
POSTPROC_QUEUE_FILE_NAME,
POSTPROC_QUEUE_VERSION,
- sample_match,
JOB_ADMIN,
Status,
VERIFIED_FILE,
@@ -93,9 +92,6 @@ import sabnzbd.deobfuscate_filenames as deobfuscate
MAX_FAST_JOB_COUNT = 3
-# Match samples
-RE_SAMPLE = re.compile(sample_match, re.I)
-
class PostProcessor(Thread):
"""PostProcessor thread, designed as Singleton"""
@@ -515,13 +511,12 @@ def process_job(nzo: NzbObject):
# TV/Movie/Date Renaming code part 2 - rename and move files to parent folder
if all_ok and file_sorter.sort_file:
if newfiles:
- file_sorter.rename(newfiles, workdir_complete)
- workdir_complete, ok = file_sorter.move(workdir_complete)
- else:
- workdir_complete, ok = file_sorter.rename_with_ext(workdir_complete)
- if not ok:
- nzo.set_unpack_info("Unpack", T("Failed to move files"))
- all_ok = False
+ workdir_complete, ok = file_sorter.rename(newfiles, workdir_complete)
+ if not ok:
+ workdir_complete, ok = move_to_parent_directory(workdir_complete)
+ if not ok:
+ nzo.set_unpack_info("Unpack", T("Failed to move files"))
+ all_ok = False
if cfg.deobfuscate_final_filenames() and all_ok and not nzb_list:
# Deobfuscate the filenames
@@ -749,7 +744,7 @@ def parring(nzo: NzbObject, workdir: str):
# Need to make a copy because it can change during iteration
single = len(nzo.extrapars) == 1
for setname in list(nzo.extrapars):
- if cfg.ignore_samples() and RE_SAMPLE.search(setname.lower()):
+ if cfg.ignore_samples() and is_sample(setname.lower()):
continue
# Skip sets that were already tried
if not verified.get(setname, False):
@@ -1156,7 +1151,7 @@ def remove_samples(path):
for root, _dirs, files in os.walk(path):
for file_to_match in files:
nr_files += 1
- if RE_SAMPLE.search(file_to_match):
+ if is_sample(file_to_match):
files_to_delete.append(os.path.join(root, file_to_match))
# Make sure we skip false-positives
diff --git a/sabnzbd/skintext.py b/sabnzbd/skintext.py
index 964d9ca..3f7c640 100644
--- a/sabnzbd/skintext.py
+++ b/sabnzbd/skintext.py
@@ -827,6 +827,11 @@ SKIN_TEXT = {
"button-DailyF": TT("Daily Folders"),
"case-adjusted": TT("case-adjusted"), #: Note for title expression in Sorting that does case adjustment
"sortResult": TT("Processed Result"),
+ "sort-guessitMeaning": TT("Any property"),
+ "sort-guessitProperty": TT("property"),
+ "guessit-sp-property": TT("GuessIt Property"),
+ "guessit-dot-property": TT("GuessIt.Property"),
+ "guessit-us-property": TT("GuessIt_Property"),
# Config->Special
"explain-special": TT(
"Rarely used options. For their meaning and explanation, click on the Help button to go to the Wiki page.
"
diff --git a/sabnzbd/sorting.py b/sabnzbd/sorting.py
index ee28e95..addf0c4 100644
--- a/sabnzbd/sorting.py
+++ b/sabnzbd/sorting.py
@@ -17,33 +17,31 @@
"""
sabnzbd.sorting - Sorting Functions
-Series Sorting - Sorting downloads into seasons & episodes
-Date Sorting - Sorting downloads by a custom date matching
-Generic Sorting - Sorting large files by a custom matching
"""
import os
import logging
import re
-from typing import Optional
+import guessit
+from rebulk.match import MatchesDict
+from string import whitespace, ascii_lowercase, punctuation
+from typing import Optional, Union, List, Tuple, Dict
import sabnzbd
from sabnzbd.filesystem import (
move_to_path,
cleanup_empty_directories,
- get_unique_path,
get_unique_filename,
get_ext,
renamer,
- sanitize_and_trim_path,
sanitize_foldername,
clip_path,
)
-from sabnzbd.constants import series_match, date_match, year_match, sample_match, resolution_match
import sabnzbd.cfg as cfg
-from sabnzbd.nzbstuff import NzbObject
+from sabnzbd.constants import EXCLUDED_GUESSIT_PROPERTIES
+from sabnzbd.nzbstuff import NzbObject, scan_password
+
-RE_SAMPLE = re.compile(sample_match, re.I)
# Do not rename .vob files as they are usually DVD's
EXCLUDED_FILE_EXTS = (".vob", ".bin")
@@ -52,388 +50,192 @@ UPPERCASE = ("III", "II", "IV")
REPLACE_AFTER = {"()": "", "..": ".", "__": "_", " ": " ", " .%ext": ".%ext"}
-# Title() function messes up country names, so need to replace them instead
-COUNTRY_REP = (
- "(US)",
- "(UK)",
- "(EU)",
- "(CA)",
- "(YU)",
- "(VE)",
- "(TR)",
- "(CH)",
- "(SE)",
- "(ES)",
- "(KR)",
- "(ZA)",
- "(SK)",
- "(SG)",
- "(RU)",
- "(RO)",
- "(PR)",
- "(PT)",
- "(PL)",
- "(PH)",
- "(PK)",
- "(NO)",
- "(NG)",
- "(NZ)",
- "(NL)",
- "(MX)",
- "(MY)",
- "(MK)",
- "(KZ)",
- "(JP)",
- "(JM)",
- "(IT)",
- "(IL)",
- "(IE)",
- "(IN)",
- "(IS)",
- "(HU)",
- "(HK)",
- "(HN)",
- "(GR)",
- "(GH)",
- "(DE)",
- "(FR)",
- "(FI)",
- "(DK)",
- "(CZ)",
- "(HR)",
- "(CR)",
- "(CO)",
- "(CN)",
- "(CL)",
- "(BG)",
- "(BR)",
- "(BE)",
- "(AT)",
- "(AU)",
- "(AW)",
- "(AR)",
- "(AL)",
- "(AF)",
-)
-
-
-def ends_in_file(path):
- """Return True when path ends with '.%ext' or '%fn'"""
- _RE_ENDEXT = re.compile(r"\.%ext[{}]*$", re.I)
- _RE_ENDFN = re.compile(r"%fn[{}]*$", re.I)
- return bool(_RE_ENDEXT.search(path) or _RE_ENDFN.search(path))
-
-
-def move_to_parent_folder(workdir):
- """Move all in 'workdir' into 'workdir/..'"""
- # Determine 'folder'/..
- workdir = os.path.abspath(os.path.normpath(workdir))
- dest = os.path.abspath(os.path.normpath(os.path.join(workdir, "..")))
-
- # Check for DVD folders and stop if found
- for item in os.listdir(workdir):
- if item.lower() in ("video_ts", "audio_ts", "bdmv"):
- return workdir, True
-
- for root, dirs, files in os.walk(workdir):
- for _file in files:
- path = os.path.join(root, _file)
- new_path = path.replace(workdir, dest)
- ok, new_path = move_to_path(path, new_path)
- if not ok:
- return dest, False
-
- cleanup_empty_directories(workdir)
- return dest, True
-
-
-class Sorter:
- """Generic Sorter class"""
+RE_GI = re.compile(r"(%G([._]?)I<([\w]+)>)") # %GI
, %G.I, or %G_I
- def __init__(self, nzo: Optional[NzbObject], cat):
- self.sorter = None
- self.type = None
- self.sort_file = False
- self.nzo = nzo
- self.cat = cat
- self.ext = ""
-
- def detect(self, job_name, complete_dir):
- """Detect which kind of sort applies"""
- self.sorter = SeriesSorter(self.nzo, job_name, complete_dir, self.cat)
- if self.sorter.matched:
- complete_dir = self.sorter.get_final_path()
- self.type = "tv"
- self.sort_file = True
- return complete_dir
+# Prevent guessit/rebulk from spamming the log when debug logging is active in SABnzbd
+logging.getLogger("rebulk").setLevel(logging.WARNING)
- self.sorter = DateSorter(self.nzo, job_name, complete_dir, self.cat)
- if self.sorter.matched:
- complete_dir = self.sorter.get_final_path()
- self.type = "date"
- self.sort_file = True
- return complete_dir
- self.sorter = MovieSorter(self.nzo, job_name, complete_dir, self.cat)
- if self.sorter.matched:
- complete_dir = self.sorter.get_final_path()
- self.type = "movie"
- self.sort_file = True
- return complete_dir
-
- self.sort_file = False
- return complete_dir
-
- def rename(self, newfiles, workdir_complete):
- """Rename files of the job"""
- if self.sorter.rename_or_not:
- self.sorter.rename(newfiles, workdir_complete)
-
- def rename_with_ext(self, workdir_complete):
- """Special renamer for %ext"""
- if self.sorter.rename_or_not and "%ext" in workdir_complete and self.ext:
- # Replace %ext with extension
- newpath = workdir_complete.replace("%ext", self.ext)
- try:
- renamer(workdir_complete, newpath)
- except:
- return newpath, False
- return newpath, True
- else:
- return workdir_complete, True
-
- def move(self, workdir_complete):
- ok = True
- if self.type == "movie":
- move_to_parent = True
- # check if we should leave the files inside an extra folder
- if cfg.movie_extra_folders():
- # if there is a folder in the download, leave it in an extra folder
- move_to_parent = not check_for_folder(workdir_complete)
- if move_to_parent:
- workdir_complete, ok = move_to_parent_folder(workdir_complete)
- else:
- workdir_complete, ok = move_to_parent_folder(workdir_complete)
- if not ok:
- return workdir_complete, False
-
- path, part = os.path.split(workdir_complete)
- if "%fn" in part and self.sorter.fname:
- old = workdir_complete
- workdir_complete = os.path.join(path, part.replace("%fn", self.sorter.fname))
- workdir_complete = get_unique_path(workdir_complete, create_dir=False)
- try:
- renamer(old, workdir_complete)
- except:
- logging.error(T("Cannot create directory %s"), clip_path(workdir_complete))
- workdir_complete = old
- ok = False
- return workdir_complete, ok
-
-
-class SeriesSorter:
- """Methods for Series Sorting"""
+class BaseSorter:
+ """Common methods for Sorter classes"""
- def __init__(self, nzo: Optional[NzbObject], job_name, path, cat):
+ def __init__(
+ self,
+ nzo: Optional[NzbObject],
+ job_name: str,
+ path: str,
+ cat: str,
+ sort_string: str,
+ cats: str,
+ guess: Optional[MatchesDict],
+ force: Optional[bool] = False,
+ ) -> None:
self.matched = False
-
self.original_job_name = job_name
self.original_path = path
self.nzo = nzo
self.cat = cat
- self.sort_string = cfg.tv_sort_string()
- self.cats = cfg.tv_categories()
self.filename_set = ""
self.fname = "" # Value for %fn substitution in folders
- self.final_path = ""
-
- self.match_obj = None
- self.extras = None
-
- self.rename_or_not = False
-
- self.show_info = {}
+ self.do_rename = False
+ self.info = {}
+ self.type = None
+ self.guess = guess
+ self.force = force
+ self.sort_string = sort_string
+ self.cats = cats
- # Check if it is a TV show on init()
+ # Check categories and do the guessing work, if necessary
self.match()
- def match(self, force=False):
- """Checks the regex for a match, if so set self.match to true"""
- if force or (cfg.enable_tv_sorting() and cfg.tv_sort_string()):
- if (
- force
- or (not self.cats)
- or (self.cat and self.cat.lower() in self.cats)
- or (not self.cat and "None" in self.cats)
- ):
- # First check if the show matches TV episode regular expressions. Returns regex match object
- self.match_obj, self.extras = check_regexs(self.original_job_name, series_match)
- if self.match_obj:
- logging.debug("Found TV Show (%s)", self.original_job_name)
- self.matched = True
-
- def is_match(self):
- """Returns whether there was a match or not"""
- return self.matched
-
- def get_final_path(self):
- """Collect and construct all the variables such as episode name, show names"""
- if self.get_values():
- # Get the final path
- path = self.construct_path()
- self.final_path = os.path.join(self.original_path, path)
- return self.final_path
+ def get_final_path(self) -> str:
+ if self.matched:
+ # Construct the final path
+ self.get_values()
+ return os.path.join(self.original_path, self.construct_path())
else:
# Error Sorting
return os.path.join(self.original_path, self.original_job_name)
- @staticmethod
- def get_multi_ep_naming(one, two, extras):
- """Returns a list of unique values joined into a string and separated by - (ex:01-02-03-04)"""
- extra_list = [one]
- extra2_list = [two]
- for extra in extras:
- if extra not in (extra_list, extra2_list):
- ep_no2 = extra.rjust(2, "0")
- extra_list.append(extra)
- extra2_list.append(ep_no2)
-
- one = "-".join(extra_list)
- two = "-".join(extra2_list)
- return one, two
-
- def get_shownames(self):
- """Get the show name from the match object and format it"""
+ def get_names(self) -> None:
+ """Get the show or movie name from the guess and format it"""
# Get the formatted title and alternate title formats
- self.show_info["show_tname"], self.show_info["show_tname_two"], self.show_info["show_tname_three"] = get_titles(
- self.nzo, self.match_obj, self.original_job_name, True
+ self.info["ttitle"], self.info["ttitle_two"], self.info["ttitle_three"] = get_titles(
+ self.nzo, self.guess, self.original_job_name, True
)
- self.show_info["show_name"], self.show_info["show_name_two"], self.show_info["show_name_three"] = get_titles(
- self.nzo, self.match_obj, self.original_job_name
+ self.info["title"], self.info["title_two"], self.info["title_three"] = get_titles(
+ self.nzo, self.guess, self.original_job_name
)
- def get_seasons(self):
- """Get the season number from the match object and format it"""
- try:
- season = self.match_obj.group(1).strip("_") # season number
- except AttributeError:
- season = "1"
+ def get_resolution(self) -> None:
+ self.info["resolution"] = self.guess.get("screen_size", "")
- # Provide alternative formatting (0 padding)
- if season.lower() == "s":
- season2 = season
- else:
- try:
- season = str(int(season))
- except:
- pass
- season2 = season.rjust(2, "0")
-
- self.show_info["season_num"] = season
- self.show_info["season_num_alt"] = season2
-
- def get_episodes(self):
- """Get the episode numbers from the match object, format and join them"""
- try:
- ep_no = self.match_obj.group(2) # episode number
- except AttributeError:
- ep_no = "1"
- # Store the original episode number
-
- # Provide alternative formatting (0 padding)
- ep_no2 = ep_no.rjust(2, "0")
- try:
- ep_no = str(int(ep_no))
- except:
- pass
-
- # Dual episode support
- if self.extras:
- ep_no, ep_no2 = self.get_multi_ep_naming(ep_no, ep_no2, self.extras)
-
- self.show_info["episode_num"] = ep_no
- self.show_info["episode_num_alt"] = ep_no2
-
- def get_showdescriptions(self):
- """Get the show descriptions from the match object and format them"""
- self.show_info["ep_name"], self.show_info["ep_name_two"], self.show_info["ep_name_three"] = get_descriptions(
- self.nzo, self.match_obj, self.original_job_name
+ def get_showdescriptions(self) -> None:
+ """Get the show descriptions based on metadata, guessit and jobname"""
+ self.info["ep_name"], self.info["ep_name_two"], self.info["ep_name_three"] = get_descriptions(
+ self.nzo, self.guess, self.original_job_name
)
- def get_show_resolution(self):
- self.show_info["resolution"] = get_resolution(self.original_job_name)
-
- def get_values(self):
- """Collect and construct all the values needed for path replacement"""
- try:
- # - Show Name
- self.get_shownames()
-
- # - Season
- self.get_seasons()
-
- # - Episode Number
- self.get_episodes()
-
- # - Episode Name
- self.get_showdescriptions()
-
- # - Resolution
- self.get_show_resolution()
-
- return True
-
- except:
- logging.error(T("Error getting TV info (%s)"), clip_path(self.original_job_name))
- logging.info("Traceback: ", exc_info=True)
- return False
+ def get_year(self) -> None:
+ """Get the year and the corresponding two and four digit decade values"""
+ year = ""
+ if self.nzo:
+ year = self.nzo.nzo_info.get("year")
+ if not year:
+ year = self.guess.get("year", "")
+ if not year:
+ # Try extracting the year from the guessed date instead
+ try:
+ year = self.guess.get("date").year or ""
+ except:
+ pass
+ self.info["year"] = str(year)
+ self.info["decade"] = ""
+ self.info["decade_two"] = ""
+ if self.info["year"]:
+ try:
+ self.info["decade"] = self.info["year"][2:3] + "0"
+ self.info["decade_two"] = self.info["year"][:3] + "0"
+ except TypeError:
+ pass
- def construct_path(self):
- """Replaces the sort string with real values such as Show Name and Episode Number"""
+ def is_proper(self):
+ """Determine if the release is tagged 'Proper'. Note that guessit also sets this for similar
+ tags such as 'Real' and 'Repack', saving us the trouble of checking for additional keywords."""
+ other = self.guess.get("other", "")
+ if isinstance(other, list):
+ return "Proper" in other
+ else:
+ return other == "Proper"
- sorter = self.sort_string.replace("\\", "/")
+ def construct_path(self) -> str:
+ """Map all markers and replace the sort string with real values"""
+ sorter = self.sort_string
mapping = []
if ends_in_file(sorter):
extension = True
- sorter = sorter.replace(".%ext", "")
+ if sorter.endswith(".%ext"):
+ sorter = sorter[:-5] # Strip '.%ext' off the end; other %ext may remain in sorter
else:
extension = False
- # Replace Show name
- mapping.append(("%sn", self.show_info["show_tname"]))
- mapping.append(("%s.n", self.show_info["show_tname_two"]))
- mapping.append(("%s_n", self.show_info["show_tname_three"]))
- mapping.append(("%sN", self.show_info["show_name"]))
- mapping.append(("%s.N", self.show_info["show_name_two"]))
- mapping.append(("%s_N", self.show_info["show_name_three"]))
-
- # Replace season number
- mapping.append(("%s", self.show_info["season_num"]))
- mapping.append(("%0s", self.show_info["season_num_alt"]))
-
- # Original dir name
+ # Title
+ mapping.append(("%title", self.info["title"]))
+ mapping.append(("%.title", self.info["title_two"]))
+ mapping.append(("%_title", self.info["title_three"]))
+ # Legacy markers for the same; note that %t must come after %title
+ mapping.append(("%t", self.info["title"]))
+ mapping.append(("%.t", self.info["title_two"]))
+ mapping.append(("%_t", self.info["title_three"]))
+ mapping.append(("%sN", self.info["title"]))
+ mapping.append(("%s.N", self.info["title_two"]))
+ mapping.append(("%s_N", self.info["title_three"]))
+
+ # Titlecased title
+ mapping.append(("%sn", self.info["ttitle"]))
+ mapping.append(("%s.n", self.info["ttitle_two"]))
+ mapping.append(("%s_n", self.info["ttitle_three"]))
+
+ # Original directory name
mapping.append(("%dn", self.original_job_name))
- # Replace episode names
- if self.show_info["ep_name"]:
- mapping.append(("%en", self.show_info["ep_name"]))
- mapping.append(("%e.n", self.show_info["ep_name_two"]))
- mapping.append(("%e_n", self.show_info["ep_name_three"]))
- else:
- mapping.append(("%en", ""))
- mapping.append(("%e.n", ""))
- mapping.append(("%e_n", ""))
-
- # Replace episode number
- mapping.append(("%e", self.show_info["episode_num"]))
- mapping.append(("%0e", self.show_info["episode_num_alt"]))
-
- # Replace resolution
- mapping.append(("%r", self.show_info["resolution"]))
-
- # Make sure unsupported %desc is removed
- mapping.append(("%desc", ""))
+ # Resolution
+ mapping.append(("%r", self.info["resolution"]))
+
+ # Year
+ mapping.append(("%year", self.info["year"]))
+ mapping.append(("%y", self.info["year"]))
+
+ # Decades
+ mapping.append(("%decade", self.info["decade"]))
+ mapping.append(("%0decade", self.info["decade_two"]))
+
+ # Handle some type-specific mappings
+ if self.type in ("tv", "date"):
+ # Episode name
+ mapping.append(("%en", self.info["ep_name"]))
+ mapping.append(("%e.n", self.info["ep_name_two"]))
+ mapping.append(("%e_n", self.info["ep_name_three"]))
+
+ # Legacy %desc
+ if self.type == "date" and self.info.get("ep_name"):
+ # For date, %desc was no longer listed but still supported in the backend. For tv,
+ # it was invalid and %en (etc.) used instead. For backward compatibility, map %desc
+ # to %en for 'date' only and remove for 'tv'.
+ mapping.append(("%desc", self.info["ep_name"]))
+ else:
+ mapping.append(("%desc", ""))
+
+ if self.type == "tv":
+ # Season number
+ mapping.append(("%s", self.info["season_num"]))
+ mapping.append(("%0s", self.info["season_num_alt"]))
+
+ # Episode number; note this must come after the %en variants
+ mapping.append(("%e", self.info["episode_num"]))
+ mapping.append(("%0e", self.info["episode_num_alt"]))
+
+ if self.type == "date":
+ # Month
+ mapping.append(("%m", self.info["month"]))
+ mapping.append(("%0m", self.info["month_two"]))
+
+ # Day
+ mapping.append(("%d", self.info["day"]))
+ mapping.append(("%0d", self.info["day_two"]))
+
+ # Handle generic guessit markers
+ for marker, spacer, guess_property in re.findall(RE_GI, sorter):
+ value = self.guess.get(guess_property, "") if self.guess else ""
+ # Guessit returns a list for some properties in case they have multiple entries/values
+ if isinstance(value, list):
+ value = "-".join([str(v) for v in value]) # Format as value1-value2
+ else:
+ value = str(value)
+ if spacer:
+ value = value.replace(" ", spacer)
+ mapping.append((marker, value))
# Replace elements
path = path_subst(sorter, mapping)
@@ -444,350 +246,233 @@ class SeriesSorter:
# Lowercase all characters wrapped in {}
path = to_lowercase(path)
- # Strip any extra ' ' '.' or '_' around foldernames
- path = strip_folders(path)
+ # Strip any extra spaces, dots, and underscores around directory names
+ path = strip_path_elements(path)
# Split the last part of the path up for the renamer
if extension:
- head, tail = os.path.split(path)
- self.filename_set = tail
- self.rename_or_not = True
- else:
- head = path
+ path, self.filename_set = os.path.split(path)
+ self.do_rename = True
- if head:
- return os.path.normpath(head)
- else:
- # The normpath function translates "" to "."
- # which results in wrong path.join later on
- return head
+ # The normpath function translates "" to "." which results in an incorrect path
+ return os.path.normpath(path) if path else path
- def rename(self, files, current_path):
- """Rename for Series"""
- logging.debug("Renaming Series")
+ def rename(self, files: List[str], current_path: str, min_size: int) -> Tuple[str, bool]:
largest = (None, None, 0)
- def to_filepath(f, current_path):
- if is_full_path(f):
- filepath = os.path.normpath(f)
+ def to_filepath(file, current_path):
+ if is_full_path(file):
+ filepath = os.path.normpath(file)
else:
- filepath = os.path.normpath(os.path.join(current_path, f))
+ filepath = os.path.normpath(os.path.join(current_path, file))
return filepath
- # Create a generator of filepaths, ignore sample files and excluded files (vobs ect)
+ # Create a generator of filepaths, ignore samples and excluded files
filepaths = (
(file, to_filepath(file, current_path))
for file in files
- if not RE_SAMPLE.search(file) and get_ext(file) not in EXCLUDED_FILE_EXTS
+ if not is_sample(file) and get_ext(file) not in EXCLUDED_FILE_EXTS
)
- # Find the largest existing file
+ # Find the largest file
for file, fp in filepaths:
- # If for some reason the file no longer exists, skip
+ # Skip any file that no longer exists (e.g. extension on the cleanup list)
if not os.path.exists(fp):
continue
-
size = os.stat(fp).st_size
f_file, f_fp, f_size = largest
if size > f_size:
largest = (file, fp, size)
file, filepath, size = largest
- # >20MB
- if filepath and size > 20971520:
- self.fname, self.ext = os.path.splitext(os.path.split(file)[1])
- newname = "%s%s" % (self.filename_set, self.ext)
- # Replace %fn with the original filename
- newname = newname.replace("%fn", self.fname)
- newpath = os.path.join(current_path, newname)
- # Replace %ext with extension
- newpath = newpath.replace("%ext", self.ext)
- newpath = sanitize_and_trim_path(newpath)
- try:
- logging.debug("Rename: %s to %s", filepath, newpath)
- renamer(filepath, newpath)
- except:
- logging.error(T("Failed to rename: %s to %s"), clip_path(current_path), clip_path(newpath))
- logging.info("Traceback: ", exc_info=True)
- rename_similar(current_path, self.ext, self.filename_set, ())
- else:
- logging.debug("Nothing to rename, %s", files)
-
-
-_RE_MULTIPLE = (
- re.compile(r"cd\W?(\d+)\W?", re.I), # .cd1.mkv
- re.compile(r"\w\W?([\w\d])[{}]*$", re.I), # blah1.mkv blaha.mkv
- re.compile(r"\w\W([\w\d])\W", re.I), # blah-1-ok.mkv blah-a-ok.mkv
-)
-
-
-def check_for_multiple(files):
- """Return list of files that looks like a multi-part post"""
- for regex in _RE_MULTIPLE:
- matched_files = check_for_sequence(regex, files)
- if matched_files:
- return matched_files
- return ""
-
-
-def check_for_sequence(regex, files):
- """Return list of files that looks like a sequence, using 'regex'"""
- matches = {}
- prefix = None
- # Build up a dictionary of matches
- # The key is based off the match, ie {1:'blah-part1.mkv'}
- for _file in files:
- name, ext = os.path.splitext(_file)
- match1 = regex.search(name)
- if match1:
- if not prefix or prefix == name[: match1.start()]:
- matches[match1.group(1)] = name + ext
- prefix = name[: match1.start()]
-
- # Don't do anything if only one or no files matched
- if len(list(matches)) < 2:
- return {}
-
- key_prev = 0
- passed = True
- alphabet = "abcdefghijklmnopqrstuvwxyz"
- # Check the dictionary to see if the keys are in a numeric or alphabetic sequence
- for akey in sorted(matches):
- if akey.isdigit():
- key = int(akey)
- elif akey in alphabet:
- key = alphabet.find(akey) + 1
+ if filepath and size > min_size:
+ self.fname, ext = os.path.splitext(os.path.split(file)[1])
+ newpath = os.path.join(
+ current_path, self.filename_set.replace("%fn", self.fname).replace("%ext", ext.lstrip(".")) + ext
+ )
+ if not os.path.exists(newpath):
+ try:
+ logging.debug("Rename: %s to %s", filepath, newpath)
+ renamer(filepath, newpath)
+ except:
+ logging.error(T("Failed to rename: %s to %s"), clip_path(current_path), clip_path(newpath))
+ logging.info("Traceback: ", exc_info=True)
+ rename_similar(current_path, ext, self.filename_set, ())
else:
- passed = False
-
- if passed:
- if not key_prev:
- key_prev = key
- else:
- if key_prev + 1 == key:
- key_prev = key
- else:
- passed = False
- if passed:
- # convert {'b':'filename-b.mkv'} to {'2', 'filename-b.mkv'}
- item = matches.pop(akey)
- matches[str(key)] = item
-
- if passed:
- return matches
- else:
- return {}
+ logging.debug("Nothing to rename, %s", files)
+ return move_to_parent_directory(current_path)
-class MovieSorter:
- """Methods for Generic Sorting"""
- def __init__(self, nzo: Optional[NzbObject], job_name, path, cat):
- self.matched = False
+class Sorter:
+ """Generic Sorter"""
- self.original_job_name = job_name
- self.original_path = path
- self.sort_string = cfg.movie_sort_string()
- self.extra = cfg.movie_sort_extra()
- self.cats = cfg.movie_categories()
- self.cat = cat
+ def __init__(self, nzo: Optional[NzbObject], cat: str) -> None:
+ self.sorter = None
+ self.sort_file = False
self.nzo = nzo
- self.filename_set = ""
- self.fname = "" # Value for %fn substitution in folders
- self.final_path = ""
-
- self.match_obj = None
-
- self.rename_or_not = False
-
- self.movie_info = {}
+ self.cat = cat
- # Check if we match the category in init()
- self.match()
+ def detect(self, job_name: str, complete_dir: str) -> str:
+ """Detect the sorting type"""
+ guess = guess_what(job_name)
- def match(self, force=False):
- """Checks the category for a match, if so set self.match to true"""
- if force or (cfg.enable_movie_sorting() and self.sort_string):
- # First check if the show matches TV episode regular expressions. Returns regex match object
- if force or (self.cat and self.cat.lower() in self.cats) or (not self.cat and "None" in self.cats):
- logging.debug("Found Movie (%s)", self.original_job_name)
- self.matched = True
-
- def get_final_path(self):
- """Collect and construct all the variables such as episode name, show names"""
- if self.get_values():
- # Get the final path
- path = self.construct_path()
- self.final_path = os.path.join(self.original_path, path)
- return self.final_path
- else:
- # Error Sorting
- return os.path.join(self.original_path, self.original_job_name)
-
- def get_values(self):
- """Collect and construct all the values needed for path replacement"""
- # - Get Year
- if self.nzo:
- year = self.nzo.nzo_info.get("year")
- else:
- year = ""
- if year:
- year_m = None
- else:
- job_name = self.original_job_name.replace("_", " ")
- RE_YEAR = re.compile(year_match, re.I)
- year_m = RE_YEAR.search(job_name)
- if year_m:
- # Find the last matched date
- # Keep year_m to use in get_titles
- year = RE_YEAR.findall(job_name)[-1][0]
+ if guess["type"] == "episode":
+ self.sort_file = True
+ if "date" in guess:
+ self.sorter = DateSorter(self.nzo, job_name, complete_dir, self.cat, guess)
else:
- year = ""
- self.movie_info["year"] = year
-
- # Get resolution
- self.movie_info["resolution"] = get_resolution(self.original_job_name)
+ self.sorter = SeriesSorter(self.nzo, job_name, complete_dir, self.cat, guess)
+ elif guess["type"] == "movie":
+ self.sort_file = True
+ self.sorter = MovieSorter(self.nzo, job_name, complete_dir, self.cat, guess)
- # - Get Decades
- self.movie_info["decade"], self.movie_info["decade_two"] = get_decades(year)
+ return self.sorter.get_final_path() if self.sort_file else complete_dir
- # - Get Title
- self.movie_info["ttitle"], self.movie_info["ttitle_two"], self.movie_info["ttitle_three"] = get_titles(
- self.nzo, year_m, self.original_job_name, True
- )
- self.movie_info["title"], self.movie_info["title_two"], self.movie_info["title_three"] = get_titles(
- self.nzo, year_m, self.original_job_name
- )
+ def rename(self, newfiles, workdir_complete) -> None:
+ """Rename files of the job"""
+ return self.sorter.rename(newfiles, workdir_complete)
- return True
- def construct_path(self):
- """Return path reconstructed from original and sort expression"""
- sorter = self.sort_string.replace("\\", "/")
- mapping = []
+class SeriesSorter(BaseSorter):
+ """Methods for Series Sorting"""
- if ends_in_file(sorter):
- extension = True
- sorter = sorter.replace(".%ext", "")
+ def __init__(
+ self,
+ nzo: Optional[NzbObject],
+ job_name: str,
+ path: str,
+ cat: str,
+ guess: Optional[MatchesDict] = None,
+ force: Optional[bool] = False,
+ ) -> None:
+
+ super().__init__(nzo, job_name, path, cat, cfg.tv_sort_string(), cfg.tv_categories(), guess, force)
+
+ def match(self) -> None:
+ """Try to guess series info if config and category sort out or force is set"""
+ if self.force or (cfg.enable_tv_sorting() and cfg.tv_sort_string()):
+ if (
+ self.force
+ or (not self.cats)
+ or (self.cat and self.cat.lower() in self.cats)
+ or (not self.cat and "None" in self.cats)
+ ):
+ if not self.guess:
+ self.guess = guess_what(self.original_job_name, sort_type="episode")
+ if self.guess.get("type") == "episode" and not "date" in self.guess:
+ logging.debug("Using tv sorter for %s", self.original_job_name)
+ self.matched = True
+ self.type = "tv"
+
+ def get_values(self) -> None:
+ """Collect all values needed for path replacement"""
+ self.get_year()
+ self.get_names()
+ self.get_seasons()
+ self.get_episodes()
+ self.get_showdescriptions()
+ self.get_resolution()
+
+ def format_series_numbers(self, numbers: Union[int, List[int]], info_name: str) -> None:
+ """Format the numbers in both plain and alternative (zero-padded) format and set as showinfo"""
+ # Guessit returns multiple episodes or seasons as a list of integers, single values as int
+ if isinstance(numbers, int):
+ self.info[info_name] = str(numbers) # 1
+ self.info[info_name + "_alt"] = str(numbers).rjust(2, "0") # 01
else:
- extension = False
-
- # Replace title
- mapping.append(("%title", self.movie_info["title"]))
- mapping.append(("%.title", self.movie_info["title_two"]))
- mapping.append(("%_title", self.movie_info["title_three"]))
-
- # Replace title (short forms)
- mapping.append(("%t", self.movie_info["title"]))
- mapping.append(("%.t", self.movie_info["title_two"]))
- mapping.append(("%_t", self.movie_info["title_three"]))
-
- mapping.append(("%sn", self.movie_info["title"]))
- mapping.append(("%s.n", self.movie_info["title_two"]))
- mapping.append(("%s_n", self.movie_info["title_three"]))
-
- mapping.append(("%sN", self.movie_info["ttitle"]))
- mapping.append(("%s.N", self.movie_info["ttitle_two"]))
- mapping.append(("%s_N", self.movie_info["ttitle_three"]))
+ self.info[info_name] = "-".join([str(num) for num in numbers]) # 1-2-3
+ self.info[info_name + "_alt"] = "-".join([str(num).rjust(2, "0") for num in numbers]) # 01-02-03
- # Replace year
- mapping.append(("%y", self.movie_info["year"]))
+ def get_seasons(self) -> None:
+ """Fetch the guessed season number(s)"""
+ self.format_series_numbers(self.guess.get("season", ""), "season_num")
- # Replace resolution
- mapping.append(("%r", self.movie_info["resolution"]))
+ def get_episodes(self) -> None:
+ """Fetch the guessed episode number(s)"""
+ self.format_series_numbers(self.guess.get("episode", ""), "episode_num")
- # Replace decades
- mapping.append(("%decade", self.movie_info["decade"]))
- mapping.append(("%0decade", self.movie_info["decade_two"]))
-
- # Original dir name
- mapping.append(("%dn", self.original_job_name))
-
- path = path_subst(sorter, mapping)
-
- for key, name in REPLACE_AFTER.items():
- path = path.replace(key, name)
+ def rename(self, files: List[str], current_path: str, min_size: int = -1) -> Tuple[str, bool]:
+ """Rename for Series"""
+ if min_size < 0:
+ min_size = cfg.episode_rename_limit.get_int()
+ if not self.do_rename:
+ return current_path, False
+ else:
+ logging.debug("Renaming series file(s)")
+ return super().rename(files, current_path, min_size)
+
+
+class MovieSorter(BaseSorter):
+ """Methods for Movie Sorting"""
+
+ def __init__(
+ self,
+ nzo: Optional[NzbObject],
+ job_name: str,
+ path: str,
+ cat: str,
+ guess: Optional[MatchesDict] = None,
+ force: Optional[bool] = False,
+ ) -> None:
+ self.extra = cfg.movie_sort_extra()
- # Lowercase all characters wrapped in {}
- path = to_lowercase(path)
+ super().__init__(nzo, job_name, path, cat, cfg.movie_sort_string(), cfg.movie_categories(), guess, force)
- # Strip any extra ' ' '.' or '_' around foldernames
- path = strip_folders(path)
+ def match(self) -> None:
+ """Try to guess movie info if config and category sort out or force is set"""
+ if self.force or (cfg.enable_movie_sorting() and self.sort_string):
+ if self.force or (self.cat and self.cat.lower() in self.cats) or (not self.cat and "None" in self.cats):
+ if not self.guess:
+ self.guess = guess_what(self.original_job_name, sort_type="movie")
+ if self.guess.get("type") == "movie":
+ logging.debug("Using movie sorter for %s", self.original_job_name)
+ self.matched = True
+ self.type = "movie"
- # Split the last part of the path up for the renamer
- if extension:
- head, tail = os.path.split(path)
- self.filename_set = tail
- self.rename_or_not = True
- else:
- head = path
+ def get_values(self) -> None:
+ """Collect all values needed for path replacement"""
+ self.get_year()
+ self.get_resolution()
+ self.get_names()
- if head:
- return os.path.normpath(head)
- else:
- # The normpath function translates "" to "."
- # which results in wrong path.join later on
- return head
+ def rename(self, files, current_path, min_size: int = -1) -> Tuple[str, bool]:
+ """Rename for movie files"""
+ if min_size < 0:
+ min_size = cfg.movie_rename_limit.get_int()
- def rename(self, _files, current_path):
- """Rename for Generic files"""
- logging.debug("Renaming Generic file")
+ if not self.do_rename:
+ return current_path, False
+ logging.debug("Renaming movie file(s)")
- def filter_files(_file, current_path):
- if is_full_path(_file):
- filepath = os.path.normpath(_file)
- else:
- filepath = os.path.normpath(os.path.join(current_path, _file))
+ def filter_files(f, current_path):
+ filepath = os.path.normpath(f) if is_full_path(f) else os.path.normpath(os.path.join(current_path, f))
if os.path.exists(filepath):
- size = os.stat(filepath).st_size
- if (
- size >= cfg.movie_rename_limit.get_int()
- and not RE_SAMPLE.search(_file)
- and get_ext(_file) not in EXCLUDED_FILE_EXTS
- ):
+ if os.stat(filepath).st_size >= min_size and not is_sample(f) and get_ext(f) not in EXCLUDED_FILE_EXTS:
return True
return False
- # remove any files below the limit from this list
- files = [_file for _file in _files if filter_files(_file, current_path)]
+ # Filter samples and anything nonexistent or below the size limit
+ files = [f for f in files if filter_files(f, current_path)]
- length = len(files)
- # Single File Handling
- if length == 1:
- file = files[0]
- if is_full_path(file):
- filepath = os.path.normpath(file)
- else:
- filepath = os.path.normpath(os.path.join(current_path, file))
- if os.path.exists(filepath):
- self.fname, ext = os.path.splitext(os.path.split(file)[1])
- newname = "%s%s" % (self.filename_set, ext)
- newname = newname.replace("%fn", self.fname)
- newpath = os.path.join(current_path, newname)
- try:
- logging.debug("Rename: %s to %s", filepath, newpath)
- renamer(filepath, newpath)
- except:
- logging.error(T("Failed to rename: %s to %s"), clip_path(filepath), clip_path(newpath))
- logging.info("Traceback: ", exc_info=True)
- rename_similar(current_path, ext, self.filename_set, ())
+ # Single movie file
+ if len(files) == 1:
+ return super().rename(files, current_path, min_size)
- # Sequence File Handling
- # if there is more than one extracted file check for CD1/1/A in the title
- elif self.extra:
+ # Multiple files, check for sequential filenames
+ elif files and self.extra:
matched_files = check_for_multiple(files)
- # rename files marked as in a set
if matched_files:
- logging.debug("Renaming a series of generic files (%s)", matched_files)
+ logging.debug("Renaming sequential files %s", matched_files)
renamed = list(matched_files.values())
for index, file in matched_files.items():
filepath = os.path.join(current_path, file)
renamed.append(filepath)
self.fname, ext = os.path.splitext(os.path.split(file)[1])
- name = "%s%s" % (self.filename_set, self.extra)
- name = name.replace("%1", str(index)).replace("%fn", self.fname)
- name = name + ext
+ name = (self.filename_set + self.extra).replace("%1", str(index)).replace(
+ "%fn", self.fname
+ ).replace("%ext", ext.lstrip(".")) + ext
newpath = os.path.join(current_path, name)
try:
logging.debug("Rename: %s to %s", filepath, newpath)
@@ -797,218 +482,180 @@ class MovieSorter:
logging.info("Traceback: ", exc_info=True)
rename_similar(current_path, ext, self.filename_set, renamed)
else:
- logging.debug("Movie files not in sequence %s", _files)
+ logging.debug("No sequential files in %s", files)
+ return move_to_parent_directory(current_path)
-class DateSorter:
- """Methods for Date Sorting"""
-
- def __init__(self, nzo: Optional[NzbObject], job_name, path, cat):
- self.matched = False
- self.original_job_name = job_name
- self.original_path = path
- self.sort_string = cfg.date_sort_string()
- self.cats = cfg.date_categories()
- self.cat = cat
- self.nzo = nzo
- self.filename_set = ""
- self.fname = "" # Value for %fn substitution in folders
-
- self.match_obj = None
-
- self.rename_or_not = False
- self.date_type = None
+class DateSorter(BaseSorter):
+ """Methods for Date Sorting"""
- self.date_info = {}
- self.final_path = ""
+ def __init__(
+ self,
+ nzo: Optional[NzbObject],
+ job_name: str,
+ path: str,
+ cat: str,
+ guess: Optional[MatchesDict] = None,
+ force: Optional[bool] = False,
+ ) -> None:
- # Check if we match the category in init()
- self.match()
+ super().__init__(nzo, job_name, path, cat, cfg.date_sort_string(), cfg.date_categories(), guess, force)
- def match(self, force=False):
+ def match(self) -> None:
"""Checks the category for a match, if so set self.matched to true"""
- if force or (cfg.enable_date_sorting() and self.sort_string):
- # First check if the show matches TV episode regular expressions. Returns regex match object
- if force or (self.cat and self.cat.lower() in self.cats) or (not self.cat and "None" in self.cats):
- self.match_obj, self.date_type = check_for_date(self.original_job_name, date_match)
- if self.match_obj:
- logging.debug("Found date for sorting (%s)", self.original_job_name)
+ if self.force or (cfg.enable_date_sorting() and self.sort_string):
+ if self.force or (self.cat and self.cat.lower() in self.cats) or (not self.cat and "None" in self.cats):
+ if not self.guess:
+ self.guess = guess_what(self.original_job_name, sort_type="episode")
+ if self.guess.get("type") == "episode" and "date" in self.guess:
+ logging.debug("Using date sorter for %s", self.original_job_name)
self.matched = True
-
- def is_match(self):
- """Returns whether there was a match or not"""
- return self.matched
-
- def get_final_path(self):
- """Collect and construct all the variables such as episode name, show names"""
- if self.get_values():
- # Get the final path
- path = self.construct_path()
- self.final_path = os.path.join(self.original_path, path)
- return self.final_path
- else:
- # Error Sorting
- return os.path.join(self.original_path, self.original_job_name)
-
- def get_values(self):
- """Collect and construct all the values needed for path replacement"""
-
- # 2008-10-16
- if self.date_type == 1:
- self.date_info["year"] = self.match_obj.group(1)
- self.date_info["month"] = self.match_obj.group(2)
- self.date_info["date"] = self.match_obj.group(3)
- # 10.16.2008
+ self.type = "date"
+
+ def get_date(self) -> None:
+ """Get month and day"""
+ self.info["month"] = str(self.guess.get("date").month)
+ self.info["day"] = str(self.guess.get("date").day)
+ # Zero-padded versions of the same
+ self.info["month_two"] = self.info["month"].rjust(2, "0")
+ self.info["day_two"] = self.info["day"].rjust(2, "0")
+
+ def get_values(self) -> None:
+ """Collect all values needed for path replacement"""
+ self.get_year()
+ self.get_date()
+ self.get_resolution()
+ self.get_names()
+ self.get_showdescriptions()
+
+ def rename(self, files: List[str], current_path: str, min_size: int = -1) -> Tuple[str, bool]:
+ """Renaming Date file"""
+ if min_size < 0:
+ min_size = cfg.episode_rename_limit.get_int()
+ if not self.do_rename:
+ return current_path, False
else:
- self.date_info["year"] = self.match_obj.group(3)
- self.date_info["month"] = self.match_obj.group(1)
- self.date_info["date"] = self.match_obj.group(2)
+ logging.debug("Renaming date file(s)")
+ return super().rename(files, current_path, min_size)
- self.date_info["month_two"] = self.date_info["month"].rjust(2, "0")
- self.date_info["date_two"] = self.date_info["date"].rjust(2, "0")
- # - Get Decades
- self.date_info["decade"], self.date_info["decade_two"] = get_decades(self.date_info["year"])
+def ends_in_file(path: str) -> bool:
+ """Return True when path ends with '.%ext' or '%fn' while allowing for a lowercase marker"""
+ RE_ENDEXT = re.compile(r"\.%ext}?$", re.I)
+ RE_ENDFN = re.compile(r"%fn}?$", re.I)
+ return bool(RE_ENDEXT.search(path) or RE_ENDFN.search(path))
- # - Get resolution
- self.date_info["resolution"] = get_resolution(self.original_job_name)
- # - Get Title
- self.date_info["ttitle"], self.date_info["ttitle_two"], self.date_info["ttitle_three"] = get_titles(
- self.nzo, self.match_obj, self.original_job_name, True
- )
- self.date_info["title"], self.date_info["title_two"], self.date_info["title_three"] = get_titles(
- self.nzo, self.match_obj, self.original_job_name
- )
+def move_to_parent_directory(workdir: str) -> Tuple[str, bool]:
+ """Move all files under 'workdir' into 'workdir/..'"""
+ # Determine 'folder'/..
+ workdir = os.path.abspath(os.path.normpath(workdir))
+ dest = os.path.abspath(os.path.normpath(os.path.join(workdir, "..")))
- self.date_info["ep_name"], self.date_info["ep_name_two"], self.date_info["ep_name_three"] = get_descriptions(
- self.nzo, self.match_obj, self.original_job_name
- )
+ # Check for DVD folders and bail out if found
+ for item in os.listdir(workdir):
+ if item.lower() in ("video_ts", "audio_ts", "bdmv"):
+ return workdir, True
- return True
+ for root, dirs, files in os.walk(workdir):
+ for _file in files:
+ path = os.path.join(root, _file)
+ new_path = path.replace(workdir, dest)
+ ok, new_path = move_to_path(path, new_path)
+ if not ok:
+ return dest, False
- def construct_path(self):
- """Return path reconstructed from original and sort expression"""
- sorter = self.sort_string.replace("\\", "/")
- mapping = []
+ cleanup_empty_directories(workdir)
+ return dest, True
- if ends_in_file(sorter):
- extension = True
- sorter = sorter.replace(".%ext", "")
- else:
- extension = False
- # Replace title
- mapping.append(("%title", self.date_info["title"]))
- mapping.append(("%.title", self.date_info["title_two"]))
- mapping.append(("%_title", self.date_info["title_three"]))
-
- mapping.append(("%t", self.date_info["title"]))
- mapping.append(("%.t", self.date_info["title_two"]))
- mapping.append(("%_t", self.date_info["title_three"]))
-
- mapping.append(("%sn", self.date_info["ttitle"]))
- mapping.append(("%s.n", self.date_info["ttitle_two"]))
- mapping.append(("%s_n", self.date_info["ttitle_three"]))
- mapping.append(("%sN", self.date_info["title"]))
- mapping.append(("%s.N", self.date_info["title_two"]))
- mapping.append(("%s_N", self.date_info["title_three"]))
-
- # Replace year
- mapping.append(("%year", self.date_info["year"]))
- mapping.append(("%y", self.date_info["year"]))
-
- # Replace resolution
- mapping.append(("%r", self.date_info["resolution"]))
-
- if self.date_info["ep_name"]:
- mapping.append(("%desc", self.date_info["ep_name"]))
- mapping.append(("%.desc", self.date_info["ep_name_two"]))
- mapping.append(("%_desc", self.date_info["ep_name_three"]))
+def guess_what(name: str, sort_type: Optional[str] = None) -> MatchesDict:
+ """Guess metadata for movies or episodes from their name. The sort_type ('movie' or 'episode')
+ is passed as a hint to guessit, if given."""
+
+ if not name:
+ raise ValueError("Need a name for guessing")
+
+ # Remove any passwords from the name
+ name = scan_password(name)[0]
+
+ # Avoid trouble with names starting with a digit (esp. with no year in the name)
+ digit_fix = "FIX" if name[0].isdigit() else ""
+
+ guessit_options = {
+ # "no-user-config": True,
+ "expected_title": [], # This isn't empty by default?
+ # "allowed_countries": [],
+ # "allowed_languages": [],
+ "excludes": EXCLUDED_GUESSIT_PROPERTIES,
+ }
+ if sort_type:
+ # Hint the type if known
+ guessit_options["type"] = sort_type
+
+ guess = guessit.api.guessit(digit_fix + name, options=guessit_options)
+ logging.debug("Initial guess for %s is %s", digit_fix + name, guess)
+
+ if digit_fix:
+ # Unfix the title
+ guess["title"] = guess.get("title", "")[len(digit_fix) :]
+
+ # Force season to 1 for seasonless episodes with no date
+ if guess.get("type") == "episode" and not "date" in guess:
+ guess.setdefault("season", 1)
+
+ # Try to avoid setting the type to movie on arbitrary jobs (e.g. 'Setup.exe') just because guessit defaults to that
+ table = str.maketrans({char: "" for char in whitespace + "_.-()[]{}"})
+ if guess.get("type") == "movie" and not sort_type == "movie": # No movie hint
+ if (
+ guess.get("title", "").translate(table) == name.translate(table) # Check for full name used as title
+ or any(
+ c in guess.get("release_group", "") for c in (whitespace + punctuation)
+ ) # interpuction of white spaces in the groupname
+ or not any(
+ [key in guess for key in ("year", "screen_size", "video_codec")]
+ ) # No typical movie properties set
+ ):
+ guess["type"] = "unknown"
+
+ # Remove sample indicators from groupnames, e.g. 'sample-groupname' or 'groupname-proof'
+ group = guess.get("release_group", "")
+ if group.lower().startswith(("sample-", "proof-")) or group.lower().endswith(("-sample", "-proof")):
+ # Set clean groupname
+ guess["release_group"] = re.sub("^(sample|proof)-|-(sample|proof)$", "", group, re.I)
+ # Add 'Sample' property to the guess
+ other = guess.get("other")
+ if not other:
+ guess.setdefault("other", "Sample")
else:
- mapping.append(("%desc", ""))
- mapping.append(("%.desc", ""))
- mapping.append(("%_desc", ""))
-
- # Replace dir-name before replacing %d for month
- mapping.append(("%dn", self.original_job_name))
-
- # Replace decades
- mapping.append(("%decade", self.date_info["decade"]))
- mapping.append(("%0decade", self.date_info["decade_two"]))
-
- # Replace month
- mapping.append(("%m", self.date_info["month"]))
- mapping.append(("%0m", self.date_info["month_two"]))
-
- # Replace date
- mapping.append(("%d", self.date_info["date"]))
- mapping.append(("%0d", self.date_info["date_two"]))
-
- path = path_subst(sorter, mapping)
-
- for key, name in REPLACE_AFTER.items():
- path = path.replace(key, name)
+ if "Sample" not in guess["other"]:
+ # Pre-existing 'other' may be a string or a list
+ try:
+ guess["other"].append("Sample")
+ except AttributeError:
+ guess["other"] = [other, "Sample"]
- # Lowercase all characters wrapped in {}
- path = to_lowercase(path)
+ return guess
- # Strip any extra ' ' '.' or '_' around foldernames
- path = strip_folders(path)
- # Split the last part of the path up for the renamer
- if extension:
- head, tail = os.path.split(path)
- self.filename_set = tail
- self.rename_or_not = True
- else:
- head = path
+def is_sample(filename: str) -> bool:
+ """Try to determine if filename belongs to a sample"""
+ if os.path.splitext(filename)[0].lower().strip() in ("sample", "proof"):
+ # The entire filename is just 'sample.ext' or similar
+ return True
- if head:
- return os.path.normpath(head)
- else:
- # The normpath function translates "" to "."
- # which results in wrong path.join later on
- return head
+ # If that didn't work, start guessing
+ guess = guess_what(filename).get("other", "")
+ if isinstance(guess, list):
+ return any(item in ("Sample", "Proof") for item in guess)
+ else:
+ return guess in ("Sample", "Proof")
- def rename(self, files, current_path):
- """Renaming Date file"""
- logging.debug("Renaming Date file")
- # find the master file to rename
- for file in files:
- if is_full_path(file):
- filepath = os.path.normpath(file)
- else:
- filepath = os.path.normpath(os.path.join(current_path, file))
- if os.path.exists(filepath):
- size = os.stat(filepath).st_size
- if size > cfg.movie_rename_limit.get_int():
- if "sample" not in file:
- self.fname, ext = os.path.splitext(os.path.split(file)[1])
- newname = "%s%s" % (self.filename_set, ext)
- newname = newname.replace("%fn", self.fname)
- newpath = os.path.join(current_path, newname)
- if not os.path.exists(newpath):
- try:
- logging.debug("Rename: %s to %s", filepath, newpath)
- renamer(filepath, newpath)
- except:
- logging.error(
- T("Failed to rename: %s to %s"), clip_path(current_path), clip_path(newpath)
- )
- logging.info("Traceback: ", exc_info=True)
- rename_similar(current_path, ext, self.filename_set, ())
- break
-
-
-def path_subst(path, mapping):
- """Replace the sort sting elements by real values.
- Non-elements are copied literally.
- path = the sort string
- mapping = array of tuples that maps all elements to their values
- """
+def path_subst(path: str, mapping: List[Tuple[str, str]]) -> str:
+ """Replace the sort string elements in the path with the real values provided by the mapping;
+ non-elements are copied verbatim."""
# Added ugly hack to prevent %ext from being masked by %e
newpath = []
plen = len(path)
@@ -1021,63 +668,38 @@ def path_subst(path, mapping):
n += len(key) - 1
result = value
break
- newpath.append(result)
+ if result:
+ newpath.append(result)
n += 1
return "".join(newpath)
-def get_titles(nzo: NzbObject, match, name, titleing=False):
- """The title will be the part before the match
- Clean it up and title() it
-
- ''.title() isn't very good under python so this contains
- a lot of little hacks to make it better and for more control
- """
+def get_titles(
+ nzo: Optional[NzbObject], guess: Optional[MatchesDict], jobname: str, titleing: bool = False
+) -> Tuple[str, str, str]:
+ """Get the title from NZB metadata or jobname, and return it in various formats. Formatting
+ mostly deals with working around quirks of Python's str.title(). NZB metadata is used as-is,
+ further processing done only for info obtained from guessit or the jobname."""
+ title = ""
if nzo:
+ # Fetch NZB metadata
title = nzo.nzo_info.get("propername")
- else:
- title = ""
if not title:
- if match:
- name = name[: match.start()]
-
- # Replace .US. with (US)
- if cfg.tv_sort_countries() == 1:
- for rep in COUNTRY_REP:
- # (us) > (US)
- name = replace_word(name, rep.lower(), rep)
- # (Us) > (US)
- name = replace_word(name, rep.title(), rep)
- # .US. > (US)
- dotted_country = ".%s." % (rep.strip("()"))
- name = replace_word(name, dotted_country, rep)
- # Remove .US. and (US)
- elif cfg.tv_sort_countries() == 2:
- for rep in COUNTRY_REP:
- # Remove (US)
- name = replace_word(name, rep, "")
- dotted_country = ".%s." % (rep.strip("()"))
- # Remove .US.
- name = replace_word(name, dotted_country, ".")
-
- title = name.replace(".", " ").replace("_", " ")
- title = title.strip().strip("(").strip("_").strip("-").strip().strip("_")
+ # Try guessit next
+ if guess:
+ title = guess.get("title", "")
+
+ # Fallback to the jobname if neither of the better options yielded a title
+ if not title:
+ title = jobname.replace(".", " ").replace("_", " ").strip(whitespace + "._-")
if titleing:
- title = title.title() # title the show name so it is in a consistent letter case
+ # Titlecase the show name so it is in a consistent letter case
+ title = title.title()
- # title applied uppercase to 's Python bug?
+ # Get rid of 's uppercased by str.title()
title = title.replace("'S", "'s")
- # Replace titled country names, (Us) with (US) and so on
- if cfg.tv_sort_countries() == 1:
- for rep in COUNTRY_REP:
- title = title.replace(rep.title(), rep)
- # Remove country names, ie (Us)
- elif cfg.tv_sort_countries() == 2:
- for rep in COUNTRY_REP:
- title = title.replace(rep.title(), "").strip()
-
# Make sure some words such as 'and' or 'of' stay lowercased.
for x in LOWERCASE:
xtitled = x.title()
@@ -1092,128 +714,94 @@ def get_titles(nzo: NzbObject, match, name, titleing=False):
if title:
title = title[0].title() + title[1:]
- # The title with spaces replaced by dots
- dots = title.replace(" - ", "-").replace(" ", ".").replace("_", ".")
- dots = dots.replace("(", ".").replace(")", ".").replace("..", ".").rstrip(".")
+ if guess and "country" in guess:
+ title += " " + str(guess.get("country")) # Append ' CC'
- # The title with spaces replaced by underscores
- underscores = title.replace(" ", "_").replace(".", "_").replace("__", "_").rstrip("_")
+ # Alternative formats
+ dots = re.sub(
+ r"\.{2,}",
+ ".",
+ title.replace(" - ", "-").replace(" ", ".").replace("_", ".").replace("(", ".").replace(")", "."),
+ ).rstrip(".")
+ underscores = re.sub("_{2,}", "_", title.replace(" ", "_").replace(".", "_")).rstrip("_")
return title, dots, underscores
-def replace_word(word_input, one, two):
+def replace_word(word_input: str, one: str, two: str) -> str:
"""Regex replace on just words"""
- regex = re.compile(r"\W(%s)(\W|$)" % one, re.I)
- matches = regex.findall(word_input)
+ RE_WORD = re.compile(r"\W(%s)(\W|$)" % one, re.I)
+ matches = RE_WORD.findall(word_input)
if matches:
for _ in matches:
word_input = word_input.replace(one, two)
return word_input
-def get_descriptions(nzo: NzbObject, match, name):
- """If present, get a description from the nzb name.
- A description has to be after the matched item, separated either
- like ' - Description' or '_-_Description'
- """
+def get_descriptions(nzo: Optional[NzbObject], guess: Optional[MatchesDict], jobname: str) -> Tuple[str, str, str]:
+ """Try to get an episode title or similar description from the NZB metadata or jobname, e.g.
+ 'Download This' in Show.S01E23.Download.This.1080p.HDTV.x264 and return multiple formats"""
+ ep_name = None
if nzo:
ep_name = nzo.nzo_info.get("episodename")
- else:
- ep_name = ""
- if not ep_name:
- if match:
- ep_name = name[match.end() :] # Need to improve for multi-ep support
- else:
- ep_name = name
- ep_name = ep_name.strip(" _.")
- if ep_name.startswith("-"):
- ep_name = ep_name.strip("- _.")
- if "." in ep_name and " " not in ep_name:
- ep_name = ep_name.replace(".", " ")
- ep_name = ep_name.replace("_", " ")
- ep_name2 = ep_name.replace(" - ", "-").replace(" ", ".")
- ep_name3 = ep_name.replace(" ", "_")
- return ep_name, ep_name2, ep_name3
-
-
-def get_decades(year):
- """Return 4 digit and 2 digit decades given 'year'"""
- if year:
- try:
- decade = year[2:3] + "0"
- decade2 = year[:3] + "0"
- except:
- decade = ""
- decade2 = ""
- else:
- decade = ""
- decade2 = ""
- return decade, decade2
+ if (not ep_name) and guess:
+ ep_name = guess.get("episode_title")
+ ep_name = ep_name or ""
+ ep_name = ep_name.strip("- _.")
+ if "." in ep_name and " " not in ep_name:
+ ep_name = ep_name.replace(".", " ")
-def get_resolution(job_name):
- try:
- RE_RESOLUTION = re.compile(resolution_match)
- # Use the last match, lowercased
- resolution = RE_RESOLUTION.findall(job_name)[-1][1].lower()
- except Exception:
- resolution = ""
- return resolution
+ # Return the episode names with spaces, dots, and underscores
+ return ep_name.replace("_", " "), ep_name.replace(" - ", "-").replace(" ", "."), ep_name.replace(" ", "_")
-def check_for_folder(path):
- """Return True if any folder is found in the tree at 'path'"""
+def has_subdirectory(path: str) -> bool:
+ """Return True if any directory is found inside the tree at 'path'"""
for _root, dirs, _files in os.walk(path):
if dirs:
return True
return False
-def to_lowercase(path):
+def to_lowercase(path: str) -> str:
"""Lowercases any characters enclosed in {}"""
- _RE_LOWERCASE = re.compile(r"{([^{]*)}")
+ RE_LOWERCASE = re.compile(r"{([^{]*)}")
while True:
- m = _RE_LOWERCASE.search(path)
+ m = RE_LOWERCASE.search(path)
if not m:
break
path = path[: m.start()] + m.group(1).lower() + path[m.end() :]
- # just in case
- path = path.replace("{", "")
- path = path.replace("}", "")
- return path
+ # Remove any remaining '{' and '}'
+ return path.replace("{", "").replace("}", "")
-def strip_folders(path):
- """Return 'path' without leading and trailing spaces and underscores in each element
- For Windows, also remove leading and trailing dots
- """
- unc = sabnzbd.WIN32 and (path.startswith("//") or path.startswith("\\\\"))
- f = path.strip("/").split("/")
-
- # For path beginning with a slash, insert empty element to prevent loss
- if path.strip()[0] in "/\\":
- f.insert(0, "")
-
- def strip_all(x):
- """Strip all leading/trailing underscores also dots for Windows"""
- x = x.strip().strip("_")
- if sabnzbd.WIN32:
- # macOS and Linux should keep dots, because leading dots are significant
- # while Windows cannot handle trailing dots
- x = x.strip(".")
- x = x.strip()
- return x
-
- path = os.path.normpath("/".join([strip_all(x) for x in f]))
- if unc:
- return "\\" + path
- else:
- return path
+def strip_path_elements(path: str) -> str:
+ """Return 'path' without leading and trailing spaces and underscores in each element"""
+ # Clear the most deviant of UNC notations
+ path = clip_path(path)
+ if sabnzbd.WIN32:
+ path = path.replace("\\", "/") # Switch to unix style directory separators
+ is_unc = sabnzbd.WIN32 and path.startswith("//")
+
+ path_elements = path.strip("/").split("/")
+ # Insert an empty element to prevent loss, if path starts with a slash
+ if not is_unc and path.strip()[0] in "/":
+ path_elements.insert(0, "")
+
+ # For Windows, also remove leading and trailing dots: it cannot handle trailing dots, and
+ # leading dots carry no significance like on macOS, Linux, etc.
+ chars = whitespace + "_" + ("." if sabnzbd.WIN32 else "")
+ # Clean all elements and reconstruct the path
+ path = os.path.normpath("/".join([element.strip(chars) for element in path_elements]))
+ path = path.replace("//", "/") # Re: https://bugs.python.org/issue26329
-def rename_similar(folder, skip_ext, name, skipped_files):
+ return "\\\\" + path if is_unc else path
+
+
+def rename_similar(folder: str, skip_ext: str, name: str, skipped_files: List[str]) -> None:
"""Rename all other files in the 'folder' hierarchy after 'name'
and move them to the root of 'folder'.
Files having extension 'skip_ext' will be moved, but not renamed.
@@ -1248,90 +836,99 @@ def rename_similar(folder, skip_ext, name, skipped_files):
cleanup_empty_directories(folder)
-def check_regexs(filename, matchers):
- """Regular Expression match for a list of regexes
- Returns the MatchObject if a match is made
- This version checks for an additional match
- """
- extras = []
- for expressions in matchers:
- expression, extramatchers = expressions
- match1 = expression.search(filename)
- if match1:
- for m in extramatchers:
- match2 = m.findall(filename, match1.end())
- if match2:
- for match in match2:
- if type(match) == type(()) and len(match) > 1:
- extras.append(match[1])
- else:
- extras.append(match)
- break
- return match1, extras
- return None, None
-
-
-def check_for_date(filename, matcher):
- """Regular Expression match for date based files
- Returns the MatchObject if a match is made
- """
- x = 0
- if matcher:
- for expression in matcher:
- regex = re.compile(expression)
- match1 = regex.search(filename)
- x += 1
- if match1:
- return match1, x
- return None, 0
-
-
-def is_full_path(file):
- """Return True if path is absolute"""
- if file.startswith("\\") or file.startswith("/"):
- return True
- try:
- if file[1:3] == ":\\":
- return True
- except:
- pass
- return False
+def is_full_path(file: str) -> bool:
+ """Determine whether file has an absolute path"""
+ return file.startswith("/") or (sabnzbd.WIN32 and (file.startswith("\\") or file[1:3] == ":\\"))
-def eval_sort(sorttype, expression, name=None, multipart=""):
+def eval_sort(sort_type: str, expression: str, name: str = None, multipart: str = "") -> Optional[str]:
"""Preview a sort expression, to be used by API"""
from sabnzbd.api import Ttemplate
path = ""
name = sanitize_foldername(name)
- if sorttype == "series":
+ if sort_type == "series":
name = name or ("%s S01E05 - %s [DTS]" % (Ttemplate("show-name"), Ttemplate("ep-name")))
- sorter = SeriesSorter(None, name, path, "tv")
- elif sorttype == "movie":
+ sorter = SeriesSorter(None, name, path, "tv", force=True)
+ elif sort_type == "movie":
name = name or (Ttemplate("movie-sp-name") + " (2009)")
- sorter = MovieSorter(None, name, path, "tv")
- elif sorttype == "date":
+ sorter = MovieSorter(None, name, path, "tv", force=True)
+ elif sort_type == "date":
name = name or (Ttemplate("show-name") + " 2009-01-02")
- sorter = DateSorter(None, name, path, "tv")
+ sorter = DateSorter(None, name, path, "tv", force=True)
else:
return None
sorter.sort_string = expression
- sorter.match(force=True)
- path = sorter.get_final_path()
- path = os.path.normpath(os.path.join(path, sorter.filename_set))
+ path = os.path.normpath(os.path.join(sorter.get_final_path(), sorter.filename_set))
fname = Ttemplate("orgFilename")
fpath = path
- if sorttype == "movie" and "%1" in multipart:
+ if sort_type == "movie" and "%1" in multipart:
fname = fname + multipart.replace("%1", "1")
fpath = fpath + multipart.replace("%1", "1")
if "%fn" in path:
- path = path.replace("%fn", fname + ".mkv")
+ path = path.replace("%fn", fname + ".ext")
else:
- if sorter.rename_or_not:
- path = fpath + ".mkv"
+ if sorter.do_rename:
+ path = fpath + ".ext"
else:
- if sabnzbd.WIN32:
- path += "\\"
- else:
- path += "/"
+ path += "\\" if sabnzbd.WIN32 else "/"
return path
+
+
+def check_for_multiple(files: List[str]) -> Optional[Dict[str, str]]:
+ """Return list of files that looks like a multi-part post"""
+ RE_MULTIPLE = (
+ re.compile(r"cd\W?(\d+)\W?", re.I), # .cd1.mkv
+ re.compile(r"\w\W?([\w\d])[{}]*$", re.I), # blah1.mkv blaha.mkv
+ re.compile(r"\w\W([\w\d])\W", re.I), # blah-1-ok.mkv blah-a-ok.mkv
+ )
+ for regex in RE_MULTIPLE:
+ matched_files = check_for_sequence(regex, files)
+ if matched_files:
+ return matched_files
+ return None
+
+
+def check_for_sequence(regex, files: List[str]) -> Dict[str, str]:
+ """Return list of files that looks like a sequence"""
+ matches = {}
+ prefix = None
+ # Build a dictionary of matches with keys based on the matches, e.g. {1:'blah-part1.mkv'}
+ for _file in files:
+ name, ext = os.path.splitext(_file)
+ match1 = regex.search(name)
+ if match1:
+ if not prefix or prefix == name[: match1.start()]:
+ matches[match1.group(1)] = name + ext
+ prefix = name[: match1.start()]
+
+ # Don't do anything if only one or no files matched
+ if len(list(matches)) < 2:
+ return {}
+
+ key_prev = 0
+ passed = True
+
+ # Check the dictionary to see if the keys form an alphanumeric sequence
+ for akey in sorted(matches):
+ if akey.isdigit():
+ key = int(akey)
+ elif akey in ascii_lowercase:
+ key = ascii_lowercase.find(akey) + 1
+ else:
+ passed = False
+
+ if passed:
+ if not key_prev:
+ key_prev = key
+ else:
+ if key_prev + 1 == key:
+ key_prev = key
+ else:
+ passed = False
+ if passed:
+ # convert {'b':'filename-b.mkv'} to {'2', 'filename-b.mkv'}
+ item = matches.pop(akey)
+ matches[str(key)] = item
+
+ return matches if passed else {}
diff --git a/tests/test_sorting.py b/tests/test_sorting.py
index a5086c9..e8ff5ba 100644
--- a/tests/test_sorting.py
+++ b/tests/test_sorting.py
@@ -18,29 +18,694 @@
"""
tests.test_sorting - Testing functions in sorting.py
"""
+import os
+import pyfakefs
+import shutil
+import sys
+from random import choice
from sabnzbd import sorting
from tests.testhelper import *
-class TestSorting:
+class TestSortingFunctions:
@pytest.mark.parametrize(
- "job_name, result",
+ "name, result",
[
- ("Ubuntu.optimized.for.1080p.Screens-Canonical", "1080p"),
- ("Debian_for_240i_Scientific_Calculators-FTPMasters", "240i"),
- ("OpenBSD Streaming Edition 4320P", "4320p"), # Lower-case result
- ("Surely.1080p.is.better.than.720p", "720p"), # Last hit wins
- ("2160p.Campaign.Video", "2160p"), # Resolution at the start
- ("Some.Linux.Iso.1234p", ""), # Non-standard resolution
- ("No.Resolution.Anywhere", ""),
- ("not.keeping.its1080p.distance", ""), # No separation
- ("not_keeping_1440idistance_either", ""),
- ("240 is a semiperfect and highly composite number", ""), # Number only
- (480, ""),
- (None, ""),
+ (
+ "2147.Confinement.2015.1080p.WEB-DL.DD5.1.H264-EMRG",
+ {"type": "movie", "title": "2147 Confinement"},
+ ), # Digit at the start
+ (
+ "2146.Confinement.1080p.WEB-DL.DD5.1.H264-EMRG",
+ {"type": "movie", "title": "2146 Confinement"},
+ ), # No year, guessit sets type to episode
+ ("Setup.exe", {"type": "unknown", "title": "Setup exe"}), # Guessit uses 'movie' as its default type
+ (
+ "25.817.hdtv-rofl",
+ {"type": "episode", "title": "25", "season": 8, "episode": 17},
+ ), # Guessit comes up with bad episode info: [25, 17]
+ (
+ "The.Wonders.of.Usenet.E08.2160p-SABnzbd",
+ {"type": "episode", "season": 1, "episode": 8},
+ ), # Episode without season
+ (
+ "Glade Runner 2094 2022.avi",
+ {"type": "movie", "title": "Glade Runner 2094", "year": 2022},
+ ), # Double year
+ ("Micro.Maffia.s01.web.aac.x265-Tfoe{{Wollah}}", {"release_group": "Tfoe"}), # Password in jobname
+ ("No.Choking.Part.2.2008.360i-NotLOL", {"part": None, "title": "No Choking Part 2"}), # Part property
+ (
+ "John.Hamburger.III.US.S01E01.OMG.WTF.BBQ.4320p.WEB.H265-HeliUM.mkv",
+ {
+ "type": "episode",
+ "episode_title": "OMG WTF BBQ",
+ "screen_size": "4320p",
+ "title": "John Hamburger III",
+ "country": "US",
+ },
+ ),
+ ("Test Movie 720p HDTV AAC x265 sample-MYgroup", {"release_group": "MYgroup", "other": "Sample"}),
+ (None, None), # Jobname missing
+ ("", None),
+ ],
+ )
+ def test_guess_what(self, name, result):
+ """Test guessing quirks"""
+ if not result:
+ # Bad input
+ with pytest.raises(ValueError):
+ guess = sorting.guess_what(name)
+ else:
+ guess = sorting.guess_what(name)
+ for key, value in result.items():
+ if value is None:
+ # Property should not exist in the guess
+ assert key not in guess
+ else:
+ assert guess[key] == value
+
+ @pytest.mark.parametrize(
+ "name, result",
+ [
+ ("Free.Open.Source.Movie.2001.1080p.WEB-DL.DD5.1.H264-FOSS", False), # Not samples
+ ("Setup.exe", False),
+ ("23.123.hdtv-rofl", False),
+ ("Something.1080p.WEB-DL.DD5.1.H264-EMRG-sample", True), # Samples
+ ("Something.1080p.WEB-DL.DD5.1.H264-EMRG-sample.ogg", True),
+ ("Sumtin_Else_1080p_WEB-DL_DD5.1_H264_proof-EMRG", True),
+ ("Wot.Eva.540i.WEB-DL.aac.H264-Groupie sample.mp4", True),
+ ("file-sample.mkv", True),
+ ("PROOF.JPG", True),
+ ("Bla.s01e02.title.1080p.aac-sample proof.mkv", True),
+ ("Bla.s01e02.title.1080p.aac-proof.mkv", True),
+ ("Bla.s01e02.title.1080p.aac sample proof.mkv", True),
+ ("Bla.s01e02.title.1080p.aac proof.mkv", True),
+ ("Not Death Proof (2022) 1080p x264 (DD5.1) BE Subs", False), # Try to trigger some false positives
+ ("Proof.of.Everything.(2042).4320p.x266-4U", False),
+ ("Crime_Scene_S01E13_Free_Sample_For_Sale_480p-OhDear", False),
+ ("Sample That 2011 480p WEB-DL.H265-aMiGo", False),
+ ("Look at That 2011 540i WEB-DL.H265-NoSample", False),
+ ("NOT A SAMPLE.JPG", False),
+ ],
+ )
+ def test_is_sample(self, name, result):
+ assert sorting.is_sample(name) == result
+
+ @pytest.mark.parametrize("platform", ["linux", "darwin", "win32"])
+ @pytest.mark.parametrize(
+ "path, result_unix, result_win",
+ [
+ ("/tmp/test.file", True, True),
+ ("/boot", True, True),
+ ("/y.e.p", True, True),
+ ("/ok/", True, True),
+ ("/this.is.a/full.path", True, True),
+ ("f:\\e.txt", False, True),
+ ("\\\\relative.path", False, True),
+ ("Z:\\some\\thing", False, True),
+ ("Bitte ein Bit", False, False),
+ ("this/is/not/an/abs.path", False, False),
+ ("this\\is\\not\\an\\abs.path", False, False),
+ ("AAA", False, False),
+ ("", False, False),
+ ],
+ )
+ def test_is_full_path(self, platform, path, result_unix, result_win):
+ @set_platform(platform)
+ def _func():
+ result = result_win if sabnzbd.WIN32 else result_unix
+ assert sorting.is_full_path(path) == result
+
+ _func()
+
+ @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows tests")
+ @pytest.mark.parametrize(
+ "path, result",
+ [
+ ("P:\\foo\\bar", "P:\\foo\\bar"),
+ ("FOO\\bar\\", "FOO\\bar"),
+ ("foo\\_bar_", "foo\\bar"),
+ ("foo\\__bar", "foo\\bar"),
+ ("foo\\bar__", "foo\\bar"),
+ ("foo\\ bar ", "foo\\bar"),
+ ("foo\\ bar", "foo\\bar"),
+ ("E:\\foo\\bar _", "E:\\foo\\bar"),
+ ("E:\\foo_\\_bar", "E:\\foo\\bar"),
+ ("E:\\foo._\\bar", "E:\\foo\\bar"),
+ (".foo\\bar", "foo\\bar"), # Dots
+ ("E:\\\\foo\\bar\\...", "E:\\foo\\bar"),
+ ("E:\\\\foo\\bar\\...", "E:\\foo\\bar"),
+ ("E:\\foo_\\bar\\...", "E:\\foo\\bar"),
+ ("\\\\some.path.\\foo\\_bar_", "\\\\some.path\\foo\\bar"), # UNC
+ ("\\\\some.path.\\foo\\_bar_", "\\\\some.path\\foo\\bar"),
+ (r"\\?\UNC\SRVR\SHR\__File.txt__", r"\\SRVR\SHR\File.txt"),
+ ("F:\\.path.\\ more\\foo bar ", "F:\\path\\more\\foo bar"), # Drive letter
+ ("c:\\.path.\\ more\\foo bar \\ ", "c:\\path\\more\\foo bar"),
+ ("c:\\foo_.\\bar", "c:\\foo\\bar"), # The remainder are all regression tests
+ ("c:\\foo_ _\\bar", "c:\\foo\\bar"),
+ ("c:\\foo. _\\bar", "c:\\foo\\bar"),
+ ("c:\\foo. .\\bar", "c:\\foo\\bar"),
+ ("c:\\foo. _\\bar", "c:\\foo\\bar"),
+ ("c:\\foo. .\\bar", "c:\\foo\\bar"),
+ ("c:\\__\\foo\\bar", "c:\\foo\\bar"), # No double \\\\ when an entire element is stripped
+ ("c:\\...\\foobar", "c:\\foobar"),
+ ],
+ )
+ def test_strip_path_elements_win(self, path, result):
+ def _func():
+ assert sorting.strip_path_elements(path) == result
+
+ _func()
+
+ @pytest.mark.skipif(sys.platform.startswith("win"), reason="Unix tests")
+ @pytest.mark.parametrize(
+ "path, result",
+ [
+ ("/foo/bar", "/foo/bar"),
+ ("FOO/bar/", "FOO/bar"),
+ ("foo/_bar_", "foo/bar"),
+ ("foo/__bar", "foo/bar"),
+ ("foo/bar__", "foo/bar"),
+ ("foo/ bar ", "foo/bar"),
+ ("foo/ bar", "foo/bar"),
+ ("/foo/bar _", "/foo/bar"),
+ ("/foo_/_bar", "/foo/bar"),
+ ("/foo._/bar", "/foo./bar"),
+ (".foo/bar", ".foo/bar"), # Dots
+ ("/foo/bar/...", "/foo/bar/..."),
+ ("foo_\\bar\\...", "foo_\\bar\\..."),
+ ("foo_./bar", "foo_./bar"), # The remainder are all regression tests
+ ("foo_ _/bar", "foo/bar"),
+ ("foo. _/bar", "foo./bar"),
+ ("foo. ./bar", "foo. ./bar"),
+ ("foo. _/bar", "foo./bar"),
+ ("/foo. ./bar", "/foo. ./bar"),
+ ("/__/foo/bar", "/foo/bar"), # No double // when an entire element is stripped
+ ],
+ )
+ def test_strip_path_elements_unix(self, path, result):
+ def _func():
+ assert sorting.strip_path_elements(path) == result
+
+ _func()
+
+ @pytest.mark.parametrize(
+ "path, result",
+ [
+ ("/Foo/Bar", "/Foo/Bar"), # Nothing to do
+ ("/{Foo}/Bar", "/foo/Bar"),
+ ("{/Foo/B}ar", "/foo/bar"),
+ ("/{F}oo/B{AR}", "/foo/Bar"), # Multiple
+ ("/F{{O}O/{B}A}R", "/Foo/baR"), # Multiple, overlapping
+ ("/F}oo/B{ar", "/Foo/Bar"), # Wrong order, no lowercasing should be done but { and } removed still
("", ""),
],
)
- def test_get_resolution(self, job_name, result):
- assert sorting.get_resolution(job_name) == result
+ def test_to_lowercase(self, path, result):
+ assert sorting.to_lowercase(path) == result
+
+ def test_has_subdirectory(self):
+ with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
+ pyfakefs.fake_filesystem_unittest.set_uid(0)
+ # Prep the fake filesystem
+ for test_dir in ["/another/test/dir", "/some/TEST/DIR"]:
+ ffs.fs.create_dir(test_dir, perm_bits=755)
+ # Sanity check
+ assert os.path.exists(test_dir) is True
+
+ assert sorting.has_subdirectory("/") is True
+ assert sorting.has_subdirectory("/some") is True
+ assert sorting.has_subdirectory("/another/test/") is True
+ # No subdirs
+ assert sorting.has_subdirectory("/another/test/dir") is False
+ assert sorting.has_subdirectory("/some/TEST/DIR/") is False
+ # Nonexistent dir
+ assert sorting.has_subdirectory("/some/TEST/NoSuchDir") is False
+ assert sorting.has_subdirectory("/some/TEST/NoSuchDir/") is False
+ # Relative path
+ assert sorting.has_subdirectory("some/TEST/NoSuchDir") is False
+ assert sorting.has_subdirectory("some/TEST/NoSuchDir/") is False
+ assert sorting.has_subdirectory("TEST") is False
+ assert sorting.has_subdirectory("TEST/") is False
+ # Empty input
+ assert sorting.has_subdirectory("") is False
+
+ @pytest.mark.parametrize(
+ "path, result",
+ [
+ ("/Foo/Bar", False),
+ ("", False),
+ ("%fn", True),
+ (".%ext", True),
+ ("%fn.%ext", True),
+ ("{%fn}", True), # A single closing lowercase marker is allowed
+ ("{.%ext}", True),
+ ("%fn{}", False), # But not the opening lowercase marker
+ (".%ext{}", False),
+ ("%fn}}", False), # Nor multiple closing lowercase markers
+ (".%ext}}", False),
+ ("%ext.%fn", True),
+ ("%ext", False), # Missing dot
+ ("%fn.ext", False),
+ (".ext", False),
+ (".fn", False),
+ ("", False),
+ ],
+ )
+ def test_ends_in_file(self, path, result):
+ assert sorting.ends_in_file(path) is result
+ assert sorting.ends_in_file(os.path.join("/tmp", path)) is result # Prepending makes no difference
+ assert sorting.ends_in_file("foo.bar-" + path) is result
+ assert sorting.ends_in_file(path + "-foo.bar") is False # Appending does, obviously
+ assert sorting.ends_in_file(os.path.join("/tmp", path + "-foo.bar")) is False
+
+ @pytest.mark.skipif(sys.platform.startswith("win"), reason="Unix tests")
+ def test_move_to_parent_directory_unix(self):
+ # Standard files/dirs
+ with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
+ pyfakefs.fake_filesystem_unittest.set_uid(0)
+ # Create a fake filesystem with some file content in a random base directory
+ base_dir = "/" + os.urandom(4).hex() + "/" + os.urandom(2).hex()
+ for test_dir in ["dir/2", "TEST/DIR2"]:
+ ffs.fs.create_dir(base_dir + "/" + test_dir, perm_bits=755)
+ assert os.path.exists(base_dir + "/" + test_dir) is True
+ for test_file in ["dir/some.file", "TEST/DIR/FILE"]:
+ ffs.fs.create_file(base_dir + "/" + test_file, int("0644", 8))
+ assert os.path.exists(base_dir + "/" + test_file) is True
+
+ return_path, return_status = sorting.move_to_parent_directory(base_dir + "/TEST")
+
+ # Affected by move
+ assert not os.path.exists(base_dir + "/TEST/DIR/FILE") # Moved to subdir
+ assert not os.path.exists(base_dir + "/TEST/DIR2") # Deleted empty directory
+ assert not os.path.exists(base_dir + "/DIR2") # Dirs don't get moved, only their file content
+ assert os.path.exists(base_dir + "/DIR/FILE") # Moved file
+ # Not moved
+ assert not os.path.exists(base_dir + "/some.file")
+ assert not os.path.exists(base_dir + "/2")
+ assert os.path.exists(base_dir + "/dir/some.file")
+ assert os.path.exists(base_dir + "/dir/2")
+ # Function return values
+ assert (return_path) == base_dir
+ assert (return_status) is True
+
+ # Exception for DVD directories
+ with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
+ pyfakefs.fake_filesystem_unittest.set_uid(0)
+ # Create a fake filesystem in a random base directory, and included a typical DVD directory
+ base_dir = "/" + os.urandom(4).hex() + "/" + os.urandom(2).hex()
+ dvd = choice(("video_ts", "audio_ts", "bdmv"))
+ for test_dir in ["dir/2", "TEST/DIR2"]:
+ ffs.fs.create_dir(base_dir + "/" + test_dir, perm_bits=755)
+ assert os.path.exists(base_dir + "/" + test_dir) is True
+ for test_file in ["dir/some.file", "TEST/" + dvd + "/FILE"]:
+ ffs.fs.create_file(base_dir + "/" + test_file, int("0644", 8))
+ assert os.path.exists(base_dir + "/" + test_file) is True
+
+ return_path, return_status = sorting.move_to_parent_directory(base_dir + "/TEST")
+
+ # Nothing should move in the presence of a DVD directory structure
+ assert os.path.exists(base_dir + "/TEST/" + dvd + "/FILE")
+ assert os.path.exists(base_dir + "/TEST/DIR2")
+ assert not os.path.exists(base_dir + "/DIR2")
+ assert not os.path.exists(base_dir + "/DIR/FILE")
+ assert not os.path.exists(base_dir + "/some.file")
+ assert not os.path.exists(base_dir + "/2")
+ assert os.path.exists(base_dir + "/dir/some.file")
+ assert os.path.exists(base_dir + "/dir/2")
+ # Function return values
+ assert (return_path) == base_dir + "/TEST"
+ assert (return_status) is True
+
+ @pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows tests")
+ def test_move_to_parent_directory_win(self):
+ # Standard files/dirs
+ with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
+ pyfakefs.fake_filesystem_unittest.set_uid(0)
+ # Create a fake filesystem with some file content in a random base directory
+ base_dir = "Z:\\" + os.urandom(4).hex() + "\\" + os.urandom(2).hex()
+ for test_dir in ["dir\\2", "TEST\\DIR2"]:
+ ffs.fs.create_dir(base_dir + "\\" + test_dir, perm_bits=755)
+ assert os.path.exists(base_dir + "\\" + test_dir) is True
+ for test_file in ["dir\\some.file", "TEST\\DIR\\FILE"]:
+ ffs.fs.create_file(base_dir + "\\" + test_file, int("0644", 8))
+ assert os.path.exists(base_dir + "\\" + test_file) is True
+
+ return_path, return_status = sorting.move_to_parent_directory(base_dir + "\\TEST")
+
+ # Affected by move
+ assert not os.path.exists(base_dir + "\\TEST\\DIR\\FILE") # Moved to subdir
+ assert not os.path.exists(base_dir + "\\TEST\\DIR2") # Deleted empty directory
+ assert not os.path.exists(base_dir + "\\DIR2") # Dirs don't get moved, only their file content
+ assert os.path.exists(base_dir + "\\DIR\\FILE") # Moved file
+ # Not moved
+ assert not os.path.exists(base_dir + "\\some.file")
+ assert not os.path.exists(base_dir + "\\2")
+ assert os.path.exists(base_dir + "\\dir\\some.file")
+ assert os.path.exists(base_dir + "\\dir\\2")
+ # Function return values
+ assert (return_path) == base_dir
+ assert (return_status) is True
+
+ # Exception for DVD directories
+ with pyfakefs.fake_filesystem_unittest.Patcher() as ffs:
+ pyfakefs.fake_filesystem_unittest.set_uid(0)
+ # Create a fake filesystem in a random base directory, and included a typical DVD directory
+ base_dir = "D:\\" + os.urandom(4).hex() + "\\" + os.urandom(2).hex()
+ dvd = choice(("video_ts", "audio_ts", "bdmv"))
+ for test_dir in ["dir\\2", "TEST\\DIR2"]:
+ ffs.fs.create_dir(base_dir + "\\" + test_dir, perm_bits=755)
+ assert os.path.exists(base_dir + "\\" + test_dir) is True
+ for test_file in ["dir\\some.file", "TEST\\" + dvd + "\\FILE"]:
+ ffs.fs.create_file(base_dir + "\\" + test_file, int("0644", 8))
+ assert os.path.exists(base_dir + "\\" + test_file) is True
+
+ return_path, return_status = sorting.move_to_parent_directory(base_dir + "\\TEST")
+
+ # Nothing should move in the presence of a DVD directory structure
+ assert os.path.exists(base_dir + "\\TEST\\" + dvd + "\\FILE")
+ assert os.path.exists(base_dir + "\\TEST\\DIR2")
+ assert not os.path.exists(base_dir + "\\DIR2")
+ assert not os.path.exists(base_dir + "\\DIR\\FILE")
+ assert not os.path.exists(base_dir + "\\some.file")
+ assert not os.path.exists(base_dir + "\\2")
+ assert os.path.exists(base_dir + "\\dir\\some.file")
+ assert os.path.exists(base_dir + "\\dir\\2")
+ # Function return values
+ assert (return_path) == base_dir + "\\TEST"
+ assert (return_status) is True
+
+
+@pytest.mark.usefixtures("clean_cache_dir")
+class TestSortingSorters:
+ @pytest.mark.parametrize(
+ "s_class, jobname, sort_string, result_path, result_setname",
+ [
+ (
+ sorting.DateSorter,
+ "My.EveryDay.Show.20210203.Great.Success.1080p.aac.hdtv-mygrp.mkv",
+ "%y-%0m/%t - %y-%0m-%0d - %desc.%ext",
+ "2021-02",
+ "My EveryDay Show - 2021-02-03 - Great Success",
+ ),
+ (
+ sorting.DateSorter,
+ "My.EveryDay.Show.20210606.Greater.Successes.2160p.dts.bluray-mygrp.mkv",
+ "%y-%m/%t - %y-%m-%d - %desc.%ext",
+ "2021-6",
+ "My EveryDay Show - 2021-6-6 - Greater Successes",
+ ),
+ (
+ sorting.DateSorter,
+ "ME!.1999.12.31.720p.hd-tv",
+ "{%t}/%0decade_%r/%t - %y-%0m-%0d.%ext",
+ "me!/1990_720p",
+ "ME! - 1999-12-31",
+ ),
+ (
+ sorting.DateSorter,
+ "2000 A.D. 28-01-2000 360i dvd-r.avi",
+ "%y/%0m/%0d/%r.%dn.%ext",
+ "2000/01/28",
+ "360i.2000 A.D. 28-01-2000 360i dvd-r.avi",
+ ),
+ (sorting.DateSorter, "Allo_Allo_07-SEP-1984", "%y/%0m/%0d/%.t.%ext", "1984/09/07", "Allo.Allo"),
+ (
+ sorting.DateSorter,
+ "www.example.org Allo_Allo_07-SEP-1984",
+ "%GI/%GI/%y/%0m/%0d/%.t%GI.%ext",
+ "www.example.org/1984/09/07",
+ "Allo.Allo",
+ ),
+ (
+ sorting.SeriesSorter,
+ "onslow.goes.to.university.s06e66-grp.mkv",
+ "%sn/Season %s/%sn - %sx%0e - %en.%ext",
+ "Onslow Goes To University/Season 6",
+ "Onslow Goes To University - 6x66 - grp",
+ ),
+ (
+ sorting.SeriesSorter,
+ "rose's_BEAUTY_parlour",
+ "%sn/S%0sE%0e - %en/%sn - S%0sE%0e - %en.%ext",
+ "Rose's Beauty Parlour/S01E -", # Season defaults to '1' if missing, episode doesn't
+ "Rose's Beauty Parlour - S01E -",
+ ),
+ (
+ sorting.SeriesSorter,
+ "Cooking with Hyacinth S01E13 Biscuits 2160p DD5.1 Cookies",
+ "{%s.N}/%sx%0e - %en/%s_N - %0sx%0e - %en (%r).%ext",
+ "cooking.with.hyacinth/1x13 - Biscuits",
+ "Cooking_with_Hyacinth - 01x13 - Biscuits (2160p)",
+ ),
+ (
+ sorting.SeriesSorter,
+ "Daisy _ (1987) _ S01E02 _ 480i.avi",
+ "%dn.%ext",
+ "",
+ "Daisy _ (1987) _ S01E02 _ 480i.avi",
+ ),
+ (
+ sorting.SeriesSorter,
+ "Bruce.and.Violet.S126E202.Oh.Dear.Benz.4320p.mkv",
+ "%sn/Season W%s/W%0e_%desc.%ext",
+ "Bruce and Violet/Season W126",
+ "W202",
+ ), # %desc should be stripped, season and episode numbers >=100 handled correctly, and "and" remain lowercase
+ (
+ sorting.SeriesSorter,
+ "[www.sabnzbd.org]Candle.Light.Dinners.S02E13.Elite.Soups.dts.hvec-NZBLuv.mkv",
+ "%s.N.S%0sE%0e.(%e.n).%G.I.%GI-%GI.%ext",
+ "",
+ "Candle.Light.Dinners.S02E13.(Elite.Soups).DTS.www.sabnzbd.org-hvec-NZBLuv",
+ ), # GI
+ (
+ sorting.SeriesSorter,
+ "Candle.Light.Dinners.S02E13.DD+5.1.x265.Hi10-NZBLuv.mkv",
+ "%s_N_S%0sE%0e_%G_I_%G.I_%G_I.%ext",
+ "",
+ "Candle_Light_Dinners_S02E13_H.265_10-bit_Dolby_Digital_Plus",
+ ), # GI with spacer
+ (
+ sorting.MovieSorter,
+ "Pantomimes.Lumineuses.1982.2160p.WEB-DL.DDP5.1.H.264-TheOpt.mkv",
+ "%r/%year/%title-%G.I.%ext",
+ "2160p/1982",
+ "Pantomimes Lumineuses-TheOpt",
+ ),
+ (
+ sorting.MovieSorter,
+ "The Lucky Dog 1921 540i Tape ac3 mono-LnH proof sample.avi",
+ "%year/%_t-%G.I.%ext",
+ "1921",
+ "The_Lucky_Dog-Proof-Sample",
+ ),
+ (
+ sorting.MovieSorter,
+ "Kid_Auto_Races_at_Venice_[2014]",
+ "%0decades/%y_%_t",
+ "2010s/2014_Kid_Auto_Races_at_Venice",
+ "",
+ ),
+ ],
+ )
+ @pytest.mark.parametrize("enable_sorting", [0, 1])
+ @pytest.mark.parametrize("category", ["sortme", "nosort", "*"])
+ def test_sorter_get_final_path(
+ self, s_class, enable_sorting, jobname, sort_string, category, result_path, result_setname
+ ):
+ sort_cats = "*, sortme"
+
+ @set_config(
+ {
+ "date_sort_string": sort_string,
+ "date_categories": sort_cats,
+ "enable_date_sorting": enable_sorting,
+ "tv_sort_string": sort_string,
+ "tv_categories": sort_cats,
+ "enable_tv_sorting": enable_sorting,
+ "movie_sort_string": sort_string,
+ "movie_categories": sort_cats,
+ "enable_movie_sorting": enable_sorting,
+ "movie_sort_extra": " CD%1",
+ "movie_extra_folder": 0,
+ "movie_rename_limit": "100M",
+ }
+ )
+ def _func():
+ path = ("/tmp/" if not sys.platform.startswith("win") else "c:\\tmp\\") + os.urandom(4).hex()
+ sorter = s_class(None, jobname, path, category)
+ if bool(enable_sorting) and category in sort_cats:
+ if sys.platform.startswith("win"):
+ assert sorter.get_final_path() == (path + "/" + result_path).replace("/", "\\")
+ assert sorter.filename_set == result_setname.replace("/", "\\")
+ else:
+ assert sorter.get_final_path() == path + "/" + result_path
+ assert sorter.filename_set == result_setname
+ else:
+ if sys.platform.startswith("win"):
+ assert sorter.get_final_path() == (path + "/" + jobname).replace("/", "\\")
+ else:
+ assert sorter.get_final_path() == path + "/" + jobname
+ assert sorter.filename_set == ""
+
+ _func()
+
+ @pytest.mark.parametrize(
+ "s_class, job_tag, sort_string, sort_result", # sort_result without extension
+ [
+ (sorting.SeriesSorter, "S01E02", "%r/%sn s%0se%0e.%ext", "Simulated Job s01e02"),
+ (sorting.MovieSorter, "2021", "%y_%.title.%r.%ext", "2021_Simulated.Job.2160p"),
+ (sorting.DateSorter, "2020-02-29", "%y/%0m/%0d/%.t-%GI", "Simulated.Job-SAB"),
+ ],
+ )
+ @pytest.mark.parametrize("size_limit, file_size", [(512, 1024), (1024, 512)])
+ @pytest.mark.parametrize("extension", [".mkv", ".data", ".mkv", ".vob"])
+ @pytest.mark.parametrize("number_of_files", [1, 2])
+ @pytest.mark.parametrize("generate_sequential_filenames", [True, False])
+ def test_sorter_rename(
+ self,
+ s_class,
+ job_tag,
+ sort_string,
+ sort_result,
+ size_limit,
+ file_size,
+ extension,
+ number_of_files,
+ generate_sequential_filenames,
+ ):
+ """Test the file renaming of the Sorter classes"""
+
+ @set_config(
+ {
+ "tv_sort_string": sort_string, # TV
+ "tv_categories": "*",
+ "enable_tv_sorting": 1,
+ "movie_sort_string": sort_string, # Movie
+ "movie_categories": "*",
+ "enable_movie_sorting": 1,
+ "movie_sort_extra": " CD%1",
+ "movie_extra_folder": 0,
+ "movie_rename_limit": size_limit,
+ "date_sort_string": sort_string, # Date
+ "date_categories": "*",
+ "enable_date_sorting": 1,
+ "episode_rename_limit": size_limit, # TV & Date
+ }
+ )
+ def _func():
+ # Make up a job name
+ job_name = "Simulated.Job." + job_tag + ".2160p.Web.x264-SAB"
+
+ # Prep the filesystem
+ storage_dir = os.path.join(SAB_CACHE_DIR, "complete" + os.urandom(4).hex())
+ try:
+ shutil.rmtree(storage_dir)
+ except FileNotFoundError:
+ pass
+ job_dir = os.path.join(storage_dir, job_name)
+ os.makedirs(job_dir, exist_ok=True)
+ assert os.path.exists(job_dir) is True
+
+ # Create "downloaded" file(s)
+ all_files = []
+ fixed_random = os.urandom(8).hex()
+ for number in range(1, 1 + number_of_files):
+ if not generate_sequential_filenames:
+ job_file = os.urandom(8).hex() + extension
+ else:
+ job_file = fixed_random + ".CD" + str(number) + extension
+ job_filepath = os.path.join(job_dir, job_file)
+ with open(job_filepath, "wb") as f:
+ f.write(os.urandom(file_size))
+ assert os.path.exists(job_filepath) is True
+ all_files.append(job_file)
+
+ # Initialise the sorter and rename
+ sorter = s_class(None, job_name, job_dir, "*", force=True)
+ sorter.get_values()
+ sorter.construct_path()
+ sort_dest, is_ok = sorter.rename(all_files, job_dir, size_limit)
+
+ # Check the result
+ try:
+ if (
+ is_ok
+ and file_size > size_limit
+ and extension not in sorting.EXCLUDED_FILE_EXTS
+ and not (sorter.type == "movie" and number_of_files > 1 and not generate_sequential_filenames)
+ and not (sorter.type != "movie" and number_of_files > 1)
+ ):
+ # File(s) should be renamed
+ if number_of_files > 1 and generate_sequential_filenames and sorter.type == "movie":
+ # Movie sequential file handling
+ for n in range(1, number_of_files + 1):
+ expected = os.path.join(sort_dest, sort_result + " CD" + str(n) + extension)
+ assert os.path.exists(expected)
+ else:
+ expected = os.path.join(sort_dest, sort_result + extension)
+ assert os.path.exists(expected)
+ else:
+ # No renaming should happen
+ expected = os.path.join(sort_dest, job_file)
+ assert os.path.exists(expected)
+ except AssertionError:
+ # Get some insight into what *did* happen and re-raise the error
+ for root, dirs, files in os.walk(sort_dest):
+ print(sort_dest, dirs, files)
+ raise AssertionError()
+
+ # Cleanup
+ try:
+ shutil.rmtree(storage_dir)
+ except FileNotFoundError:
+ pass
+
+ _func()
+
+ @pytest.mark.parametrize(
+ "job_name, result_sort_file, result_class",
+ [
+ ("OGEL.NinjaGo.Masters.of.Jinspitzu.S13.1080p.CN.WEB-DL.AAC2.0.H.264", True, sorting.SeriesSorter),
+ (
+ "The.Hunt.for.Blue.November.1990.NORDiC.REMUX.2160p.DV.HDR.UHD-BluRay.HEVC.TrueHD.5.1-SLoWGoaTS",
+ True,
+ sorting.MovieSorter,
+ ),
+ ("가요무대.1985-11-18.480p.Sat.KorSub", True, sorting.DateSorter),
+ ("Virus.cmd", False, None),
+ ("SABnzbd 0.3.9 DeadyNas Mono (incl. Python2.3).pkg", False, None),
+ ],
+ )
+ def test_sorter_generic(self, job_name, result_sort_file, result_class):
+ """Check if the generic sorter makes the right choices"""
+ generic = sorting.Sorter(None, None)
+ generic.detect(job_name, SAB_CACHE_DIR)
+
+ assert generic.sort_file is result_sort_file
+ if result_sort_file:
+ assert generic.sorter
+ assert generic.sorter.__class__ is result_class
+ else:
+ assert not generic.sorter
+
+ @pytest.mark.parametrize(
+ "name, result",
+ [
+ ("Undrinkable.2010.PROPER", True),
+ ("Undrinkable.2010.EXTENDED.DVDRip.XviD-MoveIt", False),
+ ("The.Choir.S01E02.The.Details.AC3.DVDRip.XviD-AD1100", False),
+ ("The.Choir.S01E02.The.Real.Details.AC3.DVDRip.XviD-AD1100", False),
+ ("The.Choir.S01E02.The.Details.REAL.AC3.DVDRip.XviD-AD1100", True),
+ ("real.steal.2011.dvdrip.xvid.ac3-4lt1n", False),
+ ("The.Stalking.Mad.S88E01.repack.ReaL.PROPER.CONVERT.1080p.WEB.h265-BTS", True),
+ ("The.Stalking.Mad.S88E01.CONVERT.1080p.WEB.h265-BTS", False),
+ ],
+ )
+ def test_sorter_is_proper(self, name, result):
+ """Test the is_proper method of the BaseSorter class"""
+ sorter = sorting.BaseSorter.__new__(sorting.BaseSorter) # Skip __init__
+ sorter.guess = sorting.guess_what(name)
+ assert sorter.is_proper() is result