|
|
@ -24,8 +24,8 @@ import traceback |
|
|
|
|
|
|
|
from exceptions_helper import ex, ConnectionSkipException |
|
|
|
from lib.cachecontrol import CacheControl, caches |
|
|
|
from lib.tmdbsimple.configuration import Configuration |
|
|
|
from lib.tmdbsimple.genres import Genres |
|
|
|
# from lib.tmdbsimple.configuration import Configuration |
|
|
|
# from lib.tmdbsimple.genres import Genres |
|
|
|
from cfscrape import CloudflareScraper |
|
|
|
from send2trash import send2trash |
|
|
|
|
|
|
@ -1043,107 +1043,63 @@ def save_failure(url, domain, log_failure_url, post_data, post_json): |
|
|
|
_log_failure_url(url, post_data, post_json) |
|
|
|
|
|
|
|
|
|
|
|
def file_bit_filter(mode): |
|
|
|
for bit in [stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH, stat.S_ISUID, stat.S_ISGID]: |
|
|
|
if mode & bit: |
|
|
|
mode -= bit |
|
|
|
|
|
|
|
return mode |
|
|
|
|
|
|
|
|
|
|
|
def chmod_as_parent(child_path): |
|
|
|
""" |
|
|
|
def scantree(path, # type: AnyStr |
|
|
|
exclude=None, # type: Optional[AnyStr, List[AnyStr]] |
|
|
|
include=None, # type: Optional[AnyStr, List[AnyStr]] |
|
|
|
follow_symlinks=False, # type: bool |
|
|
|
filter_kind=None, # type: Optional[bool] |
|
|
|
recurse=True # type: bool |
|
|
|
): |
|
|
|
# type: (...) -> Generator[DirEntry, None, None] |
|
|
|
"""Yield DirEntry objects for given path. Returns without yield if path fails sanity check |
|
|
|
|
|
|
|
:param child_path: path |
|
|
|
:type child_path: AnyStr |
|
|
|
:return: |
|
|
|
:rtype: None |
|
|
|
:param path: Path to scan, sanity check is_dir and exists |
|
|
|
:param exclude: Escaped regex string(s) to exclude |
|
|
|
:param include: Escaped regex string(s) to include |
|
|
|
:param follow_symlinks: Follow symlinks |
|
|
|
:param filter_kind: None to yield everything, True yields directories, False yields files |
|
|
|
:param recurse: Recursively scan the tree |
|
|
|
""" |
|
|
|
if os.name in ('nt', 'ce'): |
|
|
|
return |
|
|
|
|
|
|
|
parent_path = ek.ek(os.path.dirname, child_path) |
|
|
|
|
|
|
|
if not parent_path: |
|
|
|
logger.debug(u'No parent path provided in %s, unable to get permissions from it' % child_path) |
|
|
|
return |
|
|
|
|
|
|
|
parent_path_stat = ek.ek(os.stat, parent_path) |
|
|
|
parent_mode = stat.S_IMODE(parent_path_stat[stat.ST_MODE]) |
|
|
|
if isinstance(path, string_types) and path and ek.ek(os.path.isdir, path): |
|
|
|
rc_exc, rc_inc = [re.compile(rx % '|'.join( |
|
|
|
[x for x in (param, ([param], [])[None is param])[not isinstance(param, list)]])) |
|
|
|
for rx, param in ((r'(?i)^(?:(?!%s).)*$', exclude), (r'(?i)%s', include))] |
|
|
|
for entry in ek.ek(scandir, path): |
|
|
|
is_dir = entry.is_dir(follow_symlinks=follow_symlinks) |
|
|
|
is_file = entry.is_file(follow_symlinks=follow_symlinks) |
|
|
|
no_filter = any([None is filter_kind, filter_kind and is_dir, not filter_kind and is_file]) |
|
|
|
if (rc_exc.search(entry.name), True)[not exclude] and (rc_inc.search(entry.name), True)[not include] \ |
|
|
|
and (no_filter or (not filter_kind and is_dir and recurse)): |
|
|
|
if recurse and is_dir: |
|
|
|
for subentry in scantree(entry.path, exclude, include, follow_symlinks, filter_kind, recurse): |
|
|
|
yield subentry |
|
|
|
if no_filter: |
|
|
|
yield entry |
|
|
|
|
|
|
|
child_path_stat = ek.ek(os.stat, child_path) |
|
|
|
child_path_mode = stat.S_IMODE(child_path_stat[stat.ST_MODE]) |
|
|
|
|
|
|
|
if ek.ek(os.path.isfile, child_path): |
|
|
|
child_mode = file_bit_filter(parent_mode) |
|
|
|
def copy_file(src_file, dest_file): |
|
|
|
if os.name.startswith('posix'): |
|
|
|
ek.ek(subprocess.call, ['cp', src_file, dest_file]) |
|
|
|
else: |
|
|
|
child_mode = parent_mode |
|
|
|
|
|
|
|
if child_path_mode == child_mode: |
|
|
|
return |
|
|
|
|
|
|
|
child_path_owner = child_path_stat.st_uid |
|
|
|
user_id = os.geteuid() # only available on UNIX |
|
|
|
|
|
|
|
if 0 != user_id and user_id != child_path_owner: |
|
|
|
logger.debug(u'Not running as root or owner of %s, not trying to set permissions' % child_path) |
|
|
|
return |
|
|
|
ek.ek(shutil.copyfile, src_file, dest_file) |
|
|
|
|
|
|
|
try: |
|
|
|
ek.ek(os.chmod, child_path, child_mode) |
|
|
|
logger.debug(u'Setting permissions for %s to %o as parent directory has %o' |
|
|
|
% (child_path, child_mode, parent_mode)) |
|
|
|
ek.ek(shutil.copymode, src_file, dest_file) |
|
|
|
except OSError: |
|
|
|
logger.error(u'Failed to set permission for %s to %o' % (child_path, child_mode)) |
|
|
|
|
|
|
|
|
|
|
|
def make_dirs(path, syno=False): |
|
|
|
""" |
|
|
|
Creates any folders that are missing and assigns them the permissions of their |
|
|
|
parents |
|
|
|
:param path: path |
|
|
|
:type path: AnyStr |
|
|
|
:param syno: whether to trigger a syno library update for path |
|
|
|
:type syno: bool |
|
|
|
:return: success |
|
|
|
:rtype: bool |
|
|
|
""" |
|
|
|
if not ek.ek(os.path.isdir, path): |
|
|
|
# Windows, create all missing folders |
|
|
|
if os.name in ('nt', 'ce'): |
|
|
|
try: |
|
|
|
logger.debug(u'Path %s doesn\'t exist, creating it' % path) |
|
|
|
ek.ek(os.makedirs, path) |
|
|
|
except (OSError, IOError) as e: |
|
|
|
logger.error(u'Failed creating %s : %s' % (path, ex(e))) |
|
|
|
return False |
|
|
|
|
|
|
|
# not Windows, create all missing folders and set permissions |
|
|
|
else: |
|
|
|
sofar = '' |
|
|
|
folder_list = path.split(os.path.sep) |
|
|
|
|
|
|
|
# look through each sub folder and make sure they all exist |
|
|
|
for cur_folder in folder_list: |
|
|
|
sofar += cur_folder + os.path.sep |
|
|
|
pass |
|
|
|
|
|
|
|
# if it exists then just keep walking down the line |
|
|
|
if ek.ek(os.path.isdir, sofar): |
|
|
|
continue |
|
|
|
|
|
|
|
def move_file(src_file, dest_file, raise_exceptions=False): |
|
|
|
try: |
|
|
|
logger.debug(u'Path %s doesn\'t exist, creating it' % sofar) |
|
|
|
ek.ek(os.mkdir, sofar) |
|
|
|
# use normpath to remove end separator, otherwise checks permissions against itself |
|
|
|
chmod_as_parent(ek.ek(os.path.normpath, sofar)) |
|
|
|
if syno: |
|
|
|
# do the library update for synoindex |
|
|
|
NOTIFIERS.NotifierFactory().get('SYNOINDEX').addFolder(sofar) |
|
|
|
except (OSError, IOError) as e: |
|
|
|
logger.error(u'Failed creating %s : %s' % (sofar, ex(e))) |
|
|
|
return False |
|
|
|
|
|
|
|
return True |
|
|
|
ek.ek(shutil.move, src_file, dest_file) |
|
|
|
fix_set_group_id(dest_file) |
|
|
|
except OSError: |
|
|
|
copy_file(src_file, dest_file) |
|
|
|
if ek.ek(os.path.exists, dest_file): |
|
|
|
fix_set_group_id(dest_file) |
|
|
|
ek.ek(os.unlink, src_file) |
|
|
|
elif raise_exceptions: |
|
|
|
raise OSError('Destination file could not be created: %s' % dest_file) |
|
|
|
|
|
|
|
|
|
|
|
def fix_set_group_id(child_path): |
|
|
@ -1184,31 +1140,6 @@ def fix_set_group_id(child_path): |
|
|
|
% (child_path, parent_gid)) |
|
|
|
|
|
|
|
|
|
|
|
def copy_file(src_file, dest_file): |
|
|
|
if os.name.startswith('posix'): |
|
|
|
ek.ek(subprocess.call, ['cp', src_file, dest_file]) |
|
|
|
else: |
|
|
|
ek.ek(shutil.copyfile, src_file, dest_file) |
|
|
|
|
|
|
|
try: |
|
|
|
ek.ek(shutil.copymode, src_file, dest_file) |
|
|
|
except OSError: |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
def move_file(src_file, dest_file, raise_exceptions=False): |
|
|
|
try: |
|
|
|
ek.ek(shutil.move, src_file, dest_file) |
|
|
|
fix_set_group_id(dest_file) |
|
|
|
except OSError: |
|
|
|
copy_file(src_file, dest_file) |
|
|
|
if ek.ek(os.path.exists, dest_file): |
|
|
|
fix_set_group_id(dest_file) |
|
|
|
ek.ek(os.unlink, src_file) |
|
|
|
elif raise_exceptions: |
|
|
|
raise OSError('Destination file could not be created: %s' % dest_file) |
|
|
|
|
|
|
|
|
|
|
|
def remove_file_perm(filepath, log_err=True): |
|
|
|
# type: (AnyStr, Optional[bool]) -> Optional[bool] |
|
|
|
""" |
|
|
@ -1277,20 +1208,136 @@ def remove_file(filepath, tree=False, prefix_failure='', log_level=logging.INFO) |
|
|
|
return (None, result)[filepath and not ek.ek(os.path.exists, filepath)] |
|
|
|
|
|
|
|
|
|
|
|
def replace_extension(filename, new_ext): |
|
|
|
def touch_file(name, atime=None, dir_name=None): |
|
|
|
# type: (AnyStr, int, AnyStr) -> bool |
|
|
|
""" |
|
|
|
create an empty named file and set access time of file |
|
|
|
|
|
|
|
:param filename: filename |
|
|
|
:type filename: AnyStr |
|
|
|
:param new_ext: new extension |
|
|
|
:type new_ext: AnyStr |
|
|
|
:return: filename with new extension |
|
|
|
:rtype: AnyStr |
|
|
|
:param name: filename to touch with time |
|
|
|
:param atime: access time as epoch |
|
|
|
:param dir_name: create empty file and directory if file doesn't exist |
|
|
|
:return: success |
|
|
|
""" |
|
|
|
sepFile = filename.rpartition('.') |
|
|
|
if sepFile[0] == '': |
|
|
|
return filename |
|
|
|
return sepFile[0] + '.' + new_ext |
|
|
|
if None is not dir_name: |
|
|
|
name = ek.ek(os.path.join, dir_name, name) |
|
|
|
if make_path(dir_name): |
|
|
|
if not ek.ek(os.path.exists, name): |
|
|
|
with io.open(name, 'w') as fh: |
|
|
|
fh.flush() |
|
|
|
if None is atime: |
|
|
|
return True |
|
|
|
|
|
|
|
if None is not atime: |
|
|
|
try: |
|
|
|
with open(name, 'a'): |
|
|
|
ek.ek(os.utime, name, (atime, atime)) |
|
|
|
return True |
|
|
|
except (BaseException, Exception): |
|
|
|
logger.debug('File air date stamping not available on your OS') |
|
|
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
def make_path(name, syno=False): |
|
|
|
# type: (AnyStr, bool) -> bool |
|
|
|
""" |
|
|
|
Create an entire directory path and |
|
|
|
assign each subdir the parent permission |
|
|
|
|
|
|
|
:param name: directory to create |
|
|
|
:param syno: whether to trigger a syno library update for path |
|
|
|
:return: success or dir exists |
|
|
|
""" |
|
|
|
if not ek.ek(os.path.isdir, name): |
|
|
|
# Windows, create all missing folders |
|
|
|
if os.name in ('nt', 'ce'): |
|
|
|
try: |
|
|
|
logger.debug(u'Path %s doesn\'t exist, creating it' % name) |
|
|
|
ek.ek(os.makedirs, name) |
|
|
|
except (OSError, IOError) as e: |
|
|
|
logger.error(u'Failed creating %s : %s' % (name, ex(e))) |
|
|
|
return False |
|
|
|
|
|
|
|
# not Windows, create all missing folders and set permissions |
|
|
|
else: |
|
|
|
sofar = '' |
|
|
|
folder_list = name.split(os.path.sep) |
|
|
|
|
|
|
|
# look through each sub folder and make sure they all exist |
|
|
|
for cur_folder in folder_list: |
|
|
|
sofar += cur_folder + os.path.sep |
|
|
|
|
|
|
|
# if it exists then just keep walking down the line |
|
|
|
if ek.ek(os.path.isdir, sofar): |
|
|
|
continue |
|
|
|
|
|
|
|
try: |
|
|
|
logger.debug(u'Path %s doesn\'t exist, creating it' % sofar) |
|
|
|
ek.ek(os.mkdir, sofar) |
|
|
|
# use normpath to remove end separator, otherwise checks permissions against itself |
|
|
|
chmod_as_parent(ek.ek(os.path.normpath, sofar)) |
|
|
|
if syno: |
|
|
|
# do the library update for synoindex |
|
|
|
NOTIFIERS.NotifierFactory().get('SYNOINDEX').addFolder(sofar) |
|
|
|
except (OSError, IOError) as e: |
|
|
|
logger.error(u'Failed creating %s : %s' % (sofar, ex(e))) |
|
|
|
return False |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
def chmod_as_parent(child_path): |
|
|
|
""" |
|
|
|
|
|
|
|
:param child_path: path |
|
|
|
:type child_path: AnyStr |
|
|
|
:return: |
|
|
|
:rtype: None |
|
|
|
""" |
|
|
|
if os.name in ('nt', 'ce'): |
|
|
|
return |
|
|
|
|
|
|
|
parent_path = ek.ek(os.path.dirname, child_path) |
|
|
|
|
|
|
|
if not parent_path: |
|
|
|
logger.debug(u'No parent path provided in %s, unable to get permissions from it' % child_path) |
|
|
|
return |
|
|
|
|
|
|
|
parent_path_stat = ek.ek(os.stat, parent_path) |
|
|
|
parent_mode = stat.S_IMODE(parent_path_stat[stat.ST_MODE]) |
|
|
|
|
|
|
|
child_path_stat = ek.ek(os.stat, child_path) |
|
|
|
child_path_mode = stat.S_IMODE(child_path_stat[stat.ST_MODE]) |
|
|
|
|
|
|
|
if ek.ek(os.path.isfile, child_path): |
|
|
|
child_mode = file_bit_filter(parent_mode) |
|
|
|
else: |
|
|
|
child_mode = parent_mode |
|
|
|
|
|
|
|
if child_path_mode == child_mode: |
|
|
|
return |
|
|
|
|
|
|
|
child_path_owner = child_path_stat.st_uid |
|
|
|
user_id = os.geteuid() # only available on UNIX |
|
|
|
|
|
|
|
if 0 != user_id and user_id != child_path_owner: |
|
|
|
logger.debug(u'Not running as root or owner of %s, not trying to set permissions' % child_path) |
|
|
|
return |
|
|
|
|
|
|
|
try: |
|
|
|
ek.ek(os.chmod, child_path, child_mode) |
|
|
|
logger.debug(u'Setting permissions for %s to %o as parent directory has %o' |
|
|
|
% (child_path, child_mode, parent_mode)) |
|
|
|
except OSError: |
|
|
|
logger.error(u'Failed to set permission for %s to %o' % (child_path, child_mode)) |
|
|
|
|
|
|
|
|
|
|
|
def file_bit_filter(mode): |
|
|
|
for bit in [stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH, stat.S_ISUID, stat.S_ISGID]: |
|
|
|
if mode & bit: |
|
|
|
mode -= bit |
|
|
|
|
|
|
|
return mode |
|
|
|
|
|
|
|
|
|
|
|
def write_file(filepath, # type: AnyStr |
|
|
@ -1312,7 +1359,7 @@ def write_file(filepath, # type: AnyStr |
|
|
|
""" |
|
|
|
result = False |
|
|
|
|
|
|
|
if make_dirs(ek.ek(os.path.dirname, filepath)): |
|
|
|
if make_path(ek.ek(os.path.dirname, filepath)): |
|
|
|
try: |
|
|
|
if raw: |
|
|
|
with ek.ek(io.FileIO, filepath, 'wb') as fh: |
|
|
@ -1353,6 +1400,40 @@ def write_file(filepath, # type: AnyStr |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
def sanitize_filename(name): |
|
|
|
""" |
|
|
|
|
|
|
|
:param name: filename |
|
|
|
:type name: AnyStr |
|
|
|
:return: sanitized filename |
|
|
|
:rtype: AnyStr |
|
|
|
""" |
|
|
|
# remove bad chars from the filename |
|
|
|
name = re.sub(r'[\\/*]', '-', name) |
|
|
|
name = re.sub(r'[:"<>|?]', '', name) |
|
|
|
|
|
|
|
# remove leading/trailing periods and spaces |
|
|
|
name = name.strip(' .') |
|
|
|
|
|
|
|
for char in REMOVE_FILENAME_CHARS or []: |
|
|
|
name = name.replace(char, '') |
|
|
|
|
|
|
|
return name |
|
|
|
|
|
|
|
|
|
|
|
def replace_extension(filename, new_ext): |
|
|
|
# type: (AnyStr, AnyStr) -> AnyStr |
|
|
|
""" |
|
|
|
:param filename: filename |
|
|
|
:param new_ext: new extension |
|
|
|
:return: filename with new extension |
|
|
|
""" |
|
|
|
sep_file = filename.rpartition('.') |
|
|
|
if '' == sep_file[0]: |
|
|
|
return filename |
|
|
|
return '%s.%s' % (sep_file[0], new_ext) |
|
|
|
|
|
|
|
|
|
|
|
def long_path(path): |
|
|
|
# type: (AnyStr) -> AnyStr |
|
|
|
"""add long path prefix for Windows""" |
|
|
@ -1361,6 +1442,64 @@ def long_path(path): |
|
|
|
return path |
|
|
|
|
|
|
|
|
|
|
|
def compress_file(target, filename, prefer_7z=True, remove_source=True): |
|
|
|
# type: (AnyStr, AnyStr, bool, bool) -> bool |
|
|
|
""" |
|
|
|
compress given file to zip or 7z archive |
|
|
|
|
|
|
|
:param target: file to compress with full path |
|
|
|
:param filename: filename inside the archive |
|
|
|
:param prefer_7z: prefer 7z over zip compression if available |
|
|
|
:param remove_source: remove source file after successful creation of archive |
|
|
|
:return: success of compression |
|
|
|
""" |
|
|
|
try: |
|
|
|
if prefer_7z and None is not py7zr: |
|
|
|
z_name = '%s.7z' % target.rpartition('.')[0] |
|
|
|
# noinspection PyUnresolvedReferences |
|
|
|
with py7zr.SevenZipFile(z_name, 'w') as z_file: |
|
|
|
z_file.write(target, filename) |
|
|
|
else: |
|
|
|
zip_name = '%s.zip' % target.rpartition('.')[0] |
|
|
|
with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zip_fh: |
|
|
|
zip_fh.write(target, filename) |
|
|
|
except (BaseException, Exception) as e: |
|
|
|
logger.error('error compressing %s' % target) |
|
|
|
logger.debug('traceback: %s' % ex(e)) |
|
|
|
return False |
|
|
|
if remove_source: |
|
|
|
remove_file_perm(target) |
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
def cmdline_runner(cmd, shell=False, suppress_stderr=False): |
|
|
|
# type: (Union[AnyStr, List[AnyStr]], bool, bool) -> Tuple[AnyStr, Optional[AnyStr], int] |
|
|
|
""" Execute a child program in a new process. |
|
|
|
|
|
|
|
Can raise an exception to be caught in callee |
|
|
|
|
|
|
|
:param cmd: A string, or a sequence of program arguments |
|
|
|
:param shell: If true, the command will be executed through the shell. |
|
|
|
:param suppress_stderr: Suppress stderr output if True |
|
|
|
""" |
|
|
|
# noinspection PyUnresolvedReferences |
|
|
|
kw = dict(cwd=PROG_DIR, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
|
|
|
stderr=(open(os.devnull, 'w') if PY2 else subprocess.DEVNULL, subprocess.STDOUT)[not suppress_stderr]) |
|
|
|
|
|
|
|
if not PY2: |
|
|
|
kw.update(dict(encoding=ek.SYS_ENCODING, text=True, bufsize=0)) |
|
|
|
|
|
|
|
if 'win32' == sys.platform: |
|
|
|
kw['creationflags'] = 0x08000000 # CREATE_NO_WINDOW (needed for py2exe) |
|
|
|
|
|
|
|
with Popen(cmd, **kw) as p: |
|
|
|
out, err = p.communicate() |
|
|
|
if out: |
|
|
|
out = out.strip() |
|
|
|
|
|
|
|
return out, err, p.returncode |
|
|
|
|
|
|
|
|
|
|
|
def md5_for_text(text): |
|
|
|
""" |
|
|
|
|
|
|
@ -1449,97 +1588,6 @@ def indent_xml(elem, level=0): |
|
|
|
elem.tail = i |
|
|
|
|
|
|
|
|
|
|
|
def compress_file(target, filename, prefer_7z=True, remove_source=True): |
|
|
|
# type: (AnyStr, AnyStr, bool, bool) -> bool |
|
|
|
""" |
|
|
|
compress given file to zip or 7z archive |
|
|
|
|
|
|
|
:param target: file to compress with full path |
|
|
|
:param filename: filename inside the archive |
|
|
|
:param prefer_7z: prefer 7z over zip compression if available |
|
|
|
:param remove_source: remove source file after successful creation of archive |
|
|
|
:return: success of compression |
|
|
|
""" |
|
|
|
try: |
|
|
|
if prefer_7z and None is not py7zr: |
|
|
|
z_name = '%s.7z' % target.rpartition('.')[0] |
|
|
|
with py7zr.SevenZipFile(z_name, 'w') as z_file: |
|
|
|
z_file.write(target, filename) |
|
|
|
else: |
|
|
|
zip_name = '%s.zip' % target.rpartition('.')[0] |
|
|
|
with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zip_fh: |
|
|
|
zip_fh.write(target, filename) |
|
|
|
except (BaseException, Exception) as e: |
|
|
|
logger.error('error compressing %s' % target) |
|
|
|
logger.debug('traceback: %s' % ex(e)) |
|
|
|
return False |
|
|
|
if remove_source: |
|
|
|
remove_file_perm(target) |
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
def scantree(path, # type: AnyStr |
|
|
|
exclude=None, # type: Optional[AnyStr, List[AnyStr]] |
|
|
|
include=None, # type: Optional[AnyStr, List[AnyStr]] |
|
|
|
follow_symlinks=False, # type: bool |
|
|
|
filter_kind=None, # type: Optional[bool] |
|
|
|
recurse=True # type: bool |
|
|
|
): |
|
|
|
# type: (...) -> Generator[DirEntry, None, None] |
|
|
|
"""Yield DirEntry objects for given path. Returns without yield if path fails sanity check |
|
|
|
|
|
|
|
:param path: Path to scan, sanity check is_dir and exists |
|
|
|
:param exclude: Escaped regex string(s) to exclude |
|
|
|
:param include: Escaped regex string(s) to include |
|
|
|
:param follow_symlinks: Follow symlinks |
|
|
|
:param filter_kind: None to yield everything, True yields directories, False yields files |
|
|
|
:param recurse: Recursively scan the tree |
|
|
|
""" |
|
|
|
if isinstance(path, string_types) and path and ek.ek(os.path.isdir, path): |
|
|
|
rc_exc, rc_inc = [re.compile(rx % '|'.join( |
|
|
|
[x for x in (param, ([param], [])[None is param])[not isinstance(param, list)]])) |
|
|
|
for rx, param in ((r'(?i)^(?:(?!%s).)*$', exclude), (r'(?i)%s', include))] |
|
|
|
for entry in ek.ek(scandir, path): |
|
|
|
is_dir = entry.is_dir(follow_symlinks=follow_symlinks) |
|
|
|
is_file = entry.is_file(follow_symlinks=follow_symlinks) |
|
|
|
no_filter = any([None is filter_kind, filter_kind and is_dir, not filter_kind and is_file]) |
|
|
|
if (rc_exc.search(entry.name), True)[not exclude] and (rc_inc.search(entry.name), True)[not include] \ |
|
|
|
and (no_filter or (not filter_kind and is_dir and recurse)): |
|
|
|
if recurse and is_dir: |
|
|
|
for subentry in scantree(entry.path, exclude, include, follow_symlinks, filter_kind, recurse): |
|
|
|
yield subentry |
|
|
|
if no_filter: |
|
|
|
yield entry |
|
|
|
|
|
|
|
|
|
|
|
def cmdline_runner(cmd, shell=False, suppress_stderr=False): |
|
|
|
# type: (Union[AnyStr, List[AnyStr]], bool, bool) -> Tuple[AnyStr, Optional[AnyStr], int] |
|
|
|
""" Execute a child program in a new process. |
|
|
|
|
|
|
|
Can raise an exception to be caught in callee |
|
|
|
|
|
|
|
:param cmd: A string, or a sequence of program arguments |
|
|
|
:param shell: If true, the command will be executed through the shell. |
|
|
|
:param suppress_stderr: Suppress stderr output if True |
|
|
|
""" |
|
|
|
# noinspection PyUnresolvedReferences |
|
|
|
kw = dict(cwd=PROG_DIR, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE, |
|
|
|
stderr=(open(os.devnull, 'w') if PY2 else subprocess.DEVNULL, subprocess.STDOUT)[not suppress_stderr]) |
|
|
|
|
|
|
|
if not PY2: |
|
|
|
kw.update(dict(encoding=ek.SYS_ENCODING, text=True, bufsize=0)) |
|
|
|
|
|
|
|
if 'win32' == sys.platform: |
|
|
|
kw['creationflags'] = 0x08000000 # CREATE_NO_WINDOW (needed for py2exe) |
|
|
|
|
|
|
|
with Popen(cmd, **kw) as p: |
|
|
|
out, err = p.communicate() |
|
|
|
if out: |
|
|
|
out = out.strip() |
|
|
|
|
|
|
|
return out, err, p.returncode |
|
|
|
|
|
|
|
|
|
|
|
def ast_eval(value, default=None): |
|
|
|
# type: (AnyStr, Any) -> Any |
|
|
|
"""Convert string typed value into actual Python type and value |
|
|
@ -1575,27 +1623,6 @@ def ast_eval(value, default=None): |
|
|
|
return value |
|
|
|
|
|
|
|
|
|
|
|
def sanitize_filename(name): |
|
|
|
""" |
|
|
|
|
|
|
|
:param name: filename |
|
|
|
:type name: AnyStr |
|
|
|
:return: sanitized filename |
|
|
|
:rtype: AnyStr |
|
|
|
""" |
|
|
|
# remove bad chars from the filename |
|
|
|
name = re.sub(r'[\\/*]', '-', name) |
|
|
|
name = re.sub(r'[:"<>|?]', '', name) |
|
|
|
|
|
|
|
# remove leading/trailing periods and spaces |
|
|
|
name = name.strip(' .') |
|
|
|
|
|
|
|
for char in REMOVE_FILENAME_CHARS or []: |
|
|
|
name = name.replace(char, '') |
|
|
|
|
|
|
|
return name |
|
|
|
|
|
|
|
|
|
|
|
def download_file(url, filename, session=None, **kwargs): |
|
|
|
""" |
|
|
|
download given url to given filename |
|
|
@ -1678,34 +1705,3 @@ def spoken_height(height): |
|
|
|
:param height: height in cm |
|
|
|
""" |
|
|
|
return convert_to_inch_faction_html(height).replace('\'', ' foot').replace('"', '') |
|
|
|
|
|
|
|
|
|
|
|
def touch_file(fname, atime=None, dir_name=None): |
|
|
|
""" |
|
|
|
set access time of given file |
|
|
|
|
|
|
|
:param fname: filename |
|
|
|
:type fname: AnyStr |
|
|
|
:param atime: access time as epoch |
|
|
|
:type atime: int |
|
|
|
:param: dir_name: directory name |
|
|
|
:type dir_name: AnyStr |
|
|
|
:return: success |
|
|
|
:rtype: bool |
|
|
|
""" |
|
|
|
if None is not dir_name: |
|
|
|
fname = ek.ek(os.path.join, dir_name, fname) |
|
|
|
if make_dirs(dir_name): |
|
|
|
if not ek.ek(os.path.exists, fname): |
|
|
|
with io.open(fname, 'w') as fh: |
|
|
|
fh.flush() |
|
|
|
|
|
|
|
if None is not atime: |
|
|
|
try: |
|
|
|
with open(fname, 'a'): |
|
|
|
ek.ek(os.utime, fname, (atime, atime)) |
|
|
|
return True |
|
|
|
except (BaseException, Exception): |
|
|
|
logger.debug('File air date stamping not available on your OS') |
|
|
|
|
|
|
|
return False |
|
|
|