blob: 352dfa0cd2d73d908a9e69905967464ef88ad9d5 [file] [log] [blame]
# $Id: bgp.py 76 2011-01-06 15:51:30Z dugsong $
# -*- coding: utf-8 -*-
"""Border Gateway Protocol."""
from __future__ import print_function
from __future__ import absolute_import
import struct
import socket
from . import dpkt
from .compat import compat_ord
# Border Gateway Protocol 4 - RFC 4271
# Communities Attribute - RFC 1997
# Capabilities - RFC 3392
# Route Refresh - RFC 2918
# Route Reflection - RFC 4456
# Confederations - RFC 3065
# Cease Subcodes - RFC 4486
# NOPEER Community - RFC 3765
# Multiprotocol Extensions - 2858
# Advertisement of Multiple Paths in BGP - RFC 7911
# BGP Support for Four-Octet Autonomous System (AS) Number Spac - RFC 6793
# Message Types
OPEN = 1
UPDATE = 2
NOTIFICATION = 3
KEEPALIVE = 4
ROUTE_REFRESH = 5
# Attribute Types
ORIGIN = 1
AS_PATH = 2
NEXT_HOP = 3
MULTI_EXIT_DISC = 4
LOCAL_PREF = 5
ATOMIC_AGGREGATE = 6
AGGREGATOR = 7
COMMUNITIES = 8
ORIGINATOR_ID = 9
CLUSTER_LIST = 10
MP_REACH_NLRI = 14
MP_UNREACH_NLRI = 15
# Origin Types
ORIGIN_IGP = 0
ORIGIN_EGP = 1
INCOMPLETE = 2
# AS Path Types
AS_SET = 1
AS_SEQUENCE = 2
AS_CONFED_SEQUENCE = 3
AS_CONFED_SET = 4
# Reserved Communities Types
NO_EXPORT = 0xffffff01
NO_ADVERTISE = 0xffffff02
NO_EXPORT_SUBCONFED = 0xffffff03
NO_PEER = 0xffffff04
# Common AFI types
AFI_IPV4 = 1
AFI_IPV6 = 2
AFI_L2VPN = 25
# Multiprotocol SAFI types
SAFI_UNICAST = 1
SAFI_MULTICAST = 2
SAFI_UNICAST_MULTICAST = 3
SAFI_EVPN = 70
# OPEN Message Optional Parameters
AUTHENTICATION = 1
CAPABILITY = 2
# Capability Types
CAP_MULTIPROTOCOL = 1
CAP_ROUTE_REFRESH = 2
# NOTIFICATION Error Codes
MESSAGE_HEADER_ERROR = 1
OPEN_MESSAGE_ERROR = 2
UPDATE_MESSAGE_ERROR = 3
HOLD_TIMER_EXPIRED = 4
FSM_ERROR = 5
CEASE = 6
# Message Header Error Subcodes
CONNECTION_NOT_SYNCHRONIZED = 1
BAD_MESSAGE_LENGTH = 2
BAD_MESSAGE_TYPE = 3
# OPEN Message Error Subcodes
UNSUPPORTED_VERSION_NUMBER = 1
BAD_PEER_AS = 2
BAD_BGP_IDENTIFIER = 3
UNSUPPORTED_OPTIONAL_PARAMETER = 4
AUTHENTICATION_FAILURE = 5
UNACCEPTABLE_HOLD_TIME = 6
UNSUPPORTED_CAPABILITY = 7
# UPDATE Message Error Subcodes
MALFORMED_ATTRIBUTE_LIST = 1
UNRECOGNIZED_ATTRIBUTE = 2
MISSING_ATTRIBUTE = 3
ATTRIBUTE_FLAGS_ERROR = 4
ATTRIBUTE_LENGTH_ERROR = 5
INVALID_ORIGIN_ATTRIBUTE = 6
AS_ROUTING_LOOP = 7
INVALID_NEXT_HOP_ATTRIBUTE = 8
OPTIONAL_ATTRIBUTE_ERROR = 9
INVALID_NETWORK_FIELD = 10
MALFORMED_AS_PATH = 11
# Cease Error Subcodes
MAX_NUMBER_OF_PREFIXES_REACHED = 1
ADMINISTRATIVE_SHUTDOWN = 2
PEER_DECONFIGURED = 3
ADMINISTRATIVE_RESET = 4
CONNECTION_REJECTED = 5
OTHER_CONFIGURATION_CHANGE = 6
CONNECTION_COLLISION_RESOLUTION = 7
OUT_OF_RESOURCES = 8
class BGP(dpkt.Packet):
"""Border Gateway Protocol.
BGP is an inter-AS routing protocol.
See more about the BGP on
https://en.wikipedia.org/wiki/Border_Gateway_Protocol
Attributes:
__hdr__: Header fields of BGP.
#TODO
"""
__hdr__ = (
('marker', '16s', '\xff' * 16),
('len', 'H', 0),
('type', 'B', OPEN)
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = self.data[:self.len - self.__hdr_len__]
if self.type == OPEN:
self.data = self.open = self.Open(self.data)
elif self.type == UPDATE:
self.data = self.update = self.Update(self.data)
elif self.type == NOTIFICATION:
self.data = self.notification = self.Notification(self.data)
elif self.type == KEEPALIVE:
self.data = self.keepalive = self.Keepalive(self.data)
elif self.type == ROUTE_REFRESH:
self.data = self.route_refresh = self.RouteRefresh(self.data)
class Open(dpkt.Packet):
__hdr__ = (
('v', 'B', 4),
('asn', 'H', 0),
('holdtime', 'H', 0),
('identifier', 'I', 0),
('param_len', 'B', 0)
)
__hdr_defaults__ = {
'parameters': []
}
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
l_ = []
plen = self.param_len
while plen > 0:
param = self.Parameter(self.data)
self.data = self.data[len(param):]
plen -= len(param)
l_.append(param)
self.data = self.parameters = l_
def __len__(self):
return self.__hdr_len__ + sum(map(len, self.parameters))
def __bytes__(self):
params = b''.join(map(bytes, self.parameters))
self.param_len = len(params)
return self.pack_hdr() + params
class Parameter(dpkt.Packet):
__hdr__ = (
('type', 'B', 0),
('len', 'B', 0)
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = self.data[:self.len]
if self.type == AUTHENTICATION:
self.data = self.authentication = self.Authentication(self.data)
elif self.type == CAPABILITY:
self.data = self.capability = self.Capability(self.data)
class Authentication(dpkt.Packet):
__hdr__ = (
('code', 'B', 0),
)
class Capability(dpkt.Packet):
__hdr__ = (
('code', 'B', 0),
('len', 'B', 0)
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = self.data[:self.len]
class Update(dpkt.Packet):
__hdr_defaults__ = {
'withdrawn': [],
'attributes': [],
'announced': []
}
def unpack(self, buf):
self.data = buf
# Withdrawn Routes
wlen = struct.unpack('>H', self.data[:2])[0]
self.data = self.data[2:]
l_ = []
while wlen > 0:
route = RouteIPV4(self.data)
self.data = self.data[len(route):]
wlen -= len(route)
l_.append(route)
self.withdrawn = l_
# Path Attributes
plen = struct.unpack('>H', self.data[:2])[0]
self.data = self.data[2:]
l_ = []
while plen > 0:
attr = self.Attribute(self.data)
self.data = self.data[len(attr):]
plen -= len(attr)
l_.append(attr)
self.attributes = l_
# Announced Routes
l_ = []
while self.data:
if len(self.data) % 9 == 0:
route = ExtendedRouteIPV4(self.data)
else:
route = RouteIPV4(self.data)
self.data = self.data[len(route):]
l_.append(route)
self.announced = l_
def __len__(self):
return 2 + sum(map(len, self.withdrawn)) + \
2 + sum(map(len, self.attributes)) + \
sum(map(len, self.announced))
def __bytes__(self):
return struct.pack('>H', sum(map(len, self.withdrawn))) + \
b''.join(map(bytes, self.withdrawn)) + \
struct.pack('>H', sum(map(len, self.attributes))) + \
b''.join(map(bytes, self.attributes)) + \
b''.join(map(bytes, self.announced))
class Attribute(dpkt.Packet):
__hdr__ = (
('flags', 'B', 0),
('type', 'B', 0)
)
@property
def optional(self):
return (self.flags >> 7) & 0x1
@optional.setter
def optional(self, o):
self.flags = (self.flags & ~0x80) | ((o & 0x1) << 7)
@property
def transitive(self):
return (self.flags >> 6) & 0x1
@transitive.setter
def transitive(self, t):
self.flags = (self.flags & ~0x40) | ((t & 0x1) << 6)
@property
def partial(self):
return (self.flags >> 5) & 0x1
@partial.setter
def partial(self, p):
self.flags = (self.flags & ~0x20) | ((p & 0x1) << 5)
@property
def extended_length(self):
return (self.flags >> 4) & 0x1
@extended_length.setter
def extended_length(self, e):
self.flags = (self.flags & ~0x10) | ((e & 0x1) << 4)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if self.extended_length:
self.len = struct.unpack('>H', self.data[:2])[0]
self.data = self.data[2:]
else:
self.len = struct.unpack('B', self.data[:1])[0]
self.data = self.data[1:]
self.data = self.data[:self.len]
if self.type == ORIGIN:
self.data = self.origin = self.Origin(self.data)
elif self.type == AS_PATH:
self.data = self.as_path = self.ASPath(self.data)
elif self.type == NEXT_HOP:
self.data = self.next_hop = self.NextHop(self.data)
elif self.type == MULTI_EXIT_DISC:
self.data = self.multi_exit_disc = self.MultiExitDisc(self.data)
elif self.type == LOCAL_PREF:
self.data = self.local_pref = self.LocalPref(self.data)
elif self.type == ATOMIC_AGGREGATE:
self.data = self.atomic_aggregate = self.AtomicAggregate(self.data)
elif self.type == AGGREGATOR:
self.data = self.aggregator = self.Aggregator(self.data)
elif self.type == COMMUNITIES:
self.data = self.communities = self.Communities(self.data)
elif self.type == ORIGINATOR_ID:
self.data = self.originator_id = self.OriginatorID(self.data)
elif self.type == CLUSTER_LIST:
self.data = self.cluster_list = self.ClusterList(self.data)
elif self.type == MP_REACH_NLRI:
self.data = self.mp_reach_nlri = self.MPReachNLRI(self.data)
elif self.type == MP_UNREACH_NLRI:
self.data = self.mp_unreach_nlri = self.MPUnreachNLRI(self.data)
def __len__(self):
if self.extended_length:
attr_len = 2
else:
attr_len = 1
return self.__hdr_len__ + attr_len + len(self.data)
def __bytes__(self):
if self.extended_length:
attr_len_str = struct.pack('>H', self.len)
else:
attr_len_str = struct.pack('B', self.len)
return self.pack_hdr() + attr_len_str + bytes(self.data)
class Origin(dpkt.Packet):
__hdr__ = (
('type', 'B', ORIGIN_IGP),
)
class ASPath(dpkt.Packet):
__hdr_defaults__ = {
'segments': []
}
def unpack(self, buf):
self.data = buf
l_ = []
as4 = len(self.data) == 6
while self.data:
if as4:
seg = self.ASPathSegment4(self.data)
else:
seg = self.ASPathSegment(self.data)
self.data = self.data[len(seg):]
l_.append(seg)
self.data = self.segments = l_
def __len__(self):
return sum(map(len, self.data))
def __bytes__(self):
return b''.join(map(bytes, self.data))
class ASPathSegment(dpkt.Packet):
__hdr__ = (
('type', 'B', 0),
('len', 'B', 0)
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
l_ = []
for i in range(self.len):
AS = struct.unpack('>H', self.data[:2])[0]
self.data = self.data[2:]
l_.append(AS)
self.data = self.path = l_
def __len__(self):
return self.__hdr_len__ + 2 * len(self.path)
def __bytes__(self):
as_str = b''
for AS in self.path:
as_str += struct.pack('>H', AS)
return self.pack_hdr() + as_str
class ASPathSegment4(dpkt.Packet):
__hdr__ = (
('type', 'B', 0),
('len', 'B', 0)
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
l_ = []
for i in range(self.len):
if len(self.data) >= 4:
AS = struct.unpack('>I', self.data[:4])[0]
self.data = self.data[4:]
l_.append(AS)
self.path = l_
def __len__(self):
return self.__hdr_len__ + 4 * len(self.path)
def __bytes__(self):
as_str = b''
for AS in self.path:
as_str += struct.pack('>I', AS)
return self.pack_hdr() + as_str
class NextHop(dpkt.Packet):
__hdr__ = (
('ip', 'I', 0),
)
class MultiExitDisc(dpkt.Packet):
__hdr__ = (
('value', 'I', 0),
)
class LocalPref(dpkt.Packet):
__hdr__ = (
('value', 'I', 0),
)
class AtomicAggregate(dpkt.Packet):
def unpack(self, buf):
pass
def __len__(self):
return 0
def __bytes__(self):
return b''
class Aggregator(dpkt.Packet):
__hdr__ = (
('asn', 'H', 0),
('ip', 'I', 0)
)
class Communities(dpkt.Packet):
__hdr_defaults__ = {
'list': []
}
def unpack(self, buf):
self.data = buf
l_ = []
while self.data:
val = struct.unpack('>I', self.data[:4])[0]
if (0x00000000 <= val <= 0x0000ffff) or (0xffff0000 <= val <= 0xffffffff):
comm = self.ReservedCommunity(self.data[:4])
else:
comm = self.Community(self.data[:4])
self.data = self.data[len(comm):]
l_.append(comm)
self.data = self.list = l_
def __len__(self):
return sum(map(len, self.data))
def __bytes__(self):
return b''.join(map(bytes, self.data))
class Community(dpkt.Packet):
__hdr__ = (
('asn', 'H', 0),
('value', 'H', 0)
)
class ReservedCommunity(dpkt.Packet):
__hdr__ = (
('value', 'I', 0),
)
class OriginatorID(dpkt.Packet):
__hdr__ = (
('value', 'I', 0),
)
class ClusterList(dpkt.Packet):
__hdr_defaults__ = {
'list': []
}
def unpack(self, buf):
self.data = buf
l_ = []
while self.data:
id = struct.unpack('>I', self.data[:4])[0]
self.data = self.data[4:]
l_.append(id)
self.data = self.list = l_
def __len__(self):
return 4 * len(self.list)
def __bytes__(self):
cluster_str = b''
for val in self.list:
cluster_str += struct.pack('>I', val)
return cluster_str
class MPReachNLRI(dpkt.Packet):
__hdr__ = (
('afi', 'H', AFI_IPV4),
('safi', 'B', SAFI_UNICAST),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
# Next Hop
hop_len = 4
if self.afi == AFI_IPV6:
hop_len = 16
l_ = []
nlen = struct.unpack('B', self.data[:1])[0]
self.data = self.data[1:]
# next_hop is kept for backward compatibility
self.next_hop = self.data[:nlen]
while nlen > 0:
hop = self.data[:hop_len]
l_.append(hop)
self.data = self.data[hop_len:]
nlen -= hop_len
self.next_hops = l_
# SNPAs
l_ = []
num_snpas = struct.unpack('B', self.data[:1])[0]
self.data = self.data[1:]
for i in range(num_snpas):
snpa = self.SNPA(self.data)
self.data = self.data[len(snpa):]
l_.append(snpa)
self.snpas = l_
if self.afi == AFI_IPV4:
Route = RouteIPV4
elif self.afi == AFI_IPV6:
Route = RouteIPV6
elif self.afi == AFI_L2VPN:
Route = RouteEVPN
else:
Route = RouteGeneric
# Announced Routes
l_ = []
while self.data:
route = Route(self.data)
self.data = self.data[len(route):]
l_.append(route)
self.data = self.announced = l_
def __len__(self):
return self.__hdr_len__ + \
1 + sum(map(len, self.next_hops)) + \
1 + sum(map(len, self.snpas)) + \
sum(map(len, self.announced))
def __bytes__(self):
return self.pack_hdr() + \
struct.pack('B', sum(map(len, self.next_hops))) + \
b''.join(map(bytes, self.next_hops)) + \
struct.pack('B', len(self.snpas)) + \
b''.join(map(bytes, self.snpas)) + \
b''.join(map(bytes, self.announced))
class SNPA(dpkt.Packet):
__hdr__ = (
('len', 'B', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = self.data[:(self.len + 1) // 2]
class MPUnreachNLRI(dpkt.Packet):
__hdr__ = (
('afi', 'H', AFI_IPV4),
('safi', 'B', SAFI_UNICAST),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if self.afi == AFI_IPV4:
Route = RouteIPV4
elif self.afi == AFI_IPV6:
Route = RouteIPV6
elif self.afi == AFI_L2VPN:
Route = RouteEVPN
else:
Route = RouteGeneric
# Withdrawn Routes
l_ = []
while self.data:
route = Route(self.data)
self.data = self.data[len(route):]
l_.append(route)
self.data = self.withdrawn = l_
def __len__(self):
return self.__hdr_len__ + sum(map(len, self.data))
def __bytes__(self):
return self.pack_hdr() + b''.join(map(bytes, self.data))
class Notification(dpkt.Packet):
__hdr__ = (
('code', 'B', 0),
('subcode', 'B', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.error = self.data
class Keepalive(dpkt.Packet):
def unpack(self, buf):
pass
def __len__(self):
return 0
def __bytes__(self):
return b''
class RouteRefresh(dpkt.Packet):
__hdr__ = (
('afi', 'H', AFI_IPV4),
('rsvd', 'B', 0),
('safi', 'B', SAFI_UNICAST)
)
class RouteGeneric(dpkt.Packet):
__hdr__ = (
('len', 'B', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = self.prefix = self.data[:(self.len + 7) // 8]
class RouteIPV4(dpkt.Packet):
__hdr__ = (
('len', 'B', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
tmp = self.data[:(self.len + 7) // 8]
tmp += (4 - len(tmp)) * b'\x00'
self.data = self.prefix = tmp
def __repr__(self):
cidr = '%s/%d' % (socket.inet_ntoa(self.prefix), self.len)
return '%s(%s)' % (self.__class__.__name__, cidr)
def __len__(self):
return self.__hdr_len__ + (self.len + 7) // 8
def __bytes__(self):
return self.pack_hdr() + self.prefix[:(self.len + 7) // 8]
class ExtendedRouteIPV4(RouteIPV4):
__hdr__ = (
('path_id', 'I', 0),
('len', 'B', 0),
)
def __repr__(self):
cidr = '%s/%d PathId %d' % (socket.inet_ntoa(self.prefix), self.len, self.path_id)
return '%s(%s)' % (self.__class__.__name__, cidr)
class RouteIPV6(dpkt.Packet):
__hdr__ = (
('len', 'B', 0),
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
tmp = self.data[:(self.len + 7) // 8]
tmp += (16 - len(tmp)) * b'\x00'
self.data = self.prefix = tmp
def __len__(self):
return self.__hdr_len__ + (self.len + 7) // 8
def __bytes__(self):
return self.pack_hdr() + self.prefix[:(self.len + 7) // 8]
class RouteEVPN(dpkt.Packet):
__hdr__ = (
('type', 'B', 0),
('len', 'B', 0)
)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.route_data = buf = self.data[:self.len]
self.data = self.data[self.len:]
# Get route distinguisher.
self.rd = buf[:8]
buf = buf[8:]
# Get route information. Not all fields are present on all route types.
if self.type != 0x3:
self.esi = buf[:10]
buf = buf[10:]
if self.type != 0x4:
self.eth_id = buf[:4]
buf = buf[4:]
if self.type == 0x2:
self.mac_address_length = compat_ord(buf[0])
if self.mac_address_length == 48:
self.mac_address = buf[1:7]
buf = buf[7:]
else:
self.mac_address = None
buf = buf[1:]
if self.type != 0x1:
self.ip_address_length = compat_ord(buf[0])
if self.ip_address_length == 128:
self.ip_address = buf[1:17]
buf = buf[17:]
elif self.ip_address_length == 32:
self.ip_address = buf[1:5]
buf = buf[5:]
else:
self.ip_address = None
buf = buf[1:]
if self.type in [0x1, 0x2]:
self.mpls_label_stack = buf[:3]
buf = buf[3:]
if self.len > len(buf):
self.mpls_label_stack += buf[:3]
def __len__(self):
return self.__hdr_len__ + self.len
def __bytes__(self):
return self.pack_hdr() + self.route_data
__bgp1 = b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x13\x04'
__bgp2 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x63\x02\x00\x00\x00\x48\x40\x01'
b'\x01\x00\x40\x02\x0a\x01\x02\x01\xf4\x01\xf4\x02\x01\xfe\xbb\x40\x03\x04\xc0\xa8\x00\x0f\x40\x05\x04'
b'\x00\x00\x00\x64\x40\x06\x00\xc0\x07\x06\xfe\xba\xc0\xa8\x00\x0a\xc0\x08\x0c\xfe\xbf\x00\x01\x03\x16'
b'\x00\x04\x01\x54\x00\xfa\x80\x09\x04\xc0\xa8\x00\x0f\x80\x0a\x04\xc0\xa8\x00\xfa\x16\xc0\xa8\x04'
)
__bgp3 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x79\x02\x00\x00\x00\x62\x40\x01'
b'\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x08\x00\x02\x01\x2c\x00\x00\x01\x2c\xc0\x80'
b'\x24\x00\x00\xfd\xe9\x40\x01\x01\x00\x40\x02\x04\x02\x01\x15\xb3\x40\x05\x04\x00\x00\x00\x2c\x80\x09'
b'\x04\x16\x05\x05\x05\x80\x0a\x04\x16\x05\x05\x05\x90\x0e\x00\x1e\x00\x01\x80\x0c\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x0c\x04\x04\x04\x00\x60\x18\x77\x01\x00\x00\x01\xf4\x00\x00\x01\xf4\x85'
)
__bgp4 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x2d\x01\x04\x00\xed\x00\x5a\xc6'
b'\x6e\x83\x7d\x10\x02\x06\x01\x04\x00\x01\x00\x01\x02\x02\x80\x00\x02\x02\x02\x00'
)
# BGP-EVPN type 1-4 packets for testing.
__bgp5 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x60\x02\x00\x00\x00\x49\x40\x01'
b'\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x10\x03\x0c\x00\x00\x00\x00\x00\x08\x00\x02'
b'\x03\xe8\x00\x00\x00\x02\x90\x0e\x00\x24\x00\x19\x46\x04\x01\x01\x01\x02\x00\x01\x19\x00\x01\x01\x01'
b'\x01\x02\x00\x02\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00\x00\x00\x00\x02\x00\x00\x02'
)
__bgp6 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x6f\x02\x00\x00\x00\x58\x40\x01'
b'\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x10\x03\x0c\x00\x00\x00\x00\x00\x08\x00\x02'
b'\x03\xe8\x00\x00\x00\x02\x90\x0e\x00\x33\x00\x19\x46\x04\x01\x01\x01\x02\x00\x02\x28\x00\x01\x01\x01'
b'\x01\x02\x00\x02\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00\x00\x00\x00\x02\x30\xcc\xaa\x02\x9c\xd8\x29'
b'\x20\xc0\xb4\x01\x02\x00\x00\x02\x00\x00\x00'
)
__bgp7 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x58\x02\x00\x00\x00\x41\x40\x01'
b'\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x10\x03\x0c\x00\x00\x00\x00\x00\x08\x00\x02'
b'\x03\xe8\x00\x00\x00\x02\x90\x0e\x00\x1c\x00\x19\x46\x04\x01\x01\x01\x02\x00\x03\x11\x00\x01\x01\x01'
b'\x01\x02\x00\x02\x00\x00\x00\x02\x20\xc0\xb4\x01\x02'
)
__bgp8 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x5f\x02\x00\x00\x00\x48\x40\x01'
b'\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x10\x03\x0c\x00\x00\x00\x00\x00\x08\x00\x02'
b'\x03\xe8\x00\x00\x00\x02\x90\x0e\x00\x23\x00\x19\x46\x04\x01\x01\x01\x02\x00\x04\x18\x00\x01\x01\x01'
b'\x01\x02\x00\x02\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00\x20\xc0\xb4\x01\x02'
)
__bgp9 = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x7b\x02\x00\x00\x00\x64\x40\x01'
b'\x01\x00\x40\x02\x00\x40\x05\x04\x00\x00\x00\x64\xc0\x10\x10\x03\x0c\x00\x00\x00\x00\x00\x08\x00\x02'
b'\x03\xe8\x00\x00\x00\x02\x90\x0e\x00\x3f\x00\x19\x46\x04\x01\x01\x01\x02\x00\x02\x34\x00\x01\x01\x01'
b'\x01\x02\x00\x02\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00\x00\x00\x00\x02\x30\xcc\xaa\x02\x9c\xd8\x29'
b'\x80\xc0\xb4\x01\x02\xc0\xb4\x01\x02\xc0\xb4\x01\x02\xc0\xb4\x01\x02\x00\x00\x02\x00\x00\x00'
)
def test_pack():
assert (__bgp1 == bytes(BGP(__bgp1)))
assert (__bgp2 == bytes(BGP(__bgp2)))
assert (__bgp3 == bytes(BGP(__bgp3)))
assert (__bgp4 == bytes(BGP(__bgp4)))
assert (__bgp5 == bytes(BGP(__bgp5)))
assert (__bgp6 == bytes(BGP(__bgp6)))
assert (__bgp7 == bytes(BGP(__bgp7)))
assert (__bgp8 == bytes(BGP(__bgp8)))
assert (__bgp9 == bytes(BGP(__bgp9)))
def test_unpack():
b1 = BGP(__bgp1)
assert (b1.len == 19)
assert (b1.type == KEEPALIVE)
assert (b1.keepalive is not None)
b2 = BGP(__bgp2)
assert (b2.type == UPDATE)
assert (len(b2.update.withdrawn) == 0)
assert (len(b2.update.announced) == 1)
assert (len(b2.update.attributes) == 9)
a = b2.update.attributes[1]
assert (a.type == AS_PATH)
assert (a.len == 10)
assert (len(a.as_path.segments) == 2)
s = a.as_path.segments[0]
assert (s.type == AS_SET)
assert (s.len == 2)
assert (len(s.path) == 2)
assert (s.path[0] == 500)
a = b2.update.attributes[6]
assert (a.type == COMMUNITIES)
assert (a.len == 12)
assert (len(a.communities.list) == 3)
c = a.communities.list[0]
assert (c.asn == 65215)
assert (c.value == 1)
r = b2.update.announced[0]
assert (r.len == 22)
assert (r.prefix == b'\xc0\xa8\x04\x00')
b3 = BGP(__bgp3)
assert (b3.type == UPDATE)
assert (len(b3.update.withdrawn) == 0)
assert (len(b3.update.announced) == 0)
assert (len(b3.update.attributes) == 6)
a = b3.update.attributes[0]
assert (not a.optional)
assert (a.transitive)
assert (not a.partial)
assert (not a.extended_length)
assert (a.type == ORIGIN)
assert (a.len == 1)
o = a.origin
assert (o.type == ORIGIN_IGP)
a = b3.update.attributes[5]
assert (a.optional)
assert (not a.transitive)
assert (not a.partial)
assert (a.extended_length)
assert (a.type == MP_REACH_NLRI)
assert (a.len == 30)
m = a.mp_reach_nlri
assert (m.afi == AFI_IPV4)
assert (len(m.snpas) == 0)
assert (len(m.announced) == 1)
p = m.announced[0]
assert (p.len == 96)
b4 = BGP(__bgp4)
assert (b4.len == 45)
assert (b4.type == OPEN)
assert (b4.open.asn == 237)
assert (b4.open.param_len == 16)
assert (len(b4.open.parameters) == 3)
p = b4.open.parameters[0]
assert (p.type == CAPABILITY)
assert (p.len == 6)
c = p.capability
assert (c.code == CAP_MULTIPROTOCOL)
assert (c.len == 4)
assert (c.data == b'\x00\x01\x00\x01')
c = b4.open.parameters[2].capability
assert (c.code == CAP_ROUTE_REFRESH)
assert (c.len == 0)
b5 = BGP(__bgp5)
assert (b5.len == 96)
assert (b5.type == UPDATE)
assert (len(b5.update.withdrawn) == 0)
a = b5.update.attributes[-1]
assert (a.type == MP_REACH_NLRI)
assert (a.len == 36)
m = a.mp_reach_nlri
assert (m.afi == AFI_L2VPN)
assert (m.safi == SAFI_EVPN)
r = m.announced[0]
assert (r.type == 1)
assert (r.len == 25)
assert (r.rd == b'\x00\x01\x01\x01\x01\x02\x00\x02')
assert (r.esi == b'\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00')
assert (r.eth_id == b'\x00\x00\x00\x02')
assert (r.mpls_label_stack == b'\x00\x00\x02')
b6 = BGP(__bgp6)
assert (b6.len == 111)
assert (b6.type == UPDATE)
assert (len(b6.update.withdrawn) == 0)
a = b6.update.attributes[-1]
assert (a.type == MP_REACH_NLRI)
assert (a.len == 51)
m = a.mp_reach_nlri
assert (m.afi == AFI_L2VPN)
assert (m.safi == SAFI_EVPN)
r = m.announced[0]
assert (r.type == 2)
assert (r.len == 40)
assert (r.rd == b'\x00\x01\x01\x01\x01\x02\x00\x02')
assert (r.esi == b'\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00')
assert (r.eth_id == b'\x00\x00\x00\x02')
assert (r.mac_address_length == 48)
assert (r.mac_address == b'\xcc\xaa\x02\x9c\xd8\x29')
assert (r.ip_address_length == 32)
assert (r.ip_address == b'\xc0\xb4\x01\x02')
assert (r.mpls_label_stack == b'\x00\x00\x02\x00\x00\x00')
b7 = BGP(__bgp7)
assert (b7.len == 88)
assert (b7.type == UPDATE)
assert (len(b7.update.withdrawn) == 0)
a = b7.update.attributes[-1]
assert (a.type == MP_REACH_NLRI)
assert (a.len == 28)
m = a.mp_reach_nlri
assert (m.afi == AFI_L2VPN)
assert (m.safi == SAFI_EVPN)
r = m.announced[0]
assert (r.type == 3)
assert (r.len == 17)
assert (r.rd == b'\x00\x01\x01\x01\x01\x02\x00\x02')
assert (r.eth_id == b'\x00\x00\x00\x02')
assert (r.ip_address_length == 32)
assert (r.ip_address == b'\xc0\xb4\x01\x02')
b8 = BGP(__bgp8)
assert (b8.len == 95)
assert (b8.type == UPDATE)
assert (len(b8.update.withdrawn) == 0)
a = b8.update.attributes[-1]
assert (a.type == MP_REACH_NLRI)
assert (a.len == 35)
m = a.mp_reach_nlri
assert (m.afi == AFI_L2VPN)
assert (m.safi == SAFI_EVPN)
r = m.announced[0]
assert (r.type == 4)
assert (r.len == 24)
assert (r.rd == b'\x00\x01\x01\x01\x01\x02\x00\x02')
assert (r.esi == b'\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00')
assert (r.ip_address_length == 32)
assert (r.ip_address == b'\xc0\xb4\x01\x02')
b9 = BGP(__bgp9)
assert (b9.len == 123)
assert (b9.type == UPDATE)
assert (len(b9.update.withdrawn) == 0)
a = b9.update.attributes[-1]
assert (a.type == MP_REACH_NLRI)
assert (a.len == 63)
m = a.mp_reach_nlri
assert (m.afi == AFI_L2VPN)
assert (m.safi == SAFI_EVPN)
r = m.announced[0]
assert (r.type == 2)
assert (r.len == 52)
assert (r.rd == b'\x00\x01\x01\x01\x01\x02\x00\x02')
assert (r.esi == b'\x05\x00\x00\x03\xe8\x00\x00\x04\x00\x00')
assert (r.eth_id == b'\x00\x00\x00\x02')
assert (r.mac_address_length == 48)
assert (r.mac_address == b'\xcc\xaa\x02\x9c\xd8\x29')
assert (r.ip_address_length == 128)
assert (r.ip_address == b'\xc0\xb4\x01\x02\xc0\xb4\x01\x02\xc0\xb4\x01\x02\xc0\xb4\x01\x02')
assert (r.mpls_label_stack == b'\x00\x00\x02\x00\x00\x00')
def test_bgp_mp_nlri_20_1_mp_reach_nlri_next_hop():
# test for https://github.com/kbandla/dpkt/issues/485
__bgp = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x6c\x02\x00\x00\x00\x55\x40\x01'
b'\x01\x00\x40\x02\x04\x02\x01\xfd\xe9\x80\x04\x04\x00\x00\x00\x00\x80\x0e\x40\x00\x02\x01\x20\x20\x01'
b'\x0d\xb8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xfe\x80\x00\x00\x00\x00\x00\x00\xc0\x01\x0b'
b'\xff\xfe\x7e\x00\x00\x00\x40\x20\x01\x0d\xb8\x00\x01\x00\x02\x40\x20\x01\x0d\xb8\x00\x01\x00\x01\x40'
b'\x20\x01\x0d\xb8\x00\x01\x00\x00'
)
assert (__bgp == bytes(BGP(__bgp)))
bgp = BGP(__bgp)
assert (len(bgp.data) == 89)
assert (bgp.type == UPDATE)
assert (len(bgp.update.withdrawn) == 0)
assert (len(bgp.update.announced) == 0)
assert (len(bgp.update.attributes) == 4)
attribute = bgp.update.attributes[0]
assert (attribute.type == ORIGIN)
assert (not attribute.optional)
assert (attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.len == 1)
o = attribute.origin
assert (o.type == ORIGIN_IGP)
attribute = bgp.update.attributes[1]
assert (attribute.type == AS_PATH)
assert (not attribute.optional)
assert (attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 64)
assert (attribute.len == 4)
assert (len(attribute.as_path.segments) == 1)
segment = attribute.as_path.segments[0]
assert (segment.type == AS_SEQUENCE)
assert (segment.len == 1)
assert (len(segment.path) == 1)
assert (segment.path[0] == 65001)
attribute = bgp.update.attributes[2]
assert (attribute.type == MULTI_EXIT_DISC)
assert (attribute.optional)
assert (not attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x80)
assert (attribute.len == 4)
assert (attribute.multi_exit_disc.value == 0)
attribute = bgp.update.attributes[3]
assert (attribute.type == MP_REACH_NLRI)
assert (attribute.optional)
assert (not attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x80)
assert (attribute.len == 64)
mp_reach_nlri = attribute.mp_reach_nlri
assert (mp_reach_nlri.afi == AFI_IPV6)
assert (mp_reach_nlri.safi == SAFI_UNICAST)
assert (len(mp_reach_nlri.snpas) == 0)
assert (len(mp_reach_nlri.announced) == 3)
prefix = mp_reach_nlri.announced[0]
assert (socket.inet_ntop(socket.AF_INET6, prefix.prefix) == '2001:db8:1:2::')
assert (prefix.len == 64)
prefix = mp_reach_nlri.announced[1]
assert (socket.inet_ntop(socket.AF_INET6, prefix.prefix) == '2001:db8:1:1::')
assert (prefix.len == 64)
prefix = mp_reach_nlri.announced[2]
assert (socket.inet_ntop(socket.AF_INET6, prefix.prefix) == '2001:db8:1::')
assert (prefix.len == 64)
assert (len(mp_reach_nlri.next_hops) == 2)
assert (socket.inet_ntop(socket.AF_INET6, mp_reach_nlri.next_hops[0]) == '2001:db8::1')
assert (socket.inet_ntop(socket.AF_INET6, mp_reach_nlri.next_hops[1]) == 'fe80::c001:bff:fe7e:0')
assert (mp_reach_nlri.next_hop == b''.join(mp_reach_nlri.next_hops))
def test_bgp_add_path_6_1_as_path():
# test for https://github.com/kbandla/dpkt/issues/481
# Error processing BGP data: packet 6 : message 1 of bgp-add-path.cap
# https://packetlife.net/media/captures/bgp-add-path.cap
__bgp = (
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x59\x02\x00\x00\x00\x30\x40\x01'
b'\x01\x00\x40\x02\x06\x02\x01\x00\x00\xfb\xff\x40\x03\x04\x0a\x00\x0e\x01\x80\x04\x04\x00\x00\x00\x00'
b'\x40\x05\x04\x00\x00\x00\x64\x80\x0a\x04\x0a\x00\x22\x04\x80\x09\x04\x0a\x00\x0f\x01\x00\x00\x00\x01'
b'\x20\x05\x05\x05\x05\x00\x00\x00\x01\x20\xc0\xa8\x01\x05'
)
bgp = BGP(__bgp)
assert (__bgp == bytes(bgp))
assert (len(bgp) == 89)
assert (bgp.type == UPDATE)
assert (len(bgp.update.withdrawn) == 0)
announced = bgp.update.announced
assert (len(announced) == 2)
assert (announced[0].len == 32)
assert (announced[0].path_id == 1)
assert (socket.inet_ntop(socket.AF_INET, bytes(announced[0].prefix)) == '5.5.5.5')
assert (announced[1].len == 32)
assert (announced[1].path_id == 1)
assert (socket.inet_ntop(socket.AF_INET, bytes(announced[1].prefix)) == '192.168.1.5')
assert (len(bgp.update.attributes) == 7)
attribute = bgp.update.attributes[0]
assert (attribute.type == ORIGIN)
assert (not attribute.optional)
assert (attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x40)
assert (attribute.len == 1)
assert (attribute.origin.type == ORIGIN_IGP)
attribute = bgp.update.attributes[1]
assert (attribute.type == AS_PATH)
assert (not attribute.optional)
assert (attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x40)
assert (attribute.len == 6)
assert (len(attribute.as_path.segments) == 1)
segment = attribute.as_path.segments[0]
assert (segment.type == AS_SEQUENCE)
assert (segment.len == 1)
assert (len(segment.path) == 1)
assert (segment.path[0] == 64511)
attribute = bgp.update.attributes[2]
assert (attribute.type == NEXT_HOP)
assert (not attribute.optional)
assert (attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x40)
assert (attribute.len == 4)
assert (socket.inet_ntop(socket.AF_INET, bytes(attribute.next_hop)) == '10.0.14.1')
attribute = bgp.update.attributes[3]
assert (attribute.type == MULTI_EXIT_DISC)
assert (attribute.optional)
assert (not attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x80)
assert (attribute.len == 4)
assert (attribute.multi_exit_disc.value == 0)
attribute = bgp.update.attributes[4]
assert (attribute.type == LOCAL_PREF)
assert (not attribute.optional)
assert (attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x40)
assert (attribute.len == 4)
assert (attribute.local_pref.value == 100)
attribute = bgp.update.attributes[5]
assert (attribute.type == CLUSTER_LIST)
assert (attribute.optional)
assert (not attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x80)
assert (attribute.len == 4)
assert (socket.inet_ntop(socket.AF_INET, bytes(attribute.cluster_list)) == '10.0.34.4')
attribute = bgp.update.attributes[6]
assert (attribute.type == ORIGINATOR_ID)
assert (attribute.optional)
assert (not attribute.transitive)
assert (not attribute.partial)
assert (not attribute.extended_length)
assert (attribute.flags == 0x80)
assert (attribute.len == 4)
assert (socket.inet_ntop(socket.AF_INET, bytes(attribute.originator_id)) == '10.0.15.1')
def test_attribute_accessors():
from binascii import unhexlify
buf = unhexlify(
'00' # flags
'01' # type (ORIGIN)
'01' # length
'00' # Origin type
)
attribute = BGP.Update.Attribute(buf)
assert isinstance(attribute.data, BGP.Update.Attribute.Origin)
for attr in ['optional', 'transitive', 'partial', 'extended_length']:
assert getattr(attribute, attr) == 0
# check we can set..
setattr(attribute, attr, 1)
assert getattr(attribute, attr) == 1
# and also unset
setattr(attribute, attr, 0)
assert getattr(attribute, attr) == 0
def test_snpa():
from binascii import unhexlify
buf = unhexlify(
'04' # len (in semi-octets)
'1234' # data
)
snpa = BGP.Update.Attribute.MPReachNLRI.SNPA(buf)
assert snpa.len == 4 # length of the data in semi-octets
assert len(snpa) == 3 # length of the snpa in bytes (including header)
assert bytes(snpa) == buf
def test_mpreachnlri():
from binascii import unhexlify
buf = unhexlify(
'0000' # afi
'00' # safi
'00' # nlen
'01' # num SNPAs
# SNPA
'04' # len
'1234' # data
)
mp = BGP.Update.Attribute.MPReachNLRI(buf)
assert len(mp.snpas) == 1
assert bytes(mp) == buf
def test_notification():
from binascii import unhexlify
buf_notification = unhexlify(
'11' # code
'22' # subcode
'33' # error
)
notification = BGP.Notification(buf_notification)
assert notification.code == 0x11
assert notification.subcode == 0x22
assert notification.error == b'\x33'
assert bytes(notification) == buf_notification
buf_bgp_hdr = unhexlify(
'11111111111111111111111111111111' # marker
'0016' # len
'03' # type (NOTIFICATION)
)
bgp = BGP(buf_bgp_hdr + buf_notification)
assert hasattr(bgp, 'notification')
assert isinstance(bgp.data, BGP.Notification)
assert bgp.data.code == 0x11
assert bgp.data.subcode == 0x22
assert bgp.data.error == b'\x33'
assert bytes(bgp) == buf_bgp_hdr + buf_notification
def test_keepalive():
keepalive = BGP.Keepalive(b'\x11')
assert len(keepalive) == 0
assert bytes(keepalive) == b''
def test_routegeneric():
from binascii import unhexlify
buf = unhexlify(
'08' # len (bits)
'11' # prefix
)
routegeneric = RouteGeneric(buf)
assert routegeneric.len == 8
assert routegeneric.prefix == b'\x11'
assert bytes(routegeneric) == buf
assert len(routegeneric) == 2
def test_routeipv4():
from binascii import unhexlify
buf = unhexlify(
'08' # len (bits)
'11' # prefix
)
routeipv4 = RouteIPV4(buf)
assert routeipv4.len == 8 # prefix len in bits
assert routeipv4.prefix == b'\x11\x00\x00\x00'
assert repr(routeipv4) == "RouteIPV4(17.0.0.0/8)"
assert bytes(routeipv4) == buf
assert len(routeipv4) == 2 # header + prefix(bytes)
def test_routeipv6():
from binascii import unhexlify
buf = unhexlify(
'08' # len (bits)
'22' # prefix
)
routeipv6 = RouteIPV4(buf)
assert routeipv6.len == 8 # prefix len in bits
assert routeipv6.prefix == b'\x22\x00\x00\x00'
assert bytes(routeipv6) == buf
assert len(routeipv6) == 2 # header + prefix(bytes)
def test_extendedrouteipv4():
from binascii import unhexlify
buf = unhexlify(
'00000001' # path_id
'20' # len (bits)
'05050505' # prefix
)
extendedrouteipv4 = ExtendedRouteIPV4(buf)
assert extendedrouteipv4.path_id == 1
assert extendedrouteipv4.len == 32
assert extendedrouteipv4.prefix == unhexlify('05050505')
assert repr(extendedrouteipv4) == "ExtendedRouteIPV4(5.5.5.5/32 PathId 1)"
assert bytes(extendedrouteipv4) == buf
assert len(extendedrouteipv4) == len(buf)
def test_routeevpn():
from binascii import unhexlify
buf = unhexlify(
'02' # type
'1a' # len
# route distinguisher
'1111111111111111'
# esi
'22222222222222222222'
# eth_id
'33333333'
# mac address
'00' # len (bits)
# ip address
'00' # len (bits)
# mpls
'6666' # label stack
)
routeevpn = RouteEVPN(buf)
assert routeevpn.type == 2
assert routeevpn.len == 26
assert routeevpn.esi == unhexlify('22222222222222222222')
assert routeevpn.eth_id == unhexlify('33333333')
assert routeevpn.mac_address_length == 0
assert routeevpn.mac_address is None
assert routeevpn.ip_address_length == 0
assert routeevpn.ip_address is None
assert routeevpn.mpls_label_stack == unhexlify('6666')
assert bytes(routeevpn) == buf
assert len(routeevpn) == len(buf)
def test_route_refresh():
from binascii import unhexlify
buf_route_refresh = unhexlify(
'1111' # afi
'22' # rsvd
'33' # safi
)
route_refresh = BGP.RouteRefresh(buf_route_refresh)
assert route_refresh.afi == 0x1111
assert route_refresh.rsvd == 0x22
assert route_refresh.safi == 0x33
assert bytes(route_refresh) == buf_route_refresh
buf_bgp_hdr = unhexlify(
'11111111111111111111111111111111' # marker
'0017' # len
'05' # type (ROUTE_REFRESH)
)
bgp = BGP(buf_bgp_hdr + buf_route_refresh)
assert hasattr(bgp, 'route_refresh')
assert isinstance(bgp.data, BGP.RouteRefresh)
assert bgp.data.afi == 0x1111
assert bgp.data.rsvd == 0x22
assert bgp.data.safi == 0x33
assert bytes(bgp) == buf_bgp_hdr + buf_route_refresh
def test_mpunreachnlri():
from binascii import unhexlify
buf_routeipv4 = unhexlify(
'08' # len (bits)
'11' # prefix
)
buf_routeipv6 = unhexlify(
'08' # len (bits)
'22' # prefix
)
buf_routeevpn = unhexlify(
'02' # type
'1a' # len
# route distinguisher
'1111111111111111'
# esi
'22222222222222222222'
# eth_id
'33333333'
# mac address
'00' # len (bits)
# ip address
'00' # len (bits)
# mpls
'6666' # label stack
)
buf_routegeneric = unhexlify(
'08' # len (bits)
'33' # prefix
)
afi = struct.Struct('>H')
routes = (
(AFI_IPV4, buf_routeipv4, RouteIPV4),
(AFI_IPV6, buf_routeipv6, RouteIPV6),
(AFI_L2VPN, buf_routeevpn, RouteEVPN),
# this afi does not exist, so we will parse as RouteGeneric
(1234, buf_routegeneric, RouteGeneric),
)
for afi_id, buf, cls in routes:
buf = afi.pack(afi_id) + b'\xcc' + buf
mpu = BGP.Update.Attribute.MPUnreachNLRI(buf)
assert mpu.afi == afi_id
assert mpu.safi == 0xcc
assert len(mpu.data) == 1
route = mpu.data[0]
assert isinstance(route, cls)
assert bytes(mpu) == buf
assert len(mpu) == len(buf)
# test the unpacking of the routes, as an Attribute
attribute_hdr = struct.Struct('BBB')
for afi_id, buf, cls in routes:
buf_mpunreachnlri = afi.pack(afi_id) + b'\xcc' + buf
buf_attribute_hdr = attribute_hdr.pack(0, MP_UNREACH_NLRI, len(buf_mpunreachnlri))
buf = buf_attribute_hdr + buf_mpunreachnlri
attribute = BGP.Update.Attribute(buf)
assert isinstance(attribute.data, BGP.Update.Attribute.MPUnreachNLRI)
routes = attribute.data.data
assert len(routes) == 1
assert isinstance(routes[0], cls)
def test_update_withdrawn():
from binascii import unhexlify
buf_ipv4 = unhexlify(
'08' # len (bits)
'11' # prefix
)
packed_length = struct.Struct('>H').pack
wlen, plen = packed_length(len(buf_ipv4)), packed_length(0)
buf = wlen + buf_ipv4 + plen
update = BGP.Update(buf)
assert len(update.withdrawn) == 1
route = update.withdrawn[0]
assert isinstance(route, RouteIPV4)
assert bytes(update) == buf
def test_parameters():
from binascii import unhexlify
buf = unhexlify(
'44' # v
'1111' # asn
'2222' # holdtime
'33333333' # identifier
'03' # param_len
# Parameter
'01' # type (AUTHENTICATION)
'01' # len
# Authentication
'11' # code
)
bgp_open = BGP.Open(buf)
assert len(bgp_open.parameters) == 1
parameter = bgp_open.parameters[0]
assert isinstance(parameter, BGP.Open.Parameter)
assert isinstance(parameter.data, BGP.Open.Parameter.Authentication)
assert bytes(bgp_open) == buf
assert len(bgp_open) == len(buf)
def test_reservedcommunities():
from binascii import unhexlify
buf = unhexlify(
# ReservedCommunity
'00002222' # value
)
communities = BGP.Update.Attribute.Communities(buf)
assert len(communities.data) == 1
community = communities.data[0]
assert isinstance(community, BGP.Update.Attribute.Communities.ReservedCommunity)
assert len(community) == 4
assert bytes(community) == buf
assert len(communities) == 4
assert bytes(communities) == buf