From 94495f2a6a68ac2202b7715ce09620f1ba6fe673 Mon Sep 17 00:00:00 2001 From: Ole Troan Date: Thu, 2 Aug 2018 11:58:12 +0200 Subject: PAPI: Use UNIX domain sockets instead of shared memory Adds support for running the API purely across Unix domain sockets. Usage: vpp = VPP(use_socket=True) Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e Signed-off-by: Ole Troan --- src/vpp-api/python/vpp_papi/vpp_papi.py | 146 +++++------------ src/vpp-api/python/vpp_papi/vpp_serializer.py | 2 +- src/vpp-api/python/vpp_papi/vpp_transport_shmem.py | 117 ++++++++++++++ .../python/vpp_papi/vpp_transport_socket.py | 176 +++++++++++++++++++++ 4 files changed, 331 insertions(+), 110 deletions(-) create mode 100644 src/vpp-api/python/vpp_papi/vpp_transport_shmem.py create mode 100644 src/vpp-api/python/vpp_papi/vpp_transport_socket.py (limited to 'src/vpp-api/python/vpp_papi') diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 4f765ecbd18..e1a7059f317 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -26,8 +26,6 @@ import threading import fnmatch import weakref import atexit -from cffi import FFI -import cffi from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes from . vpp_serializer import VPPMessage @@ -36,41 +34,15 @@ if sys.version[0] == '2': else: import queue as queue -ffi = FFI() -ffi.cdef(""" -typedef void (*vac_callback_t)(unsigned char * data, int len); -typedef void (*vac_error_callback_t)(void *, unsigned char *, int); -int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb, - int rx_qlen); -int vac_disconnect(void); -int vac_read(char **data, int *l, unsigned short timeout); -int vac_write(char *data, int len); -void vac_free(void * msg); - -int vac_get_msg_index(unsigned char * name); -int vac_msg_table_size(void); -int vac_msg_table_max_index(void); - -void vac_rx_suspend (void); -void vac_rx_resume (void); -void vac_set_error_handler(vac_error_callback_t); - """) - -# Barfs on failure, no need to check success. -vpp_api = ffi.dlopen('libvppapiclient.so') - def vpp_atexit(vpp_weakref): """Clean up VPP connection on shutdown.""" vpp_instance = vpp_weakref() - if vpp_instance and vpp_instance.connected: + if vpp_instance and vpp_instance.transport.connected: vpp_instance.logger.debug('Cleaning up VPP on exit') vpp_instance.disconnect() -vpp_object = None - - def vpp_iterator(d): if sys.version[0] == '2': return d.iteritems() @@ -78,21 +50,6 @@ def vpp_iterator(d): return d.items() -@ffi.callback("void(unsigned char *, int)") -def vac_callback_sync(data, len): - vpp_object.msg_handler_sync(ffi.buffer(data, len)) - - -@ffi.callback("void(unsigned char *, int)") -def vac_callback_async(data, len): - vpp_object.msg_handler_async(ffi.buffer(data, len)) - - -@ffi.callback("void(void *, unsigned char *, int)") -def vac_error_handler(arg, msg, msg_len): - vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len)) - - class VppApiDynamicMethodHolder(object): pass @@ -168,7 +125,8 @@ class VPP(): def __init__(self, apifiles=None, testmode=False, async_thread=True, logger=logging.getLogger('vpp_papi'), loglevel='debug', - read_timeout=0): + read_timeout=5, use_socket=False, + server_address='/run/vpp-api.sock'): """Create a VPP API object. apifiles is a list of files containing API @@ -181,9 +139,6 @@ class VPP(): loglevel, if supplied, is the log level this logger is set to report at (from the loglevels in the logging module). """ - global vpp_object - vpp_object = self - if logger is None: logger = logging.getLogger(__name__) if loglevel is not None: @@ -193,16 +148,19 @@ class VPP(): self.messages = {} self.id_names = [] self.id_msgdef = [] - self.connected = False self.header = VPPType('header', [['u16', 'msgid'], ['u32', 'client_index']]) self.apifiles = [] self.event_callback = None self.message_queue = queue.Queue() self.read_timeout = read_timeout - self.vpp_api = vpp_api self.async_thread = async_thread + if use_socket: + from . vpp_transport_socket import VppTransport + else: + from . vpp_transport_shmem import VppTransport + if not apifiles: # Pick up API definitions from default directory try: @@ -224,22 +182,11 @@ class VPP(): if len(self.messages) == 0 and not testmode: raise ValueError(1, 'Missing JSON message definitions') + self.transport = VppTransport(self, read_timeout=read_timeout, + server_address=server_address) # Make sure we allow VPP to clean up the message rings. atexit.register(vpp_atexit, weakref.ref(self)) - # Register error handler - if not testmode: - vpp_api.vac_set_error_handler(vac_error_handler) - - # Support legacy CFFI - # from_buffer supported from 1.8.0 - (major, minor, patch) = [int(s) for s in - cffi.__version__.split('.', 3)] - if major >= 1 and minor >= 8: - self._write = self._write_new_cffi - else: - self._write = self._write_legacy_cffi - class ContextId(object): """Thread-safe provider of unique context IDs.""" def __init__(self): @@ -377,11 +324,6 @@ class VPP(): return api_files - def status(self): - """Debug function: report current VPP API status to stdout.""" - print('Connected') if self.connected else print('Not Connected') - print('Read API definitions from', ', '.join(self.apifiles)) - @property def api(self): if not hasattr(self, "_api"): @@ -408,7 +350,7 @@ class VPP(): self._api = VppApiDynamicMethodHolder() for name, msg in vpp_iterator(self.messages): n = name + '_' + msg.crc[2:] - i = vpp_api.vac_get_msg_index(n.encode()) + i = self.transport.get_msg_index(n.encode()) if i > 0: self.id_msgdef[i] = msg self.id_names[i] = name @@ -420,43 +362,19 @@ class VPP(): self.logger.debug( 'No such message type or failed CRC checksum: %s', n) - def _write_new_cffi(self, buf): - """Send a binary-packed message to VPP.""" - if not self.connected: - raise IOError(1, 'Not connected') - return vpp_api.vac_write(ffi.from_buffer(buf), len(buf)) - - def _write_legacy_cffi(self, buf): - """Send a binary-packed message to VPP.""" - if not self.connected: - raise IOError(1, 'Not connected') - return vpp_api.vac_write(bytes(buf), len(buf)) - - def _read(self): - if not self.connected: - raise IOError(1, 'Not connected') - mem = ffi.new("char **") - size = ffi.new("int *") - rv = vpp_api.vac_read(mem, size, self.read_timeout) - if rv: - raise IOError(rv, 'vac_read failed') - msg = bytes(ffi.buffer(mem[0], size[0])) - vpp_api.vac_free(mem[0]) - return msg - def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, do_async): - pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL - rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen) + pfx = chroot_prefix.encode() if chroot_prefix else None + + rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen) if rv != 0: raise IOError(2, 'Connect failed') - self.connected = True - self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index() + self.vpp_dictionary_maxid = self.transport.msg_table_max_index() self._register_functions(do_async=do_async) # Initialise control ping crc = self.messages['control_ping'].crc - self.control_ping_index = vpp_api.vac_get_msg_index( + self.control_ping_index = self.transport.get_msg_index( ('control_ping' + '_' + crc[2:]).encode()) self.control_ping_msgdef = self.messages['control_ping'] if self.async_thread: @@ -475,7 +393,7 @@ class VPP(): rx_qlen - the length of the VPP message receive queue between client and server. """ - msg_handler = vac_callback_sync if not do_async else vac_callback_async + msg_handler = self.transport.get_callback(do_async) return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen, do_async) @@ -488,13 +406,12 @@ class VPP(): client and server. """ - return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen, + return self.connect_internal(name, None, chroot_prefix, rx_qlen, do_async=False) def disconnect(self): """Detach from VPP.""" - rv = vpp_api.vac_disconnect() - self.connected = False + rv = self.transport.disconnect() self.message_queue.put("terminate event thread") return rv @@ -586,10 +503,16 @@ class VPP(): context = kwargs['context'] kwargs['_vl_msg_id'] = i + try: + if self.transport.socket_index: + kwargs['client_index'] = self.transport.socket_index + except AttributeError: + pass self.validate_args(msg, kwargs) b = msg.pack(kwargs) - vpp_api.vac_rx_suspend() - self._write(b) + self.transport.suspend() + + self.transport.write(b) if multipart: # Send a ping after the request - we use its response @@ -599,12 +522,13 @@ class VPP(): # Block until we get a reply. rl = [] while (True): - msg = self._read() + msg = self.transport.read() if not msg: raise IOError(2, 'VPP API client: read failed') r = self.decode_incoming_msg(msg) msgname = type(r).__name__ if context not in r or r.context == 0 or context != r.context: + # Message being queued self.message_queue.put_nowait(r) continue @@ -616,7 +540,7 @@ class VPP(): rl.append(r) - vpp_api.vac_rx_resume() + self.transport.resume() return rl @@ -634,11 +558,15 @@ class VPP(): kwargs['context'] = context else: context = kwargs['context'] - kwargs['client_index'] = 0 + try: + if self.transport.socket_index: + kwargs['client_index'] = self.transport.socket_index + except AttributeError: + kwargs['client_index'] = 0 kwargs['_vl_msg_id'] = i b = msg.pack(kwargs) - self._write(b) + self.transport.write(b) def register_event_callback(self, callback): """Register a callback for async messages. @@ -659,7 +587,7 @@ class VPP(): self.event_callback = callback def thread_msg_handler(self): - """Python thread calling the user registerd message handler. + """Python thread calling the user registered message handler. This is to emulate the old style event callback scheme. Modern clients should provide their own thread to poll the event diff --git a/src/vpp-api/python/vpp_papi/vpp_serializer.py b/src/vpp-api/python/vpp_papi/vpp_serializer.py index 2177cdbb2e4..103a078cd5b 100644 --- a/src/vpp-api/python/vpp_papi/vpp_serializer.py +++ b/src/vpp-api/python/vpp_papi/vpp_serializer.py @@ -80,7 +80,7 @@ class FixedList_u8(): if len(data[offset:]) < self.num: raise ValueError('Invalid array length for "{}" got {}' ' expected {}' - .format(self.name, len(data), self.num)) + .format(self.name, len(data[offset:]), self.num)) return self.packer.unpack(data, offset) diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py new file mode 100644 index 00000000000..a20295b0f09 --- /dev/null +++ b/src/vpp-api/python/vpp_papi/vpp_transport_shmem.py @@ -0,0 +1,117 @@ +# +# A transport class. With two implementations. +# One for socket and one for shared memory. +# + +from cffi import FFI +import cffi + +ffi = FFI() +ffi.cdef(""" +typedef void (*vac_callback_t)(unsigned char * data, int len); +typedef void (*vac_error_callback_t)(void *, unsigned char *, int); +int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb, + int rx_qlen); +int vac_disconnect(void); +int vac_read(char **data, int *l, unsigned short timeout); +int vac_write(char *data, int len); +void vac_free(void * msg); + +int vac_get_msg_index(unsigned char * name); +int vac_msg_table_size(void); +int vac_msg_table_max_index(void); + +void vac_rx_suspend (void); +void vac_rx_resume (void); +void vac_set_error_handler(vac_error_callback_t); + """) + +vpp_object = None + +# Barfs on failure, no need to check success. +vpp_api = ffi.dlopen('libvppapiclient.so') + + +@ffi.callback("void(unsigned char *, int)") +def vac_callback_sync(data, len): + vpp_object.msg_handler_sync(ffi.buffer(data, len)) + + +@ffi.callback("void(unsigned char *, int)") +def vac_callback_async(data, len): + vpp_object.msg_handler_async(ffi.buffer(data, len)) + + +@ffi.callback("void(void *, unsigned char *, int)") +def vac_error_handler(arg, msg, msg_len): + vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len)) + + +class VppTransport: + def __init__(self, parent, read_timeout, server_address): + self.connected = False + self.read_timeout = read_timeout + self.parent = parent + global vpp_object + vpp_object = parent + + # Register error handler + vpp_api.vac_set_error_handler(vac_error_handler) + + # Support legacy CFFI + # from_buffer supported from 1.8.0 + (major, minor, patch) = [int(s) for s in + cffi.__version__.split('.', 3)] + if major >= 1 and minor >= 8: + self.write = self._write_new_cffi + else: + self.write = self._write_legacy_cffi + + def connect(self, name, pfx, msg_handler, rx_qlen): + self.connected = True + if not pfx: + pfx = ffi.NULL + return vpp_api.vac_connect(name, pfx, msg_handler, rx_qlen) + + def disconnect(self): + self.connected = False + vpp_api.vac_disconnect() + + def suspend(self): + vpp_api.vac_rx_suspend() + + def resume(self): + vpp_api.vac_rx_resume() + + def get_callback(self, async): + return vac_callback_sync if not async else vac_callback_async + + def get_msg_index(self, name): + return vpp_api.vac_get_msg_index(name) + + def msg_table_max_index(self): + return vpp_api.vac_msg_table_max_index() + + def _write_new_cffi(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + return vpp_api.vac_write(ffi.from_buffer(buf), len(buf)) + + def _write_legacy_cffi(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + return vpp_api.vac_write(bytes(buf), len(buf)) + + def read(self): + if not self.connected: + raise IOError(1, 'Not connected') + mem = ffi.new("char **") + size = ffi.new("int *") + rv = vpp_api.vac_read(mem, size, self.read_timeout) + if rv: + raise IOError(rv, 'vac_read failed') + msg = bytes(ffi.buffer(mem[0], size[0])) + vpp_api.vac_free(mem[0]) + return msg diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py new file mode 100644 index 00000000000..1822deb6d07 --- /dev/null +++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py @@ -0,0 +1,176 @@ +# +# VPP Unix Domain Socket Transport. +# +import socket +import struct +import threading +import select +import multiprocessing +import logging + + +class VppTransport: + def __init__(self, parent, read_timeout, server_address): + self.connected = False + self.read_timeout = read_timeout if read_timeout > 0 else 1 + self.parent = parent + self.server_address = server_address + self.header = struct.Struct('>QII') + self.message_table = {} + self.sque = multiprocessing.Queue() + self.q = multiprocessing.Queue() + self.message_thread = threading.Thread(target=self.msg_thread_func) + + def msg_thread_func(self): + while True: + try: + rlist, _, _ = select.select([self.socket, + self.sque._reader], [], []) + except socket.error: + # Terminate thread + logging.error('select failed') + self.q.put(None) + return + + for r in rlist: + if r == self.sque._reader: + # Terminate + self.q.put(None) + return + + elif r == self.socket: + try: + msg = self._read() + if not msg: + self.q.put(None) + return + except socket.error: + self.q.put(None) + return + # Put either to local queue or if context == 0 + # callback queue + r = self.parent.decode_incoming_msg(msg) + if hasattr(r, 'context') and r.context > 0: + self.q.put(msg) + else: + self.parent.msg_handler_async(msg) + else: + raise IOError(2, 'Unknown response from select') + + def connect(self, name, pfx, msg_handler, rx_qlen): + + # Create a UDS socket + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + self.socket.settimeout(self.read_timeout) + + # Connect the socket to the port where the server is listening + try: + self.socket.connect(self.server_address) + except socket.error as msg: + logging.error(msg) + raise + + self.connected = True + + # Initialise sockclnt_create + sockclnt_create = self.parent.messages['sockclnt_create'] + sockclnt_create_reply = self.parent.messages['sockclnt_create_reply'] + + args = {'_vl_msg_id': 15, + 'name': name, + 'context': 124} + b = sockclnt_create.pack(args) + self.write(b) + msg = self._read() + hdr, length = self.parent.header.unpack(msg, 0) + if hdr.msgid != 16: + raise IOError('Invalid reply message') + + r, length = sockclnt_create_reply.unpack(msg) + self.socket_index = r.index + for m in r.message_table: + n = m.name.rstrip(b'\x00\x13') + self.message_table[n] = m.index + + self.message_thread.daemon = True + self.message_thread.start() + + return 0 + + def disconnect(self): + try: # Might fail, if VPP closes socket before packet makes it out + rv = self.parent.api.sockclnt_delete(index=self.socket_index) + except IOError: + pass + self.connected = False + self.socket.close() + self.sque.put(True) # Terminate listening thread + self.message_thread.join() + + def suspend(self): + pass + + def resume(self): + pass + + def callback(self): + raise NotImplemented + + def get_callback(self, async): + return self.callback + + def get_msg_index(self, name): + try: + return self.message_table[name] + except KeyError: + return 0 + + def msg_table_max_index(self): + return len(self.message_table) + + def write(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + + # Send header + header = self.header.pack(0, len(buf), 0) + n = self.socket.send(header) + n = self.socket.send(buf) + + def _read(self): + # Header and message + try: + msg = self.socket.recv(4096) + if len(msg) == 0: + return None + except socket.error as message: + logging.error(message) + raise + + (_, l, _) = self.header.unpack(msg[:16]) + + if l > len(msg): + buf = bytearray(l + 16) + view = memoryview(buf) + view[:4096] = msg + view = view[4096:] + # Read rest of message + remaining_bytes = l - 4096 + 16 + while remaining_bytes > 0: + bytes_to_read = (remaining_bytes if remaining_bytes + <= 4096 else 4096) + nbytes = self.socket.recv_into(view, bytes_to_read) + if nbytes == 0: + logging.error('recv failed') + break + view = view[nbytes:] + remaining_bytes -= nbytes + else: + buf = msg + return buf[16:] + + def read(self): + if not self.connected: + raise IOError(1, 'Not connected') + return self.q.get() -- cgit 1.2.3-korg