import outer_packages
from platform_cmd_link import *
import functional_general_test
# NOTE(review): the module name of the next three imports was missing in the
# reviewed source; nose.tools provides all three names - confirm.
from nose.tools import assert_equal
from nose.tools import assert_not_equal
from nose.tools import nottest
from nose.plugins.attrib import attr
from trex import CTRexScenario
from trex_stl_lib import trex_stl_sim
from trex_stl_lib.trex_stl_streams import STLProfile
from trex_stl_lib.trex_stl_packet_builder_scapy import RawPcapReader, RawPcapWriter, Ether
from trex_stl_lib.utils.text_opts import *

import sys

if sys.version_info > (3,0):
    from io import StringIO
else:
    from cStringIO import StringIO

import os
import subprocess
import shlex
from threading import Thread


@attr('run_on_trex')
class CStlBasic_Test(functional_general_test.CGeneralFunctional_Test):
    """Runs stateless profiles through the simulator and compares the
    produced pcap files against golden captures."""

    def setUp(self):
        """Resolve simulator and profile paths, verify they exist, then
        change the working directory to the scripts directory."""
        self.test_path = os.path.abspath(os.getcwd())
        self.scripts_path = CTRexScenario.scripts_path

        self.verify_exists(os.path.join(self.scripts_path, "bp-sim-64-debug"))

        self.stl_sim = os.path.join(self.scripts_path, "stl-sim")
        self.verify_exists(self.stl_sim)

        self.profiles_path = os.path.join(self.scripts_path, "stl/yaml/")

        self.profiles = {}
        self.profiles['imix_3pkt'] = os.path.join(self.profiles_path, "imix_3pkt.yaml")
        self.profiles['imix_3pkt_vm'] = os.path.join(self.profiles_path, "imix_3pkt_vm.yaml")
        # NOTE(review): this resolves to the parent directory, not to a
        # profile file - the file name looks truncated; confirm.
        self.profiles['random_size_9k'] = os.path.join(self.profiles_path, "../")
        self.profiles['imix_tuple_gen'] = os.path.join(self.profiles_path, "imix_1pkt_tuple_gen.yaml")

        for path in self.profiles.values():
            self.verify_exists(path)

        self.valgrind_profiles = [
            self.profiles['imix_3pkt_vm'],
            self.profiles['random_size_9k'],
            self.profiles['imix_tuple_gen'],
        ]

        self.golden_path = os.path.join(self.test_path, "stl/golden/")

        # the simulator is invoked with paths relative to the scripts directory
        os.chdir(self.scripts_path)

    def tearDown(self):
        """Restore the working directory saved by setUp."""
        os.chdir(self.test_path)

    def get_golden(self, name):
        """Return the path of golden file `name`; raise if it does not exist."""
        golden = os.path.join(self.golden_path, name)
        self.verify_exists(golden)
        return golden

    def verify_exists(self, name):
        """Raise Exception if path `name` does not exist."""
        if not os.path.exists(name):
            raise Exception("cannot find '{0}'".format(name))

    def scapy_pkt_show_to_str(self, scapy_pkt):
        """Return the text that `scapy_pkt.show()` prints to stdout.

        stdout is temporarily redirected to an in-memory buffer and is always
        restored, even if show() raises.
        """
        capture = StringIO()
        save_stdout = sys.stdout
        sys.stdout = capture
        try:
            # without this call nothing is captured and the per-field diff
            # printed by compare_caps is always empty
            scapy_pkt.show()
        finally:
            sys.stdout = save_stdout
        return capture.getvalue()

    def compare_caps(self, output, golden, max_diff_sec = 0.01):
        """Assert that pcap files `output` and `golden` hold the same packets.

        Both files must contain the same number of records; each pair must
        have matching timestamps (within a fixed 5 usec tolerance) and
        identical raw bytes. On a raw mismatch the differing Scapy fields are
        printed before AssertionError is raised.

        `max_diff_sec` is accepted for interface compatibility but is not
        used by the comparison.
        """
        pkts1 = list(RawPcapReader(output))
        pkts2 = list(RawPcapReader(golden))

        assert_equal(len(pkts1), len(pkts2))

        # enumerate from 1 so that every pair - including the last one - is
        # compared and reported with a 1-based index
        for i, (pkt1, pkt2) in enumerate(zip(pkts1, pkts2), 1):
            # record[1] holds the timestamp parts; the second part is scaled
            # by 1e-6 (presumably seconds + microseconds)
            ts1 = float(pkt1[1][0]) + (float(pkt1[1][1]) / 1e6)
            ts2 = float(pkt2[1][0]) + (float(pkt2[1][1]) / 1e6)

            if abs(ts1-ts2) > 0.000005: # 5 usec
                raise AssertionError("TS error: cap files '{0}', '{1}' differ in cap #{2} - '{3}' vs. '{4}'".format(output, golden, i, ts1, ts2))

            if pkt1[0] != pkt2[0]:
                errmsg = "RAW error: output file '{0}', differs from golden '{1}' in cap #{2}".format(output, golden, i)
                print(errmsg)

                print(format_text("\ndifferent fields for packet #{0}:".format(i), 'underline'))

                scapy_pkt1_info = self.scapy_pkt_show_to_str(Ether(pkt1[0])).split('\n')
                scapy_pkt2_info = self.scapy_pkt_show_to_str(Ether(pkt2[0])).split('\n')

                print(format_text("\nGot:\n", 'bold', 'underline'))
                for line, ref in zip(scapy_pkt1_info, scapy_pkt2_info):
                    if line != ref:
                        print(format_text(line, 'bold'))

                print(format_text("\nExpected:\n", 'bold', 'underline'))
                for line, ref in zip(scapy_pkt2_info, scapy_pkt1_info):
                    if line != ref:
                        print(format_text(line, 'bold'))

                print("\n")
                raise AssertionError(errmsg)

    def run_sim(self, yaml, output, options = "", silent = False, obj = None):
        """Run the stateless simulator on profile `yaml`.

        `output` is the pcap file to write (falsy for no output file),
        `options` is a string of extra command line options. When `obj` is
        given, the boolean result is also stored in obj['rc'] so that a
        thread target can report it. Returns True when the simulator exits
        with code 0.
        """
        if output:
            user_cmd = "-f {0} -o {1} {2} -p {3}".format(yaml, output, options, self.scripts_path)
        else:
            user_cmd = "-f {0} {1} -p {2}".format(yaml, options, self.scripts_path)

        if silent:
            user_cmd += " --silent"

        rc = trex_stl_sim.main(args = shlex.split(user_cmd))
        ok = (rc == 0)

        # 'is not None' so that an empty dict still receives the result
        if obj is not None:
            obj['rc'] = ok

        return ok

    def run_py_profile_path(self, profile, options, silent = False, do_no_remove = False, compare = True, test_generated = True, do_no_remove_generated = False):
        """Simulate stl/<profile> into a.pcap and optionally compare it with
        the golden capture exp/<profile base name>.pcap.

        When `test_generated` is set, the profile is also loaded, dumped back
        to code, and the generated code is simulated and compared the same
        way. `do_no_remove` keeps a.pcap, `do_no_remove_generated` keeps the
        generated file.
        """
        print('Testing profile: %s' % profile)
        output_cap = "a.pcap"
        input_file = os.path.join('stl/', profile)
        golden_file = os.path.join('exp', os.path.basename(profile).split('.')[0] + '.pcap')

        if os.path.exists(output_cap):
            os.unlink(output_cap)

        try:
            rc = self.run_sim(input_file, output_cap, options, silent)
            assert_equal(rc, True)

            # to refresh a golden file, copy output_cap over golden_file here

            if compare:
                self.compare_caps(output_cap, golden_file)
        finally:
            # guard the removal: if the simulator produced no file, the
            # original failure must surface instead of a file-not-found error
            if not do_no_remove and os.path.exists(output_cap):
                os.unlink(output_cap)

        if not test_generated:
            return

        try:
            # NOTE(review): both replacements are empty strings, so the
            # generated file is the input path without its extension - the
            # intended suffix looks truncated; confirm.
            generated_filename = input_file.replace('.py', '').replace('.yaml', '')
            if input_file.endswith('.py'):
                profile = STLProfile.load_py(input_file)
            elif input_file.endswith('.yaml'):
                profile = STLProfile.load_yaml(input_file)
            profile.dump_to_code(generated_filename)

            rc = self.run_sim(generated_filename, output_cap, options, silent)
            assert_equal(rc, True)

            if compare:
                self.compare_caps(output_cap, golden_file)
        except Exception as e:
            # NOTE(review): failures of the generated-profile pass (including
            # assertion failures) are only printed, not re-raised - confirm
            # this best-effort behaviour is intended.
            print(e)
        finally:
            if not do_no_remove_generated:
                os.unlink(generated_filename)
                # python 3 does not generate PYC under the same dir
                if os.path.exists(generated_filename + 'c'):
                    os.unlink(generated_filename + 'c')
            if not do_no_remove:
                os.unlink(output_cap)

    def test_stl_profiles(self):
        """Run every listed stl profile through run_py_profile_path."""
        # each entry: [profile, simulator options, compare, (optional) test_generated]
        # NOTE(review): many entries have an empty profile name, which makes
        # the input path the bare 'stl/' directory - the file names look
        # truncated; confirm.
        p = [
            ["","-m 1 -l 50",True],
            ["","-m 1 -l 50",True], # can't compare random now
            ["","-m 1 -l 50",True],
            ["","-m 1 -l 50",True],
            ["","-m 1 -l 50",True],
            ["","-m 1 -l 50",True],
            ["","-m 1 -l 50",True], # can't do the compare
            ["","-m 1 -l 50",True],
            ["","-m 1 ",True],
            ["","-m 1 -l 100",True],
            ["","-m 1 -l 100",True],
            ["","-m 1 -l 100",True],
            ["","-m 1 -l 100",True],
            ["", "-m 1", True],
            ["", "-m 1", True],
            ["", "-m 1 -l 1", True],
            ["", "-m 1 -l 1", True],

            # YAML test
            ["yaml/burst_1000_pkt.yaml","-m 1 -l 100",True],
            ["yaml/burst_1pkt_1burst.yaml","-m 1 -l 100",True],
            ["yaml/burst_1pkt_vm.yaml","-m 1 -l 100",True],
            ["yaml/imix_1pkt.yaml","-m 1 -l 100",True],
            ["yaml/imix_1pkt_2.yaml","-m 1 -l 100",True],
            ["yaml/imix_1pkt_tuple_gen.yaml","-m 1 -l 100",True],
            ["yaml/imix_1pkt_vm.yaml","-m 1 -l 100",True],
            ["","-m 1 -l 10",True],
            ["","-m 1 -l 10",True],
            ["","-m 1 -l 3",True],
            ["","-m 1 -c 2 -l 100",True],
            ["","-m 1 -c 2 -l 100",True],
            ["","-m 1 -c 1 -l 17",True, False], # can't generate: no VXLAN in Scapy, only in profile
            ["","-m 1 -c 1 -l 17",True],
            ["yaml/imix_3pkt.yaml","-m 50kpps --limit 20 --cores 2",True],
            ["yaml/imix_3pkt_vm.yaml","-m 50kpps --limit 20 --cores 2",True],
            ["","-m 1 -l 1 ",True],
            ["","-m 1 -l 1 ",True],
            ["","-m 1 -l 1 ",True],
            ["","-m 1 -l 20 ",True],
            ["","-m 1 -l 20 ",True],
            ["","-m 1 -l 20 ",True],
            ["","-m 1 -l 20 ",True],
            ["","-m 1 -l 20 ",True],
            ["","-m 1 -l 10 ",True], # test split of packet with ip option
            ["","-m 1 -l 10 ",True],
            ["","-m 1 -l 30 ",True],
            ["","-m 1 -l 50",True],
            ["","-m 1 -l 50 --cores 2",True],
        ]

        for obj in p:
            # check generated if not said otherwise
            test_generated = obj[3] if len(obj) > 3 else True

            self.run_py_profile_path(obj[0], obj[1], compare = obj[2], test_generated = test_generated, do_no_remove = True, do_no_remove_generated = False)

    def test_hlt_profiles(self):
        """Run every listed hlt profile through run_py_profile_path."""
        # NOTE(review): every entry names only the 'hlt/' directory - the
        # file names look truncated; confirm.
        p = (
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
            ['hlt/', '-m 1 -l 20', True],
        )

        for obj in p:
            self.run_py_profile_path(obj[0], obj[1], compare = obj[2], do_no_remove = True, do_no_remove_generated = False)

    # valgrind tests - this runs in multi thread as it safe (no output)
    def test_valgrind_various_profiles(self):
        """Run the valgrind profiles concurrently, one simulator thread per
        profile, and assert that every run succeeded."""
        print("\n")
        threads = []
        for profile in self.valgrind_profiles:
            print("\n*** VALGRIND: testing profile '{0}' ***\n".format(profile))
            # run_sim stores its result in obj['rc'] for collection after join
            obj = {'t': None, 'rc': None}
            t = Thread(target = self.run_sim,
                       kwargs = {'obj': obj, 'yaml': profile, 'output': None, 'options': "--cores 8 --limit 20 --valgrind", 'silent': True})
            obj['t'] = t

            threads.append(obj)
            t.start()

        for obj in threads:
            obj['t'].join()

        for obj in threads:
            assert_equal(obj['rc'], True)

    def test_multicore_scheduling(self):
        """Run the multi-core scheduling check of the simulator on each
        listed test profile."""
        # NOTE(review): every entry names only the 'stl/tests/' directory -
        # the file names look truncated; confirm.
        mc_tests = [
            'stl/tests/',
            'stl/tests/',
            'stl/tests/',
            'stl/tests/',
        ]

        for mc_test in mc_tests:
            rc = self.run_sim(mc_test, output = None, options = '--test_multi_core --limit=3840 -m 27kpps', silent = True)
            assert_equal(rc, True)