cpython/Lib/test/test_socket.py
Ronald Oussoren 94f25283c9 Remove traces of MacOS9 support.
Fix for issue #7908
2010-05-05 19:11:21 +00:00

1495 lines
49 KiB
Python

#!/usr/bin/env python3
import unittest
from test import support
import errno
import io
import socket
import select
import time
import traceback
import queue
import sys
import os
import array
from weakref import proxy
import signal
HOST = support.HOST
MSG = b'Michael Gilfix was here\n'
try:
import _thread as thread
import threading
except ImportError:
thread = None
threading = None
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.serv)
self.serv.listen(1)
def tearDown(self):
self.serv.close()
self.serv = None
class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.port = support.bind_port(self.serv)
def tearDown(self):
self.serv.close()
self.serv = None
class ThreadableTest:
"""Threadable Test class
The ThreadableTest class makes it easy to create a threaded
client/server pair from an existing unit test. To create a
new threaded class from an existing unit test, use multiple
inheritance:
class NewClass (OldClass, ThreadableTest):
pass
This class defines two new fixture functions with obvious
purposes for overriding:
clientSetUp ()
clientTearDown ()
Any new test functions within the class must then define
tests in pairs, where the test name is preceeded with a
'_' to indicate the client portion of the test. Ex:
def testFoo(self):
# Server portion
def _testFoo(self):
# Client portion
Any exceptions raised by the clients during their tests
are caught and transferred to the main thread to alert
the testing framework.
Note, the server setup function cannot call any blocking
functions that rely on the client thread during setup,
unless serverExplicitReady() is called just before
the blocking call (such as in setting up a client/server
connection and performing the accept() in setUp().
"""
def __init__(self):
# Swap the true setup function
self.__setUp = self.setUp
self.__tearDown = self.tearDown
self.setUp = self._setUp
self.tearDown = self._tearDown
def serverExplicitReady(self):
"""This method allows the server to explicitly indicate that
it wants the client thread to proceed. This is useful if the
server is about to execute a blocking routine that is
dependent upon the client thread during its setup routine."""
self.server_ready.set()
def _setUp(self):
self.server_ready = threading.Event()
self.client_ready = threading.Event()
self.done = threading.Event()
self.queue = queue.Queue(1)
# Do some munging to start the client test.
methodname = self.id()
i = methodname.rfind('.')
methodname = methodname[i+1:]
test_method = getattr(self, '_' + methodname)
self.client_thread = thread.start_new_thread(
self.clientRun, (test_method,))
self.__setUp()
if not self.server_ready.is_set():
self.server_ready.set()
self.client_ready.wait()
def _tearDown(self):
self.__tearDown()
self.done.wait()
if self.queue.qsize():
msg = self.queue.get()
self.fail(msg)
def clientRun(self, test_func):
self.server_ready.wait()
self.client_ready.set()
self.clientSetUp()
if not hasattr(test_func, '__call__'):
raise TypeError("test_func must be a callable function")
try:
test_func()
except Exception as strerror:
self.queue.put(strerror)
self.clientTearDown()
def clientSetUp(self):
raise NotImplementedError("clientSetUp must be implemented.")
def clientTearDown(self):
self.done.set()
thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketUDPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
class SocketConnectedTest(ThreadedTCPSocketTest):
"""Socket tests for client-server connection.
self.cli_conn is a client socket connected to the server. The
setUp() method guarantees that it is connected to the server.
"""
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def setUp(self):
ThreadedTCPSocketTest.setUp(self)
# Indicate explicitly we're ready for the client thread to
# proceed and then perform the blocking call to accept
self.serverExplicitReady()
conn, addr = self.serv.accept()
self.cli_conn = conn
def tearDown(self):
self.cli_conn.close()
self.cli_conn = None
ThreadedTCPSocketTest.tearDown(self)
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
self.cli.connect((HOST, self.port))
self.serv_conn = self.cli
def clientTearDown(self):
self.serv_conn.close()
self.serv_conn = None
ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def setUp(self):
self.serv, self.cli = socket.socketpair()
def tearDown(self):
self.serv.close()
self.serv = None
def clientSetUp(self):
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
#######################################################################
## Begin Tests
class GeneralModuleTests(unittest.TestCase):
def test_repr(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.assertTrue(repr(s).startswith("<socket.socket object"))
def test_weakref(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
p = proxy(s)
self.assertEqual(p.fileno(), s.fileno())
s.close()
s = None
try:
p.fileno()
except ReferenceError:
pass
else:
self.fail('Socket proxy still exists')
def testSocketError(self):
# Testing socket module exceptions
def raise_error(*args, **kwargs):
raise socket.error
def raise_herror(*args, **kwargs):
raise socket.herror
def raise_gaierror(*args, **kwargs):
raise socket.gaierror
self.assertRaises(socket.error, raise_error,
"Error raising socket exception.")
self.assertRaises(socket.error, raise_herror,
"Error raising socket exception.")
self.assertRaises(socket.error, raise_gaierror,
"Error raising socket exception.")
def testCrucialConstants(self):
# Testing for mission critical constants
socket.AF_INET
socket.SOCK_STREAM
socket.SOCK_DGRAM
socket.SOCK_RAW
socket.SOCK_RDM
socket.SOCK_SEQPACKET
socket.SOL_SOCKET
socket.SO_REUSEADDR
def testHostnameRes(self):
# Testing hostname resolution mechanisms
hostname = socket.gethostname()
try:
ip = socket.gethostbyname(hostname)
except socket.error:
# Probably name lookup wasn't set up right; skip this test
return
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
except socket.error:
# Probably a similar problem as above; skip this test
return
all_host_names = [hostname, hname] + aliases
fqhn = socket.getfqdn(ip)
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"):
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except TypeError:
if sys.getrefcount(__name__) != orig:
self.fail("socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
except socket.error:
pass
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def testGetServBy(self):
eq = self.assertEqual
# Find one service that exists, then check all the related interfaces.
# I've ordered this by protocols that have both a tcp and udp
# protocol, at least for modern Linuxes.
if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
'freebsd7', 'freebsd8', 'darwin'):
# avoid the 'echo' service on this platform, as there is an
# assumption breaking non-standard port/protocol entry
services = ('daytime', 'qotd', 'domain')
else:
services = ('echo', 'daytime', 'domain')
for service in services:
try:
port = socket.getservbyname(service, 'tcp')
break
except socket.error:
pass
else:
raise socket.error
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf it it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
except socket.error:
udpport = None
else:
eq(udpport, port)
# Now make sure the lookup by port returns the same service name
eq(socket.getservbyport(port2), service)
eq(socket.getservbyport(port, 'tcp'), service)
if udpport is not None:
eq(socket.getservbyport(udpport, 'udp'), service)
# Make sure getservbyport does not accept out of range ports.
self.assertRaises(OverflowError, socket.getservbyport, -1)
self.assertRaises(OverflowError, socket.getservbyport, 65536)
def testDefaultTimeout(self):
# Testing default timeout
# The default timeout should initially be None
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
self.assertEqual(s.gettimeout(), None)
s.close()
# Set the default timeout to 10, and see if it propagates
socket.setdefaulttimeout(10)
self.assertEqual(socket.getdefaulttimeout(), 10)
s = socket.socket()
self.assertEqual(s.gettimeout(), 10)
s.close()
# Reset the default timeout to None, and see if it propagates
socket.setdefaulttimeout(None)
self.assertEqual(socket.getdefaulttimeout(), None)
s = socket.socket()
self.assertEqual(s.gettimeout(), None)
s.close()
# Check that setting it to an invalid value raises ValueError
self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
def testIPv4_inet_aton_fourbytes(self):
if not hasattr(socket, 'inet_aton'):
return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes.
self.assertEquals(b'\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEquals(b'\xff'*4, socket.inet_aton('255.255.255.255'))
def testIPv4toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
self.assertEquals(b'\x00\x00\x00\x00', f('0.0.0.0'))
self.assertEquals(b'\xff\x00\xff\x00', f('255.0.255.0'))
self.assertEquals(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
self.assertEquals(b'\x01\x02\x03\x04', f('1.2.3.4'))
self.assertEquals(b'\xff\xff\xff\xff', f('255.255.255.255'))
self.assertEquals(b'\x00\x00\x00\x00', g('0.0.0.0'))
self.assertEquals(b'\xff\x00\xff\x00', g('255.0.255.0'))
self.assertEquals(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEquals(b'\xff\xff\xff\xff', g('255.255.255.255'))
def testIPv6toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
return
except ImportError:
return
f = lambda a: inet_pton(AF_INET6, a)
self.assertEquals(b'\x00' * 16, f('::'))
self.assertEquals(b'\x00' * 16, f('0::0'))
self.assertEquals(b'\x00\x01' + b'\x00' * 14, f('1::'))
self.assertEquals(
b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
def testStringToIPv4(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
self.assertEquals('1.0.1.0', f(b'\x01\x00\x01\x00'))
self.assertEquals('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
self.assertEquals('255.255.255.255', f(b'\xff\xff\xff\xff'))
self.assertEquals('1.2.3.4', f(b'\x01\x02\x03\x04'))
self.assertEquals('1.0.1.0', g(b'\x01\x00\x01\x00'))
self.assertEquals('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
self.assertEquals('255.255.255.255', g(b'\xff\xff\xff\xff'))
def testStringToIPv6(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
return
except ImportError:
return
f = lambda a: inet_ntop(AF_INET6, a)
self.assertEquals('::', f(b'\x00' * 16))
self.assertEquals('::1', f(b'\x00' * 15 + b'\x01'))
self.assertEquals(
'aef:b01:506:1001:ffff:9997:55:170',
f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
)
# XXX The following don't test module-level functionality...
def _get_unused_port(self, bind_address='0.0.0.0'):
"""Use a temporary socket to elicit an unused ephemeral port.
Args:
bind_address: Hostname or IP address to search for a port on.
Returns: A most likely to be unused port.
"""
tempsock = socket.socket()
tempsock.bind((bind_address, 0))
host, port = tempsock.getsockname()
tempsock.close()
return port
def testSockName(self):
# Testing getsockname()
port = self._get_unused_port()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("0.0.0.0", port))
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
self.assertEqual(name[1], port)
def testGetSockOpt(self):
# Testing getsockopt()
# We know a socket should start without reuse==0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assertFalse(reuse != 0, "initial mode is reuse")
def testSetSockOpt(self):
# Testing setsockopt()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assertFalse(reuse == 0, "failed to set reuse mode")
def testSendAfterClose(self):
# testing send() after close() with timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.close()
self.assertRaises(socket.error, sock.send, b"spam")
def testNewAttributes(self):
# testing .family, .type and .protocol
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.assertEqual(sock.family, socket.AF_INET)
self.assertEqual(sock.type, socket.SOCK_STREAM)
self.assertEqual(sock.proto, 0)
sock.close()
def test_getsockaddrarg(self):
host = '0.0.0.0'
port = self._get_unused_port(bind_address=host)
big_port = port + 65536
neg_port = port - 65536
sock = socket.socket()
try:
self.assertRaises(OverflowError, sock.bind, (host, big_port))
self.assertRaises(OverflowError, sock.bind, (host, neg_port))
sock.bind((host, port))
finally:
sock.close()
def test_sock_ioctl(self):
if os.name != "nt":
return
self.assertTrue(hasattr(socket.socket, 'ioctl'))
self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
self.assertTrue(hasattr(socket, 'RCVALL_ON'))
self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
s = socket.socket()
self.assertRaises(ValueError, s.ioctl, -1, None)
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecv(self):
# Testing large receive over TCP
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
def _testRecv(self):
self.serv_conn.send(MSG)
def testOverFlowRecv(self):
# Testing receive in chunks over TCP
seg1 = self.cli_conn.recv(len(MSG) - 3)
seg2 = self.cli_conn.recv(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def _testOverFlowRecv(self):
self.serv_conn.send(MSG)
def testRecvFrom(self):
# Testing large recvfrom() over TCP
msg, addr = self.cli_conn.recvfrom(1024)
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
self.serv_conn.send(MSG)
def testOverFlowRecvFrom(self):
# Testing recvfrom() in chunks over TCP
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
seg2, addr = self.cli_conn.recvfrom(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def _testOverFlowRecvFrom(self):
self.serv_conn.send(MSG)
def testSendAll(self):
# Testing sendall() with a 2048 byte string over TCP
msg = b''
while 1:
read = self.cli_conn.recv(1024)
if not read:
break
msg += read
self.assertEqual(msg, b'f' * 2048)
def _testSendAll(self):
big_chunk = b'f' * 2048
self.serv_conn.sendall(big_chunk)
def testFromFd(self):
# Testing fromfd()
if not hasattr(socket, "fromfd"):
return # On Windows, this doesn't exist
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def _testFromFd(self):
self.serv_conn.send(MSG)
def testDup(self):
# Testing dup()
sock = self.cli_conn.dup()
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def _testDup(self):
self.serv_conn.send(MSG)
def testShutdown(self):
# Testing shutdown()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
# wait for _testShutdown to finish: on OS X, when the server
# closes the connection the client also becomes disconnected,
# and the client's shutdown call will fail. (Issue #4397.)
self.done.wait()
def _testShutdown(self):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedUDPSocketTest.__init__(self, methodName=methodName)
def testSendtoAndRecv(self):
# Testing sendto() and Recv() over UDP
msg = self.serv.recv(len(MSG))
self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self):
self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFrom(self):
# Testing recvfrom() over UDP
msg, addr = self.serv.recvfrom(len(MSG))
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self):
self.cli.sendto(MSG, 0, (HOST, self.port))
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
def testClose(self):
conn, addr = self.serv.accept()
conn.close()
sd = self.cli
read, write, err = select.select([sd], [], [], 1.0)
self.assertEqual(read, [sd])
self.assertEqual(sd.recv(1), b'')
# Calling close() many times should be safe.
conn.close()
conn.close()
def _testClose(self):
self.cli.connect((HOST, self.port))
time.sleep(1.0)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
def __init__(self, methodName='runTest'):
SocketPairTest.__init__(self, methodName=methodName)
def testRecv(self):
msg = self.serv.recv(1024)
self.assertEqual(msg, MSG)
def _testRecv(self):
self.cli.send(MSG)
def testSend(self):
self.serv.send(MSG)
def _testSend(self):
msg = self.cli.recv(1024)
self.assertEqual(msg, MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def testSetBlocking(self):
# Testing whether set blocking works
self.serv.setblocking(0)
start = time.time()
try:
self.serv.accept()
except socket.error:
pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
def _testSetBlocking(self):
pass
def testAccept(self):
# Testing non-blocking accept
self.serv.setblocking(0)
try:
conn, addr = self.serv.accept()
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking accept.")
read, write, err = select.select([self.serv], [], [])
if self.serv in read:
conn, addr = self.serv.accept()
else:
self.fail("Error trying to do accept after select.")
def _testAccept(self):
time.sleep(0.1)
self.cli.connect((HOST, self.port))
def testConnect(self):
# Testing non-blocking connect
conn, addr = self.serv.accept()
def _testConnect(self):
self.cli.settimeout(10)
self.cli.connect((HOST, self.port))
def testRecv(self):
# Testing non-blocking recv
conn, addr = self.serv.accept()
conn.setblocking(0)
try:
msg = conn.recv(len(MSG))
except socket.error:
pass
else:
self.fail("Error trying to do non-blocking recv.")
read, write, err = select.select([conn], [], [])
if conn in read:
msg = conn.recv(len(MSG))
self.assertEqual(msg, MSG)
else:
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
self.cli.connect((HOST, self.port))
time.sleep(0.1)
self.cli.send(MSG)
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
"""Unit tests for the object returned by socket.makefile()
self.serv_file is the io object returned by makefile() on
the client connection. You can read from this file to
get output from the server.
self.cli_file is the io object returned by makefile() on the
server connection. You can write to this file to send output
to the client.
"""
bufsize = -1 # Use default buffer size
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def setUp(self):
SocketConnectedTest.setUp(self)
self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
def tearDown(self):
self.serv_file.close()
self.assertTrue(self.serv_file.closed)
self.serv_file = None
SocketConnectedTest.tearDown(self)
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
self.cli_file = self.serv_conn.makefile('wb')
def clientTearDown(self):
self.cli_file.close()
self.assertTrue(self.cli_file.closed)
self.cli_file = None
SocketConnectedTest.clientTearDown(self)
def testSmallRead(self):
# Performing small file read test
first_seg = self.serv_file.read(len(MSG)-3)
second_seg = self.serv_file.read(3)
msg = first_seg + second_seg
self.assertEqual(msg, MSG)
def _testSmallRead(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testFullRead(self):
# read until EOF
msg = self.serv_file.read()
self.assertEqual(msg, MSG)
def _testFullRead(self):
self.cli_file.write(MSG)
self.cli_file.close()
def testUnbufferedRead(self):
# Performing unbuffered file read test
buf = b''
while 1:
char = self.serv_file.read(1)
if not char:
break
buf += char
self.assertEqual(buf, MSG)
def _testUnbufferedRead(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testReadline(self):
# Performing file readline test
line = self.serv_file.readline()
self.assertEqual(line, MSG)
def _testReadline(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testCloseAfterMakefile(self):
# The file returned by makefile should keep the socket open.
self.cli_conn.close()
# read until EOF
msg = self.serv_file.read()
self.assertEqual(msg, MSG)
def _testCloseAfterMakefile(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testMakefileAfterMakefileClose(self):
self.serv_file.close()
msg = self.cli_conn.recv(len(MSG))
self.assertEqual(msg, MSG)
def _testMakefileAfterMakefileClose(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testClosedAttr(self):
self.assertTrue(not self.serv_file.closed)
def _testClosedAttr(self):
self.assertTrue(not self.cli_file.closed)
def testAttributes(self):
self.assertEqual(self.serv_file.mode, 'rb')
self.assertEqual(self.serv_file.name, self.cli_conn.fileno())
def _testAttributes(self):
self.assertEqual(self.cli_file.mode, 'wb')
self.assertEqual(self.cli_file.name, self.serv_conn.fileno())
def testRealClose(self):
self.serv_file.close()
self.assertRaises(ValueError, self.serv_file.fileno)
self.cli_conn.close()
self.assertRaises(socket.error, self.cli_conn.getsockname)
def _testRealClose(self):
pass
class FileObjectInterruptedTestCase(unittest.TestCase):
"""Test that the file object correctly handles EINTR internally."""
class MockSocket(object):
def __init__(self, recv_funcs=()):
# A generator that returns callables that we'll call for each
# call to recv().
self._recv_step = iter(recv_funcs)
def recv_into(self, buffer):
data = next(self._recv_step)()
assert len(buffer) >= len(data)
buffer[:len(data)] = data
return len(data)
def _decref_socketios(self):
pass
def _textiowrap_for_test(self, buffering=-1):
raw = socket.SocketIO(self, "r")
if buffering < 0:
buffering = io.DEFAULT_BUFFER_SIZE
if buffering == 0:
return raw
buffer = io.BufferedReader(raw, buffering)
text = io.TextIOWrapper(buffer, None, None)
text.mode = "rb"
return text
@staticmethod
def _raise_eintr():
raise socket.error(errno.EINTR)
def _textiowrap_mock_socket(self, mock, buffering=-1):
raw = socket.SocketIO(mock, "r")
if buffering < 0:
buffering = io.DEFAULT_BUFFER_SIZE
if buffering == 0:
return raw
buffer = io.BufferedReader(raw, buffering)
text = io.TextIOWrapper(buffer, None, None)
text.mode = "rb"
return text
def _test_readline(self, size=-1, buffering=-1):
mock_sock = self.MockSocket(recv_funcs=[
lambda : b"This is the first line\nAnd the sec",
self._raise_eintr,
lambda : b"ond line is here\n",
lambda : b"",
lambda : b"", # XXX(gps): io library does an extra EOF read
])
fo = mock_sock._textiowrap_for_test(buffering=buffering)
self.assertEquals(fo.readline(size), "This is the first line\n")
self.assertEquals(fo.readline(size), "And the second line is here\n")
def _test_read(self, size=-1, buffering=-1):
mock_sock = self.MockSocket(recv_funcs=[
lambda : b"This is the first line\nAnd the sec",
self._raise_eintr,
lambda : b"ond line is here\n",
lambda : b"",
lambda : b"", # XXX(gps): io library does an extra EOF read
])
expecting = (b"This is the first line\n"
b"And the second line is here\n")
fo = mock_sock._textiowrap_for_test(buffering=buffering)
if buffering == 0:
data = b''
else:
data = ''
expecting = expecting.decode('utf8')
while len(data) != len(expecting):
part = fo.read(size)
if not part:
break
data += part
self.assertEquals(data, expecting)
def test_default(self):
self._test_readline()
self._test_readline(size=100)
self._test_read()
self._test_read(size=100)
def test_with_1k_buffer(self):
self._test_readline(buffering=1024)
self._test_readline(size=100, buffering=1024)
self._test_read(buffering=1024)
self._test_read(size=100, buffering=1024)
def _test_readline_no_buffer(self, size=-1):
mock_sock = self.MockSocket(recv_funcs=[
lambda : b"a",
lambda : b"\n",
lambda : b"B",
self._raise_eintr,
lambda : b"b",
lambda : b"",
])
fo = mock_sock._textiowrap_for_test(buffering=0)
self.assertEquals(fo.readline(size), b"a\n")
self.assertEquals(fo.readline(size), b"Bb")
def test_no_buffer(self):
self._test_readline_no_buffer()
self._test_readline_no_buffer(size=4)
self._test_read(buffering=0)
self._test_read(size=100, buffering=0)
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
"""Repeat the tests from FileObjectClassTestCase with bufsize==0.
In this case (and in this case only), it should be possible to
create a file object, read a line from it, create another file
object, read another line from it, without loss of data in the
first file object's buffer. Note that http.client relies on this
when reading multiple requests from the same socket."""
bufsize = 0 # Use unbuffered mode
def testUnbufferedReadline(self):
# Read a line, create a new file object, read another line with it
line = self.serv_file.readline() # first line
self.assertEqual(line, b"A. " + MSG) # first line
self.serv_file = self.cli_conn.makefile('rb', 0)
line = self.serv_file.readline() # second line
self.assertEqual(line, b"B. " + MSG) # second line
def _testUnbufferedReadline(self):
self.cli_file.write(b"A. " + MSG)
self.cli_file.write(b"B. " + MSG)
self.cli_file.flush()
def testMakefileClose(self):
# The file returned by makefile should keep the socket open...
self.cli_conn.close()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
# ...until the file is itself closed
self.serv_file.close()
self.assertRaises(socket.error, self.cli_conn.recv, 1024)
def _testMakefileClose(self):
self.cli_file.write(MSG)
self.cli_file.flush()
def testMakefileCloseSocketDestroy(self):
refcount_before = sys.getrefcount(self.cli_conn)
self.serv_file.close()
refcount_after = sys.getrefcount(self.cli_conn)
self.assertEqual(refcount_before - 1, refcount_after)
def _testMakefileCloseSocketDestroy(self):
pass
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 1 # Default-buffered for reading; line-buffered for writing
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 2 # Exercise the buffering code
class NetworkConnectionTest(object):
"""Prove network connection."""
def clientSetUp(self):
# We're inherited below by BasicTCPTest2, which also inherits
# BasicTCPTest, which defines self.port referenced below.
self.cli = socket.create_connection((HOST, self.port))
self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
"""Tests that NetworkConnection does not break existing TCP functionality.
"""
class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self):
port = support.find_unused_port()
self.assertRaises(
socket.error,
lambda: socket.create_connection((HOST, port))
)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.source_port = support.find_unused_port()
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
def _justAccept(self):
conn, addr = self.serv.accept()
testFamily = _justAccept
def _testFamily(self):
self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.family, 2)
testSourceAddress = _justAccept
def _testSourceAddress(self):
self.cli = socket.create_connection((HOST, self.port), timeout=30,
source_address=('', self.source_port))
self.assertEqual(self.cli.getsockname()[1], self.source_port)
# The port number being used is sufficient to show that the bind()
# call happened.
testTimeoutDefault = _justAccept
def _testTimeoutDefault(self):
# passing no explicit timeout uses socket's global default
self.assertTrue(socket.getdefaulttimeout() is None)
socket.setdefaulttimeout(42)
try:
self.cli = socket.create_connection((HOST, self.port))
finally:
socket.setdefaulttimeout(None)
self.assertEquals(self.cli.gettimeout(), 42)
testTimeoutNone = _justAccept
def _testTimeoutNone(self):
# None timeout means the same as sock.settimeout(None)
self.assertTrue(socket.getdefaulttimeout() is None)
socket.setdefaulttimeout(30)
try:
self.cli = socket.create_connection((HOST, self.port), timeout=None)
finally:
socket.setdefaulttimeout(None)
self.assertEqual(self.cli.gettimeout(), None)
testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self):
self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self):
self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
def testInsideTimeout(self):
conn, addr = self.serv.accept()
time.sleep(3)
conn.send(b"done!")
testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, self.port))
data = sock.recv(5)
self.assertEqual(data, b"done!")
def _testOutsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.assertRaises(socket.timeout, lambda: sock.recv(5))
class TCPTimeoutTest(SocketTCPTest):
def testTCPTimeout(self):
def raise_timeout(*args, **kwargs):
self.serv.settimeout(1.0)
self.serv.accept()
self.assertRaises(socket.timeout, raise_timeout,
"Error generating a timeout exception (TCP)")
def testTimeoutZero(self):
ok = False
try:
self.serv.settimeout(0.0)
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of error (TCP)")
except socket.error:
ok = True
except:
self.fail("caught unexpected exception (TCP)")
if not ok:
self.fail("accept() returned success when we did not expect it")
def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms.
if not hasattr(signal, "alarm"):
return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception):
pass
def alarm_handler(signal, frame):
raise Alarm
old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
try:
signal.alarm(2) # POSIX allows alarm to be up to 1 second early
try:
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of Alarm")
except Alarm:
pass
except:
self.fail("caught other exception instead of Alarm:"
" %s(%s):\n%s" %
(sys.exc_info()[:2] + (traceback.format_exc(),)))
else:
self.fail("nothing caught")
finally:
signal.alarm(0) # shut off alarm
except Alarm:
self.fail("got Alarm in wrong place")
finally:
# no alarm can be pending. Safe to restore old handler.
signal.signal(signal.SIGALRM, old_alarm)
class UDPTimeoutTest(SocketTCPTest):
def testUDPTimeout(self):
def raise_timeout(*args, **kwargs):
self.serv.settimeout(1.0)
self.serv.recv(1024)
self.assertRaises(socket.timeout, raise_timeout,
"Error generating a timeout exception (UDP)")
def testTimeoutZero(self):
ok = False
try:
self.serv.settimeout(0.0)
foo = self.serv.recv(1024)
except socket.timeout:
self.fail("caught timeout instead of error (UDP)")
except socket.error:
ok = True
except:
self.fail("caught unexpected exception (UDP)")
if not ok:
self.fail("recv() returned success when we did not expect it")
class TestExceptions(unittest.TestCase):
def testExceptionTree(self):
self.assertTrue(issubclass(socket.error, Exception))
self.assertTrue(issubclass(socket.herror, socket.error))
self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error))
class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108
def testLinuxAbstractNamespace(self):
address = b"\x00python-test-hello\x00\xff"
s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s1.bind(address)
s1.listen(1)
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s2.connect(s1.getsockname())
s1.accept()
self.assertEqual(s1.getsockname(), address)
self.assertEqual(s2.getpeername(), address)
def testMaxName(self):
address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind(address)
self.assertEqual(s.getsockname(), address)
def testNameOverflow(self):
address = "\x00" + "h" * self.UNIX_PATH_MAX
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.assertRaises(socket.error, s.bind, address)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
"""
Test the buffer versions of socket.recv() and socket.send().
"""
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecvIntoArray(self):
buf = bytearray(1024)
nbytes = self.cli_conn.recv_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
def _testRecvIntoArray(self):
buf = bytes(MSG)
self.serv_conn.send(buf)
def testRecvIntoBytearray(self):
buf = bytearray(1024)
nbytes = self.cli_conn.recv_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
_testRecvIntoBytearray = _testRecvIntoArray
def testRecvIntoMemoryview(self):
buf = bytearray(1024)
nbytes = self.cli_conn.recv_into(memoryview(buf))
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
_testRecvIntoMemoryview = _testRecvIntoArray
def testRecvFromIntoArray(self):
buf = bytearray(1024)
nbytes, addr = self.cli_conn.recvfrom_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
def _testRecvFromIntoArray(self):
buf = bytes(MSG)
self.serv_conn.send(buf)
def testRecvFromIntoBytearray(self):
buf = bytearray(1024)
nbytes, addr = self.cli_conn.recvfrom_into(buf)
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
_testRecvFromIntoBytearray = _testRecvFromIntoArray
def testRecvFromIntoMemoryview(self):
buf = bytearray(1024)
nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
self.assertEqual(nbytes, len(MSG))
msg = buf[:len(MSG)]
self.assertEqual(msg, MSG)
_testRecvFromIntoMemoryview = _testRecvFromIntoArray
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
def isTipcAvailable():
"""Check if the TIPC module is loaded
The TIPC module is not loaded automatically on Ubuntu and probably
other Linux distros.
"""
if not hasattr(socket, "AF_TIPC"):
return False
if not os.path.isfile("/proc/modules"):
return False
with open("/proc/modules") as f:
for line in f:
if line.startswith("tipc "):
return True
if support.verbose:
print("TIPC module is not loaded, please 'sudo modprobe tipc'")
return False
class TIPCTest (unittest.TestCase):
def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
TIPC_LOWER, TIPC_UPPER)
srv.bind(srvaddr)
sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
cli.sendto(MSG, sendaddr)
msg, recvaddr = srv.recvfrom(1024)
self.assertEqual(cli.getsockname(), recvaddr)
self.assertEqual(msg, MSG)
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName)
ThreadableTest.__init__(self)
def setUp(self):
self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
TIPC_LOWER, TIPC_UPPER)
self.srv.bind(srvaddr)
self.srv.listen(5)
self.serverExplicitReady()
self.conn, self.connaddr = self.srv.accept()
def clientSetUp(self):
# The is a hittable race between serverExplicitReady() and the
# accept() call; sleep a little while to avoid it, otherwise
# we could get an exception
time.sleep(0.1)
self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
self.cli.connect(addr)
self.cliaddr = self.cli.getsockname()
def testStream(self):
msg = self.conn.recv(1024)
self.assertEqual(msg, MSG)
self.assertEqual(self.cliaddr, self.connaddr)
def _testStream(self):
self.cli.send(MSG)
self.cli.close()
def test_main():
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, UDPTimeoutTest ]
tests.extend([
NonBlockingTCPTests,
FileObjectClassTestCase,
FileObjectInterruptedTestCase,
UnbufferedFileObjectClassTestCase,
LineBufferedFileObjectClassTestCase,
SmallBufferedFileObjectClassTestCase,
NetworkConnectionNoServer,
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
])
if hasattr(socket, "socketpair"):
tests.append(BasicSocketPairTest)
if sys.platform == 'linux2':
tests.append(TestLinuxAbstractNamespace)
if isTipcAvailable():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
thread_info = support.threading_setup()
support.run_unittest(*tests)
support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()