diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py')
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py | 95 |
1 files changed, 64 insertions, 31 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 29bad041..4dd07a13 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -1,6 +1,9 @@ from collections import namedtuple, OrderedDict +from trex_stl_packet_builder_scapy import CScapyTRexPktBuilder +from trex_stl_streams import STLStream +import base64 import trex_stl_stats from trex_stl_types import * import time @@ -132,10 +135,23 @@ class Port(object): else: raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state)) - # TODO: handle syncing the streams into stream_db self.next_available_id = long(rc.data()['max_stream_id']) + 1 + # sync the streams + params = {"port_id": self.port_id} + + command = RpcCmdData("get_all_streams", params) + rc = self.transmit(command.method, command.params) + if rc.bad(): + return self.err(rc.err()) + + for k, v in rc.data()['streams'].iteritems(): + self.streams[k] = {'next_id': v['next_stream_id'], + 'pkt' : base64.b64decode(v['packet']['binary']), + 'mode' : v['mode']['type'], + 'rate' : STLStream.get_rate_from_field(v['mode']['rate'])} + return self.ok() @@ -161,55 +177,64 @@ class Port(object): return self.err("Please stop port before attempting to add streams") # listify - streams_list = copy.deepcopy(streams_list if isinstance(streams_list, list) else [streams_list]) + streams_list = streams_list if isinstance(streams_list, list) else [streams_list] lookup = {} # allocate IDs for stream in streams_list: - if stream.get_id() == None: - stream.set_id(self.__allocate_stream_id()) - lookup[stream.get_name()] = stream.get_id() + # allocate stream id + stream_id = stream.get_id() if stream.get_id() is not None else self.__allocate_stream_id() + if stream_id in self.streams: + return self.err('Stream ID: {0} already exists'.format(stream_id)) - batch = [] + # name + name = stream.get_name() if stream.get_name() is not None else id(stream) + if name in lookup: + return self.err("multiple streams with duplicate name: '{0}'".format(name)) + lookup[name] = stream_id - + batch = [] for stream in streams_list: + name = stream.get_name() if stream.get_name() is not None else id(stream) + stream_id = lookup[name] next_id = -1 + next = stream.get_next() if next: if not next in lookup: return self.err("stream dependency error - unable to find '{0}'".format(next)) next_id = lookup[next] - - stream.set_next_id(next_id) - - stream_json = stream.to_json() - stream_json['next_stream_id'] = stream.get_next_id() + stream_json['next_stream_id'] = next_id params = {"handler": self.handler, "port_id": self.port_id, - "stream_id": stream.get_id(), + "stream_id": stream_id, "stream": stream_json} cmd = RpcCmdData('add_stream', params) batch.append(cmd) - self.streams[stream.get_id()] = stream rc = self.transmit_batch(batch) - if not rc: - return self.err(str(rc)) + for i, single_rc in enumerate(rc): + if single_rc: + stream_id = batch[i].params['stream_id'] + next_id = batch[i].params['stream']['next_stream_id'] + self.streams[stream_id] = {'next_id' : next_id, + 'pkt' : streams_list[i].get_pkt(), + 'mode' : streams_list[i].get_mode(), + 'rate' : streams_list[i].get_rate()} - # the only valid state now - self.state = self.STATE_STREAMS - return self.ok() + self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE + + return self.ok() if rc else self.err(str(rc)) @@ -239,16 +264,16 @@ class Port(object): cmd = RpcCmdData('remove_stream', params) batch.append(cmd) - del self.streams[stream_id] - rc = self.transmit_batch(batch) - if not rc: - return self.err(rc.err()) + for i, single_rc in enumerate(rc): + if single_rc: + id = batch[i].params['stream_id'] + del self.streams[stream_id] self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE - return self.ok() + return self.ok() if rc else self.err(rc.err()) # remove all the streams @@ -273,6 +298,7 @@ class Port(object): return self.ok() + # get a specific stream def get_stream (self, stream_id): if stream_id in self.streams: @@ -283,6 +309,7 @@ class Port(object): def get_all_streams (self): return self.streams + # start traffic def start (self, mul, duration, force): if not self.is_acquired(): @@ -324,7 +351,6 @@ class Port(object): return self.ok() - params = {"handler": self.handler, "port_id": self.port_id} @@ -421,6 +447,7 @@ class Port(object): return self.ok() + def get_profile (self): return self.profile @@ -496,18 +523,24 @@ class Port(object): return {} data = {} - for id, stream in self.streams.iteritems(): + for id, obj in self.streams.iteritems(): + + # lazy build scapy repr. + if not 'pkt_type' in obj: + obj['pkt_type'] = CScapyTRexPktBuilder.pkt_layers_desc_from_buffer(obj['pkt']) + data[id] = OrderedDict([ ('id', id), - ('packet_type', stream.get_pkt_type()), - ('L2 len', stream.get_pkt_len()), - ('mode' , stream.get_mode()), - ('rate_pps', stream.get_pps()), - ('next_stream', stream.get_next_id()) + ('packet_type', obj['pkt_type']), + ('L2 len', len(obj['pkt']) + 4), + ('mode', obj['mode']), + ('rate', obj['rate']), + ('next_stream', obj['next_id']) ]) return {"streams" : OrderedDict(sorted(data.items())) } + ################# events handler ###################### def async_event_port_stopped (self): self.state = self.STATE_STREAMS |