Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 59 additions & 114 deletions Lib/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,93 +274,6 @@ def make_uri(self, path):
_posix_flavour = _PosixFlavour()


class _Accessor:
"""An accessor implements a particular (system-specific or not) way of
accessing paths on the filesystem."""


class _NormalAccessor(_Accessor):

stat = os.stat

open = io.open

listdir = os.listdir

scandir = os.scandir

chmod = os.chmod

mkdir = os.mkdir

unlink = os.unlink

if hasattr(os, "link"):
link = os.link
else:
def link(self, src, dst):
raise NotImplementedError("os.link() not available on this system")

rmdir = os.rmdir

rename = os.rename

replace = os.replace

if hasattr(os, "symlink"):
symlink = os.symlink
else:
def symlink(self, src, dst, target_is_directory=False):
raise NotImplementedError("os.symlink() not available on this system")

def touch(self, path, mode=0o666, exist_ok=True):
if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
os.utime(path, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = os.open(path, flags, mode)
os.close(fd)

if hasattr(os, "readlink"):
readlink = os.readlink
else:
def readlink(self, path):
raise NotImplementedError("os.readlink() not available on this system")

def owner(self, path):
try:
import pwd
return pwd.getpwuid(self.stat(path).st_uid).pw_name
except ImportError:
raise NotImplementedError("Path.owner() is unsupported on this system")

def group(self, path):
try:
import grp
return grp.getgrgid(self.stat(path).st_gid).gr_name
except ImportError:
raise NotImplementedError("Path.group() is unsupported on this system")

getcwd = os.getcwd

expanduser = staticmethod(os.path.expanduser)

realpath = staticmethod(os.path.realpath)


_normal_accessor = _NormalAccessor()


#
# Globbing helpers
#
Expand Down Expand Up @@ -401,7 +314,7 @@ def select_from(self, parent_path):
path_cls = type(parent_path)
is_dir = path_cls.is_dir
exists = path_cls.exists
scandir = parent_path._accessor.scandir
scandir = path_cls._scandir
if not is_dir(parent_path):
return iter([])
return self._select_from(parent_path, is_dir, exists, scandir)
Expand Down Expand Up @@ -949,7 +862,6 @@ class Path(PurePath):
object. You can also instantiate a PosixPath or WindowsPath directly,
but cannot instantiate a WindowsPath on a POSIX system or vice versa.
"""
_accessor = _normal_accessor
__slots__ = ()

def __new__(cls, *args, **kwargs):
Expand Down Expand Up @@ -988,7 +900,7 @@ def cwd(cls):
"""Return a new path pointing to the current working directory
(as returned by os.getcwd()).
"""
return cls(cls._accessor.getcwd())
return cls(os.getcwd())

@classmethod
def home(cls):
Expand All @@ -1005,16 +917,22 @@ def samefile(self, other_path):
try:
other_st = other_path.stat()
except AttributeError:
other_st = self._accessor.stat(other_path)
other_st = self.__class__(other_path).stat()
return os.path.samestat(st, other_st)

def iterdir(self):
"""Iterate over the files in this directory. Does not yield any
result for the special paths '.' and '..'.
"""
for name in self._accessor.listdir(self):
for name in os.listdir(self):
yield self._make_child_relpath(name)

def _scandir(self):
# bpo-24132: a future version of pathlib will support subclassing of
# pathlib.Path to customize how the filesystem is accessed. This
# includes scandir(), which is used to implement glob().
return os.scandir(self)

def glob(self, pattern):
"""Iterate over this subtree and yield all existing files (of any
kind, including directories) matching the given relative pattern.
Expand Down Expand Up @@ -1052,9 +970,7 @@ def absolute(self):
# XXX untested yet!
if self.is_absolute():
return self
# FIXME this must defer to the specific flavour (and, under Windows,
# use nt._getfullpathname())
return self._from_parts([self._accessor.getcwd()] + self._parts)
return self._from_parts([self.cwd()] + self._parts)

def resolve(self, strict=False):
"""
Expand All @@ -1069,7 +985,7 @@ def check_eloop(e):
raise RuntimeError("Symlink loop from %r" % e.filename)

try:
s = self._accessor.realpath(self, strict=strict)
s = os.path.realpath(self, strict=strict)
except OSError as e:
check_eloop(e)
raise
Expand All @@ -1089,19 +1005,28 @@ def stat(self, *, follow_symlinks=True):
Return the result of the stat() system call on this path, like
os.stat() does.
"""
return self._accessor.stat(self, follow_symlinks=follow_symlinks)
return os.stat(self, follow_symlinks=follow_symlinks)

def owner(self):
"""
Return the login name of the file owner.
"""
return self._accessor.owner(self)
try:
import pwd
return pwd.getpwuid(self.stat().st_uid).pw_name
except ImportError:
raise NotImplementedError("Path.owner() is unsupported on this system")

def group(self):
"""
Return the group name of the file gid.
"""
return self._accessor.group(self)

try:
import grp
return grp.getgrgid(self.stat().st_gid).gr_name
except ImportError:
raise NotImplementedError("Path.group() is unsupported on this system")

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
Expand All @@ -1111,8 +1036,7 @@ def open(self, mode='r', buffering=-1, encoding=None,
"""
if "b" not in mode:
encoding = io.text_encoding(encoding)
return self._accessor.open(self, mode, buffering, encoding, errors,
newline)
return io.open(self, mode, buffering, encoding, errors, newline)

def read_bytes(self):
"""
Expand Down Expand Up @@ -1153,21 +1077,38 @@ def readlink(self):
"""
Return the path to which the symbolic link points.
"""
path = self._accessor.readlink(self)
return self._from_parts((path,))
if not hasattr(os, "readlink"):
raise NotImplementedError("os.readlink() not available on this system")
return self._from_parts((os.readlink(self),))

def touch(self, mode=0o666, exist_ok=True):
"""
Create this file with the given access mode, if it doesn't exist.
"""
self._accessor.touch(self, mode, exist_ok)

if exist_ok:
# First try to bump modification time
# Implementation note: GNU touch uses the UTIME_NOW option of
# the utimensat() / futimens() functions.
try:
os.utime(self, None)
except OSError:
# Avoid exception chaining
pass
else:
return
flags = os.O_CREAT | os.O_WRONLY
if not exist_ok:
flags |= os.O_EXCL
fd = os.open(self, flags, mode)
os.close(fd)

def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
Create a new directory at this given path.
"""
try:
self._accessor.mkdir(self, mode)
os.mkdir(self, mode)
except FileNotFoundError:
if not parents or self.parent == self:
raise
Expand All @@ -1183,7 +1124,7 @@ def chmod(self, mode, *, follow_symlinks=True):
"""
Change the permissions of the path, like os.chmod().
"""
self._accessor.chmod(self, mode, follow_symlinks=follow_symlinks)
os.chmod(self, mode, follow_symlinks=follow_symlinks)

def lchmod(self, mode):
"""
Expand All @@ -1198,7 +1139,7 @@ def unlink(self, missing_ok=False):
If the path is a directory, use rmdir() instead.
"""
try:
self._accessor.unlink(self)
os.unlink(self)
except FileNotFoundError:
if not missing_ok:
raise
Expand All @@ -1207,7 +1148,7 @@ def rmdir(self):
"""
Remove this directory. The directory must be empty.
"""
self._accessor.rmdir(self)
os.rmdir(self)

def lstat(self):
"""
Expand All @@ -1226,7 +1167,7 @@ def rename(self, target):

Returns the new Path instance pointing to the target path.
"""
self._accessor.rename(self, target)
os.rename(self, target)
return self.__class__(target)

def replace(self, target):
Expand All @@ -1239,23 +1180,27 @@ def replace(self, target):

Returns the new Path instance pointing to the target path.
"""
self._accessor.replace(self, target)
os.replace(self, target)
return self.__class__(target)

def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
Note the order of arguments (link, target) is the reverse of os.symlink.
"""
self._accessor.symlink(target, self, target_is_directory)
if not hasattr(os, "symlink"):
raise NotImplementedError("os.symlink() not available on this system")
os.symlink(target, self, target_is_directory)

def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.

Note the order of arguments (self, target) is the reverse of os.link's.
"""
self._accessor.link(target, self)
if not hasattr(os, "link"):
raise NotImplementedError("os.link() not available on this system")
os.link(target, self)

def link_to(self, target):
"""
Expand All @@ -1273,7 +1218,7 @@ def link_to(self, target):
"for removal in Python 3.12. "
"Use pathlib.Path.hardlink_to() instead.",
DeprecationWarning, stacklevel=2)
self._accessor.link(self, target)
self.__class__(target).hardlink_to(self)

# Convenience functions for querying the stat results

Expand Down Expand Up @@ -1430,7 +1375,7 @@ def expanduser(self):
"""
if (not (self._drv or self._root) and
self._parts and self._parts[0][:1] == '~'):
homedir = self._accessor.expanduser(self._parts[0])
homedir = os.path.expanduser(self._parts[0])
if homedir[:1] == "~":
raise RuntimeError("Could not determine home directory.")
return self._from_parts([homedir] + self._parts[1:])
Expand Down
31 changes: 15 additions & 16 deletions Lib/test/test_pathlib.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import collections.abc
import io
import os
Expand Down Expand Up @@ -1716,21 +1717,18 @@ def test_glob_permissions(self):
# Patching is needed to avoid relying on the filesystem
# to return the order of the files as the error will not
# happen if the symlink is the last item.

with mock.patch("os.scandir") as scandir:
scandir.return_value = sorted(os.scandir(base))
real_scandir = os.scandir
def my_scandir(path):
with real_scandir(path) as scandir_it:
entries = list(scandir_it)
entries.sort(key=lambda entry: entry.name)
return contextlib.nullcontext(entries)

with mock.patch("os.scandir", my_scandir):
self.assertEqual(len(set(base.glob("*"))), 3)

subdir.mkdir()

with mock.patch("os.scandir") as scandir:
scandir.return_value = sorted(os.scandir(base))
subdir.mkdir()
self.assertEqual(len(set(base.glob("*"))), 4)

subdir.chmod(000)

with mock.patch("os.scandir") as scandir:
scandir.return_value = sorted(os.scandir(base))
subdir.chmod(000)
self.assertEqual(len(set(base.glob("*"))), 4)

def _check_resolve(self, p, expected, strict=True):
Expand Down Expand Up @@ -2177,6 +2175,7 @@ def test_mkdir_concurrent_parent_creation(self):
p = self.cls(BASE, 'dirCPC%d' % pattern_num)
self.assertFalse(p.exists())

real_mkdir = os.mkdir
def my_mkdir(path, mode=0o777):
path = str(path)
# Emulate another process that would create the directory
Expand All @@ -2185,15 +2184,15 @@ def my_mkdir(path, mode=0o777):
# function is called at most 5 times (dirCPC/dir1/dir2,
# dirCPC/dir1, dirCPC, dirCPC/dir1, dirCPC/dir1/dir2).
if pattern.pop():
os.mkdir(path, mode) # From another process.
real_mkdir(path, mode) # From another process.
concurrently_created.add(path)
os.mkdir(path, mode) # Our real call.
real_mkdir(path, mode) # Our real call.

pattern = [bool(pattern_num & (1 << n)) for n in range(5)]
concurrently_created = set()
p12 = p / 'dir1' / 'dir2'
try:
with mock.patch("pathlib._normal_accessor.mkdir", my_mkdir):
with mock.patch("os.mkdir", my_mkdir):
p12.mkdir(parents=True, exist_ok=False)
except FileExistsError:
self.assertIn(str(p12), concurrently_created)
Expand Down
Loading