aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python')
-rw-r--r--resources/libraries/python/PLRsearch/Integrator.py55
-rw-r--r--resources/libraries/python/PLRsearch/PLRsearch.py97
-rw-r--r--resources/libraries/python/PLRsearch/__init__.py2
-rw-r--r--resources/libraries/python/PLRsearch/log_plus.py15
-rw-r--r--resources/libraries/python/PLRsearch/stat_trackers.py47
5 files changed, 166 insertions, 50 deletions
diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py
index 035afd848c..86181eaa56 100644
--- a/resources/libraries/python/PLRsearch/Integrator.py
+++ b/resources/libraries/python/PLRsearch/Integrator.py
@@ -28,13 +28,31 @@ from numpy import random
# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
-import stat_trackers # pylint: disable=relative-import
+from . import stat_trackers
def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
- """Call estimate_nd but catch any exception and send traceback."""
+ """Call estimate_nd but catch any exception and send traceback.
+
+ This function does not return anything, computation result
+ is sent via the communication pipe instead.
+
+ TODO: Move scale_coeff to a field of data class
+ with constructor/factory hiding the default value,
+ and receive its instance via pipe, instead of argument.
+
+ :param communication_pipe: Endpoint for communication with parent process.
+ :param scale_coeff: Float number to tweak convergence speed with.
+ :param trace_enabled: Whether to emit trace level debugs.
+ Keeping trace disabled improves speed and saves memory.
+ Enable trace only when debugging the computation itself.
+ :type communication_pipe: multiprocessing.Connection
+ :type scale_coeff: float
+ :type trace_enabled: bool
+ :raises BaseException: Anything raised by interpreter or estimate_nd.
+ """
try:
- return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
+ estimate_nd(communication_pipe, scale_coeff, trace_enabled)
except BaseException:
# Any subclass could have caused estimate_nd to stop before sending,
# so we have to catch them all.
@@ -46,7 +64,22 @@ def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
def generate_sample(averages, covariance_matrix, dimension, scale_coeff):
- """Generate next sample for estimate_nd"""
+ """Generate next sample for estimate_nd.
+
+ Arguments control the multivariate normal "focus".
+ Keep generating until the sample point fits into unit area.
+
+ :param averages: Coordinates of the focus center.
+ :param covariance_matrix: Matrix controlling the spread around the average.
+ :param dimension: If N is dimension, average is N vector and matrix is NxN.
+ :param scale_coeff: Coefficient to conformally multiply the spread.
+ :type averages: Indexable of N floats
+ :type covariance_matrix: Indexable of N indexables of N floats
+ :type dimension: int
+ :type scale_coeff: float
+ :returns: The generated sample point.
+ :rtype: N-tuple of float
+ """
covariance_matrix = copy.deepcopy(covariance_matrix)
for first in range(dimension):
for second in range(dimension):
@@ -142,13 +175,12 @@ def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
In they are not enabled, trace_list will be empty.
It is recommended to edit some lines manually to debug_list if needed.
- :param communication_pipe: Pipe to comunicate with boss process.
+ :param communication_pipe: Endpoint for communication with parent process.
:param scale_coeff: Float number to tweak convergence speed with.
:param trace_enabled: Whether trace list should be populated at all.
- Default: False
- :type communication_pipe: multiprocessing.Connection (or compatible)
+ :type communication_pipe: multiprocessing.Connection
:type scale_coeff: float
- :type trace_enabled: boolean
+ :type trace_enabled: bool
:raises OverflowError: If one sample dominates the rest too much.
Or if value_logweight_function does not handle
some part of parameter space carefully enough.
@@ -201,10 +233,9 @@ def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False):
while not communication_pipe.poll():
if max_samples and samples >= max_samples:
break
- sample_point = generate_sample(param_focus_tracker.averages,
- param_focus_tracker.covariance_matrix,
- dimension,
- scale_coeff)
+ sample_point = generate_sample(
+ param_focus_tracker.averages, param_focus_tracker.covariance_matrix,
+ dimension, scale_coeff)
trace("sample_point", sample_point)
samples += 1
trace("samples", samples)
diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py
index 4205818d91..b7c9344391 100644
--- a/resources/libraries/python/PLRsearch/PLRsearch.py
+++ b/resources/libraries/python/PLRsearch/PLRsearch.py
@@ -43,6 +43,7 @@ class PLRsearch(object):
Method othed than search (and than __init__)
are just internal code structure.
+
TODO: Those method names should start with underscore then.
"""
@@ -162,7 +163,7 @@ class PLRsearch(object):
:type min_rate: float
:type max_rate: float
:returns: Average and stdev of critical load estimate.
- :rtype: 2-tuple of floats
+ :rtype: 2-tuple of float
"""
stop_time = time.time() + self.timeout
min_rate = float(min_rate)
@@ -174,7 +175,7 @@ class PLRsearch(object):
focus_trackers = (None, None)
transmit_rate = (min_rate + max_rate) / 2.0
lossy_loads = [max_rate]
- zeros = [0, 0] # Cosecutive zero loss, separately for stretch and erf.
+ zeros = 0 # How many cosecutive zero loss results are happening.
while 1:
trial_number += 1
logging.info("Trial %(number)r", {"number": trial_number})
@@ -182,15 +183,14 @@ class PLRsearch(object):
self.trial_duration_per_trial * trial_number, transmit_rate,
trial_result_list, min_rate, max_rate, focus_trackers)
measurement, average, stdev, avg1, avg2, focus_trackers = results
- index = trial_number % 2
- zeros[index] += 1
+ zeros += 1
# TODO: Ratio of fill rate to drain rate seems to have
# exponential impact. Make it configurable, or is 4:3 good enough?
if measurement.loss_fraction >= self.packet_loss_ratio_target:
- for _ in range(4 * zeros[index]):
+ for _ in range(4 * zeros):
lossy_loads.append(measurement.target_tr)
if measurement.loss_count > 0:
- zeros[index] = 0
+ zeros = 0
lossy_loads.sort()
if stop_time <= time.time():
return average, stdev
@@ -201,20 +201,19 @@ class PLRsearch(object):
next_load = (measurement.receive_rate / (
1.0 - self.packet_loss_ratio_target))
else:
- index = (trial_number + 1) % 2
- next_load = (avg1, avg2)[index]
- if zeros[index] > 0:
+ next_load = (avg1 + avg2) / 2.0
+ if zeros > 0:
if lossy_loads[0] > next_load:
- diminisher = math.pow(2.0, 1 - zeros[index])
+ diminisher = math.pow(2.0, 1 - zeros)
next_load = lossy_loads[0] + diminisher * next_load
next_load /= (1.0 + diminisher)
# On zero measurement, we need to drain obsoleted low losses
# even if we did not use them to increase next_load,
- # in order to get to usable loses with higher load.
+ # in order to get to usable loses at higher loads.
if len(lossy_loads) > 3:
lossy_loads = lossy_loads[3:]
logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
- {"z": zeros, "o": (avg1, avg2)[index],
+ {"z": zeros, "o": (avg1 + avg2) / 2.0,
"n": next_load, "s": lossy_loads})
transmit_rate = min(max_rate, max(min_rate, next_load))
@@ -510,12 +509,12 @@ class PLRsearch(object):
:type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
:type max_samples: None or int
:returns: Measurement and computation results.
- :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers.
+ :rtype: _ComputeResult
"""
logging.debug(
"measure_and_compute started with self %(self)r, trial_duration "
- + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
- + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
+ "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
+ "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
{"self": self, "dur": trial_duration, "tr": transmit_rate,
"trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
"ms": max_samples})
@@ -620,7 +619,7 @@ class PLRsearch(object):
:type pipe: multiprocessing.Connection
:returns: Computed value tracker, actual focus tracker,
and number of samples used for this iteration.
- :rtype: 3-tuple of tracker, tracker and int
+ :rtype: _PartialResult
"""
pipe.send(None)
if not pipe.poll(10.0):
@@ -660,23 +659,65 @@ class PLRsearch(object):
@staticmethod
def _get_result(measurement, stretch_result, erf_result):
- """Collate results from measure_and_compute"""
+ """Process and collate results from measure_and_compute.
+
+ Turn logarithm based values to exponential ones,
+ combine averages and stdevs of two fitting functions into a whole.
+
+ :param measurement: The trial measurement obtained during computation.
+ :param stretch_result: Computation output for stretch fitting function.
+ :param erf_result: Computation output for erf fitting function.
+ :type measurement: ReceiveRateMeasurement
+ :type stretch_result: _PartialResult
+ :type erf_result: _PartialResult
+ :returns: Combined results.
+ :rtype: _ComputeResult
+ """
stretch_avg = stretch_result.value_tracker.average
erf_avg = erf_result.value_tracker.average
- # TODO: Take into account secondary stats.
- stretch_stdev = math.exp(stretch_result.value_tracker.log_variance / 2)
- erf_stdev = math.exp(erf_result.value_tracker.log_variance / 2)
- avg = math.exp((stretch_avg + erf_avg) / 2.0)
- var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0
- var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
- stdev = avg * math.sqrt(var)
+ stretch_var = stretch_result.value_tracker.get_pessimistic_variance()
+ erf_var = erf_result.value_tracker.get_pessimistic_variance()
+ avg_log = (stretch_avg + erf_avg) / 2.0
+ var_log = (stretch_var + erf_var) / 2.0
+ var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0
+ stdev_log = math.sqrt(var_log)
+ low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log)
+ avg = (low + upp) / 2
+ stdev = avg - low
trackers = (stretch_result.focus_tracker, erf_result.focus_tracker)
sea = math.exp(stretch_avg)
eea = math.exp(erf_avg)
return _ComputeResult(measurement, avg, stdev, sea, eea, trackers)
-_PartialResult = namedtuple('_PartialResult',
- 'value_tracker focus_tracker samples')
-_ComputeResult = namedtuple('_ComputeResult', 'measurement avg stdev ' +
- 'stretch_exp_avg erf_exp_avg trackers')
+# Named tuples, for multiple local variables to be passed as return value.
+_PartialResult = namedtuple(
+ "_PartialResult", "value_tracker focus_tracker samples")
+"""Two stat trackers and sample counter.
+
+:param value_tracker: Tracker for the value (critical load) being integrated.
+:param focus_tracker: Tracker for focusing integration inputs (sample points).
+:param samples: How many samples were used for the computation.
+:type value_tracker: stat_trackers.ScalarDualStatTracker
+:type focus_tracker: stat_trackers.VectorStatTracker
+:type samples: int
+"""
+
+_ComputeResult = namedtuple(
+ "_ComputeResult",
+ "measurement avg stdev stretch_exp_avg erf_exp_avg trackers")
+"""Measurement, 4 computation result values, pair of trackers.
+
+:param measurement: The trial measurement result obtained during computation.
+:param avg: Overall average of critical rate estimate.
+:param stdev: Overall standard deviation of critical rate estimate.
+:param stretch_exp_avg: Stretch fitting function estimate average exponentiated.
+:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
+:param trackers: Pair of focus trackers to start next iteration with.
+:type measurement: ReceiveRateMeasurement
+:type avg: float
+:type stdev: float
+:type stretch_exp_avg: float
+:type erf_exp_avg: float
+:type trackers: 2-tuple of stat_trackers.VectorStatTracker
+"""
diff --git a/resources/libraries/python/PLRsearch/__init__.py b/resources/libraries/python/PLRsearch/__init__.py
index bce703803c..6d1559d5f0 100644
--- a/resources/libraries/python/PLRsearch/__init__.py
+++ b/resources/libraries/python/PLRsearch/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
diff --git a/resources/libraries/python/PLRsearch/log_plus.py b/resources/libraries/python/PLRsearch/log_plus.py
index 3f21cc78d7..1c802a5599 100644
--- a/resources/libraries/python/PLRsearch/log_plus.py
+++ b/resources/libraries/python/PLRsearch/log_plus.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2018 Cisco and/or its affiliates.
+# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -87,3 +87,16 @@ def log_minus(first, second):
raise RuntimeError("log_minus: non-positive number to log")
else:
return first + math.log(factor)
+
+
+def safe_exp(log_value):
+ """Return exponential of the argument, or zero if the argument is None.
+
+ :param log_value: The value to exponentiate.
+ :type log_value: NoneType or float
+ :returns: The exponentiated value.
+ :rtype: float
+ """
+ if log_value is None:
+ return 0.0
+ return math.exp(log_value)
diff --git a/resources/libraries/python/PLRsearch/stat_trackers.py b/resources/libraries/python/PLRsearch/stat_trackers.py
index 168b09a14e..58ad98fd2e 100644
--- a/resources/libraries/python/PLRsearch/stat_trackers.py
+++ b/resources/libraries/python/PLRsearch/stat_trackers.py
@@ -29,7 +29,7 @@ import numpy
# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
-from log_plus import log_plus # pylint: disable=relative-import
+from .log_plus import log_plus, safe_exp
class ScalarStatTracker(object):
@@ -59,7 +59,11 @@ class ScalarStatTracker(object):
self.log_variance = log_variance
def __repr__(self):
- """Return string, which interpreted constructs state of self."""
+ """Return string, which interpreted constructs state of self.
+
+ :returns: Expression contructing an equivalent instance.
+ :rtype: str
+ """
return ("ScalarStatTracker(log_sum_weight={lsw!r},average={a!r},"
"log_variance={lv!r})".format(
lsw=self.log_sum_weight, a=self.average,
@@ -168,7 +172,11 @@ class ScalarDualStatTracker(ScalarStatTracker):
self.max_log_weight = max_log_weight
def __repr__(self):
- """Return string, which interpreted constructs state of self."""
+ """Return string, which interpreted constructs state of self.
+
+ :returns: Expression contructing an equivalent instance.
+ :rtype: str
+ """
sec = self.secondary
return (
"ScalarDualStatTracker(log_sum_weight={lsw!r},average={a!r},"
@@ -202,6 +210,27 @@ class ScalarDualStatTracker(ScalarStatTracker):
return self
+ def get_pessimistic_variance(self):
+ """Return estimate of variance reflecting weight effects.
+
+ Typical scenario is the primary tracker dominated by a single sample.
+ In worse case, secondary tracker is also dominated by
+ a single (but different) sample.
+
+ Current implementation simply returns variance of average
+ of the two trackers, as if they were independent.
+
+ :returns: Pessimistic estimate of variance (not stdev, no log).
+ :rtype: float
+ """
+ var_primary = safe_exp(self.log_variance)
+ var_secondary = safe_exp(self.secondary.log_variance)
+ var_combined = (var_primary + var_secondary) / 2
+ avg_half_diff = (self.average - self.secondary.average) / 2
+ var_combined += avg_half_diff * avg_half_diff
+ return var_combined
+
+
class VectorStatTracker(object):
"""Class for tracking multi-dimensional samples.
@@ -245,7 +274,8 @@ class VectorStatTracker(object):
"""Return string, which interpreted constructs state of self.
:returns: Expression contructing an equivalent instance.
- :rtype: str"""
+ :rtype: str
+ """
return (
"VectorStatTracker(dimension={d!r},log_sum_weight={lsw!r},"
"averages={a!r},covariance_matrix={cm!r})".format(
@@ -262,8 +292,8 @@ class VectorStatTracker(object):
:rtype: VectorStatTracker
"""
return VectorStatTracker(
- self.dimension, self.log_sum_weight, self.averages,
- self.covariance_matrix)
+ self.dimension, self.log_sum_weight, self.averages[:],
+ copy.deepcopy(self.covariance_matrix))
def reset(self):
"""Return state set to empty data of proper dimensionality.
@@ -288,6 +318,7 @@ class VectorStatTracker(object):
self.reset()
for index in range(self.dimension):
self.covariance_matrix[index][index] = 1.0
+ return self
def add_get_shift(self, vector_value, log_weight=0.0):
"""Return shift and update state to addition of another sample.
@@ -300,8 +331,8 @@ class VectorStatTracker(object):
Default: 0.0 (as log of 1.0).
:type vector_value: iterable of float
:type log_weight: float
- :returns: Updated self.
- :rtype: VectorStatTracker
+ :returns: Shift vector
+ :rtype: list of float
"""
dimension = self.dimension
old_log_sum_weight = self.log_sum_weight