1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
|
/*
* Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <arpa/inet.h>
#include <hicn/transport/portability/c_portability.h>
#include <hicn/transport/portability/endianess.h>
#include <hicn/transport/utils/membuf.h>
#include <protocols/fec/fec_info.h>
#include <protocols/fec_base.h>
#include <array>
#include <cstdint>
#include <map>
#include <unordered_set>
#include <vector>
namespace transport {
namespace protocol {
namespace fec {
#define foreach_rs_fec_type \
_(RS, 1, 2) \
_(RS, 1, 3) \
_(RS, 1, 4) \
_(RS, 2, 3) \
_(RS, 2, 4) \
_(RS, 2, 5) \
_(RS, 2, 6) \
_(RS, 3, 6) \
_(RS, 3, 7) \
_(RS, 3, 8) \
_(RS, 3, 9) \
_(RS, 3, 10) \
_(RS, 3, 11) \
_(RS, 3, 12) \
_(RS, 4, 5) \
_(RS, 4, 6) \
_(RS, 4, 7) \
_(RS, 4, 8) \
_(RS, 4, 9) \
_(RS, 4, 10) \
_(RS, 4, 11) \
_(RS, 4, 12) \
_(RS, 6, 10) \
_(RS, 8, 10) \
_(RS, 8, 11) \
_(RS, 8, 12) \
_(RS, 8, 14) \
_(RS, 8, 16) \
_(RS, 8, 32) \
_(RS, 10, 20) \
_(RS, 10, 25) \
_(RS, 10, 30) \
_(RS, 10, 40) \
_(RS, 10, 60) \
_(RS, 10, 90) \
_(RS, 16, 18) \
_(RS, 16, 21) \
_(RS, 16, 23) \
_(RS, 16, 24) \
_(RS, 16, 27) \
_(RS, 17, 21) \
_(RS, 17, 34) \
_(RS, 20, 45) \
_(RS, 20, 50) \
_(RS, 20, 60) \
_(RS, 20, 70) \
_(RS, 30, 70) \
_(RS, 30, 75) \
_(RS, 30, 85) \
_(RS, 30, 95) \
_(RS, 32, 36) \
_(RS, 32, 41) \
_(RS, 32, 46) \
_(RS, 32, 54) \
_(RS, 34, 42) \
_(RS, 35, 70) \
_(RS, 40, 95) \
_(RS, 40, 100) \
_(RS, 40, 110) \
_(RS, 40, 120) \
_(RS, 52, 62)
static const constexpr uint16_t MAX_SOURCE_BLOCK_SIZE = 128;
/**
* We use a std::array in place of std::vector to avoid to allocate a new vector
* in the heap every time we build a new source block, which would be bad if
* the decoder has to allocate several source blocks for many concurrent bases.
* std::array allows to be constructed in place, saving the allocation at the
* price os knowing in advance its size.
*/
class RSBufferInfo : public FECBufferInfo {
public:
RSBufferInfo() : FECBufferInfo() {}
RSBufferInfo(uint32_t offset, uint32_t index, uint32_t metadata,
buffer buffer)
: FECBufferInfo(index, metadata, buffer), offset_(offset) {}
uint32_t getOffset() { return offset_; }
RSBufferInfo &setOffset(uint32_t offset) {
offset_ = offset;
return *this;
}
private:
uint32_t offset_ = 0;
};
using Packets = std::array<RSBufferInfo, MAX_SOURCE_BLOCK_SIZE>;
/**
* FEC Header, prepended to symbol packets.
*/
struct fec_header {
/**
* The base source packet seq_number this FES symbol refers to
*/
uint32_t seq_number;
/**
* The index of the symbol inside the source block, between k and n - 1
*/
uint8_t encoded_symbol_id;
/**
* Total length of source block (n)
*/
uint8_t source_block_len;
/**
* Total number of symbols (n - k)
*/
uint8_t n_fec_symbols;
/**
* Align header to 64 bits
*/
uint8_t padding;
void setSeqNumberBase(uint32_t suffix) {
seq_number = portability::host_to_net(suffix);
}
uint32_t getSeqNumberBase() { return portability::net_to_host(seq_number); }
void setEncodedSymbolId(uint8_t esi) { encoded_symbol_id = esi; }
uint8_t getEncodedSymbolId() { return encoded_symbol_id; }
void setSourceBlockLen(uint8_t k) { source_block_len = k; }
uint8_t getSourceBlockLen() { return source_block_len; }
void setNFecSymbols(uint8_t n_r) { n_fec_symbols = n_r; }
uint8_t getNFecSymbols() { return n_fec_symbols; }
};
static_assert(sizeof(fec_header) <= 8, "fec_header is too large");
class rs;
/**
* This class models the source block itself.
*/
class BlockCode : public Packets {
/**
* @brief Metadata to include when encoding the buffers. This does not need to
* be sent over the network, but just to be included in the FEC protected
* bytes.
*
*/
class __attribute__((__packed__)) fec_metadata {
public:
void setPacketLength(uint16_t length) {
packet_length = portability::host_to_net(length);
}
uint32_t getPacketLength() {
return portability::net_to_host(packet_length);
}
void setMetadataBase(uint32_t value) {
metadata = portability::host_to_net(value);
}
uint32_t getMetadataBase() { return portability::net_to_host(metadata); }
private:
uint16_t packet_length; /* Used to get the real size of the packet after we
pad it */
uint32_t
metadata; /* Caller may specify an integer for storing additional
metadata that can be used when recovering the packet. */
};
/**
* For variable length packet we need to prepend to the padded payload the
* real length of the packet. This is *not* sent over the network.
*/
static constexpr std::size_t METADATA_BYTES = sizeof(fec_metadata);
public:
BlockCode(uint32_t k, uint32_t n, uint32_t seq_offset, struct fec_parms *code,
rs ¶ms);
/**
* Add a repair symbol to the dource block.
*/
bool addRepairSymbol(const fec::buffer &packet, uint32_t i,
uint32_t offset = 0);
/**
* Add a source symbol to the source block.
*/
bool addSourceSymbol(const fec::buffer &packet, uint32_t i,
uint32_t offset = 0,
uint32_t metadata = FECBase::INVALID_METADATA);
/**
* Get current length of source block.
*/
std::size_t length() { return current_block_size_; }
/**
* Get N
*/
uint32_t getN() { return n_; }
/**
* Get K
*/
uint32_t getK() { return k_; }
/**
* Clear source block
*/
void clear();
private:
/**
* Add symbol to source block
**/
bool addSymbol(const fec::buffer &packet, uint32_t i, uint32_t offset,
std::size_t size, uint32_t metadata);
/**
* Starting from k source symbols, get the n - k repair symbols
*/
void encode();
/**
* Starting from k symbols (mixed repair and source), get k source symbols.
* NOTE: It does not make sense to retrieve the k source symbols using the
* very same k source symbols. With the current implementation that case can
* never happen.
*/
void decode();
private:
uint32_t k_;
uint32_t n_;
uint32_t seq_offset_;
struct fec_parms *code_;
std::size_t max_buffer_size_;
std::size_t current_block_size_;
std::vector<uint32_t> sorted_index_;
bool to_decode_;
rs ¶ms_;
};
/**
* This class contains common parameters between the fec encoder and decoder.
* In particular it contains:
* - The callback to be called when symbols are encoded / decoded
* - The reference to the static reed-solomon parameters, allocated at program
* startup
* - N and K. Ideally they are useful only for the encoder (the decoder can
* retrieve them from the FEC header). However right now we assume sender and
* receiver agreed on the parameters k and n to use. We will introduce a control
* message later to negotiate them, so that decoder cah dynamically change them
* during the download.
*/
class rs : public virtual FECBase {
friend class BlockCode;
/**
* Deleter for static preallocated reed-solomon parameters.
*/
struct MatrixDeleter {
void operator()(struct fec_parms *params);
};
/**
* unique_ptr to reed-solomon parameters, with custom deleter to call fec_free
* at the end of the program
*/
using Matrix = std::unique_ptr<struct fec_parms, MatrixDeleter>;
/**
* Key to retrieve static preallocated reed-solomon parameters. It is pair of
* k and n
*/
using Code = std::pair<std::uint32_t /* k */, std::uint32_t /* n */>;
/**
* Custom hash function for (k, n) pair.
*/
struct CodeHasher {
std::size_t operator()(const Code &code) const {
uint64_t ret = uint64_t(code.first) << 32 | uint64_t(code.second);
return std::hash<uint64_t>{}(ret);
}
};
protected:
/**
* Callback to be called after the encode or the decode operations. In the
* former case it will contain the symbols, while in the latter the sources.
*/
using PacketsReady = std::function<void(std::vector<buffer> &)>;
/**
* The sequence number base.
*/
using SNBase = std::uint32_t;
/**
* The map of source blocks, used at the decoder side. For the encoding
* operation we can use one source block only, since packet are produced in
* order.
*/
using SourceBlocks = std::unordered_map<SNBase, BlockCode>;
/**
* Map (k, n) -> reed-solomon parameter
*/
using Codes = std::unordered_map<Code, Matrix, CodeHasher>;
public:
rs(uint32_t k, uint32_t n, uint32_t seq_offset = 0);
~rs() = default;
virtual void clear() { processed_source_blocks_.clear(); }
bool isSymbol(uint32_t index) { return ((index - seq_offset_) % n_) >= k_; }
private:
/**
* Create reed-solomon codes at program startup.
*/
static Codes createCodes();
protected:
bool processed(SNBase seq_base) {
return processed_source_blocks_.find(seq_base) !=
processed_source_blocks_.end();
}
void setProcessed(SNBase seq_base) {
processed_source_blocks_.emplace(seq_base);
}
std::uint32_t k_;
std::uint32_t n_;
std::uint32_t seq_offset_;
/**
* Keep track of processed source blocks
*/
std::unordered_set<SNBase> processed_source_blocks_;
static Codes codes_;
};
/**
* The reed-solomon encoder. It is feeded with source symbols and it provide
* repair-symbols through the fec_callback_
*/
class RSEncoder : public rs, public ProducerFEC {
public:
RSEncoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0);
/**
* Always consume source symbols.
*/
void consume(const fec::buffer &packet, uint32_t index, uint32_t offset = 0,
uint32_t metadata = FECBase::INVALID_METADATA);
void onPacketProduced(core::ContentObject &content_object, uint32_t offset,
uint32_t metadata = FECBase::INVALID_METADATA) override;
/**
* @brief Get the fec header size, if added to source packets
* in RS the source packets do not transport any FEC header
*/
std::size_t getFecHeaderSize(bool isFEC) override {
return isFEC ? sizeof(fec_header) : 0;
}
void clear() override {
rs::clear();
source_block_.clear();
}
void reset() override { clear(); }
private:
struct fec_parms *current_code_;
/**
* The source block. As soon as it is filled with k source symbols, the
* encoder calls the callback fec_callback_ and the resets the block 0, ready
* to accept another batch of k source symbols.
*/
BlockCode source_block_;
};
/**
* The reed-solomon encoder. It is feeded with source/repair symbols and it
* provides the original source symbols through the fec_callback_
*/
class RSDecoder : public rs, public ConsumerFEC {
public:
RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0);
/**
* Consume source symbol
*/
void consumeSource(const fec::buffer &packet, uint32_t i, uint32_t offset = 0,
uint32_t metadata = FECBase::INVALID_METADATA);
/**
* Consume repair symbol
*/
void consumeRepair(const fec::buffer &packet, uint32_t offset = 0);
/**
* Consumers will call this function when they receive a data packet
*/
void onDataPacket(core::ContentObject &content_object, uint32_t offset,
uint32_t metadata = FECBase::INVALID_METADATA) override;
/**
* @brief Get the fec header size, if added to source packets
* in RS the source packets do not transport any FEC header
*/
std::size_t getFecHeaderSize(bool isFEC) override {
return isFEC ? sizeof(fec_header) : 0;
}
/**
* Clear decoder to reuse
*/
void clear() override {
rs::clear();
src_blocks_.clear();
parked_packets_.clear();
}
void reset() override { clear(); }
private:
void recoverPackets(SourceBlocks::iterator &src_block_it);
private:
/**
* Map of source blocks. We use a map because we may receive symbols belonging
* to diffreent source blocks at the same time, so we need to be able to
* decode many source symbols at the same time.
*/
SourceBlocks src_blocks_;
/**
* Unordered Map of source symbols for which we did not receive any repair
* symbol in the same source block. Notably this happens when:
*
* - We receive the source symbols first and the repair symbols after
* - We received only source symbols for a given block. In that case it does
* not make any sense to build the source block, since we received all the
* source packet of the block.
*/
using BufferInfoArray = std::vector<RSBufferInfo>;
std::unordered_map<uint32_t, BufferInfoArray> parked_packets_;
};
} // namespace fec
} // namespace protocol
} // namespace transport
|