aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/socket_producer.h
diff options
context:
space:
mode:
authorAlberto Compagno <acompagn+fdio@cisco.com>2019-10-15 18:08:41 +0200
committerAlberto Compagno <acompagn+fdio@cisco.com>2019-10-22 11:01:41 +0200
commit755c6833ae2d2eee87e80ed3b84c75e968f48c46 (patch)
tree653345beb889acabc83b3b3b03e849fa34b1baac /libtransport/src/hicn/transport/interfaces/socket_producer.h
parent7204bac00804448a797d4e76ced04a3b84d0d741 (diff)
[HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe
Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76 Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_producer.h')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h508
1 files changed, 62 insertions, 446 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 18adbf4a7..5c617d761 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -91,500 +91,116 @@ class ProducerSocket : public Socket<BasePortal>,
onInterest(*interest);
};
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::DATA_PACKET_SIZE:
- if (socket_option_value < default_values::max_content_object_size &&
- socket_option_value > 0) {
- data_packet_size_ = socket_option_value;
- break;
- }
-
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- if (socket_option_value >= 1) {
- input_buffer_capacity_ = socket_option_value;
- break;
- }
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_.setLimit(socket_option_value);
- break;
-
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- content_object_expiry_time_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::SIGNATURE_TYPE:
- if (socket_option_value == SOCKET_OPTION_DEFAULT) {
- signature_type_ = SHA_256;
- } else {
- signature_type_ = socket_option_value;
- }
-
- if (signature_type_ == SHA_256 || signature_type_ == RSA_256) {
- signature_size_ = 32;
- }
-
- break;
-
- case ProducerCallbacksOptions::INTEREST_INPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_input_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_dropped_input_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_inserted_input_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CACHE_HIT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_satisfied_output_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CACHE_MISS:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_process_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- if (socket_option_value == VOID_HANDLER) {
- on_new_segment_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_to_sign_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_in_output_buffer_ = VOID_HANDLER;
- break;
- }
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_output_ = VOID_HANDLER;
- break;
- }
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- bool socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- making_manifest_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- Name *socket_option_value) {
- return SOCKET_OPTION_NOT_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, std::list<Prefix> socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- served_namespaces_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key,
- ProducerContentObjectCallback socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- on_new_segment_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- on_content_object_to_sign_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- on_content_object_in_output_buffer_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value);
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- on_content_object_output_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key, bool socket_option_value);
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key, Name *socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ProducerInterestCallback socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- on_interest_input_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value);
- case ProducerCallbacksOptions::INTEREST_DROP:
- on_interest_dropped_input_buffer_ = socket_option_value;
- break;
+ virtual int setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value);
- case ProducerCallbacksOptions::INTEREST_PASS:
- on_interest_inserted_input_buffer_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value);
- case ProducerCallbacksOptions::CACHE_HIT:
- on_interest_satisfied_output_buffer_ = socket_option_value;
- break;
-
- case ProducerCallbacksOptions::CACHE_MISS:
- on_interest_process_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ProducerContentCallback socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- on_content_produced_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value);
- default:
- return SOCKET_OPTION_NOT_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value);
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, HashAlgorithm socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- hash_algorithm_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, utils::CryptoSuite socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CRYPTO_SUITE:
- crypto_suite_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Identity> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY:
- identity_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, const std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- output_interface_ = socket_option_value;
- portal_->setOutputInterface(output_interface_);
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- ;
- }
+ const std::shared_ptr<utils::Identity> &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)input_buffer_capacity_;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value);
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_.getLimit();
- break;
+ virtual int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value);
- case GeneralTransportOptions::DATA_PACKET_SIZE:
- socket_option_value = (uint32_t)data_packet_size_;
- break;
+ virtual int getSocketOption(int socket_option_key, bool &socket_option_value);
- case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
- socket_option_value = content_object_expiry_time_;
- break;
-
- case GeneralTransportOptions::SIGNATURE_TYPE:
- socket_option_value = signature_type_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- bool &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- socket_option_value = making_manifest_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::list<Prefix> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- socket_option_value = served_namespaces_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ virtual int getSocketOption(
int socket_option_key,
- ProducerContentObjectCallback **socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
- *socket_option_value = &on_new_segment_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
- *socket_option_value = &on_content_object_to_sign_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
- *socket_option_value = &on_content_object_in_output_buffer_;
- break;
-
- case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
- *socket_option_value = &on_content_object_output_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ProducerContentCallback **socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::CONTENT_PRODUCED:
- *socket_option_value = &on_content_produced_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ProducerInterestCallback **socket_option_value) {
- switch (socket_option_key) {
- case ProducerCallbacksOptions::INTEREST_INPUT:
- *socket_option_value = &on_interest_input_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_DROP:
- *socket_option_value = &on_interest_dropped_input_buffer_;
- break;
-
- case ProducerCallbacksOptions::INTEREST_PASS:
- *socket_option_value = &on_interest_inserted_input_buffer_;
- break;
+ ProducerContentObjectCallback **socket_option_value);
- case CACHE_HIT:
- *socket_option_value = &on_interest_satisfied_output_buffer_;
- break;
+ virtual int getSocketOption(int socket_option_key,
+ ProducerContentCallback **socket_option_value);
- case CACHE_MISS:
- *socket_option_value = &on_interest_process_;
- break;
+ virtual int getSocketOption(int socket_option_key,
+ ProducerInterestCallback **socket_option_value);
- default:
- return SOCKET_OPTION_NOT_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value);
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- ;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, HashAlgorithm &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = hash_algorithm_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(
+ int socket_option_key,
+ std::shared_ptr<utils::Identity> &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, utils::CryptoSuite &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::HASH_ALGORITHM:
- socket_option_value = crypto_suite_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ std::string &socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key,
- std::shared_ptr<utils::Identity> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::IDENTITY:
- if (identity_) {
- socket_option_value = identity_;
- break;
- }
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- socket_option_value = output_interface_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOServiceWithReference(int socket_option_key,
+ arg2 &socket_option_value,
+ Lambda lambda_func);
- private:
+ protected:
// Threads
std::thread listening_thread_;
- protected:
asio::io_service internal_io_service_;
asio::io_service &io_service_;
std::shared_ptr<Portal> portal_;
- std::size_t data_packet_size_;
- std::list<Prefix> served_namespaces_;
- uint32_t content_object_expiry_time_;
+ std::atomic<size_t> data_packet_size_;
+ std::list<Prefix>
+ served_namespaces_; // No need to be threadsafe, this is always modified
+ // by the application thread
+ std::atomic<uint32_t> content_object_expiry_time_;
// buffers
+ // ContentStore is thread-safe
utils::ContentStore output_buffer_;
- private:
utils::EventThread async_thread_;
int registration_status_;
- bool making_manifest_;
+ std::atomic<bool> making_manifest_;
// map for storing sequence numbers for several calls of the publish
// function
std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
- int signature_type_;
- int signature_size_;
-
- HashAlgorithm hash_algorithm_;
- utils::CryptoSuite crypto_suite_;
+ std::atomic<HashAlgorithm> hash_algorithm_;
+ std::atomic<utils::CryptoSuite> crypto_suite_;
+ utils::SpinLock identity_lock_;
std::shared_ptr<utils::Identity> identity_;
- // buffers
-
- std::queue<std::shared_ptr<const Interest>> input_buffer_;
- std::atomic_size_t input_buffer_capacity_;
- std::atomic_size_t input_buffer_size_;
-
// callbacks
- protected:
ProducerInterestCallback on_interest_input_;
ProducerInterestCallback on_interest_dropped_input_buffer_;
ProducerInterestCallback on_interest_inserted_input_buffer_;