diff options
Diffstat (limited to 'apps/http-proxy/src/http_session.cc')
-rw-r--r-- | apps/http-proxy/src/http_session.cc | 321 |
1 files changed, 321 insertions, 0 deletions
diff --git a/apps/http-proxy/src/http_session.cc b/apps/http-proxy/src/http_session.cc new file mode 100644 index 000000000..2c281468f --- /dev/null +++ b/apps/http-proxy/src/http_session.cc @@ -0,0 +1,321 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "http_session.h" + +#include <hicn/transport/utils/branch_prediction.h> +#include <hicn/transport/utils/log.h> + +#include <iostream> + +#include "HTTP1.xMessageFastParser.h" + +namespace transport { + +HTTPSession::HTTPSession(asio::io_service &io_service, std::string &ip_address, + std::string &port, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool reverse) + : io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + endpoint_iterator_(resolver_.resolve({ip_address, port})), + timer_(io_service), + reverse_(reverse), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTING; + doConnect(); +} + +HTTPSession::HTTPSession(asio::ip::tcp::socket socket, + ContentReceivedCallback receive_callback, + OnConnectionClosed on_connection_closed_callback, + bool reverse) + : io_service_(socket.get_io_service()), + socket_(std::move(socket)), + resolver_(io_service_), + timer_(io_service_), + reverse_(reverse), + is_reconnection_(false), + data_available_(false), + content_length_(0), + is_last_chunk_(false), + chunked_(false), + receive_callback_(receive_callback), + on_connection_closed_callback_(on_connection_closed_callback) { + input_buffer_.prepare(buffer_size + 2048); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); +} + +HTTPSession::~HTTPSession() {} + +void HTTPSession::send(const uint8_t *packet, std::size_t len, + ContentSentCallback &&content_sent) { + asio::async_write( + socket_, asio::buffer(packet, len), + [content_sent = std::move(content_sent)]( + std::error_code ec, std::size_t /*length*/) { content_sent(); }); +} + +void HTTPSession::send(utils::MemBuf *buffer, + ContentSentCallback &&content_sent) { + io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { + bool write_in_progress = !write_msgs_.empty(); + write_msgs_.emplace_back(std::unique_ptr<utils::MemBuf>(buffer), + std::move(callback)); + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doWrite(); + } + } else { + TRANSPORT_LOGD("Tell the handle connect it has data to write"); + data_available_ = true; + } + }); +} + +void HTTPSession::close() { + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + // on_disconnect_callback_(); + } + } +} + +void HTTPSession::doWrite() { + auto &buffer = write_msgs_.front().first; + + asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_FALSE(!ec)) { + TRANSPORT_LOGD("Content successfully sent! %zu", + length); + write_msgs_.front().second(); + write_msgs_.pop_front(); + if (!write_msgs_.empty()) { + doWrite(); + } + } else { + TRANSPORT_LOGD("Content NOT sent!"); + } + }); +} // namespace transport + +void HTTPSession::handleRead(std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + content_length_ -= length; + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + bool is_last = chunked_ ? (is_last_chunk_ ? !content_length_ : false) + : !content_length_; + receive_callback_(buffer, input_buffer_.size(), is_last, false); + input_buffer_.consume(input_buffer_.size()); + + if (!content_length_) { + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } + } else { + auto to_read = + content_length_ >= buffer_size ? buffer_size : content_length_; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&HTTPSession::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); + } + } else if (ec == asio::error::eof) { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } +} + +void HTTPSession::doReadBody(std::size_t body_size, + std::size_t additional_bytes) { + auto bytes_to_read = + body_size > additional_bytes ? (body_size - additional_bytes) : 0; + + auto to_read = bytes_to_read >= buffer_size + ? (buffer_size - input_buffer_.size()) + : bytes_to_read; + + is_last_chunk_ = chunked_ && body_size == 5; + + if (to_read > 0) { + content_length_ = bytes_to_read; + asio::async_read(socket_, input_buffer_, asio::transfer_exactly(to_read), + std::bind(&HTTPSession::handleRead, this, + std::placeholders::_1, std::placeholders::_2)); + } else { + if (body_size) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + receive_callback_(buffer, body_size, chunked_ ? is_last_chunk_ : !to_read, + false); + input_buffer_.consume(body_size); + } + + if (!chunked_ || is_last_chunk_) { + doReadHeader(); + } else { + doReadChunkedHeader(); + } + } +} + +void HTTPSession::doReadChunkedHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + std::size_t chunk_size = + std::stoul(reinterpret_cast<const char *>(buffer), 0, 16) + 2 + + length; + auto additional_bytes = input_buffer_.size(); + doReadBody(chunk_size, additional_bytes); + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + +void HTTPSession::doReadHeader() { + asio::async_read_until( + socket_, input_buffer_, "\r\n\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + const uint8_t *buffer = + asio::buffer_cast<const uint8_t *>(input_buffer_.data()); + auto headers = + HTTPMessageFastParser::getHeaders(buffer, length, reverse_); + + // Try to get content length, if available + auto it = headers.find(HTTPMessageFastParser::content_length); + std::size_t size = 0; + if (it != headers.end()) { + size = std::stoull(it->second); + chunked_ = false; + } else { + it = headers.find(HTTPMessageFastParser::transfer_encoding); + if (it != headers.end() && + it->second.compare(HTTPMessageFastParser::chunked) == 0) { + chunked_ = true; + } + } + + receive_callback_(buffer, length, !size && !chunked_, true); + auto additional_bytes = input_buffer_.size() - length; + input_buffer_.consume(length); + + if (!chunked_) { + doReadBody(size, additional_bytes); + } else { + doReadChunkedHeader(); + } + } else { + input_buffer_.consume(input_buffer_.size()); + tryReconnection(); + } + }); +} + +void HTTPSession::tryReconnection() { + if (on_connection_closed_callback_(socket_)) { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + // socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } + } +} + +void HTTPSession::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; + + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + + // on_reconnect_callback_(); + + doReadHeader(); + + if (data_available_ && !write_msgs_.empty()) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGD("Connection recovered!"); + } + + } else { + TRANSPORT_LOGE("Impossible to reconnect: %s", + ec.message().c_str()); + close(); + } + }); +} + +bool HTTPSession::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} + +void HTTPSession::startConnectionTimer() { + timer_.expires_from_now(std::chrono::seconds(10)); + timer_.async_wait( + std::bind(&HTTPSession::handleDeadline, this, std::placeholders::_1)); +} + +void HTTPSession::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the server running?\n"); + io_service_.stop(); + }); + } +} + +} // namespace transport |