1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# scapy server unit test
import sys,os
scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'services','scapy_server'))
stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl'))
sys.path.append(scapy_server_path)
sys.path.append(stl_pathname)
#import stl_path
import trex_stl_lib
from trex_stl_lib.api import *
from copy import deepcopy
import tempfile
import hashlib
from platform_cmd_link import *
import functional_general_test
from nose.tools import assert_equal
from nose.tools import assert_not_equal
from nose.tools import nottest
from nose.plugins.attrib import attr
import binascii
from scapy_server import *
class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
def setUp(self):
self.s = Scapy_service()
def tearDown(self):
pass
'''
test for db and field update - checking check_update_test()
'''
def test_check_update(self):
allData = self.s.get_all()
allDataParsed = allData
dbMD5 = allDataParsed['db_md5']
fieldMD5 = allDataParsed['fields_md5']
resT1 = self.s.check_update(dbMD5,fieldMD5)
assert_equal(resT1,True)
try:
resT2 = False
resT2 = self.s.check_update('falseMD5','falseMD5')
except Exception as e:
if e.message == "Fields DB is not up to date":
resT2 = True
else:
raise
assert_equal(resT2,True)
try:
resT3 = False
resT3 = self.s.check_update(dbMD5,'falseMD5')
except Exception as e:
if e.message == "Fields DB is not up to date":
resT3 = True
else:
raise
assert_equal(resT3,True)
try:
resT4 = False
resT4 = self.s.check_update('falseMD5',fieldMD5)
except Exception as e:
if e.message == "Protocol DB is not up to date":
resT4 = True
else:
raise
assert_equal(resT4,True)
def test_check_updating_db(self):
#assume i got old db
try:
result = self.s.check_update('falseMD5','falseMD5')
except:
newAllData = self.s.get_all()
dbMD5 = newAllData['db_md5']
fieldMD5 = newAllData['fields_md5']
result = self.s.check_update(dbMD5,fieldMD5)
assert_equal(result,True)
else:
raise Exception("scapy_server_test: check_updating_db failed")
# testing pkt = Ether()/IP()/TCP()/"test" by defualt
def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'):
test_pkt = original_pkt
original_pkt = eval(original_pkt)
test_res = self.s.build_pkt(test_pkt)
test_pkt_buffer = test_res['buffer']
resT1 = (test_pkt_buffer == binascii.b2a_base64(str(original_pkt)))
assert_equal(resT1,True)
#testing offsets of packet IP() by default
def test_get_all_offsets(self,original_pkt = 'IP()'):
test_pkt = original_pkt
original_pkt = eval(original_pkt)
original_pkt.build()
tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt)
layers = (test_pkt).split('/')
offsets_by_layers = {}
for layer in layers:
fields_dict = {}
for f in original_pkt.fields_desc:
size = f.get_size_bytes()
if f.name is 'load':
size = len(original_pkt)
fields_dict[f.name]= [f.offset, size]
original_pkt = original_pkt.payload
layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric
layer_name = re.sub(r'\W+', '',layer_name)
offsets_by_layers[layer_name] = fields_dict
resT1 = (tested_offsets_by_layers == offsets_by_layers)
assert_equal(resT1,True)
def test_multi_packet(self):
e0 = 'Ether()'
e1 = 'Ether()/IP()'
e2 = 'TCP()'
e3 = 'UDP()'
e4 = 'Ether()/IP()/TCP()/"test"'
e5 = 'Ether()/IP()/UDP()'
packets = [e0,e1,e2,e3,e4,e5]
for packet in packets:
self.test_get_all_offsets(packet)
for packet in packets:
self.test_build_packet(packet)
|