Diffstat (limited to 'resources/libraries/python/PLRsearch/PLRsearch.py')
-rw-r--r--  resources/libraries/python/PLRsearch/PLRsearch.py  221
1 file changed, 138 insertions, 83 deletions
diff --git a/resources/libraries/python/PLRsearch/PLRsearch.py b/resources/libraries/python/PLRsearch/PLRsearch.py
index 226b482d76..326aa2e2d2 100644
--- a/resources/libraries/python/PLRsearch/PLRsearch.py
+++ b/resources/libraries/python/PLRsearch/PLRsearch.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020 Cisco and/or its affiliates.
+# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -53,8 +53,14 @@ class PLRsearch:
log_xerfcx_10 = math.log(xerfcx_limit - math.exp(10) * erfcx(math.exp(10)))
def __init__(
- self, measurer, trial_duration_per_trial, packet_loss_ratio_target,
- trial_number_offset=0, timeout=7200.0, trace_enabled=False):
+ self,
+ measurer,
+ trial_duration_per_trial,
+ packet_loss_ratio_target,
+ trial_number_offset=0,
+ timeout=7200.0,
+ trace_enabled=False,
+ ):
"""Store rate measurer and additional parameters.
The measurer must never report a negative loss count.
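For orientation, a hypothetical instantiation of the reformatted constructor; the measurer object and the search() entry point with (min_rate, max_rate) arguments are assumptions based on the surrounding code, not shown in this diff:

    # Hypothetical usage sketch; measurer is assumed to be an
    # MLRsearch-compatible rate measurer supplied by the caller.
    plrsearch = PLRsearch(
        measurer=measurer,
        trial_duration_per_trial=5.0,
        packet_loss_ratio_target=1e-7,
        trial_number_offset=0,
        timeout=7200.0,
        trace_enabled=False,
    )
    average, stdev = plrsearch.search(min_rate=1e4, max_rate=1e7)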
@@ -176,7 +182,7 @@ class PLRsearch:
f"Started search with min_rate {min_rate!r}, "
f"max_rate {max_rate!r}"
)
- trial_result_list = list()
+ trial_result_list = []
trial_number = self.trial_number_offset
focus_trackers = (None, None)
transmit_rate = (min_rate + max_rate) / 2.0
@@ -186,34 +192,54 @@ class PLRsearch:
trial_number += 1
logging.info(f"Trial {trial_number!r}")
results = self.measure_and_compute(
- self.trial_duration_per_trial * trial_number, transmit_rate,
- trial_result_list, min_rate, max_rate, focus_trackers
+ self.trial_duration_per_trial * trial_number,
+ transmit_rate,
+ trial_result_list,
+ min_rate,
+ max_rate,
+ focus_trackers,
)
measurement, average, stdev, avg1, avg2, focus_trackers = results
+ # Workaround for unsent packets and other anomalies.
+ measurement.plr_loss_count = min(
+ measurement.intended_count,
+ int(measurement.intended_count * measurement.loss_ratio + 0.9),
+ )
+ logging.debug(
+ f"loss ratio {measurement.plr_loss_count}"
+ f" / {measurement.intended_count}"
+ )
zeros += 1
# TODO: Ratio of fill rate to drain rate seems to have
# exponential impact. Make it configurable, or is 4:3 good enough?
- if measurement.loss_fraction >= self.packet_loss_ratio_target:
+ if measurement.plr_loss_count >= (
+ measurement.intended_count * self.packet_loss_ratio_target
+ ):
for _ in range(4 * zeros):
- lossy_loads.append(measurement.target_tr)
- if measurement.loss_count > 0:
+ lossy_loads.append(measurement.intended_load)
+ lossy_loads.sort()
zeros = 0
- lossy_loads.sort()
+ logging.debug("High enough loss, lossy loads added.")
+ else:
+ logging.debug(
+ f"Not a high loss, zero counter bumped to {zeros}."
+ )
if stop_time <= time.time():
return average, stdev
trial_result_list.append(measurement)
if (trial_number - self.trial_number_offset) <= 1:
next_load = max_rate
elif (trial_number - self.trial_number_offset) <= 3:
- next_load = (measurement.relative_receive_rate / (
- 1.0 - self.packet_loss_ratio_target))
+ next_load = measurement.relative_forwarding_rate / (
+ 1.0 - self.packet_loss_ratio_target
+ )
else:
next_load = (avg1 + avg2) / 2.0
if zeros > 0:
if lossy_loads[0] > next_load:
diminisher = math.pow(2.0, 1 - zeros)
next_load = lossy_loads[0] + diminisher * next_load
- next_load /= (1.0 + diminisher)
+ next_load /= 1.0 + diminisher
# On zero measurement, we need to drain obsoleted low losses
# even if we did not use them to increase next_load,
# in order to get to usable losses at higher loads.
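Two numeric tricks in the hunk above are easy to miss: the loss count is rounded up (the +0.9 bias) but clamped to the intended count, and the next load is a weighted average that leans harder toward the lowest lossy load as zero-loss trials accumulate. A standalone sketch of the same arithmetic (function names are illustrative, not from the module):

    import math

    def clamped_loss_count(intended_count, loss_ratio):
        # Round the loss count up (+0.9 bias) to stay pessimistic,
        # but never report more losses than packets intended.
        return min(intended_count, int(intended_count * loss_ratio + 0.9))

    def pulled_next_load(next_load, lowest_lossy_load, zeros):
        # Weighted average of the lowest lossy load (weight 1) and the
        # computed next load (weight 2**(1 - zeros)); each consecutive
        # zero-loss trial halves the diminisher, pulling the result
        # closer to the lossy load.
        diminisher = math.pow(2.0, 1 - zeros)
        return (lowest_lossy_load + diminisher * next_load) / (1.0 + diminisher)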
@@ -263,22 +289,22 @@ class PLRsearch:
# TODO: chi is from https://en.wikipedia.org/wiki/Nondimensionalization
chi = (load - mrr) / spread
chi0 = -mrr / spread
- trace(u"stretch: load", load)
- trace(u"mrr", mrr)
- trace(u"spread", spread)
- trace(u"chi", chi)
- trace(u"chi0", chi0)
+ trace("stretch: load", load)
+ trace("mrr", mrr)
+ trace("spread", spread)
+ trace("chi", chi)
+ trace("chi0", chi0)
if chi > 0:
log_lps = math.log(
load - mrr + (log_plus(0, -chi) - log_plus(0, chi0)) * spread
)
- trace(u"big loss direct log_lps", log_lps)
+ trace("big loss direct log_lps", log_lps)
else:
two_positive = log_plus(chi, 2 * chi0 - log_2)
two_negative = log_plus(chi0, 2 * chi - log_2)
if two_positive <= two_negative:
log_lps = log_minus(chi, chi0) + log_spread
- trace(u"small loss crude log_lps", log_lps)
+ trace("small loss crude log_lps", log_lps)
return log_lps
two = log_minus(two_positive, two_negative)
three_positive = log_plus(two_positive, 3 * chi - log_3)
@@ -286,11 +312,11 @@ class PLRsearch:
three = log_minus(three_positive, three_negative)
if two == three:
log_lps = two + log_spread
- trace(u"small loss approx log_lps", log_lps)
+ trace("small loss approx log_lps", log_lps)
else:
log_lps = math.log(log_plus(0, chi) - log_plus(0, chi0))
log_lps += log_spread
- trace(u"small loss direct log_lps", log_lps)
+ trace("small loss direct log_lps", log_lps)
return log_lps
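The log_plus and log_minus helpers used throughout are imported from elsewhere in the package (not shown in this diff); a minimal sketch of what they presumably compute, done stably in the log domain:

    import math

    def log_plus(first, second):
        # log(exp(first) + exp(second)) without overflow:
        # factor out the larger exponent.
        if first < second:
            first, second = second, first
        return first + math.log1p(math.exp(second - first))

    def log_minus(first, second):
        # log(exp(first) - exp(second)); requires first > second.
        return first + math.log1p(-math.exp(second - first))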
@staticmethod
@@ -329,26 +355,26 @@ class PLRsearch:
# TODO: The stretch sign is just to have fewer minuses. Worth changing?
chi = (mrr - load) / spread
chi0 = mrr / spread
- trace(u"Erf: load", load)
- trace(u"mrr", mrr)
- trace(u"spread", spread)
- trace(u"chi", chi)
- trace(u"chi0", chi0)
+ trace("Erf: load", load)
+ trace("mrr", mrr)
+ trace("spread", spread)
+ trace("chi", chi)
+ trace("chi0", chi0)
if chi >= -1.0:
- trace(u"positive, b roughly bigger than m", None)
+ trace("positive, b roughly bigger than m", None)
if chi > math.exp(10):
first = PLRsearch.log_xerfcx_10 + 2 * (math.log(chi) - 10)
- trace(u"approximated first", first)
+ trace("approximated first", first)
else:
first = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi))
- trace(u"exact first", first)
+ trace("exact first", first)
first -= chi * chi
second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
second -= chi0 * chi0
intermediate = log_minus(first, second)
- trace(u"first", first)
+ trace("first", first)
else:
- trace(u"negative, b roughly smaller than m", None)
+ trace("negative, b roughly smaller than m", None)
exp_first = PLRsearch.xerfcx_limit + chi * erfcx(-chi)
exp_first *= math.exp(-chi * chi)
exp_first -= 2 * chi
@@ -359,17 +385,17 @@ class PLRsearch:
second = math.log(PLRsearch.xerfcx_limit - chi * erfcx(chi0))
second -= chi0 * chi0
intermediate = math.log(exp_first - math.exp(second))
- trace(u"exp_first", exp_first)
- trace(u"second", second)
- trace(u"intermediate", intermediate)
+ trace("exp_first", exp_first)
+ trace("second", second)
+ trace("intermediate", intermediate)
result = intermediate + math.log(spread) - math.log(erfc(-chi0))
- trace(u"result", result)
+ trace("result", result)
return result
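The constant behind this arithmetic: erfcx(x) = exp(x*x) * erfc(x) behaves like 1/(x*sqrt(pi)) for large x, so x*erfcx(x) tends to 1/sqrt(pi), which is presumably the value of the class-level xerfcx_limit. A quick numerical check, assuming erfcx comes from scipy.special as the erfcx/erfc calls above suggest:

    import math
    from scipy.special import erfcx

    xerfcx_limit = 1.0 / math.sqrt(math.pi)  # limit of x * erfcx(x)
    for x in (1e1, 1e3, 1e5):
        # The product converges to ~0.5641895835 from below.
        print(x * erfcx(x), xerfcx_limit)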
@staticmethod
def find_critical_rate(
- trace, lfit_func, min_rate, max_rate, loss_ratio_target,
- mrr, spread):
+ trace, lfit_func, min_rate, max_rate, loss_ratio_target, mrr, spread
+ ):
"""Given ratio target and parameters, return the achieving offered load.
This is basically an inverse function to lfit_func
@@ -411,12 +437,12 @@ class PLRsearch:
loss_rate = math.exp(lfit_func(trace, rate, mrr, spread))
loss_ratio = loss_rate / rate
if loss_ratio > loss_ratio_target:
- trace(u"halving down", rate)
+ trace("halving down", rate)
rate_hi = rate
elif loss_ratio < loss_ratio_target:
- trace(u"halving up", rate)
+ trace("halving up", rate)
rate_lo = rate
- trace(u"found", rate)
+ trace("found", rate)
return rate
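The halving above is plain bisection on the (assumed monotone) loss ratio. A self-contained sketch of the same search, with illustrative names and the trace argument dropped for brevity:

    import math

    def bisect_critical_rate(lfit_func, rate_lo, rate_hi,
                             loss_ratio_target, mrr, spread, rounds=24):
        # Assumes the loss ratio grows with rate, so the target is bracketed.
        rate = (rate_lo + rate_hi) / 2.0
        for _ in range(rounds):
            rate = (rate_lo + rate_hi) / 2.0
            loss_rate = math.exp(lfit_func(rate, mrr, spread))
            loss_ratio = loss_rate / rate
            if loss_ratio > loss_ratio_target:
                rate_hi = rate  # halving down
            elif loss_ratio < loss_ratio_target:
                rate_lo = rate  # halving up
            else:
                break
        return rate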
@staticmethod
@@ -426,14 +452,22 @@ class PLRsearch:
The integrator assumes a uniform distribution, but over different parameters.
Weight and likelihood are used interchangeably here anyway.
- Each trial has an offered load, a duration and a loss count.
- Fitting function is used to compute the average loss per second.
- Poisson distribution (with average loss per trial) is used
+ Each trial has an intended load, a sent count and a loss count
+ (probably counting unsent packets as loss, as they signal
+ the load is too high for the traffic generator).
+ The fitting function is used to compute the average loss rate.
+ Geometric distribution (with average loss per trial) is used
to get the likelihood of one trial result; the overall likelihood
is a product of all trial likelihoods.
As likelihoods can be extremely small, logarithms are tracked instead.
- TODO: Copy ReceiveRateMeasurement from MLRsearch.
+ The current implementation does not use the direct loss rate
+ from the fitting function, as the input and output units may not match
+ (e.g. intended load in TCP transactions, loss in packets).
+ Instead, the expected average loss is scaled according to the number
+ of packets actually sent.
+
+ TODO: Copy MeasurementResult from MLRsearch.
:param trace: A multiprocessing-friendly logging function (closure).
:param lfit_func: Fitting function, typically lfit_spread or lfit_erf.
@@ -442,40 +476,47 @@ class PLRsearch:
:param spread: The spread parameter for the fitting function.
:type trace: function (str, object) -> None
:type lfit_func: Function from 3 floats to float.
- :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type trial_result_list: list of MLRsearch.MeasurementResult
:type mrr: float
:type spread: float
:returns: Logarithm of result weight for given function and parameters.
:rtype: float
"""
log_likelihood = 0.0
- trace(u"log_weight for mrr", mrr)
- trace(u"spread", spread)
+ trace("log_weight for mrr", mrr)
+ trace("spread", spread)
for result in trial_result_list:
- trace(u"for tr", result.target_tr)
- trace(u"lc", result.loss_count)
- trace(u"d", result.duration)
- # _rel_ values use units of target_tr (transactions per second).
+ trace("for tr", result.intended_load)
+ trace("plc", result.plr_loss_count)
+ trace("d", result.intended_duration)
+ # _rel_ values use units of intended_load (transactions per second).
log_avg_rel_loss_per_second = lfit_func(
- trace, result.target_tr, mrr, spread
+ trace, result.intended_load, mrr, spread
)
# _abs_ values use units of loss count (maybe packets).
# There can be multiple packets per transaction.
log_avg_abs_loss_per_trial = log_avg_rel_loss_per_second + math.log(
- result.transmit_count / result.target_tr
+ result.offered_count / result.intended_load
)
# Geometric probability computation for logarithms.
log_trial_likelihood = log_plus(0.0, -log_avg_abs_loss_per_trial)
- log_trial_likelihood *= -result.loss_count
+ log_trial_likelihood *= -result.plr_loss_count
log_trial_likelihood -= log_plus(0.0, +log_avg_abs_loss_per_trial)
log_likelihood += log_trial_likelihood
- trace(u"avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
- trace(u"log_trial_likelihood", log_trial_likelihood)
+ trace("avg_loss_per_trial", math.exp(log_avg_abs_loss_per_trial))
+ trace("log_trial_likelihood", log_trial_likelihood)
return log_likelihood
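To see why the two log_plus lines implement the geometric distribution: with mean loss m per trial, P(k) = (m / (1 + m))**k / (1 + m), so log P(k) = -k * log(1 + 1/m) - log(1 + m); and log_plus(0, -log m) = log(1 + 1/m) while log_plus(0, log m) = log(1 + m), matching the code above. A plain-domain check (assuming m > 0):

    import math

    def log_geometric(k, m):
        # log P(k) for a geometric distribution with mean m:
        # P(k) = (m / (1 + m))**k / (1 + m), k = 0, 1, 2, ...
        return -k * math.log1p(1.0 / m) - math.log1p(m)

    # Example: mean of 2 losses per trial, zero losses observed; P = 1/3.
    assert abs(log_geometric(0, 2.0) - math.log(1.0 / 3.0)) < 1e-12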
def measure_and_compute(
- self, trial_duration, transmit_rate, trial_result_list,
- min_rate, max_rate, focus_trackers=(None, None), max_samples=None):
+ self,
+ trial_duration,
+ transmit_rate,
+ trial_result_list,
+ min_rate,
+ max_rate,
+ focus_trackers=(None, None),
+ max_samples=None,
+ ):
"""Perform both measurement and computation at once.
High level steps: Prepare and launch computation worker processes,
@@ -516,7 +557,7 @@ class PLRsearch:
:param max_samples: Limit for integrator samples, for debugging.
:type trial_duration: float
:type transmit_rate: float
- :type trial_result_list: list of MLRsearch.ReceiveRateMeasurement
+ :type trial_result_list: list of MLRsearch.MeasurementResult
:type min_rate: float
:type max_rate: float
:type focus_trackers: 2-tuple of None or stat_trackers.VectorStatTracker
@@ -556,6 +597,20 @@ class PLRsearch:
:rtype: multiprocessing.Connection
"""
+ boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
+ # Start the worker first. Contrary to the documentation at
+ # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection
+ # sending a large object without an active listener on the other side
+ # results in a deadlock, not a ValueError.
+ # See https://stackoverflow.com/questions/15137292/large-objects-and-multiprocessing-pipes-and-send
+ worker = multiprocessing.Process(
+ target=Integrator.try_estimate_nd,
+ args=(worker_pipe_end, 5.0, self.trace_enabled),
+ )
+ worker.daemon = True
+ worker.start()
+
+ # Only now is it safe to send the function to compute with.
def value_logweight_func(trace, x_mrr, x_spread):
"""Return log of critical rate and log of likelihood.
@@ -594,22 +649,18 @@ class PLRsearch:
)
value = math.log(
self.find_critical_rate(
- trace, fitting_function, min_rate, max_rate,
- self.packet_loss_ratio_target, mrr, spread
+ trace,
+ fitting_function,
+ min_rate,
+ max_rate,
+ self.packet_loss_ratio_target,
+ mrr,
+ spread,
)
)
return value, logweight
dilled_function = dill.dumps(value_logweight_func)
- boss_pipe_end, worker_pipe_end = multiprocessing.Pipe()
- # Do not send yet, run the worker first to avoid a deadlock.
- # See https://stackoverflow.com/a/15716500
- worker = multiprocessing.Process(
- target=Integrator.try_estimate_nd,
- args=(worker_pipe_end, 10.0, self.trace_enabled)
- )
- worker.daemon = True
- worker.start()
boss_pipe_end.send(
(dimension, dilled_function, focus_tracker, max_samples)
)
@@ -651,14 +702,18 @@ class PLRsearch:
raise RuntimeError(f"Worker {name} did not finish!")
result_or_traceback = pipe.recv()
try:
- value_tracker, focus_tracker, debug_list, trace_list, sampls = (
- result_or_traceback
- )
- except ValueError:
+ (
+ value_tracker,
+ focus_tracker,
+ debug_list,
+ trace_list,
+ sampls,
+ ) = result_or_traceback
+ except ValueError as exc:
raise RuntimeError(
f"Worker {name} failed with the following traceback:\n"
f"{result_or_traceback}"
- )
+ ) from exc
logging.info(f"Logs from worker {name!r}:")
for message in debug_list:
logging.info(message)
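The unpack-and-chain above relies on a simple convention: the worker sends back either the 5-tuple of results or a traceback string, and unpacking a string into five names raises ValueError, which is then chained (raise ... from exc) so the original unpacking failure stays visible as __cause__. Condensed sketch of the same handling:

    def unpack_result(name, received):
        # received is either the 5-tuple of results or a traceback string.
        try:
            value_tracker, focus_tracker, debug_list, trace_list, sampls = (
                received
            )
        except ValueError as exc:
            # "from exc" keeps the unpacking failure as __cause__.
            raise RuntimeError(
                f"Worker {name} failed with the following traceback:\n"
                f"{received}"
            ) from exc
        return value_tracker, focus_tracker, debug_list, trace_list, sampls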
@@ -669,8 +724,8 @@ class PLRsearch:
)
return _PartialResult(value_tracker, focus_tracker, sampls)
- stretch_result = stop_computing(u"stretch", stretch_pipe)
- erf_result = stop_computing(u"erf", erf_pipe)
+ stretch_result = stop_computing("stretch", stretch_pipe)
+ erf_result = stop_computing("erf", erf_pipe)
result = PLRsearch._get_result(measurement, stretch_result, erf_result)
logging.info(
f"measure_and_compute finished with trial result "
@@ -692,7 +747,7 @@ class PLRsearch:
:param measurement: The trial measurement obtained during computation.
:param stretch_result: Computation output for stretch fitting function.
:param erf_result: Computation output for erf fitting function.
- :type measurement: ReceiveRateMeasurement
+ :type measurement: MeasurementResult
:type stretch_result: _PartialResult
:type erf_result: _PartialResult
:returns: Combined results.
@@ -717,7 +772,7 @@ class PLRsearch:
# Named tuples, for multiple local variables to be passed as a return value.
_PartialResult = namedtuple(
- u"_PartialResult", u"value_tracker focus_tracker samples"
+ "_PartialResult", "value_tracker focus_tracker samples"
)
"""Two stat trackers and sample counter.
@@ -730,8 +785,8 @@ _PartialResult = namedtuple(
"""
_ComputeResult = namedtuple(
- u"_ComputeResult",
- u"measurement avg stdev stretch_exp_avg erf_exp_avg trackers"
+ "_ComputeResult",
+ "measurement avg stdev stretch_exp_avg erf_exp_avg trackers",
)
"""Measurement, 4 computation result values, pair of trackers.
@@ -741,7 +796,7 @@ _ComputeResult = namedtuple(
:param stretch_exp_avg: Stretch fitting function estimate average, exponentiated.
:param erf_exp_avg: Erf fitting function estimate average, exponentiated.
:param trackers: Pair of focus trackers to start next iteration with.
-:type measurement: ReceiveRateMeasurement
+:type measurement: MeasurementResult
:type avg: float
:type stdev: float
:type stretch_exp_avg: float