diff options
Diffstat (limited to 'src/vpp-api/python/vpp_papi/vpp_transport_socket.py')
-rw-r--r-- | src/vpp-api/python/vpp_papi/vpp_transport_socket.py | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/src/vpp-api/python/vpp_papi/vpp_transport_socket.py b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py new file mode 100644 index 00000000000..1822deb6d07 --- /dev/null +++ b/src/vpp-api/python/vpp_papi/vpp_transport_socket.py @@ -0,0 +1,176 @@ +# +# VPP Unix Domain Socket Transport. +# +import socket +import struct +import threading +import select +import multiprocessing +import logging + + +class VppTransport: + def __init__(self, parent, read_timeout, server_address): + self.connected = False + self.read_timeout = read_timeout if read_timeout > 0 else 1 + self.parent = parent + self.server_address = server_address + self.header = struct.Struct('>QII') + self.message_table = {} + self.sque = multiprocessing.Queue() + self.q = multiprocessing.Queue() + self.message_thread = threading.Thread(target=self.msg_thread_func) + + def msg_thread_func(self): + while True: + try: + rlist, _, _ = select.select([self.socket, + self.sque._reader], [], []) + except socket.error: + # Terminate thread + logging.error('select failed') + self.q.put(None) + return + + for r in rlist: + if r == self.sque._reader: + # Terminate + self.q.put(None) + return + + elif r == self.socket: + try: + msg = self._read() + if not msg: + self.q.put(None) + return + except socket.error: + self.q.put(None) + return + # Put either to local queue or if context == 0 + # callback queue + r = self.parent.decode_incoming_msg(msg) + if hasattr(r, 'context') and r.context > 0: + self.q.put(msg) + else: + self.parent.msg_handler_async(msg) + else: + raise IOError(2, 'Unknown response from select') + + def connect(self, name, pfx, msg_handler, rx_qlen): + + # Create a UDS socket + self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + self.socket.settimeout(self.read_timeout) + + # Connect the socket to the port where the server is listening + try: + self.socket.connect(self.server_address) + except socket.error as msg: + logging.error(msg) + raise + + self.connected = True + + # Initialise sockclnt_create + sockclnt_create = self.parent.messages['sockclnt_create'] + sockclnt_create_reply = self.parent.messages['sockclnt_create_reply'] + + args = {'_vl_msg_id': 15, + 'name': name, + 'context': 124} + b = sockclnt_create.pack(args) + self.write(b) + msg = self._read() + hdr, length = self.parent.header.unpack(msg, 0) + if hdr.msgid != 16: + raise IOError('Invalid reply message') + + r, length = sockclnt_create_reply.unpack(msg) + self.socket_index = r.index + for m in r.message_table: + n = m.name.rstrip(b'\x00\x13') + self.message_table[n] = m.index + + self.message_thread.daemon = True + self.message_thread.start() + + return 0 + + def disconnect(self): + try: # Might fail, if VPP closes socket before packet makes it out + rv = self.parent.api.sockclnt_delete(index=self.socket_index) + except IOError: + pass + self.connected = False + self.socket.close() + self.sque.put(True) # Terminate listening thread + self.message_thread.join() + + def suspend(self): + pass + + def resume(self): + pass + + def callback(self): + raise NotImplemented + + def get_callback(self, async): + return self.callback + + def get_msg_index(self, name): + try: + return self.message_table[name] + except KeyError: + return 0 + + def msg_table_max_index(self): + return len(self.message_table) + + def write(self, buf): + """Send a binary-packed message to VPP.""" + if not self.connected: + raise IOError(1, 'Not connected') + + # Send header + header = self.header.pack(0, len(buf), 0) + n = self.socket.send(header) + n = self.socket.send(buf) + + def _read(self): + # Header and message + try: + msg = self.socket.recv(4096) + if len(msg) == 0: + return None + except socket.error as message: + logging.error(message) + raise + + (_, l, _) = self.header.unpack(msg[:16]) + + if l > len(msg): + buf = bytearray(l + 16) + view = memoryview(buf) + view[:4096] = msg + view = view[4096:] + # Read rest of message + remaining_bytes = l - 4096 + 16 + while remaining_bytes > 0: + bytes_to_read = (remaining_bytes if remaining_bytes + <= 4096 else 4096) + nbytes = self.socket.recv_into(view, bytes_to_read) + if nbytes == 0: + logging.error('recv failed') + break + view = view[nbytes:] + remaining_bytes -= nbytes + else: + buf = msg + return buf[16:] + + def read(self): + if not self.connected: + raise IOError(1, 'Not connected') + return self.q.get() |