| import unittest |
| from test import support |
| from test.support import ( |
| is_apple, os_helper, refleak_helper, socket_helper, threading_helper |
| ) |
| import _thread as thread |
| import array |
| import contextlib |
| import errno |
| import gc |
| import io |
| import itertools |
| import math |
| import os |
| import pickle |
| import platform |
| import queue |
| import random |
| import re |
| import select |
| import signal |
| import socket |
| import string |
| import struct |
| import sys |
| import tempfile |
| import threading |
| import time |
| import traceback |
| from weakref import proxy |
| try: |
| import multiprocessing |
| except ImportError: |
| multiprocessing = False |
| try: |
| import fcntl |
| except ImportError: |
| fcntl = None |
| try: |
| import _testcapi |
| except ImportError: |
| _testcapi = None |
| |
# Module-level requirement check from test.support (presumably skips the
# whole module when sockets do not work -- confirm against test.support).
support.requires_working_socket(module=True)

# Host used by the tests, taken from the shared socket helper.
HOST = socket_helper.HOST
# Test payload: contains a non-ASCII character and ends with CR/LF.
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')

# Port number used by the VSOCK tests.
VSOCKPORT = 1234
# Platform flags derived from the `platform` module.
AIX = (platform.system() == "AIX")
WSL = ("microsoft-standard-WSL" in platform.release())
| |
| try: |
| import _socket |
| except ImportError: |
| _socket = None |
| |
def skipForRefleakHuntinIf(condition, issueref):
    """Return a decorator that skips a test while hunting for refleaks.

    When *condition* is false the decorated function is returned unchanged,
    with a pass-through ``client_skip`` attribute attached.

    When *condition* is true the decorated function is wrapped so that it
    raises ``unittest.SkipTest`` (mentioning *issueref*) whenever
    ``refleak_helper.hunting_for_refleaks()`` is true.  The wrapper's
    ``client_skip`` attribute is a decorator for the client half of the
    test; it makes the client return immediately in the same situation.
    """
    # functools is the documented home of wraps(); contextlib.wraps only
    # exists because contextlib happens to import it for its own use.
    from functools import wraps

    if not condition:
        def decorator(f):
            f.client_skip = lambda f: f
            return f

        return decorator

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwds):
            if refleak_helper.hunting_for_refleaks():
                raise unittest.SkipTest(f"ignore while hunting for refleaks, see {issueref}")

            return f(*args, **kwds)

        def client_skip(f):
            @wraps(f)
            def wrapper(*args, **kwds):
                if refleak_helper.hunting_for_refleaks():
                    # Nothing to do on the client side: the server half is
                    # skipped by the outer wrapper.
                    return

                return f(*args, **kwds)

            return wrapper

        wrapper.client_skip = client_skip
        return wrapper

    return decorator
| |
def get_cid():
    """Return the local VSOCK context ID, or None if it cannot be obtained.

    None is returned when the fcntl module is unavailable, when the socket
    module lacks IOCTL_VM_SOCKETS_GET_LOCAL_CID, or when opening or querying
    /dev/vsock raises OSError.
    """
    if fcntl is None:
        return None
    if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
        return None
    # The ioctl result is unpacked as one native unsigned int below, so the
    # buffer handed to ioctl() must be exactly that large; a shorter buffer
    # would make struct.unpack() raise struct.error instead of returning.
    buf = bytes(struct.calcsize("I"))
    try:
        with open("/dev/vsock", "rb") as f:
            r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, buf)
    except OSError:
        return None
    return struct.unpack("I", r)[0]
| |
| def _have_socket_can(): |
| """Check whether CAN sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| def _have_socket_can_isotp(): |
| """Check whether CAN ISOTP sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| def _have_socket_can_j1939(): |
| """Check whether CAN J1939 sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| def _have_socket_rds(): |
| """Check whether RDS sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| def _have_socket_alg(): |
| """Check whether AF_ALG sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| def _have_socket_qipcrtr(): |
| """Check whether AF_QIPCRTR sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
def _have_socket_vsock():
    """Report whether AF_VSOCK is usable, i.e. get_cid() yields a CID."""
    return get_cid() is not None
| |
| |
| def _have_socket_bluetooth(): |
| """Check whether AF_BLUETOOTH sockets are supported on this host.""" |
| try: |
| # RFCOMM is supported by all platforms with bluetooth support. Windows |
| # does not support omitting the protocol. |
| s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| |
| def _have_socket_hyperv(): |
| """Check whether AF_HYPERV sockets are supported on this host.""" |
| try: |
| s = socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) |
| except (AttributeError, OSError): |
| return False |
| else: |
| s.close() |
| return True |
| |
| |
@contextlib.contextmanager
def socket_setdefaulttimeout(timeout):
    """Context manager installing *timeout* as the default socket timeout.

    The default that was in effect on entry is restored on exit, including
    when the body (or setting the new timeout) raises.
    """
    saved = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(timeout)
        yield
    finally:
        socket.setdefaulttimeout(saved)
| |
| |
# Probe each optional socket family once, at import time.  The resulting
# flags are consulted by skip decorators further down in this file.
HAVE_SOCKET_CAN = _have_socket_can()
HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()
HAVE_SOCKET_RDS = _have_socket_rds()
HAVE_SOCKET_ALG = _have_socket_alg()
HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()
HAVE_SOCKET_VSOCK = _have_socket_vsock()

# Older Android versions block UDPLITE with SELinux.
HAVE_SOCKET_UDPLITE = (
    hasattr(socket, "IPPROTO_UDPLITE")
    and (not support.is_android
         or platform.android_ver().api_level >= 29))

HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
HAVE_SOCKET_HYPERV = _have_socket_hyperv()

# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
| |
class SocketTCPTest(unittest.TestCase):
    """Fixture providing a listening IPv4 TCP server socket.

    setUp() stores the socket in self.serv and the port returned by
    socket_helper.bind_port() in self.port; tearDown() closes the socket.
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.port = socket_helper.bind_port(self.serv)
            self.serv.listen()
        except BaseException:
            # unittest does not run tearDown() when setUp() raises, so the
            # socket has to be released here to avoid leaking it.
            self.serv.close()
            self.serv = None
            raise

    def tearDown(self):
        self.serv.close()
        self.serv = None
| |
class SocketUDPTest(unittest.TestCase):
    """Fixture providing a bound IPv4 UDP server socket.

    setUp() stores the socket in self.serv and the port returned by
    socket_helper.bind_port() in self.port; tearDown() closes the socket.
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.port = socket_helper.bind_port(self.serv)
        except BaseException:
            # unittest does not run tearDown() when setUp() raises, so the
            # socket has to be released here to avoid leaking it.
            self.serv.close()
            self.serv = None
            raise

    def tearDown(self):
        self.serv.close()
        self.serv = None
| |
class SocketUDPLITETest(SocketUDPTest):
    """Variant of SocketUDPTest whose server socket uses IPPROTO_UDPLITE."""

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
        try:
            self.port = socket_helper.bind_port(self.serv)
        except BaseException:
            # unittest does not run tearDown() when setUp() raises, so the
            # socket has to be released here to avoid leaking it.
            self.serv.close()
            self.serv = None
            raise
| |
| |
class SocketCANTest(unittest.TestCase):
    """Fixture binding a raw CAN socket (self.s) to the `vcan0` interface.

    To be able to run this test, a `vcan0` CAN interface can be created with
    the following commands:
    # modprobe vcan
    # ip link add dev vcan0 type vcan
    # ip link set up vcan0
    """
    interface = 'vcan0'
    bufsize = 128

    # The CAN frame structure is defined in <linux/can.h>:
    #
    # struct can_frame {
    #     canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
    #     __u8    can_dlc; /* data length code: 0 .. 8 */
    #     __u8    data[8] __attribute__((aligned(8)));
    # };
    can_frame_fmt = "=IB3x8s"
    can_frame_size = struct.calcsize(can_frame_fmt)

    # The Broadcast Management Command frame structure is defined
    # in <linux/can/bcm.h>:
    #
    # struct bcm_msg_head {
    #     __u32 opcode;
    #     __u32 flags;
    #     __u32 count;
    #     struct timeval ival1, ival2;
    #     canid_t can_id;
    #     __u32 nframes;
    #     struct can_frame frames[0];
    # }
    #
    # `bcm_msg_head` must be 8 bytes aligned because of the `frames` member
    # (see `struct can_frame` definition). Must use native not standard types
    # for packing.
    bcm_cmd_msg_fmt = "@3I4l2I"
    bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)

    def setUp(self):
        sock = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.s = sock
        self.addCleanup(sock.close)
        try:
            sock.bind((self.interface,))
        except OSError:
            # Binding fails when the interface is absent; skip, don't fail.
            self.skipTest('network interface `%s` does not exist' %
                          self.interface)
| |
| |
class SocketRDSTest(unittest.TestCase):
    """Fixture providing a bound RDS server socket (self.serv, self.port).

    To be able to run this test, the `rds` kernel module must be loaded:
    # modprobe rds
    """
    bufsize = 8192

    def setUp(self):
        server = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = server
        self.addCleanup(server.close)
        try:
            self.port = socket_helper.bind_port(server)
        except OSError:
            # A failed bind is reported as a skip rather than a failure.
            self.skipTest('unable to bind RDS socket')
| |
| |
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function
        self.__setUp = self.setUp
        self.setUp = self._setUp

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real setUp() as the server."""
        self.enterContext(threading_helper.wait_threads_exit())

        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)
        self.server_crashed = False

        def raise_queued_exception():
            # Re-raise, in the main thread, whatever the client queued.
            if self.queue.qsize():
                raise self.queue.get()
        self.addCleanup(raise_queued_exception)

        # Do some munging to start the client test.
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        try:
            self.__setUp()
        except BaseException:
            # Tell the client not to run its test; it only tears down.
            self.server_crashed = True
            raise
        finally:
            # Always release the client, even when the server setup failed.
            self.server_ready.set()
        self.client_ready.wait()
        self.addCleanup(self.done.wait)

    def clientRun(self, test_func):
        """Body of the client thread: set up, run *test_func*, tear down.

        Any exception raised by clientSetUp() or by the client test itself
        (including *test_func* not being callable) is put on self.queue so
        that the main thread can re-raise it during cleanup.
        """
        self.server_ready.wait()
        try:
            self.clientSetUp()
        except BaseException as e:
            self.queue.put(e)
            self.clientTearDown()
            return
        finally:
            self.client_ready.set()
        if self.server_crashed:
            self.clientTearDown()
            return
        try:
            # Checked inside the try block so that the error is queued and
            # clientTearDown() still runs; raising it outside would leave
            # self.done unset and the main thread's cleanup would block.
            if not callable(test_func):
                raise TypeError("test_func must be a callable function")
            test_func()
        except BaseException as e:
            self.queue.put(e)
        finally:
            self.clientTearDown()

    def clientSetUp(self):
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        # Signal completion to the main thread, then end the client thread.
        self.done.set()
        thread.exit()
| |
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """SocketTCPTest server fixture plus a TCP client socket (self.cli)."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, not through super().
        SocketTCPTest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_STREAM)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """SocketUDPTest server fixture plus a UDP client socket (self.cli)."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, not through super().
        SocketUDPTest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_DGRAM)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
                     'UDPLITE sockets required for this test.')
class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
    """SocketUDPLITETest server fixture plus a UDPLITE client (self.cli)."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, not through super().
        SocketUDPLITETest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(family=socket.AF_INET,
                                 type=socket.SOCK_DGRAM,
                                 proto=socket.IPPROTO_UDPLITE)

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
    """SocketCANTest server fixture plus a raw CAN client socket (self.cli)."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, not through super().
        SocketCANTest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        client = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.cli = client
        try:
            client.bind((self.interface,))
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
    """SocketRDSTest server fixture plus a bound RDS client (self.cli)."""

    def __init__(self, methodName='runTest'):
        # Both bases are initialised explicitly, not through super().
        SocketRDSTest.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        client = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.cli = client
        try:
            # RDS sockets must be bound explicitly to send or receive data
            client.bind((HOST, 0))
            self.cli_addr = client.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipIf(WSL, 'VSOCK does not work on Microsoft WSL')
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
                     'VSOCK sockets required for this test.')
class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
    """Client/server stream test over AF_VSOCK sockets."""

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        listener = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
        self.serv = listener
        self.addCleanup(listener.close)
        listener.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
        listener.listen()
        # Release the client thread before blocking in accept().
        self.serverExplicitReady()
        listener.settimeout(support.LOOPBACK_TIMEOUT)
        self.conn, self.connaddr = listener.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        time.sleep(0.1)
        client = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
        self.cli = client
        self.addCleanup(client.close)
        cid = get_cid()
        if cid == socket.VMADDR_CID_HOST or cid == socket.VMADDR_CID_ANY:
            # gh-119461: Use the local communication address (loopback)
            cid = socket.VMADDR_CID_LOCAL
        client.connect((cid, VSOCKPORT))

    def testStream(self):
        # Server side: the payload sent by the client must arrive intact.
        received = self.conn.recv(1024)
        self.assertEqual(received, MSG)

    def _testStream(self):
        # Client side: send the payload and close the connection.
        self.cli.send(MSG)
        self.cli.close()
| |
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Socket tests for client-server connection.

    self.cli_conn is the server-side socket returned by accept(); the
    client thread's connected socket is self.serv_conn.  setUp() only
    returns once the connection has been accepted.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        self.cli_conn, _peer = self.serv.accept()

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        client = self.cli
        client.connect((HOST, self.port))
        self.serv_conn = client

    def clientTearDown(self):
        self.serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
| |
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Client/server fixture built on a connected pair of sockets.

    self.serv is used by the server (main) thread and self.cli by the
    client thread.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName)
        ThreadableTest.__init__(self)
        self.cli = None
        self.serv = None

    def socketpair(self):
        # To be overridden by some child classes.
        return socket.socketpair()

    def setUp(self):
        pair = self.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        server = self.serv
        if server:
            server.close()
        self.serv = None

    def clientSetUp(self):
        # Nothing to prepare: both ends are created by setUp().
        pass

    def clientTearDown(self):
        client = self.cli
        if client:
            client.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
| |
| # The following classes are used by the sendmsg()/recvmsg() tests. |
| # Combining, for instance, ConnectedStreamTestMixin and TCPTestBase |
| # gives a drop-in replacement for SocketConnectedTest, but different |
| # address families can be used, and the attributes serv_addr and |
| # cli_addr will be set to the addresses of the endpoints. |
| |
class SocketTestBase(unittest.TestCase):
    """A base class for socket tests.

    Subclasses supply newSocket(), returning a fresh socket, and
    bindSock(sock), binding it to an unused address.

    setUp() creates self.serv and records its address in self.serv_addr.
    """

    def setUp(self):
        server = self.newSocket()
        self.serv = server
        # Registered before binding so the socket is closed if binding fails.
        self.addCleanup(self.close_server)
        self.bindServer()

    def close_server(self):
        self.serv.close()
        self.serv = None

    def bindServer(self):
        """Bind server socket and set self.serv_addr to its address."""
        server = self.serv
        self.bindSock(server)
        self.serv_addr = server.getsockname()
| |
| |
class SocketListeningTestMixin(SocketTestBase):
    """Mixin that puts the server socket into listening mode in setUp()."""

    def setUp(self):
        super().setUp()
        server = self.serv
        server.listen()
| |
| |
class ThreadedSocketTestMixin(SocketTestBase, ThreadableTest):
    """Mixin to add client socket and allow client/server tests.

    The client socket is stored in self.cli and its bound address in
    self.cli_addr.  See ThreadableTest for usage information.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # ThreadableTest.__init__ is invoked explicitly after the
        # cooperative super() call.
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = self.newClientSocket()
        self.bindClient()

    def newClientSocket(self):
        """Return a new socket for use as client."""
        return self.newSocket()

    def bindClient(self):
        """Bind client socket and set self.cli_addr to its address."""
        client = self.cli
        self.bindSock(client)
        self.cli_addr = client.getsockname()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
| |
| |
class ConnectedStreamTestMixin(SocketListeningTestMixin,
                               ThreadedSocketTestMixin):
    """Mixin to allow client/server stream tests with connected client.

    The server's end of the connection is self.cli_conn and the client's
    end is self.serv_conn.  (Based on SocketConnectedTest.)
    """

    def setUp(self):
        super().setUp()
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        self.cli_conn, _peer = self.serv.accept()

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        super().tearDown()

    def clientSetUp(self):
        super().clientSetUp()
        client = self.cli
        client.connect(self.serv_addr)
        self.serv_conn = client

    def clientTearDown(self):
        # An AttributeError raised while closing (for instance when
        # serv_conn was never assigned) is deliberately ignored.
        with contextlib.suppress(AttributeError):
            self.serv_conn.close()
            self.serv_conn = None
        super().clientTearDown()
| |
| |
class UnixSocketTestBase(SocketTestBase):
    """Base class for Unix-domain socket tests."""

    # This class is used for file descriptor passing tests, so we
    # create the sockets in a private directory so that other users
    # can't send anything that might be problematic for a privileged
    # user running the tests.

    def bindSock(self, sock):
        sock_path = socket_helper.create_unix_domain_name()
        # Schedule removal of the path before binding to it.
        self.addCleanup(os_helper.unlink, sock_path)
        socket_helper.bind_unix_socket(sock, sock_path)
| |
class UnixStreamBase(UnixSocketTestBase):
    """Base class for Unix-domain SOCK_STREAM tests."""

    def newSocket(self):
        return socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
| |
| |
class InetTestBase(SocketTestBase):
    """Base class for IPv4 socket tests."""

    # Host passed to socket_helper.bind_port(); overridden by subclasses.
    host = HOST

    def setUp(self):
        super().setUp()
        # The port is the second item of the bound server address.
        _host, self.port = self.serv_addr[0], self.serv_addr[1]

    def bindSock(self, sock):
        socket_helper.bind_port(sock, host=self.host)
| |
class TCPTestBase(InetTestBase):
    """Base class for TCP-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
| |
class UDPTestBase(InetTestBase):
    """Base class for UDP-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
| |
class UDPLITETestBase(InetTestBase):
    """Base class for UDPLITE-over-IPv4 tests."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET,
                             type=socket.SOCK_DGRAM,
                             proto=socket.IPPROTO_UDPLITE)
| |
class SCTPStreamBase(InetTestBase):
    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET,
                             type=socket.SOCK_STREAM,
                             proto=socket.IPPROTO_SCTP)
| |
| |
class Inet6TestBase(InetTestBase):
    """Base class for IPv6 socket tests."""

    # Same fixture as InetTestBase, but bound to the IPv6 helper host.
    host = socket_helper.HOSTv6
| |
class UDP6TestBase(Inet6TestBase):
    """Base class for UDP-over-IPv6 tests."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
| |
class UDPLITE6TestBase(Inet6TestBase):
    """Base class for UDPLITE-over-IPv6 tests."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET6,
                             type=socket.SOCK_DGRAM,
                             proto=socket.IPPROTO_UDPLITE)
| |
| |
| # Test-skipping decorators for use with ThreadableTest. |
| |
def skipWithClientIf(condition, reason):
    """Skip decorated test if condition is true, add client_skip decorator.

    For a decorated object that is not a class, a "client_skip" attribute
    is attached: a decorator that yields a do-nothing function when the
    test is skipped, or hands back the function it is given when it is
    not.  ThreadableTest users apply it to the client half of a test so
    that the client does not run for a skipped test.
    """
    def _noop_client(*args, **kwargs):
        pass

    def _skip(obj):
        skipped = unittest.skip(reason)(obj)
        if not isinstance(obj, type):
            skipped.client_skip = lambda f: _noop_client
        return skipped

    def _keep(obj):
        # Leave classes alone and never overwrite an existing client_skip.
        if not isinstance(obj, type) and not hasattr(obj, "client_skip"):
            obj.client_skip = lambda f: f
        return obj

    if condition:
        return _skip
    return _keep
| |
| |
def requireAttrs(obj, *attributes):
    """Skip decorated test if obj is missing any of the given attributes.

    The client_skip attribute is set the same way skipWithClientIf() does.
    """
    absent = []
    for attr_name in attributes:
        if not hasattr(obj, attr_name):
            absent.append(attr_name)
    reason = "don't have " + ", ".join(absent)
    return skipWithClientIf(absent, reason)
| |
| |
def requireSocket(*args):
    """Skip decorated test if a socket cannot be created with given arguments.

    A string argument names an attribute of the socket module whose value
    is used instead; the test is skipped when that attribute is missing.
    The client_skip attribute is set as skipWithClientIf() does.
    """
    absent = [arg for arg in args
              if isinstance(arg, str) and not hasattr(socket, arg)]
    failure = None
    if absent:
        failure = "don't have " + ", ".join(absent)
    else:
        resolved = []
        for arg in args:
            resolved.append(getattr(socket, arg) if isinstance(arg, str) else arg)
        try:
            probe = socket.socket(*resolved)
        except OSError as exc:
            # XXX: check errno?
            failure = str(exc)
        else:
            probe.close()
    description = ", ".join(str(o) for o in args)
    return skipWithClientIf(
        failure is not None,
        "can't create socket({0}): {1}".format(description, failure))
| |
| |
| ####################################################################### |
| ## Begin Tests |
| |
| class GeneralModuleTests(unittest.TestCase): |
| |
| @unittest.skipUnless(_socket is not None, 'need _socket module') |
| def test_socket_type(self): |
| self.assertTrue(gc.is_tracked(_socket.socket)) |
| with self.assertRaisesRegex(TypeError, "immutable"): |
| _socket.socket.foo = 1 |
| |
| def test_SocketType_is_socketobject(self): |
| import _socket |
| self.assertTrue(socket.SocketType is _socket.socket) |
| s = socket.socket() |
| self.assertIsInstance(s, socket.SocketType) |
| s.close() |
| |
def test_repr(self):
    """repr() of a socket shows fd, family, type, proto and its state."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with sock:
        # Freshly created: no addresses are part of the repr yet.
        unbound = repr(sock)
        self.assertIn('fd=%i' % sock.fileno(), unbound)
        self.assertIn('family=%s' % socket.AF_INET, unbound)
        self.assertIn('type=%s' % socket.SOCK_STREAM, unbound)
        self.assertIn('proto=0', unbound)
        self.assertNotIn('raddr', unbound)
        sock.bind(('127.0.0.1', 0))
        bound = repr(sock)
        self.assertIn('laddr', bound)
        self.assertIn(str(sock.getsockname()), bound)
    # Leaving the with block closed the socket.
    closed = repr(sock)
    self.assertIn('[closed]', closed)
    self.assertNotIn('laddr', closed)
| |
| @unittest.skipUnless(_socket is not None, 'need _socket module') |
| def test_csocket_repr(self): |
| s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) |
| try: |
| expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' |
| % (s.fileno(), s.family, s.type, s.proto)) |
| self.assertEqual(repr(s), expected) |
| finally: |
| s.close() |
| expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' |
| % (s.family, s.type, s.proto)) |
| self.assertEqual(repr(s), expected) |
| |
def test_weakref(self):
    """A weak proxy to a socket stops working once the socket is released."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        weak = proxy(sock)
        self.assertEqual(weak.fileno(), sock.fileno())
    # Drop the only strong reference held by this test.
    sock = None
    support.gc_collect()  # For PyPy or other GCs.
    try:
        weak.fileno()
    except ReferenceError:
        return
    self.fail('Socket proxy still exists')
| |
| def testSocketError(self): |
| # Testing socket module exceptions |
| msg = "Error raising socket exception (%s)." |
| with self.assertRaises(OSError, msg=msg % 'OSError'): |
| raise OSError |
| with self.assertRaises(OSError, msg=msg % 'socket.herror'): |
| raise socket.herror |
| with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): |
| raise socket.gaierror |
| |
def testSendtoErrors(self):
    """sendto() raises TypeError for bad arguments instead of masking them."""
    # Testing that sendto doesn't mask failures. See #10169.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(sock.close)
    sock.bind(('', 0))
    sockname = sock.getsockname()

    def type_error_text(*sendto_args):
        # Call sendto(), require a TypeError and return its message.
        with self.assertRaises(TypeError) as caught:
            sock.sendto(*sendto_args)
        return str(caught.exception)

    not_str = "a bytes-like object is required, not 'str'"
    not_complex = "a bytes-like object is required, not 'complex'"

    # 2 args
    self.assertEqual(type_error_text('\u2620', sockname), not_str)
    self.assertEqual(type_error_text(5j, sockname), not_complex)
    self.assertIn('not NoneType', type_error_text(b'foo', None))
    # 3 args
    self.assertEqual(type_error_text('\u2620', 0, sockname), not_str)
    self.assertEqual(type_error_text(5j, 0, sockname), not_complex)
    self.assertIn('not NoneType', type_error_text(b'foo', 0, None))
    # For these two only the exception type is checked.
    type_error_text(b'foo', 'bar', sockname)
    type_error_text(b'foo', None, None)
    # wrong number of args
    self.assertIn('(1 given)', type_error_text(b'foo'))
    self.assertIn('(4 given)', type_error_text(b'foo', 0, sockname, 4))
| |
| def testCrucialConstants(self): |
| # Testing for mission critical constants |
| socket.AF_INET |
| if socket.has_ipv6: |
| socket.AF_INET6 |
| socket.SOCK_STREAM |
| socket.SOCK_DGRAM |
| socket.SOCK_RAW |
| socket.SOCK_RDM |
| socket.SOCK_SEQPACKET |
| socket.SOL_SOCKET |
| socket.SO_REUSEADDR |
| |
| def testCrucialIpProtoConstants(self): |
| socket.IPPROTO_TCP |
| socket.IPPROTO_UDP |
| if socket.has_ipv6: |
| socket.IPPROTO_IPV6 |
| |
| @unittest.skipUnless(os.name == "nt", "Windows specific") |
| def testWindowsSpecificConstants(self): |
| socket.IPPROTO_ICLFXBM |
| socket.IPPROTO_ST |
| socket.IPPROTO_CBT |
| socket.IPPROTO_IGP |
| socket.IPPROTO_RDP |
| socket.IPPROTO_PGM |
| socket.IPPROTO_L2TP |
| socket.IPPROTO_SCTP |
| |
@unittest.skipIf(support.is_wasi, "WASI is missing these methods")
def test_socket_methods(self):
    """Each configure-dependent socket method is present on socket.socket."""
    # socket methods that depend on a configure HAVE_ check. They should
    # be present on all platforms except WASI.
    expected_methods = (
        "_accept", "bind", "connect", "connect_ex", "getpeername",
        "getsockname", "listen", "recvfrom", "recvfrom_into", "sendto",
        "setsockopt", "shutdown",
    )
    for name in expected_methods:
        if hasattr(socket.socket, name):
            continue
        self.fail(f"socket method {name} is missing")
| |
@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
def test3542SocketOptions(self):
    """Every listed RFC 3542 option is exposed by the socket module."""
    # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
    rfc3542_options = {
        'IPV6_CHECKSUM',
        'IPV6_DONTFRAG',
        'IPV6_DSTOPTS',
        'IPV6_HOPLIMIT',
        'IPV6_HOPOPTS',
        'IPV6_NEXTHOP',
        'IPV6_PATHMTU',
        'IPV6_PKTINFO',
        'IPV6_RECVDSTOPTS',
        'IPV6_RECVHOPLIMIT',
        'IPV6_RECVHOPOPTS',
        'IPV6_RECVPATHMTU',
        'IPV6_RECVPKTINFO',
        'IPV6_RECVRTHDR',
        'IPV6_RECVTCLASS',
        'IPV6_RTHDR',
        'IPV6_RTHDRDSTOPTS',
        'IPV6_RTHDR_TYPE_0',
        'IPV6_TCLASS',
        'IPV6_USE_MIN_MTU',
    }
    for opt in rfc3542_options:
        present = hasattr(socket, opt)
        self.assertTrue(present, f"Missing RFC3542 socket option '{opt}'")
| |
def testHostnameRes(self):
    """Forward and reverse lookups of the local hostname agree with getfqdn()."""
    # Testing hostname resolution mechanisms
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except OSError:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    self.assertTrue('.' in ip, "Error resolving host to ip.")
    try:
        hname, aliases, _ipaddrs = socket.gethostbyaddr(ip)
    except OSError:
        # Probably a similar problem as above; skip this test
        self.skipTest('name lookup failure')
    all_host_names = [hostname, hname, *aliases]
    fqhn = socket.getfqdn(ip)
    if fqhn not in all_host_names:
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
| |
def test_host_resolution(self):
    """Dotted IPv4 strings resolve to themselves; HOSTv4 reverse-resolves."""
    literal_addresses = (socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255')
    for addr in literal_addresses:
        self.assertEqual(socket.gethostbyname(addr), addr)

    # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
    # a matching name entry (e.g. 'ip6-localhost')
    host = socket_helper.HOSTv4
    # The third item of the gethostbyaddr() result is the address list.
    _name, _aliases, addresses = socket.gethostbyaddr(host)
    self.assertIn(host, addresses)
| |
def test_host_resolution_bad_address(self):
    # These are all malformed IP addresses and expected not to resolve to
    # any result.  But some ISPs, e.g. AWS and AT&T, may successfully
    # resolve these IPs. In particular, AT&T's DNS Error Assist service
    # will break this test.  See https://bugs.python.org/issue42092 for a
    # workaround.
    explanation = (
        "resolving an invalid IP address did not raise OSError; "
        "can be caused by a broken DNS server"
    )
    malformed = ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
                 '1:1:1:1:1:1:1:1:1']
    for addr in malformed:
        # subTest() puts the offending address in the failure report, so
        # both lookups can carry the same human-readable explanation.
        with self.subTest(addr=addr):
            with self.assertRaises(OSError, msg=explanation):
                socket.gethostbyname(addr)
            with self.assertRaises(OSError, msg=explanation):
                socket.gethostbyaddr(addr)
| |
@unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
@unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
def test_sethostname(self):
    # Change the hostname (str, then bytes) and always restore the
    # original one afterwards.  Skipped when not permitted (EPERM).
    original_name = socket.gethostname()
    try:
        socket.sethostname('new')
    except OSError as exc:
        if exc.errno != errno.EPERM:
            raise
        self.skipTest("test should be run as root")
    try:
        # running test as root!
        self.assertEqual(socket.gethostname(), 'new')
        # Should work with bytes objects too
        socket.sethostname(b'bar')
        self.assertEqual(socket.gethostname(), 'bar')
    finally:
        socket.sethostname(original_name)
| |
@unittest.skipUnless(hasattr(socket, 'if_nameindex'),
                     'socket.if_nameindex() not available.')
@support.skip_android_selinux('if_nameindex')
def testInterfaceNameIndex(self):
    # Each (index, name) pair from if_nameindex() must round-trip through
    # if_nametoindex() and if_indextoname().
    for if_index, if_name in socket.if_nameindex():
        self.assertIsInstance(if_index, int)
        self.assertIsInstance(if_name, str)
        # interface indices are non-zero integers
        self.assertGreater(if_index, 0)

        looked_up_index = socket.if_nametoindex(if_name)
        self.assertIsInstance(looked_up_index, int)
        self.assertEqual(if_index, looked_up_index)

        looked_up_name = socket.if_indextoname(if_index)
        self.assertIsInstance(looked_up_name, str)
        self.assertEqual(if_name, looked_up_name)
| |
@unittest.skipUnless(hasattr(socket, 'if_indextoname'),
                     'socket.if_indextoname() not available.')
@support.skip_android_selinux('if_indextoname')
def testInvalidInterfaceIndexToName(self):
    # if_indextoname() must reject zero, negative, oversized and non-int
    # indices with the appropriate exception type.
    fixed_cases = (
        (OSError, 0),
        (OverflowError, -1),
        (OverflowError, 2**1000),
        (TypeError, '_DEADBEEF'),
    )
    for expected_exc, bad_index in fixed_cases:
        with self.assertRaises(expected_exc):
            socket.if_indextoname(bad_index)

    if not hasattr(socket, 'if_nameindex'):
        return

    # Probe values derived from real indices (index + 2**32) first, then
    # the 32-bit and 64-bit maxima; anything that is not an actual
    # interface index must be rejected.
    known = dict(socket.if_nameindex())
    candidates = [index + 2**32 for index in known]
    candidates += [2**32 - 1, 2**64 - 1]
    for candidate in candidates:
        if candidate in known:
            continue
        with self.assertRaises((OverflowError, OSError)):
            socket.if_indextoname(candidate)
| |
@unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
                     'socket.if_nametoindex() not available.')
@support.skip_android_selinux('if_nametoindex')
def testInvalidInterfaceNameToIndex(self):
    # A non-string argument is a TypeError; an unknown name is an OSError.
    with self.assertRaises(TypeError):
        socket.if_nametoindex(0)
    with self.assertRaises(OSError):
        socket.if_nametoindex('_DEADBEEF')
| |
| @unittest.skipUnless(hasattr(sys, 'getrefcount'), |
| 'test needs sys.getrefcount()') |
| def testRefCountGetNameInfo(self): |
| # Testing reference count for getnameinfo |
| try: |
| # On some versions, this loses a reference |
| orig = sys.getrefcount(__name__) |
| socket.getnameinfo(__name__,0) |
| except TypeError: |
| if sys.getrefcount(__name__) != orig: |
| self.fail("socket.getnameinfo loses a reference") |
| |
| def testInterpreterCrash(self): |
| # Making sure getnameinfo doesn't crash the interpreter |
| try: |
| # On some versions, this crashes the interpreter. |
| socket.getnameinfo(('x', 0, 0, 0), 0) |
| except OSError: |
| pass |
| |
| def testNtoH(self): |
| # This just checks that htons etc. are their own inverse, |
| # when looking at the lower 16 or 32 bits. |
| sizes = {socket.htonl: 32, socket.ntohl: 32, |
| socket.htons: 16, socket.ntohs: 16} |
| for func, size in sizes.items(): |
| mask = (1<<size) - 1 |
| for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): |
| self.assertEqual(i & mask, func(func(i&mask)) & mask) |
| |
| swapped = func(mask) |
| self.assertEqual(swapped & mask, mask) |
| self.assertRaises(OverflowError, func, 1<<34) |
| |
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def testNtoHErrors(self):
    # In-range values are accepted and out-of-range values raise
    # OverflowError, for both the 16-bit and the 32-bit converters.
    import _testcapi
    int_min, int_max = _testcapi.INT_MIN, _testcapi.INT_MAX

    short_ok = [0, 1, 2, 0xffff]
    long_ok = short_ok + [0xffffffff]
    long_bad = [-1, -2, 1 << 32, 1 << 1000]
    short_bad = long_bad + [int_min - 1, int_max + 1, 1 << 16, int_max]

    for value in short_ok:
        socket.ntohs(value)
        socket.htons(value)
    for value in long_ok:
        socket.ntohl(value)
        socket.htonl(value)
    for value in short_bad:
        with self.assertRaises(OverflowError):
            socket.ntohs(value)
        with self.assertRaises(OverflowError):
            socket.htons(value)
    for value in long_bad:
        with self.assertRaises(OverflowError):
            socket.ntohl(value)
        with self.assertRaises(OverflowError):
            socket.htonl(value)
| |
def testGetServBy(self):
    # Round-trip one well-known service through getservbyname() and
    # getservbyport(), with and without an explicit protocol.

    def android_before(api_level):
        # True only on Android releases older than the given API level.
        return (support.is_android
                and platform.android_ver().api_level < api_level)

    # Find one service that exists, then check all the related interfaces.
    # I've ordered this by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    affected_prefixes = ('linux', 'android', 'freebsd', 'netbsd', 'gnukfreebsd')
    if sys.platform.startswith(affected_prefixes) or is_apple:
        # avoid the 'echo' service on this platform, as there is an
        # assumption breaking non-standard port/protocol entry
        services = ('daytime', 'qotd', 'domain')
    else:
        services = ('echo', 'daytime', 'domain')

    for service in services:
        try:
            port = socket.getservbyname(service, 'tcp')
        except OSError:
            continue
        break
    else:
        # None of the candidate services is known on this host.
        raise OSError

    # Try same call with optional protocol omitted
    # Issue gh-71123: this fails on Android before API level 23.
    if not android_before(23):
        port2 = socket.getservbyname(service)
        self.assertEqual(port, port2)

    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except OSError:
        udpport = None
    else:
        self.assertEqual(udpport, port)

    # Now make sure the lookup by port returns the same service name
    # Issue #26936: when the protocol is omitted, this fails on Android
    # before API level 28.
    if not android_before(28):
        self.assertEqual(socket.getservbyport(port2), service)
    self.assertEqual(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        self.assertEqual(socket.getservbyport(udpport, 'udp'), service)

    # Make sure getservbyport does not accept out of range ports.
    for bad_port in (-1, 65536):
        with self.assertRaises(OverflowError):
            socket.getservbyport(bad_port)
| |
def testDefaultTimeout(self):
    # Testing default timeout
    # The default timeout should initially be None
    self.assertIsNone(socket.getdefaulttimeout())
    with socket.socket() as s:
        self.assertIsNone(s.gettimeout())

    # Set the default timeout to 10, and see if it propagates
    with socket_setdefaulttimeout(10):
        self.assertEqual(socket.getdefaulttimeout(), 10)
        with socket.socket() as sock:
            self.assertEqual(sock.gettimeout(), 10)

        # Reset the default timeout to None, and see if it propagates
        socket.setdefaulttimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        with socket.socket() as sock:
            self.assertIsNone(sock.gettimeout())

    # Check that setting it to an invalid value raises ValueError
    self.assertRaises(ValueError, socket.setdefaulttimeout, -1)

    # Check that setting it to an invalid type raises TypeError
    self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
| |
| @unittest.skipUnless(hasattr(socket, 'inet_aton'), |
| 'test needs socket.inet_aton()') |
| def testIPv4_inet_aton_fourbytes(self): |
| # Test that issue1008086 and issue767150 are fixed. |
| # It must return 4 bytes. |
| self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) |
| self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) |
| |
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
                     'test needs socket.inet_pton()')
def testIPv4toString(self):
    # Dotted-quad text -> packed bytes, via inet_aton() and via
    # inet_pton(AF_INET, ...); malformed text must be rejected.
    from socket import inet_aton as f, inet_pton, AF_INET

    def g(text):
        return inet_pton(AF_INET, text)

    def assertInvalid(func, text):
        return self.assertRaises((OSError, ValueError), func, text)

    aton_valid = (
        (b'\x00\x00\x00\x00', '0.0.0.0'),
        (b'\xff\x00\xff\x00', '255.0.255.0'),
        (b'\xaa\xaa\xaa\xaa', '170.170.170.170'),
        (b'\x01\x02\x03\x04', '1.2.3.4'),
        (b'\xff\xff\xff\xff', '255.255.255.255'),
    )
    for packed, text in aton_valid:
        self.assertEqual(packed, f(text))
    # bpo-29972: inet_pton() doesn't fail on AIX
    if not AIX:
        assertInvalid(f, '0.0.0.')
    for text in ('300.0.0.0', 'a.0.0.0', '1.2.3.4.5', '::1'):
        assertInvalid(f, text)

    pton_valid = (
        (b'\x00\x00\x00\x00', '0.0.0.0'),
        (b'\xff\x00\xff\x00', '255.0.255.0'),
        (b'\xaa\xaa\xaa\xaa', '170.170.170.170'),
        (b'\xff\xff\xff\xff', '255.255.255.255'),
    )
    for packed, text in pton_valid:
        self.assertEqual(packed, g(text))
    for text in ('0.0.0.', '300.0.0.0', 'a.0.0.0', '1.2.3.4.5', '::1'):
        assertInvalid(g, text)
| |
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
                     'test needs socket.inet_pton()')
def testIPv6toString(self):
    # IPv6 text -> 16 packed bytes via inet_pton(AF_INET6, ...), including
    # forms with an embedded IPv4 tail; malformed text must be rejected.
    try:
        from socket import inet_pton, AF_INET6, has_ipv6
    except ImportError:
        self.skipTest('could not import needed symbols from socket')
    if not has_ipv6:
        self.skipTest('IPv6 not available')

    if sys.platform == "win32":
        try:
            inet_pton(AF_INET6, '::')
        except OSError as exc:
            if exc.winerror == 10022:
                self.skipTest('IPv6 might not be supported')

    def f(text):
        return inet_pton(AF_INET6, text)

    def assertInvalid(text):
        return self.assertRaises((OSError, ValueError), f, text)

    pure_ipv6 = (
        (b'\x00' * 16, '::'),
        (b'\x00' * 16, '0::0'),
        (b'\x00\x01' + b'\x00' * 14, '1::'),
        (b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
         '45ef:76cb:1a:56ef:afeb:bac:1924:aeae'),
        (b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
         'ad42:abc::127:0:254:2'),
        (b'\x00\x12\x00\x0a' + b'\x00' * 12, '12:a::'),
    )
    for packed, text in pure_ipv6:
        self.assertEqual(packed, f(text))

    malformed_ipv6 = (
        '0x20::',
        ':::',
        '::0::',
        '1::abc::',
        '1::abc::def',
        '1:2:3:4:5:6',
        '1:2:3:4:5:6:',
        '1:2:3:4:5:6:7:8:0',
    )
    for text in malformed_ipv6:
        assertInvalid(text)
    # bpo-29972: inet_pton() doesn't fail on AIX
    if not AIX:
        assertInvalid('1:2:3:4:5:6:7:8:')

    embedded_ipv4 = (
        (b'\x00' * 12 + b'\xfe\x2a\x17\x40', '::254.42.23.64'),
        (b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
         '42::a29b:254.42.23.64'),
        (b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
         '42:a8b9:0:2:ffff:a29b:254.42.23.64'),
    )
    for packed, text in embedded_ipv4:
        self.assertEqual(packed, f(text))

    malformed_embedded = (
        '255.254.253.252',
        '1::260.2.3.0',
        '1::0.be.e.0',
        '1:2:3:4:5:6:7:1.2.3.4',
        '::1.2.3.4:0',
        '0.100.200.0:3:4:5:6:7:8',
    )
    for text in malformed_embedded:
        assertInvalid(text)
| |
| @unittest.skipUnless(hasattr(socket, 'inet_ntop'), |
| 'test needs socket.inet_ntop()') |
| def testStringToIPv4(self): |
| from socket import inet_ntoa as f, inet_ntop, AF_INET |
| g = lambda a: inet_ntop(AF_INET, a) |
| assertInvalid = lambda func,a: self.assertRaises( |
| (OSError, ValueError), func, a |
| ) |
| |
| self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) |
| self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) |
| self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) |
| self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) |
| assertInvalid(f, b'\x00' * 3) |
| assertInvalid(f, b'\x00' * 5) |
| assertInvalid(f, b'\x00' * 16) |
| self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) |
| |
| self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) |
| self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) |
| self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) |
| assertInvalid(g, b'\x00' * 3) |
| assertInvalid(g, b'\x00' * 5) |
| assertInvalid(g, b'\x00' * 16) |
| self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) |
| |
| @unittest.skipUnless(hasattr(socket, 'inet_ntop'), |
| 'test needs socket.inet_ntop()') |
| def testStringToIPv6(self): |
| try: |
| from socket import inet_ntop, AF_INET6, has_ipv6 |
| if not has_ipv6: |
| self.skipTest('IPv6 not available') |
| except ImportError: |
| self.skipTest('could not import needed symbols from socket') |
| |
| if sys.platform == "win32": |
| try: |
| inet_ntop(AF_INET6, b'\x00' * 16) |
| except OSError as e: |
| if e.winerror == 10022: |
| self.skipTest('IPv6 might not be supported') |
| |
| f = lambda a: inet_ntop(AF_INET6, a) |
| assertInvalid = lambda a: self.assertRaises( |
| (OSError, ValueError), f, a |
| ) |
| |
| self.assertEqual('::', f(b'\x00' * 16)) |
| self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) |
| self.assertEqual( |
| 'aef:b01:506:1001:ffff:9997:55:170', |
| f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') |
| ) |
| self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) |
| |
| assertInvalid(b'\x12' * 15) |
| assertInvalid(b'\x12' * 17) |
| assertInvalid(b'\x12' * 4) |
| |
| # XXX The following don't test module-level functionality... |
| |
def testSockName(self):
    # Testing getsockname()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(sock.close)

    # Since find_unused_port() is inherently subject to race conditions, we
    # call it a couple times if necessary.
    for attempt in itertools.count():
        port = socket_helper.find_unused_port()
        try:
            sock.bind(("0.0.0.0", port))
            break
        except OSError as exc:
            # Retry only on "address in use", and give up on the
            # sixth failed attempt.
            if exc.errno != errno.EADDRINUSE or attempt == 5:
                raise

    bound_host, bound_port = sock.getsockname()[:2]
    # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
    # it reasonable to get the host's addr in addition to 0.0.0.0.
    # At least for eCos.  This is required for the S/390 to pass.
    try:
        my_ip_addr = socket.gethostbyname(socket.gethostname())
    except OSError:
        # Probably name lookup wasn't set up right; skip this test
        self.skipTest('name lookup failure')
    acceptable_hosts = ("0.0.0.0", my_ip_addr)
    self.assertIn(bound_host, acceptable_hosts, '%s invalid' % bound_host)
    self.assertEqual(bound_port, port)
| |
| def testGetSockOpt(self): |
| # Testing getsockopt() |
| # We know a socket should start without reuse==0 |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| self.addCleanup(sock.close) |
| reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) |
| self.assertFalse(reuse != 0, "initial mode is reuse") |
| |
| def testSetSockOpt(self): |
| # Testing setsockopt() |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| self.addCleanup(sock.close) |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) |
| self.assertFalse(reuse == 0, "failed to set reuse mode") |
| |
| def testSendAfterClose(self): |
| # testing send() after close() with timeout |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: |
| sock.settimeout(1) |
| self.assertRaises(OSError, sock.send, b"spam") |
| |
def testCloseException(self):
    # Close the underlying descriptor through a second socket object;
    # close() on the first object must then report the failure.
    sock = socket.socket()
    sock.bind((socket._LOCALHOST, 0))
    duplicate = socket.socket(fileno=sock.fileno())
    duplicate.close()
    with self.assertRaises(OSError,
                           msg="close() should raise EBADF/ENOTSOCK") as caught:
        sock.close()
    # Winsock apparently raises ENOTSOCK
    self.assertIn(caught.exception.errno, (errno.EBADF, errno.ENOTSOCK))
| |
| def testNewAttributes(self): |
| # testing .family, .type and .protocol |
| |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: |
| self.assertEqual(sock.family, socket.AF_INET) |
| if hasattr(socket, 'SOCK_CLOEXEC'): |
| self.assertIn(sock.type, |
| (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, |
| socket.SOCK_STREAM)) |
| else: |
| self.assertEqual(sock.type, socket.SOCK_STREAM) |
| self.assertEqual(sock.proto, 0) |
| |
def test_getsockaddrarg(self):
    # bind() must reject port numbers outside the 16-bit range with
    # OverflowError, and still accept a valid port afterwards.
    sock = socket.socket()
    self.addCleanup(sock.close)
    base_port = socket_helper.find_unused_port()
    for offset in (65536, -65536):
        with self.assertRaises(OverflowError):
            sock.bind((HOST, base_port + offset))

    # Since find_unused_port() is inherently subject to race conditions, we
    # call it a couple times if necessary.
    for attempt in itertools.count():
        port = socket_helper.find_unused_port()
        try:
            sock.bind((HOST, port))
            break
        except OSError as exc:
            if exc.errno != errno.EADDRINUSE or attempt == 5:
                raise
| |
@unittest.skipUnless(os.name == "nt", "Windows specific")
def test_sock_ioctl(self):
    # The ioctl() method and its control-code constants must exist on
    # Windows; an invalid control code is a ValueError.
    self.assertTrue(hasattr(socket.socket, 'ioctl'))
    required_constants = (
        'SIO_RCVALL', 'RCVALL_ON', 'RCVALL_OFF', 'SIO_KEEPALIVE_VALS',
    )
    for constant_name in required_constants:
        self.assertTrue(hasattr(socket, constant_name))

    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ValueError):
        sock.ioctl(-1, None)
    sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
| |
@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
                     'Loopback fast path support required for this test')
def test_sio_loopback_fast_path(self):
    # Enabling the loopback fast path must work (or be reported as
    # unsupported); a None argument is a TypeError.
    WSAEOPNOTSUPP = 10045
    sock = socket.socket()
    self.addCleanup(sock.close)
    try:
        sock.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
    except OSError as exc:
        if exc.winerror != WSAEOPNOTSUPP:
            raise
        self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
                      "doesn't implemented in this Windows version")
    with self.assertRaises(TypeError):
        sock.ioctl(socket.SIO_LOOPBACK_FAST_PATH, None)
| |
def testGetaddrinfo(self):
    # Exercise getaddrinfo() with the supported host/port forms, the
    # family/type/proto/flags filters, and positional vs keyword calls.
    try:
        socket.getaddrinfo('localhost', 80)
    except socket.gaierror as err:
        if err.errno == socket.EAI_SERVICE:
            # see http://bugs.python.org/issue1282647
            self.skipTest("buggy libc version")
        raise
    # len of every sequence is supposed to be == 5
    for info in socket.getaddrinfo(HOST, None):
        self.assertEqual(len(info), 5)
    # host can be a domain name, a string representation of an
    # IPv4/v6 address or None
    socket.getaddrinfo('localhost', 80)
    socket.getaddrinfo('127.0.0.1', 80)
    socket.getaddrinfo(None, 80)
    if socket_helper.IPV6_ENABLED:
        socket.getaddrinfo('::1', 80)
    # port can be a string service name such as "http", a numeric
    # port number or None
    # Issue #26936: this fails on Android before API level 23.
    if not (support.is_android and platform.android_ver().api_level < 23):
        socket.getaddrinfo(HOST, "http")
    socket.getaddrinfo(HOST, 80)
    socket.getaddrinfo(HOST, None)
    # test family and socktype filters
    infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
    # 'socktype' rather than 'type': avoids shadowing the builtin and
    # matches the name used by the next loop.
    for family, socktype, _, _, _ in infos:
        self.assertEqual(family, socket.AF_INET)
        self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value)
        self.assertEqual(str(family), str(family.value))
        self.assertEqual(socktype, socket.SOCK_STREAM)
        self.assertEqual(repr(socktype), '<SocketKind.SOCK_STREAM: %r>' % socktype.value)
        self.assertEqual(str(socktype), str(socktype.value))
    infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    for _, socktype, _, _, _ in infos:
        self.assertEqual(socktype, socket.SOCK_STREAM)
    # test proto and flags arguments
    socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
    # a server willing to support both IPv4 and IPv6 will
    # usually do this
    socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE)
    # test keyword arguments
    a = socket.getaddrinfo(HOST, None)
    b = socket.getaddrinfo(host=HOST, port=None)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, socket.AF_INET)
    b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
    b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
    self.assertEqual(a, b)
    a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                           socket.AI_PASSIVE)
    b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
                           type=socket.SOCK_STREAM, proto=0,
                           flags=socket.AI_PASSIVE)
    self.assertEqual(a, b)
    # Issue #6697.
    self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')

    # Issue 17269: test workaround for OS X platform bug segfault
    if hasattr(socket, 'AI_NUMERICSERV'):
        try:
            # The arguments here are undefined and the call may succeed
            # or fail.  All we care here is that it doesn't segfault.
            socket.getaddrinfo("localhost", None, 0, 0, 0,
                               socket.AI_NUMERICSERV)
        except socket.gaierror:
            pass
| |
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def test_getaddrinfo_int_port_overflow(self):
    # gh-74895: Test that getaddrinfo does not raise OverflowError on port.
    #
    # POSIX getaddrinfo() never specify the valid range for "service"
    # decimal port number values. For IPv4 and IPv6 they are technically
    # unsigned 16-bit values, but the API is protocol agnostic. Which values
    # trigger an error from the C library function varies by platform as
    # they do not all perform validation.

    # The key here is that we don't want to produce OverflowError as Python
    # prior to 3.12 did for ints outside of a [LONG_MIN, LONG_MAX] range.
    # Leave the error up to the underlying string based platform C API.

    from _testcapi import ULONG_MAX, LONG_MAX, LONG_MIN

    # Platforms differ as to what values constitute a getaddrinfo() error
    # return. Some fail for LONG_MAX+1, others ULONG_MAX+1, and Windows
    # silently accepts such huge "port" aka "service" numeric values.
    huge_ports = (
        ULONG_MAX + 1,
        LONG_MAX + 1,
        LONG_MAX - 0xffff + 1,
        LONG_MIN - 1,
    )
    for port in huge_ports:
        try:
            socket.getaddrinfo(None, port, type=socket.SOCK_STREAM)
        except OverflowError:
            self.fail("Either no error or socket.gaierror expected.")
        except socket.gaierror:
            pass

    # Both ends of the valid 16-bit range: no error expected.
    for port in (0, 0xffff):
        socket.getaddrinfo(None, port, type=socket.SOCK_STREAM)
| |
| def test_getnameinfo(self): |
| # only IP addresses are allowed |
| self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) |
| |
@unittest.skipUnless(support.is_resource_enabled('network'),
                     'network is not enabled')
def test_idna(self):
    # Check for internet access before running test
    # (issue #12804, issue #25138).
    with socket_helper.transient_internet('python.org'):
        socket.gethostbyname('python.org')

    # these should all be successful
    domain = 'испытание.pythontest.net'
    for lookup in (socket.gethostbyname, socket.gethostbyname_ex):
        lookup(domain)
    socket.getaddrinfo(domain, 0, socket.AF_UNSPEC, socket.SOCK_STREAM)
    # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
    # have a reverse entry yet
    # socket.gethostbyaddr('испытание.python.org')
| |
def check_sendall_interrupted(self, with_timeout):
    # Shared body for the sendall()-interrupted-by-signal tests.  When
    # with_timeout is true the sending socket gets a timeout and the
    # TimeoutError path is exercised as well.
    #
    # socketpair() is not strictly required, but it makes things easier.
    if not (hasattr(signal, 'alarm') and hasattr(socket, 'socketpair')):
        self.skipTest("signal.alarm and socket.socketpair required for this test")

    # Our signal handlers clobber the C errno by calling a math function
    # with an invalid domain value.
    def clobber_errno():
        self.assertRaises(ValueError, math.acosh, 0)

    def ok_handler(*args):
        clobber_errno()

    def raising_handler(*args):
        clobber_errno()
        1 // 0

    client, server = socket.socketpair()
    previous_handler = signal.signal(signal.SIGALRM, raising_handler)
    try:
        if with_timeout:
            # Just above the one second minimum for signal.alarm
            client.settimeout(1.5)
        with self.assertRaises(ZeroDivisionError):
            signal.alarm(1)
            client.sendall(b"x" * support.SOCK_MAX_SIZE)
        if with_timeout:
            signal.signal(signal.SIGALRM, ok_handler)
            signal.alarm(1)
            with self.assertRaises(TimeoutError):
                client.sendall(b"x" * support.SOCK_MAX_SIZE)
    finally:
        # Cancel any pending alarm before restoring the old handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)
        client.close()
        server.close()
| |
def test_sendall_interrupted(self):
    # Run the shared sendall() interruption check without a socket timeout.
    self.check_sendall_interrupted(with_timeout=False)
| |
def test_sendall_interrupted_with_timeout(self):
    # Run the shared sendall() interruption check with a socket timeout.
    self.check_sendall_interrupted(with_timeout=True)
| |
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed socket must emit a
    # ResourceWarning whose message contains the socket's repr.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock_repr = repr(sock)
    with self.assertWarns(ResourceWarning) as caught:
        del sock
        support.gc_collect()
    self.assertIn(sock_repr, str(caught.warning.args[0]))

    # An open socket file object gets dereferenced after the socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    reader = sock.makefile('rb')
    sock_repr = repr(sock)
    del sock
    support.gc_collect()
    with self.assertWarns(ResourceWarning):
        del reader
        support.gc_collect()
| |
| def test_name_closed_socketio(self): |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: |
| fp = sock.makefile("rb") |
| fp.close() |
| self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") |
| |
| def test_unusable_closed_socketio(self): |
| with socket.socket() as sock: |
| fp = sock.makefile("rb", buffering=0) |
| self.assertTrue(fp.readable()) |
| self.assertFalse(fp.writable()) |
| self.assertFalse(fp.seekable()) |
| fp.close() |
| self.assertRaises(ValueError, fp.readable) |
| self.assertRaises(ValueError, fp.writable) |
| self.assertRaises(ValueError, fp.seekable) |
| |
def test_socket_close(self):
    # socket.close(fd) closes the descriptor behind the socket object's
    # back, so later operations on the object must fail.
    sock = socket.socket()
    try:
        sock.bind((HOST, 0))
        socket.close(sock.fileno())
        self.assertRaises(OSError, sock.listen, 1)
    finally:
        # sock.close() fails with EBADF
        self.assertRaises(OSError, sock.close)
    # Invalid arguments to socket.close() itself.
    self.assertRaises(TypeError, socket.close, None)
    self.assertRaises(OSError, socket.close, -1)
| |
| def test_makefile_mode(self): |
| for mode in 'r', 'rb', 'rw', 'w', 'wb': |
| with self.subTest(mode=mode): |
| with socket.socket() as sock: |
| encoding = None if "b" in mode else "utf-8" |
| with sock.makefile(mode, encoding=encoding) as fp: |
| self.assertEqual(fp.mode, mode) |
| |
| def test_makefile_invalid_mode(self): |
| for mode in 'rt', 'x', '+', 'a': |
| with self.subTest(mode=mode): |
| with socket.socket() as sock: |
| with self.assertRaisesRegex(ValueError, 'invalid mode'): |
| sock.makefile(mode) |
| |
| def test_pickle(self): |
| sock = socket.socket() |
| with sock: |
| for protocol in range(pickle.HIGHEST_PROTOCOL + 1): |
| self.assertRaises(TypeError, pickle.dumps, sock, protocol) |
| for protocol in range(pickle.HIGHEST_PROTOCOL + 1): |
| family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) |
| self.assertEqual(family, socket.AF_INET) |
| type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) |
| self.assertEqual(type, socket.SOCK_STREAM) |
| |
def test_listen_backlog(self):
    # listen() accepts a zero backlog, a negative backlog, and no
    # backlog argument at all.
    def bind_and_listen(*listen_args):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
            srv.bind((HOST, 0))
            srv.listen(*listen_args)

    for backlog in (0, -1):
        bind_and_listen(backlog)

    bind_and_listen()
| |
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def test_listen_backlog_overflow(self):
    # Issue 15989
    # A backlog that does not fit in a C int raises OverflowError.
    import _testcapi
    too_large = _testcapi.INT_MAX + 1
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((HOST, 0))
        with self.assertRaises(OverflowError):
            srv.listen(too_large)
| |
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self):
    # Out-of-range flowinfo values in an IPv6 sockaddr raise
    # OverflowError, both in getnameinfo() and in bind().
    oversized = (socket_helper.HOSTv6, 0, 0xffffffff)
    with self.assertRaises(OverflowError):
        socket.getnameinfo(oversized, 0)

    negative = (socket_helper.HOSTv6, 0, -10)
    with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
        with self.assertRaises(OverflowError):
            s.bind(negative)
| |
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
def test_getaddrinfo_ipv6_basic(self):
    # A literal IPv6 address yields exactly one result, with the address
    # text normalised to lower case.
    results = socket.getaddrinfo(
        'ff02::1de:c0:face:8D',  # Note capital letter `D`.
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    (only_result,) = results
    sockaddr = only_result[-1]
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
| |
def test_getfqdn_filter_localhost(self):
    # getfqdn() for the IPv4 and IPv6 wildcard addresses must give the
    # same answer as getfqdn() with no argument.
    for wildcard in ("0.0.0.0", "::"):
        self.assertEqual(socket.getfqdn(), socket.getfqdn(wildcard))
| |
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
@support.skip_android_selinux('if_nameindex')
def test_getaddrinfo_ipv6_scopeid_symbolic(self):
    # Just pick up any network interface (Linux, Mac OS X)
    first_interface = socket.if_nameindex()[0]
    ifindex, test_interface = first_interface
    scoped_address = 'ff02::1de:c0:face:8D%' + test_interface
    results = socket.getaddrinfo(
        scoped_address,
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    (only_result,) = results
    sockaddr = only_result[-1]
    # Note missing interface name part in IPv6 address
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
| |
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(
    sys.platform == 'win32',
    'Numeric scope id does not work or undocumented')
def test_getaddrinfo_ipv6_scopeid_numeric(self):
    # Also works on Linux and Mac OS X, but is not documented (?)
    # Windows, Linux and Mac OS X allow nonexistent interface numbers here.
    ifindex = 42
    scoped_address = 'ff02::1de:c0:face:8D%' + str(ifindex)
    results = socket.getaddrinfo(
        scoped_address,
        1234, socket.AF_INET6,
        socket.SOCK_DGRAM,
        socket.IPPROTO_UDP
    )
    (only_result,) = results
    sockaddr = only_result[-1]
    # Note missing interface name part in IPv6 address
    self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
| |
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
@unittest.skipIf(AIX, 'Symbolic scope id does not work')
@unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
@support.skip_android_selinux('if_nameindex')
def test_getnameinfo_ipv6_scopeid_symbolic(self):
    # Just pick up any network interface.
    ifindex, ifname = socket.if_nameindex()[0]
    numeric = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
    # Note capital letter `D` in the address handed to getnameinfo().
    address = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)
    result = socket.getnameinfo(address, numeric)
    # The host comes back lower-cased with the interface name appended.
    self.assertEqual(result, ('ff02::1de:c0:face:8d%' + ifname, '1234'))
| |
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(
    sys.platform == 'win32',
    'Numeric scope id does not work or undocumented')
def test_getnameinfo_ipv6_scopeid_numeric(self):
    # Also works on Linux (undocumented), but does not work on Mac OS X
    # Windows and Linux allow nonexistent interface numbers here.
    scope_id = 42
    numeric = socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
    # Note capital letter `D` in the address handed to getnameinfo().
    address = ('ff02::1de:c0:face:8D', 1234, 0, scope_id)
    result = socket.getnameinfo(address, numeric)
    # The host comes back lower-cased with the numeric scope appended.
    self.assertEqual(result, ('ff02::1de:c0:face:8d%' + str(scope_id), '1234'))
| |
def test_str_for_enums(self):
    # The AF_* and SOCK_* constants must have enum-like reprs, while
    # str() must give the same text as str() of the underlying value.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        family = sock.family
        kind = sock.type
        self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value)
        self.assertEqual(repr(kind), '<SocketKind.SOCK_STREAM: %r>' % kind.value)
        self.assertEqual(str(family), str(family.value))
        self.assertEqual(str(kind), str(kind.value))
| |
def test_socket_consistent_sock_type(self):
    # SOCK_NONBLOCK / SOCK_CLOEXEC (0 where unavailable) are OR-ed into the
    # requested type; sock.type must report plain SOCK_STREAM right after
    # creation and after every timeout / blocking-mode change.
    nonblock = getattr(socket, 'SOCK_NONBLOCK', 0)
    cloexec = getattr(socket, 'SOCK_CLOEXEC', 0)
    requested = socket.SOCK_STREAM | nonblock | cloexec

    with socket.socket(socket.AF_INET, requested) as s:
        self.assertEqual(s.type, socket.SOCK_STREAM)
        mode_changes = (
            (s.settimeout, 1),
            (s.settimeout, 0),
            (s.setblocking, True),
            (s.setblocking, False),
        )
        for change, argument in mode_changes:
            change(argument)
            self.assertEqual(s.type, socket.SOCK_STREAM)
| |
def test_unknown_socket_family_repr(self):
    # When a socket is created with a family/type that is not one of the
    # known AF_*/SOCK_* constants, socket.family and socket.type just
    # return the number.
    #
    # socket.socket is fooled into believing it already has an open fd:
    # on that path it doesn't actually verify the family and type, and
    # still populates the socket object.
    fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM).detach()

    unknown_family = max(socket.AddressFamily.__members__.values()) + 1

    # SOCK_NONBLOCK and SOCK_CLOEXEC are left out when looking for the
    # largest known socket kind.
    excluded = {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
    known_kinds = [
        kind
        for name, kind in socket.SocketKind.__members__.items()
        if name not in excluded
    ]
    unknown_type = max(known_kinds) + 1

    wrapped = socket.socket(
        family=unknown_family, type=unknown_type, proto=23, fileno=fd)
    with wrapped as s:
        self.assertEqual(s.family, unknown_family)
        self.assertEqual(s.type, unknown_type)
        # some OS like macOS ignore proto
        self.assertIn(s.proto, {0, 23})
| |
@unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
def test__sendfile_use_sendfile(self):
    class File:
        # Minimal stand-in for a file object: it only offers fileno().
        def __init__(self, fd):
            self.fd = fd

        def fileno(self):
            return self.fd

    with socket.socket() as sock:
        # Open and immediately close a descriptor so its number is stale.
        stale_fd = os.open(os.curdir, os.O_RDONLY)
        os.close(stale_fd)
        # (expected exception, value returned by fileno())
        cases = (
            (socket._GiveupOnSendfile, stale_fd),
            (OverflowError, 2**1000),
            (TypeError, None),
        )
        for expected, fileno_value in cases:
            with self.assertRaises(expected):
                sock._sendfile_use_sendfile(File(fileno_value))
| |
def _test_socket_fileno(self, s, family, stype):
    # Helper: check *s* reports the given family/type, then build a second
    # socket object from its descriptor alone and check that one too.
    self.assertEqual(s.family, family)
    self.assertEqual(s.type, stype)

    descriptor = s.fileno()
    twin = socket.socket(fileno=descriptor)
    self.addCleanup(twin.close)
    # detach old fd to avoid double close
    s.detach()
    self.assertEqual(twin.family, family)
    self.assertEqual(twin.type, stype)
    self.assertEqual(twin.fileno(), descriptor)
| |
def test_socket_fileno(self):
    def bind_and_check(family, kind, address):
        # Create a socket, register its close as cleanup, bind it and run
        # the shared fileno= checks on it.
        sock = socket.socket(family, kind)
        self.addCleanup(sock.close)
        sock.bind(address)
        self._test_socket_fileno(sock, family, kind)

    bind_and_check(socket.AF_INET, socket.SOCK_STREAM,
                   (socket_helper.HOST, 0))

    if hasattr(socket, "SOCK_DGRAM"):
        bind_and_check(socket.AF_INET, socket.SOCK_DGRAM,
                       (socket_helper.HOST, 0))

    if socket_helper.IPV6_ENABLED:
        bind_and_check(socket.AF_INET6, socket.SOCK_STREAM,
                       (socket_helper.HOSTv6, 0, 0, 0))

    if hasattr(socket, "AF_UNIX"):
        unix_name = socket_helper.create_unix_domain_name()
        self.addCleanup(os_helper.unlink, unix_name)

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as unix_sock:
            try:
                unix_sock.bind(unix_name)
            except PermissionError:
                # Binding is not permitted; the check is silently skipped.
                return
            self._test_socket_fileno(unix_sock, socket.AF_UNIX,
                                     socket.SOCK_STREAM)
| |
def test_socket_fileno_rejects_float(self):
    # A float passed as fileno must be refused with TypeError.
    self.assertRaises(
        TypeError,
        socket.socket, socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
| |
def test_socket_fileno_rejects_other_types(self):
    # A str passed as fileno must be refused with TypeError.
    self.assertRaises(
        TypeError,
        socket.socket, socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
| |
def test_socket_fileno_rejects_invalid_socket(self):
    # fileno=-1 must raise ValueError mentioning a negative descriptor.
    self.assertRaisesRegex(
        ValueError, "negative file descriptor",
        socket.socket, socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
| |
@unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
def test_socket_fileno_rejects_negative(self):
    # Any other negative fileno must raise the same ValueError.
    self.assertRaisesRegex(
        ValueError, "negative file descriptor",
        socket.socket, socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
| |
def test_socket_fileno_requires_valid_fd(self):
    # Wrapping a bad descriptor must fail with OSError, whether or not the
    # family and type are passed alongside fileno.
    WSAENOTSOCK = 10038
    accepted_errnos = (errno.EBADF, WSAENOTSOCK)

    for positional in ((), (socket.AF_INET, socket.SOCK_STREAM)):
        with self.assertRaises(OSError) as cm:
            socket.socket(*positional, fileno=os_helper.make_bad_fd())
        self.assertIn(cm.exception.errno, accepted_errnos)
| |
def test_socket_fileno_requires_socket_fd(self):
    # The descriptor of a regular temporary file is not a socket, so
    # wrapping it must fail with OSError.
    with tempfile.NamedTemporaryFile() as afile:
        self.assertRaises(OSError, socket.socket, fileno=afile.fileno())

        # With family and type given explicitly the errno is checked too.
        with self.assertRaises(OSError) as cm:
            socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                          fileno=afile.fileno())
        self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
| |
def test_addressfamily_enum(self):
    import _socket
    import enum

    def is_address_family(name):
        # Upper-case names of _socket that start with AF_.
        return name.isupper() and name.startswith('AF_')

    # Build a reference enum from _socket and compare socket.AddressFamily
    # against it.
    checked = enum._old_convert_(
        enum.IntEnum, 'AddressFamily', 'socket',
        is_address_family, source=_socket)
    enum._test_simple_enum(checked, socket.AddressFamily)
| |
def test_socketkind_enum(self):
    import _socket
    import enum

    def is_socket_kind(name):
        # Upper-case names of _socket that start with SOCK_.
        return name.isupper() and name.startswith('SOCK_')

    # Build a reference enum from _socket and compare socket.SocketKind
    # against it.
    checked = enum._old_convert_(
        enum.IntEnum, 'SocketKind', 'socket',
        is_socket_kind, source=_socket)
    enum._test_simple_enum(checked, socket.SocketKind)
| |
def test_msgflag_enum(self):
    import _socket
    import enum

    def is_msg_flag(name):
        # Upper-case names of _socket that start with MSG_.
        return name.isupper() and name.startswith('MSG_')

    # Build a reference flag enum from _socket and compare socket.MsgFlag
    # against it.
    checked = enum._old_convert_(
        enum.IntFlag, 'MsgFlag', 'socket',
        is_msg_flag, source=_socket)
    enum._test_simple_enum(checked, socket.MsgFlag)
| |
def test_addressinfo_enum(self):
    import _socket
    import enum

    def is_address_info(name):
        # Upper-case names of _socket that start with AI_.
        return name.isupper() and name.startswith('AI_')

    # Build a reference flag enum from _socket and compare
    # socket.AddressInfo against it.
    checked = enum._old_convert_(
        enum.IntFlag, 'AddressInfo', 'socket',
        is_address_info, source=_socket)
    enum._test_simple_enum(checked, socket.AddressInfo)
| |
| |
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):
    # Single-threaded CAN socket checks: constants exist, sockets can be
    # created and bound, and CAN_RAW options round-trip.

    def testCrucialConstants(self):
        # getattr() raises AttributeError for a missing constant.
        for name in ('AF_CAN', 'PF_CAN', 'CAN_RAW'):
            getattr(socket, name)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCMConstants(self):
        socket.CAN_BCM

        opcodes = (
            'CAN_BCM_TX_SETUP',     # create (cyclic) transmission task
            'CAN_BCM_TX_DELETE',    # remove (cyclic) transmission task
            'CAN_BCM_TX_READ',      # read properties of (cyclic) transmission task
            'CAN_BCM_TX_SEND',      # send one CAN frame
            'CAN_BCM_RX_SETUP',     # create RX content filter subscription
            'CAN_BCM_RX_DELETE',    # remove RX content filter subscription
            'CAN_BCM_RX_READ',      # read properties of RX content filter subscription
            'CAN_BCM_TX_STATUS',    # reply to TX_READ request
            'CAN_BCM_TX_EXPIRED',   # notification on performed transmissions (count=0)
            'CAN_BCM_RX_STATUS',    # reply to RX_READ request
            'CAN_BCM_RX_TIMEOUT',   # cyclic message is absent
            'CAN_BCM_RX_CHANGED',   # updated CAN frame (detected content change)
        )
        flags = (
            'CAN_BCM_SETTIMER',
            'CAN_BCM_STARTTIMER',
            'CAN_BCM_TX_COUNTEVT',
            'CAN_BCM_TX_ANNOUNCE',
            'CAN_BCM_TX_CP_CAN_ID',
            'CAN_BCM_RX_FILTER_ID',
            'CAN_BCM_RX_CHECK_DLC',
            'CAN_BCM_RX_NO_AUTOTIMER',
            'CAN_BCM_RX_ANNOUNCE_RESUME',
            'CAN_BCM_TX_RESET_MULTI_IDX',
            'CAN_BCM_RX_RTR_FRAME',
        )
        # Same lookup order as the listing: opcodes first, then flags.
        for name in opcodes + flags:
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) a raw CAN socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW):
            pass

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testCreateBCMSocket(self):
        # Creating (and closing) a CAN_BCM datagram socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM):
            pass

    def testBindAny(self):
        # Binding to the empty interface name must be reflected verbatim
        # by getsockname().
        any_interface = ('', )
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            s.bind(any_interface)
            self.assertEqual(s.getsockname(), any_interface)

    def testTooLongInterfaceName(self):
        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            with self.assertRaisesRegex(OSError, 'interface name too long'):
                s.bind(('x' * 1024,))

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
                         'socket.CAN_RAW_LOOPBACK required for this test.')
    def testLoopback(self):
        # Both loopback settings must read back exactly as written.
        level, option = socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            for loopback in (0, 1):
                s.setsockopt(level, option, loopback)
                self.assertEqual(loopback, s.getsockopt(level, option))

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
                         'socket.CAN_RAW_FILTER required for this test.')
    def testFilter(self):
        # One filter entry: can_id and can_mask packed as two native-order
        # unsigned ints with standard sizes ("=II").
        packed_filter = struct.pack("=II", 0x200, 0x700)
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, packed_filter)
            stored = s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)
            self.assertEqual(packed_filter, stored)
            # A bytearray must be accepted as the option value as well.
            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER,
                         bytearray(packed_filter))
| |
| |
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class CANTest(ThreadedCANSocketTest):
    # Frame exchange over CAN sockets.
    # NOTE(review): the test*/_test* pairs presumably run as the two sides
    # of the threaded fixture provided by ThreadedCANSocketTest (defined
    # elsewhere in this file) -- confirm against that base class.

    def __init__(self, methodName='runTest'):
        ThreadedCANSocketTest.__init__(self, methodName=methodName)

    @classmethod
    def build_can_frame(cls, can_id, data):
        """Build a CAN frame.

        The length of *data* is recorded as the DLC, *data* is padded with
        NUL bytes to 8 bytes, and the three fields are packed with
        ``cls.can_frame_fmt``.
        """
        can_dlc = len(data)
        data = data.ljust(8, b'\x00')
        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)

    @classmethod
    def dissect_can_frame(cls, frame):
        """Dissect a CAN frame.

        Return ``(can_id, can_dlc, data)`` where *data* is cut back to
        *can_dlc* bytes, i.e. without the padding.
        """
        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
        return (can_id, can_dlc, data[:can_dlc])

    def testSendFrame(self):
        # The received frame must equal the one built in _testSendFrame and
        # the reported address must name this test's interface.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        self.assertEqual(addr[0], self.interface)

    def _testSendFrame(self):
        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
        self.cli.send(self.cf)

    def testSendMaxFrame(self):
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)

    def _testSendMaxFrame(self):
        # Full 8-byte payload, so build_can_frame() adds no padding.
        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
        self.cli.send(self.cf)

    def testSendMultiFrames(self):
        # Two frames are expected, in the order they were sent.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf1, cf)

        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf2, cf)

    def _testSendMultiFrames(self):
        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
        self.cli.send(self.cf1)

        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
        self.cli.send(self.cf2)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def _testBCM(self):
        # Receive the frame sent by testBCM and check it both as raw bytes
        # and field by field.
        cf, addr = self.cli.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        can_id, can_dlc, data = self.dissect_can_frame(cf)
        self.assertEqual(self.can_id, can_id)
        self.assertEqual(self.data, data)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCM(self):
        # Send one frame through a CAN_BCM socket using the TX_SEND opcode
        # and check that the whole message was written.
        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
        self.addCleanup(bcm.close)
        bcm.connect((self.interface,))
        self.can_id = 0x123
        self.data = bytes([0xc0, 0xff, 0xee])
        self.cf = self.build_can_frame(self.can_id, self.data)
        opcode = socket.CAN_BCM_TX_SEND
        flags = 0
        count = 0
        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
        bcm_can_id = 0x0222
        nframes = 1
        # Use a unittest assertion rather than a bare ``assert`` so the
        # frame-size check is still performed under ``python -O``.
        self.assertEqual(len(self.cf), 16)
        header = struct.pack(self.bcm_cmd_msg_fmt,
                             opcode,
                             flags,
                             count,
                             ival1_seconds,
                             ival1_usec,
                             ival2_seconds,
                             ival2_usec,
                             bcm_can_id,
                             nframes,
                             )
        header_plus_frame = header + self.cf
        bytes_sent = bcm.send(header_plus_frame)
        self.assertEqual(bytes_sent, len(header_plus_frame))
| |
| |
@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
class ISOTPTest(unittest.TestCase):
    # CAN ISO-TP checks: constants, socket creation and binding.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Interface name used by testBind.
        self.interface = "vcan0"

    def testCrucialConstants(self):
        # getattr() raises AttributeError for a missing constant.
        for name in ('AF_CAN', 'PF_CAN', 'CAN_ISOTP', 'SOCK_DGRAM'):
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) a raw CAN socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW):
            pass

    @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
                         'socket.CAN_ISOTP required for this test.')
    def testCreateISOTPSocket(self):
        # Creating (and closing) a CAN_ISOTP socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP):
            pass

    def testTooLongInterfaceName(self):
        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
        oversized = ('x' * 1024, 1, 2)
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
            with self.assertRaisesRegex(OSError, 'interface name too long'):
                s.bind(oversized)

    def testBind(self):
        # Bind to the interface and read the address back; a missing
        # interface (ENODEV) skips the test instead of failing it.
        try:
            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
                addr = self.interface, 0x123, 0x456
                s.bind(addr)
                self.assertEqual(s.getsockname(), addr)
        except OSError as e:
            if e.errno != errno.ENODEV:
                raise
            self.skipTest('network interface `%s` does not exist' %
                          self.interface)
| |
| |
@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
class J1939Test(unittest.TestCase):
    # CAN J1939 checks: constants, socket creation and binding.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Interface name used by testBind.
        self.interface = "vcan0"

    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
                         'socket.CAN_J1939 required for this test.')
    def testJ1939Constants(self):
        socket.CAN_J1939

        names = (
            'J1939_MAX_UNICAST_ADDR',
            'J1939_IDLE_ADDR',
            'J1939_NO_ADDR',
            'J1939_NO_NAME',
            'J1939_PGN_REQUEST',
            'J1939_PGN_ADDRESS_CLAIMED',
            'J1939_PGN_ADDRESS_COMMANDED',
            'J1939_PGN_PDU1_MAX',
            'J1939_PGN_MAX',
            'J1939_NO_PGN',

            # J1939 socket options
            'SO_J1939_FILTER',
            'SO_J1939_PROMISC',
            'SO_J1939_SEND_PRIO',
            'SO_J1939_ERRQUEUE',

            'SCM_J1939_DEST_ADDR',
            'SCM_J1939_DEST_NAME',
            'SCM_J1939_PRIO',
            'SCM_J1939_ERRQUEUE',

            'J1939_NLA_PAD',
            'J1939_NLA_BYTES_ACKED',

            'J1939_EE_INFO_NONE',
            'J1939_EE_INFO_TX_ABORT',

            'J1939_FILTER_MAX',
        )
        # getattr() raises AttributeError for a missing constant.
        for name in names:
            getattr(socket, name)

    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
                         'socket.CAN_J1939 required for this test.')
    def testCreateJ1939Socket(self):
        # Creating (and closing) a CAN_J1939 socket must not raise.
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939):
            pass

    def testBind(self):
        # Bind to the interface and read the address back; a missing
        # interface (ENODEV) skips the test instead of failing it.
        try:
            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
                addr = (
                    self.interface,
                    socket.J1939_NO_NAME,
                    socket.J1939_NO_PGN,
                    socket.J1939_NO_ADDR,
                )
                s.bind(addr)
                self.assertEqual(s.getsockname(), addr)
        except OSError as e:
            if e.errno != errno.ENODEV:
                raise
            self.skipTest('network interface `%s` does not exist' %
                          self.interface)
| |
| |
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
    # Single-threaded RDS checks: constants, creation, buffer options.

    def testCrucialConstants(self):
        # getattr() raises AttributeError for a missing constant.
        for name in ('AF_RDS', 'PF_RDS'):
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) an RDS socket must not raise.
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0):
            pass

    def testSocketBufferSize(self):
        # Setting the receive and then the send buffer size must not
        # raise; the values are not read back.
        bufsize = 16384
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
            for option in (socket.SO_RCVBUF, socket.SO_SNDBUF):
                s.setsockopt(socket.SOL_SOCKET, option, bufsize)
| |
| |
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class RDSTest(ThreadedRDSSocketTest):
    # Data exchange over RDS sockets.
    # NOTE(review): the test*/_test* pairs presumably run as the receiving
    # and sending sides of the threaded fixture from ThreadedRDSSocketTest
    # (defined elsewhere in this file) -- confirm against that base class.

    def __init__(self, methodName='runTest'):
        ThreadedRDSSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        super().setUp()
        self.evt = threading.Event()

    def testSendAndRecv(self):
        # Payload and sender address must match what the sending side used.
        payload, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, payload)
        self.assertEqual(self.cli_addr, sender)

    def _testSendAndRecv(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    def testPeek(self):
        # A MSG_PEEK read and the following normal read must both return
        # the same payload.
        peeked, _ = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
        self.assertEqual(self.data, peeked)
        consumed, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, consumed)

    def _testPeek(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    @requireAttrs(socket.socket, 'recvmsg')
    def testSendAndRecvMsg(self):
        # Only the data part of the recvmsg() result is checked.
        payload, _ancdata, _msg_flags, _addr = self.serv.recvmsg(self.bufsize)
        self.assertEqual(self.data, payload)

    @requireAttrs(socket.socket, 'sendmsg')
    def _testSendAndRecvMsg(self):
        self.data = b'hello ' * 10
        destination = (HOST, self.port)
        self.cli.sendmsg([self.data], (), 0, destination)

    def testSendAndRecvMulti(self):
        # Two datagrams are expected, in the order they were sent.
        for expected in (self.data1, self.data2):
            payload, _ = self.serv.recvfrom(self.bufsize)
            self.assertEqual(expected, payload)

    def _testSendAndRecvMulti(self):
        self.data1 = b'bacon'
        self.cli.sendto(self.data1, 0, (HOST, self.port))

        self.data2 = b'egg'
        self.cli.sendto(self.data2, 0, (HOST, self.port))

    def testSelect(self):
        # select() must report the receiving socket readable within 3
        # seconds; the payload is then read and checked.
        readable, _, _ = select.select([self.serv], [], [], 3.0)
        self.assertIn(self.serv, readable)
        payload, _ = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, payload)

    def _testSelect(self):
        self.data = b'select'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)
| |
@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
                     'QIPCRTR sockets required for this test.')
class BasicQIPCRTRTest(unittest.TestCase):
    # AF_QIPCRTR checks: constant, creation, and the second field of
    # getsockname() before and after binding/connecting.

    def testCrucialConstants(self):
        socket.AF_QIPCRTR

    def testCreateSocket(self):
        # Creating (and closing) a QIPCRTR socket must not raise.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM):
            pass

    def testUnbound(self):
        # A fresh socket reports 0 in the second field of its name.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            _, port = s.getsockname()[:2]
            self.assertEqual(port, 0)

    def testBindSock(self):
        # After bind_port() on the socket's own host value, the second
        # field of its name is no longer 0.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            own_host = s.getsockname()[0]
            socket_helper.bind_port(s, host=own_host)
            self.assertNotEqual(s.getsockname()[1], 0)

    def testInvalidBindSock(self):
        # Binding with host=-2 must fail with OSError.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            with self.assertRaises(OSError):
                socket_helper.bind_port(s, host=-2)

    def testAutoBindSock(self):
        # After connect(), the second field of the name is no longer 0.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s:
            s.connect((123, 123))
            self.assertNotEqual(s.getsockname()[1], 0)
| |
@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
                     'VSOCK sockets required for this test.')
class BasicVSOCKTest(unittest.TestCase):
    # AF_VSOCK checks: constants, creation and buffer-size options.

    def testCrucialConstants(self):
        socket.AF_VSOCK

    def testVSOCKConstants(self):
        names = (
            'SO_VM_SOCKETS_BUFFER_SIZE',
            'SO_VM_SOCKETS_BUFFER_MIN_SIZE',
            'SO_VM_SOCKETS_BUFFER_MAX_SIZE',
            'VMADDR_CID_ANY',
            'VMADDR_PORT_ANY',
            'VMADDR_CID_LOCAL',
            'VMADDR_CID_HOST',
            'VM_SOCKETS_INVALID_VERSION',
            'IOCTL_VM_SOCKETS_GET_LOCAL_CID',
        )
        # getattr() raises AttributeError for a missing constant.
        for name in names:
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) a VSOCK stream socket must not raise.
        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM):
            pass

    def testSocketBufferSize(self):
        # Read the max, current and min buffer sizes, double each of them
        # (max first, so the larger values fit), then check that the
        # doubled values read back.
        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s:
            options = (
                socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE,
                socket.SO_VM_SOCKETS_BUFFER_SIZE,
                socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE,
            )
            originals = [s.getsockopt(socket.AF_VSOCK, opt) for opt in options]

            for opt, value in zip(options, originals):
                s.setsockopt(socket.AF_VSOCK, opt, value * 2)

            for opt, value in zip(options, originals):
                self.assertEqual(value * 2,
                                 s.getsockopt(socket.AF_VSOCK, opt))
| |
| |
@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
                     'Bluetooth sockets required for this test.')
class BasicBluetoothTest(unittest.TestCase):
    # Bluetooth checks: constants per platform and socket creation for
    # each protocol.

    def testBluetoothConstants(self):
        # Constants looked up on every platform.
        names = ['BDADDR_ANY', 'BDADDR_LOCAL', 'AF_BLUETOOTH', 'BTPROTO_RFCOMM']

        if sys.platform != "win32":
            names += ['BTPROTO_HCI', 'SOL_HCI', 'BTPROTO_L2CAP']

            if not sys.platform.startswith("freebsd"):
                names.append('BTPROTO_SCO')

        # getattr() raises AttributeError for a missing constant.
        for name in names:
            getattr(socket, name)

    def testCreateRfcommSocket(self):
        # Creating (and closing) an RFCOMM socket must not raise.
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM):
            pass

    @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
    def testCreateL2capSocket(self):
        # Creating (and closing) an L2CAP socket must not raise.
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP):
            pass

    @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
    def testCreateHciSocket(self):
        # Creating (and closing) an HCI socket must not raise.
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI):
            pass

    @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
                     "windows and freebsd do not support SCO sockets")
    def testCreateScoSocket(self):
        # Creating (and closing) an SCO socket must not raise.
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO):
            pass
| |
| |
@unittest.skipUnless(HAVE_SOCKET_HYPERV,
                     'Hyper-V sockets required for this test.')
class BasicHyperVTest(unittest.TestCase):
    # AF_HYPERV checks: constants and the errors raised for bad protocols
    # and malformed connect() addresses.

    def _check_connect_error(self, exc_type, message, address):
        # Open a raw Hyper-V stream socket and require connect(address) to
        # raise *exc_type* with *message* occurring literally in its text.
        with socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM, socket.HV_PROTOCOL_RAW) as s:
            with self.assertRaisesRegex(exc_type, re.escape(message)):
                s.connect(address)

    def testHyperVConstants(self):
        names = (
            'HVSOCKET_CONNECT_TIMEOUT',
            'HVSOCKET_CONNECT_TIMEOUT_MAX',
            'HVSOCKET_CONNECTED_SUSPEND',
            'HVSOCKET_ADDRESS_FLAG_PASSTHRU',
            'HV_GUID_ZERO',
            'HV_GUID_WILDCARD',
            'HV_GUID_BROADCAST',
            'HV_GUID_CHILDREN',
            'HV_GUID_LOOPBACK',
            'HV_GUID_PARENT',
        )
        # getattr() raises AttributeError for a missing constant.
        for name in names:
            getattr(socket, name)

    def testCreateHyperVSocketWithUnknownProtoFailure(self):
        # Leaving the protocol out must fail with WinError 10041.
        expected = r"\[WinError 10041\]"
        with self.assertRaisesRegex(OSError, expected):
            socket.socket(socket.AF_HYPERV, socket.SOCK_STREAM)

    def testCreateHyperVSocketAddrNotTupleFailure(self):
        self._check_connect_error(
            TypeError,
            "connect(): AF_HYPERV address must be tuple, not str",
            socket.HV_GUID_ZERO)

    def testCreateHyperVSocketAddrNotTupleOf2StrsFailure(self):
        self._check_connect_error(
            TypeError,
            "AF_HYPERV address must be a str tuple (vm_id, service_id)",
            (socket.HV_GUID_ZERO,))

    def testCreateHyperVSocketAddrNotTupleOfStrsFailure(self):
        self._check_connect_error(
            TypeError,
            "AF_HYPERV address must be a str tuple (vm_id, service_id)",
            (1, 2))

    def testCreateHyperVSocketAddrVmIdNotValidUUIDFailure(self):
        self._check_connect_error(
            ValueError,
            "connect(): AF_HYPERV address vm_id is not a valid UUID string",
            ("00", socket.HV_GUID_ZERO))

    def testCreateHyperVSocketAddrServiceIdNotValidUUIDFailure(self):
        self._check_connect_error(
            ValueError,
            "connect(): AF_HYPERV address service_id is not a valid UUID string",
            (socket.HV_GUID_ZERO, "00"))
| |
| |
| class BasicTCPTest(SocketConnectedTest): |
| |
def __init__(self, methodName='runTest'):
    # Pass the test method name straight through to the base class.
    SocketConnectedTest.__init__(
        self,
        methodName=methodName,
    )
| |
def testRecv(self):
    # Testing large receive over TCP
    received = self.cli_conn.recv(1024)
    self.assertEqual(received, MSG)
| |
def _testRecv(self):
    # Counterpart of testRecv: send the module-level test message.
    connection = self.serv_conn
    connection.send(MSG)
| |
def testOverFlowRecv(self):
    # Testing receive in chunks over TCP: the first read asks for 3 bytes
    # fewer than the message, the second read collects what is left.
    head = self.cli_conn.recv(len(MSG) - 3)
    tail = self.cli_conn.recv(1024)
    self.assertEqual(head + tail, MSG)
| |
def _testOverFlowRecv(self):
    # Counterpart of testOverFlowRecv: send the module-level test message.
    connection = self.serv_conn
    connection.send(MSG)
| |
def testRecvFrom(self):
    # Testing large recvfrom() over TCP; only the data part is checked.
    received, _sender = self.cli_conn.recvfrom(1024)
    self.assertEqual(received, MSG)
| |
def _testRecvFrom(self):
    # Counterpart of testRecvFrom: send the module-level test message.
    connection = self.serv_conn
    connection.send(MSG)
| |
def testOverFlowRecvFrom(self):
    # Testing recvfrom() in chunks over TCP: the first read asks for 3
    # bytes fewer than the message, the second collects what is left.
    head, _sender = self.cli_conn.recvfrom(len(MSG)-3)
    tail, _sender = self.cli_conn.recvfrom(1024)
    self.assertEqual(head + tail, MSG)
| |
def _testOverFlowRecvFrom(self):
    # Counterpart of testOverFlowRecvFrom: send the module-level test
    # message.
    connection = self.serv_conn
    connection.send(MSG)
| |
def testSendAll(self):
    # Testing sendall() with a 2048 byte string over TCP
    # Keep reading 1024-byte chunks until recv() returns an empty bytes
    # object, then compare everything that arrived.
    chunks = iter(lambda: self.cli_conn.recv(1024), b'')
    received = b''.join(chunks)
    self.assertEqual(received, b'f' * 2048)
| |
def _testSendAll(self):
    # Counterpart of testSendAll: push 2048 bytes with a single sendall().
    self.serv_conn.sendall(b'f' * 2048)
| |
def testFromFd(self):
    # Testing fromfd(): a socket built from the connection's descriptor
    # must be a socket.socket and must be able to read the message.
    from_fd = socket.fromfd(
        self.cli_conn.fileno(), socket.AF_INET, socket.SOCK_STREAM)
    self.addCleanup(from_fd.close)
    self.assertIsInstance(from_fd, socket.socket)
    self.assertEqual(from_fd.recv(1024), MSG)
| |
def _testFromFd(self):
    # Counterpart of testFromFd: send the module-level test message.
    connection = self.serv_conn
    connection.send(MSG)
| |
def testDup(self):
    # Testing dup(): the duplicate must be able to read the message.
    duplicate = self.cli_conn.dup()
    self.addCleanup(duplicate.close)
    self.assertEqual(duplicate.recv(1024), MSG)
| |
def _testDup(self):
    # Counterpart of testDup: send the module-level test message.
    connection = self.serv_conn
    connection.send(MSG)
| |
def check_shutdown(self):
    # Test shutdown() helper
    received = self.cli_conn.recv(1024)
    self.assertEqual(received, MSG)
    # Block until _testShutdown[_overflow] has finished: on OS X, when the
    # server closes the connection the client also becomes disconnected,
    # and the client's shutdown call will fail. (Issue #4397.)
    self.done.wait()
| |
def testShutdown(self):
    # All checks live in the shared helper.
    return self.check_shutdown()
| |
def _testShutdown(self):
    # Counterpart of testShutdown: send the message, then shut the
    # connection down with how=2.
    connection = self.serv_conn
    connection.send(MSG)
    connection.shutdown(2)
| |
@support.cpython_only
@unittest.skipIf(_testcapi is None, "requires _testcapi")
def testShutdown_overflow(self):
    # All checks live in the shared helper.
    return self.check_shutdown()
| |
| @support.cpython_only |
| @unittest.skipIf(_testcapi is None, "requires _testcapi") |
| def _testShutdown_overflow(self): |
| import _testcapi |
| self.serv_conn |