summaryrefslogtreecommitdiffstats
path: root/scripts/automation/regression
diff options
context:
space:
mode:
authoritraviv <itraviv@cisco.com>2016-08-15 17:52:38 +0300
committeritraviv <itraviv@cisco.com>2016-08-15 17:52:38 +0300
commit2c575880c6b5bd0a6c4f1df91f819118af800699 (patch)
treeea751687ef67f7452d31d6510d7a065df0146fe2 /scripts/automation/regression
parent72171290152897fac1a149fe4f9d1cbf0ae914c4 (diff)
scapy_server_test:
added cases for testing: GRE, VXML, DNS, IPv6 changed test functions to include only the testted object and created other functions as testing engines. scapy_service: added api class. added documentry for api class changed method functions to include underscore
Diffstat (limited to 'scripts/automation/regression')
-rwxr-xr-xscripts/automation/regression/functional_tests/scapy_server_test.py124
1 files changed, 55 insertions, 69 deletions
diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py
index ccf0b754..c0651f69 100755
--- a/scripts/automation/regression/functional_tests/scapy_server_test.py
+++ b/scripts/automation/regression/functional_tests/scapy_server_test.py
@@ -27,51 +27,7 @@ import zmq
import json
import scapy_zmq_server
import threading
-
-
-class Scapy_server_wrapper():
- def __init__(self,dest_scapy_port=5555,server_ip_address='localhost'):
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.REQ)
- self.dest_scapy_port =dest_scapy_port
- self.socket.connect("tcp://"+str(server_ip_address)+":"+str(self.dest_scapy_port)) #ip address of csi-trex-11
-
- def call_method(self,method_name,method_params):
- json_rpc_req = { "jsonrpc":"2.0","method": method_name ,"params": method_params, "id":"1"}
- request = json.dumps(json_rpc_req)
- self.socket.send(request)
- # Get the reply.
- message = self.socket.recv()
-# print("Received reply %s [ %s ]" % (request, message))
- message_parsed = json.loads(message)
- try:
- result = message_parsed['result']
- except:
- result = {'error':message_parsed['error']}
- finally:
- return result
-
- def get_all(self):
- return self.call_method('get_all',[])
-
- def check_update(self,db_md5,field_md5):
- result = self.call_method('check_update',[db_md5,field_md5])
- if result!=True:
- if 'error' in result.keys():
- if "Fields DB is not up to date" in result['error']['message:']:
- raise ScapyException("Fields DB is not up to date")
- if "Protocol DB is not up to date" in result['error']['message:']:
- raise ScapyException("Protocol DB is not up to date")
- return result
-
- def build_pkt(self,pkt_descriptor):
- return self.call_method('build_pkt',[pkt_descriptor])
-
- def get_all_pkt_offsets(self,pkt_desc):
- return self.call_method('get_all_pkt_offsets',[pkt_desc])
-
-
-
+from scapy_zmq_client import Scapy_server_wrapper
class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
def setUp(self):
@@ -133,8 +89,7 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
raise Exception("scapy_server_test: check_updating_db failed")
-# testing pkt = Ether()/IP()/TCP()/"test" by defualt
- def test_build_packet(self,original_pkt='Ether()/IP()/TCP()/"test"'):
+ def _build_packet_test_method(self,original_pkt):
test_pkt = original_pkt
original_pkt = eval(original_pkt)
test_res = self.s.build_pkt(test_pkt)
@@ -143,12 +98,12 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
assert_equal(resT1,True)
-#testing offsets of packet IP() by default
- def test_get_all_offsets(self,original_pkt = 'IP()'):
+#testing offsets of a packet
+ def _get_all_offsets_test_method(self,original_pkt):
test_pkt = original_pkt
original_pkt = eval(original_pkt)
original_pkt.build()
- tested_offsets_by_layers = self.s.get_all_pkt_offsets(test_pkt)
+ tested_offsets_by_layers = self.s._get_all_pkt_offsets(test_pkt)
layers = (test_pkt).split('/')
offsets_by_layers = {}
for layer in layers:
@@ -167,25 +122,11 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
offsets_by_layers[layer_name] = fields_dict
resT1 = (tested_offsets_by_layers == offsets_by_layers)
assert_equal(resT1,True)
-
- def test_multi_packet(self):
- e0 = 'Ether()'
- e1 = 'Ether()/IP()'
- e2 = 'TCP()'
- e3 = 'UDP()'
- e4 = 'Ether()/IP()/TCP()/"test"'
- e5 = 'Ether()/IP()/UDP()'
- packets = [e0,e1,e2,e3,e4,e5]
- for packet in packets:
- self.test_get_all_offsets(packet)
-
- for packet in packets:
- self.test_build_packet(packet)
-
- def test_offsets_and_buffer(self,mac_src='ab:cd:ef:12:34:56',mac_dst='98:76:54:32:1a:bc',ip_src='127.1.1.1',ip_dst='192.168.1.1'):
+
+ def _offsets_and_buffer_test_method(self,mac_src,mac_dst,ip_src,ip_dst):
pkt = Ether(src=mac_src,dst=mac_dst)/IP(src=ip_src,dst=ip_dst)/TCP()
pkt_descriptor = "Ether(src='"+mac_src+"',dst='"+mac_dst+"')/IP(src='"+ip_src+"',dst='"+ip_dst+"')/TCP()"
- pkt_offsets = self.s.get_all_pkt_offsets(pkt_descriptor)
+ pkt_offsets = self.s._get_all_pkt_offsets(pkt_descriptor)
pkt_buffer = str(pkt)
#--------------------------Dest-MAC--------------------
mac_start_index = pkt_offsets['Ether']['dst'][0]+pkt_offsets['Ether']['global_offset']
@@ -204,7 +145,52 @@ class scapy_service_tester(functional_general_test.CGeneralFunctional_Test):
ip_end_index= ip_start_index+pkt_offsets['IP']['src'][1]
assert_equal(binascii.b2a_hex(pkt_buffer[ip_start_index:ip_end_index]),binascii.hexlify(socket.inet_aton(ip_src)))
+ def test_multi_packet(self):
+ packets= [
+ 'Ether()',
+ 'Ether()/IP()',
+ 'TCP()',
+ 'UDP()',
+ 'Ether()/IP()/TCP()/"test"',
+ 'Ether()/IP()/UDP()',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")',
+ 'Ether()/Dot1Q(vlan=12)/Dot1Q(vlan=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether()/Dot1Q(vlan=12)/IP(src="16.0.0.1",dst="48.0.0.1")/TCP(dport=12,sport=1025)',
+ 'Ether()/Dot1Q(vlan=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether()/Dot1Q(vlan=12)/IPv6(src="::5")/TCP(dport=12,sport=1025)',
+ 'Ether()/IP()/UDP()/IPv6(src="::5")/TCP(dport=12,sport=1025)',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether()/IP(dst="48.0.0.1")/TCP(dport=80,flags="S")',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether() / IP(src = "16.0.0.1", dst = "48.0.0.1") / UDP(dport = 12, sport = 1025)',
+ 'Ether()/IP()/UDP()',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether()/IP(src="16.0.0.2",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ 'Ether()/IP(src="16.0.0.3",dst="48.0.0.1")/UDP(dport=12,sport=1025)',
+ r'Ether()/IP()/IPv6()/IP(dst="48.0.0.1",options=IPOption("\x01\x01\x01\x00"))/UDP(dport=12,sport=1025)',
+ r'Ether()/IP(dst="48.0.0.1",options=IPOption("\x01\x01\x01\x00"))/UDP(dport=12,sport=1025)',
+ 'Ether()',
+ 'Ether()/IP()/UDP(sport=1337,dport=4789)/VXLAN(vni=42)/Ether()/IP()/("x"*20)',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=3797,sport=3544)/IPv6(dst="2001:0:4137:9350:8000:f12a:b9c8:2815",src="2001:4860:0:2001::68")/UDP(dport=12,sport=1025)/ICMPv6Unknown()',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(sport=1025)/DNS()',
+ 'Ether()/MPLS(label=17,cos=1,s=0,ttl=255)/MPLS(label=0,cos=1,s=1,ttl=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/("x"*20)',
+ 'Ether()/MPLS(label=17,cos=1,s=0,ttl=255)/MPLS(label=12,cos=1,s=1,ttl=12)/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=12,sport=1025)/("x"*20)',
+ 'Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/ICMP(type=3)',
+ 'Ether()/IP()/GRE()/("x"*2)']
+
+ for packet in packets:
+ self._get_all_offsets_test_method(packet)
+
+ for packet in packets:
+ self._build_packet_test_method(packet)
+
+
+ def test_offsets_and_buffer(self):
+ self._offsets_and_buffer_test_method('ab:cd:ef:12:34:56','98:76:54:32:1a:bc','127.1.1.1','192.168.1.1')
+ self._offsets_and_buffer_test_method('bb:bb:bb:bb:bb:bb','aa:aa:aa:aa:aa:aa','1.1.1.1','0.0.0.0')
class scapy_server_thread(threading.Thread):
def __init__(self,thread_id,server_port=5555):
@@ -213,9 +199,9 @@ class scapy_server_thread(threading.Thread):
self.server_port = server_port
def run(self):
- print '\nStarted scapy thread server'
+ print('\nStarted scapy thread server')
scapy_zmq_server.main(self.server_port)
- print 'Thread server closed'
+ print('Thread server closed')
# Scapy_server_wrapper is the CLIENT for the scapy server, it wraps the CLIENT: its default port is set to 5555, default server ip set to localhost
class scapy_server_tester(scapy_service_tester):