blob: dc8a4a44f37e657412f2de4a68e35b9959d26c63 [file] [log] [blame]
import unittest
from test import support
from test.support import (
is_apple, os_helper, refleak_helper, socket_helper, threading_helper
)
import _thread as thread
import array
import contextlib
import errno
import gc
import io
import itertools
import math
import os
import pickle
import platform
import queue
import random
import re
import select
import signal
import socket
import string
import struct
import sys
import tempfile
import threading
import time
import traceback
from weakref import proxy
try:
import multiprocessing
except ImportError:
multiprocessing = False
try:
import fcntl
except ImportError:
fcntl = None
try:
import _testcapi
except ImportError:
_testcapi = None
support.requires_working_socket(module=True)
HOST = socket_helper.HOST
# test unicode string and carriage return
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')
VSOCKPORT = 1234
AIX = platform.system() == "AIX"
WSL = "microsoft-standard-WSL" in platform.release()
try:
import _socket
except ImportError:
_socket = None
def skipForRefleakHuntinIf(condition, issueref):
if not condition:
def decorator(f):
f.client_skip = lambda f: f
return f
else:
def decorator(f):
@contextlib.wraps(f)
def wrapper(*args, **kwds):
if refleak_helper.hunting_for_refleaks():
raise unittest.SkipTest(f"ignore while hunting for refleaks, see {issueref}")
return f(*args, **kwds)
def client_skip(f):
@contextlib.wraps(f)
def wrapper(*args, **kwds):
if refleak_helper.hunting_for_refleaks():
return
return f(*args, **kwds)
return wrapper
wrapper.client_skip = client_skip
return wrapper
return decorator
def get_cid():
if fcntl is None:
return None
if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
return None
try:
with open("/dev/vsock", "rb") as f:
r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, " ")
except OSError:
return None
else:
return struct.unpack("I", r)[0]
def _have_socket_can():
"""Check whether CAN sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_can_isotp():
"""Check whether CAN ISOTP sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_can_j1939():
"""Check whether CAN J1939 sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_rds():
"""Check whether RDS sockets are supported on this host."""
try:
s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_alg():
"""Check whether AF_ALG sockets are supported on this host."""
try:
s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_qipcrtr():
"""Check whether AF_QIPCRTR sockets are supported on this host."""
try:
s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_vsock():
"""Check whether AF_VSOCK sockets are supported on this host."""
cid = get_cid()
return (cid is not None)
def _have_socket_bluetooth():
"""Check whether AF_BLUETOOTH sockets are supported on this host."""
try:
# RFCOMM is supported by all platforms with bluetooth support. Windows
# does not support omitting the protocol.
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_hyperv():
"""Check whether AF_HYPERV sockets are supported on this host."""
try:
s = socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW)
except (AttributeError, OSError):
return False
else:
s.close()
return True
@contextlib.contextmanager
def socket_setdefaulttimeout(timeout):
old_timeout = socket.getdefaulttimeout()
try:
socket.setdefaulttimeout(timeout)
yield
finally:
socket.setdefaulttimeout(old_timeout)
HAVE_SOCKET_CAN = _have_socket_can()
HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()
HAVE_SOCKET_RDS = _have_socket_rds()
HAVE_SOCKET_ALG = _have_socket_alg()
HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()
HAVE_SOCKET_VSOCK = _have_socket_vsock()
# Older Android versions block UDPLITE with SELinux.
HAVE_SOCKET_UDPLITE = (
hasattr(socket, "IPPROTO_UDPLITE")
and not (support.is_android and platform.android_ver().api_level < 29))
HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
HAVE_SOCKET_HYPERV = _have_socket_hyperv()
# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = socket_helper.bind_port(self.serv)
self.serv.listen()
def tearDown(self):
self.serv.close()
self.serv = None
class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.port = socket_helper.bind_port(self.serv)
def tearDown(self):
self.serv.close()
self.serv = None
class SocketUDPLITETest(SocketUDPTest):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
self.port = socket_helper.bind_port(self.serv)
class SocketCANTest(unittest.TestCase):
"""To be able to run this test, a `vcan0` CAN interface can be created with
the following commands:
# modprobe vcan
# ip link add dev vcan0 type vcan
# ip link set up vcan0
"""
interface = 'vcan0'
bufsize = 128
"""The CAN frame structure is defined in <linux/can.h>:
struct can_frame {
canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
__u8 can_dlc; /* data length code: 0 .. 8 */
__u8 data[8] __attribute__((aligned(8)));
};
"""
can_frame_fmt = "=IB3x8s"
can_frame_size = struct.calcsize(can_frame_fmt)
"""The Broadcast Management Command frame structure is defined
in <linux/can/bcm.h>:
struct bcm_msg_head {
__u32 opcode;
__u32 flags;
__u32 count;
struct timeval ival1, ival2;
canid_t can_id;
__u32 nframes;
struct can_frame frames[0];
}
`bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
`struct can_frame` definition). Must use native not standard types for packing.
"""
bcm_cmd_msg_fmt = "@3I4l2I"
bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
def setUp(self):
self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
self.addCleanup(self.s.close)
try:
self.s.bind((self.interface,))
except OSError:
self.skipTest('network interface `%s` does not exist' %
self.interface)
class SocketRDSTest(unittest.TestCase):
"""To be able to run this test, the `rds` kernel module must be loaded:
# modprobe rds
"""
bufsize = 8192
def setUp(self):
self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
self.addCleanup(self.serv.close)
try:
self.port = socket_helper.bind_port(self.serv)
except OSError:
self.skipTest('unable to bind RDS socket')
class ThreadableTest:
"""Threadable Test class
The ThreadableTest class makes it easy to create a threaded
client/server pair from an existing unit test. To create a
new threaded class from an existing unit test, use multiple
inheritance:
class NewClass (OldClass, ThreadableTest):
pass
This class defines two new fixture functions with obvious
purposes for overriding:
clientSetUp ()
clientTearDown ()
Any new test functions within the class must then define
tests in pairs, where the test name is preceded with a
'_' to indicate the client portion of the test. Ex:
def testFoo(self):
# Server portion
def _testFoo(self):
# Client portion
Any exceptions raised by the clients during their tests
are caught and transferred to the main thread to alert
the testing framework.
Note, the server setup function cannot call any blocking
functions that rely on the client thread during setup,
unless serverExplicitReady() is called just before
the blocking call (such as in setting up a client/server
connection and performing the accept() in setUp().
"""
def __init__(self):
# Swap the true setup function
self.__setUp = self.setUp
self.setUp = self._setUp
def serverExplicitReady(self):
"""This method allows the server to explicitly indicate that
it wants the client thread to proceed. This is useful if the
server is about to execute a blocking routine that is
dependent upon the client thread during its setup routine."""
self.server_ready.set()
def _setUp(self):
self.enterContext(threading_helper.wait_threads_exit())
self.server_ready = threading.Event()
self.client_ready = threading.Event()
self.done = threading.Event()
self.queue = queue.Queue(1)
self.server_crashed = False
def raise_queued_exception():
if self.queue.qsize():
raise self.queue.get()
self.addCleanup(raise_queued_exception)
# Do some munging to start the client test.
methodname = self.id()
i = methodname.rfind('.')
methodname = methodname[i+1:]
test_method = getattr(self, '_' + methodname)
self.client_thread = thread.start_new_thread(
self.clientRun, (test_method,))
try:
self.__setUp()
except:
self.server_crashed = True
raise
finally:
self.server_ready.set()
self.client_ready.wait()
self.addCleanup(self.done.wait)
def clientRun(self, test_func):
self.server_ready.wait()
try:
self.clientSetUp()
except BaseException as e:
self.queue.put(e)
self.clientTearDown()
return
finally:
self.client_ready.set()
if self.server_crashed:
self.clientTearDown()
return
if not hasattr(test_func, '__call__'):
raise TypeError("test_func must be a callable function")
try:
test_func()
except BaseException as e:
self.queue.put(e)
finally:
self.clientTearDown()
def clientSetUp(self):
raise NotImplementedError("clientSetUp must be implemented.")
def clientTearDown(self):
self.done.set()
thread.exit()
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketTCPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketUDPTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
'UDPLITE sockets required for this test.')
class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketUDPLITETest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketCANTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
try:
self.cli.bind((self.interface,))
except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketRDSTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
try:
# RDS sockets must be bound explicitly to send or receive data
self.cli.bind((HOST, 0))
self.cli_addr = self.cli.getsockname()
except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipIf(WSL, 'VSOCK does not work on Microsoft WSL')
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
'VSOCK sockets required for this test.')
class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
def setUp(self):
self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
self.addCleanup(self.serv.close)
self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
self.serv.listen()
self.serverExplicitReady()
self.serv.settimeout(support.LOOPBACK_TIMEOUT)
self.conn, self.connaddr = self.serv.accept()
self.addCleanup(self.conn.close)
def clientSetUp(self):
time.sleep(0.1)
self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
self.addCleanup(self.cli.close)
cid = get_cid()
if cid in (socket.VMADDR_CID_HOST, socket.VMADDR_CID_ANY):
# gh-119461: Use the local communication address (loopback)
cid = socket.VMADDR_CID_LOCAL
self.cli.connect((cid, VSOCKPORT))
def testStream(self):
msg = self.conn.recv(1024)
self.assertEqual(msg, MSG)
def _testStream(self):
self.cli.send(MSG)
self.cli.close()
class SocketConnectedTest(ThreadedTCPSocketTest):
"""Socket tests for client-server connection.
self.cli_conn is a client socket connected to the server. The
setUp() method guarantees that it is connected to the server.
"""
def __init__(self, methodName='runTest'):
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
def setUp(self):
ThreadedTCPSocketTest.setUp(self)
# Indicate explicitly we're ready for the client thread to
# proceed and then perform the blocking call to accept
self.serverExplicitReady()
conn, addr = self.serv.accept()
self.cli_conn = conn
def tearDown(self):
self.cli_conn.close()
self.cli_conn = None
ThreadedTCPSocketTest.tearDown(self)
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
self.cli.connect((HOST, self.port))
self.serv_conn = self.cli
def clientTearDown(self):
self.serv_conn.close()
self.serv_conn = None
ThreadedTCPSocketTest.clientTearDown(self)
class SocketPairTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName='runTest'):
unittest.TestCase.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
self.cli = None
self.serv = None
def socketpair(self):
# To be overridden by some child classes.
return socket.socketpair()
def setUp(self):
self.serv, self.cli = self.socketpair()
def tearDown(self):
if self.serv:
self.serv.close()
self.serv = None
def clientSetUp(self):
pass
def clientTearDown(self):
if self.cli:
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
# The following classes are used by the sendmsg()/recvmsg() tests.
# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
# gives a drop-in replacement for SocketConnectedTest, but different
# address families can be used, and the attributes serv_addr and
# cli_addr will be set to the addresses of the endpoints.
class SocketTestBase(unittest.TestCase):
"""A base class for socket tests.
Subclasses must provide methods newSocket() to return a new socket
and bindSock(sock) to bind it to an unused address.
Creates a socket self.serv and sets self.serv_addr to its address.
"""
def setUp(self):
self.serv = self.newSocket()
self.addCleanup(self.close_server)
self.bindServer()
def close_server(self):
self.serv.close()
self.serv = None
def bindServer(self):
"""Bind server socket and set self.serv_addr to its address."""
self.bindSock(self.serv)
self.serv_addr = self.serv.getsockname()
class SocketListeningTestMixin(SocketTestBase):
"""Mixin to listen on the server socket."""
def setUp(self):
super().setUp()
self.serv.listen()
class ThreadedSocketTestMixin(SocketTestBase, ThreadableTest):
"""Mixin to add client socket and allow client/server tests.
Client socket is self.cli and its address is self.cli_addr. See
ThreadableTest for usage information.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
ThreadableTest.__init__(self)
def clientSetUp(self):
self.cli = self.newClientSocket()
self.bindClient()
def newClientSocket(self):
"""Return a new socket for use as client."""
return self.newSocket()
def bindClient(self):
"""Bind client socket and set self.cli_addr to its address."""
self.bindSock(self.cli)
self.cli_addr = self.cli.getsockname()
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class ConnectedStreamTestMixin(SocketListeningTestMixin,
ThreadedSocketTestMixin):
"""Mixin to allow client/server stream tests with connected client.
Server's socket representing connection to client is self.cli_conn
and client's connection to server is self.serv_conn. (Based on
SocketConnectedTest.)
"""
def setUp(self):
super().setUp()
# Indicate explicitly we're ready for the client thread to
# proceed and then perform the blocking call to accept
self.serverExplicitReady()
conn, addr = self.serv.accept()
self.cli_conn = conn
def tearDown(self):
self.cli_conn.close()
self.cli_conn = None
super().tearDown()
def clientSetUp(self):
super().clientSetUp()
self.cli.connect(self.serv_addr)
self.serv_conn = self.cli
def clientTearDown(self):
try:
self.serv_conn.close()
self.serv_conn = None
except AttributeError:
pass
super().clientTearDown()
class UnixSocketTestBase(SocketTestBase):
"""Base class for Unix-domain socket tests."""
# This class is used for file descriptor passing tests, so we
# create the sockets in a private directory so that other users
# can't send anything that might be problematic for a privileged
# user running the tests.
def bindSock(self, sock):
path = socket_helper.create_unix_domain_name()
self.addCleanup(os_helper.unlink, path)
socket_helper.bind_unix_socket(sock, path)
class UnixStreamBase(UnixSocketTestBase):
"""Base class for Unix-domain SOCK_STREAM tests."""
def newSocket(self):
return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
class InetTestBase(SocketTestBase):
"""Base class for IPv4 socket tests."""
host = HOST
def setUp(self):
super().setUp()
self.port = self.serv_addr[1]
def bindSock(self, sock):
socket_helper.bind_port(sock, host=self.host)
class TCPTestBase(InetTestBase):
"""Base class for TCP-over-IPv4 tests."""
def newSocket(self):
return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
class UDPTestBase(InetTestBase):
"""Base class for UDP-over-IPv4 tests."""
def newSocket(self):
return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
class UDPLITETestBase(InetTestBase):
"""Base class for UDPLITE-over-IPv4 tests."""
def newSocket(self):
return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
class SCTPStreamBase(InetTestBase):
"""Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
def newSocket(self):
return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
socket.IPPROTO_SCTP)
class Inet6TestBase(InetTestBase):
"""Base class for IPv6 socket tests."""
host = socket_helper.HOSTv6
class UDP6TestBase(Inet6TestBase):
"""Base class for UDP-over-IPv6 tests."""
def newSocket(self):
return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
class UDPLITE6TestBase(Inet6TestBase):
"""Base class for UDPLITE-over-IPv6 tests."""
def newSocket(self):
return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
# Test-skipping decorators for use with ThreadableTest.
def skipWithClientIf(condition, reason):
"""Skip decorated test if condition is true, add client_skip decorator.
If the decorated object is not a class, sets its attribute
"client_skip" to a decorator which will return an empty function
if the test is to be skipped, or the original function if it is
not. This can be used to avoid running the client part of a
skipped test when using ThreadableTest.
"""
def client_pass(*args, **kwargs):
pass
def skipdec(obj):
retval = unittest.skip(reason)(obj)
if not isinstance(obj, type):
retval.client_skip = lambda f: client_pass
return retval
def noskipdec(obj):
if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
obj.client_skip = lambda f: f
return obj
return skipdec if condition else noskipdec
def requireAttrs(obj, *attributes):
"""Skip decorated test if obj is missing any of the given attributes.
Sets client_skip attribute as skipWithClientIf() does.
"""
missing = [name for name in attributes if not hasattr(obj, name)]
return skipWithClientIf(
missing, "don't have " + ", ".join(name for name in missing))
def requireSocket(*args):
"""Skip decorated test if a socket cannot be created with given arguments.
When an argument is given as a string, will use the value of that
attribute of the socket module, or skip the test if it doesn't
exist. Sets client_skip attribute as skipWithClientIf() does.
"""
err = None
missing = [obj for obj in args if
isinstance(obj, str) and not hasattr(socket, obj)]
if missing:
err = "don't have " + ", ".join(name for name in missing)
else:
callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
for obj in args]
try:
s = socket.socket(*callargs)
except OSError as e:
# XXX: check errno?
err = str(e)
else:
s.close()
return skipWithClientIf(
err is not None,
"can't create socket({0}): {1}".format(
", ".join(str(o) for o in args), err))
#######################################################################
## Begin Tests
class GeneralModuleTests(unittest.TestCase):
@unittest.skipUnless(_socket is not None, 'need _socket module')
def test_socket_type(self):
self.assertTrue(gc.is_tracked(_socket.socket))
with self.assertRaisesRegex(TypeError, "immutable"):
_socket.socket.foo = 1
def test_SocketType_is_socketobject(self):
import _socket
self.assertTrue(socket.SocketType is _socket.socket)
s = socket.socket()
self.assertIsInstance(s, socket.SocketType)
s.close()
def test_repr(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
with s:
self.assertIn('fd=%i' % s.fileno(), repr(s))
self.assertIn('family=%s' % socket.AF_INET, repr(s))
self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
self.assertIn('proto=0', repr(s))
self.assertNotIn('raddr', repr(s))
s.bind(('127.0.0.1', 0))
self.assertIn('laddr', repr(s))
self.assertIn(str(s.getsockname()), repr(s))
self.assertIn('[closed]', repr(s))
self.assertNotIn('laddr', repr(s))
@unittest.skipUnless(_socket is not None, 'need _socket module')
def test_csocket_repr(self):
s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
try:
expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
% (s.fileno(), s.family, s.type, s.proto))
self.assertEqual(repr(s), expected)
finally:
s.close()
expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
% (s.family, s.type, s.proto))
self.assertEqual(repr(s), expected)
def test_weakref(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
p = proxy(s)
self.assertEqual(p.fileno(), s.fileno())
s = None
support.gc_collect() # For PyPy or other GCs.
try:
p.fileno()
except ReferenceError:
pass
else:
self.fail('Socket proxy still exists')
def testSocketError(self):
# Testing socket module exceptions
msg = "Error raising socket exception (%s)."
with self.assertRaises(OSError, msg=msg % 'OSError'):
raise OSError
with self.assertRaises(OSError, msg=msg % 'socket.herror'):
raise socket.herror
with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
raise socket.gaierror
def testSendtoErrors(self):
# Testing that sendto doesn't mask failures. See #10169.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
s.bind(('', 0))
sockname = s.getsockname()
# 2 args
with self.assertRaises(TypeError) as cm:
s.sendto('\u2620', sockname)
self.assertEqual(str(cm.exception),
"a bytes-like object is required, not 'str'")
with self.assertRaises(TypeError) as cm:
s.sendto(5j, sockname)
self.assertEqual(str(cm.exception),
"a bytes-like object is required, not 'complex'")
with self.assertRaises(TypeError) as cm:
s.sendto(b'foo', None)
self.assertIn('not NoneType',str(cm.exception))
# 3 args
with self.assertRaises(TypeError) as cm:
s.sendto('\u2620', 0, sockname)
self.assertEqual(str(cm.exception),
"a bytes-like object is required, not 'str'")
with self.assertRaises(TypeError) as cm:
s.sendto(5j, 0, sockname)
self.assertEqual(str(cm.exception),
"a bytes-like object is required, not 'complex'")
with self.assertRaises(TypeError) as cm:
s.sendto(b'foo', 0, None)
self.assertIn('not NoneType', str(cm.exception))
with self.assertRaises(TypeError) as cm:
s.sendto(b'foo', 'bar', sockname)
with self.assertRaises(TypeError) as cm:
s.sendto(b'foo', None, None)
# wrong number of args
with self.assertRaises(TypeError) as cm:
s.sendto(b'foo')
self.assertIn('(1 given)', str(cm.exception))
with self.assertRaises(TypeError) as cm:
s.sendto(b'foo', 0, sockname, 4)
self.assertIn('(4 given)', str(cm.exception))
def testCrucialConstants(self):
# Testing for mission critical constants
socket.AF_INET
if socket.has_ipv6:
socket.AF_INET6
socket.SOCK_STREAM
socket.SOCK_DGRAM
socket.SOCK_RAW
socket.SOCK_RDM
socket.SOCK_SEQPACKET
socket.SOL_SOCKET
socket.SO_REUSEADDR
def testCrucialIpProtoConstants(self):
socket.IPPROTO_TCP
socket.IPPROTO_UDP
if socket.has_ipv6:
socket.IPPROTO_IPV6
@unittest.skipUnless(os.name == "nt", "Windows specific")
def testWindowsSpecificConstants(self):
socket.IPPROTO_ICLFXBM
socket.IPPROTO_ST
socket.IPPROTO_CBT
socket.IPPROTO_IGP
socket.IPPROTO_RDP
socket.IPPROTO_PGM
socket.IPPROTO_L2TP
socket.IPPROTO_SCTP
@unittest.skipIf(support.is_wasi, "WASI is missing these methods")
def test_socket_methods(self):
# socket methods that depend on a configure HAVE_ check. They should
# be present on all platforms except WASI.
names = [
"_accept", "bind", "connect", "connect_ex", "getpeername",
"getsockname", "listen", "recvfrom", "recvfrom_into", "sendto",
"setsockopt", "shutdown"
]
for name in names:
if not hasattr(socket.socket, name):
self.fail(f"socket method {name} is missing")
@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test3542SocketOptions(self):
# Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
opts = {
'IPV6_CHECKSUM',
'IPV6_DONTFRAG',
'IPV6_DSTOPTS',
'IPV6_HOPLIMIT',
'IPV6_HOPOPTS',
'IPV6_NEXTHOP',
'IPV6_PATHMTU',
'IPV6_PKTINFO',
'IPV6_RECVDSTOPTS',
'IPV6_RECVHOPLIMIT',
'IPV6_RECVHOPOPTS',
'IPV6_RECVPATHMTU',
'IPV6_RECVPKTINFO',
'IPV6_RECVRTHDR',
'IPV6_RECVTCLASS',
'IPV6_RTHDR',
'IPV6_RTHDRDSTOPTS',
'IPV6_RTHDR_TYPE_0',
'IPV6_TCLASS',
'IPV6_USE_MIN_MTU',
}
for opt in opts:
self.assertTrue(
hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'"
)
def testHostnameRes(self):
# Testing hostname resolution mechanisms
hostname = socket.gethostname()
try:
ip = socket.gethostbyname(hostname)
except OSError:
# Probably name lookup wasn't set up right; skip this test
self.skipTest('name lookup failure')
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
except OSError:
# Probably a similar problem as above; skip this test
self.skipTest('name lookup failure')
all_host_names = [hostname, hname] + aliases
fqhn = socket.getfqdn(ip)
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
def test_host_resolution(self):
for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
self.assertEqual(socket.gethostbyname(addr), addr)
# we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
# a matching name entry (e.g. 'ip6-localhost')
for host in [socket_helper.HOSTv4]:
self.assertIn(host, socket.gethostbyaddr(host)[2])
def test_host_resolution_bad_address(self):
# These are all malformed IP addresses and expected not to resolve to
# any result. But some ISPs, e.g. AWS and AT&T, may successfully
# resolve these IPs. In particular, AT&T's DNS Error Assist service
# will break this test. See https://bugs.python.org/issue42092 for a
# workaround.
explanation = (
"resolving an invalid IP address did not raise OSError; "
"can be caused by a broken DNS server"
)
for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
'1:1:1:1:1:1:1:1:1']:
with self.assertRaises(OSError, msg=addr):
socket.gethostbyname(addr)
with self.assertRaises(OSError, msg=explanation):
socket.gethostbyaddr(addr)
@unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
@unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
def test_sethostname(self):
oldhn = socket.gethostname()
try:
socket.sethostname('new')
except OSError as e:
if e.errno == errno.EPERM:
self.skipTest("test should be run as root")
else:
raise
try:
# running test as root!
self.assertEqual(socket.gethostname(), 'new')
# Should work with bytes objects too
socket.sethostname(b'bar')
self.assertEqual(socket.gethostname(), 'bar')
finally:
socket.sethostname(oldhn)
@unittest.skipUnless(hasattr(socket, 'if_nameindex'),
'socket.if_nameindex() not available.')
@support.skip_android_selinux('if_nameindex')
def testInterfaceNameIndex(self):
interfaces = socket.if_nameindex()
for index, name in interfaces:
self.assertIsInstance(index, int)
self.assertIsInstance(name, str)
# interface indices are non-zero integers
self.assertGreater(index, 0)
_index = socket.if_nametoindex(name)
self.assertIsInstance(_index, int)
self.assertEqual(index, _index)
_name = socket.if_indextoname(index)
self.assertIsInstance(_name, str)
self.assertEqual(name, _name)
@unittest.skipUnless(hasattr(socket, 'if_indextoname'),
'socket.if_indextoname() not available.')
@support.skip_android_selinux('if_indextoname')
def testInvalidInterfaceIndexToName(self):
self.assertRaises(OSError, socket.if_indextoname, 0)
self.assertRaises(OverflowError, socket.if_indextoname, -1)
self.assertRaises(OverflowError, socket.if_indextoname, 2**1000)
self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
if hasattr(socket, 'if_nameindex'):
indices = dict(socket.if_nameindex())
for index in indices:
index2 = index + 2**32
if index2 not in indices:
with self.assertRaises((OverflowError, OSError)):
socket.if_indextoname(index2)
for index in 2**32-1, 2**64-1:
if index not in indices:
with self.assertRaises((OverflowError, OSError)):
socket.if_indextoname(index)
@unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
'socket.if_nametoindex() not available.')
@support.skip_android_selinux('if_nametoindex')
def testInvalidInterfaceNameToIndex(self):
self.assertRaises(TypeError, socket.if_nametoindex, 0)
self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except TypeError:
if sys.getrefcount(__name__) != orig:
self.fail("socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
try:
# On some versions, this crashes the interpreter.
socket.getnameinfo(('x', 0, 0, 0), 0)
except OSError:
pass
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def testNtoHErrors(self):
import _testcapi
s_good_values = [0, 1, 2, 0xffff]
l_good_values = s_good_values + [0xffffffff]
l_bad_values = [-1, -2, 1<<32, 1<<1000]
s_bad_values = (
l_bad_values +
[_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] +
[1 << 16, _testcapi.INT_MAX]
)
for k in s_good_values:
socket.ntohs(k)
socket.htons(k)
for k in l_good_values:
socket.ntohl(k)
socket.htonl(k)
for k in s_bad_values:
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htons, k)
for k in l_bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.htonl, k)
def testGetServBy(self):
eq = self.assertEqual
# Find one service that exists, then check all the related interfaces.
# I've ordered this by protocols that have both a tcp and udp
# protocol, at least for modern Linuxes.
if (
sys.platform.startswith(
('linux', 'android', 'freebsd', 'netbsd', 'gnukfreebsd'))
or is_apple
):
# avoid the 'echo' service on this platform, as there is an
# assumption breaking non-standard port/protocol entry
services = ('daytime', 'qotd', 'domain')
else:
services = ('echo', 'daytime', 'domain')
for service in services:
try:
port = socket.getservbyname(service, 'tcp')
break
except OSError:
pass
else:
raise OSError
# Try same call with optional protocol omitted
# Issue gh-71123: this fails on Android before API level 23.
if not (support.is_android and platform.android_ver().api_level < 23):
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf if it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
except OSError:
udpport = None
else:
eq(udpport, port)
# Now make sure the lookup by port returns the same service name
# Issue #26936: when the protocol is omitted, this fails on Android
# before API level 28.
if not (support.is_android and platform.android_ver().api_level < 28):
eq(socket.getservbyport(port2), service)
eq(socket.getservbyport(port, 'tcp'), service)
if udpport is not None:
eq(socket.getservbyport(udpport, 'udp'), service)
# Make sure getservbyport does not accept out of range ports.
self.assertRaises(OverflowError, socket.getservbyport, -1)
self.assertRaises(OverflowError, socket.getservbyport, 65536)
def testDefaultTimeout(self):
# Testing default timeout
# The default timeout should initially be None
self.assertEqual(socket.getdefaulttimeout(), None)
with socket.socket() as s:
self.assertEqual(s.gettimeout(), None)
# Set the default timeout to 10, and see if it propagates
with socket_setdefaulttimeout(10):
self.assertEqual(socket.getdefaulttimeout(), 10)
with socket.socket() as sock:
self.assertEqual(sock.gettimeout(), 10)
# Reset the default timeout to None, and see if it propagates
socket.setdefaulttimeout(None)
self.assertEqual(socket.getdefaulttimeout(), None)
with socket.socket() as sock:
self.assertEqual(sock.gettimeout(), None)
# Check that setting it to an invalid value raises ValueError
self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
@unittest.skipUnless(hasattr(socket, 'inet_aton'),
'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self):
# Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes.
self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv4toString(self):
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
(OSError, ValueError), func, a
)
self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
# bpo-29972: inet_pton() doesn't fail on AIX
if not AIX:
assertInvalid(f, '0.0.0.')
assertInvalid(f, '300.0.0.0')
assertInvalid(f, 'a.0.0.0')
assertInvalid(f, '1.2.3.4.5')
assertInvalid(f, '::1')
self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
assertInvalid(g, '0.0.0.')
assertInvalid(g, '300.0.0.0')
assertInvalid(g, 'a.0.0.0')
assertInvalid(g, '1.2.3.4.5')
assertInvalid(g, '::1')
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv6toString(self):
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
self.skipTest('IPv6 not available')
except ImportError:
self.skipTest('could not import needed symbols from socket')
if sys.platform == "win32":
try:
inet_pton(AF_INET6, '::')
except OSError as e:
if e.winerror == 10022:
self.skipTest('IPv6 might not be supported')
f = lambda a: inet_pton(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
(OSError, ValueError), f, a
)
self.assertEqual(b'\x00' * 16, f('::'))
self.assertEqual(b'\x00' * 16, f('0::0'))
self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
self.assertEqual(
b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
self.assertEqual(
b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
f('ad42:abc::127:0:254:2')
)
self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
assertInvalid('0x20::')
assertInvalid(':::')
assertInvalid('::0::')
assertInvalid('1::abc::')
assertInvalid('1::abc::def')
assertInvalid('1:2:3:4:5:6')
assertInvalid('1:2:3:4:5:6:')
assertInvalid('1:2:3:4:5:6:7:8:0')
# bpo-29972: inet_pton() doesn't fail on AIX
if not AIX:
assertInvalid('1:2:3:4:5:6:7:8:')
self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
f('::254.42.23.64')
)
self.assertEqual(
b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
f('42::a29b:254.42.23.64')
)
self.assertEqual(
b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
)
assertInvalid('255.254.253.252')
assertInvalid('1::260.2.3.0')
assertInvalid('1::0.be.e.0')
assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
assertInvalid('::1.2.3.4:0')
assertInvalid('0.100.200.0:3:4:5:6:7:8')
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv4(self):
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises(
(OSError, ValueError), func, a
)
self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
assertInvalid(f, b'\x00' * 3)
assertInvalid(f, b'\x00' * 5)
assertInvalid(f, b'\x00' * 16)
self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
assertInvalid(g, b'\x00' * 3)
assertInvalid(g, b'\x00' * 5)
assertInvalid(g, b'\x00' * 16)
self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv6(self):
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
self.skipTest('IPv6 not available')
except ImportError:
self.skipTest('could not import needed symbols from socket')
if sys.platform == "win32":
try:
inet_ntop(AF_INET6, b'\x00' * 16)
except OSError as e:
if e.winerror == 10022:
self.skipTest('IPv6 might not be supported')
f = lambda a: inet_ntop(AF_INET6, a)
assertInvalid = lambda a: self.assertRaises(
(OSError, ValueError), f, a
)
self.assertEqual('::', f(b'\x00' * 16))
self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
self.assertEqual(
'aef:b01:506:1001:ffff:9997:55:170',
f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
)
self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
assertInvalid(b'\x12' * 15)
assertInvalid(b'\x12' * 17)
assertInvalid(b'\x12' * 4)
# XXX The following don't test module-level functionality...
def testSockName(self):
# Testing getsockname()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
# Since find_unused_port() is inherently subject to race conditions, we
# call it a couple times if necessary.
for i in itertools.count():
port = socket_helper.find_unused_port()
try:
sock.bind(("0.0.0.0", port))
except OSError as e:
if e.errno != errno.EADDRINUSE or i == 5:
raise
else:
break
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
try:
my_ip_addr = socket.gethostbyname(socket.gethostname())
except OSError:
# Probably name lookup wasn't set up right; skip this test
self.skipTest('name lookup failure')
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
self.assertEqual(name[1], port)
def testGetSockOpt(self):
# Testing getsockopt()
# We know a socket should start without reuse==0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assertFalse(reuse != 0, "initial mode is reuse")
def testSetSockOpt(self):
# Testing setsockopt()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
self.assertFalse(reuse == 0, "failed to set reuse mode")
def testSendAfterClose(self):
# testing send() after close() with timeout
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(1)
self.assertRaises(OSError, sock.send, b"spam")
def testCloseException(self):
sock = socket.socket()
sock.bind((socket._LOCALHOST, 0))
socket.socket(fileno=sock.fileno()).close()
try:
sock.close()
except OSError as err:
# Winsock apparently raises ENOTSOCK
self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
else:
self.fail("close() should raise EBADF/ENOTSOCK")
def testNewAttributes(self):
# testing .family, .type and .protocol
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
self.assertEqual(sock.family, socket.AF_INET)
if hasattr(socket, 'SOCK_CLOEXEC'):
self.assertIn(sock.type,
(socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
socket.SOCK_STREAM))
else:
self.assertEqual(sock.type, socket.SOCK_STREAM)
self.assertEqual(sock.proto, 0)
def test_getsockaddrarg(self):
sock = socket.socket()
self.addCleanup(sock.close)
port = socket_helper.find_unused_port()
big_port = port + 65536
neg_port = port - 65536
self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
# Since find_unused_port() is inherently subject to race conditions, we
# call it a couple times if necessary.
for i in itertools.count():
port = socket_helper.find_unused_port()
try:
sock.bind((HOST, port))
except OSError as e:
if e.errno != errno.EADDRINUSE or i == 5:
raise
else:
break
@unittest.skipUnless(os.name == "nt", "Windows specific")
def test_sock_ioctl(self):
self.assertTrue(hasattr(socket.socket, 'ioctl'))
self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
self.assertTrue(hasattr(socket, 'RCVALL_ON'))
self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
s = socket.socket()
self.addCleanup(s.close)
self.assertRaises(ValueError, s.ioctl, -1, None)
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
'Loopback fast path support required for this test')
def test_sio_loopback_fast_path(self):
s = socket.socket()
self.addCleanup(s.close)
try:
s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
except OSError as exc:
WSAEOPNOTSUPP = 10045
if exc.winerror == WSAEOPNOTSUPP:
self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
"doesn't implemented in this Windows version")
raise
self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
def testGetaddrinfo(self):
try:
socket.getaddrinfo('localhost', 80)
except socket.gaierror as err:
if err.errno == socket.EAI_SERVICE:
# see http://bugs.python.org/issue1282647
self.skipTest("buggy libc version")
raise
# len of every sequence is supposed to be == 5
for info in socket.getaddrinfo(HOST, None):
self.assertEqual(len(info), 5)
# host can be a domain name, a string representation of an
# IPv4/v6 address or None
socket.getaddrinfo('localhost', 80)
socket.getaddrinfo('127.0.0.1', 80)
socket.getaddrinfo(None, 80)
if socket_helper.IPV6_ENABLED:
socket.getaddrinfo('::1', 80)
# port can be a string service name such as "http", a numeric
# port number or None
# Issue #26936: this fails on Android before API level 23.
if not (support.is_android and platform.android_ver().api_level < 23):
socket.getaddrinfo(HOST, "http")
socket.getaddrinfo(HOST, 80)
socket.getaddrinfo(HOST, None)
# test family and socktype filters
infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
for family, type, _, _, _ in infos:
self.assertEqual(family, socket.AF_INET)
self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value)
self.assertEqual(str(family), str(family.value))
self.assertEqual(type, socket.SOCK_STREAM)
self.assertEqual(repr(type), '<SocketKind.SOCK_STREAM: %r>' % type.value)
self.assertEqual(str(type), str(type.value))
infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
for _, socktype, _, _, _ in infos:
self.assertEqual(socktype, socket.SOCK_STREAM)
# test proto and flags arguments
socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
# a server willing to support both IPv4 and IPv6 will
# usually do this
socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE)
# test keyword arguments
a = socket.getaddrinfo(HOST, None)
b = socket.getaddrinfo(host=HOST, port=None)
self.assertEqual(a, b)
a = socket.getaddrinfo(HOST, None, socket.AF_INET)
b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
self.assertEqual(a, b)
a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
self.assertEqual(a, b)
a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
self.assertEqual(a, b)
a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
self.assertEqual(a, b)
a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE)
b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
type=socket.SOCK_STREAM, proto=0,
flags=socket.AI_PASSIVE)
self.assertEqual(a, b)
# Issue #6697.
self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
# Issue 17269: test workaround for OS X platform bug segfault
if hasattr(socket, 'AI_NUMERICSERV'):
try:
# The arguments here are undefined and the call may succeed
# or fail. All we care here is that it doesn't segfault.
socket.getaddrinfo("localhost", None, 0, 0, 0,
socket.AI_NUMERICSERV)
except socket.gaierror:
pass
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def test_getaddrinfo_int_port_overflow(self):
# gh-74895: Test that getaddrinfo does not raise OverflowError on port.
#
# POSIX getaddrinfo() never specify the valid range for "service"
# decimal port number values. For IPv4 and IPv6 they are technically
# unsigned 16-bit values, but the API is protocol agnostic. Which values
# trigger an error from the C library function varies by platform as
# they do not all perform validation.
# The key here is that we don't want to produce OverflowError as Python
# prior to 3.12 did for ints outside of a [LONG_MIN, LONG_MAX] range.
# Leave the error up to the underlying string based platform C API.
from _testcapi import ULONG_MAX, LONG_MAX, LONG_MIN
try:
socket.getaddrinfo(None, ULONG_MAX + 1, type=socket.SOCK_STREAM)
except OverflowError:
# Platforms differ as to what values constitute a getaddrinfo() error
# return. Some fail for LONG_MAX+1, others ULONG_MAX+1, and Windows
# silently accepts such huge "port" aka "service" numeric values.
self.fail("Either no error or socket.gaierror expected.")
except socket.gaierror:
pass
try:
socket.getaddrinfo(None, LONG_MAX + 1, type=socket.SOCK_STREAM)
except OverflowError:
self.fail("Either no error or socket.gaierror expected.")
except socket.gaierror:
pass
try:
socket.getaddrinfo(None, LONG_MAX - 0xffff + 1, type=socket.SOCK_STREAM)
except OverflowError:
self.fail("Either no error or socket.gaierror expected.")
except socket.gaierror:
pass
try:
socket.getaddrinfo(None, LONG_MIN - 1, type=socket.SOCK_STREAM)
except OverflowError:
self.fail("Either no error or socket.gaierror expected.")
except socket.gaierror:
pass
socket.getaddrinfo(None, 0, type=socket.SOCK_STREAM) # No error expected.
socket.getaddrinfo(None, 0xffff, type=socket.SOCK_STREAM) # No error expected.
def test_getnameinfo(self):
# only IP addresses are allowed
self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
@unittest.skipUnless(support.is_resource_enabled('network'),
'network is not enabled')
def test_idna(self):
# Check for internet access before running test
# (issue #12804, issue #25138).
with socket_helper.transient_internet('python.org'):
socket.gethostbyname('python.org')
# these should all be successful
domain = 'испытание.pythontest.net'
socket.gethostbyname(domain)
socket.gethostbyname_ex(domain)
socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
# this may not work if the forward lookup chooses the IPv6 address, as that doesn't
# have a reverse entry yet
# socket.gethostbyaddr('испытание.python.org')
def check_sendall_interrupted(self, with_timeout):
# socketpair() is not strictly required, but it makes things easier.
if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
self.skipTest("signal.alarm and socket.socketpair required for this test")
# Our signal handlers clobber the C errno by calling a math function
# with an invalid domain value.
def ok_handler(*args):
self.assertRaises(ValueError, math.acosh, 0)
def raising_handler(*args):
self.assertRaises(ValueError, math.acosh, 0)
1 // 0
c, s = socket.socketpair()
old_alarm = signal.signal(signal.SIGALRM, raising_handler)
try:
if with_timeout:
# Just above the one second minimum for signal.alarm
c.settimeout(1.5)
with self.assertRaises(ZeroDivisionError):
signal.alarm(1)
c.sendall(b"x" * support.SOCK_MAX_SIZE)
if with_timeout:
signal.signal(signal.SIGALRM, ok_handler)
signal.alarm(1)
self.assertRaises(TimeoutError, c.sendall,
b"x" * support.SOCK_MAX_SIZE)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_alarm)
c.close()
s.close()
def test_sendall_interrupted(self):
self.check_sendall_interrupted(False)
def test_sendall_interrupted_with_timeout(self):
self.check_sendall_interrupted(True)
def test_dealloc_warn(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
r = repr(sock)
with self.assertWarns(ResourceWarning) as cm:
sock = None
support.gc_collect()
self.assertIn(r, str(cm.warning.args[0]))
# An open socket file object gets dereferenced after the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
f = sock.makefile('rb')
r = repr(sock)
sock = None
support.gc_collect()
with self.assertWarns(ResourceWarning):
f = None
support.gc_collect()
def test_name_closed_socketio(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
fp = sock.makefile("rb")
fp.close()
self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
def test_unusable_closed_socketio(self):
with socket.socket() as sock:
fp = sock.makefile("rb", buffering=0)
self.assertTrue(fp.readable())
self.assertFalse(fp.writable())
self.assertFalse(fp.seekable())
fp.close()
self.assertRaises(ValueError, fp.readable)
self.assertRaises(ValueError, fp.writable)
self.assertRaises(ValueError, fp.seekable)
def test_socket_close(self):
sock = socket.socket()
try:
sock.bind((HOST, 0))
socket.close(sock.fileno())
with self.assertRaises(OSError):
sock.listen(1)
finally:
with self.assertRaises(OSError):
# sock.close() fails with EBADF
sock.close()
with self.assertRaises(TypeError):
socket.close(None)
with self.assertRaises(OSError):
socket.close(-1)
def test_makefile_mode(self):
for mode in 'r', 'rb', 'rw', 'w', 'wb':
with self.subTest(mode=mode):
with socket.socket() as sock:
encoding = None if "b" in mode else "utf-8"
with sock.makefile(mode, encoding=encoding) as fp:
self.assertEqual(fp.mode, mode)
def test_makefile_invalid_mode(self):
for mode in 'rt', 'x', '+', 'a':
with self.subTest(mode=mode):
with socket.socket() as sock:
with self.assertRaisesRegex(ValueError, 'invalid mode'):
sock.makefile(mode)
def test_pickle(self):
sock = socket.socket()
with sock:
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
self.assertRaises(TypeError, pickle.dumps, sock, protocol)
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
self.assertEqual(family, socket.AF_INET)
type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
self.assertEqual(type, socket.SOCK_STREAM)
def test_listen_backlog(self):
for backlog in 0, -1:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
srv.bind((HOST, 0))
srv.listen(backlog)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
srv.bind((HOST, 0))
srv.listen()
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def test_listen_backlog_overflow(self):
# Issue 15989
import _testcapi
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
srv.bind((HOST, 0))
self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self):
self.assertRaises(OverflowError, socket.getnameinfo,
(socket_helper.HOSTv6, 0, 0xffffffff), 0)
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_getaddrinfo_ipv6_basic(self):
((*_, sockaddr),) = socket.getaddrinfo(
'ff02::1de:c0:face:8D', # Note capital letter `D`.
1234, socket.AF_INET6,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP
)
self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
def test_getfqdn_filter_localhost(self):
self.assertEqual(socket.getfqdn(), socket.getfqdn("0.0.0.0"))
self.assertEqual(socket.getfqdn(), socket.getfqdn("::"))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
@support.skip_android_selinux('if_nameindex')
def test_getaddrinfo_ipv6_scopeid_symbolic(self):
# Just pick up any network interface (Linux, Mac OS X)
(ifindex, test_interface) = socket.if_nameindex()[0]
((*_, sockaddr),) = socket.getaddrinfo(
'ff02::1de:c0:face:8D%' + test_interface,
1234, socket.AF_INET6,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP
)
# Note missing interface name part in IPv6 address
self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(
sys.platform == 'win32',
'Numeric scope id does not work or undocumented')
def test_getaddrinfo_ipv6_scopeid_numeric(self):
# Also works on Linux and Mac OS X, but is not documented (?)
# Windows, Linux and Max OS X allow nonexistent interface numbers here.
ifindex = 42
((*_, sockaddr),) = socket.getaddrinfo(
'ff02::1de:c0:face:8D%' + str(ifindex),
1234, socket.AF_INET6,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP
)
# Note missing interface name part in IPv6 address
self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
@support.skip_android_selinux('if_nameindex')
def test_getnameinfo_ipv6_scopeid_symbolic(self):
# Just pick up any network interface.
(ifindex, test_interface) = socket.if_nameindex()[0]
sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`.
nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless( sys.platform == 'win32',
'Numeric scope id does not work or undocumented')
def test_getnameinfo_ipv6_scopeid_numeric(self):
# Also works on Linux (undocumented), but does not work on Mac OS X
# Windows and Linux allow nonexistent interface numbers here.
ifindex = 42
sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`.
nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))
def test_str_for_enums(self):
# Make sure that the AF_* and SOCK_* constants have enum-like string
# reprs.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
self.assertEqual(repr(s.family), '<AddressFamily.AF_INET: %r>' % s.family.value)
self.assertEqual(repr(s.type), '<SocketKind.SOCK_STREAM: %r>' % s.type.value)
self.assertEqual(str(s.family), str(s.family.value))
self.assertEqual(str(s.type), str(s.type.value))
def test_socket_consistent_sock_type(self):
SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
with socket.socket(socket.AF_INET, sock_type) as s:
self.assertEqual(s.type, socket.SOCK_STREAM)
s.settimeout(1)
self.assertEqual(s.type, socket.SOCK_STREAM)
s.settimeout(0)
self.assertEqual(s.type, socket.SOCK_STREAM)
s.setblocking(True)
self.assertEqual(s.type, socket.SOCK_STREAM)
s.setblocking(False)
self.assertEqual(s.type, socket.SOCK_STREAM)
def test_unknown_socket_family_repr(self):
# Test that when created with a family that's not one of the known
# AF_*/SOCK_* constants, socket.family just returns the number.
#
# To do this we fool socket.socket into believing it already has an
# open fd because on this path it doesn't actually verify the family and
# type and populates the socket object.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
fd = sock.detach()
unknown_family = max(socket.AddressFamily.__members__.values()) + 1
unknown_type = max(
kind
for name, kind in socket.SocketKind.__members__.items()
if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
) + 1
with socket.socket(
family=unknown_family, type=unknown_type, proto=23,
fileno=fd) as s:
self.assertEqual(s.family, unknown_family)
self.assertEqual(s.type, unknown_type)
# some OS like macOS ignore proto
self.assertIn(s.proto, {0, 23})
@unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
def test__sendfile_use_sendfile(self):
class File:
def __init__(self, fd):
self.fd = fd
def fileno(self):
return self.fd
with socket.socket() as sock:
fd = os.open(os.curdir, os.O_RDONLY)
os.close(fd)
with self.assertRaises(socket._GiveupOnSendfile):
sock._sendfile_use_sendfile(File(fd))
with self.assertRaises(OverflowError):
sock._sendfile_use_sendfile(File(2**1000))
with self.assertRaises(TypeError):
sock._sendfile_use_sendfile(File(None))
def _test_socket_fileno(self, s, family, stype):
self.assertEqual(s.family, family)
self.assertEqual(s.type, stype)
fd = s.fileno()
s2 = socket.socket(fileno=fd)
self.addCleanup(s2.close)
# detach old fd to avoid double close
s.detach()
self.assertEqual(s2.family, family)
self.assertEqual(s2.type, stype)
self.assertEqual(s2.fileno(), fd)
def test_socket_fileno(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind((socket_helper.HOST, 0))
self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
if hasattr(socket, "SOCK_DGRAM"):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
s.bind((socket_helper.HOST, 0))
self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
if socket_helper.IPV6_ENABLED:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind((socket_helper.HOSTv6, 0, 0, 0))
self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
if hasattr(socket, "AF_UNIX"):
unix_name = socket_helper.create_unix_domain_name()
self.addCleanup(os_helper.unlink, unix_name)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
with s:
try:
s.bind(unix_name)
except PermissionError:
pass
else:
self._test_socket_fileno(s, socket.AF_UNIX,
socket.SOCK_STREAM)
def test_socket_fileno_rejects_float(self):
with self.assertRaises(TypeError):
socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
def test_socket_fileno_rejects_other_types(self):
with self.assertRaises(TypeError):
socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
def test_socket_fileno_rejects_invalid_socket(self):
with self.assertRaisesRegex(ValueError, "negative file descriptor"):
socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
@unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
def test_socket_fileno_rejects_negative(self):
with self.assertRaisesRegex(ValueError, "negative file descriptor"):
socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
def test_socket_fileno_requires_valid_fd(self):
WSAENOTSOCK = 10038
with self.assertRaises(OSError) as cm:
socket.socket(fileno=os_helper.make_bad_fd())
self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
with self.assertRaises(OSError) as cm:
socket.socket(
socket.AF_INET,
socket.SOCK_STREAM,
fileno=os_helper.make_bad_fd())
self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
def test_socket_fileno_requires_socket_fd(self):
with tempfile.NamedTemporaryFile() as afile:
with self.assertRaises(OSError):
socket.socket(fileno=afile.fileno())
with self.assertRaises(OSError) as cm:
socket.socket(
socket.AF_INET,
socket.SOCK_STREAM,
fileno=afile.fileno())
self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
def test_addressfamily_enum(self):
import _socket, enum
CheckedAddressFamily = enum._old_convert_(
enum.IntEnum, 'AddressFamily', 'socket',
lambda C: C.isupper() and C.startswith('AF_'),
source=_socket,
)
enum._test_simple_enum(CheckedAddressFamily, socket.AddressFamily)
def test_socketkind_enum(self):
import _socket, enum
CheckedSocketKind = enum._old_convert_(
enum.IntEnum, 'SocketKind', 'socket',
lambda C: C.isupper() and C.startswith('SOCK_'),
source=_socket,
)
enum._test_simple_enum(CheckedSocketKind, socket.SocketKind)
def test_msgflag_enum(self):
import _socket, enum
CheckedMsgFlag = enum._old_convert_(
enum.IntFlag, 'MsgFlag', 'socket',
lambda C: C.isupper() and C.startswith('MSG_'),
source=_socket,
)
enum._test_simple_enum(CheckedMsgFlag, socket.MsgFlag)
def test_addressinfo_enum(self):
import _socket, enum
CheckedAddressInfo = enum._old_convert_(
enum.IntFlag, 'AddressInfo', 'socket',
lambda C: C.isupper() and C.startswith('AI_'),
source=_socket)
enum._test_simple_enum(CheckedAddressInfo, socket.AddressInfo)
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):
def testCrucialConstants(self):
socket.AF_CAN
socket.PF_CAN
socket.CAN_RAW
@unittest.skipUnless(hasattr(socket, "CAN_BCM"),
'socket.CAN_BCM required for this test.')
def testBCMConstants(self):
socket.CAN_BCM
# opcodes
socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task
socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task
socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task
socket.CAN_BCM_TX_SEND # send one CAN frame
socket.CAN_BCM_RX_SETUP # create RX content filter subscription
socket.CAN_BCM_RX_DELETE # remove RX content filter subscription
socket.CAN_BCM_RX_READ # read properties of RX content filter subscription
socket.CAN_BCM_TX_STATUS # reply to TX_READ request
socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0)
socket.CAN_BCM_RX_STATUS # reply to RX_READ request
socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent
socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change)
# flags
socket.CAN_BCM_SETTIMER
socket.CAN_BCM_STARTTIMER
socket.CAN_BCM_TX_COUNTEVT
socket.CAN_BCM_TX_ANNOUNCE
socket.CAN_BCM_TX_CP_CAN_ID
socket.CAN_BCM_RX_FILTER_ID
socket.CAN_BCM_RX_CHECK_DLC
socket.CAN_BCM_RX_NO_AUTOTIMER
socket.CAN_BCM_RX_ANNOUNCE_RESUME
socket.CAN_BCM_TX_RESET_MULTI_IDX
socket.CAN_BCM_RX_RTR_FRAME
def testCreateSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
pass
@unittest.skipUnless(hasattr(socket, "CAN_BCM"),
'socket.CAN_BCM required for this test.')
def testCreateBCMSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
pass
def testBindAny(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
address = ('', )
s.bind(address)
self.assertEqual(s.getsockname(), address)
def testTooLongInterfaceName(self):
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
self.assertRaisesRegex(OSError, 'interface name too long',
s.bind, ('x' * 1024,))
@unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
'socket.CAN_RAW_LOOPBACK required for this test.')
def testLoopback(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
for loopback in (0, 1):
s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
loopback)
self.assertEqual(loopback,
s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
@unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
'socket.CAN_RAW_FILTER required for this test.')
def testFilter(self):
can_id, can_mask = 0x200, 0x700
can_filter = struct.pack("=II", can_id, can_mask)
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
self.assertEqual(can_filter,
s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter))
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class CANTest(ThreadedCANSocketTest):
def __init__(self, methodName='runTest'):
ThreadedCANSocketTest.__init__(self, methodName=methodName)
@classmethod
def build_can_frame(cls, can_id, data):
"""Build a CAN frame."""
can_dlc = len(data)
data = data.ljust(8, b'\x00')
return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
@classmethod
def dissect_can_frame(cls, frame):
"""Dissect a CAN frame."""
can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
return (can_id, can_dlc, data[:can_dlc])
def testSendFrame(self):
cf, addr = self.s.recvfrom(self.bufsize)
self.assertEqual(self.cf, cf)
self.assertEqual(addr[0], self.interface)
def _testSendFrame(self):
self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
self.cli.send(self.cf)
def testSendMaxFrame(self):
cf, addr = self.s.recvfrom(self.bufsize)
self.assertEqual(self.cf, cf)
def _testSendMaxFrame(self):
self.cf = self.build_can_frame(0x00, b'\x07' * 8)
self.cli.send(self.cf)
def testSendMultiFrames(self):
cf, addr = self.s.recvfrom(self.bufsize)
self.assertEqual(self.cf1, cf)
cf, addr = self.s.recvfrom(self.bufsize)
self.assertEqual(self.cf2, cf)
def _testSendMultiFrames(self):
self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
self.cli.send(self.cf1)
self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
self.cli.send(self.cf2)
@unittest.skipUnless(hasattr(socket, "CAN_BCM"),
'socket.CAN_BCM required for this test.')
def _testBCM(self):
cf, addr = self.cli.recvfrom(self.bufsize)
self.assertEqual(self.cf, cf)
can_id, can_dlc, data = self.dissect_can_frame(cf)
self.assertEqual(self.can_id, can_id)
self.assertEqual(self.data, data)
@unittest.skipUnless(hasattr(socket, "CAN_BCM"),
'socket.CAN_BCM required for this test.')
def testBCM(self):
bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
self.addCleanup(bcm.close)
bcm.connect((self.interface,))
self.can_id = 0x123
self.data = bytes([0xc0, 0xff, 0xee])
self.cf = self.build_can_frame(self.can_id, self.data)
opcode = socket.CAN_BCM_TX_SEND
flags = 0
count = 0
ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
bcm_can_id = 0x0222
nframes = 1
assert len(self.cf) == 16
header = struct.pack(self.bcm_cmd_msg_fmt,
opcode,
flags,
count,
ival1_seconds,
ival1_usec,
ival2_seconds,
ival2_usec,
bcm_can_id,
nframes,
)
header_plus_frame = header + self.cf
bytes_sent = bcm.send(header_plus_frame)
self.assertEqual(bytes_sent, len(header_plus_frame))
@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
class ISOTPTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.interface = "vcan0"
def testCrucialConstants(self):
socket.AF_CAN
socket.PF_CAN
socket.CAN_ISOTP
socket.SOCK_DGRAM
def testCreateSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
pass
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
'socket.CAN_ISOTP required for this test.')
def testCreateISOTPSocket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
pass
def testTooLongInterfaceName(self):
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
with self.assertRaisesRegex(OSError, 'interface name too long'):
s.bind(('x' * 1024, 1, 2))
def testBind(self):
try:
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
addr = self.interface, 0x123, 0x456
s.bind(addr)
self.assertEqual(s.getsockname(), addr)
except OSError as e:
if e.errno == errno.ENODEV:
self.skipTest('network interface `%s` does not exist' %
self.interface)
else:
raise
@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
class J1939Test(unittest.TestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.interface = "vcan0"
@unittest.skipUnless(hasattr(socket, "CAN_J1939"),
'socket.CAN_J1939 required for this test.')
def testJ1939Constants(self):
socket.CAN_J1939
socket.J1939_MAX_UNICAST_ADDR
socket.J1939_IDLE_ADDR
socket.J1939_NO_ADDR
socket.J1939_NO_NAME
socket.J1939_PGN_REQUEST
socket.J1939_PGN_ADDRESS_CLAIMED
socket.J1939_PGN_ADDRESS_COMMANDED
socket.J1939_PGN_PDU1_MAX
socket.J1939_PGN_MAX
socket.J1939_NO_PGN
# J1939 socket options
socket.SO_J1939_FILTER
socket.SO_J1939_PROMISC
socket.SO_J1939_SEND_PRIO
socket.SO_J1939_ERRQUEUE
socket.SCM_J1939_DEST_ADDR
socket.SCM_J1939_DEST_NAME
socket.SCM_J1939_PRIO
socket.SCM_J1939_ERRQUEUE
socket.J1939_NLA_PAD
socket.J1939_NLA_BYTES_ACKED
socket.J1939_EE_INFO_NONE
socket.J1939_EE_INFO_TX_ABORT
socket.J1939_FILTER_MAX
@unittest.skipUnless(hasattr(socket, "CAN_J1939"),
'socket.CAN_J1939 required for this test.')
def testCreateJ1939Socket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
pass
def testBind(self):
try:
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR
s.bind(addr)
self.assertEqual(s.getsockname(), addr)
except OSError as e:
if e.errno == errno.ENODEV:
self.skipTest('network interface `%s` does not exist' %
self.interface)
else:
raise
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
def testCrucialConstants(self):
socket.AF_RDS
socket.PF_RDS
def testCreateSocket(self):
with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
pass
def testSocketBufferSize(self):
bufsize = 16384
with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class RDSTest(ThreadedRDSSocketTest):
def __init__(self, methodName='runTest'):
ThreadedRDSSocketTest.__init__(self, methodName=methodName)
def setUp(self):
super().setUp()
self.evt = threading.Event()
def testSendAndRecv(self):
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data, data)
self.assertEqual(self.cli_addr, addr)
def _testSendAndRecv(self):
self.data = b'spam'
self.cli.sendto(self.data, 0, (HOST, self.port))
def testPeek(self):
data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
self.assertEqual(self.data, data)
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data, data)
def _testPeek(self):
self.data = b'spam'
self.cli.sendto(self.data, 0, (HOST, self.port))
@requireAttrs(socket.socket, 'recvmsg')
def testSendAndRecvMsg(self):
data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
self.assertEqual(self.data, data)
@requireAttrs(socket.socket, 'sendmsg')
def _testSendAndRecvMsg(self):
self.data = b'hello ' * 10
self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
def testSendAndRecvMulti(self):
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data1, data)
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data2, data)
def _testSendAndRecvMulti(self):
self.data1 = b'bacon'
self.cli.sendto(self.data1, 0, (HOST, self.port))
self.data2 = b'egg'
self.cli.sendto(self.data2, 0, (HOST, self.port))
def testSelect(self):
r, w, x = select.select([self.serv], [], [], 3.0)
self.assertIn(self.serv, r)
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data, data)
def _testSelect(self):
self.data = b'select'
self.cli.sendto(self.data, 0, (HOST, self.port))
@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
'QIPCRTR sockets required for this test.')
class BasicQIPCRTRTest(unittest.TestCase):
def testCrucialConstants(self):
socket.AF_QIPCRTR
def testCreateSocket(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
pass
def testUnbound(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
self.assertEqual(s.getsockname()[1], 0)
def testBindSock(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
socket_helper.bind_port(s, host=s.getsockname()[0])
self.assertNotEqual(s.getsockname()[1], 0)
def testInvalidBindSock(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
self.assertRaises(OSError, socket_helper.bind_port, s, host=-2)
def testAutoBindSock(self):
with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
s.connect((123, 123))
self.assertNotEqual(s.getsockname()[1], 0)
@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
'VSOCK sockets required for this test.')
class BasicVSOCKTest(unittest.TestCase):
def testCrucialConstants(self):
socket.AF_VSOCK
def testVSOCKConstants(self):
socket.SO_VM_SOCKETS_BUFFER_SIZE
socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE
socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE
socket.VMADDR_CID_ANY
socket.VMADDR_PORT_ANY
socket.VMADDR_CID_LOCAL
socket.VMADDR_CID_HOST
socket.VM_SOCKETS_INVALID_VERSION
socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID
def testCreateSocket(self):
with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
pass
def testSocketBufferSize(self):
with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
orig_max = s.getsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)
orig = s.getsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_SIZE)
orig_min = s.getsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)
s.setsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2)
s.setsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2)
s.setsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2)
self.assertEqual(orig_max * 2,
s.getsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE))
self.assertEqual(orig * 2,
s.getsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_SIZE))
self.assertEqual(orig_min * 2,
s.getsockopt(socket.AF_VSOCK,
socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE))
@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
'Bluetooth sockets required for this test.')
class BasicBluetoothTest(unittest.TestCase):
def testBluetoothConstants(self):
socket.BDADDR_ANY
socket.BDADDR_LOCAL
socket.AF_BLUETOOTH
socket.BTPROTO_RFCOMM
if sys.platform != "win32":
socket.BTPROTO_HCI
socket.SOL_HCI
socket.BTPROTO_L2CAP
if not sys.platform.startswith("freebsd"):
socket.BTPROTO_SCO
def testCreateRfcommSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
pass
@unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
def testCreateL2capSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s:
pass
@unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
def testCreateHciSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
pass
@unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
"windows and freebsd do not support SCO sockets")
def testCreateScoSocket(self):
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
pass
@unittest.skipUnless(HAVE_SOCKET_HYPERV,
'Hyper-V sockets required for this test.')
class BasicHyperVTest(unittest.TestCase):
def testHyperVConstants(self):
socket.HVSOCKET_CONNECT_TIMEOUT
socket.HVSOCKET_CONNECT_TIMEOUT_MAX
socket.HVSOCKET_CONNECTED_SUSPEND
socket.HVSOCKET_ADDRESS_FLAG_PASSTHRU
socket.HV_GUID_ZERO
socket.HV_GUID_WILDCARD
socket.HV_GUID_BROADCAST
socket.HV_GUID_CHILDREN
socket.HV_GUID_LOOPBACK
socket.HV_GUID_PARENT
def testCreateHyperVSocketWithUnknownProtoFailure(self):
expected = r"\[WinError 10041\]"
with self.assertRaisesRegex(OSError, expected):
socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM)
def testCreateHyperVSocketAddrNotTupleFailure(self):
expected = "connect(): AF_HYPERV address must be tuple, not str"
with socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) as s:
with self.assertRaisesRegex(TypeError, re.escape(expected)):
s.connect(socket.HV_GUID_ZERO)
def testCreateHyperVSocketAddrNotTupleOf2StrsFailure(self):
expected = "AF_HYPERV address must be a str tuple (vm_id, service_id)"
with socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) as s:
with self.assertRaisesRegex(TypeError, re.escape(expected)):
s.connect((socket.HV_GUID_ZERO,))
def testCreateHyperVSocketAddrNotTupleOfStrsFailure(self):
expected = "AF_HYPERV address must be a str tuple (vm_id, service_id)"
with socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) as s:
with self.assertRaisesRegex(TypeError, re.escape(expected)):
s.connect((1, 2))
def testCreateHyperVSocketAddrVmIdNotValidUUIDFailure(self):
expected = "connect(): AF_HYPERV address vm_id is not a valid UUID string"
with socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) as s:
with self.assertRaisesRegex(ValueError, re.escape(expected)):
s.connect(("00", socket.HV_GUID_ZERO))
def testCreateHyperVSocketAddrServiceIdNotValidUUIDFailure(self):
expected = "connect(): AF_HYPERV address service_id is not a valid UUID string"
with socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) as s:
with self.assertRaisesRegex(ValueError, re.escape(expected)):
s.connect((socket.HV_GUID_ZERO, "00"))
class BasicTCPTest(SocketConnectedTest):
def __init__(self, methodName='runTest'):
SocketConnectedTest.__init__(self, methodName=methodName)
def testRecv(self):
# Testing large receive over TCP
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
def _testRecv(self):
self.serv_conn.send(MSG)
def testOverFlowRecv(self):
# Testing receive in chunks over TCP
seg1 = self.cli_conn.recv(len(MSG) - 3)
seg2 = self.cli_conn.recv(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def _testOverFlowRecv(self):
self.serv_conn.send(MSG)
def testRecvFrom(self):
# Testing large recvfrom() over TCP
msg, addr = self.cli_conn.recvfrom(1024)
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
self.serv_conn.send(MSG)
def testOverFlowRecvFrom(self):
# Testing recvfrom() in chunks over TCP
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
seg2, addr = self.cli_conn.recvfrom(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def _testOverFlowRecvFrom(self):
self.serv_conn.send(MSG)
def testSendAll(self):
# Testing sendall() with a 2048 byte string over TCP
msg = b''
while 1:
read = self.cli_conn.recv(1024)
if not read:
break
msg += read
self.assertEqual(msg, b'f' * 2048)
def _testSendAll(self):
big_chunk = b'f' * 2048
self.serv_conn.sendall(big_chunk)
def testFromFd(self):
# Testing fromfd()
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
self.assertIsInstance(sock, socket.socket)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def _testFromFd(self):
self.serv_conn.send(MSG)
def testDup(self):
# Testing dup()
sock = self.cli_conn.dup()
self.addCleanup(sock.close)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def _testDup(self):
self.serv_conn.send(MSG)
def check_shutdown(self):
# Test shutdown() helper
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
# wait for _testShutdown[_overflow] to finish: on OS X, when the server
# closes the connection the client also becomes disconnected,
# and the client's shutdown call will fail. (Issue #4397.)
self.done.wait()
def testShutdown(self):
self.check_shutdown()
def _testShutdown(self):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def testShutdown_overflow(self):
self.check_shutdown()
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def _testShutdown_overflow(self):
import _testcapi
self.serv_conn