From a60f1f0ae35acfe2cb0d3fe680c05c7f3ac0c67b Mon Sep 17 00:00:00 2001 From: Anton Kiselev Date: Thu, 20 Oct 2016 14:49:44 +0700 Subject: preserve abstract model structure, calculate values and show structure changes --- .../stl/services/scapy_server/scapy_service.py | 29 ++++++++++--- .../services/scapy_server/unit_tests/basetest.py | 14 ++++++ .../scapy_server/unit_tests/test_scapy_service.py | 50 ++++++++++++++++++++++ 3 files changed, 87 insertions(+), 6 deletions(-) (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server') diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py index 2d023dcc..1ad6843c 100755 --- a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py @@ -434,19 +434,33 @@ class Scapy_service(Scapy_service_api): return { "vtype": "BYTES", "base64": bytes_to_b64(payload_bytes) } def _pkt_to_field_tree(self,pkt): - pkt = self._fully_define(pkt) + pkt.build() result = [] + pcap_struct = self._fully_define(pkt) # structure, which will appear in pcap binary while pkt: layer_id = type(pkt).__name__ # Scapy classname - layer_name = pkt.name # Display name + layer_full = self._fully_define(pkt) # current layer recreated from binary to get auto-calculated vals + real_layer_id = type(pcap_struct).__name__ if pcap_struct else None + valid_struct = True # shows if packet is mapped correctly to the binary representation + if not pcap_struct: + valid_struct = False + elif not issubclass(type(pkt), type(pcap_struct)) and not issubclass(type(pcap_struct), type(pkt)): + # structure mismatch. no need to go deeper in pcap_struct + valid_struct = False + pcap_struct = None fields = [] for field_desc in pkt.fields_desc: field_id = field_desc.name - ignored = field_id not in pkt.fields + ignored = field_id not in layer_full.fields offset = field_desc.offset protocol_offset = pkt.offset field_sz = field_desc.get_size_bytes() - fieldval = getattr(pkt, field_id) + # some values are unavailable in pkt(original model) + # at the same time, + fieldval = pkt.getfieldval(field_id) + pkt_fieldval_defined = is_string(fieldval) or is_number(fieldval) or is_bytes3(fieldval) + if not pkt_fieldval_defined: + fieldval = layer_full.getfieldval(field_id) value = None hvalue = None value_base64 = None @@ -487,7 +501,7 @@ class Scapy_service(Scapy_service_api): hvalue = '' if field_desc.name == 'load': # show Padding(and possible similar classes) as Raw - layer_id = layer_name ='Raw' + layer_id = 'Raw' field_sz = len(pkt) value = self._bytes_to_value(fieldval) field_data = { @@ -502,12 +516,15 @@ class Scapy_service(Scapy_service_api): fields.append(field_data) layer_data = { "id": layer_id, - "name": layer_name, "offset": pkt.offset, "fields": fields, + "real_id": real_layer_id, + "valid_structure": valid_struct, } result.append(layer_data) pkt = pkt.payload + if pcap_struct: + pcap_struct = pcap_struct.payload or None return result #input: container diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py index 79773f98..17dd304a 100644 --- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py @@ -68,3 +68,17 @@ def get_payload_classes(def_filter): def build_pkt_to_scapy(buildpkt_result): return pass_pkt(Ether(b64_to_bytes(buildpkt_result['binary']))) +def fields_to_map(field_array): + # [{id, value, hvalue, offset}, ...] to map id -> {value, hvalue, offset} + res = {} + if field_array: + for f in field_array: + res[ f["id"] ] = f + return res + +def adapt_json_protocol_fields(protocols_array): + # replaces layer.fields(array) with map for easier access in tests + for protocol in protocols_array: + # change structure for easier + if protocol.get("fields"): + protocol["fields"] = fields_to_map(protocol["fields"]) diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py index 5ea8cf07..9cd473d7 100644 --- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py @@ -2,8 +2,11 @@ # run with 'nosetests' utility import tempfile +import re from basetest import * +RE_MAC = "^([0-9A-Fa-f]{2}:){5}([0-9A-Fa-f]{2})$" + TEST_MAC_1 = "10:10:10:10:10:10" # Test scapy structure TEST_PKT = Ether(dst=TEST_MAC_1)/IP(src='127.0.0.1')/TCP(sport=443) @@ -103,3 +106,50 @@ def test_pcap_read_and_write(): pkt = build_pkt_to_scapy(array_pkt[0]) assert(pkt[Ether].dst == TEST_MAC_1) +def test_layer_default_value(): + res = build_pkt([ + layer_def("Ether", src={"vtype": "UNDEFINED"}) + ]) + ether_fields = fields_to_map(res['data'][0]['fields']) + assert(re.match(RE_MAC, ether_fields['src']['value'])) + +def test_layer_random_value(): + res = build_pkt([ + layer_def("Ether", src={"vtype": "RANDOM"}) + ]) + ether_fields = fields_to_map(res['data'][0]['fields']) + assert(re.match(RE_MAC, ether_fields['src']['value'])) + +def test_layer_wrong_structure(): + payload = [ + layer_def("Ether"), + layer_def("IP"), + layer_def("Raw", load="dummy"), + layer_def("Ether"), + layer_def("IP"), + ] + res = build_pkt(payload) + pkt = build_pkt_to_scapy(res) + assert(type(pkt[0]) is Ether) + assert(type(pkt[1]) is IP) + assert(isinstance(pkt[2], Raw)) + assert(not pkt[2].payload) + model = res["data"] + assert(len(payload) == len(model)) + # verify same protocol structure as in abstract model + # and all fields defined + for depth in range(len(payload)): + layer_model = model[depth] + layer_fields = fields_to_map(layer_model["fields"]) + assert(payload[depth]["id"] == model[depth]["id"]) + for field in layer_model["fields"]: + required_field_properties = ["value", "hvalue", "offset"] + for field_property in required_field_properties: + assert(field[field_property] is not None) + if (model[depth]["id"] == "Ether"): + assert(layer_fields["type"]["hvalue"] == "IPv4") + real_structure = [layer["real_id"] for layer in model] + valid_structure_flags = [layer["valid_structure"] for layer in model] + assert(real_structure == ["Ether", "IP", "Raw", None, None]) + assert(valid_structure_flags == [True, True, True, False, False]) + -- cgit 1.2.3-korg