diff options
Diffstat (limited to 'apps/http-proxy/src')
-rw-r--r-- | apps/http-proxy/src/ATSConnector.h | 109 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.h | 40 | ||||
-rw-r--r-- | apps/http-proxy/src/IcnReceiver.h | 81 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_interface.cc | 263 | ||||
-rw-r--r-- | apps/http-proxy/src/http_1x_message_fast_parser.cc (renamed from apps/http-proxy/src/HTTP1.xMessageFastParser.cc) | 47 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.cc | 376 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.cc (renamed from apps/http-proxy/src/ATSConnector.cc) | 166 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.cc (renamed from apps/http-proxy/src/IcnReceiver.cc) | 103 |
8 files changed, 822 insertions, 363 deletions
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h deleted file mode 100644 index 8d91b7b7b..000000000 --- a/apps/http-proxy/src/ATSConnector.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/core/packet.h> - -#define ASIO_STANDALONE -#include <asio.hpp> -#include <deque> -#include <functional> - -namespace transport { - -using asio::ip::tcp; - -typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last, - bool headers)> - ContentReceivedCallback; -typedef std::function<void()> OnReconnect; -typedef std::function<void()> ContentSentCallback; -typedef std::deque< - std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>> - BufferQueue; - -class ATSConnector { - static constexpr uint32_t buffer_size = 1024 * 512; - - enum class ConnectorState { - CLOSED, - CONNECTING, - CONNECTED, - }; - - public: - ATSConnector(asio::io_service &io_service, std::string &ip_address, - std::string &port, ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback); - - ~ATSConnector(); - - void send(const uint8_t *buffer, std::size_t len, - ContentSentCallback &&content_sent = 0); - - void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); - - void close(); - - private: - void doConnect(); - - void doReadHeader(); - - void doReadBody(std::size_t body_size, std::size_t additional_bytes); - - // void handleReadChunked(std::error_code ec, std::size_t length, - // std::size_t size); - - void doReadChunkedHeader(); - - void doWrite(); - - bool checkConnected(); - - private: - void handleRead(std::error_code ec, std::size_t length); - void tryReconnection(); - void startConnectionTimer(); - void handleDeadline(const std::error_code &ec); - - asio::io_service &io_service_; - asio::ip::tcp::socket socket_; - asio::ip::tcp::resolver resolver_; - asio::ip::tcp::resolver::iterator endpoint_iterator_; - asio::steady_timer timer_; - - BufferQueue write_msgs_; - - asio::streambuf input_buffer_; - - bool is_reconnection_; - bool data_available_; - - std::size_t content_length_; - - // Chunked encoding - bool is_last_chunk_; - bool chunked_; - - ContentReceivedCallback receive_callback_; - OnReconnect on_reconnect_callback_; - - // Connector state - ConnectorState state_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h deleted file mode 100644 index 79dbce19d..000000000 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <algorithm> -#include <string> - -#include <hicn/transport/http/message.h> - -using transport::http::HTTPHeaders; - -class HTTPMessageFastParser { - public: - static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); - static std::size_t hasBody(const uint8_t* headers, std::size_t length); - static bool isMpdRequest(const uint8_t* headers, std::size_t length); - static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); - - static std::string numbers; - static std::string content_length; - static std::string transfer_encoding; - static std::string chunked; - static std::string cache_control; - static std::string connection; - static std::string mpd; - static std::string separator; -}; diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/IcnReceiver.h deleted file mode 100644 index 9d0ab5172..000000000 --- a/apps/http-proxy/src/IcnReceiver.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "ATSConnector.h" - -#include <hicn/transport/core/prefix.h> -#include <hicn/transport/interfaces/publication_options.h> -#include <hicn/transport/interfaces/socket_producer.h> -#include <hicn/transport/utils/spinlock.h> - -#include <cassert> -#include <cstring> -#include <queue> -#include <utility> - -namespace transport { - -class AsyncConsumerProducer { - using SegmentProductionPair = std::pair<uint32_t, bool>; - using ResponseInfoMap = std::unordered_map<core::Name, SegmentProductionPair>; - using RequestQueue = std::queue<interface::PublicationOptions>; - - public: - explicit AsyncConsumerProducer(const std::string& prefix, - std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, - std::string& first_ipv6_word, - unsigned long default_content_lifetime, bool manifest); - - void start(); - - void run(); - - private: - void doSend(); - - void doReceive(); - - void publishContent(const uint8_t* data, std::size_t size, - bool is_last = true, bool headers = false); - - void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, - utils::MemBuf* payload); - - core::Prefix prefix_; - asio::io_service io_service_; - interface::ProducerSocket producer_socket_; - - std::string ip_address_; - std::string port_; - uint32_t cache_size_; - uint32_t mtu_; - - uint64_t request_counter_; - asio::signal_set signals_; - - // std::unordered_map<core::Name, std::shared_ptr<ATSConnector>> - // connection_map_; - ATSConnector connector_; - - unsigned long default_content_lifetime_; - - // ResponseInfoMap --> max_seq_number + bool indicating whether request is in - // production - ResponseInfoMap chunk_number_map_; - RequestQueue response_name_queue_; -}; - -} // namespace transport diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc new file mode 100644 index 000000000..7d8235ac6 --- /dev/null +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <arpa/inet.h> +#include <hicn/http-proxy/forwarder_interface.h> +#include <hicn/transport/utils/log.h> + +#include <chrono> +#include <iostream> +#include <thread> +#include <unordered_set> + +namespace transport { + +ForwarderInterface::~ForwarderInterface() {} + +int ForwarderInterface::connectToForwarder() { + sock_ = hc_sock_create(); + if (!sock_) return -1; + + if (hc_sock_connect(sock_) < 0) { + hc_sock_free(sock_); + sock_ = nullptr; + return -1; + } + + return 0; +} + +void ForwarderInterface::close() { + if (!closed_) { + internal_ioservice_.post([this]() { + work_.reset(); + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + }); + + if (thread_->joinable()) { + thread_->join(); + } + } +} + +void ForwarderInterface::removeConnectedUserNow(uint32_t route_id) { + internalRemoveConnectedUser(route_id); +} + +void ForwarderInterface::scheduleRemoveConnectedUser(uint32_t route_id) { + internal_ioservice_.post( + [this, route_id]() { internalRemoveConnectedUser(route_id); }); +} + +int32_t ForwarderInterface::getMainListenerPort() { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) return -1; + + int ret = -1; + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + ret = l->local_port; + break; + } + } + + hc_data_free(data); + return ret; +} + +void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { + auto it = route_status_.find(route_id); + if (it == route_status_.end()) return; + + if (!sock_) return; + + // remove route + hc_data_t *data; + if (hc_route_list(sock_, &data) < 0) return; + + std::vector<hc_route_t *> routes_to_remove; + foreach_route(r, data) { + char remote_addr[INET6_ADDRSTRLEN]; + int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + if (ret < 0) continue; + + std::string route_addr(remote_addr); + if (route_addr.compare(it->second->route_addr) == 0 && + r->len == it->second->route_len) { + // route found + routes_to_remove.push_back(r); + } + } + + route_status_.erase(it); + + if (routes_to_remove.size() == 0) { + // nothing to do here + hc_data_free(data); + return; + } + + std::unordered_set<uint32_t> connids_to_remove; + for (unsigned i = 0; i < routes_to_remove.size(); i++) { + connids_to_remove.insert(routes_to_remove[i]->face_id); + if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { + TRANSPORT_LOGE("Error removing route from forwarder."); + } + } + + // remove connection + if (hc_connection_list(sock_, &data) < 0) { + hc_data_free(data); + return; + } + + // collects pointerst to the connections using the conn IDs + std::vector<hc_connection_t *> conns_to_remove; + foreach_connection(c, data) { + if (connids_to_remove.find(c->id) != connids_to_remove.end()) { + // conn found + conns_to_remove.push_back(c); + } + } + + if (conns_to_remove.size() == 0) { + // nothing else to do here + hc_data_free(data); + return; + } + + for (unsigned i = 0; i < conns_to_remove.size(); i++) { + if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { + TRANSPORT_LOGE("Error removing connection from forwarder."); + } + } + + hc_data_free(data); +} + +void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, + uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback) { + int ret = tryToCreateFaceAndRoute(route_info.get()); + + if (ret < 0 && max_try > 0) { + max_try--; + timer->expires_from_now(std::chrono::milliseconds(500)); + timer->async_wait([this, _route_info = std::move(route_info), max_try, + timer, callback](std::error_code ec) { + if (ec) return; + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(callback)); + }); + return; + } + + if (max_try == 0 && ret < 0) { + pending_add_route_counter_--; + external_ioservice_.post([callback]() { callback(false, ~0); }); + } else { + pending_add_route_counter_--; + route_status_[route_id_] = std::move(route_info); + external_ioservice_.post( + [route_id = route_id_, callback]() { callback(route_id, true); }); + route_id_++; + } + + delete timer; +} + +int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) { + return -1; + } + + bool found = false; + uint32_t face_id; + + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + found = true; + + ip_address_t remote_ip; + if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) { + hc_data_free(data); + return -1; + } + + hc_face_t face; + memset(&face, 0, sizeof(hc_face_t)); + + face.face.type = FACE_TYPE_UDP; + face.face.family = route_info->family; + face.face.local_addr = l->local_addr; + face.face.remote_addr = remote_ip; + face.face.local_port = l->local_port; + face.face.remote_port = route_info->remote_port; + + if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) { + hc_data_free(data); + return -1; + } + + if (hc_face_create(sock_, &face) < 0) { + hc_data_free(data); + return -1; + } + + face_id = face.id; + break; + } + } + + if (!found) { + hc_data_free(data); + return -1; + } + + ip_address_t route_ip; + hc_route_t route; + + if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { + hc_data_free(data); + return -1; + } + + route.face_id = face_id; + route.family = AF_INET6; + route.remote_addr = route_ip; + route.len = route_info->route_len; + route.cost = 1; + + if (hc_route_create(sock_, &route) < 0) { + hc_data_free(data); + return -1; + } + + hc_data_free(data); + return 0; +} + +} // namespace transport diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/http_1x_message_fast_parser.cc index 729eb3aeb..4b6b78d55 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/http_1x_message_fast_parser.cc @@ -13,33 +13,48 @@ * limitations under the License. */ -#include "HTTP1.xMessageFastParser.h" - +#include <hicn/http-proxy/http_session.h> +#include <hicn/transport/http/request.h> #include <hicn/transport/http/response.h> #include <experimental/algorithm> #include <experimental/functional> #include <iostream> +constexpr char HTTPMessageFastParser::http_ok[]; +constexpr char HTTPMessageFastParser::http_cors[]; +constexpr char HTTPMessageFastParser::http_failed[]; + std::string HTTPMessageFastParser::numbers = "0123456789"; -std::string HTTPMessageFastParser::content_length = "Content-Length"; -std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding"; +std::string HTTPMessageFastParser::content_length = "content-length"; +std::string HTTPMessageFastParser::transfer_encoding = "transfer-encoding"; std::string HTTPMessageFastParser::chunked = "chunked"; -std::string HTTPMessageFastParser::cache_control = "Cache-Control"; +std::string HTTPMessageFastParser::cache_control = "cache-control"; std::string HTTPMessageFastParser::mpd = "mpd"; -std::string HTTPMessageFastParser::connection = "Connection"; +std::string HTTPMessageFastParser::connection = "connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; -HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, - std::size_t length) { - HTTPHeaders ret; - std::string http_version; - std::string status_code; - std::string status_string; - - if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, - status_code, status_string)) { - return ret; +void HTTPMessageFastParser::getHeaders(const uint8_t *headers, + std::size_t length, bool request, + transport::Metadata *metadata) { + if (request) { + transport::RequestMetadata *_metadata = + (transport::RequestMetadata *)(metadata); + + if (transport::http::HTTPRequest::parseHeaders( + headers, length, _metadata->headers, _metadata->http_version, + _metadata->method, _metadata->path)) { + return; + } + } else { + transport::ResponseMetadata *_metadata = + (transport::ResponseMetadata *)(metadata); + + if (transport::http::HTTPResponse::parseHeaders( + headers, length, _metadata->headers, _metadata->http_version, + _metadata->status_code, _metadata->status_string)) { + return; + } } throw std::runtime_error("Error parsing response headers."); diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc new file mode 100644 index 000000000..262fcb8e1 --- /dev/null +++ b/apps/http-proxy/src/http_proxy.cc @@ -0,0 +1,376 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/http-proxy/http_proxy.h> +#include <hicn/http-proxy/http_session.h> +#include <hicn/http-proxy/utils.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/string_utils.h> + +namespace transport { + +using core::Interest; +using core::Name; +using interface::ConsumerCallbacksOptions; +using interface::ConsumerInterestCallback; +using interface::ConsumerSocket; +using interface::TransportProtocolAlgorithms; + +class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { + public: + HTTPClientConnectionCallback(TcpReceiver& tcp_receiver, + utils::EventThread& thread) + : tcp_receiver_(tcp_receiver), + thread_(thread), + prefix_hash_(tcp_receiver_.prefix_hash_), + consumer_(TransportProtocolAlgorithms::RAAQM, thread_.getIoService()), + session_(nullptr), + current_size_(0) { + consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processLeavingInterest, this, + std::placeholders::_1, std::placeholders::_2)); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processInterestRetx, this, + std::placeholders::_1, std::placeholders::_2)); + consumer_.connect(); + } + + void stop() { session_->close(); } + + void setHttpSession(asio::ip::tcp::socket&& socket) { + session_ = std::make_unique<HTTPSession>( + std::move(socket), + std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5), + [this](asio::ip::tcp::socket& socket) -> bool { + try { + std::string remote_address = + socket.remote_endpoint().address().to_string(); + std::uint16_t remote_port = socket.remote_endpoint().port(); + TRANSPORT_LOGD("Client %s:%d disconnected.", remote_address.c_str(), + remote_port); + } catch (std::system_error& e) { + // Do nothing + } + + consumer_.stop(); + request_buffer_queue_.clear(); + tcp_receiver_.onClientDisconnect(this); + return false; + }); + + current_size_ = 0; + } + + private: + void consumeNextRequest() { + if (request_buffer_queue_.size() == 0) { + TRANSPORT_LOGD("No additional requests to process."); + return; + } + + auto& buffer = request_buffer_queue_.front().second; + uint64_t request_hash = + utils::hash::fnv64_buf(buffer.data(), buffer.size()); + + std::stringstream name; + name << prefix_hash_.substr(0, prefix_hash_.length() - 2); + + for (uint16_t* word = (uint16_t*)&request_hash; + std::size_t(word) < + (std::size_t(&request_hash) + sizeof(request_hash)); + word++) { + name << ":" << std::hex << *word; + } + + name << "|0"; + + // Non blocking consume :) + consumer_.consume(Name(name.str())); + } + + // tcp callbacks + + void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last, + bool headers, Metadata* metadata) { + if (headers) { + // Add the request to the request queue + RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata); + tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size), + _metadata->path); + if (TRANSPORT_EXPECT_FALSE( + _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) { + /** + * It seems this request is for us. + * Get hicn parameters. + */ + processClientRequest(_metadata); + return; + } + } else { + // Append payload chunk to last request added. Here we are assuming + // HTTP/1.1. + tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size)); + } + + current_size_ += size; + + if (is_last) { + TRANSPORT_LOGD("Request received: %s", + std::string((const char*)tmp_buffer_.first->data(), + tmp_buffer_.first->length()) + .c_str()); + if (current_size_ < 1400) { + request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); + } else { + TRANSPORT_LOGE("Ignoring client request due to size (%zu) > 1400.", + current_size_); + session_->close(); + current_size_ = 0; + return; + } + + if (!consumer_.isRunning()) { + TRANSPORT_LOGD( + "Consumer stopped, triggering consume from TCP session " + "handler.."); + consumeNextRequest(); + } + + current_size_ = 0; + } + } + + // hicn callbacks + + void processLeavingInterest(interface::ConsumerSocket& c, + const core::Interest& interest) { + if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) { + Interest& int2 = const_cast<Interest&>(interest); + int2.appendPayload(request_buffer_queue_.front().first->clone()); + } + } + + void processInterestRetx(interface::ConsumerSocket& c, + const core::Interest& interest) { + if (interest.payloadSize() == 0) { + Interest& int2 = const_cast<Interest&>(interest); + int2.appendPayload(request_buffer_queue_.front().first->clone()); + } + } + + bool isBufferMovable() noexcept { return true; } + void getReadBuffer(uint8_t** application_buffer, size_t* max_length) {} + void readDataAvailable(size_t length) noexcept {} + size_t maxBufferSize() const { return 64 * 1024; } + + void readBufferAvailable(std::unique_ptr<utils::MemBuf>&& buffer) noexcept { + // Response received. Send it back to client + auto _buffer = buffer.release(); + TRANSPORT_LOGD("From hicn: %zu bytes.", _buffer->length()); + session_->send(_buffer, []() {}); + } + + void readError(const std::error_code ec) noexcept { + TRANSPORT_LOGE("Error reading from hicn consumer socket. Closing session."); + session_->close(); + } + + void readSuccess(std::size_t total_size) noexcept { + request_buffer_queue_.pop_front(); + consumeNextRequest(); + } + + void processClientRequest(RequestMetadata* metadata) { + auto it = metadata->headers.find("hicn"); + if (it == metadata->headers.end()) { + /* + * Probably it is an OPTION message for access control. + * Let's grant it! + */ + if (metadata->method == "OPTIONS") { + session_->send( + (const uint8_t*)HTTPMessageFastParser::http_cors, + std::strlen(HTTPMessageFastParser::http_cors), [this]() { + auto& socket = session_->socket_; + TRANSPORT_LOGI( + "Sent OPTIONS to client %s:%d", + socket.remote_endpoint().address().to_string().c_str(), + socket.remote_endpoint().port()); + }); + } + } else { + tcp_receiver_.parseHicnHeader( + it->second, [this](bool result, std::string configured_prefix) { + const char* reply = nullptr; + if (result) { + reply = HTTPMessageFastParser::http_ok; + } else { + reply = HTTPMessageFastParser::http_failed; + } + + /* Route created. Send back a 200 OK to client */ + session_->send( + (const uint8_t*)reply, std::strlen(reply), [this, result]() { + auto& socket = session_->socket_; + TRANSPORT_LOGI( + "Sent %d response to client %s:%d", result, + socket.remote_endpoint().address().to_string().c_str(), + socket.remote_endpoint().port()); + }); + }); + } + } + + private: + TcpReceiver& tcp_receiver_; + utils::EventThread& thread_; + std::string& prefix_hash_; + ConsumerSocket consumer_; + std::unique_ptr<HTTPSession> session_; + std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>> + request_buffer_queue_; + std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_; + std::size_t current_size_; +}; + +TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, + const std::string& ipv6_first_word) + : Receiver(), + listener_(thread_.getIoService(), port, + std::bind(&TcpReceiver::onNewConnection, this, + std::placeholders::_1)), + prefix_(prefix), + ipv6_first_word_(ipv6_first_word), + prefix_hash_(generatePrefix(prefix_, ipv6_first_word_)), + forwarder_config_( + thread_.getIoService(), + [this](std::error_code ec) { + if (!ec) { + listener_.doAccept(); + for (int i = 0; i < 10; i++) { + http_clients_.emplace_back( + new HTTPClientConnectionCallback(*this, thread_)); + } + } + }), + stopped_(false) { + forwarder_config_.tryToConnectToForwarder(); +} + +void TcpReceiver::stop() { + thread_.add([this]() { + stopped_ = true; + + /* Stop the listener */ + listener_.stop(); + + /* Close connection with forwarder */ + forwarder_config_.close(); + + /* Stop the used http clients */ + for (auto& client : used_http_clients_) { + client->stop(); + } + + /* Delete unused clients */ + for (auto& client : http_clients_) { + delete client; + } + }); +} + +void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { + if (stopped_) { + delete client; + return; + } + + http_clients_.emplace_front(client); + used_http_clients_.erase(client); +} + +void TcpReceiver::onNewConnection(asio::ip::tcp::socket&& socket) { + if (http_clients_.size() == 0) { + // Create new HTTPClientConnectionCallback + TRANSPORT_LOGD("Creating new HTTPClientConnectionCallback."); + http_clients_.emplace_back( + new HTTPClientConnectionCallback(*this, thread_)); + } + + // Get new HTTPClientConnectionCallback + HTTPClientConnectionCallback* c = http_clients_.front(); + http_clients_.pop_front(); + + // Set http session + c->setHttpSession(std::move(socket)); + + // Move it to used clients + used_http_clients_.insert(c); +} + +void HTTPProxy::setupSignalHandler() { + signals_.async_wait([this](const std::error_code& ec, int signal_number) { + if (!ec) { + TRANSPORT_LOGI("Received signal %d. Stopping gracefully.", signal_number); + stop(); + } + }); +} + +void HTTPProxy::stop() { + for (auto& receiver : receivers_) { + receiver->stop(); + } + + for (auto& receiver : receivers_) { + receiver->stopAndJoinThread(); + } + + signals_.cancel(); +} + +HTTPProxy::HTTPProxy(ClientParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { + for (uint16_t i = 0; i < n_thread; i++) { + // icn_receivers_.emplace_back(std::make_unique<IcnReceiver>(icn_params)); + receivers_.emplace_back(std::make_unique<TcpReceiver>( + params.tcp_listen_port, params.prefix, params.first_ipv6_word)); + } + + setupSignalHandler(); +} + +HTTPProxy::HTTPProxy(ServerParams& params, std::size_t n_thread) + : signals_(main_io_context_, SIGINT, SIGQUIT) { + for (uint16_t i = 0; i < n_thread; i++) { + receivers_.emplace_back(std::make_unique<IcnReceiver>( + params.prefix, params.first_ipv6_word, params.origin_address, + params.origin_port, params.cache_size, params.mtu, + params.content_lifetime, params.manifest)); + } + + setupSignalHandler(); +} + +} // namespace transport diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/http_session.cc index a9b889941..6b91c12c3 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/http_session.cc @@ -13,48 +13,95 @@ * limitations under the License. */ -#include "ATSConnector.h" -#include "HTTP1.xMessageFastParser.h" - +#include <hicn/http-proxy/http_proxy.h> #include <hicn/transport/utils/branch_prediction.h> #include <hicn/transport/utils/log.h> + #include <iostream> namespace transport { -ATSConnector::ATSConnector(asio::io_service &io_service, - std::string &ip_address, std::string &port, - ContentReceivedCallback receive_callback, - OnReconnect on_reconnect_callback) +HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool client) : io_service_(io_service), socket_(io_service_), resolver_(io_service_), endpoint_iterator_(resolver_.resolve({ip_address, port})), timer_(io_service), + reverse_(client), is_reconnection_(false), data_available_(false), content_length_(0), is_last_chunk_(false), chunked_(false), receive_callback_(receive_callback), - on_reconnect_callback_(on_reconnect_callback) { + on_connection_closed_callback_(on_connection_closed_callback) { input_buffer_.prepare(buffer_size + 2048); state_ = ConnectorState::CONNECTING; + + if (reverse_) { + header_info_ = std::make_unique<RequestMetadata>(); + } else { + header_info_ = std::make_unique<ResponseMetadata>(); + } + doConnect(); } -ATSConnector::~ATSConnector() {} +HTTPSession::HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool client) + : +#if ((ASIO_VERSION / 100 % 1000) < 12) + io_service_(socket.get_io_service()), +#else + io_service_((asio::io_context &)(socket.get_executor().context())), +#endif + socket_(std::move(socket)), + resolver_(io_service_), + timer_(io_service_), + reverse_(client), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); -void ATSConnector::send(const uint8_t *packet, std::size_t len, - ContentSentCallback &&content_sent) { - asio::async_write( - socket_, asio::buffer(packet, len), - [content_sent = std::move(content_sent)]( - std::error_code ec, std::size_t /*length*/) { content_sent(); }); + if (reverse_) { + header_info_ = std::make_unique<RequestMetadata>(); + } else { + header_info_ = std::make_unique<ResponseMetadata>(); + } + doReadHeader(); } -void ATSConnector::send(utils::MemBuf *buffer, - ContentSentCallback &&content_sent) { +HTTPSession::~HTTPSession() {} + +void HTTPSession::send(const uint8_t *packet, std::size_t len, + ContentSentCallback &&content_sent) { + io_service_.dispatch([this, packet, len, content_sent]() { + asio::async_write(socket_, asio::buffer(packet, len), + [content_sent = std::move(content_sent)]( + std::error_code ec, std::size_t /*length*/) { + if (!ec) { + content_sent(); + } + }); + }); +} + +void HTTPSession::send(utils::MemBuf *buffer, + ContentSentCallback &&content_sent) { io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { bool write_in_progress = !write_msgs_.empty(); write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer), @@ -70,24 +117,25 @@ void ATSConnector::send(utils::MemBuf *buffer, }); } -void ATSConnector::close() { +void HTTPSession::close() { if (state_ != ConnectorState::CLOSED) { state_ = ConnectorState::CLOSED; if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); socket_.close(); // on_disconnect_callback_(); } } } -void ATSConnector::doWrite() { +void HTTPSession::doWrite() { auto &buffer = write_msgs_.front().first; asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_FALSE(!ec)) { - TRANSPORT_LOGD("Content successfully sent!"); + TRANSPORT_LOGD("Content successfully sent! %zu", + length); write_msgs_.front().second(); write_msgs_.pop_front(); if (!write_msgs_.empty()) { @@ -99,12 +147,14 @@ void ATSConnector::doWrite() { }); } // namespace transport -void ATSConnector::handleRead(std::error_code ec, std::size_t length) { +void HTTPSession::handleRead(std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !content_length_, false); + bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) + : !content_length_; + receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr); input_buffer_.consume(input_buffer_.size()); if (!content_length_) { @@ -117,7 +167,7 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) { auto to_read = content_length_ >= buffer_size ? buffer_size : content_length_; asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, + std::bind(&HTTPSession::handleRead, this, std::placeholders::_1, std::placeholders::_2)); } } else if (ec == asio::error::eof) { @@ -126,8 +176,8 @@ void ATSConnector::handleRead(std::error_code ec, std::size_t length) { } } -void ATSConnector::doReadBody(std::size_t body_size, - std::size_t additional_bytes) { +void HTTPSession::doReadBody(std::size_t body_size, + std::size_t additional_bytes) { auto bytes_to_read = body_size > additional_bytes ? (body_size - additional_bytes) : 0; @@ -140,14 +190,16 @@ void ATSConnector::doReadBody(std::size_t body_size, if (to_read > 0) { content_length_ = bytes_to_read; asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, + std::bind(&HTTPSession::handleRead, this, std::placeholders::_1, std::placeholders::_2)); } else { - const uint8_t *buffer = - asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, - false); - input_buffer_.consume(body_size); + if (body_size) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false, nullptr); + input_buffer_.consume(body_size); + } if (!chunked_ || is_last_chunk_) { doReadHeader(); @@ -157,7 +209,7 @@ void ATSConnector::doReadBody(std::size_t body_size, } } -void ATSConnector::doReadChunkedHeader() { +void HTTPSession::doReadChunkedHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n", [this](std::error_code ec, std::size_t length) { @@ -176,14 +228,17 @@ void ATSConnector::doReadChunkedHeader() { }); } -void ATSConnector::doReadHeader() { +void HTTPSession::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", [this](std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - auto headers = HTTPMessageFastParser::getHeaders(buffer, length); + HTTPMessageFastParser::getHeaders(buffer, length, reverse_, + header_info_.get()); + + auto &headers = header_info_->headers; // Try to get content length, if available auto it = headers.find(HTTPMessageFastParser::content_length); @@ -199,7 +254,8 @@ void ATSConnector::doReadHeader() { } } - receive_callback_(buffer, length, !size && !chunked_, true); + receive_callback_(buffer, length, !size && !chunked_, true, + header_info_.get()); auto additional_bytes = input_buffer_.size() - length; input_buffer_.consume(length); @@ -215,23 +271,25 @@ void ATSConnector::doReadHeader() { }); } -void ATSConnector::tryReconnection() { - TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); - if (state_ == ConnectorState::CONNECTED) { - state_ = ConnectorState::CONNECTING; - is_reconnection_ = true; - io_service_.post([this]() { - if (socket_.is_open()) { - // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - startConnectionTimer(); - doConnect(); - }); +void HTTPSession::tryReconnection() { + if (on_connection_closed_callback_(socket_)) { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } } } -void ATSConnector::doConnect() { +void HTTPSession::doConnect() { asio::async_connect(socket_, endpoint_iterator_, [this](std::error_code ec, tcp::resolver::iterator) { if (!ec) { @@ -263,17 +321,17 @@ void ATSConnector::doConnect() { }); } -bool ATSConnector::checkConnected() { +bool HTTPSession::checkConnected() { return state_ == ConnectorState::CONNECTED; } -void ATSConnector::startConnectionTimer() { +void HTTPSession::startConnectionTimer() { timer_.expires_from_now(std::chrono::seconds(10)); timer_.async_wait( - std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1)); + std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1)); } -void ATSConnector::handleDeadline(const std::error_code &ec) { +void HTTPSession::handleDeadline(const std::error_code &ec) { if (!ec) { io_service_.post([this]() { socket_.close(); diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/icn_receiver.cc index 24b0eb5dc..23e5b5623 100644 --- a/apps/http-proxy/src/IcnReceiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -13,9 +13,9 @@ * limitations under the License. */ -#include "IcnReceiver.h" -#include "HTTP1.xMessageFastParser.h" - +#include <hicn/http-proxy/http_1x_message_fast_parser.h> +#include <hicn/http-proxy/icn_receiver.h> +#include <hicn/http-proxy/utils.h> #include <hicn/transport/core/interest.h> #include <hicn/transport/http/default_values.h> #include <hicn/transport/utils/hash.h> @@ -26,56 +26,31 @@ namespace transport { -core::Prefix generatePrefix(const std::string& prefix_url, - std::string& first_ipv6_word) { - const char* str = prefix_url.c_str(); - uint16_t pos = 0; - - if (strncmp("http://", str, 7) == 0) { - pos = 7; - } else if (strncmp("https://", str, 8) == 0) { - pos = 8; - } - - str += pos; - - uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); - - std::stringstream stream; - stream << first_ipv6_word << ":0"; - - for (uint16_t* word = (uint16_t*)&locator_hash; - std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); - word++) { - stream << ":" << std::hex << *word; - } - - stream << "::0"; - - return core::Prefix(stream.str(), 64); -} - AsyncConsumerProducer::AsyncConsumerProducer( - const std::string& prefix, std::string& ip_address, std::string& port, - std::string& cache_size, std::string& mtu, std::string& first_ipv6_word, - unsigned long default_lifetime, bool manifest) - : prefix_(generatePrefix(prefix, first_ipv6_word)), + asio::io_service& io_service, const std::string& prefix, + const std::string& first_ipv6_word, const std::string& origin_address, + const std::string& origin_port, const std::string& cache_size, + const std::string& mtu, const std::string& content_lifetime, bool manifest) + : prefix_(core::Prefix(generatePrefix(prefix, first_ipv6_word), 64)), + io_service_(io_service), + external_io_service_(true), producer_socket_(), - ip_address_(ip_address), - port_(port), + ip_address_(origin_address), + port_(origin_port), cache_size_(std::stoul(cache_size)), mtu_(std::stoul(mtu)), request_counter_(0), - signals_(io_service_, SIGINT, SIGQUIT), connector_(io_service_, ip_address_, port_, std::bind(&AsyncConsumerProducer::publishContent, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4), - [this]() { + [this](asio::ip::tcp::socket& socket) -> bool { std::queue<interface::PublicationOptions> empty; std::swap(response_name_queue_, empty); + + return true; }), - default_content_lifetime_(default_lifetime) { + default_content_lifetime_(std::stoul(content_lifetime)) { int ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); @@ -98,15 +73,6 @@ AsyncConsumerProducer::AsyncConsumerProducer( } producer_socket_.registerPrefix(prefix_); - - // Let the main thread to catch SIGINT and SIGQUIT - signals_.async_wait( - [this](const std::error_code& errorCode, int signal_number) { - TRANSPORT_LOGI("Number of requests processed by plugin: %lu", - (unsigned long)request_counter_); - producer_socket_.stop(); - connector_.close(); - }); } void AsyncConsumerProducer::start() { @@ -116,7 +82,19 @@ void AsyncConsumerProducer::start() { void AsyncConsumerProducer::run() { start(); - io_service_.run(); + + if (!external_io_service_) { + io_service_.run(); + } +} + +void AsyncConsumerProducer::stop() { + io_service_.post([this]() { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); } void AsyncConsumerProducer::doReceive() { @@ -124,11 +102,13 @@ void AsyncConsumerProducer::doReceive() { interface::ProducerCallbacksOptions::CACHE_MISS, [this](interface::ProducerSocket& producer, interface::Interest& interest) { - // core::Name n(interest.getWritableName(), true); - io_service_.post(std::bind( - &AsyncConsumerProducer::manageIncomingInterest, this, - interest.getWritableName(), interest.acquireMemBufReference(), - interest.getPayload().release())); + if (interest.payloadSize() > 0) { + // Interest may contain http request + io_service_.post(std::bind( + &AsyncConsumerProducer::manageIncomingInterest, this, + interest.getWritableName(), interest.acquireMemBufReference(), + interest.getPayload().release())); + } }); producer_socket_.connect(); @@ -141,16 +121,15 @@ void AsyncConsumerProducer::manageIncomingInterest( auto _it = chunk_number_map_.find(name); auto _end = chunk_number_map_.end(); - std::cout << "Received interest " << seg << std::endl; - if (_it != _end) { if (_it->second.second) { - // Content is in production + TRANSPORT_LOGD( + "Content is in production, interests will be satisfied shortly."); return; } if (seg >= _it->second.first) { - TRANSPORT_LOGI( + TRANSPORT_LOGD( "Ignoring interest with name %s for a content object which does not " "exist. (Request: %u, max: %u)", name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); @@ -158,8 +137,6 @@ void AsyncConsumerProducer::manageIncomingInterest( } } - std::cout << "Received interest " << seg << std::endl; - bool is_mpd = HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); @@ -212,7 +189,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, } it->second.first += - producer_socket_.produce(name, data, size, is_last, start_suffix); + producer_socket_.produceStream(name, data, size, is_last, start_suffix); if (is_last) { it->second.second = false; |