summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client_utils
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2015-11-24 08:54:53 +0200
committerDan Klein <danklein10@gmail.com>2015-11-24 08:54:53 +0200
commite7cb8b0f6c2fbe08d2086a7408040ac7d12aee5a (patch)
tree1b27e542fe9f3ae4abdc8245b804cda25a6e2c2f /scripts/automation/trex_control_plane/client_utils
parent597f74d8ed10abc3dd9df7e81ecea5ac2f5c714e (diff)
parentf3861d504353729724086dec82c79e818224554f (diff)
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client_utils')
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py325
1 files changed, 29 insertions, 296 deletions
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index 58491aba..b826f02f 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -110,45 +110,45 @@ class JsonRpcClient(object):
return id, msg
- def invoke_rpc_method (self, method_name, params = {}, block = False):
+ def invoke_rpc_method (self, method_name, params = {}):
if not self.connected:
return False, "Not connected to server"
id, msg = self.create_jsonrpc_v2(method_name, params)
- return self.send_raw_msg(msg, block)
+ return self.send_raw_msg(msg)
# low level send of string message
- def send_raw_msg (self, msg, block = False):
+ def send_raw_msg (self, msg):
+
self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
- if block:
- self.socket.send(msg)
- else:
+ tries = 0
+ while True:
try:
- self.socket.send(msg, flags = zmq.NOBLOCK)
- except zmq.error.ZMQError as e:
- self.disconnect()
- return CmdResponse(False, "Failed To Get Send Message")
-
- got_response = False
+ self.socket.send(msg)
+ break
+ except zmq.Again:
+ sleep(0.1)
+ tries += 1
+ if tries > 10:
+ self.disconnect()
+ return CmdResponse(False, "Failed to send message to server")
+
+
+ tries = 0
+ while True:
+ try:
+ response = self.socket.recv()
+ break
+ except zmq.Again:
+ sleep(0.1)
+ tries += 1
+ if tries > 10:
+ self.disconnect()
+ return CmdResponse(False, "Failed to get server response")
- if block:
- response = self.socket.recv()
- got_response = True
- else:
- for i in xrange(0 ,10):
- try:
- response = self.socket.recv(flags = zmq.NOBLOCK)
- got_response = True
- break
- except zmq.Again:
- sleep(0.2)
-
- if not got_response:
- self.disconnect()
- return CmdResponse(False, "Failed To Get Server Response")
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
@@ -223,6 +223,8 @@ class JsonRpcClient(object):
except zmq.error.ZMQError as e:
return False, "ZMQ Error: Bad server or port name: " + str(e)
+ self.socket.setsockopt(zmq.SNDTIMEO, 5)
+ self.socket.setsockopt(zmq.RCVTIMEO, 5)
self.connected = True
@@ -248,272 +250,3 @@ class JsonRpcClient(object):
if hasattr(self, "context"):
self.context.destroy(linger=0)
-# MOVE THIS TO DAN'S FILE
-class TrexStatelessClient(JsonRpcClient):
-
- def __init__ (self, server, port, user):
-
- super(TrexStatelessClient, self).__init__(server, port)
-
- self.user = user
- self.port_handlers = {}
-
- self.supported_cmds = []
- self.system_info = None
- self.server_version = None
-
-
- def whoami (self):
- return self.user
-
- def ping_rpc_server(self):
-
- return self.invoke_rpc_method("ping", block = False)
-
- def get_rpc_server_version (self):
- return self.server_version
-
- def get_system_info (self):
- if not self.system_info:
- return {}
-
- return self.system_info
-
- def get_supported_cmds(self):
- if not self.supported_cmds:
- return {}
-
- return self.supported_cmds
-
- def get_port_count (self):
- if not self.system_info:
- return 0
-
- return self.system_info["port_count"]
-
- # sync the client with all the server required data
- def sync (self):
-
- # get server version
- rc, msg = self.invoke_rpc_method("get_version")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.server_version = msg
-
- # get supported commands
- rc, msg = self.invoke_rpc_method("get_supported_cmds")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.supported_cmds = [str(x) for x in msg if x]
-
- # get system info
- rc, msg = self.invoke_rpc_method("get_system_info")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.system_info = msg
-
- return True, ""
-
- def connect (self):
- rc, err = super(TrexStatelessClient, self).connect()
- if not rc:
- return rc, err
-
- return self.sync()
-
-
- # take ownership over ports
- def take_ownership (self, port_id_array, force = False):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- for port_id in port_id_array:
- batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
-
- def release_ports (self, port_id_array):
- batch = self.create_batch()
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("release", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers.pop(port_id_array[i])
-
- return True, resp_list
-
- def get_owned_ports (self):
- return self.port_handlers.keys()
-
- # fetch port stats
- def get_port_stats (self, port_id_array):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- # empty list means all
- if port_id_array == []:
- port_id_array = list([x for x in xrange(0, self.system_info["port_count"])])
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
-
- return rc, resp_list
-
- # snapshot will take a snapshot of all your owned ports for streams and etc.
- def snapshot(self):
-
-
- if len(self.get_owned_ports()) == 0:
- return {}
-
- snap = {}
-
- batch = self.create_batch()
-
- for port_id in self.get_owned_ports():
-
- batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
- batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- # split the list to 2s
- index = 0
- for port_id in self.get_owned_ports():
- if not resp_list[index] or not resp_list[index + 1]:
- snap[port_id] = None
- continue
-
- # fetch the first two
- stats = resp_list[index][1]
- stream_list = resp_list[index + 1][1]
-
- port = {}
- port['status'] = stats['status']
- port['stream_list'] = []
-
- # get all the streams
- if len(stream_list) > 0:
- batch = self.create_batch()
- for stream_id in stream_list:
- batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]})
-
- rc, stream_resp_list = batch.invoke()
- if not rc:
- port = {}
-
- port['streams'] = {}
- for i, resp in enumerate(stream_resp_list):
- if resp[0]:
- port['streams'][stream_list[i]] = resp[1]
-
- snap[port_id] = port
-
- # move to next one
- index += 2
-
-
- return snap
-
- # add stream
- # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]):
- # if not port_id in self.get_owned_ports():
- # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id)
- #
- # handler = self.port_handlers[port_id]
- #
- # stream = {}
- # stream['enabled'] = True
- # stream['self_start'] = True
- # stream['isg'] = isg
- # stream['next_stream_id'] = next_stream_id
- # stream['packet'] = {}
- # stream['packet']['binary'] = packet
- # stream['packet']['meta'] = ""
- # stream['vm'] = vm
- # stream['rx_stats'] = {}
- # stream['rx_stats']['enabled'] = False
- #
- # stream['mode'] = {}
- # stream['mode']['type'] = 'continuous'
- # stream['mode']['pps'] = 10.0
- #
- # params = {}
- # params['handler'] = handler
- # params['stream'] = stream
- # params['port_id'] = port_id
- # params['stream_id'] = stream_id
- #
- # print params
- # return self.invoke_rpc_method('add_stream', params = params)
-
- def add_stream(self, port_id_array, stream_pack_list):
- batch = self.create_batch()
-
- for port_id in port_id_array:
- for stream_pack in stream_pack_list:
- params = {"port_id": port_id,
- "handler": self.port_handlers[port_id],
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- batch.add("add_stream", params=params)
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- print "Stream {0} - {1}".format(i, rc[1])
- # self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
- # return self.invoke_rpc_method('add_stream', params = params)
-
-if __name__ == "__main__":
- pass \ No newline at end of file