diff options
author | Ole Troan <ot@cisco.com> | 2017-03-06 23:51:57 +0100 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2017-03-07 12:12:14 +0000 |
commit | dfc9b7cac857a3a49555f9fc448bd2c6aa3400a6 (patch) | |
tree | 2b7636eeb0eec6dcce00e1d6d8fa2ea976da0cbf /src/vpp-api/python/vpp_papi | |
parent | 9c6a613feb2d718c5756cecbcd3ab84156241db2 (diff) |
Python API: Synchronous mode.
Change-Id: Ic8f186dbb35bb4e2e191d311cab51315a88a2d81
Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/vpp-api/python/vpp_papi')
-rw-r--r-- | src/vpp-api/python/vpp_papi/pneum_wrap.c | 39 | ||||
-rw-r--r-- | src/vpp-api/python/vpp_papi/vpp_papi.py | 194 |
2 files changed, 121 insertions, 112 deletions
diff --git a/src/vpp-api/python/vpp_papi/pneum_wrap.c b/src/vpp-api/python/vpp_papi/pneum_wrap.c index 748b96744aa..c5a7eea1e00 100644 --- a/src/vpp-api/python/vpp_papi/pneum_wrap.c +++ b/src/vpp-api/python/vpp_papi/pneum_wrap.c @@ -42,19 +42,19 @@ wrap_pneum_callback (unsigned char * data, int len) } static PyObject * -wrap_connect (PyObject *self, PyObject *args) +wrap_connect (PyObject *self, PyObject *args, PyObject *kw) { char * name, * chroot_prefix = NULL; - int rx_qlen=32; /* default rx queue length */ + int rx_qlen = 32; /* default rx queue length */ int rv; PyObject * temp = NULL; pneum_callback_t cb = NULL; - if (!PyArg_ParseTuple(args, "s|Ois:wrap_connect", - &name, &temp, &rx_qlen, &chroot_prefix)) + if (!PyArg_ParseTuple(args, "sOzi:wrap_connect", + &name, &temp, &chroot_prefix, &rx_qlen)) return (NULL); - if (temp) + if (temp != Py_None) { if (!PyCallable_Check(temp)) { @@ -82,6 +82,7 @@ wrap_disconnect (PyObject *self, PyObject *args) Py_END_ALLOW_THREADS return PyLong_FromLong(rv); } + static PyObject * wrap_write (PyObject *self, PyObject *args) { @@ -90,6 +91,7 @@ wrap_write (PyObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "s#", &data, &len)) return NULL; + Py_BEGIN_ALLOW_THREADS rv = pneum_write(data, len); Py_END_ALLOW_THREADS @@ -102,9 +104,12 @@ wrap_read (PyObject *self, PyObject *args) { char *data; int len, rv; + unsigned short timeout; + if (!PyArg_ParseTuple(args, "H", &timeout)) + return (NULL); Py_BEGIN_ALLOW_THREADS - rv = pneum_read(&data, &len); + rv = pneum_read(&data, &len, timeout); Py_END_ALLOW_THREADS if (rv != 0) { Py_RETURN_NONE; } @@ -113,9 +118,9 @@ wrap_read (PyObject *self, PyObject *args) #else PyObject *ret = Py_BuildValue("s#", data, len); #endif + pneum_free(data); if (!ret) { Py_RETURN_NONE; } - pneum_free(data); return ret; } @@ -147,12 +152,32 @@ wrap_msg_table (PyObject *self, PyObject *args) Py_RETURN_NONE; } +static PyObject * +wrap_suspend (PyObject *self, PyObject *args) +{ + Py_BEGIN_ALLOW_THREADS + pneum_rx_suspend(); + Py_END_ALLOW_THREADS + Py_RETURN_NONE; +} + +static PyObject * +wrap_resume (PyObject *self, PyObject *args) +{ + Py_BEGIN_ALLOW_THREADS + pneum_rx_resume(); + Py_END_ALLOW_THREADS + Py_RETURN_NONE; +} + static PyMethodDef vpp_api_Methods[] = { {"connect", wrap_connect, METH_VARARGS, "Connect to the VPP API."}, {"disconnect", wrap_disconnect, METH_VARARGS, "Disconnect from the VPP API."}, {"write", wrap_write, METH_VARARGS, "Write data to the VPP API."}, {"read", wrap_read, METH_VARARGS, "Read data from the VPP API."}, {"msg_table", wrap_msg_table, METH_VARARGS, "Get API dictionary."}, + {"suspend", wrap_suspend, METH_VARARGS, "Suspend RX thread."}, + {"resume", wrap_resume, METH_VARARGS, "Resume RX thread."}, {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 83247ffa04b..0c40f1710e5 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -16,7 +16,7 @@ from __future__ import print_function import sys, os, logging, collections, struct, json, threading, glob -import atexit +import atexit, Queue logging.basicConfig(level=logging.DEBUG) import vpp_api @@ -57,7 +57,7 @@ class VPP(): provides a means to register a callback function to receive these messages in a background thread. """ - def __init__(self, apifiles = None, testmode = False): + def __init__(self, apifiles = None, testmode = False, async_thread = True): """Create a VPP API object. apifiles is a list of files containing API @@ -72,11 +72,15 @@ class VPP(): self.buffersize = 10000 self.connected = False self.header = struct.Struct('>HI') - self.results_lock = threading.Lock() - self.results = {} - self.timeout = 5 self.apifiles = [] self.event_callback = None + self.message_queue = Queue.Queue() + self.read_timeout = 0 + self.vpp_api = vpp_api + if async_thread: + self.event_thread = threading.Thread(target=self.thread_msg_handler) + self.event_thread.daemon = True + self.event_thread.start() if not apifiles: # Pick up API definitions from default directory @@ -346,7 +350,7 @@ class VPP(): f = self.make_function(name, i, msgdef, multipart, async) setattr(self._api, name, FuncWrapper(f)) - # olf API stuff starts here - will be removed in 17.07 + # old API stuff starts here - will be removed in 17.07 if hasattr(self, name): raise NameError( 3, "Conflicting name in JSON definition: `%s'" % name) @@ -359,6 +363,12 @@ class VPP(): raise IOError(1, 'Not connected') return vpp_api.write(str(buf)) + def _read (self): + if not self.connected: + raise IOError(1, 'Not connected') + + return vpp_api.read(self.read_timeout) + def _load_dictionary(self): self.vpp_dictionary = {} self.vpp_dictionary_maxid = 0 @@ -372,6 +382,19 @@ class VPP(): self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc } self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i) + def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async): + rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen) + if rv != 0: + raise IOError(2, 'Connect failed') + self.connected = True + + self._load_dictionary() + self._register_functions(async=async) + + # Initialise control ping + self.control_ping_index = self.vpp_dictionary['control_ping']['id'] + self.control_ping_msgdef = self.messages['control_ping'] + def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32): """Attach to VPP. @@ -381,22 +404,22 @@ class VPP(): rx_qlen - the length of the VPP message receive queue between client and server. """ - msg_handler = self.msg_handler_sync if not async else self.msg_handler_async - if chroot_prefix is not None: - rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix) - else: - rv = vpp_api.connect(name, msg_handler, rx_qlen) + msg_handler = self.msg_handler_sync if not async \ + else self.msg_handler_async + return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen, + async) - if rv != 0: - raise IOError(2, 'Connect failed') - self.connected = True + def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32): + """Attach to VPP in synchronous mode. Application must poll for events. - self._load_dictionary() - self._register_functions(async=async) + name - the name of the client. + chroot_prefix - if VPP is chroot'ed, the prefix of the jail + rx_qlen - the length of the VPP message receive queue between + client and server. + """ - # Initialise control ping - self.control_ping_index = self.vpp_dictionary['control_ping']['id'] - self.control_ping_msgdef = self.messages['control_ping'] + return self.connect_internal(name, None, chroot_prefix, rx_qlen, + async=False) def disconnect(self): """Detach from VPP.""" @@ -404,56 +427,6 @@ class VPP(): self.connected = False return rv - def results_wait(self, context): - """In a sync call, wait for the reply - - The context ID is used to pair reply to request. - """ - - # Results is filled by the background callback. It will - # raise the event when the context receives a response. - # Given there are two threads we have to be careful with the - # use of results and the structures under it, hence the lock. - with self.results_lock: - result = self.results[context] - ev = result['e'] - - timed_out = not ev.wait(self.timeout) - - if timed_out: - raise IOError(3, 'Waiting for reply timed out') - else: - with self.results_lock: - result = self.results[context] - del self.results[context] - return result['r'] - - def results_prepare(self, context, multi=False): - """Prep for receiving a result in response to a request msg - - context - unique context number sent in request and - returned in reply or replies - multi - true if we expect multiple messages from this - reply. - """ - - # The event is used to indicate that all results are in - new_result = { - 'e': threading.Event(), - } - if multi: - # Make it clear to the BG thread it's going to see several - # messages; messages are stored in a results array - new_result['m'] = True - new_result['r'] = [] - - new_result['e'].clear() - - # Put the prepped result structure into results, at which point - # the bg thread can also access it (hence the thread lock) - with self.results_lock: - self.results[context] = new_result - def msg_handler_sync(self, msg): """Process an incoming message from VPP in sync mode. @@ -473,32 +446,9 @@ class VPP(): if context == 0: # No context -> async notification that we feed to the callback - if self.event_callback: - self.event_callback(msgname, r) + self.message_queue.put_nowait(r) else: - # Context -> use the results structure (carefully) to find - # who we're responding to and return the message to that - # thread - with self.results_lock: - if context not in self.results: - eprint('Not expecting results for this context', context, r) - else: - result = self.results[context] - - # - # Collect results until control ping - # - - if msgname == 'control_ping_reply': - # End of a multipart - result['e'].set() - elif 'm' in self.results[context]: - # One element in a multipart - result['r'].append(r) - else: - # All of a single result - result['r'] = r - result['e'].set() + raise IOError(2, 'RPC reply message received in event handler') def decode_incoming_msg(self, msg): if not msg: @@ -556,16 +506,16 @@ class VPP(): no response within the timeout window. """ - # We need a context if not supplied, in order to get the - # response - context = kwargs.get('context', self.get_context()) - kwargs['context'] = context - - # Set up to receive a response - self.results_prepare(context, multi=multipart) + if not 'context' in kwargs: + context = self.get_context() + kwargs['context'] = context + else: + context = kwargs['context'] + kwargs['_vl_msg_id'] = i + b = self.encode(msgdef, kwargs) - # Output the message - self._call_vpp_async(i, msgdef, **kwargs) + vpp_api.suspend() + self._write(b) if multipart: # Send a ping after the request - we use its response @@ -573,9 +523,30 @@ class VPP(): self._control_ping(context) # Block until we get a reply. - r = self.results_wait(context) + rl = [] + while (True): + msg = self._read() + if not msg: + print('PNEUM ERROR: OH MY GOD') + raise IOError(2, 'PNEUM read failed') + + r = self.decode_incoming_msg(msg) + msgname = type(r).__name__ + if not context in r or r.context == 0 or context != r.context: + self.message_queue.put_nowait(r) + continue - return r + if not multipart: + rl = r + break + if msgname == 'control_ping_reply': + break + + rl.append(r) + + vpp_api.resume() + + return rl def _call_vpp_async(self, i, msgdef, **kwargs): """Given a message, send the message and await a reply. @@ -613,3 +584,16 @@ class VPP(): callback. """ self.event_callback = callback + + def thread_msg_handler(self): + """Python thread calling the user registerd message handler. + + This is to emulate the old style event callback scheme. Modern + clients should provide their own thread to poll the event + queue. + """ + while True: + r = self.message_queue.get() + msgname = type(r).__name__ + if self.event_callback: + self.event_callback(msgname, r) |