path: root/scripts/automation/trex_control_plane/common
diff options
authorYaroslav Brustinov <>2015-12-13 17:18:02 +0200
committerYaroslav Brustinov <>2015-12-13 17:18:02 +0200
commit9738e267d806223ee25e013b5959ccac26c1a14a (patch)
tree590c8f329f2ab68c7da3f1f8f4c55f81243a08bc /scripts/automation/trex_control_plane/common
parenta573adc6395c9ad8d96978508a07a654ef48c7a9 (diff)
parent301341ddb1bf17387d7fea19667bedd40fce4509 (diff)
Merge branch 'master' into get_logs_and_version
Diffstat (limited to 'scripts/automation/trex_control_plane/common')
4 files changed, 563 insertions, 46 deletions
diff --git a/scripts/automation/trex_control_plane/common/ b/scripts/automation/trex_control_plane/common/
new file mode 100755
index 00000000..5a86149c
--- /dev/null
+++ b/scripts/automation/trex_control_plane/common/
@@ -0,0 +1,144 @@
+import json
+import re
+TEXT_CODES = {'bold': {'start': '\x1b[1m',
+ 'end': '\x1b[22m'},
+ 'cyan': {'start': '\x1b[36m',
+ 'end': '\x1b[39m'},
+ 'blue': {'start': '\x1b[34m',
+ 'end': '\x1b[39m'},
+ 'red': {'start': '\x1b[31m',
+ 'end': '\x1b[39m'},
+ 'magenta': {'start': '\x1b[35m',
+ 'end': '\x1b[39m'},
+ 'green': {'start': '\x1b[32m',
+ 'end': '\x1b[39m'},
+ 'yellow': {'start': '\x1b[33m',
+ 'end': '\x1b[39m'},
+ 'underline': {'start': '\x1b[4m',
+ 'end': '\x1b[24m'}}
+def format_num (size, suffix = ""):
+ for unit in ['','K','M','G','T','P']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
+ return "NaN"
+def format_time (t_sec):
+ if t_sec < 0:
+ return "infinite"
+ if t_sec < 1:
+ # low numbers
+ for unit in ['ms', 'usec', 'ns']:
+ t_sec *= 1000.0
+ if t_sec >= 1.0:
+ return '{:,.2f} [{:}]'.format(t_sec, unit)
+ return "NaN"
+ else:
+ # seconds
+ if t_sec < 60.0:
+ return '{:,.2f} [{:}]'.format(t_sec, 'sec')
+ # minutes
+ t_sec /= 60.0
+ if t_sec < 60.0:
+ return '{:,.2f} [{:}]'.format(t_sec, 'minutes')
+ # hours
+ t_sec /= 60.0
+ if t_sec < 24.0:
+ return '{:,.2f} [{:}]'.format(t_sec, 'hours')
+ # days
+ t_sec /= 24.0
+ return '{:,.2f} [{:}]'.format(t_sec, 'days')
+def format_percentage (size):
+ return "%0.2f %%" % (size)
+def bold(text):
+ return text_attribute(text, 'bold')
+def cyan(text):
+ return text_attribute(text, 'cyan')
+def blue(text):
+ return text_attribute(text, 'blue')
+def red(text):
+ return text_attribute(text, 'red')
+def magenta(text):
+ return text_attribute(text, 'magenta')
+def green(text):
+ return text_attribute(text, 'green')
+def yellow(text):
+ return text_attribute(text, 'yellow')
+def underline(text):
+ return text_attribute(text, 'underline')
+def text_attribute(text, attribute):
+ return "{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'],
+ txt=text,
+ stop=TEXT_CODES[attribute]['end'])
+FUNC_DICT = {'blue': blue,
+ 'bold': bold,
+ 'green': green,
+ 'yellow': yellow,
+ 'cyan': cyan,
+ 'magenta': magenta,
+ 'underline': underline,
+ 'red': red}
+def format_text(text, *args):
+ return_string = text
+ for i in args:
+ func = FUNC_DICT.get(i)
+ if func:
+ return_string = func(return_string)
+ return return_string
+# pretty print for JSON
+def pretty_json (json_str, use_colors = True):
+ pretty_str = json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True)
+ if not use_colors:
+ return pretty_str
+ try:
+ # int numbers
+ pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[1-9][0-9]*[^.])',r'\1{0}'.format(blue(r'\2')), pretty_str)
+ # float
+ pretty_str = re.sub(r'([ ]*:[ ]+)(\-?[1-9][0-9]*\.[0-9]+)',r'\1{0}'.format(magenta(r'\2')), pretty_str)
+ # # strings
+ #
+ pretty_str = re.sub(r'([ ]*:[ ]+)("[^"]*")',r'\1{0}'.format(red(r'\2')), pretty_str)
+ pretty_str = re.sub(r"('[^']*')", r'{0}\1{1}'.format(TEXT_CODES['magenta']['start'],
+ TEXT_CODES['red']['start']), pretty_str)
+ except :
+ pass
+ return pretty_str
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/trex_control_plane/common/ b/scripts/automation/trex_control_plane/common/
index b7e768c1..2f6ea38d 100755
--- a/scripts/automation/trex_control_plane/common/
+++ b/scripts/automation/trex_control_plane/common/
@@ -1,59 +1,299 @@
+from collections import namedtuple, OrderedDict
+from client_utils import text_tables
+from common.text_opts import format_text
+from client.trex_async_client import CTRexAsyncStats
import copy
+import datetime
+import time
+import re
-class CTRexStatsManager(object):
+ExportableStats = namedtuple('ExportableStats', ['raw_data', 'text_table'])
- def __init__(self, *args):
- for stat_type in args:
- # register stat handler for each stats type
- setattr(self, stat_type, CTRexStatsManager.CSingleStatsHandler())
- def __getitem__(self, item):
- stats_obj = getattr(self, item)
- if stats_obj:
- return stats_obj.get_stats()
+class CTRexStatsGenerator(object):
+ """
+ This object is responsible of generating stats from objects maintained at
+ CTRexStatelessClient and the ports.
+ """
+ def __init__(self, global_stats_ref, ports_dict_ref):
+ self._global_stats = global_stats_ref
+ self._ports_dict = ports_dict_ref
+ def generate_single_statistic(self, port_id_list, statistic_type):
+ if statistic_type == GLOBAL_STATS:
+ return self._generate_global_stats()
+ elif statistic_type == PORT_STATS:
+ return self._generate_port_stats(port_id_list)
+ pass
+ elif statistic_type == PORT_STATUS:
+ return self._generate_port_status(port_id_list)
- return None
+ # ignore by returning empty object
+ return {}
- class CSingleStatsHandler(object):
+ def _generate_global_stats(self):
+ # stats_obj = self._async_stats.get_general_stats()
+ stats_data = self._global_stats.generate_stats()
- def __init__(self):
- self._stats = {}
+ # build table representation
+ stats_table = text_tables.TRexTextInfo()
+ stats_table.set_cols_align(["l", "l"])
+ stats_table.add_rows([[k.replace("_", " ").title(), v]
+ for k, v in stats_data.iteritems()],
+ header=False)
+ return {"global_statistics": ExportableStats(stats_data, stats_table)}
+ def _generate_port_stats(self, port_id_list):
+ relevant_ports = self.__get_relevant_ports(port_id_list)
+ return_stats_data = {}
+ per_field_stats = OrderedDict([("owner", []),
+ ("state", []),
+ ("--", []),
+ ("opackets", []),
+ ("obytes", []),
+ ("ipackets", []),
+ ("ibytes", []),
+ ("ierrors", []),
+ ("oerrors", []),
+ ("tx-bytes", []),
+ ("rx-bytes", []),
+ ("tx-pkts", []),
+ ("rx-pkts", []),
+ ("---", []),
+ ("Tx bps", []),
+ ("Rx bps", []),
+ ("----", []),
+ ("Tx pps", []),
+ ("Rx pps", [])
+ ]
+ )
+ for port_obj in relevant_ports:
+ # fetch port data
+ port_stats = port_obj.generate_port_stats()
+ # populate to data structures
+ return_stats_data[port_obj.port_id] = port_stats
+ self.__update_per_field_dict(port_stats, per_field_stats)
+ stats_table = text_tables.TRexTextTable()
+ stats_table.set_cols_align(["l"] + ["r"]*len(relevant_ports))
+ stats_table.set_cols_width([10] + [20] * len(relevant_ports))
+ stats_table.set_cols_dtype(['t'] + ['t'] * len(relevant_ports))
+ stats_table.add_rows([[k] + v
+ for k, v in per_field_stats.iteritems()],
+ header=False)
+ stats_table.header(["port"] + [port.port_id
+ for port in relevant_ports])
+ return {"port_statistics": ExportableStats(return_stats_data, stats_table)}
+ def _generate_port_status(self, port_id_list):
+ relevant_ports = self.__get_relevant_ports(port_id_list)
+ return_stats_data = {}
+ per_field_status = OrderedDict([("port-type", []),
+ ("maximum", []),
+ ("port-status", [])
+ ]
+ )
+ for port_obj in relevant_ports:
+ # fetch port data
+ # port_stats = self._async_stats.get_port_stats(port_obj.port_id)
+ port_status = port_obj.generate_port_status()
+ # populate to data structures
+ return_stats_data[port_obj.port_id] = port_status
+ self.__update_per_field_dict(port_status, per_field_status)
+ stats_table = text_tables.TRexTextTable()
+ stats_table.set_cols_align(["l"] + ["c"]*len(relevant_ports))
+ stats_table.set_cols_width([10] + [20] * len(relevant_ports))
+ stats_table.add_rows([[k] + v
+ for k, v in per_field_status.iteritems()],
+ header=False)
+ stats_table.header(["port"] + [port.port_id
+ for port in relevant_ports])
+ return {"port_status": ExportableStats(return_stats_data, stats_table)}
+ def __get_relevant_ports(self, port_id_list):
+ # fetch owned ports
+ ports = [port_obj
+ for _, port_obj in self._ports_dict.iteritems()
+ if port_obj.port_id in port_id_list]
+ # display only the first FOUR options, by design
+ if len(ports) > 4:
+ print format_text("[WARNING]: ", 'magenta', 'bold'), format_text("displaying up to 4 ports", 'magenta')
+ ports = ports[:4]
+ return ports
+ def __update_per_field_dict(self, dict_src_data, dict_dest_ref):
+ for key, val in dict_src_data.iteritems():
+ if key in dict_dest_ref:
+ dict_dest_ref[key].append(val)
- def update(self, obj_id, stats_obj):
- assert isinstance(stats_obj, CTRexStats)
- self._stats[obj_id] = stats_obj
- def get_stats(self, obj_id=None):
- if obj_id:
- return copy.copy(self._stats.pop(obj_id))
- else:
- return copy.copy(self._stats)
class CTRexStats(object):
- def __init__(self, **kwargs):
- for k, v in kwargs.items():
- setattr(self, k, v)
+ """ This is an abstract class to represent a stats object """
+ def __init__(self):
+ self.reference_stats = None
+ self.latest_stats = {}
+ self.last_update_ts = time.time()
+ def __getitem__(self, item):
+ # override this to allow quick and clean access to fields
+ if not item in self.latest_stats:
+ return "N/A"
+ # item must exist
+ m ='_(([a-z])ps)$', item)
+ if m:
+ # this is a non-relative item
+ unit =
+ if unit == "b":
+ return self.get(item, format=True, suffix="b/sec")
+ elif unit == "p":
+ return self.get(item, format=True, suffix="pkt/sec")
+ else:
+ return self.get(item, format=True,
+ m ='^[i|o](a-z+)$', item)
+ if m:
+ # this is a non-relative item
+ type =
+ if type == "bytes":
+ return self.get_rel(item, format=True, suffix="B")
+ elif type == "packets":
+ return self.get_rel(item, format=True, suffix="pkts")
+ else:
+ # do not format with suffix
+ return self.get_rel(item, format=True)
+ # can't match to any known pattern, return N/A
+ return "N/A"
+ @staticmethod
+ def format_num(size, suffix = ""):
+ for unit in ['','K','M','G','T','P']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
+ return "NaN"
+ def generate_stats(self):
+ # must be implemented by designated classes (such as port/ global stats)
+ raise NotImplementedError()
+ def update(self, snapshot):
+ # update
+ self.latest_stats = snapshot
+ diff_time = time.time() - self.last_update_ts
+ # 3 seconds is too much - this is the new reference
+ if (self.reference_stats == None) or (diff_time > 3):
+ self.reference_stats = self.latest_stats
+ self.last_update_ts = time.time()
+ def clear_stats(self):
+ self.reference_stats = self.latest_stats
+ def get(self, field, format=False, suffix=""):
+ if not field in self.latest_stats:
+ return "N/A"
+ if not format:
+ return self.latest_stats[field]
+ else:
+ return self.format_num(self.latest_stats[field], suffix)
+ def get_rel(self, field, format=False, suffix=""):
+ if not field in self.latest_stats:
+ return "N/A"
+ if not format:
+ return (self.latest_stats[field] - self.reference_stats[field])
+ else:
+ return self.format_num(self.latest_stats[field] - self.reference_stats[field], suffix)
class CGlobalStats(CTRexStats):
- def __init__(self, **kwargs):
- super(CGlobalStats, self).__init__(kwargs)
- pass
+ pass
+ def __init__(self, connection_info, server_version, ports_dict_ref):
+ super(CGlobalStats, self).__init__()
+ self.connection_info = connection_info
+ self.server_version = server_version
+ self._ports_dict = ports_dict_ref
+ def generate_stats(self):
+ return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"),
+ port=self.connection_info.get("sync_port"))),
+ ("version", "{ver}, UUID: {uuid}".format(ver=self.server_version.get("version", "N/A"),
+ uuid="N/A")),
+ ("cpu_util", "{0}%".format(self.get("m_cpu_util"))),
+ ("total_tx", self.get("m_tx_bps", format=True, suffix="b/sec")),
+ ("total_rx", self.get("m_rx_bps", format=True, suffix="b/sec")),
+ ("total_pps", self.format_num(self.get("m_tx_pps") + self.get("m_rx_pps"),
+ suffix="pkt/sec")),
+ ("total_streams", sum([len(port_obj.streams)
+ for _, port_obj in self._ports_dict.iteritems()])),
+ ("active_ports", sum([port_obj.is_active()
+ for _, port_obj in self._ports_dict.iteritems()]))
+ ]
+ )
class CPortStats(CTRexStats):
- def __init__(self, **kwargs):
- super(CPortStats, self).__init__(kwargs)
- pass
+ pass
+ def __init__(self, port_obj):
+ super(CPortStats, self).__init__()
+ self._port_obj = port_obj
+ def generate_stats(self):
+ return {"owner": self._port_obj.user,
+ "state": self._port_obj.get_port_state_name(),
+ "--": "",
+ "opackets" : self.get_rel("opackets"),
+ "obytes" : self.get_rel("obytes"),
+ "ipackets" : self.get_rel("ipackets"),
+ "ibytes" : self.get_rel("ibytes"),
+ "ierrors" : self.get_rel("ierrors"),
+ "oerrors" : self.get_rel("oerrors"),
+ "tx-bytes": self.get_rel("obytes", format = True, suffix = "B"),
+ "rx-bytes": self.get_rel("ibytes", format = True, suffix = "B"),
+ "tx-pkts": self.get_rel("opackets", format = True, suffix = "pkts"),
+ "rx-pkts": self.get_rel("ipackets", format = True, suffix = "pkts"),
+ "---": "",
+ "Tx bps": self.get("m_total_tx_bps", format = True, suffix = "bps"),
+ "Rx bps": self.get("m_total_rx_bps", format = True, suffix = "bps"),
+ "----": "",
+ "Tx pps": self.get("m_total_tx_pps", format = True, suffix = "pps"),
+ "Rx pps": self.get("m_total_rx_pps", format = True, suffix = "pps"),
+ }
-class CStreamStats(CTRexStats):
- def __init__(self, **kwargs):
- super(CStreamStats, self).__init__(kwargs)
- pass
if __name__ == "__main__":
diff --git a/scripts/automation/trex_control_plane/common/ b/scripts/automation/trex_control_plane/common/
index 783f2769..86eee1f4 100755
--- a/scripts/automation/trex_control_plane/common/
+++ b/scripts/automation/trex_control_plane/common/
@@ -10,20 +10,33 @@ import copy
import os
StreamPack = namedtuple('StreamPack', ['stream_id', 'stream'])
+LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
class CStreamList(object):
def __init__(self):
- self.streams_list = {}
+ self.streams_list = OrderedDict()
self.yaml_loader = CTRexYAMLLoader(os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ def generate_numbered_name (self, name):
+ prefix = name.rstrip('01234567890')
+ suffix = name[len(prefix):]
+ if suffix == "":
+ n = "_1"
+ else:
+ n = int(suffix) + 1
+ return prefix + str(n)
def append_stream(self, name, stream_obj):
assert isinstance(stream_obj, CStream)
- if name in self.streams_list:
- raise NameError("A stream with this name already exists on this list.")
+ # if name exists simply add numbered suffix to it
+ while name in self.streams_list:
+ name = self.generate_numbered_name(name)
- return
+ return name
def remove_stream(self, name):
popped = self.streams_list.pop(name)
@@ -48,6 +61,7 @@ class CStreamList(object):
streams_data = load_yaml_to_obj(file_path)
assert isinstance(streams_data, list)
+ new_streams_data = []
for stream in streams_data:
stream_name = stream.get("name")
raw_stream = stream.get("stream")
@@ -58,16 +72,18 @@ class CStreamList(object):
new_stream_data = self.yaml_loader.validate_yaml(raw_stream,
multiplier= multiplier)
+ new_streams_data.append(new_stream_data)
new_stream_obj = CStream()
self.append_stream(stream_name, new_stream_obj)
- return new_stream_data
+ return new_streams_data
def compile_streams(self):
# first, assign an id to each stream
stream_ids = {}
for idx, stream_name in enumerate(self.streams_list):
stream_ids[stream_name] = idx
# next, iterate over the streams and transform them from working with names to ids.
# with that build a new dict with old stream_name as the key, and StreamPack as the stored value
compiled_streams = {}
@@ -156,7 +172,6 @@ class CStream(object):
"""docstring for CStream"""
FIELDS = ["enabled", "self_start", "next_stream_id", "isg", "mode", "rx_stats", "packet", "vm"]
- # COMPILE_FIELDS = ["enabled", "self_start", "next_stream_id", "isg", "mode", "rx_stats", "packet", "vm"]
def __init__(self):
self.is_loaded = False
@@ -183,6 +198,7 @@ class CStream(object):
if isinstance(kwargs[k], CTRexPktBuilder):
if "vm" not in kwargs:
+ break # vm field check is skipped
raise ValueError("When providing packet object with a CTRexPktBuilder, vm parameter "
"should not be supplied")
@@ -226,8 +242,7 @@ class CStream(object):
- def dump(self, compilation=False):
- # fields = CStream.COMPILE_FIELDS if compilation else CStream.FIELDS
+ def dump(self):
if self.is_loaded:
dump = {}
for key in CStream.FIELDS:
@@ -239,10 +254,62 @@ class CStream(object):
raise RuntimeError("CStream object isn't loaded with data. Use 'load_data' method.")
- def dump_compiled(self):
- return self.dump(compilation=True)
+# describes a stream DB
+class CStreamsDB(object):
+ def __init__(self):
+ self.stream_packs = {}
+ def load_yaml_file(self, filename):
+ stream_pack_name = filename
+ if stream_pack_name in self.get_loaded_streams_names():
+ self.remove_stream_packs(stream_pack_name)
+ stream_list = CStreamList()
+ loaded_obj = stream_list.load_yaml(filename)
+ try:
+ compiled_streams = stream_list.compile_streams()
+ rc = self.load_streams(stream_pack_name,
+ LoadedStreamList(loaded_obj,
+ [StreamPack(v.stream_id,
+ for k, v in compiled_streams.items()]))
+ except Exception as e:
+ return None
+ return self.get_stream_pack(stream_pack_name)
+ def load_streams(self, name, LoadedStreamList_obj):
+ if name in self.stream_packs:
+ return False
+ else:
+ self.stream_packs[name] = LoadedStreamList_obj
+ return True
+ def remove_stream_packs(self, *names):
+ removed_streams = []
+ for name in names:
+ removed = self.stream_packs.pop(name)
+ if removed:
+ removed_streams.append(name)
+ return removed_streams
+ def clear(self):
+ self.stream_packs.clear()
+ def get_loaded_streams_names(self):
+ return self.stream_packs.keys()
+ def stream_pack_exists (self, name):
+ return name in self.get_loaded_streams_names()
+ def get_stream_pack(self, name):
+ if not self.stream_pack_exists(name):
+ return None
+ else:
+ return self.stream_packs.get(name)
-if __name__ == "__main__":
- pass
diff --git a/scripts/automation/trex_control_plane/common/ b/scripts/automation/trex_control_plane/common/
new file mode 100644
index 00000000..3de36e4c
--- /dev/null
+++ b/scripts/automation/trex_control_plane/common/
@@ -0,0 +1,66 @@
+from collections import namedtuple
+from common.text_opts import *
+RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
+ __slots__ = ()
+ def __str__(self):
+ return "{id:^3} - {msg} ({stat})".format(,
+ msg=self.msg,
+ stat="success" if self.success else "fail")
+# simple class to represent complex return value
+class RC():
+ def __init__ (self, rc = None, data = None):
+ self.rc_list = []
+ if (rc != None) and (data != None):
+ tuple_rc = namedtuple('RC', ['rc', 'data'])
+ self.rc_list.append(tuple_rc(rc, data))
+ def add (self, rc):
+ self.rc_list += rc.rc_list
+ def good (self):
+ return all([x.rc for x in self.rc_list])
+ def bad (self):
+ return not self.good()
+ def data (self):
+ return [ if x.rc else "" for x in self.rc_list]
+ def err (self):
+ return [ if not x.rc else "" for x in self.rc_list]
+ def annotate (self, desc = None, show_status = True):
+ if desc:
+ print format_text('\n{:<60}'.format(desc), 'bold'),
+ else:
+ print ""
+ if self.bad():
+ # print all the errors
+ print ""
+ for x in self.rc_list:
+ if not x.rc:
+ print format_text("\n{0}".format(, 'bold')
+ print ""
+ if show_status:
+ print format_text("[FAILED]\n", 'red', 'bold')
+ else:
+ if show_status:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+def RC_OK(data = ""):
+ return RC(True, data)
+def RC_ERR (err):
+ return RC(False, err)