path: root/libtransport/src/protocols/prod_protocol_rtc.cc
diff options
authorLuca Muscariello <muscariello@ieee.org>2021-04-15 09:05:46 +0200
committerMauro Sardara <msardara@cisco.com>2021-04-15 16:36:16 +0200
commite92e9e839ca2cf42b56322b2489ccc0d8bf767af (patch)
tree9f1647c83a87fbf982ae329e800af25dbfb226b5 /libtransport/src/protocols/prod_protocol_rtc.cc
parent3e541d7c947cc2f9db145f26c9274efd29a6fb56 (diff)
[HICN-690] Transport Library Major Refactory
The current patch provides a major refactory of the transportlibrary. A summary of the different components that underwent major modifications is reported below. - Transport protocol updates The hierarchy of classes has been optimized to have common transport services across different transport protocols. This can allow to customize a transport protocol with new features. - A new real-time communication protocol The RTC protocol has been optimized in terms of algorithms to reduce consumer-producer synchronization latency. - A novel socket API The API has been reworked to be easier to consumer but also to have a more efficient integration in L4 proxies. - Several performance improvements A large number of performance improvements have been included in particular to make the entire stack zero-copy and optimize cache miss. - New memory buffer framework Memory management has been reworked entirely to provide a more efficient infra with a richer API. Buffers are now allocated in blocks and a single buffer holds the memory for (1) the shared_ptr control block, (2) the metadata of the packet (e.g. name, pointer to other buffers if buffer is chained and relevant offsets), and (3) the packet itself, as it is sent/received over the network. - A new slab allocator Dynamic memory allocation is now managed by a novel slab allocator that is optimised for packet processing and connection management. Memory is organized in pools of blocks all of the same size which are used during the processing of outgoing/incoming packets. When a memory block Is allocated is always taken from a global pool and when it is deallocated is returned to the pool, thus avoiding the cost of any heap allocation in the data path. - New transport connectors Consumer and producer end-points can communication either using an hicn packet forwarder or with direct connector based on shared memories or sockets. The usage of transport connectors typically for unit and funcitonal testing but may have additional usage. - Support for FEC/ECC for transport services FEC/ECC via reed solomon is supported by default and made available to transport services as a modular component. Reed solomon block codes is a default FEC model that can be replaced in a modular way by many other codes including RLNC not avaiable in this distribution. The current FEC framework support variable size padding and efficiently makes use of the infra memory buffers to avoid additiona copies. - Secure transport framework for signature computation and verification Crypto support is nativelty used in hICN for integrity and authenticity. Novel support that includes RTC has been implemented and made modular and reusable acrosso different transport protocols. - TLS - Transport layer security over hicn Point to point confidentiality is provided by integrating TLS on top of hICN reliable and non-reliable transport. The integration is common and makes a different use of the TLS record. - MLS - Messaging layer security over hicn MLS integration on top of hICN is made by using the MLSPP implemetation open sourced by Cisco. We have included instrumentation tools to deploy performance and functional tests of groups of end-points. - Android support The overall code has been heavily tested in Android environments and has received heavy lifting to better run natively in recent Android OS. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: If477ba2fa686e6f47bdf96307ac60938766aef69 Signed-off-by: Luca Muscariello <muscariello@ieee.org>
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
1 files changed, 481 insertions, 0 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
new file mode 100644
index 000000000..8081923e3
--- /dev/null
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -0,0 +1,481 @@
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <hicn/transport/core/global_object_pool.h>
+#include <implementation/socket_producer.h>
+#include <protocols/prod_protocol_rtc.h>
+#include <protocols/rtc/rtc_consts.h>
+#include <stdlib.h>
+#include <time.h>
+#include <unordered_set>
+namespace transport {
+namespace protocol {
+ implementation::ProducerSocket *icn_socket)
+ : ProductionProtocol(icn_socket),
+ current_seg_(1),
+ produced_bytes_(0),
+ produced_packets_(0),
+ max_packet_production_(1),
+ bytes_production_rate_(0),
+ packets_production_rate_(0),
+ last_round_(std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count()),
+ allow_delayed_nacks_(false),
+ queue_timer_on_(false),
+ consumer_in_sync_(false),
+ on_consumer_in_sync_(nullptr) {
+ srand((unsigned int)time(NULL));
+ prod_label_ = rand() % 256;
+ interests_queue_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ setOutputBufferSize(10000);
+ scheduleRoundTimer();
+RTCProductionProtocol::~RTCProductionProtocol() {}
+void RTCProductionProtocol::registerNamespaceWithNetwork(
+ const Prefix &producer_namespace) {
+ ProductionProtocol::registerNamespaceWithNetwork(producer_namespace);
+ flow_name_ = producer_namespace.getName();
+ auto family = flow_name_.getAddressFamily();
+ switch (family) {
+ case AF_INET6:
+ header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
+ break;
+ case AF_INET:
+ header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
+ break;
+ default:
+ throw errors::RuntimeException("Unknown name format.");
+ }
+void RTCProductionProtocol::scheduleRoundTimer() {
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
+void RTCProductionProtocol::updateStats() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint64_t duration = now - last_round_;
+ if (duration == 0) duration = 1;
+ double per_second = rtc::MILLI_IN_A_SEC / duration;
+ uint32_t prev_packets_production_rate = packets_production_rate_;
+ bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
+ packets_production_rate_ = ceil((double)produced_packets_ * per_second);
+ TRANSPORT_LOGD("Updating production rate: produced_bytes_ = %u bps = %u",
+ produced_bytes_, bytes_production_rate_);
+ // update the production rate as soon as it increases by 10% with respect to
+ // the last round
+ max_packet_production_ =
+ produced_packets_ + ceil((double)produced_packets_ * 0.1);
+ if (max_packet_production_ < rtc::WIN_MIN)
+ max_packet_production_ = rtc::WIN_MIN;
+ if (packets_production_rate_ != 0) {
+ allow_delayed_nacks_ = false;
+ } else if (prev_packets_production_rate == 0) {
+ // at least 2 rounds with production rate = 0
+ allow_delayed_nacks_ = true;
+ }
+ // check if the production rate is decreased. if yes send nacks if needed
+ if (prev_packets_production_rate < packets_production_rate_) {
+ sendNacksForPendingInterests();
+ }
+ produced_bytes_ = 0;
+ produced_packets_ = 0;
+ last_round_ = now;
+ scheduleRoundTimer();
+uint32_t RTCProductionProtocol::produceStream(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer,
+ bool is_last, uint32_t start_offset) {
+ throw errors::NotImplementedException();
+uint32_t RTCProductionProtocol::produceStream(const Name &content_name,
+ const uint8_t *buffer,
+ size_t buffer_size, bool is_last,
+ uint32_t start_offset) {
+ throw errors::NotImplementedException();
+void RTCProductionProtocol::produce(ContentObject &content_object) {
+ throw errors::NotImplementedException();
+uint32_t RTCProductionProtocol::produceDatagram(
+ const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
+ std::size_t buffer_size = buffer->length();
+ if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0;
+ uint32_t data_packet_size;
+ socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
+ data_packet_size);
+ if (TRANSPORT_EXPECT_FALSE((buffer_size + header_size_ +
+ rtc::DATA_HEADER_SIZE) > data_packet_size)) {
+ return 0;
+ }
+ auto content_object =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>();
+ // add rtc header to the payload
+ struct rtc::data_packet_t header;
+ content_object->appendPayload((const uint8_t *)&header,
+ content_object->appendPayload(buffer->data(), buffer->length());
+ std::shared_ptr<ContentObject> co = std::move(content_object);
+ // schedule actual sending on internal thread
+ portal_->getIoService().dispatch(
+ [this, content_object{std::move(co)}, content_name]() mutable {
+ produceInternal(std::move(content_object), content_name);
+ });
+ return 1;
+void RTCProductionProtocol::produceInternal(
+ std::shared_ptr<ContentObject> &&content_object, const Name &content_name) {
+ // set rtc header
+ struct rtc::data_packet_t *data_pkt =
+ (struct rtc::data_packet_t *)content_object->getPayload()->data();
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ data_pkt->setTimestamp(now);
+ data_pkt->setProductionRate(bytes_production_rate_);
+ // set hicn stuff
+ Name n(content_name);
+ content_object->setName(n.setSuffix(current_seg_));
+ content_object->setLifetime(500); // XXX this should be set by the APP
+ content_object->setPathLabel(prod_label_);
+ // update stats
+ produced_bytes_ +=
+ content_object->headerSize() + content_object->payloadSize();
+ produced_packets_++;
+ if (produced_packets_ >= max_packet_production_) {
+ // in this case all the pending interests may be used to accomodate the
+ // sudden increase in the production rate. calling the updateStats we will
+ // notify all the clients
+ round_timer_->cancel();
+ updateStats();
+ }
+ TRANSPORT_LOGD("Sending content object: %s", n.toString().c_str());
+ output_buffer_.insert(content_object);
+ if (*on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+ portal_->sendContentObject(*content_object);
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+ // remove interests from the interest cache if it exists
+ removeFromInterestQueue(current_seg_);
+ current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ;
+void RTCProductionProtocol::onInterest(Interest &interest) {
+ uint32_t interest_seg = interest.getName().getSuffix();
+ uint32_t lifetime = interest.getLifetime();
+ if (interest_seg == 0) {
+ // first packet from the consumer, reset sync state
+ consumer_in_sync_ = false;
+ }
+ if (*on_interest_input_) {
+ on_interest_input_->operator()(*socket_->getInterface(), interest);
+ }
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if (interest_seg > rtc::MIN_PROBE_SEQ) {
+ TRANSPORT_LOGD("received probe %u", interest_seg);
+ sendNack(interest_seg);
+ return;
+ }
+ TRANSPORT_LOGD("received interest %u", interest_seg);
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(interest);
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(),
+ interest);
+ }
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+ TRANSPORT_LOGD("Send content %u (onInterest)",
+ content_object->getName().getSuffix());
+ portal_->sendContentObject(*content_object);
+ return;
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ }
+ // if the production rate 0 use delayed nacks
+ if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
+ uint64_t next_timer = ~0;
+ if (!timers_map_.empty()) {
+ next_timer = timers_map_.begin()->first;
+ }
+ uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL;
+ addToInterestQueue(interest_seg, expiration);
+ // here we have at least one interest in the queue, we need to start or
+ // update the timer
+ if (!queue_timer_on_) {
+ // set timeout
+ queue_timer_on_ = true;
+ scheduleQueueTimer(timers_map_.begin()->first - now);
+ } else {
+ // re-schedule the timer because a new interest will expires sooner
+ if (next_timer > timers_map_.begin()->first) {
+ interests_queue_timer_->cancel();
+ scheduleQueueTimer(timers_map_.begin()->first - now);
+ }
+ }
+ return;
+ }
+ if (queue_timer_on_) {
+ // the producer is producing. Send nacks to packets that will expire before
+ // the data production and remove the timer
+ queue_timer_on_ = false;
+ interests_queue_timer_->cancel();
+ sendNacksForPendingInterests();
+ }
+ uint32_t max_gap = (uint32_t)floor(
+ (double)((double)((double)lifetime *
+ rtc::MILLI_IN_A_SEC) *
+ (double)packets_production_rate_));
+ if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
+ sendNack(interest_seg);
+ } else {
+ if (!consumer_in_sync_ && on_consumer_in_sync_) {
+ // we consider the remote consumer to be in sync as soon as it covers 70%
+ // of the production window with interests
+ uint32_t perc = ceil((double)max_gap * 0.7);
+ if (interest_seg > (perc + current_seg_)) {
+ consumer_in_sync_ = true;
+ on_consumer_in_sync_(*socket_->getInterface(), interest);
+ }
+ }
+ uint64_t expiration =
+ now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR);
+ addToInterestQueue(interest_seg, expiration);
+ }
+void RTCProductionProtocol::onError(std::error_code ec) {}
+void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) {
+ interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait));
+ interests_queue_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ interestQueueTimer();
+ });
+void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
+ uint64_t expiration) {
+ // check if the seq number exists already
+ auto it_seqs = seqs_map_.find(interest_seg);
+ if (it_seqs != seqs_map_.end()) {
+ // the seq already exists
+ if (expiration < it_seqs->second) {
+ // we need to update the timer becasue we got a smaller one
+ // 1) remove the entry from the multimap
+ // 2) update this entry
+ auto range = timers_map_.equal_range(it_seqs->second);
+ for (auto it_timers = range.first; it_timers != range.second;
+ it_timers++) {
+ if (it_timers->second == it_seqs->first) {
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interest_seg));
+ it_seqs->second = expiration;
+ } else {
+ // nothing to do here
+ return;
+ }
+ } else {
+ // add the new seq
+ timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg));
+ seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration));
+ }
+void RTCProductionProtocol::sendNacksForPendingInterests() {
+ std::unordered_set<uint32_t> to_remove;
+ uint32_t packet_gap = 100000; // set it to a high value (100sec)
+ if (packets_production_rate_ != 0)
+ packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
+ if (it->first > current_seg_) {
+ uint64_t production_time =
+ ((it->first - current_seg_) * packet_gap) + now;
+ if (production_time >= it->second) {
+ sendNack(it->first);
+ to_remove.insert(it->first);
+ }
+ }
+ }
+ // delete nacked interests
+ for (auto it = to_remove.begin(); it != to_remove.end(); it++) {
+ removeFromInterestQueue(*it);
+ }
+void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) {
+ auto seq_it = seqs_map_.find(interest_seg);
+ if (seq_it != seqs_map_.end()) {
+ auto range = timers_map_.equal_range(seq_it->second);
+ for (auto it_timers = range.first; it_timers != range.second; it_timers++) {
+ if (it_timers->second == seq_it->first) {
+ timers_map_.erase(it_timers);
+ break;
+ }
+ }
+ seqs_map_.erase(seq_it);
+ }
+void RTCProductionProtocol::interestQueueTimer() {
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
+ uint64_t expire = it_timers->first;
+ if (expire <= now) {
+ uint32_t seq = it_timers->second;
+ sendNack(seq);
+ // remove the interest from the other map
+ seqs_map_.erase(seq);
+ it_timers = timers_map_.erase(it_timers);
+ } else {
+ // stop, we are done!
+ break;
+ }
+ }
+ if (timers_map_.empty()) {
+ queue_timer_on_ = false;
+ } else {
+ queue_timer_on_ = true;
+ scheduleQueueTimer(timers_map_.begin()->first - now);
+ }
+void RTCProductionProtocol::sendNack(uint32_t sequence) {
+ auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>();
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ uint32_t next_packet = current_seg_;
+ uint32_t prod_rate = bytes_production_rate_;
+ struct rtc::nack_packet_t header;
+ header.setTimestamp(now);
+ header.setProductionRate(prod_rate);
+ header.setProductionSegement(next_packet);
+ nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE);
+ Name n(flow_name_);
+ n.setSuffix(sequence);
+ nack->setName(n);
+ nack->setLifetime(0);
+ nack->setPathLabel(prod_label_);
+ if (!consumer_in_sync_ && on_consumer_in_sync_ &&
+ sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) {
+ consumer_in_sync_ = true;
+ auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
+ interest->setName(n);
+ on_consumer_in_sync_(*socket_->getInterface(), *interest);
+ }
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(), *nack);
+ }
+ TRANSPORT_LOGD("Send nack %u", sequence);
+ portal_->sendContentObject(*nack);
+} // namespace protocol
+} // end namespace transport