From 5891bc7dc9d5538614f23dc176f40f6e5d18efc2 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Thu, 10 Oct 2019 16:00:23 +0200 Subject: [HICN-316] improve rtc for low rate streams Signed-off-by: michele papalini Change-Id: I29d9720450f8cee429eb02a494092f208c298355 --- .../transport/interfaces/rtc_socket_producer.cc | 50 +++++----------------- .../transport/interfaces/rtc_socket_producer.h | 8 +--- 2 files changed, 12 insertions(+), 46 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 5667b0640..009db812f 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -32,8 +32,8 @@ #define HICN_MAX_DATA_SEQ 0xefffffff //slow production rate param -#define MIN_PRODUCTION_RATE 8000 // in bytes per sec. this value is computed - // through experiments +#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed + // through experiments #define LIFETIME_FRACTION 0.5 // NACK HEADER @@ -63,8 +63,7 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false), - active_(false) { + timer_on_(false){ lastStats_ = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); @@ -82,8 +81,7 @@ RTCProducerSocket::RTCProducerSocket() bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false), - active_(false) { + timer_on_(false){ lastStats_ = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); @@ -143,12 +141,6 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { std::chrono::steady_clock::now().time_since_epoch()) .count(); - { - utils::SpinLock::Acquire locked(lock_); - active_ = true; - lastProduced_ = now; - } - updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now); ContentObject content_object(flowName_.setSuffix(currentSeg_)); @@ -203,27 +195,14 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { std::chrono::steady_clock::now().time_since_epoch()) .count(); - bool isActive; - { - utils::SpinLock::Acquire locked(lock_); - isActive = active_; - if (isActive) { - if ((now - lastProduced_) > INACTIVE_TIME) { - // socket is inactive - active_ = false; - isActive = false; - } - } - } - if(interestSeg > HICN_MAX_DATA_SEQ){ - sendNack(interestSeg, isActive); + sendNack(interestSeg); return; } // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way - if(bytesProductionRate_ < MIN_PRODUCTION_RATE && interestSeg > currentSeg_){ + if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && interestSeg >= currentSeg_){ utils::SpinLock::Acquire locked(interests_cache_lock_); @@ -276,18 +255,13 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { return; } - if (TRANSPORT_EXPECT_FALSE(!isActive)) { - sendNack(interestSeg, false); - return; - } - uint32_t max_gap = (uint32_t)floor( (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR / 1000.0) * (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { - sendNack(interestSeg, true); + sendNack(interestSeg); } // else drop packet } @@ -312,7 +286,7 @@ void RTCProducerSocket::interestCacheTimer(){ uint64_t expire = it_timers->first; if(expire <= now){ uint32_t seq = it_timers->second; - sendNack(seq, active_); + sendNack(seq); //remove the interest from the other map seqs_map_.erase(seq); it_timers = timers_map_.erase(it_timers); @@ -329,7 +303,7 @@ void RTCProducerSocket::interestCacheTimer(){ } } -void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) { +void RTCProducerSocket::sendNack(uint32_t sequence) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); ContentObject nack; @@ -340,11 +314,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence, bool isActive) { uint32_t *payload_ptr = (uint32_t *)nack.getPayload()->data(); *payload_ptr = currentSeg_; - if (isActive) { - *(++payload_ptr) = bytesProductionRate_; - } else { - *(++payload_ptr) = 0; - } + *(++payload_ptr) = bytesProductionRate_.load(); nack.setLifetime(0); nack.setPathLabel(prodLabel_); diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index aa67f1a29..62aa7a296 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -41,7 +41,7 @@ class RTCProducerSocket : public ProducerSocket { void onInterest(Interest::Ptr &&interest) override; private: - void sendNack(uint32_t sequence, bool isActive); + void sendNack(uint32_t sequence); void updateStats(uint32_t packet_size, uint64_t now); void scheduleTimer(uint64_t wait); void interestCacheTimer(); @@ -52,7 +52,7 @@ class RTCProducerSocket : public ProducerSocket { Name flowName_; uint32_t producedBytes_; uint32_t producedPackets_; - uint32_t bytesProductionRate_; + std::atomic bytesProductionRate_; std::atomic packetsProductionRate_; uint32_t perSecondFactor_; uint64_t lastStats_; @@ -70,10 +70,6 @@ class RTCProducerSocket : public ProducerSocket { bool timer_on_; std::unique_ptr interests_cache_timer_; utils::SpinLock interests_cache_lock_; - - uint64_t lastProduced_; - bool active_; - utils::SpinLock lock_; }; } // namespace interface -- cgit 1.2.3-korg