aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
diff options
context:
space:
mode:
authorAlberto Compagno <acompagn+fdio@cisco.com>2019-10-15 18:08:41 +0200
committerAlberto Compagno <acompagn+fdio@cisco.com>2019-10-22 11:01:41 +0200
commit755c6833ae2d2eee87e80ed3b84c75e968f48c46 (patch)
tree653345beb889acabc83b3b3b03e849fa34b1baac /libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
parent7204bac00804448a797d4e76ced04a3b84d0d741 (diff)
[HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe
Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76 Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc')
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc158
1 files changed, 79 insertions, 79 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index e7dc98868..c1a45ebb7 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/interfaces/callbacks.h>
#include <hicn/transport/interfaces/rtc_socket_producer.h>
#include <stdlib.h>
#include <time.h>
@@ -31,9 +32,10 @@
#define HICN_MAX_DATA_SEQ 0xefffffff
-//slow production rate param
-#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed
- // through experiments
+// slow production rate param
+#define MIN_PRODUCTION_RATE \
+ 10 // in pacekts per sec. this value is computed
+ // through experiments
#define LIFETIME_FRACTION 0.5
// NACK HEADER
@@ -63,13 +65,12 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false){
+ timer_on_(false) {
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
- interests_cache_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
+ interests_cache_timer_ =
+ std::make_unique<asio::steady_timer>(this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
scheduleRoundTimer();
}
@@ -82,13 +83,12 @@ RTCProducerSocket::RTCProducerSocket()
bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400),
packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION),
- timer_on_(false){
+ timer_on_(false) {
srand((unsigned int)time(NULL));
prodLabel_ = ((rand() % 255) << 24UL);
- interests_cache_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(
- this->getIoService());
+ interests_cache_timer_ =
+ std::make_unique<asio::steady_timer>(this->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService());
setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U);
scheduleRoundTimer();
}
@@ -113,13 +113,13 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) {
}
}
-void RTCProducerSocket::scheduleRoundTimer(){
- round_timer_->expires_from_now(
- std::chrono::milliseconds(STATS_INTERVAL_DURATION));
- round_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- updateStats();
- });
+void RTCProducerSocket::scheduleRoundTimer() {
+ round_timer_->expires_from_now(
+ std::chrono::milliseconds(STATS_INTERVAL_DURATION));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats();
+ });
}
void RTCProducerSocket::updateStats() {
@@ -150,8 +150,8 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN);
producedPackets_++;
- auto content_object = std::make_shared<ContentObject>(
- flowName_.setSuffix(currentSeg_.load()));
+ auto content_object =
+ std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_.load()));
auto payload = utils::MemBuf::create(TIMESTAMP_LEN);
memcpy(payload->writableData(), &now, TIMESTAMP_LEN);
@@ -166,27 +166,26 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
output_buffer_.insert(std::static_pointer_cast<ContentObject>(
content_object->shared_from_this()));
- if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
+ if (on_content_object_in_output_buffer_) {
on_content_object_in_output_buffer_(*this, *content_object);
}
portal_->sendContentObject(*content_object);
- if (on_content_object_output_ != VOID_HANDLER) {
+ if (on_content_object_output_) {
on_content_object_output_(*this, *content_object);
}
uint32_t old_curr = currentSeg_.load();
currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ;
- //remove interests from the interest cache if it exists
- //this generates nacks that will tell to the consumer
- //that a new data packet was produced
- if(!seqs_map_.empty()){
+ // remove interests from the interest cache if it exists
+ // this generates nacks that will tell to the consumer
+ // that a new data packet was produced
+ if (!seqs_map_.empty()) {
utils::SpinLock::Acquire locked(interests_cache_lock_);
- for(auto it = seqs_map_.begin(); it != seqs_map_.end(); it++){
- if(it->first != old_curr)
- sendNack(it->first);
+ for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) {
+ if (it->first != old_curr) sendNack(it->first);
}
seqs_map_.clear();
timers_map_.clear();
@@ -197,15 +196,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
uint32_t interestSeg = interest->getName().getSuffix();
uint32_t lifetime = interest->getLifetime();
- if (on_interest_input_ != VOID_HANDLER) {
+ if (on_interest_input_) {
on_interest_input_(*this, *interest);
}
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
- if(interestSeg > HICN_MAX_DATA_SEQ){
+ if (interestSeg > HICN_MAX_DATA_SEQ) {
sendNack(interestSeg);
return;
}
@@ -214,71 +213,73 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
output_buffer_.find(*interest);
if (content_object) {
- if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) {
+ if (on_interest_satisfied_output_buffer_) {
on_interest_satisfied_output_buffer_(*this, *interest);
}
- if (on_content_object_output_ != VOID_HANDLER) {
+ if (on_content_object_output_) {
on_content_object_output_(*this, *content_object);
}
portal_->sendContentObject(*content_object);
return;
} else {
- if (on_interest_process_ != VOID_HANDLER) {
+ if (on_interest_process_) {
on_interest_process_(*this, *interest);
}
}
// if the production rate is less than MIN_PRODUCTION_RATE we put the
// interest in a queue, otherwise we handle it in the usual way
- if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE &&
- interestSeg >= currentSeg_.load()){
-
+ if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE &&
+ interestSeg >= currentSeg_.load()) {
utils::SpinLock::Acquire locked(interests_cache_lock_);
uint64_t next_timer = ~0;
- if(!timers_map_.empty()){
+ if (!timers_map_.empty()) {
next_timer = timers_map_.begin()->first;
}
uint64_t expiration = now + (lifetime * LIFETIME_FRACTION);
- //check if the seq number exists already
+ // check if the seq number exists already
auto it_seqs = seqs_map_.find(interestSeg);
- if(it_seqs != seqs_map_.end()){
- //the seq already exists
- if(expiration < it_seqs->second){
+ if (it_seqs != seqs_map_.end()) {
+ // the seq already exists
+ if (expiration < it_seqs->second) {
// we need to update the timer becasue we got a smaller one
// 1) remove the entry from the multimap
// 2) update this entry
auto range = timers_map_.equal_range(it_seqs->second);
- for(auto it_timers = range.first; it_timers != range.second; it_timers++){
- if(it_timers->second == it_seqs->first){
+ for (auto it_timers = range.first; it_timers != range.second;
+ it_timers++) {
+ if (it_timers->second == it_seqs->first) {
timers_map_.erase(it_timers);
break;
}
}
- timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg));
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interestSeg));
it_seqs->second = expiration;
- }else{
- //nothing to do here
- return;
+ } else {
+ // nothing to do here
+ return;
}
- }else{
+ } else {
// add the new seq
- timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg));
- seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration));
+ timers_map_.insert(
+ std::pair<uint64_t, uint32_t>(expiration, interestSeg));
+ seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration));
}
- //here we have at least one interest in the queue, we need to start or
- //update the timer
- if(!timer_on_){
- //set timeout
+ // here we have at least one interest in the queue, we need to start or
+ // update the timer
+ if (!timer_on_) {
+ // set timeout
timer_on_ = true;
scheduleCacheTimer(timers_map_.begin()->first - now);
} else {
- //re-schedule the timer because a new interest will expires sooner
- if(next_timer > timers_map_.begin()->first){
+ // re-schedule the timer because a new interest will expires sooner
+ if (next_timer > timers_map_.begin()->first) {
interests_cache_timer_->cancel();
scheduleCacheTimer(timers_map_.begin()->first - now);
}
@@ -292,44 +293,43 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
(double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_.load() ||
- interestSeg > (max_gap + currentSeg_.load())) {
+ interestSeg > (max_gap + currentSeg_.load())) {
sendNack(interestSeg);
}
// else drop packet
}
-void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){
- interests_cache_timer_->expires_from_now(
- std::chrono::milliseconds(wait));
+void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) {
+ interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait));
interests_cache_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- interestCacheTimer();
- });
+ if (ec) return;
+ interestCacheTimer();
+ });
}
-void RTCProducerSocket::interestCacheTimer(){
+void RTCProducerSocket::interestCacheTimer() {
uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
utils::SpinLock::Acquire locked(interests_cache_lock_);
- for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){
+ for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) {
uint64_t expire = it_timers->first;
- if(expire <= now){
+ if (expire <= now) {
uint32_t seq = it_timers->second;
sendNack(seq);
- //remove the interest from the other map
+ // remove the interest from the other map
seqs_map_.erase(seq);
it_timers = timers_map_.erase(it_timers);
- }else{
- //stop, we are done!
+ } else {
+ // stop, we are done!
break;
}
}
- if(timers_map_.empty()){
+ if (timers_map_.empty()) {
timer_on_ = false;
- }else{
+ } else {
timer_on_ = true;
scheduleCacheTimer(timers_map_.begin()->first - now);
}
@@ -351,7 +351,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) {
nack.setLifetime(0);
nack.setPathLabel(prodLabel_);
- if (on_content_object_output_ != VOID_HANDLER) {
+ if (on_content_object_output_) {
on_content_object_output_(*this, nack);
}