diff options
Diffstat (limited to 'apps/http-proxy')
-rw-r--r-- | apps/http-proxy/CMakeLists.txt | 16 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 43 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.h | 30 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_config.h | 191 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_interface.cc | 262 | ||||
-rw-r--r-- | apps/http-proxy/src/forwarder_interface.h | 102 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.cc | 115 | ||||
-rw-r--r-- | apps/http-proxy/src/http_proxy.h | 15 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.cc | 49 | ||||
-rw-r--r-- | apps/http-proxy/src/http_session.h | 31 | ||||
-rw-r--r-- | apps/http-proxy/src/icn_receiver.cc | 14 |
11 files changed, 792 insertions, 76 deletions
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt index 13839dbf7..44f6deea6 100644 --- a/apps/http-proxy/CMakeLists.txt +++ b/apps/http-proxy/CMakeLists.txt @@ -24,17 +24,12 @@ if (NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "Release") endif () -include_directories( - SYSTEM - ${CMAKE_BINARY_DIR} - ${LIB${TRANSPORT_LIBRARY}_INCLUDE_DIR} -) - set(LIB_SOURCE_FILES src/http_session.cc src/http_proxy.cc src/HTTP1.xMessageFastParser.cc src/icn_receiver.cc + src/forwarder_interface.cc ) set(LIB_SERVER_HEADER_FILES @@ -42,6 +37,8 @@ set(LIB_SERVER_HEADER_FILES src/http_session.h src/http_proxy.h src/HTTP1.xMessageFastParser.h + src/forwarder_interface.h + src/forwarder_config.h ) set(APP_SOURCE_FILES @@ -51,14 +48,19 @@ set(APP_SOURCE_FILES set(LIBHTTP_PROXY hicnhttpproxy) set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static) +list(APPEND COMPILER_DEFINITIONS + -DWITH_POLICY +) + build_library(${LIBHTTP_PROXY} STATIC SOURCES ${LIB_SOURCE_FILES} LINK_LIBRARIES ${LIBRARIES} DEPENDS ${DEPENDENCIES} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} + INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS} COMPONENT ${HICN_APPS} LINK_FLAGS ${LINK_FLAGS} + DEFINITIONS ${COMPILER_DEFINITIONS} ) if (NOT DISABLE_EXECUTABLES) diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index d1271ebdf..ea942a463 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -22,36 +22,41 @@ #include <experimental/functional> #include <iostream> +#include "http_session.h" + +constexpr char HTTPMessageFastParser::http_ok[]; +constexpr char HTTPMessageFastParser::http_cors[]; +constexpr char HTTPMessageFastParser::http_failed[]; + std::string HTTPMessageFastParser::numbers = "0123456789"; -std::string HTTPMessageFastParser::content_length = "Content-Length"; -std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding"; +std::string HTTPMessageFastParser::content_length = "content-length"; +std::string HTTPMessageFastParser::transfer_encoding = "transfer-encoding"; std::string HTTPMessageFastParser::chunked = "chunked"; -std::string HTTPMessageFastParser::cache_control = "Cache-Control"; +std::string HTTPMessageFastParser::cache_control = "cache-control"; std::string HTTPMessageFastParser::mpd = "mpd"; -std::string HTTPMessageFastParser::connection = "Connection"; +std::string HTTPMessageFastParser::connection = "connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; -HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, - std::size_t length, - bool request) { - HTTPHeaders ret; - std::string http_version; - +void HTTPMessageFastParser::getHeaders(const uint8_t *headers, + std::size_t length, bool request, + transport::Metadata *metadata) { if (request) { - std::string method; - std::string url; + transport::RequestMetadata *_metadata = + (transport::RequestMetadata *)(metadata); - if (transport::http::HTTPRequest::parseHeaders(headers, length, ret, - http_version, method, url)) { - return ret; + if (transport::http::HTTPRequest::parseHeaders( + headers, length, _metadata->headers, _metadata->http_version, + _metadata->method, _metadata->path)) { + return; } } else { - std::string status_code; - std::string status_string; + transport::ResponseMetadata *_metadata = + (transport::ResponseMetadata *)(metadata); if (transport::http::HTTPResponse::parseHeaders( - headers, length, ret, http_version, status_code, status_string)) { - return ret; + headers, length, _metadata->headers, _metadata->http_version, + _metadata->status_code, _metadata->status_string)) { + return; } } diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h index 0ad38b9b6..7c035c83b 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -22,10 +22,36 @@ using transport::http::HTTPHeaders; +namespace transport { +struct Metadata; +} + class HTTPMessageFastParser { public: - static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length, - bool request); + static constexpr char http_ok[] = + "HTTP/1.1 200 OK\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n\r\n"; + + static constexpr char http_cors[] = + "HTTP/1.1 200 OK\r\n" + "Date: %s\r\n" + "Connection: close\r\n" + "Content-Length: 0\r\n" + "Access-Control-Allow-Origin: *\r\n" + "Access-Control-Allow-Methods: GET\r\n" + "Access-Control-Allow-Headers: hicn\r\n" + "Access-Control-Max-Age: 1800\r\n\r\n"; + + static constexpr char http_failed[] = + "HTTP/1.1 500 Internal Server Error\r\n" + "Date: %s\r\n" + "Content-Length: 0\r\nConnection: " + "close\r\n\r\n"; + + static void getHeaders(const uint8_t* headers, std::size_t length, + bool request, transport::Metadata* metadata); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); diff --git a/apps/http-proxy/src/forwarder_config.h b/apps/http-proxy/src/forwarder_config.h new file mode 100644 index 000000000..96f275c9f --- /dev/null +++ b/apps/http-proxy/src/forwarder_config.h @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/portability/c_portability.h> +#include <hicn/transport/utils/branch_prediction.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/string_utils.h> + +#include <asio.hpp> +#include <chrono> +#include <sstream> +#include <string> + +#include "forwarder_interface.h" + +#define RETRY_INTERVAL 300 + +namespace transport { + +static constexpr char server_header[] = "server"; +static constexpr char prefix_header[] = "prefix"; +static constexpr char port_header[] = "port"; + +using OnForwarderConfiguredCallback = std::function<void(bool)>; + +class ForwarderConfig { + public: + using ListenerRetrievedCallback = std::function<void(std::error_code)>; + + template <typename Callback> + ForwarderConfig(asio::io_service& io_service, Callback&& callback) + : forwarder_interface_(io_service), + resolver_(io_service), + retx_count_(0), + timer_(io_service), + hicn_listen_port_(~0), + listener_retrieved_callback_(std::forward<Callback>(callback)) {} + + void tryToConnectToForwarder() { + doTryToConnectToForwarder(std::make_error_code(std::errc(0))); + } + + void doTryToConnectToForwarder(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.connectToForwarder(); + if (ret < 0) { + // We were not able to connect to the local forwarder. Do not give up + // and retry. + TRANSPORT_LOGE("Could not connect to local forwarder. Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder, + this, std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + doGetMainListener(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGI("Timer for re-trying forwarder connection canceled."); + } + } + + void doGetMainListener(std::error_code ec) { + if (!ec) { + // ec == 0 --> timer expired + int ret = forwarder_interface_.getMainListenerPort(); + if (ret <= 0) { + // Since without the main listener of the forwarder the proxy cannot + // work, we can stop the program here until we get the listener port. + TRANSPORT_LOGE( + "Could not retrieve main listener port from the forwarder. " + "Retrying."); + + timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL)); + timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this, + std::placeholders::_1)); + } else { + timer_.cancel(); + retx_count_ = 0; + hicn_listen_port_ = uint16_t(ret); + listener_retrieved_callback_(std::make_error_code(std::errc(0))); + } + } else { + TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled."); + } + } + + template <typename Callback> + TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header, + Callback&& callback) { + std::stringstream ss(header); + route_info_t* ret = new route_info_t(); + std::string port_string; + + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + + if (TRANSPORT_EXPECT_FALSE(substr.empty())) { + continue; + } + + utils::trim(substr); + auto it = std::find_if(substr.begin(), substr.end(), + [](int ch) { return ch == '='; }); + if (it != std::end(substr)) { + auto key = std::string(substr.begin(), it); + auto value = std::string(it + 1, substr.end()); + + if (key == server_header) { + ret->remote_addr = value; + } else if (key == prefix_header) { + auto it = std::find_if(value.begin(), value.end(), + [](int ch) { return ch == '/'; }); + + if (it != std::end(value)) { + ret->route_addr = std::string(value.begin(), it); + ret->route_len = std::stoul(std::string(it + 1, value.end())); + } else { + return false; + } + } else if (key == port_header) { + ret->remote_port = std::stoul(value); + port_string = value; + } else { + // Header not recognized + return false; + } + } + } + + /* + * Resolve server address + */ + auto results = + resolver_.resolve({ret->remote_addr, port_string, + asio::ip::resolver_query_base::numeric_service}); + +#if ((ASIO_VERSION / 100 % 1000) < 12) + asio::ip::udp::resolver::iterator end; + auto& it = results; + while (it != end) { +#else + for (auto it = results.begin(); it != results.end(); + it++) { +#endif + if (it->endpoint().address().is_v4()) { + // Use this v4 address to configure the forwarder. + ret->remote_addr = it->endpoint().address().to_string(); + ret->family = AF_INET; + forwarder_interface_.createFaceAndRoute( + RouteInfoPtr(ret), + [callback = std::forward<Callback>(callback)]( + uint32_t route_id, bool result) { callback(result); }); + + return true; + } +#if ((ASIO_VERSION / 100 % 1000) < 12) + it++; +#endif + } + + return false; + } + + private: + ForwarderInterface forwarder_interface_; + asio::ip::udp::resolver resolver_; + std::uint32_t retx_count_; + asio::steady_timer timer_; + uint16_t hicn_listen_port_; + ListenerRetrievedCallback listener_retrieved_callback_; +}; // namespace transport + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc new file mode 100644 index 000000000..105d5a8e9 --- /dev/null +++ b/apps/http-proxy/src/forwarder_interface.cc @@ -0,0 +1,262 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "forwarder_interface.h" + +#include <arpa/inet.h> +#include <hicn/transport/utils/log.h> + +#include <chrono> +#include <iostream> +#include <thread> +#include <unordered_set> + +namespace transport { + +ForwarderInterface::~ForwarderInterface() {} + +int ForwarderInterface::connectToForwarder() { + sock_ = hc_sock_create(); + if (!sock_) return -1; + + if (hc_sock_connect(sock_) < 0) { + hc_sock_free(sock_); + sock_ = nullptr; + return -1; + } + + return 0; +} + +void ForwarderInterface::close() { + internal_ioservice_.post([this]() { + work_.reset(); + if (sock_) { + hc_sock_free(sock_); + sock_ = nullptr; + } + }); + + if (thread_->joinable()) { + thread_->join(); + } +} + +void ForwarderInterface::removeConnectedUserNow(uint32_t route_id) { + internalRemoveConnectedUser(route_id); +} + +void ForwarderInterface::scheduleRemoveConnectedUser(uint32_t route_id) { + internal_ioservice_.post( + [this, route_id]() { internalRemoveConnectedUser(route_id); }); +} + +int32_t ForwarderInterface::getMainListenerPort() { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) return -1; + + int ret = -1; + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + ret = l->local_port; + break; + } + } + + hc_data_free(data); + return ret; +} + +void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) { + auto it = route_status_.find(route_id); + if (it == route_status_.end()) return; + + if (!sock_) return; + + // remove route + hc_data_t *data; + if (hc_route_list(sock_, &data) < 0) return; + + std::vector<hc_route_t *> routes_to_remove; + foreach_route(r, data) { + char remote_addr[INET6_ADDRSTRLEN]; + int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family); + if (ret < 0) continue; + + std::string route_addr(remote_addr); + if (route_addr.compare(it->second->route_addr) == 0 && + r->len == it->second->route_len) { + // route found + routes_to_remove.push_back(r); + } + } + + route_status_.erase(it); + + if (routes_to_remove.size() == 0) { + // nothing to do here + hc_data_free(data); + return; + } + + std::unordered_set<uint32_t> connids_to_remove; + for (unsigned i = 0; i < routes_to_remove.size(); i++) { + connids_to_remove.insert(routes_to_remove[i]->face_id); + if (hc_route_delete(sock_, routes_to_remove[i]) < 0) { + TRANSPORT_LOGE("Error removing route from forwarder."); + } + } + + // remove connection + if (hc_connection_list(sock_, &data) < 0) { + hc_data_free(data); + return; + } + + // collects pointerst to the connections using the conn IDs + std::vector<hc_connection_t *> conns_to_remove; + foreach_connection(c, data) { + if (connids_to_remove.find(c->id) != connids_to_remove.end()) { + // conn found + conns_to_remove.push_back(c); + } + } + + if (conns_to_remove.size() == 0) { + // nothing else to do here + hc_data_free(data); + return; + } + + for (unsigned i = 0; i < conns_to_remove.size(); i++) { + if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) { + TRANSPORT_LOGE("Error removing connection from forwarder."); + } + } + + hc_data_free(data); +} + +void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info, + uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback) { + int ret = tryToCreateFaceAndRoute(route_info.get()); + + if (ret < 0 && max_try > 0) { + max_try--; + timer->expires_from_now(std::chrono::milliseconds(500)); + timer->async_wait([this, _route_info = std::move(route_info), max_try, + timer, callback](std::error_code ec) { + if (ec) return; + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(callback)); + }); + return; + } + + if (max_try == 0 && ret < 0) { + pending_add_route_counter_--; + external_ioservice_.post([callback]() { callback(false, ~0); }); + } else { + pending_add_route_counter_--; + route_status_[route_id_] = std::move(route_info); + external_ioservice_.post( + [route_id = route_id_, callback]() { callback(route_id, true); }); + route_id_++; + } + + delete timer; +} + +int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) { + if (!sock_) return -1; + + hc_data_t *data; + if (hc_listener_list(sock_, &data) < 0) { + return -1; + } + + bool found = false; + uint32_t face_id; + + foreach_listener(l, data) { + std::string interface = std::string(l->interface_name); + if (interface.compare("lo") != 0) { + found = true; + + ip_address_t remote_ip; + if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) { + hc_data_free(data); + return -1; + } + + hc_face_t face; + memset(&face, 0, sizeof(hc_face_t)); + + face.face.type = FACE_TYPE_UDP; + face.face.family = route_info->family; + face.face.local_addr = l->local_addr; + face.face.remote_addr = remote_ip; + face.face.local_port = l->local_port; + face.face.remote_port = route_info->remote_port; + + if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) { + hc_data_free(data); + return -1; + } + + if (hc_face_create(sock_, &face) < 0) { + hc_data_free(data); + return -1; + } + + face_id = face.id; + break; + } + } + + if (!found) { + hc_data_free(data); + return -1; + } + + ip_address_t route_ip; + hc_route_t route; + + if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) { + hc_data_free(data); + return -1; + } + + route.face_id = face_id; + route.family = AF_INET6; + route.remote_addr = route_ip; + route.len = route_info->route_len; + route.cost = 1; + + if (hc_route_create(sock_, &route) < 0) { + hc_data_free(data); + return -1; + } + + hc_data_free(data); + return 0; +} + +} // namespace transport diff --git a/apps/http-proxy/src/forwarder_interface.h b/apps/http-proxy/src/forwarder_interface.h new file mode 100644 index 000000000..ee0354d32 --- /dev/null +++ b/apps/http-proxy/src/forwarder_interface.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +extern "C" { +#include <hicn/ctrl/api.h> +#include <hicn/util/ip_address.h> +} + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <functional> +#include <thread> +#include <unordered_map> + +namespace transport { + +typedef std::function<void(uint32_t, bool)> SetRouteCallback; + +struct route_info_t { + int family; + std::string remote_addr; + uint16_t remote_port; + std::string route_addr; + uint8_t route_len; +}; + +using RouteInfoPtr = std::shared_ptr<route_info_t>; + +class ForwarderInterface { + public: + ForwarderInterface(asio::io_service &io_service) + : external_ioservice_(io_service), + work_(std::make_unique<asio::io_service::work>(internal_ioservice_)), + sock_(nullptr), + thread_(std::make_unique<std::thread>( + [this]() { internal_ioservice_.run(); })), + check_routes_timer_(nullptr), + pending_add_route_counter_(0), + route_id_(0) {} + + ~ForwarderInterface(); + + int connectToForwarder(); + + void removeConnectedUserNow(uint32_t route_id); + + // to be called at the server + // at the client this creates a race condition + // and the program enters in a loop + void scheduleRemoveConnectedUser(uint32_t route_id); + + template <typename Callback> + void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) { + internal_ioservice_.post([this, _route_info = std::move(route_info), + _callback = std::forward<Callback>(callback)]() { + pending_add_route_counter_++; + uint8_t max_try = 5; + auto timer = new asio::steady_timer(internal_ioservice_); + internalCreateFaceAndRoute(std::move(_route_info), max_try, timer, + std::move(_callback)); + }); + } + + int32_t getMainListenerPort(); + + void close(); + + private: + void internalRemoveConnectedUser(uint32_t route_id); + + void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try, + asio::steady_timer *timer, + SetRouteCallback callback); + + int tryToCreateFaceAndRoute(route_info_t *route_info); + + asio::io_service &external_ioservice_; + asio::io_service internal_ioservice_; + std::unique_ptr<asio::io_service::work> work_; + hc_sock_t *sock_; + std::unique_ptr<std::thread> thread_; + std::unordered_map<uint32_t, RouteInfoPtr> route_status_; + std::unique_ptr<asio::steady_timer> check_routes_timer_; + uint32_t pending_add_route_counter_; + uint32_t route_id_; +}; + +} // namespace transport
\ No newline at end of file diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc index 18e9bf727..98720d8d2 100644 --- a/apps/http-proxy/src/http_proxy.cc +++ b/apps/http-proxy/src/http_proxy.cc @@ -17,6 +17,7 @@ #include <hicn/transport/core/interest.h> #include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/string_utils.h> #include "utils.h" @@ -47,6 +48,11 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { (ConsumerInterestCallback)std::bind( &HTTPClientConnectionCallback::processLeavingInterest, this, std::placeholders::_1, std::placeholders::_2)); + consumer_.setSocketOption( + ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + (ConsumerInterestCallback)std::bind( + &HTTPClientConnectionCallback::processInterestRetx, this, + std::placeholders::_1, std::placeholders::_2)); consumer_.connect(); } @@ -55,7 +61,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { std::move(socket), std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4), + std::placeholders::_3, std::placeholders::_4, + std::placeholders::_5), [this](asio::ip::tcp::socket& socket) -> bool { try { std::string remote_address = @@ -68,6 +75,7 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { } consumer_.stop(); + request_buffer_queue_.clear(); tcp_receiver_.onClientDisconnect(this); return false; }); @@ -78,13 +86,13 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { private: void consumeNextRequest() { if (request_buffer_queue_.size() == 0) { - // No additiona requests to process. + TRANSPORT_LOGD("No additional requests to process."); return; } - auto& buffer = request_buffer_queue_.front(); + auto& buffer = request_buffer_queue_.front().second; uint64_t request_hash = - utils::hash::fnv64_buf(buffer->data(), buffer->length()); + utils::hash::fnv64_buf(buffer.data(), buffer.size()); std::stringstream name; name << prefix_hash_.substr(0, prefix_hash_.length() - 2); @@ -105,23 +113,34 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { // tcp callbacks void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last, - bool headers) { + bool headers, Metadata* metadata) { if (headers) { // Add the request to the request queue - tmp_buffer_ = utils::MemBuf::copyBuffer(data, size); + RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata); + tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size), + _metadata->path); + if (TRANSPORT_EXPECT_FALSE( + _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) { + /** + * It seems this request is for us. + * Get hicn parameters. + */ + processClientRequest(_metadata); + return; + } } else { // Append payload chunk to last request added. Here we are assuming // HTTP/1.1. - tmp_buffer_->prependChain(utils::MemBuf::copyBuffer(data, size)); + tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size)); } current_size_ += size; if (is_last) { - TRANSPORT_LOGD( - "Request received: %s", - std::string((const char*)tmp_buffer_->data(), tmp_buffer_->length()) - .c_str()); + TRANSPORT_LOGD("Request received: %s", + std::string((const char*)tmp_buffer_.first->data(), + tmp_buffer_.first->length()) + .c_str()); if (current_size_ < 1400) { request_buffer_queue_.emplace_back(std::move(tmp_buffer_)); } else { @@ -134,7 +153,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { if (!consumer_.isRunning()) { TRANSPORT_LOGD( - "Consumer stopped, triggering consume from TCP session handler.."); + "Consumer stopped, triggering consume from TCP session " + "handler.."); consumeNextRequest(); } @@ -146,9 +166,17 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { void processLeavingInterest(interface::ConsumerSocket& c, const core::Interest& interest) { + if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) { + Interest& int2 = const_cast<Interest&>(interest); + int2.appendPayload(request_buffer_queue_.front().first->clone()); + } + } + + void processInterestRetx(interface::ConsumerSocket& c, + const core::Interest& interest) { if (interest.payloadSize() == 0) { Interest& int2 = const_cast<Interest&>(interest); - int2.appendPayload(request_buffer_queue_.front()->clone()); + int2.appendPayload(request_buffer_queue_.front().first->clone()); } } @@ -174,14 +202,55 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback { consumeNextRequest(); } + void processClientRequest(RequestMetadata* metadata) { + auto it = metadata->headers.find("hicn"); + if (it == metadata->headers.end()) { + /* + * Probably it is an OPTION message for access control. + * Let's grant it! + */ + if (metadata->method == "OPTIONS") { + session_->send( + (const uint8_t*)HTTPMessageFastParser::http_cors, + std::strlen(HTTPMessageFastParser::http_cors), [this]() { + auto& socket = session_->socket_; + TRANSPORT_LOGI( + "Sent OPTIONS to client %s:%d", + socket.remote_endpoint().address().to_string().c_str(), + socket.remote_endpoint().port()); + }); + } + } else { + tcp_receiver_.parseHicnHeader(it->second, [this](bool result) { + const char* reply = nullptr; + if (result) { + reply = HTTPMessageFastParser::http_ok; + } else { + reply = HTTPMessageFastParser::http_failed; + } + + /* Route created. Send back a 200 OK to client */ + session_->send( + (const uint8_t*)reply, std::strlen(reply), [this, result]() { + auto& socket = session_->socket_; + TRANSPORT_LOGI( + "Sent %d response to client %s:%d", result, + socket.remote_endpoint().address().to_string().c_str(), + socket.remote_endpoint().port()); + }); + }); + } + } + private: TcpReceiver& tcp_receiver_; utils::EventThread& thread_; std::string prefix_hash_; ConsumerSocket consumer_; std::unique_ptr<HTTPSession> session_; - std::deque<std::unique_ptr<utils::MemBuf>> request_buffer_queue_; - std::unique_ptr<utils::MemBuf> tmp_buffer_; + std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>> + request_buffer_queue_; + std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_; std::size_t current_size_; }; @@ -192,11 +261,17 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix, std::bind(&TcpReceiver::onNewConnection, this, std::placeholders::_1)), prefix_(prefix), - ipv6_first_word_(ipv6_first_word) { - for (int i = 0; i < 10; i++) { - http_clients_.emplace_back(new HTTPClientConnectionCallback( - *this, thread_, prefix, ipv6_first_word)); - } + ipv6_first_word_(ipv6_first_word), + forwarder_config_(thread_.getIoService(), [this](std::error_code ec) { + if (!ec) { + listener_.doAccept(); + for (int i = 0; i < 10; i++) { + http_clients_.emplace_back(new HTTPClientConnectionCallback( + *this, thread_, prefix_, ipv6_first_word_)); + } + } + }) { + forwarder_config_.tryToConnectToForwarder(); } void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) { diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h index 6fa394e28..aa47d6a92 100644 --- a/apps/http-proxy/src/http_proxy.h +++ b/apps/http-proxy/src/http_proxy.h @@ -18,6 +18,7 @@ #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/utils/event_thread.h> +#include "forwarder_config.h" #include "http_session.h" #include "icn_receiver.h" @@ -37,11 +38,10 @@ class TcpListener { socket_(io_service), #endif callback_(callback) { - do_accept(); } - private: - void do_accept() { + public: + void doAccept() { #if ((ASIO_VERSION / 100 % 1000) >= 12) acceptor_.async_accept( [this](std::error_code ec, asio::ip::tcp::socket socket) { @@ -53,7 +53,7 @@ class TcpListener { callback_(std::move(socket)); } - do_accept(); + doAccept(); }); } @@ -87,11 +87,18 @@ class TcpReceiver : public Receiver { void onNewConnection(asio::ip::tcp::socket&& socket); void onClientDisconnect(HTTPClientConnectionCallback* client); + template <typename Callback> + void parseHicnHeader(std::string& hicn_header, Callback&& callback) { + forwarder_config_.parseHicnHeader(hicn_header, + std::forward<Callback>(callback)); + } + TcpListener listener_; std::string prefix_; std::string ipv6_first_word_; std::deque<HTTPClientConnectionCallback*> http_clients_; std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_; + ForwarderConfig forwarder_config_; }; class IcnReceiver : public Receiver { diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc index ff5063617..760539fe0 100644 --- a/apps/http-proxy/src/http_session.cc +++ b/apps/http-proxy/src/http_session.cc @@ -20,21 +20,19 @@ #include <iostream> -#include "HTTP1.xMessageFastParser.h" - namespace transport { HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, std::string &port, ContentReceivedCallback receive_callback, OnConnectionClosed on_connection_closed_callback, - bool reverse) + bool client) : io_service_(io_service), socket_(io_service_), resolver_(io_service_), endpoint_iterator_(resolver_.resolve({ip_address, port})), timer_(io_service), - reverse_(reverse), + reverse_(client), is_reconnection_(false), data_available_(false), content_length_(0), @@ -44,13 +42,20 @@ HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, on_connection_closed_callback_(on_connection_closed_callback) { input_buffer_.prepare(buffer_size + 2048); state_ = ConnectorState::CONNECTING; + + if (reverse_) { + header_info_ = std::make_unique<RequestMetadata>(); + } else { + header_info_ = std::make_unique<ResponseMetadata>(); + } + doConnect(); } HTTPSession::HTTPSession(asio::ip::tcp::socket socket, ContentReceivedCallback receive_callback, OnConnectionClosed on_connection_closed_callback, - bool reverse) + bool client) : #if ((ASIO_VERSION / 100 % 1000) < 12) io_service_(socket.get_io_service()), @@ -60,7 +65,7 @@ HTTPSession::HTTPSession(asio::ip::tcp::socket socket, socket_(std::move(socket)), resolver_(io_service_), timer_(io_service_), - reverse_(reverse), + reverse_(client), is_reconnection_(false), data_available_(false), content_length_(0), @@ -72,6 +77,12 @@ HTTPSession::HTTPSession(asio::ip::tcp::socket socket, state_ = ConnectorState::CONNECTED; asio::ip::tcp::no_delay noDelayOption(true); socket_.set_option(noDelayOption); + + if (reverse_) { + header_info_ = std::make_unique<RequestMetadata>(); + } else { + header_info_ = std::make_unique<ResponseMetadata>(); + } doReadHeader(); } @@ -79,10 +90,15 @@ HTTPSession::~HTTPSession() {} void HTTPSession::send(const uint8_t *packet, std::size_t len, ContentSentCallback &&content_sent) { - asio::async_write( - socket_, asio::buffer(packet, len), - [content_sent = std::move(content_sent)]( - std::error_code ec, std::size_t /*length*/) { content_sent(); }); + io_service_.dispatch([this, packet, len, content_sent]() { + asio::async_write(socket_, asio::buffer(packet, len), + [content_sent = std::move(content_sent)]( + std::error_code ec, std::size_t /*length*/) { + if (!ec) { + content_sent(); + } + }); + }); } void HTTPSession::send(utils::MemBuf *buffer, @@ -139,7 +155,7 @@ void HTTPSession::handleRead(std::error_code ec, std::size_t length) { asio::buffer_cast<const uint8_t *>(input_buffer_.data()); bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) : !content_length_; - receive_callback_(buffer, input_buffer_.size(), is_last, false); + receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr); input_buffer_.consume(input_buffer_.size()); if (!content_length_) { @@ -182,7 +198,7 @@ void HTTPSession::doReadBody(std::size_t body_size, const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, - false); + false, nullptr); input_buffer_.consume(body_size); } @@ -220,8 +236,10 @@ void HTTPSession::doReadHeader() { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - auto headers = - HTTPMessageFastParser::getHeaders(buffer, length, reverse_); + HTTPMessageFastParser::getHeaders(buffer, length, reverse_, + header_info_.get()); + + auto &headers = header_info_->headers; // Try to get content length, if available auto it = headers.find(HTTPMessageFastParser::content_length); @@ -237,7 +255,8 @@ void HTTPSession::doReadHeader() { } } - receive_callback_(buffer, length, !size && !chunked_, true); + receive_callback_(buffer, length, !size && !chunked_, true, + header_info_.get()); auto additional_bytes = input_buffer_.size() - length; input_buffer_.consume(length); diff --git a/apps/http-proxy/src/http_session.h b/apps/http-proxy/src/http_session.h index 20ebc5d7d..05fdf62fa 100644 --- a/apps/http-proxy/src/http_session.h +++ b/apps/http-proxy/src/http_session.h @@ -17,6 +17,8 @@ #include <hicn/transport/core/packet.h> +#include "HTTP1.xMessageFastParser.h" + #define ASIO_STANDALONE #include <asio.hpp> #include <deque> @@ -26,8 +28,10 @@ namespace transport { using asio::ip::tcp; +struct Metadata; + typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last, - bool headers)> + bool headers, Metadata *metadata)> ContentReceivedCallback; typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed; typedef std::function<void()> ContentSentCallback; @@ -35,7 +39,25 @@ typedef std::deque< std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>> BufferQueue; +struct Metadata { + std::string http_version; + HTTPHeaders headers; +}; + +struct RequestMetadata : Metadata { + std::string method; + std::string path; +}; + +struct ResponseMetadata : Metadata { + std::string status_code; + std::string status_string; +}; + +class HTTPClientConnectionCallback; + class HTTPSession { + friend class HTTPClientConnectionCallback; static constexpr uint32_t buffer_size = 1024 * 512; enum class ConnectorState { @@ -47,11 +69,11 @@ class HTTPSession { public: HTTPSession(asio::io_service &io_service, std::string &ip_address, std::string &port, ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool reverse = false); + OnConnectionClosed on_reconnect_callback, bool client = false); HTTPSession(asio::ip::tcp::socket socket, ContentReceivedCallback receive_callback, - OnConnectionClosed on_reconnect_callback, bool reverse = true); + OnConnectionClosed on_reconnect_callback, bool client = true); ~HTTPSession(); @@ -104,6 +126,9 @@ class HTTPSession { ContentReceivedCallback receive_callback_; OnConnectionClosed on_connection_closed_callback_; + // HTTP headers + std::unique_ptr<Metadata> header_info_; + // Connector state ConnectorState state_; }; diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc index 3bd5525cc..6ccd2dc31 100644 --- a/apps/http-proxy/src/icn_receiver.cc +++ b/apps/http-proxy/src/icn_receiver.cc @@ -105,11 +105,13 @@ void AsyncConsumerProducer::doReceive() { interface::ProducerCallbacksOptions::CACHE_MISS, [this](interface::ProducerSocket& producer, interface::Interest& interest) { - // core::Name n(interest.getWritableName(), true); - io_service_.post(std::bind( - &AsyncConsumerProducer::manageIncomingInterest, this, - interest.getWritableName(), interest.acquireMemBufReference(), - interest.getPayload().release())); + if (interest.payloadSize() > 0) { + // Interest may contain http request + io_service_.post(std::bind( + &AsyncConsumerProducer::manageIncomingInterest, this, + interest.getWritableName(), interest.acquireMemBufReference(), + interest.getPayload().release())); + } }); producer_socket_.connect(); @@ -125,7 +127,7 @@ void AsyncConsumerProducer::manageIncomingInterest( if (_it != _end) { if (_it->second.second) { TRANSPORT_LOGD( - "Content is in production, interest will be satisfied shortly."); + "Content is in production, interests will be satisfied shortly."); return; } |