author     Hanoh Haim <hhaim@cisco.com>    2016-11-08 13:13:20 +0200
committer  Hanoh Haim <hhaim@cisco.com>    2016-11-08 13:13:20 +0200
commit     ae863c50263aa79391d4b81238c19a4b920ecc47 (patch)
tree       e10f0988382a9f5045d883f5f97f8a7cae456f05
parent     90a015cfcb51897de1836375487347d7b19dd416 (diff)
parent     879150e9c79961e07900dfce02c5b53385bc74cb (diff)
Merge branch 'scapy_service_gui_updates' of git://github.com/kisel/trex-core into kisel-scapy_service_gui_updates
-rw-r--r--  scripts/automation/trex_control_plane/stl/services/scapy_server/protocols.json                   | 194
-rwxr-xr-x  scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py                 | 144
-rw-r--r--  scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py            |   3
-rw-r--r--  scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py  |  96
-rw-r--r--  scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py          |  69
5 files changed, 483 insertions, 23 deletions
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/protocols.json b/scripts/automation/trex_control_plane/stl/services/scapy_server/protocols.json
new file mode 100644
index 00000000..f685c06f
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/protocols.json
@@ -0,0 +1,194 @@
+[
+ {
+ "id": "Ether",
+ "name": "Ethernet II",
+ "fields": [
+ {
+ "id": "dst",
+ "name": "Destination",
+ "type": "MAC_ADDRESS",
+ "regex": "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
+ },
+ {
+ "id": "src",
+ "name": "Source",
+ "type": "MAC_ADDRESS",
+ "regex": "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"
+ },
+ {
+ "id": "type",
+ "name": "Type"
+ }
+ ],
+ "payload": ["IP", "IPv6", "Dot1Q", "Raw"]
+ },
+ {
+ "id": "IP",
+ "name": "IPv4",
+ "fields": [
+ {
+ "id": "version",
+ "name": "Version"
+ },
+ {
+ "id": "ihl",
+ "name": "IHL",
+ "type": "NUMBER",
+ "auto": true
+ },
+ {
+ "id": "tos",
+ "name": "TOS",
+ "type": "NUMBER"
+ },
+ {
+ "id": "len",
+ "name": "Total Length",
+ "type": "NUMBER",
+ "auto": true
+ },
+ {
+ "id": "id",
+ "name": "Identification",
+ "type": "NUMBER"
+ },
+ {
+ "id": "flags",
+ "name": "Flags",
+ "type": "BITMASK",
+ "bits": [
+ {"name": "Reserved", "mask": 4, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 4}]},
+ {"name": "Fragment", "mask": 2, "values":[{"name":"May fragment (0)", "value": 0}, {"name":"Don't fragment (1)", "value": 2}]},
+ {"name": "More Fragments(MF)", "mask": 1, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 1}]}
+ ]
+ },
+ {
+ "id": "frag",
+ "name": "Fragment offset",
+ "type": "NUMBER"
+ },
+ {
+ "id": "ttl",
+ "name": "TTL",
+ "type": "NUMBER",
+ "min": 1,
+ "max": 255
+
+ },
+ {
+ "id": "proto",
+ "name": "Protocol"
+ },
+ {
+ "id": "chksum",
+ "name": "Checksum",
+ "type": "STRING",
+ "auto": true
+ },
+ {
+ "id": "src",
+ "name": "Source address",
+ "type": "IP_ADDRESS"
+ },
+ {
+ "id": "dst",
+ "name": "Destination address",
+ "type": "IP_ADDRESS"
+ },
+ {
+ "id": "options",
+ "name": "Options",
+ "type": "IP_OPTIONS"
+ }
+ ],
+ "payload": ["TCP", "UDP", "ICMP", "Raw"]
+ },
+ {
+ "id": "TCP",
+ "name": "TCP",
+ "fields": [
+ {
+ "id": "sport",
+ "name": "Source port",
+ "type": "NUMBER",
+ "min": 0,
+ "max": 65535
+
+ },
+ {
+ "id": "dport",
+ "name": "Destination port",
+ "type": "NUMBER",
+ "min": 0,
+ "max": 65535
+ },
+ {
+ "id": "seq",
+ "name": "Sequence number",
+ "type": "NUMBER"
+ },
+ {
+ "id": "ack",
+ "name": "Acknowledgment number",
+ "type": "NUMBER"
+ },
+ {
+ "id": "dataofs",
+ "name": "Data offset",
+ "type": "NUMBER"
+ },
+ {
+ "id": "reserved",
+ "name": "Reserved",
+ "type": "NUMBER"
+ },
+ {
+ "id": "flags",
+ "name": "Flags",
+ "auto": false,
+ "type": "BITMASK",
+ "bits": [
+ {"name": "URG", "mask": 32, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 32}]},
+ {"name": "ACK", "mask": 16, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 16}]},
+ {"name": "PSH", "mask": 8, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 8}]},
+ {"name": "RST", "mask": 4, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 4}]},
+ {"name": "SYN", "mask": 2, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 2}]},
+ {"name": "FIN", "mask": 1, "values":[{"name":"Not Set", "value": 0}, {"name":"Set", "value": 1}]}
+ ]
+ },
+ {
+ "id": "window",
+ "name": "Window size",
+ "type": "NUMBER"
+ },
+ {
+ "id": "chksum",
+ "name": "Checksum",
+ "auto": true,
+ "type": "NUMBER"
+ },
+ {
+ "id": "urgptr",
+ "name": "Urgent pointer",
+ "type": "NUMBER"
+ },
+ {
+ "id": "options",
+ "name": "Options",
+ "type": "TCP_OPTIONS"
+ }
+ ]
+ },
+ {
+ "id": "Raw",
+ "name": "Raw",
+ "fields": [
+ {
+ "id": "load",
+ "name": "Payload",
+ "type": "BYTES"
+ }
+ ]
+ }
+]
+
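The definitions above are merged into scapy's auto-generated field metadata by scapy_service.py (see _get_fields_definition and get_payload_classes further down in this diff). A minimal sketch of how a client could turn a BITMASK entry into a numeric field value, assuming the consumer simply ORs the chosen per-bit values; compose_bitmask is a hypothetical helper and not part of this commit:

import json

def compose_bitmask(field_def, selections):
    # selections maps a bit-group name to the chosen value name
    value = 0
    for bit in field_def["bits"]:
        chosen = selections.get(bit["name"], "Not Set")
        for option in bit["values"]:
            if option["name"] == chosen:
                value |= option["value"]
    return value

with open("protocols.json") as f:
    protocols = {proto["id"]: proto for proto in json.load(f)}

ip_flags = next(fld for fld in protocols["IP"]["fields"] if fld["id"] == "flags")
# "Don't fragment (1)" carries value 2, so the composed IP.flags value is 2
print(compose_bitmask(ip_flags, {"Fragment": "Don't fragment (1)"}))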
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
index 91257596..88514aa8 100755
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/scapy_service.py
@@ -9,6 +9,7 @@ import tempfile
import hashlib
import base64
import numbers
+import random
import inspect
import json
from pprint import pprint
@@ -279,6 +280,64 @@ def get_sample_field_val(scapy_layer, fieldId):
except:
pass
+def generate_random_bytes(sz, seed, start, end):
+ # generate bytes of specified range with a fixed seed and size
+ rnd = random.Random()
+ n = end - start + 1
+ if is_python(2):
+ rnd = random.Random(seed)
+ res = [start + int(rnd.random()*n) for _i in range(sz)]
+ return ''.join(chr(x) for x in res)
+ else:
+ rnd = random.Random()
+ # use version=1 seeding to generate the same random sequence as Python 2.x
+ rnd.seed(seed, version=1)
+ res = [start + int(rnd.random()*n) for _i in range(sz)]
+ return bytes(res)
+
+def generate_bytes_from_template(sz, template):
+ # generate bytes by repeating a template
+ res = str_to_bytes('') # new bytes array
+ if len(template) == 0:
+ return res
+ while len(res) < sz:
+ res = res + template
+ return res[:sz]
+
+def parse_template_code(template_code):
+ template_code = re.sub("0[xX]", '', template_code) # remove 0x
+ template_code = re.sub("[\s]", '', template_code) # remove spaces
+ return bytearray.fromhex(template_code)
+
+def verify_payload_size(size):
+ assert(size != None)
+ if (size > (1<<20)): # 1Mb ought to be enough for anybody
+ raise ValueError('size is too large')
+
+def generate_bytes(bytes_definition):
+ # accepts a bytes definition object
+ # {generate: random_bytes or random_ascii, seed: <seed_number>, size: <size_bytes>}
+ # {generate: template, template_base64: '<base64str>', size: <size_bytes>}
+ # {generate: template_code, template_code: '<template_code_str>', size: <size_bytes>}
+ gen_type = bytes_definition.get('generate')
+ if gen_type == None:
+ return b64_to_bytes(bytes_definition['base64'])
+ elif gen_type == 'template_code':
+ code = parse_template_code(bytes_definition["template_code"])
+ bytes_size = int(bytes_definition.get('size') or len(code))
+ verify_payload_size(bytes_size)
+ return generate_bytes_from_template(bytes_size, code)
+ else:
+ bytes_size = int(bytes_definition['size']) # required
+ seed = int(bytes_definition.get('seed') or 12345) # optional
+ verify_payload_size(bytes_size)
+ if gen_type == 'random_bytes':
+ return generate_random_bytes(bytes_size, seed, 0, 0xFF)
+ elif gen_type == 'random_ascii':
+ return generate_random_bytes(bytes_size, seed, 0x20, 0x7E)
+ elif gen_type == 'template':
+ return generate_bytes_from_template(bytes_size, b64_to_bytes(bytes_definition["template_base64"]))
+
class ScapyException(Exception): pass
class Scapy_service(Scapy_service_api):
@@ -312,7 +371,16 @@ class Scapy_service(Scapy_service_api):
self.version_major = '1'
self.version_minor = '01'
self.server_v_hashed = self._generate_version_hash(self.version_major,self.version_minor)
-
+ self.protocol_definitions = {} # protocolId -> protocol definition override data
+ self._load_definitions_from_json()
+
+ def _load_definitions_from_json(self):
+ # load protocol definitions from a json file
+ self.protocol_definitions = {}
+ with open('protocols.json', 'r') as f:
+ protocols = json.load(f)
+ for protocol in protocols:
+ self.protocol_definitions[ protocol['id'] ] = protocol
def _all_protocol_structs(self):
old_stdout = sys.stdout
@@ -370,9 +438,9 @@ class Scapy_service(Scapy_service_api):
if type(val) == type({}):
value_type = val['vtype']
if value_type == 'EXPRESSION':
- return eval(val['expr'], {})
+ return eval(val['expr'], scapy.all.__dict__)
elif value_type == 'BYTES': # bytes payload(ex Raw.load)
- return b64_to_bytes(val['base64'])
+ return generate_bytes(val)
elif value_type == 'OBJECT':
return val['value']
else:
@@ -382,7 +450,7 @@ class Scapy_service(Scapy_service_api):
else:
return val
- def _field_value_from_def(self, layer, fieldId, val):
+ def _field_value_from_def(self, scapy_pkt, layer, fieldId, val):
field_desc = layer.get_field(fieldId)
sample_val = get_sample_field_val(layer, fieldId)
# extensions for field values
@@ -394,6 +462,16 @@ class Scapy_service(Scapy_service_api):
return field_desc.randval()
elif value_type == 'MACHINE': # internal machine field repr
return field_desc.m2i(layer, b64_to_bytes(val['base64']))
+ elif value_type == 'BYTES':
+ if 'total_size' in val: # custom case for total pkt size
+ gen = {}
+ gen.update(val)
+ total_sz = gen['total_size']
+ del gen['total_size']
+ gen['size'] = total_sz - len(scapy_pkt)
+ return generate_bytes(gen)
+ else:
+ return generate_bytes(val)
if is_number(sample_val) and is_string(val):
# human-value. guess the type and convert to internal value
# seems setfieldval already does this for some fields,
@@ -583,22 +661,24 @@ class Scapy_service(Scapy_service_api):
def _verify_version_handler(self,client_v_handler):
return (self.server_v_hashed == client_v_handler)
- def _parse_packet_dict(self,layer,scapy_layers,scapy_layer_names):
- class_name = scapy_layer_names.index(layer['id'])
- class_p = scapy_layers[class_name] # class pointer
+ def _parse_packet_dict(self, layer, layer_classes, base_layer):
+ class_p = layer_classes[layer['id']] # layer id -> class
scapy_layer = class_p()
if isinstance(scapy_layer, Raw):
scapy_layer.load = str_to_bytes("dummy")
+ if base_layer == None:
+ base_layer = scapy_layer
if 'fields' in layer:
- self._modify_layer(scapy_layer, layer['fields'])
+ self._modify_layer(base_layer, scapy_layer, layer['fields'])
return scapy_layer
def _packet_model_to_scapy_packet(self,data):
- layers = Packet.__subclasses__()
- layer_names = [ layer.__name__ for layer in layers]
- base_layer = self._parse_packet_dict(data[0],layers,layer_names)
+ layer_classes = {}
+ for layer_class in Packet.__subclasses__():
+ layer_classes[layer_class.__name__] = layer_class
+ base_layer = self._parse_packet_dict(data[0], layer_classes, None)
for i in range(1,len(data),1):
- packet_layer = self._parse_packet_dict(data[i],layers,layer_names)
+ packet_layer = self._parse_packet_dict(data[i], layer_classes, base_layer)
base_layer = base_layer/packet_layer
return base_layer
@@ -654,10 +734,9 @@ class Scapy_service(Scapy_service_api):
return pkt_class()
- def _get_payload_classes(self, pkt):
+ def _get_payload_classes(self, pkt_class):
# tries to find, which subclasses allowed.
# this can take long time, since it tries to build packets with all subclasses(O(N))
- pkt_class = type(pkt)
allowed_subclasses = []
for pkt_subclass in conf.layers:
if self._is_packet_class(pkt_subclass):
@@ -671,16 +750,29 @@ class Scapy_service(Scapy_service_api):
pass
return allowed_subclasses
- def _get_fields_definition(self, pkt_class):
+ def _get_fields_definition(self, pkt_class, fieldsDef):
+ # fieldsDef - array of field definitions (or an empty array)
fields = []
for field_desc in pkt_class.fields_desc:
+ fieldId = field_desc.name
field_data = {
- "id": field_desc.name,
+ "id": fieldId,
"name": field_desc.name
}
+ for fieldDef in fieldsDef:
+ if fieldDef['id'] == fieldId:
+ field_data.update(fieldDef)
if isinstance(field_desc, EnumField):
try:
field_data["values_dict"] = field_desc.s2i
+ if field_data.get("type") == None:
+ if len(field_data["values_dict"]) > 0:
+ field_data["type"] = "ENUM"
+ elif fieldId == 'load':
+ field_data["type"] = "BYTES"
+ else:
+ field_data["type"] = "STRING"
+ field_data["values_dict"] = field_desc.s2i
except:
# MultiEnumField doesn't have s2i. need better handling
pass
@@ -696,17 +788,23 @@ class Scapy_service(Scapy_service_api):
for pkt_class in all_classes:
if self._is_packet_class(pkt_class):
# enumerate all non-abstract Packet classes
+ protocolId = pkt_class.__name__
+ protoDef = self.protocol_definitions.get(protocolId) or {}
protocols.append({
- "id": pkt_class.__name__,
- "name": pkt_class.name,
- "fields": self._get_fields_definition(pkt_class)
+ "id": protocolId,
+ "name": protoDef.get('name') or pkt_class.name,
+ "fields": self._get_fields_definition(pkt_class, protoDef.get('fields') or [])
})
res = {"protocols": protocols}
return res
def get_payload_classes(self,client_v_handler, pkt_model_descriptor):
pkt = self._packet_model_to_scapy_packet(pkt_model_descriptor)
- return [c.__name__ for c in self._get_payload_classes(pkt)]
+ pkt_class = type(pkt.lastlayer())
+ protocolDef = self.protocol_definitions.get(pkt_class.__name__)
+ if protocolDef and protocolDef.get('payload'):
+ return protocolDef['payload']
+ return [c.__name__ for c in self._get_payload_classes(pkt_class)]
#input in string encoded base64
def check_update_of_dbs(self,client_v_handler,db_md5,field_md5):
@@ -725,10 +823,10 @@ class Scapy_service(Scapy_service_api):
else:
raise ScapyException("Fields DB is not up to date")
- def _modify_layer(self, scapy_layer, fields):
+ def _modify_layer(self, scapy_pkt, scapy_layer, fields):
for field in fields:
fieldId = str(field['id'])
- fieldval = self._field_value_from_def(scapy_layer, fieldId, field['value'])
+ fieldval = self._field_value_from_def(scapy_pkt, scapy_layer, fieldId, field['value'])
if fieldval is not None:
scapy_layer.setfieldval(fieldId, fieldval)
else:
@@ -767,7 +865,7 @@ class Scapy_service(Scapy_service_api):
# TODO: support replacing payload, instead of breaking
raise ScapyException("Protocol id inconsistent")
if 'fields' in model_layer:
- self._modify_layer(scapy_layer, model_layer['fields'])
+ self._modify_layer(scapy_pkt, scapy_layer, model_layer['fields'])
return self._pkt_data(scapy_pkt)
def read_pcap(self,client_v_handler,pcap_base64):
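For reference, the three generator forms accepted by the new generate_bytes() helper, as exercised by the unit tests added below; the sizes and seeds here are illustrative and the imports mirror test_utils.py:

from scapy_service import *   # provides generate_bytes, bytes_to_b64, ...

generate_bytes({"generate": "random_bytes", "seed": 123, "size": 12})                         # 12 pseudo-random bytes
generate_bytes({"generate": "random_ascii", "size": 14})                                      # printable ASCII, default seed 12345
generate_bytes({"generate": "template", "template_base64": bytes_to_b64(b"hi"), "size": 5})   # b'hihih'
generate_bytes({"generate": "template_code", "template_code": "0xDEAD 0xBEEF", "size": 6})    # DE AD BE EF DE AD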
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
index 17dd304a..1db2c62b 100644
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/basetest.py
@@ -62,6 +62,9 @@ def reconstruct_pkt(bytes_b64, model_def):
def get_definitions(def_filter):
return pass_result(service.get_definitions(v_handler, def_filter))
+def get_definition_of(scapy_classname):
+ return pass_result(service.get_definitions(v_handler, [scapy_classname]))['protocols'][0]
+
def get_payload_classes(def_filter):
return pass_result(service.get_payload_classes(v_handler, def_filter))
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
index 9cd473d7..d1207ca5 100644
--- a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_scapy_service.py
@@ -78,6 +78,35 @@ def test_build_Raw():
])
assert(str(pkt[Raw].load == "hi"))
+def test_build_fixed_pkt_size_bytes_gen():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={
+ "vtype": "BYTES",
+ "generate": "template",
+ "total_size": 64,
+ "template_base64": bytes_to_b64(b"hi")
+ })
+ ])
+ print(len(pkt))
+ assert(len(pkt) == 64)
+
+def test_build_fixed_pkt_size_random_ascii_gen():
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP"),
+ layer_def("Raw", load={
+ "vtype": "BYTES",
+ "generate": "random_ascii",
+ "total_size": 256
+ })
+ ])
+ print(len(pkt))
+ assert(len(pkt) == 256)
+
def test_get_all():
service.get_all(v_handler)
@@ -98,6 +127,16 @@ def test_get_payload_classes():
assert("IP" in eth_payloads)
assert("Dot1Q" in eth_payloads)
assert("TCP" not in eth_payloads)
+ assert(eth_payloads[0] == "IP") # order (based on protocols.json)
+
+def test_get_tcp_payload_classes():
+ payloads = get_payload_classes([{"id":"TCP"}])
+ assert("Raw" in payloads)
+
+def test_get_dot1q_payload_classes():
+ payloads = get_payload_classes([{"id":"Dot1Q"}])
+ assert("Dot1Q" in payloads)
+ assert("IP" in payloads)
def test_pcap_read_and_write():
pkts_to_write = [bytes_to_b64(bytes(TEST_PKT))]
@@ -120,6 +159,28 @@ def test_layer_random_value():
ether_fields = fields_to_map(res['data'][0]['fields'])
assert(re.match(RE_MAC, ether_fields['src']['value']))
+def test_IP_options():
+ options_expr = "[IPOption_SSRR(copy_flag=0, routers=['1.2.3.4', '5.6.7.8'])]"
+ res = build_pkt([
+ layer_def("Ether"),
+ layer_def("IP", options={"vtype": "EXPRESSION", "expr": options_expr}),
+ ])
+ pkt = build_pkt_to_scapy(res)
+ options = pkt[IP].options
+ assert(options[0].__class__.__name__ == 'IPOption_SSRR')
+ assert(options[0].copy_flag == 0)
+ assert(options[0].routers == ['1.2.3.4', '5.6.7.8'])
+
+def test_TCP_options():
+ options_expr = "[('MSS', 1460), ('NOP', None), ('NOP', None), ('SAckOK', b'')]"
+ pkt = build_pkt_get_scapy([
+ layer_def("Ether"),
+ layer_def("IP"),
+ layer_def("TCP", options={"vtype": "EXPRESSION", "expr": options_expr}),
+ ])
+ options = pkt[TCP].options
+ assert(options[0] == ('MSS', 1460) )
+
def test_layer_wrong_structure():
payload = [
layer_def("Ether"),
@@ -153,3 +214,38 @@ def test_layer_wrong_structure():
assert(real_structure == ["Ether", "IP", "Raw", None, None])
assert(valid_structure_flags == [True, True, True, False, False])
+def test_ether_definitions():
+ etherDef = get_definition_of("Ether")
+ assert(etherDef['name'] == "Ethernet II")
+ etherFields = etherDef['fields']
+ assert(etherFields[0]['id'] == 'dst')
+ assert(etherFields[0]['name'] == 'Destination')
+ assert(etherFields[1]['id'] == 'src')
+ assert(etherFields[1]['name'] == 'Source')
+ assert(etherFields[2]['id'] == 'type')
+ assert(etherFields[2]['name'] == 'Type')
+
+def test_icmp_definitions():
+ pdef = get_definition_of("ICMP")
+ assert(pdef['id'] == "ICMP")
+ assert(pdef['name'])
+ assert(pdef['fields'])
+
+def test_ip_definitions():
+ pdef = get_definition_of("IP")
+ fields = pdef['fields']
+ assert(fields[0]['id'] == 'version')
+
+ assert(fields[1]['id'] == 'ihl')
+ assert(fields[1]['auto'] == True)
+
+ assert(fields[3]['id'] == 'len')
+ assert(fields[3]['auto'] == True)
+
+ assert(fields[5]['id'] == 'flags')
+ assert(fields[5]['type'] == 'BITMASK')
+ assert(fields[5]['bits'][0]['name'] == 'Reserved')
+
+ assert(fields[9]['id'] == 'chksum')
+ assert(fields[9]['auto'] == True)
+
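A worked size check for the 64-byte 'total_size' test above: Ether (14) + IP (20) + TCP (20) take 54 bytes of headers, so the service pads the Raw load to 64 - 54 = 10 bytes by repeating the template. A small sketch using the helper added in this diff:

from scapy.all import Ether, IP, TCP
from scapy_service import generate_bytes_from_template

headers = Ether()/IP()/TCP()                     # 54 bytes of headers
load = generate_bytes_from_template(64 - len(headers), b"hi")
assert load == b"hihihihihi"
assert len(headers) + len(load) == 64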
diff --git a/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
new file mode 100644
index 00000000..ceb88b47
--- /dev/null
+++ b/scripts/automation/trex_control_plane/stl/services/scapy_server/unit_tests/test_utils.py
@@ -0,0 +1,69 @@
+# run with 'nosetests' utility
+
+from basetest import *
+from scapy_service import *
+
+def test_generate_random_bytes():
+ res = generate_random_bytes(10, 333, ord('0'), ord('9'))
+ print(res)
+ assert(len(res) == 10)
+ assert(res == b'5390532937') # random value with this seed
+
+def test_generate_bytes_from_template_empty():
+ res = generate_bytes_from_template(5, b"")
+ print(res)
+ assert(res == b"")
+
+def test_generate_bytes_from_template_neg():
+ res = generate_bytes_from_template(-5, b"qwe")
+ assert(res == b"")
+
+def test_generate_bytes_from_template_less():
+ res = generate_bytes_from_template(5, b"qwe")
+ print(res)
+ assert(res == b"qweqw")
+
+def test_generate_bytes_from_template_same():
+ res = generate_bytes_from_template(5, b"qwert")
+ print(res)
+ assert(res == b"qwert")
+
+def test_generate_bytes_from_template_more():
+ res = generate_bytes_from_template(5, b"qwerty")
+ print(res)
+ assert(res == b"qwert")
+
+def test_parse_template_code_with_trash():
+ res = parse_template_code("0xDE AD\n be ef \t0xDEAD")
+ print(res)
+ assert(res == bytearray.fromhex('DEADBEEFDEAD'))
+
+def test_generate_bytes():
+ res = generate_bytes({"generate":"random_bytes", "seed": 123, "size": 12})
+ print(res)
+ assert(len(res) == 12)
+
+def test_generate_ascii_default_seed():
+ res = generate_bytes({"generate":"random_ascii", "size": 14})
+ print(res)
+ assert(len(res) == 14)
+
+
+def test_generate_template_code_no_size():
+ res = generate_bytes({"generate":"template_code", "template_code": "BE EF"})
+ assert(res == bytearray.fromhex('BE EF'))
+
+def test_generate_template_code_less():
+ res = generate_bytes({"generate":"template_code", "template_code": "DE AD BE EF", "size": 2})
+ assert(res == bytearray.fromhex('DE AD'))
+
+def test_generate_template_code_more():
+ res = generate_bytes({"generate":"template_code", "template_code": "0xDEAD 0xBEEF", "size": 6})
+ assert(res == bytearray.fromhex('DE AD BE EF DE AD'))
+
+def test_generate_template_base64():
+ res = generate_bytes({"generate":"template", "template_base64": bytes_to_b64(b'hi'), "size": 5})
+ print(res)
+ assert(res == b'hihih')
+
+