diff options
Diffstat (limited to 'libtransport/src/implementation/p2psecure_socket_consumer.cc')
-rw-r--r-- | libtransport/src/implementation/p2psecure_socket_consumer.cc | 403 |
1 files changed, 403 insertions, 0 deletions
diff --git a/libtransport/src/implementation/p2psecure_socket_consumer.cc b/libtransport/src/implementation/p2psecure_socket_consumer.cc new file mode 100644 index 000000000..40ab58161 --- /dev/null +++ b/libtransport/src/implementation/p2psecure_socket_consumer.cc @@ -0,0 +1,403 @@ +/* + * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/p2psecure_socket_consumer.h> +#include <interfaces/tls_socket_consumer.h> + +#include <openssl/bio.h> +#include <openssl/ssl.h> +#include <openssl/tls1.h> + +#include <random> + +namespace transport { +namespace implementation { + +void P2PSecureConsumerSocket::setInterestPayload( + interface::ConsumerSocket &c, const core::Interest &interest) { + Interest &int2 = const_cast<Interest &>(interest); + random_suffix_ = int2.getName().getSuffix(); + + if (payload_ != NULL) int2.appendPayload(std::move(payload_)); +} + +// implement void readBufferAvailable(), size_t maxBufferSize() const override, +// void readError(), void readSuccess(). getReadBuffer() and readDataAvailable() +// must be implemented even if empty. + +/* Return the number of read bytes in the return param */ +int readOld(BIO *b, char *buf, int size) { + if (size < 0) return size; + + P2PSecureConsumerSocket *socket; + socket = (P2PSecureConsumerSocket *)BIO_get_data(b); + + std::unique_lock<std::mutex> lck(socket->mtx_); + + if (!socket->something_to_read_) { + if (!socket->transport_protocol_->isRunning()) { + socket->network_name_.setSuffix(socket->random_suffix_); + socket->ConsumerSocket::asyncConsume(socket->network_name_); + } + if (!socket->something_to_read_) socket->cv_.wait(lck); + } + + size_t size_to_read, read; + size_t chain_size = socket->head_->length(); + if (socket->head_->isChained()) + chain_size = socket->head_->computeChainDataLength(); + + if (chain_size > (size_t)size) { + read = size_to_read = (size_t)size; + } else { + read = size_to_read = chain_size; + socket->something_to_read_ = false; + } + + while (size_to_read) { + if (socket->head_->length() < size_to_read) { + std::memcpy(buf, socket->head_->data(), socket->head_->length()); + size_to_read -= socket->head_->length(); + buf += socket->head_->length(); + socket->head_ = socket->head_->pop(); + } else { + std::memcpy(buf, socket->head_->data(), size_to_read); + socket->head_->trimStart(size_to_read); + size_to_read = 0; + } + } + + return read; +} + +/* Return the number of read bytes in readbytes */ +int read(BIO *b, char *buf, size_t size, size_t *readbytes) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = readOld(b, buf, (int)size); + + if (ret <= 0) { + *readbytes = 0; + return ret; + } + + *readbytes = (size_t)ret; + + return 1; +} + +/* Return the number of written bytes in the return param */ +int writeOld(BIO *b, const char *buf, int num) { + P2PSecureConsumerSocket *socket; + socket = (P2PSecureConsumerSocket *)BIO_get_data(b); + + socket->payload_ = utils::MemBuf::copyBuffer(buf, num); + socket->ConsumerSocket::setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &P2PSecureConsumerSocket::setInterestPayload, socket, + std::placeholders::_1, std::placeholders::_2)); + + return num; +} + +/* Return the number of written bytes in written */ +int write(BIO *b, const char *buf, size_t size, size_t *written) { + int ret; + + if (size > INT_MAX) size = INT_MAX; + + ret = writeOld(b, buf, (int)size); + + if (ret <= 0) { + *written = 0; + return ret; + } + + *written = (size_t)ret; + + return 1; +} + +long ctrl(BIO *b, int cmd, long num, void *ptr) { return 1; } + +int P2PSecureConsumerSocket::addHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char **out, + size_t *outlen, X509 *x, + size_t chainidx, int *al, + void *add_arg) { + if (ext_type == 100) { + *out = (unsigned char *)malloc(4); + *(uint32_t *)*out = 10; + *outlen = 4; + } + return 1; +} + +void P2PSecureConsumerSocket::freeHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *out, + void *add_arg) { + free(const_cast<unsigned char *>(out)); +} + +int P2PSecureConsumerSocket::parseHicnKeyIdCb(SSL *s, unsigned int ext_type, + unsigned int context, + const unsigned char *in, + size_t inlen, X509 *x, + size_t chainidx, int *al, + void *add_arg) { + P2PSecureConsumerSocket *socket = + reinterpret_cast<P2PSecureConsumerSocket *>(add_arg); + if (ext_type == 100) { + memcpy(&socket->secure_prefix_, in, sizeof(ip_prefix_t)); + } + return 1; +} + +P2PSecureConsumerSocket::P2PSecureConsumerSocket( + interface::ConsumerSocket *consumer, int handshake_protocol, + int transport_protocol) + : ConsumerSocket(consumer, transport_protocol), + name_(), + tls_consumer_(), + buf_pool_(), + decrypted_content_(), + payload_(), + head_(), + something_to_read_(false), + content_downloaded_(false), + random_suffix_(), + secure_prefix_(), + producer_namespace_(), + read_callback_decrypted_(), + mtx_(), + cv_(), + protocol_(transport_protocol) { + /* Create the (d)TLS state */ + const SSL_METHOD *meth = TLS_client_method(); + ctx_ = SSL_CTX_new(meth); + + int result = + SSL_CTX_set_ciphersuites(ctx_, + "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_" + "SHA256:TLS_AES_128_GCM_SHA256"); + if (result != 1) { + throw errors::RuntimeException( + "Unable to set cipher list on TLS subsystem. Aborting."); + } + + SSL_CTX_set_min_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_max_proto_version(ctx_, TLS1_3_VERSION); + SSL_CTX_set_verify(ctx_, SSL_VERIFY_NONE, NULL); + SSL_CTX_set_ssl_version(ctx_, meth); + + result = SSL_CTX_add_custom_ext( + ctx_, 100, SSL_EXT_CLIENT_HELLO | SSL_EXT_TLS1_3_ENCRYPTED_EXTENSIONS, + P2PSecureConsumerSocket::addHicnKeyIdCb, + P2PSecureConsumerSocket::freeHicnKeyIdCb, NULL, + P2PSecureConsumerSocket::parseHicnKeyIdCb, this); + + ssl_ = SSL_new(ctx_); + + bio_meth_ = BIO_meth_new(BIO_TYPE_CONNECT, "secure consumer socket"); + BIO_meth_set_read(bio_meth_, readOld); + BIO_meth_set_write(bio_meth_, writeOld); + BIO_meth_set_ctrl(bio_meth_, ctrl); + BIO *bio = BIO_new(bio_meth_); + BIO_set_init(bio, 1); + BIO_set_data(bio, this); + SSL_set_bio(ssl_, bio, bio); + + ConsumerSocket::getSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + + ConsumerSocket::getSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + + std::default_random_engine generator; + std::uniform_int_distribution<int> distribution( + 1, std::numeric_limits<uint32_t>::max()); + random_suffix_ = 0; + + this->ConsumerSocket::setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + this); +}; + +P2PSecureConsumerSocket::~P2PSecureConsumerSocket() { + BIO_meth_free(bio_meth_); + SSL_shutdown(ssl_); +} + +int P2PSecureConsumerSocket::consume(const Name &name) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); + int result = SSL_connect(this->ssl_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + } + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + TLSConsumerSocket tls_consumer(nullptr, this->protocol_, this->ssl_); + tls_consumer.setInterface(new interface::TLSConsumerSocket(&tls_consumer)); + + ConsumerTimerCallback *stats_summary_callback = nullptr; + this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_callback); + + uint32_t lifetime; + this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + tls_consumer.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + read_callback_decrypted_); + tls_consumer.setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + *stats_summary_callback); + tls_consumer.setSocketOption(GeneralTransportOptions::STATS_INTERVAL, + this->timer_interval_milliseconds_); + tls_consumer.setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + tls_consumer.setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + tls_consumer.connect(); + + if (payload_ != NULL) + return tls_consumer.consume((prefix->mapName(name)), std::move(payload_)); + else + return tls_consumer.consume((prefix->mapName(name))); +} + +int P2PSecureConsumerSocket::asyncConsume(const Name &name) { + if ((SSL_in_before(this->ssl_) || SSL_in_init(this->ssl_))) { + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, (double)1.0); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, (double)1.0); + network_name_ = producer_namespace_.getRandomName(); + network_name_.setSuffix(0); + TRANSPORT_LOGD("Start handshake at %s", network_name_.toString().c_str()); + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; + this->getSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, &on_payload); + int result = SSL_connect(this->ssl_); + ConsumerSocket::setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + ConsumerSocket::setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + if (result != 1) + throw errors::RuntimeException("Unable to perform client handshake"); + TRANSPORT_LOGD("Handshake performed!"); + } + + std::shared_ptr<Name> prefix_name = std::make_shared<Name>( + secure_prefix_.family, + ip_address_get_buffer(&(secure_prefix_.address), secure_prefix_.family)); + std::shared_ptr<Prefix> prefix = + std::make_shared<Prefix>(*prefix_name, secure_prefix_.len); + + tls_consumer_ = + std::make_shared<TLSConsumerSocket>(nullptr, this->protocol_, this->ssl_); + tls_consumer_->setInterface( + new interface::TLSConsumerSocket(tls_consumer_.get())); + + ConsumerTimerCallback *stats_summary_callback = nullptr; + this->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_summary_callback); + + uint32_t lifetime; + this->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + tls_consumer_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + tls_consumer_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, + read_callback_decrypted_); + tls_consumer_->setSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + *stats_summary_callback); + tls_consumer_->setSocketOption(GeneralTransportOptions::STATS_INTERVAL, + this->timer_interval_milliseconds_); + tls_consumer_->setSocketOption(MAX_WINDOW_SIZE, old_max_win_); + tls_consumer_->setSocketOption(CURRENT_WINDOW_SIZE, old_current_win_); + tls_consumer_->connect(); + + if (payload_ != NULL) + return tls_consumer_->asyncConsume((prefix->mapName(name)), + std::move(payload_)); + else + return tls_consumer_->asyncConsume((prefix->mapName(name))); +} + +void P2PSecureConsumerSocket::registerPrefix(const Prefix &producer_namespace) { + producer_namespace_ = producer_namespace; +} + +int P2PSecureConsumerSocket::setSocketOption( + int socket_option_key, ReadCallback *socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this](int socket_option_key, ReadCallback *socket_option_value) -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::READ_CALLBACK: + read_callback_decrypted_ = socket_option_value; + break; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + +void P2PSecureConsumerSocket::getReadBuffer(uint8_t **application_buffer, + size_t *max_length){}; + +void P2PSecureConsumerSocket::readDataAvailable(size_t length) noexcept {}; + +size_t P2PSecureConsumerSocket::maxBufferSize() const { + return SSL3_RT_MAX_PLAIN_LENGTH; +} + +void P2PSecureConsumerSocket::readBufferAvailable( + std::unique_ptr<utils::MemBuf> &&buffer) noexcept { + std::unique_lock<std::mutex> lck(this->mtx_); + if (head_) { + head_->prependChain(std::move(buffer)); + } else { + head_ = std::move(buffer); + } + + something_to_read_ = true; + cv_.notify_one(); +} + +void P2PSecureConsumerSocket::readError(const std::error_code ec) noexcept {}; + +void P2PSecureConsumerSocket::readSuccess(std::size_t total_size) noexcept { + std::unique_lock<std::mutex> lck(this->mtx_); + content_downloaded_ = true; + something_to_read_ = true; + cv_.notify_one(); +} + +bool P2PSecureConsumerSocket::isBufferMovable() noexcept { return true; } + +} // namespace implementation + +} // namespace transport |