# Copyright (c) 2017 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import socket import multiprocessing import argparse from time import time class Counter(object): """Counter used for stats collection.""" def __init__(self, start=0): """Initializer.""" self.lock = multiprocessing.Lock() self.value = start def increment(self, value=1): """Increment counter and return the new value.""" self.lock.acquire() val = self.value try: self.value += value finally: self.lock.release() return val class timer(object): """Timer used used during test execution.""" def __init__(self, verbose=False): self.verbose = verbose def __enter__(self): """Start the timer.""" self.start = time() return self def __exit__(self, *args): """Stop the timer and save current value.""" self.end = time() self.secs = self.end - self.start self.msecs = self.secs * 1000 # millisecs if self.verbose: print("elapsed time: {0} ms".format(self.msecs)) class ConfigBlaster(object): """Generates Netconf requests, receives replies and collects statistics.""" TIMEOUT = 10 # Hello message with capabilities list for Netconf sessions. hello = u""" urn:ietf:params:netconf:base:1.0 ]]>]]>""" # RPC to retrieve VPP version (minimal processing in VPP) request_template = u""" ]]>]]>""" class Stats(object): """Stores and further processes statistics collected by worker threads during their execution. """ def __init__(self): """Initializer.""" self.ok_rqst_rate = Counter(0) self.total_rqst_rate = Counter(0) self.ok_rqsts = Counter(0) self.total_rqsts = Counter(0) def process_stats(self, rqst_stats, elapsed_time): """Calculates the stats for request/reply throughput, and aggregates statistics across all threads. :param rqst_stats: Request statistics dictionary. :param elapsed_time: Elapsed time for the test. :type rqst_stats: dict :type elapsed_time: int :returns: Rates (requests/sec) for successfully finished requests and the total number of requests. :rtype: tuple """ ok_rqsts = rqst_stats["OK"] total_rqsts = sum(rqst_stats.values()) ok_rqst_rate = ok_rqsts / elapsed_time total_rqst_rate = total_rqsts / elapsed_time self.ok_rqsts.increment(ok_rqsts) self.total_rqsts.increment(total_rqsts) self.ok_rqst_rate.increment(ok_rqst_rate) self.total_rqst_rate.increment(total_rqst_rate) return ok_rqst_rate, total_rqst_rate @property def get_ok_rqst_rate(self): return self.ok_rqst_rate.value @property def get_total_rqst_rate(self): return self.total_rqst_rate.value @property def get_ok_rqsts(self): return self.ok_rqsts.value @property def get_total_rqsts(self): return self.total_rqsts.value def __init__(self, host, port, ncycles, nthreads, nrequests): """Initializer. :param host: Target IP address. :param port: Target port. :param ncycles: Number of test cycles. :param nthreads: Number of threads for packet generation. :param nrequests: Number of requests to send per thread. :type host: str :type port: int :type ncycles: int :type nthreads: int :type nrequests: int """ self.host = host self.port = port self.ncycles = ncycles self.nthreads = nthreads self.nrequests = nrequests self.stats = self.Stats() self.total_ok_rqsts = 0 self.print_lock = multiprocessing.Lock() self.cond = multiprocessing.Condition() self.threads_done = 0 self.recv_buf = 8192 def send_request(self, sock): """Send Netconf request and receive the reply. :param sock: Socket object to use for transfer. :type sock: socket object :returns: Response to request or error message. :rtype: str """ sock.send(self.request_template) try: return sock.recv(self.recv_buf) except socket.timeout: return "timeout" except socket.error: return "error" def send_requests(self, tid, stats): """Read entries from the Honeycomb operational data store. This function is executed by a worker thread. :param tid: Thread ID - used to id the Blaster thread when statistics for the thread are printed out. :param stats: Synchronized queue object for returning execution stats. :type tid: int :type stats: multiprocessing.Queue """ rqst_stats = {"OK": 0, "Error": 0, "Timeout": 0} sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) # Initiate connection sock.connect((self.host, self.port)) # Send hello message sock.send(self.hello) # Receive hello message sock.recv(self.recv_buf) # Determine length of expected responses self.recv_buf = len(self.send_request(sock)) with self.print_lock: print("\n Thread {0}:\n" " Sending {1} requests".format(tid, self.nrequests)) replies = [None]*self.nrequests with timer() as t: for x in range(self.nrequests): sts = self.send_request(sock) replies[x] = sts for reply in replies: if reply == "timeout": rqst_stats["Timeout"] += 1 elif "error" in reply: rqst_stats["Error"] += 1 else: rqst_stats["OK"] += 1 ok_rps, total_rps = self.stats.process_stats( rqst_stats, t.secs) with self.print_lock: print("\n Thread {0} results (READ): ".format(tid)) print(" Elapsed time: {0:.2f}s,".format(t.secs)) print(" Requests/s: {0:.2f} OK, {1:.2f} Total".format( ok_rps, total_rps)) print(" Stats ({Requests}, {entries}): "), print(rqst_stats) self.threads_done += 1 sock.close() stats.put({"stats": rqst_stats, "time": t.secs}) with self.cond: self.cond.notify_all() def run_cycle(self, function): """Runs a test cycle. Each test consists of test cycles, where worker threads are started in each test cycle. Each thread reads entries using Netconf RPCs. :param function: Function to be executed in each thread. :type function: function :return: None """ self.total_ok_rqsts = 0 stats_queue = multiprocessing.Queue() for c in range(self.ncycles): self.stats = self.Stats() with self.print_lock: print "\nCycle {0}:".format(c) threads = [] thread_stats = [] for i in range(self.nthreads): t = multiprocessing.Process(target=function, args=(i, stats_queue)) threads.append(t) t.start() # Wait for all threads to finish and measure the execution time with timer() as t: for _ in threads: thread_stats.append(stats_queue.get()) for thread in threads: thread.join() for item in thread_stats: self.stats.process_stats(item["stats"], item["time"]) with self.print_lock: print("\n*** Test summary:") print(" Elapsed time: {0:.2f}s".format(t.secs)) print( " Peak requests/s: {0:.2f} OK, {1:.2f} Total".format( self.stats.get_ok_rqst_rate, self.stats.get_total_rqst_rate)) print( " Avg. requests/s: {0:.2f} OK, {1:.2f} Total ({2:.2f} " "of peak total)".format( self.stats.get_ok_rqsts / t.secs, self.stats.get_total_rqsts / t.secs, (self.stats.get_total_rqsts / t.secs * 100) / self.stats.get_total_rqst_rate)) self.total_ok_rqsts += self.stats.get_ok_rqsts self.threads_done = 0 def add_blaster(self): """Run the test.""" self.run_cycle(self.send_requests) @property def get_ok_rqsts(self): return self.total_ok_rqsts def create_arguments_parser(): """Creates argument parser for test script. Shorthand to arg parser on library level in order to access and eventually enhance in ancestors. :returns: argument parser supporting arguments and parameters :rtype: argparse.ArgumentParser """ my_parser = argparse.ArgumentParser( description="entry reading performance test: Reads entries from " "the config tree, as specified by optional parameters.") my_parser.add_argument( "--host", default="127.0.0.1", help="Host where odl controller is running (default is 127.0.0.1).") my_parser.add_argument( "--port", default=7777, help="Port on which Honeycomb's Netconf is listening" " (default is 7777 for TCP)") my_parser.add_argument( "--cycles", type=int, default=1, help="Number of entry read cycles; default 1. worker threads " "are started in each cycle and the cycle ends when all threads " "finish. Another cycle is started when the previous cycle " "is finished.") my_parser.add_argument( "--threads", type=int, default=1, help="Number of request worker threads to start in each cycle; " "default=1. Each thread will read entries.") my_parser.add_argument( "--requests", type=int, default=10, help="Number of requests that will be made by each worker thread " "in each cycle; default 10") return my_parser if __name__ == "__main__": parser = create_arguments_parser() in_args = parser.parse_args() fct = ConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.requests) # Run through , where are started in each cycle and # are added from each thread fct.add_blaster() print " Successful reads: {0}\n".format(fct.get_ok_rqsts)