author     Ole Troan <ot@cisco.com>               2018-08-02 11:58:12 +0200
committer  Florin Coras <florin.coras@gmail.com>  2018-10-02 21:10:20 +0000
commit     94495f2a6a68ac2202b7715ce09620f1ba6fe673 (patch)
tree       b87604a42f7cacedff6c82f7d8fc68adc8348a5f /src/vpp-api/python/vpp_papi/vpp_papi.py
parent     84db4087fa38b8d4c62cbb0787d600950638034c (diff)
PAPI: Use UNIX domain sockets instead of shared memory
Adds support for running the API purely across Unix domain sockets.

Usage: vpp = VPP(use_socket=True)

Change-Id: Iafc1301e03dd3edc3f4d702dd6c0b98d3b50b69e
Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/vpp-api/python/vpp_papi/vpp_papi.py')
-rw-r--r--  src/vpp-api/python/vpp_papi/vpp_papi.py | 146
1 file changed, 37 insertions(+), 109 deletions(-)
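
For orientation before the diff: a minimal usage sketch of the socket transport this patch adds, assuming a local VPP listening on the default /run/vpp-api.sock; show_version is just an example call.

    from vpp_papi import VPP

    vpp = VPP(use_socket=True)       # select the Unix-domain-socket transport
    vpp.connect('example-client')    # client name registered with VPP
    print(vpp.api.show_version())    # generated API methods work unchanged
    vpp.disconnect()
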
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 4f765ecbd18..e1a7059f317 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -26,8 +26,6 @@ import threading
import fnmatch
import weakref
import atexit
-from cffi import FFI
-import cffi
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage
@@ -36,41 +34,15 @@ if sys.version[0] == '2':
else:
import queue as queue
-ffi = FFI()
-ffi.cdef("""
-typedef void (*vac_callback_t)(unsigned char * data, int len);
-typedef void (*vac_error_callback_t)(void *, unsigned char *, int);
-int vac_connect(char * name, char * chroot_prefix, vac_callback_t cb,
- int rx_qlen);
-int vac_disconnect(void);
-int vac_read(char **data, int *l, unsigned short timeout);
-int vac_write(char *data, int len);
-void vac_free(void * msg);
-
-int vac_get_msg_index(unsigned char * name);
-int vac_msg_table_size(void);
-int vac_msg_table_max_index(void);
-
-void vac_rx_suspend (void);
-void vac_rx_resume (void);
-void vac_set_error_handler(vac_error_callback_t);
- """)
-
-# Barfs on failure, no need to check success.
-vpp_api = ffi.dlopen('libvppapiclient.so')
-
def vpp_atexit(vpp_weakref):
"""Clean up VPP connection on shutdown."""
vpp_instance = vpp_weakref()
- if vpp_instance and vpp_instance.connected:
+ if vpp_instance and vpp_instance.transport.connected:
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
-vpp_object = None
-
-
def vpp_iterator(d):
if sys.version[0] == '2':
return d.iteritems()
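
The cffi declarations removed above are replaced by a pluggable transport object. Inferred from the call sites elsewhere in this diff (connect, disconnect, read, write, suspend, resume, get_msg_index, msg_table_max_index, get_callback), the surface both vpp_transport_shmem and vpp_transport_socket are expected to expose looks roughly like this sketch; method bodies are placeholders, not the real implementations.

    class VppTransport(object):
        """Sketch only: the interface this file codes against."""
        def __init__(self, parent, read_timeout, server_address):
            self.connected = False          # checked by vpp_atexit()

        def connect(self, name, pfx, msg_handler, rx_qlen): ...
        def disconnect(self): ...
        def suspend(self): ...              # pause the receive path
        def resume(self): ...
        def read(self): ...                 # blocking read, honours read_timeout
        def write(self, buf): ...
        def get_msg_index(self, name): ...  # name is CRC-qualified, bytes
        def msg_table_max_index(self): ...
        def get_callback(self, do_async): ...
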
@@ -78,21 +50,6 @@ def vpp_iterator(d):
return d.items()
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_sync(data, len):
- vpp_object.msg_handler_sync(ffi.buffer(data, len))
-
-
-@ffi.callback("void(unsigned char *, int)")
-def vac_callback_async(data, len):
- vpp_object.msg_handler_async(ffi.buffer(data, len))
-
-
-@ffi.callback("void(void *, unsigned char *, int)")
-def vac_error_handler(arg, msg, msg_len):
- vpp_object.logger.warning("VPP API client:: %s", ffi.string(msg, msg_len))
-
-
class VppApiDynamicMethodHolder(object):
pass
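
With these module-level @ffi.callback functions gone, the vpp_object global removed above is no longer needed: each transport can hand back a callback bound to its own VPP instance. A sketch of what get_callback(do_async), used later in this diff, might return (hypothetical class, not the real transport code):

    class TransportSketch(object):
        def __init__(self, parent):
            self.parent = parent            # the owning VPP instance

        def get_callback(self, do_async):
            # Return a bound method instead of a module-level function,
            # so no global VPP reference is required.
            return (self.parent.msg_handler_async if do_async
                    else self.parent.msg_handler_sync)
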
@@ -168,7 +125,8 @@ class VPP():
def __init__(self, apifiles=None, testmode=False, async_thread=True,
logger=logging.getLogger('vpp_papi'), loglevel='debug',
- read_timeout=0):
+ read_timeout=5, use_socket=False,
+ server_address='/run/vpp-api.sock'):
"""Create a VPP API object.
apifiles is a list of files containing API
@@ -181,9 +139,6 @@ class VPP():
loglevel, if supplied, is the log level this logger is set
to report at (from the loglevels in the logging module).
"""
- global vpp_object
- vpp_object = self
-
if logger is None:
logger = logging.getLogger(__name__)
if loglevel is not None:
@@ -193,16 +148,19 @@ class VPP():
self.messages = {}
self.id_names = []
self.id_msgdef = []
- self.connected = False
self.header = VPPType('header', [['u16', 'msgid'],
['u32', 'client_index']])
self.apifiles = []
self.event_callback = None
self.message_queue = queue.Queue()
self.read_timeout = read_timeout
- self.vpp_api = vpp_api
self.async_thread = async_thread
+ if use_socket:
+ from . vpp_transport_socket import VppTransport
+ else:
+ from . vpp_transport_shmem import VppTransport
+
if not apifiles:
# Pick up API definitions from default directory
try:
@@ -224,22 +182,11 @@ class VPP():
if len(self.messages) == 0 and not testmode:
raise ValueError(1, 'Missing JSON message definitions')
+ self.transport = VppTransport(self, read_timeout=read_timeout,
+ server_address=server_address)
# Make sure we allow VPP to clean up the message rings.
atexit.register(vpp_atexit, weakref.ref(self))
- # Register error handler
- if not testmode:
- vpp_api.vac_set_error_handler(vac_error_handler)
-
- # Support legacy CFFI
- # from_buffer supported from 1.8.0
- (major, minor, patch) = [int(s) for s in
- cffi.__version__.split('.', 3)]
- if major >= 1 and minor >= 8:
- self._write = self._write_new_cffi
- else:
- self._write = self._write_legacy_cffi
-
class ContextId(object):
"""Thread-safe provider of unique context IDs."""
def __init__(self):
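
The ContextId class is truncated by the diff context above. Matching its docstring, a minimal thread-safe implementation could look like this sketch (not necessarily the code in the file):

    import threading   # already imported at the top of vpp_papi.py

    class ContextId(object):
        """Thread-safe provider of unique context IDs."""
        def __init__(self):
            self.context = 0
            self.lock = threading.Lock()

        def __call__(self):
            with self.lock:
                self.context += 1
                return self.context
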
@@ -377,11 +324,6 @@ class VPP():
return api_files
- def status(self):
- """Debug function: report current VPP API status to stdout."""
- print('Connected') if self.connected else print('Not Connected')
- print('Read API definitions from', ', '.join(self.apifiles))
-
@property
def api(self):
if not hasattr(self, "_api"):
@@ -408,7 +350,7 @@ class VPP():
self._api = VppApiDynamicMethodHolder()
for name, msg in vpp_iterator(self.messages):
n = name + '_' + msg.crc[2:]
- i = vpp_api.vac_get_msg_index(n.encode())
+ i = self.transport.get_msg_index(n.encode())
if i > 0:
self.id_msgdef[i] = msg
self.id_names[i] = name
@@ -420,43 +362,19 @@ class VPP():
self.logger.debug(
'No such message type or failed CRC checksum: %s', n)
- def _write_new_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(ffi.from_buffer(buf), len(buf))
-
- def _write_legacy_cffi(self, buf):
- """Send a binary-packed message to VPP."""
- if not self.connected:
- raise IOError(1, 'Not connected')
- return vpp_api.vac_write(bytes(buf), len(buf))
-
- def _read(self):
- if not self.connected:
- raise IOError(1, 'Not connected')
- mem = ffi.new("char **")
- size = ffi.new("int *")
- rv = vpp_api.vac_read(mem, size, self.read_timeout)
- if rv:
- raise IOError(rv, 'vac_read failed')
- msg = bytes(ffi.buffer(mem[0], size[0]))
- vpp_api.vac_free(mem[0])
- return msg
-
def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen,
do_async):
- pfx = chroot_prefix.encode() if chroot_prefix else ffi.NULL
- rv = vpp_api.vac_connect(name.encode(), pfx, msg_handler, rx_qlen)
+ pfx = chroot_prefix.encode() if chroot_prefix else None
+
+ rv = self.transport.connect(name.encode(), pfx, msg_handler, rx_qlen)
if rv != 0:
raise IOError(2, 'Connect failed')
- self.connected = True
- self.vpp_dictionary_maxid = vpp_api.vac_msg_table_max_index()
+ self.vpp_dictionary_maxid = self.transport.msg_table_max_index()
self._register_functions(do_async=do_async)
# Initialise control ping
crc = self.messages['control_ping'].crc
- self.control_ping_index = vpp_api.vac_get_msg_index(
+ self.control_ping_index = self.transport.get_msg_index(
('control_ping' + '_' + crc[2:]).encode())
self.control_ping_msgdef = self.messages['control_ping']
if self.async_thread:
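
Note how the control ping index, like every entry resolved in _register_functions above, is looked up under a CRC-qualified name: the message name plus its CRC with the '0x' prefix stripped. Illustrative values only:

    crc = '0x51077d14'                           # hypothetical CRC for control_ping
    qualified = 'control_ping' + '_' + crc[2:]   # same slicing as in the diff
    assert qualified == 'control_ping_51077d14'
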
@@ -475,7 +393,7 @@ class VPP():
rx_qlen - the length of the VPP message receive queue between
client and server.
"""
- msg_handler = vac_callback_sync if not do_async else vac_callback_async
+ msg_handler = self.transport.get_callback(do_async)
return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
do_async)
@@ -488,13 +406,12 @@ class VPP():
client and server.
"""
- return self.connect_internal(name, ffi.NULL, chroot_prefix, rx_qlen,
+ return self.connect_internal(name, None, chroot_prefix, rx_qlen,
do_async=False)
def disconnect(self):
"""Detach from VPP."""
- rv = vpp_api.vac_disconnect()
- self.connected = False
+ rv = self.transport.disconnect()
self.message_queue.put("terminate event thread")
return rv
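
disconnect() now delegates to the transport and posts a sentinel to stop the event-dispatch thread. A usage sketch (not part of the patch) pairing connect() and disconnect() so teardown happens even on error:

    from vpp_papi import VPP

    vpp = VPP(use_socket=True)
    vpp.connect('example-client')
    try:
        print(vpp.api.show_version())
    finally:
        vpp.disconnect()   # also enqueues "terminate event thread"
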
@@ -586,10 +503,16 @@ class VPP():
context = kwargs['context']
kwargs['_vl_msg_id'] = i
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ pass
self.validate_args(msg, kwargs)
b = msg.pack(kwargs)
- vpp_api.vac_rx_suspend()
- self._write(b)
+ self.transport.suspend()
+
+ self.transport.write(b)
if multipart:
# Send a ping after the request - we use its response
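
The try/except AttributeError a few lines above is duck typing: only the socket transport defines socket_index (the client index VPP assigns over the socket), so with the shared-memory transport the lookup fails and the default is kept. A contrived illustration with stub classes (hypothetical names):

    class SocketTransportStub(object):
        socket_index = 42                 # assigned by VPP on connect

    class ShmemTransportStub(object):
        pass                              # attribute deliberately absent

    def client_index_for(transport, default=0):
        # Mirrors the pattern used in this hunk and again further below.
        try:
            return transport.socket_index or default
        except AttributeError:
            return default

    assert client_index_for(SocketTransportStub()) == 42
    assert client_index_for(ShmemTransportStub()) == 0
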
@@ -599,12 +522,13 @@ class VPP():
# Block until we get a reply.
rl = []
while (True):
- msg = self._read()
+ msg = self.transport.read()
if not msg:
raise IOError(2, 'VPP API client: read failed')
r = self.decode_incoming_msg(msg)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
+ # Message being queued
self.message_queue.put_nowait(r)
continue
@@ -616,7 +540,7 @@ class VPP():
rl.append(r)
- vpp_api.vac_rx_resume()
+ self.transport.resume()
return rl
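
The write/read exchange stays bracketed between transport.suspend() and transport.resume(), pausing the receive path while the synchronous reply is collected. One way to make that bracketing exception-safe would be a small context manager; a hypothetical helper, not something this patch adds:

    from contextlib import contextmanager

    @contextmanager
    def rx_paused(transport):
        # Pause the receive path for the duration of the with-block.
        transport.suspend()
        try:
            yield
        finally:
            transport.resume()

    # usage: with rx_paused(self.transport): ... write request, read replies ...
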
@@ -634,11 +558,15 @@ class VPP():
kwargs['context'] = context
else:
context = kwargs['context']
- kwargs['client_index'] = 0
+ try:
+ if self.transport.socket_index:
+ kwargs['client_index'] = self.transport.socket_index
+ except AttributeError:
+ kwargs['client_index'] = 0
kwargs['_vl_msg_id'] = i
b = msg.pack(kwargs)
- self._write(b)
+ self.transport.write(b)
def register_event_callback(self, callback):
"""Register a callback for async messages.
@@ -659,7 +587,7 @@ class VPP():
self.event_callback = callback
def thread_msg_handler(self):
- """Python thread calling the user registerd message handler.
+ """Python thread calling the user registered message handler.
This is to emulate the old style event callback scheme. Modern
clients should provide their own thread to poll the event
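
The diff is cut off in the middle of this docstring; for illustration, the dispatch loop such a thread runs could be as simple as this sketch (the sentinel string matches the one disconnect() puts on the queue above):

    def thread_msg_handler(self):
        """Sketch only: drain message_queue and call the registered handler."""
        while True:
            r = self.message_queue.get()
            if r == "terminate event thread":   # posted by disconnect()
                break
            if self.event_callback:
                self.event_callback(type(r).__name__, r)
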