aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/prod_protocol_rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc834
1 files changed, 497 insertions, 337 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index cdc882d81..83cd23ac6 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,15 +13,21 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <hicn/transport/core/global_object_pool.h>
#include <implementation/socket_producer.h>
#include <protocols/prod_protocol_rtc.h>
+#include <protocols/rtc/probe_handler.h>
#include <protocols/rtc/rtc_consts.h>
#include <stdlib.h>
#include <time.h>
#include <unordered_set>
+extern "C" {
+#include <hicn/util/bitmap.h>
+}
+
namespace transport {
namespace protocol {
@@ -31,117 +37,144 @@ RTCProductionProtocol::RTCProductionProtocol(
implementation::ProducerSocket *icn_socket)
: ProductionProtocol(icn_socket),
current_seg_(1),
+ prev_produced_bytes_(0),
+ prev_produced_packets_(0),
produced_bytes_(0),
produced_packets_(0),
- produced_fec_packets_(0),
- max_packet_production_(1),
- bytes_production_rate_(0),
+ max_packet_production_(UINT32_MAX),
+ bytes_production_rate_(UINT32_MAX),
packets_production_rate_(0),
- fec_packets_production_rate_(0),
- last_round_(std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count()),
+ last_produced_data_ts_(0),
+ last_round_(utils::SteadyTime::nowMs().count()),
allow_delayed_nacks_(false),
- queue_timer_on_(false),
- consumer_in_sync_(false),
- on_consumer_in_sync_(nullptr) {
- srand((unsigned int)time(NULL));
- prod_label_ = rand() % 256;
+ pending_fec_pace_(false),
+ max_len_(0),
+ queue_len_(0),
+ data_aggregation_(true),
+ data_aggregation_timer_switch_(false) {
+ std::uniform_int_distribution<> dis(0, 255);
+ prod_label_ = dis(gen_);
cache_label_ = (prod_label_ + 1) % 256;
- interests_queue_timer_ =
- std::make_unique<asio::steady_timer>(portal_->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+ fec_pacing_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
+ app_packets_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getThread().getIoService());
setOutputBufferSize(10000);
- scheduleRoundTimer();
+}
+
+RTCProductionProtocol::~RTCProductionProtocol() {}
+
+void RTCProductionProtocol::setProducerParam() {
+ // Flow name: here we assume there is only one prefix registered in the portal
+ flow_name_ = portal_->getServedNamespaces().begin()->makeName();
+
+ // Default format
+ core::Packet::Format default_format;
+ socket_->getSocketOption(interface::GeneralTransportOptions::PACKET_FORMAT,
+ default_format);
// FEC
using namespace std::placeholders;
enableFEC(std::bind(&RTCProductionProtocol::onFecPackets, this, _1),
std::bind(&RTCProductionProtocol::getBuffer, this, _1));
-}
-RTCProductionProtocol::~RTCProductionProtocol() {}
-
-void RTCProductionProtocol::registerNamespaceWithNetwork(
- const Prefix &producer_namespace) {
- ProductionProtocol::registerNamespaceWithNetwork(producer_namespace);
-
- flow_name_ = producer_namespace.getName();
- auto family = flow_name_.getAddressFamily();
-
- switch (family) {
- case AF_INET6:
- data_header_size_ =
- signer_ && !making_manifest_
- ? (uint32_t)Packet::getHeaderSizeFromFormat(
- HF_INET6_TCP_AH, signer_->getSignatureFieldSize())
- : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
- ;
- break;
- case AF_INET:
- data_header_size_ =
- signer_ && !making_manifest_
- ? (uint32_t)Packet::getHeaderSizeFromFormat(
- HF_INET_TCP_AH, signer_->getSignatureFieldSize())
- : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
- break;
- default:
- throw errors::RuntimeException("Unknown name format.");
- }
+ // Aggregated data
+ socket_->getSocketOption(interface::RtcTransportOptions::AGGREGATED_DATA,
+ data_aggregation_);
+
+ size_t signature_size = signer_->getSignatureFieldSize();
+ data_header_format_ = {!manifest_max_capacity_
+ ? Packet::toAHFormat(default_format)
+ : default_format,
+ !manifest_max_capacity_ ? signature_size : 0};
+ manifest_header_format_ = {Packet::toAHFormat(default_format),
+ signature_size};
+ nack_header_format_ = {Packet::toAHFormat(default_format), signature_size};
+ fec_header_format_ = {Packet::toAHFormat(default_format), signature_size};
+
+ // Initialize verifier for aggregated interests
+ std::shared_ptr<auth::Verifier> verifier;
+ socket_->getSocketOption(implementation::GeneralTransportOptions::VERIFIER,
+ verifier);
+ verifier_ = std::make_shared<rtc::RTCVerifier>(verifier, 0, 0);
+
+ // Schedule round timer
+ scheduleRoundTimer();
}
void RTCProductionProtocol::scheduleRoundTimer() {
round_timer_->expires_from_now(
std::chrono::milliseconds(rtc::PRODUCER_STATS_INTERVAL));
- round_timer_->async_wait([this](std::error_code ec) {
+ std::weak_ptr<RTCProductionProtocol> self = shared_from_this();
+ round_timer_->async_wait([self](const std::error_code &ec) {
if (ec) return;
- updateStats();
+
+ auto sp = self.lock();
+ if (sp && sp->isRunning()) {
+ sp->updateStats(true);
+ }
});
}
-void RTCProductionProtocol::updateStats() {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+void RTCProductionProtocol::updateStats(bool new_round) {
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint64_t duration = now - last_round_;
- if (duration == 0) duration = 1;
+ if (!new_round) {
+ duration += rtc::PRODUCER_STATS_INTERVAL;
+ } else {
+ prev_produced_bytes_ = 0;
+ prev_produced_packets_ = 0;
+ }
+
double per_second = rtc::MILLI_IN_A_SEC / duration;
uint32_t prev_packets_production_rate = packets_production_rate_;
- bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
- packets_production_rate_ = ceil((double)produced_packets_ * per_second);
- fec_packets_production_rate_ =
- ceil((double)produced_fec_packets_ * per_second);
-
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Updating production rate: produced_bytes_ = " << produced_bytes_
- << " bps = " << bytes_production_rate_;
+ // bytes_production_rate_ does not take into account FEC!!! this is because
+ // each client requests a differen amount of FEC packet so the client itself
+ // increase the production rate in the right way
+ bytes_production_rate_ =
+ ceil((double)(produced_bytes_ + prev_produced_bytes_) * per_second);
+ packets_production_rate_ =
+ ceil((double)(produced_packets_ + prev_produced_packets_) * per_second);
+
+ if (fec_encoder_ && fec_type_ != fec::FECType::UNKNOWN) {
+ // add fec packets looking at the fec code. we don't use directly the number
+ // of fec packets produced in 1 round because it may happen that different
+ // numbers of blocks are generated during the rounds and this creates
+ // inconsistencies in the estimation of the production rate
+ uint32_t k = fec::FECUtils::getSourceSymbols(fec_type_);
+ uint32_t n = fec::FECUtils::getBlockSymbols(fec_type_);
+
+ packets_production_rate_ +=
+ ceil((double)packets_production_rate_ / (double)k) * (n - k);
+ }
// update the production rate as soon as it increases by 10% with respect to
// the last round
max_packet_production_ =
- produced_packets_ + ceil((double)produced_packets_ * 0.1);
+ produced_packets_ + ceil((double)produced_packets_ * 0.10);
if (max_packet_production_ < rtc::WIN_MIN)
max_packet_production_ = rtc::WIN_MIN;
- if (packets_production_rate_ != 0) {
- allow_delayed_nacks_ = false;
- } else if (prev_packets_production_rate == 0) {
- // at least 2 rounds with production rate = 0
+ if (packets_production_rate_ <= rtc::MIN_PRODUCTION_RATE ||
+ prev_packets_production_rate <= rtc::MIN_PRODUCTION_RATE) {
allow_delayed_nacks_ = true;
+ } else {
+ // at least 2 rounds with enough packets
+ allow_delayed_nacks_ = false;
}
- // check if the production rate is decreased. if yes send nacks if needed
- if (prev_packets_production_rate < packets_production_rate_) {
- sendNacksForPendingInterests();
+ if (new_round) {
+ prev_produced_bytes_ = produced_bytes_;
+ prev_produced_packets_ = produced_packets_;
+ produced_bytes_ = 0;
+ produced_packets_ = 0;
+ last_round_ = now;
+ scheduleRoundTimer();
}
-
- produced_bytes_ = 0;
- produced_packets_ = 0;
- produced_fec_packets_ = 0;
- last_round_ = now;
- scheduleRoundTimer();
}
uint32_t RTCProductionProtocol::produceStream(
@@ -164,75 +197,275 @@ void RTCProductionProtocol::produce(ContentObject &content_object) {
uint32_t RTCProductionProtocol::produceDatagram(
const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) {
std::size_t buffer_size = buffer->length();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Maybe Sending content object: " << content_name;
+
if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) return 0;
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending content object: " << content_name;
+
uint32_t data_packet_size;
socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
data_packet_size);
-
- if (TRANSPORT_EXPECT_FALSE((buffer_size + data_header_size_ +
- rtc::DATA_HEADER_SIZE) > data_packet_size)) {
+ // this is a source packet but we check the fec header size of FEC packet in
+ // order to leave room for the header when FEC packets will be generated
+ uint32_t fec_header = 0;
+ if (fec_encoder_) fec_encoder_->getFecHeaderSize(true);
+ uint32_t headers_size =
+ (uint32_t)Packet::getHeaderSizeFromFormat(data_header_format_.first,
+ data_header_format_.second) +
+ rtc::DATA_HEADER_SIZE + fec_header;
+ if (TRANSPORT_EXPECT_FALSE((headers_size + buffer_size) > data_packet_size)) {
return 0;
}
+ if (!data_aggregation_) {
+ // if data aggregation is off emptyQueue will always return doing nothing
+ emptyQueue();
+
+ sendManifest(content_name);
+
+ // create content object
+ auto content_object =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ data_header_format_.first, data_header_format_.second);
+
+ // add rtc header to the payload
+ struct rtc::data_packet_t header;
+ content_object->appendPayload((const uint8_t *)&header,
+ rtc::DATA_HEADER_SIZE);
+ content_object->appendPayload(buffer->data(), buffer->length());
+
+ // schedule actual sending on internal thread
+ portal_->getThread().tryRunHandlerNow(
+ [this, content_object{std::move(content_object)},
+ content_name]() mutable {
+ produceInternal(std::move(content_object), content_name);
+ });
+ } else {
+ // XXX here we assume that all the packets that we push to the queue have
+ // the same name
+ auto app_pkt = utils::MemBuf::copyBuffer(buffer->data(), buffer->length());
+ addPacketToQueue(std::move(app_pkt));
+ }
+
+ return 1;
+}
+
+void RTCProductionProtocol::addPacketToQueue(
+ std::unique_ptr<utils::MemBuf> &&buffer) {
+ std::size_t buffer_size = buffer->length();
+ if ((queue_len_ + buffer_size) > rtc::MAX_RTC_PAYLOAD_SIZE) {
+ emptyQueue(); // this should guaranty that the generated packet will never
+ // be larger than an MTU
+ }
+
+ waiting_app_packets_.push(std::move(buffer));
+ if (max_len_ < buffer_size) max_len_ = buffer_size;
+ queue_len_ += buffer_size;
+
+ if (waiting_app_packets_.size() >= rtc::MAX_AGGREGATED_PACKETS) {
+ emptyQueue();
+ }
+
+ if (waiting_app_packets_.size() >= 1 && !data_aggregation_timer_switch_) {
+ data_aggregation_timer_switch_ = true;
+ app_packets_timer_->expires_from_now(
+ std::chrono::milliseconds(rtc::AGGREGATED_PACKETS_TIMER));
+ std::weak_ptr<RTCProductionProtocol> self = shared_from_this();
+ app_packets_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
+
+ auto ptr = self.lock();
+ if (ptr && ptr->isRunning()) {
+ if (!ptr->data_aggregation_timer_switch_) return;
+ ptr->emptyQueue();
+ }
+ });
+ }
+}
+
+void RTCProductionProtocol::emptyQueue() {
+ if (waiting_app_packets_.size() == 0) return; // queue is empty
+
+ Name n(flow_name_);
+
+ // cancel timer is scheduled
+ if (data_aggregation_timer_switch_) {
+ data_aggregation_timer_switch_ = false;
+ app_packets_timer_->cancel();
+ }
+
+ // send a manifest beforehand if the hash buffer if full
+ sendManifest(n);
+
+ // create content object
auto content_object =
core::PacketManager<>::getInstance().getPacket<ContentObject>(
- signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP,
- signer_ ? signer_->getSignatureFieldSize() : 0);
+ data_header_format_.first, data_header_format_.second);
+
// add rtc header to the payload
struct rtc::data_packet_t header;
content_object->appendPayload((const uint8_t *)&header,
rtc::DATA_HEADER_SIZE);
- content_object->appendPayload(buffer->data(), buffer->length());
- // schedule actual sending on internal thread
- portal_->getIoService().dispatch([this,
- content_object{std::move(content_object)},
- content_name]() mutable {
- produceInternal(std::move(content_object), content_name);
+ // init aggregated header
+ rtc::AggrPktHeader hdr(
+ (uint8_t *)(content_object->getPayload()->data() + rtc::DATA_HEADER_SIZE),
+ max_len_, waiting_app_packets_.size());
+ uint32_t header_size = hdr.getHeaderLen();
+ content_object->append(header_size); // leave space for the aggregated header
+
+ uint8_t index = 0;
+ while (waiting_app_packets_.size() != 0) {
+ std::unique_ptr<utils::MemBuf> pkt =
+ std::move(waiting_app_packets_.front());
+ waiting_app_packets_.pop();
+ // XXX for the moment we have a single name, so this works, otherwise we
+ // need to do something else
+ hdr.addPacketToHeader(index, pkt->length());
+ // append packet
+ content_object->appendPayload(pkt->data(), pkt->length());
+ index++;
+ }
+
+ // reset queue values
+ max_len_ = 0;
+ queue_len_ = 0;
+
+ // the packet is ready we need to send it
+ portal_->getThread().tryRunHandlerNow(
+ [this, content_object{std::move(content_object)}, n]() mutable {
+ produceInternal(std::move(content_object), n);
+ });
+}
+
+void RTCProductionProtocol::sendManifest(const Name &name) {
+ if (!manifest_max_capacity_) {
+ return;
+ }
+
+ Name manifest_name = name;
+
+ // If there is not enough hashes to fill a manifest, return early
+ if (manifest_entries_.size() < manifest_max_capacity_) {
+ return;
+ }
+
+ // Create a new manifest
+ std::shared_ptr<core::ContentObjectManifest> manifest =
+ createManifest(manifest_name.setSuffix(current_seg_));
+ auto manifest_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest->getPacket());
+
+ // Fill the manifest with packet hashes that were previously saved
+ uint32_t nb_entries;
+ for (nb_entries = 0; nb_entries < manifest_max_capacity_; ++nb_entries) {
+ if (manifest_entries_.empty()) {
+ break;
+ }
+ std::pair<uint32_t, auth::CryptoHash> front = manifest_entries_.front();
+ manifest->addEntry(front.first, front.second);
+ manifest_entries_.pop();
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending manifest " << manifest_co->getName().getSuffix()
+ << " of size " << nb_entries;
+
+ // Encode and send the manifest
+ manifest->encode();
+ portal_->getThread().tryRunHandlerNow(
+ [this, content_object{std::move(manifest_co)}, manifest_name]() mutable {
+ produceInternal(std::move(content_object), manifest_name);
+ });
+}
+
+std::shared_ptr<core::ContentObjectManifest>
+RTCProductionProtocol::createManifest(const Name &content_name) const {
+ Name name(content_name);
+
+ auth::CryptoHashType hash_algo;
+ socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM,
+ hash_algo);
+
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ // Create a new manifest
+ std::shared_ptr<core::ContentObjectManifest> manifest =
+ ContentObjectManifest::createContentManifest(
+ manifest_header_format_.first, name, manifest_header_format_.second);
+ manifest->setHeaders(core::ManifestType::INLINE_MANIFEST,
+ manifest_max_capacity_, hash_algo, false /* is_last */,
+ name);
+
+ // Set connection parameters
+ manifest->setParamsRTC(ParamsRTC{
+ .timestamp = now,
+ .prod_rate = bytes_production_rate_,
+ .prod_seg = current_seg_,
+ .fec_type = fec_type_,
});
- return 1;
+ return manifest;
}
void RTCProductionProtocol::produceInternal(
std::shared_ptr<ContentObject> &&content_object, const Name &content_name,
bool fec) {
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
+ if (fec && (now - last_produced_data_ts_) < rtc::FEC_PACING_TIME) {
+ paced_fec_packets_.push(std::pair<uint64_t, ContentObject::Ptr>(
+ now, std::move(content_object)));
+ postponeFecPacket();
+ } else {
+ // need to check if there are FEC packets waiting to be sent
+ flushFecPkts(current_seg_);
+ producePktInternal(std::move(content_object), content_name, fec);
+ }
+}
+
+void RTCProductionProtocol::producePktInternal(
+ std::shared_ptr<ContentObject> &&content_object, const Name &content_name,
+ bool fec) {
+ bool is_manifest = content_object->getPayloadType() == PayloadType::MANIFEST;
+ uint64_t now = utils::SteadyTime::nowMs().count();
+
// set rtc header
- struct rtc::data_packet_t *data_pkt =
- (struct rtc::data_packet_t *)content_object->getPayload()->data();
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- data_pkt->setTimestamp(now);
- data_pkt->setProductionRate(bytes_production_rate_);
+ if (!is_manifest) {
+ struct rtc::data_packet_t *data_pkt =
+ (struct rtc::data_packet_t *)content_object->getPayload()->data();
+ data_pkt->setTimestamp(now);
+ data_pkt->setProductionRate(bytes_production_rate_);
+ }
// set hicn stuff
Name n(content_name);
content_object->setName(n.setSuffix(current_seg_));
- content_object->setLifetime(500); // XXX this should be set by the APP
- content_object->setPathLabel(prod_label_);
- // sign packet
- if (signer_) {
- signer_->signPacket(content_object.get());
- }
+ uint32_t expiry_time = 0;
+ socket_->getSocketOption(
+ interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ expiry_time);
+ if (expiry_time == interface::default_values::content_object_expiry_time)
+ expiry_time = 500; // the data expiration time should be set by the App. if
+ // the App does not specify it the default is 500ms
+ content_object->setLifetime(expiry_time);
+ content_object->setPathLabel(prod_label_);
// update stats
if (!fec) {
produced_bytes_ +=
content_object->headerSize() + content_object->payloadSize();
produced_packets_++;
- } else {
- produced_fec_packets_++;
}
- if (produced_packets_ >= max_packet_production_) {
+ if (!data_aggregation_ && produced_packets_ >= max_packet_production_) {
// in this case all the pending interests may be used to accomodate the
// sudden increase in the production rate. calling the updateStats we will
// notify all the clients
- round_timer_->cancel();
- updateStats();
+ updateStats(false);
}
DLOG_IF(INFO, VLOG_IS_ON(3))
@@ -240,8 +473,12 @@ void RTCProductionProtocol::produceInternal(
// pass packet to FEC encoder
if (fec_encoder_ && !fec) {
- fec_encoder_->onPacketProduced(
- *content_object, content_object->headerSize() + rtc::DATA_HEADER_SIZE);
+ uint32_t offset = is_manifest ? (uint32_t)content_object->headerSize()
+ : (uint32_t)content_object->headerSize() +
+ rtc::DATA_HEADER_SIZE;
+ uint32_t metadata = static_cast<uint32_t>(content_object->getPayloadType());
+
+ fec_encoder_->onPacketProduced(*content_object, offset, metadata);
}
output_buffer_.insert(content_object);
@@ -251,18 +488,15 @@ void RTCProductionProtocol::produceInternal(
*content_object);
}
- auto seq_it = seqs_map_.find(current_seg_);
- if (seq_it != seqs_map_.end()) {
- portal_->sendContentObject(*content_object);
- }
+ // TODO we may want to send FEC only if an interest is pending in the pit in
+ sendContentObject(content_object, false, fec);
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(),
*content_object);
}
- // remove interests from the interest cache if it exists
- removeFromInterestQueue(current_seg_);
+ if (!fec) last_produced_data_ts_ = now;
// Update current segment
current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ;
@@ -277,254 +511,169 @@ void RTCProductionProtocol::produceInternal(
}
}
-void RTCProductionProtocol::onInterest(Interest &interest) {
- if (*on_interest_input_) {
- on_interest_input_->operator()(*socket_->getInterface(), interest);
+void RTCProductionProtocol::flushFecPkts(uint32_t current_seq_num) {
+ // Currently we immediately send all the pending fec packets
+ // A pacing policy may be helpful, but we do not want to delay too much
+ // the packets at this moment.
+ while (paced_fec_packets_.size() > 0) {
+ producePktInternal(std::move(paced_fec_packets_.front().second), flow_name_,
+ true);
+ paced_fec_packets_.pop();
+ }
+ fec_pacing_timer_->cancel();
+ pending_fec_pace_ = false;
+ postponeFecPacket();
+}
+
+void RTCProductionProtocol::postponeFecPacket() {
+ if (paced_fec_packets_.size() == 0) return;
+ if (pending_fec_pace_) {
+ return;
}
- auto suffix = interest.firstSuffix();
- // numberOfSuffixes returns only the prefixes in the payalod
- // we add + 1 to count anche the seq in the name
- auto n_suffixes = interest.numberOfSuffixes() + 1;
- Name name = interest.getName();
- bool prev_consumer_state = consumer_in_sync_;
+ uint64_t produced_time = paced_fec_packets_.front().first;
+ uint64_t now = utils::SteadyTime::nowMs().count();
- for (uint32_t i = 0; i < n_suffixes; i++) {
- if (i > 0) {
- name.setSuffix(*(suffix + (i - 1)));
- }
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name;
+ uint64_t wait_time = 0;
+ if ((produced_time + rtc::FEC_PACING_TIME) > now)
+ wait_time = produced_time + rtc::FEC_PACING_TIME - now;
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(name);
+ fec_pacing_timer_->expires_from_now(std::chrono::milliseconds(wait_time));
+ pending_fec_pace_ = true;
- if (content_object) {
- if (*on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_->operator()(
- *socket_->getInterface(), interest);
- }
+ std::weak_ptr<RTCProductionProtocol> self = shared_from_this();
+ fec_pacing_timer_->async_wait([self](const std::error_code &ec) {
+ if (ec) return;
- if (*on_content_object_output_) {
- on_content_object_output_->operator()(*socket_->getInterface(),
- *content_object);
- }
+ auto sp = self.lock();
+ if (sp && sp->isRunning()) {
+ if (!sp->pending_fec_pace_) return;
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Send content %u (onInterest) " << content_object->getName();
- content_object->setPathLabel(cache_label_);
- portal_->sendContentObject(*content_object);
- } else {
- if (*on_interest_process_) {
- on_interest_process_->operator()(*socket_->getInterface(), interest);
+ if (sp->paced_fec_packets_.size() > 0) {
+ sp->producePktInternal(std::move(sp->paced_fec_packets_.front().second),
+ sp->flow_name_, true);
+ sp->paced_fec_packets_.pop();
}
- processInterest(name.getSuffix(), interest.getLifetime());
+ sp->pending_fec_pace_ = false;
+ sp->postponeFecPacket();
}
- }
-
- if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_)
- on_consumer_in_sync_(*socket_->getInterface(), interest);
+ });
}
-void RTCProductionProtocol::processInterest(uint32_t interest_seg,
- uint32_t lifetime) {
- if (interest_seg == 0) {
- // first packet from the consumer, reset sync state
- consumer_in_sync_ = false;
+void RTCProductionProtocol::onInterest(Interest &interest) {
+ if (*on_interest_input_) {
+ on_interest_input_->operator()(*socket_->getInterface(), interest);
}
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ if (!interest.isValid()) throw std::runtime_error("Bad interest format");
+ if (interest.hasManifest() &&
+ verifier_->verify(interest) != auth::VerificationPolicy::ACCEPT)
+ throw std::runtime_error("Interset manifest verification failed");
- if (interest_seg > rtc::MIN_PROBE_SEQ) {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << interest_seg;
- sendNack(interest_seg);
- return;
- }
-
- // if the production rate 0 use delayed nacks
- if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
- uint64_t next_timer = ~0;
- if (!timers_map_.empty()) {
- next_timer = timers_map_.begin()->first;
- }
+ uint32_t *suffix = interest.firstSuffix();
+ uint32_t n_suffixes_in_manifest = interest.numberOfSuffixes();
+ hicn_uword *request_bitmap = interest.getRequestBitmap();
- uint64_t expiration = now + rtc::SENTINEL_TIMER_INTERVAL;
- addToInterestQueue(interest_seg, expiration);
-
- // here we have at least one interest in the queue, we need to start or
- // update the timer
- if (!queue_timer_on_) {
- // set timeout
- queue_timer_on_ = true;
- scheduleQueueTimer(timers_map_.begin()->first - now);
- } else {
- // re-schedule the timer because a new interest will expires sooner
- if (next_timer > timers_map_.begin()->first) {
- interests_queue_timer_->cancel();
- scheduleQueueTimer(timers_map_.begin()->first - now);
- }
- }
- return;
- }
+ Name name = interest.getName();
+ uint32_t pos = 0; // Position of current suffix in manifest
- if (queue_timer_on_) {
- // the producer is producing. Send nacks to packets that will expire before
- // the data production and remove the timer
- queue_timer_on_ = false;
- interests_queue_timer_->cancel();
- sendNacksForPendingInterests();
- }
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Received interest " << name << " (" << n_suffixes_in_manifest
+ << " suffixes in manifest)";
+
+ // Process the suffix in the interest header
+ // (first loop iteration), then suffixes in the manifest
+ do {
+ if (!interest.hasManifest() ||
+ bitmap_is_set_no_check(request_bitmap, pos)) {
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(name);
+
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(
+ *socket_->getInterface(), interest);
+ }
- uint32_t max_gap = (uint32_t)floor(
- (double)((double)((double)lifetime *
- rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
- rtc::MILLI_IN_A_SEC) *
- (double)(packets_production_rate_ +
- fec_packets_production_rate_)));
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
- if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
- sendNack(interest_seg);
- } else {
- if (!consumer_in_sync_ && on_consumer_in_sync_) {
- // we consider the remote consumer to be in sync as soon as it covers 70%
- // of the production window with interests
- uint32_t perc = ceil((double)max_gap * 0.7);
- if (interest_seg > (perc + current_seg_)) {
- consumer_in_sync_ = true;
- // on_consumer_in_sync_(*socket_->getInterface(), interest);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content %u (onInterest) " << content_object->getName();
+ content_object->setPathLabel(cache_label_);
+ sendContentObject(content_object);
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ processInterest(name.getSuffix(), interest.getLifetime());
}
}
- uint64_t expiration =
- now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR);
- addToInterestQueue(interest_seg, expiration);
- }
-}
-void RTCProductionProtocol::onError(std::error_code ec) {}
+ // Retrieve next suffix in the manifest
+ if (interest.hasManifest()) {
+ uint32_t seq = *suffix;
+ suffix++;
-void RTCProductionProtocol::scheduleQueueTimer(uint64_t wait) {
- interests_queue_timer_->expires_from_now(std::chrono::milliseconds(wait));
- interests_queue_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- interestQueueTimer();
- });
+ name.setSuffix(seq);
+ interest.setName(name);
+ }
+ } while (pos++ < n_suffixes_in_manifest);
}
-void RTCProductionProtocol::addToInterestQueue(uint32_t interest_seg,
- uint64_t expiration) {
- // check if the seq number exists already
- auto it_seqs = seqs_map_.find(interest_seg);
- if (it_seqs != seqs_map_.end()) {
- // the seq already exists
- if (expiration < it_seqs->second) {
- // we need to update the timer becasue we got a smaller one
- // 1) remove the entry from the multimap
- // 2) update this entry
- auto range = timers_map_.equal_range(it_seqs->second);
- for (auto it_timers = range.first; it_timers != range.second;
- it_timers++) {
- if (it_timers->second == it_seqs->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- timers_map_.insert(
- std::pair<uint64_t, uint32_t>(expiration, interest_seg));
- it_seqs->second = expiration;
- } else {
- // nothing to do here
+void RTCProductionProtocol::processInterest(uint32_t interest_seg,
+ uint32_t lifetime) {
+ switch (rtc::ProbeHandler::getProbeType(interest_seg)) {
+ case rtc::ProbeType::INIT:
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received init probe " << interest_seg;
+ sendManifestProbe(interest_seg);
return;
- }
- } else {
- // add the new seq
- timers_map_.insert(std::pair<uint64_t, uint32_t>(expiration, interest_seg));
- seqs_map_.insert(std::pair<uint32_t, uint64_t>(interest_seg, expiration));
+ case rtc::ProbeType::RTT:
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received RTT probe " << interest_seg;
+ sendNack(interest_seg);
+ return;
+ default:
+ break;
}
-}
-void RTCProductionProtocol::sendNacksForPendingInterests() {
- std::unordered_set<uint32_t> to_remove;
+ if (interest_seg < current_seg_) sendNack(interest_seg);
+}
- uint32_t packet_gap = 100000; // set it to a high value (100sec)
- if (packets_production_rate_ != 0)
- packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+void RTCProductionProtocol::sendManifestProbe(uint32_t sequence) {
+ Name manifest_name(flow_name_);
+ manifest_name.setSuffix(sequence);
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::shared_ptr<core::ContentObjectManifest> manifest_probe =
+ createManifest(manifest_name);
+ auto manifest_probe_co =
+ std::dynamic_pointer_cast<ContentObject>(manifest_probe->getPacket());
- for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
- if (it->first > current_seg_) {
- uint64_t production_time =
- ((it->first - current_seg_) * packet_gap) + now;
- if (production_time >= it->second) {
- sendNack(it->first);
- to_remove.insert(it->first);
- }
- }
- }
+ manifest_probe_co->setLifetime(0);
+ manifest_probe_co->setPathLabel(prod_label_);
+ manifest_probe->encode();
- // delete nacked interests
- for (auto it = to_remove.begin(); it != to_remove.end(); it++) {
- removeFromInterestQueue(*it);
- }
-}
-
-void RTCProductionProtocol::removeFromInterestQueue(uint32_t interest_seg) {
- auto seq_it = seqs_map_.find(interest_seg);
- if (seq_it != seqs_map_.end()) {
- auto range = timers_map_.equal_range(seq_it->second);
- for (auto it_timers = range.first; it_timers != range.second; it_timers++) {
- if (it_timers->second == seq_it->first) {
- timers_map_.erase(it_timers);
- break;
- }
- }
- seqs_map_.erase(seq_it);
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *manifest_probe_co);
}
-}
-void RTCProductionProtocol::interestQueueTimer() {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
- uint64_t expire = it_timers->first;
- if (expire <= now) {
- uint32_t seq = it_timers->second;
- sendNack(seq);
- // remove the interest from the other map
- seqs_map_.erase(seq);
- it_timers = timers_map_.erase(it_timers);
- } else {
- // stop, we are done!
- break;
- }
- }
- if (timers_map_.empty()) {
- queue_timer_on_ = false;
- } else {
- queue_timer_on_ = true;
- scheduleQueueTimer(timers_map_.begin()->first - now);
- }
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send init probe " << sequence;
+ sendContentObject(manifest_probe_co, true, false);
}
void RTCProductionProtocol::sendNack(uint32_t sequence) {
auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>(
- signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP,
- signer_ ? signer_->getSignatureFieldSize() : 0);
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ nack_header_format_.first, nack_header_format_.second);
+ uint64_t now = utils::SteadyTime::nowMs().count();
uint32_t next_packet = current_seg_;
uint32_t prod_rate = bytes_production_rate_;
struct rtc::nack_packet_t header;
header.setTimestamp(now);
header.setProductionRate(prod_rate);
- header.setProductionSegement(next_packet);
+ header.setProductionSegment(next_packet);
nack->appendPayload((const uint8_t *)&header, rtc::NACK_HEADER_SIZE);
Name n(flow_name_);
@@ -533,33 +682,42 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) {
nack->setLifetime(0);
nack->setPathLabel(prod_label_);
- if (signer_) {
- signer_->signPacket(nack.get());
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(), *nack);
}
- if (!consumer_in_sync_ && on_consumer_in_sync_ &&
- sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) {
- consumer_in_sync_ = true;
- auto interest = core::PacketManager<>::getInstance().getPacket<Interest>();
- interest->setName(n);
- on_consumer_in_sync_(*socket_->getInterface(), *interest);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence;
+ sendContentObject(nack, true, false);
+}
+
+void RTCProductionProtocol::sendContentObject(
+ std::shared_ptr<ContentObject> content_object, bool nack, bool fec) {
+ bool is_ah = HICN_PACKET_FORMAT_IS_AH(content_object->getFormat());
+
+ // Compute signature
+ if (is_ah) {
+ signer_->signPacket(content_object.get());
}
- if (*on_content_object_output_) {
- on_content_object_output_->operator()(*socket_->getInterface(), *nack);
+ // Compute and save data packet digest
+ if (manifest_max_capacity_ && !is_ah) {
+ auth::CryptoHashType hash_algo;
+ socket_->getSocketOption(interface::GeneralTransportOptions::HASH_ALGORITHM,
+ hash_algo);
+ manifest_entries_.push({content_object->getName().getSuffix(),
+ content_object->computeDigest(hash_algo)});
}
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence;
- portal_->sendContentObject(*nack);
+ portal_->sendContentObject(*content_object);
}
-void RTCProductionProtocol::onFecPackets(
- std::vector<std::pair<uint32_t, fec::buffer>> &packets) {
+void RTCProductionProtocol::onFecPackets(fec::BufferArray &packets) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Produced " << packets.size() << " FEC packets";
+
for (auto &packet : packets) {
auto content_object =
- std::static_pointer_cast<ContentObject>(packet.second);
+ std::static_pointer_cast<ContentObject>(packet.getBuffer());
content_object->prepend(content_object->headerSize() +
rtc::DATA_HEADER_SIZE);
pending_fec_packets_.push(std::move(content_object));
@@ -569,19 +727,21 @@ void RTCProductionProtocol::onFecPackets(
fec::buffer RTCProductionProtocol::getBuffer(std::size_t size) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Asked buffer for FEC symbol of size " << size;
+
auto ret = core::PacketManager<>::getInstance().getPacket<ContentObject>(
- signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP,
- signer_ ? signer_->getSignatureFieldSize() : 0);
+ fec_header_format_.first, fec_header_format_.second);
+
ret->updateLength(rtc::DATA_HEADER_SIZE + size);
ret->append(rtc::DATA_HEADER_SIZE + size);
ret->trimStart(ret->headerSize() + rtc::DATA_HEADER_SIZE);
+
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Responding with buffer of length " << ret->length();
- assert(ret->length() >= size);
+ DCHECK(ret->length() >= size);
return ret;
}
} // namespace protocol
-} // end namespace transport
+} // namespace transport