diff options
Diffstat (limited to 'apps/http-proxy/src')
-rw-r--r-- | apps/http-proxy/src/forwarder_interface.cc | 48 | ||||
-rw-r--r-- | apps/http-proxy/src/http_1x_message_fast_parser.cc | 7 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.cc | 97 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.cc | 74 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.cc | 48 |
5 files changed, 129 insertions, 145 deletions
diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc index c2448de9a..717679e09 100644 --- a/apps/http-proxy/src/forwarder_interface.cc +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,8 +14,8 @@ */ #include <arpa/inet.h> +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/forwarder_interface.h> -#include <hicn/transport/utils/log.h> #include <chrono> #include <iostream> @@ -27,7 +27,7 @@ namespace transport { ForwarderInterface::~ForwarderInterface() {} int ForwarderInterface::connectToForwarder() { - sock_ = hc_sock_create(); + sock_ = hc_sock_create(FORWARDER_TYPE_HICNLIGHT, NULL); if (!sock_) return -1; if (hc_sock_connect(sock_) < 0) { @@ -96,7 +96,8 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { std::vector<hc_route_t *> routes_to_remove; foreach_route(r, data) { char remote_addr[INET6_ADDRSTRLEN]; - int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + int ret = + hicn_ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); if (ret < 0) continue; std::string route_addr(remote_addr); @@ -119,7 +120,7 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { for (unsigned i = 0; i < routes_to_remove.size(); i++) { connids_to_remove.insert(routes_to_remove[i]->face_id); if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { - TRANSPORT_LOG_ERROR << "Error removing route from forwarder."; + LoggerErr() << "Error removing route from forwarder."; } } @@ -146,24 +147,23 @@ void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { for (unsigned i = 0; i < conns_to_remove.size(); i++) { if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { - TRANSPORT_LOG_ERROR << "Error removing connection from forwarder."; + LoggerErr() << "Error removing connection from forwarder."; } } hc_data_free(data); } -void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, - uint8_t max_try, - asio::steady_timer *timer, - SetRouteCallback callback) { +void ForwarderInterface::internalCreateFaceAndRoute( + RouteInfoPtr route_info, uint8_t max_try, asio::steady_timer *timer, + const SetRouteCallback &callback) { int ret = tryToCreateFaceAndRoute(route_info.get()); if (ret < 0 && max_try > 0) { max_try--; timer->expires_from_now(std::chrono::milliseconds(500)); timer->async_wait([this, _route_info = std::move(route_info), max_try, - timer, callback](std::error_code ec) { + timer, callback](const std::error_code &ec) { if (ec) return; internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, std::move(callback)); @@ -185,7 +185,8 @@ void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, delete timer; } -int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { +int ForwarderInterface::tryToCreateFaceAndRoute( + const route_info_t *route_info) { if (!sock_) return -1; hc_data_t *data; @@ -201,8 +202,9 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { if (interface.compare("lo") != 0) { found = true; - ip_address_t remote_ip; - if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) { + hicn_ip_address_t remote_ip; + if (hicn_ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < + 0) { hc_data_free(data); return -1; } @@ -210,14 +212,14 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { hc_face_t face; memset(&face, 0, sizeof(hc_face_t)); - face.face.type = FACE_TYPE_UDP; - face.face.family = route_info->family; - face.face.local_addr = l->local_addr; - face.face.remote_addr = remote_ip; - face.face.local_port = l->local_port; - face.face.remote_port = route_info->remote_port; + face.type = FACE_TYPE_UDP; + face.family = route_info->family; + face.local_addr = l->local_addr; + face.remote_addr = remote_ip; + face.local_port = l->local_port; + face.remote_port = route_info->remote_port; - if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) { + if (netdevice_set_name(&face.netdevice, l->interface_name) < 0) { hc_data_free(data); return -1; } @@ -237,10 +239,10 @@ int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { return -1; } - ip_address_t route_ip; + hicn_ip_address_t route_ip; hc_route_t route; - if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { + if (hicn_ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { hc_data_free(data); return -1; } diff --git a/apps/http-proxy/src/http_1x_message_fast_parser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc index 4b6b78d55..ffae2368b 100644 --- a/apps/http-proxy/src/http_1x_message_fast_parser.cc +++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -99,7 +99,8 @@ bool HTTPMessageFastParser::isMpdRequest(const uint8_t *headers, return false; } -uint32_t HTTPMessageFastParser::parseCacheControl(const uint8_t *headers, - std::size_t length) { +uint32_t HTTPMessageFastParser::parseCacheControl( + [[maybe_unused]] const uint8_t *headers, + [[maybe_unused]] std::size_t length) { return 0; } diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index 2040f7cfa..7419c7f7f 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,11 +13,11 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_proxy.h> #include <hicn/http-proxy/http_session.h> #include <hicn/http-proxy/utils.h> #include <hicn/transport/core/interest.h> -#include <hicn/transport/utils/log.h> #include <hicn/transport/utils/string_utils.h> namespace transport { @@ -29,16 +29,15 @@ using interface::ConsumerInterestCallback; using interface::ConsumerSocket; using interface::TransportProtocolAlgorithms; -class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { +class HTTPClientConnectionCallback + : public interface::ConsumerSocket::ReadCallback { public: HTTPClientConnectionCallback(TcpReceiver& tcp_receiver, utils::EventThread& thread) : tcp_receiver_(tcp_receiver), thread_(thread), prefix_hash_(tcp_receiver_.prefix_hash_), - consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()), - session_(nullptr), - current_size_(0) { + consumer_(TransportProtocolAlgorithms::RAAQM, thread_) { consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); consumer_.setSocketOption( ConsumerCallbacksOptions::INTEREST_OUTPUT, @@ -62,15 +61,15 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5), - [this](asio::ip::tcp::socket& socket) -> bool { + [this](const asio::ip::tcp::socket& socket) { try { std::string remote_address = socket.remote_endpoint().address().to_string(); std::uint16_t remote_port = socket.remote_endpoint().port(); - TRANSPORT_LOG_INFO << "Client " << remote_address << ":" - << remote_port << "disconnected."; - } catch (std::system_error& e) { - // Do nothing + LoggerInfo() << "Client " << remote_address << ":" << remote_port + << "disconnected."; + } catch (asio::system_error& e) { + LoggerInfo() << "Client disconnected."; } consumer_.stop(); @@ -136,24 +135,19 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { current_size_ += size; if (is_last) { - // TRANSPORT_LOGD("Request received: %s", - // std::string((const char*)tmp_buffer_.first->data(), - // tmp_buffer_.first->length()) - // .c_str()); if (current_size_ < 1400) { request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); } else { - TRANSPORT_LOG_ERROR << "Ignoring client request due to size (" - << current_size_ << ") > 1400."; + LoggerErr() << "Ignoring client request due to size (" << current_size_ + << ") > 1400."; session_->close(); current_size_ = 0; return; } if (!consumer_.isRunning()) { - TRANSPORT_LOG_INFO - << "Consumer stopped, triggering consume from TCP session " - "handler.."; + LoggerInfo() << "Consumer stopped, triggering consume from TCP session " + "handler.."; consumeNextRequest(); } @@ -163,15 +157,16 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { // hicn callbacks - void processLeavingInterest(interface::ConsumerSocket& c, - const core::Interest& interest) { + void processLeavingInterest( + [[maybe_unused]] const interface::ConsumerSocket& c, + const core::Interest& interest) { if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) { Interest& int2 = const_cast<Interest&>(interest); int2.appendPayload(request_buffer_queue_.front().first->clone()); } } - void processInterestRetx(interface::ConsumerSocket& c, + void processInterestRetx([[maybe_unused]] const interface::ConsumerSocket& c, const core::Interest& interest) { if (interest.payloadSize() == 0) { Interest& int2 = const_cast<Interest&>(interest); @@ -180,20 +175,22 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { } bool isBufferMovable() noexcept { return true; } - void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {} - void readDataAvailable(size_t length) noexcept {} + void getReadBuffer(uint8_t** application_buffer, + size_t* max_length) { /*nothing to do*/ + } + void readDataAvailable(size_t length) noexcept { /*nothing to do*/ + } size_t maxBufferSize() const { return 64 * 1024; } void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept { // Response received. Send it back to client auto _buffer = buffer.release(); // TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); - session_->send(_buffer, []() {}); + session_->send(_buffer, []() { /*nothing to do*/ }); } - void readError(const std::error_code ec) noexcept { - TRANSPORT_LOG_ERROR - << "Error reading from hicn consumer socket. Closing session."; + void readError(const std::error_code& ec) noexcept { + LoggerErr() << "Error reading from hicn consumer socket. Closing session."; session_->close(); } @@ -213,15 +210,16 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { session_->send((const uint8_t*)HTTPMessageFastParser::http_cors, std::strlen(HTTPMessageFastParser::http_cors), [this]() { auto& socket = session_->socket_; - TRANSPORT_LOG_INFO - << "Sent OPTIONS to client " - << socket.remote_endpoint().address() << ":" - << socket.remote_endpoint().port(); + LoggerInfo() << "Sent OPTIONS to client " + << socket.remote_endpoint().address() + << ":" << socket.remote_endpoint().port(); }); } } else { tcp_receiver_.parseHicnHeader( - it->second, [this](bool result, std::string configured_prefix) { + it->second, + [this](bool result, + [[maybe_unused]] const std::string& configured_prefix) { const char* reply = nullptr; if (result) { reply = HTTPMessageFastParser::http_ok; @@ -230,28 +228,26 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { } /* Route created. Send back a 200 OK to client */ - session_->send((const uint8_t*)reply, std::strlen(reply), - [this, result]() { - auto& socket = session_->socket_; - TRANSPORT_LOG_INFO - << "Sent " << result << " response to client " - << socket.remote_endpoint().address() << ":" - << socket.remote_endpoint().port(); - }); + session_->send( + (const uint8_t*)reply, std::strlen(reply), [this, result]() { + auto& socket = session_->socket_; + LoggerInfo() << "Sent " << result << " response to client " + << socket.remote_endpoint().address() << ":" + << socket.remote_endpoint().port(); + }); }); } } - private: TcpReceiver& tcp_receiver_; utils::EventThread& thread_; std::string& prefix_hash_; ConsumerSocket consumer_; - std::unique_ptr<HTTPSession> session_; + std::unique_ptr<HTTPSession> session_ = nullptr; std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>> request_buffer_queue_; std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_; - std::size_t current_size_; + std::size_t current_size_ = 0; }; TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, @@ -264,8 +260,7 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, ipv6_first_word_(ipv6_first_word), prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)), forwarder_config_( - thread_.getIoService(), - [this](std::error_code ec) { + thread_.getIoService(), [this](const std::error_code& ec) { if (!ec) { listener_.doAccept(); for (int i = 0; i < 10; i++) { @@ -273,8 +268,7 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, new HTTPClientConnectionCallback(*this, thread_)); } } - }), - stopped_(false) { + }) { forwarder_config_.tryToConnectToForwarder(); } @@ -331,8 +325,8 @@ void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { void HTTPProxy::setupSignalHandler() { signals_.async_wait([this](const std::error_code& ec, int signal_number) { if (!ec) { - TRANSPORT_LOG_INFO << "Received signal " << signal_number - << ". Stopping gracefully."; + LoggerInfo() << "Received signal " << signal_number + << ". Stopping gracefully."; stop(); } }); @@ -353,7 +347,6 @@ void HTTPProxy::stop() { HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) : signals_(main_io_context_, SIGINT, SIGQUIT) { for (uint16_t i = 0; i < n_thread; i++) { - // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); receivers_.emplace_back(std::make_unique<TcpReceiver>( params.tcp_listen_port, params.prefix, params.first_ipv6_word)); } diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index 84c814cbd..06a81dc27 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,30 +13,24 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_proxy.h> #include <hicn/transport/utils/branch_prediction.h> -#include <hicn/transport/utils/log.h> #include <iostream> namespace transport { -HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, - std::string &port, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_connection_closed_callback, - bool client) +HTTPSession::HTTPSession( + asio::io_service &io_service, const std::string &ip_address, + const std::string &port, const ContentReceivedCallback &receive_callback, + const OnConnectionClosed &on_connection_closed_callback, bool client) : io_service_(io_service), socket_(io_service_), resolver_(io_service_), endpoint_iterator_(resolver_.resolve({ip_address, port})), timer_(io_service), reverse_(client), - is_reconnection_(false), - data_available_(false), - content_length_(0), - is_last_chunk_(false), - chunked_(false), receive_callback_(receive_callback), on_connection_closed_callback_(on_connection_closed_callback) { input_buffer_.prepare(buffer_size + 2048); @@ -51,10 +45,10 @@ HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, doConnect(); } -HTTPSession::HTTPSession(asio::ip::tcp::socket socket, - ContentReceivedCallback receive_callback, - OnConnectionClosed on_connection_closed_callback, - bool client) +HTTPSession::HTTPSession( + asio::ip::tcp::socket socket, + const ContentReceivedCallback &receive_callback, + const OnConnectionClosed &on_connection_closed_callback, bool client) : #if ((ASIO_VERSION / 100 % 1000) < 12) io_service_(socket.get_io_service()), @@ -92,7 +86,7 @@ void HTTPSession::send(const uint8_t *packet, std::size_t len, io_service_.dispatch([this, packet, len, content_sent]() { asio::async_write(socket_, asio::buffer(packet, len), [content_sent = std::move(content_sent)]( - std::error_code ec, std::size_t /*length*/) { + const std::error_code &ec, std::size_t /*length*/) { if (!ec) { content_sent(); } @@ -120,9 +114,7 @@ void HTTPSession::close() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); - // on_disconnect_callback_(); } } } @@ -130,25 +122,26 @@ void HTTPSession::close() { void HTTPSession::doWrite() { auto &buffer = write_msgs_.front().first; - asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_FALSE(!ec)) { - write_msgs_.front().second(); - write_msgs_.pop_front(); - if (!write_msgs_.empty()) { - doWrite(); - } - } - }); + asio::async_write( + socket_, asio::buffer(buffer->data(), buffer->length()), + [this](const std::error_code &ec, [[maybe_unused]] std::size_t length) { + if (TRANSPORT_EXPECT_FALSE(!ec)) { + write_msgs_.front().second(); + write_msgs_.pop_front(); + if (!write_msgs_.empty()) { + doWrite(); + } + } + }); } // namespace transport -void HTTPSession::handleRead(std::error_code ec, std::size_t length) { +void HTTPSession::handleRead(const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) - : !content_length_; + bool check = is_last_chunk_ ? !content_length_ : false; + bool is_last = chunked_ ? check : !content_length_; receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr); input_buffer_.consume(input_buffer_.size()); @@ -207,7 +200,7 @@ void HTTPSession::doReadBody(std::size_t body_size, void HTTPSession::doReadChunkedHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n", - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); @@ -226,7 +219,7 @@ void HTTPSession::doReadChunkedHeader() { void HTTPSession::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); @@ -269,12 +262,11 @@ void HTTPSession::doReadHeader() { void HTTPSession::tryReconnection() { if (on_connection_closed_callback_(socket_)) { if (state_ == ConnectorState::CONNECTED) { - TRANSPORT_LOG_ERROR << "Connection lost. Trying to reconnect..."; + LoggerErr() << "Connection lost. Trying to reconnect..."; state_ = ConnectorState::CONNECTING; is_reconnection_ = true; io_service_.post([this]() { if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); } startConnectionTimer(); @@ -287,7 +279,7 @@ void HTTPSession::tryReconnection() { void HTTPSession::doConnect() { asio::async_connect( socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { + [this](const std::error_code &ec, tcp::resolver::iterator) { if (!ec) { timer_.cancel(); state_ = ConnectorState::CONNECTED; @@ -295,8 +287,6 @@ void HTTPSession::doConnect() { asio::ip::tcp::no_delay noDelayOption(true); socket_.set_option(noDelayOption); - // on_reconnect_callback_(); - doReadHeader(); if (data_available_ && !write_msgs_.empty()) { @@ -306,11 +296,11 @@ void HTTPSession::doConnect() { if (is_reconnection_) { is_reconnection_ = false; - TRANSPORT_LOG_INFO << "Connection recovered!"; + LoggerInfo() << "Connection recovered!"; } } else { - TRANSPORT_LOG_ERROR << "Impossible to reconnect: " << ec.message(); + LoggerErr() << "Impossible to reconnect: " << ec.message(); close(); } }); @@ -330,7 +320,7 @@ void HTTPSession::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); - TRANSPORT_LOG_ERROR << "Error connecting. Is the server running?"; + LoggerErr() << "Error connecting. Is the server running?"; io_service_.stop(); }); } diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index ea8ac7191..c8904aa95 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,13 +13,13 @@ * limitations under the License. */ +#include <hicn/apps/utils/logger.h> #include <hicn/http-proxy/http_1x_message_fast_parser.h> #include <hicn/http-proxy/icn_receiver.h> #include <hicn/http-proxy/utils.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> #include <hicn/transport/utils/hash.h> -#include <hicn/transport/utils/log.h> #include <functional> #include <memory> @@ -33,18 +33,15 @@ AsyncConsumerProducer::AsyncConsumerProducer( const std::string& mtu, const std::string& content_lifetime, bool manifest) : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), io_service_(io_service), - external_io_service_(true), - producer_socket_(), ip_address_(origin_address), port_(origin_port), - cache_size_(std::stoul(cache_size)), - mtu_(std::stoul(mtu)), - request_counter_(0), + cache_size_((uint32_t)std::stoul(cache_size)), + mtu_((uint32_t)std::stoul(mtu)), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), - [this](asio::ip::tcp::socket& socket) -> bool { + [this]([[maybe_unused]] const asio::ip::tcp::socket& socket) { std::queue<interface::PublicationOptions> empty; std::swap(response_name_queue_, empty); @@ -55,28 +52,28 @@ AsyncConsumerProducer::AsyncConsumerProducer( interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOG_WARNING << "Warning: output buffer size has not been set."; + LoggerWarn() << "Warning: output buffer size has not been set."; } ret = producer_socket_.setSocketOption( - interface::GeneralTransportOptions::MAKE_MANIFEST, manifest); + interface::GeneralTransportOptions::MANIFEST_MAX_CAPACITY, manifest); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOG_WARNING << "Warning: impossible to enable signatures."; + LoggerWarn() << "Warning: impossible to enable signatures."; } ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); if (ret != SOCKET_OPTION_SET) { - TRANSPORT_LOG_WARNING << "Warning: mtu has not been set."; + LoggerWarn() << "Warning: mtu has not been set."; } producer_socket_.registerPrefix(prefix_); } void AsyncConsumerProducer::start() { - TRANSPORT_LOG_INFO << "Starting listening"; + LoggerInfo() << "Starting listening"; doReceive(); } @@ -90,8 +87,8 @@ void AsyncConsumerProducer::run() { void AsyncConsumerProducer::stop() { io_service_.post([this]() { - TRANSPORT_LOG_INFO << "Number of requests processed by plugin: " - << request_counter_; + LoggerInfo() << "Number of requests processed by plugin: " + << request_counter_; producer_socket_.stop(); connector_.close(); }); @@ -100,7 +97,7 @@ void AsyncConsumerProducer::stop() { void AsyncConsumerProducer::doReceive() { producer_socket_.setSocketOption( interface::ProducerCallbacksOptions::CACHE_MISS, - [this](interface::ProducerSocket& producer, + [this]([[maybe_unused]] const interface::ProducerSocket& producer, interface::Interest& interest) { if (interest.payloadSize() > 0) { // Interest may contain http request @@ -112,6 +109,7 @@ void AsyncConsumerProducer::doReceive() { }); producer_socket_.connect(); + producer_socket_.start(); } void AsyncConsumerProducer::manageIncomingInterest( @@ -128,9 +126,9 @@ void AsyncConsumerProducer::manageIncomingInterest( if (seg >= _it->second.first) { // TRANSPORT_LOGD( - // "Ignoring interest with name %s for a content object which does not " - // "exist. (Request: %u, max: %u)", - // name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); + // "Ignoring interest with name %s for a content object which does not + // " "exist. (Request: %u, max: %u)", name.toString().c_str(), + // (uint32_t)seg, (uint32_t)_it->second.first); return; } } @@ -148,7 +146,8 @@ void AsyncConsumerProducer::manageIncomingInterest( response_name_queue_.emplace(std::move(name), is_mpd ? 1000 : default_content_lifetime_); - connector_.send(payload, [packet = std::move(packet)]() {}); + connector_.send(payload, + [packet = std::move(packet)]() { /*nothing to do*/ }); } void AsyncConsumerProducer::publishContent(const uint8_t* data, @@ -157,26 +156,25 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, uint32_t start_suffix = 0; if (response_name_queue_.empty()) { - std::cerr << "Aborting due tue empty request queue" << std::endl; + LoggerErr() << "Aborting due tue empty request queue"; abort(); } - interface::PublicationOptions& options = response_name_queue_.front(); + const interface::PublicationOptions& options = response_name_queue_.front(); int ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, options.getLifetime()); if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { - TRANSPORT_LOG_WARNING << "Warning: content object lifetime has not been set."; + LoggerWarn() << "Warning: content object lifetime has not been set."; } const interface::Name& name = options.getName(); auto it = chunk_number_map_.find(name); if (it == chunk_number_map_.end()) { - std::cerr << "Aborting due to response not found in ResposeInfo map." - << std::endl; + LoggerErr() << "Aborting due to response not found in ResposeInfo map."; abort(); } |