summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-02-19 15:49:25 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-19 15:55:58 +0100
commit24acbd12881e2cbf3dd209afc384b1ab4cc3faf8 (patch)
tree85e84d242f71c9b527839f116c00c72e0b9ce837
parent0710f1ff754ebf01ae5befabb055349fe472b0c2 (diff)
[HICN-530] Add support for chunked Transfer-Encoding in higet and http-proxy
Change-Id: Ibf954e5e886412a934542a10d94d89bb8a55a676 Signed-off-by: Mauro Sardara <msardara@cisco.com>
-rw-r--r--apps/higet/higet.cc74
-rw-r--r--apps/http-proxy/src/ATSConnector.cc92
-rw-r--r--apps/http-proxy/src/ATSConnector.h13
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.cc19
-rw-r--r--apps/http-proxy/src/HTTP1.xMessageFastParser.h8
-rw-r--r--libtransport/src/hicn/transport/http/response.cc46
-rw-r--r--libtransport/src/hicn/transport/http/response.h6
7 files changed, 213 insertions, 45 deletions
diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc
index df34d5c14..fcb0cc540 100644
--- a/apps/higet/higet.cc
+++ b/apps/higet/higet.cc
@@ -17,6 +17,9 @@
#include <fstream>
#include <map>
+#include <experimental/algorithm>
+#include <experimental/functional>
+
#ifndef ASIO_STANDALONE
#define ASIO_STANDALONE
#include <asio.hpp>
@@ -41,12 +44,16 @@ typedef struct {
class ReadBytesCallbackImplementation
: public transport::http::HTTPClientConnection::ReadBytesCallback {
+ static std::string chunk_separator;
+
public:
ReadBytesCallbackImplementation(std::string file_name, long yet_downloaded)
: file_name_(file_name),
temp_file_name_(file_name_ + ".temp"),
yet_downloaded_(yet_downloaded),
byte_downloaded_(yet_downloaded),
+ chunked_(false),
+ chunk_size_(0),
work_(std::make_unique<asio::io_service::work>(io_service_)),
thread_(
std::make_unique<std::thread>([this]() { io_service_.run(); })) {
@@ -71,21 +78,72 @@ class ReadBytesCallbackImplementation
auto buffer_ptr = buffer.release();
io_service_.post([this, buffer_ptr]() {
auto buffer = std::unique_ptr<utils::MemBuf>(buffer_ptr);
+ std::unique_ptr<utils::MemBuf> payload;
if (!first_chunk_read_) {
transport::http::HTTPResponse http_response(std::move(buffer));
- auto payload = http_response.getPayload();
+ payload = http_response.getPayload();
auto header = http_response.getHeaders();
+ content_size_ = yet_downloaded_;
std::map<std::string, std::string>::iterator it =
header.find("Content-Length");
if (it != header.end()) {
- content_size_ = yet_downloaded_ + std::stol(it->second);
+ content_size_ += std::stol(it->second);
+ } else {
+ it = header.find("Transfer-Encoding");
+ if (it != header.end() && it->second.compare("chunked") == 0) {
+ chunked_ = true;
+ }
}
- out_->write((char *)payload->data(), payload->length());
first_chunk_read_ = true;
- byte_downloaded_ += payload->length();
} else {
- out_->write((char *)buffer->data(), buffer->length());
- byte_downloaded_ += buffer->length();
+ payload = std::move(buffer);
+ }
+
+ if (chunked_) {
+ if (chunk_size_ > 0) {
+ out_->write((char *)payload->data(), chunk_size_);
+ payload->trimStart(chunk_size_);
+
+ if (payload->length() >= chunk_separator.size()) {
+ payload->trimStart(chunk_separator.size());
+ }
+ }
+
+ while (payload->length() > 0) {
+ // read next chunk size
+ const char *begin = (const char *)payload->data();
+ const char *end = (const char *)payload->tail();
+
+ using std::experimental::make_boyer_moore_searcher;
+ auto it = std::experimental::search(
+ begin, end,
+ make_boyer_moore_searcher(chunk_separator.begin(),
+ chunk_separator.end()));
+ if (it != end) {
+ chunk_size_ = std::stoul(begin, 0, 16);
+ content_size_ += chunk_size_;
+ payload->trimStart(it + chunk_separator.size() - begin);
+
+ std::size_t to_write;
+ if (payload->length() >= chunk_size_) {
+ to_write = chunk_size_;
+ } else {
+ to_write = payload->length();
+ chunk_size_ -= payload->length();
+ }
+
+ out_->write((char *)payload->data(), to_write);
+ byte_downloaded_ += to_write;
+ payload->trimStart(to_write);
+
+ if (payload->length() >= chunk_separator.size()) {
+ payload->trimStart(chunk_separator.size());
+ }
+ }
+ }
+ } else {
+ out_->write((char *)payload->data(), payload->length());
+ byte_downloaded_ += payload->length();
}
if (file_name_ != "-") {
@@ -174,11 +232,15 @@ class ReadBytesCallbackImplementation
long content_size_;
bool first_chunk_read_ = false;
long byte_downloaded_ = 0;
+ bool chunked_;
+ std::size_t chunk_size_;
asio::io_service io_service_;
std::unique_ptr<asio::io_service::work> work_;
std::unique_ptr<std::thread> thread_;
};
+std::string ReadBytesCallbackImplementation::chunk_separator = "\r\n";
+
long checkFileStatus(std::string file_name) {
struct stat stat_buf;
std::string temp_file_name_ = file_name + ".temp";
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc
index f656a68cb..a9b889941 100644
--- a/apps/http-proxy/src/ATSConnector.cc
+++ b/apps/http-proxy/src/ATSConnector.cc
@@ -33,6 +33,9 @@ ATSConnector::ATSConnector(asio::io_service &io_service,
timer_(io_service),
is_reconnection_(false),
data_available_(false),
+ content_length_(0),
+ is_last_chunk_(false),
+ chunked_(false),
receive_callback_(receive_callback),
on_reconnect_callback_(on_reconnect_callback) {
input_buffer_.prepare(buffer_size + 2048);
@@ -96,23 +99,26 @@ void ATSConnector::doWrite() {
});
} // namespace transport
-void ATSConnector::handleRead(std::error_code ec, std::size_t length,
- std::size_t size) {
+void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
- size -= length;
+ content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !size, false);
+ receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
input_buffer_.consume(input_buffer_.size());
- if (!size) {
- doReadHeader();
+ if (!content_length_) {
+ if (!chunked_ || is_last_chunk_) {
+ doReadHeader();
+ } else {
+ doReadChunkedHeader();
+ }
} else {
- auto to_read = size >= buffer_size ? buffer_size : size;
- asio::async_read(
- socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
- std::placeholders::_2, size));
+ auto to_read =
+ content_length_ >= buffer_size ? buffer_size : content_length_;
+ asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
+ std::bind(&ATSConnector::handleRead, this,
+ std::placeholders::_1, std::placeholders::_2));
}
} else if (ec == asio::error::eof) {
input_buffer_.consume(input_buffer_.size());
@@ -129,20 +135,47 @@ void ATSConnector::doReadBody(std::size_t body_size,
? (buffer_size - input_buffer_.size())
: bytes_to_read;
+ is_last_chunk_ = chunked_ && body_size == 5;
+
if (to_read > 0) {
- asio::async_read(
- socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
- std::placeholders::_2, bytes_to_read));
+ content_length_ = bytes_to_read;
+ asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
+ std::bind(&ATSConnector::handleRead, this,
+ std::placeholders::_1, std::placeholders::_2));
} else {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, !to_read, false);
+ receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
+ false);
input_buffer_.consume(body_size);
- doReadHeader();
+
+ if (!chunked_ || is_last_chunk_) {
+ doReadHeader();
+ } else {
+ doReadChunkedHeader();
+ }
}
}
+void ATSConnector::doReadChunkedHeader() {
+ asio::async_read_until(
+ socket_, input_buffer_, "\r\n",
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(input_buffer_.data());
+ std::size_t chunk_size =
+ std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 +
+ length;
+ auto additional_bytes = input_buffer_.size();
+ doReadBody(chunk_size, additional_bytes);
+ } else {
+ input_buffer_.consume(input_buffer_.size());
+ tryReconnection();
+ }
+ });
+}
+
void ATSConnector::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
@@ -150,14 +183,31 @@ void ATSConnector::doReadHeader() {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- std::size_t size = HTTPMessageFastParser::hasBody(buffer, length);
+ auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
- auto additional_bytes = input_buffer_.size() - length;
+ // Try to get content length, if available
+ auto it = headers.find(HTTPMessageFastParser::content_length);
+ std::size_t size = 0;
+ if (it != headers.end()) {
+ size = std::stoull(it->second);
+ chunked_ = false;
+ } else {
+ it = headers.find(HTTPMessageFastParser::transfer_encoding);
+ if (it != headers.end() &&
+ it->second.compare(HTTPMessageFastParser::chunked) == 0) {
+ chunked_ = true;
+ }
+ }
- receive_callback_(buffer, length, !size, true);
+ receive_callback_(buffer, length, !size && !chunked_, true);
+ auto additional_bytes = input_buffer_.size() - length;
input_buffer_.consume(length);
- doReadBody(size, additional_bytes);
+ if (!chunked_) {
+ doReadBody(size, additional_bytes);
+ } else {
+ doReadChunkedHeader();
+ }
} else {
input_buffer_.consume(input_buffer_.size());
tryReconnection();
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h
index dbec30353..8d91b7b7b 100644
--- a/apps/http-proxy/src/ATSConnector.h
+++ b/apps/http-proxy/src/ATSConnector.h
@@ -65,12 +65,17 @@ class ATSConnector {
void doReadBody(std::size_t body_size, std::size_t additional_bytes);
+ // void handleReadChunked(std::error_code ec, std::size_t length,
+ // std::size_t size);
+
+ void doReadChunkedHeader();
+
void doWrite();
bool checkConnected();
private:
- void handleRead(std::error_code ec, std::size_t length, std::size_t bytes);
+ void handleRead(std::error_code ec, std::size_t length);
void tryReconnection();
void startConnectionTimer();
void handleDeadline(const std::error_code &ec);
@@ -88,6 +93,12 @@ class ATSConnector {
bool is_reconnection_;
bool data_available_;
+ std::size_t content_length_;
+
+ // Chunked encoding
+ bool is_last_chunk_;
+ bool chunked_;
+
ContentReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
index a03871649..729eb3aeb 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc
@@ -15,17 +15,36 @@
#include "HTTP1.xMessageFastParser.h"
+#include <hicn/transport/http/response.h>
+
#include <experimental/algorithm>
#include <experimental/functional>
#include <iostream>
std::string HTTPMessageFastParser::numbers = "0123456789";
std::string HTTPMessageFastParser::content_length = "Content-Length";
+std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding";
+std::string HTTPMessageFastParser::chunked = "chunked";
std::string HTTPMessageFastParser::cache_control = "Cache-Control";
std::string HTTPMessageFastParser::mpd = "mpd";
std::string HTTPMessageFastParser::connection = "Connection";
std::string HTTPMessageFastParser::separator = "\r\n\r\n";
+HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers,
+ std::size_t length) {
+ HTTPHeaders ret;
+ std::string http_version;
+ std::string status_code;
+ std::string status_string;
+
+ if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version,
+ status_code, status_string)) {
+ return ret;
+ }
+
+ throw std::runtime_error("Error parsing response headers.");
+}
+
std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers,
std::size_t length) {
const char *buffer = reinterpret_cast<const char *>(headers);
diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
index 10a70c3e9..79dbce19d 100644
--- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h
+++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h
@@ -18,15 +18,21 @@
#include <algorithm>
#include <string>
+#include <hicn/transport/http/message.h>
+
+using transport::http::HTTPHeaders;
+
class HTTPMessageFastParser {
public:
+ static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length);
static std::size_t hasBody(const uint8_t* headers, std::size_t length);
static bool isMpdRequest(const uint8_t* headers, std::size_t length);
static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length);
- private:
static std::string numbers;
static std::string content_length;
+ static std::string transfer_encoding;
+ static std::string chunked;
static std::string cache_control;
static std::string connection;
static std::string mpd;
diff --git a/libtransport/src/hicn/transport/http/response.cc b/libtransport/src/hicn/transport/http/response.cc
index a2bc47e6b..ba0acd1ac 100644
--- a/libtransport/src/hicn/transport/http/response.cc
+++ b/libtransport/src/hicn/transport/http/response.cc
@@ -41,16 +41,33 @@ void HTTPResponse::appendResponseChunk(
}
bool HTTPResponse::parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer) {
+ auto ret =
+ HTTPResponse::parseHeaders(buffer->data(), buffer->length(), headers_,
+ http_version_, status_code_, status_string_);
+
+ if (ret) {
+ buffer->trimStart(ret);
+ payload_ = std::move(buffer);
+ return true;
+ }
+
+ return false;
+}
+
+std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size,
+ HTTPHeaders &headers,
+ std::string &http_version,
+ std::string &status_code,
+ std::string &status_string) {
const char *crlf2 = "\r\n\r\n";
- const char *begin = (const char *)buffer->data();
- const char *end = begin + buffer->length();
+ const char *begin = (const char *)buffer;
+ const char *end = begin + size;
auto it =
std::experimental::search(begin, end,
std::experimental::make_boyer_moore_searcher(
crlf2, crlf2 + strlen(crlf2)));
if (it != end) {
- buffer->trimStart(it + strlen(crlf2) - begin);
std::stringstream ss;
ss.str(std::string(begin, it));
@@ -58,29 +75,28 @@ bool HTTPResponse::parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer) {
getline(ss, line);
std::istringstream line_s(line);
std::string _http_version;
- std::string http_version;
line_s >> _http_version;
std::size_t separator;
if ((separator = _http_version.find('/')) != std::string::npos) {
if (_http_version.substr(0, separator) != "HTTP") {
- return false;
+ return 0;
}
- http_version_ =
+ http_version =
line.substr(separator + 1, _http_version.length() - separator - 1);
} else {
- return false;
+ return 0;
}
- std::string status_code, status_string;
+ std::string _status_string;
- line_s >> status_code_;
- line_s >> status_string;
+ line_s >> status_code;
+ line_s >> _status_string;
auto _it = std::search(line.begin(), line.end(), status_string.begin(),
status_string.end());
- status_string_ = std::string(_it, line.end() - 1);
+ status_string = std::string(_it, line.end() - 1);
std::size_t param_end;
std::size_t value_start;
@@ -92,19 +108,17 @@ bool HTTPResponse::parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer) {
value_start++;
}
if (value_start < line.size()) {
- headers_[line.substr(0, param_end)] =
+ headers[line.substr(0, param_end)] =
line.substr(value_start, line.size() - value_start - 1);
}
}
} else {
- return false;
+ return 0;
}
}
}
- payload_ = std::move(buffer);
-
- return true;
+ return it + strlen(crlf2) - begin;
}
void HTTPResponse::parse(std::unique_ptr<utils::MemBuf> &&response) {
diff --git a/libtransport/src/hicn/transport/http/response.h b/libtransport/src/hicn/transport/http/response.h
index 7ef655059..bab41acb8 100644
--- a/libtransport/src/hicn/transport/http/response.h
+++ b/libtransport/src/hicn/transport/http/response.h
@@ -42,6 +42,12 @@ class HTTPResponse : public HTTPMessage {
bool parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer);
+ static std::size_t parseHeaders(const uint8_t *buffer, std::size_t size,
+ HTTPHeaders &headers,
+ std::string &http_version,
+ std::string &status_code,
+ std::string &status_string);
+
private:
std::string status_code_;
std::string status_string_;