diff options
author | Alberto Compagno <acompagn+fdio@cisco.com> | 2019-10-15 18:08:41 +0200 |
---|---|---|
committer | Alberto Compagno <acompagn+fdio@cisco.com> | 2019-10-22 11:01:41 +0200 |
commit | 755c6833ae2d2eee87e80ed3b84c75e968f48c46 (patch) | |
tree | 653345beb889acabc83b3b3b03e849fa34b1baac /libtransport/src/hicn/transport/interfaces | |
parent | 7204bac00804448a797d4e76ced04a3b84d0d741 (diff) |
[HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe
Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76
Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces')
11 files changed, 1578 insertions, 1203 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index a5cca78a6..0c2c73623 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -28,6 +28,7 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.cc ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.cc b/libtransport/src/hicn/transport/interfaces/callbacks.cc new file mode 100644 index 000000000..af470898c --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/callbacks.cc @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "callbacks.h" + +namespace transport { + +namespace interface { + +nullptr_t VOID_HANDLER = nullptr; + +} // namespace interface + +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 24f47eb75..41b6213fe 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -18,6 +18,8 @@ #include <functional> #include <system_error> +#include <hicn/transport/core/facade.h> + namespace utils { class MemBuf; } @@ -105,6 +107,8 @@ using ProducerContentObjectCallback = using ProducerInterestCallback = std::function<void(ProducerSocket &, core::Interest &)>; +extern nullptr_t VOID_HANDLER; + } // namespace interface } // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index e7dc98868..c1a45ebb7 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -13,6 +13,7 @@ * limitations under the License. */ +#include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/rtc_socket_producer.h> #include <stdlib.h> #include <time.h> @@ -31,9 +32,10 @@ #define HICN_MAX_DATA_SEQ 0xefffffff -//slow production rate param -#define MIN_PRODUCTION_RATE 10 // in pacekts per sec. this value is computed - // through experiments +// slow production rate param +#define MIN_PRODUCTION_RATE \ + 10 // in pacekts per sec. this value is computed + // through experiments #define LIFETIME_FRACTION 0.5 // NACK HEADER @@ -63,13 +65,12 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -82,13 +83,12 @@ RTCProducerSocket::RTCProducerSocket() bytesProductionRate_(INIT_PACKET_PRODUCTION_RATE * 1400), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), - timer_on_(false){ + timer_on_(false) { srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); - interests_cache_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); - round_timer_ = std::make_unique<asio::steady_timer>( - this->getIoService()); + interests_cache_timer_ = + std::make_unique<asio::steady_timer>(this->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(this->getIoService()); setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 10000U); scheduleRoundTimer(); } @@ -113,13 +113,13 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::scheduleRoundTimer(){ - round_timer_->expires_from_now( - std::chrono::milliseconds(STATS_INTERVAL_DURATION)); - round_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - updateStats(); - }); +void RTCProducerSocket::scheduleRoundTimer() { + round_timer_->expires_from_now( + std::chrono::milliseconds(STATS_INTERVAL_DURATION)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(); + }); } void RTCProducerSocket::updateStats() { @@ -150,8 +150,8 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { producedBytes_ += (uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN); producedPackets_++; - auto content_object = std::make_shared<ContentObject>( - flowName_.setSuffix(currentSeg_.load())); + auto content_object = + std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_.load())); auto payload = utils::MemBuf::create(TIMESTAMP_LEN); memcpy(payload->writableData(), &now, TIMESTAMP_LEN); @@ -166,27 +166,26 @@ void RTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) { output_buffer_.insert(std::static_pointer_cast<ContentObject>( content_object->shared_from_this())); - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + if (on_content_object_in_output_buffer_) { on_content_object_in_output_buffer_(*this, *content_object); } portal_->sendContentObject(*content_object); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } uint32_t old_curr = currentSeg_.load(); currentSeg_ = (currentSeg_.load() + 1) % HICN_MAX_DATA_SEQ; - //remove interests from the interest cache if it exists - //this generates nacks that will tell to the consumer - //that a new data packet was produced - if(!seqs_map_.empty()){ + // remove interests from the interest cache if it exists + // this generates nacks that will tell to the consumer + // that a new data packet was produced + if (!seqs_map_.empty()) { utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it = seqs_map_.begin(); it != seqs_map_.end(); it++){ - if(it->first != old_curr) - sendNack(it->first); + for (auto it = seqs_map_.begin(); it != seqs_map_.end(); it++) { + if (it->first != old_curr) sendNack(it->first); } seqs_map_.clear(); timers_map_.clear(); @@ -197,15 +196,15 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { uint32_t interestSeg = interest->getName().getSuffix(); uint32_t lifetime = interest->getLifetime(); - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, *interest); } uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - if(interestSeg > HICN_MAX_DATA_SEQ){ + if (interestSeg > HICN_MAX_DATA_SEQ) { sendNack(interestSeg); return; } @@ -214,71 +213,73 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { output_buffer_.find(*interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, *interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); return; } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, *interest); } } // if the production rate is less than MIN_PRODUCTION_RATE we put the // interest in a queue, otherwise we handle it in the usual way - if(packetsProductionRate_.load() < MIN_PRODUCTION_RATE && - interestSeg >= currentSeg_.load()){ - + if (packetsProductionRate_.load() < MIN_PRODUCTION_RATE && + interestSeg >= currentSeg_.load()) { utils::SpinLock::Acquire locked(interests_cache_lock_); uint64_t next_timer = ~0; - if(!timers_map_.empty()){ + if (!timers_map_.empty()) { next_timer = timers_map_.begin()->first; } uint64_t expiration = now + (lifetime * LIFETIME_FRACTION); - //check if the seq number exists already + // check if the seq number exists already auto it_seqs = seqs_map_.find(interestSeg); - if(it_seqs != seqs_map_.end()){ - //the seq already exists - if(expiration < it_seqs->second){ + if (it_seqs != seqs_map_.end()) { + // the seq already exists + if (expiration < it_seqs->second) { // we need to update the timer becasue we got a smaller one // 1) remove the entry from the multimap // 2) update this entry auto range = timers_map_.equal_range(it_seqs->second); - for(auto it_timers = range.first; it_timers != range.second; it_timers++){ - if(it_timers->second == it_seqs->first){ + for (auto it_timers = range.first; it_timers != range.second; + it_timers++) { + if (it_timers->second == it_seqs->first) { timers_map_.erase(it_timers); break; } } - timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); it_seqs->second = expiration; - }else{ - //nothing to do here - return; + } else { + // nothing to do here + return; } - }else{ + } else { // add the new seq - timers_map_.insert(std::pair<uint64_t,uint32_t>(expiration, interestSeg)); - seqs_map_.insert(std::pair<uint32_t,uint64_t>(interestSeg, expiration)); + timers_map_.insert( + std::pair<uint64_t, uint32_t>(expiration, interestSeg)); + seqs_map_.insert(std::pair<uint32_t, uint64_t>(interestSeg, expiration)); } - //here we have at least one interest in the queue, we need to start or - //update the timer - if(!timer_on_){ - //set timeout + // here we have at least one interest in the queue, we need to start or + // update the timer + if (!timer_on_) { + // set timeout timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } else { - //re-schedule the timer because a new interest will expires sooner - if(next_timer > timers_map_.begin()->first){ + // re-schedule the timer because a new interest will expires sooner + if (next_timer > timers_map_.begin()->first) { interests_cache_timer_->cancel(); scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -292,44 +293,43 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_.load() || - interestSeg > (max_gap + currentSeg_.load())) { + interestSeg > (max_gap + currentSeg_.load())) { sendNack(interestSeg); } // else drop packet } -void RTCProducerSocket::scheduleCacheTimer(uint64_t wait){ - interests_cache_timer_->expires_from_now( - std::chrono::milliseconds(wait)); +void RTCProducerSocket::scheduleCacheTimer(uint64_t wait) { + interests_cache_timer_->expires_from_now(std::chrono::milliseconds(wait)); interests_cache_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - interestCacheTimer(); - }); + if (ec) return; + interestCacheTimer(); + }); } -void RTCProducerSocket::interestCacheTimer(){ +void RTCProducerSocket::interestCacheTimer() { uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); utils::SpinLock::Acquire locked(interests_cache_lock_); - for(auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();){ + for (auto it_timers = timers_map_.begin(); it_timers != timers_map_.end();) { uint64_t expire = it_timers->first; - if(expire <= now){ + if (expire <= now) { uint32_t seq = it_timers->second; sendNack(seq); - //remove the interest from the other map + // remove the interest from the other map seqs_map_.erase(seq); it_timers = timers_map_.erase(it_timers); - }else{ - //stop, we are done! + } else { + // stop, we are done! break; } } - if(timers_map_.empty()){ + if (timers_map_.empty()) { timer_on_ = false; - }else{ + } else { timer_on_ = true; scheduleCacheTimer(timers_map_.begin()->first - now); } @@ -351,7 +351,7 @@ void RTCProducerSocket::sendNack(uint32_t sequence) { nack.setLifetime(0); nack.setPathLabel(prodLabel_); - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, nack); } diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h index 90f6a3ef6..f0194880a 100644 --- a/libtransport/src/hicn/transport/interfaces/socket.h +++ b/libtransport/src/hicn/transport/interfaces/socket.h @@ -27,8 +27,6 @@ #define SOCKET_OPTION_NOT_SET 3 #define SOCKET_OPTION_DEFAULT 12345 -#define VOID_HANDLER 0 - namespace transport { namespace interface { diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index e1afd2161..6eae23c85 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -45,7 +45,6 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) rate_estimation_alpha_(default_values::rate_alpha), rate_estimation_observer_(nullptr), rate_estimation_choice_(0), - is_async_(false), verifier_(std::make_shared<utils::Verifier>()), verify_signature_(false), on_interest_output_(VOID_HANDLER), @@ -58,8 +57,8 @@ ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) stats_summary_(VOID_HANDLER), read_callback_(nullptr), virtual_download_(false), - rtt_stats_(false), - timer_interval_milliseconds_(0) { + timer_interval_milliseconds_(0), + guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = std::make_unique<CbrTransportProtocol>(this); @@ -88,7 +87,6 @@ int ConsumerSocket::consume(const Name &name) { network_name_ = name; network_name_.setSuffix(0); - is_async_ = false; transport_protocol_->start(); @@ -100,7 +98,6 @@ int ConsumerSocket::asyncConsume(const Name &name) { async_downloader_.add([this, name]() { network_name_ = std::move(name); network_name_.setSuffix(0); - is_async_ = true; transport_protocol_->start(); }); } @@ -108,20 +105,6 @@ int ConsumerSocket::asyncConsume(const Name &name) { return CONSUMER_RUNNING; } -void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback) { - if (!async_downloader_.stopped()) { - // TODO Workaround, to be fixed! - auto i = interest.release(); - async_downloader_.add([this, i, callback]() mutable { - Interest::Ptr _interest(i); - portal_->setConsumerCallback(callback); - portal_->sendInterest(std::move(_interest)); - portal_->runEventsLoop(); - }); - } -} - void ConsumerSocket::stop() { if (transport_protocol_->isRunning()) { transport_protocol_->stop(); @@ -138,6 +121,702 @@ asio::io_service &ConsumerSocket::getIoService() { return portal_->getIoService(); } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ConsumerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (transport_protocol_->isRunning()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ReadCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + ReadCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + *socket_option_value = read_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + double socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case MIN_WINDOW_SIZE: + min_window_size_ = socket_option_value; + break; + + case MAX_WINDOW_SIZE: + max_window_size_ = socket_option_value; + break; + + case CURRENT_WINDOW_SIZE: + current_window_size_ = socket_option_value; + break; + + case GAMMA_VALUE: + gamma_ = socket_option_value; + break; + + case BETA_VALUE: + beta_ = socket_option_value; + break; + + case DROP_FACTOR: + drop_factor_ = socket_option_value; + break; + + case MINIMUM_DROP_PROBABILITY: + minimum_drop_probability_ = socket_option_value; + break; + + case RATE_ESTIMATION_ALPHA: + if (socket_option_value >= 0 && socket_option_value < 1) { + rate_estimation_alpha_ = socket_option_value; + } else { + rate_estimation_alpha_ = default_values::alpha; + } + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + max_retransmissions_ = socket_option_value; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + interest_lifetime_ = socket_option_value; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + if (socket_option_value > 0) { + rate_estimation_batching_parameter_ = socket_option_value; + } else { + rate_estimation_batching_parameter_ = default_values::batch; + } + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + if (socket_option_value > 0) { + rate_estimation_choice_ = socket_option_value; + } else { + rate_estimation_choice_ = default_values::rate_choice; + } + break; + + case GeneralTransportOptions::STATS_INTERVAL: + timer_interval_milliseconds_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, std::nullptr_t socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + if (socket_option_value == VOID_HANDLER) { + on_interest_retransmission_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + if (socket_option_value == VOID_HANDLER) { + on_interest_timeout_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_output_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_input_ = VOID_HANDLER; + break; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_verification_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case OtherOptions::VIRTUAL_DOWNLOAD: + virtual_download_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + verify_signature_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + on_content_object_input_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + on_content_object_verification_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + on_interest_retransmission_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + on_interest_output_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + on_interest_timeout_ = socket_option_value; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + on_interest_satisfied_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerManifestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerManifestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + on_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + rate_estimation_observer_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Verifier> &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + verifier_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: + return result; + } + } + + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + int result = SOCKET_OPTION_NOT_SET; + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::CERTIFICATE: + key_id_ = verifier_->addKeyFromCertificate(socket_option_value); + + if (key_id_ != nullptr) { + result = SOCKET_OPTION_SET; + } + break; + + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + } + return result; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + stats_summary_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + double &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MIN_WINDOW_SIZE: + socket_option_value = min_window_size_; + break; + + case GeneralTransportOptions::MAX_WINDOW_SIZE: + socket_option_value = max_window_size_; + break; + + case GeneralTransportOptions::CURRENT_WINDOW_SIZE: + socket_option_value = current_window_size_; + break; + + // RAAQM parameters + + case RaaqmTransportOptions::GAMMA_VALUE: + socket_option_value = gamma_; + break; + + case RaaqmTransportOptions::BETA_VALUE: + socket_option_value = beta_; + break; + + case RaaqmTransportOptions::DROP_FACTOR: + socket_option_value = drop_factor_; + break; + + case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: + socket_option_value = minimum_drop_probability_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_ALPHA: + socket_option_value = rate_estimation_alpha_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case GeneralTransportOptions::MAX_INTEREST_RETX: + socket_option_value = max_retransmissions_; + break; + + case GeneralTransportOptions::INTEREST_LIFETIME: + socket_option_value = interest_lifetime_; + break; + + case RaaqmTransportOptions::SAMPLE_NUMBER: + socket_option_value = sample_number_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + socket_option_value = rate_estimation_batching_parameter_; + break; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + socket_option_value = rate_estimation_choice_; + break; + + case GeneralTransportOptions::STATS_INTERVAL: + socket_option_value = timer_interval_milliseconds_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::RUNNING: + socket_option_value = transport_protocol_->isRunning(); + break; + + case OtherOptions::VIRTUAL_DOWNLOAD: + socket_option_value = virtual_download_; + break; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + socket_option_value = verify_signature_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + Name **socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + *socket_option_value = &network_name_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + *socket_option_value = &on_content_object_input_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerContentObjectVerificationCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + *socket_option_value = &on_content_object_verification_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + *socket_option_value = &on_interest_retransmission_; + break; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + *socket_option_value = &on_interest_output_; + break; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + *socket_option_value = &on_interest_timeout_; + break; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + *socket_option_value = &on_interest_satisfied_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerManifestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerManifestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + *socket_option_value = &on_manifest_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); + switch (socket_option_key) { + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Verifier> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::VERIFIER: + socket_option_value = verifier_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerTimerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ConsumerTimerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::STATS_SUMMARY: + *socket_option_value = &stats_summary_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + } // namespace interface } // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 8f7a9718c..e3620b269 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -179,20 +179,6 @@ class ConsumerSocket : public BaseSocket { int asyncConsume(const Name &name); /** - * Send an interest asynchronously in another thread, which is the same used - * for asyncConsume. - * - * @param interest - An Interest::Ptr to the interest. Notice that the - * application looses the ownership of the interest, which is transferred to - * the library itself. - * @param callback - A ConsumerCallback containing the events to be trigger in - * case of timeout or content reception. - * - */ - void asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback); - - /** * Stops the consumer socket. If several downloads are queued (using * asyncConsume), this call stops just the current one. */ @@ -211,595 +197,94 @@ class ConsumerSocket : public BaseSocket { */ asio::io_service &getIoService() override; - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ReadCallback *socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - read_callback_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ReadCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - *socket_option_value = read_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - double socket_option_value) { - switch (socket_option_key) { - case MIN_WINDOW_SIZE: - min_window_size_ = socket_option_value; - break; - - case MAX_WINDOW_SIZE: - max_window_size_ = socket_option_value; - break; - - case CURRENT_WINDOW_SIZE: - current_window_size_ = socket_option_value; - break; - - case GAMMA_VALUE: - gamma_ = socket_option_value; - break; - - case BETA_VALUE: - beta_ = socket_option_value; - break; - - case DROP_FACTOR: - drop_factor_ = socket_option_value; - break; - - case MINIMUM_DROP_PROBABILITY: - minimum_drop_probability_ = socket_option_value; - break; - - case RATE_ESTIMATION_ALPHA: - if (socket_option_value >= 0 && socket_option_value < 1) { - rate_estimation_alpha_ = socket_option_value; - } else { - rate_estimation_alpha_ = default_values::alpha; - } - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - input_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - max_retransmissions_ = socket_option_value; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - interest_lifetime_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - if (socket_option_value == VOID_HANDLER) { - on_interest_retransmission_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - if (socket_option_value == VOID_HANDLER) { - on_interest_timeout_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_output_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_input_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - if (socket_option_value > 0) { - rate_estimation_batching_parameter_ = socket_option_value; - } else { - rate_estimation_batching_parameter_ = default_values::batch; - } - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - if (socket_option_value > 0) { - rate_estimation_choice_ = socket_option_value; - } else { - rate_estimation_choice_ = default_values::rate_choice; - } - break; - - case GeneralTransportOptions::STATS_INTERVAL: - timer_interval_milliseconds_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case OtherOptions::VIRTUAL_DOWNLOAD: - virtual_download_ = socket_option_value; - break; - - case RaaqmTransportOptions::RTT_STATS: - rtt_stats_ = socket_option_value; - break; - - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - network_name_ = *socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ConsumerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - on_content_object_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ReadCallback *socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ReadCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + double socket_option_value); - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); + + virtual int setSocketOption(int socket_option_key, bool socket_option_value); + + virtual int setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value); + + virtual int setSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - on_interest_retransmission_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - on_interest_output_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - on_interest_timeout_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - on_interest_satisfied_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerManifestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - on_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, IcnObserver *socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - rate_estimation_observer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + ConsumerContentObjectVerificationCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + IcnObserver *socket_option_value); + + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - verifier_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - break; - } - - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerTimerCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - stats_summary_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - double &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MIN_WINDOW_SIZE: - socket_option_value = min_window_size_; - break; - - case GeneralTransportOptions::MAX_WINDOW_SIZE: - socket_option_value = max_window_size_; - break; - - case GeneralTransportOptions::CURRENT_WINDOW_SIZE: - socket_option_value = current_window_size_; - break; - - // RAAQM parameters - - case RaaqmTransportOptions::GAMMA_VALUE: - socket_option_value = gamma_; - break; - - case RaaqmTransportOptions::BETA_VALUE: - socket_option_value = beta_; - break; - - case RaaqmTransportOptions::DROP_FACTOR: - socket_option_value = drop_factor_; - break; - - case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: - socket_option_value = minimum_drop_probability_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_ALPHA: - socket_option_value = rate_estimation_alpha_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_size_; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_size_; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - socket_option_value = max_retransmissions_; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - socket_option_value = interest_lifetime_; - break; - - case RaaqmTransportOptions::SAMPLE_NUMBER: - socket_option_value = sample_number_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - socket_option_value = rate_estimation_batching_parameter_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - socket_option_value = rate_estimation_choice_; - break; - - case GeneralTransportOptions::STATS_INTERVAL: - socket_option_value = timer_interval_milliseconds_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::ASYNC_MODE: - socket_option_value = is_async_; - break; + const std::shared_ptr<utils::Verifier> &socket_option_value); - case GeneralTransportOptions::RUNNING: - socket_option_value = transport_protocol_->isRunning(); - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case OtherOptions::VIRTUAL_DOWNLOAD: - socket_option_value = virtual_download_; - break; - - case RaaqmTransportOptions::RTT_STATS: - socket_option_value = rtt_stats_; - break; + virtual int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value); - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; - break; + virtual int getSocketOption(int socket_option_key, + double &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - Name **socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - *socket_option_value = &network_name_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - ConsumerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - *socket_option_value = &on_content_object_input_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + Name **socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - *socket_option_value = &on_interest_retransmission_; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - *socket_option_value = &on_interest_output_; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - *socket_option_value = &on_interest_timeout_; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - *socket_option_value = &on_interest_satisfied_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerManifestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - *socket_option_value = &on_manifest_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, IcnObserver **socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + ConsumerContentObjectVerificationCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerInterestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerManifestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value); + + virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - socket_option_value = verifier_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerTimerCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - *socket_option_value = &stats_summary_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + std::shared_ptr<utils::Verifier> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerTimerCallback **socket_option_value); + + protected: + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); private: asio::io_service internal_io_service_; @@ -808,6 +293,9 @@ class ConsumerSocket : public BaseSocket { std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; + // No need to protect from multiple accesses in the async consumer + // The parameter is accessible only with a getSocketOption and + // set from the consume Name network_name_; int interest_lifetime_; @@ -816,8 +304,6 @@ class ConsumerSocket : public BaseSocket { double max_window_size_; double current_window_size_; uint32_t max_retransmissions_; - size_t output_buffer_size_; - size_t input_buffer_size_; // RAAQM Parameters double minimum_drop_probability_; @@ -832,12 +318,10 @@ class ConsumerSocket : public BaseSocket { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters std::shared_ptr<utils::Verifier> verifier_; PARCKeyId *key_id_; - bool verify_signature_; + std::atomic_bool verify_signature_; ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; @@ -853,12 +337,13 @@ class ConsumerSocket : public BaseSocket { // Virtual download for traffic generator bool virtual_download_; - bool rtt_stats_; uint32_t timer_interval_milliseconds_; // Transport protocol std::unique_ptr<TransportProtocol> transport_protocol_; + + utils::SpinLock guard_raaqm_params_; }; } // namespace interface diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h index 13029e83a..bcf103b8c 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h @@ -31,8 +31,6 @@ static constexpr uint32_t content_object_expiry_time = never_expire_time; // milliseconds -> 50 seconds static constexpr uint32_t content_object_packet_size = 1500; // The ethernet MTU -static constexpr uint32_t producer_socket_input_buffer_size = - 150000; // Interests static constexpr uint32_t producer_socket_output_buffer_size = 150000; // Content Object static constexpr uint32_t log_2_default_buffer_size = 12; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index c21108186..e14f0f412 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -35,22 +35,21 @@ typedef enum { INTEREST_LIFETIME = 107, CONTENT_OBJECT_EXPIRY_TIME = 108, KEY_LOCATOR = 110, - SIGNATURE_TYPE = 111, - MIN_WINDOW_SIZE = 112, - MAX_WINDOW_SIZE = 113, - CURRENT_WINDOW_SIZE = 114, - ASYNC_MODE = 115, - MAKE_MANIFEST = 116, - PORTAL = 117, - RUNNING = 118, - APPLICATION_BUFFER = 119, - HASH_ALGORITHM = 120, - CRYPTO_SUITE = 121, - IDENTITY = 122, - VERIFIER = 123, - CERTIFICATE = 124, - VERIFY_SIGNATURE = 125, - STATS_INTERVAL = 126 + MIN_WINDOW_SIZE = 111, + MAX_WINDOW_SIZE = 112, + CURRENT_WINDOW_SIZE = 113, + ASYNC_MODE = 114, + MAKE_MANIFEST = 115, + PORTAL = 116, + RUNNING = 117, + APPLICATION_BUFFER = 118, + HASH_ALGORITHM = 119, + CRYPTO_SUITE = 120, + IDENTITY = 121, + VERIFIER = 122, + CERTIFICATE = 123, + VERIFY_SIGNATURE = 124, + STATS_INTERVAL = 125 } GeneralTransportOptions; typedef enum { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 9ca004c41..bc93e77c6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -37,10 +37,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) output_buffer_(default_values::producer_socket_output_buffer_size), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), - input_buffer_capacity_(default_values::producer_socket_input_buffer_size), - input_buffer_size_(0), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -98,22 +95,30 @@ void ProducerSocket::listen() { void ProducerSocket::passContentObjectToCallbacks( const std::shared_ptr<ContentObject> &content_object) { if (content_object) { - if (on_new_segment_ != VOID_HANDLER) { - on_new_segment_(*this, *content_object); + if (on_new_segment_) { + io_service_.dispatch([this, content_object]() { + on_new_segment_(*this, *content_object); + }); } - if (on_content_object_to_sign_ != VOID_HANDLER) { - on_content_object_to_sign_(*this, *content_object); + if (on_content_object_to_sign_) { + io_service_.dispatch([this, content_object]() { + on_content_object_to_sign_(*this, *content_object); + }); } - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, *content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, content_object]() { + on_content_object_in_output_buffer_(*this, *content_object); + }); } output_buffer_.insert(content_object); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, *content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, content_object]() { + on_content_object_output_(*this, *content_object); + }); } portal_->sendContentObject(*content_object); @@ -121,15 +126,19 @@ void ProducerSocket::passContentObjectToCallbacks( } void ProducerSocket::produce(ContentObject &content_object) { - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_in_output_buffer_(*this, content_object); + }); } output_buffer_.insert(std::static_pointer_cast<ContentObject>( content_object.shared_from_this())); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_output_(*this, content_object); + }); } portal_->sendContentObject(content_object); @@ -142,6 +151,15 @@ uint32_t ProducerSocket::produce(Name content_name, return 0; } + // Copy the atomic variable to ensure always the same value during the a + // production + std::size_t data_packet_size = data_packet_size_; + uint32_t content_object_expiry_time = content_object_expiry_time_; + HashAlgorithm algo = hash_algorithm_; + bool making_manifest = making_manifest_; + std::shared_ptr<utils::Identity> identity; + getSocketOption(GeneralTransportOptions::IDENTITY, identity); + auto buffer_size = buffer->length(); const std::size_t hash_size = 32; @@ -162,7 +180,7 @@ uint32_t ProducerSocket::produce(Name content_name, std::unique_ptr<utils::CryptoHash> zero_hash; // TODO Manifest may still be used for indexing - if (making_manifest_ && !identity_) { + if (making_manifest && !identity) { throw errors::RuntimeException( "Making manifests without setting producer identity. Aborting."); } @@ -182,18 +200,18 @@ uint32_t ProducerSocket::produce(Name content_name, } format = hf_format; - if (making_manifest_) { + if (making_manifest) { format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity_->getSignatureLength()); - } else if (identity_) { + hf_format_ah, identity->getSignatureLength()); + } else if (identity) { format = hf_format_ah; - signature_length = identity_->getSignatureLength(); + signature_length = identity->getSignatureLength(); } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size_ - header_size; + free_space_for_content = data_packet_size - header_size; uint32_t number_of_segments = uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); @@ -204,9 +222,9 @@ uint32_t ProducerSocket::produce(Name content_name, // TODO allocate space for all the headers - if (making_manifest_) { + if (making_manifest) { auto segment_in_manifest = static_cast<float>( - std::floor(double(data_packet_size_ - manifest_header_size - + std::floor(double(data_packet_size - manifest_header_size - ContentObjectManifest::getManifestHeaderSize()) / (4.0 + 32.0)) - 1.0); @@ -219,8 +237,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); @@ -231,21 +249,21 @@ uint32_t ProducerSocket::produce(Name content_name, uint8_t hash[hash_size]; std::memset(hash, 0, hash_size); zero_hash = std::make_unique<utils::CryptoHash>( - hash, hash_size, static_cast<utils::CryptoHashType>(hash_algorithm_)); + hash, hash_size, static_cast<utils::CryptoHashType>(algo)); } for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest_) { + if (making_manifest) { if (manifest->estimateManifestSize(2) > - data_packet_size_ - manifest_header_size) { + data_packet_size - manifest_header_size) { // Add next manifest manifest->addSuffixHash(current_segment, *zero_hash); // Send the current manifest manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); @@ -258,8 +276,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { @@ -271,7 +289,7 @@ uint32_t ProducerSocket::produce(Name content_name, auto content_object = std::make_shared<ContentObject>( content_name.setSuffix(current_segment), format); - content_object->setLifetime(content_object_expiry_time_); + content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); b->trimStart(free_space_for_content * packaged_segments); @@ -280,7 +298,7 @@ uint32_t ProducerSocket::produce(Name content_name, b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); - if (is_last && making_manifest_) { + if (is_last && making_manifest) { is_last_manifest = true; } else if (is_last) { content_object->setRst(); @@ -293,19 +311,19 @@ uint32_t ProducerSocket::produce(Name content_name, content_object->appendPayload(std::move(b)); - if (making_manifest_) { + if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_); manifest->addSuffixHash(current_segment, hash); - } else if (identity_) { - identity_->getSigner().sign(*content_object); + } else if (identity) { + identity->getSigner().sign(*content_object); } current_segment++; passContentObjectToCallbacks(content_object); } - if (making_manifest_) { + if (making_manifest) { if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } @@ -315,13 +333,15 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } - if (on_content_produced_ != VOID_HANDLER) { - on_content_produced_(*this, std::make_error_code(std::errc(0)), - buffer_size); + if (on_content_produced_) { + io_service_.dispatch([this, buffer_size]() { + on_content_produced_(*this, std::make_error_code(std::errc(0)), + buffer_size); + }); } return current_segment - start_offset; @@ -347,7 +367,7 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, } void ProducerSocket::onInterest(Interest &interest) { - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, interest); } @@ -355,22 +375,571 @@ void ProducerSocket::onInterest(Interest &interest) { output_buffer_.find(interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, interest); } } } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ProducerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock<std::mutex> lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template <typename Lambda, typename arg2> +int ProducerSocket::rescheduleOnIOServiceWithReference( + int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) { + // To enforce type check + std::function<int(int, arg2 &)> func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != this->listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + std::unique_lock<std::mutex> lck(mtx); + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock<std::mutex> lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + + if (!done) { + cv.wait(lck); + } + }); + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && + socket_option_value > 0) { + data_packet_size_ = socket_option_value; + break; + } + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_.setLimit(socket_option_value); + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + Name *socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + served_namespaces_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + hash_algorithm_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CRYPTO_SUITE: + crypto_suite_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr<utils::Identity> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + identity_.reset(); + identity_ = socket_option_value; + } break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = (uint32_t)output_buffer_.getLimit(); + break; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = (uint32_t)data_packet_size_; + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + socket_option_value = served_namespaces_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + ProducerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + *socket_option_value = &on_new_segment_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + *socket_option_value = &on_content_object_in_output_buffer_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + *socket_option_value = &on_content_object_output_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + *socket_option_value = &on_interest_input_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + *socket_option_value = &on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + *socket_option_value = &on_interest_inserted_input_buffer_; + break; + + case CACHE_HIT: + *socket_option_value = &on_interest_satisfied_output_buffer_; + break; + + case CACHE_MISS: + *socket_option_value = &on_interest_process_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = hash_algorithm_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = crypto_suite_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Identity> &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + socket_option_value = identity_; + } break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + asio::io_service &ProducerSocket::getIoService() { return io_service_; } } // namespace interface diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 18adbf4a7..5c617d761 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -91,500 +91,116 @@ class ProducerSocket : public Socket<BasePortal>, onInterest(*interest); }; - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::DATA_PACKET_SIZE: - if (socket_option_value < default_values::max_content_object_size && - socket_option_value > 0) { - data_packet_size_ = socket_option_value; - break; - } - - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - if (socket_option_value >= 1) { - input_buffer_capacity_ = socket_option_value; - break; - } - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_.setLimit(socket_option_value); - break; - - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - content_object_expiry_time_ = socket_option_value; - break; - - case GeneralTransportOptions::SIGNATURE_TYPE: - if (socket_option_value == SOCKET_OPTION_DEFAULT) { - signature_type_ = SHA_256; - } else { - signature_type_ = socket_option_value; - } - - if (signature_type_ == SHA_256 || signature_type_ == RSA_256) { - signature_size_ = 32; - } - - break; - - case ProducerCallbacksOptions::INTEREST_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_input_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::INTEREST_DROP: - if (socket_option_value == VOID_HANDLER) { - on_interest_dropped_input_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::INTEREST_PASS: - if (socket_option_value == VOID_HANDLER) { - on_interest_inserted_input_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CACHE_HIT: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_output_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CACHE_MISS: - if (socket_option_value == VOID_HANDLER) { - on_interest_process_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - if (socket_option_value == VOID_HANDLER) { - on_new_segment_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - if (socket_option_value == VOID_HANDLER) { - on_content_object_to_sign_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_in_output_buffer_ = VOID_HANDLER; - break; - } - - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_output_ = VOID_HANDLER; - break; - } - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - making_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - return SOCKET_OPTION_NOT_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, std::list<Prefix> socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - served_namespaces_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ProducerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - on_new_segment_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - on_content_object_to_sign_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - on_content_object_in_output_buffer_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - on_content_object_output_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, bool socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, Name *socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ProducerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - on_interest_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + std::list<Prefix> socket_option_value); - case ProducerCallbacksOptions::INTEREST_DROP: - on_interest_dropped_input_buffer_ = socket_option_value; - break; + virtual int setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value); - case ProducerCallbacksOptions::INTEREST_PASS: - on_interest_inserted_input_buffer_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value); - case ProducerCallbacksOptions::CACHE_HIT: - on_interest_satisfied_output_buffer_ = socket_option_value; - break; - - case ProducerCallbacksOptions::CACHE_MISS: - on_interest_process_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ProducerContentCallback socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - on_content_produced_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value); - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, HashAlgorithm socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - hash_algorithm_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, utils::CryptoSuite socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CRYPTO_SUITE: - crypto_suite_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Identity> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: - identity_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - ; - } + const std::shared_ptr<utils::Identity> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_capacity_; - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_.getLimit(); - break; + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - case GeneralTransportOptions::DATA_PACKET_SIZE: - socket_option_value = (uint32_t)data_packet_size_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: - socket_option_value = content_object_expiry_time_; - break; - - case GeneralTransportOptions::SIGNATURE_TYPE: - socket_option_value = signature_type_; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - socket_option_value = making_manifest_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + std::list<Prefix> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::list<Prefix> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - socket_option_value = served_namespaces_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ProducerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: - *socket_option_value = &on_new_segment_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: - *socket_option_value = &on_content_object_to_sign_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_READY: - *socket_option_value = &on_content_object_in_output_buffer_; - break; - - case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: - *socket_option_value = &on_content_object_output_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ProducerContentCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - *socket_option_value = &on_content_produced_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ProducerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - *socket_option_value = &on_interest_input_; - break; - - case ProducerCallbacksOptions::INTEREST_DROP: - *socket_option_value = &on_interest_dropped_input_buffer_; - break; - - case ProducerCallbacksOptions::INTEREST_PASS: - *socket_option_value = &on_interest_inserted_input_buffer_; - break; + ProducerContentObjectCallback **socket_option_value); - case CACHE_HIT: - *socket_option_value = &on_interest_satisfied_output_buffer_; - break; + virtual int getSocketOption(int socket_option_key, + ProducerContentCallback **socket_option_value); - case CACHE_MISS: - *socket_option_value = &on_interest_process_; - break; + virtual int getSocketOption(int socket_option_key, + ProducerInterestCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - default: - return SOCKET_OPTION_NOT_GET; - ; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, HashAlgorithm &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = hash_algorithm_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<utils::Identity> &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, utils::CryptoSuite &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = crypto_suite_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - std::shared_ptr<utils::Identity> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::IDENTITY: - if (identity_) { - socket_option_value = identity_; - break; - } - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + template <typename Lambda, typename arg2> + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func); - private: + protected: // Threads std::thread listening_thread_; - protected: asio::io_service internal_io_service_; asio::io_service &io_service_; std::shared_ptr<Portal> portal_; - std::size_t data_packet_size_; - std::list<Prefix> served_namespaces_; - uint32_t content_object_expiry_time_; + std::atomic<size_t> data_packet_size_; + std::list<Prefix> + served_namespaces_; // No need to be threadsafe, this is always modified + // by the application thread + std::atomic<uint32_t> content_object_expiry_time_; // buffers + // ContentStore is thread-safe utils::ContentStore output_buffer_; - private: utils::EventThread async_thread_; int registration_status_; - bool making_manifest_; + std::atomic<bool> making_manifest_; // map for storing sequence numbers for several calls of the publish // function std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; - int signature_type_; - int signature_size_; - - HashAlgorithm hash_algorithm_; - utils::CryptoSuite crypto_suite_; + std::atomic<HashAlgorithm> hash_algorithm_; + std::atomic<utils::CryptoSuite> crypto_suite_; + utils::SpinLock identity_lock_; std::shared_ptr<utils::Identity> identity_; - // buffers - - std::queue<std::shared_ptr<const Interest>> input_buffer_; - std::atomic_size_t input_buffer_capacity_; - std::atomic_size_t input_buffer_size_; - // callbacks - protected: ProducerInterestCallback on_interest_input_; ProducerInterestCallback on_interest_dropped_input_buffer_; ProducerInterestCallback on_interest_inserted_input_buffer_; |