aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_papi.py5
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_transport_socket.py19
2 files changed, 18 insertions, 6 deletions
diff --git a/src/vpp-api/python/vpp_papi/vpp_papi.py b/src/vpp-api/python/vpp_papi/vpp_papi.py
index f29c250744d..cd1f2e549de 100644
--- a/src/vpp-api/python/vpp_papi/vpp_papi.py
+++ b/src/vpp-api/python/vpp_papi/vpp_papi.py
@@ -465,6 +465,8 @@ class VPPApiClient(object):
target=self.thread_msg_handler)
self.event_thread.daemon = True
self.event_thread.start()
+ else:
+ self.event_thread = None
return rv
def connect(self, name, chroot_prefix=None, do_async=False, rx_qlen=32):
@@ -495,7 +497,8 @@ class VPPApiClient(object):
def disconnect(self):
"""Detach from VPP."""
rv = self.transport.disconnect()
- self.message_queue.put("terminate event thread")
+ if self.event_thread is not None:
+ self.message_queue.put("terminate event thread")
return rv
def msg_handler_sync(self, msg):
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
index 115a2c26428..6989e9ac9ba 100644
--- a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
@@ -29,9 +29,13 @@ class VppTransport(object):
self.server_address = server_address
self.header = struct.Struct('>QII')
self.message_table = {}
+ # These queues can be accessed async.
+ # They are always up, but replaced on connect.
+ # TODO: Use multiprocessing.Pipe instead of multiprocessing.Queue
+ # if possible.
+ self.sque = multiprocessing.Queue()
+ self.q = multiprocessing.Queue()
# The following fields are set in connect().
- self.sque = None
- self.q = None
self.message_thread = None
self.socket = None
@@ -92,7 +96,13 @@ class VppTransport(object):
self.connected = True
- # TODO: Can this block be moved even later?
+ # Queues' feeder threads from previous connect may still be sending.
+ # Close and join to avoid any errors.
+ self.sque.close()
+ self.q.close()
+ self.sque.join_thread()
+ self.q.join_thread()
+ # Finally safe to replace.
self.sque = multiprocessing.Queue()
self.q = multiprocessing.Queue()
self.message_thread = threading.Thread(target=self.msg_thread_func)
@@ -143,10 +153,9 @@ class VppTransport(object):
# Allow additional connect() calls.
self.message_thread.join()
# Collect garbage.
- self.sque = None
- self.q = None
self.message_thread = None
self.socket = None
+ # Queues will be collected after connect replaces them.
return rv
def suspend(self):