# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Performance testing traffic generator library.""" import math import time from robot.api import logger from robot.libraries.BuiltIn import BuiltIn from .Constants import Constants from .CpuUtils import CpuUtils from .DropRateSearch import DropRateSearch from .MLRsearch.AbstractMeasurer import AbstractMeasurer from .MLRsearch.MultipleLossRatioSearch import MultipleLossRatioSearch from .MLRsearch.ReceiveRateMeasurement import ReceiveRateMeasurement from .PLRsearch.PLRsearch import PLRsearch from .OptionString import OptionString from .ssh import exec_cmd_no_error, exec_cmd from .topology import NodeType from .topology import NodeSubTypeTG from .topology import Topology __all__ = [u"TGDropRateSearchImpl", u"TrafficGenerator", u"OptimizedSearch"] def check_subtype(node): """Return supported subtype of given node, or raise an exception. Currently only one subtype is supported, but we want our code to be ready for other ones. :param node: Topology node to check. Can be None. :type node: dict or NoneType :returns: Subtype detected. :rtype: NodeSubTypeTG :raises RuntimeError: If node is not supported, message explains how. """ if node.get(u"type") is None: msg = u"Node type is not defined" elif node[u"type"] != NodeType.TG: msg = f"Node type is {node[u'type']!r}, not a TG" elif node.get(u"subtype") is None: msg = u"TG subtype is not defined" elif node[u"subtype"] != NodeSubTypeTG.TREX: msg = f"TG subtype {node[u'subtype']!r} is not supported" else: return NodeSubTypeTG.TREX raise RuntimeError(msg) class TGDropRateSearchImpl(DropRateSearch): """Drop Rate Search implementation.""" # def __init__(self): # super(TGDropRateSearchImpl, self).__init__() def measure_loss( self, rate, frame_size, loss_acceptance, loss_acceptance_type, traffic_profile): """Runs the traffic and evaluate the measured results. :param rate: Offered traffic load. :param frame_size: Size of frame. :param loss_acceptance: Permitted drop ratio or frames count. :param loss_acceptance_type: Type of permitted loss. :param traffic_profile: Module name as a traffic profile identifier. See GPL/traffic_profiles/trex for implemented modules. :type rate: float :type frame_size: str :type loss_acceptance: float :type loss_acceptance_type: LossAcceptanceType :type traffic_profile: str :returns: Drop threshold exceeded? (True/False) :rtype: bool :raises NotImplementedError: If TG is not supported. :raises RuntimeError: If TG is not specified. """ # we need instance of TrafficGenerator instantiated by Robot Framework # to be able to use trex_stl-*() tg_instance = BuiltIn().get_library_instance( u"resources.libraries.python.TrafficGenerator" ) subtype = check_subtype(tg_instance.node) if subtype == NodeSubTypeTG.TREX: unit_rate = str(rate) + self.get_rate_type_str() tg_instance.trex_stl_start_remote_exec( self.get_duration(), unit_rate, frame_size, traffic_profile ) loss = tg_instance.get_loss() sent = tg_instance.get_sent() if self.loss_acceptance_type_is_percentage(): loss = (float(loss) / float(sent)) * 100 logger.trace( f"comparing: {loss} < {loss_acceptance} {loss_acceptance_type}" ) return float(loss) <= float(loss_acceptance) return False def get_latency(self): """Returns min/avg/max latency. :returns: Latency stats. :rtype: list """ tg_instance = BuiltIn().get_library_instance( u"resources.libraries.python.TrafficGenerator" ) return tg_instance.get_latency_int() class TrexMode: """Defines mode of T-Rex traffic generator.""" # Advanced stateful mode ASTF = u"ASTF" # Stateless mode STL = u"STL" # TODO: Pylint says too-many-instance-attributes. class TrafficGenerator(AbstractMeasurer): """Traffic Generator.""" # TODO: Remove "trex" from lines which could work with other TGs. # Use one instance of TrafficGenerator for all tests in test suite ROBOT_LIBRARY_SCOPE = u"TEST SUITE" def __init__(self): # TODO: Separate into few dataclasses/dicts. # Pylint dislikes large unstructured state, and it is right. self._node = None self._mode = None # TG interface order mapping self._ifaces_reordered = False # Result holding fields, to be removed. self._result = None self._loss = None self._sent = None self._latency = None self._received = None self._approximated_rate = None self._approximated_duration = None self._l7_data = None # Measurement input fields, needed for async stop result. self._start_time = None self._stop_time = None self._rate = None self._target_duration = None self._duration = None # Other input parameters, not knowable from measure() signature. self.frame_size = None self.traffic_profile = None self.traffic_directions = None self.negative_loss = None self.use_latency = None self.ppta = None self.resetter = None self.transaction_scale = None self.transaction_duration = None self.sleep_till_duration = None self.transaction_type = None self.duration_limit = None self.ramp_up_start = None self.ramp_up_stop = None self.ramp_up_rate = None self.ramp_up_duration = None self.state_timeout = None # Transient data needed for async measurements. self._xstats = (None, None) # TODO: Rename "xstats" to something opaque, so T-Rex is not privileged? @property def node(self): """Getter. :returns: Traffic generator node. :rtype: dict """ return self._node def get_loss(self): """Return number of lost packets. :returns: Number of lost packets. :rtype: str """ return self._loss def get_sent(self): """Return number of sent packets. :returns: Number of sent packets. :rtype: str """ return self._sent def get_received(self): """Return number of received packets. :returns: Number of received packets. :rtype: str """ return self._received def get_latency_int(self): """Return rounded min/avg/max latency. :returns: Latency stats. :rtype: list """ return self._latency def get_approximated_rate(self): """Return approximated rate computed as ratio of transmitted packets over duration of trial. :returns: Approximated rate. :rtype: str """ return self._approximated_rate def get_l7_data(self): """Return L7 data. :returns: Number of received packets. :rtype: dict """ return self._l7_data def check_mode(self, expected_mode): """Check TG mode. :param expected_mode: Expected traffic generator mode. :type expected_mode: object :raises RuntimeError: In case of unexpected TG mode. """ if self._mode == expected_mode: return raise RuntimeError( f"{self._node[u'subtype']} not running in {expected_mode} mode!" ) # TODO: pylint says disable=too-many-locals. def initialize_traffic_generator( self, tg_node, tg_if1, tg_if2, tg_if1_adj_node, tg_if1_adj_if, tg_if2_adj_node, tg_if2_adj_if, osi_layer, tg_if1_dst_mac=None, tg_if2_dst_mac=None): """TG initialization. TODO: Document why do we need (and how do we use) _ifaces_reordered. :param tg_node: Traffic generator node. :param tg_if1: TG - name of first interface. :param tg_if2: TG - name of second interface. :param tg_if1_adj_node: TG if1 adjecent node. :param tg_if1_adj_if: TG if1 adjecent interface. :param tg_if2_adj_node: TG if2 adjecent node. :param tg_if2_adj_if: TG if2 adjecent interface. :param osi_layer: 'L2', 'L3' or 'L7' - OSI Layer testing type. :param tg_if1_dst_mac: Interface 1 destination MAC address. :param tg_if2_dst_mac: Interface 2 destination MAC address. :type tg_node: dict :type tg_if1: str :type tg_if2: str :type tg_if1_adj_node: dict :type tg_if1_adj_if: str :type tg_if2_adj_node: dict :type tg_if2_adj_if: str :type osi_layer: str :type tg_if1_dst_mac: str :type tg_if2_dst_mac: str :returns: nothing :raises RuntimeError: In case of issue during initialization. """ subtype = check_subtype(tg_node) if subtype == NodeSubTypeTG.TREX: self._node = tg_node self._mode = TrexMode.ASTF if osi_layer == u"L7" else TrexMode.STL if1 = dict() if2 = dict() if1[u"pci"] = Topology().get_interface_pci_addr(self._node, tg_if1) if2[u"pci"] = Topology().get_interface_pci_addr(self._node, tg_if2) if1[u"addr"] = Topology().get_interface_mac(self._node, tg_if1) if2[u"addr"] = Topology().get_interface_mac(self._node, tg_if2) if osi_layer == u"L2": if1[u"adj_addr"] = if2[u"addr"] if2[u"adj_addr"] = if1[u"addr"] elif osi_layer in (u"L3", u"L7"): if1[u"adj_addr"] = Topology().get_interface_mac( tg_if1_adj_node, tg_if1_adj_if ) if2[u"adj_addr"] = Topology().get_interface_mac( tg_if2_adj_node, tg_if2_adj_if ) else: raise ValueError(u"Unknown OSI layer!") # in case of switched environment we can override MAC addresses if tg_if1_dst_mac is not None and tg_if2_dst_mac is not None: if1[u"adj_addr"] = tg_if1_dst_mac if2[u"adj_addr"] = tg_if2_dst_mac if min(if1[u"pci"], if2[u"pci"]) != if1[u"pci"]: if1, if2 = if2, if1 self._ifaces_reordered = True master_thread_id, latency_thread_id, socket, threads = \ CpuUtils.get_affinity_trex( self._node, tg_if1, tg_if2, tg_dtc=Constants.TREX_CORE_COUNT) if osi_layer in (u"L2", u"L3", u"L7"): exec_cmd_no_error( self._node, f"sh -c 'cat << EOF > /etc/trex_cfg.yaml\n" f"- version: 2\n" f" c: {len(threads)}\n" f" limit_memory: {Constants.TREX_LIMIT_MEMORY}\n" f" interfaces: [\"{if1[u'pci']}\",\"{if2[u'pci']}\"]\n" f" port_info:\n" f" - dest_mac: \'{if1[u'adj_addr']}\'\n" f" src_mac: \'{if1[u'addr']}\'\n" f" - dest_mac: \'{if2[u'adj_addr']}\'\n" f" src_mac: \'{if2[u'addr']}\'\n" f" platform :\n" f" master_thread_id: {master_thread_id}\n" f" latency_thread_id: {latency_thread_id}\n" f" dual_if:\n" f" - socket: {socket}\n" f" threads: {threads}\n" f"EOF'", sudo=True, message=u"T-Rex config generation!" ) if Constants.TREX_RX_DESCRIPTORS_COUNT != 0: exec_cmd_no_error( self._node, f"sh -c 'cat << EOF >> /etc/trex_cfg.yaml\n" f" rx_desc: {Constants.TREX_RX_DESCRIPTORS_COUNT}\n" f"EOF'", sudo=True, message=u"T-Rex rx_desc modification!" ) if Constants.TREX_TX_DESCRIPTORS_COUNT != 0: exec_cmd_no_error( self._node, f"sh -c 'cat << EOF >> /etc/trex_cfg.yaml\n" f" tx_desc: {Constants.TREX_TX_DESCRIPTORS_COUNT}\n" f"EOF'", sudo=True, message=u"T-Rex tx_desc modification!" ) else: raise ValueError(u"Unknown OSI layer!") TrafficGenerator.startup_trex( self._node, osi_layer, subtype=subtype ) @staticmethod def startup_trex(tg_node, osi_layer, subtype=None): """Startup sequence for the TRex traffic generator. :param tg_node: Traffic generator node. :param osi_layer: 'L2', 'L3' or 'L7' - OSI Layer testing type. :param subtype: Traffic generator sub-type. :type tg_node: dict :type osi_layer: str :type subtype: NodeSubTypeTG :raises RuntimeError: If T-Rex startup failed. :raises ValueError: If OSI layer is not supported. """ if not subtype: subtype = check_subtype(tg_node) if subtype == NodeSubTypeTG.TREX: for _ in range(0, 3): # Kill TRex only if it is already running. cmd = u"sh -c \"pgrep t-rex && pkill t-rex && sleep 3 || true\"" exec_cmd_no_error( tg_node, cmd, sudo=True, message=u"Kill TRex failed!" ) # Configure TRex. ports = '' for port in tg_node[u"interfaces"].values(): if u'Mellanox' not in port.get(u'model'): ports += f" {port.get(u'pci_address')}" cmd = f"sh -c \"cd {Constants.TREX_INSTALL_DIR}/scripts/ && " \ f"./dpdk_nic_bind.py -u {ports} || true\"" exec_cmd_no_error( tg_node, cmd, sudo=True, message=u"Unbind PCI ports from driver failed!" ) # Start TRex. cd_cmd = f"cd '{Constants.TREX_INSTALL_DIR}/scripts/'" trex_cmd = OptionString([u"nohup", u"./t-rex-64"]) trex_cmd.add(u"-i") trex_cmd.add(u"--prefix $(hostname)") trex_cmd.add(u"--hdrh") trex_cmd.add(u"--no-scapy-server") trex_cmd.add_if(u"--astf", osi_layer == u"L7") # OptionString does not create double space if extra is empty. trex_cmd.add(f"{Constants.TREX_EXTRA_CMDLINE}") inner_command = f"{cd_cmd} && {trex_cmd} > /tmp/trex.log 2>&1 &" cmd = f"sh -c \"{inner_command}\" > /dev/null" try: exec_cmd_no_error(tg_node, cmd, sudo=True) except RuntimeError: cmd = u"sh -c \"cat /tmp/trex.log\"" exec_cmd_no_error( tg_node, cmd, sudo=True, message=u"Get TRex logs failed!" ) raise RuntimeError(u"Start TRex failed!") # Test T-Rex API responsiveness. cmd = f"python3 {Constants.REMOTE_FW_DIR}/GPL/tools/trex/" if osi_layer in (u"L2", u"L3"): cmd += u"trex_stl_assert.py" elif osi_layer == u"L7": cmd += u"trex_astf_assert.py" else: raise ValueError(u"Unknown OSI layer!") try: exec_cmd_no_error( tg_node, cmd, sudo=True, message=u"T-Rex API is not responding!", retries=20 ) except RuntimeError: continue return # After max retries TRex is still not responding to API critical # error occurred. exec_cmd(tg_node, u"cat /tmp/trex.log", sudo=True) raise RuntimeError(u"Start T-Rex failed after multiple retries!") @staticmethod def is_trex_running(node): """Check if T-Rex is running using pidof. :param node: Traffic generator node. :type node: dict :returns: True if T-Rex is running otherwise False. :rtype: bool """ ret, _, _ = exec_cmd(node, u"pgrep t-rex", sudo=True) return bool(int(ret) == 0) @staticmethod def teardown_traffic_generator(node): """TG teardown. :param node: Traffic generator node. :type node: dict :returns: nothing :raises RuntimeError: If node type is not a TG, or if T-Rex teardown fails. """ subtype = check_subtype(node) if subtype == NodeSubTypeTG.TREX: exec_cmd_no_error( node, u"sh -c " u"\"if pgrep t-rex; then sudo pkill t-rex && sleep 3; fi\"", sudo=False, message=u"T-Rex kill failed!" ) def trex_astf_stop_remote_exec(self, node): """Execute T-Rex ASTF script on remote node over ssh to stop running traffic. Internal state is updated with measurement results. :param node: T-Rex generator node. :type node: dict :raises RuntimeError: If stop traffic script fails. """ command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" command_line.add(f"'{dirname}/trex_astf_stop.py'") command_line.change_prefix(u"--") for index, value in enumerate(self._xstats): if value is not None: value = value.replace(u"'", u"\"") command_line.add_equals(f"xstat{index}", f"'{value}'") stdout, _ = exec_cmd_no_error( node, command_line, message=u"T-Rex ASTF runtime error!" ) self._parse_traffic_results(stdout) def trex_stl_stop_remote_exec(self, node): """Execute T-Rex STL script on remote node over ssh to stop running traffic. Internal state is updated with measurement results. :param node: T-Rex generator node. :type node: dict :raises RuntimeError: If stop traffic script fails. """ command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" command_line.add(f"'{dirname}/trex_stl_stop.py'") command_line.change_prefix(u"--") for index, value in enumerate(self._xstats): if value is not None: value = value.replace(u"'", u"\"") command_line.add_equals(f"xstat{index}", f"'{value}'") stdout, _ = exec_cmd_no_error( node, command_line, message=u"T-Rex STL runtime error!" ) self._parse_traffic_results(stdout) def stop_traffic_on_tg(self): """Stop all traffic on TG. :returns: Structure containing the result of the measurement. :rtype: ReceiveRateMeasurement :raises ValueError: If TG traffic profile is not supported. """ subtype = check_subtype(self._node) if subtype != NodeSubTypeTG.TREX: raise ValueError(f"Unsupported TG subtype: {subtype!r}") if u"trex-astf" in self.traffic_profile: self.trex_astf_stop_remote_exec(self._node) elif u"trex-stl" in self.traffic_profile: self.trex_stl_stop_remote_exec(self._node) else: raise ValueError(u"Unsupported T-Rex traffic profile!") self._stop_time = time.monotonic() return self._get_measurement_result() def _compute_duration(self, duration, multiplier): """Compute duration for profile driver. The final result is influenced by transaction scale and duration limit. It is assumed a higher level function has already set those to self. The duration argument is the target value from search point of view, before the overrides are applied here. Minus one (signalling async traffic start) is kept. Completeness flag is also included. Duration limited or async trials are not considered complete for ramp-up purposes. :param duration: Time expressed in seconds for how long to send traffic. :param multiplier: Traffic rate in transactions per second. :type duration: float :type multiplier: float :returns: New duration and whether it was a complete ramp-up candidate. :rtype: float, bool """ if duration < 0.0: # Keep the async -1. return duration, False computed_duration = duration if self.transaction_scale: computed_duration = self.transaction_scale / multiplier # Log the computed duration, # so we can compare with what telemetry suggests # the real duration was. logger.debug(f"Expected duration {computed_duration}") if not self.duration_limit: return computed_duration, True limited_duration = min(computed_duration, self.duration_limit) return limited_duration, (limited_duration == computed_duration) def trex_astf_start_remote_exec( self, duration, multiplier, async_call=False): """Execute T-Rex ASTF script on remote node over ssh to start running traffic. In sync mode, measurement results are stored internally. In async mode, initial data including xstats are stored internally. This method contains the logic to compute duration as maximum time if transaction_scale is nonzero. The transaction_scale argument defines (limits) how many transactions will be started in total. As that amount of transaction can take considerable time (sometimes due to explicit delays in the profile), the real time a trial needs to finish is computed here. For now, in that case the duration argument is ignored, assuming it comes from ASTF-unaware search algorithm. The overall time a single transaction needs is given in parameter transaction_duration, it includes both explicit delays and implicit time it takes to transfer data (or whatever the transaction does). Currently it is observed TRex does not start the ASTF traffic immediately, an ad-hoc constant is added to the computed duration to compensate for that. If transaction_scale is zero, duration is not recomputed. It is assumed the subsequent result parsing gets the real duration if the traffic stops sooner for any reason. Currently, it is assumed traffic profile defines a single transaction. To avoid heavy logic here, the input rate is expected to be in transactions per second, as that directly translates to TRex multiplier, (assuming the profile does not override the default cps value of one). :param duration: Time expressed in seconds for how long to send traffic. :param multiplier: Traffic rate in transactions per second. :param async_call: If enabled then don't wait for all incoming traffic. :type duration: float :type multiplier: int :type async_call: bool :raises RuntimeError: In case of T-Rex driver issue. """ self.check_mode(TrexMode.ASTF) p_0, p_1 = (1, 0) if self._ifaces_reordered else (0, 1) if not isinstance(duration, (float, int)): duration = float(duration) # TODO: Refactor the code so duration is computed only once, # and both the initial and the computed durations are logged. computed_duration, _ = self._compute_duration(duration, multiplier) command_line = OptionString().add(u"python3") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/tools/trex" command_line.add(f"'{dirname}/trex_astf_profile.py'") command_line.change_prefix(u"--") dirname = f"{Constants.REMOTE_FW_DIR}/GPL/traffic_profiles/trex" command_line.add_with_value( u"profile", f