diff options
author | 2015-10-22 05:31:22 +0300 | |
---|---|---|
committer | 2015-10-22 05:31:22 +0300 | |
commit | f5c4a8051d697c086eb0d1c8ce3ac90ab245d249 (patch) | |
tree | fb0d379947c4b1984b77f6594d27f96d72c3cba0 /scripts/automation/trex_control_plane/client_utils | |
parent | 508703e11a3fad3e44535c5433f803d77f28e245 (diff) | |
parent | 876c76572fdb2fb8f0e8db21bc420d284dc05950 (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client_utils')
4 files changed, 449 insertions, 19 deletions
diff --git a/scripts/automation/trex_control_plane/client_utils/external_packages.py b/scripts/automation/trex_control_plane/client_utils/external_packages.py index 4b10609b..e2bb37a5 100755 --- a/scripts/automation/trex_control_plane/client_utils/external_packages.py +++ b/scripts/automation/trex_control_plane/client_utils/external_packages.py @@ -8,7 +8,8 @@ ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir)) PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pardir, 'external_libs')) CLIENT_UTILS_MODULES = ['zmq', - 'dpkt-1.8.6' + 'dpkt-1.8.6', + 'PyYAML-3.01/lib' ] def import_client_utils_modules(): diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index 163c6923..ed14e6f8 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -6,9 +6,6 @@ import json import general_utils import re from time import sleep -from collections import namedtuple - -CmdResponse = namedtuple('CmdResponse', ['success', 'data']) class bcolors: BLUE = '\033[94m' @@ -26,12 +23,12 @@ class BatchMessage(object): self.rpc_client = rpc_client self.batch_list = [] - def add (self, method_name, params={}): + def add (self, method_name, params = {}): id, msg = self.rpc_client.create_jsonrpc_v2(method_name, params, encode = False) self.batch_list.append(msg) - def invoke(self, block = False): + def invoke (self, block = False): if not self.rpc_client.connected: return False, "Not connected to server" @@ -39,9 +36,9 @@ class BatchMessage(object): rc, resp_list = self.rpc_client.send_raw_msg(msg, block = False) if len(self.batch_list) == 1: - return CmdResponse(True, [CmdResponse(rc, resp_list)]) + return True, [(rc, resp_list)] else: - return CmdResponse(rc, resp_list) + return rc, resp_list # JSON RPC v2.0 client @@ -130,7 +127,7 @@ class JsonRpcClient(object): self.socket.send(msg, flags = zmq.NOBLOCK) except zmq.error.ZMQError as e: self.disconnect() - return CmdResponse(False, "Failed To Get Send Message") + return False, "Failed To Get Send Message" got_response = False @@ -148,7 +145,7 @@ class JsonRpcClient(object): if not got_response: self.disconnect() - return CmdResponse(False, "Failed To Get Server Response") + return False, "Failed To Get Server Response" self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -162,19 +159,19 @@ class JsonRpcClient(object): for single_response in response_json: rc, msg = self.process_single_response(single_response) - rc_list.append( CmdResponse(rc, msg) ) + rc_list.append( (rc, msg) ) - return CmdResponse(True, rc_list) + return True, rc_list else: rc, msg = self.process_single_response(response_json) - return CmdResponse(rc, msg) + return rc, msg def process_single_response (self, response_json): if (response_json.get("jsonrpc") != "2.0"): - return False, "Malformed Response ({0})".format(str(response)) + return False, "Malfromed Response ({0})".format(str(response)) # error reported by server if ("error" in response_json): @@ -185,7 +182,7 @@ class JsonRpcClient(object): # if no error there should be a result if ("result" not in response_json): - return False, "Malformed Response ({0})".format(str(response)) + return False, "Malfromed Response ({0})".format(str(response)) return True, response_json["result"] @@ -194,7 +191,7 @@ class JsonRpcClient(object): def set_verbose(self, mode): self.verbose = mode - def disconnect(self): + def disconnect (self): if self.connected: self.socket.close(linger = 0) self.context.destroy(linger = 0) @@ -247,3 +244,270 @@ class JsonRpcClient(object): print "Shutting down RPC client\n" if hasattr(self, "context"): self.context.destroy(linger=0) + +# MOVE THIS TO DAN'S FILE +class TrexStatelessClient(JsonRpcClient): + + def __init__ (self, server, port, user): + + super(TrexStatelessClient, self).__init__(server, port) + + self.user = user + self.port_handlers = {} + + self.supported_cmds = [] + self.system_info = None + self.server_version = None + + + def whoami (self): + return self.user + + def ping_rpc_server(self): + + return self.invoke_rpc_method("ping", block = False) + + def get_rpc_server_version (self): + return self.server_version + + def get_system_info (self): + if not self.system_info: + return {} + + return self.system_info + + def get_supported_cmds(self): + if not self.supported_cmds: + return {} + + return self.supported_cmds + + def get_port_count (self): + if not self.system_info: + return 0 + + return self.system_info["port_count"] + + # sync the client with all the server required data + def sync (self): + + # get server version + rc, msg = self.invoke_rpc_method("get_version") + if not rc: + self.disconnect() + return rc, msg + + self.server_version = msg + + # get supported commands + rc, msg = self.invoke_rpc_method("get_supported_cmds") + if not rc: + self.disconnect() + return rc, msg + + self.supported_cmds = [str(x) for x in msg if x] + + # get system info + rc, msg = self.invoke_rpc_method("get_system_info") + if not rc: + self.disconnect() + return rc, msg + + self.system_info = msg + + return True, "" + + def connect (self): + rc, err = super(TrexStatelessClient, self).connect() + if not rc: + return rc, err + + return self.sync() + + + # take ownership over ports + def take_ownership (self, port_id_array, force = False): + if not self.connected: + return False, "Not connected to server" + + batch = self.create_batch() + + for port_id in port_id_array: + batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force}) + + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + for i, rc in enumerate(resp_list): + if rc[0]: + self.port_handlers[port_id_array[i]] = rc[1] + + return True, resp_list + + + def release_ports (self, port_id_array): + batch = self.create_batch() + + for port_id in port_id_array: + + # let the server handle un-acquired errors + if self.port_handlers.get(port_id): + handler = self.port_handlers[port_id] + else: + handler = "" + + batch.add("release", params = {"port_id":port_id, "handler":handler}) + + + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + for i, rc in enumerate(resp_list): + if rc[0]: + self.port_handlers.pop(port_id_array[i]) + + return True, resp_list + + def get_owned_ports (self): + return self.port_handlers.keys() + + # fetch port stats + def get_port_stats (self, port_id_array): + if not self.connected: + return False, "Not connected to server" + + batch = self.create_batch() + + # empty list means all + if port_id_array == []: + port_id_array = list([x for x in xrange(0, self.system_info["port_count"])]) + + for port_id in port_id_array: + + # let the server handle un-acquired errors + if self.port_handlers.get(port_id): + handler = self.port_handlers[port_id] + else: + handler = "" + + batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler}) + + + rc, resp_list = batch.invoke() + + return rc, resp_list + + # snapshot will take a snapshot of all your owned ports for streams and etc. + def snapshot(self): + + + if len(self.get_owned_ports()) == 0: + return {} + + snap = {} + + batch = self.create_batch() + + for port_id in self.get_owned_ports(): + + batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) + batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) + + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + # split the list to 2s + index = 0 + for port_id in self.get_owned_ports(): + if not resp_list[index] or not resp_list[index + 1]: + snap[port_id] = None + continue + + # fetch the first two + stats = resp_list[index][1] + stream_list = resp_list[index + 1][1] + + port = {} + port['status'] = stats['status'] + port['stream_list'] = [] + + # get all the streams + if len(stream_list) > 0: + batch = self.create_batch() + for stream_id in stream_list: + batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]}) + + rc, stream_resp_list = batch.invoke() + if not rc: + port = {} + + port['streams'] = {} + for i, resp in enumerate(stream_resp_list): + if resp[0]: + port['streams'][stream_list[i]] = resp[1] + + snap[port_id] = port + + # move to next one + index += 2 + + + return snap + + # add stream + # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]): + # if not port_id in self.get_owned_ports(): + # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id) + # + # handler = self.port_handlers[port_id] + # + # stream = {} + # stream['enabled'] = True + # stream['self_start'] = True + # stream['isg'] = isg + # stream['next_stream_id'] = next_stream_id + # stream['packet'] = {} + # stream['packet']['binary'] = packet + # stream['packet']['meta'] = "" + # stream['vm'] = vm + # stream['rx_stats'] = {} + # stream['rx_stats']['enabled'] = False + # + # stream['mode'] = {} + # stream['mode']['type'] = 'continuous' + # stream['mode']['pps'] = 10.0 + # + # params = {} + # params['handler'] = handler + # params['stream'] = stream + # params['port_id'] = port_id + # params['stream_id'] = stream_id + # + # print params + # return self.invoke_rpc_method('add_stream', params = params) + + def add_stream(self, port_id_array, stream_pack_list): + batch = self.create_batch() + + for port_id in port_id_array: + for stream_pack in stream_pack_list: + params = {"port_id": port_id, + "handler": self.port_handlers[port_id], + "stream_id": stream_pack.stream_id, + "stream": stream_pack.stream} + batch.add("add_stream", params=params) + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + for i, rc in enumerate(resp_list): + if rc[0]: + print "Stream {0} - {1}".format(i, rc[1]) + # self.port_handlers[port_id_array[i]] = rc[1] + + return True, resp_list + + # return self.invoke_rpc_method('add_stream', params = params) diff --git a/scripts/automation/trex_control_plane/client_utils/packet_builder.py b/scripts/automation/trex_control_plane/client_utils/packet_builder.py index c687126b..3aeb6a34 100755 --- a/scripts/automation/trex_control_plane/client_utils/packet_builder.py +++ b/scripts/automation/trex_control_plane/client_utils/packet_builder.py @@ -33,6 +33,7 @@ class CTRexPktBuilder(object): self._max_pkt_size = max_pkt_size self.payload_gen = CTRexPktBuilder.CTRexPayloadGen(self._packet, self._max_pkt_size) self.vm = CTRexPktBuilder.CTRexVM() + self.metadata = "" def add_pkt_layer(self, layer_name, pkt_layer): """ @@ -441,8 +442,9 @@ class CTRexPktBuilder(object): if self._packet is None: raise CTRexPktBuilder.EmptyPacketError() pkt_in_hex = binascii.hexlify(str(self._packet)) - return [int(pkt_in_hex[i:i+2], 16) - for i in range(0, len(pkt_in_hex), 2)] + return {"binary": [int(pkt_in_hex[i:i+2], 16) + for i in range(0, len(pkt_in_hex), 2)], + "meta": self.metadata} # return [pkt_in_hex[i:i+2] for i in range(0, len(pkt_in_hex), 2)] def dump_pkt_to_pcap(self, file_path, ts=None): @@ -887,7 +889,7 @@ class CTRexPktBuilder(object): dictionary holds variable data of VM variable """ - return {"ins_name": "flow_var", # VM variable dump always refers to manipulate instruction. + return {"type": "flow_var", # VM variable dump always refers to manipulate instruction. "name": self.name, "size": self.size, "op": self.operation, diff --git a/scripts/automation/trex_control_plane/client_utils/yaml_utils.py b/scripts/automation/trex_control_plane/client_utils/yaml_utils.py new file mode 100755 index 00000000..60630a04 --- /dev/null +++ b/scripts/automation/trex_control_plane/client_utils/yaml_utils.py @@ -0,0 +1,163 @@ + +""" +Dan Klein +Cisco Systems, Inc. + +Copyright (c) 2015-2015 Cisco Systems, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import external_packages +import yaml + + +class CTRexYAMLLoader(object): + TYPE_DICT = {"double":float, + "int":int, + "array":list, + "string":str, + "boolean":bool} + + def __init__(self, yaml_ref_file_path): + self.yaml_path = yaml_ref_file_path + self.ref_obj = None + + def check_term_param_type(self, val, val_field, ref_val, multiplier): + # print val, val_field, ref_val + tmp_type = ref_val.get('type') + if isinstance(tmp_type, list): + # item can be one of multiple types + # print "multiple choice!" + python_types = set() + for t in tmp_type: + if t in self.TYPE_DICT: + python_types.add(self.TYPE_DICT.get(t)) + else: + return False, TypeError("Unknown resolving for type {0}".format(t)) + # print "python legit types: ", python_types + if type(val) not in python_types: + return False, TypeError("Type of object field '{0}' is not allowed".format(val_field)) + else: + # WE'RE OK! + return True, CTRexYAMLLoader._calc_final_value(val, multiplier, ref_val.get('multiply', False)) + else: + # this is a single type field + python_type = self.TYPE_DICT.get(tmp_type) + if not isinstance(val, python_type): + return False, TypeError("Type of object field '{0}' is not allowed".format(val_field)) + else: + # WE'RE OK! + return True, CTRexYAMLLoader._calc_final_value(val, multiplier, ref_val.get('multiply', False)) + + def get_reference_default(self, root_obj, sub_obj, key): + # print root_obj, sub_obj, key + if sub_obj: + ref_field = self.ref_obj.get(root_obj).get(sub_obj).get(key) + else: + ref_field = self.ref_obj.get(root_obj).get(key) + if 'has_default' in ref_field: + if ref_field.get('has_default'): + # WE'RE OK! + return True, ref_field.get('default') + else: + # This is a mandatory field! + return False, ValueError("The {0} field is mandatory and must be specified explicitly".format(key)) + else: + return False, ValueError("The {0} field has no indication about default value".format(key)) + + def validate_yaml(self, evaluated_obj, root_obj, fill_defaults=True, multiplier=1): + if isinstance(evaluated_obj, dict) and evaluated_obj.keys() == [root_obj]: + evaluated_obj = evaluated_obj.get(root_obj) + if not self.ref_obj: + self.ref_obj = load_yaml_to_obj(self.yaml_path) + # self.load_reference() + ref_item = self.ref_obj.get(root_obj) + if ref_item is not None: + try: + typed_obj = [False, None] # first item stores validity (multiple object "shapes"), second stored type + if "type" in evaluated_obj: + ref_item = ref_item[evaluated_obj.get("type")] + # print "lower resolution with typed object" + typed_obj = [True, evaluated_obj.get("type")] + if isinstance(ref_item, dict) and "type" not in ref_item: # this is not a terminal + result_obj = {} + if typed_obj[0]: + result_obj["type"] = typed_obj[1] + # print "processing dictionary non-terminal value" + for k, v in ref_item.items(): + # print "processing element '{0}' with value '{1}'".format(k,v) + if k in evaluated_obj: + # validate with ref obj + # print "found in evaluated object!" + tmp_type = v.get('type') + # print tmp_type + # print evaluated_obj + if tmp_type == "object": + # go deeper into nesting hierarchy + # print "This is an object type, recursion!" + result_obj[k] = self.validate_yaml(evaluated_obj.get(k), k, fill_defaults, multiplier) + else: + # validation on terminal type + # print "Validating terminal type %s" % k + res_ok, data = self.check_term_param_type(evaluated_obj.get(k), k, v, multiplier) + if res_ok: + # data field contains the value to save + result_obj[k] = data + else: + # data var contains the exception to throw + raise data + elif fill_defaults: + # complete missing values with default value, if exists + sub_obj = typed_obj[1] if typed_obj[0] else None + res_ok, data = self.get_reference_default(root_obj, sub_obj, k) + if res_ok: + # data field contains the value to save + result_obj[k] = data + else: + # data var contains the exception to throw + raise data + return result_obj + elif isinstance(ref_item, list): + # currently not handling list objects + return NotImplementedError("List object are currently unsupported") + else: + raise TypeError("Unknown parse tree object type.") + except KeyError as e: + raise + else: + raise KeyError("The given root_key '{key}' does not exists on reference object".format(key=root_obj)) + + @staticmethod + def _calc_final_value(val, multiplier, multiply): + def to_num(s): + try: + return int(s) + except ValueError: + return float(s) + if multiply: + return val * to_num(multiplier) + else: + return val + + +def load_yaml_to_obj(file_path): + try: + return yaml.load(file(file_path, 'r')) + except yaml.YAMLError as e: + raise + except Exception as e: + raise + +def yaml_exporter(file_path): + pass + +if __name__ == "__main__": + pass |