From 1ad06afe9f952642a26f4d28239cf05eb3283eb7 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Tue, 19 Mar 2019 14:26:52 +0100 Subject: [HICN-6] ATS Working, little refactoring of apps Change-Id: I174815b70bf3a9fbe99ffab7dd2914be04d364b9 Signed-off-by: Mauro Sardara --- apps/http-proxy/src/ATSConnector.cc | 224 ++++++++++++++++++++++++ apps/http-proxy/src/ATSConnector.h | 99 +++++++++++ apps/http-proxy/src/HTTP1.xMessageFastParser.cc | 71 ++++++++ apps/http-proxy/src/HTTP1.xMessageFastParser.h | 34 ++++ apps/http-proxy/src/IcnReceiver.cc | 174 ++++++++++++++++++ apps/http-proxy/src/IcnReceiver.h | 69 ++++++++ 6 files changed, 671 insertions(+) create mode 100644 apps/http-proxy/src/ATSConnector.cc create mode 100644 apps/http-proxy/src/ATSConnector.h create mode 100644 apps/http-proxy/src/HTTP1.xMessageFastParser.cc create mode 100644 apps/http-proxy/src/HTTP1.xMessageFastParser.h create mode 100644 apps/http-proxy/src/IcnReceiver.cc create mode 100644 apps/http-proxy/src/IcnReceiver.h (limited to 'apps/http-proxy/src') diff --git a/apps/http-proxy/src/ATSConnector.cc b/apps/http-proxy/src/ATSConnector.cc new file mode 100644 index 000000000..81f7a776a --- /dev/null +++ b/apps/http-proxy/src/ATSConnector.cc @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ATSConnector.h" +#include "HTTP1.xMessageFastParser.h" + +#include +#include +#include + +namespace transport { + +ATSConnector::ATSConnector(asio::io_service &io_service, + std::string &ip_address, std::string &port, + ContentReceivedCallback receive_callback, + OnReconnect on_reconnect_callback) + : io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + endpoint_iterator_(resolver_.resolve({ip_address, port})), + timer_(io_service), + is_reconnection_(false), + data_available_(false), + receive_callback_(receive_callback), + on_reconnect_callback_(on_reconnect_callback) { + header_input_buffer_.prepare(2048); + state_ = ConnectorState::CONNECTING; + doConnect(); +} + +ATSConnector::~ATSConnector() {} + +void ATSConnector::send(const uint8_t *packet, std::size_t len, + ContentSentCallback &&content_sent) { + asio::async_write( + socket_, asio::buffer(packet, len), + [content_sent = std::move(content_sent)]( + std::error_code ec, std::size_t /*length*/) { content_sent(); }); +} + +void ATSConnector::send(utils::MemBuf *buffer, + ContentSentCallback &&content_sent) { + io_service_.dispatch([this, buffer, callback = std::move(content_sent)]() { + bool write_in_progress = !write_msgs_.empty(); + write_msgs_.emplace_back(std::unique_ptr(buffer), + std::move(callback)); + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doWrite(); + } + } else { + TRANSPORT_LOGD(" Tell the handle connect it has data to write"); + data_available_ = true; + } + }); +} + +void ATSConnector::close() { + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + // on_disconnect_callback_(); + } + } +} + +void ATSConnector::doWrite() { + auto &buffer = write_msgs_.front().first; + + asio::async_write(socket_, asio::buffer(buffer->data(), buffer->length()), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_FALSE(!ec)) { + TRANSPORT_LOGD("Content successfully sent!"); + write_msgs_.front().second(); + write_msgs_.pop_front(); + if (!write_msgs_.empty()) { + doWrite(); + } + } else { + TRANSPORT_LOGD("Content NOT sent!"); + } + }); +} // namespace transport + +void ATSConnector::handleRead(std::error_code ec, std::size_t length, + std::size_t size) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + std::size_t bytes_in_buffer = length; + size -= bytes_in_buffer; + receive_callback_(input_buffer_, bytes_in_buffer, !size, false); + + if (!size) { + doReadHeader(); + } else { + auto to_read = size >= buffer_size ? buffer_size : size; + asio::async_read( + socket_, asio::buffer(input_buffer_, to_read), + std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, + std::placeholders::_2, size)); + } + } else if (ec == asio::error::eof) { + tryReconnection(); + } +} + +void ATSConnector::doReadBody(std::size_t size) { + auto to_read = size >= buffer_size ? buffer_size : size; + asio::async_read( + socket_, asio::buffer(input_buffer_, to_read), + std::bind(&ATSConnector::handleRead, this, std::placeholders::_1, + std::placeholders::_2, size)); +} + +void ATSConnector::doReadHeader() { + asio::async_read_until( + socket_, header_input_buffer_, "\r\n\r\n", + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + // TRANSPORT_LOGD("Headers received"); + + const uint8_t *buffer = + asio::buffer_cast(header_input_buffer_.data()); + std::size_t size = HTTPMessageFastParser::hasBody(buffer, length); + + auto additional_bytes = header_input_buffer_.size() - length; + auto bytes_to_read = size - additional_bytes; + receive_callback_(buffer, header_input_buffer_.size(), !bytes_to_read, + true); + header_input_buffer_.consume(header_input_buffer_.size()); + + if (bytes_to_read) { + doReadBody(bytes_to_read); + } else { + doReadHeader(); + } + } else { + header_input_buffer_.consume(header_input_buffer_.size()); + tryReconnection(); + } + }); +} + +void ATSConnector::tryReconnection() { + TRANSPORT_LOGD("Connection lost. Trying to reconnect...\n"); + if (state_ == ConnectorState::CONNECTED) { + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } +} + +void ATSConnector::doConnect() { + asio::async_connect(socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; + + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + + // on_reconnect_callback_(); + + doReadHeader(); + + if (data_available_ && !write_msgs_.empty()) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGD("Connection recovered!"); + } + + } else { + TRANSPORT_LOGE("Impossible to reconnect."); + close(); + } + }); +} + +bool ATSConnector::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} + +void ATSConnector::startConnectionTimer() { + timer_.expires_from_now(std::chrono::seconds(10)); + timer_.async_wait( + std::bind(&ATSConnector::handleDeadline, this, std::placeholders::_1)); +} + +void ATSConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the server running?\n"); + io_service_.stop(); + }); + } +} + +} // namespace transport diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h new file mode 100644 index 000000000..be5c2c8d5 --- /dev/null +++ b/apps/http-proxy/src/ATSConnector.h @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#define ASIO_STANDALONE +#include +#include +#include + +namespace transport { + +using asio::ip::tcp; + +typedef std::function + ContentReceivedCallback; +typedef std::function OnReconnect; +typedef std::function ContentSentCallback; +typedef std::deque< + std::pair, ContentSentCallback>> + BufferQueue; + +class ATSConnector { + static constexpr uint32_t buffer_size = 1024 * 64; + + enum class ConnectorState { + CLOSED, + CONNECTING, + CONNECTED, + }; + + public: + ATSConnector(asio::io_service &io_service, std::string &ip_address, + std::string &port, ContentReceivedCallback receive_callback, + OnReconnect on_reconnect_callback); + + ~ATSConnector(); + + void send(const uint8_t *buffer, std::size_t len, + ContentSentCallback &&content_sent = 0); + + void send(utils::MemBuf *buffer, ContentSentCallback &&content_sent); + + void close(); + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t size); + + void doWrite(); + + bool checkConnected(); + + private: + void handleRead(std::error_code ec, std::size_t length, std::size_t bytes); + void tryReconnection(); + void startConnectionTimer(); + void handleDeadline(const std::error_code &ec); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + BufferQueue write_msgs_; + + asio::streambuf header_input_buffer_; + uint8_t input_buffer_[buffer_size]; + + bool is_reconnection_; + bool data_available_; + + ContentReceivedCallback receive_callback_; + OnReconnect on_reconnect_callback_; + + // Connector state + ConnectorState state_; +}; + +} // namespace transport diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.cc b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc new file mode 100644 index 000000000..a03871649 --- /dev/null +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.cc @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "HTTP1.xMessageFastParser.h" + +#include +#include +#include + +std::string HTTPMessageFastParser::numbers = "0123456789"; +std::string HTTPMessageFastParser::content_length = "Content-Length"; +std::string HTTPMessageFastParser::cache_control = "Cache-Control"; +std::string HTTPMessageFastParser::mpd = "mpd"; +std::string HTTPMessageFastParser::connection = "Connection"; +std::string HTTPMessageFastParser::separator = "\r\n\r\n"; + +std::size_t HTTPMessageFastParser::hasBody(const uint8_t *headers, + std::size_t length) { + const char *buffer = reinterpret_cast(headers); + const char *begin = buffer; + const char *end = buffer + length; + + using std::experimental::make_boyer_moore_searcher; + auto it = std::experimental::search( + begin, end, + make_boyer_moore_searcher(content_length.begin(), content_length.end())); + + if (it != end) { + // Read header line + auto it2 = std::find_first_of(it, end, numbers.begin(), numbers.end()); + auto it3 = std::find(it2, end, '\n'); + + return std::stoul(std::string(it2, it3)); + } + + return 0; +} + +bool HTTPMessageFastParser::isMpdRequest(const uint8_t *headers, + std::size_t length) { + const char *buffer = reinterpret_cast(headers); + const char *begin = buffer; + const char *end = buffer + length; + + using std::experimental::make_boyer_moore_searcher; + auto it = std::experimental::search( + begin, end, make_boyer_moore_searcher(mpd.begin(), mpd.end())); + + if (it != end) { + return true; + } + + return false; +} + +uint32_t HTTPMessageFastParser::parseCacheControl(const uint8_t *headers, + std::size_t length) { + return 0; +} diff --git a/apps/http-proxy/src/HTTP1.xMessageFastParser.h b/apps/http-proxy/src/HTTP1.xMessageFastParser.h new file mode 100644 index 000000000..10a70c3e9 --- /dev/null +++ b/apps/http-proxy/src/HTTP1.xMessageFastParser.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +class HTTPMessageFastParser { + public: + static std::size_t hasBody(const uint8_t* headers, std::size_t length); + static bool isMpdRequest(const uint8_t* headers, std::size_t length); + static uint32_t parseCacheControl(const uint8_t* headers, std::size_t length); + + private: + static std::string numbers; + static std::string content_length; + static std::string cache_control; + static std::string connection; + static std::string mpd; + static std::string separator; +}; diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc new file mode 100644 index 000000000..ee8ef0823 --- /dev/null +++ b/apps/http-proxy/src/IcnReceiver.cc @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "IcnReceiver.h" +#include "HTTP1.xMessageFastParser.h" + +#include +#include + +#include +#include + +namespace transport { + +core::Prefix generatePrefix(const std::string& prefix_url) { + const char* str = prefix_url.c_str(); + uint16_t pos = 0; + + if (strncmp("http://", str, 7) == 0) { + pos = 7; + } else if (strncmp("https://", str, 8) == 0) { + pos = 8; + } + + str += pos; + + uint32_t locator_hash = utils::hash::fnv32_buf(str, strlen(str)); + + std::stringstream stream; + stream << std::hex << http::default_values::ipv6_first_word << ":0"; + + for (uint16_t* word = (uint16_t*)&locator_hash; + std::size_t(word) < (std::size_t(&locator_hash) + sizeof(locator_hash)); + word++) { + stream << ":" << std::hex << *word; + } + + stream << "::0"; + + return core::Prefix(stream.str(), 64); +} + +AsyncConsumerProducer::AsyncConsumerProducer(const std::string& prefix, + std::string& ip_address, + std::string& port, + std::string& cache_size) + : prefix_(generatePrefix(prefix)), + producer_socket_(), + ip_address_(ip_address), + port_(port), + cache_size_(std::stoul(cache_size)), + request_counter_(0), + signals_(io_service_, SIGINT, SIGQUIT), + connector_(io_service_, ip_address_, port_, + std::bind(&AsyncConsumerProducer::publishContent, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4), + [this]() { + std::queue empty; + std::swap(response_name_queue_, empty); + }) { + int ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::OUTPUT_BUFFER_SIZE, cache_size_); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: output buffer size has not been set."); + } + + producer_socket_.registerPrefix(prefix_); + + // Let the main thread to catch SIGINT and SIGQUIT + signals_.async_wait( + [this](const std::error_code& errorCode, int signal_number) { + TRANSPORT_LOGI("Number of requests processed by plugin: %lu", + (unsigned long)request_counter_); + producer_socket_.stop(); + connector_.close(); + }); +} + +void AsyncConsumerProducer::start() { + TRANSPORT_LOGD("Starting listening"); + doReceive(); +} + +void AsyncConsumerProducer::run() { + start(); + io_service_.run(); +} + +void AsyncConsumerProducer::doReceive() { + producer_socket_.setSocketOption( + interface::ProducerCallbacksOptions::CACHE_MISS, + [this](interface::ProducerSocket& producer, + interface::Interest& interest) { + // core::Name n(interest.getWritableName(), true); + io_service_.post(std::bind( + &AsyncConsumerProducer::manageIncomingInterest, this, + interest.getWritableName(), interest.acquireMemBufReference(), + interest.getPayload().release())); + }); + + producer_socket_.connect(); +} + +void AsyncConsumerProducer::manageIncomingInterest( + core::Name& name, core::Packet::MemBufPtr& packet, utils::MemBuf* payload) { + // auto seg = name.getSuffix(); + name.setSuffix(0); + auto _it = chunk_number_map_.find(name); + auto _end = chunk_number_map_.end(); + + if (_it != _end) { + return; + } + + bool is_mpd = + HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); + + chunk_number_map_.emplace(name, 0); + response_name_queue_.emplace(std::move(name), is_mpd ? 500 : 10000); + + connector_.send(payload, [packet = std::move(packet)]() {}); +} + +void AsyncConsumerProducer::publishContent(const uint8_t* data, + std::size_t size, bool is_last, + bool headers) { + uint32_t start_suffix = 0; + + if (response_name_queue_.empty()) { + abort(); + } + + interface::PublicationOptions& options = response_name_queue_.front(); + + int ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + options.getLifetime()); + + if (TRANSPORT_EXPECT_FALSE(ret != SOCKET_OPTION_SET)) { + TRANSPORT_LOGD("Warning: content object lifetime has not been set."); + } + + const interface::Name& name = options.getName(); + + start_suffix = chunk_number_map_[name]; + + if (headers) { + request_counter_++; + } + + chunk_number_map_[name] += + producer_socket_.produce(name, data, size, is_last, start_suffix); + + if (is_last) { + chunk_number_map_.erase(name); + response_name_queue_.pop(); + } +} + +} // namespace transport diff --git a/apps/http-proxy/src/IcnReceiver.h b/apps/http-proxy/src/IcnReceiver.h new file mode 100644 index 000000000..7d5c5e4c8 --- /dev/null +++ b/apps/http-proxy/src/IcnReceiver.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ATSConnector.h" + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace transport { + +class AsyncConsumerProducer { + public: + explicit AsyncConsumerProducer(const std::string& prefix, + std::string& ip_address, std::string& port, + std::string& cache_size); + + void start(); + + void run(); + + private: + void doSend(); + + void doReceive(); + + void publishContent(const uint8_t* data, std::size_t size, + bool is_last = true, bool headers = false); + + void manageIncomingInterest(core::Name& name, core::Packet::MemBufPtr& packet, + utils::MemBuf* payload); + + core::Prefix prefix_; + asio::io_service io_service_; + interface::ProducerSocket producer_socket_; + + std::string ip_address_; + std::string port_; + uint32_t cache_size_; + + uint64_t request_counter_; + asio::signal_set signals_; + + // std::unordered_map> + // connection_map_; + ATSConnector connector_; + std::unordered_map chunk_number_map_; + std::queue response_name_queue_; +}; + +} // namespace transport -- cgit 1.2.3-korg