-
-
Notifications
You must be signed in to change notification settings - Fork 30.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gh-91133: tempfile.TemporaryDirectory: fix symlink bug in cleanup #99930
Changes from 11 commits
d8f50e5
d268947
60de13a
983a9ac
5b83afe
db40615
96124ef
1aadbdd
cb9aea4
f106e25
0be03d1
0a05de0
081a898
58924b6
b88478f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -1673,6 +1673,103 @@ def test_cleanup_with_symlink_to_a_directory(self): | |||||||||||
"were deleted") | ||||||||||||
d2.cleanup() | ||||||||||||
|
||||||||||||
@os_helper.skip_unless_symlink | ||||||||||||
def test_cleanup_with_symlink_modes(self): | ||||||||||||
# cleanup() should not follow symlinks when fixing mode bits (#91133) | ||||||||||||
with self.do_create(recurse=0) as d2: | ||||||||||||
file1 = os.path.join(d2, 'file1') | ||||||||||||
open(file1, 'wb').close() | ||||||||||||
dir1 = os.path.join(d2, 'dir1') | ||||||||||||
os.mkdir(dir1) | ||||||||||||
for mode in range(8): | ||||||||||||
mode <<= 6 | ||||||||||||
with self.subTest(mode=format(mode, '03o')): | ||||||||||||
def test(target, target_is_directory): | ||||||||||||
d1 = self.do_create(recurse=0) | ||||||||||||
symlink = os.path.join(d1.name, 'symlink') | ||||||||||||
os.symlink(target, symlink, | ||||||||||||
target_is_directory=target_is_directory) | ||||||||||||
try: | ||||||||||||
os.chmod(symlink, mode, follow_symlinks=False) | ||||||||||||
except NotImplementedError: | ||||||||||||
pass | ||||||||||||
try: | ||||||||||||
os.chmod(symlink, mode) | ||||||||||||
except FileNotFoundError: | ||||||||||||
pass | ||||||||||||
os.chmod(d1.name, mode) | ||||||||||||
d1.cleanup() | ||||||||||||
self.assertFalse(os.path.exists(d1.name)) | ||||||||||||
|
||||||||||||
with self.subTest('nonexisting file'): | ||||||||||||
test('nonexisting', target_is_directory=False) | ||||||||||||
with self.subTest('nonexisting dir'): | ||||||||||||
test('nonexisting', target_is_directory=True) | ||||||||||||
|
||||||||||||
with self.subTest('existing file'): | ||||||||||||
os.chmod(file1, mode) | ||||||||||||
old_mode = os.stat(file1).st_mode | ||||||||||||
test(file1, target_is_directory=False) | ||||||||||||
new_mode = os.stat(file1).st_mode | ||||||||||||
self.assertEqual(new_mode, old_mode, | ||||||||||||
'%03o != %03o' % (new_mode, old_mode)) | ||||||||||||
|
||||||||||||
with self.subTest('existing dir'): | ||||||||||||
os.chmod(dir1, mode) | ||||||||||||
old_mode = os.stat(dir1).st_mode | ||||||||||||
test(dir1, target_is_directory=True) | ||||||||||||
new_mode = os.stat(dir1).st_mode | ||||||||||||
self.assertEqual(new_mode, old_mode, | ||||||||||||
'%03o != %03o' % (new_mode, old_mode)) | ||||||||||||
|
||||||||||||
@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') | ||||||||||||
@os_helper.skip_unless_symlink | ||||||||||||
def test_cleanup_with_symlink_flags(self): | ||||||||||||
# cleanup() should not follow symlinks when fixing flags (#91133) | ||||||||||||
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK | ||||||||||||
self.check_flags() | ||||||||||||
|
||||||||||||
with self.do_create(recurse=0) as d2: | ||||||||||||
file1 = os.path.join(d2, 'file1') | ||||||||||||
open(file1, 'wb').close() | ||||||||||||
dir1 = os.path.join(d2, 'dir1') | ||||||||||||
os.mkdir(dir1) | ||||||||||||
def test(target, target_is_directory): | ||||||||||||
d1 = self.do_create(recurse=0) | ||||||||||||
symlink = os.path.join(d1.name, 'symlink') | ||||||||||||
os.symlink(target, symlink, | ||||||||||||
target_is_directory=target_is_directory) | ||||||||||||
try: | ||||||||||||
os.chflags(symlink, flags, follow_symlinks=False) | ||||||||||||
except NotImplementedError: | ||||||||||||
pass | ||||||||||||
try: | ||||||||||||
os.chflags(symlink, flags) | ||||||||||||
except FileNotFoundError: | ||||||||||||
pass | ||||||||||||
os.chflags(d1.name, flags) | ||||||||||||
d1.cleanup() | ||||||||||||
self.assertFalse(os.path.exists(d1.name)) | ||||||||||||
|
||||||||||||
with self.subTest('nonexisting file'): | ||||||||||||
test('nonexisting', target_is_directory=False) | ||||||||||||
with self.subTest('nonexisting dir'): | ||||||||||||
test('nonexisting', target_is_directory=True) | ||||||||||||
|
||||||||||||
with self.subTest('existing file'): | ||||||||||||
os.chflags(file1, flags) | ||||||||||||
old_flags = os.stat(file1).st_flags | ||||||||||||
test(file1, target_is_directory=False) | ||||||||||||
new_flags = os.stat(file1).st_flags | ||||||||||||
self.assertEqual(new_flags, old_flags) | ||||||||||||
|
||||||||||||
with self.subTest('existing dir'): | ||||||||||||
os.chflags(dir1, flags) | ||||||||||||
old_flags = os.stat(dir1).st_flags | ||||||||||||
test(dir1, target_is_directory=True) | ||||||||||||
new_flags = os.stat(dir1).st_flags | ||||||||||||
self.assertEqual(new_flags, old_flags) | ||||||||||||
|
||||||||||||
@support.cpython_only | ||||||||||||
def test_del_on_collection(self): | ||||||||||||
# A TemporaryDirectory is deleted when garbage collected | ||||||||||||
|
@@ -1845,8 +1942,7 @@ def test_modes(self): | |||||||||||
d.cleanup() | ||||||||||||
self.assertFalse(os.path.exists(d.name)) | ||||||||||||
|
||||||||||||
@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') | ||||||||||||
def test_flags(self): | ||||||||||||
def check_flags(self): | ||||||||||||
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK | ||||||||||||
|
||||||||||||
# skip the test if these flags are not supported (ex: FreeBSD 13) | ||||||||||||
|
@@ -1864,6 +1960,11 @@ def test_flags(self): | |||||||||||
finally: | ||||||||||||
os_helper.unlink(filename) | ||||||||||||
|
||||||||||||
@unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') | ||||||||||||
def test_flags(self): | ||||||||||||
flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK | ||||||||||||
self.check_flags() | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can pass flags to check_flags(), rather than having a copy inside the function. I also suggest moving the comment here:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not want to repeat the same comment 3 times. |
||||||||||||
|
||||||||||||
d = self.do_create(recurse=3, dirs=2, files=2) | ||||||||||||
with d: | ||||||||||||
# Change files and directories flags recursively. | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
Fix a bug in :class:`tempfile.TemporaryDirectory` cleanup, which now no longer | ||
dereferences symlinks when working around file system permission errors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to rename the function to "no_follow_symlinks" or "dont_follow_symlinks".
I dislike this trend to declare nested functions. Can't you move these functions at the module level, just make them private? This function doesn't use name nor ignore_errors.