sb/path: Update the path module to the latest from master.

- Fixes Windows path issues
- Fixes infinite loop with a read-only path.

Close #3394
This commit is contained in:
Chris Johns 2018-04-13 13:14:51 +10:00
parent 9f4ed7e316
commit 62537bc0ac

View File

@ -34,6 +34,7 @@ import string
import error import error
windows = os.name == 'nt' windows = os.name == 'nt'
win_maxpath = 254
def host(path): def host(path):
if path is not None: if path is not None:
@ -46,13 +47,21 @@ def host(path):
path[1] in string.ascii_uppercase): path[1] in string.ascii_uppercase):
path = '%s:%s' % (path[1], path[2:]) path = '%s:%s' % (path[1], path[2:])
path = path.replace('/', '\\') path = path.replace('/', '\\')
if not path.startswith('\\\\?\\') and len(path) > 254: if len(path) > win_maxpath:
path = '\\\\?\\' + path if path.startswith('\\\\?\\'):
path = path[4:]
path = u'\\'.join([u'\\\\?', path])
return path return path
def is_abspath(path):
if path is not None and len(path) > 0:
return '/' == path[0]
return False
def shell(path): def shell(path):
if path is not None: if path is not None:
if windows: if windows:
path = path.encode('ascii', 'ignore')
if path.startswith('\\\\?\\'): if path.startswith('\\\\?\\'):
path = path[4:] path = path[4:]
if len(path) > 1 and path[1] == ':': if len(path) > 1 and path[1] == ':':
@ -63,9 +72,11 @@ def shell(path):
return path return path
def basename(path): def basename(path):
return shell(os.path.basename(path)) path = shell(path)
return shell(os.path.basename(host(path)))
def dirname(path): def dirname(path):
path = shell(path)
return shell(os.path.dirname(path)) return shell(os.path.dirname(path))
def join(path, *args): def join(path, *args):
@ -78,42 +89,68 @@ def join(path, *args):
return shell(path) return shell(path)
def abspath(path): def abspath(path):
path = shell(path)
return shell(os.path.abspath(host(path))) return shell(os.path.abspath(host(path)))
def relpath(path, start = None):
path = shell(path)
if start is None:
path = os.path.relpath(host(path))
else:
path = os.path.relpath(host(path), start)
return shell(path)
def splitext(path): def splitext(path):
path = shell(path)
root, ext = os.path.splitext(host(path)) root, ext = os.path.splitext(host(path))
return shell(root), ext return shell(root), ext
def listdir(path):
path = shell(path)
hp = host(path)
if not os.path.exists(hp):
return []
return os.listdir(hp)
def exists(paths): def exists(paths):
def _exists(p):
if not is_abspath(p):
p = shell(join(os.getcwd(), host(p)))
return basename(p) in ['.'] + listdir(dirname(p))
if type(paths) == list: if type(paths) == list:
results = [] results = []
for p in paths: for p in paths:
results += [os.path.exists(host(p))] results += [_exists(shell(p))]
return results return results
return os.path.exists(host(paths)) return _exists(shell(paths))
def isdir(path): def isdir(path):
path = shell(path)
return os.path.isdir(host(path)) return os.path.isdir(host(path))
def isfile(path): def isfile(path):
path = shell(path)
return os.path.isfile(host(path)) return os.path.isfile(host(path))
def isabspath(path): def isabspath(path):
path = shell(path)
return path[0] == '/' return path[0] == '/'
def iswritable(path): def iswritable(path):
path = shell(path)
return os.access(host(path), os.W_OK) return os.access(host(path), os.W_OK)
def ispathwritable(path): def ispathwritable(path):
path = host(path) path = shell(path)
while len(path) != 0: while len(path) > 1:
if os.path.exists(path): if exists(path):
return iswritable(path) return iswritable(path)
path = os.path.dirname(path) path = dirname(path)
return False return False
def mkdir(path): def mkdir(path):
path = host(path) path = shell(path)
if exists(path): if exists(path):
if not isdir(path): if not isdir(path):
raise error.general('path exists and is not a directory: %s' % (path)) raise error.general('path exists and is not a directory: %s' % (path))
@ -135,41 +172,60 @@ def mkdir(path):
except OSError as err: except OSError as err:
raise error.general('cannot make directory: %s' % (path)) raise error.general('cannot make directory: %s' % (path))
def chdir(path):
path = shell(path)
os.chdir(host(path))
def removeall(path): def removeall(path):
# #
# Perform the removal of the directory tree manually so we can # Perform the removal of the directory tree manually so we can
# make sure on Windows the files and correctly encoded to avoid # make sure on Windows the files are correctly encoded to avoid
# the size limit. # the file name size limit. On Windows the os.walk fails once we
# get to the max path length on Windows.
# #
path = host(path) def _isdir(path):
for root, dirs, files in os.walk(path, topdown = False): hpath = host(path)
for name in files: return os.path.isdir(hpath) and not os.path.islink(hpath)
file = host(os.path.join(root, name))
if not os.path.islink(file) and not os.access(file, os.W_OK): def _remove_node(path):
os.chmod(file, stat.S_IWUSR) hpath = host(path)
os.unlink(file) if not os.path.islink(hpath) and not os.access(hpath, os.W_OK):
for name in dirs: os.chmod(hpath, stat.S_IWUSR)
dir = host(os.path.join(root, name)) if _isdir(path):
if os.path.islink(dir): os.rmdir(hpath)
os.unlink(dir) else:
os.unlink(hpath)
def _remove(path):
dirs = []
for name in listdir(path):
path_ = join(path, name)
hname = host(path_)
if _isdir(path_):
dirs += [name]
else: else:
if not os.access(dir, os.W_OK): _remove_node(path_)
os.chmod(dir, stat.S_IWUSR) for name in dirs:
os.rmdir(dir) dir = join(path, name)
if not os.path.islink(path) and not os.access(path, os.W_OK): _remove(dir)
os.chmod(path, stat.S_IWUSR) _remove_node(dir)
if os.path.islink(path):
os.unlink(path) path = shell(path)
else: hpath = host(path)
os.rmdir(path)
if os.path.exists(hpath):
_remove(path)
_remove_node(path)
def expand(name, paths): def expand(name, paths):
l = [] l = []
for p in paths: for p in paths:
l += [join(p, name)] l += [join(shell(p), name)]
return l return l
def copy(src, dst): def copy(src, dst):
src = shell(src)
dst = shell(dst)
hsrc = host(src) hsrc = host(src)
hdst = host(dst) hdst = host(dst)
try: try:
@ -179,7 +235,7 @@ def copy(src, dst):
if WindowsError is not None and isinstance(why, WindowsError): if WindowsError is not None and isinstance(why, WindowsError):
pass pass
else: else:
raise error.general('copying tree: %s -> %s: %s' % (hsrc, hdst, str(why))) raise error.general('copying tree (1): %s -> %s: %s' % (hsrc, hdst, str(why)))
def copy_tree(src, dst): def copy_tree(src, dst):
trace = False trace = False
@ -187,17 +243,17 @@ def copy_tree(src, dst):
hsrc = host(src) hsrc = host(src)
hdst = host(dst) hdst = host(dst)
if os.path.exists(hsrc): if exists(src):
names = os.listdir(hsrc) names = listdir(src)
else: else:
names = [] names = []
if trace: if trace:
print('path.copy_tree:') print('path.copy_tree:')
print(' src: %s' % (src)) print(' src: "%s"' % (src))
print(' hsrc: %s' % (hsrc)) print(' hsrc: "%s"' % (hsrc))
print(' dst: %s' % (dst)) print(' dst: "%s"' % (dst))
print(' hdst: %s' % (hdst)) print(' hdst: "%s"' % (hdst))
print(' names: %r' % (names)) print(' names: %r' % (names))
if not os.path.isdir(hdst): if not os.path.isdir(hdst):
@ -215,7 +271,7 @@ def copy_tree(src, dst):
try: try:
if os.path.islink(srcname): if os.path.islink(srcname):
linkto = os.readlink(srcname) linkto = os.readlink(srcname)
if os.path.exists(dstname): if exists(shell(dstname)):
if os.path.islink(dstname): if os.path.islink(dstname):
dstlinkto = os.readlink(dstname) dstlinkto = os.readlink(dstname)
if linkto != dstlinkto: if linkto != dstlinkto:
@ -231,12 +287,13 @@ def copy_tree(src, dst):
elif os.path.isdir(srcname): elif os.path.isdir(srcname):
copy_tree(srcname, dstname) copy_tree(srcname, dstname)
else: else:
shutil.copy2(host(srcname), host(dstname)) shutil.copyfile(host(srcname), host(dstname))
shutil.copystat(host(srcname), host(dstname))
except shutil.Error as err: except shutil.Error as err:
raise error.general('copying tree: %s -> %s: %s' % \ raise error.general('copying tree (2): %s -> %s: %s' % \
(hsrc, hdst, str(err))) (hsrc, hdst, str(err)))
except EnvironmentError as why: except EnvironmentError as why:
raise error.general('copying tree: %s -> %s: %s' % \ raise error.general('copying tree (3): %s -> %s: %s' % \
(srcname, dstname, str(why))) (srcname, dstname, str(why)))
try: try:
shutil.copystat(hsrc, hdst) shutil.copystat(hsrc, hdst)
@ -245,7 +302,7 @@ def copy_tree(src, dst):
if WindowsError is not None and isinstance(why, WindowsError): if WindowsError is not None and isinstance(why, WindowsError):
pass pass
else: else:
raise error.general('copying tree: %s -> %s: %s' % (hsrc, hdst, str(why))) raise error.general('copying tree (4): %s -> %s: %s' % (hsrc, hdst, str(why)))
if __name__ == '__main__': if __name__ == '__main__':
print(host('/a/b/c/d-e-f')) print(host('/a/b/c/d-e-f'))