From dfc9b7cac857a3a49555f9fc448bd2c6aa3400a6 Mon Sep 17 00:00:00 2001
From: Ole Troan <ot@cisco.com>
Date: Mon, 6 Mar 2017 23:51:57 +0100
Subject: Python API: Synchronous mode.

Change-Id: Ic8f186dbb35bb4e2e191d311cab51315a88a2d81
Signed-off-by: Ole Troan <ot@cisco.com>
---
 src/vpp-api/python/vpp_papi/pneum_wrap.c |  39 +++++--
 src/vpp-api/python/vpp_papi/vpp_papi.py  | 194 ++++++++++++++-----------------
 2 files changed, 121 insertions(+), 112 deletions(-)

(limited to 'src/vpp-api/python/vpp_papi')

diff --git a/src/vpp-api/python/vpp_papi/pneum_wrap.c b/src/vpp-api/python/vpp_papi/pneum_wrap.c
index 748b96744aa..c5a7eea1e00 100644
--- a/src/vpp-api/python/vpp_papi/pneum_wrap.c
+++ b/src/vpp-api/python/vpp_papi/pneum_wrap.c
@@ -42,19 +42,19 @@ wrap_pneum_callback (unsigned char * data, int len)
 }
 
 static PyObject *
-wrap_connect (PyObject *self, PyObject *args)
+wrap_connect (PyObject *self, PyObject *args, PyObject *kw)
 {
   char * name, * chroot_prefix = NULL;
-  int rx_qlen=32; /* default rx queue length */
+  int rx_qlen = 32; /* default rx queue length */
   int rv;
   PyObject * temp = NULL;
   pneum_callback_t cb = NULL;
 
-  if (!PyArg_ParseTuple(args, "s|Ois:wrap_connect",
-			&name, &temp, &rx_qlen, &chroot_prefix))
+  if (!PyArg_ParseTuple(args, "sOzi:wrap_connect",
+			&name, &temp, &chroot_prefix, &rx_qlen))
     return (NULL);
 
-  if (temp)
+  if (temp != Py_None)
     {
       if (!PyCallable_Check(temp))
 	{
@@ -82,6 +82,7 @@ wrap_disconnect (PyObject *self, PyObject *args)
   Py_END_ALLOW_THREADS
   return PyLong_FromLong(rv);
 }
+
 static PyObject *
 wrap_write (PyObject *self, PyObject *args)
 {
@@ -90,6 +91,7 @@ wrap_write (PyObject *self, PyObject *args)
 
   if (!PyArg_ParseTuple(args, "s#", &data, &len))
     return NULL;
+
   Py_BEGIN_ALLOW_THREADS
   rv = pneum_write(data, len);
   Py_END_ALLOW_THREADS
@@ -102,9 +104,12 @@ wrap_read (PyObject *self, PyObject *args)
 {
   char *data;
   int len, rv;
+  unsigned short timeout;
 
+  if (!PyArg_ParseTuple(args, "H", &timeout))
+    return (NULL);
   Py_BEGIN_ALLOW_THREADS
-  rv = pneum_read(&data, &len);
+  rv = pneum_read(&data, &len, timeout);
   Py_END_ALLOW_THREADS
 
   if (rv != 0) { Py_RETURN_NONE; }
@@ -113,9 +118,9 @@ wrap_read (PyObject *self, PyObject *args)
 #else
   PyObject *ret = Py_BuildValue("s#", data, len);
 #endif
+  pneum_free(data);
   if (!ret) { Py_RETURN_NONE; }
 
-  pneum_free(data);
   return ret;
 }
 
@@ -147,12 +152,32 @@ wrap_msg_table (PyObject *self, PyObject *args)
   Py_RETURN_NONE;
 }
 
+static PyObject *
+wrap_suspend (PyObject *self, PyObject *args)
+{
+  Py_BEGIN_ALLOW_THREADS
+  pneum_rx_suspend();
+  Py_END_ALLOW_THREADS
+  Py_RETURN_NONE;
+}
+
+static PyObject *
+wrap_resume (PyObject *self, PyObject *args)
+{
+  Py_BEGIN_ALLOW_THREADS
+  pneum_rx_resume();
+  Py_END_ALLOW_THREADS
+  Py_RETURN_NONE;
+}
+
 static PyMethodDef vpp_api_Methods[] = {
   {"connect", wrap_connect, METH_VARARGS, "Connect to the VPP API."},
   {"disconnect", wrap_disconnect, METH_VARARGS, "Disconnect from the VPP API."},
   {"write", wrap_write, METH_VARARGS, "Write data to the VPP API."},
   {"read", wrap_read, METH_VARARGS, "Read data from the VPP API."},
   {"msg_table", wrap_msg_table, METH_VARARGS, "Get API dictionary."},
+  {"suspend", wrap_suspend, METH_VARARGS, "Suspend RX thread."},
+  {"resume", wrap_resume, METH_VARARGS, "Resume RX thread."},
   {NULL, NULL, 0, NULL}        /* Sentinel */
 };
 
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index 83247ffa04b..0c40f1710e5 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -16,7 +16,7 @@
 
 from __future__ import print_function
 import sys, os, logging, collections, struct, json, threading, glob
-import atexit
+import atexit, Queue
 
 logging.basicConfig(level=logging.DEBUG)
 import vpp_api
@@ -57,7 +57,7 @@ class VPP():
     provides a means to register a callback function to receive
     these messages in a background thread.
     """
-    def __init__(self, apifiles = None, testmode = False):
+    def __init__(self, apifiles = None, testmode = False, async_thread = True):
         """Create a VPP API object.
 
         apifiles is a list of files containing API
@@ -72,11 +72,15 @@ class VPP():
         self.buffersize = 10000
         self.connected = False
         self.header = struct.Struct('>HI')
-        self.results_lock = threading.Lock()
-        self.results = {}
-        self.timeout = 5
         self.apifiles = []
         self.event_callback = None
+        self.message_queue = Queue.Queue()
+        self.read_timeout = 0
+        self.vpp_api = vpp_api
+        if async_thread:
+            self.event_thread = threading.Thread(target=self.thread_msg_handler)
+            self.event_thread.daemon = True
+            self.event_thread.start()
 
         if not apifiles:
             # Pick up API definitions from default directory
@@ -346,7 +350,7 @@ class VPP():
                 f = self.make_function(name, i, msgdef, multipart, async)
                 setattr(self._api, name, FuncWrapper(f))
 
-                # olf API stuff starts here - will be removed in 17.07
+                # old API stuff starts here - will be removed in 17.07
                 if hasattr(self, name):
                     raise NameError(
                         3, "Conflicting name in JSON definition: `%s'" % name)
@@ -359,6 +363,12 @@ class VPP():
             raise IOError(1, 'Not connected')
         return vpp_api.write(str(buf))
 
+    def _read (self):
+        if not self.connected:
+            raise IOError(1, 'Not connected')
+
+        return vpp_api.read(self.read_timeout)
+
     def _load_dictionary(self):
         self.vpp_dictionary = {}
         self.vpp_dictionary_maxid = 0
@@ -372,6 +382,19 @@ class VPP():
             self.vpp_dictionary[name] = { 'id' : i, 'crc' : crc }
             self.vpp_dictionary_maxid = max(self.vpp_dictionary_maxid, i)
 
+    def connect_internal(self, name, msg_handler, chroot_prefix, rx_qlen, async):
+	rv = vpp_api.connect(name, msg_handler, chroot_prefix, rx_qlen)
+        if rv != 0:
+            raise IOError(2, 'Connect failed')
+        self.connected = True
+
+        self._load_dictionary()
+        self._register_functions(async=async)
+
+        # Initialise control ping
+        self.control_ping_index = self.vpp_dictionary['control_ping']['id']
+        self.control_ping_msgdef = self.messages['control_ping']
+
     def connect(self, name, chroot_prefix = None, async = False, rx_qlen = 32):
         """Attach to VPP.
 
@@ -381,22 +404,22 @@ class VPP():
         rx_qlen - the length of the VPP message receive queue between
         client and server.
         """
-        msg_handler = self.msg_handler_sync if not async else self.msg_handler_async
-	if chroot_prefix is not None:
-	    rv = vpp_api.connect(name, msg_handler, rx_qlen, chroot_prefix)
-        else:
-	    rv = vpp_api.connect(name, msg_handler, rx_qlen)
+        msg_handler = self.msg_handler_sync if not async \
+                      else self.msg_handler_async
+        return self.connect_internal(name, msg_handler, chroot_prefix, rx_qlen,
+                                     async)
 
-        if rv != 0:
-            raise IOError(2, 'Connect failed')
-        self.connected = True
+    def connect_sync (self, name, chroot_prefix = None, rx_qlen = 32):
+        """Attach to VPP in synchronous mode. Application must poll for events.
 
-        self._load_dictionary()
-        self._register_functions(async=async)
+        name - the name of the client.
+        chroot_prefix - if VPP is chroot'ed, the prefix of the jail
+        rx_qlen - the length of the VPP message receive queue between
+        client and server.
+        """
 
-        # Initialise control ping
-        self.control_ping_index = self.vpp_dictionary['control_ping']['id']
-        self.control_ping_msgdef = self.messages['control_ping']
+        return self.connect_internal(name, None, chroot_prefix, rx_qlen,
+                                     async=False)
 
     def disconnect(self):
         """Detach from VPP."""
@@ -404,56 +427,6 @@ class VPP():
         self.connected = False
         return rv
 
-    def results_wait(self, context):
-        """In a sync call, wait for the reply
-
-        The context ID is used to pair reply to request.
-        """
-
-        # Results is filled by the background callback.  It will
-        # raise the event when the context receives a response.
-        # Given there are two threads we have to be careful with the
-        # use of results and the structures under it, hence the lock.
-        with self.results_lock:
-            result = self.results[context]
-            ev = result['e']
-
-	timed_out = not ev.wait(self.timeout)
-
-	if timed_out:
-	   raise IOError(3, 'Waiting for reply timed out')
-	else:
-	    with self.results_lock:
-                result = self.results[context]
-		del self.results[context]
-		return result['r']
-
-    def results_prepare(self, context, multi=False):
-        """Prep for receiving a result in response to a request msg
-
-        context - unique context number sent in request and
-        returned in reply or replies
-        multi - true if we expect multiple messages from this
-        reply.
-        """
-
-        # The event is used to indicate that all results are in
-        new_result = {
-            'e': threading.Event(),
-        }
-        if multi:
-            # Make it clear to the BG thread it's going to see several
-            # messages; messages are stored in a results array
-            new_result['m'] = True
-            new_result['r'] = []
-
-        new_result['e'].clear()
-
-        # Put the prepped result structure into results, at which point
-        # the bg thread can also access it (hence the thread lock)
-        with self.results_lock:
-            self.results[context] = new_result
-
     def msg_handler_sync(self, msg):
         """Process an incoming message from VPP in sync mode.
 
@@ -473,32 +446,9 @@ class VPP():
 
         if context == 0:
             # No context -> async notification that we feed to the callback
-	    if self.event_callback:
-		self.event_callback(msgname, r)
+            self.message_queue.put_nowait(r)
         else:
-            # Context -> use the results structure (carefully) to find
-            # who we're responding to and return the message to that
-            # thread
-            with self.results_lock:
-                if context not in self.results:
-                    eprint('Not expecting results for this context', context, r)
-                else:
-                    result = self.results[context]
-
-                    #
-                    # Collect results until control ping
-                    #
-
-                    if msgname == 'control_ping_reply':
-                        # End of a multipart
-                        result['e'].set()
-                    elif 'm' in self.results[context]:
-                        # One element in a multipart
-                        result['r'].append(r)
-                    else:
-                        # All of a single result
-                        result['r'] = r
-                        result['e'].set()
+            raise IOError(2, 'RPC reply message received in event handler')
 
     def decode_incoming_msg(self, msg):
         if not msg:
@@ -556,16 +506,16 @@ class VPP():
         no response within the timeout window.
         """
 
-        # We need a context if not supplied, in order to get the
-        # response
-        context = kwargs.get('context', self.get_context())
-	kwargs['context'] = context
-
-        # Set up to receive a response
-        self.results_prepare(context, multi=multipart)
+        if not 'context' in kwargs:
+            context = self.get_context()
+            kwargs['context'] = context
+        else:
+            context = kwargs['context']
+        kwargs['_vl_msg_id'] = i
+        b = self.encode(msgdef, kwargs)
 
-        # Output the message
-        self._call_vpp_async(i, msgdef, **kwargs)
+        vpp_api.suspend()
+        self._write(b)
 
         if multipart:
             # Send a ping after the request - we use its response
@@ -573,9 +523,30 @@ class VPP():
             self._control_ping(context)
 
         # Block until we get a reply.
-        r = self.results_wait(context)
+        rl = []
+        while (True):
+            msg = self._read()
+            if not msg:
+                print('PNEUM ERROR: OH MY GOD')
+                raise IOError(2, 'PNEUM read failed')
+
+            r = self.decode_incoming_msg(msg)
+            msgname = type(r).__name__
+            if not context in r or r.context == 0 or context != r.context:
+                self.message_queue.put_nowait(r)
+                continue
 
-        return r
+            if not multipart:
+                rl = r
+                break
+            if msgname == 'control_ping_reply':
+                break
+
+            rl.append(r)
+
+        vpp_api.resume()
+
+        return rl
 
     def _call_vpp_async(self, i, msgdef, **kwargs):
         """Given a message, send the message and await a reply.
@@ -613,3 +584,16 @@ class VPP():
         callback.
         """
         self.event_callback = callback
+
+    def thread_msg_handler(self):
+        """Python thread calling the user registerd message handler.
+
+        This is to emulate the old style event callback scheme. Modern
+        clients should provide their own thread to poll the event
+        queue.
+        """
+        while True:
+            r = self.message_queue.get()
+            msgname = type(r).__name__
+	    if self.event_callback:
+	        self.event_callback(msgname, r)
-- 
cgit