summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-12-09 15:01:25 -0500
committerimarom <imarom@cisco.com>2015-12-09 15:36:16 -0500
commit95c2405d6373ca3c6b69efc3faf293cd41a55c76 (patch)
tree7aa6728202e8a0d0eb8d049d82bb7f8dada7ac00
parent1355327e97e6d5ce5800fa4d6f879695922e8637 (diff)
read only support
-rw-r--r--scripts/automation/trex_control_plane/client/trex_port.py410
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py644
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_stats.py16
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_streams.py61
-rw-r--r--scripts/automation/trex_control_plane/common/trex_types.py66
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py78
-rw-r--r--scripts/automation/trex_control_plane/console/trex_tui.py36
-rw-r--r--src/publisher/trex_publisher.h14
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp57
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h13
-rw-r--r--src/rpc-server/trex_rpc_cmd.cpp28
-rw-r--r--src/rpc-server/trex_rpc_cmd_api.h3
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp4
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp101
-rw-r--r--src/stateless/cp/trex_stateless_port.h133
15 files changed, 967 insertions, 697 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py
new file mode 100644
index 00000000..68d89775
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_port.py
@@ -0,0 +1,410 @@
+
+from collections import namedtuple
+from common.trex_types import *
+from common import trex_stats
+
+# describes a single port
+class Port(object):
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+ PortState = namedtuple('PortState', ['state_id', 'state_name'])
+ STATES_MAP = {STATE_DOWN: "DOWN",
+ STATE_IDLE: "IDLE",
+ STATE_STREAMS: "IDLE",
+ STATE_TX: "ACTIVE",
+ STATE_PAUSE: "PAUSE"}
+
+
+ def __init__ (self, port_id, speed, driver, user, session_id, comm_link):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.comm_link = comm_link
+ self.transmit = comm_link.transmit
+ self.transmit_batch = comm_link.transmit_batch
+ self.user = user
+ self.session_id = session_id
+ self.driver = driver
+ self.speed = speed
+ self.streams = {}
+ self.profile = None
+
+ self.port_stats = trex_stats.CPortStats(self)
+
+
+ def err(self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ def ok(self, data = "ACK"):
+ return RC_OK(data)
+
+ def get_speed_bps (self):
+ return (self.speed * 1000 * 1000 * 1000)
+
+ # take the port
+ def acquire(self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "session_id": self.session_id,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+ # release the port
+ def release(self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ self.handler = None
+
+ if rc.success:
+ return self.ok()
+ else:
+ return self.err(rc.data)
+
+ def is_acquired(self):
+ return (self.handler != None)
+
+ def is_active(self):
+ return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def is_transmitting (self):
+ return (self.state == self.STATE_TX)
+
+ def is_paused (self):
+ return (self.state == self.STATE_PAUSE)
+
+
+ def sync(self):
+ params = {"port_id": self.port_id}
+
+ command = RpcCmdData("get_port_status", params)
+ rc = self.transmit(command.method, command.params)
+ if not rc.success:
+ return self.err(rc.data)
+
+ # sync the port
+ port_state = rc.data['state']
+
+ if port_state == "DOWN":
+ self.state = self.STATE_DOWN
+ elif port_state == "IDLE":
+ self.state = self.STATE_IDLE
+ elif port_state == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif port_state == "TX":
+ self.state = self.STATE_TX
+ elif port_state == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return self.ok()
+
+
+ # return TRUE if write commands
+ def is_port_writable (self):
+ # operations on port can be done on state idle or state streams
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj}
+
+ rc, data = self.transmit("add_stream", params)
+ if not rc:
+ r = self.err(data)
+ print r.good()
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # add multiple streams
+ def add_streams (self, streams_list):
+ batch = []
+
+ for stream in streams_list:
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream.stream_id,
+ "stream": stream.stream}
+
+ cmd = RpcCmdData('add_stream', params)
+ batch.append(cmd)
+
+ rc, data = self.transmit_batch(batch)
+
+ if not rc:
+ return self.err(data)
+
+ # add the stream
+ for stream in streams_list:
+ self.streams[stream.stream_id] = stream.stream
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams[stream_id] = None
+
+ self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE
+
+ return self.ok()
+
+ # remove all the streams
+ def remove_all_streams (self):
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("remove_all_streams", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams = {}
+
+ self.state = self.STATE_IDLE
+
+ return self.ok()
+
+ # get a specific stream
+ def get_stream (self, stream_id):
+ if stream_id in self.streams:
+ return self.streams[stream_id]
+ else:
+ return None
+
+ def get_all_streams (self):
+ return self.streams
+
+ # start traffic
+ def start (self, mul, duration):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul,
+ "duration": duration}
+
+ rc, data = self.transmit("start_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+ # stop traffic
+ # with force ignores the cached state and sends the command
+ def stop (self, force = False):
+
+ if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("stop_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_STREAMS
+
+ return self.ok()
+
+ def pause (self):
+
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("pause_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_PAUSE
+
+ return self.ok()
+
+
+ def resume (self):
+
+ if (self.state != self.STATE_PAUSE) :
+ return self.err("port is not in pause mode")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("resume_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STATE_TX
+
+ return self.ok()
+
+
+ def update (self, mul):
+ if (self.state != self.STATE_TX) :
+ return self.err("port is not transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul}
+
+ rc, data = self.transmit("update_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ return self.ok()
+
+
+ def validate (self):
+
+ if (self.state == self.STATE_DOWN):
+ return self.err("port is down")
+
+ if (self.state == self.STATE_IDLE):
+ return self.err("no streams attached to port")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("validate", params)
+ if not rc:
+ return self.err(data)
+
+ self.profile = data
+
+ return self.ok()
+
+ def get_profile (self):
+ return self.profile
+
+
+ def print_profile (self, mult, duration):
+ if not self.get_profile():
+ return
+
+ rate = self.get_profile()['rate']
+ graph = self.get_profile()['graph']
+
+ print format_text("Profile Map Per Port\n", 'underline', 'bold')
+
+ factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util'])
+
+ print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"),
+ format_num(rate['max_bps'] * factor, suffix = "bps"))
+
+ print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
+ format_num(rate['max_pps'] * factor, suffix = "pps"),)
+
+ print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100),
+ format_percentage(rate['max_line_util'] * factor * 100))
+
+
+ # duration
+ exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
+ exp_time_factor_sec = exp_time_base_sec / factor
+
+ # user configured a duration
+ if duration > 0:
+ if exp_time_factor_sec > 0:
+ exp_time_factor_sec = min(exp_time_factor_sec, duration)
+ else:
+ exp_time_factor_sec = duration
+
+
+ print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec),
+ format_time(exp_time_factor_sec))
+ print "\n"
+
+
+ def get_port_state_name(self):
+ return self.STATES_MAP.get(self.state, "Unknown")
+
+ ################# stats handler ######################
+ def generate_port_stats(self):
+ return self.port_stats.generate_stats()
+ pass
+
+ def generate_port_status(self):
+ return {"port-type": self.driver,
+ "maximum": "{speed} Gb/s".format(speed=self.speed),
+ "port-status": self.get_port_state_name()
+ }
+
+ def clear_stats(self):
+ return self.port_stats.clear_stats()
+
+
+ ################# events handler ######################
+ def async_event_port_stopped (self):
+ self.state = self.STATE_STREAMS
+
+
+ def async_event_port_started (self):
+ self.state = self.STATE_TX
+
+
+ def async_event_port_paused (self):
+ self.state = self.STATE_PAUSE
+
+
+ def async_event_port_resumed (self):
+ self.state = self.STATE_TX
+
+ def async_event_forced_acquired (self):
+ self.handler = None
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index db0ed5bf..43ebea9d 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -19,68 +19,12 @@ from client_utils import parsing_opts, text_tables
import time
import datetime
import re
+import random
+from trex_port import Port
+from common.trex_types import *
from trex_async_client import CTRexAsyncClient
-RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
-
-class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
- __slots__ = ()
- def __str__(self):
- return "{id:^3} - {msg} ({stat})".format(id=self.id,
- msg=self.msg,
- stat="success" if self.success else "fail")
-
-# simple class to represent complex return value
-class RC():
-
- def __init__ (self, rc = None, data = None):
- self.rc_list = []
-
- if (rc != None) and (data != None):
- tuple_rc = namedtuple('RC', ['rc', 'data'])
- self.rc_list.append(tuple_rc(rc, data))
-
- def add (self, rc):
- self.rc_list += rc.rc_list
-
- def good (self):
- return all([x.rc for x in self.rc_list])
-
- def bad (self):
- return not self.good()
-
- def data (self):
- return [x.data if x.rc else "" for x in self.rc_list]
-
- def err (self):
- return [x.data if not x.rc else "" for x in self.rc_list]
-
- def annotate (self, desc = None):
- if desc:
- print format_text('\n{:<60}'.format(desc), 'bold'),
-
- if self.bad():
- # print all the errors
- print ""
- for x in self.rc_list:
- if not x.rc:
- print format_text("\n{0}".format(x.data), 'bold')
-
- print ""
- print format_text("[FAILED]\n", 'red', 'bold')
-
-
- else:
- print format_text("[SUCCESS]\n", 'green', 'bold')
-
-
-def RC_OK(data = ""):
- return RC(True, data)
-def RC_ERR (err):
- return RC(False, err)
-
-LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
########## utlity ############
def mult_to_factor (mult, max_bps, max_pps, line_util):
@@ -98,444 +42,6 @@ def mult_to_factor (mult, max_bps, max_pps, line_util):
-# describes a stream DB
-class CStreamsDB(object):
-
- def __init__(self):
- self.stream_packs = {}
-
- def load_yaml_file(self, filename):
-
- stream_pack_name = filename
- if stream_pack_name in self.get_loaded_streams_names():
- self.remove_stream_packs(stream_pack_name)
-
- stream_list = CStreamList()
- loaded_obj = stream_list.load_yaml(filename)
-
- try:
- compiled_streams = stream_list.compile_streams()
- rc = self.load_streams(stream_pack_name,
- LoadedStreamList(loaded_obj,
- [StreamPack(v.stream_id, v.stream.dump())
- for k, v in compiled_streams.items()]))
-
- except Exception as e:
- return None
-
- return self.get_stream_pack(stream_pack_name)
-
- def load_streams(self, name, LoadedStreamList_obj):
- if name in self.stream_packs:
- return False
- else:
- self.stream_packs[name] = LoadedStreamList_obj
- return True
-
- def remove_stream_packs(self, *names):
- removed_streams = []
- for name in names:
- removed = self.stream_packs.pop(name)
- if removed:
- removed_streams.append(name)
- return removed_streams
-
- def clear(self):
- self.stream_packs.clear()
-
- def get_loaded_streams_names(self):
- return self.stream_packs.keys()
-
- def stream_pack_exists (self, name):
- return name in self.get_loaded_streams_names()
-
- def get_stream_pack(self, name):
- if not self.stream_pack_exists(name):
- return None
- else:
- return self.stream_packs.get(name)
-
-
-# describes a single port
-class Port(object):
- STATE_DOWN = 0
- STATE_IDLE = 1
- STATE_STREAMS = 2
- STATE_TX = 3
- STATE_PAUSE = 4
- PortState = namedtuple('PortState', ['state_id', 'state_name'])
- STATES_MAP = {STATE_DOWN: "DOWN",
- STATE_IDLE: "IDLE",
- STATE_STREAMS: "STREAMS",
- STATE_TX: "ACTIVE",
- STATE_PAUSE: "PAUSE"}
-
-
- def __init__ (self, port_id, speed, driver, user, comm_link):
- self.port_id = port_id
- self.state = self.STATE_IDLE
- self.handler = None
- self.comm_link = comm_link
- self.transmit = comm_link.transmit
- self.transmit_batch = comm_link.transmit_batch
- self.user = user
- self.driver = driver
- self.speed = speed
- self.streams = {}
- self.profile = None
-
- self.port_stats = trex_stats.CPortStats(self)
-
- def err(self, msg):
- return RC_ERR("port {0} : {1}".format(self.port_id, msg))
-
- def ok(self, data = "ACK"):
- return RC_OK(data)
-
- def get_speed_bps (self):
- return (self.speed * 1000 * 1000 * 1000)
-
- # take the port
- def acquire(self, force = False):
- params = {"port_id": self.port_id,
- "user": self.user,
- "force": force}
-
- command = RpcCmdData("acquire", params)
- rc = self.transmit(command.method, command.params)
- if rc.success:
- self.handler = rc.data
- return self.ok()
- else:
- return self.err(rc.data)
-
- # release the port
- def release(self):
- params = {"port_id": self.port_id,
- "handler": self.handler}
-
- command = RpcCmdData("release", params)
- rc = self.transmit(command.method, command.params)
- if rc.success:
- self.handler = rc.data
- return self.ok()
- else:
- return self.err(rc.data)
-
- def is_acquired(self):
- return (self.handler != None)
-
- def is_active(self):
- return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
-
- def is_transmitting (self):
- return (self.state == self.STATE_TX)
-
- def is_paused (self):
- return (self.state == self.STATE_PAUSE)
-
-
- def sync(self, sync_data):
- self.handler = sync_data['handler']
- port_state = sync_data['state'].upper()
- if port_state == "DOWN":
- self.state = self.STATE_DOWN
- elif port_state == "IDLE":
- self.state = self.STATE_IDLE
- elif port_state == "STREAMS":
- self.state = self.STATE_STREAMS
- elif port_state == "TX":
- self.state = self.STATE_TX
- elif port_state == "PAUSE":
- self.state = self.STATE_PAUSE
- else:
- raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
-
- return self.ok()
-
-
- # return TRUE if write commands
- def is_port_writable (self):
- # operations on port can be done on state idle or state streams
- return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
-
- # add stream to the port
- def add_stream (self, stream_id, stream_obj):
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to add streams")
-
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id,
- "stream": stream_obj}
-
- rc, data = self.transmit("add_stream", params)
- if not rc:
- r = self.err(data)
- print r.good()
-
- # add the stream
- self.streams[stream_id] = stream_obj
-
- # the only valid state now
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- # add multiple streams
- def add_streams (self, streams_list):
- batch = []
-
- for stream in streams_list:
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream.stream_id,
- "stream": stream.stream}
-
- cmd = RpcCmdData('add_stream', params)
- batch.append(cmd)
-
- rc, data = self.transmit_batch(batch)
-
- if not rc:
- return self.err(data)
-
- # add the stream
- for stream in streams_list:
- self.streams[stream.stream_id] = stream.stream
-
- # the only valid state now
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- # remove stream from port
- def remove_stream (self, stream_id):
-
- if not stream_id in self.streams:
- return self.err("stream {0} does not exists".format(stream_id))
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id}
-
-
- rc, data = self.transmit("remove_stream", params)
- if not rc:
- return self.err(data)
-
- self.streams[stream_id] = None
-
- self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE
-
- return self.ok()
-
- # remove all the streams
- def remove_all_streams (self):
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("remove_all_streams", params)
- if not rc:
- return self.err(data)
-
- self.streams = {}
-
- self.state = self.STATE_IDLE
-
- return self.ok()
-
- # get a specific stream
- def get_stream (self, stream_id):
- if stream_id in self.streams:
- return self.streams[stream_id]
- else:
- return None
-
- def get_all_streams (self):
- return self.streams
-
- # start traffic
- def start (self, mul, duration):
- if self.state == self.STATE_DOWN:
- return self.err("Unable to start traffic - port is down")
-
- if self.state == self.STATE_IDLE:
- return self.err("Unable to start traffic - no streams attached to port")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to start traffic - port is already transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul,
- "duration": duration}
-
- rc, data = self.transmit("start_traffic", params)
- if not rc:
- return self.err(data)
-
- self.state = self.STATE_TX
-
- return self.ok()
-
- # stop traffic
- # with force ignores the cached state and sends the command
- def stop (self, force = False):
-
- if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("stop_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- def pause (self):
-
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("pause_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_PAUSE
-
- return self.ok()
-
-
- def resume (self):
-
- if (self.state != self.STATE_PAUSE) :
- return self.err("port is not in pause mode")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("resume_traffic", params)
- if not rc:
- return self.err(data)
-
- # only valid state after stop
- self.state = self.STATE_TX
-
- return self.ok()
-
-
- def update (self, mul):
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul}
-
- rc, data = self.transmit("update_traffic", params)
- if not rc:
- return self.err(data)
-
- return self.ok()
-
-
- def validate (self):
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
-
- if (self.state == self.STATE_IDLE):
- return self.err("no streams attached to port")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc, data = self.transmit("validate", params)
- if not rc:
- return self.err(data)
-
- self.profile = data
-
- return self.ok()
-
- def get_profile (self):
- return self.profile
-
-
- def print_profile (self, mult, duration):
- if not self.get_profile():
- return
-
- rate = self.get_profile()['rate']
- graph = self.get_profile()['graph']
-
- print format_text("Profile Map Per Port\n", 'underline', 'bold')
-
- factor = mult_to_factor(mult, rate['max_bps'], rate['max_pps'], rate['max_line_util'])
-
- print "Profile max BPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps'], suffix = "bps"),
- format_num(rate['max_bps'] * factor, suffix = "bps"))
-
- print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
- format_num(rate['max_pps'] * factor, suffix = "pps"),)
-
- print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util'] * 100),
- format_percentage(rate['max_line_util'] * factor * 100))
-
-
- # duration
- exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
- exp_time_factor_sec = exp_time_base_sec / factor
-
- # user configured a duration
- if duration > 0:
- if exp_time_factor_sec > 0:
- exp_time_factor_sec = min(exp_time_factor_sec, duration)
- else:
- exp_time_factor_sec = duration
-
-
- print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec),
- format_time(exp_time_factor_sec))
- print "\n"
-
-
- def get_port_state_name(self):
- return self.STATES_MAP.get(self.state, "Unknown")
-
- ################# stats handler ######################
- def generate_port_stats(self):
- return self.port_stats.generate_stats()
- pass
-
- def generate_port_status(self):
- return {"port-type": self.driver,
- "maximum": "{speed} Gb/s".format(speed=self.speed),
- "port-status": self.get_port_state_name()
- }
-
- def clear_stats(self):
- return self.port_stats.clear_stats()
-
-
- ################# events handler ######################
- def async_event_port_stopped (self):
- self.state = self.STATE_STREAMS
-
-
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
@@ -546,7 +52,10 @@ class CTRexStatelessClient(object):
def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
+
self.user = username
+ self.session_id = random.getrandbits(32)
+
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
# default verbose level
@@ -571,15 +80,24 @@ class CTRexStatelessClient(object):
self.events = []
+
+ self.read_only = False
self.connected = False
+ # when the client gets out
+ def shutdown (self):
+ self.release(self.get_acquired_ports())
+
# returns the port object
def get_port (self, port_id):
return self.ports.get(port_id, None)
+ def get_server (self):
+ return self.comm_link.get_server()
+
################# events handler ######################
def add_event_log (self, msg, ev_type, show = False):
@@ -634,6 +152,7 @@ class CTRexStatelessClient(object):
if (type == 0):
port_id = int(data['port_id'])
ev = "Port {0} has started".format(port_id)
+ self.async_event_port_started(port_id)
# port stopped
elif (type == 1):
@@ -644,21 +163,47 @@ class CTRexStatelessClient(object):
self.async_event_port_stopped(port_id)
- # server stopped
+ # port paused
elif (type == 2):
- ev = "Server has stopped"
- self.async_event_server_stopped()
- show_event = True
+ port_id = int(data['port_id'])
+ ev = "Port {0} has paused".format(port_id)
- # port finished traffic
+ # call the handler
+ self.async_event_port_paused(port_id)
+
+ # port resumed
elif (type == 3):
port_id = int(data['port_id'])
+ ev = "Port {0} has resumed".format(port_id)
+
+ # call the handler
+ self.async_event_port_resumed(port_id)
+
+ # port finished traffic
+ elif (type == 4):
+ port_id = int(data['port_id'])
ev = "Port {0} job done".format(port_id)
# call the handler
self.async_event_port_stopped(port_id)
show_event = True
+ # port was stolen...
+ elif (type == 5):
+ port_id = int(data['port_id'])
+ ev = "Port {0} was forcely taken".format(port_id)
+
+ # call the handler
+ self.async_event_port_forced_acquired(port_id)
+ show_event = True
+
+ # server stopped
+ elif (type == 100):
+ ev = "Server has stopped"
+ self.async_event_server_stopped()
+ show_event = True
+
+
else:
# unknown event - ignore
return
@@ -670,6 +215,23 @@ class CTRexStatelessClient(object):
def async_event_port_stopped (self, port_id):
self.ports[port_id].async_event_port_stopped()
+
+ def async_event_port_started (self, port_id):
+ self.ports[port_id].async_event_port_started()
+
+
+ def async_event_port_paused (self, port_id):
+ self.ports[port_id].async_event_port_paused()
+
+
+ def async_event_port_resumed (self, port_id):
+ self.ports[port_id].async_event_port_resumed()
+
+
+ def async_event_port_forced_acquired (self, port_id):
+ self.ports[port_id].async_event_forced_acquired()
+ self.read_only = True
+
def async_event_server_stopped (self):
self.connected = False
@@ -732,7 +294,7 @@ class CTRexStatelessClient(object):
############ boot up section ################
# connection sequence
- def connect(self):
+ def connect(self, force = False):
# clear this flag
self.connected = False
@@ -773,21 +335,30 @@ class CTRexStatelessClient(object):
speed = self.system_info['ports'][port_id]['speed']
driver = self.system_info['ports'][port_id]['driver']
- self.ports[port_id] = Port(port_id, speed, driver, self.user, self.comm_link)
+ self.ports[port_id] = Port(port_id, speed, driver, self.user, self.session_id, self.comm_link)
- # acquire all ports
- rc = self.acquire()
+ # sync the ports
+ rc = self.sync_ports()
if rc.bad():
return rc
- rc = self.sync_with_server()
+ # acquire all ports
+ rc = self.acquire(force = force)
if rc.bad():
- return rc
+ # release all the succeeded ports and set as read only
+ self.release(self.get_acquired_ports())
+ self.read_only = True
+ else:
+ self.read_only = False
+
self.connected = True
+ return rc
- return RC_OK()
+
+ def is_read_only (self):
+ return self.read_only
def is_connected (self):
return self.connected and self.comm_link.is_connected
@@ -838,6 +409,9 @@ class CTRexStatelessClient(object):
def get_connection_ip (self):
return self.comm_link.server
+ def get_all_ports (self):
+ return [port_id for port_id, port_obj in self.ports.iteritems()]
+
def get_acquired_ports(self):
return [port_id
for port_id, port_obj in self.ports.iteritems()
@@ -883,17 +457,6 @@ class CTRexStatelessClient(object):
return RC(rc, info)
- def sync_with_server(self, sync_streams=False):
- rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
- if not rc:
- return RC_ERR(data)
-
- for port_info in data:
- rc = self.ports[port_info['port_id']].sync(port_info)
- if rc.bad():
- return rc
-
- return RC_OK()
def get_global_stats(self):
rc, info = self.transmit("get_global_stats")
@@ -901,6 +464,16 @@ class CTRexStatelessClient(object):
########## port commands ##############
+ def sync_ports (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].sync())
+
+ return rc
+
# acquire ports, if port_list is none - get all
def acquire (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
@@ -919,7 +492,7 @@ class CTRexStatelessClient(object):
rc = RC()
for port_id in port_id_list:
- rc.add(self.ports[port_id].release(force))
+ rc.add(self.ports[port_id].release())
return rc
@@ -1075,8 +648,8 @@ class CTRexStatelessClient(object):
rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port()))
return rc
- def cmd_connect(self):
- rc = self.connect()
+ def cmd_connect(self, force):
+ rc = self.connect(force)
rc.annotate()
return rc
@@ -1089,12 +662,6 @@ class CTRexStatelessClient(object):
def cmd_reset(self):
- # sync with the server
- rc = self.sync_with_server()
- rc.annotate("Syncing with the server:")
- if rc.bad():
- return rc
-
rc = self.acquire(force = True)
rc.annotate("Force acquiring all ports:")
if rc.bad():
@@ -1260,6 +827,22 @@ class CTRexStatelessClient(object):
############## High Level API With Parser ################
+
+ def cmd_connect_line (self, line):
+ '''Connects to the TRex server'''
+ # define a parser
+ parser = parsing_opts.gen_parser(self,
+ "connect",
+ self.cmd_connect_line.__doc__,
+ parsing_opts.FORCE)
+
+ opts = parser.parse_args(line.split())
+
+ if opts is None:
+ return RC_ERR("bad command line parameters")
+
+ return self.cmd_connect(opts.force)
+
@timing
def cmd_start_line (self, line):
'''Start selected traffic in specified ports on TRex\n'''
@@ -1551,6 +1134,9 @@ class CTRexStatelessClient(object):
else:
return True
+ def get_server (self):
+ return self.server
+
def set_verbose(self, mode):
self.verbose = mode
return self.rpc_link.set_verbose(mode)
diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py
index 671a0656..2f6ea38d 100755
--- a/scripts/automation/trex_control_plane/common/trex_stats.py
+++ b/scripts/automation/trex_control_plane/common/trex_stats.py
@@ -5,6 +5,7 @@ from common.text_opts import format_text
from client.trex_async_client import CTRexAsyncStats
import copy
import datetime
+import time
import re
GLOBAL_STATS = 'g'
@@ -134,7 +135,8 @@ class CTRexStatsGenerator(object):
# fetch owned ports
ports = [port_obj
for _, port_obj in self._ports_dict.iteritems()
- if port_obj.is_acquired() and port_obj.port_id in port_id_list]
+ if port_obj.port_id in port_id_list]
+
# display only the first FOUR options, by design
if len(ports) > 4:
print format_text("[WARNING]: ", 'magenta', 'bold'), format_text("displaying up to 4 ports", 'magenta')
@@ -155,7 +157,7 @@ class CTRexStats(object):
def __init__(self):
self.reference_stats = None
self.latest_stats = {}
- self.last_update_ts = datetime.datetime.now()
+ self.last_update_ts = time.time()
def __getitem__(self, item):
@@ -204,13 +206,16 @@ class CTRexStats(object):
def update(self, snapshot):
# update
- self.last_update_ts = datetime.datetime.now()
-
self.latest_stats = snapshot
- if self.reference_stats == None:
+ diff_time = time.time() - self.last_update_ts
+
+ # 3 seconds is too much - this is the new reference
+ if (self.reference_stats == None) or (diff_time > 3):
self.reference_stats = self.latest_stats
+ self.last_update_ts = time.time()
+
def clear_stats(self):
self.reference_stats = self.latest_stats
@@ -225,6 +230,7 @@ class CTRexStats(object):
def get_rel(self, field, format=False, suffix=""):
if not field in self.latest_stats:
return "N/A"
+
if not format:
return (self.latest_stats[field] - self.reference_stats[field])
else:
diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py
index 89de7286..86eee1f4 100755
--- a/scripts/automation/trex_control_plane/common/trex_streams.py
+++ b/scripts/automation/trex_control_plane/common/trex_streams.py
@@ -10,6 +10,7 @@ import copy
import os
StreamPack = namedtuple('StreamPack', ['stream_id', 'stream'])
+LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled'])
class CStreamList(object):
@@ -254,5 +255,61 @@ class CStream(object):
raise RuntimeError("CStream object isn't loaded with data. Use 'load_data' method.")
-if __name__ == "__main__":
- pass
+
+# describes a stream DB
+class CStreamsDB(object):
+
+ def __init__(self):
+ self.stream_packs = {}
+
+ def load_yaml_file(self, filename):
+
+ stream_pack_name = filename
+ if stream_pack_name in self.get_loaded_streams_names():
+ self.remove_stream_packs(stream_pack_name)
+
+ stream_list = CStreamList()
+ loaded_obj = stream_list.load_yaml(filename)
+
+ try:
+ compiled_streams = stream_list.compile_streams()
+ rc = self.load_streams(stream_pack_name,
+ LoadedStreamList(loaded_obj,
+ [StreamPack(v.stream_id, v.stream.dump())
+ for k, v in compiled_streams.items()]))
+
+ except Exception as e:
+ return None
+
+ return self.get_stream_pack(stream_pack_name)
+
+ def load_streams(self, name, LoadedStreamList_obj):
+ if name in self.stream_packs:
+ return False
+ else:
+ self.stream_packs[name] = LoadedStreamList_obj
+ return True
+
+ def remove_stream_packs(self, *names):
+ removed_streams = []
+ for name in names:
+ removed = self.stream_packs.pop(name)
+ if removed:
+ removed_streams.append(name)
+ return removed_streams
+
+ def clear(self):
+ self.stream_packs.clear()
+
+ def get_loaded_streams_names(self):
+ return self.stream_packs.keys()
+
+ def stream_pack_exists (self, name):
+ return name in self.get_loaded_streams_names()
+
+ def get_stream_pack(self, name):
+ if not self.stream_pack_exists(name):
+ return None
+ else:
+ return self.stream_packs.get(name)
+
diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py
new file mode 100644
index 00000000..3de36e4c
--- /dev/null
+++ b/scripts/automation/trex_control_plane/common/trex_types.py
@@ -0,0 +1,66 @@
+
+from collections import namedtuple
+from common.text_opts import *
+
+RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+
+class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
+ __slots__ = ()
+ def __str__(self):
+ return "{id:^3} - {msg} ({stat})".format(id=self.id,
+ msg=self.msg,
+ stat="success" if self.success else "fail")
+
+# simple class to represent complex return value
+class RC():
+
+ def __init__ (self, rc = None, data = None):
+ self.rc_list = []
+
+ if (rc != None) and (data != None):
+ tuple_rc = namedtuple('RC', ['rc', 'data'])
+ self.rc_list.append(tuple_rc(rc, data))
+
+ def add (self, rc):
+ self.rc_list += rc.rc_list
+
+ def good (self):
+ return all([x.rc for x in self.rc_list])
+
+ def bad (self):
+ return not self.good()
+
+ def data (self):
+ return [x.data if x.rc else "" for x in self.rc_list]
+
+ def err (self):
+ return [x.data if not x.rc else "" for x in self.rc_list]
+
+ def annotate (self, desc = None, show_status = True):
+ if desc:
+ print format_text('\n{:<60}'.format(desc), 'bold'),
+ else:
+ print ""
+
+ if self.bad():
+ # print all the errors
+ print ""
+ for x in self.rc_list:
+ if not x.rc:
+ print format_text("\n{0}".format(x.data), 'bold')
+
+ print ""
+ if show_status:
+ print format_text("[FAILED]\n", 'red', 'bold')
+
+
+ else:
+ if show_status:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+
+
+def RC_OK(data = ""):
+ return RC(True, data)
+def RC_ERR (err):
+ return RC(False, err)
+
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 9140977a..495e1c22 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -130,12 +130,17 @@ class TRexConsole(TRexGeneralCmd):
################### internal section ########################
+
def verify_connected(f):
@wraps(f)
def wrap(*args):
inst = args[0]
+ func_name = f.__name__
+ if func_name.startswith("do_"):
+ func_name = func_name[3:]
+
if not inst.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
+ print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold')
return
ret = f(*args)
@@ -143,9 +148,32 @@ class TRexConsole(TRexGeneralCmd):
return wrap
+ # TODO: remove this ugly duplication
+ def verify_connected_and_rw (f):
+ @wraps(f)
+ def wrap(*args):
+ inst = args[0]
+ func_name = f.__name__
+ if func_name.startswith("do_"):
+ func_name = func_name[3:]
+
+ if not inst.stateless_client.is_connected():
+ print format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold')
+ return
+
+ if inst.stateless_client.is_read_only():
+ print format_text("\n'{0}' cannot be executed on read only mode\n".format(func_name), 'bold')
+ return
+
+ ret = f(*args)
+ return ret
+
+ return wrap
+
+
def get_console_identifier(self):
return "{context}_{server}".format(context=self.__class__.__name__,
- server=self.stateless_client.get_system_info()['hostname'])
+ server=self.stateless_client.get_server())
def register_main_console_methods(self):
main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__)))
@@ -156,11 +184,17 @@ class TRexConsole(TRexGeneralCmd):
def postcmd(self, stop, line):
- if self.stateless_client.is_connected():
- self.prompt = "TRex > "
- else:
- self.supported_rpc = None
+ if not self.stateless_client.is_connected():
self.prompt = "TRex (offline) > "
+ self.supported_rpc = None
+ return stop
+
+ if self.stateless_client.is_read_only():
+ self.prompt = "TRex (read only) > "
+ return stop
+
+
+ self.prompt = "TRex > "
return stop
@@ -287,7 +321,7 @@ class TRexConsole(TRexGeneralCmd):
def do_connect (self, line):
'''Connects to the server\n'''
- rc = self.stateless_client.cmd_connect()
+ rc = self.stateless_client.cmd_connect_line(line)
if rc.bad():
return
@@ -314,7 +348,7 @@ class TRexConsole(TRexGeneralCmd):
if (l > 2) and (s[l - 2] in file_flags):
return TRexConsole.tree_autocomplete(s[l - 1])
- @verify_connected
+ @verify_connected_and_rw
def do_start(self, line):
'''Start selected traffic in specified port(s) on TRex\n'''
@@ -325,7 +359,7 @@ class TRexConsole(TRexGeneralCmd):
self.do_start("-h")
############# stop
- @verify_connected
+ @verify_connected_and_rw
def do_stop(self, line):
'''stops port(s) transmitting traffic\n'''
@@ -335,7 +369,7 @@ class TRexConsole(TRexGeneralCmd):
self.do_stop("-h")
############# update
- @verify_connected
+ @verify_connected_and_rw
def do_update(self, line):
'''update speed of port(s)currently transmitting traffic\n'''
@@ -345,14 +379,14 @@ class TRexConsole(TRexGeneralCmd):
self.do_update("-h")
############# pause
- @verify_connected
+ @verify_connected_and_rw
def do_pause(self, line):
'''pause port(s) transmitting traffic\n'''
self.stateless_client.cmd_pause_line(line)
############# resume
- @verify_connected
+ @verify_connected_and_rw
def do_resume(self, line):
'''resume port(s) transmitting traffic\n'''
@@ -361,7 +395,7 @@ class TRexConsole(TRexGeneralCmd):
########## reset
- @verify_connected
+ @verify_connected_and_rw
def do_reset (self, line):
'''force stop all ports\n'''
self.stateless_client.cmd_reset_line(line)
@@ -375,6 +409,7 @@ class TRexConsole(TRexGeneralCmd):
self.stateless_client.cmd_validate_line(line)
+ @verify_connected
def do_stats(self, line):
'''Fetch statistics from TRex server by port\n'''
self.stateless_client.cmd_stats_line(line)
@@ -383,6 +418,7 @@ class TRexConsole(TRexGeneralCmd):
def help_stats(self):
self.do_stats("-h")
+ @verify_connected
def do_clear(self, line):
'''Clear cached local statistics\n'''
self.stateless_client.cmd_clear_line(line)
@@ -529,9 +565,17 @@ def main():
# Stateless client connection
stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub)
- rc = stateless_client.cmd_connect()
+
+ print "\nlogged as {0}".format(format_text(options.user, 'bold'))
+ rc = stateless_client.connect()
+
+ # error can be either no able to connect or a read only
if rc.bad():
- return
+ if not stateless_client.is_connected():
+ rc.annotate()
+ else:
+ rc.annotate(show_status = False)
+
if options.batch:
cont = stateless_client.run_script_file(options.batch[0])
@@ -544,7 +588,9 @@ def main():
console.cmdloop()
except KeyboardInterrupt as e:
print "\n\n*** Caught Ctrl + C... Exiting...\n\n"
- return
+
+ finally:
+ stateless_client.shutdown()
if __name__ == '__main__':
main()
diff --git a/scripts/automation/trex_control_plane/console/trex_tui.py b/scripts/automation/trex_control_plane/console/trex_tui.py
index c44efe15..3ddf7a7f 100644
--- a/scripts/automation/trex_control_plane/console/trex_tui.py
+++ b/scripts/automation/trex_control_plane/console/trex_tui.py
@@ -40,7 +40,7 @@ class TrexTUIDashBoard(TrexTUIPanel):
self.key_actions['+'] = {'action': self.action_raise, 'legend': 'up 5%', 'show': True}
self.key_actions['-'] = {'action': self.action_lower, 'legend': 'low 5%', 'show': True}
- self.ports = self.stateless_client.get_acquired_ports()
+ self.ports = self.stateless_client.get_all_ports()
def show (self):
@@ -55,6 +55,10 @@ class TrexTUIDashBoard(TrexTUIPanel):
allowed['c'] = self.key_actions['c']
+ # thats it for read only
+ if self.stateless_client.is_read_only():
+ return allowed
+
if len(self.stateless_client.get_transmitting_ports()) > 0:
allowed['p'] = self.key_actions['p']
allowed['+'] = self.key_actions['+']
@@ -69,10 +73,10 @@ class TrexTUIDashBoard(TrexTUIPanel):
######### actions
def action_pause (self):
- rc = self.stateless_client.pause_traffic(self.mng.acquired_ports)
+ rc = self.stateless_client.pause_traffic(self.mng.ports)
ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports):
+ for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
if rc_single.rc:
ports_succeeded.append(port_id)
@@ -83,10 +87,10 @@ class TrexTUIDashBoard(TrexTUIPanel):
def action_resume (self):
- rc = self.stateless_client.resume_traffic(self.mng.acquired_ports)
+ rc = self.stateless_client.resume_traffic(self.mng.ports)
ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports):
+ for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
if rc_single.rc:
ports_succeeded.append(port_id)
@@ -98,10 +102,10 @@ class TrexTUIDashBoard(TrexTUIPanel):
def action_raise (self):
mul = {'type': 'percentage', 'value': 5, 'op': 'add'}
- rc = self.stateless_client.update_traffic(mul, self.mng.acquired_ports)
+ rc = self.stateless_client.update_traffic(mul, self.mng.ports)
ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports):
+ for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
if rc_single.rc:
ports_succeeded.append(port_id)
@@ -112,10 +116,10 @@ class TrexTUIDashBoard(TrexTUIPanel):
def action_lower (self):
mul = {'type': 'percentage', 'value': 5, 'op': 'sub'}
- rc = self.stateless_client.update_traffic(mul, self.mng.acquired_ports)
+ rc = self.stateless_client.update_traffic(mul, self.mng.ports)
ports_succeeded = []
- for rc_single, port_id in zip(rc.rc_list, self.mng.acquired_ports):
+ for rc_single, port_id in zip(rc.rc_list, self.mng.ports):
if rc_single.rc:
ports_succeeded.append(port_id)
@@ -126,7 +130,7 @@ class TrexTUIDashBoard(TrexTUIPanel):
def action_clear (self):
- self.stateless_client.cmd_clear(self.mng.acquired_ports)
+ self.stateless_client.cmd_clear(self.mng.ports)
return "cleared all stats"
@@ -148,7 +152,6 @@ class TrexTUIPort(TrexTUIPanel):
def show (self):
-
stats = self.stateless_client.cmd_stats([self.port_id], trex_stats.COMPACT)
# print stats to screen
for stat_type, stat_data in stats.iteritems():
@@ -160,6 +163,10 @@ class TrexTUIPort(TrexTUIPanel):
allowed['c'] = self.key_actions['c']
+ # thats it for read only
+ if self.stateless_client.is_read_only():
+ return allowed
+
if self.port.state == self.port.STATE_TX:
allowed['p'] = self.key_actions['p']
allowed['+'] = self.key_actions['+']
@@ -232,7 +239,7 @@ class TrexTUIPanelManager():
def __init__ (self, tui):
self.tui = tui
self.stateless_client = tui.stateless_client
- self.acquired_ports = self.stateless_client.get_acquired_ports()
+ self.ports = self.stateless_client.get_all_ports()
self.panels = {}
@@ -242,7 +249,7 @@ class TrexTUIPanelManager():
self.key_actions['q'] = {'action': self.action_quit, 'legend': 'quit', 'show': True}
self.key_actions['g'] = {'action': self.action_show_dash, 'legend': 'dashboard', 'show': True}
- for port_id in self.acquired_ports:
+ for port_id in self.ports:
self.key_actions[str(port_id)] = {'action': self.action_show_port(port_id), 'legend': 'port {0}'.format(port_id), 'show': False}
self.panels['port {0}'.format(port_id)] = TrexTUIPort(self, port_id)
@@ -263,7 +270,7 @@ class TrexTUIPanelManager():
x = "'{0}' - {1}, ".format(k, v['legend'])
self.legend += "{:}".format(x)
- self.legend += "'0-{0}' - port display".format(len(self.acquired_ports) - 1)
+ self.legend += "'0-{0}' - port display".format(len(self.ports) - 1)
self.legend += "\n{:<12}".format(self.main_panel.get_name() + ":")
@@ -282,6 +289,7 @@ class TrexTUIPanelManager():
self.generate_legend()
def show (self):
+ print self.ports
self.main_panel.show()
self.print_legend()
self.log.show()
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index 8d1be064..bd4392f7 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -39,10 +39,16 @@ public:
void publish_json(const std::string &s);
enum event_type_e {
- EVENT_PORT_STARTED = 0,
- EVENT_PORT_STOPPED = 1,
- EVENT_SERVER_STOPPED = 2,
- EVENT_PORT_FINISHED_TX = 3,
+ EVENT_PORT_STARTED = 0,
+ EVENT_PORT_STOPPED = 1,
+ EVENT_PORT_PAUSED = 2,
+ EVENT_PORT_RESUMED = 3,
+ EVENT_PORT_FINISHED_TX = 4,
+ EVENT_PORT_FORCE_ACQUIRED = 5,
+
+ EVENT_SERVER_STOPPED = 100,
+
+
};
void publish_event(event_type_e type, const Json::Value &data = Json::nullValue);
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index 5cea055c..6c239bf3 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -198,10 +198,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
}
- section["ports"][i]["owner"] = port->get_owner();
-
- section["ports"][i]["status"] = port->get_state_as_string();
-
}
return (TREX_RPC_CMD_OK);
@@ -224,7 +220,7 @@ TrexRpcCmdGetOwner::_run(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_port(params, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- section["owner"] = port->get_owner();
+ section["owner"] = port->get_owner().get_name();
return (TREX_RPC_CMD_OK);
}
@@ -238,19 +234,20 @@ TrexRpcCmdAcquire::_run(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_port(params, result);
- const string &new_owner = parse_string(params, "user", result);
+ const string &new_owner = parse_string(params, "user", result);
+ uint32_t session_id = parse_uint32(params, "session_id", result);
bool force = parse_bool(params, "force", result);
/* if not free and not you and not force - fail */
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
try {
- port->acquire(new_owner, force);
+ port->acquire(new_owner, session_id, force);
} catch (const TrexRpcException &ex) {
generate_execute_err(result, ex.what());
}
- result["result"] = port->get_owner_handler();
+ result["result"] = port->get_owner().get_handler();
return (TREX_RPC_CMD_OK);
}
@@ -288,8 +285,6 @@ TrexRpcCmdGetPortStats::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- result["result"]["status"] = port->get_state_as_string();
-
try {
port->encode_stats(result["result"]);
} catch (const TrexRpcException &ex) {
@@ -300,41 +295,25 @@ TrexRpcCmdGetPortStats::_run(const Json::Value &params, Json::Value &result) {
}
/**
- * request the server a sync about a specific user
+ * fetch the port status
+ *
+ * @author imarom (09-Dec-15)
+ *
+ * @param params
+ * @param result
*
+ * @return trex_rpc_cmd_rc_e
*/
trex_rpc_cmd_rc_e
-TrexRpcCmdSyncUser::_run(const Json::Value &params, Json::Value &result) {
-
- const string &user = parse_string(params, "user", result);
- bool sync_streams = parse_bool(params, "sync_streams", result);
-
- result["result"] = Json::arrayValue;
-
- for (auto port : get_stateless_obj()->get_port_list()) {
- if (port->get_owner() == user) {
-
- Json::Value owned_port;
+TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_port(params, result);
- owned_port["port_id"] = port->get_port_id();
- owned_port["handler"] = port->get_owner_handler();
- owned_port["state"] = port->get_state_as_string();
-
- /* if sync streams was asked - sync all the streams */
- if (sync_streams) {
- owned_port["streams"] = Json::arrayValue;
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- std::vector <TrexStream *> streams;
- port->get_object_list(streams);
+ result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name());
+ result["result"]["state"] = port->get_state_as_string();
- for (auto stream : streams) {
- owned_port["streams"].append(stream->get_stream_json());
- }
- }
-
- result["result"].append(owned_port);
- }
- }
return (TREX_RPC_CMD_OK);
}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 80bef3b0..b9be1fbe 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -70,14 +70,15 @@ void get_hostname(std::string &hostname);
* ownership
*/
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetOwner, "get_owner", 1, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire, "acquire", 3, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire, "acquire", 4, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true);
/**
* port commands
*/
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false);
/**
@@ -98,10 +99,10 @@ void parse_vm_instr_write_flow_var(const Json::Value &inst, TrexStream *stream,
);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 2, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 2, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, false);
@@ -112,8 +113,6 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true);
TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 2, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSyncUser, "sync_user", 2, false);
-
TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false);
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp
index af0db3f4..d4eef1f7 100644
--- a/src/rpc-server/trex_rpc_cmd.cpp
+++ b/src/rpc-server/trex_rpc_cmd.cpp
@@ -63,8 +63,12 @@ TrexRpcCommand::verify_ownership(const Json::Value &params, Json::Value &result)
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- if (!port->verify_owner_handler(handler)) {
- generate_execute_err(result, "invalid handler provided. please pass the handler given when calling 'acquire' or take ownership");
+ if (port->get_owner().is_free()) {
+ generate_execute_err(result, "please acquire the port before modifying port state");
+ }
+
+ if (!port->get_owner().verify(handler)) {
+ generate_execute_err(result, "port is not owned by you or your current executing session");
}
}
@@ -92,6 +96,8 @@ TrexRpcCommand::type_to_str(field_type_e type) {
return "byte";
case FIELD_TYPE_UINT16:
return "uint16";
+ case FIELD_TYPE_UINT32:
+ return "uint32";
case FIELD_TYPE_BOOL:
return "bool";
case FIELD_TYPE_INT:
@@ -161,6 +167,18 @@ TrexRpcCommand::parse_uint16(const Json::Value &parent, int index, Json::Value &
return parent[index].asUInt();
}
+uint32_t
+TrexRpcCommand::parse_uint32(const Json::Value &parent, const std::string &name, Json::Value &result) {
+ check_field_type(parent, name, FIELD_TYPE_UINT32, result);
+ return parent[name].asUInt();
+}
+
+uint32_t
+TrexRpcCommand::parse_uint32(const Json::Value &parent, int index, Json::Value &result) {
+ check_field_type(parent, index, FIELD_TYPE_UINT32, result);
+ return parent[index].asUInt();
+}
+
int
TrexRpcCommand::parse_int(const Json::Value &parent, const std::string &name, Json::Value &result) {
check_field_type(parent, name, FIELD_TYPE_INT, result);
@@ -250,6 +268,12 @@ TrexRpcCommand::check_field_type_common(const Json::Value &field, const std::str
}
break;
+ case FIELD_TYPE_UINT32:
+ if ( (!field.isUInt()) || (field.asUInt() > 0xFFFFFFFF)) {
+ rc = false;
+ }
+ break;
+
case FIELD_TYPE_BOOL:
if (!field.isBool()) {
rc = false;
diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h
index e93fb775..f81981d4 100644
--- a/src/rpc-server/trex_rpc_cmd_api.h
+++ b/src/rpc-server/trex_rpc_cmd_api.h
@@ -99,6 +99,7 @@ protected:
enum field_type_e {
FIELD_TYPE_BYTE,
FIELD_TYPE_UINT16,
+ FIELD_TYPE_UINT32,
FIELD_TYPE_INT,
FIELD_TYPE_DOUBLE,
FIELD_TYPE_BOOL,
@@ -136,6 +137,7 @@ protected:
*/
uint8_t parse_byte(const Json::Value &parent, const std::string &name, Json::Value &result);
uint16_t parse_uint16(const Json::Value &parent, const std::string &name, Json::Value &result);
+ uint32_t parse_uint32(const Json::Value &parent, const std::string &name, Json::Value &result);
int parse_int(const Json::Value &parent, const std::string &name, Json::Value &result);
double parse_double(const Json::Value &parent, const std::string &name, Json::Value &result);
bool parse_bool(const Json::Value &parent, const std::string &name, Json::Value &result);
@@ -145,6 +147,7 @@ protected:
uint8_t parse_byte(const Json::Value &parent, int index, Json::Value &result);
uint16_t parse_uint16(const Json::Value &parent, int index, Json::Value &result);
+ uint32_t parse_uint32(const Json::Value &parent, int index, Json::Value &result);
int parse_int(const Json::Value &parent, int index, Json::Value &result);
double parse_double(const Json::Value &parent, int index, Json::Value &result);
bool parse_bool(const Json::Value &parent, int index, Json::Value &result);
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 52258b88..82c723b7 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -41,8 +41,8 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdAcquire());
register_command(new TrexRpcCmdRelease());
register_command(new TrexRpcCmdGetPortStats());
-
- register_command(new TrexRpcCmdSyncUser());
+ register_command(new TrexRpcCmdGetPortStatus());
+
/* stream commands */
register_command(new TrexRpcCmdAddStream());
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 0e45bf0b..96194321 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -57,7 +57,6 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
m_port_id = port_id;
m_port_state = PORT_STATE_IDLE;
- clear_owner();
/* get the platform specific data */
api->get_interface_info(port_id, m_driver_name, m_speed);
@@ -85,18 +84,42 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
* @param force
*/
void
-TrexStatelessPort::acquire(const std::string &user, bool force) {
- if ( (!is_free_to_aquire()) && (get_owner() != user) && (!force)) {
- throw TrexRpcException("port is already taken by '" + get_owner() + "'");
+TrexStatelessPort::acquire(const std::string &user, uint32_t session_id, bool force) {
+
+ /* if port is free - just take it */
+ if (get_owner().is_free()) {
+ get_owner().own(user, session_id);
+ return;
+ }
+
+ /* not free - but it might be the same user that owns the port */
+ if ( (get_owner().get_name() == user) && (get_owner().get_session_id() == session_id) ) {
+ return;
+ }
+
+ /* so different session id or different user */
+ if (force) {
+ get_owner().own(user, session_id);
+
+ /* inform the other client of the steal... */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FORCE_ACQUIRED, data);
+
+ } else {
+ /* not same user or session id and not force - report error */
+ if (get_owner().get_name() == user) {
+ throw TrexRpcException("port is already owned by another session of '" + user + "'");
+ } else {
+ throw TrexRpcException("port is already taken by '" + get_owner().get_name() + "'");
+ }
}
- set_owner(user);
}
void
TrexStatelessPort::release(void) {
- verify_state( ~(PORT_STATE_TX | PORT_STATE_PAUSE) );
- clear_owner();
+ get_owner().release();
}
/**
@@ -221,6 +244,10 @@ TrexStatelessPort::pause_traffic(void) {
send_message_to_all_dp(pause_msg);
change_state(PORT_STATE_PAUSE);
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
}
void
@@ -234,6 +261,11 @@ TrexStatelessPort::resume_traffic(void) {
send_message_to_all_dp(resume_msg);
change_state(PORT_STATE_TX);
+
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
}
void
@@ -324,27 +356,6 @@ TrexStatelessPort::change_state(port_state_e new_state) {
m_port_state = new_state;
}
-/**
- * generate a random connection handler
- *
- */
-std::string
-TrexStatelessPort::generate_handler() {
- std::stringstream ss;
-
- static const char alphanum[] =
- "0123456789"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz";
-
- /* generate 8 bytes of random handler */
- for (int i = 0; i < 8; ++i) {
- ss << alphanum[rand() % (sizeof(alphanum) - 1)];
- }
-
- return (ss.str());
-}
-
void
TrexStatelessPort::encode_stats(Json::Value &port) {
@@ -576,3 +587,37 @@ TrexStatelessPort::validate(void) {
return m_graph_obj;
}
+
+
+/************* Trex Port Owner **************/
+
+TrexPortOwner::TrexPortOwner() {
+ m_is_free = true;
+
+ /* for handlers random generation */
+ srand(time(NULL));
+}
+
+/**
+ * generate a random connection handler
+ *
+ */
+std::string
+TrexPortOwner::generate_handler() {
+ std::stringstream ss;
+
+ static const char alphanum[] =
+ "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ /* generate 8 bytes of random handler */
+ for (int i = 0; i < 8; ++i) {
+ ss << alphanum[rand() % (sizeof(alphanum) - 1)];
+ }
+
+ return (ss.str());
+}
+
+const std::string TrexPortOwner::g_unowned_name = "<FREE>";
+const std::string TrexPortOwner::g_unowned_handler = "";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 28e42a17..1310fdb2 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -29,6 +29,82 @@ class TrexStatelessCpToDpMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
+/**
+ * TRex port owner can perform
+ * write commands
+ * while port is owned - others can
+ * do read only commands
+ *
+ */
+class TrexPortOwner {
+public:
+
+ TrexPortOwner();
+
+ /**
+ * is port free to acquire
+ */
+ bool is_free() {
+ return m_is_free;
+ }
+
+ void release() {
+ m_is_free = true;
+ m_owner_name = "";
+ m_handler = "";
+ }
+
+ bool is_owned_by(const std::string &user) {
+ return ( !m_is_free && (m_owner_name == user) );
+ }
+
+ void own(const std::string &owner_name, uint32_t session_id) {
+
+ /* save user data */
+ m_owner_name = owner_name;
+ m_session_id = session_id;
+
+ /* internal data */
+ m_handler = generate_handler();
+ m_is_free = false;
+ }
+
+ bool verify(const std::string &handler) {
+ return ( (!m_is_free) && (m_handler == handler) );
+ }
+
+ const std::string &get_name() {
+ return (!m_is_free ? m_owner_name : g_unowned_name);
+ }
+
+ const std::string &get_handler() {
+ return (!m_is_free ? m_handler : g_unowned_handler);
+ }
+
+ uint32_t get_session_id() {
+ return m_session_id;
+ }
+
+private:
+ std::string generate_handler();
+
+ /* is this port owned by someone ? */
+ bool m_is_free;
+
+ /* user provided info - name and session id */
+ std::string m_owner_name;
+ uint32_t m_session_id;
+
+ /* handler genereated internally */
+ std::string m_handler;
+
+
+ /* just references defaults... */
+ static const std::string g_unowned_name;
+ static const std::string g_unowned_handler;
+};
+
+
/**
* describes a stateless port
*
@@ -67,7 +143,7 @@ public:
* acquire port
* throws TrexException in case of an error
*/
- void acquire(const std::string &user, bool force = false);
+ void acquire(const std::string &user, uint32_t session_id, bool force = false);
/**
* release the port from the current user
@@ -140,29 +216,6 @@ public:
void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed);
- /**
- * query for ownership
- *
- */
- const std::string &get_owner() {
- return m_owner;
- }
-
- /**
- * owner handler
- * for the connection
- *
- */
- const std::string &get_owner_handler() {
- return m_owner_handler;
- }
-
-
- bool verify_owner_handler(const std::string &handler) {
-
- return ( (m_owner != "none") && (m_owner_handler == handler) );
-
- }
/**
* encode stats as JSON
@@ -246,29 +299,11 @@ public:
*/
uint64_t get_port_speed_bps() const;
-private:
-
-
-
- /**
- * take ownership of the server array
- * this is static
- * ownership is total
- *
- */
- void set_owner(const std::string &owner) {
- m_owner = owner;
- m_owner_handler = generate_handler();
- }
-
- void clear_owner() {
- m_owner = "none";
- m_owner_handler = "";
+ TrexPortOwner & get_owner() {
+ return m_owner;
}
- bool is_free_to_aquire() {
- return (m_owner == "none");
- }
+private:
const std::vector<int> get_core_id_list () {
@@ -325,14 +360,12 @@ private:
TrexStreamTable m_stream_table;
uint8_t m_port_id;
port_state_e m_port_state;
- std::string m_owner;
- std::string m_owner_handler;
std::string m_driver_name;
TrexPlatformApi::driver_speed_e m_speed;
/* holds the DP cores associated with this port */
- std::vector<int> m_cores_id_list;
+ std::vector<int> m_cores_id_list;
bool m_last_all_streams_continues;
double m_last_duration;
@@ -342,8 +375,10 @@ private:
/* holds a graph of streams rate*/
const TrexStreamsGraphObj *m_graph_obj;
-};
+ /* owner information */
+ TrexPortOwner m_owner;
+};
/**