summaryrefslogtreecommitdiffstats
path: root/libtransport/src/implementation/p2psecure_socket_consumer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/implementation/p2psecure_socket_consumer.cc')
-rw-r--r--libtransport/src/implementation/p2psecure_socket_consumer.cc149
1 files changed, 74 insertions, 75 deletions
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc
index 40ab58161..9b79850d6 100644
--- a/libtransport/src/implementation/p2psecure_socket_consumer.cc
+++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc
@@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload(
if (payload_ != NULL) int2.appendPayload(std::move(payload_));
}
-// implement void readBufferAvailable(), size_t maxBufferSize() const override,
-// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable()
-// must be implemented even if empty.
-
/* Return the number of read bytes in the return param */
int readOld(BIO *b, char *buf, int size) {
if (size < 0) return size;
@@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) {
socket->network_name_.setSuffix(socket->random_suffix_);
socket->ConsumerSocket::asyncConsume(socket->network_name_);
}
+
if (!socket->something_to_read_) socket->cv_.wait(lck);
}
size_t size_to_read, read;
size_t chain_size = socket->head_->length();
+
if (socket->head_->isChained())
chain_size = socket->head_->computeChainDataLength();
@@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) {
socket = (P2PSecureConsumerSocket *)BIO_get_data(b);
socket->payload_ = utils::MemBuf::copyBuffer(buf, num);
+
socket->ConsumerSocket::setSocketOption(
ConsumerCallbacksOptions::INTEREST_OUTPUT,
(ConsumerInterestCallback)std::bind(
@@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type,
P2PSecureConsumerSocket::P2PSecureConsumerSocket(
interface::ConsumerSocket *consumer, int handshake_protocol,
int transport_protocol)
- : ConsumerSocket(consumer, transport_protocol),
+ : ConsumerSocket(consumer, handshake_protocol),
name_(),
- tls_consumer_(),
+ tls_consumer_(nullptr),
buf_pool_(),
decrypted_content_(),
payload_(),
@@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket(
BIO_set_data(bio, this);
SSL_set_bio(ssl_, bio, bio);
- ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
-
- ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
-
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(
1, std::numeric_limits<uint32_t>::max());
@@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() {
SSL_shutdown(ssl_);
}
-int P2PSecureConsumerSocket::consume(const Name &name) {
- if (transport_protocol_->isRunning()) {
- return CONSUMER_BUSY;
- }
+int P2PSecureConsumerSocket::handshake() {
+ int result = 1;
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
+ if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
+ return 1;
}
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
- TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_);
- tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer));
- ConsumerTimerCallback *stats_summary_callback = nullptr;
- this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_summary_callback);
+ ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_);
+ ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- uint32_t lifetime;
- this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
- tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
- read_callback_decrypted_);
- tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- *stats_summary_callback);
- tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- this->timer_interval_milliseconds_);
- tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- tls_consumer.connect();
+ ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
+ ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- if (payload_ != NULL)
- return tls_consumer.consume((prefix->mapName(name)), std::move(payload_));
- else
- return tls_consumer.consume((prefix->mapName(name)));
-}
+ network_name_ = producer_namespace_.getRandomName();
+ network_name_.setSuffix(0);
-int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
- if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) {
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0);
- network_name_ = producer_namespace_.getRandomName();
- network_name_.setSuffix(0);
- TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload);
- int result = SSL_connect(this->ssl_);
- ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
- ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
- if (result != 1)
- throw errors::RuntimeException("Unable to perform client handshake");
- TRANSPORT_LOGD("Handshake performed!");
- }
+ TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str());
+ result = SSL_connect(this->ssl_);
- std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
- secure_prefix_.family,
- ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
- std::shared_ptr<Prefix> prefix =
- std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+ return result;
+}
+void P2PSecureConsumerSocket::initSessionSocket() {
tls_consumer_ =
std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_);
tls_consumer_->setInterface(
@@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
uint32_t lifetime;
this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+
tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
lifetime);
tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK,
@@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_);
tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_);
tls_consumer_->connect();
+}
+
+int P2PSecureConsumerSocket::consume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if (handshake() != 1) {
+ throw errors::RuntimeException("Unable to perform client handshake");
+ } else {
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ initSessionSocket();
+
+ if (tls_consumer_ == nullptr) {
+ throw errors::RuntimeException("TLS socket does not exist");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
+
+ if (payload_ != nullptr)
+ return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_));
+ else
+ return tls_consumer_->consume((prefix->mapName(name)));
+}
+
+int P2PSecureConsumerSocket::asyncConsume(const Name &name) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ if (handshake() != 1) {
+ throw errors::RuntimeException("Unable to perform client handshake");
+ } else {
+ TRANSPORT_LOGD("Handshake performed!");
+ }
+
+ initSessionSocket();
+
+ if (tls_consumer_ == nullptr) {
+ throw errors::RuntimeException("TLS socket does not exist");
+ }
+
+ std::shared_ptr<Name> prefix_name = std::make_shared<Name>(
+ secure_prefix_.family,
+ ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family));
+ std::shared_ptr<Prefix> prefix =
+ std::make_shared<Prefix>(*prefix_name, secure_prefix_.len);
if (payload_ != NULL)
return tls_consumer_->asyncConsume((prefix->mapName(name)),
@@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept {
bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; }
} // namespace implementation
-
} // namespace transport