aboutsummaryrefslogtreecommitdiffstats
path: root/apps/http-proxy
diff options
context:
space:
mode:
Diffstat (limited to 'apps/http-proxy')
-rw-r--r--apps/http-proxy/src/ATSConnector.cc92
-rw-r--r--apps/http-proxy/src/ATSConnector.h13
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.cc19
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.h8
4 files changed, 109 insertions, 23 deletions
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc
index f656a68cb..a9b889941 100644
--- a/apps/http-proxy/src/ATSConnector.cc
+++ b/apps/http-proxy/src/ATSConnector.cc
@@ -33,6 +33,9 @@ ATSConnector::ATSConnector(asio::io_service &io_service,
timer_(io_service),
is_reconnection_(false),
data_available_(false),
+ content_length_(0),
+ is_last_chunk_(false),
+ chunked_(false),
receive_callback_(receive_callback),
on_reconnect_callback_(on_reconnect_callback) {
input_buffer_.prepare(buffer_size + 2048);
@@ -96,23 +99,26 @@ void ATSConnector::doWrite() {
});
} // namespace transport
-void ATSConnector::handleRead(std::error_code ec, std::size_t length,
- std::size_t size) {
+void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
- size -= length;
+ content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !size, false);
+ receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
input_buffer_.consume(input_buffer_.size());
- if (!size) {
- doReadHeader();
+ if (!content_length_) {
+ if (!chunked_ || is_last_chunk_) {
+ doReadHeader();
+ } else {
+ doReadChunkedHeader();
+ }
} else {
- auto to_read = size >= buffer_size ? buffer_size : size;
- asio::async_read(
- socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
- std::placeholders::_2, size));
+ auto to_read =
+ content_length_ >= buffer_size ? buffer_size : content_length_;
+ asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
+ std::bind(&ATSConnector::handleRead, this,
+ std::placeholders::_1, std::placeholders::_2));
}
} else if (ec == asio::error::eof) {
input_buffer_.consume(input_buffer_.size());
@@ -129,20 +135,47 @@ void ATSConnector::doReadBody(std::size_t body_size,
? (buffer_size - input_buffer_.size())
: bytes_to_read;
+ is_last_chunk_ = chunked_ && body_size == 5;
+
if (to_read > 0) {
- asio::async_read(
- socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
- std::placeholders::_2, bytes_to_read));
+ content_length_ = bytes_to_read;
+ asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
+ std::bind(&ATSConnector::handleRead, this,
+ std::placeholders::_1, std::placeholders::_2));
} else {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, !to_read, false);
+ receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
+ false);
input_buffer_.consume(body_size);
- doReadHeader();
+
+ if (!chunked_ || is_last_chunk_) {
+ doReadHeader();
+ } else {
+ doReadChunkedHeader();
+ }
}
}
+void ATSConnector::doReadChunkedHeader() {
+ asio::async_read_until(
+ socket_, input_buffer_, "\r\n",
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(input_buffer_.data());
+ std::size_t chunk_size =
+ std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 +
+ length;
+ auto additional_bytes = input_buffer_.size();
+ doReadBody(chunk_size, additional_bytes);
+ } else {
+ input_buffer_.consume(input_buffer_.size());
+ tryReconnection();
+ }
+ });
+}
+
void ATSConnector::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
@@ -150,14 +183,31 @@ void ATSConnector::doReadHeader() {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- std::size_t size = HTTPMessageFastParser::hasBody(buffer, length);
+ auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
- auto additional_bytes = input_buffer_.size() - length;
+ // Try to get content length, if available
+ auto it = headers.find(HTTPMessageFastParser::content_length);
+ std::size_t size = 0;
+ if (it != headers.end()) {
+ size = std::stoull(it->second);
+ chunked_ = false;
+ } else {
+ it = headers.find(HTTPMessageFastParser::transfer_encoding);
+ if (it != headers.end() &&
+ it->second.compare(HTTPMessageFastParser::chunked) == 0) {
+ chunked_ = true;
+ }
+ }
- receive_callback_(buffer, length, !size, true);
+ receive_callback_(buffer, length, !size && !chunked_, true);
+ auto additional_bytes = input_buffer_.size() - length;
input_buffer_.consume(length);
- doReadBody(size, additional_bytes);
+ if (!chunked_) {
+ doReadBody(size, additional_bytes);
+ } else {
+ doReadChunkedHeader();
+ }
} else {
input_buffer_.consume(input_buffer_.size());
tryReconnection();
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h
index dbec30353..8d91b7b7b 100644
--- a/apps/http-proxy/src/ATSConnector.h
+++ b/apps/http-proxy/src/ATSConnector.h
@@ -65,12 +65,17 @@ class ATSConnector {
void doReadBody(std::size_t body_size, std::size_t additional_bytes);
+ // void handleReadChunked(std::error_code ec, std::size_t length,
+ // std::size_t size);
+
+ void doReadChunkedHeader();
+
void doWrite();
bool checkConnected();
private:
- void handleRead(std::error_code ec, std::size_t length, std::size_t bytes);
+ void handleRead(std::error_code ec, std::size_t length);
void tryReconnection();
void startConnectionTimer();
void handleDeadline(const std::error_code &ec);
@@ -88,6 +93,12 @@ class ATSConnector {
bool is_reconnection_;
bool data_available_;
+ std::size_t content_length_;
+
+ // Chunked encoding
+ bool is_last_chunk_;
+ bool chunked_;
+
ContentReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
index a03871649..729eb3aeb 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
@@ -15,17 +15,36 @@
#include "HTTP1.xMessageFastParser.h"
+#include <hicn/transport/http/response.h>
+
#include <experimental/algorithm>
#include <experimental/functional>
#include <iostream>
std::string HTTPMessageFastParser::numbers = "0123456789";
std::string HTTPMessageFastParser::content_length = "Content-Length";
+std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding";
+std::string HTTPMessageFastParser::chunked = "chunked";
std::string HTTPMessageFastParser::cache_control = "Cache-Control";
std::string HTTPMessageFastParser::mpd = "mpd";
std::string HTTPMessageFastParser::connection = "Connection";
std::string HTTPMessageFastParser::separator = "\r\n\r\n";
+HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers,
+ std::size_t length) {
+ HTTPHeaders ret;
+ std::string http_version;
+ std::string status_code;
+ std::string status_string;
+
+ if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version,
+ status_code, status_string)) {
+ return ret;
+ }
+
+ throw std::runtime_error("Error parsing response headers.");
+}
+
std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers,
std::size_t length) {
const char *buffer = reinterpret_cast<const char *>(headers);
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
index 10a70c3e9..79dbce19d 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
@@ -18,15 +18,21 @@
#include <algorithm>
#include <string>
+#include <hicn/transport/http/message.h>
+
+using transport::http::HTTPHeaders;
+
class HTTPMessageFastParser {
public:
+ static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length);
static std::size_t hasBody(const uint8_t* headers, std::size_t length);
static bool isMpdRequest(const uint8_t* headers, std::size_t length);
static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
- private:
static std::string numbers;
static std::string content_length;
+ static std::string transfer_encoding;
+ static std::string chunked;
static std::string cache_control;
static std::string connection;
static std::string mpd;