diff options
3 files changed, 147 insertions, 11 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index d1e89efdc..446b9ef8e 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -31,6 +31,11 @@ #define HICN_MAX_DATA_SEQ 0xefffffff +//slow production rate param +#define MIN_PRODUCTION_RATE 8000 // in bytes per sec. this value is computed + // through experiments +#define LIFETIME_FRACTION 0.5 + // NACK HEADER // +-----------------------------------------+ // | 4 bytes: current segment in production | @@ -58,12 +63,15 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + timer_on_(false), active_(false) { lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::steady_clock::now().time_since_epoch()) .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = std::make_unique<asio::steady_timer>( + this->getIoService()); } RTCProducerSocket::RTCProducerSocket() @@ -80,6 +88,8 @@ RTCProducerSocket::RTCProducerSocket() .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = std::make_unique<asio::steady_timer>( + this->getIoService()); } RTCProducerSocket::~RTCProducerSocket() {} @@ -159,6 +169,23 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { portal_->sendContentObject(content_object); + //remove interests from the interest cache if it exists + if(!seqs_map_.empty()){ + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + auto it_seqs = seqs_map_.find(currentSeg_); + if(it_seqs != seqs_map_.end()){ + auto range = timers_map_.equal_range(it_seqs->second); + for(auto it_timers = range.first; it_timers != range.second; it_timers++){ + if(it_timers->second == it_seqs->first){ + timers_map_.erase(it_timers); + break; + } + } + } + } + currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ; } @@ -170,14 +197,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_interest_input_(*this, *interest); } + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + bool isActive; { utils::SpinLock::Acquire locked(lock_); isActive = active_; if (isActive) { - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); if ((now - lastProduced_) > INACTIVE_TIME) { // socket is inactive active_ = false; @@ -186,13 +214,68 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { } } - if (TRANSPORT_EXPECT_FALSE(!isActive)) { - sendNack(*interest, false); + // if the production rate is less than MIN_PRODUCTION_RATE we put the + // interest in a queue, otherwise we handle it in the usual way + if(bytesProductionRate_ < MIN_PRODUCTION_RATE){ + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + uint64_t next_timer = ~0; + if(!timers_map_.empty()){ + next_timer = timers_map_.begin()->first; + } + + uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); + //check if the seq number exists already + auto it_seqs = seqs_map_.find(interestSeg); + if(it_seqs != seqs_map_.end()){ + //the seq already exists + if(expiration < it_seqs->second){ + // we need to update the timer becasue we got a smaller one + // 1) remove the entry from the multimap + // 2) update this entry + auto range = timers_map_.equal_range(it_seqs->second); + for(auto it_timers = range.first; it_timers != range.second; it_timers++){ + if(it_timers->second == it_seqs->first){ + timers_map_.erase(it_timers); + break; + } + } + timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + it_seqs->second = expiration; + }else{ + //nothing to do here + return; + } + }else{ + // add the new seq + timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration)); + } + + //here we have at least one interest in the queue, we need to start or + //update the timer + if(!timer_on_){ + //set timeout + timer_on_ = true; + scheduleTimer(timers_map_.begin()->first - now); + } else { + //re-schedule the timer because a new interest will expires sooner + if(next_timer > timers_map_.begin()->first){ + interests_cache_timer_->cancel(); + scheduleTimer(timers_map_.begin()->first - now); + } + } return; } if(interestSeg > HICN_MAX_DATA_SEQ){ - sendNack(*interest, isActive); + sendNack(interestSeg, isActive); + return; + } + + if (TRANSPORT_EXPECT_FALSE(!isActive)) { + sendNack(interestSeg, false); return; } @@ -202,18 +285,55 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { - sendNack(*interest, true); + sendNack(interestSeg, true); } // else drop packet } -void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) { +void RTCProducerSocket::scheduleTimer(uint64_t wait){ + interests_cache_timer_->expires_from_now( + std::chrono::milliseconds(wait)); + interests_cache_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + interestCacheTimer(); + }); +} + +void RTCProducerSocket::interestCacheTimer(){ + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + uint64_t expire = it_timers->first; + if(expire <= now){ + uint32_t seq = it_timers->second; + sendNack(seq, active_); + //remove the interest from the other map + seqs_map_.erase(seq); + it_timers = timers_map_.erase(it_timers); + }else{ + //stop, we are done! + break; + } + } + if(timers_map_.empty()){ + timer_on_ = false; + }else{ + timer_on_ = true; + scheduleTimer(timers_map_.begin()->first - now); + } +} + +void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); ContentObject nack; nack.appendPayload(std::move(nack_payload)); - nack.setName(interest.getName()); + nack.setName(flowName_.setSuffix(sequence)); uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); *payload_ptr = currentSeg_; diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 5b9a23dd7..aa67f1a29 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -41,8 +41,10 @@ class RTCProducerSocket : public ProducerSocket { void onInterest(Interest::Ptr &&interest) override; private: - void sendNack(const Interest &interest, bool isActive); + void sendNack(uint32_t sequence, bool isActive); void updateStats(uint32_t packet_size, uint64_t now); + void scheduleTimer(uint64_t wait); + void interestCacheTimer(); uint32_t currentSeg_; uint32_t prodLabel_; @@ -55,6 +57,20 @@ class RTCProducerSocket : public ProducerSocket { uint32_t perSecondFactor_; uint64_t lastStats_; + // cache for the received interests + // this map maps the expiration time of an interest to + // its sequence number. the map is sorted by timeouts + // the same timeout may be used for multiple sequence numbers + // but for each sequence number we store only the smallest + // expiry time. In this way the mapping from seqs_map_ to + // timers_map_ is unique + std::multimap<uint64_t,uint32_t> timers_map_; + // this map does the opposite, this map is not ordered + std::unordered_map<uint32_t,uint64_t> seqs_map_; + bool timer_on_; + std::unique_ptr<asio::steady_timer> interests_cache_timer_; + utils::SpinLock interests_cache_lock_; + uint64_t lastProduced_; bool active_; utils::SpinLock lock_; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index b3a00c58d..4104d8883 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -744,7 +744,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) old_nack = true; } else if (productionSeg < nackSegment) { - actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ; + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; if(!rtx){ // we are asking stuff in the future |