mirror of
https://github.com/python/cpython.git
synced 2024-11-26 03:14:27 +08:00
34f20eac98
[ 559250 ] more POSIX signal stuff Adds support (and docs and tests and autoconfery) for posix signal mask handling -- sigpending, sigprocmask and sigsuspend.
129 lines
2.9 KiB
Python
129 lines
2.9 KiB
Python
# Test the signal module
|
|
from test_support import verbose, TestSkipped, TestFailed
|
|
import signal
|
|
import os, sys, time
|
|
|
|
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
|
|
raise TestSkipped, "Can't test signal on %s" % sys.platform
|
|
|
|
if verbose:
|
|
x = '-x'
|
|
else:
|
|
x = '+x'
|
|
pid = os.getpid()
|
|
|
|
# Shell script that will send us asynchronous signals
|
|
# %(x)s is the shell tracing flag and %(pid)d is this process's pid.
# The subshell runs in the background and sends signals 5, 2 and 3 to
# us, sleeping two seconds before each one.
script = """
(
set %(x)s
sleep 2
kill -5 %(pid)d
sleep 2
kill -2 %(pid)d
sleep 2
kill -3 %(pid)d
) &
""" % {'x': x, 'pid': pid}
|
|
|
|
def handlerA(*args):
|
|
if verbose:
|
|
print "handlerA", args
|
|
|
|
class HandlerBCalled(Exception):
    """Raised by handlerB so the pause() loop can tell the handler ran.

    A real exception class replaces the former string exception, which
    matches the class-based exceptions used elsewhere in this test.
    "raise HandlerBCalled, args" and "except HandlerBCalled:" keep
    working unchanged.
    """
    pass
|
|
|
|
def handlerB(*args):
|
|
if verbose:
|
|
print "handlerB", args
|
|
raise HandlerBCalled, args
|
|
|
|
# Entire test lasts at most 20 sec: arm the alarm first.
signal.alarm(20)

# Signal 5 is merely reported, signal 2 raises HandlerBCalled and
# signal 3 is ignored outright.
signal.signal(5, handlerA)
signal.signal(2, handlerB)
signal.signal(3, signal.SIG_IGN)

# default_int_handler raises KeyboardInterrupt when the alarm fires.
signal.signal(signal.SIGALRM, signal.default_int_handler)

# Start the background shell script that sends us the signals.
os.system(script)
|
|
|
|
print "starting pause() loop..."
|
|
|
|
try:
|
|
while 1:
|
|
if verbose:
|
|
print "call pause()..."
|
|
try:
|
|
signal.pause()
|
|
if verbose:
|
|
print "pause() returned"
|
|
except HandlerBCalled:
|
|
if verbose:
|
|
print "HandlerBCalled exception caught"
|
|
else:
|
|
pass
|
|
|
|
except KeyboardInterrupt:
|
|
if verbose:
|
|
print "KeyboardInterrupt (assume the alarm() went off)"
|
|
|
|
|
|
if hasattr(signal, "sigprocmask"):
|
|
class HupDelivered(Exception):
|
|
pass
|
|
def hup(signum, frame):
|
|
raise HupDelivered
|
|
def hup2(signum, frame):
|
|
signal.signal(signal.SIGHUP, hup)
|
|
return
|
|
signal.signal(signal.SIGHUP, hup)
|
|
|
|
if verbose:
|
|
print "blocking SIGHUP"
|
|
|
|
defaultmask = signal.sigprocmask(signal.SIG_BLOCK, [signal.SIGHUP])
|
|
|
|
if verbose:
|
|
print "sending SIGHUP"
|
|
|
|
try:
|
|
os.kill(pid, signal.SIGHUP)
|
|
except HupDelivered:
|
|
raise TestFailed, "HUP not blocked"
|
|
|
|
if signal.SIGHUP not in signal.sigpending():
|
|
raise TestFailed, "HUP not pending"
|
|
|
|
if verbose:
|
|
print "unblocking SIGHUP"
|
|
|
|
try:
|
|
signal.sigprocmask(signal.SIG_UNBLOCK, [signal.SIGHUP])
|
|
except HupDelivered:
|
|
pass
|
|
else:
|
|
raise TestFailed, "HUP not delivered"
|
|
|
|
if verbose:
|
|
print "testing sigsuspend"
|
|
|
|
signal.sigprocmask(signal.SIG_BLOCK, [signal.SIGHUP])
|
|
signal.signal(signal.SIGHUP, hup2)
|
|
|
|
if not os.fork():
|
|
time.sleep(2)
|
|
os.kill(pid, signal.SIGHUP)
|
|
time.sleep(2)
|
|
os.kill(pid, signal.SIGHUP)
|
|
os._exit(0)
|
|
else:
|
|
try:
|
|
signal.sigsuspend(defaultmask)
|
|
except:
|
|
raise TestFailed, "sigsuspend erroneously raised"
|
|
|
|
try:
|
|
signal.sigsuspend(defaultmask)
|
|
except HupDelivered:
|
|
pass
|
|
else:
|
|
raise TestFailed, "sigsupsend didn't raise"
|
|
|