diff options
Diffstat (limited to 'libtransport/src/protocols/rtc/rtc.cc')
-rw-r--r-- | libtransport/src/protocols/rtc/rtc.cc | 466 |
1 files changed, 312 insertions, 154 deletions
diff --git a/libtransport/src/protocols/rtc/rtc.cc b/libtransport/src/protocols/rtc/rtc.cc index 46659ac74..0cb4cda1d 100644 --- a/libtransport/src/protocols/rtc/rtc.cc +++ b/libtransport/src/protocols/rtc/rtc.cc @@ -17,8 +17,11 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <implementation/socket_consumer.h> #include <math.h> +#include <protocols/errors.h> +#include <protocols/incremental_indexer_bytestream.h> #include <protocols/rtc/rtc.h> #include <protocols/rtc/rtc_consts.h> +#include <protocols/rtc/rtc_indexer.h> #include <protocols/rtc/rtc_rc_queue.h> #include <algorithm> @@ -33,39 +36,41 @@ using namespace interface; RTCTransportProtocol::RTCTransportProtocol( implementation::ConsumerSocket *icn_socket) - : TransportProtocol(icn_socket, nullptr), - DatagramReassembly(icn_socket, this), + : TransportProtocol(icn_socket, new RtcIndexer<>(icn_socket, this), + new DatagramReassembly(icn_socket, this)), number_(0) { icn_socket->getSocketOption(PORTAL, portal_); round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); scheduler_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + pacing_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); } RTCTransportProtocol::~RTCTransportProtocol() {} void RTCTransportProtocol::resume() { - if (is_running_) return; - - is_running_ = true; - newRound(); + TransportProtocol::resume(); +} - portal_->runEventsLoop(); - is_running_ = false; +std::size_t RTCTransportProtocol::transportHeaderLength() { + return DATA_HEADER_SIZE + + (fec_decoder_ != nullptr ? fec_decoder_->getFecHeaderSize() : 0); } // private void RTCTransportProtocol::initParams() { - portal_->setConsumerCallback(this); + TransportProtocol::reset(); rc_ = std::make_shared<RTCRateControlQueue>(); ldr_ = std::make_shared<RTCLossDetectionAndRecovery>( + indexer_verifier_.get(), std::bind(&RTCTransportProtocol::sendRtxInterest, this, std::placeholders::_1), portal_->getIoService()); state_ = std::make_shared<RTCState>( + indexer_verifier_.get(), std::bind(&RTCTransportProtocol::sendProbeInterest, this, std::placeholders::_1), std::bind(&RTCTransportProtocol::discoveredRtt, this), @@ -83,8 +88,27 @@ void RTCTransportProtocol::initParams() { // Cancel timer number_++; round_timer_->cancel(); + scheduler_timer_->cancel(); scheduler_timer_on_ = false; + last_interest_sent_time_ = 0; + last_interest_sent_seq_ = 0; + +#if 0 + if(portal_->isConnectedToFwd()){ + max_aggregated_interest_ = 1; + }else{ + max_aggregated_interest_ = MAX_INTERESTS_IN_BATCH; + } +#else + max_aggregated_interest_ = 1; +#endif + + max_sent_int_ = + std::ceil((double)MAX_PACING_BATCH / (double)max_aggregated_interest_); + + pacing_timer_->cancel(); + pacing_timer_on_ = false; // delete all timeouts and future nacks timeouts_or_nacks_.clear(); @@ -93,16 +117,28 @@ void RTCTransportProtocol::initParams() { current_sync_win_ = INITIAL_WIN; max_sync_win_ = INITIAL_WIN_MAX; - // names/packets var - next_segment_ = 0; - socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, RTC_INTEREST_LIFETIME); + + // FEC + using namespace std::placeholders; + enableFEC(std::bind(&RTCTransportProtocol::onFecPackets, this, _1), + /* We leave the buffer allocation to the fec decoder */ + fec::FECBase::BufferRequested(0)); + + if (fec_decoder_) { + indexer_verifier_->enableFec(fec_type_); + indexer_verifier_->setNFec(0); + ldr_->setFecParams(fec::FECUtils::getBlockSymbols(fec_type_), + fec::FECUtils::getSourceSymbols(fec_type_)); + } else { + indexer_verifier_->disableFec(); + } } // private void RTCTransportProtocol::reset() { - TRANSPORT_LOGD("reset called"); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "reset called"; initParams(); newRound(); } @@ -113,11 +149,13 @@ void RTCTransportProtocol::inactiveProducer() { current_sync_win_ = INITIAL_WIN; max_sync_win_ = INITIAL_WIN_MAX; - TRANSPORT_LOGD("Current window: %u, max_sync_win_: %u", current_sync_win_, - max_sync_win_); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Current window: " << current_sync_win_ + << ", max_sync_win_: " << max_sync_win_; // names/packets var - next_segment_ = 0; + indexer_verifier_->reset(); + indexer_verifier_->enableFec(fec_type_); + indexer_verifier_->setNFec(0); ldr_->clear(); } @@ -137,10 +175,13 @@ void RTCTransportProtocol::newRound() { uint32_t received_bytes = state_->getReceivedBytesInRound(); uint32_t sent_interest = state_->getSentInterestInRound(); uint32_t lost_data = state_->getLostData(); + uint32_t definitely_lost = state_->getDefinitelyLostPackets(); uint32_t recovered_losses = state_->getRecoveredLosses(); uint32_t received_nacks = state_->getReceivedNacksInRound(); + uint32_t received_fec = state_->getReceivedFecPackets(); bool in_sync = (current_state_ == SyncState::in_sync); + ldr_->onNewRound(in_sync); state_->onNewRound((double)ROUND_LEN, in_sync); rc_->onNewRound((double)ROUND_LEN); @@ -161,11 +202,13 @@ void RTCTransportProtocol::newRound() { } } - TRANSPORT_LOGD("Calling updateSyncWindow in newRound function"); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Calling updateSyncWindow in newRound function"; updateSyncWindow(); sendStatsToApp(sent_retx, received_bytes, sent_interest, lost_data, - recovered_losses, received_nacks); + definitely_lost, recovered_losses, received_nacks, + received_fec); newRound(); }); } @@ -173,6 +216,7 @@ void RTCTransportProtocol::newRound() { void RTCTransportProtocol::discoveredRtt() { start_send_interest_ = true; ldr_->turnOnRTX(); + ldr_->onNewRound(false); updateSyncWindow(); } @@ -182,22 +226,23 @@ void RTCTransportProtocol::computeMaxSyncWindow() { if (production_rate == 0.0 || packet_size == 0.0) { // the consumer has no info about the producer, // keep the previous maxCWin - TRANSPORT_LOGD( - "Returning in computeMaxSyncWindow because: prod_rate: %d || " - "packet_size: %d", - (int)(production_rate == 0.0), (int)(packet_size == 0.0)); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Returning in computeMaxSyncWindow because: prod_rate: " + << (production_rate == 0.0) + << " || packet_size: " << (packet_size == 0.0); return; } + production_rate += (production_rate * indexer_verifier_->getMaxFecOverhead()); + uint32_t lifetime = default_values::interest_lifetime; socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); double lifetime_ms = (double)lifetime / MILLI_IN_A_SEC; - - max_sync_win_ = - (uint32_t)ceil((production_rate * lifetime_ms * - INTEREST_LIFETIME_REDUCTION_FACTOR) / packet_size); + max_sync_win_ = (uint32_t)ceil( + (production_rate * lifetime_ms * INTEREST_LIFETIME_REDUCTION_FACTOR) / + packet_size); max_sync_win_ = std::min(max_sync_win_, rc_->getCongesionWindow()); } @@ -219,12 +264,25 @@ void RTCTransportProtocol::updateSyncWindow() { // if some of the info are not available do not update the current win if (prod_rate != 0.0 && rtt != 0.0 && packet_size != 0.0) { - current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); - current_sync_win_ += (uint32_t) - ceil(prod_rate * (PRODUCER_BUFFER_MS / MILLI_IN_A_SEC) / packet_size); + double fec_interest_overhead = (double)state_->getPendingFecPackets() / + (double)(state_->getPendingInterestNumber() - + state_->getPendingFecPackets()); + + double fec_overhead = + std::max(indexer_verifier_->getFecOverhead(), fec_interest_overhead); + + prod_rate += (prod_rate * fec_overhead); - if(current_state_ == SyncState::catch_up) { - current_sync_win_ = (uint32_t) (current_sync_win_ * CATCH_UP_WIN_INCREMENT); + current_sync_win_ = (uint32_t)ceil(prod_rate * rtt / packet_size); + uint32_t buffer = PRODUCER_BUFFER_MS; + if (rtt > 150) + buffer = buffer * 2; // if the RTT is too large we increase the + // the size of the buffer + current_sync_win_ += + ceil(prod_rate * (buffer / MILLI_IN_A_SEC) / packet_size); + + if (current_state_ == SyncState::catch_up) { + current_sync_win_ = current_sync_win_ * CATCH_UP_WIN_INCREMENT; } current_sync_win_ = std::min(current_sync_win_, max_sync_win_); @@ -243,70 +301,48 @@ void RTCTransportProtocol::decreaseSyncWindow() { scheduleNextInterests(); } -void RTCTransportProtocol::sendInterest(Name *interest_name) { - TRANSPORT_LOGD("Sending interest for name %s", - interest_name->toString().c_str()); - - auto interest = core::PacketManager<>::getInstance().getPacket<Interest>(); - interest->setName(*interest_name); - - uint32_t lifetime = default_values::interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - interest->setLifetime(uint32_t(lifetime)); - - if (*on_interest_output_) { - (*on_interest_output_)(*socket_->getInterface(), *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - portal_->sendInterest(std::move(interest)); -} - void RTCTransportProtocol::sendRtxInterest(uint32_t seq) { - if (!is_running_ && !is_first_) return; + if (!isRunning() && !is_first_) return; - if(!start_send_interest_) return; + if (!start_send_interest_) return; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - TRANSPORT_LOGD("send rtx %u", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send rtx " << seq; interest_name->setSuffix(seq); - sendInterest(interest_name); + sendInterest(*interest_name); } void RTCTransportProtocol::sendProbeInterest(uint32_t seq) { - if (!is_running_ && !is_first_) return; + if (!isRunning() && !is_first_) return; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - TRANSPORT_LOGD("send probe %u", seq); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send probe " << seq; interest_name->setSuffix(seq); - sendInterest(interest_name); + sendInterest(*interest_name); } void RTCTransportProtocol::scheduleNextInterests() { - TRANSPORT_LOGD("Schedule next interests"); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Schedule next interests"; - if (!is_running_ && !is_first_) return; + if (!isRunning() && !is_first_) return; - if(!start_send_interest_) return; // RTT discovering phase is not finished so - // do not start to send interests + if (pacing_timer_on_) return; // wait pacing timer for the next send - if (scheduler_timer_on_) return; // wait befor send other interests + if (!start_send_interest_) + return; // RTT discovering phase is not finished so + // do not start to send interests if (TRANSPORT_EXPECT_FALSE(!state_->isProducerActive())) { - TRANSPORT_LOGD("Inactive producer."); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Inactive producer."; // here we keep seding the same interest until the producer // does not start again - if (next_segment_ != 0) { + if (indexer_verifier_->checkNextSuffix() != 0) { // the producer just become inactive, reset the state inactiveProducer(); } @@ -315,125 +351,208 @@ void RTCTransportProtocol::scheduleNextInterests() { socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - TRANSPORT_LOGD("send interest %u", next_segment_); - interest_name->setSuffix(next_segment_); + uint32_t next_seg = 0; + DLOG_IF(INFO, VLOG_IS_ON(3)) << "send interest " << next_seg; + interest_name->setSuffix(next_seg); if (portal_->interestIsPending(*interest_name)) { // if interest 0 is already pending we return return; } - sendInterest(interest_name); + sendInterest(*interest_name); state_->onSendNewInterest(interest_name); return; } - TRANSPORT_LOGD("Pending interest number: %d -- current_sync_win_: %d", - state_->getPendingInterestNumber(), current_sync_win_); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Pending interest number: " << state_->getPendingInterestNumber() + << " -- current_sync_win_: " << current_sync_win_; + + uint32_t pending = state_->getPendingInterestNumber(); + if (pending >= current_sync_win_) return; // no space in the window + + if ((current_sync_win_ - pending) < max_aggregated_interest_) { + if (scheduler_timer_on_) return; // timer already scheduled + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + uint64_t time = now - last_interest_sent_time_; + if (time < WAIT_FOR_INTEREST_BATCH) { + uint64_t next = WAIT_FOR_INTEREST_BATCH - time; + scheduler_timer_on_ = true; + scheduler_timer_->expires_from_now(std::chrono::milliseconds(next)); + scheduler_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + if (!scheduler_timer_on_) return; + + scheduler_timer_on_ = false; + scheduleNextInterests(); + }); + return; // whait for the timer + } + } + + scheduler_timer_on_ = false; + scheduler_timer_->cancel(); // skip nacked pacekts - if (next_segment_ <= state_->getLastSeqNacked()) { - next_segment_ = state_->getLastSeqNacked() + 1; + if (indexer_verifier_->checkNextSuffix() <= state_->getLastSeqNacked()) { + indexer_verifier_->jumpToIndex(state_->getLastSeqNacked() + 1); } // skipe received packets - if (next_segment_ <= state_->getHighestSeqReceivedInOrder()) { - next_segment_ = state_->getHighestSeqReceivedInOrder() + 1; + if (indexer_verifier_->checkNextSuffix() <= + state_->getHighestSeqReceivedInOrder()) { + indexer_verifier_->jumpToIndex(state_->getHighestSeqReceivedInOrder() + 1); } uint32_t sent_interests = 0; + uint32_t sent_packets = 0; + uint32_t aggregated_counter = 0; + Name *name = nullptr; + Name interest_name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + std::array<uint32_t, MAX_AGGREGATED_INTEREST> additional_suffixes; + while ((state_->getPendingInterestNumber() < current_sync_win_) && - (sent_interests < MAX_INTERESTS_IN_BATCH)) { - TRANSPORT_LOGD("In while loop. Window size: %u", current_sync_win_); - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + (sent_interests < max_sent_int_)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "In while loop. Window size: " << current_sync_win_; + + uint32_t next_seg = indexer_verifier_->getNextSuffix(); - interest_name->setSuffix(next_segment_); + name->setSuffix(next_seg); // send the packet only if: // 1) it is not pending yet (not true for rtx) // 2) the packet is not received or lost // 3) is not in the rtx list - if (portal_->interestIsPending(*interest_name) || - state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN || - ldr_->isRtx(next_segment_)) { - TRANSPORT_LOGD( - "skip interest %u because: pending %u, recv %u, rtx %u", - next_segment_, (portal_->interestIsPending(*interest_name)), - (state_->isReceivedOrLost(next_segment_) != PacketState::UNKNOWN), - (ldr_->isRtx(next_segment_))); - next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + // 4) is fec and is not in order (!= last sent + 1) + if (portal_->interestIsPending(*name) || + state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN || + ldr_->isRtx(next_seg) || + (indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "skip interest " << next_seg << " because: pending " + << portal_->interestIsPending(*name) << ", recv " + << (state_->isReceivedOrLost(next_seg) != PacketState::UNKNOWN) + << ", rtx " << (ldr_->isRtx(next_seg)) << ", is old fec " + << ((indexer_verifier_->isFec(next_seg) && + next_seg != last_interest_sent_seq_ + 1)); continue; } + if (aggregated_counter == 0) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(name) send interest " << next_seg; + interest_name = *name; + } else { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "(append) send interest " << next_seg; + additional_suffixes[aggregated_counter - 1] = next_seg; + } - sent_interests++; - TRANSPORT_LOGD("send interest %u", next_segment_); - sendInterest(interest_name); - state_->onSendNewInterest(interest_name); + last_interest_sent_seq_ = next_seg; + state_->onSendNewInterest(name); + aggregated_counter++; + + if (aggregated_counter >= max_aggregated_interest_) { + sent_packets++; + sent_interests++; + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); + last_interest_sent_time_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + aggregated_counter = 0; + } + } - next_segment_ = (next_segment_ + 1) % MIN_PROBE_SEQ; + // exiting the while we may have some pending interest to send + if (aggregated_counter != 0) { + sent_packets++; + last_interest_sent_time_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + sendInterest(interest_name, &additional_suffixes, aggregated_counter - 1); } if (state_->getPendingInterestNumber() < current_sync_win_) { - // we still have space in the window but we already sent a batch of - // MAX_INTERESTS_IN_BATCH interest. for the following ones wait one - // WAIT_BETWEEN_INTEREST_BATCHES to avoid local packets drop + // we still have space in the window but we already sent too many packets + // wait PACING_WAIT to avoid drops in the kernel - scheduler_timer_on_ = true; - scheduler_timer_->expires_from_now( - std::chrono::microseconds(WAIT_BETWEEN_INTEREST_BATCHES)); + pacing_timer_on_ = true; + pacing_timer_->expires_from_now(std::chrono::microseconds(PACING_WAIT)); scheduler_timer_->async_wait([this](std::error_code ec) { if (ec) return; - if (!scheduler_timer_on_) return; + if (!pacing_timer_on_) return; - scheduler_timer_on_ = false; + pacing_timer_on_ = false; scheduleNextInterests(); }); } } -void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { - uint32_t segment_number = interest->getName().getSuffix(); - - TRANSPORT_LOGD("timeout for packet %u", segment_number); +void RTCTransportProtocol::onInterestTimeout(Interest::Ptr &interest, + const Name &name) { + uint32_t segment_number = name.getSuffix(); if (segment_number >= MIN_PROBE_SEQ) { // this is a timeout on a probe, do nothing return; } + PacketState state = state_->isReceivedOrLost(segment_number); + if (state != PacketState::UNKNOWN) { + // we may recover a packets using fec, ignore this timer + return; + } + timeouts_or_nacks_.insert(segment_number); if (TRANSPORT_EXPECT_TRUE(state_->isProducerActive()) && - segment_number <= state_->getHighestSeqReceivedInOrder()) { + segment_number <= state_->getHighestSeqReceived()) { // we retransmit packets only if the producer is active, otherwise we // use timeouts to avoid to send too much traffic // // a timeout is sent using RTX only if it is an old packet. if it is for a // seq number that we didn't reach yet, we send the packet using the normal // schedule next interest - TRANSPORT_LOGD("handle timeout for packet %u using rtx", segment_number); - ldr_->onTimeout(segment_number); - state_->onTimeout(segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "handle timeout for packet " << segment_number << " using rtx"; + if (ldr_->isRtxOn()) { + ldr_->onTimeout(segment_number); + if (indexer_verifier_->isFec(segment_number)) + state_->onTimeout(segment_number, true); + else + state_->onTimeout(segment_number, false); + } else { + // in this case we wil never recover the timeout + state_->onTimeout(segment_number, true); + } scheduleNextInterests(); return; } - TRANSPORT_LOGD("handle timeout for packet %u using normal interests", - segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "handle timeout for packet " << segment_number + << " using normal interests"; - if (segment_number < next_segment_) { + if (segment_number < indexer_verifier_->checkNextSuffix()) { // this is a timeout for a packet that will be generated in the future but // we are asking for higher sequence numbers. we need to go back like in the // case of future nacks - TRANSPORT_LOGD("on timeout next seg = %u, jump to %u", - next_segment_, segment_number); - next_segment_ = segment_number; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On timeout next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << segment_number; + // add an extra space in the window + current_sync_win_++; + indexer_verifier_->jumpToIndex(segment_number); } - state_->onTimeout(segment_number); + state_->onTimeout(segment_number, false); scheduleNextInterests(); } @@ -446,8 +565,8 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // check if the packet got a timeout - TRANSPORT_LOGD("Nack received %u. Production segment: %u", nack_segment, - production_seg); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Nack received " << nack_segment + << ". Production segment: " << production_seg; bool compute_stats = true; auto tn_it = timeouts_or_nacks_.find(nack_segment); @@ -459,14 +578,15 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { state_->onNackPacketReceived(content_object, compute_stats); ldr_->onNackPacketReceived(content_object); - // both in case of past and future nack we set next_segment_ equal to the + // both in case of past and future nack we jump to the // production segment in the nack. In case of past nack we will skip unneded // interest (this is already done in the scheduleNextInterest in any case) // while in case of future nacks we can go back in time and ask again for the // content that generated the nack - TRANSPORT_LOGD("on nack next seg = %u, jump to %u", - next_segment_, production_seg); - next_segment_ = production_seg; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "On nack next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << production_seg; + indexer_verifier_->jumpToIndex(production_seg); if (production_seg > nack_segment) { // remove the nack is it exists @@ -496,30 +616,33 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { void RTCTransportProtocol::onProbe(const ContentObject &content_object) { bool valid = state_->onProbePacketReceived(content_object); - if(!valid) return; + if (!valid) return; struct nack_packet_t *probe = (struct nack_packet_t *)content_object.getPayload()->data(); uint32_t production_seg = probe->getProductionSegement(); - // as for the nacks set next_segment_ - TRANSPORT_LOGD("on probe next seg = %u, jump to %u", - next_segment_, production_seg); - next_segment_ = production_seg; + // as for the nacks set next_segment + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "on probe next seg = " << indexer_verifier_->checkNextSuffix() + << ", jump to " << production_seg; + indexer_verifier_->jumpToIndex(production_seg); ldr_->onProbePacketReceived(content_object); updateSyncWindow(); } -void RTCTransportProtocol::onContentObject(Interest &interest, - ContentObject &content_object) { - TRANSPORT_LOGD("Received content object of size: %zu", - content_object.payloadSize()); - uint32_t payload_size = (uint32_t) content_object.payloadSize(); +void RTCTransportProtocol::onContentObjectReceived( + Interest &interest, ContentObject &content_object, std::error_code &ec) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received content object of size: " << content_object.payloadSize(); + uint32_t payload_size = content_object.payloadSize(); uint32_t segment_number = content_object.getName().getSuffix(); + ec = make_error_code(protocol_error::not_reassemblable); + if (segment_number >= MIN_PROBE_SEQ) { - TRANSPORT_LOGD("Received probe %u", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received probe " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); } @@ -528,7 +651,7 @@ void RTCTransportProtocol::onContentObject(Interest &interest, } if (payload_size == NACK_HEADER_SIZE) { - TRANSPORT_LOGD("Received nack %u", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received nack " << segment_number; if (*on_content_object_input_) { (*on_content_object_input_)(*socket_->getInterface(), content_object); } @@ -536,9 +659,8 @@ void RTCTransportProtocol::onContentObject(Interest &interest, return; } - TRANSPORT_LOGD("Received content %u", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Received content " << segment_number; - rc_->onDataPacketReceived(content_object); bool compute_stats = true; auto tn_it = timeouts_or_nacks_.find(segment_number); if (tn_it != timeouts_or_nacks_.end()) { @@ -551,25 +673,49 @@ void RTCTransportProtocol::onContentObject(Interest &interest, // check if the packet was already received PacketState state = state_->isReceivedOrLost(segment_number); - state_->onDataPacketReceived(content_object, compute_stats); - ldr_->onDataPacketReceived(content_object); - // if the stat for this seq number is received do not send the packet to app if (state != PacketState::RECEIVED) { - if (*on_content_object_input_) { - (*on_content_object_input_)(*socket_->getInterface(), content_object); + // send packet to decoder + if (fec_decoder_) { + DLOG_IF(INFO, VLOG_IS_ON(4)) + << "send packet " << segment_number << " to FEC decoder"; + fec_decoder_->onDataPacket( + content_object, content_object.headerSize() + rtc::DATA_HEADER_SIZE); + } + if (!indexer_verifier_->isFec(segment_number)) { + // the packet may be alredy sent to the ap by the decoder, check again if + // it is already received + state = state_->isReceivedOrLost(segment_number); + if (state != PacketState::RECEIVED) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received content " << segment_number; + + state_->onDataPacketReceived(content_object, compute_stats); + + if (*on_content_object_input_) { + (*on_content_object_input_)(*socket_->getInterface(), content_object); + } + ec = make_error_code(protocol_error::success); + } + } else { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "Received fec " << segment_number; + state_->onFecPacketReceived(content_object); } - reassemble(content_object); } else { - TRANSPORT_LOGD("Received duplicated content %u, drop it", segment_number); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Received duplicated content " << segment_number << ", drop it"; + ec = make_error_code(protocol_error::duplicated_content); } + ldr_->onDataPacketReceived(content_object); + rc_->onDataPacketReceived(content_object); + updateSyncWindow(); } void RTCTransportProtocol::sendStatsToApp( uint32_t retx_count, uint32_t received_bytes, uint32_t sent_interests, - uint32_t lost_data, uint32_t recovered_losses, uint32_t received_nacks) { + uint32_t lost_data, uint32_t definitely_lost, uint32_t recovered_losses, + uint32_t received_nacks, uint32_t received_fec) { if (*stats_summary_) { // Send the stats to the app stats_->updateQueuingDelay(state_->getQueuing()); @@ -581,23 +727,35 @@ void RTCTransportProtocol::sendStatsToApp( stats_->updateBytesRecv(received_bytes); stats_->updateInterestTx(sent_interests); stats_->updateReceivedNacks(received_nacks); + stats_->updateReceivedFEC(received_fec); stats_->updateAverageWindowSize(current_sync_win_); stats_->updateLossRatio(state_->getLossRate()); stats_->updateAverageRtt(state_->getRTT()); + stats_->updateQueuingDelay(state_->getQueuing()); stats_->updateLostData(lost_data); + stats_->updateDefinitelyLostData(definitely_lost); stats_->updateRecoveredData(recovered_losses); stats_->updateCCState((unsigned int)current_state_ ? 1 : 0); (*stats_summary_)(*socket_->getInterface(), *stats_); } } -void RTCTransportProtocol::reassemble(ContentObject &content_object) { - auto read_buffer = content_object.getPayload(); - TRANSPORT_LOGD("Size of payload: %zu", read_buffer->length()); - read_buffer->trimStart(DATA_HEADER_SIZE); - Reassembly::read_buffer_ = std::move(read_buffer); - Reassembly::notifyApplication(); +void RTCTransportProtocol::onFecPackets( + std::vector<std::pair<uint32_t, fec::buffer>> &packets) { + for (auto &packet : packets) { + PacketState state = state_->isReceivedOrLost(packet.first); + if (state != PacketState::RECEIVED) { + state_->onPacketRecoveredFec(packet.first); + ldr_->onPacketRecoveredFec(packet.first); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Recovered packet " << packet.first << " through FEC."; + reassembly_->reassemble(*packet.second, packet.first); + } else { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Packet" << packet.first << "already received."; + } + } } } // end namespace rtc |