From eee866f42bd0fc8472e6295b4f26bd0697e59f1f Mon Sep 17 00:00:00 2001 From: Yaroslav Brustinov Date: Mon, 18 Jan 2016 22:00:51 +0200 Subject: regression: corrections of benchmarks move packet builder tests to functional folder add CPU utilization boundries for more tests print Skipping message fix relative drop counting API: convert hostname to IP at init of client side (trex_client.py) various: move python path determination to external common file add functional tests running bash script add sudo check to t-rex-64 --- .../regression/setups/kiwi02/benchmark.yaml | 20 +- .../regression/setups/trex-dan/benchmark.yaml | 4 +- .../regression/setups/trex-dan/config.yaml | 1 - .../regression/setups/trex04/benchmark.yaml | 2 +- .../functional_tests/payload_gen_test.py | 125 ++++++ .../functional_tests/pkt_bld_general_test.py | 29 ++ .../functional_tests/pkt_builder_test.py | 437 +++++++++++++++++++++ .../unit_tests/functional_tests/test.pcap | Bin 0 -> 346 bytes .../unit_tests/functional_tests/test2.pcap | Bin 0 -> 93 bytes .../unit_tests/functional_tests/test_cmp.pcap | Bin 0 -> 346 bytes .../unit_tests/functional_tests/vm_test.py | 77 ++++ .../functional_tests/vm_variable_test.py | 63 +++ .../unit_tests/pkt_bld_tests/payload_gen_test.py | 125 ------ .../pkt_bld_tests/pkt_bld_general_test.py | 29 -- .../unit_tests/pkt_bld_tests/pkt_builder_test.py | 437 --------------------- .../regression/unit_tests/pkt_bld_tests/test.pcap | Bin 346 -> 0 bytes .../regression/unit_tests/pkt_bld_tests/test2.pcap | Bin 93 -> 0 bytes .../unit_tests/pkt_bld_tests/test_cmp.pcap | Bin 346 -> 0 bytes .../regression/unit_tests/pkt_bld_tests/vm_test.py | 77 ---- .../unit_tests/pkt_bld_tests/vm_variable_test.py | 63 --- .../regression/unit_tests/trex_general_test.py | 7 +- .../regression/unit_tests/trex_nat_test.py | 7 +- .../regression/unit_tests/trex_rx_test.py | 54 ++- 23 files changed, 775 insertions(+), 782 deletions(-) create mode 100755 scripts/automation/regression/unit_tests/functional_tests/payload_gen_test.py create mode 100755 scripts/automation/regression/unit_tests/functional_tests/pkt_bld_general_test.py create mode 100755 scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py create mode 100755 scripts/automation/regression/unit_tests/functional_tests/test.pcap create mode 100755 scripts/automation/regression/unit_tests/functional_tests/test2.pcap create mode 100755 scripts/automation/regression/unit_tests/functional_tests/test_cmp.pcap create mode 100755 scripts/automation/regression/unit_tests/functional_tests/vm_test.py create mode 100755 scripts/automation/regression/unit_tests/functional_tests/vm_variable_test.py delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py delete mode 100755 scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py (limited to 'scripts/automation/regression') diff --git a/scripts/automation/regression/setups/kiwi02/benchmark.yaml b/scripts/automation/regression/setups/kiwi02/benchmark.yaml index c387a994..ccd566cc 100644 --- a/scripts/automation/regression/setups/kiwi02/benchmark.yaml +++ b/scripts/automation/regression/setups/kiwi02/benchmark.yaml @@ -49,9 +49,9 @@ test_nat_simple : dual_port_mask : 1.0.0.0 pool_start : 200.0.0.0 pool_netmask : 255.255.255.0 - multiplier : 400 + multiplier : 10000 cpu_to_core_ratio : 37270000 - cores : 4 + cores : 1 exp_bw : 1 exp_latency : 1 allow_timeout_dev : YES @@ -63,8 +63,8 @@ test_nat_learning : dual_port_mask : 1.0.0.0 client_destination_mask : 255.0.0.0 server_destination_mask : 255.0.0.0 - multiplier : 400 - cores : 4 + multiplier : 10000 + cores : 1 nat_opened : 100000 cpu_to_core_ratio : 37270000 exp_bw : 1 @@ -78,7 +78,7 @@ test_routing_imix_64 : exp_latency : 1 test_routing_imix : - multiplier : 35 + multiplier : 32 cores : 2 cpu_to_core_ratio : 8900 exp_latency : 1 @@ -90,7 +90,7 @@ test_static_routing_imix : dual_port_mask : 1.0.0.0 client_destination_mask : 255.0.0.0 server_destination_mask : 255.0.0.0 - multiplier : 35 + multiplier : 32 cores : 2 cpu_to_core_ratio : 3766666 exp_latency : 1 @@ -102,7 +102,7 @@ test_static_routing_imix_asymmetric: dual_port_mask : 1.0.0.0 client_destination_mask : 255.0.0.0 server_destination_mask : 255.0.0.0 - multiplier : 18 + multiplier : 16 cores : 1 cpu_to_core_ratio : 3766666 exp_latency : 1 @@ -135,7 +135,11 @@ test_rx_check_http_ipv6: cores : 2 rx_sample_rate : 32 +test_rx_check_http_negative: + multiplier : 40000 + cores : 2 + rx_sample_rate : 32 test_jumbo: - multiplier : 56 + multiplier : 55 cores : 1 diff --git a/scripts/automation/regression/setups/trex-dan/benchmark.yaml b/scripts/automation/regression/setups/trex-dan/benchmark.yaml index f65fcf90..ae814551 100644 --- a/scripts/automation/regression/setups/trex-dan/benchmark.yaml +++ b/scripts/automation/regression/setups/trex-dan/benchmark.yaml @@ -49,7 +49,7 @@ test_nat_simple : dual_port_mask : 1.0.0.0 pool_start : 200.0.0.0 pool_netmask : 255.255.255.0 - multiplier : 150 + multiplier : 550 cores : 1 cpu_to_core_ratio : 37270000 exp_bw : 1 @@ -63,7 +63,7 @@ test_nat_learning : dual_port_mask : 1.0.0.0 client_destination_mask : 255.0.0.0 server_destination_mask : 255.0.0.0 - multiplier : 150 + multiplier : 550 cores : 1 nat_opened : 40000 cpu_to_core_ratio : 270 diff --git a/scripts/automation/regression/setups/trex-dan/config.yaml b/scripts/automation/regression/setups/trex-dan/config.yaml index ae60f9ad..5f91ea6a 100644 --- a/scripts/automation/regression/setups/trex-dan/config.yaml +++ b/scripts/automation/regression/setups/trex-dan/config.yaml @@ -34,7 +34,6 @@ trex: hostname : trex-dan -# version_path : /auto/proj-pcube-b/apps/PL-b/tools/bp_sim2/v1.57/ #/auto/srg-sce-swinfra-usr/emb/users/danklei/Work/asr1k/emb/private/bpsim/main/scripts cores : 2 modes : [VM] diff --git a/scripts/automation/regression/setups/trex04/benchmark.yaml b/scripts/automation/regression/setups/trex04/benchmark.yaml index 56193f46..e5459dce 100644 --- a/scripts/automation/regression/setups/trex04/benchmark.yaml +++ b/scripts/automation/regression/setups/trex04/benchmark.yaml @@ -20,7 +20,7 @@ test_routing_imix_64 : exp_latency : 1 test_routing_imix : - multiplier : 0.8 + multiplier : 0.5 cores : 1 cpu_to_core_ratio : 1800 exp_latency : 1 diff --git a/scripts/automation/regression/unit_tests/functional_tests/payload_gen_test.py b/scripts/automation/regression/unit_tests/functional_tests/payload_gen_test.py new file mode 100755 index 00000000..80d2b086 --- /dev/null +++ b/scripts/automation/regression/unit_tests/functional_tests/payload_gen_test.py @@ -0,0 +1,125 @@ +#!/router/bin/python + +import pkt_bld_general_test +from client_utils.packet_builder import * +from dpkt.ethernet import Ethernet +from dpkt.ip import IP +from dpkt.icmp import ICMP +from nose.tools import assert_equal +from nose.tools import assert_not_equal +from nose.tools import assert_raises +from nose.tools import raises +import re +import binascii + +class CTRexPayloadGen_Test(pkt_bld_general_test.CGeneralPktBld_Test): + + def setUp(self): +# echo = dpkt.icmp.ICMP.Echo() +# echo.id = random.randint(0, 0xffff) +# echo.seq = random.randint(0, 0xffff) +# echo.data = 'hello world' +# +# icmp = dpkt.icmp.ICMP() +# icmp.type = dpkt.icmp.ICMP_ECHO +# icmp.data = echo + + # packet generation is done directly using dpkt package and + self.packet = Ethernet() + ip = IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) + icmp = ICMP(type=8, data=ICMP.Echo(id=123, seq=1, data='foobar')) + ip.data = icmp + self.packet.src = "\x00\x00\x55\x55\x00\x00" + self.packet.dst = "\x00\x00\x11\x11\x00\x00" + self.packet.data = ip + self.print_packet(self.packet) + self.max_pkt_size = 1400 + self.pld_gen = CTRexPktBuilder.CTRexPayloadGen(self.packet, self.max_pkt_size) + + @staticmethod + def special_match(strg, search=re.compile(r'[^a-zA-Z0-9]').search): + return not bool(search(strg)) + + @staticmethod + def principal_period(s): + # finds the string the repeats itself in the string + i = (s+s).find(s, 1, -1) + return None if i == -1 else s[:i] + + def test_gen_random_str(self): + generated_str = self.pld_gen.gen_random_str() + # print "\nGenerated string: {}".format(generated_str) + # chech that the generated string is accorsing to rules. + assert CTRexPayloadGen_Test.special_match(generated_str) + assert_equal(len(generated_str), (self.max_pkt_size - len(self.packet))) + + def test_gen_repeat_ptrn(self): + gen_len = self.max_pkt_size - len(self.packet) + # case 1 - repeated string + repeat_str = "HelloWorld" + generated_str = self.pld_gen.gen_repeat_ptrn(repeat_str) + for i in xrange(len(repeat_str)): + if generated_str.endswith(repeat_str[:i+1]): + # remove the string residue, if found + generated_str = generated_str[:-(i+1)] + # print generated_str + break + assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), "HelloWorld") + + # case 2.1 - repeated single number - long number + repeat_num = 0x645564646465 + generated_str = self.pld_gen.gen_repeat_ptrn(repeat_num) + ptrn = binascii.unhexlify(hex(repeat_num)[2:]) + for i in xrange(len(ptrn)): + if generated_str.endswith(ptrn[:i+1]): + # remove the string residue, if found + generated_str = generated_str[:-(i+1)] + break + assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn) + # case 2.2 - repeated single number - 1 byte + repeat_num = 0x64 + generated_str = self.pld_gen.gen_repeat_ptrn(repeat_num) + ptrn = binascii.unhexlify(hex(repeat_num)[2:]) + assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn) + assert_equal(len(generated_str), (self.max_pkt_size - len(self.packet))) + # case 3 - repeated sequence + repeat_seq = (0x55, 0x60, 0x65, 0x70, 0x85) + ptrn = binascii.unhexlify(''.join(hex(x)[2:] for x in repeat_seq)) + generated_str = self.pld_gen.gen_repeat_ptrn(repeat_seq) + # ptrn = binascii.unhexlify(hex(repeat_num)[2:]) + for i in xrange(len(ptrn)): + if generated_str.endswith(ptrn[:i+1]): + # remove the string residue, if found + generated_str = generated_str[:-(i+1)] + break + assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn) + + # in tuples, check that if any of the numbers exceeds limit + assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, (0x1, -18)) + assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, (0xFFF, 5)) + # finally, check an exception is thrown in rest of cases + assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, 5.5) + + + + + + + + + + + + + + + pass + + + def tearDown(self): + pass + + +if __name__ == "__main__": + pass + diff --git a/scripts/automation/regression/unit_tests/functional_tests/pkt_bld_general_test.py b/scripts/automation/regression/unit_tests/functional_tests/pkt_bld_general_test.py new file mode 100755 index 00000000..b630147b --- /dev/null +++ b/scripts/automation/regression/unit_tests/functional_tests/pkt_bld_general_test.py @@ -0,0 +1,29 @@ +#!/router/bin/python + +from nose.tools import assert_equal +from nose.tools import assert_not_equal +from nose.tools import assert_raises +from nose.tools import raises +import sys +import outer_packages +from client_utils.packet_builder import * + + +class CGeneralPktBld_Test(object): + def __init__(self): + pass + + @staticmethod + def print_packet(pkt_obj): + print "\nGenerated packet:\n{}".format(repr(pkt_obj)) + + + def setUp(self): + pass + + + def tearDown(self): + pass + +if __name__ == "__main__": + pass diff --git a/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py b/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py new file mode 100755 index 00000000..fd157c8a --- /dev/null +++ b/scripts/automation/regression/unit_tests/functional_tests/pkt_builder_test.py @@ -0,0 +1,437 @@ +#!/router/bin/python + +import pkt_bld_general_test +from client_utils.packet_builder import * +import dpkt +from nose.tools import assert_equal +from nose.tools import assert_not_equal +from nose.tools import assert_raises +from nose.tools import raises +import os +import random +import pprint + +class CTRexPktBuilderSanity_Test(pkt_bld_general_test.CGeneralPktBld_Test): + + def setUp(self): + pass + + def test_decode_ip_addr(self): + # test ipv4 case + assert_equal(CTRexPktBuilder._decode_ip_addr('1.2.3.4', "ipv4"), '\x01\x02\x03\x04') + assert_equal(CTRexPktBuilder._decode_ip_addr('127.0.0.1', "ipv4"), '\x7F\x00\x00\x01') + assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, '1.2.3.4.5', "ipv4") + assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, '1.2.3.4', "ipv6") + # test ipv6 case + assert_equal(CTRexPktBuilder._decode_ip_addr("5001::DB8:1:3333:1:1", "ipv6"), + 'P\x01\x00\x00\x00\x00\r\xb8\x00\x0133\x00\x01\x00\x01') + assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, + '2001::DB8:1:2222::1:1', "ipv6") + + def test_decode_mac_addr(self): + assert_equal(CTRexPktBuilder._decode_mac_addr('00:de:34:ef:2e:f4'), '\x00\xde4\xef.\xf4') + assert_equal(CTRexPktBuilder._decode_mac_addr('00-de-55-ef-2e-f4'), '\x00\xdeU\xef.\xf4') + assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr, + '00:de:34:ef:2e:f4:f4') + assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr, + '1.2.3.4') + assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr, + '00 de 34 ef 2e f4 f4') + + def test_gen_layer_name(self): + pkt = CTRexPktBuilder() + assert_equal(pkt._gen_layer_name("eth"), "eth_1") + pkt._pkt_by_hdr = {'eth':None} # mock header pointer data + assert_equal(pkt._gen_layer_name("eth"), "eth_1") + pkt._pkt_by_hdr.update({'eth_1':None}) # more mock header pointer data + assert_equal(pkt._gen_layer_name("eth"), "eth_2") + + def test_set_layer_attr_basic(self): + pkt = CTRexPktBuilder() + pkt._pkt_by_hdr['ip'] = dpkt.ip.IP() + # case 1 - test full value assignment + pkt.set_layer_attr('ip', 'src', '\x01\x02\x03\x04') + assert_equal(pkt._pkt_by_hdr['ip'].src, '\x01\x02\x03\x04') + # case 2 - test bit assignment + pkt.set_layer_bit_attr('ip', 'off', dpkt.ip.IP_DF) + pkt.set_layer_bit_attr('ip', 'off', dpkt.ip.IP_MF) + assert_equal(bin(pkt._pkt_by_hdr['ip'].off), '0b110000000000000') + # case 3 - test assignment of not-exist attribute + assert_raises(ValueError, pkt.set_layer_bit_attr, 'ip', 'src_dst', 0) + # case 4.1 - test assignment of data attribute - without dpkt.Packet object + assert_raises(ValueError, pkt.set_layer_bit_attr, 'ip', 'data', "Not a dpkt.Packet object") + # case 4.2 - test assignment of data attribute - with dpkt.Packet object - tested under CTRexPktBuilder_Test class +# tcp = dpkt.tcp.TCP() + self.print_packet(pkt._pkt_by_hdr['ip']) +# pkt.set_layer_attr('ip', 'data', tcp) + # case 5 - test assignment of not-exist layer + assert_raises(KeyError, pkt.set_layer_bit_attr, 'no_such_layer', 'src', 0) + + def tearDown(self): + pass + + +class CTRexPktBuilder_Test(pkt_bld_general_test.CGeneralPktBld_Test): + + def setUp(self): + self.pkt_bld = CTRexPktBuilder() + self.pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet()) + self.pp = pprint.PrettyPrinter(indent=4) + + def test_add_pkt_layer(self): + ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) + self.pkt_bld.add_pkt_layer("l3", ip) + tcp = dpkt.tcp.TCP(sport = 8080) + self.pkt_bld.add_pkt_layer("l4_tcp", tcp) + assert_equal(len(self.pkt_bld._pkt_by_hdr), 3) + assert_equal(self.pkt_bld._pkt_by_hdr.keys(), ['l2', 'l3', 'l4_tcp']) + self.print_packet(self.pkt_bld._packet) + assert_raises(ValueError, self.pkt_bld.add_pkt_layer, 'l2', dpkt.ethernet.Ethernet()) + + def test_set_ip_layer_addr(self): + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + self.pkt_bld.set_ip_layer_addr("l3", "src", "1.2.3.4") + self.print_packet(self.pkt_bld._packet) + assert_equal(self.pkt_bld._pkt_by_hdr['l3'].src, '\x01\x02\x03\x04') + # check that only IP layer is using this function + assert_raises(ValueError, self.pkt_bld.set_ip_layer_addr, 'l2', "src", "1.2.3.4") + + def test_calc_offset(self): + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + assert_equal(self.pkt_bld._calc_offset("l3", "src", 4), (14, 14+12)) + + def test_set_ipv6_layer_addr(self): + ip6 = dpkt.ip6.IP6() + self.pkt_bld.add_pkt_layer("l3", ip6) + self.pkt_bld.set_ipv6_layer_addr("l3", "src", "5001::DB8:1:3333:1:1") + self.print_packet(self.pkt_bld._packet) + assert_equal(self.pkt_bld._pkt_by_hdr['l3'].src, 'P\x01\x00\x00\x00\x00\r\xb8\x00\x0133\x00\x01\x00\x01') + # check that only IP layer is using this function + assert_raises(ValueError, self.pkt_bld.set_ipv6_layer_addr, 'l2', "src", "5001::DB8:1:3333:1:1") + + def test_set_eth_layer_addr(self): + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + self.pkt_bld.set_eth_layer_addr("l2", "src", "00:de:34:ef:2e:f4") + self.print_packet(self.pkt_bld._packet) + assert_equal(self.pkt_bld._pkt_by_hdr['l2'].src, '\x00\xde4\xef.\xf4') + # check that only IP layer is using this function + assert_raises(ValueError, self.pkt_bld.set_eth_layer_addr, 'l3', "src", "\x00\xde4\xef.\xf4") + + def test_set_layer_attr(self): + # extend the set_layer_attr_basic test by handling the following case: + # replace some header data with another layer, causing other layers to disconnect + # this also tests the _reevaluate_packet method + ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) + self.pkt_bld.add_pkt_layer("l3_ip", ip) + tcp = dpkt.tcp.TCP(sport = 8080) + self.pkt_bld.add_pkt_layer("l4_tcp", tcp) + # sanity: try changing data attr with non-dpkt.Packet instance + assert_raises(ValueError, self.pkt_bld.set_layer_attr, 'l2', 'data', "HelloWorld") + # now, add different L3 layer instead of existting one, L4 would disconnect + old_layer_count = len(self.pkt_bld._pkt_by_hdr) + new_ip = dpkt.ip.IP(src='\x05\x06\x07\x08', dst='\x01\x02\x03\x04') + print "\nBefore disconnecting layers:" + print "============================", + self.print_packet(self.pkt_bld._packet) + self.pkt_bld.set_layer_attr('l2', 'data', new_ip) + print "\nAfter disconnecting layers:" + print "===========================", + self.print_packet(self.pkt_bld._packet) + assert_not_equal(old_layer_count, len(self.pkt_bld._pkt_by_hdr)) + assert_equal(len(self.pkt_bld._pkt_by_hdr), 1) # only Eth layer appears + + def test_set_pkt_payload(self): + payload = "HelloWorld" + # test for setting a payload to an empty packet + empty_pkt = CTRexPktBuilder() + assert_raises(AttributeError, empty_pkt.set_pkt_payload, payload) + # add content to packet + ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) + self.pkt_bld.add_pkt_layer("l3_ip", ip) + tcp = dpkt.tcp.TCP(sport = 8080) + self.pkt_bld.add_pkt_layer("l4_tcp", tcp) + # now, set a payload for the packet + self.pkt_bld.set_pkt_payload(payload) + self.print_packet(self.pkt_bld._packet) + assert_equal(self.pkt_bld._pkt_by_hdr['l4_tcp'].data, payload) + + def test_load_packet(self): + # add content to packet + ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) + self.pkt_bld.add_pkt_layer("l3_ip", ip) + tcp = dpkt.tcp.TCP(sport = 8080) + self.pkt_bld.add_pkt_layer("l4_tcp", tcp) + self.pkt_bld.set_pkt_payload("HelloWorld") + + new_pkt = CTRexPktBuilder() + new_pkt.load_packet(self.pkt_bld._packet) + self.print_packet(new_pkt._packet) + assert_equal(len(new_pkt._pkt_by_hdr), 4) + assert_equal(new_pkt._pkt_by_hdr.keys(), + ['ip_1', + 'tcp_1', + 'pkt_final_payload', + 'ethernet_1' + ] + ) + assert_equal(new_pkt._pkt_by_hdr['pkt_final_payload'], "HelloWorld") + + def test_get_packet(self): + # get a pointer to the packet + assert(self.pkt_bld.get_packet(get_ptr=True) is self.pkt_bld._packet) + # get a copy of the packet + assert(not(self.pkt_bld.get_packet() is self.pkt_bld._packet)) + + def test_get_layer(self): + assert_equal(self.pkt_bld.get_layer('no_such_layer'), None) + assert(not(self.pkt_bld.get_layer('l2') is self.pkt_bld._packet)) + assert(type(self.pkt_bld.get_layer('l2')).__name__, "ethernet") + + def test_dump_to_pcap(self): + # set Ethernet layer attributes + self.pkt_bld.set_eth_layer_addr("l2", "src", "00:15:17:a7:75:a3") + self.pkt_bld.set_eth_layer_addr("l2", "dst", "e0:5f:b9:69:e9:22") + self.pkt_bld.set_layer_attr("l2", "type", dpkt.ethernet.ETH_TYPE_IP) + # set IP layer attributes + self.pkt_bld.add_pkt_layer("l3_ip", dpkt.ip.IP()) + self.pkt_bld.set_ip_layer_addr("l3_ip", "src", "21.0.0.2") + self.pkt_bld.set_ip_layer_addr("l3_ip", "dst", "22.0.0.12") + self.pkt_bld.set_layer_attr("l3_ip", "p", dpkt.ip.IP_PROTO_TCP) + # set TCP layer attributes + self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) + self.pkt_bld.set_layer_attr("l4_tcp", "sport", 13311) + self.pkt_bld.set_layer_attr("l4_tcp", "dport", 80) + self.pkt_bld.set_layer_attr("l4_tcp", "flags", 0) + self.pkt_bld.set_layer_attr("l4_tcp", "win", 32768) + self.pkt_bld.set_layer_attr("l4_tcp", "seq", 0) + # set packet payload, for example HTTP GET request + self.pkt_bld.set_pkt_payload('GET /10k_60k HTTP/1.1\r\nHost: 22.0.0.3\r\nConnection: Keep-Alive\r\nUser-Agent: Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)\r\nAccept: */*\r\nAccept-Language: en-us\r\nAccept-Encoding: gzip, deflate, compress\r\n\r\n') + # finally, set IP header len with relation to payload data + self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip'))) + + filepath = "unit_tests/functional_tests/test.pcap" + self.pkt_bld.dump_pkt_to_pcap(filepath) + assert os.path.isfile(filepath) + # remove pcap after creation - masked for now + # os.remove(filepath) + filepath = "/not/a/valid/path/test.pcap" + assert_raises(IOError, self.pkt_bld.dump_pkt_to_pcap, filepath) + # check that dump is not available for empty packet + new_pkt = CTRexPktBuilder() + assert_raises(CTRexPktBuilder.EmptyPacketError, new_pkt.dump_pkt_to_pcap, filepath) + + def test_dump_pkt(self): + # check that dump is not available for empty packet + new_pkt = CTRexPktBuilder() + assert_raises(CTRexPktBuilder.EmptyPacketError, new_pkt.dump_pkt) + + # set Ethernet layer attributes + self.pkt_bld.set_eth_layer_addr("l2", "src", "00:15:17:a7:75:a3") + self.pkt_bld.set_eth_layer_addr("l2", "dst", "e0:5f:b9:69:e9:22") + self.pkt_bld.set_layer_attr("l2", "type", dpkt.ethernet.ETH_TYPE_IP) + # set IP layer attributes + self.pkt_bld.add_pkt_layer("l3_ip", dpkt.ip.IP()) + self.pkt_bld.set_ip_layer_addr("l3_ip", "src", "21.0.0.2") + self.pkt_bld.set_ip_layer_addr("l3_ip", "dst", "22.0.0.12") + self.pkt_bld.set_layer_attr("l3_ip", "p", dpkt.ip.IP_PROTO_ICMP) + # set ICMP layer attributes + self.pkt_bld.add_pkt_layer("icmp", dpkt.icmp.ICMP()) + self.pkt_bld.set_layer_attr("icmp", "type", dpkt.icmp.ICMP_ECHO) + # set Echo(ICMP) layer attributes + self.pkt_bld.add_pkt_layer("icmp_echo", dpkt.icmp.ICMP.Echo()) + self.pkt_bld.set_layer_attr("icmp_echo", "id", 24528) + self.pkt_bld.set_layer_attr("icmp_echo", "seq", 11482) + self.pkt_bld.set_pkt_payload('hello world') + + # finally, set IP header len with relation to payload data + self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip'))) + + self.print_packet(self.pkt_bld.get_packet()) + assert_equal(self.pkt_bld.dump_pkt(), { + 'binary': [224, 95, 185, 105, 233, 34, 0, 21, 23, 167, 117, 163, 8, 0, 69, 0, 0, 39, 0, 0, 0, 0, 64, 1, 79, 201, 21, 0, 0, 2, 22, 0, 0, 12, 8, 0, 217, 134, 95, 208, 44, 218, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], + 'meta': '', + }) + + def test_set_vm_ip_range_ipv4(self): + # set some mock packet + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) + self.pkt_bld.set_pkt_payload("HelloWorld") + + self.pkt_bld.set_vm_ip_range("l3", "src", + "10.0.0.1", "10.0.0.1", "10.0.0.255", 1, + "inc") +# self.pkt_bld.set_vm_custom_range(layer_name="l3", +# hdr_field="tos", +# init_val="10", start_val="10", end_val="200", add_val=2, val_size=1, +# operation="inc") + print '' + self.pp.pprint(self.pkt_bld.vm.dump()) + assert_equal(self.pkt_bld.vm.dump(), + { 'instructions': [ { 'init_value': '167772161', + 'max_value': '167772415', + 'min_value': '167772161', + 'name': 'l3__src', + 'op': 'inc', + 'size': 4, + 'type': 'flow_var'}, + { 'add_value': 1, + 'is_big_endian': False, + 'name': 'l3__src', + 'pkt_offset': 26, + 'type': 'write_flow_var'}, + { 'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}], + 'split_by_var': ''} + ) + + def test_set_vm_ip_range_ipv4_no_checksum(self): + # set some mock packet + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) + self.pkt_bld.set_pkt_payload("HelloWorld") + + self.pkt_bld.set_vm_ip_range(ip_layer_name="l3", + ip_field="src", + ip_init="10.0.0.1", ip_start="10.0.0.1", ip_end="10.0.0.255", + add_value=1, + operation="inc", + add_checksum_inst=False) + print '' + self.pp.pprint(self.pkt_bld.vm.dump()) + assert_equal(self.pkt_bld.vm.dump(), + { 'instructions': [ { 'init_value': '167772161', + 'max_value': '167772415', + 'min_value': '167772161', + 'name': 'l3__src', + 'op': 'inc', + 'size': 4, + 'type': 'flow_var'}, + { 'add_value': 1, + 'is_big_endian': False, + 'name': 'l3__src', + 'pkt_offset': 26, + 'type': 'write_flow_var'}], + 'split_by_var': ''} + ) + + def test_set_vm_ip_range_ipv6(self): + # set some mock packet + ip6 = dpkt.ip6.IP6() + self.pkt_bld.add_pkt_layer("l3", ip6) + self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) + self.pkt_bld.set_pkt_payload("HelloWorld") + + self.pkt_bld.set_vm_ip_range(ip_layer_name="l3", + ip_field="src", + ip_init="5001::DB8:1:3333:1:1", ip_start="5001::DB8:1:3333:1:1", ip_end="5001::DB8:1:3333:1:F", + add_value=1, + operation="inc", + ip_type="ipv6") + print '' + self.pp.pprint(self.pkt_bld.vm.dump()) + assert_equal(self.pkt_bld.vm.dump(), + { 'instructions': [ { 'init_value': '65537', + 'max_value': '65551', + 'min_value': '65537', + 'name': 'l3__src', + 'op': 'inc', + 'size': 4, + 'type': 'flow_var'}, + { 'add_value': 1, + 'is_big_endian': False, + 'name': 'l3__src', + 'pkt_offset': 34, + 'type': 'write_flow_var'}], + 'split_by_var': ''} + ) + + def test_set_vm_eth_range(self): + pass + + def test_set_vm_custom_range(self): + # set some mock packet + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) + self.pkt_bld.set_pkt_payload("HelloWorld") + + self.pkt_bld.set_vm_custom_range(layer_name="l3", + hdr_field="tos", + init_val=10, start_val=10, end_val=200, add_val=2, val_size=1, + operation="inc") + print '' + self.pp.pprint(self.pkt_bld.vm.dump()) + assert_equal(self.pkt_bld.vm.dump(), + { 'instructions': [ { 'init_value': '10', + 'max_value': '200', + 'min_value': '10', + 'name': 'l3__tos', + 'op': 'inc', + 'size': 1, + 'type': 'flow_var'}, + { 'add_value': 2, + 'is_big_endian': False, + 'name': 'l3__tos', + 'pkt_offset': 15, + 'type': 'write_flow_var'}, + { 'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}], + 'split_by_var': ''} + ) + + def test_various_ranges(self): + # set some mock packet + ip = dpkt.ip.IP() + self.pkt_bld.add_pkt_layer("l3", ip) + self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) + self.pkt_bld.set_pkt_payload("HelloWorld") + + self.pkt_bld.set_vm_ip_range("l3", "src", + "10.0.0.1", "10.0.0.1", "10.0.0.255", 1, + "inc") + self.pkt_bld.set_vm_custom_range(layer_name="l3", + hdr_field="tos", + init_val=10, start_val=10, end_val=200, add_val=2, val_size=1, + operation="inc") + print '' + self.pp.pprint(self.pkt_bld.vm.dump()) + assert_equal(self.pkt_bld.vm.dump(), + {'instructions': [{'init_value': '167772161', + 'max_value': '167772415', + 'min_value': '167772161', + 'name': 'l3__src', + 'op': 'inc', + 'size': 4, + 'type': 'flow_var'}, + {'init_value': '10', + 'max_value': '200', + 'min_value': '10', + 'name': 'l3__tos', + 'op': 'inc', + 'size': 1, + 'type': 'flow_var'}, + {'add_value': 2, + 'is_big_endian': False, + 'name': 'l3__tos', + 'pkt_offset': 15, + 'type': 'write_flow_var'}, + {'add_value': 1, + 'is_big_endian': False, + 'name': 'l3__src', + 'pkt_offset': 26, + 'type': 'write_flow_var'}, + {'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}], + 'split_by_var': ''} + ) + + def tearDown(self): + pass + + +if __name__ == "__main__": + pass + diff --git a/scripts/automation/regression/unit_tests/functional_tests/test.pcap b/scripts/automation/regression/unit_tests/functional_tests/test.pcap new file mode 100755 index 00000000..e2b12565 Binary files /dev/null and b/scripts/automation/regression/unit_tests/functional_tests/test.pcap differ diff --git a/scripts/automation/regression/unit_tests/functional_tests/test2.pcap b/scripts/automation/regression/unit_tests/functional_tests/test2.pcap new file mode 100755 index 00000000..1d35d9c1 Binary files /dev/null and b/scripts/automation/regression/unit_tests/functional_tests/test2.pcap differ diff --git a/scripts/automation/regression/unit_tests/functional_tests/test_cmp.pcap b/scripts/automation/regression/unit_tests/functional_tests/test_cmp.pcap new file mode 100755 index 00000000..4c92859f Binary files /dev/null and b/scripts/automation/regression/unit_tests/functional_tests/test_cmp.pcap differ diff --git a/scripts/automation/regression/unit_tests/functional_tests/vm_test.py b/scripts/automation/regression/unit_tests/functional_tests/vm_test.py new file mode 100755 index 00000000..603c52db --- /dev/null +++ b/scripts/automation/regression/unit_tests/functional_tests/vm_test.py @@ -0,0 +1,77 @@ +#!/router/bin/python + +import pkt_bld_general_test +from client_utils.packet_builder import * +from nose.tools import assert_equal +from nose.tools import assert_not_equal +from nose.tools import assert_raises +from nose.tools import raises + +class CTRexVM_Test(pkt_bld_general_test.CGeneralPktBld_Test): + + def setUp(self): + self.vm = CTRexPktBuilder.CTRexVM() + + def test_add_flow_man_inst(self): + self.vm.add_flow_man_inst("src_ip") + assert_raises(CTRexPktBuilder.VMVarNameExistsError, self.vm.add_flow_man_inst, "src_ip") + self.vm.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100) + assert_equal(self.vm.vm_variables["dst_ip"].dump(), + {"type": "flow_var", + "name": "dst_ip", + "size": 8, + # "big_endian": True, + "op": "inc", +# "split_by_core": False, + "init_value": "5", + "min_value": "1", + "max_value": "100"}) + assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm.add_flow_man_inst, "src_mac", no_field=1) + + def test_load_flow_man(self): + vm2 = CTRexPktBuilder.CTRexVM() + vm2.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100) + self.vm.load_flow_man(vm2.vm_variables["dst_ip"]) + assert_equal(self.vm.vm_variables["dst_ip"].dump(), + vm2.vm_variables["dst_ip"].dump()) + + def test_set_vm_var_field(self): + self.vm.add_flow_man_inst("src_ip") + self.vm.set_vm_var_field("src_ip", "size", 8) + assert_equal(self.vm.vm_variables["src_ip"].size, 8) + assert_raises(KeyError, self.vm.set_vm_var_field, "no_var", "no_field", 10) + assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm.set_vm_var_field, "src_ip", "no_field", 10) + assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm.set_vm_var_field, "src_ip", "operation", "rand") + + def test_dump(self): + self.vm.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100) + self.vm.add_flow_man_inst("src_ip", size=8, operation="dec", init_value=10, min_value=2, max_value=100) + from pprint import pprint + print '' + pprint (self.vm.dump()) + assert_equal(self.vm.dump(), + {'instructions': [{'init_value': '10', + 'max_value': '100', + 'min_value': '2', + 'name': 'src_ip', + 'op': 'dec', + 'size': 8, + 'type': 'flow_var'}, + {'init_value': '5', + 'max_value': '100', + 'min_value': '1', + 'name': 'dst_ip', + 'op': 'inc', + 'size': 8, + 'type': 'flow_var'}], + 'split_by_var': ''} + ) + + + def tearDown(self): + pass + + +if __name__ == "__main__": + pass + diff --git a/scripts/automation/regression/unit_tests/functional_tests/vm_variable_test.py b/scripts/automation/regression/unit_tests/functional_tests/vm_variable_test.py new file mode 100755 index 00000000..af56b1b1 --- /dev/null +++ b/scripts/automation/regression/unit_tests/functional_tests/vm_variable_test.py @@ -0,0 +1,63 @@ +#!/router/bin/python + +import pkt_bld_general_test +from client_utils.packet_builder import * +from nose.tools import assert_equal +from nose.tools import assert_not_equal +from nose.tools import assert_raises +from nose.tools import raises + +class CTRexVMVariable_Test(pkt_bld_general_test.CGeneralPktBld_Test): + + def setUp(self): + self.vm_var = CTRexPktBuilder.CTRexVM.CTRexVMFlowVariable("test_var") + + def test_var_name(self): + assert_equal(self.vm_var.name, "test_var") + +# @raises(CTRexPktBuilder.VMVarNameError) + def test_set_field(self): + assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm_var.set_field, "no_field", 10) + # test for 'size' field + assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "size", "wrong_type") + assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm_var.set_field, "size", 10) # 10 is illegal size + self.vm_var.set_field("size", 8) + assert_equal(self.vm_var.size, 8) + # test for 'init_value' field + assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "init_value", '123') # 123 is wrong type, should be int + self.vm_var.set_field("init_value", 5) + assert_equal(self.vm_var.init_value, 5) + # test for 'operation' field + assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "operation", 1) # operation is field of type str + assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm_var.set_field, "operation", "rand") # "rand" is illegal option + self.vm_var.set_field("operation", "inc") + assert_equal(self.vm_var.operation, "inc") + # test for 'split_by_core' field +# self.vm_var.set_field("split_by_core", 5) +# assert_equal(self.vm_var.split_by_core, True) + + def test_var_dump (self): + # set VM variable options + self.vm_var.set_field("size", 8) + self.vm_var.set_field("init_value", 5) + self.vm_var.set_field("operation", "inc") +# self.vm_var.set_field("split_by_core", False) + self.vm_var.set_field("max_value", 100) + assert_equal(self.vm_var.dump(), + {"type": "flow_var", + "name": "test_var", + "size": 8, + # "big_endian": True, + "op": "inc", +# "split_by_core": False, + "init_value": "5", + "min_value": "1", + "max_value": "100"}) + + def tearDown(self): + pass + + +if __name__ == "__main__": + pass + diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py deleted file mode 100755 index 80d2b086..00000000 --- a/scripts/automation/regression/unit_tests/pkt_bld_tests/payload_gen_test.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/router/bin/python - -import pkt_bld_general_test -from client_utils.packet_builder import * -from dpkt.ethernet import Ethernet -from dpkt.ip import IP -from dpkt.icmp import ICMP -from nose.tools import assert_equal -from nose.tools import assert_not_equal -from nose.tools import assert_raises -from nose.tools import raises -import re -import binascii - -class CTRexPayloadGen_Test(pkt_bld_general_test.CGeneralPktBld_Test): - - def setUp(self): -# echo = dpkt.icmp.ICMP.Echo() -# echo.id = random.randint(0, 0xffff) -# echo.seq = random.randint(0, 0xffff) -# echo.data = 'hello world' -# -# icmp = dpkt.icmp.ICMP() -# icmp.type = dpkt.icmp.ICMP_ECHO -# icmp.data = echo - - # packet generation is done directly using dpkt package and - self.packet = Ethernet() - ip = IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) - icmp = ICMP(type=8, data=ICMP.Echo(id=123, seq=1, data='foobar')) - ip.data = icmp - self.packet.src = "\x00\x00\x55\x55\x00\x00" - self.packet.dst = "\x00\x00\x11\x11\x00\x00" - self.packet.data = ip - self.print_packet(self.packet) - self.max_pkt_size = 1400 - self.pld_gen = CTRexPktBuilder.CTRexPayloadGen(self.packet, self.max_pkt_size) - - @staticmethod - def special_match(strg, search=re.compile(r'[^a-zA-Z0-9]').search): - return not bool(search(strg)) - - @staticmethod - def principal_period(s): - # finds the string the repeats itself in the string - i = (s+s).find(s, 1, -1) - return None if i == -1 else s[:i] - - def test_gen_random_str(self): - generated_str = self.pld_gen.gen_random_str() - # print "\nGenerated string: {}".format(generated_str) - # chech that the generated string is accorsing to rules. - assert CTRexPayloadGen_Test.special_match(generated_str) - assert_equal(len(generated_str), (self.max_pkt_size - len(self.packet))) - - def test_gen_repeat_ptrn(self): - gen_len = self.max_pkt_size - len(self.packet) - # case 1 - repeated string - repeat_str = "HelloWorld" - generated_str = self.pld_gen.gen_repeat_ptrn(repeat_str) - for i in xrange(len(repeat_str)): - if generated_str.endswith(repeat_str[:i+1]): - # remove the string residue, if found - generated_str = generated_str[:-(i+1)] - # print generated_str - break - assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), "HelloWorld") - - # case 2.1 - repeated single number - long number - repeat_num = 0x645564646465 - generated_str = self.pld_gen.gen_repeat_ptrn(repeat_num) - ptrn = binascii.unhexlify(hex(repeat_num)[2:]) - for i in xrange(len(ptrn)): - if generated_str.endswith(ptrn[:i+1]): - # remove the string residue, if found - generated_str = generated_str[:-(i+1)] - break - assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn) - # case 2.2 - repeated single number - 1 byte - repeat_num = 0x64 - generated_str = self.pld_gen.gen_repeat_ptrn(repeat_num) - ptrn = binascii.unhexlify(hex(repeat_num)[2:]) - assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn) - assert_equal(len(generated_str), (self.max_pkt_size - len(self.packet))) - # case 3 - repeated sequence - repeat_seq = (0x55, 0x60, 0x65, 0x70, 0x85) - ptrn = binascii.unhexlify(''.join(hex(x)[2:] for x in repeat_seq)) - generated_str = self.pld_gen.gen_repeat_ptrn(repeat_seq) - # ptrn = binascii.unhexlify(hex(repeat_num)[2:]) - for i in xrange(len(ptrn)): - if generated_str.endswith(ptrn[:i+1]): - # remove the string residue, if found - generated_str = generated_str[:-(i+1)] - break - assert_equal(CTRexPayloadGen_Test.principal_period(generated_str), ptrn) - - # in tuples, check that if any of the numbers exceeds limit - assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, (0x1, -18)) - assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, (0xFFF, 5)) - # finally, check an exception is thrown in rest of cases - assert_raises(ValueError, self.pld_gen.gen_repeat_ptrn, 5.5) - - - - - - - - - - - - - - - pass - - - def tearDown(self): - pass - - -if __name__ == "__main__": - pass - diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py deleted file mode 100755 index b630147b..00000000 --- a/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_bld_general_test.py +++ /dev/null @@ -1,29 +0,0 @@ -#!/router/bin/python - -from nose.tools import assert_equal -from nose.tools import assert_not_equal -from nose.tools import assert_raises -from nose.tools import raises -import sys -import outer_packages -from client_utils.packet_builder import * - - -class CGeneralPktBld_Test(object): - def __init__(self): - pass - - @staticmethod - def print_packet(pkt_obj): - print "\nGenerated packet:\n{}".format(repr(pkt_obj)) - - - def setUp(self): - pass - - - def tearDown(self): - pass - -if __name__ == "__main__": - pass diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py deleted file mode 100755 index a0b87a43..00000000 --- a/scripts/automation/regression/unit_tests/pkt_bld_tests/pkt_builder_test.py +++ /dev/null @@ -1,437 +0,0 @@ -#!/router/bin/python - -import pkt_bld_general_test -from client_utils.packet_builder import * -import dpkt -from nose.tools import assert_equal -from nose.tools import assert_not_equal -from nose.tools import assert_raises -from nose.tools import raises -import os -import random -import pprint - -class CTRexPktBuilderSanity_Test(pkt_bld_general_test.CGeneralPktBld_Test): - - def setUp(self): - pass - - def test_decode_ip_addr(self): - # test ipv4 case - assert_equal(CTRexPktBuilder._decode_ip_addr('1.2.3.4', "ipv4"), '\x01\x02\x03\x04') - assert_equal(CTRexPktBuilder._decode_ip_addr('127.0.0.1', "ipv4"), '\x7F\x00\x00\x01') - assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, '1.2.3.4.5', "ipv4") - assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, '1.2.3.4', "ipv6") - # test ipv6 case - assert_equal(CTRexPktBuilder._decode_ip_addr("5001::DB8:1:3333:1:1", "ipv6"), - 'P\x01\x00\x00\x00\x00\r\xb8\x00\x0133\x00\x01\x00\x01') - assert_raises(CTRexPktBuilder.IPAddressError, CTRexPktBuilder._decode_ip_addr, - '2001::DB8:1:2222::1:1', "ipv6") - - def test_decode_mac_addr(self): - assert_equal(CTRexPktBuilder._decode_mac_addr('00:de:34:ef:2e:f4'), '\x00\xde4\xef.\xf4') - assert_equal(CTRexPktBuilder._decode_mac_addr('00-de-55-ef-2e-f4'), '\x00\xdeU\xef.\xf4') - assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr, - '00:de:34:ef:2e:f4:f4') - assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr, - '1.2.3.4') - assert_raises(CTRexPktBuilder.MACAddressError, CTRexPktBuilder._decode_mac_addr, - '00 de 34 ef 2e f4 f4') - - def test_gen_layer_name(self): - pkt = CTRexPktBuilder() - assert_equal(pkt._gen_layer_name("eth"), "eth_1") - pkt._pkt_by_hdr = {'eth':None} # mock header pointer data - assert_equal(pkt._gen_layer_name("eth"), "eth_1") - pkt._pkt_by_hdr.update({'eth_1':None}) # more mock header pointer data - assert_equal(pkt._gen_layer_name("eth"), "eth_2") - - def test_set_layer_attr_basic(self): - pkt = CTRexPktBuilder() - pkt._pkt_by_hdr['ip'] = dpkt.ip.IP() - # case 1 - test full value assignment - pkt.set_layer_attr('ip', 'src', '\x01\x02\x03\x04') - assert_equal(pkt._pkt_by_hdr['ip'].src, '\x01\x02\x03\x04') - # case 2 - test bit assignment - pkt.set_layer_bit_attr('ip', 'off', dpkt.ip.IP_DF) - pkt.set_layer_bit_attr('ip', 'off', dpkt.ip.IP_MF) - assert_equal(bin(pkt._pkt_by_hdr['ip'].off), '0b110000000000000') - # case 3 - test assignment of not-exist attribute - assert_raises(ValueError, pkt.set_layer_bit_attr, 'ip', 'src_dst', 0) - # case 4.1 - test assignment of data attribute - without dpkt.Packet object - assert_raises(ValueError, pkt.set_layer_bit_attr, 'ip', 'data', "Not a dpkt.Packet object") - # case 4.2 - test assignment of data attribute - with dpkt.Packet object - tested under CTRexPktBuilder_Test class -# tcp = dpkt.tcp.TCP() - self.print_packet(pkt._pkt_by_hdr['ip']) -# pkt.set_layer_attr('ip', 'data', tcp) - # case 5 - test assignment of not-exist layer - assert_raises(KeyError, pkt.set_layer_bit_attr, 'no_such_layer', 'src', 0) - - def tearDown(self): - pass - - -class CTRexPktBuilder_Test(pkt_bld_general_test.CGeneralPktBld_Test): - - def setUp(self): - self.pkt_bld = CTRexPktBuilder() - self.pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet()) - self.pp = pprint.PrettyPrinter(indent=4) - - def test_add_pkt_layer(self): - ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) - self.pkt_bld.add_pkt_layer("l3", ip) - tcp = dpkt.tcp.TCP(sport = 8080) - self.pkt_bld.add_pkt_layer("l4_tcp", tcp) - assert_equal(len(self.pkt_bld._pkt_by_hdr), 3) - assert_equal(self.pkt_bld._pkt_by_hdr.keys(), ['l2', 'l3', 'l4_tcp']) - self.print_packet(self.pkt_bld._packet) - assert_raises(ValueError, self.pkt_bld.add_pkt_layer, 'l2', dpkt.ethernet.Ethernet()) - - def test_set_ip_layer_addr(self): - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - self.pkt_bld.set_ip_layer_addr("l3", "src", "1.2.3.4") - self.print_packet(self.pkt_bld._packet) - assert_equal(self.pkt_bld._pkt_by_hdr['l3'].src, '\x01\x02\x03\x04') - # check that only IP layer is using this function - assert_raises(ValueError, self.pkt_bld.set_ip_layer_addr, 'l2', "src", "1.2.3.4") - - def test_calc_offset(self): - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - assert_equal(self.pkt_bld._calc_offset("l3", "src", 4), (14, 14+12)) - - def test_set_ipv6_layer_addr(self): - ip6 = dpkt.ip6.IP6() - self.pkt_bld.add_pkt_layer("l3", ip6) - self.pkt_bld.set_ipv6_layer_addr("l3", "src", "5001::DB8:1:3333:1:1") - self.print_packet(self.pkt_bld._packet) - assert_equal(self.pkt_bld._pkt_by_hdr['l3'].src, 'P\x01\x00\x00\x00\x00\r\xb8\x00\x0133\x00\x01\x00\x01') - # check that only IP layer is using this function - assert_raises(ValueError, self.pkt_bld.set_ipv6_layer_addr, 'l2', "src", "5001::DB8:1:3333:1:1") - - def test_set_eth_layer_addr(self): - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - self.pkt_bld.set_eth_layer_addr("l2", "src", "00:de:34:ef:2e:f4") - self.print_packet(self.pkt_bld._packet) - assert_equal(self.pkt_bld._pkt_by_hdr['l2'].src, '\x00\xde4\xef.\xf4') - # check that only IP layer is using this function - assert_raises(ValueError, self.pkt_bld.set_eth_layer_addr, 'l3', "src", "\x00\xde4\xef.\xf4") - - def test_set_layer_attr(self): - # extend the set_layer_attr_basic test by handling the following case: - # replace some header data with another layer, causing other layers to disconnect - # this also tests the _reevaluate_packet method - ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) - self.pkt_bld.add_pkt_layer("l3_ip", ip) - tcp = dpkt.tcp.TCP(sport = 8080) - self.pkt_bld.add_pkt_layer("l4_tcp", tcp) - # sanity: try changing data attr with non-dpkt.Packet instance - assert_raises(ValueError, self.pkt_bld.set_layer_attr, 'l2', 'data', "HelloWorld") - # now, add different L3 layer instead of existting one, L4 would disconnect - old_layer_count = len(self.pkt_bld._pkt_by_hdr) - new_ip = dpkt.ip.IP(src='\x05\x06\x07\x08', dst='\x01\x02\x03\x04') - print "\nBefore disconnecting layers:" - print "============================", - self.print_packet(self.pkt_bld._packet) - self.pkt_bld.set_layer_attr('l2', 'data', new_ip) - print "\nAfter disconnecting layers:" - print "===========================", - self.print_packet(self.pkt_bld._packet) - assert_not_equal(old_layer_count, len(self.pkt_bld._pkt_by_hdr)) - assert_equal(len(self.pkt_bld._pkt_by_hdr), 1) # only Eth layer appears - - def test_set_pkt_payload(self): - payload = "HelloWorld" - # test for setting a payload to an empty packet - empty_pkt = CTRexPktBuilder() - assert_raises(AttributeError, empty_pkt.set_pkt_payload, payload) - # add content to packet - ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) - self.pkt_bld.add_pkt_layer("l3_ip", ip) - tcp = dpkt.tcp.TCP(sport = 8080) - self.pkt_bld.add_pkt_layer("l4_tcp", tcp) - # now, set a payload for the packet - self.pkt_bld.set_pkt_payload(payload) - self.print_packet(self.pkt_bld._packet) - assert_equal(self.pkt_bld._pkt_by_hdr['l4_tcp'].data, payload) - - def test_load_packet(self): - # add content to packet - ip = dpkt.ip.IP(src='\x01\x02\x03\x04', dst='\x05\x06\x07\x08', p=1) - self.pkt_bld.add_pkt_layer("l3_ip", ip) - tcp = dpkt.tcp.TCP(sport = 8080) - self.pkt_bld.add_pkt_layer("l4_tcp", tcp) - self.pkt_bld.set_pkt_payload("HelloWorld") - - new_pkt = CTRexPktBuilder() - new_pkt.load_packet(self.pkt_bld._packet) - self.print_packet(new_pkt._packet) - assert_equal(len(new_pkt._pkt_by_hdr), 4) - assert_equal(new_pkt._pkt_by_hdr.keys(), - ['ip_1', - 'tcp_1', - 'pkt_final_payload', - 'ethernet_1' - ] - ) - assert_equal(new_pkt._pkt_by_hdr['pkt_final_payload'], "HelloWorld") - - def test_get_packet(self): - # get a pointer to the packet - assert(self.pkt_bld.get_packet(get_ptr=True) is self.pkt_bld._packet) - # get a copy of the packet - assert(not(self.pkt_bld.get_packet() is self.pkt_bld._packet)) - - def test_get_layer(self): - assert_equal(self.pkt_bld.get_layer('no_such_layer'), None) - assert(not(self.pkt_bld.get_layer('l2') is self.pkt_bld._packet)) - assert(type(self.pkt_bld.get_layer('l2')).__name__, "ethernet") - - def test_dump_to_pcap(self): - # set Ethernet layer attributes - self.pkt_bld.set_eth_layer_addr("l2", "src", "00:15:17:a7:75:a3") - self.pkt_bld.set_eth_layer_addr("l2", "dst", "e0:5f:b9:69:e9:22") - self.pkt_bld.set_layer_attr("l2", "type", dpkt.ethernet.ETH_TYPE_IP) - # set IP layer attributes - self.pkt_bld.add_pkt_layer("l3_ip", dpkt.ip.IP()) - self.pkt_bld.set_ip_layer_addr("l3_ip", "src", "21.0.0.2") - self.pkt_bld.set_ip_layer_addr("l3_ip", "dst", "22.0.0.12") - self.pkt_bld.set_layer_attr("l3_ip", "p", dpkt.ip.IP_PROTO_TCP) - # set TCP layer attributes - self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) - self.pkt_bld.set_layer_attr("l4_tcp", "sport", 13311) - self.pkt_bld.set_layer_attr("l4_tcp", "dport", 80) - self.pkt_bld.set_layer_attr("l4_tcp", "flags", 0) - self.pkt_bld.set_layer_attr("l4_tcp", "win", 32768) - self.pkt_bld.set_layer_attr("l4_tcp", "seq", 0) - # set packet payload, for example HTTP GET request - self.pkt_bld.set_pkt_payload('GET /10k_60k HTTP/1.1\r\nHost: 22.0.0.3\r\nConnection: Keep-Alive\r\nUser-Agent: Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)\r\nAccept: */*\r\nAccept-Language: en-us\r\nAccept-Encoding: gzip, deflate, compress\r\n\r\n') - # finally, set IP header len with relation to payload data - self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip'))) - - filepath = "unit_tests/pkt_bld_tests/test.pcap" - self.pkt_bld.dump_pkt_to_pcap(filepath) - assert os.path.isfile(filepath) - # remove pcap after creation - masked for now - # os.remove(filepath) - filepath = "/not/a/valid/path/test.pcap" - assert_raises(IOError, self.pkt_bld.dump_pkt_to_pcap, filepath) - # check that dump is not available for empty packet - new_pkt = CTRexPktBuilder() - assert_raises(CTRexPktBuilder.EmptyPacketError, new_pkt.dump_pkt_to_pcap, filepath) - - def test_dump_pkt(self): - # check that dump is not available for empty packet - new_pkt = CTRexPktBuilder() - assert_raises(CTRexPktBuilder.EmptyPacketError, new_pkt.dump_pkt) - - # set Ethernet layer attributes - self.pkt_bld.set_eth_layer_addr("l2", "src", "00:15:17:a7:75:a3") - self.pkt_bld.set_eth_layer_addr("l2", "dst", "e0:5f:b9:69:e9:22") - self.pkt_bld.set_layer_attr("l2", "type", dpkt.ethernet.ETH_TYPE_IP) - # set IP layer attributes - self.pkt_bld.add_pkt_layer("l3_ip", dpkt.ip.IP()) - self.pkt_bld.set_ip_layer_addr("l3_ip", "src", "21.0.0.2") - self.pkt_bld.set_ip_layer_addr("l3_ip", "dst", "22.0.0.12") - self.pkt_bld.set_layer_attr("l3_ip", "p", dpkt.ip.IP_PROTO_ICMP) - # set ICMP layer attributes - self.pkt_bld.add_pkt_layer("icmp", dpkt.icmp.ICMP()) - self.pkt_bld.set_layer_attr("icmp", "type", dpkt.icmp.ICMP_ECHO) - # set Echo(ICMP) layer attributes - self.pkt_bld.add_pkt_layer("icmp_echo", dpkt.icmp.ICMP.Echo()) - self.pkt_bld.set_layer_attr("icmp_echo", "id", 24528) - self.pkt_bld.set_layer_attr("icmp_echo", "seq", 11482) - self.pkt_bld.set_pkt_payload('hello world') - - # finally, set IP header len with relation to payload data - self.pkt_bld.set_layer_attr("l3_ip", "len", len(self.pkt_bld.get_layer('l3_ip'))) - - self.print_packet(self.pkt_bld.get_packet()) - assert_equal(self.pkt_bld.dump_pkt(), { - 'binary': [224, 95, 185, 105, 233, 34, 0, 21, 23, 167, 117, 163, 8, 0, 69, 0, 0, 39, 0, 0, 0, 0, 64, 1, 79, 201, 21, 0, 0, 2, 22, 0, 0, 12, 8, 0, 217, 134, 95, 208, 44, 218, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], - 'meta': '', - }) - - def test_set_vm_ip_range_ipv4(self): - # set some mock packet - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) - self.pkt_bld.set_pkt_payload("HelloWorld") - - self.pkt_bld.set_vm_ip_range("l3", "src", - "10.0.0.1", "10.0.0.1", "10.0.0.255", 1, - "inc") -# self.pkt_bld.set_vm_custom_range(layer_name="l3", -# hdr_field="tos", -# init_val="10", start_val="10", end_val="200", add_val=2, val_size=1, -# operation="inc") - print '' - self.pp.pprint(self.pkt_bld.vm.dump()) - assert_equal(self.pkt_bld.vm.dump(), - { 'instructions': [ { 'init_value': '167772161', - 'max_value': '167772415', - 'min_value': '167772161', - 'name': 'l3__src', - 'op': 'inc', - 'size': 4, - 'type': 'flow_var'}, - { 'add_value': 1, - 'is_big_endian': False, - 'name': 'l3__src', - 'pkt_offset': 26, - 'type': 'write_flow_var'}, - { 'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}], - 'split_by_var': ''} - ) - - def test_set_vm_ip_range_ipv4_no_checksum(self): - # set some mock packet - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) - self.pkt_bld.set_pkt_payload("HelloWorld") - - self.pkt_bld.set_vm_ip_range(ip_layer_name="l3", - ip_field="src", - ip_init="10.0.0.1", ip_start="10.0.0.1", ip_end="10.0.0.255", - add_value=1, - operation="inc", - add_checksum_inst=False) - print '' - self.pp.pprint(self.pkt_bld.vm.dump()) - assert_equal(self.pkt_bld.vm.dump(), - { 'instructions': [ { 'init_value': '167772161', - 'max_value': '167772415', - 'min_value': '167772161', - 'name': 'l3__src', - 'op': 'inc', - 'size': 4, - 'type': 'flow_var'}, - { 'add_value': 1, - 'is_big_endian': False, - 'name': 'l3__src', - 'pkt_offset': 26, - 'type': 'write_flow_var'}], - 'split_by_var': ''} - ) - - def test_set_vm_ip_range_ipv6(self): - # set some mock packet - ip6 = dpkt.ip6.IP6() - self.pkt_bld.add_pkt_layer("l3", ip6) - self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) - self.pkt_bld.set_pkt_payload("HelloWorld") - - self.pkt_bld.set_vm_ip_range(ip_layer_name="l3", - ip_field="src", - ip_init="5001::DB8:1:3333:1:1", ip_start="5001::DB8:1:3333:1:1", ip_end="5001::DB8:1:3333:1:F", - add_value=1, - operation="inc", - ip_type="ipv6") - print '' - self.pp.pprint(self.pkt_bld.vm.dump()) - assert_equal(self.pkt_bld.vm.dump(), - { 'instructions': [ { 'init_value': '65537', - 'max_value': '65551', - 'min_value': '65537', - 'name': 'l3__src', - 'op': 'inc', - 'size': 4, - 'type': 'flow_var'}, - { 'add_value': 1, - 'is_big_endian': False, - 'name': 'l3__src', - 'pkt_offset': 34, - 'type': 'write_flow_var'}], - 'split_by_var': ''} - ) - - def test_set_vm_eth_range(self): - pass - - def test_set_vm_custom_range(self): - # set some mock packet - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) - self.pkt_bld.set_pkt_payload("HelloWorld") - - self.pkt_bld.set_vm_custom_range(layer_name="l3", - hdr_field="tos", - init_val=10, start_val=10, end_val=200, add_val=2, val_size=1, - operation="inc") - print '' - self.pp.pprint(self.pkt_bld.vm.dump()) - assert_equal(self.pkt_bld.vm.dump(), - { 'instructions': [ { 'init_value': '10', - 'max_value': '200', - 'min_value': '10', - 'name': 'l3__tos', - 'op': 'inc', - 'size': 1, - 'type': 'flow_var'}, - { 'add_value': 2, - 'is_big_endian': False, - 'name': 'l3__tos', - 'pkt_offset': 15, - 'type': 'write_flow_var'}, - { 'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}], - 'split_by_var': ''} - ) - - def test_various_ranges(self): - # set some mock packet - ip = dpkt.ip.IP() - self.pkt_bld.add_pkt_layer("l3", ip) - self.pkt_bld.add_pkt_layer("l4_tcp", dpkt.tcp.TCP()) - self.pkt_bld.set_pkt_payload("HelloWorld") - - self.pkt_bld.set_vm_ip_range("l3", "src", - "10.0.0.1", "10.0.0.1", "10.0.0.255", 1, - "inc") - self.pkt_bld.set_vm_custom_range(layer_name="l3", - hdr_field="tos", - init_val=10, start_val=10, end_val=200, add_val=2, val_size=1, - operation="inc") - print '' - self.pp.pprint(self.pkt_bld.vm.dump()) - assert_equal(self.pkt_bld.vm.dump(), - {'instructions': [{'init_value': '167772161', - 'max_value': '167772415', - 'min_value': '167772161', - 'name': 'l3__src', - 'op': 'inc', - 'size': 4, - 'type': 'flow_var'}, - {'init_value': '10', - 'max_value': '200', - 'min_value': '10', - 'name': 'l3__tos', - 'op': 'inc', - 'size': 1, - 'type': 'flow_var'}, - {'add_value': 2, - 'is_big_endian': False, - 'name': 'l3__tos', - 'pkt_offset': 15, - 'type': 'write_flow_var'}, - {'add_value': 1, - 'is_big_endian': False, - 'name': 'l3__src', - 'pkt_offset': 26, - 'type': 'write_flow_var'}, - {'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}], - 'split_by_var': ''} - ) - - def tearDown(self): - pass - - -if __name__ == "__main__": - pass - diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap b/scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap deleted file mode 100755 index da1830b8..00000000 Binary files a/scripts/automation/regression/unit_tests/pkt_bld_tests/test.pcap and /dev/null differ diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap b/scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap deleted file mode 100755 index 1d35d9c1..00000000 Binary files a/scripts/automation/regression/unit_tests/pkt_bld_tests/test2.pcap and /dev/null differ diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap b/scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap deleted file mode 100755 index 4c92859f..00000000 Binary files a/scripts/automation/regression/unit_tests/pkt_bld_tests/test_cmp.pcap and /dev/null differ diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py deleted file mode 100755 index 603c52db..00000000 --- a/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_test.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/router/bin/python - -import pkt_bld_general_test -from client_utils.packet_builder import * -from nose.tools import assert_equal -from nose.tools import assert_not_equal -from nose.tools import assert_raises -from nose.tools import raises - -class CTRexVM_Test(pkt_bld_general_test.CGeneralPktBld_Test): - - def setUp(self): - self.vm = CTRexPktBuilder.CTRexVM() - - def test_add_flow_man_inst(self): - self.vm.add_flow_man_inst("src_ip") - assert_raises(CTRexPktBuilder.VMVarNameExistsError, self.vm.add_flow_man_inst, "src_ip") - self.vm.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100) - assert_equal(self.vm.vm_variables["dst_ip"].dump(), - {"type": "flow_var", - "name": "dst_ip", - "size": 8, - # "big_endian": True, - "op": "inc", -# "split_by_core": False, - "init_value": "5", - "min_value": "1", - "max_value": "100"}) - assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm.add_flow_man_inst, "src_mac", no_field=1) - - def test_load_flow_man(self): - vm2 = CTRexPktBuilder.CTRexVM() - vm2.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100) - self.vm.load_flow_man(vm2.vm_variables["dst_ip"]) - assert_equal(self.vm.vm_variables["dst_ip"].dump(), - vm2.vm_variables["dst_ip"].dump()) - - def test_set_vm_var_field(self): - self.vm.add_flow_man_inst("src_ip") - self.vm.set_vm_var_field("src_ip", "size", 8) - assert_equal(self.vm.vm_variables["src_ip"].size, 8) - assert_raises(KeyError, self.vm.set_vm_var_field, "no_var", "no_field", 10) - assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm.set_vm_var_field, "src_ip", "no_field", 10) - assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm.set_vm_var_field, "src_ip", "operation", "rand") - - def test_dump(self): - self.vm.add_flow_man_inst("dst_ip", size=8, operation="inc", init_value=5, max_value=100) - self.vm.add_flow_man_inst("src_ip", size=8, operation="dec", init_value=10, min_value=2, max_value=100) - from pprint import pprint - print '' - pprint (self.vm.dump()) - assert_equal(self.vm.dump(), - {'instructions': [{'init_value': '10', - 'max_value': '100', - 'min_value': '2', - 'name': 'src_ip', - 'op': 'dec', - 'size': 8, - 'type': 'flow_var'}, - {'init_value': '5', - 'max_value': '100', - 'min_value': '1', - 'name': 'dst_ip', - 'op': 'inc', - 'size': 8, - 'type': 'flow_var'}], - 'split_by_var': ''} - ) - - - def tearDown(self): - pass - - -if __name__ == "__main__": - pass - diff --git a/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py b/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py deleted file mode 100755 index af56b1b1..00000000 --- a/scripts/automation/regression/unit_tests/pkt_bld_tests/vm_variable_test.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/router/bin/python - -import pkt_bld_general_test -from client_utils.packet_builder import * -from nose.tools import assert_equal -from nose.tools import assert_not_equal -from nose.tools import assert_raises -from nose.tools import raises - -class CTRexVMVariable_Test(pkt_bld_general_test.CGeneralPktBld_Test): - - def setUp(self): - self.vm_var = CTRexPktBuilder.CTRexVM.CTRexVMFlowVariable("test_var") - - def test_var_name(self): - assert_equal(self.vm_var.name, "test_var") - -# @raises(CTRexPktBuilder.VMVarNameError) - def test_set_field(self): - assert_raises(CTRexPktBuilder.VMFieldNameError, self.vm_var.set_field, "no_field", 10) - # test for 'size' field - assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "size", "wrong_type") - assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm_var.set_field, "size", 10) # 10 is illegal size - self.vm_var.set_field("size", 8) - assert_equal(self.vm_var.size, 8) - # test for 'init_value' field - assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "init_value", '123') # 123 is wrong type, should be int - self.vm_var.set_field("init_value", 5) - assert_equal(self.vm_var.init_value, 5) - # test for 'operation' field - assert_raises(CTRexPktBuilder.VMFieldTypeError, self.vm_var.set_field, "operation", 1) # operation is field of type str - assert_raises(CTRexPktBuilder.VMFieldValueError, self.vm_var.set_field, "operation", "rand") # "rand" is illegal option - self.vm_var.set_field("operation", "inc") - assert_equal(self.vm_var.operation, "inc") - # test for 'split_by_core' field -# self.vm_var.set_field("split_by_core", 5) -# assert_equal(self.vm_var.split_by_core, True) - - def test_var_dump (self): - # set VM variable options - self.vm_var.set_field("size", 8) - self.vm_var.set_field("init_value", 5) - self.vm_var.set_field("operation", "inc") -# self.vm_var.set_field("split_by_core", False) - self.vm_var.set_field("max_value", 100) - assert_equal(self.vm_var.dump(), - {"type": "flow_var", - "name": "test_var", - "size": 8, - # "big_endian": True, - "op": "inc", -# "split_by_core": False, - "init_value": "5", - "min_value": "1", - "max_value": "100"}) - - def tearDown(self): - pass - - -if __name__ == "__main__": - pass - diff --git a/scripts/automation/regression/unit_tests/trex_general_test.py b/scripts/automation/regression/unit_tests/trex_general_test.py index af3c897d..9bcccaab 100755 --- a/scripts/automation/regression/unit_tests/trex_general_test.py +++ b/scripts/automation/regression/unit_tests/trex_general_test.py @@ -213,7 +213,7 @@ class CTRexGeneral_Test(unittest.TestCase): if not test_name: test_name = self.get_name() if test_name not in self.benchmark: - self.skip('No data in benchmark.yaml for test %s, skipping.' % test_name) + self.skip('No data in benchmark.yaml for test: %s, param: %s. Skipping.' % (test_name, param)) if sub_param: return self.benchmark[test_name][param].get(sub_param) else: @@ -230,7 +230,7 @@ class CTRexGeneral_Test(unittest.TestCase): trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_pkts") trex_drops = trex_res.get_total_drops() trex_drop_rate = trex_res.get_drop_rate() - if ( (trex_drops/trex_tx_pckt) > 0.001) and (trex_drop_rate > 0.0): # deliberately mask kickoff drops when T-Rex first initiated + if ( trex_drops > 0.001 * trex_tx_pckt) and (trex_drop_rate > 0.0): # deliberately mask kickoff drops when T-Rex first initiated self.fail('Number of packet drops larger than 0.1% of all traffic') # check queue full, queue drop, allocation error @@ -282,7 +282,8 @@ class CTRexGeneral_Test(unittest.TestCase): self.fail_reasons.append(reason) # skip running of the test, counts as 'passed' but prints 'skipped' - def skip(self, message = ''): + def skip(self, message = 'Unknown reason'): + print 'Skip: %s' % message self.skipping = True raise SkipTest(message) diff --git a/scripts/automation/regression/unit_tests/trex_nat_test.py b/scripts/automation/regression/unit_tests/trex_nat_test.py index 452f7ecf..9fe12507 100755 --- a/scripts/automation/regression/unit_tests/trex_nat_test.py +++ b/scripts/automation/regression/unit_tests/trex_nat_test.py @@ -67,8 +67,7 @@ class CTRexNoNat_Test(CTRexGeneral_Test):#(unittest.TestCase): self.check_results_gt (learning_stats, 'm_total_nat_open', expected_nat_opened) self.check_general_scenario_results(trex_res) - - # self.check_CPU_benchmark(trex_res, 10) + self.check_CPU_benchmark(trex_res, minimal_cpu = 10, maximal_cpu = 85) def tearDown(self): CTRexGeneral_Test.tearDown(self) @@ -128,7 +127,7 @@ class CTRexNat_Test(CTRexGeneral_Test):#(unittest.TestCase): if self.get_benchmark_param('allow_timeout_dev'): nat_timeout_ratio = trex_nat_stats['m_total_nat_time_out']/trex_nat_stats['m_total_nat_open'] if nat_timeout_ratio > 0.005: - self.fail('TRex nat_timeout ratio %f > 0.005 (0.5%) and not as expected to be less than 0.5%' %(nat_timeout_ratio)) + self.fail('TRex nat_timeout ratio %f > 0.5%%' % nat_timeout_ratio) else: self.check_results_eq (trex_nat_stats,'m_total_nat_time_out', 0.0) self.check_results_eq (trex_nat_stats,'m_total_nat_no_fid', 0.0) @@ -142,7 +141,7 @@ class CTRexNat_Test(CTRexGeneral_Test):#(unittest.TestCase): # test_norm_cpu = 2*(trex_tx_pckt/(core*cpu_util)) # print "test_norm_cpu is: ", test_norm_cpu - # self.check_CPU_benchmark(trex_res, 10) + self.check_CPU_benchmark(trex_res, minimal_cpu = 10, maximal_cpu = 85) #if ( abs((test_norm_cpu/self.get_benchmark_param('cpu_to_core_ratio')) - 1) > 0.03): # raiseraise AbnormalResultError('Normalized bandwidth to CPU utilization ratio exceeds 3%') diff --git a/scripts/automation/regression/unit_tests/trex_rx_test.py b/scripts/automation/regression/unit_tests/trex_rx_test.py index a37615c4..4f404616 100755 --- a/scripts/automation/regression/unit_tests/trex_rx_test.py +++ b/scripts/automation/regression/unit_tests/trex_rx_test.py @@ -223,9 +223,7 @@ class CTRexRx_Test(CTRexGeneral_Test): self.skip('This test uses NAT, not relevant for loopback') self.router.configure_basic_interfaces() - stat_route_dict = self.get_benchmark_param('stat_route_dict') - stat_route_obj = CStaticRouteConfig(stat_route_dict) - self.router.config_static_routing(stat_route_obj, mode = "config") + self.router.config_pbr(mode = "config") core = self.get_benchmark_param('cores') mult = self.get_benchmark_param('multiplier') @@ -234,48 +232,40 @@ class CTRexRx_Test(CTRexGeneral_Test): ret = self.trex.start_trex( c = core, m = mult, - #p = True, - #nc = True, + p = True, rx_check = sample_rate, - d = 80, + d = 50, f = 'cap2/http_simple.yaml', l = 1000, k = 10, learn_verify = True, l_pkt_mode = 2) - print 'Run for 2 minutes, expect no errors' - trex_res = self.trex.sample_x_seconds(60) + print 'Run for 40 seconds, expect no errors' + trex_res = self.trex.sample_x_seconds(40) print ("\nLATEST RESULT OBJECT:") print trex_res self.check_general_scenario_results(trex_res) - self.check_CPU_benchmark(trex_res, 10) + self.check_CPU_benchmark(trex_res) self.check_rx_errors(trex_res) - try: - # TODO: add nat/zbf config for router - nat_dict = self.get_benchmark_param('nat_dict') - nat_obj = CNatConfig(nat_dict) - self.router.config_nat(nat_obj) - self.router.config_nat_verify() - self.router.config_zbf() - - print 'Run until finish, expect errors' - trex_res = self.trex.sample_to_run_finish() - - self.router.config_no_zbf() - self.router.clear_nat_translations() - print ("\nLATEST RESULT OBJECT:") - print trex_res - nat_stats = self.router.get_nat_stats() - print nat_stats - self.check_general_scenario_results(trex_res) - self.check_CPU_benchmark(trex_res, 10) - self.check_rx_errors(trex_res) + print 'Run until finish, expect errors' + old_errors = copy.deepcopy(self.fail_reasons) + nat_dict = self.get_benchmark_param('nat_dict', test_name = 'test_nat_simple') + nat_obj = CNatConfig(nat_dict) + self.router.config_nat(nat_obj) + self.router.config_zbf() + trex_res = self.trex.sample_to_run_finish() + self.router.config_no_nat(nat_obj) + self.router.config_no_zbf() + print ("\nLATEST RESULT OBJECT:") + print trex_res + self.check_rx_errors(trex_res) + if self.fail_reasons == old_errors: self.fail('Expected errors here, got none.') - except Exception as e: - print 'Got errors as expected: %s' % e - pass + else: + print 'Got errors as expected.' + self.fail_reasons = old_errors def tearDown(self): CTRexGeneral_Test.tearDown(self) -- cgit 1.2.3-korg