#!/usr/bin/env python3 # # Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # # Convert from VPP API trace to JSON. import argparse import struct import sys import logging import json from ipaddress import * from collections import namedtuple from vpp_papi import MACAddress, VPPApiJSONFiles import base64 import os import textwrap def serialize_likely_small_unsigned_integer(x): r = x # Low bit set means it fits into 1 byte. if r < (1 << 7): return struct.pack("B", 1 + 2 * r) # Low 2 bits 1 0 means it fits into 2 bytes. r -= (1 << 7) if r < (1 << 14): return struct.pack("<H", 4 * r + 2) r -= (1 << 14) if r < (1 << 29): return struct.pack("<I", 8 * r + 4) return struct.pack("<BQ", 0, x) def unserialize_likely_small_unsigned_integer(data, offset): y = struct.unpack_from("B", data, offset)[0] if y & 1: return y // 2, 1 r = 1 << 7 if y & 2: p = struct.unpack_from("B", data, offset + 1)[0] r += (y // 4) + (p << 6) return r, 2 r += 1 << 14 if y & 4: (p1, p2, p3) = struct.unpack_from("BBB", data, offset+1) r += ((y // 8) + (p1 << (5 + 8 * 0)) + (p2 << (5 + 8 * 1)) + (p3 << (5 + 8 * 2))) return r, 3 return struct.unpack_from(">Q", data, offset+1)[0], 8 def serialize_cstring(s): bstring = s.encode('utf8') l = len(bstring) b = serialize_likely_small_unsigned_integer(l) b += struct.pack('{}s'.format(l), bstring) return b def unserialize_cstring(data, offset): l, size = unserialize_likely_small_unsigned_integer(data, offset) name = struct.unpack_from('{}s'.format(l), data, offset+size)[0] return name.decode('utf8'), size + len(name) def unserialize_msgtbl(data, offset): msgtable_by_id = {} msgtable_by_name = {} i = 0 nmsg = struct.unpack_from(">I", data, offset)[0] o = 4 while i < nmsg: (msgid, size) = unserialize_likely_small_unsigned_integer( data, offset + o) o += size (name, size) = unserialize_cstring(data, offset + o) o += size msgtable_by_id[msgid] = name msgtable_by_name[name] = msgid i += 1 return msgtable_by_id, msgtable_by_name, o def serialize_msgtbl(messages): offset = 0 # XXX 100K? data = bytearray(100000) nmsg = len(messages) data = struct.pack(">I", nmsg) for k, v in messages.items(): name = k + '_' + v.crc[2:] data += serialize_likely_small_unsigned_integer(v._vl_msg_id) data += serialize_cstring(name) return data def apitrace2json(messages, filename): result = [] with open(filename, 'rb') as file: bytes_read = file.read() # Read header (nitems, msgtbl_size, wrapped) = struct.unpack_from(">IIB", bytes_read, 0) logging.debug('nitems: {} message table size: {} wrapped: {}' .format(nitems, msgtbl_size, wrapped)) if wrapped: sys.stdout.write('Wrapped/incomplete trace, results may vary') offset = 9 msgtbl_by_id, msgtbl_by_name, size = unserialize_msgtbl(bytes_read, offset) offset += size i = 0 while i < nitems: size = struct.unpack_from(">I", bytes_read, offset)[0] offset += 4 if size == 0: break msgid = struct.unpack_from(">H", bytes_read, offset)[0] name = msgtbl_by_id[msgid] n = name[:name.rfind("_")] msgobj = messages[n] if n + '_' + msgobj.crc[2:] != name: sys.exit("CRC Mismatch between JSON API definition " "and trace. {}".format(name)) x, s = msgobj.unpack(bytes_read[offset:offset+size]) msgname = type(x).__name__ offset += size # Replace named tuple illegal _0 y = x._asdict() y.pop('_0') result.append({'name': msgname, 'args': y}) i += 1 file.close() return result def json2apitrace(messages, filename): """Input JSON file and API message definition. Output API trace bytestring.""" msgs = [] with open(filename, 'r') as file: msgs = json.load(file, object_hook=vpp_decode) result = b'' for m in msgs: name = m['name'] msgobj = messages[name] m['args']['_vl_msg_id'] = messages[name]._vl_msg_id b = msgobj.pack(m['args']) result += struct.pack('>I', len(b)) result += b return len(msgs), result class VPPEncoder(json.JSONEncoder): def default(self, o): if type(o) is bytes: return "base64:" + base64.b64encode(o).decode('utf-8') # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, o) def encode(self, obj): def hint_tuples(item): if isinstance(item, tuple): return hint_tuples(item._asdict()) if isinstance(item, list): return [hint_tuples(e) for e in item] if isinstance(item, dict): return {key: hint_tuples(value) for key, value in item.items()} else: return item return super(VPPEncoder, self).encode(hint_tuples(obj)) def vpp_decode(obj): for k, v in obj.items(): if type(v) is str and v.startswith('base64:'): s = v.lstrip('base64:') obj[k] = base64.b64decode(v[7:]) return obj def vpp_encoder(obj): if isinstance(obj, IPv6Network): return str(obj) if isinstance(obj, IPv4Network): return str(obj) if isinstance(obj, IPv6Address): return str(obj) if isinstance(obj, IPv4Address): return str(obj) if isinstance(obj, MACAddress): return str(obj) if type(obj) is bytes: return "base64:" + base64.b64encode(obj).decode('ascii') raise TypeError('Unknown object {} {}\n'.format(type(obj), obj)) message_filter = { 'control_ping', 'memclnt_create', 'memclnt_delete', 'get_first_msg_id', } argument_filter = { 'client_index', 'context', } def topython(messages, services): import pprint pp = pprint.PrettyPrinter() s = '''\ #!/usr/bin/env python3 from vpp_papi import VPP, VppEnum vpp = VPP(use_socket=True) vpp.connect(name='vppapitrace') ''' for m in messages: if m['name'] not in services: s += '# ignoring reply message: {}\n'.format(m['name']) continue if m['name'] in message_filter: s += '# ignoring message {}\n'.format(m['name']) continue for k in argument_filter: try: m['args'].pop(k) except KeyError: pass a = pp.pformat(m['args']) s += 'rv = vpp.api.{}(**{})\n'.format(m['name'], a) s += 'print("RV:", rv)\n' s += 'vpp.disconnect()\n' return s def todump_items(k, v, level): klen = len(k) if k else 0 spaces = ' ' * level + ' ' * (klen + 3) wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=spaces, width=60) s = '' if type(v) is dict: if k: s += ' ' * level + '{}:\n'.format(k) for k2, v2 in v.items(): s += todump_items(k2, v2, level + 1) return s if type(v) is list: for v2 in v: s += '{}'.format(todump_items(k, v2, level)) return s if type(v) is bytes: w = wrapper.fill(bytes.hex(v)) s += ' ' * level + '{}: {}\n'.format(k, w) else: if type(v) is str: v = wrapper.fill(v) s += ' ' * level + '{}: {}\n'.format(k, v) return s def todump(messages, services): import pprint pp = pprint.PrettyPrinter() s = '' for m in messages: if m['name'] not in services: s += '# ignoring reply message: {}\n'.format(m['name']) continue #if m['name'] in message_filter: # s += '# ignoring message {}\n'.format(m['name']) # continue for k in argument_filter: try: m['args'].pop(k) except KeyError: pass a = pp.pformat(m['args']) s += '{}:\n'.format(m['name']) s += todump_items(None, m['args'], 0) return s def init_api(apidir): # Read API definitions apifiles = VPPApiJSONFiles.find_api_files(api_dir=apidir) messages = {} services = {} for file in apifiles: with open(file) as apidef_file: m, s = VPPApiJSONFiles.process_json_file(apidef_file) messages.update(m) services.update(s) return messages, services def replaymsgs(vpp, msgs): for m in msgs: name = m['name'] if name not in vpp.services: continue if name == 'control_ping': continue try: m['args'].pop('client_index') except KeyError: pass if m['args']['context'] == 0: m['args']['context'] = 1 f = vpp.get_function(name) rv = f(**m['args']) print('RV {}'.format(rv)) def replay(args): """Replay into running VPP instance""" from vpp_papi import VPP JSON = 1 APITRACE = 2 filename, file_extension = os.path.splitext(args.input) input_type = JSON if file_extension == '.json' else APITRACE vpp = VPP(use_socket=args.socket) rv = vpp.connect(name='vppapireplay', chroot_prefix=args.shmprefix) if rv != 0: sys.exit('Cannot connect to VPP') if input_type == JSON: with open(args.input, 'r') as file: msgs = json.load(file, object_hook=vpp_decode) else: msgs = apitrace2json(messages, args.input) replaymsgs(vpp, msgs) vpp.disconnect() def generate(args): """Generate JSON""" JSON = 1 APITRACE = 2 PYTHON = 3 DUMP = 4 filename, file_extension = os.path.splitext(args.input) input_type = JSON if file_extension == '.json' else APITRACE filename, file_extension = os.path.splitext(args.output) if args.todump: output_type = DUMP else: if file_extension == '.json' or filename == '-': output_type = JSON elif file_extension == '.py': output_type = PYTHON else: output_type = APITRACE if input_type == output_type: sys.exit("error: Nothing to convert between") if input_type != JSON and output_type == APITRACE: sys.exit("error: Input file must be JSON file: {}".format(args.input)) messages, services = init_api(args.apidir) if input_type == JSON and output_type == APITRACE: i = 0 for k, v in messages.items(): v._vl_msg_id = i i += 1 n, result = json2apitrace(messages, args.input) msgtbl = serialize_msgtbl(messages) print('API messages: {}'.format(n)) header = struct.pack(">IIB", n, len(msgtbl), 0) with open(args.output, 'wb') as outfile: outfile.write(header) outfile.write(msgtbl) outfile.write(result) return if input_type == APITRACE: result = apitrace2json(messages, args.input) if output_type == PYTHON: s = json.dumps(result, cls=VPPEncoder, default=vpp_encoder) x = json.loads(s, object_hook=vpp_decode) s = topython(x, services) elif output_type == DUMP: s = json.dumps(result, cls=VPPEncoder, default=vpp_encoder) x = json.loads(s, object_hook=vpp_decode) s = todump(x, services) else: s = json.dumps(result, cls=VPPEncoder, default=vpp_encoder, indent=4 * ' ') elif output_type == PYTHON: with open(args.input, 'r') as file: x = json.load(file, object_hook=vpp_decode) s = topython(x, services) else: sys.exit('Input file must be API trace file: {}'.format(args.input)) if args.output == '-': sys.stdout.write(s + '\n') else: print('Generating {} from API trace: {}' .format(args.output, args.input)) with open(args.output, 'w') as outfile: outfile.write(s) def general(args): return def main(): parser = argparse.ArgumentParser() parser.add_argument('--debug', action='store_true', help='enable debug mode') parser.add_argument('--apidir', help='Location of JSON API definitions') parser.set_defaults(func=general) subparsers = parser.add_subparsers(title='subcommands', description='valid subcommands', help='additional help') parser_convert = subparsers.add_parser('convert', help='Convert API trace to JSON or Python and back') parser_convert.add_argument('input', help='Input file (API trace | JSON)') parser_convert.add_argument('--todump', action='store_true', help='Output text format') parser_convert.add_argument('output', help='Output file (Python | JSON | API trace)') parser_convert.set_defaults(func=generate) parser_replay = subparsers.add_parser('replay', help='Replay messages to running VPP instance') parser_replay.add_argument('input', help='Input file (API trace | JSON)') parser_replay.add_argument('--socket', action='store_true', help='use default socket to connect to VPP') parser_replay.add_argument('--shmprefix', help='connect to VPP on shared memory prefix') parser_replay.set_defaults(func=replay) args = parser.parse_args() if args.debug: logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) args.func(args) main()