diff options
author | 2016-12-21 17:49:38 +0200 | |
---|---|---|
committer | 2016-12-21 17:52:14 +0200 | |
commit | 1f405257ba6caed845551b0641de914281ecfeba (patch) | |
tree | ed941041bd5d216d73566a07b1869f70b95df5ec /scripts/automation/trex_control_plane/stl/trex_stl_lib | |
parent | c77174ade8d36f377cfa74da4c487f04988a9679 (diff) |
RX services - general API to allow addition of new features
see trex_stl_lib/rx_services/trex_stl_rx_service_api.py
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib')
8 files changed, 376 insertions, 267 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/__init__.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/__init__.py diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_api.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_api.py new file mode 100644 index 00000000..b0904382 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_api.py @@ -0,0 +1,199 @@ + +import time + +# a generic abstract class for implementing RX services using the server +class RXServiceAPI(object): + + # specify for which layer this service is + LAYER_MODE_ANY = 0 + LAYER_MODE_L2 = 1 + LAYER_MODE_L3 = 2 + + def __init__ (self, port, layer_mode = LAYER_MODE_ANY, queue_size = 100): + self.port = port + self.queue_size = queue_size + self.layer_mode = layer_mode + + ################### virtual methods ###################### + + def get_name (self): + """ + returns the name of the service + + :returns: + str + + """ + + raise NotImplementedError() + + def pre_execute (self): + """ + specific class code called before executing + + :returns: + RC object + + """ + raise NotImplementedError() + + def generate_request (self): + """ + generate a request to be sent to the server + + :returns: + list of streams + + """ + raise NotImplementedError() + + def on_pkt_rx (self, pkt, start_ts): + """ + called for each packet arriving on RX + + :parameters: + 'pkt' - the packet received + 'start_ts' - the time recorded when 'start' was called + + :returns: + None for fetching more packets + RC object for terminating + + + + """ + raise NotImplementedError() + + + def on_timeout_err (self, retries): + """ + called when a timeout occurs + + :parameters: + retries - how many times was the service retring before failing + + :returns: + RC object + + """ + raise NotImplementedError() + + + ##################### API ###################### + def execute (self, retries = 0): + + # sanity check + rc = self.__sanity() + if not rc: + return rc + + # first cleanup + rc = self.port.remove_all_streams() + if not rc: + return rc + + + # start the iteration + try: + + # add the stream(s) + self.port.add_streams(self.generate_request()) + rc = self.port.set_rx_queue(size = self.queue_size) + if not rc: + return rc + + return self.__execute_internal(retries) + + finally: + # best effort restore + self.port.remove_rx_queue() + self.port.remove_all_streams() + + + ##################### Internal ###################### + def __sanity (self): + if not self.port.is_service_mode_on(): + return self.port.err('port service mode must be enabled for performing {0}. Please enable service mode'.format(self.get_name())) + + if self.layer_mode == RXServiceAPI.LAYER_MODE_L2: + if not self.port.is_l2_mode(): + return self.port.err('{0} - requires L2 mode configuration'.format(self.get_name())) + + elif self.layer_mode == RXServiceAPI.LAYER_MODE_L3: + if not self.port.is_l3_mode(): + return self.port.err('{0} - requires L3 mode configuration'.format(self.get_name())) + + + # sanity + if self.port.is_active(): + return self.port.err('{0} - port is active, please stop the port before executing command'.format(self.get_name())) + + # call the specific class implementation + rc = self.pre_execute() + if not rc: + return rc + + return True + + + # main resolve function + def __execute_internal (self, retries): + + # retry for 'retries' + index = 0 + while True: + rc = self.execute_iteration() + if rc is not None: + return rc + + if index >= retries: + return self.on_timeout_err(retries) + + index += 1 + time.sleep(0.1) + + + + def execute_iteration (self): + + mult = {'op': 'abs', 'type' : 'percentage', 'value': 100} + rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff) + if not rc: + return rc + + # save the start timestamp + self.start_ts = rc.data()['ts'] + + # block until traffic finishes + while self.port.is_active(): + time.sleep(0.01) + + return self.wait_for_rx_response() + + + def wait_for_rx_response (self): + + # we try to fetch response for 5 times + polling = 5 + + while polling > 0: + + # fetch the queue + rx_pkts = self.port.get_rx_queue_pkts() + + # might be an error + if not rx_pkts: + return rx_pkts + + # for each packet - examine it + for pkt in rx_pkts.data(): + rc = self.on_pkt_rx(pkt, self.start_ts) + if rc is not None: + return rc + + if polling == 0: + return None + + polling -= 1 + time.sleep(0.1) + diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_arp.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_arp.py new file mode 100644 index 00000000..2c159313 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_arp.py @@ -0,0 +1,57 @@ +from .trex_stl_rx_service_api import RXServiceAPI + +from ..trex_stl_streams import STLStream, STLTXSingleBurst +from ..trex_stl_packet_builder_scapy import STLPktBuilder + +from scapy.layers.l2 import Ether, ARP + + +class RXServiceARP(RXServiceAPI): + + def __init__ (self, port_id): + super(RXServiceARP, self).__init__(port_id, layer_mode = RXServiceAPI.LAYER_MODE_L3) + + def get_name (self): + return "ARP" + + def pre_execute (self): + + self.dst = self.port.get_dst_addr() + self.src = self.port.get_src_addr() + + return self.port.ok() + + # return a list of streams for request + def generate_request (self): + + base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = self.src['ipv4'], pdst = self.dst['ipv4'], hwsrc = self.src['mac']) + s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) + + return [s1] + + + def on_pkt_rx (self, pkt, start_ts): + # convert to scapy + scapy_pkt = Ether(pkt['binary']) + + # if not ARP wait for the next one + if not 'ARP' in scapy_pkt: + return None + + arp = scapy_pkt['ARP'] + + # check this is the right ARP (ARP reply with the address) + if (arp.op != 2) or (arp.psrc != self.dst['ipv4']): + return None + + + return self.port.ok({'psrc' : arp.psrc, 'hwsrc': arp.hwsrc}) + + #return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc)) + + + def on_timeout_err (self, retries): + return self.port.err('failed to receive ARP response ({0} retries)'.format(retries)) + + + diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py new file mode 100644 index 00000000..486cd458 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/rx_services/trex_stl_rx_service_icmp.py @@ -0,0 +1,85 @@ +from .trex_stl_rx_service_api import RXServiceAPI + +from ..trex_stl_streams import STLStream, STLTXSingleBurst +from ..trex_stl_packet_builder_scapy import STLPktBuilder + +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, ICMP + + +class RXServiceICMP(RXServiceAPI): + + def __init__ (self, port, ping_ip, pkt_size): + + super(RXServiceICMP, self).__init__(port, layer_mode = RXServiceAPI.LAYER_MODE_L3) + self.ping_ip = ping_ip + self.pkt_size = pkt_size + + def get_name (self): + return "PING" + + def pre_execute (self): + + if not self.port.is_resolved(): + return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address') + + self.src = self.port.get_src_addr() + self.dst = self.port.get_dst_addr() + + + return self.port.ok() + + + # return a list of streams for request + def generate_request (self): + + base_pkt = Ether(dst = self.dst['mac'])/IP(src = self.src['ipv4'], dst = self.ping_ip)/ICMP(type = 8) + pad = max(0, self.pkt_size - len(base_pkt)) + + base_pkt = base_pkt / (pad * 'x') + + s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) + + self.base_pkt = base_pkt + + return [s1] + + def on_pkt_rx (self, pkt, start_ts): + + scapy_pkt = Ether(pkt['binary']) + if not 'ICMP' in scapy_pkt: + return None + + ip = scapy_pkt['IP'] + if ip.dst != self.src['ipv4']: + return None + + icmp = scapy_pkt['ICMP'] + + dt = pkt['ts'] - start_ts + + # echo reply + if icmp.type == 0: + # check seq + if icmp.seq != self.base_pkt['ICMP'].seq: + return None + return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl)) + + # unreachable + elif icmp.type == 3: + # check seq + if icmp.payload.seq != self.base_pkt['ICMP'].seq: + return None + return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src)) + + else: + # skip any other types + #scapy_pkt.show2() + return None + + + + # return the str of a timeout err + def on_timeout_err (self, retries): + return self.port.ok('Request timed out.') + diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 946c79dc..f86fff26 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -2991,19 +2991,20 @@ class STLClient(object): ports = self._validate_port_list(ports) self.logger.pre_cmd('Resolving destination on port(s) {0}:'.format(ports)) - with self.logger.supress(): + + with self.logger.supress(level = LoggerApi.VERBOSE_REGULAR_SYNC): rc = self.__resolve(ports, retries) self.logger.post_cmd(rc) - + + if verbose: + for x in filter(bool, rc.data()): + self.logger.log(format_text("{0}".format(x), 'bold')) + if not rc: raise STLError(rc) - # print the ARP transaction - if verbose: - self.logger.log(rc) - self.logger.log('') - + @__api_check(True) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 3fe4c198..d4275cb1 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -4,7 +4,10 @@ from collections import namedtuple, OrderedDict from .trex_stl_packet_builder_scapy import STLPktBuilder from .trex_stl_streams import STLStream from .trex_stl_types import * -from .trex_stl_rx_features import ARPResolver, PingResolver + +from .rx_services.trex_stl_rx_service_arp import RXServiceARP +from .rx_services.trex_stl_rx_service_icmp import RXServiceICMP + from . import trex_stl_stats from .utils.constants import FLOW_CTRL_DICT_REVERSED @@ -953,17 +956,32 @@ class Port(object): @writeable def arp_resolve (self, retries): - if not self.is_service_mode_on(): - return self.err('port service mode must be enabled for performing ARP resolution. Please enable service mode') + + # execute the ARP service + rc = RXServiceARP(self).execute(retries) + if not rc: + return rc - return ARPResolver(self).resolve(retries) + # fetch the data returned + arp_rc = rc.data() + + # first invalidate current ARP if exists + rc = self.invalidate_arp() + if not rc: + return rc + + # update the port with L3 full configuration + rc = self.set_l3_mode(self.get_src_addr()['ipv4'], self.get_dst_addr()['ipv4'], arp_rc['hwsrc']) + if not rc: + return rc + + return self.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port_id, arp_rc['psrc'], arp_rc['hwsrc'])) + + @writeable def ping (self, ping_ipv4, pkt_size): - if not self.is_service_mode_on(): - return self.err('port service mode must be enabled for performing ping. Please enable service mode') - - return PingResolver(self, ping_ipv4, pkt_size).resolve() + return RXServiceICMP(self, ping_ipv4, pkt_size).execute() ################# stats handler ###################### diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py deleted file mode 100644 index 727451e6..00000000 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_rx_features.py +++ /dev/null @@ -1,251 +0,0 @@ - -from .trex_stl_streams import STLStream, STLTXSingleBurst -from .trex_stl_packet_builder_scapy import STLPktBuilder - -from scapy.layers.l2 import Ether, ARP -from scapy.layers.inet import IP, ICMP - -import time - -# a generic abstract class for resolving using the server -class Resolver(object): - def __init__ (self, port, queue_size = 100): - self.port = port - - # code to execute before sending any request - return RC object - def pre_send (self): - raise NotImplementedError() - - # return a list of streams for request - def generate_request (self): - raise NotImplementedError() - - # return None for more packets otherwise RC object - def on_pkt_rx (self, pkt): - raise NotImplementedError() - - # return value in case of timeout - def on_timeout_err (self, retries): - raise NotImplementedError() - - ##################### API ###################### - def resolve (self, retries = 0): - - # first cleanup - rc = self.port.remove_all_streams() - if not rc: - return rc - - # call the specific class implementation - rc = self.pre_send() - if not rc: - return rc - - # start the iteration - try: - - # add the stream(s) - self.port.add_streams(self.generate_request()) - rc = self.port.set_rx_queue(size = 100) - if not rc: - return rc - - return self.resolve_wrapper(retries) - - finally: - # best effort restore - self.port.remove_rx_queue() - self.port.remove_all_streams() - - - # main resolve function - def resolve_wrapper (self, retries): - - # retry for 'retries' - index = 0 - while True: - rc = self.resolve_iteration() - if rc is not None: - return rc - - if index >= retries: - return self.on_timeout_err(retries) - - index += 1 - time.sleep(0.1) - - - - def resolve_iteration (self): - - mult = {'op': 'abs', 'type' : 'percentage', 'value': 100} - rc = self.port.start(mul = mult, force = False, duration = -1, mask = 0xffffffff) - if not rc: - return rc - - # save the start timestamp - self.start_ts = rc.data()['ts'] - - # block until traffic finishes - while self.port.is_active(): - time.sleep(0.01) - - return self.wait_for_rx_response() - - - def wait_for_rx_response (self): - - # we try to fetch response for 5 times - polling = 5 - - while polling > 0: - - # fetch the queue - rx_pkts = self.port.get_rx_queue_pkts() - - # might be an error - if not rx_pkts: - return rx_pkts - - # for each packet - examine it - for pkt in rx_pkts.data(): - rc = self.on_pkt_rx(pkt) - if rc is not None: - return rc - - if polling == 0: - return None - - polling -= 1 - time.sleep(0.1) - - - - - -class ARPResolver(Resolver): - def __init__ (self, port_id): - super(ARPResolver, self).__init__(port_id) - - # before resolve - def pre_send (self): - - if not self.port.is_l3_mode(): - return self.port.err("arp - port is not configured as L3 layer") - - self.dst = self.port.get_dst_addr() - self.src = self.port.get_src_addr() - - return self.port.invalidate_arp() - - - # return a list of streams for request - def generate_request (self): - - base_pkt = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(psrc = self.src['ipv4'], pdst = self.dst['ipv4'], hwsrc = self.src['mac']) - s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) - - return [s1] - - - # return None in case more packets are needed else the status rc - def on_pkt_rx (self, pkt): - scapy_pkt = Ether(pkt['binary']) - if not 'ARP' in scapy_pkt: - return None - - arp = scapy_pkt['ARP'] - - # check this is the right ARP (ARP reply with the address) - if (arp.op != 2) or (arp.psrc != self.dst['ipv4']): - return None - - - # update the port with L3 full configuration - rc = self.port.set_l3_mode(self.src['ipv4'], self.dst['ipv4'], arp.hwsrc) - if not rc: - return rc - - return self.port.ok('Port {0} - Recieved ARP reply from: {1}, hw: {2}'.format(self.port.port_id, arp.psrc, arp.hwsrc)) - - - def on_timeout_err (self, retries): - return self.port.err('failed to receive ARP response ({0} retries)'.format(retries)) - - - - - #################### ping resolver #################### - -class PingResolver(Resolver): - def __init__ (self, port, ping_ip, pkt_size): - super(PingResolver, self).__init__(port) - self.ping_ip = ping_ip - self.pkt_size = pkt_size - - def pre_send (self): - if not self.port.is_l3_mode(): - return self.port.err('ping - port is not configured as L3 layer') - - if not self.port.is_resolved(): - return self.port.err('ping - port has an unresolved destination, cannot determine next hop MAC address') - - self.src = self.port.get_src_addr() - self.dst = self.port.get_dst_addr() - - - return self.port.ok() - - - # return a list of streams for request - def generate_request (self): - - base_pkt = Ether(dst = self.dst['mac'])/IP(src = self.src['ipv4'], dst = self.ping_ip)/ICMP(type = 8) - pad = max(0, self.pkt_size - len(base_pkt)) - - base_pkt = base_pkt / (pad * 'x') - - s1 = STLStream( packet = STLPktBuilder(pkt = base_pkt), mode = STLTXSingleBurst(total_pkts = 1) ) - - self.base_pkt = base_pkt - - return [s1] - - # return None for more packets otherwise RC object - def on_pkt_rx (self, pkt): - scapy_pkt = Ether(pkt['binary']) - if not 'ICMP' in scapy_pkt: - return None - - ip = scapy_pkt['IP'] - if ip.dst != self.src['ipv4']: - return None - - icmp = scapy_pkt['ICMP'] - - dt = pkt['ts'] - self.start_ts - - # echo reply - if icmp.type == 0: - # check seq - if icmp.seq != self.base_pkt['ICMP'].seq: - return None - return self.port.ok('Reply from {0}: bytes={1}, time={2:.2f}ms, TTL={3}'.format(ip.src, len(pkt['binary']), dt * 1000, ip.ttl)) - - # unreachable - elif icmp.type == 3: - # check seq - if icmp.payload.seq != self.base_pkt['ICMP'].seq: - return None - return self.port.ok('Reply from {0}: Destination host unreachable'.format(icmp.src)) - - else: - # skip any other types - #scapy_pkt.show2() - return None - - - - # return the str of a timeout err - def on_timeout_err (self, retries): - return self.port.ok('Request timed out.') diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py index a60a7ede..0230db23 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py @@ -64,7 +64,7 @@ class RC(): err_count += 1 if len(err_list) < show_count: err_list.append(format_text(x, 'bold')) - s = '\n' if len(err_list) > 1 else '' + s = '\n' if err_count > show_count: s += format_text('Occurred %s errors, showing first %s:\n' % (err_count, show_count), 'bold') s += '\n'.join(err_list) |