aboutsummaryrefslogtreecommitdiffstats
path: root/resources/traffic_profiles/trex/trex-sl-2n-ethip6-ip6dst10000.py
blob: 085787aa6ba61e1bfd6c95aefde1bf3ac764925c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Stream profile for T-rex traffic generator.

Stream profile:
 - Two streams sent in directions 0 --> 1 and 1 --> 0 at the same time.
 - Packet: ETH / IPv6
 - Direction 0 --> 1:
   - Source IP address range:      2001:1::1
   - Destination IP address range: 2001:2::0 - 2001:2::270F
 - Direction 1 --> 0:
   - Source IP address range:      2001:2::1
   - Destination IP address range: 2001:1::0 - 2001:1::270F
"""

from trex.stl.api import *
from profile_trex_stateless_base_class import TrafficStreamsBaseClass


class TrafficStreams(TrafficStreamsBaseClass):
    """Stream profile.

    One fixed source IPv6 address per direction; the destination IPv6
    address is incremented over the range configured in ``__init__``.
    """

    def __init__(self):
        """Initialization and setting of streams' parameters."""

        # super() has to name this class. Naming the parent class instead
        # starts the MRO lookup *after* the parent, so the parent's own
        # __init__ would be skipped.
        super(TrafficStreams, self).__init__()

        # IPs used in packet headers.
        # Direction 0 --> 1
        self.p1_src_start_ip = '2001:1::1'
        self.p1_dst_start_ip = '2001:2::0'
        self.p1_dst_end_ip = '2001:2::270F'

        # Direction 1 --> 0
        self.p2_src_start_ip = '2001:2::1'
        self.p2_dst_start_ip = '2001:1::0'
        self.p2_dst_end_ip = '2001:1::270F'

    @staticmethod
    def _create_dst_vm(base, count):
        """Build the field engine program that varies the IPv6 destination.

        The flow variable runs from ``base`` to ``base + count`` with the
        "inc" operation. It is 8 bytes wide and is written at ``IPv6.dst``
        plus 8 bytes, i.e. over the last 8 bytes of the 16-byte destination
        address.

        :param base: First value of the flow variable.
        :param count: Added to ``base`` to get the last value of the flow
            variable.
        :returns: Field engine program for one direction.
        :rtype: STLScVmRaw
        """
        return STLScVmRaw([STLVmFlowVar(name="ipv6_dst",
                                        min_value=base,
                                        max_value=base + count,
                                        size=8, op="inc"),
                           STLVmWrFlowVar(fv_name="ipv6_dst",
                                          pkt_offset="IPv6.dst",
                                          offset_fixup=8)])

    def define_packets(self):
        """Defines the packets to be sent from the traffic generator.

        Packet definition: | ETH | IPv6 |

        :returns: Packets to be sent from the traffic generator, as
            (base packet 0 --> 1, base packet 1 --> 0,
            field engine 0 --> 1, field engine 1 --> 0).
        :rtype: tuple
        """

        # _get_start_end_ipv6 is not defined in this class; its two results
        # are used below as the first flow variable value and the span to
        # add to it (presumably derived from the start/end addresses --
        # verify against the base class).
        base_p1, count_p1 = self._get_start_end_ipv6(self.p1_dst_start_ip,
                                                     self.p1_dst_end_ip)
        base_p2, count_p2 = self._get_start_end_ipv6(self.p2_dst_start_ip,
                                                     self.p2_dst_end_ip)

        # Direction 0 --> 1
        base_pkt_a = Ether() / IPv6(src=self.p1_src_start_ip,
                                    dst=self.p1_dst_start_ip)
        # Direction 1 --> 0
        base_pkt_b = Ether() / IPv6(src=self.p2_src_start_ip,
                                    dst=self.p2_dst_start_ip)

        # Direction 0 --> 1
        vm1 = self._create_dst_vm(base_p1, count_p1)
        # Direction 1 --> 0
        vm2 = self._create_dst_vm(base_p2, count_p2)

        return base_pkt_a, base_pkt_b, vm1, vm2


def register():
    """Register this traffic profile to T-rex.

    Creates a new TrafficStreams object and hands it back to the caller.

    Do not change this function.

    :returns: Traffic streams.
    :rtype: TrafficStreams
    """
    traffic_streams = TrafficStreams()
    return traffic_streams
s="p">): """Check udp checksum in ip packet. Return true if checksum is correct.""" new = pkt.__class__(str(pkt)) del new['UDP'].chksum new = new.__class__(str(new)) return new['UDP'].chksum == pkt['UDP'].chksum def _get_dhcpv6_msgtype(msg_index): """Return DHCPv6 message type string. :param msg_index: Index of message type. :return: Message type. :type msg_index: int :rtype msg_str: str """ dhcp6_messages = { 1: "SOLICIT", 2: "ADVERTISE", 3: "REQUEST", 4: "CONFIRM", 5: "RENEW", 6: "REBIND", 7: "REPLY", 8: "RELEASE", 9: "DECLINE", 10: "RECONFIGURE", 11: "INFORMATION-REQUEST", 12: "RELAY-FORW", 13: "RELAY-REPL" } return dhcp6_messages[msg_index] def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, server_ip, server_mac, client_duid, client_mac): """Send and check DHCPv6 SOLICIT proxy packet. :param tx_if: Client interface. :param rx_if: DHCPv6 server interface. :param dhcp_multicast_ip: Servers and relay agents multicast address. :param link_local_ip: Client link-local address. :param proxy_ip: IP address of DHCPv6 proxy server. :param server_ip: IP address of DHCPv6 server. :param server_mac: MAC address of DHCPv6 server. :param client_duid: Client DHCP Unique Identifier. :param client_mac: Client MAC address. :type tx_if: str :type rx_if: str :type dhcp_multicast_ip: str :type link_local_ip: str :type proxy_ip: str :type server_ip: str :type server_mac: str :type client_duid: str :type client_mac: str :return interface_id: ID of proxy interface. 
:rtype interface_id: str """ rxq = RxQueue(rx_if) txq = TxQueue(tx_if) sent_packets = [] dhcp6_solicit_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \ IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \ UDP(sport=UDP_SERVICES.dhcpv6_client, dport=UDP_SERVICES.dhcpv6_server) / \ DHCP6_Solicit() / \ DHCP6OptClientId(duid=client_duid) sent_packets.append(dhcp6_solicit_pkt) txq.send(dhcp6_solicit_pkt) ether = rxq.recv(2) if ether is None: raise RuntimeError('DHCPv6 SOLICIT timeout') if ether.dst != server_mac: raise RuntimeError("Destination MAC address error: {} != {}".format( ether.dst, server_mac)) print "Destination MAC address: OK." if ether['IPv6'].src != proxy_ip: raise RuntimeError("Source IP address error: {} != {}".format( ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if ether['IPv6'].dst != server_ip: raise RuntimeError("Destination IP address error: {} != {}".format( ether['IPv6'].dst, server_ip)) print "Destination IP address: OK." msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype) if msgtype != 'RELAY-FORW': raise RuntimeError("Message type error: {} != RELAY-FORW".format( msgtype)) print "Message type: OK." linkaddr = ether['IPv6']['UDP']\ ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr if linkaddr != proxy_ip: raise RuntimeError("Proxy IP address error: {} != {}".format( linkaddr, proxy_ip)) print "Proxy IP address: OK." try: interface_id = ether['IPv6']['UDP']\ ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\ ['Unknown DHCPv6 OPtion']['DHCP6 Interface-Id Option'].ifaceid except Exception: raise RuntimeError("DHCP6 Interface-Id error!") return interface_id def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac, proxy_to_server_mac, interface_id): """Send and check DHCPv6 ADVERTISE proxy packet. :param rx_if: DHCPv6 server interface. :param tx_if: Client interface. 
:param link_local_ip: Client link-local address. :param proxy_ip: IP address of DHCPv6 proxy server. :param server_ip: IP address of DHCPv6 server. :param server_mac: MAC address of DHCPv6 server. :param proxy_to_server_mac: MAC address of DHCPv6 proxy interface. :param interface_id: ID of proxy interface. :type rx_if: str :type tx_if: str :type link_local_ip: str :type proxy_ip: str :type server_ip: str :type server_mac: str :type proxy_to_server_mac: str :type interface_id: str """ rxq = RxQueue(rx_if) txq = TxQueue(tx_if) sent_packets = [] dhcp6_advertise_pkt = Ether(src=server_mac, dst=proxy_to_server_mac) / \ IPv6(src=server_ip, dst=proxy_ip) / \ UDP(sport=UDP_SERVICES.dhcpv6_server, dport=UDP_SERVICES.dhcpv6_client) / \ DHCP6_RelayReply(peeraddr=link_local_ip, linkaddr=proxy_ip) / \ DHCP6OptIfaceId(ifaceid=interface_id) / \ DHCP6OptRelayMsg() / \ DHCP6_Advertise() sent_packets.append(dhcp6_advertise_pkt) txq.send(dhcp6_advertise_pkt) ether = rxq.recv(2) if ether is None: raise RuntimeError('DHCPv6 ADVERTISE timeout') if ether['IPv6'].src != proxy_ip: raise RuntimeError("Source IP address error: {} != {}".format( ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if not _check_udp_checksum(ether['IPv6']): raise RuntimeError("Checksum error!") print "Checksum: OK." msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] ['DHCPv6 Advertise Message'].msgtype) if msgtype != 'ADVERTISE': raise RuntimeError("Message type error: {} != ADVERTISE".format( msgtype)) print "Message type: OK." def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, server_ip, client_duid, client_mac): """Send and check DHCPv6 REQUEST proxy packet. :param tx_if: Client interface. :param rx_if: DHCPv6 server interface. :param dhcp_multicast_ip: Servers and relay agents multicast address. :param link_local_ip: Client link-local address. :param proxy_ip: IP address of DHCPv6 proxy server. :param server_ip: IP address of DHCPv6 server. 
:param client_duid: Client DHCP Unique Identifier. :param client_mac: Client MAC address. :type tx_if: str :type rx_if: str :type dhcp_multicast_ip: str :type link_local_ip: str :type proxy_ip: str :type server_ip: str :type client_duid: str :type client_mac: str """ rxq = RxQueue(rx_if) txq = TxQueue(tx_if) sent_packets = [] dhcp6_request_pkt = Ether(src=client_mac, dst="33:33:00:01:00:02") / \ IPv6(src=link_local_ip, dst=dhcp_multicast_ip) / \ UDP(sport=UDP_SERVICES.dhcpv6_client, dport=UDP_SERVICES.dhcpv6_server) / \ DHCP6_Request() / \ DHCP6OptClientId(duid=client_duid) sent_packets.append(dhcp6_request_pkt) txq.send(dhcp6_request_pkt) ether = rxq.recv(2) if ether is None: raise RuntimeError('DHCPv6 REQUEST timeout') if ether['IPv6'].src != proxy_ip: raise RuntimeError("Source IP address error: {} != {}".format( ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if ether['IPv6'].dst != server_ip: raise RuntimeError("Destination IP address error: {} != {}".format( ether['IPv6'].dst, server_ip)) print "Destination IP address: OK." msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype) if msgtype != 'RELAY-FORW': raise RuntimeError("Message type error: {} != RELAY-FORW".format( msgtype)) print "Message type: OK." linkaddr = ether['IPv6']['UDP']\ ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr if linkaddr != proxy_ip: raise RuntimeError("Proxy IP address error: {} != {}".format( linkaddr, proxy_ip)) print "Proxy IP address: OK." def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac, interface_id): """Send and check DHCPv6 REPLY proxy packet. :param rx_if: DHCPv6 server interface. :param tx_if: Client interface. :param link_local_ip: Client link-local address. :param proxy_ip: IP address of DHCPv6 proxy server. :param server_ip: IP address of DHCPv6 server. :param server_mac: MAC address of DHCPv6 server. 
:param interface_id: ID of proxy interface. :type rx_if: str :type tx_if: str :type link_local_ip: str :type proxy_ip: str :type server_ip: str :type server_mac: str :type interface_id: str """ rxq = RxQueue(rx_if) txq = TxQueue(tx_if) sent_packets = [] dhcp_reply_pkt = Ether(src=server_mac) / \ IPv6(src=server_ip, dst=proxy_ip) / \ UDP(sport=UDP_SERVICES.dhcpv6_server, dport=UDP_SERVICES.dhcpv6_client) / \ DHCP6_RelayReply(peeraddr=link_local_ip, linkaddr=proxy_ip) / \ DHCP6OptIfaceId(ifaceid=interface_id) / \ DHCP6OptRelayMsg() / \ DHCP6_Reply() sent_packets.append(dhcp_reply_pkt) txq.send(dhcp_reply_pkt) ether = rxq.recv(2) if ether is None: raise RuntimeError('DHCPv6 REPLY timeout') if ether['IPv6'].src != proxy_ip: raise RuntimeError("Source IP address error: {} != {}".format( ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if not _check_udp_checksum(ether['IPv6']): raise RuntimeError("Checksum error!") print "Checksum: OK." msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] ['DHCPv6 Reply Message'].msgtype) if msgtype != 'REPLY': raise RuntimeError("Message type error: {} != REPLY".format(msgtype)) print "Message type: OK." 
def main():
    """Run the DHCPv6 proxy exchange: SOLICIT, ADVERTISE, REQUEST, REPLY.

    Reads interfaces and addresses from the traffic script arguments, runs
    the four message checks in order and exits with status 0 when none of
    them raised.
    """
    script_args = TrafficScriptArg(
        ['tx_src_ip', 'tx_dst_ip', 'proxy_ip', 'proxy_mac', 'server_ip',
         'client_mac', 'server_mac', 'proxy_to_server_mac'])

    # Argument lookups, kept in their original order.
    iface_client = script_args.get_arg('tx_if')
    iface_server = script_args.get_arg('rx_if')
    ip_proxy = script_args.get_arg('proxy_ip')
    # The proxy MAC is looked up but not used by any of the calls below.
    script_args.get_arg('proxy_mac')
    mac_proxy_to_server = script_args.get_arg('proxy_to_server_mac')
    ip_server = script_args.get_arg('server_ip')
    mac_client = script_args.get_arg('client_mac')
    mac_server = script_args.get_arg('server_mac')

    # Fixed client link-local address and DHCPv6 multicast address.
    ip_link_local = "fe80::1"
    ip_dhcp_multicast = "ff02::1:2"

    # Random client DUID, passed as a string.
    duid = str(random.randint(0, 9999))

    # SOLICIT -- returns the interface ID reused by ADVERTISE and REPLY.
    iface_id = dhcpv6_solicit(
        iface_client, iface_server, ip_dhcp_multicast, ip_link_local,
        ip_proxy, ip_server, mac_server, duid, mac_client)

    # ADVERTISE
    dhcpv6_advertise(
        iface_client, iface_server, ip_link_local, ip_proxy, ip_server,
        mac_server, mac_proxy_to_server, iface_id)

    # REQUEST
    dhcpv6_request(
        iface_client, iface_server, ip_dhcp_multicast, ip_link_local,
        ip_proxy, ip_server, duid, mac_client)

    # REPLY
    dhcpv6_reply(
        iface_client, iface_server, ip_link_local, ip_proxy, ip_server,
        mac_server, iface_id)

    sys.exit(0)


if __name__ == "__main__":
    main()