Source code for wltrace.pcap

"""Pcap file parser.
"""

import struct
import binascii
try:
    from cStringIO import StringIO
except:
    from StringIO import StringIO

import radiotap
import dot11

from common import WlTrace, GenericHeader, PhyInfo

_PCAP_FILE_MAGIC_NUMBER = 0xa1b2c3d4
_PCAP_FILE_MAGIC_NUMBER_NS = 0xa1b23c4d

_PCAP_VERSION_MAJOR = 2
_PCAP_VERSION_MINOR = 4

_LINKTYPE_IEEE802_11 = 105
_LINKTYPE_IEEE802_11_RADIOTAP = 127

PCAP_FILE_MAGIC_LE = struct.pack('<I', _PCAP_FILE_MAGIC_NUMBER)
PCAP_FILE_MAGIC_LE_NS = struct.pack('<I', _PCAP_FILE_MAGIC_NUMBER_NS)
PCAP_FILE_MAGIC_BE = struct.pack('>I', _PCAP_FILE_MAGIC_NUMBER)
PCAP_FILE_MAGIC_BE_NS = struct.pack('>I', _PCAP_FILE_MAGIC_NUMBER_NS)


[docs]class PcapException(Exception): pass
[docs]class PcapHeader(GenericHeader): """Pcap file header. The format is documented here: https://wiki.wireshark.org/Development/LibpcapFileFormat Note that the file header does not contain the total number of packets in the file. Args: fh (file object): the packet trace file. """ _PACK_PATTERN_BASE = "IHHiIII" FIELDS = [ 'magic_number', 'version_major', 'version_minor', 'thiszone', 'sigfigs', 'snaplen', 'network', ] def __init__(self, fh, *args, **kwargs): fh.seek(0) magic = fh.read(4) fh.seek(0) if magic not in [PCAP_FILE_MAGIC_LE, PCAP_FILE_MAGIC_LE_NS, PCAP_FILE_MAGIC_BE, PCAP_FILE_MAGIC_BE_NS]: raise Exception("Unknown file magic: %s" % (binascii(magic))) self.endian = '<' if magic in [PCAP_FILE_MAGIC_LE, PCAP_FILE_MAGIC_LE_NS] else '>' self.nano_ts = magic in [PCAP_FILE_MAGIC_LE_NS, PCAP_FILE_MAGIC_BE_NS] cls = self.__class__ cls.PACK_PATTERN = '%s%s' % (self.endian, cls._PACK_PATTERN_BASE) super(cls, self).__init__(fh, *args, **kwargs) if self.version_major != _PCAP_VERSION_MAJOR or\ self.version_minor != _PCAP_VERSION_MINOR: raise PcapException('Expect PCAP version 2.4, got %d.%d' % (self.version_major, self.version_minor)) @classmethod
[docs] def to_binary(cls, endian='@', snaplen=65535, network=_LINKTYPE_IEEE802_11_RADIOTAP): pattern = '%s%s' % (endian, cls._PACK_PATTERN_BASE) return struct.pack(pattern, _PCAP_FILE_MAGIC_NUMBER, _PCAP_VERSION_MAJOR, _PCAP_VERSION_MINOR, 0, 0, snaplen, network)
[docs]class PcapPacketHeader(GenericHeader): """Per packet header in Pcap format. """ _PACK_PATTERN_BASE = 'IIII' FIELDS = [ 'ts_sec', 'ts_usec', 'incl_len', 'orig_len', ] def __init__(self, fh, header, *args, **kwargs): cls = self.__class__ cls.PACK_PATTERN = '%s%s' % (header.endian, cls._PACK_PATTERN_BASE) super(cls, self).__init__(fh, *args, **kwargs) self.epoch_ts = self.ts_sec self.epoch_ts += self.ts_usec / (1e9 if header.nano_ts else 1e6) self.epoch_ts += header.thiszone @classmethod
[docs] def encapsulate(cls, pkt, endian='@'): pattern = '%s%s' % (endian, cls._PACK_PATTERN_BASE) ts_sec = int(pkt.epoch_ts) ts_usec = int((pkt.epoch_ts - ts_sec) * 1e6) phy = pkt.phy.to_binary() incl_len = len(phy) + pkt.phy.caplen orig_len = len(phy) + pkt.phy.len return '%s%s%s' % (struct.pack(pattern, ts_sec, ts_usec, incl_len, orig_len), phy, pkt.raw)
[docs]class PcapCapture(WlTrace): """Represent a Pcap packet trace. Currently only support two link types. """ LINKTYPES = [ _LINKTYPE_IEEE802_11, _LINKTYPE_IEEE802_11_RADIOTAP, ] def __init__(self, path, *args, **kwargs): cls = self.__class__ super(cls, self).__init__(path, *args, **kwargs) self.header = PcapHeader(self.fh) if self.header.network not in cls.LINKTYPES: raise PcapException("Unsupported link type: %d" % (self.header.network)) if self.header.network == _LINKTYPE_IEEE802_11_RADIOTAP: self.has_phy_info = True @classmethod
[docs] def save(cls, path, pkts): with open(path, 'wb') as f: f.write(PcapHeader.to_binary()) for pkt in pkts: f.write(PcapPacketHeader.encapsulate(pkt))
def _read_one_pkt(self): pkt_header = PcapPacketHeader(self.fh, self.header) if pkt_header.incl_len > self.header.snaplen: raise PcapException("snaplen: %d, incl_len: %d" % (self.header.snaplen, pkt_header.incl_len)) raw = self.fh.read(pkt_header.incl_len) if len(raw) != pkt_header.incl_len: raise IOError("Short read: expect %d, got %d" % (pkt_header.incl_len, len(raw))) pkt_fh = StringIO(raw) if self.header.network == _LINKTYPE_IEEE802_11_RADIOTAP: rh = radiotap.RadiotapHeader(pkt_fh) phy = rh.to_phy() phy.len = pkt_header.orig_len - rh._it_len phy.caplen = pkt_header.incl_len - rh._it_len else: phy = PhyInfo(has_fcs=False, len=pkt_header.orig_len) phy.caplen = pkt_header.incl_len phy.epoch_ts = pkt_header.epoch_ts if self.fix_timestamp and phy.rate is not None: phy.epoch_ts -= phy.len * 8 / phy.rate * 1e-6 pkt = dot11.Dot11Packet(pkt_fh, phy=phy, counter=self.counter) try: phy.end_epoch_ts = phy.epoch_ts + pkt.air_time() except: pass self.counter += 1 return pkt def _next(self, n=100): if self.fh is None: return [] pkts = [] for _ in xrange(n): try: pkt = self._read_one_pkt() if pkt.phy.ampdu_ref is not None: # read all packets in this ampdu ampdu_ref = pkt.phy.ampdu_ref while pkt.phy.ampdu_ref == ampdu_ref: if pkt.phy.last_frame: # update previous ampdu's rate info for p in reversed(pkts): if p.phy.ampdu_ref != ampdu_ref: break p.phy.rate = pkt.phy.rate break pkts.append(pkt) pkt = self._read_one_pkt() pkts.append(pkt) except IOError: self.fh.close() self.fh = None break return pkts