aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_srv6_as.py
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2022-02-18 08:49:43 +0000
committerFlorin Coras <florin.coras@gmail.com>2022-02-24 19:21:20 +0000
commit447e51d4e4ed08df7f370b150d851842bcafa957 (patch)
tree8e29fd14bda4e0e1ec91ffbaa87b79c71ff36fed /test/test_srv6_as.py
parent26cd0242c95025e0d644db3a80dfe8dee83b6d7a (diff)
session: fix session layer socket read
This fixes an issue caused by session layer reading expected part of data (cert + key) before the client actually sends it. Type: fix Signed-off-by: Filip Tehlar <ftehlar@cisco.com> Change-Id: I6ddddb08f9576211b302e814d7c2b040383e5fb7
Diffstat (limited to 'test/test_srv6_as.py')
0 files changed, 0 insertions, 0 deletions
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
import os
import sys
import traceback
import ipaddress
from subprocess import check_output, CalledProcessError

import scapy.compat
import asfframework
from config import config
from log import RED, single_line_delim, double_line_delim
from util import check_core_path, get_core_path


class Hook:
    """
    Generic hooks before/after API/CLI calls
    """

    def __init__(self, test):
        self.test = test
        self.logger = test.logger

    def before_api(self, api_name, api_args):
        """
        Function called before API call
        Emit a debug message describing the API name and arguments

        @param api_name: name of the API
        @param api_args: tuple containing the API arguments
        """

        def _friendly_format(val):
            if not isinstance(val, str):
                return val
            if len(val) == 6:
                return "{!s} ({!s})".format(
                    val, ":".join(["{:02x}".format(scapy.compat.orb(x)) for x in val])
                )
            try:
                # we don't call test_type(val) because it is a packed value.
                return "{!s} ({!s})".format(val, str(ipaddress.ip_address(val)))
            except ValueError:
                return val

        _args = ", ".join(
            "{!s}={!r}".format(key, _friendly_format(val))
            for (key, val) in api_args.items()
        )
        self.logger.debug("API: %s (%s)" % (api_name, _args), extra={"color": RED})

    def after_api(self, api_name, api_args):
        """
        Function called after API call

        @param api_name: name of the API
        @param api_args: tuple containing the API arguments
        """
        pass

    def before_cli(self, cli):
        """
        Function called before CLI call
        Emit a debug message describing the CLI

        @param cli: CLI string
        """
        self.logger.debug("CLI: %s" % (cli), extra={"color": RED})

    def after_cli(self, cli):
        """
        Function called after CLI call
        """
        pass


class PollHook(Hook):
    """Hook which checks if the vpp subprocess is alive"""

    def __init__(self, test):
        super(PollHook, self).__init__(test)

    def on_crash(self, core_path):
        self.logger.error(
            "Core file present, debug with: gdb %s %s", config.vpp, core_path
        )
        check_core_path(self.logger, core_path)
        self.logger.error("Running `file %s':", core_path)
        try:
            info = check_output(["file", core_path])
            self.logger.error(info)
        except CalledProcessError as e:
            self.logger.error(
                "Subprocess returned with error running `file' utility on "
                "core-file, "
                "rc=%s",
                e.returncode,
            )
        except OSError as e:
            self.logger.error(
                "Subprocess returned OS error running `file' utility on "
                "core-file, "
                "oserror=(%s) %s",
                e.errno,
                e.strerror,
            )
        except Exception as e:
            self.logger.error(
                "Subprocess returned unanticipated error running `file' "
                "utility on core-file, "
                "%s",
                e,
            )

    def poll_vpp(self):
        """
        Poll the vpp status and throw an exception if it's not running
        :raises VppDiedError: exception if VPP is not running anymore
        """
        if not hasattr(self.test, "vpp") or self.test.vpp_dead:
            # already dead, nothing to do
            return

        self.test.vpp.poll()
        if self.test.vpp.returncode is not None:
            self.test.vpp_dead = True
            core_path = get_core_path(self.test.tempdir)
            if os.path.isfile(core_path):
                self.on_crash(core_path)
            raise asfframework.VppDiedError(rv=self.test.vpp.returncode)

    def before_api(self, api_name, api_args):
        """
        Check if VPP died before executing an API

        :param api_name: name of the API
        :param api_args: tuple containing the API arguments
        :raises VppDiedError: exception if VPP is not running anymore

        """
        super(PollHook, self).before_api(api_name, api_args)
        self.poll_vpp()

    def before_cli(self, cli):
        """
        Check if VPP died before executing a CLI

        :param cli: CLI string
        :raises Exception: exception if VPP is not running anymore

        """
        super(PollHook, self).before_cli(cli)
        self.poll_vpp()


class StepHook(PollHook):
    """Hook which requires user to press ENTER before doing any API/CLI"""

    def __init__(self, test):
        self.skip_stack = None
        self.skip_num = None
        self.skip_count = 0
        self.break_func = None
        super(StepHook, self).__init__(test)

    def skip(self):
        if self.break_func is not None:
            return self.should_skip_func_based()
        if self.skip_stack is not None:
            return self.should_skip_stack_based()

    def should_skip_func_based(self):
        stack = traceback.extract_stack()
        for e in stack:
            if e[2] == self.break_func:
                self.break_func = None
                return False
        return True

    def should_skip_stack_based(self):
        stack = traceback.extract_stack()
        counter = 0
        skip = True
        for e in stack:
            if counter > self.skip_num:
                break
            if e[0] != self.skip_stack[counter][0]:
                skip = False
            if e[1] != self.skip_stack[counter][1]:
                skip = False
            counter += 1
        if skip:
            self.skip_count += 1
            return True
        else:
            print("%d API/CLI calls skipped in specified stack frame" % self.skip_count)
            self.skip_count = 0
            self.skip_stack = None
            self.skip_num = None
            return False

    def user_input(self):
        print("number\tfunction\tfile\tcode")
        counter = 0
        stack = traceback.extract_stack()
        for e in stack:
            print("%02d.\t%s\t%s:%d\t[%s]" % (counter, e[2], e[0], e[1], e[3]))
            counter += 1
        print(single_line_delim)
        print("You may enter a number of stack frame chosen from above")
        print("Calls in/below that stack frame will be not be stepped anymore")
        print("Alternatively, enter a test function name to stop at")
        print(single_line_delim)
        while True:
            print(
                "Enter your choice, if any, and press ENTER to continue "
                "running the testcase..."
            )
            choice = sys.stdin.readline().rstrip("\r\n")
            if choice == "":
                choice = None
            try:
                if choice is not None:
                    num = int(choice)
            except ValueError:
                if choice.startswith("test_"):
                    break
                print("Invalid input")
                continue
            if choice is not None and (num < 0 or num >= len(stack)):
                print("Invalid choice")
                continue
            break
        if choice is not None:
            if choice.startswith("test_"):
                self.break_func = choice
            else:
                self.break_func = None
                self.skip_stack = stack
                self.skip_num = num

    def before_cli(self, cli):
        """Wait for ENTER before executing CLI"""
        if self.skip():
            print("Skip pause before executing CLI: %s" % cli)
        else:
            print(double_line_delim)
            print("Test paused before executing CLI: %s" % cli)
            print(single_line_delim)
            self.user_input()
        super(StepHook, self).before_cli(cli)

    def before_api(self, api_name, api_args):
        """Wait for ENTER before executing API"""
        if self.skip():
            print("Skip pause before executing API: %s (%s)" % (api_name, api_args))
        else:
            print(double_line_delim)
            print("Test paused before executing API: %s (%s)" % (api_name, api_args))
            print(single_line_delim)
            self.user_input()
        super(StepHook, self).before_api(api_name, api_args)