From 08233d44a6cfde878d7e10bca38ae935ed1c8fd5 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 30 Jun 2021 07:57:22 +0000 Subject: [HICN-713] Transport Library Major Refactoring 2 Co-authored-by: Luca Muscariello Co-authored-by: Michele Papalini Co-authored-by: Olivier Roques Co-authored-by: Giulio Grassi Signed-off-by: Mauro Sardara Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2 --- libtransport/src/protocols/prod_protocol_rtc.cc | 230 +++++++++++++++++------- 1 file changed, 168 insertions(+), 62 deletions(-) (limited to 'libtransport/src/protocols/prod_protocol_rtc.cc') diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index 049752876..cdc882d81 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -25,15 +25,19 @@ namespace transport { namespace protocol { +using Format = core::Packet::Format; + RTCProductionProtocol::RTCProductionProtocol( implementation::ProducerSocket *icn_socket) : ProductionProtocol(icn_socket), current_seg_(1), produced_bytes_(0), produced_packets_(0), + produced_fec_packets_(0), max_packet_production_(1), bytes_production_rate_(0), packets_production_rate_(0), + fec_packets_production_rate_(0), last_round_(std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count()), @@ -43,11 +47,17 @@ RTCProductionProtocol::RTCProductionProtocol( on_consumer_in_sync_(nullptr) { srand((unsigned int)time(NULL)); prod_label_ = rand() % 256; + cache_label_ = (prod_label_ + 1) % 256; interests_queue_timer_ = std::make_unique(portal_->getIoService()); round_timer_ = std::make_unique(portal_->getIoService()); setOutputBufferSize(10000); scheduleRoundTimer(); + + // FEC + using namespace std::placeholders; + enableFEC(std::bind(&RTCProductionProtocol::onFecPackets, this, _1), + std::bind(&RTCProductionProtocol::getBuffer, this, _1)); } RTCProductionProtocol::~RTCProductionProtocol() {} @@ -61,10 +71,19 @@ void RTCProductionProtocol::registerNamespaceWithNetwork( switch (family) { case AF_INET6: - header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); + data_header_size_ = + signer_ && !making_manifest_ + ? (uint32_t)Packet::getHeaderSizeFromFormat( + HF_INET6_TCP_AH, signer_->getSignatureFieldSize()) + : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET6_TCP); + ; break; case AF_INET: - header_size_ = (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); + data_header_size_ = + signer_ && !making_manifest_ + ? (uint32_t)Packet::getHeaderSizeFromFormat( + HF_INET_TCP_AH, signer_->getSignatureFieldSize()) + : (uint32_t)Packet::getHeaderSizeFromFormat(HF_INET_TCP); break; default: throw errors::RuntimeException("Unknown name format."); @@ -90,16 +109,19 @@ void RTCProductionProtocol::updateStats() { uint32_t prev_packets_production_rate = packets_production_rate_; - bytes_production_rate_ = (uint32_t)ceil((double)produced_bytes_ * per_second); - packets_production_rate_ = (uint32_t)ceil((double)produced_packets_ * per_second); + bytes_production_rate_ = ceil((double)produced_bytes_ * per_second); + packets_production_rate_ = ceil((double)produced_packets_ * per_second); + fec_packets_production_rate_ = + ceil((double)produced_fec_packets_ * per_second); - TRANSPORT_LOGD("Updating production rate: produced_bytes_ = %u bps = %u", - produced_bytes_, bytes_production_rate_); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Updating production rate: produced_bytes_ = " << produced_bytes_ + << " bps = " << bytes_production_rate_; // update the production rate as soon as it increases by 10% with respect to // the last round max_packet_production_ = - produced_packets_ + (uint32_t)ceil((double)produced_packets_ * 0.1); + produced_packets_ + ceil((double)produced_packets_ * 0.1); if (max_packet_production_ < rtc::WIN_MIN) max_packet_production_ = rtc::WIN_MIN; @@ -117,6 +139,7 @@ void RTCProductionProtocol::updateStats() { produced_bytes_ = 0; produced_packets_ = 0; + produced_fec_packets_ = 0; last_round_ = now; scheduleRoundTimer(); } @@ -147,32 +170,34 @@ uint32_t RTCProductionProtocol::produceDatagram( socket_->getSocketOption(interface::GeneralTransportOptions::DATA_PACKET_SIZE, data_packet_size); - if (TRANSPORT_EXPECT_FALSE((buffer_size + header_size_ + + if (TRANSPORT_EXPECT_FALSE((buffer_size + data_header_size_ + rtc::DATA_HEADER_SIZE) > data_packet_size)) { return 0; } auto content_object = - core::PacketManager<>::getInstance().getPacket(); + core::PacketManager<>::getInstance().getPacket( + signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, + signer_ ? signer_->getSignatureFieldSize() : 0); // add rtc header to the payload struct rtc::data_packet_t header; content_object->appendPayload((const uint8_t *)&header, rtc::DATA_HEADER_SIZE); content_object->appendPayload(buffer->data(), buffer->length()); - std::shared_ptr co = std::move(content_object); - // schedule actual sending on internal thread - portal_->getIoService().dispatch( - [this, content_object{std::move(co)}, content_name]() mutable { - produceInternal(std::move(content_object), content_name); - }); + portal_->getIoService().dispatch([this, + content_object{std::move(content_object)}, + content_name]() mutable { + produceInternal(std::move(content_object), content_name); + }); return 1; } void RTCProductionProtocol::produceInternal( - std::shared_ptr &&content_object, const Name &content_name) { + std::shared_ptr &&content_object, const Name &content_name, + bool fec) { // set rtc header struct rtc::data_packet_t *data_pkt = (struct rtc::data_packet_t *)content_object->getPayload()->data(); @@ -188,10 +213,19 @@ void RTCProductionProtocol::produceInternal( content_object->setLifetime(500); // XXX this should be set by the APP content_object->setPathLabel(prod_label_); + // sign packet + if (signer_) { + signer_->signPacket(content_object.get()); + } + // update stats - produced_bytes_ += (uint32_t)( - content_object->headerSize() + content_object->payloadSize()); - produced_packets_++; + if (!fec) { + produced_bytes_ += + content_object->headerSize() + content_object->payloadSize(); + produced_packets_++; + } else { + produced_fec_packets_++; + } if (produced_packets_ >= max_packet_production_) { // in this case all the pending interests may be used to accomodate the @@ -201,7 +235,14 @@ void RTCProductionProtocol::produceInternal( updateStats(); } - TRANSPORT_LOGD("Sending content object: %s", n.toString().c_str()); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending content object: " << n << ", is fec: " << fec; + + // pass packet to FEC encoder + if (fec_encoder_ && !fec) { + fec_encoder_->onPacketProduced( + *content_object, content_object->headerSize() + rtc::DATA_HEADER_SIZE); + } output_buffer_.insert(content_object); @@ -210,7 +251,10 @@ void RTCProductionProtocol::produceInternal( *content_object); } - portal_->sendContentObject(*content_object); + auto seq_it = seqs_map_.find(current_seg_); + if (seq_it != seqs_map_.end()) { + portal_->sendContentObject(*content_object); + } if (*on_content_object_output_) { on_content_object_output_->operator()(*socket_->getInterface(), @@ -220,58 +264,84 @@ void RTCProductionProtocol::produceInternal( // remove interests from the interest cache if it exists removeFromInterestQueue(current_seg_); + // Update current segment current_seg_ = (current_seg_ + 1) % rtc::MIN_PROBE_SEQ; + + // Publish FEC packets if available + if (fec_encoder_ && !fec) { + while (!fec && pending_fec_packets_.size()) { + auto &co = pending_fec_packets_.front(); + produceInternal(std::move(co), flow_name_, true); + pending_fec_packets_.pop(); + } + } } void RTCProductionProtocol::onInterest(Interest &interest) { - uint32_t interest_seg = interest.getName().getSuffix(); - uint32_t lifetime = interest.getLifetime(); + if (*on_interest_input_) { + on_interest_input_->operator()(*socket_->getInterface(), interest); + } + + auto suffix = interest.firstSuffix(); + // numberOfSuffixes returns only the prefixes in the payalod + // we add + 1 to count anche the seq in the name + auto n_suffixes = interest.numberOfSuffixes() + 1; + Name name = interest.getName(); + bool prev_consumer_state = consumer_in_sync_; + + for (uint32_t i = 0; i < n_suffixes; i++) { + if (i > 0) { + name.setSuffix(*(suffix + (i - 1))); + } + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received interest " << name; + const std::shared_ptr content_object = + output_buffer_.find(name); + + if (content_object) { + if (*on_interest_satisfied_output_buffer_) { + on_interest_satisfied_output_buffer_->operator()( + *socket_->getInterface(), interest); + } + + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), + *content_object); + } + + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Send content %u (onInterest) " << content_object->getName(); + content_object->setPathLabel(cache_label_); + portal_->sendContentObject(*content_object); + } else { + if (*on_interest_process_) { + on_interest_process_->operator()(*socket_->getInterface(), interest); + } + processInterest(name.getSuffix(), interest.getLifetime()); + } + } + + if (prev_consumer_state != consumer_in_sync_ && consumer_in_sync_) + on_consumer_in_sync_(*socket_->getInterface(), interest); +} + +void RTCProductionProtocol::processInterest(uint32_t interest_seg, + uint32_t lifetime) { if (interest_seg == 0) { // first packet from the consumer, reset sync state consumer_in_sync_ = false; } - if (*on_interest_input_) { - on_interest_input_->operator()(*socket_->getInterface(), interest); - } - uint64_t now = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); if (interest_seg > rtc::MIN_PROBE_SEQ) { - TRANSPORT_LOGD("received probe %u", interest_seg); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << interest_seg; sendNack(interest_seg); return; } - TRANSPORT_LOGD("received interest %u", interest_seg); - - const std::shared_ptr content_object = - output_buffer_.find(interest); - - if (content_object) { - if (*on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_->operator()(*socket_->getInterface(), - interest); - } - - if (*on_content_object_output_) { - on_content_object_output_->operator()(*socket_->getInterface(), - *content_object); - } - - TRANSPORT_LOGD("Send content %u (onInterest)", - content_object->getName().getSuffix()); - portal_->sendContentObject(*content_object); - return; - } else { - if (*on_interest_process_) { - on_interest_process_->operator()(*socket_->getInterface(), interest); - } - } - // if the production rate 0 use delayed nacks if (allow_delayed_nacks_ && interest_seg >= current_seg_) { uint64_t next_timer = ~0; @@ -310,7 +380,8 @@ void RTCProductionProtocol::onInterest(Interest &interest) { (double)((double)((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR / rtc::MILLI_IN_A_SEC) * - (double)packets_production_rate_)); + (double)(packets_production_rate_ + + fec_packets_production_rate_))); if (interest_seg < current_seg_ || interest_seg > (max_gap + current_seg_)) { sendNack(interest_seg); @@ -318,14 +389,14 @@ void RTCProductionProtocol::onInterest(Interest &interest) { if (!consumer_in_sync_ && on_consumer_in_sync_) { // we consider the remote consumer to be in sync as soon as it covers 70% // of the production window with interests - uint32_t perc = (uint32_t)ceil((double)max_gap * 0.7); + uint32_t perc = ceil((double)max_gap * 0.7); if (interest_seg > (perc + current_seg_)) { consumer_in_sync_ = true; - on_consumer_in_sync_(*socket_->getInterface(), interest); + // on_consumer_in_sync_(*socket_->getInterface(), interest); } } - uint64_t expiration =(uint32_t)( - now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR)); + uint64_t expiration = + now + floor((double)lifetime * rtc::INTEREST_LIFETIME_REDUCTION_FACTOR); addToInterestQueue(interest_seg, expiration); } } @@ -377,7 +448,7 @@ void RTCProductionProtocol::sendNacksForPendingInterests() { uint32_t packet_gap = 100000; // set it to a high value (100sec) if (packets_production_rate_ != 0) - packet_gap = (uint32_t)ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); + packet_gap = ceil(rtc::MILLI_IN_A_SEC / (double)packets_production_rate_); uint64_t now = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) @@ -441,7 +512,9 @@ void RTCProductionProtocol::interestQueueTimer() { } void RTCProductionProtocol::sendNack(uint32_t sequence) { - auto nack = core::PacketManager<>::getInstance().getPacket(); + auto nack = core::PacketManager<>::getInstance().getPacket( + signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, + signer_ ? signer_->getSignatureFieldSize() : 0); uint64_t now = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); @@ -460,6 +533,10 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { nack->setLifetime(0); nack->setPathLabel(prod_label_); + if (signer_) { + signer_->signPacket(nack.get()); + } + if (!consumer_in_sync_ && on_consumer_in_sync_ && sequence < rtc::MIN_PROBE_SEQ && sequence > next_packet) { consumer_in_sync_ = true; @@ -472,10 +549,39 @@ void RTCProductionProtocol::sendNack(uint32_t sequence) { on_content_object_output_->operator()(*socket_->getInterface(), *nack); } - TRANSPORT_LOGD("Send nack %u", sequence); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Send nack " << sequence; portal_->sendContentObject(*nack); } +void RTCProductionProtocol::onFecPackets( + std::vector> &packets) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Produced " << packets.size() << " FEC packets"; + for (auto &packet : packets) { + auto content_object = + std::static_pointer_cast(packet.second); + content_object->prepend(content_object->headerSize() + + rtc::DATA_HEADER_SIZE); + pending_fec_packets_.push(std::move(content_object)); + } +} + +fec::buffer RTCProductionProtocol::getBuffer(std::size_t size) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Asked buffer for FEC symbol of size " << size; + auto ret = core::PacketManager<>::getInstance().getPacket( + signer_ ? Format::HF_INET6_TCP_AH : Format::HF_INET6_TCP, + signer_ ? signer_->getSignatureFieldSize() : 0); + ret->updateLength(rtc::DATA_HEADER_SIZE + size); + ret->append(rtc::DATA_HEADER_SIZE + size); + ret->trimStart(ret->headerSize() + rtc::DATA_HEADER_SIZE); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Responding with buffer of length " << ret->length(); + assert(ret->length() >= size); + + return ret; +} + } // namespace protocol } // end namespace transport -- cgit 1.2.3-korg