diff options
author | Alberto Compagno <acompagn+fdio@cisco.com> | 2019-10-15 18:08:41 +0200 |
---|---|---|
committer | Alberto Compagno <acompagn+fdio@cisco.com> | 2019-10-22 11:01:41 +0200 |
commit | 755c6833ae2d2eee87e80ed3b84c75e968f48c46 (patch) | |
tree | 653345beb889acabc83b3b3b03e849fa34b1baac /libtransport/src | |
parent | 7204bac00804448a797d4e76ced04a3b84d0d741 (diff) |
[HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe
Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76
Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com>
Diffstat (limited to 'libtransport/src')
17 files changed, 1755 insertions, 1365 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index a5cca78a6..0c2c73623 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -28,6 +28,7 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.cc ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.cc b/libtransport/src/hicn/transport/interfaces/callbacks.cc new file mode 100644 index 000000000..af470898c --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/callbacks.cc @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "callbacks.h" + +namespace transport { + +namespace interface { + +nullptr_t VOID_HANDLER = nullptr; + +} // namespace interface + +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 24f47eb75..41b6213fe 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -18,6 +18,8 @@ #include <functional> #include <system_error> +#include <hicn/transport/core/facade.h> + namespace utils { class MemBuf; } @@ -105,6 +107,8 @@ using ProducerContentObjectCallback = using ProducerInterestCallback = std::function<void(ProducerSocket &, core::Interest &)>; +extern nullptr_t VOID_HANDLER; + } // namespace interface } // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index e7dc98868..c1a45ebb7 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/rtc_socket_producer.h> #include <stdlib.h> #include <time.h> @@ -31,9 +32,10 @@ #define HICN_MAX_DATA_SEQ 0xefffffff -//slow production rate param -#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed - // through experiments +// slow production rate param +#define MIN_PRODUCTION_RATE \ + 10 // in pacekts per sec. this value is computed + // through experiments #define LIFETIME_FRACTION 0.5 // NACK HEADER @@ -63,13 +65,12 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -82,13 +83,12 @@ RTCProducerSocket::RTCProducerSocket() bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -113,13 +113,13 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::scheduleRoundTimer(){ - round_timer_->expires_from_now( - std::chrono::milliseconds(STATS_INTERVAL_DURATION)); - round_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - updateStats(); - }); +void RTCProducerSocket::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(STATS_INTERVAL_DURATION)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); } void RTCProducerSocket::updateStats() { @@ -150,8 +150,8 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - auto content_object = std::make_shared<ContentObject>( - flowName_.setSuffix(currentSeg_.load())); + auto content_object = + std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_.load())); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); @@ -166,27 +166,26 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { output_buffer_.insert(std::static_pointer_cast<ContentObject>( content_object->shared_from_this())); - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, *content_object); } portal_->sendContentObject(*content_object); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } uint32_t old_curr = currentSeg_.load(); currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ; - //remove interests from the interest cache if it exists - //this generates nacks that will tell to the consumer - //that a new data packet was produced - if(!seqs_map_.empty()){ + // remove interests from the interest cache if it exists + // this generates nacks that will tell to the consumer + // that a new data packet was produced + if (!seqs_map_.empty()) { utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it = seqs_map_.begin(); it != seqs_map_.end(); it++){ - if(it->first != old_curr) - sendNack(it->first); + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first != old_curr) sendNack(it->first); } seqs_map_.clear(); timers_map_.clear(); @@ -197,15 +196,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { uint32_t interestSeg = interest->getName().getSuffix(); uint32_t lifetime = interest->getLifetime(); - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, *interest); } uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - if(interestSeg > HICN_MAX_DATA_SEQ){ + if (interestSeg > HICN_MAX_DATA_SEQ) { sendNack(interestSeg); return; } @@ -214,71 +213,73 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { output_buffer_.find(*interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, *interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); return; } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, *interest); } } // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way - if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && - interestSeg >= currentSeg_.load()){ - + if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE && + interestSeg >= currentSeg_.load()) { utils::SpinLock::Acquire locked(interests_cache_lock_); uint64_t next_timer = ~0; - if(!timers_map_.empty()){ + if (!timers_map_.empty()) { next_timer = timers_map_.begin()->first; } uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); - //check if the seq number exists already + // check if the seq number exists already auto it_seqs = seqs_map_.find(interestSeg); - if(it_seqs != seqs_map_.end()){ - //the seq already exists - if(expiration < it_seqs->second){ + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { // we need to update the timer becasue we got a smaller one // 1) remove the entry from the multimap // 2) update this entry auto range = timers_map_.equal_range(it_seqs->second); - for(auto it_timers = range.first; it_timers != range.second; it_timers++){ - if(it_timers->second == it_seqs->first){ + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { timers_map_.erase(it_timers); break; } } - timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); it_seqs->second = expiration; - }else{ - //nothing to do here - return; + } else { + // nothing to do here + return; } - }else{ + } else { // add the new seq - timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); - seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration)); + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); + seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration)); } - //here we have at least one interest in the queue, we need to start or - //update the timer - if(!timer_on_){ - //set timeout + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!timer_on_) { + // set timeout timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } else { - //re-schedule the timer because a new interest will expires sooner - if(next_timer > timers_map_.begin()->first){ + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { interests_cache_timer_->cancel(); scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -292,44 +293,43 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_.load() || - interestSeg > (max_gap + currentSeg_.load())) { + interestSeg > (max_gap + currentSeg_.load())) { sendNack(interestSeg); } // else drop packet } -void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){ - interests_cache_timer_->expires_from_now( - std::chrono::milliseconds(wait)); +void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) { + interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait)); interests_cache_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - interestCacheTimer(); - }); + if (ec) return; + interestCacheTimer(); + }); } -void RTCProducerSocket::interestCacheTimer(){ +void RTCProducerSocket::interestCacheTimer() { uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { uint64_t expire = it_timers->first; - if(expire <= now){ + if (expire <= now) { uint32_t seq = it_timers->second; sendNack(seq); - //remove the interest from the other map + // remove the interest from the other map seqs_map_.erase(seq); it_timers = timers_map_.erase(it_timers); - }else{ - //stop, we are done! + } else { + // stop, we are done! break; } } - if(timers_map_.empty()){ + if (timers_map_.empty()) { timer_on_ = false; - }else{ + } else { timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -351,7 +351,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) { nack.setLifetime(0); nack.setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, nack); } diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h index 90f6a3ef6..f0194880a 100644 --- a/libtransport/src/hicn/transport/interfaces/socket.h +++ b/libtransport/src/hicn/transport/interfaces/socket.h @@ -27,8 +27,6 @@ #define SOCKET_OPTION_NOT_SET 3 #define SOCKET_OPTION_DEFAULT 12345 -#define VOID_HANDLER 0 - namespace transport { namespace interface { diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index e1afd2161..6eae23c85 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -45,7 +45,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) rate_estimation_alpha_(default_values::rate_alpha), rate_estimation_observer_(nullptr), rate_estimation_choice_(0), - is_async_(false), verifier_(std::make_shared<utils::Verifier>()), verify_signature_(false), on_interest_output_(VOID_HANDLER), @@ -58,8 +57,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) stats_summary_(VOID_HANDLER), read_callback_(nullptr), virtual_download_(false), - rtt_stats_(false), - timer_interval_milliseconds_(0) { + timer_interval_milliseconds_(0), + guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = std::make_unique<CbrTransportProtocol>(this); @@ -88,7 +87,6 @@ int ConsumerSocket::consume(const Name &name) { network_name_ = name; network_name_.setSuffix(0); - is_async_ = false; transport_protocol_->start(); @@ -100,7 +98,6 @@ int ConsumerSocket::asyncConsume(const Name &name) { async_downloader_.add([this, name]() { network_name_ = std::move(name); network_name_.setSuffix(0); - is_async_ = true; transport_protocol_->start(); }); } @@ -108,20 +105,6 @@ int ConsumerSocket::asyncConsume(const Name &name) { return CONSUMER_RUNNING; } -void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback) { - if (!async_downloader_.stopped()) { - // TODO Workaround, to be fixed! - auto i = interest.release(); - async_downloader_.add([this, i, callback]() mutable { - Interest::Ptr _interest(i); - portal_->setConsumerCallback(callback); - portal_->sendInterest(std::move(_interest)); - portal_->runEventsLoop(); - }); - } -} - void ConsumerSocket::stop() { if (transport_protocol_->isRunning()) { transport_protocol_->stop(); @@ -138,6 +121,702 @@ asio::io_service &ConsumerSocket::getIoService() { return portal_->getIoService(); } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ConsumerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (transport_protocol_->isRunning()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ReadCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + ReadCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + double socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case MIN_WINDOW_SIZE: + min_window_size_ = socket_option_value; + break; + + case MAX_WINDOW_SIZE: + max_window_size_ = socket_option_value; + break; + + case CURRENT_WINDOW_SIZE: + current_window_size_ = socket_option_value; + break; + + case GAMMA_VALUE: + gamma_ = socket_option_value; + break; + + case BETA_VALUE: + beta_ = socket_option_value; + break; + + case DROP_FACTOR: + drop_factor_ = socket_option_value; + break; + + case MINIMUM_DROP_PROBABILITY: + minimum_drop_probability_ = socket_option_value; + break; + + case RATE_ESTIMATION_ALPHA: + if (socket_option_value >= 0 && socket_option_value < 1) { + rate_estimation_alpha_ = socket_option_value; + } else { + rate_estimation_alpha_ = default_values::alpha; + } + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + max_retransmissions_ = socket_option_value; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + interest_lifetime_ = socket_option_value; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + if (socket_option_value > 0) { + rate_estimation_batching_parameter_ = socket_option_value; + } else { + rate_estimation_batching_parameter_ = default_values::batch; + } + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + if (socket_option_value > 0) { + rate_estimation_choice_ = socket_option_value; + } else { + rate_estimation_choice_ = default_values::rate_choice; + } + break; + + case GeneralTransportOptions::STATS_INTERVAL: + timer_interval_milliseconds_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, std::nullptr_t socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + if (socket_option_value == VOID_HANDLER) { + on_interest_retransmission_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + if (socket_option_value == VOID_HANDLER) { + on_interest_timeout_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_output_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_input_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_verification_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case OtherOptions::VIRTUAL_DOWNLOAD: + virtual_download_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + verify_signature_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + on_content_object_input_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + on_content_object_verification_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + on_interest_retransmission_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + on_interest_output_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + on_interest_timeout_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + on_interest_satisfied_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerManifestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerManifestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + on_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + rate_estimation_observer_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Verifier> &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + verifier_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: + return result; + } + } + + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::CERTIFICATE: + key_id_ = verifier_->addKeyFromCertificate(socket_option_value); + + if (key_id_ != nullptr) { + result = SOCKET_OPTION_SET; + } + break; + + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + stats_summary_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + double &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MIN_WINDOW_SIZE: + socket_option_value = min_window_size_; + break; + + case GeneralTransportOptions::MAX_WINDOW_SIZE: + socket_option_value = max_window_size_; + break; + + case GeneralTransportOptions::CURRENT_WINDOW_SIZE: + socket_option_value = current_window_size_; + break; + + // RAAQM parameters + + case RaaqmTransportOptions::GAMMA_VALUE: + socket_option_value = gamma_; + break; + + case RaaqmTransportOptions::BETA_VALUE: + socket_option_value = beta_; + break; + + case RaaqmTransportOptions::DROP_FACTOR: + socket_option_value = drop_factor_; + break; + + case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: + socket_option_value = minimum_drop_probability_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_ALPHA: + socket_option_value = rate_estimation_alpha_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + socket_option_value = max_retransmissions_; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + socket_option_value = interest_lifetime_; + break; + + case RaaqmTransportOptions::SAMPLE_NUMBER: + socket_option_value = sample_number_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + socket_option_value = rate_estimation_batching_parameter_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + socket_option_value = rate_estimation_choice_; + break; + + case GeneralTransportOptions::STATS_INTERVAL: + socket_option_value = timer_interval_milliseconds_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::RUNNING: + socket_option_value = transport_protocol_->isRunning(); + break; + + case OtherOptions::VIRTUAL_DOWNLOAD: + socket_option_value = virtual_download_; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + socket_option_value = verify_signature_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + Name **socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + *socket_option_value = &network_name_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + *socket_option_value = &on_content_object_input_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + *socket_option_value = &on_content_object_verification_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + *socket_option_value = &on_interest_retransmission_; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + *socket_option_value = &on_interest_output_; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + *socket_option_value = &on_interest_timeout_; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + *socket_option_value = &on_interest_satisfied_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerManifestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerManifestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + *socket_option_value = &on_manifest_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Verifier> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + socket_option_value = verifier_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerTimerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + *socket_option_value = &stats_summary_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + } // namespace interface } // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 8f7a9718c..e3620b269 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -179,20 +179,6 @@ class ConsumerSocket : public BaseSocket { int asyncConsume(const Name &name); /** - * Send an interest asynchronously in another thread, which is the same used - * for asyncConsume. - * - * @param interest - An Interest::Ptr to the interest. Notice that the - * application looses the ownership of the interest, which is transferred to - * the library itself. - * @param callback - A ConsumerCallback containing the events to be trigger in - * case of timeout or content reception. - * - */ - void asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback); - - /** * Stops the consumer socket. If several downloads are queued (using * asyncConsume), this call stops just the current one. */ @@ -211,595 +197,94 @@ class ConsumerSocket : public BaseSocket { */ asio::io_service &getIoService() override; - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ReadCallback *socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - read_callback_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ReadCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - *socket_option_value = read_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - double socket_option_value) { - switch (socket_option_key) { - case MIN_WINDOW_SIZE: - min_window_size_ = socket_option_value; - break; - - case MAX_WINDOW_SIZE: - max_window_size_ = socket_option_value; - break; - - case CURRENT_WINDOW_SIZE: - current_window_size_ = socket_option_value; - break; - - case GAMMA_VALUE: - gamma_ = socket_option_value; - break; - - case BETA_VALUE: - beta_ = socket_option_value; - break; - - case DROP_FACTOR: - drop_factor_ = socket_option_value; - break; - - case MINIMUM_DROP_PROBABILITY: - minimum_drop_probability_ = socket_option_value; - break; - - case RATE_ESTIMATION_ALPHA: - if (socket_option_value >= 0 && socket_option_value < 1) { - rate_estimation_alpha_ = socket_option_value; - } else { - rate_estimation_alpha_ = default_values::alpha; - } - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - input_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - max_retransmissions_ = socket_option_value; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - interest_lifetime_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - if (socket_option_value == VOID_HANDLER) { - on_interest_retransmission_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - if (socket_option_value == VOID_HANDLER) { - on_interest_timeout_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_output_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_input_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - if (socket_option_value > 0) { - rate_estimation_batching_parameter_ = socket_option_value; - } else { - rate_estimation_batching_parameter_ = default_values::batch; - } - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - if (socket_option_value > 0) { - rate_estimation_choice_ = socket_option_value; - } else { - rate_estimation_choice_ = default_values::rate_choice; - } - break; - - case GeneralTransportOptions::STATS_INTERVAL: - timer_interval_milliseconds_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case OtherOptions::VIRTUAL_DOWNLOAD: - virtual_download_ = socket_option_value; - break; - - case RaaqmTransportOptions::RTT_STATS: - rtt_stats_ = socket_option_value; - break; - - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - network_name_ = *socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ConsumerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - on_content_object_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ReadCallback *socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ReadCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + double socket_option_value); - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); + + virtual int setSocketOption(int socket_option_key, bool socket_option_value); + + virtual int setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value); + + virtual int setSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - on_interest_retransmission_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - on_interest_output_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - on_interest_timeout_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - on_interest_satisfied_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerManifestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - on_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, IcnObserver *socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - rate_estimation_observer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + ConsumerContentObjectVerificationCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + IcnObserver *socket_option_value); + + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - verifier_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - break; - } - - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerTimerCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - stats_summary_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - double &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MIN_WINDOW_SIZE: - socket_option_value = min_window_size_; - break; - - case GeneralTransportOptions::MAX_WINDOW_SIZE: - socket_option_value = max_window_size_; - break; - - case GeneralTransportOptions::CURRENT_WINDOW_SIZE: - socket_option_value = current_window_size_; - break; - - // RAAQM parameters - - case RaaqmTransportOptions::GAMMA_VALUE: - socket_option_value = gamma_; - break; - - case RaaqmTransportOptions::BETA_VALUE: - socket_option_value = beta_; - break; - - case RaaqmTransportOptions::DROP_FACTOR: - socket_option_value = drop_factor_; - break; - - case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: - socket_option_value = minimum_drop_probability_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_ALPHA: - socket_option_value = rate_estimation_alpha_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_size_; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_size_; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - socket_option_value = max_retransmissions_; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - socket_option_value = interest_lifetime_; - break; - - case RaaqmTransportOptions::SAMPLE_NUMBER: - socket_option_value = sample_number_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - socket_option_value = rate_estimation_batching_parameter_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - socket_option_value = rate_estimation_choice_; - break; - - case GeneralTransportOptions::STATS_INTERVAL: - socket_option_value = timer_interval_milliseconds_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::ASYNC_MODE: - socket_option_value = is_async_; - break; + const std::shared_ptr<utils::Verifier> &socket_option_value); - case GeneralTransportOptions::RUNNING: - socket_option_value = transport_protocol_->isRunning(); - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case OtherOptions::VIRTUAL_DOWNLOAD: - socket_option_value = virtual_download_; - break; - - case RaaqmTransportOptions::RTT_STATS: - socket_option_value = rtt_stats_; - break; + virtual int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value); - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; - break; + virtual int getSocketOption(int socket_option_key, + double &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - Name **socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - *socket_option_value = &network_name_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - ConsumerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - *socket_option_value = &on_content_object_input_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + Name **socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - *socket_option_value = &on_interest_retransmission_; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - *socket_option_value = &on_interest_output_; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - *socket_option_value = &on_interest_timeout_; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - *socket_option_value = &on_interest_satisfied_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerManifestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - *socket_option_value = &on_manifest_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, IcnObserver **socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + ConsumerContentObjectVerificationCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerInterestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerManifestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value); + + virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - socket_option_value = verifier_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerTimerCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - *socket_option_value = &stats_summary_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + std::shared_ptr<utils::Verifier> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerTimerCallback **socket_option_value); + + protected: + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); private: asio::io_service internal_io_service_; @@ -808,6 +293,9 @@ class ConsumerSocket : public BaseSocket { std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; + // No need to protect from multiple accesses in the async consumer + // The parameter is accessible only with a getSocketOption and + // set from the consume Name network_name_; int interest_lifetime_; @@ -816,8 +304,6 @@ class ConsumerSocket : public BaseSocket { double max_window_size_; double current_window_size_; uint32_t max_retransmissions_; - size_t output_buffer_size_; - size_t input_buffer_size_; // RAAQM Parameters double minimum_drop_probability_; @@ -832,12 +318,10 @@ class ConsumerSocket : public BaseSocket { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters std::shared_ptr<utils::Verifier> verifier_; PARCKeyId *key_id_; - bool verify_signature_; + std::atomic_bool verify_signature_; ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; @@ -853,12 +337,13 @@ class ConsumerSocket : public BaseSocket { // Virtual download for traffic generator bool virtual_download_; - bool rtt_stats_; uint32_t timer_interval_milliseconds_; // Transport protocol std::unique_ptr<TransportProtocol> transport_protocol_; + + utils::SpinLock guard_raaqm_params_; }; } // namespace interface diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h index 13029e83a..bcf103b8c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h @@ -31,8 +31,6 @@ static constexpr uint32_t content_object_expiry_time = never_expire_time; // milliseconds -> 50 seconds static constexpr uint32_t content_object_packet_size = 1500; // The ethernet MTU -static constexpr uint32_t producer_socket_input_buffer_size = - 150000; // Interests static constexpr uint32_t producer_socket_output_buffer_size = 150000; // Content Object static constexpr uint32_t log_2_default_buffer_size = 12; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index c21108186..e14f0f412 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -35,22 +35,21 @@ typedef enum { INTEREST_LIFETIME = 107, CONTENT_OBJECT_EXPIRY_TIME = 108, KEY_LOCATOR = 110, - SIGNATURE_TYPE = 111, - MIN_WINDOW_SIZE = 112, - MAX_WINDOW_SIZE = 113, - CURRENT_WINDOW_SIZE = 114, - ASYNC_MODE = 115, - MAKE_MANIFEST = 116, - PORTAL = 117, - RUNNING = 118, - APPLICATION_BUFFER = 119, - HASH_ALGORITHM = 120, - CRYPTO_SUITE = 121, - IDENTITY = 122, - VERIFIER = 123, - CERTIFICATE = 124, - VERIFY_SIGNATURE = 125, - STATS_INTERVAL = 126 + MIN_WINDOW_SIZE = 111, + MAX_WINDOW_SIZE = 112, + CURRENT_WINDOW_SIZE = 113, + ASYNC_MODE = 114, + MAKE_MANIFEST = 115, + PORTAL = 116, + RUNNING = 117, + APPLICATION_BUFFER = 118, + HASH_ALGORITHM = 119, + CRYPTO_SUITE = 120, + IDENTITY = 121, + VERIFIER = 122, + CERTIFICATE = 123, + VERIFY_SIGNATURE = 124, + STATS_INTERVAL = 125 } GeneralTransportOptions; typedef enum { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 9ca004c41..bc93e77c6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -37,10 +37,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) output_buffer_(default_values::producer_socket_output_buffer_size), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), - input_buffer_capacity_(default_values::producer_socket_input_buffer_size), - input_buffer_size_(0), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -98,22 +95,30 @@ void ProducerSocket::listen() { void ProducerSocket::passContentObjectToCallbacks( const std::shared_ptr<ContentObject> &content_object) { if (content_object) { - if (on_new_segment_ != VOID_HANDLER) { - on_new_segment_(*this, *content_object); + if (on_new_segment_) { + io_service_.dispatch([this, content_object]() { + on_new_segment_(*this, *content_object); + }); } - if (on_content_object_to_sign_ != VOID_HANDLER) { - on_content_object_to_sign_(*this, *content_object); + if (on_content_object_to_sign_) { + io_service_.dispatch([this, content_object]() { + on_content_object_to_sign_(*this, *content_object); + }); } - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, *content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, content_object]() { + on_content_object_in_output_buffer_(*this, *content_object); + }); } output_buffer_.insert(content_object); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, *content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, content_object]() { + on_content_object_output_(*this, *content_object); + }); } portal_->sendContentObject(*content_object); @@ -121,15 +126,19 @@ void ProducerSocket::passContentObjectToCallbacks( } void ProducerSocket::produce(ContentObject &content_object) { - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_in_output_buffer_(*this, content_object); + }); } output_buffer_.insert(std::static_pointer_cast<ContentObject>( content_object.shared_from_this())); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_output_(*this, content_object); + }); } portal_->sendContentObject(content_object); @@ -142,6 +151,15 @@ uint32_t ProducerSocket::produce(Name content_name, return 0; } + // Copy the atomic variable to ensure always the same value during the a + // production + std::size_t data_packet_size = data_packet_size_; + uint32_t content_object_expiry_time = content_object_expiry_time_; + HashAlgorithm algo = hash_algorithm_; + bool making_manifest = making_manifest_; + std::shared_ptr<utils::Identity> identity; + getSocketOption(GeneralTransportOptions::IDENTITY, identity); + auto buffer_size = buffer->length(); const std::size_t hash_size = 32; @@ -162,7 +180,7 @@ uint32_t ProducerSocket::produce(Name content_name, std::unique_ptr<utils::CryptoHash> zero_hash; // TODO Manifest may still be used for indexing - if (making_manifest_ && !identity_) { + if (making_manifest && !identity) { throw errors::RuntimeException( "Making manifests without setting producer identity. Aborting."); } @@ -182,18 +200,18 @@ uint32_t ProducerSocket::produce(Name content_name, } format = hf_format; - if (making_manifest_) { + if (making_manifest) { format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity_->getSignatureLength()); - } else if (identity_) { + hf_format_ah, identity->getSignatureLength()); + } else if (identity) { format = hf_format_ah; - signature_length = identity_->getSignatureLength(); + signature_length = identity->getSignatureLength(); } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size_ - header_size; + free_space_for_content = data_packet_size - header_size; uint32_t number_of_segments = uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); @@ -204,9 +222,9 @@ uint32_t ProducerSocket::produce(Name content_name, // TODO allocate space for all the headers - if (making_manifest_) { + if (making_manifest) { auto segment_in_manifest = static_cast<float>( - std::floor(double(data_packet_size_ - manifest_header_size - + std::floor(double(data_packet_size - manifest_header_size - ContentObjectManifest::getManifestHeaderSize()) / (4.0 + 32.0)) - 1.0); @@ -219,8 +237,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); @@ -231,21 +249,21 @@ uint32_t ProducerSocket::produce(Name content_name, uint8_t hash[hash_size]; std::memset(hash, 0, hash_size); zero_hash = std::make_unique<utils::CryptoHash>( - hash, hash_size, static_cast<utils::CryptoHashType>(hash_algorithm_)); + hash, hash_size, static_cast<utils::CryptoHashType>(algo)); } for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest_) { + if (making_manifest) { if (manifest->estimateManifestSize(2) > - data_packet_size_ - manifest_header_size) { + data_packet_size - manifest_header_size) { // Add next manifest manifest->addSuffixHash(current_segment, *zero_hash); // Send the current manifest manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); @@ -258,8 +276,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { @@ -271,7 +289,7 @@ uint32_t ProducerSocket::produce(Name content_name, auto content_object = std::make_shared<ContentObject>( content_name.setSuffix(current_segment), format); - content_object->setLifetime(content_object_expiry_time_); + content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); b->trimStart(free_space_for_content * packaged_segments); @@ -280,7 +298,7 @@ uint32_t ProducerSocket::produce(Name content_name, b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); - if (is_last && making_manifest_) { + if (is_last && making_manifest) { is_last_manifest = true; } else if (is_last) { content_object->setRst(); @@ -293,19 +311,19 @@ uint32_t ProducerSocket::produce(Name content_name, content_object->appendPayload(std::move(b)); - if (making_manifest_) { + if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_); manifest->addSuffixHash(current_segment, hash); - } else if (identity_) { - identity_->getSigner().sign(*content_object); + } else if (identity) { + identity->getSigner().sign(*content_object); } current_segment++; passContentObjectToCallbacks(content_object); } - if (making_manifest_) { + if (making_manifest) { if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } @@ -315,13 +333,15 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } - if (on_content_produced_ != VOID_HANDLER) { - on_content_produced_(*this, std::make_error_code(std::errc(0)), - buffer_size); + if (on_content_produced_) { + io_service_.dispatch([this, buffer_size]() { + on_content_produced_(*this, std::make_error_code(std::errc(0)), + buffer_size); + }); } return current_segment - start_offset; @@ -347,7 +367,7 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, } void ProducerSocket::onInterest(Interest &interest) { - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, interest); } @@ -355,22 +375,571 @@ void ProducerSocket::onInterest(Interest &interest) { output_buffer_.find(interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, interest); } } } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ProducerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ProducerSocket::rescheduleOnIOServiceWithReference( + int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2 &)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != this->listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + std::unique_lock<std::mutex> lck(mtx); + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + + if (!done) { + cv.wait(lck); + } + }); + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && + socket_option_value > 0) { + data_packet_size_ = socket_option_value; + break; + } + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_.setLimit(socket_option_value); + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + Name *socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + served_namespaces_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + hash_algorithm_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CRYPTO_SUITE: + crypto_suite_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Identity> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + identity_.reset(); + identity_ = socket_option_value; + } break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = (uint32_t)output_buffer_.getLimit(); + break; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = (uint32_t)data_packet_size_; + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + socket_option_value = served_namespaces_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + ProducerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + *socket_option_value = &on_new_segment_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + *socket_option_value = &on_content_object_in_output_buffer_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + *socket_option_value = &on_content_object_output_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + *socket_option_value = &on_interest_input_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + *socket_option_value = &on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + *socket_option_value = &on_interest_inserted_input_buffer_; + break; + + case CACHE_HIT: + *socket_option_value = &on_interest_satisfied_output_buffer_; + break; + + case CACHE_MISS: + *socket_option_value = &on_interest_process_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = hash_algorithm_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = crypto_suite_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Identity> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + socket_option_value = identity_; + } break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + asio::io_service &ProducerSocket::getIoService() { return io_service_; } } // namespace interface diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 18adbf4a7..5c617d761 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -91,500 +91,116 @@ class ProducerSocket : public Socket<BasePortal>, onInterest(*interest); }; - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::DATA_PACKET_SIZE: - if (socket_option_value < default_values::max_content_object_size && - socket_option_value > 0) { - data_packet_size_ = socket_option_value; - break; - } - - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - if (socket_option_value >= 1) { - input_buffer_capacity_ = socket_option_value; - break; - } - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_.setLimit(socket_option_value); - break; - - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - content_object_expiry_time_ = socket_option_value; - break; - - case GeneralTransportOptions::SIGNATURE_TYPE: - if (socket_option_value == SOCKET_OPTION_DEFAULT) { - signature_type_ = SHA_256; - } else { - signature_type_ = socket_option_value; - } - - if (signature_type_ == SHA_256 || signature_type_ == RSA_256) { - signature_size_ = 32; - } - - break; - - case ProducerCallbacksOptions::INTEREST_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_input_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::INTEREST_DROP: - if (socket_option_value == VOID_HANDLER) { - on_interest_dropped_input_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::INTEREST_PASS: - if (socket_option_value == VOID_HANDLER) { - on_interest_inserted_input_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CACHE_HIT: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_output_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CACHE_MISS: - if (socket_option_value == VOID_HANDLER) { - on_interest_process_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - if (socket_option_value == VOID_HANDLER) { - on_new_segment_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_in_output_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_output_ = VOID_HANDLER; - break; - } - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - making_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - return SOCKET_OPTION_NOT_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, std::list<Prefix> socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - served_namespaces_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ProducerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - on_new_segment_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - on_content_object_in_output_buffer_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - on_content_object_output_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, bool socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, Name *socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ProducerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - on_interest_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value); - case ProducerCallbacksOptions::INTEREST_DROP: - on_interest_dropped_input_buffer_ = socket_option_value; - break; + virtual int setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value); - case ProducerCallbacksOptions::INTEREST_PASS: - on_interest_inserted_input_buffer_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value); - case ProducerCallbacksOptions::CACHE_HIT: - on_interest_satisfied_output_buffer_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CACHE_MISS: - on_interest_process_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ProducerContentCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - on_content_produced_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value); - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, HashAlgorithm socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - hash_algorithm_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, utils::CryptoSuite socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CRYPTO_SUITE: - crypto_suite_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Identity> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: - identity_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - ; - } + const std::shared_ptr<utils::Identity> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_capacity_; - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_.getLimit(); - break; + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - case GeneralTransportOptions::DATA_PACKET_SIZE: - socket_option_value = (uint32_t)data_packet_size_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - socket_option_value = content_object_expiry_time_; - break; - - case GeneralTransportOptions::SIGNATURE_TYPE: - socket_option_value = signature_type_; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - socket_option_value = making_manifest_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::list<Prefix> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - socket_option_value = served_namespaces_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ProducerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - *socket_option_value = &on_new_segment_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - *socket_option_value = &on_content_object_in_output_buffer_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - *socket_option_value = &on_content_object_output_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ProducerContentCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - *socket_option_value = &on_content_produced_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ProducerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - *socket_option_value = &on_interest_input_; - break; - - case ProducerCallbacksOptions::INTEREST_DROP: - *socket_option_value = &on_interest_dropped_input_buffer_; - break; - - case ProducerCallbacksOptions::INTEREST_PASS: - *socket_option_value = &on_interest_inserted_input_buffer_; - break; + ProducerContentObjectCallback **socket_option_value); - case CACHE_HIT: - *socket_option_value = &on_interest_satisfied_output_buffer_; - break; + virtual int getSocketOption(int socket_option_key, + ProducerContentCallback **socket_option_value); - case CACHE_MISS: - *socket_option_value = &on_interest_process_; - break; + virtual int getSocketOption(int socket_option_key, + ProducerInterestCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - default: - return SOCKET_OPTION_NOT_GET; - ; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, HashAlgorithm &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = hash_algorithm_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Identity> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, utils::CryptoSuite &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = crypto_suite_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - std::shared_ptr<utils::Identity> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: - if (identity_) { - socket_option_value = identity_; - break; - } - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + template <typename Lambda, typename arg2> + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func); - private: + protected: // Threads std::thread listening_thread_; - protected: asio::io_service internal_io_service_; asio::io_service &io_service_; std::shared_ptr<Portal> portal_; - std::size_t data_packet_size_; - std::list<Prefix> served_namespaces_; - uint32_t content_object_expiry_time_; + std::atomic<size_t> data_packet_size_; + std::list<Prefix> + served_namespaces_; // No need to be threadsafe, this is always modified + // by the application thread + std::atomic<uint32_t> content_object_expiry_time_; // buffers + // ContentStore is thread-safe utils::ContentStore output_buffer_; - private: utils::EventThread async_thread_; int registration_status_; - bool making_manifest_; + std::atomic<bool> making_manifest_; // map for storing sequence numbers for several calls of the publish // function std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; - int signature_type_; - int signature_size_; - - HashAlgorithm hash_algorithm_; - utils::CryptoSuite crypto_suite_; + std::atomic<HashAlgorithm> hash_algorithm_; + std::atomic<utils::CryptoSuite> crypto_suite_; + utils::SpinLock identity_lock_; std::shared_ptr<utils::Identity> identity_; - // buffers - - std::queue<std::shared_ptr<const Interest>> input_buffer_; - std::atomic_size_t input_buffer_capacity_; - std::atomic_size_t input_buffer_size_; - // callbacks - protected: ProducerInterestCallback on_interest_input_; ProducerInterestCallback on_interest_dropped_input_buffer_; ProducerInterestCallback on_interest_inserted_input_buffer_; diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc index 9caa2eca7..8da9529d6 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.cc +++ b/libtransport/src/hicn/transport/protocols/protocol.cc @@ -23,23 +23,28 @@ namespace protocol { using namespace interface; TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket) - : socket_(icn_socket), is_running_(false) { + : socket_(icn_socket), is_running_(false), is_first_(false) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); } int TransportProtocol::start() { - // If the protocol is already running, return + // If the protocol is already running, return otherwise set as running if (is_running_) return -1; - // Set the protocol as running - is_running_ = true; - // Reset the protocol state machine reset(); + // Set it is the first time we schedule an interest + is_first_ = true; + // Schedule next interests scheduleNextInterests(); + is_first_ = false; + + // Set the protocol as running + is_running_ = true; + // Start Event loop portal_->runEventsLoop(); diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index 88889bb8c..e4821b6a0 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -15,6 +15,8 @@ #pragma once +#include <atomic> + #include <hicn/transport/interfaces/socket.h> #include <hicn/transport/protocols/packet_manager.h> #include <hicn/transport/protocols/statistics.h> @@ -60,7 +62,9 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, protected: interface::ConsumerSocket *socket_; std::shared_ptr<interface::BasePortal> portal_; - volatile bool is_running_; + std::atomic<bool> is_running_; + // True if it si the first time we schedule an interest + std::atomic<bool> is_first_; TransportStatistics stats_; }; diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index 574693c51..c816158f9 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -318,17 +318,17 @@ void RaaqmTransportProtocol::onContentObject( } // Call application-defined callbacks - ConsumerContentObjectCallback *callback_content_object = nullptr; + ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, &callback_content_object); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_content_object)(*socket_, *content_object); } - ConsumerInterestCallback *callback_interest = nullptr; + ConsumerInterestCallback *callback_interest = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, &callback_interest); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_interest)(*socket_, *interest); } @@ -369,10 +369,10 @@ void RaaqmTransportProtocol::onContentSegment( reassemble(std::move(content_object)); } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == index_manager_->getFinalSuffix())) { - interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; socket_->getSocketOption(READ_CALLBACK, &on_payload); - if (on_payload != nullptr) { + if (on_payload) { on_payload->readSuccess(stats_.getBytesRecv()); } } @@ -404,10 +404,10 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { return; } - ConsumerInterestCallback *callback = nullptr; + ConsumerInterestCallback *callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, &callback); - if (*callback != VOID_HANDLER) { + if (*callback) { (*callback)(*socket_, *interest); } @@ -420,17 +420,17 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { max_rtx)) { stats_.updateRetxCount(1); - callback = nullptr; + callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, &callback); - if (*callback != VOID_HANDLER) { + if (*callback) { (*callback)(*socket_, *interest); } - callback = nullptr; + callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &callback); - if ((*callback) != VOID_HANDLER) { + if (*callback) { (*callback)(*socket_, *interest); } @@ -450,7 +450,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -490,14 +490,14 @@ void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { interest_lifetime); interest->setLifetime(interest_lifetime); - ConsumerInterestCallback *callback = nullptr; + ConsumerInterestCallback *callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &callback); - if (*callback != VOID_HANDLER) { + if (*callback) { callback->operator()(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -516,10 +516,10 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = nullptr; + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; socket_->getSocketOption(READ_CALLBACK, &on_payload); - if (on_payload == nullptr) { + if (on_payload) { throw errors::RuntimeException( "The read callback must be installed in the transport before " "starting " @@ -581,10 +581,10 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, stats_.updateAverageWindowSize(current_window_size_); // Call statistics callback - ConsumerTimerCallback *stats_callback = nullptr; + ConsumerTimerCallback *stats_callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, &stats_callback); - if (*stats_callback != VOID_HANDLER) { + if (*stats_callback) { auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); uint32_t timer_interval_milliseconds = 0; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 620523cbe..e6134f767 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -88,13 +88,13 @@ void RTCTransportProtocol::reset() { lastSegNacked_ = 0; lastReceived_ = 0; lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); highestReceived_ = 0; firstSequenceInRound_ = 0; rtx_timer_used_ = false; - for(int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++){ + for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) { inflightInterests_[i] = {0}; } @@ -191,14 +191,13 @@ void RTCTransportProtocol::updateDelayStats( pathTable_[pathLabel]->insertOwdSample(OWD); pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); - }else{ + } else { pathTable_[pathLabel]->receivedNack(); } } void RTCTransportProtocol::updateStats(uint32_t round_duration) { - if(pathTable_.empty()) - return; + if (pathTable_.empty()) return; if (receivedBytes_ != 0) { double bytesPerSec = @@ -213,68 +212,70 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { it->second->roundEnd(); - if(it->second->isActive()){ - if(it->second->getMinRtt() < minRtt){ + if (it->second->isActive()) { + if (it->second->getMinRtt() < minRtt) { minRtt = it->second->getMinRtt(); producerPathLabels_[0] = it->first; } - if(it->second->getMinRtt() > maxRtt){ + if (it->second->getMinRtt() > maxRtt) { maxRtt = it->second->getMinRtt(); producerPathLabels_[1] = it->first; } } } - if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) - return; //this should not happen + return; // this should not happen - //as a queuing delay we keep the lowest one among the two paths - //if one path is congested the forwarder should decide to do not - //use it so it does not make sense to inform the application - //that maybe we have a problem - if(pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + // as a queuing delay we keep the lowest one among the two paths + // if one path is congested the forwarder should decide to do not + // use it so it does not make sense to inform the application + // that maybe we have a problem + if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() < pathTable_[producerPathLabels_[1]]->getQueuingDealy()) - queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); else - queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { - uint32_t numberTheoricallyReceivedPackets_ = highestReceived_ - firstSequenceInRound_; + uint32_t numberTheoricallyReceivedPackets_ = + highestReceived_ - firstSequenceInRound_; double lossRate = 0; - if(numberTheoricallyReceivedPackets_ != 0) - lossRate = (double)((double)(packetLost_ - lossRecovered_) / (double)numberTheoricallyReceivedPackets_); + if (numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / + (double)numberTheoricallyReceivedPackets_); - if(lossRate < 0) - lossRate = 0; + if (lossRate < 0) lossRate = 0; - if(initied){ + if (initied) { lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + - (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); - }else { - lossRate_ =lossRate; + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + } else { + lossRate_ = lossRate; initied = true; } } if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; - //for the BDP we use the max rtt, so that we calibrate the window on the - //RTT of the slowest path. In this way we are sure that the window will - //never be too small + // for the BDP we use the max rtt, so that we calibrate the window on the + // RTT of the slowest path. In this way we are sure that the window will + // never be too small uint32_t BDP = (uint32_t)ceil( - (estimatedBw_ * (double)((double) pathTable_[producerPathLabels_[1]]->getMinRtt() / - (double)HICN_MILLI_IN_A_SEC) * - HICN_BANDWIDTH_SLACK_FACTOR) / - avgPacketSize_); + (estimatedBw_ * + (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); uint32_t BW = (uint32_t)ceil(estimatedBw_); computeMaxWindow(BW, BDP); ConsumerTimerCallback *stats_callback = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, &stats_callback); - if (*stats_callback != VOID_HANDLER) { - //Send the stats to the app + if (*stats_callback) { + // Send the stats to the app stats_.updateQueuingDelay(queuingDelay_); stats_.updateLossRatio(lossRate_); (*stats_callback)(*socket_, stats_); @@ -334,8 +335,8 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, // currentState = RTC_NORMAL_STATE if (BDPWin != 0) { - maxCWin_ = - (uint32_t)ceil((double)BDPWin + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% + maxCWin_ = (uint32_t)ceil((double)BDPWin + + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% } else { maxCWin_ = min(maxWaintingInterest, maxCWin_); } @@ -380,22 +381,22 @@ void RTCTransportProtocol::increaseWindow() { } } -void RTCTransportProtocol::probeRtt(){ +void RTCTransportProtocol::probeRtt() { time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - //get a random numbe in the probe seq range + &interest_name); + // get a random numbe in the probe seq range std::default_random_engine eng((std::random_device())()); - std::uniform_int_distribution<uint32_t> idis( - HICN_MIN_PROBE_SEQ, HICN_MAX_PROBE_SEQ); + std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ, + HICN_MAX_PROBE_SEQ); probe_seq_number_ = idis(eng); interest_name->setSuffix(probe_seq_number_); - //we considere the probe as a rtx so that we do not incresea inFlightInt + // we considere the probe as a rtx so that we do not incresea inFlightInt received_probe_ = false; sendInterest(interest_name, true); @@ -406,7 +407,6 @@ void RTCTransportProtocol::probeRtt(){ }); } - void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { auto interest = getPacket(); interest->setName(*interest_name); @@ -421,11 +421,11 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, &on_interest_output); - if (*on_interest_output != VOID_HANDLER) { + if (*on_interest_output) { (*on_interest_output)(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { return; } @@ -440,20 +440,20 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { void RTCTransportProtocol::scheduleNextInterests() { checkRound(); - if (!is_running_) return; + if (!is_running_ && !is_first_) return; while (inflightInterestsCount_ < currentCWin_) { Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &interest_name); - interest_name->setSuffix(actualSegment_); + interest_name->setSuffix(actualSegment_); // if the producer socket is not stated (does not reply even with nacks) // we keep asking for something without marking anything as lost (see // timeout). In this way when the producer socket will start the - //consumer socket will not miss any packet - if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){ + // consumer socket will not miss any packet + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { uint32_t pkt = actualSegment_ & modMask_; inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; @@ -477,9 +477,9 @@ void RTCTransportProtocol::scheduleNextInterests() { continue; } - //same if the packet is lost + // same if the packet is lost if (inflightInterests_[pkt].state == lost_ && - inflightInterests_[pkt].sequence == actualSegment_){ + inflightInterests_[pkt].sequence == actualSegment_) { actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; continue; } @@ -489,7 +489,7 @@ void RTCTransportProtocol::scheduleNextInterests() { std::chrono::steady_clock::now().time_since_epoch()) .count(); - //here the packet can be in any state except for lost or recevied + // here the packet can be in any state except for lost or recevied inflightInterests_[pkt].state = sent_; inflightInterests_[pkt].sequence = actualSegment_; actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; @@ -506,22 +506,21 @@ void RTCTransportProtocol::addRetransmissions(uint32_t val) { void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); bool new_rtx = false; for (uint32_t i = start; i < stop; i++) { auto it = interestRetransmissions_.find(i); if (it == interestRetransmissions_.end()) { uint32_t pkt = i & modMask_; - if (lastSegNacked_ <= i && - inflightInterests_[pkt].state != received_) { + if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) { // it must be larger than the last past nack received packetLost_++; interestRetransmissions_[i] = 0; uint32_t pkt = i & modMask_; - //we reset the transmission time setting to now, so that rtx will - //happne in one RTT on waint one inter arrival gap + // we reset the transmission time setting to now, so that rtx will + // happne in one RTT on waint one inter arrival gap inflightInterests_[pkt].transmissionTime = now; new_rtx = true; } @@ -529,10 +528,10 @@ void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { // take care of it } - //in case a new rtx is added to the map we need to run checkRtx() - if(new_rtx){ - if(rtx_timer_used_){ - //if a timer is pending we need to delete it + // in case a new rtx is added to the map we need to run checkRtx() + if (new_rtx) { + if (rtx_timer_used_) { + // if a timer is pending we need to delete it rtx_timer_->cancel(); rtx_timer_used_ = false; } @@ -553,8 +552,8 @@ uint64_t RTCTransportProtocol::retransmit() { it = interestRetransmissions_.begin(); uint64_t smallest_timeout = ULONG_MAX; uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); while (it != interestRetransmissions_.end()) { uint32_t pkt = it->first & modMask_; @@ -580,46 +579,46 @@ uint64_t RTCTransportProtocol::retransmit() { uint64_t rtx_time = now; - if(it->second == 0) { - //first rtx - if(producerPathLabels_[0] != producerPathLabels_[1]){ - //multipath - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && - (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < - HICN_MIN_INTER_ARRIVAL_GAP)){ + if (it->second == 0) { + // first rtx + if (producerPathLabels_[0] != producerPathLabels_[1]) { + // multipath + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { rtx_time = lastReceivedTime_ + - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); - }//else low rate producer, send it immediatly - }else{ - //single path - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer, send it immediatly + } else { + // single path + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < - HICN_MIN_INTER_ARRIVAL_GAP)){ + HICN_MIN_INTER_ARRIVAL_GAP)) { rtx_time = lastReceivedTime_ + - pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); - }//else low rate producer send immediatly + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer send immediatly } - }else{ - //second or plus rtx, wait for the min rtt - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){ + } else { + // second or plus rtx, wait for the min rtt + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) { uint64_t sent_time = inflightInterests_[pkt].transmissionTime; rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); - }//if we don't have info we send it immediatly + } // if we don't have info we send it immediatly } - if(now >= rtx_time){ + if (now >= rtx_time) { inflightInterests_[pkt].transmissionTime = now; it->second++; Name *interest_name = nullptr; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); + &interest_name); interest_name->setSuffix(it->first); sendInterest(interest_name, true); - }else if(rtx_time < smallest_timeout){ + } else if (rtx_time < smallest_timeout) { smallest_timeout = rtx_time; } @@ -629,7 +628,7 @@ uint64_t RTCTransportProtocol::retransmit() { } void RTCTransportProtocol::checkRtx() { - if(interestRetransmissions_.empty()){ + if (interestRetransmissions_.empty()) { rtx_timer_used_ = false; return; } @@ -637,10 +636,10 @@ void RTCTransportProtocol::checkRtx() { uint64_t next_timeout = retransmit(); uint64_t wait = 1; uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - if(next_timeout != ULONG_MAX && now < next_timeout){ - wait = next_timeout - now; + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (next_timeout != ULONG_MAX && now < next_timeout) { + wait = next_timeout - now; } rtx_timer_used_ = true; rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); @@ -656,14 +655,14 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint32_t segmentNumber = interest->getName().getSuffix(); - if(segmentNumber >= HICN_MIN_PROBE_SEQ){ + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { // this is a timeout on a probe, do nothing return; } uint32_t pkt = segmentNumber & modMask_; - if(TRANSPORT_EXPECT_FALSE(!firstPckReceived_)){ + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { inflightInterestsCount_--; // we do nothing, and we keep asking the same stuff over // and over until we get at least a packet @@ -698,7 +697,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { scheduleNextInterests(); } -bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) { +bool RTCTransportProtocol::onNack(const ContentObject &content_object, + bool rtx) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); @@ -708,11 +708,11 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) // if we did not received anything between lastReceived_ + 1 and productionSeg // most likelly some packets got lost - if(lastReceived_ != 0){ + if (lastReceived_ != 0) { addRetransmissions(lastReceived_ + 1, productionSeg); } - if(!rtx){ + if (!rtx) { gotNack_ = true; // we synch the estimated production rate with the actual one estimatedBw_ = (double)productionRate; @@ -722,7 +722,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) // we are asking for stuff produced in the past actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ; - if(!rtx) { + if (!rtx) { if (currentState_ == HICN_RTC_NORMAL_STATE) { currentState_ = HICN_RTC_SYNC_STATE; } @@ -737,7 +737,7 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) } else if (productionSeg < nackSegment) { actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; - if(!rtx){ + if (!rtx) { // we are asking stuff in the future gotFutureNack_++; computeMaxWindow(productionRate, 0); @@ -748,8 +748,8 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) } } } else { - //we are asking the right thing, but the producer is slow - //keep doing the same until the packet is produced + // we are asking the right thing, but the producer is slow + // keep doing the same until the packet is produced actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; } @@ -758,7 +758,6 @@ bool RTCTransportProtocol::onNack(const ContentObject &content_object, bool rtx) void RTCTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // as soon as we get a packet firstPckReceived_ will never be false firstPckReceived_ = true; @@ -770,24 +769,25 @@ void RTCTransportProtocol::onContentObject( ConsumerContentObjectCallback *callback_content_object = nullptr; socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, &callback_content_object); - if (*callback_content_object != VOID_HANDLER) { + if (*callback_content_object) { (*callback_content_object)(*socket_, *content_object); } - if(segmentNumber >= HICN_MIN_PROBE_SEQ){ - if(segmentNumber == probe_seq_number_ && !received_probe_){ + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + if (segmentNumber == probe_seq_number_ && !received_probe_) { received_probe_ = true; uint32_t pathLabel = content_object->getPathLabel(); - if (pathTable_.find(pathLabel) == pathTable_.end()){ - //if this path does not exists we cannot create a new one so drop + if (pathTable_.find(pathLabel) == pathTable_.end()) { + // if this path does not exists we cannot create a new one so drop return; } - //this is the expected probe, update the RTT and drop the packet + // this is the expected probe, update the RTT and drop the packet uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - time_sent_probe_; + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + time_sent_probe_; pathTable_[pathLabel]->insertRttSample(RTT); pathTable_[pathLabel]->receivedNack(); @@ -803,22 +803,23 @@ void RTCTransportProtocol::onContentObject( bool old_nack = false; if (interestRetransmissions_.find(segmentNumber) == - interestRetransmissions_.end()){ - //this is not a retransmitted packet + interestRetransmissions_.end()) { + // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); } else { old_nack = onNack(*content_object, true); } - //the nacked_ state is used only to avoid to decrease inflightInterestsCount_ - //multiple times. In fact, every time that we receive an event related to an - //interest (timeout, nacked, content) we cange the state. In this way we are - //sure that we do not decrease twice the counter - if(old_nack){ + // the nacked_ state is used only to avoid to decrease + // inflightInterestsCount_ multiple times. In fact, every time that we + // receive an event related to an interest (timeout, nacked, content) we + // cange the state. In this way we are sure that we do not decrease twice the + // counter + if (old_nack) { inflightInterests_[pkt].state = lost_; interestRetransmissions_.erase(segmentNumber); - }else{ + } else { inflightInterests_[pkt].state = nacked_; } @@ -827,7 +828,7 @@ void RTCTransportProtocol::onContentObject( ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); receivedBytes_ += (uint32_t)(content_object->headerSize() + - content_object->payloadSize()); + content_object->payloadSize()); if (inflightInterests_[pkt].state == sent_) { inflightInterestsCount_--; // packet sent without timeouts @@ -841,22 +842,21 @@ void RTCTransportProtocol::onContentObject( } addRetransmissions(lastReceived_ + 1, segmentNumber); - if(segmentNumber > highestReceived_){ + if (segmentNumber > highestReceived_) { highestReceived_ = segmentNumber; } - if(segmentNumber > lastReceived_){ + if (segmentNumber > lastReceived_) { lastReceived_ = segmentNumber; - lastReceivedTime_ = std::chrono::duration_cast< - std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + lastReceivedTime_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); } receivedData_++; inflightInterests_[pkt].state = received_; auto it = interestRetransmissions_.find(segmentNumber); - if(it != interestRetransmissions_.end()) - lossRecovered_ ++; + if (it != interestRetransmissions_.end()) lossRecovered_++; interestRetransmissions_.erase(segmentNumber); diff --git a/libtransport/src/hicn/transport/utils/content_store.cc b/libtransport/src/hicn/transport/utils/content_store.cc index 1e6b9fcea..8e3435507 100644 --- a/libtransport/src/hicn/transport/utils/content_store.cc +++ b/libtransport/src/hicn/transport/utils/content_store.cc @@ -85,12 +85,17 @@ void ContentStore::erase(const Name &exact_name) { } void ContentStore::setLimit(size_t max_packets) { + utils::SpinLock::Acquire locked(cs_mutex_); max_content_store_size_ = max_packets; } -std::size_t ContentStore::getLimit() const { return max_content_store_size_; } +std::size_t ContentStore::getLimit() const { + utils::SpinLock::Acquire locked(cs_mutex_); + return max_content_store_size_; +} std::size_t ContentStore::size() const { + utils::SpinLock::Acquire locked(cs_mutex_); return content_store_hash_table_.size(); } diff --git a/libtransport/src/hicn/transport/utils/content_store.h b/libtransport/src/hicn/transport/utils/content_store.h index a89403a01..f7dc41835 100644 --- a/libtransport/src/hicn/transport/utils/content_store.h +++ b/libtransport/src/hicn/transport/utils/content_store.h @@ -68,8 +68,9 @@ class ContentStore { ContentStoreHashTable content_store_hash_table_; FIFOList fifo_list_; std::shared_ptr<ContentObject> empty_reference_; - std::size_t max_content_store_size_; - utils::SpinLock cs_mutex_; + // Must be atomic + std::atomic_size_t max_content_store_size_; + mutable utils::SpinLock cs_mutex_; }; } // end namespace utils
\ No newline at end of file |