aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/PLRsearch/PLRsearch.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/PLRsearch/PLRsearch.py')
-rw-r--r--resources/libraries/python/PLRsearch/PLRsearch.py204
1 files changed, 109 insertions, 95 deletions
diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py
index b7c9344391..e20d293d3c 100644
--- a/resources/libraries/python/PLRsearch/PLRsearch.py
+++ b/resources/libraries/python/PLRsearch/PLRsearch.py
@@ -17,20 +17,22 @@ import logging
import math
import multiprocessing
import time
+
from collections import namedtuple
import dill
+
from scipy.special import erfcx, erfc
# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
from . import Integrator
-from .log_plus import log_plus, log_minus
from . import stat_trackers
+from .log_plus import log_plus, log_minus
-class PLRsearch(object):
+class PLRsearch:
"""A class to encapsulate data relevant for the search method.
The context is performance testing of packet processing systems.
@@ -41,7 +43,7 @@ class PLRsearch(object):
Two constants are stored as class fields for speed.
- Method othed than search (and than __init__)
+ Method other than search (and than __init__)
are just internal code structure.
TODO: Those method names should start with underscore then.
@@ -168,20 +170,23 @@ class PLRsearch(object):
stop_time = time.time() + self.timeout
min_rate = float(min_rate)
max_rate = float(max_rate)
- logging.info("Started search with min_rate %(min)r, max_rate %(max)r",
- {"min": min_rate, "max": max_rate})
+ logging.info(
+ f"Started search with min_rate {min_rate!r}, "
+ f"max_rate {max_rate!r}"
+ )
trial_result_list = list()
trial_number = self.trial_number_offset
focus_trackers = (None, None)
transmit_rate = (min_rate + max_rate) / 2.0
lossy_loads = [max_rate]
- zeros = 0 # How many cosecutive zero loss results are happening.
+ zeros = 0 # How many consecutive zero loss results are happening.
while 1:
trial_number += 1
- logging.info("Trial %(number)r", {"number": trial_number})
+ logging.info(f"Trial {trial_number!r}")
results = self.measure_and_compute(
self.trial_duration_per_trial * trial_number, transmit_rate,
- trial_result_list, min_rate, max_rate, focus_trackers)
+ trial_result_list, min_rate, max_rate, focus_trackers
+ )
measurement, average, stdev, avg1, avg2, focus_trackers = results
zeros += 1
# TODO: Ratio of fill rate to drain rate seems to have
@@ -212,9 +217,10 @@ class PLRsearch(object):
# in order to get to usable loses at higher loads.
if len(lossy_loads) > 3:
lossy_loads = lossy_loads[3:]
- logging.debug("Zeros %(z)r orig %(o)r next %(n)r loads %(s)r",
- {"z": zeros, "o": (avg1 + avg2) / 2.0,
- "n": next_load, "s": lossy_loads})
+ logging.debug(
+ f"Zeros {zeros!r} orig {(avg1 + avg2) / 2.0!r} "
+ f"next {next_load!r} loads {lossy_loads!r}"
+ )
transmit_rate = min(max_rate, max(min_rate, next_load))
@staticmethod
@@ -255,21 +261,22 @@ class PLRsearch(object):
# TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
chi = (load - mrr) / spread
chi0 = -mrr / spread
- trace("stretch: load", load)
- trace("mrr", mrr)
- trace("spread", spread)
- trace("chi", chi)
- trace("chi0", chi0)
+ trace(u"stretch: load", load)
+ trace(u"mrr", mrr)
+ trace(u"spread", spread)
+ trace(u"chi", chi)
+ trace(u"chi0", chi0)
if chi > 0:
log_lps = math.log(
- load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread)
- trace("big loss direct log_lps", log_lps)
+ load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
+ )
+ trace(u"big loss direct log_lps", log_lps)
else:
two_positive = log_plus(chi, 2 * chi0 - log_2)
two_negative = log_plus(chi0, 2 * chi - log_2)
if two_positive <= two_negative:
log_lps = log_minus(chi, chi0) + log_spread
- trace("small loss crude log_lps", log_lps)
+ trace(u"small loss crude log_lps", log_lps)
return log_lps
two = log_minus(two_positive, two_negative)
three_positive = log_plus(two_positive, 3 * chi - log_3)
@@ -277,11 +284,11 @@ class PLRsearch(object):
three = log_minus(three_positive, three_negative)
if two == three:
log_lps = two + log_spread
- trace("small loss approx log_lps", log_lps)
+ trace(u"small loss approx log_lps", log_lps)
else:
log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
log_lps += log_spread
- trace("small loss direct log_lps", log_lps)
+ trace(u"small loss direct log_lps", log_lps)
return log_lps
@staticmethod
@@ -320,26 +327,26 @@ class PLRsearch(object):
# TODO: The stretch sign is just to have less minuses. Worth changing?
chi = (mrr - load) / spread
chi0 = mrr / spread
- trace("Erf: load", load)
- trace("mrr", mrr)
- trace("spread", spread)
- trace("chi", chi)
- trace("chi0", chi0)
+ trace(u"Erf: load", load)
+ trace(u"mrr", mrr)
+ trace(u"spread", spread)
+ trace(u"chi", chi)
+ trace(u"chi0", chi0)
if chi >= -1.0:
- trace("positive, b roughly bigger than m", None)
+ trace(u"positive, b roughly bigger than m", None)
if chi > math.exp(10):
first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
- trace("approximated first", first)
+ trace(u"approximated first", first)
else:
first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
- trace("exact first", first)
+ trace(u"exact first", first)
first -= chi * chi
second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
second -= chi0 * chi0
intermediate = log_minus(first, second)
- trace("first", first)
+ trace(u"first", first)
else:
- trace("negative, b roughly smaller than m", None)
+ trace(u"negative, b roughly smaller than m", None)
exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
exp_first *= math.exp(-chi * chi)
exp_first -= 2 * chi
@@ -350,11 +357,11 @@ class PLRsearch(object):
second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
second -= chi0 * chi0
intermediate = math.log(exp_first - math.exp(second))
- trace("exp_first", exp_first)
- trace("second", second)
- trace("intermediate", intermediate)
+ trace(u"exp_first", exp_first)
+ trace(u"second", second)
+ trace(u"intermediate", intermediate)
result = intermediate + math.log(spread) - math.log(erfc(-chi0))
- trace("result", result)
+ trace(u"result", result)
return result
@staticmethod
@@ -385,7 +392,7 @@ class PLRsearch(object):
:type lfit_func: Function from 3 floats to float.
:type min_rate: float
:type max_rate: float
- :type log_lps_target: float
+ :type loss_ratio_target: float
:type mrr: float
:type spread: float
:returns: Load [pps] which achieves the target with given parameters.
@@ -397,17 +404,17 @@ class PLRsearch(object):
loss_ratio = -1
while loss_ratio != loss_ratio_target:
rate = (rate_hi + rate_lo) / 2.0
- if rate == rate_hi or rate == rate_lo:
+ if rate in (rate_hi, rate_lo):
break
loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
loss_ratio = loss_rate / rate
if loss_ratio > loss_ratio_target:
- trace("halving down", rate)
+ trace(u"halving down", rate)
rate_hi = rate
elif loss_ratio < loss_ratio_target:
- trace("halving up", rate)
+ trace(u"halving up", rate)
rate_lo = rate
- trace("found", rate)
+ trace(u"found", rate)
return rate
@staticmethod
@@ -428,36 +435,39 @@ class PLRsearch(object):
:param trace: A multiprocessing-friendly logging function (closure).
:param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
- :param result_list: List of trial measurement results.
+ :param trial_result_list: List of trial measurement results.
:param mrr: The mrr parameter for the fitting function.
:param spread: The spread parameter for the fittinmg function.
:type trace: function (str, object) -> None
:type lfit_func: Function from 3 floats to float.
- :type result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
:type mrr: float
:type spread: float
:returns: Logarithm of result weight for given function and parameters.
:rtype: float
"""
log_likelihood = 0.0
- trace("log_weight for mrr", mrr)
- trace("spread", spread)
+ trace(u"log_weight for mrr", mrr)
+ trace(u"spread", spread)
for result in trial_result_list:
- trace("for tr", result.target_tr)
- trace("lc", result.loss_count)
- trace("d", result.duration)
+ trace(u"for tr", result.target_tr)
+ trace(u"lc", result.loss_count)
+ trace(u"d", result.duration)
log_avg_loss_per_second = lfit_func(
- trace, result.target_tr, mrr, spread)
+ trace, result.target_tr, mrr, spread
+ )
log_avg_loss_per_trial = (
- log_avg_loss_per_second + math.log(result.duration))
+ log_avg_loss_per_second + math.log(result.duration)
+ )
# Poisson probability computation works nice for logarithms.
log_trial_likelihood = (
result.loss_count * log_avg_loss_per_trial
- - math.exp(log_avg_loss_per_trial))
+ - math.exp(log_avg_loss_per_trial)
+ )
log_trial_likelihood -= math.lgamma(1 + result.loss_count)
log_likelihood += log_trial_likelihood
- trace("avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
- trace("log_trial_likelihood", log_trial_likelihood)
+ trace(u"avg_loss_per_trial", math.exp(log_avg_loss_per_trial))
+ trace(u"log_trial_likelihood", log_trial_likelihood)
return log_likelihood
def measure_and_compute(
@@ -512,12 +522,11 @@ class PLRsearch(object):
:rtype: _ComputeResult
"""
logging.debug(
- "measure_and_compute started with self %(self)r, trial_duration "
- "%(dur)r, transmit_rate %(tr)r, trial_result_list %(trl)r, "
- "max_rate %(mr)r, focus_trackers %(track)r, max_samples %(ms)r",
- {"self": self, "dur": trial_duration, "tr": transmit_rate,
- "trl": trial_result_list, "mr": max_rate, "track": focus_trackers,
- "ms": max_samples})
+ f"measure_and_compute started with self {self!r}, trial_duration "
+ f"{trial_duration!r}, transmit_rate {transmit_rate!r}, "
+ f"trial_result_list {trial_result_list!r}, max_rate {max_rate!r}, "
+ f"focus_trackers {focus_trackers!r}, max_samples {max_samples!r}"
+ )
# Preparation phase.
dimension = 2
stretch_focus_tracker, erf_focus_tracker = focus_trackers
@@ -536,11 +545,10 @@ class PLRsearch(object):
start computation, return the boss pipe end.
:param fitting_function: lfit_erf or lfit_stretch.
- :param bias_avg: Tuple of floats to start searching around.
- :param bias_cov: Covariance matrix defining initial focus shape.
+ :param focus_tracker: Tracker initialized to speed up the numeric
+ computation.
:type fitting_function: Function from 3 floats to float.
- :type bias_avg: 2-tuple of floats
- :type bias_cov: 2-tuple of 2-tuples of floats
+ :type focus_tracker: None or stat_trackers.VectorStatTracker
:returns: Boss end of communication pipe.
:rtype: multiprocessing.Connection
"""
@@ -579,27 +587,31 @@ class PLRsearch(object):
mrr = max_rate * (1.0 / (x_mrr + 1.0) - 0.5) + 1.0
spread = math.exp((x_spread + 1.0) / 2.0 * math.log(mrr))
logweight = self.log_weight(
- trace, fitting_function, trial_result_list, mrr, spread)
- value = math.log(self.find_critical_rate(
- trace, fitting_function, min_rate, max_rate,
- self.packet_loss_ratio_target, mrr, spread))
+ trace, fitting_function, trial_result_list, mrr, spread
+ )
+ value = math.log(
+ self.find_critical_rate(
+ trace, fitting_function, min_rate, max_rate,
+ self.packet_loss_ratio_target, mrr, spread
+ )
+ )
return value, logweight
dilled_function = dill.dumps(value_logweight_func)
boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
boss_pipe_end.send(
- (dimension, dilled_function, focus_tracker, max_samples))
+ (dimension, dilled_function, focus_tracker, max_samples)
+ )
worker = multiprocessing.Process(
- target=Integrator.try_estimate_nd, args=(
- worker_pipe_end, 10.0, self.trace_enabled))
+ target=Integrator.try_estimate_nd,
+ args=(worker_pipe_end, 10.0, self.trace_enabled)
+ )
worker.daemon = True
worker.start()
return boss_pipe_end
- erf_pipe = start_computing(
- self.lfit_erf, erf_focus_tracker)
- stretch_pipe = start_computing(
- self.lfit_stretch, stretch_focus_tracker)
+ erf_pipe = start_computing(self.lfit_erf, erf_focus_tracker)
+ stretch_pipe = start_computing(self.lfit_stretch, stretch_focus_tracker)
# Measurement phase.
measurement = self.measurer.measure(trial_duration, transmit_rate)
@@ -623,38 +635,38 @@ class PLRsearch(object):
"""
pipe.send(None)
if not pipe.poll(10.0):
- raise RuntimeError(
- "Worker {name} did not finish!".format(name=name))
+ raise RuntimeError(f"Worker {name} did not finish!")
result_or_traceback = pipe.recv()
try:
value_tracker, focus_tracker, debug_list, trace_list, sampls = (
- result_or_traceback)
+ result_or_traceback
+ )
except ValueError:
raise RuntimeError(
- "Worker {name} failed with the following traceback:\n{tr}"
- .format(name=name, tr=result_or_traceback))
- logging.info("Logs from worker %(name)r:", {"name": name})
+ f"Worker {name} failed with the following traceback:\n"
+ f"{result_or_traceback}"
+ )
+ logging.info(f"Logs from worker {name!r}:")
for message in debug_list:
logging.info(message)
for message in trace_list:
logging.debug(message)
- logging.debug("trackers: value %(val)r focus %(foc)r", {
- "val": value_tracker, "foc": focus_tracker})
+ logging.debug(
+ f"trackers: value {value_tracker!r} focus {focus_tracker!r}"
+ )
return _PartialResult(value_tracker, focus_tracker, sampls)
- stretch_result = stop_computing("stretch", stretch_pipe)
- erf_result = stop_computing("erf", erf_pipe)
+ stretch_result = stop_computing(u"stretch", stretch_pipe)
+ erf_result = stop_computing(u"erf", erf_pipe)
result = PLRsearch._get_result(measurement, stretch_result, erf_result)
logging.info(
- "measure_and_compute finished with trial result %(res)r "
- "avg %(avg)r stdev %(stdev)r stretch %(a1)r erf %(a2)r "
- "new trackers %(nt)r old trackers %(ot)r stretch samples %(ss)r "
- "erf samples %(es)r",
- {"res": result.measurement,
- "avg": result.avg, "stdev": result.stdev,
- "a1": result.stretch_exp_avg, "a2": result.erf_exp_avg,
- "nt": result.trackers, "ot": old_trackers,
- "ss": stretch_result.samples, "es": erf_result.samples})
+ f"measure_and_compute finished with trial result "
+ f"{result.measurement!r} avg {result.avg!r} stdev {result.stdev!r} "
+ f"stretch {result.stretch_exp_avg!r} erf {result.erf_exp_avg!r} "
+ f"new trackers {result.trackers!r} old trackers {old_trackers!r} "
+ f"stretch samples {stretch_result.samples!r} erf samples "
+ f"{erf_result.samples!r}"
+ )
return result
@staticmethod
@@ -692,7 +704,8 @@ class PLRsearch(object):
# Named tuples, for multiple local variables to be passed as return value.
_PartialResult = namedtuple(
- "_PartialResult", "value_tracker focus_tracker samples")
+ u"_PartialResult", u"value_tracker focus_tracker samples"
+)
"""Two stat trackers and sample counter.
:param value_tracker: Tracker for the value (critical load) being integrated.
@@ -704,8 +717,9 @@ _PartialResult = namedtuple(
"""
_ComputeResult = namedtuple(
- "_ComputeResult",
- "measurement avg stdev stretch_exp_avg erf_exp_avg trackers")
+ u"_ComputeResult",
+ u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
+)
"""Measurement, 4 computation result values, pair of trackers.
:param measurement: The trial measurement result obtained during computation.