aboutsummaryrefslogtreecommitdiffstats
path: root/src/vpp-api/python/vpp_papi
diff options
context:
space:
mode:
authorOle Troan <ot@cisco.com>2018-08-02 11:58:12 +0200
committerFlorin Coras <florin.coras@gmail.com>2018-10-02 21:10:20 +0000
commit94495f2a6a68ac2202b7715ce09620f1ba6fe673 (patch)
treeb87604a42f7cacedff6c82f7d8fc68adc8348a5f /src/vpp-api/python/vpp_papi
parent84db4087fa38b8d4c62cbb0787d600950638034c (diff)
PAPI: Use UNIX domain sockets instead of shared memory
Adds support for running the API purely across Unix domain sockets. Usage: vpp = VPP(use_socket=True) Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/vpp-api/python/vpp_papi')
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_papi.py146
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_serializer.py2
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_transport_shmem.py117
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_transport_socket.py176
4 files changed, 331 insertions, 110 deletions
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 4f765ecbd18..e1a7059f317 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -26,8 +26,6 @@ import threading
import fnmatch
import weakref
import atexit
-from cffi import FFI
-import cffi
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage
@@ -36,41 +34,15 @@ if sys.version[0] == '2':
else:
import queue as queue
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
- int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
def vpp_atexit(vpp_weakref):
"""Clean up VPP connection on shutdown."""
vpp_instance = vpp_weakref()
- if vpp_instance and vpp_instance.connected:
+ if vpp_instance and vpp_instance.transport.connected:
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
-vpp_object = None
-
-
def vpp_iterator(d):
if sys.version[0] == '2':
return d.iteritems()
@@ -78,21 +50,6 @@ def vpp_iterator(d):
return d.items()
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
- vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
- vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
- vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
class VppApiDynamicMethodHolder(object):
pass
@@ -168,7 +125,8 @@ class VPP():
def __init__(self, apifiles=None, testmode=False, async_thread=True,
logger=logging.getLogger('vpp_papi'), loglevel='debug',
- read_timeout=0):
+ read_timeout=5, use_socket=False,
+ server_address='/run/vpp-api.sock'):
"""Create a VPP API object.
apifiles is a list of files containing API
@@ -181,9 +139,6 @@ class VPP():
loglevel, if supplied, is the log level this logger is set
to report at (from the loglevels in the logging module).
"""
- global vpp_object
- vpp_object = self
-
if logger is None:
logger = logging.getLogger(__name__)
if loglevel is not None:
@@ -193,16 +148,19 @@ class VPP():
self.messages = {}
self.id_names = []
self.id_msgdef = []
- self.connected = False
self.header = VPPType('header', [['u16', 'msgid'],
['u32', 'client_index']])
self.apifiles = []
self.event_callback = None
self.message_queue = queue.Queue()
self.read_timeout = read_timeout
- self.vpp_api = vpp_api
self.async_thread = async_thread
+ if use_socket:
+ from . vpp_transport_socket import VppTransport
+ else:
+ from . vpp_transport_shmem import VppTransport
+
if not apifiles:
# Pick up API definitions from default directory
try:
@@ -224,22 +182,11 @@ class VPP():
if len(self.messages) == 0 and not testmode:
raise ValueError(1, 'Missing JSON message definitions')
+ self.transport = VppTransport(self, read_timeout=read_timeout,
+ server_address=server_address)
# Make sure we allow VPP to clean up the message rings.
atexit.register(vpp_atexit, weakref.ref(self))
- # Register error handler
- if not testmode:
- vpp_api.vac_set_error_handler(vac_error_handler)
-
- # Support legacy CFFI
- # from_buffer supported from 1.8.0
- (major, minor, patch) = [int(s) for s in
- cffi.__version__.split('.', 3)]
- if major >= 1 and minor >= 8:
- self._write = self._write_new_cffi
- else:
- self._write = self._write_legacy_cffi
-
class ContextId(object):
"""Thread-safe provider of unique context IDs."""
def __init__(self):
@@ -377,11 +324,6 @@ class VPP():
return api_files
- def status(self):
- """Debug function: report current VPP API status to stdout."""
- print('Connected') if self.connected else print('Not Connected')
- print('Read API definitions from', ', '.join(self.apifiles))
-
@property
def api(self):
if not hasattr(self, "_api"):
@@ -408,7 +350,7 @@ class VPP():
self._api = VppApiDynamicMethodHolder()
for name, msg in vpp_iterator(self.messages):
n = name + '_' + msg.crc[2:]
- i = vpp_api.vac_get_msg_index(n.encode())
+ i = self.transport.get_msg_index(n.encode())
if i > 0:
self.id_msgdef[i] = msg
self.id_names[i] = name
@@ -420,43 +362,19 @@ class VPP():
self.logger.debug(
'No such message type or failed CRC checksum: %s', n)
- def _write_new_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
- def _write_legacy_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(bytes(buf), len(buf))
-
- def _read(self):
- if not self.connected:
- raise IOError(1, 'Not connected')
- mem = ffi.new("char **")
- size = ffi.new("int *")
- rv = vpp_api.vac_read(mem, size, self.read_timeout)
- if rv:
- raise IOError(rv, 'vac_read failed')
- msg = bytes(ffi.buffer(mem[0], size[0]))
- vpp_api.vac_free(mem[0])
- return msg
-
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
do_async):
- pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
- rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+ pfx = chroot_prefix.encode() if chroot_prefix else None
+
+ rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
if rv != 0:
raise IOError(2, 'Connect failed')
- self.connected = True
- self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+ self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
self._register_functions(do_async=do_async)
# Initialise control ping
crc = self.messages['control_ping'].crc
- self.control_ping_index = vpp_api.vac_get_msg_index(
+ self.control_ping_index = self.transport.get_msg_index(
('control_ping' + '_' + crc[2:]).encode())
self.control_ping_msgdef = self.messages['control_ping']
if self.async_thread:
@@ -475,7 +393,7 @@ class VPP():
rx_qlen - the length of the VPP message receive queue between
client and server.
"""
- msg_handler = vac_callback_sync if not do_async else vac_callback_async
+ msg_handler = self.transport.get_callback(do_async)
return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
do_async)
@@ -488,13 +406,12 @@ class VPP():
client and server.
"""
- return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+ return self.connect_internal(name, None, chroot_prefix, rx_qlen,
do_async=False)
def disconnect(self):
"""Detach from VPP."""
- rv = vpp_api.vac_disconnect()
- self.connected = False
+ rv = self.transport.disconnect()
self.message_queue.put("terminate event thread")
return rv
@@ -586,10 +503,16 @@ class VPP():
context = kwargs['context']
kwargs['_vl_msg_id'] = i
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ pass
self.validate_args(msg, kwargs)
b = msg.pack(kwargs)
- vpp_api.vac_rx_suspend()
- self._write(b)
+ self.transport.suspend()
+
+ self.transport.write(b)
if multipart:
# Send a ping after the request - we use its response
@@ -599,12 +522,13 @@ class VPP():
# Block until we get a reply.
rl = []
while (True):
- msg = self._read()
+ msg = self.transport.read()
if not msg:
raise IOError(2, 'VPP API client: read failed')
r = self.decode_incoming_msg(msg)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
+ # Message being queued
self.message_queue.put_nowait(r)
continue
@@ -616,7 +540,7 @@ class VPP():
rl.append(r)
- vpp_api.vac_rx_resume()
+ self.transport.resume()
return rl
@@ -634,11 +558,15 @@ class VPP():
kwargs['context'] = context
else:
context = kwargs['context']
- kwargs['client_index'] = 0
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ kwargs['client_index'] = 0
kwargs['_vl_msg_id'] = i
b = msg.pack(kwargs)
- self._write(b)
+ self.transport.write(b)
def register_event_callback(self, callback):
"""Register a callback for async messages.
@@ -659,7 +587,7 @@ class VPP():
self.event_callback = callback
def thread_msg_handler(self):
- """Python thread calling the user registerd message handler.
+ """Python thread calling the user registered message handler.
This is to emulate the old style event callback scheme. Modern
clients should provide their own thread to poll the event
diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py
index 2177cdbb2e4..103a078cd5b 100644
--- a/src/vpp-api/python/vpp_papi/vpp_serializer.py
+++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py
@@ -80,7 +80,7 @@ class FixedList_u8():
if len(data[offset:]) < self.num:
raise ValueError('Invalid array length for "{}" got {}'
' expected {}'
- .format(self.name, len(data), self.num))
+ .format(self.name, len(data[offset:]), self.num))
return self.packer.unpack(data, offset)
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
new file mode 100644
index 00000000000..a20295b0f09
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py
@@ -0,0 +1,117 @@
+#
+# A transport class. With two implementations.
+# One for socket and one for shared memory.
+#
+
+from cffi import FFI
+import cffi
+
+ffi = FFI()
+ffi.cdef("""
+typedef void (*vac_callback_t)(unsigned char * data, int len);
+typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
+int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
+ int rx_qlen);
+int vac_disconnect(void);
+int vac_read(char **data, int *l, unsigned short timeout);
+int vac_write(char *data, int len);
+void vac_free(void * msg);
+
+int vac_get_msg_index(unsigned char * name);
+int vac_msg_table_size(void);
+int vac_msg_table_max_index(void);
+
+void vac_rx_suspend (void);
+void vac_rx_resume (void);
+void vac_set_error_handler(vac_error_callback_t);
+ """)
+
+vpp_object = None
+
+# Barfs on failure, no need to check success.
+vpp_api = ffi.dlopen('libvppapiclient.so')
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_sync(data, len):
+ vpp_object.msg_handler_sync(ffi.buffer(data, len))
+
+
+@ffi.callback("void(unsigned char *, int)")
+def vac_callback_async(data, len):
+ vpp_object.msg_handler_async(ffi.buffer(data, len))
+
+
+@ffi.callback("void(void *, unsigned char *, int)")
+def vac_error_handler(arg, msg, msg_len):
+ vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout
+ self.parent = parent
+ global vpp_object
+ vpp_object = parent
+
+ # Register error handler
+ vpp_api.vac_set_error_handler(vac_error_handler)
+
+ # Support legacy CFFI
+ # from_buffer supported from 1.8.0
+ (major, minor, patch) = [int(s) for s in
+ cffi.__version__.split('.', 3)]
+ if major >= 1 and minor >= 8:
+ self.write = self._write_new_cffi
+ else:
+ self.write = self._write_legacy_cffi
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+ self.connected = True
+ if not pfx:
+ pfx = ffi.NULL
+ return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen)
+
+ def disconnect(self):
+ self.connected = False
+ vpp_api.vac_disconnect()
+
+ def suspend(self):
+ vpp_api.vac_rx_suspend()
+
+ def resume(self):
+ vpp_api.vac_rx_resume()
+
+ def get_callback(self, async):
+ return vac_callback_sync if not async else vac_callback_async
+
+ def get_msg_index(self, name):
+ return vpp_api.vac_get_msg_index(name)
+
+ def msg_table_max_index(self):
+ return vpp_api.vac_msg_table_max_index()
+
+ def _write_new_cffi(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
+
+ def _write_legacy_cffi(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return vpp_api.vac_write(bytes(buf), len(buf))
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ mem = ffi.new("char **")
+ size = ffi.new("int *")
+ rv = vpp_api.vac_read(mem, size, self.read_timeout)
+ if rv:
+ raise IOError(rv, 'vac_read failed')
+ msg = bytes(ffi.buffer(mem[0], size[0]))
+ vpp_api.vac_free(mem[0])
+ return msg
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
new file mode 100644
index 00000000000..1822deb6d07
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
@@ -0,0 +1,176 @@
+#
+# VPP Unix Domain Socket Transport.
+#
+import socket
+import struct
+import threading
+import select
+import multiprocessing
+import logging
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout if read_timeout > 0 else 1
+ self.parent = parent
+ self.server_address = server_address
+ self.header = struct.Struct('>QII')
+ self.message_table = {}
+ self.sque = multiprocessing.Queue()
+ self.q = multiprocessing.Queue()
+ self.message_thread = threading.Thread(target=self.msg_thread_func)
+
+ def msg_thread_func(self):
+ while True:
+ try:
+ rlist, _, _ = select.select([self.socket,
+ self.sque._reader], [], [])
+ except socket.error:
+ # Terminate thread
+ logging.error('select failed')
+ self.q.put(None)
+ return
+
+ for r in rlist:
+ if r == self.sque._reader:
+ # Terminate
+ self.q.put(None)
+ return
+
+ elif r == self.socket:
+ try:
+ msg = self._read()
+ if not msg:
+ self.q.put(None)
+ return
+ except socket.error:
+ self.q.put(None)
+ return
+ # Put either to local queue or if context == 0
+ # callback queue
+ r = self.parent.decode_incoming_msg(msg)
+ if hasattr(r, 'context') and r.context > 0:
+ self.q.put(msg)
+ else:
+ self.parent.msg_handler_async(msg)
+ else:
+ raise IOError(2, 'Unknown response from select')
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+
+ # Create a UDS socket
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ self.socket.settimeout(self.read_timeout)
+
+ # Connect the socket to the port where the server is listening
+ try:
+ self.socket.connect(self.server_address)
+ except socket.error as msg:
+ logging.error(msg)
+ raise
+
+ self.connected = True
+
+ # Initialise sockclnt_create
+ sockclnt_create = self.parent.messages['sockclnt_create']
+ sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
+
+ args = {'_vl_msg_id': 15,
+ 'name': name,
+ 'context': 124}
+ b = sockclnt_create.pack(args)
+ self.write(b)
+ msg = self._read()
+ hdr, length = self.parent.header.unpack(msg, 0)
+ if hdr.msgid != 16:
+ raise IOError('Invalid reply message')
+
+ r, length = sockclnt_create_reply.unpack(msg)
+ self.socket_index = r.index
+ for m in r.message_table:
+ n = m.name.rstrip(b'\x00\x13')
+ self.message_table[n] = m.index
+
+ self.message_thread.daemon = True
+ self.message_thread.start()
+
+ return 0
+
+ def disconnect(self):
+ try: # Might fail, if VPP closes socket before packet makes it out
+ rv = self.parent.api.sockclnt_delete(index=self.socket_index)
+ except IOError:
+ pass
+ self.connected = False
+ self.socket.close()
+ self.sque.put(True) # Terminate listening thread
+ self.message_thread.join()
+
+ def suspend(self):
+ pass
+
+ def resume(self):
+ pass
+
+ def callback(self):
+ raise NotImplemented
+
+ def get_callback(self, async):
+ return self.callback
+
+ def get_msg_index(self, name):
+ try:
+ return self.message_table[name]
+ except KeyError:
+ return 0
+
+ def msg_table_max_index(self):
+ return len(self.message_table)
+
+ def write(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+
+ # Send header
+ header = self.header.pack(0, len(buf), 0)
+ n = self.socket.send(header)
+ n = self.socket.send(buf)
+
+ def _read(self):
+ # Header and message
+ try:
+ msg = self.socket.recv(4096)
+ if len(msg) == 0:
+ return None
+ except socket.error as message:
+ logging.error(message)
+ raise
+
+ (_, l, _) = self.header.unpack(msg[:16])
+
+ if l > len(msg):
+ buf = bytearray(l + 16)
+ view = memoryview(buf)
+ view[:4096] = msg
+ view = view[4096:]
+ # Read rest of message
+ remaining_bytes = l - 4096 + 16
+ while remaining_bytes > 0:
+ bytes_to_read = (remaining_bytes if remaining_bytes
+ <= 4096 else 4096)
+ nbytes = self.socket.recv_into(view, bytes_to_read)
+ if nbytes == 0:
+ logging.error('recv failed')
+ break
+ view = view[nbytes:]
+ remaining_bytes -= nbytes
+ else:
+ buf = msg
+ return buf[16:]
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return self.q.get()