blob: be6604add1d1c3b21ec752052384d8bb3d95da75 [file] [log] [blame]
# -*- coding: utf-8 -*-
"""Hypertext Transfer Protocol Version 2."""
import struct
import codecs
from . import dpkt
HTTP2_PREFACE = b'\x50\x52\x49\x20\x2a\x20\x48\x54\x54\x50\x2f\x32\x2e\x30\x0d\x0a\x0d\x0a\x53\x4d\x0d\x0a\x0d\x0a'
# Frame types
HTTP2_FRAME_DATA = 0
HTTP2_FRAME_HEADERS = 1
HTTP2_FRAME_PRIORITY = 2
HTTP2_FRAME_RST_STREAM = 3
HTTP2_FRAME_SETTINGS = 4
HTTP2_FRAME_PUSH_PROMISE = 5
HTTP2_FRAME_PING = 6
HTTP2_FRAME_GOAWAY = 7
HTTP2_FRAME_WINDOW_UPDATE = 8
HTTP2_FRAME_CONTINUATION = 9
# Flags
HTTP2_FLAG_END_STREAM = 0x01 # for DATA and HEADERS frames
HTTP2_FLAG_ACK = 0x01 # for SETTINGS and PING frames
HTTP2_FLAG_END_HEADERS = 0x04
HTTP2_FLAG_PADDED = 0x08
HTTP2_FLAG_PRIORITY = 0x20
# Settings
HTTP2_SETTINGS_HEADER_TABLE_SIZE = 0x1
HTTP2_SETTINGS_ENABLE_PUSH = 0x2
HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS = 0x3
HTTP2_SETTINGS_INITIAL_WINDOW_SIZE = 0x4
HTTP2_SETTINGS_MAX_FRAME_SIZE = 0x5
HTTP2_SETTINGS_MAX_HEADER_LIST_SIZE = 0x6
# Error codes
HTTP2_NO_ERROR = 0x0
HTTP2_PROTOCOL_ERROR = 0x1
HTTP2_INTERNAL_ERROR = 0x2
HTTP2_FLOW_CONTROL_ERROR = 0x3
HTTP2_SETTINGS_TIMEOUT = 0x4
HTTP2_STREAM_CLOSED = 0x5
HTTP2_FRAME_SIZE_ERROR = 0x6
HTTP2_REFUSED_STREAM = 0x7
HTTP2_CANCEL = 0x8
HTTP2_COMPRESSION_ERROR = 0x9
HTTP2_CONNECT_ERROR = 0xa
HTTP2_ENHANCE_YOUR_CALM = 0xb
HTTP2_INADEQUATE_SECURITY = 0xc
HTTP2_HTTP_1_1_REQUIRED = 0xd
error_code_str = {
HTTP2_NO_ERROR: 'NO_ERROR',
HTTP2_PROTOCOL_ERROR: 'PROTOCOL_ERROR',
HTTP2_INTERNAL_ERROR: 'INTERNAL_ERROR',
HTTP2_FLOW_CONTROL_ERROR: 'FLOW_CONTROL_ERROR',
HTTP2_SETTINGS_TIMEOUT: 'SETTINGS_TIMEOUT',
HTTP2_STREAM_CLOSED: 'STREAM_CLOSED',
HTTP2_FRAME_SIZE_ERROR: 'FRAME_SIZE_ERROR',
HTTP2_REFUSED_STREAM: 'REFUSED_STREAM',
HTTP2_CANCEL: 'CANCEL',
HTTP2_COMPRESSION_ERROR: 'COMPRESSION_ERROR',
HTTP2_CONNECT_ERROR: 'CONNECT_ERROR',
HTTP2_ENHANCE_YOUR_CALM: 'ENHANCE_YOUR_CALM',
HTTP2_INADEQUATE_SECURITY: 'INADEQUATE_SECURITY',
HTTP2_HTTP_1_1_REQUIRED: 'HTTP_1_1_REQUIRED',
}
class HTTP2Exception(Exception):
pass
class Preface(dpkt.Packet):
__hdr__ = (
('preface', '24s', HTTP2_PREFACE),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if self.preface != HTTP2_PREFACE:
raise HTTP2Exception('Invalid HTTP/2 preface')
self.data = ''
class Frame(dpkt.Packet):
"""
An HTTP/2 frame as defined in RFC 7540
"""
# struct.unpack can't handle the 3-byte int, so we parse it as bytes
# (and store it as bytes so dpkt doesn't get confused), and turn it into
# an int in a user-facing property
__hdr__ = (
('length_bytes', '3s', 0),
('type', 'B', 0),
('flags', 'B', 0),
('stream_id', 'I', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
# only take the right number of bytes
self.data = self.data[:self.length]
if len(self.data) != self.length:
raise dpkt.NeedData
@property
def length(self):
return struct.unpack('!I', b'\x00' + self.length_bytes)[0]
class Priority(dpkt.Packet):
"""
Payload of a PRIORITY frame, also used in HEADERS frame with FLAG_PRIORITY.
Also used in the HEADERS frame if the PRIORITY flag is set.
"""
__hdr__ = (
('stream_dep', 'I', 0),
('weight', 'B', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if len(self.data) != 0:
raise HTTP2Exception('Invalid number of bytes in PRIORITY frame')
self.exclusive = (self.stream_dep & 0x80000000) != 0
self.stream_dep &= 0x7fffffff
self.weight += 1
class Setting(dpkt.Packet):
"""
A key-value pair used in the SETTINGS frame.
"""
__hdr__ = (
('identifier', 'H', 0),
('value', 'I', 0),
)
class PaddedFrame(Frame):
"""
Abstract class for frame types that support the FLAG_PADDED flag: DATA,
HEADERS and PUSH_PROMISE.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
if self.flags & HTTP2_FLAG_PADDED:
if self.length == 0:
raise HTTP2Exception('Missing padding length in PADDED frame')
self.pad_length = struct.unpack('B', self.data[0:1])[0]
if self.length <= self.pad_length:
raise HTTP2Exception('Missing padding bytes in PADDED frame')
self.unpadded_data = self.data[1:-self.pad_length]
else:
self.unpadded_data = self.data
class DataFrame(PaddedFrame):
"""
Frame of type DATA.
"""
@property
def payload(self):
return self.unpadded_data
class HeadersFrame(PaddedFrame):
"""
Frame of type HEADERS.
"""
def unpack(self, buf):
PaddedFrame.unpack(self, buf)
if self.flags & HTTP2_FLAG_PRIORITY:
if len(self.unpadded_data) < 5:
raise HTTP2Exception('Missing stream dependency in HEADERS frame with PRIORITY flag')
self.priority = Priority(self.unpadded_data[:5])
self.block_fragment = self.unpadded_data[5:]
else:
self.block_fragment = self.unpadded_data
class PriorityFrame(Frame):
"""
Frame of type PRIORITY.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
self.priority = Priority(self.data)
class RSTStreamFrame(Frame):
"""
Frame of type RST_STREAM.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
if self.length != 4:
raise HTTP2Exception('Invalid number of bytes in RST_STREAM frame (must be 4)')
self.error_code = struct.unpack('!I', self.data)[0]
class SettingsFrame(Frame):
"""
Frame of type SETTINGS.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
if self.length % 6 != 0:
raise HTTP2Exception('Invalid number of bytes in SETTINGS frame (must be multiple of 6)')
self.settings = []
i = 0
while i < self.length:
self.settings.append(Setting(self.data[i:i + 6]))
i += 6
class PushPromiseFrame(PaddedFrame):
"""
Frame of type PUSH_PROMISE.
"""
def unpack(self, buf):
PaddedFrame.unpack(self, buf)
if len(self.unpadded_data) < 4:
raise HTTP2Exception('Missing promised stream ID in PUSH_PROMISE frame')
self.promised_id = struct.unpack('!I', self.data[:4])[0]
self.block_fragment = self.unpadded_data[4:]
class PingFrame(Frame):
"""
Frame of type PING.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
if self.length != 8:
raise HTTP2Exception('Invalid number of bytes in PING frame (must be 8)')
class GoAwayFrame(Frame):
"""
Frame of type GO_AWAY.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
if self.length < 8:
raise HTTP2Exception('Invalid number of bytes in GO_AWAY frame')
self.last_stream_id = struct.unpack('!I', self.data[:4])[0]
self.error_code = struct.unpack('!I', self.data[4:8])[0]
self.debug_data = self.data[8:]
class WindowUpdateFrame(Frame):
"""
Frame of type WINDOW_UPDATE.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
if self.length != 4:
raise HTTP2Exception('Invalid number of bytes in WINDOW_UPDATE frame (must be 4)')
self.window_increment = struct.unpack('!I', self.data)[0]
class ContinuationFrame(Frame):
"""
Frame of type CONTINUATION.
"""
def unpack(self, buf):
Frame.unpack(self, buf)
self.block_fragment = self.data
FRAME_TYPES = {
HTTP2_FRAME_DATA: ('DATA', DataFrame),
HTTP2_FRAME_HEADERS: ('HEADERS', HeadersFrame),
HTTP2_FRAME_PRIORITY: ('PRIORITY', PriorityFrame),
HTTP2_FRAME_RST_STREAM: ('RST_STREAM', RSTStreamFrame),
HTTP2_FRAME_SETTINGS: ('SETTINGS', SettingsFrame),
HTTP2_FRAME_PUSH_PROMISE: ('PUSH_PROMISE', PushPromiseFrame),
HTTP2_FRAME_PING: ('PING', PingFrame),
HTTP2_FRAME_GOAWAY: ('GOAWAY', GoAwayFrame),
HTTP2_FRAME_WINDOW_UPDATE: ('WINDOW_UPDATE', WindowUpdateFrame),
HTTP2_FRAME_CONTINUATION: ('CONTINUATION', ContinuationFrame),
}
class FrameFactory(object):
def __new__(cls, buf):
if len(buf) < 4:
raise dpkt.NeedData
t = struct.unpack('B', buf[3:4])[0]
frame_type = FRAME_TYPES.get(t, None)
if frame_type is None:
raise HTTP2Exception('Invalid frame type: ' + hex(t))
return frame_type[1](buf)
def frame_multi_factory(buf, preface=False):
"""
Attempt to parse one or more Frame's out of buf
Args:
buf: string containing HTTP/2 frames. May have an incomplete frame at the
end.
preface: expect an HTTP/2 preface at the beginning of the buffer.
Returns:
[Frame]
int, total bytes consumed, != len(buf) if an incomplete frame was left at
the end.
"""
i = 0
n = len(buf)
frames = []
if preface:
try:
p = Preface(buf)
i += len(p)
except dpkt.NeedData:
return [], 0
while i < n:
try:
frame = FrameFactory(buf[i:])
frames.append(frame)
i += len(frame)
except dpkt.NeedData:
break
return frames, i
class TestFrame(object):
"""Some data found in real traffic"""
@classmethod
def setup_class(cls):
# First TLS AppData record sent by Firefox (decrypted)
record = codecs.decode(b'505249202a20485454502f322e300d0a'
b'0d0a534d0d0a0d0a00000c0400000000'
b'00000400020000000500004000000004'
b'08000000000000bf0001000005020000'
b'00000300000000c80000050200000000'
b'05000000006400000502000000000700'
b'00000000000005020000000009000000'
b'070000000502000000000b0000000300', 'hex')
cls.frames, cls.i = frame_multi_factory(record, preface=True)
def test_frame(self):
import pytest
# Too short
pytest.raises(dpkt.NeedData, Frame, codecs.decode(b'000001' # length
b'0000' # type, flags
b'deadbeef', # stream id
'hex'))
def test_data(self):
# Padded DATA frame
frame_data_padded = FrameFactory(codecs.decode(b'000008' # length
b'0008' # type, flags
b'12345678' # stream id
b'05' # pad length
b'abcd' # data
b'1122334455', # padding
'hex'))
assert (frame_data_padded.length == 8)
assert (frame_data_padded.type == HTTP2_FRAME_DATA)
assert (frame_data_padded.flags == HTTP2_FLAG_PADDED)
assert (frame_data_padded.stream_id == 0x12345678)
assert (frame_data_padded.data == b'\x05\xAB\xCD\x11\x22\x33\x44\x55')
assert (frame_data_padded.pad_length == 5)
assert (frame_data_padded.unpadded_data == b'\xAB\xCD')
assert (frame_data_padded.payload == b'\xAB\xCD')
# empty DATA frame
frame_data_empty_end = FrameFactory(codecs.decode(b'000000' # length
b'0001' # type, flags
b'deadbeef', # stream id
'hex'))
assert (frame_data_empty_end.length == 0)
assert (frame_data_empty_end.type == HTTP2_FRAME_DATA)
assert (frame_data_empty_end.flags == HTTP2_FLAG_END_STREAM)
assert (frame_data_empty_end.stream_id == 0xdeadbeef)
assert (frame_data_empty_end.data == b'')
assert (frame_data_empty_end.unpadded_data == b'')
assert (frame_data_empty_end.payload == b'')
import pytest
# Invalid padding
with pytest.raises(HTTP2Exception) as e:
DataFrame(codecs.decode(b'000000' # length
b'0008' # type, flags
b'12345678' # stream id
b'', # missing padding
'hex'))
assert (str(e.value) == 'Missing padding length in PADDED frame')
with pytest.raises(HTTP2Exception) as e:
DataFrame(codecs.decode(b'000001' # length
b'0008' # type, flags
b'12345678' # stream id
b'01'
b'', # missing padding bytes
'hex'))
assert (str(e.value) == 'Missing padding bytes in PADDED frame')
def test_headers(self):
frame_headers = FrameFactory(codecs.decode(b'000003' # length
b'0100' # type, flags
b'deadbeef' # stream id
b'f00baa', # block fragment
'hex'))
assert (frame_headers.length == 3)
assert (frame_headers.type == HTTP2_FRAME_HEADERS)
assert (frame_headers.flags == 0)
assert (frame_headers.stream_id == 0xdeadbeef)
assert (frame_headers.data == b'\xF0\x0B\xAA')
assert (frame_headers.unpadded_data == b'\xF0\x0B\xAA')
assert (frame_headers.block_fragment == b'\xF0\x0B\xAA')
frame_headers_prio = FrameFactory(codecs.decode(b'000008' # length
b'0120' # type, flags
b'deadbeef' # stream id
b'cafebabe10' # priority
b'f00baa', # block fragment
'hex'))
assert (frame_headers_prio.length == 8)
assert (frame_headers_prio.type == HTTP2_FRAME_HEADERS)
assert (frame_headers_prio.flags == HTTP2_FLAG_PRIORITY)
assert (frame_headers_prio.stream_id == 0xdeadbeef)
assert (frame_headers_prio.data == b'\xCA\xFE\xBA\xBE\x10\xF0\x0B\xAA')
assert (frame_headers_prio.unpadded_data == b'\xCA\xFE\xBA\xBE\x10\xF0\x0B\xAA')
assert (frame_headers_prio.priority.exclusive is True)
assert (frame_headers_prio.priority.stream_dep == 0x4afebabe)
assert (frame_headers_prio.priority.weight == 0x11)
assert (frame_headers_prio.block_fragment == b'\xF0\x0B\xAA')
import pytest
# Invalid priority
with pytest.raises(HTTP2Exception) as e:
HeadersFrame(codecs.decode(b'000002' # length
b'0120' # type, flags
b'deadbeef' # stream id
b'1234', # invalid priority
'hex'))
assert (str(e.value) == 'Missing stream dependency in HEADERS frame with PRIORITY flag')
def test_priority(self):
frame_priority = FrameFactory(codecs.decode(b'000005' # length
b'0200' # type, flags
b'deadbeef' # stream id
b'cafebabe' # stream dep
b'12', # weight
'hex'))
assert (frame_priority.length == 5)
assert (frame_priority.type == HTTP2_FRAME_PRIORITY)
assert (frame_priority.flags == 0)
assert (frame_priority.stream_id == 0xdeadbeef)
assert (frame_priority.data == b'\xCA\xFE\xBA\xBE\x12')
assert (frame_priority.priority.data == b'')
assert (frame_priority.priority.exclusive is True)
assert (frame_priority.priority.stream_dep == 0x4afebabe)
assert (frame_priority.priority.weight == 0x13)
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
PriorityFrame(codecs.decode(b'000006' # length
b'0200' # type, flags
b'deadbeef' # stream id
b'cafebabe' # stream dep
b'12' # weight
b'00', # unexpected additional payload
'hex'))
assert (str(e.value) == 'Invalid number of bytes in PRIORITY frame')
def test_rst_stream(self):
frame_rst = FrameFactory(codecs.decode(b'000004' # length
b'0300' # type, flags
b'deadbeef' # stream id
b'0000000c', # error code
'hex'))
assert (frame_rst.length == 4)
assert (frame_rst.type == HTTP2_FRAME_RST_STREAM)
assert (frame_rst.flags == 0)
assert (frame_rst.stream_id == 0xdeadbeef)
assert (frame_rst.data == b'\x00\x00\x00\x0c')
assert (frame_rst.error_code == HTTP2_INADEQUATE_SECURITY)
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
RSTStreamFrame(codecs.decode(b'000005' # length
b'0300' # type, flags
b'deadbeef' # stream id
b'0000000c' # error code
b'00', # unexpected additional payload
'hex'))
assert (str(e.value) == 'Invalid number of bytes in RST_STREAM frame (must be 4)')
def test_settings(self):
frame_settings = FrameFactory(codecs.decode(b'00000c' # length
b'0400' # type, flags
b'00000000' # stream id
# settings
b'0004' # setting id
b'00020000' # setting value
b'0005' # setting id
b'00004000', # setting value
'hex'))
assert (frame_settings.length == 12)
assert (frame_settings.type == HTTP2_FRAME_SETTINGS)
assert (frame_settings.flags == 0)
assert (frame_settings.stream_id == 0)
assert (len(frame_settings.settings) == 2)
assert (frame_settings.settings[0].identifier == HTTP2_SETTINGS_INITIAL_WINDOW_SIZE)
assert (frame_settings.settings[0].value == 0x20000)
assert (frame_settings.settings[1].identifier == HTTP2_SETTINGS_MAX_FRAME_SIZE)
assert (frame_settings.settings[1].value == 0x4000)
# Settings ack, with empty payload
frame_settings_ack = FrameFactory(codecs.decode(b'000000' # length
b'0401' # type, flags
b'00000000', # stream id
'hex'))
assert (frame_settings_ack.length == 0)
assert (frame_settings_ack.type == HTTP2_FRAME_SETTINGS)
assert (frame_settings_ack.flags == HTTP2_FLAG_ACK)
assert (frame_settings_ack.stream_id == 0)
assert (len(frame_settings_ack.settings) == 0)
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
SettingsFrame(codecs.decode(b'000005' # length
b'0400' # type, flags
b'deadbeef' # stream id
b'1234567890', # invalid length
'hex'))
assert (str(e.value) == 'Invalid number of bytes in SETTINGS frame (must be multiple of 6)')
def test_push_promise(self):
frame_pp = FrameFactory(codecs.decode(b'000007' # length
b'0500' # type, flags
b'deadbeef' # stream id
b'cafebabe' # promised id
b'123456', # some block fragment
'hex'))
assert (frame_pp.length == 7)
assert (frame_pp.type == HTTP2_FRAME_PUSH_PROMISE)
assert (frame_pp.flags == 0)
assert (frame_pp.stream_id == 0xdeadbeef)
assert (frame_pp.promised_id == 0xcafebabe)
assert (frame_pp.block_fragment == b'\x12\x34\x56')
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
PushPromiseFrame(codecs.decode(b'000003' # length
b'0500' # type, flags
b'deadbeef' # stream id
b'cafeba', # missing promised id
'hex'))
assert (str(e.value) == 'Missing promised stream ID in PUSH_PROMISE frame')
def test_ping(self):
frame_ping = FrameFactory(codecs.decode(b'000008' # length
b'0600' # type, flags
b'deadbeef' # stream id
b'cafebabe12345678', # user data
'hex'))
assert (frame_ping.length == 8)
assert (frame_ping.type == HTTP2_FRAME_PING)
assert (frame_ping.flags == 0)
assert (frame_ping.stream_id == 0xdeadbeef)
assert (frame_ping.data == b'\xCA\xFE\xBA\xBE\x12\x34\x56\x78')
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
PingFrame(codecs.decode(b'000005' # length
b'0600' # type, flags
b'deadbeef' # stream id
b'1234567890', # invalid length
'hex'))
assert (str(e.value) == 'Invalid number of bytes in PING frame (must be 8)')
def test_goaway(self):
frame_goaway = FrameFactory(codecs.decode(b'00000a' # length
b'0700' # type, flags
b'deadbeef' # stream id
b'00000000' # last stream id
b'00000000' # error code
b'cafe', # debug data
'hex'))
assert (frame_goaway.length == 10)
assert (frame_goaway.type == HTTP2_FRAME_GOAWAY)
assert (frame_goaway.flags == 0)
assert (frame_goaway.stream_id == 0xdeadbeef)
assert (frame_goaway.last_stream_id == 0)
assert (frame_goaway.error_code == HTTP2_NO_ERROR)
assert (frame_goaway.debug_data == b'\xCA\xFE')
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
GoAwayFrame(codecs.decode(b'000005' # length
b'0700' # type, flags
b'deadbeef' # stream id
b'1234567890', # invalid length
'hex'))
assert (str(e.value) == 'Invalid number of bytes in GO_AWAY frame')
def test_window_update(self):
frame_wu = FrameFactory(codecs.decode(b'000004' # length
b'0800' # type, flags
b'deadbeef' # stream id
b'12345678', # window increment
'hex'))
assert (frame_wu.length == 4)
assert (frame_wu.type == HTTP2_FRAME_WINDOW_UPDATE)
assert (frame_wu.flags == 0)
assert (frame_wu.stream_id == 0xdeadbeef)
assert (frame_wu.window_increment == 0x12345678)
import pytest
# Invalid length
with pytest.raises(HTTP2Exception) as e:
WindowUpdateFrame(codecs.decode(b'000005' # length
b'0800' # type, flags
b'deadbeef' # stream id
b'1234567890', # invalid length
'hex'))
assert (str(e.value) == 'Invalid number of bytes in WINDOW_UPDATE frame (must be 4)')
def test_continuation(self):
frame_cont = FrameFactory(codecs.decode(b'000003' # length
b'0900' # type, flags
b'deadbeef' # stream id
b'f00baa', # block fragment
'hex'))
assert (frame_cont.length == 3)
assert (frame_cont.type == HTTP2_FRAME_CONTINUATION)
assert (frame_cont.flags == 0)
assert (frame_cont.stream_id == 0xdeadbeef)
assert (frame_cont.block_fragment == b'\xF0\x0B\xAA')
def test_factory(self):
import pytest
# Too short
pytest.raises(dpkt.NeedData, FrameFactory, codecs.decode(b'000000', 'hex'))
# Invalid type
with pytest.raises(HTTP2Exception) as e:
FrameFactory(codecs.decode(b'000000' # length
b'abcd' # type, flags
b'deadbeef', # stream id
'hex'))
assert (str(e.value) == 'Invalid frame type: 0xab')
def test_preface(self):
import pytest
# Preface
pytest.raises(dpkt.NeedData, Preface,
codecs.decode(b'505249202a20485454502f322e300d0a', 'hex'))
pytest.raises(dpkt.NeedData, Preface, b'\x00' * 23)
with pytest.raises(HTTP2Exception) as e:
Preface(b'\x00' * 24)
assert (str(e.value) == 'Invalid HTTP/2 preface')
def test_multi(self):
assert (self.i == 128)
assert (len(self.frames) == 7)
assert (self.frames[0].length == 12)
assert (self.frames[1].length == 4)
assert (self.frames[2].length == 5)
assert (self.frames[3].length == 5)
assert (self.frames[4].length == 5)
assert (self.frames[5].length == 5)
assert (self.frames[6].length == 5)
assert (self.frames[0].type == HTTP2_FRAME_SETTINGS)
assert (self.frames[1].type == HTTP2_FRAME_WINDOW_UPDATE)
assert (self.frames[2].type == HTTP2_FRAME_PRIORITY)
assert (self.frames[3].type == HTTP2_FRAME_PRIORITY)
assert (self.frames[4].type == HTTP2_FRAME_PRIORITY)
assert (self.frames[5].type == HTTP2_FRAME_PRIORITY)
assert (self.frames[6].type == HTTP2_FRAME_PRIORITY)
assert (self.frames[0].flags == 0)
assert (self.frames[1].flags == 0)
assert (self.frames[2].flags == 0)
assert (self.frames[3].flags == 0)
assert (self.frames[4].flags == 0)
assert (self.frames[5].flags == 0)
assert (self.frames[6].flags == 0)
assert (self.frames[0].stream_id == 0)
assert (self.frames[1].stream_id == 0)
assert (self.frames[2].stream_id == 3)
assert (self.frames[3].stream_id == 5)
assert (self.frames[4].stream_id == 7)
assert (self.frames[5].stream_id == 9)
assert (self.frames[6].stream_id == 11)
frames, i = frame_multi_factory(
codecs.decode(b'505249202a20485454502f322e300d0a', 'hex'),
preface=True)
assert (len(frames) == 0)
assert (i == 0)
# Only preface was parsed
frames, i = frame_multi_factory(
codecs.decode(b'505249202a20485454502f322e300d0a'
b'0d0a534d0d0a0d0a00000c0400000000', 'hex'),
preface=True)
assert (len(frames) == 0)
assert (i == 24)