diff options
Diffstat (limited to 'libtransport/src/implementation/socket_consumer.h')
-rw-r--r-- | libtransport/src/implementation/socket_consumer.h | 410 |
1 files changed, 186 insertions, 224 deletions
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 87965923e..3117e21e7 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,15 +13,18 @@ * limitations under the License. */ +#pragma once + +#include <hicn/transport/auth/verifier.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> -#include <hicn/transport/security/verifier.h> #include <hicn/transport/utils/event_thread.h> +#include <implementation/socket.h> #include <protocols/cbr.h> -#include <protocols/protocol.h> #include <protocols/raaqm.h> -#include <protocols/rtc.h> +#include <protocols/rtc/rtc.h> +#include <protocols/transport_protocol.h> namespace transport { namespace implementation { @@ -30,13 +33,13 @@ using namespace core; using namespace interface; using ReadCallback = interface::ConsumerSocket::ReadCallback; -class ConsumerSocket : public Socket<BasePortal> { +class ConsumerSocket : public Socket { private: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - std::shared_ptr<Portal> &&portal) - : consumer_interface_(consumer), - portal_(portal), - async_downloader_(), + std::shared_ptr<core::Portal> &&portal) + : Socket(std::move(portal)), + consumer_interface_(consumer), + packet_format_(default_values::packet_format), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), max_window_size_(default_values::max_window_size), @@ -54,52 +57,53 @@ class ConsumerSocket : public Socket<BasePortal> { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), - is_async_(false), - verifier_(std::make_shared<utils::Verifier>()), + manifest_factor_relevant_(default_values::manifest_factor_relevant), + manifest_factor_alert_(default_values::manifest_factor_alert), + verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), - key_content_(false), reset_window_(false), on_interest_output_(VOID_HANDLER), on_interest_timeout_(VOID_HANDLER), on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), - on_content_object_verification_(VOID_HANDLER), stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), + recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY), + aggregated_data_(false), + content_sharing_mode_(false), + aggregated_interests_(false), guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = - std::make_unique<protocol::CbrTransportProtocol>(this); + std::make_shared<protocol::CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::RTCTransportProtocol>(this); + std::make_shared<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: transport_protocol_ = - std::make_unique<protocol::RaaqmTransportProtocol>(this); + std::make_shared<protocol::RaaqmTransportProtocol>(this); break; } } public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {} + : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {} ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - asio::io_service &io_service) - : ConsumerSocket(consumer, protocol, - std::make_shared<Portal>(io_service)) { + ::utils::EventThread &worker) + : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) { is_async_ = true; } - ~ConsumerSocket() { - stop(); - async_downloader_.stop(); - } + ~ConsumerSocket() { stop(); } interface::ConsumerSocket *getInterface() { return consumer_interface_; @@ -123,23 +127,9 @@ class ConsumerSocket : public Socket<BasePortal> { transport_protocol_->start(); - return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; - } - - virtual int asyncConsume(const Name &name) { - if (!async_downloader_.stopped()) { - async_downloader_.add([this, name]() { - network_name_ = std::move(name); - network_name_.setSuffix(0); - transport_protocol_->start(); - }); - } - return CONSUMER_RUNNING; } - bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); } - void stop() { if (transport_protocol_->isRunning()) { transport_protocol_->stop(); @@ -152,7 +142,8 @@ class ConsumerSocket : public Socket<BasePortal> { } } - asio::io_service &getIoService() { return portal_->getIoService(); } + using Socket::getSocketOption; + using Socket::setSocketOption; virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { @@ -270,6 +261,23 @@ class ConsumerSocket : public Socket<BasePortal> { timer_interval_milliseconds_ = socket_option_value; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + recovery_strategy_ = + (RtcTransportRecoveryStrategies)socket_option_value; + break; + + case MANIFEST_FACTOR_RELEVANT: + manifest_factor_relevant_ = socket_option_value; + break; + + case MANIFEST_FACTOR_ALERT: + manifest_factor_alert_ = socket_option_value; + break; + + case GeneralTransportOptions::PACKET_FORMAT: + packet_format_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -315,13 +323,6 @@ class ConsumerSocket : public Socket<BasePortal> { on_content_object_input_ = VOID_HANDLER; break; } - - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - default: return SOCKET_OPTION_NOT_SET; } @@ -334,18 +335,23 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; + case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET: + reset_window_ = socket_option_value; result = SOCKET_OPTION_SET; break; - case GeneralTransportOptions::KEY_CONTENT: - key_content_ = socket_option_value; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; result = SOCKET_OPTION_SET; break; - case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET: - reset_window_ = socket_option_value; + case RtcTransportOptions::CONTENT_SHARING_MODE: + content_sharing_mode_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + case RtcTransportOptions::AGGREGATED_INTERESTS: + aggregated_interests_ = socket_option_value; result = SOCKET_OPTION_SET; break; @@ -377,29 +383,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int setSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); - } - int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -433,51 +416,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int setSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this]( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::VERIFICATION_FAILED: - verification_failed_callback_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); - } - - // int setSocketOption( - // int socket_option_key, - // ConsumerContentObjectVerificationFailedCallback socket_option_value) { - // return rescheduleOnIOService( - // socket_option_key, socket_option_value, - // [this]( - // int socket_option_key, - // ConsumerContentObjectVerificationFailedCallback - // socket_option_value) - // -> int { - // switch (socket_option_key) { - // case ConsumerCallbacksOptions::VERIFICATION_FAILED: - // verification_failed_callback_ = socket_option_value; - // break; - - // default: - // return SOCKET_OPTION_NOT_SET; - // } - - // return SOCKET_OPTION_SET; - // }); - // } - int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -494,45 +432,49 @@ class ConsumerSocket : public Socket<BasePortal> { int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { - int result = SOCKET_OPTION_NOT_SET; + const std::shared_ptr<auth::Signer> &socket_option_value) { + if (!transport_protocol_->isRunning()) { + switch (socket_option_key) { + case GeneralTransportOptions::SIGNER: + signer_.reset(); + signer_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + } + return SOCKET_OPTION_SET; + } + + int setSocketOption( + int socket_option_key, + const std::shared_ptr<auth::Verifier> &socket_option_value) { if (!transport_protocol_->isRunning()) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: verifier_.reset(); verifier_ = socket_option_value; - result = SOCKET_OPTION_SET; break; default: - return result; + return SOCKET_OPTION_NOT_SET; } } - - return result; + return SOCKET_OPTION_SET; } int setSocketOption(int socket_option_key, const std::string &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; - if (!transport_protocol_->isRunning()) { - switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - result = SOCKET_OPTION_SET; - } - break; - - case DataLinkOptions::OUTPUT_INTERFACE: + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + if (!transport_protocol_->isRunning()) { output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); result = SOCKET_OPTION_SET; - break; - - default: - return result; - } + } + break; + default: + return result; } return result; } @@ -558,6 +500,29 @@ class ConsumerSocket : public Socket<BasePortal> { }); } + int setSocketOption(int socket_option_key, + StrategyCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + on_fwd_strategy_ = socket_option_value; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + on_rec_strategy_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + int getSocketOption(int socket_option_key, double &socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -629,6 +594,22 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = timer_interval_milliseconds_; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + socket_option_value = recovery_strategy_; + break; + + case GeneralTransportOptions::MANIFEST_FACTOR_RELEVANT: + socket_option_value = manifest_factor_relevant_; + break; + + case GeneralTransportOptions::MANIFEST_FACTOR_ALERT: + socket_option_value = manifest_factor_alert_; + break; + + case GeneralTransportOptions::PACKET_FORMAT: + socket_option_value = packet_format_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -642,14 +623,6 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = transport_protocol_->isRunning(); break; - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; - break; - - case GeneralTransportOptions::KEY_CONTENT: - socket_option_value = key_content_; - break; - case GeneralTransportOptions::ASYNC_MODE: socket_option_value = is_async_; break; @@ -658,6 +631,18 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = reset_window_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + + case RtcTransportOptions::CONTENT_SHARING_MODE: + socket_option_value = content_sharing_mode_; + break; + + case RtcTransportOptions::AGGREGATED_INTERESTS: + socket_option_value = aggregated_interests_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -699,29 +684,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int getSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); - } - int getSocketOption(int socket_option_key, ConsumerInterestCallback **socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -755,33 +717,12 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int getSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback **socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationFailedCallback * - *socket_option_value) -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::VERIFICATION_FAILED: - *socket_option_value = &verification_failed_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); - } - int getSocketOption(int socket_option_key, - std::shared_ptr<Portal> &socket_option_value) { + IcnObserver **socket_option_value) { + utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; + case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: + *socket_option_value = (rate_estimation_observer_); break; default: @@ -792,22 +733,19 @@ class ConsumerSocket : public Socket<BasePortal> { } int getSocketOption(int socket_option_key, - IcnObserver **socket_option_value) { - utils::SpinLock::Acquire locked(guard_raaqm_params_); + std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { - case RateEstimationOptions::RATE_ESTIMATION_OBSERVER: - *socket_option_value = (rate_estimation_observer_); - break; + case GeneralTransportOptions::SIGNER: + socket_option_value = signer_; + return SOCKET_OPTION_GET; default: return SOCKET_OPTION_NOT_GET; } - - return SOCKET_OPTION_GET; } int getSocketOption(int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { + std::shared_ptr<auth::Verifier> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: socket_option_value = verifier_; @@ -815,7 +753,6 @@ class ConsumerSocket : public Socket<BasePortal> { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -827,7 +764,6 @@ class ConsumerSocket : public Socket<BasePortal> { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -864,6 +800,29 @@ class ConsumerSocket : public Socket<BasePortal> { }); } + int getSocketOption(int socket_option_key, + StrategyCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + *socket_option_value = &on_fwd_strategy_; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + *socket_option_value = &on_rec_strategy_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + protected: template <typename Lambda, typename arg2> int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, @@ -871,14 +830,14 @@ class ConsumerSocket : public Socket<BasePortal> { // To enforce type check std::function<int(int, arg2)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (transport_protocol_->isRunning()) { + if (transport_protocol_ && transport_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -898,14 +857,12 @@ class ConsumerSocket : public Socket<BasePortal> { protected: interface::ConsumerSocket *consumer_interface_; - std::shared_ptr<Portal> portal_; - utils::EventThread async_downloader_; - // No need to protect from multiple accesses in the async consumer // The parameter is accessible only with a getSocketOption and // set from the consume Name network_name_; + hicn_packet_format_t packet_format_; int interest_lifetime_; double min_window_size_; @@ -926,13 +883,12 @@ class ConsumerSocket : public Socket<BasePortal> { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters - std::shared_ptr<utils::Verifier> verifier_; - PARCKeyId *key_id_; + uint32_t manifest_factor_relevant_; + uint32_t manifest_factor_alert_; + std::shared_ptr<auth::Verifier> verifier_; + transport::auth::KeyId *key_id_; std::atomic_bool verify_signature_; - bool key_content_; bool reset_window_; ConsumerInterestCallback on_interest_retransmission_; @@ -940,23 +896,29 @@ class ConsumerSocket : public Socket<BasePortal> { ConsumerInterestCallback on_interest_timeout_; ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; - ConsumerContentObjectVerificationCallback on_content_object_verification_; ConsumerTimerCallback stats_summary_; - ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; + StrategyCallback on_fwd_strategy_; + StrategyCallback on_rec_strategy_; ReadCallback *read_callback_; uint32_t timer_interval_milliseconds_; // Transport protocol - std::unique_ptr<protocol::TransportProtocol> transport_protocol_; + std::shared_ptr<protocol::TransportProtocol> transport_protocol_; // Statistic TransportStatistics stats_; + // RTC protocol + RtcTransportRecoveryStrategies recovery_strategy_; + bool aggregated_data_; + bool content_sharing_mode_; + bool aggregated_interests_; + utils::SpinLock guard_raaqm_params_; std::string output_interface_; }; } // namespace implementation -} // namespace transport
\ No newline at end of file +} // namespace transport |