path: root/resources/libraries/python/MLRsearch/multiple_loss_ratio_search.py
diff options
Diffstat (limited to 'resources/libraries/python/MLRsearch/multiple_loss_ratio_search.py')
1 files changed, 314 insertions, 0 deletions
diff --git a/resources/libraries/python/MLRsearch/multiple_loss_ratio_search.py b/resources/libraries/python/MLRsearch/multiple_loss_ratio_search.py
new file mode 100644
index 0000000000..b48e2e7547
--- /dev/null
+++ b/resources/libraries/python/MLRsearch/multiple_loss_ratio_search.py
@@ -0,0 +1,314 @@
+# Copyright (c) 2023 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Module defining MultipleLossRatioSearch class."""
+import logging
+import time
+from dataclasses import dataclass
+from typing import Callable, Optional, Tuple
+from .candidate import Candidate
+from .config import Config
+from .dataclass import secondary_field
+from .discrete_load import DiscreteLoad
+from .discrete_result import DiscreteResult
+from .expander import GlobalWidth
+from .limit_handler import LimitHandler
+from .load_rounding import LoadRounding
+from .measurement_database import MeasurementDatabase
+from .pep3140 import Pep3140Dict
+from .search_goal import SearchGoal
+from .selector import Selector
+from .target_scaling import TargetScaling
+from .trial_measurement import AbstractMeasurer
+from .trimmed_stat import TrimmedStat
+class MultipleLossRatioSearch:
+ """Optimized binary search algorithm for finding conditional throughputs.
+ Traditional binary search algorithm needs initial interval
+ (lower and upper bound), and returns final narrow bounds
+ (related to its search goal) after bisecting
+ (until some exit condition is met).
+ The exit condition is usually related to the interval width,
+ (upper bound value minus lower bound value).
+ The optimized algorithm in this class contains several improvements
+ aimed to reduce overall search time.
+ One improvement is searching for bounds for multiple search goals at once.
+ Specifically, the trial measurement results influence bounds for all goals,
+ even though the selection of trial inputs for next measurement
+ focuses only on one goal. The focus can switch between goals frequently.
+ Next improvement is that results of trial measurements
+ with small trial duration can be used to find a reasonable starting interval
+ for full trial duration search.
+ This results in more trials performed, but smaller overall duration
+ in general.
+ Internally, such shorter trials come from "preceding targets",
+ handled in a same way as a search goal "final target".
+ Related improvement is that the "current" interval does not need to be valid
+ (e.g. one of the bounds is missing).
+ In that case, this algorithm will move and expand the interval,
+ in a process called external search. Only when both bounds are found,
+ the interval bisection (called internal search) starts making it narrow.
+ Next improvement is bisecting in logarithmic quantities,
+ so that target relative width is independent of measurement units.
+ Next improvement is basing the initial interval on forwarding rates
+ of few initial measurements, starting at max load and using forwarding rates
+ seen so far.
+ Next improvement is to allow the use of multiple shorter trials
+ instead one big trial, allowing a percentage of trials
+ to exceed the loss ratio target.
+ This makes the result more stable in practice.
+ Conservative behavior (single long trial, zero exceed ratio)
+ is still available using corresponding goal definitions.
+ Final improvement is exiting early if the minimal load
+ is not a valid lower bound (at final duration)
+ and also exiting if the overall search duration is too long.
+ There are also subtle optimizations related to candidate selection
+ and uneven splitting of intervals, too numerous to list here.
+ """
+ config: Config
+ """Arguments required at construction time."""
+ # End of fields required at intance creation.
+ measurer: AbstractMeasurer = secondary_field()
+ """Measurer to use, set at calling search()."""
+ debug: Callable[[str], None] = secondary_field()
+ """Object to call for logging, None means logging.debug."""
+ # Fields below are computed from data above
+ rounding: LoadRounding = secondary_field()
+ """Derived from goals. Instance to use for intended load rounding."""
+ from_float: Callable[[float], DiscreteLoad] = secondary_field()
+ """Conversion method from float [tps] intended load values."""
+ limit_handler: LimitHandler = secondary_field()
+ """Load post-processing utility based on config and rounding."""
+ scaling: TargetScaling = secondary_field()
+ """Utility for creating target chains for search goals."""
+ database: MeasurementDatabase = secondary_field()
+ """Storage for (stats of) measurement results so far."""
+ stop_time: float = secondary_field()
+ """Monotonic time value at which the search should end with failure."""
+ def search(
+ self,
+ measurer: AbstractMeasurer,
+ debug: Optional[Callable[[str], None]] = None,
+ ) -> Pep3140Dict[SearchGoal, Optional[TrimmedStat]]:
+ """Perform initial trials, create state object, proceed with main loop.
+ Stateful arguments (measurer and debug) are stored.
+ Derived objects are constructed from config.
+ :param measurer: Measurement provider to use by this search object.
+ :param debug: Callable to optionally use instead of logging.debug().
+ :returns: Structure containing conditional throughputs and other stats,
+ one for each search goal.
+ :type measurer: AbstractMeasurer
+ :type debug: Optional[Callable[[str], None]]
+ :returns: Mapping from goal to lower bound (none if min load is hit).
+ :rtype: Pep3140Dict[SearchGoal, Optional[TrimmedStat]]
+ :raises RuntimeError: If total duration is larger than timeout,
+ or if min load becomes an upper bound for a search goal
+ that has fail fast true.
+ """
+ self.measurer = measurer
+ self.debug = logging.debug if debug is None else debug
+ self.rounding = LoadRounding(
+ min_load=self.config.min_load,
+ max_load=self.config.max_load,
+ float_goals=[goal.relative_width for goal in self.config.goals],
+ )
+ self.from_float = DiscreteLoad.float_conver(rounding=self.rounding)
+ self.limit_handler = LimitHandler(
+ rounding=self.rounding,
+ debug=self.debug,
+ )
+ self.scaling = TargetScaling(
+ goals=self.config.goals,
+ rounding=self.rounding,
+ )
+ self.database = MeasurementDatabase(self.scaling.targets)
+ self.stop_time = time.monotonic() + self.config.search_duration_max
+ result0, result1 = self.run_initial_trials()
+ self.main_loop(result0.discrete_load, result1.discrete_load)
+ ret_dict = Pep3140Dict()
+ for goal in self.config.goals:
+ target = self.scaling.goal_to_final_target[goal]
+ bounds = self.database.get_relevant_bounds(target=target)
+ ret_dict[goal] = bounds.clo
+ return ret_dict
+ def measure(self, duration: float, load: DiscreteLoad) -> DiscreteResult:
+ """Call measurer and put the result to appropriate form in database.
+ Also check the argument types and load roundness,
+ and return the result to the caller.
+ :param duration: Intended duration for the trial measurement.
+ :param load: Intended load for the trial measurement:
+ :type duration: float
+ :type load: DiscreteLoad
+ :returns: The trial results.
+ :rtype: DiscreteResult
+ :raises RuntimeError: If an argument doed not have the required type.
+ """
+ if not isinstance(duration, float):
+ raise RuntimeError(f"Duration has to be float: {duration!r}")
+ if not isinstance(load, DiscreteLoad):
+ raise RuntimeError(f"Load has to be discrete: {load!r}")
+ if not load.is_round:
+ raise RuntimeError(f"Told to measure unrounded: {load!r}")
+ self.debug(f"Measuring at d={duration},il={int(load)}")
+ result = self.measurer.measure(
+ intended_duration=duration,
+ intended_load=float(load),
+ )
+ self.debug(f"Measured lr={result.loss_ratio}")
+ result = DiscreteResult.with_load(result=result, load=load)
+ self.database.add(result)
+ return result
+ def run_initial_trials(self) -> Tuple[DiscreteResult, DiscreteResult]:
+ """Perform trials to get enough data to start the selectors.
+ Measurements are done with all initial targets in mind,
+ based on smallest target loss ratio, largest initial trial duration,
+ and largest initial target width.
+ Forwarding rate is used as a hint for next intended load.
+ The relative quantity is used, as load can use different units.
+ When the smallest target loss ratio is non-zero, a correction is needed
+ (forwarding rate is only a good hint for zero loss ratio load).
+ The correction is conservative (all increase in load turns to losses).
+ Also, warmup trial (if configured) is performed,
+ all other trials are added to the database.
+ This could return the initial width, but from implementation perspective
+ it is easier to return two measurements (or the same one twice) here
+ and compute width later. The "one value twice" happens when max load
+ has small loss, or when min load has big loss.
+ :returns: Two last measured values, in any order. Or one value twice.
+ :rtype: Tuple[DiscreteResult, DiscreteResult]
+ """
+ max_load = self.limit_handler.max_load
+ ratio, duration, width = None, None, None
+ for target in self.scaling.targets:
+ if target.preceding:
+ continue
+ if ratio is None or ratio > target.loss_ratio:
+ ratio = target.loss_ratio
+ if not duration or duration < target.trial_duration:
+ duration = target.trial_duration
+ if not width or width < target.discrete_width:
+ width = target.discrete_width
+ self.debug(f"Init ratio {ratio} duration {duration} width {width}")
+ if self.config.warmup_duration:
+ self.debug("Warmup trial.")
+ self.measure(self.config.warmup_duration, max_load)
+ # Warmup should not affect the real results, reset the database.
+ self.database = MeasurementDatabase(self.scaling.targets)
+ self.debug(f"First trial at max rate: {max_load}")
+ result0 = self.measure(duration, max_load)
+ rfr = result0.relative_forwarding_rate
+ corrected_rfr = (self.from_float(rfr) / (1.0 - ratio)).rounded_down()
+ if corrected_rfr >= max_load:
+ self.debug("Small loss, no other initial trials are needed.")
+ return result0, result0
+ mrr = self.limit_handler.handle(corrected_rfr, width, None, max_load)
+ self.debug(f"Second trial at (corrected) mrr: {mrr}")
+ result1 = self.measure(duration, mrr)
+ # Attempt to get narrower width.
+ result_ratio = result1.loss_ratio
+ if result_ratio > ratio:
+ rfr2 = result1.relative_forwarding_rate
+ crfr2 = (self.from_float(rfr2) / (1.0 - ratio)).rounded_down()
+ mrr2 = self.limit_handler.handle(crfr2, width, None, mrr)
+ else:
+ mrr2 = mrr + width
+ mrr2 = self.limit_handler.handle(mrr2, width, mrr, max_load)
+ if not mrr2:
+ self.debug("Close enough, measuring at mrr2 is not needed.")
+ return result1, result1
+ self.debug(f"Third trial at (corrected) mrr2: {mrr2}")
+ result2 = self.measure(duration, mrr2)
+ return result1, result2
+ def main_loop(self, load0: DiscreteLoad, load1: DiscreteLoad) -> None:
+ """Initialize selectors and keep measuring the winning candidate.
+ Selectors are created, the two input loads are useful starting points.
+ The search ends when no selector nominates any candidate,
+ or if the search takes too long (or if a selector raises).
+ Winner is selected according to ordering defined in Candidate class.
+ In case of a tie, selectors for earlier goals are preferred.
+ As a selector is only allowed to update current width as the winner,
+ the update is done here explicitly.
+ :param load0: Discrete load of one of results from run_initial_trials.
+ :param load1: Discrete load of other of results from run_initial_trials.
+ :type load0: DiscreteLoad
+ :type load1: DiscreteLoad
+ :raises RuntimeError: If the search takes too long,
+ or if min load becomes an upper bound for any search goal
+ """
+ if load1 < load0:
+ load0, load1 = load1, load0
+ global_width = GlobalWidth.from_loads(load0, load1)
+ selectors = []
+ for target in self.scaling.goal_to_final_target.values():
+ selector = Selector(
+ final_target=target,
+ global_width=global_width,
+ initial_lower_load=load0,
+ initial_upper_load=load1,
+ database=self.database,
+ handler=self.limit_handler,
+ debug=self.debug,
+ )
+ selectors.append(selector)
+ while time.monotonic() < self.stop_time:
+ winner = Candidate()
+ for selector in selectors:
+ # Order of arguments is important
+ # when two targets nominate the same candidate.
+ winner = min(Candidate.nomination_from(selector), winner)
+ if not winner:
+ break
+ # We do not check duration versus stop_time here,
+ # as some measurers can be unpredictably faster
+ # than their intended duration suggests.
+ self.measure(duration=winner.duration, load=winner.load)
+ # Delayed updates.
+ if winner.width:
+ global_width.width = winner.width
+ winner.won()
+ else:
+ raise RuntimeError("Optimized search takes too long.")
+ self.debug("Search done.")