Source code for wltrace.dot11

"""IEEE802.11 protocol definitions and utilities.
"""

import struct
import datetime
import math
import binascii
import hashlib

from common import GenericHeader, PhyInfo
import utils


[docs]class Dot11Exception(Exception): pass
DOT11_TYPE_MANAGEMENT = 0 DOT11_TYPE_CONTROL = 1 DOT11_TYPE_DATA = 2 DOT11_TYPE_RSVD = 3 DOT11_SUBTYPE_ASSOC_REQ = 0 DOT11_SUBTYPE_ASSOC_RESP = 1 DOT11_SUBTYPE_REASSOC_REQ = 2 DOT11_SUBTYPE_REASSOC_RESP = 3 DOT11_SUBTYPE_PROBE_REQ = 4 DOT11_SUBTYPE_PROBE_RESP = 5 DOT11_SUBTYPE_RSVD = 7 DOT11_SUBTYPE_BEACON = 8 DOT11_SUBTYPE_DISASSOC = 10 DOT11_SUBTYPE_AUTH = 11 DOT11_SUBTYPE_DEAUTH = 12 DOT11_SUBTYPE_BLOCK_ACK = 9 DOT11_SUBTYPE_ACK = 0xd DOT11_SUBTYPE_DATA = 0 DOT11_SUBTYPE_NULL = 4 DOT11_SUBTYPE_QOS_DATA = 8 DOT11_SUBTYPE_QOS_NULL = 0xc MAX_ACK_LATENCY_US = 100 """Maximum allowed gap between a packet and its ack in the packet trace. """ SEQ_NUM_MODULO = 4096 MGMT_SUBTYPE_NAMES = { 0: 'Assoc Req', 1: 'Assoc Resp', 2: 'Reassoc Req', 3: 'Reassoc Resp', 4: 'Probe Req', 5: 'Probe Resq', 6: 'Timing Adv', 7: 'Rsvd', 8: 'Beacon', 9: 'ATIM', 10: 'Disassoc', 11: 'Auth', 12: 'Deauth', 13: 'Action', 14: 'Action no Ack', 15: 'Rsvd', } CTRL_SUBTYPE_NAMES = { 0: 'Rsvd', 1: 'Rsvd', 2: 'Rsvd', 3: 'Rsvd', 4: 'Rsvd', 5: 'Rsvd', 6: 'Rsvd', 7: 'Ctrl wrapper', 8: 'Block Ack req', 9: 'Block Ack', 10: 'PS-Poll', 11: 'RTS', 12: 'CTS', 13: 'Ack', 14: 'CF-End', 15: 'CF-End + CF-Ack', } DATA_SUBTYPE_NAMES = { 0: 'Data', 1: 'Data + CF-Ack', 2: 'Data + CF-Poll', 3: 'Data + CF-Ack + CF-Poll', 4: 'Null', 5: 'CF-Ack', 6: 'CF-Poll', 7: 'CF-Ack + CF-Poll', 8: 'QoS Data', 9: 'Qos Data + CF-Ack', 10: 'QoS Data + CF-Poll', 11: 'QoS Data + CF-Poll + CF-Ack', 12: 'QoS Null', 13: 'Rsvd', 14: 'QoS CF-Poll', 15: 'QoS CF-Ack + CF-Poll', } MCS_TABLE = { # http://mcsindex.com # MCS: [20MHz-LGI, 20MHz-SGI, 40MHz-LGI, 40MHz-SGI,...] 0: [6.5, 7.2, 13.5, 15, 29.3, 32.5, 58.5, 65], 1: [13, 14.4, 27, 30, 58.5, 65, 117, 130], 2: [19.5, 21.7, 40.5, 45, 87.8, 97.5, 175.5, 195], 3: [26, 28.9, 54, 60, 117, 130, 234, 260], 4: [39, 43.3, 81, 90, 175.5, 195, 351, 390], 5: [52, 57.8, 108, 120, 234, 260, 468, 520], 6: [58.5, 65, 121.5, 135, 263.3, 292.5, 526.5, 585], 7: [65, 72.2, 135, 150, 292.5, 325, 585, 650], 8: [13, 14.4, 27, 30, 58.5, 65, 117, 130], 9: [26, 28.9, 54, 60, 117, 130, 234, 260], 10: [39, 43.3, 81, 90, 175.5, 195, 351, 390], 11: [52, 57.8, 108, 120, 234, 260, 468, 520], 12: [78, 86.7, 162, 180, 351, 390, 702, 780], 13: [104, 115.6, 216, 240, 468, 520, 936, 1040], 14: [117, 130.3, 243, 270, 526.5, 585, 1053, 1170], 15: [130, 144.4, 270, 300, 585, 650, 1170, 1300], } DOT11A_RATES = [ 6, 9, 12, 18, 24, 36, 48, 54 ]
[docs]def mcs_to_rate(mcs, bw=20, long_gi=True): """Convert MCS index to rate in Mbps. See http://mcsindex.com/ Args: mcs (int): MCS index bw (int): bandwidth, 20, 40, 80, ... long_gi(bool): True if long GI is used. Returns: rate (float): bitrate in Mbps >>> mcs_to_rate(5, bw=20, long_gi=False) 57.8 >>> mcs_to_rate(4, bw=40, long_gi=True) 81 >>> mcs_to_rate(3, bw=80, long_gi=False) 130 >>> mcs_to_rate(13, bw=160, long_gi=True) 936 """ if bw not in [20, 40, 80, 160]: raise Exception("Unknown bandwidth: %d MHz" % (bw)) if mcs not in MCS_TABLE: raise Exception("Unknown MCS: %d" % (mcs)) idx = int((math.log(bw/10, 2)-1)*2) if not long_gi: idx += 1 return MCS_TABLE[mcs][idx]
[docs]def rate_to_mcs(rate, bw=20, long_gi=True): """Convert bit rate to MCS index. Args: rate (float): bit rate in Mbps bw (int): bandwidth, 20, 40, 80, ... long_gi (bool): True if long GI is used. Returns: mcs (int): MCS index >>> rate_to_mcs(120, bw=40, long_gi=False) 5 """ if bw not in [20, 40, 80, 160]: raise Exception("Unknown bandwidth: %d MHz" % (bw)) idx = int((math.log(bw/10, 2)-1)*2) if not long_gi: idx += 1 for mcs, rates in MCS_TABLE.items(): if abs(rates[idx] - rate) < 1e-3: return mcs # failed. Try dot11a rates for idx, r in enumerate(DOT11A_RATES): if abs(r-rate) < 1e-3: return idx raise Exception("MCS not found: rate=%f, bw=%d, long_gi=%s" % (rate, bw, long_gi))
[docs]def is_broadcast(mac): """Whether or not a mac is broadcast MAC address. Args: mac (str): MAC address in string format (``xx:xx:xx:xx:xx:xx``). Case insensitive. Returns: bool. """ return mac.lower() == 'ff:ff:ff:ff:ff:ff'
[docs]def is_multicast(mac): """Whether a MAC address is IPV4/V6 multicast address. See https://en.wikipedia.org/wiki/Multicast_address#Ethernet ARgs: mac (str): MAC address Returns: bool >>> is_multicast('01:80:C2:00:00:08') True """ octet = int(mac.split(':')[0], base=16) return octet & 0x01 > 0
[docs]def is_lowest_rate(rate): """Whether or not the rate is the lowest rate in rate table. Args: rate (int): rate in Mbps . Can be 802.11g/n rate. Returns: bool: ``True`` if the rate is lowest, otherwise ``False``. Note that if ``rate`` is not valid, this function returns ``False``, instead of raising an exception. """ return rate_to_mcs(rate) == 0
[docs]def is_highest_rate(rate): """Whether or not the rate is the highest rate (single spatial stream) in rate table. Args: rate (int): rate in Mbps. Can be 802.11g/n rate. Returns: bool: ``True`` if the rate is highest, otherwise ``False``. Note that if ``rate`` is not valid, this function returns ``False``, instead of raising an exception. """ return rate_to_mcs(rate) == 7
[docs]def is_ack(pkt): """Whether or not the packet is an ack packet. Args: pkt (:class:`wltrace.dot11.Dot11Packet`): the packet. Returns: bool: ``True`` if it is an ack packet, otherwise ``False``. """ return pkt.type == DOT11_TYPE_CONTROL and pkt.subtype == DOT11_SUBTYPE_ACK
[docs]def is_block_ack(pkt): """Whether a packet is a Block Ack packet. """ return pkt.type == DOT11_TYPE_CONTROL and\ pkt.subtype == DOT11_SUBTYPE_BLOCK_ACK
[docs]def is_beacon(pkt): """Whether a packet is a Beacon packet. """ return pkt.type == DOT11_TYPE_MANAGEMENT and\ pkt.subtype == DOT11_SUBTYPE_BEACON
[docs]def is_qos_data(pkt): """Whether a packet is a QoS Data packet. """ return pkt.type == DOT11_TYPE_DATA and pkt.subtype == DOT11_SUBTYPE_QOS_DATA
[docs]def next_seq(seq): """Next sequence number. Args: seq (int): current sequence number Returns: int: next sequence number, may wrap around >>> next_seq(3) 4 >>> next_seq(4095) 0 """ return (seq + 1) % SEQ_NUM_MODULO
[docs]class Beacon(object): """Payload for 802.11 Beacon packet. """ def __init__(self, pkt): self.timestamp, self.interval, self.capabilities = pkt.unpack('<QHH') tag, len = pkt.unpack('<BB') if tag == 0: self.ssid, = pkt.unpack('<%ds' % (len))
[docs]class Dot11Packet(GenericHeader): """IEEE802.11 packet. This class parse as much as possible depending on the packet type and subtype. Args: fh (file object): the file's read pointer points to the beginning of a 802.11 packet. phy (:class:`pyparser.capture.common.PhyInfo`): PHY information. counter (int): packet index in trace file, starting from 1. """ PACK_PATTERN = '<HH6s' FIELDS = [ 'fc', 'duration', 'addr1' ]
[docs] def parse_mgmt(self): self.addr2, self.addr3, self.seq = self.unpack('<6s6sH') if self.order: self.ht, = self.unpack('<I') if self.subtype == DOT11_SUBTYPE_BEACON: try: self.beacon = Beacon(self) except: pass
[docs] def parse_data(self): self.addr2, self.addr3, self.seq = self.unpack('<6s6sH') if self.from_ds and self.to_ds: self.addr4, = self.unpack('<6s') if self.subtype >= 8: self.qos, = self.unpack('<H')
[docs] def parse_control(self): if self.subtype == DOT11_SUBTYPE_BLOCK_ACK: self.addr2, ba_control = self.unpack('<6sH') self.ba_tid = ba_control >> 12 self.ba_compressed = ba_control & 0x0004 > 0 self.ba_multi_tid = ba_control & 0x0002 > 0 self.ba_policy = ba_control & 0x0001 > 0 if not self.ba_multi_tid and self.ba_compressed: ba_seq_control, self.ba_bitmap = self.unpack('<HQ') self.ba_begin_seq = ba_seq_control >> 4 self.ba_begin_frag = ba_seq_control & 0x000f
def __init__(self, fh=None, phy=None, counter=1, *args, **kwargs): if fh is None: self.real = False for k, v in kwargs.items(): setattr(self, k, v) self.phy = PhyInfo() return packet_start = fh.tell() cls = self.__class__ super(cls, self).__init__(fh, *args, **kwargs) self.real = True self.phy = phy self.counter = counter self.acked = False self.ack_pkt = None self.type = (self.fc & 0x000c) >> 2 self.subtype = (self.fc & 0x00f0) >> 4 for shift, flag in enumerate(['to_ds', 'from_ds', 'more_frag', 'retry', 'power', 'more_data', 'protected', 'order'], start=8): setattr(self, flag, (self.fc & (1 << shift)) > 0) self.payload = fh.read() self.offset = 0 try: if self.type == DOT11_TYPE_MANAGEMENT: self.parse_mgmt() elif self.type == DOT11_TYPE_DATA: self.parse_data() elif self.type == DOT11_TYPE_CONTROL: self.parse_control() except: pass for attr in ['addr1', 'addr2', 'addr3', 'addr4']: if hasattr(self, attr): setattr(self, attr, utils.bin_to_mac(getattr(self, attr))) if hasattr(self, 'seq'): self.frag_num = self.seq & 0x000f self.seq_num = (self.seq & 0xfff0) >> 4 del self.seq else: self.seq_num = None self.frag_num = None fh.seek(packet_start) self.raw = fh.read() self._hash = None fh.close() self._crc_ok = None @property def src(self): """Shortcut to ``pkt.addr2``. """ return getattr(self, 'addr2', None) @src.setter def src(self, val): self.addr2 = val @property def dest(self): """Shortcut to ``pkt.addr1``. """ return getattr(self, 'addr1', None) @dest.setter def dest(self, val): self.addr1 = val @property def ts(self): """Shortcut to ``pkt.phy.timestamp``. """ return datetime.datetime.fromtimestamp(self.phy.epoch_ts) @property def end_ts(self): return datetime.datetime.fromtimestamp(self.phy.end_epoch_ts) @property def epoch_ts(self): return self.phy.epoch_ts @property def end_epoch_ts(self): return self.phy.end_epoch_ts @property def hash(self): if self._hash is None: self._hash = hashlib.md5(self.raw).hexdigest() return self._hash def __eq__(self, other): if not isinstance(other, Dot11Packet): return False return self.hash == other.hash
[docs] def air_time(self): """Duration of the packet in air. Returns: float: duration in seconds. """ return self.phy.len * 8 / self.phy.rate * 1e-6
@property def crc_ok(self): if self._crc_ok is not None: return self._crc_ok if len(self.raw) <= 4: return False expected_crc = binascii.crc32(self.raw[:-4]) & 0xffffffff got_crc, = struct.unpack('<I', self.raw[-4:]) self._crc_ok = expected_crc == got_crc return self._crc_ok