aboutsummaryrefslogtreecommitdiffstats
path: root/main.py
blob: d575567854c5ba854d6008c56d7317772b436a31 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python

# Copyright (c) 2016 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This is a helper script to make test execution easy."""

from __future__ import print_function
import sys
import os
import time
from string import ascii_lowercase
from random import sample
import argparse
from pykwalify.core import Core
from pykwalify.errors import PyKwalifyException
from yaml import load
import robot
from robot.errors import DATA_ERROR, DataError, FRAMEWORK_ERROR, FrameworkError
from robot.run import RobotFramework
from robot.conf.settings import RobotSettings
from robot.running.builder import TestSuiteBuilder
from robot.running.model import TestSuite

# Directory scanned for enabled topology YAML files when none are given on
# the command line.
TOPOLOGIES_DIR = './topologies/enabled/'
# Default directory containing the Robot Framework test suites.
TESTS_DIR = './tests'
# Default root directory under which per-run result artifacts are written.
OUTPUTS_DIR = './outputs'


def get_suite_list(*datasources, **options):
    """Return the leaf test suites selected by include/exclude options.

    Builds the Robot Framework suite tree in dry-run mode (no tests are
    executed) and flattens it to the suites that directly contain tests.

    :param datasources: paths to tests
    :param options: Robot Framework options (robot.conf.settings.py)
    :return: list of Robot Framework TestSuites which contain tests
    """
    class _SuiteCollector(RobotFramework):
        """RobotFramework whose main() builds the suite tree, runs nothing."""
        def main(self, datasources, **options):
            # Mirrors robot.run.RobotFramework.main up to (and excluding)
            # the point where suites would actually be executed.
            settings = RobotSettings(options)
            builder = TestSuiteBuilder(settings['SuiteNames'],
                                       settings['WarnOnSkipped'])
            # pylint: disable=star-args
            root = builder.build(*datasources)
            root.configure(**settings.suite_config)
            return root

    # execute() invokes the overloaded main() above, so no tests are run.
    result = _SuiteCollector().execute(*datasources, output=None, dryrun=True,
                                       **options)

    if not isinstance(result, TestSuite):
        # On failure execute() returns a Robot Framework error code instead.
        if result == DATA_ERROR:
            raise DataError
        if result == FRAMEWORK_ERROR:
            raise FrameworkError
        return []

    # Repeatedly expand nodes that still have child suites until only
    # leaf suites (the ones holding actual tests) remain.
    leaves = [result]
    while True:
        expanded = []
        had_children = False
        for node in leaves:
            # pylint: disable=protected-access
            children = node.suites._items
            if children:
                expanded.extend(children)
                had_children = True
            else:
                expanded.append(node)
        leaves = expanded
        if not had_children:
            # TODO: check testcases Tags ? all tests should have same set
            # of tags
            return leaves


def run_suites(tests_dir, suites, output_dir, output_prefix='suite',
               **options):
    """Run Robot Framework on the given suites, writing artifacts to disk.

    Produces ``<output_prefix>.out`` (captured stdout), ``.xml`` (results)
    and ``.log`` (debug log) inside *output_dir*; HTML log/report generation
    is disabled.
    """
    # All artifacts for this invocation share one path base.
    artifact_base = '{}/{}'.format(output_dir, output_prefix)
    with open(artifact_base + '.out', 'w') as stdout_file:
        robot.run(tests_dir,
                  suite=[s.longname for s in suites],
                  output=artifact_base + '.xml',
                  debugfile=artifact_base + '.log',
                  log=None,
                  report=None,
                  stdout=stdout_file,
                  **options)


def parse_outputs(output_dir):
    """Merge every result XML found in *output_dir* into one rebot report."""
    xml_outputs = []
    for entry in os.listdir(output_dir):
        if entry.endswith('.xml'):
            xml_outputs.append(os.path.join(output_dir, entry))
    # pylint: disable=star-args
    robot.rebot(*xml_outputs, merge=True)


def topology_lookup(topology_paths, topo_dir, validate):
    """Collect topology files and validate them against their schemas.

    :param topology_paths: explicit topology file paths; when empty,
                           *topo_dir* is scanned for ``*.yaml`` files instead
    :param topo_dir: directory scanned for topologies when none are given
    :param validate: if True, exit the process when any topology fails
                     validation; if False, errors are only reported
    :return: list of topology file paths
    """

    ret_topologies = []
    if topology_paths:
        for topo in topology_paths:
            if os.path.exists(topo):
                ret_topologies.append(topo)
            else:
                print("Topology file {} doesn't exist".format(topo),
                      file=sys.stderr)
    else:
        ret_topologies = [os.path.join(topo_dir, file_name)
                          for file_name in os.listdir(topo_dir)
                          if file_name.lower().endswith('.yaml')]

    if not ret_topologies:
        print('No valid topology found', file=sys.stderr)
        sys.exit(1)

    # Validate every topology before deciding to exit, so all schema errors
    # are reported in a single pass.
    exit_on_error = False
    for topology_name in ret_topologies:
        try:
            # NOTE(review): yaml.load without an explicit Loader executes
            # arbitrary constructors on untrusted input; consider
            # yaml.safe_load if topologies may come from outside.
            with open(topology_name) as topo_file:
                yaml_obj = load(topo_file)
            # Each topology names its own schema under metadata.schema.
            core = Core(source_file=topology_name,
                        schema_files=yaml_obj["metadata"]["schema"])
            core.validate()
        except PyKwalifyException as ex:
            print('Unable to verify topology {}, schema error: {}'.\
                  format(topology_name, ex),
                  file=sys.stderr)
            exit_on_error = True
        except KeyError as ex:
            print('Unable to verify topology {}, key error: {}'.\
                  format(topology_name, ex),
                  file=sys.stderr)
            exit_on_error = True
        except Exception as ex:
            # Deliberately broad: any unreadable/unparsable topology is
            # reported rather than aborting the scan mid-way.
            print('Unable to verify topology {}, {}'.format(topology_name, ex),
                  file=sys.stderr)
            exit_on_error = True

    if exit_on_error and validate:
        sys.exit(1)

    return ret_topologies


def main():
    """Parse CLI arguments, run the selected suites, and merge the results.

    Exits with status 1 when no valid topology is found, when topology
    validation fails (unless ``-n``), or when multiple topologies are
    requested (not supported yet).
    """
    parser = argparse.ArgumentParser(description='A test runner')
    parser.add_argument('-i', '--include', action='append',
                        help='include tests with tag')
    parser.add_argument('-e', '--exclude', action='append',
                        help='exclude tests with tag')
    parser.add_argument('-s', '--suite', action='append',
                        help='full name of suite to run')
    parser.add_argument('-t', '--topology', action='append',
                        help='topology where tests should be run')
    parser.add_argument('-d', '--test_dir', nargs='?', default=TESTS_DIR,
                        help='where tests are stored')
    parser.add_argument('-o', '--output_dir', nargs='?', default=OUTPUTS_DIR,
                        help='where results are stored')
    parser.add_argument('-L', '--loglevel', nargs='?', default='INFO', type=str,
                        choices=['TRACE', 'DEBUG', 'INFO', 'WARN', 'NONE'],
                        help='robot frameworks level for logging')
    parser.add_argument('-n', '--no_validate', action="store_false",
                        help='Do not exit if topology validation failed')

    args = parser.parse_args()

    include_tags = args.include or []
    exclude_tags = args.exclude or []
    suite_filter = args.suite or []
    test_dir = args.test_dir

    # Each run gets its own timestamped output subdirectory.
    suite_output_dir = os.path.join(args.output_dir,
                                    time.strftime('%y%m%d%H%M%S'))
    os.makedirs(suite_output_dir)

    topologies = topology_lookup(args.topology, TOPOLOGIES_DIR,
                                 args.no_validate)
    suite_list = get_suite_list(test_dir, include=include_tags,
                                exclude=exclude_tags, suite=suite_filter)

    # TODO: do the topology suite mapping magic
    #       for now all tests on single topology
    if len(topologies) > 1:
        print('Multiple topologies unsupported yet', file=sys.stderr)
        sys.exit(1)
    topology_suite_mapping = {topologies[0]: suite_list}

    # on all topologies, run test
    # TODO: run parallel
    for topology_path, topology_suite_list in topology_suite_mapping.items():
        variables = ['TOPOLOGY_PATH:{}'.format(topology_path)]
        print('Running tests on topology {}'.format(topology_path))
        # Random prefix keeps per-topology artifacts from clobbering each
        # other once multi-topology/parallel runs are supported.
        run_suites(test_dir, topology_suite_list, variable=variables,
                   output_dir=suite_output_dir,
                   output_prefix=''.join(sample(ascii_lowercase, 5)),
                   include=include_tags, exclude=exclude_tags,
                   loglevel=args.loglevel)

    print('Parsing test results')
    parse_outputs(suite_output_dir)


# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
span>)] merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]} merged_l4args.update(l4args) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / layer_3[is_ip6] / self.l4proto(**merged_l4args) / Raw(payload)) return p def send(self, side, flags=None, payload=""): l4args = {} if flags is not None: l4args['flags'] = flags self.ifs[side].add_stream(self.pkt(side, l4args=l4args, payload=payload)) self.ifs[1 - side].enable_capture() self.testcase.pg_start() def recv(self, side): p = self.ifs[side].wait_for_packet(1) return p def send_through(self, side, flags=None, payload=""): self.send(side, flags, payload) p = self.recv(1 - side) return p def send_pingpong(self, side, flags1=None, flags2=None): p1 = self.send_through(side, flags1) p2 = self.send_through(1 - side, flags2) return [p1, p2] class L4_CONN_SIDE: L4_CONN_SIDE_ZERO = 0 L4_CONN_SIDE_ONE = 1 def fragment_rfc791(packet, fragsize, logger=null_logger): """ Fragment an IPv4 packet per RFC 791 :param packet: packet to fragment :param fragsize: size at which to fragment :note: IP options are not supported :returns: list of fragments """ logger.debug(ppp("Fragmenting packet:", packet)) packet = packet.__class__(scapy.compat.raw(packet)) # recalc. 
all values if len(packet[IP].options) > 0: raise Exception("Not implemented") if len(packet) <= fragsize: return [packet] pre_ip_len = len(packet) - len(packet[IP]) ip_header_len = packet[IP].ihl * 4 hex_packet = scapy.compat.raw(packet) hex_headers = hex_packet[:(pre_ip_len + ip_header_len)] hex_payload = hex_packet[(pre_ip_len + ip_header_len):] pkts = [] ihl = packet[IP].ihl otl = len(packet[IP]) nfb = int((fragsize - pre_ip_len - ihl * 4) / 8) fo = packet[IP].frag p = packet.__class__(hex_headers + hex_payload[:nfb * 8]) p[IP].flags = "MF" p[IP].frag = fo p[IP].len = ihl * 4 + nfb * 8 del p[IP].chksum pkts.append(p) p = packet.__class__(hex_headers + hex_payload[nfb * 8:]) p[IP].len = otl - nfb * 8 p[IP].frag = fo + nfb del p[IP].chksum more_fragments = fragment_rfc791(p, fragsize, logger) pkts.extend(more_fragments) return pkts def fragment_rfc8200(packet, identification, fragsize, logger=null_logger): """ Fragment an IPv6 packet per RFC 8200 :param packet: packet to fragment :param fragsize: size at which to fragment :note: IP options are not supported :returns: list of fragments """ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values if len(packet) <= fragsize: return [packet] logger.debug(ppp("Fragmenting packet:", packet)) pkts = [] counter = 0 routing_hdr = None hop_by_hop_hdr = None upper_layer = None seen_ipv6 = False ipv6_nr = -1 l = packet.getlayer(counter) while l is not None: if l.__class__ is IPv6: if seen_ipv6: # ignore 2nd IPv6 header and everything below.. 
break ipv6_nr = counter seen_ipv6 = True elif l.__class__ is IPv6ExtHdrFragment: raise Exception("Already fragmented") elif l.__class__ is IPv6ExtHdrRouting: routing_hdr = counter elif l.__class__ is IPv6ExtHdrHopByHop: hop_by_hop_hdr = counter elif seen_ipv6 and not upper_layer and \ not l.__class__.__name__.startswith('IPv6ExtHdr'): upper_layer = counter counter = counter + 1 l = packet.getlayer(counter) logger.debug( "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" % (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer)) if upper_layer is None: raise Exception("Upper layer header not found in IPv6 packet") last_per_fragment_hdr = ipv6_nr if routing_hdr is None: if hop_by_hop_hdr is not None: last_per_fragment_hdr = hop_by_hop_hdr else: last_per_fragment_hdr = routing_hdr logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr)) per_fragment_headers = packet.copy() per_fragment_headers[last_per_fragment_hdr].remove_payload() logger.debug(ppp("Per-fragment headers:", per_fragment_headers)) ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1] hex_payload = scapy.compat.raw(ext_and_upper_layer) logger.debug("Payload length is %s" % len(hex_payload)) logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer)) fragment_ext_hdr = IPv6ExtHdrFragment() logger.debug(ppp("Fragment header:", fragment_ext_hdr)) len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload) if not len_ext_and_upper_layer_payload and \ hasattr(ext_and_upper_layer, "data"): len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data) if len(per_fragment_headers) + len(fragment_ext_hdr) +\ len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\ > fragsize: raise Exception("Cannot fragment this packet - MTU too small " "(%s, %s, %s, %s, %s)" % ( len(per_fragment_headers), len(fragment_ext_hdr), len(ext_and_upper_layer), len_ext_and_upper_layer_payload, fragsize)) orig_nh = packet[IPv6].nh p = per_fragment_headers del p[IPv6].plen del p[IPv6].nh 
p = p / fragment_ext_hdr del p[IPv6ExtHdrFragment].nh first_payload_len_nfb = int((fragsize - len(p)) / 8) p = p / Raw(hex_payload[:first_payload_len_nfb * 8]) del p[IPv6].plen p[IPv6ExtHdrFragment].nh = orig_nh p[IPv6ExtHdrFragment].id = identification p[IPv6ExtHdrFragment].offset = 0 p[IPv6ExtHdrFragment].m = 1 p = p.__class__(scapy.compat.raw(p)) logger.debug(ppp("Fragment %s:" % len(pkts), p)) pkts.append(p) offset = first_payload_len_nfb * 8 logger.debug("Offset after first fragment: %s" % offset) while len(hex_payload) > offset: p = per_fragment_headers del p[IPv6].plen del p[IPv6].nh p = p / fragment_ext_hdr del p[IPv6ExtHdrFragment].nh l_nfb = int((fragsize - len(p)) / 8) p = p / Raw(hex_payload[offset:offset + l_nfb * 8]) p[IPv6ExtHdrFragment].nh = orig_nh p[IPv6ExtHdrFragment].id = identification p[IPv6ExtHdrFragment].offset = int(offset / 8) p[IPv6ExtHdrFragment].m = 1 p = p.__class__(scapy.compat.raw(p)) logger.debug(ppp("Fragment %s:" % len(pkts), p)) pkts.append(p) offset = offset + l_nfb * 8 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment return pkts def reassemble4_core(listoffragments, return_ip): buffer = BytesIO() first = listoffragments[0] buffer.seek(20) for pkt in listoffragments: buffer.seek(pkt[IP].frag*8) buffer.write(bytes(pkt[IP].payload)) first.len = len(buffer.getvalue()) + 20 first.flags = 0 del(first.chksum) if return_ip: header = bytes(first[IP])[:20] return first[IP].__class__(header + buffer.getvalue()) else: header = bytes(first[Ether])[:34] return first[Ether].__class__(header + buffer.getvalue()) def reassemble4_ether(listoffragments): return reassemble4_core(listoffragments, False) def reassemble4(listoffragments): return reassemble4_core(listoffragments, True)