You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
456 lines
14 KiB
456 lines
14 KiB
"""scandir, a better directory iterator that exposes all file info OS provides
|
|
|
|
scandir is a generator version of os.listdir() that returns an iterator over
|
|
files in a directory, and also exposes the extra information most OSes provide
|
|
while iterating files in a directory.
|
|
|
|
See README.md or https://github.com/benhoyt/scandir for rationale and docs.
|
|
|
|
scandir is released under the new BSD 3-clause license. See LICENSE.txt for
|
|
the full license text.
|
|
"""
|
|
|
|
from __future__ import division
|
|
|
|
import ctypes
|
|
import os
|
|
import stat
|
|
import sys
|
|
|
|
__version__ = '0.3'
|
|
__all__ = ['scandir', 'walk']
|
|
|
|
# Module-level aliases for frequently used functions and constants, kept
# for speed and convenience in the per-entry code below.
join = os.path.join
lstat = os.lstat

S_IFDIR, S_IFREG, S_IFLNK = stat.S_IFDIR, stat.S_IFREG, stat.S_IFLNK

# Python 3 has no 'unicode' builtin; fall back to str there.
try:
    unicode
except NameError:
    unicode = str

# Placeholder; rebound if the optional _scandir module can be imported.
_scandir = None
|
|
|
|
|
|
class GenericDirEntry(object):
    """Directory entry that gets all of its information from os.lstat().

    The lstat() result is fetched on first use and cached, so the is_*()
    queries cost at most one system call per entry.
    """

    __slots__ = ('name', '_lstat', '_path')

    def __init__(self, path, name):
        self._lstat = None
        self.name = name
        self._path = path

    def lstat(self):
        """Return the (cached) os.lstat() result for this entry."""
        cached = self._lstat
        if cached is None:
            cached = self._lstat = os.lstat(
                os.path.join(self._path, self.name))
        return cached

    def is_dir(self):
        """Return True if the entry is a directory (links not followed)."""
        try:
            mode = self.lstat().st_mode
        except OSError:
            # Entry could not be stat'ed: report it as "not a directory"
            return False
        return stat.S_IFMT(mode) == stat.S_IFDIR

    def is_file(self):
        """Return True if the entry is a regular file (links not followed)."""
        try:
            mode = self.lstat().st_mode
        except OSError:
            return False
        return stat.S_IFMT(mode) == stat.S_IFREG

    def is_symlink(self):
        """Return True if the entry is a symbolic link."""
        try:
            mode = self.lstat().st_mode
        except OSError:
            return False
        return stat.S_IFMT(mode) == stat.S_IFLNK

    def __str__(self):
        return '<{0}: {1!r}>'.format(type(self).__name__, self.name)

    __repr__ = __str__
|
|
|
|
|
|
if sys.platform == 'win32':
|
|
from ctypes import wintypes

# Win32 constants (values from windows.h)
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
ERROR_FILE_NOT_FOUND = 2
ERROR_NO_MORE_FILES = 18
FILE_ATTRIBUTE_READONLY = 0x1
FILE_ATTRIBUTE_DIRECTORY = 0x10
FILE_ATTRIBUTE_REPARSE_POINT = 0x400

# Number of seconds between 1601-01-01 and 1970-01-01
SECONDS_BETWEEN_EPOCHS = 11644473600

kernel32 = ctypes.windll.kernel32

# ctypes prototypes for FindFirstFile and FindNextFile (wide-string
# versions) and for FindClose
FindFirstFile = kernel32.FindFirstFileW
FindFirstFile.restype = wintypes.HANDLE
FindFirstFile.argtypes = [
    wintypes.LPCWSTR,
    ctypes.POINTER(wintypes.WIN32_FIND_DATAW),
]

FindNextFile = kernel32.FindNextFileW
FindNextFile.restype = wintypes.BOOL
FindNextFile.argtypes = [
    wintypes.HANDLE,
    ctypes.POINTER(wintypes.WIN32_FIND_DATAW),
]

FindClose = kernel32.FindClose
FindClose.restype = wintypes.BOOL
FindClose.argtypes = [wintypes.HANDLE]
|
|
|
|
def filetime_to_time(filetime):
    """Return a Win32 FILETIME as seconds since the Unix epoch.

    The high and low parts of *filetime* are combined into a single count,
    divided by 10000000 and then shifted by SECONDS_BETWEEN_EPOCHS.
    """
    ticks = (filetime.dwHighDateTime << 32) | filetime.dwLowDateTime
    seconds_since_1601 = ticks / 10000000
    return seconds_since_1601 - SECONDS_BETWEEN_EPOCHS
|
|
|
|
def find_data_to_stat(data):
    """Convert Win32 FIND_DATA struct to stat_result.

    *data* must expose dwFileAttributes, nFileSizeHigh/nFileSizeLow and the
    three FILETIME members ftLastAccessTime, ftLastWriteTime and
    ftCreationTime. Returns an os.stat_result whose st_ino, st_dev,
    st_nlink, st_uid and st_gid are all zero.
    """
    # First convert Win32 dwFileAttributes to st_mode
    attributes = data.dwFileAttributes
    if attributes & FILE_ATTRIBUTE_DIRECTORY:
        st_mode = S_IFDIR | 0o111
    else:
        st_mode = S_IFREG
    if attributes & FILE_ATTRIBUTE_READONLY:
        st_mode |= 0o444
    else:
        st_mode |= 0o666
    if attributes & FILE_ATTRIBUTE_REPARSE_POINT:
        # Replace the file-type bits instead of OR-ing S_IFLNK on top of
        # them: S_IFDIR | S_IFLNK is 0o160000, which is neither a directory
        # nor a link, so a reparse point on a directory used to get a
        # bogus file type. The permission bits are left untouched.
        st_mode = (st_mode & ~0o170000) | S_IFLNK

    st_size = data.nFileSizeHigh << 32 | data.nFileSizeLow
    st_atime = filetime_to_time(data.ftLastAccessTime)
    st_mtime = filetime_to_time(data.ftLastWriteTime)
    st_ctime = filetime_to_time(data.ftCreationTime)

    # Some fields set to zero per CPython's posixmodule.c: st_ino, st_dev,
    # st_nlink, st_uid, st_gid
    return os.stat_result((st_mode, 0, 0, 0, 0, 0, st_size, st_atime,
                           st_mtime, st_ctime))
|
|
|
|
class Win32DirEntry(object):
    """Directory entry backed by the find-data record of one file.

    The is_*() queries read dwFileAttributes directly; the stat_result is
    only built when lstat() is first called.
    """

    __slots__ = ('name', '_lstat', '_find_data')

    def __init__(self, name, find_data):
        self._find_data = find_data
        self._lstat = None
        self.name = name

    def lstat(self):
        """Return the stat_result for this entry, converting it on demand."""
        cached = self._lstat
        if cached is None:
            # Conversion is slow and callers often only need is_dir() etc,
            # so it is deferred until somebody actually asks for it
            cached = self._lstat = find_data_to_stat(self._find_data)
        return cached

    def is_dir(self):
        """Return True if the directory attribute is set."""
        attributes = self._find_data.dwFileAttributes
        return bool(attributes & FILE_ATTRIBUTE_DIRECTORY)

    def is_file(self):
        """Return True if the directory attribute is not set."""
        attributes = self._find_data.dwFileAttributes
        return not (attributes & FILE_ATTRIBUTE_DIRECTORY)

    def is_symlink(self):
        """Return True if the reparse-point attribute is set."""
        attributes = self._find_data.dwFileAttributes
        return bool(attributes & FILE_ATTRIBUTE_REPARSE_POINT)

    def __str__(self):
        return '<{0}: {1!r}>'.format(type(self).__name__, self.name)

    __repr__ = __str__
|
|
|
|
def win_error(error, filename):
    """Build (without raising) a WindowsError for Win32 error code *error*.

    The message comes from ctypes.FormatError() and *filename* is stored on
    the exception's filename attribute.
    """
    message = ctypes.FormatError(error)
    failure = WindowsError(error, message)
    failure.filename = filename
    return failure
|
|
|
|
def scandir(path='.', windows_wildcard='*.*'):
    """Like os.listdir(), but yield DirEntry objects instead of returning
    a list of names.

    Entries matching *windows_wildcard* inside *path* are yielded as
    Win32DirEntry objects. A WindowsError built by win_error() is raised
    if one of the Find* calls fails.
    """
    # Start the search with FindFirstFile
    find_data = wintypes.WIN32_FIND_DATAW()
    pattern = join(path, windows_wildcard)
    handle = FindFirstFile(pattern, ctypes.byref(find_data))
    if handle == INVALID_HANDLE_VALUE:
        error = ctypes.GetLastError()
        if error != ERROR_FILE_NOT_FOUND:
            raise win_error(error, path)
        # No files, don't yield anything
        return

    # Keep calling FindNextFile until it reports that nothing is left
    try:
        more = True
        while more:
            # Yield a Win32DirEntry for everything except '.' and '..'
            # (current and parent directory)
            entry_name = find_data.cFileName
            if entry_name != '.' and entry_name != '..':
                yield Win32DirEntry(entry_name, find_data)

            # Each entry keeps a reference to its own record, so a fresh
            # one is allocated for every FindNextFile call
            find_data = wintypes.WIN32_FIND_DATAW()
            if not FindNextFile(handle, ctypes.byref(find_data)):
                error = ctypes.GetLastError()
                if error != ERROR_NO_MORE_FILES:
                    raise win_error(error, path)
                more = False
    finally:
        if not FindClose(handle):
            raise win_error(ctypes.GetLastError(), path)
|
|
|
|
# If the optional _scandir module is importable, replace the definitions
# above with versions built on its scandir_helper().
try:
    import _scandir
except ImportError:
    pass
else:
    scandir_helper = _scandir.scandir_helper

    class Win32DirEntry(object):
        """Directory entry holding a name and a ready-made stat result."""

        __slots__ = ('name', '_lstat')

        def __init__(self, name, lstat):
            self._lstat = lstat
            self.name = name

        def lstat(self):
            """Return the stat result supplied at construction time."""
            return self._lstat

        def is_dir(self):
            mode = self._lstat.st_mode
            return (mode & 0o170000) == S_IFDIR

        def is_file(self):
            mode = self._lstat.st_mode
            return (mode & 0o170000) == S_IFREG

        def is_symlink(self):
            mode = self._lstat.st_mode
            return (mode & 0o170000) == S_IFLNK

        def __str__(self):
            return '<{0}: {1!r}>'.format(type(self).__name__, self.name)

        __repr__ = __str__

    def scandir(path='.'):
        """Yield a Win32DirEntry for each (name, stat) pair that
        scandir_helper() produces for *path*.
        """
        for entry_name, entry_stat in scandir_helper(unicode(path)):
            yield Win32DirEntry(entry_name, entry_stat)
|
|
|
|
|
|
# Linux, OS X, and BSD implementation
|
|
elif sys.platform.startswith(('linux', 'darwin')) or 'bsd' in sys.platform:
|
|
import ctypes.util

DIR_p = ctypes.c_void_p


class Dirent(ctypes.Structure):
    """ctypes description of the C dirent struct.

    The struct is laid out slightly differently on each platform, so the
    field list is chosen by platform. Only d_name and d_type are of
    interest to this module.
    """
    if not sys.platform.startswith('linux'):
        _fields_ = (
            ('d_ino', ctypes.c_uint32),  # must be uint32, not ulong
            ('d_reclen', ctypes.c_ushort),
            ('d_type', ctypes.c_byte),
            ('d_namlen', ctypes.c_byte),
            ('d_name', ctypes.c_char * 256),
        )
    else:
        _fields_ = (
            ('d_ino', ctypes.c_ulong),
            ('d_off', ctypes.c_long),
            ('d_reclen', ctypes.c_ushort),
            ('d_type', ctypes.c_byte),
            ('d_name', ctypes.c_char * 256),
        )


# d_type values this module tests for
DT_UNKNOWN = 0
DT_DIR = 4
DT_REG = 8
DT_LNK = 10

Dirent_p = ctypes.POINTER(Dirent)
Dirent_pp = ctypes.POINTER(Dirent_p)

# use_errno=True lets ctypes.get_errno() see errno as set by these calls
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)

opendir = libc.opendir
opendir.restype = DIR_p
opendir.argtypes = [ctypes.c_char_p]

readdir_r = libc.readdir_r
readdir_r.restype = ctypes.c_int
readdir_r.argtypes = [DIR_p, Dirent_p, Dirent_pp]

closedir = libc.closedir
closedir.restype = ctypes.c_int
closedir.argtypes = [DIR_p]

file_system_encoding = sys.getfilesystemencoding()
|
|
|
|
class PosixDirEntry(object):
    """Directory entry built from a dirent's name and d_type.

    When d_type is known it answers the is_*() queries directly; when it
    is DT_UNKNOWN they fall back to a cached lstat() of the entry.
    """

    __slots__ = ('name', '_d_type', '_lstat', '_path')

    def __init__(self, path, name, d_type):
        self._lstat = None
        self._d_type = d_type
        self.name = name
        self._path = path

    def lstat(self):
        """Return the (cached) lstat() result for this entry."""
        cached = self._lstat
        if cached is None:
            cached = self._lstat = lstat(join(self._path, self.name))
        return cached

    # The three is_*() methods below are deliberately spelled out in full
    # rather than sharing a helper: avoiding the extra function call helps
    # walk() performance a little.
    def is_dir(self):
        kind = self._d_type
        if kind == DT_UNKNOWN:
            try:
                mode = self.lstat().st_mode
            except OSError:
                return False
            return mode & 0o170000 == S_IFDIR
        return kind == DT_DIR

    def is_file(self):
        kind = self._d_type
        if kind == DT_UNKNOWN:
            try:
                mode = self.lstat().st_mode
            except OSError:
                return False
            return mode & 0o170000 == S_IFREG
        return kind == DT_REG

    def is_symlink(self):
        kind = self._d_type
        if kind == DT_UNKNOWN:
            try:
                mode = self.lstat().st_mode
            except OSError:
                return False
            return mode & 0o170000 == S_IFLNK
        return kind == DT_LNK

    def __str__(self):
        return '<{0}: {1!r}>'.format(type(self).__name__, self.name)

    __repr__ = __str__
|
|
|
|
def posix_error(filename):
    """Build (without raising) an OSError from the current ctypes errno.

    The message comes from os.strerror() and *filename* is stored on the
    exception's filename attribute.
    """
    code = ctypes.get_errno()
    failure = OSError(code, os.strerror(code))
    failure.filename = filename
    return failure
|
|
|
|
def scandir(path='.'):
    """Like os.listdir(), but yield DirEntry objects instead of returning
    a list of names.

    *path* is encoded with the file system encoding before being passed to
    opendir(). A PosixDirEntry is yielded for every entry except '.' and
    '..'. Raises OSError (with filename set to *path*) if opendir(),
    readdir_r() or closedir() fails.
    """
    dir_p = opendir(path.encode(file_system_encoding))
    if not dir_p:
        raise posix_error(path)
    try:
        result = Dirent_p()
        while True:
            # A fresh struct per iteration, filled in by readdir_r()
            entry = Dirent()
            # readdir_r() reports failure through its return value (an
            # error number) rather than through errno, so the exception is
            # built from that value instead of calling posix_error().
            error = readdir_r(dir_p, entry, result)
            if error:
                exc = OSError(error, os.strerror(error))
                exc.filename = path
                raise exc
            if not result:
                # NULL result pointer: end of directory
                break
            name = entry.d_name.decode(file_system_encoding)
            if name not in ('.', '..'):
                yield PosixDirEntry(path, name, entry.d_type)
    finally:
        if closedir(dir_p):
            raise posix_error(path)
|
|
|
|
# If the optional _scandir module is importable, replace scandir() with a
# version built on its scandir_helper().
try:
    import _scandir
except ImportError:
    pass
else:
    scandir_helper = _scandir.scandir_helper

    def scandir(path='.'):
        """Yield a PosixDirEntry for each (name, d_type) pair that
        scandir_helper() produces for *path*.
        """
        for entry_name, entry_type in scandir_helper(unicode(path)):
            yield PosixDirEntry(path, entry_name, entry_type)
|
|
|
|
|
|
# Some other system -- no d_type or stat information
|
|
else:
|
|
def scandir(path='.'):
    """Like os.listdir(), but yield DirEntry objects instead of returning
    a list of names.

    This fallback wraps every name from os.listdir() in a GenericDirEntry.
    """
    entry_names = os.listdir(path)
    for entry_name in entry_names:
        yield GenericDirEntry(path, entry_name)
|
|
|
|
|
|
def walk(top, topdown=True, onerror=None, followlinks=False):
    """Like os.walk(), but faster, as it uses scandir() internally.

    Yields (dirpath, dirnames, filenames) tuples for *top* and everything
    below it. With *topdown* true a directory is yielded before its
    sub-directories and the caller may edit dirnames in place to control
    the recursion; otherwise it is yielded after them. If scanning a
    directory raises OSError, *onerror* (when given) is called with the
    exception and that directory is skipped. Symbolic links to directories
    are listed in dirnames but only descended into when *followlinks* is
    true.
    """
    # Determine which are files and which are directories
    dirs = []
    nondirs = []
    try:
        for entry in scandir(top):
            if entry.is_dir():
                dirs.append(entry)
            elif entry.is_symlink() and os.path.isdir(join(top, entry.name)):
                # is_dir() may not follow links, which would put links to
                # directories among the files and make followlinks a
                # no-op. Resolve the link so such entries land in dirnames,
                # as they do with os.walk().
                dirs.append(entry)
            else:
                nondirs.append(entry)
    except OSError as error:
        if onerror is not None:
            onerror(error)
        return

    # Yield before recursion if going top down
    if topdown:
        # Need to do some fancy footwork here as caller is allowed to modify
        # dir_names, and we really want them to modify dirs (list of DirEntry
        # objects) instead. Keep a mapping of entries keyed by name.
        dir_names = [entry.name for entry in dirs]
        entries_by_name = dict((entry.name, entry) for entry in dirs)

        yield top, dir_names, [e.name for e in nondirs]

        # Rebuild dirs from whatever names the caller left in dir_names
        dirs = []
        for dir_name in dir_names:
            entry = entries_by_name.get(dir_name)
            if entry is None:
                # Only happens when caller creates a new directory and adds it
                # to dir_names
                entry = GenericDirEntry(top, dir_name)
            dirs.append(entry)

    # Recurse into sub-directories, following symbolic links if "followlinks"
    for entry in dirs:
        if followlinks or not entry.is_symlink():
            new_path = join(top, entry.name)
            for x in walk(new_path, topdown, onerror, followlinks):
                yield x

    # Yield after recursion if going bottom up
    if not topdown:
        yield top, [e.name for e in dirs], [e.name for e in nondirs]
|
|
|