From 8b52a31ed2c299b759f330c4f976b9c70f5765f4 Mon Sep 17 00:00:00 2001 From: Hanoh Haim Date: Wed, 24 Jun 2015 14:03:29 +0300 Subject: first version --- .../python_lib/rednose-0.4.1/rednose.py | 387 +++++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100755 scripts/automation/trex_control_plane/python_lib/rednose-0.4.1/rednose.py (limited to 'scripts/automation/trex_control_plane/python_lib/rednose-0.4.1/rednose.py') diff --git a/scripts/automation/trex_control_plane/python_lib/rednose-0.4.1/rednose.py b/scripts/automation/trex_control_plane/python_lib/rednose-0.4.1/rednose.py new file mode 100755 index 00000000..1ff892ad --- /dev/null +++ b/scripts/automation/trex_control_plane/python_lib/rednose-0.4.1/rednose.py @@ -0,0 +1,387 @@ +# Copyright (c) 2009, Tim Cuthbertson # All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of the organisation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY +# WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function +import os +import sys +import linecache +import re +import time + +import nose + +import termstyle + +failure = 'FAILED' +error = 'ERROR' +success = 'passed' +skip = 'skipped' +line_length = 77 + +PY3 = sys.version_info[0] >= 3 +if PY3: + to_unicode = str +else: + def to_unicode(s): + try: + return unicode(s) + except UnicodeDecodeError: + return unicode(repr(str(s))) + +BLACKLISTED_WRITERS = [ + 'nose[\\/]result\\.pyc?$', + 'unittest[\\/]runner\\.pyc?$' +] +REDNOSE_DEBUG = False + + +class RedNose(nose.plugins.Plugin): + env_opt = 'NOSE_REDNOSE' + env_opt_color = 'NOSE_REDNOSE_COLOR' + score = 199 # just under the `coverage` module + + def __init__(self, *args): + super(RedNose, self).__init__(*args) + self.reports = [] + self.error = self.success = self.failure = self.skip = 0 + self.total = 0 + self.stream = None + self.verbose = False + self.enabled = False + self.tree = False + + def options(self, parser, env=os.environ): + global REDNOSE_DEBUG + rednose_on = bool(env.get(self.env_opt, False)) + rednose_color = env.get(self.env_opt_color, 'auto') + REDNOSE_DEBUG = bool(env.get('REDNOSE_DEBUG', False)) + + parser.add_option( + "--rednose", + action="store_true", + default=rednose_on, + dest="rednose", + help="enable colour output (alternatively, set $%s=1)" % (self.env_opt,) + ) + parser.add_option( + "--no-color", + action="store_false", + dest="rednose", + help="disable colour output" + ) + parser.add_option( + "--force-color", + action="store_const", + dest='rednose_color', + default=rednose_color, + const='force', + help="force colour output when not using a TTY (alternatively, set $%s=force)" % (self.env_opt_color,) + ) + parser.add_option( + "--immediate", + action="store_true", + default=False, + help="print errors and failures as they happen, as well as at the end" + ) + + def configure(self, options, conf): + if options.rednose: + self.enabled = True + termstyle_init = { + 'force': termstyle.enable, + 'off': termstyle.disable + }.get(options.rednose_color, termstyle.auto) + termstyle_init() + + self.immediate = options.immediate + self.verbose = options.verbosity >= 2 + + def begin(self): + self.start_time = time.time() + self._in_test = False + + def _format_test_name(self, test): + return test.shortDescription() or to_unicode(test) + + def prepareTestResult(self, result): + result.stream = FilteringStream(self.stream, BLACKLISTED_WRITERS) + + def beforeTest(self, test): + self._in_test = True + if self.verbose: + self._out(self._format_test_name(test) + ' ... ') + + def afterTest(self, test): + if self._in_test: + self.addSkip() + + def _print_test(self, type_, color): + self.total += 1 + if self.verbose: + self._outln(color(type_)) + else: + if type_ == failure: + short_ = 'F' + elif type_ == error: + short_ = 'X' + elif type_ == skip: + short_ = '-' + else: + short_ = '.' + self._out(color(short_)) + if self.total % line_length == 0: + self._outln() + self._in_test = False + + def _add_report(self, report): + failure_type, test, err = report + self.reports.append(report) + if self.immediate: + self._outln() + self._report_test(len(self.reports), *report) + + def addFailure(self, test, err): + self.failure += 1 + self._add_report((failure, test, err)) + self._print_test(failure, termstyle.red) + + def addError(self, test, err): + if err[0].__name__ == 'SkipTest': + self.addSkip(test, err) + return + self.error += 1 + self._add_report((error, test, err)) + self._print_test(error, termstyle.yellow) + + def addSuccess(self, test): + self.success += 1 + self._print_test(success, termstyle.green) + + def addSkip(self, test=None, err=None): + self.skip += 1 + self._print_test(skip, termstyle.blue) + + def setOutputStream(self, stream): + self.stream = stream + + def report(self, stream): + """report on all registered failures and errors""" + self._outln() + if self.immediate: + for x in range(0, 5): + self._outln() + report_num = 0 + if len(self.reports) > 0: + for report_num, report in enumerate(self.reports): + self._report_test(report_num + 1, *report) + self._outln() + + self._summarize() + + def _summarize(self): + """summarize all tests - the number of failures, errors and successes""" + self._line(termstyle.black) + self._out("%s test%s run in %0.1f seconds" % ( + self.total, + self._plural(self.total), + time.time() - self.start_time)) + if self.total > self.success: + self._outln(". ") + additionals = [] + if self.failure > 0: + additionals.append(termstyle.red("%s FAILED" % ( + self.failure,))) + if self.error > 0: + additionals.append(termstyle.yellow("%s error%s" % ( + self.error, + self._plural(self.error) ))) + if self.skip > 0: + additionals.append(termstyle.blue("%s skipped" % ( + self.skip))) + self._out(', '.join(additionals)) + + self._out(termstyle.green(" (%s test%s passed)" % ( + self.success, + self._plural(self.success) ))) + self._outln() + + def _report_test(self, report_num, type_, test, err): + """report the results of a single (failing or errored) test""" + self._line(termstyle.black) + self._out("%s) " % (report_num)) + if type_ == failure: + color = termstyle.red + self._outln(color('FAIL: %s' % (self._format_test_name(test),))) + else: + color = termstyle.yellow + self._outln(color('ERROR: %s' % (self._format_test_name(test),))) + + exc_type, exc_instance, exc_trace = err + + self._outln() + self._outln(self._fmt_traceback(exc_trace)) + self._out(color(' ', termstyle.bold(color(exc_type.__name__)), ": ")) + self._outln(self._fmt_message(exc_instance, color)) + self._outln() + + def _relative_path(self, path): + """ + If path is a child of the current working directory, the relative + path is returned surrounded by bold xterm escape sequences. + If path is not a child of the working directory, path is returned + """ + try: + here = os.path.abspath(os.path.realpath(os.getcwd())) + fullpath = os.path.abspath(os.path.realpath(path)) + except OSError: + return path + if fullpath.startswith(here): + return termstyle.bold(fullpath[len(here)+1:]) + return path + + def _file_line(self, tb): + """formats the file / lineno / function line of a traceback element""" + prefix = "file://" + prefix = "" + + f = tb.tb_frame + if '__unittest' in f.f_globals: + # this is the magical flag that prevents unittest internal + # code from junking up the stacktrace + return None + + filename = f.f_code.co_filename + lineno = tb.tb_lineno + linecache.checkcache(filename) + function_name = f.f_code.co_name + + line_contents = linecache.getline(filename, lineno, f.f_globals).strip() + + return " %s line %s in %s\n %s" % ( + termstyle.blue(prefix, self._relative_path(filename)), + lineno, + termstyle.cyan(function_name), + line_contents) + + def _fmt_traceback(self, trace): + """format a traceback""" + ret = [] + ret.append(termstyle.default(" Traceback (most recent call last):")) + current_trace = trace + while current_trace is not None: + line = self._file_line(current_trace) + if line is not None: + ret.append(line) + current_trace = current_trace.tb_next + return '\n'.join(ret) + + def _fmt_message(self, exception, color): + orig_message_lines = to_unicode(exception).splitlines() + + if len(orig_message_lines) == 0: + return '' + message_lines = [color(orig_message_lines[0])] + for line in orig_message_lines[1:]: + match = re.match('^---.* begin captured stdout.*----$', line) + if match: + color = None + message_lines.append('') + line = ' ' + line + message_lines.append(color(line) if color is not None else line) + return '\n'.join(message_lines) + + def _out(self, msg='', newline=False): + self.stream.write(msg) + if newline: + self.stream.write('\n') + + def _outln(self, msg=''): + self._out(msg, True) + + def _plural(self, num): + return '' if num == 1 else 's' + + def _line(self, color=termstyle.reset, char='-'): + """ + print a line of separator characters (default '-') + in the given colour (default black) + """ + self._outln(color(char * line_length)) + + +import traceback +import sys + + +class FilteringStream(object): + """ + A wrapper for a stream that will filter + calls to `write` and `writeln` to ignore calls + from blacklisted callers + (implemented as a regex on their filename, according + to traceback.extract_stack()) + + It's super hacky, but there seems to be no other way + to suppress nose's default output + """ + def __init__(self, stream, excludes): + self.__stream = stream + self.__excludes = list(map(re.compile, excludes)) + + def __should_filter(self): + try: + stack = traceback.extract_stack(limit=3)[0] + filename = stack[0] + pattern_matches_filename = lambda pattern: pattern.search(filename) + should_filter = any(map(pattern_matches_filename, self.__excludes)) + if REDNOSE_DEBUG: + print >> sys.stderr, "REDNOSE_DEBUG: got write call from %s, should_filter = %s" % ( + filename, should_filter) + return should_filter + except StandardError as e: + if REDNOSE_DEBUG: + print("\nError in rednose filtering: %s" % (e,), file=sys.stderr) + traceback.print_exc(sys.stderr) + return False + + def write(self, *a): + if self.__should_filter(): + return + return self.__stream.write(*a) + + def writeln(self, *a): + if self.__should_filter(): + return + return self.__stream.writeln(*a) + + # pass non-known methods through to self.__stream + def __getattr__(self, name): + if REDNOSE_DEBUG: + print("REDNOSE_DEBUG: getting attr %s" % (name,), file=sys.stderr) + return getattr(self.__stream, name) -- cgit