summaryrefslogtreecommitdiffstats
path: root/apps/http-proxy/src/ATSConnector.cc
diff options
context:
space:
mode:
Diffstat (limited to 'apps/http-proxy/src/ATSConnector.cc')
-rw-r--r--apps/http-proxy/src/ATSConnector.cc92
1 files changed, 71 insertions, 21 deletions
diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc
index f656a68cb..a9b889941 100644
--- a/apps/http-proxy/src/ATSConnector.cc
+++ b/apps/http-proxy/src/ATSConnector.cc
@@ -33,6 +33,9 @@ ATSConnector::ATSConnector(asio::io_service &io_service,
timer_(io_service),
is_reconnection_(false),
data_available_(false),
+ content_length_(0),
+ is_last_chunk_(false),
+ chunked_(false),
receive_callback_(receive_callback),
on_reconnect_callback_(on_reconnect_callback) {
input_buffer_.prepare(buffer_size + 2048);
@@ -96,23 +99,26 @@ void ATSConnector::doWrite() {
});
} // namespace transport
-void ATSConnector::handleRead(std::error_code ec, std::size_t length,
- std::size_t size) {
+void ATSConnector::handleRead(std::error_code ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
- size -= length;
+ content_length_ -= length;
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, input_buffer_.size(), !size, false);
+ receive_callback_(buffer, input_buffer_.size(), !content_length_, false);
input_buffer_.consume(input_buffer_.size());
- if (!size) {
- doReadHeader();
+ if (!content_length_) {
+ if (!chunked_ || is_last_chunk_) {
+ doReadHeader();
+ } else {
+ doReadChunkedHeader();
+ }
} else {
- auto to_read = size >= buffer_size ? buffer_size : size;
- asio::async_read(
- socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
- std::placeholders::_2, size));
+ auto to_read =
+ content_length_ >= buffer_size ? buffer_size : content_length_;
+ asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
+ std::bind(&ATSConnector::handleRead, this,
+ std::placeholders::_1, std::placeholders::_2));
}
} else if (ec == asio::error::eof) {
input_buffer_.consume(input_buffer_.size());
@@ -129,20 +135,47 @@ void ATSConnector::doReadBody(std::size_t body_size,
? (buffer_size - input_buffer_.size())
: bytes_to_read;
+ is_last_chunk_ = chunked_ && body_size == 5;
+
if (to_read > 0) {
- asio::async_read(
- socket_, input_buffer_, asio::transfer_exactly(to_read),
- std::bind(&ATSConnector::handleRead, this, std::placeholders::_1,
- std::placeholders::_2, bytes_to_read));
+ content_length_ = bytes_to_read;
+ asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read),
+ std::bind(&ATSConnector::handleRead, this,
+ std::placeholders::_1, std::placeholders::_2));
} else {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- receive_callback_(buffer, body_size, !to_read, false);
+ receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read,
+ false);
input_buffer_.consume(body_size);
- doReadHeader();
+
+ if (!chunked_ || is_last_chunk_) {
+ doReadHeader();
+ } else {
+ doReadChunkedHeader();
+ }
}
}
+void ATSConnector::doReadChunkedHeader() {
+ asio::async_read_until(
+ socket_, input_buffer_, "\r\n",
+ [this](std::error_code ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ const uint8_t *buffer =
+ asio::buffer_cast<const uint8_t *>(input_buffer_.data());
+ std::size_t chunk_size =
+ std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 +
+ length;
+ auto additional_bytes = input_buffer_.size();
+ doReadBody(chunk_size, additional_bytes);
+ } else {
+ input_buffer_.consume(input_buffer_.size());
+ tryReconnection();
+ }
+ });
+}
+
void ATSConnector::doReadHeader() {
asio::async_read_until(
socket_, input_buffer_, "\r\n\r\n",
@@ -150,14 +183,31 @@ void ATSConnector::doReadHeader() {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
const uint8_t *buffer =
asio::buffer_cast<const uint8_t *>(input_buffer_.data());
- std::size_t size = HTTPMessageFastParser::hasBody(buffer, length);
+ auto headers = HTTPMessageFastParser::getHeaders(buffer, length);
- auto additional_bytes = input_buffer_.size() - length;
+ // Try to get content length, if available
+ auto it = headers.find(HTTPMessageFastParser::content_length);
+ std::size_t size = 0;
+ if (it != headers.end()) {
+ size = std::stoull(it->second);
+ chunked_ = false;
+ } else {
+ it = headers.find(HTTPMessageFastParser::transfer_encoding);
+ if (it != headers.end() &&
+ it->second.compare(HTTPMessageFastParser::chunked) == 0) {
+ chunked_ = true;
+ }
+ }
- receive_callback_(buffer, length, !size, true);
+ receive_callback_(buffer, length, !size && !chunked_, true);
+ auto additional_bytes = input_buffer_.size() - length;
input_buffer_.consume(length);
- doReadBody(size, additional_bytes);
+ if (!chunked_) {
+ doReadBody(size, additional_bytes);
+ } else {
+ doReadChunkedHeader();
+ }
} else {
input_buffer_.consume(input_buffer_.size());
tryReconnection();