diff options
Diffstat (limited to 'libtransport/src/implementation/socket_consumer.h')
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 232 |
1 files changed, 175 insertions, 57 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index e0981af7f..3117e21e7 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,6 +20,7 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/utils/event_thread.h> +#include <implementation/socket.h> #include <protocols/cbr.h> #include <protocols/raaqm.h> #include <protocols/rtc/rtc.h> @@ -38,7 +39,7 @@ class ConsumerSocket : public Socket { std::shared_ptr<core::Portal> &&portal) : Socket(std::move(portal)), consumer_interface_(consumer), - async_downloader_(), + packet_format_(default_values::packet_format), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), max_window_size_(default_values::max_window_size), @@ -56,6 +57,8 @@ class ConsumerSocket : public Socket { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), + manifest_factor_relevant_(default_values::manifest_factor_relevant), + manifest_factor_alert_(default_values::manifest_factor_alert), verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), reset_window_(false), @@ -64,41 +67,43 @@ class ConsumerSocket : public Socket { on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), + recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY), + aggregated_data_(false), + content_sharing_mode_(false), + aggregated_interests_(false), guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = - std::make_unique<protocol::CbrTransportProtocol>(this); + std::make_shared<protocol::CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::rtc::RTCTransportProtocol>(this); + std::make_shared<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: transport_protocol_ = - std::make_unique<protocol::RaaqmTransportProtocol>(this); + std::make_shared<protocol::RaaqmTransportProtocol>(this); break; } } public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {} + : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {} ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - asio::io_service &io_service) - : ConsumerSocket(consumer, protocol, - std::make_shared<core::Portal>(io_service)) { + ::utils::EventThread &worker) + : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) { is_async_ = true; } - ~ConsumerSocket() { - stop(); - async_downloader_.stop(); - } + ~ConsumerSocket() { stop(); } interface::ConsumerSocket *getInterface() { return consumer_interface_; @@ -122,18 +127,6 @@ class ConsumerSocket : public Socket { transport_protocol_->start(); - return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; - } - - virtual int asyncConsume(const Name &name) { - if (!async_downloader_.stopped()) { - async_downloader_.add([this, name]() { - network_name_ = std::move(name); - network_name_.setSuffix(0); - transport_protocol_->start(); - }); - } - return CONSUMER_RUNNING; } @@ -149,6 +142,9 @@ class ConsumerSocket : public Socket { } } + using Socket::getSocketOption; + using Socket::setSocketOption; + virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -265,6 +261,23 @@ class ConsumerSocket : public Socket { timer_interval_milliseconds_ = socket_option_value; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + recovery_strategy_ = + (RtcTransportRecoveryStrategies)socket_option_value; + break; + + case MANIFEST_FACTOR_RELEVANT: + manifest_factor_relevant_ = socket_option_value; + break; + + case MANIFEST_FACTOR_ALERT: + manifest_factor_alert_ = socket_option_value; + break; + + case GeneralTransportOptions::PACKET_FORMAT: + packet_format_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -310,7 +323,6 @@ class ConsumerSocket : public Socket { on_content_object_input_ = VOID_HANDLER; break; } - default: return SOCKET_OPTION_NOT_SET; } @@ -328,6 +340,21 @@ class ConsumerSocket : public Socket { result = SOCKET_OPTION_SET; break; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case RtcTransportOptions::CONTENT_SHARING_MODE: + content_sharing_mode_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case RtcTransportOptions::AGGREGATED_INTERESTS: + aggregated_interests_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: return result; } @@ -405,37 +432,49 @@ class ConsumerSocket : public Socket { int setSocketOption( int socket_option_key, + const std::shared_ptr<auth::Signer> &socket_option_value) { + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: + signer_.reset(); + signer_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + } + return SOCKET_OPTION_SET; + } + + int setSocketOption( + int socket_option_key, const std::shared_ptr<auth::Verifier> &socket_option_value) { - int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: verifier_.reset(); verifier_ = socket_option_value; - result = SOCKET_OPTION_SET; break; default: - return result; + return SOCKET_OPTION_NOT_SET; } } - - return result; + return SOCKET_OPTION_SET; } int setSocketOption(int socket_option_key, const std::string &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; - if (!transport_protocol_->isRunning()) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + if (!transport_protocol_->isRunning()) { output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); result = SOCKET_OPTION_SET; - break; - - default: - return result; - } + } + break; + default: + return result; } return result; } @@ -461,6 +500,29 @@ class ConsumerSocket : public Socket { }); } + int setSocketOption(int socket_option_key, + StrategyCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + on_fwd_strategy_ = socket_option_value; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + on_rec_strategy_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + int getSocketOption(int socket_option_key, double &socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -532,6 +594,22 @@ class ConsumerSocket : public Socket { socket_option_value = timer_interval_milliseconds_; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + socket_option_value = recovery_strategy_; + break; + + case GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT: + socket_option_value = manifest_factor_relevant_; + break; + + case GeneralTransportOptions::MANIFEST_FACTOR_ALERT: + socket_option_value = manifest_factor_alert_; + break; + + case GeneralTransportOptions::PACKET_FORMAT: + socket_option_value = packet_format_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -553,6 +631,18 @@ class ConsumerSocket : public Socket { socket_option_value = reset_window_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + + case RtcTransportOptions::CONTENT_SHARING_MODE: + socket_option_value = content_sharing_mode_; + break; + + case RtcTransportOptions::AGGREGATED_INTERESTS: + socket_option_value = aggregated_interests_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -628,10 +718,11 @@ class ConsumerSocket : public Socket { } int getSocketOption(int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); break; default: @@ -642,18 +733,15 @@ class ConsumerSocket : public Socket { } int getSocketOption(int socket_option_key, - IcnObserver **socket_option_value) { - utils::SpinLock::Acquire locked(guard_raaqm_params_); + std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; + case GeneralTransportOptions::SIGNER: + socket_option_value = signer_; + return SOCKET_OPTION_GET; default: return SOCKET_OPTION_NOT_GET; } - - return SOCKET_OPTION_GET; } int getSocketOption(int socket_option_key, @@ -665,7 +753,6 @@ class ConsumerSocket : public Socket { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -677,7 +764,6 @@ class ConsumerSocket : public Socket { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -714,6 +800,29 @@ class ConsumerSocket : public Socket { }); } + int getSocketOption(int socket_option_key, + StrategyCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + *socket_option_value = &on_fwd_strategy_; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + *socket_option_value = &on_rec_strategy_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + protected: template <typename Lambda, typename arg2> int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, @@ -726,9 +835,9 @@ class ConsumerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -748,13 +857,12 @@ class ConsumerSocket : public Socket { protected: interface::ConsumerSocket *consumer_interface_; - utils::EventThread async_downloader_; - // No need to protect from multiple accesses in the async consumer // The parameter is accessible only with a getSocketOption and // set from the consume Name network_name_; + hicn_packet_format_t packet_format_; int interest_lifetime_; double min_window_size_; @@ -776,6 +884,8 @@ class ConsumerSocket : public Socket { int rate_estimation_choice_; // Verification parameters + uint32_t manifest_factor_relevant_; + uint32_t manifest_factor_alert_; std::shared_ptr<auth::Verifier> verifier_; transport::auth::KeyId *key_id_; std::atomic_bool verify_signature_; @@ -787,17 +897,25 @@ class ConsumerSocket : public Socket { ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; ConsumerTimerCallback stats_summary_; + StrategyCallback on_fwd_strategy_; + StrategyCallback on_rec_strategy_; ReadCallback *read_callback_; uint32_t timer_interval_milliseconds_; // Transport protocol - std::unique_ptr<protocol::TransportProtocol> transport_protocol_; + std::shared_ptr<protocol::TransportProtocol> transport_protocol_; // Statistic TransportStatistics stats_; + // RTC protocol + RtcTransportRecoveryStrategies recovery_strategy_; + bool aggregated_data_; + bool content_sharing_mode_; + bool aggregated_interests_; + utils::SpinLock guard_raaqm_params_; std::string output_interface_; }; |