diff options
author | Dan Klein <danklein10@gmail.com> | 2015-12-20 00:07:44 +0200 |
---|---|---|
committer | Dan Klein <danklein10@gmail.com> | 2015-12-20 00:07:44 +0200 |
commit | 4ca24cf31919870a684fe78f17c856e0d220e6d5 (patch) | |
tree | f40ab95e52adca3ac713d61eb9fa3fd0d136e4ea /scripts/automation/trex_control_plane/client/trex_stateless_client.py | |
parent | 1895d21485621c3428d045fa0f5b9daf165c8260 (diff) | |
parent | 5cef472bcdc6c0b7e20e5cc42485ed5570c10f8c (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_stateless_client.py')
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 946 |
1 files changed, 410 insertions, 536 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 748817da..58fa53c9 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -14,433 +14,40 @@ import json from common.trex_streams import * from collections import namedtuple from common.text_opts import * -# import trex_stats from common import trex_stats from client_utils import parsing_opts, text_tables import time import datetime import re +import random +from trex_port import Port +from common.trex_types import * from trex_async_client import CTRexAsyncClient -RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) -class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): - __slots__ = () - def __str__(self): - return "{id:^3} - {msg} ({stat})".format(id=self.id, - msg=self.msg, - stat="success" if self.success else "fail") - -# simple class to represent complex return value -class RC(): - - def __init__ (self, rc = None, data = None): - self.rc_list = [] - - if (rc != None) and (data != None): - tuple_rc = namedtuple('RC', ['rc', 'data']) - self.rc_list.append(tuple_rc(rc, data)) - - def add (self, rc): - self.rc_list += rc.rc_list - - def good (self): - return all([x.rc for x in self.rc_list]) - - def bad (self): - return not self.good() - - def data (self): - return all([x.data if x.rc else "" for x in self.rc_list]) - - def err (self): - return all([x.data if not x.rc else "" for x in self.rc_list]) - - def annotate (self, desc = None): - if desc: - print format_text('\n{:<40}'.format(desc), 'bold'), - - if self.bad(): - # print all the errors - print "" - for x in self.rc_list: - if not x.rc: - print format_text("\n{0}".format(x.data), 'bold') - - print "" - print format_text("[FAILED]\n", 'red', 'bold') - - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - - -def RC_OK(): - return RC(True, "") -def RC_ERR(err): - return RC(False, err) - - -LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) - -# describes a stream DB -class CStreamsDB(object): - - def __init__(self): - self.stream_packs = {} - - def load_yaml_file(self, filename): - - stream_pack_name = filename - if stream_pack_name in self.get_loaded_streams_names(): - self.remove_stream_packs(stream_pack_name) - - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(filename) - - try: - compiled_streams = stream_list.compile_streams() - rc = self.load_streams(stream_pack_name, - LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - - except Exception as e: - return None - - return self.get_stream_pack(stream_pack_name) - - def load_streams(self, name, LoadedStreamList_obj): - if name in self.stream_packs: - return False - else: - self.stream_packs[name] = LoadedStreamList_obj - return True - - def remove_stream_packs(self, *names): - removed_streams = [] - for name in names: - removed = self.stream_packs.pop(name) - if removed: - removed_streams.append(name) - return removed_streams - - def clear(self): - self.stream_packs.clear() - - def get_loaded_streams_names(self): - return self.stream_packs.keys() - - def stream_pack_exists (self, name): - return name in self.get_loaded_streams_names() - - def get_stream_pack(self, name): - if not self.stream_pack_exists(name): - return None - else: - return self.stream_packs.get(name) - - -# describes a single port -class Port(object): - STATE_DOWN = 0 - STATE_IDLE = 1 - STATE_STREAMS = 2 - STATE_TX = 3 - STATE_PAUSE = 4 - PortState = namedtuple('PortState', ['state_id', 'state_name']) - STATES_MAP = {STATE_DOWN: "DOWN", - STATE_IDLE: "IDLE", - STATE_STREAMS: "STREAMS", - STATE_TX: "ACTIVE", - STATE_PAUSE: "PAUSE"} - - - def __init__ (self, port_id, speed, driver, user, transmit): - self.port_id = port_id - self.state = self.STATE_IDLE - self.handler = None - self.transmit = transmit - self.user = user - self.driver = driver - self.speed = speed - self.streams = {} - self.port_stats = trex_stats.CPortStats(self) - - def err(self, msg): - return RC_ERR("port {0} : {1}".format(self.port_id, msg)) - - def ok(self): - return RC_OK() - - def get_speed_bps (self): - return (self.speed * 1000 * 1000 * 1000) - - # take the port - def acquire(self, force = False): - params = {"port_id": self.port_id, - "user": self.user, - "force": force} - - command = RpcCmdData("acquire", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - # release the port - def release(self): - params = {"port_id": self.port_id, - "handler": self.handler} - - command = RpcCmdData("release", params) - rc = self.transmit(command.method, command.params) - if rc.success: - self.handler = rc.data - return self.ok() - else: - return self.err(rc.data) - - def is_acquired(self): - return (self.handler != None) - - def is_active(self): - return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) - - def sync(self, sync_data): - self.handler = sync_data['handler'] - port_state = sync_data['state'].upper() - if port_state == "DOWN": - self.state = self.STATE_DOWN - elif port_state == "IDLE": - self.state = self.STATE_IDLE - elif port_state == "STREAMS": - self.state = self.STATE_STREAMS - elif port_state == "TX": - self.state = self.STATE_TX - elif port_state == "PAUSE": - self.state = self.STATE_PAUSE - else: - raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) - - return self.ok() - - - # return TRUE if write commands - def is_port_writable (self): - # operations on port can be done on state idle or state streams - return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) - - # add stream to the port - def add_stream (self, stream_id, stream_obj): - - if not self.is_port_writable(): - return self.err("Please stop port before attempting to add streams") - - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id, - "stream": stream_obj} - - rc, data = self.transmit("add_stream", params) - if not rc: - r = self.err(data) - print r.good() - - # add the stream - self.streams[stream_id] = stream_obj - - # the only valid state now - self.state = self.STATE_STREAMS - - return self.ok() - - # remove stream from port - def remove_stream (self, stream_id): - - if not stream_id in self.streams: - return self.err("stream {0} does not exists".format(stream_id)) - - params = {"handler": self.handler, - "port_id": self.port_id, - "stream_id": stream_id} - - - rc, data = self.transmit("remove_stream", params) - if not rc: - return self.err(data) - - self.streams[stream_id] = None - - return self.ok() - - # remove all the streams - def remove_all_streams (self): - - params = {"handler": self.handler, - "port_id": self.port_id} +class CTRexStatelessClient(object): + """docstring for CTRexStatelessClient""" - rc, data = self.transmit("remove_all_streams", params) - if not rc: - return self.err(data) + # verbose levels + VERBOSE_QUIET = 0 + VERBOSE_REGULAR = 1 + VERBOSE_HIGH = 2 + + def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False): + super(CTRexStatelessClient, self).__init__() - self.streams = {} + self.user = username - return self.ok() + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func) - # get a specific stream - def get_stream (self, stream_id): - if stream_id in self.streams: - return self.streams[stream_id] + # default verbose level + if not quiet: + self.verbose = self.VERBOSE_REGULAR else: - return None - - def get_all_streams (self): - return self.streams - - - def process_mul (self, mul): - # if percentage - translate - if mul['type'] == 'percentage': - mul['type'] = 'max_bps' - mul['max'] = self.get_speed_bps() * (mul['max'] / 100) - - - # start traffic - def start (self, mul, duration): - if self.state == self.STATE_DOWN: - return self.err("Unable to start traffic - port is down") - - if self.state == self.STATE_IDLE: - return self.err("Unable to start traffic - no streams attached to port") - - if self.state == self.STATE_TX: - return self.err("Unable to start traffic - port is already transmitting") - - self.process_mul(mul) - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul, - "duration": duration} - - rc, data = self.transmit("start_traffic", params) - if not rc: - return self.err(data) - - self.state = self.STATE_TX - - return self.ok() - - # stop traffic - # with force ignores the cached state and sends the command - def stop (self, force = False): - - if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("stop_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_STREAMS - - return self.ok() - - def pause (self): - - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - params = {"handler": self.handler, - "port_id": self.port_id} + self.verbose = self.VERBOSE_QUIET - rc, data = self.transmit("pause_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_PAUSE - - return self.ok() - - - def resume (self): - - if (self.state != self.STATE_PAUSE) : - return self.err("port is not in pause mode") - - params = {"handler": self.handler, - "port_id": self.port_id} - - rc, data = self.transmit("resume_traffic", params) - if not rc: - return self.err(data) - - # only valid state after stop - self.state = self.STATE_TX - - return self.ok() - - - def update (self, mul): - if (self.state != self.STATE_TX) : - return self.err("port is not transmitting") - - self.process_mul(mul) - - params = {"handler": self.handler, - "port_id": self.port_id, - "mul": mul} - - rc, data = self.transmit("update_traffic", params) - if not rc: - return self.err(data) - - return self.ok() - - def get_port_state_name(self): - return self.STATES_MAP.get(self.state, "Unknown") - - ################# stats handler ###################### - def generate_port_stats(self): - return self.port_stats.generate_stats() - pass - - def generate_port_status(self): - return {"port-type": self.driver, - "maximum": "{speed} Gb/s".format(speed=self.speed), - "port-status": self.get_port_state_name() - } - - def clear_stats(self): - return self.port_stats.clear_stats() - - ################# events handler ###################### - def async_event_port_stopped (self): - self.state = self.STATE_STREAMS - - -class CTRexStatelessClient(object): - """docstring for CTRexStatelessClient""" - - def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): - super(CTRexStatelessClient, self).__init__() - self.user = username - self.system_info = None - self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) - self.verbose = False self.ports = {} - # self._conn_handler = {} - # self._active_ports = set() self._connection_info = {"server": server, "sync_port": sync_port, "async_port": async_port} @@ -448,7 +55,7 @@ class CTRexStatelessClient(object): self.server_version = {} self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func) self.streams_db = CStreamsDB() self.global_stats = trex_stats.CGlobalStats(self._connection_info, @@ -457,12 +64,44 @@ class CTRexStatelessClient(object): self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats, self.ports) + self.events = [] + + self.session_id = random.getrandbits(32) + self.read_only = False self.connected = False - self.events = [] + + + # returns the port object + def get_port (self, port_id): + return self.ports.get(port_id, None) + + + # connection server ip + def get_server_ip (self): + return self.comm_link.get_server() + + # connection server port + def get_server_port (self): + return self.comm_link.get_port() + ################# events handler ###################### - + def add_event_log (self, msg, ev_type, show = False): + + if ev_type == "server": + prefix = "[server]" + elif ev_type == "local": + prefix = "[local]" + + ts = time.time() + st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold'))) + + if show: + self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))) + + def handle_async_stats_update(self, dump_data): global_stats = {} port_stats = {} @@ -490,59 +129,108 @@ class CTRexStatelessClient(object): for port_id, data in port_stats.iteritems(): self.ports[port_id].port_stats.update(data) + + def handle_async_event (self, type, data): # DP stopped - ev = "[event] - " - show_event = False # port started if (type == 0): port_id = int(data['port_id']) - ev += "Port {0} has started".format(port_id) + ev = "Port {0} has started".format(port_id) + self.async_event_port_started(port_id) # port stopped elif (type == 1): port_id = int(data['port_id']) - ev += "Port {0} has stopped".format(port_id) + ev = "Port {0} has stopped".format(port_id) # call the handler self.async_event_port_stopped(port_id) - # server stopped + # port paused elif (type == 2): - ev += "Server has stopped" - self.async_event_server_stopped() - show_event = True + port_id = int(data['port_id']) + ev = "Port {0} has paused".format(port_id) - # port finished traffic + # call the handler + self.async_event_port_paused(port_id) + + # port resumed elif (type == 3): port_id = int(data['port_id']) - ev += "Port {0} job done".format(port_id) + ev = "Port {0} has resumed".format(port_id) + + # call the handler + self.async_event_port_resumed(port_id) + + # port finished traffic + elif (type == 4): + port_id = int(data['port_id']) + ev = "Port {0} job done".format(port_id) # call the handler self.async_event_port_stopped(port_id) show_event = True + # port was stolen... + elif (type == 5): + session_id = data['session_id'] + + # false alarm, its us + if session_id == self.session_id: + return + + port_id = int(data['port_id']) + who = data['who'] + + ev = "Port {0} was forcely taken by '{1}'".format(port_id, who) + + # call the handler + self.async_event_port_forced_acquired(port_id) + show_event = True + + # server stopped + elif (type == 100): + ev = "Server has stopped" + self.async_event_server_stopped() + show_event = True + + else: # unknown event - ignore return - if show_event: - print format_text("\n" + ev, 'bold') - ts = time.time() - st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') - self.events.append("{0} - ".format(st) + format_text(ev, 'bold')) + self.add_event_log(ev, 'server', show_event) def async_event_port_stopped (self, port_id): self.ports[port_id].async_event_port_stopped() + + def async_event_port_started (self, port_id): + self.ports[port_id].async_event_port_started() + + + def async_event_port_paused (self, port_id): + self.ports[port_id].async_event_port_paused() + + + def async_event_port_resumed (self, port_id): + self.ports[port_id].async_event_port_resumed() + + + def async_event_port_forced_acquired (self, port_id): + self.ports[port_id].async_event_forced_acquired() + self.read_only = True + def async_event_server_stopped (self): - self.disconnect() + self.connected = False + def get_events (self): return self.events @@ -552,6 +240,24 @@ class CTRexStatelessClient(object): ############# helper functions section ############## + # measure time for functions + def timing(f): + def wrap(*args): + time1 = time.time() + ret = f(*args) + + # don't want to print on error + if ret.bad(): + return ret + + delta = time.time() - time1 + print format_time(delta) + "\n" + + return ret + + return wrap + + def validate_port_list(self, port_id_list): if not isinstance(port_id_list, list): print type(port_id_list) @@ -584,66 +290,120 @@ class CTRexStatelessClient(object): ############ boot up section ################ # connection sequence - def connect(self): + # mode can be RW - read / write, RWF - read write with force , RO - read only + def connect(self, mode = "RW"): + + if self.is_connected(): + self.disconnect() + + # clear this flag self.connected = False - # connect - rc, data = self.comm_link.connect() - if not rc: - return RC_ERR(data) + # connect sync channel + rc = self.comm_link.connect() + if rc.bad(): + return rc + + # connect async channel + rc = self.async_client.connect() + if rc.bad(): + return rc # version - rc, data = self.transmit("get_version") - if not rc: - return RC_ERR(data) + rc = self.transmit("get_version") + if rc.bad(): + return rc - self.server_version = data - self.global_stats.server_version = data + self.server_version = rc.data() + self.global_stats.server_version = rc.data() # cache system info - # self.get_system_info(refresh=True) - rc, data = self.transmit("get_system_info") - if not rc: - return RC_ERR(data) - self.system_info = data + rc = self.transmit("get_system_info") + if rc.bad(): + return rc + + self.system_info = rc.data() # cache supported commands - rc, data = self.transmit("get_supported_cmds") - if not rc: - return RC_ERR(data) + rc = self.transmit("get_supported_cmds") + if rc.bad(): + return rc - self.supported_cmds = data + self.supported_cmds = rc.data() # create ports for port_id in xrange(self.get_port_count()): speed = self.system_info['ports'][port_id]['speed'] driver = self.system_info['ports'][port_id]['driver'] - self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit) - # acquire all ports - rc = self.acquire() - if rc.bad(): - return rc + self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id) + - rc = self.sync_with_server() + # sync the ports + rc = self.sync_ports() if rc.bad(): return rc - self.connected = True + # acquire all ports + if mode == "RW": + rc = self.acquire(force = False) + + # fallback to read only if failed + if rc.bad(): + rc.annotate(show_status = False) + print format_text("Switching to read only mode - only few commands will be available", 'bold') + + self.release(self.get_acquired_ports()) + self.read_only = True + else: + self.read_only = False + + elif mode == "RWF": + rc = self.acquire(force = True) + if rc.bad(): + return rc + self.read_only = False + elif mode == "RO": + # no acquire on read only + rc = RC_OK() + self.read_only = True + + + + self.connected = True return RC_OK() + + def is_read_only (self): + return self.read_only + def is_connected (self): return self.connected and self.comm_link.is_connected def disconnect(self): - self.connected = False + # release any previous acquired ports + if self.is_connected(): + self.release(self.get_acquired_ports()) + self.comm_link.disconnect() + self.async_client.disconnect() + + self.connected = False + return RC_OK() + def on_async_dead (self): + if self.connected: + msg = 'lost connection to server' + self.add_event_log(msg, 'local', True) + self.connected = False + + def on_async_alive (self): + pass ########### cached queries (no server traffic) ########### @@ -666,8 +426,8 @@ class CTRexStatelessClient(object): else: return port_ids - def get_stats_async(self): - return self._async_client.get_stats() + def get_stats_async (self): + return self.async_client.get_stats() def get_connection_port (self): return self.comm_link.port @@ -675,6 +435,9 @@ class CTRexStatelessClient(object): def get_connection_ip (self): return self.comm_link.server + def get_all_ports (self): + return [port_id for port_id, port_obj in self.ports.iteritems()] + def get_acquired_ports(self): return [port_id for port_id, port_obj in self.ports.iteritems() @@ -685,36 +448,60 @@ class CTRexStatelessClient(object): for port_id, port_obj in self.ports.iteritems() if port_obj.is_active()] + def get_paused_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_paused()] + + def get_transmitting_ports (self): + return [port_id + for port_id, port_obj in self.ports.iteritems() + if port_obj.is_transmitting()] + def set_verbose(self, mode): - self.comm_link.set_verbose(mode) + + # on high - enable link verbose + if mode == self.VERBOSE_HIGH: + self.comm_link.set_verbose(True) + else: + self.comm_link.set_verbose(False) + self.verbose = mode + + def check_verbose (self, level): + return (self.verbose >= level) + + def get_verbose (self): + return self.verbose + + def prn_func (self, msg, level = VERBOSE_REGULAR): + if self.check_verbose(level): + print msg + ############# server actions ################ # ping server def ping(self): - rc, info = self.transmit("ping") - return RC(rc, info) + return self.transmit("ping") - def sync_with_server(self, sync_streams=False): - rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) - if not rc: - return RC_ERR(data) - - for port_info in data: - rc = self.ports[port_info['port_id']].sync(port_info) - if rc.bad(): - return rc - - return RC_OK() def get_global_stats(self): - rc, info = self.transmit("get_global_stats") - return RC(rc, info) + return self.transmit("get_global_stats") ########## port commands ############## + def sync_ports (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].sync()) + + return rc + # acquire ports, if port_list is none - get all def acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) @@ -733,7 +520,7 @@ class CTRexStatelessClient(object): rc = RC() for port_id in port_id_list: - rc.add(self.ports[port_id].release(force)) + rc.add(self.ports[port_id].release()) return rc @@ -750,15 +537,16 @@ class CTRexStatelessClient(object): return rc + def add_stream_pack(self, stream_pack_list, port_id_list = None): port_id_list = self.__ports(port_id_list) rc = RC() - for stream_pack in stream_pack_list: - rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)) - + for port_id in port_id_list: + rc.add(self.ports[port_id].add_streams(stream_pack_list)) + return rc @@ -855,6 +643,17 @@ class CTRexStatelessClient(object): return rc + def validate (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].validate()) + + return rc + + def get_port_stats(self, port_id=None): pass @@ -866,16 +665,19 @@ class CTRexStatelessClient(object): return self.comm_link.transmit(method_name, params) + def transmit_batch(self, batch_list): + return self.comm_link.transmit_batch(batch_list) ######################### Console (high level) API ######################### + @timing def cmd_ping(self): rc = self.ping() rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) return rc - def cmd_connect(self): - rc = self.connect() + def cmd_connect(self, mode = "RW"): + rc = self.connect(mode) rc.annotate() return rc @@ -886,13 +688,7 @@ class CTRexStatelessClient(object): # reset def cmd_reset(self): - - - # sync with the server - rc = self.sync_with_server() - rc.annotate("Syncing with the server:") - if rc.bad(): - return rc + #self.release(self.get_acquired_ports()) rc = self.acquire(force = True) rc.annotate("Force acquiring all ports:") @@ -924,7 +720,7 @@ class CTRexStatelessClient(object): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) if not active_ports: - msg = "No active traffic on porvided ports" + msg = "No active traffic on provided ports" print format_text(msg, 'bold') return RC_ERR(msg) @@ -948,15 +744,26 @@ class CTRexStatelessClient(object): rc = self.update_traffic(mult, active_ports) rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - return RC_OK() + return rc + # clear stats def cmd_clear(self, port_id_list): + for port_id in port_id_list: self.ports[port_id].clear_stats() + self.global_stats.clear_stats() + + return RC_OK() + + + def cmd_invalidate (self, port_id_list): + for port_id in port_id_list: + self.ports[port_id].invalidate_stats() + + self.global_stats.invalidate() + return RC_OK() # pause cmd @@ -972,10 +779,8 @@ class CTRexStatelessClient(object): rc = self.pause_traffic(active_ports) rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc + return rc - return RC_OK() # resume cmd @@ -991,14 +796,11 @@ class CTRexStatelessClient(object): rc = self.resume_traffic(active_ports) rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): - return rc - - return RC_OK() + return rc # start cmd - def cmd_start (self, port_id_list, stream_list, mult, force, duration): + def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry): active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) @@ -1020,19 +822,37 @@ class CTRexStatelessClient(object): rc = self.add_stream_pack(stream_list.compiled, port_id_list) - rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list)) + rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list)) if rc.bad(): return rc + # when not on dry - start the traffic , otherwise validate only + if not dry: + rc = self.start_traffic(mult, duration, port_id_list) + rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - # finally, start the traffic - rc = self.start_traffic(mult, duration, port_id_list) - rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) - if rc.bad(): return rc + else: + rc = self.validate(port_id_list) + rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list)) - return RC_OK() + if rc.bad(): + return rc + # show a profile on one port for illustration + self.ports[port_id_list[0]].print_profile(mult, duration) + + return rc + + + # validate port(s) profile + def cmd_validate (self, port_id_list): + rc = self.validate(port_id_list) + rc.annotate("Validating streams on port(s) {0}:".format(port_id_list)) + return rc + + + # stats def cmd_stats(self, port_id_list, stats_mask=set()): stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask) @@ -1043,6 +863,26 @@ class CTRexStatelessClient(object): ############## High Level API With Parser ################ + + def cmd_connect_line (self, line): + '''Connects to the TRex server''' + # define a parser + parser = parsing_opts.gen_parser(self, + "connect", + self.cmd_connect_line.__doc__, + parsing_opts.FORCE) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line parameters") + + if opts.force: + rc = self.cmd_connect(mode = "RWF") + else: + rc = self.cmd_connect(mode = "RW") + + @timing def cmd_start_line (self, line): '''Start selected traffic in specified ports on TRex\n''' # define a parser @@ -1054,13 +894,19 @@ class CTRexStatelessClient(object): parsing_opts.FORCE, parsing_opts.STREAM_FROM_PATH_OR_FILE, parsing_opts.DURATION, - parsing_opts.MULTIPLIER) + parsing_opts.MULTIPLIER_STRICT, + parsing_opts.DRY_RUN) opts = parser.parse_args(line.split()) + if opts is None: return RC_ERR("bad command line parameters") + + if opts.dry: + print format_text("\n*** DRY RUN ***", 'bold') + if opts.db: stream_list = self.streams_db.get_stream_pack(opts.db) rc = RC(stream_list != None) @@ -1070,7 +916,15 @@ class CTRexStatelessClient(object): else: # load streams from file - stream_list = self.streams_db.load_yaml_file(opts.file[0]) + stream_list = None; + try: + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + except Exception as e: + s = str(e) + rc=RC_ERR(s) + rc.annotate() + return rc + rc = RC(stream_list != None) rc.annotate("Load stream pack (from file):") if stream_list == None: @@ -1078,12 +932,13 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear - if opts.total and (mult['type'] != 'percentage'): + if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) - return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry) + @timing def cmd_resume_line (self, line): '''Resume active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1097,6 +952,8 @@ class CTRexStatelessClient(object): return self.cmd_resume(opts.ports) + + @timing def cmd_stop_line (self, line): '''Stop active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1110,6 +967,8 @@ class CTRexStatelessClient(object): return self.cmd_stop(opts.ports) + + @timing def cmd_pause_line (self, line): '''Pause active traffic in specified ports on TRex\n''' parser = parsing_opts.gen_parser(self, @@ -1123,6 +982,8 @@ class CTRexStatelessClient(object): return self.cmd_pause(opts.ports) + + @timing def cmd_update_line (self, line): '''Update port(s) speed currently active\n''' parser = parsing_opts.gen_parser(self, @@ -1139,14 +1000,15 @@ class CTRexStatelessClient(object): # total has no meaning with percentage - its linear if opts.total and (opts.mult['type'] != 'percentage'): # if total was set - divide it between the ports - opts.mult['max'] = opts.mult['max'] / len(opts.ports) + opts.mult['value'] = opts.mult['value'] / len(opts.ports) return self.cmd_update(opts.ports, opts.mult) - + @timing def cmd_reset_line (self, line): return self.cmd_reset() + def cmd_clear_line (self, line): '''Clear cached local statistics\n''' # define a parser @@ -1161,6 +1023,7 @@ class CTRexStatelessClient(object): return RC_ERR("bad command line parameters") return self.cmd_clear(opts.ports) + def cmd_stats_line (self, line): '''Fetch statistics from TRex server by port\n''' # define a parser @@ -1180,30 +1043,34 @@ class CTRexStatelessClient(object): if not mask: # set to show all stats if no filter was given mask = trex_stats.ALL_STATS_OPTS - # get stats objects, as dictionary + stats = self.cmd_stats(opts.ports, mask) + # print stats to screen for stat_type, stat_data in stats.iteritems(): text_tables.print_table_with_header(stat_data.text_table, stat_type) - return - - # if opts.db: - # stream_list = self.streams_db.get_stream_pack(opts.db) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from DB):") - # if rc.bad(): - # return RC_ERR("Failed to load stream pack") - # - # else: - # # load streams from file - # stream_list = self.streams_db.load_yaml_file(opts.file[0]) - # rc = RC(stream_list != None) - # rc.annotate("Load stream pack (from file):") - # if stream_list == None: - # return RC_ERR("Failed to load stream pack") - # - # - # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration) + + + return RC_OK() + + + + @timing + def cmd_validate_line (self, line): + '''validates port(s) stream configuration\n''' + + parser = parsing_opts.gen_parser(self, + "validate", + self.cmd_validate_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + rc = self.cmd_validate(opts.ports) + return rc + def cmd_exit_line (self, line): print format_text("Exiting\n", 'bold') @@ -1280,6 +1147,7 @@ class CTRexStatelessClient(object): return True + ################################# # ------ private methods ------ # @staticmethod @@ -1294,17 +1162,18 @@ class CTRexStatelessClient(object): def _filter_namespace_args(namespace, ok_values): return {k: v for k, v in namespace.__dict__.items() if k in ok_values} + ################################# # ------ private classes ------ # class CCommLink(object): """describes the connectivity of the stateless client method""" - def __init__(self, server="localhost", port=5050, virtual=False): + def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None): super(CTRexStatelessClient.CCommLink, self).__init__() self.virtual = virtual self.server = server self.port = port self.verbose = False - self.rpc_link = JsonRpcClient(self.server, self.port) + self.rpc_link = JsonRpcClient(self.server, self.port, prn_func) @property def is_connected(self): @@ -1313,6 +1182,12 @@ class CTRexStatelessClient(object): else: return True + def get_server (self): + return self.server + + def get_port (self): + return self.port + def set_verbose(self, mode): self.verbose = mode return self.rpc_link.set_verbose(mode) @@ -1354,4 +1229,3 @@ class CTRexStatelessClient(object): if __name__ == "__main__": pass - |