aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/prod_protocol_rtc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc')
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc230
1 files changed, 168 insertions, 62 deletions
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index 049752876..cdc882d81 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -25,15 +25,19 @@
namespace transport {
namespace protocol {
+using Format = core::Packet::Format;
+
RTCProductionProtocol::RTCProductionProtocol(
implementation::ProducerSocket *icn_socket)
: ProductionProtocol(icn_socket),
current_seg_(1),
produced_bytes_(0),
produced_packets_(0),
+ produced_fec_packets_(0),
max_packet_production_(1),
bytes_production_rate_(0),
packets_production_rate_(0),
+ fec_packets_production_rate_(0),
last_round_(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count()),
@@ -43,11 +47,17 @@ RTCProductionProtocol::RTCProductionProtocol(
on_consumer_in_sync_(nullptr) {
srand((unsigned int)time(NULL));
prod_label_ = rand() % 256;
+ cache_label_ = (prod_label_ + 1) % 256;
interests_queue_timer_ =
std::make_unique<asio::steady_timer>(portal_->getIoService());
round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
setOutputBufferSize(10000);
scheduleRoundTimer();
+
+ // FEC
+ using namespace std::placeholders;
+ enableFEC(std::bind(&RTCProductionProtocol::onFecPackets, this, _1),
+ std::bind(&RTCProductionProtocol::getBuffer, this, _1));
}
RTCProductionProtocol::~RTCProductionProtocol() {}
@@ -61,10 +71,19 @@ void RTCProductionProtocol::registerNamespaceWithNetwork(
switch (family) {
case AF_INET6:
- header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
+ data_header_size_ =
+ signer_ && !making_manifest_
+ ? (uint32_t)Packet::getHeaderSizeFromFormat(
+ HF_INET6_TCP_AH, signer_->getSignatureFieldSize())
+ : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP);
+ ;
break;
case AF_INET:
- header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
+ data_header_size_ =
+ signer_ && !making_manifest_
+ ? (uint32_t)Packet::getHeaderSizeFromFormat(
+ HF_INET_TCP_AH, signer_->getSignatureFieldSize())
+ : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP);
break;
default:
throw errors::RuntimeException("Unknown name format.");
@@ -90,16 +109,19 @@ void RTCProductionProtocol::updateStats() {
uint32_t prev_packets_production_rate = packets_production_rate_;
- bytes_production_rate_ = (uint32_t)ceil((double)produced_bytes_ * per_second);
- packets_production_rate_ = (uint32_t)ceil((double)produced_packets_ * per_second);
+ bytes_production_rate_ = ceil((double)produced_bytes_ * per_second);
+ packets_production_rate_ = ceil((double)produced_packets_ * per_second);
+ fec_packets_production_rate_ =
+ ceil((double)produced_fec_packets_ * per_second);
- TRANSPORT_LOGD("Updating production rate: produced_bytes_ = %u bps = %u",
- produced_bytes_, bytes_production_rate_);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Updating production rate: produced_bytes_ = " << produced_bytes_
+ << " bps = " << bytes_production_rate_;
// update the production rate as soon as it increases by 10% with respect to
// the last round
max_packet_production_ =
- produced_packets_ + (uint32_t)ceil((double)produced_packets_ * 0.1);
+ produced_packets_ + ceil((double)produced_packets_ * 0.1);
if (max_packet_production_ < rtc::WIN_MIN)
max_packet_production_ = rtc::WIN_MIN;
@@ -117,6 +139,7 @@ void RTCProductionProtocol::updateStats() {
produced_bytes_ = 0;
produced_packets_ = 0;
+ produced_fec_packets_ = 0;
last_round_ = now;
scheduleRoundTimer();
}
@@ -147,32 +170,34 @@ uint32_t RTCProductionProtocol::produceDatagram(
socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE,
data_packet_size);
- if (TRANSPORT_EXPECT_FALSE((buffer_size + header_size_ +
+ if (TRANSPORT_EXPECT_FALSE((buffer_size + data_header_size_ +
rtc::DATA_HEADER_SIZE) > data_packet_size)) {
return 0;
}
auto content_object =
- core::PacketManager<>::getInstance().getPacket<ContentObject>();
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP,
+ signer_ ? signer_->getSignatureFieldSize() : 0);
// add rtc header to the payload
struct rtc::data_packet_t header;
content_object->appendPayload((const uint8_t *)&header,
rtc::DATA_HEADER_SIZE);
content_object->appendPayload(buffer->data(), buffer->length());
- std::shared_ptr<ContentObject> co = std::move(content_object);
-
// schedule actual sending on internal thread
- portal_->getIoService().dispatch(
- [this, content_object{std::move(co)}, content_name]() mutable {
- produceInternal(std::move(content_object), content_name);
- });
+ portal_->getIoService().dispatch([this,
+ content_object{std::move(content_object)},
+ content_name]() mutable {
+ produceInternal(std::move(content_object), content_name);
+ });
return 1;
}
void RTCProductionProtocol::produceInternal(
- std::shared_ptr<ContentObject> &&content_object, const Name &content_name) {
+ std::shared_ptr<ContentObject> &&content_object, const Name &content_name,
+ bool fec) {
// set rtc header
struct rtc::data_packet_t *data_pkt =
(struct rtc::data_packet_t *)content_object->getPayload()->data();
@@ -188,10 +213,19 @@ void RTCProductionProtocol::produceInternal(
content_object->setLifetime(500); // XXX this should be set by the APP
content_object->setPathLabel(prod_label_);
+ // sign packet
+ if (signer_) {
+ signer_->signPacket(content_object.get());
+ }
+
// update stats
- produced_bytes_ += (uint32_t)(
- content_object->headerSize() + content_object->payloadSize());
- produced_packets_++;
+ if (!fec) {
+ produced_bytes_ +=
+ content_object->headerSize() + content_object->payloadSize();
+ produced_packets_++;
+ } else {
+ produced_fec_packets_++;
+ }
if (produced_packets_ >= max_packet_production_) {
// in this case all the pending interests may be used to accomodate the
@@ -201,7 +235,14 @@ void RTCProductionProtocol::produceInternal(
updateStats();
}
- TRANSPORT_LOGD("Sending content object: %s", n.toString().c_str());
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending content object: " << n << ", is fec: " << fec;
+
+ // pass packet to FEC encoder
+ if (fec_encoder_ && !fec) {
+ fec_encoder_->onPacketProduced(
+ *content_object, content_object->headerSize() + rtc::DATA_HEADER_SIZE);
+ }
output_buffer_.insert(content_object);
@@ -210,7 +251,10 @@ void RTCProductionProtocol::produceInternal(
*content_object);
}
- portal_->sendContentObject(*content_object);
+ auto seq_it = seqs_map_.find(current_seg_);
+ if (seq_it != seqs_map_.end()) {
+ portal_->sendContentObject(*content_object);
+ }
if (*on_content_object_output_) {
on_content_object_output_->operator()(*socket_->getInterface(),
@@ -220,58 +264,84 @@ void RTCProductionProtocol::produceInternal(
// remove interests from the interest cache if it exists
removeFromInterestQueue(current_seg_);
+ // Update current segment
current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ;
+
+ // Publish FEC packets if available
+ if (fec_encoder_ && !fec) {
+ while (!fec && pending_fec_packets_.size()) {
+ auto &co = pending_fec_packets_.front();
+ produceInternal(std::move(co), flow_name_, true);
+ pending_fec_packets_.pop();
+ }
+ }
}
void RTCProductionProtocol::onInterest(Interest &interest) {
- uint32_t interest_seg = interest.getName().getSuffix();
- uint32_t lifetime = interest.getLifetime();
+ if (*on_interest_input_) {
+ on_interest_input_->operator()(*socket_->getInterface(), interest);
+ }
+
+ auto suffix = interest.firstSuffix();
+ // numberOfSuffixes returns only the prefixes in the payalod
+ // we add + 1 to count anche the seq in the name
+ auto n_suffixes = interest.numberOfSuffixes() + 1;
+ Name name = interest.getName();
+ bool prev_consumer_state = consumer_in_sync_;
+
+ for (uint32_t i = 0; i < n_suffixes; i++) {
+ if (i > 0) {
+ name.setSuffix(*(suffix + (i - 1)));
+ }
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name;
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(name);
+
+ if (content_object) {
+ if (*on_interest_satisfied_output_buffer_) {
+ on_interest_satisfied_output_buffer_->operator()(
+ *socket_->getInterface(), interest);
+ }
+
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(),
+ *content_object);
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Send content %u (onInterest) " << content_object->getName();
+ content_object->setPathLabel(cache_label_);
+ portal_->sendContentObject(*content_object);
+ } else {
+ if (*on_interest_process_) {
+ on_interest_process_->operator()(*socket_->getInterface(), interest);
+ }
+ processInterest(name.getSuffix(), interest.getLifetime());
+ }
+ }
+
+ if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_)
+ on_consumer_in_sync_(*socket_->getInterface(), interest);
+}
+
+void RTCProductionProtocol::processInterest(uint32_t interest_seg,
+ uint32_t lifetime) {
if (interest_seg == 0) {
// first packet from the consumer, reset sync state
consumer_in_sync_ = false;
}
- if (*on_interest_input_) {
- on_interest_input_->operator()(*socket_->getInterface(), interest);
- }
-
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
if (interest_seg > rtc::MIN_PROBE_SEQ) {
- TRANSPORT_LOGD("received probe %u", interest_seg);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << interest_seg;
sendNack(interest_seg);
return;
}
- TRANSPORT_LOGD("received interest %u", interest_seg);
-
- const std::shared_ptr<ContentObject> content_object =
- output_buffer_.find(interest);
-
- if (content_object) {
- if (*on_interest_satisfied_output_buffer_) {
- on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(),
- interest);
- }
-
- if (*on_content_object_output_) {
- on_content_object_output_->operator()(*socket_->getInterface(),
- *content_object);
- }
-
- TRANSPORT_LOGD("Send content %u (onInterest)",
- content_object->getName().getSuffix());
- portal_->sendContentObject(*content_object);
- return;
- } else {
- if (*on_interest_process_) {
- on_interest_process_->operator()(*socket_->getInterface(), interest);
- }
- }
-
// if the production rate 0 use delayed nacks
if (allow_delayed_nacks_ && interest_seg >= current_seg_) {
uint64_t next_timer = ~0;
@@ -310,7 +380,8 @@ void RTCProductionProtocol::onInterest(Interest &interest) {
(double)((double)((double)lifetime *
rtc::INTEREST_LIFETIME_REDUCTION_FACTOR /
rtc::MILLI_IN_A_SEC) *
- (double)packets_production_rate_));
+ (double)(packets_production_rate_ +
+ fec_packets_production_rate_)));
if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) {
sendNack(interest_seg);
@@ -318,14 +389,14 @@ void RTCProductionProtocol::onInterest(Interest &interest) {
if (!consumer_in_sync_ && on_consumer_in_sync_) {
// we consider the remote consumer to be in sync as soon as it covers 70%
// of the production window with interests
- uint32_t perc = (uint32_t)ceil((double)max_gap * 0.7);
+ uint32_t perc = ceil((double)max_gap * 0.7);
if (interest_seg > (perc + current_seg_)) {
consumer_in_sync_ = true;
- on_consumer_in_sync_(*socket_->getInterface(), interest);
+ // on_consumer_in_sync_(*socket_->getInterface(), interest);
}
}
- uint64_t expiration =(uint32_t)(
- now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR));
+ uint64_t expiration =
+ now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR);
addToInterestQueue(interest_seg, expiration);
}
}
@@ -377,7 +448,7 @@ void RTCProductionProtocol::sendNacksForPendingInterests() {
uint32_t packet_gap = 100000; // set it to a high value (100sec)
if (packets_production_rate_ != 0)
- packet_gap = (uint32_t)ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
+ packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_);
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
@@ -441,7 +512,9 @@ void RTCProductionProtocol::interestQueueTimer() {
}
void RTCProductionProtocol::sendNack(uint32_t sequence) {
- auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>();
+ auto nack = core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP,
+ signer_ ? signer_->getSignatureFieldSize() : 0);
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
@@ -460,6 +533,10 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) {
nack->setLifetime(0);
nack->setPathLabel(prod_label_);
+ if (signer_) {
+ signer_->signPacket(nack.get());
+ }
+
if (!consumer_in_sync_ && on_consumer_in_sync_ &&
sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) {
consumer_in_sync_ = true;
@@ -472,10 +549,39 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) {
on_content_object_output_->operator()(*socket_->getInterface(), *nack);
}
- TRANSPORT_LOGD("Send nack %u", sequence);
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence;
portal_->sendContentObject(*nack);
}
+void RTCProductionProtocol::onFecPackets(
+ std::vector<std::pair<uint32_t, fec::buffer>> &packets) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Produced " << packets.size() << " FEC packets";
+ for (auto &packet : packets) {
+ auto content_object =
+ std::static_pointer_cast<ContentObject>(packet.second);
+ content_object->prepend(content_object->headerSize() +
+ rtc::DATA_HEADER_SIZE);
+ pending_fec_packets_.push(std::move(content_object));
+ }
+}
+
+fec::buffer RTCProductionProtocol::getBuffer(std::size_t size) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Asked buffer for FEC symbol of size " << size;
+ auto ret = core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP,
+ signer_ ? signer_->getSignatureFieldSize() : 0);
+ ret->updateLength(rtc::DATA_HEADER_SIZE + size);
+ ret->append(rtc::DATA_HEADER_SIZE + size);
+ ret->trimStart(ret->headerSize() + rtc::DATA_HEADER_SIZE);
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Responding with buffer of length " << ret->length();
+ assert(ret->length() >= size);
+
+ return ret;
+}
+
} // namespace protocol
} // end namespace transport