summaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
authorYaroslav Brustinov <ybrustin@cisco.com>2016-01-13 10:40:23 +0200
committerYaroslav Brustinov <ybrustin@cisco.com>2016-01-13 10:40:23 +0200
commit6092cc6f8b7d6d5f44b9c6cc600fc819257996e3 (patch)
tree360c42940d54212d334eb75f0cbfe43062d13e80 /scripts
parent898fadae0e9bf9d0eb46abf286a888d791bdaf96 (diff)
add Dan's packet builder functional tests
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py125
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py29
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py437
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/test.pcapbin0 -> 346 bytes
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcapbin0 -> 93 bytes
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcapbin0 -> 346 bytes
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py77
-rwxr-xr-xscripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py63
8 files changed, 731 insertions, 0 deletions
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py
new file mode 100755
index 00000000..80d2b086
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py
@@ -0,0 +1,125 @@
+#!/router/bin/python
+
+import pkt_bld_general_test
+from client_utils.packet_builder import *
+from dpkt.ethernet import Ethernet
+from dpkt.ip import IP
+from dpkt.icmp import ICMP
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+import re
+import binascii
+
+class CTRexPayloadGen_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+# echo = dpkt.icmp.ICMP.Echo()
+# echo.id = random.randint(0, 0xffff)
+# echo.seq = random.randint(0, 0xffff)
+# echo.data = 'hello world'
+#
+# icmp = dpkt.icmp.ICMP()
+# icmp.type = dpkt.icmp.ICMP_ECHO
+# icmp.data = echo
+
+ # packet generation is done directly using dpkt package and
+ self.packet = Ethernet()
+ ip = IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1)
+ icmp = ICMP(type=8, data=ICMP.Echo(id=123, seq=1, data='foobar'))
+ ip.data = icmp
+ self.packet.src = "\x00\x00\x55\x55\x00\x00"
+ self.packet.dst = "\x00\x00\x11\x11\x00\x00"
+ self.packet.data = ip
+ self.print_packet(self.packet)
+ self.max_pkt_size = 1400
+ self.pld_gen = CTRexPktBuilder.CTRexPayloadGen(self.packet, self.max_pkt_size)
+
+ @staticmethod
+ def special_match(strg, search=re.compile(r'[^a-zA-Z0-9]').search):
+ return not bool(search(strg))
+
+ @staticmethod
+ def principal_period(s):
+ # finds the string the repeats itself in the string
+ i = (s+s).find(s, 1, -1)
+ return None if i == -1 else s[:i]
+
+ def test_gen_random_str(self):
+ generated_str = self.pld_gen.gen_random_str()
+ # print "\nGenerated string: {}".format(generated_str)
+ # chech that the generated string is accorsing to rules.
+ assert CTRexPayloadGen_Test.special_match(generated_str)
+ assert_equal(len(generated_str), (self.max_pkt_size - len(self.packet)))
+
+ def test_gen_repeat_ptrn(self):
+ gen_len = self.max_pkt_size - len(self.packet)
+ # case 1 - repeated string
+ repeat_str = "HelloWorld"
+ generated_str = self.pld_gen.gen_repeat_ptrn(repeat_str)
+ for i in xrange(len(repeat_str)):
+ if generated_str.endswith(repeat_str[:i+1]):
+ # remove the string residue, if found
+ generated_str = generated_str[:-(i+1)]
+ # print generated_str
+ break
+ assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), "HelloWorld")
+
+ # case 2.1 - repeated single number - long number
+ repeat_num = 0x645564646465
+ generated_str = self.pld_gen.gen_repeat_ptrn(repeat_num)
+ ptrn = binascii.unhexlify(hex(repeat_num)[2:])
+ for i in xrange(len(ptrn)):
+ if generated_str.endswith(ptrn[:i+1]):
+ # remove the string residue, if found
+ generated_str = generated_str[:-(i+1)]
+ break
+ assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn)
+ # case 2.2 - repeated single number - 1 byte
+ repeat_num = 0x64
+ generated_str = self.pld_gen.gen_repeat_ptrn(repeat_num)
+ ptrn = binascii.unhexlify(hex(repeat_num)[2:])
+ assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn)
+ assert_equal(len(generated_str), (self.max_pkt_size - len(self.packet)))
+ # case 3 - repeated sequence
+ repeat_seq = (0x55, 0x60, 0x65, 0x70, 0x85)
+ ptrn = binascii.unhexlify(''.join(hex(x)[2:] for x in repeat_seq))
+ generated_str = self.pld_gen.gen_repeat_ptrn(repeat_seq)
+ # ptrn = binascii.unhexlify(hex(repeat_num)[2:])
+ for i in xrange(len(ptrn)):
+ if generated_str.endswith(ptrn[:i+1]):
+ # remove the string residue, if found
+ generated_str = generated_str[:-(i+1)]
+ break
+ assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn)
+
+ # in tuples, check that if any of the numbers exceeds limit
+ assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, (0x1, -18))
+ assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, (0xFFF, 5))
+ # finally, check an exception is thrown in rest of cases
+ assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, 5.5)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ pass
+
+
+ def tearDown(self):
+ pass
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py
new file mode 100755
index 00000000..b630147b
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py
@@ -0,0 +1,29 @@
+#!/router/bin/python
+
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+import sys
+import outer_packages
+from client_utils.packet_builder import *
+
+
+class CGeneralPktBld_Test(object):
+ def __init__(self):
+ pass
+
+ @staticmethod
+ def print_packet(pkt_obj):
+ print "\nGenerated packet:\n{}".format(repr(pkt_obj))
+
+
+ def setUp(self):
+ pass
+
+
+ def tearDown(self):
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py
new file mode 100755
index 00000000..a0b87a43
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py
@@ -0,0 +1,437 @@
+#!/router/bin/python
+
+import pkt_bld_general_test
+from client_utils.packet_builder import *
+import dpkt
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+import os
+import random
+import pprint
+
+class CTRexPktBuilderSanity_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ pass
+
+ def test_decode_ip_addr(self):
+ # test ipv4 case
+ assert_equal(CTRexPktBuilder._decode_ip_addr('1.2.3.4', "ipv4"), '\x01\x02\x03\x04')
+ assert_equal(CTRexPktBuilder._decode_ip_addr('127.0.0.1', "ipv4"), '\x7F\x00\x00\x01')
+ assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, '1.2.3.4.5', "ipv4")
+ assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, '1.2.3.4', "ipv6")
+ # test ipv6 case
+ assert_equal(CTRexPktBuilder._decode_ip_addr("5001::DB8:1:3333:1:1", "ipv6"),
+ 'P\x01\x00\x00\x00\x00\r\xb8\x00\x0133\x00\x01\x00\x01')
+ assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr,
+ '2001::DB8:1:2222::1:1', "ipv6")
+
+ def test_decode_mac_addr(self):
+ assert_equal(CTRexPktBuilder._decode_mac_addr('00:de:34:ef:2e:f4'), '\x00\xde4\xef.\xf4')
+ assert_equal(CTRexPktBuilder._decode_mac_addr('00-de-55-ef-2e-f4'), '\x00\xdeU\xef.\xf4')
+ assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr,
+ '00:de:34:ef:2e:f4:f4')
+ assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr,
+ '1.2.3.4')
+ assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr,
+ '00 de 34 ef 2e f4 f4')
+
+ def test_gen_layer_name(self):
+ pkt = CTRexPktBuilder()
+ assert_equal(pkt._gen_layer_name("eth"), "eth_1")
+ pkt._pkt_by_hdr = {'eth':None} # mock header pointer data
+ assert_equal(pkt._gen_layer_name("eth"), "eth_1")
+ pkt._pkt_by_hdr.update({'eth_1':None}) # more mock header pointer data
+ assert_equal(pkt._gen_layer_name("eth"), "eth_2")
+
+ def test_set_layer_attr_basic(self):
+ pkt = CTRexPktBuilder()
+ pkt._pkt_by_hdr['ip'] = dpkt.ip.IP()
+ # case 1 - test full value assignment
+ pkt.set_layer_attr('ip', 'src', '\x01\x02\x03\x04')
+ assert_equal(pkt._pkt_by_hdr['ip'].src, '\x01\x02\x03\x04')
+ # case 2 - test bit assignment
+ pkt.set_layer_bit_attr('ip', 'off', dpkt.ip.IP_DF)
+ pkt.set_layer_bit_attr('ip', 'off', dpkt.ip.IP_MF)
+ assert_equal(bin(pkt._pkt_by_hdr['ip'].off), '0b110000000000000')
+ # case 3 - test assignment of not-exist attribute
+ assert_raises(ValueError, pkt.set_layer_bit_attr, 'ip', 'src_dst', 0)
+ # case 4.1 - test assignment of data attribute - without dpkt.Packet object
+ assert_raises(ValueError, pkt.set_layer_bit_attr, 'ip', 'data', "Not a dpkt.Packet object")
+ # case 4.2 - test assignment of data attribute - with dpkt.Packet object - tested under CTRexPktBuilder_Test class
+# tcp = dpkt.tcp.TCP()
+ self.print_packet(pkt._pkt_by_hdr['ip'])
+# pkt.set_layer_attr('ip', 'data', tcp)
+ # case 5 - test assignment of not-exist layer
+ assert_raises(KeyError, pkt.set_layer_bit_attr, 'no_such_layer', 'src', 0)
+
+ def tearDown(self):
+ pass
+
+
+class CTRexPktBuilder_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ self.pkt_bld = CTRexPktBuilder()
+ self.pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet())
+ self.pp = pprint.PrettyPrinter(indent=4)
+
+ def test_add_pkt_layer(self):
+ ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1)
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ tcp = dpkt.tcp.TCP(sport = 8080)
+ self.pkt_bld.add_pkt_layer("l4_tcp", tcp)
+ assert_equal(len(self.pkt_bld._pkt_by_hdr), 3)
+ assert_equal(self.pkt_bld._pkt_by_hdr.keys(), ['l2', 'l3', 'l4_tcp'])
+ self.print_packet(self.pkt_bld._packet)
+ assert_raises(ValueError, self.pkt_bld.add_pkt_layer, 'l2', dpkt.ethernet.Ethernet())
+
+ def test_set_ip_layer_addr(self):
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ self.pkt_bld.set_ip_layer_addr("l3", "src", "1.2.3.4")
+ self.print_packet(self.pkt_bld._packet)
+ assert_equal(self.pkt_bld._pkt_by_hdr['l3'].src, '\x01\x02\x03\x04')
+ # check that only IP layer is using this function
+ assert_raises(ValueError, self.pkt_bld.set_ip_layer_addr, 'l2', "src", "1.2.3.4")
+
+ def test_calc_offset(self):
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ assert_equal(self.pkt_bld._calc_offset("l3", "src", 4), (14, 14+12))
+
+ def test_set_ipv6_layer_addr(self):
+ ip6 = dpkt.ip6.IP6()
+ self.pkt_bld.add_pkt_layer("l3", ip6)
+ self.pkt_bld.set_ipv6_layer_addr("l3", "src", "5001::DB8:1:3333:1:1")
+ self.print_packet(self.pkt_bld._packet)
+ assert_equal(self.pkt_bld._pkt_by_hdr['l3'].src, 'P\x01\x00\x00\x00\x00\r\xb8\x00\x0133\x00\x01\x00\x01')
+ # check that only IP layer is using this function
+ assert_raises(ValueError, self.pkt_bld.set_ipv6_layer_addr, 'l2', "src", "5001::DB8:1:3333:1:1")
+
+ def test_set_eth_layer_addr(self):
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ self.pkt_bld.set_eth_layer_addr("l2", "src", "00:de:34:ef:2e:f4")
+ self.print_packet(self.pkt_bld._packet)
+ assert_equal(self.pkt_bld._pkt_by_hdr['l2'].src, '\x00\xde4\xef.\xf4')
+ # check that only IP layer is using this function
+ assert_raises(ValueError, self.pkt_bld.set_eth_layer_addr, 'l3', "src", "\x00\xde4\xef.\xf4")
+
+ def test_set_layer_attr(self):
+ # extend the set_layer_attr_basic test by handling the following case:
+ # replace some header data with another layer, causing other layers to disconnect
+ # this also tests the _reevaluate_packet method
+ ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1)
+ self.pkt_bld.add_pkt_layer("l3_ip", ip)
+ tcp = dpkt.tcp.TCP(sport = 8080)
+ self.pkt_bld.add_pkt_layer("l4_tcp", tcp)
+ # sanity: try changing data attr with non-dpkt.Packet instance
+ assert_raises(ValueError, self.pkt_bld.set_layer_attr, 'l2', 'data', "HelloWorld")
+ # now, add different L3 layer instead of existting one, L4 would disconnect
+ old_layer_count = len(self.pkt_bld._pkt_by_hdr)
+ new_ip = dpkt.ip.IP(src='\x05\x06\x07\x08', dst='\x01\x02\x03\x04')
+ print "\nBefore disconnecting layers:"
+ print "============================",
+ self.print_packet(self.pkt_bld._packet)
+ self.pkt_bld.set_layer_attr('l2', 'data', new_ip)
+ print "\nAfter disconnecting layers:"
+ print "===========================",
+ self.print_packet(self.pkt_bld._packet)
+ assert_not_equal(old_layer_count, len(self.pkt_bld._pkt_by_hdr))
+ assert_equal(len(self.pkt_bld._pkt_by_hdr), 1) # only Eth layer appears
+
+ def test_set_pkt_payload(self):
+ payload = "HelloWorld"
+ # test for setting a payload to an empty packet
+ empty_pkt = CTRexPktBuilder()
+ assert_raises(AttributeError, empty_pkt.set_pkt_payload, payload)
+ # add content to packet
+ ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1)
+ self.pkt_bld.add_pkt_layer("l3_ip", ip)
+ tcp = dpkt.tcp.TCP(sport = 8080)
+ self.pkt_bld.add_pkt_layer("l4_tcp", tcp)
+ # now, set a payload for the packet
+ self.pkt_bld.set_pkt_payload(payload)
+ self.print_packet(self.pkt_bld._packet)
+ assert_equal(self.pkt_bld._pkt_by_hdr['l4_tcp'].data, payload)
+
+ def test_load_packet(self):
+ # add content to packet
+ ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1)
+ self.pkt_bld.add_pkt_layer("l3_ip", ip)
+ tcp = dpkt.tcp.TCP(sport = 8080)
+ self.pkt_bld.add_pkt_layer("l4_tcp", tcp)
+ self.pkt_bld.set_pkt_payload("HelloWorld")
+
+ new_pkt = CTRexPktBuilder()
+ new_pkt.load_packet(self.pkt_bld._packet)
+ self.print_packet(new_pkt._packet)
+ assert_equal(len(new_pkt._pkt_by_hdr), 4)
+ assert_equal(new_pkt._pkt_by_hdr.keys(),
+ ['ip_1',
+ 'tcp_1',
+ 'pkt_final_payload',
+ 'ethernet_1'
+ ]
+ )
+ assert_equal(new_pkt._pkt_by_hdr['pkt_final_payload'], "HelloWorld")
+
+ def test_get_packet(self):
+ # get a pointer to the packet
+ assert(self.pkt_bld.get_packet(get_ptr=True) is self.pkt_bld._packet)
+ # get a copy of the packet
+ assert(not(self.pkt_bld.get_packet() is self.pkt_bld._packet))
+
+ def test_get_layer(self):
+ assert_equal(self.pkt_bld.get_layer('no_such_layer'), None)
+ assert(not(self.pkt_bld.get_layer('l2') is self.pkt_bld._packet))
+ assert(type(self.pkt_bld.get_layer('l2')).__name__, "ethernet")
+
+ def test_dump_to_pcap(self):
+ # set Ethernet layer attributes
+ self.pkt_bld.set_eth_layer_addr("l2", "src", "00:15:17:a7:75:a3")
+ self.pkt_bld.set_eth_layer_addr("l2", "dst", "e0:5f:b9:69:e9:22")
+ self.pkt_bld.set_layer_attr("l2", "type", dpkt.ethernet.ETH_TYPE_IP)
+ # set IP layer attributes
+ self.pkt_bld.add_pkt_layer("l3_ip", dpkt.ip.IP())
+ self.pkt_bld.set_ip_layer_addr("l3_ip", "src", "21.0.0.2")
+ self.pkt_bld.set_ip_layer_addr("l3_ip", "dst", "22.0.0.12")
+ self.pkt_bld.set_layer_attr("l3_ip", "p", dpkt.ip.IP_PROTO_TCP)
+ # set TCP layer attributes
+ self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP())
+ self.pkt_bld.set_layer_attr("l4_tcp", "sport", 13311)
+ self.pkt_bld.set_layer_attr("l4_tcp", "dport", 80)
+ self.pkt_bld.set_layer_attr("l4_tcp", "flags", 0)
+ self.pkt_bld.set_layer_attr("l4_tcp", "win", 32768)
+ self.pkt_bld.set_layer_attr("l4_tcp", "seq", 0)
+ # set packet payload, for example HTTP GET request
+ self.pkt_bld.set_pkt_payload('GET /10k_60k HTTP/1.1\r\nHost: 22.0.0.3\r\nConnection: Keep-Alive\r\nUser-Agent: Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)\r\nAccept: */*\r\nAccept-Language: en-us\r\nAccept-Encoding: gzip, deflate, compress\r\n\r\n')
+ # finally, set IP header len with relation to payload data
+ self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip')))
+
+ filepath = "unit_tests/pkt_bld_tests/test.pcap"
+ self.pkt_bld.dump_pkt_to_pcap(filepath)
+ assert os.path.isfile(filepath)
+ # remove pcap after creation - masked for now
+ # os.remove(filepath)
+ filepath = "/not/a/valid/path/test.pcap"
+ assert_raises(IOError, self.pkt_bld.dump_pkt_to_pcap, filepath)
+ # check that dump is not available for empty packet
+ new_pkt = CTRexPktBuilder()
+ assert_raises(CTRexPktBuilder.EmptyPacketError, new_pkt.dump_pkt_to_pcap, filepath)
+
+ def test_dump_pkt(self):
+ # check that dump is not available for empty packet
+ new_pkt = CTRexPktBuilder()
+ assert_raises(CTRexPktBuilder.EmptyPacketError, new_pkt.dump_pkt)
+
+ # set Ethernet layer attributes
+ self.pkt_bld.set_eth_layer_addr("l2", "src", "00:15:17:a7:75:a3")
+ self.pkt_bld.set_eth_layer_addr("l2", "dst", "e0:5f:b9:69:e9:22")
+ self.pkt_bld.set_layer_attr("l2", "type", dpkt.ethernet.ETH_TYPE_IP)
+ # set IP layer attributes
+ self.pkt_bld.add_pkt_layer("l3_ip", dpkt.ip.IP())
+ self.pkt_bld.set_ip_layer_addr("l3_ip", "src", "21.0.0.2")
+ self.pkt_bld.set_ip_layer_addr("l3_ip", "dst", "22.0.0.12")
+ self.pkt_bld.set_layer_attr("l3_ip", "p", dpkt.ip.IP_PROTO_ICMP)
+ # set ICMP layer attributes
+ self.pkt_bld.add_pkt_layer("icmp", dpkt.icmp.ICMP())
+ self.pkt_bld.set_layer_attr("icmp", "type", dpkt.icmp.ICMP_ECHO)
+ # set Echo(ICMP) layer attributes
+ self.pkt_bld.add_pkt_layer("icmp_echo", dpkt.icmp.ICMP.Echo())
+ self.pkt_bld.set_layer_attr("icmp_echo", "id", 24528)
+ self.pkt_bld.set_layer_attr("icmp_echo", "seq", 11482)
+ self.pkt_bld.set_pkt_payload('hello world')
+
+ # finally, set IP header len with relation to payload data
+ self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip')))
+
+ self.print_packet(self.pkt_bld.get_packet())
+ assert_equal(self.pkt_bld.dump_pkt(), {
+ 'binary': [224, 95, 185, 105, 233, 34, 0, 21, 23, 167, 117, 163, 8, 0, 69, 0, 0, 39, 0, 0, 0, 0, 64, 1, 79, 201, 21, 0, 0, 2, 22, 0, 0, 12, 8, 0, 217, 134, 95, 208, 44, 218, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100],
+ 'meta': '',
+ })
+
+ def test_set_vm_ip_range_ipv4(self):
+ # set some mock packet
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP())
+ self.pkt_bld.set_pkt_payload("HelloWorld")
+
+ self.pkt_bld.set_vm_ip_range("l3", "src",
+ "10.0.0.1", "10.0.0.1", "10.0.0.255", 1,
+ "inc")
+# self.pkt_bld.set_vm_custom_range(layer_name="l3",
+# hdr_field="tos",
+# init_val="10", start_val="10", end_val="200", add_val=2, val_size=1,
+# operation="inc")
+ print ''
+ self.pp.pprint(self.pkt_bld.vm.dump())
+ assert_equal(self.pkt_bld.vm.dump(),
+ { 'instructions': [ { 'init_value': '167772161',
+ 'max_value': '167772415',
+ 'min_value': '167772161',
+ 'name': 'l3__src',
+ 'op': 'inc',
+ 'size': 4,
+ 'type': 'flow_var'},
+ { 'add_value': 1,
+ 'is_big_endian': False,
+ 'name': 'l3__src',
+ 'pkt_offset': 26,
+ 'type': 'write_flow_var'},
+ { 'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}],
+ 'split_by_var': ''}
+ )
+
+ def test_set_vm_ip_range_ipv4_no_checksum(self):
+ # set some mock packet
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP())
+ self.pkt_bld.set_pkt_payload("HelloWorld")
+
+ self.pkt_bld.set_vm_ip_range(ip_layer_name="l3",
+ ip_field="src",
+ ip_init="10.0.0.1", ip_start="10.0.0.1", ip_end="10.0.0.255",
+ add_value=1,
+ operation="inc",
+ add_checksum_inst=False)
+ print ''
+ self.pp.pprint(self.pkt_bld.vm.dump())
+ assert_equal(self.pkt_bld.vm.dump(),
+ { 'instructions': [ { 'init_value': '167772161',
+ 'max_value': '167772415',
+ 'min_value': '167772161',
+ 'name': 'l3__src',
+ 'op': 'inc',
+ 'size': 4,
+ 'type': 'flow_var'},
+ { 'add_value': 1,
+ 'is_big_endian': False,
+ 'name': 'l3__src',
+ 'pkt_offset': 26,
+ 'type': 'write_flow_var'}],
+ 'split_by_var': ''}
+ )
+
+ def test_set_vm_ip_range_ipv6(self):
+ # set some mock packet
+ ip6 = dpkt.ip6.IP6()
+ self.pkt_bld.add_pkt_layer("l3", ip6)
+ self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP())
+ self.pkt_bld.set_pkt_payload("HelloWorld")
+
+ self.pkt_bld.set_vm_ip_range(ip_layer_name="l3",
+ ip_field="src",
+ ip_init="5001::DB8:1:3333:1:1", ip_start="5001::DB8:1:3333:1:1", ip_end="5001::DB8:1:3333:1:F",
+ add_value=1,
+ operation="inc",
+ ip_type="ipv6")
+ print ''
+ self.pp.pprint(self.pkt_bld.vm.dump())
+ assert_equal(self.pkt_bld.vm.dump(),
+ { 'instructions': [ { 'init_value': '65537',
+ 'max_value': '65551',
+ 'min_value': '65537',
+ 'name': 'l3__src',
+ 'op': 'inc',
+ 'size': 4,
+ 'type': 'flow_var'},
+ { 'add_value': 1,
+ 'is_big_endian': False,
+ 'name': 'l3__src',
+ 'pkt_offset': 34,
+ 'type': 'write_flow_var'}],
+ 'split_by_var': ''}
+ )
+
+ def test_set_vm_eth_range(self):
+ pass
+
+ def test_set_vm_custom_range(self):
+ # set some mock packet
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP())
+ self.pkt_bld.set_pkt_payload("HelloWorld")
+
+ self.pkt_bld.set_vm_custom_range(layer_name="l3",
+ hdr_field="tos",
+ init_val=10, start_val=10, end_val=200, add_val=2, val_size=1,
+ operation="inc")
+ print ''
+ self.pp.pprint(self.pkt_bld.vm.dump())
+ assert_equal(self.pkt_bld.vm.dump(),
+ { 'instructions': [ { 'init_value': '10',
+ 'max_value': '200',
+ 'min_value': '10',
+ 'name': 'l3__tos',
+ 'op': 'inc',
+ 'size': 1,
+ 'type': 'flow_var'},
+ { 'add_value': 2,
+ 'is_big_endian': False,
+ 'name': 'l3__tos',
+ 'pkt_offset': 15,
+ 'type': 'write_flow_var'},
+ { 'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}],
+ 'split_by_var': ''}
+ )
+
+ def test_various_ranges(self):
+ # set some mock packet
+ ip = dpkt.ip.IP()
+ self.pkt_bld.add_pkt_layer("l3", ip)
+ self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP())
+ self.pkt_bld.set_pkt_payload("HelloWorld")
+
+ self.pkt_bld.set_vm_ip_range("l3", "src",
+ "10.0.0.1", "10.0.0.1", "10.0.0.255", 1,
+ "inc")
+ self.pkt_bld.set_vm_custom_range(layer_name="l3",
+ hdr_field="tos",
+ init_val=10, start_val=10, end_val=200, add_val=2, val_size=1,
+ operation="inc")
+ print ''
+ self.pp.pprint(self.pkt_bld.vm.dump())
+ assert_equal(self.pkt_bld.vm.dump(),
+ {'instructions': [{'init_value': '167772161',
+ 'max_value': '167772415',
+ 'min_value': '167772161',
+ 'name': 'l3__src',
+ 'op': 'inc',
+ 'size': 4,
+ 'type': 'flow_var'},
+ {'init_value': '10',
+ 'max_value': '200',
+ 'min_value': '10',
+ 'name': 'l3__tos',
+ 'op': 'inc',
+ 'size': 1,
+ 'type': 'flow_var'},
+ {'add_value': 2,
+ 'is_big_endian': False,
+ 'name': 'l3__tos',
+ 'pkt_offset': 15,
+ 'type': 'write_flow_var'},
+ {'add_value': 1,
+ 'is_big_endian': False,
+ 'name': 'l3__src',
+ 'pkt_offset': 26,
+ 'type': 'write_flow_var'},
+ {'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}],
+ 'split_by_var': ''}
+ )
+
+ def tearDown(self):
+ pass
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap b/scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap
new file mode 100755
index 00000000..da1830b8
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap
Binary files differ
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap b/scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap
new file mode 100755
index 00000000..1d35d9c1
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap
Binary files differ
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap b/scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap
new file mode 100755
index 00000000..4c92859f
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap
Binary files differ
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py
new file mode 100755
index 00000000..603c52db
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py
@@ -0,0 +1,77 @@
+#!/router/bin/python
+
+import pkt_bld_general_test
+from client_utils.packet_builder import *
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+
+class CTRexVM_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ self.vm = CTRexPktBuilder.CTRexVM()
+
+ def test_add_flow_man_inst(self):
+ self.vm.add_flow_man_inst("src_ip")
+ assert_raises(CTRexPktBuilder.VMVarNameExistsError, self.vm.add_flow_man_inst, "src_ip")
+ self.vm.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100)
+ assert_equal(self.vm.vm_variables["dst_ip"].dump(),
+ {"type": "flow_var",
+ "name": "dst_ip",
+ "size": 8,
+ # "big_endian": True,
+ "op": "inc",
+# "split_by_core": False,
+ "init_value": "5",
+ "min_value": "1",
+ "max_value": "100"})
+ assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm.add_flow_man_inst, "src_mac", no_field=1)
+
+ def test_load_flow_man(self):
+ vm2 = CTRexPktBuilder.CTRexVM()
+ vm2.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100)
+ self.vm.load_flow_man(vm2.vm_variables["dst_ip"])
+ assert_equal(self.vm.vm_variables["dst_ip"].dump(),
+ vm2.vm_variables["dst_ip"].dump())
+
+ def test_set_vm_var_field(self):
+ self.vm.add_flow_man_inst("src_ip")
+ self.vm.set_vm_var_field("src_ip", "size", 8)
+ assert_equal(self.vm.vm_variables["src_ip"].size, 8)
+ assert_raises(KeyError, self.vm.set_vm_var_field, "no_var", "no_field", 10)
+ assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm.set_vm_var_field, "src_ip", "no_field", 10)
+ assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm.set_vm_var_field, "src_ip", "operation", "rand")
+
+ def test_dump(self):
+ self.vm.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100)
+ self.vm.add_flow_man_inst("src_ip", size=8, operation="dec", init_value=10, min_value=2, max_value=100)
+ from pprint import pprint
+ print ''
+ pprint (self.vm.dump())
+ assert_equal(self.vm.dump(),
+ {'instructions': [{'init_value': '10',
+ 'max_value': '100',
+ 'min_value': '2',
+ 'name': 'src_ip',
+ 'op': 'dec',
+ 'size': 8,
+ 'type': 'flow_var'},
+ {'init_value': '5',
+ 'max_value': '100',
+ 'min_value': '1',
+ 'name': 'dst_ip',
+ 'op': 'inc',
+ 'size': 8,
+ 'type': 'flow_var'}],
+ 'split_by_var': ''}
+ )
+
+
+ def tearDown(self):
+ pass
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py
new file mode 100755
index 00000000..af56b1b1
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py
@@ -0,0 +1,63 @@
+#!/router/bin/python
+
+import pkt_bld_general_test
+from client_utils.packet_builder import *
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+
+class CTRexVMVariable_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ self.vm_var = CTRexPktBuilder.CTRexVM.CTRexVMFlowVariable("test_var")
+
+ def test_var_name(self):
+ assert_equal(self.vm_var.name, "test_var")
+
+# @raises(CTRexPktBuilder.VMVarNameError)
+ def test_set_field(self):
+ assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm_var.set_field, "no_field", 10)
+ # test for 'size' field
+ assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "size", "wrong_type")
+ assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm_var.set_field, "size", 10) # 10 is illegal size
+ self.vm_var.set_field("size", 8)
+ assert_equal(self.vm_var.size, 8)
+ # test for 'init_value' field
+ assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "init_value", '123') # 123 is wrong type, should be int
+ self.vm_var.set_field("init_value", 5)
+ assert_equal(self.vm_var.init_value, 5)
+ # test for 'operation' field
+ assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "operation", 1) # operation is field of type str
+ assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm_var.set_field, "operation", "rand") # "rand" is illegal option
+ self.vm_var.set_field("operation", "inc")
+ assert_equal(self.vm_var.operation, "inc")
+ # test for 'split_by_core' field
+# self.vm_var.set_field("split_by_core", 5)
+# assert_equal(self.vm_var.split_by_core, True)
+
+ def test_var_dump (self):
+ # set VM variable options
+ self.vm_var.set_field("size", 8)
+ self.vm_var.set_field("init_value", 5)
+ self.vm_var.set_field("operation", "inc")
+# self.vm_var.set_field("split_by_core", False)
+ self.vm_var.set_field("max_value", 100)
+ assert_equal(self.vm_var.dump(),
+ {"type": "flow_var",
+ "name": "test_var",
+ "size": 8,
+ # "big_endian": True,
+ "op": "inc",
+# "split_by_core": False,
+ "init_value": "5",
+ "min_value": "1",
+ "max_value": "100"})
+
+ def tearDown(self):
+ pass
+
+
+if __name__ == "__main__":
+ pass
+