diff options
Diffstat (limited to 'src/vnet/map/examples/test_map.py')
-rwxr-xr-x | src/vnet/map/examples/test_map.py | 141 |
1 files changed, 0 insertions, 141 deletions
diff --git a/src/vnet/map/examples/test_map.py b/src/vnet/map/examples/test_map.py deleted file mode 100755 index 21388d49526..00000000000 --- a/src/vnet/map/examples/test_map.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python - -import time,argparse,sys,cmd, unittest -from ipaddress import * - -parser = argparse.ArgumentParser(description='VPP MAP test') -parser.add_argument('-i', nargs='*', action="store", dest="inputdir") -args = parser.parse_args() - -for dir in args.inputdir: - sys.path.append(dir) -from vpp_papi import * - -# -# 1:1 Shared IPv4 address, shared BR (16) VPP CLI -# -def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False): - ip4_pfx = ip_network(ip4_pfx_str) - ip6_src = ip_address(ip6_src_str) - ip6_dst = ip_network(ip6_pfx_str) - ip6_nul = IPv6Address(u'0::0') - mod = ip4_pfx.num_addresses / 1024 - - for i in range(ip4_pfx.num_addresses): - a = time.clock() - t = map_add_domain(0, ip6_nul.packed, ip4_pfx[i].packed, ip6_src.packed, 0, 32, 128, ea_bits_len, psid_offset, psid_len, 0, 0) - #print "Return from map_add_domain", t - if t == None: - print "map_add_domain failed" - continue - if t.retval != 0: - print "map_add_domain failed", t - continue - for psid in range(0x1 << int(psid_len)): - r = map_add_del_rule(0, t.index, 1, (ip6_dst[(i * (0x1<<int(psid_len))) + psid]).packed, psid) - #print "Return from map_add_del_rule", r - - if ip6_src_ecmp and not i % mod: - ip6_src = ip6_src + 1 - - print "Running time:", time.clock() - a - -class TestMAP(unittest.TestCase): - ''' - def test_delete_all(self): - t = map_domain_dump(0) - self.assertNotEqual(t, None) - print "Number of domains configured: ", len(t) - for d in t: - ts = map_del_domain(0, d.domainindex) - self.assertNotEqual(ts, None) - t = map_domain_dump(0) - self.assertNotEqual(t, None) - print "Number of domains configured: ", len(t) - self.assertEqual(len(t), 0) - - ''' - - def test_a_million_rules(self): - ip4_pfx = u'192.0.2.0/24' - ip6_pfx = u'2001:db8::/32' - ip6_src = u'2001:db8::1' - psid_offset = 6 - psid_len = 6 - ea_bits_len = 0 - lw46_shared(ip4_pfx, ip6_pfx, ip6_src, ea_bits_len, psid_offset, psid_len) - -# -# RX thread, that should sit on blocking vpe_api_read() - -# - - -# -# -# -import threading -class RXThread (threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - - def run(self): - print "Starting " - i = 0 - while True: - msg = vpe_api_read() - if msg: - #print msg - id = unpack('>H', msg[0:2]) - size = unpack('>H', msg[2:4]) - print "Received", id, "of size", size - i += 1 - #del msg - continue - - #time.sleep(0.001) - return - -# Create RX thread -rxthread = RXThread() -rxthread.setDaemon(True) - -print "Connect", connect_to_vpe("client124") -import timeit -rxthread.start() -print "After thread started" - -#pneum_kill_thread() -print "After thread killed" - -#t = show_version(0) -#print "Result from show version", t - -print timeit.timeit('t = show_version(0)', number=1000, setup="from __main__ import show_version") -time.sleep(10) -#print timeit.timeit('control_ping(0)', number=10, setup="from __main__ import control_ping") - - -disconnect_from_vpe() -sys.exit() - - -print t.program, t.version,t.builddate,t.builddirectory - -''' - -t = map_domain_dump(0) -if not t: - print('show map domain failed') - -for d in t: - print("IP6 prefix:",str(IPv6Address(d.ip6prefix))) - print( "IP4 prefix:",str(IPv4Address(d.ip4prefix))) -''' - -suite = unittest.TestLoader().loadTestsFromTestCase(TestMAP) -unittest.TextTestRunner(verbosity=2).run(suite) - -disconnect_from_vpe() - - |