diff options
Diffstat (limited to 'icnet/ccnx/icnet_ccnx_local_connector.cc')
-rw-r--r-- | icnet/ccnx/icnet_ccnx_local_connector.cc | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/icnet/ccnx/icnet_ccnx_local_connector.cc b/icnet/ccnx/icnet_ccnx_local_connector.cc new file mode 100644 index 00000000..2a47c117 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_local_connector.cc @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_local_connector.h" + +namespace icnet { + +namespace ccnx { + +LocalConnector::LocalConnector(boost::asio::io_service &io_service, + std::string &ip_address, + std::string &port, + MessageReceivedCallback receive_callback, + std::list<Name> &name_list) + : io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + endpoint_iterator_(resolver_.resolve({ip_address, port})), + timer_(io_service), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + receive_callback_(receive_callback), + served_name_list_(name_list) { + startConnectionTimer(); + doConnect(); +} + +LocalConnector::~LocalConnector() { +} + +void LocalConnector::bind(Name &name) { + CCNxControl *control = ccnxControl_CreateAddRouteToSelfRequest(name.getWrappedStructure()); + CCNxMetaMessage *message = ccnxMetaMessage_CreateFromControl(control); + ccnxControl_Release(&control); + + send(message); + + ccnxMetaMessage_Release((CCNxMetaMessage **) &message); + +} + +void LocalConnector::send(CCNxMetaMessage *message) { + CCNxMetaMessage *msg = ccnxMetaMessage_Acquire(message); + + io_service_.post([this, msg]() { + bool write_in_progres = !write_msgs_.empty(); + write_msgs_.push_back(msg); + if (!is_connecting_) { + if (!write_in_progres) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void LocalConnector::close() { + io_service_.post([this]() { socket_.close(); }); +} + +void LocalConnector::doWrite() { + CCNxMetaMessage *message = write_msgs_.front(); + CCNxCodecNetworkBufferIoVec *network_buffer = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(message, NULL); + const iovec *iov = ccnxCodecNetworkBufferIoVec_GetArray(network_buffer); + + boost::asio::async_write(socket_, + boost::asio::buffer(iov->iov_base, iov->iov_len), + [this, network_buffer, message](boost::system::error_code ec, std::size_t /*length*/) { + if (!ec) { + ccnxMetaMessage_Release((CCNxMetaMessage **) &message); + write_msgs_.pop_front(); + + if (!write_msgs_.empty()) { + doWrite(); + } + } else { + tryReconnect(); + } + + ccnxCodecNetworkBufferIoVec_Release((CCNxCodecNetworkBufferIoVec **) &network_buffer); + + }); + +} + +void LocalConnector::doReadBody() { + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.body(), read_msg_.bodyLength()), + boost::asio::transfer_exactly(read_msg_.bodyLength()), + [this](boost::system::error_code ec, std::size_t length) { + if (!ec) { + receive_callback_(read_msg_.decodeMessage()); + doReadHeader(); + } else { + tryReconnect(); + } + }); +} + +void LocalConnector::doReadHeader() { + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.data(), TransportMessage::header_length), + boost::asio::transfer_exactly(TransportMessage::header_length), + [this](boost::system::error_code ec, std::size_t /*length*/) { + if (!ec) { + if (read_msg_.decodeHeader()) { + doReadBody(); + } else { + std::cerr << "Decoding error" << std::endl; + } + } else { + tryReconnect(); + } + }); +} + +void LocalConnector::tryReconnect() { + if (!is_connecting_) { + std::cerr << "Connection lost. Trying to reconnect..." << std::endl; + is_connecting_ = true; + is_reconnection_ = true; + io_service_.post([this]() { + socket_.close(); + startConnectionTimer(); + doConnect(); + }); + } +} + +void LocalConnector::doConnect() { + boost::asio::async_connect(socket_, + endpoint_iterator_, + [this](boost::system::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + is_connecting_ = false; + doReadHeader(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + std::cout << "Connection recovered!" << std::endl; + for (auto &name : served_name_list_) { + bind(name); + } + } + + } else { + sleep(1); + doConnect(); + } + }); +} + +bool LocalConnector::checkConnected() { + return !is_connecting_; +} + +void LocalConnector::startConnectionTimer() { + timer_.expires_from_now(boost::posix_time::seconds(20)); + timer_.async_wait(std::bind(&LocalConnector::handleDeadline, this, std::placeholders::_1)); +} + +void LocalConnector::handleDeadline(const boost::system::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + std::cerr << "Error connecting. Is the forwarder running?" << std::endl; + io_service_.stop(); + }); + } +} + +} // end namespace ccnx + +} // end namespace icnet
\ No newline at end of file |