From 08233d44a6cfde878d7e10bca38ae935ed1c8fd5 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 30 Jun 2021 07:57:22 +0000 Subject: [HICN-713] Transport Library Major Refactoring 2 Co-authored-by: Luca Muscariello Co-authored-by: Michele Papalini Co-authored-by: Olivier Roques Co-authored-by: Giulio Grassi Signed-off-by: Mauro Sardara Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2 --- libtransport/src/core/rs.cc | 370 -------------------------------------------- 1 file changed, 370 deletions(-) delete mode 100644 libtransport/src/core/rs.cc (limited to 'libtransport/src/core/rs.cc') diff --git a/libtransport/src/core/rs.cc b/libtransport/src/core/rs.cc deleted file mode 100644 index 33270736d..000000000 --- a/libtransport/src/core/rs.cc +++ /dev/null @@ -1,370 +0,0 @@ - -/* - * Copyright (c) 2021 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include - -namespace transport { -namespace core { -namespace fec { - -BlockCode::BlockCode(uint32_t k, uint32_t n, struct fec_parms *code) - : Packets(), - k_(k), - n_(n), - code_(code), - max_buffer_size_(0), - current_block_size_(0), - to_decode_(false) { - sorted_index_.reserve(n); -} - -bool BlockCode::addRepairSymbol(const fec::buffer &packet, uint32_t i) { - // Get index - to_decode_ = true; - TRANSPORT_LOGD("adding symbol of size %zu", packet->length()); - return addSymbol(packet, i, packet->length() - sizeof(fec_header)); -} - -bool BlockCode::addSourceSymbol(const fec::buffer &packet, uint32_t i) { - return addSymbol(packet, i, packet->length()); -} - -bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, - std::size_t size) { - if (size > max_buffer_size_) { - max_buffer_size_ = size; - } - - operator[](current_block_size_++) = std::make_pair(i, packet); - - if (current_block_size_ >= k_) { - if (to_decode_) { - decode(); - } else { - encode(); - } - - clear(); - return false; - } - - return true; -} - -void BlockCode::encode() { - gf **data = new gf*[k_]; - uint32_t *old_values = new uint32_t[k_]; - uint32_t base = operator[](0).first; - - // Set packet length in first 2 bytes - for (uint32_t i = 0; i < k_; i++) { - auto &packet = operator[](i).second; - - TRANSPORT_LOGD("Current buffer size: %zu", packet->length()); - - auto ret = packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); - if (TRANSPORT_EXPECT_FALSE(ret == false)) { - throw errors::RuntimeException( - "Provided packet is not suitable to be used as FEC source packet. " - "Aborting."); - } - - // Buffers should hold 2 bytes before the starting pointer, in order to be - // able to set the length for the encoding operation - packet->prepend(2); - uint16_t *length = reinterpret_cast(packet->writableData()); - - old_values[i] = *length; - *length = htons(u_short(packet->length() - LEN_SIZE_BYTES)); - - data[i] = packet->writableData(); - - } - - // Finish to fill source block with the buffers to hold the repair symbols - for (uint32_t i = k_; i < n_; i++) { - // For the moment we get a packet from the pool here.. later we'll need to - // require a packet from the caller with a callback. - auto packet = PacketManager<>::getInstance().getMemBuf(); - packet->append(max_buffer_size_ + sizeof(fec_header) + LEN_SIZE_BYTES); - fec_header *fh = reinterpret_cast(packet->writableData()); - - fh->setSeqNumberBase(base); - fh->setNFecSymbols(n_ - k_); - fh->setEncodedSymbolId(i); - fh->setSourceBlockLen(n_); - - packet->trimStart(sizeof(fec_header)); - - data[i] = packet->writableData(); - operator[](i) = std::make_pair(i, std::move(packet)); - } - - // Generate repair symbols and put them in corresponding buffers - TRANSPORT_LOGD("Calling encode with max_buffer_size_ = %zu", - max_buffer_size_); - for (uint32_t i = k_; i < n_; i++) { - fec_encode(code_, data, data[i], i, (int)(max_buffer_size_ + LEN_SIZE_BYTES)); - } - - // Restore original content of buffer space used to store the length - for (uint32_t i = 0; i < k_; i++) { - auto &packet = operator[](i).second; - uint16_t *length = reinterpret_cast(packet->writableData()); - *length = old_values[i]; - packet->trimStart(2); - } - - // Re-include header in repair packets - for (uint32_t i = k_; i < n_; i++) { - auto &packet = operator[](i).second; - TRANSPORT_LOGD("Produced repair symbol of size = %zu", packet->length()); - packet->prepend(sizeof(fec_header)); - } - delete [] data; - delete [] old_values; -} - -void BlockCode::decode() { - gf **data = new gf*[k_]; - uint32_t *index = new uint32_t[k_]; - - for (uint32_t i = 0; i < k_; i++) { - auto &packet = operator[](i).second; - index[i] = operator[](i).first; - sorted_index_[i] = index[i]; - - if (index[i] < k_) { - TRANSPORT_LOGD("DECODE SOURCE - index %u - Current buffer size: %zu", - index[i], packet->length()); - // This is a source packet. We need to prepend the length and fill - // additional space to 0 - - // Buffers should hold 2 bytes before the starting pointer, in order to be - // able to set the length for the encoding operation - packet->prepend(LEN_SIZE_BYTES); - packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); - uint16_t *length = reinterpret_cast(packet->writableData()); - - *length = htons(u_short(packet->length() - LEN_SIZE_BYTES)); - } else { - TRANSPORT_LOGD("DECODE SYMBOL - index %u - Current buffer size: %zu", - index[i], packet->length()); - packet->trimStart(sizeof(fec_header)); - } - - data[i] = packet->writableData(); - delete [] data; - delete [] index; - } - - // We decode the source block - TRANSPORT_LOGD("Calling decode with max_buffer_size_ = %zu", - max_buffer_size_); - fec_decode(code_, data, reinterpret_cast(index), (int)max_buffer_size_); - - // Find the index in the block for recovered packets - for (uint32_t i = 0; i < k_; i++) { - if (index[i] != i) { - for (uint32_t j = 0; j < k_; j++) - if (sorted_index_[j] == uint32_t(index[i])) { - sorted_index_[j] = i; - } - } - } - - // Reorder block by index with in-place sorting - for (uint32_t i = 0; i < k_; i++) { - for (uint32_t j = sorted_index_[i]; j != i; j = sorted_index_[i]) { - std::swap(sorted_index_[j], sorted_index_[i]); - std::swap(operator[](j), operator[](i)); - } - } - - // Adjust length according to the one written in the source packet - for (uint32_t i = 0; i < k_; i++) { - auto &packet = operator[](i).second; - uint16_t *length = reinterpret_cast(packet->writableData()); - packet->trimStart(2); - packet->setLength(ntohs(*length)); - } -} - -void BlockCode::clear() { - current_block_size_ = 0; - max_buffer_size_ = 0; - sorted_index_.clear(); - to_decode_ = false; -} - -void rs::MatrixDeleter::operator()(struct fec_parms *params) { - fec_free(params); -} - -rs::Codes rs::createCodes() { - Codes ret; - - ret.emplace(std::make_pair(1, 3), Matrix(fec_new(1, 3), MatrixDeleter())); - ret.emplace(std::make_pair(6, 10), Matrix(fec_new(6, 10), MatrixDeleter())); - ret.emplace(std::make_pair(8, 32), Matrix(fec_new(8, 32), MatrixDeleter())); - ret.emplace(std::make_pair(10, 30), Matrix(fec_new(10, 30), MatrixDeleter())); - ret.emplace(std::make_pair(16, 24), Matrix(fec_new(16, 24), MatrixDeleter())); - ret.emplace(std::make_pair(10, 40), Matrix(fec_new(10, 40), MatrixDeleter())); - ret.emplace(std::make_pair(10, 60), Matrix(fec_new(10, 60), MatrixDeleter())); - ret.emplace(std::make_pair(10, 90), Matrix(fec_new(10, 90), MatrixDeleter())); - - return ret; -} - -rs::Codes rs::codes_ = createCodes(); - -rs::rs(uint32_t k, uint32_t n) : k_(k), n_(n) {} - -void rs::setFECCallback(const PacketsReady &callback) { - fec_callback_ = callback; -} - -encoder::encoder(uint32_t k, uint32_t n) - : rs(k, n), - current_code_(codes_[std::make_pair(k, n)].get()), - source_block_(k_, n_, current_code_) {} - -void encoder::consume(const fec::buffer &packet, uint32_t index) { - if (!source_block_.addSourceSymbol(packet, index)) { - std::vector repair_packets; - for (uint32_t i = k_; i < n_; i++) { - repair_packets.emplace_back(std::move(source_block_[i].second)); - } - fec_callback_(repair_packets); - } -} - -decoder::decoder(uint32_t k, uint32_t n) : rs(k, n) {} - -void decoder::recoverPackets(SourceBlocks::iterator &src_block_it) { - TRANSPORT_LOGD("recoverPackets for %u", k_); - auto &src_block = src_block_it->second; - std::vector source_packets(k_); - for (uint32_t i = 0; i < src_block.getK(); i++) { - source_packets[i] = std::move(src_block[i].second); - } - - fec_callback_(source_packets); - processed_source_blocks_.emplace(src_block_it->first); - - auto it = parked_packets_.find(src_block_it->first); - if (it != parked_packets_.end()) { - parked_packets_.erase(it); - } - - src_blocks_.erase(src_block_it); -} - -void decoder::consume(const fec::buffer &packet, uint32_t index) { - // Normalize index - auto i = index % n_; - - // Get base - uint32_t base = index - i; - - TRANSPORT_LOGD( - "Decoder consume called for source symbol. BASE = %u, index = %u and i = " - "%u", - base, index, i); - - // check if a source block already exist for this symbol. If it does not - // exist, we lazily park this packet until we receive a repair symbol for the - // same block. This is done for 2 reason: - // 1) If we receive all the source packets of a block, we do not need to - // recover anything. - // 2) Sender may change n and k at any moment, so we construct the source - // block based on the (n, k) values written in the fec header. This is - // actually not used right now, since we use fixed value of n and k passed - // at construction time, but it paves the ground for a more dynamic - // protocol that may come in the future. - auto it = src_blocks_.find(base); - if (it != src_blocks_.end()) { - auto ret = it->second.addSourceSymbol(packet, i); - if (!ret) { - recoverPackets(it); - } - } else { - TRANSPORT_LOGD("Adding to parked source packets"); - auto ret = parked_packets_.emplace( - base, std::vector >()); - ret.first->second.emplace_back(packet, i); - } -} - -void decoder::consume(const fec::buffer &packet) { - // Repair symbol! Get index and base source block. - fec_header *h = reinterpret_cast(packet->writableData()); - auto i = h->getEncodedSymbolId(); - auto base = h->getSeqNumberBase(); - auto n = h->getSourceBlockLen(); - auto k = n - h->getNFecSymbols(); - - TRANSPORT_LOGD( - "Decoder consume called for repair symbol. BASE = %u, index = %u and i = " - "%u", - base, base + i, i); - - // check if a source block already exist for this symbol - auto it = src_blocks_.find(base); - if (it == src_blocks_.end()) { - // Create new source block - auto code_it = codes_.find(std::make_pair(k, n)); - if (code_it == codes_.end()) { - TRANSPORT_LOGE("Code for k = %u and n = %u does not exist.", k_, n_); - return; - } - - auto emplace_result = - src_blocks_.emplace(base, BlockCode(k, n, code_it->second.get())); - it = emplace_result.first; - - // Check in the parked packets and insert any packet that is part of this - // source block - - auto it2 = parked_packets_.find(base); - if (it2 != parked_packets_.end()) { - for (auto &packet_index : it2->second) { - auto ret = - it->second.addSourceSymbol(packet_index.first, packet_index.second); - if (!ret) { - recoverPackets(it); - // Finish to delete packets in same source block that were - // eventually not used - return; - } - } - } - } - - auto ret = it->second.addRepairSymbol(packet, i); - if (!ret) { - recoverPackets(it); - } -} - -} // namespace fec -} // namespace core -} // namespace transport -- cgit 1.2.3-korg