aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src
diff options
context:
space:
mode:
authorOlivier Roques <oroques+fdio@cisco.com>2020-04-08 15:29:55 +0200
committerOlivier Roques <oroques+fdio@cisco.com>2020-04-11 17:25:30 +0200
commiteb9119968cfc53f41526981924e5c8d44612f98a (patch)
tree065b282b91e48fc62a01f5de5a5fe1bd29092c5c /libtransport/src
parent0ea5735b98f38beacf92dfdca74b7a6d5b3f7182 (diff)
[HICN-595] Bring TLS up to date
HICN-2 would enable TLS only if OpenSSL 1.1.1 was present. However the mechanism to do so was broken and hiperf always ended up using normal consumer and producer sockets. This patch fixes that by updating the build files. It also fixes various bugs in the TLS implementation that went unnoticed and cleans up the code. Change-Id: Ifda75a9929e14460af43fe79d737d0c926bb671e Signed-off-by: Olivier Roques <oroques+fdio@cisco.com> Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src')
-rw-r--r--libtransport/src/core/prefix.cc4
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.cc149
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.h18
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.cc198
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.h17
-rw-r--r--libtransport/src/implementation/socket_producer.h28
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.cc31
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc52
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.h21
-rw-r--r--libtransport/src/implementation/tls_socket_producer.cc159
-rw-r--r--libtransport/src/implementation/tls_socket_producer.h26
-rw-r--r--libtransport/src/interfaces/p2psecure_socket_consumer.cc10
-rw-r--r--libtransport/src/interfaces/tls_rtc_socket_producer.h4
-rw-r--r--libtransport/src/protocols/consumer.conf2
-rw-r--r--libtransport/src/protocols/incremental_indexer.cc4
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer.cc4
-rw-r--r--libtransport/src/protocols/raaqm.cc5
17 files changed, 362 insertions, 370 deletions
diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc
index 30f780461..eea4aeb8b 100644
--- a/libtransport/src/core/prefix.cc
+++ b/libtransport/src/core/prefix.cc
@@ -211,8 +211,8 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components,
}
}
- if (this->contains(name_ip))
- throw errors::RuntimeException("Mask overrides the prefix");
+ // if (this->contains(name_ip))
+ // throw errors::RuntimeException("Mask overrides the prefix");
return Name(ip_prefix_.family, (uint8_t *)&name_ip);
}
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc
index 40ab58161..9b79850d6 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc
@@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload(
if (payload_ != NULL) int2.appendPayload(std::move(payload_));
}
-// implement void readBufferAvailable(), size_t maxBufferSize() const override,
-// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable()
-// must be implemented even if empty.
-
/* Return the number of read bytes in the return param */
int readOld(BIO *b, char *buf, int size) {
if (size < 0) return size;
@@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) {
socket->network_name_.setSuffix(socket->random_suffix_);
socket->ConsumerSocket::asyncConsume(socket->network_name_);
}
+
if (!socket->something_to_read_) socket->cv_.wait(lck);
}
size_t size_to_read, read;
size_t chain_size = socket->head_->length();
+
if (socket->head_->isChained())
chain_size = socket->head_->computeChainDataLength();
@@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) {
socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+
socket->ConsumerSocket::setSocketOption(
ConsumerCallbacksOptions::INTEREST_OUTPUT,
(ConsumerInterestCallback)std::bind(
@@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
P2PSecureConsumerSocket::P2PSecureConsumerSocket(
interface::ConsumerSocket *consumer, int handshake_protocol,
int transport_protocol)
- : ConsumerSocket(consumer, transport_protocol),
+ : ConsumerSocket(consumer, handshake_protocol),
name_(),
- tls_consumer_(),
+ tls_consumer_(nullptr),
buf_pool_(),
decrypted_content_(),
payload_(),
@@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket(
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(
1, std::numeric_limits<uint32_t>::max());
@@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() {
SSL_shutdown(ssl_);
}
-int P2PSecureConsumerSocket::consume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
+int P2PSecureConsumerSocket::handshake() {
+ int result = 1;
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
+ if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ return 1;
}
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
- TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_);
- tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer));
- ConsumerTimerCallback *stats_summary_callback = nullptr;
- this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_summary_callback);
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- uint32_t lifetime;
- this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
- tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- read_callback_decrypted_);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- *stats_summary_callback);
- tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- this->timer_interval_milliseconds_);
- tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- tls_consumer.connect();
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- if (payload_ != NULL)
- return tls_consumer.consume((prefix->mapName(name)), std::move(payload_));
- else
- return tls_consumer.consume((prefix->mapName(name)));
-}
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
-int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- TRANSPORT_LOGD("Handshake performed!");
- }
+ TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
+ result = SSL_connect(this->ssl_);
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+ return result;
+}
+void P2PSecureConsumerSocket::initSessionSocket() {
tls_consumer_ =
std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_);
tls_consumer_->setInterface(
@@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
uint32_t lifetime;
this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+
tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
lifetime);
tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
@@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
tls_consumer_->connect();
+}
+
+int P2PSecureConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if (handshake() != 1) {
+ throw errors::RuntimeException("Unable to perform client handshake");
+ } else {
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ initSessionSocket();
+
+ if (tls_consumer_ == nullptr) {
+ throw errors::RuntimeException("TLS socket does not exist");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+
+ if (payload_ != nullptr)
+ return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_));
+ else
+ return tls_consumer_->consume((prefix->mapName(name)));
+}
+
+int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if (handshake() != 1) {
+ throw errors::RuntimeException("Unable to perform client handshake");
+ } else {
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ initSessionSocket();
+
+ if (tls_consumer_ == nullptr) {
+ throw errors::RuntimeException("TLS socket does not exist");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
if (payload_ != NULL)
return tls_consumer_->asyncConsume((prefix->mapName(name)),
@@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; }
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h
index e2ebaf94e..d4c3b26c2 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.h
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.h
@@ -69,39 +69,26 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
private:
Name name_;
std::shared_ptr<TLSConsumerSocket> tls_consumer_;
-
/* SSL handle */
SSL *ssl_;
SSL_CTX *ctx_;
BIO_METHOD *bio_meth_;
-
/* Chain of MemBuf to be used as a temporary buffer to pass descypted data
* from the underlying layer to the application */
utils::ObjectPool<utils::MemBuf> buf_pool_;
std::unique_ptr<utils::MemBuf> decrypted_content_;
-
/* Chain of MemBuf holding the payload to be written into interest or data */
std::unique_ptr<utils::MemBuf> payload_;
-
/* Chain of MemBuf holding the data retrieved from the underlying layer */
std::unique_ptr<utils::MemBuf> head_;
-
bool something_to_read_;
-
bool content_downloaded_;
-
double old_max_win_;
-
double old_current_win_;
-
uint32_t random_suffix_;
-
ip_prefix_t secure_prefix_;
-
Prefix producer_namespace_;
-
interface::ConsumerSocket::ReadCallback *read_callback_decrypted_;
-
std::mutex mtx_;
/* Condition variable for the wait */
@@ -138,9 +125,12 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
virtual void readError(const std::error_code ec) noexcept override;
virtual void readSuccess(std::size_t total_size) noexcept override;
+
virtual bool isBufferMovable() noexcept override;
- int download_content(const Name &name);
+ int handshake();
+
+ void initSessionSocket();
};
} // namespace implementation
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc
index d0852539a..15c7d25cd 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_producer.cc
@@ -37,9 +37,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
: ProducerSocket(producer_socket),
mtx_(),
cv_(),
- map_secure_producers(),
- map_secure_rtc_producers(),
- list_secure_producers() {}
+ map_producers(),
+ list_producers() {}
P2PSecureProducerSocket::P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
@@ -48,12 +47,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
rtc_(rtc),
mtx_(),
cv_(),
- map_secure_producers(),
- map_secure_rtc_producers(),
- list_secure_producers() {
- /*
- * Setup SSL context (identity and parameter to use TLS 1.3)
- */
+ map_producers(),
+ list_producers() {
+ /* Setup SSL context (identity and parameter to use TLS 1.3) */
der_cert_ = parcKeyStore_GetDEREncodedCertificate(
(identity->getSigner()->getKeyStore()));
der_prk_ = parcKeyStore_GetDEREncodedPrivateKey(
@@ -68,10 +64,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
cert_509_ = d2i_X509(NULL, &cert, cert_size);
pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size);
- /*
- * Set the callback so that when an interest is received we catch it and we
- * decrypt the payload before passing it to the application.
- */
+ /* Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application. */
ProducerSocket::setSocketOption(
ProducerCallbacksOptions::INTEREST_INPUT,
(ProducerInterestCallback)std::bind(
@@ -84,53 +78,61 @@ P2PSecureProducerSocket::~P2PSecureProducerSocket() {
if (der_prk_) parcBuffer_Release(&der_prk_);
}
+void P2PSecureProducerSocket::initSessionSocket(
+ std::unique_ptr<TLSProducerSocket> &producer) {
+ producer->on_content_produced_application_ =
+ this->on_content_produced_application_;
+ producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
+ this->content_object_expiry_time_);
+ producer->setSocketOption(SIGNER, this->signer_);
+ producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
+ producer->setSocketOption(DATA_PACKET_SIZE,
+ (uint32_t)(this->data_packet_size_));
+ producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+
+ if (!rtc_) {
+ producer->setInterface(new interface::TLSProducerSocket(producer.get()));
+ } else {
+ TLSRTCProducerSocket *rtc_producer =
+ dynamic_cast<TLSRTCProducerSocket *>(producer.get());
+ rtc_producer->setInterface(
+ new interface::TLSRTCProducerSocket(rtc_producer));
+ }
+}
+
void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p,
Interest &interest) {
std::unique_lock<std::mutex> lck(mtx_);
+ std::unique_ptr<TLSProducerSocket> tls_producer;
+ auto it = map_producers.find(interest.getName());
+
+ if (it != map_producers.end()) {
+ return;
+ }
+
+ if (!rtc_) {
+ tls_producer =
+ std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName());
+ } else {
+ tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this,
+ interest.getName());
+ }
+
+ initSessionSocket(tls_producer);
+ TLSProducerSocket *tls_producer_ptr = tls_producer.get();
+ map_producers.insert({interest.getName(), move(tls_producer)});
TRANSPORT_LOGD("Start handshake at %s",
interest.getName().toString().c_str());
+
if (!rtc_) {
- auto it = map_secure_producers.find(interest.getName());
- if (it != map_secure_producers.end()) return;
- TLSProducerSocket *tls_producer =
- new TLSProducerSocket(nullptr, this, interest.getName());
- tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer));
-
- tls_producer->on_content_produced_application_ =
- this->on_content_produced_application_;
- tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
- this->content_object_expiry_time_);
- tls_producer->setSocketOption(SIGNER, this->signer_);
- tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
- tls_producer->setSocketOption(DATA_PACKET_SIZE,
- (uint32_t)(this->data_packet_size_));
- tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
- map_secure_producers.insert(
- {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)});
- tls_producer->onInterest(*tls_producer, interest);
- tls_producer->async_accept();
+ tls_producer_ptr->onInterest(*tls_producer_ptr, interest);
+ tls_producer_ptr->async_accept();
} else {
- auto it = map_secure_rtc_producers.find(interest.getName());
- if (it != map_secure_rtc_producers.end()) return;
- TLSRTCProducerSocket *tls_producer =
- new TLSRTCProducerSocket(nullptr, this, interest.getName());
- tls_producer->setInterface(
- new interface::TLSRTCProducerSocket(tls_producer));
- tls_producer->on_content_produced_application_ =
- this->on_content_produced_application_;
- tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
- this->content_object_expiry_time_);
- tls_producer->setSocketOption(SIGNER, this->signer_);
- tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
- tls_producer->setSocketOption(DATA_PACKET_SIZE,
- (uint32_t)(this->data_packet_size_));
- tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
- map_secure_rtc_producers.insert(
- {interest.getName(),
- std::unique_ptr<TLSRTCProducerSocket>(tls_producer)});
- tls_producer->onInterest(*tls_producer, interest);
- tls_producer->async_accept();
+ TLSRTCProducerSocket *rtc_producer_ptr =
+ dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr);
+ rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest);
+ rtc_producer_ptr->async_accept();
}
}
@@ -143,11 +145,13 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer,
}
std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_rtc_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_rtc_producers.cbegin();
- it != list_secure_rtc_producers.cend(); it++) {
- (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
+ if (list_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
+ TLSRTCProducerSocket *rtc_producer =
+ dynamic_cast<TLSRTCProducerSocket *>(it->get());
+ rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
}
}
@@ -162,12 +166,13 @@ uint32_t P2PSecureProducerSocket::produce(
std::unique_lock<std::mutex> lck(mtx_);
uint32_t segments = 0;
- if (list_secure_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (list_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
segments +=
(*it)->produce(content_name, buffer->clone(), is_last, start_offset);
+
return segments;
}
@@ -183,12 +188,12 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name,
std::unique_lock<std::mutex> lck(mtx_);
uint32_t segments = 0;
- if (list_secure_producers.empty()) cv_.wait(lck);
+ if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
segments += (*it)->produce(content_name, buffer, buffer_size, is_last,
start_offset);
+
return segments;
}
@@ -203,10 +208,9 @@ void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
}
std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_producers.empty()) cv_.wait(lck);
+ if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
(*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset);
}
}
@@ -221,22 +225,19 @@ void P2PSecureProducerSocket::asyncProduce(
}
std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_producers.empty()) cv_.wait(lck);
+ if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
(*it)->asyncProduce(content_name, buffer->clone(), is_last, offset,
last_segment);
}
}
-// Socket Option Redefinition to avoid name hiding
-
+/* Redefinition of socket options to avoid name hiding */
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerInterestCallback socket_option_value) {
- if (!list_secure_producers.empty()) {
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty()) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
}
@@ -269,9 +270,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key,
const std::shared_ptr<utils::Signer> &socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
switch (socket_option_key) {
@@ -288,9 +288,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
- if (!list_secure_producers.empty()) {
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty()) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
}
switch (socket_option_key) {
@@ -305,9 +304,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
bool socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -316,9 +314,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
Name *socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -327,9 +324,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, std::list<Prefix> socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -338,9 +334,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerContentObjectCallback socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -349,9 +344,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerContentCallback socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
switch (socket_option_key) {
@@ -368,9 +362,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, utils::CryptoHashType socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -379,9 +372,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, utils::CryptoSuite socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -390,9 +382,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, const std::string &socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -400,5 +391,4 @@ int P2PSecureProducerSocket::setSocketOption(
}
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h
index 33339deba..bfc9fc2c1 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.h
+++ b/libtransport/src/implementation/p2psecure_socket_producer.h
@@ -37,9 +37,11 @@ class P2PSecureProducerSocket : public ProducerSocket {
public:
explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket);
+
explicit P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
const std::shared_ptr<utils::Identity> &identity);
+
~P2PSecureProducerSocket();
void produce(const uint8_t *buffer, size_t buffer_size) override;
@@ -96,7 +98,6 @@ class P2PSecureProducerSocket : public ProducerSocket {
using ProducerSocket::onInterest;
protected:
- bool rtc_;
/* Callback invoked once an interest has been received and its payload
* decrypted */
ProducerInterestCallback on_interest_input_decrypted_;
@@ -104,27 +105,23 @@ class P2PSecureProducerSocket : public ProducerSocket {
ProducerContentCallback on_content_produced_application_;
private:
+ bool rtc_;
std::mutex mtx_;
-
/* Condition variable for the wait */
std::condition_variable cv_;
-
PARCBuffer *der_cert_;
PARCBuffer *der_prk_;
X509 *cert_509_;
EVP_PKEY *pkey_rsa_;
std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>,
core::hash<core::Name>, core::compare2<core::Name>>
- map_secure_producers;
- std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>,
- core::hash<core::Name>, core::compare2<core::Name>>
- map_secure_rtc_producers;
- std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers;
- std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers;
+ map_producers;
+ std::list<std::unique_ptr<TLSProducerSocket>> list_producers;
void onInterestCallback(interface::ProducerSocket &p, Interest &interest);
+
+ void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer);
};
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 9ccc59d9e..8c5c453fc 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -131,7 +131,7 @@ class ProducerSocket : public Socket<BasePortal>,
// TODO Manifest may still be used for indexing
if (making_manifest && !signer) {
- TRANSPORT_LOGD("Making manifests without setting producer identity.");
+ TRANSPORT_LOGE("Making manifests without setting producer identity.");
}
core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
@@ -192,7 +192,6 @@ class ProducerSocket : public Socket<BasePortal>,
}
}
- TRANSPORT_LOGD("--------- START PRODUCE ----------");
for (unsigned int packaged_segments = 0;
packaged_segments < number_of_segments; packaged_segments++) {
if (making_manifest) {
@@ -207,13 +206,12 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
+ TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
// Send content objects stored in the queue
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %u",
- content_queue_.front()->getName().getSuffix());
+ TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
@@ -270,8 +268,7 @@ class ProducerSocket : public Socket<BasePortal>,
signer->sign(*content_object);
}
passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %u",
- content_object->getName().getSuffix());
+ TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str());
}
}
@@ -286,11 +283,11 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
+ TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %u",
- content_queue_.front()->getName().getSuffix());
+ TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
}
@@ -949,18 +946,19 @@ class ProducerSocket : public Socket<BasePortal>,
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
- std::unique_lock<std::mutex> lck(mtx);
+
bool done = false;
io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
&result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
-
- if (!done) {
- cv.wait(lck);
- }
+ cv.notify_all();
});
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
} else {
result = func(socket_option_key, socket_option_value);
}
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc
index 3b3152993..9ef79ca23 100644
--- a/libtransport/src/implementation/tls_rtc_socket_producer.cc
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc
@@ -53,7 +53,7 @@ int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) {
(socket->cv_).wait(lck);
}
- utils::MemBuf *membuf = socket->packet_->next();
+ utils::MemBuf *membuf = socket->handshake_packet_->next();
int size_to_read;
if ((int)membuf->length() > size) {
@@ -91,10 +91,10 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
TLSRTCProducerSocket *socket;
socket = (TLSRTCProducerSocket *)BIO_get_data(b);
- if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
- socket->first_) {
- socket->tls_chunks_--;
+ if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) {
bool making_manifest = socket->parent_->making_manifest_;
+
+ socket->tls_chunks_--;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
false);
socket->parent_->ProducerSocket::produce(
@@ -107,13 +107,17 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
std::unique_ptr<utils::MemBuf> mbuf =
utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
auto a = mbuf.release();
+
socket->async_thread_.add([socket = socket, a]() {
socket->to_call_oncontentproduced_--;
auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+
socket->RTCProducerSocket::produce(std::move(mbuf));
+
ProducerContentCallback on_content_produced_application;
socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
on_content_produced_application);
+
if (socket->to_call_oncontentproduced_ == 0 &&
on_content_produced_application) {
on_content_produced_application(
@@ -144,24 +148,28 @@ TLSRTCProducerSocket::TLSRTCProducerSocket(
}
void TLSRTCProducerSocket::accept() {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) {
tls_chunks_ = 1;
int result = SSL_accept(ssl_);
+
if (result != 1)
throw errors::RuntimeException("Unable to perform client handshake");
}
TRANSPORT_LOGD("Handshake performed!");
- parent_->list_secure_rtc_producers.push_front(
- std::move(parent_->map_secure_rtc_producers[handshake_name_]));
- parent_->map_secure_rtc_producers.erase(handshake_name_);
+
+ parent_->list_producers.push_front(
+ std::move(parent_->map_producers[handshake_name_]));
+ parent_->map_producers.erase(handshake_name_);
ProducerInterestCallback on_interest_process_decrypted;
getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
on_interest_process_decrypted);
if (on_interest_process_decrypted) {
- Interest inter(std::move(packet_));
+ Interest inter(std::move(handshake_packet_));
on_interest_process_decrypted(
(transport::interface::ProducerSocket &)(*getInterface()), inter);
}
@@ -181,7 +189,9 @@ int TLSRTCProducerSocket::async_accept() {
}
void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state != SERVER_FINISHED) {
throw errors::RuntimeException(
"New handshake on the same P2P secure producer socket not supported");
}
@@ -197,5 +207,4 @@ void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
}
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
index 95b287aa6..7cf653848 100644
--- a/libtransport/src/implementation/tls_socket_consumer.cc
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -46,11 +46,13 @@ int readOldTLS(BIO *b, char *buf, int size) {
socket->network_name_.setSuffix(socket->random_suffix_);
socket->ConsumerSocket::asyncConsume(socket->network_name_);
}
+
if (!socket->something_to_read_) socket->cv_.wait(lck);
}
size_t size_to_read, read;
size_t chain_size = socket->head_->length();
+
if (socket->head_->isChained())
chain_size = socket->head_->computeChainDataLength();
@@ -101,6 +103,7 @@ int writeOldTLS(BIO *b, const char *buf, int num) {
socket = (TLSConsumerSocket *)BIO_get_data(b);
socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+
socket->ConsumerSocket::setSocketOption(
ConsumerCallbacksOptions::INTEREST_OUTPUT,
(ConsumerInterestCallback)std::bind(
@@ -176,12 +179,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(
1, std::numeric_limits<uint32_t>::max());
@@ -191,10 +188,8 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
this);
};
-/*
- * The producer interface is not owned by the application, so is TLSSocket task
- * to deallocate the memory
- */
+/* The producer interface is not owned by the application, so is TLSSocket task
+ * to deallocate the memory */
TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; }
int TLSConsumerSocket::consume(const Name &name,
@@ -228,22 +223,15 @@ int TLSConsumerSocket::download_content(const Name &name) {
something_to_read_ = false;
content_downloaded_ = false;
- decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH);
- uint8_t *buf = decrypted_content_->writableData();
- size_t size = 0;
+ std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize();
+ std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH;
+ decrypted_content_ = utils::MemBuf::createCombined(buffer_size);
int result = -1;
+ std::size_t size = 0;
while (!content_downloaded_ || something_to_read_) {
- if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) {
- decrypted_content_->appendChain(
- utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH));
- // decrypted_content_->computeChainDataLength();
- buf = decrypted_content_->prev()->writableData();
- } else {
- buf = decrypted_content_->writableTail();
- }
-
- result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH);
+ result = SSL_read(
+ this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH);
/* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of
* the data has been fully downloaded */
@@ -253,20 +241,20 @@ int TLSConsumerSocket::download_content(const Name &name) {
if (result >= 0) {
size += result;
- decrypted_content_->prepend(result);
- } else
+ decrypted_content_->append(result);
+ } else {
throw errors::RuntimeException("Unable to download content");
+ }
- if (size >= read_callback_decrypted_->maxBufferSize()) {
+ if (decrypted_content_->length() >= max_buffer_size) {
if (read_callback_decrypted_->isBufferMovable()) {
- // No need to perform an additional copy. The whole buffer will be
- // tranferred to the application.
-
+ /* No need to perform an additional copy. The whole buffer will be
+ * tranferred to the application. */
read_callback_decrypted_->readBufferAvailable(
std::move(decrypted_content_));
- decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH);
+ decrypted_content_ = utils::MemBuf::create(buffer_size);
} else {
- // The buffer will be copied into the application-provided buffer
+ /* The buffer will be copied into the application-provided buffer */
uint8_t *buffer;
std::size_t length;
std::size_t total_length = decrypted_content_->length();
@@ -358,6 +346,7 @@ size_t TLSConsumerSocket::maxBufferSize() const {
void TLSConsumerSocket::readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
std::unique_lock<std::mutex> lck(this->mtx_);
+
if (head_) {
head_->prependChain(std::move(buffer));
} else {
@@ -380,5 +369,4 @@ void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept {
bool TLSConsumerSocket::isBufferMovable() noexcept { return true; }
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h
index 2e88dc47e..6931a7a8a 100644
--- a/libtransport/src/implementation/tls_socket_consumer.h
+++ b/libtransport/src/implementation/tls_socket_consumer.h
@@ -69,42 +69,27 @@ class TLSConsumerSocket : public ConsumerSocket,
private:
Name name_;
-
/* SSL handle */
SSL *ssl_;
SSL_CTX *ctx_;
-
/* Chain of MemBuf to be used as a temporary buffer to pass descypted data
* from the underlying layer to the application */
utils::ObjectPool<utils::MemBuf> buf_pool_;
std::unique_ptr<utils::MemBuf> decrypted_content_;
-
- /* Chain of MemBuf holding the payload to be written into interest or data
- */
+ /* Chain of MemBuf holding the payload to be written into interest or data */
std::unique_ptr<utils::MemBuf> payload_;
-
/* Chain of MemBuf holding the data retrieved from the underlying layer */
std::unique_ptr<utils::MemBuf> head_;
-
bool something_to_read_;
-
bool content_downloaded_;
-
double old_max_win_;
-
double old_current_win_;
-
uint32_t random_suffix_;
-
Prefix producer_namespace_;
-
interface::ConsumerSocket::ReadCallback *read_callback_decrypted_;
-
std::mutex mtx_;
-
/* Condition variable for the wait */
std::condition_variable cv_;
-
utils::EventThread async_downloader_tls_;
void setInterestPayload(interface::ConsumerSocket &c,
@@ -123,11 +108,11 @@ class TLSConsumerSocket : public ConsumerSocket,
virtual void readError(const std::error_code ec) noexcept override;
virtual void readSuccess(std::size_t total_size) noexcept override;
+
virtual bool isBufferMovable() noexcept override;
int download_content(const Name &name);
};
} // namespace implementation
-
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc
index 9a5b94a1c..339a1ad58 100644
--- a/libtransport/src/implementation/tls_socket_producer.cc
+++ b/libtransport/src/implementation/tls_socket_producer.cc
@@ -48,18 +48,17 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) {
TLSProducerSocket *socket;
socket = (TLSProducerSocket *)BIO_get_data(b);
- /* take a lock on the mutex. It will be unlocked by */
std::unique_lock<std::mutex> lck(socket->mtx_);
+
if (!socket->something_to_read_) {
(socket->cv_).wait(lck);
}
- /* Either there already is something to read, or the thread has been waken up
- */
- /* must return the payload in the interest */
-
- utils::MemBuf *membuf = socket->packet_->next();
+ /* Either there already is something to read, or the thread has been waken up.
+ * We must return the payload in the interest anyway */
+ utils::MemBuf *membuf = socket->handshake_packet_->next();
int size_to_read;
+
if ((int)membuf->length() > size) {
size_to_read = size;
} else {
@@ -97,11 +96,11 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
TLSProducerSocket *socket;
socket = (TLSProducerSocket *)BIO_get_data(b);
- if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
- socket->first_) {
+ if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) {
+ bool making_manifest = socket->parent_->making_manifest_;
+
//! socket->tls_chunks_ corresponds to is_last
socket->tls_chunks_--;
- bool making_manifest = socket->parent_->making_manifest_;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
false);
socket->parent_->ProducerSocket::produce(
@@ -116,16 +115,21 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
std::unique_ptr<utils::MemBuf> mbuf =
utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
auto a = mbuf.release();
+
socket->async_thread_.add([socket = socket, a]() {
+ auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+
socket->tls_chunks_--;
socket->to_call_oncontentproduced_--;
- auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+
socket->last_segment_ += socket->ProducerSocket::produce(
socket->name_, std::move(mbuf), socket->tls_chunks_ == 0,
socket->last_segment_);
+
ProducerContentCallback on_content_produced_application;
socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
on_content_produced_application);
+
if (socket->to_call_oncontentproduced_ == 0 &&
on_content_produced_application) {
on_content_produced_application(*socket->getInterface(),
@@ -144,8 +148,10 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
on_content_produced_application_(),
mtx_(),
cv_(),
- something_to_read_(),
+ something_to_read_(false),
+ handshake_state_(UNINITIATED),
name_(),
+ handshake_packet_(),
last_segment_(0),
parent_(parent),
first_(true),
@@ -157,9 +163,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
const SSL_METHOD *meth = TLS_server_method();
ctx_ = SSL_CTX_new(meth);
- /*
- * Setup SSL context (identity and parameter to use TLS 1.3)
- */
+ /* Setup SSL context (identity and parameter to use TLS 1.3) */
SSL_CTX_use_certificate(ctx_, parent->cert_509_);
SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_);
@@ -167,6 +171,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
SSL_CTX_set_ciphersuites(ctx_,
"TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
"SHA256:TLS_AES_128_GCM_SHA256");
+
if (result != 1) {
throw errors::RuntimeException(
"Unable to set cipher list on TLS subsystem. Aborting.");
@@ -184,10 +189,9 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
this, TLSProducerSocket::parseHicnKeyIdCb, NULL);
ssl_ = SSL_new(ctx_);
- /*
- * Setup this producer socker as the bio that TLS will use to write and read
- * data (in stream mode)
- */
+
+ /* Setup this producer socker as the bio that TLS will use to write and read
+ * data (in stream mode) */
BIO_METHOD *bio_meth =
BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket");
BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld);
@@ -197,15 +201,15 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
BIO_set_init(bio, 1);
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- /*
- * Set the callback so that when an interest is received we catch it and we
- * decrypt the payload before passing it to the application.
- */
+
+ /* Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application. */
this->ProducerSocket::setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
(ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this,
std::placeholders::_1,
std::placeholders::_2));
+
this->ProducerSocket::setSocketOption(
ProducerCallbacksOptions::CONTENT_PRODUCED,
(ProducerContentCallback)bind(
@@ -213,35 +217,39 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
std::placeholders::_2, std::placeholders::_3));
}
-/*
- * The producer interface is not owned by the application, so is TLSSocket task
- * to deallocate the memory
- */
+/* The producer interface is not owned by the application, so is TLSSocket task
+ * to deallocate the memory */
TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; }
void TLSProducerSocket::accept() {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) {
tls_chunks_ = 1;
int result = SSL_accept(ssl_);
+
if (result != 1)
throw errors::RuntimeException("Unable to perform client handshake");
}
- TRANSPORT_LOGD("Handshake performed!");
- parent_->list_secure_producers.push_front(
- std::move(parent_->map_secure_producers[handshake_name_]));
- parent_->map_secure_producers.erase(handshake_name_);
+
+ parent_->list_producers.push_front(
+ std::move(parent_->map_producers[handshake_name_]));
+ parent_->map_producers.erase(handshake_name_);
ProducerInterestCallback on_interest_process_decrypted;
getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
on_interest_process_decrypted);
if (on_interest_process_decrypted) {
- Interest inter(std::move(packet_));
+ Interest inter(std::move(handshake_packet_));
on_interest_process_decrypted(*getInterface(), inter);
} else {
throw errors::RuntimeException(
- "On interest process unset. Unable to perform handshake");
+ "On interest process unset: unable to perform handshake");
}
+
+ handshake_state_ = SERVER_FINISHED;
+ TRANSPORT_LOGD("Handshake performed!");
}
int TLSProducerSocket::async_accept() {
@@ -249,71 +257,88 @@ int TLSProducerSocket::async_accept() {
async_thread_.add([this]() { this->accept(); });
} else {
throw errors::RuntimeException(
- "Async thread not running, impossible to perform handshake");
+ "Async thread not running: unable to perform handshake");
}
return 1;
}
void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
- /* Based on the state machine of (D)TLS, we know what action to do */
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) {
std::unique_lock<std::mutex> lck(mtx_);
+
name_ = interest.getName();
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- packet_ = interest.acquireMemBufReference();
- if (head_) {
- payload_->prependChain(interest.getPayload());
- } else {
- payload_ = interest.getPayload(); // std::move(interest.getPayload());
- }
+
cv_.notify_one();
- } else {
- name_ = interest.getName();
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
+ return;
+ } else if (handshake_state == SERVER_FINISHED) {
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- if (interest.getPayload()->length() > 0)
+ if (interest.getPayload()->length() > 0) {
SSL_read(
ssl_,
const_cast<unsigned char *>(interest.getPayload()->writableData()),
interest.getPayload()->length());
- }
+ }
+
+ ProducerInterestCallback on_interest_input_decrypted;
+ getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
+ on_interest_input_decrypted);
- ProducerInterestCallback on_interest_input_decrypted;
- getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
- on_interest_input_decrypted);
- if (on_interest_input_decrypted)
- (on_interest_input_decrypted)(*getInterface(), interest);
+ if (on_interest_input_decrypted)
+ (on_interest_input_decrypted)(*getInterface(), interest);
+ }
}
void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p,
Interest &interest) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == CLIENT_HELLO) {
std::unique_lock<std::mutex> lck(mtx_);
- name_ = interest.getName();
+
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
+ handshake_state_ = CLIENT_FINISHED;
+
cv_.notify_one();
- } else {
- name_ = interest.getName();
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
+ } else if (handshake_state == SERVER_FINISHED) {
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- if (interest.getPayload()->length() > 0)
+ if (interest.getPayload()->length() > 0) {
SSL_read(
ssl_,
const_cast<unsigned char *>(interest.getPayload()->writableData()),
interest.getPayload()->length());
+ }
if (on_interest_process_decrypted_ != VOID_HANDLER)
on_interest_process_decrypted_(*getInterface(), interest);
}
}
+TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() {
+ if (SSL_in_before(ssl_)) {
+ handshake_state_ = UNINITIATED;
+ }
+
+ if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) {
+ handshake_state_ = CLIENT_HELLO;
+ }
+
+ return handshake_state_;
+}
+
void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p,
const std::error_code &err,
uint64_t bytes_written) {}
@@ -321,13 +346,13 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p,
uint32_t TLSProducerSocket::produce(Name content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
bool is_last, uint32_t start_offset) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ if (getHandshakeState() != SERVER_FINISHED) {
throw errors::RuntimeException(
"New handshake on the same P2P secure producer socket not supported");
}
+
size_t buf_size = buffer->length();
name_ = served_namespaces_.front().mapName(content_name);
-
tls_chunks_ = to_call_oncontentproduced_ =
ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
@@ -388,6 +413,7 @@ void TLSProducerSocket::produce(ContentObject &content_object) {
long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) {
if (cmd == BIO_CTRL_FLUSH) {
}
+
return 1;
}
@@ -397,6 +423,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
X509 *x, size_t chainidx, int *al,
void *add_arg) {
TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg);
+
if (ext_type == 100) {
ip_prefix_t ip_prefix =
socket->parent_->served_namespaces_.front().toIpPrefixStruct();
@@ -425,6 +452,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
ip_address_t keyId_component = {};
u32 *mask_buf;
u32 *keyId_component_buf;
+
switch (inet_family) {
case AF_INET:
mask_buf = &(mask.v4.as_u32);
@@ -483,6 +511,7 @@ int TLSProducerSocket::setSocketOption(
[this](int socket_option_key,
ProducerInterestCallback socket_option_value) -> int {
int result = SOCKET_OPTION_SET;
+
switch (socket_option_key) {
case ProducerCallbacksOptions::INTEREST_INPUT:
on_interest_input_decrypted_ = socket_option_value;
@@ -508,6 +537,7 @@ int TLSProducerSocket::setSocketOption(
result = SOCKET_OPTION_NOT_SET;
break;
}
+
return result;
});
}
@@ -607,5 +637,4 @@ int TLSProducerSocket::getSocketOption(
}
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h
index e910c8259..2382e8695 100644
--- a/libtransport/src/implementation/tls_socket_producer.h
+++ b/libtransport/src/implementation/tls_socket_producer.h
@@ -84,47 +84,44 @@ class TLSProducerSocket : virtual public ProducerSocket {
using ProducerSocket::setSocketOption;
protected:
+ enum HandshakeState {
+ UNINITIATED,
+ CLIENT_HELLO, // when CLIENT_HELLO interest has been received
+ CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received
+ SERVER_FINISHED, // when handshake is done
+ };
/* Callback invoked once an interest has been received and its payload
* decrypted */
ProducerInterestCallback on_interest_input_decrypted_;
ProducerInterestCallback on_interest_process_decrypted_;
ProducerContentCallback on_content_produced_application_;
-
std::mutex mtx_;
-
/* Condition variable for the wait */
std::condition_variable cv_;
-
/* Bool variable, true if there is something to read (an interest arrived) */
bool something_to_read_;
-
+ /* Bool variable, true if CLIENT_FINISHED interest has been received */
+ HandshakeState handshake_state_;
/* First interest that open a secure connection */
transport::core::Name name_;
-
/* SSL handle */
SSL *ssl_;
SSL_CTX *ctx_;
-
- Packet::MemBufPtr packet_;
-
+ Packet::MemBufPtr handshake_packet_;
std::unique_ptr<utils::MemBuf> head_;
std::uint32_t last_segment_;
- std::shared_ptr<utils::MemBuf> payload_;
std::uint32_t key_id_;
-
std::thread *handshake;
P2PSecureProducerSocket *parent_;
-
bool first_;
Name handshake_name_;
int tls_chunks_;
int to_call_oncontentproduced_;
-
bool still_writing_;
-
utils::EventThread encryption_thread_;
void onInterest(ProducerSocket &p, Interest &interest);
+
void cacheMiss(interface::ProducerSocket &p, Interest &interest);
/* Return the number of read bytes in readbytes */
@@ -156,8 +153,9 @@ class TLSProducerSocket : virtual public ProducerSocket {
void onContentProduced(interface::ProducerSocket &p,
const std::error_code &err, uint64_t bytes_written);
+
+ HandshakeState getHandshakeState();
};
} // namespace implementation
-
} // end namespace transport
diff --git a/libtransport/src/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/interfaces/p2psecure_socket_consumer.cc
index 2fa8bb6e3..038441dfc 100644
--- a/libtransport/src/interfaces/p2psecure_socket_consumer.cc
+++ b/libtransport/src/interfaces/p2psecure_socket_consumer.cc
@@ -21,11 +21,17 @@ namespace transport {
namespace interface {
P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol,
- int protocol)
+ int transport_protocol)
: ConsumerSocket() {
socket_ = std::unique_ptr<implementation::ConsumerSocket>(
new implementation::P2PSecureConsumerSocket(this, handshake_protocol,
- protocol));
+ transport_protocol));
+}
+
+void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
+ implementation::P2PSecureConsumerSocket &secure_consumer_socket =
+ *(static_cast<implementation::P2PSecureConsumerSocket *>(socket_.get()));
+ secure_consumer_socket.registerPrefix(producer_namespace);
}
} // namespace interface
diff --git a/libtransport/src/interfaces/tls_rtc_socket_producer.h b/libtransport/src/interfaces/tls_rtc_socket_producer.h
index 434edb522..3ea84095b 100644
--- a/libtransport/src/interfaces/tls_rtc_socket_producer.h
+++ b/libtransport/src/interfaces/tls_rtc_socket_producer.h
@@ -28,9 +28,9 @@ namespace interface {
class TLSRTCProducerSocket : public ProducerSocket {
public:
TLSRTCProducerSocket(implementation::TLSRTCProducerSocket *implementation);
+
~TLSRTCProducerSocket();
};
} // namespace interface
-
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf
index 1a366f32f..d0eab75ac 100644
--- a/libtransport/src/protocols/consumer.conf
+++ b/libtransport/src/protocols/consumer.conf
@@ -1,4 +1,4 @@
-; this file contais the parameters for RAAQM
+; This file contains the parameters for RAAQM
autotune = no
lifetime = 500
retransmissions = 128
diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc
index e590b4fee..0872c4554 100644
--- a/libtransport/src/protocols/incremental_indexer.cc
+++ b/libtransport/src/protocols/incremental_indexer.cc
@@ -25,6 +25,8 @@ void IncrementalIndexer::onContentObject(
core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
using namespace interface;
+ TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str());
+
if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
final_suffix_ = content_object->getName().getSuffix();
}
@@ -50,4 +52,4 @@ void IncrementalIndexer::onContentObject(
}
} // namespace protocol
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc
index 1a2f9dec3..da835b577 100644
--- a/libtransport/src/protocols/manifest_incremental_indexer.cc
+++ b/libtransport/src/protocols/manifest_incremental_indexer.cc
@@ -37,10 +37,12 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer(
void ManifestIncrementalIndexer::onContentObject(
core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- // Check if mainfiest or not
+ // Check if manifest or not
if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str());
onUntrustedManifest(std::move(interest), std::move(content_object));
} else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str());
onUntrustedContentObject(std::move(interest), std::move(content_object));
}
}
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 0a93dec44..f8da69ceb 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -460,7 +460,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
// send at least one interest if there are retransmissions to perform and
// there is no space left in the window
sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Window full, retransmit one content interest");
interest_to_retransmit_.pop();
}
@@ -470,7 +469,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
while (interests_in_flight_ < current_window_size_) {
if (interest_to_retransmit_.size() > 0) {
sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Retransmit content interest");
interest_to_retransmit_.pop();
} else {
index = index_manager_->getNextSuffix();
@@ -479,7 +477,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
sendInterest(index);
- TRANSPORT_LOGD("Send content interest %u", index);
}
}
}
@@ -508,6 +505,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
// performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
+
sendInterest(std::move(interest));
return true;
@@ -517,6 +515,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
interests_in_flight_++;
interest_retransmissions_[interest->getName().getSuffix() & mask]++;
+ TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str());
portal_->sendInterest(std::move(interest));
}