aboutsummaryrefslogtreecommitdiffstats
path: root/src/vpp-api/python/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/vpp-api/python/tests')
-rwxr-xr-xsrc/vpp-api/python/tests/test_cli.py52
-rwxr-xr-xsrc/vpp-api/python/tests/test_modules.py18
-rwxr-xr-xsrc/vpp-api/python/tests/test_papi.py119
-rwxr-xr-xsrc/vpp-api/python/tests/test_version.py35
-rwxr-xr-xsrc/vpp-api/python/tests/test_vpp_papi2.py487
5 files changed, 711 insertions, 0 deletions
diff --git a/src/vpp-api/python/tests/test_cli.py b/src/vpp-api/python/tests/test_cli.py
new file mode 100755
index 00000000..66fb6943
--- /dev/null
+++ b/src/vpp-api/python/tests/test_cli.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+from __future__ import print_function
+import unittest, sys, time, threading, struct
+import test_base
+import vpp_papi
+from ipaddress import *
+
+import glob, subprocess
+class TestPAPI(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ #
+ # Start main VPP process
+ cls.vpp_bin = glob.glob(test_base.scriptdir+'/../../../build-root/install-vpp*-native/vpp/bin/vpp')[0]
+ print("VPP BIN:", cls.vpp_bin)
+ cls.vpp = subprocess.Popen([cls.vpp_bin, "unix", "nodaemon"], stderr=subprocess.PIPE)
+ print('Started VPP')
+ # For some reason unless we let VPP start up the API cannot connect.
+ time.sleep(0.3)
+ @classmethod
+ def tearDownClass(cls):
+ cls.vpp.terminate()
+
+ def setUp(self):
+ print("Connecting API")
+ r = vpp_papi.connect("test_papi")
+ self.assertEqual(r, 0)
+
+ def tearDown(self):
+ r = vpp_papi.disconnect()
+ self.assertEqual(r, 0)
+
+ #
+ # The tests themselves
+ #
+
+ #
+ # Basic request / reply
+ #
+ def test_cli_request(self):
+ print(vpp_papi.cli_exec('show version verbose'))
+ #t = vpp_papi.cli_inband_request(len(cmd), cmd)
+ #print('T:',t)
+ #reply = t.reply[0].decode().rstrip('\x00')
+ #print(reply)
+ #program = t.program.decode().rstrip('\x00')
+ #self.assertEqual('vpe', program)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/vpp-api/python/tests/test_modules.py b/src/vpp-api/python/tests/test_modules.py
new file mode 100755
index 00000000..fdcd092c
--- /dev/null
+++ b/src/vpp-api/python/tests/test_modules.py
@@ -0,0 +1,18 @@
+from __future__ import print_function
+import unittest
+import vpp_papi
+import pot, snat
+print('Plugins:')
+vpp_papi.plugin_show()
+r = vpp_papi.connect('ole')
+
+r = vpp_papi.show_version()
+print('R:', r)
+
+r = snat.snat_interface_add_del_feature(1, 1, 1)
+print('R:', r)
+
+list_name = 'foobar'
+r = pot.pot_profile_add(0, 1, 123, 123, 0, 12, 0, 23, len(list_name), list_name)
+print('R:', r)
+vpp_papi.disconnect()
diff --git a/src/vpp-api/python/tests/test_papi.py b/src/vpp-api/python/tests/test_papi.py
new file mode 100755
index 00000000..8cbbfc59
--- /dev/null
+++ b/src/vpp-api/python/tests/test_papi.py
@@ -0,0 +1,119 @@
+from __future__ import print_function
+import unittest, sys, time, threading, struct, logging, os
+import vpp_papi
+from ipaddress import *
+scriptdir = os.path.dirname(os.path.realpath(__file__))
+papi_event = threading.Event()
+print(vpp_papi.vpe.VL_API_SW_INTERFACE_SET_FLAGS)
+def papi_event_handler(result):
+ if result.vl_msg_id == vpp_papi.vpe.VL_API_SW_INTERFACE_SET_FLAGS:
+ return
+ if result.vl_msg_id == vpp_papi.vpe.VL_API_VNET_INTERFACE_COUNTERS:
+ print('Interface counters', result)
+ return
+ if result.vl_msg_id == vpp_papi.vpe.VL_API_VNET_IP6_FIB_COUNTERS:
+ print('IPv6 FIB counters', result)
+ papi_event.set()
+ return
+
+ print('Unknown message id:', result.vl_msg_id)
+
+import glob, subprocess
+class TestPAPI(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ #
+ # Start main VPP process
+ cls.vpp_bin = glob.glob(scriptdir+'/../../../build-root/install-vpp*-native/vpp/bin/vpp')[0]
+ print("VPP BIN:", cls.vpp_bin)
+ cls.vpp = subprocess.Popen([cls.vpp_bin, "unix", "nodaemon"], stderr=subprocess.PIPE)
+ print('Started VPP')
+ # For some reason unless we let VPP start up the API cannot connect.
+ time.sleep(0.3)
+ @classmethod
+ def tearDownClass(cls):
+ cls.vpp.terminate()
+
+ def setUp(self):
+ print("Connecting API")
+ r = vpp_papi.connect("test_papi")
+ self.assertEqual(r, 0)
+
+ def tearDown(self):
+ r = vpp_papi.disconnect()
+ self.assertEqual(r, 0)
+
+ #
+ # The tests themselves
+ #
+
+ #
+ # Basic request / reply
+ #
+ def test_show_version(self):
+ t = vpp_papi.show_version()
+ print('T', t);
+ program = t.program.decode().rstrip('\x00')
+ self.assertEqual('vpe', program)
+
+ #
+ # Details / Dump
+ #
+ def test_details_dump(self):
+ t = vpp_papi.sw_interface_dump(0, b'')
+ print('Dump/details T', t)
+
+ #
+ # Arrays
+ #
+ def test_arrays(self):
+ t = vpp_papi.vnet_get_summary_stats()
+ print('Summary stats', t)
+ print('Packets:', t.total_pkts[0])
+ print('Packets:', t.total_pkts[1])
+ #
+ # Variable sized arrays and counters
+ #
+ #@unittest.skip("stats")
+ def test_want_stats(self):
+ pid = 123
+ vpp_papi.register_event_callback(papi_event_handler)
+ papi_event.clear()
+
+ # Need to configure IPv6 to get som IPv6 FIB stats
+ t = vpp_papi.create_loopback('')
+ print(t)
+ self.assertEqual(t.retval, 0)
+
+ ifindex = t.sw_if_index
+ addr = str(IPv6Address(u'1::1').packed)
+ t = vpp_papi.sw_interface_add_del_address(ifindex, 1, 1, 0, 16, addr)
+ print(t)
+ self.assertEqual(t.retval, 0)
+
+ # Check if interface is up
+ # XXX: Add new API to query interface state based on ifindex, instead of dump all.
+ t = vpp_papi.sw_interface_set_flags(ifindex, 1, 1, 0)
+ self.assertEqual(t.retval, 0)
+
+ t = vpp_papi.want_stats(True, pid)
+
+ print (t)
+
+ #
+ # Wait for some stats
+ #
+ self.assertEqual(papi_event.wait(15), True)
+ t = vpp_papi.want_stats(False, pid)
+ print (t)
+
+
+ #
+ # Plugins?
+ #
+
+if __name__ == '__main__':
+ #logging.basicConfig(level=logging.DEBUG)
+ unittest.main()
+def test_papi():
+ print('test')
diff --git a/src/vpp-api/python/tests/test_version.py b/src/vpp-api/python/tests/test_version.py
new file mode 100755
index 00000000..de39cc24
--- /dev/null
+++ b/src/vpp-api/python/tests/test_version.py
@@ -0,0 +1,35 @@
+from __future__ import print_function
+import unittest, sys, time, threading, struct
+
+import vpp_papi
+from ipaddress import *
+import glob, subprocess
+class TestPAPI(unittest.TestCase):
+ def setUp(self):
+ print("Connecting API")
+ r = vpp_papi.connect("test_papi")
+ self.assertEqual(r, 0)
+
+ def tearDown(self):
+ r = vpp_papi.disconnect()
+ self.assertEqual(r, 0)
+
+ #
+ # The tests themselves
+ #
+
+ #
+ # Basic request / reply
+ #
+ def test_show_version(self):
+ print(vpp_papi.show_version())
+
+ #
+ # Details / Dump
+ #
+ def test_details_dump(self):
+ t = vpp_papi.sw_interface_dump(0, b'')
+ print('Dump/details T', t)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/vpp-api/python/tests/test_vpp_papi2.py b/src/vpp-api/python/tests/test_vpp_papi2.py
new file mode 100755
index 00000000..f45f791e
--- /dev/null
+++ b/src/vpp-api/python/tests/test_vpp_papi2.py
@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+
+from __future__ import print_function
+import unittest, sys, threading, struct, logging, os
+from vpp_papi import VPP
+from ipaddress import *
+import glob, json
+
+papi_event = threading.Event()
+import glob
+
+import fnmatch
+import os
+
+jsonfiles = []
+for root, dirnames, filenames in os.walk('../../../build-root/'):
+ if root.find('install-') == -1: continue
+ for filename in fnmatch.filter(filenames, '*.api.json'):
+ jsonfiles.append(os.path.join(root, filename))
+
+class TestPAPI(unittest.TestCase):
+ show_version_msg = '''["show_version",
+ ["u16", "_vl_msg_id"],
+ ["u32", "client_index"],
+ ["u32", "context"],
+ {"crc" : "0xf18f9480"}
+ ]'''
+
+ ip_address_details_msg = '''["ip_address_details",
+ ["u16", "_vl_msg_id"],
+ ["u32", "client_index"],
+ ["u32", "context"],
+ ["u8", "ip", 16],
+ ["u8", "prefix_length"],
+ {"crc" : "0x87d522a1"}
+ ]'''
+
+ cli_inband_msg = '''["cli_inband",
+ ["u16", "_vl_msg_id"],
+ ["u32", "client_index"],
+ ["u32", "context"],
+ ["u32", "length"],
+ ["u8", "cmd", 0, "length"],
+ {"crc" : "0x22345937"}
+ ]'''
+
+ def test_adding_new_message_object(self):
+ p = json.loads(TestPAPI.show_version_msg)
+ msglist = VPP(testmode=json)
+ msgdef = msglist.add_message(p[0], p[1:])
+
+ # Verify that message can be retrieved
+ self.assertTrue(msglist['show_version'])
+ self.assertFalse(msglist['foobar'])
+
+ # Test duplicate
+ self.assertRaises(ValueError, msglist.add_message, p[0], p[1:])
+
+ # Look at return tuple
+ self.assertTrue(msglist.ret_tup('show_version'))
+
+ def test_adding_new_message_object_with_array(self):
+ p = json.loads(TestPAPI.ip_address_details_msg)
+ msglist = VPP(testmode=True)
+ msglist.add_message(p[0], p[1:])
+
+ self.assertTrue(msglist['ip_address_details'])
+
+ def test_message_to_bytes(self):
+ msglist = VPP(testmode=True)
+ p = json.loads(TestPAPI.show_version_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+
+ # Give me a byte string for given message and given arguments
+
+ b = msglist.encode(msgdef, {'_vl_msg_id' : 50, 'context' : 123 })
+ self.assertEqual(10, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(rv._0, 50)
+ self.assertEqual(rv.context, 123)
+
+
+ p = json.loads(TestPAPI.ip_address_details_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+
+ # Give me a byte string for given message and given arguments
+ b = msglist.encode(msgdef, {'_vl_msg_id' : 50, 'context' : 123,
+ 'ip' : b'\xf0\xf1\xf2',
+ 'prefix_length' : 12})
+ self.assertEqual(27, len(b))
+ rv = msglist.decode(msgdef, b)
+
+ self.assertEqual(rv.context, 123)
+ self.assertEqual(rv.ip, b'\xf0\xf1\xf2\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
+ self.assertEqual(rv.prefix_length, 12)
+
+ p = json.loads(TestPAPI.cli_inband_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+
+ # Give me a byte string for given message and given arguments
+ b = msglist.encode(msgdef, { '_vl_msg_id' : 50, 'context' : 123,
+ 'length' : 20, 'cmd' : 'show version verbose'})
+ self.assertEqual(34, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(rv._0, 50)
+ self.assertEqual(rv.context, 123)
+ self.assertEqual(rv.cmd.decode('ascii'), 'show version verbose')
+
+ variable_array_16_msg = '''["variable_array_16",
+ ["u32", "length"],
+ ["u16", "list", 0, "length"]
+ ]'''
+
+ p = json.loads(variable_array_16_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+
+ # Give me a byte string for given message and given arguments
+ b = msglist.encode(msgdef, { 'list' : [1, 2], 'length' :2})
+ self.assertEqual(8, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(2, rv.length)
+ self.assertEqual([1,2], rv.list)
+
+ def test_add_new_types(self):
+ counter_type = '''["ip4_fib_counter",
+ ["u32", "address"],
+ ["u8", "address_length"],
+ ["u64", "packets"],
+ ["u64", "bytes"],
+ {"crc" : "0xb2739495"}
+ ]'''
+
+ with_type_msg = '''["with_type_msg",
+ ["u32", "length"],
+ ["u16", "list", 0, "length"],
+ ["vl_api_ip4_fib_counter_t", "counter"]
+ ]'''
+
+ # Add new type
+ msglist = VPP(testmode=True)
+ p = json.loads(counter_type)
+ msglist.add_type(p[0], p[1:])
+ p = json.loads(with_type_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length' : 2, 'list' : [1,2],
+ 'counter' : { 'address' : 4, 'address_length' : 12,
+ 'packets': 1235, 'bytes' : 5678}})
+ self.assertEqual(29, len(b)) # feil
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(2, rv.length)
+ self.assertEqual(5678, rv.counter.bytes)
+
+ def test_add_new_compound_type_with_array(self):
+ counter_type = '''["ip4_fib_counter",
+ ["u32", "address"],
+ ["u8", "address_length"],
+ ["u64", "packets"],
+ ["u64", "bytes"],
+ {"crc" : "0xb2739495"}
+ ]'''
+
+ with_type_msg = '''["with_type_msg",
+ ["u32", "length"],
+ ["u16", "list", 0, "length"],
+ ["vl_api_ip4_fib_counter_t", "counter", 2]
+
+ ]'''
+
+ # Add new type
+ msglist = VPP(testmode=True)
+ p = json.loads(counter_type)
+ msglist.add_type(p[0], p[1:])
+ p = json.loads(with_type_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length' : 2, 'list' : [1,2],
+ 'counter' : [{ 'address' : 4, 'address_length' : 12,
+ 'packets': 1235, 'bytes' : 5678},
+ { 'address' : 111, 'address_length' : 222,
+ 'packets': 333, 'bytes' : 444}]})
+ self.assertEqual(50, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual([1,2], rv.list)
+ self.assertEqual(1235, rv.counter[0].packets)
+
+ with_type_variable_msg = '''["with_type_variable_msg",
+ ["u32", "length"],
+ ["vl_api_ip4_fib_counter_t", "counter", 0, "length"]
+
+ ]'''
+
+ p = json.loads(with_type_variable_msg)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length' : 2,
+ 'counter' : [{ 'address' : 4, 'address_length' : 12,
+ 'packets': 1235, 'bytes' : 5678},
+ { 'address' : 111, 'address_length' : 222,
+ 'packets': 333, 'bytes' : 444}]})
+ self.assertEqual(46, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(2, rv.length)
+ self.assertEqual(1235, rv.counter[0].packets)
+ self.assertEqual(333, rv.counter[1].packets)
+
+ def test_simple_array(self):
+ msglist = VPP(testmode=True)
+
+ simple_byte_array = '''["simple_byte_array",
+ ["u32", "length"],
+ ["u8", "namecommand", 64]
+
+ ]'''
+ p = json.loads(simple_byte_array)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length': 2, 'namecommand': 'foobar'})
+ self.assertEqual(68, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(2, rv.length)
+
+ simple_array = '''["simple_array",
+ ["u32", "length"],
+ ["u32", "list", 2]
+
+ ]'''
+ p = json.loads(simple_array)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length': 2, 'list': [1,2]})
+ self.assertEqual(12, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(2, rv.length)
+ self.assertEqual([1,2], rv.list)
+
+ simple_variable_array = '''["simple_variable_array",
+ ["u32", "length"],
+ ["u32", "list", 0, "length"]
+
+ ]'''
+ p = json.loads(simple_variable_array)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length':2, 'list': [1,2]})
+ self.assertEqual(12, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(2, rv.length)
+ self.assertEqual([1,2], rv.list)
+
+ simple_variable_byte_array = '''["simple_variable_byte_array",
+ ["u32", "length"],
+ ["u8", "list", 0, "length"]
+ ]'''
+ p = json.loads(simple_variable_byte_array)
+ msgdef =msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length': 6, 'list' : 'foobar'})
+ self.assertEqual(10, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(6, rv.length)
+ self.assertEqual('foobar', rv.list)
+
+ def test_old_vla_array(self):
+ msglist = VPP(testmode = True)
+
+ # VLA
+ vla_byte_array = '''["vla_byte_array",
+ ["u32", "foobar"],
+ ["u32", "list", 2],
+ ["u32", "propercount"],
+ ["u8", "propermask", 0, "propercount"],
+ ["u8", "oldmask", 0],
+ {"crc" : "0xb2739495"}
+ ]'''
+ p = json.loads(vla_byte_array)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'list' : [123, 456], 'oldmask': b'foobar',
+ 'propercount' : 2,
+ 'propermask' : [8,9]})
+ self.assertEqual(24, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(b'foobar', rv.oldmask)
+
+ def test_old_vla_array_not_last_member(self):
+ msglist = VPP(testmode = True)
+
+ # VLA
+ vla_byte_array = '''["vla_byte_array",
+ ["u8", "oldmask", 0],
+ ["u32", "foobar"],
+ {"crc" : "0xb2739495"}
+ ]'''
+ p = json.loads(vla_byte_array)
+ self.assertRaises(ValueError, msglist.add_message, p[0], p[1:])
+
+ def test_old_vla_array_u32(self):
+ msglist = VPP(testmode = True)
+
+ # VLA
+ vla_byte_array = '''["vla_byte_array",
+ ["u32", "foobar"],
+ ["u32", "oldmask", 0],
+ {"crc" : "0xb2739495"}
+ ]'''
+ p = json.loads(vla_byte_array)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'foobar' : 123, 'oldmask': [123, 456, 789]})
+ self.assertEqual(16, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual([123, 456, 789], rv.oldmask)
+
+ def test_old_vla_array_compound(self):
+ msglist = VPP(testmode = True)
+
+ # VLA
+ counter_type = '''["ip4_fib_counter",
+ ["u32", "address"],
+ ["u8", "address_length"],
+ ["u64", "packets"],
+ ["u64", "bytes"],
+ {"crc" : "0xb2739495"}
+ ]'''
+
+ vla_byte_array = '''["vla_byte_array",
+ ["vl_api_ip4_fib_counter_t", "counter", 0],
+ {"crc" : "0xb2739495"}
+ ]'''
+
+ p = json.loads(counter_type)
+ msglist.add_type(p[0], p[1:])
+
+ p = json.loads(vla_byte_array)
+ with self.assertRaises(NotImplementedError):
+ msgdef = msglist.add_message(p[0], p[1:])
+
+ def test_array_count_not_previous(self):
+ msglist = VPP(testmode = True)
+
+ # VLA
+ vla_byte_array = '''["vla_byte_array",
+ ["u32", "count"],
+ ["u32", "filler"],
+ ["u32", "lst", 0, "count"],
+ {"crc" : "0xb2739495"}
+ ]'''
+
+ p = json.loads(vla_byte_array)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'count': 3, 'lst': [1,2,3], 'filler' : 1 })
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(rv.lst, [1,2,3])
+
+ def test_argument_name(self):
+ msglist = VPP(testmode=True)
+
+
+ simple_name = '''["simple_name",
+ ["u32", "length"],
+ ["u8", "name"]
+ ]'''
+ p = json.loads(simple_name)
+ msgdef = msglist.add_message(p[0], p[1:])
+ b = msglist.encode(msgdef, {'length': 6, 'name': 1})
+ self.assertEqual(5, len(b))
+ rv = msglist.decode(msgdef, b)
+ self.assertEqual(6, rv.length)
+ self.assertEqual(1, rv.name)
+
+class TestConnectedPAPI(unittest.TestCase):
+ def test_request_reply_function(self):
+ vpp = VPP(jsonfiles)
+
+ vpp.connect('test_vpp_papi2')
+
+ rv = vpp.show_version()
+ self.assertEqual(0, rv.retval)
+ self.assertEqual('vpe', rv.program.decode().rstrip('\0x00'))
+ vpp.disconnect()
+
+
+ def test_dump_details_function(self):
+ vpp = VPP(jsonfiles)
+ vpp.connect('test_vpp_papi3')
+
+ rv = vpp.sw_interface_dump()
+ #self.assertEqual(0, rv.retval)
+ print('RV', rv)
+ vpp.disconnect()
+
+ def test_vla(self):
+ vpp = VPP(jsonfiles)
+
+ vpp.connect('test_vpp_papi3')
+
+ cmd = 'show version verbose'
+ rv = vpp.cli_inband(length=len(cmd), cmd=cmd)
+ self.assertEqual(0, rv.retval)
+ print('RV', rv.reply)
+
+ cmd = 'show vlib graph'
+ rv = vpp.cli_inband(length=len(cmd), cmd=cmd)
+ self.assertEqual(0, rv.retval)
+ print('RV', rv.reply)
+ vpp.disconnect()
+
+ def test_events(self):
+ vpp = VPP(jsonfiles)
+
+ vpp.connect('test_vpp_papi3')
+
+ vpp.register_event_callback(event_handler)
+
+ rv = vpp.want_interface_events(enable_disable = True)
+ self.assertEqual(0, rv.retval)
+ print('RV', rv)
+
+ rv = vpp.create_loopback()
+ print('RV', rv)
+ self.assertEqual(0, rv.retval)
+
+ rv = vpp.sw_interface_set_flags(sw_if_index = 1, admin_up_down = 1)
+ print('RV', rv)
+ self.assertEqual(0, rv.retval)
+ rv = vpp.sw_interface_set_flags(sw_if_index = 1, admin_up_down = 0)
+ print('RV', rv)
+ self.assertEqual(0, rv.retval)
+ self.assertEqual(papi_event.wait(10), True)
+
+ vpp.disconnect()
+
+def event_handler(msgname, result):
+ print('IN EVENT HANDLER:', msgname, result)
+ papi_event.set()
+
+class TestACL(unittest.TestCase):
+ def test_acl_create(self):
+ vpp = VPP(jsonfiles)
+
+ vpp.connect('acl-test')
+
+ rv = vpp.acl_plugin_get_version()
+ print('RV', rv)
+ self.assertEqual(rv.major, 1)
+ self.assertEqual(rv.minor, 1)
+
+ rv = vpp.acl_add_replace(acl_index = 0xFFFFFFFF,
+ r = [{
+ "is_permit" : 1,
+ "is_ipv6" : 0,
+ "proto" : 6,
+ "srcport_or_icmptype_first" : 80,
+ }],
+ count = 1)
+ print ('RV', rv)
+ rv = vpp.acl_add_replace(acl_index = 0xFFFFFFFF,
+ r = [{
+ "is_permit" : 1,
+ "is_ipv6" : 0,
+ "proto" : 6,
+ "srcport_or_icmptype_first" : 81,
+ }],
+ count = 1)
+ self.assertEqual(rv.retval, 0)
+ print ('RV', rv)
+ ai = rv.acl_index
+ rv = vpp.acl_dump()
+ print ('RV', rv)
+
+ #rv = vpp.acl_del(acl_index = ai)
+ #self.assertEqual(rv.retval, 0)
+
+ #rv = vpp.acl_dump()
+ #self.assertEqual([], vpp.acl_dump())
+
+ vpp.disconnect()
+
+ def test_status(self):
+ vpp = VPP(jsonfiles)
+ vpp.status()
+
+ def test_acl_interface_get(self):
+ vpp = VPP(jsonfiles)
+
+ vpp.connect('test_vpp_papi2')
+
+ rv = vpp.macip_acl_interface_get()
+
+ print('RV', rv)
+
+ vpp.disconnect()
+
+if __name__ == '__main__':
+ unittest.main()