author    itraviv <itraviv@cisco.com>  2016-07-31 11:53:34 +0300
committer itraviv <itraviv@cisco.com>  2016-07-31 11:53:34 +0300
commit    abf329075bd14f5f41c3753d560260ac809ec4f3 (patch)
tree      d224ab9d306640376b5403c247e0b2dc6c5ebfaa /scripts
parent    fb2b01538ee9ff03716c5546252e2825c1974198 (diff)
scapy_server for GUI+test
Diffstat (limited to 'scripts')
-rwxr-xr-x  scripts/automation/regression/functional_tests/scapy_server_test.py  179
-rwxr-xr-x  scripts/automation/trex_control_plane/stl/examples/scapy_server.py   272
2 files changed, 451 insertions, 0 deletions
diff --git a/scripts/automation/regression/functional_tests/scapy_server_test.py b/scripts/automation/regression/functional_tests/scapy_server_test.py
new file mode 100755
index 00000000..1c6d2bd0
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/scapy_server_test.py
@@ -0,0 +1,179 @@
+# scapy server unit test
+
+import sys,os
+scapy_server_path = os.path.abspath(os.path.join(os.pardir, 'trex_control_plane', 'stl', 'examples'))
+stl_pathname = os.path.abspath(os.path.join(os.pardir, os.pardir, 'trex_control_plane','stl'))
+sys.path.append(scapy_server_path)
+sys.path.append(stl_pathname)
+
+
+
+import trex_stl_lib
+from trex_stl_lib.api import *
+from copy import deepcopy
+
+import tempfile
+import md5
+
+import outer_packages
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import nottest
+from nose.plugins.attrib import attr
+
+from scapy_server import *
+
+
+class scapy_server_tester(functional_general_test.CGeneralFunctional_Test):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+    def test_check_update(self):
+        """Check db/fields md5 validation via check_update()."""
+ allData = get_all()
+ allDataParsed = json.loads(allData)
+ dbMD5 = allDataParsed['DB_md5']
+ fieldMD5 = allDataParsed['fields_md5']
+ result = check_update(dbMD5,fieldMD5)
+ result = json.loads(result)
+        # Correct md5s for both fields and db should be accepted.
+        resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')
+        assert_equal(resT1, True)
+
+ result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))
+ result = json.loads(result)
+        # Two wrong md5s: both checks should report failure.
+        resT2 = (result[0][0] == 'Fail' and result[1][0] == 'Fail')
+        assert_equal(resT2, True)
+
+ result = check_update(dbMD5,json.dumps('falseMD5'))
+ result = json.loads(result)
+        # Wrong field md5, correct db md5: field check fails, db check passes.
+        resT3 = (result[0][0] == 'Fail' and result[1][0] == 'Success')
+        assert_equal(resT3, True)
+
+ result = check_update(json.dumps('falseMD5'),fieldMD5)
+ result = json.loads(result)
+        # Wrong db md5, correct field md5: field check passes, db check fails.
+        resT4 = (result[0][0] == 'Success' and result[1][0] == 'Fail')
+        assert_equal(resT4, True)
+
+
+ def test_check_updating_db(self):
+        # Simulate holding a stale db: wrong md5s force a mismatch, then re-fetch and re-check.
+ result = check_update(json.dumps('falseMD5'),json.dumps('falseMD5'))
+ result = json.loads(result)
+ if result[0][0] == 'Fail' or result[1][0] == 'Fail':
+ newAllData = get_all()
+ allDataParsed = json.loads(newAllData)
+ dbMD5 = allDataParsed['DB_md5']
+ fieldMD5 = allDataParsed['fields_md5']
+ result = check_update(dbMD5,fieldMD5)
+ result = json.loads(result)
+            # After refreshing via get_all(), both md5s should match again.
+            resT1 = (result[0][0] == 'Success' and result[1][0] == 'Success')
+            assert_equal(resT1, True)
+ else:
+ raise Exception("scapy_server_test: check_updating_db failed")
+
+
+    # Tests pkt = Ether()/IP()/TCP()/"test" by default.
+    def test_build_packet(self, original_pkt=json.dumps('Ether()/IP()/TCP()/"test"')):
+ test_pkt = original_pkt
+ original_pkt = eval(json.loads(original_pkt))
+ test_res = build_pkt(test_pkt)
+ test_res = json.loads(test_res)
+ test_pkt_buffer = json.loads(test_res[2])
+ test_pkt_buffer = test_pkt_buffer.decode('base64')
+        # The base64-decoded buffer must equal the raw bytes of the scapy packet.
+        resT1 = (test_pkt_buffer == str(original_pkt))
+        assert_equal(resT1, True)
+
+
+    # Tests offsets of packet IP() by default.
+    def test_get_all_offsets(self, original_pkt=json.dumps('IP()')):
+ test_pkt = original_pkt
+ original_pkt = eval(json.loads(original_pkt))
+ tested_offsets_by_layers = get_all_pkt_offsets(test_pkt)
+ tested_offsets_by_layers = json.loads(tested_offsets_by_layers)
+ layers = json.loads(test_pkt).split('/')
+ offsets_by_layers = {}
+ for layer in layers:
+ fields_list = []
+ for f in original_pkt.fields_desc:
+ size = f.get_size_bytes()
+                if f.name == 'load':
+ size = len(original_pkt)
+ fields_list.append([f.name, f.offset, size])
+ original_pkt = original_pkt.payload
+ offsets_by_layers[layer] = fields_list
+        # Offsets computed locally from fields_desc must match the server's answer.
+        resT1 = (tested_offsets_by_layers == offsets_by_layers)
+        assert_equal(resT1, True)
+
+ def test_multi_packet(self):
+ e0 = json.dumps('Ether()')
+ e1 = json.dumps('Ether()/IP()')
+ e2 = json.dumps('TCP()')
+ e3 = json.dumps('UDP()')
+ e4 = json.dumps('Ether()/IP()/TCP()/"test"')
+ e5 = json.dumps('Ether()/IP()/UDP()')
+ packets = [e0,e1,e2,e3,e4,e5]
+
+ for packet in packets:
+ self.test_get_all_offsets(packet)
+
+ for packet in packets:
+ self.test_build_packet(packet)
+
+
diff --git a/scripts/automation/trex_control_plane/stl/examples/scapy_server.py b/scripts/automation/trex_control_plane/stl/examples/scapy_server.py
new file mode 100755
index 00000000..4762f1a6
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/examples/scapy_server.py
@@ -0,0 +1,272 @@
+import stl_path
+import trex_stl_lib
+from trex_stl_lib.api import *
+from copy import deepcopy
+import sys
+import tempfile
+import md5
+
+
+from cStringIO import StringIO
+#-------------------------------------------------------TREE IMPLEMENTATION ------------------------
+class node(object):
+    def __init__(self, value):
+ self.value = value
+ self.children = []
+
+ def __str__(self, level=0):
+ ret = "\t"*level+repr(self.value)+"\n"
+ for child in self.children:
+ ret += child.__str__(level+1)
+ return ret
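+
+# Illustrative sketch (not part of the server code paths): the indented
+# __str__ above renders a small tree, e.g.:
+#
+#   n = node('ALL'); n.children.append(node('Ether'))
+#   print n     # 'ALL' on the first line, 'Ether' indented one tab below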
+#----------------------------------------------------------------------------------------------------
+
+
+def build_lib():
+    lib = protocol_struct().split('\n')
+    all_protocols = []
+    for entry in lib:
+        entry = entry.split(':')
+        all_protocols.append(entry[0].strip())
+    del all_protocols[-1]  # drop the empty entry left by the trailing newline
+    return all_protocols
+
+def protocol_struct(protocol=''):
+    if '_' in protocol:
+        return []
+    if protocol != '':
+        if protocol not in all_protocols:
+            return 'protocol not supported'
+        protocol = eval(protocol)
+    # capture scapy's ls() output by redirecting stdout into a buffer
+    old_stdout = sys.stdout
+    sys.stdout = mystdout = StringIO()
+    if protocol != '':
+        ls(protocol)
+    else:
+        ls()
+    sys.stdout = old_stdout
+    return mystdout.getvalue()
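+
+# Illustrative calls (assume scapy classes such as IP are in scope via
+# trex_stl_lib.api):
+#
+#   print protocol_struct('IP')   # ls() field listing for the IP class
+#   print protocol_struct()       # listing of every protocol scapy knows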
+
+def parse_description_line(line):
+ line_arr = [x.strip() for x in re.split(': | = ',line)]
+ return tuple(line_arr)
+
+def parse_entire_description(d):
+ d = d.split('\n')
+ description_list = [parse_description_line(x) for x in d]
+    del description_list[-1]  # drop the empty entry from the trailing newline
+ return description_list
+
+def get_protocol_details(p_name):
+ protocol_str = protocol_struct(p_name)
+ if protocol_str=='protocol not supported':
+ return 'protocol not supported'
+    if len(protocol_str) == 0:
+ return []
+ tupled_protocol = parse_entire_description(protocol_str)
+ return tupled_protocol
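+
+# Illustrative result shape (exact text depends on the scapy version): each
+# ls() line such as "version : BitField = (4)" is parsed into a tuple like
+# ('version', 'BitField', '(4)').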
+
+
+class scapyRegex:
+ def __init__(self,FieldName,regex='empty'):
+ self.FieldName = FieldName
+ self.regex = regex
+
+ def stringRegex(self):
+ return self.regex
+
+all_protocols = build_lib()
+
+Raw = {'Raw':''}
+high_level_protocols = ['Raw']
+transport_protocols = {'TCP':Raw,'UDP':Raw}
+network_protocols = {'IP':transport_protocols ,'ARP':''}
+low_level_protocols = { 'Ether': network_protocols }
+regexDB= {'MACField' : scapyRegex('MACField','^([0-9a-fA-F][0-9a-fA-F]:){5}([0-9a-fA-F][0-9a-fA-F])$'),
+ 'IPField' : scapyRegex('IPField','^(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])$')}
+
+
+def build_nodes(data,dictionary):
+ n = node(data)
+ if len(dictionary)==0:
+ n.children=[]
+ return n
+ for obj in dictionary.keys():
+ if not (obj==''):
+ x = build_nodes(obj,dictionary[obj])
+ n.children.append(x)
+ return n
+
+def build_tree():
+    # build_nodes creates the 'ALL' root and attaches protocol children recursively
+    return build_nodes('ALL', low_level_protocols)
+
+protocol_tree = build_tree()
+
+def print_tree(root,level=0):
+ output = "\t"*level+str(root.value)
+ print output
+ if len(root.children)==0:
+ return
+ for child in root.children:
+ print_tree(child,level+1)
+
+def get_all_protocols():
+ return json.dumps(all_protocols)
+
+def get_tree():
+    old_stdout = sys.stdout
+    sys.stdout = mystdout = StringIO()
+    print_tree(protocol_tree)
+    sys.stdout = old_stdout
+    return json.dumps(mystdout.getvalue())
+
+def get_all_db():
+ db = {}
+ for pro in all_protocols:
+ details = get_protocol_details(pro)
+ db[pro] = json.dumps(details)
+ return db
+
+def get_all_fields():
+    fields = []
+    for pro in all_protocols:
+        details = get_protocol_details(pro)
+        for i in range(len(details)):
+            if len(details[i]) == 3:
+                fields.append(details[i][1])
+    uniqueFields = list(set(fields))
+    fieldDict = {}
+    for f in uniqueFields:
+        if f in regexDB:
+            fieldDict[f] = regexDB[f].stringRegex()
+        else:
+            fieldDict[f] = scapyRegex(f).stringRegex()
+    return fieldDict
+
+
+class pktResult:
+ def __init__(self,result,errorCode,errorDesc):
+ self.result = result
+ self.errorCode = errorCode
+ self.errorDesc = errorDesc
+
+    def convert2tuple(self):
+        return (self.result, self.errorCode, self.errorDesc)
+
+
+
+# pkt_descriptor must be a JSON string.
+# Returns a JSON list: [pktResultTuple, show2 output, base64(packet buffer)]
+# where pktResultTuple is (result, errorCode, errorDescription).
+
+def build_pkt(pkt_descriptor):
+ try:
+ pkt = eval(json.loads(pkt_descriptor))
+ old_stdout = sys.stdout
+ sys.stdout = mystdout = StringIO()
+ pkt.show2()
+ sys.stdout = old_stdout
+ show2data = mystdout.getvalue() #show2 data
+ bufferData = str(pkt) #pkt buffer
+ bufferData = bufferData.encode('base64')
+ pktRes = pktResult('Success',0,'None').convert2tuple()
+ res = [pktRes,json.dumps(show2data),json.dumps(bufferData)]
+ JSONres = json.dumps(res)
+ return JSONres
+    except Exception:
+ pktRes = pktResult('Pkt build Failed',str(sys.exc_info()[0]),str(sys.exc_info()[1])).convert2tuple()
+ res = [pktRes,[],[]]
+ res = json.dumps(res)
+ return res
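+
+# Illustrative round trip (mirrors the unit test); the descriptor must be a
+# JSON-encoded scapy expression string:
+#
+#   res = json.loads(build_pkt(json.dumps('Ether()/IP()/TCP()')))
+#   status, err_code, err_desc = res[0]
+#   show2_text = json.loads(res[1])
+#   raw_bytes = json.loads(res[2]).decode('base64')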
+
+# Input: any JSON-serializable container.
+# Output: base64 of the md5 digest, JSON-encoded.
+def getMD5(container):
+ resMD5 = md5.new(json.dumps(container))
+ resMD5 = json.dumps(resMD5.digest().encode('base64'))
+ return resMD5
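+
+# Illustrative call: the container is serialized with json.dumps before
+# hashing, so any JSON-serializable value works.
+#
+#   digest_json = getMD5({'IP': '[]'})   # JSON string wrapping a base64 digest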
+
+
+def get_all():
+ fields=get_all_fields()
+ db=get_all_db()
+ fieldMD5 = getMD5(fields)
+ dbMD5 = getMD5(db)
+ res = {}
+ res['db'] = db
+ res['fields'] = fields
+ res['DB_md5'] = dbMD5
+ res['fields_md5'] = fieldMD5
+ return json.dumps(res)
+
+# Inputs are JSON-encoded base64 md5 strings, as produced by getMD5().
+def check_update(dbMD5,fieldMD5):
+ fields=get_all_fields()
+ db=get_all_db()
+ currentDBMD5 = json.loads(getMD5(db))
+ currentFieldMD5 = json.loads(getMD5(fields))
+ dbMD5_parsed = json.loads(dbMD5)
+ fieldMD5_parsed = json.loads(fieldMD5)
+ res = []
+ if fieldMD5_parsed == currentFieldMD5:
+ resField = pktResult('Success',0,'None').convert2tuple()
+ else:
+ resField = pktResult('Fail',0,'Field DB is not up to date').convert2tuple()
+ if dbMD5_parsed == currentDBMD5:
+ resDB = pktResult('Success',0,'None').convert2tuple()
+ else:
+ resDB = pktResult('Fail',0,'Protocol DB is not up to date').convert2tuple()
+ return json.dumps([resField,resDB])
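+
+# Illustrative usage (mirrors the unit test): feed the md5s from get_all()
+# back in and expect both checks to pass.
+#
+#   snapshot = json.loads(get_all())
+#   status = json.loads(check_update(snapshot['DB_md5'], snapshot['fields_md5']))
+#   # status == [['Success', 0, 'None'], ['Success', 0, 'None']] after the JSON round trip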
+
+# pkt_desc is a JSON string.
+# Returns a JSON dict keyed by protocol layer; each field contributes a
+# (name, offset, size) entry.
+def get_all_pkt_offsets(pkt_desc):
+ pkt_desc= json.loads(pkt_desc)
+ pkt_protocols = pkt_desc.split('/')
+ scapy_pkt = eval(pkt_desc)
+ total_protocols = len(pkt_protocols)
+ res = {}
+ for i in range(total_protocols):
+ fields = []
+ for field in scapy_pkt.fields_desc:
+ size = field.get_size_bytes()
+            if field.name == 'load':
+ size = len(scapy_pkt)
+ fields.append([field.name, field.offset, size])
+ res[pkt_protocols[i]] = fields
+ scapy_pkt=scapy_pkt.payload
+ return json.dumps(res)
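+
+# Illustrative usage (mirrors the unit test): offsets keyed by each layer
+# expression from the descriptor.
+#
+#   offsets = json.loads(get_all_pkt_offsets(json.dumps('Ether()/IP()')))
+#   # offsets['IP()'] -> list of [field_name, offset, size] entries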
+
+
+
+def get_supported_cmds():
+    # not implemented in this commit; 'pass' keeps the module importable
+    pass