From abf329075bd14f5f41c3753d560260ac809ec4f3 Mon Sep 17 00:00:00 2001 From: itraviv Date: Sun, 31 Jul 2016 11:53:34 +0300 Subject: scapy_server for GUI+test --- .../functional_tests/scapy_server_test.py | 179 +++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100755 scripts/automation/regression/functional_tests/scapy_server_test.py (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py new file mode 100755 index 00000000..1c6d2bd0 --- /dev/null +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -0,0 +1,179 @@ +# scapy server unit test + +import sys,os +scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'examples')) +print scapy_server_path +stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl')) +sys.path.append(scapy_server_path) +sys.path.append(stl_pathname) + + + +#import stl_path +import trex_stl_lib +from trex_stl_lib.api import * +from copy import deepcopy + +import tempfile +import md5 + +import outer_packages +from platform_cmd_link import * +import functional_general_test +from nose.tools import assert_equal +from nose.tools import assert_not_equal +from nose.tools import nottest +from nose.plugins.attrib import attr + +from scapy_server import * + + +class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): + def setUp(self): + pass + + def tearDown(self): + pass + + ''' + test for db and field update - checking check_update_test() + ''' + def test_check_update(self): + allData = get_all() + allDataParsed = json.loads(allData) + dbMD5 = allDataParsed['DB_md5'] + fieldMD5 = allDataParsed['fields_md5'] + result = check_update(dbMD5,fieldMD5) + result = json.loads(result) + ''' + if result[0][0] == 'Success' and result[1][0] == 'Success': + print 'check_update_test [md5 comparison test]: Success' + else: + print 'check_update_test [md5 comparison test]: md5s of fields or db do not match the source' + ''' + resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success') + assert_equal(resT1,True) + + result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5')) + result = json.loads(result) + ''' + if result[0][0] == 'Fail' and result[1][0] == 'Fail': + print 'check_update_test [wrong md5s return failure]: Success' + else: + print 'check_update_test [wrong md5s return failure]: md5s of fields or db return Success for invalid value' + ''' + resT2 = (result[0][0] == 'Fail' and result[1][0] == 'Fail') + assert_equal(resT2,True) + + result = check_update(dbMD5,json.dumps('falseMD5')) + result = json.loads(result) + ''' + if result[0][0] == 'Fail' and result[1][0] == 'Success': + print 'check_update_test [wrong field md5 returns error, correct db md5]: Success' + else: + print 'md5 of field return Success for invalid value' + ''' + resT3 = (result[0][0] == 'Fail' and result[1][0] == 'Success') + assert_equal(resT3,True) + + result = check_update(json.dumps('falseMD5'),fieldMD5) + result = json.loads(result) + ''' + if result[0][0] == 'Success' and result[1][0] == 'Fail': + print 'check_update_test [wrong db md5 returns error, correct field md5]: Success' + else: + print 'md5 of db return Success for invalid value' + ''' + resT4 = (result[0][0] == 'Success' and result[1][0] == 'Fail') + assert_equal(resT4,True) + + + def test_check_updating_db(self): + #assume i got old db + result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5')) + result = json.loads(result) + if result[0][0] == 'Fail' or result[1][0] == 'Fail': + newAllData = get_all() + allDataParsed = json.loads(newAllData) + dbMD5 = allDataParsed['DB_md5'] + fieldMD5 = allDataParsed['fields_md5'] + result = check_update(dbMD5,fieldMD5) + result = json.loads(result) + ''' + if result[0][0] == 'Success' and result[1][0] == 'Success': + print 'check_updating_db [got old db and updated it]: Success' + else: + print'check_updating_db [got old db and updated it]: FAILED' + ''' + resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success') + assert_equal(resT1,True) + else: + raise Exception("scapy_server_test: check_updating_db failed") + + +# testing pkt = Ether()/IP()/TCP()/"test" by defualt + def test_build_packet(self,original_pkt = json.dumps('Ether()/IP()/TCP()/"test"')): + test_pkt = original_pkt + original_pkt = eval(json.loads(original_pkt)) + test_res = build_pkt(test_pkt) + test_res = json.loads(test_res) + test_pkt_buffer = json.loads(test_res[2]) + test_pkt_buffer = test_pkt_buffer.decode('base64') + ''' + if test_pkt_buffer == str(original_pkt): + print 'build_pkt test [scapy packet and string-defined packet comparison]: Success' + else: + print 'build_pkt test [scapy packet and string-defined packet comparison]: FAILED' + ''' + resT1 = (test_pkt_buffer == str(original_pkt)) + assert_equal(resT1,True) + + +#testing offsets of packet IP() by default + def test_get_all_offsets(self,original_pkt = json.dumps('IP()')): + test_pkt = original_pkt + original_pkt = eval(json.loads(original_pkt)) + tested_offsets_by_layers = get_all_pkt_offsets(test_pkt) + tested_offsets_by_layers = json.loads(tested_offsets_by_layers) + layers = json.loads(test_pkt).split('/') + offsets_by_layers = {} + for layer in layers: + fields_list = [] + for f in original_pkt.fields_desc: + size = f.get_size_bytes() + if f.name is 'load': + size = len(original_pkt) + fields_list.append([f.name, f.offset, size]) + original_pkt = original_pkt.payload + offsets_by_layers[layer] = fields_list + ''' + if tested_offsets_by_layers == offsets_by_layers: + print 'Success' + else: + print 'test_get_all_offsets[comparison of offsets in given packet]: FAILED' + ''' + resT1 = (tested_offsets_by_layers == offsets_by_layers) + assert_equal(resT1,True) + + def test_multi_packet(self): + e0 = json.dumps('Ether()') + e1 = json.dumps('Ether()/IP()') + e2 = json.dumps('TCP()') + e3 = json.dumps('UDP()') + e4 = json.dumps('Ether()/IP()/TCP()/"test"') + e5 = json.dumps('Ether()/IP()/UDP()') + packets = [e0,e1,e2,e3,e4,e5] + + for packet in packets: + self.test_get_all_offsets(packet) + + for packet in packets: + self.test_build_packet(packet) + + + + + + + + -- cgit From 9971425714e4fdc1a12fc1d66d36694d928d870c Mon Sep 17 00:00:00 2001 From: itraviv Date: Wed, 3 Aug 2016 11:36:14 +0300 Subject: fixed path after relocation of scapy_server --- .../functional_tests/scapy_server_test.py | 157 ++++++++------------- 1 file changed, 58 insertions(+), 99 deletions(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index 1c6d2bd0..55daae77 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -1,36 +1,32 @@ # scapy server unit test import sys,os -scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'examples')) -print scapy_server_path +scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'services','scapy_server')) stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl')) sys.path.append(scapy_server_path) sys.path.append(stl_pathname) - #import stl_path import trex_stl_lib from trex_stl_lib.api import * from copy import deepcopy import tempfile -import md5 - -import outer_packages +import hashlib from platform_cmd_link import * import functional_general_test from nose.tools import assert_equal from nose.tools import assert_not_equal from nose.tools import nottest from nose.plugins.attrib import attr - +import binascii from scapy_server import * class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): def setUp(self): - pass + self.s = scapy_server() def tearDown(self): pass @@ -39,103 +35,71 @@ class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): test for db and field update - checking check_update_test() ''' def test_check_update(self): - allData = get_all() - allDataParsed = json.loads(allData) - dbMD5 = allDataParsed['DB_md5'] + allData = self.s.get_all() + allDataParsed = allData + dbMD5 = allDataParsed['db_md5'] fieldMD5 = allDataParsed['fields_md5'] - result = check_update(dbMD5,fieldMD5) - result = json.loads(result) - ''' - if result[0][0] == 'Success' and result[1][0] == 'Success': - print 'check_update_test [md5 comparison test]: Success' - else: - print 'check_update_test [md5 comparison test]: md5s of fields or db do not match the source' - ''' - resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success') + resT1 = self.s.check_update(dbMD5,fieldMD5) assert_equal(resT1,True) - - result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5')) - result = json.loads(result) - ''' - if result[0][0] == 'Fail' and result[1][0] == 'Fail': - print 'check_update_test [wrong md5s return failure]: Success' - else: - print 'check_update_test [wrong md5s return failure]: md5s of fields or db return Success for invalid value' - ''' - resT2 = (result[0][0] == 'Fail' and result[1][0] == 'Fail') + try: + resT2 = False + resT2 = self.s.check_update('falseMD5','falseMD5') + except Exception as e: + if e.message == "Fields DB is not up to date": + resT2 = True + else: + raise assert_equal(resT2,True) - - result = check_update(dbMD5,json.dumps('falseMD5')) - result = json.loads(result) - ''' - if result[0][0] == 'Fail' and result[1][0] == 'Success': - print 'check_update_test [wrong field md5 returns error, correct db md5]: Success' - else: - print 'md5 of field return Success for invalid value' - ''' - resT3 = (result[0][0] == 'Fail' and result[1][0] == 'Success') + try: + resT3 = False + resT3 = self.s.check_update(dbMD5,'falseMD5') + except Exception as e: + if e.message == "Fields DB is not up to date": + resT3 = True + else: + raise assert_equal(resT3,True) - - result = check_update(json.dumps('falseMD5'),fieldMD5) - result = json.loads(result) - ''' - if result[0][0] == 'Success' and result[1][0] == 'Fail': - print 'check_update_test [wrong db md5 returns error, correct field md5]: Success' - else: - print 'md5 of db return Success for invalid value' - ''' - resT4 = (result[0][0] == 'Success' and result[1][0] == 'Fail') + try: + resT4 = False + resT4 = self.s.check_update('falseMD5',fieldMD5) + except Exception as e: + if e.message == "Protocol DB is not up to date": + resT4 = True + else: + raise assert_equal(resT4,True) def test_check_updating_db(self): #assume i got old db - result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5')) - result = json.loads(result) - if result[0][0] == 'Fail' or result[1][0] == 'Fail': - newAllData = get_all() - allDataParsed = json.loads(newAllData) - dbMD5 = allDataParsed['DB_md5'] - fieldMD5 = allDataParsed['fields_md5'] - result = check_update(dbMD5,fieldMD5) - result = json.loads(result) - ''' - if result[0][0] == 'Success' and result[1][0] == 'Success': - print 'check_updating_db [got old db and updated it]: Success' - else: - print'check_updating_db [got old db and updated it]: FAILED' - ''' - resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success') - assert_equal(resT1,True) + try: + result = self.s.check_update('falseMD5','falseMD5') + except: + newAllData = self.s.get_all() + dbMD5 = newAllData['db_md5'] + fieldMD5 = newAllData['fields_md5'] + result = self.s.check_update(dbMD5,fieldMD5) + assert_equal(result,True) else: raise Exception("scapy_server_test: check_updating_db failed") # testing pkt = Ether()/IP()/TCP()/"test" by defualt - def test_build_packet(self,original_pkt = json.dumps('Ether()/IP()/TCP()/"test"')): + def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'): test_pkt = original_pkt - original_pkt = eval(json.loads(original_pkt)) - test_res = build_pkt(test_pkt) - test_res = json.loads(test_res) - test_pkt_buffer = json.loads(test_res[2]) - test_pkt_buffer = test_pkt_buffer.decode('base64') - ''' - if test_pkt_buffer == str(original_pkt): - print 'build_pkt test [scapy packet and string-defined packet comparison]: Success' - else: - print 'build_pkt test [scapy packet and string-defined packet comparison]: FAILED' - ''' - resT1 = (test_pkt_buffer == str(original_pkt)) + original_pkt = eval(original_pkt) + test_res = self.s.build_pkt(test_pkt) + test_pkt_buffer = test_res[1] + resT1 = (test_pkt_buffer == binascii.b2a_base64(str(original_pkt))) assert_equal(resT1,True) #testing offsets of packet IP() by default - def test_get_all_offsets(self,original_pkt = json.dumps('IP()')): + def test_get_all_offsets(self,original_pkt = 'IP()'): test_pkt = original_pkt - original_pkt = eval(json.loads(original_pkt)) - tested_offsets_by_layers = get_all_pkt_offsets(test_pkt) - tested_offsets_by_layers = json.loads(tested_offsets_by_layers) - layers = json.loads(test_pkt).split('/') + original_pkt = eval(original_pkt) + tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt) + layers = (test_pkt).split('/') offsets_by_layers = {} for layer in layers: fields_list = [] @@ -145,25 +109,20 @@ class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): size = len(original_pkt) fields_list.append([f.name, f.offset, size]) original_pkt = original_pkt.payload - offsets_by_layers[layer] = fields_list - ''' - if tested_offsets_by_layers == offsets_by_layers: - print 'Success' - else: - print 'test_get_all_offsets[comparison of offsets in given packet]: FAILED' - ''' + layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric + layer_name = re.sub(r'\W+', '',layer_name) + offsets_by_layers[layer_name] = fields_list resT1 = (tested_offsets_by_layers == offsets_by_layers) assert_equal(resT1,True) def test_multi_packet(self): - e0 = json.dumps('Ether()') - e1 = json.dumps('Ether()/IP()') - e2 = json.dumps('TCP()') - e3 = json.dumps('UDP()') - e4 = json.dumps('Ether()/IP()/TCP()/"test"') - e5 = json.dumps('Ether()/IP()/UDP()') + e0 = 'Ether()' + e1 = 'Ether()/IP()' + e2 = 'TCP()' + e3 = 'UDP()' + e4 = 'Ether()/IP()/TCP()/"test"' + e5 = 'Ether()/IP()/UDP()' packets = [e0,e1,e2,e3,e4,e5] - for packet in packets: self.test_get_all_offsets(packet) -- cgit From 36f1db884d653f077bdafab908cbbf65833e8772 Mon Sep 17 00:00:00 2001 From: itraviv Date: Thu, 4 Aug 2016 16:07:04 +0300 Subject: 1) changed class name to 'scapy_service' 2) changed returned values to be dictionaries instead of arrays --- .../regression/functional_tests/scapy_server_test.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index 55daae77..7752f004 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -24,9 +24,9 @@ import binascii from scapy_server import * -class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): +class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): def setUp(self): - self.s = scapy_server() + self.s = scapy_service() def tearDown(self): pass @@ -89,7 +89,7 @@ class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): test_pkt = original_pkt original_pkt = eval(original_pkt) test_res = self.s.build_pkt(test_pkt) - test_pkt_buffer = test_res[1] + test_pkt_buffer = test_res['buffer'] resT1 = (test_pkt_buffer == binascii.b2a_base64(str(original_pkt))) assert_equal(resT1,True) @@ -98,20 +98,21 @@ class scapy_server_tester(functional_general_test.CGeneralFunctional_Test): def test_get_all_offsets(self,original_pkt = 'IP()'): test_pkt = original_pkt original_pkt = eval(original_pkt) + original_pkt.build() tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt) layers = (test_pkt).split('/') offsets_by_layers = {} for layer in layers: - fields_list = [] + fields_dict = {} for f in original_pkt.fields_desc: size = f.get_size_bytes() if f.name is 'load': size = len(original_pkt) - fields_list.append([f.name, f.offset, size]) + fields_dict[f.name]= [f.offset, size] original_pkt = original_pkt.payload layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric layer_name = re.sub(r'\W+', '',layer_name) - offsets_by_layers[layer_name] = fields_list + offsets_by_layers[layer_name] = fields_dict resT1 = (tested_offsets_by_layers == offsets_by_layers) assert_equal(resT1,True) -- cgit From 28b09fb263445b1c73e3119d17feb5cc6f954ee6 Mon Sep 17 00:00:00 2001 From: itraviv Date: Thu, 4 Aug 2016 17:44:50 +0300 Subject: 1) made a class Scapy_server encapsulating scapy_wrapper and scapy_service 2) fixed some exception handling --- scripts/automation/regression/functional_tests/scapy_server_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index 7752f004..5f269f44 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -26,7 +26,7 @@ from scapy_server import * class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): def setUp(self): - self.s = scapy_service() + self.s = Scapy_service() def tearDown(self): pass -- cgit From 20d4c966c15aa134d58c8d53d840cddd9903f9aa Mon Sep 17 00:00:00 2001 From: itraviv Date: Tue, 9 Aug 2016 11:29:52 +0300 Subject: added wrapper class to support testing of scapy_server, this class wraps server interaction. added test of offsets versus actual packet dump --- .../functional_tests/scapy_server_test.py | 87 ++++++++++++++++++++-- 1 file changed, 81 insertions(+), 6 deletions(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index 5f269f44..9e64f1a8 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -21,7 +21,54 @@ from nose.tools import assert_not_equal from nose.tools import nottest from nose.plugins.attrib import attr import binascii -from scapy_server import * +from scapy_service import * +from pprint import pprint +import zmq +import json + + +class Scapy_server_wrapper(): + def __init__(self): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.dest_scapy_port =5555 + self.socket.connect("tcp://10.56.216.133:"+str(self.dest_scapy_port)) #ip address of csi-trex-11 + + def call_method(self,method_name,method_params): + json_rpc_req = { "jsonrpc":"2.0","method": method_name ,"params": method_params, "id":"1"} + request = json.dumps(json_rpc_req) + self.socket.send(request) + # Get the reply. + message = self.socket.recv() +# print("Received reply %s [ %s ]" % (request, message)) + message_parsed = json.loads(message) + try: + result = message_parsed['result'] + except: + result = {'error':message_parsed['error']} + finally: + return result + + def get_all(self): + return self.call_method('get_all',[]) + + def check_update(self,db_md5,field_md5): + result = self.call_method('check_update',[db_md5,field_md5]) + if result!=True: + if 'error' in result.keys(): + if "Fields DB is not up to date" in result['error']['message:']: + raise ScapyException("Fields DB is not up to date") + if "Protocol DB is not up to date" in result['error']['message:']: + raise ScapyException("Protocol DB is not up to date") + return result + + def build_pkt(self,pkt_descriptor): + return self.call_method('build_pkt',[pkt_descriptor]) + + def get_all_pkt_offsets(self,pkt_desc): + return self.call_method('get_all_pkt_offsets',[pkt_desc]) + + class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): @@ -104,14 +151,17 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): offsets_by_layers = {} for layer in layers: fields_dict = {} + layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric + layer_name = re.sub(r'\W+', '',layer_name) for f in original_pkt.fields_desc: size = f.get_size_bytes() + name = f.name if f.name is 'load': size = len(original_pkt) + layer_name = 'Raw' fields_dict[f.name]= [f.offset, size] + fields_dict['global_offset'] = original_pkt.offset original_pkt = original_pkt.payload - layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric - layer_name = re.sub(r'\W+', '',layer_name) offsets_by_layers[layer_name] = fields_dict resT1 = (tested_offsets_by_layers == offsets_by_layers) assert_equal(resT1,True) @@ -130,9 +180,34 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): for packet in packets: self.test_build_packet(packet) - - - + def test_offsets_and_buffer(self,mac_src='ab:cd:ef:12:34:56',mac_dst='98:76:54:32:1a:bc',ip_src='127.1.1.1',ip_dst='192.168.1.1'): + pkt = Ether(src=mac_src,dst=mac_dst)/IP(src=ip_src,dst=ip_dst)/TCP() + pkt_descriptor = "Ether(src='"+mac_src+"',dst='"+mac_dst+"')/IP(src='"+ip_src+"',dst='"+ip_dst+"')/TCP()" + pkt_offsets = self.s.get_all_pkt_offsets(pkt_descriptor) + pkt_buffer = str(pkt) + #--------------------------Dest-MAC-------------------- + mac_start_index = pkt_offsets['Ether']['dst'][0]+pkt_offsets['Ether']['global_offset'] + mac_end_index = mac_start_index+pkt_offsets['Ether']['dst'][1] + assert_equal(binascii.b2a_hex(pkt_buffer[mac_start_index:mac_end_index]),mac_dst.translate(None,':')) + #--------------------------Src-MAC--------------------- + mac_start_index = pkt_offsets['Ether']['src'][0]+pkt_offsets['Ether']['global_offset'] + mac_end_index = mac_start_index+pkt_offsets['Ether']['src'][1] + assert_equal(binascii.b2a_hex(pkt_buffer[mac_start_index:mac_end_index]),mac_src.translate(None,':')) + #--------------------------Dest-IP--------------------- + ip_start_index = pkt_offsets['IP']['dst'][0]+pkt_offsets['IP']['global_offset'] + ip_end_index= ip_start_index+pkt_offsets['IP']['dst'][1] + assert_equal(binascii.b2a_hex(pkt_buffer[ip_start_index:ip_end_index]),binascii.hexlify(socket.inet_aton(ip_dst))) + #--------------------------Src-IP---------------------- + ip_start_index = pkt_offsets['IP']['src'][0]+pkt_offsets['IP']['global_offset'] + ip_end_index= ip_start_index+pkt_offsets['IP']['src'][1] + assert_equal(binascii.b2a_hex(pkt_buffer[ip_start_index:ip_end_index]),binascii.hexlify(socket.inet_aton(ip_src))) + + + + +class scapy_server_tester(scapy_service_tester): + def setUp(self): + self.s = Scapy_server_wrapper() -- cgit From 8183f683a33231cc00b7b6ec6c7144bb5cfff54b Mon Sep 17 00:00:00 2001 From: itraviv Date: Wed, 10 Aug 2016 15:29:10 +0300 Subject: 1)added feature: running the entire test via RPC (over ZMQ) on a "remote" scapy server. 2) the mentioned above server is implemented as a thread running on the local machine,so in this commit a thread mechanism is added --- .../functional_tests/scapy_server_test.py | 27 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index 9e64f1a8..ccf0b754 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -25,14 +25,16 @@ from scapy_service import * from pprint import pprint import zmq import json +import scapy_zmq_server +import threading class Scapy_server_wrapper(): - def __init__(self): + def __init__(self,dest_scapy_port=5555,server_ip_address='localhost'): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) - self.dest_scapy_port =5555 - self.socket.connect("tcp://10.56.216.133:"+str(self.dest_scapy_port)) #ip address of csi-trex-11 + self.dest_scapy_port =dest_scapy_port + self.socket.connect("tcp://"+str(server_ip_address)+":"+str(self.dest_scapy_port)) #ip address of csi-trex-11 def call_method(self,method_name,method_params): json_rpc_req = { "jsonrpc":"2.0","method": method_name ,"params": method_params, "id":"1"} @@ -204,10 +206,27 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): +class scapy_server_thread(threading.Thread): + def __init__(self,thread_id,server_port=5555): + threading.Thread.__init__(self) + self.thread_id = thread_id + self.server_port = server_port + def run(self): + print '\nStarted scapy thread server' + scapy_zmq_server.main(self.server_port) + print 'Thread server closed' + +# Scapy_server_wrapper is the CLIENT for the scapy server, it wraps the CLIENT: its default port is set to 5555, default server ip set to localhost class scapy_server_tester(scapy_service_tester): def setUp(self): - self.s = Scapy_server_wrapper() + self.thread1 = scapy_server_thread(thread_id=1) + self.thread1.start() + self.s = Scapy_server_wrapper(dest_scapy_port=5555,server_ip_address='localhost') + + def tearDown(self): + self.s.call_method('shut_down',[]) + self.thread1.join() -- cgit From 2c575880c6b5bd0a6c4f1df91f819118af800699 Mon Sep 17 00:00:00 2001 From: itraviv Date: Mon, 15 Aug 2016 17:52:38 +0300 Subject: scapy_server_test: added cases for testing: GRE, VXML, DNS, IPv6 changed test functions to include only the testted object and created other functions as testing engines. scapy_service: added api class. added documentry for api class changed method functions to include underscore --- .../functional_tests/scapy_server_test.py | 124 +++++++++------------ 1 file changed, 55 insertions(+), 69 deletions(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index ccf0b754..c0651f69 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -27,51 +27,7 @@ import zmq import json import scapy_zmq_server import threading - - -class Scapy_server_wrapper(): - def __init__(self,dest_scapy_port=5555,server_ip_address='localhost'): - self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) - self.dest_scapy_port =dest_scapy_port - self.socket.connect("tcp://"+str(server_ip_address)+":"+str(self.dest_scapy_port)) #ip address of csi-trex-11 - - def call_method(self,method_name,method_params): - json_rpc_req = { "jsonrpc":"2.0","method": method_name ,"params": method_params, "id":"1"} - request = json.dumps(json_rpc_req) - self.socket.send(request) - # Get the reply. - message = self.socket.recv() -# print("Received reply %s [ %s ]" % (request, message)) - message_parsed = json.loads(message) - try: - result = message_parsed['result'] - except: - result = {'error':message_parsed['error']} - finally: - return result - - def get_all(self): - return self.call_method('get_all',[]) - - def check_update(self,db_md5,field_md5): - result = self.call_method('check_update',[db_md5,field_md5]) - if result!=True: - if 'error' in result.keys(): - if "Fields DB is not up to date" in result['error']['message:']: - raise ScapyException("Fields DB is not up to date") - if "Protocol DB is not up to date" in result['error']['message:']: - raise ScapyException("Protocol DB is not up to date") - return result - - def build_pkt(self,pkt_descriptor): - return self.call_method('build_pkt',[pkt_descriptor]) - - def get_all_pkt_offsets(self,pkt_desc): - return self.call_method('get_all_pkt_offsets',[pkt_desc]) - - - +from scapy_zmq_client import Scapy_server_wrapper class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): def setUp(self): @@ -133,8 +89,7 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): raise Exception("scapy_server_test: check_updating_db failed") -# testing pkt = Ether()/IP()/TCP()/"test" by defualt - def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'): + def _build_packet_test_method(self,original_pkt): test_pkt = original_pkt original_pkt = eval(original_pkt) test_res = self.s.build_pkt(test_pkt) @@ -143,12 +98,12 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): assert_equal(resT1,True) -#testing offsets of packet IP() by default - def test_get_all_offsets(self,original_pkt = 'IP()'): +#testing offsets of a packet + def _get_all_offsets_test_method(self,original_pkt): test_pkt = original_pkt original_pkt = eval(original_pkt) original_pkt.build() - tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt) + tested_offsets_by_layers = self.s._get_all_pkt_offsets(test_pkt) layers = (test_pkt).split('/') offsets_by_layers = {} for layer in layers: @@ -167,25 +122,11 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): offsets_by_layers[layer_name] = fields_dict resT1 = (tested_offsets_by_layers == offsets_by_layers) assert_equal(resT1,True) - - def test_multi_packet(self): - e0 = 'Ether()' - e1 = 'Ether()/IP()' - e2 = 'TCP()' - e3 = 'UDP()' - e4 = 'Ether()/IP()/TCP()/"test"' - e5 = 'Ether()/IP()/UDP()' - packets = [e0,e1,e2,e3,e4,e5] - for packet in packets: - self.test_get_all_offsets(packet) - - for packet in packets: - self.test_build_packet(packet) - - def test_offsets_and_buffer(self,mac_src='ab:cd:ef:12:34:56',mac_dst='98:76:54:32:1a:bc',ip_src='127.1.1.1',ip_dst='192.168.1.1'): + + def _offsets_and_buffer_test_method(self,mac_src,mac_dst,ip_src,ip_dst): pkt = Ether(src=mac_src,dst=mac_dst)/IP(src=ip_src,dst=ip_dst)/TCP() pkt_descriptor = "Ether(src='"+mac_src+"',dst='"+mac_dst+"')/IP(src='"+ip_src+"',dst='"+ip_dst+"')/TCP()" - pkt_offsets = self.s.get_all_pkt_offsets(pkt_descriptor) + pkt_offsets = self.s._get_all_pkt_offsets(pkt_descriptor) pkt_buffer = str(pkt) #--------------------------Dest-MAC-------------------- mac_start_index = pkt_offsets['Ether']['dst'][0]+pkt_offsets['Ether']['global_offset'] @@ -204,7 +145,52 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test): ip_end_index= ip_start_index+pkt_offsets['IP']['src'][1] assert_equal(binascii.b2a_hex(pkt_buffer[ip_start_index:ip_end_index]),binascii.hexlify(socket.inet_aton(ip_src))) + def test_multi_packet(self): + packets= [ + 'Ether()', + 'Ether()/IP()', + 'TCP()', + 'UDP()', + 'Ether()/IP()/TCP()/"test"', + 'Ether()/IP()/UDP()', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")', + 'Ether()/Dot1Q(vlan=12)/Dot1Q(vlan=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether()/Dot1Q(vlan=12)/IP(src="16.0.0.1",dst="48.0.0.1")/TCP(dport=12,sport=1025)', + 'Ether()/Dot1Q(vlan=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether()/Dot1Q(vlan=12)/IPv6(src="::5")/TCP(dport=12,sport=1025)', + 'Ether()/IP()/UDP()/IPv6(src="::5")/TCP(dport=12,sport=1025)', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether()/IP(dst="48.0.0.1")/TCP(dport=80,flags="S")', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether() / IP(src = "16.0.0.1", dst = "48.0.0.1") / UDP(dport = 12, sport = 1025)', + 'Ether()/IP()/UDP()', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether()/IP(src="16.0.0.2",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + 'Ether()/IP(src="16.0.0.3",dst="48.0.0.1")/UDP(dport=12,sport=1025)', + r'Ether()/IP()/IPv6()/IP(dst="48.0.0.1",options=IPOption("\x01\x01\x01\x00"))/UDP(dport=12,sport=1025)', + r'Ether()/IP(dst="48.0.0.1",options=IPOption("\x01\x01\x01\x00"))/UDP(dport=12,sport=1025)', + 'Ether()', + 'Ether()/IP()/UDP(sport=1337,dport=4789)/VXLAN(vni=42)/Ether()/IP()/("x"*20)', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=3797,sport=3544)/IPv6(dst="2001:0:4137:9350:8000:f12a:b9c8:2815",src="2001:4860:0:2001::68")/UDP(dport=12,sport=1025)/ICMPv6Unknown()', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(sport=1025)/DNS()', + 'Ether()/MPLS(label=17,cos=1,s=0,ttl=255)/MPLS(label=0,cos=1,s=1,ttl=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/("x"*20)', + 'Ether()/MPLS(label=17,cos=1,s=0,ttl=255)/MPLS(label=12,cos=1,s=1,ttl=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/("x"*20)', + 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/ICMP(type=3)', + 'Ether()/IP()/GRE()/("x"*2)'] + + for packet in packets: + self._get_all_offsets_test_method(packet) + + for packet in packets: + self._build_packet_test_method(packet) + + + def test_offsets_and_buffer(self): + self._offsets_and_buffer_test_method('ab:cd:ef:12:34:56','98:76:54:32:1a:bc','127.1.1.1','192.168.1.1') + self._offsets_and_buffer_test_method('bb:bb:bb:bb:bb:bb','aa:aa:aa:aa:aa:aa','1.1.1.1','0.0.0.0') class scapy_server_thread(threading.Thread): def __init__(self,thread_id,server_port=5555): @@ -213,9 +199,9 @@ class scapy_server_thread(threading.Thread): self.server_port = server_port def run(self): - print '\nStarted scapy thread server' + print('\nStarted scapy thread server') scapy_zmq_server.main(self.server_port) - print 'Thread server closed' + print('Thread server closed') # Scapy_server_wrapper is the CLIENT for the scapy server, it wraps the CLIENT: its default port is set to 5555, default server ip set to localhost class scapy_server_tester(scapy_service_tester): -- cgit From 6796bb99573f15c77a007434feabb30291ac1670 Mon Sep 17 00:00:00 2001 From: itraviv Date: Thu, 18 Aug 2016 15:23:01 +0300 Subject: scapy_server_test: skiping python3 tests scapy_service.py: minor changes to protocol_struct function zmq_client: added zmq_server: minor changes in exception handling --- scripts/automation/regression/functional_tests/scapy_server_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'scripts/automation/regression/functional_tests') diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py index c0651f69..ea3ec0da 100755 --- a/scripts/automation/regression/functional_tests/scapy_server_test.py +++ b/scripts/automation/regression/functional_tests/scapy_server_test.py @@ -1,13 +1,15 @@ # scapy server unit test import sys,os +from nose.plugins.skip import SkipTest +if sys.version_info.major == 3: + raise SkipTest("Python3 currently not supported") scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'services','scapy_server')) stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl')) sys.path.append(scapy_server_path) sys.path.append(stl_pathname) -#import stl_path import trex_stl_lib from trex_stl_lib.api import * from copy import deepcopy -- cgit