Skip to content
  • Watch
  • Fork

/hexcap

HTTPS clone URL

SSH clone URL

Subversion checkout URL

You can clone with HTTPS, SSH, or Subversion.
Clone in Desktop Download ZIP
Branch: master
executable file 93 lines (79 sloc) 3.071 kB
"""IEEE 802.1q"""
import struct
import dpkt
# EtherType values carried in the Ethernet / 802.1Q "type" field.
# Registry: http://standards.ieee.org/regauth/ethertype
#
# NOTE: every name beginning with ETH_TYPE_ is scanned by the dispatch-table
# loader at the bottom of this module, so the suffix doubles as the name of
# the payload module/class to look up.
ETH_TYPE_PUP        = 0x0200  # PUP protocol
ETH_TYPE_IP         = 0x0800  # IP protocol
ETH_TYPE_ARP        = 0x0806  # Address resolution protocol
ETH_TYPE_CDP        = 0x2000  # Cisco Discovery Protocol
ETH_TYPE_DTP        = 0x2004  # Cisco Dynamic Trunking Protocol
ETH_TYPE_REVARP     = 0x8035  # Reverse address resolution protocol
ETH_TYPE_DOT1Q      = 0x8100  # IEEE 802.1Q VLAN tagging
ETH_TYPE_IPX        = 0x8137  # Internetwork Packet Exchange
ETH_TYPE_IP6        = 0x86DD  # IPv6 protocol
ETH_TYPE_PPP        = 0x880B  # PPP
ETH_TYPE_MPLS       = 0x8847  # MPLS
ETH_TYPE_MPLS_MCAST = 0x8848  # MPLS Multicast
ETH_TYPE_PPPoE_DISC = 0x8863  # PPP Over Ethernet Discovery Stage
ETH_TYPE_PPPoE      = 0x8864  # PPP Over Ethernet Session Stage
class DOT1Q(dpkt.Packet):
    """IEEE 802.1Q VLAN tag header.

    The header is two big-endian 16-bit words: the tag control word ``x2``
    and the EtherType ``type`` of the encapsulated payload.  ``x2`` is
    exposed through three bit-field properties:

    * ``pcp`` - bits 15-13, Priority Code Point (802.1p)
    * ``dei`` - bit 12, Drop Eligible Indicator
    * ``tag`` - bits 11-0, VLAN tag

    After unpacking, ``data`` holds the decoded payload object when a class
    is registered for ``type`` (see ``set_type``); otherwise it holds the raw
    payload bytes.
    """

    __hdr__ = (
        ('x2', 'H', 0),
        ('type', 'H', 0)
    )

    # EtherType -> payload class dispatch table, shared by the whole class.
    _typesw = {}

    # Layout of one 32-bit MPLS label stack entry (RFC 3032):
    # label (20 bits) | QoS/TC (3 bits) | bottom-of-stack (1 bit) | TTL (8 bits).
    # Kept private to the class because this module defines no MPLS_* globals.
    _MPLS_LABEL_MASK = 0xfffff000
    _MPLS_QOS_MASK = 0x00000e00
    _MPLS_TTL_MASK = 0x000000ff
    _MPLS_LABEL_SHIFT = 12
    _MPLS_QOS_SHIFT = 9
    _MPLS_TTL_SHIFT = 0
    _MPLS_STACK_BOTTOM = 0x0100

    # XXX - max # of labels is undefined, just use 24
    _MPLS_MAX_LABELS = 24

    # pcp == Priority Code Point(802.1p)
    def _get_pcp(self):
        """Return the 3-bit priority stored in the top bits of ``x2``."""
        return self.x2 >> 13

    def _set_pcp(self, pcp):
        """Store ``pcp`` (truncated to 3 bits) without touching dei/tag."""
        self.x2 = (self.x2 & 0x1fff) | ((pcp & 0x7) << 13)

    pcp = property(_get_pcp, _set_pcp)

    # dei == Drop Eligible Indicator(almost never actually used)
    def _get_dei(self):
        """Return the single DEI bit (bit 12 of ``x2``)."""
        return (self.x2 >> 12) & 1

    def _set_dei(self, dei):
        """Store the low bit of ``dei`` without touching pcp/tag."""
        self.x2 = (self.x2 & 0xefff) | ((dei & 0x1) << 12)

    dei = property(_get_dei, _set_dei)

    # tag == vlan tag
    def _get_tag(self):
        """Return the 12-bit VLAN tag stored in the low bits of ``x2``."""
        return self.x2 & 0x0fff

    def _set_tag(self, tag):
        """Store ``tag`` (truncated to 12 bits) without touching pcp/dei."""
        # Keep only the upper nibble (pcp + dei) of the old word; the previous
        # mask of 0xfff kept the *old tag* and discarded pcp/dei instead.
        self.x2 = (self.x2 & 0xf000) | (tag & 0x0fff)

    tag = property(_get_tag, _set_tag)

    @classmethod
    def set_type(cls, t, pktclass):
        """Register ``pktclass`` as the payload decoder for EtherType ``t``."""
        cls._typesw[t] = pktclass

    @classmethod
    def get_type(cls, t):
        """Return the payload class registered for EtherType ``t``.

        Raises ``KeyError`` when nothing is registered for ``t``.
        """
        return cls._typesw[t]

    def _unpack_data(self, buf):
        """Decode the payload bytes ``buf`` according to ``self.type``.

        For the two MPLS EtherTypes the label stack is parsed first into
        ``self.labels`` as ``(label, qos, ttl)`` tuples, ``self.type`` is
        rewritten to ``ETH_TYPE_IP`` and the stack is stripped from ``buf``.
        The remaining bytes are handed to the class registered for
        ``self.type``; the decoded object is stored in ``self.data`` and also
        under an attribute named after its lower-cased class name.  When no
        class is registered, or the class raises ``dpkt.UnpackError``,
        ``self.data`` is left as the raw bytes.

        A label stack cut short before a bottom-of-stack entry makes
        ``struct.unpack`` raise ``struct.error``, which is not caught here.
        """
        if self.type in (ETH_TYPE_MPLS, ETH_TYPE_MPLS_MCAST):
            self.labels = []
            for i in range(self._MPLS_MAX_LABELS):
                entry = struct.unpack('>I', buf[i * 4:i * 4 + 4])[0]
                self.labels.append((
                    (entry & self._MPLS_LABEL_MASK) >> self._MPLS_LABEL_SHIFT,
                    (entry & self._MPLS_QOS_MASK) >> self._MPLS_QOS_SHIFT,
                    (entry & self._MPLS_TTL_MASK) >> self._MPLS_TTL_SHIFT,
                ))
                if entry & self._MPLS_STACK_BOTTOM:
                    break
            # The payload behind the label stack is dispatched as IP.
            self.type = ETH_TYPE_IP
            buf = buf[(i + 1) * 4:]
        try:
            self.data = self._typesw[self.type](buf)
            setattr(self, self.data.__class__.__name__.lower(), self.data)
        except (KeyError, dpkt.UnpackError):
            # Best effort: unknown or undecodable payloads stay as raw bytes.
            self.data = buf

    def unpack(self, buf):
        """Unpack the 802.1Q header from ``buf`` and then decode the payload."""
        dpkt.Packet.unpack(self, buf)
        self._unpack_data(self.data)
# XXX - auto-load Ethernet dispatch table from ETH_TYPE_* definitions
def __load_types():
    """Fill ``DOT1Q``'s dispatch table from this module's ``ETH_TYPE_*`` names.

    For every global ``ETH_TYPE_<NAME>`` this tries to import a module called
    ``<name>`` (the suffix lower-cased) and registers that module's ``<NAME>``
    attribute as the payload class for the constant's value.  Constants whose
    module cannot be imported, or whose module has no such attribute, are
    skipped.
    """
    prefix = 'ETH_TYPE_'
    g = globals()
    # list(items()) instead of iteritems(): works on both Python 2 and 3.
    for k, v in list(g.items()):
        if not k.startswith(prefix):
            continue
        name = k[len(prefix):]
        try:
            mod = __import__(name.lower(), g)
        except ImportError:
            continue
        # An unrelated module with a matching name may lack the class; skip it
        # rather than abort the import of this module with AttributeError.
        pktclass = getattr(mod, name, None)
        if pktclass is None:
            continue
        DOT1Q.set_type(v, pktclass)


# Only populate the table if nothing has been registered yet.
if not DOT1Q._typesw:
    __load_types()
Jump to Line
Something went wrong with that request. Please try again.