#!/usr/bin/env python
""" Vpp VCL tests """

import unittest
import os
import subprocess
import signal
from framework import VppTestCase, VppTestRunner, running_extended_tests, \
    Worker
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath, DpoProto


class VCLAppWorker(Worker):
    """ VCL Test Application Worker """

    def __init__(self, build_dir, appname, args, logger, env=None):
        """Build the command line and environment for a VCL test app.

        :param build_dir: VPP build directory; the VCL library is looked up
            under ``<build_dir>/vpp/lib`` and binaries under
            ``<build_dir>/vpp/bin``.
        :param appname: application name.  Names containing ``iperf`` are
            run as given (not prefixed with the build dir) with the VCL
            LD_PRELOAD library; names containing ``sock`` are run from the
            build dir with the LD_PRELOAD library; anything else is run
            from the build dir without LD_PRELOAD.
        :param args: list of command line arguments appended after the app.
        :param logger: logger handed through to ``Worker``.
        :param env: optional dict of environment variables for the app.
            It is copied, so the caller's dict is never modified.
        """
        # Work on a private copy: the previous `env={}` default was shared
        # between calls, and updating the caller's dict in place leaked
        # LD_PRELOAD into every later worker built from the same dict.
        env = {} if env is None else dict(env)

        vcl_lib_dir = "%s/vpp/lib" % build_dir
        ldpreload = {'LD_PRELOAD': "%s/libvcl_ldpreload.so" % vcl_lib_dir}
        if "iperf" in appname:
            app = appname
            env.update(ldpreload)
        elif "sock" in appname:
            app = "%s/vpp/bin/%s" % (build_dir, appname)
            env.update(ldpreload)
        else:
            app = "%s/vpp/bin/%s" % (build_dir, appname)
        self.args = [app] + args
        super(VCLAppWorker, self).__init__(self.args, logger, env)


class VCLTestCase(VppTestCase):
    """ VCL Test Class """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the base test case."""
        super(VCLTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base test case."""
        super(VCLTestCase, cls).tearDownClass()

    def setUp(self):
        """Initialise per-test defaults, then run the base class setUp.

        :raises Exception: if the VPP_BUILD_DIR environment variable is
            not set.
        """
        var = "VPP_BUILD_DIR"
        self.build_dir = os.getenv(var, None)
        if self.build_dir is None:
            raise Exception("Environment variable `%s' not set" % var)
        self.vppDebug = 'vpp_debug' in self.build_dir
        self.server_addr = "127.0.0.1"
        self.server_port = "22000"
        self.server_args = [self.server_port]
        self.server_ipv6_addr = "::1"
        self.server_ipv6_args = ["-6", self.server_port]
        self.timeout = 20
        self.echo_phrase = "Hello, world! Jenny is a friend of mine."
        # Delays (passed to self.sleep) after starting the server and after
        # validating the results in cut_thru_test().
        self.pre_test_sleep = 0.3
        self.post_test_sleep = 0.2

        # Remove a leftover file from an earlier run.
        # NOTE(review): os.path.isfile() is False for socket files; if this
        # path is an AF_UNIX socket node, a stale entry would not be removed
        # here -- confirm whether os.path.exists() is what was intended.
        if os.path.isfile("/tmp/ldp_server_af_unix_socket"):
            os.remove("/tmp/ldp_server_af_unix_socket")

        super(VCLTestCase, self).setUp()

    def cut_thru_setup(self):
        """Enable the session layer through the VPP API."""
        self.vapi.session_enable_disable(is_enabled=1)

    def cut_thru_tear_down(self):
        """Disable the session layer through the VPP API."""
        self.vapi.session_enable_disable(is_enabled=0)

    def cut_thru_test(self, server_app, server_args, client_app, client_args):
        """Run a server/client application pair and validate the outcome.

        The server worker is started first; after ``pre_test_sleep`` the
        client worker is started and joined with ``self.timeout``.  Any
        exception raised by ``validateResults`` is reported as a test
        failure.

        :param server_app: server application name (see VCLAppWorker).
        :param server_args: list of server command line arguments.
        :param client_app: client application name (see VCLAppWorker).
        :param client_args: list of client command line arguments.
        """
        self.env = {'VCL_API_PREFIX': self.shm_prefix,
                    'VCL_APP_SCOPE_LOCAL': "true"}

        worker_server = VCLAppWorker(self.build_dir, server_app, server_args,
                                     self.logger, self.env)
        worker_server.start()
        self.sleep(self.pre_test_sleep)
        worker_client = VCLAppWorker(self.build_dir, client_app, client_args,
                                     self.logger, self.env)
        worker_client.start()
        worker_client.join(self.timeout)
        try:
            self.validateResults(worker_client, worker_server, self.timeout)
        except Exception as error:
            self.fail("Failed with %s" % error)
        self.sleep(self.post_test_sleep)

    def thru_host_stack_setup(self):
        """Configure two loopbacks in separate IP tables for host-stack tests.

        Enables the session layer, creates two loopback interfaces, puts
        them into IP tables 1 and 2, adds one application namespace per
        loopback and installs /32 routes between the two tables.
        """
        self.vapi.session_enable_disable(is_enabled=1)
        self.create_loopback_interfaces(2)

        # Table ids start at 1, so every loopback gets its own new table.
        for table_id, i in enumerate(self.lo_interfaces, start=1):
            i.admin_up()

            tbl = VppIpTable(self, table_id)
            tbl.add_vpp_config()

            i.set_table_ip4(table_id)
            i.config_ip4()

        # Configure namespaces
        self.vapi.app_namespace_add(namespace_id="1", secret=1234,
                                    sw_if_index=self.loop0.sw_if_index)
        self.vapi.app_namespace_add(namespace_id="2", secret=5678,
                                    sw_if_index=self.loop1.sw_if_index)

        # Add inter-table routes
        ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32,
                            [VppRoutePath("0.0.0.0",
                                          0xffffffff,
                                          nh_table_id=2)], table_id=1)
        ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32,
                            [VppRoutePath("0.0.0.0",
                                          0xffffffff,
                                          nh_table_id=1)], table_id=2)
        ip_t01.add_vpp_config()
        ip_t10.add_vpp_config()
        self.logger.debug(self.vapi.cli("show ip fib"))

    def thru_host_stack_tear_down(self):
        """Undo thru_host_stack_setup's interface config, disable sessions."""
        for i in self.lo_interfaces:
            i.unconfig_ip4()
            i.set_table_ip4(0)
            i.admin_down()
        self.vapi.session_enable_disable(is_enabled=0)

    # NOTE(review): the visible chunk ends here in the middle of a `def`
    # whose name begins with `thru_host_stack_ipv6`; its signature and body
    # are not visible, so that definition is left untouched.
thru_host_stack_ipv6
SOURCE_PATH = $(CURDIR)/..