diff options
Diffstat (limited to 'resources/libraries/python/PLRsearch/PLRsearch.py')
-rw-r--r-- | resources/libraries/python/PLRsearch/PLRsearch.py | 440 |
1 files changed, 255 insertions, 185 deletions
diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py index f1b3f740c2..db870c55dc 100644 --- a/resources/libraries/python/PLRsearch/PLRsearch.py +++ b/resources/libraries/python/PLRsearch/PLRsearch.py @@ -22,15 +22,16 @@ import dill # TODO: Inform pylint about scipy (of correct version) being available. from scipy.special import erfcx, erfc -# TODO: The preferred way to consume this code is via a pip package. -# If your project copies code instead, make sure your pylint check does not -# require these imports to be absolute and descending from your superpackage. -import Integrator -from log_plus import log_plus, log_minus +# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH, +# then switch to absolute imports within PLRsearch package. +# Current usage of relative imports is just a short term workaround. +import Integrator # pylint: disable=relative-import +from log_plus import log_plus, log_minus # pylint: disable=relative-import +import stat_trackers # pylint: disable=relative-import class PLRsearch(object): - """A class to encapsulate data relevant for search method. + """A class to encapsulate data relevant for the search method. The context is performance testing of packet processing systems. The system, when being offered a steady stream of packets, @@ -43,9 +44,6 @@ class PLRsearch(object): Method othed than search (and than __init__) are just internal code structure. TODO: Those method names should start with underscore then. - - TODO: Figure out how to replace #print with logging - without slowing down too much. """ xerfcx_limit = math.pow(math.acos(0), -0.5) @@ -53,12 +51,9 @@ class PLRsearch(object): def __init__( self, measurer, trial_duration_per_trial, packet_loss_ratio_target, - trial_number_offset=0, timeout=60.0): + trial_number_offset=0, timeout=1800.0, trace_enabled=False): """Store rate measurer and additional parameters. - Also declare packet_loss_per_second_target field (float), - to be initialized later. - TODO: Copy AbstractMeasurer from MLRsearch. :param measurer: The measurer to call when searching. @@ -78,10 +73,11 @@ class PLRsearch(object): :type timeout: float """ self.measurer = measurer - self.trial_duration_per_trial = trial_duration_per_trial - self.packet_loss_ratio_target = packet_loss_ratio_target - self.trial_number_offset = trial_number_offset - self.timeout = timeout + self.trial_duration_per_trial = float(trial_duration_per_trial) + self.packet_loss_ratio_target = float(packet_loss_ratio_target) + self.trial_number_offset = int(trial_number_offset) + self.timeout = float(timeout) + self.trace_enabled = bool(trace_enabled) def search(self, min_rate, max_rate): """Perform the search, return average and stdev for throughput estimate. @@ -106,7 +102,7 @@ class PLRsearch(object): will converge towards unique (for the given load) packet loss ratio. This means there is a deterministic (but unknown) function mapping the offered load to average loss ratio. - This function is called loss function. + This function is called loss ratio function. This also assumes the average loss ratio does not depend on trial duration. @@ -118,7 +114,7 @@ class PLRsearch(object): but Poisson is more practical, as it effectively gives less information content to high ratio results. - Even when applying other assumptions on the loss function + Even when applying other assumptions on the loss ratio function (increasing function, limit zero ratio when load goes to zero, global upper limit on rate of packets processed), there are still too many different shapes of possible loss functions, @@ -132,14 +128,16 @@ class PLRsearch(object): the Bayesian inference becomes tractable (even though it needs numerical integration from Integrator class). - The first measurement is done at min_rate to help with convergence + The first measurement is done at the middle between + min_rate and max_rate, to help with convergence if max_rate measurements give loss below target. - FIXME: Fix overflow error and really use min_rate. + TODO: Fix overflow error and use min_rate instead of the middle. + The second measurement is done at max_rate, next few measurements - have offered load of previous load minus excess loss ratio. + have offered load of previous load minus excess loss rate. This simple rule is found to be good when offered loads so far are way above the critical rate. After few measurements, - inference from fitting functions converges faster that the initial + inference from fitting functions converges faster that this initial "optimistic" procedure. Offered loads close to (limiting) critical rate are the most useful, @@ -150,15 +148,13 @@ class PLRsearch(object): has better predictions than the other one, but the algorithm does not track that. Simply, it uses the estimate average, alternating between the functions. + Multiple workarounds are applied to try and avoid measurements + both in zero loss region and in big loss region, + as their results tend to make the critical load estimate worse. The returned average and stdev is a combination of the two fitting estimates. - TODO: If measurement at max_rate is already below target loss rate, - the algorithm always measures at max_rate, and likelihood - no longer has single maximum, leading to "random" estimates. - Find a way to avoid that. - :param min_rate: Avoid measuring at offered loads below this, in packets per second. :param max_rate: Avoid measuring at offered loads above this, @@ -171,36 +167,59 @@ class PLRsearch(object): stop_time = time.time() + self.timeout min_rate = float(min_rate) max_rate = float(max_rate) + logging.info("Started search with min_rate %(min)r, max_rate %(max)r", + {"min": min_rate, "max": max_rate}) trial_result_list = list() trial_number = self.trial_number_offset - integrator_data = (None, None, None, None) - message = "Trial %(number)r computed avg %(avg)r stdev %(stdev)r" - message += " stretch %(a1)r erf %(a2)r difference %(diff)r" + focus_trackers = (None, None) transmit_rate = (min_rate + max_rate) / 2.0 + lossy_loads = [max_rate] + zeros = [0, 0] # Cosecutive zero loss, separately for stretch and erf. while 1: trial_number += 1 - trial_duration = trial_number * self.trial_duration_per_trial + logging.info("Trial %(number)r", {"number": trial_number}) results = self.measure_and_compute( - trial_duration, transmit_rate, trial_result_list, max_rate, - integrator_data) - measurement, average, stdev, avg1, avg2, integrator_data = results - logging.info(message, { - "number": trial_number, "avg": average, "stdev": stdev, - "a1": avg1, "a2": avg2, "diff": avg2 - avg1}) + self.trial_duration_per_trial * trial_number, transmit_rate, + trial_result_list, min_rate, max_rate, focus_trackers) + measurement, average, stdev, avg1, avg2, focus_trackers = results + index = trial_number % 2 + zeros[index] += 1 + # TODO: Ratio of fill rate to drain rate seems to have + # exponential impact. Make it configurable, or is 4:3 good enough? + if measurement.loss_fraction >= self.packet_loss_ratio_target: + for _ in range(4 * zeros[index]): + lossy_loads.append(measurement.target_tr) + if measurement.loss_count > 0: + zeros[index] = 0 + lossy_loads.sort() if stop_time <= time.time(): return average, stdev trial_result_list.append(measurement) if (trial_number - self.trial_number_offset) <= 1: next_load = max_rate elif (trial_number - self.trial_number_offset) <= 3: - next_load = (measurement.receive_rate - / (1.0 - self.packet_loss_ratio_target)) + next_load = (measurement.receive_rate / ( + 1.0 - self.packet_loss_ratio_target)) else: - next_load = avg1 if trial_number % 2 else avg2 + index = (trial_number + 1) % 2 + next_load = (avg1, avg2)[index] + if zeros[index] > 0: + if lossy_loads[0] > next_load: + diminisher = math.pow(2.0, 1 - zeros[index]) + next_load = lossy_loads[0] + diminisher * next_load + next_load /= (1.0 + diminisher) + # On zero measurement, we need to drain obsoleted low losses + # even if we did not use them to increase next_load, + # in order to get to usable loses with higher load. + if len(lossy_loads) > 3: + lossy_loads = lossy_loads[3:] + logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r", + {"z": zeros, "o": (avg1, avg2)[index], + "n": next_load, "s": lossy_loads}) transmit_rate = min(max_rate, max(min_rate, next_load)) @staticmethod - def lfit_stretch(load, mrr, spread): + def lfit_stretch(trace, load, mrr, spread): """Stretch-based fitting function. Return the logarithm of average packet loss per second @@ -216,45 +235,58 @@ class PLRsearch(object): TODO: Explain how the high-level description has been converted into an implementation full of ifs. + :param trace: A multiprocessing-friendly logging function (closure). :param load: Offered load (positive), in packets per second. :param mrr: Parameter of this fitting function, equal to limiting (positive) average number of packets received (as opposed to lost) when offered load is many spreads more than mrr. :param spread: The x-scaling parameter (positive). No nice semantics, roughly corresponds to size of "tail" for loads below mrr. + :type trace: function (str, object) -> NoneType :type load: float :type mrr: float :type spread: float :returns: Logarithm of average number of packets lost per second. :rtype: float """ + # TODO: What is the fastest way to use such values? + log_2 = math.log(2) + log_3 = math.log(3) + log_spread = math.log(spread) # TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization chi = (load - mrr) / spread chi0 = -mrr / spread -# print "load", load, "mrr", mrr, "spread", spread, "chi", chi + trace("stretch: load", load) + trace("mrr", mrr) + trace("spread", spread) + trace("chi", chi) + trace("chi0", chi0) if chi > 0: log_lps = math.log( load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread) -# print "big loss direct log_lps", log_lps + trace("big loss direct log_lps", log_lps) else: - approx = (math.exp(chi) - math.exp(2 * chi) / 2) * spread - if approx == 0.0: - log_lps = chi -# print "small loss crude log_lps", log_lps + two_positive = log_plus(chi, 2 * chi0 - log_2) + two_negative = log_plus(chi0, 2 * chi - log_2) + if two_positive <= two_negative: + log_lps = log_minus(chi, chi0) + log_spread + trace("small loss crude log_lps", log_lps) return log_lps - third = math.exp(3 * chi) / 3 * spread - if approx + third != approx + 2 * third: - log_lps = math.log( - (log_plus(0, chi) - log_plus(0, chi0)) * spread) -# print "small loss direct log_lps", log_lps + two = log_minus(two_positive, two_negative) + three_positive = log_plus(two_positive, 3 * chi - log_3) + three_negative = log_plus(two_negative, 3 * chi0 - log_3) + three = log_minus(three_positive, three_negative) + if two == three: + log_lps = two + log_spread + trace("small loss approx log_lps", log_lps) else: - log_lps = math.log( - approx - (math.exp(chi0) - math.exp(2 * chi0)) * spread) -# print "small loss approx log_lps", log_lps + log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0)) + log_lps += log_spread + trace("small loss direct log_lps", log_lps) return log_lps @staticmethod - def lfit_erf(load, mrr, spread): + def lfit_erf(trace, load, mrr, spread): """Erf-based fitting function. Return the logarithm of average packet loss per second @@ -271,12 +303,14 @@ class PLRsearch(object): TODO: Explain how the high-level description has been converted into an implementation full of ifs. + :param trace: A multiprocessing-friendly logging function (closure). :param load: Offered load (positive), in packets per second. :param mrr: Parameter of this fitting function, equal to limiting (positive) average number of packets received (as opposed to lost) when offered load is many spreads more than mrr. :param spread: The x-scaling parameter (positive). No nice semantics, roughly corresponds to size of "tail" for loads below mrr. + :type trace: function (str, object) -> NoneType :type load: float :type mrr: float :type spread: float @@ -287,24 +321,26 @@ class PLRsearch(object): # TODO: The stretch sign is just to have less minuses. Worth changing? chi = (mrr - load) / spread chi0 = mrr / spread -# print "load", load, "mrr", mrr, "spread", spread, -# print "chi", chi, "chi0", chi0 + trace("Erf: load", load) + trace("mrr", mrr) + trace("spread", spread) + trace("chi", chi) + trace("chi0", chi0) if chi >= -1.0: -# print "positive, b ~> m" + trace("positive, b roughly bigger than m", None) if chi > math.exp(10): first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10) -# print "approximated" + trace("approximated first", first) else: first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi)) -# print "exact" + trace("exact first", first) first -= chi * chi second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0)) second -= chi0 * chi0 intermediate = log_minus(first, second) -# print "first", first, "second", second, -# print "intermediate", intermediate + trace("first", first) else: -# print "negative, b ~< m" + trace("negative, b roughly smaller than m", None) exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi) exp_first *= math.exp(-chi * chi) exp_first -= 2 * chi @@ -315,82 +351,68 @@ class PLRsearch(object): second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0)) second -= chi0 * chi0 intermediate = math.log(exp_first - math.exp(second)) -# print "exp_first", exp_first, "second", second, -# print "intermediate", intermediate + trace("exp_first", exp_first) + trace("second", second) + trace("intermediate", intermediate) result = intermediate + math.log(spread) - math.log(erfc(-chi0)) -# print "lfit erf result", result + trace("result", result) return result @staticmethod - def find_critical_rate(lfit_func, log_lps_target, mrr, spread): - """Given lps target and parameters, return the achieving offered load. + def find_critical_rate( + trace, lfit_func, min_rate, max_rate, loss_ratio_target, + mrr, spread): + """Given ratio target and parameters, return the achieving offered load. This is basically an inverse function to lfit_func when parameters are fixed. Instead of implementing effective implementation of the inverse function, this implementation uses - brute force binary search. - The search starts at an arbitrary offered load, - uses exponential search to find the other bound - and then bisecting until the load is found (or interval - becoming degenerate). + brute force binary search. It is bisecting (nim_rate, max_rate) interval + until the critical load is found (or interval becomes degenerate). + This implementation assures min and max rate limits are honored. - TODO: Use at least some method with faster convergence. + TODO: Use some method with faster convergence? + :param trace: A multiprocessing-friendly logging function (closure). :param lfit_func: Fitting function, typically lfit_spread or lfit_erf. - :param log_lps_target: Fitting function should return this - at the returned load and parameters. - :param mrr: The mrr parameter for the fitting function. - :param spread: The spread parameter for the fittinmg function. + :param min_rate: Lower bound for binary search [pps]. + :param max_rate: Upper bound for binary search [pps]. + :param loss_ratio_target: Fitting function should return loss rate + giving this ratio at the returned load and parameters [1]. + :param mrr: The mrr parameter for the fitting function [pps]. + :param spread: The spread parameter for the fittinmg function [pps]. + :type trace: function (str, object) -> None :type lfit_func: Function from 3 floats to float. + :type min_rate: float + :type max_rate: float :type log_lps_target: float :type mrr: float :type spread: float :returns: Load [pps] which achieves the target with given parameters. :rtype: float """ - # TODO: Should we make the initial rate configurable? - rate = 10000000.0 - log_loss = lfit_func(rate, mrr, spread) - if log_loss == log_lps_target: - return rate - # Exponential search part. - if log_loss > log_lps_target: - rate_hi = rate - while 1: - rate_lo = rate_hi / 2.0 - log_loss = lfit_func(rate_lo, mrr, spread) - if log_loss > log_lps_target: - rate_hi = rate_lo - continue - if log_loss == log_lps_target: - return rate_lo - break - else: - rate_lo = rate - while 1: - rate_hi = rate_lo * 2.0 - log_loss = lfit_func(rate_hi, mrr, spread) - if log_loss < log_lps_target: - rate_lo = rate_hi - continue - if log_loss == log_lps_target: - return rate_hi - break - # Binary search part. - while rate_hi != rate_lo: + trace("Finding critical rate for loss_ratio_target", loss_ratio_target) + rate_lo = min_rate + rate_hi = max_rate + loss_ratio = -1 + while loss_ratio != loss_ratio_target: rate = (rate_hi + rate_lo) / 2.0 - log_loss = lfit_func(rate, mrr, spread) - if rate == rate_hi or rate == rate_lo or log_loss == log_lps_target: -# print "found", rate - return rate - if log_loss > log_lps_target: + if rate == rate_hi or rate == rate_lo: + break + loss_rate = math.exp(lfit_func(trace, rate, mrr, spread)) + loss_ratio = loss_rate / rate + if loss_ratio > loss_ratio_target: + trace("halving down", rate) rate_hi = rate - else: + elif loss_ratio < loss_ratio_target: + trace("halving up", rate) rate_lo = rate + trace("found", rate) + return rate @staticmethod - def log_weight(lfit_func, trial_result_list, mrr, spread): + def log_weight(trace, lfit_func, trial_result_list, mrr, spread): """Return log of weight of trial results by the function and parameters. Integrator assumes uniform distribution, but over different parameters. @@ -399,15 +421,18 @@ class PLRsearch(object): Each trial has an offered load, a duration and a loss count. Fitting function is used to compute the average loss per second. Poisson distribution (with average loss per trial) is used - to get likelihood of one trial result, the overal likelihood is product. + to get likelihood of one trial result, the overal likelihood + is a product of all trial likelihoods. As likelihoods can be extremely small, logarithms are tracked instead. TODO: Copy ReceiveRateMeasurement from MLRsearch. + :param trace: A multiprocessing-friendly logging function (closure). :param lfit_func: Fitting function, typically lfit_spread or lfit_erf. :param result_list: List of trial measurement results. :param mrr: The mrr parameter for the fitting function. :param spread: The spread parameter for the fittinmg function. + :type trace: function (str, object) -> None :type lfit_func: Function from 3 floats to float. :type result_list: list of MLRsearch.ReceiveRateMeasurement :type mrr: float @@ -416,10 +441,14 @@ class PLRsearch(object): :rtype: float """ log_likelihood = 0.0 + trace("log_weight for mrr", mrr) + trace("spread", spread) for result in trial_result_list: -# print "DEBUG for tr", result.target_tr, -# print "lc", result.loss_count, "d", result.duration - log_avg_loss_per_second = lfit_func(result.target_tr, mrr, spread) + trace("for tr", result.target_tr) + trace("lc", result.loss_count) + trace("d", result.duration) + log_avg_loss_per_second = lfit_func( + trace, result.target_tr, mrr, spread) log_avg_loss_per_trial = ( log_avg_loss_per_second + math.log(result.duration)) # Poisson probability computation works nice for logarithms. @@ -428,20 +457,15 @@ class PLRsearch(object): - math.exp(log_avg_loss_per_trial)) log_trial_likelihood -= math.lgamma(1 + result.loss_count) log_likelihood += log_trial_likelihood -# print "log_avg_loss_per_second", log_avg_loss_per_second -# print "log_avg_loss_per_trial", log_avg_loss_per_trial -# print "avg_loss_per_trial", math.exp(log_avg_loss_per_trial) -# print "log_trial_likelihood", log_trial_likelihood -# print "log_likelihood", log_likelihood -# print "returning log_likelihood", log_likelihood + trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial)) + trace("log_trial_likelihood", log_trial_likelihood) return log_likelihood - # FIXME: Refactor (somehow) so pylint stops complaining about + # TODO: Refactor (somehow) so pylint stops complaining about # too many local variables. - # Some work already done in subsequent changes. def measure_and_compute( - self, trial_duration, transmit_rate, - trial_result_list, max_rate, integrator_data): + self, trial_duration, transmit_rate, trial_result_list, + min_rate, max_rate, focus_trackers=(None, None), max_samples=None): """Perform both measurement and computation at once. High level steps: Prepare and launch computation worker processes, @@ -455,17 +479,18 @@ class PLRsearch(object): Two fitting functions are used, computation is started on temporary worker process per fitting function. After the measurement, - Average and stdev of the critical rate (not log) of each worker + average and stdev of the critical rate (not log) of each worker are combined and returned. Raw averages are also returned, - offered load for next iteration is chosen from them. + offered load for next iteration is chosen based on them. The idea is that one fitting function might be fitting much better, measurements at its avg are best for relevant results (for both), but we do not know which fitting function it is. - TODO: Define class for integrator data, so that fields are documented. + Focus trackers are updated in-place. If a focus tracker in None, + new instance is created. + TODO: Define class for result object, so that fields are documented. - TODO: More processes to the slower fitting function? - TODO: Re-use processes, instead creating on each computation. + TODO: Re-use processes, instead creating on each computation? TODO: As only one result is needed fresh, figure out a way how to keep the other worker running. This will alow shorter duration per trial. Special handling at first and last measurement @@ -474,23 +499,39 @@ class PLRsearch(object): :param trial_duration: Length of the measurement in seconds. :param transmit_rate: Offered load in packets per second. :param trial_result_list: Results of previous measurements. - :param max_rate: Theoretic maximum of possible ofered load. - :param integrator_data: Hints to speed up the numeric computation. + :param min_rate: Practical minimum of possible ofered load. + :param max_rate: Practical maximum of possible ofered load. + :param focus_trackers: Pair of trackers initialized + to speed up the numeric computation. + :param max_samples: Limit for integrator samples, for debugging. :type trial_duration: float :type transmit_rate: float :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement + :type min_rate: float :type max_rate: float - :type integrator_data: 4-tuple of gaussian positions and covariances + :type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker + :type max_samples: None or int :returns: Measurement and computation results. - :rtype: 6-tuple: ReceiveRateMeasurement, floats, integrator data. + :rtype: 6-tuple: ReceiveRateMeasurement, 4 floats, 2-tuple of trackers. """ + logging.debug( + "measure_and_compute started with self %(self)r, trial_duration " + + "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, " + + "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r", + {"self": self, "dur": trial_duration, "tr": transmit_rate, + "trl": trial_result_list, "mr": max_rate, "track": focus_trackers, + "ms": max_samples}) # Preparation phase. dimension = 2 - stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov = ( - integrator_data) - packet_loss_per_second_target = ( - transmit_rate * self.packet_loss_ratio_target) - def start_computing(fitting_function, bias_avg, bias_cov): + stretch_focus_tracker, erf_focus_tracker = focus_trackers + if stretch_focus_tracker is None: + stretch_focus_tracker = stat_trackers.VectorStatTracker(dimension) + stretch_focus_tracker.unit_reset() + if erf_focus_tracker is None: + erf_focus_tracker = stat_trackers.VectorStatTracker(dimension) + erf_focus_tracker.unit_reset() + old_trackers = stretch_focus_tracker.copy(), erf_focus_tracker.copy() + def start_computing(fitting_function, focus_tracker): """Just a block of code to be used for each fitting function. Define function for integrator, create process and pipe ends, @@ -505,7 +546,7 @@ class PLRsearch(object): :returns: Boss end of communication pipe. :rtype: multiprocessing.Connection """ - def value_logweight_func(x_mrr, x_spread): + def value_logweight_func(trace, x_mrr, x_spread): """Return log of critical rate and log of likelihood. This is a closure. The ancestor function got @@ -525,72 +566,101 @@ class PLRsearch(object): TODO: x^-2 (for x>1.0) might be simpler/nicer prior. + :param trace: Multiprocessing-safe logging function (closure). :param x_mrr: The first dimensionless param from (-1, 1) interval. :param x_spread: The second dimensionless param from (-1, 1) interval. + :type trace: function (str, object) -> None + :type x_mrr: float + :type x_spread: float :returns: Log of critical rate [pps] and log of likelihood. :rtype: 2-tuple of float """ mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0 spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr)) -# print "mrr", mrr, "spread", spread logweight = self.log_weight( - fitting_function, trial_result_list, mrr, spread) + trace, fitting_function, trial_result_list, mrr, spread) value = math.log(self.find_critical_rate( - fitting_function, - math.log(packet_loss_per_second_target), mrr, spread)) + trace, fitting_function, min_rate, max_rate, + self.packet_loss_ratio_target, mrr, spread)) return value, logweight dilled_function = dill.dumps(value_logweight_func) boss_pipe_end, worker_pipe_end = multiprocessing.Pipe() boss_pipe_end.send( - (dimension, dilled_function, bias_avg, bias_cov)) + (dimension, dilled_function, focus_tracker, max_samples)) worker = multiprocessing.Process( - target=Integrator.try_estimate_nd, args=(worker_pipe_end,)) + target=Integrator.try_estimate_nd, args=( + worker_pipe_end, 10.0, self.trace_enabled)) worker.daemon = True worker.start() return boss_pipe_end erf_pipe = start_computing( - self.lfit_erf, erf_bias_avg, erf_bias_cov) + self.lfit_erf, erf_focus_tracker) stretch_pipe = start_computing( - self.lfit_stretch, stretch_bias_avg, stretch_bias_cov) + self.lfit_stretch, stretch_focus_tracker) # Measurement phase. measurement = self.measurer.measure(trial_duration, transmit_rate) # Processing phase. - erf_pipe.send(None) - stretch_pipe.send(None) - if not stretch_pipe.poll(1.0): - raise RuntimeError("Stretch worker did not finish!") - result_or_traceback = stretch_pipe.recv() - try: - (stretch_avg, stretch_stdev, stretch_bias_avg, - stretch_bias_cov, debug_list, _) = result_or_traceback - except ValueError: - raise RuntimeError( - "Stretch worker failed with the following traceback:\n{tr}" - .format(tr=result_or_traceback)) - logging.info("Logs from stretch worker:") - for message in debug_list: - logging.debug(message) - if not erf_pipe.poll(1.0): - raise RuntimeError("Erf worker did not finish!") - result_or_traceback = erf_pipe.recv() - try: - (erf_avg, erf_stdev, erf_bias_avg, - erf_bias_cov, debug_list, _) = result_or_traceback - except ValueError: - raise RuntimeError( - "Erf worker failed with the following traceback:\n{tr}" - .format(tr=result_or_traceback)) - logging.info("Logs from erf worker:") - for message in debug_list: - logging.debug(message) + def stop_computing(name, pipe): + """Just a block of code to be used for each worker. + + Send stop object, poll for result, then either + unpack response, log messages and return, or raise traceback. + + TODO: Define class/structure for the return value? + + :param name: Human friendly worker identifier for logging purposes. + :param pipe: Boss end of connection towards worker to stop. + :type name: str + :type pipe: multiprocessing.Connection + :returns: Computed value tracker, actual focus tracker, + and number of samples used for this iteration. + :rtype: 3-tuple of tracker, tracker and int + """ + pipe.send(None) + if not pipe.poll(10.0): + raise RuntimeError( + "Worker {name} did not finish!".format(name=name)) + result_or_traceback = pipe.recv() + try: + value_tracker, focus_tracker, debug_list, trace_list, sampls = ( + result_or_traceback) + except ValueError: + raise RuntimeError( + "Worker {name} failed with the following traceback:\n{tr}" + .format(name=name, tr=result_or_traceback)) + logging.info("Logs from worker %(name)r:", {"name": name}) + for message in debug_list: + logging.info(message) + for message in trace_list: + logging.debug(message) + logging.debug("trackers: value %(val)r focus %(foc)r", { + "val": value_tracker, "foc": focus_tracker}) + return value_tracker, focus_tracker, sampls + stretch_value_tracker, stretch_focus_tracker, stretch_samples = ( + stop_computing("stretch", stretch_pipe)) + erf_value_tracker, erf_focus_tracker, erf_samples = ( + stop_computing("erf", erf_pipe)) + stretch_avg = stretch_value_tracker.average + erf_avg = erf_value_tracker.average + # TODO: Take into account secondary stats. + stretch_stdev = math.exp(stretch_value_tracker.log_variance / 2) + erf_stdev = math.exp(erf_value_tracker.log_variance / 2) avg = math.exp((stretch_avg + erf_avg) / 2.0) var = (stretch_stdev * stretch_stdev + erf_stdev * erf_stdev) / 2.0 var += (stretch_avg - erf_avg) * (stretch_avg - erf_avg) / 4.0 stdev = avg * math.sqrt(var) - integrator_data = ( - stretch_bias_avg, erf_bias_avg, stretch_bias_cov, erf_bias_cov) + focus_trackers = (stretch_focus_tracker, erf_focus_tracker) + logging.info( + "measure_and_compute finished with trial result %(res)r " + "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r " + "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r " + "erf samples %(es)r", + {"res": measurement, "avg": avg, "stdev": stdev, + "a1": math.exp(stretch_avg), "a2": math.exp(erf_avg), + "nt": focus_trackers, "ot": old_trackers, "ss": stretch_samples, + "es": erf_samples}) return ( measurement, avg, stdev, math.exp(stretch_avg), - math.exp(erf_avg), integrator_data) + math.exp(erf_avg), focus_trackers) |