#!/usr/bin/env python import time,argparse,sys,cmd, unittest from ipaddress import * parser = argparse.ArgumentParser(description='VPP MAP test') parser.add_argument('-i', nargs='*', action="store", dest="inputdir") args = parser.parse_args() for dir in args.inputdir: sys.path.append(dir) from vpp_papi import * # # 1:1 Shared IPv4 address, shared BR (16) VPP CLI # def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False): ip4_pfx = ip_network(ip4_pfx_str) ip6_src = ip_address(ip6_src_str) ip6_dst = ip_network(ip6_pfx_str) ip6_nul = IPv6Address(u'0::0') mod = ip4_pfx.num_addresses / 1024 for i in range(ip4_pfx.num_addresses): a = time.clock() t = map_add_domain(0, ip6_nul.packed, ip4_pfx[i].packed, ip6_src.packed, 0, 32, 128, ea_bits_len, psid_offset, psid_len, 0, 0) #print "Return from map_add_domain", t if t == None: print "map_add_domain failed" continue if t.retval != 0: print "map_add_domain failed", t continue for psid in range(0x1 << int(psid_len)): r = map_add_del_rule(0, t.index, 1, (ip6_dst[(i * (0x1<H', msg[0:2]) size = unpack('>H', msg[2:4]) print "Received", id, "of size", size i += 1 #del msg continue #time.sleep(0.001) return # Create RX thread rxthread = RXThread() rxthread.setDaemon(True) print "Connect", connect_to_vpe("client124") import timeit rxthread.start() print "After thread started" #pneum_kill_thread() print "After thread killed" #t = show_version(0) #print "Result from show version", t print timeit.timeit('t = show_version(0)', number=1000, setup="from __main__ import show_version") time.sleep(10) #print timeit.timeit('control_ping(0)', number=10, setup="from __main__ import control_ping") disconnect_from_vpe() sys.exit() print t.program, t.version,t.builddate,t.builddirectory ''' t = map_domain_dump(0) if not t: print('show map domain failed') for d in t: print("IP6 prefix:",str(IPv6Address(d.ip6prefix))) print( "IP4 prefix:",str(IPv4Address(d.ip4prefix))) ''' suite = 
# NOTE(review): the line above begins with `#`, so the interpreter treats all
# of it as one comment. The imports, the argparse setup, lw46_shared(), the
# RX-thread start-up and the timeit/show_version statements it contains are
# therefore never executed as written. It looks like the file's line breaks
# were lost -- TODO restore the original line structure from version control.
#
# NOTE(review): inside that line, the fragment
#     (ip6_dst[(i * (0x1<H', msg[0:2])
# has unbalanced parentheses and quotes, so a span of the script appears to be
# missing (apparently from the end of lw46_shared() up to the code that
# unpacks and prints received messages). The class behind RXThread() is not
# present in the remaining text. Do not reconstruct this by guesswork.
#
# NOTE(review): the `print "..."` statements and time.clock() suggest this
# was written for Python 2 -- confirm the target interpreter before porting.
#
# NOTE(review): the line below is the tail of the `suite = ...` assignment
# that ends the line above. As it stands it is three calls on one line with
# no statement separators, and it references TestMAP, which is not defined
# anywhere in the visible text.
unittest.TestLoader().loadTestsFromTestCase(TestMAP) unittest.TextTestRunner(verbosity=2).run(suite) disconnect_from_vpe()