Browse Source

Permissions were not set correctly when creating directories (#1568)

Restores changes made in d2e0ebe
tags/3.0.1RC1
Safihre 5 years ago
parent
commit
050b925f7b
  1. 38
      sabnzbd/filesystem.py
  2. 2
      sabnzbd/postproc.py
  3. 92
      tests/test_filesystem.py

38
sabnzbd/filesystem.py

@ -549,20 +549,36 @@ DIR_LOCK = threading.RLock()
@synchronized(DIR_LOCK) @synchronized(DIR_LOCK)
def create_all_dirs(path, umask=False): def create_all_dirs(path, apply_umask=False):
""" Create all required path elements and set umask on all """ Create all required path elements and set umask on all
The umask argument is ignored on Windows The umask argument is ignored on Windows
Return path if elements could be made or exists Return path if elements could be made or exists
""" """
try: try:
# Use custom mask if desired logging.info("Creating directories: %s", path)
mask = 0o700 if sabnzbd.WIN32:
if umask and sabnzbd.cfg.umask(): os.makedirs(path, exist_ok=True)
mask = int(sabnzbd.cfg.umask(), 8) else:
# We need to build the directory recursively so we can
# Use python functions to create the directory # apply permissions to only the newly created folders
logging.info("Creating directories: %s (mask=%s)", path, mask) # We cannot use os.makedirs() to do this as it ignores the mode
os.makedirs(path, mode=mask, exist_ok=True) try:
# Try the user permissions setting
umask = int(sabnzbd.cfg.umask(), 8) | int("0700", 8)
except:
# Use default
umask = int("0700", 8)
# Build path from root
path_part_combined = "/"
for path_part in path.split("/"):
if path_part:
path_part_combined = os.path.join(path_part_combined, path_part)
# Only create if it doesn't exist
if not os.path.exists(path_part_combined):
os.mkdir(path_part_combined)
# Try to set permissions, ignore failures
set_chmod(path_part_combined, umask, report=False)
return path return path
except OSError: except OSError:
logging.error(T("Failed making (%s)"), clip_path(path), exc_info=True) logging.error(T("Failed making (%s)"), clip_path(path), exc_info=True)
@ -582,7 +598,7 @@ def get_unique_path(dirpath, n=0, create_dir=True):
if not os.path.exists(path): if not os.path.exists(path):
if create_dir: if create_dir:
return create_all_dirs(path, umask=True) return create_all_dirs(path, apply_umask=True)
else: else:
return path return path
else: else:
@ -643,7 +659,7 @@ def move_to_path(path, new_path):
# Cannot rename, try copying # Cannot rename, try copying
logging.debug("File could not be renamed, trying copying: %s", path) logging.debug("File could not be renamed, trying copying: %s", path)
try: try:
create_all_dirs(os.path.dirname(new_path), umask=True) create_all_dirs(os.path.dirname(new_path), apply_umask=True)
shutil.copyfile(path, new_path) shutil.copyfile(path, new_path)
os.remove(path) os.remove(path)
except: except:

2
sabnzbd/postproc.py

@ -688,7 +688,7 @@ def prepare_extraction_path(nzo):
complete_dir = sanitize_and_trim_path(complete_dir) complete_dir = sanitize_and_trim_path(complete_dir)
if one_folder: if one_folder:
workdir_complete = create_all_dirs(complete_dir, umask=True) workdir_complete = create_all_dirs(complete_dir, apply_umask=True)
else: else:
workdir_complete = get_unique_path(os.path.join(complete_dir, nzo.final_name), create_dir=True) workdir_complete = get_unique_path(os.path.join(complete_dir, nzo.final_name), create_dir=True)
marker_file = set_marker(workdir_complete) marker_file = set_marker(workdir_complete)

92
tests/test_filesystem.py

@ -194,7 +194,7 @@ class TestSameFile:
assert 0 == filesystem.same_file("/test/../home", "/test") assert 0 == filesystem.same_file("/test/../home", "/test")
assert 0 == filesystem.same_file("/test/./test", "/test") assert 0 == filesystem.same_file("/test/./test", "/test")
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not for Windows") @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
@set_platform("linux") @set_platform("linux")
def test_posix_fun(self): def test_posix_fun(self):
assert 1 == filesystem.same_file("/test", "/test") assert 1 == filesystem.same_file("/test", "/test")
@ -302,7 +302,7 @@ class TestClipLongPath:
assert filesystem.long_path("/test/dir") == "/test/dir" assert filesystem.long_path("/test/dir") == "/test/dir"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
class TestCheckMountLinux(ffs.TestCase): class TestCheckMountLinux(ffs.TestCase):
# Our collection of fake directories # Our collection of fake directories
test_dirs = ["/media/test/dir", "/mnt/TEST/DIR"] test_dirs = ["/media/test/dir", "/mnt/TEST/DIR"]
@ -350,7 +350,7 @@ class TestCheckMountLinux(ffs.TestCase):
assert filesystem.check_mount("/") is True assert filesystem.check_mount("/") is True
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
class TestCheckMountDarwin(ffs.TestCase): class TestCheckMountDarwin(ffs.TestCase):
# Our faked macos directory # Our faked macos directory
test_dir = "/Volumes/test/dir" test_dir = "/Volumes/test/dir"
@ -458,7 +458,7 @@ class TestTrimWinPath:
assert filesystem.trim_win_path(test_path + "\\" + ("D" * 20)) == test_path + "\\" + "D" * 3 assert filesystem.trim_win_path(test_path + "\\" + ("D" * 20)) == test_path + "\\" + "D" * 3
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
class TestListdirFull(ffs.TestCase): class TestListdirFull(ffs.TestCase):
# Basic fake filesystem setup stanza # Basic fake filesystem setup stanza
def setUp(self): def setUp(self):
@ -539,7 +539,6 @@ class TestListdirFull(ffs.TestCase):
class TestListdirFullWin(ffs.TestCase): class TestListdirFullWin(ffs.TestCase):
# Basic fake filesystem setup stanza # Basic fake filesystem setup stanza
@set_platform("win32")
def setUp(self): def setUp(self):
self.setUpPyfakefs() self.setUpPyfakefs()
self.fs.is_windows_fs = True self.fs.is_windows_fs = True
@ -617,7 +616,7 @@ class TestListdirFullWin(ffs.TestCase):
assert filesystem.listdir_full(test_file) == [] assert filesystem.listdir_full(test_file) == []
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
class TestGetUniquePathFilename(ffs.TestCase): class TestGetUniquePathFilename(ffs.TestCase):
# Basic fake filesystem setup stanza # Basic fake filesystem setup stanza
def setUp(self): def setUp(self):
@ -675,9 +674,9 @@ class TestGetUniquePathFilename(ffs.TestCase):
assert filesystem.get_unique_filename(test_file) == "/some/filename.1" assert filesystem.get_unique_filename(test_file) == "/some/filename.1"
@pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific tests")
class TestGetUniquePathFilenameWin(ffs.TestCase): class TestGetUniquePathFilenameWin(ffs.TestCase):
# Basic fake filesystem setup stanza # Basic fake filesystem setup stanza
@set_platform("win32")
def setUp(self): def setUp(self):
self.setUpPyfakefs() self.setUpPyfakefs()
self.fs.is_windows_fs = True self.fs.is_windows_fs = True
@ -730,6 +729,77 @@ class TestGetUniquePathFilenameWin(ffs.TestCase):
assert filesystem.get_unique_filename(test_file).lower() == r"c:\some\filename.1" assert filesystem.get_unique_filename(test_file).lower() == r"c:\some\filename.1"
class TestCreateAllDirsWin(ffs.TestCase):
# Basic fake filesystem setup stanza
def setUp(self):
self.setUpPyfakefs()
self.fs.is_windows_fs = True
self.fs.path_separator = "\\"
self.fs.is_case_sensitive = False
@set_platform("win32")
def test_create_all_dirs(self):
self.dir = self.fs.create_dir(r"C:\Downloads")
# Also test for no crash when folder already exists
for folder in (r"C:\Downloads", r"C:\Downloads\Show\Test", r"C:\Downloads\Show\Test2", r"C:\Downloads\Show"):
assert filesystem.create_all_dirs(folder) == folder
assert os.path.exists(folder)
class PermissionCheckerHelper:
@staticmethod
def assert_dir_perms(path, expected_perms):
assert stat.filemode(os.stat(path).st_mode) == "d" + stat.filemode(expected_perms)[1:]
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
class TestCreateAllDirs(ffs.TestCase, PermissionCheckerHelper):
def setUp(self):
self.setUpPyfakefs()
self.fs.path_separator = "/"
self.fs.is_case_sensitive = True
def test_basic_folder_creation(self):
self.fs.create_dir("/test_base")
# Also test for no crash when folder already exists
for folder in ("/test_base", "/test_base/show/season 1/episode 1", "/test_base/show"):
assert filesystem.create_all_dirs(folder) == folder
assert os.path.exists(folder)
@set_config({"umask": "0777"})
def test_permissions_777(self):
self._permissions_runner("/test_base777", "/test_base777/se 1/ep 1", "0700")
@set_config({"umask": "0770"})
def test_permissions_770(self):
self._permissions_runner("/test_base770", "/test_base770/se 1/ep 1", "0700")
@set_config({"umask": "0600"})
def test_permissions_600(self):
self._permissions_runner("/test_base600", "/test_base600/se 1/ep 1", "0700")
@set_config({"umask": "0700"})
def test_permissions_450(self):
with pytest.raises(OSError):
self._permissions_runner("/test_base_450", "/test_base_450/se 1/ep 1", "0450")
def _permissions_runner(self, test_base, new_dir, perms_base):
# Create base directory and set the base permissions
perms_base_int = int(perms_base, 8)
self.fs.create_dir(test_base, perms_base_int)
assert os.path.exists(test_base) is True
self.assert_dir_perms(test_base, perms_base_int)
# Create directories with permissions
filesystem.create_all_dirs(new_dir, apply_umask=True)
# If permissions needed to be set, verify the new folder has the
# right permissions and verify the base didn't change
perms_test_int = int(cfg.umask(), 8) | int("0700", 8)
self.assert_dir_perms(new_dir, perms_test_int)
self.assert_dir_perms(test_base, perms_base_int)
class TestSetPermissionsWin(ffs.TestCase): class TestSetPermissionsWin(ffs.TestCase):
@set_platform("win32") @set_platform("win32")
def test_win32(self): def test_win32(self):
@ -737,8 +807,8 @@ class TestSetPermissionsWin(ffs.TestCase):
assert filesystem.set_permissions(r"F:\who\cares", recursive=False) is None assert filesystem.set_permissions(r"F:\who\cares", recursive=False) is None
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows") @pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
class TestSetPermissions(ffs.TestCase): class TestSetPermissions(ffs.TestCase, PermissionCheckerHelper):
# Basic fake filesystem setup stanza # Basic fake filesystem setup stanza
def setUp(self): def setUp(self):
self.setUpPyfakefs() self.setUpPyfakefs()
@ -771,7 +841,7 @@ class TestSetPermissions(ffs.TestCase):
ffs.set_uid(0) ffs.set_uid(0)
self.fs.create_dir(test_dir, perms_test) self.fs.create_dir(test_dir, perms_test)
assert os.path.exists(test_dir) is True assert os.path.exists(test_dir) is True
assert stat.filemode(os.stat(test_dir).st_mode) == "d" + stat.filemode(perms_test)[1:] self.assert_dir_perms(test_dir, perms_test)
# Setup and verify fake files # Setup and verify fake files
for file in ( for file in (
@ -800,7 +870,7 @@ class TestSetPermissions(ffs.TestCase):
for root, dirs, files in os.walk(test_dir): for root, dirs, files in os.walk(test_dir):
for dir in [os.path.join(root, d) for d in dirs]: for dir in [os.path.join(root, d) for d in dirs]:
# Permissions on directories should now match perms_after # Permissions on directories should now match perms_after
assert stat.filemode(os.stat(dir).st_mode) == "d" + stat.filemode(perms_after)[1:] self.assert_dir_perms(dir, perms_after)
for file in [os.path.join(root, f) for f in files]: for file in [os.path.join(root, f) for f in files]:
# Files also shouldn't have any executable or special bits set # Files also shouldn't have any executable or special bits set
assert ( assert (

Loading…
Cancel
Save