aboutsummaryrefslogtreecommitdiffstats
path: root/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/vpp-api/python/vpp_papi/vpp_transport_socket.py')
-rw-r--r--src/vpp-api/python/vpp_papi/vpp_transport_socket.py176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
new file mode 100644
index 00000000000..1822deb6d07
--- /dev/null
+++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py
@@ -0,0 +1,176 @@
+#
+# VPP Unix Domain Socket Transport.
+#
+import socket
+import struct
+import threading
+import select
+import multiprocessing
+import logging
+
+
+class VppTransport:
+ def __init__(self, parent, read_timeout, server_address):
+ self.connected = False
+ self.read_timeout = read_timeout if read_timeout > 0 else 1
+ self.parent = parent
+ self.server_address = server_address
+ self.header = struct.Struct('>QII')
+ self.message_table = {}
+ self.sque = multiprocessing.Queue()
+ self.q = multiprocessing.Queue()
+ self.message_thread = threading.Thread(target=self.msg_thread_func)
+
+ def msg_thread_func(self):
+ while True:
+ try:
+ rlist, _, _ = select.select([self.socket,
+ self.sque._reader], [], [])
+ except socket.error:
+ # Terminate thread
+ logging.error('select failed')
+ self.q.put(None)
+ return
+
+ for r in rlist:
+ if r == self.sque._reader:
+ # Terminate
+ self.q.put(None)
+ return
+
+ elif r == self.socket:
+ try:
+ msg = self._read()
+ if not msg:
+ self.q.put(None)
+ return
+ except socket.error:
+ self.q.put(None)
+ return
+ # Put either to local queue or if context == 0
+ # callback queue
+ r = self.parent.decode_incoming_msg(msg)
+ if hasattr(r, 'context') and r.context > 0:
+ self.q.put(msg)
+ else:
+ self.parent.msg_handler_async(msg)
+ else:
+ raise IOError(2, 'Unknown response from select')
+
+ def connect(self, name, pfx, msg_handler, rx_qlen):
+
+ # Create a UDS socket
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ self.socket.settimeout(self.read_timeout)
+
+ # Connect the socket to the port where the server is listening
+ try:
+ self.socket.connect(self.server_address)
+ except socket.error as msg:
+ logging.error(msg)
+ raise
+
+ self.connected = True
+
+ # Initialise sockclnt_create
+ sockclnt_create = self.parent.messages['sockclnt_create']
+ sockclnt_create_reply = self.parent.messages['sockclnt_create_reply']
+
+ args = {'_vl_msg_id': 15,
+ 'name': name,
+ 'context': 124}
+ b = sockclnt_create.pack(args)
+ self.write(b)
+ msg = self._read()
+ hdr, length = self.parent.header.unpack(msg, 0)
+ if hdr.msgid != 16:
+ raise IOError('Invalid reply message')
+
+ r, length = sockclnt_create_reply.unpack(msg)
+ self.socket_index = r.index
+ for m in r.message_table:
+ n = m.name.rstrip(b'\x00\x13')
+ self.message_table[n] = m.index
+
+ self.message_thread.daemon = True
+ self.message_thread.start()
+
+ return 0
+
+ def disconnect(self):
+ try: # Might fail, if VPP closes socket before packet makes it out
+ rv = self.parent.api.sockclnt_delete(index=self.socket_index)
+ except IOError:
+ pass
+ self.connected = False
+ self.socket.close()
+ self.sque.put(True) # Terminate listening thread
+ self.message_thread.join()
+
+ def suspend(self):
+ pass
+
+ def resume(self):
+ pass
+
+ def callback(self):
+ raise NotImplemented
+
+ def get_callback(self, async):
+ return self.callback
+
+ def get_msg_index(self, name):
+ try:
+ return self.message_table[name]
+ except KeyError:
+ return 0
+
+ def msg_table_max_index(self):
+ return len(self.message_table)
+
+ def write(self, buf):
+ """Send a binary-packed message to VPP."""
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+
+ # Send header
+ header = self.header.pack(0, len(buf), 0)
+ n = self.socket.send(header)
+ n = self.socket.send(buf)
+
+ def _read(self):
+ # Header and message
+ try:
+ msg = self.socket.recv(4096)
+ if len(msg) == 0:
+ return None
+ except socket.error as message:
+ logging.error(message)
+ raise
+
+ (_, l, _) = self.header.unpack(msg[:16])
+
+ if l > len(msg):
+ buf = bytearray(l + 16)
+ view = memoryview(buf)
+ view[:4096] = msg
+ view = view[4096:]
+ # Read rest of message
+ remaining_bytes = l - 4096 + 16
+ while remaining_bytes > 0:
+ bytes_to_read = (remaining_bytes if remaining_bytes
+ <= 4096 else 4096)
+ nbytes = self.socket.recv_into(view, bytes_to_read)
+ if nbytes == 0:
+ logging.error('recv failed')
+ break
+ view = view[nbytes:]
+ remaining_bytes -= nbytes
+ else:
+ buf = msg
+ return buf[16:]
+
+ def read(self):
+ if not self.connected:
+ raise IOError(1, 'Not connected')
+ return self.q.get()