diff options
Diffstat (limited to 'libtransport/src/implementation/p2psecure_socket_consumer.cc')
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_consumer.cc | 149 |
1 files changed, 74 insertions, 75 deletions
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc index 40ab58161..9b79850d6 100644 --- a/libtransport/src/implementation/p2psecure_socket_consumer.cc +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -33,10 +33,6 @@ void P2PSecureConsumerSocket::setInterestPayload( if (payload_ != NULL) int2.appendPayload(std::move(payload_)); } -// implement void readBufferAvailable(), size_t maxBufferSize() const override, -// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() -// must be implemented even if empty. - /* Return the number of read bytes in the return param */ int readOld(BIO *b, char *buf, int size) { if (size < 0) return size; @@ -51,11 +47,13 @@ int readOld(BIO *b, char *buf, int size) { socket->network_name_.setSuffix(socket->random_suffix_); socket->ConsumerSocket::asyncConsume(socket->network_name_); } + if (!socket->something_to_read_) socket->cv_.wait(lck); } size_t size_to_read, read; size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) chain_size = socket->head_->computeChainDataLength(); @@ -106,6 +104,7 @@ int writeOld(BIO *b, const char *buf, int num) { socket = (P2PSecureConsumerSocket *)BIO_get_data(b); socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, (ConsumerInterestCallback)std::bind( @@ -173,9 +172,9 @@ int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, P2PSecureConsumerSocket::P2PSecureConsumerSocket( interface::ConsumerSocket *consumer, int handshake_protocol, int transport_protocol) - : ConsumerSocket(consumer, transport_protocol), + : ConsumerSocket(consumer, handshake_protocol), name_(), - tls_consumer_(), + tls_consumer_(nullptr), buf_pool_(), decrypted_content_(), payload_(), @@ -224,12 +223,6 @@ P2PSecureConsumerSocket::P2PSecureConsumerSocket( BIO_set_data(bio, this); SSL_set_bio(ssl_, bio, bio); - ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - - ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - std::default_random_engine generator; std::uniform_int_distribution<int> distribution( 1, std::numeric_limits<uint32_t>::max()); @@ -244,76 +237,29 @@ P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { SSL_shutdown(ssl_); } -int P2PSecureConsumerSocket::consume(const Name &name) { - if (transport_protocol_->isRunning()) { - return CONSUMER_BUSY; - } +int P2PSecureConsumerSocket::handshake() { + int result = 1; - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); + if (!(SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + return 1; } - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); - TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); - tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); - ConsumerTimerCallback *stats_summary_callback = nullptr; - this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_summary_callback); + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - uint32_t lifetime; - this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); - tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - read_callback_decrypted_); - tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - *stats_summary_callback); - tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, - this->timer_interval_milliseconds_); - tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - tls_consumer.connect(); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - if (payload_ != NULL) - return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); - else - return tls_consumer.consume((prefix->mapName(name))); -} + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); -int P2PSecureConsumerSocket::asyncConsume(const Name &name) { - if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); - network_name_ = producer_namespace_.getRandomName(); - network_name_.setSuffix(0); - TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); - int result = SSL_connect(this->ssl_); - ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); - ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); - if (result != 1) - throw errors::RuntimeException("Unable to perform client handshake"); - TRANSPORT_LOGD("Handshake performed!"); - } + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + result = SSL_connect(this->ssl_); - std::shared_ptr<Name> prefix_name = std::make_shared<Name>( - secure_prefix_.family, - ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); - std::shared_ptr<Prefix> prefix = - std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + return result; +} +void P2PSecureConsumerSocket::initSessionSocket() { tls_consumer_ = std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); tls_consumer_->setInterface( @@ -325,6 +271,7 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { uint32_t lifetime; this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, @@ -336,6 +283,59 @@ int P2PSecureConsumerSocket::asyncConsume(const Name &name) { tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); tls_consumer_->connect(); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + if (payload_ != nullptr) + return tls_consumer_->consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer_->consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if (handshake() != 1) { + throw errors::RuntimeException("Unable to perform client handshake"); + } else { + TRANSPORT_LOGD("Handshake performed!"); + } + + initSessionSocket(); + + if (tls_consumer_ == nullptr) { + throw errors::RuntimeException("TLS socket does not exist"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); if (payload_ != NULL) return tls_consumer_->asyncConsume((prefix->mapName(name)), @@ -399,5 +399,4 @@ void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } } // namespace implementation - } // namespace transport |