diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-02-19 15:49:25 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2020-02-19 15:55:58 +0100 |
commit | 24acbd12881e2cbf3dd209afc384b1ab4cc3faf8 (patch) | |
tree | 85e84d242f71c9b527839f116c00c72e0b9ce837 | |
parent | 0710f1ff754ebf01ae5befabb055349fe472b0c2 (diff) |
[HICN-530] Add support for chunked Transfer-Encoding in higet and http-proxy
Change-Id: Ibf954e5e886412a934542a10d94d89bb8a55a676
Signed-off-by: Mauro Sardara <msardara@cisco.com>
-rw-r--r-- | apps/higet/higet.cc | 74 | ||||
-rw-r--r-- | apps/http-proxy/src/ATSConnector.cc | 92 | ||||
-rw-r--r-- | apps/http-proxy/src/ATSConnector.h | 13 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 19 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.h | 8 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/http/response.cc | 46 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/http/response.h | 6 |
7 files changed, 213 insertions, 45 deletions
diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc index df34d5c14..fcb0cc540 100644 --- a/apps/higet/higet.cc +++ b/apps/higet/higet.cc @@ -17,6 +17,9 @@ #include <fstream> #include <map> +#include <experimental/algorithm> +#include <experimental/functional> + #ifndef ASIO_STANDALONE #define ASIO_STANDALONE #include <asio.hpp> @@ -41,12 +44,16 @@ typedef struct { class ReadBytesCallbackImplementation : public transport::http::HTTPClientConnection::ReadBytesCallback { + static std::string chunk_separator; + public: ReadBytesCallbackImplementation(std::string file_name, long yet_downloaded) : file_name_(file_name), temp_file_name_(file_name_ + ".temp"), yet_downloaded_(yet_downloaded), byte_downloaded_(yet_downloaded), + chunked_(false), + chunk_size_(0), work_(std::make_unique<asio::io_service::work>(io_service_)), thread_( std::make_unique<std::thread>([this]() { io_service_.run(); })) { @@ -71,21 +78,72 @@ class ReadBytesCallbackImplementation auto buffer_ptr = buffer.release(); io_service_.post([this, buffer_ptr]() { auto buffer = std::unique_ptr<utils::MemBuf>(buffer_ptr); + std::unique_ptr<utils::MemBuf> payload; if (!first_chunk_read_) { transport::http::HTTPResponse http_response(std::move(buffer)); - auto payload = http_response.getPayload(); + payload = http_response.getPayload(); auto header = http_response.getHeaders(); + content_size_ = yet_downloaded_; std::map<std::string, std::string>::iterator it = header.find("Content-Length"); if (it != header.end()) { - content_size_ = yet_downloaded_ + std::stol(it->second); + content_size_ += std::stol(it->second); + } else { + it = header.find("Transfer-Encoding"); + if (it != header.end() && it->second.compare("chunked") == 0) { + chunked_ = true; + } } - out_->write((char *)payload->data(), payload->length()); first_chunk_read_ = true; - byte_downloaded_ += payload->length(); } else { - out_->write((char *)buffer->data(), buffer->length()); - byte_downloaded_ += buffer->length(); + payload = std::move(buffer); + } + + if (chunked_) { + if (chunk_size_ > 0) { + out_->write((char *)payload->data(), chunk_size_); + payload->trimStart(chunk_size_); + + if (payload->length() >= chunk_separator.size()) { + payload->trimStart(chunk_separator.size()); + } + } + + while (payload->length() > 0) { + // read next chunk size + const char *begin = (const char *)payload->data(); + const char *end = (const char *)payload->tail(); + + using std::experimental::make_boyer_moore_searcher; + auto it = std::experimental::search( + begin, end, + make_boyer_moore_searcher(chunk_separator.begin(), + chunk_separator.end())); + if (it != end) { + chunk_size_ = std::stoul(begin, 0, 16); + content_size_ += chunk_size_; + payload->trimStart(it + chunk_separator.size() - begin); + + std::size_t to_write; + if (payload->length() >= chunk_size_) { + to_write = chunk_size_; + } else { + to_write = payload->length(); + chunk_size_ -= payload->length(); + } + + out_->write((char *)payload->data(), to_write); + byte_downloaded_ += to_write; + payload->trimStart(to_write); + + if (payload->length() >= chunk_separator.size()) { + payload->trimStart(chunk_separator.size()); + } + } + } + } else { + out_->write((char *)payload->data(), payload->length()); + byte_downloaded_ += payload->length(); } if (file_name_ != "-") { @@ -174,11 +232,15 @@ class ReadBytesCallbackImplementation long content_size_; bool first_chunk_read_ = false; long byte_downloaded_ = 0; + bool chunked_; + std::size_t chunk_size_; asio::io_service io_service_; std::unique_ptr<asio::io_service::work> work_; std::unique_ptr<std::thread> thread_; }; +std::string ReadBytesCallbackImplementation::chunk_separator = "\r\n"; + long checkFileStatus(std::string file_name) { struct stat stat_buf; std::string temp_file_name_ = file_name + ".temp"; diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc index f656a68cb..a9b889941 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/ATSConnector.cc @@ -33,6 +33,9 @@ ATSConnector::ATSConnector(asio::io_service &io_service, timer_(io_service), is_reconnection_(false), data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), receive_callback_(receive_callback), on_reconnect_callback_(on_reconnect_callback) { input_buffer_.prepare(buffer_size + 2048); @@ -96,23 +99,26 @@ void ATSConnector::doWrite() { }); } // namespace transport -void ATSConnector::handleRead(std::error_code ec, std::size_t length, - std::size_t size) { +void ATSConnector::handleRead(std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { - size -= length; + content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !size, false); + receive_callback_(buffer, input_buffer_.size(), !content_length_, false); input_buffer_.consume(input_buffer_.size()); - if (!size) { - doReadHeader(); + if (!content_length_) { + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } } else { - auto to_read = size >= buffer_size ? buffer_size : size; - asio::async_read( - socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, - std::placeholders::_2, size)); + auto to_read = + content_length_ >= buffer_size ? buffer_size : content_length_; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&ATSConnector::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); } } else if (ec == asio::error::eof) { input_buffer_.consume(input_buffer_.size()); @@ -129,20 +135,47 @@ void ATSConnector::doReadBody(std::size_t body_size, ? (buffer_size - input_buffer_.size()) : bytes_to_read; + is_last_chunk_ = chunked_ && body_size == 5; + if (to_read > 0) { - asio::async_read( - socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, - std::placeholders::_2, bytes_to_read)); + content_length_ = bytes_to_read; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&ATSConnector::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); } else { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, body_size, !to_read, false); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false); input_buffer_.consume(body_size); - doReadHeader(); + + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } } } +void ATSConnector::doReadChunkedHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + std::size_t chunk_size = + std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 + + length; + auto additional_bytes = input_buffer_.size(); + doReadBody(chunk_size, additional_bytes); + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + void ATSConnector::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", @@ -150,14 +183,31 @@ void ATSConnector::doReadHeader() { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - std::size_t size = HTTPMessageFastParser::hasBody(buffer, length); + auto headers = HTTPMessageFastParser::getHeaders(buffer, length); - auto additional_bytes = input_buffer_.size() - length; + // Try to get content length, if available + auto it = headers.find(HTTPMessageFastParser::content_length); + std::size_t size = 0; + if (it != headers.end()) { + size = std::stoull(it->second); + chunked_ = false; + } else { + it = headers.find(HTTPMessageFastParser::transfer_encoding); + if (it != headers.end() && + it->second.compare(HTTPMessageFastParser::chunked) == 0) { + chunked_ = true; + } + } - receive_callback_(buffer, length, !size, true); + receive_callback_(buffer, length, !size && !chunked_, true); + auto additional_bytes = input_buffer_.size() - length; input_buffer_.consume(length); - doReadBody(size, additional_bytes); + if (!chunked_) { + doReadBody(size, additional_bytes); + } else { + doReadChunkedHeader(); + } } else { input_buffer_.consume(input_buffer_.size()); tryReconnection(); diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h index dbec30353..8d91b7b7b 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/src/ATSConnector.h @@ -65,12 +65,17 @@ class ATSConnector { void doReadBody(std::size_t body_size, std::size_t additional_bytes); + // void handleReadChunked(std::error_code ec, std::size_t length, + // std::size_t size); + + void doReadChunkedHeader(); + void doWrite(); bool checkConnected(); private: - void handleRead(std::error_code ec, std::size_t length, std::size_t bytes); + void handleRead(std::error_code ec, std::size_t length); void tryReconnection(); void startConnectionTimer(); void handleDeadline(const std::error_code &ec); @@ -88,6 +93,12 @@ class ATSConnector { bool is_reconnection_; bool data_available_; + std::size_t content_length_; + + // Chunked encoding + bool is_last_chunk_; + bool chunked_; + ContentReceivedCallback receive_callback_; OnReconnect on_reconnect_callback_; diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index a03871649..729eb3aeb 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -15,17 +15,36 @@ #include "HTTP1.xMessageFastParser.h" +#include <hicn/transport/http/response.h> + #include <experimental/algorithm> #include <experimental/functional> #include <iostream> std::string HTTPMessageFastParser::numbers = "0123456789"; std::string HTTPMessageFastParser::content_length = "Content-Length"; +std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding"; +std::string HTTPMessageFastParser::chunked = "chunked"; std::string HTTPMessageFastParser::cache_control = "Cache-Control"; std::string HTTPMessageFastParser::mpd = "mpd"; std::string HTTPMessageFastParser::connection = "Connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; +HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, + std::size_t length) { + HTTPHeaders ret; + std::string http_version; + std::string status_code; + std::string status_string; + + if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, + status_code, status_string)) { + return ret; + } + + throw std::runtime_error("Error parsing response headers."); +} + std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers, std::size_t length) { const char *buffer = reinterpret_cast<const char *>(headers); diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h index 10a70c3e9..79dbce19d 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -18,15 +18,21 @@ #include <algorithm> #include <string> +#include <hicn/transport/http/message.h> + +using transport::http::HTTPHeaders; + class HTTPMessageFastParser { public: + static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); - private: static std::string numbers; static std::string content_length; + static std::string transfer_encoding; + static std::string chunked; static std::string cache_control; static std::string connection; static std::string mpd; diff --git a/libtransport/src/hicn/transport/http/response.cc b/libtransport/src/hicn/transport/http/response.cc index a2bc47e6b..ba0acd1ac 100644 --- a/libtransport/src/hicn/transport/http/response.cc +++ b/libtransport/src/hicn/transport/http/response.cc @@ -41,16 +41,33 @@ void HTTPResponse::appendResponseChunk( } bool HTTPResponse::parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer) { + auto ret = + HTTPResponse::parseHeaders(buffer->data(), buffer->length(), headers_, + http_version_, status_code_, status_string_); + + if (ret) { + buffer->trimStart(ret); + payload_ = std::move(buffer); + return true; + } + + return false; +} + +std::size_t HTTPResponse::parseHeaders(const uint8_t *buffer, std::size_t size, + HTTPHeaders &headers, + std::string &http_version, + std::string &status_code, + std::string &status_string) { const char *crlf2 = "\r\n\r\n"; - const char *begin = (const char *)buffer->data(); - const char *end = begin + buffer->length(); + const char *begin = (const char *)buffer; + const char *end = begin + size; auto it = std::experimental::search(begin, end, std::experimental::make_boyer_moore_searcher( crlf2, crlf2 + strlen(crlf2))); if (it != end) { - buffer->trimStart(it + strlen(crlf2) - begin); std::stringstream ss; ss.str(std::string(begin, it)); @@ -58,29 +75,28 @@ bool HTTPResponse::parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer) { getline(ss, line); std::istringstream line_s(line); std::string _http_version; - std::string http_version; line_s >> _http_version; std::size_t separator; if ((separator = _http_version.find('/')) != std::string::npos) { if (_http_version.substr(0, separator) != "HTTP") { - return false; + return 0; } - http_version_ = + http_version = line.substr(separator + 1, _http_version.length() - separator - 1); } else { - return false; + return 0; } - std::string status_code, status_string; + std::string _status_string; - line_s >> status_code_; - line_s >> status_string; + line_s >> status_code; + line_s >> _status_string; auto _it = std::search(line.begin(), line.end(), status_string.begin(), status_string.end()); - status_string_ = std::string(_it, line.end() - 1); + status_string = std::string(_it, line.end() - 1); std::size_t param_end; std::size_t value_start; @@ -92,19 +108,17 @@ bool HTTPResponse::parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer) { value_start++; } if (value_start < line.size()) { - headers_[line.substr(0, param_end)] = + headers[line.substr(0, param_end)] = line.substr(value_start, line.size() - value_start - 1); } } } else { - return false; + return 0; } } } - payload_ = std::move(buffer); - - return true; + return it + strlen(crlf2) - begin; } void HTTPResponse::parse(std::unique_ptr<utils::MemBuf> &&response) { diff --git a/libtransport/src/hicn/transport/http/response.h b/libtransport/src/hicn/transport/http/response.h index 7ef655059..bab41acb8 100644 --- a/libtransport/src/hicn/transport/http/response.h +++ b/libtransport/src/hicn/transport/http/response.h @@ -42,6 +42,12 @@ class HTTPResponse : public HTTPMessage { bool parseHeaders(std::unique_ptr<utils::MemBuf> &&buffer); + static std::size_t parseHeaders(const uint8_t *buffer, std::size_t size, + HTTPHeaders &headers, + std::string &http_version, + std::string &status_code, + std::string &status_string); + private: std::string status_code_; std::string status_string_; |