blob: f4d9247ed66b0edc5c8813ecc4c198064d47a7c5 [file] [log] [blame]
# -*- coding: utf-8 -*-
"""
hyper/http20/stream
~~~~~~~~~~~~~~~~~~~
Objects that make up the stream-level abstraction of hyper's HTTP/2 support.
These objects are not expected to be part of the public HTTP/2 API: they're
intended purely for use inside hyper's HTTP/2 abstraction.
Conceptually, a single HTTP/2 connection is made up of many streams: each
stream is an independent, bi-directional sequence of HTTP headers and data.
Each stream is identified by a monotonically increasing integer, assigned to
the stream by the endpoint that initiated the stream.
"""
from ..h2 import exceptions as h2Exceptions
from ..common.headers import HTTPHeaderMap
from .util import h2_safe_headers
import logging
# Module-level logger, named after this module so applications can filter
# hyper's stream-layer log output hierarchically.
log = logging.getLogger(__name__)
# Define the largest chunk of data we'll send in one go. Realistically, we
# should take the MSS into account but that's pretty dull, so let's just say
# 1kB and call it a day.
MAX_CHUNK = 1024
class Stream(object):
    """
    A single HTTP/2 stream.

    A stream is an independent, bi-directional sequence of HTTP headers and
    data. Each stream is identified by a single integer. From an HTTP
    perspective, a stream _approximately_ matches a single request-response
    pair.
    """
    def __init__(self,
                 stream_id,
                 window_manager,
                 connection,
                 send_outstanding_data,
                 recv_cb,
                 close_cb):
        #: The integer identifying this stream on the connection.
        self.stream_id = stream_id

        #: The headers queued to be sent on the request.
        self.headers = HTTPHeaderMap()

        # Set to a key-value set of the response headers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_headers = None

        # Set to a key-value set of the response trailers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_trailers = None

        # A dict mapping the promised stream ID of a pushed resource to a
        # key-value set of its request headers. Entries are added once their
        # PUSH_PROMISE..CONTINUATION frame sequence finishes.
        self.promised_headers = {}

        # Unconsumed response data chunks. Empties after every call to
        # _read().
        self.data = []

        # Whether the remote side has completed the stream.
        self.remote_closed = False

        # Whether we have closed the stream.
        self.local_closed = False

        # There are two flow control windows: one for data we're sending,
        # one for data being sent to us. This manages the inbound one.
        self._in_window_manager = window_manager

        # Save off a reference to the state machine wrapped with lock.
        self._conn = connection

        # Save off callbacks: flushing queued outbound data, pumping the
        # receive loop, and notifying the connection of stream closure.
        self._send_outstanding_data = send_outstanding_data
        self._recv_cb = recv_cb
        self._close_cb = close_cb

    def add_header(self, name, value, replace=False):
        """
        Adds a single HTTP header to the headers to be sent on the request.

        :param replace: If ``True``, replace any existing values for this
            header name instead of appending.
        """
        if not replace:
            self.headers[name] = value
        else:
            self.headers.replace(name, value)

    def send_headers(self, end_stream=False):
        """
        Sends the complete saved header block on the stream.

        :param end_stream: If ``True``, also half-closes the stream on our
            side (no request body will follow).
        """
        headers = self.get_headers()
        with self._conn as conn:
            conn.send_headers(self.stream_id, headers, end_stream)
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True

    def send_data(self, data, final):
        """
        Send some data on the stream. If this is the end of the data to be
        sent, the ``final`` flag _must_ be set to True. If no data is to be
        sent, set ``data`` to ``None``.

        :param data: A byte string, a file-like object (anything with a
            ``read`` method), or ``None``.
        :param final: Whether this call provides the last of the body.
        """
        # Define a utility iterator for file objects. Note that the last
        # chunk read is always shorter than MAX_CHUNK (possibly empty),
        # which is what _send_chunk uses to detect the end of the data.
        def file_iterator(fobj):
            while True:
                chunk = fobj.read(MAX_CHUNK)
                yield chunk
                if len(chunk) < MAX_CHUNK:
                    break

        # A matching iterator for in-memory buffers. _send_chunk flags
        # END_STREAM only on a chunk shorter than MAX_CHUNK, so when
        # ``final`` is set and the buffer is empty or an exact multiple of
        # MAX_CHUNK we must emit a trailing empty chunk; otherwise the
        # END_STREAM flag would never be sent and the stream would be left
        # open. ``buffer[:0]`` preserves the buffer's type.
        def buffer_iterator(buffer):
            for i in range(0, len(buffer), MAX_CHUNK):
                yield buffer[i:i + MAX_CHUNK]
            if final and len(buffer) % MAX_CHUNK == 0:
                yield buffer[:0]

        # Treat "no data" as an empty byte string: a final call with no
        # body must still terminate the stream rather than crash on
        # len(None).
        if data is None:
            data = b''

        # Build the appropriate iterator for the data, in chunks of at most
        # MAX_CHUNK.
        if hasattr(data, 'read'):
            chunks = file_iterator(data)
        else:
            chunks = buffer_iterator(data)

        for chunk in chunks:
            self._send_chunk(chunk, final)

    def _read(self, amt=None):
        """
        Read data from the stream. Unlike a normal read behaviour, this
        function returns _at least_ ``amt`` data, but may return more.
        """
        def buffered_bytes(chunks):
            # Total number of bytes currently buffered in ``chunks``.
            return sum(map(len, chunks))

        # Keep reading until the stream is closed or we get enough data.
        while (not self.remote_closed and
               (amt is None or buffered_bytes(self.data) < amt)):
            self._recv_cb(stream_id=self.stream_id)

        result = b''.join(self.data)
        self.data = []
        return result

    def _read_one_frame(self):
        """
        Reads a single data frame from the stream and returns it, or
        ``None`` if the stream closed without yielding any more data.
        """
        # Keep reading until the stream is closed or we have a data frame.
        while not self.remote_closed and not self.data:
            self._recv_cb(stream_id=self.stream_id)

        try:
            return self.data.pop(0)
        except IndexError:
            return None

    def receive_response(self, event):
        """
        Receive response headers.
        """
        # TODO: If this is called while we're still sending data, we may
        # want to stop sending that data and check the response. Early
        # responses to big uploads are almost always a problem.
        self.response_headers = HTTPHeaderMap(event.headers)

    def receive_trailers(self, event):
        """
        Receive response trailers.
        """
        self.response_trailers = HTTPHeaderMap(event.headers)

    def receive_push(self, event):
        """
        Receive the request headers for a pushed stream.
        """
        self.promised_headers[event.pushed_stream_id] = event.headers

    def receive_data(self, event):
        """
        Receive a chunk of data, buffering it and crediting the inbound
        flow control window as the window manager directs.
        """
        size = event.flow_controlled_length
        increment = self._in_window_manager._handle_frame(size)

        # Append the data to the buffer.
        self.data.append(event.data)

        if increment:
            try:
                with self._conn as conn:
                    conn.increment_flow_control_window(
                        increment, stream_id=self.stream_id
                    )
            except h2Exceptions.StreamClosedError:
                # We haven't got to it yet, but the stream is already
                # closed. We don't need to increment the window in this
                # case!
                pass
            else:
                self._send_outstanding_data()

    def receive_end_stream(self, event):
        """
        All of the data is returned now.
        """
        self.remote_closed = True

    def receive_reset(self, event):
        """
        Stream forcefully reset by the remote peer.
        """
        self.remote_closed = True
        self._close_cb(self.stream_id)

    def get_headers(self):
        """
        Provides the headers to the connection object.
        """
        # Strip any headers invalid in H2.
        return h2_safe_headers(self.headers)

    def getheaders(self):
        """
        Once all data has been sent on this connection, returns a key-value
        set of the headers of the response to the original request.
        """
        # Keep reading until all headers are received.
        while self.response_headers is None:
            self._recv_cb(stream_id=self.stream_id)

        # Find the Content-Length header if present, so the window manager
        # knows how much more data to expect.
        self._in_window_manager.document_size = (
            int(self.response_headers.get(b'content-length', [0])[0])
        )

        return self.response_headers

    def gettrailers(self):
        """
        Once all data has been sent on this connection, returns a key-value
        set of the trailers of the response to the original request.

        .. warning:: Note that this method requires that the stream is
                     totally exhausted. This means that, if you have not
                     completely read from the stream, all stream data will
                     be read into memory.

        :returns: The key-value set of the trailers, or ``None`` if no
                  trailers were sent.
        """
        # Keep reading until the stream is done.
        while not self.remote_closed:
            self._recv_cb(stream_id=self.stream_id)

        return self.response_trailers

    def get_pushes(self, capture_all=False):
        """
        Returns a generator that yields push promises from the server. Note
        that this method is not idempotent; promises returned in one call
        will not be returned in subsequent calls. Iterating through
        generators returned by multiple calls to this method simultaneously
        results in undefined behavior.

        :param capture_all: If ``False``, the generator will yield all
            buffered push promises without blocking. If ``True``, the
            generator will first yield all buffered push promises, then
            yield additional ones as they arrive, and terminate when the
            original stream closes.
        """
        while True:
            for pair in self.promised_headers.items():
                yield pair
            self.promised_headers = {}
            if not capture_all or self.remote_closed:
                break
            self._recv_cb(stream_id=self.stream_id)

    def close(self, error_code=None):
        """
        Closes the stream. If the stream is currently open, attempts to
        close it as gracefully as possible.

        :param error_code: (optional) The error code to reset the stream
            with.
        :returns: Nothing.
        """
        # FIXME: I think this is overbroad, but for now it's probably ok.
        if not (self.remote_closed and self.local_closed):
            try:
                with self._conn as conn:
                    conn.reset_stream(self.stream_id, error_code or 0)
            except h2Exceptions.ProtocolError:
                # If for any reason we can't reset the stream, just
                # tolerate it.
                pass
            else:
                self._send_outstanding_data(tolerate_peer_gone=True)
            self.remote_closed = True
            self.local_closed = True

        self._close_cb(self.stream_id)

    @property
    def _out_flow_control_window(self):
        """
        The size of our outbound flow control window.
        """
        with self._conn as conn:
            return conn.local_flow_control_window(self.stream_id)

    def _send_chunk(self, data, final):
        """
        Implements most of the sending logic.

        Takes a single chunk of size at most MAX_CHUNK, wraps it in a frame
        and sends it. Optionally sets the END_STREAM flag if this is the
        last chunk (determined by being of size less than MAX_CHUNK) and no
        more data is to be sent.
        """
        # If we don't fit in the connection window, try popping frames off
        # the connection in hope that one might be a window update frame.
        while len(data) > self._out_flow_control_window:
            self._recv_cb()

        # If the length of the data is less than MAX_CHUNK, we're probably
        # at the end of the data. If this is the end of the data, mark it
        # as END_STREAM. (send_data guarantees the final chunk of a body is
        # always shorter than MAX_CHUNK.)
        end_stream = False
        if len(data) < MAX_CHUNK and final:
            end_stream = True

        # Send the frame and decrement the flow control window.
        with self._conn as conn:
            conn.send_data(
                stream_id=self.stream_id, data=data, end_stream=end_stream
            )
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True