mirror of
https://github.com/python/cpython.git
synced 2024-11-25 19:03:49 +08:00
Replace docstrings on test functions witrh comments -- then unittest
prints function and module names, which is more informative now that we repeat some tests in slightly modified subclasses. Add a test for read() until EOF. Add test suites for line-buffered (bufsize==1) and a small custom buffer size (bufsize==2). Restructure testUnbufferedRead() somewhat to avoid a potentially infinite loop.
This commit is contained in:
parent
808eb59fc4
commit
8c94383fa8
@ -191,7 +191,7 @@ class SocketConnectedTest(ThreadedTCPSocketTest):
|
||||
class GeneralModuleTests(unittest.TestCase):
|
||||
|
||||
def testSocketError(self):
|
||||
"""Testing that socket module exceptions."""
|
||||
# Testing socket module exceptions
|
||||
def raise_error(*args, **kwargs):
|
||||
raise socket.error
|
||||
def raise_herror(*args, **kwargs):
|
||||
@ -206,7 +206,7 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
"Error raising socket exception.")
|
||||
|
||||
def testCrucialConstants(self):
|
||||
"""Testing for mission critical constants."""
|
||||
# Testing for mission critical constants
|
||||
socket.AF_INET
|
||||
socket.SOCK_STREAM
|
||||
socket.SOCK_DGRAM
|
||||
@ -217,7 +217,7 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
socket.SO_REUSEADDR
|
||||
|
||||
def testHostnameRes(self):
|
||||
"""Testing hostname resolution mechanisms."""
|
||||
# Testing hostname resolution mechanisms
|
||||
hostname = socket.gethostname()
|
||||
ip = socket.gethostbyname(hostname)
|
||||
self.assert_(ip.find('.') >= 0, "Error resolving host to ip.")
|
||||
@ -228,7 +228,7 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
self.fail("Error testing host resolution mechanisms.")
|
||||
|
||||
def testRefCountGetNameInfo(self):
|
||||
"""Testing reference count for getnameinfo."""
|
||||
# Testing reference count for getnameinfo
|
||||
import sys
|
||||
if hasattr(sys, "getrefcount"):
|
||||
try:
|
||||
@ -240,7 +240,7 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
self.fail("socket.getnameinfo loses a reference")
|
||||
|
||||
def testInterpreterCrash(self):
|
||||
"""Making sure getnameinfo doesn't crash the interpreter."""
|
||||
# Making sure getnameinfo doesn't crash the interpreter
|
||||
try:
|
||||
# On some versions, this crashes the interpreter.
|
||||
socket.getnameinfo(('x', 0, 0, 0), 0)
|
||||
@ -258,7 +258,7 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
self.assertRaises(OverflowError, func, 2L**34)
|
||||
|
||||
def testGetServByName(self):
|
||||
"""Testing getservbyname()."""
|
||||
# Testing getservbyname()
|
||||
# try a few protocols - not everyone has telnet enabled
|
||||
found = 0
|
||||
for proto in ("telnet", "ssh", "www", "ftp"):
|
||||
@ -278,7 +278,7 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
raise socket.error
|
||||
|
||||
def testDefaultTimeout(self):
|
||||
"""Testing default timeout."""
|
||||
# Testing default timeout
|
||||
# The default timeout should initially be None
|
||||
self.assertEqual(socket.getdefaulttimeout(), None)
|
||||
s = socket.socket()
|
||||
@ -308,28 +308,28 @@ class GeneralModuleTests(unittest.TestCase):
|
||||
# XXX The following don't test module-level functionality...
|
||||
|
||||
def testSockName(self):
|
||||
"""Testing getsockname()."""
|
||||
# Testing getsockname()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(("0.0.0.0", PORT+1))
|
||||
name = sock.getsockname()
|
||||
self.assertEqual(name, ("0.0.0.0", PORT+1))
|
||||
|
||||
def testGetSockOpt(self):
|
||||
"""Testing getsockopt()."""
|
||||
# Testing getsockopt()
|
||||
# We know a socket should start without reuse==0
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
|
||||
self.failIf(reuse != 0, "initial mode is reuse")
|
||||
|
||||
def testSetSockOpt(self):
|
||||
"""Testing setsockopt()."""
|
||||
# Testing setsockopt()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
|
||||
self.failIf(reuse == 0, "failed to set reuse mode")
|
||||
|
||||
def testSendAfterClose(self):
|
||||
"""testing send() after close() with timeout."""
|
||||
# testing send() after close() with timeout
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(1)
|
||||
sock.close()
|
||||
@ -341,7 +341,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
SocketConnectedTest.__init__(self, methodName=methodName)
|
||||
|
||||
def testRecv(self):
|
||||
"""Testing large receive over TCP."""
|
||||
# Testing large receive over TCP
|
||||
msg = self.cli_conn.recv(1024)
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
@ -349,7 +349,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
self.serv_conn.send(MSG)
|
||||
|
||||
def testOverFlowRecv(self):
|
||||
"""Testing receive in chunks over TCP."""
|
||||
# Testing receive in chunks over TCP
|
||||
seg1 = self.cli_conn.recv(len(MSG) - 3)
|
||||
seg2 = self.cli_conn.recv(1024)
|
||||
msg = seg1 + seg2
|
||||
@ -359,7 +359,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
self.serv_conn.send(MSG)
|
||||
|
||||
def testRecvFrom(self):
|
||||
"""Testing large recvfrom() over TCP."""
|
||||
# Testing large recvfrom() over TCP
|
||||
msg, addr = self.cli_conn.recvfrom(1024)
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
@ -367,7 +367,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
self.serv_conn.send(MSG)
|
||||
|
||||
def testOverFlowRecvFrom(self):
|
||||
"""Testing recvfrom() in chunks over TCP."""
|
||||
# Testing recvfrom() in chunks over TCP
|
||||
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
|
||||
seg2, addr = self.cli_conn.recvfrom(1024)
|
||||
msg = seg1 + seg2
|
||||
@ -377,7 +377,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
self.serv_conn.send(MSG)
|
||||
|
||||
def testSendAll(self):
|
||||
"""Testing sendall() with a 2048 byte string over TCP."""
|
||||
# Testing sendall() with a 2048 byte string over TCP
|
||||
while 1:
|
||||
read = self.cli_conn.recv(1024)
|
||||
if not read:
|
||||
@ -391,7 +391,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
self.serv_conn.sendall(big_chunk)
|
||||
|
||||
def testFromFd(self):
|
||||
"""Testing fromfd()."""
|
||||
# Testing fromfd()
|
||||
if not hasattr(socket, "fromfd"):
|
||||
return # On Windows, this doesn't exist
|
||||
fd = self.cli_conn.fileno()
|
||||
@ -403,7 +403,7 @@ class BasicTCPTest(SocketConnectedTest):
|
||||
self.serv_conn.send(MSG)
|
||||
|
||||
def testShutdown(self):
|
||||
"""Testing shutdown()."""
|
||||
# Testing shutdown()
|
||||
msg = self.cli_conn.recv(1024)
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
@ -417,7 +417,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
|
||||
ThreadedUDPSocketTest.__init__(self, methodName=methodName)
|
||||
|
||||
def testSendtoAndRecv(self):
|
||||
"""Testing sendto() and Recv() over UDP."""
|
||||
# Testing sendto() and Recv() over UDP
|
||||
msg = self.serv.recv(len(MSG))
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
@ -425,7 +425,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
|
||||
self.cli.sendto(MSG, 0, (HOST, PORT))
|
||||
|
||||
def testRecvFrom(self):
|
||||
"""Testing recvfrom() over UDP."""
|
||||
# Testing recvfrom() over UDP
|
||||
msg, addr = self.serv.recvfrom(len(MSG))
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
@ -438,7 +438,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
||||
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
|
||||
|
||||
def testSetBlocking(self):
|
||||
"""Testing whether set blocking works."""
|
||||
# Testing whether set blocking works
|
||||
self.serv.setblocking(0)
|
||||
start = time.time()
|
||||
try:
|
||||
@ -452,7 +452,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
||||
pass
|
||||
|
||||
def testAccept(self):
|
||||
"""Testing non-blocking accept."""
|
||||
# Testing non-blocking accept
|
||||
self.serv.setblocking(0)
|
||||
try:
|
||||
conn, addr = self.serv.accept()
|
||||
@ -471,7 +471,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
||||
self.cli.connect((HOST, PORT))
|
||||
|
||||
def testConnect(self):
|
||||
"""Testing non-blocking connect."""
|
||||
# Testing non-blocking connect
|
||||
conn, addr = self.serv.accept()
|
||||
|
||||
def _testConnect(self):
|
||||
@ -479,7 +479,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
||||
self.cli.connect((HOST, PORT))
|
||||
|
||||
def testRecv(self):
|
||||
"""Testing non-blocking recv."""
|
||||
# Testing non-blocking recv
|
||||
conn, addr = self.serv.accept()
|
||||
conn.setblocking(0)
|
||||
try:
|
||||
@ -526,7 +526,7 @@ class FileObjectClassTestCase(SocketConnectedTest):
|
||||
SocketConnectedTest.clientTearDown(self)
|
||||
|
||||
def testSmallRead(self):
|
||||
"""Performing small file read test."""
|
||||
# Performing small file read test
|
||||
first_seg = self.serv_file.read(len(MSG)-3)
|
||||
second_seg = self.serv_file.read(3)
|
||||
msg = first_seg + second_seg
|
||||
@ -536,22 +536,31 @@ class FileObjectClassTestCase(SocketConnectedTest):
|
||||
self.cli_file.write(MSG)
|
||||
self.cli_file.flush()
|
||||
|
||||
def testFullRead(self):
|
||||
# read until EOF
|
||||
msg = self.serv_file.read()
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
def _testFullRead(self):
|
||||
self.cli_file.write(MSG)
|
||||
self.cli_file.close()
|
||||
|
||||
def testUnbufferedRead(self):
|
||||
"""Performing unbuffered file read test."""
|
||||
# Performing unbuffered file read test
|
||||
buf = ''
|
||||
while 1:
|
||||
char = self.serv_file.read(1)
|
||||
self.failIf(not char)
|
||||
buf += char
|
||||
if buf == MSG:
|
||||
if not char:
|
||||
break
|
||||
buf += char
|
||||
self.assertEqual(buf, MSG)
|
||||
|
||||
def _testUnbufferedRead(self):
|
||||
self.cli_file.write(MSG)
|
||||
self.cli_file.flush()
|
||||
|
||||
def testReadline(self):
|
||||
"""Performing file readline test."""
|
||||
# Performing file readline test
|
||||
line = self.serv_file.readline()
|
||||
self.assertEqual(line, MSG)
|
||||
|
||||
@ -572,7 +581,7 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
||||
bufsize = 0 # Use unbuffered mode
|
||||
|
||||
def testUnbufferedReadline(self):
|
||||
"""Read a line, create a new file object, read another line with it."""
|
||||
# Read a line, create a new file object, read another line with it
|
||||
line = self.serv_file.readline() # first line
|
||||
self.assertEqual(line, "A. " + MSG) # first line
|
||||
self.serv_file = self.cli_conn.makefile('rb', 0)
|
||||
@ -584,6 +593,14 @@ class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
||||
self.cli_file.write("B. " + MSG)
|
||||
self.cli_file.flush()
|
||||
|
||||
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
||||
|
||||
bufsize = 1 # Default-buffered for reading; line-buffered for writing
|
||||
|
||||
|
||||
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
||||
|
||||
bufsize = 2 # Exercise the buffering code
|
||||
|
||||
def test_main():
|
||||
suite = unittest.TestSuite()
|
||||
@ -593,6 +610,8 @@ def test_main():
|
||||
suite.addTest(unittest.makeSuite(NonBlockingTCPTests))
|
||||
suite.addTest(unittest.makeSuite(FileObjectClassTestCase))
|
||||
suite.addTest(unittest.makeSuite(UnbufferedFileObjectClassTestCase))
|
||||
suite.addTest(unittest.makeSuite(LineBufferedFileObjectClassTestCase))
|
||||
suite.addTest(unittest.makeSuite(SmallBufferedFileObjectClassTestCase))
|
||||
test_support.run_suite(suite)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
Reference in New Issue
Block a user