blob: 64a783ff5bbe924a56af1c697fce4d7d00a88b15 [file] [log] [blame]
# -*- coding: utf-8 -*-
"""Radiotap"""
from __future__ import print_function
from __future__ import absolute_import
from . import dpkt
from . import ieee80211
from .compat import compat_ord
# Ref: http://www.radiotap.org
# Fields Ref: http://www.radiotap.org/defined-fields/all
# Present flags
_TSFT_SHIFT = 0
_FLAGS_SHIFT = 1
_RATE_SHIFT = 2
_CHANNEL_SHIFT = 3
_FHSS_SHIFT = 4
_ANT_SIG_SHIFT = 5
_ANT_NOISE_SHIFT = 6
_LOCK_QUAL_SHIFT = 7
_TX_ATTN_SHIFT = 8
_DB_TX_ATTN_SHIFT = 9
_DBM_TX_POWER_SHIFT = 10
_ANTENNA_SHIFT = 11
_DB_ANT_SIG_SHIFT = 12
_DB_ANT_NOISE_SHIFT = 13
_RX_FLAGS_SHIFT = 14
_CHANNELPLUS_SHIFT = 18
_EXT_SHIFT = 31
# Flags elements
_FLAGS_SIZE = 2
_CFP_FLAG_SHIFT = 0
_PREAMBLE_SHIFT = 1
_WEP_SHIFT = 2
_FRAG_SHIFT = 3
_FCS_SHIFT = 4
_DATA_PAD_SHIFT = 5
_BAD_FCS_SHIFT = 6
_SHORT_GI_SHIFT = 7
# Channel type
_CHAN_TYPE_SIZE = 4
_CHANNEL_TYPE_SHIFT = 4
_CCK_SHIFT = 5
_OFDM_SHIFT = 6
_TWO_GHZ_SHIFT = 7
_FIVE_GHZ_SHIFT = 8
_PASSIVE_SHIFT = 9
_DYN_CCK_OFDM_SHIFT = 10
_GFSK_SHIFT = 11
_GSM_SHIFT = 12
_STATIC_TURBO_SHIFT = 13
_HALF_RATE_SHIFT = 14
_QUARTER_RATE_SHIFT = 15
# Flags offsets and masks
_FCS_MASK = 0x10
class Radiotap(dpkt.Packet):
"""Radiotap.
TODO: Longer class information....
Attributes:
__hdr__: Header fields of Radiotap.
TODO.
"""
__hdr__ = (
('version', 'B', 0),
('pad', 'B', 0),
('length', 'H', 0),
)
__byte_order__ = '<'
def _is_present(self, bit):
index = bit // 8
mask = 1 << (bit % 8)
return 1 if self.present_flags[index] & mask else 0
def _set_bit(self, bit, val):
# present_flags is a bytearray, this gets the element
index = bit // 8
# mask retrieves every bit except our one
mask = ~(1 << (bit % 8) & 0xff)
# retrieve all of the bits, then or in the val at the appropriate place
# as the mask does not return the value at `bit`, if `val` is zero, the bit remains zero
self.present_flags[index] = (self.present_flags[index] & mask) | (val << bit % 8)
@property
def tsft_present(self):
return self._is_present(_TSFT_SHIFT)
@tsft_present.setter
def tsft_present(self, val):
self._set_bit(_TSFT_SHIFT, val)
@property
def flags_present(self):
return self._is_present(_FLAGS_SHIFT)
@flags_present.setter
def flags_present(self, val):
self._set_bit(_FLAGS_SHIFT, val)
@property
def rate_present(self):
return self._is_present(_RATE_SHIFT)
@rate_present.setter
def rate_present(self, val):
self._set_bit(_RATE_SHIFT, val)
@property
def channel_present(self):
return self._is_present(_CHANNEL_SHIFT)
@channel_present.setter
def channel_present(self, val):
self._set_bit(_CHANNEL_SHIFT, val)
@property
def fhss_present(self):
return self._is_present(_FHSS_SHIFT)
@fhss_present.setter
def fhss_present(self, val):
self._set_bit(_FHSS_SHIFT, val)
@property
def ant_sig_present(self):
return self._is_present(_ANT_SIG_SHIFT)
@ant_sig_present.setter
def ant_sig_present(self, val):
self._set_bit(_ANT_SIG_SHIFT, val)
@property
def ant_noise_present(self):
return self._is_present(_ANT_NOISE_SHIFT)
@ant_noise_present.setter
def ant_noise_present(self, val):
self._set_bit(_ANT_NOISE_SHIFT, val)
@property
def lock_qual_present(self):
return self._is_present(_LOCK_QUAL_SHIFT)
@lock_qual_present.setter
def lock_qual_present(self, val):
self._set_bit(_LOCK_QUAL_SHIFT, val)
@property
def tx_attn_present(self):
return self._is_present(_TX_ATTN_SHIFT)
@tx_attn_present.setter
def tx_attn_present(self, val):
self._set_bit(_TX_ATTN_SHIFT, val)
@property
def db_tx_attn_present(self):
return self._is_present(_DB_TX_ATTN_SHIFT)
@db_tx_attn_present.setter
def db_tx_attn_present(self, val):
self._set_bit(_DB_TX_ATTN_SHIFT, val)
@property
def dbm_tx_power_present(self):
return self._is_present(_DBM_TX_POWER_SHIFT)
@dbm_tx_power_present.setter
def dbm_tx_power_present(self, val):
self._set_bit(_DBM_TX_POWER_SHIFT, val)
@property
def ant_present(self):
return self._is_present(_ANTENNA_SHIFT)
@ant_present.setter
def ant_present(self, val):
self._set_bit(_ANTENNA_SHIFT, val)
@property
def db_ant_sig_present(self):
return self._is_present(_DB_ANT_SIG_SHIFT)
@db_ant_sig_present.setter
def db_ant_sig_present(self, val):
self._set_bit(_DB_ANT_SIG_SHIFT, val)
@property
def db_ant_noise_present(self):
return self._is_present(_DB_ANT_NOISE_SHIFT)
@db_ant_noise_present.setter
def db_ant_noise_present(self, val):
self._set_bit(_DB_ANT_NOISE_SHIFT, val)
@property
def rx_flags_present(self):
return self._is_present(_RX_FLAGS_SHIFT)
@rx_flags_present.setter
def rx_flags_present(self, val):
self._set_bit(_RX_FLAGS_SHIFT, val)
@property
def chanplus_present(self):
return self._is_present(_CHANNELPLUS_SHIFT)
@chanplus_present.setter
def chanplus_present(self, val):
self._set_bit(_CHANNELPLUS_SHIFT, val)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
self.data = buf[self.length:]
self.fields = []
buf = buf[self.__hdr_len__:]
self.present_flags = bytearray(buf[:4])
buf = buf[4:]
ext_bit = _EXT_SHIFT
while self._is_present(ext_bit):
self.present_flags += bytearray(buf[:4])
buf = buf[4:]
ext_bit += 32
# decode each field into self.<name> (eg. self.tsft) as well as append it self.fields list
field_decoder = [
('tsft', self.tsft_present, self.TSFT),
('flags', self.flags_present, self.Flags),
('rate', self.rate_present, self.Rate),
('channel', self.channel_present, self.Channel),
('fhss', self.fhss_present, self.FHSS),
('ant_sig', self.ant_sig_present, self.AntennaSignal),
('ant_noise', self.ant_noise_present, self.AntennaNoise),
('lock_qual', self.lock_qual_present, self.LockQuality),
('tx_attn', self.tx_attn_present, self.TxAttenuation),
('db_tx_attn', self.db_tx_attn_present, self.DbTxAttenuation),
('dbm_tx_power', self.dbm_tx_power_present, self.DbmTxPower),
('ant', self.ant_present, self.Antenna),
('db_ant_sig', self.db_ant_sig_present, self.DbAntennaSignal),
('db_ant_noise', self.db_ant_noise_present, self.DbAntennaNoise),
('rx_flags', self.rx_flags_present, self.RxFlags),
('chanplus', self.chanplus_present, self.ChannelPlus)
]
offset = self.__hdr_len__ + len(self.present_flags)
for name, present_bit, parser in field_decoder:
if present_bit:
ali = parser.__alignment__
if ali > 1 and offset % ali:
padding = ali - offset % ali
buf = buf[padding:]
offset += padding
field = parser(buf)
field.data = b''
setattr(self, name, field)
self.fields.append(field)
buf = buf[len(field):]
offset += len(field)
if len(self.data) > 0:
if self.flags_present and self.flags.fcs:
self.data = ieee80211.IEEE80211(self.data, fcs=self.flags.fcs)
else:
self.data = ieee80211.IEEE80211(self.data)
class RadiotapField(dpkt.Packet):
__alignment__ = 1
__byte_order__ = '<'
class Antenna(RadiotapField):
__hdr__ = (
('index', 'B', 0),
)
class AntennaNoise(RadiotapField):
__hdr__ = (
('db', 'b', 0),
)
class AntennaSignal(RadiotapField):
__hdr__ = (
('db', 'b', 0),
)
class Channel(RadiotapField):
__alignment__ = 2
__hdr__ = (
('freq', 'H', 0),
('flags', 'H', 0),
)
class FHSS(RadiotapField):
__hdr__ = (
('set', 'B', 0),
('pattern', 'B', 0),
)
class Flags(RadiotapField):
__hdr__ = (
('val', 'B', 0),
)
@property
def fcs(self):
return (self.val & _FCS_MASK) >> _FCS_SHIFT
@fcs.setter
def fcs(self, v):
self.val = (v << _FCS_SHIFT) | (v & ~_FCS_MASK)
class LockQuality(RadiotapField):
__alignment__ = 2
__hdr__ = (
('val', 'H', 0),
)
class RxFlags(RadiotapField):
__alignment__ = 2
__hdr__ = (
('val', 'H', 0),
)
class Rate(RadiotapField):
__hdr__ = (
('val', 'B', 0),
)
class TSFT(RadiotapField):
__alignment__ = 8
__hdr__ = (
('usecs', 'Q', 0),
)
class TxAttenuation(RadiotapField):
__alignment__ = 2
__hdr__ = (
('val', 'H', 0),
)
class DbTxAttenuation(RadiotapField):
__alignment__ = 2
__hdr__ = (
('db', 'H', 0),
)
class DbAntennaNoise(RadiotapField):
__hdr__ = (
('db', 'B', 0),
)
class DbAntennaSignal(RadiotapField):
__hdr__ = (
('db', 'B', 0),
)
class DbmTxPower(RadiotapField):
__hdr__ = (
('dbm', 'B', 0),
)
class ChannelPlus(RadiotapField):
__alignment__ = 4
__hdr__ = (
('flags', 'I', 0),
('freq', 'H', 0),
('channel', 'B', 0),
('maxpower', 'B', 0),
)
def test_radiotap_1():
s = b'\x00\x00\x00\x18\x6e\x48\x00\x00\x00\x02\x6c\x09\xa0\x00\xa8\x81\x02\x00\x00\x00\x00\x00\x00\x00'
rad = Radiotap(s)
assert(rad.version == 0)
assert(rad.present_flags == b'\x6e\x48\x00\x00')
assert(rad.tsft_present == 0)
assert(rad.flags_present == 1)
assert(rad.rate_present == 1)
assert(rad.channel_present == 1)
assert(rad.fhss_present == 0)
assert(rad.ant_sig_present == 1)
assert(rad.ant_noise_present == 1)
assert(rad.lock_qual_present == 0)
assert(rad.db_tx_attn_present == 0)
assert(rad.dbm_tx_power_present == 0)
assert(rad.ant_present == 1)
assert(rad.db_ant_sig_present == 0)
assert(rad.db_ant_noise_present == 0)
assert(rad.rx_flags_present == 1)
assert(rad.channel.freq == 0x096c)
assert(rad.channel.flags == 0xa0)
assert(len(rad.fields) == 7)
def test_radiotap_2():
s = (b'\x00\x00\x30\x00\x2f\x40\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\x00\x00\x00'
b'\x00\x00\x08\x84\xbd\xac\x28\x00\x00\x00\x10\x02\x85\x09\xa0\x00\xa5\x00\x00\x00\xa1\x00'
b'\x9f\x01\xa1\x02')
rad = Radiotap(s)
assert(rad.version == 0)
assert(rad.present_flags == b'\x2f\x40\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\x00')
assert(rad.tsft_present)
assert(rad.flags_present)
assert(rad.rate_present)
assert(rad.channel_present)
assert(not rad.fhss_present)
assert(rad.ant_sig_present)
assert(not rad.ant_noise_present)
assert(not rad.lock_qual_present)
assert(not rad.db_tx_attn_present)
assert(not rad.dbm_tx_power_present)
assert(not rad.ant_present)
assert(not rad.db_ant_sig_present)
assert(not rad.db_ant_noise_present)
assert(rad.rx_flags_present)
assert(rad.channel.freq == 2437)
assert(rad.channel.flags == 0x00a0)
assert(len(rad.fields) == 6)
assert(rad.flags_present)
assert(rad.flags.fcs)
def test_fcs():
s = b'\x00\x00\x1a\x00\x2f\x48\x00\x00\x34\x8f\x71\x09\x00\x00\x00\x00\x10\x0c\x85\x09\xc0\x00\xcc\x01\x00\x00'
rt = Radiotap(s)
assert(rt.flags_present == 1)
assert(rt.flags.fcs == 1)
def test_radiotap_3(): # xchannel aka channel plus field
s = (
b'\x00\x00\x20\x00\x67\x08\x04\x00\x84\x84\x66\x25\x00\x00\x00\x00\x22\x0c\xd6\xa0\x01\x00\x00\x00\x40'
b'\x01\x00\x00\x3c\x14\x24\x11\x08\x02\x00\x00\xff\xff\xff\xff\xff\xff\x06\x03\x7f\x07\xa0\x16\x00\x19'
b'\xe3\xd3\x53\x52\x00\x8e\xaa\xaa\x03\x00\x00\x00\x08\x06\x00\x01\x08\x00\x06\x04\x00\x01\x00\x19\xe3'
b'\xd3\x53\x52\xa9\xfe\xf7\x00\x00\x00\x00\x00\x00\x00\x4f\x67\x32\x38'
)
rt = Radiotap(s)
assert rt.ant_noise.db == -96
assert rt.ant_sig.db == -42
assert rt.ant.index == 1
assert rt.chanplus_present
assert rt.chanplus.flags == 0x140
assert rt.chanplus.freq == 5180
assert rt.chanplus.channel == 36
assert rt.chanplus.maxpower == 17
assert len(rt.fields) == 7
assert repr(rt.data).startswith('IEEE80211')
def test_radiotap_properties():
from binascii import unhexlify
buf = unhexlify(
'00'
'00'
'0018'
'0000000000000000000000000000000000000000'
)
radiotap = Radiotap(buf)
property_keys = [
'tsft', 'flags', 'rate', 'channel', 'fhss', 'ant_sig', 'ant_noise',
'lock_qual', 'tx_attn', 'db_tx_attn', 'dbm_tx_power', 'ant',
'db_ant_sig', 'db_ant_noise', 'rx_flags', 'chanplus'
]
for prop in [key + '_present' for key in property_keys]:
print(prop)
assert hasattr(radiotap, prop)
assert getattr(radiotap, prop) == 0
setattr(radiotap, prop, 1)
assert getattr(radiotap, prop) == 1
setattr(radiotap, prop, 0)
assert getattr(radiotap, prop) == 0
def test_radiotap_unpack_fcs():
from binascii import unhexlify
buf = unhexlify(
'00' # version
'00' # pad
'1800' # length
'6e48000011026c09a000a8810200000000000000'
'd40000000012f0b61ca4ffffffff'
)
radiotap = Radiotap(buf)
assert radiotap.data.fcs_present == 1
def test_flags():
flags = Radiotap.Flags(b'\x00')
assert flags.fcs == 0
flags.fcs = 1
assert flags.fcs == 1