aboutsummaryrefslogtreecommitdiffstats
path: root/apps/http-proxy
diff options
context:
space:
mode:
Diffstat (limited to 'apps/http-proxy')
-rw-r--r--apps/http-proxy/CMakeLists.txt16
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.cc43
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.h30
-rw-r--r--apps/http-proxy/src/forwarder_config.h191
-rw-r--r--apps/http-proxy/src/forwarder_interface.cc262
-rw-r--r--apps/http-proxy/src/forwarder_interface.h102
-rw-r--r--apps/http-proxy/src/http_proxy.cc115
-rw-r--r--apps/http-proxy/src/http_proxy.h15
-rw-r--r--apps/http-proxy/src/http_session.cc49
-rw-r--r--apps/http-proxy/src/http_session.h31
-rw-r--r--apps/http-proxy/src/icn_receiver.cc14
11 files changed, 792 insertions, 76 deletions
diff --git a/apps/http-proxy/CMakeLists.txt b/apps/http-proxy/CMakeLists.txt
index 13839dbf7..44f6deea6 100644
--- a/apps/http-proxy/CMakeLists.txt
+++ b/apps/http-proxy/CMakeLists.txt
@@ -24,17 +24,12 @@ if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release")
endif ()
-include_directories(
- SYSTEM
- ${CMAKE_BINARY_DIR}
- ${LIB${TRANSPORT_LIBRARY}_INCLUDE_DIR}
-)
-
set(LIB_SOURCE_FILES
src/http_session.cc
src/http_proxy.cc
src/HTTP1.xMessageFastParser.cc
src/icn_receiver.cc
+ src/forwarder_interface.cc
)
set(LIB_SERVER_HEADER_FILES
@@ -42,6 +37,8 @@ set(LIB_SERVER_HEADER_FILES
src/http_session.h
src/http_proxy.h
src/HTTP1.xMessageFastParser.h
+ src/forwarder_interface.h
+ src/forwarder_config.h
)
set(APP_SOURCE_FILES
@@ -51,14 +48,19 @@ set(APP_SOURCE_FILES
set(LIBHTTP_PROXY hicnhttpproxy)
set(LIBHTTP_PROXY_STATIC ${LIBHTTP_PROXY}.static)
+list(APPEND COMPILER_DEFINITIONS
+ -DWITH_POLICY
+)
+
build_library(${LIBHTTP_PROXY}
STATIC
SOURCES ${LIB_SOURCE_FILES}
LINK_LIBRARIES ${LIBRARIES}
DEPENDS ${DEPENDENCIES}
- INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS}
+ INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBHICNCTRL_INCLUDE_DIRS}
COMPONENT ${HICN_APPS}
LINK_FLAGS ${LINK_FLAGS}
+ DEFINITIONS ${COMPILER_DEFINITIONS}
)
if (NOT DISABLE_EXECUTABLES)
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
index d1271ebdf..ea942a463 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
@@ -22,36 +22,41 @@
#include <experimental/functional>
#include <iostream>
+#include "http_session.h"
+
+constexpr char HTTPMessageFastParser::http_ok[];
+constexpr char HTTPMessageFastParser::http_cors[];
+constexpr char HTTPMessageFastParser::http_failed[];
+
std::string HTTPMessageFastParser::numbers = "0123456789";
-std::string HTTPMessageFastParser::content_length = "Content-Length";
-std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding";
+std::string HTTPMessageFastParser::content_length = "content-length";
+std::string HTTPMessageFastParser::transfer_encoding = "transfer-encoding";
std::string HTTPMessageFastParser::chunked = "chunked";
-std::string HTTPMessageFastParser::cache_control = "Cache-Control";
+std::string HTTPMessageFastParser::cache_control = "cache-control";
std::string HTTPMessageFastParser::mpd = "mpd";
-std::string HTTPMessageFastParser::connection = "Connection";
+std::string HTTPMessageFastParser::connection = "connection";
std::string HTTPMessageFastParser::separator = "\r\n\r\n";
-HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers,
- std::size_t length,
- bool request) {
- HTTPHeaders ret;
- std::string http_version;
-
+void HTTPMessageFastParser::getHeaders(const uint8_t *headers,
+ std::size_t length, bool request,
+ transport::Metadata *metadata) {
if (request) {
- std::string method;
- std::string url;
+ transport::RequestMetadata *_metadata =
+ (transport::RequestMetadata *)(metadata);
- if (transport::http::HTTPRequest::parseHeaders(headers, length, ret,
- http_version, method, url)) {
- return ret;
+ if (transport::http::HTTPRequest::parseHeaders(
+ headers, length, _metadata->headers, _metadata->http_version,
+ _metadata->method, _metadata->path)) {
+ return;
}
} else {
- std::string status_code;
- std::string status_string;
+ transport::ResponseMetadata *_metadata =
+ (transport::ResponseMetadata *)(metadata);
if (transport::http::HTTPResponse::parseHeaders(
- headers, length, ret, http_version, status_code, status_string)) {
- return ret;
+ headers, length, _metadata->headers, _metadata->http_version,
+ _metadata->status_code, _metadata->status_string)) {
+ return;
}
}
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
index 0ad38b9b6..7c035c83b 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
@@ -22,10 +22,36 @@
using transport::http::HTTPHeaders;
+namespace transport {
+struct Metadata;
+}
+
class HTTPMessageFastParser {
public:
- static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length,
- bool request);
+ static constexpr char http_ok[] =
+ "HTTP/1.1 200 OK\r\n"
+ "Access-Control-Allow-Origin: *\r\n"
+ "Connection: close\r\n"
+ "Content-Length: 0\r\n\r\n";
+
+ static constexpr char http_cors[] =
+ "HTTP/1.1 200 OK\r\n"
+ "Date: %s\r\n"
+ "Connection: close\r\n"
+ "Content-Length: 0\r\n"
+ "Access-Control-Allow-Origin: *\r\n"
+ "Access-Control-Allow-Methods: GET\r\n"
+ "Access-Control-Allow-Headers: hicn\r\n"
+ "Access-Control-Max-Age: 1800\r\n\r\n";
+
+ static constexpr char http_failed[] =
+ "HTTP/1.1 500 Internal Server Error\r\n"
+ "Date: %s\r\n"
+ "Content-Length: 0\r\nConnection: "
+ "close\r\n\r\n";
+
+ static void getHeaders(const uint8_t* headers, std::size_t length,
+ bool request, transport::Metadata* metadata);
static std::size_t hasBody(const uint8_t* headers, std::size_t length);
static bool isMpdRequest(const uint8_t* headers, std::size_t length);
static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
diff --git a/apps/http-proxy/src/forwarder_config.h b/apps/http-proxy/src/forwarder_config.h
new file mode 100644
index 000000000..96f275c9f
--- /dev/null
+++ b/apps/http-proxy/src/forwarder_config.h
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/portability/c_portability.h>
+#include <hicn/transport/utils/branch_prediction.h>
+#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/string_utils.h>
+
+#include <asio.hpp>
+#include <chrono>
+#include <sstream>
+#include <string>
+
+#include "forwarder_interface.h"
+
+#define RETRY_INTERVAL 300
+
+namespace transport {
+
+static constexpr char server_header[] = "server";
+static constexpr char prefix_header[] = "prefix";
+static constexpr char port_header[] = "port";
+
+using OnForwarderConfiguredCallback = std::function<void(bool)>;
+
+class ForwarderConfig {
+ public:
+ using ListenerRetrievedCallback = std::function<void(std::error_code)>;
+
+ template <typename Callback>
+ ForwarderConfig(asio::io_service& io_service, Callback&& callback)
+ : forwarder_interface_(io_service),
+ resolver_(io_service),
+ retx_count_(0),
+ timer_(io_service),
+ hicn_listen_port_(~0),
+ listener_retrieved_callback_(std::forward<Callback>(callback)) {}
+
+ void tryToConnectToForwarder() {
+ doTryToConnectToForwarder(std::make_error_code(std::errc(0)));
+ }
+
+ void doTryToConnectToForwarder(std::error_code ec) {
+ if (!ec) {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.connectToForwarder();
+ if (ret < 0) {
+ // We were not able to connect to the local forwarder. Do not give up
+ // and retry.
+ TRANSPORT_LOGE("Could not connect to local forwarder. Retrying.");
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&ForwarderConfig::doTryToConnectToForwarder,
+ this, std::placeholders::_1));
+ } else {
+ timer_.cancel();
+ retx_count_ = 0;
+ doGetMainListener(std::make_error_code(std::errc(0)));
+ }
+ } else {
+ TRANSPORT_LOGI("Timer for re-trying forwarder connection canceled.");
+ }
+ }
+
+ void doGetMainListener(std::error_code ec) {
+ if (!ec) {
+ // ec == 0 --> timer expired
+ int ret = forwarder_interface_.getMainListenerPort();
+ if (ret <= 0) {
+ // Since without the main listener of the forwarder the proxy cannot
+ // work, we can stop the program here until we get the listener port.
+ TRANSPORT_LOGE(
+ "Could not retrieve main listener port from the forwarder. "
+ "Retrying.");
+
+ timer_.expires_from_now(std::chrono::milliseconds(RETRY_INTERVAL));
+ timer_.async_wait(std::bind(&ForwarderConfig::doGetMainListener, this,
+ std::placeholders::_1));
+ } else {
+ timer_.cancel();
+ retx_count_ = 0;
+ hicn_listen_port_ = uint16_t(ret);
+ listener_retrieved_callback_(std::make_error_code(std::errc(0)));
+ }
+ } else {
+ TRANSPORT_LOGI("Timer for retrieving main hicn listener canceled.");
+ }
+ }
+
+ template <typename Callback>
+ TRANSPORT_ALWAYS_INLINE bool parseHicnHeader(std::string& header,
+ Callback&& callback) {
+ std::stringstream ss(header);
+ route_info_t* ret = new route_info_t();
+ std::string port_string;
+
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+
+ if (TRANSPORT_EXPECT_FALSE(substr.empty())) {
+ continue;
+ }
+
+ utils::trim(substr);
+ auto it = std::find_if(substr.begin(), substr.end(),
+ [](int ch) { return ch == '='; });
+ if (it != std::end(substr)) {
+ auto key = std::string(substr.begin(), it);
+ auto value = std::string(it + 1, substr.end());
+
+ if (key == server_header) {
+ ret->remote_addr = value;
+ } else if (key == prefix_header) {
+ auto it = std::find_if(value.begin(), value.end(),
+ [](int ch) { return ch == '/'; });
+
+ if (it != std::end(value)) {
+ ret->route_addr = std::string(value.begin(), it);
+ ret->route_len = std::stoul(std::string(it + 1, value.end()));
+ } else {
+ return false;
+ }
+ } else if (key == port_header) {
+ ret->remote_port = std::stoul(value);
+ port_string = value;
+ } else {
+ // Header not recognized
+ return false;
+ }
+ }
+ }
+
+ /*
+ * Resolve server address
+ */
+ auto results =
+ resolver_.resolve({ret->remote_addr, port_string,
+ asio::ip::resolver_query_base::numeric_service});
+
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ asio::ip::udp::resolver::iterator end;
+ auto& it = results;
+ while (it != end) {
+#else
+ for (auto it = results.begin(); it != results.end();
+ it++) {
+#endif
+ if (it->endpoint().address().is_v4()) {
+ // Use this v4 address to configure the forwarder.
+ ret->remote_addr = it->endpoint().address().to_string();
+ ret->family = AF_INET;
+ forwarder_interface_.createFaceAndRoute(
+ RouteInfoPtr(ret),
+ [callback = std::forward<Callback>(callback)](
+ uint32_t route_id, bool result) { callback(result); });
+
+ return true;
+ }
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ it++;
+#endif
+ }
+
+ return false;
+ }
+
+ private:
+ ForwarderInterface forwarder_interface_;
+ asio::ip::udp::resolver resolver_;
+ std::uint32_t retx_count_;
+ asio::steady_timer timer_;
+ uint16_t hicn_listen_port_;
+ ListenerRetrievedCallback listener_retrieved_callback_;
+}; // namespace transport
+
+} // namespace transport \ No newline at end of file
diff --git a/apps/http-proxy/src/forwarder_interface.cc b/apps/http-proxy/src/forwarder_interface.cc
new file mode 100644
index 000000000..105d5a8e9
--- /dev/null
+++ b/apps/http-proxy/src/forwarder_interface.cc
@@ -0,0 +1,262 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "forwarder_interface.h"
+
+#include <arpa/inet.h>
+#include <hicn/transport/utils/log.h>
+
+#include <chrono>
+#include <iostream>
+#include <thread>
+#include <unordered_set>
+
+namespace transport {
+
+ForwarderInterface::~ForwarderInterface() {}
+
+int ForwarderInterface::connectToForwarder() {
+ sock_ = hc_sock_create();
+ if (!sock_) return -1;
+
+ if (hc_sock_connect(sock_) < 0) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ return -1;
+ }
+
+ return 0;
+}
+
+void ForwarderInterface::close() {
+ internal_ioservice_.post([this]() {
+ work_.reset();
+ if (sock_) {
+ hc_sock_free(sock_);
+ sock_ = nullptr;
+ }
+ });
+
+ if (thread_->joinable()) {
+ thread_->join();
+ }
+}
+
+void ForwarderInterface::removeConnectedUserNow(uint32_t route_id) {
+ internalRemoveConnectedUser(route_id);
+}
+
+void ForwarderInterface::scheduleRemoveConnectedUser(uint32_t route_id) {
+ internal_ioservice_.post(
+ [this, route_id]() { internalRemoveConnectedUser(route_id); });
+}
+
+int32_t ForwarderInterface::getMainListenerPort() {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) return -1;
+
+ int ret = -1;
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ ret = l->local_port;
+ break;
+ }
+ }
+
+ hc_data_free(data);
+ return ret;
+}
+
+void ForwarderInterface::internalRemoveConnectedUser(uint32_t route_id) {
+ auto it = route_status_.find(route_id);
+ if (it == route_status_.end()) return;
+
+ if (!sock_) return;
+
+ // remove route
+ hc_data_t *data;
+ if (hc_route_list(sock_, &data) < 0) return;
+
+ std::vector<hc_route_t *> routes_to_remove;
+ foreach_route(r, data) {
+ char remote_addr[INET6_ADDRSTRLEN];
+ int ret = ip_address_ntop(&r->remote_addr, remote_addr, r->len, r->family);
+ if (ret < 0) continue;
+
+ std::string route_addr(remote_addr);
+ if (route_addr.compare(it->second->route_addr) == 0 &&
+ r->len == it->second->route_len) {
+ // route found
+ routes_to_remove.push_back(r);
+ }
+ }
+
+ route_status_.erase(it);
+
+ if (routes_to_remove.size() == 0) {
+ // nothing to do here
+ hc_data_free(data);
+ return;
+ }
+
+ std::unordered_set<uint32_t> connids_to_remove;
+ for (unsigned i = 0; i < routes_to_remove.size(); i++) {
+ connids_to_remove.insert(routes_to_remove[i]->face_id);
+ if (hc_route_delete(sock_, routes_to_remove[i]) < 0) {
+ TRANSPORT_LOGE("Error removing route from forwarder.");
+ }
+ }
+
+ // remove connection
+ if (hc_connection_list(sock_, &data) < 0) {
+ hc_data_free(data);
+ return;
+ }
+
+ // collects pointerst to the connections using the conn IDs
+ std::vector<hc_connection_t *> conns_to_remove;
+ foreach_connection(c, data) {
+ if (connids_to_remove.find(c->id) != connids_to_remove.end()) {
+ // conn found
+ conns_to_remove.push_back(c);
+ }
+ }
+
+ if (conns_to_remove.size() == 0) {
+ // nothing else to do here
+ hc_data_free(data);
+ return;
+ }
+
+ for (unsigned i = 0; i < conns_to_remove.size(); i++) {
+ if (hc_connection_delete(sock_, conns_to_remove[i]) < 0) {
+ TRANSPORT_LOGE("Error removing connection from forwarder.");
+ }
+ }
+
+ hc_data_free(data);
+}
+
+void ForwarderInterface::internalCreateFaceAndRoute(RouteInfoPtr route_info,
+ uint8_t max_try,
+ asio::steady_timer *timer,
+ SetRouteCallback callback) {
+ int ret = tryToCreateFaceAndRoute(route_info.get());
+
+ if (ret < 0 && max_try > 0) {
+ max_try--;
+ timer->expires_from_now(std::chrono::milliseconds(500));
+ timer->async_wait([this, _route_info = std::move(route_info), max_try,
+ timer, callback](std::error_code ec) {
+ if (ec) return;
+ internalCreateFaceAndRoute(std::move(_route_info), max_try, timer,
+ std::move(callback));
+ });
+ return;
+ }
+
+ if (max_try == 0 && ret < 0) {
+ pending_add_route_counter_--;
+ external_ioservice_.post([callback]() { callback(false, ~0); });
+ } else {
+ pending_add_route_counter_--;
+ route_status_[route_id_] = std::move(route_info);
+ external_ioservice_.post(
+ [route_id = route_id_, callback]() { callback(route_id, true); });
+ route_id_++;
+ }
+
+ delete timer;
+}
+
+int ForwarderInterface::tryToCreateFaceAndRoute(route_info_t *route_info) {
+ if (!sock_) return -1;
+
+ hc_data_t *data;
+ if (hc_listener_list(sock_, &data) < 0) {
+ return -1;
+ }
+
+ bool found = false;
+ uint32_t face_id;
+
+ foreach_listener(l, data) {
+ std::string interface = std::string(l->interface_name);
+ if (interface.compare("lo") != 0) {
+ found = true;
+
+ ip_address_t remote_ip;
+ if (ip_address_pton(route_info->remote_addr.c_str(), &remote_ip) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ hc_face_t face;
+ memset(&face, 0, sizeof(hc_face_t));
+
+ face.face.type = FACE_TYPE_UDP;
+ face.face.family = route_info->family;
+ face.face.local_addr = l->local_addr;
+ face.face.remote_addr = remote_ip;
+ face.face.local_port = l->local_port;
+ face.face.remote_port = route_info->remote_port;
+
+ if (netdevice_set_name(&face.face.netdevice, l->interface_name) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ if (hc_face_create(sock_, &face) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ face_id = face.id;
+ break;
+ }
+ }
+
+ if (!found) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ ip_address_t route_ip;
+ hc_route_t route;
+
+ if (ip_address_pton(route_info->route_addr.c_str(), &route_ip) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ route.face_id = face_id;
+ route.family = AF_INET6;
+ route.remote_addr = route_ip;
+ route.len = route_info->route_len;
+ route.cost = 1;
+
+ if (hc_route_create(sock_, &route) < 0) {
+ hc_data_free(data);
+ return -1;
+ }
+
+ hc_data_free(data);
+ return 0;
+}
+
+} // namespace transport
diff --git a/apps/http-proxy/src/forwarder_interface.h b/apps/http-proxy/src/forwarder_interface.h
new file mode 100644
index 000000000..ee0354d32
--- /dev/null
+++ b/apps/http-proxy/src/forwarder_interface.h
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+extern "C" {
+#include <hicn/ctrl/api.h>
+#include <hicn/util/ip_address.h>
+}
+
+#include <asio.hpp>
+#include <asio/steady_timer.hpp>
+#include <functional>
+#include <thread>
+#include <unordered_map>
+
+namespace transport {
+
+typedef std::function<void(uint32_t, bool)> SetRouteCallback;
+
+struct route_info_t {
+ int family;
+ std::string remote_addr;
+ uint16_t remote_port;
+ std::string route_addr;
+ uint8_t route_len;
+};
+
+using RouteInfoPtr = std::shared_ptr<route_info_t>;
+
+class ForwarderInterface {
+ public:
+ ForwarderInterface(asio::io_service &io_service)
+ : external_ioservice_(io_service),
+ work_(std::make_unique<asio::io_service::work>(internal_ioservice_)),
+ sock_(nullptr),
+ thread_(std::make_unique<std::thread>(
+ [this]() { internal_ioservice_.run(); })),
+ check_routes_timer_(nullptr),
+ pending_add_route_counter_(0),
+ route_id_(0) {}
+
+ ~ForwarderInterface();
+
+ int connectToForwarder();
+
+ void removeConnectedUserNow(uint32_t route_id);
+
+ // to be called at the server
+ // at the client this creates a race condition
+ // and the program enters in a loop
+ void scheduleRemoveConnectedUser(uint32_t route_id);
+
+ template <typename Callback>
+ void createFaceAndRoute(RouteInfoPtr &&route_info, Callback &&callback) {
+ internal_ioservice_.post([this, _route_info = std::move(route_info),
+ _callback = std::forward<Callback>(callback)]() {
+ pending_add_route_counter_++;
+ uint8_t max_try = 5;
+ auto timer = new asio::steady_timer(internal_ioservice_);
+ internalCreateFaceAndRoute(std::move(_route_info), max_try, timer,
+ std::move(_callback));
+ });
+ }
+
+ int32_t getMainListenerPort();
+
+ void close();
+
+ private:
+ void internalRemoveConnectedUser(uint32_t route_id);
+
+ void internalCreateFaceAndRoute(RouteInfoPtr route_info, uint8_t max_try,
+ asio::steady_timer *timer,
+ SetRouteCallback callback);
+
+ int tryToCreateFaceAndRoute(route_info_t *route_info);
+
+ asio::io_service &external_ioservice_;
+ asio::io_service internal_ioservice_;
+ std::unique_ptr<asio::io_service::work> work_;
+ hc_sock_t *sock_;
+ std::unique_ptr<std::thread> thread_;
+ std::unordered_map<uint32_t, RouteInfoPtr> route_status_;
+ std::unique_ptr<asio::steady_timer> check_routes_timer_;
+ uint32_t pending_add_route_counter_;
+ uint32_t route_id_;
+};
+
+} // namespace transport \ No newline at end of file
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
index 18e9bf727..98720d8d2 100644
--- a/apps/http-proxy/src/http_proxy.cc
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -17,6 +17,7 @@
#include <hicn/transport/core/interest.h>
#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/string_utils.h>
#include "utils.h"
@@ -47,6 +48,11 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
(ConsumerInterestCallback)std::bind(
&HTTPClientConnectionCallback::processLeavingInterest, this,
std::placeholders::_1, std::placeholders::_2));
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processInterestRetx, this,
+ std::placeholders::_1, std::placeholders::_2));
consumer_.connect();
}
@@ -55,7 +61,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
std::move(socket),
std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this,
std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3, std::placeholders::_4),
+ std::placeholders::_3, std::placeholders::_4,
+ std::placeholders::_5),
[this](asio::ip::tcp::socket& socket) -> bool {
try {
std::string remote_address =
@@ -68,6 +75,7 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
}
consumer_.stop();
+ request_buffer_queue_.clear();
tcp_receiver_.onClientDisconnect(this);
return false;
});
@@ -78,13 +86,13 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
private:
void consumeNextRequest() {
if (request_buffer_queue_.size() == 0) {
- // No additiona requests to process.
+ TRANSPORT_LOGD("No additional requests to process.");
return;
}
- auto& buffer = request_buffer_queue_.front();
+ auto& buffer = request_buffer_queue_.front().second;
uint64_t request_hash =
- utils::hash::fnv64_buf(buffer->data(), buffer->length());
+ utils::hash::fnv64_buf(buffer.data(), buffer.size());
std::stringstream name;
name << prefix_hash_.substr(0, prefix_hash_.length() - 2);
@@ -105,23 +113,34 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
// tcp callbacks
void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last,
- bool headers) {
+ bool headers, Metadata* metadata) {
if (headers) {
// Add the request to the request queue
- tmp_buffer_ = utils::MemBuf::copyBuffer(data, size);
+ RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata);
+ tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size),
+ _metadata->path);
+ if (TRANSPORT_EXPECT_FALSE(
+ _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) {
+ /**
+ * It seems this request is for us.
+ * Get hicn parameters.
+ */
+ processClientRequest(_metadata);
+ return;
+ }
} else {
// Append payload chunk to last request added. Here we are assuming
// HTTP/1.1.
- tmp_buffer_->prependChain(utils::MemBuf::copyBuffer(data, size));
+ tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size));
}
current_size_ += size;
if (is_last) {
- TRANSPORT_LOGD(
- "Request received: %s",
- std::string((const char*)tmp_buffer_->data(), tmp_buffer_->length())
- .c_str());
+ TRANSPORT_LOGD("Request received: %s",
+ std::string((const char*)tmp_buffer_.first->data(),
+ tmp_buffer_.first->length())
+ .c_str());
if (current_size_ < 1400) {
request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
} else {
@@ -134,7 +153,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
if (!consumer_.isRunning()) {
TRANSPORT_LOGD(
- "Consumer stopped, triggering consume from TCP session handler..");
+ "Consumer stopped, triggering consume from TCP session "
+ "handler..");
consumeNextRequest();
}
@@ -146,9 +166,17 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
void processLeavingInterest(interface::ConsumerSocket& c,
const core::Interest& interest) {
+ if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
+ }
+ }
+
+ void processInterestRetx(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
if (interest.payloadSize() == 0) {
Interest& int2 = const_cast<Interest&>(interest);
- int2.appendPayload(request_buffer_queue_.front()->clone());
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
}
}
@@ -174,14 +202,55 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
consumeNextRequest();
}
+ void processClientRequest(RequestMetadata* metadata) {
+ auto it = metadata->headers.find("hicn");
+ if (it == metadata->headers.end()) {
+ /*
+ * Probably it is an OPTION message for access control.
+ * Let's grant it!
+ */
+ if (metadata->method == "OPTIONS") {
+ session_->send(
+ (const uint8_t*)HTTPMessageFastParser::http_cors,
+ std::strlen(HTTPMessageFastParser::http_cors), [this]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent OPTIONS to client %s:%d",
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ }
+ } else {
+ tcp_receiver_.parseHicnHeader(it->second, [this](bool result) {
+ const char* reply = nullptr;
+ if (result) {
+ reply = HTTPMessageFastParser::http_ok;
+ } else {
+ reply = HTTPMessageFastParser::http_failed;
+ }
+
+ /* Route created. Send back a 200 OK to client */
+ session_->send(
+ (const uint8_t*)reply, std::strlen(reply), [this, result]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent %d response to client %s:%d", result,
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ });
+ }
+ }
+
private:
TcpReceiver& tcp_receiver_;
utils::EventThread& thread_;
std::string prefix_hash_;
ConsumerSocket consumer_;
std::unique_ptr<HTTPSession> session_;
- std::deque<std::unique_ptr<utils::MemBuf>> request_buffer_queue_;
- std::unique_ptr<utils::MemBuf> tmp_buffer_;
+ std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>>
+ request_buffer_queue_;
+ std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_;
std::size_t current_size_;
};
@@ -192,11 +261,17 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
std::bind(&TcpReceiver::onNewConnection, this,
std::placeholders::_1)),
prefix_(prefix),
- ipv6_first_word_(ipv6_first_word) {
- for (int i = 0; i < 10; i++) {
- http_clients_.emplace_back(new HTTPClientConnectionCallback(
- *this, thread_, prefix, ipv6_first_word));
- }
+ ipv6_first_word_(ipv6_first_word),
+ forwarder_config_(thread_.getIoService(), [this](std::error_code ec) {
+ if (!ec) {
+ listener_.doAccept();
+ for (int i = 0; i < 10; i++) {
+ http_clients_.emplace_back(new HTTPClientConnectionCallback(
+ *this, thread_, prefix_, ipv6_first_word_));
+ }
+ }
+ }) {
+ forwarder_config_.tryToConnectToForwarder();
}
void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) {
diff --git a/apps/http-proxy/src/http_proxy.h b/apps/http-proxy/src/http_proxy.h
index 6fa394e28..aa47d6a92 100644
--- a/apps/http-proxy/src/http_proxy.h
+++ b/apps/http-proxy/src/http_proxy.h
@@ -18,6 +18,7 @@
#include <hicn/transport/interfaces/socket_consumer.h>
#include <hicn/transport/utils/event_thread.h>
+#include "forwarder_config.h"
#include "http_session.h"
#include "icn_receiver.h"
@@ -37,11 +38,10 @@ class TcpListener {
socket_(io_service),
#endif
callback_(callback) {
- do_accept();
}
- private:
- void do_accept() {
+ public:
+ void doAccept() {
#if ((ASIO_VERSION / 100 % 1000) >= 12)
acceptor_.async_accept(
[this](std::error_code ec, asio::ip::tcp::socket socket) {
@@ -53,7 +53,7 @@ class TcpListener {
callback_(std::move(socket));
}
- do_accept();
+ doAccept();
});
}
@@ -87,11 +87,18 @@ class TcpReceiver : public Receiver {
void onNewConnection(asio::ip::tcp::socket&& socket);
void onClientDisconnect(HTTPClientConnectionCallback* client);
+ template <typename Callback>
+ void parseHicnHeader(std::string& hicn_header, Callback&& callback) {
+ forwarder_config_.parseHicnHeader(hicn_header,
+ std::forward<Callback>(callback));
+ }
+
TcpListener listener_;
std::string prefix_;
std::string ipv6_first_word_;
std::deque<HTTPClientConnectionCallback*> http_clients_;
std::unordered_set<HTTPClientConnectionCallback*> used_http_clients_;
+ ForwarderConfig forwarder_config_;
};
class IcnReceiver : public Receiver {
diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc
index ff5063617..760539fe0 100644
--- a/apps/http-proxy/src/http_session.cc
+++ b/apps/http-proxy/src/http_session.cc
@@ -20,21 +20,19 @@
#include <iostream>
-#include "HTTP1.xMessageFastParser.h"
-
namespace transport {
HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
std::string &port,
ContentReceivedCallback receive_callback,
OnConnectionClosed on_connection_closed_callback,
- bool reverse)
+ bool client)
: io_service_(io_service),
socket_(io_service_),
resolver_(io_service_),
endpoint_iterator_(resolver_.resolve({ip_address, port})),
timer_(io_service),
- reverse_(reverse),
+ reverse_(client),
is_reconnection_(false),
data_available_(false),
content_length_(0),
@@ -44,13 +42,20 @@ HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address,
on_connection_closed_callback_(on_connection_closed_callback) {
input_buffer_.prepare(buffer_size + 2048);
state_ = ConnectorState::CONNECTING;
+
+ if (reverse_) {
+ header_info_ = std::make_unique<RequestMetadata>();
+ } else {
+ header_info_ = std::make_unique<ResponseMetadata>();
+ }
+
doConnect();
}
HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
ContentReceivedCallback receive_callback,
OnConnectionClosed on_connection_closed_callback,
- bool reverse)
+ bool client)
:
#if ((ASIO_VERSION / 100 % 1000) < 12)
io_service_(socket.get_io_service()),
@@ -60,7 +65,7 @@ HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
socket_(std::move(socket)),
resolver_(io_service_),
timer_(io_service_),
- reverse_(reverse),
+ reverse_(client),
is_reconnection_(false),
data_available_(false),
content_length_(0),
@@ -72,6 +77,12 @@ HTTPSession::HTTPSession(asio::ip::tcp::socket socket,
state_ = ConnectorState::CONNECTED;
asio::ip::tcp::no_delay noDelayOption(true);
socket_.set_option(noDelayOption);
+
+ if (reverse_) {
+ header_info_ = std::make_unique<RequestMetadata>();
+ } else {
+ header_info_ = std::make_unique<ResponseMetadata>();
+ }
doReadHeader();
}
@@ -79,10 +90,15 @@ HTTPSession::~HTTPSession() {}
void HTTPSession::send(const uint8_t *packet, std::size_t len,
ContentSentCallback &&content_sent) {
- asio::async_write(
- socket_, asio::buffer(packet, len),
- [content_sent = std::move(content_sent)](
- std::error_code ec, std::size_t /*length*/) { content_sent(); });
+ io_service_.dispatch([this, packet, len, content_sent]() {
+ asio::async_write(socket_, asio::buffer(packet, len),
+ [content_sent = std::move(content_sent)](
+ std::error_code ec, std::size_t /*length*/) {
+ if (!ec) {
+ content_sent();
+ }
+ });
+ });
}
void HTTPSession::send(utils::MemBuf *buffer,
@@ -139,7 +155,7 @@ void HTTPSession::handleRead(std::error_code ec, std::size_t length) {
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false)
: !content_length_;
- receive_callback_(buffer, input_buffer_.size(), is_last, false);
+ receive_callback_(buffer, input_buffer_.size(), is_last, false, nullptr);
input_buffer_.consume(input_buffer_.size());
if (!content_length_) {
@@ -182,7 +198,7 @@ void HTTPSession::doReadBody(std::size_t body_size,
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
- false);
+ false, nullptr);
input_buffer_.consume(body_size);
}
@@ -220,8 +236,10 @@ void HTTPSession::doReadHeader() {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- auto headers =
- HTTPMessageFastParser::getHeaders(buffer, length, reverse_);
+ HTTPMessageFastParser::getHeaders(buffer, length, reverse_,
+ header_info_.get());
+
+ auto &headers = header_info_->headers;
// Try to get content length, if available
auto it = headers.find(HTTPMessageFastParser::content_length);
@@ -237,7 +255,8 @@ void HTTPSession::doReadHeader() {
}
}
- receive_callback_(buffer, length, !size && !chunked_, true);
+ receive_callback_(buffer, length, !size && !chunked_, true,
+ header_info_.get());
auto additional_bytes = input_buffer_.size() - length;
input_buffer_.consume(length);
diff --git a/apps/http-proxy/src/http_session.h b/apps/http-proxy/src/http_session.h
index 20ebc5d7d..05fdf62fa 100644
--- a/apps/http-proxy/src/http_session.h
+++ b/apps/http-proxy/src/http_session.h
@@ -17,6 +17,8 @@
#include <hicn/transport/core/packet.h>
+#include "HTTP1.xMessageFastParser.h"
+
#define ASIO_STANDALONE
#include <asio.hpp>
#include <deque>
@@ -26,8 +28,10 @@ namespace transport {
using asio::ip::tcp;
+struct Metadata;
+
typedef std::function<void(const uint8_t *data, std::size_t size, bool is_last,
- bool headers)>
+ bool headers, Metadata *metadata)>
ContentReceivedCallback;
typedef std::function<bool(asio::ip::tcp::socket &socket)> OnConnectionClosed;
typedef std::function<void()> ContentSentCallback;
@@ -35,7 +39,25 @@ typedef std::deque<
std::pair<std::unique_ptr<utils::MemBuf>, ContentSentCallback>>
BufferQueue;
+struct Metadata {
+ std::string http_version;
+ HTTPHeaders headers;
+};
+
+struct RequestMetadata : Metadata {
+ std::string method;
+ std::string path;
+};
+
+struct ResponseMetadata : Metadata {
+ std::string status_code;
+ std::string status_string;
+};
+
+class HTTPClientConnectionCallback;
+
class HTTPSession {
+ friend class HTTPClientConnectionCallback;
static constexpr uint32_t buffer_size = 1024 * 512;
enum class ConnectorState {
@@ -47,11 +69,11 @@ class HTTPSession {
public:
HTTPSession(asio::io_service &io_service, std::string &ip_address,
std::string &port, ContentReceivedCallback receive_callback,
- OnConnectionClosed on_reconnect_callback, bool reverse = false);
+ OnConnectionClosed on_reconnect_callback, bool client = false);
HTTPSession(asio::ip::tcp::socket socket,
ContentReceivedCallback receive_callback,
- OnConnectionClosed on_reconnect_callback, bool reverse = true);
+ OnConnectionClosed on_reconnect_callback, bool client = true);
~HTTPSession();
@@ -104,6 +126,9 @@ class HTTPSession {
ContentReceivedCallback receive_callback_;
OnConnectionClosed on_connection_closed_callback_;
+ // HTTP headers
+ std::unique_ptr<Metadata> header_info_;
+
// Connector state
ConnectorState state_;
};
diff --git a/apps/http-proxy/src/icn_receiver.cc b/apps/http-proxy/src/icn_receiver.cc
index 3bd5525cc..6ccd2dc31 100644
--- a/apps/http-proxy/src/icn_receiver.cc
+++ b/apps/http-proxy/src/icn_receiver.cc
@@ -105,11 +105,13 @@ void AsyncConsumerProducer::doReceive() {
interface::ProducerCallbacksOptions::CACHE_MISS,
[this](interface::ProducerSocket& producer,
interface::Interest& interest) {
- // core::Name n(interest.getWritableName(), true);
- io_service_.post(std::bind(
- &AsyncConsumerProducer::manageIncomingInterest, this,
- interest.getWritableName(), interest.acquireMemBufReference(),
- interest.getPayload().release()));
+ if (interest.payloadSize() > 0) {
+ // Interest may contain http request
+ io_service_.post(std::bind(
+ &AsyncConsumerProducer::manageIncomingInterest, this,
+ interest.getWritableName(), interest.acquireMemBufReference(),
+ interest.getPayload().release()));
+ }
});
producer_socket_.connect();
@@ -125,7 +127,7 @@ void AsyncConsumerProducer::manageIncomingInterest(
if (_it != _end) {
if (_it->second.second) {
TRANSPORT_LOGD(
- "Content is in production, interest will be satisfied shortly.");
+ "Content is in production, interests will be satisfied shortly.");
return;
}