summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libtransport/includes/hicn/transport/core/packet.h8
-rw-r--r--libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h4
-rw-r--r--libtransport/includes/hicn/transport/interfaces/socket_consumer.h2
-rw-r--r--libtransport/src/core/prefix.cc4
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.cc149
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.h18
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.cc198
-rw-r--r--libtransport/src/implementation/p2psecure_socket_producer.h17
-rw-r--r--libtransport/src/implementation/socket_producer.h28
-rw-r--r--libtransport/src/implementation/tls_rtc_socket_producer.cc31
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.cc52
-rw-r--r--libtransport/src/implementation/tls_socket_consumer.h21
-rw-r--r--libtransport/src/implementation/tls_socket_producer.cc159
-rw-r--r--libtransport/src/implementation/tls_socket_producer.h26
-rw-r--r--libtransport/src/interfaces/p2psecure_socket_consumer.cc10
-rw-r--r--libtransport/src/interfaces/tls_rtc_socket_producer.h4
-rw-r--r--libtransport/src/protocols/consumer.conf2
-rw-r--r--libtransport/src/protocols/incremental_indexer.cc4
-rw-r--r--libtransport/src/protocols/manifest_incremental_indexer.cc4
-rw-r--r--libtransport/src/protocols/raaqm.cc5
-rw-r--r--utils/src/hiperf.cc479
21 files changed, 600 insertions, 625 deletions
diff --git a/libtransport/includes/hicn/transport/core/packet.h b/libtransport/includes/hicn/transport/core/packet.h
index e58e7962d..2efd7439d 100644
--- a/libtransport/includes/hicn/transport/core/packet.h
+++ b/libtransport/includes/hicn/transport/core/packet.h
@@ -148,15 +148,15 @@ class Packet : public std::enable_shared_from_this<Packet> {
std::pair<const uint8_t *, std::size_t> getPayloadReference() const {
int signature_size = 0;
+
if (_is_ah(format_)) {
signature_size = (uint32_t)getSignatureSize();
}
auto header_size = getHeaderSizeFromFormat(format_, signature_size);
- auto payload_length = packet_->length() - header_size;
+ auto payload_length = payloadSize();
- return std::make_pair(packet_->data() + header_size,
- payload_length);
+ return std::make_pair(packet_->data() + header_size, payload_length);
}
Packet &updateLength(std::size_t length = 0);
@@ -229,6 +229,7 @@ class Packet : public std::enable_shared_from_this<Packet> {
Packet &setTTL(uint8_t hops);
uint8_t getTTL() const;
+ void separateHeaderPayload();
void resetPayload();
private:
@@ -248,7 +249,6 @@ class Packet : public std::enable_shared_from_this<Packet> {
}
uint8_t *getSignature() const;
- void separateHeaderPayload();
protected:
Name name_;
diff --git a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h
index 097b0a8c0..224493f00 100644
--- a/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h
+++ b/libtransport/includes/hicn/transport/interfaces/p2psecure_socket_consumer.h
@@ -23,10 +23,10 @@ namespace interface {
class P2PSecureConsumerSocket : public ConsumerSocket {
public:
- P2PSecureConsumerSocket(int handshake_protocol, int protocol);
+ P2PSecureConsumerSocket(int handshake_protocol, int transport_protocol);
~P2PSecureConsumerSocket() = default;
+ void registerPrefix(const Prefix &producer_namespace);
};
} // namespace interface
-
} // end namespace transport
diff --git a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h
index 0a6e9a43a..73cbb78b0 100644
--- a/libtransport/includes/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/includes/hicn/transport/interfaces/socket_consumer.h
@@ -106,7 +106,7 @@ class ConsumerSocket {
/**
* This method will be called by the transport for understanding how many
- * bytes it should read (at most) before notifying the application.
+ * bytes it should read before notifying the application.
*
* By default it reads 64 KB.
*/
diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc
index 30f780461..eea4aeb8b 100644
--- a/libtransport/src/core/prefix.cc
+++ b/libtransport/src/core/prefix.cc
@@ -211,8 +211,8 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components,
}
}
- if (this->contains(name_ip))
- throw errors::RuntimeException("Mask overrides the prefix");
+ // if (this->contains(name_ip))
+ // throw errors::RuntimeException("Mask overrides the prefix");
return Name(ip_prefix_.family, (uint8_t *)&name_ip);
}
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc
index 40ab58161..9b79850d6 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc
@@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload(
if (payload_ != NULL) int2.appendPayload(std::move(payload_));
}
-// implement void readBufferAvailable(), size_t maxBufferSize() const override,
-// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable()
-// must be implemented even if empty.
-
/* Return the number of read bytes in the return param */
int readOld(BIO *b, char *buf, int size) {
if (size < 0) return size;
@@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) {
socket->network_name_.setSuffix(socket->random_suffix_);
socket->ConsumerSocket::asyncConsume(socket->network_name_);
}
+
if (!socket->something_to_read_) socket->cv_.wait(lck);
}
size_t size_to_read, read;
size_t chain_size = socket->head_->length();
+
if (socket->head_->isChained())
chain_size = socket->head_->computeChainDataLength();
@@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) {
socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+
socket->ConsumerSocket::setSocketOption(
ConsumerCallbacksOptions::INTEREST_OUTPUT,
(ConsumerInterestCallback)std::bind(
@@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
P2PSecureConsumerSocket::P2PSecureConsumerSocket(
interface::ConsumerSocket *consumer, int handshake_protocol,
int transport_protocol)
- : ConsumerSocket(consumer, transport_protocol),
+ : ConsumerSocket(consumer, handshake_protocol),
name_(),
- tls_consumer_(),
+ tls_consumer_(nullptr),
buf_pool_(),
decrypted_content_(),
payload_(),
@@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket(
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(
1, std::numeric_limits<uint32_t>::max());
@@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() {
SSL_shutdown(ssl_);
}
-int P2PSecureConsumerSocket::consume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
+int P2PSecureConsumerSocket::handshake() {
+ int result = 1;
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
+ if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ return 1;
}
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
- TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_);
- tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer));
- ConsumerTimerCallback *stats_summary_callback = nullptr;
- this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_summary_callback);
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- uint32_t lifetime;
- this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
- tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- read_callback_decrypted_);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- *stats_summary_callback);
- tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- this->timer_interval_milliseconds_);
- tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- tls_consumer.connect();
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- if (payload_ != NULL)
- return tls_consumer.consume((prefix->mapName(name)), std::move(payload_));
- else
- return tls_consumer.consume((prefix->mapName(name)));
-}
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
-int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- TRANSPORT_LOGD("Handshake performed!");
- }
+ TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
+ result = SSL_connect(this->ssl_);
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+ return result;
+}
+void P2PSecureConsumerSocket::initSessionSocket() {
tls_consumer_ =
std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_);
tls_consumer_->setInterface(
@@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
uint32_t lifetime;
this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+
tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
lifetime);
tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
@@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
tls_consumer_->connect();
+}
+
+int P2PSecureConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if (handshake() != 1) {
+ throw errors::RuntimeException("Unable to perform client handshake");
+ } else {
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ initSessionSocket();
+
+ if (tls_consumer_ == nullptr) {
+ throw errors::RuntimeException("TLS socket does not exist");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+
+ if (payload_ != nullptr)
+ return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_));
+ else
+ return tls_consumer_->consume((prefix->mapName(name)));
+}
+
+int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if (handshake() != 1) {
+ throw errors::RuntimeException("Unable to perform client handshake");
+ } else {
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ initSessionSocket();
+
+ if (tls_consumer_ == nullptr) {
+ throw errors::RuntimeException("TLS socket does not exist");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
if (payload_ != NULL)
return tls_consumer_->asyncConsume((prefix->mapName(name)),
@@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; }
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.h b/libtransport/src/implementation/p2psecure_socket_consumer.h
index e2ebaf94e..d4c3b26c2 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.h
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.h
@@ -69,39 +69,26 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
private:
Name name_;
std::shared_ptr<TLSConsumerSocket> tls_consumer_;
-
/* SSL handle */
SSL *ssl_;
SSL_CTX *ctx_;
BIO_METHOD *bio_meth_;
-
/* Chain of MemBuf to be used as a temporary buffer to pass descypted data
* from the underlying layer to the application */
utils::ObjectPool<utils::MemBuf> buf_pool_;
std::unique_ptr<utils::MemBuf> decrypted_content_;
-
/* Chain of MemBuf holding the payload to be written into interest or data */
std::unique_ptr<utils::MemBuf> payload_;
-
/* Chain of MemBuf holding the data retrieved from the underlying layer */
std::unique_ptr<utils::MemBuf> head_;
-
bool something_to_read_;
-
bool content_downloaded_;
-
double old_max_win_;
-
double old_current_win_;
-
uint32_t random_suffix_;
-
ip_prefix_t secure_prefix_;
-
Prefix producer_namespace_;
-
interface::ConsumerSocket::ReadCallback *read_callback_decrypted_;
-
std::mutex mtx_;
/* Condition variable for the wait */
@@ -138,9 +125,12 @@ class P2PSecureConsumerSocket : public ConsumerSocket,
virtual void readError(const std::error_code ec) noexcept override;
virtual void readSuccess(std::size_t total_size) noexcept override;
+
virtual bool isBufferMovable() noexcept override;
- int download_content(const Name &name);
+ int handshake();
+
+ void initSessionSocket();
};
} // namespace implementation
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.cc b/libtransport/src/implementation/p2psecure_socket_producer.cc
index d0852539a..15c7d25cd 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_producer.cc
@@ -37,9 +37,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
: ProducerSocket(producer_socket),
mtx_(),
cv_(),
- map_secure_producers(),
- map_secure_rtc_producers(),
- list_secure_producers() {}
+ map_producers(),
+ list_producers() {}
P2PSecureProducerSocket::P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
@@ -48,12 +47,9 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
rtc_(rtc),
mtx_(),
cv_(),
- map_secure_producers(),
- map_secure_rtc_producers(),
- list_secure_producers() {
- /*
- * Setup SSL context (identity and parameter to use TLS 1.3)
- */
+ map_producers(),
+ list_producers() {
+ /* Setup SSL context (identity and parameter to use TLS 1.3) */
der_cert_ = parcKeyStore_GetDEREncodedCertificate(
(identity->getSigner()->getKeyStore()));
der_prk_ = parcKeyStore_GetDEREncodedPrivateKey(
@@ -68,10 +64,8 @@ P2PSecureProducerSocket::P2PSecureProducerSocket(
cert_509_ = d2i_X509(NULL, &cert, cert_size);
pkey_rsa_ = d2i_AutoPrivateKey(NULL, &prk, prk_size);
- /*
- * Set the callback so that when an interest is received we catch it and we
- * decrypt the payload before passing it to the application.
- */
+ /* Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application. */
ProducerSocket::setSocketOption(
ProducerCallbacksOptions::INTEREST_INPUT,
(ProducerInterestCallback)std::bind(
@@ -84,53 +78,61 @@ P2PSecureProducerSocket::~P2PSecureProducerSocket() {
if (der_prk_) parcBuffer_Release(&der_prk_);
}
+void P2PSecureProducerSocket::initSessionSocket(
+ std::unique_ptr<TLSProducerSocket> &producer) {
+ producer->on_content_produced_application_ =
+ this->on_content_produced_application_;
+ producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
+ this->content_object_expiry_time_);
+ producer->setSocketOption(SIGNER, this->signer_);
+ producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
+ producer->setSocketOption(DATA_PACKET_SIZE,
+ (uint32_t)(this->data_packet_size_));
+ producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
+
+ if (!rtc_) {
+ producer->setInterface(new interface::TLSProducerSocket(producer.get()));
+ } else {
+ TLSRTCProducerSocket *rtc_producer =
+ dynamic_cast<TLSRTCProducerSocket *>(producer.get());
+ rtc_producer->setInterface(
+ new interface::TLSRTCProducerSocket(rtc_producer));
+ }
+}
+
void P2PSecureProducerSocket::onInterestCallback(interface::ProducerSocket &p,
Interest &interest) {
std::unique_lock<std::mutex> lck(mtx_);
+ std::unique_ptr<TLSProducerSocket> tls_producer;
+ auto it = map_producers.find(interest.getName());
+
+ if (it != map_producers.end()) {
+ return;
+ }
+
+ if (!rtc_) {
+ tls_producer =
+ std::make_unique<TLSProducerSocket>(nullptr, this, interest.getName());
+ } else {
+ tls_producer = std::make_unique<TLSRTCProducerSocket>(nullptr, this,
+ interest.getName());
+ }
+
+ initSessionSocket(tls_producer);
+ TLSProducerSocket *tls_producer_ptr = tls_producer.get();
+ map_producers.insert({interest.getName(), move(tls_producer)});
TRANSPORT_LOGD("Start handshake at %s",
interest.getName().toString().c_str());
+
if (!rtc_) {
- auto it = map_secure_producers.find(interest.getName());
- if (it != map_secure_producers.end()) return;
- TLSProducerSocket *tls_producer =
- new TLSProducerSocket(nullptr, this, interest.getName());
- tls_producer->setInterface(new interface::TLSProducerSocket(tls_producer));
-
- tls_producer->on_content_produced_application_ =
- this->on_content_produced_application_;
- tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
- this->content_object_expiry_time_);
- tls_producer->setSocketOption(SIGNER, this->signer_);
- tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
- tls_producer->setSocketOption(DATA_PACKET_SIZE,
- (uint32_t)(this->data_packet_size_));
- tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
- map_secure_producers.insert(
- {interest.getName(), std::unique_ptr<TLSProducerSocket>(tls_producer)});
- tls_producer->onInterest(*tls_producer, interest);
- tls_producer->async_accept();
+ tls_producer_ptr->onInterest(*tls_producer_ptr, interest);
+ tls_producer_ptr->async_accept();
} else {
- auto it = map_secure_rtc_producers.find(interest.getName());
- if (it != map_secure_rtc_producers.end()) return;
- TLSRTCProducerSocket *tls_producer =
- new TLSRTCProducerSocket(nullptr, this, interest.getName());
- tls_producer->setInterface(
- new interface::TLSRTCProducerSocket(tls_producer));
- tls_producer->on_content_produced_application_ =
- this->on_content_produced_application_;
- tls_producer->setSocketOption(CONTENT_OBJECT_EXPIRY_TIME,
- this->content_object_expiry_time_);
- tls_producer->setSocketOption(SIGNER, this->signer_);
- tls_producer->setSocketOption(MAKE_MANIFEST, this->making_manifest_);
- tls_producer->setSocketOption(DATA_PACKET_SIZE,
- (uint32_t)(this->data_packet_size_));
- tls_producer->output_buffer_.setLimit(this->output_buffer_.getLimit());
- map_secure_rtc_producers.insert(
- {interest.getName(),
- std::unique_ptr<TLSRTCProducerSocket>(tls_producer)});
- tls_producer->onInterest(*tls_producer, interest);
- tls_producer->async_accept();
+ TLSRTCProducerSocket *rtc_producer_ptr =
+ dynamic_cast<TLSRTCProducerSocket *>(tls_producer_ptr);
+ rtc_producer_ptr->onInterest(*rtc_producer_ptr, interest);
+ rtc_producer_ptr->async_accept();
}
}
@@ -143,11 +145,13 @@ void P2PSecureProducerSocket::produce(const uint8_t *buffer,
}
std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_rtc_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_rtc_producers.cbegin();
- it != list_secure_rtc_producers.cend(); it++) {
- (*it)->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
+ if (list_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
+ TLSRTCProducerSocket *rtc_producer =
+ dynamic_cast<TLSRTCProducerSocket *>(it->get());
+ rtc_producer->produce(utils::MemBuf::copyBuffer(buffer, buffer_size));
}
}
@@ -162,12 +166,13 @@ uint32_t P2PSecureProducerSocket::produce(
std::unique_lock<std::mutex> lck(mtx_);
uint32_t segments = 0;
- if (list_secure_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (list_producers.empty()) cv_.wait(lck);
+
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
segments +=
(*it)->produce(content_name, buffer->clone(), is_last, start_offset);
+
return segments;
}
@@ -183,12 +188,12 @@ uint32_t P2PSecureProducerSocket::produce(Name content_name,
std::unique_lock<std::mutex> lck(mtx_);
uint32_t segments = 0;
- if (list_secure_producers.empty()) cv_.wait(lck);
+ if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
segments += (*it)->produce(content_name, buffer, buffer_size, is_last,
start_offset);
+
return segments;
}
@@ -203,10 +208,9 @@ void P2PSecureProducerSocket::asyncProduce(const Name &content_name,
}
std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_producers.empty()) cv_.wait(lck);
+ if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
(*it)->asyncProduce(content_name, buf, buffer_size, is_last, start_offset);
}
}
@@ -221,22 +225,19 @@ void P2PSecureProducerSocket::asyncProduce(
}
std::unique_lock<std::mutex> lck(mtx_);
- if (list_secure_producers.empty()) cv_.wait(lck);
+ if (list_producers.empty()) cv_.wait(lck);
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++) {
(*it)->asyncProduce(content_name, buffer->clone(), is_last, offset,
last_segment);
}
}
-// Socket Option Redefinition to avoid name hiding
-
+/* Redefinition of socket options to avoid name hiding */
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerInterestCallback socket_option_value) {
- if (!list_secure_producers.empty()) {
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty()) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
}
@@ -269,9 +270,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key,
const std::shared_ptr<utils::Signer> &socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
switch (socket_option_key) {
@@ -288,9 +288,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
- if (!list_secure_producers.empty()) {
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty()) {
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
}
switch (socket_option_key) {
@@ -305,9 +304,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
bool socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -316,9 +314,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
Name *socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -327,9 +324,8 @@ int P2PSecureProducerSocket::setSocketOption(int socket_option_key,
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, std::list<Prefix> socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -338,9 +334,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerContentObjectCallback socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -349,9 +344,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, ProducerContentCallback socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
switch (socket_option_key) {
@@ -368,9 +362,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, utils::CryptoHashType socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -379,9 +372,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, utils::CryptoSuite socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -390,9 +382,8 @@ int P2PSecureProducerSocket::setSocketOption(
int P2PSecureProducerSocket::setSocketOption(
int socket_option_key, const std::string &socket_option_value) {
- if (!list_secure_producers.empty())
- for (auto it = list_secure_producers.cbegin();
- it != list_secure_producers.cend(); it++)
+ if (!list_producers.empty())
+ for (auto it = list_producers.cbegin(); it != list_producers.cend(); it++)
(*it)->setSocketOption(socket_option_key, socket_option_value);
return ProducerSocket::setSocketOption(socket_option_key,
@@ -400,5 +391,4 @@ int P2PSecureProducerSocket::setSocketOption(
}
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/p2psecure_socket_producer.h b/libtransport/src/implementation/p2psecure_socket_producer.h
index 33339deba..bfc9fc2c1 100644
--- a/libtransport/src/implementation/p2psecure_socket_producer.h
+++ b/libtransport/src/implementation/p2psecure_socket_producer.h
@@ -37,9 +37,11 @@ class P2PSecureProducerSocket : public ProducerSocket {
public:
explicit P2PSecureProducerSocket(interface::ProducerSocket *producer_socket);
+
explicit P2PSecureProducerSocket(
interface::ProducerSocket *producer_socket, bool rtc,
const std::shared_ptr<utils::Identity> &identity);
+
~P2PSecureProducerSocket();
void produce(const uint8_t *buffer, size_t buffer_size) override;
@@ -96,7 +98,6 @@ class P2PSecureProducerSocket : public ProducerSocket {
using ProducerSocket::onInterest;
protected:
- bool rtc_;
/* Callback invoked once an interest has been received and its payload
* decrypted */
ProducerInterestCallback on_interest_input_decrypted_;
@@ -104,27 +105,23 @@ class P2PSecureProducerSocket : public ProducerSocket {
ProducerContentCallback on_content_produced_application_;
private:
+ bool rtc_;
std::mutex mtx_;
-
/* Condition variable for the wait */
std::condition_variable cv_;
-
PARCBuffer *der_cert_;
PARCBuffer *der_prk_;
X509 *cert_509_;
EVP_PKEY *pkey_rsa_;
std::unordered_map<core::Name, std::unique_ptr<TLSProducerSocket>,
core::hash<core::Name>, core::compare2<core::Name>>
- map_secure_producers;
- std::unordered_map<core::Name, std::unique_ptr<TLSRTCProducerSocket>,
- core::hash<core::Name>, core::compare2<core::Name>>
- map_secure_rtc_producers;
- std::list<std::unique_ptr<TLSProducerSocket>> list_secure_producers;
- std::list<std::unique_ptr<TLSRTCProducerSocket>> list_secure_rtc_producers;
+ map_producers;
+ std::list<std::unique_ptr<TLSProducerSocket>> list_producers;
void onInterestCallback(interface::ProducerSocket &p, Interest &interest);
+
+ void initSessionSocket(std::unique_ptr<TLSProducerSocket> &producer);
};
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/socket_producer.h b/libtransport/src/implementation/socket_producer.h
index 9ccc59d9e..8c5c453fc 100644
--- a/libtransport/src/implementation/socket_producer.h
+++ b/libtransport/src/implementation/socket_producer.h
@@ -131,7 +131,7 @@ class ProducerSocket : public Socket<BasePortal>,
// TODO Manifest may still be used for indexing
if (making_manifest && !signer) {
- TRANSPORT_LOGD("Making manifests without setting producer identity.");
+ TRANSPORT_LOGE("Making manifests without setting producer identity.");
}
core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
@@ -192,7 +192,6 @@ class ProducerSocket : public Socket<BasePortal>,
}
}
- TRANSPORT_LOGD("--------- START PRODUCE ----------");
for (unsigned int packaged_segments = 0;
packaged_segments < number_of_segments; packaged_segments++) {
if (making_manifest) {
@@ -207,13 +206,12 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
+ TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
// Send content objects stored in the queue
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %u",
- content_queue_.front()->getName().getSuffix());
+ TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
@@ -270,8 +268,7 @@ class ProducerSocket : public Socket<BasePortal>,
signer->sign(*content_object);
}
passContentObjectToCallbacks(content_object);
- TRANSPORT_LOGD("Send content %u",
- content_object->getName().getSuffix());
+ TRANSPORT_LOGD("Send content %s", content_object->getName().toString().c_str());
}
}
@@ -286,11 +283,11 @@ class ProducerSocket : public Socket<BasePortal>,
}
passContentObjectToCallbacks(manifest);
- TRANSPORT_LOGD("Send manifest %u", manifest->getName().getSuffix());
+ TRANSPORT_LOGD("Send manifest %s", manifest->getName().toString().c_str());
+
while (!content_queue_.empty()) {
passContentObjectToCallbacks(content_queue_.front());
- TRANSPORT_LOGD("Send content %u",
- content_queue_.front()->getName().getSuffix());
+ TRANSPORT_LOGD("Send content %s", content_queue_.front()->getName().toString().c_str());
content_queue_.pop();
}
}
@@ -949,18 +946,19 @@ class ProducerSocket : public Socket<BasePortal>,
std::mutex mtx;
/* Condition variable for the wait */
std::condition_variable cv;
- std::unique_lock<std::mutex> lck(mtx);
+
bool done = false;
io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
&result, &done, &func]() {
std::unique_lock<std::mutex> lck(mtx);
done = true;
result = func(socket_option_key, socket_option_value);
-
- if (!done) {
- cv.wait(lck);
- }
+ cv.notify_all();
});
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
} else {
result = func(socket_option_key, socket_option_value);
}
diff --git a/libtransport/src/implementation/tls_rtc_socket_producer.cc b/libtransport/src/implementation/tls_rtc_socket_producer.cc
index 3b3152993..9ef79ca23 100644
--- a/libtransport/src/implementation/tls_rtc_socket_producer.cc
+++ b/libtransport/src/implementation/tls_rtc_socket_producer.cc
@@ -53,7 +53,7 @@ int TLSRTCProducerSocket::readOld(BIO *b, char *buf, int size) {
(socket->cv_).wait(lck);
}
- utils::MemBuf *membuf = socket->packet_->next();
+ utils::MemBuf *membuf = socket->handshake_packet_->next();
int size_to_read;
if ((int)membuf->length() > size) {
@@ -91,10 +91,10 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
TLSRTCProducerSocket *socket;
socket = (TLSRTCProducerSocket *)BIO_get_data(b);
- if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
- socket->first_) {
- socket->tls_chunks_--;
+ if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) {
bool making_manifest = socket->parent_->making_manifest_;
+
+ socket->tls_chunks_--;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
false);
socket->parent_->ProducerSocket::produce(
@@ -107,13 +107,17 @@ int TLSRTCProducerSocket::writeOld(BIO *b, const char *buf, int num) {
std::unique_ptr<utils::MemBuf> mbuf =
utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
auto a = mbuf.release();
+
socket->async_thread_.add([socket = socket, a]() {
socket->to_call_oncontentproduced_--;
auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+
socket->RTCProducerSocket::produce(std::move(mbuf));
+
ProducerContentCallback on_content_produced_application;
socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
on_content_produced_application);
+
if (socket->to_call_oncontentproduced_ == 0 &&
on_content_produced_application) {
on_content_produced_application(
@@ -144,24 +148,28 @@ TLSRTCProducerSocket::TLSRTCProducerSocket(
}
void TLSRTCProducerSocket::accept() {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) {
tls_chunks_ = 1;
int result = SSL_accept(ssl_);
+
if (result != 1)
throw errors::RuntimeException("Unable to perform client handshake");
}
TRANSPORT_LOGD("Handshake performed!");
- parent_->list_secure_rtc_producers.push_front(
- std::move(parent_->map_secure_rtc_producers[handshake_name_]));
- parent_->map_secure_rtc_producers.erase(handshake_name_);
+
+ parent_->list_producers.push_front(
+ std::move(parent_->map_producers[handshake_name_]));
+ parent_->map_producers.erase(handshake_name_);
ProducerInterestCallback on_interest_process_decrypted;
getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
on_interest_process_decrypted);
if (on_interest_process_decrypted) {
- Interest inter(std::move(packet_));
+ Interest inter(std::move(handshake_packet_));
on_interest_process_decrypted(
(transport::interface::ProducerSocket &)(*getInterface()), inter);
}
@@ -181,7 +189,9 @@ int TLSRTCProducerSocket::async_accept() {
}
void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state != SERVER_FINISHED) {
throw errors::RuntimeException(
"New handshake on the same P2P secure producer socket not supported");
}
@@ -197,5 +207,4 @@ void TLSRTCProducerSocket::produce(std::unique_ptr<utils::MemBuf> &&buffer) {
}
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_consumer.cc b/libtransport/src/implementation/tls_socket_consumer.cc
index 95b287aa6..7cf653848 100644
--- a/libtransport/src/implementation/tls_socket_consumer.cc
+++ b/libtransport/src/implementation/tls_socket_consumer.cc
@@ -46,11 +46,13 @@ int readOldTLS(BIO *b, char *buf, int size) {
socket->network_name_.setSuffix(socket->random_suffix_);
socket->ConsumerSocket::asyncConsume(socket->network_name_);
}
+
if (!socket->something_to_read_) socket->cv_.wait(lck);
}
size_t size_to_read, read;
size_t chain_size = socket->head_->length();
+
if (socket->head_->isChained())
chain_size = socket->head_->computeChainDataLength();
@@ -101,6 +103,7 @@ int writeOldTLS(BIO *b, const char *buf, int num) {
socket = (TLSConsumerSocket *)BIO_get_data(b);
socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+
socket->ConsumerSocket::setSocketOption(
ConsumerCallbacksOptions::INTEREST_OUTPUT,
(ConsumerInterestCallback)std::bind(
@@ -176,12 +179,6 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(
1, std::numeric_limits<uint32_t>::max());
@@ -191,10 +188,8 @@ TLSConsumerSocket::TLSConsumerSocket(interface::ConsumerSocket *consumer_socket,
this);
};
-/*
- * The producer interface is not owned by the application, so is TLSSocket task
- * to deallocate the memory
- */
+/* The producer interface is not owned by the application, so is TLSSocket task
+ * to deallocate the memory */
TLSConsumerSocket::~TLSConsumerSocket() { delete consumer_interface_; }
int TLSConsumerSocket::consume(const Name &name,
@@ -228,22 +223,15 @@ int TLSConsumerSocket::download_content(const Name &name) {
something_to_read_ = false;
content_downloaded_ = false;
- decrypted_content_ = utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH);
- uint8_t *buf = decrypted_content_->writableData();
- size_t size = 0;
+ std::size_t max_buffer_size = read_callback_decrypted_->maxBufferSize();
+ std::size_t buffer_size = read_callback_decrypted_->maxBufferSize() + SSL3_RT_MAX_PLAIN_LENGTH;
+ decrypted_content_ = utils::MemBuf::createCombined(buffer_size);
int result = -1;
+ std::size_t size = 0;
while (!content_downloaded_ || something_to_read_) {
- if (decrypted_content_->tailroom() < SSL3_RT_MAX_PLAIN_LENGTH) {
- decrypted_content_->appendChain(
- utils::MemBuf::createCombined(SSL3_RT_MAX_PLAIN_LENGTH));
- // decrypted_content_->computeChainDataLength();
- buf = decrypted_content_->prev()->writableData();
- } else {
- buf = decrypted_content_->writableTail();
- }
-
- result = SSL_read(this->ssl_, buf, SSL3_RT_MAX_PLAIN_LENGTH);
+ result = SSL_read(
+ this->ssl_, decrypted_content_->writableTail(), SSL3_RT_MAX_PLAIN_LENGTH);
/* SSL_read returns the data only if there were SSL3_RT_MAX_PLAIN_LENGTH of
* the data has been fully downloaded */
@@ -253,20 +241,20 @@ int TLSConsumerSocket::download_content(const Name &name) {
if (result >= 0) {
size += result;
- decrypted_content_->prepend(result);
- } else
+ decrypted_content_->append(result);
+ } else {
throw errors::RuntimeException("Unable to download content");
+ }
- if (size >= read_callback_decrypted_->maxBufferSize()) {
+ if (decrypted_content_->length() >= max_buffer_size) {
if (read_callback_decrypted_->isBufferMovable()) {
- // No need to perform an additional copy. The whole buffer will be
- // tranferred to the application.
-
+ /* No need to perform an additional copy. The whole buffer will be
+ * tranferred to the application. */
read_callback_decrypted_->readBufferAvailable(
std::move(decrypted_content_));
- decrypted_content_ = utils::MemBuf::create(SSL3_RT_MAX_PLAIN_LENGTH);
+ decrypted_content_ = utils::MemBuf::create(buffer_size);
} else {
- // The buffer will be copied into the application-provided buffer
+ /* The buffer will be copied into the application-provided buffer */
uint8_t *buffer;
std::size_t length;
std::size_t total_length = decrypted_content_->length();
@@ -358,6 +346,7 @@ size_t TLSConsumerSocket::maxBufferSize() const {
void TLSConsumerSocket::readBufferAvailable(
std::unique_ptr<utils::MemBuf> &&buffer) noexcept {
std::unique_lock<std::mutex> lck(this->mtx_);
+
if (head_) {
head_->prependChain(std::move(buffer));
} else {
@@ -380,5 +369,4 @@ void TLSConsumerSocket::readSuccess(std::size_t total_size) noexcept {
bool TLSConsumerSocket::isBufferMovable() noexcept { return true; }
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_consumer.h b/libtransport/src/implementation/tls_socket_consumer.h
index 2e88dc47e..6931a7a8a 100644
--- a/libtransport/src/implementation/tls_socket_consumer.h
+++ b/libtransport/src/implementation/tls_socket_consumer.h
@@ -69,42 +69,27 @@ class TLSConsumerSocket : public ConsumerSocket,
private:
Name name_;
-
/* SSL handle */
SSL *ssl_;
SSL_CTX *ctx_;
-
/* Chain of MemBuf to be used as a temporary buffer to pass descypted data
* from the underlying layer to the application */
utils::ObjectPool<utils::MemBuf> buf_pool_;
std::unique_ptr<utils::MemBuf> decrypted_content_;
-
- /* Chain of MemBuf holding the payload to be written into interest or data
- */
+ /* Chain of MemBuf holding the payload to be written into interest or data */
std::unique_ptr<utils::MemBuf> payload_;
-
/* Chain of MemBuf holding the data retrieved from the underlying layer */
std::unique_ptr<utils::MemBuf> head_;
-
bool something_to_read_;
-
bool content_downloaded_;
-
double old_max_win_;
-
double old_current_win_;
-
uint32_t random_suffix_;
-
Prefix producer_namespace_;
-
interface::ConsumerSocket::ReadCallback *read_callback_decrypted_;
-
std::mutex mtx_;
-
/* Condition variable for the wait */
std::condition_variable cv_;
-
utils::EventThread async_downloader_tls_;
void setInterestPayload(interface::ConsumerSocket &c,
@@ -123,11 +108,11 @@ class TLSConsumerSocket : public ConsumerSocket,
virtual void readError(const std::error_code ec) noexcept override;
virtual void readSuccess(std::size_t total_size) noexcept override;
+
virtual bool isBufferMovable() noexcept override;
int download_content(const Name &name);
};
} // namespace implementation
-
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/implementation/tls_socket_producer.cc b/libtransport/src/implementation/tls_socket_producer.cc
index 9a5b94a1c..339a1ad58 100644
--- a/libtransport/src/implementation/tls_socket_producer.cc
+++ b/libtransport/src/implementation/tls_socket_producer.cc
@@ -48,18 +48,17 @@ int TLSProducerSocket::readOld(BIO *b, char *buf, int size) {
TLSProducerSocket *socket;
socket = (TLSProducerSocket *)BIO_get_data(b);
- /* take a lock on the mutex. It will be unlocked by */
std::unique_lock<std::mutex> lck(socket->mtx_);
+
if (!socket->something_to_read_) {
(socket->cv_).wait(lck);
}
- /* Either there already is something to read, or the thread has been waken up
- */
- /* must return the payload in the interest */
-
- utils::MemBuf *membuf = socket->packet_->next();
+ /* Either there already is something to read, or the thread has been waken up.
+ * We must return the payload in the interest anyway */
+ utils::MemBuf *membuf = socket->handshake_packet_->next();
int size_to_read;
+
if ((int)membuf->length() > size) {
size_to_read = size;
} else {
@@ -97,11 +96,11 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
TLSProducerSocket *socket;
socket = (TLSProducerSocket *)BIO_get_data(b);
- if ((SSL_in_before(socket->ssl_) || SSL_in_init(socket->ssl_)) &&
- socket->first_) {
+ if (socket->getHandshakeState() != SERVER_FINISHED && socket->first_) {
+ bool making_manifest = socket->parent_->making_manifest_;
+
//! socket->tls_chunks_ corresponds to is_last
socket->tls_chunks_--;
- bool making_manifest = socket->parent_->making_manifest_;
socket->parent_->setSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
false);
socket->parent_->ProducerSocket::produce(
@@ -116,16 +115,21 @@ int TLSProducerSocket::writeOld(BIO *b, const char *buf, int num) {
std::unique_ptr<utils::MemBuf> mbuf =
utils::MemBuf::copyBuffer(buf, (std::size_t)num, 0, 0);
auto a = mbuf.release();
+
socket->async_thread_.add([socket = socket, a]() {
+ auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+
socket->tls_chunks_--;
socket->to_call_oncontentproduced_--;
- auto mbuf = std::unique_ptr<utils::MemBuf>(a);
+
socket->last_segment_ += socket->ProducerSocket::produce(
socket->name_, std::move(mbuf), socket->tls_chunks_ == 0,
socket->last_segment_);
+
ProducerContentCallback on_content_produced_application;
socket->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
on_content_produced_application);
+
if (socket->to_call_oncontentproduced_ == 0 &&
on_content_produced_application) {
on_content_produced_application(*socket->getInterface(),
@@ -144,8 +148,10 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
on_content_produced_application_(),
mtx_(),
cv_(),
- something_to_read_(),
+ something_to_read_(false),
+ handshake_state_(UNINITIATED),
name_(),
+ handshake_packet_(),
last_segment_(0),
parent_(parent),
first_(true),
@@ -157,9 +163,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
const SSL_METHOD *meth = TLS_server_method();
ctx_ = SSL_CTX_new(meth);
- /*
- * Setup SSL context (identity and parameter to use TLS 1.3)
- */
+ /* Setup SSL context (identity and parameter to use TLS 1.3) */
SSL_CTX_use_certificate(ctx_, parent->cert_509_);
SSL_CTX_use_PrivateKey(ctx_, parent->pkey_rsa_);
@@ -167,6 +171,7 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
SSL_CTX_set_ciphersuites(ctx_,
"TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_"
"SHA256:TLS_AES_128_GCM_SHA256");
+
if (result != 1) {
throw errors::RuntimeException(
"Unable to set cipher list on TLS subsystem. Aborting.");
@@ -184,10 +189,9 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
this, TLSProducerSocket::parseHicnKeyIdCb, NULL);
ssl_ = SSL_new(ctx_);
- /*
- * Setup this producer socker as the bio that TLS will use to write and read
- * data (in stream mode)
- */
+
+ /* Setup this producer socker as the bio that TLS will use to write and read
+ * data (in stream mode) */
BIO_METHOD *bio_meth =
BIO_meth_new(BIO_TYPE_ACCEPT, "secure producer socket");
BIO_meth_set_read(bio_meth, TLSProducerSocket::readOld);
@@ -197,15 +201,15 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
BIO_set_init(bio, 1);
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- /*
- * Set the callback so that when an interest is received we catch it and we
- * decrypt the payload before passing it to the application.
- */
+
+ /* Set the callback so that when an interest is received we catch it and we
+ * decrypt the payload before passing it to the application. */
this->ProducerSocket::setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
(ProducerInterestCallback)std::bind(&TLSProducerSocket::cacheMiss, this,
std::placeholders::_1,
std::placeholders::_2));
+
this->ProducerSocket::setSocketOption(
ProducerCallbacksOptions::CONTENT_PRODUCED,
(ProducerContentCallback)bind(
@@ -213,35 +217,39 @@ TLSProducerSocket::TLSProducerSocket(interface::ProducerSocket *producer_socket,
std::placeholders::_2, std::placeholders::_3));
}
-/*
- * The producer interface is not owned by the application, so is TLSSocket task
- * to deallocate the memory
- */
+/* The producer interface is not owned by the application, so is TLSSocket task
+ * to deallocate the memory */
TLSProducerSocket::~TLSProducerSocket() { delete producer_interface_; }
void TLSProducerSocket::accept() {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) {
tls_chunks_ = 1;
int result = SSL_accept(ssl_);
+
if (result != 1)
throw errors::RuntimeException("Unable to perform client handshake");
}
- TRANSPORT_LOGD("Handshake performed!");
- parent_->list_secure_producers.push_front(
- std::move(parent_->map_secure_producers[handshake_name_]));
- parent_->map_secure_producers.erase(handshake_name_);
+
+ parent_->list_producers.push_front(
+ std::move(parent_->map_producers[handshake_name_]));
+ parent_->map_producers.erase(handshake_name_);
ProducerInterestCallback on_interest_process_decrypted;
getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
on_interest_process_decrypted);
if (on_interest_process_decrypted) {
- Interest inter(std::move(packet_));
+ Interest inter(std::move(handshake_packet_));
on_interest_process_decrypted(*getInterface(), inter);
} else {
throw errors::RuntimeException(
- "On interest process unset. Unable to perform handshake");
+ "On interest process unset: unable to perform handshake");
}
+
+ handshake_state_ = SERVER_FINISHED;
+ TRANSPORT_LOGD("Handshake performed!");
}
int TLSProducerSocket::async_accept() {
@@ -249,71 +257,88 @@ int TLSProducerSocket::async_accept() {
async_thread_.add([this]() { this->accept(); });
} else {
throw errors::RuntimeException(
- "Async thread not running, impossible to perform handshake");
+ "Async thread not running: unable to perform handshake");
}
return 1;
}
void TLSProducerSocket::onInterest(ProducerSocket &p, Interest &interest) {
- /* Based on the state machine of (D)TLS, we know what action to do */
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == UNINITIATED || handshake_state == CLIENT_HELLO) {
std::unique_lock<std::mutex> lck(mtx_);
+
name_ = interest.getName();
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- packet_ = interest.acquireMemBufReference();
- if (head_) {
- payload_->prependChain(interest.getPayload());
- } else {
- payload_ = interest.getPayload(); // std::move(interest.getPayload());
- }
+
cv_.notify_one();
- } else {
- name_ = interest.getName();
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
+ return;
+ } else if (handshake_state == SERVER_FINISHED) {
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- if (interest.getPayload()->length() > 0)
+ if (interest.getPayload()->length() > 0) {
SSL_read(
ssl_,
const_cast<unsigned char *>(interest.getPayload()->writableData()),
interest.getPayload()->length());
- }
+ }
+
+ ProducerInterestCallback on_interest_input_decrypted;
+ getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
+ on_interest_input_decrypted);
- ProducerInterestCallback on_interest_input_decrypted;
- getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
- on_interest_input_decrypted);
- if (on_interest_input_decrypted)
- (on_interest_input_decrypted)(*getInterface(), interest);
+ if (on_interest_input_decrypted)
+ (on_interest_input_decrypted)(*getInterface(), interest);
+ }
}
void TLSProducerSocket::cacheMiss(interface::ProducerSocket &p,
Interest &interest) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ HandshakeState handshake_state = getHandshakeState();
+
+ if (handshake_state == CLIENT_HELLO) {
std::unique_lock<std::mutex> lck(mtx_);
- name_ = interest.getName();
+
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
+ handshake_state_ = CLIENT_FINISHED;
+
cv_.notify_one();
- } else {
- name_ = interest.getName();
- packet_ = interest.acquireMemBufReference();
- payload_ = interest.getPayload();
+ } else if (handshake_state == SERVER_FINISHED) {
+ interest.separateHeaderPayload();
+ handshake_packet_ = interest.acquireMemBufReference();
something_to_read_ = true;
- if (interest.getPayload()->length() > 0)
+ if (interest.getPayload()->length() > 0) {
SSL_read(
ssl_,
const_cast<unsigned char *>(interest.getPayload()->writableData()),
interest.getPayload()->length());
+ }
if (on_interest_process_decrypted_ != VOID_HANDLER)
on_interest_process_decrypted_(*getInterface(), interest);
}
}
+TLSProducerSocket::HandshakeState TLSProducerSocket::getHandshakeState() {
+ if (SSL_in_before(ssl_)) {
+ handshake_state_ = UNINITIATED;
+ }
+
+ if (SSL_in_init(ssl_) && handshake_state_ == UNINITIATED) {
+ handshake_state_ = CLIENT_HELLO;
+ }
+
+ return handshake_state_;
+}
+
void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p,
const std::error_code &err,
uint64_t bytes_written) {}
@@ -321,13 +346,13 @@ void TLSProducerSocket::onContentProduced(interface::ProducerSocket &p,
uint32_t TLSProducerSocket::produce(Name content_name,
std::unique_ptr<utils::MemBuf> &&buffer,
bool is_last, uint32_t start_offset) {
- if (SSL_in_before(ssl_) || SSL_in_init(ssl_)) {
+ if (getHandshakeState() != SERVER_FINISHED) {
throw errors::RuntimeException(
"New handshake on the same P2P secure producer socket not supported");
}
+
size_t buf_size = buffer->length();
name_ = served_namespaces_.front().mapName(content_name);
-
tls_chunks_ = to_call_oncontentproduced_ =
ceil((float)buf_size / (float)SSL3_RT_MAX_PLAIN_LENGTH);
@@ -388,6 +413,7 @@ void TLSProducerSocket::produce(ContentObject &content_object) {
long TLSProducerSocket::ctrl(BIO *b, int cmd, long num, void *ptr) {
if (cmd == BIO_CTRL_FLUSH) {
}
+
return 1;
}
@@ -397,6 +423,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
X509 *x, size_t chainidx, int *al,
void *add_arg) {
TLSProducerSocket *socket = reinterpret_cast<TLSProducerSocket *>(add_arg);
+
if (ext_type == 100) {
ip_prefix_t ip_prefix =
socket->parent_->served_namespaces_.front().toIpPrefixStruct();
@@ -425,6 +452,7 @@ int TLSProducerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type,
ip_address_t keyId_component = {};
u32 *mask_buf;
u32 *keyId_component_buf;
+
switch (inet_family) {
case AF_INET:
mask_buf = &(mask.v4.as_u32);
@@ -483,6 +511,7 @@ int TLSProducerSocket::setSocketOption(
[this](int socket_option_key,
ProducerInterestCallback socket_option_value) -> int {
int result = SOCKET_OPTION_SET;
+
switch (socket_option_key) {
case ProducerCallbacksOptions::INTEREST_INPUT:
on_interest_input_decrypted_ = socket_option_value;
@@ -508,6 +537,7 @@ int TLSProducerSocket::setSocketOption(
result = SOCKET_OPTION_NOT_SET;
break;
}
+
return result;
});
}
@@ -607,5 +637,4 @@ int TLSProducerSocket::getSocketOption(
}
} // namespace implementation
-
} // namespace transport
diff --git a/libtransport/src/implementation/tls_socket_producer.h b/libtransport/src/implementation/tls_socket_producer.h
index e910c8259..2382e8695 100644
--- a/libtransport/src/implementation/tls_socket_producer.h
+++ b/libtransport/src/implementation/tls_socket_producer.h
@@ -84,47 +84,44 @@ class TLSProducerSocket : virtual public ProducerSocket {
using ProducerSocket::setSocketOption;
protected:
+ enum HandshakeState {
+ UNINITIATED,
+ CLIENT_HELLO, // when CLIENT_HELLO interest has been received
+ CLIENT_FINISHED, // when CLIENT_FINISHED interest has been received
+ SERVER_FINISHED, // when handshake is done
+ };
/* Callback invoked once an interest has been received and its payload
* decrypted */
ProducerInterestCallback on_interest_input_decrypted_;
ProducerInterestCallback on_interest_process_decrypted_;
ProducerContentCallback on_content_produced_application_;
-
std::mutex mtx_;
-
/* Condition variable for the wait */
std::condition_variable cv_;
-
/* Bool variable, true if there is something to read (an interest arrived) */
bool something_to_read_;
-
+ /* Bool variable, true if CLIENT_FINISHED interest has been received */
+ HandshakeState handshake_state_;
/* First interest that open a secure connection */
transport::core::Name name_;
-
/* SSL handle */
SSL *ssl_;
SSL_CTX *ctx_;
-
- Packet::MemBufPtr packet_;
-
+ Packet::MemBufPtr handshake_packet_;
std::unique_ptr<utils::MemBuf> head_;
std::uint32_t last_segment_;
- std::shared_ptr<utils::MemBuf> payload_;
std::uint32_t key_id_;
-
std::thread *handshake;
P2PSecureProducerSocket *parent_;
-
bool first_;
Name handshake_name_;
int tls_chunks_;
int to_call_oncontentproduced_;
-
bool still_writing_;
-
utils::EventThread encryption_thread_;
void onInterest(ProducerSocket &p, Interest &interest);
+
void cacheMiss(interface::ProducerSocket &p, Interest &interest);
/* Return the number of read bytes in readbytes */
@@ -156,8 +153,9 @@ class TLSProducerSocket : virtual public ProducerSocket {
void onContentProduced(interface::ProducerSocket &p,
const std::error_code &err, uint64_t bytes_written);
+
+ HandshakeState getHandshakeState();
};
} // namespace implementation
-
} // end namespace transport
diff --git a/libtransport/src/interfaces/p2psecure_socket_consumer.cc b/libtransport/src/interfaces/p2psecure_socket_consumer.cc
index 2fa8bb6e3..038441dfc 100644
--- a/libtransport/src/interfaces/p2psecure_socket_consumer.cc
+++ b/libtransport/src/interfaces/p2psecure_socket_consumer.cc
@@ -21,11 +21,17 @@ namespace transport {
namespace interface {
P2PSecureConsumerSocket::P2PSecureConsumerSocket(int handshake_protocol,
- int protocol)
+ int transport_protocol)
: ConsumerSocket() {
socket_ = std::unique_ptr<implementation::ConsumerSocket>(
new implementation::P2PSecureConsumerSocket(this, handshake_protocol,
- protocol));
+ transport_protocol));
+}
+
+void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) {
+ implementation::P2PSecureConsumerSocket &secure_consumer_socket =
+ *(static_cast<implementation::P2PSecureConsumerSocket *>(socket_.get()));
+ secure_consumer_socket.registerPrefix(producer_namespace);
}
} // namespace interface
diff --git a/libtransport/src/interfaces/tls_rtc_socket_producer.h b/libtransport/src/interfaces/tls_rtc_socket_producer.h
index 434edb522..3ea84095b 100644
--- a/libtransport/src/interfaces/tls_rtc_socket_producer.h
+++ b/libtransport/src/interfaces/tls_rtc_socket_producer.h
@@ -28,9 +28,9 @@ namespace interface {
class TLSRTCProducerSocket : public ProducerSocket {
public:
TLSRTCProducerSocket(implementation::TLSRTCProducerSocket *implementation);
+
~TLSRTCProducerSocket();
};
} // namespace interface
-
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf
index 1a366f32f..d0eab75ac 100644
--- a/libtransport/src/protocols/consumer.conf
+++ b/libtransport/src/protocols/consumer.conf
@@ -1,4 +1,4 @@
-; this file contais the parameters for RAAQM
+; This file contains the parameters for RAAQM
autotune = no
lifetime = 500
retransmissions = 128
diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc
index e590b4fee..0872c4554 100644
--- a/libtransport/src/protocols/incremental_indexer.cc
+++ b/libtransport/src/protocols/incremental_indexer.cc
@@ -25,6 +25,8 @@ void IncrementalIndexer::onContentObject(
core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
using namespace interface;
+ TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str());
+
if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
final_suffix_ = content_object->getName().getSuffix();
}
@@ -50,4 +52,4 @@ void IncrementalIndexer::onContentObject(
}
} // namespace protocol
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc
index 1a2f9dec3..da835b577 100644
--- a/libtransport/src/protocols/manifest_incremental_indexer.cc
+++ b/libtransport/src/protocols/manifest_incremental_indexer.cc
@@ -37,10 +37,12 @@ ManifestIncrementalIndexer::ManifestIncrementalIndexer(
void ManifestIncrementalIndexer::onContentObject(
core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- // Check if mainfiest or not
+ // Check if manifest or not
if (content_object->getPayloadType() == PayloadType::MANIFEST) {
+ TRANSPORT_LOGD("Receive content %s", content_object->getName().toString().c_str());
onUntrustedManifest(std::move(interest), std::move(content_object));
} else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ TRANSPORT_LOGD("Receive manifest %s", content_object->getName().toString().c_str());
onUntrustedContentObject(std::move(interest), std::move(content_object));
}
}
diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc
index 0a93dec44..f8da69ceb 100644
--- a/libtransport/src/protocols/raaqm.cc
+++ b/libtransport/src/protocols/raaqm.cc
@@ -460,7 +460,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
// send at least one interest if there are retransmissions to perform and
// there is no space left in the window
sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Window full, retransmit one content interest");
interest_to_retransmit_.pop();
}
@@ -470,7 +469,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
while (interests_in_flight_ < current_window_size_) {
if (interest_to_retransmit_.size() > 0) {
sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Retransmit content interest");
interest_to_retransmit_.pop();
} else {
index = index_manager_->getNextSuffix();
@@ -479,7 +477,6 @@ void RaaqmTransportProtocol::scheduleNextInterests() {
}
sendInterest(index);
- TRANSPORT_LOGD("Send content interest %u", index);
}
}
}
@@ -508,6 +505,7 @@ bool RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
// performed by sendInterest, will result in 0
interest_retransmissions_[next_suffix & mask] = ~0;
interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
+
sendInterest(std::move(interest));
return true;
@@ -517,6 +515,7 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
interests_in_flight_++;
interest_retransmissions_[interest->getName().getSuffix() & mask]++;
+ TRANSPORT_LOGD("Send interest %s", interest->getName().toString().c_str());
portal_->sendInterest(std::move(interest));
}
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 151e4df3d..0b1578b6f 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -64,25 +64,15 @@ namespace interface {
*/
struct ClientConfiguration {
ClientConfiguration()
- : name("b001::abcd", 0),
- verify(false),
- beta(-1.f),
- drop_factor(-1.f),
- window(-1),
- producer_certificate(""),
- passphrase(""),
- receive_buffer(nullptr),
- receive_buffer_size_(128 * 1024),
- download_size(0),
- report_interval_milliseconds_(1000),
- transport_protocol_(CBR),
- rtc_(false),
- test_mode_(false),
+ : name("b001::abcd", 0), verify(false), beta(-1.f), drop_factor(-1.f),
+ window(-1), producer_certificate(""), passphrase(""),
+ receive_buffer(nullptr), receive_buffer_size_(128 * 1024),
+ download_size(0), report_interval_milliseconds_(1000),
+ transport_protocol_(CBR), rtc_(false), test_mode_(false),
#ifdef SECURE_HICNTRANSPORT
secure_(false),
#endif
- producer_prefix_(),
- interest_lifetime_(500) {
+ producer_prefix_(), interest_lifetime_(500) {
}
Name name;
@@ -110,7 +100,7 @@ struct ClientConfiguration {
* Class for handling the production rate for the RTC producer.
*/
class Rate {
- public:
+public:
Rate() : rate_kbps_(0) {}
Rate(const std::string &rate) {
@@ -140,7 +130,7 @@ class Rate {
(uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_));
}
- private:
+private:
float rate_kbps_;
};
@@ -149,23 +139,13 @@ class Rate {
*/
struct ServerConfiguration {
ServerConfiguration()
- : name("b001::abcd/64"),
- virtual_producer(true),
- manifest(false),
- live_production(false),
- sign(false),
- content_lifetime(600000000_U32),
- content_object_size(1440),
- download_size(20 * 1024 * 1024),
- hash_algorithm(utils::CryptoHashType::SHA_256),
- keystore_name(""),
- passphrase(""),
- keystore_password("cisco"),
- multiphase_produce_(false),
- rtc_(false),
- interactive_(false),
- production_rate_(std::string("2048kbps")),
- payload_size_(1440)
+ : name("b001::abcd/64"), virtual_producer(true), manifest(false),
+ live_production(false), sign(false), content_lifetime(600000000_U32),
+ content_object_size(1440), download_size(20 * 1024 * 1024),
+ hash_algorithm(utils::CryptoHashType::SHA_256), keystore_name(""),
+ passphrase(""), keystore_password("cisco"), multiphase_produce_(false),
+ rtc_(false), interactive_(false),
+ production_rate_(std::string("2048kbps")), payload_size_(1440)
#ifdef SECURE_HICNTRANSPORT
,
secure_(false)
@@ -214,13 +194,10 @@ class HIperfClient {
friend class KeyCallback;
friend class RTCCallback;
- public:
+public:
HIperfClient(const ClientConfiguration &conf)
- : configuration_(conf),
- total_duration_milliseconds_(0),
- old_bytes_value_(0),
- signals_(io_service_, SIGINT),
- expected_seg_(0),
+ : configuration_(conf), total_duration_milliseconds_(0),
+ old_bytes_value_(0), signals_(io_service_, SIGINT), expected_seg_(0),
lost_packets_(std::unordered_set<uint32_t>()),
rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr),
callback_(configuration_.rtc_ ? nullptr : new Callback(*this)),
@@ -234,13 +211,14 @@ class HIperfClient {
void checkReceivedRtcContent(ConsumerSocket &c,
const ContentObject &contentObject) {
- if (!configuration_.test_mode_) return;
+ if (!configuration_.test_mode_)
+ return;
uint32_t receivedSeg = contentObject.getName().getSuffix();
auto payload = contentObject.getPayload();
- if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK
- // payload
+ if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK
+ // payload
uint32_t *payloadPtr = (uint32_t *)payload->data();
uint32_t productionSeg = *(payloadPtr);
uint32_t productionRate = *(++payloadPtr);
@@ -299,7 +277,8 @@ class HIperfClient {
void handleTimerExpiration(ConsumerSocket &c,
const TransportStatistics &stats) {
- if (configuration_.rtc_) return;
+ if (configuration_.rtc_)
+ return;
const char separator = ' ';
const int width = 20;
@@ -361,7 +340,7 @@ class HIperfClient {
configuration_.transport_protocol_ = CBR;
}
-#ifdef SECURE_HICNSOCKET
+#ifdef SECURE_HICNTRANSPORT
if (configuration_.secure_) {
consumer_socket_ = std::make_shared<P2PSecureConsumerSocket>(
RAAQM, configuration_.transport_protocol_);
@@ -378,7 +357,7 @@ class HIperfClient {
#endif
consumer_socket_ =
std::make_shared<ConsumerSocket>(configuration_.transport_protocol_);
-#ifdef SECURE_HICNSOCKET
+#ifdef SECURE_HICNTRANSPORT
}
#endif
@@ -431,13 +410,15 @@ class HIperfClient {
if (!configuration_.producer_certificate.empty()) {
key_id_ = verifier->addKeyFromCertificate(
configuration_.producer_certificate);
- if (key_id_ == nullptr) return ERROR_SETUP;
+ if (key_id_ == nullptr)
+ return ERROR_SETUP;
}
if (!configuration_.passphrase.empty()) {
key_id_ = verifier->addKeyFromPassphrase(
configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256);
- if (key_id_ == nullptr) return ERROR_SETUP;
+ if (key_id_ == nullptr)
+ return ERROR_SETUP;
}
if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
@@ -528,11 +509,11 @@ class HIperfClient {
return ERROR_SUCCESS;
}
- private:
+private:
class RTCCallback : public ConsumerSocket::ReadCallback {
static constexpr std::size_t mtu = 1500;
- public:
+ public:
RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) {
client_.configuration_.receive_buffer = utils::MemBuf::create(mtu);
}
@@ -559,12 +540,12 @@ class HIperfClient {
std::cout << "Data successfully read" << std::endl;
}
- private:
+ private:
HIperfClient &client_;
};
class Callback : public ConsumerSocket::ReadCallback {
- public:
+ public:
Callback(HIperfClient &hiperf_client) : client_(hiperf_client) {
client_.configuration_.receive_buffer =
utils::MemBuf::create(client_.configuration_.receive_buffer_size_);
@@ -610,14 +591,14 @@ class HIperfClient {
client_.io_service_.stop();
}
- private:
+ private:
HIperfClient &client_;
};
class KeyCallback : public ConsumerSocket::ReadCallback {
static constexpr std::size_t read_size = 16 * 1024;
- public:
+ public:
KeyCallback(HIperfClient &hiperf_client)
: client_(hiperf_client), key_(nullptr) {}
@@ -643,14 +624,13 @@ class HIperfClient {
client_.io_service_.stop();
}
- bool verifyKey() { return !key_->empty(); }
+ bool validateKey() { return !key_->empty(); }
void readSuccess(std::size_t total_size) noexcept override {
std::cout << "Key size: " << total_size << " bytes" << std::endl;
- afterRead();
}
- void afterRead() {
+ void readKey() {
std::shared_ptr<utils::Verifier> verifier =
std::make_shared<utils::Verifier>();
verifier->addKeyFromPassphrase(*key_, utils::CryptoSuite::HMAC_SHA256);
@@ -661,26 +641,30 @@ class HIperfClient {
consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
verifier);
} else {
- std::cout << "Could not set verifier" << std::endl;
+ std::cout << "Consumer socket not set" << std::endl;
return;
}
- if (consumer_socket_->verifyKeyPackets()) {
- std::cout << "Verification of packet signatures successful"
- << std::endl;
+ if (validateKey()) {
+ std::cout << "Key has been authenticated" << std::endl;
} else {
- std::cout << "Could not verify packet signatures" << std::endl;
+ std::cout << "Key could not be authenticated" << std::endl;
return;
}
- std::cout << "Key retrieval done" << std::endl;
+ if (consumer_socket_->verifyKeyPackets()) {
+ std::cout << "Signatures of key packets are valid" << std::endl;
+ } else {
+ std::cout << "Signatures of key packets are not valid" << std::endl;
+ return;
+ }
}
void setConsumer(std::shared_ptr<ConsumerSocket> consumer_socket) {
consumer_socket_ = consumer_socket;
}
- private:
+ private:
HIperfClient &client_;
std::unique_ptr<std::string> key_;
std::shared_ptr<ConsumerSocket> consumer_socket_;
@@ -699,7 +683,7 @@ class HIperfClient {
RTCCallback *rtc_callback_;
Callback *callback_;
KeyCallback *key_callback_;
-}; // namespace interface
+}; // namespace interface
/**
* Hiperf server class: configure and setup an hicn producer following the
@@ -708,19 +692,16 @@ class HIperfClient {
class HIperfServer {
const std::size_t log2_content_object_buffer_size = 8;
- public:
+public:
HIperfServer(ServerConfiguration &conf)
- : configuration_(conf),
- signals_(io_service_, SIGINT),
- rtc_timer_(io_service_),
- unsatisfied_interests_(),
+ : configuration_(conf), signals_(io_service_, SIGINT),
+ rtc_timer_(io_service_), unsatisfied_interests_(),
content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
content_objects_index_(0),
mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
last_segment_(0),
#ifndef _WIN32
- ptr_last_segment_(&last_segment_),
- input_(io_service_),
+ ptr_last_segment_(&last_segment_), input_(io_service_),
rtc_running_(false)
#else
ptr_last_segment_(&last_segment_)
@@ -843,9 +824,10 @@ class HIperfServer {
std::placeholders::_1, std::placeholders::_2));
}
- std::shared_ptr<utils::Identity> getProducerIdentity(
- std::string &keystore_name, std::string &keystore_password,
- utils::CryptoHashType &hash_algorithm) {
+ std::shared_ptr<utils::Identity>
+ getProducerIdentity(std::string &keystore_name,
+ std::string &keystore_password,
+ utils::CryptoHashType &hash_algorithm) {
if (access(keystore_name.c_str(), F_OK) != -1) {
return std::make_shared<utils::Identity>(keystore_name, keystore_password,
hash_algorithm);
@@ -859,7 +841,7 @@ class HIperfServer {
int setup() {
int ret;
-#ifdef SECURE_HICNSOCKET
+#ifdef SECURE_HICNTRANSPORT
if (configuration_.secure_) {
auto identity = getProducerIdentity(configuration_.keystore_name,
configuration_.keystore_password,
@@ -873,7 +855,7 @@ class HIperfServer {
} else {
producer_socket_ = std::make_unique<ProducerSocket>();
}
-#ifdef SECURE_HICNSOCKET
+#ifdef SECURE_HICNTRANSPORT
}
#endif
@@ -974,7 +956,8 @@ class HIperfServer {
}
void sendRTCContentObjectCallback(std::error_code ec) {
- if (ec) return;
+ if (ec)
+ return;
rtc_timer_.expires_from_now(
configuration_.production_rate_.getMicrosecondsForPacket(
configuration_.payload_size_));
@@ -1007,11 +990,11 @@ class HIperfServer {
std::placeholders::_1));
}
- input_buffer_.consume(length); // Remove newline from input.
- asio::async_read_until(
- input_, input_buffer_, '\n',
- std::bind(&HIperfServer::handleInput, this, std::placeholders::_1,
- std::placeholders::_2));
+ input_buffer_.consume(length); // Remove newline from input.
+ asio::async_read_until(input_, input_buffer_, '\n',
+ std::bind(&HIperfServer::handleInput, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
}
#endif
@@ -1027,10 +1010,10 @@ class HIperfServer {
if (configuration_.rtc_) {
#ifndef _WIN32
if (configuration_.interactive_) {
- asio::async_read_until(
- input_, input_buffer_, '\n',
- std::bind(&HIperfServer::handleInput, this, std::placeholders::_1,
- std::placeholders::_2));
+ asio::async_read_until(input_, input_buffer_, '\n',
+ std::bind(&HIperfServer::handleInput, this,
+ std::placeholders::_1,
+ std::placeholders::_2));
} else {
rtc_running_ = true;
rtc_timer_.expires_from_now(
@@ -1055,7 +1038,7 @@ class HIperfServer {
return ERROR_SUCCESS;
}
- private:
+private:
ServerConfiguration configuration_;
asio::io_service io_service_;
asio::signal_set signals_;
@@ -1072,7 +1055,7 @@ class HIperfServer {
asio::streambuf input_buffer_;
bool rtc_running_;
#endif
-}; // namespace interface
+}; // namespace interface
void usage() {
std::cerr << "HIPERF - A tool for performing network throughput "
@@ -1211,174 +1194,174 @@ int main(int argc, char *argv[]) {
"DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) !=
-1) {
switch (opt) {
- // Common
- case 'D': {
- daemon = true;
- break;
- }
- case 'I': {
- server_configuration.interactive_ = true;
- break;
- }
+ // Common
+ case 'D': {
+ daemon = true;
+ break;
+ }
+ case 'I': {
+ server_configuration.interactive_ = true;
+ break;
+ }
#else
while ((opt = getopt(argc, argv,
"SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) {
switch (opt) {
#endif
- case 'f': {
- log_file = optarg;
- break;
- }
- case 'R': {
- client_configuration.rtc_ = true;
- server_configuration.rtc_ = true;
- break;
- }
+ case 'f': {
+ log_file = optarg;
+ break;
+ }
+ case 'R': {
+ client_configuration.rtc_ = true;
+ server_configuration.rtc_ = true;
+ break;
+ }
- // Server or Client
- case 'S': {
- role -= 1;
- break;
- }
- case 'C': {
- role += 1;
- break;
- }
- case 'k': {
- server_configuration.passphrase = std::string(optarg);
- client_configuration.passphrase = std::string(optarg);
- server_configuration.sign = true;
- options = -1;
- break;
- }
+ // Server or Client
+ case 'S': {
+ role -= 1;
+ break;
+ }
+ case 'C': {
+ role += 1;
+ break;
+ }
+ case 'k': {
+ server_configuration.passphrase = std::string(optarg);
+ client_configuration.passphrase = std::string(optarg);
+ server_configuration.sign = true;
+ options = -1;
+ break;
+ }
- // Client specifc
- case 'b': {
- client_configuration.beta = std::stod(optarg);
- options = 1;
- break;
- }
- case 'd': {
- client_configuration.drop_factor = std::stod(optarg);
- options = 1;
- break;
- }
- case 'W': {
- client_configuration.window = std::stod(optarg);
- options = 1;
- break;
- }
- case 'M': {
- client_configuration.receive_buffer_size_ = std::stoull(optarg);
- options = 1;
- break;
- }
+ // Client specifc
+ case 'b': {
+ client_configuration.beta = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'd': {
+ client_configuration.drop_factor = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'W': {
+ client_configuration.window = std::stod(optarg);
+ options = 1;
+ break;
+ }
+ case 'M': {
+ client_configuration.receive_buffer_size_ = std::stoull(optarg);
+ options = 1;
+ break;
+ }
#ifdef SECURE_HICNTRANSPORT
- case 'P': {
- client_configuration.producer_prefix_ = Prefix(optarg);
- client_configuration.secure_ = true;
- break;
- }
+ case 'P': {
+ client_configuration.producer_prefix_ = Prefix(optarg);
+ client_configuration.secure_ = true;
+ break;
+ }
#endif
- case 'c': {
- client_configuration.producer_certificate = std::string(optarg);
- options = 1;
- break;
- }
- case 'v': {
- client_configuration.verify = true;
- options = 1;
- break;
- }
- case 'i': {
- client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
- options = 1;
- break;
- }
- case 't': {
- client_configuration.test_mode_ = true;
- options = 1;
- break;
- }
- case 'L': {
- client_configuration.interest_lifetime_ = std::stoul(optarg);
- options = 1;
- break;
- }
- // Server specific
- case 'A': {
- server_configuration.download_size = std::stoul(optarg);
- options = -1;
- break;
- }
- case 's': {
- server_configuration.payload_size_ = std::stoul(optarg);
- options = -1;
- break;
- }
- case 'r': {
- server_configuration.virtual_producer = false;
- options = -1;
- break;
- }
- case 'm': {
- server_configuration.manifest = true;
- options = -1;
- break;
- }
- case 'l': {
- server_configuration.live_production = true;
- options = -1;
- break;
- }
- case 'K': {
- server_configuration.keystore_name = std::string(optarg);
- server_configuration.sign = true;
- options = -1;
- break;
- }
- case 'y': {
- if (strncasecmp(optarg, "sha256", 6) == 0) {
- server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256;
- } else if (strncasecmp(optarg, "sha512", 6) == 0) {
- server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512;
- } else if (strncasecmp(optarg, "crc32", 5) == 0) {
- server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C;
- } else {
- std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
- << std::endl;
- }
- options = -1;
- break;
- }
- case 'p': {
- server_configuration.keystore_password = std::string(optarg);
- options = -1;
- break;
- }
- case 'x': {
- server_configuration.multiphase_produce_ = true;
- options = -1;
- break;
- }
- case 'B': {
- auto str = std::string(optarg);
- std::transform(str.begin(), str.end(), str.begin(), ::tolower);
- server_configuration.production_rate_ = str;
- options = -1;
- break;
+ case 'c': {
+ client_configuration.producer_certificate = std::string(optarg);
+ options = 1;
+ break;
+ }
+ case 'v': {
+ client_configuration.verify = true;
+ options = 1;
+ break;
+ }
+ case 'i': {
+ client_configuration.report_interval_milliseconds_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ case 't': {
+ client_configuration.test_mode_ = true;
+ options = 1;
+ break;
+ }
+ case 'L': {
+ client_configuration.interest_lifetime_ = std::stoul(optarg);
+ options = 1;
+ break;
+ }
+ // Server specific
+ case 'A': {
+ server_configuration.download_size = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 's': {
+ server_configuration.payload_size_ = std::stoul(optarg);
+ options = -1;
+ break;
+ }
+ case 'r': {
+ server_configuration.virtual_producer = false;
+ options = -1;
+ break;
+ }
+ case 'm': {
+ server_configuration.manifest = true;
+ options = -1;
+ break;
+ }
+ case 'l': {
+ server_configuration.live_production = true;
+ options = -1;
+ break;
+ }
+ case 'K': {
+ server_configuration.keystore_name = std::string(optarg);
+ server_configuration.sign = true;
+ options = -1;
+ break;
+ }
+ case 'y': {
+ if (strncasecmp(optarg, "sha256", 6) == 0) {
+ server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256;
+ } else if (strncasecmp(optarg, "sha512", 6) == 0) {
+ server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512;
+ } else if (strncasecmp(optarg, "crc32", 5) == 0) {
+ server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C;
+ } else {
+ std::cerr << "Ignored unknown hash algorithm. Using SHA 256."
+ << std::endl;
}
+ options = -1;
+ break;
+ }
+ case 'p': {
+ server_configuration.keystore_password = std::string(optarg);
+ options = -1;
+ break;
+ }
+ case 'x': {
+ server_configuration.multiphase_produce_ = true;
+ options = -1;
+ break;
+ }
+ case 'B': {
+ auto str = std::string(optarg);
+ std::transform(str.begin(), str.end(), str.begin(), ::tolower);
+ server_configuration.production_rate_ = str;
+ options = -1;
+ break;
+ }
#ifdef SECURE_HICNTRANSPORT
- case 'E': {
- server_configuration.keystore_name = std::string(optarg);
- server_configuration.secure_ = true;
- break;
- }
+ case 'E': {
+ server_configuration.keystore_name = std::string(optarg);
+ server_configuration.secure_ = true;
+ break;
+ }
#endif
- case 'h':
- default:
- usage();
- return EXIT_FAILURE;
+ case 'h':
+ default:
+ usage();
+ return EXIT_FAILURE;
}
}
@@ -1457,9 +1440,9 @@ int main(int argc, char *argv[]) {
return 0;
}
-} // end namespace interface
+} // end namespace interface
-} // end namespace transport
+} // end namespace transport
int main(int argc, char *argv[]) {
return transport::interface::main(argc, argv);