author    Yaroslav Brustinov <ybrustin@cisco.com>    2016-01-05 15:22:22 +0200
committer Yaroslav Brustinov <ybrustin@cisco.com>    2016-01-05 15:22:22 +0200
commit    823b8294539f2e55db09795a7fff03d7be6b6346 (patch)
tree      149cdce761ead614409829d9ab1c2d9cdf680c4a /scripts/automation
parent    fecdb3ea73b380e01a8877c8e88ce61e853000bc (diff)
move regression to trex-core
slight fixes of hltapi + vm in packet builder; update yaml lib version from 3.01 to 3.11
Diffstat (limited to 'scripts/automation')
-rwxr-xr-x scripts/automation/regression/CPlatform.py | 908
-rwxr-xr-x scripts/automation/regression/CProgressDisp.py | 87
-rwxr-xr-x scripts/automation/regression/CShowParser.py | 228
-rwxr-xr-x scripts/automation/regression/CustomLogger.py | 36
-rwxr-xr-x scripts/automation/regression/aggregate_results.py | 492
-rwxr-xr-x scripts/automation/regression/functional_unit_tests.py | 78
-rwxr-xr-x scripts/automation/regression/interactive_platform | 4
-rwxr-xr-x scripts/automation/regression/interactive_platform.py | 338
-rwxr-xr-x scripts/automation/regression/interfaces_e.py | 8
-rwxr-xr-x scripts/automation/regression/misc_methods.py | 280
-rwxr-xr-x scripts/automation/regression/outer_packages.py | 38
-rwxr-xr-x scripts/automation/regression/platform_cmd_link.py | 442
-rwxr-xr-x scripts/automation/regression/sshpass.exp | 17
-rwxr-xr-x scripts/automation/regression/stateless_example.py | 47
-rwxr-xr-x scripts/automation/regression/style.css | 54
-rw-r--r-- scripts/automation/regression/trex.py | 427
-rwxr-xr-x scripts/automation/regression/trex_unit_test.py | 273
-rwxr-xr-x scripts/automation/regression/unit_tests/__init__.py | 1
-rw-r--r-- scripts/automation/regression/unit_tests/functional_tests/config.yaml | 74
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/functional_general_test.py | 22
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/misc_methods_test.py | 61
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/platform_cmd_cache_test.py | 60
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/platform_cmd_link_test.py | 62
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/platform_device_cfg_test.py | 20
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/platform_dual_if_obj_test.py | 31
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/platform_if_manager_test.py | 40
-rwxr-xr-x scripts/automation/regression/unit_tests/functional_tests/platform_if_obj_test.py | 49
-rwxr-xr-x scripts/automation/regression/unit_tests/tests_exceptions.py | 37
-rwxr-xr-x scripts/automation/regression/unit_tests/trex_general_test.py | 328
-rwxr-xr-x scripts/automation/regression/unit_tests/trex_imix_test.py | 176
-rwxr-xr-x scripts/automation/regression/unit_tests/trex_ipv6_test.py | 102
-rwxr-xr-x scripts/automation/regression/unit_tests/trex_nat_test.py | 164
-rwxr-xr-x scripts/automation/regression/unit_tests/trex_nbar_test.py | 193
-rwxr-xr-x scripts/automation/regression/unit_tests/trex_rx_test.py | 285
-rwxr-xr-x scripts/automation/trex_control_plane/client/trex_hltapi.py | 91
-rwxr-xr-x scripts/automation/trex_control_plane/client_utils/external_packages.py | 2
-rwxr-xr-x scripts/automation/trex_control_plane/client_utils/packet_builder.py | 6
-rwxr-xr-x scripts/automation/trex_control_plane/common/external_packages.py | 2
38 files changed, 5520 insertions, 43 deletions
diff --git a/scripts/automation/regression/CPlatform.py b/scripts/automation/regression/CPlatform.py
new file mode 100755
index 00000000..9c81a3a0
--- /dev/null
+++ b/scripts/automation/regression/CPlatform.py
@@ -0,0 +1,908 @@
+#!/router/bin/python
+
+from interfaces_e import IFType
+from platform_cmd_link import *
+import CustomLogger
+import misc_methods
+import re
+import time
+import CProgressDisp
+from CShowParser import CShowParser
+
+class CPlatform(object):
+ def __init__(self, silent_mode):
+ self.if_mngr = CIfManager()
+ self.cmd_link = CCommandLink(silent_mode)
+ self.nat_config = None
+ self.stat_route_config = None
+ self.running_image = None
+ self.needed_image_path = None
+ self.tftp_cfg = None
+ self.config_history = { 'basic_if_config' : False, 'tftp_server_config' : False }
+
+ def configure_basic_interfaces(self):
+
+ cache = CCommandCache()
+ for dual_if in self.if_mngr.get_dual_if_list():
+ client_if_command_set = []
+ server_if_command_set = []
+
+ client_if_command_set.append ('mac-address {mac}'.format( mac = dual_if.client_if.get_src_mac_addr()) )
+ client_if_command_set.append ('mtu 4000')
+ client_if_command_set.append ('ip address {ip} 255.255.255.0'.format( ip = dual_if.client_if.get_ipv4_addr() ))
+ client_if_command_set.append ('ipv6 address {ip}/64'.format( ip = dual_if.client_if.get_ipv6_addr() ))
+
+ cache.add('IF', client_if_command_set, dual_if.client_if.get_name())
+
+ server_if_command_set.append ('mac-address {mac}'.format( mac = dual_if.server_if.get_src_mac_addr()) )
+ server_if_command_set.append ('mtu 4000')
+ server_if_command_set.append ('ip address {ip} 255.255.255.0'.format( ip = dual_if.server_if.get_ipv4_addr() ))
+ server_if_command_set.append ('ipv6 address {ip}/64'.format( ip = dual_if.server_if.get_ipv6_addr() ))
+
+ cache.add('IF', server_if_command_set, dual_if.server_if.get_name())
+
+ self.cmd_link.run_single_command(cache)
+ self.config_history['basic_if_config'] = True
+
+
+
+ def configure_basic_filtered_interfaces(self, intf_list):
+
+ cache = CCommandCache()
+ for intf in intf_list:
+ if_command_set = []
+
+ if_command_set.append ('mac-address {mac}'.format( mac = intf.get_src_mac_addr()) )
+ if_command_set.append ('mtu 4000')
+ if_command_set.append ('ip address {ip} 255.255.255.0'.format( ip = intf.get_ipv4_addr() ))
+ if_command_set.append ('ipv6 address {ip}/64'.format( ip = intf.get_ipv6_addr() ))
+
+ cache.add('IF', if_command_set, intf.get_name())
+
+ self.cmd_link.run_single_command(cache)
+
+
+ def load_clean_config (self, config_filename = "clean_config.cfg", cfg_drive = "bootflash"):
+ self.clear_nat_translations()
+
+ cache = CCommandCache()
+ cache.add('EXEC', "configure replace {drive}:{file} force".format(drive = cfg_drive, file = config_filename))
+ self.cmd_link.run_single_command(cache)
+
+ def config_pbr (self, mode = 'config'):
+ idx = 1
+ unconfig_str = '' if mode=='config' else 'no '
+
+ cache = CCommandCache()
+ pre_commit_cache = CCommandCache()
+ pre_commit_set = set([])
+
+ for dual_if in self.if_mngr.get_dual_if_list():
+ client_if_command_set = []
+ server_if_command_set = []
+ conf_t_command_set = []
+ client_net_next_hop = misc_methods.get_single_net_client_addr(dual_if.server_if.get_ipv4_addr() )
+ server_net_next_hop = misc_methods.get_single_net_client_addr(dual_if.client_if.get_ipv4_addr() )
+
+ if dual_if.is_duplicated():
+ # define the relevant VRF name
+ pre_commit_set.add('{mode}ip vrf {dup}'.format( mode = unconfig_str, dup = dual_if.get_vrf_name()) )
+
+ # assign VRF to interfaces, config interfaces with relevant route-map
+ client_if_command_set.append ('{mode}ip vrf forwarding {dup}'.format( mode = unconfig_str, dup = dual_if.get_vrf_name()) )
+ client_if_command_set.append ('{mode}ip policy route-map {dup}_{p1}_to_{p2}'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ server_if_command_set.append ('{mode}ip vrf forwarding {dup}'.format( mode = unconfig_str, dup = dual_if.get_vrf_name()) )
+ server_if_command_set.append ('{mode}ip policy route-map {dup}_{p2}_to_{p1}'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+
+ # config route-map routing
+ conf_t_command_set.append('{mode}route-map {dup}_{p1}_to_{p2} permit 10'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ if mode == 'config':
+ conf_t_command_set.append('set ip next-hop {next_hop}'.format(
+ next_hop = client_net_next_hop) )
+ conf_t_command_set.append('{mode}route-map {dup}_{p2}_to_{p1} permit 10'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ if mode == 'config':
+ conf_t_command_set.append('set ip next-hop {next_hop}'.format(
+ next_hop = server_net_next_hop) )
+
+ # config global arp to interfaces net address and vrf
+ conf_t_command_set.append('{mode}arp vrf {dup} {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ next_hop = server_net_next_hop,
+ dest_mac = dual_if.client_if.get_dest_mac()))
+ conf_t_command_set.append('{mode}arp vrf {dup} {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ next_hop = client_net_next_hop,
+ dest_mac = dual_if.server_if.get_dest_mac()))
+ else:
+ # config interfaces with relevant route-map
+ client_if_command_set.append ('{mode}ip policy route-map {p1}_to_{p2}'.format(
+ mode = unconfig_str,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ server_if_command_set.append ('{mode}ip policy route-map {p2}_to_{p1}'.format(
+ mode = unconfig_str,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+
+ # config route-map routing
+ conf_t_command_set.append('{mode}route-map {p1}_to_{p2} permit 10'.format(
+ mode = unconfig_str,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ if mode == 'config':
+ conf_t_command_set.append('set ip next-hop {next_hop}'.format(
+ next_hop = client_net_next_hop) )
+ conf_t_command_set.append('{mode}route-map {p2}_to_{p1} permit 10'.format(
+ mode = unconfig_str,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ if mode == 'config':
+ conf_t_command_set.append('set ip next-hop {next_hop}'.format(
+ next_hop = server_net_next_hop) )
+
+ # config global arp to interfaces net address
+ conf_t_command_set.append('{mode}arp {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ next_hop = server_net_next_hop,
+ dest_mac = dual_if.client_if.get_dest_mac()))
+ conf_t_command_set.append('{mode}arp {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ next_hop = client_net_next_hop,
+ dest_mac = dual_if.server_if.get_dest_mac()))
+
+ # assign generated config list to cache
+ cache.add('IF', server_if_command_set, dual_if.server_if.get_name())
+ cache.add('IF', client_if_command_set, dual_if.client_if.get_name())
+ cache.add('CONF', conf_t_command_set)
+ idx += 2
+
+ # finish handling pre-config cache
+ pre_commit_set = list(pre_commit_set)
+# pre_commit_set.append('exit')
+ pre_commit_cache.add('CONF', pre_commit_set )
+ # deploy the configs (order is important!)
+ self.cmd_link.run_command( [pre_commit_cache, cache] )
+ if self.config_history['basic_if_config']:
+ # in this case, duplicated interfaces will lose their IP addresses.
+ # re-config IPv4 addresses
+ self.configure_basic_filtered_interfaces(self.if_mngr.get_duplicated_if() )
+
+ def config_no_pbr (self):
+ self.config_pbr(mode = 'unconfig')
+
+ def config_static_routing (self, stat_route_obj, mode = 'config'):
+
+ if mode == 'config':
+ self.stat_route_config = stat_route_obj # save the latest static route config for future removal purposes
+
+ unconfig_str = '' if mode=='config' else 'no '
+ cache = CCommandCache()
+ pre_commit_cache = CCommandCache()
+ pre_commit_set = set([])
+ current_dup_intf = None
+ # client_net = None
+ # server_net = None
+ client_net = stat_route_obj.client_net_start
+ server_net = stat_route_obj.server_net_start
+ conf_t_command_set = []
+
+ for dual_if in self.if_mngr.get_dual_if_list():
+
+ # handle duplicated addressing generation
+ if dual_if.is_duplicated():
+ if dual_if.get_vrf_name() != current_dup_intf:
+ # if this is a duplicated interface, and it differs from the one processed so far, reset static route addressing
+ current_dup_intf = dual_if.get_vrf_name()
+ client_net = stat_route_obj.client_net_start
+ server_net = stat_route_obj.server_net_start
+ else:
+ if current_dup_intf is not None:
+ current_dup_intf = None
+ client_net = stat_route_obj.client_net_start
+ server_net = stat_route_obj.server_net_start
+
+ client_net_next_hop = misc_methods.get_single_net_client_addr(dual_if.server_if.get_ipv4_addr() )
+ server_net_next_hop = misc_methods.get_single_net_client_addr(dual_if.client_if.get_ipv4_addr() )
+
+ # handle static route configuration for the interfaces
+ if dual_if.is_duplicated():
+ client_if_command_set = []
+ server_if_command_set = []
+
+ # define the relevant VRF name
+ pre_commit_set.add('{mode}ip vrf {dup}'.format( mode = unconfig_str, dup = dual_if.get_vrf_name()) )
+
+ # assign VRF to interfaces, config interfaces with relevant route-map
+ client_if_command_set.append ('{mode}ip vrf forwarding {dup}'.format( mode = unconfig_str, dup = dual_if.get_vrf_name()) )
+ server_if_command_set.append ('{mode}ip vrf forwarding {dup}'.format( mode = unconfig_str, dup = dual_if.get_vrf_name()) )
+
+ conf_t_command_set.append( "{mode}ip route vrf {dup} {next_net} {dest_mask} {next_hop}".format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ next_net = client_net,
+ dest_mask = stat_route_obj.client_mask,
+ next_hop = client_net_next_hop))
+ conf_t_command_set.append( "{mode}ip route vrf {dup} {next_net} {dest_mask} {next_hop}".format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ next_net = server_net,
+ dest_mask = stat_route_obj.server_mask,
+ next_hop = server_net_next_hop))
+
+ # config global arp to interfaces net address and vrf
+ conf_t_command_set.append('{mode}arp vrf {dup} {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ next_hop = server_net_next_hop,
+ dest_mac = dual_if.client_if.get_dest_mac()))
+ conf_t_command_set.append('{mode}arp vrf {dup} {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ dup = dual_if.get_vrf_name(),
+ next_hop = client_net_next_hop,
+ dest_mac = dual_if.server_if.get_dest_mac()))
+
+ # assign generated interfaces config list to cache
+ cache.add('IF', server_if_command_set, dual_if.server_if.get_name())
+ cache.add('IF', client_if_command_set, dual_if.client_if.get_name())
+
+ else:
+ conf_t_command_set.append( "{mode}ip route {next_net} {dest_mask} {next_hop}".format(
+ mode = unconfig_str,
+ next_net = client_net,
+ dest_mask = stat_route_obj.client_mask,
+ next_hop = server_net_next_hop))
+ conf_t_command_set.append( "{mode}ip route {next_net} {dest_mask} {next_hop}".format(
+ mode = unconfig_str,
+ next_net = server_net,
+ dest_mask = stat_route_obj.server_mask,
+ next_hop = client_net_next_hop))
+
+ # config global arp to interfaces net address
+ conf_t_command_set.append('{mode}arp {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ next_hop = server_net_next_hop,
+ dest_mac = dual_if.client_if.get_dest_mac()))
+ conf_t_command_set.append('{mode}arp {next_hop} {dest_mac} arpa'.format(
+ mode = unconfig_str,
+ next_hop = client_net_next_hop,
+ dest_mac = dual_if.server_if.get_dest_mac()))
+
+ # bump up to the next client network address
+ client_net = misc_methods.get_single_net_client_addr(client_net, stat_route_obj.net_increment)
+ server_net = misc_methods.get_single_net_client_addr(server_net, stat_route_obj.net_increment)
+
+
+ # finish handling pre-config cache
+ pre_commit_set = list(pre_commit_set)
+# pre_commit_set.append('exit')
+ pre_commit_cache.add('CONF', pre_commit_set )
+ # assign generated config list to cache
+ cache.add('CONF', conf_t_command_set)
+ # deploy the configs (order is important!)
+ self.cmd_link.run_command( [pre_commit_cache, cache] )
+ if self.config_history['basic_if_config']:
+ # in this case, duplicated interfaces will lose their IP addresses.
+ # re-config IPv4 addresses
+ self.configure_basic_filtered_interfaces(self.if_mngr.get_duplicated_if() )
+
+
+ def config_no_static_routing (self, stat_route_obj = None):
+
+ if stat_route_obj is None and self.stat_route_config is not None:
+ self.config_static_routing(self.stat_route_config, mode = 'unconfig')
+ self.stat_route_config = None # reverse current static route config back to None (no static route config is known to run).
+ elif stat_route_obj is not None:
+ self.config_static_routing(stat_route_obj, mode = 'unconfig')
+ else:
+ raise UserWarning('No static route configuration is available for removal.')
+
+ def config_nbar_pd (self, mode = 'config'):
+ unconfig_str = '' if mode=='config' else 'no '
+ cache = CCommandCache()
+
+ for intf in self.if_mngr.get_if_list(if_type = IFType.Client):
+ cache.add('IF', "{mode}ip nbar protocol-discovery".format( mode = unconfig_str ), intf.get_name())
+
+ self.cmd_link.run_single_command( cache )
+
+ def config_no_nbar_pd (self):
+ self.config_nbar_pd (mode = 'unconfig')
+
+
+ def config_nat_verify (self, mode = 'config'):
+
+ # toggle all duplicate interfaces
+ # dup_ifs = self.if_mngr.get_duplicated_if()
+ if mode=='config':
+ self.toggle_duplicated_intf(action = 'down')
+ # self.__toggle_interfaces(dup_ifs, action = 'down' )
+ else:
+ # if we're in 'unconfig', toggle duplicated interfaces back up
+ self.toggle_duplicated_intf(action = 'up')
+ # self.__toggle_interfaces(dup_ifs)
+
+ def config_no_nat_verify (self):
+ self.config_nat_verify(mode = 'unconfig')
+
+ def config_nat (self, nat_obj, mode = 'config'):
+
+ if mode == 'config':
+ self.nat_config = nat_obj # save the latest nat config for future removal purposes
+
+ cache = CCommandCache()
+ conf_t_command_set = []
+ client_net = nat_obj.clients_net_start
+ pool_net = nat_obj.nat_pool_start
+ unconfig_str = '' if mode=='config' else 'no '
+
+ # toggle all duplicate interfaces
+ # dup_ifs = self.if_mngr.get_duplicated_if()
+ if mode=='config':
+ self.toggle_duplicated_intf(action = 'down')
+ # self.__toggle_interfaces(dup_ifs, action = 'down' )
+ else:
+ # if we're in 'unconfig', toggle duplicated interfaces back up
+ self.toggle_duplicated_intf(action = 'up')
+ # self.__toggle_interfaces(dup_ifs)
+
+ for dual_if in self.if_mngr.get_dual_if_list(is_duplicated = False):
+ cache.add('IF', "{mode}ip nat inside".format( mode = unconfig_str ), dual_if.client_if.get_name())
+ cache.add('IF', "{mode}ip nat outside".format( mode = unconfig_str ), dual_if.server_if.get_name())
+ pool_id = dual_if.get_id() + 1
+
+ conf_t_command_set.append("{mode}ip nat pool pool{pool_num} {start_addr} {end_addr} netmask {mask}".format(
+ mode = unconfig_str,
+ pool_num = pool_id,
+ start_addr = pool_net,
+ end_addr = CNatConfig.calc_pool_end(pool_net, nat_obj.nat_netmask),
+ mask = nat_obj.nat_netmask))
+
+ conf_t_command_set.append("{mode}ip nat inside source list {num} pool pool{pool_num} overload".format(
+ mode = unconfig_str,
+ num = pool_id,
+ pool_num = pool_id ))
+ conf_t_command_set.append("{mode}access-list {num} permit {net_addr} {net_wildcard}".format(
+ mode = unconfig_str,
+ num = pool_id,
+ net_addr = client_net,
+ net_wildcard = nat_obj.client_acl_wildcard))
+
+ # bump up to the next client address
+ client_net = misc_methods.get_single_net_client_addr(client_net, nat_obj.net_increment)
+ pool_net = misc_methods.get_single_net_client_addr(pool_net, nat_obj.net_increment)
+
+
+ # assign generated config list to cache
+ cache.add('CONF', conf_t_command_set)
+
+ # deploy the configs (order is important!)
+ self.cmd_link.run_single_command( cache )
+
+
+ def config_no_nat (self, nat_obj = None):
+ # first, clear all nat translations
+ self.clear_nat_translations()
+
+ # then, decompose the known config
+ if nat_obj is None and self.nat_config is not None:
+ self.config_nat(self.nat_config, mode = 'unconfig')
+ self.nat_config = None # reverse current NAT config back to None (no nat config is known to run).
+ elif nat_obj is not None:
+ self.config_nat(nat_obj, mode = 'unconfig')
+ else:
+ raise UserWarning('No NAT configuration is available for removal.')
+
+
+ def config_zbf (self, mode = 'config'):
+ cache = CCommandCache()
+ pre_commit_cache = CCommandCache()
+ conf_t_command_set = []
+
+ # toggle all duplicate interfaces down
+ self.toggle_duplicated_intf(action = 'down')
+ # dup_ifs = self.if_mngr.get_duplicated_if()
+ # self.__toggle_interfaces(dup_ifs, action = 'down' )
+
+ # define security zones and security service policy to be applied on the interfaces
+ conf_t_command_set.append('class-map type inspect match-any c1')
+ conf_t_command_set.append('match protocol tcp')
+ conf_t_command_set.append('match protocol udp')
+ conf_t_command_set.append('policy-map type inspect p1')
+ conf_t_command_set.append('class type inspect c1')
+ conf_t_command_set.append('inspect')
+ conf_t_command_set.append('class class-default')
+ conf_t_command_set.append('pass')
+
+ conf_t_command_set.append('zone security z_in')
+ conf_t_command_set.append('zone security z_out')
+
+ conf_t_command_set.append('zone-pair security in2out source z_in destination z_out')
+ conf_t_command_set.append('service-policy type inspect p1')
+ conf_t_command_set.append('zone-pair security out2in source z_out destination z_in')
+ conf_t_command_set.append('service-policy type inspect p1')
+ conf_t_command_set.append('exit')
+
+ pre_commit_cache.add('CONF', conf_t_command_set)
+
+ for dual_if in self.if_mngr.get_dual_if_list(is_duplicated = False):
+ cache.add('IF', "zone-member security z_in", dual_if.client_if.get_name() )
+ cache.add('IF', "zone-member security z_out", dual_if.server_if.get_name() )
+
+ self.cmd_link.run_command( [pre_commit_cache, cache] )
+
+ def config_no_zbf (self):
+ cache = CCommandCache()
+ conf_t_command_set = []
+
+ # define security zones and security service policy to be applied on the interfaces
+ conf_t_command_set.append('no zone-pair security in2out source z_in destination z_out')
+ conf_t_command_set.append('no zone-pair security out2in source z_out destination z_in')
+
+ conf_t_command_set.append('no policy-map type inspect p1')
+ conf_t_command_set.append('no class-map type inspect match-any c1')
+
+ conf_t_command_set.append('no zone security z_in')
+ conf_t_command_set.append('no zone security z_out')
+
+ cache.add('CONF', conf_t_command_set)
+
+ for dual_if in self.if_mngr.get_dual_if_list(is_duplicated = False):
+ cache.add('IF', "no zone-member security z_in", dual_if.client_if.get_name() )
+ cache.add('IF', "no zone-member security z_out", dual_if.server_if.get_name() )
+
+ self.cmd_link.run_command( [cache] )
+ # toggle all duplicate interfaces back up
+ self.toggle_duplicated_intf(action = 'up')
+ # dup_ifs = self.if_mngr.get_duplicated_if()
+ # self.__toggle_interfaces(dup_ifs)
+
+
+ def config_ipv6_pbr (self, mode = 'config'):
+ idx = 1
+ unconfig_str = '' if mode=='config' else 'no '
+ cache = CCommandCache()
+ conf_t_command_set = []
+
+ conf_t_command_set.append('{mode}ipv6 unicast-routing'.format(mode = unconfig_str) )
+
+ for dual_if in self.if_mngr.get_dual_if_list():
+ client_if_command_set = []
+ server_if_command_set = []
+
+ client_net_next_hop = misc_methods.get_single_net_client_addr(dual_if.server_if.get_ipv6_addr(), {'7':1}, ip_type = 'ipv6' )
+ server_net_next_hop = misc_methods.get_single_net_client_addr(dual_if.client_if.get_ipv6_addr(), {'7':1}, ip_type = 'ipv6' )
+
+
+ client_if_command_set.append ('{mode}ipv6 enable'.format(mode = unconfig_str))
+ server_if_command_set.append ('{mode}ipv6 enable'.format(mode = unconfig_str))
+
+ if dual_if.is_duplicated():
+ prefix = 'ipv6_' + dual_if.get_vrf_name()
+ else:
+ prefix = 'ipv6'
+
+ # config interfaces with relevant route-map
+ client_if_command_set.append ('{mode}ipv6 policy route-map {pre}_{p1}_to_{p2}'.format(
+ mode = unconfig_str,
+ pre = prefix,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ server_if_command_set.append ('{mode}ipv6 policy route-map {pre}_{p2}_to_{p1}'.format(
+ mode = unconfig_str,
+ pre = prefix,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+
+ # config global arp to interfaces net address and vrf
+ conf_t_command_set.append('{mode}ipv6 neighbor {next_hop} {intf} {dest_mac}'.format(
+ mode = unconfig_str,
+ next_hop = server_net_next_hop,
+ intf = dual_if.client_if.get_name(),
+ dest_mac = dual_if.client_if.get_dest_mac()))
+ conf_t_command_set.append('{mode}ipv6 neighbor {next_hop} {intf} {dest_mac}'.format(
+ mode = unconfig_str,
+ next_hop = client_net_next_hop,
+ intf = dual_if.server_if.get_name(),
+ dest_mac = dual_if.server_if.get_dest_mac()))
+
+ conf_t_command_set.append('{mode}route-map {pre}_{p1}_to_{p2} permit 10'.format(
+ mode = unconfig_str,
+ pre = prefix,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ if (mode == 'config'):
+ conf_t_command_set.append('set ipv6 next-hop {next_hop}'.format(next_hop = client_net_next_hop ) )
+ conf_t_command_set.append('{mode}route-map {pre}_{p2}_to_{p1} permit 10'.format(
+ mode = unconfig_str,
+ pre = prefix,
+ p1 = 'p'+str(idx), p2 = 'p'+str(idx+1) ) )
+ if (mode == 'config'):
+ conf_t_command_set.append('set ipv6 next-hop {next_hop}'.format(next_hop = server_net_next_hop ) )
+ conf_t_command_set.append('exit')
+
+ # assign generated config list to cache
+ cache.add('IF', server_if_command_set, dual_if.server_if.get_name())
+ cache.add('IF', client_if_command_set, dual_if.client_if.get_name())
+ idx += 2
+
+ cache.add('CONF', conf_t_command_set)
+
+ # deploy the configs (order is important!)
+ self.cmd_link.run_command( [cache] )
+
+ def config_no_ipv6_pbr (self):
+ self.config_ipv6_pbr(mode = 'unconfig')
+
+ # show methods
+ def get_cpu_util (self):
+ response = self.cmd_link.run_single_command('show platform hardware qfp active datapath utilization | inc Load')
+ return CShowParser.parse_cpu_util_stats(response)
+
+ def get_cft_stats (self):
+ response = self.cmd_link.run_single_command('test platform hardware qfp active infrastructure cft datapath function cft-cpp-show-all-instances')
+ return CShowParser.parse_cft_stats(response)
+
+ def get_nbar_stats (self):
+ per_intf_stats = {}
+ for intf in self.if_mngr.get_if_list(if_type = IFType.Client):
+ response = self.cmd_link.run_single_command("show ip nbar protocol-discovery interface {interface} stats packet-count protocol".format( interface = intf.get_name() ), flush_first = True)
+ per_intf_stats[intf.get_name()] = CShowParser.parse_nbar_stats(response)
+ return per_intf_stats
+
+ def get_nbar_profiling_stats (self):
+ response = self.cmd_link.run_single_command("show platform hardware qfp active feature nbar profiling")
+ return CShowParser.parse_nbar_profiling_stats(response)
+
+ def get_drop_stats (self):
+
+ response = self.cmd_link.run_single_command('show platform hardware qfp active interface all statistics', flush_first = True)
+ # print response
+ # response = self.cmd_link.run_single_command('show platform hardware qfp active interface all statistics')
+ # print response
+ if_list_by_name = map( lambda x: x.get_name(), self.if_mngr.get_if_list() )
+ return CShowParser.parse_drop_stats(response, if_list_by_name )
+
+ def get_nat_stats (self):
+ response = self.cmd_link.run_single_command('show ip nat statistics')
+ return CShowParser.parse_nat_stats(response)
+
+ def get_cvla_memory_usage(self):
+ response = self.cmd_link.run_single_command('show platform hardware qfp active infrastructure cvla client handles')
+ # (res, res2) = CShowParser.parse_cvla_memory_usage(response)
+ return CShowParser.parse_cvla_memory_usage(response)
+
+
+ # clear methods
+ def clear_nat_translations(self):
+ pre_commit_cache = CCommandCache()
+ pre_commit_cache.add('EXEC', 'clear ip nat translation *')
+ self.cmd_link.run_single_command( pre_commit_cache )
+
+ def clear_cft_counters (self):
+ """ clear_cft_counters(self) -> None
+
+ Clears the CFT counters on the platform
+ """
+ self.cmd_link.run_single_command('test platform hardware qfp active infrastructure cft datapath function cft-cpp-clear-instance-stats')
+
+ def clear_counters(self):
+ """ clear_counters(self) -> None
+
+ Clears the platform counters
+ """
+
+ pre_commit_cache = CCommandCache()
+ pre_commit_cache.add('EXEC', ['clear counters','\r'] )
+ self.cmd_link.run_single_command( pre_commit_cache )
+
+ def clear_nbar_stats(self):
+ """ clear_nbar_stats(self) -> None
+
+ Clears the NBAR-PD classification stats
+ """
+ pre_commit_cache = CCommandCache()
+ pre_commit_cache.add('EXEC', ['clear ip nbar protocol-discovery','\r'] )
+ self.cmd_link.run_single_command( pre_commit_cache )
+
+ def clear_packet_drop_stats(self):
+ """ clear_packet_drop_stats(self) -> None
+
+ Clears packet-drop stats
+ """
+# command = "show platform hardware qfp active statistics drop clear"
+ self.cmd_link.run_single_command('show platform hardware qfp active interface all statistics clear_drop')
+
+ ###########################################
+ # file transfer and image loading methods #
+ ###########################################
+ def get_running_image_details (self):
+ """ get_running_image_details() -> dict
+
+ Check for the currently running image file on the platform.
+ Returns a dictionary, where 'drive' key is the drive in which the image is installed,
+ and 'image' key is the actual image file used.
+ """
+ response = self.cmd_link.run_single_command('show version | include System image')
+ parsed_info = CShowParser.parse_show_image_version(response)
+ self.running_image = parsed_info
+ return parsed_info
+
+
+ def check_image_existence (self, img_name):
+ """ check_image_existence(self, img_name) -> boolean
+
+ Parameters
+ ----------
+ img_name : str
+ a string represents the image name.
+
+ Check if the image file defined in the platform_config is already loaded into the platform.
+ """
+ search_drives = ['bootflash', 'harddisk', self.running_image['drive']]
+ for search_drive in search_drives:
+ command = "dir {drive}: | include {image}".format(drive = search_drive, image = img_name)
+ response = self.cmd_link.run_single_command(command, timeout = 10)
+ if CShowParser.parse_image_existence(response, img_name):
+ self.needed_image_path = '%s:%s' % (search_drive, img_name)
+ print 'Found image in platform:', self.needed_image_path
+ return True
+ return False
+
+ def config_tftp_server(self, device_cfg_obj, external_tftp_config = None, applyToPlatform = False):
+ """ configure_tftp_server(self, external_tftp_config, applyToPlatform) -> str
+
+ Parameters
+ ----------
+ external_tftp_config : dict (not in use)
+ A path to an external tftp config file to use instead of the one defined in the instance.
+ applyToPlatform : boolean
+ set to True in order to apply the config into the platform
+
+ Configures the tftp server on an interface of the platform.
+ """
+# tmp_tftp_config = external_tftp_config if external_tftp_config is not None else self.tftp_server_config
+ self.tftp_cfg = device_cfg_obj.get_tftp_info()
+ cache = CCommandCache()
+
+ command = "ip tftp source-interface {intf}".format( intf = device_cfg_obj.get_mgmt_interface() )
+ cache.add('CONF', command )
+ self.cmd_link.run_single_command(cache)
+ self.config_history['tftp_server_config'] = True
+
+ def load_platform_image(self, img_filename, external_tftp_config = None):
+ """ load_platform_image(self, img_filename, external_tftp_config) -> None
+
+ Parameters
+ ----------
+ external_tftp_config : dict
+ A path to an external tftp config file to use instead of the one defined in the instance.
+ img_filename : str
+ image name to be saved into the platform's drive.
+
+ This method loads the configured image into the platform's harddisk (unless it is already loaded),
+ and sets that image to be the boot_image of the platform.
+ """
+ if not self.check_image_existence(img_filename): # check if this image isn't already saved in platform
+ #tmp_tftp_config = external_tftp_config if external_tftp_config is not None else self.tftp_cfg
+
+ if self.config_history['tftp_server_config']: # make sure a TFTP configuration has been loaded
+ cache = CCommandCache()
+ if self.running_image is None:
+ self.get_running_image_details()
+
+ command = "copy tftp://{tftp_ip}/{img_path}/{image} harddisk:".format(
+ tftp_ip = self.tftp_cfg['ip_address'],
+ img_path = self.tftp_cfg['images_path'],
+ image = img_filename)
+ cache.add('EXEC', [command, '\r', '\r'])
+
+ progress_thread = CProgressDisp.ProgressThread(notifyMessage = "Copying image via tftp, this may take a while...\n")
+ progress_thread.start()
+
+ response = self.cmd_link.run_single_command(cache, timeout = 900, read_until = ['\?', '\#'])
+ print "RESPONSE:"
+ print response
+ progress_thread.join()
+ copy_ok = CShowParser.parse_file_copy(response)
+
+ if not copy_ok:
+ raise UserWarning('Image file loading failed. Please make sure the accessed image exists and has read privileges')
+ else:
+ raise UserWarning('TFTP configuration is not available. Please make sure a valid TFTP configuration has been provided')
+
+ def set_boot_image(self, boot_image):
+ """ set_boot_image(self, boot_image) -> None
+
+ Parameters
+ ----------
+ boot_image : str
+ An image file to be set as boot_image
+
+ Configures boot_image as the boot image of the platform into the running-config + config-register
+ """
+ cache = CCommandCache()
+ if self.needed_image_path is None:
+ if not self.check_image_existence(boot_image):
+ raise Exception("Trying to set boot image that's not found in router, please copy it first.")
+
+ boot_img_cmd = "boot system flash %s" % self.needed_image_path
+ config_register_cmd = "config-register 0x2021"
+ cache.add('CONF', ["no boot system", boot_img_cmd, config_register_cmd])
+ self.cmd_link.run_single_command( cache )
+ self.save_config_to_startup_config()
+
+ def is_image_matches(self, needed_image):
+ """ set_boot_image(self, needed_image) -> boolean
+
+ Parameters
+ ----------
+ needed_image : str
+ An image file name to compare against the router's running image
+
+ Compares image name to router running image, returns match result.
+
+ """
+ if self.running_image is None:
+ self.get_running_image_details()
+ needed_image = needed_image.lower()
+ current_image = self.running_image['image'].lower()
+ if needed_image.find(current_image) != -1:
+ return True
+ if current_image.find(needed_image) != -1:
+ return True
+ return False
+
+ # misc class related methods
+
+ def load_platform_data_from_file (self, device_cfg_obj):
+ self.if_mngr.load_config(device_cfg_obj)
+
+ def launch_connection (self, device_cfg_obj):
+ self.running_image = None # clear the image name "cache"
+ self.cmd_link.launch_platform_connectivity(device_cfg_obj)
+
+ def reload_connection (self, device_cfg_obj):
+ self.cmd_link.close_platform_connection()
+ self.launch_connection(device_cfg_obj)
+
+ def save_config_to_startup_config (self):
+ """ save_config_to_startup_config(self) -> None
+
+ Copies running-config into startup-config.
+ """
+ self.cmd_link.run_single_command('wr')
+
+ def reload_platform(self, device_cfg_obj):
+ """ reload_platform(self) -> None
+
+ Reloads the platform.
+ """
+ from subprocess import call
+ import os
+ i = 0
+ sleep_time = 30 # seconds
+
+ progress_thread = None # guard: may not be created if an early command fails
+ try:
+ cache = CCommandCache()
+
+ cache.add('EXEC', ['reload','n\r','\r'] )
+ self.cmd_link.run_single_command( cache )
+
+ progress_thread = CProgressDisp.ProgressThread(notifyMessage = "Reloading the platform, this may take a while...\n")
+ progress_thread.start()
+ time.sleep(60) # need delay for device to shut down before polling it
+ # poll the platform until ping response is received.
+ while True:
+ time.sleep(sleep_time)
+ try:
+ x = call(["ping", "-c 1", device_cfg_obj.get_ip_address()], stdout = open(os.devnull, 'wb'))
+ except:
+ x = 1
+ if x == 0:
+ break
+ elif i > 20:
+ raise Exception('Platform failed to reload after reboot for over {minutes} minutes!'.format(minutes = round(1 + i * sleep_time / 60)))
+ else:
+ i += 1
+
+ time.sleep(30)
+ self.reload_connection(device_cfg_obj)
+ finally:
+ if progress_thread is not None:
+ progress_thread.join()
+
+ def get_if_manager(self):
+ return self.if_mngr
+
+ def dump_obj_config (self, object_name):
+ if object_name=='nat' and self.nat_config is not None:
+ self.nat_config.dump_config()
+ elif object_name=='static_route' and self.stat_route_config is not None:
+ self.stat_route_config.dump_config()
+ else:
+ raise UserWarning('No known configuration exists.')
+
+ def toggle_duplicated_intf(self, action = 'down'):
+
+ dup_ifs = self.if_mngr.get_duplicated_if()
+ self.__toggle_interfaces( dup_ifs, action = action )
+
+
+ def __toggle_interfaces (self, intf_list, action = 'up'):
+ cache = CCommandCache()
+ mode_str = 'no ' if action == 'up' else ''
+
+ for intf_obj in intf_list:
+ cache.add('IF', '{mode}shutdown'.format(mode = mode_str), intf_obj.get_name())
+
+ self.cmd_link.run_single_command( cache )
+
+
+class CStaticRouteConfig(object):
+
+ def __init__(self, static_route_dict):
+ self.clients_start = static_route_dict['clients_start']
+ self.servers_start = static_route_dict['servers_start']
+ self.net_increment = misc_methods.gen_increment_dict(static_route_dict['dual_port_mask'])
+ self.client_mask = static_route_dict['client_destination_mask']
+ self.server_mask = static_route_dict['server_destination_mask']
+ self.client_net_start = self.extract_net_addr(self.clients_start, self.client_mask)
+ self.server_net_start = self.extract_net_addr(self.servers_start, self.server_mask)
+ self.static_route_dict = static_route_dict
+
+ def extract_net_addr (self, ip_addr, ip_mask):
+ addr_lst = ip_addr.split('.')
+ mask_lst = ip_mask.split('.')
+ mask_lst = map(lambda x,y: int(x) & int(y), addr_lst, mask_lst )
+ masked_str = map(lambda x: str(x), mask_lst )
+ return '.'.join(masked_str)
+
+ def dump_config (self):
+ import yaml
+ print yaml.dump( self.static_route_dict , default_flow_style=False)
+
+
+class CNatConfig(object):
+ def __init__(self, nat_dict):
+ self.clients_net_start = nat_dict['clients_net_start']
+ self.client_acl_wildcard= nat_dict['client_acl_wildcard_mask']
+ self.net_increment = misc_methods.gen_increment_dict(nat_dict['dual_port_mask'])
+ self.nat_pool_start = nat_dict['pool_start']
+ self.nat_netmask = nat_dict['pool_netmask']
+ self.nat_dict = nat_dict
+
+ @staticmethod
+ def calc_pool_end (nat_pool_start, netmask):
+ pool_start_lst = map(lambda x: int(x), nat_pool_start.split('.') )
+ pool_end_lst = list( pool_start_lst ) # create new list object, don't point to the original one
+ mask_lst = map(lambda x: int(x), netmask.split('.'))
+ curr_octet = 3 # start with the LSB octet
+ inc_val = 1
+
+ while True:
+ tmp_masked = inc_val & mask_lst[curr_octet]
+ if tmp_masked == 0:
+ if (inc_val << 1) > 255:
+ inc_val = 1
+ pool_end_lst[curr_octet] = 255
+ curr_octet -= 1
+ else:
+ inc_val <<= 1
+ else:
+ pool_end_lst[curr_octet] += (inc_val - 1)
+ break
+ return '.'.join(map(lambda x: str(x), pool_end_lst))
+
+ def dump_config (self):
+ import yaml
+ print yaml.dump( self.nat_dict , default_flow_style=False)
+
+
+if __name__ == "__main__":
+ pass
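
CNatConfig.calc_pool_end above derives the last address of a NAT pool: every octet that holds only host bits is set to 255, and the first octet containing a network bit gets the remaining host-bit range added. Below is a minimal standalone sketch of the same computation (a hypothetical restatement for illustration, importing nothing from this repo), with assertions to sanity-check the logic:

def calc_pool_end(nat_pool_start, netmask):
    # walk octets from least significant to most significant
    pool_end = [int(x) for x in nat_pool_start.split('.')]
    mask = [int(x) for x in netmask.split('.')]
    curr_octet = 3                               # start with the LSB octet
    inc_val = 1
    while True:
        if inc_val & mask[curr_octet] == 0:      # current bit is a host bit
            if (inc_val << 1) > 255:             # octet is all host bits
                inc_val = 1
                pool_end[curr_octet] = 255
                curr_octet -= 1
            else:
                inc_val <<= 1
        else:                                    # reached the first network bit
            pool_end[curr_octet] += inc_val - 1
            break
    return '.'.join(str(x) for x in pool_end)

assert calc_pool_end('200.0.0.1', '255.255.255.0') == '200.0.0.255'
assert calc_pool_end('10.0.0.1', '255.255.0.0') == '10.0.255.255'
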
diff --git a/scripts/automation/regression/CProgressDisp.py b/scripts/automation/regression/CProgressDisp.py
new file mode 100755
index 00000000..ec7920c3
--- /dev/null
+++ b/scripts/automation/regression/CProgressDisp.py
@@ -0,0 +1,87 @@
+#!/router/bin/python
+
+import threading
+import sys
+import time
+import outer_packages
+import termstyle
+import progressbar
+
+
+class ProgressThread(threading.Thread):
+ def __init__(self, notifyMessage = None):
+ super(ProgressThread, self).__init__()
+ self.stoprequest = threading.Event()
+ self.notifyMessage = notifyMessage
+
+ def run(self):
+ if self.notifyMessage is not None:
+ print(self.notifyMessage),
+
+ while not self.stoprequest.is_set():
+ print "\b.",
+ sys.stdout.flush()
+ time.sleep(5)
+
+ def join(self, timeout=None):
+ if self.notifyMessage is not None:
+ print termstyle.green("Done!\n"),
+ self.stoprequest.set()
+ super(ProgressThread, self).join(timeout)
+
+
+class TimedProgressBar(threading.Thread):
+ def __init__(self, time_in_secs):
+ super(TimedProgressBar, self).__init__()
+ self.stoprequest = threading.Event()
+ self.stopFlag = False
+ self.time_in_secs = time_in_secs + 15 # taking 15 seconds extra
+ widgets = ['Running T-Rex: ', progressbar.Percentage(), ' ',
+ progressbar.Bar(marker='>',left='[',right=']'),
+ ' ', progressbar.ETA()]
+ self.pbar = progressbar.ProgressBar(widgets=widgets, maxval=self.time_in_secs*2)
+
+
+ def run (self):
+ # global g_stop
+ print
+ self.pbar.start()
+
+ try:
+ for i in range(0, self.time_in_secs*2 + 1):
+ if (self.stopFlag == True):
+ break
+ time.sleep(0.5)
+ self.pbar.update(i)
+ # self.pbar.finish()
+
+ except KeyboardInterrupt:
+ # self.pbar.finish()
+ print "\nInterrupted by user!!"
+ self.join()
+ finally:
+ print
+
+ def join(self, isPlannedStop = True, timeout=None):
+ if isPlannedStop:
+ self.pbar.update(self.time_in_secs*2)
+ self.stopFlag = True
+ else:
+ self.stopFlag = True # Stop the progress bar in its current location
+ self.stoprequest.set()
+ super(TimedProgressBar, self).join(timeout)
+
+
+def timedProgressBar(time_in_secs):
+ widgets = ['Running T-Rex: ', progressbar.Percentage(), ' ',
+ progressbar.Bar(marker='>',left='[',right=']'),
+ ' ', progressbar.ETA()]
+ pbar = progressbar.ProgressBar(widgets=widgets, maxval=time_in_secs*2)
+ pbar.start()
+ for i in range(0, time_in_secs*2 + 1):
+ time.sleep(0.5)
+ pbar.update(i)
+ pbar.finish()
+ print
+
+
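
A hedged usage sketch for ProgressThread above (Python 2, like the rest of this codebase): it prints the notify message once, a background loop emits a dot every 5 seconds, and join() prints "Done!" and signals the loop to stop. The sleep is a stand-in for any long blocking call, such as the tftp copy in CPlatform.load_platform_image:

import time
from CProgressDisp import ProgressThread   # assumes the regression dir is on sys.path

progress = ProgressThread(notifyMessage = "Copying image via tftp, this may take a while...\n")
progress.start()
try:
    time.sleep(12)       # stand-in for the long blocking operation
finally:
    progress.join()      # prints "Done!" and stops the dot loop
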
diff --git a/scripts/automation/regression/CShowParser.py b/scripts/automation/regression/CShowParser.py
new file mode 100755
index 00000000..b3120eb1
--- /dev/null
+++ b/scripts/automation/regression/CShowParser.py
@@ -0,0 +1,228 @@
+#!/router/bin/python-2.7.4
+
+import re
+import misc_methods
+
+class PlatformResponseMissmatch(Exception):
+ def __init__(self, message):
+ # Call the base class constructor with the parameters it needs
+ super(PlatformResponseMissmatch, self).__init__(message + ' is not available for given platform state and data.\nPlease make sure the relevant features are turned on in the platform.')
+
+class PlatformResponseAmbiguity(Exception):
+ def __init__(self, message):
+ # Call the base class constructor with the parameters it needs
+ super(PlatformResponseAmbiguity, self).__init__(message + ' found more than one file matching the provided filename.\nPlease provide a more distinct filename.')
+
+
+class CShowParser(object):
+
+ @staticmethod
+ def parse_drop_stats (query_response, interfaces_list):
+ res = {'total_drops' : 0}
+ response_lst = query_response.split('\r\n')
+ mtch_found = 0
+
+ for line in response_lst:
+ mtch = re.match("^\s*(\w+/\d/\d)\s+(\d+)\s+(\d+)", line)
+ if mtch:
+ mtch_found += 1
+ if (mtch.group(1) in interfaces_list):
+ res[mtch.group(1)] = (int(mtch.group(2)) + int(mtch.group(3)))
+ res['total_drops'] += (int(mtch.group(2)) + int(mtch.group(3)))
+# if mtch_found == 0: # no matches found at all
+# raise PlatformResponseMissmatch('Drop stats')
+# else:
+# return res
+ return res
+
+ @staticmethod
+ def parse_nbar_stats (query_response):
+ response_lst = query_response.split('\r\n')
+ stats = {}
+ final_stats = {}
+ mtch_found = 0
+
+ for line in response_lst:
+ mtch = re.match("\s*([\w-]+)\s*(\d+)\s*(\d+)\s+", line)
+ if mtch:
+ mtch_found += 1
+ key = mtch.group(1)
+ pkt_in = int(mtch.group(2))
+ pkt_out = int(mtch.group(3))
+
+ avg_pkt_cnt = ( pkt_in + pkt_out )/2
+ if avg_pkt_cnt == 0.0:
+ # skip to avoid division by zero
+ continue
+ if stats.has_key(key) :
+ stats[key] += avg_pkt_cnt
+ else:
+ stats[key] = avg_pkt_cnt
+
+ # Normalize the results to percents
+ for protocol in stats:
+ protocol_norm_stat = int(stats[protocol]*10000/stats['Total'])/100.0 # round the result to x.xx format
+ if (protocol_norm_stat != 0.0):
+ final_stats[protocol] = protocol_norm_stat
+
+ if mtch_found == 0: # no matches found at all
+ raise PlatformResponseMissmatch('NBAR classification stats')
+ else:
+ return { 'percentage' : final_stats, 'packets' : stats }
+
+ @staticmethod
+ def parse_nat_stats (query_response):
+ response_lst = query_response.split('\r\n')
+ res = {}
+ mtch_found = 0
+
+ for line in response_lst:
+ mtch = re.match("Total (active translations):\s+(\d+).*(\d+)\s+static,\s+(\d+)\s+dynamic", line)
+ if mtch:
+ mtch_found += 1
+ res['total_active_trans'] = int(mtch.group(2))
+ res['static_active_trans'] = int(mtch.group(3))
+ res['dynamic_active_trans'] = int(mtch.group(4))
+ continue
+
+ mtch = re.match("(Hits):\s+(\d+)\s+(Misses):\s+(\d+)", line)
+ if mtch:
+ mtch_found += 1
+ res['num_of_hits'] = int(mtch.group(2))
+ res['num_of_misses'] = int(mtch.group(4))
+
+ if mtch_found == 0: # no matches found at all
+ raise PlatformResponseMissmatch('NAT translations stats')
+ else:
+ return res
+
+ @staticmethod
+ def parse_cpu_util_stats (query_response):
+ response_lst = query_response.split('\r\n')
+ res = { 'cpu0' : 0,
+ 'cpu1' : 0 }
+ mtch_found = 0
+ for line in response_lst:
+ mtch = re.match("\W*Processing: Load\D*(\d+)\D*(\d+)\D*(\d+)\D*(\d+)\D*", line)
+ if mtch:
+ mtch_found += 1
+ res['cpu0'] += float(mtch.group(1))
+ res['cpu1'] += float(mtch.group(2))
+
+ if mtch_found == 0: # no matches found at all
+ raise PlatformResponseMissmatch('CPU utilization processing')
+ else:
+ res['cpu0'] = res['cpu0']/mtch_found
+ res['cpu1'] = res['cpu1']/mtch_found
+ return res
+
+ @staticmethod
+ def parse_cft_stats (query_response):
+ response_lst = query_response.split('\r\n')
+ res = {}
+ mtch_found = 0
+ for line in response_lst:
+ mtch = re.match("\W*(\w+)\W*([:]|[=])\W*(\d+)", line)
+ if mtch:
+ mtch_found += 1
+ res[str(misc_methods.mix_string(mtch.group(1)))] = float(mtch.group(3))
+ if mtch_found == 0: # no matches found at all
+ raise PlatformResponseMissmatch('CFT counters stats')
+ else:
+ return res
+
+
+ @staticmethod
+ def parse_cvla_memory_usage(query_response):
+ response_lst = query_response.split('\r\n')
+ res = {}
+ res2 = {}
+ cnt = 0
+ state = 0
+ name = ''
+ number = 0.0
+
+ for line in response_lst:
+ if state == 0:
+ mtch = re.match("\W*Entity name:\W*(\w[^\r\n]+)", line)
+ if mtch:
+ name = misc_methods.mix_string(mtch.group(1))
+ state = 1
+ cnt += 1
+ elif state == 1:
+ mtch = re.match("\W*Handle:\W*(\d+)", line)
+ if mtch:
+ state = state + 1
+ else:
+ state = 0
+ elif state == 2:
+ mtch = re.match("\W*Number of allocations:\W*(\d+)", line)
+ if mtch:
+ state = state + 1
+ number=float(mtch.group(1))
+ else:
+ state = 0
+ elif state == 3:
+ mtch = re.match("\W*Memory allocated:\W*(\d+)", line)
+ if mtch:
+ state = 0
+ res[name] = float(mtch.group(1))
+ res2[name] = number
+ else:
+ state = 0
+ if cnt == 0:
+ raise PlatformResponseMissmatch('CVLA memory usage stats')
+
+ return (res,res2)
+
+
+ @staticmethod
+ def parse_show_image_version(query_response):
+ response_lst = query_response.split('\r\n')
+ res = {}
+
+ for line in response_lst:
+ mtch = re.match("System image file is \"(\w+):(.*/)?(.+)\"", line)
+ if mtch:
+ res['drive'] = mtch.group(1)
+ res['image'] = mtch.group(3)
+ return res
+
+ raise PlatformResponseMissmatch('Running image info')
+
+
+ @staticmethod
+ def parse_image_existence(query_response, img_name):
+ response_lst = query_response.split('\r\n')
+ cnt = 0
+
+ for line in response_lst:
+ regex = re.compile(".* (?!include) %s" % img_name )
+ mtch = regex.match(line)
+ if mtch:
+ cnt += 1
+ if cnt == 1:
+ return True
+ elif cnt > 1:
+ raise PlatformResponseAmbiguity('Image existence')
+ else:
+ return False
+
+ @staticmethod
+ def parse_file_copy (query_response):
+ rev_response_lst = reversed(query_response.split('\r\n'))
+ lines_parsed = 0
+
+ for line in rev_response_lst:
+ mtch = re.match("\[OK - (\d+) bytes\]", line)
+ if mtch:
+ return True
+ lines_parsed += 1
+
+ if lines_parsed > 5:
+ return False
+ return False
+
+
+if __name__ == "__main__":
+ pass
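
A hedged sketch of driving two of the parsers above with canned CLI output; the sample lines approximate IOS "show" command output and may vary between releases:

from CShowParser import CShowParser   # assumes the regression dir is on sys.path

# 'show version | include System image' style response
version_response = 'System image file is "harddisk:/asr1001-universalk9.bin"\r\n'
info = CShowParser.parse_show_image_version(version_response)
assert info == {'drive': 'harddisk', 'image': 'asr1001-universalk9.bin'}

# 'show ip nat statistics' style response
nat_response = ('Total active translations: 3 (0 static, 3 dynamic; 3 extended)\r\n'
                'Hits: 169  Misses: 3\r\n')
stats = CShowParser.parse_nat_stats(nat_response)
assert stats['total_active_trans'] == 3
assert stats['num_of_hits'] == 169 and stats['num_of_misses'] == 3
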
diff --git a/scripts/automation/regression/CustomLogger.py b/scripts/automation/regression/CustomLogger.py
new file mode 100755
index 00000000..14ef1362
--- /dev/null
+++ b/scripts/automation/regression/CustomLogger.py
@@ -0,0 +1,36 @@
+
+import sys
+import os
+import logging
+
+
+# def setup_custom_logger(name, log_path = None):
+# logging.basicConfig(level = logging.INFO,
+# format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
+# datefmt = '%m-%d %H:%M')
+
+
+def setup_custom_logger(name, log_path = None):
+ # first make sure the path is available
+ if log_path is None:
+ log_path = os.getcwd()+'/trex_log.log'
+ else:
+ directory = os.path.dirname(log_path)
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+ logging.basicConfig(level = logging.DEBUG,
+ format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
+ datefmt = '%m-%d %H:%M',
+ filename= log_path,
+ filemode= 'w')
+
+ # define a handler which writes ERROR messages or higher to sys.stderr
+ consoleLogger = logging.StreamHandler()
+ consoleLogger.setLevel(logging.ERROR)
+ # set a format which is simpler for console use
+ formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
+ # tell the handler to use this format
+ consoleLogger.setFormatter(formatter)
+
+ # add the handler to the logger
+ logging.getLogger(name).addHandler(consoleLogger) \ No newline at end of file
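
A hedged usage sketch for setup_custom_logger above: basicConfig routes DEBUG and above to the log file, while the extra console handler echoes only ERROR and above to stderr. The log path below is illustrative:

import logging
import CustomLogger   # assumes the regression dir is on sys.path

CustomLogger.setup_custom_logger('TRexTest', log_path = '/tmp/trex_logs/trex_log.log')
logger = logging.getLogger('TRexTest')
logger.info('goes to the file only (below the console ERROR threshold)')
logger.error('goes to the file and is echoed to stderr')
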
diff --git a/scripts/automation/regression/aggregate_results.py b/scripts/automation/regression/aggregate_results.py
new file mode 100755
index 00000000..a3a90fbf
--- /dev/null
+++ b/scripts/automation/regression/aggregate_results.py
@@ -0,0 +1,492 @@
+# -*- coding: utf-8 -*-
+import xml.etree.ElementTree as ET
+import argparse
+import glob
+from pprint import pprint
+import sys, os
+from collections import OrderedDict
+import copy
+import datetime, time
+import cPickle as pickle
+import subprocess, shlex
+
+FUNCTIONAL_CATEGORY = 'Functional' # how to display those categories
+ERROR_CATEGORY = 'Error'
+
+
+def pad_tag(text, tag):
+ return '<%s>%s</%s>' % (tag, text, tag)
+
+def is_functional_test_name(testname):
+ if testname.startswith('platform_') or testname.startswith('misc_methods_'):
+ return True
+ return False
+
+def is_good_status(text):
+ return text in ('Successful', 'Fixed', 'Passed', 'True', 'Pass')
+
+# input: xml element with test result
+# output string: 'error', 'failure', 'skipped', 'passed'
+def get_test_result(test):
+ for child in test.getchildren():
+ if child.tag in ('error', 'failure', 'skipped'):
+ return child.tag
+ return 'passed'
+
+# returns row of table with <th> and <td> columns - key: value
+def add_th_td(key, value):
+ return '<tr><th>%s</th><td>%s</td></tr>\n' % (key, value)
+
+# returns row of table with <td> and <td> columns - key: value
+def add_td_td(key, value):
+ return '<tr><td>%s</td><td>%s</td></tr>\n' % (key, value)
+
+# returns row of table with <th> and <th> columns - key: value
+def add_th_th(key, value):
+ return '<tr><th>%s</th><th>%s</th></tr>\n' % (key, value)
+
+# returns <div> with table of tests under given category.
+# category - string with name of category
+# hidden - bool, true = <div> is hidden by CSS
+# tests - list of tests, derived from aggregated xml report, changed a little to expose stdout etc. more easily
+# category_info_dir - folder to search for category info file
+# expanded - bool, false = outputs (stdout etc.) of tests are hidden by CSS
+# brief - bool, true = cut some part of tests outputs (useful for errors section with expanded flag)
+def add_category_of_tests(category, tests, hidden = False, category_info_dir = None, expanded = False, brief = False):
+ is_actual_category = category not in (FUNCTIONAL_CATEGORY, ERROR_CATEGORY)
+ html_output = '<div style="display:%s;" id="cat_tglr_%s">\n' % ('none' if hidden else 'block', category)
+
+ if is_actual_category:
+ html_output += '<br><table class="reference">\n'
+
+ if category_info_dir:
+ category_info_file = '%s/report_%s.info' % (category_info_dir, category)
+ if os.path.exists(category_info_file):
+ with open(category_info_file) as f:
+ for info_line in f.readlines():
+ key_value = info_line.split(':', 1)
+ if key_value[0].startswith('User'): # always 'hhaim', no need to show
+ continue
+ html_output += add_th_td('%s:' % key_value[0], key_value[1])
+ else:
+ html_output += add_th_td('Info:', 'No info')
+ print 'add_category_of_tests: no category info %s' % category_info_file
+ if len(tests):
+ total_duration = 0.0
+ for test in tests:
+ total_duration += float(test.attrib['time'])
+ html_output += add_th_td('Tests duration:', datetime.timedelta(seconds = int(total_duration)))
+ html_output += '</table>\n'
+
+ if not len(tests):
+ return html_output + pad_tag('<br><font color=red>No tests!</font>', 'b') + '</div>'
+ html_output += '<br>\n<table class="reference">\n<tr><th align="left">'
+
+ if category == ERROR_CATEGORY:
+ html_output += 'Setup</th><th align="left">Failed tests:'
+ else:
+ html_output += '%s tests:' % category
+ html_output += '</th><th align="center">Final Result</th>\n<th align="center">Time (s)</th>\n</tr>\n'
+ for test in tests:
+ functional_test = is_functional_test_name(test.attrib['name'])
+ if functional_test and is_actual_category:
+ continue
+ if category == ERROR_CATEGORY:
+ test_id = ('err_' + test.attrib['classname'] + test.attrib['name']).replace('.', '_')
+ else:
+ test_id = (category + test.attrib['name']).replace('.', '_')
+ if expanded:
+ html_output += '<tr>\n<th>'
+ else:
+ html_output += '<tr onclick=tgl_test("%s") class=linktr>\n<td class=linktext>' % test_id
+ if category == ERROR_CATEGORY:
+ html_output += FUNCTIONAL_CATEGORY if functional_test else test.attrib['classname']
+ if expanded:
+ html_output += '</th><td>'
+ else:
+ html_output += '</td><td class=linktext>'
+ html_output += '%s</td>\n<td align="center">' % test.attrib['name']
+ test_result = get_test_result(test)
+ if test_result == 'error':
+ html_output += '<font color="red"><b>ERROR</b></font></td>'
+ elif test_result == 'failure':
+ html_output += '<font color="red"><b>FAILED</b></font></td>'
+ elif test_result == 'skipped':
+ html_output += '<font color="blue"><b>SKIPPED</b></font></td>'
+ else:
+ html_output += '<font color="green"><b>PASSED</b></font></td>'
+ html_output += '<td align="center"> '+ test.attrib['time'] + '</td></tr>'
+
+ result, result_text = test.attrib.get('result', ('', ''))
+ if result_text:
+ result_text = '<b style="color:000080;">%s:</b><br>%s<br><br>' % (result.capitalize(), result_text.replace('\n', '<br>'))
+ stderr = '' if brief and result_text else test.get('stderr', '')
+ if stderr:
+ stderr = '<b style="color:000080;"><text color=000080>Stderr</text>:</b><br>%s<br><br>\n' % stderr.replace('\n', '<br>')
+ stdout = '' if brief and result_text else test.get('stdout', '')
+ if stdout:
+ if brief: # cut off server logs
+ stdout = stdout.split('>>>>>>>>>>>>>>>', 1)[0]
+ stdout = '<b style="color:000080;">Stdout:</b><br>%s<br><br>\n' % stdout.replace('\n', '<br>')
+
+ html_output += '<tr style="%scolor:603000;" id="%s"><td colspan=%s>' % ('' if expanded else 'display:none;', test_id, 4 if category == ERROR_CATEGORY else 3)
+ if result_text or stderr or stdout:
+ html_output += '%s%s%s</td></tr>' % (result_text, stderr, stdout)
+ else:
+ html_output += '<b style="color:000080;">No output</b></td></tr>'
+
+ html_output += '\n</table>\n</div>'
+ return html_output
+
+# main
+if __name__ == '__main__':
+
+ # deal with input args
+ argparser = argparse.ArgumentParser(description='Aggregate test results from the ./reports dir; produces xml, html and mail reports.')
+ argparser.add_argument('--input_dir', default='./reports',
+ help='Directory with xmls/setups info. Filenames: report_<setup name>.xml/report_<setup name>.info')
+ argparser.add_argument('--output_xml', default='./reports/aggregated_tests.xml',
+ dest = 'output_xmlfile', help='Name of output xml file with aggregated results.')
+ argparser.add_argument('--output_html', default='./reports/aggregated_tests.html',
+ dest = 'output_htmlfile', help='Name of output html file with aggregated results.')
+ argparser.add_argument('--output_mail', default='./reports/aggregated_tests_mail.html',
+ dest = 'output_mailfile', help='Name of output html file with aggregated results for mail.')
+ argparser.add_argument('--output_title', default='./reports/aggregated_tests_title.txt',
+ dest = 'output_titlefile', help='Name of output file to contain title of mail.')
+ argparser.add_argument('--build_status_file', default='./reports/build_status',
+ dest = 'build_status_file', help='Name of output file to save scenario build results (should not be wiped).')
+ args = argparser.parse_args()
+
+
+##### get input variables/TRex commit info
+
+ scenario = os.environ.get('SCENARIO')
+ build_url = os.environ.get('BUILD_URL')
+ build_id = os.environ.get('BUILD_ID')
+ trex_last_commit_hash = os.environ.get('TREX_LAST_COMMIT_HASH') # TODO: remove it, take from setups info
+ trex_repo = os.environ.get('TREX_CORE_REPO')
+ if not scenario:
+ print 'Warning: no environment variable SCENARIO, using default'
+ scenario = 'TRex regression'
+ if not build_url:
+ print 'Warning: no environment variable BUILD_URL'
+ if not build_id:
+ print 'Warning: no environment variable BUILD_ID'
+ trex_last_commit_info = ''
+ if scenario == 'trex_build' and trex_last_commit_hash and trex_repo:
+ try:
+ print 'Getting TRex commit with hash %s' % trex_last_commit_hash
+ command = 'timeout 10 git --git-dir %s show %s --quiet' % (trex_repo, trex_last_commit_hash)
+ print 'Executing: %s' % command
+ proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (trex_last_commit_info, stderr) = proc.communicate()
+ print 'Stdout:\n\t' + trex_last_commit_info.replace('\n', '\n\t')
+ print 'Stderr:', stderr
+ print 'Return code:', proc.returncode
+ trex_last_commit_info = trex_last_commit_info.replace('\n', '<br>')
+ except Exception as e:
+ print 'Error getting last commit: %s' % e
+
+##### get xmls: report_<setup name>.xml
+
+ err = []
+ jobs_list = []
+ jobs_file = '%s/jobs_list.info' % args.input_dir
+ if os.path.exists(jobs_file):
+ with open(jobs_file) as f:
+ for line in f.readlines():
+ line = line.strip()
+ if line:
+ jobs_list.append(line)
+ else:
+ message = '%s does not exist!' % jobs_file
+ print message
+ err.append(message)
+
+##### aggregate results to 1 single tree
+ aggregated_root = ET.Element('testsuite')
+ setups = {}
+ for job in jobs_list:
+ xml_file = '%s/report_%s.xml' % (args.input_dir, job)
+ if not os.path.exists(xml_file):
+ message = '%s referenced in jobs_list.info does not exist!' % xml_file
+ print message
+ err.append(message)
+ continue
+ if os.path.basename(xml_file) == os.path.basename(args.output_xmlfile):
+ continue
+ setups[job] = []
+ print('Processing setup: %s' % job)
+ tree = ET.parse(xml_file)
+ root = tree.getroot()
+ for key, value in root.attrib.items():
+ if key in aggregated_root.attrib and value.isdigit(): # sum total number of failed tests etc.
+ aggregated_root.attrib[key] = str(int(value) + int(aggregated_root.attrib[key]))
+ else:
+ aggregated_root.attrib[key] = value
+ tests = root.getchildren()
+ if not len(tests): # there should be tests:
+ message = 'No tests in xml %s' % xml_file
+ print message
+ err.append(message)
+ for test in tests:
+ setups[job].append(test)
+ test.attrib['name'] = test.attrib['classname'] + '.' + test.attrib['name']
+ test.attrib['classname'] = job
+ aggregated_root.append(test)
+
+##### save output xml
+
+ print('Writing output file: %s' % args.output_xmlfile)
+ ET.ElementTree(aggregated_root).write(args.output_xmlfile)
+
+
+##### build output html
+ error_tests = []
+ functional_tests = OrderedDict()
+ # categorize and get output of each test
+ for test in aggregated_root.getchildren(): # each test in xml
+ if is_functional_test_name(test.attrib['name']):
+ functional_tests[test.attrib['name']] = test
+ result_tuple = None
+ for child in test.getchildren(): # <system-out>, <system-err> (<failure>, <error>, <skipped> other: passed)
+ if child.tag == 'failure':
+ error_tests.append(test)
+ result_tuple = ('failure', child.text)
+ elif child.tag == 'error':
+ error_tests.append(test)
+ result_tuple = ('error', child.text)
+ elif child.tag == 'skipped':
+ result_tuple = ('skipped', child.text)
+ elif child.tag == 'system-out':
+ test.attrib['stdout'] = child.text
+ elif child.tag == 'system-err':
+ test.attrib['stderr'] = child.text
+ if result_tuple:
+ test.attrib['result'] = result_tuple
+
+ html_output = '''\
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+<style type="text/css">
+'''
+ with open('style.css') as f:
+ style_css = f.read()
+ html_output += style_css
+ html_output +='''
+</style>
+</head>
+
+<body>
+<table class="reference">
+'''
+ html_output += add_th_td('Scenario:', scenario.capitalize())
+ start_time_file = '%s/start_time.info' % args.input_dir
+ if os.path.exists(start_time_file):
+ with open(start_time_file) as f:
+ start_time = int(f.read())
+ total_time = int(time.time()) - start_time
+ html_output += add_th_td('Started:', datetime.datetime.fromtimestamp(start_time).strftime('%d/%m/%Y %H:%M:%S'))
+ html_output += add_th_td('Total duration:', datetime.timedelta(seconds = total_time))
+ if trex_last_commit_info:
+ html_output += add_th_td('Last commit:', trex_last_commit_info)
+ html_output += '</table><br>\n'
+ if err:
+ html_output += '<font color=red>%s</font><br><br>\n' % '\n<br>'.join(err)
+
+
+ category_arr = [FUNCTIONAL_CATEGORY, ERROR_CATEGORY]
+
+# Adding buttons
+ # Error button
+ if len(error_tests):
+ html_output += '\n<button onclick=tgl_cat("cat_tglr_{error}")>{error}</button>'.format(error = ERROR_CATEGORY)
+ # Setups buttons
+ for category, tests in setups.items():
+ category_arr.append(category)
+ html_output += '\n<button onclick=tgl_cat("cat_tglr_%s")>%s</button>' % (category_arr[-1], category)
+ # Functional buttons
+ if len(functional_tests):
+ html_output += '\n<button onclick=tgl_cat("cat_tglr_%s")>%s</button>' % (FUNCTIONAL_CATEGORY, FUNCTIONAL_CATEGORY)
+
+# Adding tests
+ # Error tests
+ if len(error_tests):
+ html_output += add_category_of_tests(ERROR_CATEGORY, error_tests, hidden=False)
+ # Setups tests
+ for category, tests in setups.items():
+ html_output += add_category_of_tests(category, tests, hidden=True, category_info_dir=args.input_dir)
+ # Functional tests
+ if len(functional_tests):
+ html_output += add_category_of_tests(FUNCTIONAL_CATEGORY, functional_tests.values(), hidden=True)
+
+ html_output += '\n\n<script type="text/javascript">\n var category_arr = %s\n' % ['cat_tglr_%s' % x for x in category_arr]
+ html_output += '''
+ function tgl_cat(id)
+ {
+ for(var i=0; i<category_arr.length; i++)
+ {
+ var e = document.getElementById(category_arr[i]);
+ if (id == category_arr[i])
+ {
+ if(e.style.display == 'block')
+ e.style.display = 'none';
+ else
+ e.style.display = 'block';
+ }
+ else
+ {
+ if (e) e.style.display = 'none';
+ }
+ }
+ }
+ function tgl_test(id)
+ {
+ var e = document.getElementById(id);
+ if(e.style.display == 'table-row')
+ e.style.display = 'none';
+ else
+ e.style.display = 'table-row';
+ }
+</script>
+</body>
+</html>\
+'''
+
+# mail report (only error tests, expanded)
+
+ mail_output = '''\
+<html>
+<head>
+<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+<style type="text/css">
+'''
+ mail_output += style_css
+ mail_output +='''
+</style>
+</head>
+
+<body>
+<table class="reference">
+'''
+ mail_output += add_th_td('Scenario:', scenario.capitalize())
+ if build_url:
+ mail_output += add_th_td('Full HTML report:', '<a class="example" href="%s/HTML_Report">link</a>' % build_url)
+ start_time_file = '%s/start_time.info' % args.input_dir
+ if os.path.exists(start_time_file):
+ with open(start_time_file) as f:
+ start_time = int(f.read())
+ total_time = int(time.time()) - start_time
+ mail_output += add_th_td('Started:', datetime.datetime.fromtimestamp(start_time).strftime('%d/%m/%Y %H:%M:%S'))
+ mail_output += add_th_td('Total duration:', datetime.timedelta(seconds = total_time))
+ if trex_last_commit_info:
+ mail_output += add_th_td('Last commit:', trex_last_commit_info)
+ mail_output += '</table><br>\n<table width=100%><tr><td>\n'
+
+ for category in setups.keys():
+ failing_category = False
+ for test in error_tests:
+ if test.attrib['classname'] == category:
+ failing_category = True
+ if failing_category or not len(setups[category]):
+ mail_output += '<table class="reference_fail" align=left style="Margin-bottom:10;Margin-right:10;">\n'
+ else:
+ mail_output += '<table class="reference" align=left style="Margin-bottom:10;Margin-right:10;">\n'
+ mail_output += add_th_th('Setup:', pad_tag(category.replace('.', '/'), 'b'))
+ category_info_file = '%s/report_%s.info' % (args.input_dir, category.replace('.', '_'))
+ if os.path.exists(category_info_file):
+ with open(category_info_file) as f:
+ for info_line in f.readlines():
+ key_value = info_line.split(':', 1)
+ if key_value[0].startswith('User'): # always 'hhaim', no need to show
+ continue
+ mail_output += add_th_td('%s:' % key_value[0], key_value[1])
+ else:
+ mail_output += add_th_td('Info:', 'No info')
+ mail_output += '</table>\n'
+ mail_output += '</td></tr></table>\n'
+
+ # Error tests
+ if len(error_tests) or err:
+ if err:
+ mail_output += '<font color=red>%s</font>' % '\n<br>'.join(err)
+ if len(error_tests) > 5:
+ mail_output += '\n<br><font color=red>More than 5 failed tests, showing brief output.</font>\n<br>'
+ # show only brief version (cut some info)
+ mail_output += add_category_of_tests(ERROR_CATEGORY, error_tests, hidden=False, expanded=True, brief=True)
+ else:
+ mail_output += add_category_of_tests(ERROR_CATEGORY, error_tests, hidden=False, expanded=True)
+ else:
+ mail_output += '<table><tr style="font-size:120;color:green;font-family:arial"><td>☺</td><td style="font-size:20">All passed.</td></tr></table>\n'
+ mail_output += '\n</body>\n</html>'
+
+##### save outputs
+
+# html
+ with open(args.output_htmlfile, 'w') as f:
+ print('Writing output file: %s' % args.output_htmlfile)
+ f.write(html_output)
+
+# mail content
+ with open(args.output_mailfile, 'w') as f:
+ print('Writing output file: %s' % args.output_mailfile)
+ f.write(mail_output)
+
+# build status
+ category_dict_status = {}
+ if os.path.exists(args.build_status_file):
+ with open(args.build_status_file) as f:
+ print('Reading: %s' % args.build_status_file)
+ category_dict_status = pickle.load(f)
+ if type(category_dict_status) is not dict:
+ print '%s is corrupt, truncating' % args.build_status_file
+ category_dict_status = {}
+
+ last_status = category_dict_status.get(scenario, 'Successful') # assume last is passed if no history
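+ # status transition table implemented below (previous state x current run):
+ # good + fails -> 'Failure'
+ # bad + fails -> 'Still Failing'
+ # good + clean -> 'Successful'
+ # bad + clean -> 'Fixed'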
+ if err or len(error_tests): # has fails
+ if is_good_status(last_status):
+ current_status = 'Failure'
+ else:
+ current_status = 'Still Failing'
+ else:
+ if is_good_status(last_status):
+ current_status = 'Successful'
+ else:
+ current_status = 'Fixed'
+ category_dict_status[scenario] = current_status
+
+ with open(args.build_status_file, 'w') as f:
+ print('Writing output file: %s' % args.build_status_file)
+ pickle.dump(category_dict_status, f)
+
+# mail title
+ mailtitle_output = scenario.capitalize()
+ if build_id:
+ mailtitle_output += ' - Build #%s' % build_id
+ mailtitle_output += ' - %s!' % current_status
+
+ with open(args.output_titlefile, 'w') as f:
+ print('Writing output file: %s' % args.output_titlefile)
+ f.write(mailtitle_output)
diff --git a/scripts/automation/regression/functional_unit_tests.py b/scripts/automation/regression/functional_unit_tests.py
new file mode 100755
index 00000000..30e915c4
--- /dev/null
+++ b/scripts/automation/regression/functional_unit_tests.py
@@ -0,0 +1,78 @@
+#!/router/bin/python
+
+__copyright__ = "Copyright 2014"
+
+
+
+import os
+import sys
+import outer_packages
+import nose
+from rednose import RedNose
+import termstyle
+
+
+
+
+def set_report_dir (report_dir):
+ if not os.path.exists(report_dir):
+ os.mkdir(report_dir)
+
+if __name__ == "__main__":
+
+ # setting defaults. By default we run all the test suite
+ specific_tests = False
+ disableLogCapture = False
+ long_test = False
+ report_dir = "reports"
+
+ nose_argv = sys.argv + ['-s', '-v', '--exe', '--rednose', '--detailed-errors']
+
+# for arg in sys.argv:
+# if 'unit_tests/' in arg:
+# specific_tests = True
+# if 'log-path' in arg:
+# disableLogCapture = True
+# if arg=='--collect-only': # this is a user trying simply to view the available tests. removing xunit param from nose args
+# nose_argv[5:7] = []
+
+
+
+ try:
+ result = nose.run(argv = nose_argv, addplugins = [RedNose()])
+
+ if result:
+ print termstyle.green("""
+ ..::''''::..
+ .;'' ``;.
+ :: :: :: ::
+ :: :: :: ::
+ :: :: :: ::
+ :: .:' :: :: `:. ::
+ :: : : ::
+ :: `:. .:' ::
+ `;..``::::''..;'
+ ``::,,,,::''
+
+ ___ ___ __________
+ / _ \/ _ | / __/ __/ /
+ / ___/ __ |_\ \_\ \/_/
+ /_/ /_/ |_/___/___(_)
+
+ """)
+ sys.exit(0)
+ else:
+ sys.exit(-1)
+
+ finally:
+ pass
+
diff --git a/scripts/automation/regression/interactive_platform b/scripts/automation/regression/interactive_platform
new file mode 100755
index 00000000..5c5e920e
--- /dev/null
+++ b/scripts/automation/regression/interactive_platform
@@ -0,0 +1,4 @@
+#!/bin/bash
+/router/bin/python-2.7.4 interactive_platform.py $@
+sts=$?
+exit $sts
\ No newline at end of file
diff --git a/scripts/automation/regression/interactive_platform.py b/scripts/automation/regression/interactive_platform.py
new file mode 100755
index 00000000..bfedd37d
--- /dev/null
+++ b/scripts/automation/regression/interactive_platform.py
@@ -0,0 +1,338 @@
+#!/router/bin/python-2.7.4
+
+from CPlatform import *
+import cmd
+import outer_packages
+import termstyle
+import os
+import misc_methods
+from misc_methods import load_object_config_file
+from optparse import OptionParser
+from CShowParser import PlatformResponseMissmatch, PlatformResponseAmbiguity
+
+class InteractivePlatform(cmd.Cmd):
+
+ intro = termstyle.green("\nInteractive shell to control a remote Cisco IOS platform.\nType help to view available pre-defined configurations\n(c) All rights reserved.\n")
+ prompt = '> '
+
+ def __init__(self, cfg_yaml_path = None, silent_mode = False, virtual_mode = False ):
+# super(InteractivePlatform, self).__init__()
+ cmd.Cmd.__init__(self)
+ self.virtual_mode = virtual_mode
+ self.platform = CPlatform(silent_mode)
+ if cfg_yaml_path is None:
+ try:
+ cfg_yaml_path = raw_input(termstyle.cyan("Please enter a readable .yaml configuration file path: "))
+ cfg_yaml_path = os.path.abspath(cfg_yaml_path)
+ except KeyboardInterrupt:
+ exit(-1)
+ try:
+ self.device_cfg = CDeviceCfg(cfg_yaml_path)
+ self.platform.load_platform_data_from_file(self.device_cfg)
+ if not virtual_mode:
+ # if not in virtual mode, try to establish a physical connection to the platform
+ self.platform.launch_connection(self.device_cfg)
+
+ except Exception as inst:
+ print termstyle.magenta(inst)
+ exit(-1)
+
+ def do_show_cfg (self, line):
+ """Outputs the loaded interface configuration"""
+ self.platform.get_if_manager().dump_if_config()
+ print termstyle.green("*** End of interface configuration ***")
+
+ def do_show_nat_cfg (self, line):
+ """Outputs the loaded nat provided configuration"""
+ try:
+ self.platform.dump_obj_config('nat')
+ print termstyle.green("*** End of nat configuration ***")
+ except UserWarning as inst:
+ print termstyle.magenta(inst)
+
+
+ def do_show_static_route_cfg (self, line):
+ """Outputs the loaded static route configuration"""
+ try:
+ self.platform.dump_obj_config('static_route')
+ print termstyle.green("*** End of static route configuration ***")
+ except UserWarning as inst:
+ print termstyle.magenta(inst)
+
+ def do_switch_cfg (self, cfg_file_path):
+ """Switch the current platform interface configuration with another one"""
+ if cfg_file_path:
+ cfg_yaml_path = os.path.abspath(cfg_file_path)
+ self.device_cfg = CDeviceCfg(cfg_yaml_path)
+ self.platform.load_platform_data_from_file(self.device_cfg)
+ if not self.virtual_mode:
+ self.platform.reload_connection(self.device_cfg)
+ print termstyle.green("Configuration switching completed successfully.")
+ else:
+ print termstyle.magenta("Configuration file is missing. Please try again.")
+
+ def do_load_clean (self, arg):
+ """Loads a clean configuration file onto the platform
+ Specifying no arguments will load the 'clean_config.cfg' file from the bootflash disk
+ First argument is the clean config filename
+ Second argument is the platform disk holding the file
+ if arg:
+ in_val = arg.split(' ')
+ if len(in_val)==2:
+ self.platform.load_clean_config(in_val[0], in_val[1])
+ else:
+ print termstyle.magenta("One of the config inputs is missing.")
+ else:
+ self.platform.load_clean_config()
+# print termstyle.magenta("Configuration file definition is missing. use 'help load_clean' for further info.")
+
+ def do_basic_if_config(self, line):
+ """Apply basic interfaces configuartion to all platform interfaces"""
+ self.platform.configure_basic_interfaces()
+ print termstyle.green("Basic interfaces configuration applied successfully.")
+
+ def do_pbr(self, line):
+ """Apply IPv4 PBR configuration on all interfaces"""
+ self.platform.config_pbr()
+ print termstyle.green("IPv4 PBR configuration applied successfully.")
+
+ def do_no_pbr(self, line):
+ """Removes IPv4 PBR configuration from all interfaces"""
+ self.platform.config_no_pbr()
+ print termstyle.green("IPv4 PBR configuration removed successfully.")
+
+ def do_nbar(self, line):
+ """Apply NBAR PD configuration on all interfaces"""
+ self.platform.config_nbar_pd()
+ print termstyle.green("NBAR configuration applied successfully.")
+
+ def do_no_nbar(self, line):
+ """Removes NBAR PD configuration from all interfaces"""
+ self.platform.config_no_nbar_pd()
+ print termstyle.green("NBAR configuration removed successfully.")
+
+ def do_static_route(self, arg):
+ """Apply IPv4 static routing configuration on all interfaces
+ Specifying no arguments will apply static routing with the following config:
+ 1. clients_start - 16.0.0.1
+ 2. servers_start - 48.0.0.1
+ 3. dual_port_mask - 1.0.0.0
+ 4. client_destination_mask - 255.0.0.0
+ 5. server_destination_mask - 255.0.0.0
+ """
+ if arg:
+ stat_route_dict = load_object_config_file(arg)
+ else:
+ stat_route_dict = { 'clients_start' : '16.0.0.1',
+ 'servers_start' : '48.0.0.1',
+ 'dual_port_mask': '1.0.0.0',
+ 'client_destination_mask' : '255.0.0.0',
+ 'server_destination_mask' : '255.0.0.0' }
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.platform.config_static_routing(stat_route_obj)
+ print termstyle.green("IPv4 static routing configuration applied successfully.")
+# print termstyle.magenta("Specific configutaion is missing. use 'help static_route' for further info.")
+
+ def do_no_static_route(self, line):
+ """Removes IPv4 static route configuration from all non-duplicated interfaces"""
+ try:
+ self.platform.config_no_static_routing()
+ print termstyle.green("IPv4 static routing configuration removed successfully.")
+ except UserWarning as inst:
+ print termstyle.magenta(inst)
+
+ def do_nat(self, arg):
+ """Apply NAT configuration on all non-duplicated interfaces
+ Specifying no arguments will apply NAT with the following config:
+ 1. clients_net_start - 16.0.0.0
+ 2. client_acl_wildcard_mask - 0.0.0.255
+ 3. dual_port_mask - 1.0.0.0
+ 4. pool_start - 200.0.0.0
+ 5. pool_netmask - 255.255.255.0
+ """
+ if arg:
+ nat_dict = load_object_config_file(arg)
+ else:
+# print termstyle.magenta("Specific nat configutaion is missing. use 'help nat' for further info.")
+ nat_dict = { 'clients_net_start' : '16.0.0.0',
+ 'client_acl_wildcard_mask' : '0.0.0.255',
+ 'dual_port_mask' : '1.0.0.0',
+ 'pool_start' : '200.0.0.0',
+ 'pool_netmask' : '255.255.255.0' }
+ nat_obj = CNatConfig(nat_dict)
+ self.platform.config_nat(nat_obj)
+ print termstyle.green("NAT configuration applied successfully.")
+
+ def do_no_nat(self, arg):
+ """Removes NAT configuration from all non-duplicated interfaces"""
+ try:
+ self.platform.config_no_nat()
+ print termstyle.green("NAT configuration removed successfully.")
+ except UserWarning as inst:
+ print termstyle.magenta(inst)
+
+
+ def do_ipv6_pbr(self, line):
+ """Apply IPv6 PBR configuration on all interfaces"""
+ self.platform.config_ipv6_pbr()
+ print termstyle.green("IPv6 PBR configuration applied successfully.")
+
+ def do_no_ipv6_pbr(self, line):
+ """Removes IPv6 PBR configuration from all interfaces"""
+ self.platform.config_no_ipv6_pbr()
+ print termstyle.green("IPv6 PBR configuration removed successfully.")
+
+ def do_zbf(self, line):
+ """Apply Zone-Based policy Firewall configuration on all interfaces"""
+ self.platform.config_zbf()
+ print termstyle.green("Zone-Based policy Firewall configuration applied successfully.")
+
+ def do_no_zbf(self, line):
+ """Removes Zone-Based policy Firewall configuration from all interfaces"""
+ self.platform.config_no_zbf()
+ print termstyle.green("Zone-Based policy Firewall configuration removed successfully.")
+
+ def do_show_cpu_util(self, line):
+ """Fetches CPU utilization stats from the platform"""
+ try:
+ print self.platform.get_cpu_util()
+ print termstyle.green("*** End of show_cpu_util output ***")
+ except PlatformResponseMissmatch as inst:
+ print termstyle.magenta(inst)
+
+ def do_show_drop_stats(self, line):
+ """Fetches packet drop stats from the platform.\nDrop are summed and presented for both input and output traffic of each interface"""
+ print self.platform.get_drop_stats()
+ print termstyle.green("*** End of show_drop_stats output ***")
+
+ def do_show_nbar_stats(self, line):
+ """Fetches NBAR classification stats from the platform.\nStats are available both as raw data and as percentage data."""
+ try:
+ print self.platform.get_nbar_stats()
+ print termstyle.green("*** End of show_nbar_stats output ***")
+ except PlatformResponseMissmatch as inst:
+ print termstyle.magenta(inst)
+
+ def do_show_nat_stats(self, line):
+ """Fetches NAT translations stats from the platform"""
+ print self.platform.get_nat_stats()
+ print termstyle.green("*** End of show_nat_stats output ***")
+
+ def do_show_cft_stats(self, line):
+ """Fetches CFT stats from the platform"""
+ print self.platform.get_cft_stats()
+ print termstyle.green("*** End of show_sft_stats output ***")
+
+ def do_show_cvla_memory_usage(self, line):
+ """Fetches CVLA memory usage stats from the platform"""
+ (res, res2) = self.platform.get_cvla_memory_usage()
+ print res
+ print res2
+ print termstyle.green("*** End of show_cvla_memory_usage output ***")
+
+ def do_clear_counters(self, line):
+ """Clears interfaces counters"""
+ self.platform.clear_counters()
+ print termstyle.green("*** clear counters completed ***")
+
+ def do_clear_nbar_stats(self, line):
+ """Clears interfaces counters"""
+ self.platform.clear_nbar_stats()
+ print termstyle.green("*** clear nbar stats completed ***")
+
+ def do_clear_cft_counters(self, line):
+ """Clears interfaces counters"""
+ self.platform.clear_cft_counters()
+ print termstyle.green("*** clear cft counters completed ***")
+
+ def do_clear_drop_stats(self, line):
+ """Clears interfaces counters"""
+ self.platform.clear_packet_drop_stats()
+ print termstyle.green("*** clear packet drop stats completed ***")
+
+ def do_clear_nat_translations(self, line):
+ """Clears nat translations"""
+ self.platform.clear_nat_translations()
+ print termstyle.green("*** clear nat translations completed ***")
+
+ def do_set_tftp_server (self, line):
+ """Configures TFTP access on platform"""
+ self.platform.config_tftp_server(self.device_cfg)
+ print termstyle.green("*** TFTP config deployment completed ***")
+
+ def do_show_running_image (self, line):
+ """Fetches currently loaded image of the platform"""
+ res = self.platform.get_running_image_details()
+ print res
+ print termstyle.green("*** Show running image completed ***")
+
+ def do_check_image_existence(self, arg):
+ """Check if specific image file (usually *.bin) is already stored in platform drive"""
+ if arg:
+ try:
+ res = self.platform.check_image_existence(arg.split(' ')[0])
+ print res
+ print termstyle.green("*** Check image existence completed ***")
+ except PlatformResponseAmbiguity as inst:
+ print termstyle.magenta(inst)
+ else:
+ print termstyle.magenta("Please provide an image name in order to check for existance.")
+
+ def do_load_image (self, arg):
+ """Loads a given image filename from tftp server (if not available on disk) and sets it as the boot image on the platform"""
+ if arg:
+ try:
+ self.platform.load_platform_image(arg.split(' ')[0])
+ except UserWarning as inst:
+ print termstyle.magenta(inst)
+ else:
+ print termstyle.magenta("Image filename is missing.")
+
+ def do_reload (self, line):
+ """Reloads the platform"""
+
+ ans = misc_methods.query_yes_no('This will reload the platform. Are you sure?', default = None)
+ if ans:
+ # the user confirmed the platform reload
+ self.platform.reload_platform(self.device_cfg)
+ print termstyle.green("*** Platform reload completed ***")
+ else:
+ print termstyle.green("*** Platform reload aborted ***")
+
+ def do_quit(self, arg):
+ """Quits the application"""
+ return True
+
+ def do_exit(self, arg):
+ """Quits the application"""
+ return True
+
+ def do_all(self, arg):
+ """Configures bundle of commands to set PBR routing"""
+ self.do_load_clean('')
+ self.do_set_tftp_server('')
+ self.do_basic_if_config('')
+ self.do_pbr('')
+ self.do_ipv6_pbr('')
+
+
+
+if __name__ == "__main__":
+ parser = OptionParser(version="%prog 1.0 \t (C) Cisco Systems Inc.\n")
+ parser.add_option("-c", "--config-file", dest="cfg_yaml_path",
+ action="store", help="Define the interface configuration to load the applicatino with.", metavar="FILE_PATH")
+ parser.add_option("-s", "--silent", dest="silent_mode", default = False,
+ action="store_true", help="Silence the generated input when commands launched.")
+ parser.add_option("-v", "--virtual", dest="virtual_mode", default = False,
+ action="store_true", help="Interact with a virtual router, no actual link will apply. Show commands are NOT available in this mode.")
+ (options, args) = parser.parse_args()
+
+ try:
+ InteractivePlatform(**vars(options)).cmdloop()
+
+ except KeyboardInterrupt:
+ exit(-1)
+
diff --git a/scripts/automation/regression/interfaces_e.py b/scripts/automation/regression/interfaces_e.py
new file mode 100755
index 00000000..15301623
--- /dev/null
+++ b/scripts/automation/regression/interfaces_e.py
@@ -0,0 +1,8 @@
+#!/router/bin/python
+
+import outer_packages
+from enum import Enum
+
+
+# define the interface types used to classify platform interfaces
+IFType = Enum('IFType', 'Client Server All')
diff --git a/scripts/automation/regression/misc_methods.py b/scripts/automation/regression/misc_methods.py
new file mode 100755
index 00000000..f736d805
--- /dev/null
+++ b/scripts/automation/regression/misc_methods.py
@@ -0,0 +1,280 @@
+#!/router/bin/python
+
+import outer_packages
+import yaml
+import sys
+from collections import namedtuple
+import subprocess, shlex
+import os
+
+TRexConfig = namedtuple('TRexConfig', 'trex, router, tftp')
+
+# debug/development purpose, lists object's attributes and their values
+def print_r(obj):
+ for attr in dir(obj):
+ print 'obj.%s %s' % (attr, getattr(obj, attr))
+
+def mix_string (s):
+ """Convert a string to lowercase and replace spaces with the '_' char"""
+ return s.replace(' ', '_').lower()
+
+# executes given command, returns tuple (return_code, stdout, stderr)
+def run_command(cmd):
+ print 'Running command:', cmd
+ proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (stdout, stderr) = proc.communicate()
+ if stdout:
+ print 'Stdout:\n%s' % stdout
+ if stderr:
+ print 'Stderr:\n%s' % stderr
+ print 'Return code: %s' % proc.returncode
+ return (proc.returncode, stdout, stderr)
+
+
+def run_remote_command(host, passwd, command_string):
+ cmd = 'ssh -tt %s \'sudo sh -c "%s"\'' % (host, command_string)
+ print 'Trying connection with ssh...'
+ return_code, stdout, stderr = run_command(cmd)
+ if return_code == 0:
+ return (return_code, stdout, stderr)
+ else:
+ print 'Trying connection with expect + sshpass.exp...'
+ cmd = 'sshpass.exp %s %s root "%s"' % (passwd, host, command_string)
+ return_code, stdout, stderr = run_command(cmd)
+ return (return_code, stdout, stderr)
+
+
+def generate_intf_lists (interfacesList):
+ retDict = {
+ 'relevant_intf' : [],
+ 'relevant_ip_addr' : [],
+ 'relevant_mac_addr' : [],
+ 'total_pairs' : None
+ }
+
+ for intf in interfacesList:
+ retDict['relevant_intf'].append(intf['client'])
+ retDict['relevant_ip_addr'].append(intf['client_config']['ip_addr'])
+ retDict['relevant_mac_addr'].append(intf['client_config']['mac_addr'])
+ retDict['relevant_intf'].append(intf['server'])
+ retDict['relevant_ip_addr'].append(intf['server_config']['ip_addr'])
+ retDict['relevant_mac_addr'].append(intf['server_config']['mac_addr'])
+
+ retDict['total_pairs'] = len(interfacesList)
+
+ return retDict
+
+def get_single_net_client_addr (ip_addr, octetListDict = {'3' : 1}, ip_type = 'ipv4'):
+ """ get_single_net_client_addr(ip_addr, octetListDict, ip_type) -> str
+
+ Parameters
+ ----------
+ ip_addr : str
+ a string representing an IP address (by default, of the form A.B.C.D)
+ octetListDict : dict
+ a dictionary representing the octets on which to act such that ip[octet_key] = ip[octet_key] + octet_value
+ ip_type : str
+ a string that defines the ip type to parse. possible inputs are 'ipv4', 'ipv6'
+
+ By default, returns a new ip address - A.B.C.(D+1)
+ """
+ if ip_type == 'ipv4':
+ ip_lst = ip_addr.split('.')
+
+ for octet,increment in octetListDict.iteritems():
+ int_octet = int(octet)
+ if ((int_octet < 0) or (int_octet > 3)):
+ raise ValueError('the provided octet is not legal in {0} format'.format(ip_type) )
+ else:
+ if (int(ip_lst[int_octet]) + increment) < 255:
+ ip_lst[int_octet] = str(int(ip_lst[int_octet]) + increment)
+ else:
+ raise ValueError('the requested increment exceeds 255 client address limit')
+
+ return '.'.join(ip_lst)
+
+ else: # this is an ipv6 address, handle accordingly
+ ip_lst = ip_addr.split(':')
+
+ for octet,increment in octetListDict.iteritems():
+ int_octet = int(octet)
+ if ((int_octet < 0) or (int_octet > 7)):
+ raise ValueError('the provided octet is not legal in {0} format'.format(ip_type) )
+ else:
+ if (int(ip_lst[int_octet], 16) + increment) < 65535:
+ ip_lst[int_octet] = format( int(ip_lst[int_octet], 16) + increment, 'X')
+ else:
+ raise ValueError('the requested increment exceeds 65535 client address limit')
+
+ return ':'.join(ip_lst)
+
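+# A minimal usage sketch of the helper above (addresses chosen for illustration):
+#
+# get_single_net_client_addr('16.0.0.1') # -> '16.0.0.2' (default: octet 3 + 1)
+# get_single_net_client_addr('16.0.0.1', {'1': 2}) # -> '16.2.0.1'
+# get_single_net_client_addr('2001:DB8:0:2222:0:0:0:0', {'7': 1}, 'ipv6') # -> '2001:DB8:0:2222:0:0:0:1'
+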
+
+def load_complete_config_file (filepath):
+ """load_complete_config_file(filepath) -> list
+
+ Loads a configuration file (.yaml) for both trex config and router config
+ Returns a list with a dictionary to each of the configurations
+ """
+
+ # create response dictionaries
+ trex_config = {}
+ rtr_config = {}
+ tftp_config = {}
+
+ try:
+ with open(filepath, 'r') as f:
+ config = yaml.load(f)
+
+ # Handle T-Rex configuration
+ trex_config['trex_name'] = config["trex"]["hostname"]
+ trex_config['trex_password'] = config["trex"]["password"]
+ #trex_config['trex_is_dual'] = config["trex"]["is_dual"]
+ trex_config['trex_cores'] = int(config["trex"]["cores"])
+ #trex_config['trex_latency'] = int(config["trex"]["latency"])
+# trex_config['trex_version_path'] = config["trex"]["version_path"]
+ trex_config['modes'] = config['trex'].get('modes', [])
+
+ if 'loopback' not in trex_config['modes']:
+ trex_config['router_interface'] = config["router"]["ip_address"]
+
+ # Handle Router configuration
+ rtr_config['model'] = config["router"]["model"]
+ rtr_config['hostname'] = config["router"]["hostname"]
+ rtr_config['ip_address'] = config["router"]["ip_address"]
+ rtr_config['image'] = config["router"]["image"]
+ rtr_config['line_pswd'] = config["router"]["line_password"]
+ rtr_config['en_pswd'] = config["router"]["en_password"]
+ rtr_config['interfaces'] = config["router"]["interfaces"]
+ rtr_config['clean_config'] = config["router"]["clean_config"]
+ rtr_config['intf_masking'] = config["router"]["intf_masking"]
+ rtr_config['ipv6_mask'] = config["router"]["ipv6_mask"]
+ rtr_config['mgmt_interface'] = config["router"]["mgmt_interface"]
+
+ # Handle TFTP configuration
+ tftp_config['hostname'] = config["tftp"]["hostname"]
+ tftp_config['ip_address'] = config["tftp"]["ip_address"]
+ tftp_config['images_path'] = config["tftp"]["images_path"]
+
+ if rtr_config['clean_config'] is None:
+ raise ValueError("A clean router configuration wasn't provided.")
+
+ except ValueError:
+ print "\nMissing mandatory fields in configuration file: '{0}'\n".format(filepath)
+ raise
+
+ except Exception as inst:
+ print "\nBad configuration file provided: '{0}'\n".format(filepath)
+ raise inst
+
+ return TRexConfig(trex_config, rtr_config, tftp_config)
+
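+# A minimal config sketch for the loader above; only the keys are taken from the
+# parsing code, all values are illustrative:
+#
+# trex:
+#   hostname: trex-host
+#   password: password
+#   cores: 2
+#   modes: []
+# router:
+#   model: ASR1001
+#   hostname: router-host
+#   ip_address: 10.0.0.1
+#   image: router_image.bin
+#   line_password: pass
+#   en_password: pass
+#   mgmt_interface: GigabitEthernet0
+#   clean_config: clean_config.cfg
+#   intf_masking: 255.255.255.0
+#   ipv6_mask: 64
+#   interfaces: []
+# tftp:
+#   hostname: tftp-host
+#   ip_address: 10.0.0.2
+#   images_path: /images/
+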
+def load_object_config_file (filepath):
+ try:
+ with open(filepath, 'r') as f:
+ config = yaml.load(f)
+ return config
+ except Exception as inst:
+ print "\nBad configuration file provided: '{0}'\n".format(filepath)
+ print inst
+ exit(-1)
+
+
+def query_yes_no(question, default="yes"):
+ """Ask a yes/no question via raw_input() and return their answer.
+
+ "question" is a string that is presented to the user.
+ "default" is the presumed answer if the user just hits <Enter>.
+ It must be "yes" (the default), "no" or None (meaning
+ an answer is required of the user).
+
+ The "answer" return value is True for "yes" or False for "no".
+ """
+ valid = { "yes": True, "y": True, "ye": True,
+ "no": False, "n": False }
+ if default is None:
+ prompt = " [y/n] "
+ elif default == "yes":
+ prompt = " [Y/n] "
+ elif default == "no":
+ prompt = " [y/N] "
+ else:
+ raise ValueError("invalid default answer: '%s'" % default)
+
+ while True:
+ sys.stdout.write(question + prompt)
+ choice = raw_input().lower()
+ if default is not None and choice == '':
+ return valid[default]
+ elif choice in valid:
+ return valid[choice]
+ else:
+ sys.stdout.write("Please respond with 'yes' or 'no' "
+ "(or 'y' or 'n').\n")
+
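+# Usage sketch for the prompt above; <Enter> takes the default answer:
+#
+# if query_yes_no('Reload the platform?', default = 'no'):
+#     do_reload() # hypothetical follow-up action
+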
+
+def load_benchmark_config_file (filepath):
+ """load_benchmark_config_file(filepath) -> list
+
+ Loads a configuration file (.yaml) for both trex config and router config
+ Returns a list with a dictionary to each of the configurations
+ """
+
+ # create response dictionary
+ benchmark_config = {}
+
+ try:
+ with open(filepath, 'r') as f:
+ benchmark_config = yaml.load(f)
+
+ except Exception as inst:
+ print "\nBad configuration file provided: '{0}'\n".format(filepath)
+ print inst
+ exit(-1)
+
+ return benchmark_config
+
+
+def get_benchmark_param (benchmark_path, test_name, param, sub_param = None):
+
+ config = load_benchmark_config_file(benchmark_path)
+ if sub_param is None:
+ return config[test_name][param]
+ else:
+ return config[test_name][param][sub_param]
+
+def gen_increment_dict (dual_port_mask):
+ addr_lst = dual_port_mask.split('.')
+ result = {}
+ for idx, octet_increment in enumerate(addr_lst):
+ octet_int = int(octet_increment)
+ if octet_int>0:
+ result[str(idx)] = octet_int
+
+ return result
+
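+# The mask-to-increment translation above, by example:
+#
+# gen_increment_dict('1.0.0.0') # -> {'0': 1}
+# gen_increment_dict('0.0.1.0') # -> {'2': 1}, ready for get_single_net_client_addr()
+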
+
+def get_network_addr (ip_type = 'ipv4'):
+ ipv4_addr = [1, 1, 1, 0] # base ipv4 address to start generating from- 1.1.1.0
+ ipv6_addr = ['2001', 'DB8', 0, '2222', 0, 0, 0, 0] # base ipv6 address to start generating from- 2001:DB8:0:2222:0:0:0:0
+ while True:
+ if ip_type == 'ipv4':
+ if (ipv4_addr[2] < 255):
+ yield [".".join( map(str, ipv4_addr) ), '255.255.255.0']
+ ipv4_addr[2] += 1
+ else: # reached defined maximum limit of address allocation
+ return
+ else: # handling ipv6 addressing
+ if (ipv6_addr[2] < 4369):
+ tmp_ipv6_addr = list(ipv6_addr)
+ tmp_ipv6_addr[2] = hex(tmp_ipv6_addr[2])[2:]
+ yield ":".join( map(str, tmp_ipv6_addr) )
+ ipv6_addr[2] += 1
+ else: # reached defined maximum limit of address allocation
+ return
+
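+# The generators above hand out one network per iteration; a short sketch:
+#
+# gen = get_network_addr()
+# next(gen) # -> ['1.1.1.0', '255.255.255.0']
+# next(gen) # -> ['1.1.2.0', '255.255.255.0']
+# next(get_network_addr(ip_type = 'ipv6')) # -> '2001:DB8:0:2222:0:0:0:0'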
+
+
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/outer_packages.py b/scripts/automation/regression/outer_packages.py
new file mode 100755
index 00000000..05bedc71
--- /dev/null
+++ b/scripts/automation/regression/outer_packages.py
@@ -0,0 +1,38 @@
+#!/router/bin/python
+
+import sys, site
+import platform, os
+
+CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) # alternate use with: os.getcwd()
+TREX_PATH = os.getenv('TREX_UNDER_TEST') # path to <trex-core>/scripts directory, env. variable TREX_UNDER_TEST should override it.
+if not TREX_PATH or not os.path.isfile('%s/trex_daemon_server' % TREX_PATH):
+ TREX_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir, os.pardir))
+PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(TREX_PATH, 'external_libs'))
+PATH_TO_CTRL_PLANE = os.path.abspath(os.path.join(TREX_PATH, 'automation', 'trex_control_plane'))
+
+NIGHTLY_MODULES = ['enum34-1.0.4',
+ 'nose-1.3.4',
+ 'rednose-0.4.1',
+ 'progressbar-2.2',
+ 'termstyle',
+ 'dpkt-1.8.6',
+ 'yaml-3.11',
+ ]
+
+def import_nightly_modules ():
+ sys.path.append(TREX_PATH)
+ sys.path.append(PATH_TO_CTRL_PLANE)
+ import_module_list(NIGHTLY_MODULES)
+
+def import_module_list (modules_list):
+ assert(isinstance(modules_list, list))
+ for p in modules_list:
+ full_path = os.path.join(PATH_TO_PYTHON_LIB, p)
+ sys.path.insert(1, full_path)
+
+import_nightly_modules()
+
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/platform_cmd_link.py b/scripts/automation/regression/platform_cmd_link.py
new file mode 100755
index 00000000..3d577baf
--- /dev/null
+++ b/scripts/automation/regression/platform_cmd_link.py
@@ -0,0 +1,442 @@
+#!/router/bin/python
+
+from interfaces_e import IFType
+import CustomLogger
+import misc_methods
+import telnetlib
+import socket
+
+class CCommandCache(object):
+ def __init__(self):
+ self.__gen_clean_data_structure()
+
+ def __gen_clean_data_structure (self):
+ self.cache = {"IF" : {},
+ "CONF" : [],
+ "EXEC" : []}
+
+ def __list_append (self, dest_list, cmd):
+ if isinstance(cmd, list):
+ dest_list.extend( cmd )
+ else:
+ dest_list.append( cmd )
+
+ def add (self, cmd_type, cmd, interface = None):
+
+ if interface is not None: # this is an interface ("IF") config command
+ if interface in self.cache['IF']:
+ # interface commands already exists
+ self.__list_append(self.cache['IF'][interface], cmd)
+ else:
+ # no cached commands for this interface
+ self.cache['IF'][interface] = []
+ self.__list_append(self.cache['IF'][interface], cmd)
+ else: # this is either a CONF or EXEC command
+ self.__list_append(self.cache[cmd_type.upper()], cmd)
+
+ def dump_config (self):
+ # dump IF config:
+ print "configure terminal"
+ for intf, intf_cmd_list in self.cache['IF'].iteritems():
+ print "interface {if_name}".format( if_name = intf )
+ print '\n'.join(intf_cmd_list)
+
+ if self.cache['IF']:
+ # add 'exit' only if interface config actually took place
+ print 'exit' # exit to global config mode
+
+ # dump global config
+ if self.cache['CONF']:
+ print '\n'.join(self.cache['CONF'])
+
+ # exit back to en mode
+ print "exit"
+
+ # dump exec config
+ if self.cache['EXEC']:
+ print '\n'.join(self.cache['EXEC'])
+
+ def get_config_list (self):
+ conf_list = []
+
+ conf_list.append("configure terminal")
+ for intf, intf_cmd_list in self.cache['IF'].iteritems():
+ conf_list.append( "interface {if_name}".format( if_name = intf ) )
+ conf_list.extend( intf_cmd_list )
+ if len(conf_list)>1:
+ # add 'exit' only if interface config actually took place
+ conf_list.append("exit")
+
+ conf_list.extend( self.cache['CONF'] )
+ conf_list.append("exit")
+ conf_list.extend( self.cache['EXEC'] )
+
+
+ return conf_list
+
+ def clear_cache (self):
+ # clear all pointers to cache data (erase the data structure)
+ self.cache.clear()
+ # Re-initialize the cache
+ self.__gen_clean_data_structure()
+
+
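+# A short usage sketch for the cache above (interface and command names are
+# illustrative only):
+#
+# cache = CCommandCache()
+# cache.add('IF', 'ip nbar protocol-discovery', 'GigabitEthernet0/0/1')
+# cache.add('CONF', 'ip routing')
+# cache.add('EXEC', 'write memory')
+# cache.dump_config() # prints the wrapped 'configure terminal' ... 'exit' sequence
+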
+
+class CCommandLink(object):
+ def __init__(self, silent_mode = False):
+ self.history = []
+ self.virtual_mode = True
+ self.silent_mode = silent_mode
+ self.telnet_con = None
+
+
+ def __transmit (self, cmd_list, **kwargs):
+ self.history.extend(cmd_list)
+ if not self.silent_mode:
+ print '\n'.join(cmd_list) # prompting the pushed platform commands
+ if not self.virtual_mode:
+ # transmit the command to platform.
+ return self.telnet_con.write_ios_cmd(cmd_list, **kwargs)
+
+ def run_command (self, cmd_list, **kwargs):
+ response = ''
+ for cmd in cmd_list:
+
+ # check which type of cmd we handle
+ if isinstance(cmd, CCommandCache):
+ tmp_response = self.__transmit( cmd.get_config_list(), **kwargs ) # expand the cache into its full command list
+ else:
+ tmp_response = self.__transmit([cmd], **kwargs)
+ if not self.virtual_mode:
+ response += tmp_response
+ return response
+
+ def run_single_command (self, cmd, **kwargs):
+ return self.run_command([cmd], **kwargs)
+
+ def get_history (self, as_string = False):
+ if as_string:
+ return '\n'.join(self.history)
+ else:
+ return self.history
+
+ def clear_history (self):
+ # clear the history data in place (erase the data structure)
+ del self.history[:]
+
+ def launch_platform_connectivity (self, device_config_obj):
+ connection_info = device_config_obj.get_platform_connection_data()
+ self.telnet_con = CIosTelnet( **connection_info )
+ self.virtual_mode = False # if physical connectivity was successful, toggle virtual mode off
+
+ def close_platform_connection(self):
+ if self.telnet_con is not None:
+ self.telnet_con.close()
+
+
+
+class CDeviceCfg(object):
+ def __init__(self, cfg_yaml_path = None):
+ if cfg_yaml_path is not None:
+ (self.platform_cfg, self.tftp_cfg) = misc_methods.load_complete_config_file(cfg_yaml_path)[1:3]
+
+ self.interfaces_cfg = self.platform_cfg['interfaces'] # extract only the router interface configuration
+
+ def set_platform_config(self, config_dict):
+ self.platform_cfg = config_dict
+ self.interfaces_cfg = self.platform_cfg['interfaces']
+
+ def set_tftp_config(self, tftp_cfg):
+ self.tftp_cfg = tftp_cfg
+
+ def get_interfaces_cfg (self):
+ return self.interfaces_cfg
+
+ def get_ip_address (self):
+ return self.__get_attr('ip_address')
+
+ def get_line_password (self):
+ return self.__get_attr('line_pswd')
+
+ def get_en_password (self):
+ return self.__get_attr('en_pswd')
+
+ def get_mgmt_interface (self):
+ return self.__get_attr('mgmt_interface')
+
+ def get_platform_connection_data (self):
+ return { 'host' : self.get_ip_address(), 'line_pass' : self.get_line_password(), 'en_pass' : self.get_en_password() }
+
+ def get_tftp_info (self):
+ return self.tftp_cfg
+
+ def get_image_name (self):
+ return self.__get_attr('image')
+
+ def __get_attr (self, attr):
+ return self.platform_cfg[attr]
+
+ def dump_config (self):
+ import yaml
+ print yaml.dump(self.interfaces_cfg, default_flow_style=False)
+
+class CIfObj(object):
+ _obj_id = 0
+
+ def __init__(self, if_name, ipv4_addr, ipv6_addr, src_mac_addr, dest_mac_addr, if_type):
+ self.__get_and_increment_id()
+ self.if_name = if_name
+ self.if_type = if_type
+ self.src_mac_addr = src_mac_addr
+ self.dest_mac_addr = dest_mac_addr
+ self.ipv4_addr = ipv4_addr
+ self.ipv6_addr = ipv6_addr
+ self.pair_parent = None # a pointer to CDualIfObj which holds this interface and its pair-complement
+
+ def __get_and_increment_id (self):
+ self._obj_id = CIfObj._obj_id
+ CIfObj._obj_id += 1
+
+ def get_name (self):
+ return self.if_name
+
+ def get_src_mac_addr (self):
+ return self.src_mac_addr
+
+ def get_dest_mac (self):
+ return self.dest_mac_addr
+
+ def get_id (self):
+ return self._obj_id
+
+ def get_if_type (self):
+ return self.if_type
+
+ def get_ipv4_addr (self):
+ return self.ipv4_addr
+
+ def get_ipv6_addr (self):
+ return self.ipv6_addr
+
+ def set_ipv4_addr (self, addr):
+ self.ipv4_addr = addr
+
+ def set_ipv6_addr (self, addr):
+ self.ipv6_addr = addr
+
+ def set_pair_parent (self, dual_if_obj):
+ self.pair_parent = dual_if_obj
+
+ def get_pair_parent (self):
+ return self.pair_parent
+
+ def is_client (self):
+ return (self.if_type == IFType.Client)
+
+ def is_server (self):
+ return (self.if_type == IFType.Server)
+
+
+
+class CDualIfObj(object):
+ _obj_id = 0
+
+ def __init__(self, vrf_name, client_if_obj, server_if_obj):
+ self.__get_and_increment_id()
+ self.vrf_name = vrf_name
+ self.client_if = client_if_obj
+ self.server_if = server_if_obj
+
+ # link if_objects to its parent dual_if
+ self.client_if.set_pair_parent(self)
+ self.server_if.set_pair_parent(self)
+
+ def __get_and_increment_id (self):
+ self._obj_id = CDualIfObj._obj_id
+ CDualIfObj._obj_id += 1
+
+ def get_id (self):
+ return self._obj_id
+
+ def get_vrf_name (self):
+ return self.vrf_name
+
+ def is_duplicated (self):
+ return self.vrf_name is not None
+
+class CIfManager(object):
+ _ipv4_gen = misc_methods.get_network_addr()
+ _ipv6_gen = misc_methods.get_network_addr(ip_type = 'ipv6')
+
+ def __init__(self):
+ self.interfaces = {}
+ self.dual_intf = []
+ self.full_device_cfg = None
+
+ def __add_if_to_manager (self, if_obj):
+ self.interfaces[if_obj.get_name()] = if_obj
+
+ def __add_dual_if_to_manager (self, dual_if_obj):
+ self.dual_intf.append(dual_if_obj)
+
+ def __get_ipv4_net_client_addr(self, ipv4_addr):
+ return misc_methods.get_single_net_client_addr (ipv4_addr)
+
+ def __get_ipv6_net_client_addr(self, ipv6_addr):
+ return misc_methods.get_single_net_client_addr (ipv6_addr, {'7' : 1}, ip_type = 'ipv6')
+
+ def load_config (self, device_config_obj):
+ self.full_device_cfg = device_config_obj
+ # first, erase all current config
+ self.interfaces.clear()
+ del self.dual_intf[:]
+
+ # then, load the configuration
+ intf_config = device_config_obj.get_interfaces_cfg()
+
+ # finally, parse the information into data-structures
+ for intf_pair in intf_config:
+ # generate network addresses for client side, and initialize client if object
+ tmp_ipv4_addr = self.__get_ipv4_net_client_addr (next(CIfManager._ipv4_gen)[0])
+ tmp_ipv6_addr = self.__get_ipv6_net_client_addr (next(CIfManager._ipv6_gen))
+
+ client_obj = CIfObj(if_name = intf_pair['client']['name'],
+ ipv4_addr = tmp_ipv4_addr,
+ ipv6_addr = tmp_ipv6_addr,
+ src_mac_addr = intf_pair['client']['src_mac_addr'],
+ dest_mac_addr = intf_pair['client']['dest_mac_addr'],
+ if_type = IFType.Client)
+
+ # generate network addresses for server side, and initialize server if object
+ tmp_ipv4_addr = self.__get_ipv4_net_client_addr (next(CIfManager._ipv4_gen)[0])
+ tmp_ipv6_addr = self.__get_ipv6_net_client_addr (next(CIfManager._ipv6_gen))
+
+ server_obj = CIfObj(if_name = intf_pair['server']['name'],
+ ipv4_addr = tmp_ipv4_addr,
+ ipv6_addr = tmp_ipv6_addr,
+ src_mac_addr = intf_pair['server']['src_mac_addr'],
+ dest_mac_addr = intf_pair['server']['dest_mac_addr'],
+ if_type = IFType.Server)
+
+ dual_intf_obj = CDualIfObj(vrf_name = intf_pair['vrf_name'],
+ client_if_obj = client_obj,
+ server_if_obj = server_obj)
+
+ # pair-parent pointers were already linked inside the CDualIfObj constructor
+
+ # finally, update the data-structures with generated objects
+ self.__add_if_to_manager(client_obj)
+ self.__add_if_to_manager(server_obj)
+ self.__add_dual_if_to_manager(dual_intf_obj)
+
+
+ def get_if_list (self, if_type = IFType.All, is_duplicated = None):
+ result = []
+ for if_name,if_obj in self.interfaces.iteritems():
+ if (if_type == IFType.All) or ( if_obj.get_if_type() == if_type) :
+ if (is_duplicated is None) or (if_obj.get_pair_parent().is_duplicated() == is_duplicated):
+ # append this if_obj only if matches both IFType and is_duplicated conditions
+ result.append(if_obj)
+ return result
+
+ def get_duplicated_if (self):
+ result = []
+ for dual_if_obj in self.dual_intf:
+ if dual_if_obj.get_vrf_name() is not None :
+ result.extend( (dual_if_obj.client_if, dual_if_obj.server_if) )
+ return result
+
+ def get_dual_if_list (self, is_duplicated = None):
+ result = []
+ for dual_if in self.dual_intf:
+ if (is_duplicated is None) or (dual_if.is_duplicated() == is_duplicated):
+ result.append(dual_if)
+ return result
+
+ def dump_if_config (self):
+ if self.full_device_cfg is None:
+ print "Device configuration isn't loaded.\nPlease load config and try again."
+ else:
+ self.full_device_cfg.dump_config()
+
+
+class AuthError(Exception):
+ pass
+
+class CIosTelnet(telnetlib.Telnet):
+ AuthError = AuthError
+ def __init__ (self, host, line_pass, en_pass, port = 23, str_wait = "#"):
+ telnetlib.Telnet.__init__(self)
+ self.host = host
+ self.port = port
+ self.line_passwd = line_pass
+ self.enable_passwd = en_pass
+ self.pr = str_wait
+# self.set_debuglevel (1)
+ try:
+ self.open(self.host,self.port, timeout = 5)
+ self.read_until("word:",1)
+ self.write("{line_pass}\n".format(line_pass = self.line_passwd) )
+ res = self.read_until(">",1)
+ if 'Password' in res:
+ raise AuthError('Invalid line password was provided')
+ self.write("enable 15\n")
+ self.read_until("d:",1)
+ self.write("{en_pass}\n".format(en_pass = self.enable_passwd) )
+ res = self.read_until(self.pr,1)
+ if 'Password' in res:
+ raise AuthError('Invalid en password was provided')
+ self.write_ios_cmd(['terminal length 0'])
+
+ except socket.timeout:
+ raise socket.timeout('A timeout error has occurred.\nCheck platform connectivity or the hostname defined in the config file')
+
+ def write_ios_cmd (self, cmd_list, result_from = 0, timeout = 1, **kwargs):
+ assert isinstance(cmd_list, list)
+
+ if 'flush_first' in kwargs:
+ self.read_until(self.pr, timeout) # clear any accumulated data in telnet session
+
+ res = ''
+ wf = ''
+ if 'read_until' in kwargs:
+ wf = kwargs['read_until']
+ else:
+ wf = self.pr
+
+ for idx, cmd in enumerate(cmd_list):
+ self.write(cmd+'\r\n')
+ if idx < result_from:
+ # don't care for return string
+ if type(wf) is list:
+ self.expect(wf, timeout)[2]
+ else:
+ self.read_until(wf, timeout)
+ else:
+ # care for return string
+ if type(wf) is list:
+ res += self.expect(wf, timeout)[2]
+ else:
+ res += self.read_until(wf, timeout)
+# return res.split('\r\n')
+ return res # return the received response as a string, each line separated by '\r\n'.
+
+
+if __name__ == "__main__":
+# dev_cfg = CDeviceCfg('config/config.yaml')
+# print dev_cfg.get_platform_connection_data()
+# telnet = CIosTelnet( **(dev_cfg.get_platform_connection_data() ) )
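+# a hedged extra step for the example above: push a show command and collect
+# the response up to the prompt
+# print telnet.write_ios_cmd(['show version'], timeout = 3)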
+
+# if_mng = CIfManager()
+# if_mng.load_config(dev_cfg)
+# if_mng.dump_config()
+ pass
diff --git a/scripts/automation/regression/sshpass.exp b/scripts/automation/regression/sshpass.exp
new file mode 100755
index 00000000..2262290f
--- /dev/null
+++ b/scripts/automation/regression/sshpass.exp
@@ -0,0 +1,17 @@
+#!/usr/cisco/bin/expect -f
+# sample command: ./sshpass.exp password 192.168.1.11 id *
+set pass [lrange $argv 0 0]
+set server [lrange $argv 1 1]
+set name [lrange $argv 2 2]
+set cmd [lrange $argv 3 10]
+
+set cmd_str [join $cmd]
+
+spawn ssh -t $name@$server $cmd_str
+match_max 100000
+expect "*?assword:*"
+send -- "$pass\r"
+send -- "\r"
+expect eof
+wait
+#interact
diff --git a/scripts/automation/regression/stateless_example.py b/scripts/automation/regression/stateless_example.py
new file mode 100755
index 00000000..93fb2703
--- /dev/null
+++ b/scripts/automation/regression/stateless_example.py
@@ -0,0 +1,47 @@
+#!/router/bin/python
+
+import outer_packages
+from client.trex_hltapi import CTRexHltApi
+import traceback
+import sys, time
+
+def fail(reason):
+ print 'Encountered error:\n%s' % reason
+ sys.exit(1)
+
+if __name__ == "__main__":
+ port_list = [0, 1]
+ #port_list = 1
+ hlt_client = None
+ try:
+ print 'init'
+ hlt_client = CTRexHltApi()
+
+ print 'connecting'
+ con = hlt_client.connect("localhost", port_list, "danklei", sync_port = 4501, async_port = 4500, break_locks=True, reset=True)#, port=6666)
+ print 'connected?', hlt_client.connected
+ if not hlt_client.trex_client or not hlt_client.connected:
+ fail(con['log'])
+ print 'connect result:', con
+
+ res = hlt_client.traffic_config("create", 0)#, ip_src_addr="2000.2.2")
+ print 'traffic_config result:', res
+
+ res = hlt_client.traffic_config("create", 1)#, ip_src_addr="2000.2.2")
+ print res
+ print 'got to running!'
+ #sys.exit(0)
+ res = hlt_client.traffic_control("run", 1, mul = {'type': 'raw', 'op': 'abs', 'value': 1}, duration = 15)#, ip_src_addr="2000.2.2")
+ print res
+ time.sleep(2)
+ res = hlt_client.traffic_control("stop", 1)#, ip_src_addr="2000.2.2")
+ print res
+
+
+
+ finally:
+ if hlt_client and hlt_client.trex_client:
+ res = hlt_client.cleanup_session(port_list)
+ print res
diff --git a/scripts/automation/regression/style.css b/scripts/automation/regression/style.css
new file mode 100755
index 00000000..b5996af1
--- /dev/null
+++ b/scripts/automation/regression/style.css
@@ -0,0 +1,54 @@
+html {overflow-y:scroll;}
+
+body {
+ font-size:12px;
+ color:#000000;
+ background-color:#ffffff;
+ margin:0px;
+ font-family:verdana,helvetica,arial,sans-serif;
+}
+
+div {width:100%;}
+
+table,th,td,input,textarea {
+ font-size:100%;
+}
+
+table.reference, table.reference_fail {
+ background-color:#ffffff;
+ border:1px solid #c3c3c3;
+ border-collapse:collapse;
+ vertical-align:middle;
+}
+
+table.reference th {
+ background-color:#e5eecc;
+ border:1px solid #c3c3c3;
+ padding:3px;
+}
+
+table.reference_fail th {
+ background-color:#ffcccc;
+ border:1px solid #c3c3c3;
+ padding:3px;
+}
+
+
+table.reference td, table.reference_fail td {
+ border:1px solid #c3c3c3;
+ padding:3px;
+}
+
+a.example {font-weight:bold}
+
+/* a:link,a:visited {color:#900B09; background-color:transparent} */
+/* a:hover,a:active {color:#FF0000; background-color:transparent} */
+
+.linktr {
+ cursor: pointer;
+}
+
+.linktext {
+ color:#0000FF;
+ text-decoration: underline;
+}
\ No newline at end of file
diff --git a/scripts/automation/regression/trex.py b/scripts/automation/regression/trex.py
new file mode 100644
index 00000000..b9fd87ec
--- /dev/null
+++ b/scripts/automation/regression/trex.py
@@ -0,0 +1,427 @@
+#!/router/bin/python
+
+import os
+import sys
+import subprocess
+import misc_methods
+import re
+import signal
+import time
+from CProgressDisp import TimedProgressBar
+import unit_tests.trex_general_test
+from unit_tests.tests_exceptions import TRexInUseError
+import datetime
+
+class CTRexRunner:
+ """This is an instance for generating a CTRexRunner"""
+
+ def __init__ (self, config_dict, yaml):
+ self.trex_config = config_dict
+ self.yaml = yaml
+
+
+ def get_config (self):
+ """ get_config() -> dict
+
+ Returns the stored configuration of the T-Rex server of the CTRexRunner instance as a dictionary
+ """
+ return self.trex_config
+
+ def set_yaml_file (self, yaml_path):
+ """ update_yaml_file (self, yaml_path) -> None
+
+ Defines the yaml file to be used by the T-Rex.
+ """
+ self.yaml = yaml_path
+
+
+ def generate_run_cmd (self, multiplier, cores, duration, nc = True, export_path="/tmp/trex.txt", **kwargs):
+ """ generate_run_cmd(self, multiplier, duration, export_path) -> str
+
+ Generates a custom running command for the kick-off of the T-Rex traffic generator.
+ Returns a command (string) to be issued on the trex server
+
+ Parameters
+ ----------
+ multiplier : float
+ Defines the T-Rex multiplier factor (platform dependent)
+ cores : int
+ Defines the number of cores T-Rex runs with
+ duration : int
+ Defines the duration of the test
+ export_path : str
+ a full system path to which the results of the trex-run will be logged.
+
+ """
+ if self.yaml is None:
+ raise ValueError('T-Rex yaml file is not defined')
+ fileName, fileExtension = os.path.splitext(self.yaml)
+ if fileExtension != '.yaml':
+ raise TypeError('yaml path is not referencing a .yaml file')
+
+ if 'results_file_path' in kwargs:
+ export_path = kwargs['results_file_path']
+
+ trex_cmd_str = './t-rex-64 -c %d -m %f -d %d -f %s '
+
+ if nc:
+ trex_cmd_str = trex_cmd_str + ' --nc '
+
+ trex_cmd = trex_cmd_str % (cores,
+ multiplier,
+ duration,
+ self.yaml)
+ # self.trex_config['trex_latency'])
+
+ for key, value in kwargs.iteritems():
+ tmp_key = key.replace('_','-')
+ dash = ' -' if (len(key)==1) else ' --'
+ if value is True:
+ trex_cmd += (dash + tmp_key)
+ else:
+ trex_cmd += (dash + '{k} {val}'.format( k = tmp_key, val = value ))
+
+ print "\nT-REX COMMAND: ", trex_cmd
+
+ cmd = 'sshpass.exp %s %s root "cd %s; %s > %s"' % (self.trex_config['trex_password'],
+ self.trex_config['trex_name'],
+ self.trex_config['trex_version_path'],
+ trex_cmd,
+ export_path)
+
+ return cmd
+
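+ # An illustrative shape of the command built above (host, password and path
+ # values are made up; a kwarg such as p=True becomes ' -p'):
+ #
+ # sshpass.exp <pass> <trex host> root "cd <version path>; ./t-rex-64 -c 4 -m 10.000000 -d 60 -f cap2/dns.yaml --nc -p > /tmp/trex.txt"
+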
+ def generate_fetch_cmd (self, result_file_full_path="/tmp/trex.txt"):
+ """ generate_fetch_cmd(self, result_file_full_path) -> str
+
+ Generates a command that fetches the results of the T-Rex run.
+ Returns a command (string) to be issued on the trex server.
+
+ Example use: generate_fetch_cmd() - command that will fetch the content from the default log file- /tmp/trex.txt
+ generate_fetch_cmd("/tmp/trex_secondary_file.txt") - command that will fetch the content from a custom log file- /tmp/trex_secondary_file.txt
+ """
+ #dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
+ script_running_dir = os.path.dirname(os.path.realpath(__file__)) # get the current script working directory so that the sshpass could be accessed.
+        cmd = script_running_dir + '/sshpass.exp %s %s root "cat %s"' % (self.trex_config['trex_password'],
+                                                                         self.trex_config['trex_name'],
+                                                                         result_file_full_path)
+        return cmd
+
+
+
+ def run (self, multiplier, cores, duration, **kwargs):
+ """ run(self, multiplier, duration, results_file_path) -> CTRexResults
+
+ Running the T-Rex server based on the config file.
+ Returns a CTRexResults object containing the results of the run.
+
+ Parameters
+ ----------
+ multiplier : float
+ Defines the T-Rex multiplier factor (platform dependant)
+ duration : int
+ Defines the duration of the test
+ results_file_path : str
+ a full system path to which the results of the trex-run will be logged and fetched from.
+
+ """
+ tmp_path = None
+ # print kwargs
+ if 'export_path' in kwargs:
+ tmp_path = kwargs['export_path']
+ del kwargs['export_path']
+            cmd = self.generate_run_cmd(multiplier, cores, duration, export_path = tmp_path, **kwargs)
+ else:
+ cmd = self.generate_run_cmd(multiplier, cores, duration, **kwargs)
+
+# print 'T-REx complete command to be used:'
+# print cmd
+ # print kwargs
+
+ progress_thread = TimedProgressBar(duration)
+ progress_thread.start()
+ interrupted = False
+ try:
+ start_time = time.time()
+ start = datetime.datetime.now()
+ results = subprocess.call(cmd, shell = True, stdout = open(os.devnull, 'wb'))
+ end_time = time.time()
+ fin = datetime.datetime.now()
+ # print "Time difference : ", fin-start
+ runtime_deviation = abs(( (end_time - start_time)/ (duration+15) ) - 1)
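+            # e.g. a 30 sec run that dies after 5 sec gives abs(5.0/45 - 1) ~= 0.89,
+            # far above the 0.6 threshold below, while a clean run stays near 0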
+ print "runtime_deviation: %2.0f %%" % ( runtime_deviation*100.0)
+ if ( runtime_deviation > 0.6 ) :
+ # If the run stopped immediately - classify as Trex in use or reachability issue
+ interrupted = True
+ if ((end_time - start_time) < 2):
+ raise TRexInUseError ('T-Rex run failed since T-Rex is used by another process, or due to reachability issues')
+ else:
+ unit_tests.trex_general_test.CTRexScenario.trex_crashed = True
+ # results = subprocess.Popen(cmd, stdout = open(os.devnull, 'wb'),
+ # shell=True, preexec_fn=os.setsid)
+ except KeyboardInterrupt:
+ print "\nT-Rex test interrupted by user during traffic generation!!"
+            # no explicit kill is needed here: subprocess.call() has already returned, and the
+            # SIGINT was delivered to the child shell's process group as well ('results' is an
+            # int return code, not a Popen object, so the old results.killpg() call was invalid)
+            interrupted = True
+ raise RuntimeError
+ finally:
+ progress_thread.join(isPlannedStop = (not interrupted) )
+
+ if results!=0:
+ sys.stderr.write("T-Rex run failed. Please Contact trex-dev mailer for further details")
+ sys.stderr.flush()
+ return None
+ elif interrupted:
+ sys.stderr.write("T-Rex run failed due user-interruption.")
+ sys.stderr.flush()
+ return None
+ else:
+
+ if tmp_path:
+ cmd = self.generate_fetch_cmd( tmp_path )#**kwargs)#results_file_path)
+ else:
+ cmd = self.generate_fetch_cmd()
+
+ try:
+ run_log = subprocess.check_output(cmd, shell = True)
+ trex_result = CTRexResult(None, run_log)
+ trex_result.load_file_lines()
+ trex_result.parse()
+
+ return trex_result
+
+ except subprocess.CalledProcessError:
+ sys.stderr.write("TRex result fetching failed. Please Contact trex-dev mailer for further details")
+ sys.stderr.flush()
+ return None
+
+class CTRexResult():
+ """This is an instance for generating a CTRexResult"""
+ def __init__ (self, file, buffer = None):
+ self.file = file
+ self.buffer = buffer
+ self.result = {}
+
+
+ def load_file_lines (self):
+ """ load_file_lines(self) -> None
+
+        Loads the content of self.file (or of self.buffer, when given) into self.lines
+ """
+ if self.buffer:
+ self.lines = self.buffer.split("\n")
+ else:
+ f = open(self.file,'r')
+ self.lines = f.readlines()
+ f.close()
+
+
+ def dump (self):
+ """ dump(self) -> None
+
+        Pretty-prints the content of the self.result dictionary to the screen
+ """
+ for key, value in self.result.items():
+ print "{0:20} : \t{1}".format(key, float(value))
+
+ def update (self, key, val, _str):
+ """ update (self, key, val, _str) -> None
+
+ Updates the self.result[key] with a possibly new value representation of val
+ Example: 15K might be updated into 15000.0
+
+        Parameters
+        ----------
+        key : str
+            Key of the self.result dictionary of the CTRexResult instance
+        val : float
+            Numeric value to store under that key
+        _str : str
+            a string representation of the value's units; a leading 'G', 'M' or 'K'
+            scales val by 1E9, 1E6 or 1E3 respectively
+
+        """
+
+ s = _str.strip()
+
+ if s[0]=="G":
+ val = val*1E9
+ elif s[0]=="M":
+ val = val*1E6
+ elif s[0]=="K":
+ val = val*1E3
+
+        if key in self.result:
+ if self.result[key] > 0:
+ if (val/self.result[key] > 0.97 ):
+ self.result[key]= val
+ else:
+ self.result[key] = val
+ else:
+ self.result[key] = val
+
+
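+    # Example: update('total-tx', 462.42, 'Mbps') scales the value by 1E6 and stores
+    # self.result['total-tx'] == 462420000.0 (kept only if not more than ~3% below
+    # an existing positive value, per the 0.97 ratio check above)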
+
+ def parse (self):
+ """ parse(self) -> None
+
+        Parse the content of the result file from the TRex test and load the data into self.result
+ """
+ stop_read = False
+ d = {
+ 'total-tx' : 0,
+ 'total-rx' : 0,
+ 'total-pps' : 0,
+ 'total-cps' : 0,
+
+ 'expected-pps' : 0,
+ 'expected-cps' : 0,
+ 'expected-bps' : 0,
+ 'active-flows' : 0,
+ 'open-flows' : 0
+ }
+
+ self.error = ""
+
+        # Parse the output of the test, line by line (each line matches another RegEx,
+        # and as such different rules apply)
+ for line in self.lines:
+ match = re.match(".*/var/run/.rte_config.*", line)
+ if match:
+ stop_read = True
+ continue
+
+ #Total-Tx : 462.42 Mbps Nat_time_out : 0 ==> we try to parse the next decimal in this case Nat_time_out
+# match = re.match("\W*(\w(\w|[-])+)\W*([:]|[=])\W*(\d+[.]\d+)\W*\w+\W+(\w+)\W*([:]|[=])\W*(\d+)(.*)", line);
+# if match:
+# key = misc_methods.mix_string(match.group(5))
+# val = float(match.group(7))
+# # continue to parse !! we try the second
+# self.result[key] = val #update latest
+
+ # check if we need to stop reading
+ match = re.match(".*latency daemon has stopped.*", line)
+ if match:
+ stop_read = True
+ continue
+
+ match = re.match("\W*(\w(\w|[-])+)\W*([:]|[=])\W*(\d+[.]\d+)(.*ps)\s+(\w+)\W*([:]|[=])\W*(\d+)", line)
+ if match:
+ key = misc_methods.mix_string(match.group(1))
+ val = float(match.group(4))
+                if key in d:
+                    if not stop_read:
+ self.update (key, val, match.group(5))
+ else:
+ self.result[key] = val # update latest
+ key2 = misc_methods.mix_string(match.group(6))
+ val2 = int(match.group(8))
+ self.result[key2] = val2 # always take latest
+
+
+ match = re.match("\W*(\w(\w|[-])+)\W*([:]|[=])\W*(\d+[.]\d+)(.*)", line)
+ if match:
+ key = misc_methods.mix_string(match.group(1))
+ val = float(match.group(4))
+            if key in d:
+                if not stop_read:
+ self.update (key, val, match.group(5))
+ else:
+ self.result[key] = val # update latest
+ continue
+
+ match = re.match("\W*(\w(\w|[-])+)\W*([:]|[=])\W*(\d+)(.*)", line)
+ if match:
+ key = misc_methods.mix_string(match.group(1))
+ val = float(match.group(4))
+ self.result[key] = val #update latest
+ continue
+
+ match = re.match("\W*(\w(\w|[-])+)\W*([:]|[=])\W*(OK)(.*)", line)
+ if match:
+ key = misc_methods.mix_string(match.group(1))
+ val = 0 # valid
+ self.result[key] = val #update latest
+ continue
+
+ match = re.match("\W*(Cpu Utilization)\W*([:]|[=])\W*(\d+[.]\d+) %(.*)", line)
+ if match:
+ key = misc_methods.mix_string(match.group(1))
+ val = float(match.group(3))
+            if key in self.result:
+ if (self.result[key] < val): # update only if larger than previous value
+ self.result[key] = val
+ else:
+ self.result[key] = val
+ continue
+
+ match = re.match(".*(rx_check\s.*)\s+:\s+(\w+)", line)
+ if match:
+ key = misc_methods.mix_string(match.group(1))
+ try:
+ val = int(match.group(2))
+ except ValueError: # corresponds with rx_check validation case
+ val = match.group(2)
+ finally:
+ self.result[key] = val
+ continue
+
+
+ def get_status (self, drop_expected = False):
+ if (self.error != ""):
+ print self.error
+ return (self.STATUS_ERR_FATAL)
+
+ d = self.result
+
+ # test for latency
+ latency_limit = 5000
+ if ( d['maximum-latency'] > latency_limit ):
+ self.reason="Abnormal latency measured (higher than %s" % latency_limit
+ return self.STATUS_ERR_LATENCY
+
+ # test for drops
+ if drop_expected == False:
+ if ( d['total-pkt-drop'] > 0 ):
+ self.reason=" At least one packet dropped "
+ return self.STATUS_ERR_DROP
+
+ # test for rx/tx distance
+ rcv_vs_tx = d['total-tx']/d['total-rx']
+ if ( (rcv_vs_tx >1.2) or (rcv_vs_tx <0.9) ):
+ self.reason="rx and tx should be close"
+ return self.STATUS_ERR_RX_TX_DISTANCE
+
+ # expected measurement
+        expect_vs_measured = d['total-tx']/d['expected-bps']
+        if ( (expect_vs_measured > 1.1) or (expect_vs_measured < 0.9) ) :
+            print expect_vs_measured
+ print d['total-tx']
+ print d['expected-bps']
+ self.reason="measure is not as expected"
+ return self.STATUS_ERR_BAD_EXPECTED_MEASUREMENT
+
+ if ( d['latency-any-error'] !=0 ):
+ self.reason=" latency-any-error has error"
+ return self.STATUS_ERR_LATENCY_ANY_ERROR
+
+ return self.STATUS_OK
+
+ # return types
+ STATUS_OK = 0
+ STATUS_ERR_FATAL = 1
+ STATUS_ERR_LATENCY = 2
+ STATUS_ERR_DROP = 3
+ STATUS_ERR_RX_TX_DISTANCE = 4
+    STATUS_ERR_BAD_EXPECTED_MEASUREMENT = 5
+ STATUS_ERR_LATENCY_ANY_ERROR = 6
+
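+# Typical usage sketch (hypothetical config dict and yaml path):
+#   runner = CTRexRunner(config_dict, 'cap2/imix_64.yaml')
+#   res = runner.run(multiplier = 30, cores = 4, duration = 60)
+#   if res is not None and res.get_status() == CTRexResult.STATUS_OK:
+#       res.dump()
+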
+def test_TRex_result_parser():
+    t = CTRexResult('trex.txt')
+ t.load_file_lines()
+ t.parse()
+ print t.result
+
+
+
+
+if __name__ == "__main__":
+ #test_TRex_result_parser();
+ pass
diff --git a/scripts/automation/regression/trex_unit_test.py b/scripts/automation/regression/trex_unit_test.py
new file mode 100755
index 00000000..9e3652b4
--- /dev/null
+++ b/scripts/automation/regression/trex_unit_test.py
@@ -0,0 +1,273 @@
+#!/router/bin/python
+
+__copyright__ = "Copyright 2014"
+
+"""
+Name:
+ trex_unit_test.py
+
+
+Description:
+
+    This script provides the functionality to test the performance of the T-Rex traffic generator.
+    The tested scenario is a T-Rex TG directly connected to a Cisco router.
+
+::
+
+ Topology:
+
+ ------- --------
+ | | Tx---1gig/10gig----Rx | |
+ | T-Rex | | router |
+ | | Rx---1gig/10gig----Tx | |
+ ------- --------
+
+"""
+
+import os
+import sys
+import outer_packages
+import nose
+from nose.plugins import Plugin
+import logging
+import CustomLogger
+import misc_methods
+from rednose import RedNose
+import termstyle
+from unit_tests.trex_general_test import CTRexScenario
+from client.trex_client import *
+from common.trex_exceptions import *
+import trex
+import socket
+from pprint import pprint
+import subprocess
+import re
+
+def check_trex_path(trex_path):
+ if os.path.isfile('%s/trex_daemon_server' % trex_path):
+ return os.path.abspath(trex_path)
+
+def check_setup_path(setup_path):
+ if os.path.isfile('%s/config.yaml' % setup_path):
+ return os.path.abspath(setup_path)
+
+
+def get_trex_path():
+ latest_build_path = check_trex_path(os.getenv('TREX_UNDER_TEST')) # TREX_UNDER_TEST is env var pointing to <trex-core>/scripts
+ if not latest_build_path:
+ latest_build_path = check_trex_path(os.path.join(os.pardir, os.pardir))
+ if not latest_build_path:
+ raise Exception('Could not determine trex_under_test folder, try setting env.var. TREX_UNDER_TEST')
+ return latest_build_path
+
+DAEMON_STOP_COMMAND = 'cd %s; ./trex_daemon_server stop; sleep 1; ./trex_daemon_server stop;' % get_trex_path()
+DAEMON_START_COMMAND = DAEMON_STOP_COMMAND + 'sleep 1; rm /var/log/trex/trex_daemon_server.log; ./trex_daemon_server start; sleep 2; ./trex_daemon_server show'
+
+def _start_stop_trex_remote_server(trex_data, command):
+ # start t-rex server as daemon process
+ # subprocess.call(["/usr/bin/python", "trex_daemon_server", "restart"], cwd = trex_latest_build)
+ misc_methods.run_remote_command(trex_data['trex_name'],
+ trex_data['trex_password'],
+ command)
+
+def start_trex_remote_server(trex_data, kill_running = False):
+ if kill_running:
+ (return_code, stdout, stderr) = misc_methods.run_remote_command(trex_data['trex_name'],
+ trex_data['trex_password'],
+ 'ps -u root --format comm,pid,cmd | grep t-rex-64')
+ if stdout:
+ for process in stdout.split('\n'):
+ try:
+ proc_name, pid, full_cmd = re.split('\s+', process, maxsplit=2)
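+                    # e.g. a ps output line 't-rex-64  1234 ./t-rex-64 -f cap2/dns.yaml'
+                    # splits into ('t-rex-64', '1234', './t-rex-64 -f cap2/dns.yaml')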
+ if proc_name.find('t-rex-64') >= 0:
+ print 'Killing remote process: %s' % full_cmd
+ misc_methods.run_remote_command(trex_data['trex_name'],
+ trex_data['trex_password'],
+ 'kill %s' % pid)
+ except:
+ continue
+
+ _start_stop_trex_remote_server(trex_data, DAEMON_START_COMMAND)
+
+def stop_trex_remote_server(trex_data):
+ _start_stop_trex_remote_server(trex_data, DAEMON_STOP_COMMAND)
+
+class CTRexTestConfiguringPlugin(Plugin):
+ def options(self, parser, env = os.environ):
+ super(CTRexTestConfiguringPlugin, self).options(parser, env)
+ parser.add_option('--cfg', '--trex-scenario-config', action='store',
+ dest='config_path',
+ help='Specify path to folder with config.yaml and benchmark.yaml')
+ parser.add_option('--skip-clean', '--skip_clean', action='store_true',
+ dest='skip_clean_config',
+ help='Skip the clean configuration replace on the platform.')
+ parser.add_option('--load-image', '--load_image', action='store_true', default = False,
+ dest='load_image',
+ help='Install image specified in config file on router.')
+ parser.add_option('--log-path', '--log_path', action='store',
+ dest='log_path',
+                            help='Specify path for the tests\' logs to be saved at. Once applied, log capturing by nose will be disabled.') # Default is CURRENT/WORKING/PATH/trex_log/trex_log.log')
+ parser.add_option('--verbose-mode', '--verbose_mode', action="store_true", default = False,
+ dest="verbose_mode",
+ help="Print RPC command and router commands.")
+ parser.add_option('--server-logs', '--server_logs', action="store_true", default = False,
+ dest="server_logs",
+ help="Print server side (TRex and trex_daemon) logs per test.")
+ parser.add_option('--kill-running', '--kill_running', action="store_true", default = False,
+ dest="kill_running",
+ help="Kills running TRex process on remote server (useful for regression).")
+ parser.add_option('--dave', action="store_true", default = False,
+ dest="dave",
+ help="Dave's setup (temporary workaround flag, remove it ASAP).")
+
+ def configure(self, options, conf):
+ if CTRexScenario.setup_dir and options.config_path:
+ raise Exception('Please either define --cfg or use env. variable SETUP_DIR, not both.')
+ if not options.config_path and CTRexScenario.setup_dir:
+ options.config_path = CTRexScenario.setup_dir
+ if options.config_path:
+ self.configuration = misc_methods.load_complete_config_file(os.path.join(options.config_path, 'config.yaml'))
+ self.benchmark = misc_methods.load_benchmark_config_file(os.path.join(options.config_path, 'benchmark.yaml'))
+ self.enabled = True
+ else:
+ raise Exception('Please specify path to config.yaml using --cfg parameter or env. variable SETUP_DIR')
+ self.modes = self.configuration.trex.get('modes', [])
+ self.kill_running = options.kill_running
+ self.load_image = options.load_image
+ self.verbose_mode = options.verbose_mode
+ self.clean_config = False if options.skip_clean_config else True
+ self.server_logs = options.server_logs
+ self.dave = options.dave
+
+ if options.log_path:
+ self.loggerPath = options.log_path
+
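+    # A typical regression invocation that exercises this plugin might look like
+    # (hypothetical setup path):
+    #   ./trex_unit_test.py --cfg ../trex-local/setups/my_setup --kill-running --log-path /tmp/trex.log
+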
+ def begin (self):
+ # launch t-rex server on relevant setup
+ if not self.dave:
+ start_trex_remote_server(self.configuration.trex, self.kill_running)
+
+ # initialize CTRexScenario global testing class, to be used by all tests
+ CTRexScenario.configuration = self.configuration
+ CTRexScenario.benchmark = self.benchmark
+ CTRexScenario.modes = set(self.modes)
+ CTRexScenario.server_logs = self.server_logs
+ CTRexScenario.trex = CTRexClient(trex_host = self.configuration.trex['trex_name'], verbose = self.verbose_mode)
+ if 'loopback' not in self.modes:
+ CTRexScenario.router_cfg = dict( config_dict = self.configuration.router,
+ forceImageReload = self.load_image,
+ silent_mode = not self.verbose_mode,
+ forceCleanConfig = self.clean_config,
+ tftp_config_dict = self.configuration.tftp )
+ try:
+ CustomLogger.setup_custom_logger('TRexLogger', self.loggerPath)
+ except AttributeError:
+ CustomLogger.setup_custom_logger('TRexLogger')
+
+ def finalize(self, result):
+ CTRexScenario.is_init = False
+ if not self.dave:
+ stop_trex_remote_server(self.configuration.trex)
+
+
+def save_setup_info():
+ try:
+ if CTRexScenario.setup_name and CTRexScenario.trex_version:
+ setup_info = ''
+ for key, value in CTRexScenario.trex_version.items():
+ setup_info += '{0:8}: {1}\n'.format(key, value)
+ cfg = CTRexScenario.configuration
+ setup_info += 'Server: %s, Modes: %s' % (cfg.trex.get('trex_name'), cfg.trex.get('modes'))
+ if cfg.router:
+ setup_info += '\nRouter: Model: %s, Image: %s' % (cfg.router.get('model'), CTRexScenario.router_image)
+ with open('%s/report_%s.info' % (CTRexScenario.report_dir, CTRexScenario.setup_name), 'w') as f:
+ f.write(setup_info)
+ except Exception as err:
+ print 'Error saving setup info: %s ' % err
+
+
+def set_report_dir (report_dir):
+ if not os.path.exists(report_dir):
+ os.mkdir(report_dir)
+
+
+if __name__ == "__main__":
+
+ # setting defaults. By default we run all the test suite
+ specific_tests = False
+ disableLogCapture = False
+ long_test = False
+ xml_name = 'unit_test.xml'
+ CTRexScenario.report_dir = 'reports'
+ setup_dir = os.getenv('SETUP_DIR', '')#.rstrip('/')
+ CTRexScenario.setup_dir = check_setup_path(setup_dir)
+ if not CTRexScenario.setup_dir:
+ CTRexScenario.setup_dir = check_setup_path(os.path.join(os.pardir, os.pardir, os.pardir, os.pardir, 'trex-local', 'setups', setup_dir))
+
+ if CTRexScenario.setup_dir:
+ CTRexScenario.setup_name = os.path.basename(CTRexScenario.setup_dir)
+ xml_name = 'report_%s.xml' % CTRexScenario.setup_name
+
+ nose_argv= sys.argv + ['-s', '-v', '--exe', '--rednose', '--detailed-errors', '--with-xunit', '--xunit-file=%s/%s' % (CTRexScenario.report_dir, xml_name)]
+
+ for arg in sys.argv:
+ if 'unit_tests/' in arg:
+ specific_tests = True
+ if 'log-path' in arg:
+ disableLogCapture = True
+ if arg == '--collect-only': # this is a user trying simply to view the available tests. removing xunit param from nose args
+            nose_argv[-2:] = [] # drop the '--with-xunit' and '--xunit-file' params
+ CTRexScenario.is_test_list = True
+ else:
+ set_report_dir(CTRexScenario.report_dir)
+
+ # Run all of the unit tests or just the selected ones
+ if not specific_tests:
+ nose_argv += ['unit_tests']
+ if disableLogCapture:
+ nose_argv += ['--nologcapture']
+
+ try:
+ config_plugin = CTRexTestConfiguringPlugin()
+ red_nose = RedNose()
+ try:
+ result = nose.run(argv = nose_argv, addplugins = [red_nose, config_plugin])
+ except socket.error: # handle consecutive tests exception, try once again
+ print "TRex connectivity error identified. Possibly due to consecutive nightly runs.\nRetrying..."
+ result = nose.run(argv = nose_argv, addplugins = [red_nose, config_plugin])
+ finally:
+ save_setup_info()
+
+ if (result == True and not CTRexScenario.is_test_list):
+ print termstyle.green("""
+ ..::''''::..
+ .;'' ``;.
+ :: :: :: ::
+ :: :: :: ::
+ :: :: :: ::
+ :: .:' :: :: `:. ::
+ :: : : ::
+ :: `:. .:' ::
+ `;..``::::''..;'
+ ``::,,,,::''
+
+ ___ ___ __________
+ / _ \/ _ | / __/ __/ /
+ / ___/ __ |_\ \_\ \/_/
+ /_/ /_/ |_/___/___(_)
+
+ """)
+ sys.exit(0)
+ else:
+ sys.exit(-1)
+
+ finally:
+ pass
+
+
+
+
+
+
+
+
diff --git a/scripts/automation/regression/unit_tests/__init__.py b/scripts/automation/regression/unit_tests/__init__.py
new file mode 100755
index 00000000..8b137891
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/__init__.py
@@ -0,0 +1 @@
+
diff --git a/scripts/automation/regression/unit_tests/functional_tests/config.yaml b/scripts/automation/regression/unit_tests/functional_tests/config.yaml
new file mode 100644
index 00000000..4f4c7c40
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/config.yaml
@@ -0,0 +1,74 @@
+################################################################
+#### T-Rex nightly test configuration file ####
+################################################################
+
+
+### T-Rex configuration:
+# hostname - can be DNS name or IP for the TRex machine for ssh to the box
+# password - root password for TRex machine
+# is_dual - should the TRex inject with -p ?
+# version_path - path to the t-rex version and executable
+# cores - how many cores should be used
+# latency - rate of latency packets injected by the TRex
+
+### Router configuration:
+# hostname - the router hostname, as appears in the ______# CLI prompt prefix
+# ip_address - the router's ip, that can be used to communicate with it
+# image - the desired image to be loaded as the router's running image
+# line_password - router password when accessed via Telnet
+# en_password - router password when changing to "enable" mode
+# interfaces - an array of client-server pairs, representing the interfaces configurations of the router
+# configurations - an array of configurations that could possibly be loaded into the router during the test.
+# The "clean" configuration is a mandatory configuration the router will load with to run the basic test bench
+
+### TFTP configuration:
+# hostname - the tftp hostname
+# ip_address - the tftp's ip address
+# images_path - the tftp's relative path in which the router's images are located
+
+### Test_misc configuration:
+# expected_bw - the "golden" bandwidth (in Gbps) results planned on receiving from the test
+
+trex:
+ hostname : hostname
+ password : root password
+ version_path : not used
+ cores : 1
+
+router:
+ model : device model
+ hostname : device hostname
+ ip_address : device ip
+ image : device image name
+ line_password : telnet pass
+ en_password : enable pass
+ mgmt_interface : GigabitEthernet0/0/0
+ clean_config : path to clean_config file
+ intf_masking : 255.255.255.0
+ ipv6_mask : 64
+ interfaces :
+ - client :
+ name : GigabitEthernet0/0/1
+ src_mac_addr : 0000.0001.0000
+ dest_mac_addr : 0000.1000.0000
+ server :
+ name : GigabitEthernet0/0/2
+ src_mac_addr : 0000.0002.0000
+ dest_mac_addr : 0000.2000.0000
+ vrf_name : null
+ - client :
+ name : GigabitEthernet0/0/3
+ src_mac_addr : 0000.0003.0000
+ dest_mac_addr : 0000.3000.0000
+ server :
+ name : GigabitEthernet0/0/4
+ src_mac_addr : 0000.0004.0000
+ dest_mac_addr : 0000.4000.0000
+ vrf_name : dup
+
+
+tftp:
+ hostname : tftp hostname
+ ip_address : tftp ip
+ root_dir : tftp root dir
+ images_path : path related to root dir
diff --git a/scripts/automation/regression/unit_tests/functional_tests/functional_general_test.py b/scripts/automation/regression/unit_tests/functional_tests/functional_general_test.py
new file mode 100755
index 00000000..525b58d2
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/functional_general_test.py
@@ -0,0 +1,22 @@
+#!/router/bin/python
+
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+
+
+class CGeneralFunctional_Test(object):
+ def __init__(self):
+ pass
+
+
+ def setUp(self):
+ pass
+
+
+ def tearDown(self):
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/unit_tests/functional_tests/misc_methods_test.py b/scripts/automation/regression/unit_tests/functional_tests/misc_methods_test.py
new file mode 100755
index 00000000..096f86d8
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/misc_methods_test.py
@@ -0,0 +1,61 @@
+#!/router/bin/python
+
+import functional_general_test
+import misc_methods
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+from nose.tools import assert_raises
+from nose.tools import raises
+
+
+class MiscMethods_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.ipv4_gen = misc_methods.get_network_addr()
+ self.ipv6_gen = misc_methods.get_network_addr(ip_type = 'ipv6')
+ pass
+
+ def test_ipv4_gen(self):
+ for i in range(1, 255):
+ assert_equal( next(self.ipv4_gen), [".".join( map(str, [1, 1, i, 0])), '255.255.255.0'] )
+
+ def test_ipv6_gen(self):
+ tmp_ipv6_addr = ['2001', 'DB8', 0, '2222', 0, 0, 0, 0]
+ for i in range(0, 255):
+ tmp_ipv6_addr[2] = hex(i)[2:]
+ assert_equal( next(self.ipv6_gen), ":".join( map(str, tmp_ipv6_addr)) )
+
+ def test_get_ipv4_client_addr(self):
+ tmp_ipv4_addr = next(self.ipv4_gen)[0]
+ assert_equal ( misc_methods.get_single_net_client_addr(tmp_ipv4_addr), '1.1.1.1')
+ assert_raises (ValueError, misc_methods.get_single_net_client_addr, tmp_ipv4_addr, {'3' : 255} )
+
+ def test_get_ipv6_client_addr(self):
+ tmp_ipv6_addr = next(self.ipv6_gen)
+ assert_equal ( misc_methods.get_single_net_client_addr(tmp_ipv6_addr, {'7' : 1}, ip_type = 'ipv6'), '2001:DB8:0:2222:0:0:0:1')
+ assert_equal ( misc_methods.get_single_net_client_addr(tmp_ipv6_addr, {'7' : 2}, ip_type = 'ipv6'), '2001:DB8:0:2222:0:0:0:2')
+ assert_raises (ValueError, misc_methods.get_single_net_client_addr, tmp_ipv6_addr, {'7' : 70000} )
+
+
+ @raises(ValueError)
+ def test_ipv4_client_addr_exception(self):
+ tmp_ipv4_addr = next(self.ipv4_gen)[0]
+ misc_methods.get_single_net_client_addr(tmp_ipv4_addr, {'4' : 1})
+
+ @raises(ValueError)
+ def test_ipv6_client_addr_exception(self):
+ tmp_ipv6_addr = next(self.ipv6_gen)
+ misc_methods.get_single_net_client_addr(tmp_ipv6_addr, {'8' : 1}, ip_type = 'ipv6')
+
+ @raises(StopIteration)
+ def test_gen_ipv4_to_limit (self):
+ while(True):
+ next(self.ipv4_gen)
+
+ @raises(StopIteration)
+ def test_gen_ipv6_to_limit (self):
+ while(True):
+ next(self.ipv6_gen)
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/unit_tests/functional_tests/platform_cmd_cache_test.py b/scripts/automation/regression/unit_tests/functional_tests/platform_cmd_cache_test.py
new file mode 100755
index 00000000..24ccf7a5
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/platform_cmd_cache_test.py
@@ -0,0 +1,60 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CCommandCache_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.cache = CCommandCache()
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/1')
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/2')
+ self.cache.add('conf', "arp 1.1.1.1 0000.0001.0000 arpa")
+ self.cache.add('conf', "arp 1.1.2.1 0000.0002.0000 arpa")
+ self.cache.add('exec', "show ip nbar protocol-discovery stats packet-count")
+
+ def test_add(self):
+ assert_equal(self.cache.cache['IF'],
+ {'GigabitEthernet0/0/1' : ['ip nbar protocol-discovery'],
+ 'GigabitEthernet0/0/2' : ['ip nbar protocol-discovery']
+ })
+ assert_equal(self.cache.cache['CONF'],
+ ["arp 1.1.1.1 0000.0001.0000 arpa",
+ "arp 1.1.2.1 0000.0002.0000 arpa"]
+ )
+ assert_equal(self.cache.cache['EXEC'],
+ ["show ip nbar protocol-discovery stats packet-count"])
+
+ def test_dump_config (self):
+ import sys
+ from StringIO import StringIO
+ saved_stdout = sys.stdout
+ try:
+ out = StringIO()
+ sys.stdout = out
+ self.cache.dump_config()
+ output = out.getvalue().strip()
+ assert_equal(output,
+ "configure terminal\ninterface GigabitEthernet0/0/1\nip nbar protocol-discovery\ninterface GigabitEthernet0/0/2\nip nbar protocol-discovery\nexit\narp 1.1.1.1 0000.0001.0000 arpa\narp 1.1.2.1 0000.0002.0000 arpa\nexit\nshow ip nbar protocol-discovery stats packet-count"
+ )
+ finally:
+ sys.stdout = saved_stdout
+
+ def test_get_config_list (self):
+ assert_equal(self.cache.get_config_list(),
+ ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ def test_clear_cache (self):
+ self.cache.clear_cache()
+ assert_equal(self.cache.cache,
+ {"IF" : {},
+ "CONF" : [],
+ "EXEC" : []}
+ )
+
+ def tearDown(self):
+ self.cache.clear_cache()
diff --git a/scripts/automation/regression/unit_tests/functional_tests/platform_cmd_link_test.py b/scripts/automation/regression/unit_tests/functional_tests/platform_cmd_link_test.py
new file mode 100755
index 00000000..7a31815b
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/platform_cmd_link_test.py
@@ -0,0 +1,62 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CCommandLink_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.cache = CCommandCache()
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/1')
+ self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/2')
+ self.cache.add('conf', "arp 1.1.1.1 0000.0001.0000 arpa")
+ self.cache.add('conf', "arp 1.1.2.1 0000.0002.0000 arpa")
+ self.cache.add('exec', "show ip nbar protocol-discovery stats packet-count")
+ self.com_link = CCommandLink()
+
+ def test_transmit(self):
+        # test here the future implementation of the platform physical link
+ pass
+
+ def test_run_cached_command (self):
+ self.com_link.run_command([self.cache])
+
+ assert_equal (self.com_link.get_history(),
+ ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ self.com_link.clear_history()
+ self.com_link.run_single_command(self.cache)
+ assert_equal (self.com_link.get_history(),
+ ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ def test_run_single_command(self):
+ self.com_link.run_single_command("show ip nbar protocol-discovery stats packet-count")
+ assert_equal (self.com_link.get_history(),
+ ["show ip nbar protocol-discovery stats packet-count"]
+ )
+
+ def test_run_mixed_commands (self):
+ self.com_link.run_single_command("show ip nbar protocol-discovery stats packet-count")
+ self.com_link.run_command([self.cache])
+ self.com_link.run_command(["show ip interface brief"])
+
+ assert_equal (self.com_link.get_history(),
+ ["show ip nbar protocol-discovery stats packet-count",
+ "configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count",
+ "show ip interface brief"]
+ )
+
+ def test_clear_history (self):
+ self.com_link.run_command(["show ip interface brief"])
+ self.com_link.clear_history()
+ assert_equal (self.com_link.get_history(), [])
+
+ def tearDown(self):
+ self.cache.clear_cache()
+
+
diff --git a/scripts/automation/regression/unit_tests/functional_tests/platform_device_cfg_test.py b/scripts/automation/regression/unit_tests/functional_tests/platform_device_cfg_test.py
new file mode 100755
index 00000000..890d0cb9
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/platform_device_cfg_test.py
@@ -0,0 +1,20 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CDeviceCfg_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.dev_cfg = CDeviceCfg('./unit_tests/functional_tests/config.yaml')
+
+ def test_get_interfaces_cfg(self):
+ assert_equal (self.dev_cfg.get_interfaces_cfg(),
+ [{'client': {'src_mac_addr': '0000.0001.0000', 'name': 'GigabitEthernet0/0/1', 'dest_mac_addr': '0000.1000.0000'}, 'vrf_name': None, 'server': {'src_mac_addr': '0000.0002.0000', 'name': 'GigabitEthernet0/0/2', 'dest_mac_addr': '0000.2000.0000'}}, {'client': {'src_mac_addr': '0000.0003.0000', 'name': 'GigabitEthernet0/0/3', 'dest_mac_addr': '0000.3000.0000'}, 'vrf_name': 'dup', 'server': {'src_mac_addr': '0000.0004.0000', 'name': 'GigabitEthernet0/0/4', 'dest_mac_addr': '0000.4000.0000'}}]
+ )
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/unit_tests/functional_tests/platform_dual_if_obj_test.py b/scripts/automation/regression/unit_tests/functional_tests/platform_dual_if_obj_test.py
new file mode 100755
index 00000000..ff54b9ee
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/platform_dual_if_obj_test.py
@@ -0,0 +1,31 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CDualIfObj_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.if_1 = CIfObj('gig0/0/1', '1.1.1.1', '2001:DB8:0:2222:0:0:0:1', '0000.0001.0000', '0000.0001.0000', IFType.Client)
+ self.if_2 = CIfObj('gig0/0/2', '1.1.2.1', '2001:DB8:1:2222:0:0:0:1', '0000.0002.0000', '0000.0002.0000', IFType.Server)
+ self.if_3 = CIfObj('gig0/0/3', '1.1.3.1', '2001:DB8:2:2222:0:0:0:1', '0000.0003.0000', '0000.0003.0000', IFType.Client)
+ self.if_4 = CIfObj('gig0/0/4', '1.1.4.1', '2001:DB8:3:2222:0:0:0:1', '0000.0004.0000', '0000.0004.0000', IFType.Server)
+ self.dual_1 = CDualIfObj(None, self.if_1, self.if_2)
+ self.dual_2 = CDualIfObj('dup', self.if_3, self.if_4)
+
+ def test_id_allocation(self):
+ assert (self.dual_1.get_id() < self.dual_2.get_id() < CDualIfObj._obj_id)
+
+ def test_get_vrf_name (self):
+ assert_equal ( self.dual_1.get_vrf_name() , None )
+ assert_equal ( self.dual_2.get_vrf_name() , 'dup' )
+
+ def test_is_duplicated (self):
+ assert_equal ( self.dual_1.is_duplicated() , False )
+ assert_equal ( self.dual_2.is_duplicated() , True )
+
+ def tearDown(self):
+ pass \ No newline at end of file
diff --git a/scripts/automation/regression/unit_tests/functional_tests/platform_if_manager_test.py b/scripts/automation/regression/unit_tests/functional_tests/platform_if_manager_test.py
new file mode 100755
index 00000000..7ba6e66e
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/platform_if_manager_test.py
@@ -0,0 +1,40 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CIfManager_Test(functional_general_test.CGeneralFunctional_Test):
+
+ def setUp(self):
+ self.dev_cfg = CDeviceCfg('./unit_tests/functional_tests/config.yaml')
+ self.if_mng = CIfManager()
+
+ # main testing method to check the entire class
+ def test_load_config (self):
+ self.if_mng.load_config(self.dev_cfg)
+
+        # check the number of items in each query
+ assert_equal( len(self.if_mng.get_if_list()), 4 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Client)), 2 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Client, is_duplicated = True)), 1 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Client, is_duplicated = False)), 1 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Server)), 2 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Server, is_duplicated = True)), 1 )
+ assert_equal( len(self.if_mng.get_if_list(if_type = IFType.Server, is_duplicated = False)), 1 )
+ assert_equal( len(self.if_mng.get_duplicated_if()), 2 )
+ assert_equal( len(self.if_mng.get_dual_if_list()), 2 )
+
+ # check the classification with intf name
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_if_list() ), ['GigabitEthernet0/0/1','GigabitEthernet0/0/2','GigabitEthernet0/0/3','GigabitEthernet0/0/4'] )
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_if_list(is_duplicated = True) ), ['GigabitEthernet0/0/3','GigabitEthernet0/0/4'] )
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_if_list(is_duplicated = False) ), ['GigabitEthernet0/0/1','GigabitEthernet0/0/2'] )
+ assert_equal( map(CIfObj.get_name, self.if_mng.get_duplicated_if() ), ['GigabitEthernet0/0/3', 'GigabitEthernet0/0/4'] )
+
+ # check the classification with vrf name
+ assert_equal( map(CDualIfObj.get_vrf_name, self.if_mng.get_dual_if_list() ), [None, 'dup'] )
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/unit_tests/functional_tests/platform_if_obj_test.py b/scripts/automation/regression/unit_tests/functional_tests/platform_if_obj_test.py
new file mode 100755
index 00000000..534d4170
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/functional_tests/platform_if_obj_test.py
@@ -0,0 +1,49 @@
+#!/router/bin/python
+
+from platform_cmd_link import *
+import functional_general_test
+from nose.tools import assert_equal
+from nose.tools import assert_not_equal
+
+
+class CIfObj_Test(functional_general_test.CGeneralFunctional_Test):
+ test_idx = 1
+
+ def setUp(self):
+ self.if_1 = CIfObj('gig0/0/1', '1.1.1.1', '2001:DB8:0:2222:0:0:0:1', '0000.0001.0000', '0000.0001.0000', IFType.Client)
+ self.if_2 = CIfObj('TenGig0/0/0', '1.1.2.1', '2001:DB8:1:2222:0:0:0:1', '0000.0002.0000', '0000.0002.0000', IFType.Server)
+ CIfObj_Test.test_idx += 1
+
+ def test_id_allocation(self):
+ assert (self.if_1.get_id() < self.if_2.get_id() < CIfObj._obj_id)
+
+ def test_isClient(self):
+ assert_equal (self.if_1.is_client(), True)
+
+ def test_isServer(self):
+ assert_equal (self.if_2.is_server(), True)
+
+ def test_get_name (self):
+ assert_equal (self.if_1.get_name(), 'gig0/0/1')
+ assert_equal (self.if_2.get_name(), 'TenGig0/0/0')
+
+ def test_get_src_mac_addr (self):
+ assert_equal (self.if_1.get_src_mac_addr(), '0000.0001.0000')
+
+ def test_get_dest_mac (self):
+ assert_equal (self.if_2.get_dest_mac(), '0000.0002.0000')
+
+ def test_get_ipv4_addr (self):
+ assert_equal (self.if_1.get_ipv4_addr(), '1.1.1.1' )
+ assert_equal (self.if_2.get_ipv4_addr(), '1.1.2.1' )
+
+ def test_get_ipv6_addr (self):
+ assert_equal (self.if_1.get_ipv6_addr(), '2001:DB8:0:2222:0:0:0:1' )
+ assert_equal (self.if_2.get_ipv6_addr(), '2001:DB8:1:2222:0:0:0:1' )
+
+ def test_get_type (self):
+ assert_equal (self.if_1.get_if_type(), IFType.Client)
+ assert_equal (self.if_2.get_if_type(), IFType.Server)
+
+ def tearDown(self):
+ pass
diff --git a/scripts/automation/regression/unit_tests/tests_exceptions.py b/scripts/automation/regression/unit_tests/tests_exceptions.py
new file mode 100755
index 00000000..604efcc8
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/tests_exceptions.py
@@ -0,0 +1,37 @@
+#!/router/bin/python
+
+class TRexInUseError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class TRexRunFailedError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class TRexIncompleteRunError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class TRexLowCpuUtilError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class AbnormalResultError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+class ClassificationMissmatchError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
diff --git a/scripts/automation/regression/unit_tests/trex_general_test.py b/scripts/automation/regression/unit_tests/trex_general_test.py
new file mode 100755
index 00000000..6a6ad79c
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/trex_general_test.py
@@ -0,0 +1,328 @@
+#!/router/bin/python
+
+__copyright__ = "Copyright 2014"
+
+"""
+Name:
+ trex_general_test.py
+
+
+Description:
+
+    This script provides the functionality to test the performance of the T-Rex traffic generator.
+    The tested scenario is a T-Rex TG directly connected to a Cisco router.
+
+::
+
+ Topology:
+
+ ------- --------
+ | | Tx---1gig/10gig----Rx | |
+ | T-Rex | | router |
+ | | Rx---1gig/10gig----Tx | |
+ ------- --------
+
+"""
+from nose.plugins import Plugin
+from nose.plugins.skip import SkipTest
+import trex
+import misc_methods
+import sys
+import os
+# from CPlatformUnderTest import *
+from CPlatform import *
+import termstyle
+import threading
+from tests_exceptions import *
+from platform_cmd_link import *
+import unittest
+
+
+class CTRexScenario():
+ modes = set() # list of modes of this setup: loopback, virtual etc.
+ server_logs = False
+ is_test_list = False
+ is_init = False
+ trex_crashed = False
+ configuration = None
+ trex = None
+ router = None
+ router_cfg = None
+ daemon_log_lines = 0
+ setup_name = None
+ setup_dir = None
+ router_image = None
+ trex_version = None
+ report_dir = 'reports'
+ # logger = None
+
+#scenario = CTRexScenario()
+
+def setUpModule(module):
+# print ("") # this is to get a newline after the dots
+# print ("setup_module before anything in this file")
+# # ff = CTRexScenario()
+# scenario.configuration = misc_methods.load_complete_config_file('config/config.yaml')
+# scenario.trex = trex.CTRexRunner(scenario.configuration[0], None)
+# scenario.router = CPlatform(scenario.configuration[1], False, scenario.configuration[2])
+# scenario.router.platform.preCheck()
+# print "Done instantiating trex scenario!"
+ pass
+
+def tearDownModule(module):
+# print ("") # this is to get a newline after the dots
+# scenario.router.platform.postCheck()
+# print ("teardown_module after anything in this file")
+ pass
+
+
+
+class CTRexGeneral_Test(unittest.TestCase):
+ """This class defines the general testcase of the T-Rex traffic generator"""
+ def __init__ (self, *args, **kwargs):
+ unittest.TestCase.__init__(self, *args, **kwargs)
+ # Point test object to scenario global object
+ self.configuration = CTRexScenario.configuration
+ self.benchmark = CTRexScenario.benchmark
+ self.trex = CTRexScenario.trex
+ self.trex_crashed = CTRexScenario.trex_crashed
+ self.modes = CTRexScenario.modes
+ self.skipping = False
+ self.fail_reasons = []
+ if not hasattr(self, 'unsupported_modes'):
+ self.unsupported_modes = []
+        self.is_loopback = 'loopback' in self.modes
+        self.is_virt_nics = 'virt_nics' in self.modes
+        self.is_VM = 'VM' in self.modes
+
+ if not CTRexScenario.is_init:
+ CTRexScenario.trex_version = self.trex.get_trex_version()
+ if not self.is_loopback:
+                # initialize the scenario based on received configuration, once per entire testing session
+ CTRexScenario.router = CPlatform(CTRexScenario.router_cfg['silent_mode'])
+ device_cfg = CDeviceCfg()
+ device_cfg.set_platform_config(CTRexScenario.router_cfg['config_dict'])
+ device_cfg.set_tftp_config(CTRexScenario.router_cfg['tftp_config_dict'])
+ CTRexScenario.router.load_platform_data_from_file(device_cfg)
+ CTRexScenario.router.launch_connection(device_cfg)
+ running_image = CTRexScenario.router.get_running_image_details()['image']
+ print 'Current router image: %s' % running_image
+ if CTRexScenario.router_cfg['forceImageReload']:
+ needed_image = device_cfg.get_image_name()
+ if not CTRexScenario.router.is_image_matches(needed_image):
+ print 'Setting router image: %s' % needed_image
+ CTRexScenario.router.config_tftp_server(device_cfg)
+ CTRexScenario.router.load_platform_image(needed_image)
+ CTRexScenario.router.set_boot_image(needed_image)
+ CTRexScenario.router.reload_platform(device_cfg)
+ CTRexScenario.router.launch_connection(device_cfg)
+ running_image = CTRexScenario.router.get_running_image_details()['image'] # verify image
+ if not CTRexScenario.router.is_image_matches(needed_image):
+ self.fail('Unable to set router image: %s, current image is: %s' % (needed_image, running_image))
+ else:
+ print 'Matches needed image: %s' % needed_image
+ CTRexScenario.router_image = running_image
+
+
+ if self.modes:
+ print termstyle.green('\t!!!\tRunning with modes: %s, not suitable tests will be skipped.\t!!!' % list(self.modes))
+
+ CTRexScenario.is_init = True
+ print termstyle.green("Done instantiating T-Rex scenario!\n")
+
+# raise RuntimeError('CTRexScenario class is not initialized!')
+ self.router = CTRexScenario.router
+
+
+
+# def assert_dict_eq (self, dict, key, val, error=''):
+# v1 = int(dict[key]))
+# self.assertEqual(v1, int(val), error)
+#
+# def assert_dict_gt (self, d, key, val, error=''):
+# v1 = int(dict[key])
+# self.assert_gt(v1, int(val), error)
+
+ def assertEqual(self, v1, v2, s):
+ if v1 != v2:
+            error = 'ERROR ' + str(v1) + ' != ' + str(v2) + ' ' + s
+ self.fail(error)
+
+ def assert_gt(self, v1, v2, s):
+ if not v1 > v2:
+            error='ERROR: expected {big} > {small} {str}'.format(big = v1, small = v2, str = s)
+ self.fail(error)
+
+ def check_results_eq (self,res,name,val):
+ if res is None:
+ self.fail('TRex results cannot be None !')
+ return
+
+ if name not in res:
+            self.fail('TRex results do not include key %s' % name)
+ return
+
+ if res[name] != float(val):
+ self.fail('TRex results[%s]==%f and not as expected %f ' % (name, res[name], val))
+
+ def check_CPU_benchmark (self, trex_res, err):
+ #cpu_util = float(trex_res.get_last_value("trex-global.data.m_cpu_util"))
+ cpu_util = sum([float(x) for x in trex_res.get_value_list("trex-global.data.m_cpu_util")[-4:-1]]) / 3 # mean of 3 values before last
+
+ if cpu_util < 30 and not self.is_virt_nics:
+ self.fail("CPU is too low (%s%%), can't verify performance in such low CPU%%." % cpu_util )
+
+ cores = self.get_benchmark_param('cores')
+ trex_tx_bps = trex_res.get_last_value("trex-global.data.m_total_tx_bytes")
+ test_norm_cpu = 100.0*(trex_tx_bps/(cores*cpu_util))/1e6
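+        # e.g. (hypothetical numbers) trex_tx_bps = 4e9, cores = 2, cpu_util = 80
+        # gives 100.0 * (4e9 / (2 * 80)) / 1e6 == 2500 Mb/core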
+
+ print "TRex CPU utilization: %g%%, norm_cpu is : %d Mb/core" % (round(cpu_util), int(test_norm_cpu))
+
+ #expected_norm_cpu = self.get_benchmark_param('cpu_to_core_ratio')
+
+ #calc_error_precent = abs(100.0*(test_norm_cpu/expected_norm_cpu)-100.0)
+
+# if calc_error_precent > err:
+# msg ='Normalized bandwidth to CPU utilization ratio is %2.0f Mb/core expected %2.0f Mb/core more than %2.0f %% - ERROR' % (test_norm_cpu, expected_norm_cpu, err)
+# raise AbnormalResultError(msg)
+# else:
+# msg ='Normalized bandwidth to CPU utilization ratio is %2.0f Mb/core expected %2.0f Mb/core less than %2.0f %% - OK' % (test_norm_cpu, expected_norm_cpu, err)
+# print msg
+
+
+ def check_results_gt (self, res, name, val):
+ if res is None:
+            self.fail('TRex results cannot be None !')
+ return
+
+ if name not in res:
+            self.fail('TRex results do not include key %s' % name)
+ return
+
+ if res[name]< float(val):
+ self.fail('TRex results[%s]<%f and not as expected greater than %f ' % (name, res[name], val))
+
+ def check_for_trex_crash(self):
+ pass
+
+ def get_benchmark_param (self, param, sub_param = None, test_name = None):
+ if not test_name:
+ test_name = self.get_name()
+ if test_name not in self.benchmark:
+ self.skip('No data in benchmark.yaml for test %s, skipping.' % test_name)
+ if sub_param:
+ return self.benchmark[test_name][param].get(sub_param)
+ else:
+ return self.benchmark[test_name].get(param)
+
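+    # Example: with a benchmark.yaml entry such as (hypothetical values)
+    #   test_routing_imix:
+    #       multiplier : 30
+    #       cores      : 2
+    # get_benchmark_param('multiplier') returns 30 while that test is running.
+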
+ def check_general_scenario_results (self, trex_res, check_latency = True):
+
+ try:
+ # check if test is valid
+ if not trex_res.is_done_warmup():
+                self.fail('T-Rex did not reach warm-up situation. Results are not valid.')
+
+ # check T-Rex number of drops
+ trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_pkts")
+ trex_drops = trex_res.get_total_drops()
+ trex_drop_rate = trex_res.get_drop_rate()
+ if ( (trex_drops/trex_tx_pckt) > 0.001) and (trex_drop_rate > 0.0): # deliberately mask kickoff drops when T-Rex first initiated
+ self.fail('Number of packet drops larger than 0.1% of all traffic')
+
+ # # check T-Rex expected counters
+ #trex_exp_rate = trex_res.get_expected_tx_rate().get('m_tx_expected_bps')
+ #assert trex_exp_rate is not None
+ #trex_exp_gbps = trex_exp_rate/(10**9)
+
+ if check_latency:
+ # check that max latency does not exceed 1 msec in regular setup or 20ms in VM
+ allowed_latency = 20000 if self.is_VM else 1000
+ if max(trex_res.get_max_latency().values()) > allowed_latency:
+ print 'LatencyError: Maximal latency exceeds %s (usec)' % allowed_latency
+ #raise AbnormalResultError('Maximal latency above 1ms')
+
+ # check that avg latency does not exceed 1 msec in regular setup or 3ms in VM
+ allowed_latency = 3000 if self.is_VM else 1000
+ if max(trex_res.get_avg_latency().values()) > allowed_latency:
+ print 'LatencyError: Average latency exceeds %s (usec)' % allowed_latency
+ #raise AbnormalResultError('Maximal latency above 1ms')
+
+ if not self.is_loopback:
+ # check router number of drops --> deliberately masked- need to be figured out!!!!!
+ pkt_drop_stats = self.router.get_drop_stats()
+# assert pkt_drop_stats['total_drops'] < 20
+
+ # check for trex-router packet consistency
+ # TODO: check if it's ok
+ print 'router drop stats: %s' % pkt_drop_stats
+ print 'TRex drop stats: %s' % trex_drops
+ #self.assertEqual(pkt_drop_stats, trex_drops, "TRex's and router's drop stats don't match.")
+
+ except KeyError as e:
+ self.fail(e)
+ #assert False
+
+ # except AssertionError as e:
+ # e.args += ('T-Rex has crashed!')
+ # raise
+
+    # On error, don't fail the test immediately; collect the reason and raise at tearDown
+ def fail(self, reason = 'Unknown error'):
+ print 'Error: %s' % reason
+ self.fail_reasons.append(reason)
+
+ # skip running of the test, counts as 'passed' but prints 'skipped'
+ def skip(self, message = ''):
+ self.skipping = True
+ raise SkipTest(message)
+
+ # get name of currently running test
+ def get_name(self):
+ return self._testMethodName
+
+ def setUp(self):
+ test_setup_modes_conflict = self.modes & set(self.unsupported_modes)
+ if test_setup_modes_conflict:
+ self.skip("The test can't run with following modes of given setup: %s " % test_setup_modes_conflict)
+ if not self.trex.is_idle():
+ print 'Warning: TRex is not idle at setUp, trying to stop it.'
+ self.trex.force_kill(confirm = False)
+ if not self.is_loopback:
+ print ''
+ self.router.load_clean_config()
+ self.router.clear_counters()
+ self.router.clear_packet_drop_stats()
+
+ ########################################################################
+ #### DO NOT ADD TESTS TO THIS FILE ####
+    ####   Added tests here will run once for EVERY test sub-class   ####
+ ########################################################################
+
+    # masked example of such a test. uncomment to watch how it affects the run #
+# def test_isInitialized(self):
+# assert CTRexScenario.is_init == True
+ def tearDown(self):
+ if not self.trex.is_idle():
+ print 'Warning: TRex is not idle at tearDown, trying to stop it.'
+ self.trex.force_kill(confirm = False)
+ if not self.skipping:
+ # print server logs of test run
+ if CTRexScenario.server_logs:
+ try:
+ print termstyle.green('\n>>>>>>>>>>>>>>> Daemon log <<<<<<<<<<<<<<<')
+ daemon_log = self.trex.get_trex_daemon_log()
+ log_size = len(daemon_log)
+ print ''.join(daemon_log[CTRexScenario.daemon_log_lines:])
+ CTRexScenario.daemon_log_lines = log_size
+ except Exception as e:
+ print "Can't get TRex daemon log:", e
+ try:
+ print termstyle.green('>>>>>>>>>>>>>>>> Trex log <<<<<<<<<<<<<<<<')
+ print ''.join(self.trex.get_trex_log())
+ except Exception as e:
+ print "Can't get TRex log:", e
+ if len(self.fail_reasons):
+ raise Exception('The test is failed, reasons:\n%s' % '\n'.join(self.fail_reasons))
+
diff --git a/scripts/automation/regression/unit_tests/trex_imix_test.py b/scripts/automation/regression/unit_tests/trex_imix_test.py
new file mode 100755
index 00000000..b56f7f4e
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/trex_imix_test.py
@@ -0,0 +1,176 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from CPlatform import CStaticRouteConfig
+from tests_exceptions import *
+#import sys
+import time
+
+class CTRexIMIX_Test(CTRexGeneral_Test):
+ """This class defines the IMIX testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ # super(CTRexIMIX_Test, self).__init__()
+ CTRexGeneral_Test.__init__(self, *args, **kwargs)
+ pass
+
+ def setUp(self):
+ super(CTRexIMIX_Test, self).setUp() # launch super test class setUp process
+ # CTRexGeneral_Test.setUp(self) # launch super test class setUp process
+ # self.router.clear_counters()
+ pass
+
+ def test_routing_imix_64(self):
+        # test initialization
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+# self.trex.set_yaml_file('cap2/imix_64.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+# trex_res = self.trex.run(multiplier = mult, cores = core, duration = 30, l = 1000, p = True)
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 30,
+ f = 'cap2/imix_64.yaml',
+ l = 1000)
+
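+        # the keyword arguments above are assumed to map to T-Rex CLI flags, i.e.
+        # roughly './t-rex-64 -c <cores> -m <mult> -p --nc -d 30 -f cap2/imix_64.yaml -l 1000'
+        # on the server side
+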
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance, and contains the summary of the test results
+        # you may see all the result keys by simply calling 'print trex_res.result' here
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res, 10.0)
+
+    # the name intentionally does not match nose's default pattern; to include this test, specify it explicitly
+ def dummy(self):
+ self.assertEqual(1, 2, 'boo')
+ self.assertEqual(2, 2, 'boo')
+ self.assertEqual(2, 3, 'boo')
+ #print ''
+ #print dir(self)
+ #print locals()
+ #print ''
+ #print_r(unittest.TestCase)
+ #print ''
+ #print_r(self)
+ print ''
+ #print unittest.TestCase.shortDescription(self)
+ #self.skip("I'm just a dummy test")
+
+
+ def test_routing_imix (self):
+        # test initialization
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+# self.trex.set_yaml_file('cap2/imix_fast_1g.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 60,
+ f = 'cap2/imix_fast_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance, and contains the summary of the test results
+        # you may see all the result keys by simply calling 'print trex_res.result' here
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res, 10.0)
+
+
+ def test_static_routing_imix (self):
+        if self.is_loopback: # in loopback mode this test acts the same as test_routing_imix, disable to avoid duplication
+ self.skip()
+        # test initialization
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+
+ # Configure static routing based on benchmark data input
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 60,
+ f = 'cap2/imix_fast_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance, and contains the summary of the test results
+        # you may see all the result keys by simply calling 'print trex_res.result' here
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+
+
+ def test_static_routing_imix_asymmetric (self):
+        # test initialization
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+
+ # Configure static routing based on benchmark data input
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ nc = True,
+ d = 100,
+ f = 'cap2/imix_fast_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+        # trex_res is a CTRexResult instance, and contains the summary of the test results
+        # you may see all the result keys by simply calling 'print trex_res.result' here
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res, 10)
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ # remove nbar config here
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/unit_tests/trex_ipv6_test.py b/scripts/automation/regression/unit_tests/trex_ipv6_test.py
new file mode 100755
index 00000000..bffb4754
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/trex_ipv6_test.py
@@ -0,0 +1,102 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from tests_exceptions import *
+import time
+from nose.tools import assert_equal
+
+class CTRexIPv6_Test(CTRexGeneral_Test):
+ """This class defines the IPv6 testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexIPv6_Test, self).__init__(*args, **kwargs)
+ pass
+
+ def setUp(self):
+ super(CTRexIPv6_Test, self).setUp() # launch super test class setUp process
+# print " before sleep setup !!"
+# time.sleep(100000);
+# pass
+
+ def test_ipv6_simple(self):
+ if self.is_virt_nics:
+ self.skip('--ipv6 flag does not work correctly with virtual NICs') # TODO: fix
+ # test initialization
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+
+ self.router.config_pbr(mode = "config")
+ self.router.config_ipv6_pbr(mode = "config")
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ ipv6 = True,
+ d = 60,
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance and contains the summary of the test results.
+ # You can see all the result keys with 'print trex_res.result'.
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark (trex_res, 10.0)
+
+ assert True
+
+
+ def test_ipv6_negative (self):
+ if self.is_loopback:
+ self.skip('This test checks IPv6 drops by the device, which is not relevant in a loopback setup')
+ # test initialization
+ self.router.configure_basic_interfaces()
+
+ # NOT CONFIGURING IPv6 INTENTIONALLY TO GET DROPS!
+ self.router.config_pbr(mode = "config")
+
+ # same params as test_ipv6_simple
+ mult = self.get_benchmark_param('multiplier', test_name = 'test_ipv6_simple')
+ core = self.get_benchmark_param('cores', test_name = 'test_ipv6_simple')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ ipv6 = True,
+ d = 60,
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance and contains the summary of the test results.
+ # You can see all the result keys with 'print trex_res.result'.
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ trex_tx_pckt = float(trex_res.get_last_value("trex-global.data.m_total_tx_pkts"))
+ trex_drops = int(trex_res.get_total_drops())
+
+ trex_drop_rate = trex_res.get_drop_rate()
+
+ # make sure that at least 50% of the transmitted packets were dropped
+ self.assert_gt((trex_drops/trex_tx_pckt), 0.5, 'packet drop ratio is not high enough')
+
+
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ # remove config here
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/unit_tests/trex_nat_test.py b/scripts/automation/regression/unit_tests/trex_nat_test.py
new file mode 100755
index 00000000..452f7ecf
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/trex_nat_test.py
@@ -0,0 +1,164 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from tests_exceptions import *
+import time
+from CPlatform import CStaticRouteConfig, CNatConfig
+from nose.tools import assert_equal
+
+
+class CTRexNoNat_Test(CTRexGeneral_Test):#(unittest.TestCase):
+ """This class defines the NAT testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexNoNat_Test, self).__init__(*args, **kwargs)
+ self.unsupported_modes = ['loopback'] # NAT requires device
+ pass
+
+ def setUp(self):
+ super(CTRexNoNat_Test, self).setUp() # launch super test class setUp process
+ pass
+
+ def check_nat_stats (self, nat_stats):
+ pass
+
+
+ def test_nat_learning(self):
+ # test initialization
+ self.router.configure_basic_interfaces()
+
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ self.router.config_nat_verify() # shutdown duplicate interfaces
+
+# self.trex.set_yaml_file('cap2/http_simple.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+# trex_res = self.trex.run(multiplier = mult, cores = core, duration = 100, l = 1000, learn_verify = True)
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ learn_verify = True,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+
+ expected_nat_opened = self.get_benchmark_param('nat_opened')
+ learning_stats = trex_res.get_last_value("trex-global.data", ".*nat.*") # extract all nat data
+
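+ # the 'allow_timeout_dev' benchmark flag permits a small share of timed-out NAT translations instead of requiring zero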
+ if self.get_benchmark_param('allow_timeout_dev'):
+ nat_timeout_ratio = float(learning_stats['m_total_nat_time_out']) / learning_stats['m_total_nat_open']
+ if nat_timeout_ratio > 0.005:
+ self.fail('TRex nat_timeout ratio %f exceeds the allowed limit of 0.005 (0.5%%)' % nat_timeout_ratio)
+ else:
+ self.check_results_eq (learning_stats, 'm_total_nat_time_out', 0.0)
+ self.check_results_eq (learning_stats, 'm_total_nat_no_fid', 0.0)
+ self.check_results_gt (learning_stats, 'm_total_nat_learn_error', 0.0)
+#
+ self.check_results_gt (learning_stats, 'm_total_nat_open', expected_nat_opened)
+
+ self.check_general_scenario_results(trex_res)
+
+ # self.check_CPU_benchmark(trex_res, 10)
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+
+
+class CTRexNat_Test(CTRexGeneral_Test):#(unittest.TestCase):
+ """This class defines the NAT testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexNat_Test, self).__init__(*args, **kwargs)
+ self.unsupported_modes = ['loopback'] # NAT requires device
+ pass
+
+ def setUp(self):
+ super(CTRexNat_Test, self).setUp() # launch super test class setUp process
+ # config nat here
+
+
+ def check_nat_stats (self, nat_stats):
+ pass
+
+
+ def test_nat_simple(self):
+ # test initialization
+ self.router.configure_basic_interfaces()
+
+
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ nat_dict = self.get_benchmark_param('nat_dict')
+ nat_obj = CNatConfig(nat_dict)
+ self.router.config_nat(nat_obj)
+
+# self.trex.set_yaml_file('cap2/http_simple.yaml')
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+# trex_res = self.trex.run(nc=False,multiplier = mult, cores = core, duration = 100, l = 1000, learn = True)
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ learn = True,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+ trex_nat_stats = trex_res.get_last_value("trex-global.data", ".*nat.*") # extract all nat data
+ if self.get_benchmark_param('allow_timeout_dev'):
+ nat_timeout_ratio = float(trex_nat_stats['m_total_nat_time_out']) / trex_nat_stats['m_total_nat_open']
+ if nat_timeout_ratio > 0.005:
+ self.fail('TRex nat_timeout ratio %f exceeds the allowed limit of 0.005 (0.5%%)' % nat_timeout_ratio)
+ else:
+ self.check_results_eq (trex_nat_stats,'m_total_nat_time_out', 0.0)
+ self.check_results_eq (trex_nat_stats,'m_total_nat_no_fid', 0.0)
+ self.check_results_gt (trex_nat_stats,'m_total_nat_open', 6000)
+
+
+ self.check_general_scenario_results(trex_res)
+## test_norm_cpu = 2*(trex_res.result['total-tx']/(core*trex_res.result['cpu_utilization']))
+# trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_bps")
+# cpu_util = int(trex_res.get_last_value("trex-global.data.m_cpu_util"))
+# test_norm_cpu = 2*(trex_tx_pckt/(core*cpu_util))
+# print "test_norm_cpu is: ", test_norm_cpu
+
+ # self.check_CPU_benchmark(trex_res, 10)
+
+ #if ( abs((test_norm_cpu/self.get_benchmark_param('cpu_to_core_ratio')) - 1) > 0.03):
+ # raise AbnormalResultError('Normalized bandwidth to CPU utilization ratio exceeds 3%')
+
+ nat_stats = self.router.get_nat_stats()
+ print nat_stats
+
+ self.assert_gt(nat_stats['total_active_trans'], 5000, 'total active translations is not high enough')
+ self.assert_gt(nat_stats['dynamic_active_trans'], 5000, 'total dynamic active translations is not high enough')
+ self.assertEqual(nat_stats['static_active_trans'], 0, "NAT statistics nat_stats['static_active_trans'] should be zero")
+ self.assert_gt(nat_stats['num_of_hits'], 50000, 'total nat hits is not high enough')
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ self.router.clear_nat_translations()
+
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/unit_tests/trex_nbar_test.py b/scripts/automation/regression/unit_tests/trex_nbar_test.py
new file mode 100755
index 00000000..e4f7eb37
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/trex_nbar_test.py
@@ -0,0 +1,193 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from tests_exceptions import *
+from interfaces_e import IFType
+from nose.tools import nottest
+from misc_methods import print_r
+
+class CTRexNbar_Test(CTRexGeneral_Test):
+ """This class defines the NBAR testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ super(CTRexNbar_Test, self).__init__(*args, **kwargs)
+ self.unsupported_modes = ['loopback'] # obviously no NBar in loopback
+ pass
+
+ def setUp(self):
+ super(CTRexNbar_Test, self).setUp() # launch super test class setUp process
+# self.router.kill_nbar_flows()
+ self.router.clear_cft_counters()
+ self.router.clear_nbar_stats()
+
+ def match_classification (self):
+ nbar_benchmark = self.get_benchmark_param("nbar_classification")
+ test_classification = self.router.get_nbar_stats()
+ print "TEST CLASSIFICATION:"
+ print test_classification
+ mismatchFlag = False
+ mismatchMsg = "NBAR classification contains a mismatch on the following protocols:"
+ fmt = '\n\t{0:15} | Expected: {1:>3.2f}%, Got: {2:>3.2f}%'
+ noise_level = 0.045 # percent
+
+ for cl_intf in self.router.get_if_manager().get_if_list(if_type = IFType.Client):
+ client_intf = cl_intf.get_name()
+
+ # removing noise classifications
+ for key, value in test_classification[client_intf]['percentage'].items():
+ if value <= noise_level:
+ print 'Removing noise classification: %s' % key
+ del test_classification[client_intf]['percentage'][key]
+
+ if len(test_classification[client_intf]['percentage']) != (len(nbar_benchmark) + 1): # the '+ 1' compensates for the 'total' key
+ raise ClassificationMissmatchError ('The total size of classification result does not match the provided benchmark.')
+
+ for protocol, bench in nbar_benchmark.iteritems():
+ if protocol != 'total':
+ try:
+ bench = float(bench)
+ protocol = protocol.replace('_','-')
+ protocol_test_res = test_classification[client_intf]['percentage'][protocol]
+ deviation = 100 * abs(bench/protocol_test_res - 1) # percent
+ difference = abs(bench - protocol_test_res)
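+ # e.g. bench = 10.0, measured = 10.5 -> deviation ~ 4.8%, difference = 0.5 -> below the thresholds, no mismatch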
+ if (deviation > 10 and difference > noise_level): # allowing 10% deviation and 'noise_level'% difference
+ mismatchFlag = True
+ mismatchMsg += fmt.format(protocol, bench, protocol_test_res)
+ except KeyError as e:
+ mismatchFlag = True
+ print e
+ print "Setting mismatchFlag to True: protocol {0} isn't part of classification results on interface {intf}".format( protocol, intf = client_intf )
+ mismatchMsg += "\n\tProtocol {0} isn't part of classification results on interface {intf}".format( protocol, intf = client_intf )
+ except ZeroDivisionError as e:
+ print "ZeroDivisionError: %s" % protocol
+ pass
+ if mismatchFlag:
+ self.fail(mismatchMsg)
+
+
+ def test_nbar_simple(self):
+ # test initialization
+ deviation_compare_value = 0.03 # default value of deviation - 3%
+ self.router.configure_basic_interfaces()
+
+ self.router.config_pbr(mode = "config")
+ self.router.config_nbar_pd()
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 100,
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance and contains the summary of the test results.
+ # You can see all the result keys with 'print trex_res.result'.
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+
+ self.check_general_scenario_results(trex_res, check_latency = False)
+ # test_norm_cpu = 2*(trex_res.result['total-tx']/(core*trex_res.result['cpu_utilization']))
+ trex_tx_pckt = trex_res.get_last_value("trex-global.data.m_total_tx_pkts")
+ cpu_util = trex_res.get_last_value("trex-global.data.m_cpu_util")
+ cpu_util_hist = trex_res.get_value_list("trex-global.data.m_cpu_util")
+ print "cpu util is:", cpu_util
+ print cpu_util_hist
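+ # normalized CPU score: TX packets per core per CPU-util percent; the factor 2 presumably accounts for both traffic directions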
+ test_norm_cpu = 2 * trex_tx_pckt / (core * cpu_util)
+ print "test_norm_cpu is:", test_norm_cpu
+
+
+ if self.get_benchmark_param('cpu2core_custom_dev'):
+ # check this test by custom deviation
+ deviation_compare_value = self.get_benchmark_param('cpu2core_dev')
+ print "Comparing test with custom deviation value- {dev_val}%".format( dev_val = int(deviation_compare_value*100) )
+
+ # needs to be fixed!
+ #if ( abs((test_norm_cpu/self.get_benchmark_param('cpu_to_core_ratio')) - 1) > deviation_compare_value):
+ # raise AbnormalResultError('Normalized bandwidth to CPU utilization ratio exceeds benchmark boundaries')
+
+ self.match_classification()
+
+ assert True
+
+ @nottest
+ def test_rx_check (self):
+ # test initialization
+ self.router.configure_basic_interfaces()
+
+ self.router.config_pbr(mode = "config")
+ self.router.config_nbar_pd()
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'cap2/sfr.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance and contains the summary of the test results.
+ # You can see all the result keys with 'print trex_res.result'.
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ print ("\nLATEST DUMP:")
+ print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+
+ self.check_CPU_benchmark(trex_res, 10)
+
+# if trex_res.result['rx_check_tx']==trex_res.result['rx_check_rx']: # rx_check verification should pass
+# assert trex_res.result['rx_check_verification'] == "OK"
+# else:
+# assert trex_res.result['rx_check_verification'] == "FAIL"
+
+ # the name intentionally does not match nose's default pattern; to include this test, specify it explicitly
+ def NBarLong(self):
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+ self.router.config_nbar_pd()
+
+ mult = self.get_benchmark_param('multiplier')
+ core = self.get_benchmark_param('cores')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ d = 18000, # 5 hours
+ f = 'avl/sfr_delay_10_1g.yaml',
+ l = 1000)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ # trex_res is a CTRexResult instance and contains the summary of the test results.
+ # You can see all the result keys with 'print trex_res.result'.
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res, check_latency = False)
+
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/regression/unit_tests/trex_rx_test.py b/scripts/automation/regression/unit_tests/trex_rx_test.py
new file mode 100755
index 00000000..a37615c4
--- /dev/null
+++ b/scripts/automation/regression/unit_tests/trex_rx_test.py
@@ -0,0 +1,285 @@
+#!/router/bin/python
+from trex_general_test import CTRexGeneral_Test
+from CPlatform import CStaticRouteConfig, CNatConfig
+from tests_exceptions import *
+#import sys
+import time
+import copy
+from nose.tools import nottest
+import traceback
+
+class CTRexRx_Test(CTRexGeneral_Test):
+ """This class defines the rx testcase of the T-Rex traffic generator"""
+ def __init__(self, *args, **kwargs):
+ CTRexGeneral_Test.__init__(self, *args, **kwargs)
+ self.unsupported_modes = ['virt_nics'] # TODO: fix
+ pass
+
+ def setUp(self):
+ CTRexGeneral_Test.setUp(self)
+ pass
+
+
+ def check_rx_errors(self, trex_res):
+ try:
+ # counters to check
+
+ latency_counters_display = {'m_unsup_prot': 0, 'm_no_magic': 0, 'm_no_id': 0, 'm_seq_error': 0, 'm_length_error': 0, 'm_no_ipv4_option': 0, 'm_tx_pkt_err': 0}
+ rx_counters = {'m_err_drop': 0, 'm_err_aged': 0, 'm_err_no_magic': 0, 'm_err_wrong_pkt_id': 0, 'm_err_fif_seen_twice': 0, 'm_err_open_with_no_fif_pkt': 0, 'm_err_oo_dup': 0, 'm_err_oo_early': 0, 'm_err_oo_late': 0, 'm_err_flow_length_changed': 0}
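+ # latency counters are summed per port from the latency (-l) stream; rx counters come from the rx-check engine ('rx-check.data.stats')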
+
+ # get relevant TRex results
+
+ try:
+ ports_names = trex_res.get_last_value('trex-latecny-v2.data', 'port\-\d+')
+ if not ports_names:
+ raise AbnormalResultError('Could not find ports info in TRex results, path: trex-latecny-v2.data.port-*')
+ for port_name in ports_names:
+ path = 'trex-latecny-v2.data.%s.stats' % port_name
+ port_result = trex_res.get_last_value(path)
+ if not port_result:
+ raise AbnormalResultError('Could not find port stats in TRex results, path: %s' % path)
+ for key in latency_counters_display:
+ latency_counters_display[key] += port_result[key]
+
+ # using the -k flag in TRex produces 1 error per port in latency counter m_seq_error; allow it until the issue is resolved. For comparison, use a dict with a reduced m_seq_error count.
+ latency_counters_compare = copy.deepcopy(latency_counters_display)
+ latency_counters_compare['m_seq_error'] = max(0, latency_counters_compare['m_seq_error'] - len(ports_names))
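+ # e.g. with 2 ports and m_seq_error == 2, the compare value becomes 0 (both errors attributed to -k)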
+
+ path = 'rx-check.data.stats'
+ rx_check_results = trex_res.get_last_value(path)
+ if not rx_check_results:
+ raise AbnormalResultError('No TRex results by path: %s' % path)
+ for key in rx_counters:
+ rx_counters[key] = rx_check_results[key]
+
+ path = 'rx-check.data.stats.m_total_rx'
+ total_rx = trex_res.get_last_value(path)
+ if not total_rx:
+ raise AbnormalResultError('No TRex results by path: %s' % path)
+
+
+ print 'Total packets checked: %s' % total_rx
+ print 'Latency counters: %s' % latency_counters_display
+ print 'rx_check counters: %s' % rx_counters
+
+ except KeyError as e:
+ self.fail('Expected key in TRex result was not found.\n%s' % traceback.format_exc())
+
+ # the check: in loopback expect 0 errors; otherwise allow errors up to <error_tolerance>% of total_rx
+
+ total_errors = sum(rx_counters.values()) + sum(latency_counters_compare.values())
+ error_tolerance = self.get_benchmark_param('error_tolerance')
+ if not error_tolerance:
+ error_tolerance = 0
+ error_percentage = float(total_errors) * 100 / total_rx
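+ # e.g. total_errors = 5, total_rx = 10000 -> error_percentage = 0.05 (percent of checked packets)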
+
+ if total_errors > 0:
+ if self.is_loopback or error_percentage > error_tolerance:
+ self.fail('Too many errors in rx_check (~%s%% of traffic)' % error_percentage)
+ else:
+ print 'There are errors in rx_check (%f%%), not exceeding allowed limit (%s%%)' % (error_percentage, error_tolerance)
+ else:
+ print 'No errors in rx_check.'
+ except Exception as e:
+ print traceback.format_exc()
+ self.fail('Errors in rx_check: %s' % e)
+
+ def test_rx_check_sfr(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = 'config')
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'avl/sfr_delay_10_1g_no_bundeling.yaml',
+ l = 1000,
+ k = 10,
+ learn_verify = True,
+ l_pkt_mode = 2)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ #print ("\nLATEST DUMP:")
+ #print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+ self.check_rx_errors(trex_res)
+
+
+ def test_rx_check_http(self):
+ if not self.is_loopback:
+ # TODO: skip as test_rx_check_http_negative will cover it
+ #self.skip('This test is covered by test_rx_check_http_negative')
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000,
+ k = 10,
+ learn_verify = True,
+ l_pkt_mode = 2)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+ self.check_rx_errors(trex_res)
+
+
+ def test_rx_check_sfr_ipv6(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = 'config')
+ self.router.config_ipv6_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'avl/sfr_delay_10_1g_no_bundeling.yaml',
+ l = 1000,
+ k = 10,
+ ipv6 = True)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ #print ("\nLATEST DUMP:")
+ #print trex_res.get_latest_dump()
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+ self.check_rx_errors(trex_res)
+
+
+ def test_rx_check_http_ipv6(self):
+ if not self.is_loopback:
+ self.router.configure_basic_interfaces()
+ self.router.config_pbr(mode = "config")
+ self.router.config_ipv6_pbr(mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ p = True,
+ nc = True,
+ rx_check = sample_rate,
+ d = 100,
+ f = 'cap2/http_simple.yaml',
+ l = 1000,
+ k = 10,
+ ipv6 = True)
+
+ trex_res = self.trex.sample_to_run_finish()
+
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+ self.check_rx_errors(trex_res)
+
+ @nottest
+ def test_rx_check_http_negative(self):
+ if self.is_loopback:
+ self.skip('This test uses NAT, not relevant for loopback')
+
+ self.router.configure_basic_interfaces()
+ stat_route_dict = self.get_benchmark_param('stat_route_dict')
+ stat_route_obj = CStaticRouteConfig(stat_route_dict)
+ self.router.config_static_routing(stat_route_obj, mode = "config")
+
+ core = self.get_benchmark_param('cores')
+ mult = self.get_benchmark_param('multiplier')
+ sample_rate = self.get_benchmark_param('rx_sample_rate')
+
+ ret = self.trex.start_trex(
+ c = core,
+ m = mult,
+ #p = True,
+ #nc = True,
+ rx_check = sample_rate,
+ d = 80,
+ f = 'cap2/http_simple.yaml',
+ l = 1000,
+ k = 10,
+ learn_verify = True,
+ l_pkt_mode = 2)
+
+ print 'Run the first 60 seconds, expect no errors'
+ trex_res = self.trex.sample_x_seconds(60)
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+ self.check_rx_errors(trex_res)
+
+ try:
+ # TODO: add nat/zbf config for router
+ nat_dict = self.get_benchmark_param('nat_dict')
+ nat_obj = CNatConfig(nat_dict)
+ self.router.config_nat(nat_obj)
+ self.router.config_nat_verify()
+ self.router.config_zbf()
+
+ print 'Run until finish, expect errors'
+ trex_res = self.trex.sample_to_run_finish()
+
+ self.router.config_no_zbf()
+ self.router.clear_nat_translations()
+ print ("\nLATEST RESULT OBJECT:")
+ print trex_res
+ nat_stats = self.router.get_nat_stats()
+ print nat_stats
+ self.check_general_scenario_results(trex_res)
+ self.check_CPU_benchmark(trex_res, 10)
+ self.check_rx_errors(trex_res)
+ self.fail('Expected errors here, got none.')
+ except Exception as e:
+ print 'Got errors as expected: %s' % e
+ pass
+
+ def tearDown(self):
+ CTRexGeneral_Test.tearDown(self)
+ pass
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/trex_control_plane/client/trex_hltapi.py b/scripts/automation/trex_control_plane/client/trex_hltapi.py
index 92768ca4..848d5a9e 100755
--- a/scripts/automation/trex_control_plane/client/trex_hltapi.py
+++ b/scripts/automation/trex_control_plane/client/trex_hltapi.py
@@ -17,14 +17,16 @@ class CTRexHltApi(object):
self._port_data = {}
# ----- session functions ----- #
-
- def connect(self, device, port_list, username, port=5050, reset=False, break_locks=False):
+ # sync port carries RPC requests/replies, async port carries ZMQ-published async events
+ def connect(self, device, port_list, username, sync_port = 4501, async_port = 4500, reset=False, break_locks=False):
ret_dict = {"status": 0}
- self.trex_client = CTRexStatelessClient(username, device, port)
- res_ok, msg = self.trex_client.connect()
- if not res_ok:
+ self.trex_client = CTRexStatelessClient(username, device, sync_port, async_port)
+
+ rc = self.trex_client.connect()
+ if rc.bad():
+
self.trex_client = None
- ret_dict.update({"log": msg})
+ ret_dict.update({"log": rc.err()})
return ret_dict
# arrived here, connection successfully created with server
# next, try acquiring ports of TRex
@@ -70,7 +72,6 @@ class CTRexHltApi(object):
port_list = self.parse_port_list(port_list)
response = self.trex_client.release(port_list)
res_ok, log = CTRexHltApi.process_response(port_list, response)
- print log
if not res_ok:
ret_dict.update({"log": log})
return ret_dict
@@ -89,11 +90,13 @@ class CTRexHltApi(object):
return {"status": 1, "log": None}
# ----- traffic functions ----- #
- def traffic_config(self, mode, port_handle,
+ def traffic_config(self, mode, port_list,
l2_encap="ethernet_ii", mac_src="00:00:01:00:00:01", mac_dst="00:00:00:00:00:00",
l3_protocol="ipv4", ip_src_addr="0.0.0.0", ip_dst_addr="192.0.0.1", l3_length=110,
transmit_mode="continuous", rate_pps=100,
**kwargs):
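+ # accept a single port handle as well as a list of handles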
+ if not isinstance(port_list, list):
+ port_list = [port_list]
ALLOWED_MODES = ["create", "modify", "remove", "enable", "disable", "reset"]
if mode not in ALLOWED_MODES:
raise ValueError("mode must be one of the following values: {modes}".format(modes=ALLOWED_MODES))
@@ -119,45 +122,55 @@ class CTRexHltApi(object):
except Exception as e:
# some exception happened during the stream creation
return {"status": 0, "log": str(e)}
- # try adding the stream, until free stream_id is found
- port_data = self._port_data.get(port_handle)
- id_candidate = None
- # TODO: change this to better implementation
- while True:
- id_candidate = port_data["stream_id_gen"].next()
- response = self.trex_client.add_stream(stream_id=id_candidate,
- stream_obj=stream_obj,
- port_id=port_handle)
- res_ok, log = CTRexHltApi.process_response(port_handle, response)
- if res_ok:
- # found non-taken stream_id on server
- # save it for modifying needs
- port_data["streams"].update({id_candidate: stream_obj})
- break
- else:
- # proceed to another iteration to use another id
- continue
- return {"status": 1,
- "stream_id": id_candidate,
- "log": None}
+ # try adding the stream per port, until free stream_id is found
+ for port_id in port_list:
+ port_data = self._port_data.get(port_id)
+ id_candidate = None
+ # TODO: change this to better implementation
+ while True:
+ id_candidate = port_data["stream_id_gen"].next()
+ response = self.trex_client.add_stream(stream_id=id_candidate,
+ stream_obj=stream_obj.dump(),
+ port_id_list=port_id)
+ res_ok, log = CTRexHltApi.process_response(port_id, response)
+ if res_ok:
+ # found non-taken stream_id on server
+ # save it for modifying needs
+ port_data["streams"].update({id_candidate: stream_obj})
+ break
+ else:
+ print log
+ # stream_id is taken on the server; proceed to another iteration with the next id
+ print 'stream_id %s is taken, trying the next one' % id_candidate
+ continue
+ return {"status": 1,
+ "stream_id": id_candidate,
+ "log": None}
+
else:
raise NotImplementedError("mode '{0}' is not supported yet on TRex".format(mode))
- def traffic_control(self, action, port_handle):
+ def traffic_control(self, action, port_handle, **kwargs):
ALLOWED_ACTIONS = ["clear_stats", "run", "stop", "sync_run"]
if action not in ALLOWED_ACTIONS:
raise ValueError("action must be one of the following values: {actions}".format(actions=ALLOWED_ACTIONS))
# ret_dict = {"status": 0, "stopped": 1}
port_list = self.parse_port_list(port_handle)
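+ # parse_port_list may return a single handle; normalize it to a list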
+ if not isinstance(port_list, list):
+ port_list = [port_list]
if action == "run":
- response = self.trex_client.start_traffic(port_id=port_list)
+ if not set(kwargs.keys()) >= set(['mul', 'duration']):
+ raise ValueError("The 'run' action requires 'mul' and 'duration' arguments")
+ response = self.trex_client.start_traffic(kwargs['mul'], kwargs['duration'], port_id_list=port_list)
res_ok, log = CTRexHltApi.process_response(port_list, response)
if res_ok:
return {"status": 1,
"stopped": 0,
"log": None}
+ else:
+ print log
elif action == "stop":
- response = self.trex_client.stop_traffic(port_id=port_list)
+ response = self.trex_client.stop_traffic(port_id_list=port_list)
res_ok, log = CTRexHltApi.process_response(port_list, response)
if res_ok:
return {"status": 1,
@@ -236,13 +249,10 @@ class CTRexHltApi(object):
@staticmethod
def process_response(port_list, response):
+ log = response.data() if response.good() else response.err()
if isinstance(port_list, list):
- res_ok, response = response
- log = CTRexHltApi.join_batch_response(response)
- else:
- res_ok = response.success
- log = str(response)
- return res_ok, log
+ log = CTRexHltApi.join_batch_response(log)
+ return response.good(), log
@staticmethod
def parse_port_list(port_list):
@@ -257,8 +267,9 @@ class CTRexHltApi(object):
@staticmethod
def join_batch_response(responses):
- return "\n".join([str(response)
- for response in responses])
+ if isinstance(responses, list):
+ return "\n".join([str(response) for response in responses])
+ return responses
@staticmethod
def generate_stream(l2_encap, mac_src, mac_dst,
diff --git a/scripts/automation/trex_control_plane/client_utils/external_packages.py b/scripts/automation/trex_control_plane/client_utils/external_packages.py
index 3982a1b2..9d8c4dcf 100755
--- a/scripts/automation/trex_control_plane/client_utils/external_packages.py
+++ b/scripts/automation/trex_control_plane/client_utils/external_packages.py
@@ -8,7 +8,7 @@ ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir))
PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pardir, 'external_libs'))
CLIENT_UTILS_MODULES = ['dpkt-1.8.6',
- 'PyYAML-3.01/lib',
+ 'yaml-3.11',
'texttable-0.8.4'
]
diff --git a/scripts/automation/trex_control_plane/client_utils/packet_builder.py b/scripts/automation/trex_control_plane/client_utils/packet_builder.py
index d8070c74..1ca01a33 100755
--- a/scripts/automation/trex_control_plane/client_utils/packet_builder.py
+++ b/scripts/automation/trex_control_plane/client_utils/packet_builder.py
@@ -692,7 +692,7 @@ class CTRexPktBuilder(object):
None
"""
super(CTRexPktBuilder.CTRexVM, self).__init__()
- self.vm_variables = {}
+ self.vm_variables = {'instructions': [], 'split_by_var': ""}
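+ # assumed semantics: 'instructions' holds the VM instruction list, 'split_by_var' names the variable used to split traffic among cores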
self._inst_by_offset = {} # this data structure holds only offset-related instructions, ordered in tuples
self._off_inst_by_name = {}
@@ -807,6 +807,10 @@ class CTRexPktBuilder(object):
list holds variables data of VM
"""
+
+ return self.vm_variables
+ # !!! TODO: review code below !!!
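+ # NOTE: the early return above leaves the legacy dump logic below unreachable until it is reviewed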
+
# at first, dump all CTRexVMFlowVariable instructions
ret_val = [var.dump()
for key, var in self.vm_variables.items()]
diff --git a/scripts/automation/trex_control_plane/common/external_packages.py b/scripts/automation/trex_control_plane/common/external_packages.py
index 62121d4f..7353c397 100755
--- a/scripts/automation/trex_control_plane/common/external_packages.py
+++ b/scripts/automation/trex_control_plane/common/external_packages.py
@@ -7,7 +7,7 @@ CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir)) # path to trex_control_plane directory
PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pardir, 'external_libs'))
-CLIENT_UTILS_MODULES = ['PyYAML-3.01/lib'
+CLIENT_UTILS_MODULES = ['yaml-3.11'
]
def import_common_modules():