summaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r--scripts/automation/regression/unit_tests/functional_tests/scapy_pkt_builder_test.py212
-rw-r--r--scripts/automation/trex_control_plane/client_utils/scapy_packet_builder.py50
-rw-r--r--scripts/external_libs/scapy-2.3.1/scapy/layers/inet6.py2
3 files changed, 249 insertions, 15 deletions
diff --git a/scripts/automation/regression/unit_tests/functional_tests/scapy_pkt_builder_test.py b/scripts/automation/regression/unit_tests/functional_tests/scapy_pkt_builder_test.py
new file mode 100644
index 00000000..bfa9243c
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/scapy_pkt_builder_test.py
@@ -0,0 +1,212 @@
+#!/router/bin/python
+
+import pkt_bld_general_test
+from client_utils.scapy_packet_builder import *
+from scapy.all import *
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+import os
+import random
+import pprint
+
+class CTRexPktBuilderSanitySCapy_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ pass
+
+ def test_simple_vm1(self):
+ raw1 = CTRexScRaw( [ CTRexVmDescFlowVar(name="a",min_value="16.0.0.1",max_value="16.0.0.10",init_value="16.0.0.1",size=4,op="inc"),
+ CTRexVmDescWrFlowVar (fv_name="a",pkt_offset= "IP.src"),
+ CTRexVmDescFixIpv4(offset = "IP")]
+ );
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( raw1 )
+ pkt_builder.compile();
+
+ pkt_builder.dump_scripts ()
+
+ print pkt_builder.get_vm_data()
+
+ assert_equal( pkt_builder.get_vm_data(), {'split_by_var': '', 'instructions': [{'name': 'a', 'max_value': 268435466, 'min_value': 268435457, 'init_value': 268435457, 'size': 4, 'type': 'flow_var', 'op': 'inc'}, {'is_big_endian': True, 'pkt_offset': 26, 'type': 'write_flow_var', 'name': 'a', 'add_value': 0}, {'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}]} )
+
+
+
+ def test_simple_no_vm1(self):
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+
+ pkt_builder.compile();
+
+ pkt_builder.dump_scripts ()
+
+ assert_equal( pkt_builder.get_vm_data(),
+ { 'instructions': [ ],
+ 'split_by_var': ''}
+ )
+
+ def test_simple_scapy_vlan(self):
+
+ py='5'*(9)
+ p1=Ether(src="00:00:00:01:00:00",dst="00:00:00:01:00:00")/ \
+ Dot1Q(vlan=12)/ \
+ Dot1Q(vlan=17)/ \
+ IP(src="10.0.0.10",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/py
+
+ p1.build();
+ p1.dump_layers_offset()
+ p1.show2();
+ hexdump(p1);
+ #wrpcap("ipv4_udp_9k.pcap", p1);
+
+ p_utl=CTRexScapyPktUtl(p1);
+
+ assert_equal(p_utl.get_pkt_layers(),"Ethernet:802.1Q:802.1Q:IP:UDP:Raw")
+ assert_equal(p_utl.layer_offset("802.1Q",0),14);
+ assert_equal(p_utl.layer_offset("802.1Q",1),18);
+ assert_equal(p_utl.get_field_offet_by_str("802|1Q.vlan"),(14,0));
+ assert_equal(p_utl.get_field_offet_by_str("802|1Q:1.vlan"),(18,0));
+ assert_equal(p_utl.get_field_offet_by_str("IP.src"),(34,4));
+
+ def test_simple_scapy_128_udp(self):
+ """
+ build 128 byte packet with 0x35 as pyld
+ """
+
+
+ pkt_size =128
+ p1=Ether(src="00:00:00:01:00:00",dst="00:00:00:01:00:00")/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)
+ pyld_size=pkt_size-len(p1);
+
+ pkt=p1/('5'*(pyld_size))
+
+ pkt.show2();
+ hexdump(pkt);
+ assert_equal(len(pkt),128)
+
+ def test_simple_scapy_9k_ip_len(self):
+ """
+ build 9k ipv4 len packet
+ """
+
+
+ ip_pkt_size =9*1024
+ p_l2=Ether(src="00:00:00:01:00:00",dst="00:00:00:01:00:00");
+ p_l3= IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)
+ pyld_size = ip_pkt_size-len(p_l3);
+
+ pkt=p_l2/p_l3/('\x55'*(pyld_size))
+
+ #pkt.show2();
+ #hexdump(pkt);
+ assert_equal(len(pkt),9*1024+14)
+
+ def test_simple_scapy_ipv6_1(self):
+ """
+ build ipv6 packet
+ """
+
+ print "start "
+ py='\x55'*(64)
+
+ p=Ether()/IPv6()/UDP(dport=12,sport=1025)/py
+ #p.build();
+ #p.dump_layers_offset()
+ hexdump(p);
+ p.show2();
+
+ p_utl=CTRexScapyPktUtl(p);
+
+ assert_equal(p_utl.get_field_offet_by_str("IPv6.src"),(38,16));
+
+
+ def test_simple_vm2(self):
+ raw1 = CTRexScRaw( [ CTRexVmDescFlowVar(name="my_valn",min_value=0,max_value=10,init_value=2,size=1,op="inc"),
+ CTRexVmDescWrFlowVar (fv_name="my_valn",pkt_offset= "802|1Q.vlan" ,offset_fixup=3) # fix the offset as valn is bitfield and not supported right now
+ ]
+ );
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ Dot1Q(vlan=12)/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( raw1 )
+ pkt_builder.compile();
+
+
+ d= pkt_builder.get_vm_data()
+ assert_equal(d['instructions'][1]['pkt_offset'],17)
+
+ def test_simple_vm3(self):
+ try:
+ raw1 = CTRexScRaw( [ CTRexVmDescFlowVar(name="my_valn",min_value=0,max_value=10,init_value=2,size=1,op="inc"),
+ CTRexVmDescWrFlowVar (fv_name="my_valn_err",pkt_offset= "802|1Q.vlan" ,offset_fixup=3) # fix the offset as valn is bitfield and not supported right now
+ ]
+ );
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ Dot1Q(vlan=12)/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( raw1 )
+ pkt_builder.compile();
+
+
+ d= pkt_builder.get_vm_data()
+ except CTRexPacketBuildException as e:
+ assert_equal(str(e), "[errcode:-11] 'variable my_valn_err does not exists '")
+
+
+ def tearDown(self):
+ pass
+
+
+class CTRexPktBuilderScapy_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ pass;
+ #self.pkt_bld = CTRexPktBuilder()
+ #self.pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet())
+ #self.pp = pprint.PrettyPrinter(indent=4)
+
+ def tearDown(self):
+ pass
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/trex_control_plane/client_utils/scapy_packet_builder.py b/scripts/automation/trex_control_plane/client_utils/scapy_packet_builder.py
index e9b94675..65470d53 100644
--- a/scripts/automation/trex_control_plane/client_utils/scapy_packet_builder.py
+++ b/scripts/automation/trex_control_plane/client_utils/scapy_packet_builder.py
@@ -354,10 +354,13 @@ class CTRexScapyPktUtl(object):
"""
return layer offset by string
+ :parameters:
+
IP:0
IP:1
return offset
+
"""
l1=layer_des.split(":")
layer=""
@@ -377,13 +380,14 @@ class CTRexScapyPktUtl(object):
"""
return field_des (offset,size) layer:cnt.field
for example
-
+ 802|1Q.vlan get 802.1Q->valn replace | with .
IP.src
IP:0.src (first IP.src like IP.src)
for example IP:1.src for internal IP
return (offset, size) as tuple
+
"""
s=field_des.split(".");
@@ -391,7 +395,7 @@ class CTRexScapyPktUtl(object):
raise CTRexPacketBuildException(-11, ("field desription should be layer:cnt.field e.g IP.src or IP:1.src"));
- layer_ex = s[0]
+ layer_ex = s[0].replace("|",".")
field = s[1]
l1=layer_ex.split(":")
@@ -437,6 +441,12 @@ class CTRexVmDescBase(object):
print yaml.dump(self.get_json(), default_flow_style=False)
+ def get_var_ref (self):
+ '''
+ virtual function return a ref var name
+ '''
+ return None
+
def get_var_name(self):
'''
virtual function return the varible name if exists
@@ -504,18 +514,23 @@ class CTRexVmDescFixIpv4(CTRexVmDescBase):
self.offset = parent._pkt_layer_offset(self.offset);
class CTRexVmDescWrFlowVar(CTRexVmDescBase):
- def __init__(self, fv_name, pkt_offset, add_value=0, is_big_endian=True):
+ def __init__(self, fv_name, pkt_offset, offset_fixup=0, add_val=0, is_big=True):
super(CTRexVmDescWrFlowVar, self).__init__()
- self.name = fv_name
+ self.name =fv_name
assert type(fv_name)==str, 'type of fv_name is not str'
- self.pkt_offset = pkt_offset
- self.add_value = add_value
- assert type(add_value)==int, 'type of add_value is not int'
- self.is_big_endian =is_big_endian;
- assert type(is_big_endian)==bool, 'type of is_big_endian is not bool'
+ self.offset_fixup =offset_fixup
+ assert type(offset_fixup)==int, 'type of offset_fixup is not int'
+ self.pkt_offset =pkt_offset
+ self.add_val =add_val
+ assert type(add_val)==int,'type of add_val is not int'
+ self.is_big =is_big;
+ assert type(is_big)==bool,'type of is_big_endian is not bool'
+
+ def get_var_ref (self):
+ return self.name
- def get_obj(self):
- return CTRexVmInsWrFlowVar(self.name, self.pkt_offset, self.add_value, self.is_big_endian)
+ def get_obj (self):
+ return CTRexVmInsWrFlowVar(self.name,self.pkt_offset+self.offset_fixup,self.add_val,self.is_big)
def compile(self,parent):
if type(self.pkt_offset)==str:
@@ -625,15 +640,22 @@ class CScapyTRexPktBuilder(object):
# make sure we have varibles once
vars={};
+
+ # add it add var to dit
for desc in obj.commands:
- print desc.__dict__
var_name = desc.get_var_name()
- print var_name
if var_name :
if vars.has_key(var_name):
- raise CTRexPacketBuildException(-11, 'varible %s is defined twice' % var_name);
+ raise CTRexPacketBuildException(-11,("variable %s define twice ") % (var_name) );
else:
vars[var_name]=1
+
+ # check that all write exits
+ for desc in obj.commands:
+ var_name = desc.get_var_ref()
+ if var_name :
+ if not vars.has_key(var_name):
+ raise CTRexPacketBuildException(-11,("variable %s does not exists ") % (var_name) );
desc.compile(self);
for desc in obj.commands:
diff --git a/scripts/external_libs/scapy-2.3.1/scapy/layers/inet6.py b/scripts/external_libs/scapy-2.3.1/scapy/layers/inet6.py
index 5d594d54..19dda97e 100644
--- a/scripts/external_libs/scapy-2.3.1/scapy/layers/inet6.py
+++ b/scripts/external_libs/scapy-2.3.1/scapy/layers/inet6.py
@@ -357,7 +357,7 @@ class IPv6(_IPv6GuessPayload, Packet, IPTools):
ByteField("hlim", 64),
IP6Field("dst", "::2"),
#SourceIP6Field("src", "dst"), # dst is for src @ selection
- IP6Field("dst", "::1") ]
+ IP6Field("src", "::1") ]
def route(self):
dst = self.dst