diff options
Diffstat (limited to 'libtransport')
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.cc | 100 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.h | 13 |
2 files changed, 105 insertions, 8 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index e6134f767..f52494aba 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -33,6 +33,7 @@ RTCTransportProtocol::RTCTransportProtocol( icnet_socket->getSocketOption(PORTAL, portal_); rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); reset(); } @@ -90,6 +91,7 @@ void RTCTransportProtocol::reset() { lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); + lastEvent_ = lastReceivedTime_; highestReceived_ = 0; firstSequenceInRound_ = 0; @@ -108,6 +110,7 @@ void RTCTransportProtocol::reset() { avgPacketSize_ = HICN_INIT_PACKET_SIZE; gotNack_ = false; gotFutureNack_ = 0; + rounds_ = 0; roundsWithoutNacks_ = 0; pathTable_.clear(); @@ -228,6 +231,11 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) return; // this should not happen + //set sentinel timer if needed + if(rounds_ == 0){ + sentinelTimer(); + } + // as a queuing delay we keep the lowest one among the two paths // if one path is congested the forwarder should decide to do not // use it so it does not make sense to inform the application @@ -304,6 +312,7 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { receivedData_ = 0; packetLost_ = 0; lossRecovered_ = 0; + rounds_++; firstSequenceInRound_ = highestReceived_; } @@ -434,6 +443,7 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { sentInterest_++; if (!rtx) { + packets_in_window_[interest_name->getSuffix()] = 0; inflightInterestsCount_++; } } @@ -499,6 +509,57 @@ void RTCTransportProtocol::scheduleNextInterests() { } } +void RTCTransportProtocol::sentinelTimer(){ + uint32_t wait = 1; + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ + wait = round( + pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); + } + if(wait == 0) + wait = 1; + + sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); + sentinel_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ + + uint64_t max_waiting_time = + round((pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 2; + + if((currentState_ == HICN_RTC_NORMAL_STATE) && + (inflightInterestsCount_ >= currentCWin_) && + ((now - lastEvent_) > max_waiting_time)){ + + uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); + + for(auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++){ + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first && + ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){ + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); + } + } + } + }//esle not enough info to resend the packet, schedule the timer agian + + sentinelTimer(); + }); +} + void RTCTransportProtocol::addRetransmissions(uint32_t val) { // add only val in the rtx list addRetransmissions(val, val + 1); @@ -651,8 +712,6 @@ void RTCTransportProtocol::checkRtx() { } void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { - // packetLost_++; - uint32_t segmentNumber = interest->getName().getSuffix(); if (segmentNumber >= HICN_MIN_PROBE_SEQ) { @@ -663,14 +722,22 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint32_t pkt = segmentNumber & modMask_; if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { - inflightInterestsCount_--; // we do nothing, and we keep asking the same stuff over // and over until we get at least a packet + inflightInterestsCount_--; + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); scheduleNextInterests(); return; } if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } @@ -795,15 +862,30 @@ void RTCTransportProtocol::onContentObject( return; } + //check if the packet is a rtx + bool is_rtx = false; + if(interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()){ + is_rtx = true; + }else{ + auto it_win = packets_in_window_.find(segmentNumber); + if(it_win != packets_in_window_.end() && + it_win->second != 0) + is_rtx = true; + } + if (payload_size == HICN_NACK_HEADER_SIZE) { if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } bool old_nack = false; - if (interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()) { + if (!is_rtx){ // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); @@ -831,12 +913,14 @@ void RTCTransportProtocol::onContentObject( content_object->payloadSize()); if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; // packet sent without timeouts } - if (inflightInterests_[pkt].state == sent_ && - interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()) { + if (inflightInterests_[pkt].state == sent_ && !is_rtx){ // delay stats are computed only for non retransmitted data updateDelayStats(*content_object); } diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 7927e3969..908be017a 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -123,6 +123,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // packet functions void sendInterest(Name *interest_name, bool rtx); void scheduleNextInterests() override; + void sentinelTimer(); void addRetransmissions(uint32_t val); void addRetransmissions(uint32_t start, uint32_t stop); uint64_t retransmit(); @@ -163,6 +164,17 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { uint64_t lastReceivedTime_; //time at which we recevied the //lastReceived_ packet + //sentinel + //if all packets in the window get lost we need something that + //wakes up our consumer socket. Interest timeouts set to 1 sec + //expire too late. This timers expire much sooner and if it + //detects that all the interest in the window may be lost + //it sends all of them again + std::unique_ptr<asio::steady_timer> sentinel_timer_; + uint64_t lastEvent_; //time at which we removed a pending + //interest from the window + std::unordered_map<uint32_t, uint8_t> packets_in_window_; + //rtt probes //the RTC transport tends to overestimate the RTT //du to the production time on the server side @@ -188,6 +200,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { double avgPacketSize_; bool gotNack_; uint32_t gotFutureNack_; + uint32_t rounds_; uint32_t roundsWithoutNacks_; //we keep track of up two paths (if only one path is in use |