aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/PLRsearch
diff options
context:
space:
mode:
authorVratko Polak <vrpolak@cisco.com>2018-11-30 14:49:10 +0100
committerTibor Frank <tifrank@cisco.com>2019-01-29 08:06:17 +0000
commit27a56cad3679e4decbcca90acfb22c55a22153e0 (patch)
tree994a0b7fed07a33e5c2093393909d19e3ad71e2a /resources/libraries/python/PLRsearch
parent15648d7c4f98cc90a406519362b0d7f548893859 (diff)
PLRsearch: Initial implementation and suites
Missing bits: - Add up-to-date .rst document (in child Change). - Prepare for releasing to PyPI.org (in child Change): -- Either copy dependencies from MLRsearch, or list in requirements. -- Perhaps move common dependencies to separate package for both search to depend on. -- All the other metadata stuff. Jira: CSIT-1276 Change-Id: I277efdb63dbb90b30e11f4e30a82e2130ac8efc3 Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Diffstat (limited to 'resources/libraries/python/PLRsearch')
-rw-r--r--resources/libraries/python/PLRsearch/Integrator.py378
-rw-r--r--resources/libraries/python/PLRsearch/PLRsearch.py596
-rw-r--r--resources/libraries/python/PLRsearch/__init__.py16
-rw-r--r--resources/libraries/python/PLRsearch/log_plus.py89
4 files changed, 1079 insertions, 0 deletions
diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py
new file mode 100644
index 0000000000..f8846fdf62
--- /dev/null
+++ b/resources/libraries/python/PLRsearch/Integrator.py
@@ -0,0 +1,378 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for numerical integration, tightly coupled to PLRsearch algorithm.
+
+See log_plus for an explanation why None acts as a special case "float" number.
+
+TODO: Separate optimizations specific to PLRsearch
+ and distribute as a standalone package so other projects may reuse.
+"""
+
+import math
+import traceback
+
+import dill
+import numpy
+
+# TODO: The preferred way to consume this code is via a pip package.
+# If your project copies code instead, make sure your pylint check does not
+# require these imports to be absolute and descending from your superpackage.
+from log_plus import log_plus
+
+
+def try_estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+ """Call estimate_nd but catch any exception and send traceback."""
+ try:
+ return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
+ except BaseException:
+ # Any subclass could have caused estimate_nd to stop before sending,
+ # so we have to catch them all.
+ traceback_string = traceback.format_exc()
+ communication_pipe.send(traceback_string)
+ # After sendig, re-raise, so usages other than "one process per call"
+ # keep behaving correctly.
+ raise
+
+
+# FIXME: Pylint reports multiple complexity violations.
+# Refactor the code, using less (but structured) variables
+# and function calls for (relatively) loosly coupled code blocks.
+def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+ """Use Bayesian inference from control queue, put result to result queue.
+
+ TODO: Use a logging framework that works in a user friendly way.
+ (Note that multiprocessing_logging does not work well with robot
+ and robotbackgroundlogger only works for threads, not processes.
+ Or, wait for https://github.com/robotframework/robotframework/pull/2182
+ Anyway, the current implementation with trace_enabled looks ugly.)
+
+ The result is average and standard deviation for posterior distribution
+ of a single dependent (positive scalar) value.
+ The prior is assumed to be uniform on (-1, 1) for every parameter.
+ Number of parameters and the function for computing
+ the dependent value and likelihood both come from input.
+
+ The likelihood is assumed to be extremely uneven (but never zero),
+ so the function should return the logarithm of the likelihood.
+ The integration method is basically a Monte Carlo
+ (TODO: Add links to notions used here.),
+ but importance sampling is used in order to focus
+ on the part of parameter space with (relatively) non-negligible likelihood.
+
+ Multivariate Gauss distribution is used for focusing,
+ so only unimodal posterior distributions are handled correctly.
+ Initial samples are mostly used for shaping (and shifting)
+ the Gaussian distribution, later samples will probably dominate.
+ Thus, initially the algorithm behavior resembles more "find the maximum",
+ as opposed to "reliably integrate". As for later iterations of PLRsearch,
+ it is assumed that the distribution position does not change rapidly;
+ thus integration algorithm returns also the distribution data,
+ to be used as initial focus in next iteration.
+
+ After some number of samples (depends on dimension),
+ the algorithm starts tracking few most likely samples to base the Gaussian
+ distribution around, mixing with estimates from observed samples.
+ The idea is that even when (usually) one of the samples dominates,
+ first few are treated as if equally likely, to get reasonable focus.
+ During the "find the maximum" phase, this set of best samples
+ frequently takes a wrong shape (compared to observed samples
+ in equilibrium). Therefore scale_coeff argument is left for humans to tweak,
+ so the convergence is reliable and quick.
+ Any data (other than correctly weighted samples) used to keep
+ distribution shape reasonable is called "bias", regardles of
+ whether it comes from input hint, or from tracking top samples.
+
+ Until the distribution locates itself roughly around
+ the maximum likeligood point, the integration results are probably wrong.
+ That means some minimal time is needed for the result to become reliable.
+ The reported standard distribution attempts to signal inconsistence
+ (when one sample has dominating weight compared to the rest of samples),
+ but some human supervision is strongly encouraged.
+
+ To facilitate running in worker processes, arguments and results
+ are communicated via pipe. The computation does not start
+ until arguments appear in the pipe, the computation stops
+ when another item (stop object) is detected in the pipe
+ (and result is put to pipe).
+
+ TODO: Create classes for arguments and results,
+ so their fields are documented (and code perhaps more readable).
+
+ Input/argument object (received from pipe)
+ is a 4-tuple of the following fields:
+ - dimension: Integer, number of parameters to consider.
+ - dilled_function: Function (serialized using dill), which:
+ - - Takes the dimension number of float parameters from (-1, 1).
+ - - Returns float 2-tuple of dependent value and parameter log-likelihood.
+ - param_hint_avg: Dimension-tuple of floats to start searching around.
+ - param_hint_cov: Covariance matrix defining initial focus shape.
+
+ Output/result object (sent to pipe queue)
+ is a 6-tuple of the following fields:
+ - value_avg: Float estimate of posterior average dependent value.
+ - value_stdev: Float estimate of posterior standard deviation of the value.
+ - param_importance_avg: Float tuple, center of Gaussian to use next.
+ - param_importance_cov: Float covariance matrix of the Gaussian to use next.
+ - debug_list: List of debug strings to log at main process.
+ - trace_list: List of trace strings to pass to main process if enabled.
+ Trace strings are very verbose, it is not recommended to enable them.
+ In they are not enabled, trace_list will be empty.
+ It is recommended to edit some lines manually to debug_list if needed.
+
+ :param communication_pipe: Pipe to comunicate with boss process.
+ :param scale_coeff: Float number to tweak convergence speed with.
+ :param trace_enabled: Whether trace list should be populated at all.
+ Default: False
+ :type communication_pipe: multiprocessing.Connection (or compatible)
+ :type scale_coeff: float
+ :type trace_enabled: boolean
+ :raises OverflowError: If one sample dominates the rest too much.
+ Or if value_logweight_function does not handle
+ some part of parameter space carefully enough.
+ :raises numpy.linalg.LinAlgError: If the focus shape gets singular
+ (due to rounding errors). Try changing scale_coeff.
+ """
+
+ # Block until input object appears.
+ dimension, dilled_function, param_hint_avg, param_hint_cov = (
+ communication_pipe.recv())
+ debug_list = list()
+ trace_list = list()
+ def trace(name, value):
+ """
+ Add a variable (name and value) to trace list (if enabled).
+
+ This is a closure (not a pure function),
+ as it accesses trace_list and trace_enabled
+ (without any of them being an explicit argument).
+
+ :param name: Any string identifying the value.
+ :param value: Any object to log repr of.
+ :type name: str
+ :type value: object
+ """
+ if trace_enabled:
+ trace_list.append(name + " " + repr(value))
+ value_logweight_function = dill.loads(dilled_function)
+ len_top = (dimension + 2) * (dimension + 1) / 2
+ top_weight_param = list()
+ samples = 0
+ log_sum_weight = None
+ # Importance sampling produces samples of higher weight (important)
+ # more frequently, and corrects that by adding weight bonus
+ # for the less frequently (unimportant) samples.
+ # But "corrected_weight" is too close to "weight" to be readable,
+ # so "importance" is used instead, even if it runs contrary to what
+ # important region is.
+ log_sum_importance = None
+ log_importance_best = None
+ value_avg = 0.0
+ # 1x1 dimensional covariance matrix is just variance.
+ # As variance is never negative, we can track logarithm.
+ value_log_variance = None
+ # Here "secondary" means "excluding the weightest sample".
+ log_secondary_sum_importance = None
+ value_secondary_avg = 0.0
+ value_log_secondary_variance = None
+ param_sampled_avg = [0.0 for first in range(dimension)]
+ # TODO: Examine whether we can gain speed by tracking triangle only.
+ # Covariance matrix can contain negative element (off-diagonal),
+ # so no logarithm here. This can lead to zeroes on diagonal,
+ # but we have biasing to make sure it does not hurt much.
+ param_sampled_cov = [[0.0 for first in range(dimension)]
+ for second in range(dimension)]
+ # The next two variables do NOT need to be initialized here,
+ # but pylint is not a mathematician enough to understand that.
+ param_top_avg = [0.0 for first in range(dimension)]
+ param_top_cov = [[0.0 for first in range(dimension)]
+ for second in range(dimension)]
+ if not (param_hint_avg and param_hint_cov):
+ # First call has Nones instead of useful hints.
+ param_hint_avg = [0.0 for first in range(dimension)]
+ param_hint_cov = [
+ [1.0 if first == second else 0.0 for first in range(dimension)]
+ for second in range(dimension)]
+ while not communication_pipe.poll():
+ # Compute focus data.
+ if len(top_weight_param) < len_top:
+ # Not enough samples for reasonable top, use hint bias.
+ param_focus_avg = param_hint_avg
+ param_focus_cov = param_hint_cov
+ else:
+ # We have both top samples and overall samples.
+ # Mix them according to how much the weightest sample dominates.
+ log_top_weight = top_weight_param[0][0]
+ log_weight_norm = log_plus(log_sum_weight, log_top_weight)
+ top_ratio = math.exp(log_top_weight - log_weight_norm)
+ sampled_ratio = math.exp(log_sum_weight - log_weight_norm)
+ trace("log_top_weight", log_top_weight)
+ trace("log_sum_weight", log_sum_weight)
+ trace("top_ratio", top_ratio)
+ trace("sampled_ratio", sampled_ratio)
+ param_focus_avg = [
+ sampled_ratio * param_sampled_avg[first]
+ + top_ratio * param_top_avg[first]
+ for first in range(dimension)]
+ param_focus_cov = [[
+ scale_coeff * (
+ sampled_ratio * param_sampled_cov[first][second]
+ + top_ratio * param_top_cov[first][second])
+ for first in range(dimension)] for second in range(dimension)]
+ trace("param_focus_avg", param_focus_avg)
+ trace("param_focus_cov", param_focus_cov)
+ # Generate next sample.
+ while 1:
+ # TODO: Inform pylint that correct version of numpy is available.
+ sample_point = numpy.random.multivariate_normal(
+ param_focus_avg, param_focus_cov, 1)[0]
+ # Multivariate Gauss can fall outside (-1, 1) interval
+ for first in range(dimension):
+ sample_coordinate = sample_point[first]
+ if sample_coordinate <= -1.0 or sample_coordinate >= 1.0:
+ break
+ else: # These two breaks implement "level two continue".
+ break
+ trace("sample_point", sample_point)
+ samples += 1
+ value, log_weight = value_logweight_function(*sample_point)
+ trace("value", value)
+ trace("log_weight", log_weight)
+ # Update bias related statistics.
+ log_sum_weight = log_plus(log_sum_weight, log_weight)
+ if len(top_weight_param) < len_top:
+ top_weight_param.append((log_weight, sample_point))
+ # Hack: top_weight_param[-1] is either the smallest,
+ # or the just appended to len_top-1 item list.
+ if (len(top_weight_param) >= len_top
+ and log_weight >= top_weight_param[-1][0]):
+ top_weight_param = top_weight_param[:-1]
+ top_weight_param.append((log_weight, sample_point))
+ top_weight_param.sort(key=lambda item: -item[0])
+ trace("top_weight_param", top_weight_param)
+ # top_weight_param has changed, recompute biases.
+ param_top_avg = top_weight_param[0][1]
+ param_top_cov = [[0.0 for first in range(dimension)]
+ for second in range(dimension)]
+ top_item_count = 1
+ for _, near_top_param in top_weight_param[1:]:
+ top_item_count += 1
+ next_item_ratio = 1.0 / top_item_count
+ previous_items_ratio = 1.0 - next_item_ratio
+ param_shift = [
+ near_top_param[first] - param_top_avg[first]
+ for first in range(dimension)]
+ # Do not move center from the weightest sample.
+ for second in range(dimension):
+ for first in range(dimension):
+ param_top_cov[first][second] += (
+ param_shift[first] * param_shift[second]
+ * next_item_ratio)
+ param_top_cov[first][second] *= previous_items_ratio
+ trace("param_top_avg", param_top_avg)
+ trace("param_top_cov", param_top_cov)
+ # The code above looked at weight (not importance).
+ # The code below looks at importance (not weight).
+ param_shift = [sample_point[first] - param_focus_avg[first]
+ for first in range(dimension)]
+ rarity_gradient = numpy.linalg.solve(param_focus_cov, param_shift)
+ rarity_step = numpy.vdot(param_shift, rarity_gradient)
+ log_rarity = rarity_step / 2.0
+ trace("log_rarity", log_rarity)
+ trace("samples", samples)
+ log_importance = log_weight + log_rarity
+ trace("log_importance", log_importance)
+ # Update sampled statistics.
+ old_log_sum_importance = log_sum_importance
+ log_sum_importance = log_plus(old_log_sum_importance, log_importance)
+ trace("new log_sum_weight", log_sum_weight)
+ trace("log_sum_importance", log_sum_importance)
+ if old_log_sum_importance is None:
+ param_sampled_avg = list(sample_point)
+ value_avg = value
+ # Other value related quantities stay None.
+ continue
+ previous_samples_ratio = math.exp(
+ old_log_sum_importance - log_sum_importance)
+ new_sample_ratio = math.exp(log_importance - log_sum_importance)
+ param_shift = [sample_point[first] - param_sampled_avg[first]
+ for first in range(dimension)]
+ value_shift = value - value_avg
+ for first in range(dimension):
+ param_sampled_avg[first] += param_shift[first] * new_sample_ratio
+ old_value_avg = value_avg
+ value_avg += value_shift * new_sample_ratio
+ value_absolute_shift = abs(value_shift)
+ for second in range(dimension):
+ for first in range(dimension):
+ param_sampled_cov[first][second] += (
+ param_shift[first] * param_shift[second] * new_sample_ratio)
+ param_sampled_cov[first][second] *= previous_samples_ratio
+ trace("param_sampled_avg", param_sampled_avg)
+ trace("param_sampled_cov", param_sampled_cov)
+ update_secondary_stats = True
+ if log_importance_best is None or log_importance > log_importance_best:
+ log_importance_best = log_importance
+ log_secondary_sum_importance = old_log_sum_importance
+ value_secondary_avg = old_value_avg
+ value_log_secondary_variance = value_log_variance
+ update_secondary_stats = False
+ # TODO: Update all primary quantities before secondary ones.
+ # (As opposed to current hybrid code.)
+ if value_absolute_shift > 0.0:
+ value_log_variance = log_plus(
+ value_log_variance, 2 * math.log(value_absolute_shift)
+ + log_importance - log_sum_importance)
+ if value_log_variance is not None:
+ value_log_variance -= log_sum_importance - old_log_sum_importance
+ if not update_secondary_stats:
+ continue
+ # TODO: Pylint says the following variable name is bad.
+ # Make sure the refactor uses shorter names.
+ old_log_secondary_sum_importance = log_secondary_sum_importance
+ log_secondary_sum_importance = log_plus(
+ old_log_secondary_sum_importance, log_importance)
+ if old_log_secondary_sum_importance is None:
+ value_secondary_avg = value
+ continue
+ new_sample_secondary_ratio = math.exp(
+ log_importance - log_secondary_sum_importance)
+ # TODO: No would-be variable named old_value_secondary_avg
+ # appears in subsequent computations. Probably means there is a bug.
+ value_secondary_shift = value - value_secondary_avg
+ value_secondary_absolute_shift = abs(value_secondary_shift)
+ value_secondary_avg += (
+ value_secondary_shift * new_sample_secondary_ratio)
+ if value_secondary_absolute_shift > 0.0:
+ value_log_secondary_variance = log_plus(
+ value_log_secondary_variance, (
+ 2 * math.log(value_secondary_absolute_shift)
+ + log_importance - log_secondary_sum_importance))
+ if value_log_secondary_variance is not None:
+ value_log_secondary_variance -= (
+ log_secondary_sum_importance - old_log_secondary_sum_importance)
+ debug_list.append("integrator used " + str(samples) + " samples")
+ debug_list.append(
+ "value_avg " + str(value_avg)
+ + " param_sampled_avg " + repr(param_sampled_avg)
+ + " param_sampled_cov " + repr(param_sampled_cov)
+ + " value_log_variance " + str(value_log_variance)
+ + " value_log_secondary_variance " + str(value_log_secondary_variance))
+ value_stdev = math.exp(
+ (2 * value_log_variance - value_log_secondary_variance) / 2.0)
+ debug_list.append("top_weight_param[0] " + repr(top_weight_param[0]))
+ # Intentionally returning param_focus_avg and param_focus_cov,
+ # instead of possibly hyper-focused bias or sampled.
+ communication_pipe.send(
+ (value_avg, value_stdev, param_focus_avg, param_focus_cov, debug_list,
+ trace_list))
diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py
new file mode 100644
index 0000000000..f1b3f740c2
--- /dev/null
+++ b/resources/libraries/python/PLRsearch/PLRsearch.py
@@ -0,0 +1,596 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module holding PLRsearch class."""
+
+import logging
+import math
+import multiprocessing
+import time
+
+import dill
+# TODO: Inform pylint about scipy (of correct version) being available.
+from scipy.special import erfcx, erfc
+
+# TODO: The preferred way to consume this code is via a pip package.
+# If your project copies code instead, make sure your pylint check does not
+# require these imports to be absolute and descending from your superpackage.
+import Integrator
+from log_plus import log_plus, log_minus
+
+
+class PLRsearch(object):
+ """A class to encapsulate data relevant for search method.
+
+ The context is performance testing of packet processing systems.
+ The system, when being offered a steady stream of packets,
+ can process some of them successfully, other are considered "lost".
+
+ See docstring of the search method for algorithm description.
+
+ Two constants are stored as class fields for speed.
+
+ Method othed than search (and than __init__)
+ are just internal code structure.
+ TODO: Those method names should start with underscore then.
+
+ TODO: Figure out how to replace #print with logging
+ without slowing down too much.
+ """
+
+ xerfcx_limit = math.pow(math.acos(0), -0.5)
+ log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
+
+ def __init__(
+ self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
+ trial_number_offset=0, timeout=60.0):
+ """Store rate measurer and additional parameters.
+
+ Also declare packet_loss_per_second_target field (float),
+ to be initialized later.
+
+ TODO: Copy AbstractMeasurer from MLRsearch.
+
+ :param measurer: The measurer to call when searching.
+ :param trial_duration_per_trial: Each trial has larger duration
+ than the previous trial. This is the increment, in seconds.
+ :param packet_loss_ratio_target: The algorithm tries to estimate
+ the offered load leading to this ratio on average.
+ Trial ratio is number of packets lost divided by packets offered.
+ :param trial_number_offset: The "first" trial number will be 1+this.
+ Use this to ensure first iterations have enough time to compute
+ reasonable estimates for later trials to use.
+ :param timeout: The search ends if it lasts more than this many seconds.
+ :type measurer: MLRsearch.AbstractMeasurer
+ :type trial_duration_per_trial: float
+ :type packet_loss_ratio_target: float
+ :type trial_number_offset: int
+ :type timeout: float
+ """
+ self.measurer = measurer
+ self.trial_duration_per_trial = trial_duration_per_trial
+ self.packet_loss_ratio_target = packet_loss_ratio_target
+ self.trial_number_offset = trial_number_offset
+ self.timeout = timeout
+
+ def search(self, min_rate, max_rate):
+ """Perform the search, return average and stdev for throughput estimate.
+
+ Considering measurer and packet_loss_ratio_target (see __init__),
+ find such an offered load (called critical load) that is expected
+ to hit the target loss ratio in the limit of very long trial duration.
+ As the system is probabilistic (and test duration is finite),
+ the critical ratio is only estimated.
+ Return the average and standard deviation of the estimate.
+
+ In principle, this algorithm performs trial measurements,
+ each with varied offered load (which is constant during the trial).
+ During each measurement, Bayesian inference is performed
+ on all the measurement results so far.
+ When timeout is up, the last estimate is returned,
+ else another trial is performed.
+
+ It is assumed that the system under test, even though not deterministic,
+ still follows the rule of large numbers. In another words,
+ any growing set of measurements at a particular offered load
+ will converge towards unique (for the given load) packet loss ratio.
+ This means there is a deterministic (but unknown) function
+ mapping the offered load to average loss ratio.
+ This function is called loss function.
+ This also assumes the average loss ratio
+ does not depend on trial duration.
+
+ The actual probability distribution of loss counts, achieving
+ the average ratio on trials of various duration
+ can be complicated (and can depend on offered load), but simply assuming
+ Poisson distribution will make the algorithm converge.
+ Binomial distribution would be more precise,
+ but Poisson is more practical, as it effectively gives
+ less information content to high ratio results.
+
+ Even when applying other assumptions on the loss function
+ (increasing function, limit zero ratio when load goes to zero,
+ global upper limit on rate of packets processed), there are still
+ too many different shapes of possible loss functions,
+ which makes full Bayesian reasoning intractable.
+
+ This implementation radically simplifies things by examining
+ only two shapes, each with finitely many (in this case just two)
+ parameters. In other words, two fitting functions
+ (each with two parameters and one argument).
+ When restricting model space to one of the two fitting functions,
+ the Bayesian inference becomes tractable (even though it needs
+ numerical integration from Integrator class).
+
+ The first measurement is done at min_rate to help with convergence
+ if max_rate measurements give loss below target.
+ FIXME: Fix overflow error and really use min_rate.
+ The second measurement is done at max_rate, next few measurements
+ have offered load of previous load minus excess loss ratio.
+ This simple rule is found to be good when offered loads
+ so far are way above the critical rate. After few measurements,
+ inference from fitting functions converges faster that the initial
+ "optimistic" procedure.
+
+ Offered loads close to (limiting) critical rate are the most useful,
+ as linear approximation of the fitting function
+ becomes good enough there (thus reducing the impact
+ of the overall shape of fitting function).
+ After several trials, usually one of the fitting functions
+ has better predictions than the other one, but the algorithm
+ does not track that. Simply, it uses the estimate average,
+ alternating between the functions.
+
+ The returned average and stdev is a combination of the two fitting
+ estimates.
+
+ TODO: If measurement at max_rate is already below target loss rate,
+ the algorithm always measures at max_rate, and likelihood
+ no longer has single maximum, leading to "random" estimates.
+ Find a way to avoid that.
+
+ :param min_rate: Avoid measuring at offered loads below this,
+ in packets per second.
+ :param max_rate: Avoid measuring at offered loads above this,
+ in packets per second.
+ :type min_rate: float
+ :type max_rate: float
+ :returns: Average and stdev of critical load estimate.
+ :rtype: 2-tuple of floats
+ """
+ stop_time = time.time() + self.timeout
+ min_rate = float(min_rate)
+ max_rate = float(max_rate)
+ trial_result_list = list()
+ trial_number = self.trial_number_offset
+ integrator_data = (None, None, None, None)
+ message = "Trial %(number)r computed avg %(avg)r stdev %(stdev)r"
+ message += " stretch %(a1)r erf %(a2)r difference %(diff)r"
+ transmit_rate = (min_rate + max_rate) / 2.0
+ while 1:
+ trial_number += 1
+ trial_duration = trial_number * self.trial_duration_per_trial
+ results = self.measure_and_compute(
+ trial_duration, transmit_rate, trial_result_list, max_rate,
+ integrator_data)
+ measurement, average, stdev, avg1, avg2, integrator_data = results
+ logging.info(message, {
+ "number": trial_number, "avg": average, "stdev": stdev,
+ "a1": avg1, "a2": avg2, "diff": avg2 - avg1})
+ if stop_time <= time.time():
+ return average, stdev
+ trial_result_list.append(measurement)
+ if (trial_number - self.trial_number_offset) <= 1:
+ next_load = max_rate
+ elif (trial_number - self.trial_number_offset) <= 3:
+ next_load = (measurement.receive_rate
+ / (1.0 - self.packet_loss_ratio_target))
+ else:
+ next_load = avg1 if trial_number % 2 else avg2
+ transmit_rate = min(max_rate, max(min_rate, next_load))
+
+ @staticmethod
+ def lfit_stretch(load, mrr, spread):
+ """Stretch-based fitting function.
+
+ Return the logarithm of average packet loss per second
+ when the load (argument) is offered to a system with given
+ mrr and spread (parameters).
+ Stretch function is 1/(1+Exp[-x]). The average itself is definite
+ integral from zero to load, of shifted and x-scaled stretch function.
+ As the integrator is sensitive to discontinuities,
+ and it calls this function at large areas of parameter space,
+ the implementation has to avoid rounding errors, overflows,
+ and correctly approximate underflows.
+
+ TODO: Explain how the high-level description
+ has been converted into an implementation full of ifs.
+
+ :param load: Offered load (positive), in packets per second.
+ :param mrr: Parameter of this fitting function, equal to limiting
+ (positive) average number of packets received (as opposed to lost)
+ when offered load is many spreads more than mrr.
+ :param spread: The x-scaling parameter (positive). No nice semantics,
+ roughly corresponds to size of "tail" for loads below mrr.
+ :type load: float
+ :type mrr: float
+ :type spread: float
+ :returns: Logarithm of average number of packets lost per second.
+ :rtype: float
+ """
+ # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
+ chi = (load - mrr) / spread
+ chi0 = -mrr / spread
+# print "load", load, "mrr", mrr, "spread", spread, "chi", chi
+ if chi > 0:
+ log_lps = math.log(
+ load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
+# print "big loss direct log_lps", log_lps
+ else:
+ approx = (math.exp(chi) - math.exp(2 * chi) / 2) * spread
+ if approx == 0.0:
+ log_lps = chi
+# print "small loss crude log_lps", log_lps
+ return log_lps
+ third = math.exp(3 * chi) / 3 * spread
+ if approx + third != approx + 2 * third:
+ log_lps = math.log(
+ (log_plus(0, chi) - log_plus(0, chi0)) * spread)
+# print "small loss direct log_lps", log_lps
+ else:
+ log_lps = math.log(
+ approx - (math.exp(chi0) - math.exp(2 * chi0)) * spread)
+# print "small loss approx log_lps", log_lps
+ return log_lps
+
+ @staticmethod
+ def lfit_erf(load, mrr, spread):
+ """Erf-based fitting function.
+
+ Return the logarithm of average packet loss per second
+ when the load (argument) is offered to a system with given
+ mrr and spread (parameters).
+ Erf function is Primitive function to normal distribution density.
+ The average itself is definite integral from zero to load,
+ of shifted and x-scaled erf function.
+ As the integrator is sensitive to discontinuities,
+ and it calls this function at large areas of parameter space,
+ the implementation has to avoid rounding errors, overflows,
+ and correctly approximate underflows.
+
+ TODO: Explain how the high-level description
+ has been converted into an implementation full of ifs.
+
+ :param load: Offered load (positive), in packets per second.
+ :param mrr: Parameter of this fitting function, equal to limiting
+ (positive) average number of packets received (as opposed to lost)
+ when offered load is many spreads more than mrr.
+ :param spread: The x-scaling parameter (positive). No nice semantics,
+ roughly corresponds to size of "tail" for loads below mrr.
+ :type load: float
+ :type mrr: float
+ :type spread: float
+ :returns: Logarithm of average number of packets lost per second.
+ :rtype: float
+ """
+ # Beware, this chi has the sign opposite to the stretch function chi.
+ # TODO: The stretch sign is just to have less minuses. Worth changing?
+ chi = (mrr - load) / spread
+ chi0 = mrr / spread
+# print "load", load, "mrr", mrr, "spread", spread,
+# print "chi", chi, "chi0", chi0
+ if chi >= -1.0:
+# print "positive, b ~> m"
+ if chi > math.exp(10):
+ first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
+# print "approximated"
+ else:
+ first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
+# print "exact"
+ first -= chi * chi
+ second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
+ second -= chi0 * chi0
+ intermediate = log_minus(first, second)
+# print "first", first, "second", second,
+# print "intermediate", intermediate
+ else:
+# print "negative, b ~< m"
+ exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
+ exp_first *= math.exp(-chi * chi)
+ exp_first -= 2 * chi
+ # TODO: Why has the following line chi there (as opposed to chi0)?
+ # In general the functions would be more readable if they explicitly
+ # return math.log(func(chi) - func(chi0))
+ # for some function "func", at least for some branches.
+ second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
+ second -= chi0 * chi0
+ intermediate = math.log(exp_first - math.exp(second))
+# print "exp_first", exp_first, "second", second,
+# print "intermediate", intermediate
+ result = intermediate + math.log(spread) - math.log(erfc(-chi0))
+# print "lfit erf result", result
+ return result
+
+ @staticmethod
+ def find_critical_rate(lfit_func, log_lps_target, mrr, spread):
+ """Given lps target and parameters, return the achieving offered load.
+
+ This is basically an inverse function to lfit_func
+ when parameters are fixed.
+ Instead of implementing effective implementation
+ of the inverse function, this implementation uses
+ brute force binary search.
+ The search starts at an arbitrary offered load,
+ uses exponential search to find the other bound
+ and then bisecting until the load is found (or interval
+ becoming degenerate).
+
+ TODO: Use at least some method with faster convergence.
+
+ :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
+ :param log_lps_target: Fitting function should return this
+ at the returned load and parameters.
+ :param mrr: The mrr parameter for the fitting function.
+ :param spread: The spread parameter for the fittinmg function.
+ :type lfit_func: Function from 3 floats to float.
+ :type log_lps_target: float
+ :type mrr: float
+ :type spread: float
+ :returns: Load [pps] which achieves the target with given parameters.
+ :rtype: float
+ """
+ # TODO: Should we make the initial rate configurable?
+ rate = 10000000.0
+ log_loss = lfit_func(rate, mrr, spread)
+ if log_loss == log_lps_target:
+ return rate
+ # Exponential search part.
+ if log_loss > log_lps_target:
+ rate_hi = rate
+ while 1:
+ rate_lo = rate_hi / 2.0
+ log_loss = lfit_func(rate_lo, mrr, spread)
+ if log_loss > log_lps_target:
+ rate_hi = rate_lo
+ continue
+ if log_loss == log_lps_target:
+ return rate_lo
+ break
+ else:
+ rate_lo = rate
+ while 1:
+ rate_hi = rate_lo * 2.0
+ log_loss = lfit_func(rate_hi, mrr, spread)
+ if log_loss < log_lps_target:
+ rate_lo = rate_hi
+ continue
+ if log_loss == log_lps_target:
+ return rate_hi
+ break
+ # Binary search part.
+ while rate_hi != rate_lo:
+ rate = (rate_hi + rate_lo) / 2.0
+ log_loss = lfit_func(rate, mrr, spread)
+ if rate == rate_hi or rate == rate_lo or log_loss == log_lps_target:
+# print "found", rate
+ return rate
+ if log_loss > log_lps_target:
+ rate_hi = rate
+ else:
+ rate_lo = rate
+
+ @staticmethod
+ def log_weight(lfit_func, trial_result_list, mrr, spread):
+ """Return log of weight of trial results by the function and parameters.
+
+ Integrator assumes uniform distribution, but over different parameters.
+ Weight and likelihood are used interchangeably here anyway.
+
+ Each trial has an offered load, a duration and a loss count.
+ Fitting function is used to compute the average loss per second.
+ Poisson distribution (with average loss per trial) is used
+ to get likelihood of one trial result, the overal likelihood is product.
+ As likelihoods can be extremely small, logarithms are tracked instead.
+
+ TODO: Copy ReceiveRateMeasurement from MLRsearch.
+
+ :param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
+ :param result_list: List of trial measurement results.
+ :param mrr: The mrr parameter for the fitting function.
+ :param spread: The spread parameter for the fittinmg function.
+ :type lfit_func: Function from 3 floats to float.
+ :type result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type mrr: float
+ :type spread: float
+ :returns: Logarithm of result weight for given function and parameters.
+ :rtype: float
+ """
+ log_likelihood = 0.0
+ for result in trial_result_list:
+# print "DEBUG for tr", result.target_tr,
+# print "lc", result.loss_count, "d", result.duration
+ log_avg_loss_per_second = lfit_func(result.target_tr, mrr, spread)
+ log_avg_loss_per_trial = (
+ log_avg_loss_per_second + math.log(result.duration))
+ # Poisson probability computation works nice for logarithms.
+ log_trial_likelihood = (
+ result.loss_count * log_avg_loss_per_trial
+ - math.exp(log_avg_loss_per_trial))
+ log_trial_likelihood -= math.lgamma(1 + result.loss_count)
+ log_likelihood += log_trial_likelihood
+# print "log_avg_loss_per_second", log_avg_loss_per_second
+# print "log_avg_loss_per_trial", log_avg_loss_per_trial
+# print "avg_loss_per_trial", math.exp(log_avg_loss_per_trial)
+# print "log_trial_likelihood", log_trial_likelihood
+# print "log_likelihood", log_likelihood
+# print "returning log_likelihood", log_likelihood
+ return log_likelihood
+
+ # FIXME: Refactor (somehow) so pylint stops complaining about
+ # too many local variables.
+ # Some work already done in subsequent changes.
+ def measure_and_compute(
+ self, trial_duration, transmit_rate,
+ trial_result_list, max_rate, integrator_data):
+ """Perform both measurement and computation at once.
+
+ High level steps: Prepare and launch computation worker processes,
+ perform the measurement, stop computation and combine results.
+
+ Integrator needs a specific function to process (-1, 1) parameters.
+ As our fitting functions use dimensional parameters,
+ so a transformation is performed, resulting in a specific prior
+ distribution over the dimensional parameters.
+ Maximal rate (line rate) is needed for that transformation.
+
+ Two fitting functions are used, computation is started
+ on temporary worker process per fitting function. After the measurement,
+ Average and stdev of the critical rate (not log) of each worker
+ are combined and returned. Raw averages are also returned,
+ offered load for next iteration is chosen from them.
+ The idea is that one fitting function might be fitting much better,
+ measurements at its avg are best for relevant results (for both),
+ but we do not know which fitting function it is.
+
+ TODO: Define class for integrator data, so that fields are documented.
+ TODO: Define class for result object, so that fields are documented.
+ TODO: More processes to the slower fitting function?
+ TODO: Re-use processes, instead creating on each computation.
+ TODO: As only one result is needed fresh, figure out a way
+ how to keep the other worker running. This will alow shorter
+ duration per trial. Special handling at first and last measurement
+ will be needed (to properly initialize and to properly combine results).
+
+ :param trial_duration: Length of the measurement in seconds.
+ :param transmit_rate: Offered load in packets per second.
+ :param trial_result_list: Results of previous measurements.
+ :param max_rate: Theoretic maximum of possible ofered load.
+ :param integrator_data: Hints to speed up the numeric computation.
+ :type trial_duration: float
+ :type transmit_rate: float
+ :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type max_rate: float
+ :type integrator_data: 4-tuple of gaussian positions and covariances
+ :returns: Measurement and computation results.
+ :rtype: 6-tuple: ReceiveRateMeasurement, floats, integrator data.
+ """
+ # Preparation phase.
+ dimension = 2
+ stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov = (
+ integrator_data)
+ packet_loss_per_second_target = (
+ transmit_rate * self.packet_loss_ratio_target)
+ def start_computing(fitting_function, bias_avg, bias_cov):
+ """Just a block of code to be used for each fitting function.
+
+ Define function for integrator, create process and pipe ends,
+ start computation, return the boss pipe end.
+
+ :param fitting_function: lfit_erf or lfit_stretch.
+ :param bias_avg: Tuple of floats to start searching around.
+ :param bias_cov: Covariance matrix defining initial focus shape.
+ :type fitting_function: Function from 3 floats to float.
+ :type bias_avg: 2-tuple of floats
+ :type bias_cov: 2-tuple of 2-tuples of floats
+ :returns: Boss end of communication pipe.
+ :rtype: multiprocessing.Connection
+ """
+ def value_logweight_func(x_mrr, x_spread):
+ """Return log of critical rate and log of likelihood.
+
+ This is a closure. The ancestor function got
+ trial_result_list as a parameter, and we are accessing it.
+ As integrator has strict conditions on function signature,
+ trial_result_list cannot be an explicit argument
+ of the current function.
+ This is also why we have to define this closure
+ at each invocation of the ancestor function anew.
+
+ The dimensional spread parameter is the (dimensional) mrr
+ raised to the power of x_spread scaled to interval (0, 1).
+ The dimensional mrr parameter distribution has shape of
+ 1/(1+x^2), but x==1 corresponds to max_rate
+ and 1.0 pps is added to avoid numerical problems in fitting
+ functions.
+
+ TODO: x^-2 (for x>1.0) might be simpler/nicer prior.
+
+ :param x_mrr: The first dimensionless param
+ from (-1, 1) interval.
+ :param x_spread: The second dimensionless param
+ from (-1, 1) interval.
+ :returns: Log of critical rate [pps] and log of likelihood.
+ :rtype: 2-tuple of float
+ """
+ mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
+ spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
+# print "mrr", mrr, "spread", spread
+ logweight = self.log_weight(
+ fitting_function, trial_result_list, mrr, spread)
+ value = math.log(self.find_critical_rate(
+ fitting_function,
+ math.log(packet_loss_per_second_target), mrr, spread))
+ return value, logweight
+ dilled_function = dill.dumps(value_logweight_func)
+ boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+ boss_pipe_end.send(
+ (dimension, dilled_function, bias_avg, bias_cov))
+ worker = multiprocessing.Process(
+ target=Integrator.try_estimate_nd, args=(worker_pipe_end,))
+ worker.daemon = True
+ worker.start()
+ return boss_pipe_end
+ erf_pipe = start_computing(
+ self.lfit_erf, erf_bias_avg, erf_bias_cov)
+ stretch_pipe = start_computing(
+ self.lfit_stretch, stretch_bias_avg, stretch_bias_cov)
+ # Measurement phase.
+ measurement = self.measurer.measure(trial_duration, transmit_rate)
+ # Processing phase.
+ erf_pipe.send(None)
+ stretch_pipe.send(None)
+ if not stretch_pipe.poll(1.0):
+ raise RuntimeError("Stretch worker did not finish!")
+ result_or_traceback = stretch_pipe.recv()
+ try:
+ (stretch_avg, stretch_stdev, stretch_bias_avg,
+ stretch_bias_cov, debug_list, _) = result_or_traceback
+ except ValueError:
+ raise RuntimeError(
+ "Stretch worker failed with the following traceback:\n{tr}"
+ .format(tr=result_or_traceback))
+ logging.info("Logs from stretch worker:")
+ for message in debug_list:
+ logging.debug(message)
+ if not erf_pipe.poll(1.0):
+ raise RuntimeError("Erf worker did not finish!")
+ result_or_traceback = erf_pipe.recv()
+ try:
+ (erf_avg, erf_stdev, erf_bias_avg,
+ erf_bias_cov, debug_list, _) = result_or_traceback
+ except ValueError:
+ raise RuntimeError(
+ "Erf worker failed with the following traceback:\n{tr}"
+ .format(tr=result_or_traceback))
+ logging.info("Logs from erf worker:")
+ for message in debug_list:
+ logging.debug(message)
+ avg = math.exp((stretch_avg + erf_avg) / 2.0)
+ var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
+ var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+ stdev = avg * math.sqrt(var)
+ integrator_data = (
+ stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov)
+ return (
+ measurement, avg, stdev, math.exp(stretch_avg),
+ math.exp(erf_avg), integrator_data)
diff --git a/resources/libraries/python/PLRsearch/__init__.py b/resources/libraries/python/PLRsearch/__init__.py
new file mode 100644
index 0000000000..bce703803c
--- /dev/null
+++ b/resources/libraries/python/PLRsearch/__init__.py
@@ -0,0 +1,16 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+__init__ file for Python package "PLRsearch".
+"""
diff --git a/resources/libraries/python/PLRsearch/log_plus.py b/resources/libraries/python/PLRsearch/log_plus.py
new file mode 100644
index 0000000000..3f21cc78d7
--- /dev/null
+++ b/resources/libraries/python/PLRsearch/log_plus.py
@@ -0,0 +1,89 @@
+# Copyright (c) 2018 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module holding functions for avoiding rounding underflows.
+
+Some applications wish to manipulate non-negative numbers
+which are many orders of magnitude apart.
+In those circumstances, it is useful to store
+the logarithm of the intended number.
+
+As math.log(0.0) raises an exception (instead returning -inf or nan),
+and 0.0 might be a result of computation caused only by rounding error,
+functions of this module use None as -inf.
+
+TODO: Figure out a more performant way of handling -inf.
+
+The functions handle the common task of adding or substracting
+two numbers where both operands and the result is given in logarithm form.
+There are conditionals to make sure overflow does not happen (if possible)
+during the computation."""
+
+import math
+
+
+def log_plus(first, second):
+ """Return logarithm of the sum of two exponentials.
+
+ Basically math.log(math.exp(first) + math.exp(second))
+ which avoids overflow and uses None as math.log(0.0).
+
+ TODO: replace with scipy.special.logsumexp ? Test it.
+
+ :param first: Logarithm of the first number to add (or None if zero).
+ :param second: Logarithm of the second number to add (or None if zero).
+ :type first: float
+ :type second: float
+ :returns: Logarithm of the sum (or None if zero).
+ :rtype: float
+ """
+
+ if first is None:
+ return second
+ if second is None:
+ return first
+ if second > first:
+ return second + math.log(1.0 + math.exp(first - second))
+ else:
+ return first + math.log(1.0 + math.exp(second - first))
+
+
+def log_minus(first, second):
+ """Return logarithm of the difference of two exponentials.
+
+ Basically math.log(math.exp(first) - math.exp(second))
+ which avoids overflow and uses None as math.log(0.0).
+
+ TODO: Support zero difference?
+ TODO: replace with scipy.special.logsumexp ? Test it.
+
+ :param first: Logarithm of the number to subtract from (or None if zero).
+ :param second: Logarithm of the number to subtract (or None if zero).
+ :type first: float
+ :type second: float
+ :returns: Logarithm of the difference.
+ :rtype: float
+ :raises RuntimeError: If the difference would be non-positive.
+ """
+
+ if first is None:
+ raise RuntimeError("log_minus: does not suport None first")
+ if second is None:
+ return first
+ if second >= first:
+ raise RuntimeError("log_minus: first has to be bigger than second")
+ factor = -math.expm1(second - first)
+ if factor <= 0.0:
+ raise RuntimeError("log_minus: non-positive number to log")
+ else:
+ return first + math.log(factor)