From dfc9b7cac857a3a49555f9fc448bd2c6aa3400a6 Mon Sep 17 00:00:00 2001 From: Ole Troan Date: Mon, 6 Mar 2017 23:51:57 +0100 Subject: Python API: Synchronous mode. Change-Id: Ic8f186dbb35bb4e2e191d311cab51315a88a2d81 Signed-off-by: Ole Troan --- src/vlibmemory/memclnt.api | 17 +- src/vpp-api/python/Makefile.am | 2 +- src/vpp-api/python/pneum/pneum.c | 273 +++++++++++++++++++++++++++---- src/vpp-api/python/pneum/pneum.h | 4 +- src/vpp-api/python/vpp_papi/pneum_wrap.c | 39 ++++- src/vpp-api/python/vpp_papi/vpp_papi.py | 194 ++++++++++------------ 6 files changed, 380 insertions(+), 149 deletions(-) diff --git a/src/vlibmemory/memclnt.api b/src/vlibmemory/memclnt.api index 0532d7b6..c38b483c 100644 --- a/src/vlibmemory/memclnt.api +++ b/src/vlibmemory/memclnt.api @@ -48,14 +48,27 @@ define memclnt_delete_reply { u64 handle; /* in case the client wonders */ }; -/* +/* * Client RX thread exit */ - define rx_thread_exit { u8 dummy; }; +/* + * Client RX thread suspend + */ +define memclnt_rx_thread_suspend { + u8 dummy; +}; + +/* + * Client read timeout + */ +define memclnt_read_timeout { + u8 dummy; +}; + /* * RPC */ diff --git a/src/vpp-api/python/Makefile.am b/src/vpp-api/python/Makefile.am index cd8db4f6..54076822 100644 --- a/src/vpp-api/python/Makefile.am +++ b/src/vpp-api/python/Makefile.am @@ -39,7 +39,7 @@ libpneum_la_LDFLAGS = -module libpneum_la_CPPFLAGS = # TODO: Support both Python 2 and 3. -install-exec-local: +install-exec-local: $(lib_LTLIBRARIES) cd $(srcdir); \ mkdir -p $(pythondir); \ mkdir -p $(pyexecdir); \ diff --git a/src/vpp-api/python/pneum/pneum.c b/src/vpp-api/python/pneum/pneum.c index 37c8d8fe..da9d69df 100644 --- a/src/vpp-api/python/pneum/pneum.c +++ b/src/vpp-api/python/pneum/pneum.c @@ -22,9 +22,7 @@ #include #include #include -#include #include - #include #include #include @@ -35,6 +33,16 @@ #include "pneum.h" +/* + * Asynchronous mode: + * Client registers a callback. All messages are sent to the callback. + * Synchronous mode: + * Client calls blocking read(). + * Clients are expected to collate events on a queue. + * pneum_write() -> suspends RX thread + * pneum_read() -> resumes RX thread + */ + #define vl_typedefs /* define message structures */ #include #undef vl_typedefs @@ -47,15 +55,50 @@ vlib_main_t vlib_global_main; vlib_main_t **vlib_mains; typedef struct { - u8 rx_thread_jmpbuf_valid; u8 connected_to_vlib; - jmp_buf rx_thread_jmpbuf; pthread_t rx_thread_handle; + pthread_t timeout_thread_handle; + pthread_mutex_t queue_lock; + pthread_cond_t suspend_cv; + pthread_cond_t resume_cv; + pthread_mutex_t timeout_lock; + pthread_cond_t timeout_cv; + pthread_cond_t timeout_cancel_cv; + pthread_cond_t terminate_cv; } pneum_main_t; pneum_main_t pneum_main; - pneum_callback_t pneum_callback; +u16 read_timeout = 0; +bool rx_is_running = false; + +static void +init (void) +{ + pneum_main_t *pm = &pneum_main; + memset(pm, 0, sizeof(*pm)); + pthread_mutex_init(&pm->queue_lock, NULL); + pthread_cond_init(&pm->suspend_cv, NULL); + pthread_cond_init(&pm->resume_cv, NULL); + pthread_mutex_init(&pm->timeout_lock, NULL); + pthread_cond_init(&pm->timeout_cv, NULL); + pthread_cond_init(&pm->timeout_cancel_cv, NULL); + pthread_cond_init(&pm->terminate_cv, NULL); +} + +static void +cleanup (void) +{ + pneum_main_t *pm = &pneum_main; + pthread_cond_destroy(&pm->suspend_cv); + pthread_cond_destroy(&pm->resume_cv); + pthread_cond_destroy(&pm->timeout_cv); + pthread_cond_destroy(&pm->timeout_cancel_cv); + pthread_cond_destroy(&pm->terminate_cv); + pthread_mutex_destroy(&pm->queue_lock); + pthread_mutex_destroy(&pm->timeout_lock); + memset (pm, 0, sizeof (*pm)); +} /* * Satisfy external references when -lvlib is not available. @@ -75,11 +118,6 @@ static void pneum_api_handler (void *msg) { u16 id = ntohs(*((u16 *)msg)); - if (id == VL_API_RX_THREAD_EXIT) { - pneum_main_t *pm = &pneum_main; - vl_msg_api_free(msg); - longjmp(pm->rx_thread_jmpbuf, 1); - } msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data)); int l = ntohl(msgbuf->data_len); if (l == 0) @@ -101,16 +139,108 @@ pneum_rx_thread_fn (void *arg) q = am->vl_input_queue; - /* So we can make the rx thread terminate cleanly */ - if (setjmp(pm->rx_thread_jmpbuf) == 0) { - pm->rx_thread_jmpbuf_valid = 1; - while (1) - while (!unix_shared_memory_queue_sub(q, (u8 *)&msg, 0)) - pneum_api_handler((void *)msg); - } + while (1) + while (!unix_shared_memory_queue_sub(q, (u8 *)&msg, 0)) + { + u16 id = ntohs(*((u16 *)msg)); + switch (id) { + case VL_API_RX_THREAD_EXIT: + vl_msg_api_free((void *) msg); + /* signal waiting threads that this thread is about to terminate */ + pthread_mutex_lock(&pm->queue_lock); + pthread_cond_signal(&pm->terminate_cv); + pthread_mutex_unlock(&pm->queue_lock); + pthread_exit(0); + return 0; + break; + + case VL_API_MEMCLNT_RX_THREAD_SUSPEND: + vl_msg_api_free((void * )msg); + /* Suspend thread and signal reader */ + pthread_mutex_lock(&pm->queue_lock); + pthread_cond_signal(&pm->suspend_cv); + /* Wait for the resume signal */ + pthread_cond_wait (&pm->resume_cv, &pm->queue_lock); + pthread_mutex_unlock(&pm->queue_lock); + break; + + case VL_API_MEMCLNT_READ_TIMEOUT: + clib_warning("Received read timeout in async thread\n"); + vl_msg_api_free((void *) msg); + break; + + default: + pneum_api_handler((void *)msg); + } + } +} + +static void * +pneum_timeout_thread_fn (void *arg) +{ + vl_api_memclnt_read_timeout_t *ep; + pneum_main_t *pm = &pneum_main; + api_main_t *am = &api_main; + struct timespec ts; + struct timeval tv; + u16 timeout; + int rv; + + while (1) + { + /* Wait for poke */ + pthread_mutex_lock(&pm->timeout_lock); + pthread_cond_wait (&pm->timeout_cv, &pm->timeout_lock); + timeout = read_timeout; + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + timeout; + ts.tv_nsec = 0; + rv = pthread_cond_timedwait (&pm->timeout_cancel_cv, + &pm->timeout_lock, &ts); + pthread_mutex_unlock(&pm->timeout_lock); + if (rv == ETIMEDOUT) + { + ep = vl_msg_api_alloc (sizeof (*ep)); + ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_READ_TIMEOUT); + vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep); + } + } pthread_exit(0); } +void +pneum_rx_suspend (void) +{ + api_main_t *am = &api_main; + pneum_main_t *pm = &pneum_main; + vl_api_memclnt_rx_thread_suspend_t *ep; + + if (!pm->rx_thread_handle) return; + pthread_mutex_lock(&pm->queue_lock); + if (rx_is_running) + { + ep = vl_msg_api_alloc (sizeof (*ep)); + ep->_vl_msg_id = ntohs(VL_API_MEMCLNT_RX_THREAD_SUSPEND); + vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep); + /* Wait for RX thread to tell us it has suspendend */ + pthread_cond_wait(&pm->suspend_cv, &pm->queue_lock); + rx_is_running = false; + } + pthread_mutex_unlock(&pm->queue_lock); +} + +void +pneum_rx_resume (void) +{ + pneum_main_t *pm = &pneum_main; + if (!pm->rx_thread_handle) return; + pthread_mutex_lock(&pm->queue_lock); + if (rx_is_running) return; + pthread_cond_signal(&pm->resume_cv); + rx_is_running = true; + pthread_mutex_unlock(&pm->queue_lock); +} + uword * pneum_msg_table_get_hash (void) { @@ -126,12 +256,13 @@ pneum_msg_table_size(void) } int -pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb, +pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb, int rx_qlen) { int rv = 0; pneum_main_t *pm = &pneum_main; + init(); if (chroot_prefix != NULL) vl_set_memory_root_path (chroot_prefix); @@ -154,6 +285,16 @@ pneum_connect (char * name, char * chroot_prefix, pneum_callback_t cb, return (-1); } pneum_callback = cb; + rx_is_running = true; + } + + /* Start read timeout thread */ + rv = pthread_create(&pm->timeout_thread_handle, NULL, + pneum_timeout_thread_fn, 0); + if (rv) { + clib_warning("pthread_create returned %d", rv); + vl_client_api_unmap(); + return (-1); } pm->connected_to_vlib = 1; @@ -167,31 +308,69 @@ pneum_disconnect (void) api_main_t *am = &api_main; pneum_main_t *pm = &pneum_main; - if (pm->rx_thread_jmpbuf_valid) { + if (!pm->connected_to_vlib) return 0; + + if (pm->rx_thread_handle) { vl_api_rx_thread_exit_t *ep; uword junk; ep = vl_msg_api_alloc (sizeof (*ep)); ep->_vl_msg_id = ntohs(VL_API_RX_THREAD_EXIT); vl_msg_api_send_shmem(am->vl_input_queue, (u8 *)&ep); - pthread_join(pm->rx_thread_handle, (void **) &junk); - } - if (pm->connected_to_vlib) { - vl_client_disconnect(); - vl_client_api_unmap(); - pneum_callback = 0; + + /* wait (with timeout) until RX thread has finished */ + struct timespec ts; + struct timeval tv; + gettimeofday(&tv, NULL); + ts.tv_sec = tv.tv_sec + 5; + ts.tv_nsec = 0; + pthread_mutex_lock(&pm->queue_lock); + int rv = pthread_cond_timedwait(&pm->terminate_cv, &pm->queue_lock, &ts); + pthread_mutex_unlock(&pm->queue_lock); + /* now join so we wait until thread has -really- finished */ + if (rv == ETIMEDOUT) + pthread_cancel(pm->rx_thread_handle); + else + pthread_join(pm->rx_thread_handle, (void **) &junk); } - memset (pm, 0, sizeof (*pm)); + if (pm->timeout_thread_handle) + pthread_cancel(pm->timeout_thread_handle); + + vl_client_disconnect(); + vl_client_api_unmap(); + pneum_callback = 0; + + cleanup(); return (0); } +static void +set_timeout (unsigned short timeout) +{ + pneum_main_t *pm = &pneum_main; + pthread_mutex_lock(&pm->timeout_lock); + read_timeout = timeout; + pthread_cond_signal(&pm->timeout_cv); + pthread_mutex_unlock(&pm->timeout_lock); +} + +static void +unset_timeout (void) +{ + pneum_main_t *pm = &pneum_main; + pthread_mutex_lock(&pm->timeout_lock); + pthread_cond_signal(&pm->timeout_cancel_cv); + pthread_mutex_unlock(&pm->timeout_lock); +} + int -pneum_read (char **p, int *l) +pneum_read (char **p, int *l, u16 timeout) { unix_shared_memory_queue_t *q; api_main_t *am = &api_main; pneum_main_t *pm = &pneum_main; uword msg; + msgbuf_t *msgbuf; if (!pm->connected_to_vlib) return -1; @@ -199,21 +378,48 @@ pneum_read (char **p, int *l) if (am->our_pid == 0) return (-1); + /* Poke timeout thread */ + if (timeout) + set_timeout(timeout); + q = am->vl_input_queue; int rv = unix_shared_memory_queue_sub(q, (u8 *)&msg, 0); if (rv == 0) { u16 msg_id = ntohs(*((u16 *)msg)); - msgbuf_t *msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data)); - *l = ntohl(msgbuf->data_len); - if (*l == 0) { - printf("Unregistered API message: %d\n", msg_id); - return (-1); + switch (msg_id) { + case VL_API_RX_THREAD_EXIT: + printf("Received thread exit\n"); + return -1; + case VL_API_MEMCLNT_RX_THREAD_SUSPEND: + printf("Received thread suspend\n"); + goto error; + case VL_API_MEMCLNT_READ_TIMEOUT: + printf("Received read timeout %ds\n", timeout); + goto error; + + default: + msgbuf = (msgbuf_t *)(((u8 *)msg) - offsetof(msgbuf_t, data)); + *l = ntohl(msgbuf->data_len); + if (*l == 0) { + printf("Unregistered API message: %d\n", msg_id); + goto error; + } } *p = (char *)msg; + + /* Let timeout notification thread know we're done */ + unset_timeout(); + } else { printf("Read failed with %d\n", rv); } return (rv); + + error: + vl_msg_api_free((void *) msg); + /* Client might forget to resume RX thread on failure */ + pneum_rx_resume (); + return -1; } /* @@ -241,12 +447,13 @@ pneum_write (char *p, int l) if (!pm->connected_to_vlib) return -1; if (!mp) return (-1); + memcpy(mp, p, l); mp->client_index = pneum_client_index(); q = am->shmem_hdr->vl_input_queue; rv = unix_shared_memory_queue_add(q, (u8 *)&mp, 0); if (rv != 0) { - printf("vpe_api_write fails: %d\n", rv); + clib_warning("vpe_api_write fails: %d\n", rv); /* Clear message */ pneum_free(mp); } diff --git a/src/vpp-api/python/pneum/pneum.h b/src/vpp-api/python/pneum/pneum.h index 9312eb47..c4b55ae0 100644 --- a/src/vpp-api/python/pneum/pneum.h +++ b/src/vpp-api/python/pneum/pneum.h @@ -22,11 +22,13 @@ typedef void (*pneum_callback_t)(unsigned char * data, int len); int pneum_connect(char * name, char * chroot_prefix, pneum_callback_t cb, int rx_qlen); int pneum_disconnect(void); -int pneum_read(char **data, int *l); +int pneum_read(char **data, int *l, unsigned short timeout); int pneum_write(char *data, int len); void pneum_free(void * msg); uword * pneum_msg_table_get_hash (void); int pneum_msg_table_size(void); uint32_t pneum_get_msg_index(unsigned char * name); +void pneum_rx_suspend (void); +void pneum_rx_resume (void); #endif diff --git a/src/vpp-api/python/vpp_papi/pneum_wrap.c b/src/vpp-api/python/vpp_papi/pneum_wrap.c index 748b9674..c5a7eea1 100644 --- a/src/vpp-api/python/vpp_papi/pneum_wrap.c +++ b/src/vpp-api/python/vpp_papi/pneum_wrap.c @@ -42,19 +42,19 @@ wrap_pneum_callback (unsigned char * data, int len) } static PyObject * -wrap_connect (PyObject *self, PyObject *args) +wrap_connect (PyObject *self, PyObject *args, PyObject *kw) { char * name, * chroot_prefix = NULL; - int rx_qlen=32; /* default rx queue length */ + int rx_qlen = 32; /* default rx queue length */ int rv; PyObject * temp = NULL; pneum_callback_t cb = NULL; - if (!PyArg_ParseTuple(args, "s|Ois:wrap_connect", - &name, &temp, &rx_qlen, &chroot_prefix)) + if (!PyArg_ParseTuple(args, "sOzi:wrap_connect", + &name, &temp, &chroot_prefix, &rx_qlen)) return (NULL); - if (temp) + if (temp != Py_None) { if (!PyCallable_Check(temp)) { @@ -82,6 +82,7 @@ wrap_disconnect (PyObject *self, PyObject *args) Py_END_ALLOW_THREADS return PyLong_FromLong(rv); } + static PyObject * wrap_write (PyObject *self, PyObject *args) { @@ -90,6 +91,7 @@ wrap_write (PyObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "s#", &data, &len)) return NULL; + Py_BEGIN_ALLOW_THREADS rv = pneum_write(data, len); Py_END_ALLOW_THREADS @@ -102,9 +104,12 @@ wrap_read (PyObject *self, PyObject *args) { char *data; int len, rv; + unsigned short timeout; + if (!PyArg_ParseTuple(args, "H", &timeout)) + return (NULL); Py_BEGIN_ALLOW_THREADS - rv = pneum_read(&data, &len); + rv = pneum_read(&data, &len, timeout); Py_END_ALLOW_THREADS if (rv != 0) { Py_RETURN_NONE; } @@ -113,9 +118,9 @@ wrap_read (PyObject *self, PyObject *args) #else PyObject *ret = Py_BuildValue("s#", data, len); #endif + pneum_free(data); if (!ret) { Py_RETURN_NONE; } - pneum_free(data); return ret; } @@ -147,12 +152,32 @@ wrap_msg_table (PyObject *self, PyObject *args) Py_RETURN_NONE; } +static PyObject * +wrap_suspend (PyObject *self, PyObject *args) +{ + Py_BEGIN_ALLOW_THREADS + pneum_rx_suspend(); + Py_END_ALLOW_THREADS + Py_RETURN_NONE; +} + +static PyObject * +wrap_resume (PyObject *self, PyObject *args) +{ + Py_BEGIN_ALLOW_THREADS + pneum_rx_resume(); + Py_END_ALLOW_THREADS + Py_RETURN_NONE; +} + static PyMethodDef vpp_api_Methods[] = { {"connect", wrap_connect, METH_VARARGS, "Connect to the VPP API."}, {"disconnect", wrap_disconnect, METH_VARARGS, "Disconnect from the VPP API."}, {"write", wrap_write, METH_VARARGS, "Write data to the VPP API."}, {"read", wrap_read, METH_VARARGS, "Read data from the VPP API."}, {"msg_table", wrap_msg_table, METH_VARARGS, "Get API dictionary."}, + {"suspend", wrap_suspend, METH_VARARGS, "Suspend RX thread."}, + {"resume", wrap_resume, METH_VARARGS, "Resume RX thread."}, {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py index 83247ffa..0c40f171 100644 --- a/src/vpp-api/python/vpp_papi/vpp_papi.py +++ b/src/vpp-api/python/vpp_papi/vpp_papi.py @@ -16,7 +16,7 @@ from __future__ import print_function import sys, os, logging, collections, struct, json, threading, glob -import atexit +import atexit, Queue logging.basicConfig(level=logging.DEBUG) import vpp_api @@ -57,7 +57,7 @@ class VPP(): provides a means to register a callback function to receive these messages in a background thread. """ - def __init__(self, apifiles = None, testmode = False): + def __init__(self, apifiles = None, testmode = False, async_thread = True): """Create a VPP API object. apifiles is a list of files containing API @@ -72,11 +72,15 @@ class VPP(): self.buffersize = 10000 self.connected = False self.header = struct.Struct('>HI') - self.results_lock = threading.Lock() - self.results = {} - self.timeout = 5 self.apifiles = [] self.event_callback = None + self.message_queue = Queue.Queue() + self.read_timeout = 0 + self.vpp_api = vpp_api + if async_thread: + self.event_thread = threading.Thread(target=self.thread_msg_handler) + self.event_thread.daemon = True + self.event_thread.start() if not apifiles: # Pick up API definitions from default directory @@ -346,7 +350,7 @@ class VPP(): f = self.make_function(name, i, msgdef, multipart, async) setattr(self._api, name, FuncWrapper(f)) - # olf API stuff starts here - will be removed in 17.07 + # old API stuff starts here - will be removed in 17.07 if hasattr(self, name): raise NameError( 3, "Conflicting name in JSON definition: `%s'" % name) @@ -359,6 +363,12 @@ class VPP(): raise IOError(1, 'Not connected') return vpp_api.write(str(buf)) + def _read (self): + if not self.connected: + raise IOError(1, 'Not connected') + + return vpp_api.read(self.read_timeout) + def _load_dictionary(self): self.vpp_dictionary = {} self.vpp_dictionary_maxid = 0 @@ -372,6 +382,19 @@ class VPP(): self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc } self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i) + def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async): + rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen) + if rv != 0: + raise IOError(2, 'Connect failed') + self.connected = True + + self._load_dictionary() + self._register_functions(async=async) + + # Initialise control ping + self.control_ping_index = self.vpp_dictionary['control_ping']['id'] + self.control_ping_msgdef = self.messages['control_ping'] + def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32): """Attach to VPP. @@ -381,22 +404,22 @@ class VPP(): rx_qlen - the length of the VPP message receive queue between client and server. """ - msg_handler = self.msg_handler_sync if not async else self.msg_handler_async - if chroot_prefix is not None: - rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix) - else: - rv = vpp_api.connect(name, msg_handler, rx_qlen) + msg_handler = self.msg_handler_sync if not async \ + else self.msg_handler_async + return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen, + async) - if rv != 0: - raise IOError(2, 'Connect failed') - self.connected = True + def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32): + """Attach to VPP in synchronous mode. Application must poll for events. - self._load_dictionary() - self._register_functions(async=async) + name - the name of the client. + chroot_prefix - if VPP is chroot'ed, the prefix of the jail + rx_qlen - the length of the VPP message receive queue between + client and server. + """ - # Initialise control ping - self.control_ping_index = self.vpp_dictionary['control_ping']['id'] - self.control_ping_msgdef = self.messages['control_ping'] + return self.connect_internal(name, None, chroot_prefix, rx_qlen, + async=False) def disconnect(self): """Detach from VPP.""" @@ -404,56 +427,6 @@ class VPP(): self.connected = False return rv - def results_wait(self, context): - """In a sync call, wait for the reply - - The context ID is used to pair reply to request. - """ - - # Results is filled by the background callback. It will - # raise the event when the context receives a response. - # Given there are two threads we have to be careful with the - # use of results and the structures under it, hence the lock. - with self.results_lock: - result = self.results[context] - ev = result['e'] - - timed_out = not ev.wait(self.timeout) - - if timed_out: - raise IOError(3, 'Waiting for reply timed out') - else: - with self.results_lock: - result = self.results[context] - del self.results[context] - return result['r'] - - def results_prepare(self, context, multi=False): - """Prep for receiving a result in response to a request msg - - context - unique context number sent in request and - returned in reply or replies - multi - true if we expect multiple messages from this - reply. - """ - - # The event is used to indicate that all results are in - new_result = { - 'e': threading.Event(), - } - if multi: - # Make it clear to the BG thread it's going to see several - # messages; messages are stored in a results array - new_result['m'] = True - new_result['r'] = [] - - new_result['e'].clear() - - # Put the prepped result structure into results, at which point - # the bg thread can also access it (hence the thread lock) - with self.results_lock: - self.results[context] = new_result - def msg_handler_sync(self, msg): """Process an incoming message from VPP in sync mode. @@ -473,32 +446,9 @@ class VPP(): if context == 0: # No context -> async notification that we feed to the callback - if self.event_callback: - self.event_callback(msgname, r) + self.message_queue.put_nowait(r) else: - # Context -> use the results structure (carefully) to find - # who we're responding to and return the message to that - # thread - with self.results_lock: - if context not in self.results: - eprint('Not expecting results for this context', context, r) - else: - result = self.results[context] - - # - # Collect results until control ping - # - - if msgname == 'control_ping_reply': - # End of a multipart - result['e'].set() - elif 'm' in self.results[context]: - # One element in a multipart - result['r'].append(r) - else: - # All of a single result - result['r'] = r - result['e'].set() + raise IOError(2, 'RPC reply message received in event handler') def decode_incoming_msg(self, msg): if not msg: @@ -556,16 +506,16 @@ class VPP(): no response within the timeout window. """ - # We need a context if not supplied, in order to get the - # response - context = kwargs.get('context', self.get_context()) - kwargs['context'] = context - - # Set up to receive a response - self.results_prepare(context, multi=multipart) + if not 'context' in kwargs: + context = self.get_context() + kwargs['context'] = context + else: + context = kwargs['context'] + kwargs['_vl_msg_id'] = i + b = self.encode(msgdef, kwargs) - # Output the message - self._call_vpp_async(i, msgdef, **kwargs) + vpp_api.suspend() + self._write(b) if multipart: # Send a ping after the request - we use its response @@ -573,9 +523,30 @@ class VPP(): self._control_ping(context) # Block until we get a reply. - r = self.results_wait(context) + rl = [] + while (True): + msg = self._read() + if not msg: + print('PNEUM ERROR: OH MY GOD') + raise IOError(2, 'PNEUM read failed') + + r = self.decode_incoming_msg(msg) + msgname = type(r).__name__ + if not context in r or r.context == 0 or context != r.context: + self.message_queue.put_nowait(r) + continue - return r + if not multipart: + rl = r + break + if msgname == 'control_ping_reply': + break + + rl.append(r) + + vpp_api.resume() + + return rl def _call_vpp_async(self, i, msgdef, **kwargs): """Given a message, send the message and await a reply. @@ -613,3 +584,16 @@ class VPP(): callback. """ self.event_callback = callback + + def thread_msg_handler(self): + """Python thread calling the user registerd message handler. + + This is to emulate the old style event callback scheme. Modern + clients should provide their own thread to poll the event + queue. + """ + while True: + r = self.message_queue.get() + msgname = type(r).__name__ + if self.event_callback: + self.event_callback(msgname, r) -- cgit 1.2.3-korg