summaryrefslogtreecommitdiffstats
path: root/scripts/external_libs/scapy-2.3.1/python2/scapy/asn1fields.py
blob: 1a59bd50d757db44e254a9ff7c8d3b74d5b62aff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license

"""
Classes that implement ASN.1 data structures.
"""

from asn1.asn1 import *
from asn1.ber import *
from volatile import *
from base_classes import BasePacket


#####################
#### ASN1 Fields ####
#####################

class ASN1F_badsequence(Exception):
    pass

class ASN1F_element:
    pass

class ASN1F_optionnal(ASN1F_element):
    def __init__(self, field):
        self._field=field
    def __getattr__(self, attr):
        return getattr(self._field,attr)
    def dissect(self,pkt,s):
        try:
            return self._field.dissect(pkt,s)
        except ASN1F_badsequence:
            self._field.set_val(pkt,None)
            return s
        except BER_Decoding_Error:
            self._field.set_val(pkt,None)
            return s
    def build(self, pkt):
        if self._field.is_empty(pkt):
            return ""
        return self._field.build(pkt)

class ASN1F_field(ASN1F_element):
    holds_packets=0
    islist=0

    ASN1_tag = ASN1_Class_UNIVERSAL.ANY
    context=ASN1_Class_UNIVERSAL
    
    def __init__(self, name, default, context=None):
        if context is not None:
            self.context = context
        self.name = name
        self.default = default

    def i2repr(self, pkt, x):
        return repr(x)
    def i2h(self, pkt, x):
        return x
    def any2i(self, pkt, x):
        return x
    def m2i(self, pkt, x):
        return self.ASN1_tag.get_codec(pkt.ASN1_codec).safedec(x, context=self.context)
    def i2m(self, pkt, x):
        if x is None:
            x = 0
        if isinstance(x, ASN1_Object):
            if ( self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY
                 or x.tag == ASN1_Class_UNIVERSAL.RAW
                 or x.tag == ASN1_Class_UNIVERSAL.ERROR
                 or self.ASN1_tag == x.tag ):
                return x.enc(pkt.ASN1_codec)
            else:
                raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))
        return self.ASN1_tag.get_codec(pkt.ASN1_codec).enc(x)

    def do_copy(self, x):
        if hasattr(x, "copy"):
            return x.copy()
        if type(x) is list:
            x = x[:]
            for i in xrange(len(x)):
                if isinstance(x[i], BasePacket):
                    x[i] = x[i].copy()
        return x

    def build(self, pkt):
        return self.i2m(pkt, getattr(pkt, self.name))

    def set_val(self, pkt, val):
        setattr(pkt, self.name, val)
    def is_empty(self, pkt):
        return getattr(pkt,self.name) is None
    
    def dissect(self, pkt, s):
        v,s = self.m2i(pkt, s)
        self.set_val(pkt, v)
        return s

    def get_fields_list(self):
        return [self]

    def __hash__(self):
        return hash(self.name)
    def __str__(self):
        return self.name
    def __eq__(self, other):
        return self.name == other
    def __repr__(self):
        return self.name
    def randval(self):
        return RandInt()


class ASN1F_INTEGER(ASN1F_field):
    ASN1_tag= ASN1_Class_UNIVERSAL.INTEGER
    def randval(self):
        return RandNum(-2**64, 2**64-1)

class ASN1F_BOOLEAN(ASN1F_field):
    ASN1_tag= ASN1_Class_UNIVERSAL.BOOLEAN
    def randval(self):
        return RandChoice(True,False)

class ASN1F_NULL(ASN1F_INTEGER):
    ASN1_tag= ASN1_Class_UNIVERSAL.NULL

class ASN1F_SEP(ASN1F_NULL):
    ASN1_tag= ASN1_Class_UNIVERSAL.SEP

class ASN1F_enum_INTEGER(ASN1F_INTEGER):
    def __init__(self, name, default, enum):
        ASN1F_INTEGER.__init__(self, name, default)
        i2s = self.i2s = {}
        s2i = self.s2i = {}
        if type(enum) is list:
            keys = xrange(len(enum))
        else:
            keys = enum.keys()
        if filter(lambda x: type(x) is str, keys):
            i2s,s2i = s2i,i2s
        for k in keys:
            i2s[k] = enum[k]
            s2i[enum[k]] = k
    def any2i_one(self, pkt, x):
        if type(x) is str:
            x = self.s2i[x]
        return x
    def i2repr_one(self, pkt, x):
        return self.i2s.get(x, repr(x))
    
    def any2i(self, pkt, x):
        if type(x) is list:
            return map(lambda z,pkt=pkt:self.any2i_one(pkt,z), x)
        else:
            return self.any2i_one(pkt,x)        
    def i2repr(self, pkt, x):
        if type(x) is list:
            return map(lambda z,pkt=pkt:self.i2repr_one(pkt,z), x)
        else:
            return self.i2repr_one(pkt,x)

class ASN1F_ENUMERATED(ASN1F_enum_INTEGER):
    ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED

class ASN1F_STRING(ASN1F_field):
    ASN1_tag = ASN1_Class_UNIVERSAL.STRING
    def randval(self):
        return RandString(RandNum(0, 1000))

class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
    ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING

class ASN1F_BIT_STRING(ASN1F_STRING):
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING
    
class ASN1F_IPADDRESS(ASN1F_STRING):
    ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS    

class ASN1F_TIME_TICKS(ASN1F_INTEGER):
    ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS

class ASN1F_UTC_TIME(ASN1F_STRING):
    ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME

class ASN1F_GENERALIZED_TIME(ASN1F_STRING):
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME

class ASN1F_OID(ASN1F_field):
    ASN1_tag = ASN1_Class_UNIVERSAL.OID
    def randval(self):
        return RandOID()

class ASN1F_SEQUENCE(ASN1F_field):
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    def __init__(self, *seq, **kargs):
        if "ASN1_tag" in kargs:
            self.ASN1_tag = kargs["ASN1_tag"]
        self.seq = seq
    def __repr__(self):
        return "<%s%r>" % (self.__class__.__name__,self.seq,)
    def set_val(self, pkt, val):
        for f in self.seq:
            f.set_val(pkt,val)
    def is_empty(self, pkt):
        for f in self.seq:
            if not f.is_empty(pkt):
                return False
        return True
    def get_fields_list(self):
        return reduce(lambda x,y: x+y.get_fields_list(), self.seq, [])
    def build(self, pkt):
        s = reduce(lambda x,y: x+y.build(pkt), self.seq, "")
        return self.i2m(pkt, s)
    def dissect(self, pkt, s):
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        try:
            i,s,remain = codec.check_type_check_len(s)
            for obj in self.seq:
                s = obj.dissect(pkt,s)
            if s:
                warning("Too many bytes to decode sequence: [%r]" % s) # XXX not reversible!
            return remain
        except ASN1_Error,e:
            raise ASN1F_badsequence(e)

class ASN1F_SET(ASN1F_SEQUENCE):
    ASN1_tag = ASN1_Class_UNIVERSAL.SET

class ASN1F_SEQUENCE_OF(ASN1F_SEQUENCE):
    holds_packets = 1
    islist = 1
    def __init__(self, name, default, asn1pkt, ASN1_tag=0x30):
        self.asn1pkt = asn1pkt
        self.tag = chr(ASN1_tag)
        self.name = name
        self.default = default
    def i2repr(self, pkt, i):
        if i is None:
            return []
        return i
    def get_fields_list(self):
        return [self]
    def set_val(self, pkt, val):
        ASN1F_field.set_val(self, pkt, val)
    def is_empty(self, pkt):
        return ASN1F_field.is_empty(self, pkt)
    def build(self, pkt):
        val = getattr(pkt, self.name)
        if isinstance(val, ASN1_Object) and val.tag == ASN1_Class_UNIVERSAL.RAW:
            s = val
        elif val is None:
            s = ""
        else:
            s = "".join(map(str, val ))
        return self.i2m(pkt, s)
    def dissect(self, pkt, s):
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        i,s1,remain = codec.check_type_check_len(s)
        lst = []
        while s1:
            try:
                p = self.asn1pkt(s1)
            except ASN1F_badsequence,e:
                lst.append(conf.raw_layer(s1))
                break
            lst.append(p)
            if conf.raw_layer in p:
                s1 = p[conf.raw_layer].load
                del(p[conf.raw_layer].underlayer.payload)
            else:
                break
        self.set_val(pkt, lst)
        return remain
    def randval(self):
        return fuzz(self.asn1pkt())
    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__,self.name)

class ASN1F_PACKET(ASN1F_field):
    holds_packets = 1
    def __init__(self, name, default, cls):
        ASN1F_field.__init__(self, name, default)
        self.cls = cls
    def i2m(self, pkt, x):
        if x is None:
            x = ""
        return str(x)
    def extract_packet(self, cls, x):
        try:
            c = cls(x)
        except ASN1F_badsequence:
            c = conf.raw_layer(x)
        cpad = c.getlayer(conf.padding_layer)
        x = ""
        if cpad is not None:
            x = cpad.load
            del(cpad.underlayer.payload)
        return c,x
    def m2i(self, pkt, x):
        return self.extract_packet(self.cls, x)


class ASN1F_CHOICE(ASN1F_PACKET):
    ASN1_tag = ASN1_Class_UNIVERSAL.NONE
    def __init__(self, name, default, *args):
        self.name=name
        self.choice = {}
        for p in args:
            self.choice[p.ASN1_root.ASN1_tag] = p
#        self.context=context
        self.default=default
    def m2i(self, pkt, x):
        if len(x) == 0:
            return conf.raw_layer(),""
            raise ASN1_Error("ASN1F_CHOICE: got empty string")
        if ord(x[0]) not in self.choice:
            return conf.raw_layer(x),"" # XXX return RawASN1 packet ? Raise error 
            raise ASN1_Error("Decoding Error: choice [%i] not found in %r" % (ord(x[0]), self.choice.keys()))

        z = ASN1F_PACKET.extract_packet(self, self.choice[ord(x[0])], x)
        return z
    def randval(self):
        return RandChoice(*map(lambda x:fuzz(x()), self.choice.values()))
            
    
# This import must come in last to avoid problems with cyclic dependencies
import packet
>(link_path): os.symlink(last_test_temp_dir, link_path) logger.error("Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)) # Report core existence core_path = get_core_path(last_test_temp_dir) if os.path.exists(core_path): logger.error( "Core-file exists in test temporary directory: %s!" % core_path) check_core_path(logger, core_path) logger.debug("Running 'file %s':" % core_path) try: info = check_output(["file", core_path]) logger.debug(info) except CalledProcessError as e: logger.error("Subprocess returned with return code " "while running `file' utility on core-file " "returned: " "rc=%s", e.returncode) except OSError as e: logger.error("Subprocess returned with OS error while " "running 'file' utility " "on core-file: " "(%s) %s", e.errno, e.strerror) except Exception as e: logger.exception("Unexpected error running `file' utility " "on core-file") if vpp_pid: # Copy api post mortem api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid if os.path.isfile(api_post_mortem_path): logger.error("Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)) shutil.copy2(api_post_mortem_path, last_test_temp_dir) def check_and_handle_core(vpp_binary, tempdir, core_crash_test): if is_core_present(tempdir): if debug_core: print('VPP core detected in %s. Last test running was %s' % (tempdir, core_crash_test)) print(single_line_delim) spawn_gdb(vpp_binary, get_core_path(tempdir)) print(single_line_delim) elif compress_core: print("Compressing core-file in test directory `%s'" % tempdir) os.system("gzip %s" % get_core_path(tempdir)) def handle_cores(failed_testcases): for failed_testcase in failed_testcases: tcs_with_core = failed_testcase.testclasess_with_core if tcs_with_core: for test, vpp_binary, tempdir in tcs_with_core.values(): check_and_handle_core(vpp_binary, tempdir, test) def process_finished_testsuite(wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results): results.append(wrapped_testcase_suite.result) finished_testcase_suites.add(wrapped_testcase_suite) stop_run = False if failfast and not wrapped_testcase_suite.was_successful(): stop_run = True if not wrapped_testcase_suite.was_successful(): failed_wrapped_testcases.add(wrapped_testcase_suite) handle_failed_suite(wrapped_testcase_suite.logger, wrapped_testcase_suite.last_test_temp_dir, wrapped_testcase_suite.vpp_pid) return stop_run def run_forked(testcase_suites): wrapped_testcase_suites = set() # suites are unhashable, need to use list results = [] unread_testcases = set() finished_unread_testcases = set() manager = StreamQueueManager() manager.start() for i in range(concurrent_tests): if testcase_suites: wrapped_testcase_suite = TestCaseWrapper(testcase_suites.pop(0), manager) wrapped_testcase_suites.add(wrapped_testcase_suite) unread_testcases.add(wrapped_testcase_suite) else: break read_from_testcases = threading.Event() read_from_testcases.set() stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper, args=(unread_testcases, finished_unread_testcases, read_from_testcases)) stdouterr_thread.start() failed_wrapped_testcases = set() stop_run = False try: while wrapped_testcase_suites: finished_testcase_suites = set() for wrapped_testcase_suite in wrapped_testcase_suites: while wrapped_testcase_suite.result_parent_end.poll(): wrapped_testcase_suite.result.process_result( *wrapped_testcase_suite.result_parent_end.recv()) wrapped_testcase_suite.last_heard = time.time() while wrapped_testcase_suite.keep_alive_parent_end.poll(): wrapped_testcase_suite.last_test, \ wrapped_testcase_suite.last_test_vpp_binary, \ wrapped_testcase_suite.last_test_temp_dir, \ wrapped_testcase_suite.vpp_pid = \ wrapped_testcase_suite.keep_alive_parent_end.recv() wrapped_testcase_suite.last_heard = time.time() if wrapped_testcase_suite.finished_parent_end.poll(): wrapped_testcase_suite.finished_parent_end.recv() wrapped_testcase_suite.last_heard = time.time() stop_run = process_finished_testsuite( wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results) or stop_run continue fail = False if wrapped_testcase_suite.last_heard + test_timeout < \ time.time(): fail = True wrapped_testcase_suite.logger.critical( "Child test runner process timed out " "(last test running was `%s' in `%s')!" % (wrapped_testcase_suite.last_test, wrapped_testcase_suite.last_test_temp_dir)) elif not wrapped_testcase_suite.child.is_alive(): fail = True wrapped_testcase_suite.logger.critical( "Child test runner process unexpectedly died " "(last test running was `%s' in `%s')!" % (wrapped_testcase_suite.last_test, wrapped_testcase_suite.last_test_temp_dir)) elif wrapped_testcase_suite.last_test_temp_dir and \ wrapped_testcase_suite.last_test_vpp_binary: if is_core_present( wrapped_testcase_suite.last_test_temp_dir): wrapped_testcase_suite.add_testclass_with_core() if wrapped_testcase_suite.core_detected_at is None: wrapped_testcase_suite.core_detected_at = \ time.time() elif wrapped_testcase_suite.core_detected_at + \ core_timeout < time.time(): wrapped_testcase_suite.logger.critical( "Child test runner process unresponsive and " "core-file exists in test temporary directory " "(last test running was `%s' in `%s')!" % (wrapped_testcase_suite.last_test, wrapped_testcase_suite.last_test_temp_dir)) fail = True if fail: wrapped_testcase_suite.child.terminate() try: # terminating the child process tends to leave orphan # VPP process around if wrapped_testcase_suite.vpp_pid: os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM) except OSError: # already dead pass wrapped_testcase_suite.result.crashed = True wrapped_testcase_suite.result.process_result( wrapped_testcase_suite.last_test_id, ERROR) stop_run = process_finished_testsuite( wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results) or stop_run for finished_testcase in finished_testcase_suites: finished_testcase.child.join() finished_testcase.close_pipes() wrapped_testcase_suites.remove(finished_testcase) finished_unread_testcases.add(finished_testcase) finished_testcase.stdouterr_queue.put(None) if stop_run: while testcase_suites: results.append(TestResult(testcase_suites.pop(0))) elif testcase_suites: new_testcase = TestCaseWrapper(testcase_suites.pop(0), manager) wrapped_testcase_suites.add(new_testcase) unread_testcases.add(new_testcase) time.sleep(0.1) except Exception: for wrapped_testcase_suite in wrapped_testcase_suites: wrapped_testcase_suite.child.terminate() wrapped_testcase_suite.stdouterr_queue.put(None) raise finally: read_from_testcases.clear() stdouterr_thread.join(test_timeout) manager.shutdown() handle_cores(failed_wrapped_testcases) return results class SplitToSuitesCallback: def __init__(self, filter_callback): self.suites = {} self.suite_name = 'default' self.filter_callback = filter_callback self.filtered = unittest.TestSuite() def __call__(self, file_name, cls, method): test_method = cls(method) if self.filter_callback(file_name, cls.__name__, method): self.suite_name = file_name + cls.__name__ if self.suite_name not in self.suites: self.suites[self.suite_name] = unittest.TestSuite() self.suites[self.suite_name].addTest(test_method) else: self.filtered.addTest(test_method) test_option = "TEST" def parse_test_option(): f = os.getenv(test_option, None) filter_file_name = None filter_class_name = None filter_func_name = None if f: if '.' in f: parts = f.split('.') if len(parts) > 3: raise Exception("Unrecognized %s option: %s" % (test_option, f)) if len(parts) > 2: if parts[2] not in ('*', ''): filter_func_name = parts[2] if parts[1] not in ('*', ''): filter_class_name = parts[1] if parts[0] not in ('*', ''): if parts[0].startswith('test_'): filter_file_name = parts[0] else: filter_file_name = 'test_%s' % parts[0] else: if f.startswith('test_'): filter_file_name = f else: filter_file_name = 'test_%s' % f if filter_file_name: filter_file_name = '%s.py' % filter_file_name return filter_file_name, filter_class_name, filter_func_name def filter_tests(tests, filter_cb): result = unittest.suite.TestSuite() for t in tests: if isinstance(t, unittest.suite.TestSuite): # this is a bunch of tests, recursively filter... x = filter_tests(t, filter_cb) if x.countTestCases() > 0: result.addTest(x) elif isinstance(t, unittest.TestCase): # this is a single test parts = t.id().split('.') # t.id() for common cases like this: # test_classifier.TestClassifier.test_acl_ip # apply filtering only if it is so if len(parts) == 3: if not filter_cb(parts[0], parts[1], parts[2]): continue result.addTest(t) else: # unexpected object, don't touch it result.addTest(t) return result class FilterByTestOption: def __init__(self, filter_file_name, filter_class_name, filter_func_name): self.filter_file_name = filter_file_name self.filter_class_name = filter_class_name self.filter_func_name = filter_func_name def __call__(self, file_name, class_name, func_name): if self.filter_file_name: fn_match = fnmatch.fnmatch(file_name, self.filter_file_name) if not fn_match: return False if self.filter_class_name and class_name != self.filter_class_name: return False if self.filter_func_name and func_name != self.filter_func_name: return False return True class FilterByClassList: def __init__(self, classes_with_filenames): self.classes_with_filenames = classes_with_filenames def __call__(self, file_name, class_name, func_name): return '.'.join([file_name, class_name]) in self.classes_with_filenames def suite_from_failed(suite, failed): failed = {x.rsplit('.', 1)[0] for x in failed} filter_cb = FilterByClassList(failed) suite = filter_tests(suite, filter_cb) return suite class AllResults(dict): def __init__(self): super(AllResults, self).__init__() self.all_testcases = 0 self.results_per_suite = [] self[PASS] = 0 self[FAIL] = 0 self[ERROR] = 0 self[SKIP] = 0 self[TEST_RUN] = 0 self.rerun = [] self.testsuites_no_tests_run = [] def add_results(self, result): self.results_per_suite.append(result) result_types = [PASS, FAIL, ERROR, SKIP, TEST_RUN] for result_type in result_types: self[result_type] += len(result[result_type]) def add_result(self, result): retval = 0 self.all_testcases += result.testcase_suite.countTestCases() self.add_results(result) if result.no_tests_run(): self.testsuites_no_tests_run.append(result.testcase_suite) if result.crashed: retval = -1 else: retval = 1 elif not result.was_successful(): retval = 1 if retval != 0: self.rerun.append(result.testcase_suite) return retval def print_results(self): print('') print(double_line_delim) print('TEST RESULTS:') print(' Scheduled tests: {}'.format(self.all_testcases)) print(' Executed tests: {}'.format(self[TEST_RUN])) print(' Passed tests: {}'.format( colorize(str(self[PASS]), GREEN))) if self[SKIP] > 0: print(' Skipped tests: {}'.format( colorize(str(self[SKIP]), YELLOW))) if self.not_executed > 0: print(' Not Executed tests: {}'.format( colorize(str(self.not_executed), RED))) if self[FAIL] > 0: print(' Failures: {}'.format( colorize(str(self[FAIL]), RED))) if self[ERROR] > 0: print(' Errors: {}'.format( colorize(str(self[ERROR]), RED))) if self.all_failed > 0: print('FAILURES AND ERRORS IN TESTS:') for result in self.results_per_suite: failed_testcase_ids = result[FAIL] errored_testcase_ids = result[ERROR] old_testcase_name = None if failed_testcase_ids or errored_testcase_ids: for failed_test_id in failed_testcase_ids: new_testcase_name, test_name = \ result.get_testcase_names(failed_test_id) if new_testcase_name != old_testcase_name: print(' Testcase name: {}'.format( colorize(new_testcase_name, RED))) old_testcase_name = new_testcase_name print(' FAILURE: {} [{}]'.format( colorize(test_name, RED), failed_test_id)) for failed_test_id in errored_testcase_ids: new_testcase_name, test_name = \ result.get_testcase_names(failed_test_id) if new_testcase_name != old_testcase_name: print(' Testcase name: {}'.format( colorize(new_testcase_name, RED))) old_testcase_name = new_testcase_name print(' ERROR: {} [{}]'.format( colorize(test_name, RED), failed_test_id)) if self.testsuites_no_tests_run: print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:') tc_classes = set() for testsuite in self.testsuites_no_tests_run: for testcase in testsuite: tc_classes.add(get_testcase_doc_name(testcase)) for tc_class in tc_classes: print(' {}'.format(colorize(tc_class, RED))) print(double_line_delim) print('') @property def not_executed(self): return self.all_testcases - self[TEST_RUN] @property def all_failed(self): return self[FAIL] + self[ERROR] def parse_results(results): """ Prints the number of scheduled, executed, not executed, passed, failed, errored and skipped tests and details about failed and errored tests. Also returns all suites where any test failed. :param results: :return: """ results_per_suite = AllResults() crashed = False failed = False for result in results: result_code = results_per_suite.add_result(result) if result_code == 1: failed = True elif result_code == -1: crashed = True results_per_suite.print_results() if crashed: return_code = -1 elif failed: return_code = 1 else: return_code = 0 return return_code, results_per_suite.rerun def parse_digit_env(env_var, default): value = os.getenv(env_var, default) if value != default: if value.isdigit(): value = int(value) else: print('WARNING: unsupported value "%s" for env var "%s",' 'defaulting to %s' % (value, env_var, default)) value = default return value if __name__ == '__main__': verbose = parse_digit_env("V", 0) test_timeout = parse_digit_env("TIMEOUT", 600) # default = 10 minutes retries = parse_digit_env("RETRIES", 0) debug = os.getenv("DEBUG", "n").lower() in ["gdb", "gdbserver"] debug_core = os.getenv("DEBUG", "").lower() == "core" compress_core = framework.BoolEnvironmentVariable("CORE_COMPRESS") step = framework.BoolEnvironmentVariable("STEP") force_foreground = framework.BoolEnvironmentVariable("FORCE_FOREGROUND") run_interactive = debug or step or force_foreground try: num_cpus = len(os.sched_getaffinity(0)) except AttributeError: num_cpus = multiprocessing.cpu_count() shm_free = psutil.disk_usage('/dev/shm').free print('OS reports %s available cpu(s). Free shm: %s' % ( num_cpus, "{:,}MB".format(shm_free / (1024 * 1024)))) test_jobs = os.getenv("TEST_JOBS", "1").lower() # default = 1 process if test_jobs == 'auto': if run_interactive: concurrent_tests = 1 print('Interactive mode required, running on one core') else: shm_max_processes = 1 if shm_free < min_req_shm: raise Exception('Not enough free space in /dev/shm. Required ' 'free space is at least %sM.' % (min_req_shm >> 20)) else: extra_shm = shm_free - min_req_shm shm_max_processes += extra_shm / shm_per_process concurrent_tests = min(cpu_count(), shm_max_processes) print('Found enough resources to run tests with %s cores' % concurrent_tests) elif test_jobs.isdigit(): concurrent_tests = int(test_jobs) print("Running on %s core(s) as set by 'TEST_JOBS'." % concurrent_tests) else: concurrent_tests = 1 print('Running on one core.') if run_interactive and concurrent_tests > 1: raise NotImplementedError( 'Running tests interactively (DEBUG is gdb or gdbserver or STEP ' 'is set) in parallel (TEST_JOBS is more than 1) is not supported') parser = argparse.ArgumentParser(description="VPP unit tests") parser.add_argument("-f", "--failfast", action='store_true', help="fast failure flag") parser.add_argument("-d", "--dir", action='append', type=str, help="directory containing test files " "(may be specified multiple times)") args = parser.parse_args() failfast = args.failfast descriptions = True print("Running tests using custom test runner") # debug message filter_file, filter_class, filter_func = parse_test_option() print("Active filters: file=%s, class=%s, function=%s" % ( filter_file, filter_class, filter_func)) filter_cb = FilterByTestOption(filter_file, filter_class, filter_func) ignore_path = os.getenv("VENV_PATH", None) cb = SplitToSuitesCallback(filter_cb) for d in args.dir: print("Adding tests from directory tree %s" % d) discover_tests(d, cb, ignore_path) # suites are not hashable, need to use list suites = [] tests_amount = 0 for testcase_suite in cb.suites.values(): tests_amount += testcase_suite.countTestCases() suites.append(testcase_suite) print("%s out of %s tests match specified filters" % ( tests_amount, tests_amount + cb.filtered.countTestCases())) if not running_extended_tests: print("Not running extended tests (some tests will be skipped)") attempts = retries + 1 if attempts > 1: print("Perform %s attempts to pass the suite..." % attempts) if run_interactive and suites: # don't fork if requiring interactive terminal print('Running tests in foreground in the current process') full_suite = unittest.TestSuite() map(full_suite.addTests, suites) result = VppTestRunner(verbosity=verbose, failfast=failfast, print_summary=True).run(full_suite) was_successful = result.wasSuccessful() if not was_successful: for test_case_info in result.failed_test_cases_info: handle_failed_suite(test_case_info.logger, test_case_info.tempdir, test_case_info.vpp_pid) if test_case_info in result.core_crash_test_cases_info: check_and_handle_core(test_case_info.vpp_bin_path, test_case_info.tempdir, test_case_info.core_crash_test) sys.exit(not was_successful) else: print('Running each VPPTestCase in a separate background process' ' with {} parallel process(es)'.format(concurrent_tests)) exit_code = 0 while suites and attempts > 0: results = run_forked(suites) exit_code, suites = parse_results(results) attempts -= 1 if exit_code == 0: print('Test run was successful') else: print('%s attempt(s) left.' % attempts) sys.exit(exit_code)