1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
#!/usr/bin/env python
import unittest
import os
from framework import VppTestCase, VppTestRunner, running_extended_tests
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
class QUICTestCase(VppTestCase):
""" QUIC Test Case """
@classmethod
def setUpClass(cls):
super(QUICTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(QUICTestCase, cls).tearDownClass()
def setUp(self):
var = "VPP_BUILD_DIR"
self.build_dir = os.getenv(var, None)
if self.build_dir is None:
raise Exception("Environment variable `%s' not set" % var)
self.vppDebug = 'vpp_debug' in self.build_dir
self.timeout = 20
self.pre_test_sleep = 0.3
self.post_test_sleep = 0.3
self.vapi.session_enable_disable(is_enabled=1)
def tearDown(self):
self.vapi.session_enable_disable(is_enabled=0)
def thru_host_stack_ipv4_setup(self):
super(QUICTestCase, self).setUp()
self.create_loopback_interfaces(2)
self.uri = "quic://%s/1234" % self.loop0.local_ip4
common_args = ["uri", self.uri, "fifo-size", "4"]
self.server_echo_test_args = common_args + ["appns", "server"]
self.client_echo_test_args = common_args + ["appns", "client",
"bytes", "1024",
"test-bytes",
"no-output"]
table_id = 1
for i in self.lo_interfaces:
i.admin_up()
if table_id != 0:
tbl = VppIpTable(self, table_id)
tbl.add_vpp_config()
i.set_table_ip4(table_id)
i.config_ip4()
table_id += 1
# Configure namespaces
self.vapi.app_namespace_add_del(namespace_id=b"server",
sw_if_index=self.loop0.sw_if_index)
self.vapi.app_namespace_add_del(namespace_id=b"client",
sw_if_index=self.loop1.sw_if_index)
# Add inter-table routes
self.ip_t01 = VppIpRoute(self, self.loop1.local_ip4, 32,
[VppRoutePath("0.0.0.0",
0xffffffff,
nh_table_id=2)], table_id=1)
self.ip_t10 = VppIpRoute(self, self.loop0.local_ip4, 32,
[VppRoutePath("0.0.0.0",
0xffffffff,
nh_table_id=1)], table_id=2)
self.ip_t01.add_vpp_config()
self.ip_t10.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip fib"))
def thru_host_stack_ipv4_tear_down(self):
# Delete inter-table routes
self.ip_t01.remove_vpp_config()
self.ip_t10.remove_vpp_config()
for i in self.lo_interfaces:
i.unconfig_ip4()
i.set_table_ip4(0)
i.admin_down()
def start_internal_echo_server(self, args):
error = self.vapi.cli("test echo server %s" % ' '.join(args))
if error:
self.logger.critical(error)
self.assertNotIn("failed", error)
def start_internal_echo_client(self, args):
error = self.vapi.cli("test echo client %s" % ' '.join(args))
if error:
self.logger.critical(error)
self.assertNotIn("failed", error)
def internal_ipv4_transfer_test(self, server_args, client_args):
self.start_internal_echo_server(server_args)
self.sleep(self.pre_test_sleep)
self.start_internal_echo_client(client_args)
self.sleep(self.post_test_sleep)
class QUICInternalEchoIPv4TestCase(QUICTestCase):
""" QUIC Internal Echo IPv4 Transfer Test Cases """
@classmethod
def setUpClass(cls):
super(QUICInternalEchoIPv4TestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(QUICInternalEchoIPv4TestCase, cls).tearDownClass()
def setUp(self):
super(QUICInternalEchoIPv4TestCase, self).setUp()
self.thru_host_stack_ipv4_setup()
def tearDown(self):
self.thru_host_stack_ipv4_tear_down()
super(QUICInternalEchoIPv4TestCase, self).tearDown()
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_quic_internal_transfer(self):
""" QUIC internal echo client/server transfer """
self.internal_ipv4_transfer_test(self.server_echo_test_args,
self.client_echo_test_args)
class QUICInternalEchoIPv4MultiStreamTestCase(QUICTestCase):
""" QUIC Internal Echo IPv4 Transfer Test Cases """
@classmethod
def setUpClass(cls):
super(QUICInternalEchoIPv4MultiStreamTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(QUICInternalEchoIPv4MultiStreamTestCase, cls).tearDownClass()
def setUp(self):
super(QUICInternalEchoIPv4MultiStreamTestCase, self).setUp()
self.thru_host_stack_ipv4_setup()
def tearDown(self):
self.thru_host_stack_ipv4_tear_down()
super(QUICInternalEchoIPv4MultiStreamTestCase, self).tearDown()
def show_commands_at_teardown(self):
self.logger.debug(self.vapi.cli("show session verbose 2"))
@unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_quic_internal_multistream_transfer(self):
""" QUIC internal echo client/server multi-stream transfer """
self.internal_ipv4_transfer_test(self.server_echo_test_args,
self.client_echo_test_args +
["quic-streams", "10"])
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
|