Source code for wltrace.radiotap
"""Radiotap header parser.
"""
import struct
import binascii
import common
import utils
import dot11
_IT_VERSION = 0
_CHANNEL_FLAG_TURBO = 0x0010
_CHANNEL_FLAG_CCK = 0x0020
_CHANNEL_FLAG_OFDM = 0x0040
_CHANNEL_FLAG_2GHZ = 0x0080
_CHANNEL_FLAG_5GHZ = 0x0100
_CHANNEL_FLAG_PASSIVE_ONLY = 0x0200
_CHANNEL_FLAG_DYNAMIC = 0x0400
_CHANNEL_FLAG_GFSK = 0x0800
_FLAG_HAS_FCS = 0x10
_FLAG_FCS_ERROR = 0x40
_MCS_KNOWN_BANDWIDTH = 0x01
_MCS_KNOWN_MCS = 0x02
_MCS_KNOWN_GI = 0x04
_MCS_KNOWN_HT = 0x08
_MCS_FLAG_BANDWIDTH = 0x03
_MCS_FLAG_GI = 0x04
_MCS_FLAG_HT = 0x08
_PRESENT_FLAG_TSFT = 1 << 0
_PRESENT_FLAG_FLAG = 1 << 1
_PRESENT_FLAG_RATE = 1 << 2
_PRESENT_FLAG_CHANNEL = 1 << 3
_PRESENT_FLAG_SIGNAL = 1 << 5
_PRESENT_FLAG_NOISE = 1 << 6
_PRESENT_FLAG_MCS = 1 << 19
_PRESENT_FLAG_AMPDU = 1 << 20
[docs]class RadiotapHeader(common.GenericHeader):
"""Radiotap header.
See this document for radiotap header format:
http://www.radiotap.org/
See this document for all defined radiotap fields:
http://www.radiotap.org/defined-fields/all
"""
PACK_PATTERN = '<BBHI'
"""Radiotap header is always in little endian.
"""
FIELDS = [
'_it_version',
'_it_pad',
'_it_len',
'_it_present',
]
PRESENT_FLAGS = [
# (idx, unpack_fmt, field, align)
(0, 'Q', 'mactime', 8),
(1, 'B', '_flags', 1),
(2, 'B', 'rate', 1),
(3, 'I', '_channel', 2),
(4, 'xx', 'unused', 1),
(5, 'b', 'signal', 1),
(6, 'b', 'noise', 1),
(7, 'xx', 'unused', 2),
(8, 'xx', 'unused', 2),
(9, 'xx', 'unused', 2),
(10, 'x', 'unused', 1),
(11, 'x', 'unused', 1),
(12, 'x', 'unused', 1),
(13, 'x', 'unused', 1),
(14, 'xx', 'unused', 2),
(19, 'bbb', 'mcs', 1),
(20, 'IHxx', '_ampdu', 4),
]
def __init__(self, fh, *args, **kwargs):
cls = self.__class__
super(cls, self).__init__(fh, *args, **kwargs)
if self._it_version != _IT_VERSION:
raise Exception('Incorrect version: expect %d, got %d' %
(cls._it_version, self._it_version))
rest_len = self._it_len - struct.calcsize(cls.PACK_PATTERN)
rest = fh.read(rest_len)
if len(rest) != rest_len:
raise Exception('Short read: expect %d, got %d' %
(rest_len, len(rest)))
self.payload = rest
self.offset = 0
present = self._it_present
shift = 32
while (present >> 31) > 0:
present, = self.unpack('<I')
self._it_present = (present << shift) + self._it_present
shift += 32
for idx, fmt, field, align in cls.PRESENT_FLAGS:
if self._it_present & (1 << idx):
self.offset = utils.align_up(self.offset, align)
val = self.unpack(fmt)
if len(val) == 1:
val = val[0]
setattr(self, field, val)
else:
setattr(self, field, None)
if self._it_present & _PRESENT_FLAG_CHANNEL:
self.freq_mhz = self._channel & 0x0000ffff
self.freq_flag = self._channel >> 16
delattr(self, '_channel')
else:
self.freq_mhz = None
self.freq_flag = None
if self._it_present & _PRESENT_FLAG_FLAG:
self.has_fcs = self._flags & _FLAG_HAS_FCS > 0
self.fcs_error = self._flags & _FLAG_FCS_ERROR
else:
self.has_fcs = False
self.fcs_error = False
if self._it_present & _PRESENT_FLAG_RATE:
self.rate /= 2.0
else:
self.rate = 0
if self._it_present & _PRESENT_FLAG_MCS:
mcs_known, mcs_flags, self.mcs = self.mcs
if mcs_flags & 0x3 in [0, 2, 3]:
bw = 20
else:
bw = 40
long_gi = (mcs_flags & 0x4) == 0
self.rate = dot11.mcs_to_rate(self.mcs, bw, long_gi)
else:
self.mcs = None
if self._it_present & _PRESENT_FLAG_AMPDU:
self.ampdu_ref, ampdu_flag = self._ampdu
self.last_frame = ampdu_flag & 0x8 > 0
else:
self.ampdu_ref = None
self.last_frame = True
@classmethod
[docs] def from_phy_info(cls, phy):
header = cls()
header.freq_mhz = phy.freq_mhz
if header.freq_mhz < 3000:
header.freq_flag = _CHANNEL_FLAG_2GHZ | _CHANNEL_FLAG_OFDM
else:
header.freq_flag = _CHANNEL_FLAG_5GHZ | _CHANNEL_FLAG_OFDM
header._channel = (header.freq_flag << 16) + header.freq_mhz
header._flags = _FLAG_HAS_FCS
if phy.fcs_error:
header._flags |= _FLAG_FCS_ERROR
if phy.rate < 256:
header.rate = phy.rate
header.epoch_ts = phy.epoch_ts
header.len = phy.len
header.caplen = phy.caplen
return header
[docs] def to_binary(self):
cls = self.__class__
offset = 0
present_flag = 0
payload = ''
for idx, fmt, field, align in cls.PRESENT_FLAGS:
if getattr(self, field, None) is None:
continue
present_flag |= (1 << idx)
aligned_offset = utils.align_up(offset, align)
if aligned_offset != offset:
fmt = '%s%s' % ('x' * (aligned_offset - offset), fmt)
try:
attr = getattr(self, field)
if type(attr) != tuple:
attr = (attr, )
payload += struct.pack(fmt, *attr)
except:
raise Exception('%s: %s' % (field, getattr(self, field)))
offset += struct.calcsize(fmt)
header = struct.pack(cls.PACK_PATTERN, 0, 0,
struct.calcsize(cls.PACK_PATTERN) + len(payload),
present_flag)
return header + payload