summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/stl/services
diff options
context:
space:
mode:
authorAnton Kiselev <anton.kisel@gmail.com>2016-11-01 04:11:57 +0700
committerAnton Kiselev <anton.kisel@gmail.com>2016-11-04 19:54:44 +0700
commit5002eeed7b5557abdce3dade8287d837f2900590 (patch)
treedd2bca40f0b91885657eca9c141cce3990b0ea04 /scripts/automation/trex_control_plane/stl/services
parent15734de2eaa3dec4bbe5a54d8a061c28f26e23bc (diff)
scapy_service: support {vtype: BYTES, generate: ascii/bytes/template/template code }
Signed-off-by: Anton Kiselev <anton.kisel@gmail.com>
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/services')
-rwxr-xr-xscripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py48
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py26
-rw-r--r--scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py58
3 files changed, 130 insertions, 2 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
index 4ce9e4ae..16261b8d 100755
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
@@ -9,6 +9,7 @@ import tempfile
import hashlib
import base64
import numbers
+import random
import inspect
import json
from pprint import pprint
@@ -279,6 +280,48 @@ def get_sample_field_val(scapy_layer, fieldId):
except:
pass
+def generate_random_bytes(sz, seed, start, end):
+ # generate bytes of specified range with a fixed seed and size
+ rnd = random.Random(seed)
+ end = end + 1 # to include end value
+ res = [rnd.randrange(start, end) for _i in range(sz)]
+ if is_python(2):
+ return ''.join(chr(x) for x in res)
+ else:
+ return bytes(res)
+
+def generate_bytes_from_template(sz, template):
+ # generate bytes by repeating a template
+ res = str_to_bytes('') # new bytes array
+ if len(template) == 0:
+ return res
+ while len(res) < sz:
+ res = res + template
+ return res[:sz]
+
+def parse_template_code(template_code):
+ template_code = re.sub("0[xX]", '', template_code) # remove 0x
+ template_code = re.sub("[\s]", '', template_code) # remove spaces
+ return bytearray.fromhex(template_code)
+
+def generate_bytes(bytes_definition):
+ # accepts a bytes definition object
+ # {generate: random_bytes or random_ascii, seed: <seed_number>, size: <size_bytes>}
+ # {generate: template, template_base64: '<base64str>', size: <size_bytes>}
+ # {generate: template_code, template_text_code: '<template_code_str>', size: <size_bytes>}
+ gen_type = bytes_definition.get('generate')
+ bytes_size = bytes_definition.get('size')
+ seed = bytes_definition.get('seed') or 12345
+ if gen_type == 'random_bytes':
+ return generate_random_bytes(bytes_size, seed, 0, 0xFF)
+ elif gen_type == 'random_ascii':
+ return generate_random_bytes(bytes_size, seed, 0x20, 0x7E)
+ elif gen_type == 'template':
+ return generate_bytes_from_template(bytes_size, b64_to_bytes(bytes_definition["template_base64"]))
+ elif gen_type == 'template_code':
+ return generate_bytes_from_template(bytes_size, bytes_definition["template_code"])
+
+
class ScapyException(Exception): pass
class Scapy_service(Scapy_service_api):
@@ -381,7 +424,10 @@ class Scapy_service(Scapy_service_api):
if value_type == 'EXPRESSION':
return eval(val['expr'], {})
elif value_type == 'BYTES': # bytes payload(ex Raw.load)
- return b64_to_bytes(val['base64'])
+ if 'generate' in val:
+ return generate_bytes(val)
+ else:
+ return b64_to_bytes(val['base64'])
elif value_type == 'OBJECT':
return val['value']
else:
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
index 7126ef4b..b2bc8e03 100644
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
@@ -163,7 +163,7 @@ def test_layer_wrong_structure():
assert(real_structure == ["Ether", "IP", "Raw", None, None])
assert(valid_structure_flags == [True, True, True, False, False])
-def test_hand_crafted_definitions():
+def test_ether_definitions():
etherDef = get_definition_of("Ether")
assert(etherDef['name'] == "Ethernet II")
etherFields = etherDef['fields']
@@ -174,3 +174,27 @@ def test_hand_crafted_definitions():
assert(etherFields[2]['id'] == 'type')
assert(etherFields[2]['name'] == 'Type')
+def test_ether_definitions():
+ pdef = get_definition_of("ICMP")
+ assert(pdef['id'] == "ICMP")
+ assert(pdef['name'])
+ assert(pdef['fields'])
+
+def test_ip_definitions():
+ pdef = get_definition_of("IP")
+ fields = pdef['fields']
+ assert(fields[0]['id'] == 'version')
+
+ assert(fields[1]['id'] == 'ihl')
+ assert(fields[1]['auto'] == True)
+
+ assert(fields[3]['id'] == 'len')
+ assert(fields[3]['auto'] == True)
+
+ assert(fields[5]['id'] == 'flags')
+ assert(fields[5]['type'] == 'BITMASK')
+ assert(fields[5]['bits'][0]['name'] == 'Reserved')
+
+ assert(fields[9]['id'] == 'chksum')
+ assert(fields[9]['auto'] == True)
+
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
new file mode 100644
index 00000000..8ae186a2
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
@@ -0,0 +1,58 @@
+# run with 'nosetests' utility
+
+from basetest import *
+from scapy_service import *
+
+def test_generate_random_bytes():
+ res = generate_random_bytes(10, 333, ord('0'), ord('9'))
+ print(res)
+ assert(len(res) == 10)
+ assert(res == '5390532937') # random value with this seed
+
+def test_generate_bytes_from_template_empty():
+ res = generate_bytes_from_template(5, b"")
+ print(res)
+ assert(res == b"")
+
+def test_generate_bytes_from_template_less():
+ res = generate_bytes_from_template(5, b"qwe")
+ print(res)
+ assert(res == b"qweqw")
+
+def test_generate_bytes_from_template_same():
+ res = generate_bytes_from_template(5, b"qwert")
+ print(res)
+ assert(res == b"qwert")
+
+def test_generate_bytes_from_template_more():
+ res = generate_bytes_from_template(5, b"qwerty")
+ print(res)
+ assert(res == b"qwert")
+
+def test_parse_template_code_with_trash():
+ res = parse_template_code("0xDE AD\n be ef \t0xDEAD")
+ print(res)
+ assert(res == bytearray.fromhex('DEADBEEFDEAD'))
+
+def test_generate_bytes():
+ res = generate_bytes({"generate":"random_bytes", "seed": 123, "size": 12})
+ print(res)
+ assert(len(res) == 12)
+
+def test_generate_ascii_default_seed():
+ res = generate_bytes({"generate":"random_ascii", "size": 14})
+ print(res)
+ assert(len(res) == 14)
+
+
+def test_generate_template_code():
+ res = generate_bytes({"generate":"template_code", "template_code": "0xDEAD 0xBEEF", "size": 6})
+ print(res)
+ assert(res == bytearray.fromhex('DE AD BE EF DE AD'))
+
+def test_generate_template_code():
+ res = generate_bytes({"generate":"template", "template_base64": bytes_to_b64(b'hi'), "size": 5})
+ print(res)
+ assert(res == b'hihih')
+
+