mirror of
https://github.com/python/cpython.git
synced 2024-11-24 02:15:30 +08:00
gh-106531: Refresh zipfile._path with zipp 3.18. (#116835)
* gh-106531: Refresh zipfile._path with zipp 3.18. * Add blurb
This commit is contained in:
parent
ab9e322ae1
commit
be59aaf3ab
@ -43,13 +43,17 @@ class TestComplexity(unittest.TestCase):
|
||||
@classmethod
|
||||
def make_names(cls, width, letters=string.ascii_lowercase):
|
||||
"""
|
||||
>>> list(TestComplexity.make_names(1))
|
||||
['a']
|
||||
>>> list(TestComplexity.make_names(2))
|
||||
['a', 'b']
|
||||
>>> list(TestComplexity.make_names(30))
|
||||
['aa', 'ab', ..., 'bd']
|
||||
>>> list(TestComplexity.make_names(17124))
|
||||
['aaa', 'aab', ..., 'zip']
|
||||
"""
|
||||
# determine how many products are needed to produce width
|
||||
n_products = math.ceil(math.log(width, len(letters)))
|
||||
n_products = max(1, math.ceil(math.log(width, len(letters))))
|
||||
inputs = (letters,) * n_products
|
||||
combinations = itertools.product(*inputs)
|
||||
names = map(''.join, combinations)
|
||||
@ -80,7 +84,7 @@ class TestComplexity(unittest.TestCase):
|
||||
max_n=100,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Quadratic
|
||||
assert best <= big_o.complexities.Linear
|
||||
|
||||
@pytest.mark.flaky
|
||||
def test_glob_width(self):
|
||||
|
@ -6,6 +6,7 @@ import pickle
|
||||
import sys
|
||||
import unittest
|
||||
import zipfile
|
||||
import zipfile._path
|
||||
|
||||
from ._functools import compose
|
||||
from ._itertools import Counter
|
||||
@ -20,16 +21,6 @@ class jaraco:
|
||||
Counter = Counter
|
||||
|
||||
|
||||
def add_dirs(zf):
|
||||
"""
|
||||
Given a writable zip file zf, inject directory entries for
|
||||
any directories implied by the presence of children.
|
||||
"""
|
||||
for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
|
||||
zf.writestr(name, b"")
|
||||
return zf
|
||||
|
||||
|
||||
def build_alpharep_fixture():
|
||||
"""
|
||||
Create a zip file with this structure:
|
||||
@ -76,7 +67,7 @@ def build_alpharep_fixture():
|
||||
|
||||
alpharep_generators = [
|
||||
Invoked.wrap(build_alpharep_fixture),
|
||||
Invoked.wrap(compose(add_dirs, build_alpharep_fixture)),
|
||||
Invoked.wrap(compose(zipfile._path.CompleteDirs.inject, build_alpharep_fixture)),
|
||||
]
|
||||
|
||||
pass_alpharep = parameterize(['alpharep'], alpharep_generators)
|
||||
@ -210,11 +201,12 @@ class TestPath(unittest.TestCase):
|
||||
with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
|
||||
strm.write('text file')
|
||||
|
||||
def test_open_extant_directory(self):
|
||||
@pass_alpharep
|
||||
def test_open_extant_directory(self, alpharep):
|
||||
"""
|
||||
Attempting to open a directory raises IsADirectoryError.
|
||||
"""
|
||||
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
|
||||
zf = zipfile.Path(alpharep)
|
||||
with self.assertRaises(IsADirectoryError):
|
||||
zf.joinpath('b').open()
|
||||
|
||||
@ -226,11 +218,12 @@ class TestPath(unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
root.joinpath('a.txt').open('rb', 'utf-8')
|
||||
|
||||
def test_open_missing_directory(self):
|
||||
@pass_alpharep
|
||||
def test_open_missing_directory(self, alpharep):
|
||||
"""
|
||||
Attempting to open a missing directory raises FileNotFoundError.
|
||||
"""
|
||||
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
|
||||
zf = zipfile.Path(alpharep)
|
||||
with self.assertRaises(FileNotFoundError):
|
||||
zf.joinpath('z').open()
|
||||
|
||||
|
@ -5,8 +5,9 @@ import itertools
|
||||
import contextlib
|
||||
import pathlib
|
||||
import re
|
||||
import sys
|
||||
|
||||
from .glob import translate
|
||||
from .glob import Translator
|
||||
|
||||
|
||||
__all__ = ['Path']
|
||||
@ -147,6 +148,16 @@ class CompleteDirs(InitializedState, zipfile.ZipFile):
|
||||
source.__class__ = cls
|
||||
return source
|
||||
|
||||
@classmethod
|
||||
def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
|
||||
"""
|
||||
Given a writable zip file zf, inject directory entries for
|
||||
any directories implied by the presence of children.
|
||||
"""
|
||||
for name in cls._implied_dirs(zf.namelist()):
|
||||
zf.writestr(name, b"")
|
||||
return zf
|
||||
|
||||
|
||||
class FastLookup(CompleteDirs):
|
||||
"""
|
||||
@ -168,8 +179,10 @@ class FastLookup(CompleteDirs):
|
||||
|
||||
|
||||
def _extract_text_encoding(encoding=None, *args, **kwargs):
|
||||
# stacklevel=3 so that the caller of the caller see any warning.
|
||||
return io.text_encoding(encoding, 3), args, kwargs
|
||||
# compute stack level so that the caller of the caller sees any warning.
|
||||
is_pypy = sys.implementation.name == 'pypy'
|
||||
stack_level = 3 + is_pypy
|
||||
return io.text_encoding(encoding, stack_level), args, kwargs
|
||||
|
||||
|
||||
class Path:
|
||||
@ -194,13 +207,13 @@ class Path:
|
||||
|
||||
Path accepts the zipfile object itself or a filename
|
||||
|
||||
>>> root = Path(zf)
|
||||
>>> path = Path(zf)
|
||||
|
||||
From there, several path operations are available.
|
||||
|
||||
Directory iteration (including the zip file itself):
|
||||
|
||||
>>> a, b = root.iterdir()
|
||||
>>> a, b = path.iterdir()
|
||||
>>> a
|
||||
Path('mem/abcde.zip', 'a.txt')
|
||||
>>> b
|
||||
@ -238,16 +251,38 @@ class Path:
|
||||
'mem/abcde.zip/b/c.txt'
|
||||
|
||||
At the root, ``name``, ``filename``, and ``parent``
|
||||
resolve to the zipfile. Note these attributes are not
|
||||
valid and will raise a ``ValueError`` if the zipfile
|
||||
has no filename.
|
||||
resolve to the zipfile.
|
||||
|
||||
>>> root.name
|
||||
>>> str(path)
|
||||
'mem/abcde.zip/'
|
||||
>>> path.name
|
||||
'abcde.zip'
|
||||
>>> str(root.filename).replace(os.sep, posixpath.sep)
|
||||
'mem/abcde.zip'
|
||||
>>> str(root.parent)
|
||||
>>> path.filename == pathlib.Path('mem/abcde.zip')
|
||||
True
|
||||
>>> str(path.parent)
|
||||
'mem'
|
||||
|
||||
If the zipfile has no filename, such attribtues are not
|
||||
valid and accessing them will raise an Exception.
|
||||
|
||||
>>> zf.filename = None
|
||||
>>> path.name
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: ...
|
||||
|
||||
>>> path.filename
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: ...
|
||||
|
||||
>>> path.parent
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: ...
|
||||
|
||||
# workaround python/cpython#106763
|
||||
>>> pass
|
||||
"""
|
||||
|
||||
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
|
||||
@ -364,8 +399,10 @@ class Path:
|
||||
raise ValueError(f"Unacceptable pattern: {pattern!r}")
|
||||
|
||||
prefix = re.escape(self.at)
|
||||
matches = re.compile(prefix + translate(pattern)).fullmatch
|
||||
return map(self._next, filter(matches, self.root.namelist()))
|
||||
tr = Translator(seps='/')
|
||||
matches = re.compile(prefix + tr.translate(pattern)).fullmatch
|
||||
names = (data.filename for data in self.root.filelist)
|
||||
return map(self._next, filter(matches, names))
|
||||
|
||||
def rglob(self, pattern):
|
||||
return self.glob(f'**/{pattern}')
|
||||
|
@ -1,18 +1,97 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
|
||||
def translate(pattern):
|
||||
r"""
|
||||
Given a glob pattern, produce a regex that matches it.
|
||||
_default_seps = os.sep + str(os.altsep) * bool(os.altsep)
|
||||
|
||||
>>> translate('*.txt')
|
||||
'[^/]*\\.txt'
|
||||
>>> translate('a?txt')
|
||||
'a.txt'
|
||||
>>> translate('**/*')
|
||||
'.*/[^/]*'
|
||||
|
||||
class Translator:
|
||||
"""
|
||||
return ''.join(map(replace, separate(pattern)))
|
||||
>>> Translator('xyz')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Invalid separators
|
||||
|
||||
>>> Translator('')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Invalid separators
|
||||
"""
|
||||
|
||||
seps: str
|
||||
|
||||
def __init__(self, seps: str = _default_seps):
|
||||
assert seps and set(seps) <= set(_default_seps), "Invalid separators"
|
||||
self.seps = seps
|
||||
|
||||
def translate(self, pattern):
|
||||
"""
|
||||
Given a glob pattern, produce a regex that matches it.
|
||||
"""
|
||||
return self.extend(self.translate_core(pattern))
|
||||
|
||||
def extend(self, pattern):
|
||||
r"""
|
||||
Extend regex for pattern-wide concerns.
|
||||
|
||||
Apply '(?s:)' to create a non-matching group that
|
||||
matches newlines (valid on Unix).
|
||||
|
||||
Append '\Z' to imply fullmatch even when match is used.
|
||||
"""
|
||||
return rf'(?s:{pattern})\Z'
|
||||
|
||||
def translate_core(self, pattern):
|
||||
r"""
|
||||
Given a glob pattern, produce a regex that matches it.
|
||||
|
||||
>>> t = Translator()
|
||||
>>> t.translate_core('*.txt').replace('\\\\', '')
|
||||
'[^/]*\\.txt'
|
||||
>>> t.translate_core('a?txt')
|
||||
'a[^/]txt'
|
||||
>>> t.translate_core('**/*').replace('\\\\', '')
|
||||
'.*/[^/][^/]*'
|
||||
"""
|
||||
self.restrict_rglob(pattern)
|
||||
return ''.join(map(self.replace, separate(self.star_not_empty(pattern))))
|
||||
|
||||
def replace(self, match):
|
||||
"""
|
||||
Perform the replacements for a match from :func:`separate`.
|
||||
"""
|
||||
return match.group('set') or (
|
||||
re.escape(match.group(0))
|
||||
.replace('\\*\\*', r'.*')
|
||||
.replace('\\*', rf'[^{re.escape(self.seps)}]*')
|
||||
.replace('\\?', r'[^/]')
|
||||
)
|
||||
|
||||
def restrict_rglob(self, pattern):
|
||||
"""
|
||||
Raise ValueError if ** appears in anything but a full path segment.
|
||||
|
||||
>>> Translator().translate('**foo')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
ValueError: ** must appear alone in a path segment
|
||||
"""
|
||||
seps_pattern = rf'[{re.escape(self.seps)}]+'
|
||||
segments = re.split(seps_pattern, pattern)
|
||||
if any('**' in segment and segment != '**' for segment in segments):
|
||||
raise ValueError("** must appear alone in a path segment")
|
||||
|
||||
def star_not_empty(self, pattern):
|
||||
"""
|
||||
Ensure that * will not match an empty segment.
|
||||
"""
|
||||
|
||||
def handle_segment(match):
|
||||
segment = match.group(0)
|
||||
return '?*' if segment == '*' else segment
|
||||
|
||||
not_seps_pattern = rf'[^{re.escape(self.seps)}]+'
|
||||
return re.sub(not_seps_pattern, handle_segment, pattern)
|
||||
|
||||
|
||||
def separate(pattern):
|
||||
@ -25,16 +104,3 @@ def separate(pattern):
|
||||
['a', '[?]', 'txt']
|
||||
"""
|
||||
return re.finditer(r'([^\[]+)|(?P<set>[\[].*?[\]])|([\[][^\]]*$)', pattern)
|
||||
|
||||
|
||||
def replace(match):
|
||||
"""
|
||||
Perform the replacements for a match from :func:`separate`.
|
||||
"""
|
||||
|
||||
return match.group('set') or (
|
||||
re.escape(match.group(0))
|
||||
.replace('\\*\\*', r'.*')
|
||||
.replace('\\*', r'[^/]*')
|
||||
.replace('\\?', r'.')
|
||||
)
|
||||
|
@ -0,0 +1,5 @@
|
||||
Refreshed zipfile._path from `zipp 3.18
|
||||
<https://zipp.readthedocs.io/en/latest/history.html#v3-18-0>`_, providing
|
||||
better compatibility for PyPy, better glob performance for deeply nested
|
||||
zipfiles, and providing internal access to ``CompleteDirs.inject`` for use
|
||||
in other tests (like importlib.resources).
|
Loading…
Reference in New Issue
Block a user