diff options
author | Vratko Polak <vrpolak@cisco.com> | 2021-03-22 17:11:21 +0100 |
---|---|---|
committer | Vratko Polak <vrpolak@cisco.com> | 2021-03-22 17:11:21 +0100 |
commit | b6fbffad32515ccf94404680cb5280c2cb561af5 (patch) | |
tree | 6f16569781e3e3627589c6fff0d7a5be0410d3f3 /resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py | |
parent | adf5f13886e8bdd4fb224f12f10d731cadf698f3 (diff) |
MLRsearch: Support other than just two ratios
+ Change some method names and argument types:
+ Do not mention NDR and PDR, except as examples.
+ Return list of ReceiveRateInterval instead of NdrPdrResult.
+ The resulting intervals can be degenerate when hitting min/max rate.
+ Rename quantity name parts from "fraction" to "ratio".
+ Intervals are no longer tracked for each target ratio.
+ They are found dynamically from known results.
+ Add effective_loss_ratio field to avoid loss inversion effects.
+ Move some functions to separate files.
+ Bound search logic moved to MeasurementDatabase.py
+ ProgressState moved to its file.
+ WidthArithmetics.py holds small computation functions.
+ Use parameter expansion_coefficient instead of "doublings".
+ Do uneven bisect to save time when width is not power of two times goal.
+ Timeout now correctly tracked for the whole search,
not just the current phase.
+ Make logging (debug) function pluggable.
+ Added debug log messages for initial phase.
+ Do not mark as subclass if contructor signature differs.
+ Avoid re-measure on scale-limited (ASTF) profiles.
+ Remove outdated comments.
+ Bump copyright years.
Change-Id: I93f693b4f186f59030ee5ac21b78acc890109813
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Diffstat (limited to 'resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py')
-rw-r--r-- | resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py | 839 |
1 files changed, 337 insertions, 502 deletions
diff --git a/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py b/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py index 87dc784cbc..dd21444496 100644 --- a/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py +++ b/resources/libraries/python/MLRsearch/MultipleLossRatioSearch.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -17,13 +17,24 @@ import logging import math import time -from .AbstractSearchAlgorithm import AbstractSearchAlgorithm -from .NdrPdrResult import NdrPdrResult +from .MeasurementDatabase import MeasurementDatabase +from .ProgressState import ProgressState from .ReceiveRateInterval import ReceiveRateInterval +from .WidthArithmetics import ( + multiply_relative_width, + step_down, + step_up, + multiple_step_down, + multiple_step_up, + half_step_up, +) -class MultipleLossRatioSearch(AbstractSearchAlgorithm): - """Optimized binary search algorithm for finding NDR and PDR bounds. +class MultipleLossRatioSearch: + """Optimized binary search algorithm for finding bounds for multiple ratios. + + This is unofficially a subclass of AbstractSearchAlgorithm, + but constructor signature is different. Traditional binary search algorithm needs initial interval (lower and upper bound), and returns final interval after bisecting @@ -34,12 +45,13 @@ class MultipleLossRatioSearch(AbstractSearchAlgorithm): The optimized algorithm contains several improvements aimed to reduce overall search time. - One improvement is searching for two intervals at once. - The intervals are for NDR (No Drop Rate) and PDR (Partial Drop Rate). + One improvement is searching for multiple intervals at once. + The intervals differ by the target loss ratio. Lower bound + has to have equal or smaller loss ratio, upper bound has to have larger. Next improvement is that the initial interval does not need to be valid. - Imagine initial interval (10, 11) where 11 is smaller - than the searched value. + Imagine initial interval (10, 11) where loss at 11 is smaller + than the searched ratio. The algorithm will try (11, 13) interval next, and if 13 is still smaller, (13, 17) and so on, doubling width until the upper bound is valid. The part when interval expands is called external search, @@ -65,58 +77,24 @@ class MultipleLossRatioSearch(AbstractSearchAlgorithm): Final phase and preceding intermediate phases are performing external and internal search steps, each resulting interval is the starting point for the next phase. - The resulting interval of final phase is the result of the whole algorithm. + The resulting intervals of final phase is the result of the whole algorithm. - Each non-initial phase uses its own trial duration and width goal. - Any non-initial phase stops searching (for NDR or PDR independently) + Each non-initial phase uses its own trial duration. + Any non-initial phase stops searching (for all ratios independently) when minimum is not a valid lower bound (at current duration), or all of the following is true: - Both bounds are valid, bound bounds are measured at the current phase + Both bounds are valid, bounds are measured at the current phase trial duration, interval width is less than the width goal for current phase. TODO: Review and update this docstring according to rst docs. - TODO: Support configurable number of Packet Loss Ratios. """ - class ProgressState: - """Structure containing data to be passed around in recursion.""" - - def __init__( - self, result, phases, duration, width_goal, packet_loss_ratio, - minimum_transmit_rate, maximum_transmit_rate): - """Convert and store the argument values. - - :param result: Current measured NDR and PDR intervals. - :param phases: How many intermediate phases to perform - before the current one. - :param duration: Trial duration to use in the current phase [s]. - :param width_goal: The goal relative width for the curreent phase. - :param packet_loss_ratio: PDR fraction for the current search. - :param minimum_transmit_rate: Minimum target transmit rate - for the current search [pps]. - :param maximum_transmit_rate: Maximum target transmit rate - for the current search [pps]. - :type result: NdrPdrResult.NdrPdrResult - :type phases: int - :type duration: float - :type width_goal: float - :type packet_loss_ratio: float - :type minimum_transmit_rate: float - :type maximum_transmit_rate: float - """ - self.result = result - self.phases = int(phases) - self.duration = float(duration) - self.width_goal = float(width_goal) - self.packet_loss_ratio = float(packet_loss_ratio) - self.minimum_transmit_rate = float(minimum_transmit_rate) - self.maximum_transmit_rate = float(maximum_transmit_rate) - def __init__( self, measurer, final_relative_width=0.005, final_trial_duration=30.0, initial_trial_duration=1.0, - number_of_intermediate_phases=2, timeout=600.0, doublings=1): + number_of_intermediate_phases=2, timeout=600.0, debug=None, + expansion_coefficient=2.0): """Store the measurer object and additional arguments. :param measurer: Rate provider to use by this search object. @@ -129,284 +107,111 @@ class MultipleLossRatioSearch(AbstractSearchAlgorithm): to perform before the final phase [1]. :param timeout: The search will fail itself when not finished before this overall time [s]. - :param doublings: How many doublings to do in external search step. - Default 1 is suitable for fairly stable tests, - less stable tests might get better overal duration with 2 or more. + :param debug: Callable to use instead of logging.debug(). + :param expansion_coefficient: External search multiplies width by this. :type measurer: AbstractMeasurer.AbstractMeasurer :type final_relative_width: float :type final_trial_duration: float :type initial_trial_duration: float :type number_of_intermediate_phases: int :type timeout: float - :type doublings: int + :type debug: Optional[Callable[[str], None]] + :type expansion_coefficient: float """ - super(MultipleLossRatioSearch, self).__init__(measurer) + self.measurer = measurer self.final_trial_duration = float(final_trial_duration) self.final_relative_width = float(final_relative_width) self.number_of_intermediate_phases = int(number_of_intermediate_phases) self.initial_trial_duration = float(initial_trial_duration) self.timeout = float(timeout) - self.doublings = int(doublings) - - @staticmethod - def double_relative_width(relative_width): - """Return relative width corresponding to double logarithmic width. - - :param relative_width: The base relative width to double. - :type relative_width: float - :returns: The relative width of double logarithmic size. - :rtype: float - """ - return 1.99999 * relative_width - relative_width * relative_width - # The number should be 2.0, but we want to avoid rounding errors, - # and ensure half of double is not larger than the original value. - - @staticmethod - def double_step_down(relative_width, current_bound): - """Return rate of double logarithmic width below. - - :param relative_width: The base relative width to double. - :param current_bound: The current target transmit rate to move [pps]. - :type relative_width: float - :type current_bound: float - :returns: Transmit rate smaller by logarithmically double width [pps]. - :rtype: float - """ - return current_bound * ( - 1.0 - MultipleLossRatioSearch.double_relative_width(relative_width) - ) + self.state = None + self.debug = logging.debug if debug is None else debug + self.expansion_coefficient = float(expansion_coefficient) - @staticmethod - def expand_down(relative_width, doublings, current_bound): - """Return rate of expanded logarithmic width below. - - :param relative_width: The base relative width to double. - :param doublings: How many doublings to do for expansion. - :param current_bound: The current target transmit rate to move [pps]. - :type relative_width: float - :type doublings: int - :type current_bound: float - :returns: Transmit rate smaller by logarithmically double width [pps]. - :rtype: float - """ - for _ in range(doublings): - relative_width = MultipleLossRatioSearch.double_relative_width( - relative_width - ) - return current_bound * (1.0 - relative_width) - - @staticmethod - def double_step_up(relative_width, current_bound): - """Return rate of double logarithmic width above. - - :param relative_width: The base relative width to double. - :param current_bound: The current target transmit rate to move [pps]. - :type relative_width: float - :type current_bound: float - :returns: Transmit rate larger by logarithmically double width [pps]. - :rtype: float - """ - return current_bound / ( - 1.0 - MultipleLossRatioSearch.double_relative_width(relative_width) - ) - - @staticmethod - def expand_up(relative_width, doublings, current_bound): - """Return rate of expanded logarithmic width above. - - :param relative_width: The base relative width to double. - :param doublings: How many doublings to do for expansion. - :param current_bound: The current target transmit rate to move [pps]. - :type relative_width: float - :type doublings: int - :type current_bound: float - :returns: Transmit rate smaller by logarithmically double width [pps]. - :rtype: float - """ - for _ in range(doublings): - relative_width = MultipleLossRatioSearch.double_relative_width( - relative_width - ) - return current_bound / (1.0 - relative_width) - - @staticmethod - def half_relative_width(relative_width): - """Return relative width corresponding to half logarithmic width. - - :param relative_width: The base relative width to halve. - :type relative_width: float - :returns: The relative width of half logarithmic size. - :rtype: float - """ - return 1.0 - math.sqrt(1.0 - relative_width) - - @staticmethod - def half_step_up(relative_width, current_bound): - """Return rate of half logarithmic width above. - - :param relative_width: The base relative width to halve. - :param current_bound: The current target transmit rate to move [pps]. - :type relative_width: float - :type current_bound: float - :returns: Transmit rate larger by logarithmically half width [pps]. - :rtype: float - """ - return current_bound / ( - 1.0 - MultipleLossRatioSearch.half_relative_width(relative_width) - ) - - def narrow_down_ndr_and_pdr(self, min_rate, max_rate, packet_loss_ratio): + def narrow_down_intervals(self, min_rate, max_rate, packet_loss_ratios): """Perform initial phase, create state object, proceed with next phases. + The current implementation requires the ratios so be unique and sorted. + Also non-empty. + :param min_rate: Minimal target transmit rate [tps]. :param max_rate: Maximal target transmit rate [tps]. - :param packet_loss_ratio: Fraction of packets lost, for PDR [1]. + :param packet_loss_ratios: Target ratios of packets loss to locate. :type min_rate: float :type max_rate: float - :type packet_loss_ratio: float + :type packet_loss_ratios: Iterable[float] :returns: Structure containing narrowed down intervals and their measurements. - :rtype: NdrPdrResult.NdrPdrResult + :rtype: List[ReceiveRateInterval] :raises RuntimeError: If total duration is larger than timeout. + Or if ratios list is (empty or) not sorted or unique. """ - minimum_transmit_rate = float(min_rate) - maximum_transmit_rate = float(max_rate) - packet_loss_ratio = float(packet_loss_ratio) - max_measurement = self.measurer.measure( - self.initial_trial_duration, maximum_transmit_rate) + min_rate = float(min_rate) + max_rate = float(max_rate) + packet_loss_ratios = [float(ratio) for ratio in packet_loss_ratios] + if len(packet_loss_ratios) < 1: + raise RuntimeError(u"At least one ratio is required!") + if packet_loss_ratios != sorted(set(packet_loss_ratios)): + raise RuntimeError(u"Input ratios have to be sorted and unique!") + measurements = list() + self.debug(f"First measurement at max rate: {max_rate}") + measured = self.measurer.measure( + duration=self.initial_trial_duration, + transmit_rate=max_rate, + ) + measurements.append(measured) initial_width_goal = self.final_relative_width for _ in range(self.number_of_intermediate_phases): - initial_width_goal = self.double_relative_width(initial_width_goal) - max_lo = maximum_transmit_rate * (1.0 - initial_width_goal) - mrr = max(minimum_transmit_rate, min( - max_lo, max_measurement.relative_receive_rate - )) - mrr_measurement = self.measurer.measure( - self.initial_trial_duration, mrr + initial_width_goal = multiply_relative_width( + initial_width_goal, 2.0 + ) + max_lo = step_down(max_rate, initial_width_goal) + mrr = max(min_rate, min(max_lo, measured.relative_receive_rate)) + self.debug(f"Second measurement at mrr: {mrr}") + measured = self.measurer.measure( + duration=self.initial_trial_duration, + transmit_rate=mrr, ) + measurements.append(measured) # Attempt to get narrower width. - if mrr_measurement.loss_fraction > 0.0: - max2_lo = mrr * (1.0 - initial_width_goal) - mrr2 = min(max2_lo, mrr_measurement.relative_receive_rate) + if measured.loss_ratio > packet_loss_ratios[0]: + max_lo = step_down(mrr, initial_width_goal) + mrr2 = min(max_lo, measured.relative_receive_rate) else: - mrr2 = mrr / (1.0 - initial_width_goal) - if minimum_transmit_rate < mrr2 < maximum_transmit_rate: - max_measurement = mrr_measurement - mrr_measurement = self.measurer.measure( - self.initial_trial_duration, mrr2) - if mrr2 > mrr: - max_measurement, mrr_measurement = \ - (mrr_measurement, max_measurement) - starting_interval = ReceiveRateInterval( - mrr_measurement, max_measurement) - starting_result = NdrPdrResult(starting_interval, starting_interval) - state = self.ProgressState( - starting_result, self.number_of_intermediate_phases, + mrr2 = step_up(mrr, initial_width_goal) + if min_rate < mrr2 < max_rate: + self.debug(f"Third measurement at mrr2: {mrr2}") + measured = self.measurer.measure( + duration=self.initial_trial_duration, + transmit_rate=mrr2, + ) + measurements.append(measured) + # If mrr2 > mrr and mrr2 got zero loss, + # it is better to do external search from mrr2 up. + # To prevent bisection between mrr2 and max_rate, + # we simply remove the max_rate measurement. + # Similar logic applies to higher loss ratio goals. + # Overall, with mrr2 measurement done, we never need + # the first measurement done at max rate. + measurements = measurements[1:] + database = MeasurementDatabase(measurements) + stop_time = time.monotonic() + self.timeout + self.state = ProgressState( + database, self.number_of_intermediate_phases, self.final_trial_duration, self.final_relative_width, - packet_loss_ratio, minimum_transmit_rate, maximum_transmit_rate - ) - state = self.ndrpdr(state) - return state.result - - def _measure_and_update_state(self, state, transmit_rate): - """Perform trial measurement, update bounds, return new state. - - :param state: State before this measurement. - :param transmit_rate: Target transmit rate for this measurement [pps]. - :type state: ProgressState - :type transmit_rate: float - :returns: State after the measurement. - :rtype: ProgressState - """ - # TODO: Implement https://stackoverflow.com/a/24683360 - # to avoid the string manipulation if log verbosity is too low. - logging.info(f"result before update: {state.result}") - logging.debug( - f"relative widths in goals: " - f"{state.result.width_in_goals(self.final_relative_width)}" - ) - measurement = self.measurer.measure(state.duration, transmit_rate) - ndr_interval = self._new_interval( - state.result.ndr_interval, measurement, 0.0 + packet_loss_ratios, min_rate, max_rate, stop_time ) - pdr_interval = self._new_interval( - state.result.pdr_interval, measurement, state.packet_loss_ratio - ) - state.result = NdrPdrResult(ndr_interval, pdr_interval) - return state + self.ndrpdr() + return self.state.database.get_results(ratio_list=packet_loss_ratios) - @staticmethod - def _new_interval(old_interval, measurement, packet_loss_ratio): - """Return new interval with bounds updated according to the measurement. - - :param old_interval: The current interval before the measurement. - :param measurement: The new meaqsurement to take into account. - :param packet_loss_ratio: Fraction for PDR (or zero for NDR). - :type old_interval: ReceiveRateInterval.ReceiveRateInterval - :type measurement: ReceiveRateMeasurement.ReceiveRateMeasurement - :type packet_loss_ratio: float - :returns: The updated interval. - :rtype: ReceiveRateInterval.ReceiveRateInterval - """ - old_lo, old_hi = old_interval.measured_low, old_interval.measured_high - new_lo = new_hi = None - # Priority zero: direct replace if the target Tr is the same. - if measurement.target_tr in (old_lo.target_tr, old_hi.target_tr): - if measurement.target_tr == old_lo.target_tr: - new_lo = measurement - else: - new_hi = measurement - # Priority one: invalid lower bound allows only one type of update. - elif old_lo.loss_fraction > packet_loss_ratio: - # We can only expand down, old bound becomes valid upper one. - if measurement.target_tr < old_lo.target_tr: - new_lo, new_hi = measurement, old_lo - else: - return old_interval - - # Lower bound is now valid. - # Next priorities depend on target Tr. - elif measurement.target_tr < old_lo.target_tr: - # Lower external measurement, relevant only - # if the new measurement has high loss rate. - if measurement.loss_fraction > packet_loss_ratio: - # Returning the broader interval as old_lo - # would be invalid upper bound. - new_lo = measurement - elif measurement.target_tr > old_hi.target_tr: - # Upper external measurement, only relevant for invalid upper bound. - if old_hi.loss_fraction <= packet_loss_ratio: - # Old upper bound becomes valid new lower bound. - new_lo, new_hi = old_hi, measurement - else: - # Internal measurement, replaced boundary - # depends on measured loss fraction. - if measurement.loss_fraction > packet_loss_ratio: - # We have found a narrow valid interval, - # regardless of whether old upper bound was valid. - new_hi = measurement - else: - # In ideal world, we would not want to shrink interval - # if upper bound is not valid. - # In the real world, we want to shrink it for - # "invalid upper bound at maximal rate" case. - new_lo = measurement - - return ReceiveRateInterval( - old_lo if new_lo is None else new_lo, - old_hi if new_hi is None else new_hi - ) + def ndrpdr(self): + """Perform trials for this phase. State is updated in-place. - def ndrpdr(self, state): - """Perform trials for this phase. Return the new state when done. + Recursion to smaller durations is performed (if not performed yet). - :param state: State before this phase. - :type state: ProgressState - :returns: The updated state. - :rtype: ProgressState :raises RuntimeError: If total duration is larger than timeout. """ - start_time = time.time() + state = self.state if state.phases > 0: # We need to finish preceding intermediate phases first. saved_phases = state.phases @@ -420,226 +225,256 @@ class MultipleLossRatioSearch(AbstractSearchAlgorithm): ) # Shorter durations do not need that narrow widths. saved_width = state.width_goal - state.width_goal = self.double_relative_width(state.width_goal) + state.width_goal = multiply_relative_width(saved_width, 2.0) # Recurse. - state = self.ndrpdr(state) + self.ndrpdr() # Restore the state for current phase. - state.duration = saved_duration state.width_goal = saved_width + state.duration = saved_duration state.phases = saved_phases # Not needed, but just in case. - - logging.info( - f"starting iterations with duration {state.duration} and relative " - f"width goal {state.width_goal}" + self.debug( + f"Starting phase with {state.duration} duration" + f" and {state.width_goal} relative width goal." ) - while 1: - if time.time() > start_time + self.timeout: - raise RuntimeError(u"Optimized search takes too long.") - # Order of priorities: invalid bounds (nl, pl, nh, ph), - # then narrowing relative Tr widths. - # Durations are not priorities yet, - # they will settle on their own hopefully. - ndr_lo = state.result.ndr_interval.measured_low - ndr_hi = state.result.ndr_interval.measured_high - pdr_lo = state.result.pdr_interval.measured_low - pdr_hi = state.result.pdr_interval.measured_high - ndr_rel_width = max( - state.width_goal, state.result.ndr_interval.rel_tr_width + failing_fast = False + database = state.database + database.set_current_duration(state.duration) + while time.monotonic() < state.stop_time: + for index, ratio in enumerate(state.packet_loss_ratios): + new_tr = self._select_for_ratio(ratio) + if new_tr is None: + # Either this ratio is fine, or min rate got invalid result. + # If fine, we will continue to handle next ratio. + if index > 0: + # First ratio passed, all next have a valid lower bound. + continue + lower_bound, _, _, _, _, _ = database.get_bounds(ratio) + if lower_bound is None: + failing_fast = True + self.debug(u"No valid lower bound for this iteration.") + break + # First ratio is fine. + continue + # We have transmit rate to measure at. + # We do not check duration versus stop_time here, + # as some measurers can be unpredictably faster + # than what duration suggests. + measurement = self.measurer.measure( + duration=state.duration, + transmit_rate=new_tr, + ) + database.add(measurement) + # Restart ratio handling on updated database. + break + else: + # No ratio needs measuring, we are done with this phase. + self.debug(u"Phase done.") + break + # We have broken out of the for loop. + if failing_fast: + # Abort the while loop early. + break + # Not failing fast but database got updated, restart the while loop. + else: + # Time is up. + raise RuntimeError(u"Optimized search takes too long.") + # Min rate is not valid, but returning what we have + # so next duration can recover. + + @staticmethod + def improves(new_bound, lower_bound, upper_bound): + """Return whether new bound improves upon old bounds. + + To improve, new_bound has to be not None, + and between the old bounds (where the bound is not None). + + This piece of logic is commonly used, when we know old bounds + from a primary source (e.g. current duration database) + and new bound from a secondary source (e.g. previous duration database). + Having a function allows "if improves(..):" construction to save space. + + :param new_bound: The bound we consider applying. + :param lower_bound: Known bound, new_bound has to be higher to apply. + :param upper_bound: Known bound, new_bound has to be lower to apply. + :type new_bound: Optional[ReceiveRateMeasurement] + :type lower_bound: Optional[ReceiveRateMeasurement] + :type upper_bound: Optional[ReceiveRateMeasurement] + :returns: Whether we can apply the new bound. + :rtype: bool + """ + if new_bound is None: + return False + if lower_bound is not None: + if new_bound.target_tr <= lower_bound.target_tr: + return False + if upper_bound is not None: + if new_bound.target_tr >= upper_bound.target_tr: + return False + return True + + def _select_for_ratio(self, ratio): + """Return None or new target_tr to measure at. + + Returning None means either we have narrow enough valid interval + for this ratio, or we are hitting min rate and should fail early. + + :param ratio: Loss ratio to ensure narrow valid bounds for. + :type ratio: float + :returns: The next target transmit rate to measure at. + :rtype: Optional[float] + :raises RuntimeError: If database inconsistency is detected. + """ + state = self.state + data = state.database + bounds = data.get_bounds(ratio) + cur_lo1, cur_hi1, pre_lo, pre_hi, cur_lo2, cur_hi2 = bounds + pre_lo_improves = self.improves(pre_lo, cur_lo1, cur_hi1) + pre_hi_improves = self.improves(pre_hi, cur_lo1, cur_hi1) + if pre_lo_improves and pre_hi_improves: + # We allowed larger width for previous phase + # as single bisect here guarantees only one re-measurement. + new_tr = self._bisect(pre_lo, pre_hi) + if new_tr is not None: + self.debug(f"Initial bisect for {ratio}, tr: {new_tr}") + return new_tr + if pre_lo_improves: + new_tr = pre_lo.target_tr + self.debug(f"Re-measuring lower bound for {ratio}, tr: {new_tr}") + return new_tr + if pre_hi_improves: + new_tr = pre_hi.target_tr + self.debug(f"Re-measuring upper bound for {ratio}, tr: {new_tr}") + return new_tr + if cur_lo1 is None and cur_hi1 is None: + raise RuntimeError(u"No results found in databases!") + if cur_lo1 is None: + # Upper bound exists (cur_hi1). + # We already tried previous lower bound. + # So, we want to extend down. + new_tr = self._extend_down( + cur_hi1, cur_hi2, pre_hi, second_needed=False ) - pdr_rel_width = max( - state.width_goal, state.result.pdr_interval.rel_tr_width + self.debug( + f"Extending down for {ratio}:" + f" old {cur_hi1.target_tr} new {new_tr}" ) - # If we are hitting maximal or minimal rate, we cannot shift, - # but we can re-measure. - new_tr = self._ndrpdr_loss_fraction( - state, ndr_lo, ndr_hi, pdr_lo, pdr_hi, ndr_rel_width, - pdr_rel_width + return new_tr + if cur_hi1 is None: + # Lower bound exists (cur_lo1). + # We already tried previous upper bound. + # So, we want to extend up. + new_tr = self._extend_up(cur_lo1, cur_lo2, pre_lo) + self.debug( + f"Extending up for {ratio}:" + f" old {cur_lo1.target_tr} new {new_tr}" ) - - if new_tr is not None: - state = self._measure_and_update_state(state, new_tr) - continue - - # If we are hitting maximum_transmit_rate, - # it is still worth narrowing width, - # hoping large enough loss fraction will happen. - # But if we are hitting the minimal rate (at current duration), - # no additional measurement will help with that, - # so we can stop narrowing in this phase. - if (ndr_lo.target_tr <= state.minimum_transmit_rate - and ndr_lo.loss_fraction > 0.0): - ndr_rel_width = 0.0 - if (pdr_lo.target_tr <= state.minimum_transmit_rate - and pdr_lo.loss_fraction > state.packet_loss_ratio): - pdr_rel_width = 0.0 - - new_tr = self._ndrpdr_width_goal( - state, ndr_lo, pdr_lo, ndr_rel_width, pdr_rel_width + return new_tr + # Both bounds exist (cur_lo1 and cur_hi1). + # cur_lo1 might have been selected for this ratio (we are bisecting) + # or for previous ratio (we are extending down for this ratio). + # Compute both estimates and choose the higher value. + bisected_tr = self._bisect(cur_lo1, cur_hi1) + extended_tr = self._extend_down( + cur_hi1, cur_hi2, pre_hi, second_needed=True + ) + # Only if both are not None we need to decide. + if bisected_tr and extended_tr and extended_tr > bisected_tr: + self.debug( + f"Extending down for {ratio}:" + f" old {cur_hi1.target_tr} new {extended_tr}" ) - - if new_tr is not None: - state = self._measure_and_update_state(state, new_tr) - continue - - # We do not need to improve width, but there still might be - # some measurements with smaller duration. - new_tr = self._ndrpdr_duration( - state, ndr_lo, ndr_hi, pdr_lo, pdr_hi, ndr_rel_width, - pdr_rel_width + new_tr = extended_tr + else: + self.debug( + f"Bisecting for {ratio}: lower {cur_lo1.target_tr}," + f" upper {cur_hi1.target_tr}, new {bisected_tr}" ) - - if new_tr is not None: - state = self._measure_and_update_state(state, new_tr) - continue - - # Widths are narrow (or lower bound minimal), bound measurements - # are long enough, we can return. - logging.info(u"phase done") - break - return state - - def _ndrpdr_loss_fraction( - self, state, ndr_lo, ndr_hi, pdr_lo, pdr_hi, ndr_rel_width, - pdr_rel_width): - """Perform loss_fraction-based trials within a ndrpdr phase - - :param state: current state - :param ndr_lo: ndr interval measured low - :param ndr_hi: ndr interval measured high - :param pdr_lo: pdr interval measured low - :param pdr_hi: pdr interval measured high - :param ndr_rel_width: ndr interval relative width - :param pdr_rel_width: pdr interval relative width - :type state: ProgressState - :type ndr_lo: ReceiveRateMeasurement.ReceiveRateMeasurement - :type ndr_hi: ReceiveRateMeasurement.ReceiveRateMeasurement - :type pdr_lo: ReceiveRateMeasurement.ReceiveRateMeasurement - :type pdr_hi: ReceiveRateMeasurement.ReceiveRateMeasurement - :type ndr_rel_width: float - :type pdr_rel_width: float - :returns: a new transmit rate if one should be applied - :rtype: float + new_tr = bisected_tr + return new_tr + + def _extend_down(self, cur_hi1, cur_hi2, pre_hi, second_needed=False): + """Return extended width below, or None if hitting min rate. + + If no second tightest (nor previous) upper bound is available, + the behavior is governed by second_needed argument. + If true, return None, if false, start from width goal. + This is useful, as if a bisect is possible, + we want to give it a chance. + + :param cur_hi1: Tightest upper bound for current duration. Has to exist. + :param cur_hi2: Second tightest current upper bound, may not exist. + :param pre_hi: Tightest upper bound, previous duration, may not exist. + :param second_needed: Whether second tightest bound is required. + :type cur_hi1: ReceiveRateMeasurement + :type cur_hi2: Optional[ReceiveRateMeasurement] + :type pre_hi: Optional[ReceiveRateMeasurement] + :type second_needed: bool + :returns: The next target transmit rate to measure at. + :rtype: Optional[float] """ - result = None - if ndr_lo.loss_fraction > 0.0: - if ndr_lo.target_tr > state.minimum_transmit_rate: - result = max( - state.minimum_transmit_rate, self.expand_down( - ndr_rel_width, self.doublings, ndr_lo.target_tr - ) - ) - logging.info(f"ndr lo external {result}") - elif ndr_lo.duration < state.duration: - result = state.minimum_transmit_rate - logging.info(u"ndr lo minimal re-measure") - - if result is None and pdr_lo.loss_fraction > state.packet_loss_ratio: - if pdr_lo.target_tr > state.minimum_transmit_rate: - result = max( - state.minimum_transmit_rate, self.expand_down( - pdr_rel_width, self.doublings, pdr_lo.target_tr - ) - ) - logging.info(f"pdr lo external {result}") - elif pdr_lo.duration < state.duration: - result = state.minimum_transmit_rate - logging.info(u"pdr lo minimal re-measure") - - if result is None and ndr_hi.loss_fraction <= 0.0: - if ndr_hi.target_tr < state.maximum_transmit_rate: - result = min( - state.maximum_transmit_rate, self.expand_up( - ndr_rel_width, self.doublings, ndr_hi.target_tr - ) - ) - logging.info(f"ndr hi external {result}") - elif ndr_hi.duration < state.duration: - result = state.maximum_transmit_rate - logging.info(u"ndr hi maximal re-measure") - - if result is None and pdr_hi.loss_fraction <= state.packet_loss_ratio: - if pdr_hi.target_tr < state.maximum_transmit_rate: - result = min( - state.maximum_transmit_rate, self.expand_up( - pdr_rel_width, self.doublings, pdr_hi.target_tr - ) - ) - logging.info(f"pdr hi external {result}") - elif pdr_hi.duration < state.duration: - result = state.maximum_transmit_rate - logging.info(u"ndr hi maximal re-measure") - return result - - def _ndrpdr_width_goal( - self, state, ndr_lo, pdr_lo, ndr_rel_width, pdr_rel_width): - """Perform width_goal-based trials within a ndrpdr phase - - :param state: current state - :param ndr_lo: ndr interval measured low - :param pdr_lo: pdr interval measured low - :param ndr_rel_width: ndr interval relative width - :param pdr_rel_width: pdr interval relative width - :type state: ProgressState - :type ndr_lo: ReceiveRateMeasurement.ReceiveRateMeasurement - :type pdr_lo: ReceiveRateMeasurement.ReceiveRateMeasurement - :type ndr_rel_width: float - :type pdr_rel_width: float - :returns: a new transmit rate if one should be applied - :rtype: float - Return a new transmit rate if one should be applied. + state = self.state + old_tr = cur_hi1.target_tr + next_bound = cur_hi2 + if self.improves(pre_hi, cur_hi1, cur_hi2): + next_bound = pre_hi + if next_bound is None and second_needed: + return None + old_width = state.width_goal + if next_bound is not None: + old_width = ReceiveRateInterval(cur_hi1, next_bound).rel_tr_width + old_width = max(old_width, state.width_goal) + new_tr = multiple_step_down( + old_tr, old_width, self.expansion_coefficient + ) + new_tr = max(new_tr, state.min_rate) + if new_tr >= old_tr: + self.debug(u"Extend down hits max rate.") + return None + return new_tr + + def _extend_up(self, cur_lo1, cur_lo2, pre_lo): + """Return extended width above, or None if hitting max rate. + + :param cur_lo1: Tightest lower bound for current duration. Has to exist. + :param cur_lo2: Second tightest current lower bound, may not exist. + :param pre_lo: Tightest lower bound, previous duration, may not exist. + :type cur_lo1: ReceiveRateMeasurement + :type cur_lo2: Optional[ReceiveRateMeasurement] + :type pre_lo: Optional[ReceiveRateMeasurement] + :returns: The next target transmit rate to measure at. + :rtype: Optional[float] """ - if ndr_rel_width > state.width_goal: - # We have to narrow NDR width first, as NDR internal search - # can invalidate PDR (but not vice versa). - result = self.half_step_up(ndr_rel_width, ndr_lo.target_tr) - logging.info(f"Bisecting for NDR at {result}") - elif pdr_rel_width > state.width_goal: - # PDR internal search. - result = self.half_step_up(pdr_rel_width, pdr_lo.target_tr) - logging.info(f"Bisecting for PDR at {result}") - else: - result = None - return result - - @staticmethod - def _ndrpdr_duration( - state, ndr_lo, ndr_hi, pdr_lo, pdr_hi, ndr_rel_width, - pdr_rel_width): - """Perform duration-based trials within a ndrpdr phase - - :param state: current state - :param ndr_lo: ndr interval measured low - :param ndr_hi: ndr interval measured high - :param pdr_lo: pdr interval measured low - :param pdr_hi: pdr interval measured high - :param ndr_rel_width: ndr interval relative width - :param pdr_rel_width: pdr interval relative width - :type state: ProgressState - :type ndr_lo: ReceiveRateMeasurement.ReceiveRateMeasurement - :type ndr_hi: ReceiveRateMeasurement.ReceiveRateMeasurement - :type pdr_lo: ReceiveRateMeasurement.ReceiveRateMeasurement - :type pdr_hi: ReceiveRateMeasurement.ReceiveRateMeasurement - :type ndr_rel_width: float - :type pdr_rel_width: float - :returns: a new transmit rate if one should be applied - :rtype: float + state = self.state + old_tr = cur_lo1.target_tr + next_bound = cur_lo2 + if self.improves(pre_lo, cur_lo2, cur_lo1): + next_bound = pre_lo + old_width = state.width_goal + if next_bound is not None: + old_width = ReceiveRateInterval(cur_lo1, next_bound).rel_tr_width + old_width = max(old_width, state.width_goal) + new_tr = multiple_step_up(old_tr, old_width, self.expansion_coefficient) + new_tr = min(new_tr, state.max_rate) + if new_tr <= old_tr: + self.debug(u"Extend up hits max rate.") + return None + return new_tr + + def _bisect(self, lower_bound, upper_bound): + """Return middle rate or None if width is narrow enough. + + :param lower_bound: Measurement to use as a lower bound. Has to exist. + :param upper_bound: Measurement to use as an upper bound. Has to exist. + :type lower_bound: ReceiveRateMeasurement + :type upper_bound: ReceiveRateMeasurement + :returns: The next target transmit rate to measure at. + :rtype: Optional[float] + :raises RuntimeError: If database inconsistency is detected. """ - # We need to re-measure with full duration, possibly - # creating invalid bounds to resolve (thus broadening width). - if ndr_lo.duration < state.duration: - result = ndr_lo.target_tr - logging.info(u"re-measuring NDR lower bound") - elif pdr_lo.duration < state.duration: - result = pdr_lo.target_tr - logging.info(u"re-measuring PDR lower bound") - # Except when lower bounds have high loss fraction, in that case - # we do not need to re-measure _upper_ bounds. - elif ndr_hi.duration < state.duration and ndr_rel_width > 0.0: - result = ndr_hi.target_tr - logging.info(u"re-measuring NDR upper bound") - elif pdr_hi.duration < state.duration and pdr_rel_width > 0.0: - result = pdr_hi.target_tr - logging.info(u"re-measuring PDR upper bound") - else: - result = None - return result + state = self.state + width = ReceiveRateInterval(lower_bound, upper_bound).rel_tr_width + if width <= state.width_goal: + self.debug(u"No more bisects needed.") + return None + new_tr = half_step_up(lower_bound.target_tr, width, state.width_goal) + return new_tr |