summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py')
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py95
1 files changed, 64 insertions, 31 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index 29bad041..4dd07a13 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -1,6 +1,9 @@
from collections import namedtuple, OrderedDict
+from trex_stl_packet_builder_scapy import CScapyTRexPktBuilder
+from trex_stl_streams import STLStream
+import base64
import trex_stl_stats
from trex_stl_types import *
import time
@@ -132,10 +135,23 @@ class Port(object):
else:
raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state))
- # TODO: handle syncing the streams into stream_db
self.next_available_id = long(rc.data()['max_stream_id']) + 1
+ # sync the streams
+ params = {"port_id": self.port_id}
+
+ command = RpcCmdData("get_all_streams", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.bad():
+ return self.err(rc.err())
+
+ for k, v in rc.data()['streams'].iteritems():
+ self.streams[k] = {'next_id': v['next_stream_id'],
+ 'pkt' : base64.b64decode(v['packet']['binary']),
+ 'mode' : v['mode']['type'],
+ 'rate' : STLStream.get_rate_from_field(v['mode']['rate'])}
+
return self.ok()
@@ -161,55 +177,64 @@ class Port(object):
return self.err("Please stop port before attempting to add streams")
# listify
- streams_list = copy.deepcopy(streams_list if isinstance(streams_list, list) else [streams_list])
+ streams_list = streams_list if isinstance(streams_list, list) else [streams_list]
lookup = {}
# allocate IDs
for stream in streams_list:
- if stream.get_id() == None:
- stream.set_id(self.__allocate_stream_id())
- lookup[stream.get_name()] = stream.get_id()
+ # allocate stream id
+ stream_id = stream.get_id() if stream.get_id() is not None else self.__allocate_stream_id()
+ if stream_id in self.streams:
+ return self.err('Stream ID: {0} already exists'.format(stream_id))
- batch = []
+ # name
+ name = stream.get_name() if stream.get_name() is not None else id(stream)
+ if name in lookup:
+ return self.err("multiple streams with duplicate name: '{0}'".format(name))
+ lookup[name] = stream_id
-
+ batch = []
for stream in streams_list:
+ name = stream.get_name() if stream.get_name() is not None else id(stream)
+ stream_id = lookup[name]
next_id = -1
+
next = stream.get_next()
if next:
if not next in lookup:
return self.err("stream dependency error - unable to find '{0}'".format(next))
next_id = lookup[next]
-
- stream.set_next_id(next_id)
-
-
stream_json = stream.to_json()
- stream_json['next_stream_id'] = stream.get_next_id()
+ stream_json['next_stream_id'] = next_id
params = {"handler": self.handler,
"port_id": self.port_id,
- "stream_id": stream.get_id(),
+ "stream_id": stream_id,
"stream": stream_json}
cmd = RpcCmdData('add_stream', params)
batch.append(cmd)
- self.streams[stream.get_id()] = stream
rc = self.transmit_batch(batch)
- if not rc:
- return self.err(str(rc))
+ for i, single_rc in enumerate(rc):
+ if single_rc:
+ stream_id = batch[i].params['stream_id']
+ next_id = batch[i].params['stream']['next_stream_id']
+ self.streams[stream_id] = {'next_id' : next_id,
+ 'pkt' : streams_list[i].get_pkt(),
+ 'mode' : streams_list[i].get_mode(),
+ 'rate' : streams_list[i].get_rate()}
- # the only valid state now
- self.state = self.STATE_STREAMS
- return self.ok()
+ self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
+
+ return self.ok() if rc else self.err(str(rc))
@@ -239,16 +264,16 @@ class Port(object):
cmd = RpcCmdData('remove_stream', params)
batch.append(cmd)
- del self.streams[stream_id]
-
rc = self.transmit_batch(batch)
- if not rc:
- return self.err(rc.err())
+ for i, single_rc in enumerate(rc):
+ if single_rc:
+ id = batch[i].params['stream_id']
+ del self.streams[stream_id]
self.state = self.STATE_STREAMS if (len(self.streams) > 0) else self.STATE_IDLE
- return self.ok()
+ return self.ok() if rc else self.err(rc.err())
# remove all the streams
@@ -273,6 +298,7 @@ class Port(object):
return self.ok()
+
# get a specific stream
def get_stream (self, stream_id):
if stream_id in self.streams:
@@ -283,6 +309,7 @@ class Port(object):
def get_all_streams (self):
return self.streams
+
# start traffic
def start (self, mul, duration, force):
if not self.is_acquired():
@@ -324,7 +351,6 @@ class Port(object):
return self.ok()
-
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -421,6 +447,7 @@ class Port(object):
return self.ok()
+
def get_profile (self):
return self.profile
@@ -496,18 +523,24 @@ class Port(object):
return {}
data = {}
- for id, stream in self.streams.iteritems():
+ for id, obj in self.streams.iteritems():
+
+ # lazy build scapy repr.
+ if not 'pkt_type' in obj:
+ obj['pkt_type'] = CScapyTRexPktBuilder.pkt_layers_desc_from_buffer(obj['pkt'])
+
data[id] = OrderedDict([ ('id', id),
- ('packet_type', stream.get_pkt_type()),
- ('L2 len', stream.get_pkt_len()),
- ('mode' , stream.get_mode()),
- ('rate_pps', stream.get_pps()),
- ('next_stream', stream.get_next_id())
+ ('packet_type', obj['pkt_type']),
+ ('L2 len', len(obj['pkt']) + 4),
+ ('mode', obj['mode']),
+ ('rate', obj['rate']),
+ ('next_stream', obj['next_id'])
])
return {"streams" : OrderedDict(sorted(data.items())) }
+
################# events handler ######################
def async_event_port_stopped (self):
self.state = self.STATE_STREAMS