summaryrefslogtreecommitdiffstats
path: root/src/plugins/map/examples
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/map/examples')
-rwxr-xr-xsrc/plugins/map/examples/gen-rules.py160
-rwxr-xr-xsrc/plugins/map/examples/test_map.py119
2 files changed, 195 insertions, 84 deletions
diff --git a/src/plugins/map/examples/gen-rules.py b/src/plugins/map/examples/gen-rules.py
index 7964aa9a359..d22d4633ef6 100755
--- a/src/plugins/map/examples/gen-rules.py
+++ b/src/plugins/map/examples/gen-rules.py
@@ -20,38 +20,63 @@ import sys
# map add domain ip4-pfx <pfx> ip6-pfx ::/0 ip6-src <ip6-src> ea-bits-len 0 psid-offset 6 psid-len 6
# map add rule index <0> psid <psid> ip6-dst <ip6-dst>
-def_ip4_pfx = '192.0.2.0/24'
-def_ip6_pfx = '2001:db8::/32'
-def_ip6_src = '2001:db8::1'
+def_ip4_pfx = "192.0.2.0/24"
+def_ip6_pfx = "2001:db8::/32"
+def_ip6_src = "2001:db8::1"
def_psid_offset = 6
def_psid_len = 6
def_ea_bits_len = 0
-parser = argparse.ArgumentParser(description='MAP VPP configuration generator')
-parser.add_argument('-t', action="store", dest="mapmode")
-parser.add_argument('-f', action="store", dest="format", default="vpp")
-parser.add_argument('--ip4-prefix', action="store", dest="ip4_pfx", default=def_ip4_pfx)
-parser.add_argument('--ip6-prefix', action="store", dest="ip6_pfx", default=def_ip6_pfx)
-parser.add_argument('--ip6-src', action="store", dest="ip6_src", default=def_ip6_src)
-parser.add_argument('--psid-len', action="store", dest="psid_len", default=def_psid_len)
-parser.add_argument('--psid-offset', action="store", dest="psid_offset", default=def_psid_offset)
-parser.add_argument('--ea-bits-len', action="store", dest="ea_bits_len", default=def_ea_bits_len)
+parser = argparse.ArgumentParser(description="MAP VPP configuration generator")
+parser.add_argument("-t", action="store", dest="mapmode")
+parser.add_argument("-f", action="store", dest="format", default="vpp")
+parser.add_argument("--ip4-prefix", action="store", dest="ip4_pfx", default=def_ip4_pfx)
+parser.add_argument("--ip6-prefix", action="store", dest="ip6_pfx", default=def_ip6_pfx)
+parser.add_argument("--ip6-src", action="store", dest="ip6_src", default=def_ip6_src)
+parser.add_argument("--psid-len", action="store", dest="psid_len", default=def_psid_len)
+parser.add_argument(
+ "--psid-offset", action="store", dest="psid_offset", default=def_psid_offset
+)
+parser.add_argument(
+ "--ea-bits-len", action="store", dest="ea_bits_len", default=def_ea_bits_len
+)
args = parser.parse_args()
#
# Print domain
#
def domain_print(i, ip4_pfx, ip6_pfx, ip6_src, eabits_len, psid_offset, psid_len):
- if format == 'vpp':
- print("map add domain ip4-pfx " + ip4_pfx + " ip6-pfx", ip6_pfx, "ip6-src " + ip6_src +
- " ea-bits-len", eabits_len, "psid-offset", psid_offset, "psid-len", psid_len)
- if format == 'confd':
- print("vpp softwire softwire-instances softwire-instance", i, "br-ipv6 " + ip6_src +
- " ipv6-prefix " + ip6_pfx + " ipv4-prefix " + ip4_pfx +
- " ea-bits-len", eabits_len, "psid-offset", psid_offset, "psid-len", psid_len)
- if format == 'xml':
+ if format == "vpp":
+ print(
+ "map add domain ip4-pfx " + ip4_pfx + " ip6-pfx",
+ ip6_pfx,
+ "ip6-src " + ip6_src + " ea-bits-len",
+ eabits_len,
+ "psid-offset",
+ psid_offset,
+ "psid-len",
+ psid_len,
+ )
+ if format == "confd":
+ print(
+ "vpp softwire softwire-instances softwire-instance",
+ i,
+ "br-ipv6 "
+ + ip6_src
+ + " ipv6-prefix "
+ + ip6_pfx
+ + " ipv4-prefix "
+ + ip4_pfx
+ + " ea-bits-len",
+ eabits_len,
+ "psid-offset",
+ psid_offset,
+ "psid-len",
+ psid_len,
+ )
+ if format == "xml":
print("<softwire-instance>")
- print("<id>", i, "</id>");
+ print("<id>", i, "</id>")
print(" <br-ipv6>" + ip6_src + "</br-ipv6>")
print(" <ipv6-prefix>" + ip6_pfx + "</ipv6-prefix>")
print(" <ipv4-prefix>" + ip4_pfx + "</ipv4-prefix>")
@@ -59,32 +84,54 @@ def domain_print(i, ip4_pfx, ip6_pfx, ip6_src, eabits_len, psid_offset, psid_len
print(" <psid-len>", psid_len, "</psid-len>")
print(" <psid-offset>", psid_offset, "</psid-offset>")
+
def domain_print_end():
- if format == 'xml':
+ if format == "xml":
print("</softwire-instance>")
+
def rule_print(i, psid, dst):
- if format == 'vpp':
+ if format == "vpp":
print("map add rule index", i, "psid", psid, "ip6-dst", dst)
- if format == 'confd':
+ if format == "confd":
print("binding", psid, "ipv6-addr", dst)
- if format == 'xml':
+ if format == "xml":
print(" <binding>")
print(" <psid>", psid, "</psid>")
print(" <ipv6-addr>", dst, "</ipv6-addr>")
print(" </binding>")
+
#
# Algorithmic mapping Shared IPv4 address
#
-def algo(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False):
- domain_print(0, ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len)
+def algo(
+ ip4_pfx_str,
+ ip6_pfx_str,
+ ip6_src_str,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ ip6_src_ecmp=False,
+):
+ domain_print(
+ 0, ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len
+ )
domain_print_end()
+
#
# 1:1 Full IPv4 address
#
-def lw46(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False):
+def lw46(
+ ip4_pfx_str,
+ ip6_pfx_str,
+ ip6_src_str,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ ip6_src_ecmp=False,
+):
ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
ip6_src = ipaddress.ip_address(ip6_src_str)
ip6_dst = ipaddress.ip_network(ip6_pfx_str)
@@ -92,15 +139,26 @@ def lw46(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_l
mod = ip4_pfx.num_addresses / 1024
for i in range(ip4_pfx.num_addresses):
- domain_print(i, str(ip4_pfx[i]) + "/32", str(ip6_dst[i]) + "/128", str(ip6_src), 0, 0, 0)
+ domain_print(
+ i, str(ip4_pfx[i]) + "/32", str(ip6_dst[i]) + "/128", str(ip6_src), 0, 0, 0
+ )
domain_print_end()
if ip6_src_ecmp and not i % mod:
ip6_src = ip6_src + 1
+
#
# 1:1 Shared IPv4 address, shared BR (16) VPP CLI
#
-def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False):
+def lw46_shared(
+ ip4_pfx_str,
+ ip6_pfx_str,
+ ip6_src_str,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ ip6_src_ecmp=False,
+):
ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
ip6_src = ipaddress.ip_address(ip6_src_str)
ip6_dst = ipaddress.ip_network(ip6_pfx_str)
@@ -109,7 +167,7 @@ def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset,
for i in range(ip4_pfx.num_addresses):
domain_print(i, str(ip4_pfx[i]) + "/32", "::/0", str(ip6_src), 0, 0, psid_len)
for psid in range(0x1 << int(psid_len)):
- rule_print(i, psid, str(ip6_dst[(i * (0x1<<int(psid_len))) + psid]))
+ rule_print(i, psid, str(ip6_dst[(i * (0x1 << int(psid_len))) + psid]))
domain_print_end()
if ip6_src_ecmp and not i % mod:
ip6_src = ip6_src + 1
@@ -118,7 +176,15 @@ def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset,
#
# 1:1 Shared IPv4 address, shared BR
#
-def lw46_shared_b(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False):
+def lw46_shared_b(
+ ip4_pfx_str,
+ ip6_pfx_str,
+ ip6_src_str,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ ip6_src_ecmp=False,
+):
ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
ip6_src = ipaddress.ip_address(ip6_src_str)
ip6_dst = list(ipaddress.ip_network(ip6_pfx_str).subnets(new_prefix=56))
@@ -127,15 +193,16 @@ def lw46_shared_b(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offse
for i in range(ip4_pfx.num_addresses):
domain_print(i, str(ip4_pfx[i]) + "/32", "::/0", str(ip6_src), 0, 0, psid_len)
for psid in range(0x1 << psid_len):
- enduserprefix = list(ip6_dst.pop(0).subnets(new_prefix=64))[255-1]
- rule_print(i, psid, enduserprefix[(i * (0x1<<psid_len)) + psid])
+ enduserprefix = list(ip6_dst.pop(0).subnets(new_prefix=64))[255 - 1]
+ rule_print(i, psid, enduserprefix[(i * (0x1 << psid_len)) + psid])
domain_print_end()
if ip6_src_ecmp and not i % mod:
ip6_src = ip6_src + 1
def xml_header_print():
- print('''
+ print(
+ """
<?xml version="1.0" encoding="UTF-8"?>
<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<capabilities>
@@ -156,10 +223,13 @@ def xml_header_print():
<softwire>
<softwire-instances>
- ''')
+ """
+ )
+
def xml_footer_print():
- print('''
+ print(
+ """
</softwire-instances>
</softwire>
</vpp>
@@ -175,12 +245,20 @@ def xml_footer_print():
</rpc>
]]>]]>
- ''')
+ """
+ )
format = args.format
-if format == 'xml':
+if format == "xml":
xml_header_print()
-globals()[args.mapmode](args.ip4_pfx, args.ip6_pfx, args.ip6_src, args.ea_bits_len, args.psid_offset, args.psid_len)
-if format == 'xml':
+globals()[args.mapmode](
+ args.ip4_pfx,
+ args.ip6_pfx,
+ args.ip6_src,
+ args.ea_bits_len,
+ args.psid_offset,
+ args.psid_len,
+)
+if format == "xml":
xml_footer_print()
diff --git a/src/plugins/map/examples/test_map.py b/src/plugins/map/examples/test_map.py
index 7a48964b3f2..02df64015a2 100755
--- a/src/plugins/map/examples/test_map.py
+++ b/src/plugins/map/examples/test_map.py
@@ -1,10 +1,10 @@
#!/usr/bin/env python3
-import time,argparse,sys,cmd, unittest
+import time, argparse, sys, cmd, unittest
from ipaddress import *
-parser = argparse.ArgumentParser(description='VPP MAP test')
-parser.add_argument('-i', nargs='*', action="store", dest="inputdir")
+parser = argparse.ArgumentParser(description="VPP MAP test")
+parser.add_argument("-i", nargs="*", action="store", dest="inputdir")
args = parser.parse_args()
for dir in args.inputdir:
@@ -14,115 +14,150 @@ from vpp_papi import *
#
# 1:1 Shared IPv4 address, shared BR (16) VPP CLI
#
-def lw46_shared(ip4_pfx_str, ip6_pfx_str, ip6_src_str, ea_bits_len, psid_offset, psid_len, ip6_src_ecmp = False):
+def lw46_shared(
+ ip4_pfx_str,
+ ip6_pfx_str,
+ ip6_src_str,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ ip6_src_ecmp=False,
+):
ip4_pfx = ip_network(ip4_pfx_str)
ip6_src = ip_address(ip6_src_str)
ip6_dst = ip_network(ip6_pfx_str)
- ip6_nul = IPv6Address(u'0::0')
+ ip6_nul = IPv6Address("0::0")
mod = ip4_pfx.num_addresses / 1024
for i in range(ip4_pfx.num_addresses):
a = time.clock()
- t = map_add_domain(0, ip6_nul.packed, ip4_pfx[i].packed, ip6_src.packed, 0, 32, 128, ea_bits_len, psid_offset, psid_len, 0, 0)
- #print "Return from map_add_domain", t
+ t = map_add_domain(
+ 0,
+ ip6_nul.packed,
+ ip4_pfx[i].packed,
+ ip6_src.packed,
+ 0,
+ 32,
+ 128,
+ ea_bits_len,
+ psid_offset,
+ psid_len,
+ 0,
+ 0,
+ )
+ # print "Return from map_add_domain", t
if t == None:
- print "map_add_domain failed"
+ print("map_add_domain failed")
continue
if t.retval != 0:
- print "map_add_domain failed", t
+ print(f"map_add_domain failed, {t}")
continue
for psid in range(0x1 << int(psid_len)):
- r = map_add_del_rule(0, t.index, 1, (ip6_dst[(i * (0x1<<int(psid_len))) + psid]).packed, psid)
- #print "Return from map_add_del_rule", r
+ r = map_add_del_rule(
+ 0,
+ t.index,
+ 1,
+ (ip6_dst[(i * (0x1 << int(psid_len))) + psid]).packed,
+ psid,
+ )
+ # print "Return from map_add_del_rule", r
if ip6_src_ecmp and not i % mod:
ip6_src = ip6_src + 1
- print "Running time:", time.clock() - a
+ print(f"Running time: {time.clock() - a}")
+
class TestMAP(unittest.TestCase):
- '''
+ """
def test_delete_all(self):
t = map_domain_dump(0)
self.assertNotEqual(t, None)
- print "Number of domains configured: ", len(t)
+ print(f"Number of domains configured: {len(t)}")
for d in t:
ts = map_del_domain(0, d.domainindex)
self.assertNotEqual(ts, None)
t = map_domain_dump(0)
self.assertNotEqual(t, None)
- print "Number of domains configured: ", len(t)
- self.assertEqual(len(t), 0)
+ print(f"Number of domains configured: {len(t)}")
+ self.assertEqual(len(t), 0)/
- '''
+ """
def test_a_million_rules(self):
- ip4_pfx = u'192.0.2.0/24'
- ip6_pfx = u'2001:db8::/32'
- ip6_src = u'2001:db8::1'
+ ip4_pfx = "192.0.2.0/24"
+ ip6_pfx = "2001:db8::/32"
+ ip6_src = "2001:db8::1"
psid_offset = 6
psid_len = 6
ea_bits_len = 0
lw46_shared(ip4_pfx, ip6_pfx, ip6_src, ea_bits_len, psid_offset, psid_len)
+
#
# RX thread, that should sit on blocking vpe_api_read()
-#
+#
#
#
#
import threading
-class RXThread (threading.Thread):
+
+
+class RXThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
- print "Starting "
+ print("Starting ")
i = 0
while True:
msg = vpe_api_read()
if msg:
- #print msg
- id = unpack('>H', msg[0:2])
- size = unpack('>H', msg[2:4])
- print "Received", id, "of size", size
+ # print msg
+ id = unpack(">H", msg[0:2])
+ size = unpack(">H", msg[2:4])
+ print(f"Received {id} of size {size}")
i += 1
- #del msg
+ # del msg
continue
- #time.sleep(0.001)
+ # time.sleep(0.001)
return
+
# Create RX thread
rxthread = RXThread()
rxthread.setDaemon(True)
-
-print "Connect", connect_to_vpe("client124")
+
+print(f"Connect {connect_to_vpe('client124')}")
import timeit
+
rxthread.start()
-print "After thread started"
+print("After thread started")
-#pneum_kill_thread()
-print "After thread killed"
+# pneum_kill_thread()
+print("After thread killed")
-#t = show_version(0)
-#print "Result from show version", t
+# t = show_version(0)
+# print "Result from show version", t
-print timeit.timeit('t = show_version(0)', number=1000, setup="from __main__ import show_version")
+print(
+ f"{timeit.timeit('t = show_version(0)', number=1000, setup='from __main__ import show_version')}"
+)
time.sleep(10)
-#print timeit.timeit('control_ping(0)', number=10, setup="from __main__ import control_ping")
+# print timeit.timeit('control_ping(0)', number=10, setup="from __main__ import control_ping")
disconnect_from_vpe()
sys.exit()
-print t.program, t.version,t.builddate,t.builddirectory
+print(f"{t.program} {t.version}{t.builddate}{t.builddirectory}")
-'''
+"""
t = map_domain_dump(0)
if not t:
@@ -131,11 +166,9 @@ if not t:
for d in t:
print("IP6 prefix:",str(IPv6Address(d.ip6prefix)))
print( "IP4 prefix:",str(IPv4Address(d.ip4prefix)))
-'''
+"""
suite = unittest.TestLoader().loadTestsFromTestCase(TestMAP)
unittest.TextTestRunner(verbosity=2).run(suite)
disconnect_from_vpe()
-
-