summaryrefslogtreecommitdiffstats
path: root/scripts/automation/regression/functional_tests/platform_cmd_link_test.py
blob: 7a31815b6c2dc0457e6ca59e2db299cc19a660b1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/router/bin/python

from platform_cmd_link import *
import functional_general_test
from nose.tools import assert_equal
from nose.tools import assert_not_equal


class CCommandLink_Test(functional_general_test.CGeneralFunctional_Test):

    def setUp(self):
        self.cache = CCommandCache()
        self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/1')
        self.cache.add('IF', "ip nbar protocol-discovery", 'GigabitEthernet0/0/2')
        self.cache.add('conf', "arp 1.1.1.1 0000.0001.0000 arpa")
        self.cache.add('conf', "arp 1.1.2.1 0000.0002.0000 arpa")
        self.cache.add('exec', "show ip nbar protocol-discovery stats packet-count")
        self.com_link = CCommandLink()

    def test_transmit(self):
        # test here future implemntatin of platform physical link
        pass

    def test_run_cached_command (self):
        self.com_link.run_command([self.cache])

        assert_equal (self.com_link.get_history(), 
            ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
            )

        self.com_link.clear_history()
        self.com_link.run_single_command(self.cache)
        assert_equal (self.com_link.get_history(), 
            ["configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count"]
            )

    def test_run_single_command(self):
        self.com_link.run_single_command("show ip nbar protocol-discovery stats packet-count")
        assert_equal (self.com_link.get_history(), 
            ["show ip nbar protocol-discovery stats packet-count"]
            )

    def test_run_mixed_commands (self):
        self.com_link.run_single_command("show ip nbar protocol-discovery stats packet-count")
        self.com_link.run_command([self.cache])
        self.com_link.run_command(["show ip interface brief"])

        assert_equal (self.com_link.get_history(), 
            ["show ip nbar protocol-discovery stats packet-count",
             "configure terminal", "interface GigabitEthernet0/0/1", "ip nbar protocol-discovery", "interface GigabitEthernet0/0/2", "ip nbar protocol-discovery", "exit", "arp 1.1.1.1 0000.0001.0000 arpa", "arp 1.1.2.1 0000.0002.0000 arpa", "exit", "show ip nbar protocol-discovery stats packet-count",
             "show ip interface brief"]
            )

    def test_clear_history (self):
        self.com_link.run_command(["show ip interface brief"])
        self.com_link.clear_history()
        assert_equal (self.com_link.get_history(), [])

    def tearDown(self):
        self.cache.clear_cache()
fff0f0 } /* Comment.Special */ .highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */ .highlight .ge { font-style: italic } /* Generic.Emph */ .highlight .gr { color: #aa0000 } /* Generic.Error */ .highlight .gh { color: #333333 } /* Generic.Heading */ .highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */ .highlight .go { color: #888888 } /* Generic.Output */ .highlight .gp { color: #555555 } /* Generic.Prompt */ .highlight .gs { font-weight: bold } /* Generic.Strong */ .highlight .gu { color: #666666 } /* Generic.Subheading */ .highlight .gt { color: #aa0000 } /* Generic.Traceback */ .highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */ .highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */ .highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */ .highlight .kp { color: #008800 } /* Keyword.Pseudo */ .highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */ .highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */ .highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */ .highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */ .highlight .na { color: #336699 } /* Name.Attribute */ .highlight .nb { color: #003388 } /* Name.Builtin */ .highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */ .highlight .no { color: #003366; font-weight: bold } /* Name.Constant */ .highlight .nd { color: #555555 } /* Name.Decorator */ .highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */ .highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */ .highlight .nl { color: #336699; font-style: italic } /* Name.Label */ .highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */ .highlight .py { color: #336699; font-weight: bold } /* Name.Property */ .highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */ .highlight .nv { color: #336699 } /* Name.Variable */ .highlight .ow { color: #008800 } /* Operator.Word */ .highlight .w { color: #bbbbbb } /* Text.Whitespace */ .highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */ .highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */ .highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */ .highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */ .highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */ .highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */ .highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */ .highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */ .highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */ .highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */ .highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */ .highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */ .highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */ .highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */ .highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */ .highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */ .highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */ .highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */ .highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */ .highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */ .highlight .vc { color: #336699 } /* Name.Variable.Class */ .highlight .vg { color: #dd7700 } /* Name.Variable.Global */ .highlight .vi { color: #3333bb } /* Name.Variable.Instance */ .highlight .vm { color: #336699 } /* Name.Variable.Magic */ .highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */
# -*- coding: utf-8 -*-

# version.py
# Part of ‘python-daemon’, an implementation of PEP 3143.
#
# Copyright © 2008–2015 Ben Finney <ben+python@benfinney.id.au>
#
# This is free software: you may copy, modify, and/or distribute this work
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; version 3 of that license or any later version.
# No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.

""" Version information unified for human- and machine-readable formats.

    The project ‘ChangeLog’ file is a reStructuredText document, with
    each section describing a version of the project. The document is
    intended to be readable as-is by end users.

    This module handles transformation from the ‘ChangeLog’ to a
    mapping of version information, serialised as JSON. It also
    provides functionality for Distutils to use this information.

    Requires:

    * Docutils <http://docutils.sourceforge.net/>
    * JSON <https://docs.python.org/3/reference/json.html>

    """

from __future__ import (absolute_import, unicode_literals)

import sys
import os
import io
import errno
import json
import datetime
import textwrap
import re
import functools
import collections
import distutils
import distutils.errors
import distutils.cmd
try:
    # Python 2 has both ‘str’ (bytes) and ‘unicode’ (text).
    basestring = basestring
    unicode = unicode
except NameError:
    # Python 3 names the Unicode data type ‘str’.
    basestring = str
    unicode = str

import setuptools
import setuptools.command.egg_info


def ensure_class_bases_begin_with(namespace, class_name, base_class):
    """ Ensure the named class's bases start with the base class.

        :param namespace: The namespace containing the class name.
        :param class_name: The name of the class to alter.
        :param base_class: The type to be the first base class for the
            newly created type.
        :return: ``None``.

        This function is a hack to circumvent a circular dependency:
        using classes from a module which is not installed at the time
        this module is imported.

        Call this function after ensuring `base_class` is available,
        before using the class named by `class_name`.

        """
    existing_class = namespace[class_name]
    assert isinstance(existing_class, type)

    bases = list(existing_class.__bases__)
    if base_class is bases[0]:
        # Already bound to a type with the right bases.
        return
    bases.insert(0, base_class)

    new_class_namespace = existing_class.__dict__.copy()
    # Type creation will assign the correct ‘__dict__’ attribute.
    del new_class_namespace['__dict__']

    metaclass = existing_class.__metaclass__
    new_class = metaclass(class_name, tuple(bases), new_class_namespace)

    namespace[class_name] = new_class


class VersionInfoWriter(object):
    """ Docutils writer to produce a version info JSON data stream. """

    # This class needs its base class to be a class from `docutils`.
    # But that would create a circular dependency: Setuptools cannot
    # ensure `docutils` is available before importing this module.
    #
    # Use `ensure_class_bases_begin_with` after importing `docutils`, to
    # re-bind the `VersionInfoWriter` name to a new type that inherits
    # from `docutils.writers.Writer`.

    __metaclass__ = type

    supported = ['version_info']
    """ Formats this writer supports. """

    def __init__(self):
        super(VersionInfoWriter, self).__init__()
        self.translator_class = VersionInfoTranslator

    def translate(self):
        visitor = self.translator_class(self.document)
        self.document.walkabout(visitor)
        self.output = visitor.astext()


rfc822_person_regex = re.compile(
        "^(?P<name>[^<]+) <(?P<email>[^>]+)>$")

class ChangeLogEntry:
    """ An individual entry from the ‘ChangeLog’ document. """

    __metaclass__ = type

    field_names = [
            'release_date',
            'version',
            'maintainer',
            'body',
            ]

    date_format = "%Y-%m-%d"
    default_version = "UNKNOWN"
    default_release_date = "UNKNOWN"

    def __init__(
            self,
            release_date=default_release_date, version=default_version,
            maintainer=None, body=None):
        self.validate_release_date(release_date)
        self.release_date = release_date

        self.version = version

        self.validate_maintainer(maintainer)
        self.maintainer = maintainer
        self.body = body

    @classmethod
    def validate_release_date(cls, value):
        """ Validate the `release_date` value.

            :param value: The prospective `release_date` value.
            :return: ``None`` if the value is valid.
            :raises ValueError: If the value is invalid.

            """
        if value in ["UNKNOWN", "FUTURE"]:
            # A valid non-date value.
            return None

        # Raises `ValueError` if parse fails.
        datetime.datetime.strptime(value, ChangeLogEntry.date_format)

    @classmethod
    def validate_maintainer(cls, value):
        """ Validate the `maintainer` value.

            :param value: The prospective `maintainer` value.
            :return: ``None`` if the value is valid.
            :raises ValueError: If the value is invalid.

            """
        valid = False

        if value is None:
            valid = True
        elif rfc822_person_regex.search(value):
            valid = True

        if not valid:
            raise ValueError("Not a valid person specification {value!r}")
        else:
            return None

    @classmethod
    def make_ordered_dict(cls, fields):
        """ Make an ordered dict of the fields. """
        result = collections.OrderedDict(
                (name, fields[name])
                for name in cls.field_names)
        return result

    def as_version_info_entry(self):
        """ Format the changelog entry as a version info entry. """
        fields = vars(self)
        entry = self.make_ordered_dict(fields)

        return entry


class InvalidFormatError(ValueError):
    """ Raised when the document is not a valid ‘ChangeLog’ document. """


class VersionInfoTranslator(object):
    """ Translator from document nodes to a version info stream. """

    # This class needs its base class to be a class from `docutils`.
    # But that would create a circular dependency: Setuptools cannot
    # ensure `docutils` is available before importing this module.
    #
    # Use `ensure_class_bases_begin_with` after importing `docutils`,
    # to re-bind the `VersionInfoTranslator` name to a new type that
    # inherits from `docutils.nodes.SparseNodeVisitor`.

    __metaclass__ = type

    wrap_width = 78
    bullet_text = "* "

    attr_convert_funcs_by_attr_name = {
            'released': ('release_date', unicode),
            'version': ('version', unicode),
            'maintainer': ('maintainer', unicode),
            }

    def __init__(self, document):
        super(VersionInfoTranslator, self).__init__(document)
        self.settings = document.settings
        self.current_section_level = 0
        self.current_field_name = None
        self.content = []
        self.indent_width = 0
        self.initial_indent = ""
        self.subsequent_indent = ""
        self.current_entry = None

        # Docutils is not available when this class is defined.
        # Get the `docutils` module dynamically.
        self._docutils = sys.modules['docutils']

    def astext(self):
        """ Return the translated document as text. """
        text = json.dumps(self.content, indent=4)
        return text

    def append_to_current_entry(self, text):
        if self.current_entry is not None:
            if self.current_entry.body is not None:
                self.current_entry.body += text

    def visit_Text(self, node):
        raw_text = node.astext()
        text = textwrap.fill(
                raw_text,
                width=self.wrap_width,
                initial_indent=self.initial_indent,
                subsequent_indent=self.subsequent_indent)
        self.append_to_current_entry(text)

    def depart_Text(self, node):
        pass

    def visit_comment(self, node):
        raise self._docutils.nodes.SkipNode

    def visit_field_body(self, node):
        field_list_node = node.parent.parent
        if not isinstance(field_list_node, self._docutils.nodes.field_list):
            raise InvalidFormatError(
                    "Unexpected field within {node!r}".format(
                        node=field_list_node))
        (attr_name, convert_func) = self.attr_convert_funcs_by_attr_name[
                self.current_field_name]
        attr_value = convert_func(node.astext())
        setattr(self.current_entry, attr_name, attr_value)

    def depart_field_body(self, node):
        pass

    def visit_field_list(self, node):
        pass

    def depart_field_list(self, node):
        self.current_field_name = None
        self.current_entry.body = ""

    def visit_field_name(self, node):
        field_name = node.astext()
        if self.current_section_level == 1:
            # At a top-level section.
            if field_name.lower() not in ["released", "maintainer"]:
                raise InvalidFormatError(
                        "Unexpected field name {name!r}".format(name=field_name))
            self.current_field_name = field_name.lower()

    def depart_field_name(self, node):
        pass

    def visit_bullet_list(self, node):
        self.current_context = []

    def depart_bullet_list(self, node):
        self.current_entry.changes = self.current_context
        self.current_context = None

    def adjust_indent_width(self, delta):
        self.indent_width += delta
        self.subsequent_indent = " " * self.indent_width
        self.initial_indent = self.subsequent_indent

    def visit_list_item(self, node):
        indent_delta = +len(self.bullet_text)
        self.adjust_indent_width(indent_delta)
        self.initial_indent = self.subsequent_indent[:-indent_delta]
        self.append_to_current_entry(self.initial_indent + self.bullet_text)

    def depart_list_item(self, node):
        indent_delta = +len(self.bullet_text)
        self.adjust_indent_width(-indent_delta)
        self.append_to_current_entry("\n")

    def visit_section(self, node):
        self.current_section_level += 1
        if self.current_section_level == 1:
            # At a top-level section.
            self.current_entry = ChangeLogEntry()
        else:
            raise InvalidFormatError(
                    "Subsections not implemented for this writer")

    def depart_section(self, node):
        self.current_section_level -= 1
        self.content.append(
                self.current_entry.as_version_info_entry())
        self.current_entry = None

    _expected_title_word_length = len("Version FOO".split(" "))

    def depart_title(self, node):
        title_text = node.astext()
        # At a top-level section.
        words = title_text.split(" ")
        version = None
        if len(words) != self._expected_title_word_length:
            raise InvalidFormatError(
                    "Unexpected title text {text!r}".format(text=title_text))
        if words[0].lower() not in ["version"]:
            raise InvalidFormatError(
                    "Unexpected title text {text!r}".format(text=title_text))
        version = words[-1]
        self.current_entry.version = version


def changelog_to_version_info_collection(infile):
    """ Render the ‘ChangeLog’ document to a version info collection.

        :param infile: A file-like object containing the changelog.
        :return: The serialised JSON data of the version info collection.

        """

    # Docutils is not available when Setuptools needs this module, so
    # delay the imports to this function instead.
    import docutils.core
    import docutils.nodes
    import docutils.writers

    ensure_class_bases_begin_with(
            globals(), str('VersionInfoWriter'), docutils.writers.Writer)
    ensure_class_bases_begin_with(
            globals(), str('VersionInfoTranslator'),
            docutils.nodes.SparseNodeVisitor)

    writer = VersionInfoWriter()
    settings_overrides = {
            'doctitle_xform': False,
            }
    version_info_json = docutils.core.publish_string(
            infile.read(), writer=writer,
            settings_overrides=settings_overrides)

    return version_info_json


try:
    lru_cache = functools.lru_cache
except AttributeError:
    # Python < 3.2 does not have the `functools.lru_cache` function.
    # Not essential, so replace it with a no-op.
    lru_cache = lambda maxsize=None, typed=False: lambda func: func


@lru_cache(maxsize=128)
def generate_version_info_from_changelog(infile_path):
    """ Get the version info for the latest version in the changelog.

        :param infile_path: Filesystem path to the input changelog file.
        :return: The generated version info mapping; or ``None`` if the
            file cannot be read.

        The document is explicitly opened as UTF-8 encoded text.

        """
    version_info = collections.OrderedDict()

    versions_all_json = None
    try:
        with io.open(infile_path, 'rt', encoding="utf-8") as infile:
            versions_all_json = changelog_to_version_info_collection(infile)
    except EnvironmentError:
        # If we can't read the input file, leave the collection empty.
        pass

    if versions_all_json is not None:
        versions_all = json.loads(versions_all_json.decode('utf-8'))
        version_info = get_latest_version(versions_all)

    return version_info


def get_latest_version(versions):
    """ Get the latest version from a collection of changelog entries.

        :param versions: A collection of mappings for changelog entries.
        :return: An ordered mapping of fields for the latest version,
            if `versions` is non-empty; otherwise, an empty mapping.

        """
    version_info = collections.OrderedDict()

    versions_by_release_date = {
            item['release_date']: item
            for item in versions}
    if versions_by_release_date:
        latest_release_date = max(versions_by_release_date.keys())
        version_info = ChangeLogEntry.make_ordered_dict(
                versions_by_release_date[latest_release_date])

    return version_info


def serialise_version_info_from_mapping(version_info):
    """ Generate the version info serialised data.

        :param version_info: Mapping of version info items.
        :return: The version info serialised to JSON.

        """
    content = json.dumps(version_info, indent=4)

    return content


changelog_filename = "ChangeLog"

def get_changelog_path(distribution, filename=changelog_filename):
    """ Get the changelog file path for the distribution.

        :param distribution: The distutils.dist.Distribution instance.
        :param filename: The base filename of the changelog document.
        :return: Filesystem path of the changelog document, or ``None``
            if not discoverable.

        """
    setup_dirname = os.path.dirname(distribution.script_name)
    filepath = os.path.join(setup_dirname, filename)

    return filepath


def has_changelog(command):
    """ Return ``True`` iff the distribution's changelog file exists. """
    result = False

    changelog_path = get_changelog_path(command.distribution)
    if changelog_path is not None:
        if os.path.exists(changelog_path):
            result = True

    return result


class EggInfoCommand(setuptools.command.egg_info.egg_info, object):
    """ Custom ‘egg_info’ command for this distribution. """

    sub_commands = ([
            ('write_version_info', has_changelog),
            ] + setuptools.command.egg_info.egg_info.sub_commands)

    def run(self):
        """ Execute this command. """
        super(EggInfoCommand, self).run()

        for command_name in self.get_sub_commands():
            self.run_command(command_name)


version_info_filename = "version_info.json"

class WriteVersionInfoCommand(EggInfoCommand, object):
    """ Setuptools command to serialise version info metadata. """

    user_options = ([
            ("changelog-path=", None,
             "Filesystem path to the changelog document."),
            ("outfile-path=", None,
             "Filesystem path to the version info file."),
            ] + EggInfoCommand.user_options)

    def initialize_options(self):
        """ Initialise command options to defaults. """
        super(WriteVersionInfoCommand, self).initialize_options()
        self.changelog_path = None
        self.outfile_path = None

    def finalize_options(self):
        """ Finalise command options before execution. """
        self.set_undefined_options(
                'build',
                ('force', 'force'))

        super(WriteVersionInfoCommand, self).finalize_options()

        if self.changelog_path is None:
            self.changelog_path = get_changelog_path(self.distribution)

        if self.outfile_path is None:
            egg_dir = self.egg_info
            self.outfile_path = os.path.join(egg_dir, version_info_filename)

    def run(self):
        """ Execute this command. """
        version_info = generate_version_info_from_changelog(self.changelog_path)
        content = serialise_version_info_from_mapping(version_info)
        self.write_file("version info", self.outfile_path, content)


# Local variables:
# coding: utf-8
# mode: python
# End:
# vim: fileencoding=utf-8 filetype=python :