From 755c6833ae2d2eee87e80ed3b84c75e968f48c46 Mon Sep 17 00:00:00 2001 From: Alberto Compagno Date: Tue, 15 Oct 2019 18:08:41 +0200 Subject: [HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76 Signed-off-by: Alberto Compagno --- .../hicn/transport/interfaces/socket_producer.cc | 659 +++++++++++++++++++-- 1 file changed, 614 insertions(+), 45 deletions(-) (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.cc') diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 9ca004c41..bc93e77c6 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -37,10 +37,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) output_buffer_(default_values::producer_socket_output_buffer_size), registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - signature_type_(SHA_256), hash_algorithm_(HashAlgorithm::SHA_256), - input_buffer_capacity_(default_values::producer_socket_input_buffer_size), - input_buffer_size_(0), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -98,22 +95,30 @@ void ProducerSocket::listen() { void ProducerSocket::passContentObjectToCallbacks( const std::shared_ptr &content_object) { if (content_object) { - if (on_new_segment_ != VOID_HANDLER) { - on_new_segment_(*this, *content_object); + if (on_new_segment_) { + io_service_.dispatch([this, content_object]() { + on_new_segment_(*this, *content_object); + }); } - if (on_content_object_to_sign_ != VOID_HANDLER) { - on_content_object_to_sign_(*this, *content_object); + if (on_content_object_to_sign_) { + io_service_.dispatch([this, content_object]() { + on_content_object_to_sign_(*this, *content_object); + }); } - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, *content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, content_object]() { + on_content_object_in_output_buffer_(*this, *content_object); + }); } output_buffer_.insert(content_object); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, *content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, content_object]() { + on_content_object_output_(*this, *content_object); + }); } portal_->sendContentObject(*content_object); @@ -121,15 +126,19 @@ void ProducerSocket::passContentObjectToCallbacks( } void ProducerSocket::produce(ContentObject &content_object) { - if (on_content_object_in_output_buffer_ != VOID_HANDLER) { - on_content_object_in_output_buffer_(*this, content_object); + if (on_content_object_in_output_buffer_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_in_output_buffer_(*this, content_object); + }); } output_buffer_.insert(std::static_pointer_cast( content_object.shared_from_this())); - if (on_content_object_output_ != VOID_HANDLER) { - on_content_object_output_(*this, content_object); + if (on_content_object_output_) { + io_service_.dispatch([this, &content_object]() { + on_content_object_output_(*this, content_object); + }); } portal_->sendContentObject(content_object); @@ -142,6 +151,15 @@ uint32_t ProducerSocket::produce(Name content_name, return 0; } + // Copy the atomic variable to ensure always the same value during the a + // production + std::size_t data_packet_size = data_packet_size_; + uint32_t content_object_expiry_time = content_object_expiry_time_; + HashAlgorithm algo = hash_algorithm_; + bool making_manifest = making_manifest_; + std::shared_ptr identity; + getSocketOption(GeneralTransportOptions::IDENTITY, identity); + auto buffer_size = buffer->length(); const std::size_t hash_size = 32; @@ -162,7 +180,7 @@ uint32_t ProducerSocket::produce(Name content_name, std::unique_ptr zero_hash; // TODO Manifest may still be used for indexing - if (making_manifest_ && !identity_) { + if (making_manifest && !identity) { throw errors::RuntimeException( "Making manifests without setting producer identity. Aborting."); } @@ -182,18 +200,18 @@ uint32_t ProducerSocket::produce(Name content_name, } format = hf_format; - if (making_manifest_) { + if (making_manifest) { format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity_->getSignatureLength()); - } else if (identity_) { + hf_format_ah, identity->getSignatureLength()); + } else if (identity) { format = hf_format_ah; - signature_length = identity_->getSignatureLength(); + signature_length = identity->getSignatureLength(); } header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size_ - header_size; + free_space_for_content = data_packet_size - header_size; uint32_t number_of_segments = uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); @@ -204,9 +222,9 @@ uint32_t ProducerSocket::produce(Name content_name, // TODO allocate space for all the headers - if (making_manifest_) { + if (making_manifest) { auto segment_in_manifest = static_cast( - std::floor(double(data_packet_size_ - manifest_header_size - + std::floor(double(data_packet_size - manifest_header_size - ContentObjectManifest::getManifestHeaderSize()) / (4.0 + 32.0)) - 1.0); @@ -219,8 +237,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); @@ -231,21 +249,21 @@ uint32_t ProducerSocket::produce(Name content_name, uint8_t hash[hash_size]; std::memset(hash, 0, hash_size); zero_hash = std::make_unique( - hash, hash_size, static_cast(hash_algorithm_)); + hash, hash_size, static_cast(algo)); } for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest_) { + if (making_manifest) { if (manifest->estimateManifestSize(2) > - data_packet_size_ - manifest_header_size) { + data_packet_size - manifest_header_size) { // Add next manifest manifest->addSuffixHash(current_segment, *zero_hash); // Send the current manifest manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); @@ -258,8 +276,8 @@ uint32_t ProducerSocket::produce(Name content_name, core::ManifestType::INLINE_MANIFEST, hash_algorithm_, is_last_manifest, content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity_->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time_); + identity->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time); if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { @@ -271,7 +289,7 @@ uint32_t ProducerSocket::produce(Name content_name, auto content_object = std::make_shared( content_name.setSuffix(current_segment), format); - content_object->setLifetime(content_object_expiry_time_); + content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); b->trimStart(free_space_for_content * packaged_segments); @@ -280,7 +298,7 @@ uint32_t ProducerSocket::produce(Name content_name, b->append(buffer_size - bytes_segmented); bytes_segmented += (int)(buffer_size - bytes_segmented); - if (is_last && making_manifest_) { + if (is_last && making_manifest) { is_last_manifest = true; } else if (is_last) { content_object->setRst(); @@ -293,19 +311,19 @@ uint32_t ProducerSocket::produce(Name content_name, content_object->appendPayload(std::move(b)); - if (making_manifest_) { + if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_); manifest->addSuffixHash(current_segment, hash); - } else if (identity_) { - identity_->getSigner().sign(*content_object); + } else if (identity) { + identity->getSigner().sign(*content_object); } current_segment++; passContentObjectToCallbacks(content_object); } - if (making_manifest_) { + if (making_manifest) { if (is_last_manifest) { manifest->setFinalManifest(is_last_manifest); } @@ -315,13 +333,15 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity_->getSigner().sign(*manifest); + identity->getSigner().sign(*manifest); passContentObjectToCallbacks(manifest); } - if (on_content_produced_ != VOID_HANDLER) { - on_content_produced_(*this, std::make_error_code(std::errc(0)), - buffer_size); + if (on_content_produced_) { + io_service_.dispatch([this, buffer_size]() { + on_content_produced_(*this, std::make_error_code(std::errc(0)), + buffer_size); + }); } return current_segment - start_offset; @@ -347,7 +367,7 @@ void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, } void ProducerSocket::onInterest(Interest &interest) { - if (on_interest_input_ != VOID_HANDLER) { + if (on_interest_input_) { on_interest_input_(*this, interest); } @@ -355,22 +375,571 @@ void ProducerSocket::onInterest(Interest &interest) { output_buffer_.find(interest); if (content_object) { - if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + if (on_interest_satisfied_output_buffer_) { on_interest_satisfied_output_buffer_(*this, interest); } - if (on_content_object_output_ != VOID_HANDLER) { + if (on_content_object_output_) { on_content_object_output_(*this, *content_object); } portal_->sendContentObject(*content_object); } else { - if (on_interest_process_ != VOID_HANDLER) { + if (on_interest_process_) { on_interest_process_(*this, interest); } } } +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template +int ProducerSocket::rescheduleOnIOService(int socket_option_key, + arg2 socket_option_value, + Lambda lambda_func) { + // To enforce type check + std::function func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + }); + std::unique_lock lck(mtx); + if (!done) { + cv.wait(lck); + } + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +// If the thread calling lambda_func is not the same of io_service, this +// function reschedule the function on it +template +int ProducerSocket::rescheduleOnIOServiceWithReference( + int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) { + // To enforce type check + std::function func = lambda_func; + int result = SOCKET_OPTION_SET; + if (listening_thread_.joinable() && + std::this_thread::get_id() != this->listening_thread_.get_id()) { + std::mutex mtx; + /* Condition variable for the wait */ + std::condition_variable cv; + std::unique_lock lck(mtx); + bool done = false; + io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx, + &cv, &result, &done, &func]() { + std::unique_lock lck(mtx); + done = true; + result = func(socket_option_key, socket_option_value); + + if (!done) { + cv.wait(lck); + } + }); + } else { + result = func(socket_option_key, socket_option_value); + } + + return result; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && + socket_option_value > 0) { + data_packet_size_ = socket_option_value; + break; + } + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_.setLimit(socket_option_value); + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + break; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + break; + } + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + Name *socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::list socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + served_namespaces_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + break; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +int ProducerSocket::setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + hash_algorithm_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CRYPTO_SUITE: + crypto_suite_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, + const std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + identity_.reset(); + identity_ = socket_option_value; + } break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = (uint32_t)output_buffer_.getLimit(); + break; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = (uint32_t)data_packet_size_; + break; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::list &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + socket_option_value = served_namespaces_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + ProducerContentObjectCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentObjectCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + *socket_option_value = &on_new_segment_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + *socket_option_value = &on_content_object_in_output_buffer_; + break; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + *socket_option_value = &on_content_object_output_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerContentCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + *socket_option_value = &on_content_produced_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerInterestCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + *socket_option_value = &on_interest_input_; + break; + + case ProducerCallbacksOptions::INTEREST_DROP: + *socket_option_value = &on_interest_dropped_input_buffer_; + break; + + case ProducerCallbacksOptions::INTEREST_PASS: + *socket_option_value = &on_interest_inserted_input_buffer_; + break; + + case CACHE_HIT: + *socket_option_value = &on_interest_satisfied_output_buffer_; + break; + + case CACHE_MISS: + *socket_option_value = &on_interest_process_; + break; + + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ProducerSocket::getSocketOption( + int socket_option_key, std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = hash_algorithm_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = crypto_suite_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: { + utils::SpinLock::Acquire locked(identity_lock_); + socket_option_value = identity_; + } break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + asio::io_service &ProducerSocket::getIoService() { return io_service_; } } // namespace interface -- cgit 1.2.3-korg