diff options
Diffstat (limited to 'libtransport/src/implementation')
15 files changed, 352 insertions, 275 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt index daf899d06..1f2a33a4c 100644 --- a/libtransport/src/implementation/CMakeLists.txt +++ b/libtransport/src/implementation/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 4b14da5d2..6b67a5487 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -44,7 +44,7 @@ int readOld(BIO *b, char *buf, int size) { if (!socket->something_to_read_) { if (!socket->transport_protocol_->isRunning()) { socket->network_name_.setSuffix(socket->random_suffix_); - socket->ConsumerSocket::asyncConsume(socket->network_name_); + socket->ConsumerSocket::consume(socket->network_name_); } if (!socket->something_to_read_) socket->cv_.wait(lck); @@ -312,36 +312,6 @@ int P2PSecureConsumerSocket::consume(const Name &name) { return tls_consumer_->consume((prefix->mapName(name))); } -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } - - if (handshake() != 1) { - throw errors::RuntimeException("Unable to perform client handshake"); - } else { - DLOG_IF(INFO, VLOG_IS_ON(2)) << "Handshake performed!"; - } - - initSessionSocket(); - - if (tls_consumer_ == nullptr) { - throw errors::RuntimeException("TLS socket does not exist"); - } - - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - - if (payload_ != NULL) - return tls_consumer_->asyncConsume((prefix->mapName(name)), - std::move(payload_)); - else - return tls_consumer_->asyncConsume((prefix->mapName(name))); -} - void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { producer_namespace_ = producer_namespace; } @@ -385,7 +355,7 @@ void P2PSecureConsumerSocket::readBufferAvailable( cv_.notify_one(); } -void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {}; +void P2PSecureConsumerSocket::readError(const std::error_code &ec) noexcept {}; void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { std::unique_lock<std::mutex> lck(this->mtx_); diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h index a35a50352..a5e69f611 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -48,8 +48,6 @@ class P2PSecureConsumerSocket : public ConsumerSocket, int consume(const Name &name) override; - int asyncConsume(const Name &name) override; - void registerPrefix(const Prefix &producer_namespace); int setSocketOption( @@ -120,7 +118,7 @@ class P2PSecureConsumerSocket : public ConsumerSocket, virtual void readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; - virtual void readError(const std::error_code ec) noexcept override; + virtual void readError(const std::error_code &ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc index 3748001fc..ee78ea53b 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,6 +20,7 @@ #include <interfaces/tls_rtc_socket_producer.h> #include <interfaces/tls_socket_producer.h> #include <openssl/bio.h> +#include <openssl/pkcs12.h> #include <openssl/rand.h> #include <openssl/ssl.h> @@ -41,7 +42,7 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<auth::Identity> &identity) + std::string &keystore_path, std::string &keystore_pwd) : ProducerSocket(producer_socket, ProductionProtocolAlgorithms::BYTE_STREAM), rtc_(rtc), @@ -50,8 +51,16 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( map_producers(), list_producers() { /* Setup SSL context (identity and parameter to use TLS 1.3) */ - cert_509_ = identity->getCertificate().get(); - pkey_rsa_ = identity->getPrivateKey().get(); + FILE *p12file = fopen(keystore_path.c_str(), "r"); + if (p12file == NULL) + throw errors::RuntimeException("impossible open keystore"); + std::unique_ptr<PKCS12, decltype(&::PKCS12_free)> p12( + d2i_PKCS12_fp(p12file, NULL), ::PKCS12_free); + // now we parse the file to get the first key and certificate + if (1 != PKCS12_parse(p12.get(), keystore_pwd.c_str(), &pkey_rsa_, &cert_509_, + NULL)) + throw errors::RuntimeException("impossible to get the private key"); + fclose(p12file); /* Set the callback so that when an interest is received we catch it and we * decrypt the payload before passing it to the application. */ @@ -133,15 +142,15 @@ uint32_t P2PSecureProducerSocket::produceDatagram( // TODO throw errors::NotImplementedException(); - if (!rtc_) { - throw errors::RuntimeException( - "RTC must be the transport protocol to start the production of current " - "data. Aborting."); - } + // if (!rtc_) { + // throw errors::RuntimeException( + // "RTC must be the transport protocol to start the production of + // current " "data. Aborting."); + // } - std::unique_lock<std::mutex> lck(mtx_); + // std::unique_lock<std::mutex> lck(mtx_); - if (list_producers.empty()) cv_.wait(lck); + // if (list_producers.empty()) cv_.wait(lck); // TODO // for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) @@ -151,7 +160,7 @@ uint32_t P2PSecureProducerSocket::produceDatagram( // rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); // } - return 0; + // return 0; } uint32_t P2PSecureProducerSocket::produceStream( @@ -197,44 +206,6 @@ uint32_t P2PSecureProducerSocket::produceStream(const Name &content_name, return segments; } -// void P2PSecureProducerSocket::asyncProduce(const Name &content_name, -// const uint8_t *buf, -// size_t buffer_size, bool is_last, -// uint32_t *start_offset) { -// if (rtc_) { -// throw errors::RuntimeException( -// "RTC transport protocol is not compatible with the production of " -// "current data. Aborting."); -// } - -// std::unique_lock<std::mutex> lck(mtx_); -// if (list_producers.empty()) cv_.wait(lck); - -// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) -// { -// (*it)->asyncProduce(content_name, buf, buffer_size, is_last, -// start_offset); -// } -// } - -void P2PSecureProducerSocket::asyncProduce( - Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, - uint32_t offset, uint32_t **last_segment) { - if (rtc_) { - throw errors::RuntimeException( - "RTC transport protocol is not compatible with the production of " - "current data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - if (list_producers.empty()) cv_.wait(lck); - - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { - (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset, - last_segment); - } -} - /* Redefinition of socket options to avoid name hiding */ int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerInterestCallback socket_option_value) { diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h index f94347258..00f407a75 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,7 +15,6 @@ #pragma once -#include <hicn/transport/auth/identity.h> #include <hicn/transport/auth/signer.h> #include <implementation/socket_producer.h> // #include <implementation/tls_rtc_socket_producer.h> @@ -38,9 +37,9 @@ class P2PSecureProducerSocket : public ProducerSocket { public: explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); - explicit P2PSecureProducerSocket( - interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<auth::Identity> &identity); + explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket, + bool rtc, std::string &keystore_path, + std::string &keystore_pwd); ~P2PSecureProducerSocket(); @@ -56,10 +55,6 @@ class P2PSecureProducerSocket : public ProducerSocket { bool is_last = true, uint32_t start_offset = 0) override; - void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) override; - int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) override; diff --git a/libtransport/src/implementation/socket.cc b/libtransport/src/implementation/socket.cc index 2e21f2bc3..95941da07 100644 --- a/libtransport/src/implementation/socket.cc +++ b/libtransport/src/implementation/socket.cc @@ -14,13 +14,42 @@ */ #include <core/global_configuration.h> +#include <hicn/transport/interfaces/socket_options_default_values.h> #include <implementation/socket.h> namespace transport { namespace implementation { Socket::Socket(std::shared_ptr<core::Portal> &&portal) - : portal_(std::move(portal)), is_async_(false) {} + : portal_(std::move(portal)), + is_async_(false), + packet_format_(interface::default_values::packet_format) {} + +int Socket::setSocketOption(int socket_option_key, + hicn_format_t packet_format) { + switch (socket_option_key) { + case interface::GeneralTransportOptions::PACKET_FORMAT: + packet_format_ = packet_format; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int Socket::getSocketOption(int socket_option_key, + hicn_format_t &packet_format) { + switch (socket_option_key) { + case interface::GeneralTransportOptions::PACKET_FORMAT: + packet_format = packet_format_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} } // namespace implementation } // namespace transport
\ No newline at end of file diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h index cf22c03e1..11c9a704d 100644 --- a/libtransport/src/implementation/socket.h +++ b/libtransport/src/implementation/socket.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -38,7 +38,26 @@ class Socket { virtual void connect() = 0; virtual bool isRunning() = 0; - virtual asio::io_service &getIoService() { return portal_->getIoService(); } + virtual asio::io_service &getIoService() { + return portal_->getThread().getIoService(); + } + + int setSocketOption(int socket_option_key, hicn_format_t packet_format); + int getSocketOption(int socket_option_key, hicn_format_t &packet_format); + + int getSocketOption(int socket_option_key, + std::shared_ptr<core::Portal> &socket_option_value) { + switch (socket_option_key) { + case interface::GeneralTransportOptions::PORTAL: + socket_option_value = portal_; + break; + default: + return SOCKET_OPTION_NOT_GET; + ; + } + + return SOCKET_OPTION_GET; + } protected: Socket(std::shared_ptr<core::Portal> &&portal); @@ -48,6 +67,7 @@ class Socket { protected: std::shared_ptr<core::Portal> portal_; bool is_async_; + hicn_format_t packet_format_; }; } // namespace implementation diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index e0981af7f..ebdac7f93 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -20,6 +20,7 @@ #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> #include <hicn/transport/utils/event_thread.h> +#include <implementation/socket.h> #include <protocols/cbr.h> #include <protocols/raaqm.h> #include <protocols/rtc/rtc.h> @@ -38,7 +39,6 @@ class ConsumerSocket : public Socket { std::shared_ptr<core::Portal> &&portal) : Socket(std::move(portal)), consumer_interface_(consumer), - async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), max_window_size_(default_values::max_window_size), @@ -56,6 +56,7 @@ class ConsumerSocket : public Socket { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), + max_unverified_delay_(default_values::max_unverified_delay), verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), reset_window_(false), @@ -64,41 +65,42 @@ class ConsumerSocket : public Socket { on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), stats_summary_(VOID_HANDLER), + on_fwd_strategy_(VOID_HANDLER), + on_rec_strategy_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), + recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY), + aggregated_data_(false), + fec_setting_(""), guard_raaqm_params_() { switch (protocol) { case TransportProtocolAlgorithms::CBR: transport_protocol_ = - std::make_unique<protocol::CbrTransportProtocol>(this); + std::make_shared<protocol::CbrTransportProtocol>(this); break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::rtc::RTCTransportProtocol>(this); + std::make_shared<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: transport_protocol_ = - std::make_unique<protocol::RaaqmTransportProtocol>(this); + std::make_shared<protocol::RaaqmTransportProtocol>(this); break; } } public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {} + : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {} ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - asio::io_service &io_service) - : ConsumerSocket(consumer, protocol, - std::make_shared<core::Portal>(io_service)) { + ::utils::EventThread &worker) + : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) { is_async_ = true; } - ~ConsumerSocket() { - stop(); - async_downloader_.stop(); - } + ~ConsumerSocket() { stop(); } interface::ConsumerSocket *getInterface() { return consumer_interface_; @@ -122,18 +124,6 @@ class ConsumerSocket : public Socket { transport_protocol_->start(); - return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED; - } - - virtual int asyncConsume(const Name &name) { - if (!async_downloader_.stopped()) { - async_downloader_.add([this, name]() { - network_name_ = std::move(name); - network_name_.setSuffix(0); - transport_protocol_->start(); - }); - } - return CONSUMER_RUNNING; } @@ -149,6 +139,9 @@ class ConsumerSocket : public Socket { } } + using Socket::getSocketOption; + using Socket::setSocketOption; + virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -245,6 +238,10 @@ class ConsumerSocket : public Socket { interest_lifetime_ = socket_option_value; break; + case GeneralTransportOptions::MAX_UNVERIFIED_TIME: + max_unverified_delay_ = socket_option_value; + break; + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: if (socket_option_value > 0) { rate_estimation_batching_parameter_ = socket_option_value; @@ -265,6 +262,11 @@ class ConsumerSocket : public Socket { timer_interval_milliseconds_ = socket_option_value; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + recovery_strategy_ = + (RtcTransportRecoveryStrategies)socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -328,6 +330,11 @@ class ConsumerSocket : public Socket { result = SOCKET_OPTION_SET; break; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + default: return result; } @@ -406,36 +413,37 @@ class ConsumerSocket : public Socket { int setSocketOption( int socket_option_key, const std::shared_ptr<auth::Verifier> &socket_option_value) { - int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: verifier_.reset(); verifier_ = socket_option_value; - result = SOCKET_OPTION_SET; break; default: - return result; + return SOCKET_OPTION_NOT_SET; } } - - return result; + return SOCKET_OPTION_SET; } int setSocketOption(int socket_option_key, const std::string &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; - if (!transport_protocol_->isRunning()) { - switch (socket_option_key) { - case DataLinkOptions::OUTPUT_INTERFACE: + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + if (!transport_protocol_->isRunning()) { output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); result = SOCKET_OPTION_SET; - break; + } + break; + case GeneralTransportOptions::FEC_TYPE: + fec_setting_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; - default: - return result; - } + default: + return result; } return result; } @@ -461,6 +469,29 @@ class ConsumerSocket : public Socket { }); } + int setSocketOption(int socket_option_key, + StrategyCallback socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + on_fwd_strategy_ = socket_option_value; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + on_rec_strategy_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + int getSocketOption(int socket_option_key, double &socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -516,6 +547,10 @@ class ConsumerSocket : public Socket { socket_option_value = interest_lifetime_; break; + case GeneralTransportOptions::MAX_UNVERIFIED_TIME: + socket_option_value = max_unverified_delay_; + break; + case RaaqmTransportOptions::SAMPLE_NUMBER: socket_option_value = sample_number_; break; @@ -532,6 +567,10 @@ class ConsumerSocket : public Socket { socket_option_value = timer_interval_milliseconds_; break; + case RtcTransportOptions::RECOVERY_STRATEGY: + socket_option_value = recovery_strategy_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -553,6 +592,10 @@ class ConsumerSocket : public Socket { socket_option_value = reset_window_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -628,20 +671,6 @@ class ConsumerSocket : public Socket { } int getSocketOption(int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - int getSocketOption(int socket_option_key, IcnObserver **socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -665,7 +694,6 @@ class ConsumerSocket : public Socket { default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } @@ -674,6 +702,9 @@ class ConsumerSocket : public Socket { case DataLinkOptions::OUTPUT_INTERFACE: socket_option_value = output_interface_; break; + case GeneralTransportOptions::FEC_TYPE: + socket_option_value = fec_setting_; + break; default: return SOCKET_OPTION_NOT_GET; } @@ -714,6 +745,29 @@ class ConsumerSocket : public Socket { }); } + int getSocketOption(int socket_option_key, + StrategyCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + StrategyCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE: + *socket_option_value = &on_fwd_strategy_; + break; + case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE: + *socket_option_value = &on_rec_strategy_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + protected: template <typename Lambda, typename arg2> int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, @@ -726,9 +780,9 @@ class ConsumerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -748,8 +802,6 @@ class ConsumerSocket : public Socket { protected: interface::ConsumerSocket *consumer_interface_; - utils::EventThread async_downloader_; - // No need to protect from multiple accesses in the async consumer // The parameter is accessible only with a getSocketOption and // set from the consume @@ -776,6 +828,7 @@ class ConsumerSocket : public Socket { int rate_estimation_choice_; // Verification parameters + int max_unverified_delay_; std::shared_ptr<auth::Verifier> verifier_; transport::auth::KeyId *key_id_; std::atomic_bool verify_signature_; @@ -787,17 +840,26 @@ class ConsumerSocket : public Socket { ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; ConsumerTimerCallback stats_summary_; + StrategyCallback on_fwd_strategy_; + StrategyCallback on_rec_strategy_; ReadCallback *read_callback_; uint32_t timer_interval_milliseconds_; // Transport protocol - std::unique_ptr<protocol::TransportProtocol> transport_protocol_; + std::shared_ptr<protocol::TransportProtocol> transport_protocol_; // Statistic TransportStatistics stats_; + // RTC protocol + RtcTransportRecoveryStrategies recovery_strategy_; + bool aggregated_data_; + + // FEC setting + std::string fec_setting_; + utils::SpinLock guard_raaqm_params_; std::string output_interface_; }; diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index 9daf79b9d..37151d497 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -40,6 +40,7 @@ namespace implementation { using namespace core; using namespace interface; +using ProducerCallback = interface::ProducerSocket::Callback; class ProducerSocket : public Socket { private: @@ -48,11 +49,14 @@ class ProducerSocket : public Socket { : Socket(std::move(portal)), producer_interface_(producer_socket), data_packet_size_(default_values::content_object_packet_size), + max_segment_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), - async_thread_(), - making_manifest_(false), + making_manifest_(default_values::manifest_capacity), hash_algorithm_(auth::CryptoHashType::SHA256), - suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), + signer_(std::make_shared<auth::VoidSigner>()), + suffix_strategy_(std::make_shared<utils::IncrementalSuffixStrategy>(0)), + aggregated_data_(false), + fec_setting_(""), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -63,29 +67,30 @@ class ProducerSocket : public Socket { on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) { + on_content_produced_(VOID_HANDLER), + application_callback_(nullptr) { switch (protocol) { case ProductionProtocolAlgorithms::RTC_PROD: production_protocol_ = - std::make_unique<protocol::RTCProductionProtocol>(this); + std::make_shared<protocol::RTCProductionProtocol>(this); break; case ProductionProtocolAlgorithms::BYTE_STREAM: default: production_protocol_ = - std::make_unique<protocol::ByteStreamProductionProtocol>(this); + std::make_shared<protocol::ByteStreamProductionProtocol>(this); break; } } public: ProducerSocket(interface::ProducerSocket *producer, int protocol) - : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {} + : ProducerSocket(producer, protocol, core::Portal::createShared()) { + is_async_ = true; + } ProducerSocket(interface::ProducerSocket *producer, int protocol, - asio::io_service &io_service) - : ProducerSocket(producer, protocol, - std::make_shared<core::Portal>(io_service)) { - is_async_ = true; + ::utils::EventThread &worker) + : ProducerSocket(producer, protocol, core::Portal::createShared(worker)) { } virtual ~ProducerSocket() {} @@ -98,31 +103,9 @@ class ProducerSocket : public Socket { producer_interface_ = producer_socket; } - void connect() override { - portal_->connect(false); - production_protocol_->start(); - } - - bool isRunning() override { return !production_protocol_->isRunning(); }; + void connect() override { portal_->connect(false); } - virtual void asyncProduce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) { - if (!async_thread_.stopped()) { - auto a = buffer.release(); - async_thread_.add([this, content_name, a, is_last, offset, - last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - **last_segment = offset + produceStream(content_name, std::move(buf), - is_last, offset); - } else { - produceStream(content_name, std::move(buf), is_last, offset); - } - }); - } - } + bool isRunning() override { return production_protocol_->isRunning(); }; virtual uint32_t produceStream(const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, @@ -156,12 +139,38 @@ class ProducerSocket : public Socket { production_protocol_->produce(content_object); } + void sendMapme() { production_protocol_->sendMapme(); } + void registerPrefix(const Prefix &producer_namespace) { - production_protocol_->registerNamespaceWithNetwork(producer_namespace); + portal_->registerRoute(producer_namespace); } + void start() { production_protocol_->start(); } void stop() { production_protocol_->stop(); } + using Socket::getSocketOption; + using Socket::setSocketOption; + + virtual int setSocketOption(int socket_option_key, + ProducerCallback *socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::PRODUCER_CALLBACK: + application_callback_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); + } + virtual int setSocketOption(int socket_option_key, uint32_t socket_option_value) { switch (socket_option_key) { @@ -172,6 +181,17 @@ class ProducerSocket : public Socket { } break; + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + break; + + case GeneralTransportOptions::MAX_SEGMENT_SIZE: + if (socket_option_value <= default_values::max_content_object_size && + socket_option_value > 0) { + max_segment_size_ = socket_option_value; + } + break; + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: production_protocol_->setOutputBufferSize(socket_option_value); break; @@ -260,8 +280,8 @@ class ProducerSocket : public Socket { virtual int setSocketOption(int socket_option_key, bool socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - making_manifest_ = socket_option_value; + case RtcTransportOptions::AGGREGATED_DATA: + aggregated_data_ = socket_option_value; break; default: @@ -385,7 +405,7 @@ class ProducerSocket : public Socket { virtual int setSocketOption( int socket_option_key, - core::NextSegmentCalculationStrategy socket_option_value) { + const std::shared_ptr<utils::SuffixStrategy> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SUFFIX_STRATEGY: suffix_strategy_ = socket_option_value; @@ -413,9 +433,33 @@ class ProducerSocket : public Socket { return SOCKET_OPTION_SET; } + int getSocketOption(int socket_option_key, + ProducerCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in + // case setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, + ProducerCallback **socket_option_value) -> int { + switch (socket_option_key) { + case ProducerCallbacksOptions::PRODUCER_CALLBACK: + *socket_option_value = application_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); + } + virtual int getSocketOption(int socket_option_key, uint32_t &socket_option_value) { switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + break; + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: socket_option_value = production_protocol_->getOutputBufferSize(); break; @@ -424,6 +468,10 @@ class ProducerSocket : public Socket { socket_option_value = (uint32_t)data_packet_size_; break; + case GeneralTransportOptions::MAX_SEGMENT_SIZE: + socket_option_value = (uint32_t)max_segment_size_; + break; + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: socket_option_value = content_object_expiry_time_; break; @@ -438,14 +486,14 @@ class ProducerSocket : public Socket { virtual int getSocketOption(int socket_option_key, bool &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::MAKE_MANIFEST: - socket_option_value = making_manifest_; - break; - case GeneralTransportOptions::ASYNC_MODE: socket_option_value = is_async_; break; + case RtcTransportOptions::AGGREGATED_DATA: + socket_option_value = aggregated_data_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -547,21 +595,6 @@ class ProducerSocket : public Socket { }); } - virtual int getSocketOption( - int socket_option_key, - std::shared_ptr<core::Portal> &socket_option_value) { - switch (socket_option_key) { - case PORTAL: - socket_option_value = portal_; - break; - default: - return SOCKET_OPTION_NOT_GET; - ; - } - - return SOCKET_OPTION_GET; - } - virtual int getSocketOption(int socket_option_key, auth::CryptoHashType &socket_option_value) { switch (socket_option_key) { @@ -577,7 +610,7 @@ class ProducerSocket : public Socket { virtual int getSocketOption( int socket_option_key, - core::NextSegmentCalculationStrategy &socket_option_value) { + std::shared_ptr<utils::SuffixStrategy> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SUFFIX_STRATEGY: socket_option_value = suffix_strategy_; @@ -603,9 +636,31 @@ class ProducerSocket : public Socket { return SOCKET_OPTION_GET; } + int getSocketOption(int socket_option_key, std::string &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::FEC_TYPE: + socket_option_value = fec_setting_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + } + virtual int setSocketOption(int socket_option_key, const std::string &socket_option_value) { - return SOCKET_OPTION_NOT_SET; + int result = SOCKET_OPTION_NOT_SET; + switch (socket_option_key) { + case GeneralTransportOptions::FEC_TYPE: + fec_setting_ = socket_option_value; + result = SOCKET_OPTION_SET; + break; + + default: + return result; + } + return result; } // If the thread calling lambda_func is not the same of io_service, this @@ -623,9 +678,9 @@ class ProducerSocket : public Socket { std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -655,9 +710,9 @@ class ProducerSocket : public Socket { /* Condition variable for the wait */ std::condition_variable cv; bool done = false; - portal_->getIoService().dispatch([&socket_option_key, - &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getThread().tryRunHandlerNow([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -677,20 +732,24 @@ class ProducerSocket : public Socket { // Threads protected: interface::ProducerSocket *producer_interface_; - asio::io_service io_service_; std::atomic<size_t> data_packet_size_; + std::atomic<size_t> max_segment_size_; std::atomic<uint32_t> content_object_expiry_time_; - utils::EventThread async_thread_; - - std::atomic<bool> making_manifest_; + std::atomic<uint32_t> making_manifest_; std::atomic<auth::CryptoHashType> hash_algorithm_; std::atomic<auth::CryptoSuite> crypto_suite_; utils::SpinLock signer_lock_; std::shared_ptr<auth::Signer> signer_; - core::NextSegmentCalculationStrategy suffix_strategy_; + std::shared_ptr<utils::SuffixStrategy> suffix_strategy_; + + std::shared_ptr<protocol::ProductionProtocol> production_protocol_; - std::unique_ptr<protocol::ProductionProtocol> production_protocol_; + // RTC transport + bool aggregated_data_; + + // FEC setting + std::string fec_setting_; // callbacks ProducerInterestCallback on_interest_input_; @@ -706,6 +765,8 @@ class ProducerSocket : public Socket { ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; ProducerContentCallback on_content_produced_; + + ProducerCallback *application_callback_; }; } // namespace implementation diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc index db62b10c1..06d613ef0 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -90,11 +90,11 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) { socket = (TLSRTCProducerSocket *)BIO_get_data(b); if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { - bool making_manifest = socket->parent_->making_manifest_; + uint32_t making_manifest = socket->parent_->making_manifest_; socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - false); + 0U); socket->parent_->ProducerSocket::produce( socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0); socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h index 92c657afc..f6dc425e4 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.h +++ b/libtransport/src/implementation/tls_rtc_socket_producer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 65472b41d..b368c4b88 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -43,7 +43,7 @@ int readOldTLS(BIO *b, char *buf, int size) { if (!socket->something_to_read_) { if (!socket->transport_protocol_->isRunning()) { socket->network_name_.setSuffix(socket->random_suffix_); - socket->ConsumerSocket::asyncConsume(socket->network_name_); + socket->ConsumerSocket::consume(socket->network_name_); } if (!socket->something_to_read_) socket->cv_.wait(lck); @@ -284,31 +284,6 @@ int TLSConsumerSocket::download_content(const Name &name) { return CONSUMER_FINISHED; } -int TLSConsumerSocket::asyncConsume(const Name &name, - std::unique_ptr<utils::MemBuf> &&buffer) { - this->payload_ = std::move(buffer); - - this->ConsumerSocket::setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1, - std::placeholders::_2)); - - return asyncConsume(name); -} - -int TLSConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - throw errors::RuntimeException("Handshake not performed"); - } - - if (!async_downloader_tls_.stopped()) { - async_downloader_tls_.add([this, name]() { download_content(name); }); - } - - return CONSUMER_RUNNING; -} - void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) { producer_namespace_ = producer_namespace; } @@ -353,7 +328,7 @@ void TLSConsumerSocket::readBufferAvailable( cv_.notify_one(); } -void TLSConsumerSocket::readError(const std::error_code ec) noexcept {} +void TLSConsumerSocket::readError(const std::error_code &ec) noexcept {} void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept { std::unique_lock<std::mutex> lck(this->mtx_); diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h index be08ec47d..a74f1ee10 100644 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -47,9 +47,6 @@ class TLSConsumerSocket : public ConsumerSocket, int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer); int consume(const Name &name) override; - int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer); - int asyncConsume(const Name &name) override; - void registerPrefix(const Prefix &producer_namespace); int setSocketOption( @@ -100,7 +97,7 @@ class TLSConsumerSocket : public ConsumerSocket, virtual void readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept override; - virtual void readError(const std::error_code ec) noexcept override; + virtual void readError(const std::error_code &ec) noexcept override; virtual void readSuccess(std::size_t total_size) noexcept override; diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc index 3992ca45c..47f3b43a6 100644 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -99,12 +99,12 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { socket = (TLSProducerSocket *)BIO_get_data(b); if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) { - bool making_manifest = socket->parent_->making_manifest_; + uint32_t making_manifest = socket->parent_->making_manifest_; //! socket->tls_chunks_ corresponds to is_last socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - false); + 0U); socket->parent_->ProducerSocket::produceStream( socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, socket->last_segment_); @@ -358,7 +358,7 @@ uint32_t TLSProducerSocket::produceStream( } size_t buf_size = buffer->length(); - name_ = production_protocol_->getNamespaces().front().mapName(content_name); + name_ = portal_->getServedNamespaces().begin()->mapName(content_name); tls_chunks_ = to_call_oncontentproduced_ = (int)ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); @@ -394,8 +394,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, << "On addHicnKeyIdCb, for the prefix registration."; if (ext_type == 100) { - auto &prefix = - socket->parent_->production_protocol_->getNamespaces().front(); + auto &prefix = *socket->parent_->portal_->getServedNamespaces().begin(); const ip_prefix_t &ip_prefix = prefix.toIpPrefixStruct(); int inet_family = prefix.getAddressFamily(); uint16_t prefix_len_bits = prefix.getPrefixLength(); diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h index a542a4d9f..0e958b321 100644 --- a/libtransport/src/implementation/tls_socket_producer.h +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: |