From 27a56cad3679e4decbcca90acfb22c55a22153e0 Mon Sep 17 00:00:00 2001 From: Vratko Polak Date: Fri, 30 Nov 2018 14:49:10 +0100 Subject: PLRsearch: Initial implementation and suites Missing bits: - Add up-to-date .rst document (in child Change). - Prepare for releasing to PyPI.org (in child Change): -- Either copy dependencies from MLRsearch, or list in requirements. -- Perhaps move common dependencies to separate package for both search to depend on. -- All the other metadata stuff. Jira: CSIT-1276 Change-Id: I277efdb63dbb90b30e11f4e30a82e2130ac8efc3 Signed-off-by: Vratko Polak --- resources/libraries/python/MLRsearch/__init__.py | 2 +- resources/libraries/python/PLRsearch/Integrator.py | 378 +++++++++++++ resources/libraries/python/PLRsearch/PLRsearch.py | 596 +++++++++++++++++++++ resources/libraries/python/PLRsearch/__init__.py | 16 + resources/libraries/python/PLRsearch/log_plus.py | 89 +++ resources/libraries/python/TrafficGenerator.py | 44 ++ resources/libraries/python/autogen/Regenerator.py | 5 + 7 files changed, 1129 insertions(+), 1 deletion(-) create mode 100644 resources/libraries/python/PLRsearch/Integrator.py create mode 100644 resources/libraries/python/PLRsearch/PLRsearch.py create mode 100644 resources/libraries/python/PLRsearch/__init__.py create mode 100644 resources/libraries/python/PLRsearch/log_plus.py (limited to 'resources/libraries/python') diff --git a/resources/libraries/python/MLRsearch/__init__.py b/resources/libraries/python/MLRsearch/__init__.py index 4cc8158397..70c713eaa0 100644 --- a/resources/libraries/python/MLRsearch/__init__.py +++ b/resources/libraries/python/MLRsearch/__init__.py @@ -12,5 +12,5 @@ # limitations under the License. """ -__init__ file for Python package "MLRsearch" +__init__ file for Python package "MLRsearch". """ diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py new file mode 100644 index 0000000000..f8846fdf62 --- /dev/null +++ b/resources/libraries/python/PLRsearch/Integrator.py @@ -0,0 +1,378 @@ +# Copyright (c) 2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module for numerical integration, tightly coupled to PLRsearch algorithm. + +See log_plus for an explanation why None acts as a special case "float" number. + +TODO: Separate optimizations specific to PLRsearch + and distribute as a standalone package so other projects may reuse. +""" + +import math +import traceback + +import dill +import numpy + +# TODO: The preferred way to consume this code is via a pip package. +# If your project copies code instead, make sure your pylint check does not +# require these imports to be absolute and descending from your superpackage. +from log_plus import log_plus + + +def try_estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False): + """Call estimate_nd but catch any exception and send traceback.""" + try: + return estimate_nd(communication_pipe, scale_coeff, trace_enabled) + except BaseException: + # Any subclass could have caused estimate_nd to stop before sending, + # so we have to catch them all. + traceback_string = traceback.format_exc() + communication_pipe.send(traceback_string) + # After sendig, re-raise, so usages other than "one process per call" + # keep behaving correctly. + raise + + +# FIXME: Pylint reports multiple complexity violations. +# Refactor the code, using less (but structured) variables +# and function calls for (relatively) loosly coupled code blocks. +def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False): + """Use Bayesian inference from control queue, put result to result queue. + + TODO: Use a logging framework that works in a user friendly way. + (Note that multiprocessing_logging does not work well with robot + and robotbackgroundlogger only works for threads, not processes. + Or, wait for https://github.com/robotframework/robotframework/pull/2182 + Anyway, the current implementation with trace_enabled looks ugly.) + + The result is average and standard deviation for posterior distribution + of a single dependent (positive scalar) value. + The prior is assumed to be uniform on (-1, 1) for every parameter. + Number of parameters and the function for computing + the dependent value and likelihood both come from input. + + The likelihood is assumed to be extremely uneven (but never zero), + so the function should return the logarithm of the likelihood. + The integration method is basically a Monte Carlo + (TODO: Add links to notions used here.), + but importance sampling is used in order to focus + on the part of parameter space with (relatively) non-negligible likelihood. + + Multivariate Gauss distribution is used for focusing, + so only unimodal posterior distributions are handled correctly. + Initial samples are mostly used for shaping (and shifting) + the Gaussian distribution, later samples will probably dominate. + Thus, initially the algorithm behavior resembles more "find the maximum", + as opposed to "reliably integrate". As for later iterations of PLRsearch, + it is assumed that the distribution position does not change rapidly; + thus integration algorithm returns also the distribution data, + to be used as initial focus in next iteration. + + After some number of samples (depends on dimension), + the algorithm starts tracking few most likely samples to base the Gaussian + distribution around, mixing with estimates from observed samples. + The idea is that even when (usually) one of the samples dominates, + first few are treated as if equally likely, to get reasonable focus. + During the "find the maximum" phase, this set of best samples + frequently takes a wrong shape (compared to observed samples + in equilibrium). Therefore scale_coeff argument is left for humans to tweak, + so the convergence is reliable and quick. + Any data (other than correctly weighted samples) used to keep + distribution shape reasonable is called "bias", regardles of + whether it comes from input hint, or from tracking top samples. + + Until the distribution locates itself roughly around + the maximum likeligood point, the integration results are probably wrong. + That means some minimal time is needed for the result to become reliable. + The reported standard distribution attempts to signal inconsistence + (when one sample has dominating weight compared to the rest of samples), + but some human supervision is strongly encouraged. + + To facilitate running in worker processes, arguments and results + are communicated via pipe. The computation does not start + until arguments appear in the pipe, the computation stops + when another item (stop object) is detected in the pipe + (and result is put to pipe). + + TODO: Create classes for arguments and results, + so their fields are documented (and code perhaps more readable). + + Input/argument object (received from pipe) + is a 4-tuple of the following fields: + - dimension: Integer, number of parameters to consider. + - dilled_function: Function (serialized using dill), which: + - - Takes the dimension number of float parameters from (-1, 1). + - - Returns float 2-tuple of dependent value and parameter log-likelihood. + - param_hint_avg: Dimension-tuple of floats to start searching around. + - param_hint_cov: Covariance matrix defining initial focus shape. + + Output/result object (sent to pipe queue) + is a 6-tuple of the following fields: + - value_avg: Float estimate of posterior average dependent value. + - value_stdev: Float estimate of posterior standard deviation of the value. + - param_importance_avg: Float tuple, center of Gaussian to use next. + - param_importance_cov: Float covariance matrix of the Gaussian to use next. + - debug_list: List of debug strings to log at main process. + - trace_list: List of trace strings to pass to main process if enabled. + Trace strings are very verbose, it is not recommended to enable them. + In they are not enabled, trace_list will be empty. + It is recommended to edit some lines manually to debug_list if needed. + + :param communication_pipe: Pipe to comunicate with boss process. + :param scale_coeff: Float number to tweak convergence speed with. + :param trace_enabled: Whether trace list should be populated at all. + Default: False + :type communication_pipe: multiprocessing.Connection (or compatible) + :type scale_coeff: float + :type trace_enabled: boolean + :raises OverflowError: If one sample dominates the rest too much. + Or if value_logweight_function does not handle + some part of parameter space carefully enough. + :raises numpy.linalg.LinAlgError: If the focus shape gets singular + (due to rounding errors). Try changing scale_coeff. + """ + + # Block until input object appears. + dimension, dilled_function, param_hint_avg, param_hint_cov = ( + communication_pipe.recv()) + debug_list = list() + trace_list = list() + def trace(name, value): + """ + Add a variable (name and value) to trace list (if enabled). + + This is a closure (not a pure function), + as it accesses trace_list and trace_enabled + (without any of them being an explicit argument). + + :param name: Any string identifying the value. + :param value: Any object to log repr of. + :type name: str + :type value: object + """ + if trace_enabled: + trace_list.append(name + " " + repr(value)) + value_logweight_function = dill.loads(dilled_function) + len_top = (dimension + 2) * (dimension + 1) / 2 + top_weight_param = list() + samples = 0 + log_sum_weight = None + # Importance sampling produces samples of higher weight (important) + # more frequently, and corrects that by adding weight bonus + # for the less frequently (unimportant) samples. + # But "corrected_weight" is too close to "weight" to be readable, + # so "importance" is used instead, even if it runs contrary to what + # important region is. + log_sum_importance = None + log_importance_best = None + value_avg = 0.0 + # 1x1 dimensional covariance matrix is just variance. + # As variance is never negative, we can track logarithm. + value_log_variance = None + # Here "secondary" means "excluding the weightest sample". + log_secondary_sum_importance = None + value_secondary_avg = 0.0 + value_log_secondary_variance = None + param_sampled_avg = [0.0 for first in range(dimension)] + # TODO: Examine whether we can gain speed by tracking triangle only. + # Covariance matrix can contain negative element (off-diagonal), + # so no logarithm here. This can lead to zeroes on diagonal, + # but we have biasing to make sure it does not hurt much. + param_sampled_cov = [[0.0 for first in range(dimension)] + for second in range(dimension)] + # The next two variables do NOT need to be initialized here, + # but pylint is not a mathematician enough to understand that. + param_top_avg = [0.0 for first in range(dimension)] + param_top_cov = [[0.0 for first in range(dimension)] + for second in range(dimension)] + if not (param_hint_avg and param_hint_cov): + # First call has Nones instead of useful hints. + param_hint_avg = [0.0 for first in range(dimension)] + param_hint_cov = [ + [1.0 if first == second else 0.0 for first in range(dimension)] + for second in range(dimension)] + while not communication_pipe.poll(): + # Compute focus data. + if len(top_weight_param) < len_top: + # Not enough samples for reasonable top, use hint bias. + param_focus_avg = param_hint_avg + param_focus_cov = param_hint_cov + else: + # We have both top samples and overall samples. + # Mix them according to how much the weightest sample dominates. + log_top_weight = top_weight_param[0][0] + log_weight_norm = log_plus(log_sum_weight, log_top_weight) + top_ratio = math.exp(log_top_weight - log_weight_norm) + sampled_ratio = math.exp(log_sum_weight - log_weight_norm) + trace("log_top_weight", log_top_weight) + trace("log_sum_weight", log_sum_weight) + trace("top_ratio", top_ratio) + trace("sampled_ratio", sampled_ratio) + param_focus_avg = [ + sampled_ratio * param_sampled_avg[first] + + top_ratio * param_top_avg[first] + for first in range(dimension)] + param_focus_cov = [[ + scale_coeff * ( + sampled_ratio * param_sampled_cov[first][second] + + top_ratio * param_top_cov[first][second]) + for first in range(dimension)] for second in range(dimension)] + trace("param_focus_avg", param_focus_avg) + trace("param_focus_cov", param_focus_cov) + # Generate next sample. + while 1: + # TODO: Inform pylint that correct version of numpy is available. + sample_point = numpy.random.multivariate_normal( + param_focus_avg, param_focus_cov, 1)[0] + # Multivariate Gauss can fall outside (-1, 1) interval + for first in range(dimension): + sample_coordinate = sample_point[first] + if sample_coordinate <= -1.0 or sample_coordinate >= 1.0: + break + else: # These two breaks implement "level two continue". + break + trace("sample_point", sample_point) + samples += 1 + value, log_weight = value_logweight_function(*sample_point) + trace("value", value) + trace("log_weight", log_weight) + # Update bias related statistics. + log_sum_weight = log_plus(log_sum_weight, log_weight) + if len(top_weight_param) < len_top: + top_weight_param.append((log_weight, sample_point)) + # Hack: top_weight_param[-1] is either the smallest, + # or the just appended to len_top-1 item list. + if (len(top_weight_param) >= len_top + and log_weight >= top_weight_param[-1][0]): + top_weight_param = top_weight_param[:-1] + top_weight_param.append((log_weight, sample_point)) + top_weight_param.sort(key=lambda item: -item[0]) + trace("top_weight_param", top_weight_param) + # top_weight_param has changed, recompute biases. + param_top_avg = top_weight_param[0][1] + param_top_cov = [[0.0 for first in range(dimension)] + for second in range(dimension)] + top_item_count = 1 + for _, near_top_param in top_weight_param[1:]: + top_item_count += 1 + next_item_ratio = 1.0 / top_item_count + previous_items_ratio = 1.0 - next_item_ratio + param_shift = [ + near_top_param[first] - param_top_avg[first] + for first in range(dimension)] + # Do not move center from the weightest sample. + for second in range(dimension): + for first in range(dimension): + param_top_cov[first][second] += ( + param_shift[first] * param_shift[second] + * next_item_ratio) + param_top_cov[first][second] *= previous_items_ratio + trace("param_top_avg", param_top_avg) + trace("param_top_cov", param_top_cov) + # The code above looked at weight (not importance). + # The code below looks at importance (not weight). + param_shift = [sample_point[first] - param_focus_avg[first] + for first in range(dimension)] + rarity_gradient = numpy.linalg.solve(param_focus_cov, param_shift) + rarity_step = numpy.vdot(param_shift, rarity_gradient) + log_rarity = rarity_step / 2.0 + trace("log_rarity", log_rarity) + trace("samples", samples) + log_importance = log_weight + log_rarity + trace("log_importance", log_importance) + # Update sampled statistics. + old_log_sum_importance = log_sum_importance + log_sum_importance = log_plus(old_log_sum_importance, log_importance) + trace("new log_sum_weight", log_sum_weight) + trace("log_sum_importance", log_sum_importance) + if old_log_sum_importance is None: + param_sampled_avg = list(sample_point) + value_avg = value + # Other value related quantities stay None. + continue + previous_samples_ratio = math.exp( + old_log_sum_importance - log_sum_importance) + new_sample_ratio = math.exp(log_importance - log_sum_importance) + param_shift = [sample_point[first] - param_sampled_avg[first] + for first in range(dimension)] + value_shift = value - value_avg + for first in range(dimension): + param_sampled_avg[first] += param_shift[first] * new_sample_ratio + old_value_avg = value_avg + value_avg += value_shift * new_sample_ratio + value_absolute_shift = abs(value_shift) + for second in range(dimension): + for first in range(dimension): + param_sampled_cov[first][second] += ( + param_shift[first] * param_shift[second] * new_sample_ratio) + param_sampled_cov[first][second] *= previous_samples_ratio + trace("param_sampled_avg", param_sampled_avg) + trace("param_sampled_cov", param_sampled_cov) + update_secondary_stats = True + if log_importance_best is None or log_importance > log_importance_best: + log_importance_best = log_importance + log_secondary_sum_importance = old_log_sum_importance + value_secondary_avg = old_value_avg + value_log_secondary_variance = value_log_variance + update_secondary_stats = False + # TODO: Update all primary quantities before secondary ones. + # (As opposed to current hybrid code.) + if value_absolute_shift > 0.0: + value_log_variance = log_plus( + value_log_variance, 2 * math.log(value_absolute_shift) + + log_importance - log_sum_importance) + if value_log_variance is not None: + value_log_variance -= log_sum_importance - old_log_sum_importance + if not update_secondary_stats: + continue + # TODO: Pylint says the following variable name is bad. + # Make sure the refactor uses shorter names. + old_log_secondary_sum_importance = log_secondary_sum_importance + log_secondary_sum_importance = log_plus( + old_log_secondary_sum_importance, log_importance) + if old_log_secondary_sum_importance is None: + value_secondary_avg = value + continue + new_sample_secondary_ratio = math.exp( + log_importance - log_secondary_sum_importance) + # TODO: No would-be variable named old_value_secondary_avg + # appears in subsequent computations. Probably means there is a bug. + value_secondary_shift = value - value_secondary_avg + value_secondary_absolute_shift = abs(value_secondary_shift) + value_secondary_avg += ( + value_secondary_shift * new_sample_secondary_ratio) + if value_secondary_absolute_shift > 0.0: + value_log_secondary_variance = log_plus( + value_log_secondary_variance, ( + 2 * math.log(value_secondary_absolute_shift) + + log_importance - log_secondary_sum_importance)) + if value_log_secondary_variance is not None: + value_log_secondary_variance -= ( + log_secondary_sum_importance - old_log_secondary_sum_importance) + debug_list.append("integrator used " + str(samples) + " samples") + debug_list.append( + "value_avg " + str(value_avg) + + " param_sampled_avg " + repr(param_sampled_avg) + + " param_sampled_cov " + repr(param_sampled_cov) + + " value_log_variance " + str(value_log_variance) + + " value_log_secondary_variance " + str(value_log_secondary_variance)) + value_stdev = math.exp( + (2 * value_log_variance - value_log_secondary_variance) / 2.0) + debug_list.append("top_weight_param[0] " + repr(top_weight_param[0])) + # Intentionally returning param_focus_avg and param_focus_cov, + # instead of possibly hyper-focused bias or sampled. + communication_pipe.send( + (value_avg, value_stdev, param_focus_avg, param_focus_cov, debug_list, + trace_list)) diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py new file mode 100644 index 0000000000..f1b3f740c2 --- /dev/null +++ b/resources/libraries/python/PLRsearch/PLRsearch.py @@ -0,0 +1,596 @@ +# Copyright (c) 2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module holding PLRsearch class.""" + +import logging +import math +import multiprocessing +import time + +import dill +# TODO: Inform pylint about scipy (of correct version) being available. +from scipy.special import erfcx, erfc + +# TODO: The preferred way to consume this code is via a pip package. +# If your project copies code instead, make sure your pylint check does not +# require these imports to be absolute and descending from your superpackage. +import Integrator +from log_plus import log_plus, log_minus + + +class PLRsearch(object): + """A class to encapsulate data relevant for search method. + + The context is performance testing of packet processing systems. + The system, when being offered a steady stream of packets, + can process some of them successfully, other are considered "lost". + + See docstring of the search method for algorithm description. + + Two constants are stored as class fields for speed. + + Method othed than search (and than __init__) + are just internal code structure. + TODO: Those method names should start with underscore then. + + TODO: Figure out how to replace #print with logging + without slowing down too much. + """ + + xerfcx_limit = math.pow(math.acos(0), -0.5) + log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10))) + + def __init__( + self, measurer, trial_duration_per_trial, packet_loss_ratio_target, + trial_number_offset=0, timeout=60.0): + """Store rate measurer and additional parameters. + + Also declare packet_loss_per_second_target field (float), + to be initialized later. + + TODO: Copy AbstractMeasurer from MLRsearch. + + :param measurer: The measurer to call when searching. + :param trial_duration_per_trial: Each trial has larger duration + than the previous trial. This is the increment, in seconds. + :param packet_loss_ratio_target: The algorithm tries to estimate + the offered load leading to this ratio on average. + Trial ratio is number of packets lost divided by packets offered. + :param trial_number_offset: The "first" trial number will be 1+this. + Use this to ensure first iterations have enough time to compute + reasonable estimates for later trials to use. + :param timeout: The search ends if it lasts more than this many seconds. + :type measurer: MLRsearch.AbstractMeasurer + :type trial_duration_per_trial: float + :type packet_loss_ratio_target: float + :type trial_number_offset: int + :type timeout: float + """ + self.measurer = measurer + self.trial_duration_per_trial = trial_duration_per_trial + self.packet_loss_ratio_target = packet_loss_ratio_target + self.trial_number_offset = trial_number_offset + self.timeout = timeout + + def search(self, min_rate, max_rate): + """Perform the search, return average and stdev for throughput estimate. + + Considering measurer and packet_loss_ratio_target (see __init__), + find such an offered load (called critical load) that is expected + to hit the target loss ratio in the limit of very long trial duration. + As the system is probabilistic (and test duration is finite), + the critical ratio is only estimated. + Return the average and standard deviation of the estimate. + + In principle, this algorithm performs trial measurements, + each with varied offered load (which is constant during the trial). + During each measurement, Bayesian inference is performed + on all the measurement results so far. + When timeout is up, the last estimate is returned, + else another trial is performed. + + It is assumed that the system under test, even though not deterministic, + still follows the rule of large numbers. In another words, + any growing set of measurements at a particular offered load + will converge towards unique (for the given load) packet loss ratio. + This means there is a deterministic (but unknown) function + mapping the offered load to average loss ratio. + This function is called loss function. + This also assumes the average loss ratio + does not depend on trial duration. + + The actual probability distribution of loss counts, achieving + the average ratio on trials of various duration + can be complicated (and can depend on offered load), but simply assuming + Poisson distribution will make the algorithm converge. + Binomial distribution would be more precise, + but Poisson is more practical, as it effectively gives + less information content to high ratio results. + + Even when applying other assumptions on the loss function + (increasing function, limit zero ratio when load goes to zero, + global upper limit on rate of packets processed), there are still + too many different shapes of possible loss functions, + which makes full Bayesian reasoning intractable. + + This implementation radically simplifies things by examining + only two shapes, each with finitely many (in this case just two) + parameters. In other words, two fitting functions + (each with two parameters and one argument). + When restricting model space to one of the two fitting functions, + the Bayesian inference becomes tractable (even though it needs + numerical integration from Integrator class). + + The first measurement is done at min_rate to help with convergence + if max_rate measurements give loss below target. + FIXME: Fix overflow error and really use min_rate. + The second measurement is done at max_rate, next few measurements + have offered load of previous load minus excess loss ratio. + This simple rule is found to be good when offered loads + so far are way above the critical rate. After few measurements, + inference from fitting functions converges faster that the initial + "optimistic" procedure. + + Offered loads close to (limiting) critical rate are the most useful, + as linear approximation of the fitting function + becomes good enough there (thus reducing the impact + of the overall shape of fitting function). + After several trials, usually one of the fitting functions + has better predictions than the other one, but the algorithm + does not track that. Simply, it uses the estimate average, + alternating between the functions. + + The returned average and stdev is a combination of the two fitting + estimates. + + TODO: If measurement at max_rate is already below target loss rate, + the algorithm always measures at max_rate, and likelihood + no longer has single maximum, leading to "random" estimates. + Find a way to avoid that. + + :param min_rate: Avoid measuring at offered loads below this, + in packets per second. + :param max_rate: Avoid measuring at offered loads above this, + in packets per second. + :type min_rate: float + :type max_rate: float + :returns: Average and stdev of critical load estimate. + :rtype: 2-tuple of floats + """ + stop_time = time.time() + self.timeout + min_rate = float(min_rate) + max_rate = float(max_rate) + trial_result_list = list() + trial_number = self.trial_number_offset + integrator_data = (None, None, None, None) + message = "Trial %(number)r computed avg %(avg)r stdev %(stdev)r" + message += " stretch %(a1)r erf %(a2)r difference %(diff)r" + transmit_rate = (min_rate + max_rate) / 2.0 + while 1: + trial_number += 1 + trial_duration = trial_number * self.trial_duration_per_trial + results = self.measure_and_compute( + trial_duration, transmit_rate, trial_result_list, max_rate, + integrator_data) + measurement, average, stdev, avg1, avg2, integrator_data = results + logging.info(message, { + "number": trial_number, "avg": average, "stdev": stdev, + "a1": avg1, "a2": avg2, "diff": avg2 - avg1}) + if stop_time <= time.time(): + return average, stdev + trial_result_list.append(measurement) + if (trial_number - self.trial_number_offset) <= 1: + next_load = max_rate + elif (trial_number - self.trial_number_offset) <= 3: + next_load = (measurement.receive_rate + / (1.0 - self.packet_loss_ratio_target)) + else: + next_load = avg1 if trial_number % 2 else avg2 + transmit_rate = min(max_rate, max(min_rate, next_load)) + + @staticmethod + def lfit_stretch(load, mrr, spread): + """Stretch-based fitting function. + + Return the logarithm of average packet loss per second + when the load (argument) is offered to a system with given + mrr and spread (parameters). + Stretch function is 1/(1+Exp[-x]). The average itself is definite + integral from zero to load, of shifted and x-scaled stretch function. + As the integrator is sensitive to discontinuities, + and it calls this function at large areas of parameter space, + the implementation has to avoid rounding errors, overflows, + and correctly approximate underflows. + + TODO: Explain how the high-level description + has been converted into an implementation full of ifs. + + :param load: Offered load (positive), in packets per second. + :param mrr: Parameter of this fitting function, equal to limiting + (positive) average number of packets received (as opposed to lost) + when offered load is many spreads more than mrr. + :param spread: The x-scaling parameter (positive). No nice semantics, + roughly corresponds to size of "tail" for loads below mrr. + :type load: float + :type mrr: float + :type spread: float + :returns: Logarithm of average number of packets lost per second. + :rtype: float + """ + # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization + chi = (load - mrr) / spread + chi0 = -mrr / spread +# print "load", load, "mrr", mrr, "spread", spread, "chi", chi + if chi > 0: + log_lps = math.log( + load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread) +# print "big loss direct log_lps", log_lps + else: + approx = (math.exp(chi) - math.exp(2 * chi) / 2) * spread + if approx == 0.0: + log_lps = chi +# print "small loss crude log_lps", log_lps + return log_lps + third = math.exp(3 * chi) / 3 * spread + if approx + third != approx + 2 * third: + log_lps = math.log( + (log_plus(0, chi) - log_plus(0, chi0)) * spread) +# print "small loss direct log_lps", log_lps + else: + log_lps = math.log( + approx - (math.exp(chi0) - math.exp(2 * chi0)) * spread) +# print "small loss approx log_lps", log_lps + return log_lps + + @staticmethod + def lfit_erf(load, mrr, spread): + """Erf-based fitting function. + + Return the logarithm of average packet loss per second + when the load (argument) is offered to a system with given + mrr and spread (parameters). + Erf function is Primitive function to normal distribution density. + The average itself is definite integral from zero to load, + of shifted and x-scaled erf function. + As the integrator is sensitive to discontinuities, + and it calls this function at large areas of parameter space, + the implementation has to avoid rounding errors, overflows, + and correctly approximate underflows. + + TODO: Explain how the high-level description + has been converted into an implementation full of ifs. + + :param load: Offered load (positive), in packets per second. + :param mrr: Parameter of this fitting function, equal to limiting + (positive) average number of packets received (as opposed to lost) + when offered load is many spreads more than mrr. + :param spread: The x-scaling parameter (positive). No nice semantics, + roughly corresponds to size of "tail" for loads below mrr. + :type load: float + :type mrr: float + :type spread: float + :returns: Logarithm of average number of packets lost per second. + :rtype: float + """ + # Beware, this chi has the sign opposite to the stretch function chi. + # TODO: The stretch sign is just to have less minuses. Worth changing? + chi = (mrr - load) / spread + chi0 = mrr / spread +# print "load", load, "mrr", mrr, "spread", spread, +# print "chi", chi, "chi0", chi0 + if chi >= -1.0: +# print "positive, b ~> m" + if chi > math.exp(10): + first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10) +# print "approximated" + else: + first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi)) +# print "exact" + first -= chi * chi + second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0)) + second -= chi0 * chi0 + intermediate = log_minus(first, second) +# print "first", first, "second", second, +# print "intermediate", intermediate + else: +# print "negative, b ~< m" + exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi) + exp_first *= math.exp(-chi * chi) + exp_first -= 2 * chi + # TODO: Why has the following line chi there (as opposed to chi0)? + # In general the functions would be more readable if they explicitly + # return math.log(func(chi) - func(chi0)) + # for some function "func", at least for some branches. + second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0)) + second -= chi0 * chi0 + intermediate = math.log(exp_first - math.exp(second)) +# print "exp_first", exp_first, "second", second, +# print "intermediate", intermediate + result = intermediate + math.log(spread) - math.log(erfc(-chi0)) +# print "lfit erf result", result + return result + + @staticmethod + def find_critical_rate(lfit_func, log_lps_target, mrr, spread): + """Given lps target and parameters, return the achieving offered load. + + This is basically an inverse function to lfit_func + when parameters are fixed. + Instead of implementing effective implementation + of the inverse function, this implementation uses + brute force binary search. + The search starts at an arbitrary offered load, + uses exponential search to find the other bound + and then bisecting until the load is found (or interval + becoming degenerate). + + TODO: Use at least some method with faster convergence. + + :param lfit_func: Fitting function, typically lfit_spread or lfit_erf. + :param log_lps_target: Fitting function should return this + at the returned load and parameters. + :param mrr: The mrr parameter for the fitting function. + :param spread: The spread parameter for the fittinmg function. + :type lfit_func: Function from 3 floats to float. + :type log_lps_target: float + :type mrr: float + :type spread: float + :returns: Load [pps] which achieves the target with given parameters. + :rtype: float + """ + # TODO: Should we make the initial rate configurable? + rate = 10000000.0 + log_loss = lfit_func(rate, mrr, spread) + if log_loss == log_lps_target: + return rate + # Exponential search part. + if log_loss > log_lps_target: + rate_hi = rate + while 1: + rate_lo = rate_hi / 2.0 + log_loss = lfit_func(rate_lo, mrr, spread) + if log_loss > log_lps_target: + rate_hi = rate_lo + continue + if log_loss == log_lps_target: + return rate_lo + break + else: + rate_lo = rate + while 1: + rate_hi = rate_lo * 2.0 + log_loss = lfit_func(rate_hi, mrr, spread) + if log_loss < log_lps_target: + rate_lo = rate_hi + continue + if log_loss == log_lps_target: + return rate_hi + break + # Binary search part. + while rate_hi != rate_lo: + rate = (rate_hi + rate_lo) / 2.0 + log_loss = lfit_func(rate, mrr, spread) + if rate == rate_hi or rate == rate_lo or log_loss == log_lps_target: +# print "found", rate + return rate + if log_loss > log_lps_target: + rate_hi = rate + else: + rate_lo = rate + + @staticmethod + def log_weight(lfit_func, trial_result_list, mrr, spread): + """Return log of weight of trial results by the function and parameters. + + Integrator assumes uniform distribution, but over different parameters. + Weight and likelihood are used interchangeably here anyway. + + Each trial has an offered load, a duration and a loss count. + Fitting function is used to compute the average loss per second. + Poisson distribution (with average loss per trial) is used + to get likelihood of one trial result, the overal likelihood is product. + As likelihoods can be extremely small, logarithms are tracked instead. + + TODO: Copy ReceiveRateMeasurement from MLRsearch. + + :param lfit_func: Fitting function, typically lfit_spread or lfit_erf. + :param result_list: List of trial measurement results. + :param mrr: The mrr parameter for the fitting function. + :param spread: The spread parameter for the fittinmg function. + :type lfit_func: Function from 3 floats to float. + :type result_list: list of MLRsearch.ReceiveRateMeasurement + :type mrr: float + :type spread: float + :returns: Logarithm of result weight for given function and parameters. + :rtype: float + """ + log_likelihood = 0.0 + for result in trial_result_list: +# print "DEBUG for tr", result.target_tr, +# print "lc", result.loss_count, "d", result.duration + log_avg_loss_per_second = lfit_func(result.target_tr, mrr, spread) + log_avg_loss_per_trial = ( + log_avg_loss_per_second + math.log(result.duration)) + # Poisson probability computation works nice for logarithms. + log_trial_likelihood = ( + result.loss_count * log_avg_loss_per_trial + - math.exp(log_avg_loss_per_trial)) + log_trial_likelihood -= math.lgamma(1 + result.loss_count) + log_likelihood += log_trial_likelihood +# print "log_avg_loss_per_second", log_avg_loss_per_second +# print "log_avg_loss_per_trial", log_avg_loss_per_trial +# print "avg_loss_per_trial", math.exp(log_avg_loss_per_trial) +# print "log_trial_likelihood", log_trial_likelihood +# print "log_likelihood", log_likelihood +# print "returning log_likelihood", log_likelihood + return log_likelihood + + # FIXME: Refactor (somehow) so pylint stops complaining about + # too many local variables. + # Some work already done in subsequent changes. + def measure_and_compute( + self, trial_duration, transmit_rate, + trial_result_list, max_rate, integrator_data): + """Perform both measurement and computation at once. + + High level steps: Prepare and launch computation worker processes, + perform the measurement, stop computation and combine results. + + Integrator needs a specific function to process (-1, 1) parameters. + As our fitting functions use dimensional parameters, + so a transformation is performed, resulting in a specific prior + distribution over the dimensional parameters. + Maximal rate (line rate) is needed for that transformation. + + Two fitting functions are used, computation is started + on temporary worker process per fitting function. After the measurement, + Average and stdev of the critical rate (not log) of each worker + are combined and returned. Raw averages are also returned, + offered load for next iteration is chosen from them. + The idea is that one fitting function might be fitting much better, + measurements at its avg are best for relevant results (for both), + but we do not know which fitting function it is. + + TODO: Define class for integrator data, so that fields are documented. + TODO: Define class for result object, so that fields are documented. + TODO: More processes to the slower fitting function? + TODO: Re-use processes, instead creating on each computation. + TODO: As only one result is needed fresh, figure out a way + how to keep the other worker running. This will alow shorter + duration per trial. Special handling at first and last measurement + will be needed (to properly initialize and to properly combine results). + + :param trial_duration: Length of the measurement in seconds. + :param transmit_rate: Offered load in packets per second. + :param trial_result_list: Results of previous measurements. + :param max_rate: Theoretic maximum of possible ofered load. + :param integrator_data: Hints to speed up the numeric computation. + :type trial_duration: float + :type transmit_rate: float + :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement + :type max_rate: float + :type integrator_data: 4-tuple of gaussian positions and covariances + :returns: Measurement and computation results. + :rtype: 6-tuple: ReceiveRateMeasurement, floats, integrator data. + """ + # Preparation phase. + dimension = 2 + stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov = ( + integrator_data) + packet_loss_per_second_target = ( + transmit_rate * self.packet_loss_ratio_target) + def start_computing(fitting_function, bias_avg, bias_cov): + """Just a block of code to be used for each fitting function. + + Define function for integrator, create process and pipe ends, + start computation, return the boss pipe end. + + :param fitting_function: lfit_erf or lfit_stretch. + :param bias_avg: Tuple of floats to start searching around. + :param bias_cov: Covariance matrix defining initial focus shape. + :type fitting_function: Function from 3 floats to float. + :type bias_avg: 2-tuple of floats + :type bias_cov: 2-tuple of 2-tuples of floats + :returns: Boss end of communication pipe. + :rtype: multiprocessing.Connection + """ + def value_logweight_func(x_mrr, x_spread): + """Return log of critical rate and log of likelihood. + + This is a closure. The ancestor function got + trial_result_list as a parameter, and we are accessing it. + As integrator has strict conditions on function signature, + trial_result_list cannot be an explicit argument + of the current function. + This is also why we have to define this closure + at each invocation of the ancestor function anew. + + The dimensional spread parameter is the (dimensional) mrr + raised to the power of x_spread scaled to interval (0, 1). + The dimensional mrr parameter distribution has shape of + 1/(1+x^2), but x==1 corresponds to max_rate + and 1.0 pps is added to avoid numerical problems in fitting + functions. + + TODO: x^-2 (for x>1.0) might be simpler/nicer prior. + + :param x_mrr: The first dimensionless param + from (-1, 1) interval. + :param x_spread: The second dimensionless param + from (-1, 1) interval. + :returns: Log of critical rate [pps] and log of likelihood. + :rtype: 2-tuple of float + """ + mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0 + spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr)) +# print "mrr", mrr, "spread", spread + logweight = self.log_weight( + fitting_function, trial_result_list, mrr, spread) + value = math.log(self.find_critical_rate( + fitting_function, + math.log(packet_loss_per_second_target), mrr, spread)) + return value, logweight + dilled_function = dill.dumps(value_logweight_func) + boss_pipe_end, worker_pipe_end = multiprocessing.Pipe() + boss_pipe_end.send( + (dimension, dilled_function, bias_avg, bias_cov)) + worker = multiprocessing.Process( + target=Integrator.try_estimate_nd, args=(worker_pipe_end,)) + worker.daemon = True + worker.start() + return boss_pipe_end + erf_pipe = start_computing( + self.lfit_erf, erf_bias_avg, erf_bias_cov) + stretch_pipe = start_computing( + self.lfit_stretch, stretch_bias_avg, stretch_bias_cov) + # Measurement phase. + measurement = self.measurer.measure(trial_duration, transmit_rate) + # Processing phase. + erf_pipe.send(None) + stretch_pipe.send(None) + if not stretch_pipe.poll(1.0): + raise RuntimeError("Stretch worker did not finish!") + result_or_traceback = stretch_pipe.recv() + try: + (stretch_avg, stretch_stdev, stretch_bias_avg, + stretch_bias_cov, debug_list, _) = result_or_traceback + except ValueError: + raise RuntimeError( + "Stretch worker failed with the following traceback:\n{tr}" + .format(tr=result_or_traceback)) + logging.info("Logs from stretch worker:") + for message in debug_list: + logging.debug(message) + if not erf_pipe.poll(1.0): + raise RuntimeError("Erf worker did not finish!") + result_or_traceback = erf_pipe.recv() + try: + (erf_avg, erf_stdev, erf_bias_avg, + erf_bias_cov, debug_list, _) = result_or_traceback + except ValueError: + raise RuntimeError( + "Erf worker failed with the following traceback:\n{tr}" + .format(tr=result_or_traceback)) + logging.info("Logs from erf worker:") + for message in debug_list: + logging.debug(message) + avg = math.exp((stretch_avg + erf_avg) / 2.0) + var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0 + var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0 + stdev = avg * math.sqrt(var) + integrator_data = ( + stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov) + return ( + measurement, avg, stdev, math.exp(stretch_avg), + math.exp(erf_avg), integrator_data) diff --git a/resources/libraries/python/PLRsearch/__init__.py b/resources/libraries/python/PLRsearch/__init__.py new file mode 100644 index 0000000000..bce703803c --- /dev/null +++ b/resources/libraries/python/PLRsearch/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2018 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +__init__ file for Python package "PLRsearch". +""" diff --git a/resources/libraries/python/PLRsearch/log_plus.py b/resources/libraries/python/PLRsearch/log_plus.py new file mode 100644 index 0000000000..3f21cc78d7 --- /dev/null +++ b/resources/libraries/python/PLRsearch/log_plus.py @@ -0,0 +1,89 @@ +# Copyright (c) 2018 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module holding functions for avoiding rounding underflows. + +Some applications wish to manipulate non-negative numbers +which are many orders of magnitude apart. +In those circumstances, it is useful to store +the logarithm of the intended number. + +As math.log(0.0) raises an exception (instead returning -inf or nan), +and 0.0 might be a result of computation caused only by rounding error, +functions of this module use None as -inf. + +TODO: Figure out a more performant way of handling -inf. + +The functions handle the common task of adding or substracting +two numbers where both operands and the result is given in logarithm form. +There are conditionals to make sure overflow does not happen (if possible) +during the computation.""" + +import math + + +def log_plus(first, second): + """Return logarithm of the sum of two exponentials. + + Basically math.log(math.exp(first) + math.exp(second)) + which avoids overflow and uses None as math.log(0.0). + + TODO: replace with scipy.special.logsumexp ? Test it. + + :param first: Logarithm of the first number to add (or None if zero). + :param second: Logarithm of the second number to add (or None if zero). + :type first: float + :type second: float + :returns: Logarithm of the sum (or None if zero). + :rtype: float + """ + + if first is None: + return second + if second is None: + return first + if second > first: + return second + math.log(1.0 + math.exp(first - second)) + else: + return first + math.log(1.0 + math.exp(second - first)) + + +def log_minus(first, second): + """Return logarithm of the difference of two exponentials. + + Basically math.log(math.exp(first) - math.exp(second)) + which avoids overflow and uses None as math.log(0.0). + + TODO: Support zero difference? + TODO: replace with scipy.special.logsumexp ? Test it. + + :param first: Logarithm of the number to subtract from (or None if zero). + :param second: Logarithm of the number to subtract (or None if zero). + :type first: float + :type second: float + :returns: Logarithm of the difference. + :rtype: float + :raises RuntimeError: If the difference would be non-positive. + """ + + if first is None: + raise RuntimeError("log_minus: does not suport None first") + if second is None: + return first + if second >= first: + raise RuntimeError("log_minus: first has to be bigger than second") + factor = -math.expm1(second - first) + if factor <= 0.0: + raise RuntimeError("log_minus: non-positive number to log") + else: + return first + math.log(factor) diff --git a/resources/libraries/python/TrafficGenerator.py b/resources/libraries/python/TrafficGenerator.py index b5558e663d..60915117fc 100644 --- a/resources/libraries/python/TrafficGenerator.py +++ b/resources/libraries/python/TrafficGenerator.py @@ -25,6 +25,7 @@ from .topology import Topology from .MLRsearch.AbstractMeasurer import AbstractMeasurer from .MLRsearch.MultipleLossRatioSearch import MultipleLossRatioSearch from .MLRsearch.ReceiveRateMeasurement import ReceiveRateMeasurement +from .PLRsearch.PLRsearch import PLRsearch __all__ = ['TGDropRateSearchImpl', 'TrafficGenerator', 'OptimizedSearch'] @@ -757,3 +758,46 @@ class OptimizedSearch(object): result = algorithm.narrow_down_ndr_and_pdr( minimum_transmit_rate, maximum_transmit_rate, packet_loss_ratio) return result + + @staticmethod + def perform_soak_search( + frame_size, traffic_type, minimum_transmit_rate, + maximum_transmit_rate, plr_target=1e-7, tdpt=0.2, + initial_count=50, timeout=1800.0): + """Setup initialized TG, perform soak search, return avg and stdev. + + :param frame_size: Frame size identifier or value [B]. + :param traffic_type: Module name as a traffic type identifier. + See resources/traffic_profiles/trex for implemented modules. + :param minimum_transmit_rate: Minimal bidirectional + target transmit rate [pps]. + :param maximum_transmit_rate: Maximal bidirectional + target transmit rate [pps]. + :param plr_target: Fraction of packets lost to achieve [1]. + :param tdpt: Trial duration per trial. + The algorithm linearly increases trial duration with trial number, + this is the increment between succesive trials, in seconds. + :param initial_count: Offset to apply before the first trial. + For example initial_count=50 makes first trial to be 51*tdpt long. + This is needed because initial "search" phase of integrator + takes significant time even without any trial results. + :param timeout: The search will stop after this overall time [s]. + :type frame_size: str or int + :type traffic_type: str + :type minimum_transmit_rate: float + :type maximum_transmit_rate: float + :type plr_target: float + :type initial_count: int + :type timeout: float + :returns: Average and stdev of estimated bidirectional rate giving PLR. + :rtype: 2-tuple of float + """ + tg_instance = BuiltIn().get_library_instance( + 'resources.libraries.python.TrafficGenerator') + tg_instance.set_rate_provider_defaults(frame_size, traffic_type) + algorithm = PLRsearch( + measurer=tg_instance, trial_duration_per_trial=tdpt, + packet_loss_ratio_target=plr_target, + trial_number_offset=initial_count, timeout=timeout) + result = algorithm.search(minimum_transmit_rate, maximum_transmit_rate) + return result diff --git a/resources/libraries/python/autogen/Regenerator.py b/resources/libraries/python/autogen/Regenerator.py index c0937df4e8..8bfe054e5e 100644 --- a/resources/libraries/python/autogen/Regenerator.py +++ b/resources/libraries/python/autogen/Regenerator.py @@ -101,6 +101,11 @@ class Regenerator(object): # Not supported by AVF driver. # https://git.fd.io/vpp/tree/src/plugins/avf/README.md pass + elif ("soak" in suite_id and + (kwargs["phy_cores"] != 1 + or kwargs["framesize"] in (1518, 9000, "IMIX_v4_1"))): + # Soak test take too long, do not risk other than tc01. + pass else: file_out.write(testcase.generate(num=num, **kwargs)) return num + 1 -- cgit 1.2.3-korg