summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_stateless_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_stateless_client.py')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py946
1 files changed, 410 insertions, 536 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 748817da..58fa53c9 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -14,433 +14,40 @@ import json
from common.trex_streams import *
from collections import namedtuple
from common.text_opts import *
-# import trex_stats
from common import trex_stats
from client_utils import parsing_opts, text_tables
import time
import datetime
import re
+import random
+from trex_port import Port
+from common.trex_types import *
from trex_async_client import CTRexAsyncClient
-RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
-class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
- __slots__ = ()
- def __str__(self):
- return "{id:^3} - {msg} ({stat})".format(id=self.id,
- msg=self.msg,
- stat="success" if self.success else "fail")
-
-# simple class to represent complex return value
-class RC():
-
- def __init__ (self, rc = None, data = None):
- self.rc_list = []
-
- if (rc != None) and (data != None):
- tuple_rc = namedtuple('RC', ['rc', 'data'])
- self.rc_list.append(tuple_rc(rc, data))
-
- def add (self, rc):
- self.rc_list += rc.rc_list
-
- def good (self):
- return all([x.rc for x in self.rc_list])
-
- def bad (self):
- return not self.good()
-
- def data (self):
- return all([x.data if x.rc else "" for x in self.rc_list])
-
- def err (self):
- return all([x.data if not x.rc else "" for x in self.rc_list])
-
- def annotate (self, desc = None):
- if desc:
- print format_text('\n{:<40}'.format(desc), 'bold'),
-
- if self.bad():
- # print all the errors
- print ""
- for x in self.rc_list:
- if not x.rc:
- print format_text("\n{0}".format(x.data), 'bold')
-
- print ""
- print format_text("[FAILED]\n", 'red', 'bold')
-
-
- else:
- print format_text("[SUCCESS]\n", 'green', 'bold')
-
-
-def RC_OK():
- return RC(True, "")
-def RC_ERR(err):
- return RC(False, err)
-
-
-LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
-
-# describes a stream DB
-class CStreamsDB(object):
-
- def __init__(self):
- self.stream_packs = {}
-
- def load_yaml_file(self, filename):
-
- stream_pack_name = filename
- if stream_pack_name in self.get_loaded_streams_names():
- self.remove_stream_packs(stream_pack_name)
-
- stream_list = CStreamList()
- loaded_obj = stream_list.load_yaml(filename)
-
- try:
- compiled_streams = stream_list.compile_streams()
- rc = self.load_streams(stream_pack_name,
- LoadedStreamList(loaded_obj,
- [StreamPack(v.stream_id, v.stream.dump())
- for k, v in compiled_streams.items()]))
-
- except Exception as e:
- return None
-
- return self.get_stream_pack(stream_pack_name)
-
- def load_streams(self, name, LoadedStreamList_obj):
- if name in self.stream_packs:
- return False
- else:
- self.stream_packs[name] = LoadedStreamList_obj
- return True
-
- def remove_stream_packs(self, *names):
- removed_streams = []
- for name in names:
- removed = self.stream_packs.pop(name)
- if removed:
- removed_streams.append(name)
- return removed_streams
-
- def clear(self):
- self.stream_packs.clear()
-
- def get_loaded_streams_names(self):
- return self.stream_packs.keys()
-
- def stream_pack_exists (self, name):
- return name in self.get_loaded_streams_names()
-
- def get_stream_pack(self, name):
- if not self.stream_pack_exists(name):
- return None
- else:
- return self.stream_packs.get(name)
-
-
-# describes a single port
-class Port(object):
- STATE_DOWN = 0
- STATE_IDLE = 1
- STATE_STREAMS = 2
- STATE_TX = 3
- STATE_PAUSE = 4
- PortState = namedtuple('PortState', ['state_id', 'state_name'])
- STATES_MAP = {STATE_DOWN: "DOWN",
- STATE_IDLE: "IDLE",
- STATE_STREAMS: "STREAMS",
- STATE_TX: "ACTIVE",
- STATE_PAUSE: "PAUSE"}
-
-
- def __init__ (self, port_id, speed, driver, user, transmit):
- self.port_id = port_id
- self.state = self.STATE_IDLE
- self.handler = None
- self.transmit = transmit
- self.user = user
- self.driver = driver
- self.speed = speed
- self.streams = {}
- self.port_stats = trex_stats.CPortStats(self)
-
- def err(self, msg):
- return RC_ERR("port {0} : {1}".format(self.port_id, msg))
-
- def ok(self):
- return RC_OK()
-
- def get_speed_bps (self):
- return (self.speed * 1000 * 1000 * 1000)
-
- # take the port
- def acquire(self, force = False):
- params = {"port_id": self.port_id,
- "user": self.user,
- "force": force}
-
- command = RpcCmdData("acquire", params)
- rc = self.transmit(command.method, command.params)
- if rc.success:
- self.handler = rc.data
- return self.ok()
- else:
- return self.err(rc.data)
-
- # release the port
- def release(self):
- params = {"port_id": self.port_id,
- "handler": self.handler}
-
- command = RpcCmdData("release", params)
- rc = self.transmit(command.method, command.params)
- if rc.success:
- self.handler = rc.data
- return self.ok()
- else:
- return self.err(rc.data)
-
- def is_acquired(self):
- return (self.handler != None)
-
- def is_active(self):
- return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
-
- def sync(self, sync_data):
- self.handler = sync_data['handler']
- port_state = sync_data['state'].upper()
- if port_state == "DOWN":
- self.state = self.STATE_DOWN
- elif port_state == "IDLE":
- self.state = self.STATE_IDLE
- elif port_state == "STREAMS":
- self.state = self.STATE_STREAMS
- elif port_state == "TX":
- self.state = self.STATE_TX
- elif port_state == "PAUSE":
- self.state = self.STATE_PAUSE
- else:
- raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
-
- return self.ok()
-
-
- # return TRUE if write commands
- def is_port_writable (self):
- # operations on port can be done on state idle or state streams
- return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
-
- # add stream to the port
- def add_stream (self, stream_id, stream_obj):
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to add streams")
-
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id,
- "stream": stream_obj}
-
- rc, data = self.transmit("add_stream", params)
- if not rc:
- r = self.err(data)
- print r.good()
-
- # add the stream
- self.streams[stream_id] = stream_obj
-
- # the only valid state now
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- # remove stream from port
- def remove_stream (self, stream_id):
-
- if not stream_id in self.streams:
- return self.err("stream {0} does not exists".format(stream_id))
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id}
-
-
- rc, data = self.transmit("remove_stream", params)
- if not rc:
- return self.err(data)
-
- self.streams[stream_id] = None
-
- return self.ok()
-
- # remove all the streams
- def remove_all_streams (self):
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
+class CTRexStatelessClient(object):
+ """docstring for CTRexStatelessClient"""
- rc, data = self.transmit("remove_all_streams", params)
- if not rc:
- return self.err(data)
+ # verbose levels
+ VERBOSE_QUIET = 0
+ VERBOSE_REGULAR = 1
+ VERBOSE_HIGH = 2
+
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, quiet = False, virtual = False):
+ super(CTRexStatelessClient, self).__init__()
- self.streams = {}
+ self.user = username
- return self.ok()
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func)
- # get a specific stream
- def get_stream (self, stream_id):
- if stream_id in self.streams:
- return self.streams[stream_id]
+ # default verbose level
+ if not quiet:
+ self.verbose = self.VERBOSE_REGULAR
else:
- return None
-
- def get_all_streams (self):
- return self.streams
-
-
- def process_mul (self, mul):
- # if percentage - translate
- if mul['type'] == 'percentage':
- mul['type'] = 'max_bps'
- mul['max'] = self.get_speed_bps() * (mul['max'] / 100)
-
-
- # start traffic
- def start (self, mul, duration):
- if self.state == self.STATE_DOWN:
- return self.err("Unable to start traffic - port is down")
-
- if self.state == self.STATE_IDLE:
- return self.err("Unable to start traffic - no streams attached to port")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to start traffic - port is already transmitting")
-
- self.process_mul(mul)
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul,
- "duration": duration}
-
- rc, data = self.transmit("start_traffic", params)
- if not rc:
- return self.err(data)
-
- self.state = self.STATE_TX
-
- return self.ok()
-
- # stop traffic
- # with force ignores the cached state and sends the command
- def stop (self, force = False):
-
- if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("stop_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- def pause (self):
-
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
+ self.verbose = self.VERBOSE_QUIET
- rc, data = self.transmit("pause_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_PAUSE
-
- return self.ok()
-
-
- def resume (self):
-
- if (self.state != self.STATE_PAUSE) :
- return self.err("port is not in pause mode")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("resume_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_TX
-
- return self.ok()
-
-
- def update (self, mul):
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- self.process_mul(mul)
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul}
-
- rc, data = self.transmit("update_traffic", params)
- if not rc:
- return self.err(data)
-
- return self.ok()
-
- def get_port_state_name(self):
- return self.STATES_MAP.get(self.state, "Unknown")
-
- ################# stats handler ######################
- def generate_port_stats(self):
- return self.port_stats.generate_stats()
- pass
-
- def generate_port_status(self):
- return {"port-type": self.driver,
- "maximum": "{speed} Gb/s".format(speed=self.speed),
- "port-status": self.get_port_state_name()
- }
-
- def clear_stats(self):
- return self.port_stats.clear_stats()
-
- ################# events handler ######################
- def async_event_port_stopped (self):
- self.state = self.STATE_STREAMS
-
-
-class CTRexStatelessClient(object):
- """docstring for CTRexStatelessClient"""
-
- def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
- super(CTRexStatelessClient, self).__init__()
- self.user = username
- self.system_info = None
- self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
- self.verbose = False
self.ports = {}
- # self._conn_handler = {}
- # self._active_ports = set()
self._connection_info = {"server": server,
"sync_port": sync_port,
"async_port": async_port}
@@ -448,7 +55,7 @@ class CTRexStatelessClient(object):
self.server_version = {}
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func)
self.streams_db = CStreamsDB()
self.global_stats = trex_stats.CGlobalStats(self._connection_info,
@@ -457,12 +64,44 @@ class CTRexStatelessClient(object):
self.stats_generator = trex_stats.CTRexStatsGenerator(self.global_stats,
self.ports)
+ self.events = []
+
+ self.session_id = random.getrandbits(32)
+ self.read_only = False
self.connected = False
- self.events = []
+
+
+ # returns the port object
+ def get_port (self, port_id):
+ return self.ports.get(port_id, None)
+
+
+ # connection server ip
+ def get_server_ip (self):
+ return self.comm_link.get_server()
+
+ # connection server port
+ def get_server_port (self):
+ return self.comm_link.get_port()
+
################# events handler ######################
-
+ def add_event_log (self, msg, ev_type, show = False):
+
+ if ev_type == "server":
+ prefix = "[server]"
+ elif ev_type == "local":
+ prefix = "[local]"
+
+ ts = time.time()
+ st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
+ self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
+
+ if show:
+ self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))))
+
+
def handle_async_stats_update(self, dump_data):
global_stats = {}
port_stats = {}
@@ -490,59 +129,108 @@ class CTRexStatelessClient(object):
for port_id, data in port_stats.iteritems():
self.ports[port_id].port_stats.update(data)
+
+
def handle_async_event (self, type, data):
# DP stopped
- ev = "[event] - "
-
show_event = False
# port started
if (type == 0):
port_id = int(data['port_id'])
- ev += "Port {0} has started".format(port_id)
+ ev = "Port {0} has started".format(port_id)
+ self.async_event_port_started(port_id)
# port stopped
elif (type == 1):
port_id = int(data['port_id'])
- ev += "Port {0} has stopped".format(port_id)
+ ev = "Port {0} has stopped".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
- # server stopped
+ # port paused
elif (type == 2):
- ev += "Server has stopped"
- self.async_event_server_stopped()
- show_event = True
+ port_id = int(data['port_id'])
+ ev = "Port {0} has paused".format(port_id)
- # port finished traffic
+ # call the handler
+ self.async_event_port_paused(port_id)
+
+ # port resumed
elif (type == 3):
port_id = int(data['port_id'])
- ev += "Port {0} job done".format(port_id)
+ ev = "Port {0} has resumed".format(port_id)
+
+ # call the handler
+ self.async_event_port_resumed(port_id)
+
+ # port finished traffic
+ elif (type == 4):
+ port_id = int(data['port_id'])
+ ev = "Port {0} job done".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
show_event = True
+ # port was stolen...
+ elif (type == 5):
+ session_id = data['session_id']
+
+ # false alarm, its us
+ if session_id == self.session_id:
+ return
+
+ port_id = int(data['port_id'])
+ who = data['who']
+
+ ev = "Port {0} was forcely taken by '{1}'".format(port_id, who)
+
+ # call the handler
+ self.async_event_port_forced_acquired(port_id)
+ show_event = True
+
+ # server stopped
+ elif (type == 100):
+ ev = "Server has stopped"
+ self.async_event_server_stopped()
+ show_event = True
+
+
else:
# unknown event - ignore
return
- if show_event:
- print format_text("\n" + ev, 'bold')
- ts = time.time()
- st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
- self.events.append("{0} - ".format(st) + format_text(ev, 'bold'))
+ self.add_event_log(ev, 'server', show_event)
def async_event_port_stopped (self, port_id):
self.ports[port_id].async_event_port_stopped()
+
+ def async_event_port_started (self, port_id):
+ self.ports[port_id].async_event_port_started()
+
+
+ def async_event_port_paused (self, port_id):
+ self.ports[port_id].async_event_port_paused()
+
+
+ def async_event_port_resumed (self, port_id):
+ self.ports[port_id].async_event_port_resumed()
+
+
+ def async_event_port_forced_acquired (self, port_id):
+ self.ports[port_id].async_event_forced_acquired()
+ self.read_only = True
+
def async_event_server_stopped (self):
- self.disconnect()
+ self.connected = False
+
def get_events (self):
return self.events
@@ -552,6 +240,24 @@ class CTRexStatelessClient(object):
############# helper functions section ##############
+ # measure time for functions
+ def timing(f):
+ def wrap(*args):
+ time1 = time.time()
+ ret = f(*args)
+
+ # don't want to print on error
+ if ret.bad():
+ return ret
+
+ delta = time.time() - time1
+ print format_time(delta) + "\n"
+
+ return ret
+
+ return wrap
+
+
def validate_port_list(self, port_id_list):
if not isinstance(port_id_list, list):
print type(port_id_list)
@@ -584,66 +290,120 @@ class CTRexStatelessClient(object):
############ boot up section ################
# connection sequence
- def connect(self):
+ # mode can be RW - read / write, RWF - read write with force , RO - read only
+ def connect(self, mode = "RW"):
+
+ if self.is_connected():
+ self.disconnect()
+
+ # clear this flag
self.connected = False
- # connect
- rc, data = self.comm_link.connect()
- if not rc:
- return RC_ERR(data)
+ # connect sync channel
+ rc = self.comm_link.connect()
+ if rc.bad():
+ return rc
+
+ # connect async channel
+ rc = self.async_client.connect()
+ if rc.bad():
+ return rc
# version
- rc, data = self.transmit("get_version")
- if not rc:
- return RC_ERR(data)
+ rc = self.transmit("get_version")
+ if rc.bad():
+ return rc
- self.server_version = data
- self.global_stats.server_version = data
+ self.server_version = rc.data()
+ self.global_stats.server_version = rc.data()
# cache system info
- # self.get_system_info(refresh=True)
- rc, data = self.transmit("get_system_info")
- if not rc:
- return RC_ERR(data)
- self.system_info = data
+ rc = self.transmit("get_system_info")
+ if rc.bad():
+ return rc
+
+ self.system_info = rc.data()
# cache supported commands
- rc, data = self.transmit("get_supported_cmds")
- if not rc:
- return RC_ERR(data)
+ rc = self.transmit("get_supported_cmds")
+ if rc.bad():
+ return rc
- self.supported_cmds = data
+ self.supported_cmds = rc.data()
# create ports
for port_id in xrange(self.get_port_count()):
speed = self.system_info['ports'][port_id]['speed']
driver = self.system_info['ports'][port_id]['driver']
- self.ports[port_id] = Port(port_id, speed, driver, self.user, self.transmit)
- # acquire all ports
- rc = self.acquire()
- if rc.bad():
- return rc
+ self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link, self.session_id)
+
- rc = self.sync_with_server()
+ # sync the ports
+ rc = self.sync_ports()
if rc.bad():
return rc
- self.connected = True
+ # acquire all ports
+ if mode == "RW":
+ rc = self.acquire(force = False)
+
+ # fallback to read only if failed
+ if rc.bad():
+ rc.annotate(show_status = False)
+ print format_text("Switching to read only mode - only few commands will be available", 'bold')
+
+ self.release(self.get_acquired_ports())
+ self.read_only = True
+ else:
+ self.read_only = False
+
+ elif mode == "RWF":
+ rc = self.acquire(force = True)
+ if rc.bad():
+ return rc
+ self.read_only = False
+ elif mode == "RO":
+ # no acquire on read only
+ rc = RC_OK()
+ self.read_only = True
+
+
+
+ self.connected = True
return RC_OK()
+
+ def is_read_only (self):
+ return self.read_only
+
def is_connected (self):
return self.connected and self.comm_link.is_connected
def disconnect(self):
- self.connected = False
+ # release any previous acquired ports
+ if self.is_connected():
+ self.release(self.get_acquired_ports())
+
self.comm_link.disconnect()
+ self.async_client.disconnect()
+
+ self.connected = False
+
return RC_OK()
+ def on_async_dead (self):
+ if self.connected:
+ msg = 'lost connection to server'
+ self.add_event_log(msg, 'local', True)
+ self.connected = False
+
+ def on_async_alive (self):
+ pass
########### cached queries (no server traffic) ###########
@@ -666,8 +426,8 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def get_stats_async(self):
- return self._async_client.get_stats()
+ def get_stats_async (self):
+ return self.async_client.get_stats()
def get_connection_port (self):
return self.comm_link.port
@@ -675,6 +435,9 @@ class CTRexStatelessClient(object):
def get_connection_ip (self):
return self.comm_link.server
+ def get_all_ports (self):
+ return [port_id for port_id, port_obj in self.ports.iteritems()]
+
def get_acquired_ports(self):
return [port_id
for port_id, port_obj in self.ports.iteritems()
@@ -685,36 +448,60 @@ class CTRexStatelessClient(object):
for port_id, port_obj in self.ports.iteritems()
if port_obj.is_active()]
+ def get_paused_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_paused()]
+
+ def get_transmitting_ports (self):
+ return [port_id
+ for port_id, port_obj in self.ports.iteritems()
+ if port_obj.is_transmitting()]
+
def set_verbose(self, mode):
- self.comm_link.set_verbose(mode)
+
+ # on high - enable link verbose
+ if mode == self.VERBOSE_HIGH:
+ self.comm_link.set_verbose(True)
+ else:
+ self.comm_link.set_verbose(False)
+
self.verbose = mode
+
+ def check_verbose (self, level):
+ return (self.verbose >= level)
+
+ def get_verbose (self):
+ return self.verbose
+
+ def prn_func (self, msg, level = VERBOSE_REGULAR):
+ if self.check_verbose(level):
+ print msg
+
############# server actions ################
# ping server
def ping(self):
- rc, info = self.transmit("ping")
- return RC(rc, info)
+ return self.transmit("ping")
- def sync_with_server(self, sync_streams=False):
- rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
- if not rc:
- return RC_ERR(data)
-
- for port_info in data:
- rc = self.ports[port_info['port_id']].sync(port_info)
- if rc.bad():
- return rc
-
- return RC_OK()
def get_global_stats(self):
- rc, info = self.transmit("get_global_stats")
- return RC(rc, info)
+ return self.transmit("get_global_stats")
########## port commands ##############
+ def sync_ports (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].sync())
+
+ return rc
+
# acquire ports, if port_list is none - get all
def acquire (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
@@ -733,7 +520,7 @@ class CTRexStatelessClient(object):
rc = RC()
for port_id in port_id_list:
- rc.add(self.ports[port_id].release(force))
+ rc.add(self.ports[port_id].release())
return rc
@@ -750,15 +537,16 @@ class CTRexStatelessClient(object):
return rc
+
def add_stream_pack(self, stream_pack_list, port_id_list = None):
port_id_list = self.__ports(port_id_list)
rc = RC()
- for stream_pack in stream_pack_list:
- rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list))
-
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].add_streams(stream_pack_list))
+
return rc
@@ -855,6 +643,17 @@ class CTRexStatelessClient(object):
return rc
+ def validate (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].validate())
+
+ return rc
+
+
def get_port_stats(self, port_id=None):
pass
@@ -866,16 +665,19 @@ class CTRexStatelessClient(object):
return self.comm_link.transmit(method_name, params)
+ def transmit_batch(self, batch_list):
+ return self.comm_link.transmit_batch(batch_list)
######################### Console (high level) API #########################
+ @timing
def cmd_ping(self):
rc = self.ping()
rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
return rc
- def cmd_connect(self):
- rc = self.connect()
+ def cmd_connect(self, mode = "RW"):
+ rc = self.connect(mode)
rc.annotate()
return rc
@@ -886,13 +688,7 @@ class CTRexStatelessClient(object):
# reset
def cmd_reset(self):
-
-
- # sync with the server
- rc = self.sync_with_server()
- rc.annotate("Syncing with the server:")
- if rc.bad():
- return rc
+ #self.release(self.get_acquired_ports())
rc = self.acquire(force = True)
rc.annotate("Force acquiring all ports:")
@@ -924,7 +720,7 @@ class CTRexStatelessClient(object):
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
if not active_ports:
- msg = "No active traffic on porvided ports"
+ msg = "No active traffic on provided ports"
print format_text(msg, 'bold')
return RC_ERR(msg)
@@ -948,15 +744,26 @@ class CTRexStatelessClient(object):
rc = self.update_traffic(mult, active_ports)
rc.annotate("Updating traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
- return RC_OK()
+ return rc
+ # clear stats
def cmd_clear(self, port_id_list):
+
for port_id in port_id_list:
self.ports[port_id].clear_stats()
+
self.global_stats.clear_stats()
+
+ return RC_OK()
+
+
+ def cmd_invalidate (self, port_id_list):
+ for port_id in port_id_list:
+ self.ports[port_id].invalidate_stats()
+
+ self.global_stats.invalidate()
+
return RC_OK()
# pause cmd
@@ -972,10 +779,8 @@ class CTRexStatelessClient(object):
rc = self.pause_traffic(active_ports)
rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
+ return rc
- return RC_OK()
# resume cmd
@@ -991,14 +796,11 @@ class CTRexStatelessClient(object):
rc = self.resume_traffic(active_ports)
rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
- return rc
-
- return RC_OK()
+ return rc
# start cmd
- def cmd_start (self, port_id_list, stream_list, mult, force, duration):
+ def cmd_start (self, port_id_list, stream_list, mult, force, duration, dry):
active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
@@ -1020,19 +822,37 @@ class CTRexStatelessClient(object):
rc = self.add_stream_pack(stream_list.compiled, port_id_list)
- rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list))
+ rc.annotate("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
if rc.bad():
return rc
+ # when not on dry - start the traffic , otherwise validate only
+ if not dry:
+ rc = self.start_traffic(mult, duration, port_id_list)
+ rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
- # finally, start the traffic
- rc = self.start_traffic(mult, duration, port_id_list)
- rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list))
- if rc.bad():
return rc
+ else:
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating traffic profile on port(s) {0}:".format(port_id_list))
- return RC_OK()
+ if rc.bad():
+ return rc
+ # show a profile on one port for illustration
+ self.ports[port_id_list[0]].print_profile(mult, duration)
+
+ return rc
+
+
+ # validate port(s) profile
+ def cmd_validate (self, port_id_list):
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
+ return rc
+
+
+ # stats
def cmd_stats(self, port_id_list, stats_mask=set()):
stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask)
@@ -1043,6 +863,26 @@ class CTRexStatelessClient(object):
############## High Level API With Parser ################
+
+ def cmd_connect_line (self, line):
+ '''Connects to the TRex server'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "connect",
+ self.cmd_connect_line.__doc__,
+ parsing_opts.FORCE)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ if opts.force:
+ rc = self.cmd_connect(mode = "RWF")
+ else:
+ rc = self.cmd_connect(mode = "RW")
+
+ @timing
def cmd_start_line (self, line):
'''Start selected traffic in specified ports on TRex\n'''
# define a parser
@@ -1054,13 +894,19 @@ class CTRexStatelessClient(object):
parsing_opts.FORCE,
parsing_opts.STREAM_FROM_PATH_OR_FILE,
parsing_opts.DURATION,
- parsing_opts.MULTIPLIER)
+ parsing_opts.MULTIPLIER_STRICT,
+ parsing_opts.DRY_RUN)
opts = parser.parse_args(line.split())
+
if opts is None:
return RC_ERR("bad command line parameters")
+
+ if opts.dry:
+ print format_text("\n*** DRY RUN ***", 'bold')
+
if opts.db:
stream_list = self.streams_db.get_stream_pack(opts.db)
rc = RC(stream_list != None)
@@ -1070,7 +916,15 @@ class CTRexStatelessClient(object):
else:
# load streams from file
- stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ stream_list = None;
+ try:
+ stream_list = self.streams_db.load_yaml_file(opts.file[0])
+ except Exception as e:
+ s = str(e)
+ rc=RC_ERR(s)
+ rc.annotate()
+ return rc
+
rc = RC(stream_list != None)
rc.annotate("Load stream pack (from file):")
if stream_list == None:
@@ -1078,12 +932,13 @@ class CTRexStatelessClient(object):
# total has no meaning with percentage - its linear
- if opts.total and (mult['type'] != 'percentage'):
+ if opts.total and (opts.mult['type'] != 'percentage'):
# if total was set - divide it between the ports
- opts.mult['max'] = opts.mult['max'] / len(opts.ports)
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
- return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+ return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration, opts.dry)
+ @timing
def cmd_resume_line (self, line):
'''Resume active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1097,6 +952,8 @@ class CTRexStatelessClient(object):
return self.cmd_resume(opts.ports)
+
+ @timing
def cmd_stop_line (self, line):
'''Stop active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1110,6 +967,8 @@ class CTRexStatelessClient(object):
return self.cmd_stop(opts.ports)
+
+ @timing
def cmd_pause_line (self, line):
'''Pause active traffic in specified ports on TRex\n'''
parser = parsing_opts.gen_parser(self,
@@ -1123,6 +982,8 @@ class CTRexStatelessClient(object):
return self.cmd_pause(opts.ports)
+
+ @timing
def cmd_update_line (self, line):
'''Update port(s) speed currently active\n'''
parser = parsing_opts.gen_parser(self,
@@ -1139,14 +1000,15 @@ class CTRexStatelessClient(object):
# total has no meaning with percentage - its linear
if opts.total and (opts.mult['type'] != 'percentage'):
# if total was set - divide it between the ports
- opts.mult['max'] = opts.mult['max'] / len(opts.ports)
+ opts.mult['value'] = opts.mult['value'] / len(opts.ports)
return self.cmd_update(opts.ports, opts.mult)
-
+ @timing
def cmd_reset_line (self, line):
return self.cmd_reset()
+
def cmd_clear_line (self, line):
'''Clear cached local statistics\n'''
# define a parser
@@ -1161,6 +1023,7 @@ class CTRexStatelessClient(object):
return RC_ERR("bad command line parameters")
return self.cmd_clear(opts.ports)
+
def cmd_stats_line (self, line):
'''Fetch statistics from TRex server by port\n'''
# define a parser
@@ -1180,30 +1043,34 @@ class CTRexStatelessClient(object):
if not mask:
# set to show all stats if no filter was given
mask = trex_stats.ALL_STATS_OPTS
- # get stats objects, as dictionary
+
stats = self.cmd_stats(opts.ports, mask)
+
# print stats to screen
for stat_type, stat_data in stats.iteritems():
text_tables.print_table_with_header(stat_data.text_table, stat_type)
- return
-
- # if opts.db:
- # stream_list = self.streams_db.get_stream_pack(opts.db)
- # rc = RC(stream_list != None)
- # rc.annotate("Load stream pack (from DB):")
- # if rc.bad():
- # return RC_ERR("Failed to load stream pack")
- #
- # else:
- # # load streams from file
- # stream_list = self.streams_db.load_yaml_file(opts.file[0])
- # rc = RC(stream_list != None)
- # rc.annotate("Load stream pack (from file):")
- # if stream_list == None:
- # return RC_ERR("Failed to load stream pack")
- #
- #
- # return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force, opts.duration)
+
+
+ return RC_OK()
+
+
+
+ @timing
+ def cmd_validate_line (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "validate",
+ self.cmd_validate_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ rc = self.cmd_validate(opts.ports)
+ return rc
+
def cmd_exit_line (self, line):
print format_text("Exiting\n", 'bold')
@@ -1280,6 +1147,7 @@ class CTRexStatelessClient(object):
return True
+
#################################
# ------ private methods ------ #
@staticmethod
@@ -1294,17 +1162,18 @@ class CTRexStatelessClient(object):
def _filter_namespace_args(namespace, ok_values):
return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
+
#################################
# ------ private classes ------ #
class CCommLink(object):
"""describes the connectivity of the stateless client method"""
- def __init__(self, server="localhost", port=5050, virtual=False):
+ def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None):
super(CTRexStatelessClient.CCommLink, self).__init__()
self.virtual = virtual
self.server = server
self.port = port
self.verbose = False
- self.rpc_link = JsonRpcClient(self.server, self.port)
+ self.rpc_link = JsonRpcClient(self.server, self.port, prn_func)
@property
def is_connected(self):
@@ -1313,6 +1182,12 @@ class CTRexStatelessClient(object):
else:
return True
+ def get_server (self):
+ return self.server
+
+ def get_port (self):
+ return self.port
+
def set_verbose(self, mode):
self.verbose = mode
return self.rpc_link.set_verbose(mode)
@@ -1354,4 +1229,3 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
-