summaryrefslogtreecommitdiffstats
path: root/scripts/automation
diff options
context:
space:
mode:
authorYaroslav Brustinov <ybrustin@cisco.com>2016-03-09 14:05:08 +0200
committerYaroslav Brustinov <ybrustin@cisco.com>2016-03-09 14:05:08 +0200
commit2258ea303c6c3573998dea7b9d347d46873018db (patch)
treeff446df4df551ec8efce7ace69f8ea029fd0996d /scripts/automation
parentbcf08b4e5715029ccfa54d22b0d5875b4c7cda78 (diff)
regression: stl updates
Diffstat (limited to 'scripts/automation')
-rw-r--r--scripts/automation/regression/functional_tests/config.yaml74
-rwxr-xr-xscripts/automation/regression/functional_tests/functional_general_test.py22
-rw-r--r--scripts/automation/regression/functional_tests/golden/basic_imix_golden.capbin0 -> 198474 bytes
-rw-r--r--scripts/automation/regression/functional_tests/golden/basic_imix_vm_golden.capbin0 -> 316552 bytes
-rw-r--r--scripts/automation/regression/functional_tests/golden/basic_tuple_gen_golden.capbin0 -> 38024 bytes
-rw-r--r--scripts/automation/regression/functional_tests/golden/udp_590.capbin0 -> 630 bytes
-rwxr-xr-xscripts/automation/regression/functional_tests/hltapi_stream_builder_test.py629
-rwxr-xr-xscripts/automation/regression/functional_tests/misc_methods_test.py61
-rwxr-xr-xscripts/automation/regression/functional_tests/pkt_bld_general_test.py28
-rwxr-xr-xscripts/automation/regression/functional_tests/platform_cmd_cache_test.py60
-rwxr-xr-xscripts/automation/regression/functional_tests/platform_cmd_link_test.py62
-rwxr-xr-xscripts/automation/regression/functional_tests/platform_device_cfg_test.py20
-rwxr-xr-xscripts/automation/regression/functional_tests/platform_dual_if_obj_test.py31
-rwxr-xr-xscripts/automation/regression/functional_tests/platform_if_manager_test.py40
-rwxr-xr-xscripts/automation/regression/functional_tests/platform_if_obj_test.py49
-rw-r--r--scripts/automation/regression/functional_tests/scapy_pkt_builder_test.py368
-rw-r--r--scripts/automation/regression/functional_tests/stl_basic_tests.py263
-rwxr-xr-xscripts/automation/regression/stateful_tests/tests_exceptions.py37
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_general_test.py319
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_imix_test.py202
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_ipv6_test.py102
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_nat_test.py169
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_nbar_test.py193
-rwxr-xr-xscripts/automation/regression/stateful_tests/trex_rx_test.py275
-rwxr-xr-xscripts/automation/regression/stateless_tests/stl_examples_test.py35
-rw-r--r--scripts/automation/regression/stateless_tests/stl_general_test.py62
26 files changed, 3101 insertions, 0 deletions
diff --git a/scripts/automation/regression/functional_tests/config.yaml b/scripts/automation/regression/functional_tests/config.yaml
new file mode 100644
index 00000000..4f4c7c40
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/config.yaml
@@ -0,0 +1,74 @@
+################################################################
+#### T-Rex nightly test configuration file ####
+################################################################
+
+
+### T-Rex configuration:
+# hostname - can be DNS name or IP for the TRex machine for ssh to the box
+# password - root password for TRex machine
+# is_dual - should the TRex inject with -p ?
+# version_path - path to the t-rex version and executable
+# cores - how many cores should be used
+# latency - rate of latency packets injected by the TRex
+
+### Router configuration:
+# hostname - the router hostname as it appears in ______# cli prefix
+# ip_address - the router's ip that can be used to communicate with
+# image - the desired image to be loaded as the router's running config
+# line_password - router password when accessed via Telnet
+# en_password - router password when changing to "enable" mode
+# interfaces - an array of client-server pairs, representing the interfaces configurations of the router
+# configurations - an array of configurations that could possibly be loaded into the router during the test.
+# The "clean" configuration is a mandatory configuration the router will load with to run the basic test bench
+
+### TFTP configuration:
+# hostname - the tftp hostname
+# ip_address - the tftp's ip address
+# images_path - the tftp's relative path in which the router's images are located
+
+### Test_misc configuration:
+# expected_bw - the "golden" bandwidth (in Gbps) results planned on receiving from the test
+
+trex:
+ hostname : hostname
+ password : root password
+ version_path : not used
+ cores : 1
+
+router:
+ model : device model
+ hostname : device hostname
+ ip_address : device ip
+ image : device image name
+ line_password : telnet pass
+ en_password : enable pass
+ mgmt_interface : GigabitEthernet0/0/0
+ clean_config : path to clean_config file
+ intf_masking : 255.255.255.0
+ ipv6_mask : 64
+ interfaces :
+ - client :
+ name : GigabitEthernet0/0/1
+ src_mac_addr : 0000.0001.0000
+ dest_mac_addr : 0000.1000.0000
+ server :
+ name : GigabitEthernet0/0/2
+ src_mac_addr : 0000.0002.0000
+ dest_mac_addr : 0000.2000.0000
+ vrf_name : null
+ - client :
+ name : GigabitEthernet0/0/3
+ src_mac_addr : 0000.0003.0000
+ dest_mac_addr : 0000.3000.0000
+ server :
+ name : GigabitEthernet0/0/4
+ src_mac_addr : 0000.0004.0000
+ dest_mac_addr : 0000.4000.0000
+ vrf_name : dup
+
+
+tftp:
+ hostname : tftp hostname
+ ip_address : tftp ip
+ root_dir : tftp root dir
+ images_path : path related to root dir
diff --git a/scripts/automation/regression/functional_tests/functional_general_test.py b/scripts/automation/regression/functional_tests/functional_general_test.py
new file mode 100755
index 00000000..525b58d2
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/functional_general_test.py
@@ -0,0 +1,22 @@
+#!/router/bin/python
+
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+
+
+class CGeneralFunctional_Test(object):
+ def __init__(self):
+ pass
+
+
+ def setUp(self):
+ pass
+
+
+ def tearDown(self):
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/functional_tests/golden/basic_imix_golden.cap b/scripts/automation/regression/functional_tests/golden/basic_imix_golden.cap
new file mode 100644
index 00000000..6ca32299
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/golden/basic_imix_golden.cap
Binary files differ
diff --git a/scripts/automation/regression/functional_tests/golden/basic_imix_vm_golden.cap b/scripts/automation/regression/functional_tests/golden/basic_imix_vm_golden.cap
new file mode 100644
index 00000000..43ae2368
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/golden/basic_imix_vm_golden.cap
Binary files differ
diff --git a/scripts/automation/regression/functional_tests/golden/basic_tuple_gen_golden.cap b/scripts/automation/regression/functional_tests/golden/basic_tuple_gen_golden.cap
new file mode 100644
index 00000000..7d5e7ec2
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/golden/basic_tuple_gen_golden.cap
Binary files differ
diff --git a/scripts/automation/regression/functional_tests/golden/udp_590.cap b/scripts/automation/regression/functional_tests/golden/udp_590.cap
new file mode 100644
index 00000000..29302f22
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/golden/udp_590.cap
Binary files differ
diff --git a/scripts/automation/regression/functional_tests/hltapi_stream_builder_test.py b/scripts/automation/regression/functional_tests/hltapi_stream_builder_test.py
new file mode 100755
index 00000000..c6b477aa
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/hltapi_stream_builder_test.py
@@ -0,0 +1,629 @@
+#!/router/bin/python
+
+import os
+import unittest
+from trex_stl_lib.trex_stl_hltapi import STLHltStream
+from trex_stl_lib.trex_stl_types import validate_type
+from nose.plugins.attrib import attr
+from nose.tools import nottest
+
+def compare_yamls(yaml1, yaml2):
+ validate_type('yaml1', yaml1, str)
+ validate_type('yaml2', yaml2, str)
+ i = 0
+ for line1, line2 in zip(yaml1.strip().split('\n'), yaml2.strip().split('\n')):
+ i += 1
+ assert line1 == line2, 'yamls are not equal starting from line %s:\n%s\n Golden <-> Generated\n%s' % (i, line1.strip(), line2.strip())
+
+# TODO: move the tests to compare pcaps, not yamls
+@nottest
+class CTRexHltApi_Test(unittest.TestCase):
+ ''' Checks correct HLTAPI creation of packet/VM '''
+
+ def setUp(self):
+ self.golden_yaml = None
+ self.test_yaml = None
+
+ def tearDown(self):
+ compare_yamls(self.golden_yaml, self.test_yaml)
+
+ # Eth/IP/TCP, all values default, no VM instructions + test MACs correction
+ def test_hlt_basic(self):
+ STLHltStream(mac_src = 'a0:00:01:::01', mac_dst = '0d 00 01 00 00 01',
+ mac_src2 = '{00 b0 01 00 00 01}', mac_dst2 = 'd0.00.01.00.00.01')
+ with self.assertRaises(Exception):
+ STLHltStream(mac_src2 = '00:00:00:00:00:0k')
+ with self.assertRaises(Exception):
+ STLHltStream(mac_dst2 = '100:00:00:00:00:00')
+ # wrong encap
+ with self.assertRaises(Exception):
+ STLHltStream(l2_encap = 'ethernet_sdfgsdfg')
+ # all default values
+ test_stream = STLHltStream()
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABCABFAAAyAAAAAEAGusUAAAAAwAAAAQQAAFAAAAABAAAAAVAAD+U1/QAAISEhISEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions: []
+ split_by_var: ''
+'''
+
+ # Eth/IP/TCP, test MAC fields VM, wait for masking of variables for MAC
+ @nottest
+ def test_macs_vm(self):
+ test_stream = STLHltStream(name = 'stream-0', )
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+TBD
+'''
+
+
+ # Eth/IP/TCP, ip src and dest is changed by VM
+ def test_ip_ranges(self):
+ # running on single core not implemented yet
+ with self.assertRaises(Exception):
+ test_stream = STLHltStream(split_by_cores = 'single',
+ ip_src_addr = '192.168.1.1',
+ ip_src_mode = 'increment',
+ ip_src_count = 5,)
+ # wrong type
+ with self.assertRaises(Exception):
+ test_stream = STLHltStream(split_by_cores = 12345,
+ ip_src_addr = '192.168.1.1',
+ ip_src_mode = 'increment',
+ ip_src_count = 5,)
+
+ test_stream = STLHltStream(split_by_cores = 'duplicate',
+ ip_src_addr = '192.168.1.1',
+ ip_src_mode = 'increment',
+ ip_src_count = 5,
+ ip_dst_addr = '5.5.5.5',
+ ip_dst_count = 2,
+ ip_dst_mode = 'random',
+ name = 'test_ip_ranges',
+ rate_pps = 1)
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_ip_ranges
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ pps: 1.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABCABFAAAyAAAAAEAGrxPAqAEBBQUFBQQAAFAAAAABAAAAAVAAD+UqSwAAISEhISEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 0
+ max_value: 4
+ min_value: 0
+ name: inc_4_4_1
+ op: inc
+ size: 4
+ step: 1
+ type: flow_var
+ - add_value: 3232235777
+ is_big_endian: true
+ name: inc_4_4_1
+ pkt_offset: 26
+ type: write_flow_var
+ - init_value: 0
+ max_value: 4294967295
+ min_value: 0
+ name: ip_dst_random
+ op: random
+ size: 4
+ step: 1
+ type: flow_var
+ - add_value: 0
+ is_big_endian: true
+ name: ip_dst_random
+ pkt_offset: 30
+ type: write_flow_var
+ - pkt_offset: 14
+ type: fix_checksum_ipv4
+ split_by_var: ''
+'''
+
+ # Eth / IP / TCP, tcp ports are changed by VM
+ def test_tcp_ranges(self):
+ test_stream = STLHltStream(tcp_src_port_mode = 'decrement',
+ tcp_src_port_count = 10,
+ tcp_dst_port_mode = 'random',
+ tcp_dst_port_count = 10,
+ tcp_dst_port = 1234,
+ name = 'test_tcp_ranges',
+ rate_pps = '2')
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_tcp_ranges
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ pps: 2.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABCABFAAAyAAAAAEAGusUAAAAAwAAAAQQABNIAAAABAAAAAVAAD+UxewAAISEhISEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 9
+ max_value: 9
+ min_value: 0
+ name: dec_2_9_1
+ op: dec
+ size: 2
+ step: 1
+ type: flow_var
+ - add_value: 1015
+ is_big_endian: true
+ name: dec_2_9_1
+ pkt_offset: 34
+ type: write_flow_var
+ - init_value: 0
+ max_value: 65535
+ min_value: 0
+ name: tcp_dst_random
+ op: random
+ size: 2
+ step: 1
+ type: flow_var
+ - add_value: 0
+ is_big_endian: true
+ name: tcp_dst_random
+ pkt_offset: 36
+ type: write_flow_var
+ - pkt_offset: 14
+ type: fix_checksum_ipv4
+ split_by_var: dec_2_9_1
+'''
+
+ # Eth / IP / UDP, udp ports are changed by VM
+ def test_udp_ranges(self):
+ # UDP is not set, expecting ignore of wrong UDP arguments
+ STLHltStream(udp_src_port_mode = 'qwerqwer',
+ udp_src_port_count = 'weqwer',
+ udp_src_port = 'qwerqwer',
+ udp_dst_port_mode = 'qwerqwe',
+ udp_dst_port_count = 'sfgsdfg',
+ udp_dst_port = 'sdfgsdfg')
+ # UDP is set, expecting fail due to wrong UDP arguments
+ with self.assertRaises(Exception):
+ STLHltStream(l4_protocol = 'udp',
+ udp_src_port_mode = 'qwerqwer',
+ udp_src_port_count = 'weqwer',
+ udp_src_port = 'qwerqwer',
+ udp_dst_port_mode = 'qwerqwe',
+ udp_dst_port_count = 'sfgsdfg',
+ udp_dst_port = 'sdfgsdfg')
+ # generate it already with correct arguments
+ test_stream = STLHltStream(l4_protocol = 'udp',
+ udp_src_port_mode = 'decrement',
+ udp_src_port_count = 10,
+ udp_src_port = 1234,
+ udp_dst_port_mode = 'increment',
+ udp_dst_port_count = 10,
+ udp_dst_port = 1234,
+ name = 'test_udp_ranges',
+ rate_percent = 20,)
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_udp_ranges
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 20.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABCABFAAAyAAAAAEARuroAAAAAwAAAAQTSBNIAHsmgISEhISEhISEhISEhISEhISEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 9
+ max_value: 9
+ min_value: 0
+ name: dec_2_9_1
+ op: dec
+ size: 2
+ step: 1
+ type: flow_var
+ - add_value: 1225
+ is_big_endian: true
+ name: dec_2_9_1
+ pkt_offset: 34
+ type: write_flow_var
+ - init_value: 0
+ max_value: 9
+ min_value: 0
+ name: inc_2_9_1
+ op: inc
+ size: 2
+ step: 1
+ type: flow_var
+ - add_value: 1234
+ is_big_endian: true
+ name: inc_2_9_1
+ pkt_offset: 36
+ type: write_flow_var
+ - pkt_offset: 14
+ type: fix_checksum_ipv4
+ split_by_var: dec_2_9_1
+'''
+
+ # Eth/IP/TCP, packet length is changed in VM by frame_size
+ def test_pkt_len_by_framesize(self):
+ # just check errors, no compare to golden
+ STLHltStream(length_mode = 'increment',
+ frame_size_min = 100,
+ frame_size_max = 3000)
+ test_stream = STLHltStream(length_mode = 'decrement',
+ frame_size_min = 100,
+ frame_size_max = 3000,
+ name = 'test_pkt_len_by_framesize',
+ rate_bps = 1000)
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_pkt_len_by_framesize
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ bps_L2: 1000.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABCABFAAuqAAAAAEAGr00AAAAAwAAAAQQAAFAAAAABAAAAAVAAD+UwiwAAISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhIS
EhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhIS
EhISEhISEh
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 3000
+ max_value: 3000
+ min_value: 100
+ name: pkt_len
+ op: dec
+ size: 2
+ step: 1
+ type: flow_var
+ - name: pkt_len
+ type: trim_pkt_size
+ - add_value: -14
+ is_big_endian: true
+ name: pkt_len
+ pkt_offset: 16
+ type: write_flow_var
+ - pkt_offset: 14
+ type: fix_checksum_ipv4
+ split_by_var: pkt_len
+'''
+
+ # Eth/IP/UDP, packet length is changed in VM by l3_length
+ def test_pkt_len_by_l3length(self):
+ test_stream = STLHltStream(l4_protocol = 'udp',
+ length_mode = 'random',
+ l3_length_min = 100,
+ l3_length_max = 400,
+ name = 'test_pkt_len_by_l3length')
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_pkt_len_by_l3length
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABCABFAAGQAAAAAEARuVwAAAAAwAAAAQQAAFABfCaTISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEh
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 114
+ max_value: 414
+ min_value: 114
+ name: pkt_len
+ op: random
+ size: 2
+ step: 1
+ type: flow_var
+ - name: pkt_len
+ type: trim_pkt_size
+ - add_value: -14
+ is_big_endian: true
+ name: pkt_len
+ pkt_offset: 16
+ type: write_flow_var
+ - add_value: -34
+ is_big_endian: true
+ name: pkt_len
+ pkt_offset: 38
+ type: write_flow_var
+ - pkt_offset: 14
+ type: fix_checksum_ipv4
+ split_by_var: ''
+'''
+
+ # Eth/IP/TCP, with vlan, no VM
+ def test_vlan_basic(self):
+ with self.assertRaises(Exception):
+ STLHltStream(l2_encap = 'ethernet_ii',
+ vlan_id = 'sdfgsdgf')
+ test_stream = STLHltStream(l2_encap = 'ethernet_ii')
+ assert ':802.1Q:' not in test_stream.get_pkt_type(), 'Default packet should not include dot1q'
+
+ test_stream = STLHltStream(name = 'test_vlan_basic', l2_encap = 'ethernet_ii_vlan')
+ assert ':802.1Q:' in test_stream.get_pkt_type(), 'No dot1q in packet with encap ethernet_ii_vlan'
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_vlan_basic
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABgQAwAAgARQAALgAAAABABrrJAAAAAMAAAAEEAABQAAAAAQAAAAFQAA/leEMAACEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions: []
+ split_by_var: ''
+'''
+
+ # Eth/IP/TCP, with 4 vlan
+ def test_vlan_multiple(self):
+ # default frame size should be not enough
+ with self.assertRaises(Exception):
+ STLHltStream(vlan_id = [1, 2, 3, 4])
+ test_stream = STLHltStream(name = 'test_vlan_multiple', frame_size = 100,
+ vlan_id = [1, 2, 3, 4], # can be either array or string separated by spaces
+ vlan_protocol_tag_id = '8100 0x8100')
+ pkt_layers = test_stream.get_pkt_type()
+ assert '802.1Q:' * 4 in pkt_layers, 'No four dot1q layers in packet: %s' % pkt_layers
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_vlan_multiple
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABgQAwAYEAMAKBADADgQAwBAgARQAARgAAAABABrqxAAAAAMAAAAEEAABQAAAAAQAAAAFQAA/l6p0AACEhISEhISEhISEhISEhISEhISEhISEhISEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions: []
+ split_by_var: ''
+'''
+
+ # Eth/IP/TCP, with 5 vlans and VMs on vlan_id
+ def test_vlan_vm(self):
+ test_stream = STLHltStream(name = 'test_vlan_vm', frame_size = 100,
+ vlan_id = '1 2 1000 4 5', # 5 vlans
+ vlan_id_mode = 'increment fixed decrement random', # 5th vlan will be default fixed
+ vlan_id_step = 2, # 1st vlan step will be 2, others - default 1
+ vlan_id_count = [4, 1, 10], # 4th independent on count, 5th will be fixed
+ )
+ pkt_layers = test_stream.get_pkt_type()
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ assert '802.1Q:' * 5 in pkt_layers, 'No five dot1q layers in packet: %s' % pkt_layers
+ self.golden_yaml = '''
+- name: test_vlan_vm
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABgQAwAYEAMAKBADPogQAwBIEAMAUIAEUAAEIAAAAAQAa6tQAAAADAAAABBAAAUAAAAAEAAAABUAAP5SzkAAAhISEhISEhISEhISEhISEhISEhISEhISEhIQ==
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 0
+ max_value: 6
+ min_value: 0
+ name: dec_2_3_2
+ op: inc
+ size: 2
+ step: 2
+ type: flow_var
+ - add_value: 1
+ is_big_endian: true
+ mask: 4095
+ name: dec_2_3_2
+ pkt_cast_size: 2
+ pkt_offset: 14
+ shift: 0
+ type: write_mask_flow_var
+ - init_value: 9
+ max_value: 9
+ min_value: 0
+ name: dec_2_9_1
+ op: dec
+ size: 2
+ step: 1
+ type: flow_var
+ - add_value: 991
+ is_big_endian: true
+ mask: 4095
+ name: dec_2_9_1
+ pkt_cast_size: 2
+ pkt_offset: 22
+ shift: 0
+ type: write_mask_flow_var
+ - init_value: 0
+ max_value: 65535
+ min_value: 0
+ name: vlan_id_random
+ op: random
+ size: 2
+ step: 1
+ type: flow_var
+ - add_value: 0
+ is_big_endian: true
+ mask: 4095
+ name: vlan_id_random
+ pkt_cast_size: 2
+ pkt_offset: 26
+ shift: 0
+ type: write_mask_flow_var
+ split_by_var: dec_2_9_1
+'''
+
+
+ # Eth/IPv6/TCP, no VM
+ def test_ipv6_basic(self):
+ # default frame size should be not enough
+ with self.assertRaises(Exception):
+ STLHltStream(l3_protocol = 'ipv6')
+ # error should not affect
+ STLHltStream(ipv6_src_addr = 'asdfasdfasgasdf')
+ # error should affect
+ with self.assertRaises(Exception):
+ STLHltStream(l3_protocol = 'ipv6', ipv6_src_addr = 'asdfasdfasgasdf')
+ test_stream = STLHltStream(name = 'test_ipv6_basic', l3_protocol = 'ipv6', length_mode = 'fixed', l3_length = 150, )
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_ipv6_basic
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABht1gAAAAAG4GQP6AAAAAAAAAAAAAAAAAABL+gAAAAAAAAAAAAAAAAAAiBAAAUAAAAAEAAAABUAAP5ctLAAAhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISE=
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions: []
+ split_by_var: ''
+'''
+
+ # Eth/IPv6/UDP, VM on ipv6 fields
+ def test_ipv6_src_dst_ranges(self):
+ test_stream = STLHltStream(name = 'test_ipv6_src_dst_ranges', l3_protocol = 'ipv6', l3_length = 150, l4_protocol = 'udp',
+ ipv6_src_addr = '1111:2222:3333:4444:5555:6666:7777:8888',
+ ipv6_dst_addr = '1111:1111:1111:1111:1111:1111:1111:1111',
+ ipv6_src_mode = 'increment', ipv6_src_step = 5, ipv6_src_count = 10,
+ ipv6_dst_mode = 'decrement', ipv6_dst_step = '1111:1111:1111:1111:1111:1111:0000:0011', ipv6_dst_count = 150,
+ )
+ self.test_yaml = test_stream.dump_to_yaml(self.yaml_save_location())
+ self.golden_yaml = '''
+- name: test_ipv6_src_dst_ranges
+ stream:
+ action_count: 0
+ enabled: true
+ flags: 3
+ isg: 0.0
+ mode:
+ percentage: 10.0
+ type: continuous
+ packet:
+ binary: AAAAAAAAAAABAAABht1gAAAAAG4RQBERIiIzM0REVVVmZnd3iIgRERERERERERERERERERERBAAAUABucjohISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISEhISE=
+ meta: ''
+ flow_stats:
+ enabled: false
+ self_start: true
+ vm:
+ instructions:
+ - init_value: 0
+ max_value: 45
+ min_value: 0
+ name: inc_4_9_5
+ op: inc
+ size: 4
+ step: 5
+ type: flow_var
+ - add_value: 2004322440
+ is_big_endian: true
+ name: inc_4_9_5
+ pkt_offset: 34
+ type: write_flow_var
+ - init_value: 2533
+ max_value: 2533
+ min_value: 0
+ name: dec_4_149_17
+ op: dec
+ size: 4
+ step: 17
+ type: flow_var
+ - add_value: 286328620
+ is_big_endian: true
+ name: dec_4_149_17
+ pkt_offset: 50
+ type: write_flow_var
+ split_by_var: dec_4_149_17
+'''
+
+
+
+
+
+ def yaml_save_location(self):
+ #return os.devnull
+ # debug/development, comment line above
+ return '/tmp/%s.yaml' % self._testMethodName
+
+
diff --git a/scripts/automation/regression/functional_tests/misc_methods_test.py b/scripts/automation/regression/functional_tests/misc_methods_test.py
new file mode 100755
index 00000000..096f86d8
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/misc_methods_test.py
@@ -0,0 +1,61 @@
+#!/router/bin/python
+
+import functional_general_test
+import misc_methods
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+
+
+class MiscMethods_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.ipv4_gen = misc_methods.get_network_addr()
+ self.ipv6_gen = misc_methods.get_network_addr(ip_type = 'ipv6')
+ pass
+
+ def test_ipv4_gen(self):
+ for i in range(1, 255):
+ assert_equal( next(self.ipv4_gen), [".".join( map(str, [1, 1, i, 0])), '255.255.255.0'] )
+
+ def test_ipv6_gen(self):
+ tmp_ipv6_addr = ['2001', 'DB8', 0, '2222', 0, 0, 0, 0]
+ for i in range(0, 255):
+ tmp_ipv6_addr[2] = hex(i)[2:]
+ assert_equal( next(self.ipv6_gen), ":".join( map(str, tmp_ipv6_addr)) )
+
+ def test_get_ipv4_client_addr(self):
+ tmp_ipv4_addr = next(self.ipv4_gen)[0]
+ assert_equal ( misc_methods.get_single_net_client_addr(tmp_ipv4_addr), '1.1.1.1')
+ assert_raises (ValueError, misc_methods.get_single_net_client_addr, tmp_ipv4_addr, {'3' : 255} )
+
+ def test_get_ipv6_client_addr(self):
+ tmp_ipv6_addr = next(self.ipv6_gen)
+ assert_equal ( misc_methods.get_single_net_client_addr(tmp_ipv6_addr, {'7' : 1}, ip_type = 'ipv6'), '2001:DB8:0:2222:0:0:0:1')
+ assert_equal ( misc_methods.get_single_net_client_addr(tmp_ipv6_addr, {'7' : 2}, ip_type = 'ipv6'), '2001:DB8:0:2222:0:0:0:2')
+ assert_raises (ValueError, misc_methods.get_single_net_client_addr, tmp_ipv6_addr, {'7' : 70000} )
+
+
+ @raises(ValueError)
+ def test_ipv4_client_addr_exception(self):
+ tmp_ipv4_addr = next(self.ipv4_gen)[0]
+ misc_methods.get_single_net_client_addr(tmp_ipv4_addr, {'4' : 1})
+
+ @raises(ValueError)
+ def test_ipv6_client_addr_exception(self):
+ tmp_ipv6_addr = next(self.ipv6_gen)
+ misc_methods.get_single_net_client_addr(tmp_ipv6_addr, {'8' : 1}, ip_type = 'ipv6')
+
+ @raises(StopIteration)
+ def test_gen_ipv4_to_limit (self):
+ while(True):
+ next(self.ipv4_gen)
+
+ @raises(StopIteration)
+ def test_gen_ipv6_to_limit (self):
+ while(True):
+ next(self.ipv6_gen)
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/functional_tests/pkt_bld_general_test.py b/scripts/automation/regression/functional_tests/pkt_bld_general_test.py
new file mode 100755
index 00000000..5f89eaff
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/pkt_bld_general_test.py
@@ -0,0 +1,28 @@
+#!/router/bin/python
+
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+import sys
+import outer_packages
+
+
+class CGeneralPktBld_Test(object):
+ def __init__(self):
+ pass
+
+ @staticmethod
+ def print_packet(pkt_obj):
+ print "\nGenerated packet:\n{}".format(repr(pkt_obj))
+
+
+ def setUp(self):
+ pass
+
+
+ def tearDown(self):
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/functional_tests/platform_cmd_cache_test.py b/scripts/automation/regression/functional_tests/platform_cmd_cache_test.py
new file mode 100755
index 00000000..24ccf7a5
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/platform_cmd_cache_test.py
@@ -0,0 +1,60 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CCommandCache_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.cache = CCommandCache()
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/1')
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/2')
+ self.cache.add('conf', "arp 1.1.1.1 0000.0001.0000 arpa")
+ self.cache.add('conf', "arp 1.1.2.1 0000.0002.0000 arpa")
+ self.cache.add('exec', "show ip nbar protocol-discovery stats packet-count")
+
+ def test_add(self):
+ assert_equal(self.cache.cache['IF'],
+ {'GigabitEthernet0/0/1' : ['ip nbar protocol-discovery'],
+ 'GigabitEthernet0/0/2' : ['ip nbar protocol-discovery']
+ })
+ assert_equal(self.cache.cache['CONF'],
+ ["arp 1.1.1.1 0000.0001.0000 arpa",
+ "arp 1.1.2.1 0000.0002.0000 arpa"]
+ )
+ assert_equal(self.cache.cache['EXEC'],
+ ["show ip nbar protocol-discovery stats packet-count"])
+
+ def test_dump_config (self):
+ import sys
+ from StringIO import StringIO
+ saved_stdout = sys.stdout
+ try:
+ out = StringIO()
+ sys.stdout = out
+ self.cache.dump_config()
+ output = out.getvalue().strip()
+ assert_equal(output,
+ "configure terminal\ninterface GigabitEthernet0/0/1\nip nbar protocol-discovery\ninterface GigabitEthernet0/0/2\nip nbar protocol-discovery\nexit\narp 1.1.1.1 0000.0001.0000 arpa\narp 1.1.2.1 0000.0002.0000 arpa\nexit\nshow ip nbar protocol-discovery stats packet-count"
+ )
+ finally:
+ sys.stdout = saved_stdout
+
+ def test_get_config_list (self):
+ assert_equal(self.cache.get_config_list(),
+ ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ def test_clear_cache (self):
+ self.cache.clear_cache()
+ assert_equal(self.cache.cache,
+ {"IF" : {},
+ "CONF" : [],
+ "EXEC" : []}
+ )
+
+ def tearDown(self):
+ self.cache.clear_cache()
diff --git a/scripts/automation/regression/functional_tests/platform_cmd_link_test.py b/scripts/automation/regression/functional_tests/platform_cmd_link_test.py
new file mode 100755
index 00000000..7a31815b
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/platform_cmd_link_test.py
@@ -0,0 +1,62 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CCommandLink_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.cache = CCommandCache()
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/1')
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/2')
+ self.cache.add('conf', "arp 1.1.1.1 0000.0001.0000 arpa")
+ self.cache.add('conf', "arp 1.1.2.1 0000.0002.0000 arpa")
+ self.cache.add('exec', "show ip nbar protocol-discovery stats packet-count")
+ self.com_link = CCommandLink()
+
+ def test_transmit(self):
+        # test here future implementation of platform physical link
+ pass
+
+ def test_run_cached_command (self):
+ self.com_link.run_command([self.cache])
+
+ assert_equal (self.com_link.get_history(),
+ ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ self.com_link.clear_history()
+ self.com_link.run_single_command(self.cache)
+ assert_equal (self.com_link.get_history(),
+ ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ def test_run_single_command(self):
+ self.com_link.run_single_command("show ip nbar protocol-discovery stats packet-count")
+ assert_equal (self.com_link.get_history(),
+ ["show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ def test_run_mixed_commands (self):
+ self.com_link.run_single_command("show ip nbar protocol-discovery stats packet-count")
+ self.com_link.run_command([self.cache])
+ self.com_link.run_command(["show ip interface brief"])
+
+ assert_equal (self.com_link.get_history(),
+ ["show ip nbar protocol-discovery stats packet-count",
+ "configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count",
+ "show ip interface brief"]
+ )
+
+ def test_clear_history (self):
+ self.com_link.run_command(["show ip interface brief"])
+ self.com_link.clear_history()
+ assert_equal (self.com_link.get_history(), [])
+
+ def tearDown(self):
+ self.cache.clear_cache()
+
+
diff --git a/scripts/automation/regression/functional_tests/platform_device_cfg_test.py b/scripts/automation/regression/functional_tests/platform_device_cfg_test.py
new file mode 100755
index 00000000..3935a4c5
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/platform_device_cfg_test.py
@@ -0,0 +1,20 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CDeviceCfg_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.dev_cfg = CDeviceCfg('./functional_tests/config.yaml')
+
+ def test_get_interfaces_cfg(self):
+ assert_equal (self.dev_cfg.get_interfaces_cfg(),
+ [{'client': {'src_mac_addr': '0000.0001.0000', 'name': 'GigabitEthernet0/0/1', 'dest_mac_addr': '0000.1000.0000'}, 'vrf_name': None, 'server': {'src_mac_addr': '0000.0002.0000', 'name': 'GigabitEthernet0/0/2', 'dest_mac_addr': '0000.2000.0000'}}, {'client': {'src_mac_addr': '0000.0003.0000', 'name': 'GigabitEthernet0/0/3', 'dest_mac_addr': '0000.3000.0000'}, 'vrf_name': 'dup', 'server': {'src_mac_addr': '0000.0004.0000', 'name': 'GigabitEthernet0/0/4', 'dest_mac_addr': '0000.4000.0000'}}]
+ )
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/functional_tests/platform_dual_if_obj_test.py b/scripts/automation/regression/functional_tests/platform_dual_if_obj_test.py
new file mode 100755
index 00000000..ff54b9ee
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/platform_dual_if_obj_test.py
@@ -0,0 +1,31 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CDualIfObj_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.if_1 = CIfObj('gig0/0/1', '1.1.1.1', '2001:DB8:0:2222:0:0:0:1', '0000.0001.0000', '0000.0001.0000', IFType.Client)
+ self.if_2 = CIfObj('gig0/0/2', '1.1.2.1', '2001:DB8:1:2222:0:0:0:1', '0000.0002.0000', '0000.0002.0000', IFType.Server)
+ self.if_3 = CIfObj('gig0/0/3', '1.1.3.1', '2001:DB8:2:2222:0:0:0:1', '0000.0003.0000', '0000.0003.0000', IFType.Client)
+ self.if_4 = CIfObj('gig0/0/4', '1.1.4.1', '2001:DB8:3:2222:0:0:0:1', '0000.0004.0000', '0000.0004.0000', IFType.Server)
+ self.dual_1 = CDualIfObj(None, self.if_1, self.if_2)
+ self.dual_2 = CDualIfObj('dup', self.if_3, self.if_4)
+
+ def test_id_allocation(self):
+ assert (self.dual_1.get_id() < self.dual_2.get_id() < CDualIfObj._obj_id)
+
+ def test_get_vrf_name (self):
+ assert_equal ( self.dual_1.get_vrf_name() , None )
+ assert_equal ( self.dual_2.get_vrf_name() , 'dup' )
+
+ def test_is_duplicated (self):
+ assert_equal ( self.dual_1.is_duplicated() , False )
+ assert_equal ( self.dual_2.is_duplicated() , True )
+
+ def tearDown(self):
+ pass \ No newline at end of file
diff --git a/scripts/automation/regression/functional_tests/platform_if_manager_test.py b/scripts/automation/regression/functional_tests/platform_if_manager_test.py
new file mode 100755
index 00000000..b09e8d75
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/platform_if_manager_test.py
@@ -0,0 +1,40 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CIfManager_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.dev_cfg = CDeviceCfg('./functional_tests/config.yaml')
+ self.if_mng = CIfManager()
+
+ # main testing method to check the entire class
+ def test_load_config (self):
+ self.if_mng.load_config(self.dev_cfg)
+
+        # check the number of items in each query
+ assert_equal( len(self.if_mng.get_if_list()), 4 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Client)), 2 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Client, is_duplicated = True)), 1 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Client, is_duplicated = False)), 1 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Server)), 2 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Server, is_duplicated = True)), 1 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Server, is_duplicated = False)), 1 )
+ assert_equal( len(self.if_mng.get_duplicated_if()), 2 )
+ assert_equal( len(self.if_mng.get_dual_if_list()), 2 )
+
+ # check the classification with intf name
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_if_list() ), ['GigabitEthernet0/0/1','GigabitEthernet0/0/2','GigabitEthernet0/0/3','GigabitEthernet0/0/4'] )
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_if_list(is_duplicated = True) ), ['GigabitEthernet0/0/3','GigabitEthernet0/0/4'] )
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_if_list(is_duplicated = False) ), ['GigabitEthernet0/0/1','GigabitEthernet0/0/2'] )
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_duplicated_if() ), ['GigabitEthernet0/0/3', 'GigabitEthernet0/0/4'] )
+
+ # check the classification with vrf name
+ assert_equal( map(CDualIfObj.get_vrf_name, self.if_mng.get_dual_if_list() ), [None, 'dup'] )
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/functional_tests/platform_if_obj_test.py b/scripts/automation/regression/functional_tests/platform_if_obj_test.py
new file mode 100755
index 00000000..534d4170
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/platform_if_obj_test.py
@@ -0,0 +1,49 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CIfObj_Test(functional_general_test.CGeneralFunctional_Test):
+ test_idx = 1
+
+ def setUp(self):
+ self.if_1 = CIfObj('gig0/0/1', '1.1.1.1', '2001:DB8:0:2222:0:0:0:1', '0000.0001.0000', '0000.0001.0000', IFType.Client)
+ self.if_2 = CIfObj('TenGig0/0/0', '1.1.2.1', '2001:DB8:1:2222:0:0:0:1', '0000.0002.0000', '0000.0002.0000', IFType.Server)
+ CIfObj_Test.test_idx += 1
+
+ def test_id_allocation(self):
+ assert (self.if_1.get_id() < self.if_2.get_id() < CIfObj._obj_id)
+
+ def test_isClient(self):
+ assert_equal (self.if_1.is_client(), True)
+
+ def test_isServer(self):
+ assert_equal (self.if_2.is_server(), True)
+
+ def test_get_name (self):
+ assert_equal (self.if_1.get_name(), 'gig0/0/1')
+ assert_equal (self.if_2.get_name(), 'TenGig0/0/0')
+
+ def test_get_src_mac_addr (self):
+ assert_equal (self.if_1.get_src_mac_addr(), '0000.0001.0000')
+
+ def test_get_dest_mac (self):
+ assert_equal (self.if_2.get_dest_mac(), '0000.0002.0000')
+
+ def test_get_ipv4_addr (self):
+ assert_equal (self.if_1.get_ipv4_addr(), '1.1.1.1' )
+ assert_equal (self.if_2.get_ipv4_addr(), '1.1.2.1' )
+
+ def test_get_ipv6_addr (self):
+ assert_equal (self.if_1.get_ipv6_addr(), '2001:DB8:0:2222:0:0:0:1' )
+ assert_equal (self.if_2.get_ipv6_addr(), '2001:DB8:1:2222:0:0:0:1' )
+
+ def test_get_type (self):
+ assert_equal (self.if_1.get_if_type(), IFType.Client)
+ assert_equal (self.if_2.get_if_type(), IFType.Server)
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/functional_tests/scapy_pkt_builder_test.py b/scripts/automation/regression/functional_tests/scapy_pkt_builder_test.py
new file mode 100644
index 00000000..eaff9530
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/scapy_pkt_builder_test.py
@@ -0,0 +1,368 @@
+#!/router/bin/python
+
+import pkt_bld_general_test
+
+#HACK FIX ME START
+import sys
+import os
+
+CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join(CURRENT_PATH, '../../../trex_control_plane/stl/'))
+#HACK FIX ME END
+from trex_stl_lib.trex_stl_packet_builder_scapy import *
+
+from scapy.all import *
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+import os
+import random
+import pprint
+
+class CTRexPktBuilderSanitySCapy_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ pass
+
+ def test_simple_vm1(self):
+ raw1 = CTRexScRaw( [ CTRexVmDescFlowVar(name="a",min_value="16.0.0.1",max_value="16.0.0.10",init_value="16.0.0.1",size=4,op="inc"),
+ CTRexVmDescWrFlowVar (fv_name="a",pkt_offset= "IP.src"),
+ CTRexVmDescFixIpv4(offset = "IP")]
+ );
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( raw1 )
+ pkt_builder.compile();
+
+ pkt_builder.dump_scripts ()
+
+ print pkt_builder.get_vm_data()
+
+ assert_equal( pkt_builder.get_vm_data(), {'split_by_var': '', 'instructions': [{'name': 'a', 'max_value': 268435466, 'min_value': 268435457, 'init_value': 268435457, 'size': 4, 'type': 'flow_var', 'step':1,'op': 'inc'}, {'is_big_endian': True, 'pkt_offset': 26, 'type': 'write_flow_var', 'name': 'a', 'add_value': 0}, {'pkt_offset': 14, 'type': 'fix_checksum_ipv4'}]} )
+
+
+
+ def test_simple_no_vm1(self):
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+
+ pkt_builder.compile();
+
+ pkt_builder.dump_scripts ()
+
+ assert_equal( pkt_builder.get_vm_data(),
+ { 'instructions': [ ],
+ 'split_by_var': ''}
+ )
+
+
+ def test_simple_mac_default(self):
+
+ pkt = Ether()/IP()/UDP()
+
+
+ pkt_builder = CScapyTRexPktBuilder(pkt = pkt);
+
+ assert_equal( pkt_builder.is_default_src_mac () ,True)
+ assert_equal( pkt_builder.is_default_dst_mac () ,True)
+
+ pkt = Ether(src="00:00:00:00:00:01")/IP()/UDP()
+
+ pkt_builder = CScapyTRexPktBuilder(pkt = pkt);
+
+ assert_equal( pkt_builder.is_default_src_mac (), False)
+ assert_equal( pkt_builder.is_default_dst_mac (), True)
+
+ pkt = Ether(dst="00:00:00:00:00:01")/IP()/UDP()
+
+ pkt_builder = CScapyTRexPktBuilder(pkt = pkt);
+
+ assert_equal( pkt_builder.is_default_src_mac (),True)
+ assert_equal( pkt_builder.is_default_dst_mac (),False)
+
+
+
+
+ def test_simple_teredo(self):
+
+ pkt = Ether()/IP(src="16.0.0.1",dst="48.0.0.1")/UDP(dport=3797,sport=3544)/IPv6(src="2001:0:4137:9350:8000:f12a:b9c8:2815",dst="2001:4860:0:2001::68")/UDP(dport=12,sport=1025)/ICMPv6Unknown()
+
+ pkt.build();
+ p_utl=CTRexScapyPktUtl(pkt);
+
+ assert_equal( p_utl.get_field_offet_by_str("IPv6.src"), (50,16) )
+ assert_equal( p_utl.get_field_offet_by_str("IPv6.dst"), (66,16) )
+
+
+
+
+ def test_simple_scapy_vlan(self):
+
+ py='5'*(9)
+ p1=Ether(src="00:00:00:01:00:00",dst="00:00:00:01:00:00")/ \
+ Dot1Q(vlan=12)/ \
+ Dot1Q(vlan=17)/ \
+ IP(src="10.0.0.10",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/py
+
+ p1.build();
+ p1.dump_layers_offset()
+ p1.show2();
+ hexdump(p1);
+ #wrpcap("ipv4_udp_9k.pcap", p1);
+
+ p_utl=CTRexScapyPktUtl(p1);
+
+ assert_equal(p_utl.get_pkt_layers(),"Ethernet:802.1Q:802.1Q:IP:UDP:Raw")
+ assert_equal(p_utl.layer_offset("802.1Q",0),14);
+ assert_equal(p_utl.layer_offset("802.1Q",1),18);
+ assert_equal(p_utl.get_field_offet_by_str("802|1Q.vlan"),(14,0));
+ assert_equal(p_utl.get_field_offet_by_str("802|1Q:1.vlan"),(18,0));
+ assert_equal(p_utl.get_field_offet_by_str("IP.src"),(34,4));
+
+ def test_simple_scapy_128_udp(self):
+ """
+ build 128 byte packet with 0x35 as pyld
+ """
+
+
+ pkt_size =128
+ p1=Ether(src="00:00:00:01:00:00",dst="00:00:00:01:00:00")/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)
+ pyld_size=pkt_size-len(p1);
+
+ pkt=p1/('5'*(pyld_size))
+
+ pkt.show2();
+ hexdump(pkt);
+ assert_equal(len(pkt),128)
+
+ def test_simple_scapy_9k_ip_len(self):
+ """
+ build 9k ipv4 len packet
+ """
+
+
+ ip_pkt_size =9*1024
+ p_l2=Ether(src="00:00:00:01:00:00",dst="00:00:00:01:00:00");
+ p_l3= IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)
+ pyld_size = ip_pkt_size-len(p_l3);
+
+ pkt=p_l2/p_l3/('\x55'*(pyld_size))
+
+ #pkt.show2();
+ #hexdump(pkt);
+ assert_equal(len(pkt),9*1024+14)
+
+ def test_simple_scapy_ipv6_1(self):
+ """
+ build ipv6 packet
+ """
+
+ print "start "
+ py='\x55'*(64)
+
+ p=Ether()/IPv6()/UDP(dport=12,sport=1025)/py
+ #p.build();
+ #p.dump_layers_offset()
+ hexdump(p);
+ p.show2();
+
+ p_utl=CTRexScapyPktUtl(p);
+
+ assert_equal(p_utl.get_field_offet_by_str("IPv6.src"),(22,16));
+
+
+ def test_simple_vm2(self):
+ raw1 = CTRexScRaw( [ CTRexVmDescFlowVar(name="my_valn",min_value=0,max_value=10,init_value=2,size=1,op="inc"),
+                             CTRexVmDescWrFlowVar (fv_name="my_valn",pkt_offset= "802|1Q.vlan" ,offset_fixup=3) # fix the offset as vlan is a bitfield and not supported right now
+ ]
+ );
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ Dot1Q(vlan=12)/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( raw1 )
+ pkt_builder.compile();
+
+
+ d= pkt_builder.get_vm_data()
+ assert_equal(d['instructions'][1]['pkt_offset'],17)
+
+ def test_simple_vm3(self):
+ try:
+ raw1 = CTRexScRaw( [ CTRexVmDescFlowVar(name="my_valn",min_value=0,max_value=10,init_value=2,size=1,op="inc"),
+                                 CTRexVmDescWrFlowVar (fv_name="my_valn_err",pkt_offset= "802|1Q.vlan" ,offset_fixup=3) # fix the offset as vlan is a bitfield and not supported right now
+ ]
+ );
+
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ Dot1Q(vlan=12)/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( raw1 )
+ pkt_builder.compile();
+
+
+ d= pkt_builder.get_vm_data()
+ except CTRexPacketBuildException as e:
+ assert_equal(str(e), "[errcode:-11] 'variable my_valn_err does not exists '")
+
+ def test_simple_tuple_gen(self):
+ vm = CTRexScRaw( [ CTRexVmDescTupleGen (name="tuple"), # define tuple gen
+ CTRexVmDescWrFlowVar (fv_name="tuple.ip", pkt_offset= "IP.src" ), # write ip to packet IP.src
+ CTRexVmDescFixIpv4(offset = "IP"), # fix checksum
+ CTRexVmDescWrFlowVar (fv_name="tuple.port", pkt_offset= "UDP.sport" ) #write udp.port
+ ]
+ );
+ pkt_builder = CScapyTRexPktBuilder();
+
+ py='5'*128
+ pkt=Ether()/ \
+ Dot1Q(vlan=12)/ \
+ IP(src="16.0.0.1",dst="48.0.0.1")/ \
+ UDP(dport=12,sport=1025)/IP()/py
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( vm )
+ pkt_builder.compile();
+ d= pkt_builder.get_vm_data()
+ pkt_builder.dump_vm_data_as_yaml()
+
+ assert_equal(d['instructions'][1]['pkt_offset'],30)
+ assert_equal(d['instructions'][3]['pkt_offset'],38)
+
+ def test_simple_random_pkt_size(self):
+
+ ip_pkt_size = 9*1024
+ p_l2 = Ether();
+ p_l3 = IP(src="16.0.0.1",dst="48.0.0.1")
+ p_l4 = UDP(dport=12,sport=1025)
+ pyld_size = ip_pkt_size-len(p_l3/p_l4);
+
+ pkt =p_l2/p_l3/p_l4/('\x55'*(pyld_size))
+
+ l3_len_fix =-(len(p_l2));
+ l4_len_fix =-(len(p_l2/p_l3));
+
+ vm = CTRexScRaw( [ CTRexVmDescFlowVar(name="fv_rand", min_value=64, max_value=len(pkt), size=2, op="random"),
+ CTRexVmDescTrimPktSize("fv_rand"), # total packet size
+ CTRexVmDescWrFlowVar(fv_name="fv_rand", pkt_offset= "IP.len", add_val=l3_len_fix),
+ CTRexVmDescFixIpv4(offset = "IP"), # fix checksum
+ CTRexVmDescWrFlowVar(fv_name="fv_rand", pkt_offset= "UDP.len", add_val=l4_len_fix)
+ ]
+ )
+ pkt_builder = CScapyTRexPktBuilder();
+
+ # set packet
+ pkt_builder.set_packet(pkt);
+ pkt_builder.add_command ( vm )
+ pkt_builder.compile();
+ d= pkt_builder.get_vm_data()
+ pkt_builder.dump_vm_data_as_yaml()
+
+ assert_equal(d['instructions'][0]['max_value'],9230)
+ assert_equal(d['instructions'][2]['pkt_offset'],16)
+ assert_equal(d['instructions'][4]['pkt_offset'],38)
+
+ def test_simple_pkt_loader(self):
+ p=RawPcapReader("functional_tests/golden/basic_imix_golden.cap")
+ print ""
+ for pkt in p:
+ print pkt[1]
+ print hexdump(str(pkt[0]))
+ break;
+
+ def test_simple_pkt_loader1(self):
+
+ pkt_builder = CScapyTRexPktBuilder(pkt = "functional_tests/golden/udp_590.cap", build_raw = False);
+ print ""
+ pkt_builder.dump_as_hex()
+ r = pkt_builder.pkt_raw
+ assert_equal(ord(r[1]),0x50)
+ assert_equal(ord(r[0]),0x00)
+ assert_equal(ord(r[0x240]),0x16)
+ assert_equal(ord(r[0x24d]),0x79)
+ assert_equal(len(r),590)
+
+ print len(r)
+
+ def test_simple_pkt_loader2(self):
+
+ pkt_builder = CScapyTRexPktBuilder(pkt = "functional_tests/golden/basic_imix_golden.cap");
+ assert_equal(pkt_builder.pkt_layers_desc (), "Ethernet:IP:UDP:Raw");
+
+ def test_simple_pkt_loader3(self):
+
+ #pkt_builder = CScapyTRexPktBuilder(pkt = "stl/golden/basic_imix_golden.cap");
+ #r = pkt_builder.pkt_raw
+ #print ""
+ #hexdump(str(r))
+
+
+ #print pkt_builder.pkt_layers_desc ()
+
+
+ #pkt_builder.set_packet(pkt);
+
+ py='\x55'*(64)
+
+ p=Ether()/IP()/UDP(dport=12,sport=1025)/py
+ pkt_str = str(p);
+ print ""
+ hexdump(pkt_str);
+ scapy_pkt = Ether(pkt_str);
+ scapy_pkt.show2();
+
+ def tearDown(self):
+ pass
+
+
+class CTRexPktBuilderScapy_Test(pkt_bld_general_test.CGeneralPktBld_Test):
+
+ def setUp(self):
+ pass;
+ #self.pkt_bld = CTRexPktBuilder()
+ #self.pkt_bld.add_pkt_layer("l2", dpkt.ethernet.Ethernet())
+ #self.pp = pprint.PrettyPrinter(indent=4)
+
+ def tearDown(self):
+ pass
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/regression/functional_tests/stl_basic_tests.py b/scripts/automation/regression/functional_tests/stl_basic_tests.py
new file mode 100644
index 00000000..ea515401
--- /dev/null
+++ b/scripts/automation/regression/functional_tests/stl_basic_tests.py
@@ -0,0 +1,263 @@
+
+import outer_packages
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import nottest
+from nose.plugins.attrib import attr
+from trex import CTRexScenario
+from dpkt import pcap
+from trex_stl_lib import trex_stl_sim
+from trex_stl_lib.trex_stl_streams import STLProfile
+import sys
+import os
+import subprocess
+import shlex
+from threading import Thread
+
+@attr('run_on_trex')
+class CStlBasic_Test(functional_general_test.CGeneralFunctional_Test):
+ def setUp (self):
+ self.test_path = os.path.abspath(os.getcwd())
+ self.scripts_path = CTRexScenario.scripts_path
+
+ self.verify_exists(os.path.join(self.scripts_path, "bp-sim-64-debug"))
+
+ self.stl_sim = os.path.join(self.scripts_path, "stl-sim")
+
+ self.verify_exists(self.stl_sim)
+
+ self.profiles_path = os.path.join(self.scripts_path, "stl/yaml/")
+
+ self.profiles = {}
+ self.profiles['imix_3pkt'] = os.path.join(self.profiles_path, "imix_3pkt.yaml")
+ self.profiles['imix_3pkt_vm'] = os.path.join(self.profiles_path, "imix_3pkt_vm.yaml")
+ self.profiles['random_size_9k'] = os.path.join(self.profiles_path, "../udp_rand_len_9k.py")
+ self.profiles['imix_tuple_gen'] = os.path.join(self.profiles_path, "imix_1pkt_tuple_gen.yaml")
+
+ for k, v in self.profiles.iteritems():
+ self.verify_exists(v)
+
+ self.valgrind_profiles = [ self.profiles['imix_3pkt_vm'],
+ self.profiles['random_size_9k'],
+ self.profiles['imix_tuple_gen'] ]
+
+ self.golden_path = os.path.join(self.test_path,"stl/golden/")
+
+ os.chdir(self.scripts_path)
+
+
+ def tearDown (self):
+ os.chdir(self.test_path)
+
+
+
+ def get_golden (self, name):
+ golden = os.path.join(self.golden_path, name)
+ self.verify_exists(golden)
+ return golden
+
+
+ def verify_exists (self, name):
+ if not os.path.exists(name):
+ raise Exception("cannot find '{0}'".format(name))
+
+
+ def compare_caps (self, cap1, cap2, max_diff_sec = 0.01):
+ with open(cap1, 'r') as f1:
+ reader1 = pcap.Reader(f1)
+ pkts1 = reader1.readpkts()
+
+ with open(cap2, 'r') as f2:
+ reader2 = pcap.Reader(f2)
+ pkts2 = reader2.readpkts()
+
+ assert_equal(len(pkts1), len(pkts2))
+
+ for pkt1, pkt2, i in zip(pkts1, pkts2, xrange(1, len(pkts1))):
+ ts1 = pkt1[0]
+ ts2 = pkt2[0]
+            if abs(ts1-ts2) > 0.000005: # 5 usec
+ raise AssertionError("TS error: cap files '{0}', '{1}' differ in cap #{2} - '{3}' vs. '{4}'".format(cap1, cap2, i, ts1, ts2))
+
+ if pkt1[1] != pkt2[1]:
+ raise AssertionError("RAW error: cap files '{0}', '{1}' differ in cap #{2}".format(cap1, cap2, i))
+
+
+
+ def run_sim (self, yaml, output, options = "", silent = False, obj = None):
+ if output:
+ user_cmd = "-f {0} -o {1} {2}".format(yaml, output, options)
+ else:
+ user_cmd = "-f {0} {1}".format(yaml, options)
+
+ if silent:
+ user_cmd += " --silent"
+
+ rc = trex_stl_sim.main(args = shlex.split(user_cmd))
+ if obj:
+ obj['rc'] = (rc == 0)
+
+ return (rc == 0)
+
+
+
+ def run_py_profile_path (self, profile, options,silent = False, do_no_remove=False,compare =True, test_generated=True, do_no_remove_generated = False):
+ output_cap = "a.pcap"
+ input_file = os.path.join('stl/', profile)
+ golden_file = os.path.join('exp',os.path.basename(profile).split('.')[0]+'.pcap');
+ if os.path.exists(output_cap):
+ os.unlink(output_cap)
+ try:
+ rc = self.run_sim(input_file, output_cap, options, silent)
+ assert_equal(rc, True)
+ #s='cp '+output_cap+' '+golden_file;
+ #print s
+ #os.system(s)
+
+ if compare:
+ self.compare_caps(output_cap, golden_file)
+ finally:
+ if not do_no_remove:
+ os.unlink(output_cap)
+ if test_generated:
+ try:
+ generated_filename = input_file.replace('.py', '_GENERATED.py').replace('.yaml', '_GENERATED.py')
+ if input_file.endswith('.py'):
+ profile = STLProfile.load_py(input_file)
+ elif input_file.endswith('.yaml'):
+ profile = STLProfile.load_yaml(input_file)
+ profile.dump_to_code(generated_filename)
+
+ rc = self.run_sim(generated_filename, output_cap, options, silent)
+ assert_equal(rc, True)
+
+ if compare:
+ self.compare_caps(output_cap, golden_file)
+ except Exception as e:
+ print e
+ finally:
+ if not do_no_remove_generated:
+ os.unlink(generated_filename)
+ os.unlink(generated_filename + 'c')
+ if not do_no_remove:
+ os.unlink(output_cap)
+
+
+ def test_stl_profiles (self):
+
+ p = [
+ ["udp_1pkt_1mac_override.py","-m 1 -l 50",True],
+ ["syn_attack.py","-m 1 -l 50",True], # can't compare random now
+ ["udp_1pkt_1mac.py","-m 1 -l 50",True],
+ ["udp_1pkt_mac.py","-m 1 -l 50",True],
+ ["udp_1pkt.py","-m 1 -l 50",True],
+ ["udp_1pkt_tuple_gen.py","-m 1 -l 50",True],
+ ["udp_rand_len_9k.py","-m 1 -l 50",True], # can't do the compare
+ ["udp_1pkt_mpls.py","-m 1 -l 50",True],
+ ["udp_1pkt_mpls_vm.py","-m 1 ",True],
+ ["imix.py","-m 1 -l 100",True],
+ ["udp_inc_len_9k.py","-m 1 -l 100",True],
+ ["udp_1pkt_range_clients.py","-m 1 -l 100",True],
+ ["multi_burst_2st_1000pkt.py","-m 1 -l 100",True],
+ ["pcap.py", "-m 1", True],
+ ["pcap_with_vm.py", "-m 1", True],
+
+ # YAML test
+ ["yaml/burst_1000_pkt.yaml","-m 1 -l 100",True],
+ ["yaml/burst_1pkt_1burst.yaml","-m 1 -l 100",True],
+ ["yaml/burst_1pkt_vm.yaml","-m 1 -l 100",True],
+ ["yaml/imix_1pkt.yaml","-m 1 -l 100",True],
+ ["yaml/imix_1pkt_2.yaml","-m 1 -l 100",True],
+ ["yaml/imix_1pkt_tuple_gen.yaml","-m 1 -l 100",True],
+ ["yaml/imix_1pkt_vm.yaml","-m 1 -l 100",True],
+ ["udp_1pkt_pcap.py","-m 1 -l 10",True],
+ ["udp_3pkt_pcap.py","-m 1 -l 10",True],
+ #["udp_1pkt_simple.py","-m 1 -l 3",True],
+ ["udp_1pkt_pcap_relative_path.py","-m 1 -l 3",True],
+ ["udp_1pkt_tuple_gen_split.py","-m 1 -c 2 -l 100",True],
+ ["udp_1pkt_range_clients_split.py","-m 1 -c 2 -l 100",True],
+ ["udp_1pkt_vxlan.py","-m 1 -c 1 -l 17",True, False], # can't generate: no VXLAN in Scapy, only in profile
+ ["udp_1pkt_ipv6_in_ipv4.py","-m 1 -c 1 -l 17",True],
+ ["yaml/imix_3pkt.yaml","-m 50kpps --limit 20 --cores 2",True],
+ ["yaml/imix_3pkt_vm.yaml","-m 50kpps --limit 20 --cores 2",True],
+ ["udp_1pkt_simple_mac_dst.py","-m 1 -l 1 ",True],
+ ["udp_1pkt_simple_mac_src.py","-m 1 -l 1 ",True],
+ ["udp_1pkt_simple_mac_dst_src.py","-m 1 -l 1 ",True],
+ ["burst_3st_loop_x_times.py","-m 1 -l 20 ",True],
+ ["udp_1pkt_mac_step.py","-m 1 -l 20 ",True],
+ ["udp_1pkt_mac_mask1.py","-m 1 -l 20 ",True] ,
+ ["udp_1pkt_mac_mask2.py","-m 1 -l 20 ",True],
+ ["udp_1pkt_mac_mask3.py","-m 1 -l 20 ",True],
+ ["udp_1pkt_simple_test2.py","-m 1 -l 10 ",True], # test split of packet with ip option
+ ["udp_1pkt_simple_test.py","-m 1 -l 10 ",True],
+ ["udp_1pkt_mac_mask5.py","-m 1 -l 30 ",True],
+ ["udp_1pkt_range_clients_split_garp.py","-m 1 -l 50",True]
+
+
+ ];
+
+
+ p1 = [ ["udp_1pkt_range_clients_split_garp.py","-m 1 -l 50",True] ]
+
+
+ for obj in p:
+ try:
+ test_generated = obj[3]
+ except: # check generated if not said otherwise
+ test_generated = True
+ self.run_py_profile_path (obj[0],obj[1],compare =obj[2], test_generated = test_generated, do_no_remove=True, do_no_remove_generated = False)
+
+
+ def test_hlt_profiles (self):
+ p = (
+ ['hlt/hlt_udp_inc_dec_len_9k.py', '-m 1 -l 20', True],
+ ['hlt/hlt_imix_default.py', '-m 1 -l 20', True],
+ ['hlt/hlt_imix_4rates.py', '-m 1 -l 20', True],
+ ['hlt/hlt_david1.py', '-m 1 -l 20', True],
+ ['hlt/hlt_david2.py', '-m 1 -l 20', True],
+ ['hlt/hlt_david3.py', '-m 1 -l 20', True],
+ ['hlt/hlt_david4.py', '-m 1 -l 20', True],
+ ['hlt/hlt_wentong1.py', '-m 1 -l 20', True],
+ ['hlt/hlt_wentong2.py', '-m 1 -l 20', True],
+ ['hlt/hlt_tcp_ranges.py', '-m 1 -l 20', True],
+ ['hlt/hlt_udp_ports.py', '-m 1 -l 20', True],
+ ['hlt/hlt_udp_random_ports.py', '-m 1 -l 20', True],
+ ['hlt/hlt_ip_ranges.py', '-m 1 -l 20', True],
+ ['hlt/hlt_framesize_vm.py', '-m 1 -l 20', True],
+ ['hlt/hlt_l3_length_vm.py', '-m 1 -l 20', True],
+ ['hlt/hlt_vlan_default.py', '-m 1 -l 20', True],
+ ['hlt/hlt_4vlans.py', '-m 1 -l 20', True],
+ ['hlt/hlt_vlans_vm.py', '-m 1 -l 20', True],
+ ['hlt/hlt_ipv6_default.py', '-m 1 -l 20', True],
+ ['hlt/hlt_ipv6_ranges.py', '-m 1 -l 20', True],
+ ['hlt/hlt_mac_ranges.py', '-m 1 -l 20', True],
+ )
+
+ for obj in p:
+ self.run_py_profile_path (obj[0], obj[1], compare =obj[2], do_no_remove=True, do_no_remove_generated = False)
+
+ # valgrind tests - this runs in multi thread as it safe (no output)
+ def test_valgrind_various_profiles (self):
+
+ print "\n"
+ threads = []
+ for profile in self.valgrind_profiles:
+ print "\n*** VALGRIND: testing profile '{0}' ***\n".format(profile)
+ obj = {'t': None, 'rc': None}
+ t = Thread(target = self.run_sim,
+ kwargs = {'obj': obj, 'yaml': profile, 'output':None, 'options': "--cores 8 --limit 20 --valgrind", 'silent': True})
+ obj['t'] = t
+
+ threads.append(obj)
+ t.start()
+
+ for obj in threads:
+ obj['t'].join()
+
+ for obj in threads:
+ assert_equal(obj['rc'], True)
+
+
+
diff --git a/scripts/automation/regression/stateful_tests/tests_exceptions.py b/scripts/automation/regression/stateful_tests/tests_exceptions.py
new file mode 100755
index 00000000..604efcc8
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/tests_exceptions.py
@@ -0,0 +1,37 @@
+#!/router/bin/python
+
+class TRexInUseError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class TRexRunFailedError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class TRexIncompleteRunError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class TRexLowCpuUtilError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class AbnormalResultError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class ClassificationMissmatchError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
diff --git a/scripts/automation/regression/stateful_tests/trex_general_test.py b/scripts/automation/regression/stateful_tests/trex_general_test.py
new file mode 100755
index 00000000..21f5d8aa
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/trex_general_test.py
@@ -0,0 +1,319 @@
+#!/router/bin/python
+
+__copyright__ = "Copyright 2014"
+
+"""
+Name:
+ trex_general_test.py
+
+
+Description:
+
+ This script creates the functionality to test the performance of the T-Rex traffic generator
+ The tested scenario is a T-Rex TG directly connected to a Cisco router.
+
+::
+
+ Topology:
+
+ ------- --------
+ | | Tx---1gig/10gig----Rx | |
+ | T-Rex | | router |
+ | | Rx---1gig/10gig----Tx | |
+ ------- --------
+
+"""
+from nose.plugins import Plugin
+from nose.plugins.skip import SkipTest
+import trex
+from trex import CTRexScenario
+import misc_methods
+import sys
+import os
+# from CPlatformUnderTest import *
+from CPlatform import *
+import termstyle
+import threading
+from tests_exceptions import *
+from platform_cmd_link import *
+import unittest
+
+def setUpModule(module):
+ pass
+
+def tearDownModule(module):
+ pass
+
+class CTRexGeneral_Test(unittest.TestCase):
+ """This class defines the general stateful testcase of the T-Rex traffic generator"""
+ def __init__ (self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ if CTRexScenario.is_test_list:
+ return
+ # Point test object to scenario global object
+ self.configuration = CTRexScenario.configuration
+ self.benchmark = CTRexScenario.benchmark
+ self.trex = CTRexScenario.trex
+ self.trex_crashed = CTRexScenario.trex_crashed
+ self.modes = CTRexScenario.modes
+ self.skipping = False
+ self.fail_reasons = []
+ if not hasattr(self, 'unsupported_modes'):
+ self.unsupported_modes = []
+ self.is_loopback = True if 'loopback' in self.modes else False
+ self.is_virt_nics = True if 'virt_nics' in self.modes else False
+ self.is_VM = True if 'VM' in self.modes else False
+
+ if not CTRexScenario.is_init:
+ if self.trex: # stateful
+ CTRexScenario.trex_version = self.trex.get_trex_version()
+ if not self.is_loopback:
+ # initilize the scenario based on received configuration, once per entire testing session
+ CTRexScenario.router = CPlatform(CTRexScenario.router_cfg['silent_mode'])
+ device_cfg = CDeviceCfg()
+ device_cfg.set_platform_config(CTRexScenario.router_cfg['config_dict'])
+ device_cfg.set_tftp_config(CTRexScenario.router_cfg['tftp_config_dict'])
+ CTRexScenario.router.load_platform_data_from_file(device_cfg)
+ CTRexScenario.router.launch_connection(device_cfg)
+ running_image = CTRexScenario.router.get_running_image_details()['image']
+ print 'Current router image: %s' % running_image
+ if CTRexScenario.router_cfg['forceImageReload']:
+ needed_image = device_cfg.get_image_name()
+ if not CTRexScenario.router.is_image_matches(needed_image):
+ print 'Setting router image: %s' % needed_image
+ CTRexScenario.router.config_tftp_server(device_cfg)
+ CTRexScenario.router.load_platform_image(needed_image)
+ CTRexScenario.router.set_boot_image(needed_image)
+ CTRexScenario.router.reload_platform(device_cfg)
+ CTRexScenario.router.launch_connection(device_cfg)
+ running_image = CTRexScenario.router.get_running_image_details()['image'] # verify image
+ if not CTRexScenario.router.is_image_matches(needed_image):
+ self.fail('Unable to set router image: %s, current image is: %s' % (needed_image, running_image))
+ else:
+ print 'Matches needed image: %s' % needed_image
+ CTRexScenario.router_image = running_image
+
+ if self.modes:
+ print termstyle.green('\t!!!\tRunning with modes: %s, not suitable tests will be skipped.\t!!!' % list(self.modes))
+
+ CTRexScenario.is_init = True
+ print termstyle.green("Done instantiating T-Rex scenario!\n")
+
+# raise RuntimeError('CTRexScenario class is not initialized!')
+ self.router = CTRexScenario.router
+
+
+
+# def assert_dict_eq (self, dict, key, val, error=''):
+# v1 = int(dict[key]))
+# self.assertEqual(v1, int(val), error)
+#
+# def assert_dict_gt (self, d, key, val, error=''):
+# v1 = int(dict[key])
+# self.assert_gt(v1, int(val), error)
+
+ def assertEqual(self, v1, v2, s):
+ if v1 != v2:
+ error='ERROR '+str(v1)+' != '+str(v2)+ ' '+s;
+ self.fail(error)
+
+ def assert_gt(self, v1, v2, s):
+ if not v1 > v2:
+ error='ERROR {big} < {small} {str}'.format(big = v1, small = v2, str = s)
+ self.fail(error)
+
+ def check_results_eq (self,res,name,val):
+ if res is None:
+ self.fail('TRex results cannot be None !')
+ return
+
+ if name not in res:
+ self.fail('TRex results does not include key %s' % name)
+ return
+
+ if res[name] != float(val):
+ self.fail('TRex results[%s]==%f and not as expected %f ' % (name, res[name], val))
+
+ def check_CPU_benchmark (self, trex_res, err = 10, minimal_cpu = 30, maximal_cpu = 85):
+ #cpu_util = float(trex_res.get_last_value("trex-global.data.m_cpu_util"))
+ cpu_util = sum([float(x) for x in trex_res.get_value_list("trex-global.data.m_cpu_util")[-4:-1]]) / 3 # mean of 3 values before last
+
+ if not self.is_virt_nics:
+ if cpu_util > maximal_cpu:
+ self.fail("CPU is too high (%s%%), probably queue full." % cpu_util )
+ if cpu_util < minimal_cpu:
+ self.fail("CPU is too low (%s%%), can't verify performance in such low CPU%%." % cpu_util )
+
+ cores = self.get_benchmark_param('cores')
+ trex_tx_bps = trex_res.get_last_value("trex-global.data.m_total_tx_bytes")
+ test_norm_cpu = 100.0*(trex_tx_bps/(cores*cpu_util))/1e6
+
+ print "TRex CPU utilization: %g%%, norm_cpu is : %d Mb/core" % (round(cpu_util), int(test_norm_cpu))
+
+ #expected_norm_cpu = self.get_benchmark_param('cpu_to_core_ratio')
+
+ #calc_error_precent = abs(100.0*(test_norm_cpu/expected_norm_cpu)-100.0)
+
+# if calc_error_precent > err:
+# msg ='Normalized bandwidth to CPU utilization ratio is %2.0f Mb/core expected %2.0f Mb/core more than %2.0f %% - ERROR' % (test_norm_cpu, expected_norm_cpu, err)
+# raise AbnormalResultError(msg)
+# else:
+# msg ='Normalized bandwidth to CPU utilization ratio is %2.0f Mb/core expected %2.0f Mb/core less than %2.0f %% - OK' % (test_norm_cpu, expected_norm_cpu, err)
+# print msg
+
+
+ def check_results_gt (self, res, name, val):
+ if res is None:
+ self.fail('TRex results canot be None !')
+ return
+
+ if name not in res:
+ self.fail('TRex results does not include key %s' % name)
+ return
+
+ if res[name]< float(val):
+ self.fail('TRex results[%s]<%f and not as expected greater than %f ' % (name, res[name], val))
+
+ def check_for_trex_crash(self):
+ pass
+
+ def get_benchmark_param (self, param, sub_param = None, test_name = None):
+ if not test_name:
+ test_name = self.get_name()
+ if test_name not in self.benchmark:
+ self.skip('No data in benchmark.yaml for test: %s, param: %s. Skipping.' % (test_name, param))
+ if sub_param:
+ return self.benchmark[test_name][param].get(sub_param)
+ else:
+ return self.benchmark[test_name].get(param)
+
+ def check_general_scenario_results (self, trex_res, check_latency = True):
+
+ try:
+ # check if test is valid
+ if not trex_res.is_done_warmup():
+ self.fail('T-Rex did not reach warm-up situtaion. Results are not valid.')
+
+ # check history size is enough
+ if len(trex_res._history) < 5:
+ self.fail('T-Rex results list is too short. Increase the test duration or check unexpected stopping.')
+
+ # check T-Rex number of drops
+ trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_pkts")
+ trex_drops = trex_res.get_total_drops()
+ trex_drop_rate = trex_res.get_drop_rate()
+ if ( trex_drops > 0.001 * trex_tx_pckt) and (trex_drop_rate > 0.0): # deliberately mask kickoff drops when T-Rex first initiated
+ self.fail('Number of packet drops larger than 0.1% of all traffic')
+
+ # check queue full, queue drop, allocation error
+ m_total_alloc_error = trex_res.get_last_value("trex-global.data.m_total_alloc_error")
+ m_total_queue_full = trex_res.get_last_value("trex-global.data.m_total_queue_full")
+ m_total_queue_drop = trex_res.get_last_value("trex-global.data.m_total_queue_drop")
+ self.assert_gt(1000, m_total_alloc_error, 'Got allocation errors. (%s), please review multiplier and templates configuration.' % m_total_alloc_error)
+ self.assert_gt(1000, m_total_queue_drop, 'Too much queue_drop (%s), please review multiplier.' % m_total_queue_drop)
+
+ if self.is_VM:
+ allowed_queue_full = 10000 + trex_tx_pckt / 100
+ else:
+ allowed_queue_full = 1000 + trex_tx_pckt / 1000
+ self.assert_gt(allowed_queue_full, m_total_queue_full, 'Too much queue_full (%s), please review multiplier.' % m_total_queue_full)
+
+ # # check T-Rex expected counters
+ #trex_exp_rate = trex_res.get_expected_tx_rate().get('m_tx_expected_bps')
+ #assert trex_exp_rate is not None
+ #trex_exp_gbps = trex_exp_rate/(10**9)
+
+ if check_latency:
+ # check that max latency does not exceed 1 msec in regular setup or 100ms in VM
+ allowed_latency = 9999999 if self.is_VM else 1000
+ if max(trex_res.get_max_latency().values()) > allowed_latency:
+ self.fail('LatencyError: Maximal latency exceeds %s (usec)' % allowed_latency)
+
+ # check that avg latency does not exceed 1 msec in regular setup or 3ms in VM
+ allowed_latency = 9999999 if self.is_VM else 1000
+ if max(trex_res.get_avg_latency().values()) > allowed_latency:
+ self.fail('LatencyError: Average latency exceeds %s (usec)' % allowed_latency)
+
+ if not self.is_loopback:
+ # check router number of drops --> deliberately masked- need to be figured out!!!!!
+ pkt_drop_stats = self.router.get_drop_stats()
+# assert pkt_drop_stats['total_drops'] < 20
+
+ # check for trex-router packet consistency
+ # TODO: check if it's ok
+ print 'router drop stats: %s' % pkt_drop_stats
+ print 'TRex drop stats: %s' % trex_drops
+ #self.assertEqual(pkt_drop_stats, trex_drops, "TRex's and router's drop stats don't match.")
+
+ except KeyError as e:
+ self.fail(e)
+ #assert False
+
+ # except AssertionError as e:
+ # e.args += ('T-Rex has crashed!')
+ # raise
+
+ # We encountered error, don't fail the test immediately
+ def fail(self, reason = 'Unknown error'):
+ print 'Error: %s' % reason
+ self.fail_reasons.append(reason)
+
+ # skip running of the test, counts as 'passed' but prints 'skipped'
+ def skip(self, message = 'Unknown reason'):
+ print 'Skip: %s' % message
+ self.skipping = True
+ raise SkipTest(message)
+
+ # get name of currently running test
+ def get_name(self):
+ return self._testMethodName
+
+ def setUp(self):
+ test_setup_modes_conflict = self.modes & set(self.unsupported_modes)
+ if test_setup_modes_conflict:
+ self.skip("The test can't run with following modes of given setup: %s " % test_setup_modes_conflict)
+ if self.trex and not self.trex.is_idle():
+ print 'Warning: TRex is not idle at setUp, trying to stop it.'
+ self.trex.force_kill(confirm = False)
+ if not self.is_loopback:
+ print ''
+ if self.trex: # stateful
+ self.router.load_clean_config()
+ self.router.clear_counters()
+ self.router.clear_packet_drop_stats()
+
+ ########################################################################
+ #### DO NOT ADD TESTS TO THIS FILE ####
+ #### Added tests here will held once for EVERY test sub-class ####
+ ########################################################################
+
+ # masked example to such test. uncomment to watch how it affects #
+# def test_isInitialized(self):
+# assert CTRexScenario.is_init == True
+ def tearDown(self):
+ if not self.trex:
+ return
+ if not self.trex.is_idle():
+ print 'Warning: TRex is not idle at tearDown, trying to stop it.'
+ self.trex.force_kill(confirm = False)
+ if not self.skipping:
+ # print server logs of test run
+ if CTRexScenario.server_logs:
+ try:
+ print termstyle.green('\n>>>>>>>>>>>>>>> Daemon log <<<<<<<<<<<<<<<')
+ daemon_log = self.trex.get_trex_daemon_log()
+ log_size = len(daemon_log)
+ print ''.join(daemon_log[CTRexScenario.daemon_log_lines:])
+ CTRexScenario.daemon_log_lines = log_size
+ except Exception as e:
+ print "Can't get TRex daemon log:", e
+ try:
+ print termstyle.green('>>>>>>>>>>>>>>>> Trex log <<<<<<<<<<<<<<<<')
+ print ''.join(self.trex.get_trex_log())
+ except Exception as e:
+ print "Can't get TRex log:", e
+ if len(self.fail_reasons):
+ raise Exception('The test is failed, reasons:\n%s' % '\n'.join(self.fail_reasons))
+
+ def check_for_trex_crash(self):
+ pass
diff --git a/scripts/automation/regression/stateful_tests/trex_imix_test.py b/scripts/automation/regression/stateful_tests/trex_imix_test.py
new file mode 100755
index 00000000..43dea900
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/trex_imix_test.py
@@ -0,0 +1,202 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from CPlatform import CStaticRouteConfig
+from tests_exceptions import *
+#import sys
+import time
+
+class CTRexIMIX_Test(CTRexGeneral_Test):
+ """This class defines the IMIX testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ # super(CTRexIMIX_Test, self).__init__()
+ CTRexGeneral_Test.__init__(self, *args, **kwargs)
+ pass
+
+ def setUp(self):
+ super(CTRexIMIX_Test, self).setUp() # launch super test class setUp process
+ # CTRexGeneral_Test.setUp(self) # launch super test class setUp process
+ # self.router.clear_counters()
+ pass
+
+ def test_routing_imix_64(self):
+ # test initializtion
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+# self.trex.set_yaml_file('cap2/imix_64.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+# trex_res = self.trex.run(multiplier = mult, cores = core, duration = 30, l = 1000, p = True)
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 30,
+ f = 'cap2/imix_64.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+
+ # the name intentionally not matches nose default pattern, including the test should be specified explicitly
+ def dummy(self):
+ self.assertEqual(1, 2, 'boo')
+ self.assertEqual(2, 2, 'boo')
+ self.assertEqual(2, 3, 'boo')
+ #print ''
+ #print dir(self)
+ #print locals()
+ #print ''
+ #print_r(unittest.TestCase)
+ #print ''
+ #print_r(self)
+ print ''
+ #print unittest.TestCase.shortDescription(self)
+ #self.skip("I'm just a dummy test")
+
+
+ def test_routing_imix (self):
+ # test initializtion
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+# self.trex.set_yaml_file('cap2/imix_fast_1g.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 60,
+ f = 'cap2/imix_fast_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res)
+
+
+ def test_static_routing_imix (self):
+ if self.is_loopback:
+ self.skip('In loopback mode the test is same as test_routing_imix')
+ # test initializtion
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+
+ # Configure static routing based on benchmark data input
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 60,
+ f = 'cap2/imix_fast_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+
+
+ def test_static_routing_imix_asymmetric (self):
+ # test initializtion
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+
+ # Configure static routing based on benchmark data input
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ nc = True,
+ d = 100,
+ f = 'cap2/imix_fast_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResults instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res)
+
+
+ def test_jumbo(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces(mtu = 9216)
+ self.router.config_pbr(mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 100,
+ f = 'cap2/imix_9k.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResults instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, minimal_cpu = 0, maximal_cpu = 10)
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ # remove nbar config here
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/stateful_tests/trex_ipv6_test.py b/scripts/automation/regression/stateful_tests/trex_ipv6_test.py
new file mode 100755
index 00000000..bffb4754
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/trex_ipv6_test.py
@@ -0,0 +1,102 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from tests_exceptions import *
+import time
+from nose.tools import assert_equal
+
+class CTRexIPv6_Test(CTRexGeneral_Test):
+ """This class defines the IPv6 testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexIPv6_Test, self).__init__(*args, **kwargs)
+ pass
+
+ def setUp(self):
+ super(CTRexIPv6_Test, self).setUp() # launch super test class setUp process
+# print " before sleep setup !!"
+# time.sleep(100000);
+# pass
+
+ def test_ipv6_simple(self):
+ if self.is_virt_nics:
+ self.skip('--ipv6 flag does not work correctly in with virtual NICs') # TODO: fix
+ # test initializtion
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+
+ self.router.config_pbr(mode = "config")
+ self.router.config_ipv6_pbr(mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ ipv6 = True,
+ d = 60,
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark (trex_res, 10.0)
+
+ assert True
+
+
+ def test_ipv6_negative (self):
+ if self.is_loopback:
+ self.skip('The test checks ipv6 drops by device and we are in loopback setup')
+ # test initializtion
+ self.router.configure_basic_interfaces()
+
+ # NOT CONFIGURING IPv6 INTENTIONALLY TO GET DROPS!
+ self.router.config_pbr(mode = "config")
+
+ # same params as test_ipv6_simple
+ mult = self.get_benchmark_param('multiplier', test_name = 'test_ipv6_simple')
+ core = self.get_benchmark_param('cores', test_name = 'test_ipv6_simple')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ ipv6 = True,
+ d = 60,
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance- and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ trex_tx_pckt = float(trex_res.get_last_value("trex-global.data.m_total_tx_pkts"))
+ trex_drops = int(trex_res.get_total_drops())
+
+ trex_drop_rate = trex_res.get_drop_rate()
+
+ # make sure that at least 50% of the total transmitted packets failed
+ self.assert_gt((trex_drops/trex_tx_pckt), 0.5, 'packet drop ratio is not high enough')
+
+
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ # remove config here
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/stateful_tests/trex_nat_test.py b/scripts/automation/regression/stateful_tests/trex_nat_test.py
new file mode 100755
index 00000000..e7fe5ca5
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/trex_nat_test.py
@@ -0,0 +1,169 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from tests_exceptions import *
+import time
+from CPlatform import CStaticRouteConfig, CNatConfig
+from nose.tools import assert_equal
+
+
+class CTRexNoNat_Test(CTRexGeneral_Test):#(unittest.TestCase):
+ """This class defines the NAT testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexNoNat_Test, self).__init__(*args, **kwargs)
+ self.unsupported_modes = ['loopback'] # NAT requires device
+ pass
+
+ def setUp(self):
+ super(CTRexNoNat_Test, self).setUp() # launch super test class setUp process
+ pass
+
+ def check_nat_stats (self, nat_stats):
+ pass
+
+
+ def test_nat_learning(self):
+ # test initializtion
+ self.router.configure_basic_interfaces()
+
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ self.router.config_nat_verify() # shutdown duplicate interfaces
+
+# self.trex.set_yaml_file('cap2/http_simple.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+# trex_res = self.trex.run(multiplier = mult, cores = core, duration = 100, l = 1000, learn_verify = True)
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ learn_verify = True,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+
+ expected_nat_opened = self.get_benchmark_param('nat_opened')
+ learning_stats = trex_res.get_last_value("trex-global.data", ".*nat.*") # extract all nat data
+
+ if self.get_benchmark_param('allow_timeout_dev'):
+ nat_timeout_ratio = learning_stats['m_total_nat_time_out']/learning_stats['m_total_nat_open']
+ if nat_timeout_ratio > 0.005:
+ self.fail('TRex nat_timeout ratio %f > 0.005 (0.5%) and not as expected to be less than 0.5%' %(nat_timeout_ratio))
+ else:
+ self.check_results_eq (learning_stats, 'm_total_nat_time_out', 0.0)
+ self.check_results_eq (learning_stats, 'm_total_nat_no_fid', 0.0)
+ self.check_results_gt (learning_stats, 'm_total_nat_learn_error', 0.0)
+#
+ self.check_results_gt (learning_stats, 'm_total_nat_open', expected_nat_opened)
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, minimal_cpu = 10, maximal_cpu = 85)
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+
+
+class CTRexNat_Test(CTRexGeneral_Test):#(unittest.TestCase):
+ """This class defines the NAT testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexNat_Test, self).__init__(*args, **kwargs)
+ self.unsupported_modes = ['loopback'] # NAT requires device
+ pass
+
+ def setUp(self):
+ super(CTRexNat_Test, self).setUp() # launch super test class setUp process
+ # config nat here
+
+
+ def check_nat_stats (self, nat_stats):
+ pass
+
+
+ def test_nat_simple_mode1(self):
+ self.nat_simple_helper(learn_mode=1)
+
+ def test_nat_simple_mode2(self):
+ self.nat_simple_helper(learn_mode=2)
+
+ def nat_simple_helper(self, learn_mode=1):
+ # test initializtion
+ self.router.configure_basic_interfaces()
+
+
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ nat_dict = self.get_benchmark_param('nat_dict')
+ nat_obj = CNatConfig(nat_dict)
+ self.router.config_nat(nat_obj)
+
+# self.trex.set_yaml_file('cap2/http_simple.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+# trex_res = self.trex.run(nc=False,multiplier = mult, cores = core, duration = 100, l = 1000, learn = True)
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ learn_mode = learn_mode,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+ trex_nat_stats = trex_res.get_last_value("trex-global.data", ".*nat.*") # extract all nat data
+ if self.get_benchmark_param('allow_timeout_dev'):
+ nat_timeout_ratio = trex_nat_stats['m_total_nat_time_out']/trex_nat_stats['m_total_nat_open']
+ if nat_timeout_ratio > 0.005:
+ self.fail('TRex nat_timeout ratio %f > 0.5%%' % nat_timeout_ratio)
+ else:
+ self.check_results_eq (trex_nat_stats,'m_total_nat_time_out', 0.0)
+ self.check_results_eq (trex_nat_stats,'m_total_nat_no_fid', 0.0)
+ self.check_results_gt (trex_nat_stats,'m_total_nat_open', 6000)
+
+
+ self.check_general_scenario_results(trex_res, check_latency = False) # NAT can cause latency
+## test_norm_cpu = 2*(trex_res.result['total-tx']/(core*trex_res.result['cpu_utilization']))
+# trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_bps")
+# cpu_util = int(trex_res.get_last_value("trex-global.data.m_cpu_util"))
+# test_norm_cpu = 2*(trex_tx_pckt/(core*cpu_util))
+# print "test_norm_cpu is: ", test_norm_cpu
+
+ self.check_CPU_benchmark(trex_res, minimal_cpu = 10, maximal_cpu = 85)
+
+ #if ( abs((test_norm_cpu/self.get_benchmark_param('cpu_to_core_ratio')) - 1) > 0.03):
+ # raiseraise AbnormalResultError('Normalized bandwidth to CPU utilization ratio exceeds 3%')
+
+ nat_stats = self.router.get_nat_stats()
+ print nat_stats
+
+ self.assert_gt(nat_stats['total_active_trans'], 5000, 'total active translations is not high enough')
+ self.assert_gt(nat_stats['dynamic_active_trans'], 5000, 'total dynamic active translations is not high enough')
+ self.assertEqual(nat_stats['static_active_trans'], 0, "NAT statistics nat_stats['static_active_trans'] should be zero")
+ self.assert_gt(nat_stats['num_of_hits'], 50000, 'total nat hits is not high enough')
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ self.router.clear_nat_translations()
+
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/stateful_tests/trex_nbar_test.py b/scripts/automation/regression/stateful_tests/trex_nbar_test.py
new file mode 100755
index 00000000..74d0227b
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/trex_nbar_test.py
@@ -0,0 +1,193 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from tests_exceptions import *
+from interfaces_e import IFType
+from nose.tools import nottest
+from misc_methods import print_r
+
+class CTRexNbar_Test(CTRexGeneral_Test):
+ """This class defines the NBAR testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexNbar_Test, self).__init__(*args, **kwargs)
+ self.unsupported_modes = ['loopback'] # obviously no NBar in loopback
+ pass
+
+ def setUp(self):
+ super(CTRexNbar_Test, self).setUp() # launch super test class setUp process
+# self.router.kill_nbar_flows()
+ self.router.clear_cft_counters()
+ self.router.clear_nbar_stats()
+
+ def match_classification (self):
+ nbar_benchmark = self.get_benchmark_param("nbar_classification")
+ test_classification = self.router.get_nbar_stats()
+ print "TEST CLASSIFICATION:"
+ print test_classification
+ missmatchFlag = False
+ missmatchMsg = "NBAR classification contians a missmatch on the following protocols:"
+ fmt = '\n\t{0:15} | Expected: {1:>3.2f}%, Got: {2:>3.2f}%'
+ noise_level = 0.045 # percents
+
+ for cl_intf in self.router.get_if_manager().get_if_list(if_type = IFType.Client):
+ client_intf = cl_intf.get_name()
+
+ # removing noise classifications
+ for key, value in test_classification[client_intf]['percentage'].items():
+ if value <= noise_level:
+ print 'Removing noise classification: %s' % key
+ del test_classification[client_intf]['percentage'][key]
+
+ if len(test_classification[client_intf]['percentage']) != (len(nbar_benchmark) + 1): # adding 'total' key to nbar_benchmark
+ raise ClassificationMissmatchError ('The total size of classification result does not match the provided benchmark.')
+
+ for protocol, bench in nbar_benchmark.iteritems():
+ if protocol != 'total':
+ try:
+ bench = float(bench)
+ protocol = protocol.replace('_','-')
+ protocol_test_res = test_classification[client_intf]['percentage'][protocol]
+ deviation = 100 * abs(bench/protocol_test_res - 1) # percents
+ difference = abs(bench - protocol_test_res)
+ if (deviation > 10 and difference > noise_level): # allowing 10% deviation and 'noise_level'% difference
+ missmatchFlag = True
+ missmatchMsg += fmt.format(protocol, bench, protocol_test_res)
+ except KeyError as e:
+ missmatchFlag = True
+ print e
+ print "Changes missmatchFlag to True. ", "\n\tProtocol {0} isn't part of classification results on interface {intf}".format( protocol, intf = client_intf )
+ missmatchMsg += "\n\tProtocol {0} isn't part of classification results on interface {intf}".format( protocol, intf = client_intf )
+ except ZeroDivisionError as e:
+ print "ZeroDivisionError: %s" % protocol
+ pass
+ if missmatchFlag:
+ self.fail(missmatchMsg)
+
+
+ def test_nbar_simple(self):
+        # test initialization
+ deviation_compare_value = 0.03 # default value of deviation - 3%
+ self.router.configure_basic_interfaces()
+
+ self.router.config_pbr(mode = "config")
+ self.router.config_nbar_pd()
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 100,
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+
+ self.check_general_scenario_results(trex_res, check_latency = False) # NBAR can cause latency
+ # test_norm_cpu = 2*(trex_res.result['total-tx']/(core*trex_res.result['cpu_utilization']))
+ trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_pkts")
+ cpu_util = trex_res.get_last_value("trex-global.data.m_cpu_util")
+ cpu_util_hist = trex_res.get_value_list("trex-global.data.m_cpu_util")
+ print "cpu util is:", cpu_util
+ print cpu_util_hist
+ test_norm_cpu = 2 * trex_tx_pckt / (core * cpu_util)
+ print "test_norm_cpu is:", test_norm_cpu
+
+
+ if self.get_benchmark_param('cpu2core_custom_dev'):
+ # check this test by custom deviation
+ deviation_compare_value = self.get_benchmark_param('cpu2core_dev')
+ print "Comparing test with custom deviation value- {dev_val}%".format( dev_val = int(deviation_compare_value*100) )
+
+        # needs to be fixed!
+ #if ( abs((test_norm_cpu/self.get_benchmark_param('cpu_to_core_ratio')) - 1) > deviation_compare_value):
+ # raise AbnormalResultError('Normalized bandwidth to CPU utilization ratio exceeds benchmark boundaries')
+
+ self.match_classification()
+
+ assert True
+
+ @nottest
+ def test_rx_check (self):
+        # test initialization
+ self.router.configure_basic_interfaces()
+
+ self.router.config_pbr(mode = "config")
+ self.router.config_nbar_pd()
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'cap2/sfr.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res, 10)
+
+# if trex_res.result['rx_check_tx']==trex_res.result['rx_check_rx']: # rx_check verification shoud pass
+# assert trex_res.result['rx_check_verification'] == "OK"
+# else:
+# assert trex_res.result['rx_check_verification'] == "FAIL"
+
+    # the name intentionally does not match nose's default pattern; to run this test it must be specified explicitly
+ def NBarLong(self):
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+ self.router.config_nbar_pd()
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 18000, # 5 hours
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance and contains the summary of the test results
+ # you may see all the results keys by simply calling here for 'print trex_res.result'
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res, check_latency = False)
+
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/stateful_tests/trex_rx_test.py b/scripts/automation/regression/stateful_tests/trex_rx_test.py
new file mode 100755
index 00000000..37b1c722
--- /dev/null
+++ b/scripts/automation/regression/stateful_tests/trex_rx_test.py
@@ -0,0 +1,275 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from CPlatform import CStaticRouteConfig, CNatConfig
+from tests_exceptions import *
+#import sys
+import time
+import copy
+from nose.tools import nottest
+import traceback
+
+class CTRexRx_Test(CTRexGeneral_Test):
+ """This class defines the rx testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ CTRexGeneral_Test.__init__(self, *args, **kwargs)
+ self.unsupported_modes = ['virt_nics'] # TODO: fix
+ pass
+
+ def setUp(self):
+ CTRexGeneral_Test.setUp(self)
+ pass
+
+
+ def check_rx_errors(self, trex_res, allow_error_tolerance = True):
+ try:
+ # counters to check
+
+ latency_counters_display = {'m_unsup_prot': 0, 'm_no_magic': 0, 'm_no_id': 0, 'm_seq_error': 0, 'm_length_error': 0, 'm_no_ipv4_option': 0, 'm_tx_pkt_err': 0}
+ rx_counters = {'m_err_drop': 0, 'm_err_aged': 0, 'm_err_no_magic': 0, 'm_err_wrong_pkt_id': 0, 'm_err_fif_seen_twice': 0, 'm_err_open_with_no_fif_pkt': 0, 'm_err_oo_dup': 0, 'm_err_oo_early': 0, 'm_err_oo_late': 0, 'm_err_flow_length_changed': 0}
+
+ # get relevant TRex results
+
+ try:
+ ports_names = trex_res.get_last_value('trex-latecny-v2.data', 'port\-\d+')
+ if not ports_names:
+ raise AbnormalResultError('Could not find ports info in TRex results, path: trex-latecny-v2.data.port-*')
+ for port_name in ports_names:
+ path = 'trex-latecny-v2.data.%s.stats' % port_name
+ port_result = trex_res.get_last_value(path)
+ if not port_result:
+ raise AbnormalResultError('Could not find port stats in TRex results, path: %s' % path)
+ for key in latency_counters_display:
+ latency_counters_display[key] += port_result[key]
+
+                # using -k flag in TRex produces 1 error per port in latency counter m_seq_error; allow it until the issue is resolved. For comparison, use a dict with a reduced m_seq_error count.
+ latency_counters_compare = copy.deepcopy(latency_counters_display)
+ latency_counters_compare['m_seq_error'] = max(0, latency_counters_compare['m_seq_error'] - len(ports_names))
+
+ path = 'rx-check.data.stats'
+ rx_check_results = trex_res.get_last_value(path)
+ if not rx_check_results:
+ raise AbnormalResultError('No TRex results by path: %s' % path)
+ for key in rx_counters:
+ rx_counters[key] = rx_check_results[key]
+
+ path = 'rx-check.data.stats.m_total_rx'
+ total_rx = trex_res.get_last_value(path)
+ if not total_rx:
+ raise AbnormalResultError('No TRex results by path: %s' % path)
+
+
+ print 'Total packets checked: %s' % total_rx
+ print 'Latency counters: %s' % latency_counters_display
+ print 'rx_check counters: %s' % rx_counters
+
+ except KeyError as e:
+ self.fail('Expected key in TRex result was not found.\n%s' % traceback.print_exc())
+
+            # the check: in loopback expect 0 problems; otherwise allow errors up to <error_tolerance>% of total_rx
+
+ total_errors = sum(rx_counters.values()) + sum(latency_counters_compare.values())
+ error_tolerance = self.get_benchmark_param('error_tolerance')
+ if not error_tolerance or not allow_error_tolerance:
+ error_tolerance = 0
+ error_percentage = float(total_errors) * 100 / total_rx
+
+ if total_errors > 0:
+ if self.is_loopback or error_percentage > error_tolerance:
+ self.fail('Too much errors in rx_check. (~%s%% of traffic)' % error_percentage)
+ else:
+ print 'There are errors in rx_check (%f%%), not exceeding allowed limit (%s%%)' % (error_percentage, error_tolerance)
+ else:
+ print 'No errors in rx_check.'
+ except Exception as e:
+ print traceback.print_exc()
+ self.fail('Errors in rx_check: %s' % e)
+
+ def test_rx_check_sfr(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = 'config')
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'avl/sfr_delay_10_1g_no_bundeling.yaml',
+ l = 1000,
+ k = 10,
+ learn_verify = True,
+ l_pkt_mode = 2)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ #print ("\nLATEST DUMP:")
+ #print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+ self.check_rx_errors(trex_res)
+
+
+ def test_rx_check_http(self):
+ if not self.is_loopback:
+ # TODO: skip as test_rx_check_http_negative will cover it
+ #self.skip('This test is covered by test_rx_check_http_negative')
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000,
+ k = 10,
+ learn_verify = True,
+ l_pkt_mode = 2)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+ self.check_rx_errors(trex_res)
+
+
+ def test_rx_check_sfr_ipv6(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = 'config')
+ self.router.config_ipv6_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'avl/sfr_delay_10_1g_no_bundeling.yaml',
+ l = 1000,
+ k = 10,
+ ipv6 = True)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ #print ("\nLATEST DUMP:")
+ #print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+ self.check_rx_errors(trex_res)
+
+
+ def test_rx_check_http_ipv6(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+ self.router.config_ipv6_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000,
+ k = 10,
+ ipv6 = True)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+ self.check_rx_errors(trex_res)
+
+ #@nottest
+ def test_rx_check_http_negative(self):
+ if self.is_loopback:
+ self.skip('This test uses NAT, not relevant for loopback')
+
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ rx_check = sample_rate,
+ d = 60,
+ f = 'cap2/http_simple.yaml',
+ l = 1000,
+ k = 10,
+ learn_verify = True,
+ l_pkt_mode = 2)
+
+ print 'Run for 40 seconds, expect no errors'
+ trex_res = self.trex.sample_x_seconds(40)
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res)
+ self.check_rx_errors(trex_res)
+
+ print 'Run until finish, expect errors'
+ old_errors = copy.deepcopy(self.fail_reasons)
+ nat_dict = self.get_benchmark_param('nat_dict', test_name = 'test_nat_simple')
+ nat_obj = CNatConfig(nat_dict)
+ self.router.config_nat(nat_obj)
+ self.router.config_zbf()
+ trex_res = self.trex.sample_to_run_finish()
+ self.router.config_no_zbf()
+ self.router.clear_nat_translations()
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ self.check_rx_errors(trex_res, allow_error_tolerance = False)
+ if self.fail_reasons == old_errors:
+ self.fail('Expected errors here, got none.')
+ else:
+ print 'Got errors as expected.'
+ self.fail_reasons = old_errors
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/stateless_tests/stl_examples_test.py b/scripts/automation/regression/stateless_tests/stl_examples_test.py
new file mode 100755
index 00000000..080bb3d3
--- /dev/null
+++ b/scripts/automation/regression/stateless_tests/stl_examples_test.py
@@ -0,0 +1,35 @@
+#!/router/bin/python
+from stl_general_test import CStlGeneral_Test, CTRexScenario
+import os, sys
+from misc_methods import run_command
+
+class STLExamples_Test(CStlGeneral_Test):
+ """This class defines the IMIX testcase of the T-Rex traffic generator"""
+
+ def setUp(self):
+ print 'STLExamples_Test setUp'
+ CStlGeneral_Test.setUp(self)
+ # examples connect by their own
+ if self.is_connected():
+ CTRexScenario.stl_trex.disconnect()
+
+ @classmethod
+ def tearDownClass(cls):
+ print 'STLExamples_Test tearDownClass'
+ # connect back at end of tests
+ if not cls.is_connected():
+ CTRexScenario.stl_trex.connect()
+
+ def test_stl_examples(self):
+ examples_dir = '../trex_control_plane/stl/examples'
+ examples_to_test = [
+ 'stl_imix.py',
+ ]
+
+ for example in examples_to_test:
+ return_code, stdout, stderr = run_command("sh -c 'cd %s; %s %s -s %s'" % (examples_dir, sys.executable, example, CTRexScenario.configuration.trex['trex_name']))
+ assert return_code == 0, 'example %s failed.\nstdout: %s\nstderr: %s' % (return_code, stdout, stderr)
+
+ def test_stl_examples1(self):
+ print 'in test_stl_examples1'
+
diff --git a/scripts/automation/regression/stateless_tests/stl_general_test.py b/scripts/automation/regression/stateless_tests/stl_general_test.py
new file mode 100644
index 00000000..8d21cadf
--- /dev/null
+++ b/scripts/automation/regression/stateless_tests/stl_general_test.py
@@ -0,0 +1,62 @@
+import os, sys
+import unittest
+from trex import CTRexScenario
+from stateful_tests.trex_general_test import CTRexGeneral_Test
+from trex_stl_lib.api import *
+import time
+from nose.tools import nottest
+
+
+class CStlGeneral_Test(CTRexGeneral_Test):
+ """This class defines the general stateless testcase of the T-Rex traffic generator"""
+
+ #once for all tests under CStlGeneral_Test
+ @classmethod
+ def setUpClass(cls):
+ cls.stl_trex = CTRexScenario.stl_trex
+
+ def setUp(self):
+ CTRexGeneral_Test.setUp(self)
+ # check basic requirements, should be verified at test_connectivity, here only skip test
+ if CTRexScenario.stl_init_error:
+ self.skip(CTRexScenario.stl_init_error)
+
+ @staticmethod
+ def connect(timeout = 20):
+ sys.stdout.write('Connecting')
+ for i in range(timeout):
+ try:
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ CTRexScenario.stl_trex.connect()
+ return
+ except:
+ time.sleep(1)
+ CTRexScenario.stl_trex.connect()
+
+ @staticmethod
+ def get_port_count():
+ return CTRexScenario.stl_trex.get_port_count()
+
+ @staticmethod
+ def is_connected():
+ return CTRexScenario.stl_trex.is_connected()
+
+class STLBasic_Test(CStlGeneral_Test):
+ # will run it first explicitly, check connectivity and configure routing
+ @nottest
+ def test_connectivity(self):
+ if not self.is_loopback:
+ CTRexScenario.router.load_clean_config()
+ CTRexScenario.router.configure_basic_interfaces()
+ CTRexScenario.router.config_pbr(mode = "config")
+
+ CTRexScenario.stl_init_error = 'Client could not connect'
+ self.connect()
+ CTRexScenario.stl_init_error = 'Client could not map ports'
+ CTRexScenario.stl_ports_map = stl_map_ports(CTRexScenario.stl_trex)
+ CTRexScenario.stl_init_error = 'Could not determine bidirectional ports'
+ print 'Ports mapping: %s' % CTRexScenario.stl_ports_map
+ if not len(CTRexScenario.stl_ports_map['bi']):
+ raise STLError('No bidirectional ports')
+ CTRexScenario.stl_init_error = None