From 5002eeed7b5557abdce3dade8287d837f2900590 Mon Sep 17 00:00:00 2001 From: Anton Kiselev Date: Tue, 1 Nov 2016 04:11:57 +0700 Subject: scapy_service: support {vtype: BYTES, generate: ascii/bytes/template/template code } Signed-off-by: Anton Kiselev --- .../stl/services/scapy_server/scapy_service.py | 48 +++++++++++++++++- .../scapy_server/unit_tests/test_scapy_service.py | 26 +++++++++- .../services/scapy_server/unit_tests/test_utils.py | 58 ++++++++++++++++++++++ 3 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py (limited to 'scripts/automation/trex_control_plane/stl/services/scapy_server') diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py index 4ce9e4ae..16261b8d 100755 --- a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py @@ -9,6 +9,7 @@ import tempfile import hashlib import base64 import numbers +import random import inspect import json from pprint import pprint @@ -279,6 +280,48 @@ def get_sample_field_val(scapy_layer, fieldId): except: pass +def generate_random_bytes(sz, seed, start, end): + # generate bytes of specified range with a fixed seed and size + rnd = random.Random(seed) + end = end + 1 # to include end value + res = [rnd.randrange(start, end) for _i in range(sz)] + if is_python(2): + return ''.join(chr(x) for x in res) + else: + return bytes(res) + +def generate_bytes_from_template(sz, template): + # generate bytes by repeating a template + res = str_to_bytes('') # new bytes array + if len(template) == 0: + return res + while len(res) < sz: + res = res + template + return res[:sz] + +def parse_template_code(template_code): + template_code = re.sub("0[xX]", '', template_code) # remove 0x + template_code = re.sub("[\s]", '', template_code) # remove spaces + return bytearray.fromhex(template_code) + +def generate_bytes(bytes_definition): + # accepts a bytes definition object + # {generate: random_bytes or random_ascii, seed: , size: } + # {generate: template, template_base64: '', size: } + # {generate: template_code, template_text_code: '', size: } + gen_type = bytes_definition.get('generate') + bytes_size = bytes_definition.get('size') + seed = bytes_definition.get('seed') or 12345 + if gen_type == 'random_bytes': + return generate_random_bytes(bytes_size, seed, 0, 0xFF) + elif gen_type == 'random_ascii': + return generate_random_bytes(bytes_size, seed, 0x20, 0x7E) + elif gen_type == 'template': + return generate_bytes_from_template(bytes_size, b64_to_bytes(bytes_definition["template_base64"])) + elif gen_type == 'template_code': + return generate_bytes_from_template(bytes_size, bytes_definition["template_code"]) + + class ScapyException(Exception): pass class Scapy_service(Scapy_service_api): @@ -381,7 +424,10 @@ class Scapy_service(Scapy_service_api): if value_type == 'EXPRESSION': return eval(val['expr'], {}) elif value_type == 'BYTES': # bytes payload(ex Raw.load) - return b64_to_bytes(val['base64']) + if 'generate' in val: + return generate_bytes(val) + else: + return b64_to_bytes(val['base64']) elif value_type == 'OBJECT': return val['value'] else: diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py index 7126ef4b..b2bc8e03 100644 --- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py @@ -163,7 +163,7 @@ def test_layer_wrong_structure(): assert(real_structure == ["Ether", "IP", "Raw", None, None]) assert(valid_structure_flags == [True, True, True, False, False]) -def test_hand_crafted_definitions(): +def test_ether_definitions(): etherDef = get_definition_of("Ether") assert(etherDef['name'] == "Ethernet II") etherFields = etherDef['fields'] @@ -174,3 +174,27 @@ def test_hand_crafted_definitions(): assert(etherFields[2]['id'] == 'type') assert(etherFields[2]['name'] == 'Type') +def test_ether_definitions(): + pdef = get_definition_of("ICMP") + assert(pdef['id'] == "ICMP") + assert(pdef['name']) + assert(pdef['fields']) + +def test_ip_definitions(): + pdef = get_definition_of("IP") + fields = pdef['fields'] + assert(fields[0]['id'] == 'version') + + assert(fields[1]['id'] == 'ihl') + assert(fields[1]['auto'] == True) + + assert(fields[3]['id'] == 'len') + assert(fields[3]['auto'] == True) + + assert(fields[5]['id'] == 'flags') + assert(fields[5]['type'] == 'BITMASK') + assert(fields[5]['bits'][0]['name'] == 'Reserved') + + assert(fields[9]['id'] == 'chksum') + assert(fields[9]['auto'] == True) + diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py new file mode 100644 index 00000000..8ae186a2 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py @@ -0,0 +1,58 @@ +# run with 'nosetests' utility + +from basetest import * +from scapy_service import * + +def test_generate_random_bytes(): + res = generate_random_bytes(10, 333, ord('0'), ord('9')) + print(res) + assert(len(res) == 10) + assert(res == '5390532937') # random value with this seed + +def test_generate_bytes_from_template_empty(): + res = generate_bytes_from_template(5, b"") + print(res) + assert(res == b"") + +def test_generate_bytes_from_template_less(): + res = generate_bytes_from_template(5, b"qwe") + print(res) + assert(res == b"qweqw") + +def test_generate_bytes_from_template_same(): + res = generate_bytes_from_template(5, b"qwert") + print(res) + assert(res == b"qwert") + +def test_generate_bytes_from_template_more(): + res = generate_bytes_from_template(5, b"qwerty") + print(res) + assert(res == b"qwert") + +def test_parse_template_code_with_trash(): + res = parse_template_code("0xDE AD\n be ef \t0xDEAD") + print(res) + assert(res == bytearray.fromhex('DEADBEEFDEAD')) + +def test_generate_bytes(): + res = generate_bytes({"generate":"random_bytes", "seed": 123, "size": 12}) + print(res) + assert(len(res) == 12) + +def test_generate_ascii_default_seed(): + res = generate_bytes({"generate":"random_ascii", "size": 14}) + print(res) + assert(len(res) == 14) + + +def test_generate_template_code(): + res = generate_bytes({"generate":"template_code", "template_code": "0xDEAD 0xBEEF", "size": 6}) + print(res) + assert(res == bytearray.fromhex('DE AD BE EF DE AD')) + +def test_generate_template_code(): + res = generate_bytes({"generate":"template", "template_base64": bytes_to_b64(b'hi'), "size": 5}) + print(res) + assert(res == b'hihih') + + -- cgit 1.2.3-korg