| # -*- coding: utf-8 -*- |
| """ |
| hyper/http20/bufsocket.py |
| ~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| This file implements a buffered socket wrapper. |
| |
| The purpose of this is to avoid the overhead of unnecessary syscalls while |
| allowing small reads from the network. This represents a potentially massive |
| performance optimisation at the cost of burning some memory in the userspace |
| process. |
| """ |
| import select |
| from .exceptions import ConnectionResetError, LineTooLongError |
| |
| |
class BufferedSocket(object):
    """
    A buffered socket wrapper.

    The purpose of this is to avoid the overhead of unnecessary syscalls while
    allowing small reads from the network. This represents a potentially
    massive performance optimisation at the cost of burning some memory in the
    userspace process.

    Data handed back by :meth:`recv` and :meth:`readline` is a ``memoryview``
    slice of the internal buffer. Attribute lookups that the wrapper does not
    handle itself are forwarded to the wrapped socket.
    """
    def __init__(self, sck, buffer_size=1000):
        """
        Create the buffered socket.

        :param sck: The socket to wrap.
        :param buffer_size: The size of the backing buffer in bytes. This
            parameter should be set to an appropriate value for your use case.
            Small values of ``buffer_size`` increase the overhead of buffer
            management: large values cause more memory to be used.
        """
        # The wrapped socket.
        self._sck = sck

        # The buffer we're using.
        self._backing_buffer = bytearray(buffer_size)
        self._buffer_view = memoryview(self._backing_buffer)

        # The size of the buffer.
        self._buffer_size = buffer_size

        # The start index in the memory view.
        self._index = 0

        # The number of bytes in the buffer.
        self._bytes_in_buffer = 0

    @property
    def _remaining_capacity(self):
        """
        The number of bytes between the current read index and the end of the
        backing buffer. This includes bytes that are already buffered.
        """
        return self._buffer_size - self._index

    @property
    def _buffer_end(self):
        """
        The index of the first free byte in the buffer.
        """
        return self._index + self._bytes_in_buffer

    @property
    def _free_space(self):
        """
        The number of unused bytes after the buffered data, i.e. how much a
        single ``recv_into`` can currently write without compacting.
        """
        return self._buffer_size - self._buffer_end

    @property
    def can_read(self):
        """
        Whether or not there is more data to read from the socket, as reported
        by a non-blocking ``select`` call.
        """
        return bool(select.select([self._sck], [], [], 0)[0])

    @property
    def buffer(self):
        """
        Get access to the buffer itself: a ``memoryview`` over the unread
        bytes.
        """
        return self._buffer_view[self._index:self._buffer_end]

    def advance_buffer(self, count):
        """
        Advances the buffer by the amount of data consumed outside the socket.

        :param count: The number of buffered bytes the caller has consumed.
        """
        self._index += count
        self._bytes_in_buffer -= count

    def new_buffer(self):
        """
        This method moves all the data in the backing buffer to the start of
        a new, fresh buffer. This gives the ability to read much more data.
        """
        # A fresh bytearray is allocated rather than shifting in place, so
        # memoryviews previously returned to callers keep pointing at the old,
        # unmodified buffer (presumably the reason for allocating).
        new_buffer = bytearray(self._buffer_size)
        new_buffer_view = memoryview(new_buffer)
        new_buffer_view[:self._bytes_in_buffer] = self.buffer

        self._index = 0
        self._backing_buffer = new_buffer
        self._buffer_view = new_buffer_view

    def recv(self, amt):
        """
        Read some data from the socket.

        A single call never returns more than ``buffer_size`` bytes; larger
        requests are clamped. When the buffer already holds ``amt`` bytes, the
        socket is read only if ``select`` reports it readable. Otherwise a
        ``recv_into`` is always attempted. Fewer than ``amt`` bytes may be
        returned.

        :param amt: The amount of data to read.
        :returns: A ``memoryview`` object containing the appropriate number of
            bytes. The data *must* be copied out by the caller before the next
            call to this function.
        :raises ConnectionResetError: If the socket returned no data and the
            buffer cannot satisfy the request.
        """
        # In this implementation you can never read more than the size of the
        # buffer.
        if amt > self._buffer_size:
            amt = self._buffer_size

        # If the amount of data we've been asked to read is more than the
        # space between the read index and the end of the buffer, move the
        # buffered data to the start of a fresh buffer so the request fits.
        if amt > self._remaining_capacity:
            self.new_buffer()

        # If we don't actually _need_ the data (i.e. there's enough in the
        # buffer to satisfy the request), use select to work out if the read
        # attempt will block. If it will, don't bother reading. If we need the
        # data, always do the read.
        if self._bytes_in_buffer >= amt:
            should_read = select.select([self._sck], [], [], 0)[0]
        else:
            should_read = True

        # If there's still some room in the buffer, opportunistically attempt
        # to read into it.
        if self._free_space and should_read:
            count = self._sck.recv_into(self._buffer_view[self._buffer_end:])

            # The socket just got closed. We should throw an exception if we
            # were asked for more data than we can return.
            if not count and amt > self._bytes_in_buffer:
                raise ConnectionResetError()
            self._bytes_in_buffer += count

        # Read out the bytes and update the index.
        amt = min(amt, self._bytes_in_buffer)
        data = self._buffer_view[self._index:self._index + amt]
        self.advance_buffer(amt)

        return data

    def fill(self):
        """
        Attempts to fill the buffer as much as possible. It will block for at
        most the time required to have *one* ``recv_into`` call return.

        If the backing buffer is already completely full of unread data, this
        returns without reading.

        :raises ConnectionResetError: If ``recv_into`` returned no data.
        """
        # Check the space *after* the buffered data, not _remaining_capacity:
        # when the data runs to the end of the backing buffer with a non-zero
        # index, reading into the empty tail slice would return 0 and be
        # misreported as a closed connection.
        if not self._free_space:
            if not self._index:
                # Completely full of unread data: there is nowhere to read to.
                return
            self.new_buffer()

        count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
        if not count:
            raise ConnectionResetError()

        self._bytes_in_buffer += count

    def readline(self):
        """
        Read up to a newline from the network and returns it. The implicit
        maximum line length is the buffer size of the buffered socket.

        Note that, unlike recv, this method absolutely *does* block until it
        can read the line.

        :returns: A ``memoryview`` object containing the appropriate number of
            bytes, including the trailing newline. The data *must* be copied
            out by the caller before the next call to this function.
        :raises ConnectionResetError: If ``recv_into`` returns no data before
            a newline is seen.
        :raises LineTooLongError: If the buffer fills without a newline.
        """
        # First, check if there's a complete line already buffered.
        index = self._backing_buffer.find(b'\n', self._index, self._buffer_end)

        if index != -1:
            length = index + 1 - self._index
            data = self._buffer_view[self._index:self._index + length]
            self.advance_buffer(length)
            return data

        # No newline buffered. Shunt the data to the start of the buffer so
        # as much of the line as possible fits (skipped if already there),
        # then keep reading until a newline arrives or the buffer is full.
        if self._index != 0:
            self.new_buffer()

        while self._bytes_in_buffer < self._buffer_size:
            count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
            if not count:
                raise ConnectionResetError()

            # Only the newly received bytes need searching: the earlier ones
            # are already known not to contain a newline.
            first_new_byte = self._buffer_end
            self._bytes_in_buffer += count
            index = self._backing_buffer.find(
                b'\n',
                first_new_byte,
                first_new_byte + count,
            )

            if index != -1:
                # The data starts at index zero here (compacted above), so
                # the line length is the newline's position plus one.
                assert not self._index
                length = index + 1
                data = self._buffer_view[:length]
                self.advance_buffer(length)
                return data

        # If we got here, it means we filled the buffer without ever getting
        # a newline. Time to throw an exception.
        raise LineTooLongError()

    def __getattr__(self, name):
        """
        Forward attribute lookups the wrapper doesn't handle to the socket.

        ``__getattr__`` only runs when normal lookup fails. If ``_sck`` itself
        is missing (e.g. an instance built without ``__init__``), looking it
        up here would recurse forever, so that case raises ``AttributeError``.
        """
        if name == '_sck':
            raise AttributeError(name)
        return getattr(self._sck, name)