cpython/Lib/compileall.py
Benjamin Peterson 42aa93b8ff
closes bpo-31650: PEP 552 (Deterministic pycs) implementation (#4575)
Python now supports checking bytecode cache up-to-dateness with a hash of the
source contents rather than volatile source metadata. See the PEP for details.

While a fairly straightforward idea, quite a lot of code had to be modified due
to the pervasiveness of pyc implementation details in the codebase. Changes in
this commit include:

- The core changes to importlib to understand how to read, validate, and
  regenerate hash-based pycs.

- Support for generating hash-based pycs in py_compile and compileall.

- Modifications to our siphash implementation to support passing a custom
  key. We then expose it to importlib through _imp.

- Updates to all places in the interpreter, standard library, and tests that
  manually generate or parse pyc files to grok the new format.

- Support in the interpreter command line code for long options like
  --check-hash-based-pycs.

- Tests and documentation for all of the above.
2017-12-09 10:26:52 -08:00

322 lines
13 KiB
Python

"""Module/script to byte-compile all .py files to .pyc files.
When called as a script with arguments, this compiles the directories
given as arguments recursively; the -l option prevents it from
recursing into directories.
Without arguments, if compiles all modules on sys.path, without
recursing into subdirectories. (Even though it should do so for
packages -- for now, you'll have to deal with packages separately.)
See module py_compile for details of the actual byte-compilation.
"""
import os
import sys
import importlib.util
import py_compile
import struct
try:
from concurrent.futures import ProcessPoolExecutor
except ImportError:
ProcessPoolExecutor = None
from functools import partial
__all__ = ["compile_dir","compile_file","compile_path"]
def _walk_dir(dir, ddir=None, maxlevels=10, quiet=0):
if quiet < 2 and isinstance(dir, os.PathLike):
dir = os.fspath(dir)
if not quiet:
print('Listing {!r}...'.format(dir))
try:
names = os.listdir(dir)
except OSError:
if quiet < 2:
print("Can't list {!r}".format(dir))
names = []
names.sort()
for name in names:
if name == '__pycache__':
continue
fullname = os.path.join(dir, name)
if ddir is not None:
dfile = os.path.join(ddir, name)
else:
dfile = None
if not os.path.isdir(fullname):
yield fullname
elif (maxlevels > 0 and name != os.curdir and name != os.pardir and
os.path.isdir(fullname) and not os.path.islink(fullname)):
yield from _walk_dir(fullname, ddir=dfile,
maxlevels=maxlevels - 1, quiet=quiet)
def compile_dir(dir, maxlevels=10, ddir=None, force=False, rx=None,
quiet=0, legacy=False, optimize=-1, workers=1,
invalidation_mode=py_compile.PycInvalidationMode.TIMESTAMP):
"""Byte-compile all modules in the given directory tree.
Arguments (only dir is required):
dir: the directory to byte-compile
maxlevels: maximum recursion level (default 10)
ddir: the directory that will be prepended to the path to the
file as it is compiled into each byte-code file.
force: if True, force compilation, even if timestamps are up-to-date
quiet: full output with False or 0, errors only with 1,
no output with 2
legacy: if True, produce legacy pyc paths instead of PEP 3147 paths
optimize: optimization level or -1 for level of the interpreter
workers: maximum number of parallel workers
invalidation_mode: how the up-to-dateness of the pyc will be checked
"""
if workers is not None and workers < 0:
raise ValueError('workers must be greater or equal to 0')
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels,
ddir=ddir)
success = True
if workers is not None and workers != 1 and ProcessPoolExecutor is not None:
workers = workers or None
with ProcessPoolExecutor(max_workers=workers) as executor:
results = executor.map(partial(compile_file,
ddir=ddir, force=force,
rx=rx, quiet=quiet,
legacy=legacy,
optimize=optimize,
invalidation_mode=invalidation_mode),
files)
success = min(results, default=True)
else:
for file in files:
if not compile_file(file, ddir, force, rx, quiet,
legacy, optimize, invalidation_mode):
success = False
return success
def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
legacy=False, optimize=-1,
invalidation_mode=py_compile.PycInvalidationMode.TIMESTAMP):
"""Byte-compile one file.
Arguments (only fullname is required):
fullname: the file to byte-compile
ddir: if given, the directory name compiled in to the
byte-code file.
force: if True, force compilation, even if timestamps are up-to-date
quiet: full output with False or 0, errors only with 1,
no output with 2
legacy: if True, produce legacy pyc paths instead of PEP 3147 paths
optimize: optimization level or -1 for level of the interpreter
invalidation_mode: how the up-to-dateness of the pyc will be checked
"""
success = True
if quiet < 2 and isinstance(fullname, os.PathLike):
fullname = os.fspath(fullname)
name = os.path.basename(fullname)
if ddir is not None:
dfile = os.path.join(ddir, name)
else:
dfile = None
if rx is not None:
mo = rx.search(fullname)
if mo:
return success
if os.path.isfile(fullname):
if legacy:
cfile = fullname + 'c'
else:
if optimize >= 0:
opt = optimize if optimize >= 1 else ''
cfile = importlib.util.cache_from_source(
fullname, optimization=opt)
else:
cfile = importlib.util.cache_from_source(fullname)
cache_dir = os.path.dirname(cfile)
head, tail = name[:-3], name[-3:]
if tail == '.py':
if not force:
try:
mtime = int(os.stat(fullname).st_mtime)
expect = struct.pack('<4sll', importlib.util.MAGIC_NUMBER,
0, mtime)
with open(cfile, 'rb') as chandle:
actual = chandle.read(12)
if expect == actual:
return success
except OSError:
pass
if not quiet:
print('Compiling {!r}...'.format(fullname))
try:
ok = py_compile.compile(fullname, cfile, dfile, True,
optimize=optimize,
invalidation_mode=invalidation_mode)
except py_compile.PyCompileError as err:
success = False
if quiet >= 2:
return success
elif quiet:
print('*** Error compiling {!r}...'.format(fullname))
else:
print('*** ', end='')
# escape non-printable characters in msg
msg = err.msg.encode(sys.stdout.encoding,
errors='backslashreplace')
msg = msg.decode(sys.stdout.encoding)
print(msg)
except (SyntaxError, UnicodeError, OSError) as e:
success = False
if quiet >= 2:
return success
elif quiet:
print('*** Error compiling {!r}...'.format(fullname))
else:
print('*** ', end='')
print(e.__class__.__name__ + ':', e)
else:
if ok == 0:
success = False
return success
def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0,
legacy=False, optimize=-1,
invalidation_mode=py_compile.PycInvalidationMode.TIMESTAMP):
"""Byte-compile all module on sys.path.
Arguments (all optional):
skip_curdir: if true, skip current directory (default True)
maxlevels: max recursion level (default 0)
force: as for compile_dir() (default False)
quiet: as for compile_dir() (default 0)
legacy: as for compile_dir() (default False)
optimize: as for compile_dir() (default -1)
invalidation_mode: as for compiler_dir()
"""
success = True
for dir in sys.path:
if (not dir or dir == os.curdir) and skip_curdir:
if quiet < 2:
print('Skipping current directory')
else:
success = success and compile_dir(
dir,
maxlevels,
None,
force,
quiet=quiet,
legacy=legacy,
optimize=optimize,
invalidation_mode=invalidation_mode,
)
return success
def main():
"""Script main program."""
import argparse
parser = argparse.ArgumentParser(
description='Utilities to support installing Python libraries.')
parser.add_argument('-l', action='store_const', const=0,
default=10, dest='maxlevels',
help="don't recurse into subdirectories")
parser.add_argument('-r', type=int, dest='recursion',
help=('control the maximum recursion level. '
'if `-l` and `-r` options are specified, '
'then `-r` takes precedence.'))
parser.add_argument('-f', action='store_true', dest='force',
help='force rebuild even if timestamps are up to date')
parser.add_argument('-q', action='count', dest='quiet', default=0,
help='output only error messages; -qq will suppress '
'the error messages as well.')
parser.add_argument('-b', action='store_true', dest='legacy',
help='use legacy (pre-PEP3147) compiled file locations')
parser.add_argument('-d', metavar='DESTDIR', dest='ddir', default=None,
help=('directory to prepend to file paths for use in '
'compile-time tracebacks and in runtime '
'tracebacks in cases where the source file is '
'unavailable'))
parser.add_argument('-x', metavar='REGEXP', dest='rx', default=None,
help=('skip files matching the regular expression; '
'the regexp is searched for in the full path '
'of each file considered for compilation'))
parser.add_argument('-i', metavar='FILE', dest='flist',
help=('add all the files and directories listed in '
'FILE to the list considered for compilation; '
'if "-", names are read from stdin'))
parser.add_argument('compile_dest', metavar='FILE|DIR', nargs='*',
help=('zero or more file and directory names '
'to compile; if no arguments given, defaults '
'to the equivalent of -l sys.path'))
parser.add_argument('-j', '--workers', default=1,
type=int, help='Run compileall concurrently')
invalidation_modes = [mode.name.lower().replace('_', '-')
for mode in py_compile.PycInvalidationMode]
parser.add_argument('--invalidation-mode', default='timestamp',
choices=sorted(invalidation_modes),
help='How the pycs will be invalidated at runtime')
args = parser.parse_args()
compile_dests = args.compile_dest
if args.rx:
import re
args.rx = re.compile(args.rx)
if args.recursion is not None:
maxlevels = args.recursion
else:
maxlevels = args.maxlevels
# if flist is provided then load it
if args.flist:
try:
with (sys.stdin if args.flist=='-' else open(args.flist)) as f:
for line in f:
compile_dests.append(line.strip())
except OSError:
if args.quiet < 2:
print("Error reading file list {}".format(args.flist))
return False
if args.workers is not None:
args.workers = args.workers or None
ivl_mode = args.invalidation_mode.replace('-', '_').upper()
invalidation_mode = py_compile.PycInvalidationMode[ivl_mode]
success = True
try:
if compile_dests:
for dest in compile_dests:
if os.path.isfile(dest):
if not compile_file(dest, args.ddir, args.force, args.rx,
args.quiet, args.legacy,
invalidation_mode=invalidation_mode):
success = False
else:
if not compile_dir(dest, maxlevels, args.ddir,
args.force, args.rx, args.quiet,
args.legacy, workers=args.workers,
invalidation_mode=invalidation_mode):
success = False
return success
else:
return compile_path(legacy=args.legacy, force=args.force,
quiet=args.quiet,
invalidation_mode=invalidation_mode)
except KeyboardInterrupt:
if args.quiet < 2:
print("\n[interrupted]")
return False
return True
if __name__ == '__main__':
exit_status = int(not main())
sys.exit(exit_status)