blob: 0c054ce201af4110fa92df771d14ceaf9ac61f69 [file] [log] [blame]
# $Id: gre.py 75 2010-08-03 14:42:19Z jon.oberheide $
# -*- coding: utf-8 -*-
"""Generic Routing Encapsulation."""
from __future__ import absolute_import
import struct
import codecs
from . import dpkt
from . import ethernet
from .compat import compat_izip
GRE_CP = 0x8000 # Checksum Present
GRE_RP = 0x4000 # Routing Present
GRE_KP = 0x2000 # Key Present
GRE_SP = 0x1000 # Sequence Present
GRE_SS = 0x0800 # Strict Source Route
GRE_AP = 0x0080 # Acknowledgment Present
GRE_opt_fields = (
(GRE_CP | GRE_RP, 'sum', 'H'), (GRE_CP | GRE_RP, 'off', 'H'),
(GRE_KP, 'key', 'I'), (GRE_SP, 'seq', 'I'), (GRE_AP, 'ack', 'I')
)
class GRE(dpkt.Packet):
"""Generic Routing Encapsulation.
TODO: Longer class information....
Attributes:
__hdr__: Header fields of GRE.
TODO.
"""
__hdr__ = (
('flags', 'H', 0),
('p', 'H', 0x0800), # ETH_TYPE_IP
)
sre = ()
@property
def v(self):
return self.flags & 0x7
@v.setter
def v(self, v):
self.flags = (self.flags & ~0x7) | (v & 0x7)
@property
def recur(self):
return (self.flags >> 5) & 0x7
@recur.setter
def recur(self, v):
self.flags = (self.flags & ~0xe0) | ((v & 0x7) << 5)
class SRE(dpkt.Packet):
__hdr__ = [
('family', 'H', 0),
('off', 'B', 0),
('len', 'B', 0)
]
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = self.data[:self.len]
def opt_fields_fmts(self):
if self.v == 0:
fields, fmts = [], []
opt_fields = GRE_opt_fields
else:
fields, fmts = ['len', 'callid'], ['H', 'H']
opt_fields = GRE_opt_fields[-2:]
for flags, field, fmt in opt_fields:
if self.flags & flags:
fields.append(field)
fmts.append(fmt)
return fields, fmts
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
fields, fmts = self.opt_fields_fmts()
if fields:
fmt = ''.join(fmts)
fmtlen = struct.calcsize(fmt)
vals = struct.unpack("!" + fmt, self.data[:fmtlen])
self.data = self.data[fmtlen:]
self.__dict__.update(dict(compat_izip(fields, vals)))
if self.flags & GRE_RP:
l_ = []
while True:
sre = self.SRE(self.data)
self.data = self.data[len(sre):]
l_.append(sre)
if not sre.len:
break
self.sre = l_
try:
self.data = ethernet.Ethernet._typesw[self.p](self.data)
setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, dpkt.UnpackError):
# data alrady set
pass
def __len__(self):
opt_fmtlen = struct.calcsize(''.join(self.opt_fields_fmts()[1]))
return self.__hdr_len__ + opt_fmtlen + sum(map(len, self.sre)) + len(self.data)
def __bytes__(self):
fields, fmts = self.opt_fields_fmts()
if fields:
vals = []
for f in fields:
vals.append(getattr(self, f))
opt_s = struct.pack('!' + ''.join(fmts), *vals)
else:
opt_s = b''
return self.pack_hdr() + opt_s + b''.join(map(bytes, self.sre)) + bytes(self.data)
def test_gre_v1():
# Runs all the test associated with this class/file
s = codecs.decode("3081880a0067178000068fb100083a76", 'hex') + b"A" * 103
g = GRE(s)
assert g.v == 1
assert g.p == 0x880a
assert g.seq == 430001
assert g.ack == 539254
assert g.callid == 6016
assert g.len == 103
assert g.data == b"A" * 103
assert len(g) == len(s)
s = codecs.decode("3001880a00b2001100083ab8", 'hex') + b"A" * 178
g = GRE(s)
assert g.v == 1
assert g.p == 0x880a
assert g.seq == 539320
assert g.callid == 17
assert g.len == 178
assert g.data == b"A" * 178
assert len(g) == len(s)
def test_gre_len():
from binascii import unhexlify
gre = GRE()
assert len(gre) == 4
buf = unhexlify("3081880a0067178000068fb100083a76") + b"\x41" * 103
gre = GRE(buf)
assert bytes(gre) == buf
assert len(gre) == len(buf)
def test_gre_accessors():
gre = GRE()
for attr in ['v', 'recur']:
print(attr)
assert hasattr(gre, attr)
assert getattr(gre, attr) == 0
setattr(gre, attr, 1)
assert getattr(gre, attr) == 1
def test_sre_creation():
from binascii import unhexlify
buf = unhexlify(
'0000' # family
'00' # off
'02' # len
'ffff'
)
sre = GRE.SRE(buf)
assert sre.data == b'\xff\xff'
assert len(sre) == 6
assert bytes(sre) == buf
def test_gre_nested_sre():
from binascii import unhexlify
buf = unhexlify(
'4000' # flags (GRE_RP)
'0800' # p (ETH_TYPE_IP)
'0001' # sum
'0002' # off
# SRE entry
'0003' # family
'04' # off
'02' # len
'ffff'
# SRE entry (no len => last element)
'0006' # family
'00' # off
'00' # len
)
gre = GRE(buf)
assert hasattr(gre, 'sre')
assert isinstance(gre.sre, list)
assert len(gre.sre) == 2
assert len(gre) == len(buf)
assert bytes(gre) == buf
assert gre.data == b''
def test_gre_next_layer():
from binascii import unhexlify
from . import ipx
buf = unhexlify(
'0000' # flags (NONE)
'8137' # p (ETH_TYPE_IPX)
# IPX packet
'0000' # sum
'0001' # len
'02' # tc
'03' # pt
'0102030405060708090a0b0c' # dst
'c0b0a0908070605040302010' # src
)
gre = GRE(buf)
assert hasattr(gre, 'ipx')
assert isinstance(gre.data, ipx.IPX)
assert gre.data.tc == 2
assert gre.data.src == unhexlify('c0b0a0908070605040302010')
assert gre.data.dst == unhexlify('0102030405060708090a0b0c')
assert len(gre) == len(buf)
assert bytes(gre) == buf