aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces/socket_consumer.h
diff options
context:
space:
mode:
authorAlberto Compagno <acompagn+fdio@cisco.com>2019-10-15 18:08:41 +0200
committerAlberto Compagno <acompagn+fdio@cisco.com>2019-10-22 11:01:41 +0200
commit755c6833ae2d2eee87e80ed3b84c75e968f48c46 (patch)
tree653345beb889acabc83b3b3b03e849fa34b1baac /libtransport/src/hicn/transport/interfaces/socket_consumer.h
parent7204bac00804448a797d4e76ced04a3b84d0d741 (diff)
[HICN-328] Reworking setSocketOption and getSocketOption to be thread-safe
Change-Id: Ie22572822b9ac1e6c300fd7982035c799546bd76 Signed-off-by: Alberto Compagno <acompagn+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/socket_consumer.h')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h673
1 files changed, 79 insertions, 594 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 8f7a9718c..e3620b269 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -179,20 +179,6 @@ class ConsumerSocket : public BaseSocket {
int asyncConsume(const Name &name);
/**
- * Send an interest asynchronously in another thread, which is the same used
- * for asyncConsume.
- *
- * @param interest - An Interest::Ptr to the interest. Notice that the
- * application looses the ownership of the interest, which is transferred to
- * the library itself.
- * @param callback - A ConsumerCallback containing the events to be trigger in
- * case of timeout or content reception.
- *
- */
- void asyncSendInterest(Interest::Ptr &&interest,
- Portal::ConsumerCallback *callback);
-
- /**
* Stops the consumer socket. If several downloads are queued (using
* asyncConsume), this call stops just the current one.
*/
@@ -211,595 +197,94 @@ class ConsumerSocket : public BaseSocket {
*/
asio::io_service &getIoService() override;
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ReadCallback *socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- read_callback_ = socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ReadCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::READ_CALLBACK:
- *socket_option_value = read_callback_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- double socket_option_value) {
- switch (socket_option_key) {
- case MIN_WINDOW_SIZE:
- min_window_size_ = socket_option_value;
- break;
-
- case MAX_WINDOW_SIZE:
- max_window_size_ = socket_option_value;
- break;
-
- case CURRENT_WINDOW_SIZE:
- current_window_size_ = socket_option_value;
- break;
-
- case GAMMA_VALUE:
- gamma_ = socket_option_value;
- break;
-
- case BETA_VALUE:
- beta_ = socket_option_value;
- break;
-
- case DROP_FACTOR:
- drop_factor_ = socket_option_value;
- break;
-
- case MINIMUM_DROP_PROBABILITY:
- minimum_drop_probability_ = socket_option_value;
- break;
-
- case RATE_ESTIMATION_ALPHA:
- if (socket_option_value >= 0 && socket_option_value < 1) {
- rate_estimation_alpha_ = socket_option_value;
- } else {
- rate_estimation_alpha_ = default_values::alpha;
- }
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- uint32_t socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- input_buffer_size_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- output_buffer_size_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::MAX_INTEREST_RETX:
- max_retransmissions_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::INTEREST_LIFETIME:
- interest_lifetime_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_retransmission_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_timeout_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_satisfied_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_interest_output_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_input_ = VOID_HANDLER;
- break;
- }
-
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- if (socket_option_value == VOID_HANDLER) {
- on_content_object_verification_ = VOID_HANDLER;
- break;
- }
-
- case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
- if (socket_option_value > 0) {
- rate_estimation_batching_parameter_ = socket_option_value;
- } else {
- rate_estimation_batching_parameter_ = default_values::batch;
- }
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
- if (socket_option_value > 0) {
- rate_estimation_choice_ = socket_option_value;
- } else {
- rate_estimation_choice_ = default_values::rate_choice;
- }
- break;
-
- case GeneralTransportOptions::STATS_INTERVAL:
- timer_interval_milliseconds_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- bool socket_option_value) {
- switch (socket_option_key) {
- case OtherOptions::VIRTUAL_DOWNLOAD:
- virtual_download_ = socket_option_value;
- break;
-
- case RaaqmTransportOptions::RTT_STATS:
- rtt_stats_ = socket_option_value;
- break;
-
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- verify_signature_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(int socket_option_key,
- Name *socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- network_name_ = *socket_option_value;
- break;
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- on_content_object_input_ = socket_option_value;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ ReadCallback *socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ReadCallback **socket_option_value);
- default:
- return SOCKET_OPTION_NOT_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ double socket_option_value);
- return SOCKET_OPTION_SET;
- }
+ virtual int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value);
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ virtual int setSocketOption(int socket_option_key,
+ std::nullptr_t socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key, bool socket_option_value);
+
+ virtual int setSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback socket_option_value);
+
+ virtual int setSocketOption(
int socket_option_key,
- ConsumerContentObjectVerificationCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- on_content_object_verification_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerInterestCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- on_interest_retransmission_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- on_interest_output_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- on_interest_timeout_ = socket_option_value;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- on_interest_satisfied_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerManifestCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::MANIFEST_INPUT:
- on_manifest_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, IcnObserver *socket_option_value) {
- switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- rate_estimation_observer_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
+ ConsumerContentObjectVerificationCallback socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerInterestCallback socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerManifestCallback socket_option_value);
+
+ virtual int setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value);
+
+ virtual int setSocketOption(
int socket_option_key,
- const std::shared_ptr<utils::Verifier> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::VERIFIER:
- verifier_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, const std::string &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::CERTIFICATE:
- key_id_ = verifier_->addKeyFromCertificate(socket_option_value);
-
- if (key_id_ != nullptr) {
- break;
- }
-
- case DataLinkOptions::OUTPUT_INTERFACE:
- output_interface_ = socket_option_value;
- portal_->setOutputInterface(output_interface_);
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int setSocketOption(
- int socket_option_key, ConsumerTimerCallback socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::STATS_SUMMARY:
- stats_summary_ = socket_option_value;
- break;
-
- default:
- return SOCKET_OPTION_NOT_SET;
- }
-
- return SOCKET_OPTION_SET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- double &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::MIN_WINDOW_SIZE:
- socket_option_value = min_window_size_;
- break;
-
- case GeneralTransportOptions::MAX_WINDOW_SIZE:
- socket_option_value = max_window_size_;
- break;
-
- case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
- socket_option_value = current_window_size_;
- break;
-
- // RAAQM parameters
-
- case RaaqmTransportOptions::GAMMA_VALUE:
- socket_option_value = gamma_;
- break;
-
- case RaaqmTransportOptions::BETA_VALUE:
- socket_option_value = beta_;
- break;
-
- case RaaqmTransportOptions::DROP_FACTOR:
- socket_option_value = drop_factor_;
- break;
-
- case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
- socket_option_value = minimum_drop_probability_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
- socket_option_value = rate_estimation_alpha_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- uint32_t &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::INPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)input_buffer_size_;
- break;
-
- case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
- socket_option_value = (uint32_t)output_buffer_size_;
- break;
-
- case GeneralTransportOptions::MAX_INTEREST_RETX:
- socket_option_value = max_retransmissions_;
- break;
-
- case GeneralTransportOptions::INTEREST_LIFETIME:
- socket_option_value = interest_lifetime_;
- break;
-
- case RaaqmTransportOptions::SAMPLE_NUMBER:
- socket_option_value = sample_number_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
- socket_option_value = rate_estimation_batching_parameter_;
- break;
-
- case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
- socket_option_value = rate_estimation_choice_;
- break;
-
- case GeneralTransportOptions::STATS_INTERVAL:
- socket_option_value = timer_interval_milliseconds_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- bool &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::ASYNC_MODE:
- socket_option_value = is_async_;
- break;
+ const std::shared_ptr<utils::Verifier> &socket_option_value);
- case GeneralTransportOptions::RUNNING:
- socket_option_value = transport_protocol_->isRunning();
- break;
+ virtual int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value);
- case OtherOptions::VIRTUAL_DOWNLOAD:
- socket_option_value = virtual_download_;
- break;
-
- case RaaqmTransportOptions::RTT_STATS:
- socket_option_value = rtt_stats_;
- break;
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value);
- case GeneralTransportOptions::VERIFY_SIGNATURE:
- socket_option_value = verify_signature_;
- break;
+ virtual int getSocketOption(int socket_option_key,
+ double &socket_option_value);
- default:
- return SOCKET_OPTION_NOT_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value);
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(int socket_option_key,
- Name **socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::NETWORK_NAME:
- *socket_option_value = &network_name_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key,
- ConsumerContentObjectCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
- *socket_option_value = &on_content_object_input_;
- break;
+ virtual int getSocketOption(int socket_option_key, bool &socket_option_value);
- default:
- return SOCKET_OPTION_NOT_GET;
- }
+ virtual int getSocketOption(int socket_option_key,
+ Name **socket_option_value);
- return SOCKET_OPTION_GET;
- }
+ virtual int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback **socket_option_value);
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ virtual int getSocketOption(
int socket_option_key,
- ConsumerContentObjectVerificationCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
- *socket_option_value = &on_content_object_verification_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerInterestCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
- *socket_option_value = &on_interest_retransmission_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_OUTPUT:
- *socket_option_value = &on_interest_output_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_EXPIRED:
- *socket_option_value = &on_interest_timeout_;
- break;
-
- case ConsumerCallbacksOptions::INTEREST_SATISFIED:
- *socket_option_value = &on_interest_satisfied_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerManifestCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::MANIFEST_INPUT:
- *socket_option_value = &on_manifest_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, IcnObserver **socket_option_value) {
- switch (socket_option_key) {
- case RateEstimationOptions::RATE_ESTIMATION_OBSERVER:
- *socket_option_value = (rate_estimation_observer_);
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
+ ConsumerContentObjectVerificationCallback **socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerInterestCallback **socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerManifestCallback **socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value);
+
+ virtual int getSocketOption(
int socket_option_key,
- std::shared_ptr<utils::Verifier> &socket_option_value) {
- switch (socket_option_key) {
- case GeneralTransportOptions::VERIFIER:
- socket_option_value = verifier_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, std::string &socket_option_value) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
- socket_option_value = output_interface_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- TRANSPORT_ALWAYS_INLINE int getSocketOption(
- int socket_option_key, ConsumerTimerCallback **socket_option_value) {
- switch (socket_option_key) {
- case ConsumerCallbacksOptions::STATS_SUMMARY:
- *socket_option_value = &stats_summary_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
+ std::shared_ptr<utils::Verifier> &socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ std::string &socket_option_value);
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerTimerCallback **socket_option_value);
+
+ protected:
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func);
private:
asio::io_service internal_io_service_;
@@ -808,6 +293,9 @@ class ConsumerSocket : public BaseSocket {
std::shared_ptr<Portal> portal_;
utils::EventThread async_downloader_;
+ // No need to protect from multiple accesses in the async consumer
+ // The parameter is accessible only with a getSocketOption and
+ // set from the consume
Name network_name_;
int interest_lifetime_;
@@ -816,8 +304,6 @@ class ConsumerSocket : public BaseSocket {
double max_window_size_;
double current_window_size_;
uint32_t max_retransmissions_;
- size_t output_buffer_size_;
- size_t input_buffer_size_;
// RAAQM Parameters
double minimum_drop_probability_;
@@ -832,12 +318,10 @@ class ConsumerSocket : public BaseSocket {
int rate_estimation_batching_parameter_;
int rate_estimation_choice_;
- bool is_async_;
-
// Verification parameters
std::shared_ptr<utils::Verifier> verifier_;
PARCKeyId *key_id_;
- bool verify_signature_;
+ std::atomic_bool verify_signature_;
ConsumerInterestCallback on_interest_retransmission_;
ConsumerInterestCallback on_interest_output_;
@@ -853,12 +337,13 @@ class ConsumerSocket : public BaseSocket {
// Virtual download for traffic generator
bool virtual_download_;
- bool rtt_stats_;
uint32_t timer_interval_milliseconds_;
// Transport protocol
std::unique_ptr<TransportProtocol> transport_protocol_;
+
+ utils::SpinLock guard_raaqm_params_;
};
} // namespace interface