summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services
diff options
context:
space:
mode:
authorAnton Kiselev <anton.kisel@gmail.com>2016-10-18 19:46:06 +0700
committerAnton Kiselev <anton.kisel@gmail.com>2016-10-18 19:46:06 +0700
commit4b49ef2f81941eb403fa7b39024365493a9afb63 (patch)
tree518c78028891d6bab063e3b412e7a9c8b684b58b /scripts/automation/trex_control_plane/stl/services
parentb1264cf858e129ab4e131122a0ec9e2d75ac179b (diff)
add unit tests for scapy service
Signed-off-by: Anton Kiselev <anton.kisel@gmail.com>
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services')
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py70
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py105
2 files changed, 175 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
new file mode 100644
index 00000000..79773f98
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
@@ -0,0 +1,70 @@
+import os
+import sys
+import json
+import base64
+import inspect
+from inspect import getcallargs
+# add paths to scapy_service and trex_stl_lib.api
+sys.path.append(os.path.abspath(os.pardir))
+sys.path.append(os.path.abspath(os.path.join(os.pardir, os.pardir, os.pardir)))
+
+from scapy_service import *
+from scapy.all import *
+
+service = Scapy_service()
+v_handler = service.get_version_handler('1','01')
+
+def pretty_json(obj):
+ return json.dumps(obj, indent=4)
+
+def pprint(obj):
+ print(pretty_json(obj))
+
+def is_verbose():
+ return True
+
+def pass_result(result, *args):
+ # returns result unchanged, but can display debug info if enabled
+ if is_verbose():
+ fargs = (inspect.stack()[-1][4])
+ print(fargs[0])
+ pprint(result)
+ return result
+
+def pass_pkt(result):
+ # returns packet unchanged, but can display debug info if enabled
+ if is_verbose() and result is not None:
+ result.show2()
+ return result
+
+# utility functions for tests
+
+def layer_def(layerId, **layerfields):
+ # test helper method to generate JSON-like protocol definition object for scapy
+ # ex. { "id": "Ether", "fields": [ { "id": "dst", "value": "10:10:10:10:10:10" } ] }
+ res = { "id": layerId }
+ if layerfields:
+ res["fields"] = [ {"id": k, "value": v} for k,v in layerfields.items() ]
+ return res
+
+def get_version_handler():
+ return pass_result(service.get_version_handler("1", "01"))
+
+def build_pkt(model_def):
+ return pass_result(service.build_pkt(v_handler, model_def))
+
+def build_pkt_get_scapy(model_def):
+ return build_pkt_to_scapy(build_pkt(model_def))
+
+def reconstruct_pkt(bytes_b64, model_def):
+ return pass_result(service.reconstruct_pkt(v_handler, bytes_b64, model_def))
+
+def get_definitions(def_filter):
+ return pass_result(service.get_definitions(v_handler, def_filter))
+
+def get_payload_classes(def_filter):
+ return pass_result(service.get_payload_classes(v_handler, def_filter))
+
+def build_pkt_to_scapy(buildpkt_result):
+ return pass_pkt(Ether(b64_to_bytes(buildpkt_result['binary'])))
+
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
new file mode 100644
index 00000000..5ea8cf07
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
@@ -0,0 +1,105 @@
+#
+# run with 'nosetests' utility
+
+import tempfile
+from basetest import *
+
+TEST_MAC_1 = "10:10:10:10:10:10"
+# Test scapy structure
+TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443)
+
+# Corresponding JSON-like structure
+TEST_PKT_DEF = [
+ layer_def("Ether", dst=TEST_MAC_1),
+ layer_def("IP", dst="127.0.0.1"),
+ layer_def("TCP", sport="443")
+ ]
+
+def test_build_pkt():
+ pkt = build_pkt_get_scapy(TEST_PKT_DEF)
+ assert(pkt[TCP].sport == 443)
+
+def test_build_invalid_structure_pkt():
+ ether_fields = {"dst": TEST_MAC_1, "type": "LOOP"}
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether", **ether_fields),
+ layer_def("IP"),
+ layer_def("TCP", sport=8080)
+ ])
+ assert(pkt[Ether].dst == TEST_MAC_1)
+ assert(isinstance(pkt[Ether].payload, Raw))
+
+def test_reconstruct_pkt():
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), None)
+ pkt = build_pkt_to_scapy(res)
+ assert(pkt[TCP].sport == 443)
+
+def test_layer_del():
+ modif = [
+ {"id": "Ether"},
+ {"id": "IP"},
+ {"id": "TCP", "delete": True},
+ ]
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+ pkt = build_pkt_to_scapy(res)
+ assert(not pkt[IP].payload)
+
+def test_layer_field_edit():
+ modif = [
+ {"id": "Ether"},
+ {"id": "IP"},
+ {"id": "TCP", "fields": [{"id": "dport", "value": 777}]},
+ ]
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+ pkt = build_pkt_to_scapy(res)
+ assert(pkt[TCP].dport == 777)
+ assert(pkt[TCP].sport == 443)
+
+def test_layer_add():
+ modif = [
+ {"id": "Ether"},
+ {"id": "IP"},
+ {"id": "TCP"},
+ {"id": "Raw", "fields": [{"id": "load", "value": "GET /helloworld HTTP/1.0\n\n"}]},
+ ]
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+ pkt = build_pkt_to_scapy(res)
+ assert("GET /helloworld" in str(pkt[TCP].payload.load))
+
+def test_build_Raw():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={"vtype": "BYTES", "base64": bytes_to_b64(b"hi")})
+ ])
+ assert(str(pkt[Raw].load == "hi"))
+
+def test_get_all():
+ service.get_all(v_handler)
+
+def test_get_definitions_all():
+ get_definitions(None)
+ def_classnames = [pdef['id'] for pdef in get_definitions(None)['protocols']]
+ assert("IP" in def_classnames)
+ assert("Dot1Q" in def_classnames)
+ assert("TCP" in def_classnames)
+
+def test_get_definitions_ether():
+ res = get_definitions(["Ether"])
+ assert(len(res) == 1)
+ assert(res['protocols'][0]['id'] == "Ether")
+
+def test_get_payload_classes():
+ eth_payloads = get_payload_classes([{"id":"Ether"}])
+ assert("IP" in eth_payloads)
+ assert("Dot1Q" in eth_payloads)
+ assert("TCP" not in eth_payloads)
+
+def test_pcap_read_and_write():
+ pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))]
+ pcap_b64 = service.write_pcap(v_handler, pkts_to_write)
+ array_pkt = service.read_pcap(v_handler, pcap_b64)
+ pkt = build_pkt_to_scapy(array_pkt[0])
+ assert(pkt[Ether].dst == TEST_MAC_1)
+