aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport')
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc51
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h12
2 files changed, 35 insertions, 28 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 009db812f..740f9f77c 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -64,13 +64,13 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
timer_on_(false){
- lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
interests_cache_timer_ = std::make_unique<asio::steady_timer>(
this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(
+ this->getIoService());
+ scheduleRoundTimer();
}
RTCProducerSocket::RTCProducerSocket()
@@ -82,13 +82,13 @@ RTCProducerSocket::RTCProducerSocket()
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
timer_on_(false){
- lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
interests_cache_timer_ = std::make_unique<asio::steady_timer>(
this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(
+ this->getIoService());
+ scheduleRoundTimer();
}
RTCProducerSocket::~RTCProducerSocket() {}
@@ -111,18 +111,22 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
}
}
-void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) {
- producedBytes_ += packet_size;
- producedPackets_++;
- uint64_t duration = now - lastStats_;
- if (duration >= STATS_INTERVAL_DURATION) {
- lastStats_ = now;
- bytesProductionRate_ = producedBytes_ * perSecondFactor_;
- packetsProductionRate_ = producedPackets_ * perSecondFactor_;
- if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
- producedBytes_ = 0;
- producedPackets_ = 0;
- }
+void RTCProducerSocket::scheduleRoundTimer(){
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(STATS_INTERVAL_DURATION));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
+}
+
+void RTCProducerSocket::updateStats() {
+ bytesProductionRate_ = producedBytes_.load() * perSecondFactor_;
+ packetsProductionRate_ = producedPackets_.load() * perSecondFactor_;
+ if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1;
+ producedBytes_ = 0;
+ producedPackets_ = 0;
+ scheduleRoundTimer();
}
void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
@@ -141,7 +145,8 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
std::chrono::steady_clock::now().time_since_epoch())
.count();
- updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now);
+ producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
+ producedPackets_++;
ContentObject content_object(flowName_.setSuffix(currentSeg_));
@@ -244,12 +249,12 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
if(!timer_on_){
//set timeout
timer_on_ = true;
- scheduleTimer(timers_map_.begin()->first - now);
+ scheduleCacheTimer(timers_map_.begin()->first - now);
} else {
//re-schedule the timer because a new interest will expires sooner
if(next_timer > timers_map_.begin()->first){
interests_cache_timer_->cancel();
- scheduleTimer(timers_map_.begin()->first - now);
+ scheduleCacheTimer(timers_map_.begin()->first - now);
}
}
return;
@@ -266,7 +271,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
// else drop packet
}
-void RTCProducerSocket::scheduleTimer(uint64_t wait){
+void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){
interests_cache_timer_->expires_from_now(
std::chrono::milliseconds(wait));
interests_cache_timer_->async_wait([this](std::error_code ec) {
@@ -299,7 +304,7 @@ void RTCProducerSocket::interestCacheTimer(){
timer_on_ = false;
}else{
timer_on_ = true;
- scheduleTimer(timers_map_.begin()->first - now);
+ scheduleCacheTimer(timers_map_.begin()->first - now);
}
}
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index 62aa7a296..a2540ceef 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -42,20 +42,22 @@ class RTCProducerSocket : public ProducerSocket {
private:
void sendNack(uint32_t sequence);
- void updateStats(uint32_t packet_size, uint64_t now);
- void scheduleTimer(uint64_t wait);
+ void updateStats();
+ void scheduleCacheTimer(uint64_t wait);
+ void scheduleRoundTimer();
void interestCacheTimer();
uint32_t currentSeg_;
uint32_t prodLabel_;
uint16_t headerSize_;
Name flowName_;
- uint32_t producedBytes_;
- uint32_t producedPackets_;
+ std::atomic<uint32_t> producedBytes_;
+ std::atomic<uint32_t> producedPackets_;
std::atomic<uint32_t> bytesProductionRate_;
std::atomic<uint32_t> packetsProductionRate_;
uint32_t perSecondFactor_;
- uint64_t lastStats_;
+
+ std::unique_ptr<asio::steady_timer> round_timer_;
// cache for the received interests
// this map maps the expiration time of an interest to