From 755c6833ae2d2eee87e80ed3b84c75e968f48c46 Mon Sep 17 00:00:00 2001 From: Alberto Compagno Date: Tue, 15 Oct 2019 18:08:41 +0200 Subject: [HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76 Signed-off-by: Alberto Compagno --- .../transport/interfaces/rtc_socket_producer.cc | 158 ++++++++++----------- 1 file changed, 79 insertions(+), 79 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc') diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index e7dc98868..c1a45ebb7 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include #include #include #include @@ -31,9 +32,10 @@ #define HICN_MAX_DATA_SEQ 0xefffffff -//slow production rate param -#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed - // through experiments +// slow production rate param +#define MIN_PRODUCTION_RATE \ + 10 // in pacekts per sec. this value is computed + // through experiments #define LIFETIME_FRACTION 0.5 // NACK HEADER @@ -63,13 +65,12 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique( - this->getIoService()); - round_timer_ = std::make_unique( - this->getIoService()); + interests_cache_timer_ = + std::make_unique(this->getIoService()); + round_timer_ = std::make_unique(this->getIoService()); setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -82,13 +83,12 @@ RTCProducerSocket::RTCProducerSocket() bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique( - this->getIoService()); - round_timer_ = std::make_unique( - this->getIoService()); + interests_cache_timer_ = + std::make_unique(this->getIoService()); + round_timer_ = std::make_unique(this->getIoService()); setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -113,13 +113,13 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::scheduleRoundTimer(){ - round_timer_->expires_from_now( - std::chrono::milliseconds(STATS_INTERVAL_DURATION)); - round_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - updateStats(); - }); +void RTCProducerSocket::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(STATS_INTERVAL_DURATION)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); } void RTCProducerSocket::updateStats() { @@ -150,8 +150,8 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - auto content_object = std::make_shared( - flowName_.setSuffix(currentSeg_.load())); + auto content_object = + std::make_shared(flowName_.setSuffix(currentSeg_.load())); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); @@ -166,27 +166,26 @@ void RTCProducerSocket::produce(std::unique_ptr &&buffer) { output_buffer_.insert(std::static_pointer_cast( content_object->shared_from_this())); - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, *content_object); } portal_->sendContentObject(*content_object); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } uint32_t old_curr = currentSeg_.load(); currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ; - //remove interests from the interest cache if it exists - //this generates nacks that will tell to the consumer - //that a new data packet was produced - if(!seqs_map_.empty()){ + // remove interests from the interest cache if it exists + // this generates nacks that will tell to the consumer + // that a new data packet was produced + if (!seqs_map_.empty()) { utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it = seqs_map_.begin(); it != seqs_map_.end(); it++){ - if(it->first != old_curr) - sendNack(it->first); + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first != old_curr) sendNack(it->first); } seqs_map_.clear(); timers_map_.clear(); @@ -197,15 +196,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { uint32_t interestSeg = interest->getName().getSuffix(); uint32_t lifetime = interest->getLifetime(); - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, *interest); } uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - if(interestSeg > HICN_MAX_DATA_SEQ){ + if (interestSeg > HICN_MAX_DATA_SEQ) { sendNack(interestSeg); return; } @@ -214,71 +213,73 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { output_buffer_.find(*interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, *interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); return; } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, *interest); } } // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way - if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && - interestSeg >= currentSeg_.load()){ - + if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE && + interestSeg >= currentSeg_.load()) { utils::SpinLock::Acquire locked(interests_cache_lock_); uint64_t next_timer = ~0; - if(!timers_map_.empty()){ + if (!timers_map_.empty()) { next_timer = timers_map_.begin()->first; } uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); - //check if the seq number exists already + // check if the seq number exists already auto it_seqs = seqs_map_.find(interestSeg); - if(it_seqs != seqs_map_.end()){ - //the seq already exists - if(expiration < it_seqs->second){ + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { // we need to update the timer becasue we got a smaller one // 1) remove the entry from the multimap // 2) update this entry auto range = timers_map_.equal_range(it_seqs->second); - for(auto it_timers = range.first; it_timers != range.second; it_timers++){ - if(it_timers->second == it_seqs->first){ + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { timers_map_.erase(it_timers); break; } } - timers_map_.insert(std::pair(expiration, interestSeg)); + timers_map_.insert( + std::pair(expiration, interestSeg)); it_seqs->second = expiration; - }else{ - //nothing to do here - return; + } else { + // nothing to do here + return; } - }else{ + } else { // add the new seq - timers_map_.insert(std::pair(expiration, interestSeg)); - seqs_map_.insert(std::pair(interestSeg, expiration)); + timers_map_.insert( + std::pair(expiration, interestSeg)); + seqs_map_.insert(std::pair(interestSeg, expiration)); } - //here we have at least one interest in the queue, we need to start or - //update the timer - if(!timer_on_){ - //set timeout + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!timer_on_) { + // set timeout timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } else { - //re-schedule the timer because a new interest will expires sooner - if(next_timer > timers_map_.begin()->first){ + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { interests_cache_timer_->cancel(); scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -292,44 +293,43 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_.load() || - interestSeg > (max_gap + currentSeg_.load())) { + interestSeg > (max_gap + currentSeg_.load())) { sendNack(interestSeg); } // else drop packet } -void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){ - interests_cache_timer_->expires_from_now( - std::chrono::milliseconds(wait)); +void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) { + interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait)); interests_cache_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - interestCacheTimer(); - }); + if (ec) return; + interestCacheTimer(); + }); } -void RTCProducerSocket::interestCacheTimer(){ +void RTCProducerSocket::interestCacheTimer() { uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { uint64_t expire = it_timers->first; - if(expire <= now){ + if (expire <= now) { uint32_t seq = it_timers->second; sendNack(seq); - //remove the interest from the other map + // remove the interest from the other map seqs_map_.erase(seq); it_timers = timers_map_.erase(it_timers); - }else{ - //stop, we are done! + } else { + // stop, we are done! break; } } - if(timers_map_.empty()){ + if (timers_map_.empty()) { timer_on_ = false; - }else{ + } else { timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -351,7 +351,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) { nack.setLifetime(0); nack.setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, nack); } -- cgit 1.2.3-korg