aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/MLRsearch/multiple_loss_ratio_search.py
blob: b48e2e7547d8699f139e3e29e10c967ce8437cf2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# Copyright (c) 2023 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module defining MultipleLossRatioSearch class."""

import logging
import time

from dataclasses import dataclass
from typing import Callable, Optional, Tuple

from .candidate import Candidate
from .config import Config
from .dataclass import secondary_field
from .discrete_load import DiscreteLoad
from .discrete_result import DiscreteResult
from .expander import GlobalWidth
from .limit_handler import LimitHandler
from .load_rounding import LoadRounding
from .measurement_database import MeasurementDatabase
from .pep3140 import Pep3140Dict
from .search_goal import SearchGoal
from .selector import Selector
from .target_scaling import TargetScaling
from .trial_measurement import AbstractMeasurer
from .trimmed_stat import TrimmedStat


@dataclass
class MultipleLossRatioSearch:
    """Optimized binary search algorithm for finding conditional throughputs.

    Traditional binary search algorithm needs initial interval
    (lower and upper bound), and returns final narrow bounds
    (related to its search goal) after bisecting
    (until some exit condition is met).
    The exit condition is usually related to the interval width,
    (upper bound value minus lower bound value).

    The optimized algorithm in this class contains several improvements
    aimed to reduce overall search time.

    One improvement is searching for bounds for multiple search goals at once.
    Specifically, the trial measurement results influence bounds for all goals,
    even though the selection of trial inputs for next measurement
    focuses only on one goal. The focus can switch between goals frequently.

    Next improvement is that results of trial measurements
    with small trial duration can be used to find a reasonable starting interval
    for full trial duration search.
    This results in more trials performed, but smaller overall duration
    in general.
    Internally, such shorter trials come from "preceding targets",
    handled in a same way as a search goal "final target".
    Related improvement is that the "current" interval does not need to be valid
    (e.g. one of the bounds is missing).
    In that case, this algorithm will move and expand the interval,
    in a process called external search. Only when both bounds are found,
    the interval bisection (called internal search) starts making it narrow.

    Next improvement is bisecting in logarithmic quantities,
    so that target relative width is independent of measurement units.

    Next improvement is basing the initial interval on forwarding rates
    of few initial measurements, starting at max load and using forwarding rates
    seen so far.

    Next improvement is to allow the use of multiple shorter trials
    instead one big trial, allowing a percentage of trials
    to exceed the loss ratio target.
    This makes the result more stable in practice.
    Conservative behavior (single long trial, zero exceed ratio)
    is still available using corresponding goal definitions.

    Final improvement is exiting early if the minimal load
    is not a valid lower bound (at final duration)
    and also exiting if the overall search duration is too long.

    There are also subtle optimizations related to candidate selection
    and uneven splitting of intervals, too numerous to list here.
    """

    config: Config
    """Arguments required at construction time."""
    # End of fields required at intance creation.
    measurer: AbstractMeasurer = secondary_field()
    """Measurer to use, set at calling search()."""
    debug: Callable[[str], None] = secondary_field()
    """Object to call for logging, None means logging.debug."""
    # Fields below are computed from data above
    rounding: LoadRounding = secondary_field()
    """Derived from goals. Instance to use for intended load rounding."""
    from_float: Callable[[float], DiscreteLoad] = secondary_field()
    """Conversion method from float [tps] intended load values."""
    limit_handler: LimitHandler = secondary_field()
    """Load post-processing utility based on config and rounding."""
    scaling: TargetScaling = secondary_field()
    """Utility for creating target chains for search goals."""
    database: MeasurementDatabase = secondary_field()
    """Storage for (stats of) measurement results so far."""
    stop_time: float = secondary_field()
    """Monotonic time value at which the search should end with failure."""

    def search(
        self,
        measurer: AbstractMeasurer,
        debug: Optional[Callable[[str], None]] = None,
    ) -> Pep3140Dict[SearchGoal, Optional[TrimmedStat]]:
        """Perform initial trials, create state object, proceed with main loop.

        Stateful arguments (measurer and debug) are stored.
        Derived objects are constructed from config.

        :param measurer: Measurement provider to use by this search object.
        :param debug: Callable to optionally use instead of logging.debug().
        :returns: Structure containing conditional throughputs and other stats,
            one for each search goal.
        :type measurer: AbstractMeasurer
        :type debug: Optional[Callable[[str], None]]
        :returns: Mapping from goal to lower bound (none if min load is hit).
        :rtype: Pep3140Dict[SearchGoal, Optional[TrimmedStat]]
        :raises RuntimeError: If total duration is larger than timeout,
            or if min load becomes an upper bound for a search goal
            that has fail fast true.
        """
        self.measurer = measurer
        self.debug = logging.debug if debug is None else debug
        self.rounding = LoadRounding(
            min_load=self.config.min_load,
            max_load=self.config.max_load,
            float_goals=[goal.relative_width for goal in self.config.goals],
        )
        self.from_float = DiscreteLoad.float_conver(rounding=self.rounding)
        self.limit_handler = LimitHandler(
            rounding=self.rounding,
            debug=self.debug,
        )
        self.scaling = TargetScaling(
            goals=self.config.goals,
            rounding=self.rounding,
        )
        self.database = MeasurementDatabase(self.scaling.targets)
        self.stop_time = time.monotonic() + self.config.search_duration_max
        result0, result1 = self.run_initial_trials()
        self.main_loop(result0.discrete_load, result1.discrete_load)
        ret_dict = Pep3140Dict()
        for goal in self.config.goals:
            target = self.scaling.goal_to_final_target[goal]
            bounds = self.database.get_relevant_bounds(target=target)
            ret_dict[goal] = bounds.clo
        return ret_dict

    def measure(self, duration: float, load: DiscreteLoad) -> DiscreteResult:
        """Call measurer and put the result to appropriate form in database.

        Also check the argument types and load roundness,
        and return the result to the caller.

        :param duration: Intended duration for the trial measurement.
        :param load: Intended load for the trial measurement:
        :type duration: float
        :type load: DiscreteLoad
        :returns: The trial results.
        :rtype: DiscreteResult
        :raises RuntimeError: If an argument doed not have the required type.
        """
        if not isinstance(duration, float):
            raise RuntimeError(f"Duration has to be float: {duration!r}")
        if not isinstance(load, DiscreteLoad):
            raise RuntimeError(f"Load has to be discrete: {load!r}")
        if not load.is_round:
            raise RuntimeError(f"Told to measure unrounded: {load!r}")
        self.debug(f"Measuring at d={duration},il={int(load)}")
        result = self.measurer.measure(
            intended_duration=duration,
            intended_load=float(load),
        )
        self.debug(f"Measured lr={result.loss_ratio}")
        result = DiscreteResult.with_load(result=result, load=load)
        self.database.add(result)
        return result

    def run_initial_trials(self) -> Tuple[DiscreteResult, DiscreteResult]:
        """Perform trials to get enough data to start the selectors.

        Measurements are done with all initial targets in mind,
        based on smallest target loss ratio, largest initial trial duration,
        and largest initial target width.

        Forwarding rate is used as a hint for next intended load.
        The relative quantity is used, as load can use different units.
        When the smallest target loss ratio is non-zero, a correction is needed
        (forwarding rate is only a good hint for zero loss ratio load).
        The correction is conservative (all increase in load turns to losses).

        Also, warmup trial (if configured) is performed,
        all other trials are added to the database.

        This could return the initial width, but from implementation perspective
        it is easier to return two measurements (or the same one twice) here
        and compute width later. The "one value twice" happens when max load
        has small loss, or when min load has big loss.

        :returns: Two last measured values, in any order. Or one value twice.
        :rtype: Tuple[DiscreteResult, DiscreteResult]
        """
        max_load = self.limit_handler.max_load
        ratio, duration, width = None, None, None
        for target in self.scaling.targets:
            if target.preceding:
                continue
            if ratio is None or ratio > target.loss_ratio:
                ratio = target.loss_ratio
            if not duration or duration < target.trial_duration:
                duration = target.trial_duration
            if not width or width < target.discrete_width:
                width = target.discrete_width
        self.debug(f"Init ratio {ratio} duration {duration} width {width}")
        if self.config.warmup_duration:
            self.debug("Warmup trial.")
            self.measure(self.config.warmup_duration, max_load)
            # Warmup should not affect the real results, reset the database.
            self.database = MeasurementDatabase(self.scaling.targets)
        self.debug(f"First trial at max rate: {max_load}")
        result0 = self.measure(duration, max_load)
        rfr = result0.relative_forwarding_rate
        corrected_rfr = (self.from_float(rfr) / (1.0 - ratio)).rounded_down()
        if corrected_rfr >= max_load:
            self.debug("Small loss, no other initial trials are needed.")
            return result0, result0
        mrr = self.limit_handler.handle(corrected_rfr, width, None, max_load)
        self.debug(f"Second trial at (corrected) mrr: {mrr}")
        result1 = self.measure(duration, mrr)
        # Attempt to get narrower width.
        result_ratio = result1.loss_ratio
        if result_ratio > ratio:
            rfr2 = result1.relative_forwarding_rate
            crfr2 = (self.from_float(rfr2) / (1.0 - ratio)).rounded_down()
            mrr2 = self.limit_handler.handle(crfr2, width, None, mrr)
        else:
            mrr2 = mrr + width
            mrr2 = self.limit_handler.handle(mrr2, width, mrr, max_load)
        if not mrr2:
            self.debug("Close enough, measuring at mrr2 is not needed.")
            return result1, result1
        self.debug(f"Third trial at (corrected) mrr2: {mrr2}")
        result2 = self.measure(duration, mrr2)
        return result1, result2

    def main_loop(self, load0: DiscreteLoad, load1: DiscreteLoad) -> None:
        """Initialize selectors and keep measuring the winning candidate.

        Selectors are created, the two input loads are useful starting points.

        The search ends when no selector nominates any candidate,
        or if the search takes too long (or if a selector raises).

        Winner is selected according to ordering defined in Candidate class.
        In case of a tie, selectors for earlier goals are preferred.

        As a selector is only allowed to update current width as the winner,
        the update is done here explicitly.

        :param load0: Discrete load of one of results from run_initial_trials.
        :param load1: Discrete load of other of results from run_initial_trials.
        :type load0: DiscreteLoad
        :type load1: DiscreteLoad
        :raises RuntimeError: If the search takes too long,
            or if min load becomes an upper bound for any search goal
        """
        if load1 < load0:
            load0, load1 = load1, load0
        global_width = GlobalWidth.from_loads(load0, load1)
        selectors = []
        for target in self.scaling.goal_to_final_target.values():
            selector = Selector(
                final_target=target,
                global_width=global_width,
                initial_lower_load=load0,
                initial_upper_load=load1,
                database=self.database,
                handler=self.limit_handler,
                debug=self.debug,
            )
            selectors.append(selector)
        while time.monotonic() < self.stop_time:
            winner = Candidate()
            for selector in selectors:
                # Order of arguments is important
                # when two targets nominate the same candidate.
                winner = min(Candidate.nomination_from(selector), winner)
            if not winner:
                break
            # We do not check duration versus stop_time here,
            # as some measurers can be unpredictably faster
            # than their intended duration suggests.
            self.measure(duration=winner.duration, load=winner.load)
            # Delayed updates.
            if winner.width:
                global_width.width = winner.width
            winner.won()
        else:
            raise RuntimeError("Optimized search takes too long.")
        self.debug("Search done.")