diff options
Diffstat (limited to 'src/vpp-api/python/vpp_papi')
-rw-r--r-- | src/vpp-api/python/vpp_papi/__init__.py | 3 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/macaddress.py | 17 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/tests/test_macaddress.py | 6 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/tests/test_vpp_format.py | 104 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/tests/test_vpp_papi.py | 10 | ||||
-rwxr-xr-x | src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py | 775 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/vpp_format.py | 216 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/vpp_papi.py | 350 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/vpp_serializer.py | 259 | ||||
-rwxr-xr-x | src/vpp-api/python/vpp_papi/vpp_stats.py | 308 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/vpp_transport_socket.py | 34 |
11 files changed, 1089 insertions, 993 deletions
diff --git a/src/vpp-api/python/vpp_papi/__init__.py b/src/vpp-api/python/vpp_papi/__init__.py index b2b4fc78fc1..f87b6480d4e 100644 --- a/src/vpp-api/python/vpp_papi/__init__.py +++ b/src/vpp-api/python/vpp_papi/__init__.py @@ -3,7 +3,7 @@ from .vpp_papi import VppEnum, VppEnumType, VppEnumFlag # noqa: F401 from .vpp_papi import VPPIOError, VPPRuntimeError, VPPValueError # noqa: F401 from .vpp_papi import VPPApiClient # noqa: F401 from .vpp_papi import VPPApiJSONFiles # noqa: F401 -from . macaddress import MACAddress, mac_pton, mac_ntop # noqa: F401 +from .macaddress import MACAddress, mac_pton, mac_ntop # noqa: F401 # sorted lexicographically from .vpp_serializer import BaseTypes # noqa: F401 @@ -11,6 +11,7 @@ from .vpp_serializer import VPPEnumType, VPPType, VPPTypeAlias # noqa: F401 from .vpp_serializer import VPPMessage, VPPUnionType # noqa: F401 import pkg_resources # part of setuptools + try: __version__ = pkg_resources.get_distribution("vpp_papi").version except (pkg_resources.DistributionNotFound): diff --git a/src/vpp-api/python/vpp_papi/macaddress.py b/src/vpp-api/python/vpp_papi/macaddress.py index c3b10a3c11e..8799bd7be24 100644 --- a/src/vpp-api/python/vpp_papi/macaddress.py +++ b/src/vpp-api/python/vpp_papi/macaddress.py @@ -18,20 +18,19 @@ import binascii def mac_pton(s): - '''Convert MAC address as text to binary''' - return binascii.unhexlify(s.replace(':', '')) + """Convert MAC address as text to binary""" + return binascii.unhexlify(s.replace(":", "")) def mac_ntop(binary): - '''Convert MAC address as binary to text''' - x = b':'.join(binascii.hexlify(binary)[i:i + 2] - for i in range(0, 12, 2)) - return str(x.decode('ascii')) + """Convert MAC address as binary to text""" + x = b":".join(binascii.hexlify(binary)[i : i + 2] for i in range(0, 12, 2)) + return str(x.decode("ascii")) -class MACAddress(): +class MACAddress: def __init__(self, mac): - '''MAC Address as a text-string (aa:bb:cc:dd:ee:ff) or 6 bytes''' + """MAC Address as a text-string (aa:bb:cc:dd:ee:ff) or 6 bytes""" # Of course Python 2 doesn't distinguish str from bytes if type(mac) is bytes and len(mac) == 6: self.mac_binary = mac @@ -51,7 +50,7 @@ class MACAddress(): return self.mac_string def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, self.mac_string) + return "%s(%s)" % (self.__class__.__name__, self.mac_string) def __eq__(self, other): diff --git a/src/vpp-api/python/vpp_papi/tests/test_macaddress.py b/src/vpp-api/python/vpp_papi/tests/test_macaddress.py index 08e365afd92..e86ec75c76e 100644 --- a/src/vpp-api/python/vpp_papi/tests/test_macaddress.py +++ b/src/vpp-api/python/vpp_papi/tests/test_macaddress.py @@ -3,8 +3,6 @@ from vpp_papi import MACAddress class TestMacAddress(unittest.TestCase): - def test_eq(self): - mac = '11:22:33:44:55:66' - self.assertEqual(MACAddress(mac), - MACAddress(mac)) + mac = "11:22:33:44:55:66" + self.assertEqual(MACAddress(mac), MACAddress(mac)) diff --git a/src/vpp-api/python/vpp_papi/tests/test_vpp_format.py b/src/vpp-api/python/vpp_papi/tests/test_vpp_format.py index 5c179c02e0a..ae4d2c5126d 100644 --- a/src/vpp-api/python/vpp_papi/tests/test_vpp_format.py +++ b/src/vpp-api/python/vpp_papi/tests/test_vpp_format.py @@ -25,57 +25,64 @@ from vpp_papi import vpp_format from parameterized import parameterized -ip4_addr = '1.2.3.4' -ip4_addrn = b'\x01\x02\x03\x04' +ip4_addr = "1.2.3.4" +ip4_addrn = b"\x01\x02\x03\x04" ip4_prefix_len = 32 -ip4_prefix = '%s/%s' % (ip4_addr, ip4_prefix_len) +ip4_prefix = "%s/%s" % (ip4_addr, ip4_prefix_len) ipv4_network = ipaddress.IPv4Network(text_type(ip4_prefix)) -ip4_addr_format_vl_api_address_t = {'un': {'ip4': b'\x01\x02\x03\x04'}, - 'af': 0} -ip4_addr_format_vl_api_prefix_t = {'address': # noqa: E127,E501 - {'un': {'ip4': b'\x01\x02\x03\x04'}, - 'af': 0}, - 'len': ip4_prefix_len} -ip4_addr_format_vl_api_prefix_packed_t = {'address': b'\x01\x02\x03\x04', - 'len': ip4_prefix_len} - -ip6_addr = 'dead::' -ip6_addrn = b'\xde\xad\x00\x00\x00\x00\x00\x00' \ - b'\x00\x00\x00\x00\x00\x00\x00\x00' +ip4_addr_format_vl_api_address_t = {"un": {"ip4": b"\x01\x02\x03\x04"}, "af": 0} +ip4_addr_format_vl_api_prefix_t = { + "address": {"un": {"ip4": b"\x01\x02\x03\x04"}, "af": 0}, # noqa: E127,E501 + "len": ip4_prefix_len, +} +ip4_addr_format_vl_api_prefix_packed_t = { + "address": b"\x01\x02\x03\x04", + "len": ip4_prefix_len, +} + +ip6_addr = "dead::" +ip6_addrn = b"\xde\xad\x00\x00\x00\x00\x00\x00" b"\x00\x00\x00\x00\x00\x00\x00\x00" ip6_prefix_len = 127 -ip6_prefix = '%s/%s' % (ip6_addr, ip6_prefix_len) +ip6_prefix = "%s/%s" % (ip6_addr, ip6_prefix_len) ipv6_network = ipaddress.IPv6Network(text_type(ip6_prefix)) -ip6_addr_format_vl_api_address_t = {'un': {'ip6': b'\xde\xad\x00\x00' - b'\x00\x00\x00\x00' - b'\x00\x00\x00\x00' - b'\x00\x00\x00\x00'}, - 'af': 1} -ip6_addr_format_vl_api_prefix_t = {'address': # noqa: E127 - {'af': 1, - 'un': { - 'ip6': b'\xde\xad\x00\x00' - b'\x00\x00\x00\x00' - b'\x00\x00\x00\x00' - b'\x00\x00\x00\x00'}}, - 'len': ip6_prefix_len} -ip6_addr_format_vl_api_prefix_packed_t = {'address': b'\xde\xad\x00\x00' # noqa: E127,E501 - b'\x00\x00\x00\x00' - b'\x00\x00\x00\x00' - b'\x00\x00\x00\x00', - 'len': ip6_prefix_len} +ip6_addr_format_vl_api_address_t = { + "un": { + "ip6": b"\xde\xad\x00\x00" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + }, + "af": 1, +} +ip6_addr_format_vl_api_prefix_t = { + "address": { # noqa: E127 + "af": 1, + "un": { + "ip6": b"\xde\xad\x00\x00" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + }, + }, + "len": ip6_prefix_len, +} +ip6_addr_format_vl_api_prefix_packed_t = { + "address": b"\xde\xad\x00\x00" # noqa: E127,E501 + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00" + b"\x00\x00\x00\x00", + "len": ip6_prefix_len, +} class TestVppFormat(unittest.TestCase): - def test_format_vl_api_address_t(self): res = vpp_format.format_vl_api_address_t(ip4_addr) self.assertEqual(res, ip4_addr_format_vl_api_address_t) # PY2: raises socket.error # PY3: raises OSError - with self.assertRaises((TypeError, - socket.error, - OSError)): + with self.assertRaises((TypeError, socket.error, OSError)): res = vpp_format.format_vl_api_address_t(ip4_addrn) res = vpp_format.format_vl_api_address_t(ip6_addr) @@ -84,19 +91,14 @@ class TestVppFormat(unittest.TestCase): with self.assertRaises(TypeError): es = vpp_format.format_vl_api_address_t(ip6_addrn) - @parameterized.expand([('ip4 prefix', - ip4_prefix, - ip4_addr_format_vl_api_prefix_t), - ('ip6 prefix', - ip6_prefix, - ip6_addr_format_vl_api_prefix_t), - ('IPv4Network', - ipv4_network, - ip4_addr_format_vl_api_prefix_t), - ('IPv6Network', - ipv6_network, - ip6_addr_format_vl_api_prefix_t), - ]) + @parameterized.expand( + [ + ("ip4 prefix", ip4_prefix, ip4_addr_format_vl_api_prefix_t), + ("ip6 prefix", ip6_prefix, ip6_addr_format_vl_api_prefix_t), + ("IPv4Network", ipv4_network, ip4_addr_format_vl_api_prefix_t), + ("IPv6Network", ipv6_network, ip6_addr_format_vl_api_prefix_t), + ] + ) def test_format_vl_api_prefix_t(self, _, arg, expected): res = vpp_format.format_vl_api_prefix_t(arg) self.assertEqual(res, expected) diff --git a/src/vpp-api/python/vpp_papi/tests/test_vpp_papi.py b/src/vpp-api/python/vpp_papi/tests/test_vpp_papi.py index 99acb7c7469..2b21c83966a 100644 --- a/src/vpp-api/python/vpp_papi/tests/test_vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/tests/test_vpp_papi.py @@ -24,7 +24,7 @@ from vpp_papi import vpp_transport_shmem class TestVppPapiVPPApiClient(unittest.TestCase): def test_getcontext(self): - vpp_papi.VPPApiClient.apidir = '.' + vpp_papi.VPPApiClient.apidir = "." c = vpp_papi.VPPApiClient(testmode=True, use_socket=True) # reset initialization at module load time. @@ -39,7 +39,7 @@ class TestVppPapiVPPApiClientMp(unittest.TestCase): # run_tests.py (eg. make test TEST_JOBS=10) def test_get_context_mp(self): - vpp_papi.VPPApiClient.apidir = '.' + vpp_papi.VPPApiClient.apidir = "." c = vpp_papi.VPPApiClient(testmode=True, use_socket=True) # reset initialization at module load time. @@ -243,11 +243,11 @@ class TestVppPapiLogging(unittest.TestCase): pass client = Vpp - with self.assertLogs('vpp_papi', level='DEBUG') as cm: + with self.assertLogs("vpp_papi", level="DEBUG") as cm: vpp_papi.vpp_atexit(client) - self.assertEqual(cm.output, ['DEBUG:vpp_papi:Cleaning up VPP on exit']) + self.assertEqual(cm.output, ["DEBUG:vpp_papi:Cleaning up VPP on exit"]) with self.assertRaises(AssertionError): - with self.assertLogs('vpp_papi.serializer', level='DEBUG') as cm: + with self.assertLogs("vpp_papi.serializer", level="DEBUG") as cm: vpp_papi.vpp_atexit(client) self.assertEqual(cm.output, []) diff --git a/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py b/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py index c9b3d672d6a..eee38f00632 100755 --- a/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py +++ b/src/vpp-api/python/vpp_papi/tests/test_vpp_serializer.py @@ -13,61 +13,57 @@ from ipaddress import * class TestLimits(unittest.TestCase): def test_string(self): - fixed_string = VPPType('fixed_string', - [['string', 'name', 16]]) + fixed_string = VPPType("fixed_string", [["string", "name", 16]]) - b = fixed_string.pack({'name': 'foobar'}) + b = fixed_string.pack({"name": "foobar"}) self.assertEqual(len(b), 16) # Ensure string is nul terminated - self.assertEqual(b.decode('ascii')[6], '\x00') + self.assertEqual(b.decode("ascii")[6], "\x00") nt, size = fixed_string.unpack(b) self.assertEqual(size, 16) - self.assertEqual(nt.name, 'foobar') + self.assertEqual(nt.name, "foobar") # Empty string - b = fixed_string.pack({'name': ''}) + b = fixed_string.pack({"name": ""}) self.assertEqual(len(b), 16) nt, size = fixed_string.unpack(b) self.assertEqual(size, 16) - self.assertEqual(nt.name, '') + self.assertEqual(nt.name, "") # String too long with self.assertRaises(VPPSerializerValueError): - b = fixed_string.pack({'name': 'foobarfoobar1234'}) + b = fixed_string.pack({"name": "foobarfoobar1234"}) - variable_string = VPPType('variable_string', - [['string', 'name', 0]]) - b = variable_string.pack({'name': 'foobar'}) - self.assertEqual(len(b), 4 + len('foobar')) + variable_string = VPPType("variable_string", [["string", "name", 0]]) + b = variable_string.pack({"name": "foobar"}) + self.assertEqual(len(b), 4 + len("foobar")) nt, size = variable_string.unpack(b) - self.assertEqual(size, 4 + len('foobar')) - self.assertEqual(nt.name, 'foobar') - self.assertEqual(len(nt.name), len('foobar')) + self.assertEqual(size, 4 + len("foobar")) + self.assertEqual(nt.name, "foobar") + self.assertEqual(len(nt.name), len("foobar")) def test_limit(self): - limited_type = VPPType('limited_type_t', - [['string', 'name', 0, {'limit': 16}]]) - unlimited_type = VPPType('limited_type_t', - [['string', 'name', 0]]) + limited_type = VPPType("limited_type_t", [["string", "name", 0, {"limit": 16}]]) + unlimited_type = VPPType("limited_type_t", [["string", "name", 0]]) - b = limited_type.pack({'name': 'foobar'}) + b = limited_type.pack({"name": "foobar"}) self.assertEqual(len(b), 10) - b = unlimited_type.pack({'name': 'foobar'}) + b = unlimited_type.pack({"name": "foobar"}) self.assertEqual(len(b), 10) with self.assertRaises(VPPSerializerValueError): - b = limited_type.pack({'name': 'foobar'*3}) + b = limited_type.pack({"name": "foobar" * 3}) class TestDefaults(unittest.TestCase): def test_defaults(self): - default_type = VPPType('default_type_t', - [['u16', 'mtu', {'default': 1500, 'limit': 0}]]) - without_default_type = VPPType('without_default_type_t', - [['u16', 'mtu']]) + default_type = VPPType( + "default_type_t", [["u16", "mtu", {"default": 1500, "limit": 0}]] + ) + without_default_type = VPPType("without_default_type_t", [["u16", "mtu"]]) b = default_type.pack({}) self.assertEqual(len(b), 2) @@ -76,7 +72,7 @@ class TestDefaults(unittest.TestCase): self.assertEqual(nt.mtu, 1500) # distinguish between parameter 0 and parameter not passed - b = default_type.pack({'mtu': 0}) + b = default_type.pack({"mtu": 0}) self.assertEqual(len(b), 2) nt, size = default_type.unpack(b) self.assertEqual(len(b), size) @@ -90,13 +86,15 @@ class TestDefaults(unittest.TestCase): self.assertEqual(nt.mtu, 0) # default enum type - VPPEnumType('vl_api_enum_t', [["ADDRESS_IP4", 0], - ["ADDRESS_IP6", 1], - {"enumtype": "u32"}]) + VPPEnumType( + "vl_api_enum_t", + [["ADDRESS_IP4", 0], ["ADDRESS_IP6", 1], {"enumtype": "u32"}], + ) - default_with_enum = VPPType('default_enum_type_t', - [['u16', 'mtu'], ['vl_api_enum_t', - 'e', {'default': 1}]]) + default_with_enum = VPPType( + "default_enum_type_t", + [["u16", "mtu"], ["vl_api_enum_t", "e", {"default": 1}]], + ) b = default_with_enum.pack({}) self.assertEqual(len(b), 6) @@ -106,275 +104,275 @@ class TestDefaults(unittest.TestCase): class TestAddType(unittest.TestCase): - def test_union(self): - un = VPPUnionType('test_union', - [['u8', 'is_bool'], - ['u32', 'is_int']]) + un = VPPUnionType("test_union", [["u8", "is_bool"], ["u32", "is_int"]]) - b = un.pack({'is_int': 0x12345678}) + b = un.pack({"is_int": 0x12345678}) nt, size = un.unpack(b) self.assertEqual(len(b), size) self.assertEqual(nt.is_bool, 0x12) self.assertEqual(nt.is_int, 0x12345678) def test_address(self): - af = VPPEnumType('vl_api_address_family_t', [["ADDRESS_IP4", 0], - ["ADDRESS_IP6", 1], - {"enumtype": "u32"}]) - aff = VPPEnumFlagType('vl_api_address_family_flag_t', [["ADDRESS_IP4", 0], - ["ADDRESS_IP6", 1], - {"enumtype": "u32"}]) - ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8', - 'length': 4}) - ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8', - 'length': 16}) - VPPUnionType('vl_api_address_union_t', - [["vl_api_ip4_address_t", "ip4"], - ["vl_api_ip6_address_t", "ip6"]]) - - address = VPPType('vl_api_address_t', - [['vl_api_address_family_t', 'af'], - ['vl_api_address_union_t', 'un']]) - - prefix = VPPType('vl_api_prefix_t', - [['vl_api_address_t', 'address'], - ['u8', 'len']]) - - va_address_list = VPPType('list_addresses', - [['u8', 'count'], - ['vl_api_address_t', 'addresses', - 0, 'count']]) - - message_with_va_address_list = VPPType('msg_with_vla', - [['list_addresses', - 'vla_address'], - ['u8', 'is_cool']]) - - b = ip4.pack(inet_pton(AF_INET, '1.1.1.1')) + af = VPPEnumType( + "vl_api_address_family_t", + [["ADDRESS_IP4", 0], ["ADDRESS_IP6", 1], {"enumtype": "u32"}], + ) + aff = VPPEnumFlagType( + "vl_api_address_family_flag_t", + [["ADDRESS_IP4", 0], ["ADDRESS_IP6", 1], {"enumtype": "u32"}], + ) + ip4 = VPPTypeAlias("vl_api_ip4_address_t", {"type": "u8", "length": 4}) + ip6 = VPPTypeAlias("vl_api_ip6_address_t", {"type": "u8", "length": 16}) + VPPUnionType( + "vl_api_address_union_t", + [["vl_api_ip4_address_t", "ip4"], ["vl_api_ip6_address_t", "ip6"]], + ) + + address = VPPType( + "vl_api_address_t", + [["vl_api_address_family_t", "af"], ["vl_api_address_union_t", "un"]], + ) + + prefix = VPPType( + "vl_api_prefix_t", [["vl_api_address_t", "address"], ["u8", "len"]] + ) + + va_address_list = VPPType( + "list_addresses", + [["u8", "count"], ["vl_api_address_t", "addresses", 0, "count"]], + ) + + message_with_va_address_list = VPPType( + "msg_with_vla", [["list_addresses", "vla_address"], ["u8", "is_cool"]] + ) + + b = ip4.pack(inet_pton(AF_INET, "1.1.1.1")) self.assertEqual(len(b), 4) nt, size = ip4.unpack(b) - self.assertEqual(str(nt), '1.1.1.1') + self.assertEqual(str(nt), "1.1.1.1") - b = ip6.pack(inet_pton(AF_INET6, '1::1')) + b = ip6.pack(inet_pton(AF_INET6, "1::1")) self.assertEqual(len(b), 16) - b = address.pack({'af': af.ADDRESS_IP4, - 'un': - {'ip4': inet_pton(AF_INET, '2.2.2.2')}}) + b = address.pack( + {"af": af.ADDRESS_IP4, "un": {"ip4": inet_pton(AF_INET, "2.2.2.2")}} + ) self.assertEqual(len(b), 20) nt, size = address.unpack(b) - self.assertEqual(str(nt), '2.2.2.2') + self.assertEqual(str(nt), "2.2.2.2") # List of addresses address_list = [] for i in range(4): - address_list.append({'af': af.ADDRESS_IP4, - 'un': - {'ip4': inet_pton(AF_INET, '2.2.2.2')}}) - b = va_address_list.pack({'count': len(address_list), - 'addresses': address_list}) + address_list.append( + {"af": af.ADDRESS_IP4, "un": {"ip4": inet_pton(AF_INET, "2.2.2.2")}} + ) + b = va_address_list.pack( + {"count": len(address_list), "addresses": address_list} + ) self.assertEqual(len(b), 81) nt, size = va_address_list.unpack(b) - self.assertEqual(str(nt.addresses[0]), '2.2.2.2') - - b = message_with_va_address_list.pack({'vla_address': - {'count': len(address_list), - 'addresses': address_list}, - 'is_cool': 100}) + self.assertEqual(str(nt.addresses[0]), "2.2.2.2") + + b = message_with_va_address_list.pack( + { + "vla_address": {"count": len(address_list), "addresses": address_list}, + "is_cool": 100, + } + ) self.assertEqual(len(b), 82) nt, size = message_with_va_address_list.unpack(b) self.assertEqual(nt.is_cool, 100) def test_address_with_prefix(self): - af = VPPEnumType('vl_api_address_family_t', [["ADDRESS_IP4", 0], - ["ADDRESS_IP6", 1], - {"enumtype": "u32"}]) - ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8', - 'length': 4}) - ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8', - 'length': 16}) - VPPUnionType('vl_api_address_union_t', - [["vl_api_ip4_address_t", "ip4"], - ["vl_api_ip6_address_t", "ip6"]]) - - address = VPPType('vl_api_address_t', - [['vl_api_address_family_t', 'af'], - ['vl_api_address_union_t', 'un']]) - - prefix = VPPType('vl_api_prefix_t', - [['vl_api_address_t', 'address'], - ['u8', 'len']]) - prefix4 = VPPType('vl_api_ip4_prefix_t', - [['vl_api_ip4_address_t', 'address'], - ['u8', 'len']]) - prefix6 = VPPType('vl_api_ip6_prefix_t', - [['vl_api_ip6_address_t', 'address'], - ['u8', 'len']]) - - address_with_prefix = VPPTypeAlias('vl_api_address_with_prefix_t', {'type': 'vl_api_prefix_t' }) - address4_with_prefix = VPPTypeAlias('vl_api_ip4_address_with_prefix_t', - {'type': 'vl_api_ip4_prefix_t' }) - address6_with_prefix = VPPTypeAlias('vl_api_ip6_address_with_prefix_t', - {'type': 'vl_api_ip6_prefix_t' }) - - awp_type = VPPType('foobar_t', - [['vl_api_address_with_prefix_t', 'address']]) + af = VPPEnumType( + "vl_api_address_family_t", + [["ADDRESS_IP4", 0], ["ADDRESS_IP6", 1], {"enumtype": "u32"}], + ) + ip4 = VPPTypeAlias("vl_api_ip4_address_t", {"type": "u8", "length": 4}) + ip6 = VPPTypeAlias("vl_api_ip6_address_t", {"type": "u8", "length": 16}) + VPPUnionType( + "vl_api_address_union_t", + [["vl_api_ip4_address_t", "ip4"], ["vl_api_ip6_address_t", "ip6"]], + ) + + address = VPPType( + "vl_api_address_t", + [["vl_api_address_family_t", "af"], ["vl_api_address_union_t", "un"]], + ) + + prefix = VPPType( + "vl_api_prefix_t", [["vl_api_address_t", "address"], ["u8", "len"]] + ) + prefix4 = VPPType( + "vl_api_ip4_prefix_t", [["vl_api_ip4_address_t", "address"], ["u8", "len"]] + ) + prefix6 = VPPType( + "vl_api_ip6_prefix_t", [["vl_api_ip6_address_t", "address"], ["u8", "len"]] + ) + + address_with_prefix = VPPTypeAlias( + "vl_api_address_with_prefix_t", {"type": "vl_api_prefix_t"} + ) + address4_with_prefix = VPPTypeAlias( + "vl_api_ip4_address_with_prefix_t", {"type": "vl_api_ip4_prefix_t"} + ) + address6_with_prefix = VPPTypeAlias( + "vl_api_ip6_address_with_prefix_t", {"type": "vl_api_ip6_prefix_t"} + ) + + awp_type = VPPType("foobar_t", [["vl_api_address_with_prefix_t", "address"]]) # address with prefix - b = address_with_prefix.pack(IPv4Interface('2.2.2.2/24')) + b = address_with_prefix.pack(IPv4Interface("2.2.2.2/24")) self.assertEqual(len(b), 21) nt, size = address_with_prefix.unpack(b) self.assertTrue(isinstance(nt, IPv4Interface)) - self.assertEqual(str(nt), '2.2.2.2/24') + self.assertEqual(str(nt), "2.2.2.2/24") - b = address_with_prefix.pack(IPv6Interface('2::2/64')) + b = address_with_prefix.pack(IPv6Interface("2::2/64")) self.assertEqual(len(b), 21) nt, size = address_with_prefix.unpack(b) self.assertTrue(isinstance(nt, IPv6Interface)) - self.assertEqual(str(nt), '2::2/64') + self.assertEqual(str(nt), "2::2/64") - b = address_with_prefix.pack(IPv4Network('2.2.2.2/24', strict=False)) + b = address_with_prefix.pack(IPv4Network("2.2.2.2/24", strict=False)) self.assertEqual(len(b), 21) nt, size = address_with_prefix.unpack(b) self.assertTrue(isinstance(nt, IPv4Interface)) - self.assertEqual(str(nt), '2.2.2.0/24') + self.assertEqual(str(nt), "2.2.2.0/24") - b = address4_with_prefix.pack('2.2.2.2/24') + b = address4_with_prefix.pack("2.2.2.2/24") self.assertEqual(len(b), 5) nt, size = address4_with_prefix.unpack(b) self.assertTrue(isinstance(nt, IPv4Interface)) - self.assertEqual(str(nt), '2.2.2.2/24') - b = address4_with_prefix.pack(IPv4Interface('2.2.2.2/24')) + self.assertEqual(str(nt), "2.2.2.2/24") + b = address4_with_prefix.pack(IPv4Interface("2.2.2.2/24")) self.assertEqual(len(b), 5) - b = address6_with_prefix.pack('2::2/64') + b = address6_with_prefix.pack("2::2/64") self.assertEqual(len(b), 17) nt, size = address6_with_prefix.unpack(b) self.assertTrue(isinstance(nt, IPv6Interface)) - self.assertEqual(str(nt), '2::2/64') - b = address6_with_prefix.pack(IPv6Interface('2::2/64')) + self.assertEqual(str(nt), "2::2/64") + b = address6_with_prefix.pack(IPv6Interface("2::2/64")) self.assertEqual(len(b), 17) - b = prefix.pack('192.168.10.0/24') + b = prefix.pack("192.168.10.0/24") self.assertEqual(len(b), 21) nt, size = prefix.unpack(b) self.assertTrue(isinstance(nt, IPv4Network)) - self.assertEqual(str(nt), '192.168.10.0/24') + self.assertEqual(str(nt), "192.168.10.0/24") - b = awp_type.pack({'address': '1.2.3.4/24'}) + b = awp_type.pack({"address": "1.2.3.4/24"}) self.assertEqual(len(b), 21) nt, size = awp_type.unpack(b) self.assertTrue(isinstance(nt.address, IPv4Interface)) - self.assertEqual(str(nt.address), '1.2.3.4/24') + self.assertEqual(str(nt.address), "1.2.3.4/24") - b = awp_type.pack({'address': IPv4Interface('1.2.3.4/24')}) + b = awp_type.pack({"address": IPv4Interface("1.2.3.4/24")}) self.assertEqual(len(b), 21) nt, size = awp_type.unpack(b) self.assertTrue(isinstance(nt.address, IPv4Interface)) - self.assertEqual(str(nt.address), '1.2.3.4/24') + self.assertEqual(str(nt.address), "1.2.3.4/24") def test_recursive_address(self): - af = VPPEnumType('vl_api_address_family_t', [["ADDRESS_IP4", 0], - ["ADDRESS_IP6", 1], - {"enumtype": "u32"}]) - ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8', - 'length': 4}) - b = ip4.pack('1.1.1.1') + af = VPPEnumType( + "vl_api_address_family_t", + [["ADDRESS_IP4", 0], ["ADDRESS_IP6", 1], {"enumtype": "u32"}], + ) + ip4 = VPPTypeAlias("vl_api_ip4_address_t", {"type": "u8", "length": 4}) + b = ip4.pack("1.1.1.1") self.assertEqual(len(b), 4) nt, size = ip4.unpack(b) - self.assertEqual(str(nt), '1.1.1.1') + self.assertEqual(str(nt), "1.1.1.1") - ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8', - 'length': 16}) - VPPUnionType('vl_api_address_union_t', - [["vl_api_ip4_address_t", "ip4"], - ["vl_api_ip6_address_t", "ip6"]]) + ip6 = VPPTypeAlias("vl_api_ip6_address_t", {"type": "u8", "length": 16}) + VPPUnionType( + "vl_api_address_union_t", + [["vl_api_ip4_address_t", "ip4"], ["vl_api_ip6_address_t", "ip6"]], + ) - address = VPPType('vl_api_address_t', - [['vl_api_address_family_t', 'af'], - ['vl_api_address_union_t', 'un']]) + address = VPPType( + "vl_api_address_t", + [["vl_api_address_family_t", "af"], ["vl_api_address_union_t", "un"]], + ) - prefix = VPPType('vl_api_prefix_t', - [['vl_api_address_t', 'address'], - ['u8', 'len']]) - message = VPPMessage('svs', - [['vl_api_prefix_t', 'prefix']]) - message_addr = VPPMessage('svs_address', - [['vl_api_address_t', 'address']]) + prefix = VPPType( + "vl_api_prefix_t", [["vl_api_address_t", "address"], ["u8", "len"]] + ) + message = VPPMessage("svs", [["vl_api_prefix_t", "prefix"]]) + message_addr = VPPMessage("svs_address", [["vl_api_address_t", "address"]]) - b = message_addr.pack({'address': "1::1"}) + b = message_addr.pack({"address": "1::1"}) self.assertEqual(len(b), 20) nt, size = message_addr.unpack(b) self.assertEqual("1::1", str(nt.address)) - b = message_addr.pack({'address': "1.1.1.1"}) + b = message_addr.pack({"address": "1.1.1.1"}) self.assertEqual(len(b), 20) nt, size = message_addr.unpack(b) self.assertEqual("1.1.1.1", str(nt.address)) - b = message.pack({'prefix': "1.1.1.0/24"}) + b = message.pack({"prefix": "1.1.1.0/24"}) self.assertEqual(len(b), 21) nt, size = message.unpack(b) self.assertEqual("1.1.1.0/24", str(nt.prefix)) - message_array = VPPMessage('address_array', - [['vl_api_ip6_address_t', - 'addresses', 2]]) - b = message_array.pack({'addresses': [IPv6Address(u"1::1"), "2::2"]}) + message_array = VPPMessage( + "address_array", [["vl_api_ip6_address_t", "addresses", 2]] + ) + b = message_array.pack({"addresses": [IPv6Address("1::1"), "2::2"]}) self.assertEqual(len(b), 32) - message_array_vla = VPPMessage('address_array_vla', - [['u32', 'num'], - ['vl_api_ip6_address_t', - 'addresses', 0, 'num']]) - b = message_array_vla.pack({'addresses': ["1::1", "2::2"], 'num': 2}) + message_array_vla = VPPMessage( + "address_array_vla", + [["u32", "num"], ["vl_api_ip6_address_t", "addresses", 0, "num"]], + ) + b = message_array_vla.pack({"addresses": ["1::1", "2::2"], "num": 2}) self.assertEqual(len(b), 36) - message_array4 = VPPMessage('address_array4', - [['vl_api_ip4_address_t', - 'addresses', 2]]) - b = message_array4.pack({'addresses': ["1.1.1.1", "2.2.2.2"]}) + message_array4 = VPPMessage( + "address_array4", [["vl_api_ip4_address_t", "addresses", 2]] + ) + b = message_array4.pack({"addresses": ["1.1.1.1", "2.2.2.2"]}) self.assertEqual(len(b), 8) - b = message_array4.pack({'addresses': [IPv4Address(u"1.1.1.1"), - "2.2.2.2"]}) + b = message_array4.pack({"addresses": [IPv4Address("1.1.1.1"), "2.2.2.2"]}) self.assertEqual(len(b), 8) - message = VPPMessage('address', [['vl_api_address_t', 'address']]) - b = message.pack({'address': '1::1'}) + message = VPPMessage("address", [["vl_api_address_t", "address"]]) + b = message.pack({"address": "1::1"}) self.assertEqual(len(b), 20) - b = message.pack({'address': '1.1.1.1'}) + b = message.pack({"address": "1.1.1.1"}) self.assertEqual(len(b), 20) - message = VPPMessage('prefix', [['vl_api_prefix_t', 'prefix']]) - b = message.pack({'prefix': '1::1/130'}) + message = VPPMessage("prefix", [["vl_api_prefix_t", "prefix"]]) + b = message.pack({"prefix": "1::1/130"}) self.assertEqual(len(b), 21) - b = message.pack({'prefix': IPv6Network(u'1::/119')}) + b = message.pack({"prefix": IPv6Network("1::/119")}) self.assertEqual(len(b), 21) - b = message.pack({'prefix': IPv4Network(u'1.1.0.0/16')}) + b = message.pack({"prefix": IPv4Network("1.1.0.0/16")}) self.assertEqual(len(b), 21) def test_zero_vla(self): - '''Default zero'ed out for VLAs''' - list = VPPType('vl_api_list_t', - [['u8', 'count', 10]]) + """Default zero'ed out for VLAs""" + list = VPPType("vl_api_list_t", [["u8", "count", 10]]) # Define an embedded VLA type - valist = VPPType('vl_api_valist_t', - [['u8', 'count'], - ['u8', 'string', 0, 'count']]) + valist = VPPType( + "vl_api_valist_t", [["u8", "count"], ["u8", "string", 0, "count"]] + ) # Define a message - vamessage = VPPMessage('vamsg', - [['vl_api_valist_t', 'valist'], - ['u8', 'is_something']]) + vamessage = VPPMessage( + "vamsg", [["vl_api_valist_t", "valist"], ["u8", "is_something"]] + ) - message = VPPMessage('msg', - [['vl_api_list_t', 'list'], - ['u8', 'is_something']]) + message = VPPMessage("msg", [["vl_api_list_t", "list"], ["u8", "is_something"]]) # Pack message without VLA specified - b = message.pack({'is_something': 1}) - b = vamessage.pack({'is_something': 1}) + b = message.pack({"is_something": 1}) + b = vamessage.pack({"is_something": 1}) def test_arrays(self): # Test cases @@ -382,254 +380,275 @@ class TestAddType(unittest.TestCase): # 2. Fixed list of variable length sub type # 3. Variable length type # - s = VPPType('str', [['u32', 'length'], - ['u8', 'string', 0, 'length']]) + s = VPPType("str", [["u32", "length"], ["u8", "string", 0, "length"]]) - ip4 = VPPType('ip4_address', [['u8', 'address', 4]]) - listip4 = VPPType('list_ip4_t', [['ip4_address', 'addresses', 4]]) - valistip4 = VPPType('list_ip4_t', - [['u8', 'count'], - ['ip4_address', 'addresses', 0, 'count']]) + ip4 = VPPType("ip4_address", [["u8", "address", 4]]) + listip4 = VPPType("list_ip4_t", [["ip4_address", "addresses", 4]]) + valistip4 = VPPType( + "list_ip4_t", [["u8", "count"], ["ip4_address", "addresses", 0, "count"]] + ) - valistip4_legacy = VPPType('list_ip4_t', - [['u8', 'foo'], - ['ip4_address', 'addresses', 0]]) + valistip4_legacy = VPPType( + "list_ip4_t", [["u8", "foo"], ["ip4_address", "addresses", 0]] + ) addresses = [] for i in range(4): - addresses.append({'address': inet_pton(AF_INET, '2.2.2.2')}) - b = listip4.pack({'addresses': addresses}) + addresses.append({"address": inet_pton(AF_INET, "2.2.2.2")}) + b = listip4.pack({"addresses": addresses}) self.assertEqual(len(b), 16) nt, size = listip4.unpack(b) - self.assertEqual(nt.addresses[0].address, - inet_pton(AF_INET, '2.2.2.2')) + self.assertEqual(nt.addresses[0].address, inet_pton(AF_INET, "2.2.2.2")) - b = valistip4.pack({'count': len(addresses), 'addresses': addresses}) + b = valistip4.pack({"count": len(addresses), "addresses": addresses}) self.assertEqual(len(b), 17) nt, size = valistip4.unpack(b) self.assertEqual(nt.count, 4) - self.assertEqual(nt.addresses[0].address, - inet_pton(AF_INET, '2.2.2.2')) + self.assertEqual(nt.addresses[0].address, inet_pton(AF_INET, "2.2.2.2")) - b = valistip4_legacy.pack({'foo': 1, 'addresses': addresses}) + b = valistip4_legacy.pack({"foo": 1, "addresses": addresses}) self.assertEqual(len(b), 17) nt, size = valistip4_legacy.unpack(b) self.assertEqual(len(nt.addresses), 4) - self.assertEqual(nt.addresses[0].address, - inet_pton(AF_INET, '2.2.2.2')) + self.assertEqual(nt.addresses[0].address, inet_pton(AF_INET, "2.2.2.2")) - string = 'foobar foobar' - b = s.pack({'length': len(string), 'string': string.encode('utf-8')}) + string = "foobar foobar" + b = s.pack({"length": len(string), "string": string.encode("utf-8")}) nt, size = s.unpack(b) self.assertEqual(len(b), size) def test_string(self): - s = VPPType('str', [['u32', 'length'], - ['u8', 'string', 0, 'length']]) + s = VPPType("str", [["u32", "length"], ["u8", "string", 0, "length"]]) - string = '' - b = s.pack({'length': len(string), 'string': string.encode('utf-8')}) + string = "" + b = s.pack({"length": len(string), "string": string.encode("utf-8")}) nt, size = s.unpack(b) self.assertEqual(len(b), size) def test_message(self): - foo = VPPMessage('foo', [['u16', '_vl_msg_id'], - ['u8', 'client_index'], - ['u8', 'something'], - {"crc": "0x559b9f3c"}]) - b = foo.pack({'_vl_msg_id': 1, 'client_index': 5, - 'something': 200}) + foo = VPPMessage( + "foo", + [ + ["u16", "_vl_msg_id"], + ["u8", "client_index"], + ["u8", "something"], + {"crc": "0x559b9f3c"}, + ], + ) + b = foo.pack({"_vl_msg_id": 1, "client_index": 5, "something": 200}) nt, size = foo.unpack(b) self.assertEqual(len(b), size) self.assertEqual(nt.something, 200) def test_abf(self): - fib_mpls_label = VPPType('vl_api_fib_mpls_label_t', - [['u8', 'is_uniform'], - ['u32', 'label'], - ['u8', 'ttl'], - ['u8', 'exp']]) + fib_mpls_label = VPPType( + "vl_api_fib_mpls_label_t", + [["u8", "is_uniform"], ["u32", "label"], ["u8", "ttl"], ["u8", "exp"]], + ) - label_stack = {'is_uniform': 0, - 'label': 0, - 'ttl': 0, - 'exp': 0} + label_stack = {"is_uniform": 0, "label": 0, "ttl": 0, "exp": 0} b = fib_mpls_label.pack(label_stack) self.assertEqual(len(b), 7) - fib_path = VPPType('vl_api_fib_path_t', - [['u32', 'sw_if_index'], - ['u32', 'table_id'], - ['u8', 'weight'], - ['u8', 'preference'], - ['u8', 'is_local'], - ['u8', 'is_drop'], - ['u8', 'is_udp_encap'], - ['u8', 'is_unreach'], - ['u8', 'is_prohibit'], - ['u8', 'is_resolve_host'], - ['u8', 'is_resolve_attached'], - ['u8', 'is_dvr'], - ['u8', 'is_source_lookup'], - ['u8', 'afi'], - ['u8', 'next_hop', 16], - ['u32', 'next_hop_id'], - ['u32', 'rpf_id'], - ['u32', 'via_label'], - ['u8', 'n_labels'], - ['vl_api_fib_mpls_label_t', 'label_stack', 16]]) + fib_path = VPPType( + "vl_api_fib_path_t", + [ + ["u32", "sw_if_index"], + ["u32", "table_id"], + ["u8", "weight"], + ["u8", "preference"], + ["u8", "is_local"], + ["u8", "is_drop"], + ["u8", "is_udp_encap"], + ["u8", "is_unreach"], + ["u8", "is_prohibit"], + ["u8", "is_resolve_host"], + ["u8", "is_resolve_attached"], + ["u8", "is_dvr"], + ["u8", "is_source_lookup"], + ["u8", "afi"], + ["u8", "next_hop", 16], + ["u32", "next_hop_id"], + ["u32", "rpf_id"], + ["u32", "via_label"], + ["u8", "n_labels"], + ["vl_api_fib_mpls_label_t", "label_stack", 16], + ], + ) label_stack_list = [] for i in range(16): label_stack_list.append(label_stack) - paths = {'is_udp_encap': 0, - 'next_hop': b'\x10\x02\x02\xac', - 'table_id': 0, - 'afi': 0, - 'weight': 1, - 'next_hop_id': 4294967295, - 'label_stack': label_stack_list, - 'n_labels': 0, - 'sw_if_index': 4294967295, - 'preference': 0} + paths = { + "is_udp_encap": 0, + "next_hop": b"\x10\x02\x02\xac", + "table_id": 0, + "afi": 0, + "weight": 1, + "next_hop_id": 4294967295, + "label_stack": label_stack_list, + "n_labels": 0, + "sw_if_index": 4294967295, + "preference": 0, + } b = fib_path.pack(paths) - self.assertEqual(len(b), (7*16) + 49) + self.assertEqual(len(b), (7 * 16) + 49) - abf_policy = VPPType('vl_api_abf_policy_t', - [['u32', 'policy_id'], - ['u32', 'acl_index'], - ['u8', 'n_paths'], - ['vl_api_fib_path_t', 'paths', 0, 'n_paths']]) + abf_policy = VPPType( + "vl_api_abf_policy_t", + [ + ["u32", "policy_id"], + ["u32", "acl_index"], + ["u8", "n_paths"], + ["vl_api_fib_path_t", "paths", 0, "n_paths"], + ], + ) - policy = { - 'n_paths': 1, - 'paths': [paths], - 'acl_index': 0, - 'policy_id': 10} + policy = {"n_paths": 1, "paths": [paths], "acl_index": 0, "policy_id": 10} b = abf_policy.pack(policy) - self.assertEqual(len(b), (7*16) + 49 + 9) - - abf_policy_add_del = VPPMessage('abf_policy_add_del', - [['u16', '_vl_msg_id'], - ['u32', 'client_index'], - ['u32', 'context'], - ['u8', 'is_add'], - ['vl_api_abf_policy_t', 'policy']]) - - b = abf_policy_add_del.pack({'is_add': 1, - 'context': 66, - '_vl_msg_id': 1066, - 'policy': policy}) + self.assertEqual(len(b), (7 * 16) + 49 + 9) + + abf_policy_add_del = VPPMessage( + "abf_policy_add_del", + [ + ["u16", "_vl_msg_id"], + ["u32", "client_index"], + ["u32", "context"], + ["u8", "is_add"], + ["vl_api_abf_policy_t", "policy"], + ], + ) + + b = abf_policy_add_del.pack( + {"is_add": 1, "context": 66, "_vl_msg_id": 1066, "policy": policy} + ) nt, size = abf_policy_add_del.unpack(b) - self.assertEqual(nt.policy.paths[0].next_hop, - b'\x10\x02\x02\xac\x00\x00\x00\x00' - b'\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEqual( + nt.policy.paths[0].next_hop, + b"\x10\x02\x02\xac\x00\x00\x00\x00" b"\x00\x00\x00\x00\x00\x00\x00\x00", + ) def test_bier(self): - bier_table_id = VPPType('vl_api_bier_table_id_t', - [['u8', 'bt_set'], - ['u8', 'bt_sub_domain'], - ['u8', 'bt_hdr_len_id']]) + bier_table_id = VPPType( + "vl_api_bier_table_id_t", + [["u8", "bt_set"], ["u8", "bt_sub_domain"], ["u8", "bt_hdr_len_id"]], + ) - bier_imp_add = VPPMessage('bier_imp_add', - [['u32', 'client_index'], - ['u32', 'context'], - ['vl_api_bier_table_id_t', 'bi_tbl_id'], - ['u16', 'bi_src'], - ['u8', 'bi_n_bytes'], - ['u8', 'bi_bytes', 0, 'bi_n_bytes']]) + bier_imp_add = VPPMessage( + "bier_imp_add", + [ + ["u32", "client_index"], + ["u32", "context"], + ["vl_api_bier_table_id_t", "bi_tbl_id"], + ["u16", "bi_src"], + ["u8", "bi_n_bytes"], + ["u8", "bi_bytes", 0, "bi_n_bytes"], + ], + ) - table_id = {'bt_set': 0, - 'bt_sub_domain': 0, - 'bt_hdr_len_id': 0} + table_id = {"bt_set": 0, "bt_sub_domain": 0, "bt_hdr_len_id": 0} - bibytes = b'foobar' + bibytes = b"foobar" - b = bier_imp_add.pack({'bi_tbl_id': table_id, - 'bi_n_bytes': len(bibytes), - 'bi_bytes': bibytes}) + b = bier_imp_add.pack( + {"bi_tbl_id": table_id, "bi_n_bytes": len(bibytes), "bi_bytes": bibytes} + ) self.assertEqual(len(b), 20) def test_lisp(self): - VPPEnumType('vl_api_eid_type_t', - [["EID_TYPE_API_PREFIX", 0], - ["EID_TYPE_API_MAC", 1], - ["EID_TYPE_API_NSH", 2], - {"enumtype": "u32"}]) - - VPPTypeAlias('vl_api_mac_address_t', {'type': 'u8', - 'length': 6}) - - VPPType('vl_api_nsh_t', - [["u32", "spi"], - ["u8", "si"]]) - - VPPEnumType('vl_api_address_family_t', [["ADDRESS_IP4", 0], - ["ADDRESS_IP6", 1], - {"enumtype": "u32"}]) - VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8', - 'length': 4}) - VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8', - 'length': 16}) - VPPUnionType('vl_api_address_union_t', - [["vl_api_ip4_address_t", "ip4"], - ["vl_api_ip6_address_t", "ip6"]]) - - VPPType('vl_api_address_t', - [['vl_api_address_family_t', 'af'], - ['vl_api_address_union_t', 'un']]) - - VPPType('vl_api_prefix_t', - [['vl_api_address_t', 'address'], - ['u8', 'len']]) - - VPPUnionType('vl_api_eid_address_t', - [["vl_api_prefix_t", "prefix"], - ["vl_api_mac_address_t", "mac"], - ["vl_api_nsh_t", "nsh"]]) - - eid = VPPType('vl_api_eid_t', - [["vl_api_eid_type_t", "type"], - ["vl_api_eid_address_t", "address"]]) - - b = eid.pack({'type':1, - 'address': { - 'mac': MACAddress('aa:bb:cc:dd:ee:ff')}}) + VPPEnumType( + "vl_api_eid_type_t", + [ + ["EID_TYPE_API_PREFIX", 0], + ["EID_TYPE_API_MAC", 1], + ["EID_TYPE_API_NSH", 2], + {"enumtype": "u32"}, + ], + ) + + VPPTypeAlias("vl_api_mac_address_t", {"type": "u8", "length": 6}) + + VPPType("vl_api_nsh_t", [["u32", "spi"], ["u8", "si"]]) + + VPPEnumType( + "vl_api_address_family_t", + [["ADDRESS_IP4", 0], ["ADDRESS_IP6", 1], {"enumtype": "u32"}], + ) + VPPTypeAlias("vl_api_ip4_address_t", {"type": "u8", "length": 4}) + VPPTypeAlias("vl_api_ip6_address_t", {"type": "u8", "length": 16}) + VPPUnionType( + "vl_api_address_union_t", + [["vl_api_ip4_address_t", "ip4"], ["vl_api_ip6_address_t", "ip6"]], + ) + + VPPType( + "vl_api_address_t", + [["vl_api_address_family_t", "af"], ["vl_api_address_union_t", "un"]], + ) + + VPPType("vl_api_prefix_t", [["vl_api_address_t", "address"], ["u8", "len"]]) + + VPPUnionType( + "vl_api_eid_address_t", + [ + ["vl_api_prefix_t", "prefix"], + ["vl_api_mac_address_t", "mac"], + ["vl_api_nsh_t", "nsh"], + ], + ) + + eid = VPPType( + "vl_api_eid_t", + [["vl_api_eid_type_t", "type"], ["vl_api_eid_address_t", "address"]], + ) + + b = eid.pack({"type": 1, "address": {"mac": MACAddress("aa:bb:cc:dd:ee:ff")}}) self.assertEqual(len(b), 25) nt, size = eid.unpack(b) - self.assertEqual(str(nt.address.mac), 'aa:bb:cc:dd:ee:ff') + self.assertEqual(str(nt.address.mac), "aa:bb:cc:dd:ee:ff") self.assertIsNone(nt.address.prefix) class TestVppSerializerLogging(unittest.TestCase): - def test_logger(self): # test logger name 'vpp_papi.serializer' with self.assertRaises(VPPSerializerValueError) as ctx: - with self.assertLogs('vpp_papi.serializer', level='DEBUG') as cm: - u = VPPUnionType('vl_api_eid_address_t', - [["vl_api_prefix_t", "prefix"], - ["vl_api_mac_address_t", "mac"], - ["vl_api_nsh_t", "nsh"]]) - self.assertEqual(cm.output, ["DEBUG:vpp_papi.serializer:Unknown union type vl_api_prefix_t"]) + with self.assertLogs("vpp_papi.serializer", level="DEBUG") as cm: + u = VPPUnionType( + "vl_api_eid_address_t", + [ + ["vl_api_prefix_t", "prefix"], + ["vl_api_mac_address_t", "mac"], + ["vl_api_nsh_t", "nsh"], + ], + ) + self.assertEqual( + cm.output, ["DEBUG:vpp_papi.serializer:Unknown union type vl_api_prefix_t"] + ) # test parent logger name 'vpp_papi' with self.assertRaises(VPPSerializerValueError) as ctx: - with self.assertLogs('vpp_papi', level='DEBUG') as cm: - u = VPPUnionType('vl_api_eid_address_t', - [["vl_api_prefix_t", "prefix"], - ["vl_api_mac_address_t", "mac"], - ["vl_api_nsh_t", "nsh"]]) - self.assertEqual(cm.output, ["DEBUG:vpp_papi.serializer:Unknown union type vl_api_prefix_t"]) - - -if __name__ == '__main__': + with self.assertLogs("vpp_papi", level="DEBUG") as cm: + u = VPPUnionType( + "vl_api_eid_address_t", + [ + ["vl_api_prefix_t", "prefix"], + ["vl_api_mac_address_t", "mac"], + ["vl_api_nsh_t", "nsh"], + ], + ) + self.assertEqual( + cm.output, ["DEBUG:vpp_papi.serializer:Unknown union type vl_api_prefix_t"] + ) + + +if __name__ == "__main__": unittest.main() diff --git a/src/vpp-api/python/vpp_papi/vpp_format.py b/src/vpp-api/python/vpp_papi/vpp_format.py index 0b85eb4fcb6..f80a781c753 100644 --- a/src/vpp-api/python/vpp_papi/vpp_format.py +++ b/src/vpp-api/python/vpp_papi/vpp_format.py @@ -25,8 +25,8 @@ ADDRESS_IP6 = 1 def verify_enum_hint(e): - return (e.ADDRESS_IP4.value == ADDRESS_IP4) and\ - (e.ADDRESS_IP6.value == ADDRESS_IP6) + return (e.ADDRESS_IP4.value == ADDRESS_IP4) and (e.ADDRESS_IP6.value == ADDRESS_IP6) + # # Type conversion for input arguments and return values @@ -35,146 +35,128 @@ def verify_enum_hint(e): def format_vl_api_address_t(args): try: - return {'un': {'ip6': inet_pton(AF_INET6, args)}, - 'af': ADDRESS_IP6} + return {"un": {"ip6": inet_pton(AF_INET6, args)}, "af": ADDRESS_IP6} # PY2: raises socket.error # PY3: raises OSError except (socket.error, OSError): - return {'un': {'ip4': inet_pton(AF_INET, args)}, - 'af': ADDRESS_IP4} + return {"un": {"ip4": inet_pton(AF_INET, args)}, "af": ADDRESS_IP4} def format_vl_api_prefix_t(args): if isinstance(args, (ipaddress.IPv4Network, ipaddress.IPv6Network)): - return {'address': format_vl_api_address_t( - str(args.network_address)), - 'len': int(args.prefixlen)} - p, length = args.split('/') - return {'address': format_vl_api_address_t(p), - 'len': int(length)} + return { + "address": format_vl_api_address_t(str(args.network_address)), + "len": int(args.prefixlen), + } + p, length = args.split("/") + return {"address": format_vl_api_address_t(p), "len": int(length)} def format_vl_api_address_with_prefix_t(args): if isinstance(args, (ipaddress.IPv4Interface, ipaddress.IPv6Interface)): - return {'address': format_vl_api_address_t( - str(args.network_address)), - 'len': int(args.prefixlen)} - p, length = args.split('/') - return {'address': format_vl_api_address_t(p), - 'len': int(length)} + return { + "address": format_vl_api_address_t(str(args.network_address)), + "len": int(args.prefixlen), + } + p, length = args.split("/") + return {"address": format_vl_api_address_t(p), "len": int(length)} def format_vl_api_ip6_prefix_t(args): if isinstance(args, ipaddress.IPv6Network): - return {'address': args.network_address.packed, - 'len': int(args.prefixlen)} - p, length = args.split('/') - return {'address': inet_pton(AF_INET6, p), - 'len': int(length)} + return {"address": args.network_address.packed, "len": int(args.prefixlen)} + p, length = args.split("/") + return {"address": inet_pton(AF_INET6, p), "len": int(length)} def format_vl_api_ip6_address_with_prefix_t(args): if isinstance(args, ipaddress.IPv6Interface): - return {'address': args.network_address.packed, - 'len': int(args.prefixlen)} - p, length = args.split('/') - return {'address': inet_pton(AF_INET6, p), - 'len': int(length)} + return {"address": args.network_address.packed, "len": int(args.prefixlen)} + p, length = args.split("/") + return {"address": inet_pton(AF_INET6, p), "len": int(length)} def format_vl_api_ip4_prefix_t(args): if isinstance(args, ipaddress.IPv4Network): - return {'address': args.network_address.packed, - 'len': int(args.prefixlen)} - p, length = args.split('/') - return {'address': inet_pton(AF_INET, p), - 'len': int(length)} + return {"address": args.network_address.packed, "len": int(args.prefixlen)} + p, length = args.split("/") + return {"address": inet_pton(AF_INET, p), "len": int(length)} def format_vl_api_ip4_address_with_prefix_t(args): if isinstance(args, ipaddress.IPv4Interface): - return {'address': args.network_address.packed, - 'len': int(args.prefixlen)} - p, length = args.split('/') - return {'address': inet_pton(AF_INET, p), - 'len': int(length)} + return {"address": args.network_address.packed, "len": int(args.prefixlen)} + p, length = args.split("/") + return {"address": inet_pton(AF_INET, p), "len": int(length)} conversion_table = { - 'vl_api_ip6_address_t': - { - 'IPv6Address': lambda o: o.packed, - 'str': lambda s: inet_pton(AF_INET6, s) + "vl_api_ip6_address_t": { + "IPv6Address": lambda o: o.packed, + "str": lambda s: inet_pton(AF_INET6, s), + }, + "vl_api_ip4_address_t": { + "IPv4Address": lambda o: o.packed, + "str": lambda s: inet_pton(AF_INET, s), }, - 'vl_api_ip4_address_t': - { - 'IPv4Address': lambda o: o.packed, - 'str': lambda s: inet_pton(AF_INET, s) + "vl_api_ip6_prefix_t": { + "IPv6Network": lambda o: { + "address": o.network_address.packed, + "len": o.prefixlen, + }, + "str": lambda s: format_vl_api_ip6_prefix_t(s), }, - 'vl_api_ip6_prefix_t': - { - 'IPv6Network': lambda o: {'address': o.network_address.packed, - 'len': o.prefixlen}, - 'str': lambda s: format_vl_api_ip6_prefix_t(s) + "vl_api_ip4_prefix_t": { + "IPv4Network": lambda o: { + "address": o.network_address.packed, + "len": o.prefixlen, + }, + "str": lambda s: format_vl_api_ip4_prefix_t(s), }, - 'vl_api_ip4_prefix_t': - { - 'IPv4Network': lambda o: {'address': o.network_address.packed, - 'len': o.prefixlen}, - 'str': lambda s: format_vl_api_ip4_prefix_t(s) + "vl_api_address_t": { + "IPv4Address": lambda o: {"af": ADDRESS_IP4, "un": {"ip4": o.packed}}, + "IPv6Address": lambda o: {"af": ADDRESS_IP6, "un": {"ip6": o.packed}}, + "str": lambda s: format_vl_api_address_t(s), }, - 'vl_api_address_t': - { - 'IPv4Address': lambda o: {'af': ADDRESS_IP4, 'un': {'ip4': o.packed}}, - 'IPv6Address': lambda o: {'af': ADDRESS_IP6, 'un': {'ip6': o.packed}}, - 'str': lambda s: format_vl_api_address_t(s) + "vl_api_prefix_t": { + "IPv4Network": lambda o: { + "address": {"af": ADDRESS_IP4, "un": {"ip4": o.network_address.packed}}, + "len": o.prefixlen, + }, + "IPv6Network": lambda o: { + "address": {"af": ADDRESS_IP6, "un": {"ip6": o.network_address.packed}}, + "len": o.prefixlen, + }, + "str": lambda s: format_vl_api_prefix_t(s), }, - 'vl_api_prefix_t': - { - 'IPv4Network': lambda o: {'address': - {'af': ADDRESS_IP4, 'un': - {'ip4': o.network_address.packed}}, - 'len': o.prefixlen}, - 'IPv6Network': lambda o: {'address': - {'af': ADDRESS_IP6, 'un': - {'ip6': o.network_address.packed}}, - 'len': o.prefixlen}, - 'str': lambda s: format_vl_api_prefix_t(s) + "vl_api_address_with_prefix_t": { + "IPv4Interface": lambda o: { + "address": {"af": ADDRESS_IP4, "un": {"ip4": o.packed}}, + "len": o.network.prefixlen, + }, + "IPv6Interface": lambda o: { + "address": {"af": ADDRESS_IP6, "un": {"ip6": o.packed}}, + "len": o.network.prefixlen, + }, + "str": lambda s: format_vl_api_address_with_prefix_t(s), }, - 'vl_api_address_with_prefix_t': - { - 'IPv4Interface': lambda o: {'address': - {'af': ADDRESS_IP4, 'un': - {'ip4': o.packed}}, - 'len': o.network.prefixlen}, - 'IPv6Interface': lambda o: {'address': - {'af': ADDRESS_IP6, 'un': - {'ip6': o.packed}}, - 'len': o.network.prefixlen}, - 'str': lambda s: format_vl_api_address_with_prefix_t(s) + "vl_api_ip4_address_with_prefix_t": { + "IPv4Interface": lambda o: {"address": o.packed, "len": o.network.prefixlen}, + "str": lambda s: format_vl_api_ip4_address_with_prefix_t(s), }, - 'vl_api_ip4_address_with_prefix_t': - { - 'IPv4Interface': lambda o: {'address': o.packed, - 'len': o.network.prefixlen}, - 'str': lambda s: format_vl_api_ip4_address_with_prefix_t(s) + "vl_api_ip6_address_with_prefix_t": { + "IPv6Interface": lambda o: {"address": o.packed, "len": o.network.prefixlen}, + "str": lambda s: format_vl_api_ip6_address_with_prefix_t(s), }, - 'vl_api_ip6_address_with_prefix_t': - { - 'IPv6Interface': lambda o: {'address': o.packed, - 'len': o.network.prefixlen}, - 'str': lambda s: format_vl_api_ip6_address_with_prefix_t(s) + "vl_api_mac_address_t": { + "MACAddress": lambda o: o.packed, + "str": lambda s: macaddress.mac_pton(s), }, - 'vl_api_mac_address_t': - { - 'MACAddress': lambda o: o.packed, - 'str': lambda s: macaddress.mac_pton(s) + "vl_api_timestamp_t": { + "datetime.datetime": lambda o: ( + o - datetime.datetime(1970, 1, 1) + ).total_seconds() }, - 'vl_api_timestamp_t': - { - 'datetime.datetime': lambda o: - (o - datetime.datetime(1970, 1, 1)).total_seconds() - } } @@ -197,7 +179,7 @@ def unformat_api_prefix_t(o): return ipaddress.IPv4Network((o.address, o.len), False) if isinstance(o.address, ipaddress.IPv6Address): return ipaddress.IPv6Network((o.address, o.len), False) - raise ValueError('Unknown instance {}', format(o)) + raise ValueError("Unknown instance {}", format(o)) def unformat_api_address_with_prefix_t(o): @@ -217,16 +199,20 @@ def unformat_api_ip6_address_with_prefix_t(o): conversion_unpacker_table = { - 'vl_api_ip6_address_t': lambda o: ipaddress.IPv6Address(o), - 'vl_api_ip6_prefix_t': lambda o: ipaddress.IPv6Network((o.address, o.len)), - 'vl_api_ip4_address_t': lambda o: ipaddress.IPv4Address(o), - 'vl_api_ip4_prefix_t': lambda o: ipaddress.IPv4Network((o.address, o.len)), - 'vl_api_address_t': lambda o: unformat_api_address_t(o), - 'vl_api_prefix_t': lambda o: unformat_api_prefix_t(o), - 'vl_api_address_with_prefix_t': lambda o: unformat_api_address_with_prefix_t(o), - 'vl_api_ip4_address_with_prefix_t': lambda o: unformat_api_ip4_address_with_prefix_t(o), - 'vl_api_ip6_address_with_prefix_t': lambda o: unformat_api_ip6_address_with_prefix_t(o), - 'vl_api_mac_address_t': lambda o: macaddress.MACAddress(o), - 'vl_api_timestamp_t': lambda o: datetime.datetime.fromtimestamp(o), - 'vl_api_timedelta_t': lambda o: datetime.timedelta(seconds=o), + "vl_api_ip6_address_t": lambda o: ipaddress.IPv6Address(o), + "vl_api_ip6_prefix_t": lambda o: ipaddress.IPv6Network((o.address, o.len)), + "vl_api_ip4_address_t": lambda o: ipaddress.IPv4Address(o), + "vl_api_ip4_prefix_t": lambda o: ipaddress.IPv4Network((o.address, o.len)), + "vl_api_address_t": lambda o: unformat_api_address_t(o), + "vl_api_prefix_t": lambda o: unformat_api_prefix_t(o), + "vl_api_address_with_prefix_t": lambda o: unformat_api_address_with_prefix_t(o), + "vl_api_ip4_address_with_prefix_t": lambda o: unformat_api_ip4_address_with_prefix_t( + o + ), + "vl_api_ip6_address_with_prefix_t": lambda o: unformat_api_ip6_address_with_prefix_t( + o + ), + "vl_api_mac_address_t": lambda o: macaddress.MACAddress(o), + "vl_api_timestamp_t": lambda o: datetime.datetime.fromtimestamp(o), + "vl_api_timedelta_t": lambda o: datetime.timedelta(seconds=o), } diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 3465f503e9e..1e5d23e59b7 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -30,13 +30,14 @@ import fnmatch import weakref import atexit import time -from . vpp_format import verify_enum_hint -from . vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType -from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias +from .vpp_format import verify_enum_hint +from .vpp_serializer import VPPType, VPPEnumType, VPPEnumFlagType, VPPUnionType +from .vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias try: import VppTransport except ModuleNotFoundError: + class V: """placeholder for VppTransport as the implementation is dependent on VPPAPIClient's initialization values @@ -44,15 +45,22 @@ except ModuleNotFoundError: VppTransport = V -from . vpp_transport_socket import VppTransport +from .vpp_transport_socket import VppTransport -logger = logging.getLogger('vpp_papi') +logger = logging.getLogger("vpp_papi") logger.addHandler(logging.NullHandler()) -__all__ = ('FuncWrapper', 'VppApiDynamicMethodHolder', - 'VppEnum', 'VppEnumType', 'VppEnumFlag', - 'VPPIOError', 'VPPRuntimeError', 'VPPValueError', - 'VPPApiClient', ) +__all__ = ( + "FuncWrapper", + "VppApiDynamicMethodHolder", + "VppEnum", + "VppEnumType", + "VppEnumFlag", + "VPPIOError", + "VPPRuntimeError", + "VPPValueError", + "VPPApiClient", +) def metaclass(metaclass): @@ -83,7 +91,7 @@ def vpp_atexit(vpp_weakref): """Clean up VPP connection on shutdown.""" vpp_instance = vpp_weakref() if vpp_instance and vpp_instance.transport.connected: - logger.debug('Cleaning up VPP on exit') + logger.debug("Cleaning up VPP on exit") vpp_instance.disconnect() @@ -98,9 +106,9 @@ def add_convenience_methods(): def _vapi_af_name(self): if 6 == self._version: - return 'ip6' + return "ip6" if 4 == self._version: - return 'ip4' + return "ip4" raise ValueError("Invalid _version.") ipaddress._IPAddressBase.vapi_af = property(_vapi_af) @@ -121,7 +129,7 @@ class FuncWrapper: return self._func(**kwargs) def __repr__(self): - return '<FuncWrapper(func=<%s(%s)>)>' % (self.__name__, self.__doc__) + return "<FuncWrapper(func=<%s(%s)>)>" % (self.__name__, self.__doc__) class VPPApiError(Exception): @@ -161,7 +169,8 @@ class VPPApiJSONFiles: # perhaps we're in the 'src/scripts' or 'src/vpp-api/python' dir; # in which case, plot a course to likely places in the src tree import __main__ as main - if hasattr(main, '__file__'): + + if hasattr(main, "__file__"): # get the path of the calling script localdir = os.path.dirname(os.path.realpath(main.__file__)) else: @@ -171,7 +180,7 @@ class VPPApiJSONFiles: def dmatch(dir): """Match dir against right-hand components of the script dir""" - d = dir.split('/') # param 'dir' assumes a / separator + d = dir.split("/") # param 'dir' assumes a / separator length = len(d) return len(localdir_s) > length and localdir_s[-length:] == d @@ -180,43 +189,45 @@ class VPPApiJSONFiles: 'variant' (typically '' or '_debug')""" # Since 'core' and 'plugin' files are staged # in separate directories, we target the parent dir. - return os.path.sep.join(( - srcdir, - 'build-root', - 'install-vpp%s-native' % variant, - 'vpp', - 'share', - 'vpp', - 'api', - )) + return os.path.sep.join( + ( + srcdir, + "build-root", + "install-vpp%s-native" % variant, + "vpp", + "share", + "vpp", + "api", + ) + ) srcdir = None - if dmatch('src/scripts'): + if dmatch("src/scripts"): srcdir = os.path.sep.join(localdir_s[:-2]) - elif dmatch('src/vpp-api/python'): + elif dmatch("src/vpp-api/python"): srcdir = os.path.sep.join(localdir_s[:-3]) - elif dmatch('test'): + elif dmatch("test"): # we're apparently running tests srcdir = os.path.sep.join(localdir_s[:-1]) if srcdir: # we're in the source tree, try both the debug and release # variants. - dirs.append(sdir(srcdir, '_debug')) - dirs.append(sdir(srcdir, '')) + dirs.append(sdir(srcdir, "_debug")) + dirs.append(sdir(srcdir, "")) # Test for staged copies of the scripts # For these, since we explicitly know if we're running a debug versus # release variant, target only the relevant directory - if dmatch('build-root/install-vpp_debug-native/vpp/bin'): + if dmatch("build-root/install-vpp_debug-native/vpp/bin"): srcdir = os.path.sep.join(localdir_s[:-4]) - dirs.append(sdir(srcdir, '_debug')) - if dmatch('build-root/install-vpp-native/vpp/bin'): + dirs.append(sdir(srcdir, "_debug")) + if dmatch("build-root/install-vpp-native/vpp/bin"): srcdir = os.path.sep.join(localdir_s[:-4]) - dirs.append(sdir(srcdir, '')) + dirs.append(sdir(srcdir, "")) # finally, try the location system packages typically install into - dirs.append(os.path.sep.join(('', 'usr', 'share', 'vpp', 'api'))) + dirs.append(os.path.sep.join(("", "usr", "share", "vpp", "api"))) # check the directories for existence; first one wins for dir in dirs: @@ -226,7 +237,7 @@ class VPPApiJSONFiles: return None @classmethod - def find_api_files(cls, api_dir=None, patterns='*'): # -> list + def find_api_files(cls, api_dir=None, patterns="*"): # -> list """Find API definition files from the given directory tree with the given pattern. If no directory is given then find_api_dir() is used to locate one. If no pattern is given then all definition files found @@ -252,9 +263,9 @@ class VPPApiJSONFiles: raise VPPApiError("api_dir cannot be located") if isinstance(patterns, list) or isinstance(patterns, tuple): - patterns = [p.strip() + '.api.json' for p in patterns] + patterns = [p.strip() + ".api.json" for p in patterns] else: - patterns = [p.strip() + '.api.json' for p in patterns.split(",")] + patterns = [p.strip() + ".api.json" for p in patterns.split(",")] api_files = [] for root, dirnames, files in os.walk(api_dir): @@ -281,39 +292,39 @@ class VPPApiJSONFiles: services = {} messages = {} try: - for t in api['enums']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'enum', 'data': t} + for t in api["enums"]: + t[0] = "vl_api_" + t[0] + "_t" + types[t[0]] = {"type": "enum", "data": t} except KeyError: pass try: - for t in api['enumflags']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'enum', 'data': t} + for t in api["enumflags"]: + t[0] = "vl_api_" + t[0] + "_t" + types[t[0]] = {"type": "enum", "data": t} except KeyError: pass try: - for t in api['unions']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'union', 'data': t} + for t in api["unions"]: + t[0] = "vl_api_" + t[0] + "_t" + types[t[0]] = {"type": "union", "data": t} except KeyError: pass try: - for t in api['types']: - t[0] = 'vl_api_' + t[0] + '_t' - types[t[0]] = {'type': 'type', 'data': t} + for t in api["types"]: + t[0] = "vl_api_" + t[0] + "_t" + types[t[0]] = {"type": "type", "data": t} except KeyError: pass try: - for t, v in api['aliases'].items(): - types['vl_api_' + t + '_t'] = {'type': 'alias', 'data': v} + for t, v in api["aliases"].items(): + types["vl_api_" + t + "_t"] = {"type": "alias", "data": v} except KeyError: pass try: - services.update(api['services']) + services.update(api["services"]) except KeyError: pass @@ -321,30 +332,30 @@ class VPPApiJSONFiles: while True: unresolved = {} for k, v in types.items(): - t = v['data'] + t = v["data"] if not vpp_get_type(k): - if v['type'] == 'enum': + if v["type"] == "enum": try: VPPEnumType(t[0], t[1:]) except ValueError: unresolved[k] = v if not vpp_get_type(k): - if v['type'] == 'enumflag': + if v["type"] == "enumflag": try: VPPEnumFlagType(t[0], t[1:]) except ValueError: unresolved[k] = v - elif v['type'] == 'union': + elif v["type"] == "union": try: VPPUnionType(t[0], t[1:]) except ValueError: unresolved[k] = v - elif v['type'] == 'type': + elif v["type"] == "type": try: VPPType(t[0], t[1:]) except ValueError: unresolved[k] = v - elif v['type'] == 'alias': + elif v["type"] == "alias": try: VPPTypeAlias(k, t) except ValueError: @@ -352,17 +363,16 @@ class VPPApiJSONFiles: if len(unresolved) == 0: break if i > 3: - raise VPPValueError('Unresolved type definitions {}' - .format(unresolved)) + raise VPPValueError("Unresolved type definitions {}".format(unresolved)) types = unresolved i += 1 try: - for m in api['messages']: + for m in api["messages"]: try: messages[m[0]] = VPPMessage(m[0], m[1:]) except VPPNotImplementedError: ### OLE FIXME - logger.error('Not implemented error for {}'.format(m[0])) + logger.error("Not implemented error for {}".format(m[0])) except KeyError: pass return messages, services @@ -380,6 +390,7 @@ class VPPApiClient: provides a means to register a callback function to receive these messages in a background thread. """ + apidir = None VPPApiError = VPPApiError VPPRuntimeError = VPPRuntimeError @@ -387,11 +398,18 @@ class VPPApiClient: VPPNotImplementedError = VPPNotImplementedError VPPIOError = VPPIOError - - def __init__(self, *, apifiles=None, testmode=False, async_thread=True, - logger=None, loglevel=None, - read_timeout=5, use_socket=True, - server_address='/run/vpp/api.sock'): + def __init__( + self, + *, + apifiles=None, + testmode=False, + async_thread=True, + logger=None, + loglevel=None, + read_timeout=5, + use_socket=True, + server_address="/run/vpp/api.sock", + ): """Create a VPP API object. apifiles is a list of files containing API @@ -406,7 +424,8 @@ class VPPApiClient: """ if logger is None: logger = logging.getLogger( - "{}.{}".format(__name__, self.__class__.__name__)) + "{}.{}".format(__name__, self.__class__.__name__) + ) if loglevel is not None: logger.setLevel(loglevel) self.logger = logger @@ -415,8 +434,7 @@ class VPPApiClient: self.services = {} self.id_names = [] self.id_msgdef = [] - self.header = VPPType('header', [['u16', 'msgid'], - ['u32', 'client_index']]) + self.header = VPPType("header", [["u16", "msgid"], ["u32", "client_index"]]) self.apifiles = [] self.event_callback = None self.message_queue = queue.Queue() @@ -449,13 +467,13 @@ class VPPApiClient: # Basic sanity check if len(self.messages) == 0 and not testmode: - raise VPPValueError(1, 'Missing JSON message definitions') - if not(verify_enum_hint(VppEnum.vl_api_address_family_t)): - raise VPPRuntimeError("Invalid address family hints. " - "Cannot continue.") + raise VPPValueError(1, "Missing JSON message definitions") + if not (verify_enum_hint(VppEnum.vl_api_address_family_t)): + raise VPPRuntimeError("Invalid address family hints. " "Cannot continue.") - self.transport = VppTransport(self, read_timeout=read_timeout, - server_address=server_address) + self.transport = VppTransport( + self, read_timeout=read_timeout, server_address=server_address + ) # Make sure we allow VPP to clean up the message rings. atexit.register(vpp_atexit, weakref.ref(self)) @@ -466,6 +484,7 @@ class VPPApiClient: class ContextId: """Multiprocessing-safe provider of unique context IDs.""" + def __init__(self): self.context = mp.Value(ctypes.c_uint, 0) self.lock = mp.Lock() @@ -475,6 +494,7 @@ class VPPApiClient: with self.lock: self.context.value += 1 return self.context.value + get_context = ContextId() def get_type(self, name): @@ -487,17 +507,20 @@ class VPPApiClient: return self._api def make_function(self, msg, i, multipart, do_async): - if (do_async): + if do_async: + def f(**kwargs): return self._call_vpp_async(i, msg, **kwargs) + else: + def f(**kwargs): return self._call_vpp(i, msg, multipart, **kwargs) f.__name__ = str(msg.name) - f.__doc__ = ", ".join(["%s %s" % - (msg.fieldtypes[j], k) - for j, k in enumerate(msg.fields)]) + f.__doc__ = ", ".join( + ["%s %s" % (msg.fieldtypes[j], k) for j, k in enumerate(msg.fields)] + ) f.msg = msg return f @@ -507,7 +530,7 @@ class VPPApiClient: self.id_msgdef = [None] * (self.vpp_dictionary_maxid + 1) self._api = VppApiDynamicMethodHolder() for name, msg in self.messages.items(): - n = name + '_' + msg.crc[2:] + n = name + "_" + msg.crc[2:] i = self.transport.get_msg_index(n) if i > 0: self.id_msgdef[i] = msg @@ -518,28 +541,25 @@ class VPPApiClient: f = self.make_function(msg, i, self.services[name], do_async) setattr(self._api, name, FuncWrapper(f)) else: - self.logger.debug( - 'No such message type or failed CRC checksum: %s', n) + self.logger.debug("No such message type or failed CRC checksum: %s", n) - def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, - do_async): - pfx = chroot_prefix.encode('utf-8') if chroot_prefix else None + def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async): + pfx = chroot_prefix.encode("utf-8") if chroot_prefix else None - rv = self.transport.connect(name, pfx, - msg_handler, rx_qlen) + rv = self.transport.connect(name, pfx, msg_handler, rx_qlen) if rv != 0: - raise VPPIOError(2, 'Connect failed') + raise VPPIOError(2, "Connect failed") self.vpp_dictionary_maxid = self.transport.msg_table_max_index() self._register_functions(do_async=do_async) # Initialise control ping - crc = self.messages['control_ping'].crc + crc = self.messages["control_ping"].crc self.control_ping_index = self.transport.get_msg_index( - ('control_ping' + '_' + crc[2:])) - self.control_ping_msgdef = self.messages['control_ping'] + ("control_ping" + "_" + crc[2:]) + ) + self.control_ping_msgdef = self.messages["control_ping"] if self.async_thread: - self.event_thread = threading.Thread( - target=self.thread_msg_handler) + self.event_thread = threading.Thread(target=self.thread_msg_handler) self.event_thread.daemon = True self.event_thread.start() else: @@ -556,8 +576,9 @@ class VPPApiClient: client and server. """ msg_handler = self.transport.get_callback(do_async) - return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen, - do_async) + return self.connect_internal( + name, msg_handler, chroot_prefix, rx_qlen, do_async + ) def connect_sync(self, name, chroot_prefix=None, rx_qlen=32): """Attach to VPP in synchronous mode. Application must poll for events. @@ -568,8 +589,7 @@ class VPPApiClient: client and server. """ - return self.connect_internal(name, None, chroot_prefix, rx_qlen, - do_async=False) + return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False) def disconnect(self): """Detach from VPP.""" @@ -590,42 +610,43 @@ class VPPApiClient: # If we have a context, then use the context to find any # request waiting for a reply context = 0 - if hasattr(r, 'context') and r.context > 0: + if hasattr(r, "context") and r.context > 0: context = r.context if context == 0: # No context -> async notification that we feed to the callback self.message_queue.put_nowait(r) else: - raise VPPIOError(2, 'RPC reply message received in event handler') + raise VPPIOError(2, "RPC reply message received in event handler") def has_context(self, msg): if len(msg) < 10: return False - header = VPPType('header_with_context', [['u16', 'msgid'], - ['u32', 'client_index'], - ['u32', 'context']]) + header = VPPType( + "header_with_context", + [["u16", "msgid"], ["u32", "client_index"], ["u32", "context"]], + ) (i, ci, context), size = header.unpack(msg, 0) - if self.id_names[i] == 'rx_thread_exit': + if self.id_names[i] == "rx_thread_exit": return # # Decode message and returns a tuple. # msgobj = self.id_msgdef[i] - if 'context' in msgobj.field_by_name and context >= 0: + if "context" in msgobj.field_by_name and context >= 0: return True return False def decode_incoming_msg(self, msg, no_type_conversion=False): if not msg: - logger.warning('vpp_api.read failed') + logger.warning("vpp_api.read failed") return (i, ci), size = self.header.unpack(msg, 0) - if self.id_names[i] == 'rx_thread_exit': + if self.id_names[i] == "rx_thread_exit": return # @@ -633,7 +654,7 @@ class VPPApiClient: # msgobj = self.id_msgdef[i] if not msgobj: - raise VPPIOError(2, 'Reply message undefined') + raise VPPIOError(2, "Reply message undefined") r, size = msgobj.unpack(msg, ntc=no_type_conversion) return r @@ -654,41 +675,39 @@ class VPPApiClient: def _control_ping(self, context): """Send a ping command.""" - self._call_vpp_async(self.control_ping_index, - self.control_ping_msgdef, - context=context) + self._call_vpp_async( + self.control_ping_index, self.control_ping_msgdef, context=context + ) def validate_args(self, msg, kwargs): d = set(kwargs.keys()) - set(msg.field_by_name.keys()) if d: - raise VPPValueError('Invalid argument {} to {}' - .format(list(d), msg.name)) + raise VPPValueError("Invalid argument {} to {}".format(list(d), msg.name)) def _add_stat(self, name, ms): if not name in self.stats: - self.stats[name] = {'max': ms, 'count': 1, 'avg': ms} + self.stats[name] = {"max": ms, "count": 1, "avg": ms} else: - if ms > self.stats[name]['max']: - self.stats[name]['max'] = ms - self.stats[name]['count'] += 1 - n = self.stats[name]['count'] - self.stats[name]['avg'] = self.stats[name]['avg'] * (n - 1) / n + ms / n + if ms > self.stats[name]["max"]: + self.stats[name]["max"] = ms + self.stats[name]["count"] += 1 + n = self.stats[name]["count"] + self.stats[name]["avg"] = self.stats[name]["avg"] * (n - 1) / n + ms / n def get_stats(self): - s = '\n=== API PAPI STATISTICS ===\n' - s += '{:<30} {:>4} {:>6} {:>6}\n'.format('message', 'cnt', 'avg', 'max') - for n in sorted(self.stats.items(), key=lambda v: v[1]['avg'], reverse=True): - s += '{:<30} {:>4} {:>6.2f} {:>6.2f}\n'.format(n[0], n[1]['count'], - n[1]['avg'], n[1]['max']) + s = "\n=== API PAPI STATISTICS ===\n" + s += "{:<30} {:>4} {:>6} {:>6}\n".format("message", "cnt", "avg", "max") + for n in sorted(self.stats.items(), key=lambda v: v[1]["avg"], reverse=True): + s += "{:<30} {:>4} {:>6.2f} {:>6.2f}\n".format( + n[0], n[1]["count"], n[1]["avg"], n[1]["max"] + ) return s def get_field_options(self, msg, fld_name): # when there is an option, the msgdef has 3 elements. # ['u32', 'ring_size', {'default': 1024}] for _def in self.messages[msg].msgdef: - if isinstance(_def, list) and \ - len(_def) == 3 and \ - _def[1] == fld_name: + if isinstance(_def, list) and len(_def) == 3 and _def[1] == fld_name: return _def[2] def _call_vpp(self, i, msgdef, service, **kwargs): @@ -707,25 +726,26 @@ class VPPApiClient: no response within the timeout window. """ ts = time.time() - if 'context' not in kwargs: + if "context" not in kwargs: context = self.get_context() - kwargs['context'] = context + kwargs["context"] = context else: - context = kwargs['context'] - kwargs['_vl_msg_id'] = i + context = kwargs["context"] + kwargs["_vl_msg_id"] = i - no_type_conversion = kwargs.pop('_no_type_conversion', False) - timeout = kwargs.pop('_timeout', None) + no_type_conversion = kwargs.pop("_no_type_conversion", False) + timeout = kwargs.pop("_timeout", None) try: if self.transport.socket_index: - kwargs['client_index'] = self.transport.socket_index + kwargs["client_index"] = self.transport.socket_index except AttributeError: pass self.validate_args(msgdef, kwargs) - s = 'Calling {}({})'.format(msgdef.name, - ','.join(['{!r}:{!r}'.format(k, v) for k, v in kwargs.items()])) + s = "Calling {}({})".format( + msgdef.name, ",".join(["{!r}:{!r}".format(k, v) for k, v in kwargs.items()]) + ) self.logger.debug(s) b = msgdef.pack(kwargs) @@ -733,17 +753,17 @@ class VPPApiClient: self.transport.write(b) - msgreply = service['reply'] - stream = True if 'stream' in service else False + msgreply = service["reply"] + stream = True if "stream" in service else False if stream: - if 'stream_msg' in service: + if "stream_msg" in service: # New service['reply'] = _reply and service['stream_message'] = _details - stream_message = service['stream_msg'] - modern =True + stream_message = service["stream_msg"] + modern = True else: # Old service['reply'] = _details stream_message = msgreply - msgreply = 'control_ping_reply' + msgreply = "control_ping_reply" modern = False # Send a ping after the request - we use its response # to detect that we have seen all results. @@ -751,22 +771,22 @@ class VPPApiClient: # Block until we get a reply. rl = [] - while (True): + while True: r = self.read_blocking(no_type_conversion, timeout) if r is None: - raise VPPIOError(2, 'VPP API client: read failed') + raise VPPIOError(2, "VPP API client: read failed") msgname = type(r).__name__ if context not in r or r.context == 0 or context != r.context: # Message being queued self.message_queue.put_nowait(r) continue if msgname != msgreply and (stream and (msgname != stream_message)): - print('REPLY MISMATCH', msgreply, msgname, stream_message, stream) + print("REPLY MISMATCH", msgreply, msgname, stream_message, stream) if not stream: rl = r break if msgname == msgreply: - if modern: # Return both reply and list + if modern: # Return both reply and list rl = r, rl break @@ -774,7 +794,7 @@ class VPPApiClient: self.transport.resume() - s = 'Return value: {!r}'.format(r) + s = "Return value: {!r}".format(r) if len(s) > 80: s = s[:80] + "..." self.logger.debug(s) @@ -795,17 +815,17 @@ class VPPApiClient: The returned context will help with assigning which call the reply belongs to. """ - if 'context' not in kwargs: + if "context" not in kwargs: context = self.get_context() - kwargs['context'] = context + kwargs["context"] = context else: - context = kwargs['context'] + context = kwargs["context"] try: if self.transport.socket_index: - kwargs['client_index'] = self.transport.socket_index + kwargs["client_index"] = self.transport.socket_index except AttributeError: - kwargs['client_index'] = 0 - kwargs['_vl_msg_id'] = i + kwargs["client_index"] = 0 + kwargs["_vl_msg_id"] = i b = msg.pack(kwargs) self.transport.write(b) @@ -891,26 +911,34 @@ class VPPApiClient: """Return VPPs API message table as name_crc dictionary, filtered by message name list.""" - replies = [self.services[n]['reply'] for n in msglist] + replies = [self.services[n]["reply"] for n in msglist] message_table_filtered = {} for name in msglist + replies: - for k,v in self.transport.message_table.items(): + for k, v in self.transport.message_table.items(): if k.startswith(name): message_table_filtered[k] = v break return message_table_filtered def __repr__(self): - return "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " \ - "logger=%s, read_timeout=%s, " \ - "server_address='%s'>" % ( - self._apifiles, self.testmode, self.async_thread, - self.logger, self.read_timeout, self.server_address) + return ( + "<VPPApiClient apifiles=%s, testmode=%s, async_thread=%s, " + "logger=%s, read_timeout=%s, " + "server_address='%s'>" + % ( + self._apifiles, + self.testmode, + self.async_thread, + self.logger, + self.read_timeout, + self.server_address, + ) + ) def details_iter(self, f, **kwargs): cursor = 0 while True: - kwargs['cursor'] = cursor + kwargs["cursor"] = cursor rv, details = f(**kwargs) for d in details: yield d diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py index 644aeac65c6..a99e16aa8f9 100644 --- a/src/vpp-api/python/vpp_papi/vpp_serializer.py +++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py @@ -27,7 +27,7 @@ from . import vpp_format # logger = logging.getLogger('vpp_serializer') # logger.setLevel(logging.DEBUG) # -logger = logging.getLogger('vpp_papi.serializer') +logger = logging.getLogger("vpp_papi.serializer") def check(d): @@ -46,8 +46,7 @@ def conversion_required(data, field_type): def conversion_packer(data, field_type): t = type(data).__name__ - return types[field_type].pack(vpp_format. - conversion_table[field_type][t](data)) + return types[field_type].pack(vpp_format.conversion_table[field_type][t](data)) def conversion_unpacker(data, field_type): @@ -77,30 +76,33 @@ class Packer: return c._get_packer_with_options(f_type, options) except IndexError: raise VPPSerializerValueError( - "Options not supported for {}{} ({})". - format(f_type, types[f_type].__class__, - options)) + "Options not supported for {}{} ({})".format( + f_type, types[f_type].__class__, options + ) + ) class BaseTypes(Packer): def __init__(self, type, elements=0, options=None): self._type = type self._elements = elements - base_types = {'u8': '>B', - 'i8': '>b', - 'string': '>s', - 'u16': '>H', - 'i16': '>h', - 'u32': '>I', - 'i32': '>i', - 'u64': '>Q', - 'i64': '>q', - 'f64': '=d', - 'bool': '>?', - 'header': '>HI'} - - if elements > 0 and (type == 'u8' or type == 'string'): - self.packer = struct.Struct('>%ss' % elements) + base_types = { + "u8": ">B", + "i8": ">b", + "string": ">s", + "u16": ">H", + "i16": ">h", + "u32": ">I", + "i32": ">i", + "u64": ">Q", + "i64": ">q", + "f64": "=d", + "bool": ">?", + "header": ">HI", + } + + if elements > 0 and (type == "u8" or type == "string"): + self.packer = struct.Struct(">%ss" % elements) else: self.packer = struct.Struct(base_types[type]) self.size = self.packer.size @@ -108,8 +110,8 @@ class BaseTypes(Packer): def pack(self, data, kwargs=None): if data is None: # Default to zero if not specified - if self.options and 'default' in self.options: - data = self.options['default'] + if self.options and "default" in self.options: + data = self.options["default"] else: data = 0 return self.packer.pack(data) @@ -122,9 +124,11 @@ class BaseTypes(Packer): return BaseTypes(f_type, options=options) def __repr__(self): - return "BaseTypes(type=%s, elements=%s, options=%s)" % (self._type, - self._elements, - self.options) + return "BaseTypes(type=%s, elements=%s, options=%s)" % ( + self._type, + self._elements, + self.options, + ) class String(Packer): @@ -132,13 +136,15 @@ class String(Packer): self.name = name self.num = num self.size = 1 - self.length_field_packer = BaseTypes('u32') - self.limit = options['limit'] if 'limit' in options else num + self.length_field_packer = BaseTypes("u32") + self.limit = options["limit"] if "limit" in options else num self.fixed = True if num else False if self.fixed and not self.limit: raise VPPSerializerValueError( - "Invalid combination for: {}, {} fixed:{} limit:{}". - format(name, options, self.fixed, self.limit)) + "Invalid combination for: {}, {} fixed:{} limit:{}".format( + name, options, self.fixed, self.limit + ) + ) def pack(self, list, kwargs=None): if not list: @@ -147,34 +153,42 @@ class String(Packer): return self.length_field_packer.pack(0) + b"" if self.limit and len(list) > self.limit - 1: raise VPPSerializerValueError( - "Invalid argument length for: {}, {} maximum {}". - format(list, len(list), self.limit - 1)) + "Invalid argument length for: {}, {} maximum {}".format( + list, len(list), self.limit - 1 + ) + ) if self.fixed: - return list.encode('ascii').ljust(self.limit, b'\x00') - return self.length_field_packer.pack(len(list)) + list.encode('ascii') + return list.encode("ascii").ljust(self.limit, b"\x00") + return self.length_field_packer.pack(len(list)) + list.encode("ascii") def unpack(self, data, offset=0, result=None, ntc=False): if self.fixed: - p = BaseTypes('u8', self.num) + p = BaseTypes("u8", self.num) s = p.unpack(data, offset) - s2 = s[0].split(b'\0', 1)[0] - return (s2.decode('ascii'), self.num) + s2 = s[0].split(b"\0", 1)[0] + return (s2.decode("ascii"), self.num) - length, length_field_size = self.length_field_packer.unpack(data, - offset) + length, length_field_size = self.length_field_packer.unpack(data, offset) if length == 0: - return '', 0 - p = BaseTypes('u8', length) + return "", 0 + p = BaseTypes("u8", length) x, size = p.unpack(data, offset + length_field_size) - return (x.decode('ascii', errors='replace'), size + length_field_size) - - -types = {'u8': BaseTypes('u8'), 'i8': BaseTypes('i8'), - 'u16': BaseTypes('u16'), 'i16': BaseTypes('i16'), - 'u32': BaseTypes('u32'), 'i32': BaseTypes('i32'), - 'u64': BaseTypes('u64'), 'i64': BaseTypes('i64'), - 'f64': BaseTypes('f64'), - 'bool': BaseTypes('bool'), 'string': String} + return (x.decode("ascii", errors="replace"), size + length_field_size) + + +types = { + "u8": BaseTypes("u8"), + "i8": BaseTypes("i8"), + "u16": BaseTypes("u16"), + "i16": BaseTypes("i16"), + "u32": BaseTypes("u32"), + "i32": BaseTypes("i32"), + "u64": BaseTypes("u64"), + "i64": BaseTypes("i64"), + "f64": BaseTypes("f64"), + "bool": BaseTypes("bool"), + "string": String, +} class_types = {} @@ -202,32 +216,34 @@ class FixedList_u8(Packer): """Packs a fixed length bytestring. Left-pads with zeros if input data is too short.""" if not data: - return b'\x00' * self.size + return b"\x00" * self.size if len(data) > self.num: raise VPPSerializerValueError( 'Fixed list length error for "{}", got: {}' - ' expected: {}' - .format(self.name, len(data), self.num)) + " expected: {}".format(self.name, len(data), self.num) + ) try: return self.packer.pack(data) except struct.error: raise VPPSerializerValueError( - 'Packing failed for "{}" {}' - .format(self.name, kwargs)) + 'Packing failed for "{}" {}'.format(self.name, kwargs) + ) def unpack(self, data, offset=0, result=None, ntc=False): if len(data[offset:]) < self.num: raise VPPSerializerValueError( 'Invalid array length for "{}" got {}' - ' expected {}' - .format(self.name, len(data[offset:]), self.num)) + " expected {}".format(self.name, len(data[offset:]), self.num) + ) return self.packer.unpack(data, offset) def __repr__(self): return "FixedList_u8(name=%s, field_type=%s, num=%s)" % ( - self.name, self.field_type, self.num + self.name, + self.field_type, + self.num, ) @@ -242,8 +258,10 @@ class FixedList(Packer): def pack(self, list, kwargs): if len(list) != self.num: raise VPPSerializerValueError( - 'Fixed list length error, got: {} expected: {}' - .format(len(list), self.num)) + "Fixed list length error, got: {} expected: {}".format( + len(list), self.num + ) + ) b = bytes() for e in list: b += self.packer.pack(e) @@ -262,7 +280,10 @@ class FixedList(Packer): def __repr__(self): return "FixedList(name=%s, field_type=%s, num=%s)" % ( - self.name, self.field_type, self.num) + self.name, + self.field_type, + self.num, + ) class VLAList(Packer): @@ -279,13 +300,15 @@ class VLAList(Packer): return b"" if len(lst) != kwargs[self.length_field]: raise VPPSerializerValueError( - 'Variable length error, got: {} expected: {}' - .format(len(lst), kwargs[self.length_field])) + "Variable length error, got: {} expected: {}".format( + len(lst), kwargs[self.length_field] + ) + ) # u8 array if self.packer.size == 1: if isinstance(lst, list): - return b''.join(lst) + return b"".join(lst) return bytes(lst) b = bytes() @@ -300,8 +323,8 @@ class VLAList(Packer): # u8 array if self.packer.size == 1: if result[self.index] == 0: - return b'', 0 - p = BaseTypes('u8', result[self.index]) + return b"", 0 + p = BaseTypes("u8", result[self.index]) return p.unpack(data, offset, ntc=ntc) r = [] @@ -313,10 +336,12 @@ class VLAList(Packer): return r, total def __repr__(self): - return "VLAList(name=%s, field_type=%s, " \ - "len_field_name=%s, index=%s)" % ( - self.name, self.field_type, self.length_field, self.index - ) + return "VLAList(name=%s, field_type=%s, " "len_field_name=%s, index=%s)" % ( + self.name, + self.field_type, + self.length_field, + self.index, + ) class VLAList_legacy(Packer): @@ -340,7 +365,8 @@ class VLAList_legacy(Packer): # Return a list of arguments if (len(data) - offset) % self.packer.size: raise VPPSerializerValueError( - 'Legacy Variable Length Array length mismatch.') + "Legacy Variable Length Array length mismatch." + ) elements = int((len(data) - offset) / self.packer.size) r = [] for e in range(elements): @@ -351,9 +377,7 @@ class VLAList_legacy(Packer): return r, total def __repr__(self): - return "VLAList_legacy(name=%s, field_type=%s)" % ( - self.name, self.field_type - ) + return "VLAList_legacy(name=%s, field_type=%s)" % (self.name, self.field_type) # Will change to IntEnum after 21.04 release @@ -361,16 +385,16 @@ class VPPEnumType(Packer): output_class = IntFlag def __init__(self, name, msgdef, options=None): - self.size = types['u32'].size + self.size = types["u32"].size self.name = name - self.enumtype = 'u32' + self.enumtype = "u32" self.msgdef = msgdef e_hash = {} for f in msgdef: - if type(f) is dict and 'enumtype' in f: - if f['enumtype'] != 'u32': - self.size = types[f['enumtype']].size - self.enumtype = f['enumtype'] + if type(f) is dict and "enumtype" in f: + if f["enumtype"] != "u32": + self.size = types[f["enumtype"]].size + self.enumtype = f["enumtype"] continue ename, evalue = f e_hash[ename] = evalue @@ -387,8 +411,8 @@ class VPPEnumType(Packer): def pack(self, data, kwargs=None): if data is None: # Default to zero if not specified - if self.options and 'default' in self.options: - data = self.options['default'] + if self.options and "default" in self.options: + data = self.options["default"] else: data = 0 @@ -404,7 +428,10 @@ class VPPEnumType(Packer): def __repr__(self): return "%s(name=%s, msgdef=%s, options=%s)" % ( - self.__class__.__name__, self.name, self.msgdef, self.options + self.__class__.__name__, + self.name, + self.msgdef, + self.options, ) @@ -424,14 +451,13 @@ class VPPUnionType(Packer): fields = [] self.packers = collections.OrderedDict() for i, f in enumerate(msgdef): - if type(f) is dict and 'crc' in f: - self.crc = f['crc'] + if type(f) is dict and "crc" in f: + self.crc = f["crc"] continue f_type, f_name = f if f_type not in types: - logger.debug('Unknown union type {}'.format(f_type)) - raise VPPSerializerValueError( - 'Unknown message type {}'.format(f_type)) + logger.debug("Unknown union type {}".format(f_type)) + raise VPPSerializerValueError("Unknown message type {}".format(f_type)) fields.append(f_name) size = types[f_type].size self.packers[f_name] = types[f_type] @@ -445,14 +471,14 @@ class VPPUnionType(Packer): # Union of variable length? def pack(self, data, kwargs=None): if not data: - return b'\x00' * self.size + return b"\x00" * self.size for k, v in data.items(): logger.debug("Key: {} Value: {}".format(k, v)) b = self.packers[k].pack(v, kwargs) break r = bytearray(self.size) - r[:len(b)] = b + r[: len(b)] = b return r def unpack(self, data, offset=0, result=None, ntc=False): @@ -466,25 +492,24 @@ class VPPUnionType(Packer): return self.tuple._make(r), maxsize def __repr__(self): - return"VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef) + return "VPPUnionType(name=%s, msgdef=%r)" % (self.name, self.msgdef) class VPPTypeAlias(Packer): def __init__(self, name, msgdef, options=None): self.name = name self.msgdef = msgdef - t = vpp_get_type(msgdef['type']) + t = vpp_get_type(msgdef["type"]) if not t: - raise ValueError('No such type: {}'.format(msgdef['type'])) - if 'length' in msgdef: - if msgdef['length'] == 0: + raise ValueError("No such type: {}".format(msgdef["type"])) + if "length" in msgdef: + if msgdef["length"] == 0: raise ValueError() - if msgdef['type'] == 'u8': - self.packer = FixedList_u8(name, msgdef['type'], - msgdef['length']) + if msgdef["type"] == "u8": + self.packer = FixedList_u8(name, msgdef["type"], msgdef["length"]) self.size = self.packer.size else: - self.packer = FixedList(name, msgdef['type'], msgdef['length']) + self.packer = FixedList(name, msgdef["type"], msgdef["length"]) else: self.packer = t self.size = t.size @@ -498,11 +523,11 @@ class VPPTypeAlias(Packer): try: return conversion_packer(data, self.name) # Python 2 and 3 raises different exceptions from inet_pton - except(OSError, socket.error, TypeError): + except (OSError, socket.error, TypeError): pass if data is None: # Default to zero if not specified - if self.options and 'default' in self.options: - data = self.options['default'] + if self.options and "default" in self.options: + data = self.options["default"] else: data = 0 @@ -525,7 +550,10 @@ class VPPTypeAlias(Packer): def __repr__(self): return "VPPTypeAlias(name=%s, msgdef=%s, options=%s)" % ( - self.name, self.msgdef, self.options) + self.name, + self.msgdef, + self.options, + ) class VPPType(Packer): @@ -539,17 +567,16 @@ class VPPType(Packer): self.field_by_name = {} size = 0 for i, f in enumerate(msgdef): - if type(f) is dict and 'crc' in f: - self.crc = f['crc'] + if type(f) is dict and "crc" in f: + self.crc = f["crc"] continue f_type, f_name = f[:2] self.fields.append(f_name) self.field_by_name[f_name] = None self.fieldtypes.append(f_type) if f_type not in types: - logger.debug('Unknown type {}'.format(f_type)) - raise VPPSerializerValueError( - 'Unknown message type {}'.format(f_type)) + logger.debug("Unknown type {}".format(f_type)) + raise VPPSerializerValueError("Unknown message type {}".format(f_type)) fieldlen = len(f) options = [x for x in f if type(x) is dict] @@ -561,16 +588,16 @@ class VPPType(Packer): if fieldlen == 3: # list list_elements = f[2] if list_elements == 0: - if f_type == 'string': + if f_type == "string": p = String(f_name, 0, self.options) else: p = VLAList_legacy(f_name, f_type) self.packers.append(p) - elif f_type == 'u8': + elif f_type == "u8": p = FixedList_u8(f_name, f_type, list_elements) self.packers.append(p) size += p.size - elif f_type == 'string': + elif f_type == "string": p = String(f_name, list_elements, self.options) self.packers.append(p) size += p.size @@ -584,7 +611,7 @@ class VPPType(Packer): self.packers.append(p) else: # default support for types that decay to basetype - if 'default' in self.options: + if "default" in self.options: p = self.get_packer_with_options(f_type, self.options) else: p = types[f_type] @@ -609,8 +636,8 @@ class VPPType(Packer): for i, a in enumerate(self.fields): if data and type(data) is not dict and a not in data: raise VPPSerializerValueError( - "Invalid argument: {} expected {}.{}". - format(data, self.name, a)) + "Invalid argument: {} expected {}.{}".format(data, self.name, a) + ) # Defaulting to zero. if not data or a not in data: # Default to 0 @@ -651,7 +678,9 @@ class VPPType(Packer): def __repr__(self): return "%s(name=%s, msgdef=%s)" % ( - self.__class__.__name__, self.name, self.msgdef + self.__class__.__name__, + self.name, + self.msgdef, ) diff --git a/src/vpp-api/python/vpp_papi/vpp_stats.py b/src/vpp-api/python/vpp_papi/vpp_stats.py index 0b1c701a430..4a342b68a8f 100755 --- a/src/vpp-api/python/vpp_papi/vpp_stats.py +++ b/src/vpp-api/python/vpp_papi/vpp_stats.py @@ -14,7 +14,7 @@ # limitations under the License. # -''' +""" This module implement Python access to the VPP statistics segment. It accesses the data structures directly in shared memory. VPP uses optimistic locking, so data structures may change underneath @@ -39,7 +39,7 @@ stat['/if/rx'][:, 1].sum_packets() - returns the sum of packet counters for interface 1 on all threads stat['/if/rx-miss'][:, 1].sum() - returns the sum of packet counters for interface 1 on all threads for simple counters -''' +""" import os import socket @@ -50,31 +50,36 @@ import time import unittest import re + def recv_fd(sock): - '''Get file descriptor for memory map''' - fds = array.array("i") # Array of ints + """Get file descriptor for memory map""" + fds = array.array("i") # Array of ints _, ancdata, _, _ = sock.recvmsg(0, socket.CMSG_LEN(4)) for cmsg_level, cmsg_type, cmsg_data in ancdata: if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS: - fds.frombytes(cmsg_data[:len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) return list(fds)[0] -VEC_LEN_FMT = Struct('I') + +VEC_LEN_FMT = Struct("I") + + def get_vec_len(stats, vector_offset): - '''Equivalent to VPP vec_len()''' + """Equivalent to VPP vec_len()""" return VEC_LEN_FMT.unpack_from(stats.statseg, vector_offset - 8)[0] + def get_string(stats, ptr): - '''Get a string from a VPP vector''' + """Get a string from a VPP vector""" namevector = ptr - stats.base namevectorlen = get_vec_len(stats, namevector) if namevector + namevectorlen >= stats.size: - raise IOError('String overruns stats segment') - return stats.statseg[namevector:namevector+namevectorlen-1].decode('ascii') + raise IOError("String overruns stats segment") + return stats.statseg[namevector : namevector + namevectorlen - 1].decode("ascii") class StatsVector: - '''A class representing a VPP vector''' + """A class representing a VPP vector""" def __init__(self, stats, ptr, fmt): self.vec_start = ptr - stats.base @@ -86,28 +91,35 @@ class StatsVector: self.stats = stats if self.vec_start + self.vec_len * self.elementsize >= stats.size: - raise IOError('Vector overruns stats segment') + raise IOError("Vector overruns stats segment") def __iter__(self): with self.stats.lock: - return self.struct.iter_unpack(self.statseg[self.vec_start:self.vec_start + - self.elementsize*self.vec_len]) + return self.struct.iter_unpack( + self.statseg[ + self.vec_start : self.vec_start + self.elementsize * self.vec_len + ] + ) def __getitem__(self, index): if index > self.vec_len: - raise IOError('Index beyond end of vector') + raise IOError("Index beyond end of vector") with self.stats.lock: if self.fmtlen == 1: - return self.struct.unpack_from(self.statseg, self.vec_start + - (index * self.elementsize))[0] - return self.struct.unpack_from(self.statseg, self.vec_start + - (index * self.elementsize)) + return self.struct.unpack_from( + self.statseg, self.vec_start + (index * self.elementsize) + )[0] + return self.struct.unpack_from( + self.statseg, self.vec_start + (index * self.elementsize) + ) + + +class VPPStats: + """Main class implementing Python access to the VPP statistics segment""" -class VPPStats(): - '''Main class implementing Python access to the VPP statistics segment''' # pylint: disable=too-many-instance-attributes - shared_headerfmt = Struct('QPQQPP') - default_socketname = '/run/vpp/stats.sock' + shared_headerfmt = Struct("QPQQPP") + default_socketname = "/run/vpp/stats.sock" def __init__(self, socketname=default_socketname, timeout=10): self.socketname = socketname @@ -120,7 +132,7 @@ class VPPStats(): self.statseg = 0 def connect(self): - '''Connect to stats segment''' + """Connect to stats segment""" if self.connected: return sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) @@ -130,61 +142,64 @@ class VPPStats(): sock.close() stat_result = os.fstat(mfd) - self.statseg = mmap.mmap(mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED) + self.statseg = mmap.mmap( + mfd, stat_result.st_size, mmap.PROT_READ, mmap.MAP_SHARED + ) os.close(mfd) self.size = stat_result.st_size if self.version != 2: - raise Exception('Incompatbile stat segment version {}' - .format(self.version)) + raise Exception("Incompatbile stat segment version {}".format(self.version)) self.refresh() self.connected = True def disconnect(self): - '''Disconnect from stats segment''' + """Disconnect from stats segment""" if self.connected: self.statseg.close() self.connected = False @property def version(self): - '''Get version of stats segment''' + """Get version of stats segment""" return self.shared_headerfmt.unpack_from(self.statseg)[0] @property def base(self): - '''Get base pointer of stats segment''' + """Get base pointer of stats segment""" return self.shared_headerfmt.unpack_from(self.statseg)[1] @property def epoch(self): - '''Get current epoch value from stats segment''' + """Get current epoch value from stats segment""" return self.shared_headerfmt.unpack_from(self.statseg)[2] @property def in_progress(self): - '''Get value of in_progress from stats segment''' + """Get value of in_progress from stats segment""" return self.shared_headerfmt.unpack_from(self.statseg)[3] @property def directory_vector(self): - '''Get pointer of directory vector''' + """Get pointer of directory vector""" return self.shared_headerfmt.unpack_from(self.statseg)[4] - elementfmt = 'IQ128s' + elementfmt = "IQ128s" def refresh(self, blocking=True): - '''Refresh directory vector cache (epoch changed)''' + """Refresh directory vector cache (epoch changed)""" directory = {} directory_by_idx = {} while True: try: with self.lock: self.last_epoch = self.epoch - for i, direntry in enumerate(StatsVector(self, self.directory_vector, self.elementfmt)): - path_raw = direntry[2].find(b'\x00') - path = direntry[2][:path_raw].decode('ascii') + for i, direntry in enumerate( + StatsVector(self, self.directory_vector, self.elementfmt) + ): + path_raw = direntry[2].find(b"\x00") + path = direntry[2][:path_raw].decode("ascii") directory[path] = StatsEntry(direntry[0], direntry[1]) directory_by_idx[i] = path self.directory = directory @@ -210,14 +225,12 @@ class VPPStats(): def __iter__(self): return iter(self.directory.items()) - def set_errors(self, blocking=True): - '''Return dictionary of error counters > 0''' + """Return dictionary of error counters > 0""" if not self.connected: self.connect() - errors = {k: v for k, v in self.directory.items() - if k.startswith("/err/")} + errors = {k: v for k, v in self.directory.items() if k.startswith("/err/")} result = {} for k in errors: try: @@ -229,23 +242,23 @@ class VPPStats(): return result def set_errors_str(self, blocking=True): - '''Return all errors counters > 0 pretty printed''' - error_string = ['ERRORS:'] + """Return all errors counters > 0 pretty printed""" + error_string = ["ERRORS:"] error_counters = self.set_errors(blocking) for k in sorted(error_counters): - error_string.append('{:<60}{:>10}'.format(k, error_counters[k])) - return '%s\n' % '\n'.join(error_string) + error_string.append("{:<60}{:>10}".format(k, error_counters[k])) + return "%s\n" % "\n".join(error_string) def get_counter(self, name, blocking=True): - '''Alternative call to __getitem__''' + """Alternative call to __getitem__""" return self.__getitem__(name, blocking) def get_err_counter(self, name, blocking=True): - '''Alternative call to __getitem__''' + """Alternative call to __getitem__""" return self.__getitem__(name, blocking).sum() def ls(self, patterns): - '''Returns list of counters matching pattern''' + """Returns list of counters matching pattern""" # pylint: disable=invalid-name if not self.connected: self.connect() @@ -255,20 +268,24 @@ class VPPStats(): if self.last_epoch != self.epoch: self.refresh() - return [k for k, v in self.directory.items() - if any(re.match(pattern, k) for pattern in regex)] + return [ + k + for k, v in self.directory.items() + if any(re.match(pattern, k) for pattern in regex) + ] def dump(self, counters, blocking=True): - '''Given a list of counters return a dictionary of results''' + """Given a list of counters return a dictionary of results""" if not self.connected: self.connect() result = {} for cnt in counters: - result[cnt] = self.__getitem__(cnt,blocking) + result[cnt] = self.__getitem__(cnt, blocking) return result -class StatsLock(): - '''Stat segment optimistic locking''' + +class StatsLock: + """Stat segment optimistic locking""" def __init__(self, stats): self.stats = stats @@ -283,7 +300,7 @@ class StatsLock(): self.release() def acquire(self, blocking=True, timeout=-1): - '''Acquire the lock. Await in progress to go false. Record epoch.''' + """Acquire the lock. Await in progress to go false. Record epoch.""" self.epoch = self.stats.epoch if timeout > 0: start = time.monotonic() @@ -296,46 +313,49 @@ class StatsLock(): return True def release(self): - '''Check if data read while locked is valid''' + """Check if data read while locked is valid""" if self.stats.in_progress or self.stats.epoch != self.epoch: - raise IOError('Optimistic lock failed, retry') + raise IOError("Optimistic lock failed, retry") def locked(self): - '''Not used''' + """Not used""" class StatsCombinedList(list): - '''Column slicing for Combined counters list''' + """Column slicing for Combined counters list""" def __getitem__(self, item): - '''Supports partial numpy style 2d support. Slice by column [:,1]''' + """Supports partial numpy style 2d support. Slice by column [:,1]""" if isinstance(item, int): return list.__getitem__(self, item) return CombinedList([row[item[1]] for row in self]) + class CombinedList(list): - '''Combined Counters 2-dimensional by thread by index of packets/octets''' + """Combined Counters 2-dimensional by thread by index of packets/octets""" def packets(self): - '''Return column (2nd dimension). Packets for all threads''' + """Return column (2nd dimension). Packets for all threads""" return [pair[0] for pair in self] def octets(self): - '''Return column (2nd dimension). Octets for all threads''' + """Return column (2nd dimension). Octets for all threads""" return [pair[1] for pair in self] def sum_packets(self): - '''Return column (2nd dimension). Sum of all packets for all threads''' + """Return column (2nd dimension). Sum of all packets for all threads""" return sum(self.packets()) def sum_octets(self): - '''Return column (2nd dimension). Sum of all octets for all threads''' + """Return column (2nd dimension). Sum of all octets for all threads""" return sum(self.octets()) + class StatsTuple(tuple): - '''A Combined vector tuple (packets, octets)''' + """A Combined vector tuple (packets, octets)""" + def __init__(self, data): - self.dictionary = {'packets': data[0], 'bytes': data[1]} + self.dictionary = {"packets": data[0], "bytes": data[1]} super().__init__() def __repr__(self): @@ -344,28 +364,32 @@ class StatsTuple(tuple): def __getitem__(self, item): if isinstance(item, int): return tuple.__getitem__(self, item) - if item == 'packets': + if item == "packets": return tuple.__getitem__(self, 0) return tuple.__getitem__(self, 1) + class StatsSimpleList(list): - '''Simple Counters 2-dimensional by thread by index of packets''' + """Simple Counters 2-dimensional by thread by index of packets""" def __getitem__(self, item): - '''Supports partial numpy style 2d support. Slice by column [:,1]''' + """Supports partial numpy style 2d support. Slice by column [:,1]""" if isinstance(item, int): return list.__getitem__(self, item) return SimpleList([row[item[1]] for row in self]) + class SimpleList(list): - '''Simple counter''' + """Simple counter""" def sum(self): - '''Sum the vector''' + """Sum the vector""" return sum(self) -class StatsEntry(): - '''An individual stats entry''' + +class StatsEntry: + """An individual stats entry""" + # pylint: disable=unused-argument,no-self-use def __init__(self, stattype, statvalue): @@ -386,115 +410,128 @@ class StatsEntry(): self.function = self.illegal def illegal(self, stats): - '''Invalid or unknown counter type''' + """Invalid or unknown counter type""" return None def scalar(self, stats): - '''Scalar counter''' + """Scalar counter""" return self.value def simple(self, stats): - '''Simple counter''' + """Simple counter""" counter = StatsSimpleList() - for threads in StatsVector(stats, self.value, 'P'): - clist = [v[0] for v in StatsVector(stats, threads[0], 'Q')] + for threads in StatsVector(stats, self.value, "P"): + clist = [v[0] for v in StatsVector(stats, threads[0], "Q")] counter.append(clist) return counter def combined(self, stats): - '''Combined counter''' + """Combined counter""" counter = StatsCombinedList() - for threads in StatsVector(stats, self.value, 'P'): - clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], 'QQ')] + for threads in StatsVector(stats, self.value, "P"): + clist = [StatsTuple(cnt) for cnt in StatsVector(stats, threads[0], "QQ")] counter.append(clist) return counter def name(self, stats): - '''Name counter''' + """Name counter""" counter = [] - for name in StatsVector(stats, self.value, 'P'): + for name in StatsVector(stats, self.value, "P"): if name[0]: counter.append(get_string(stats, name[0])) return counter - SYMLINK_FMT1 = Struct('II') - SYMLINK_FMT2 = Struct('Q') + SYMLINK_FMT1 = Struct("II") + SYMLINK_FMT2 = Struct("Q") + def symlink(self, stats): - '''Symlink counter''' + """Symlink counter""" b = self.SYMLINK_FMT2.pack(self.value) index1, index2 = self.SYMLINK_FMT1.unpack(b) name = stats.directory_by_idx[index1] - return stats[name][:,index2] + return stats[name][:, index2] def get_counter(self, stats): - '''Return a list of counters''' + """Return a list of counters""" if stats: return self.function(stats) + class TestStats(unittest.TestCase): - '''Basic statseg tests''' + """Basic statseg tests""" def setUp(self): - '''Connect to statseg''' + """Connect to statseg""" self.stat = VPPStats() self.stat.connect() self.profile = cProfile.Profile() self.profile.enable() def tearDown(self): - '''Disconnect from statseg''' + """Disconnect from statseg""" self.stat.disconnect() profile = Stats(self.profile) profile.strip_dirs() - profile.sort_stats('cumtime') + profile.sort_stats("cumtime") profile.print_stats() print("\n--->>>") def test_counters(self): - '''Test access to statseg''' - - print('/err/abf-input-ip4/missed', self.stat['/err/abf-input-ip4/missed']) - print('/sys/heartbeat', self.stat['/sys/heartbeat']) - print('/if/names', self.stat['/if/names']) - print('/if/rx-miss', self.stat['/if/rx-miss']) - print('/if/rx-miss', self.stat['/if/rx-miss'][1]) - print('/nat44-ed/out2in/slowpath/drops', self.stat['/nat44-ed/out2in/slowpath/drops']) + """Test access to statseg""" + + print("/err/abf-input-ip4/missed", self.stat["/err/abf-input-ip4/missed"]) + print("/sys/heartbeat", self.stat["/sys/heartbeat"]) + print("/if/names", self.stat["/if/names"]) + print("/if/rx-miss", self.stat["/if/rx-miss"]) + print("/if/rx-miss", self.stat["/if/rx-miss"][1]) + print( + "/nat44-ed/out2in/slowpath/drops", + self.stat["/nat44-ed/out2in/slowpath/drops"], + ) with self.assertRaises(KeyError): - print('NO SUCH COUNTER', self.stat['foobar']) - print('/if/rx', self.stat.get_counter('/if/rx')) - print('/err/ethernet-input/no_error', - self.stat.get_counter('/err/ethernet-input/no_error')) + print("NO SUCH COUNTER", self.stat["foobar"]) + print("/if/rx", self.stat.get_counter("/if/rx")) + print( + "/err/ethernet-input/no_error", + self.stat.get_counter("/err/ethernet-input/no_error"), + ) def test_column(self): - '''Test column slicing''' - - print('/if/rx-miss', self.stat['/if/rx-miss']) - print('/if/rx', self.stat['/if/rx']) # All interfaces for thread #1 - print('/if/rx thread #1', self.stat['/if/rx'][0]) # All interfaces for thread #1 - print('/if/rx thread #1, interface #1', - self.stat['/if/rx'][0][1]) # All interfaces for thread #1 - print('/if/rx if_index #1', self.stat['/if/rx'][:, 1]) - print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].packets()) - print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].sum_packets()) - print('/if/rx if_index #1 packets', self.stat['/if/rx'][:, 1].octets()) - print('/if/rx-miss', self.stat['/if/rx-miss']) - print('/if/rx-miss if_index #1 packets', self.stat['/if/rx-miss'][:, 1].sum()) - print('/if/rx if_index #1 packets', self.stat['/if/rx'][0][1]['packets']) + """Test column slicing""" + + print("/if/rx-miss", self.stat["/if/rx-miss"]) + print("/if/rx", self.stat["/if/rx"]) # All interfaces for thread #1 + print( + "/if/rx thread #1", self.stat["/if/rx"][0] + ) # All interfaces for thread #1 + print( + "/if/rx thread #1, interface #1", self.stat["/if/rx"][0][1] + ) # All interfaces for thread #1 + print("/if/rx if_index #1", self.stat["/if/rx"][:, 1]) + print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].packets()) + print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].sum_packets()) + print("/if/rx if_index #1 packets", self.stat["/if/rx"][:, 1].octets()) + print("/if/rx-miss", self.stat["/if/rx-miss"]) + print("/if/rx-miss if_index #1 packets", self.stat["/if/rx-miss"][:, 1].sum()) + print("/if/rx if_index #1 packets", self.stat["/if/rx"][0][1]["packets"]) def test_nat44(self): - '''Test the nat counters''' + """Test the nat counters""" - print('/nat44-ei/ha/del-event-recv', self.stat['/nat44-ei/ha/del-event-recv']) - print('/err/nat44-ei-ha/pkts-processed', self.stat['/err/nat44-ei-ha/pkts-processed'].sum()) + print("/nat44-ei/ha/del-event-recv", self.stat["/nat44-ei/ha/del-event-recv"]) + print( + "/err/nat44-ei-ha/pkts-processed", + self.stat["/err/nat44-ei-ha/pkts-processed"].sum(), + ) def test_legacy(self): - '''Legacy interface''' + """Legacy interface""" directory = self.stat.ls(["^/if", "/err/ip4-input", "/sys/node/ip4-input"]) data = self.stat.dump(directory) print(data) - print('Looking up sys node') + print("Looking up sys node") directory = self.stat.ls(["^/sys/node"]) - print('Dumping sys node') + print("Dumping sys node") data = self.stat.dump(directory) print(data) directory = self.stat.ls(["^/foobar"]) @@ -502,18 +539,19 @@ class TestStats(unittest.TestCase): print(data) def test_sys_nodes(self): - '''Test /sys/nodes''' - counters = self.stat.ls('^/sys/node') - print('COUNTERS:', counters) - print('/sys/node', self.stat.dump(counters)) - print('/net/route/to', self.stat['/net/route/to']) + """Test /sys/nodes""" + counters = self.stat.ls("^/sys/node") + print("COUNTERS:", counters) + print("/sys/node", self.stat.dump(counters)) + print("/net/route/to", self.stat["/net/route/to"]) def test_symlink(self): - '''Symbolic links''' - print('/interface/local0/rx', self.stat['/interfaces/local0/rx']) - print('/sys/nodes/unix-epoll-input', self.stat['/nodes/unix-epoll-input/calls']) + """Symbolic links""" + print("/interface/local0/rx", self.stat["/interfaces/local0/rx"]) + print("/sys/nodes/unix-epoll-input", self.stat["/nodes/unix-epoll-input/calls"]) + -if __name__ == '__main__': +if __name__ == "__main__": import cProfile from pstats import Stats diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py index c82b8c365a1..3a8c332a00a 100644 --- a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py +++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py @@ -9,7 +9,7 @@ import multiprocessing import queue import logging -logger = logging.getLogger('vpp_papi.transport') +logger = logging.getLogger("vpp_papi.transport") logger.addHandler(logging.NullHandler()) @@ -26,7 +26,7 @@ class VppTransport: self.read_timeout = read_timeout if read_timeout > 0 else None self.parent = parent self.server_address = server_address - self.header = struct.Struct('>QII') + self.header = struct.Struct(">QII") self.message_table = {} # These queues can be accessed async. # They are always up, but replaced on connect. @@ -41,11 +41,10 @@ class VppTransport: def msg_thread_func(self): while True: try: - rlist, _, _ = select.select([self.socket, - self.sque._reader], [], []) + rlist, _, _ = select.select([self.socket, self.sque._reader], [], []) except socket.error: # Terminate thread - logging.error('select failed') + logging.error("select failed") self.q.put(None) return @@ -71,8 +70,7 @@ class VppTransport: else: self.parent.msg_handler_async(msg) else: - raise VppTransportSocketIOError( - 2, 'Unknown response from select') + raise VppTransportSocketIOError(2, "Unknown response from select") def connect(self, name, pfx, msg_handler, rx_qlen): # TODO: Reorder the actions and add "roll-backs", @@ -80,7 +78,8 @@ class VppTransport: if self.message_thread is not None: raise VppTransportSocketIOError( - 1, "PAPI socket transport connect: Need to disconnect first.") + 1, "PAPI socket transport connect: Need to disconnect first." + ) # Create a UDS socket self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -107,19 +106,17 @@ class VppTransport: self.message_thread = threading.Thread(target=self.msg_thread_func) # Initialise sockclnt_create - sockclnt_create = self.parent.messages['sockclnt_create'] - sockclnt_create_reply = self.parent.messages['sockclnt_create_reply'] + sockclnt_create = self.parent.messages["sockclnt_create"] + sockclnt_create_reply = self.parent.messages["sockclnt_create_reply"] - args = {'_vl_msg_id': 15, - 'name': name, - 'context': 124} + args = {"_vl_msg_id": 15, "name": name, "context": 124} b = sockclnt_create.pack(args) self.write(b) msg = self._read() hdr, length = self.parent.header.unpack(msg, 0) if hdr.msgid != 16: # TODO: Add first numeric argument. - raise VppTransportSocketIOError('Invalid reply message') + raise VppTransportSocketIOError("Invalid reply message") r, length = sockclnt_create_reply.unpack(msg) self.socket_index = r.index @@ -184,7 +181,7 @@ class VppTransport: def write(self, buf): """Send a binary-packed message to VPP.""" if not self.connected: - raise VppTransportSocketIOError(1, 'Not connected') + raise VppTransportSocketIOError(1, "Not connected") # Send header header = self.header.pack(0, len(buf), 0) @@ -192,8 +189,7 @@ class VppTransport: self.socket.sendall(header) self.socket.sendall(buf) except socket.error as err: - raise VppTransportSocketIOError(1, 'Sendall error: {err!r}'.format( - err=err)) + raise VppTransportSocketIOError(1, "Sendall error: {err!r}".format(err=err)) def _read_fixed(self, size): """Repeat receive until fixed size is read. Return empty on error.""" @@ -223,11 +219,11 @@ class VppTransport: msg = self._read_fixed(hdrlen) if hdrlen == len(msg): return msg - raise VppTransportSocketIOError(1, 'Unknown socket read error') + raise VppTransportSocketIOError(1, "Unknown socket read error") def read(self, timeout=None): if not self.connected: - raise VppTransportSocketIOError(1, 'Not connected') + raise VppTransportSocketIOError(1, "Not connected") if timeout is None: timeout = self.read_timeout try: |