From ab66d2a6cbe1484244f98298c6cdd0cb65042842 Mon Sep 17 00:00:00 2001 From: Ross Lagerwall Date: Sun, 12 Feb 2012 09:01:30 +0200 Subject: [PATCH] Attempt to speed up some subprocess tests (and hopefully keep them reliable). --- Lib/test/test_subprocess.py | 45 +++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index f2cdc333c9d..b00e780ea13 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -686,26 +686,19 @@ class ProcessTestCase(BaseTestCase): self.assertEqual(subprocess.list2cmdline(['ab', '']), 'ab ""') - def test_poll(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(1)"]) - count = 0 - while p.poll() is None: - time.sleep(0.1) - count += 1 - # We expect that the poll loop probably went around about 10 times, - # but, based on system scheduling we can't control, it's possible - # poll() never returned None. It "should be" very rare that it - # didn't go around at least twice. - self.assertGreaterEqual(count, 2) + p = subprocess.Popen([sys.executable, "-c", + "import os", + "os.read(1)"], stdin=subprocess.PIPE) + self.addCleanup(p.stdin.close) + self.assertIsNone(p.poll()) + os.write(p.stdin.fileno(), b'A') + p.wait() # Subsequent invocations should just return the returncode self.assertEqual(p.poll(), 0) - def test_wait(self): - p = subprocess.Popen([sys.executable, - "-c", "import time; time.sleep(2)"]) + p = subprocess.Popen([sys.executable, "-c", "pass"]) self.assertEqual(p.wait(), 0) # Subsequent invocations should just return the returncode self.assertEqual(p.wait(), 0) @@ -797,25 +790,29 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'pass'], stdin=subprocess.PIPE) self.addCleanup(p.stdin.close) - time.sleep(2) + p.wait() p.communicate(b"x" * 2**20) - @unittest.skipUnless(hasattr(signal, 'SIGALRM'), - "Requires signal.SIGALRM") + @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), + "Requires signal.SIGUSR1") + @unittest.skipUnless(hasattr(os, 'kill'), + "Requires os.kill") + @unittest.skipUnless(hasattr(os, 'getppid'), + "Requires os.getppid") def test_communicate_eintr(self): # Issue #12493: communicate() should handle EINTR def handler(signum, frame): pass - old_handler = signal.signal(signal.SIGALRM, handler) - self.addCleanup(signal.signal, signal.SIGALRM, old_handler) + old_handler = signal.signal(signal.SIGUSR1, handler) + self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) - # the process is running for 2 seconds - args = [sys.executable, "-c", 'import time; time.sleep(2)'] + args = [sys.executable, "-c", + 'import os, signal;' + 'os.kill(os.getppid(), signal.SIGUSR1)'] for stream in ('stdout', 'stderr'): kw = {stream: subprocess.PIPE} with subprocess.Popen(args, **kw) as process: - signal.alarm(1) - # communicate() will be interrupted by SIGALRM + # communicate() will be interrupted by SIGUSR1 process.communicate()