summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py330
-rw-r--r--scripts/automation/trex_control_plane/client/trex_port.py512
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py2035
-rw-r--r--scripts/automation/trex_control_plane/client/trex_stateless_sim.py430
4 files changed, 0 insertions, 3307 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
deleted file mode 100644
index ef4c48f9..00000000
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ /dev/null
@@ -1,330 +0,0 @@
-#!/router/bin/python
-
-try:
- # support import for Python 2
- import outer_packages
-except ImportError:
- # support import for Python 3
- import client.outer_packages
-from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
-
-from common.text_opts import *
-
-import json
-import threading
-import time
-import datetime
-import zmq
-import re
-import random
-
-from common.trex_stats import *
-from common.trex_streams import *
-from common.trex_types import *
-
-# basic async stats class
-class CTRexAsyncStats(object):
- def __init__ (self):
- self.ref_point = None
- self.current = {}
- self.last_update_ts = datetime.datetime.now()
-
- def update (self, snapshot):
-
- #update
- self.last_update_ts = datetime.datetime.now()
-
- self.current = snapshot
-
- if self.ref_point == None:
- self.ref_point = self.current
-
- def clear(self):
- self.ref_point = self.current
-
-
- def get(self, field, format=False, suffix=""):
-
- if not field in self.current:
- return "N/A"
-
- if not format:
- return self.current[field]
- else:
- return format_num(self.current[field], suffix)
-
- def get_rel (self, field, format=False, suffix=""):
- if not field in self.current:
- return "N/A"
-
- if not format:
- return (self.current[field] - self.ref_point[field])
- else:
- return format_num(self.current[field] - self.ref_point[field], suffix)
-
-
- # return true if new data has arrived in the past 2 seconds
- def is_online (self):
- delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000
- return (delta_ms < 2000)
-
-# describes the general stats provided by TRex
-class CTRexAsyncStatsGeneral(CTRexAsyncStats):
- def __init__ (self):
- super(CTRexAsyncStatsGeneral, self).__init__()
-
-
-# per port stats
-class CTRexAsyncStatsPort(CTRexAsyncStats):
- def __init__ (self):
- super(CTRexAsyncStatsPort, self).__init__()
-
- def get_stream_stats (self, stream_id):
- return None
-
-# stats manager
-class CTRexAsyncStatsManager():
- def __init__ (self):
-
- self.general_stats = CTRexAsyncStatsGeneral()
- self.port_stats = {}
-
-
- def get_general_stats(self):
- return self.general_stats
-
- def get_port_stats (self, port_id):
-
- if not str(port_id) in self.port_stats:
- return None
-
- return self.port_stats[str(port_id)]
-
-
- def update(self, data):
- self.__handle_snapshot(data)
-
- def __handle_snapshot(self, snapshot):
-
- general_stats = {}
- port_stats = {}
-
- # filter the values per port and general
- for key, value in snapshot.iteritems():
-
- # match a pattern of ports
- m = re.search('(.*)\-([0-8])', key)
- if m:
-
- port_id = m.group(2)
- field_name = m.group(1)
-
- if not port_id in port_stats:
- port_stats[port_id] = {}
-
- port_stats[port_id][field_name] = value
-
- else:
- # no port match - general stats
- general_stats[key] = value
-
- # update the general object with the snapshot
- self.general_stats.update(general_stats)
-
- # update all ports
- for port_id, data in port_stats.iteritems():
-
- if not port_id in self.port_stats:
- self.port_stats[port_id] = CTRexAsyncStatsPort()
-
- self.port_stats[port_id].update(data)
-
-
-
-
-
-class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client):
-
- self.port = port
- self.server = server
-
- self.stateless_client = stateless_client
-
- self.event_handler = stateless_client.event_handler
- self.logger = self.stateless_client.logger
-
- self.raw_snapshot = {}
-
- self.stats = CTRexAsyncStatsManager()
-
- self.last_data_recv_ts = 0
- self.async_barrier = None
-
- self.connected = False
-
- # connects the async channel
- def connect (self):
-
- if self.connected:
- self.disconnect()
-
- self.tr = "tcp://{0}:{1}".format(self.server, self.port)
-
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
-
-
- # before running the thread - mark as active
- self.active = True
- self.t = threading.Thread(target = self._run)
-
- # kill this thread on exit and don't add it to the join list
- self.t.setDaemon(True)
- self.t.start()
-
- self.connected = True
-
- rc = self.barrier()
- if not rc:
- self.disconnect()
- return rc
-
- return RC_OK()
-
-
-
-
- # disconnect
- def disconnect (self):
- if not self.connected:
- return
-
- # signal that the context was destroyed (exit the thread loop)
- self.context.term()
-
- # mark for join and join
- self.active = False
- self.t.join()
-
- # done
- self.connected = False
-
-
- # thread function
- def _run (self):
-
- # socket must be created on the same thread
- self.socket.setsockopt(zmq.SUBSCRIBE, '')
- self.socket.setsockopt(zmq.RCVTIMEO, 5000)
- self.socket.connect(self.tr)
-
- got_data = False
-
- while self.active:
- try:
-
- line = self.socket.recv_string()
- self.last_data_recv_ts = time.time()
-
- # signal once
- if not got_data:
- self.event_handler.on_async_alive()
- got_data = True
-
-
- # got a timeout - mark as not alive and retry
- except zmq.Again:
-
- # signal once
- if got_data:
- self.event_handler.on_async_dead()
- got_data = False
-
- continue
-
- except zmq.ContextTerminated:
- # outside thread signaled us to exit
- break
-
- msg = json.loads(line)
-
- name = msg['name']
- data = msg['data']
- type = msg['type']
- self.raw_snapshot[name] = data
-
- self.__dispatch(name, type, data)
-
-
- # closing of socket must be from the same thread
- self.socket.close(linger = 0)
-
-
- # did we get info for the last 3 seconds ?
- def is_alive (self):
- if self.last_data_recv_ts == None:
- return False
-
- return ( (time.time() - self.last_data_recv_ts) < 3 )
-
- def get_stats (self):
- return self.stats
-
- def get_raw_snapshot (self):
- return self.raw_snapshot
-
- # dispatch the message to the right place
- def __dispatch (self, name, type, data):
- # stats
- if name == "trex-global":
- self.event_handler.handle_async_stats_update(data)
-
- # events
- elif name == "trex-event":
- self.event_handler.handle_async_event(type, data)
-
- # barriers
- elif name == "trex-barrier":
- self.handle_async_barrier(type, data)
- else:
- pass
-
-
- # async barrier handling routine
- def handle_async_barrier (self, type, data):
- if self.async_barrier['key'] == type:
- self.async_barrier['ack'] = True
-
-
- # block on barrier for async channel
- def barrier(self, timeout = 5):
-
- # set a random key
- key = random.getrandbits(32)
- self.async_barrier = {'key': key, 'ack': False}
-
- # expr time
- expr = time.time() + timeout
-
- while not self.async_barrier['ack']:
-
- # inject
- rc = self.stateless_client._transmit("publish_now", params = {'key' : key})
- if not rc:
- return rc
-
- # fast loop
- for i in xrange(0, 100):
- if self.async_barrier['ack']:
- break
- time.sleep(0.001)
-
- if time.time() > expr:
- return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
-
- return RC_OK()
-
-
-
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py
deleted file mode 100644
index eaf64ac2..00000000
--- a/scripts/automation/trex_control_plane/client/trex_port.py
+++ /dev/null
@@ -1,512 +0,0 @@
-
-from collections import namedtuple, OrderedDict
-from common.trex_types import *
-from common import trex_stats
-from client_utils import packet_builder
-
-StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata'])
-
-########## utlity ############
-def mult_to_factor (mult, max_bps_l2, max_pps, line_util):
- if mult['type'] == 'raw':
- return mult['value']
-
- if mult['type'] == 'bps':
- return mult['value'] / max_bps_l2
-
- if mult['type'] == 'pps':
- return mult['value'] / max_pps
-
- if mult['type'] == 'percentage':
- return mult['value'] / line_util
-
-
-# describes a single port
-class Port(object):
- STATE_DOWN = 0
- STATE_IDLE = 1
- STATE_STREAMS = 2
- STATE_TX = 3
- STATE_PAUSE = 4
- PortState = namedtuple('PortState', ['state_id', 'state_name'])
- STATES_MAP = {STATE_DOWN: "DOWN",
- STATE_IDLE: "IDLE",
- STATE_STREAMS: "IDLE",
- STATE_TX: "ACTIVE",
- STATE_PAUSE: "PAUSE"}
-
-
- def __init__ (self, port_id, speed, driver, user, comm_link, session_id):
- self.port_id = port_id
- self.state = self.STATE_IDLE
- self.handler = None
- self.comm_link = comm_link
- self.transmit = comm_link.transmit
- self.transmit_batch = comm_link.transmit_batch
- self.user = user
- self.driver = driver
- self.speed = speed
- self.streams = {}
- self.profile = None
- self.session_id = session_id
-
- self.port_stats = trex_stats.CPortStats(self)
-
-
- def err(self, msg):
- return RC_ERR("port {0} : {1}".format(self.port_id, msg))
-
- def ok(self, data = ""):
- return RC_OK(data)
-
- def get_speed_bps (self):
- return (self.speed * 1000 * 1000 * 1000)
-
- # take the port
- def acquire(self, force = False):
- params = {"port_id": self.port_id,
- "user": self.user,
- "session_id": self.session_id,
- "force": force}
-
- command = RpcCmdData("acquire", params)
- rc = self.transmit(command.method, command.params)
- if rc.good():
- self.handler = rc.data()
- return self.ok()
- else:
- return self.err(rc.err())
-
- # release the port
- def release(self):
- params = {"port_id": self.port_id,
- "handler": self.handler}
-
- command = RpcCmdData("release", params)
- rc = self.transmit(command.method, command.params)
- self.handler = None
-
- if rc.good():
- return self.ok()
- else:
- return self.err(rc.err())
-
- def is_acquired(self):
- return (self.handler != None)
-
- def is_active(self):
- return(self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
-
- def is_transmitting (self):
- return (self.state == self.STATE_TX)
-
- def is_paused (self):
- return (self.state == self.STATE_PAUSE)
-
-
- def sync(self):
- params = {"port_id": self.port_id}
-
- command = RpcCmdData("get_port_status", params)
- rc = self.transmit(command.method, command.params)
- if rc.bad():
- return self.err(rc.err())
-
- # sync the port
- port_state = rc.data()['state']
-
- if port_state == "DOWN":
- self.state = self.STATE_DOWN
- elif port_state == "IDLE":
- self.state = self.STATE_IDLE
- elif port_state == "STREAMS":
- self.state = self.STATE_STREAMS
- elif port_state == "TX":
- self.state = self.STATE_TX
- elif port_state == "PAUSE":
- self.state = self.STATE_PAUSE
- else:
- raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
-
- # TODO: handle syncing the streams into stream_db
-
- return self.ok()
-
-
- # return TRUE if write commands
- def is_port_writable (self):
- # operations on port can be done on state idle or state streams
- return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
-
-
- # add streams
- def add_streams (self, streams_list):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to add streams")
-
- batch = []
- for stream in (streams_list if isinstance(streams_list, list) else [streams_list]):
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream.get_id(),
- "stream": stream.to_json()}
-
- cmd = RpcCmdData('add_stream', params)
- batch.append(cmd)
-
- # meta data for show streams
- self.streams[stream.get_id()] = StreamOnPort(stream.to_json(),
- Port._generate_stream_metadata(stream.get_id(), stream.to_json()))
-
- rc = self.transmit_batch(batch)
- if not rc:
- return self.err(rc.err())
-
-
-
- # the only valid state now
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
-
-
- # remove stream from port
- def remove_streams (self, stream_id_list):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to remove streams")
-
- # single element to list
- stream_id_list = stream_id_list if isinstance(stream_id_list, list) else [stream_id_list]
-
- # verify existance
- if not all([stream_id in self.streams for stream_id in stream_id_list]):
- return self.err("stream {0} does not exists".format(stream_id))
-
- batch = []
-
- for stream_id in stream_id_list:
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream_id}
-
- cmd = RpcCmdData('remove_stream', params)
- batch.append(cmd)
-
- del self.streams[stream_id]
-
-
- rc = self.transmit_batch(batch)
- if not rc:
- return self.err(rc.err())
-
- self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
-
- return self.ok()
-
-
- # remove all the streams
- def remove_all_streams (self):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if not self.is_port_writable():
- return self.err("Please stop port before attempting to remove streams")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc = self.transmit("remove_all_streams", params)
- if not rc:
- return self.err(rc.err())
-
- self.streams = {}
-
- self.state = self.STATE_IDLE
-
- return self.ok()
-
- # get a specific stream
- def get_stream (self, stream_id):
- if stream_id in self.streams:
- return self.streams[stream_id]
- else:
- return None
-
- def get_all_streams (self):
- return self.streams
-
- # start traffic
- def start (self, mul, duration, force):
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if self.state == self.STATE_DOWN:
- return self.err("Unable to start traffic - port is down")
-
- if self.state == self.STATE_IDLE:
- return self.err("Unable to start traffic - no streams attached to port")
-
- if self.state == self.STATE_TX:
- return self.err("Unable to start traffic - port is already transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul,
- "duration": duration,
- "force": force}
-
- rc = self.transmit("start_traffic", params)
- if rc.bad():
- return self.err(rc.err())
-
- self.state = self.STATE_TX
-
- return self.ok()
-
- # stop traffic
- # with force ignores the cached state and sends the command
- def stop (self, force = False):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- # port is already stopped
- if not force:
- if (self.state == self.STATE_IDLE) or (self.state == self.state == self.STATE_STREAMS):
- return self.ok()
-
-
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc = self.transmit("stop_traffic", params)
- if rc.bad():
- return self.err(rc.err())
-
- # only valid state after stop
- self.state = self.STATE_STREAMS
-
- return self.ok()
-
- def pause (self):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc = self.transmit("pause_traffic", params)
- if rc.bad():
- return self.err(rc.err())
-
- # only valid state after stop
- self.state = self.STATE_PAUSE
-
- return self.ok()
-
-
- def resume (self):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state != self.STATE_PAUSE) :
- return self.err("port is not in pause mode")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc = self.transmit("resume_traffic", params)
- if rc.bad():
- return self.err(rc.err())
-
- # only valid state after stop
- self.state = self.STATE_TX
-
- return self.ok()
-
-
- def update (self, mul, force):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state != self.STATE_TX) :
- return self.err("port is not transmitting")
-
- params = {"handler": self.handler,
- "port_id": self.port_id,
- "mul": mul,
- "force": force}
-
- rc = self.transmit("update_traffic", params)
- if rc.bad():
- return self.err(rc.err())
-
- return self.ok()
-
-
- def validate (self):
-
- if not self.is_acquired():
- return self.err("port is not owned")
-
- if (self.state == self.STATE_DOWN):
- return self.err("port is down")
-
- if (self.state == self.STATE_IDLE):
- return self.err("no streams attached to port")
-
- params = {"handler": self.handler,
- "port_id": self.port_id}
-
- rc = self.transmit("validate", params)
- if rc.bad():
- return self.err(rc.err())
-
- self.profile = rc.data()
-
- return self.ok()
-
- def get_profile (self):
- return self.profile
-
-
- def print_profile (self, mult, duration):
- if not self.get_profile():
- return
-
- rate = self.get_profile()['rate']
- graph = self.get_profile()['graph']
-
- print format_text("Profile Map Per Port\n", 'underline', 'bold')
-
- factor = mult_to_factor(mult, rate['max_bps_l2'], rate['max_pps'], rate['max_line_util'])
-
- print "Profile max BPS L2 (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps_l2'], suffix = "bps"),
- format_num(rate['max_bps_l2'] * factor, suffix = "bps"))
-
- print "Profile max BPS L1 (base / req): {:^12} / {:^12}".format(format_num(rate['max_bps_l1'], suffix = "bps"),
- format_num(rate['max_bps_l1'] * factor, suffix = "bps"))
-
- print "Profile max PPS (base / req): {:^12} / {:^12}".format(format_num(rate['max_pps'], suffix = "pps"),
- format_num(rate['max_pps'] * factor, suffix = "pps"),)
-
- print "Profile line util. (base / req): {:^12} / {:^12}".format(format_percentage(rate['max_line_util']),
- format_percentage(rate['max_line_util'] * factor))
-
-
- # duration
- exp_time_base_sec = graph['expected_duration'] / (1000 * 1000)
- exp_time_factor_sec = exp_time_base_sec / factor
-
- # user configured a duration
- if duration > 0:
- if exp_time_factor_sec > 0:
- exp_time_factor_sec = min(exp_time_factor_sec, duration)
- else:
- exp_time_factor_sec = duration
-
-
- print "Duration (base / req): {:^12} / {:^12}".format(format_time(exp_time_base_sec),
- format_time(exp_time_factor_sec))
- print "\n"
-
-
- def get_port_state_name(self):
- return self.STATES_MAP.get(self.state, "Unknown")
-
- ################# stats handler ######################
- def generate_port_stats(self):
- return self.port_stats.generate_stats()
-
- def generate_port_status(self):
- return {"type": self.driver,
- "maximum": "{speed} Gb/s".format(speed=self.speed),
- "status": self.get_port_state_name()
- }
-
- def clear_stats(self):
- return self.port_stats.clear_stats()
-
-
- def get_stats (self):
- return self.port_stats.get_stats()
-
-
- def invalidate_stats(self):
- return self.port_stats.invalidate()
-
- ################# stream printout ######################
- def generate_loaded_streams_sum(self, stream_id_list):
- if self.state == self.STATE_DOWN or self.state == self.STATE_STREAMS:
- return {}
- streams_data = {}
-
- if not stream_id_list:
- # if no mask has been provided, apply to all streams on port
- stream_id_list = self.streams.keys()
-
-
- streams_data = {stream_id: self.streams[stream_id].metadata.get('stream_sum', ["N/A"] * 6)
- for stream_id in stream_id_list
- if stream_id in self.streams}
-
-
- return {"referring_file" : "",
- "streams" : streams_data}
-
- @staticmethod
- def _generate_stream_metadata(stream_id, compiled_stream_obj):
- meta_dict = {}
- # create packet stream description
- pkt_bld_obj = packet_builder.CTRexPktBuilder()
- pkt_bld_obj.load_from_stream_obj(compiled_stream_obj)
- # generate stream summary based on that
-
- next_stream = "None" if compiled_stream_obj['next_stream_id']==-1 else compiled_stream_obj['next_stream_id']
-
- meta_dict['stream_sum'] = OrderedDict([("id", stream_id),
- ("packet_type", "/".join(pkt_bld_obj.get_packet_layers())),
- ("length", pkt_bld_obj.get_packet_length()),
- ("mode", compiled_stream_obj['mode']['type']),
- ("rate_pps", compiled_stream_obj['mode']['pps']),
- ("next_stream", next_stream)
- ])
- return meta_dict
-
- ################# events handler ######################
- def async_event_port_stopped (self):
- self.state = self.STATE_STREAMS
-
-
- def async_event_port_started (self):
- self.state = self.STATE_TX
-
-
- def async_event_port_paused (self):
- self.state = self.STATE_PAUSE
-
-
- def async_event_port_resumed (self):
- self.state = self.STATE_TX
-
- def async_event_forced_acquired (self):
- self.handler = None
-
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
deleted file mode 100755
index 95fd2a69..00000000
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ /dev/null
@@ -1,2035 +0,0 @@
-#!/router/bin/python
-
-try:
- # support import for Python 2
- import outer_packages
-except ImportError:
- # support import for Python 3
- import client.outer_packages
-
-from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
-from client_utils import general_utils
-from client_utils.packet_builder import CTRexPktBuilder
-import json
-
-from common.trex_streams import *
-from collections import namedtuple
-from common.text_opts import *
-from common import trex_stats
-from client_utils import parsing_opts, text_tables
-import time
-import datetime
-import re
-import random
-from trex_port import Port
-from common.trex_types import *
-from common.trex_stl_exceptions import *
-from trex_async_client import CTRexAsyncClient
-from yaml import YAMLError
-
-
-
-############################ logger #############################
-############################ #############################
-############################ #############################
-
-# logger API for the client
-class LoggerApi(object):
- # verbose levels
- VERBOSE_QUIET = 0
- VERBOSE_REGULAR = 1
- VERBOSE_HIGH = 2
-
- def __init__(self):
- self.level = LoggerApi.VERBOSE_REGULAR
-
- # implemented by specific logger
- def write(self, msg, newline = True):
- raise Exception("implement this")
-
- # implemented by specific logger
- def flush(self):
- raise Exception("implement this")
-
- def set_verbose (self, level):
- if not level in xrange(self.VERBOSE_QUIET, self.VERBOSE_HIGH + 1):
- raise ValueError("bad value provided for logger")
-
- self.level = level
-
- def get_verbose (self):
- return self.level
-
-
- def check_verbose (self, level):
- return (self.level >= level)
-
-
- # simple log message with verbose
- def log (self, msg, level = VERBOSE_REGULAR, newline = True):
- if not self.check_verbose(level):
- return
-
- self.write(msg, newline)
-
- # logging that comes from async event
- def async_log (self, msg, level = VERBOSE_REGULAR, newline = True):
- self.log(msg, level, newline)
-
-
- def pre_cmd (self, desc):
- self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
- self.flush()
-
- def post_cmd (self, rc):
- if rc:
- self.log(format_text("[SUCCESS]\n", 'green', 'bold'))
- else:
- self.log(format_text("[FAILED]\n", 'red', 'bold'))
-
-
- def log_cmd (self, desc):
- self.pre_cmd(desc)
- self.post_cmd(True)
-
-
- # supress object getter
- def supress (self):
- class Supress(object):
- def __init__ (self, logger):
- self.logger = logger
-
- def __enter__ (self):
- self.saved_level = self.logger.get_verbose()
- self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)
-
- def __exit__ (self, type, value, traceback):
- self.logger.set_verbose(self.saved_level)
-
- return Supress(self)
-
-
-
-# default logger - to stdout
-class DefaultLogger(LoggerApi):
-
- def __init__ (self):
- super(DefaultLogger, self).__init__()
-
- def write (self, msg, newline = True):
- if newline:
- print msg
- else:
- print msg,
-
- def flush (self):
- sys.stdout.flush()
-
-
-############################ async event hander #############################
-############################ #############################
-############################ #############################
-
-# handles different async events given to the client
-class AsyncEventHandler(object):
-
- def __init__ (self, client):
- self.client = client
- self.logger = self.client.logger
-
- self.events = []
-
- # public functions
-
- def get_events (self):
- return self.events
-
-
- def clear_events (self):
- self.events = []
-
-
- def on_async_dead (self):
- if self.client.connected:
- msg = 'lost connection to server'
- self.__add_event_log(msg, 'local', True)
- self.client.connected = False
-
-
- def on_async_alive (self):
- pass
-
-
- # handles an async stats update from the subscriber
- def handle_async_stats_update(self, dump_data):
- global_stats = {}
- port_stats = {}
-
- # filter the values per port and general
- for key, value in dump_data.iteritems():
- # match a pattern of ports
- m = re.search('(.*)\-([0-8])', key)
- if m:
- port_id = int(m.group(2))
- field_name = m.group(1)
- if self.client.ports.has_key(port_id):
- if not port_id in port_stats:
- port_stats[port_id] = {}
- port_stats[port_id][field_name] = value
- else:
- continue
- else:
- # no port match - general stats
- global_stats[key] = value
-
- # update the general object with the snapshot
- self.client.global_stats.update(global_stats)
-
- # update all ports
- for port_id, data in port_stats.iteritems():
- self.client.ports[port_id].port_stats.update(data)
-
-
- # dispatcher for server async events (port started, port stopped and etc.)
- def handle_async_event (self, type, data):
- # DP stopped
-
- show_event = False
-
- # port started
- if (type == 0):
- port_id = int(data['port_id'])
- ev = "Port {0} has started".format(port_id)
- self.__async_event_port_started(port_id)
-
- # port stopped
- elif (type == 1):
- port_id = int(data['port_id'])
- ev = "Port {0} has stopped".format(port_id)
-
- # call the handler
- self.__async_event_port_stopped(port_id)
-
-
- # port paused
- elif (type == 2):
- port_id = int(data['port_id'])
- ev = "Port {0} has paused".format(port_id)
-
- # call the handler
- self.__async_event_port_paused(port_id)
-
- # port resumed
- elif (type == 3):
- port_id = int(data['port_id'])
- ev = "Port {0} has resumed".format(port_id)
-
- # call the handler
- self.__async_event_port_resumed(port_id)
-
- # port finished traffic
- elif (type == 4):
- port_id = int(data['port_id'])
- ev = "Port {0} job done".format(port_id)
-
- # call the handler
- self.__async_event_port_stopped(port_id)
- show_event = True
-
- # port was stolen...
- elif (type == 5):
- session_id = data['session_id']
-
- # false alarm, its us
- if session_id == self.client.session_id:
- return
-
- port_id = int(data['port_id'])
- who = data['who']
-
- ev = "Port {0} was forcely taken by '{1}'".format(port_id, who)
-
- # call the handler
- self.__async_event_port_forced_acquired(port_id)
- show_event = True
-
- # server stopped
- elif (type == 100):
- ev = "Server has stopped"
- self.__async_event_server_stopped()
- show_event = True
-
-
- else:
- # unknown event - ignore
- return
-
-
- self.__add_event_log(ev, 'server', show_event)
-
-
- # private functions
-
- def __async_event_port_stopped (self, port_id):
- self.client.ports[port_id].async_event_port_stopped()
-
-
- def __async_event_port_started (self, port_id):
- self.client.ports[port_id].async_event_port_started()
-
-
- def __async_event_port_paused (self, port_id):
- self.client.ports[port_id].async_event_port_paused()
-
-
- def __async_event_port_resumed (self, port_id):
- self.client.ports[port_id].async_event_port_resumed()
-
-
- def __async_event_port_forced_acquired (self, port_id):
- self.client.ports[port_id].async_event_forced_acquired()
-
-
- def __async_event_server_stopped (self):
- self.client.connected = False
-
-
- # add event to log
- def __add_event_log (self, msg, ev_type, show = False):
-
- if ev_type == "server":
- prefix = "[server]"
- elif ev_type == "local":
- prefix = "[local]"
-
- ts = time.time()
- st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
- self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
-
- if show:
- self.logger.async_log(format_text("\n\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))))
-
-
-
-
-
-############################ RPC layer #############################
-############################ #############################
-############################ #############################
-
-class CCommLink(object):
- """describes the connectivity of the stateless client method"""
- def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None):
- self.virtual = virtual
- self.server = server
- self.port = port
- self.rpc_link = JsonRpcClient(self.server, self.port, prn_func)
-
- @property
- def is_connected(self):
- if not self.virtual:
- return self.rpc_link.connected
- else:
- return True
-
- def get_server (self):
- return self.server
-
- def get_port (self):
- return self.port
-
- def connect(self):
- if not self.virtual:
- return self.rpc_link.connect()
-
- def disconnect(self):
- if not self.virtual:
- return self.rpc_link.disconnect()
-
- def transmit(self, method_name, params={}):
- if self.virtual:
- self._prompt_virtual_tx_msg()
- _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params)
- print msg
- return
- else:
- return self.rpc_link.invoke_rpc_method(method_name, params)
-
- def transmit_batch(self, batch_list):
- if self.virtual:
- self._prompt_virtual_tx_msg()
- print [msg
- for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params)
- for command in batch_list]]
- else:
- batch = self.rpc_link.create_batch()
- for command in batch_list:
- batch.add(command.method, command.params)
- # invoke the batch
- return batch.invoke()
-
- def _prompt_virtual_tx_msg(self):
- print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
- port=self.port)
-
-
-
-############################ client #############################
-############################ #############################
-############################ #############################
-
-class STLClient(object):
- """docstring for STLClient"""
-
- def __init__(self,
- username = general_utils.get_current_user(),
- server = "localhost",
- sync_port = 4501,
- async_port = 4500,
- verbose_level = LoggerApi.VERBOSE_QUIET,
- logger = None,
- virtual = False):
-
-
- self.username = username
-
- # init objects
- self.ports = {}
- self.server_version = {}
- self.system_info = {}
- self.session_id = random.getrandbits(32)
- self.connected = False
-
- # logger
- self.logger = DefaultLogger() if not logger else logger
-
- # initial verbose
- self.logger.set_verbose(verbose_level)
-
- # low level RPC layer
- self.comm_link = CCommLink(server,
- sync_port,
- virtual,
- self.logger)
-
- # async event handler manager
- self.event_handler = AsyncEventHandler(self)
-
- # async subscriber level
- self.async_client = CTRexAsyncClient(server,
- async_port,
- self)
-
-
-
-
- # stats
- self.connection_info = {"username": username,
- "server": server,
- "sync_port": sync_port,
- "async_port": async_port,
- "virtual": virtual}
-
-
- self.global_stats = trex_stats.CGlobalStats(self.connection_info,
- self.server_version,
- self.ports)
-
- self.stats_generator = trex_stats.CTRexInfoGenerator(self.global_stats,
- self.ports)
-
-
-
- ############# private functions - used by the class itself ###########
-
- # some preprocessing for port argument
- def __ports (self, port_id_list):
-
- # none means all
- if port_id_list == None:
- return range(0, self.get_port_count())
-
- # always list
- if isinstance(port_id_list, int):
- port_id_list = [port_id_list]
-
- if not isinstance(port_id_list, list):
- raise ValueError("bad port id list: {0}".format(port_id_list))
-
- for port_id in port_id_list:
- if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
- raise ValueError("bad port id {0}".format(port_id))
-
- return port_id_list
-
-
- # sync ports
- def __sync_ports (self, port_id_list = None, force = False):
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].sync())
-
- return rc
-
- # acquire ports, if port_list is none - get all
- def __acquire (self, port_id_list = None, force = False):
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].acquire(force))
-
- return rc
-
- # release ports
- def __release (self, port_id_list = None):
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].release())
-
- return rc
-
-
- def __add_streams(self, stream_list, port_id_list = None):
-
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].add_streams(stream_list))
-
- return rc
-
-
-
- def __remove_streams(self, stream_id_list, port_id_list = None):
-
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].remove_streams(stream_id_list))
-
- return rc
-
-
-
- def __remove_all_streams(self, port_id_list = None):
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].remove_all_streams())
-
- return rc
-
-
- def __get_stream(self, stream_id, port_id, get_pkt = False):
-
- return self.ports[port_id].get_stream(stream_id)
-
-
- def __get_all_streams(self, port_id, get_pkt = False):
-
- return self.ports[port_id].get_all_streams()
-
-
- def __get_stream_id_list(self, port_id):
-
- return self.ports[port_id].get_stream_id_list()
-
-
- def __start (self, multiplier, duration, port_id_list = None, force = False):
-
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].start(multiplier, duration, force))
-
- return rc
-
-
- def __resume (self, port_id_list = None, force = False):
-
- port_id_list = self.__ports(port_id_list)
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].resume())
-
- return rc
-
- def __pause (self, port_id_list = None, force = False):
-
- port_id_list = self.__ports(port_id_list)
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].pause())
-
- return rc
-
-
- def __stop (self, port_id_list = None, force = False):
-
- port_id_list = self.__ports(port_id_list)
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].stop(force))
-
- return rc
-
-
- def __update (self, mult, port_id_list = None, force = False):
-
- port_id_list = self.__ports(port_id_list)
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].update(mult, force))
-
- return rc
-
-
- def __validate (self, port_id_list = None):
- port_id_list = self.__ports(port_id_list)
-
- rc = RC()
-
- for port_id in port_id_list:
- rc.add(self.ports[port_id].validate())
-
- return rc
-
-
-
- # connect to server
- def __connect(self):
-
- # first disconnect if already connected
- if self.is_connected():
- self.__disconnect()
-
- # clear this flag
- self.connected = False
-
- # connect sync channel
- self.logger.pre_cmd("Connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port']))
- rc = self.comm_link.connect()
- self.logger.post_cmd(rc)
-
- if not rc:
- return rc
-
- # version
- rc = self._transmit("get_version")
- if not rc:
- return rc
-
-
- self.server_version = rc.data()
- self.global_stats.server_version = rc.data()
-
- # cache system info
- rc = self._transmit("get_system_info")
- if not rc:
- return rc
-
- self.system_info = rc.data()
-
- # cache supported commands
- rc = self._transmit("get_supported_cmds")
- if not rc:
- return rc
-
- self.supported_cmds = rc.data()
-
- # create ports
- for port_id in xrange(self.system_info["port_count"]):
- speed = self.system_info['ports'][port_id]['speed']
- driver = self.system_info['ports'][port_id]['driver']
-
- self.ports[port_id] = Port(port_id,
- speed,
- driver,
- self.username,
- self.comm_link,
- self.session_id)
-
-
- # sync the ports
- rc = self.__sync_ports()
- if not rc:
- return rc
-
-
- # connect async channel
- self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port']))
- rc = self.async_client.connect()
- self.logger.post_cmd(rc)
-
- if not rc:
- return rc
-
- self.connected = True
-
- return RC_OK()
-
-
- # disconenct from server
- def __disconnect(self, release_ports = True):
- # release any previous acquired ports
- if self.is_connected() and release_ports:
- self.__release(self.get_acquired_ports())
-
- self.comm_link.disconnect()
- self.async_client.disconnect()
-
- self.connected = False
-
- return RC_OK()
-
-
- # clear stats
- def __clear_stats(self, port_id_list, clear_global):
-
- for port_id in port_id_list:
- self.ports[port_id].clear_stats()
-
- if clear_global:
- self.global_stats.clear_stats()
-
- self.logger.log_cmd("clearing stats on port(s) {0}:".format(port_id_list))
-
- return RC
-
-
- # get stats
- def __get_stats (self, port_id_list):
- stats = {}
-
- stats['global'] = self.global_stats.get_stats()
-
- total = {}
- for port_id in port_id_list:
- port_stats = self.ports[port_id].get_stats()
- stats[port_id] = port_stats
-
- for k, v in port_stats.iteritems():
- if not k in total:
- total[k] = v
- else:
- total[k] += v
-
- stats['total'] = total
-
- return stats
-
-
- ############ functions used by other classes but not users ##############
-
- def _verify_port_id_list (self, port_id_list):
- # check arguments
- if not isinstance(port_id_list, list):
- return RC_ERR("ports should be an instance of 'list' not {0}".format(type(port_id_list)))
-
- # all ports are valid ports
- if not port_id_list or not all([port_id in self.get_all_ports() for port_id in port_id_list]):
- return RC_ERR("")
-
- return RC_OK()
-
- def _validate_port_list(self, port_id_list):
- if not isinstance(port_id_list, list):
- return False
-
- # check each item of the sequence
- return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list]))
-
-
-
- # transmit request on the RPC link
- def _transmit(self, method_name, params={}):
- return self.comm_link.transmit(method_name, params)
-
- # transmit batch request on the RPC link
- def _transmit_batch(self, batch_list):
- return self.comm_link.transmit_batch(batch_list)
-
- # stats
- def _get_formatted_stats(self, port_id_list, stats_mask=set()):
- stats_opts = trex_stats.ALL_STATS_OPTS.intersection(stats_mask)
-
- stats_obj = {}
- for stats_type in stats_opts:
- stats_obj.update(self.stats_generator.generate_single_statistic(port_id_list, stats_type))
-
- return stats_obj
-
- def _get_streams(self, port_id_list, streams_mask=set()):
-
- streams_obj = self.stats_generator.generate_streams_info(port_id_list, streams_mask)
-
- return streams_obj
-
-
- def _invalidate_stats (self, port_id_list):
- for port_id in port_id_list:
- self.ports[port_id].invalidate_stats()
-
- self.global_stats.invalidate()
-
- return RC_OK()
-
-
-
-
-
- #################################
- # ------ private methods ------ #
- @staticmethod
- def __get_mask_keys(ok_values={True}, **kwargs):
- masked_keys = set()
- for key, val in kwargs.iteritems():
- if val in ok_values:
- masked_keys.add(key)
- return masked_keys
-
- @staticmethod
- def __filter_namespace_args(namespace, ok_values):
- return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
-
-
- # API decorator - double wrap because of argument
- def __api_check(connected = True):
-
- def wrap (f):
- def wrap2(*args, **kwargs):
- client = args[0]
-
- func_name = f.__name__
-
- # check connection
- if connected and not client.is_connected():
- raise STLStateError(func_name, 'disconnected')
-
- ret = f(*args, **kwargs)
- return ret
- return wrap2
-
- return wrap
-
-
-
- ############################ API #############################
- ############################ #############################
- ############################ #############################
- def __enter__ (self):
- self.connect()
- self.acquire(force = True)
- self.reset()
- return self
-
- def __exit__ (self, type, value, traceback):
- if self.get_active_ports():
- self.stop(self.get_active_ports())
- self.disconnect()
-
- ############################ Getters #############################
- ############################ #############################
- ############################ #############################
-
-
- # return verbose level of the logger
- def get_verbose (self):
- return self.logger.get_verbose()
-
- # is the client on read only mode ?
- def is_all_ports_acquired (self):
- return not (self.get_all_ports() == self.get_acquired_ports())
-
- # is the client connected ?
- def is_connected (self):
- return self.connected and self.comm_link.is_connected
-
-
- # get connection info
- def get_connection_info (self):
- return self.connection_info
-
-
- # get supported commands by the server
- def get_server_supported_cmds(self):
- return self.supported_cmds
-
- # get server version
- def get_server_version(self):
- return self.server_version
-
- # get server system info
- def get_server_system_info(self):
- return self.system_info
-
- # get port count
- def get_port_count(self):
- return len(self.ports)
-
-
- # returns the port object
- def get_port (self, port_id):
- port = self.ports.get(port_id, None)
- if (port != None):
- return port
- else:
- raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports())
-
-
- # get all ports as IDs
- def get_all_ports (self):
- return self.ports.keys()
-
- # get all acquired ports
- def get_acquired_ports(self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_acquired()]
-
- # get all active ports (TX or pause)
- def get_active_ports(self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_active()]
-
- # get paused ports
- def get_paused_ports (self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_paused()]
-
- # get all TX ports
- def get_transmitting_ports (self):
- return [port_id
- for port_id, port_obj in self.ports.iteritems()
- if port_obj.is_transmitting()]
-
-
- # get stats
- def get_stats (self, ports = None, async_barrier = True):
- # by default use all ports
- if ports == None:
- ports = self.get_acquired_ports()
- else:
- ports = self.__ports(ports)
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # check async barrier
- if not type(async_barrier) is bool:
- raise STLArgumentError('async_barrier', async_barrier)
-
-
- # if the user requested a barrier - use it
- if async_barrier:
- rc = self.async_client.barrier()
- if not rc:
- raise STLError(rc)
-
- return self.__get_stats(ports)
-
- # return all async events
- def get_events (self):
- return self.event_handler.get_events()
-
- ############################ Commands #############################
- ############################ #############################
- ############################ #############################
-
-
- """
- Sets verbose level
-
- :parameters:
- level : str
- "high"
- "low"
- "normal"
-
- :raises:
- None
-
- """
- def set_verbose (self, level):
- modes = {'low' : LoggerApi.VERBOSE_QUIET, 'normal': LoggerApi.VERBOSE_REGULAR, 'high': LoggerApi.VERBOSE_HIGH}
-
- if not level in modes.keys():
- raise STLArgumentError('level', level)
-
- self.logger.set_verbose(modes[level])
-
-
- """
- Connects to the TRex server
-
- :parameters:
- None
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(False)
- def connect (self):
- rc = self.__connect()
- if not rc:
- raise STLError(rc)
-
-
- """
- Disconnects from the server
-
- :parameters:
- stop_traffic : bool
- tries to stop traffic before disconnecting
- release_ports : bool
- tries to release all the acquired ports
-
- """
- @__api_check(False)
- def disconnect (self, stop_traffic = True, release_ports = True):
-
- # try to stop ports but do nothing if not possible
- if stop_traffic:
- try:
- self.stop()
- except STLError:
- pass
-
-
- self.logger.pre_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'],
- self.connection_info['sync_port']))
- rc = self.__disconnect(release_ports)
- self.logger.post_cmd(rc)
-
-
-
- """
- Acquires ports for executing commands
-
- :parameters:
- ports : list
- ports to execute the command
- force : bool
- force acquire the ports
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def acquire (self, ports = None, force = False):
- # by default use all ports
- if ports == None:
- ports = self.get_all_ports()
-
- # verify ports
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # verify valid port id list
- if force:
- self.logger.pre_cmd("Force acquiring ports {0}:".format(ports))
- else:
- self.logger.pre_cmd("Acquiring ports {0}:".format(ports))
-
- rc = self.__acquire(ports, force)
-
- self.logger.post_cmd(rc)
-
- if not rc:
- # cleanup
- self.__release(ports)
- raise STLError(rc)
-
-
- """
- Release ports
-
- :parameters:
- ports : list
- ports to execute the command
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def release (self, ports = None):
- # by default use all acquired ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify ports
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- self.logger.pre_cmd("Releasing ports {0}:".format(ports))
- rc = self.__release(ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
- """
- Pings the server
-
- :parameters:
- None
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def ping(self):
- self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
- self.connection_info['sync_port']))
- rc = self._transmit("ping")
-
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
-
- """
- force acquire ports, stop the traffic, remove all streams and clear stats
-
- :parameters:
- ports : list
- ports to execute the command
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def reset(self, ports = None):
-
- # by default use all ports
- if ports == None:
- ports = self.get_all_ports()
-
- # verify ports
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- self.acquire(ports, force = True)
- self.stop(ports)
- self.remove_all_streams(ports)
- self.clear_stats(ports)
-
-
- """
- remove all streams from port(s)
-
- :parameters:
- ports : list
- ports to execute the command
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def remove_all_streams (self, ports = None):
-
- # by default use all ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(ports))
- rc = self.__remove_all_streams(ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
- """
- add a list of streams to port(s)
-
- :parameters:
- ports : list
- ports to execute the command
- streams: list
- streams to attach
-
- :returns:
- list of stream IDs in order of the stream list
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def add_streams (self, streams, ports = None):
- # by default use all ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # transform single stream
- if not isinstance(streams, list):
- streams = [streams]
-
- # check streams
- if not all([isinstance(stream, STLStream) for stream in streams]):
- raise STLArgumentError('streams', streams)
-
- self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(streams), ports))
- rc = self.__add_streams(streams, ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
- return [stream.get_id() for stream in streams]
-
-
- """
- remove a list of streams from ports
-
- :parameters:
- ports : list
- ports to execute the command
- stream_id_list: list
- stream id list to remove
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def remove_streams (self, stream_id_list, ports = None):
- # by default use all ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # transform single stream
- if not isinstance(stream_id_list, list):
- stream_id_list = [stream_id_list]
-
- # check streams
- if not all([isinstance(stream_id, long) for stream_id in stream_id_list]):
- raise STLArgumentError('stream_id_list', stream_id_list)
-
- # remove streams
- self.logger.pre_cmd("Removing {0} streams from port(s) {1}:".format(len(stream_id_list), ports))
- rc = self.__remove_streams(stream_id_list, ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
- """
- load a profile file to port(s)
-
- :parameters:
- filename : str
- filename to load
- ports : list
- ports to execute the command
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def load_profile (self, filename, ports = None):
-
- # check filename
- if not os.path.isfile(filename):
- raise STLError("file '{0}' does not exists".format(filename))
-
- # by default use all ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
-
- streams = None
-
- # try YAML
- try:
- streams_db = CStreamsDB()
- stream_list = streams_db.load_yaml_file(filename)
- # convert to new style stream object
- streams = [HACKSTLStream(stream) for stream in stream_list.compiled]
- except YAMLError:
- # try python loader
- try:
- basedir = os.path.dirname(filename)
-
- sys.path.append(basedir)
- file = os.path.basename(filename).split('.')[0]
- module = __import__(file, globals(), locals(), [], -1)
- reload(module) # reload the update
-
- streams = module.register().get_streams()
-
- except Exception as e :
- print str(e);
- traceback.print_exc(file=sys.stdout)
- raise STLError("Unexpected error: '{0}'".format(filename))
-
-
- self.add_streams(streams, ports)
-
-
-
- """
- start traffic on port(s)
-
- :parameters:
- ports : list
- ports to execute command
-
- mult : str
- multiplier in a form of pps, bps, or line util in %
- examples: "5kpps", "10gbps", "85%", "32mbps"
-
- force : bool
- imply stopping the port of active and also
- forces a profile that exceeds the L1 BW
-
- duration : int
- limit the run for time in seconds
- -1 means unlimited
-
- total : bool
- should the B/W be divided by the ports
- or duplicated for each
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def start (self,
- ports = None,
- mult = "1",
- force = False,
- duration = -1,
- total = False):
-
-
- # by default use all ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # verify multiplier
- mult_obj = parsing_opts.decode_multiplier(mult,
- allow_update = False,
- divide_count = len(ports) if total else 1)
- if not mult_obj:
- raise STLArgumentError('mult', mult)
-
- # some type checkings
-
- if not type(force) is bool:
- raise STLArgumentError('force', force)
-
- if not isinstance(duration, (int, float)):
- raise STLArgumentError('duration', duration)
-
- if not type(total) is bool:
- raise STLArgumentError('total', total)
-
-
- # verify ports are stopped or force stop them
- active_ports = list(set(self.get_active_ports()).intersection(ports))
- if active_ports:
- if not force:
- raise STLError("Port(s) {0} are active - please stop them or specify 'force'".format(active_ports))
- else:
- rc = self.stop(active_ports)
- if not rc:
- raise STLError(rc)
-
-
- # start traffic
- self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(ports))
- rc = self.__start(mult_obj, duration, ports, force)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
-
-
- """
- stop port(s)
-
- :parameters:
- ports : list
- ports to execute the command
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def stop (self, ports = None):
-
- # by default the user means all the active ports
- if ports == None:
- ports = self.get_active_ports()
- if not ports:
- return
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(ports))
- rc = self.__stop(ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
-
- """
- update traffic on port(s)
-
- :parameters:
- ports : list
- ports to execute command
-
- mult : str
- multiplier in a form of pps, bps, or line util in %
- and also with +/-
- examples: "5kpps+", "10gbps-", "85%", "32mbps", "20%+"
-
- force : bool
- forces a profile that exceeds the L1 BW
-
- total : bool
- should the B/W be divided by the ports
- or duplicated for each
-
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def update (self, ports = None, mult = "1", total = False, force = False):
-
- # by default the user means all the active ports
- if ports == None:
- ports = self.get_active_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # verify multiplier
- mult_obj = parsing_opts.decode_multiplier(mult,
- allow_update = True,
- divide_count = len(ports) if total else 1)
- if not mult_obj:
- raise STLArgumentError('mult', mult)
-
- # verify total
- if not type(total) is bool:
- raise STLArgumentError('total', total)
-
-
- # call low level functions
- self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(ports))
- rc = self.__update(mult, ports, force)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
-
- """
- pause traffic on port(s)
-
- :parameters:
- ports : list
- ports to execute command
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def pause (self, ports = None):
-
- # by default the user means all the TX ports
- if ports == None:
- ports = self.get_transmitting_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(ports))
- rc = self.__pause(ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
-
- """
- resume traffic on port(s)
-
- :parameters:
- ports : list
- ports to execute command
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def resume (self, ports = None):
-
- # by default the user means all the paused ports
- if ports == None:
- ports = self.get_paused_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(ports))
- rc = self.__resume(ports)
- self.logger.post_cmd(rc)
-
- if not rc:
- raise STLError(rc)
-
-
- """
- validate port(s) configuration
-
- :parameters:
- ports : list
- ports to execute command
-
- mult : str
- multiplier in a form of pps, bps, or line util in %
- examples: "5kpps", "10gbps", "85%", "32mbps"
-
- duration : int
- limit the run for time in seconds
- -1 means unlimited
-
- total : bool
- should the B/W be divided by the ports
- or duplicated for each
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(True)
- def validate (self, ports = None, mult = "1", duration = "-1", total = False):
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # verify multiplier
- mult_obj = parsing_opts.decode_multiplier(mult,
- allow_update = True,
- divide_count = len(ports) if total else 1)
- if not mult_obj:
- raise STLArgumentError('mult', mult)
-
-
- if not isinstance(duration, (int, float)):
- raise STLArgumentError('duration', duration)
-
-
- self.logger.pre_cmd("Validating streams on port(s) {0}:".format(ports))
- rc = self.__validate(ports)
- self.logger.post_cmd(rc)
-
-
- for port in ports:
- self.ports[port].print_profile(mult_obj, duration)
-
-
- """
- clear stats on port(s)
-
- :parameters:
- ports : list
- ports to execute command
-
- clear_global : bool
- clear the global stats
-
- :raises:
- + :exc:`STLError`
-
- """
- @__api_check(False)
- def clear_stats (self, ports = None, clear_global = True):
-
- # by default use all ports
- if ports == None:
- ports = self.get_all_ports()
- else:
- ports = self.__ports(ports)
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- # verify clear global
- if not type(clear_global) is bool:
- raise STLArgumentError('clear_global', clear_global)
-
-
- rc = self.__clear_stats(ports, clear_global)
- if not rc:
- raise STLError(rc)
-
-
-
-
-
- """
- block until specify port(s) traffic has ended
-
- :parameters:
- ports : list
- ports to execute command
-
- timeout : int
- timeout in seconds
-
- :raises:
- + :exc:`STLTimeoutError` - in case timeout has expired
- + :exe:'STLError'
-
- """
- @__api_check(True)
- def wait_on_traffic (self, ports = None, timeout = 60):
-
- # by default use all acquired ports
- if ports == None:
- ports = self.get_acquired_ports()
-
- # verify valid port id list
- rc = self._validate_port_list(ports)
- if not rc:
- raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
- expr = time.time() + timeout
-
- # wait while any of the required ports are active
- while set(self.get_active_ports()).intersection(ports):
- time.sleep(0.01)
- if time.time() > expr:
- raise STLTimeoutError(timeout)
-
-
- """
- clear all events
-
- :parameters:
- None
-
- :raises:
- None
-
- """
- def clear_events (self):
- self.event_handler.clear_events()
-
-
- ############################ Line #############################
- ############################ Commands #############################
- ############################ #############################
-
- # console decorator
- def __console(f):
- def wrap(*args):
- client = args[0]
-
- time1 = time.time()
-
- try:
- rc = f(*args)
- except STLError as e:
- client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
- return
-
- # if got true - print time
- if rc:
- delta = time.time() - time1
- client.logger.log(format_time(delta) + "\n")
-
-
- return wrap
-
-
- @__console
- def connect_line (self, line):
- '''Connects to the TRex server'''
- # define a parser
- parser = parsing_opts.gen_parser(self,
- "connect",
- self.connect_line.__doc__,
- parsing_opts.FORCE)
-
- opts = parser.parse_args(line.split())
-
- if opts is None:
- return
-
- # call the API
- self.connect()
- self.acquire(force = opts.force)
-
- # true means print time
- return True
-
- @__console
- def disconnect_line (self, line):
- self.disconnect()
-
-
-
- @__console
- def reset_line (self, line):
- self.reset()
-
- # true means print time
- return True
-
-
- @__console
- def start_line (self, line):
- '''Start selected traffic in specified ports on TRex\n'''
- # define a parser
- parser = parsing_opts.gen_parser(self,
- "start",
- self.start_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.TOTAL,
- parsing_opts.FORCE,
- parsing_opts.STREAM_FROM_PATH_OR_FILE,
- parsing_opts.DURATION,
- parsing_opts.MULTIPLIER_STRICT,
- parsing_opts.DRY_RUN)
-
- opts = parser.parse_args(line.split())
-
-
- if opts is None:
- return
-
-
- active_ports = list(set(self.get_active_ports()).intersection(opts.ports))
-
- if active_ports:
- if not opts.force:
- msg = "Port(s) {0} are active - please stop them or add '--force'\n".format(active_ports)
- self.logger.log(format_text(msg, 'bold'))
- return
- else:
- self.stop(active_ports)
-
-
- # remove all streams
- self.remove_all_streams(opts.ports)
-
- # pack the profile
- self.load_profile(opts.file[0], opts.ports)
-
- if opts.dry:
- self.validate(opts.ports, opts.mult, opts.duration, opts.total)
- else:
- self.start(opts.ports,
- opts.mult,
- opts.force,
- opts.duration,
- opts.total)
-
- # true means print time
- return True
-
-
-
- @__console
- def stop_line (self, line):
- '''Stop active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "stop",
- self.stop_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return
-
- # find the relevant ports
- ports = list(set(self.get_active_ports()).intersection(opts.ports))
-
- if not ports:
- self.logger.log(format_text("No active traffic on provided ports\n", 'bold'))
- return
-
- self.stop(ports)
-
- # true means print time
- return True
-
-
- @__console
- def update_line (self, line):
- '''Update port(s) speed currently active\n'''
- parser = parsing_opts.gen_parser(self,
- "update",
- self.update_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.MULTIPLIER,
- parsing_opts.TOTAL,
- parsing_opts.FORCE)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return
-
- # find the relevant ports
- ports = list(set(self.get_active_ports()).intersection(opts.ports))
-
- if not ports:
- self.logger.log(format_text("No ports in valid state to update\n", 'bold'))
- return
-
- self.update(ports, opts.mult, opts.total, opts.force)
-
- # true means print time
- return True
-
-
- @__console
- def pause_line (self, line):
- '''Pause active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "pause",
- self.pause_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return
-
- # find the relevant ports
- ports = list(set(self.get_transmitting_ports()).intersection(opts.ports))
-
- if not ports:
- self.logger.log(format_text("No ports in valid state to pause\n", 'bold'))
- return
-
- self.pause(ports)
-
- # true means print time
- return True
-
-
- @__console
- def resume_line (self, line):
- '''Resume active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "resume",
- self.resume_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return
-
- # find the relevant ports
- ports = list(set(self.get_paused_ports()).intersection(opts.ports))
-
- if not ports:
- self.logger.log(format_text("No ports in valid state to resume\n", 'bold'))
- return
-
- return self.resume(ports)
-
- # true means print time
- return True
-
-
- @__console
- def clear_stats_line (self, line):
- '''Clear cached local statistics\n'''
- # define a parser
- parser = parsing_opts.gen_parser(self,
- "clear",
- self.clear_stats_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
-
- if opts is None:
- return
-
- self.clear_stats(opts.ports)
-
-
-
-
- @__console
- def show_stats_line (self, line):
- '''Fetch statistics from TRex server by port\n'''
- # define a parser
- parser = parsing_opts.gen_parser(self,
- "stats",
- self.show_stats_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.STATS_MASK)
-
- opts = parser.parse_args(line.split())
-
- if opts is None:
- return
-
- # determine stats mask
- mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS))
- if not mask:
- # set to show all stats if no filter was given
- mask = trex_stats.ALL_STATS_OPTS
-
- stats_opts = trex_stats.ALL_STATS_OPTS.intersection(mask)
-
- stats = self._get_formatted_stats(opts.ports, mask)
-
-
- # print stats to screen
- for stat_type, stat_data in stats.iteritems():
- text_tables.print_table_with_header(stat_data.text_table, stat_type)
-
-
- @__console
- def show_streams_line(self, line):
- '''Fetch streams statistics from TRex server by port\n'''
- # define a parser
- parser = parsing_opts.gen_parser(self,
- "streams",
- self.show_streams_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL,
- parsing_opts.STREAMS_MASK)
-
- opts = parser.parse_args(line.split())
-
- if opts is None:
- return
-
- streams = self._get_streams(opts.ports, set(opts.streams))
- if not streams:
- self.logger.log(format_text("No streams found with desired filter.\n", "bold", "magenta"))
-
- else:
- # print stats to screen
- for stream_hdr, port_streams_data in streams.iteritems():
- text_tables.print_table_with_header(port_streams_data.text_table,
- header= stream_hdr.split(":")[0] + ":",
- untouched_header= stream_hdr.split(":")[1])
-
-
-
-
- @__console
- def validate_line (self, line):
- '''validates port(s) stream configuration\n'''
-
- parser = parsing_opts.gen_parser(self,
- "validate",
- self.validate_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return
-
- self.validate(opts.ports)
-
-
-
- \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_sim.py b/scripts/automation/trex_control_plane/client/trex_stateless_sim.py
deleted file mode 100644
index 1452cdd1..00000000
--- a/scripts/automation/trex_control_plane/client/trex_stateless_sim.py
+++ /dev/null
@@ -1,430 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-Itay Marom
-Cisco Systems, Inc.
-
-Copyright (c) 2015-2015 Cisco Systems, Inc.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-try:
- # support import for Python 2
- import outer_packages
-except ImportError:
- # support import for Python 3
- import client.outer_packages
-
-from common.trex_stl_exceptions import STLError
-from yaml import YAMLError
-from common.trex_streams import *
-from client_utils import parsing_opts
-
-import re
-import json
-
-
-
-import argparse
-import tempfile
-import subprocess
-import os
-from dpkt import pcap
-from operator import itemgetter
-
-class BpSimException(Exception):
- pass
-
-def merge_cap_files (pcap_file_list, out_filename, delete_src = False):
-
- out_pkts = []
- if not all([os.path.exists(f) for f in pcap_file_list]):
- print "failed to merge cap file list...\nnot all files exist\n"
- return
-
- # read all packets to a list
- for src in pcap_file_list:
- f = open(src, 'r')
- reader = pcap.Reader(f)
- pkts = reader.readpkts()
- out_pkts += pkts
- f.close()
- if delete_src:
- os.unlink(src)
-
- # sort by the timestamp
- out_pkts = sorted(out_pkts, key=itemgetter(0))
-
-
- out = open(out_filename, 'w')
- out_writer = pcap.Writer(out)
-
- for ts, pkt in out_pkts:
- out_writer.writepkt(pkt, ts)
-
- out.close()
-
-
-
-# stateless simulation
-class STLSim(object):
- def __init__ (self, bp_sim_path = None, handler = 0, port_id = 0):
-
- if not bp_sim_path:
- # auto find scripts
- m = re.match(".*/trex-core", os.getcwd())
- if not m:
- raise STLError('cannot find BP sim path, please provide it')
-
- self.bp_sim_path = os.path.join(m.group(0), 'scripts')
-
- else:
- self.bp_sim_path = bp_sim_path
-
- # dummies
- self.handler = handler
- self.port_id = port_id
-
-
- def load_input_file (self, input_file):
- # try YAML
- try:
- streams_db = CStreamsDB()
- stream_list = streams_db.load_yaml_file(input_file)
-
- # convert to new style stream object
- return [HACKSTLStream(stream) for stream in stream_list.compiled]
- except YAMLError:
- pass
-
- # try python
- try:
- basedir = os.path.dirname(input_file)
- sys.path.append(basedir)
-
- file = os.path.basename(input_file).split('.')[0]
- module = __import__(file, globals(), locals(), [], -1)
-
- return module.register().get_streams()
-
- except (AttributeError, ImportError) as e:
- print "specific error: {0}".format(e)
-
- raise STLError("bad format input file '{0}'".format(input_file))
-
-
- def generate_start_cmd (self, mult = "1", force = True, duration = -1):
- return {"id":1,
- "jsonrpc": "2.0",
- "method": "start_traffic",
- "params": {"handler": self.handler,
- "force": force,
- "port_id": self.port_id,
- "mul": parsing_opts.decode_multiplier(mult),
- "duration": duration}
- }
-
-
-
- # run command
- # input_list - a list of streams or YAML files
- # outfile - pcap file to save output, if None its a dry run
- # dp_core_count - how many DP cores to use
- # dp_core_index - simulate only specific dp core without merging
- # is_debug - debug or release image
- # pkt_limit - how many packets to simulate
- # mult - multiplier
- # mode - can be 'valgrind, 'gdb', 'json' or 'none'
- def run (self,
- input_list,
- outfile = None,
- dp_core_count = 1,
- dp_core_index = None,
- is_debug = True,
- pkt_limit = 5000,
- mult = "1",
- duration = -1,
- mode = 'none'):
-
- if not mode in ['none', 'gdb', 'valgrind', 'json']:
- raise STLArgumentError('mode', mode)
-
- # listify
- input_list = input_list if isinstance(input_list, list) else [input_list]
-
- # check streams arguments
- if not all([isinstance(i, (STLStream, str)) for i in input_list]):
- raise STLArgumentError('input_list', input_list)
-
- # split to two type
- input_files = [x for x in input_list if isinstance(x, str)]
- stream_list = [x for x in input_list if isinstance(x, STLStream)]
-
- # handle YAMLs
- for input_file in input_files:
- stream_list += self.load_input_file(input_file)
-
-
- # load streams
- cmds_json = []
- for stream in stream_list:
- cmd = {"id":1,
- "jsonrpc": "2.0",
- "method": "add_stream",
- "params": {"handler": self.handler,
- "port_id": self.port_id,
- "stream_id": stream.get_id(),
- "stream": stream.to_json()}
- }
-
- cmds_json.append(cmd)
-
- # generate start command
- cmds_json.append(self.generate_start_cmd(mult = mult,
- force = True,
- duration = duration))
-
- if mode == 'json':
- print json.dumps(cmds_json, indent = 4, separators=(',', ': '), sort_keys = True)
- return
-
- # start simulation
- self.outfile = outfile
- self.dp_core_count = dp_core_count
- self.dp_core_index = dp_core_index
- self.is_debug = is_debug
- self.pkt_limit = pkt_limit
- self.mult = mult
- self.duration = duration,
- self.mode = mode
-
- self.__run(cmds_json)
-
-
- # internal run
- def __run (self, cmds_json):
-
- # write to temp file
- f = tempfile.NamedTemporaryFile(delete = False)
- f.write(json.dumps(cmds_json))
- f.close()
-
- # launch bp-sim
- try:
- self.execute_bp_sim(f.name)
- finally:
- os.unlink(f.name)
-
-
-
- def execute_bp_sim (self, json_filename):
- if self.is_debug:
- exe = os.path.join(self.bp_sim_path, 'bp-sim-64-debug')
- else:
- exe = os.path.join(self.bp_sim_path, 'bp-sim-64')
-
- if not os.path.exists(exe):
- raise STLError("'{0}' does not exists, please build it before calling the simulation".format(exe))
-
-
- cmd = [exe,
- '--pcap',
- '--sl',
- '--cores',
- str(self.dp_core_count),
- '--limit',
- str(self.pkt_limit),
- '-f',
- json_filename]
-
- # out or dry
- if not self.outfile:
- cmd += ['--dry']
- cmd += ['-o', '/dev/null']
- else:
- cmd += ['-o', self.outfile]
-
- if self.dp_core_index != None:
- cmd += ['--core_index', str(self.dp_core_index)]
-
- if self.mode == 'valgrind':
- cmd = ['valgrind', '--leak-check=full', '--error-exitcode=1'] + cmd
-
- elif self.mode == 'gdb':
- cmd = ['/bin/gdb', '--args'] + cmd
-
- print "executing command: '{0}'".format(" ".join(cmd))
- rc = subprocess.call(cmd)
- if rc != 0:
- raise STLError('simulation has failed with error code {0}'.format(rc))
-
- self.merge_results()
-
-
- def merge_results (self):
- if not self.outfile:
- return
-
- if self.dp_core_count == 1:
- return
-
- if self.dp_core_index != None:
- return
-
-
- print "Mering cores output to a single pcap file...\n"
- inputs = ["{0}-{1}".format(self.outfile, index) for index in xrange(0, self.dp_core_count)]
- merge_cap_files(inputs, self.outfile, delete_src = True)
-
-
-
-
-def is_valid_file(filename):
- if not os.path.isfile(filename):
- raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename)
-
- return filename
-
-
-def unsigned_int (x):
- x = int(x)
- if x < 0:
- raise argparse.ArgumentTypeError("argument must be >= 0")
-
- return x
-
-def setParserOptions():
- parser = argparse.ArgumentParser(prog="stl_sim.py")
-
- parser.add_argument("-f",
- dest ="input_file",
- help = "input file in YAML or Python format",
- type = is_valid_file,
- required=True)
-
- parser.add_argument("-o",
- dest = "output_file",
- default = None,
- help = "output file in ERF format")
-
-
- parser.add_argument("-c", "--cores",
- help = "DP core count [default is 1]",
- dest = "dp_core_count",
- default = 1,
- type = int,
- choices = xrange(1, 9))
-
- parser.add_argument("-n", "--core_index",
- help = "Record only a specific core",
- dest = "dp_core_index",
- default = None,
- type = int)
-
- parser.add_argument("-r", "--release",
- help = "runs on release image instead of debug [default is False]",
- action = "store_true",
- default = False)
-
-
- parser.add_argument("-l", "--limit",
- help = "limit test total packet count [default is 5000]",
- default = 5000,
- type = unsigned_int)
-
- parser.add_argument('-m', '--multiplier',
- help = parsing_opts.match_multiplier_help,
- dest = 'mult',
- default = "1",
- type = parsing_opts.match_multiplier_strict)
-
- parser.add_argument('-d', '--duration',
- help = "run duration",
- dest = 'duration',
- default = -1,
- type = float)
-
-
- group = parser.add_mutually_exclusive_group()
-
- group.add_argument("-x", "--valgrind",
- help = "run under valgrind [default is False]",
- action = "store_true",
- default = False)
-
- group.add_argument("-g", "--gdb",
- help = "run under GDB [default is False]",
- action = "store_true",
- default = False)
-
- group.add_argument("--json",
- help = "generate JSON output only to stdout [default is False]",
- action = "store_true",
- default = False)
-
- return parser
-
-
-def validate_args (parser, options):
-
- if options.dp_core_index:
- if not options.dp_core_index in xrange(0, options.dp_core_count):
- parser.error("DP core index valid range is 0 to {0}".format(options.dp_core_count - 1))
-
- # zero is ok - no limit, but other values must be at least as the number of cores
- if (options.limit != 0) and options.limit < options.dp_core_count:
- parser.error("limit cannot be lower than number of DP cores")
-
-
-def main ():
- parser = setParserOptions()
- options = parser.parse_args()
-
- validate_args(parser, options)
-
-
-
- if options.valgrind:
- mode = 'valgrind'
- elif options.gdb:
- mode = 'gdb'
- elif options.json:
- mode = 'json'
- else:
- mode = 'none'
-
- try:
- r = STLSim()
- r.run(input_list = options.input_file,
- outfile = options.output_file,
- dp_core_count = options.dp_core_count,
- dp_core_index = options.dp_core_index,
- is_debug = (not options.release),
- pkt_limit = options.limit,
- mult = options.mult,
- duration = options.duration,
- mode = mode)
-
- except KeyboardInterrupt as e:
- print "\n\n*** Caught Ctrl + C... Exiting...\n\n"
- exit(1)
-
- except STLError as e:
- print e
- exit(1)
-
- exit(0)
-
-if __name__ == '__main__':
- main()
-
-