summaryrefslogtreecommitdiffstats
path: root/scripts/automation/regression
diff options
context:
space:
mode:
authoritraviv <itraviv@cisco.com>2016-08-09 11:29:52 +0300
committeritraviv <itraviv@cisco.com>2016-08-09 11:29:52 +0300
commit20d4c966c15aa134d58c8d53d840cddd9903f9aa (patch)
tree8561b6457e6db7213f00c3106dde56d2d9da9c3e /scripts/automation/regression
parentcb24f0f0ca09881de3754dbfb7f37d1f08f4b306 (diff)
added wrapper class to support testing of scapy_server, this class wraps server interaction.
added test of offsets versus actual packet dump
Diffstat (limited to 'scripts/automation/regression')
-rwxr-xr-xscripts/automation/regression/functional_tests/scapy_server_test.py87
1 files changed, 81 insertions, 6 deletions
diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py
index 5f269f44..9e64f1a8 100755
--- a/scripts/automation/regression/functional_tests/scapy_server_test.py
+++ b/scripts/automation/regression/functional_tests/scapy_server_test.py
@@ -21,7 +21,54 @@ from nose.tools import assert_not_equal
from nose.tools import nottest
from nose.plugins.attrib import attr
import binascii
-from scapy_server import *
+from scapy_service import *
+from pprint import pprint
+import zmq
+import json
+
+
+class Scapy_server_wrapper():
+ def __init__(self):
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.REQ)
+ self.dest_scapy_port =5555
+ self.socket.connect("tcp://10.56.216.133:"+str(self.dest_scapy_port)) #ip address of csi-trex-11
+
+ def call_method(self,method_name,method_params):
+ json_rpc_req = { "jsonrpc":"2.0","method": method_name ,"params": method_params, "id":"1"}
+ request = json.dumps(json_rpc_req)
+ self.socket.send(request)
+ # Get the reply.
+ message = self.socket.recv()
+# print("Received reply %s [ %s ]" % (request, message))
+ message_parsed = json.loads(message)
+ try:
+ result = message_parsed['result']
+ except:
+ result = {'error':message_parsed['error']}
+ finally:
+ return result
+
+ def get_all(self):
+ return self.call_method('get_all',[])
+
+ def check_update(self,db_md5,field_md5):
+ result = self.call_method('check_update',[db_md5,field_md5])
+ if result!=True:
+ if 'error' in result.keys():
+ if "Fields DB is not up to date" in result['error']['message:']:
+ raise ScapyException("Fields DB is not up to date")
+ if "Protocol DB is not up to date" in result['error']['message:']:
+ raise ScapyException("Protocol DB is not up to date")
+ return result
+
+ def build_pkt(self,pkt_descriptor):
+ return self.call_method('build_pkt',[pkt_descriptor])
+
+ def get_all_pkt_offsets(self,pkt_desc):
+ return self.call_method('get_all_pkt_offsets',[pkt_desc])
+
+
class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
@@ -104,14 +151,17 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
offsets_by_layers = {}
for layer in layers:
fields_dict = {}
+ layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric
+ layer_name = re.sub(r'\W+', '',layer_name)
for f in original_pkt.fields_desc:
size = f.get_size_bytes()
+ name = f.name
if f.name is 'load':
size = len(original_pkt)
+ layer_name = 'Raw'
fields_dict[f.name]= [f.offset, size]
+ fields_dict['global_offset'] = original_pkt.offset
original_pkt = original_pkt.payload
- layer_name = layer.partition('(')[0] #clear layer name to include only alpha-numeric
- layer_name = re.sub(r'\W+', '',layer_name)
offsets_by_layers[layer_name] = fields_dict
resT1 = (tested_offsets_by_layers == offsets_by_layers)
assert_equal(resT1,True)
@@ -130,9 +180,34 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
for packet in packets:
self.test_build_packet(packet)
-
-
-
+ def test_offsets_and_buffer(self,mac_src='ab:cd:ef:12:34:56',mac_dst='98:76:54:32:1a:bc',ip_src='127.1.1.1',ip_dst='192.168.1.1'):
+ pkt = Ether(src=mac_src,dst=mac_dst)/IP(src=ip_src,dst=ip_dst)/TCP()
+ pkt_descriptor = "Ether(src='"+mac_src+"',dst='"+mac_dst+"')/IP(src='"+ip_src+"',dst='"+ip_dst+"')/TCP()"
+ pkt_offsets = self.s.get_all_pkt_offsets(pkt_descriptor)
+ pkt_buffer = str(pkt)
+ #--------------------------Dest-MAC--------------------
+ mac_start_index = pkt_offsets['Ether']['dst'][0]+pkt_offsets['Ether']['global_offset']
+ mac_end_index = mac_start_index+pkt_offsets['Ether']['dst'][1]
+ assert_equal(binascii.b2a_hex(pkt_buffer[mac_start_index:mac_end_index]),mac_dst.translate(None,':'))
+ #--------------------------Src-MAC---------------------
+ mac_start_index = pkt_offsets['Ether']['src'][0]+pkt_offsets['Ether']['global_offset']
+ mac_end_index = mac_start_index+pkt_offsets['Ether']['src'][1]
+ assert_equal(binascii.b2a_hex(pkt_buffer[mac_start_index:mac_end_index]),mac_src.translate(None,':'))
+ #--------------------------Dest-IP---------------------
+ ip_start_index = pkt_offsets['IP']['dst'][0]+pkt_offsets['IP']['global_offset']
+ ip_end_index= ip_start_index+pkt_offsets['IP']['dst'][1]
+ assert_equal(binascii.b2a_hex(pkt_buffer[ip_start_index:ip_end_index]),binascii.hexlify(socket.inet_aton(ip_dst)))
+ #--------------------------Src-IP----------------------
+ ip_start_index = pkt_offsets['IP']['src'][0]+pkt_offsets['IP']['global_offset']
+ ip_end_index= ip_start_index+pkt_offsets['IP']['src'][1]
+ assert_equal(binascii.b2a_hex(pkt_buffer[ip_start_index:ip_end_index]),binascii.hexlify(socket.inet_aton(ip_src)))
+
+
+
+
+class scapy_server_tester(scapy_service_tester):
+ def setUp(self):
+ self.s = Scapy_server_wrapper()