1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
#!/usr/bin/python
import sys, os
from multiprocessing import Process
import tempfile
def check_offsets(build, stdout, scapy_str = None, pcap = None):
import sys
sys.stdout = stdout
sys.stderr = stdout
# clean this env
for key in sys.modules.copy().keys():
if key.startswith('scapy.'):
del sys.modules[key]
globals().clear()
import outer_packages
from scapy.all import Ether, IP, UDP
if scapy_str:
pkt = eval(scapy_str)
elif pcap:
from scapy.utils import rdpcap
pkt = rdpcap(pcap)[0]
else:
raise Exception('Please specify scapy_str or pcap.')
if build:
pkt.build()
assert pkt
assert pkt.payload
lay = pkt
while lay:
print(' ### %s (offset %s)' % (lay.name, lay._offset))
lay.dump_fields_offsets()
if lay == pkt:
assert lay._offset == 0, 'Offset of first layer should be zero.'
else:
if build:
assert lay._offset != 0, 'Offset of second and further layers should not be zero if packets is built.'
else:
assert lay._offset == 0, 'Offset of second and further layers should be zero if packets is not built.'
for index, field in enumerate(lay.fields_desc):
if index == 0:
assert field._offset == 0, 'Offset of first field should be zero.'
else:
if build:
if field.get_size_bytes() == 0:
continue
assert field._offset != 0, 'Offset of second and further fields should not be zero if packets is built.'
else:
assert field._offset == 0, 'Offset of second and further fields should be zero if packets is not built.'
lay = lay.payload
def isolate_env(f, *a, **k):
with tempfile.TemporaryFile(mode = 'w+') as tmpfile:
k['stdout'] = tmpfile
p = Process(target = f, args = a, kwargs = k)
p.start()
p.join()
print('')
tmpfile.seek(0)
print(tmpfile.read())
if p.exitcode:
raise Exception('Return status not zero, check the output')
class CScapyOffsets_Test():
def setUp(self):
self.dir = os.path.abspath(os.path.dirname(__file__)) + '/'
def test_offsets_udp_build(self):
isolate_env(check_offsets, scapy_str = "Ether()/IP()/UDP()/('x'*9)", build = True)
def test_offsets_udp_nobuild(self):
isolate_env(check_offsets, scapy_str = "Ether()/IP()/UDP()/('x'*9)", build = False)
def test_offsets_pcap_build(self):
isolate_env(check_offsets, pcap = self.dir + 'golden/bp_sim_dns_vlans.pcap', build = True)
def test_offsets_pcap_nobuild(self):
isolate_env(check_offsets, pcap = self.dir + 'golden/bp_sim_dns_vlans.pcap', build = False)
|