summaryrefslogtreecommitdiffstats
path: root/src/vpp-api/python/vpp_papi
diff options
context:
space:
mode:
authorOle Troan <ot@cisco.com>2017-03-06 23:51:57 +0100
committerDamjan Marion <dmarion.lists@gmail.com>2017-03-07 12:12:14 +0000
commitdfc9b7cac857a3a49555f9fc448bd2c6aa3400a6 (patch)
tree2b7636eeb0eec6dcce00e1d6d8fa2ea976da0cbf /src/vpp-api/python/vpp_papi
parent9c6a613feb2d718c5756cecbcd3ab84156241db2 (diff)
Python API: Synchronous mode.
Change-Id: Ic8f186dbb35bb4e2e191d311cab51315a88a2d81 Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/vpp-api/python/vpp_papi')
-rw-r--r--src/vpp-api/python/vpp_papi/pneum_wrap.c39
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_papi.py194
2 files changed, 121 insertions, 112 deletions
diff --git a/src/vpp-api/python/vpp_papi/pneum_wrap.c b/src/vpp-api/python/vpp_papi/pneum_wrap.c
index 748b96744aa..c5a7eea1e00 100644
--- a/src/vpp-api/python/vpp_papi/pneum_wrap.c
+++ b/src/vpp-api/python/vpp_papi/pneum_wrap.c
@@ -42,19 +42,19 @@ wrap_pneum_callback (unsigned char * data, int len)
}
static PyObject *
-wrap_connect (PyObject *self, PyObject *args)
+wrap_connect (PyObject *self, PyObject *args, PyObject *kw)
{
char * name, * chroot_prefix = NULL;
- int rx_qlen=32; /* default rx queue length */
+ int rx_qlen = 32; /* default rx queue length */
int rv;
PyObject * temp = NULL;
pneum_callback_t cb = NULL;
- if (!PyArg_ParseTuple(args, "s|Ois:wrap_connect",
- &name, &temp, &rx_qlen, &chroot_prefix))
+ if (!PyArg_ParseTuple(args, "sOzi:wrap_connect",
+ &name, &temp, &chroot_prefix, &rx_qlen))
return (NULL);
- if (temp)
+ if (temp != Py_None)
{
if (!PyCallable_Check(temp))
{
@@ -82,6 +82,7 @@ wrap_disconnect (PyObject *self, PyObject *args)
Py_END_ALLOW_THREADS
return PyLong_FromLong(rv);
}
+
static PyObject *
wrap_write (PyObject *self, PyObject *args)
{
@@ -90,6 +91,7 @@ wrap_write (PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "s#", &data, &len))
return NULL;
+
Py_BEGIN_ALLOW_THREADS
rv = pneum_write(data, len);
Py_END_ALLOW_THREADS
@@ -102,9 +104,12 @@ wrap_read (PyObject *self, PyObject *args)
{
char *data;
int len, rv;
+ unsigned short timeout;
+ if (!PyArg_ParseTuple(args, "H", &timeout))
+ return (NULL);
Py_BEGIN_ALLOW_THREADS
- rv = pneum_read(&data, &len);
+ rv = pneum_read(&data, &len, timeout);
Py_END_ALLOW_THREADS
if (rv != 0) { Py_RETURN_NONE; }
@@ -113,9 +118,9 @@ wrap_read (PyObject *self, PyObject *args)
#else
PyObject *ret = Py_BuildValue("s#", data, len);
#endif
+ pneum_free(data);
if (!ret) { Py_RETURN_NONE; }
- pneum_free(data);
return ret;
}
@@ -147,12 +152,32 @@ wrap_msg_table (PyObject *self, PyObject *args)
Py_RETURN_NONE;
}
+static PyObject *
+wrap_suspend (PyObject *self, PyObject *args)
+{
+ Py_BEGIN_ALLOW_THREADS
+ pneum_rx_suspend();
+ Py_END_ALLOW_THREADS
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+wrap_resume (PyObject *self, PyObject *args)
+{
+ Py_BEGIN_ALLOW_THREADS
+ pneum_rx_resume();
+ Py_END_ALLOW_THREADS
+ Py_RETURN_NONE;
+}
+
static PyMethodDef vpp_api_Methods[] = {
{"connect", wrap_connect, METH_VARARGS, "Connect to the VPP API."},
{"disconnect", wrap_disconnect, METH_VARARGS, "Disconnect from the VPP API."},
{"write", wrap_write, METH_VARARGS, "Write data to the VPP API."},
{"read", wrap_read, METH_VARARGS, "Read data from the VPP API."},
{"msg_table", wrap_msg_table, METH_VARARGS, "Get API dictionary."},
+ {"suspend", wrap_suspend, METH_VARARGS, "Suspend RX thread."},
+ {"resume", wrap_resume, METH_VARARGS, "Resume RX thread."},
{NULL, NULL, 0, NULL} /* Sentinel */
};
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 83247ffa04b..0c40f1710e5 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -16,7 +16,7 @@
from __future__ import print_function
import sys, os, logging, collections, struct, json, threading, glob
-import atexit
+import atexit, Queue
logging.basicConfig(level=logging.DEBUG)
import vpp_api
@@ -57,7 +57,7 @@ class VPP():
provides a means to register a callback function to receive
these messages in a background thread.
"""
- def __init__(self, apifiles = None, testmode = False):
+ def __init__(self, apifiles = None, testmode = False, async_thread = True):
"""Create a VPP API object.
apifiles is a list of files containing API
@@ -72,11 +72,15 @@ class VPP():
self.buffersize = 10000
self.connected = False
self.header = struct.Struct('>HI')
- self.results_lock = threading.Lock()
- self.results = {}
- self.timeout = 5
self.apifiles = []
self.event_callback = None
+ self.message_queue = Queue.Queue()
+ self.read_timeout = 0
+ self.vpp_api = vpp_api
+ if async_thread:
+ self.event_thread = threading.Thread(target=self.thread_msg_handler)
+ self.event_thread.daemon = True
+ self.event_thread.start()
if not apifiles:
# Pick up API definitions from default directory
@@ -346,7 +350,7 @@ class VPP():
f = self.make_function(name, i, msgdef, multipart, async)
setattr(self._api, name, FuncWrapper(f))
- # olf API stuff starts here - will be removed in 17.07
+ # old API stuff starts here - will be removed in 17.07
if hasattr(self, name):
raise NameError(
3, "Conflicting name in JSON definition: `%s'" % name)
@@ -359,6 +363,12 @@ class VPP():
raise IOError(1, 'Not connected')
return vpp_api.write(str(buf))
+ def _read (self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+
+ return vpp_api.read(self.read_timeout)
+
def _load_dictionary(self):
self.vpp_dictionary = {}
self.vpp_dictionary_maxid = 0
@@ -372,6 +382,19 @@ class VPP():
self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
+ def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
+ rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen)
+ if rv != 0:
+ raise IOError(2, 'Connect failed')
+ self.connected = True
+
+ self._load_dictionary()
+ self._register_functions(async=async)
+
+ # Initialise control ping
+ self.control_ping_index = self.vpp_dictionary['control_ping']['id']
+ self.control_ping_msgdef = self.messages['control_ping']
+
def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
"""Attach to VPP.
@@ -381,22 +404,22 @@ class VPP():
rx_qlen - the length of the VPP message receive queue between
client and server.
"""
- msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
- if chroot_prefix is not None:
- rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
- else:
- rv = vpp_api.connect(name, msg_handler, rx_qlen)
+ msg_handler = self.msg_handler_sync if not async \
+ else self.msg_handler_async
+ return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
+ async)
- if rv != 0:
- raise IOError(2, 'Connect failed')
- self.connected = True
+ def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32):
+ """Attach to VPP in synchronous mode. Application must poll for events.
- self._load_dictionary()
- self._register_functions(async=async)
+ name - the name of the client.
+ chroot_prefix - if VPP is chroot'ed, the prefix of the jail
+ rx_qlen - the length of the VPP message receive queue between
+ client and server.
+ """
- # Initialise control ping
- self.control_ping_index = self.vpp_dictionary['control_ping']['id']
- self.control_ping_msgdef = self.messages['control_ping']
+ return self.connect_internal(name, None, chroot_prefix, rx_qlen,
+ async=False)
def disconnect(self):
"""Detach from VPP."""
@@ -404,56 +427,6 @@ class VPP():
self.connected = False
return rv
- def results_wait(self, context):
- """In a sync call, wait for the reply
-
- The context ID is used to pair reply to request.
- """
-
- # Results is filled by the background callback. It will
- # raise the event when the context receives a response.
- # Given there are two threads we have to be careful with the
- # use of results and the structures under it, hence the lock.
- with self.results_lock:
- result = self.results[context]
- ev = result['e']
-
- timed_out = not ev.wait(self.timeout)
-
- if timed_out:
- raise IOError(3, 'Waiting for reply timed out')
- else:
- with self.results_lock:
- result = self.results[context]
- del self.results[context]
- return result['r']
-
- def results_prepare(self, context, multi=False):
- """Prep for receiving a result in response to a request msg
-
- context - unique context number sent in request and
- returned in reply or replies
- multi - true if we expect multiple messages from this
- reply.
- """
-
- # The event is used to indicate that all results are in
- new_result = {
- 'e': threading.Event(),
- }
- if multi:
- # Make it clear to the BG thread it's going to see several
- # messages; messages are stored in a results array
- new_result['m'] = True
- new_result['r'] = []
-
- new_result['e'].clear()
-
- # Put the prepped result structure into results, at which point
- # the bg thread can also access it (hence the thread lock)
- with self.results_lock:
- self.results[context] = new_result
-
def msg_handler_sync(self, msg):
"""Process an incoming message from VPP in sync mode.
@@ -473,32 +446,9 @@ class VPP():
if context == 0:
# No context -> async notification that we feed to the callback
- if self.event_callback:
- self.event_callback(msgname, r)
+ self.message_queue.put_nowait(r)
else:
- # Context -> use the results structure (carefully) to find
- # who we're responding to and return the message to that
- # thread
- with self.results_lock:
- if context not in self.results:
- eprint('Not expecting results for this context', context, r)
- else:
- result = self.results[context]
-
- #
- # Collect results until control ping
- #
-
- if msgname == 'control_ping_reply':
- # End of a multipart
- result['e'].set()
- elif 'm' in self.results[context]:
- # One element in a multipart
- result['r'].append(r)
- else:
- # All of a single result
- result['r'] = r
- result['e'].set()
+ raise IOError(2, 'RPC reply message received in event handler')
def decode_incoming_msg(self, msg):
if not msg:
@@ -556,16 +506,16 @@ class VPP():
no response within the timeout window.
"""
- # We need a context if not supplied, in order to get the
- # response
- context = kwargs.get('context', self.get_context())
- kwargs['context'] = context
-
- # Set up to receive a response
- self.results_prepare(context, multi=multipart)
+ if not 'context' in kwargs:
+ context = self.get_context()
+ kwargs['context'] = context
+ else:
+ context = kwargs['context']
+ kwargs['_vl_msg_id'] = i
+ b = self.encode(msgdef, kwargs)
- # Output the message
- self._call_vpp_async(i, msgdef, **kwargs)
+ vpp_api.suspend()
+ self._write(b)
if multipart:
# Send a ping after the request - we use its response
@@ -573,9 +523,30 @@ class VPP():
self._control_ping(context)
# Block until we get a reply.
- r = self.results_wait(context)
+ rl = []
+ while (True):
+ msg = self._read()
+ if not msg:
+ print('PNEUM ERROR: OH MY GOD')
+ raise IOError(2, 'PNEUM read failed')
+
+ r = self.decode_incoming_msg(msg)
+ msgname = type(r).__name__
+ if not context in r or r.context == 0 or context != r.context:
+ self.message_queue.put_nowait(r)
+ continue
- return r
+ if not multipart:
+ rl = r
+ break
+ if msgname == 'control_ping_reply':
+ break
+
+ rl.append(r)
+
+ vpp_api.resume()
+
+ return rl
def _call_vpp_async(self, i, msgdef, **kwargs):
"""Given a message, send the message and await a reply.
@@ -613,3 +584,16 @@ class VPP():
callback.
"""
self.event_callback = callback
+
+ def thread_msg_handler(self):
+ """Python thread calling the user registerd message handler.
+
+ This is to emulate the old style event callback scheme. Modern
+ clients should provide their own thread to poll the event
+ queue.
+ """
+ while True:
+ r = self.message_queue.get()
+ msgname = type(r).__name__
+ if self.event_callback:
+ self.event_callback(msgname, r)