#!/usr/bin/env python import unittest import os from framework import VppTestCase, VppTestRunner, running_extended_tests from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath class QUICTestCase(VppTestCase): """ QUIC Test Case """ @classmethod def setUpClass(cls): super(QUICTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(QUICTestCase, cls).tearDownClass() def setUp(self): var = "VPP_BUILD_DIR" self.build_dir = os.getenv(var, None) if self.build_dir is None: raise Exception("Environment variable `%s' not set" % var) self.vppDebug = 'vpp_debug' in self.build_dir self.timeout = 20 self.pre_test_sleep = 0.3 self.post_test_sleep = 0.3 self.vapi.session_enable_disable(is_enabled=1) def tearDown(self): self.vapi.session_enable_disable(is_enabled=0) def thru_host_stack_ipv4_setup(self): super(QUICTestCase, self).setUp() self.create_loopback_interfaces(2) self.uri = "quic://%s/1234" % self.loop0.local_ip4 common_args = ["uri", self.uri, "fifo-size", "4"] self.server_echo_test_args = common_args + ["appns", "server"] self.client_echo_test_args = common_args + ["appns", "client", "bytes", "1024", "test-bytes", "no-output"] table_id = 1 for i in self.lo_interfaces: i.admin_up() if table_id != 0: tbl = VppIpTable(self, table_id) tbl.add_vpp_config() i.set_table_ip4(table_id) i.config_ip4() table_id += 1 # Configure namespaces self.vapi.app_namespace_add_del(namespace_id=b"server", sw_if_index=self.loop0.sw_if_index) self.vapi.app_namespace_add_del(namespace_id=b"client", sw_if_index=self.loop1.sw_if_index) # Add inter-table routes self.ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32, [VppRoutePath("0.0.0.0", 0xffffffff, nh_table_id=2)], table_id=1) self.ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32, [VppRoutePath("0.0.0.0", 0xffffffff, nh_table_id=1)], table_id=2) self.ip_t01.add_vpp_config() self.ip_t10.add_vpp_config() self.logger.debug(self.vapi.cli("show ip fib")) def thru_host_stack_ipv4_tear_down(self): # Delete inter-table routes self.ip_t01.remove_vpp_config() self.ip_t10.remove_vpp_config() for i in self.lo_interfaces: i.unconfig_ip4() i.set_table_ip4(0) i.admin_down() def start_internal_echo_server(self, args): error = self.vapi.cli("test echo server %s" % ' '.join(args)) if error: self.logger.critical(error) self.assertNotIn("failed", error) def start_internal_echo_client(self, args): error = self.vapi.cli("test echo client %s" % ' '.join(args)) if error: self.logger.critical(error) self.assertNotIn("failed", error) def internal_ipv4_transfer_test(self, server_args, client_args): self.start_internal_echo_server(server_args) self.sleep(self.pre_test_sleep) self.start_internal_echo_client(client_args) self.sleep(self.post_test_sleep) class QUICInternalEchoIPv4TestCase(QUICTestCase): """ QUIC Internal Echo IPv4 Transfer Test Cases """ @classmethod def setUpClass(cls): super(QUICInternalEchoIPv4TestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(QUICInternalEchoIPv4TestCase, cls).tearDownClass() def setUp(self): super(QUICInternalEchoIPv4TestCase, self).setUp() self.thru_host_stack_ipv4_setup() def tearDown(self): self.thru_host_stack_ipv4_tear_down() super(QUICInternalEchoIPv4TestCase, self).tearDown() def show_commands_at_teardown(self): self.logger.debug(self.vapi.cli("show session verbose 2")) @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_quic_internal_transfer(self): """ QUIC internal echo client/server transfer """ self.internal_ipv4_transfer_test(self.server_echo_test_args, self.client_echo_test_args) class QUICInternalEchoIPv4MultiStreamTestCase(QUICTestCase): """ QUIC Internal Echo IPv4 Transfer Test Cases """ @classmethod def setUpClass(cls): super(QUICInternalEchoIPv4MultiStreamTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(QUICInternalEchoIPv4MultiStreamTestCase, cls).tearDownClass() def setUp(self): super(QUICInternalEchoIPv4MultiStreamTestCase, self).setUp() self.thru_host_stack_ipv4_setup() def tearDown(self): self.thru_host_stack_ipv4_tear_down() super(QUICInternalEchoIPv4MultiStreamTestCase, self).tearDown() def show_commands_at_teardown(self): self.logger.debug(self.vapi.cli("show session verbose 2")) @unittest.skipUnless(running_extended_tests, "part of extended tests") def test_quic_internal_multistream_transfer(self): """ QUIC internal echo client/server multi-stream transfer """ self.internal_ipv4_transfer_test(self.server_echo_test_args, self.client_echo_test_args + ["quic-streams", "10"]) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)