|  | """Event loop using a selector and related classes. | 
|  |  | 
|  | A selector is a "notify-when-ready" multiplexer.  For a subclass which | 
|  | also includes support for signal handling, see the unix_events sub-module. | 
|  | """ | 
|  |  | 
|  | __all__ = ['BaseSelectorEventLoop'] | 
|  |  | 
|  | import collections | 
|  | import errno | 
|  | import functools | 
|  | import socket | 
|  | import warnings | 
|  | import weakref | 
|  | try: | 
|  | import ssl | 
|  | except ImportError:  # pragma: no cover | 
|  | ssl = None | 
|  |  | 
|  | from . import base_events | 
|  | from . import compat | 
|  | from . import constants | 
|  | from . import events | 
|  | from . import futures | 
|  | from . import selectors | 
|  | from . import transports | 
|  | from . import sslproto | 
|  | from .coroutines import coroutine | 
|  | from .log import logger | 
|  |  | 
|  |  | 
def _test_selector_event(selector, fd, event):
    """Tell whether *selector* is monitoring *fd* for 'event' events.

    Returns False when *fd* is not registered with the selector at all
    (``selector.get_key()`` raises KeyError), otherwise True if any bit of
    *event* is set in the registered event mask.
    """
    try:
        registered = selector.get_key(fd).events
    except KeyError:
        return False
    return (registered & event) != 0
|  |  | 
|  |  | 
if hasattr(socket, 'TCP_NODELAY'):
    # On some platforms (e.g. Linux with older Python versions) socket.type
    # is a bitmask that may also carry the SOCK_NONBLOCK / SOCK_CLOEXEC
    # flags, so a plain equality test against SOCK_STREAM would fail for
    # non-blocking sockets.  Mask those flags out before comparing.
    _SOCK_TYPE_FLAGS = (getattr(socket, 'SOCK_NONBLOCK', 0) |
                        getattr(socket, 'SOCK_CLOEXEC', 0))

    def _set_nodelay(sock):
        """Enable TCP_NODELAY on *sock* when it is a TCP stream socket.

        Only AF_INET / AF_INET6 sockets of type SOCK_STREAM whose protocol
        is IPPROTO_TCP are touched; any other socket is left unchanged.
        """
        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
                (sock.type & ~_SOCK_TYPE_FLAGS) == socket.SOCK_STREAM and
                sock.proto == socket.IPPROTO_TCP):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        """Do nothing: the socket module does not expose TCP_NODELAY."""
        pass
|  |  | 
|  |  | 
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Create the loop around *selector*.

        When *selector* is None, selectors.DefaultSelector() is used.
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps a file descriptor to the transport using it (weak values).
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a new _SelectorSocketTransport for *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Return an SSL transport wrapping *rawsock*.

        Falls back to the legacy transport when sslproto reports that the
        SSLProtocol implementation is not available.
        """
        if not sslproto._is_sslproto_available():
            return self._make_legacy_ssl_transport(
                rawsock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                extra=extra, server=server)

        ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
                                            server_side, server_hostname)
        # The socket transport is created for its side effects only; the
        # application talks to the transport exposed by the SSL protocol.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
        # on Python 3.4 and older, when ssl.MemoryBIO is not available.
        return _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a new _SelectorDatagramTransport for *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _socketpair(self):
        """Return the (ssock, csock) pair used for the self-pipe.

        Not implemented here; _make_self_pipe() relies on it.
        """
        raise NotImplementedError

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe (no-op)."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe, handing each chunk to _process_self_data()."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100):
        """Register the listening *sock* so connections get accepted."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog)

    def _accept_connection(self, protocol_factory, sock,
                           sslcontext=None, server=None, backlog=100):
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': sock,
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog)
                    # Stop here: the reader is gone and a retry is already
                    # scheduled.  Looping on would call accept() again,
                    # reporting the same error and scheduling a duplicate
                    # retry on every remaining iteration.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(protocol_factory, conn, extra,
                                                  sslcontext, server)
                self.create_task(accept)

    @coroutine
    def _accept_connection2(self, protocol_factory, conn, extra,
                            sslcontext=None, server=None):
        """Build protocol and transport for the accepted socket *conn*.

        Waits until the transport signals readiness through its waiter.
        Exceptions are reported to the exception handler only when the
        loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                yield from waiter
            except:
                transport.close()
                raise

            # It's now up to the protocol to handle the connection.
        except Exception as exc:
            if self._debug:
                context = {
                    'message': ('Error on transport creation '
                                'for incoming connection'),
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a transport not yet closing."""
        try:
            transport = self._transports[fd]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    'File descriptor {!r} is used by transport {!r}'.format(
                        fd, transport))

    def _add_reader(self, fd, callback, *args):
        """Register *callback* for read events on *fd*.

        A previously registered reader for the same *fd* is cancelled;
        an existing writer registration is preserved.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def _remove_reader(self, fd):
        """Unregister the reader for *fd*.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise, including when the loop is closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* for write events on *fd*.

        A previously registered writer for the same *fd* is cancelled;
        an existing reader registration is preserved.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def _remove_writer(self, fd):
        """Remove a writer callback."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_recv(fut, False, sock, n)
        return fut

    def _sock_recv(self, fut, registered, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        fd = sock.fileno()
        if registered:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        if data:
            self._sock_sendall(fut, False, sock, data)
        else:
            fut.set_result(None)
        return fut

    def _sock_sendall(self, fut, registered, sock, data):
        """Send as much of *data* as possible, rescheduling for the rest.

        *registered* is True when this call was triggered as a writer
        callback, in which case the callback is removed first.
        """
        fd = sock.fileno()

        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if n == len(data):
            fut.set_result(None)
        else:
            if n:
                data = data[n:]
            self.add_writer(fd, self._sock_sendall, fut, True, sock, data)

    @coroutine
    def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # Addresses of non-AF_UNIX sockets are resolved before connecting.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = base_events._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            if not resolved.done():
                yield from resolved
            _, _, _, _, address = resolved.result()[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return (yield from fut)

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; complete *fut* now or when writable."""
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(
                functools.partial(self._sock_connect_done, fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        """Done-callback of the connect future: drop the writer for *fd*."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: read SO_ERROR and resolve the connect future."""
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, 'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return fut

    def _sock_accept(self, fut, registered, sock):
        """Try to accept on *sock*, rescheduling itself if it would block.

        *registered* is True when this call was triggered as a reader
        callback, in which case the callback is removed first.
        """
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    def _process_events(self, event_list):
        """Queue the reader/writer handles of every ready selector key.

        Handles that were cancelled in the meantime are unregistered
        instead of being scheduled.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on the listening *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
|  |  | 
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Common state and close/abort logic shared by selector transports."""

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Record *sock* and *protocol* and register with *loop*.

        Fills the 'socket', 'sockname' and (unless already supplied)
        'peername' extra-info entries, attaches to *server* when given and
        stores the transport in loop._transports under its file descriptor.
        """
        super().__init__(extra, loop)
        info = self._extra
        info['socket'] = sock
        info['sockname'] = sock.getsockname()
        if 'peername' not in info:
            try:
                peer = sock.getpeername()
            except socket.error:
                # The peer address could not be obtained.
                peer = None
            info['peername'] = peer
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._protocol_connected = True
        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self._sock is None:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append('fd=%s' % self._sock_fd)
        # test if the transport was closed
        loop = self._loop
        if loop is not None and not loop.is_closed():
            reading = _test_selector_event(loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_READ)
            parts.append('read=polling' if reading else 'read=idle')

            writing = _test_selector_event(loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            write_state = 'polling' if writing else 'idle'

            parts.append('write=<%s, bufsize=%s>'
                         % (write_state, self.get_write_buffer_size()))
        return '<%s>' % ' '.join(parts)

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Replace the protocol served by this transport."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol served by this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has begun."""
        return self._closing

    def close(self):
        """Stop reading and close once the write buffer is empty.

        With an empty buffer the writer is removed as well and
        connection_lost(None) is scheduled right away.
        """
        if self._closing:
            return
        self._closing = True
        loop = self._loop
        loop._remove_reader(self._sock_fd)
        if self._buffer:
            # Pending data remains: leave the writer registered.
            return
        self._conn_lost += 1
        loop._remove_writer(self._sock_fd)
        loop.call_soon(self._call_connection_lost, None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._sock is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning)
                self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        # Should be called from exception handler only.
        if not isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            # Ignorable errors are only logged, and only in debug mode.
            logger.debug("%r: %s", self, message, exc_info=True)
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, unregister the fd and schedule connection_lost.

        Does nothing when connection_lost has already been scheduled.
        """
        if self._conn_lost:
            return
        loop = self._loop
        if self._buffer:
            self._buffer.clear()
            loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Cleanup runs even if connection_lost() raised.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            attached_server = self._server
            if attached_server is not None:
                attached_server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)
|  |  | 
|  |  | 
class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport reading from and writing to a plain socket."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport and schedule connection_made().

        Reading starts, and *waiter* (if given) is woken up, only after
        connection_made() has been scheduled.
        """
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        _set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def pause_reading(self):
        """Stop watching the socket for incoming data.

        Raises RuntimeError when the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the socket for incoming data again.

        Raises RuntimeError when reading is not paused.  When the transport
        is closing only the paused flag is cleared.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: receive data and hand it to the protocol."""
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if keep_open:
                    # We're keeping the connection open so the
                    # protocol can write more, but we still can't
                    # receive more, so remove the reader callback.
                    self._loop._remove_reader(self._sock_fd)
                else:
                    self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be sent immediately.

        Raises TypeError for non bytes-like data and RuntimeError when
        called after write_eof().  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be a bytes-like object, '
                            'not %r' % type(data).__name__)
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not data:
            return

        if self._conn_lost:
            # The connection is gone: count the attempt and warn once the
            # threshold is reached, but drop the data.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible."""
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent.

        Does nothing when EOF was already requested or when the transport
        is closing: in the latter case the socket is about to be (or has
        already been) closed, and self._sock may be None.
        """
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True
|  |  | 
|  |  | 
|  | class _SelectorSslTransport(_SelectorTransport): | 
|  |  | 
|  | _buffer_factory = bytearray | 
|  |  | 
def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
             server_side=False, server_hostname=None,
             extra=None, server=None):
    """Wrap *rawsock* with SSL and start the handshake.

    Raises RuntimeError when the ssl module is unavailable.  A false
    *sslcontext* is replaced by one from
    sslproto._create_transport_context().
    """
    if ssl is None:
        raise RuntimeError('stdlib ssl module not available')

    sslcontext = sslcontext or sslproto._create_transport_context(
        server_side, server_hostname)

    wrap_kwargs = dict(server_side=server_side,
                       do_handshake_on_connect=False)
    # server_hostname is only passed along for client-side sockets.
    if server_hostname and not server_side:
        wrap_kwargs['server_hostname'] = server_hostname
    sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

    super().__init__(loop, sslsock, protocol, extra, server)
    # the protocol connection is only made after the SSL handshake
    self._protocol_connected = False

    self._server_hostname = server_hostname
    self._waiter = waiter
    self._sslcontext = sslcontext
    self._paused = False

    # SSL-specific extra info.  (peercert is set later)
    self._extra.update(sslcontext=sslcontext)

    # The handshake start time is only recorded in debug mode.
    start_time = None
    if self._loop.get_debug():
        logger.debug("%r starts SSL handshake", self)
        start_time = self._loop.time()
    self._on_handshake(start_time)
|  |  | 
def _wakeup_waiter(self, exc=None):
    """Resolve the pending waiter future, if there is one, and forget it.

    The waiter receives *exc* as its exception, or a None result when
    *exc* is None.  A cancelled waiter is left untouched.
    """
    waiter = self._waiter
    if waiter is None:
        return
    if not waiter.cancelled():
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)
    self._waiter = None
|  |  | 
def _on_handshake(self, start_time):
    """Drive the SSL handshake and finish the transport set-up.

    Re-registers itself as reader or writer callback while the handshake
    wants more I/O.  On failure the socket is closed and the waiter
    receives the exception.  On success the protocol's connection_made()
    and the waiter wake-up are scheduled.

    *start_time* is the loop time at which the handshake started, or None
    when it was not recorded (debug mode was off at that time).
    """
    try:
        self._sock.do_handshake()
    except ssl.SSLWantReadError:
        self._loop._add_reader(self._sock_fd,
                               self._on_handshake, start_time)
        return
    except ssl.SSLWantWriteError:
        self._loop._add_writer(self._sock_fd,
                               self._on_handshake, start_time)
        return
    except BaseException as exc:
        if self._loop.get_debug():
            logger.warning("%r: SSL handshake failed",
                           self, exc_info=True)
        self._loop._remove_reader(self._sock_fd)
        self._loop._remove_writer(self._sock_fd)
        self._sock.close()
        self._wakeup_waiter(exc)
        if isinstance(exc, Exception):
            return
        else:
            raise

    # The handshake is complete: drop the temporary handshake callbacks.
    self._loop._remove_reader(self._sock_fd)
    self._loop._remove_writer(self._sock_fd)

    peercert = self._sock.getpeercert()
    if not hasattr(self._sslcontext, 'check_hostname'):
        # Verify hostname if requested, Python 3.4+ uses check_hostname
        # and checks the hostname in do_handshake()
        if (self._server_hostname and
                self._sslcontext.verify_mode != ssl.CERT_NONE):
            try:
                ssl.match_hostname(peercert, self._server_hostname)
            except Exception as exc:
                if self._loop.get_debug():
                    logger.warning("%r: SSL handshake failed "
                                   "on matching the hostname",
                                   self, exc_info=True)
                self._sock.close()
                self._wakeup_waiter(exc)
                return

    # Add extra info that becomes available after handshake.
    self._extra.update(peercert=peercert,
                       cipher=self._sock.cipher(),
                       compression=self._sock.compression(),
                       ssl_object=self._sock,
                       )

    self._read_wants_write = False
    self._write_wants_read = False
    self._loop._add_reader(self._sock_fd, self._read_ready)
    self._protocol_connected = True
    self._loop.call_soon(self._protocol.connection_made, self)
    # only wake up the waiter when connection_made() has been called
    self._loop.call_soon(self._wakeup_waiter)

    # start_time is None when debug mode was enabled only after the
    # handshake had started; there is no duration to report then.
    if start_time is not None and self._loop.get_debug():
        dt = self._loop.time() - start_time
        logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
|  |  | 
def pause_reading(self):
    """Mark the transport as paused and unregister its read callback.

    Raises:
        RuntimeError: if the transport is closing, or is already paused.
    """
    # XXX Pausing only removes the reader; write() keeps accepting data
    # into the buffer in the meantime.  It is not obvious whether this
    # can be driven into a deadlock -- it does not look like it, since
    # the application is expected to call resume_reading() eventually,
    # after which data flows again.
    if self._closing:
        raise RuntimeError('Cannot pause_reading() when closing')
    elif self._paused:
        raise RuntimeError('Already paused')

    self._paused = True
    loop = self._loop
    loop._remove_reader(self._sock_fd)
    if loop.get_debug():
        logger.debug("%r pauses reading", self)
|  |  | 
def resume_reading(self):
    """Clear the paused flag and, unless closing, re-register the reader.

    Raises:
        RuntimeError: if the transport is not currently paused.
    """
    if not self._paused:
        raise RuntimeError('Not paused')

    self._paused = False
    # A closing transport stays unregistered; only the flag is reset.
    if not self._closing:
        self._loop._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)
|  |  | 
def _read_ready(self):
    """Reader callback: pull one chunk from the SSL socket.

    Does nothing once the connection is lost.  If an earlier write
    stalled on SSLWantReadError, that write is retried first.  Received
    bytes go to the protocol's data_received(); an empty read is treated
    as EOF, which calls eof_received() and then always closes the
    transport.
    """
    if self._conn_lost:
        return

    if self._write_wants_read:
        # A previous send() asked for readability; retry it now and keep
        # the writer armed if data is still pending afterwards.
        self._write_wants_read = False
        self._write_ready()
        if self._buffer:
            self._loop._add_writer(self._sock_fd, self._write_ready)

    try:
        chunk = self._sock.recv(self.max_size)
    except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
        # Nothing to read yet; wait for the next readiness event.
        return
    except ssl.SSLWantWriteError:
        # The read cannot progress until the socket is writable: swap the
        # reader for the writer, and let _write_ready() re-enter us.
        self._read_wants_write = True
        self._loop._remove_reader(self._sock_fd)
        self._loop._add_writer(self._sock_fd, self._write_ready)
        return
    except Exception as exc:
        self._fatal_error(exc, 'Fatal read error on SSL transport')
        return

    if chunk:
        self._protocol.data_received(chunk)
        return

    # Empty read: EOF.  close() runs even if eof_received() raises.
    try:
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)
        keep_open = self._protocol.eof_received()
        if keep_open:
            logger.warning('returning true from eof_received() '
                           'has no effect when using ssl')
    finally:
        self.close()
|  |  | 
def _write_ready(self):
    """Writer callback: push buffered bytes into the SSL socket.

    Does nothing once the connection is lost.  If an earlier read
    stalled on SSLWantWriteError, that read is retried first.  Whatever
    the socket accepts is dropped from the front of the buffer; when the
    buffer ends up empty the writer is unregistered and, if the
    transport is closing, connection loss is reported.
    """
    if self._conn_lost:
        return

    if self._read_wants_write:
        # A previous recv() asked for writability; retry it now and put
        # the reader back unless reading is paused or we are closing.
        self._read_wants_write = False
        self._read_ready()
        if not self._paused and not self._closing:
            self._loop._add_reader(self._sock_fd, self._read_ready)

    if self._buffer:
        sent = 0
        try:
            sent = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
            # Socket not ready; keep everything buffered.
            pass
        except ssl.SSLWantReadError:
            # The write cannot progress until the socket is readable;
            # _read_ready() will retry it.
            self._loop._remove_writer(self._sock_fd)
            self._write_wants_read = True
        except Exception as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on SSL transport')
            return

        if sent:
            del self._buffer[:sent]

    self._maybe_resume_protocol()  # May append to buffer.

    if self._buffer:
        return
    self._loop._remove_writer(self._sock_fd)
    if self._closing:
        self._call_connection_lost(None)
|  |  | 
def write(self, data):
    """Append *data* to the outgoing buffer.

    Nothing is sent here.  The bytes are queued, the write callback is
    registered when the buffer was empty beforehand, and
    _maybe_pause_protocol() is invoked afterwards.  Empty data is
    ignored.  Once the connection is lost, data is discarded and the
    lost-connection counter is incremented instead.

    Raises:
        TypeError: if *data* is not bytes, bytearray or memoryview.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        kind = type(data).__name__
        raise TypeError('data argument must be a bytes-like object, '
                        'not %r' % kind)
    if not data:
        return

    if self._conn_lost:
        # Only log once the number of post-loss writes hits the threshold.
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    # An empty buffer means no writer is armed yet.
    if not self._buffer:
        self._loop._add_writer(self._sock_fd, self._write_ready)
    self._buffer.extend(data)
    self._maybe_pause_protocol()
|  |  | 
def can_write_eof(self):
    """Return False unconditionally for this transport."""
    return False
|  |  | 
|  |  | 
class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport driven by selector readiness callbacks.

    Datagrams that cannot be sent immediately are queued as
    ``(bytes, addr)`` pairs and flushed by _sendto_ready() once the
    socket becomes writable.
    """

    # NOTE(review): presumably the base class builds self._buffer from
    # this factory; the deque methods used below rely on it -- confirm.
    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule its start-up callbacks.

        Scheduled in order through call_soon(): connection_made(), the
        reader registration, and -- when *waiter* is given -- setting
        its result to None.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        schedule(self._loop._add_reader, self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been
            # called
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the total payload length of all queued datagrams."""
        total = 0
        for payload, _ in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Reader callback: receive one datagram and hand it over.

        OSError is reported through error_received(); any other
        exception is treated as fatal.
        """
        if self._conn_lost:
            return
        try:
            payload, sender = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
            return
        self._protocol.datagram_received(payload, sender)

    def sendto(self, data, addr=None):
        """Send *data* right away if possible, otherwise queue it.

        When the transport has a fixed address, *addr* must be None or
        equal to it.  Empty data is ignored.

        Raises:
            TypeError: if *data* is not bytes, bytearray or memoryview.
            ValueError: if *addr* conflicts with the fixed address.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            kind = type(data).__name__
            raise TypeError('data argument must be a bytes-like object, '
                            'not %r' % kind)
        if not data:
            return

        if self._address and addr not in (None, self._address):
            raise ValueError('Invalid address: must be None or %s' %
                             (self._address,))

        if self._conn_lost and self._address:
            if (self._conn_lost >=
                    constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES):
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued ahead of us, so try the socket directly.
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                # Would block: arm the writer and fall through to queue.
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(exc,
                                  'Fatal write error on datagram transport')
                return
            else:
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: flush queued datagrams until one would block.

        A datagram that would block is put back at the front of the
        queue.  On OSError the datagram is dropped and reported through
        error_received(); any other exception is treated as fatal.
        """
        while self._buffer:
            entry = self._buffer.popleft()
            payload, dest = entry
            try:
                if self._address:
                    self._sock.send(payload)
                else:
                    self._sock.sendto(payload, dest)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft(entry)  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(exc,
                                  'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return
        self._loop._remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)