summaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation')
-rw-r--r--libtransport/src/implementation/CMakeLists.txt2
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.cc36
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.h6
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.cc71
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.h13
-rw-r--r--libtransport/src/implementation/socket.cc31
-rw-r--r--libtransport/src/implementation/socket.h24
-rw-r--r--libtransport/src/implementation/socket_consumer.h178
-rw-r--r--libtransport/src/implementation/socket_producer.h207
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.cc6
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.h2
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc31
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.h7
-rw-r--r--libtransport/src/implementation/tls_socket_producer.cc11
-rw-r--r--libtransport/src/implementation/tls_socket_producer.h2
15 files changed, 352 insertions, 275 deletions
diff --git a/libtransport/src/implementation/CMakeLists.txt b/libtransport/src/implementation/CMakeLists.txt
index daf899d06..1f2a33a4c 100644
--- a/libtransport/src/implementation/CMakeLists.txt
+++ b/libtransport/src/implementation/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc
index 4b14da5d2..6b67a5487 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -44,7 +44,7 @@ int readOld(BIO *b, char *buf, int size) {
if (!socket->something_to_read_) {
if (!socket->transport_protocol_->isRunning()) {
socket->network_name_.setSuffix(socket->random_suffix_);
- socket->ConsumerSocket::asyncConsume(socket->network_name_);
+ socket->ConsumerSocket::consume(socket->network_name_);
}
if (!socket->something_to_read_) socket->cv_.wait(lck);
@@ -312,36 +312,6 @@ int P2PSecureConsumerSocket::consume(const Name &name) {
return tls_consumer_->consume((prefix->mapName(name)));
}
-int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
-
- if (handshake() != 1) {
- throw errors::RuntimeException("Unable to perform client handshake");
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(2)) << "Handshake performed!";
- }
-
- initSessionSocket();
-
- if (tls_consumer_ == nullptr) {
- throw errors::RuntimeException("TLS socket does not exist");
- }
-
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
-
- if (payload_ != NULL)
- return tls_consumer_->asyncConsume((prefix->mapName(name)),
- std::move(payload_));
- else
- return tls_consumer_->asyncConsume((prefix->mapName(name)));
-}
-
void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
producer_namespace_ = producer_namespace;
}
@@ -385,7 +355,7 @@ void P2PSecureConsumerSocket::readBufferAvailable(
cv_.notify_one();
}
-void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {};
+void P2PSecureConsumerSocket::readError(const std::error_code &ec) noexcept {};
void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
std::unique_lock<std::mutex> lck(this->mtx_);
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h
index a35a50352..a5e69f611 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.h
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -48,8 +48,6 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
int consume(const Name &name) override;
- int asyncConsume(const Name &name) override;
-
void registerPrefix(const Prefix &producer_namespace);
int setSocketOption(
@@ -120,7 +118,7 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
virtual void readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
- virtual void readError(const std::error_code ec) noexcept override;
+ virtual void readError(const std::error_code &ec) noexcept override;
virtual void readSuccess(std::size_t total_size) noexcept override;
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc
index 3748001fc..ee78ea53b 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_producer.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -20,6 +20,7 @@
#include <interfaces/tls_rtc_socket_producer.h>
#include <interfaces/tls_socket_producer.h>
#include <openssl/bio.h>
+#include <openssl/pkcs12.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
@@ -41,7 +42,7 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
P2PSecureProducerSocket::P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
- const std::shared_ptr<auth::Identity> &identity)
+ std::string &keystore_path, std::string &keystore_pwd)
: ProducerSocket(producer_socket,
ProductionProtocolAlgorithms::BYTE_STREAM),
rtc_(rtc),
@@ -50,8 +51,16 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
map_producers(),
list_producers() {
/* Setup SSL context (identity and parameter to use TLS 1.3) */
- cert_509_ = identity->getCertificate().get();
- pkey_rsa_ = identity->getPrivateKey().get();
+ FILE *p12file = fopen(keystore_path.c_str(), "r");
+ if (p12file == NULL)
+ throw errors::RuntimeException("impossible open keystore");
+ std::unique_ptr<PKCS12, decltype(&::PKCS12_free)> p12(
+ d2i_PKCS12_fp(p12file, NULL), ::PKCS12_free);
+ // now we parse the file to get the first key and certificate
+ if (1 != PKCS12_parse(p12.get(), keystore_pwd.c_str(), &pkey_rsa_, &cert_509_,
+ NULL))
+ throw errors::RuntimeException("impossible to get the private key");
+ fclose(p12file);
/* Set the callback so that when an interest is received we catch it and we
* decrypt the payload before passing it to the application. */
@@ -133,15 +142,15 @@ uint32_t P2PSecureProducerSocket::produceDatagram(
// TODO
throw errors::NotImplementedException();
- if (!rtc_) {
- throw errors::RuntimeException(
- "RTC must be the transport protocol to start the production of current "
- "data. Aborting.");
- }
+ // if (!rtc_) {
+ // throw errors::RuntimeException(
+ // "RTC must be the transport protocol to start the production of
+ // current " "data. Aborting.");
+ // }
- std::unique_lock<std::mutex> lck(mtx_);
+ // std::unique_lock<std::mutex> lck(mtx_);
- if (list_producers.empty()) cv_.wait(lck);
+ // if (list_producers.empty()) cv_.wait(lck);
// TODO
// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
@@ -151,7 +160,7 @@ uint32_t P2PSecureProducerSocket::produceDatagram(
// rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
// }
- return 0;
+ // return 0;
}
uint32_t P2PSecureProducerSocket::produceStream(
@@ -197,44 +206,6 @@ uint32_t P2PSecureProducerSocket::produceStream(const Name &content_name,
return segments;
}
-// void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
-// const uint8_t *buf,
-// size_t buffer_size, bool is_last,
-// uint32_t *start_offset) {
-// if (rtc_) {
-// throw errors::RuntimeException(
-// "RTC transport protocol is not compatible with the production of "
-// "current data. Aborting.");
-// }
-
-// std::unique_lock<std::mutex> lck(mtx_);
-// if (list_producers.empty()) cv_.wait(lck);
-
-// for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
-// {
-// (*it)->asyncProduce(content_name, buf, buffer_size, is_last,
-// start_offset);
-// }
-// }
-
-void P2PSecureProducerSocket::asyncProduce(
- Name content_name, std::unique_ptr<utils::MemBuf> &&buffer, bool is_last,
- uint32_t offset, uint32_t **last_segment) {
- if (rtc_) {
- throw errors::RuntimeException(
- "RTC transport protocol is not compatible with the production of "
- "current data. Aborting.");
- }
-
- std::unique_lock<std::mutex> lck(mtx_);
- if (list_producers.empty()) cv_.wait(lck);
-
- for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
- (*it)->asyncProduce(content_name, buffer->clone(), is_last, offset,
- last_segment);
- }
-}
-
/* Redefinition of socket options to avoid name hiding */
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerInterestCallback socket_option_value) {
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h
index f94347258..00f407a75 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.h
+++ b/libtransport/src/implementation/p2psecure_socket_producer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,7 +15,6 @@
#pragma once
-#include <hicn/transport/auth/identity.h>
#include <hicn/transport/auth/signer.h>
#include <implementation/socket_producer.h>
// #include <implementation/tls_rtc_socket_producer.h>
@@ -38,9 +37,9 @@ class P2PSecureProducerSocket : public ProducerSocket {
public:
explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket);
- explicit P2PSecureProducerSocket(
- interface::ProducerSocket *producer_socket, bool rtc,
- const std::shared_ptr<auth::Identity> &identity);
+ explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket,
+ bool rtc, std::string &keystore_path,
+ std::string &keystore_pwd);
~P2PSecureProducerSocket();
@@ -56,10 +55,6 @@ class P2PSecureProducerSocket : public ProducerSocket {
bool is_last = true,
uint32_t start_offset = 0) override;
- void asyncProduce(Name content_name, std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr) override;
-
int setSocketOption(int socket_option_key,
ProducerInterestCallback socket_option_value) override;
diff --git a/libtransport/src/implementation/socket.cc b/libtransport/src/implementation/socket.cc
index 2e21f2bc3..95941da07 100644
--- a/libtransport/src/implementation/socket.cc
+++ b/libtransport/src/implementation/socket.cc
@@ -14,13 +14,42 @@
*/
#include <core/global_configuration.h>
+#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <implementation/socket.h>
namespace transport {
namespace implementation {
Socket::Socket(std::shared_ptr<core::Portal> &&portal)
- : portal_(std::move(portal)), is_async_(false) {}
+ : portal_(std::move(portal)),
+ is_async_(false),
+ packet_format_(interface::default_values::packet_format) {}
+
+int Socket::setSocketOption(int socket_option_key,
+ hicn_format_t packet_format) {
+ switch (socket_option_key) {
+ case interface::GeneralTransportOptions::PACKET_FORMAT:
+ packet_format_ = packet_format;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int Socket::getSocketOption(int socket_option_key,
+ hicn_format_t &packet_format) {
+ switch (socket_option_key) {
+ case interface::GeneralTransportOptions::PACKET_FORMAT:
+ packet_format = packet_format_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+}
} // namespace implementation
} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/implementation/socket.h b/libtransport/src/implementation/socket.h
index cf22c03e1..11c9a704d 100644
--- a/libtransport/src/implementation/socket.h
+++ b/libtransport/src/implementation/socket.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -38,7 +38,26 @@ class Socket {
virtual void connect() = 0;
virtual bool isRunning() = 0;
- virtual asio::io_service &getIoService() { return portal_->getIoService(); }
+ virtual asio::io_service &getIoService() {
+ return portal_->getThread().getIoService();
+ }
+
+ int setSocketOption(int socket_option_key, hicn_format_t packet_format);
+ int getSocketOption(int socket_option_key, hicn_format_t &packet_format);
+
+ int getSocketOption(int socket_option_key,
+ std::shared_ptr<core::Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case interface::GeneralTransportOptions::PORTAL:
+ socket_option_value = portal_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ ;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
protected:
Socket(std::shared_ptr<core::Portal> &&portal);
@@ -48,6 +67,7 @@ class Socket {
protected:
std::shared_ptr<core::Portal> portal_;
bool is_async_;
+ hicn_format_t packet_format_;
};
} // namespace implementation
diff --git a/libtransport/src/implementation/socket_consumer.h b/libtransport/src/implementation/socket_consumer.h
index e0981af7f..ebdac7f93 100644
--- a/libtransport/src/implementation/socket_consumer.h
+++ b/libtransport/src/implementation/socket_consumer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -20,6 +20,7 @@
#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/interfaces/statistics.h>
#include <hicn/transport/utils/event_thread.h>
+#include <implementation/socket.h>
#include <protocols/cbr.h>
#include <protocols/raaqm.h>
#include <protocols/rtc/rtc.h>
@@ -38,7 +39,6 @@ class ConsumerSocket : public Socket {
std::shared_ptr<core::Portal> &&portal)
: Socket(std::move(portal)),
consumer_interface_(consumer),
- async_downloader_(),
interest_lifetime_(default_values::interest_lifetime),
min_window_size_(default_values::min_window_size),
max_window_size_(default_values::max_window_size),
@@ -56,6 +56,7 @@ class ConsumerSocket : public Socket {
rate_estimation_observer_(nullptr),
rate_estimation_batching_parameter_(default_values::batch),
rate_estimation_choice_(0),
+ max_unverified_delay_(default_values::max_unverified_delay),
verifier_(std::make_shared<auth::VoidVerifier>()),
verify_signature_(false),
reset_window_(false),
@@ -64,41 +65,42 @@ class ConsumerSocket : public Socket {
on_interest_satisfied_(VOID_HANDLER),
on_content_object_input_(VOID_HANDLER),
stats_summary_(VOID_HANDLER),
+ on_fwd_strategy_(VOID_HANDLER),
+ on_rec_strategy_(VOID_HANDLER),
read_callback_(nullptr),
timer_interval_milliseconds_(0),
+ recovery_strategy_(RtcTransportRecoveryStrategies::RTX_ONLY),
+ aggregated_data_(false),
+ fec_setting_(""),
guard_raaqm_params_() {
switch (protocol) {
case TransportProtocolAlgorithms::CBR:
transport_protocol_ =
- std::make_unique<protocol::CbrTransportProtocol>(this);
+ std::make_shared<protocol::CbrTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RTC:
transport_protocol_ =
- std::make_unique<protocol::rtc::RTCTransportProtocol>(this);
+ std::make_shared<protocol::rtc::RTCTransportProtocol>(this);
break;
case TransportProtocolAlgorithms::RAAQM:
default:
transport_protocol_ =
- std::make_unique<protocol::RaaqmTransportProtocol>(this);
+ std::make_shared<protocol::RaaqmTransportProtocol>(this);
break;
}
}
public:
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol)
- : ConsumerSocket(consumer, protocol, std::make_shared<core::Portal>()) {}
+ : ConsumerSocket(consumer, protocol, core::Portal::createShared()) {}
ConsumerSocket(interface::ConsumerSocket *consumer, int protocol,
- asio::io_service &io_service)
- : ConsumerSocket(consumer, protocol,
- std::make_shared<core::Portal>(io_service)) {
+ ::utils::EventThread &worker)
+ : ConsumerSocket(consumer, protocol, core::Portal::createShared(worker)) {
is_async_ = true;
}
- ~ConsumerSocket() {
- stop();
- async_downloader_.stop();
- }
+ ~ConsumerSocket() { stop(); }
interface::ConsumerSocket *getInterface() {
return consumer_interface_;
@@ -122,18 +124,6 @@ class ConsumerSocket : public Socket {
transport_protocol_->start();
- return is_async_ ? CONSUMER_RUNNING : CONSUMER_FINISHED;
- }
-
- virtual int asyncConsume(const Name &name) {
- if (!async_downloader_.stopped()) {
- async_downloader_.add([this, name]() {
- network_name_ = std::move(name);
- network_name_.setSuffix(0);
- transport_protocol_->start();
- });
- }
-
return CONSUMER_RUNNING;
}
@@ -149,6 +139,9 @@ class ConsumerSocket : public Socket {
}
}
+ using Socket::getSocketOption;
+ using Socket::setSocketOption;
+
virtual int setSocketOption(int socket_option_key,
ReadCallback *socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in
@@ -245,6 +238,10 @@ class ConsumerSocket : public Socket {
interest_lifetime_ = socket_option_value;
break;
+ case GeneralTransportOptions::MAX_UNVERIFIED_TIME:
+ max_unverified_delay_ = socket_option_value;
+ break;
+
case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
if (socket_option_value > 0) {
rate_estimation_batching_parameter_ = socket_option_value;
@@ -265,6 +262,11 @@ class ConsumerSocket : public Socket {
timer_interval_milliseconds_ = socket_option_value;
break;
+ case RtcTransportOptions::RECOVERY_STRATEGY:
+ recovery_strategy_ =
+ (RtcTransportRecoveryStrategies)socket_option_value;
+ break;
+
default:
return SOCKET_OPTION_NOT_SET;
}
@@ -328,6 +330,11 @@ class ConsumerSocket : public Socket {
result = SOCKET_OPTION_SET;
break;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ aggregated_data_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
default:
return result;
}
@@ -406,36 +413,37 @@ class ConsumerSocket : public Socket {
int setSocketOption(
int socket_option_key,
const std::shared_ptr<auth::Verifier> &socket_option_value) {
- int result = SOCKET_OPTION_NOT_SET;
if (!transport_protocol_->isRunning()) {
switch (socket_option_key) {
case GeneralTransportOptions::VERIFIER:
verifier_.reset();
verifier_ = socket_option_value;
- result = SOCKET_OPTION_SET;
break;
default:
- return result;
+ return SOCKET_OPTION_NOT_SET;
}
}
-
- return result;
+ return SOCKET_OPTION_SET;
}
int setSocketOption(int socket_option_key,
const std::string &socket_option_value) {
int result = SOCKET_OPTION_NOT_SET;
- if (!transport_protocol_->isRunning()) {
- switch (socket_option_key) {
- case DataLinkOptions::OUTPUT_INTERFACE:
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ if (!transport_protocol_->isRunning()) {
output_interface_ = socket_option_value;
portal_->setOutputInterface(output_interface_);
result = SOCKET_OPTION_SET;
- break;
+ }
+ break;
+ case GeneralTransportOptions::FEC_TYPE:
+ fec_setting_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
- default:
- return result;
- }
+ default:
+ return result;
}
return result;
}
@@ -461,6 +469,29 @@ class ConsumerSocket : public Socket {
});
}
+ int setSocketOption(int socket_option_key,
+ StrategyCallback socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ StrategyCallback socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE:
+ on_fwd_strategy_ = socket_option_value;
+ break;
+ case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE:
+ on_rec_strategy_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
int getSocketOption(int socket_option_key, double &socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
switch (socket_option_key) {
@@ -516,6 +547,10 @@ class ConsumerSocket : public Socket {
socket_option_value = interest_lifetime_;
break;
+ case GeneralTransportOptions::MAX_UNVERIFIED_TIME:
+ socket_option_value = max_unverified_delay_;
+ break;
+
case RaaqmTransportOptions::SAMPLE_NUMBER:
socket_option_value = sample_number_;
break;
@@ -532,6 +567,10 @@ class ConsumerSocket : public Socket {
socket_option_value = timer_interval_milliseconds_;
break;
+ case RtcTransportOptions::RECOVERY_STRATEGY:
+ socket_option_value = recovery_strategy_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -553,6 +592,10 @@ class ConsumerSocket : public Socket {
socket_option_value = reset_window_;
break;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ socket_option_value = aggregated_data_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -628,20 +671,6 @@ class ConsumerSocket : public Socket {
}
int getSocketOption(int socket_option_key,
- std::shared_ptr<core::Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
-
- default:
- return SOCKET_OPTION_NOT_GET;
- }
-
- return SOCKET_OPTION_GET;
- }
-
- int getSocketOption(int socket_option_key,
IcnObserver **socket_option_value) {
utils::SpinLock::Acquire locked(guard_raaqm_params_);
switch (socket_option_key) {
@@ -665,7 +694,6 @@ class ConsumerSocket : public Socket {
default:
return SOCKET_OPTION_NOT_GET;
}
-
return SOCKET_OPTION_GET;
}
@@ -674,6 +702,9 @@ class ConsumerSocket : public Socket {
case DataLinkOptions::OUTPUT_INTERFACE:
socket_option_value = output_interface_;
break;
+ case GeneralTransportOptions::FEC_TYPE:
+ socket_option_value = fec_setting_;
+ break;
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -714,6 +745,29 @@ class ConsumerSocket : public Socket {
});
}
+ int getSocketOption(int socket_option_key,
+ StrategyCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ StrategyCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::FWD_STRATEGY_CHANGE:
+ *socket_option_value = &on_fwd_strategy_;
+ break;
+ case ConsumerCallbacksOptions::REC_STRATEGY_CHANGE:
+ *socket_option_value = &on_rec_strategy_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
protected:
template <typename Lambda, typename arg2>
int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
@@ -726,9 +780,9 @@ class ConsumerSocket : public Socket {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- portal_->getIoService().dispatch([&socket_option_key,
- &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getThread().tryRunHandlerNow([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -748,8 +802,6 @@ class ConsumerSocket : public Socket {
protected:
interface::ConsumerSocket *consumer_interface_;
- utils::EventThread async_downloader_;
-
// No need to protect from multiple accesses in the async consumer
// The parameter is accessible only with a getSocketOption and
// set from the consume
@@ -776,6 +828,7 @@ class ConsumerSocket : public Socket {
int rate_estimation_choice_;
// Verification parameters
+ int max_unverified_delay_;
std::shared_ptr<auth::Verifier> verifier_;
transport::auth::KeyId *key_id_;
std::atomic_bool verify_signature_;
@@ -787,17 +840,26 @@ class ConsumerSocket : public Socket {
ConsumerInterestCallback on_interest_satisfied_;
ConsumerContentObjectCallback on_content_object_input_;
ConsumerTimerCallback stats_summary_;
+ StrategyCallback on_fwd_strategy_;
+ StrategyCallback on_rec_strategy_;
ReadCallback *read_callback_;
uint32_t timer_interval_milliseconds_;
// Transport protocol
- std::unique_ptr<protocol::TransportProtocol> transport_protocol_;
+ std::shared_ptr<protocol::TransportProtocol> transport_protocol_;
// Statistic
TransportStatistics stats_;
+ // RTC protocol
+ RtcTransportRecoveryStrategies recovery_strategy_;
+ bool aggregated_data_;
+
+ // FEC setting
+ std::string fec_setting_;
+
utils::SpinLock guard_raaqm_params_;
std::string output_interface_;
};
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 9daf79b9d..37151d497 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -40,6 +40,7 @@ namespace implementation {
using namespace core;
using namespace interface;
+using ProducerCallback = interface::ProducerSocket::Callback;
class ProducerSocket : public Socket {
private:
@@ -48,11 +49,14 @@ class ProducerSocket : public Socket {
: Socket(std::move(portal)),
producer_interface_(producer_socket),
data_packet_size_(default_values::content_object_packet_size),
+ max_segment_size_(default_values::content_object_packet_size),
content_object_expiry_time_(default_values::content_object_expiry_time),
- async_thread_(),
- making_manifest_(false),
+ making_manifest_(default_values::manifest_capacity),
hash_algorithm_(auth::CryptoHashType::SHA256),
- suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL),
+ signer_(std::make_shared<auth::VoidSigner>()),
+ suffix_strategy_(std::make_shared<utils::IncrementalSuffixStrategy>(0)),
+ aggregated_data_(false),
+ fec_setting_(""),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
on_interest_inserted_input_buffer_(VOID_HANDLER),
@@ -63,29 +67,30 @@ class ProducerSocket : public Socket {
on_content_object_in_output_buffer_(VOID_HANDLER),
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
- on_content_produced_(VOID_HANDLER) {
+ on_content_produced_(VOID_HANDLER),
+ application_callback_(nullptr) {
switch (protocol) {
case ProductionProtocolAlgorithms::RTC_PROD:
production_protocol_ =
- std::make_unique<protocol::RTCProductionProtocol>(this);
+ std::make_shared<protocol::RTCProductionProtocol>(this);
break;
case ProductionProtocolAlgorithms::BYTE_STREAM:
default:
production_protocol_ =
- std::make_unique<protocol::ByteStreamProductionProtocol>(this);
+ std::make_shared<protocol::ByteStreamProductionProtocol>(this);
break;
}
}
public:
ProducerSocket(interface::ProducerSocket *producer, int protocol)
- : ProducerSocket(producer, protocol, std::make_shared<core::Portal>()) {}
+ : ProducerSocket(producer, protocol, core::Portal::createShared()) {
+ is_async_ = true;
+ }
ProducerSocket(interface::ProducerSocket *producer, int protocol,
- asio::io_service &io_service)
- : ProducerSocket(producer, protocol,
- std::make_shared<core::Portal>(io_service)) {
- is_async_ = true;
+ ::utils::EventThread &worker)
+ : ProducerSocket(producer, protocol, core::Portal::createShared(worker)) {
}
virtual ~ProducerSocket() {}
@@ -98,31 +103,9 @@ class ProducerSocket : public Socket {
producer_interface_ = producer_socket;
}
- void connect() override {
- portal_->connect(false);
- production_protocol_->start();
- }
-
- bool isRunning() override { return !production_protocol_->isRunning(); };
+ void connect() override { portal_->connect(false); }
- virtual void asyncProduce(Name content_name,
- std::unique_ptr<utils::MemBuf> &&buffer,
- bool is_last, uint32_t offset,
- uint32_t **last_segment = nullptr) {
- if (!async_thread_.stopped()) {
- auto a = buffer.release();
- async_thread_.add([this, content_name, a, is_last, offset,
- last_segment]() {
- auto buf = std::unique_ptr<utils::MemBuf>(a);
- if (last_segment != NULL) {
- **last_segment = offset + produceStream(content_name, std::move(buf),
- is_last, offset);
- } else {
- produceStream(content_name, std::move(buf), is_last, offset);
- }
- });
- }
- }
+ bool isRunning() override { return production_protocol_->isRunning(); };
virtual uint32_t produceStream(const Name &content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
@@ -156,12 +139,38 @@ class ProducerSocket : public Socket {
production_protocol_->produce(content_object);
}
+ void sendMapme() { production_protocol_->sendMapme(); }
+
void registerPrefix(const Prefix &producer_namespace) {
- production_protocol_->registerNamespaceWithNetwork(producer_namespace);
+ portal_->registerRoute(producer_namespace);
}
+ void start() { production_protocol_->start(); }
void stop() { production_protocol_->stop(); }
+ using Socket::getSocketOption;
+ using Socket::setSocketOption;
+
+ virtual int setSocketOption(int socket_option_key,
+ ProducerCallback *socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerCallback *socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::PRODUCER_CALLBACK:
+ application_callback_ = socket_option_value;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+ });
+ }
+
virtual int setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
switch (socket_option_key) {
@@ -172,6 +181,17 @@ class ProducerSocket : public Socket {
}
break;
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ break;
+
+ case GeneralTransportOptions::MAX_SEGMENT_SIZE:
+ if (socket_option_value <= default_values::max_content_object_size &&
+ socket_option_value > 0) {
+ max_segment_size_ = socket_option_value;
+ }
+ break;
+
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
production_protocol_->setOutputBufferSize(socket_option_value);
break;
@@ -260,8 +280,8 @@ class ProducerSocket : public Socket {
virtual int setSocketOption(int socket_option_key, bool socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- making_manifest_ = socket_option_value;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ aggregated_data_ = socket_option_value;
break;
default:
@@ -385,7 +405,7 @@ class ProducerSocket : public Socket {
virtual int setSocketOption(
int socket_option_key,
- core::NextSegmentCalculationStrategy socket_option_value) {
+ const std::shared_ptr<utils::SuffixStrategy> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SUFFIX_STRATEGY:
suffix_strategy_ = socket_option_value;
@@ -413,9 +433,33 @@ class ProducerSocket : public Socket {
return SOCKET_OPTION_SET;
}
+ int getSocketOption(int socket_option_key,
+ ProducerCallback **socket_option_value) {
+ // Reschedule the function on the io_service to avoid race condition in
+ // case setSocketOption is called while the io_service is running.
+ return rescheduleOnIOService(
+ socket_option_key, socket_option_value,
+ [this](int socket_option_key,
+ ProducerCallback **socket_option_value) -> int {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::PRODUCER_CALLBACK:
+ *socket_option_value = application_callback_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ });
+ }
+
virtual int getSocketOption(int socket_option_key,
uint32_t &socket_option_value) {
switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ break;
+
case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
socket_option_value = production_protocol_->getOutputBufferSize();
break;
@@ -424,6 +468,10 @@ class ProducerSocket : public Socket {
socket_option_value = (uint32_t)data_packet_size_;
break;
+ case GeneralTransportOptions::MAX_SEGMENT_SIZE:
+ socket_option_value = (uint32_t)max_segment_size_;
+ break;
+
case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
socket_option_value = content_object_expiry_time_;
break;
@@ -438,14 +486,14 @@ class ProducerSocket : public Socket {
virtual int getSocketOption(int socket_option_key,
bool &socket_option_value) {
switch (socket_option_key) {
- case GeneralTransportOptions::MAKE_MANIFEST:
- socket_option_value = making_manifest_;
- break;
-
case GeneralTransportOptions::ASYNC_MODE:
socket_option_value = is_async_;
break;
+ case RtcTransportOptions::AGGREGATED_DATA:
+ socket_option_value = aggregated_data_;
+ break;
+
default:
return SOCKET_OPTION_NOT_GET;
}
@@ -547,21 +595,6 @@ class ProducerSocket : public Socket {
});
}
- virtual int getSocketOption(
- int socket_option_key,
- std::shared_ptr<core::Portal> &socket_option_value) {
- switch (socket_option_key) {
- case PORTAL:
- socket_option_value = portal_;
- break;
- default:
- return SOCKET_OPTION_NOT_GET;
- ;
- }
-
- return SOCKET_OPTION_GET;
- }
-
virtual int getSocketOption(int socket_option_key,
auth::CryptoHashType &socket_option_value) {
switch (socket_option_key) {
@@ -577,7 +610,7 @@ class ProducerSocket : public Socket {
virtual int getSocketOption(
int socket_option_key,
- core::NextSegmentCalculationStrategy &socket_option_value) {
+ std::shared_ptr<utils::SuffixStrategy> &socket_option_value) {
switch (socket_option_key) {
case GeneralTransportOptions::SUFFIX_STRATEGY:
socket_option_value = suffix_strategy_;
@@ -603,9 +636,31 @@ class ProducerSocket : public Socket {
return SOCKET_OPTION_GET;
}
+ int getSocketOption(int socket_option_key, std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::FEC_TYPE:
+ socket_option_value = fec_setting_;
+ break;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+
+ return SOCKET_OPTION_GET;
+ }
+
virtual int setSocketOption(int socket_option_key,
const std::string &socket_option_value) {
- return SOCKET_OPTION_NOT_SET;
+ int result = SOCKET_OPTION_NOT_SET;
+ switch (socket_option_key) {
+ case GeneralTransportOptions::FEC_TYPE:
+ fec_setting_ = socket_option_value;
+ result = SOCKET_OPTION_SET;
+ break;
+
+ default:
+ return result;
+ }
+ return result;
}
// If the thread calling lambda_func is not the same of io_service, this
@@ -623,9 +678,9 @@ class ProducerSocket : public Socket {
std::condition_variable cv;
bool done = false;
- portal_->getIoService().dispatch([&socket_option_key,
- &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getThread().tryRunHandlerNow([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -655,9 +710,9 @@ class ProducerSocket : public Socket {
/* Condition variable for the wait */
std::condition_variable cv;
bool done = false;
- portal_->getIoService().dispatch([&socket_option_key,
- &socket_option_value, &mtx, &cv,
- &result, &done, &func]() {
+ portal_->getThread().tryRunHandlerNow([&socket_option_key,
+ &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
@@ -677,20 +732,24 @@ class ProducerSocket : public Socket {
// Threads
protected:
interface::ProducerSocket *producer_interface_;
- asio::io_service io_service_;
std::atomic<size_t> data_packet_size_;
+ std::atomic<size_t> max_segment_size_;
std::atomic<uint32_t> content_object_expiry_time_;
- utils::EventThread async_thread_;
-
- std::atomic<bool> making_manifest_;
+ std::atomic<uint32_t> making_manifest_;
std::atomic<auth::CryptoHashType> hash_algorithm_;
std::atomic<auth::CryptoSuite> crypto_suite_;
utils::SpinLock signer_lock_;
std::shared_ptr<auth::Signer> signer_;
- core::NextSegmentCalculationStrategy suffix_strategy_;
+ std::shared_ptr<utils::SuffixStrategy> suffix_strategy_;
+
+ std::shared_ptr<protocol::ProductionProtocol> production_protocol_;
- std::unique_ptr<protocol::ProductionProtocol> production_protocol_;
+ // RTC transport
+ bool aggregated_data_;
+
+ // FEC setting
+ std::string fec_setting_;
// callbacks
ProducerInterestCallback on_interest_input_;
@@ -706,6 +765,8 @@ class ProducerSocket : public Socket {
ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
ProducerContentCallback on_content_produced_;
+
+ ProducerCallback *application_callback_;
};
} // namespace implementation
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc
index db62b10c1..06d613ef0 100644
--- a/libtransport/src/implementation/tls_rtc_socket_producer.cc
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -90,11 +90,11 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
socket = (TLSRTCProducerSocket *)BIO_get_data(b);
if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) {
- bool making_manifest = socket->parent_->making_manifest_;
+ uint32_t making_manifest = socket->parent_->making_manifest_;
socket->tls_chunks_--;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- false);
+ 0U);
socket->parent_->ProducerSocket::produce(
socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0, 0);
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.h b/libtransport/src/implementation/tls_rtc_socket_producer.h
index 92c657afc..f6dc425e4 100644
--- a/libtransport/src/implementation/tls_rtc_socket_producer.h
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
index 65472b41d..b368c4b88 100644
--- a/libtransport/src/implementation/tls_socket_consumer.cc
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -43,7 +43,7 @@ int readOldTLS(BIO *b, char *buf, int size) {
if (!socket->something_to_read_) {
if (!socket->transport_protocol_->isRunning()) {
socket->network_name_.setSuffix(socket->random_suffix_);
- socket->ConsumerSocket::asyncConsume(socket->network_name_);
+ socket->ConsumerSocket::consume(socket->network_name_);
}
if (!socket->something_to_read_) socket->cv_.wait(lck);
@@ -284,31 +284,6 @@ int TLSConsumerSocket::download_content(const Name &name) {
return CONSUMER_FINISHED;
}
-int TLSConsumerSocket::asyncConsume(const Name &name,
- std::unique_ptr<utils::MemBuf> &&buffer) {
- this->payload_ = std::move(buffer);
-
- this->ConsumerSocket::setSocketOption(
- ConsumerCallbacksOptions::INTEREST_OUTPUT,
- (ConsumerInterestCallback)std::bind(
- &TLSConsumerSocket::setInterestPayload, this, std::placeholders::_1,
- std::placeholders::_2));
-
- return asyncConsume(name);
-}
-
-int TLSConsumerSocket::asyncConsume(const Name &name) {
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- throw errors::RuntimeException("Handshake not performed");
- }
-
- if (!async_downloader_tls_.stopped()) {
- async_downloader_tls_.add([this, name]() { download_content(name); });
- }
-
- return CONSUMER_RUNNING;
-}
-
void TLSConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
producer_namespace_ = producer_namespace;
}
@@ -353,7 +328,7 @@ void TLSConsumerSocket::readBufferAvailable(
cv_.notify_one();
}
-void TLSConsumerSocket::readError(const std::error_code ec) noexcept {}
+void TLSConsumerSocket::readError(const std::error_code &ec) noexcept {}
void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept {
std::unique_lock<std::mutex> lck(this->mtx_);
diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h
index be08ec47d..a74f1ee10 100644
--- a/libtransport/src/implementation/tls_socket_consumer.h
+++ b/libtransport/src/implementation/tls_socket_consumer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -47,9 +47,6 @@ class TLSConsumerSocket : public ConsumerSocket,
int consume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
int consume(const Name &name) override;
- int asyncConsume(const Name &name, std::unique_ptr<utils::MemBuf> &&buffer);
- int asyncConsume(const Name &name) override;
-
void registerPrefix(const Prefix &producer_namespace);
int setSocketOption(
@@ -100,7 +97,7 @@ class TLSConsumerSocket : public ConsumerSocket,
virtual void readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept override;
- virtual void readError(const std::error_code ec) noexcept override;
+ virtual void readError(const std::error_code &ec) noexcept override;
virtual void readSuccess(std::size_t total_size) noexcept override;
diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc
index 3992ca45c..47f3b43a6 100644
--- a/libtransport/src/implementation/tls_socket_producer.cc
+++ b/libtransport/src/implementation/tls_socket_producer.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -99,12 +99,12 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
socket = (TLSProducerSocket *)BIO_get_data(b);
if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) {
- bool making_manifest = socket->parent_->making_manifest_;
+ uint32_t making_manifest = socket->parent_->making_manifest_;
//! socket->tls_chunks_ corresponds to is_last
socket->tls_chunks_--;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- false);
+ 0U);
socket->parent_->ProducerSocket::produceStream(
socket->name_, (const uint8_t *)buf, num, socket->tls_chunks_ == 0,
socket->last_segment_);
@@ -358,7 +358,7 @@ uint32_t TLSProducerSocket::produceStream(
}
size_t buf_size = buffer->length();
- name_ = production_protocol_->getNamespaces().front().mapName(content_name);
+ name_ = portal_->getServedNamespaces().begin()->mapName(content_name);
tls_chunks_ = to_call_oncontentproduced_ =
(int)ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
@@ -394,8 +394,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
<< "On addHicnKeyIdCb, for the prefix registration.";
if (ext_type == 100) {
- auto &prefix =
- socket->parent_->production_protocol_->getNamespaces().front();
+ auto &prefix = *socket->parent_->portal_->getServedNamespaces().begin();
const ip_prefix_t &ip_prefix = prefix.toIpPrefixStruct();
int inet_family = prefix.getAddressFamily();
uint16_t prefix_len_bits = prefix.getPrefixLength();
diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h
index a542a4d9f..0e958b321 100644
--- a/libtransport/src/implementation/tls_socket_producer.h
+++ b/libtransport/src/implementation/tls_socket_producer.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at: