diff options
author | 2015-12-09 15:01:25 -0500 | |
---|---|---|
committer | 2015-12-09 15:36:16 -0500 | |
commit | 95c2405d6373ca3c6b69efc3faf293cd41a55c76 (patch) | |
tree | 7aa6728202e8a0d0eb8d049d82bb7f8dada7ac00 /scripts | |
parent | 1355327e97e6d5ce5800fa4d6f879695922e8637 (diff) |
read only support
Diffstat (limited to 'scripts')
7 files changed, 745 insertions, 566 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py new file mode 100644 index 00000000..68d89775 --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -0,0 +1,410 @@ + +from collections import namedtuple +from common.trex_types import * +from common import trex_stats + +# describes a single port +class Port(object): + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + PortState = namedtuple('PortState', ['state_id', 'state_name']) + STATES_MAP = {STATE_DOWN: "DOWN", + STATE_IDLE: "IDLE", + STATE_STREAMS: "IDLE", + STATE_TX: "ACTIVE", + STATE_PAUSE: "PAUSE"} + + + def __init__ (self, port_id, speed, driver, user, session_id, comm_link): + self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None + self.comm_link = comm_link + self.transmit = comm_link.transmit + self.transmit_batch = comm_link.transmit_batch + self.user = user + self.session_id = session_id + self.driver = driver + self.speed = speed + self.streams = {} + self.profile = None + + self.port_stats = trex_stats.CPortStats(self) + + + def err(self, msg): + return RC_ERR("port {0} : {1}".format(self.port_id, msg)) + + def ok(self, data = "ACK"): + return RC_OK(data) + + def get_speed_bps (self): + return (self.speed * 1000 * 1000 * 1000) + + # take the port + def acquire(self, force = False): + params = {"port_id": self.port_id, + "user": self.user, + "session_id": self.session_id, + "force": force} + + command = RpcCmdData("acquire", params) + rc = self.transmit(command.method, command.params) + if rc.success: + self.handler = rc.data + return self.ok() + else: + return self.err(rc.data) + + # release the port + def release(self): + params = {"port_id": self.port_id, + "handler": self.handler} + + command = RpcCmdData("release", params) + rc = self.transmit(command.method, command.params) + self.handler = None + + if rc.success: + return self.ok() + else: + return self.err(rc.data) + + def is_acquired(self): + return (self.handler != None) + + def is_active(self): + return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) + + def is_transmitting (self): + return (self.state == self.STATE_TX) + + def is_paused (self): + return (self.state == self.STATE_PAUSE) + + + def sync(self): + params = {"port_id": self.port_id} + + command = RpcCmdData("get_port_status", params) + rc = self.transmit(command.method, command.params) + if not rc.success: + return self.err(rc.data) + + # sync the port + port_state = rc.data['state'] + + if port_state == "DOWN": + self.state = self.STATE_DOWN + elif port_state == "IDLE": + self.state = self.STATE_IDLE + elif port_state == "STREAMS": + self.state = self.STATE_STREAMS + elif port_state == "TX": + self.state = self.STATE_TX + elif port_state == "PAUSE": + self.state = self.STATE_PAUSE + else: + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + + return self.ok() + + + # return TRUE if write commands + def is_port_writable (self): + # operations on port can be done on state idle or state streams + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) + + # add stream to the port + def add_stream (self, stream_id, stream_obj): + + if not self.is_port_writable(): + return self.err("Please stop port before attempting to add streams") + + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id, + "stream": stream_obj} + + rc, data = self.transmit("add_stream", params) + if not rc: + r = self.err(data) + print r.good() + + # add the stream + self.streams[stream_id] = stream_obj + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # add multiple streams + def add_streams (self, streams_list): + batch = [] + + for stream in streams_list: + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream.stream_id, + "stream": stream.stream} + + cmd = RpcCmdData('add_stream', params) + batch.append(cmd) + + rc, data = self.transmit_batch(batch) + + if not rc: + return self.err(data) + + # add the stream + for stream in streams_list: + self.streams[stream.stream_id] = stream.stream + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # remove stream from port + def remove_stream (self, stream_id): + + if not stream_id in self.streams: + return self.err("stream {0} does not exists".format(stream_id)) + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id} + + + rc, data = self.transmit("remove_stream", params) + if not rc: + return self.err(data) + + self.streams[stream_id] = None + + self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE + + return self.ok() + + # remove all the streams + def remove_all_streams (self): + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("remove_all_streams", params) + if not rc: + return self.err(data) + + self.streams = {} + + self.state = self.STATE_IDLE + + return self.ok() + + # get a specific stream + def get_stream (self, stream_id): + if stream_id in self.streams: + return self.streams[stream_id] + else: + return None + + def get_all_streams (self): + return self.streams + + # start traffic + def start (self, mul, duration): + if self.state == self.STATE_DOWN: + return self.err("Unable to start traffic - port is down") + + if self.state == self.STATE_IDLE: + return self.err("Unable to start traffic - no streams attached to port") + + if self.state == self.STATE_TX: + return self.err("Unable to start traffic - port is already transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul, + "duration": duration} + + rc, data = self.transmit("start_traffic", params) + if not rc: + return self.err(data) + + self.state = self.STATE_TX + + return self.ok() + + # stop traffic + # with force ignores the cached state and sends the command + def stop (self, force = False): + + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("stop_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_STREAMS + + return self.ok() + + def pause (self): + + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("pause_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_PAUSE + + return self.ok() + + + def resume (self): + + if (self.state != self.STATE_PAUSE) : + return self.err("port is not in pause mode") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("resume_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_TX + + return self.ok() + + + def update (self, mul): + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul} + + rc, data = self.transmit("update_traffic", params) + if not rc: + return self.err(data) + + return self.ok() + + + def validate (self): + + if (self.state == self.STATE_DOWN): + return self.err("port is down") + + if (self.state == self.STATE_IDLE): + return self.err("no streams attached to port") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("validate", params) + if not rc: + return self.err(data) + + self.profile = data + + return self.ok() + + def get_profile (self): + return self.profile + + + def print_profile (self, mult, duration): + if not self.get_profile(): + return + + rate = self.get_profile()['rate'] + graph = self.get_profile()['graph'] + + print format_text("Profile Map Per Port\n", 'underline', 'bold') + + factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util']) + + print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"), + format_num(rate['max_bps'] * factor, suffix = "bps")) + + print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), + format_num(rate['max_pps'] * factor, suffix = "pps"),) + + print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100), + format_percentage(rate['max_line_util'] * factor * 100)) + + + # duration + exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) + exp_time_factor_sec = exp_time_base_sec / factor + + # user configured a duration + if duration > 0: + if exp_time_factor_sec > 0: + exp_time_factor_sec = min(exp_time_factor_sec, duration) + else: + exp_time_factor_sec = duration + + + print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), + format_time(exp_time_factor_sec)) + print "\n" + + + def get_port_state_name(self): + return self.STATES_MAP.get(self.state, "Unknown") + + ################# stats handler ###################### + def generate_port_stats(self): + return self.port_stats.generate_stats() + pass + + def generate_port_status(self): + return {"port-type": self.driver, + "maximum": "{speed} Gb/s".format(speed=self.speed), + "port-status": self.get_port_state_name() + } + + def clear_stats(self): + return self.port_stats.clear_stats() + + + ################# events handler ###################### + def async_event_port_stopped (self): + self.state = self.STATE_STREAMS + + + def async_event_port_started (self): + self.state = self.STATE_TX + + + def async_event_port_paused (self): + self.state = self.STATE_PAUSE + + + def async_event_port_resumed (self): + self.state = self.STATE_TX + + def async_event_forced_acquired (self): + self.handler = None diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index db0ed5bf..43ebea9d 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -19,68 +19,12 @@ from client_utils import parsing_opts, text_tables import time import datetime import re +import random +from trex_port import Port +from common.trex_types import * from trex_async_client import CTRexAsyncClient -RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) - -class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): - __slots__ = () - def __str__(self): - return "{id:^3} - {msg} ({stat})".format(id=self.id, - msg=self.msg, - stat="success" if self.success else "fail") - -# simple class to represent complex return value -class RC(): - - def __init__ (self, rc = None, data = None): - self.rc_list = [] - - if (rc != None) and (data != None): - tuple_rc = namedtuple('RC', ['rc', 'data']) - self.rc_list.append(tuple_rc(rc, data)) - - def add (self, rc): - self.rc_list += rc.rc_list - - def good (self): - return all([x.rc for x in self.rc_list]) - - def bad (self): - return not self.good() - - def data (self): - return [x.data if x.rc else "" for x in self.rc_list] - - def err (self): - return [x.data if not x.rc else "" for x in self.rc_list] - - def annotate (self, desc = None): - if desc: - print format_text('\n{:<60}'.format(desc), 'bold'), - - if self.bad(): - # print all the errors - print "" - for x in self.rc_list: - if not x.rc: - print format_text("\n{0}".format(x.data), 'bold') - - print "" - print format_text("[FAILED]\n", 'red', 'bold') - - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - - -def RC_OK(data = ""): - return RC(True, data) -def RC_ERR (err): - return RC(False, err) - -LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) ########## utlity ############ def mult_to_factor (mult, max_bps, max_pps, line_util): @@ -98,444 +42,6 @@ def mult_to_factor (mult, max_bps, max_pps, line_util): -# describes a stream DB -class CStreamsDB(object): - - def __init__(self): - self.stream_packs = {} - - def load_yaml_file(self, filename): - - stream_pack_name = filename - if stream_pack_name in self.get_loaded_streams_names(): - self.remove_stream_packs(stream_pack_name) - - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(filename) - - try: - compiled_streams = stream_list.compile_streams() - rc = self.load_streams(stream_pack_name, - LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - - except Exception as e: - return None - - return self.get_stream_pack(stream_pack_name) - - def load_streams(self, name, LoadedStreamList_obj): - if name in self.stream_packs: - return False - else: - self.stream_packs[name] = LoadedStreamList_obj - return True - - def remove_stream_packs(self, *names): - removed_streams = [] - for name in names: - removed = self.stream_packs.pop(name) - if removed: - removed_streams.append(name) - return removed_streams - - def clear(self): - self.stream_packs.clear() - - def get_loaded_streams_names(self): - return self.stream_packs.keys() - - def stream_pack_exists (self, name): - return name in self.get_loaded_streams_names() - - def get_stream_pack(self, name): - if not self.stream_pack_exists(name): - return None - else: - return self.stream_packs.get(name) - - -# describes a single port -class Port(object): - STATE_DOWN = 0 - STATE_IDLE = 1 - STATE_STREAMS = 2 - STATE_TX = 3 - STATE_PAUSE = 4 - PortState = namedtuple('PortState', ['state_id', 'state_name']) - STATES_MAP = {STATE_DOWN: "DOWN", - STATE_IDLE: "IDLE", - STATE_STREAMS: "STREAMS", - STATE_TX: "ACTIVE", - STATE_PAUSE: "PAUSE"} - - - def __init__ (self, port_id, speed, driver, user, comm_link): - self.port_id = port_id - self.state = self.STATE_IDLE - self.handler = None - self.comm_link = comm_link - self.transmit = comm_link.transmit - self.transmit_batch = comm_link.transmit_batch - self.user = user - self.driver = driver - self.speed = speed - self.streams = {} - self.profile = None - - self.port_stats = trex_stats.CPortStats(self) - - def err(self, msg): - return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - - def ok(self, data = "ACK"): - return RC_OK(data) - - def get_speed_bps (self): - return (self.speed * 1000 * 1000 * 1000) - - # take the port - def acquire(self, force = False): - params = {"port_id": self.port_id, - "user": self.user, - "force": force} - - command = RpcCmdData("acquire", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - # release the port - def release(self): - params = {"port_id": self.port_id, - "handler": self.handler} - - command = RpcCmdData("release", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - def is_acquired(self): - return (self.handler != None) - - def is_active(self): - return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) - - def is_transmitting (self): - return (self.state == self.STATE_TX) - - def is_paused (self): - return (self.state == self.STATE_PAUSE) - - - def sync(self, sync_data): - self.handler = sync_data['handler'] - port_state = sync_data['state'].upper() - if port_state == "DOWN": - self.state = self.STATE_DOWN - elif port_state == "IDLE": - self.state = self.STATE_IDLE - elif port_state == "STREAMS": - self.state = self.STATE_STREAMS - elif port_state == "TX": - self.state = self.STATE_TX - elif port_state == "PAUSE": - self.state = self.STATE_PAUSE - else: - raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) - - return self.ok() - - - # return TRUE if write commands - def is_port_writable (self): - # operations on port can be done on state idle or state streams - return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) - - # add stream to the port - def add_stream (self, stream_id, stream_obj): - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to add streams") - - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id, - "stream": stream_obj} - - rc, data = self.transmit("add_stream", params) - if not rc: - r = self.err(data) - print r.good() - - # add the stream - self.streams[stream_id] = stream_obj - - # the only valid state now - self.state = self.STATE_STREAMS - - return self.ok() - - # add multiple streams - def add_streams (self, streams_list): - batch = [] - - for stream in streams_list: - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream.stream_id, - "stream": stream.stream} - - cmd = RpcCmdData('add_stream', params) - batch.append(cmd) - - rc, data = self.transmit_batch(batch) - - if not rc: - return self.err(data) - - # add the stream - for stream in streams_list: - self.streams[stream.stream_id] = stream.stream - - # the only valid state now - self.state = self.STATE_STREAMS - - return self.ok() - - # remove stream from port - def remove_stream (self, stream_id): - - if not stream_id in self.streams: - return self.err("stream {0} does not exists".format(stream_id)) - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id} - - - rc, data = self.transmit("remove_stream", params) - if not rc: - return self.err(data) - - self.streams[stream_id] = None - - self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE - - return self.ok() - - # remove all the streams - def remove_all_streams (self): - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("remove_all_streams", params) - if not rc: - return self.err(data) - - self.streams = {} - - self.state = self.STATE_IDLE - - return self.ok() - - # get a specific stream - def get_stream (self, stream_id): - if stream_id in self.streams: - return self.streams[stream_id] - else: - return None - - def get_all_streams (self): - return self.streams - - # start traffic - def start (self, mul, duration): - if self.state == self.STATE_DOWN: - return self.err("Unable to start traffic - port is down") - - if self.state == self.STATE_IDLE: - return self.err("Unable to start traffic - no streams attached to port") - - if self.state == self.STATE_TX: - return self.err("Unable to start traffic - port is already transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul, - "duration": duration} - - rc, data = self.transmit("start_traffic", params) - if not rc: - return self.err(data) - - self.state = self.STATE_TX - - return self.ok() - - # stop traffic - # with force ignores the cached state and sends the command - def stop (self, force = False): - - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("stop_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_STREAMS - - return self.ok() - - def pause (self): - - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("pause_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_PAUSE - - return self.ok() - - - def resume (self): - - if (self.state != self.STATE_PAUSE) : - return self.err("port is not in pause mode") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("resume_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_TX - - return self.ok() - - - def update (self, mul): - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul} - - rc, data = self.transmit("update_traffic", params) - if not rc: - return self.err(data) - - return self.ok() - - - def validate (self): - - if (self.state == self.STATE_DOWN): - return self.err("port is down") - - if (self.state == self.STATE_IDLE): - return self.err("no streams attached to port") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("validate", params) - if not rc: - return self.err(data) - - self.profile = data - - return self.ok() - - def get_profile (self): - return self.profile - - - def print_profile (self, mult, duration): - if not self.get_profile(): - return - - rate = self.get_profile()['rate'] - graph = self.get_profile()['graph'] - - print format_text("Profile Map Per Port\n", 'underline', 'bold') - - factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util']) - - print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"), - format_num(rate['max_bps'] * factor, suffix = "bps")) - - print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"), - format_num(rate['max_pps'] * factor, suffix = "pps"),) - - print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100), - format_percentage(rate['max_line_util'] * factor * 100)) - - - # duration - exp_time_base_sec = graph['expected_duration'] / (1000 * 1000) - exp_time_factor_sec = exp_time_base_sec / factor - - # user configured a duration - if duration > 0: - if exp_time_factor_sec > 0: - exp_time_factor_sec = min(exp_time_factor_sec, duration) - else: - exp_time_factor_sec = duration - - - print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec), - format_time(exp_time_factor_sec)) - print "\n" - - - def get_port_state_name(self): - return self.STATES_MAP.get(self.state, "Unknown") - - ################# stats handler ###################### - def generate_port_stats(self): - return self.port_stats.generate_stats() - pass - - def generate_port_status(self): - return {"port-type": self.driver, - "maximum": "{speed} Gb/s".format(speed=self.speed), - "port-status": self.get_port_state_name() - } - - def clear_stats(self): - return self.port_stats.clear_stats() - - - ################# events handler ###################### - def async_event_port_stopped (self): - self.state = self.STATE_STREAMS - - class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" @@ -546,7 +52,10 @@ class CTRexStatelessClient(object): def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() + self.user = username + self.session_id = random.getrandbits(32) + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) # default verbose level @@ -571,15 +80,24 @@ class CTRexStatelessClient(object): self.events = [] + + self.read_only = False self.connected = False + # when the client gets out + def shutdown (self): + self.release(self.get_acquired_ports()) + # returns the port object def get_port (self, port_id): return self.ports.get(port_id, None) + def get_server (self): + return self.comm_link.get_server() + ################# events handler ###################### def add_event_log (self, msg, ev_type, show = False): @@ -634,6 +152,7 @@ class CTRexStatelessClient(object): if (type == 0): port_id = int(data['port_id']) ev = "Port {0} has started".format(port_id) + self.async_event_port_started(port_id) # port stopped elif (type == 1): @@ -644,21 +163,47 @@ class CTRexStatelessClient(object): self.async_event_port_stopped(port_id) - # server stopped + # port paused elif (type == 2): - ev = "Server has stopped" - self.async_event_server_stopped() - show_event = True + port_id = int(data['port_id']) + ev = "Port {0} has paused".format(port_id) - # port finished traffic + # call the handler + self.async_event_port_paused(port_id) + + # port resumed elif (type == 3): port_id = int(data['port_id']) + ev = "Port {0} has resumed".format(port_id) + + # call the handler + self.async_event_port_resumed(port_id) + + # port finished traffic + elif (type == 4): + port_id = int(data['port_id']) ev = "Port {0} job done".format(port_id) # call the handler self.async_event_port_stopped(port_id) show_event = True + # port was stolen... + elif (type == 5): + port_id = int(data['port_id']) + ev = "Port {0} was forcely taken".format(port_id) + + # call the handler + self.async_event_port_forced_acquired(port_id) + show_event = True + + # server stopped + elif (type == 100): + ev = "Server has stopped" + self.async_event_server_stopped() + show_event = True + + else: # unknown event - ignore return @@ -670,6 +215,23 @@ class CTRexStatelessClient(object): def async_event_port_stopped (self, port_id): self.ports[port_id].async_event_port_stopped() + + def async_event_port_started (self, port_id): + self.ports[port_id].async_event_port_started() + + + def async_event_port_paused (self, port_id): + self.ports[port_id].async_event_port_paused() + + + def async_event_port_resumed (self, port_id): + self.ports[port_id].async_event_port_resumed() + + + def async_event_port_forced_acquired (self, port_id): + self.ports[port_id].async_event_forced_acquired() + self.read_only = True + def async_event_server_stopped (self): self.connected = False @@ -732,7 +294,7 @@ class CTRexStatelessClient(object): ############ boot up section ################ # connection sequence - def connect(self): + def connect(self, force = False): # clear this flag self.connected = False @@ -773,21 +335,30 @@ class CTRexStatelessClient(object): speed = self.system_info['ports'][port_id]['speed'] driver = self.system_info['ports'][port_id]['driver'] - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link) + self.ports[port_id] = Port(port_id, speed, driver, self.user, self.session_id, self.comm_link) - # acquire all ports - rc = self.acquire() + # sync the ports + rc = self.sync_ports() if rc.bad(): return rc - rc = self.sync_with_server() + # acquire all ports + rc = self.acquire(force = force) if rc.bad(): - return rc + # release all the succeeded ports and set as read only + self.release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + self.connected = True + return rc - return RC_OK() + + def is_read_only (self): + return self.read_only def is_connected (self): return self.connected and self.comm_link.is_connected @@ -838,6 +409,9 @@ class CTRexStatelessClient(object): def get_connection_ip (self): return self.comm_link.server + def get_all_ports (self): + return [port_id for port_id, port_obj in self.ports.iteritems()] + def get_acquired_ports(self): return [port_id for port_id, port_obj in self.ports.iteritems() @@ -883,17 +457,6 @@ class CTRexStatelessClient(object): return RC(rc, info) - def sync_with_server(self, sync_streams=False): - rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) - if not rc: - return RC_ERR(data) - - for port_info in data: - rc = self.ports[port_info['port_id']].sync(port_info) - if rc.bad(): - return rc - - return RC_OK() def get_global_stats(self): rc, info = self.transmit("get_global_stats") @@ -901,6 +464,16 @@ class CTRexStatelessClient(object): ########## port commands ############## + def sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) + + return rc + # acquire ports, if port_list is none - get all def acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) @@ -919,7 +492,7 @@ class CTRexStatelessClient(object): rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].release(force)) + rc.add(self.ports[port_id].release()) return rc @@ -1075,8 +648,8 @@ class CTRexStatelessClient(object): rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) return rc - def cmd_connect(self): - rc = self.connect() + def cmd_connect(self, force): + rc = self.connect(force) rc.annotate() return rc @@ -1089,12 +662,6 @@ class CTRexStatelessClient(object): def cmd_reset(self): - # sync with the server - rc = self.sync_with_server() - rc.annotate("Syncing with the server:") - if rc.bad(): - return rc - rc = self.acquire(force = True) rc.annotate("Force acquiring all ports:") if rc.bad(): @@ -1260,6 +827,22 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ + + def cmd_connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.cmd_connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + return self.cmd_connect(opts.force) + @timing def cmd_start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' @@ -1551,6 +1134,9 @@ class CTRexStatelessClient(object): else: return True + def get_server (self): + return self.server + def set_verbose(self, mode): self.verbose = mode return self.rpc_link.set_verbose(mode) diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py index 671a0656..2f6ea38d 100755 --- a/scripts/automation/trex_control_plane/common/trex_stats.py +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -5,6 +5,7 @@ from common.text_opts import format_text from client.trex_async_client import CTRexAsyncStats import copy import datetime +import time import re GLOBAL_STATS = 'g' @@ -134,7 +135,8 @@ class CTRexStatsGenerator(object): # fetch owned ports ports = [port_obj for _, port_obj in self._ports_dict.iteritems() - if port_obj.is_acquired() and port_obj.port_id in port_id_list] + if port_obj.port_id in port_id_list] + # display only the first FOUR options, by design if len(ports) > 4: print format_text("[WARNING]: ", 'magenta', 'bold'), format_text("displaying up to 4 ports", 'magenta') @@ -155,7 +157,7 @@ class CTRexStats(object): def __init__(self): self.reference_stats = None self.latest_stats = {} - self.last_update_ts = datetime.datetime.now() + self.last_update_ts = time.time() def __getitem__(self, item): @@ -204,13 +206,16 @@ class CTRexStats(object): def update(self, snapshot): # update - self.last_update_ts = datetime.datetime.now() - self.latest_stats = snapshot - if self.reference_stats == None: + diff_time = time.time() - self.last_update_ts + + # 3 seconds is too much - this is the new reference + if (self.reference_stats == None) or (diff_time > 3): self.reference_stats = self.latest_stats + self.last_update_ts = time.time() + def clear_stats(self): self.reference_stats = self.latest_stats @@ -225,6 +230,7 @@ class CTRexStats(object): def get_rel(self, field, format=False, suffix=""): if not field in self.latest_stats: return "N/A" + if not format: return (self.latest_stats[field] - self.reference_stats[field]) else: diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py index 89de7286..86eee1f4 100755 --- a/scripts/automation/trex_control_plane/common/trex_streams.py +++ b/scripts/automation/trex_control_plane/common/trex_streams.py @@ -10,6 +10,7 @@ import copy import os StreamPack = namedtuple('StreamPack', ['stream_id', 'stream']) +LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) class CStreamList(object): @@ -254,5 +255,61 @@ class CStream(object): raise RuntimeError("CStream object isn't loaded with data. Use 'load_data' method.") -if __name__ == "__main__": - pass + +# describes a stream DB +class CStreamsDB(object): + + def __init__(self): + self.stream_packs = {} + + def load_yaml_file(self, filename): + + stream_pack_name = filename + if stream_pack_name in self.get_loaded_streams_names(): + self.remove_stream_packs(stream_pack_name) + + stream_list = CStreamList() + loaded_obj = stream_list.load_yaml(filename) + + try: + compiled_streams = stream_list.compile_streams() + rc = self.load_streams(stream_pack_name, + LoadedStreamList(loaded_obj, + [StreamPack(v.stream_id, v.stream.dump()) + for k, v in compiled_streams.items()])) + + except Exception as e: + return None + + return self.get_stream_pack(stream_pack_name) + + def load_streams(self, name, LoadedStreamList_obj): + if name in self.stream_packs: + return False + else: + self.stream_packs[name] = LoadedStreamList_obj + return True + + def remove_stream_packs(self, *names): + removed_streams = [] + for name in names: + removed = self.stream_packs.pop(name) + if removed: + removed_streams.append(name) + return removed_streams + + def clear(self): + self.stream_packs.clear() + + def get_loaded_streams_names(self): + return self.stream_packs.keys() + + def stream_pack_exists (self, name): + return name in self.get_loaded_streams_names() + + def get_stream_pack(self, name): + if not self.stream_pack_exists(name): + return None + else: + return self.stream_packs.get(name) + diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py new file mode 100644 index 00000000..3de36e4c --- /dev/null +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -0,0 +1,66 @@ + +from collections import namedtuple +from common.text_opts import * + +RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) + +class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): + __slots__ = () + def __str__(self): + return "{id:^3} - {msg} ({stat})".format(id=self.id, + msg=self.msg, + stat="success" if self.success else "fail") + +# simple class to represent complex return value +class RC(): + + def __init__ (self, rc = None, data = None): + self.rc_list = [] + + if (rc != None) and (data != None): + tuple_rc = namedtuple('RC', ['rc', 'data']) + self.rc_list.append(tuple_rc(rc, data)) + + def add (self, rc): + self.rc_list += rc.rc_list + + def good (self): + return all([x.rc for x in self.rc_list]) + + def bad (self): + return not self.good() + + def data (self): + return [x.data if x.rc else "" for x in self.rc_list] + + def err (self): + return [x.data if not x.rc else "" for x in self.rc_list] + + def annotate (self, desc = None, show_status = True): + if desc: + print format_text('\n{:<60}'.format(desc), 'bold'), + else: + print "" + + if self.bad(): + # print all the errors + print "" + for x in self.rc_list: + if not x.rc: + print format_text("\n{0}".format(x.data), 'bold') + + print "" + if show_status: + print format_text("[FAILED]\n", 'red', 'bold') + + + else: + if show_status: + print format_text("[SUCCESS]\n", 'green', 'bold') + + +def RC_OK(data = ""): + return RC(True, data) +def RC_ERR (err): + return RC(False, err) + diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 9140977a..495e1c22 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -130,12 +130,17 @@ class TRexConsole(TRexGeneralCmd): ################### internal section ######################## + def verify_connected(f): @wraps(f) def wrap(*args): inst = args[0] + func_name = f.__name__ + if func_name.startswith("do_"): + func_name = func_name[3:] + if not inst.stateless_client.is_connected(): - print format_text("\nNot connected to server\n", 'bold') + print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') return ret = f(*args) @@ -143,9 +148,32 @@ class TRexConsole(TRexGeneralCmd): return wrap + # TODO: remove this ugly duplication + def verify_connected_and_rw (f): + @wraps(f) + def wrap(*args): + inst = args[0] + func_name = f.__name__ + if func_name.startswith("do_"): + func_name = func_name[3:] + + if not inst.stateless_client.is_connected(): + print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold') + return + + if inst.stateless_client.is_read_only(): + print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold') + return + + ret = f(*args) + return ret + + return wrap + + def get_console_identifier(self): return "{context}_{server}".format(context=self.__class__.__name__, - server=self.stateless_client.get_system_info()['hostname']) + server=self.stateless_client.get_server()) def register_main_console_methods(self): main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) @@ -156,11 +184,17 @@ class TRexConsole(TRexGeneralCmd): def postcmd(self, stop, line): - if self.stateless_client.is_connected(): - self.prompt = "TRex > " - else: - self.supported_rpc = None + if not self.stateless_client.is_connected(): self.prompt = "TRex (offline) > " + self.supported_rpc = None + return stop + + if self.stateless_client.is_read_only(): + self.prompt = "TRex (read only) > " + return stop + + + self.prompt = "TRex > " return stop @@ -287,7 +321,7 @@ class TRexConsole(TRexGeneralCmd): def do_connect (self, line): '''Connects to the server\n''' - rc = self.stateless_client.cmd_connect() + rc = self.stateless_client.cmd_connect_line(line) if rc.bad(): return @@ -314,7 +348,7 @@ class TRexConsole(TRexGeneralCmd): if (l > 2) and (s[l - 2] in file_flags): return TRexConsole.tree_autocomplete(s[l - 1]) - @verify_connected + @verify_connected_and_rw def do_start(self, line): '''Start selected traffic in specified port(s) on TRex\n''' @@ -325,7 +359,7 @@ class TRexConsole(TRexGeneralCmd): self.do_start("-h") ############# stop - @verify_connected + @verify_connected_and_rw def do_stop(self, line): '''stops port(s) transmitting traffic\n''' @@ -335,7 +369,7 @@ class TRexConsole(TRexGeneralCmd): self.do_stop("-h") ############# update - @verify_connected + @verify_connected_and_rw def do_update(self, line): '''update speed of port(s)currently transmitting traffic\n''' @@ -345,14 +379,14 @@ class TRexConsole(TRexGeneralCmd): self.do_update("-h") ############# pause - @verify_connected + @verify_connected_and_rw def do_pause(self, line): '''pause port(s) transmitting traffic\n''' self.stateless_client.cmd_pause_line(line) ############# resume - @verify_connected + @verify_connected_and_rw def do_resume(self, line): '''resume port(s) transmitting traffic\n''' @@ -361,7 +395,7 @@ class TRexConsole(TRexGeneralCmd): ########## reset - @verify_connected + @verify_connected_and_rw def do_reset (self, line): '''force stop all ports\n''' self.stateless_client.cmd_reset_line(line) @@ -375,6 +409,7 @@ class TRexConsole(TRexGeneralCmd): self.stateless_client.cmd_validate_line(line) + @verify_connected def do_stats(self, line): '''Fetch statistics from TRex server by port\n''' self.stateless_client.cmd_stats_line(line) @@ -383,6 +418,7 @@ class TRexConsole(TRexGeneralCmd): def help_stats(self): self.do_stats("-h") + @verify_connected def do_clear(self, line): '''Clear cached local statistics\n''' self.stateless_client.cmd_clear_line(line) @@ -529,9 +565,17 @@ def main(): # Stateless client connection stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub) - rc = stateless_client.cmd_connect() + + print "\nlogged as {0}".format(format_text(options.user, 'bold')) + rc = stateless_client.connect() + + # error can be either no able to connect or a read only if rc.bad(): - return + if not stateless_client.is_connected(): + rc.annotate() + else: + rc.annotate(show_status = False) + if options.batch: cont = stateless_client.run_script_file(options.batch[0]) @@ -544,7 +588,9 @@ def main(): console.cmdloop() except KeyboardInterrupt as e: print "\n\n*** Caught Ctrl + C... Exiting...\n\n" - return + + finally: + stateless_client.shutdown() if __name__ == '__main__': main() diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py index c44efe15..3ddf7a7f 100644 --- a/scripts/automation/trex_control_plane/console/trex_tui.py +++ b/scripts/automation/trex_control_plane/console/trex_tui.py @@ -40,7 +40,7 @@ class TrexTUIDashBoard(TrexTUIPanel): self.key_actions['+'] = {'action': self.action_raise, 'legend': 'up 5%', 'show': True} self.key_actions['-'] = {'action': self.action_lower, 'legend': 'low 5%', 'show': True} - self.ports = self.stateless_client.get_acquired_ports() + self.ports = self.stateless_client.get_all_ports() def show (self): @@ -55,6 +55,10 @@ class TrexTUIDashBoard(TrexTUIPanel): allowed['c'] = self.key_actions['c'] + # thats it for read only + if self.stateless_client.is_read_only(): + return allowed + if len(self.stateless_client.get_transmitting_ports()) > 0: allowed['p'] = self.key_actions['p'] allowed['+'] = self.key_actions['+'] @@ -69,10 +73,10 @@ class TrexTUIDashBoard(TrexTUIPanel): ######### actions def action_pause (self): - rc = self.stateless_client.pause_traffic(self.mng.acquired_ports) + rc = self.stateless_client.pause_traffic(self.mng.ports) ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports): + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): if rc_single.rc: ports_succeeded.append(port_id) @@ -83,10 +87,10 @@ class TrexTUIDashBoard(TrexTUIPanel): def action_resume (self): - rc = self.stateless_client.resume_traffic(self.mng.acquired_ports) + rc = self.stateless_client.resume_traffic(self.mng.ports) ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports): + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): if rc_single.rc: ports_succeeded.append(port_id) @@ -98,10 +102,10 @@ class TrexTUIDashBoard(TrexTUIPanel): def action_raise (self): mul = {'type': 'percentage', 'value': 5, 'op': 'add'} - rc = self.stateless_client.update_traffic(mul, self.mng.acquired_ports) + rc = self.stateless_client.update_traffic(mul, self.mng.ports) ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports): + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): if rc_single.rc: ports_succeeded.append(port_id) @@ -112,10 +116,10 @@ class TrexTUIDashBoard(TrexTUIPanel): def action_lower (self): mul = {'type': 'percentage', 'value': 5, 'op': 'sub'} - rc = self.stateless_client.update_traffic(mul, self.mng.acquired_ports) + rc = self.stateless_client.update_traffic(mul, self.mng.ports) ports_succeeded = [] - for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports): + for rc_single, port_id in zip(rc.rc_list, self.mng.ports): if rc_single.rc: ports_succeeded.append(port_id) @@ -126,7 +130,7 @@ class TrexTUIDashBoard(TrexTUIPanel): def action_clear (self): - self.stateless_client.cmd_clear(self.mng.acquired_ports) + self.stateless_client.cmd_clear(self.mng.ports) return "cleared all stats" @@ -148,7 +152,6 @@ class TrexTUIPort(TrexTUIPanel): def show (self): - stats = self.stateless_client.cmd_stats([self.port_id], trex_stats.COMPACT) # print stats to screen for stat_type, stat_data in stats.iteritems(): @@ -160,6 +163,10 @@ class TrexTUIPort(TrexTUIPanel): allowed['c'] = self.key_actions['c'] + # thats it for read only + if self.stateless_client.is_read_only(): + return allowed + if self.port.state == self.port.STATE_TX: allowed['p'] = self.key_actions['p'] allowed['+'] = self.key_actions['+'] @@ -232,7 +239,7 @@ class TrexTUIPanelManager(): def __init__ (self, tui): self.tui = tui self.stateless_client = tui.stateless_client - self.acquired_ports = self.stateless_client.get_acquired_ports() + self.ports = self.stateless_client.get_all_ports() self.panels = {} @@ -242,7 +249,7 @@ class TrexTUIPanelManager(): self.key_actions['q'] = {'action': self.action_quit, 'legend': 'quit', 'show': True} self.key_actions['g'] = {'action': self.action_show_dash, 'legend': 'dashboard', 'show': True} - for port_id in self.acquired_ports: + for port_id in self.ports: self.key_actions[str(port_id)] = {'action': self.action_show_port(port_id), 'legend': 'port {0}'.format(port_id), 'show': False} self.panels['port {0}'.format(port_id)] = TrexTUIPort(self, port_id) @@ -263,7 +270,7 @@ class TrexTUIPanelManager(): x = "'{0}' - {1}, ".format(k, v['legend']) self.legend += "{:}".format(x) - self.legend += "'0-{0}' - port display".format(len(self.acquired_ports) - 1) + self.legend += "'0-{0}' - port display".format(len(self.ports) - 1) self.legend += "\n{:<12}".format(self.main_panel.get_name() + ":") @@ -282,6 +289,7 @@ class TrexTUIPanelManager(): self.generate_legend() def show (self): + print self.ports self.main_panel.show() self.print_legend() self.log.show() |