From 24acbd12881e2cbf3dd209afc384b1ab4cc3faf8 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 19 Feb 2020 15:49:25 +0100 Subject: [HICN-530] Add support for chunked Transfer-Encoding in higet and http-proxy Change-Id: Ibf954e5e886412a934542a10d94d89bb8a55a676 Signed-off-by: Mauro Sardara --- apps/higet/higet.cc | 74 ++++++++++++++++++-- apps/http-proxy/src/ATSConnector.cc | 92 +++++++++++++++++++------ apps/http-proxy/src/ATSConnector.h | 13 +++- apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 19 +++++ apps/http-proxy/src/HTTP1.xMessageFastParser.h | 8 ++- 5 files changed, 177 insertions(+), 29 deletions(-) (limited to 'apps') diff --git a/apps/higet/higet.cc b/apps/higet/higet.cc index df34d5c14..fcb0cc540 100644 --- a/apps/higet/higet.cc +++ b/apps/higet/higet.cc @@ -17,6 +17,9 @@ #include #include +#include +#include + #ifndef ASIO_STANDALONE #define ASIO_STANDALONE #include @@ -41,12 +44,16 @@ typedef struct { class ReadBytesCallbackImplementation : public transport::http::HTTPClientConnection::ReadBytesCallback { + static std::string chunk_separator; + public: ReadBytesCallbackImplementation(std::string file_name, long yet_downloaded) : file_name_(file_name), temp_file_name_(file_name_ + ".temp"), yet_downloaded_(yet_downloaded), byte_downloaded_(yet_downloaded), + chunked_(false), + chunk_size_(0), work_(std::make_unique(io_service_)), thread_( std::make_unique([this]() { io_service_.run(); })) { @@ -71,21 +78,72 @@ class ReadBytesCallbackImplementation auto buffer_ptr = buffer.release(); io_service_.post([this, buffer_ptr]() { auto buffer = std::unique_ptr(buffer_ptr); + std::unique_ptr payload; if (!first_chunk_read_) { transport::http::HTTPResponse http_response(std::move(buffer)); - auto payload = http_response.getPayload(); + payload = http_response.getPayload(); auto header = http_response.getHeaders(); + content_size_ = yet_downloaded_; std::map::iterator it = header.find("Content-Length"); if (it != header.end()) { - content_size_ = yet_downloaded_ + std::stol(it->second); + content_size_ += std::stol(it->second); + } else { + it = header.find("Transfer-Encoding"); + if (it != header.end() && it->second.compare("chunked") == 0) { + chunked_ = true; + } } - out_->write((char *)payload->data(), payload->length()); first_chunk_read_ = true; - byte_downloaded_ += payload->length(); } else { - out_->write((char *)buffer->data(), buffer->length()); - byte_downloaded_ += buffer->length(); + payload = std::move(buffer); + } + + if (chunked_) { + if (chunk_size_ > 0) { + out_->write((char *)payload->data(), chunk_size_); + payload->trimStart(chunk_size_); + + if (payload->length() >= chunk_separator.size()) { + payload->trimStart(chunk_separator.size()); + } + } + + while (payload->length() > 0) { + // read next chunk size + const char *begin = (const char *)payload->data(); + const char *end = (const char *)payload->tail(); + + using std::experimental::make_boyer_moore_searcher; + auto it = std::experimental::search( + begin, end, + make_boyer_moore_searcher(chunk_separator.begin(), + chunk_separator.end())); + if (it != end) { + chunk_size_ = std::stoul(begin, 0, 16); + content_size_ += chunk_size_; + payload->trimStart(it + chunk_separator.size() - begin); + + std::size_t to_write; + if (payload->length() >= chunk_size_) { + to_write = chunk_size_; + } else { + to_write = payload->length(); + chunk_size_ -= payload->length(); + } + + out_->write((char *)payload->data(), to_write); + byte_downloaded_ += to_write; + payload->trimStart(to_write); + + if (payload->length() >= chunk_separator.size()) { + payload->trimStart(chunk_separator.size()); + } + } + } + } else { + out_->write((char *)payload->data(), payload->length()); + byte_downloaded_ += payload->length(); } if (file_name_ != "-") { @@ -174,11 +232,15 @@ class ReadBytesCallbackImplementation long content_size_; bool first_chunk_read_ = false; long byte_downloaded_ = 0; + bool chunked_; + std::size_t chunk_size_; asio::io_service io_service_; std::unique_ptr work_; std::unique_ptr thread_; }; +std::string ReadBytesCallbackImplementation::chunk_separator = "\r\n"; + long checkFileStatus(std::string file_name) { struct stat stat_buf; std::string temp_file_name_ = file_name + ".temp"; diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc index f656a68cb..a9b889941 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/ATSConnector.cc @@ -33,6 +33,9 @@ ATSConnector::ATSConnector(asio::io_service &io_service, timer_(io_service), is_reconnection_(false), data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), receive_callback_(receive_callback), on_reconnect_callback_(on_reconnect_callback) { input_buffer_.prepare(buffer_size + 2048); @@ -96,23 +99,26 @@ void ATSConnector::doWrite() { }); } // namespace transport -void ATSConnector::handleRead(std::error_code ec, std::size_t length, - std::size_t size) { +void ATSConnector::handleRead(std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { - size -= length; + content_length_ -= length; const uint8_t *buffer = asio::buffer_cast(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !size, false); + receive_callback_(buffer, input_buffer_.size(), !content_length_, false); input_buffer_.consume(input_buffer_.size()); - if (!size) { - doReadHeader(); + if (!content_length_) { + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } } else { - auto to_read = size >= buffer_size ? buffer_size : size; - asio::async_read( - socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, - std::placeholders::_2, size)); + auto to_read = + content_length_ >= buffer_size ? buffer_size : content_length_; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&ATSConnector::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); } } else if (ec == asio::error::eof) { input_buffer_.consume(input_buffer_.size()); @@ -129,20 +135,47 @@ void ATSConnector::doReadBody(std::size_t body_size, ? (buffer_size - input_buffer_.size()) : bytes_to_read; + is_last_chunk_ = chunked_ && body_size == 5; + if (to_read > 0) { - asio::async_read( - socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, - std::placeholders::_2, bytes_to_read)); + content_length_ = bytes_to_read; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&ATSConnector::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); } else { const uint8_t *buffer = asio::buffer_cast(input_buffer_.data()); - receive_callback_(buffer, body_size, !to_read, false); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false); input_buffer_.consume(body_size); - doReadHeader(); + + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } } } +void ATSConnector::doReadChunkedHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast(input_buffer_.data()); + std::size_t chunk_size = + std::stoul(reinterpret_cast(buffer), 0, 16) + 2 + + length; + auto additional_bytes = input_buffer_.size(); + doReadBody(chunk_size, additional_bytes); + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + void ATSConnector::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", @@ -150,14 +183,31 @@ void ATSConnector::doReadHeader() { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast(input_buffer_.data()); - std::size_t size = HTTPMessageFastParser::hasBody(buffer, length); + auto headers = HTTPMessageFastParser::getHeaders(buffer, length); - auto additional_bytes = input_buffer_.size() - length; + // Try to get content length, if available + auto it = headers.find(HTTPMessageFastParser::content_length); + std::size_t size = 0; + if (it != headers.end()) { + size = std::stoull(it->second); + chunked_ = false; + } else { + it = headers.find(HTTPMessageFastParser::transfer_encoding); + if (it != headers.end() && + it->second.compare(HTTPMessageFastParser::chunked) == 0) { + chunked_ = true; + } + } - receive_callback_(buffer, length, !size, true); + receive_callback_(buffer, length, !size && !chunked_, true); + auto additional_bytes = input_buffer_.size() - length; input_buffer_.consume(length); - doReadBody(size, additional_bytes); + if (!chunked_) { + doReadBody(size, additional_bytes); + } else { + doReadChunkedHeader(); + } } else { input_buffer_.consume(input_buffer_.size()); tryReconnection(); diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h index dbec30353..8d91b7b7b 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/src/ATSConnector.h @@ -65,12 +65,17 @@ class ATSConnector { void doReadBody(std::size_t body_size, std::size_t additional_bytes); + // void handleReadChunked(std::error_code ec, std::size_t length, + // std::size_t size); + + void doReadChunkedHeader(); + void doWrite(); bool checkConnected(); private: - void handleRead(std::error_code ec, std::size_t length, std::size_t bytes); + void handleRead(std::error_code ec, std::size_t length); void tryReconnection(); void startConnectionTimer(); void handleDeadline(const std::error_code &ec); @@ -88,6 +93,12 @@ class ATSConnector { bool is_reconnection_; bool data_available_; + std::size_t content_length_; + + // Chunked encoding + bool is_last_chunk_; + bool chunked_; + ContentReceivedCallback receive_callback_; OnReconnect on_reconnect_callback_; diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index a03871649..729eb3aeb 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -15,17 +15,36 @@ #include "HTTP1.xMessageFastParser.h" +#include + #include #include #include std::string HTTPMessageFastParser::numbers = "0123456789"; std::string HTTPMessageFastParser::content_length = "Content-Length"; +std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding"; +std::string HTTPMessageFastParser::chunked = "chunked"; std::string HTTPMessageFastParser::cache_control = "Cache-Control"; std::string HTTPMessageFastParser::mpd = "mpd"; std::string HTTPMessageFastParser::connection = "Connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; +HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, + std::size_t length) { + HTTPHeaders ret; + std::string http_version; + std::string status_code; + std::string status_string; + + if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, + status_code, status_string)) { + return ret; + } + + throw std::runtime_error("Error parsing response headers."); +} + std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers, std::size_t length) { const char *buffer = reinterpret_cast(headers); diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h index 10a70c3e9..79dbce19d 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -18,15 +18,21 @@ #include #include +#include + +using transport::http::HTTPHeaders; + class HTTPMessageFastParser { public: + static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); - private: static std::string numbers; static std::string content_length; + static std::string transfer_encoding; + static std::string chunked; static std::string cache_control; static std::string connection; static std::string mpd; -- cgit 1.2.3-korg