diff options
Diffstat (limited to 'scripts/automation')
-rwxr-xr-x | scripts/automation/regression/functional_tests/scapy_server_test.py | 157 |
1 files changed, 58 insertions, 99 deletions
diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index 1c6d2bd0..55daae77 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -1,36 +1,32 @@ # scapy server unit test
import sys,os
-scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'examples'))
-print scapy_server_path
+scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'services','scapy_server'))
stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl'))
sys.path.append(scapy_server_path)
sys.path.append(stl_pathname)
-
#import stl_path
import trex_stl_lib
from trex_stl_lib.api import *
from copy import deepcopy
import tempfile
-import md5
-
-import outer_packages
+import hashlib
from platform_cmd_link import *
import functional_general_test
from nose.tools import assert_equal
from nose.tools import assert_not_equal
from nose.tools import nottest
from nose.plugins.attrib import attr
-
+import binascii
from scapy_server import *
class scapy_server_tester(functional_general_test.CGeneralFunctional_Test):
def setUp(self):
- pass
+ self.s = scapy_server()
def tearDown(self):
pass
@@ -39,103 +35,71 @@ class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): test for db and field update - checking check_update_test()
'''
def test_check_update(self):
- allData = get_all()
- allDataParsed = json.loads(allData)
- dbMD5 = allDataParsed['DB_md5']
+ allData = self.s.get_all()
+ allDataParsed = allData
+ dbMD5 = allDataParsed['db_md5']
fieldMD5 = allDataParsed['fields_md5']
- result = check_update(dbMD5,fieldMD5)
- result = json.loads(result)
- '''
- if result[0][0] == 'Success' and result[1][0] == 'Success':
- print 'check_update_test [md5 comparison test]: Success'
- else:
- print 'check_update_test [md5 comparison test]: md5s of fields or db do not match the source'
- '''
- resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')
+ resT1 = self.s.check_update(dbMD5,fieldMD5)
assert_equal(resT1,True)
-
- result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))
- result = json.loads(result)
- '''
- if result[0][0] == 'Fail' and result[1][0] == 'Fail':
- print 'check_update_test [wrong md5s return failure]: Success'
- else:
- print 'check_update_test [wrong md5s return failure]: md5s of fields or db return Success for invalid value'
- '''
- resT2 = (result[0][0] == 'Fail' and result[1][0] == 'Fail')
+ try:
+ resT2 = False
+ resT2 = self.s.check_update('falseMD5','falseMD5')
+ except Exception as e:
+ if e.message == "Fields DB is not up to date":
+ resT2 = True
+ else:
+ raise
assert_equal(resT2,True)
-
- result = check_update(dbMD5,json.dumps('falseMD5'))
- result = json.loads(result)
- '''
- if result[0][0] == 'Fail' and result[1][0] == 'Success':
- print 'check_update_test [wrong field md5 returns error, correct db md5]: Success'
- else:
- print 'md5 of field return Success for invalid value'
- '''
- resT3 = (result[0][0] == 'Fail' and result[1][0] == 'Success')
+ try:
+ resT3 = False
+ resT3 = self.s.check_update(dbMD5,'falseMD5')
+ except Exception as e:
+ if e.message == "Fields DB is not up to date":
+ resT3 = True
+ else:
+ raise
assert_equal(resT3,True)
-
- result = check_update(json.dumps('falseMD5'),fieldMD5)
- result = json.loads(result)
- '''
- if result[0][0] == 'Success' and result[1][0] == 'Fail':
- print 'check_update_test [wrong db md5 returns error, correct field md5]: Success'
- else:
- print 'md5 of db return Success for invalid value'
- '''
- resT4 = (result[0][0] == 'Success' and result[1][0] == 'Fail')
+ try:
+ resT4 = False
+ resT4 = self.s.check_update('falseMD5',fieldMD5)
+ except Exception as e:
+ if e.message == "Protocol DB is not up to date":
+ resT4 = True
+ else:
+ raise
assert_equal(resT4,True)
def test_check_updating_db(self):
#assume i got old db
- result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))
- result = json.loads(result)
- if result[0][0] == 'Fail' or result[1][0] == 'Fail':
- newAllData = get_all()
- allDataParsed = json.loads(newAllData)
- dbMD5 = allDataParsed['DB_md5']
- fieldMD5 = allDataParsed['fields_md5']
- result = check_update(dbMD5,fieldMD5)
- result = json.loads(result)
- '''
- if result[0][0] == 'Success' and result[1][0] == 'Success':
- print 'check_updating_db [got old db and updated it]: Success'
- else:
- print'check_updating_db [got old db and updated it]: FAILED'
- '''
- resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')
- assert_equal(resT1,True)
+ try:
+ result = self.s.check_update('falseMD5','falseMD5')
+ except:
+ newAllData = self.s.get_all()
+ dbMD5 = newAllData['db_md5']
+ fieldMD5 = newAllData['fields_md5']
+ result = self.s.check_update(dbMD5,fieldMD5)
+ assert_equal(result,True)
else:
raise Exception("scapy_server_test: check_updating_db failed")
# testing pkt = Ether()/IP()/TCP()/"test" by defualt
- def test_build_packet(self,original_pkt = json.dumps('Ether()/IP()/TCP()/"test"')):
+ def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'):
test_pkt = original_pkt
- original_pkt = eval(json.loads(original_pkt))
- test_res = build_pkt(test_pkt)
- test_res = json.loads(test_res)
- test_pkt_buffer = json.loads(test_res[2])
- test_pkt_buffer = test_pkt_buffer.decode('base64')
- '''
- if test_pkt_buffer == str(original_pkt):
- print 'build_pkt test [scapy packet and string-defined packet comparison]: Success'
- else:
- print 'build_pkt test [scapy packet and string-defined packet comparison]: FAILED'
- '''
- resT1 = (test_pkt_buffer == str(original_pkt))
+ original_pkt = eval(original_pkt)
+ test_res = self.s.build_pkt(test_pkt)
+ test_pkt_buffer = test_res[1]
+ resT1 = (test_pkt_buffer == binascii.b2a_base64(str(original_pkt)))
assert_equal(resT1,True)
#testing offsets of packet IP() by default
- def test_get_all_offsets(self,original_pkt = json.dumps('IP()')):
+ def test_get_all_offsets(self,original_pkt = 'IP()'):
test_pkt = original_pkt
- original_pkt = eval(json.loads(original_pkt))
- tested_offsets_by_layers = get_all_pkt_offsets(test_pkt)
- tested_offsets_by_layers = json.loads(tested_offsets_by_layers)
- layers = json.loads(test_pkt).split('/')
+ original_pkt = eval(original_pkt)
+ tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt)
+ layers = (test_pkt).split('/')
offsets_by_layers = {}
for layer in layers:
fields_list = []
@@ -145,25 +109,20 @@ class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): size = len(original_pkt)
fields_list.append([f.name, f.offset, size])
original_pkt = original_pkt.payload
- offsets_by_layers[layer] = fields_list
- '''
- if tested_offsets_by_layers == offsets_by_layers:
- print 'Success'
- else:
- print 'test_get_all_offsets[comparison of offsets in given packet]: FAILED'
- '''
+ layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric
+ layer_name = re.sub(r'\W+', '',layer_name)
+ offsets_by_layers[layer_name] = fields_list
resT1 = (tested_offsets_by_layers == offsets_by_layers)
assert_equal(resT1,True)
def test_multi_packet(self):
- e0 = json.dumps('Ether()')
- e1 = json.dumps('Ether()/IP()')
- e2 = json.dumps('TCP()')
- e3 = json.dumps('UDP()')
- e4 = json.dumps('Ether()/IP()/TCP()/"test"')
- e5 = json.dumps('Ether()/IP()/UDP()')
+ e0 = 'Ether()'
+ e1 = 'Ether()/IP()'
+ e2 = 'TCP()'
+ e3 = 'UDP()'
+ e4 = 'Ether()/IP()/TCP()/"test"'
+ e5 = 'Ether()/IP()/UDP()'
packets = [e0,e1,e2,e3,e4,e5]
-
for packet in packets:
self.test_get_all_offsets(packet)
|