diff options
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.cc | 329 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.h | 31 |
2 files changed, 228 insertions, 132 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 882508b41..9402d3b02 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -18,14 +18,6 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/protocols/rtc.h> -/* - * TODO - * 2) start/constructor/rest variable implementation - * 3) interest retransmission: now I always recover, we should recover only if - * we have enough time 4) returnContentToApplication: rememeber to remove the - * first 32bits from the payload - */ - namespace transport { namespace protocol { @@ -39,6 +31,7 @@ RTCTransportProtocol::RTCTransportProtocol( modMask_((1 << default_values::log_2_default_buffer_size) - 1) { icnet_socket->getSocketOption(PORTAL, portal_); nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); nack_timer_used_ = false; reset(); } @@ -49,7 +42,10 @@ RTCTransportProtocol::~RTCTransportProtocol() { } } -int RTCTransportProtocol::start() { return TransportProtocol::start(); } +int RTCTransportProtocol::start() { + checkRtx(); + return TransportProtocol::start(); +} void RTCTransportProtocol::stop() { if (!is_running_) return; @@ -67,6 +63,7 @@ void RTCTransportProtocol::resume() { inflightInterestsCount_ = 0; scheduleNextInterests(); + checkRtx(); portal_->runEventsLoop(); @@ -104,7 +101,7 @@ void RTCTransportProtocol::reset() { // names/packets var actualSegment_ = 0; inflightInterestsCount_ = 0; - while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + interestRetransmissions_.clear(); lastSegNacked_ = 0; lastReceived_ = 0; nackedByProducer_.clear(); @@ -120,10 +117,6 @@ void RTCTransportProtocol::reset() { gotFutureNack_ = 0; roundsWithoutNacks_ = 0; pathTable_.clear(); - // roundCounter_ = 0; - // minRTTwin_.clear(); - // for (int i = 0; i < MIN_RTT_WIN; i++) - // minRTTwin_.push_back(UINT_MAX); minRtt_ = UINT_MAX; // CC var @@ -135,7 +128,7 @@ void RTCTransportProtocol::reset() { producerPathLabel_ = 0; socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, (uint32_t)HICN_RTC_INTEREST_LIFETIME); - // XXX this should bedone by the application + // XXX this should be done by the application } uint32_t max(uint32_t a, uint32_t b) { @@ -168,9 +161,12 @@ void RTCTransportProtocol::updateDelayStats( uint32_t segmentNumber = content_object.getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - if (inflightInterests_[pkt].transmissionTime == - 0) // this is always the case if we have a retransmitted packet (timeout - // or RTCP) + if (inflightInterests_[pkt].state != sent_) + return; + + if(interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) + //this packet was rtx at least once return; uint32_t pathLabel = content_object.getPathLabel(); @@ -283,8 +279,8 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, (double)HICN_MILLI_IN_A_SEC)); if (currentState_ == HICN_RTC_SYNC_STATE) { - // in this case we do not limit the window with the BDP, beacuse most likly - // it is wrong + // in this case we do not limit the window with the BDP, beacuse most + // likely it is wrong maxCWin_ = maxWaintingInterest; return; } @@ -333,68 +329,12 @@ void RTCTransportProtocol::increaseWindow() { } else { currentCWin_ = min( maxCWin_, - (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear + (uint32_t)ceil(currentCWin_ + + (1.0 / (double)currentCWin_))); // linear } } -void RTCTransportProtocol::sendInterest() { - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - bool isRTX = false; - // uint32_t sentInt = 0; - - if (interestRetransmissions_.size() > 0) { - // handle retransmission - // here we have two possibile retransmissions: retransmissions due to - // timeouts and retransmissions due to RTCP NACKs. we will send the interest - // anyway, even if it is pending (this is possible only in the second case) - uint32_t rtxSeg = interestRetransmissions_.front(); - interestRetransmissions_.pop(); - - // a packet recovery means that there was a loss - packetLost_++; - - uint32_t pkt = rtxSeg & modMask_; - interest_name->setSuffix(rtxSeg); - - // if the interest is not pending anymore we encrease the retrasnmission - // counter in order to avoid to handle a recovered packt as a normal one - if (!portal_->interestIsPending(*interest_name)) { - inflightInterests_[pkt].retransmissions++; - } - - inflightInterests_[pkt].transmissionTime = 0; - inflightInterests_[pkt].received = 0; - inflightInterests_[pkt].sequence = rtxSeg; - isRTX = true; - } else { - // in this case we send the packet only if it is not pending yet - interest_name->setSuffix(actualSegment_); - if (portal_->interestIsPending(*interest_name)) { - actualSegment_++; - return; - } - uint32_t pkt = actualSegment_ & modMask_; - - //if we already reacevied the content we don't ask it again - if(inflightInterests_[pkt].received == 1 && - inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_++; - return; - } - - // sentInt = actualSegment_; - inflightInterests_[pkt].transmissionTime = - std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - inflightInterests_[pkt].retransmissions = 0; - inflightInterests_[pkt].received = 0; - inflightInterests_[pkt].sequence = actualSegment_; - actualSegment_++; - } - +void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { auto interest = getPacket(); interest->setName(*interest_name); @@ -420,27 +360,51 @@ void RTCTransportProtocol::sendInterest() { sentInterest_++; - if (!isRTX) { + if (!rtx) { inflightInterestsCount_++; } + } void RTCTransportProtocol::scheduleNextInterests() { checkRound(); if (!is_running_) return; - while (interestRetransmissions_.size() > 0) { - sendInterest(); - checkRound(); - } - while (inflightInterestsCount_ < currentCWin_) { - sendInterest(); + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + //we send the packet only if it is not pending yet + interest_name->setSuffix(actualSegment_); + if (portal_->interestIsPending(*interest_name)) { + actualSegment_++; + continue; + } + + uint32_t pkt = actualSegment_ & modMask_; + //if we already reacevied the content we don't ask it again + if(inflightInterests_[pkt].state == received_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_++; + continue; + } + + inflightInterests_[pkt].transmissionTime = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + inflightInterests_[pkt].state = sent_; + inflightInterests_[pkt].sequence = actualSegment_; + actualSegment_++; + + sendInterest(interest_name, false); checkRound(); } } void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) { +#if 0 for (uint32_t i = 0; i < nacks.size(); i++) { if (nackedByProducer_.find(nacks[i]) != nackedByProducer_.end()) { continue; @@ -452,22 +416,142 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) { } scheduleNextInterests(); +#endif +} + +void RTCTransportProtocol::addRetransmissions(uint32_t val){ + //add only val in the rtx list + addRetransmissions(val, val + 1); } + +void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop){ + for(uint32_t i = start; i < stop; i++){ + auto it = interestRetransmissions_.find(i); + if(it == interestRetransmissions_.end()){ + if (lastSegNacked_ <= i) { + //i must be larger than the last past nack received + interestRetransmissions_[i] = 0; + } + }//if the retransmission is already there the rtx timer will + //take care of it + } + retransmit(true); +} + +void RTCTransportProtocol::retransmit(bool first_rtx){ + auto it = interestRetransmissions_.begin(); + + //cut len to max HICN_MAX_RTX_SIZE + //since we use a map, the smaller (and so the older) sequence number are at + //the beginnin of the map + while(interestRetransmissions_.size() > HICN_MAX_RTX_SIZE){ + it = interestRetransmissions_.erase(it); + } + + it = interestRetransmissions_.begin(); + + while (it != interestRetransmissions_.end()){ + uint32_t pkt = it->first & modMask_; + + if(inflightInterests_[pkt].sequence != it->first){ + //this packet is not anymore in the inflight buffer, erase it + it = interestRetransmissions_.erase(it); + continue; + } + + //we retransmitted the packet too many times + if(it->second >= HICN_MAX_RTX){ + it = interestRetransmissions_.erase(it); + continue; + } + + //this packet is too old + if((lastReceived_ > it->first) && + (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE){ + it = interestRetransmissions_.erase(it); + continue; + } + + if(first_rtx){ + //TODO (optimization) + //the rtx that we never sent (it->second == 0) are all at the + //end, so we can go directly there + if(it->second == 0){ + inflightInterests_[pkt].transmissionTime = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + it->second++; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + sendInterest(interest_name, true); + } + ++it; + }else{ + //base on time + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if((now - inflightInterests_[pkt].transmissionTime) > 20){ + //XXX replace 20 with rtt + inflightInterests_[pkt].transmissionTime = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + it->second++; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + sendInterest(interest_name, true); + } + ++it; + } + } +} + +void RTCTransportProtocol::checkRtx(){ + retransmit(false); + rtx_timer_->expires_from_now(std::chrono::milliseconds(20)); + rtx_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + checkRtx(); + }); +} + void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { // packetLost_++; uint32_t segmentNumber = interest->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; - if (inflightInterests_[pkt].retransmissions == 0) { + if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; } - if (inflightInterests_[pkt].retransmissions < HICN_MAX_RTX && - lastSegNacked_ <= segmentNumber && - actualSegment_ < segmentNumber) { - interestRetransmissions_.push(segmentNumber); - } + //check how many times we sent this packet + auto it = interestRetransmissions_.find(segmentNumber); + if(it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX){ + inflightInterests_[pkt].state = lost_; + } + + if(inflightInterests_[pkt].state == sent_) { + inflightInterests_[pkt].state = timeout1_; + } else if (inflightInterests_[pkt].state == timeout1_) { + inflightInterests_[pkt].state = timeout2_; + } else if (inflightInterests_[pkt].state == timeout2_) { + inflightInterests_[pkt].state = lost_; + } + + if(inflightInterests_[pkt].state == lost_) { + interestRetransmissions_.erase(segmentNumber); + }else{ + addRetransmissions(segmentNumber); + } scheduleNextInterests(); } @@ -478,6 +562,7 @@ bool RTCTransportProtocol::checkIfProducerIsActive( uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); + if (productionRate == 0) { // the producer socket is not active // in this case we consider only the first nack @@ -486,7 +571,7 @@ bool RTCTransportProtocol::checkIfProducerIsActive( } nack_timer_used_ = true; - // actualSegment_ should be the one in the nack, which will the next in + // actualSegment_ should be the one in the nack, which will be the next in // production actualSegment_ = productionSeg; // all the rest (win size should not change) @@ -522,7 +607,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { computeMaxWindow(productionRate, 0); increaseWindow(); - while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + interestRetransmissions_.clear(); lastSegNacked_ = productionSeg; if (nackedByProducer_.size() >= nackedByProducerMaxSize_) @@ -533,30 +618,13 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { // we are asking stuff in the future gotFutureNack_++; - if(lastReceived_ > productionSeg){ - //if this happens the producer socket was restarted - //we erase all the inflight + timeout state (only for the first NACK) - if(gotFutureNack_ == 1){ - while (interestRetransmissions_.size() != 0) - interestRetransmissions_.pop(); - for (uint32_t i = 0; i < inflightInterests_.size(); i++){ - inflightInterests_[i].transmissionTime = 0; - inflightInterests_[i].sequence = 0; - inflightInterests_[i].retransmissions = 0; - inflightInterests_[i].received = 0; - } - } - actualSegment_ = productionSeg + 1; - //we can't say much abouit the window, so keep it as it is - }else{ - actualSegment_ = productionSeg + 1; + actualSegment_ = productionSeg + 1; - computeMaxWindow(productionRate, 0); - decreaseWindow(); + computeMaxWindow(productionRate, 0); + decreaseWindow(); - if (currentState_ == HICN_RTC_SYNC_STATE) { - currentState_ = HICN_RTC_NORMAL_STATE; - } + if (currentState_ == HICN_RTC_SYNC_STATE) { + currentState_ = HICN_RTC_NORMAL_STATE; } } // equal should not happen } @@ -570,11 +638,10 @@ void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) { // we are asking for stuff produced in the past actualSegment_ = max(productionSeg + 1, actualSegment_); - while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + interestRetransmissions_.clear(); lastSegNacked_ = productionSeg; } else if (productionSeg < nackSegment) { - actualSegment_ = productionSeg + 1; } // equal should not happen } @@ -598,8 +665,7 @@ void RTCTransportProtocol::onContentObject( // Nacks always come form the producer, so we set the producerPathLabel_; producerPathLabel_ = content_object->getPathLabel(); schedule_next_interest = checkIfProducerIsActive(*content_object); - if (inflightInterests_[pkt].retransmissions == 0) { - // discard nacks for rtx packets + if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; // if checkIfProducerIsActive returns false, we did all we need to do // inside that function, no need to call onNack @@ -610,26 +676,37 @@ void RTCTransportProtocol::onContentObject( } } else { - receivedData_++; - inflightInterests_[pkt].received = 1; - lastReceived_ = segmentNumber; - avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); - if (inflightInterests_[pkt].retransmissions == 0) { - inflightInterestsCount_--; + if (inflightInterests_[pkt].state == sent_) { + inflightInterestsCount_--; //packet sent without timeouts + } + + if (inflightInterests_[pkt].state == sent_ && + interestRetransmissions_.find(segmentNumber) == + interestRetransmissions_.end()){ // we count only non retransmitted data in order to take into accunt only // the transmition rate of the producer receivedBytes_ += (uint32_t)(content_object->headerSize() + content_object->payloadSize()); updateDelayStats(*content_object); + + addRetransmissions(lastReceived_ + 1, segmentNumber); + //lastReceived_ is updated only for data packets received without RTX + lastReceived_ = segmentNumber; } + receivedData_++; + inflightInterests_[pkt].state = received_; + reassemble(std::move(content_object)); increaseWindow(); } + //in any case we remove the packet from the rtx list + interestRetransmissions_.erase(segmentNumber); + if (schedule_next_interest) { scheduleNextInterests(); } diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 227f1d737..55deead6e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -16,7 +16,7 @@ #pragma once #include <queue> -#include <set> +#include <map> #include <unordered_map> #include <hicn/transport/protocols/protocol.h> @@ -38,7 +38,9 @@ // controller constant #define HICN_ROUND_LEN \ 200 // ms interval of time on which we take decisions / measurements -#define HICN_MAX_RTX 3 +#define HICN_MAX_RTX 10 +#define HICN_MAX_RTX_SIZE 1024 +#define HICN_MAX_RTX_MAX_AGE 10000 #define HICN_MIN_RTT_WIN 30 // rounds // cwin @@ -82,11 +84,21 @@ namespace transport { namespace protocol { +enum packetState { + sent_, + received_, + timeout1_, + timeout2_, + lost_ +}; + +typedef enum packetState packetState_t; + struct sentInterest { uint64_t transmissionTime; uint32_t sequence; //sequence number of the interest sent - uint8_t retransmissions; - uint8_t received; //1 = received, 0 = not received + //to handle seq % buffer_size + packetState_t state; //see packet state }; class RTCTransportProtocol : public TransportProtocol, public Reassembly { @@ -119,9 +131,13 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { void resetPreviousWindow(); // packet functions - void sendInterest(); + void sendInterest(Name *interest_name, bool rtx); void scheduleNextInterests() override; void scheduleAppNackRtx(std::vector<uint32_t> &nacks); + void addRetransmissions(uint32_t val); + void addRetransmissions(uint32_t start, uint32_t stop); + void retransmit(bool first_rtx); + void checkRtx(); void onTimeout(Interest::Ptr &&interest) override; // checkIfProducerIsActive: return true if we need to schedule an interest // immediatly after, false otherwise (this happens when the producer socket @@ -160,7 +176,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { uint32_t actualSegment_; int32_t RTPhICN_offset_; uint32_t inflightInterestsCount_; - std::queue<uint32_t> interestRetransmissions_; + //map seq to rtx + std::map<uint32_t, uint8_t> interestRetransmissions_; + std::unique_ptr<asio::steady_timer> rtx_timer_; + //std::queue<uint32_t> interestRetransmissions_; std::vector<sentInterest> inflightInterests_; uint32_t lastSegNacked_; //indicates the segment id in the last received // past Nack. we do not ask for retransmissions |