diff options
author | Alberto Compagno <acompagn+fdio@cisco.com> | 2019-10-15 18:08:41 +0200 |
---|---|---|
committer | Alberto Compagno <acompagn+fdio@cisco.com> | 2019-10-22 11:01:41 +0200 |
commit | 755c6833ae2d2eee87e80ed3b84c75e968f48c46 (patch) | |
tree | 653345beb889acabc83b3b3b03e849fa34b1baac /libtransport/src/hicn/transport/interfaces/socket_consumer.h | |
parent | 7204bac00804448a797d4e76ced04a3b84d0d741 (diff) |
[HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe
Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76
Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_consumer.h')
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/socket_consumer.h | 673 |
1 files changed, 79 insertions, 594 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 8f7a9718c..e3620b269 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -179,20 +179,6 @@ class ConsumerSocket : public BaseSocket { int asyncConsume(const Name &name); /** - * Send an interest asynchronously in another thread, which is the same used - * for asyncConsume. - * - * @param interest - An Interest::Ptr to the interest. Notice that the - * application looses the ownership of the interest, which is transferred to - * the library itself. - * @param callback - A ConsumerCallback containing the events to be trigger in - * case of timeout or content reception. - * - */ - void asyncSendInterest(Interest::Ptr &&interest, - Portal::ConsumerCallback *callback); - - /** * Stops the consumer socket. If several downloads are queued (using * asyncConsume), this call stops just the current one. */ @@ -211,595 +197,94 @@ class ConsumerSocket : public BaseSocket { */ asio::io_service &getIoService() override; - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ReadCallback *socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - read_callback_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ReadCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::READ_CALLBACK: - *socket_option_value = read_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - double socket_option_value) { - switch (socket_option_key) { - case MIN_WINDOW_SIZE: - min_window_size_ = socket_option_value; - break; - - case MAX_WINDOW_SIZE: - max_window_size_ = socket_option_value; - break; - - case CURRENT_WINDOW_SIZE: - current_window_size_ = socket_option_value; - break; - - case GAMMA_VALUE: - gamma_ = socket_option_value; - break; - - case BETA_VALUE: - beta_ = socket_option_value; - break; - - case DROP_FACTOR: - drop_factor_ = socket_option_value; - break; - - case MINIMUM_DROP_PROBABILITY: - minimum_drop_probability_ = socket_option_value; - break; - - case RATE_ESTIMATION_ALPHA: - if (socket_option_value >= 0 && socket_option_value < 1) { - rate_estimation_alpha_ = socket_option_value; - } else { - rate_estimation_alpha_ = default_values::alpha; - } - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - uint32_t socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - input_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_size_ = socket_option_value; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - max_retransmissions_ = socket_option_value; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - interest_lifetime_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - if (socket_option_value == VOID_HANDLER) { - on_interest_retransmission_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - if (socket_option_value == VOID_HANDLER) { - on_interest_timeout_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - if (socket_option_value == VOID_HANDLER) { - on_interest_satisfied_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - if (socket_option_value == VOID_HANDLER) { - on_interest_output_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - if (socket_option_value == VOID_HANDLER) { - on_content_object_input_ = VOID_HANDLER; - break; - } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - if (socket_option_value > 0) { - rate_estimation_batching_parameter_ = socket_option_value; - } else { - rate_estimation_batching_parameter_ = default_values::batch; - } - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - if (socket_option_value > 0) { - rate_estimation_choice_ = socket_option_value; - } else { - rate_estimation_choice_ = default_values::rate_choice; - } - break; - - case GeneralTransportOptions::STATS_INTERVAL: - timer_interval_milliseconds_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - bool socket_option_value) { - switch (socket_option_key) { - case OtherOptions::VIRTUAL_DOWNLOAD: - virtual_download_ = socket_option_value; - break; - - case RaaqmTransportOptions::RTT_STATS: - rtt_stats_ = socket_option_value; - break; - - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key, - Name *socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - network_name_ = *socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, - ConsumerContentObjectCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - on_content_object_input_ = socket_option_value; - break; + virtual int setSocketOption(int socket_option_key, + ReadCallback *socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ReadCallback **socket_option_value); - default: - return SOCKET_OPTION_NOT_SET; - } + virtual int setSocketOption(int socket_option_key, + double socket_option_value); - return SOCKET_OPTION_SET; - } + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value); - TRANSPORT_ALWAYS_INLINE int setSocketOption( + virtual int setSocketOption(int socket_option_key, + std::nullptr_t socket_option_value); + + virtual int setSocketOption(int socket_option_key, bool socket_option_value); + + virtual int setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value); + + virtual int setSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerInterestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - on_interest_retransmission_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - on_interest_output_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - on_interest_timeout_ = socket_option_value; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - on_interest_satisfied_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerManifestCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - on_manifest_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, IcnObserver *socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - rate_estimation_observer_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( + ConsumerContentObjectVerificationCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value); + + virtual int setSocketOption(int socket_option_key, + IcnObserver *socket_option_value); + + virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - verifier_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, const std::string &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - break; - } - - case DataLinkOptions::OUTPUT_INTERFACE: - output_interface_ = socket_option_value; - portal_->setOutputInterface(output_interface_); - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int setSocketOption( - int socket_option_key, ConsumerTimerCallback socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - stats_summary_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - double &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::MIN_WINDOW_SIZE: - socket_option_value = min_window_size_; - break; - - case GeneralTransportOptions::MAX_WINDOW_SIZE: - socket_option_value = max_window_size_; - break; - - case GeneralTransportOptions::CURRENT_WINDOW_SIZE: - socket_option_value = current_window_size_; - break; - - // RAAQM parameters - - case RaaqmTransportOptions::GAMMA_VALUE: - socket_option_value = gamma_; - break; - - case RaaqmTransportOptions::BETA_VALUE: - socket_option_value = beta_; - break; - - case RaaqmTransportOptions::DROP_FACTOR: - socket_option_value = drop_factor_; - break; - - case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: - socket_option_value = minimum_drop_probability_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_ALPHA: - socket_option_value = rate_estimation_alpha_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - uint32_t &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::INPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)input_buffer_size_; - break; - - case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_size_; - break; - - case GeneralTransportOptions::MAX_INTEREST_RETX: - socket_option_value = max_retransmissions_; - break; - - case GeneralTransportOptions::INTEREST_LIFETIME: - socket_option_value = interest_lifetime_; - break; - - case RaaqmTransportOptions::SAMPLE_NUMBER: - socket_option_value = sample_number_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: - socket_option_value = rate_estimation_batching_parameter_; - break; - - case RateEstimationOptions::RATE_ESTIMATION_CHOICE: - socket_option_value = rate_estimation_choice_; - break; - - case GeneralTransportOptions::STATS_INTERVAL: - socket_option_value = timer_interval_milliseconds_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - bool &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::ASYNC_MODE: - socket_option_value = is_async_; - break; + const std::shared_ptr<utils::Verifier> &socket_option_value); - case GeneralTransportOptions::RUNNING: - socket_option_value = transport_protocol_->isRunning(); - break; + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value); - case OtherOptions::VIRTUAL_DOWNLOAD: - socket_option_value = virtual_download_; - break; - - case RaaqmTransportOptions::RTT_STATS: - socket_option_value = rtt_stats_; - break; + virtual int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value); - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; - break; + virtual int getSocketOption(int socket_option_key, + double &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value); - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key, - Name **socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - *socket_option_value = &network_name_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, - ConsumerContentObjectCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: - *socket_option_value = &on_content_object_input_; - break; + virtual int getSocketOption(int socket_option_key, bool &socket_option_value); - default: - return SOCKET_OPTION_NOT_GET; - } + virtual int getSocketOption(int socket_option_key, + Name **socket_option_value); - return SOCKET_OPTION_GET; - } + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback **socket_option_value); - TRANSPORT_ALWAYS_INLINE int getSocketOption( + virtual int getSocketOption( int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerInterestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: - *socket_option_value = &on_interest_retransmission_; - break; - - case ConsumerCallbacksOptions::INTEREST_OUTPUT: - *socket_option_value = &on_interest_output_; - break; - - case ConsumerCallbacksOptions::INTEREST_EXPIRED: - *socket_option_value = &on_interest_timeout_; - break; - - case ConsumerCallbacksOptions::INTEREST_SATISFIED: - *socket_option_value = &on_interest_satisfied_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerManifestCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::MANIFEST_INPUT: - *socket_option_value = &on_manifest_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, IcnObserver **socket_option_value) { - switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( + ConsumerContentObjectVerificationCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerInterestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerManifestCallback **socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::shared_ptr<Portal> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value); + + virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::VERIFIER: - socket_option_value = verifier_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, std::string &socket_option_value) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: - socket_option_value = output_interface_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - TRANSPORT_ALWAYS_INLINE int getSocketOption( - int socket_option_key, ConsumerTimerCallback **socket_option_value) { - switch (socket_option_key) { - case ConsumerCallbacksOptions::STATS_SUMMARY: - *socket_option_value = &stats_summary_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } + std::shared_ptr<utils::Verifier> &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value); + + virtual int getSocketOption(int socket_option_key, + ConsumerTimerCallback **socket_option_value); + + protected: + template <typename Lambda, typename arg2> + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func); private: asio::io_service internal_io_service_; @@ -808,6 +293,9 @@ class ConsumerSocket : public BaseSocket { std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; + // No need to protect from multiple accesses in the async consumer + // The parameter is accessible only with a getSocketOption and + // set from the consume Name network_name_; int interest_lifetime_; @@ -816,8 +304,6 @@ class ConsumerSocket : public BaseSocket { double max_window_size_; double current_window_size_; uint32_t max_retransmissions_; - size_t output_buffer_size_; - size_t input_buffer_size_; // RAAQM Parameters double minimum_drop_probability_; @@ -832,12 +318,10 @@ class ConsumerSocket : public BaseSocket { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters std::shared_ptr<utils::Verifier> verifier_; PARCKeyId *key_id_; - bool verify_signature_; + std::atomic_bool verify_signature_; ConsumerInterestCallback on_interest_retransmission_; ConsumerInterestCallback on_interest_output_; @@ -853,12 +337,13 @@ class ConsumerSocket : public BaseSocket { // Virtual download for traffic generator bool virtual_download_; - bool rtt_stats_; uint32_t timer_interval_milliseconds_; // Transport protocol std::unique_ptr<TransportProtocol> transport_protocol_; + + utils::SpinLock guard_raaqm_params_; }; } // namespace interface |