aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/SFC/VerifyPacket.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/SFC/VerifyPacket.py')
-rw-r--r--resources/libraries/python/SFC/VerifyPacket.py223
1 files changed, 223 insertions, 0 deletions
diff --git a/resources/libraries/python/SFC/VerifyPacket.py b/resources/libraries/python/SFC/VerifyPacket.py
new file mode 100644
index 0000000000..66bd09899d
--- /dev/null
+++ b/resources/libraries/python/SFC/VerifyPacket.py
@@ -0,0 +1,223 @@
+#!/usr/bin/env python
+# Copyright (c) 2017 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module defines the common functions.
+"""
+
+import ipaddress
+
+from scapy.layers.inet import IP, UDP
+from scapy.all import Raw
+from scapy.utils import rdpcap
+from resources.libraries.python.constants import Constants as con
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.TunnelProtocol import VxLAN, VxLANGPE, NSH
+
+from robot.api import logger
+
+def valid_ipv4(ipaddr):
+ """Check if IP address has the correct IPv4 address format.
+
+ :param ipaddr: IP address.
+ :type ipaddr: str
+ :returns: True in case of correct IPv4 address format,
+ otherwise return False.
+ :rtype: bool
+ """
+ try:
+ ipaddress.IPv4Address(unicode(ipaddr))
+ return True
+ except (AttributeError, ipaddress.AddressValueError):
+ return False
+
+
+def valid_ipv6(ipaddr):
+ """Check if IP address has the correct IPv6 address format.
+
+ :param ipaddr: IP address.
+ :type ipaddr: str
+ :returns: True in case of correct IPv6 address format,
+ otherwise return False.
+ :rtype: bool
+ """
+ try:
+ ipaddress.IPv6Address(unicode(ipaddr))
+ return True
+ except (AttributeError, ipaddress.AddressValueError):
+ return False
+
+
+class VerifyPacket(object):
+ """Define some functions for the test filed verify."""
+
+ @staticmethod
+ def check_vxlan_protocol(payload_data):
+ """
+ verify the vxlan protocol in the payload data.
+
+ :param payload_data: the payload data in the packet.
+ :type payload_data: str
+ :returns: none
+ :raises RuntimeError: If the vxlan protocol field verify fails.
+ """
+ # get the vxlan packet and check it
+ vxlan_pkt = VxLAN(payload_data[0:8])
+ if vxlan_pkt.flags != sfccon.VxLAN_FLAGS:
+ raise RuntimeError("Unexpected Vxlan flags: {0}".
+ format(vxlan_pkt.flags))
+
+ if vxlan_pkt.vni != sfccon.VxLAN_DEFAULT_VNI:
+ raise RuntimeError("Unexpected VNI flag: {0}".format(vxlan_pkt.vni))
+
+ @staticmethod
+ def check_vxlangpe_nsh_protocol(payload_data, test_type):
+ """
+ verify the vxlangpe and nsh protocol in the payload data.
+
+ :param payload_data: the payload data in the packet.
+ :param test_type: the functional test type.
+ :type payload_data: str
+ :type test_type: str
+ :returns: none
+ :raises RuntimeError: If the vxlangpe and nsh protocol
+ field verify fails.
+ """
+ # get the vxlan-gpe packet and check it
+ vxlangpe_pkt = VxLANGPE(payload_data[0:8])
+ if vxlangpe_pkt.flags != sfccon.VxLANGPE_FLAGS:
+ raise RuntimeError("Unexpected Vxlan-GPE flags: {0}".
+ format(vxlangpe_pkt.flags))
+
+ if vxlangpe_pkt.nextproto != sfccon.VxLANGPE_NEXT_PROTOCOL:
+ raise RuntimeError("next protocol not the NSH")
+
+ if vxlangpe_pkt.vni != sfccon.VxLANGPE_DEFAULT_VNI:
+ raise RuntimeError("Unexpected VNI flag: {0}".
+ format(vxlangpe_pkt.vni))
+
+ # get the NSH packet and check it
+ nsh_pkt = NSH(payload_data[8:32])
+ if nsh_pkt.flags != sfccon.NSH_FLAGS:
+ raise RuntimeError("Unexpected NSH flags: {0}".
+ format(nsh_pkt.flags))
+
+ if nsh_pkt.length != sfccon.NSH_HEADER_LENGTH:
+ raise RuntimeError("NSH length {0} incorrect".
+ format(nsh_pkt.length))
+
+ if nsh_pkt.MDtype != sfccon.NSH_DEFAULT_MDTYPE:
+ raise RuntimeError("NSH MD-Type {0} incorrect".
+ format(nsh_pkt.MDtype))
+
+ if nsh_pkt.nextproto != sfccon.NSH_NEXT_PROTOCOL:
+ raise RuntimeError("NSH next protocol {0} incorrect".
+ format(nsh_pkt.nextproto))
+
+ if test_type == "Proxy Outbound" or test_type == "SFF":
+ expect_nsi = sfccon.NSH_DEFAULT_NSI - 1
+ else:
+ expect_nsi = sfccon.NSH_DEFAULT_NSI
+
+ nsp_nsi = nsh_pkt.nsp_nsi
+ nsp = nsp_nsi >> 8
+ nsi = nsp_nsi & 0x000000FF
+ if nsp != sfccon.NSH_DEFAULT_NSP:
+ raise RuntimeError("NSH Service Path ID {0} incorrect".format(nsp))
+
+ if nsi != expect_nsi:
+ raise RuntimeError("NSH Service Index {0} incorrect".format(nsi))
+
+ nsh_c1 = nsh_pkt.c1
+ if nsh_c1 != sfccon.NSH_DEFAULT_C1:
+ raise RuntimeError("NSH c1 {0} incorrect".format(nsh_c1))
+
+ nsh_c2 = nsh_pkt.c2
+ if nsh_c2 != sfccon.NSH_DEFAULT_C2:
+ raise RuntimeError("NSH c2 {0} incorrect".format(nsh_c2))
+
+ nsh_c3 = nsh_pkt.c3
+ if nsh_c3 != sfccon.NSH_DEFAULT_C3:
+ raise RuntimeError("NSH c3 {0} incorrect".format(nsh_c3))
+
+ nsh_c4 = nsh_pkt.c4
+ if nsh_c4 != sfccon.NSH_DEFAULT_C4:
+ raise RuntimeError("NSH c4 {0} incorrect".format(nsh_c4))
+
+
+ @staticmethod
+ def check_the_nsh_sfc_packet(frame_size, test_type):
+ """
+ verify the NSH SFC functional test loopback packet field
+ is correct.
+
+ :param frame_size: the origin frame size.
+ :param test_type: the test type.
+ (Classifier, Proxy Inbound, Proxy Outbound, SFF).
+ :type frame_size: Integer
+ :type test_type: str
+ :returns: none
+ :raises RuntimeError: If the packet field verify fails.
+ """
+
+ rx_pcapfile = '{0}/nsh_sfc_tests/sfc_scripts/temp_packet.pcap' \
+ .format(con.REMOTE_FW_DIR)
+
+ logger.trace('read pcap file:{0}'.format(rx_pcapfile))
+
+ packets = rdpcap(rx_pcapfile)
+ if len(packets) < 1:
+ raise RuntimeError("No packet is received!")
+
+ ether = packets[0]
+
+ origin_size = int(frame_size)
+ if test_type == "Classifier":
+ expect_pkt_len = origin_size + 74 - 4
+ elif test_type == "Proxy Inbound":
+ expect_pkt_len = origin_size - 24 - 4
+ elif test_type == "Proxy Outbound":
+ expect_pkt_len = origin_size + 24 - 4
+ else:
+ expect_pkt_len = origin_size - 4
+
+ recv_pkt_len = len(ether)
+ if recv_pkt_len != expect_pkt_len:
+ raise RuntimeError("Received packet size {0} not " \
+ "the expect size {1}".format(recv_pkt_len, \
+ expect_pkt_len))
+
+ if not ether.haslayer(IP):
+ raise RuntimeError("Not a IPv4 packet")
+
+ pkt_proto = ether[IP].proto
+ if pkt_proto != sfccon.UDP_PROTOCOL:
+ raise RuntimeError("Not a UDP packet , {0}".format(pkt_proto))
+
+ if test_type == "Proxy Inbound":
+ expect_udp_port = sfccon.VxLAN_UDP_PORT
+ else:
+ expect_udp_port = sfccon.VxLANGPE_UDP_PORT
+
+ dst_port = ether[UDP].dport
+ if dst_port != expect_udp_port:
+ raise RuntimeError("UDP dest port must be {0}, {1}".
+ format(expect_udp_port, dst_port))
+
+ payload_data = ether[Raw].load
+
+ if test_type == "Proxy Inbound":
+ VerifyPacket.check_vxlan_protocol(payload_data)
+ else:
+ VerifyPacket.check_vxlangpe_nsh_protocol(payload_data, test_type)