mirror of
https://github.com/python/cpython.git
synced 2024-11-25 19:03:49 +08:00
e549ead826
svn+ssh://pythondev@svn.python.org/python/trunk ........ r70554 | benjamin.peterson | 2009-03-23 16:25:15 -0500 (Mon, 23 Mar 2009) | 1 line complain when there's no last exception ........ r70588 | benjamin.peterson | 2009-03-24 17:56:32 -0500 (Tue, 24 Mar 2009) | 1 line fix newline issue in test summary ........ r70589 | benjamin.peterson | 2009-03-24 18:07:07 -0500 (Tue, 24 Mar 2009) | 1 line another style nit ........ r70598 | benjamin.peterson | 2009-03-25 16:24:04 -0500 (Wed, 25 Mar 2009) | 1 line add shorthands for expected failures and unexpected success ........ r70605 | benjamin.peterson | 2009-03-26 11:32:23 -0500 (Thu, 26 Mar 2009) | 1 line remove uneeded function ........ r70611 | benjamin.peterson | 2009-03-26 13:35:37 -0500 (Thu, 26 Mar 2009) | 1 line add much better tests for python version information parsing ........ r70612 | benjamin.peterson | 2009-03-26 13:55:48 -0500 (Thu, 26 Mar 2009) | 1 line more and more implementations now support sys.subversion ........ r70613 | benjamin.peterson | 2009-03-26 13:58:30 -0500 (Thu, 26 Mar 2009) | 1 line roll old test in with new one ........ r70614 | benjamin.peterson | 2009-03-26 14:09:21 -0500 (Thu, 26 Mar 2009) | 1 line add support for PyPy ........ r70615 | benjamin.peterson | 2009-03-26 14:58:18 -0500 (Thu, 26 Mar 2009) | 5 lines add some useful utilities for skipping tests with unittest's new skipping ability most significantly apply a modified portion of the patch from #4242 with patches for skipping implementation details ........ r70616 | benjamin.peterson | 2009-03-26 15:05:50 -0500 (Thu, 26 Mar 2009) | 1 line rename TestCase.skip() to skipTest() because it causes annoying problems with trial #5571 ........ r70617 | benjamin.peterson | 2009-03-26 15:17:27 -0500 (Thu, 26 Mar 2009) | 1 line apply the second part of #4242's patch; classify all the implementation details in test_descr ........ 
r70618 | benjamin.peterson | 2009-03-26 15:48:25 -0500 (Thu, 26 Mar 2009) | 1 line remove test_support.TestSkipped and just use unittest.SkipTest ........ r70619 | benjamin.peterson | 2009-03-26 15:49:40 -0500 (Thu, 26 Mar 2009) | 1 line fix naming ........ r70620 | benjamin.peterson | 2009-03-26 16:10:30 -0500 (Thu, 26 Mar 2009) | 1 line fix incorrect auto-translation of TestSkipped -> unittest.SkipTest ........ r70621 | benjamin.peterson | 2009-03-26 16:11:16 -0500 (Thu, 26 Mar 2009) | 1 line must pass argument to get expected behavior ;) ........ r70623 | benjamin.peterson | 2009-03-26 16:30:10 -0500 (Thu, 26 Mar 2009) | 1 line add missing import ........ r70624 | benjamin.peterson | 2009-03-26 16:30:54 -0500 (Thu, 26 Mar 2009) | 1 line ** is required here ........ r70626 | benjamin.peterson | 2009-03-26 16:40:29 -0500 (Thu, 26 Mar 2009) | 1 line update email tests to use SkipTest ........ r70627 | benjamin.peterson | 2009-03-26 16:44:43 -0500 (Thu, 26 Mar 2009) | 1 line fix another name ........
158 lines
4.4 KiB
Python
158 lines
4.4 KiB
Python
# Test case for the select.poll() function
|
|
|
|
import os, select, random, unittest
|
|
from test.support import TESTFN, run_unittest
|
|
|
|
# The rest of this module exercises select.poll(), which is not defined on
# every platform.  Probe for the attribute up front and skip the whole
# module (instead of erroring) when it is missing.
try:
    select.poll
except AttributeError:
    raise unittest.SkipTest("select.poll not defined -- skipping test_poll")
|
|
|
|
|
|
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs; a descriptor is kept
    when ``mode & flag`` is non-zero.  The order of *ready* is preserved
    and an empty list is returned when nothing matches.
    """
    return [fd for fd, mode in ready if mode & flag]
|
|
|
|
class PollTests(unittest.TestCase):
    """Functional and error-path tests for select.poll() objects."""

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them:
        # one message is written to and read back from every pipe.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        # Maps read end -> write end for every pipe that is still open.
        r2w = {}

        try:
            for _ in range(NUM_PIPES):
                rd, wr = os.pipe()
                # Record the pair first so the finally clause closes it
                # even if one of the registrations below raises.
                r2w[rd] = wr
                p.register(rd)
                p.modify(rd, select.POLLIN)
                p.register(wr, select.POLLOUT)
                writers.append(wr)

            bufs = []

            while writers:
                ready = p.poll()
                ready_writers = find_ready_matching(ready, select.POLLOUT)
                if not ready_writers:
                    raise RuntimeError("no pipes ready for writing")
                wr = random.choice(ready_writers)
                os.write(wr, MSG)

                ready = p.poll()
                ready_readers = find_ready_matching(ready, select.POLLIN)
                if not ready_readers:
                    raise RuntimeError("no pipes ready for reading")
                rd = random.choice(ready_readers)
                buf = os.read(rd, MSG_LEN)
                self.assertEqual(len(buf), MSG_LEN)
                bufs.append(buf)

                # Retire the pipe that was just read.  Unregister while the
                # descriptors are still valid, then close both ends.
                wr = r2w.pop(rd)
                p.unregister(wr)
                p.unregister(rd)
                writers.remove(wr)
                os.close(wr)
                os.close(rd)

            self.assertEqual(bufs, [MSG] * NUM_PIPES)
        finally:
            # Only reached with open pipes when something above failed.
            for rd, wr in r2w.items():
                os.close(rd)
                os.close(wr)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        # Use a descriptor that was just created and closed rather than a
        # hard-coded number, which could belong to something still in use.
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        f = open(TESTFN, 'w')
        try:
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
            f.close()
            # Once the file is closed its descriptor must be reported invalid.
            r = p.poll()
            self.assertEqual(r[0], (fd, select.POLLNVAL))
        finally:
            # close() is a no-op if the file was already closed above.
            f.close()
            os.unlink(TESTFN)

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # Has no fileno() method at all.
            pass

        class Almost:
            # Has fileno(), but it does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Backward-compatible alias for the old name.  It lacked the "test"
    # prefix, so unittest never collected it.
    poll_unit_tests = test_poll_unit_tests

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        p = os.popen(cmd, 'r')
        try:
            pollster = select.poll()
            pollster.register(p, select.POLLIN)
            # Timeouts are in milliseconds; -1 blocks until an event arrives.
            for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
                fdlist = pollster.poll(tout)
                if not fdlist:
                    # Timed out with nothing ready; try the next timeout.
                    continue
                fd, flags = fdlist[0]
                if flags & select.POLLHUP:
                    line = p.readline()
                    if line != "":
                        self.fail('error: pipe seems to be closed, but still returns data')
                    continue

                elif flags & select.POLLIN:
                    line = p.readline()
                    if not line:
                        # EOF: the child has finished writing.
                        break
                    continue
                else:
                    self.fail('Unexpected return value from select.poll: %s' % fdlist)
        finally:
            # Close the pipe even when an assertion above fails.
            p.close()

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # Run some unrelated arithmetic afterwards; presumably this guards
        # against a stale overflow error surfacing in a later operation.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')
|
|
|
|
def test_main():
    """Run every test in PollTests through test.support.run_unittest."""
    run_unittest(PollTests)
|
|
|
|
# Allow the module to be run directly as a script.
if __name__ == '__main__':
    test_main()
|