diff options
Diffstat (limited to 'resources/libraries/python')
-rw-r--r-- | resources/libraries/python/NATUtil.py | 291 | ||||
-rw-r--r-- | resources/libraries/python/PapiExecutor.py | 17 |
2 files changed, 108 insertions, 200 deletions
diff --git a/resources/libraries/python/NATUtil.py b/resources/libraries/python/NATUtil.py index e08a6daf82..aeeb3bc1a5 100644 --- a/resources/libraries/python/NATUtil.py +++ b/resources/libraries/python/NATUtil.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -13,7 +13,28 @@ """NAT utilities library.""" -from resources.libraries.python.VatExecutor import VatTerminal, VatExecutor +from pprint import pformat +from socket import AF_INET, inet_pton + +from enum import IntEnum + +from robot.api import logger + +from resources.libraries.python.InterfaceUtil import InterfaceUtil +from resources.libraries.python.PapiExecutor import PapiExecutor + + +class NATConfigFlags(IntEnum): + """Common NAT plugin APIs""" + NAT_IS_NONE = 0x00 + NAT_IS_TWICE_NAT = 0x01 + NAT_IS_SELF_TWICE_NAT = 0x02 + NAT_IS_OUT2IN_ONLY = 0x04 + NAT_IS_ADDR_ONLY = 0x08 + NAT_IS_OUTSIDE = 0x10 + NAT_IS_INSIDE = 0x20 + NAT_IS_STATIC = 0x40 + NAT_IS_EXT_HOST_VALID = 0x80 class NATUtil(object): @@ -32,21 +53,33 @@ class NATUtil(object): :type node: dict :type int_in: str :type int_out: str - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If setting of inside and outside interfaces for - NAT44 fails. """ - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_set_interfaces.vat', - int_in=int_in, int_out=int_out) - return response - except: - raise RuntimeError("Setting of inside and outside interfaces for " - "NAT failed!") + cmd = 'nat44_interface_add_del_feature' + + int_in_idx = InterfaceUtil.get_sw_if_index(node, int_in) + err_msg = 'Failed to set inside interface {int} for NAT44 on host ' \ + '{host}'.format(int=int_in, host=node['host']) + args_in = dict( + sw_if_index=int_in_idx, + is_add=1, + flags=getattr(NATConfigFlags, "NAT_IS_INSIDE").value + ) + with PapiExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_replies(err_msg).\ + verify_reply(err_msg=err_msg) + + int_out_idx = InterfaceUtil.get_sw_if_index(node, int_out) + err_msg = 'Failed to set outside interface {int} for NAT44 on host ' \ + '{host}'.format(int=int_out, host=node['host']) + args_in = dict( + sw_if_index=int_out_idx, + is_add=1, + flags=getattr(NATConfigFlags, "NAT_IS_OUTSIDE").value + ) + with PapiExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_replies(err_msg). \ + verify_reply(err_msg=err_msg) @staticmethod def set_nat44_deterministic(node, ip_in, subnet_in, ip_out, subnet_out): @@ -62,200 +95,58 @@ class NATUtil(object): :type subnet_in: str or int :type ip_out: str :type subnet_out: str or int - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If setting of deterministic behaviour of NAT44 - fails. """ - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_set_deterministic.vat', - ip_in=ip_in, subnet_in=subnet_in, - ip_out=ip_out, subnet_out=subnet_out) - return response - except: - raise RuntimeError("Setting of deterministic behaviour of NAT " - "failed!") - - @staticmethod - def set_nat_workers(node, lcores): - """Set NAT workers. - - :param node: DUT node. - :param lcores: List of cores, format: range e.g. 1-5 or list of ranges - e.g.: 1-5,18-22. - :type node: dict - :type lcores: str - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If setting of NAT workers fails. - """ - - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat_set_workers.vat', lcores=lcores) - return response - except: - raise RuntimeError("Setting of NAT workers failed!") + cmd = 'nat_det_add_del_map' + err_msg = 'Failed to set deterministic behaviour of NAT on host ' \ + '{host}'.format(host=node['host']) + args_in = dict( + is_add=True, + in_addr=inet_pton(AF_INET, str(ip_in)), + in_plen=int(subnet_in), + out_addr=inet_pton(AF_INET, str(ip_out)), + out_plen=int(subnet_out) + ) + with PapiExecutor(node) as papi_exec: + papi_exec.add(cmd, **args_in).get_replies(err_msg). \ + verify_reply(err_msg=err_msg) @staticmethod def show_nat(node): - """Show the NAT settings. - - :param node: DUT node. - :type node: dict - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If getting of NAT settings fails. - """ - - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat_show_nat.vat') - return response - except: - raise RuntimeError("Getting of NAT settings failed!") - - @staticmethod - def show_nat44_deterministic_forward(node, ip_addr): - """Show forward IP address and port(s). - - :param node: DUT node. - :param ip_addr: IP address. - :type node: dict - :type ip_addr: str - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If command 'exec snat deterministic forward' - fails. - """ + """Show the NAT configuration and data. - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_deterministic_forward.vat', ip=ip_addr) - return response - except: - raise RuntimeError("Command 'exec nat44 deterministic forward {ip}'" - " failed!".format(ip=ip_addr)) + Used data sources: - @staticmethod - def show_nat44_deterministic_reverse(node, ip_addr, port): - """Show reverse IP address. + nat_show_config + nat_worker_dump + nat44_interface_addr_dump + nat44_address_dump + nat44_static_mapping_dump + nat44_user_dump + nat44_interface_dump + nat44_user_session_dump + nat_det_map_dump :param node: DUT node. - :param ip_addr: IP address. - :param port: Port. :type node: dict - :type ip_addr: str - :type port: str or int - :returns: Response of the command. - :rtype: str - :raises RuntimeError: If command 'exec snat deterministic reverse' - fails. """ - try: - with VatTerminal(node, json_param=False) as vat: - response = vat.vat_terminal_exec_cmd_from_template( - 'nat/nat44_deterministic_reverse.vat', - ip=ip_addr, port=port) - return response - except: - raise RuntimeError( - "Command 'exec nat44 deterministic reverse {ip}:{port}'" - " failed!".format(ip=ip_addr, port=port)) - - @staticmethod - def get_nat_static_mappings(node): - """Get NAT static mappings from VPP node. - - :param node: VPP node. - :type node: dict - :returns: List of static mappings. - :rtype: list - :raises RuntimeError: If the output is not as expected. - """ - - vat = VatExecutor() - # JSON output not supported for this command - vat.execute_script('nat/snat_mapping_dump.vat', node, json_out=False) - - stdout = vat.get_script_stdout() - lines = stdout.split("\n") - - data = [] - # lines[0,1] are table and column headers - for line in lines[2::]: - # Ignore extra data after NAT table - if "snat_static_mapping_dump error: Misc" in line or "vat#" in line: - continue - items = line.split(" ") - while "" in items: - items.remove("") - if not items: - continue - if len(items) == 4: - # no ports were returned - data.append({ - "local_address": items[0], - "remote_address": items[1], - "vrf": items[2], - "protocol": items[3] - }) - elif len(items) == 6: - data.append({ - "local_address": items[0], - "local_port": items[1], - "remote_address": items[2], - "remote_port": items[3], - "vrf": items[4], - "protocol": items[5] - }) - else: - raise RuntimeError("Unexpected output from snat_mapping_dump.") - - return data - - @staticmethod - def get_nat_interfaces(node): - """Get list of interfaces configured with NAT from VPP node. - - :param node: VPP node. - :type node: dict - :returns: List of interfaces on the node that are configured with NAT. - :rtype: list - :raises RuntimeError: If the output is not as expected. - """ - - vat = VatExecutor() - # JSON output not supported for this command - vat.execute_script('nat/snat_interface_dump.vat', node, - json_out=False) - - stdout = vat.get_script_stdout() - lines = stdout.split("\n") - - data = [] - for line in lines: - items = line.split(" ") - for trash in ("", "vat#"): - while trash in items: - items.remove(trash) - if not items: - continue - if len(items) == 3: - data.append({ - # items[0] is the table header - "sw_if_index" - "sw_if_index": items[1], - "direction": items[2] - }) - else: - raise RuntimeError( - "Unexpected output from snat_interface_dump.") - - return data + cmd = 'nat_show_config' + err_msg = 'Failed to get NAT configuration on host {host}'.\ + format(host=node['host']) + with PapiExecutor(node) as papi_exec: + data = papi_exec.add(cmd).get_replies(err_msg).\ + verify_reply(err_msg=err_msg) + logger.debug("NAT Configuration:\n{data}".format(data=pformat(data))) + + cmds = [ + "nat_worker_dump", + "nat44_interface_addr_dump", + "nat44_address_dump", + "nat44_static_mapping_dump", + "nat44_user_dump", + "nat44_interface_dump", + "nat44_user_session_dump", + "nat_det_map_dump" + ] + PapiExecutor.dump_and_log(node, cmds) diff --git a/resources/libraries/python/PapiExecutor.py b/resources/libraries/python/PapiExecutor.py index c2f966fb6d..98eb59cae7 100644 --- a/resources/libraries/python/PapiExecutor.py +++ b/resources/libraries/python/PapiExecutor.py @@ -17,6 +17,8 @@ import binascii import json +from pprint import pformat + from robot.api import logger from resources.libraries.python.Constants import Constants @@ -366,6 +368,21 @@ class PapiExecutor(object): ignore_errors=ignore_errors, err_msg=err_msg, timeout=timeout) @staticmethod + def dump_and_log(node, cmds): + """Dump and log requested information. + + :param node: DUT node. + :param cmds: Dump commands to be executed. + :type node: dict + :type cmds: list + """ + with PapiExecutor(node) as papi_exec: + for cmd in cmds: + dump = papi_exec.add(cmd).get_dump() + logger.debug("{cmd}:\n{data}".format( + cmd=cmd, data=pformat(dump.reply[0]["api_reply"]))) + + @staticmethod def run_cli_cmd(node, cmd, log=True): """Run a CLI command. |