summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
diff options
context:
space:
mode:
authorAnton Kiselev <anton.kisel@gmail.com>2016-10-18 19:46:06 +0700
committerAnton Kiselev <anton.kisel@gmail.com>2016-10-18 19:46:06 +0700
commit4b49ef2f81941eb403fa7b39024365493a9afb63 (patch)
tree518c78028891d6bab063e3b412e7a9c8b684b58b /scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
parentb1264cf858e129ab4e131122a0ec9e2d75ac179b (diff)
add unit tests for scapy service
Signed-off-by: Anton Kiselev <anton.kisel@gmail.com>
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py')
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py105
1 files changed, 105 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
new file mode 100644
index 00000000..5ea8cf07
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
@@ -0,0 +1,105 @@
+#
+# run with 'nosetests' utility
+
+import tempfile
+from basetest import *
+
+TEST_MAC_1 = "10:10:10:10:10:10"
+# Test scapy structure
+TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443)
+
+# Corresponding JSON-like structure
+TEST_PKT_DEF = [
+ layer_def("Ether", dst=TEST_MAC_1),
+ layer_def("IP", dst="127.0.0.1"),
+ layer_def("TCP", sport="443")
+ ]
+
+def test_build_pkt():
+ pkt = build_pkt_get_scapy(TEST_PKT_DEF)
+ assert(pkt[TCP].sport == 443)
+
+def test_build_invalid_structure_pkt():
+ ether_fields = {"dst": TEST_MAC_1, "type": "LOOP"}
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether", **ether_fields),
+ layer_def("IP"),
+ layer_def("TCP", sport=8080)
+ ])
+ assert(pkt[Ether].dst == TEST_MAC_1)
+ assert(isinstance(pkt[Ether].payload, Raw))
+
+def test_reconstruct_pkt():
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), None)
+ pkt = build_pkt_to_scapy(res)
+ assert(pkt[TCP].sport == 443)
+
+def test_layer_del():
+ modif = [
+ {"id": "Ether"},
+ {"id": "IP"},
+ {"id": "TCP", "delete": True},
+ ]
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+ pkt = build_pkt_to_scapy(res)
+ assert(not pkt[IP].payload)
+
+def test_layer_field_edit():
+ modif = [
+ {"id": "Ether"},
+ {"id": "IP"},
+ {"id": "TCP", "fields": [{"id": "dport", "value": 777}]},
+ ]
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+ pkt = build_pkt_to_scapy(res)
+ assert(pkt[TCP].dport == 777)
+ assert(pkt[TCP].sport == 443)
+
+def test_layer_add():
+ modif = [
+ {"id": "Ether"},
+ {"id": "IP"},
+ {"id": "TCP"},
+ {"id": "Raw", "fields": [{"id": "load", "value": "GET /helloworld HTTP/1.0\n\n"}]},
+ ]
+ res = reconstruct_pkt(base64.b64encode(bytes(TEST_PKT)), modif)
+ pkt = build_pkt_to_scapy(res)
+ assert("GET /helloworld" in str(pkt[TCP].payload.load))
+
+def test_build_Raw():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={"vtype": "BYTES", "base64": bytes_to_b64(b"hi")})
+ ])
+ assert(str(pkt[Raw].load == "hi"))
+
+def test_get_all():
+ service.get_all(v_handler)
+
+def test_get_definitions_all():
+ get_definitions(None)
+ def_classnames = [pdef['id'] for pdef in get_definitions(None)['protocols']]
+ assert("IP" in def_classnames)
+ assert("Dot1Q" in def_classnames)
+ assert("TCP" in def_classnames)
+
+def test_get_definitions_ether():
+ res = get_definitions(["Ether"])
+ assert(len(res) == 1)
+ assert(res['protocols'][0]['id'] == "Ether")
+
+def test_get_payload_classes():
+ eth_payloads = get_payload_classes([{"id":"Ether"}])
+ assert("IP" in eth_payloads)
+ assert("Dot1Q" in eth_payloads)
+ assert("TCP" not in eth_payloads)
+
+def test_pcap_read_and_write():
+ pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))]
+ pcap_b64 = service.write_pcap(v_handler, pkts_to_write)
+ array_pkt = service.read_pcap(v_handler, pcap_b64)
+ pkt = build_pkt_to_scapy(array_pkt[0])
+ assert(pkt[Ether].dst == TEST_MAC_1)
+