From 4b49ef2f81941eb403fa7b39024365493a9afb63 Mon Sep 17 00:00:00 2001 From: Anton Kiselev Date: Tue, 18 Oct 2016 19:46:06 +0700 Subject: add unit tests for scapy service Signed-off-by: Anton Kiselev --- .../services/scapy_server/unit_tests/basetest.py | 70 ++++++++++++++ .../scapy_server/unit_tests/test_scapy_service.py | 105 +++++++++++++++++++++ 2 files changed, 175 insertions(+) create mode 100644 scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py create mode 100644 scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server') diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py new file mode 100644 index 00000000..79773f98 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py @@ -0,0 +1,70 @@ +import os +import sys +import json +import base64 +import inspect +from inspect import getcallargs +# add paths to scapy_service and trex_stl_lib.api +sys.path.append(os.path.abspath(os.pardir)) +sys.path.append(os.path.abspath(os.path.join(os.pardir, os.pardir, os.pardir))) + +from scapy_service import * +from scapy.all import * + +service = Scapy_service() +v_handler = service.get_version_handler('1','01') + +def pretty_json(obj): + return json.dumps(obj, indent=4) + +def pprint(obj): + print(pretty_json(obj)) + +def is_verbose(): + return True + +def pass_result(result, *args): + # returns result unchanged, but can display debug info if enabled + if is_verbose(): + fargs = (inspect.stack()[-1][4]) + print(fargs[0]) + pprint(result) + return result + +def pass_pkt(result): + # returns packet unchanged, but can display debug info if enabled + if is_verbose() and result is not None: + result.show2() + return result + +# utility functions for tests + +def layer_def(layerId, **layerfields): + # test helper method to generate JSON-like protocol definition object for scapy + # ex. { "id": "Ether", "fields": [ { "id": "dst", "value": "10:10:10:10:10:10" } ] } + res = { "id": layerId } + if layerfields: + res["fields"] = [ {"id": k, "value": v} for k,v in layerfields.items() ] + return res + +def get_version_handler(): + return pass_result(service.get_version_handler("1", "01")) + +def build_pkt(model_def): + return pass_result(service.build_pkt(v_handler, model_def)) + +def build_pkt_get_scapy(model_def): + return build_pkt_to_scapy(build_pkt(model_def)) + +def reconstruct_pkt(bytes_b64, model_def): + return pass_result(service.reconstruct_pkt(v_handler, bytes_b64, model_def)) + +def get_definitions(def_filter): + return pass_result(service.get_definitions(v_handler, def_filter)) + +def get_payload_classes(def_filter): + return pass_result(service.get_payload_classes(v_handler, def_filter)) + +def build_pkt_to_scapy(buildpkt_result): + return pass_pkt(Ether(b64_to_bytes(buildpkt_result['binary']))) + diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py new file mode 100644 index 00000000..5ea8cf07 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py @@ -0,0 +1,105 @@ +# +# run with 'nosetests' utility + +import tempfile +from basetest import * + +TEST_MAC_1 = "10:10:10:10:10:10" +# Test scapy structure +TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443) + +# Corresponding JSON-like structure +TEST_PKT_DEF = [ + layer_def("Ether", dst=TEST_MAC_1), + layer_def("IP", dst="127.0.0.1"), + layer_def("TCP", sport="443") + ] + +def test_build_pkt(): + pkt = build_pkt_get_scapy(TEST_PKT_DEF) + assert(pkt[TCP].sport == 443) + +def test_build_invalid_structure_pkt(): + ether_fields = {"dst": TEST_MAC_1, "type": "LOOP"} + pkt = build_pkt_get_scapy([ + layer_def("Ether", **ether_fields), + layer_def("IP"), + layer_def("TCP", sport=8080) + ]) + assert(pkt[Ether].dst == TEST_MAC_1) + assert(isinstance(pkt[Ether].payload, Raw)) + +def test_reconstruct_pkt(): + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), None) + pkt = build_pkt_to_scapy(res) + assert(pkt[TCP].sport == 443) + +def test_layer_del(): + modif = [ + {"id": "Ether"}, + {"id": "IP"}, + {"id": "TCP", "delete": True}, + ] + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif) + pkt = build_pkt_to_scapy(res) + assert(not pkt[IP].payload) + +def test_layer_field_edit(): + modif = [ + {"id": "Ether"}, + {"id": "IP"}, + {"id": "TCP", "fields": [{"id": "dport", "value": 777}]}, + ] + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif) + pkt = build_pkt_to_scapy(res) + assert(pkt[TCP].dport == 777) + assert(pkt[TCP].sport == 443) + +def test_layer_add(): + modif = [ + {"id": "Ether"}, + {"id": "IP"}, + {"id": "TCP"}, + {"id": "Raw", "fields": [{"id": "load", "value": "GET /helloworld HTTP/1.0\n\n"}]}, + ] + res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif) + pkt = build_pkt_to_scapy(res) + assert("GET /helloworld" in str(pkt[TCP].payload.load)) + +def test_build_Raw(): + pkt = build_pkt_get_scapy([ + layer_def("Ether"), + layer_def("IP"), + layer_def("TCP"), + layer_def("Raw", load={"vtype": "BYTES", "base64": bytes_to_b64(b"hi")}) + ]) + assert(str(pkt[Raw].load == "hi")) + +def test_get_all(): + service.get_all(v_handler) + +def test_get_definitions_all(): + get_definitions(None) + def_classnames = [pdef['id'] for pdef in get_definitions(None)['protocols']] + assert("IP" in def_classnames) + assert("Dot1Q" in def_classnames) + assert("TCP" in def_classnames) + +def test_get_definitions_ether(): + res = get_definitions(["Ether"]) + assert(len(res) == 1) + assert(res['protocols'][0]['id'] == "Ether") + +def test_get_payload_classes(): + eth_payloads = get_payload_classes([{"id":"Ether"}]) + assert("IP" in eth_payloads) + assert("Dot1Q" in eth_payloads) + assert("TCP" not in eth_payloads) + +def test_pcap_read_and_write(): + pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))] + pcap_b64 = service.write_pcap(v_handler, pkts_to_write) + array_pkt = service.read_pcap(v_handler, pcap_b64) + pkt = build_pkt_to_scapy(array_pkt[0]) + assert(pkt[Ether].dst == TEST_MAC_1) + -- cgit 1.2.3-korg