diff options
author | Luca Muscariello <muscariello@ieee.org> | 2021-04-15 09:05:46 +0200 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2021-04-15 16:36:16 +0200 |
commit | e92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch) | |
tree | 9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/core/rs.cc | |
parent | 3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff) |
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary.
A summary of the different components that underwent major modifications is
reported below.
- Transport protocol updates
The hierarchy of classes has been optimized to have common transport services
across different transport protocols. This can allow to customize a transport
protocol with new features.
- A new real-time communication protocol
The RTC protocol has been optimized in terms of algorithms to reduce
consumer-producer synchronization latency.
- A novel socket API
The API has been reworked to be easier to consumer but also to have a more
efficient integration in L4 proxies.
- Several performance improvements
A large number of performance improvements have been included in
particular to make the entire stack zero-copy and optimize cache miss.
- New memory buffer framework
Memory management has been reworked entirely to provide a more efficient infra
with a richer API. Buffers are now allocated in blocks and a single buffer
holds the memory for (1) the shared_ptr control block, (2) the metadata of the
packet (e.g. name, pointer to other buffers if buffer is chained and relevant
offsets), and (3) the packet itself, as it is sent/received over the network.
- A new slab allocator
Dynamic memory allocation is now managed by a novel slab allocator that is
optimised for packet processing and connection management. Memory is organized
in pools of blocks all of the same size which are used during the processing of
outgoing/incoming packets. When a memory block Is allocated is always taken
from a global pool and when it is deallocated is returned to the pool, thus
avoiding the cost of any heap allocation in the data path.
- New transport connectors
Consumer and producer end-points can communication either using an hicn packet
forwarder or with direct connector based on shared memories or sockets.
The usage of transport connectors typically for unit and funcitonal
testing but may have additional usage.
- Support for FEC/ECC for transport services
FEC/ECC via reed solomon is supported by default and made available to
transport services as a modular component. Reed solomon block codes is a
default FEC model that can be replaced in a modular way by many other
codes including RLNC not avaiable in this distribution.
The current FEC framework support variable size padding and efficiently
makes use of the infra memory buffers to avoid additiona copies.
- Secure transport framework for signature computation and verification
Crypto support is nativelty used in hICN for integrity and authenticity.
Novel support that includes RTC has been implemented and made modular
and reusable acrosso different transport protocols.
- TLS - Transport layer security over hicn
Point to point confidentiality is provided by integrating TLS on top of
hICN reliable and non-reliable transport. The integration is common and
makes a different use of the TLS record.
- MLS - Messaging layer security over hicn
MLS integration on top of hICN is made by using the MLSPP implemetation
open sourced by Cisco. We have included instrumentation tools to deploy
performance and functional tests of groups of end-points.
- Android support
The overall code has been heavily tested in Android environments and
has received heavy lifting to better run natively in recent Android OS.
Co-authored-by: Mauro Sardara <msardara@cisco.com>
Co-authored-by: Michele Papalini <micpapal@cisco.com>
Co-authored-by: Olivier Roques <oroques+fdio@cisco.com>
Co-authored-by: Giulio Grassi <gigrassi@cisco.com>
Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69
Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/core/rs.cc')
-rw-r--r-- | libtransport/src/core/rs.cc | 365 |
1 files changed, 365 insertions, 0 deletions
diff --git a/libtransport/src/core/rs.cc b/libtransport/src/core/rs.cc new file mode 100644 index 000000000..44b5852e5 --- /dev/null +++ b/libtransport/src/core/rs.cc @@ -0,0 +1,365 @@ + +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/fec.h> +#include <core/rs.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/utils/log.h> + +#include <cassert> + +namespace transport { +namespace core { +namespace fec { + +BlockCode::BlockCode(uint32_t k, uint32_t n, struct fec_parms *code) + : Packets(), + k_(k), + n_(n), + code_(code), + max_buffer_size_(0), + current_block_size_(0), + to_decode_(false) { + sorted_index_.reserve(n); +} + +bool BlockCode::addRepairSymbol(const fec::buffer &packet, uint32_t i) { + // Get index + to_decode_ = true; + TRANSPORT_LOGD("adding symbol of size %zu", packet->length()); + return addSymbol(packet, i, packet->length() - sizeof(fec_header)); +} + +bool BlockCode::addSourceSymbol(const fec::buffer &packet, uint32_t i) { + return addSymbol(packet, i, packet->length()); +} + +bool BlockCode::addSymbol(const fec::buffer &packet, uint32_t i, + std::size_t size) { + if (size > max_buffer_size_) { + max_buffer_size_ = size; + } + + operator[](current_block_size_++) = std::make_pair(i, packet); + + if (current_block_size_ >= k_) { + if (to_decode_) { + decode(); + } else { + encode(); + } + + clear(); + return false; + } + + return true; +} + +void BlockCode::encode() { + gf *data[n_]; + std::uint16_t old_values[k_]; + uint32_t base = operator[](0).first; + + // Set packet length in first 2 bytes + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + + TRANSPORT_LOGD("Current buffer size: %zu", packet->length()); + + auto ret = packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); + if (TRANSPORT_EXPECT_FALSE(ret == false)) { + throw errors::RuntimeException( + "Provided packet is not suitable to be used as FEC source packet. " + "Aborting."); + } + + // Buffers should hold 2 bytes before the starting pointer, in order to be + // able to set the length for the encoding operation + packet->prepend(2); + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + + old_values[i] = *length; + *length = htons(packet->length() - LEN_SIZE_BYTES); + + data[i] = packet->writableData(); + } + + // Finish to fill source block with the buffers to hold the repair symbols + for (uint32_t i = k_; i < n_; i++) { + // For the moment we get a packet from the pool here.. later we'll need to + // require a packet from the caller with a callback. + auto packet = PacketManager<>::getInstance().getMemBuf(); + packet->append(max_buffer_size_ + sizeof(fec_header) + LEN_SIZE_BYTES); + fec_header *fh = reinterpret_cast<fec_header *>(packet->writableData()); + + fh->setSeqNumberBase(base); + fh->setNFecSymbols(n_ - k_); + fh->setEncodedSymbolId(i); + fh->setSourceBlockLen(n_); + + packet->trimStart(sizeof(fec_header)); + + data[i] = packet->writableData(); + operator[](i) = std::make_pair(i, std::move(packet)); + } + + // Generate repair symbols and put them in corresponding buffers + TRANSPORT_LOGD("Calling encode with max_buffer_size_ = %zu", + max_buffer_size_); + for (uint32_t i = k_; i < n_; i++) { + fec_encode(code_, data, data[i], i, max_buffer_size_ + LEN_SIZE_BYTES); + } + + // Restore original content of buffer space used to store the length + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + *length = old_values[i]; + packet->trimStart(2); + } + + // Re-include header in repair packets + for (uint32_t i = k_; i < n_; i++) { + auto &packet = operator[](i).second; + TRANSPORT_LOGD("Produced repair symbol of size = %zu", packet->length()); + packet->prepend(sizeof(fec_header)); + } +} + +void BlockCode::decode() { + gf *data[k_]; + uint32_t index[k_]; + + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + index[i] = operator[](i).first; + sorted_index_[i] = index[i]; + + if (index[i] < k_) { + TRANSPORT_LOGD("DECODE SOURCE - index %u - Current buffer size: %zu", + index[i], packet->length()); + // This is a source packet. We need to prepend the length and fill + // additional space to 0 + + // Buffers should hold 2 bytes before the starting pointer, in order to be + // able to set the length for the encoding operation + packet->prepend(LEN_SIZE_BYTES); + packet->ensureCapacityAndFillUnused(max_buffer_size_, 0); + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + + *length = htons(packet->length() - LEN_SIZE_BYTES); + } else { + TRANSPORT_LOGD("DECODE SYMBOL - index %u - Current buffer size: %zu", + index[i], packet->length()); + packet->trimStart(sizeof(fec_header)); + } + + data[i] = packet->writableData(); + } + + // We decode the source block + TRANSPORT_LOGD("Calling decode with max_buffer_size_ = %zu", + max_buffer_size_); + fec_decode(code_, data, reinterpret_cast<int *>(index), max_buffer_size_); + + // Find the index in the block for recovered packets + for (uint32_t i = 0; i < k_; i++) { + if (index[i] != i) { + for (uint32_t j = 0; j < k_; j++) + if (sorted_index_[j] == uint32_t(index[i])) { + sorted_index_[j] = i; + } + } + } + + // Reorder block by index with in-place sorting + for (uint32_t i = 0; i < k_; i++) { + for (uint32_t j = sorted_index_[i]; j != i; j = sorted_index_[i]) { + std::swap(sorted_index_[j], sorted_index_[i]); + std::swap(operator[](j), operator[](i)); + } + } + + // Adjust length according to the one written in the source packet + for (uint32_t i = 0; i < k_; i++) { + auto &packet = operator[](i).second; + uint16_t *length = reinterpret_cast<uint16_t *>(packet->writableData()); + packet->trimStart(2); + packet->setLength(ntohs(*length)); + } +} + +void BlockCode::clear() { + current_block_size_ = 0; + max_buffer_size_ = 0; + sorted_index_.clear(); + to_decode_ = false; +} + +void rs::MatrixDeleter::operator()(struct fec_parms *params) { + fec_free(params); +} + +rs::Codes rs::createCodes() { + Codes ret; + + ret.emplace(std::make_pair(1, 3), Matrix(fec_new(1, 3), MatrixDeleter())); + ret.emplace(std::make_pair(6, 10), Matrix(fec_new(6, 10), MatrixDeleter())); + ret.emplace(std::make_pair(8, 32), Matrix(fec_new(8, 32), MatrixDeleter())); + ret.emplace(std::make_pair(10, 30), Matrix(fec_new(10, 30), MatrixDeleter())); + ret.emplace(std::make_pair(16, 24), Matrix(fec_new(16, 24), MatrixDeleter())); + ret.emplace(std::make_pair(10, 40), Matrix(fec_new(10, 40), MatrixDeleter())); + ret.emplace(std::make_pair(10, 60), Matrix(fec_new(10, 60), MatrixDeleter())); + ret.emplace(std::make_pair(10, 90), Matrix(fec_new(10, 90), MatrixDeleter())); + + return ret; +} + +rs::Codes rs::codes_ = createCodes(); + +rs::rs(uint32_t k, uint32_t n) : k_(k), n_(n) {} + +void rs::setFECCallback(const PacketsReady &callback) { + fec_callback_ = callback; +} + +encoder::encoder(uint32_t k, uint32_t n) + : rs(k, n), + current_code_(codes_[std::make_pair(k, n)].get()), + source_block_(k_, n_, current_code_) {} + +void encoder::consume(const fec::buffer &packet, uint32_t index) { + if (!source_block_.addSourceSymbol(packet, index)) { + std::vector<buffer> repair_packets; + for (uint32_t i = k_; i < n_; i++) { + repair_packets.emplace_back(std::move(source_block_[i].second)); + } + fec_callback_(repair_packets); + } +} + +decoder::decoder(uint32_t k, uint32_t n) : rs(k, n) {} + +void decoder::recoverPackets(SourceBlocks::iterator &src_block_it) { + TRANSPORT_LOGD("recoverPackets for %u", k_); + auto &src_block = src_block_it->second; + std::vector<buffer> source_packets(k_); + for (uint32_t i = 0; i < src_block.getK(); i++) { + source_packets[i] = std::move(src_block[i].second); + } + + fec_callback_(source_packets); + processed_source_blocks_.emplace(src_block_it->first); + + auto it = parked_packets_.find(src_block_it->first); + if (it != parked_packets_.end()) { + parked_packets_.erase(it); + } + + src_blocks_.erase(src_block_it); +} + +void decoder::consume(const fec::buffer &packet, uint32_t index) { + // Normalize index + auto i = index % n_; + + // Get base + uint32_t base = index - i; + + TRANSPORT_LOGD( + "Decoder consume called for source symbol. BASE = %u, index = %u and i = " + "%u", + base, index, i); + + // check if a source block already exist for this symbol. If it does not + // exist, we lazily park this packet until we receive a repair symbol for the + // same block. This is done for 2 reason: + // 1) If we receive all the source packets of a block, we do not need to + // recover anything. + // 2) Sender may change n and k at any moment, so we construct the source + // block based on the (n, k) values written in the fec header. This is + // actually not used right now, since we use fixed value of n and k passed + // at construction time, but it paves the ground for a more dynamic + // protocol that may come in the future. + auto it = src_blocks_.find(base); + if (it != src_blocks_.end()) { + auto ret = it->second.addSourceSymbol(packet, i); + if (!ret) { + recoverPackets(it); + } + } else { + TRANSPORT_LOGD("Adding to parked source packets"); + auto ret = parked_packets_.emplace( + base, std::vector<std::pair<buffer, uint32_t> >()); + ret.first->second.emplace_back(packet, i); + } +} + +void decoder::consume(const fec::buffer &packet) { + // Repair symbol! Get index and base source block. + fec_header *h = reinterpret_cast<fec_header *>(packet->writableData()); + auto i = h->getEncodedSymbolId(); + auto base = h->getSeqNumberBase(); + auto n = h->getSourceBlockLen(); + auto k = n - h->getNFecSymbols(); + + TRANSPORT_LOGD( + "Decoder consume called for repair symbol. BASE = %u, index = %u and i = " + "%u", + base, base + i, i); + + // check if a source block already exist for this symbol + auto it = src_blocks_.find(base); + if (it == src_blocks_.end()) { + // Create new source block + auto code_it = codes_.find(std::make_pair(k, n)); + if (code_it == codes_.end()) { + TRANSPORT_LOGE("Code for k = %u and n = %u does not exist.", k_, n_); + return; + } + + auto emplace_result = + src_blocks_.emplace(base, BlockCode(k, n, code_it->second.get())); + it = emplace_result.first; + + // Check in the parked packets and insert any packet that is part of this + // source block + + auto it2 = parked_packets_.find(base); + if (it2 != parked_packets_.end()) { + for (auto &packet_index : it2->second) { + auto ret = + it->second.addSourceSymbol(packet_index.first, packet_index.second); + if (!ret) { + recoverPackets(it); + // Finish to delete packets in same source block that were + // eventually not used + return; + } + } + } + } + + auto ret = it->second.addRepairSymbol(packet, i); + if (!ret) { + recoverPackets(it); + } +} + +} // namespace fec +} // namespace core +} // namespace transport |