diff options
Diffstat (limited to 'resources/libraries/python')
-rw-r--r-- | resources/libraries/python/PLRsearch/Integrator.py | 55 | ||||
-rw-r--r-- | resources/libraries/python/PLRsearch/PLRsearch.py | 97 | ||||
-rw-r--r-- | resources/libraries/python/PLRsearch/__init__.py | 2 | ||||
-rw-r--r-- | resources/libraries/python/PLRsearch/log_plus.py | 15 | ||||
-rw-r--r-- | resources/libraries/python/PLRsearch/stat_trackers.py | 47 |
5 files changed, 166 insertions, 50 deletions
diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py index 035afd848c..86181eaa56 100644 --- a/resources/libraries/python/PLRsearch/Integrator.py +++ b/resources/libraries/python/PLRsearch/Integrator.py @@ -28,13 +28,31 @@ from numpy import random # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH, # then switch to absolute imports within PLRsearch package. # Current usage of relative imports is just a short term workaround. -import stat_trackers # pylint: disable=relative-import +from . import stat_trackers def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False): - """Call estimate_nd but catch any exception and send traceback.""" + """Call estimate_nd but catch any exception and send traceback. + + This function does not return anything, computation result + is sent via the communication pipe instead. + + TODO: Move scale_coeff to a field of data class + with constructor/factory hiding the default value, + and receive its instance via pipe, instead of argument. + + :param communication_pipe: Endpoint for communication with parent process. + :param scale_coeff: Float number to tweak convergence speed with. + :param trace_enabled: Whether to emit trace level debugs. + Keeping trace disabled improves speed and saves memory. + Enable trace only when debugging the computation itself. + :type communication_pipe: multiprocessing.Connection + :type scale_coeff: float + :type trace_enabled: bool + :raises BaseException: Anything raised by interpreter or estimate_nd. + """ try: - return estimate_nd(communication_pipe, scale_coeff, trace_enabled) + estimate_nd(communication_pipe, scale_coeff, trace_enabled) except BaseException: # Any subclass could have caused estimate_nd to stop before sending, # so we have to catch them all. @@ -46,7 +64,22 @@ def try_estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False): def generate_sample(averages, covariance_matrix, dimension, scale_coeff): - """Generate next sample for estimate_nd""" + """Generate next sample for estimate_nd. + + Arguments control the multivariate normal "focus". + Keep generating until the sample point fits into unit area. + + :param averages: Coordinates of the focus center. + :param covariance_matrix: Matrix controlling the spread around the average. + :param dimension: If N is dimension, average is N vector and matrix is NxN. + :param scale_coeff: Coefficient to conformally multiply the spread. + :type averages: Indexable of N floats + :type covariance_matrix: Indexable of N indexables of N floats + :type dimension: int + :type scale_coeff: float + :returns: The generated sample point. + :rtype: N-tuple of float + """ covariance_matrix = copy.deepcopy(covariance_matrix) for first in range(dimension): for second in range(dimension): @@ -142,13 +175,12 @@ def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False): In they are not enabled, trace_list will be empty. It is recommended to edit some lines manually to debug_list if needed. - :param communication_pipe: Pipe to comunicate with boss process. + :param communication_pipe: Endpoint for communication with parent process. :param scale_coeff: Float number to tweak convergence speed with. :param trace_enabled: Whether trace list should be populated at all. - Default: False - :type communication_pipe: multiprocessing.Connection (or compatible) + :type communication_pipe: multiprocessing.Connection :type scale_coeff: float - :type trace_enabled: boolean + :type trace_enabled: bool :raises OverflowError: If one sample dominates the rest too much. Or if value_logweight_function does not handle some part of parameter space carefully enough. @@ -201,10 +233,9 @@ def estimate_nd(communication_pipe, scale_coeff=8.0, trace_enabled=False): while not communication_pipe.poll(): if max_samples and samples >= max_samples: break - sample_point = generate_sample(param_focus_tracker.averages, - param_focus_tracker.covariance_matrix, - dimension, - scale_coeff) + sample_point = generate_sample( + param_focus_tracker.averages, param_focus_tracker.covariance_matrix, + dimension, scale_coeff) trace("sample_point", sample_point) samples += 1 trace("samples", samples) diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py index 4205818d91..b7c9344391 100644 --- a/resources/libraries/python/PLRsearch/PLRsearch.py +++ b/resources/libraries/python/PLRsearch/PLRsearch.py @@ -43,6 +43,7 @@ class PLRsearch(object): Method othed than search (and than __init__) are just internal code structure. + TODO: Those method names should start with underscore then. """ @@ -162,7 +163,7 @@ class PLRsearch(object): :type min_rate: float :type max_rate: float :returns: Average and stdev of critical load estimate. - :rtype: 2-tuple of floats + :rtype: 2-tuple of float """ stop_time = time.time() + self.timeout min_rate = float(min_rate) @@ -174,7 +175,7 @@ class PLRsearch(object): focus_trackers = (None, None) transmit_rate = (min_rate + max_rate) / 2.0 lossy_loads = [max_rate] - zeros = [0, 0] # Cosecutive zero loss, separately for stretch and erf. + zeros = 0 # How many cosecutive zero loss results are happening. while 1: trial_number += 1 logging.info("Trial %(number)r", {"number": trial_number}) @@ -182,15 +183,14 @@ class PLRsearch(object): self.trial_duration_per_trial * trial_number, transmit_rate, trial_result_list, min_rate, max_rate, focus_trackers) measurement, average, stdev, avg1, avg2, focus_trackers = results - index = trial_number % 2 - zeros[index] += 1 + zeros += 1 # TODO: Ratio of fill rate to drain rate seems to have # exponential impact. Make it configurable, or is 4:3 good enough? if measurement.loss_fraction >= self.packet_loss_ratio_target: - for _ in range(4 * zeros[index]): + for _ in range(4 * zeros): lossy_loads.append(measurement.target_tr) if measurement.loss_count > 0: - zeros[index] = 0 + zeros = 0 lossy_loads.sort() if stop_time <= time.time(): return average, stdev @@ -201,20 +201,19 @@ class PLRsearch(object): next_load = (measurement.receive_rate / ( 1.0 - self.packet_loss_ratio_target)) else: - index = (trial_number + 1) % 2 - next_load = (avg1, avg2)[index] - if zeros[index] > 0: + next_load = (avg1 + avg2) / 2.0 + if zeros > 0: if lossy_loads[0] > next_load: - diminisher = math.pow(2.0, 1 - zeros[index]) + diminisher = math.pow(2.0, 1 - zeros) next_load = lossy_loads[0] + diminisher * next_load next_load /= (1.0 + diminisher) # On zero measurement, we need to drain obsoleted low losses # even if we did not use them to increase next_load, - # in order to get to usable loses with higher load. + # in order to get to usable loses at higher loads. if len(lossy_loads) > 3: lossy_loads = lossy_loads[3:] logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r", - {"z": zeros, "o": (avg1, avg2)[index], + {"z": zeros, "o": (avg1 + avg2) / 2.0, "n": next_load, "s": lossy_loads}) transmit_rate = min(max_rate, max(min_rate, next_load)) @@ -510,12 +509,12 @@ class PLRsearch(object): :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker :type max_samples: None or int :returns: Measurement and computation results. - :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers. + :rtype: _ComputeResult """ logging.debug( "measure_and_compute started with self %(self)r, trial_duration " - + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, " - + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r", + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, " + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r", {"self": self, "dur": trial_duration, "tr": transmit_rate, "trl": trial_result_list, "mr": max_rate, "track": focus_trackers, "ms": max_samples}) @@ -620,7 +619,7 @@ class PLRsearch(object): :type pipe: multiprocessing.Connection :returns: Computed value tracker, actual focus tracker, and number of samples used for this iteration. - :rtype: 3-tuple of tracker, tracker and int + :rtype: _PartialResult """ pipe.send(None) if not pipe.poll(10.0): @@ -660,23 +659,65 @@ class PLRsearch(object): @staticmethod def _get_result(measurement, stretch_result, erf_result): - """Collate results from measure_and_compute""" + """Process and collate results from measure_and_compute. + + Turn logarithm based values to exponential ones, + combine averages and stdevs of two fitting functions into a whole. + + :param measurement: The trial measurement obtained during computation. + :param stretch_result: Computation output for stretch fitting function. + :param erf_result: Computation output for erf fitting function. + :type measurement: ReceiveRateMeasurement + :type stretch_result: _PartialResult + :type erf_result: _PartialResult + :returns: Combined results. + :rtype: _ComputeResult + """ stretch_avg = stretch_result.value_tracker.average erf_avg = erf_result.value_tracker.average - # TODO: Take into account secondary stats. - stretch_stdev = math.exp(stretch_result.value_tracker.log_variance / 2) - erf_stdev = math.exp(erf_result.value_tracker.log_variance / 2) - avg = math.exp((stretch_avg + erf_avg) / 2.0) - var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0 - var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0 - stdev = avg * math.sqrt(var) + stretch_var = stretch_result.value_tracker.get_pessimistic_variance() + erf_var = erf_result.value_tracker.get_pessimistic_variance() + avg_log = (stretch_avg + erf_avg) / 2.0 + var_log = (stretch_var + erf_var) / 2.0 + var_log += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0 + stdev_log = math.sqrt(var_log) + low, upp = math.exp(avg_log - stdev_log), math.exp(avg_log + stdev_log) + avg = (low + upp) / 2 + stdev = avg - low trackers = (stretch_result.focus_tracker, erf_result.focus_tracker) sea = math.exp(stretch_avg) eea = math.exp(erf_avg) return _ComputeResult(measurement, avg, stdev, sea, eea, trackers) -_PartialResult = namedtuple('_PartialResult', - 'value_tracker focus_tracker samples') -_ComputeResult = namedtuple('_ComputeResult', 'measurement avg stdev ' + - 'stretch_exp_avg erf_exp_avg trackers') +# Named tuples, for multiple local variables to be passed as return value. +_PartialResult = namedtuple( + "_PartialResult", "value_tracker focus_tracker samples") +"""Two stat trackers and sample counter. + +:param value_tracker: Tracker for the value (critical load) being integrated. +:param focus_tracker: Tracker for focusing integration inputs (sample points). +:param samples: How many samples were used for the computation. +:type value_tracker: stat_trackers.ScalarDualStatTracker +:type focus_tracker: stat_trackers.VectorStatTracker +:type samples: int +""" + +_ComputeResult = namedtuple( + "_ComputeResult", + "measurement avg stdev stretch_exp_avg erf_exp_avg trackers") +"""Measurement, 4 computation result values, pair of trackers. + +:param measurement: The trial measurement result obtained during computation. +:param avg: Overall average of critical rate estimate. +:param stdev: Overall standard deviation of critical rate estimate. +:param stretch_exp_avg: Stretch fitting function estimate average exponentiated. +:param erf_exp_avg: Erf fitting function estimate average, exponentiated. +:param trackers: Pair of focus trackers to start next iteration with. +:type measurement: ReceiveRateMeasurement +:type avg: float +:type stdev: float +:type stretch_exp_avg: float +:type erf_exp_avg: float +:type trackers: 2-tuple of stat_trackers.VectorStatTracker +""" diff --git a/resources/libraries/python/PLRsearch/__init__.py b/resources/libraries/python/PLRsearch/__init__.py index bce703803c..6d1559d5f0 100644 --- a/resources/libraries/python/PLRsearch/__init__.py +++ b/resources/libraries/python/PLRsearch/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: diff --git a/resources/libraries/python/PLRsearch/log_plus.py b/resources/libraries/python/PLRsearch/log_plus.py index 3f21cc78d7..1c802a5599 100644 --- a/resources/libraries/python/PLRsearch/log_plus.py +++ b/resources/libraries/python/PLRsearch/log_plus.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 Cisco and/or its affiliates. +# Copyright (c) 2019 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -87,3 +87,16 @@ def log_minus(first, second): raise RuntimeError("log_minus: non-positive number to log") else: return first + math.log(factor) + + +def safe_exp(log_value): + """Return exponential of the argument, or zero if the argument is None. + + :param log_value: The value to exponentiate. + :type log_value: NoneType or float + :returns: The exponentiated value. + :rtype: float + """ + if log_value is None: + return 0.0 + return math.exp(log_value) diff --git a/resources/libraries/python/PLRsearch/stat_trackers.py b/resources/libraries/python/PLRsearch/stat_trackers.py index 168b09a14e..58ad98fd2e 100644 --- a/resources/libraries/python/PLRsearch/stat_trackers.py +++ b/resources/libraries/python/PLRsearch/stat_trackers.py @@ -29,7 +29,7 @@ import numpy # TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH, # then switch to absolute imports within PLRsearch package. # Current usage of relative imports is just a short term workaround. -from log_plus import log_plus # pylint: disable=relative-import +from .log_plus import log_plus, safe_exp class ScalarStatTracker(object): @@ -59,7 +59,11 @@ class ScalarStatTracker(object): self.log_variance = log_variance def __repr__(self): - """Return string, which interpreted constructs state of self.""" + """Return string, which interpreted constructs state of self. + + :returns: Expression contructing an equivalent instance. + :rtype: str + """ return ("ScalarStatTracker(log_sum_weight={lsw!r},average={a!r}," "log_variance={lv!r})".format( lsw=self.log_sum_weight, a=self.average, @@ -168,7 +172,11 @@ class ScalarDualStatTracker(ScalarStatTracker): self.max_log_weight = max_log_weight def __repr__(self): - """Return string, which interpreted constructs state of self.""" + """Return string, which interpreted constructs state of self. + + :returns: Expression contructing an equivalent instance. + :rtype: str + """ sec = self.secondary return ( "ScalarDualStatTracker(log_sum_weight={lsw!r},average={a!r}," @@ -202,6 +210,27 @@ class ScalarDualStatTracker(ScalarStatTracker): return self + def get_pessimistic_variance(self): + """Return estimate of variance reflecting weight effects. + + Typical scenario is the primary tracker dominated by a single sample. + In worse case, secondary tracker is also dominated by + a single (but different) sample. + + Current implementation simply returns variance of average + of the two trackers, as if they were independent. + + :returns: Pessimistic estimate of variance (not stdev, no log). + :rtype: float + """ + var_primary = safe_exp(self.log_variance) + var_secondary = safe_exp(self.secondary.log_variance) + var_combined = (var_primary + var_secondary) / 2 + avg_half_diff = (self.average - self.secondary.average) / 2 + var_combined += avg_half_diff * avg_half_diff + return var_combined + + class VectorStatTracker(object): """Class for tracking multi-dimensional samples. @@ -245,7 +274,8 @@ class VectorStatTracker(object): """Return string, which interpreted constructs state of self. :returns: Expression contructing an equivalent instance. - :rtype: str""" + :rtype: str + """ return ( "VectorStatTracker(dimension={d!r},log_sum_weight={lsw!r}," "averages={a!r},covariance_matrix={cm!r})".format( @@ -262,8 +292,8 @@ class VectorStatTracker(object): :rtype: VectorStatTracker """ return VectorStatTracker( - self.dimension, self.log_sum_weight, self.averages, - self.covariance_matrix) + self.dimension, self.log_sum_weight, self.averages[:], + copy.deepcopy(self.covariance_matrix)) def reset(self): """Return state set to empty data of proper dimensionality. @@ -288,6 +318,7 @@ class VectorStatTracker(object): self.reset() for index in range(self.dimension): self.covariance_matrix[index][index] = 1.0 + return self def add_get_shift(self, vector_value, log_weight=0.0): """Return shift and update state to addition of another sample. @@ -300,8 +331,8 @@ class VectorStatTracker(object): Default: 0.0 (as log of 1.0). :type vector_value: iterable of float :type log_weight: float - :returns: Updated self. - :rtype: VectorStatTracker + :returns: Shift vector + :rtype: list of float """ dimension = self.dimension old_log_sum_weight = self.log_sum_weight |