aboutsummaryrefslogtreecommitdiffstats
path: root/apps/http-proxy/src/http_proxy.cc
diff options
context:
space:
mode:
Diffstat (limited to 'apps/http-proxy/src/http_proxy.cc')
-rw-r--r--apps/http-proxy/src/http_proxy.cc115
1 files changed, 95 insertions, 20 deletions
diff --git a/apps/http-proxy/src/http_proxy.cc b/apps/http-proxy/src/http_proxy.cc
index 18e9bf727..98720d8d2 100644
--- a/apps/http-proxy/src/http_proxy.cc
+++ b/apps/http-proxy/src/http_proxy.cc
@@ -17,6 +17,7 @@
#include <hicn/transport/core/interest.h>
#include <hicn/transport/utils/log.h>
+#include <hicn/transport/utils/string_utils.h>
#include "utils.h"
@@ -47,6 +48,11 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
(ConsumerInterestCallback)std::bind(
&HTTPClientConnectionCallback::processLeavingInterest, this,
std::placeholders::_1, std::placeholders::_2));
+ consumer_.setSocketOption(
+ ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ (ConsumerInterestCallback)std::bind(
+ &HTTPClientConnectionCallback::processInterestRetx, this,
+ std::placeholders::_1, std::placeholders::_2));
consumer_.connect();
}
@@ -55,7 +61,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
std::move(socket),
std::bind(&HTTPClientConnectionCallback::readDataFromTcp, this,
std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3, std::placeholders::_4),
+ std::placeholders::_3, std::placeholders::_4,
+ std::placeholders::_5),
[this](asio::ip::tcp::socket& socket) -> bool {
try {
std::string remote_address =
@@ -68,6 +75,7 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
}
consumer_.stop();
+ request_buffer_queue_.clear();
tcp_receiver_.onClientDisconnect(this);
return false;
});
@@ -78,13 +86,13 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
private:
void consumeNextRequest() {
if (request_buffer_queue_.size() == 0) {
- // No additiona requests to process.
+ TRANSPORT_LOGD("No additional requests to process.");
return;
}
- auto& buffer = request_buffer_queue_.front();
+ auto& buffer = request_buffer_queue_.front().second;
uint64_t request_hash =
- utils::hash::fnv64_buf(buffer->data(), buffer->length());
+ utils::hash::fnv64_buf(buffer.data(), buffer.size());
std::stringstream name;
name << prefix_hash_.substr(0, prefix_hash_.length() - 2);
@@ -105,23 +113,34 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
// tcp callbacks
void readDataFromTcp(const uint8_t* data, std::size_t size, bool is_last,
- bool headers) {
+ bool headers, Metadata* metadata) {
if (headers) {
// Add the request to the request queue
- tmp_buffer_ = utils::MemBuf::copyBuffer(data, size);
+ RequestMetadata* _metadata = reinterpret_cast<RequestMetadata*>(metadata);
+ tmp_buffer_ = std::make_pair(utils::MemBuf::copyBuffer(data, size),
+ _metadata->path);
+ if (TRANSPORT_EXPECT_FALSE(
+ _metadata->path.compare("/isHicnProxyOn") == 0 && is_last)) {
+ /**
+ * It seems this request is for us.
+ * Get hicn parameters.
+ */
+ processClientRequest(_metadata);
+ return;
+ }
} else {
// Append payload chunk to last request added. Here we are assuming
// HTTP/1.1.
- tmp_buffer_->prependChain(utils::MemBuf::copyBuffer(data, size));
+ tmp_buffer_.first->prependChain(utils::MemBuf::copyBuffer(data, size));
}
current_size_ += size;
if (is_last) {
- TRANSPORT_LOGD(
- "Request received: %s",
- std::string((const char*)tmp_buffer_->data(), tmp_buffer_->length())
- .c_str());
+ TRANSPORT_LOGD("Request received: %s",
+ std::string((const char*)tmp_buffer_.first->data(),
+ tmp_buffer_.first->length())
+ .c_str());
if (current_size_ < 1400) {
request_buffer_queue_.emplace_back(std::move(tmp_buffer_));
} else {
@@ -134,7 +153,8 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
if (!consumer_.isRunning()) {
TRANSPORT_LOGD(
- "Consumer stopped, triggering consume from TCP session handler..");
+ "Consumer stopped, triggering consume from TCP session "
+ "handler..");
consumeNextRequest();
}
@@ -146,9 +166,17 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
void processLeavingInterest(interface::ConsumerSocket& c,
const core::Interest& interest) {
+ if (interest.getName().getSuffix() == 0 && interest.payloadSize() == 0) {
+ Interest& int2 = const_cast<Interest&>(interest);
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
+ }
+ }
+
+ void processInterestRetx(interface::ConsumerSocket& c,
+ const core::Interest& interest) {
if (interest.payloadSize() == 0) {
Interest& int2 = const_cast<Interest&>(interest);
- int2.appendPayload(request_buffer_queue_.front()->clone());
+ int2.appendPayload(request_buffer_queue_.front().first->clone());
}
}
@@ -174,14 +202,55 @@ class HTTPClientConnectionCallback : interface::ConsumerSocket::ReadCallback {
consumeNextRequest();
}
+ void processClientRequest(RequestMetadata* metadata) {
+ auto it = metadata->headers.find("hicn");
+ if (it == metadata->headers.end()) {
+ /*
+ * Probably it is an OPTION message for access control.
+ * Let's grant it!
+ */
+ if (metadata->method == "OPTIONS") {
+ session_->send(
+ (const uint8_t*)HTTPMessageFastParser::http_cors,
+ std::strlen(HTTPMessageFastParser::http_cors), [this]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent OPTIONS to client %s:%d",
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ }
+ } else {
+ tcp_receiver_.parseHicnHeader(it->second, [this](bool result) {
+ const char* reply = nullptr;
+ if (result) {
+ reply = HTTPMessageFastParser::http_ok;
+ } else {
+ reply = HTTPMessageFastParser::http_failed;
+ }
+
+ /* Route created. Send back a 200 OK to client */
+ session_->send(
+ (const uint8_t*)reply, std::strlen(reply), [this, result]() {
+ auto& socket = session_->socket_;
+ TRANSPORT_LOGI(
+ "Sent %d response to client %s:%d", result,
+ socket.remote_endpoint().address().to_string().c_str(),
+ socket.remote_endpoint().port());
+ });
+ });
+ }
+ }
+
private:
TcpReceiver& tcp_receiver_;
utils::EventThread& thread_;
std::string prefix_hash_;
ConsumerSocket consumer_;
std::unique_ptr<HTTPSession> session_;
- std::deque<std::unique_ptr<utils::MemBuf>> request_buffer_queue_;
- std::unique_ptr<utils::MemBuf> tmp_buffer_;
+ std::deque<std::pair<std::unique_ptr<utils::MemBuf>, std::string>>
+ request_buffer_queue_;
+ std::pair<std::unique_ptr<utils::MemBuf>, std::string> tmp_buffer_;
std::size_t current_size_;
};
@@ -192,11 +261,17 @@ TcpReceiver::TcpReceiver(std::uint16_t port, const std::string& prefix,
std::bind(&TcpReceiver::onNewConnection, this,
std::placeholders::_1)),
prefix_(prefix),
- ipv6_first_word_(ipv6_first_word) {
- for (int i = 0; i < 10; i++) {
- http_clients_.emplace_back(new HTTPClientConnectionCallback(
- *this, thread_, prefix, ipv6_first_word));
- }
+ ipv6_first_word_(ipv6_first_word),
+ forwarder_config_(thread_.getIoService(), [this](std::error_code ec) {
+ if (!ec) {
+ listener_.doAccept();
+ for (int i = 0; i < 10; i++) {
+ http_clients_.emplace_back(new HTTPClientConnectionCallback(
+ *this, thread_, prefix_, ipv6_first_word_));
+ }
+ }
+ }) {
+ forwarder_config_.tryToConnectToForwarder();
}
void TcpReceiver::onClientDisconnect(HTTPClientConnectionCallback* client) {