# -*- coding: utf-8 -*- import xml.etree.ElementTree as ET import outer_packages import argparse import glob from pprint import pprint import sys, os from collections import OrderedDict import copy import datetime, time try: import cPickle as pickle except: import pickle import subprocess, shlex from ansi2html import Ansi2HTMLConverter converter = Ansi2HTMLConverter(inline = True) convert = converter.convert def ansi2html(text): return convert(text, full = False) FUNCTIONAL_CATEGORY = 'Functional' # how to display those categories ERROR_CATEGORY = 'Error' def pad_tag(text, tag): return '<%s>%s' % (tag, text, tag) def mark_string(text, color, condition): if condition: return '%s' % (color, text) return text def is_functional_test_name(testname): #if testname.startswith(('platform_', 'misc_methods_', 'vm_', 'payload_gen_', 'pkt_builder_')): # return True #return False if testname.startswith('functional_tests.'): return True return False def is_good_status(text): return text in ('Successful', 'Fixed', 'Passed', 'True', 'Pass') # input: xml element with test result # output string: 'error', 'failure', 'skipped', 'passed' def get_test_result(test): for child in test.getchildren(): if child.tag in ('error', 'failure', 'skipped'): return child.tag return 'passed' # returns row of table with and columns - key: value def add_th_td(key, value): return '%s%s\n' % (key, value) # returns row of table with and columns - key: value def add_td_td(key, value): return '%s%s\n' % (key, value) # returns row of table with and columns - key: value def add_th_th(key, value): return '%s%s\n' % (key, value) # returns
with table of tests under given category. # category - string with name of category # tests - list of tests, derived from aggregated xml report, changed a little to get easily stdout etc. # tests_type - stateful or stateless # category_info_dir - folder to search for category info file # expanded - bool, false = outputs (stdout etc.) of tests are hidden by CSS # brief - bool, true = cut some part of tests outputs (useful for errors section with expanded flag) def add_category_of_tests(category, tests, tests_type = None, category_info_dir = None, expanded = False, brief = False): is_actual_category = category not in (FUNCTIONAL_CATEGORY, ERROR_CATEGORY) category_id = '_'.join([category, tests_type]) if tests_type else category category_name = ' '.join([category, tests_type.capitalize()]) if tests_type else category html_output = '' if is_actual_category: html_output += '
\n' if category_info_dir: category_info_file = '%s/report_%s.info' % (category_info_dir, category) if os.path.exists(category_info_file): with open(category_info_file) as f: for info_line in f.readlines(): key_value = info_line.split(':', 1) if key_value[0].strip() in list(trex_info_dict.keys()) + ['User']: # always 'hhaim', no need to show continue html_output += add_th_td('%s:' % key_value[0], key_value[1]) else: html_output += add_th_td('Info:', 'No info') print('add_category_of_tests: no category info %s' % category_info_file) if tests_type: html_output += add_th_td('Tests type:', tests_type.capitalize()) if len(tests): total_duration = 0.0 for test in tests: total_duration += float(test.attrib['time']) html_output += add_th_td('Tests duration:', datetime.timedelta(seconds = int(total_duration))) html_output += '
\n' if not len(tests): return html_output + pad_tag('
No tests!', 'b') html_output += '
\n\n\n\n\n' for test in tests: functional_test = is_functional_test_name(test.attrib['name']) if functional_test and is_actual_category: continue if category == ERROR_CATEGORY: test_id = ('err_' + test.attrib['classname'] + test.attrib['name']).replace('.', '_') else: test_id = (category_id + test.attrib['name']).replace('.', '_') if expanded: html_output += '\n\n\n' elif test_result == 'failure': html_output += 'FAILED' elif test_result == 'skipped': html_output += 'SKIPPED' else: html_output += 'PASSED' html_output += '' result, result_text = test.attrib.get('result', ('', '')) if result_text: start_index_errors_stl = result_text.find('STLError: \n******') if start_index_errors_stl > 0: result_text = result_text[start_index_errors_stl:].strip() # cut traceback start_index_errors = result_text.find('Exception: The test is failed, reasons:') if start_index_errors > 0: result_text = result_text[start_index_errors + 10:].strip() # cut traceback result_text = ansi2html(result_text) result_text = '%s:
%s

' % (result.capitalize(), result_text.replace('\n', '
')) stderr = '' if brief and result_text else test.get('stderr', '') if stderr: stderr = ansi2html(stderr) stderr = 'Stderr:
%s

\n' % stderr.replace('\n', '
') stdout = '' if brief and result_text else test.get('stdout', '') if stdout: stdout = ansi2html(stdout) if brief: # cut off server logs stdout = stdout.split('>>>>>>>>>>>>>>>', 1)[0] stdout = 'Stdout:
%s

\n' % stdout.replace('\n', '
') html_output += '' % (result_text, stderr, stdout) else: html_output += 'No output' html_output += '\n
' if category == ERROR_CATEGORY: html_output += 'SetupFailed tests:' else: html_output += '%s tests:' % category_name html_output += 'Final ResultTime (s)
' else: html_output += '
' % test_id if category == ERROR_CATEGORY: html_output += FUNCTIONAL_CATEGORY if functional_test else test.attrib['classname'] if expanded: html_output += '' else: html_output += '' html_output += '%s' % test.attrib['name'] test_result = get_test_result(test) if test_result == 'error': html_output += 'ERROR '+ test.attrib['time'] + '
' % ('' if expanded else 'display:none;', test_id, 4 if category == ERROR_CATEGORY else 3) if result_text or stderr or stdout: html_output += '%s%s%s
' return html_output style_css = """ html {overflow-y:scroll;} body { font-size:12px; color:#000000; background-color:#ffffff; margin:0px; font-family:verdana,helvetica,arial,sans-serif; } div {width:100%;} table,th,td,input,textarea { font-size:100%; } table.reference, table.reference_fail { background-color:#ffffff; border:1px solid #c3c3c3; border-collapse:collapse; vertical-align:middle; } table.reference th { background-color:#e5eecc; border:1px solid #c3c3c3; padding:3px; } table.reference_fail th { background-color:#ffcccc; border:1px solid #c3c3c3; padding:3px; } table.reference td, table.reference_fail td { border:1px solid #c3c3c3; padding:3px; } a.example {font-weight:bold} #a:link,a:visited {color:#900B09; background-color:transparent} #a:hover,a:active {color:#FF0000; background-color:transparent} .linktr { cursor: pointer; } .linktext { color:#0000FF; text-decoration: underline; } """ # main if __name__ == '__main__': # deal with input args argparser = argparse.ArgumentParser(description='Aggregate test results of from ./reports dir, produces xml, html, mail report.') argparser.add_argument('--input_dir', default='./reports', help='Directory with xmls/setups info. Filenames: report_.xml/report_.info') argparser.add_argument('--output_xml', default='./reports/aggregated_tests.xml', dest = 'output_xmlfile', help='Name of output xml file with aggregated results.') argparser.add_argument('--output_html', default='./reports/aggregated_tests.html', dest = 'output_htmlfile', help='Name of output html file with aggregated results.') argparser.add_argument('--output_mail', default='./reports/aggregated_tests_mail.html', dest = 'output_mailfile', help='Name of output html file with aggregated results for mail.') argparser.add_argument('--output_title', default='./reports/aggregated_tests_title.txt', dest = 'output_titlefile', help='Name of output file to contain title of mail.') argparser.add_argument('--build_status_file', default='./reports/build_status', dest = 'build_status_file', help='Name of output file to save scenaries build results (should not be wiped).') argparser.add_argument('--last_passed_commit', default='./reports/last_passed_commit', dest = 'last_passed_commit', help='Name of output file to save last passed commit (should not be wiped).') args = argparser.parse_args() ##### get input variables/TRex commit info scenario = os.environ.get('SCENARIO') build_url = os.environ.get('BUILD_URL') build_id = os.environ.get('BUILD_ID') trex_repo = os.environ.get('TREX_CORE_REPO') python_ver = os.environ.get('PYTHON_VER') if not scenario: print('Warning: no environment variable SCENARIO, using default') scenario = 'TRex regression' if not build_url: print('Warning: no environment variable BUILD_URL') if not build_id: print('Warning: no environment variable BUILD_ID') if not python_ver: print('Warning: no environment variable PYTHON_VER') trex_info_dict = OrderedDict() for file in glob.glob('%s/report_*.info' % args.input_dir): with open(file) as f: file_lines = f.readlines() if not len(file_lines): continue # to next file for info_line in file_lines: key_value = info_line.split(':', 1) not_trex_keys = ['Server', 'Router', 'User'] if key_value[0].strip() in not_trex_keys: continue # to next parameters trex_info_dict[key_value[0].strip()] = key_value[1].strip() break trex_last_commit_info = '' trex_last_commit_hash = trex_info_dict.get('Git SHA') if trex_last_commit_hash and trex_repo: try: print('Getting TRex commit with hash %s' % trex_last_commit_hash) command = 'git --git-dir %s show %s --quiet' % (trex_repo, trex_last_commit_hash) print('Executing: %s' % command) proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) (trex_last_commit_info, stderr) = proc.communicate() print('Stdout:\n\t' + trex_last_commit_info.replace('\n', '\n\t')) print('Stderr:', stderr) print('Return code:', proc.returncode) trex_last_commit_info = trex_last_commit_info.replace('\n', '
') except Exception as e: print('Error getting last commit: %s' % e) ##### get xmls: report_.xml err = [] jobs_list = [] jobs_file = '%s/jobs_list.info' % args.input_dir if os.path.exists(jobs_file): with open('%s/jobs_list.info' % args.input_dir) as f: for line in f.readlines(): line = line.strip() if line: jobs_list.append(line) else: message = '%s does not exist!' % jobs_file print(message) err.append(message) ##### aggregate results to 1 single tree aggregated_root = ET.Element('testsuite') test_types = ('functional', 'stateful', 'stateless') setups = {} for job in jobs_list: setups[job] = {} for test_type in test_types: xml_file = '%s/report_%s_%s.xml' % (args.input_dir, job, test_type) if not os.path.exists(xml_file): continue if os.path.basename(xml_file) == os.path.basename(args.output_xmlfile): continue setups[job][test_type] = [] print('Processing report: %s.%s' % (job, test_type)) tree = ET.parse(xml_file) root = tree.getroot() for key, value in root.attrib.items(): if key in aggregated_root.attrib and value.isdigit(): # sum total number of failed tests etc. aggregated_root.attrib[key] = str(int(value) + int(aggregated_root.attrib[key])) else: aggregated_root.attrib[key] = value tests = root.getchildren() if not len(tests): # there should be tests: message = 'No tests in xml %s' % xml_file print(message) #err.append(message) for test in tests: setups[job][test_type].append(test) test.attrib['name'] = test.attrib['classname'] + '.' + test.attrib['name'] test.attrib['classname'] = job aggregated_root.append(test) if not sum([len(x) for x in setups[job].values()]): message = 'No reports from setup %s!' % job print(message) err.append(message) continue total_tests_count = int(aggregated_root.attrib.get('tests', 0)) error_tests_count = int(aggregated_root.attrib.get('errors', 0)) failure_tests_count = int(aggregated_root.attrib.get('failures', 0)) skipped_tests_count = int(aggregated_root.attrib.get('skip', 0)) passed_tests_count = total_tests_count - error_tests_count - failure_tests_count - skipped_tests_count tests_count_string = mark_string('Total: %s' % total_tests_count, 'red', total_tests_count == 0) + ', ' tests_count_string += mark_string('Passed: %s' % passed_tests_count, 'red', error_tests_count + failure_tests_count > 0) + ', ' tests_count_string += mark_string('Error: %s' % error_tests_count, 'red', error_tests_count > 0) + ', ' tests_count_string += mark_string('Failure: %s' % failure_tests_count, 'red', failure_tests_count > 0) + ', ' tests_count_string += 'Skipped: %s' % skipped_tests_count ##### save output xml print('Writing output file: %s' % args.output_xmlfile) ET.ElementTree(aggregated_root).write(args.output_xmlfile) ##### build output html error_tests = [] functional_tests = OrderedDict() # categorize and get output of each test for test in aggregated_root.getchildren(): # each test in xml if is_functional_test_name(test.attrib['name']): functional_tests[test.attrib['name']] = test result_tuple = None for child in test.getchildren(): # , (, , other: passed) # if child.tag in ('failure', 'error'): #temp = copy.deepcopy(test) #print temp._children #print test._children # error_tests.append(test) if child.tag == 'failure': error_tests.append(test) result_tuple = ('failure', child.text) elif child.tag == 'error': error_tests.append(test) result_tuple = ('error', child.text) elif child.tag == 'skipped': result_tuple = ('skipped', child.text) elif child.tag == 'system-out': test.attrib['stdout'] = child.text elif child.tag == 'system-err': test.attrib['stderr'] = child.text if result_tuple: test.attrib['result'] = result_tuple html_output = '''\ ''' if scenario: html_output += add_th_td('Scenario:', scenario.capitalize()) if python_ver: html_output += add_th_td('Python:', python_ver) start_time_file = '%s/start_time.info' % args.input_dir if os.path.exists(start_time_file): with open(start_time_file) as f: start_time = int(f.read()) total_time = int(time.time()) - start_time html_output += add_th_td('Regression start:', datetime.datetime.fromtimestamp(start_time).strftime('%d/%m/%Y %H:%M')) html_output += add_th_td('Regression duration:', datetime.timedelta(seconds = total_time)) html_output += add_th_td('Tests count:', tests_count_string) for key in trex_info_dict: if key == 'Git SHA': continue html_output += add_th_td(key, trex_info_dict[key]) if trex_last_commit_info: html_output += add_th_td('Last commit:', trex_last_commit_info) html_output += '

\n' if err: html_output += '%s

\n' % '\n
'.join(err) # # # \ #''' #passed_quantity = len(result_types['passed']) #failed_quantity = len(result_types['failed']) #error_quantity = len(result_types['error']) #skipped_quantity = len(result_types['skipped']) #html_output += '' % passed_quantity #html_output += '' % (pad_tag(failed_quantity, 'b') if failed_quantity else '0') #html_output += '' % (pad_tag(error_quantity, 'b') if error_quantity else '0') #html_output += '' % (pad_tag(skipped_quantity, 'b') if skipped_quantity else '0') # html_output += ''' # #
Summary:Passed: %sFailed: %sError: %sSkipped: %s
''' category_arr = [FUNCTIONAL_CATEGORY, ERROR_CATEGORY] # Adding buttons # Error button if len(error_tests): html_output += '\n'.format(error = ERROR_CATEGORY) # Setups buttons for category in sorted(setups.keys()): category_arr.append(category) html_output += '\n' % (category_arr[-1], category) # Functional buttons if len(functional_tests): html_output += '\n' % (FUNCTIONAL_CATEGORY, FUNCTIONAL_CATEGORY) # Adding tests # Error tests if len(error_tests): html_output += '
' % ERROR_CATEGORY html_output += add_category_of_tests(ERROR_CATEGORY, error_tests) html_output += '
' # Setups tests for category, tests in setups.items(): html_output += '' # Functional tests if len(functional_tests): html_output += '' html_output += '\n\n \ ''' # save html with open(args.output_htmlfile, 'w') as f: print('Writing output file: %s' % args.output_htmlfile) f.write(html_output) html_output = None # mail report (only error tests, expanded) mail_output = '''\ ''' if scenario: mail_output += add_th_td('Scenario:', scenario.capitalize()) if python_ver: mail_output += add_th_td('Python:', python_ver) if build_url: mail_output += add_th_td('Full HTML report:', 'link' % build_url) start_time_file = '%s/start_time.info' % args.input_dir if os.path.exists(start_time_file): with open(start_time_file) as f: start_time = int(f.read()) total_time = int(time.time()) - start_time mail_output += add_th_td('Regression start:', datetime.datetime.fromtimestamp(start_time).strftime('%d/%m/%Y %H:%M')) mail_output += add_th_td('Regression duration:', datetime.timedelta(seconds = total_time)) mail_output += add_th_td('Tests count:', tests_count_string) for key in trex_info_dict: if key == 'Git SHA': continue mail_output += add_th_td(key, trex_info_dict[key]) if trex_last_commit_info: mail_output += add_th_td('Last commit:', trex_last_commit_info) mail_output += '

\n
\n' for category in setups.keys(): failing_category = False for test in error_tests: if test.attrib['classname'] == category: failing_category = True if failing_category or not len(setups[category]) or not sum([len(x) for x in setups[category]]): mail_output += '\n' else: mail_output += '
\n' mail_output += add_th_th('Setup:', pad_tag(category.replace('.', '/'), 'b')) category_info_file = '%s/report_%s.info' % (args.input_dir, category.replace('.', '_')) if os.path.exists(category_info_file): with open(category_info_file) as f: for info_line in f.readlines(): key_value = info_line.split(':', 1) if key_value[0].strip() in list(trex_info_dict.keys()) + ['User']: # always 'hhaim', no need to show continue mail_output += add_th_td('%s:' % key_value[0].strip(), key_value[1].strip()) else: mail_output += add_th_td('Info:', 'No info') mail_output += '
\n' mail_output += '
\n' # Error tests if len(error_tests) or err: if err: mail_output += '%s' % '\n
'.join(err) if len(error_tests) > 5: mail_output += '\nMore than 5 failed tests, showing brief output.\n
' # show only brief version (cut some info) mail_output += add_category_of_tests(ERROR_CATEGORY, error_tests, expanded=True, brief=True) else: mail_output += add_category_of_tests(ERROR_CATEGORY, error_tests, expanded=True) else: mail_output += '
All passed.
\n' mail_output += '\n\n' ##### save outputs # mail content with open(args.output_mailfile, 'w') as f: print('Writing output file: %s' % args.output_mailfile) f.write(mail_output) # build status category_dict_status = {} if os.path.exists(args.build_status_file): print('Reading: %s' % args.build_status_file) with open(args.build_status_file, 'rb') as f: try: category_dict_status = pickle.load(f) except Exception as e: print('Error during pickle load: %s' % e) if type(category_dict_status) is not dict: print('%s is corrupt, truncating' % args.build_status_file) category_dict_status = {} last_status = category_dict_status.get(scenario, 'Successful') # assume last is passed if no history if err or len(error_tests): # has fails exit_status = 1 if is_good_status(last_status): current_status = 'Failure' else: current_status = 'Still Failing' else: exit_status = 0 if is_good_status(last_status): current_status = 'Successful' else: current_status = 'Fixed' category_dict_status[scenario] = current_status with open(args.build_status_file, 'wb') as f: print('Writing output file: %s' % args.build_status_file) pickle.dump(category_dict_status, f) # last successful commit if (current_status in ('Successful', 'Fixed')) and trex_last_commit_hash and jobs_list > 0 and scenario == 'nightly': with open(args.last_passed_commit, 'w') as f: print('Writing output file: %s' % args.last_passed_commit) f.write(trex_last_commit_hash) # mail title mailtitle_output = scenario.capitalize() if build_id: mailtitle_output += ' - Build #%s' % build_id mailtitle_output += ' - %s!' % current_status with open(args.output_titlefile, 'w') as f: print('Writing output file: %s' % args.output_titlefile) f.write(mailtitle_output) # exit sys.exit(exit_status)