From 9fc980b8aa43cf53446eeeb5184f10a86476da28 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Thu, 7 Jan 2016 13:31:11 +0200 Subject: Working version of streams view in TRex console. TODO: sync when console crashes isn't integrated yet --- .../trex_control_plane/client/trex_port.py | 72 ++++++++++++++++++---- 1 file changed, 61 insertions(+), 11 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_port.py') diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py index 54b4945e..fc63cf0d 100644 --- a/scripts/automation/trex_control_plane/client/trex_port.py +++ b/scripts/automation/trex_control_plane/client/trex_port.py @@ -1,8 +1,9 @@ -from collections import namedtuple +from collections import namedtuple, OrderedDict from common.trex_types import * from common import trex_stats - +from client_utils import packet_builder +StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata']) ########## utlity ############ def mult_to_factor (mult, max_bps, max_pps, line_util): @@ -47,6 +48,7 @@ class Port(object): self.streams = {} self.profile = None self.session_id = session_id + self.loaded_stream_pack = None self.port_stats = trex_stats.CPortStats(self) @@ -124,7 +126,9 @@ class Port(object): elif port_state == "PAUSE": self.state = self.STATE_PAUSE else: - raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state)) + + # TODO: handle syncing the streams into stream_db return self.ok() @@ -151,7 +155,7 @@ class Port(object): return self.err(rc.err()) # add the stream - self.streams[stream_id] = stream_obj + self.streams[stream_id] = StreamOnPort(stream_obj, Port._generate_stream_metadata(stream_id, stream_obj)) # the only valid state now self.state = self.STATE_STREAMS @@ -159,14 +163,17 @@ class Port(object): return self.ok() # add multiple streams - def add_streams (self, streams_list): + def add_streams (self, LoadedStreamList_obj): batch = [] - for stream in streams_list: + self.loaded_stream_pack = LoadedStreamList_obj + compiled_stream_list = LoadedStreamList_obj.compiled + + for stream_pack in compiled_stream_list: params = {"handler": self.handler, "port_id": self.port_id, - "stream_id": stream.stream_id, - "stream": stream.stream} + "stream_id": stream_pack.stream_id, + "stream": stream_pack.stream} cmd = RpcCmdData('add_stream', params) batch.append(cmd) @@ -178,8 +185,10 @@ class Port(object): # validate that every action succeeded # add the stream - for stream in streams_list: - self.streams[stream.stream_id] = stream.stream + for stream_pack in compiled_stream_list: + self.streams[stream_pack.stream_id] = StreamOnPort(stream_pack.stream, + Port._generate_stream_metadata(stream_pack.stream_id, + stream_pack.stream)) # the only valid state now self.state = self.STATE_STREAMS @@ -203,7 +212,7 @@ class Port(object): self.streams[stream_id] = None - self.state = self.STATE_STREAMS if len(self.streams > 0) else self.STATE_IDLE + self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE return self.ok() @@ -408,6 +417,47 @@ class Port(object): def invalidate_stats(self): return self.port_stats.invalidate() + ################# stream printout ###################### + def generate_loaded_streams_sum(self, stream_id_list): + if self.state == self.STATE_DOWN or self.state == self.STATE_STREAMS: + return {} + elif self.loaded_stream_pack is None: + # avoid crashing when sync with remote server isn't operational + # TODO: MAKE SURE TO HANDLE THIS CASE FOR BETTER UX + return {} + streams_data = {} + + if not stream_id_list: + # if no mask has been provided, apply to all streams on port + stream_id_list = self.streams.keys() + + + streams_data = {stream_id: self.streams[stream_id].metadata.get('stream_sum', ["N/A"] * 6) + for stream_id in stream_id_list + if stream_id in self.streams} + + + return {"referring_file" : self.loaded_stream_pack.name, + "streams" : streams_data} + + @staticmethod + def _generate_stream_metadata(stream_id, compiled_stream_obj): + meta_dict = {} + # create packet stream description + pkt_bld_obj = packet_builder.CTRexPktBuilder() + pkt_bld_obj.load_from_stream_obj(compiled_stream_obj) + # generate stream summary based on that + + next_stream = "None" if compiled_stream_obj['next_stream_id']==-1 else compiled_stream_obj['next_stream_id'] + + meta_dict['stream_sum'] = OrderedDict([("id", stream_id), + ("packet_type", "/".join(pkt_bld_obj.get_packet_layers())), + ("length", pkt_bld_obj.get_packet_length()), + ("mode", compiled_stream_obj['mode']['type']), + ("rate_pps", compiled_stream_obj['mode']['pps']), + ("next_stream", next_stream) + ]) + return meta_dict ################# events handler ###################### def async_event_port_stopped (self): -- cgit