diff options
Diffstat (limited to 'apps/http-proxy')
-rw-r--r-- | apps/http-proxy/src/ATSConnector.cc | 92 | ||||
-rw-r--r-- | apps/http-proxy/src/ATSConnector.h | 13 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 19 | ||||
-rw-r--r-- | apps/http-proxy/src/HTTP1.xMessageFastParser.h | 8 |
4 files changed, 109 insertions, 23 deletions
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc index f656a68cb..a9b889941 100644 --- a/apps/http-proxy/src/ATSConnector.cc +++ b/apps/http-proxy/src/ATSConnector.cc @@ -33,6 +33,9 @@ ATSConnector::ATSConnector(asio::io_service &io_service, timer_(io_service), is_reconnection_(false), data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), receive_callback_(receive_callback), on_reconnect_callback_(on_reconnect_callback) { input_buffer_.prepare(buffer_size + 2048); @@ -96,23 +99,26 @@ void ATSConnector::doWrite() { }); } // namespace transport -void ATSConnector::handleRead(std::error_code ec, std::size_t length, - std::size_t size) { +void ATSConnector::handleRead(std::error_code ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { - size -= length; + content_length_ -= length; const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, input_buffer_.size(), !size, false); + receive_callback_(buffer, input_buffer_.size(), !content_length_, false); input_buffer_.consume(input_buffer_.size()); - if (!size) { - doReadHeader(); + if (!content_length_) { + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } } else { - auto to_read = size >= buffer_size ? buffer_size : size; - asio::async_read( - socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, - std::placeholders::_2, size)); + auto to_read = + content_length_ >= buffer_size ? buffer_size : content_length_; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&ATSConnector::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); } } else if (ec == asio::error::eof) { input_buffer_.consume(input_buffer_.size()); @@ -129,20 +135,47 @@ void ATSConnector::doReadBody(std::size_t body_size, ? (buffer_size - input_buffer_.size()) : bytes_to_read; + is_last_chunk_ = chunked_ && body_size == 5; + if (to_read > 0) { - asio::async_read( - socket_, input_buffer_, asio::transfer_exactly(to_read), - std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, - std::placeholders::_2, bytes_to_read)); + content_length_ = bytes_to_read; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&ATSConnector::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); } else { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - receive_callback_(buffer, body_size, !to_read, false); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false); input_buffer_.consume(body_size); - doReadHeader(); + + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } } } +void ATSConnector::doReadChunkedHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + std::size_t chunk_size = + std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 + + length; + auto additional_bytes = input_buffer_.size(); + doReadBody(chunk_size, additional_bytes); + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + void ATSConnector::doReadHeader() { asio::async_read_until( socket_, input_buffer_, "\r\n\r\n", @@ -150,14 +183,31 @@ void ATSConnector::doReadHeader() { if (TRANSPORT_EXPECT_TRUE(!ec)) { const uint8_t *buffer = asio::buffer_cast<const uint8_t *>(input_buffer_.data()); - std::size_t size = HTTPMessageFastParser::hasBody(buffer, length); + auto headers = HTTPMessageFastParser::getHeaders(buffer, length); - auto additional_bytes = input_buffer_.size() - length; + // Try to get content length, if available + auto it = headers.find(HTTPMessageFastParser::content_length); + std::size_t size = 0; + if (it != headers.end()) { + size = std::stoull(it->second); + chunked_ = false; + } else { + it = headers.find(HTTPMessageFastParser::transfer_encoding); + if (it != headers.end() && + it->second.compare(HTTPMessageFastParser::chunked) == 0) { + chunked_ = true; + } + } - receive_callback_(buffer, length, !size, true); + receive_callback_(buffer, length, !size && !chunked_, true); + auto additional_bytes = input_buffer_.size() - length; input_buffer_.consume(length); - doReadBody(size, additional_bytes); + if (!chunked_) { + doReadBody(size, additional_bytes); + } else { + doReadChunkedHeader(); + } } else { input_buffer_.consume(input_buffer_.size()); tryReconnection(); diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h index dbec30353..8d91b7b7b 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/src/ATSConnector.h @@ -65,12 +65,17 @@ class ATSConnector { void doReadBody(std::size_t body_size, std::size_t additional_bytes); + // void handleReadChunked(std::error_code ec, std::size_t length, + // std::size_t size); + + void doReadChunkedHeader(); + void doWrite(); bool checkConnected(); private: - void handleRead(std::error_code ec, std::size_t length, std::size_t bytes); + void handleRead(std::error_code ec, std::size_t length); void tryReconnection(); void startConnectionTimer(); void handleDeadline(const std::error_code &ec); @@ -88,6 +93,12 @@ class ATSConnector { bool is_reconnection_; bool data_available_; + std::size_t content_length_; + + // Chunked encoding + bool is_last_chunk_; + bool chunked_; + ContentReceivedCallback receive_callback_; OnReconnect on_reconnect_callback_; diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc index a03871649..729eb3aeb 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -15,17 +15,36 @@ #include "HTTP1.xMessageFastParser.h" +#include <hicn/transport/http/response.h> + #include <experimental/algorithm> #include <experimental/functional> #include <iostream> std::string HTTPMessageFastParser::numbers = "0123456789"; std::string HTTPMessageFastParser::content_length = "Content-Length"; +std::string HTTPMessageFastParser::transfer_encoding = "Transfer-Encoding"; +std::string HTTPMessageFastParser::chunked = "chunked"; std::string HTTPMessageFastParser::cache_control = "Cache-Control"; std::string HTTPMessageFastParser::mpd = "mpd"; std::string HTTPMessageFastParser::connection = "Connection"; std::string HTTPMessageFastParser::separator = "\r\n\r\n"; +HTTPHeaders HTTPMessageFastParser::getHeaders(const uint8_t *headers, + std::size_t length) { + HTTPHeaders ret; + std::string http_version; + std::string status_code; + std::string status_string; + + if (transport::http::HTTPResponse::parseHeaders(headers, length, ret, http_version, + status_code, status_string)) { + return ret; + } + + throw std::runtime_error("Error parsing response headers."); +} + std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers, std::size_t length) { const char *buffer = reinterpret_cast<const char *>(headers); diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h index 10a70c3e9..79dbce19d 100644 --- a/apps/http-proxy/src/HTTP1.xMessageFastParser.h +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -18,15 +18,21 @@ #include <algorithm> #include <string> +#include <hicn/transport/http/message.h> + +using transport::http::HTTPHeaders; + class HTTPMessageFastParser { public: + static HTTPHeaders getHeaders(const uint8_t* headers, std::size_t length); static std::size_t hasBody(const uint8_t* headers, std::size_t length); static bool isMpdRequest(const uint8_t* headers, std::size_t length); static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); - private: static std::string numbers; static std::string content_length; + static std::string transfer_encoding; + static std::string chunked; static std::string cache_control; static std::string connection; static std::string mpd; |