aboutsummaryrefslogtreecommitdiffstats
path: root/doxygen/siphon-process
blob: 411bf72f0f01cc5f38bd723cc9fbcaf69deb490d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
# Copyright (c) 2016 Comcast Cable Communications Management, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Filter for .siphon files that are generated by other filters.
# The idea is to siphon off certain initializers so that we can better
# auto-document the contents of that initializer.

import argparse
import logging
import os
import sys

import siphon

DEFAULT_LOGFILE = None
DEFAULT_LOGLEVEL = "info"
DEFAULT_SIPHON = "clicmd"
DEFAULT_FORMAT = "markdown"
DEFAULT_OUTPUT = None
DEFAULT_TEMPLATES = os.path.dirname(__file__) + "/siphon_templates"

ap = argparse.ArgumentParser()
ap.add_argument("--log-file", default=DEFAULT_LOGFILE,
                help="Log file [%s]" % DEFAULT_LOGFILE)
ap.add_argument("--log-level", default=DEFAULT_LOGLEVEL,
                choices=["debug", "info", "warning", "error", "critical"],
                help="Logging level [%s]" % DEFAULT_LOGLEVEL)

ap.add_argument("--type", '-t', metavar="siphon_type", default=DEFAULT_SIPHON,
                choices=siphon.process.siphons.keys(),
                help="Siphon type to process [%s]" % DEFAULT_SIPHON)
ap.add_argument("--format", '-f', default=DEFAULT_FORMAT,
                choices=siphon.process.formats.keys(),
                help="Output format to generate [%s]" % DEFAULT_FORMAT)
ap.add_argument("--output", '-o', metavar="file", default=DEFAULT_OUTPUT,
                help="Output file (uses stdout if not defined) [%s]" %
                     DEFAULT_OUTPUT)
ap.add_argument("--templates", metavar="directory", default=DEFAULT_TEMPLATES,
                help="Path to render templates directory [%s]" %
                     DEFAULT_TEMPLATES)
ap.add_argument("input", nargs='+', metavar="input_file",
                help="Input .siphon files")
args = ap.parse_args()

logging.basicConfig(filename=args.log_file,
                    level=getattr(logging, args.log_level.upper(), None))
log = logging.getLogger("siphon_process")

# Determine where to send the generated output
if args.output is None:
    out = sys.stdout
else:
    out = open(args.output, "w+")

# Get our processor
klass = siphon.process.siphons[args.type]
processor = klass(template_directory=args.templates, format=args.format)

# Load the input files
processor.load_json(args.input)

# Process the data
processor.process(out=out)

# All done
>unconfig_ip4() i.unconfig_ip6() i.admin_down() def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None): if not dmac: dmac = self.pg1.remote_mac self.pg0.add_stream(tx) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg1.get_capture(1) rx = rx[0] self.assertEqual(rx[Ether].dst, dmac) self.assertEqual(rx[IP].src, tx[IP].src) self.assertEqual(rx[IPv6].src, ip6_src) self.assertEqual(rx[IPv6].dst, ip6_dst) def test_map_e(self): """ MAP-E """ # # Add a route to the MAP-BR # map_br_pfx = "2001::" map_br_pfx_len = 64 map_route = VppIpRoute(self, map_br_pfx, map_br_pfx_len, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1) map_route.add_vpp_config() # # Add a domain that maps from pg0 to pg1 # map_dst = socket.inet_pton(socket.AF_INET6, map_br_pfx) map_src = "3001::1" map_src_n = socket.inet_pton(socket.AF_INET6, map_src) client_pfx = socket.inet_pton(socket.AF_INET, "192.168.0.0") self.vapi.map_add_domain(map_dst, map_br_pfx_len, map_src_n, 128, client_pfx, 16) # # Fire in a v4 packet that will be encapped to the BR # v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IP(src=self.pg0.remote_ip4, dst='192.168.1.1') / UDP(sport=20000, dport=10000) / Raw('\xa5' * 100)) self.send_and_assert_encapped(v4, map_src, "2001::c0a8:0:0") # # Fire in a V6 encapped packet. # expect a decapped packet on the inside ip4 link # p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IPv6(dst=map_src, src="2001::1") / IP(dst=self.pg0.remote_ip4, src='192.168.1.1') / UDP(sport=20000, dport=10000) / Raw('\xa5' * 100)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() rx = self.pg0.get_capture(1) rx = rx[0] self.assertFalse(rx.haslayer(IPv6)) self.assertEqual(rx[IP].src, p[IP].src) self.assertEqual(rx[IP].dst, p[IP].dst) # # Pre-resolve. No API for this!! # self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1") self.send_and_assert_no_replies(self.pg0, v4, "resovled via default route") # # Add a route to 4001::1. Expect the encapped traffic to be # sent via that routes next-hop # pre_res_route = VppIpRoute( self, "4001::1", 128, [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1) pre_res_route.add_vpp_config() self.send_and_assert_encapped(v4, map_src, "2001::c0a8:0:0", dmac=self.pg1.remote_hosts[2].mac) # # change the route to the pre-solved next-hop # pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)]) pre_res_route.add_vpp_config() self.send_and_assert_encapped(v4, map_src, "2001::c0a8:0:0", dmac=self.pg1.remote_hosts[3].mac) # # cleanup. The test infra's object registry will ensure # the route is really gone and thus that the unresolve worked. # pre_res_route.remove_vpp_config() self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1") def validate(self, rx, expected): self.assertEqual(rx, expected.__class__(str(expected))) def payload(self, len): return 'x' * len def test_map_t(self): """ MAP-T """ # # Add a domain that maps from pg0 to pg1 # map_dst = socket.inet_pton(socket.AF_INET6, "2001:db8::") map_src = socket.inet_pton(socket.AF_INET6, "1234:5678:90ab:cdef::") ip4_pfx = socket.inet_pton(socket.AF_INET, "192.168.0.0") self.vapi.map_add_domain(map_dst, 32, map_src, 64, ip4_pfx, 24, 16, 6, 4, 1) # Enable MAP-T on interfaces. # self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 1) # self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 1) map_route = VppIpRoute(self, "2001:db8::", 32, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1) map_route.add_vpp_config() # # Send a v4 packet that will be translated # p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1') payload = TCP(sport=0xabcd, dport=0xabcd) p4 = (p_ether / p_ip4 / payload) p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f") / payload) p6_translated.hlim -= 1 rx = self.send_and_expect(self.pg0, p4*1, self.pg1) for p in rx: self.validate(p[1], p6_translated) # Send back an IPv6 packet that will be "untranslated" p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f', dst='1234:5678:90ab:cdef:ac:1001:200:0') p6 = (p_ether6 / p_ip6 / payload) p4_translated = (IP(src='192.168.0.1', dst=self.pg0.remote_ip4) / payload) p4_translated.id = 0 p4_translated.ttl -= 1 rx = self.send_and_expect(self.pg1, p6*1, self.pg0) for p in rx: self.validate(p[1], p4_translated) # IPv4 TTL ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) p4 = (p_ether / ip4_ttl_expired / payload) icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(type='time-exceeded', code='ttl-zero-during-transit') / IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload) rx = self.send_and_expect(self.pg0, p4*1, self.pg0) for p in rx: self.validate(p[1], icmp4_reply) ''' This one is broken, cause it would require hairpinning... # IPv4 TTL TTL1 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1) p4 = (p_ether / ip4_ttl_expired / payload) icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / \ ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \ IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload rx = self.send_and_expect(self.pg0, p4*1, self.pg0) for p in rx: self.validate(p[1], icmp4_reply) ''' # IPv6 Hop limit ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab', dst='1234:5678:90ab:cdef:ac:1001:200:0') p6 = (p_ether6 / ip6_hlim_expired / payload) icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab") / ICMPv6TimeExceeded(code=0) / IPv6(src="2001:db8:1ab::c0a8:1:ab", dst='1234:5678:90ab:cdef:ac:1001:200:0', hlim=0) / payload) rx = self.send_and_expect(self.pg1, p6*1, self.pg1) for p in rx: self.validate(p[1], icmp6_reply) # IPv4 Well-known port p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1') payload = UDP(sport=200, dport=200) p4 = (p_ether / p_ip4 / payload) self.send_and_assert_no_replies(self.pg0, p4*1) # IPv6 Well-known port payload = UDP(sport=200, dport=200) p6 = (p_ether6 / p_ip6 / payload) self.send_and_assert_no_replies(self.pg1, p6*1) # Packet fragmentation payload = UDP(sport=40000, dport=4000) / self.payload(1453) p4 = (p_ether / p_ip4 / payload) self.pg_enable_capture() self.pg0.add_stream(p4) self.pg_start() rx = self.pg1.get_capture(2) for p in rx: pass # TODO: Manual validation # self.validate(p[1], icmp4_reply) # Packet fragmentation send fragments payload = UDP(sport=40000, dport=4000) / self.payload(1453) p4 = (p_ether / p_ip4 / payload) frags = fragment(p4, fragsize=1000) self.pg_enable_capture() self.pg0.add_stream(frags) self.pg_start() rx = self.pg1.get_capture(2) for p in rx: pass # p.show2() # reass_pkt = reassemble(rx) # p4_reply.ttl -= 1 # p4_reply.id = 256 # self.validate(reass_pkt, p4_reply) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)