diff --git a/sabnzbd/filesystem.py b/sabnzbd/filesystem.py index a873154..f908216 100644 --- a/sabnzbd/filesystem.py +++ b/sabnzbd/filesystem.py @@ -549,20 +549,36 @@ DIR_LOCK = threading.RLock() @synchronized(DIR_LOCK) -def create_all_dirs(path, umask=False): +def create_all_dirs(path, apply_umask=False): """ Create all required path elements and set umask on all The umask argument is ignored on Windows Return path if elements could be made or exists """ try: - # Use custom mask if desired - mask = 0o700 - if umask and sabnzbd.cfg.umask(): - mask = int(sabnzbd.cfg.umask(), 8) - - # Use python functions to create the directory - logging.info("Creating directories: %s (mask=%s)", path, mask) - os.makedirs(path, mode=mask, exist_ok=True) + logging.info("Creating directories: %s", path) + if sabnzbd.WIN32: + os.makedirs(path, exist_ok=True) + else: + # We need to build the directory recursively so we can + # apply permissions to only the newly created folders + # We cannot use os.makedirs() to do this as it ignores the mode + try: + # Try the user permissions setting + umask = int(sabnzbd.cfg.umask(), 8) | int("0700", 8) + except: + # Use default + umask = int("0700", 8) + + # Build path from root + path_part_combined = "/" + for path_part in path.split("/"): + if path_part: + path_part_combined = os.path.join(path_part_combined, path_part) + # Only create if it doesn't exist + if not os.path.exists(path_part_combined): + os.mkdir(path_part_combined) + # Try to set permissions, ignore failures + set_chmod(path_part_combined, umask, report=False) return path except OSError: logging.error(T("Failed making (%s)"), clip_path(path), exc_info=True) @@ -582,7 +598,7 @@ def get_unique_path(dirpath, n=0, create_dir=True): if not os.path.exists(path): if create_dir: - return create_all_dirs(path, umask=True) + return create_all_dirs(path, apply_umask=True) else: return path else: @@ -643,7 +659,7 @@ def move_to_path(path, new_path): # Cannot rename, try copying logging.debug("File could not be renamed, trying copying: %s", path) try: - create_all_dirs(os.path.dirname(new_path), umask=True) + create_all_dirs(os.path.dirname(new_path), apply_umask=True) shutil.copyfile(path, new_path) os.remove(path) except: diff --git a/sabnzbd/postproc.py b/sabnzbd/postproc.py index 98bdf59..5923852 100644 --- a/sabnzbd/postproc.py +++ b/sabnzbd/postproc.py @@ -688,7 +688,7 @@ def prepare_extraction_path(nzo): complete_dir = sanitize_and_trim_path(complete_dir) if one_folder: - workdir_complete = create_all_dirs(complete_dir, umask=True) + workdir_complete = create_all_dirs(complete_dir, apply_umask=True) else: workdir_complete = get_unique_path(os.path.join(complete_dir, nzo.final_name), create_dir=True) marker_file = set_marker(workdir_complete) diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index abf2049..87d84e7 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -194,7 +194,7 @@ class TestSameFile: assert 0 == filesystem.same_file("/test/../home", "/test") assert 0 == filesystem.same_file("/test/./test", "/test") - @pytest.mark.skipif(sys.platform.startswith("win"), reason="Not for Windows") + @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") @set_platform("linux") def test_posix_fun(self): assert 1 == filesystem.same_file("/test", "/test") @@ -302,7 +302,7 @@ class TestClipLongPath: assert filesystem.long_path("/test/dir") == "/test/dir" -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") class TestCheckMountLinux(ffs.TestCase): # Our collection of fake directories test_dirs = ["/media/test/dir", "/mnt/TEST/DIR"] @@ -350,7 +350,7 @@ class TestCheckMountLinux(ffs.TestCase): assert filesystem.check_mount("/") is True -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") class TestCheckMountDarwin(ffs.TestCase): # Our faked macos directory test_dir = "/Volumes/test/dir" @@ -458,7 +458,7 @@ class TestTrimWinPath: assert filesystem.trim_win_path(test_path + "\\" + ("D" * 20)) == test_path + "\\" + "D" * 3 -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") class TestListdirFull(ffs.TestCase): # Basic fake filesystem setup stanza def setUp(self): @@ -539,7 +539,6 @@ class TestListdirFull(ffs.TestCase): class TestListdirFullWin(ffs.TestCase): # Basic fake filesystem setup stanza - @set_platform("win32") def setUp(self): self.setUpPyfakefs() self.fs.is_windows_fs = True @@ -617,7 +616,7 @@ class TestListdirFullWin(ffs.TestCase): assert filesystem.listdir_full(test_file) == [] -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") class TestGetUniquePathFilename(ffs.TestCase): # Basic fake filesystem setup stanza def setUp(self): @@ -675,9 +674,9 @@ class TestGetUniquePathFilename(ffs.TestCase): assert filesystem.get_unique_filename(test_file) == "/some/filename.1" +@pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific tests") class TestGetUniquePathFilenameWin(ffs.TestCase): # Basic fake filesystem setup stanza - @set_platform("win32") def setUp(self): self.setUpPyfakefs() self.fs.is_windows_fs = True @@ -730,6 +729,77 @@ class TestGetUniquePathFilenameWin(ffs.TestCase): assert filesystem.get_unique_filename(test_file).lower() == r"c:\some\filename.1" +class TestCreateAllDirsWin(ffs.TestCase): + # Basic fake filesystem setup stanza + def setUp(self): + self.setUpPyfakefs() + self.fs.is_windows_fs = True + self.fs.path_separator = "\\" + self.fs.is_case_sensitive = False + + @set_platform("win32") + def test_create_all_dirs(self): + self.dir = self.fs.create_dir(r"C:\Downloads") + # Also test for no crash when folder already exists + for folder in (r"C:\Downloads", r"C:\Downloads\Show\Test", r"C:\Downloads\Show\Test2", r"C:\Downloads\Show"): + assert filesystem.create_all_dirs(folder) == folder + assert os.path.exists(folder) + + +class PermissionCheckerHelper: + @staticmethod + def assert_dir_perms(path, expected_perms): + assert stat.filemode(os.stat(path).st_mode) == "d" + stat.filemode(expected_perms)[1:] + + +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") +class TestCreateAllDirs(ffs.TestCase, PermissionCheckerHelper): + def setUp(self): + self.setUpPyfakefs() + self.fs.path_separator = "/" + self.fs.is_case_sensitive = True + + def test_basic_folder_creation(self): + self.fs.create_dir("/test_base") + # Also test for no crash when folder already exists + for folder in ("/test_base", "/test_base/show/season 1/episode 1", "/test_base/show"): + assert filesystem.create_all_dirs(folder) == folder + assert os.path.exists(folder) + + @set_config({"umask": "0777"}) + def test_permissions_777(self): + self._permissions_runner("/test_base777", "/test_base777/se 1/ep 1", "0700") + + @set_config({"umask": "0770"}) + def test_permissions_770(self): + self._permissions_runner("/test_base770", "/test_base770/se 1/ep 1", "0700") + + @set_config({"umask": "0600"}) + def test_permissions_600(self): + self._permissions_runner("/test_base600", "/test_base600/se 1/ep 1", "0700") + + @set_config({"umask": "0700"}) + def test_permissions_450(self): + with pytest.raises(OSError): + self._permissions_runner("/test_base_450", "/test_base_450/se 1/ep 1", "0450") + + def _permissions_runner(self, test_base, new_dir, perms_base): + # Create base directory and set the base permissions + perms_base_int = int(perms_base, 8) + self.fs.create_dir(test_base, perms_base_int) + assert os.path.exists(test_base) is True + self.assert_dir_perms(test_base, perms_base_int) + + # Create directories with permissions + filesystem.create_all_dirs(new_dir, apply_umask=True) + + # If permissions needed to be set, verify the new folder has the + # right permissions and verify the base didn't change + perms_test_int = int(cfg.umask(), 8) | int("0700", 8) + self.assert_dir_perms(new_dir, perms_test_int) + self.assert_dir_perms(test_base, perms_base_int) + + class TestSetPermissionsWin(ffs.TestCase): @set_platform("win32") def test_win32(self): @@ -737,8 +807,8 @@ class TestSetPermissionsWin(ffs.TestCase): assert filesystem.set_permissions(r"F:\who\cares", recursive=False) is None -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") -class TestSetPermissions(ffs.TestCase): +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests") +class TestSetPermissions(ffs.TestCase, PermissionCheckerHelper): # Basic fake filesystem setup stanza def setUp(self): self.setUpPyfakefs() @@ -771,7 +841,7 @@ class TestSetPermissions(ffs.TestCase): ffs.set_uid(0) self.fs.create_dir(test_dir, perms_test) assert os.path.exists(test_dir) is True - assert stat.filemode(os.stat(test_dir).st_mode) == "d" + stat.filemode(perms_test)[1:] + self.assert_dir_perms(test_dir, perms_test) # Setup and verify fake files for file in ( @@ -800,7 +870,7 @@ class TestSetPermissions(ffs.TestCase): for root, dirs, files in os.walk(test_dir): for dir in [os.path.join(root, d) for d in dirs]: # Permissions on directories should now match perms_after - assert stat.filemode(os.stat(dir).st_mode) == "d" + stat.filemode(perms_after)[1:] + self.assert_dir_perms(dir, perms_after) for file in [os.path.join(root, f) for f in files]: # Files also shouldn't have any executable or special bits set assert (