# blob: 7f90c0f86efd952e74c2edd7d0598bbeaa09b98b [file] [log] [blame]
"""pcap Next Generation file format"""
# Spec: https://pcapng.github.io/pcapng/
# pylint: disable=no-member
# pylint: disable=attribute-defined-outside-init
from __future__ import print_function
from __future__ import absolute_import
from struct import pack as struct_pack, unpack as struct_unpack
from time import time
import sys
from . import dpkt
from .compat import BytesIO, intround
# Byte-order magic carried in the Section Header Block.  BYTE_ORDER_MAGIC_LE is
# the same value with its four bytes reversed, i.e. what the field looks like
# when it is decoded with the opposite endianness.
BYTE_ORDER_MAGIC = 0x1A2B3C4D
BYTE_ORDER_MAGIC_LE = 0x4D3C2B1A

# pcapng format version written into new Section Header Blocks
PCAPNG_VERSION_MAJOR = 1
PCAPNG_VERSION_MINOR = 0

# Block types
PCAPNG_BT_IDB = 0x00000001  # Interface Description Block
PCAPNG_BT_PB = 0x00000002  # Packet Block (deprecated)
PCAPNG_BT_SPB = 0x00000003  # Simple Packet Block
PCAPNG_BT_EPB = 0x00000006  # Enhanced Packet Block
PCAPNG_BT_SHB = 0x0A0D0D0A  # Section Header Block

# Options (codes 0 and 1 are defined here once; the SHB and IDB tables below
# reuse the same numeric codes 2..4 with block-specific meanings)
PCAPNG_OPT_ENDOFOPT = 0  # end of options
PCAPNG_OPT_COMMENT = 1  # comment

# SHB options
PCAPNG_OPT_SHB_HARDWARE = 2  # description of the hardware
PCAPNG_OPT_SHB_OS = 3  # name of the operating system
PCAPNG_OPT_SHB_USERAPPL = 4  # name of the application

# IDB options
PCAPNG_OPT_IF_NAME = 2  # interface name
PCAPNG_OPT_IF_DESCRIPTION = 3  # interface description
PCAPNG_OPT_IF_IPV4ADDR = 4  # IPv4 network address and netmask for the interface
PCAPNG_OPT_IF_IPV6ADDR = 5  # IPv6 network address and prefix length for the interface
PCAPNG_OPT_IF_MACADDR = 6  # interface hardware MAC address
PCAPNG_OPT_IF_EUIADDR = 7  # interface hardware EUI address
PCAPNG_OPT_IF_SPEED = 8  # interface speed in bits/s
PCAPNG_OPT_IF_TSRESOL = 9  # timestamp resolution
PCAPNG_OPT_IF_TZONE = 10  # time zone
PCAPNG_OPT_IF_FILTER = 11  # capture filter
PCAPNG_OPT_IF_OS = 12  # operating system
PCAPNG_OPT_IF_FCSLEN = 13  # length of the Frame Check Sequence in bits
PCAPNG_OPT_IF_TSOFFSET = 14  # offset (in seconds) that must be added to packet timestamp
# <copied from pcap.py>
# Data-link type (DLT_*) numbers
DLT_NULL = 0
DLT_EN10MB = 1
DLT_EN3MB = 2
DLT_AX25 = 3
DLT_PRONET = 4
DLT_CHAOS = 5
DLT_IEEE802 = 6
DLT_ARCNET = 7
DLT_SLIP = 8
DLT_PPP = 9
DLT_FDDI = 10
DLT_PFSYNC = 18
DLT_IEEE802_11 = 105
DLT_LINUX_SLL = 113
DLT_PFLOG = 117
DLT_IEEE802_11_RADIO = 127

# DLT_LOOP and DLT_RAW are numbered differently on OpenBSD
if 'openbsd' in sys.platform:
    DLT_LOOP = 12
    DLT_RAW = 14
else:
    DLT_LOOP = 108
    DLT_RAW = 12

# Per-linktype offset exposed by Reader as `dloff`
# (presumably the link-layer header length in bytes -- confirm against pcap.py)
dltoff = {
    DLT_NULL: 4,
    DLT_EN10MB: 14,
    DLT_IEEE802: 22,
    DLT_ARCNET: 6,
    DLT_SLIP: 16,
    DLT_PPP: 4,
    DLT_FDDI: 21,
    DLT_PFLOG: 48,
    DLT_PFSYNC: 4,
    DLT_LOOP: 4,
    DLT_LINUX_SLL: 16,
}
# </copied from pcap.py>
def _swap32b(i):
    """Return the uint32 `i` with its four bytes in reverse order."""
    # pack with one byte order, read back with the other
    (swapped,) = struct_unpack('>I', struct_pack('<I', i))
    return swapped
def _align32b(i):
    """Round int `i` up to the next multiple of 4 (the 32-bit boundary)."""
    # (-i % 4) is the distance from `i` to the next multiple of 4 (0 if aligned)
    return i + (-i % 4)
def _padded(s):
    """Return bytes `s` with trailing zero bytes added up to the 32-bit boundary"""
    aligned_len = _align32b(len(s))
    # struct's 's' format zero-fills a value shorter than the field width
    return struct_pack('{0}s'.format(aligned_len), s)
def _padded_tolen(s, tolen):
    """Return bytes `s` fitted into exactly `tolen` bytes.

    The value is zero-padded on the right when shorter than `tolen`; struct's
    's' format truncates it when longer.  Callers pass an already aligned
    length so the alignment does not have to be recomputed.
    """
    fmt = '{0}s'.format(tolen)
    return struct_pack(fmt, s)
def _padlen(s):
    """Return how many padding bytes `s` needs to reach the 32-bit boundary"""
    size = len(s)
    return _align32b(size) - size
class _PcapngBlock(dpkt.Packet):
    """Base class for a pcapng block with Options.

    The header declared in ``__hdr__`` includes the trailing duplicate of the
    block length (``_len``).  On the wire the variable-size body (options)
    sits between the leading fields and that trailing field, so packing and
    unpacking treat the last 4 header bytes separately.
    """

    __hdr__ = (
        ('type', 'I', 0),  # block type
        ('len', 'I', 12),  # block total length: total size of this block, in octets
        # ( body, variable size )
        ('_len', 'I', 12),  # dup of len
    )

    def unpack_hdr(self, buf):
        """Unpack only the header fields from `buf` (no length check, no options)."""
        dpkt.Packet.unpack(self, buf)

    def unpack(self, buf):
        """Unpack the header and options from `buf`.

        Raises:
            dpkt.NeedData: `buf` is shorter than the block length it declares.
            dpkt.UnpackError: the block length is invalid or the two length
                fields disagree.
        """
        dpkt.Packet.unpack(self, buf)
        if self.len > len(buf):
            raise dpkt.NeedData
        self._do_unpack_options(buf)

    def _do_unpack_options(self, buf, oo=None):
        """Parse the options and verify the trailing length field.

        Args:
            buf: the raw block, starting at its first byte.
            oo: offset of the first option; defaults to the end of the leading
                header fields.
        """
        # a block can never be shorter than its own fixed fields
        if self.len < self.__hdr_len__:
            raise dpkt.UnpackError('invalid block length')

        self.opts = []
        self.data = ''
        oo = oo or self.__hdr_len__ - 4  # options offset
        ol = self.len - oo - 4  # length

        opts_buf = buf[oo:oo + ol]
        while opts_buf:
            opt = (PcapngOptionLE(opts_buf) if self.__hdr_fmt__[0] == '<'
                   else PcapngOption(opts_buf))
            self.opts.append(opt)

            opts_buf = opts_buf[len(opt):]
            if opt.code == PCAPNG_OPT_ENDOFOPT:
                break

        # duplicate total length field: it is the last 4 bytes of *this block*,
        # which is not necessarily the end of `buf` when the caller passed a
        # buffer that extends past the block
        self._len = struct_unpack(self.__hdr_fmt__[0] + 'I', buf[self.len - 4:self.len])[0]
        if self._len != self.len:
            raise dpkt.UnpackError('length fields do not match')

    def _do_pack_options(self):
        """Return the packed options, or b'' when the block has none.

        Raises:
            dpkt.PackError: the last option is not opt_endofopt.
        """
        if not getattr(self, 'opts', None):
            return b''

        if self.opts[-1].code != PCAPNG_OPT_ENDOFOPT:
            raise dpkt.PackError('options must end with opt_endofopt')

        return b''.join(bytes(o) for o in self.opts)

    def __bytes__(self):
        opts_buf = self._do_pack_options()

        # both length fields are recomputed from the actual content
        n = len(opts_buf) + self.__hdr_len__
        self.len = n
        self._len = n

        hdr_buf = self._pack_hdr(self.type, n, n)
        # the options go between the leading fields and the trailing length
        return b''.join([hdr_buf[:-4], opts_buf, hdr_buf[-4:]])

    def __len__(self):
        if not getattr(self, 'opts', None):
            return self.__hdr_len__

        opts_len = sum(len(o) for o in self.opts)
        return self.__hdr_len__ + opts_len
class PcapngBlockLE(_PcapngBlock):
    """Variant of _PcapngBlock whose header fields use the '<' (little-endian) byte order"""
    __byte_order__ = '<'
class PcapngOption(dpkt.Packet):
    """A single Option.

    The option value lives in `data`.  Comment options (code
    PCAPNG_OPT_COMMENT) additionally expose the value as `text`; when `text`
    is set it is the source of truth for both packing and length.
    """

    __hdr__ = (
        ('code', 'H', PCAPNG_OPT_ENDOFOPT),
        ('len', 'H', 0),
    )

    def unpack(self, buf):
        """Unpack one option from the start of `buf`.

        Raises:
            dpkt.NeedData: `buf` holds fewer value bytes than `len` declares.
        """
        dpkt.Packet.unpack(self, buf)
        self.data = buf[self.__hdr_len__:self.__hdr_len__ + self.len]
        # slicing silently shortens a truncated value; report it instead
        if len(self.data) < self.len:
            raise dpkt.NeedData

        # decode comment
        if self.code == PCAPNG_OPT_COMMENT:
            self.text = self.data.decode('utf-8')

    def _value_bytes(self):
        """Return the option value as bytes, without modifying the option."""
        if self.code == PCAPNG_OPT_COMMENT:
            text = getattr(self, 'text', self.data)
            return text if isinstance(text, bytes) else text.encode('utf-8')
        return self.data

    def __bytes__(self):
        # encode comment
        if self.code == PCAPNG_OPT_COMMENT:
            self.data = self._value_bytes()

        self.len = len(self.data)

        hdr = self._pack_hdr(self.code, self.len)
        return hdr + _padded(self.data)

    def __len__(self):
        # measure the same bytes that __bytes__ would emit, so that a comment
        # given only as `text` is not reported as empty
        return self.__hdr_len__ + _align32b(len(self._value_bytes()))

    def __repr__(self):
        if self.code == PCAPNG_OPT_ENDOFOPT:
            return '{0}(opt_endofopt)'.format(self.__class__.__name__)
        else:
            return dpkt.Packet.__repr__(self)
class PcapngOptionLE(PcapngOption):
    """Variant of PcapngOption whose header fields use the '<' (little-endian) byte order"""
    __byte_order__ = '<'
class SectionHeaderBlock(_PcapngBlock):
    """Section Header block"""

    __hdr__ = (
        ('type', 'I', PCAPNG_BT_SHB),
        ('len', 'I', 28),
        ('bom', 'I', BYTE_ORDER_MAGIC),
        ('v_major', 'H', PCAPNG_VERSION_MAJOR),
        ('v_minor', 'H', PCAPNG_VERSION_MINOR),
        ('sec_len', 'q', -1),  # section length, -1 = auto
        # ( options, variable size )
        ('_len', 'I', 28)
    )

    def __bytes__(self):
        """Pack the block, refreshing both length fields from the current options."""
        options = self._do_pack_options()
        total = self.__hdr_len__ + len(options)
        self.len = total
        self._len = total

        packed = self._pack_hdr(
            self.type, total, self.bom, self.v_major, self.v_minor, self.sec_len, total)

        # the last 4 packed bytes are the trailing length; options precede it
        leading, trailing = packed[:-4], packed[-4:]
        return b''.join((leading, options, trailing))
class SectionHeaderBlockLE(SectionHeaderBlock):
    """Variant of SectionHeaderBlock whose header fields use the '<' (little-endian) byte order"""
    __byte_order__ = '<'
class InterfaceDescriptionBlock(_PcapngBlock):
    """Interface Description block"""

    __hdr__ = (
        ('type', 'I', PCAPNG_BT_IDB),
        ('len', 'I', 20),
        ('linktype', 'H', DLT_EN10MB),
        ('_reserved', 'H', 0),
        ('snaplen', 'I', 1500),
        # ( options, variable size )
        ('_len', 'I', 20)
    )

    def __bytes__(self):
        """Pack the block, refreshing both length fields from the current options."""
        options = self._do_pack_options()
        total = self.__hdr_len__ + len(options)
        self.len = total
        self._len = total

        packed = self._pack_hdr(
            self.type, total, self.linktype, self._reserved, self.snaplen, total)

        # the last 4 packed bytes are the trailing length; options precede it
        leading, trailing = packed[:-4], packed[-4:]
        return b''.join((leading, options, trailing))
class InterfaceDescriptionBlockLE(InterfaceDescriptionBlock):
    """Variant of InterfaceDescriptionBlock whose header fields use the '<' (little-endian) byte order"""
    __byte_order__ = '<'
class EnhancedPacketBlock(_PcapngBlock):
    """Enhanced Packet block.

    The captured bytes are kept in `pkt_data`; on the wire they follow the
    leading header fields, padded to a 32-bit boundary, and are followed by
    the options and the trailing length field.
    """

    __hdr__ = (
        ('type', 'I', PCAPNG_BT_EPB),
        ('len', 'I', 64),
        ('iface_id', 'I', 0),
        ('ts_high', 'I', 0),  # timestamp high
        ('ts_low', 'I', 0),  # timestamp low
        ('caplen', 'I', 0),  # captured len, size of pkt_data
        ('pkt_len', 'I', 0),  # actual packet len
        # ( pkt_data, variable size )
        # ( options, variable size )
        ('_len', 'I', 64)
    )

    def unpack(self, buf):
        """Unpack header, packet data and options from `buf`.

        Raises:
            dpkt.NeedData: `buf` is shorter than the block length it declares.
        """
        dpkt.Packet.unpack(self, buf)
        if self.len > len(buf):
            raise dpkt.NeedData

        # packet data
        po = self.__hdr_len__ - 4  # offset of pkt_data
        self.pkt_data = buf[po:po + self.caplen]

        # skip padding between pkt_data and options
        opts_offset = po + _align32b(self.caplen)
        self._do_unpack_options(buf, opts_offset)

    def __bytes__(self):
        pkt_buf = self.pkt_data

        # both packet length fields are set to the size of pkt_data
        pkt_len = len(pkt_buf)
        self.caplen = pkt_len
        self.pkt_len = pkt_len

        opts_buf = self._do_pack_options()

        n = self.__hdr_len__ + _align32b(self.caplen) + len(opts_buf)
        self.len = n
        self._len = n

        hdr_buf = self._pack_hdr(
            self.type,
            n,
            self.iface_id,
            self.ts_high,
            self.ts_low,
            pkt_len,
            pkt_len,
            n
        )

        return b''.join([hdr_buf[:-4], _padded(pkt_buf), opts_buf, hdr_buf[-4:]])

    def __len__(self):
        # a block built without options has no `opts` attribute at all;
        # treat that as "no options", like the base class does
        opts = getattr(self, 'opts', None) or ()
        opts_len = sum(len(o) for o in opts)
        return self.__hdr_len__ + _align32b(self.caplen) + opts_len
class EnhancedPacketBlockLE(EnhancedPacketBlock):
    """Variant of EnhancedPacketBlock whose header fields use the '<' (little-endian) byte order"""
    __byte_order__ = '<'
class Writer(object):
    """Simple pcapng dumpfile writer."""

    # True on little-endian hosts; selects the *LE block classes.  The double
    # underscore mangles the name to `_Writer__le`.
    __le = sys.byteorder == 'little'

    def __init__(self, fileobj, snaplen=1500, linktype=DLT_EN10MB, shb=None, idb=None):
        """
        Create a pcapng dumpfile writer for the given fileobj.

        The Section Header Block and the Interface Description Block(s) are
        written to `fileobj` immediately.

        Args:
            fileobj: writable binary file object
            snaplen: snaplen of the default IDB (used only when `idb` is not given)
            linktype: linktype of the default IDB (used only when `idb` is not given)
            shb: optional instance of SectionHeaderBlock(LE)
            idb: optional instance of InterfaceDescriptionBlock(LE), or an
                iterable of them

        Raises:
            ValueError: `shb` or `idb` has the wrong type or byte order.
        """
        self.__f = fileobj

        if shb:
            self._validate_block('shb', shb, SectionHeaderBlock)
        if idb:
            idb = self._validated_idbs(idb)

        if self.__le:
            shb = shb or SectionHeaderBlockLE()
            idb = idb or [InterfaceDescriptionBlockLE(snaplen=snaplen, linktype=linktype)]
            self._kls = EnhancedPacketBlockLE
        else:
            shb = shb or SectionHeaderBlock()
            idb = idb or [InterfaceDescriptionBlock(snaplen=snaplen, linktype=linktype)]
            self._kls = EnhancedPacketBlock

        self.__f.write(bytes(shb))
        for idb_ in idb:
            self.__f.write(bytes(idb_))

    def _validated_idbs(self, idb):
        """Return `idb` as a list of validated Interface Description Blocks.

        A single block is wrapped into a list.  Any other iterable is
        materialized first, so a one-shot iterable is not used up by the
        validation pass before its blocks are written.
        """
        if isinstance(idb, InterfaceDescriptionBlock):
            blocks = [idb]
        else:
            try:
                blocks = list(idb)
            except TypeError:
                # not iterable and not an IDB: report it like any wrong type
                self._validate_block('idb', idb, InterfaceDescriptionBlock)
                raise

        for blk in blocks:
            self._validate_block('idb', blk, InterfaceDescriptionBlock)
        return blocks

    def _validate_block(self, arg_name, blk, expected_cls):
        """Check a user-defined block for correct type and endianness"""
        if not isinstance(blk, expected_cls):
            raise ValueError('{0}: expecting class {1}'.format(
                arg_name, expected_cls.__name__))

        if self.__le and blk.__hdr_fmt__[0] == '>':
            raise ValueError('{0}: expecting class {1}LE on a little-endian system'.format(
                arg_name, expected_cls.__name__))

        if not self.__le and blk.__hdr_fmt__[0] == '<':
            raise ValueError('{0}: expecting class {1} on a big-endian system'.format(
                arg_name, expected_cls.__name__.replace('LE', '')))

    def writepkt(self, pkt, ts=None):
        """
        Write a single packet with an optional timestamp.

        Args:
            pkt: buffer or instance of EnhancedPacketBlock(LE)
            ts: Unix timestamp in seconds since Epoch (e.g. 1454725786.99)

        For an EnhancedPacketBlock, `ts` overrides the block's own timestamp;
        without `ts`, the current time is used only if the block's timestamp
        is zero.  The block's ts_high/ts_low are updated in place.
        """
        if isinstance(pkt, EnhancedPacketBlock):
            self._validate_block('pkt', pkt, EnhancedPacketBlock)

            if ts is not None:  # ts as an argument gets precedence
                ts = intround(ts * 1e6)
            elif pkt.ts_high == pkt.ts_low == 0:
                ts = intround(time() * 1e6)

            if ts is not None:
                pkt.ts_high = ts >> 32
                pkt.ts_low = ts & 0xffffffff

            self.__f.write(bytes(pkt))
            return

        # pkt is a buffer - wrap it into an EPB
        if ts is None:
            ts = time()
        self.writepkt_time(pkt, ts)

    def writepkt_time(self, pkt, ts):
        """
        Write a single packet with a mandatory timestamp.

        Args:
            pkt: a buffer
            ts: Unix timestamp in seconds since Epoch (e.g. 1454725786.99)
        """
        ts = intround(ts * 1e6)  # to int microseconds

        s = pkt
        n = len(s)

        epb = self._kls(
            ts_high=ts >> 32,
            ts_low=ts & 0xffffffff,
            caplen=n,
            pkt_len=n,
            pkt_data=s
        )
        self.__f.write(bytes(epb))

    def writepkts(self, pkts):
        """
        Take an iterable of (ts, pkt), and write to file.

        Each pkt is a buffer; one template block is created up front and its
        header packer is reused for every packet.
        """
        kls = self._kls()

        # hoist everything that does not change between packets
        ph = kls._pack_hdr
        fd = self.__f
        iface_id = kls.iface_id
        pkt_type = kls.type

        opts_buf = kls._do_pack_options()
        opts_len = len(opts_buf)

        hdr_len = kls.__hdr_len__
        precalc_n = hdr_len + opts_len

        for ts, pkt in pkts:
            ts = intround(ts * 1e6)  # to int microseconds
            pkt_len = len(pkt)
            pkt_len_align = _align32b(pkt_len)

            n = precalc_n + pkt_len_align
            hdr_buf = ph(
                pkt_type,
                n,
                iface_id,
                ts >> 32,
                ts & 0xffffffff,
                pkt_len,
                pkt_len,
                n
            )

            buf = b''.join([
                hdr_buf[:-4],
                _padded_tolen(pkt, pkt_len_align),
                opts_buf,
                hdr_buf[-4:]
            ])
            fd.write(buf)

    def close(self):
        """Close the underlying file object."""
        self.__f.close()
class Reader(object):
    """Simple pypcap-compatible pcapng file reader."""

    def __init__(self, fileobj):
        """Read the Section Header Block and the first Interface Description Block.

        Args:
            fileobj: readable binary file object positioned at the start of
                a pcapng stream

        Raises:
            ValueError: the header is missing or invalid, the byte order or
                major version is unknown, or no IDB is found.
            dpkt.UnpackError: a block declares an impossible length.
        """
        self.name = getattr(fileobj, 'name', '<{0}>'.format(fileobj.__class__.__name__))
        self.__f = fileobj

        shb = SectionHeaderBlock()
        buf = self.__f.read(shb.__hdr_len__)
        if len(buf) < shb.__hdr_len__:
            raise ValueError('invalid pcapng header')

        # unpack just the header since endianness is not known
        shb.unpack_hdr(buf)
        if shb.type != PCAPNG_BT_SHB:
            raise ValueError('invalid pcapng header: not a SHB')

        # determine the correct byte order and reload full SHB
        if shb.bom == BYTE_ORDER_MAGIC_LE:
            self.__le = True
            shb_len, shb_cls = _swap32b(shb.len), SectionHeaderBlockLE
        elif shb.bom == BYTE_ORDER_MAGIC:
            self.__le = False
            shb_len, shb_cls = shb.len, SectionHeaderBlock
        else:
            raise ValueError('unknown endianness')

        # a length below the fixed header would become a negative read size,
        # which file objects interpret as "read everything"
        if shb_len < shb.__hdr_len__:
            raise ValueError('invalid pcapng header')
        buf += self.__f.read(shb_len - shb.__hdr_len__)
        shb = shb_cls(buf)

        # check if this version is supported
        if shb.v_major != PCAPNG_VERSION_MAJOR:
            raise ValueError('unknown pcapng version {0}.{1}'.format(shb.v_major, shb.v_minor,))

        # look for a mandatory IDB
        idb = None
        while 1:
            block = self._read_block()
            if block is None:
                break
            blk_type, buf = block
            if blk_type == PCAPNG_BT_IDB:
                idb = (InterfaceDescriptionBlockLE(buf) if self.__le
                       else InterfaceDescriptionBlock(buf))
                break
            # just skip other blocks

        if idb is None:
            raise ValueError('IDB not found')

        # set timestamp resolution and offset
        self._divisor = float(1e6)  # defaults
        self._tsoffset = 0
        for opt in idb.opts:
            if opt.code == PCAPNG_OPT_IF_TSRESOL:
                # if MSB=0, the remaining bits is a neg power of 10 (e.g. 6 means microsecs)
                # if MSB=1, the remaining bits is a neg power of 2 (e.g. 10 means 1/1024 of second)
                opt_val = struct_unpack('b', opt.data)[0]
                pow_num = 2 if opt_val & 0b10000000 else 10
                self._divisor = float(pow_num ** (opt_val & 0b01111111))

            elif opt.code == PCAPNG_OPT_IF_TSOFFSET:
                # 64-bit int that specifies an offset (in seconds) that must be added to the
                # timestamp of each packet
                self._tsoffset = struct_unpack('<q' if self.__le else '>q', opt.data)[0]

        if idb.linktype in dltoff:
            self.dloff = dltoff[idb.linktype]
        else:
            self.dloff = 0

        self.idb = idb
        self.snaplen = idb.snaplen
        self.filter = ''
        self.__iter = iter(self)

    def _read_block(self):
        """Read the next raw block from the file.

        Returns:
            (block type, raw block bytes including the 8 bytes already read),
            or None when fewer than 8 bytes are left.

        Raises:
            dpkt.UnpackError: the declared length is below 12, the size of
                the type field plus the two length fields.
        """
        hdr = self.__f.read(8)
        if len(hdr) < 8:
            return None

        blk_type, blk_len = struct_unpack('<II' if self.__le else '>II', hdr)
        if blk_len < 12:
            raise dpkt.UnpackError('invalid block length')
        return blk_type, hdr + self.__f.read(blk_len - 8)

    @property
    def fd(self):
        """File descriptor number of the underlying file object."""
        return self.__f.fileno()

    def fileno(self):
        """Return the file descriptor number of the underlying file object."""
        return self.fd

    def datalink(self):
        """Return the linktype of the Interface Description Block."""
        return self.idb.linktype

    def setfilter(self, value, optimize=1):
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError

    def readpkts(self):
        """Return all remaining packets as a list of (timestamp, packet data)."""
        return list(self)

    def __next__(self):
        return next(self.__iter)
    next = __next__  # Python 2 compat

    def dispatch(self, cnt, callback, *args):
        """Collect and process packets with a user callback.

        Return the number of packets processed.

        Arguments:

        cnt      -- number of packets to process;
                    or 0 to process all packets until EOF
        callback -- function with (timestamp, pkt, *args) prototype
        *args    -- optional arguments passed to callback on execution
        """
        processed = 0
        for ts, pkt in self:
            callback(ts, pkt, *args)
            processed += 1
            # never true for cnt <= 0, so those values read until EOF
            if processed == cnt:
                break
        return processed

    def loop(self, callback, *args):
        """Call `callback` for every remaining packet until EOF."""
        self.dispatch(0, callback, *args)

    def __iter__(self):
        while 1:
            block = self._read_block()
            if block is None:
                break
            blk_type, buf = block
            if blk_type == PCAPNG_BT_EPB:
                epb = EnhancedPacketBlockLE(buf) if self.__le else EnhancedPacketBlock(buf)
                ts = self._tsoffset + (((epb.ts_high << 32) | epb.ts_low) / self._divisor)
                yield (ts, epb.pkt_data)
            # just ignore other blocks
#########
# TESTS #
#########
def test_shb():
    """Test SHB with options"""
    # little-endian SHB carrying one user-application option and opt_endofopt
    buf = (
        b'\x0a\x0d\x0d\x0a\x58\x00\x00\x00\x4d\x3c\x2b\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff'
        b'\xff\xff\x04\x00\x31\x00\x54\x53\x68\x61\x72\x6b\x20\x31\x2e\x31\x30\x2e\x30\x72\x63\x32'
        b'\x20\x28\x53\x56\x4e\x20\x52\x65\x76\x20\x34\x39\x35\x32\x36\x20\x66\x72\x6f\x6d\x20\x2f'
        b'\x74\x72\x75\x6e\x6b\x2d\x31\x2e\x31\x30\x29\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00')

    # the first option exactly as it appears inside `buf`, padding included
    opt_buf = b'\x04\x00\x31\x00TShark 1.10.0rc2 (SVN Rev 49526 from /trunk-1.10)\x00\x00\x00'

    # block unpacking
    shb = SectionHeaderBlockLE(buf)
    assert shb.type == PCAPNG_BT_SHB
    assert shb.bom == BYTE_ORDER_MAGIC
    assert shb.v_major == 1
    assert shb.v_minor == 0
    assert shb.sec_len == -1
    assert shb.data == ''

    # options unpacking
    assert len(shb.opts) == 2
    assert shb.opts[0].code == PCAPNG_OPT_SHB_USERAPPL
    assert shb.opts[0].data == b'TShark 1.10.0rc2 (SVN Rev 49526 from /trunk-1.10)'
    assert shb.opts[0].len == len(shb.opts[0].data)

    assert shb.opts[1].code == PCAPNG_OPT_ENDOFOPT
    assert shb.opts[1].len == 0

    # option packing
    assert str(shb.opts[0]) == str(opt_buf)
    assert len(shb.opts[0]) == len(opt_buf)
    assert bytes(shb.opts[1]) == b'\x00\x00\x00\x00'

    # block packing
    assert bytes(shb) == bytes(buf)
    assert str(shb) == str(buf)
    assert len(shb) == len(buf)
def test_idb():
    """Test IDB with options"""
    # little-endian IDB carrying a timestamp-resolution option and opt_endofopt
    buf = (
        b'\x01\x00\x00\x00\x20\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\x09\x00\x01\x00\x06\x00'
        b'\x00\x00\x00\x00\x00\x00\x20\x00\x00\x00')

    # block unpacking
    idb = InterfaceDescriptionBlockLE(buf)
    assert idb.type == PCAPNG_BT_IDB
    assert idb.linktype == DLT_EN10MB
    assert idb.snaplen == 0xffff
    assert idb.data == ''

    # options unpacking
    assert len(idb.opts) == 2
    assert idb.opts[0].code == PCAPNG_OPT_IF_TSRESOL
    assert idb.opts[0].len == 1
    assert idb.opts[0].data == b'\x06'

    assert idb.opts[1].code == PCAPNG_OPT_ENDOFOPT
    assert idb.opts[1].len == 0

    # option packing (the 1-byte value is padded to 4 bytes)
    assert bytes(idb.opts[0]) == b'\x09\x00\x01\x00\x06\x00\x00\x00'
    assert len(idb.opts[0]) == 8
    assert bytes(idb.opts[1]) == b'\x00\x00\x00\x00'

    # block packing
    assert bytes(idb) == bytes(buf)
    assert str(idb) == str(buf)
    assert len(idb) == len(buf)
def test_epb():
    """Test EPB with a non-ascii comment option"""
    # little-endian EPB: 74 bytes of packet data, a UTF-8 comment and opt_endofopt
    buf = (
        b'\x06\x00\x00\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73\xe6\x04\x00\xbe\x37\xe2\x19\x4a\x00'
        b'\x00\x00\x4a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x45\x00'
        b'\x00\x3c\x5d\xb3\x40\x00\x40\x06\xdf\x06\x7f\x00\x00\x01\x7f\x00\x00\x01\x98\x34\x11\x4e'
        b'\x95\xcb\x2d\x3a\x00\x00\x00\x00\xa0\x02\xaa\xaa\xfe\x30\x00\x00\x02\x04\xff\xd7\x04\x02'
        b'\x08\x0a\x05\x8f\x70\x89\x00\x00\x00\x00\x01\x03\x03\x07\x00\x00\x01\x00\x0a\x00\xd0\xbf'
        b'\xd0\xb0\xd0\xba\xd0\xb5\xd1\x82\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00')

    # block unpacking
    epb = EnhancedPacketBlockLE(buf)
    assert epb.type == PCAPNG_BT_EPB
    assert epb.caplen == len(epb.pkt_data)
    assert epb.pkt_len == len(epb.pkt_data)
    assert epb.caplen == 74
    assert epb.ts_high == 321139
    assert epb.ts_low == 434255806
    assert epb.data == ''

    # options unpacking
    assert len(epb.opts) == 2
    assert epb.opts[0].code == PCAPNG_OPT_COMMENT
    assert epb.opts[0].text == u'\u043f\u0430\u043a\u0435\u0442'

    assert epb.opts[1].code == PCAPNG_OPT_ENDOFOPT
    assert epb.opts[1].len == 0

    # option packing (the 10-byte UTF-8 value is padded to 12 bytes)
    assert bytes(epb.opts[0]) == b'\x01\x00\x0a\x00\xd0\xbf\xd0\xb0\xd0\xba\xd0\xb5\xd1\x82\x00\x00'
    assert len(epb.opts[0]) == 16
    assert bytes(epb.opts[1]) == b'\x00\x00\x00\x00'

    # block packing
    assert bytes(epb) == bytes(buf)
    assert str(epb) == str(buf)
    assert len(epb) == len(buf)
def test_simple_write_read():
    """Write a one-packet pcapng with default blocks, then read it back"""
    stream = BytesIO()

    Writer(stream, snaplen=0x2000, linktype=DLT_LINUX_SLL).writepkt(b'foo', ts=1454725786.526401)
    stream.flush()
    stream.seek(0)

    reader = Reader(stream)
    assert reader.snaplen == 0x2000
    assert reader.datalink() == DLT_LINUX_SLL

    first = next(iter(reader))
    assert first[0] == 1454725786.526401
    assert first[1] == b'foo'

    # test dispatch(): one packet is available, then none
    def discard(ts, pkt):
        return None

    stream.seek(0)
    reader = Reader(stream)
    assert reader.dispatch(1, discard) == 1
    assert reader.dispatch(1, discard) == 0

    stream.close()
def test_pcapng_header():
    """Reading an empty file will fail as the header length is incorrect"""
    fobj = BytesIO()
    try:
        Reader(fobj)
    except ValueError:
        pass
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('Reader accepted an empty file')
def define_testdata():
    """Return a fresh TestData instance with reference blocks and a reference capture.

    A new instance is built on every call, so a test may modify the blocks
    it receives without affecting other tests.
    """
    class TestData(object):
        def __init__(self):
            # Section Header Blocks (little- and big-endian) with the same options
            self.valid_shb_le = SectionHeaderBlockLE(opts=[
                PcapngOptionLE(code=3, data=b'64-bit Windows 8.1, build 9600'),
                PcapngOptionLE(code=4, data=b'Dumpcap 1.12.7 (v1.12.7-0-g7fc8978 from master-1.12)'),
                PcapngOptionLE()
            ])

            self.valid_shb_be = SectionHeaderBlock(opts=[
                PcapngOption(code=3, data=b'64-bit Windows 8.1, build 9600'),
                PcapngOption(code=4, data=b'Dumpcap 1.12.7 (v1.12.7-0-g7fc8978 from master-1.12)'),
                PcapngOption()
            ])

            # Interface Description Blocks (little- and big-endian)
            self.valid_idb_le = InterfaceDescriptionBlockLE(snaplen=0x40000, opts=[
                PcapngOptionLE(code=2, data=b'\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'),
                PcapngOptionLE(code=9, data=b'\x06'),
                PcapngOptionLE(code=12, data=b'64-bit Windows 8.1, build 9600'),
                PcapngOptionLE()
            ])

            self.valid_idb_be = InterfaceDescriptionBlock(snaplen=0x40000, opts=[
                PcapngOption(code=2, data=b'\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'),
                PcapngOption(code=9, data=b'\x06'),
                PcapngOption(code=12, data=b'64-bit Windows 8.1, build 9600'),
                PcapngOption()
            ])

            # complete little-endian capture: SHB + IDB + one EPB
            self.valid_pcapng = (
                b'\x0a\x0d\x0d\x0a\x7c\x00\x00\x00\x4d\x3c\x2b\x1a\x01\x00\x00'
                b'\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x1e\x00\x36\x34'
                b'\x2d\x62\x69\x74\x20\x57\x69\x6e\x64\x6f\x77\x73\x20\x38\x2e'
                b'\x31\x2c\x20\x62\x75\x69\x6c\x64\x20\x39\x36\x30\x30\x00\x00'
                b'\x04\x00\x34\x00\x44\x75\x6d\x70\x63\x61\x70\x20\x31\x2e\x31'
                b'\x32\x2e\x37\x20\x28\x76\x31\x2e\x31\x32\x2e\x37\x2d\x30\x2d'
                b'\x67\x37\x66\x63\x38\x39\x37\x38\x20\x66\x72\x6f\x6d\x20\x6d'
                b'\x61\x73\x74\x65\x72\x2d\x31\x2e\x31\x32\x29\x00\x00\x00\x00'
                b'\x7c\x00\x00\x00\x01\x00\x00\x00\x7c\x00\x00\x00\x01\x00\x00'
                b'\x00\x00\x00\x04\x00\x02\x00\x32\x00\x5c\x44\x65\x76\x69\x63'
                b'\x65\x5c\x4e\x50\x46\x5f\x7b\x33\x42\x42\x46\x32\x31\x41\x37'
                b'\x2d\x39\x31\x41\x45\x2d\x34\x44\x44\x42\x2d\x41\x42\x32\x43'
                b'\x2d\x43\x37\x38\x32\x39\x39\x39\x43\x32\x32\x44\x35\x7d\x00'
                b'\x00\x09\x00\x01\x00\x06\x00\x00\x00\x0c\x00\x1e\x00\x36\x34'
                b'\x2d\x62\x69\x74\x20\x57\x69\x6e\x64\x6f\x77\x73\x20\x38\x2e'
                b'\x31\x2c\x20\x62\x75\x69\x6c\x64\x20\x39\x36\x30\x30\x00\x00'
                b'\x00\x00\x00\x00\x7c\x00\x00\x00\x06\x00\x00\x00\x84\x00\x00'
                b'\x00\x00\x00\x00\x00\x63\x20\x05\x00\xd6\xc4\xab\x0b\x4a\x00'
                b'\x00\x00\x4a\x00\x00\x00\x08\x00\x27\x96\xcb\x7c\x52\x54\x00'
                b'\x12\x35\x02\x08\x00\x45\x00\x00\x3c\xa4\x40\x00\x00\x1f\x01'
                b'\x27\xa2\xc0\xa8\x03\x28\x0a\x00\x02\x0f\x00\x00\x56\xf0\x00'
                b'\x01\x00\x6d\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c'
                b'\x4d\x4e\x4f\x50\x51\x52\x53\x54\x55\x56\x57\x41\x42\x43\x44'
                b'\x45\x46\x47\x48\x49\x00\x00\x01\x00\x0f\x00\x64\x70\x6b\x74'
                b'\x20\x69\x73\x20\x61\x77\x65\x73\x6f\x6d\x65\x00\x00\x00\x00'
                b'\x00\x84\x00\x00\x00'
            )

            # (timestamp, packet data) pairs the tests expect a Reader to
            # yield for valid_pcapng
            self.valid_pkts = [
                (1442984653.210838,
                 (b"\x08\x00'\x96\xcb|RT\x00\x125\x02\x08\x00E\x00\x00<\xa4@"
                  b"\x00\x00\x1f\x01'\xa2\xc0\xa8\x03(\n\x00\x02\x0f\x00\x00V"
                  b"\xf0\x00\x01\x00mABCDEFGHIJKLMNOPQRSTUVWABCDEFGHI"))
            ]

            # Enhanced Packet Blocks (big- and little-endian) with a comment option
            self.valid_epb_be = EnhancedPacketBlock(opts=[
                PcapngOption(code=1, text=b'dpkt is awesome'),
                PcapngOption()
            ], pkt_data=(
                b'\x08\x00\x27\x96\xcb\x7c\x52\x54\x00\x12\x35\x02\x08\x00\x45'
                b'\x00\x00\x3c\xa4\x40\x00\x00\x1f\x01\x27\xa2\xc0\xa8\x03\x28'
                b'\x0a\x00\x02\x0f\x00\x00\x56\xf0\x00\x01\x00\x6d\x41\x42\x43'
                b'\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c\x4d\x4e\x4f\x50\x51\x52'
                b'\x53\x54\x55\x56\x57\x41\x42\x43\x44\x45\x46\x47\x48\x49'
            ))

            self.valid_epb_le = EnhancedPacketBlockLE(opts=[
                PcapngOptionLE(code=1, text=b'dpkt is awesome'),
                PcapngOptionLE()
            ], pkt_data=(
                b'\x08\x00\x27\x96\xcb\x7c\x52\x54\x00\x12\x35\x02\x08\x00\x45'
                b'\x00\x00\x3c\xa4\x40\x00\x00\x1f\x01\x27\xa2\xc0\xa8\x03\x28'
                b'\x0a\x00\x02\x0f\x00\x00\x56\xf0\x00\x01\x00\x6d\x41\x42\x43'
                b'\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c\x4d\x4e\x4f\x50\x51\x52'
                b'\x53\x54\x55\x56\x57\x41\x42\x43\x44\x45\x46\x47\x48\x49'
            ))

        @property
        def shb_idb_epb_le(self):
            # the three little-endian blocks as one tuple
            return self.valid_shb_le, self.valid_idb_le, self.valid_epb_le

        @property
        def shb_idb_epb_be(self):
            # the three big-endian blocks as one tuple
            return self.valid_shb_be, self.valid_idb_be, self.valid_epb_be

    return TestData()
def pre_test(f):
    """Decorator: give the test a fresh BytesIO as the global `fobj`.

    The stream is injected into the globals of `f` before the call and is
    flushed and rewound afterwards; the return value of `f` is passed through.
    """
    def wrapper(*args, **kwargs):
        stream = BytesIO()
        f.__globals__['fobj'] = stream
        result = f(*args, **kwargs)
        stream.flush()
        stream.seek(0)
        return result
    return wrapper
class WriterTestWrap:
    """
    Decorate a writer test function with an instance of this class.

    The test will be provided with a writer object, which it should write some pkts to.
    After the test has run, the BytesIO object will be passed to a Reader,
    which will compare each pkt to the return value of the test.

    The test is run twice, once with the Writer forced to little-endian and
    once forced to big-endian.  The keyword argument `writer` may hold a dict
    of extra keyword arguments for the Writer constructor.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __call__(self, f, *args, **kwargs):
        def wrapper(*args, **kwargs):
            _sysle = Writer._Writer__le
            try:
                for little_endian in [True, False]:
                    fobj = BytesIO()
                    Writer._Writer__le = little_endian
                    writer = Writer(fobj, **self.kwargs.get('writer', {}))
                    f.__globals__['writer'] = writer
                    f.__globals__['fobj'] = fobj
                    pkts = f(*args, **kwargs)
                    fobj.flush()
                    fobj.seek(0)

                    assert pkts, "You must return the input data from the test"
                    expected = list(pkts)
                    actual = list(Reader(fobj))
                    # zip() alone would stop at the shorter sequence and hide
                    # packets that were never written or never read back
                    assert len(actual) == len(expected)
                    for (ts_out, pkt_out), (ts_in, pkt_in) in zip(expected, actual):
                        assert ts_out == ts_in
                        assert pkt_out == pkt_in

                    writer.close()
            finally:
                # undo the patching even when the test or an assertion fails
                Writer._Writer__le = _sysle
                f.__globals__.pop('writer', None)
                f.__globals__.pop('fobj', None)
        return wrapper
class PostTest:
    """
    Decorator that checks what a Reader makes of the global `fobj` after a test ran.

    The keyword argument `test` selects the check:
      'assertion'        -- Reader(fobj) must raise `type` with message `msg`
      'compare_property' -- bytes() of the test's return value must equal
                            bytes() of the Reader attribute named by `property`
      'compare_method'   -- the test's return value must equal the result of
                            calling the Reader method named by `method`
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs

    def __call__(self, f, *args, **kwargs):
        def wrapper(*args, **kwargs):
            expected = f(*args, **kwargs)
            stream = f.__globals__['fobj']
            mode = self.kwargs.get('test')

            if mode == 'assertion':
                caught = None
                try:
                    Reader(stream)
                except Exception as exc:
                    caught = exc
                    assert isinstance(exc, self.kwargs['type'])
                    assert str(exc) == self.kwargs['msg']
                assert caught is not None, "No assertion raised!"

            elif mode == 'compare_property':
                attr_name = self.kwargs['property']
                actual = getattr(Reader(stream), attr_name)
                assert bytes(expected) == bytes(actual)

            elif mode == 'compare_method':
                method_name = self.kwargs['method']
                bound = getattr(Reader(stream), method_name)
                assert bound() == expected

            else:
                raise Exception("No test type specified")
        return wrapper
@PostTest(test='assertion', type=ValueError, msg='invalid pcapng header: not a SHB')
@pre_test
def test_shb_header():
    """Reader must reject a file whose first block does not have the SHB type"""
    bad_shb = define_testdata().valid_shb_le
    bad_shb.type = 123456666
    raw = bytes(bad_shb)
    fobj.write(raw)  # noqa
@PostTest(test='assertion', type=ValueError, msg='unknown endianness')
@pre_test
def test_shb_bom():
    """Reader must reject a SHB whose byte-order magic is neither known value"""
    bad_shb = define_testdata().valid_shb_le
    bad_shb.bom = 12345666
    raw = bytes(bad_shb)
    fobj.write(raw)  # noqa
@PostTest(test='assertion', type=ValueError, msg='unknown pcapng version 123.45')
@pre_test
def test_shb_version():
    """Reader must reject a SHB with an unsupported major version"""
    bad_shb = define_testdata().valid_shb_le
    bad_shb.v_major, bad_shb.v_minor = 123, 45
    raw = bytes(bad_shb)
    fobj.write(raw)  # noqa
@PostTest(test='assertion', type=ValueError, msg='IDB not found')
@pre_test
def test_no_idb():
    """Reader must fail when the SHB is not followed by any IDB"""
    # four trailing bytes are too few to form the start of another block
    raw = bytes(define_testdata().valid_shb_le) + b'aaaa'
    fobj.write(raw)  # noqa
@PostTest(test='compare_property', property='idb')
@pre_test
def test_idb_opt_offset():
    """Test that the timestamp offset is correctly written and read"""
    shb = define_testdata().valid_shb_le
    idb = define_testdata().valid_idb_le

    # put a timestamp-offset option in front of the existing options
    tsoffset_opt = PcapngOptionLE(
        code=PCAPNG_OPT_IF_TSOFFSET,
        data=struct_pack('<q', 123456666))
    idb.opts.insert(0, tsoffset_opt)

    fobj.write(bytes(shb) + bytes(idb))  # noqa
    return idb
@PostTest(test='compare_property', property='dloff')
@pre_test
def test_idb_linktype():
    """Test that if the idb.linktype is not in dltoff, dloff is set to 0"""
    data = define_testdata()
    shb, idb = data.valid_shb_le, data.valid_idb_le
    idb.linktype = 3456
    fobj.write(bytes(shb) + bytes(idb))  # noqa
    return 0
def test_repr():
    """Check __repr__ of a block that holds options.

    The repr of a bytes value includes the b'' prefix on python3 only, so
    both spellings are accepted.
    """
    expected_py2 = (
        "SectionHeaderBlockLE(opts=[PcapngOptionLE(code=3, data='64-bit Windows 8.1, build 9600'),"
        " PcapngOptionLE(code=4, data='Dumpcap 1.12.7 (v1.12.7-0-g7fc8978 from master-1.12)'),"
        " PcapngOptionLE(opt_endofopt)])")
    expected_py3 = (
        "SectionHeaderBlockLE(opts=[PcapngOptionLE(code=3, data=b'64-bit Windows 8.1, build 9600'),"
        " PcapngOptionLE(code=4, data=b'Dumpcap 1.12.7 (v1.12.7-0-g7fc8978 from master-1.12)'),"
        " PcapngOptionLE(opt_endofopt)])")

    actual = repr(define_testdata().valid_shb_le)
    assert actual in (expected_py2, expected_py3)
@pre_test
def test_filter():
    """Reader.setfilter is not implemented and must say so"""
    buf = define_testdata().valid_pcapng
    fobj.write(buf)  # noqa
    fobj.flush()  # noqa
    fobj.seek(0)  # noqa

    reader = Reader(fobj)  # noqa
    try:
        reader.setfilter(None, None)
    except NotImplementedError:
        pass
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('setfilter did not raise NotImplementedError')
@PostTest(test='compare_method', method='readpkts')
@pre_test
def test_readpkts():
    """readpkts() must return every packet of the reference capture"""
    data = define_testdata()
    fobj.write(data.valid_pcapng)  # noqa
    return data.valid_pkts
@PostTest(test='compare_method', method='next')
@pre_test
def test_next():
    """next() must return the first packet of the reference capture"""
    data = define_testdata()
    fobj.write(data.valid_pcapng)  # noqa
    first_pkt = data.valid_pkts[0]
    return first_pkt
@pre_test
def test_dispatch():
    """dispatch(0, ...) must hand the single reference packet to the callback"""
    fobj.write(define_testdata().valid_pcapng)  # noqa
    fobj.flush()  # noqa
    fobj.seek(0)  # noqa

    def callback(timestamp, pkt, *args):
        expected = define_testdata().valid_pkts[0]
        assert (timestamp, pkt) == expected

    reader = Reader(fobj)  # noqa
    processed = reader.dispatch(0, callback)
    assert 1 == processed
@pre_test
def test_loop():
    """loop() must hand every packet of the reference capture to the callback"""
    fobj.write(define_testdata().valid_pcapng)  # noqa
    fobj.flush()  # noqa
    fobj.seek(0)  # noqa

    # collect the packets so the test also fails if the callback is never
    # called (loop() itself returns nothing that could be checked)
    seen = []

    def callback(timestamp, pkt, *args):
        seen.append((timestamp, pkt))

    reader = Reader(fobj)  # noqa
    reader.loop(callback)
    assert seen == define_testdata().valid_pkts
def test_idb_opt_err():
    """Test that options end with opt_endofopt"""
    idb = define_testdata().valid_idb_le
    del idb.opts[-1]
    try:
        bytes(idb)
    except dpkt.PackError as e:
        assert str(e) == 'options must end with opt_endofopt'
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('packing options without opt_endofopt did not raise')
def test_custom_read_write():
    """Read a full pcapng file with 1 ICMP packet, then recreate it from custom blocks"""
    data = define_testdata()
    reference = data.valid_pcapng

    # test reading
    source = BytesIO(reference)
    reader = Reader(source)
    assert reader.snaplen == 0x40000
    assert reader.datalink() == DLT_EN10MB

    idb_opts = reader.idb.opts
    assert idb_opts[0].data.decode('utf-8') == '\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'
    assert idb_opts[2].data.decode('utf-8') == '64-bit Windows 8.1, build 9600'

    ts, payload = next(iter(reader))
    assert ts == 1442984653.2108380
    assert len(payload) == 74
    assert payload.startswith(b'\x08\x00\x27\x96')
    assert payload.endswith(b'FGHI')
    source.close()

    # test pcapng customized writing
    if sys.byteorder == 'little':
        shb, idb, epb = data.shb_idb_epb_le
    else:
        shb, idb, epb = data.shb_idb_epb_be

    sink = BytesIO()
    Writer(sink, shb=shb, idb=idb).writepkt(epb, ts=1442984653.210838)
    assert sink.getvalue() == reference
    sink.close()

    # same with timestamps defined inside EPB
    epb.ts_high = 335971
    epb.ts_low = 195806422

    sink = BytesIO()
    Writer(sink, shb=shb, idb=idb).writepkt(epb)
    assert sink.getvalue() == reference
    sink.close()
def test_multi_idb_writer():
    """Test writing multiple interface description blocks into pcapng and read it"""
    fobj = BytesIO()
    # the Writer only accepts blocks in host byte order, so pick the matching
    # set instead of assuming a little-endian host
    data = define_testdata()
    if sys.byteorder == 'little':
        shb, idb, epb = data.shb_idb_epb_le
    else:
        shb, idb, epb = data.shb_idb_epb_be

    writer = Writer(fobj, shb=shb, idb=[idb, idb])
    writer.writepkt(epb)
    fobj.flush()
    fobj.seek(0)

    Reader(fobj)
    fobj.close()
@pre_test
def test_writer_validate_instance():
    """A shb argument that is not a SectionHeaderBlock must be rejected"""
    shb = 10
    try:
        writer = Writer(fobj, shb=shb)  # noqa
    except ValueError as e:
        assert str(e) == 'shb: expecting class SectionHeaderBlock'
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('Writer accepted a shb of the wrong type')
@pre_test
def test_writepkt_epb_ts():
    """writepkt should assign ts_high/low for epb if they are 0"""
    global time
    # the Writer only accepts blocks in host byte order
    data = define_testdata()
    if sys.byteorder == 'little':
        shb, idb, epb = data.shb_idb_epb_le
    else:
        shb, idb, epb = data.shb_idb_epb_be
    writer = Writer(fobj, shb=shb, idb=idb)  # noqa
    epb.ts_high = epb.ts_low = 0
    ts = 1454725786.526401

    # replace the module-level time() with a fixed clock; the real one is
    # restored even if writepkt fails
    _tmp = time

    def time():
        return ts

    try:
        writer.writepkt(epb)
    finally:
        time = _tmp

    ts_high, ts_low = 338704, 3183502017
    assert epb.ts_high == ts_high
    assert epb.ts_low == ts_low
@pre_test
def test_writer_validate_le():
    """A big-endian shb must be rejected by a little-endian Writer"""
    shb = define_testdata().valid_shb_be

    _sysle = Writer._Writer__le
    Writer._Writer__le = True
    try:
        writer = Writer(fobj, shb=shb)  # noqa
    except ValueError as e:
        assert str(e) == 'shb: expecting class SectionHeaderBlockLE on a little-endian system'
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('Writer accepted a big-endian shb')
    finally:
        # restore the patched class attribute even when an assertion fails
        Writer._Writer__le = _sysle
@pre_test
def test_writer_validate_be():
    """A little-endian shb must be rejected by a big-endian Writer"""
    shb = define_testdata().valid_shb_le

    _sysle = Writer._Writer__le
    Writer._Writer__le = False
    try:
        writer = Writer(fobj, shb=shb)  # noqa
    except ValueError as e:
        assert str(e) == 'shb: expecting class SectionHeaderBlock on a big-endian system'
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('Writer accepted a little-endian shb')
    finally:
        # restore the patched class attribute even when an assertion fails
        Writer._Writer__le = _sysle
@WriterTestWrap()
def test_writepkt_no_time():
    """writepkt without ts must stamp the packet with the current time"""
    global time
    ts, pkt = 1454725786.526401, b'foooo'

    # replace the module-level time() with a fixed clock; the real one is
    # restored even if writepkt fails
    _tmp = time

    def time():
        return ts

    try:
        writer.writepkt(pkt)  # noqa
    finally:
        time = _tmp
    return [(ts, pkt)]
@WriterTestWrap(writer={'snaplen': 10})
def test_writepkt_snaplen():
    """A packet longer than the configured snaplen must be read back unchanged"""
    stamp = 1454725786.526401
    payload = b'foooo' * 100
    writer.writepkt(payload, stamp)  # noqa
    return [(stamp, payload)]
@WriterTestWrap()
def test_writepkt_with_time():
    """writepkt with an explicit ts must store that timestamp"""
    stamp = 1454725786.526401
    payload = b'foooo'
    writer.writepkt(payload, stamp)  # noqa
    return [(stamp, payload)]
@WriterTestWrap()
def test_writepkts():
    """writing multiple packets from a list"""
    timestamps = (1454725786.526401, 1454725787.526401, 3243204320.093211, 1454725789.526401)
    payloads = (b"fooo", b"barr", b"grill", b"lol")
    pkts = list(zip(timestamps, payloads))

    writer.writepkts(pkts)  # noqa
    return pkts
def test_pcapng_block_pack():
    """A default _PcapngBlock must pack to a non-empty buffer"""
    packed = bytes(_PcapngBlock())
    assert packed
def test_pcapng_block_unpack():
    """Unpacking a buffer shorter than the length it declares must raise NeedData"""
    block = _PcapngBlock()
    buf = b'012345678901'
    try:
        block.unpack(buf)
    except dpkt.NeedData:
        pass
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('unpack did not raise NeedData')
def test_epb_unpack():
    """EnhancedPacketBlock.unpack must raise NeedData when the buffer is
    shorter than the block length decoded from it"""
    shb, idb, epb = define_testdata().shb_idb_epb_be
    buf = b'quite-long-but-not-long-enough-at-least-32'
    try:
        epb.unpack(buf)
    except dpkt.NeedData:
        pass
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('unpack did not raise NeedData')
def test_epb_unpack_length_mismatch():
    """Zero the trailing length field of a packed epb; unpacking it must fail"""
    shb, idb, epb = define_testdata().shb_idb_epb_be
    unpackme = bytes(epb)
    unpackme = unpackme[:-4] + b'\x00' * 4
    try:
        epb.unpack(unpackme)
    except dpkt.UnpackError as e:
        assert str(e) == 'length fields do not match'
    else:
        # without this branch the test would pass when nothing is raised
        raise AssertionError('unpack accepted mismatching length fields')
def test_pcapng_block_len_no_opts():
    """_PcapngBlock should return its own header __len__ if it has no opts"""
    size = len(_PcapngBlock())
    assert size == 12
def test_reader_file_descriptor():
    """Reader.fd and Reader.fileno() must both report the descriptor of the file passed in"""
    import tempfile

    capture = define_testdata().valid_pcapng
    # a real file is needed here: it has a file descriptor number
    with tempfile.TemporaryFile() as real_file:
        real_file.write(capture)
        real_file.seek(0)

        reader = Reader(real_file)
        expected_fd = real_file.fileno()
        assert reader.fd == expected_fd
        assert reader.fileno() == real_file.fileno()
def test_posttest():
    """Check that PostTest wrapper doesn't fail silently"""
    @PostTest()
    @pre_test
    def fun():
        pass

    try:
        fun()
    except Exception as e:
        assert str(e) == 'No test type specified'
    else:
        # without this branch the test itself would fail silently
        raise AssertionError('PostTest without a test type did not raise')