diff options
Diffstat (limited to 'libtransport/src/implementation')
15 files changed, 341 insertions, 997 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt index 5423a7697..392c99e15 100644 --- a/libtransport/src/implementation/CMakeLists.txt +++ b/libtransport/src/implementation/CMakeLists.txt @@ -13,13 +13,8 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc -) - list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h ) @@ -27,15 +22,16 @@ list(APPEND HEADER_FILES if (${OPENSSL_VERSION} VERSION_EQUAL "1.1.1a" OR ${OPENSSL_VERSION} VERSION_GREATER "1.1.1a") list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc + # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.cc ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.cc ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/socket.cc ) list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_producer.h - ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h + # ${CMAKE_CURRENT_SOURCE_DIR}/tls_rtc_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_producer.h ${CMAKE_CURRENT_SOURCE_DIR}/tls_socket_consumer.h ${CMAKE_CURRENT_SOURCE_DIR}/p2psecure_socket_consumer.h diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 9b79850d6..8c7c175b2 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -15,7 +15,6 @@ #include <implementation/p2psecure_socket_consumer.h> #include <interfaces/tls_socket_consumer.h> - #include <openssl/bio.h> #include <openssl/ssl.h> #include <openssl/tls1.h> @@ -175,7 +174,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket( : ConsumerSocket(consumer, handshake_protocol), name_(), tls_consumer_(nullptr), - buf_pool_(), decrypted_content_(), payload_(), head_(), diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h index d4c3b26c2..a35a50352 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.h +++ b/libtransport/src/implementation/p2psecure_socket_consumer.h @@ -16,7 +16,6 @@ #pragma once #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/tls_socket_consumer.h> #include <openssl/bio.h> #include <openssl/ssl.h> @@ -75,7 +74,6 @@ class P2PSecureConsumerSocket : public ConsumerSocket, BIO_METHOD *bio_meth_; /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ - utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc index 15c7d25cd..6dff2ba08 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.cc +++ b/libtransport/src/implementation/p2psecure_socket_producer.cc @@ -14,13 +14,11 @@ */ #include <hicn/transport/core/interest.h> - #include <implementation/p2psecure_socket_producer.h> -#include <implementation/tls_rtc_socket_producer.h> +// #include <implementation/tls_rtc_socket_producer.h> #include <implementation/tls_socket_producer.h> #include <interfaces/tls_rtc_socket_producer.h> #include <interfaces/tls_socket_producer.h> - #include <openssl/bio.h> #include <openssl/rand.h> #include <openssl/ssl.h> @@ -34,7 +32,8 @@ namespace implementation { P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket) - : ProducerSocket(producer_socket), + : ProducerSocket(producer_socket, + ProductionProtocolAlgorithms::BYTE_STREAM), mtx_(), cv_(), map_producers(), @@ -42,8 +41,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( P2PSecureProducerSocket::P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<utils::Identity> &identity) - : ProducerSocket(producer_socket), + const std::shared_ptr<auth::Identity> &identity) + : ProducerSocket(producer_socket, + ProductionProtocolAlgorithms::BYTE_STREAM), rtc_(rtc), mtx_(), cv_(), @@ -51,9 +51,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket( list_producers() { /* Setup SSL context (identity and parameter to use TLS 1.3) */ der_cert_ = parcKeyStore_GetDEREncodedCertificate( - (identity->getSigner()->getKeyStore())); + (identity->getSigner()->getParcKeyStore())); der_prk_ = parcKeyStore_GetDEREncodedPrivateKey( - (identity->getSigner()->getKeyStore())); + (identity->getSigner()->getParcKeyStore())); int cert_size = parcBuffer_Limit(der_cert_); int prk_size = parcBuffer_Limit(der_prk_); @@ -88,15 +88,20 @@ void P2PSecureProducerSocket::initSessionSocket( producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_); producer->setSocketOption(DATA_PACKET_SIZE, (uint32_t)(this->data_packet_size_)); - producer->output_buffer_.setLimit(this->output_buffer_.getLimit()); + uint32_t output_buffer_size = 0; + this->getSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, + output_buffer_size); + producer->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, + output_buffer_size); if (!rtc_) { producer->setInterface(new interface::TLSProducerSocket(producer.get())); } else { - TLSRTCProducerSocket *rtc_producer = - dynamic_cast<TLSRTCProducerSocket *>(producer.get()); - rtc_producer->setInterface( - new interface::TLSRTCProducerSocket(rtc_producer)); + // TODO + // TLSRTCProducerSocket *rtc_producer = + // dynamic_cast<TLSRTCProducerSocket *>(producer.get()); + // rtc_producer->setInterface( + // new interface::TLSRTCProducerSocket(rtc_producer)); } } @@ -114,8 +119,9 @@ void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, tls_producer = std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName()); } else { - tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, - interest.getName()); + // TODO + // tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this, + // interest.getName()); } initSessionSocket(tls_producer); @@ -129,15 +135,19 @@ void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p, tls_producer_ptr->onInterest(*tls_producer_ptr, interest); tls_producer_ptr->async_accept(); } else { - TLSRTCProducerSocket *rtc_producer_ptr = - dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); - rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); - rtc_producer_ptr->async_accept(); + // TODO + // TLSRTCProducerSocket *rtc_producer_ptr = + // dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr); + // rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest); + // rtc_producer_ptr->async_accept(); } } -void P2PSecureProducerSocket::produce(const uint8_t *buffer, - size_t buffer_size) { +uint32_t P2PSecureProducerSocket::produceDatagram( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer) { + // TODO + throw errors::NotImplementedException(); + if (!rtc_) { throw errors::RuntimeException( "RTC must be the transport protocol to start the production of current " @@ -148,16 +158,20 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer, if (list_producers.empty()) cv_.wait(lck); - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { - TLSRTCProducerSocket *rtc_producer = - dynamic_cast<TLSRTCProducerSocket *>(it->get()); - rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); - } + // TODO + // for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) + // { + // TLSRTCProducerSocket *rtc_producer = + // dynamic_cast<TLSRTCProducerSocket *>(it->get()); + // rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); + // } + + return 0; } -uint32_t P2PSecureProducerSocket::produce( - Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, - uint32_t start_offset) { +uint32_t P2PSecureProducerSocket::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { if (rtc_) { throw errors::RuntimeException( "RTC transport protocol is not compatible with the production of " @@ -170,16 +184,17 @@ uint32_t P2PSecureProducerSocket::produce( if (list_producers.empty()) cv_.wait(lck); for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - segments += - (*it)->produce(content_name, buffer->clone(), is_last, start_offset); + segments += (*it)->produceStream(content_name, buffer->clone(), is_last, + start_offset); return segments; } -uint32_t P2PSecureProducerSocket::produce(Name content_name, - const uint8_t *buffer, - size_t buffer_size, bool is_last, - uint32_t start_offset) { +uint32_t P2PSecureProducerSocket::produceStream(const Name &content_name, + const uint8_t *buffer, + size_t buffer_size, + bool is_last, + uint32_t start_offset) { if (rtc_) { throw errors::RuntimeException( "RTC transport protocol is not compatible with the production of " @@ -191,29 +206,31 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name, if (list_producers.empty()) cv_.wait(lck); for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - segments += (*it)->produce(content_name, buffer, buffer_size, is_last, - start_offset); + segments += (*it)->produceStream(content_name, buffer, buffer_size, is_last, + start_offset); return segments; } -void P2PSecureProducerSocket::asyncProduce(const Name &content_name, - const uint8_t *buf, - size_t buffer_size, bool is_last, - uint32_t *start_offset) { - if (rtc_) { - throw errors::RuntimeException( - "RTC transport protocol is not compatible with the production of " - "current data. Aborting."); - } - - std::unique_lock<std::mutex> lck(mtx_); - if (list_producers.empty()) cv_.wait(lck); - - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) { - (*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset); - } -} +// void P2PSecureProducerSocket::asyncProduce(const Name &content_name, +// const uint8_t *buf, +// size_t buffer_size, bool is_last, +// uint32_t *start_offset) { +// if (rtc_) { +// throw errors::RuntimeException( +// "RTC transport protocol is not compatible with the production of " +// "current data. Aborting."); +// } + +// std::unique_lock<std::mutex> lck(mtx_); +// if (list_producers.empty()) cv_.wait(lck); + +// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) +// { +// (*it)->asyncProduce(content_name, buf, buffer_size, is_last, +// start_offset); +// } +// } void P2PSecureProducerSocket::asyncProduce( Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, @@ -269,7 +286,7 @@ int P2PSecureProducerSocket::setSocketOption( int P2PSecureProducerSocket::setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) { + const std::shared_ptr<auth::Signer> &socket_option_value) { if (!list_producers.empty()) for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); @@ -323,16 +340,6 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key, } int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, std::list<Prefix> socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) { if (!list_producers.empty()) for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) @@ -361,17 +368,7 @@ int P2PSecureProducerSocket::setSocketOption( } int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, utils::CryptoHashType socket_option_value) { - if (!list_producers.empty()) - for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) - (*it)->setSocketOption(socket_option_key, socket_option_value); - - return ProducerSocket::setSocketOption(socket_option_key, - socket_option_value); -} - -int P2PSecureProducerSocket::setSocketOption( - int socket_option_key, utils::CryptoSuite socket_option_value) { + int socket_option_key, auth::CryptoHashType socket_option_value) { if (!list_producers.empty()) for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) (*it)->setSocketOption(socket_option_key, socket_option_value); diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h index bfc9fc2c1..b7c3d1958 100644 --- a/libtransport/src/implementation/p2psecure_socket_producer.h +++ b/libtransport/src/implementation/p2psecure_socket_producer.h @@ -15,15 +15,14 @@ #pragma once -#include <hicn/transport/security/identity.h> -#include <hicn/transport/security/signer.h> - +#include <hicn/transport/auth/identity.h> +#include <hicn/transport/auth/signer.h> #include <implementation/socket_producer.h> -#include <implementation/tls_rtc_socket_producer.h> +// #include <implementation/tls_rtc_socket_producer.h> #include <implementation/tls_socket_producer.h> +#include <openssl/ssl.h> #include <utils/content_store.h> -#include <openssl/ssl.h> #include <condition_variable> #include <forward_list> #include <mutex> @@ -33,39 +32,40 @@ namespace implementation { class P2PSecureProducerSocket : public ProducerSocket { friend class TLSProducerSocket; - friend class TLSRTCProducerSocket; + // TODO + // friend class TLSRTCProducerSocket; public: explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket); explicit P2PSecureProducerSocket( interface::ProducerSocket *producer_socket, bool rtc, - const std::shared_ptr<utils::Identity> &identity); + const std::shared_ptr<auth::Identity> &identity); ~P2PSecureProducerSocket(); - void produce(const uint8_t *buffer, size_t buffer_size) override; + uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) override; - uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, - bool is_last = true, uint32_t start_offset = 0) override; + uint32_t produceStream(const Name &content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) override; - uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) override; + uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) override; void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last, uint32_t offset, uint32_t **last_segment = nullptr) override; - void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, - bool is_last = true, - uint32_t *start_offset = nullptr) override; - int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) override; int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) override; + const std::shared_ptr<auth::Signer> &socket_option_value) override; int setSocketOption(int socket_option_key, uint32_t socket_option_value) override; @@ -75,9 +75,6 @@ class P2PSecureProducerSocket : public ProducerSocket { int setSocketOption(int socket_option_key, Name *socket_option_value) override; - int setSocketOption(int socket_option_key, - std::list<Prefix> socket_option_value) override; - int setSocketOption( int socket_option_key, ProducerContentObjectCallback socket_option_value) override; @@ -86,16 +83,13 @@ class P2PSecureProducerSocket : public ProducerSocket { ProducerContentCallback socket_option_value) override; int setSocketOption(int socket_option_key, - utils::CryptoHashType socket_option_value) override; - - int setSocketOption(int socket_option_key, - utils::CryptoSuite socket_option_value) override; + auth::CryptoHashType socket_option_value) override; int setSocketOption(int socket_option_key, const std::string &socket_option_value) override; using ProducerSocket::getSocketOption; - using ProducerSocket::onInterest; + // using ProducerSocket::onInterest; protected: /* Callback invoked once an interest has been received and its payload diff --git a/libtransport/src/implementation/socket.cc b/libtransport/src/implementation/socket.cc new file mode 100644 index 000000000..2e21f2bc3 --- /dev/null +++ b/libtransport/src/implementation/socket.cc @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/global_configuration.h> +#include <implementation/socket.h> + +namespace transport { +namespace implementation { + +Socket::Socket(std::shared_ptr<core::Portal> &&portal) + : portal_(std::move(portal)), is_async_(false) {} + +} // namespace implementation +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h index 2e51f3027..cf22c03e1 100644 --- a/libtransport/src/implementation/socket.h +++ b/libtransport/src/implementation/socket.h @@ -15,13 +15,12 @@ #pragma once +#include <core/facade.h> #include <hicn/transport/config.h> #include <hicn/transport/interfaces/callbacks.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/socket_options_keys.h> -#include <core/facade.h> - #define SOCKET_OPTION_GET 0 #define SOCKET_OPTION_NOT_GET 1 #define SOCKET_OPTION_SET 2 @@ -32,56 +31,23 @@ namespace transport { namespace implementation { // Forward Declarations -template <typename PortalType> class Socket; -// Define the portal and its connector, depending on the compilation options -// passed by the build tool. -using HicnForwarderPortal = core::HicnForwarderPortal; - -#ifdef __linux__ -#ifndef __ANDROID__ -using RawSocketPortal = core::RawSocketPortal; -#endif -#endif - -#ifdef __vpp__ -using VPPForwarderPortal = core::VPPForwarderPortal; -using BaseSocket = Socket<VPPForwarderPortal>; -using BasePortal = VPPForwarderPortal; -#else -using BaseSocket = Socket<HicnForwarderPortal>; -using BasePortal = HicnForwarderPortal; -#endif - -template <typename PortalType> class Socket { - static_assert(std::is_same<PortalType, HicnForwarderPortal>::value -#ifdef __linux__ -#ifndef __ANDROID__ - || std::is_same<PortalType, RawSocketPortal>::value -#ifdef __vpp__ - || std::is_same<PortalType, VPPForwarderPortal>::value -#endif -#endif - , -#else - , - -#endif - "This class is not allowed as Portal"); - public: - using Portal = PortalType; - - virtual asio::io_service &getIoService() = 0; - virtual void connect() = 0; - virtual bool isRunning() = 0; + virtual asio::io_service &getIoService() { return portal_->getIoService(); } + protected: + Socket(std::shared_ptr<core::Portal> &&portal); + virtual ~Socket(){}; + + protected: + std::shared_ptr<core::Portal> portal_; + bool is_async_; }; } // namespace implementation diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h index 87965923e..a7b6ac4e7 100644 --- a/libtransport/src/implementation/socket_consumer.h +++ b/libtransport/src/implementation/socket_consumer.h @@ -13,15 +13,17 @@ * limitations under the License. */ +#pragma once + #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/interfaces/statistics.h> -#include <hicn/transport/security/verifier.h> +#include <hicn/transport/auth/verifier.h> #include <hicn/transport/utils/event_thread.h> #include <protocols/cbr.h> -#include <protocols/protocol.h> #include <protocols/raaqm.h> -#include <protocols/rtc.h> +#include <protocols/rtc/rtc.h> +#include <protocols/transport_protocol.h> namespace transport { namespace implementation { @@ -30,12 +32,12 @@ using namespace core; using namespace interface; using ReadCallback = interface::ConsumerSocket::ReadCallback; -class ConsumerSocket : public Socket<BasePortal> { +class ConsumerSocket : public Socket { private: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, - std::shared_ptr<Portal> &&portal) - : consumer_interface_(consumer), - portal_(portal), + std::shared_ptr<core::Portal> &&portal) + : Socket(std::move(portal)), + consumer_interface_(consumer), async_downloader_(), interest_lifetime_(default_values::interest_lifetime), min_window_size_(default_values::min_window_size), @@ -54,16 +56,13 @@ class ConsumerSocket : public Socket<BasePortal> { rate_estimation_observer_(nullptr), rate_estimation_batching_parameter_(default_values::batch), rate_estimation_choice_(0), - is_async_(false), - verifier_(std::make_shared<utils::Verifier>()), + verifier_(std::make_shared<auth::VoidVerifier>()), verify_signature_(false), - key_content_(false), reset_window_(false), on_interest_output_(VOID_HANDLER), on_interest_timeout_(VOID_HANDLER), on_interest_satisfied_(VOID_HANDLER), on_content_object_input_(VOID_HANDLER), - on_content_object_verification_(VOID_HANDLER), stats_summary_(VOID_HANDLER), read_callback_(nullptr), timer_interval_milliseconds_(0), @@ -75,7 +74,7 @@ class ConsumerSocket : public Socket<BasePortal> { break; case TransportProtocolAlgorithms::RTC: transport_protocol_ = - std::make_unique<protocol::RTCTransportProtocol>(this); + std::make_unique<protocol::rtc::RTCTransportProtocol>(this); break; case TransportProtocolAlgorithms::RAAQM: default: @@ -87,12 +86,12 @@ class ConsumerSocket : public Socket<BasePortal> { public: ConsumerSocket(interface::ConsumerSocket *consumer, int protocol) - : ConsumerSocket(consumer, protocol, std::make_shared<Portal>()) {} + : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {} ConsumerSocket(interface::ConsumerSocket *consumer, int protocol, asio::io_service &io_service) : ConsumerSocket(consumer, protocol, - std::make_shared<Portal>(io_service)) { + std::make_shared<core::Portal>(io_service)) { is_async_ = true; } @@ -138,8 +137,6 @@ class ConsumerSocket : public Socket<BasePortal> { return CONSUMER_RUNNING; } - bool verifyKeyPackets() { return transport_protocol_->verifyKeyPackets(); } - void stop() { if (transport_protocol_->isRunning()) { transport_protocol_->stop(); @@ -152,8 +149,6 @@ class ConsumerSocket : public Socket<BasePortal> { } } - asio::io_service &getIoService() { return portal_->getIoService(); } - virtual int setSocketOption(int socket_option_key, ReadCallback *socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -316,12 +311,6 @@ class ConsumerSocket : public Socket<BasePortal> { break; } - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - if (socket_option_value == VOID_HANDLER) { - on_content_object_verification_ = VOID_HANDLER; - break; - } - default: return SOCKET_OPTION_NOT_SET; } @@ -334,16 +323,6 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case GeneralTransportOptions::VERIFY_SIGNATURE: - verify_signature_ = socket_option_value; - result = SOCKET_OPTION_SET; - break; - - case GeneralTransportOptions::KEY_CONTENT: - key_content_ = socket_option_value; - result = SOCKET_OPTION_SET; - break; - case RaaqmTransportOptions::PER_SESSION_CWINDOW_RESET: reset_window_ = socket_option_value; result = SOCKET_OPTION_SET; @@ -377,29 +356,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int setSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationCallback socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - on_content_object_verification_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); - } - int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -433,51 +389,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int setSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback socket_option_value) { - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this]( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::VERIFICATION_FAILED: - verification_failed_callback_ = socket_option_value; - break; - - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - }); - } - - // int setSocketOption( - // int socket_option_key, - // ConsumerContentObjectVerificationFailedCallback socket_option_value) { - // return rescheduleOnIOService( - // socket_option_key, socket_option_value, - // [this]( - // int socket_option_key, - // ConsumerContentObjectVerificationFailedCallback - // socket_option_value) - // -> int { - // switch (socket_option_key) { - // case ConsumerCallbacksOptions::VERIFICATION_FAILED: - // verification_failed_callback_ = socket_option_value; - // break; - - // default: - // return SOCKET_OPTION_NOT_SET; - // } - - // return SOCKET_OPTION_SET; - // }); - // } - int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); switch (socket_option_key) { @@ -494,7 +405,7 @@ class ConsumerSocket : public Socket<BasePortal> { int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Verifier> &socket_option_value) { + const std::shared_ptr<auth::Verifier> &socket_option_value) { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { @@ -516,14 +427,6 @@ class ConsumerSocket : public Socket<BasePortal> { int result = SOCKET_OPTION_NOT_SET; if (!transport_protocol_->isRunning()) { switch (socket_option_key) { - case GeneralTransportOptions::CERTIFICATE: - key_id_ = verifier_->addKeyFromCertificate(socket_option_value); - - if (key_id_ != nullptr) { - result = SOCKET_OPTION_SET; - } - break; - case DataLinkOptions::OUTPUT_INTERFACE: output_interface_ = socket_option_value; portal_->setOutputInterface(output_interface_); @@ -642,14 +545,6 @@ class ConsumerSocket : public Socket<BasePortal> { socket_option_value = transport_protocol_->isRunning(); break; - case GeneralTransportOptions::VERIFY_SIGNATURE: - socket_option_value = verify_signature_; - break; - - case GeneralTransportOptions::KEY_CONTENT: - socket_option_value = key_content_; - break; - case GeneralTransportOptions::ASYNC_MODE: socket_option_value = is_async_; break; @@ -699,29 +594,6 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int getSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationCallback **socket_option_value) - -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: - *socket_option_value = &on_content_object_verification_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); - } - int getSocketOption(int socket_option_key, ConsumerInterestCallback **socket_option_value) { // Reschedule the function on the io_service to avoid race condition in @@ -755,30 +627,8 @@ class ConsumerSocket : public Socket<BasePortal> { }); } - int getSocketOption( - int socket_option_key, - ConsumerContentObjectVerificationFailedCallback **socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in - // case setSocketOption is called while the io_service is running. - return rescheduleOnIOService( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ConsumerContentObjectVerificationFailedCallback * - *socket_option_value) -> int { - switch (socket_option_key) { - case ConsumerCallbacksOptions::VERIFICATION_FAILED: - *socket_option_value = &verification_failed_callback_; - break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); - } - int getSocketOption(int socket_option_key, - std::shared_ptr<Portal> &socket_option_value) { + std::shared_ptr<core::Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: socket_option_value = portal_; @@ -807,7 +657,7 @@ class ConsumerSocket : public Socket<BasePortal> { } int getSocketOption(int socket_option_key, - std::shared_ptr<utils::Verifier> &socket_option_value) { + std::shared_ptr<auth::Verifier> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::VERIFIER: socket_option_value = verifier_; @@ -871,7 +721,7 @@ class ConsumerSocket : public Socket<BasePortal> { // To enforce type check std::function<int(int, arg2)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (transport_protocol_->isRunning()) { + if (transport_protocol_ && transport_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; @@ -898,7 +748,6 @@ class ConsumerSocket : public Socket<BasePortal> { protected: interface::ConsumerSocket *consumer_interface_; - std::shared_ptr<Portal> portal_; utils::EventThread async_downloader_; // No need to protect from multiple accesses in the async consumer @@ -926,13 +775,10 @@ class ConsumerSocket : public Socket<BasePortal> { int rate_estimation_batching_parameter_; int rate_estimation_choice_; - bool is_async_; - // Verification parameters - std::shared_ptr<utils::Verifier> verifier_; + std::shared_ptr<auth::Verifier> verifier_; PARCKeyId *key_id_; std::atomic_bool verify_signature_; - bool key_content_; bool reset_window_; ConsumerInterestCallback on_interest_retransmission_; @@ -940,9 +786,7 @@ class ConsumerSocket : public Socket<BasePortal> { ConsumerInterestCallback on_interest_timeout_; ConsumerInterestCallback on_interest_satisfied_; ConsumerContentObjectCallback on_content_object_input_; - ConsumerContentObjectVerificationCallback on_content_object_verification_; ConsumerTimerCallback stats_summary_; - ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; @@ -959,4 +803,4 @@ class ConsumerSocket : public Socket<BasePortal> { }; } // namespace implementation -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h index a6f0f969e..af69cd818 100644 --- a/libtransport/src/implementation/socket_producer.h +++ b/libtransport/src/implementation/socket_producer.h @@ -15,9 +15,11 @@ #pragma once -#include <hicn/transport/security/signer.h> +#include <hicn/transport/auth/signer.h> #include <hicn/transport/utils/event_thread.h> #include <implementation/socket.h> +#include <protocols/prod_protocol_bytestream.h> +#include <protocols/prod_protocol_rtc.h> #include <utils/content_store.h> #include <utils/suffix_strategy.h> @@ -39,21 +41,17 @@ namespace implementation { using namespace core; using namespace interface; -class ProducerSocket : public Socket<BasePortal>, - public BasePortal::ProducerCallback { - static constexpr uint32_t burst_size = 256; - - public: - explicit ProducerSocket(interface::ProducerSocket *producer_socket) - : producer_interface_(producer_socket), - portal_(std::make_shared<Portal>(io_service_)), +class ProducerSocket : public Socket { + private: + ProducerSocket(interface::ProducerSocket *producer_socket, int protocol, + std::shared_ptr<core::Portal> &&portal) + : Socket(std::move(portal)), + producer_interface_(producer_socket), data_packet_size_(default_values::content_object_packet_size), content_object_expiry_time_(default_values::content_object_expiry_time), - output_buffer_(default_values::producer_socket_output_buffer_size), async_thread_(), - registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), - hash_algorithm_(utils::CryptoHashType::SHA_256), + hash_algorithm_(auth::CryptoHashType::SHA_256), suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), @@ -65,15 +63,33 @@ class ProducerSocket : public Socket<BasePortal>, on_content_object_in_output_buffer_(VOID_HANDLER), on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), - on_content_produced_(VOID_HANDLER) {} - - virtual ~ProducerSocket() { - stop(); - if (listening_thread_.joinable()) { - listening_thread_.join(); + on_content_produced_(VOID_HANDLER) { + switch (protocol) { + case ProductionProtocolAlgorithms::RTC_PROD: + production_protocol_ = + std::make_unique<protocol::RTCProductionProtocol>(this); + break; + case ProductionProtocolAlgorithms::BYTE_STREAM: + default: + production_protocol_ = + std::make_unique<protocol::ByteStreamProductionProtocol>(this); + break; } } + public: + ProducerSocket(interface::ProducerSocket *producer, int protocol) + : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {} + + ProducerSocket(interface::ProducerSocket *producer, int protocol, + asio::io_service &io_service) + : ProducerSocket(producer, protocol, + std::make_shared<core::Portal>(io_service)) { + is_async_ = true; + } + + virtual ~ProducerSocket() {} + interface::ProducerSocket *getInterface() { return producer_interface_; } @@ -84,296 +100,10 @@ class ProducerSocket : public Socket<BasePortal>, void connect() override { portal_->connect(false); - listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); + production_protocol_->start(); } - bool isRunning() override { return !io_service_.stopped(); }; - - virtual uint32_t produce(Name content_name, const uint8_t *buffer, - size_t buffer_size, bool is_last = true, - uint32_t start_offset = 0) { - return ProducerSocket::produce( - content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), is_last, - start_offset); - } - - virtual uint32_t produce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) { - if (TRANSPORT_EXPECT_FALSE(buffer->length() == 0)) { - return 0; - } - - // Copy the atomic variables to ensure they keep the same value - // during the production - std::size_t data_packet_size = data_packet_size_; - uint32_t content_object_expiry_time = content_object_expiry_time_; - utils::CryptoHashType hash_algo = hash_algorithm_; - bool making_manifest = making_manifest_; - auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( - suffix_strategy_, start_offset); - std::shared_ptr<utils::Signer> signer; - getSocketOption(GeneralTransportOptions::SIGNER, signer); - - auto buffer_size = buffer->length(); - int bytes_segmented = 0; - std::size_t header_size; - std::size_t manifest_header_size = 0; - std::size_t signature_length = 0; - std::uint32_t final_block_number = start_offset; - uint64_t free_space_for_content = 0; - - core::Packet::Format format; - std::shared_ptr<ContentObjectManifest> manifest; - bool is_last_manifest = false; - - // TODO Manifest may still be used for indexing - if (making_manifest && !signer) { - TRANSPORT_LOGE("Making manifests without setting producer identity."); - } - - core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; - core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; - if (content_name.getType() == HNT_CONTIGUOUS_V4 || - content_name.getType() == HNT_IOV_V4) { - hf_format = core::Packet::Format::HF_INET_TCP; - hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; - } else if (content_name.getType() == HNT_CONTIGUOUS_V6 || - content_name.getType() == HNT_IOV_V6) { - hf_format = core::Packet::Format::HF_INET6_TCP; - hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; - } else { - throw errors::RuntimeException("Unknown name format."); - } - - format = hf_format; - if (making_manifest) { - manifest_header_size = core::Packet::getHeaderSizeFromFormat( - signer ? hf_format_ah : hf_format, - signer ? signer->getSignatureLength() : 0); - } else if (signer) { - format = hf_format_ah; - signature_length = signer->getSignatureLength(); - } - - header_size = - core::Packet::getHeaderSizeFromFormat(format, signature_length); - free_space_for_content = data_packet_size - header_size; - uint32_t number_of_segments = uint32_t( - std::ceil(double(buffer_size) / double(free_space_for_content))); - if (free_space_for_content * number_of_segments < buffer_size) { - number_of_segments++; - } - - // TODO allocate space for all the headers - if (making_manifest) { - uint32_t segment_in_manifest = static_cast<uint32_t>( - std::floor(double(data_packet_size - manifest_header_size - - ContentObjectManifest::getManifestHeaderSize()) / - ContentObjectManifest::getManifestEntrySize()) - - 1.0); - uint32_t number_of_manifests = static_cast<uint32_t>( - std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number += number_of_segments + number_of_manifests - 1; - - manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, content_name, suffix_strategy_, - signer ? signer->getSignatureLength() : 0)); - manifest->setLifetime(content_object_expiry_time); - - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); - } - } - - for (unsigned int packaged_segments = 0; - packaged_segments < number_of_segments; packaged_segments++) { - if (making_manifest) { - if (manifest->estimateManifestSize(2) > - data_packet_size - manifest_header_size) { - // Send the current manifest - manifest->encode(); - - // If identity set, sign manifest - if (signer) { - signer->sign(*manifest); - } - - passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", - manifest->getName().toString().c_str()); - - // Send content objects stored in the queue - while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD( - "Send content %s", - content_queue_.front()->getName().toString().c_str()); - content_queue_.pop(); - } - - // Create new manifest. The reference to the last manifest has been - // acquired in the passContentObjectToCallbacks function, so we can - // safely release this reference - manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), - core::ManifestVersion::VERSION_1, - core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - content_name, suffix_strategy_, - signer ? signer->getSignatureLength() : 0)); - - manifest->setLifetime(content_object_expiry_time); - manifest->setFinalBlockNumber( - is_last ? final_block_number - : utils::SuffixStrategy::INVALID_SUFFIX); - } - } - - auto content_suffix = suffix_strategy->getNextContentSuffix(); - auto content_object = std::make_shared<ContentObject>( - content_name.setSuffix(content_suffix), format); - content_object->setLifetime(content_object_expiry_time); - - auto b = buffer->cloneOne(); - b->trimStart(free_space_for_content * packaged_segments); - b->trimEnd(b->length()); - - if (TRANSPORT_EXPECT_FALSE(packaged_segments == number_of_segments - 1)) { - b->append(buffer_size - bytes_segmented); - bytes_segmented += (int)(buffer_size - bytes_segmented); - - if (is_last && making_manifest) { - is_last_manifest = true; - } else if (is_last) { - content_object->setRst(); - } - - } else { - b->append(free_space_for_content); - bytes_segmented += (int)(free_space_for_content); - } - - content_object->appendPayload(std::move(b)); - - if (making_manifest) { - using namespace std::chrono_literals; - utils::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(content_suffix, hash); - content_queue_.push(content_object); - } else { - if (signer) { - signer->sign(*content_object); - } - passContentObjectToCallbacks(content_object); - TRANSPORT_LOGD("Send content %s", - content_object->getName().toString().c_str()); - } - } - - if (making_manifest) { - if (is_last_manifest) { - manifest->setFinalManifest(is_last_manifest); - } - - manifest->encode(); - if (signer) { - signer->sign(*manifest); - } - - passContentObjectToCallbacks(manifest); - TRANSPORT_LOGD("Send manifest %s", - manifest->getName().toString().c_str()); - - while (!content_queue_.empty()) { - passContentObjectToCallbacks(content_queue_.front()); - TRANSPORT_LOGD("Send content %s", - content_queue_.front()->getName().toString().c_str()); - content_queue_.pop(); - } - } - - io_service_.post([this]() { - std::shared_ptr<ContentObject> co; - while (object_queue_for_callbacks_.pop(co)) { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *co); - } - - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *co); - } - - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, *co); - } - - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *co); - } - } - }); - - io_service_.dispatch([this, buffer_size]() { - if (on_content_produced_) { - on_content_produced_(*producer_interface_, - std::make_error_code(std::errc(0)), buffer_size); - } - }); - - return suffix_strategy->getTotalCount(); - } - - virtual void produce(ContentObject &content_object) { - io_service_.dispatch([this, &content_object]() { - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, - content_object); - } - }); - - output_buffer_.insert(std::static_pointer_cast<ContentObject>( - content_object.shared_from_this())); - - io_service_.dispatch([this, &content_object]() { - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, content_object); - } - }); - - portal_->sendContentObject(content_object); - } - - virtual void produce(const uint8_t *buffer, size_t buffer_size) { - produce(utils::MemBuf::copyBuffer(buffer, buffer_size)); - } - - virtual void produce(std::unique_ptr<utils::MemBuf> &&buffer) { - // This API is meant to be used just with the RTC producer. - // Here it cannot be used since no name for the content is specified. - throw errors::NotImplementedException(); - } - - virtual void asyncProduce(const Name &suffix, const uint8_t *buf, - size_t buffer_size, bool is_last = true, - uint32_t *start_offset = nullptr) { - if (!async_thread_.stopped()) { - async_thread_.add([this, suffix, buffer = buf, size = buffer_size, - is_last, start_offset]() { - if (start_offset != nullptr) { - *start_offset = ProducerSocket::produce(suffix, buffer, size, is_last, - *start_offset); - } else { - ProducerSocket::produce(suffix, buffer, size, is_last, 0); - } - }); - } - } - - void asyncProduce(const Name &suffix); + bool isRunning() override { return !production_protocol_->isRunning(); }; virtual void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, @@ -381,75 +111,56 @@ class ProducerSocket : public Socket<BasePortal>, uint32_t **last_segment = nullptr) { if (!async_thread_.stopped()) { auto a = buffer.release(); - async_thread_.add( - [this, content_name, a, is_last, offset, last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - **last_segment = - offset + ProducerSocket::produce(content_name, std::move(buf), - is_last, offset); - } else { - ProducerSocket::produce(content_name, std::move(buf), is_last, - offset); - } - }); - } - } - - virtual void asyncProduce(ContentObject &content_object) { - if (!async_thread_.stopped()) { - auto co_ptr = std::static_pointer_cast<ContentObject>( - content_object.shared_from_this()); - async_thread_.add([this, content_object = std::move(co_ptr)]() { - ProducerSocket::produce(*content_object); + async_thread_.add([this, content_name, a, is_last, offset, + last_segment]() { + auto buf = std::unique_ptr<utils::MemBuf>(a); + if (last_segment != NULL) { + **last_segment = offset + produceStream(content_name, std::move(buf), + is_last, offset); + } else { + produceStream(content_name, std::move(buf), is_last, offset); + } }); } } - virtual void registerPrefix(const Prefix &producer_namespace) { - served_namespaces_.push_back(producer_namespace); + virtual uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) { + return production_protocol_->produceStream(content_name, std::move(buffer), + is_last, start_offset); } - void serveForever() { - if (listening_thread_.joinable()) { - listening_thread_.join(); - } + virtual uint32_t produceStream(const Name &content_name, + const uint8_t *buffer, size_t buffer_size, + bool is_last = true, + uint32_t start_offset = 0) { + return production_protocol_->produceStream( + content_name, buffer, buffer_size, is_last, start_offset); } - void stop() { portal_->stopEventsLoop(); } - - asio::io_service &getIoService() override { return portal_->getIoService(); }; - - virtual void onInterest(Interest &interest) { - if (on_interest_input_) { - on_interest_input_(*producer_interface_, interest); - } - - const std::shared_ptr<ContentObject> content_object = - output_buffer_.find(interest); - - if (content_object) { - if (on_interest_satisfied_output_buffer_) { - on_interest_satisfied_output_buffer_(*producer_interface_, interest); - } + virtual uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) { + return production_protocol_->produceDatagram(content_name, + std::move(buffer)); + } - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *content_object); - } + virtual uint32_t produceDatagram(const Name &content_name, + const uint8_t *buffer, size_t buffer_size) { + return production_protocol_->produceDatagram(content_name, buffer, + buffer_size); + } - portal_->sendContentObject(*content_object); - } else { - if (on_interest_process_) { - on_interest_process_(*producer_interface_, interest); - } - } + void produce(ContentObject &content_object) { + production_protocol_->produce(content_object); } - virtual void onInterest(Interest::Ptr &&interest) override { - onInterest(*interest); - }; + void registerPrefix(const Prefix &producer_namespace) { + production_protocol_->registerNamespaceWithNetwork(producer_namespace); + } - virtual void onError(std::error_code ec) override {} + void stop() { production_protocol_->stop(); } virtual int setSocketOption(int socket_option_key, uint32_t socket_option_value) { @@ -462,7 +173,7 @@ class ProducerSocket : public Socket<BasePortal>, break; case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - output_buffer_.setLimit(socket_option_value); + production_protocol_->setOutputBufferSize(socket_option_value); break; case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: @@ -533,6 +244,12 @@ class ProducerSocket : public Socket<BasePortal>, break; } + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + break; + } + default: return SOCKET_OPTION_NOT_SET; } @@ -559,19 +276,6 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_NOT_SET; } - virtual int setSocketOption(int socket_option_key, - std::list<Prefix> socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - served_namespaces_ = socket_option_value; - break; - default: - return SOCKET_OPTION_NOT_SET; - } - - return SOCKET_OPTION_SET; - } - virtual int setSocketOption( int socket_option_key, interface::ProducerContentObjectCallback socket_option_value) { @@ -594,6 +298,10 @@ class ProducerSocket : public Socket<BasePortal>, on_content_object_output_ = socket_option_value; break; + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + on_content_object_to_sign_ = socket_option_value; + break; + default: return SOCKET_OPTION_NOT_SET; } @@ -663,7 +371,7 @@ class ProducerSocket : public Socket<BasePortal>, } virtual int setSocketOption(int socket_option_key, - utils::CryptoHashType socket_option_value) { + auth::CryptoHashType socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::HASH_ALGORITHM: hash_algorithm_ = socket_option_value; @@ -675,11 +383,12 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_SET; } - virtual int setSocketOption(int socket_option_key, - utils::CryptoSuite socket_option_value) { + virtual int setSocketOption( + int socket_option_key, + core::NextSegmentCalculationStrategy socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::CRYPTO_SUITE: - crypto_suite_ = socket_option_value; + case GeneralTransportOptions::SUFFIX_STRATEGY: + suffix_strategy_ = socket_option_value; break; default: return SOCKET_OPTION_NOT_SET; @@ -690,7 +399,7 @@ class ProducerSocket : public Socket<BasePortal>, virtual int setSocketOption( int socket_option_key, - const std::shared_ptr<utils::Signer> &socket_option_value) { + const std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SIGNER: { utils::SpinLock::Acquire locked(signer_lock_); @@ -708,7 +417,7 @@ class ProducerSocket : public Socket<BasePortal>, uint32_t &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: - socket_option_value = (uint32_t)output_buffer_.getLimit(); + socket_option_value = production_protocol_->getOutputBufferSize(); break; case GeneralTransportOptions::DATA_PACKET_SIZE: @@ -733,18 +442,8 @@ class ProducerSocket : public Socket<BasePortal>, socket_option_value = making_manifest_; break; - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - } - - virtual int getSocketOption(int socket_option_key, - std::list<Prefix> &socket_option_value) { - switch (socket_option_key) { - case GeneralTransportOptions::NETWORK_NAME: - socket_option_value = served_namespaces_; + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; break; default: @@ -776,6 +475,10 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_content_object_output_; break; + case ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN: + *socket_option_value = &on_content_object_to_sign_; + break; + default: return SOCKET_OPTION_NOT_GET; } @@ -828,11 +531,11 @@ class ProducerSocket : public Socket<BasePortal>, *socket_option_value = &on_interest_inserted_input_buffer_; break; - case CACHE_HIT: + case ProducerCallbacksOptions::CACHE_HIT: *socket_option_value = &on_interest_satisfied_output_buffer_; break; - case CACHE_MISS: + case ProducerCallbacksOptions::CACHE_MISS: *socket_option_value = &on_interest_process_; break; @@ -844,8 +547,9 @@ class ProducerSocket : public Socket<BasePortal>, }); } - virtual int getSocketOption(int socket_option_key, - std::shared_ptr<Portal> &socket_option_value) { + virtual int getSocketOption( + int socket_option_key, + std::shared_ptr<core::Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: socket_option_value = portal_; @@ -859,7 +563,7 @@ class ProducerSocket : public Socket<BasePortal>, } virtual int getSocketOption(int socket_option_key, - utils::CryptoHashType &socket_option_value) { + auth::CryptoHashType &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::HASH_ALGORITHM: socket_option_value = hash_algorithm_; @@ -871,22 +575,22 @@ class ProducerSocket : public Socket<BasePortal>, return SOCKET_OPTION_GET; } - virtual int getSocketOption(int socket_option_key, - utils::CryptoSuite &socket_option_value) { + virtual int getSocketOption( + int socket_option_key, + core::NextSegmentCalculationStrategy &socket_option_value) { switch (socket_option_key) { - case GeneralTransportOptions::HASH_ALGORITHM: - socket_option_value = crypto_suite_; + case GeneralTransportOptions::SUFFIX_STRATEGY: + socket_option_value = suffix_strategy_; break; default: return SOCKET_OPTION_NOT_GET; } - return SOCKET_OPTION_GET; } virtual int getSocketOption( int socket_option_key, - std::shared_ptr<utils::Signer> &socket_option_value) { + std::shared_ptr<auth::Signer> &socket_option_value) { switch (socket_option_key) { case GeneralTransportOptions::SIGNER: { utils::SpinLock::Acquire locked(signer_lock_); @@ -907,19 +611,21 @@ class ProducerSocket : public Socket<BasePortal>, // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it template <typename Lambda, typename arg2> - int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, - Lambda lambda_func) { + int rescheduleOnIOServiceWithReference(int socket_option_key, + arg2 &socket_option_value, + Lambda lambda_func) { // To enforce type check - std::function<int(int, arg2)> func = lambda_func; + std::function<int(int, arg2 &)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != listening_thread_.get_id()) { + if (production_protocol_ && production_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; + bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -939,21 +645,19 @@ class ProducerSocket : public Socket<BasePortal>, // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it template <typename Lambda, typename arg2> - int rescheduleOnIOServiceWithReference(int socket_option_key, - arg2 &socket_option_value, - Lambda lambda_func) { + int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value, + Lambda lambda_func) { // To enforce type check - std::function<int(int, arg2 &)> func = lambda_func; + std::function<int(int, arg2)> func = lambda_func; int result = SOCKET_OPTION_SET; - if (listening_thread_.joinable() && - std::this_thread::get_id() != this->listening_thread_.get_id()) { + if (production_protocol_ && production_protocol_->isRunning()) { std::mutex mtx; /* Condition variable for the wait */ std::condition_variable cv; - bool done = false; - io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv, - &result, &done, &func]() { + portal_->getIoService().dispatch([&socket_option_key, + &socket_option_value, &mtx, &cv, + &result, &done, &func]() { std::unique_lock<std::mutex> lck(mtx); done = true; result = func(socket_option_key, socket_option_value); @@ -973,39 +677,20 @@ class ProducerSocket : public Socket<BasePortal>, // Threads protected: interface::ProducerSocket *producer_interface_; - std::thread listening_thread_; asio::io_service io_service_; - std::shared_ptr<Portal> portal_; std::atomic<size_t> data_packet_size_; - std::list<Prefix> - served_namespaces_; // No need to be threadsafe, this is always modified - // by the application thread std::atomic<uint32_t> content_object_expiry_time_; - utils::CircularFifo<std::shared_ptr<ContentObject>, 2048> - object_queue_for_callbacks_; - - // buffers - // ContentStore is thread-safe - utils::ContentStore output_buffer_; - utils::EventThread async_thread_; - int registration_status_; std::atomic<bool> making_manifest_; - - // map for storing sequence numbers for several calls of the publish - // function - std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_; - - std::atomic<utils::CryptoHashType> hash_algorithm_; - std::atomic<utils::CryptoSuite> crypto_suite_; + std::atomic<auth::CryptoHashType> hash_algorithm_; + std::atomic<auth::CryptoSuite> crypto_suite_; utils::SpinLock signer_lock_; - std::shared_ptr<utils::Signer> signer_; + std::shared_ptr<auth::Signer> signer_; core::NextSegmentCalculationStrategy suffix_strategy_; - // While manifests are being built, contents are stored in a queue - std::queue<std::shared_ptr<ContentObject>> content_queue_; + std::unique_ptr<protocol::ProductionProtocol> production_protocol_; // callbacks ProducerInterestCallback on_interest_input_; @@ -1021,63 +706,6 @@ class ProducerSocket : public Socket<BasePortal>, ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; ProducerContentCallback on_content_produced_; - - private: - void listen() { - bool first = true; - - for (core::Prefix &producer_namespace : served_namespaces_) { - if (first) { - core::BindConfig bind_config(producer_namespace, 1000); - portal_->bind(bind_config); - portal_->setProducerCallback(this); - first = !first; - } else { - portal_->registerRoute(producer_namespace); - } - } - - portal_->runEventsLoop(); - } - - void scheduleSendBurst() { - io_service_.post([this]() { - std::shared_ptr<ContentObject> co; - - for (uint32_t i = 0; i < burst_size; i++) { - if (object_queue_for_callbacks_.pop(co)) { - if (on_new_segment_) { - on_new_segment_(*producer_interface_, *co); - } - - if (on_content_object_to_sign_) { - on_content_object_to_sign_(*producer_interface_, *co); - } - - if (on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_(*producer_interface_, *co); - } - - if (on_content_object_output_) { - on_content_object_output_(*producer_interface_, *co); - } - } else { - break; - } - } - }); - } - - void passContentObjectToCallbacks( - const std::shared_ptr<ContentObject> &content_object) { - output_buffer_.insert(content_object); - portal_->sendContentObject(*content_object); - object_queue_for_callbacks_.push(std::move(content_object)); - - if (object_queue_for_callbacks_.size() >= burst_size) { - scheduleSendBurst(); - } - } }; } // namespace implementation diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc index 9ef79ca23..9a62c8683 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.cc +++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc @@ -15,10 +15,8 @@ #include <hicn/transport/core/interest.h> #include <hicn/transport/interfaces/p2psecure_socket_producer.h> - #include <implementation/p2psecure_socket_producer.h> #include <implementation/tls_rtc_socket_producer.h> - #include <openssl/bio.h> #include <openssl/rand.h> #include <openssl/ssl.h> diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h index 685c91244..92c657afc 100644 --- a/libtransport/src/implementation/tls_rtc_socket_producer.h +++ b/libtransport/src/implementation/tls_rtc_socket_producer.h @@ -15,7 +15,6 @@ #pragma once -#include <implementation/rtc_socket_producer.h> #include <implementation/tls_socket_producer.h> namespace transport { @@ -23,8 +22,7 @@ namespace implementation { class P2PSecureProducerSocket; -class TLSRTCProducerSocket : public RTCProducerSocket, - public TLSProducerSocket { +class TLSRTCProducerSocket : public TLSProducerSocket { friend class P2PSecureProducerSocket; public: @@ -34,7 +32,8 @@ class TLSRTCProducerSocket : public RTCProducerSocket, ~TLSRTCProducerSocket() = default; - void produce(std::unique_ptr<utils::MemBuf> &&buffer) override; + uint32_t produceDatagram(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer) override; void accept() override; diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc index 1be6f41a7..99bcd4360 100644 --- a/libtransport/src/implementation/tls_socket_consumer.cc +++ b/libtransport/src/implementation/tls_socket_consumer.cc @@ -136,7 +136,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket, int protocol, SSL *ssl) : ConsumerSocket(consumer_socket, protocol), name_(), - buf_pool_(), decrypted_content_(), payload_(), head_(), @@ -223,14 +222,15 @@ int TLSConsumerSocket::download_content(const Name &name) { content_downloaded_ = false; std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize(); - std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; + std::size_t buffer_size = + read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH; decrypted_content_ = utils::MemBuf::createCombined(buffer_size); int result = -1; std::size_t size = 0; while (!content_downloaded_ || something_to_read_) { - result = SSL_read( - this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH); + result = SSL_read(this->ssl_, decrypted_content_->writableTail(), + SSL3_RT_MAX_PLAIN_LENGTH); /* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of * the data has been fully downloaded */ diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h index 1c5df346a..be08ec47d 100644 --- a/libtransport/src/implementation/tls_socket_consumer.h +++ b/libtransport/src/implementation/tls_socket_consumer.h @@ -16,9 +16,7 @@ #pragma once #include <hicn/transport/interfaces/socket_consumer.h> - #include <implementation/socket_consumer.h> - #include <openssl/ssl.h> namespace transport { @@ -74,7 +72,6 @@ class TLSConsumerSocket : public ConsumerSocket, SSL_CTX *ctx_; /* Chain of MemBuf to be used as a temporary buffer to pass descypted data * from the underlying layer to the application */ - utils::ObjectPool<utils::MemBuf> buf_pool_; std::unique_ptr<utils::MemBuf> decrypted_content_; /* Chain of MemBuf holding the payload to be written into interest or data */ std::unique_ptr<utils::MemBuf> payload_; diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc index 339a1ad58..e54d38d56 100644 --- a/libtransport/src/implementation/tls_socket_producer.cc +++ b/libtransport/src/implementation/tls_socket_producer.cc @@ -14,10 +14,8 @@ */ #include <hicn/transport/interfaces/socket_producer.h> - #include <implementation/p2psecure_socket_producer.h> #include <implementation/tls_socket_producer.h> - #include <openssl/bio.h> #include <openssl/rand.h> #include <openssl/ssl.h> @@ -50,10 +48,14 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) { std::unique_lock<std::mutex> lck(socket->mtx_); + TRANSPORT_LOGD("Start wait on the CV."); + if (!socket->something_to_read_) { (socket->cv_).wait(lck); } + TRANSPORT_LOGD("CV unlocked."); + /* Either there already is something to read, or the thread has been waken up. * We must return the payload in the interest anyway */ utils::MemBuf *membuf = socket->handshake_packet_->next(); @@ -103,7 +105,7 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { socket->tls_chunks_--; socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, false); - socket->parent_->ProducerSocket::produce( + socket->parent_->ProducerSocket::produceStream( socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, socket->last_segment_); socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST, @@ -122,18 +124,18 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { socket->tls_chunks_--; socket->to_call_oncontentproduced_--; - socket->last_segment_ += socket->ProducerSocket::produce( + socket->last_segment_ += socket->ProducerSocket::produceStream( socket->name_, std::move(mbuf), socket->tls_chunks_ == 0, socket->last_segment_); - ProducerContentCallback on_content_produced_application; + ProducerContentCallback *on_content_produced_application; socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - on_content_produced_application); + &on_content_produced_application); if (socket->to_call_oncontentproduced_ == 0 && on_content_produced_application) { - on_content_produced_application(*socket->getInterface(), - std::error_code(), 0); + on_content_produced_application->operator()(*socket->getInterface(), + std::error_code(), 0); } }); } @@ -144,7 +146,8 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) { TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket, P2PSecureProducerSocket *parent, const Name &handshake_name) - : ProducerSocket(producer_socket), + : ProducerSocket(producer_socket, + ProductionProtocolAlgorithms::BYTE_STREAM), on_content_produced_application_(), mtx_(), cv_(), @@ -236,13 +239,14 @@ void TLSProducerSocket::accept() { std::move(parent_->map_producers[handshake_name_])); parent_->map_producers.erase(handshake_name_); - ProducerInterestCallback on_interest_process_decrypted; + ProducerInterestCallback *on_interest_process_decrypted; getSocketOption(ProducerCallbacksOptions::CACHE_MISS, - on_interest_process_decrypted); + &on_interest_process_decrypted); - if (on_interest_process_decrypted) { - Interest inter(std::move(handshake_packet_)); - on_interest_process_decrypted(*getInterface(), inter); + if (*on_interest_process_decrypted) { + Interest inter(std::move(*handshake_packet_)); + handshake_packet_.reset(); + on_interest_process_decrypted->operator()(*getInterface(), inter); } else { throw errors::RuntimeException( "On interest process unset: unable to perform handshake"); @@ -270,14 +274,14 @@ void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { std::unique_lock<std::mutex> lck(mtx_); name_ = interest.getName(); - interest.separateHeaderPayload(); + // interest.separateHeaderPayload(); handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; cv_.notify_one(); return; } else if (handshake_state == SERVER_FINISHED) { - interest.separateHeaderPayload(); + // interest.separateHeaderPayload(); handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; @@ -288,12 +292,12 @@ void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) { interest.getPayload()->length()); } - ProducerInterestCallback on_interest_input_decrypted; + ProducerInterestCallback *on_interest_input_decrypted; getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - on_interest_input_decrypted); + &on_interest_input_decrypted); - if (on_interest_input_decrypted) - (on_interest_input_decrypted)(*getInterface(), interest); + if (*on_interest_input_decrypted) + (*on_interest_input_decrypted)(*getInterface(), interest); } } @@ -301,17 +305,19 @@ void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p, Interest &interest) { HandshakeState handshake_state = getHandshakeState(); + TRANSPORT_LOGD("On cache miss in TLS socket producer."); + if (handshake_state == CLIENT_HELLO) { std::unique_lock<std::mutex> lck(mtx_); - interest.separateHeaderPayload(); + // interest.separateHeaderPayload(); handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; handshake_state_ = CLIENT_FINISHED; cv_.notify_one(); } else if (handshake_state == SERVER_FINISHED) { - interest.separateHeaderPayload(); + // interest.separateHeaderPayload(); handshake_packet_ = interest.acquireMemBufReference(); something_to_read_ = true; @@ -343,16 +349,16 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p, const std::error_code &err, uint64_t bytes_written) {} -uint32_t TLSProducerSocket::produce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t start_offset) { +uint32_t TLSProducerSocket::produceStream( + const Name &content_name, std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last, uint32_t start_offset) { if (getHandshakeState() != SERVER_FINISHED) { throw errors::RuntimeException( "New handshake on the same P2P secure producer socket not supported"); } size_t buf_size = buffer->length(); - name_ = served_namespaces_.front().mapName(content_name); + name_ = production_protocol_->getNamespaces().front().mapName(content_name); tls_chunks_ = to_call_oncontentproduced_ = ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH); @@ -370,46 +376,6 @@ uint32_t TLSProducerSocket::produce(Name content_name, return 0; } -void TLSProducerSocket::asyncProduce(const Name &content_name, - const uint8_t *buf, size_t buffer_size, - bool is_last, uint32_t *start_offset) { - if (!encryption_thread_.stopped()) { - encryption_thread_.add([this, content_name, buffer = buf, - size = buffer_size, is_last, start_offset]() { - if (start_offset != NULL) { - produce(content_name, buffer, size, is_last, *start_offset); - } else { - produce(content_name, buffer, size, is_last, 0); - } - }); - } -} - -void TLSProducerSocket::asyncProduce(Name content_name, - std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment) { - if (!encryption_thread_.stopped()) { - auto a = buffer.release(); - encryption_thread_.add( - [this, content_name, a, is_last, offset, last_segment]() { - auto buf = std::unique_ptr<utils::MemBuf>(a); - if (last_segment != NULL) { - *last_segment = &last_segment_; - } - produce(content_name, std::move(buf), is_last, offset); - }); - } -} - -void TLSProducerSocket::asyncProduce(ContentObject &content_object) { - throw errors::RuntimeException("API not supported"); -} - -void TLSProducerSocket::produce(ContentObject &content_object) { - throw errors::RuntimeException("API not supported"); -} - long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) { if (cmd == BIO_CTRL_FLUSH) { } @@ -424,13 +390,14 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, void *add_arg) { TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg); + TRANSPORT_LOGD("On addHicnKeyIdCb, for the prefix registration."); + if (ext_type == 100) { - ip_prefix_t ip_prefix = - socket->parent_->served_namespaces_.front().toIpPrefixStruct(); - int inet_family = - socket->parent_->served_namespaces_.front().getAddressFamily(); - uint16_t prefix_len_bits = - socket->parent_->served_namespaces_.front().getPrefixLength(); + auto &prefix = + socket->parent_->production_protocol_->getNamespaces().front(); + const ip_prefix_t &ip_prefix = prefix.toIpPrefixStruct(); + int inet_family = prefix.getAddressFamily(); + uint16_t prefix_len_bits = prefix.getPrefixLength(); uint8_t prefix_len_bytes = prefix_len_bits / 8; uint8_t prefix_len_u32 = prefix_len_bits / 32; @@ -479,10 +446,9 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, socket->parent_->on_interest_process_decrypted_; socket->registerPrefix( - Prefix(socket->parent_->served_namespaces_.front().getName( - Name(inet_family, (uint8_t *)&mask), - Name(inet_family, (uint8_t *)&keyId_component), - socket->parent_->served_namespaces_.front().getName()), + Prefix(prefix.getName(Name(inet_family, (uint8_t *)&mask), + Name(inet_family, (uint8_t *)&keyId_component), + prefix.getName()), out_ip->len)); socket->connect(); } @@ -580,61 +546,5 @@ int TLSProducerSocket::getSocketOption( }); } -int TLSProducerSocket::getSocketOption( - int socket_option_key, ProducerContentCallback &socket_option_value) { - return rescheduleOnIOServiceWithReference( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerContentCallback &socket_option_value) -> int { - switch (socket_option_key) { - case ProducerCallbacksOptions::CONTENT_PRODUCED: - socket_option_value = on_content_produced_application_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); -} - -int TLSProducerSocket::getSocketOption( - int socket_option_key, ProducerInterestCallback &socket_option_value) { - // Reschedule the function on the io_service to avoid race condition in case - // setSocketOption is called while the io_service is running. - return rescheduleOnIOServiceWithReference( - socket_option_key, socket_option_value, - [this](int socket_option_key, - ProducerInterestCallback &socket_option_value) -> int { - switch (socket_option_key) { - case ProducerCallbacksOptions::INTEREST_INPUT: - socket_option_value = on_interest_input_decrypted_; - break; - - case ProducerCallbacksOptions::INTEREST_DROP: - socket_option_value = on_interest_dropped_input_buffer_; - break; - - case ProducerCallbacksOptions::INTEREST_PASS: - socket_option_value = on_interest_inserted_input_buffer_; - break; - - case ProducerCallbacksOptions::CACHE_HIT: - socket_option_value = on_interest_satisfied_output_buffer_; - break; - - case ProducerCallbacksOptions::CACHE_MISS: - socket_option_value = on_interest_process_decrypted_; - break; - - default: - return SOCKET_OPTION_NOT_GET; - } - - return SOCKET_OPTION_GET; - }); -} - } // namespace implementation } // namespace transport diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h index 2382e8695..a542a4d9f 100644 --- a/libtransport/src/implementation/tls_socket_producer.h +++ b/libtransport/src/implementation/tls_socket_producer.h @@ -16,8 +16,8 @@ #pragma once #include <implementation/socket_producer.h> - #include <openssl/ssl.h> + #include <condition_variable> #include <mutex> @@ -36,26 +36,18 @@ class TLSProducerSocket : virtual public ProducerSocket { ~TLSProducerSocket(); - uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, - bool is_last = true, uint32_t start_offset = 0) override { - return produce(content_name, utils::MemBuf::copyBuffer(buffer, buffer_size), - is_last, start_offset); + uint32_t produceStream(const Name &content_name, const uint8_t *buffer, + size_t buffer_size, bool is_last = true, + uint32_t start_offset = 0) override { + return produceStream(content_name, + utils::MemBuf::copyBuffer(buffer, buffer_size), + is_last, start_offset); } - uint32_t produce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last = true, uint32_t start_offset = 0) override; - - void produce(ContentObject &content_object) override; - - void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size, - bool is_last = true, - uint32_t *start_offset = nullptr) override; - - void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, - bool is_last, uint32_t offset, - uint32_t **last_segment = nullptr) override; - - void asyncProduce(ContentObject &content_object) override; + uint32_t produceStream(const Name &content_name, + std::unique_ptr<utils::MemBuf> &&buffer, + bool is_last = true, + uint32_t start_offset = 0) override; virtual void accept(); @@ -80,7 +72,7 @@ class TLSProducerSocket : virtual public ProducerSocket { ProducerInterestCallback &socket_option_value); using ProducerSocket::getSocketOption; - using ProducerSocket::onInterest; + // using ProducerSocket::onInterest; using ProducerSocket::setSocketOption; protected: @@ -119,6 +111,7 @@ class TLSProducerSocket : virtual public ProducerSocket { int to_call_oncontentproduced_; bool still_writing_; utils::EventThread encryption_thread_; + utils::EventThread async_thread_; void onInterest(ProducerSocket &p, Interest &interest); |