mirror of
https://github.com/python/cpython.git
synced 2025-01-21 07:55:16 +08:00
068d5724d8
terminated; this makes the final assert in the self-test code fail if the parent runs faster than the children. Fix this by calling wait() on the remaining children instead.
101 lines
2.7 KiB
Python
101 lines
2.7 KiB
Python
import os
|
|
import sys
|
|
import string
|
|
|
|
MAXFD = 256 # Max number of file descriptors (os.getdtablesize()???)
|
|
|
|
_active = []
|
|
|
|
def _cleanup():
|
|
for inst in _active[:]:
|
|
inst.poll()
|
|
|
|
class Popen3:
|
|
def __init__(self, cmd, capturestderr=0, bufsize=-1):
|
|
if type(cmd) == type(''):
|
|
cmd = ['/bin/sh', '-c', cmd]
|
|
p2cread, p2cwrite = os.pipe()
|
|
c2pread, c2pwrite = os.pipe()
|
|
if capturestderr:
|
|
errout, errin = os.pipe()
|
|
self.pid = os.fork()
|
|
if self.pid == 0:
|
|
# Child
|
|
os.close(0)
|
|
os.close(1)
|
|
if os.dup(p2cread) <> 0:
|
|
sys.stderr.write('popen2: bad read dup\n')
|
|
if os.dup(c2pwrite) <> 1:
|
|
sys.stderr.write('popen2: bad write dup\n')
|
|
if capturestderr:
|
|
os.close(2)
|
|
if os.dup(errin) <> 2: pass
|
|
for i in range(3, MAXFD):
|
|
try:
|
|
os.close(i)
|
|
except: pass
|
|
try:
|
|
os.execvp(cmd[0], cmd)
|
|
finally:
|
|
os._exit(1)
|
|
# Shouldn't come here, I guess
|
|
os._exit(1)
|
|
os.close(p2cread)
|
|
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
|
|
os.close(c2pwrite)
|
|
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
|
|
if capturestderr:
|
|
os.close(errin)
|
|
self.childerr = os.fdopen(errout, 'r', bufsize)
|
|
else:
|
|
self.childerr = None
|
|
self.sts = -1 # Child not completed yet
|
|
_active.append(self)
|
|
def poll(self):
|
|
if self.sts < 0:
|
|
try:
|
|
pid, sts = os.waitpid(self.pid, os.WNOHANG)
|
|
if pid == self.pid:
|
|
self.sts = sts
|
|
_active.remove(self)
|
|
except os.error:
|
|
pass
|
|
return self.sts
|
|
def wait(self):
|
|
pid, sts = os.waitpid(self.pid, 0)
|
|
if pid == self.pid:
|
|
self.sts = sts
|
|
_active.remove(self)
|
|
return self.sts
|
|
|
|
def popen2(cmd, bufsize=-1):
|
|
_cleanup()
|
|
inst = Popen3(cmd, 0, bufsize)
|
|
return inst.fromchild, inst.tochild
|
|
|
|
def popen3(cmd, bufsize=-1):
|
|
_cleanup()
|
|
inst = Popen3(cmd, 1, bufsize)
|
|
return inst.fromchild, inst.tochild, inst.childerr
|
|
|
|
def _test():
|
|
teststr = "abc\n"
|
|
print "testing popen2..."
|
|
r, w = popen2('cat')
|
|
w.write(teststr)
|
|
w.close()
|
|
assert r.read() == teststr
|
|
print "testing popen3..."
|
|
r, w, e = popen3(['cat'])
|
|
w.write(teststr)
|
|
w.close()
|
|
assert r.read() == teststr
|
|
assert e.read() == ""
|
|
for inst in _active[:]:
|
|
inst.wait()
|
|
assert not _active
|
|
print "All OK"
|
|
|
|
if __name__ == '__main__':
|
|
_test()
|