diff options
Diffstat (limited to 'libtransport/src/implementation/socket_consumer.h')
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 178 |
1 files changed, 120 insertions, 58 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index e0981af7f..ebdac7f93 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,6 +20,7 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/utils/event_thread.h> +#include <implementation/socket.h> #include <protocols/cbr.h> #include <protocols/raaqm.h> #include <protocols/rtc/rtc.h> @@ -38,7 +39,6 @@ class ConsumerSocket : public Socket { std::shared_ptr<core::Portal> &&portal) : Socket(std::move(portal)), consumer_interface_(consumer), - async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), max_window_size_(default_values::max_window_size), @@ -56,6 +56,7 @@ class ConsumerSocket : public Socket { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), + max_unverified_delay_(default_values::max_unverified_delay), verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), reset_window_(false), @@ -64,41 +65,42 @@ class ConsumerSocket : public Socket { on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), + recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY), + aggregated_data_(false), + fec_setting_(""), guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = - std::make_unique<protocol::CbrTransportProtocol>(this); + std::make_shared<protocol::CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::rtc::RTCTransportProtocol>(this); + std::make_shared<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: transport_protocol_ = - std::make_unique<protocol::RaaqmTransportProtocol>(this); + std::make_shared<protocol::RaaqmTransportProtocol>(this); break; } } public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {} + : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {} ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - asio::io_service &io_service) - : ConsumerSocket(consumer, protocol, - std::make_shared<core::Portal>(io_service)) { + ::utils::EventThread &worker) + : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) { is_async_ = true; } - ~ConsumerSocket() { - stop(); - async_downloader_.stop(); - } + ~ConsumerSocket() { stop(); } interface::ConsumerSocket *getInterface() { return consumer_interface_; @@ -122,18 +124,6 @@ class ConsumerSocket : public Socket { transport_protocol_->start(); - return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; - } - - virtual int asyncConsume(const Name &name) { - if (!async_downloader_.stopped()) { - async_downloader_.add([this, name]() { - network_name_ = std::move(name); - network_name_.setSuffix(0); - transport_protocol_->start(); - }); - } - return CONSUMER_RUNNING; } @@ -149,6 +139,9 @@ class ConsumerSocket : public Socket { } } + using Socket::getSocketOption; + using Socket::setSocketOption; + virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -245,6 +238,10 @@ class ConsumerSocket : public Socket { interest_lifetime_ = socket_option_value; break; + case GeneralTransportOptions::MAX_UNVERIFIED_TIME: + max_unverified_delay_ = socket_option_value; + break; + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: if (socket_option_value > 0) { rate_estimation_batching_parameter_ = socket_option_value; @@ -265,6 +262,11 @@ class ConsumerSocket : public Socket { timer_interval_milliseconds_ = socket_option_value; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + recovery_strategy_ = + (RtcTransportRecoveryStrategies)socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -328,6 +330,11 @@ class ConsumerSocket : public Socket { result = SOCKET_OPTION_SET; break; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: return result; } @@ -406,36 +413,37 @@ class ConsumerSocket : public Socket { int setSocketOption( int socket_option_key, const std::shared_ptr<auth::Verifier> &socket_option_value) { - int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: verifier_.reset(); verifier_ = socket_option_value; - result = SOCKET_OPTION_SET; break; default: - return result; + return SOCKET_OPTION_NOT_SET; } } - - return result; + return SOCKET_OPTION_SET; } int setSocketOption(int socket_option_key, const std::string &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; - if (!transport_protocol_->isRunning()) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + if (!transport_protocol_->isRunning()) { output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); result = SOCKET_OPTION_SET; - break; + } + break; + case GeneralTransportOptions::FEC_TYPE: + fec_setting_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; - default: - return result; - } + default: + return result; } return result; } @@ -461,6 +469,29 @@ class ConsumerSocket : public Socket { }); } + int setSocketOption(int socket_option_key, + StrategyCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + on_fwd_strategy_ = socket_option_value; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + on_rec_strategy_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + int getSocketOption(int socket_option_key, double &socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -516,6 +547,10 @@ class ConsumerSocket : public Socket { socket_option_value = interest_lifetime_; break; + case GeneralTransportOptions::MAX_UNVERIFIED_TIME: + socket_option_value = max_unverified_delay_; + break; + case RaaqmTransportOptions::SAMPLE_NUMBER: socket_option_value = sample_number_; break; @@ -532,6 +567,10 @@ class ConsumerSocket : public Socket { socket_option_value = timer_interval_milliseconds_; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + socket_option_value = recovery_strategy_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -553,6 +592,10 @@ class ConsumerSocket : public Socket { socket_option_value = reset_window_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -628,20 +671,6 @@ class ConsumerSocket : public Socket { } int getSocketOption(int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - int getSocketOption(int socket_option_key, IcnObserver **socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -665,7 +694,6 @@ class ConsumerSocket : public Socket { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -674,6 +702,9 @@ class ConsumerSocket : public Socket { case DataLinkOptions::OUTPUT_INTERFACE: socket_option_value = output_interface_; break; + case GeneralTransportOptions::FEC_TYPE: + socket_option_value = fec_setting_; + break; default: return SOCKET_OPTION_NOT_GET; } @@ -714,6 +745,29 @@ class ConsumerSocket : public Socket { }); } + int getSocketOption(int socket_option_key, + StrategyCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + *socket_option_value = &on_fwd_strategy_; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + *socket_option_value = &on_rec_strategy_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + protected: template <typename Lambda, typename arg2> int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, @@ -726,9 +780,9 @@ class ConsumerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -748,8 +802,6 @@ class ConsumerSocket : public Socket { protected: interface::ConsumerSocket *consumer_interface_; - utils::EventThread async_downloader_; - // No need to protect from multiple accesses in the async consumer // The parameter is accessible only with a getSocketOption and // set from the consume @@ -776,6 +828,7 @@ class ConsumerSocket : public Socket { int rate_estimation_choice_; // Verification parameters + int max_unverified_delay_; std::shared_ptr<auth::Verifier> verifier_; transport::auth::KeyId *key_id_; std::atomic_bool verify_signature_; @@ -787,17 +840,26 @@ class ConsumerSocket : public Socket { ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; ConsumerTimerCallback stats_summary_; + StrategyCallback on_fwd_strategy_; + StrategyCallback on_rec_strategy_; ReadCallback *read_callback_; uint32_t timer_interval_milliseconds_; // Transport protocol - std::unique_ptr<protocol::TransportProtocol> transport_protocol_; + std::shared_ptr<protocol::TransportProtocol> transport_protocol_; // Statistic TransportStatistics stats_; + // RTC protocol + RtcTransportRecoveryStrategies recovery_strategy_; + bool aggregated_data_; + + // FEC setting + std::string fec_setting_; + utils::SpinLock guard_raaqm_params_; std::string output_interface_; }; |