1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
|
# Copyright (c) 2024 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for tracking average and variance for data of weighted samples.
See log_plus for an explanation why None acts as a special case "float" number.
TODO: Current implementation sets zero averages on empty tracker.
Should we set None instead?
TODO: Implement __str__ and __repr__ for easier debugging.
"""
import copy
import math
import numpy
# TODO: Teach FD.io CSIT to use multiple dirs in PYTHONPATH,
# then switch to absolute imports within PLRsearch package.
# Current usage of relative imports is just a short term workaround.
from .log_plus import log_plus, safe_exp
class ScalarStatTracker:
"""Class for tracking one-dimensional samples.
Variance of one-dimensional data cannot be negative,
so this class avoids possible underflows by tracking logarithm
of the variance instead.
Similarly, sum of weights is also tracked as a logarithm.
"""
def __init__(self, log_sum_weight=None, average=0.0, log_variance=None):
"""Initialize new tracker instance, empty by default.
:param log_sum_weight: Natural logarithm of sum of weights
of samples so far. Default: None (as log of 0.0).
:param average: Weighted average of the samples. Default: 0.0
:param log_variance: Natural logarithm of variance.
Default: None (as log of 0.0).
:type log_sum_weight: float or None
:type average: float
:type log_variance: float or None
"""
self.log_sum_weight = log_sum_weight
self.average = average
self.log_variance = log_variance
def __repr__(self):
"""Return string, which interpreted constructs state of self.
:returns: Expression constructing an equivalent instance.
:rtype: str
"""
return (
f"ScalarStatTracker(log_sum_weight={self.log_sum_weight!r},"
f"average={self.average!r},log_variance={self.log_variance!r})"
)
def copy(self):
"""Return new ScalarStatTracker instance with the same state as self.
The point of this method is the return type, instances of subclasses
can get rid of their additional data this way.
:returns: New instance with the same core state.
:rtype: ScalarStatTracker
"""
return ScalarStatTracker(
self.log_sum_weight, self.average, self.log_variance
)
def add(self, scalar_value, log_weight=0.0):
"""Return updated stats corresponding to addition of another sample.
:param scalar_value: The scalar value of the sample.
:param log_weight: Natural logarithm of weight of the sample.
Default: 0.0 (as log of 1.0).
:type scalar_value: float
:type log_weight: float
:returns: Updated self.
:rtype: ScalarStatTracker
"""
old_log_sum_weight = self.log_sum_weight
if old_log_sum_weight is None:
# First sample.
self.log_sum_weight = log_weight
self.average = scalar_value
return self
old_average = self.average
log_variance = self.log_variance
new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
log_sample_ratio = log_weight - new_log_sum_weight
sample_ratio = math.exp(log_sample_ratio)
shift = scalar_value - old_average
new_average = old_average + shift * sample_ratio
# The log_variance is updated in-place (instead of new_ vs old_)
# because of if-s detecting None-s where the value does not change.
absolute_shift = abs(shift)
if absolute_shift > 0.0:
log_square_shift = 2 * math.log(absolute_shift)
log_variance = log_plus(
log_variance, log_square_shift + log_sample_ratio
)
if log_variance is not None:
log_variance += old_log_sum_weight - new_log_sum_weight
self.log_sum_weight = new_log_sum_weight
self.average = new_average
self.log_variance = log_variance
return self
class ScalarDualStatTracker(ScalarStatTracker):
"""Class for tracking one-dimensional samples, offering dual stats.
It means that instead of just primary stats (identical to what
ScalarStatTracker offers, that is why this is its subclass),
this tracker allso offers secondary stats.
Secondary stats are scalar stats that track the same data,
except that the most weighty (or later if tied) sample is not considered.
Users can analyze how do secondary stats differ from the primary ones,
and based on that decide whether they processed "enough" data.
One typical use is for Monte Carlo integrator to decide whether
the partial sums so far are reliable enough.
"""
def __init__(
self,
log_sum_weight=None,
average=0.0,
log_variance=None,
log_sum_secondary_weight=None,
secondary_average=0.0,
log_secondary_variance=None,
max_log_weight=None,
):
"""Initialize new tracker instance, empty by default.
:param log_sum_weight: Natural logarithm of sum of weights
of samples so far. Default: None (as log of 0.0).
:param average: Weighted average of the samples. Default: 0.0
:param log_variance: Natural logarithm of variance.
Default: None (as log of 0.0).
:param log_sum_secondary_weight: Natural logarithm of sum of weights
of samples so far except weightest one.
Default: None (as log of 0.0).
:param secondary_average: Weighted average of the samples
except the weightest one. Default: 0.0
:param log_variance: Natural logarithm of variance od samples
except the weightest one. Default: None (as log of 0.0).
:param max_log_weight: Natural logarithm of weight of sample
counted in primary but not secondary stats.
Default: None (as log of 0.0).
:type log_sum_weight: float or None
:type average: float
:type log_variance: float or None
:type log_sum_secondary_weight: float or None
:type secondary_average: float
:type log_secondary_variance: float or None
:type max_log_weight: float or None
"""
# Not using super() as the constructor signature is different,
# so in case of diamond inheritance mismatch would be probable.
ScalarStatTracker.__init__(self, log_sum_weight, average, log_variance)
self.secondary = ScalarStatTracker(
log_sum_secondary_weight, secondary_average, log_secondary_variance
)
self.max_log_weight = max_log_weight
def __repr__(self):
"""Return string, which interpreted constructs state of self.
:returns: Expression contructing an equivalent instance.
:rtype: str
"""
sec = self.secondary
return (
f"ScalarDualStatTracker(log_sum_weight={self.log_sum_weight!r},"
f"average={self.average!r},log_variance={self.log_variance!r},"
f"log_sum_secondary_weight={sec.log_sum_weight!r},"
f"secondary_average={sec.average!r},"
f"log_secondary_variance={sec.log_variance!r},"
f"max_log_weight={self.max_log_weight!r})"
)
def add(self, scalar_value, log_weight=0.0):
"""Return updated both stats after addition of another sample.
:param scalar_value: The scalar value of the sample.
:param log_weight: Natural logarithm of weight of the sample.
Default: 0.0 (as log of 1.0).
:type scalar_value: float
:type log_weight: float
:returns: Updated self.
:rtype: ScalarDualStatTracker
"""
# Using super() as copy() and add() are not expected to change
# signature, so this way diamond inheritance will be supported.
primary = super()
if self.max_log_weight is None or log_weight >= self.max_log_weight:
self.max_log_weight = log_weight
self.secondary = primary.copy()
else:
self.secondary.add(scalar_value, log_weight)
primary.add(scalar_value, log_weight)
return self
def get_pessimistic_variance(self):
"""Return estimate of variance reflecting weight effects.
Typical scenario is the primary tracker dominated by a single sample.
In worse case, secondary tracker is also dominated by
a single (but different) sample.
Current implementation simply returns variance of average
of the two trackers, as if they were independent.
:returns: Pessimistic estimate of variance (not stdev, no log).
:rtype: float
"""
var_primary = safe_exp(self.log_variance)
var_secondary = safe_exp(self.secondary.log_variance)
var_combined = (var_primary + var_secondary) / 2
avg_half_diff = (self.average - self.secondary.average) / 2
var_combined += avg_half_diff * avg_half_diff
return var_combined
class VectorStatTracker:
"""Class for tracking multi-dimensional samples.
Contrary to one-dimensional data, multi-dimensional covariance matrix
contains off-diagonal elements which can be negative.
But, sum of weights is never negative, so it is tracked as a logarithm.
The code assumes every method gets arguments with correct dimensionality
as set in constructor. No checks are performed (for performance reasons).
TODO: Should we provide a subclass with the dimensionality checks?
"""
def __init__(
self,
dimension=2,
log_sum_weight=None,
averages=None,
covariance_matrix=None,
):
"""Initialize new tracker instance, two-dimensional empty by default.
If any of latter two arguments is None, it means
the tracker state is invalid. Use reset method
to create empty tracker of constructed dimensionality.
:param dimension: Number of scalar components of samples.
:param log_sum_weight: Natural logarithm of sum of weights
of samples so far. Default: None (as log of 0.0).
:param averages: Weighted average of the samples.
Default: None
:param covariance_matrix: Variance matrix elements.
Default: None
:type log_sum_weight: float or None
:type averages: None or tuple of float
:type covariance_matrix: None or tuple of tuple of float
"""
self.dimension = dimension
self.log_sum_weight = log_sum_weight
self.averages = averages
self.covariance_matrix = covariance_matrix
def __repr__(self):
"""Return string, which interpreted constructs state of self.
:returns: Expression constructing an equivalent instance.
:rtype: str
"""
return (
f"VectorStatTracker(dimension={self.dimension!r},"
f"log_sum_weight={self.log_sum_weight!r},"
f"averages={self.averages!r},"
f"covariance_matrix={self.covariance_matrix!r})"
)
def copy(self):
"""Return new instance with the same state as self.
The main usage is to simplify debugging. This method allows
to store the old state, while self continues to track towards new state.
:returns: Created tracker instance.
:rtype: VectorStatTracker
"""
return VectorStatTracker(
self.dimension,
self.log_sum_weight,
self.averages[:],
copy.deepcopy(self.covariance_matrix),
)
def reset(self):
"""Return state set to empty data of proper dimensionality.
:returns: Updated self.
:rtype: VectorStatTracker
"""
self.averages = [0.0 for _ in range(self.dimension)]
# TODO: Examine whether we can gain speed by tracking triangle only.
self.covariance_matrix = [
[0.0 for _ in range(self.dimension)] for _ in range(self.dimension)
]
# TODO: In Python3, list comprehensions are generators,
# so they are not indexable. Put list() when converting.
return self
def unit_reset(self):
"""Reset state but use unit matric as covariance.
:returns: Updated self.
:rtype: VectorStatTracker
"""
self.reset()
for index in range(self.dimension):
self.covariance_matrix[index][index] = 1.0
return self
def add_get_shift(self, vector_value, log_weight=0.0):
"""Return shift and update state to addition of another sample.
Shift is the vector from old average to new sample.
For most callers, returning shift is more useful than returning self.
:param vector_value: The value of the sample.
:param log_weight: Natural logarithm of weight of the sample.
Default: 0.0 (as log of 1.0).
:type vector_value: iterable of float
:type log_weight: float
:returns: Shift vector
:rtype: list of float
"""
dimension = self.dimension
old_log_sum_weight = self.log_sum_weight
old_averages = self.averages
if not old_averages:
shift = [0.0 for _ in range(dimension)]
else:
shift = [
vector_value[index] - old_averages[index]
for index in range(dimension)
]
if old_log_sum_weight is None:
# First sample.
self.log_sum_weight = log_weight
self.averages = [vector_value[index] for index in range(dimension)]
# Not touching covariance matrix.
return shift
covariance_matrix = self.covariance_matrix
new_log_sum_weight = log_plus(old_log_sum_weight, log_weight)
data_ratio = math.exp(old_log_sum_weight - new_log_sum_weight)
sample_ratio = math.exp(log_weight - new_log_sum_weight)
new_averages = [
old_averages[index] + shift[index] * sample_ratio
for index in range(dimension)
]
# It is easier to update covariance matrix in-place.
for second in range(dimension):
for first in range(dimension):
element = covariance_matrix[first][second]
element += shift[first] * shift[second] * sample_ratio
element *= data_ratio
covariance_matrix[first][second] = element
self.log_sum_weight = new_log_sum_weight
self.averages = new_averages
# self.covariance_matrix still points to the object we updated in-place.
return shift
# TODO: There are some uses for such a vector tracker,
# that does not track average, but weightest (latest if tied) value,
# and computes covariance matrix centered around that.
# But perhaps the following method would work similarly well?
def add_without_dominance_get_distance(self, vector_value, log_weight=0.0):
"""Update stats, avoid having the sample dominate, return old distance.
If the weight of the incoming sample is far bigger
than the weight of all the previous data together,
covariance matrix would suffer from underflow.
To avoid that, this method manipulates both weights
before calling add().
The old covariance matrix (before update) can define a metric.
Shift is a vector, so the metric can be used to compute a distance
between old average and new sample.
:param vector_value: The value of the sample.
:param log_weight: Natural logarithm of weight of the sample.
Default: 0.0 (as log of 1.0).
:type vector_value: iterable of float
:type log_weight: float
:returns: Updated self.
:rtype: VectorStatTracker
"""
lsw = self.log_sum_weight
if lsw is not None and lsw < log_weight - 1.0:
lsw = (lsw + log_weight) / 2.0
log_weight = lsw
self.log_sum_weight = lsw
old_metric = copy.deepcopy(self.covariance_matrix)
shift = self.add_get_shift(vector_value, log_weight)
gradient = numpy.linalg.solve(old_metric, shift)
distance = numpy.vdot(shift, gradient)
return distance
|