From e8e399a5fd74ff35f703364c876e01679f0606b7 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Tue, 8 Oct 2019 11:20:25 +0200 Subject: [HICN-302] low rate traffic in RTC Signed-off-by: michele papalini Change-Id: Ib6511d82abc91e9008588cd2b7fd80022c6d232b --- .../transport/interfaces/rtc_socket_producer.cc | 138 +++++++++++++++++++-- .../transport/interfaces/rtc_socket_producer.h | 18 ++- libtransport/src/hicn/transport/protocols/rtc.cc | 2 +- 3 files changed, 147 insertions(+), 11 deletions(-) (limited to 'libtransport/src') diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index d1e89efdc..446b9ef8e 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -31,6 +31,11 @@ #define HICN_MAX_DATA_SEQ 0xefffffff +//slow production rate param +#define MIN_PRODUCTION_RATE 8000 // in bytes per sec. this value is computed + // through experiments +#define LIFETIME_FRACTION 0.5 + // NACK HEADER // +-----------------------------------------+ // | 4 bytes: current segment in production | @@ -58,12 +63,15 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + timer_on_(false), active_(false) { lastStats_ = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = std::make_unique( + this->getIoService()); } RTCProducerSocket::RTCProducerSocket() @@ -80,6 +88,8 @@ RTCProducerSocket::RTCProducerSocket() .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); + interests_cache_timer_ = std::make_unique( + this->getIoService()); } RTCProducerSocket::~RTCProducerSocket() {} @@ -159,6 +169,23 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { portal_->sendContentObject(content_object); + //remove interests from the interest cache if it exists + if(!seqs_map_.empty()){ + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + auto it_seqs = seqs_map_.find(currentSeg_); + if(it_seqs != seqs_map_.end()){ + auto range = timers_map_.equal_range(it_seqs->second); + for(auto it_timers = range.first; it_timers != range.second; it_timers++){ + if(it_timers->second == it_seqs->first){ + timers_map_.erase(it_timers); + break; + } + } + } + } + currentSeg_ = (currentSeg_ + 1) % HICN_MAX_DATA_SEQ; } @@ -170,14 +197,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_interest_input_(*this, *interest); } + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + bool isActive; { utils::SpinLock::Acquire locked(lock_); isActive = active_; if (isActive) { - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); if ((now - lastProduced_) > INACTIVE_TIME) { // socket is inactive active_ = false; @@ -186,13 +214,68 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { } } - if (TRANSPORT_EXPECT_FALSE(!isActive)) { - sendNack(*interest, false); + // if the production rate is less than MIN_PRODUCTION_RATE we put the + // interest in a queue, otherwise we handle it in the usual way + if(bytesProductionRate_ < MIN_PRODUCTION_RATE){ + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + uint64_t next_timer = ~0; + if(!timers_map_.empty()){ + next_timer = timers_map_.begin()->first; + } + + uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); + //check if the seq number exists already + auto it_seqs = seqs_map_.find(interestSeg); + if(it_seqs != seqs_map_.end()){ + //the seq already exists + if(expiration < it_seqs->second){ + // we need to update the timer becasue we got a smaller one + // 1) remove the entry from the multimap + // 2) update this entry + auto range = timers_map_.equal_range(it_seqs->second); + for(auto it_timers = range.first; it_timers != range.second; it_timers++){ + if(it_timers->second == it_seqs->first){ + timers_map_.erase(it_timers); + break; + } + } + timers_map_.insert(std::pair(expiration, interestSeg)); + it_seqs->second = expiration; + }else{ + //nothing to do here + return; + } + }else{ + // add the new seq + timers_map_.insert(std::pair(expiration, interestSeg)); + seqs_map_.insert(std::pair(interestSeg, expiration)); + } + + //here we have at least one interest in the queue, we need to start or + //update the timer + if(!timer_on_){ + //set timeout + timer_on_ = true; + scheduleTimer(timers_map_.begin()->first - now); + } else { + //re-schedule the timer because a new interest will expires sooner + if(next_timer > timers_map_.begin()->first){ + interests_cache_timer_->cancel(); + scheduleTimer(timers_map_.begin()->first - now); + } + } return; } if(interestSeg > HICN_MAX_DATA_SEQ){ - sendNack(*interest, isActive); + sendNack(interestSeg, isActive); + return; + } + + if (TRANSPORT_EXPECT_FALSE(!isActive)) { + sendNack(interestSeg, false); return; } @@ -202,18 +285,55 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { - sendNack(*interest, true); + sendNack(interestSeg, true); } // else drop packet } -void RTCProducerSocket::sendNack(const Interest &interest, bool isActive) { +void RTCProducerSocket::scheduleTimer(uint64_t wait){ + interests_cache_timer_->expires_from_now( + std::chrono::milliseconds(wait)); + interests_cache_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + interestCacheTimer(); + }); +} + +void RTCProducerSocket::interestCacheTimer(){ + uint64_t now = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + utils::SpinLock::Acquire locked(interests_cache_lock_); + + for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + uint64_t expire = it_timers->first; + if(expire <= now){ + uint32_t seq = it_timers->second; + sendNack(seq, active_); + //remove the interest from the other map + seqs_map_.erase(seq); + it_timers = timers_map_.erase(it_timers); + }else{ + //stop, we are done! + break; + } + } + if(timers_map_.empty()){ + timer_on_ = false; + }else{ + timer_on_ = true; + scheduleTimer(timers_map_.begin()->first - now); + } +} + +void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); ContentObject nack; nack.appendPayload(std::move(nack_payload)); - nack.setName(interest.getName()); + nack.setName(flowName_.setSuffix(sequence)); uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); *payload_ptr = currentSeg_; diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 5b9a23dd7..aa67f1a29 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -41,8 +41,10 @@ class RTCProducerSocket : public ProducerSocket { void onInterest(Interest::Ptr &&interest) override; private: - void sendNack(const Interest &interest, bool isActive); + void sendNack(uint32_t sequence, bool isActive); void updateStats(uint32_t packet_size, uint64_t now); + void scheduleTimer(uint64_t wait); + void interestCacheTimer(); uint32_t currentSeg_; uint32_t prodLabel_; @@ -55,6 +57,20 @@ class RTCProducerSocket : public ProducerSocket { uint32_t perSecondFactor_; uint64_t lastStats_; + // cache for the received interests + // this map maps the expiration time of an interest to + // its sequence number. the map is sorted by timeouts + // the same timeout may be used for multiple sequence numbers + // but for each sequence number we store only the smallest + // expiry time. In this way the mapping from seqs_map_ to + // timers_map_ is unique + std::multimap timers_map_; + // this map does the opposite, this map is not ordered + std::unordered_map seqs_map_; + bool timer_on_; + std::unique_ptr interests_cache_timer_; + utils::SpinLock interests_cache_lock_; + uint64_t lastProduced_; bool active_; utils::SpinLock lock_; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index b3a00c58d..4104d8883 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -744,7 +744,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) old_nack = true; } else if (productionSeg < nackSegment) { - actualSegment_ = (productionSeg + 1) % HICN_MIN_PROBE_SEQ; + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; if(!rtx){ // we are asking stuff in the future -- cgit 1.2.3-korg id='n18' href='#n18'>18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139