From 691b16435dbc0b8e53376a62683e792d0f25cc66 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Thu, 10 Oct 2019 17:57:10 +0200 Subject: [HICN-317] schedule rounds using timers in rtc producer Signed-off-by: michele papalini Change-Id: I4a5fe9c954713dc266e7aeb5f461b460d508d8e3 --- .../transport/interfaces/rtc_socket_producer.cc | 51 ++++++++++++---------- .../transport/interfaces/rtc_socket_producer.h | 12 ++--- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 009db812f..740f9f77c 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -64,13 +64,13 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), timer_on_(false){ - lastStats_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); interests_cache_timer_ = std::make_unique( this->getIoService()); + round_timer_ = std::make_unique( + this->getIoService()); + scheduleRoundTimer(); } RTCProducerSocket::RTCProducerSocket() @@ -82,13 +82,13 @@ RTCProducerSocket::RTCProducerSocket() packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), timer_on_(false){ - lastStats_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); interests_cache_timer_ = std::make_unique( this->getIoService()); + round_timer_ = std::make_unique( + this->getIoService()); + scheduleRoundTimer(); } RTCProducerSocket::~RTCProducerSocket() {} @@ -111,18 +111,22 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) { - producedBytes_ += packet_size; - producedPackets_++; - uint64_t duration = now - lastStats_; - if (duration >= STATS_INTERVAL_DURATION) { - lastStats_ = now; - bytesProductionRate_ = producedBytes_ * perSecondFactor_; - packetsProductionRate_ = producedPackets_ * perSecondFactor_; - if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1; - producedBytes_ = 0; - producedPackets_ = 0; - } +void RTCProducerSocket::scheduleRoundTimer(){ + round_timer_->expires_from_now( + std::chrono::milliseconds(STATS_INTERVAL_DURATION)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); +} + +void RTCProducerSocket::updateStats() { + bytesProductionRate_ = producedBytes_.load() * perSecondFactor_; + packetsProductionRate_ = producedPackets_.load() * perSecondFactor_; + if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1; + producedBytes_ = 0; + producedPackets_ = 0; + scheduleRoundTimer(); } void RTCProducerSocket::produce(std::unique_ptr &&buffer) { @@ -141,7 +145,8 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { std::chrono::steady_clock::now().time_since_epoch()) .count(); - updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now); + producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); + producedPackets_++; ContentObject content_object(flowName_.setSuffix(currentSeg_)); @@ -244,12 +249,12 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { if(!timer_on_){ //set timeout timer_on_ = true; - scheduleTimer(timers_map_.begin()->first - now); + scheduleCacheTimer(timers_map_.begin()->first - now); } else { //re-schedule the timer because a new interest will expires sooner if(next_timer > timers_map_.begin()->first){ interests_cache_timer_->cancel(); - scheduleTimer(timers_map_.begin()->first - now); + scheduleCacheTimer(timers_map_.begin()->first - now); } } return; @@ -266,7 +271,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { // else drop packet } -void RTCProducerSocket::scheduleTimer(uint64_t wait){ +void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){ interests_cache_timer_->expires_from_now( std::chrono::milliseconds(wait)); interests_cache_timer_->async_wait([this](std::error_code ec) { @@ -299,7 +304,7 @@ void RTCProducerSocket::interestCacheTimer(){ timer_on_ = false; }else{ timer_on_ = true; - scheduleTimer(timers_map_.begin()->first - now); + scheduleCacheTimer(timers_map_.begin()->first - now); } } diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 62aa7a296..a2540ceef 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -42,20 +42,22 @@ class RTCProducerSocket : public ProducerSocket { private: void sendNack(uint32_t sequence); - void updateStats(uint32_t packet_size, uint64_t now); - void scheduleTimer(uint64_t wait); + void updateStats(); + void scheduleCacheTimer(uint64_t wait); + void scheduleRoundTimer(); void interestCacheTimer(); uint32_t currentSeg_; uint32_t prodLabel_; uint16_t headerSize_; Name flowName_; - uint32_t producedBytes_; - uint32_t producedPackets_; + std::atomic producedBytes_; + std::atomic producedPackets_; std::atomic bytesProductionRate_; std::atomic packetsProductionRate_; uint32_t perSecondFactor_; - uint64_t lastStats_; + + std::unique_ptr round_timer_; // cache for the received interests // this map maps the expiration time of an interest to -- cgit 1.2.3-korg