diff options
Diffstat (limited to 'libtransport/src/protocols/fec')
-rw-r--r-- | libtransport/src/protocols/fec/CMakeLists.txt | 2 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/fec.cc | 176 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rely.cc | 103 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rely.h | 61 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rs.cc | 161 | ||||
-rw-r--r-- | libtransport/src/protocols/fec/rs.h | 101 |
6 files changed, 316 insertions, 288 deletions
diff --git a/libtransport/src/protocols/fec/CMakeLists.txt b/libtransport/src/protocols/fec/CMakeLists.txt index 6d61ae043..8ae0a7360 100644 --- a/libtransport/src/protocols/fec/CMakeLists.txt +++ b/libtransport/src/protocols/fec/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: diff --git a/libtransport/src/protocols/fec/fec.cc b/libtransport/src/protocols/fec/fec.cc index 16a04cb98..912e7a40f 100644 --- a/libtransport/src/protocols/fec/fec.cc +++ b/libtransport/src/protocols/fec/fec.cc @@ -44,6 +44,7 @@ #include "fec.h" +#include <assert.h> #include <hicn/transport/portability/platform.h> #include <stdio.h> #include <stdlib.h> @@ -60,72 +61,6 @@ #endif /* - * compatibility stuff - */ -#ifdef MSDOS /* but also for others, e.g. sun... */ -#define NEED_BCOPY -#define bcmp(a, b, n) memcmp(a, b, n) -#endif - -#ifdef ANDROID -#define bcmp(a, b, n) memcmp(a, b, n) -#endif - -#ifdef NEED_BCOPY -#define bcopy(s, d, siz) memcpy((d), (s), (siz)) -#define bzero(d, siz) memset((d), '\0', (siz)) -#endif - -/* - * stuff used for testing purposes only - */ - -#ifdef TEST -#define DEB(x) -#define DDB(x) x -#define DEBUG 0 /* minimal debugging */ -#ifdef MSDOS -#include <time.h> -struct timeval { - unsigned long ticks; -}; -#define gettimeofday(x, dummy) \ - { (x)->ticks = clock(); } -#define DIFF_T(a, b) (1 + 1000000 * (a.ticks - b.ticks) / CLOCKS_PER_SEC) -typedef unsigned long u_long; -typedef unsigned short u_short; -#else /* typically, unix systems */ -#include <sys/time.h> -#define DIFF_T(a, b) \ - (1 + 1000000 * (a.tv_sec - b.tv_sec) + (a.tv_usec - b.tv_usec)) -#endif - -#define TICK(t) \ - { \ - struct timeval x; \ - gettimeofday(&x, NULL); \ - t = x.tv_usec + 1000000 * (x.tv_sec & 0xff); \ - } -#define TOCK(t) \ - { \ - u_long t1; \ - TICK(t1); \ - if (t1 < t) \ - t = 256000000 + t1 - t; \ - else \ - t = t1 - t; \ - if (t == 0) t = 1; \ - } - -u_long ticks[10]; /* vars for timekeeping */ -#else -#define DEB(x) -#define DDB(x) -#define TICK(x) -#define TOCK(x) -#endif /* TEST */ - -/* * You should not need to change anything beyond this point. * The first part of the file implements linear algebra in GF. * @@ -402,31 +337,17 @@ static void matmul(gf *a, gf *b, gf *c, int n, int k, int m) { } } -#ifdef DEBUGG -/* - * returns 1 if the square matrix is identiy - * (only for test) - */ -static int is_identity(gf *m, int k) { - int row, col; - for (row = 0; row < k; row++) - for (col = 0; col < k; col++) - if ((row == col && *m != 1) || (row != col && *m != 0)) - return 0; - else - m++; - return 1; -} -#endif /* debug */ - /* * invert_mat() takes a matrix and produces its inverse * k is the size of the matrix. * (Gauss-Jordan, adapted from Numerical Recipes in C) * Return non-zero if singular. */ -DEB(int pivloops = 0; int pivswaps = 0; /* diagnostic */) +int pivloops = 0; +int pivswaps = 0; /* diagnostic */ static int invert_mat(gf *src, int k) { + assert(k > 0); + gf c, *p; int irow, icol, row, col, i, ix; @@ -436,9 +357,9 @@ static int invert_mat(gf *src, int k) { int *ipiv = (int *)my_malloc(k * sizeof(int), "ipiv"); gf *id_row = NEW_GF_MATRIX(1, k); gf *temp_row = NEW_GF_MATRIX(1, k); - - bzero(id_row, k * sizeof(gf)); - DEB(pivloops = 0; pivswaps = 0; /* diagnostic */) + memset(id_row, '\0', k * sizeof(gf)); + pivloops = 0; + pivswaps = 0; /* diagnostic */ /* * ipiv marks elements already used as pivots. */ @@ -459,7 +380,7 @@ static int invert_mat(gf *src, int k) { for (row = 0; row < k; row++) { if (ipiv[row] != 1) { for (ix = 0; ix < k; ix++) { - DEB(pivloops++;) + pivloops++; if (ipiv[ix] == 0) { if (src[row * k + ix] != 0) { irow = row; @@ -497,12 +418,9 @@ static int invert_mat(gf *src, int k) { fprintf(stderr, "singular matrix 2\n"); goto fail; } - if (c != 1) { /* otherwhise this is a NOP */ - /* - * this is done often , but optimizing is not so - * fruitful, at least in the obvious ways (unrolling) - */ - DEB(pivswaps++;) + + if (c != 1) { + pivswaps++; c = inverse[c]; pivot_row[icol] = 1; for (ix = 0; ix < k; ix++) pivot_row[ix] = gf_mul(c, pivot_row[ix]); @@ -515,7 +433,7 @@ static int invert_mat(gf *src, int k) { * we can optimize the addmul). */ id_row[icol] = 1; - if (bcmp(pivot_row, id_row, k * sizeof(gf)) != 0) { + if (memcmp(pivot_row, id_row, k * sizeof(gf)) != 0) { for (p = src, ix = 0; ix < k; ix++, p += k) { if (ix != icol) { c = p[icol]; @@ -560,6 +478,8 @@ fail: */ int invert_vdm(gf *src, int k) { + assert(k > 0); + int i, j, row, col; gf *b, *c, *p; gf t, xx; @@ -614,14 +534,8 @@ int invert_vdm(gf *src, int k) { static int fec_initialized = 0; static void init_fec() { - TICK(ticks[0]); generate_gf(); - TOCK(ticks[0]); - DDB(fprintf(stderr, "generate_gf took %ldus\n", ticks[0]);) - TICK(ticks[0]); init_mul_table(); - TOCK(ticks[0]); - DDB(fprintf(stderr, "init_mul_table took %ldus\n", ticks[0]);) fec_initialized = 1; } @@ -680,19 +594,14 @@ struct fec_parms *fec_new(int k, int n) { * k*k vandermonde matrix, multiply right the bottom n-k rows * by the inverse, and construct the identity matrix at the top. */ - TICK(ticks[3]); invert_vdm(tmp_m, k); /* much faster than invert_mat */ matmul(tmp_m + k * k, tmp_m, retval->enc_matrix + k * k, n - k, k, k); /* * the upper matrix is I so do not bother with a slow multiply */ - bzero(retval->enc_matrix, k * k * sizeof(gf)); + memset(retval->enc_matrix, '\0', k * k * sizeof(gf)); for (p = retval->enc_matrix, col = 0; col < k; col++, p += k + 1) *p = 1; free(tmp_m); - TOCK(ticks[3]); - - DDB(fprintf(stderr, "--- %ld us to build encoding matrix\n", ticks[3]);) - DEB(pr_matrix(retval->enc_matrix, n, k, "encoding_matrix");) return retval; } @@ -708,10 +617,10 @@ void fec_encode(struct fec_parms *code, gf *src[], gf *fec, int index, int sz) { if (GF_BITS > 8) sz /= 2; if (index < k) - bcopy(src[index], fec, sz * sizeof(gf)); + memcpy(fec, src[index], sz * sizeof(gf)); else if (index < code->n) { p = &(code->enc_matrix[index * k]); - bzero(fec, sz * sizeof(gf)); + memset(fec, '\0', sz * sizeof(gf)); for (i = 0; i < k; i++) addmul(fec, src[i], p[i], sz); } else fprintf(stderr, "Invalid index %d (max %d)\n", index, code->n - 1); @@ -733,22 +642,13 @@ static int shuffle(gf *pkt[], int index[], int k) { int c = index[i]; if (index[c] == c) { - DEB(fprintf(stderr, "\nshuffle, error at %d\n", i);) + fprintf(stderr, "\nshuffle, error at %d\n", i); return 1; } SWAP(index[i], index[c], int); SWAP(pkt[i], pkt[c], gf *); } } - DEB(/* just test that it works... */ - for (i = 0; i < k; i++) { - if (index[i] < k && index[i] != i) { - fprintf(stderr, "shuffle: after\n"); - for (i = 0; i < k; i++) fprintf(stderr, "%3d ", index[i]); - fprintf(stderr, "\n"); - return 1; - } - }) return 0; } @@ -757,20 +657,16 @@ static int shuffle(gf *pkt[], int index[], int k) { * indexes. The matrix must be already allocated as * a vector of k*k elements, in row-major order */ -static gf *build_decode_matrix(struct fec_parms *code, gf *pkt[], int index[]) { +static gf *build_decode_matrix(struct fec_parms *code, int index[]) { int i, k = code->k; gf *p, *matrix = NEW_GF_MATRIX(k, k); - TICK(ticks[9]); for (i = 0, p = matrix; i < k; i++, p += k) { -#if 1 /* this is simply an optimization, not very useful indeed */ if (index[i] < k) { - bzero(p, k * sizeof(gf)); + memset(p, '\0', k * sizeof(gf)); p[i] = 1; - } else -#endif - if (index[i] < code->n) - bcopy(&(code->enc_matrix[index[i] * k]), p, k * sizeof(gf)); + } else if (index[i] < code->n) + memcpy(p, &(code->enc_matrix[index[i] * k]), k * sizeof(gf)); else { fprintf(stderr, "decode: invalid index %d (max %d)\n", index[i], code->n - 1); @@ -778,12 +674,10 @@ static gf *build_decode_matrix(struct fec_parms *code, gf *pkt[], int index[]) { return NULL; } } - TICK(ticks[9]); if (invert_mat(matrix, k)) { free(matrix); matrix = NULL; } - TOCK(ticks[9]); return matrix; } @@ -800,39 +694,29 @@ static gf *build_decode_matrix(struct fec_parms *code, gf *pkt[], int index[]) { */ int fec_decode(struct fec_parms *code, gf *pkt[], int index[], int sz) { gf *m_dec; - gf **new_pkt; + gf **new_pkt = nullptr; int row, col, k = code->k; + int i = 0; if (GF_BITS > 8) sz /= 2; if (shuffle(pkt, index, k)) /* error if true */ return 1; - m_dec = build_decode_matrix(code, pkt, index); + m_dec = build_decode_matrix(code, index); if (m_dec == NULL) return 1; /* error */ /* * do the actual decoding */ - new_pkt = (gf **)my_malloc(k * sizeof(gf *), "new pkt pointers"); + new_pkt = pkt + k; for (row = 0; row < k; row++) { if (index[row] >= k) { - new_pkt[row] = (gf *)my_malloc(sz * sizeof(gf), "new pkt buffer"); - bzero(new_pkt[row], sz * sizeof(gf)); + memset(new_pkt[i], '\0', sz * sizeof(gf)); for (col = 0; col < k; col++) - addmul(new_pkt[row], pkt[col], m_dec[row * k + col], sz); - } - } - /* - * move pkts to their final destination - */ - for (row = 0; row < k; row++) { - if (index[row] >= k) { - bcopy(new_pkt[row], pkt[row], sz * sizeof(gf)); - free(new_pkt[row]); + addmul(new_pkt[i], pkt[col], m_dec[row * k + col], sz); + i++; } } - free(new_pkt); free(m_dec); - return 0; } diff --git a/libtransport/src/protocols/fec/rely.cc b/libtransport/src/protocols/fec/rely.cc index 7a30a62e2..d4d98a90b 100644 --- a/libtransport/src/protocols/fec/rely.cc +++ b/libtransport/src/protocols/fec/rely.cc @@ -23,38 +23,39 @@ namespace transport { namespace protocol { namespace fec { -RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) +RelyEncoder::RelyEncoder(uint32_t k, uint32_t n, uint32_t /* seq_offset */) : RelyBase(k, n) { configure(kmtu, ktimeout, kmax_stream_size); set_repair_trigger(k_, n_ - k_, n_ - k_); } void RelyEncoder::onPacketProduced(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { // Get pointer to payload, leaving space to insert FEC header. // TODO Check if this additional header is really needed. - auto data = content_object.writableData() + offset - sizeof(fec_header); - auto length = content_object.length() - offset + sizeof(fec_header); + auto data = content_object.writableData() + offset - sizeof(fec_metadata); + auto length = content_object.length() - offset + sizeof(fec_metadata); // Check packet length does not exceed maximum length supported by the // encoder (otherwise segmentation would take place). - assert(length < max_packet_bytes()); + DCHECK(length < max_packet_bytes()); DLOG_IF(INFO, VLOG_IS_ON(4)) - << "Encoding packet of length " << length - sizeof(fec_header); + << "Encoding packet of length " << length - sizeof(fec_metadata); - // Get the suffix. With rely we need to write it in the fec_header in order to - // be able to recognize the seq number upon recovery. + // Get the suffix. With rely we need to write it in the fec_metadata in order + // to be able to recognize the seq number upon recovery. auto suffix = content_object.getName().getSuffix(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Producing packet " << suffix << " (index == " << current_index_ << ")"; - // Consume payload. Add fec_header in front before feeding payload to encoder, - // and copy original content of packet - fec_header *h = reinterpret_cast<fec_header *>(data); - fec_header copy = *h; + // Consume payload. Add fec_metadata in front before feeding payload to + // encoder, and copy original content of packet + fec_metadata *h = reinterpret_cast<fec_metadata *>(data); + fec_metadata copy = *h; h->setSeqNumberBase(suffix); + h->setMetadataBase(metadata); auto packets = consume(data, length, getCurrentTime()); - assert(packets == 1); + DCHECK(packets == 1); // Update packet counter current_index_ += packets; @@ -62,7 +63,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // Restore original packet content and increment data pointer to the correct // position *h = copy; - data += sizeof(fec_header); + data += sizeof(fec_metadata); // Check position of this packet inside N size block auto i = current_index_ % n_; @@ -74,24 +75,24 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // TODO Optimize it by copying only the RELY header // Be sure encoder can produce - assert(can_produce()); + DCHECK(can_produce()); // Check new payload size and make sure it fits in packet buffer auto new_payload_size = produce_bytes(); int difference = new_payload_size - length; - assert(difference > 0); - assert(content_object.ensureCapacity(difference)); + DCHECK(difference > 0); + DCHECK(content_object.ensureCapacity(difference)); // Update length DLOG_IF(INFO, VLOG_IS_ON(4)) << "The packet length will be incremented by " - << difference + sizeof(fec_header); - content_object.append(difference + sizeof(fec_header)); + << difference + sizeof(fec_metadata); + content_object.append(difference + sizeof(fec_metadata)); content_object.updateLength(); // Make sure we got a source packet, otherwise we would put a repair symbol // in a source packet - assert(rely::packet_is_systematic(produce_data())); + DCHECK(rely::packet_is_systematic(produce_data())); // Copy rely packet replacing old source packet. std::memcpy(data, produce_data(), new_payload_size); @@ -111,7 +112,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, while (can_produce()) { // The current index MUST be k_, because we enforce n - k repair to be // produced after k sources - assert(current_index_ == k_); + DCHECK(current_index_ == k_); buffer packet; if (!buffer_callback_) { @@ -130,7 +131,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, std::memcpy(packet->writableData(), produce_data(), produce_bytes()); // Push symbol in repair_packets - packets_.emplace_back(0, std::move(packet)); + packets_.emplace_back(0, metadata, std::move(packet)); // Advance the encoder produce_next(); @@ -143,7 +144,7 @@ void RelyEncoder::onPacketProduced(core::ContentObject &content_object, // If we have generated repair symbols, let's notify caller via the installed // callback if (packets_.size()) { - assert(packets_.size() == n_ - k_); + DCHECK(packets_.size() == n_ - k_); fec_callback_(packets_); packets_.clear(); current_index_ = 0; @@ -156,7 +157,7 @@ RelyDecoder::RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) } void RelyDecoder::onDataPacket(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { // Adjust pointers to point to packet payload auto data = content_object.writableData() + offset; auto size = content_object.length() - offset; @@ -164,30 +165,38 @@ void RelyDecoder::onDataPacket(core::ContentObject &content_object, // Pass payload to decoder consume(data, size, getCurrentTime()); + producePackets(); +} + +void RelyDecoder::producePackets() { // Drain decoder if possible while (can_produce()) { - // Get size of decoded packet - auto size = produce_bytes(); - - // Get buffer to copy packet in - auto packet = core::PacketManager<>::getInstance().getMemBuf(); + auto fec_header_size = sizeof(fec_metadata); + auto payload_size = produce_bytes() - sizeof(fec_metadata); - // Copy buffer - packet->append(size); - std::memcpy(packet->writableData(), produce_data(), size); + buffer packet; + if (!buffer_callback_) { + packet = core::PacketManager<>::getInstance().getMemBuf(); + packet->append(payload_size); + } else { + packet = buffer_callback_(payload_size); + } // Read seq number - fec_header *h = reinterpret_cast<fec_header *>(packet->writableData()); + const fec_metadata *h = + reinterpret_cast<const fec_metadata *>(produce_data()); uint32_t index = h->getSeqNumberBase(); + uint32_t metadata = h->getMetadataBase(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "The index written in the packet is " << index; - // Remove FEC header - packet->trimStart(sizeof(fec_header)); + // Copy payload + std::memcpy(packet->writableData(), produce_data() + fec_header_size, + payload_size); // Save packet in buffer - packets_.emplace_back(index, std::move(packet)); + packets_.emplace_back(index, metadata, std::move(packet)); // Advance to next packet produce_next(); @@ -198,8 +207,28 @@ void RelyDecoder::onDataPacket(core::ContentObject &content_object, fec_callback_(packets_); packets_.clear(); } + + flushOutOfOrder(); +} + +void RelyDecoder::flushOutOfOrder() { + if (flush_timer_ == nullptr) return; + flush_timer_->cancel(); + + if (has_upcoming_flush()) { + flush_timer_->expires_from_now(std::chrono::milliseconds( + std::max((int64_t)0, upcoming_flush(getCurrentTime())))); + + flush_timer_->async_wait([this](const std::error_code &ec) { + if (ec) return; + if (has_upcoming_flush()) { + flush(getCurrentTime()); + producePackets(); + } + }); + } } } // namespace fec } // namespace protocol -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/protocols/fec/rely.h b/libtransport/src/protocols/fec/rely.h index bfbdb30bc..001a26002 100644 --- a/libtransport/src/protocols/fec/rely.h +++ b/libtransport/src/protocols/fec/rely.h @@ -33,8 +33,12 @@ namespace fec { * @brief Table of used codes. */ #define foreach_rely_fec_type \ + _(Rely, 1, 2) \ _(Rely, 1, 3) \ + _(Rely, 1, 4) \ _(Rely, 2, 3) \ + _(Rely, 2, 6) \ + _(Rely, 3, 9) \ _(Rely, 4, 5) \ _(Rely, 4, 6) \ _(Rely, 4, 7) \ @@ -74,11 +78,17 @@ class RelyBase : public virtual FECBase { * decoding operations. It may be removed once we know the meaning of the * fields in the rely header. */ - struct fec_header { - uint32_t seq_number; - + class fec_metadata { + public: void setSeqNumberBase(uint32_t suffix) { seq_number = htonl(suffix); } - uint32_t getSeqNumberBase() { return ntohl(seq_number); } + uint32_t getSeqNumberBase() const { return ntohl(seq_number); } + + void setMetadataBase(uint32_t value) { metadata = htonl(value); } + uint32_t getMetadataBase() const { return ntohl(metadata); } + + private: + uint32_t seq_number; + uint32_t metadata; }; /** @@ -112,21 +122,20 @@ class RelyBase : public virtual FECBase { #if RELY_DEBUG return time_++; #else - auto _time = utils::SteadyClock::now().time_since_epoch(); - auto time = std::chrono::duration_cast<utils::Milliseconds>(_time).count(); - return time; + return utils::SteadyTime::nowMs().count(); #endif } - protected: uint32_t k_; uint32_t n_; + std::uint32_t seq_offset_; + /** * @brief Vector of packets to be passed to caller callbacks. For encoder it * will contain the repair packets, for decoder the recovered sources. */ - std::vector<std::pair<uint32_t, buffer>> packets_; + BufferArray packets_; /** * @brief Current index to be used for local packet count. @@ -142,50 +151,54 @@ class RelyBase : public virtual FECBase { * @brief The Rely Encoder implementation. * */ -class RelyEncoder : private RelyBase, - private rely::encoder, - public ProducerFEC { +class RelyEncoder : RelyBase, rely::encoder, public ProducerFEC { public: RelyEncoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0); /** * Producers will call this function when they produce a data packet. */ - void onPacketProduced(core::ContentObject &content_object, - uint32_t offset) override; + void onPacketProduced(core::ContentObject &content_object, uint32_t offset, + uint32_t metadata = FECBase::INVALID_METADATA) override; /** * @brief Get the fec header size, if added to source packets */ std::size_t getFecHeaderSize() override { - return header_bytes() + sizeof(fec_header) + 4; + return header_bytes() + sizeof(fec_metadata) + 4; } - void reset() override {} + void reset() override { + // Nothing to do here + } }; -class RelyDecoder : private RelyBase, - private rely::decoder, - public ConsumerFEC { +class RelyDecoder : RelyBase, rely::decoder, public ConsumerFEC { public: RelyDecoder(uint32_t k, uint32_t n, uint32_t seq_offset = 0); /** * Consumers will call this function when they receive a data packet */ - void onDataPacket(core::ContentObject &content_object, - uint32_t offset) override; + void onDataPacket(core::ContentObject &content_object, uint32_t offset, + uint32_t metadata = FECBase::INVALID_METADATA) override; /** * @brief Get the fec header size, if added to source packets */ std::size_t getFecHeaderSize() override { - return header_bytes() + sizeof(fec_header); + return header_bytes() + sizeof(fec_metadata); + } + + void reset() override { + // Nothing to do here } - void reset() override {} + private: + void producePackets(); + void flushOutOfOrder(); }; } // namespace fec } // namespace protocol -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/protocols/fec/rs.cc b/libtransport/src/protocols/fec/rs.cc index 2c23d515d..9c0a3d4fb 100644 --- a/libtransport/src/protocols/fec/rs.cc +++ b/libtransport/src/protocols/fec/rs.cc @@ -46,23 +46,26 @@ bool BlockCode::addRepairSymbol(const fec::buffer &packet, uint32_t i, to_decode_ = true; DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding symbol of size " << packet->length(); return addSymbol(packet, i, offset, - packet->length() - sizeof(fec_header) - offset); + packet->length() - sizeof(fec_header) - offset, + FECBase::INVALID_METADATA); } bool BlockCode::addSourceSymbol(const fec::buffer &packet, uint32_t i, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding source symbol of size " << packet->length() << ", offset " << offset; - return addSymbol(packet, i, offset, packet->length() - offset); + return addSymbol(packet, i, offset, packet->length() - offset, metadata); } bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, - uint32_t offset, std::size_t size) { + uint32_t offset, std::size_t size, + uint32_t metadata) { if (size > max_buffer_size_) { max_buffer_size_ = size; } - operator[](current_block_size_++) = std::make_tuple(i, packet, offset); + operator[](current_block_size_) = RSBufferInfo(offset, i, metadata, packet); + current_block_size_++; if (current_block_size_ >= k_) { if (to_decode_) { @@ -80,12 +83,13 @@ bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, void BlockCode::encode() { gf *data[n_]; - uint32_t base = std::get<0>(operator[](0)); + uint32_t base = operator[](0).getIndex(); // Set packet length in first 2 bytes for (uint32_t i = 0; i < k_; i++) { - auto &packet = std::get<1>(operator[](i)); - auto offset = std::get<2>(operator[](i)); + auto &packet = operator[](i).getBuffer(); + auto offset = operator[](i).getOffset(); + auto metadata_base = operator[](i).getMetadata(); auto ret = packet->ensureCapacityAndFillUnused(max_buffer_size_ + offset, 0); @@ -98,10 +102,11 @@ void BlockCode::encode() { // Buffers should hold 2 *after* the padding, in order to be // able to set the length for the encoding operation. // packet->trimStart(offset); - uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData() + - max_buffer_size_ + offset); + fec_metadata *metadata = reinterpret_cast<fec_metadata *>( + packet->writableData() + max_buffer_size_ + offset); auto buffer_length = packet->length() - offset; - *length = htons(buffer_length); + metadata->setPacketLength(buffer_length); + metadata->setMetadataBase(metadata_base); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Current buffer size: " << packet->length(); @@ -109,7 +114,7 @@ void BlockCode::encode() { } // Finish to fill source block with the buffers to hold the repair symbols - auto length = max_buffer_size_ + sizeof(fec_header) + LEN_SIZE_BYTES; + auto length = max_buffer_size_ + sizeof(fec_header) + METADATA_BYTES; for (uint32_t i = k_; i < n_; i++) { buffer packet; if (!params_.buffer_callback_) { @@ -133,19 +138,20 @@ void BlockCode::encode() { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Current symbol size: " << packet->length(); data[i] = packet->writableData(); - operator[](i) = std::make_tuple(i, std::move(packet), uint32_t(0)); + operator[](i) = RSBufferInfo(uint32_t(0), i, FECBase::INVALID_METADATA, + std::move(packet)); } // Generate repair symbols and put them in corresponding buffers DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling encode with max_buffer_size_ = " << max_buffer_size_; for (uint32_t i = k_; i < n_; i++) { - fec_encode(code_, data, data[i], i, max_buffer_size_ + LEN_SIZE_BYTES); + fec_encode(code_, data, data[i], i, max_buffer_size_ + METADATA_BYTES); } // Re-include header in repair packets for (uint32_t i = k_; i < n_; i++) { - auto &packet = std::get<1>(operator[](i)); + auto &packet = operator[](i).getBuffer(); packet->prepend(sizeof(fec_header)); DLOG_IF(INFO, VLOG_IS_ON(4)) << "Produced repair symbol of size = " << packet->length(); @@ -153,16 +159,31 @@ void BlockCode::encode() { } void BlockCode::decode() { - gf *data[k_]; + gf *data[n_]; uint32_t index[k_]; + buffer aux_fec_packets[n_ - k_]; + // FEC packet number k0 + uint32_t k0 = 0; + + // Reorder block by index with in-place sorting + for (uint32_t i = 0; i < k_;) { + uint32_t idx = operator[](i).getIndex(); + if (idx >= k_ || idx == i) { + i++; + } else { + std::swap(operator[](i), operator[](idx)); + } + } for (uint32_t i = 0; i < k_; i++) { - auto &packet = std::get<1>(operator[](i)); - index[i] = std::get<0>(operator[](i)); - auto offset = std::get<2>(operator[](i)); + auto &packet = operator[](i).getBuffer(); + index[i] = operator[](i).getIndex(); + auto offset = operator[](i).getOffset(); + auto metadata_base = operator[](i).getMetadata(); sorted_index_[i] = index[i]; if (index[i] < k_) { + operator[](i).setReceived(); DLOG_IF(INFO, VLOG_IS_ON(4)) << "DECODE SOURCE - index " << index[i] << " - Current buffer size: " << packet->length(); @@ -173,49 +194,51 @@ void BlockCode::decode() { // able to set the length for the encoding operation packet->trimStart(offset); packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); - uint16_t *length = reinterpret_cast<uint16_t *>( - packet->writableData() + max_buffer_size_ - LEN_SIZE_BYTES); - - *length = htons(packet->length()); + fec_metadata *metadata = reinterpret_cast<fec_metadata *>( + packet->writableData() + max_buffer_size_ - METADATA_BYTES); + metadata->setPacketLength(packet->length()); + metadata->setMetadataBase(metadata_base); } else { DLOG_IF(INFO, VLOG_IS_ON(4)) << "DECODE SYMBOL - index " << index[i] << " - Current buffer size: " << packet->length(); packet->trimStart(sizeof(fec_header) + offset); + aux_fec_packets[k0] = core::PacketManager<>::getInstance().getMemBuf(); + data[k_ + k0] = aux_fec_packets[k0]->writableData(); + k0++; } - data[i] = packet->writableData(); } - // We decode the source block DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling decode with max_buffer_size_ = " << max_buffer_size_; + fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_); // Find the index in the block for recovered packets - for (uint32_t i = 0; i < k_; i++) { - if (index[i] != i) { - for (uint32_t j = 0; j < k_; j++) - if (sorted_index_[j] == uint32_t(index[i])) { - sorted_index_[j] = i; - } - } - } - - // Reorder block by index with in-place sorting - for (uint32_t i = 0; i < k_; i++) { - for (uint32_t j = sorted_index_[i]; j != i; j = sorted_index_[i]) { - std::swap(sorted_index_[j], sorted_index_[i]); - std::swap(operator[](j), operator[](i)); + for (uint32_t i = 0, j = 0; i < k_; i++) { + if (index[i] >= k_) { + operator[](i).setBuffer(aux_fec_packets[j++]); + operator[](i).setIndex(i); } } // Adjust length according to the one written in the source packet for (uint32_t i = 0; i < k_; i++) { - auto &packet = std::get<1>(operator[](i)); - uint16_t *length = reinterpret_cast<uint16_t *>( - packet->writableData() + max_buffer_size_ - LEN_SIZE_BYTES); - packet->setLength(ntohs(*length)); + auto &packet = operator[](i).getBuffer(); + fec_metadata *metadata = reinterpret_cast<fec_metadata *>( + packet->writableData() + max_buffer_size_ - METADATA_BYTES); + // Adjust buffer length + packet->setLength(metadata->getPacketLength()); + // Adjust metadata + operator[](i).setMetadata(metadata->getMetadataBase()); + + // reset the point to the beginning of the packets for all received packets + if (operator[](i).getReceived()) { + auto &packet = operator[](i).getBuffer(); + auto offset = operator[](i).getOffset(); + packet->prepend(offset); + } } } @@ -252,12 +275,11 @@ RSEncoder::RSEncoder(uint32_t k, uint32_t n, uint32_t seq_offset) source_block_(k_, n_, seq_offset_, current_code_, *this) {} void RSEncoder::consume(const fec::buffer &packet, uint32_t index, - uint32_t offset) { - if (!source_block_.addSourceSymbol(packet, index, offset)) { - std::vector<std::pair<uint32_t, buffer>> repair_packets; + uint32_t offset, uint32_t metadata) { + if (!source_block_.addSourceSymbol(packet, index, offset, metadata)) { + fec::BufferArray repair_packets; for (uint32_t i = k_; i < n_; i++) { - repair_packets.emplace_back(std::move(std::get<0>(source_block_[i])), - std::move(std::get<1>(source_block_[i]))); + repair_packets.emplace_back(std::move(source_block_[i])); } fec_callback_(repair_packets); @@ -265,9 +287,9 @@ void RSEncoder::consume(const fec::buffer &packet, uint32_t index, } void RSEncoder::onPacketProduced(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { consume(content_object.shared_from_this(), - content_object.getName().getSuffix(), offset); + content_object.getName().getSuffix(), offset, metadata); } RSDecoder::RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) @@ -276,10 +298,15 @@ RSDecoder::RSDecoder(uint32_t k, uint32_t n, uint32_t seq_offset) void RSDecoder::recoverPackets(SourceBlocks::iterator &src_block_it) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "recoverPackets for " << k_; auto &src_block = src_block_it->second; - std::vector<std::pair<uint32_t, buffer>> source_packets(k_); + auto base_index = src_block_it->first; + BufferArray source_packets(k_); + + // Iterate over packets in the block and adjust indexed accordingly. This must + // be done because indexes are from 0 to (n - k - 1), but we need indexes from + // base_index to base_index + (n - k - 1) for (uint32_t i = 0; i < src_block.getK(); i++) { - source_packets[i] = std::make_pair(src_block_it->first + i, - std::move(std::get<1>(src_block[i]))); + src_block[i].setIndex(base_index + src_block[i].getIndex()); + source_packets[i] = FECBufferInfo(std::move(src_block[i])); } setProcessed(src_block_it->first); @@ -296,9 +323,9 @@ void RSDecoder::recoverPackets(SourceBlocks::iterator &src_block_it) { } void RSDecoder::consumeSource(const fec::buffer &packet, uint32_t index, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { // Normalize index - assert(index >= seq_offset_); + DCHECK(index >= seq_offset_); auto i = (index - seq_offset_) % n_; // Get base @@ -324,16 +351,19 @@ void RSDecoder::consumeSource(const fec::buffer &packet, uint32_t index, // protocol that may come in the future. auto it = src_blocks_.find(base); if (it != src_blocks_.end()) { - auto ret = it->second.addSourceSymbol(packet, i, offset); + auto ret = it->second.addSourceSymbol(packet, i, offset, metadata); if (!ret) { recoverPackets(it); } } else { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Adding to parked source packets"; - auto ret = parked_packets_.emplace( - base, std::vector<std::pair<buffer, uint32_t>>()); - ret.first->second.emplace_back(packet, i); + auto ret = parked_packets_.emplace(base, BufferInfoArray()); + ret.first->second.emplace_back(offset, i, metadata, packet); + /** + * If we reached k source packets, we do not have any missing packet to + * recover via FEC. Delete the block. + */ if (ret.first->second.size() >= k_) { setProcessed(ret.first->first); parked_packets_.erase(ret.first); @@ -356,8 +386,8 @@ void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Decoder consume called for repair symbol. BASE = " << base - << ", index = " << base + i << " and i = " << i << ". K=" << k - << ", N=" << n; + << ", index = " << base + i << " and i = " << (int)i << ". K=" << (int)k + << ", N=" << (int)n; // check if a source block already exist for this symbol auto it = src_blocks_.find(base); @@ -380,8 +410,9 @@ void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { auto it2 = parked_packets_.find(base); if (it2 != parked_packets_.end()) { for (auto &packet_index : it2->second) { - auto ret = it->second.addSourceSymbol(packet_index.first, - packet_index.second, offset); + auto ret = it->second.addSourceSymbol( + packet_index.getBuffer(), packet_index.getIndex(), + packet_index.getOffset(), packet_index.getMetadata()); if (!ret) { recoverPackets(it); // Finish to delete packets in same source block that were @@ -399,7 +430,7 @@ void RSDecoder::consumeRepair(const fec::buffer &packet, uint32_t offset) { } void RSDecoder::onDataPacket(core::ContentObject &content_object, - uint32_t offset) { + uint32_t offset, uint32_t metadata) { DLOG_IF(INFO, VLOG_IS_ON(4)) << "Calling fec for data packet " << content_object.getName() << ". Offset: " << offset; @@ -409,7 +440,7 @@ void RSDecoder::onDataPacket(core::ContentObject &content_object, if (isSymbol(suffix)) { consumeRepair(content_object.shared_from_this(), offset); } else { - consumeSource(content_object.shared_from_this(), suffix, offset); + consumeSource(content_object.shared_from_this(), suffix, offset, metadata); } } diff --git a/libtransport/src/protocols/fec/rs.h b/libtransport/src/protocols/fec/rs.h index e159ad9f7..034c32bdc 100644 --- a/libtransport/src/protocols/fec/rs.h +++ b/libtransport/src/protocols/fec/rs.h @@ -34,10 +34,28 @@ namespace protocol { namespace fec { #define foreach_rs_fec_type \ + _(RS, 1, 2) \ _(RS, 1, 3) \ + _(RS, 1, 4) \ + _(RS, 2, 3) \ + _(RS, 2, 4) \ + _(RS, 2, 5) \ + _(RS, 2, 6) \ + _(RS, 3, 6) \ + _(RS, 3, 7) \ + _(RS, 3, 8) \ + _(RS, 3, 9) \ + _(RS, 3, 10) \ + _(RS, 3, 11) \ + _(RS, 3, 12) \ _(RS, 4, 5) \ _(RS, 4, 6) \ _(RS, 4, 7) \ + _(RS, 4, 8) \ + _(RS, 4, 9) \ + _(RS, 4, 10) \ + _(RS, 4, 11) \ + _(RS, 4, 12) \ _(RS, 6, 10) \ _(RS, 8, 10) \ _(RS, 8, 11) \ @@ -45,6 +63,8 @@ namespace fec { _(RS, 8, 14) \ _(RS, 8, 16) \ _(RS, 8, 32) \ + _(RS, 10, 20) \ + _(RS, 10, 25) \ _(RS, 10, 30) \ _(RS, 10, 40) \ _(RS, 10, 60) \ @@ -56,12 +76,24 @@ namespace fec { _(RS, 16, 27) \ _(RS, 17, 21) \ _(RS, 17, 34) \ + _(RS, 20, 45) \ + _(RS, 20, 50) \ + _(RS, 20, 60) \ + _(RS, 20, 70) \ + _(RS, 30, 70) \ + _(RS, 30, 75) \ + _(RS, 30, 85) \ + _(RS, 30, 95) \ _(RS, 32, 36) \ _(RS, 32, 41) \ _(RS, 32, 46) \ _(RS, 32, 54) \ _(RS, 34, 42) \ _(RS, 35, 70) \ + _(RS, 40, 95) \ + _(RS, 40, 100) \ + _(RS, 40, 110) \ + _(RS, 40, 120) \ _(RS, 52, 62) static const constexpr uint16_t MAX_SOURCE_BLOCK_SIZE = 128; @@ -73,9 +105,24 @@ static const constexpr uint16_t MAX_SOURCE_BLOCK_SIZE = 128; * std::array allows to be constructed in place, saving the allocation at the * price os knowing in advance its size. */ -using Packets = std::array<std::tuple</* index */ uint32_t, /* buffer */ buffer, - uint32_t /* offset */>, - MAX_SOURCE_BLOCK_SIZE>; +class RSBufferInfo : public FECBufferInfo { + public: + RSBufferInfo() : FECBufferInfo() {} + + RSBufferInfo(uint32_t offset, uint32_t index, uint32_t metadata, + buffer buffer) + : FECBufferInfo(index, metadata, buffer), offset_(offset) {} + + uint32_t getOffset() { return offset_; } + RSBufferInfo &setOffset(uint32_t offset) { + offset_ = offset; + return *this; + } + + private: + uint32_t offset_ = 0; +}; +using Packets = std::array<RSBufferInfo, MAX_SOURCE_BLOCK_SIZE>; /** * FEC Header, prepended to symbol packets. @@ -123,10 +170,32 @@ class rs; */ class BlockCode : public Packets { /** + * @brief Metadata to include when encoding the buffers. This does not need to + * be sent over the network, but just to be included in the FEC protected + * bytes. + * + */ + class __attribute__((__packed__)) fec_metadata { + public: + void setPacketLength(uint16_t length) { packet_length = htons(length); } + uint32_t getPacketLength() { return ntohs(packet_length); } + + void setMetadataBase(uint32_t value) { metadata = htonl(value); } + uint32_t getMetadataBase() { return ntohl(metadata); } + + private: + uint16_t packet_length; /* Used to get the real size of the packet after we + pad it */ + uint32_t + metadata; /* Caller may specify an integer for storing additional + metadata that can be used when recovering the packet. */ + }; + + /** * For variable length packet we need to prepend to the padded payload the * real length of the packet. This is *not* sent over the network. */ - static constexpr std::size_t LEN_SIZE_BYTES = 2; + static constexpr std::size_t METADATA_BYTES = sizeof(fec_metadata); public: BlockCode(uint32_t k, uint32_t n, uint32_t seq_offset, struct fec_parms *code, @@ -142,7 +211,8 @@ class BlockCode : public Packets { * Add a source symbol to the source block. */ bool addSourceSymbol(const fec::buffer &packet, uint32_t i, - uint32_t offset = 0); + uint32_t offset = 0, + uint32_t metadata = FECBase::INVALID_METADATA); /** * Get current length of source block. @@ -169,7 +239,7 @@ class BlockCode : public Packets { * Add symbol to source block **/ bool addSymbol(const fec::buffer &packet, uint32_t i, uint32_t offset, - std::size_t size); + std::size_t size, uint32_t metadata); /** * Starting from k source symbols, get the n - k repair symbols @@ -310,10 +380,11 @@ class RSEncoder : public rs, public ProducerFEC { /** * Always consume source symbols. */ - void consume(const fec::buffer &packet, uint32_t index, uint32_t offset = 0); + void consume(const fec::buffer &packet, uint32_t index, uint32_t offset = 0, + uint32_t metadata = FECBase::INVALID_METADATA); - void onPacketProduced(core::ContentObject &content_object, - uint32_t offset) override; + void onPacketProduced(core::ContentObject &content_object, uint32_t offset, + uint32_t metadata = FECBase::INVALID_METADATA) override; /** * @brief Get the fec header size, if added to source packets @@ -348,8 +419,8 @@ class RSDecoder : public rs, public ConsumerFEC { /** * Consume source symbol */ - void consumeSource(const fec::buffer &packet, uint32_t i, - uint32_t offset = 0); + void consumeSource(const fec::buffer &packet, uint32_t i, uint32_t offset = 0, + uint32_t metadata = FECBase::INVALID_METADATA); /** * Consume repair symbol @@ -359,8 +430,8 @@ class RSDecoder : public rs, public ConsumerFEC { /** * Consumers will call this function when they receive a data packet */ - void onDataPacket(core::ContentObject &content_object, - uint32_t offset) override; + void onDataPacket(core::ContentObject &content_object, uint32_t offset, + uint32_t metadata = FECBase::INVALID_METADATA) override; /** * @brief Get the fec header size, if added to source packets @@ -398,8 +469,8 @@ class RSDecoder : public rs, public ConsumerFEC { * not make any sense to build the source block, since we received all the * source packet of the block. */ - std::unordered_map<uint32_t, std::vector<std::pair<buffer, uint32_t>>> - parked_packets_; + using BufferInfoArray = std::vector<RSBufferInfo>; + std::unordered_map<uint32_t, BufferInfoArray> parked_packets_; }; } // namespace fec |