Diffstat (limited to 'resources/libraries/python/PLRsearch/Integrator.py')
-rw-r--r-- | resources/libraries/python/PLRsearch/Integrator.py | 378 |
1 files changed, 378 insertions, 0 deletions
diff --git a/resources/libraries/python/PLRsearch/Integrator.py b/resources/libraries/python/PLRsearch/Integrator.py
new file mode 100644
index 0000000000..f8846fdf62
--- /dev/null
+++ b/resources/libraries/python/PLRsearch/Integrator.py
@@ -0,0 +1,378 @@
+# Copyright (c) 2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for numerical integration, tightly coupled to PLRsearch algorithm.
+
+See log_plus for an explanation of why None acts as a special case "float" number.
+
+TODO: Separate optimizations specific to PLRsearch
+      and distribute as a standalone package so other projects may reuse.
+"""
+
+import math
+import traceback
+
+import dill
+import numpy
+
+# TODO: The preferred way to consume this code is via a pip package.
+# If your project copies code instead, make sure your pylint check does not
+# require these imports to be absolute and descending from your superpackage.
+from log_plus import log_plus
+
+
+def try_estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+    """Call estimate_nd but catch any exception and send traceback."""
+    try:
+        return estimate_nd(communication_pipe, scale_coeff, trace_enabled)
+    except BaseException:
+        # Any subclass could have caused estimate_nd to stop before sending,
+        # so we have to catch them all.
+        traceback_string = traceback.format_exc()
+        communication_pipe.send(traceback_string)
+        # After sending, re-raise, so usages other than "one process per call"
+        # keep behaving correctly.
+        raise
+
+
+# FIXME: Pylint reports multiple complexity violations.
+# Refactor the code, using fewer (but structured) variables
+# and function calls for (relatively) loosely coupled code blocks.
+def estimate_nd(communication_pipe, scale_coeff=10.0, trace_enabled=False):
+    """Use Bayesian inference from control queue, put result to result queue.
+
+    TODO: Use a logging framework that works in a user friendly way.
+    (Note that multiprocessing_logging does not work well with robot
+    and robotbackgroundlogger only works for threads, not processes.
+    Or, wait for https://github.com/robotframework/robotframework/pull/2182
+    Anyway, the current implementation with trace_enabled looks ugly.)
+
+    The result is the average and standard deviation of the posterior
+    distribution of a single dependent (positive scalar) value.
+    The prior is assumed to be uniform on (-1, 1) for every parameter.
+    The number of parameters and the function for computing
+    the dependent value and likelihood both come from the input.
+
+    The likelihood is assumed to be extremely uneven (but never zero),
+    so the function should return the logarithm of the likelihood.
+    The integration method is basically Monte Carlo
+    (TODO: Add links to notions used here.),
+    but importance sampling is used in order to focus
+    on the part of parameter space with (relatively) non-negligible likelihood.
+
+    A multivariate Gauss distribution is used for focusing,
+    so only unimodal posterior distributions are handled correctly.
+    Initial samples are mostly used for shaping (and shifting)
+    the Gaussian distribution; later samples will probably dominate.
+    Thus, initially the algorithm behavior resembles "find the maximum" more,
+    as opposed to "reliably integrate". As for later iterations of PLRsearch,
+    it is assumed that the distribution position does not change rapidly;
+    thus the integration algorithm also returns the distribution data,
+    to be used as the initial focus in the next iteration.
+
+    After some number of samples (depends on dimension),
+    the algorithm starts tracking the few most likely samples to base the
+    Gaussian distribution around, mixing with estimates from observed samples.
+    The idea is that even when (usually) one of the samples dominates,
+    the first few are treated as if equally likely, to get a reasonable focus.
+    During the "find the maximum" phase, this set of best samples
+    frequently takes a wrong shape (compared to observed samples
+    in equilibrium). Therefore the scale_coeff argument is left for humans
+    to tweak, so the convergence is reliable and quick.
+    Any data (other than correctly weighted samples) used to keep
+    the distribution shape reasonable is called "bias", regardless of
+    whether it comes from the input hint, or from tracking top samples.
+
+    Until the distribution locates itself roughly around
+    the maximum likelihood point, the integration results are probably wrong.
+    That means some minimal time is needed for the result to become reliable.
+    The reported standard deviation attempts to signal inconsistency
+    (when one sample has dominating weight compared to the rest of samples),
+    but some human supervision is strongly encouraged.
+
+    To facilitate running in worker processes, arguments and results
+    are communicated via a pipe. The computation does not start
+    until arguments appear in the pipe, and the computation stops
+    when another item (stop object) is detected in the pipe
+    (and the result is put to the pipe).
+
+    TODO: Create classes for arguments and results,
+          so their fields are documented (and the code perhaps more readable).
+
+    Input/argument object (received from the pipe)
+    is a 4-tuple of the following fields:
+    - dimension: Integer, number of parameters to consider.
+    - dilled_function: Function (serialized using dill), which:
+    - - Takes the dimension number of float parameters from (-1, 1).
+    - - Returns float 2-tuple of dependent value and parameter log-likelihood.
+    - param_hint_avg: Dimension-tuple of floats to start searching around.
+    - param_hint_cov: Covariance matrix defining the initial focus shape.
+
+    Output/result object (sent to the pipe)
+    is a 6-tuple of the following fields:
+    - value_avg: Float estimate of posterior average dependent value.
+    - value_stdev: Float estimate of posterior standard deviation of the value.
+    - param_importance_avg: Float tuple, center of Gaussian to use next.
+    - param_importance_cov: Float covariance matrix of the Gaussian to use next.
+    - debug_list: List of debug strings to log at the main process.
+    - trace_list: List of trace strings to pass to the main process if enabled.
+      Trace strings are very verbose; it is not recommended to enable them.
+      If they are not enabled, trace_list will be empty.
+      It is recommended to copy selected lines to debug_list manually if needed.
+
+    :param communication_pipe: Pipe to communicate with the boss process.
+    :param scale_coeff: Float number to tweak convergence speed with.
+    :param trace_enabled: Whether the trace list should be populated at all.
+        Default: False
+    :type communication_pipe: multiprocessing.Connection (or compatible)
+    :type scale_coeff: float
+    :type trace_enabled: boolean
+    :raises OverflowError: If one sample dominates the rest too much.
+        Or if value_logweight_function does not handle
+        some part of parameter space carefully enough.
+    :raises numpy.linalg.LinAlgError: If the focus shape gets singular
+        (due to rounding errors). Try changing scale_coeff.
+    """
+
+    # Block until the input object appears.
+    dimension, dilled_function, param_hint_avg, param_hint_cov = (
+        communication_pipe.recv())
+    debug_list = list()
+    trace_list = list()
+    def trace(name, value):
+        """
+        Add a variable (name and value) to the trace list (if enabled).
+
+        This is a closure (not a pure function),
+        as it accesses trace_list and trace_enabled
+        (without any of them being an explicit argument).
+
+        :param name: Any string identifying the value.
+        :param value: Any object to log repr of.
+        :type name: str
+        :type value: object
+        """
+        if trace_enabled:
+            trace_list.append(name + " " + repr(value))
+    value_logweight_function = dill.loads(dilled_function)
+    len_top = (dimension + 2) * (dimension + 1) / 2
+    top_weight_param = list()
+    samples = 0
+    log_sum_weight = None
+    # Importance sampling produces samples of higher weight (important)
+    # more frequently, and corrects that by adding a weight bonus
+    # for the less frequent (unimportant) samples.
+    # But "corrected_weight" is too close to "weight" to be readable,
+    # so "importance" is used instead, even if it runs contrary to what
+    # an important region is.
+    log_sum_importance = None
+    log_importance_best = None
+    value_avg = 0.0
+    # A 1x1 dimensional covariance matrix is just variance.
+    # As variance is never negative, we can track its logarithm.
+    value_log_variance = None
+    # Here "secondary" means "excluding the weightest sample".
+    log_secondary_sum_importance = None
+    value_secondary_avg = 0.0
+    value_log_secondary_variance = None
+    param_sampled_avg = [0.0 for first in range(dimension)]
+    # TODO: Examine whether we can gain speed by tracking the triangle only.
+    # Covariance matrix can contain negative elements (off-diagonal),
+    # so no logarithm here. This can lead to zeroes on the diagonal,
+    # but we have biasing to make sure it does not hurt much.
+    param_sampled_cov = [[0.0 for first in range(dimension)]
+                         for second in range(dimension)]
+    # The next two variables do NOT need to be initialized here,
+    # but pylint is not enough of a mathematician to understand that.
+    param_top_avg = [0.0 for first in range(dimension)]
+    param_top_cov = [[0.0 for first in range(dimension)]
+                     for second in range(dimension)]
+    if not (param_hint_avg and param_hint_cov):
+        # The first call has Nones instead of useful hints.
+        param_hint_avg = [0.0 for first in range(dimension)]
+        param_hint_cov = [
+            [1.0 if first == second else 0.0 for first in range(dimension)]
+            for second in range(dimension)]
+    while not communication_pipe.poll():
+        # Compute focus data.
+        if len(top_weight_param) < len_top:
+            # Not enough samples for a reasonable top, use the hint bias.
+            param_focus_avg = param_hint_avg
+            param_focus_cov = param_hint_cov
+        else:
+            # We have both top samples and overall samples.
+            # Mix them according to how much the weightest sample dominates.
+            log_top_weight = top_weight_param[0][0]
+            log_weight_norm = log_plus(log_sum_weight, log_top_weight)
+            top_ratio = math.exp(log_top_weight - log_weight_norm)
+            sampled_ratio = math.exp(log_sum_weight - log_weight_norm)
+            trace("log_top_weight", log_top_weight)
+            trace("log_sum_weight", log_sum_weight)
+            trace("top_ratio", top_ratio)
+            trace("sampled_ratio", sampled_ratio)
+            param_focus_avg = [
+                sampled_ratio * param_sampled_avg[first]
+                + top_ratio * param_top_avg[first]
+                for first in range(dimension)]
+            param_focus_cov = [[
+                scale_coeff * (
+                    sampled_ratio * param_sampled_cov[first][second]
+                    + top_ratio * param_top_cov[first][second])
+                for first in range(dimension)] for second in range(dimension)]
+        trace("param_focus_avg", param_focus_avg)
+        trace("param_focus_cov", param_focus_cov)
+        # Generate the next sample.
+        while 1:
+            # TODO: Inform pylint that the correct version of numpy is available.
+            sample_point = numpy.random.multivariate_normal(
+                param_focus_avg, param_focus_cov, 1)[0]
+            # Multivariate Gauss can fall outside the (-1, 1) interval.
+            for first in range(dimension):
+                sample_coordinate = sample_point[first]
+                if sample_coordinate <= -1.0 or sample_coordinate >= 1.0:
+                    break
+            else:  # These two breaks implement "level two continue".
+                break
+        trace("sample_point", sample_point)
+        samples += 1
+        value, log_weight = value_logweight_function(*sample_point)
+        trace("value", value)
+        trace("log_weight", log_weight)
+        # Update bias related statistics.
+        log_sum_weight = log_plus(log_sum_weight, log_weight)
+        if len(top_weight_param) < len_top:
+            top_weight_param.append((log_weight, sample_point))
+        # Hack: top_weight_param[-1] is either the smallest,
+        # or the one just appended to the len_top-1 item list.
+        if (len(top_weight_param) >= len_top
+                and log_weight >= top_weight_param[-1][0]):
+            top_weight_param = top_weight_param[:-1]
+            top_weight_param.append((log_weight, sample_point))
+            top_weight_param.sort(key=lambda item: -item[0])
+            trace("top_weight_param", top_weight_param)
+            # top_weight_param has changed, recompute biases.
+            param_top_avg = top_weight_param[0][1]
+            param_top_cov = [[0.0 for first in range(dimension)]
+                             for second in range(dimension)]
+            top_item_count = 1
+            for _, near_top_param in top_weight_param[1:]:
+                top_item_count += 1
+                next_item_ratio = 1.0 / top_item_count
+                previous_items_ratio = 1.0 - next_item_ratio
+                param_shift = [
+                    near_top_param[first] - param_top_avg[first]
+                    for first in range(dimension)]
+                # Do not move the center from the weightest sample.
+                for second in range(dimension):
+                    for first in range(dimension):
+                        param_top_cov[first][second] += (
+                            param_shift[first] * param_shift[second]
+                            * next_item_ratio)
+                        param_top_cov[first][second] *= previous_items_ratio
+            trace("param_top_avg", param_top_avg)
+            trace("param_top_cov", param_top_cov)
+        # The code above looked at weight (not importance).
+        # The code below looks at importance (not weight).
+        param_shift = [sample_point[first] - param_focus_avg[first]
+                       for first in range(dimension)]
+        rarity_gradient = numpy.linalg.solve(param_focus_cov, param_shift)
+        rarity_step = numpy.vdot(param_shift, rarity_gradient)
+        log_rarity = rarity_step / 2.0
+        trace("log_rarity", log_rarity)
+        trace("samples", samples)
+        log_importance = log_weight + log_rarity
+        trace("log_importance", log_importance)
+        # Update sampled statistics.
+        old_log_sum_importance = log_sum_importance
+        log_sum_importance = log_plus(old_log_sum_importance, log_importance)
+        trace("new log_sum_weight", log_sum_weight)
+        trace("log_sum_importance", log_sum_importance)
+        if old_log_sum_importance is None:
+            param_sampled_avg = list(sample_point)
+            value_avg = value
+            # Other value related quantities stay None.
+            continue
+        previous_samples_ratio = math.exp(
+            old_log_sum_importance - log_sum_importance)
+        new_sample_ratio = math.exp(log_importance - log_sum_importance)
+        param_shift = [sample_point[first] - param_sampled_avg[first]
+                       for first in range(dimension)]
+        value_shift = value - value_avg
+        for first in range(dimension):
+            param_sampled_avg[first] += param_shift[first] * new_sample_ratio
+        old_value_avg = value_avg
+        value_avg += value_shift * new_sample_ratio
+        value_absolute_shift = abs(value_shift)
+        for second in range(dimension):
+            for first in range(dimension):
+                param_sampled_cov[first][second] += (
+                    param_shift[first] * param_shift[second] * new_sample_ratio)
+                param_sampled_cov[first][second] *= previous_samples_ratio
+        trace("param_sampled_avg", param_sampled_avg)
+        trace("param_sampled_cov", param_sampled_cov)
+        update_secondary_stats = True
+        if log_importance_best is None or log_importance > log_importance_best:
+            log_importance_best = log_importance
+            log_secondary_sum_importance = old_log_sum_importance
+            value_secondary_avg = old_value_avg
+            value_log_secondary_variance = value_log_variance
+            update_secondary_stats = False
+            # TODO: Update all primary quantities before secondary ones.
+            # (As opposed to the current hybrid code.)
+        if value_absolute_shift > 0.0:
+            value_log_variance = log_plus(
+                value_log_variance, 2 * math.log(value_absolute_shift)
+                + log_importance - log_sum_importance)
+        if value_log_variance is not None:
+            value_log_variance -= log_sum_importance - old_log_sum_importance
+        if not update_secondary_stats:
+            continue
+        # TODO: Pylint says the following variable name is bad.
+        # Make sure the refactor uses shorter names.
+        old_log_secondary_sum_importance = log_secondary_sum_importance
+        log_secondary_sum_importance = log_plus(
+            old_log_secondary_sum_importance, log_importance)
+        if old_log_secondary_sum_importance is None:
+            value_secondary_avg = value
+            continue
+        new_sample_secondary_ratio = math.exp(
+            log_importance - log_secondary_sum_importance)
+        # TODO: No would-be variable named old_value_secondary_avg
+        # appears in subsequent computations. Probably means there is a bug.
+        value_secondary_shift = value - value_secondary_avg
+        value_secondary_absolute_shift = abs(value_secondary_shift)
+        value_secondary_avg += (
+            value_secondary_shift * new_sample_secondary_ratio)
+        if value_secondary_absolute_shift > 0.0:
+            value_log_secondary_variance = log_plus(
+                value_log_secondary_variance, (
+                    2 * math.log(value_secondary_absolute_shift)
+                    + log_importance - log_secondary_sum_importance))
+        if value_log_secondary_variance is not None:
+            value_log_secondary_variance -= (
+                log_secondary_sum_importance - old_log_secondary_sum_importance)
+    debug_list.append("integrator used " + str(samples) + " samples")
+    debug_list.append(
+        "value_avg " + str(value_avg)
+        + " param_sampled_avg " + repr(param_sampled_avg)
+        + " param_sampled_cov " + repr(param_sampled_cov)
+        + " value_log_variance " + str(value_log_variance)
+        + " value_log_secondary_variance " + str(value_log_secondary_variance))
+    value_stdev = math.exp(
+        (2 * value_log_variance - value_log_secondary_variance) / 2.0)
+    debug_list.append("top_weight_param[0] " + repr(top_weight_param[0]))
+    # Intentionally returning param_focus_avg and param_focus_cov,
+    # instead of possibly hyper-focused bias or sampled.
+    communication_pipe.send(
+        (value_avg, value_stdev, param_focus_avg, param_focus_cov, debug_list,
+         trace_list))
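
The log_plus helper imported at the top of the new file is not included in this diff. Based on how it is used above (log_sum_weight starts as None and grows by repeated log_plus calls), its contract appears to be a numerically stable log(exp(a) + exp(b)) with None standing for the logarithm of zero. The following is a minimal sketch under that assumption, not the actual implementation from the log_plus module:

# Minimal sketch of the log_plus contract assumed by the code above:
# log_plus(a, b) ~ log(exp(a) + exp(b)), computed without overflow,
# with None standing in for log(0.0), i.e. "no weight accumulated yet".
import math


def log_plus_sketch(first, second):
    """Return logarithm of sum of two exponentials, None meaning log(0)."""
    if first is None:
        return second
    if second is None:
        return first
    if second > first:
        first, second = second, first
    # Factor out the larger exponent so math.exp never overflows.
    return first + math.log1p(math.exp(second - first))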
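
The importance correction near the end of the sampling loop deserves a note: rarity_step is the squared Mahalanobis distance of the sample from the focus center, so log_rarity is (up to terms that do not depend on the sample point) the negative logarithm of the Gaussian density the sample was drawn from. Adding it to log_weight divides the likelihood by the proposal density, which is the standard importance sampling correction. A small self-contained check of that identity, with made-up numbers:

# Numeric check (not part of the commit) of the "rarity" term used above.
import numpy

param_focus_avg = numpy.array([0.1, -0.2])
param_focus_cov = numpy.array([[0.04, 0.01], [0.01, 0.09]])
sample_point = numpy.array([0.3, 0.1])

param_shift = sample_point - param_focus_avg
rarity_gradient = numpy.linalg.solve(param_focus_cov, param_shift)
rarity_step = numpy.vdot(param_shift, rarity_gradient)
log_rarity = rarity_step / 2.0

# Same quantity via the explicit inverse: shift^T * cov^-1 * shift.
explicit = param_shift.dot(numpy.linalg.inv(param_focus_cov)).dot(param_shift)
assert abs(rarity_step - explicit) < 1e-9

# The unnormalized Gaussian log-density at the sample is -rarity_step / 2,
# so log_rarity is exactly its negative (the normalization constant is dropped).
log_unnormalized_density = -rarity_step / 2.0
assert abs(log_rarity + log_unnormalized_density) < 1e-12
print(rarity_step, log_rarity)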
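
The docstring fully specifies the pipe protocol (4-tuple in, any stop object, 6-tuple out), so a boss process can drive the integrator without other glue. Below is a minimal driver sketch, not part of this commit: the toy example_logweight function, the two-second sampling window, and the assumption that the module is importable as Integrator are all illustrative.

# Hypothetical driver sketch: feeds estimate_nd the 4-tuple input and reads
# the 6-tuple result described in the docstring.
import math
import multiprocessing
import time

import dill

from Integrator import try_estimate_nd


def example_logweight(first, second):
    """Toy model: return (dependent value, log-likelihood) for 2 parameters."""
    value = math.exp(first + second)
    log_weight = -50.0 * ((first - 0.3) ** 2 + (second + 0.2) ** 2)
    return value, log_weight


def main():
    boss_end, worker_end = multiprocessing.Pipe()
    worker = multiprocessing.Process(
        target=try_estimate_nd, args=(worker_end,))
    worker.start()
    # First call: None hints select the default unit covariance focus.
    boss_end.send((2, dill.dumps(example_logweight), None, None))
    time.sleep(2.0)  # Let the worker accumulate samples.
    boss_end.send(None)  # Any further object acts as the stop signal.
    result = boss_end.recv()
    worker.join()
    # On success the result is the 6-tuple from the docstring;
    # on failure it is a traceback string sent by try_estimate_nd.
    value_avg, value_stdev, focus_avg, focus_cov, debugs, traces = result
    print(value_avg, value_stdev)


if __name__ == "__main__":
    main()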