mirror of
https://github.com/python/cpython.git
synced 2024-11-27 20:04:41 +08:00
569 lines
17 KiB
Python
569 lines
17 KiB
Python
r"""OS routines for Mac, DOS, NT, or Posix depending on what system we're on.
|
|
|
|
This exports:
|
|
- all functions from posix, nt, os2, mac, or ce, e.g. unlink, stat, etc.
|
|
- os.path is one of the modules posixpath, ntpath, or macpath
|
|
- os.name is 'posix', 'nt', 'os2', 'mac', 'ce' or 'riscos'
|
|
- os.curdir is a string representing the current directory ('.' or ':')
|
|
- os.pardir is a string representing the parent directory ('..' or '::')
|
|
- os.sep is the (or a most common) pathname separator ('/' or ':' or '\\')
|
|
- os.extsep is the extension separator ('.' or '/')
|
|
- os.altsep is the alternate pathname separator (None or '/')
|
|
- os.pathsep is the component separator used in $PATH etc
|
|
- os.linesep is the line separator in text files ('\r' or '\n' or '\r\n')
|
|
- os.defpath is the default search path for executables
|
|
|
|
Programs that import and use 'os' stand a better chance of being
|
|
portable between different platforms. Of course, they must then
|
|
only use functions that are defined by all platforms (e.g., unlink
|
|
and opendir), and leave all pathname manipulation to os.path
|
|
(e.g., split and join).
|
|
"""
|
|
|
|
#'
|
|
|
|
import sys
|
|
|
|
# Names of the modules compiled into this interpreter; the platform
# selection below tests membership in this tuple.
_names = sys.builtin_module_names

# Public names that are defined on every platform.  The platform branch
# that is taken below extends this list with the implementation module's
# own exports.
# NOTE(review): 'extsep' is imported from os.path further down and is
# described in the module docstring, but it is not listed here -- confirm
# whether it should be exported by "from os import *".
__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep",
           "defpath", "name", "path"]
|
|
|
|
def _get_exports_list(module):
|
|
try:
|
|
return list(module.__all__)
|
|
except AttributeError:
|
|
return [n for n in dir(module) if n[0] != '_']
|
|
|
|
# Pick the platform implementation.  The first module name below that is
# found among the built-in modules supplies the OS-level functions (via
# "import *"), sets os.name and os.linesep, and decides which path module
# is bound to the name 'path'.  Each branch also adds the implementation
# module's exports to __all__.
if 'posix' in _names:
    name = 'posix'
    linesep = '\n'
    from posix import *
    try:
        # "import *" normally skips underscore-prefixed names, so _exit is
        # fetched explicitly; its absence is tolerated.
        from posix import _exit
    except ImportError:
        pass
    import posixpath as path

    import posix
    __all__.extend(_get_exports_list(posix))
    del posix

elif 'nt' in _names:
    name = 'nt'
    linesep = '\r\n'
    from nt import *
    try:
        from nt import _exit
    except ImportError:
        pass
    import ntpath as path

    import nt
    __all__.extend(_get_exports_list(nt))
    del nt

elif 'os2' in _names:
    name = 'os2'
    linesep = '\r\n'
    from os2 import *
    try:
        from os2 import _exit
    except ImportError:
        pass
    # Builds whose version string mentions 'EMX GCC' use os2emxpath;
    # all other OS/2 builds reuse ntpath.
    if sys.version.find('EMX GCC') == -1:
        import ntpath as path
    else:
        import os2emxpath as path

    import os2
    __all__.extend(_get_exports_list(os2))
    del os2

elif 'mac' in _names:
    name = 'mac'
    linesep = '\r'
    from mac import *
    try:
        from mac import _exit
    except ImportError:
        pass
    import macpath as path

    import mac
    __all__.extend(_get_exports_list(mac))
    del mac

elif 'ce' in _names:
    name = 'ce'
    linesep = '\r\n'
    from ce import *
    try:
        from ce import _exit
    except ImportError:
        pass
    # We can use the standard Windows path.
    import ntpath as path

    import ce
    __all__.extend(_get_exports_list(ce))
    del ce

elif 'riscos' in _names:
    name = 'riscos'
    linesep = '\n'
    from riscos import *
    try:
        from riscos import _exit
    except ImportError:
        pass
    import riscospath as path

    import riscos
    __all__.extend(_get_exports_list(riscos))
    del riscos

else:
    raise ImportError, 'no os specific module found'

# Register the chosen path module under the name 'os.path' so that the
# import on the next line resolves to it.
sys.modules['os.path'] = path
from os.path import curdir, pardir, sep, pathsep, defpath, extsep, altsep

# The built-in module list was only needed for the selection above.
del _names
|
|
|
|
#'
|
|
|
|
# Super directory utilities.
|
|
# (Inspired by Eric Raymond; the doc strings are mostly his)
|
|
|
|
def makedirs(name, mode=0777):
|
|
"""makedirs(path [, mode=0777])
|
|
|
|
Super-mkdir; create a leaf directory and all intermediate ones.
|
|
Works like mkdir, except that any intermediate path segment (not
|
|
just the rightmost) will be created if it does not exist. This is
|
|
recursive.
|
|
|
|
"""
|
|
head, tail = path.split(name)
|
|
if not tail:
|
|
head, tail = path.split(head)
|
|
if head and tail and not path.exists(head):
|
|
makedirs(head, mode)
|
|
mkdir(name, mode)
|
|
|
|
def removedirs(name):
    """removedirs(path)

    Super-rmdir; remove a leaf directory and then every parent that has
    become empty.  The leaf is removed with rmdir, and an error there is
    propagated.  After that, each enclosing directory is removed in turn,
    rightmost first, until the whole path is consumed or a removal fails.
    Errors in this pruning phase are ignored -- they generally mean that
    a directory was not empty.

    """
    rmdir(name)
    parent, leaf = path.split(name)
    if not leaf:
        # name ended with a separator; split once more for the real leaf.
        parent, leaf = path.split(parent)
    while parent and leaf:
        try:
            rmdir(parent)
        except error:
            return
        parent, leaf = path.split(parent)
|
|
|
|
def renames(old, new):
    """renames(old, new)

    Super-rename; create directories as necessary and delete any left
    empty.  Before renaming, the directory part of the new name is
    created with makedirs if it does not exist.  After the rename, the
    directories making up the old name are pruned away with removedirs,
    rightmost first, until the whole path is consumed or a nonempty
    directory is found; errors from that pruning are ignored.

    Note: this function can fail with the new directory structure made
    if you lack permissions needed to unlink the leaf directory or
    file.

    """
    new_parent, new_leaf = path.split(new)
    if new_parent and new_leaf and not path.exists(new_parent):
        makedirs(new_parent)
    rename(old, new)
    old_parent, old_leaf = path.split(old)
    if not (old_parent and old_leaf):
        return
    try:
        removedirs(old_parent)
    except error:
        pass
|
|
|
|
# Export the directory helpers defined above.
__all__.extend(["makedirs", "removedirs", "renames"])

# Make sure os.environ exists, at least
try:
    # Bare name lookup: raises NameError when no 'environ' has been bound
    # at module level so far (presumably the platform module did not
    # supply one).
    environ
except NameError:
    environ = {}
|
|
|
|
def execl(file, *args):
    """execl(file, *args)

    Replace the current process by executing file; the remaining
    positional arguments form its argument list. """
    execv(file, args)
|
|
|
|
def execle(file, *args):
    """execle(file, *args, env)

    Replace the current process by executing file.  The last positional
    argument is the environment for the new program; the ones before it
    form the argument list. """
    argv, env = args[:-1], args[-1]
    execve(file, argv, env)
|
|
|
|
def execlp(file, *args):
    """execlp(file, *args)

    Replace the current process by executing file, which is searched
    for along $PATH; the remaining positional arguments form its
    argument list. """
    execvp(file, args)
|
|
|
|
def execlpe(file, *args):
    """execlpe(file, *args, env)

    Replace the current process by executing file, which is searched
    for along $PATH.  The last positional argument is the environment
    for the new program; the ones before it form the argument list. """
    argv, env = args[:-1], args[-1]
    execvpe(file, argv, env)
|
|
|
|
def execvp(file, args):
    """execvp(file, args)

    Replace the current process by executing file, which is searched
    for along $PATH, with argument list args.
    args may be a list or tuple of strings. """
    _execvpe(file, args)
|
|
|
|
def execvpe(file, args, env):
    """execvpe(file, args, env)

    Replace the current process by executing file, which is searched
    for along $PATH, with argument list args and environment env.
    args may be a list or tuple of strings. """
    _execvpe(file, args, env)
|
|
|
|
# Export the exec* convenience wrappers defined above.
__all__.extend(["execl","execle","execlp","execlpe","execvp","execvpe"])
|
|
|
|
def _execvpe(file, args, env=None):
|
|
from errno import ENOENT, ENOTDIR
|
|
|
|
if env is not None:
|
|
func = execve
|
|
argrest = (args, env)
|
|
else:
|
|
func = execv
|
|
argrest = (args,)
|
|
env = environ
|
|
|
|
head, tail = path.split(file)
|
|
if head:
|
|
func(file, *argrest)
|
|
return
|
|
if 'PATH' in env:
|
|
envpath = env['PATH']
|
|
else:
|
|
envpath = defpath
|
|
PATH = envpath.split(pathsep)
|
|
saved_exc = None
|
|
saved_tb = None
|
|
for dir in PATH:
|
|
fullname = path.join(dir, file)
|
|
try:
|
|
func(fullname, *argrest)
|
|
except error, e:
|
|
tb = sys.exc_info()[2]
|
|
if (e.errno != ENOENT and e.errno != ENOTDIR
|
|
and saved_exc is None):
|
|
saved_exc = e
|
|
saved_tb = tb
|
|
if saved_exc:
|
|
raise error, saved_exc, saved_tb
|
|
raise error, e, tb
|
|
|
|
# Change environ to automatically call putenv() if it exists
try:
    # This will fail if there's no putenv
    putenv
except NameError:
    pass
else:
    import UserDict

    # Fake unsetenv() for Windows
    # not sure about os2 here but
    # I'm guessing they are the same.

    if name in ('os2', 'nt'):
        def unsetenv(key):
            # Emulated by setting the variable to the empty string.
            putenv(key, "")

    if name == "riscos":
        # On RISC OS, all env access goes through getenv and putenv
        from riscosenviron import _Environ
    elif name in ('os2', 'nt'):  # Where Env Var Names Must Be UPPERCASE
        # But we store them as upper case
        class _Environ(UserDict.IterableUserDict):
            # Mapping whose keys are upper-cased on every lookup and
            # store.  putenv()/unsetenv() receive the key as the caller
            # passed it; only the in-process copy is keyed by the
            # upper-cased name.
            def __init__(self, environ):
                UserDict.UserDict.__init__(self)
                data = self.data
                # Copy the initial mapping, normalising the keys.
                for k, v in environ.items():
                    data[k.upper()] = v
            def __setitem__(self, key, item):
                putenv(key, item)
                self.data[key.upper()] = item
            def __getitem__(self, key):
                return self.data[key.upper()]
            # Which __delitem__ gets defined depends on whether an
            # unsetenv function is available when the class body runs.
            try:
                unsetenv
            except NameError:
                def __delitem__(self, key):
                    del self.data[key.upper()]
            else:
                def __delitem__(self, key):
                    unsetenv(key)
                    del self.data[key.upper()]
            def has_key(self, key):
                return key.upper() in self.data
            def __contains__(self, key):
                return key.upper() in self.data
            def get(self, key, failobj=None):
                return self.data.get(key.upper(), failobj)
            def update(self, dict):
                # Go through __setitem__ so that putenv() is called for
                # every key.
                for k, v in dict.items():
                    self[k] = v
            def copy(self):
                # Returns a plain dict, not another _Environ.
                return dict(self)

    else:  # Where Env Var Names Can Be Mixed Case
        class _Environ(UserDict.IterableUserDict):
            # Mapping that forwards every store to putenv().
            def __init__(self, environ):
                UserDict.UserDict.__init__(self)
                # The mapping passed in is used directly as the backing
                # store; it is not copied.
                self.data = environ
            def __setitem__(self, key, item):
                putenv(key, item)
                self.data[key] = item
            def update(self, dict):
                # Go through __setitem__ so that putenv() is called for
                # every key.
                for k, v in dict.items():
                    self[k] = v
            # __delitem__ is only overridden when unsetenv exists;
            # otherwise the inherited one is used.
            try:
                unsetenv
            except NameError:
                pass
            else:
                def __delitem__(self, key):
                    unsetenv(key)
                    del self.data[key]
            def copy(self):
                # Returns a plain dict, not another _Environ.
                return dict(self)


    # Replace the module-level environ with the putenv-aware wrapper.
    environ = _Environ(environ)
|
|
|
|
def getenv(key, default=None):
    """Return the value of the environment variable key from os.environ.

    If the variable is not set, default is returned instead; default is
    None unless the optional second argument is given."""
    value = environ.get(key, default)
    return value
|
|
# Export getenv.
__all__.append("getenv")
|
|
|
|
def _exists(name):
|
|
try:
|
|
eval(name)
|
|
return True
|
|
except NameError:
|
|
return False
|
|
|
|
# Supply spawn*() (probably only for Unix)
# The emulation is only defined when fork and execv are available and
# no native spawnv exists.
if _exists("fork") and not _exists("spawnv") and _exists("execv"):

    P_WAIT = 0
    P_NOWAIT = P_NOWAITO = 1

    # XXX Should we support P_DETACH?  I suppose it could fork()**2
    # and close the std I/O streams.  Also, P_OVERLAY is the same
    # as execv*()?

    def _spawnvef(mode, file, args, env, func):
        # Internal helper; func is the exec*() function to use
        # env of None means func is called without an environment argument.
        pid = fork()
        if not pid:
            # Child
            try:
                if env is None:
                    func(file, args)
                else:
                    func(file, args, env)
            except:
                # Any failure of the exec call terminates the child
                # with status 127.
                _exit(127)
        else:
            # Parent
            if mode == P_NOWAIT:
                return pid # Caller is responsible for waiting!
            # Every other mode value waits for the child here.
            while 1:
                wpid, sts = waitpid(pid, 0)
                if WIFSTOPPED(sts):
                    # Stopped, not finished: keep waiting.
                    continue
                elif WIFSIGNALED(sts):
                    return -WTERMSIG(sts)
                elif WIFEXITED(sts):
                    return WEXITSTATUS(sts)
                else:
                    raise error, "Not stopped, signaled or exited???"

    def spawnv(mode, file, args):
        """spawnv(mode, file, args) -> integer

        Execute file with arguments from args in a subprocess.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, None, execv)

    def spawnve(mode, file, args, env):
        """spawnve(mode, file, args, env) -> integer

        Execute file with arguments from args in a subprocess with the
        specified environment.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, env, execve)

    # Note: spawnvp[e] isn't currently supported on Windows

    def spawnvp(mode, file, args):
        """spawnvp(mode, file, args) -> integer

        Execute file (which is looked for along $PATH) with arguments from
        args in a subprocess.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, None, execvp)

    def spawnvpe(mode, file, args, env):
        """spawnvpe(mode, file, args, env) -> integer

        Execute file (which is looked for along $PATH) with arguments from
        args in a subprocess with the supplied environment.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        return _spawnvef(mode, file, args, env, execvpe)
|
|
|
|
if _exists("spawnv"):
    # These aren't supplied by the basic Windows code
    # but can be easily implemented in Python

    def spawnl(mode, file, *args):
        """spawnl(mode, file, *args) -> integer

        Execute file with arguments from args in a subprocess.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        return spawnv(mode, file, args)

    def spawnle(mode, file, *args):
        """spawnle(mode, file, *args, env) -> integer

        Execute file with arguments from args in a subprocess with the
        supplied environment.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        # The last positional argument is the environment; the rest
        # form the argument list.
        env = args[-1]
        return spawnve(mode, file, args[:-1], env)
|
|
|
|
if _exists("spawnvp"):
    # At the moment, Windows doesn't implement spawnvp[e],
    # so it won't have spawnlp[e] either.
    def spawnlp(mode, file, *args):
        """spawnlp(mode, file, *args) -> integer

        Execute file (which is looked for along $PATH) with arguments from
        args in a subprocess.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        return spawnvp(mode, file, args)

    def spawnlpe(mode, file, *args):
        """spawnlpe(mode, file, *args, env) -> integer

        Execute file (which is looked for along $PATH) with arguments from
        args in a subprocess with the supplied environment.
        If mode == P_NOWAIT return the pid of the process.
        If mode == P_WAIT return the process's exit code if it exits normally;
        otherwise return -SIG, where SIG is the signal that killed it. """
        # The last positional argument is the environment; the rest
        # form the argument list.
        env = args[-1]
        return spawnvpe(mode, file, args[:-1], env)


    # NOTE(review): indentation was lost in this copy of the file.  This
    # statement is placed inside the spawnvp branch because the list names
    # spawnvp/spawnlp, which are only defined when that branch runs --
    # confirm against the upstream file.
    __all__.extend(["spawnlp","spawnlpe","spawnv", "spawnve","spawnvp",
                    "spawnvpe","spawnl","spawnle",])
|
|
|
|
|
|
# Supply popen2 etc. (for Unix)
# Each shim is only defined when fork exists and the name is not already
# bound.  All of them delegate to the popen2 module, accept a mode
# argument that is not passed on, and return the child's stdin first,
# which reverses the first two items of the popen2 module's result.
if _exists("fork"):
    if not _exists("popen2"):
        def popen2(cmd, mode="t", bufsize=-1):
            # The local import shadows this function's own name inside
            # the body only.
            import popen2
            stdout, stdin = popen2.popen2(cmd, bufsize)
            return stdin, stdout
        __all__.append("popen2")

    if not _exists("popen3"):
        def popen3(cmd, mode="t", bufsize=-1):
            import popen2
            stdout, stdin, stderr = popen2.popen3(cmd, bufsize)
            return stdin, stdout, stderr
        __all__.append("popen3")

    if not _exists("popen4"):
        def popen4(cmd, mode="t", bufsize=-1):
            import popen2
            stdout, stdin = popen2.popen4(cmd, bufsize)
            return stdin, stdout
        __all__.append("popen4")
|
|
|
|
import copy_reg as _copy_reg
|
|
|
|
def _make_stat_result(tup, dict):
    # Constructor registered with copy_reg: rebuilds a stat_result from
    # the two values produced by _pickle_stat_result.
    rebuilt = stat_result(tup, dict)
    return rebuilt
|
|
|
|
def _pickle_stat_result(sr):
    # Reducer registered with copy_reg: keeps the arguments reported by
    # the object's own __reduce__() but substitutes _make_stat_result as
    # the callable that rebuilds it.
    _constructor, state = sr.__reduce__()
    return (_make_stat_result, state)
|
|
|
|
try:
    # Register the reducer and constructor for stat_result with copy_reg.
    _copy_reg.pickle(stat_result, _pickle_stat_result, _make_stat_result)
except NameError: # stat_result may not exist
    pass
|
|
|
|
def _make_statvfs_result(tup, dict):
    # Constructor registered with copy_reg: rebuilds a statvfs_result from
    # the two values produced by _pickle_statvfs_result.
    rebuilt = statvfs_result(tup, dict)
    return rebuilt
|
|
|
|
def _pickle_statvfs_result(sr):
    # Reducer registered with copy_reg: keeps the arguments reported by
    # the object's own __reduce__() but substitutes _make_statvfs_result
    # as the callable that rebuilds it.
    _constructor, state = sr.__reduce__()
    return (_make_statvfs_result, state)
|
|
|
|
try:
    # Register the reducer and constructor for statvfs_result with copy_reg.
    _copy_reg.pickle(statvfs_result, _pickle_statvfs_result,
                     _make_statvfs_result)
except NameError: # statvfs_result may not exist
    pass
|