diff options
Diffstat (limited to 'libtransport/src/io_modules')
36 files changed, 572 insertions, 2380 deletions
diff --git a/libtransport/src/io_modules/CMakeLists.txt b/libtransport/src/io_modules/CMakeLists.txt index 29aec236a..f4143de04 100644 --- a/libtransport/src/io_modules/CMakeLists.txt +++ b/libtransport/src/io_modules/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,25 +11,69 @@ # See the License for the specific language governing permissions and # limitations under the License. + +############################################################## +# Android case: no submodules +############################################################## if (${CMAKE_SYSTEM_NAME} MATCHES Android) list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/udp/hicn_forwarder_module.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp/udp_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/hicn-light-ng/hicn_forwarder_module.cc ) list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/udp/hicn_forwarder_module.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp/udp_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/hicn-light-ng/hicn_forwarder_module.h + ) + + if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) + find_package(Libhicnctrl ${CURRENT_VERSION} REQUIRED NO_MODULE) + + if (DISABLE_SHARED_LIBRARIES) + set(LIBTYPE static) + else() + set(LIBTYPE shared) + endif() + + list(APPEND LIBHICNCTRL_LIBRARIES hicn::hicnctrl.${LIBTYPE}) + else() + if (DISABLE_SHARED_LIBRARIES) + if (WIN32) + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC}) + else () + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC} log) + endif () + list(APPEND DEPENDENCIES + ${LIBHICNCTRL_STATIC} + ) + else() + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED}) + list(APPEND DEPENDENCIES + ${LIBHICNCTRL_SHARED} + ) + endif() + endif() + + list(APPEND LIBRARIES + PRIVATE ${LIBHICNCTRL_LIBRARIES} + ) + + list(APPEND LIBTRANSPORT_INTERNAL_INCLUDE_DIRS + PUBLIC + $<BUILD_INTERFACE:${LIBHICNCTRL_INCLUDE_DIRS}> ) set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) + set(LIBRARIES ${LIBRARIES} PARENT_SCOPE) + set(LIBTRANSPORT_INTERNAL_INCLUDE_DIRS ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} PARENT_SCOPE) else() - add_subdirectory(udp) +############################################################## +# Compile submodules +############################################################## + add_subdirectory(hicn-light-ng) add_subdirectory(loopback) add_subdirectory(forwarder) if (__vpp__) add_subdirectory(memif) endif() -endif()
\ No newline at end of file +endif() diff --git a/libtransport/src/io_modules/forwarder/CMakeLists.txt b/libtransport/src/io_modules/forwarder/CMakeLists.txt index a1d0c5db5..3922316d3 100644 --- a/libtransport/src/io_modules/forwarder/CMakeLists.txt +++ b/libtransport/src/io_modules/forwarder/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -17,8 +17,6 @@ list(APPEND MODULE_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/errors.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.h ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.h ${CMAKE_CURRENT_SOURCE_DIR}/global_counter.h ) @@ -26,16 +24,13 @@ list(APPEND MODULE_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_module.cc ${CMAKE_CURRENT_SOURCE_DIR}/forwarder.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel_listener.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp_tunnel.cc ) build_module(forwarder_module - SHARED SOURCES ${MODULE_SOURCE_FILES} DEPENDS ${DEPENDENCIES} COMPONENT ${LIBTRANSPORT_COMPONENT}-io-modules - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + INCLUDE_DIRS ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} ${Libhicn_INCLUDE_DIRS} DEFINITIONS ${COMPILER_DEFINITIONS} - COMPILE_OPTIONS ${COMPILE_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) diff --git a/libtransport/src/io_modules/forwarder/errors.cc b/libtransport/src/io_modules/forwarder/errors.cc index b5f131499..6e93d0453 100644 --- a/libtransport/src/io_modules/forwarder/errors.cc +++ b/libtransport/src/io_modules/forwarder/errors.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. */ #include <io_modules/forwarder/errors.h> diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc index 0546cb8b3..3ae5bf397 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.cc +++ b/libtransport/src/io_modules/forwarder/forwarder.cc @@ -15,11 +15,11 @@ #include <core/global_configuration.h> #include <core/local_connector.h> +#include <core/udp_connector.h> +#include <core/udp_listener.h> #include <glog/logging.h> #include <io_modules/forwarder/forwarder.h> #include <io_modules/forwarder/global_id_counter.h> -#include <io_modules/forwarder/udp_tunnel.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> namespace transport { @@ -89,11 +89,12 @@ void Forwarder::initConnectors() { Connector::Id Forwarder::registerLocalConnector( asio::io_service &io_service, Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback) { utils::SpinLock::Acquire locked(connector_lock_); auto id = GlobalCounter<Connector::Id>::getInstance().getNext(); auto connector = std::make_shared<LocalConnector>( - io_service, receive_callback, nullptr, nullptr, reconnect_callback); + io_service, receive_callback, sent_callback, nullptr, reconnect_callback); connector->setConnectorId(id); local_connectors_.emplace(id, std::move(connector)); return id; @@ -105,6 +106,7 @@ Forwarder &Forwarder::deleteConnector(Connector::Id id) { if (it != local_connectors_.end()) { it->second->close(); local_connectors_.erase(it); + } else { } return *this; @@ -120,9 +122,9 @@ Connector::Ptr Forwarder::getConnector(Connector::Id id) { return nullptr; } -void Forwarder::onPacketFromListener(Connector *connector, - utils::MemBuf &packet_buffer, - const std::error_code &ec) { +void Forwarder::onPacketFromListener( + Connector *connector, const std::vector<utils::MemBuf::Ptr> &packets, + const std::error_code &ec) { // Create connector connector->setReceiveCallback( std::bind(&Forwarder::onPacketReceived, this, std::placeholders::_1, @@ -135,37 +137,47 @@ void Forwarder::onPacketFromListener(Connector *connector, remote_connectors_.emplace(connector->getConnectorId(), connector->shared_from_this()); } + // TODO Check if control packet or not. For the moment it is not. - onPacketReceived(connector, packet_buffer, ec); + onPacketReceived(connector, packets, ec); } void Forwarder::onPacketReceived(Connector *connector, - utils::MemBuf &packet_buffer, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec) { - // Figure out the type of packet we received - bool is_interest = Packet::isInterest(packet_buffer.data()); - - Packet *packet = nullptr; - if (is_interest) { - packet = static_cast<Interest *>(&packet_buffer); - } else { - packet = static_cast<ContentObject *>(&packet_buffer); + if (ec) { + LOG(ERROR) << "Error receiving packet: " << ec.message(); + return; } - for (auto &c : local_connectors_) { - auto role = c.second->getRole(); - auto is_producer = role == Connector::Role::PRODUCER; - if ((is_producer && is_interest) || (!is_producer && !is_interest)) { - c.second->send(*packet); + for (auto &packet_buffer_ptr : packets) { + auto &packet_buffer = *packet_buffer_ptr; + + // Figure out the type of packet we received + bool is_interest = Packet::isInterest(packet_buffer.data()); + + Packet *packet = nullptr; + if (is_interest) { + packet = static_cast<Interest *>(&packet_buffer); } else { - LOG(ERROR) << "Error sending packet to local connector. is_interest = " - << is_interest << " - is_producer = " << is_producer; + packet = static_cast<ContentObject *>(&packet_buffer); + } + + for (auto &c : local_connectors_) { + auto role = c.second->getRole(); + auto is_producer = role == Connector::Role::PRODUCER; + if ((is_producer && is_interest) || (!is_producer && !is_interest)) { + c.second->send(*packet); + } else { + LOG(ERROR) << "Error sending packet to local connector. is_interest = " + << is_interest << " - is_producer = " << is_producer; + } } - } - // PCS Lookup + FIB lookup. Skip for now + // PCS Lookup + FIB lookup. Skip for now - // Forward packet to local connectors + // Forward packet to local connectors + } } void Forwarder::send(Packet &packet) { diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h index 5b564bb5e..38b4260b3 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.h +++ b/libtransport/src/io_modules/forwarder/forwarder.h @@ -15,13 +15,13 @@ #pragma once +#include <core/udp_listener.h> #include <hicn/transport/core/io_module.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/singleton.h> #include <hicn/transport/utils/spinlock.h> #include <io_modules/forwarder/configuration.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> #include <atomic> #include <libconfig.h++> @@ -31,9 +31,8 @@ namespace transport { namespace core { -class Forwarder : public utils::Singleton<Forwarder> { +class Forwarder { static constexpr char forwarder_config_section[] = "forwarder"; - friend class utils::Singleton<Forwarder>; public: Forwarder(); @@ -47,6 +46,7 @@ class Forwarder : public utils::Singleton<Forwarder> { Connector::Id registerLocalConnector( asio::io_service &io_service, Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback); Forwarder &deleteConnector(Connector::Id id); @@ -58,9 +58,11 @@ class Forwarder : public utils::Singleton<Forwarder> { void stop(); private: - void onPacketFromListener(Connector *connector, utils::MemBuf &packet_buffer, + void onPacketFromListener(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec); - void onPacketReceived(Connector *connector, utils::MemBuf &packet_buffer, + void onPacketReceived(Connector *connector, + const std::vector<utils::MemBuf::Ptr> &packets, const std::error_code &ec); void onPacketSent(Connector *connector, const std::error_code &ec); void onConnectorClosed(Connector *connector); @@ -86,5 +88,20 @@ class Forwarder : public utils::Singleton<Forwarder> { Configuration config_; }; +class ForwarderGlobal : public ::utils::Singleton<ForwarderGlobal> { + friend class utils::Singleton<ForwarderGlobal>; + + public: + ~ForwarderGlobal() {} + std::shared_ptr<Forwarder> &getReference() { return forwarder_; } + + private: + ForwarderGlobal() : forwarder_(std::make_shared<Forwarder>()) {} + + private: + std::shared_ptr<Forwarder> forwarder_; +}; + } // namespace core -} // namespace transport
\ No newline at end of file + +} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc index 4f95b9ca0..0ced84ab4 100644 --- a/libtransport/src/io_modules/forwarder/forwarder_module.cc +++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -25,11 +25,10 @@ ForwarderModule::ForwarderModule() : IoModule(), name_(""), connector_id_(Connector::invalid_connector), - forwarder_(Forwarder::getInstance()) {} + forwarder_ptr_(ForwarderGlobal::getInstance().getReference()), + forwarder_(*forwarder_ptr_) {} -ForwarderModule::~ForwarderModule() { - forwarder_.deleteConnector(connector_id_); -} +ForwarderModule::~ForwarderModule() {} bool ForwarderModule::isConnected() { return true; } @@ -42,7 +41,7 @@ void ForwarderModule::send(Packet &packet) { // local_faces_.at(1 - local_id_).onPacket(packet); } -void ForwarderModule::send(const uint8_t *packet, std::size_t len) { +void ForwarderModule::send(const utils::MemBuf::Ptr &buffer) { // not supported throw errors::NotImplementedException(); } @@ -58,11 +57,13 @@ void ForwarderModule::closeConnection() { } void ForwarderModule::init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name) { connector_id_ = forwarder_.registerLocalConnector( - io_service, std::move(receive_callback), std::move(reconnect_callback)); + io_service, std::move(receive_callback), std::move(sent_callback), + std::move(reconnect_callback)); name_ = app_name; } @@ -78,7 +79,9 @@ void ForwarderModule::connect(bool is_consumer) { std::uint32_t ForwarderModule::getMtu() { return interface_mtu; } -bool ForwarderModule::isControlMessage(const uint8_t *message) { return false; } +bool ForwarderModule::isControlMessage(utils::MemBuf &packet_buffer) { + return false; +} extern "C" IoModule *create_module(void) { return new ForwarderModule(); } diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.h b/libtransport/src/io_modules/forwarder/forwarder_module.h index 58bfb7996..52a12b67e 100644 --- a/libtransport/src/io_modules/forwarder/forwarder_module.h +++ b/libtransport/src/io_modules/forwarder/forwarder_module.h @@ -38,11 +38,12 @@ class ForwarderModule : public IoModule { void connect(bool is_consumer) override; void send(Packet &packet) override; - void send(const uint8_t *packet, std::size_t len) override; + void send(const utils::MemBuf::Ptr &buffer) override; bool isConnected() override; void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name = "Libtransport") override; @@ -51,15 +52,19 @@ class ForwarderModule : public IoModule { std::uint32_t getMtu() override; - bool isControlMessage(const uint8_t *message) override; + bool isControlMessage(utils::MemBuf &packet_buffer) override; void processControlMessageReply(utils::MemBuf &packet_buffer) override; void closeConnection() override; private: + static void initForwarder(); + + private: std::string name_; Connector::Id connector_id_; + std::shared_ptr<Forwarder> forwarder_ptr_; Forwarder &forwarder_; }; diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.cc b/libtransport/src/io_modules/forwarder/udp_tunnel.cc deleted file mode 100644 index bf6a69b92..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel.cc +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#include <glog/logging.h> -#include <hicn/transport/utils/branch_prediction.h> -#include <io_modules/forwarder/errors.h> -#include <io_modules/forwarder/udp_tunnel.h> - -#include <iostream> -#include <thread> -#include <vector> - -namespace transport { -namespace core { - -UdpTunnelConnector::~UdpTunnelConnector() {} - -void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port, - const std::string &bind_address, - uint16_t bind_port) { - if (state_ == State::CLOSED) { - state_ = State::CONNECTING; - endpoint_iterator_ = resolver_.resolve({hostname, std::to_string(port)}); - remote_endpoint_send_ = *endpoint_iterator_; - socket_->open(remote_endpoint_send_.protocol()); - - if (!bind_address.empty() && bind_port != 0) { - using namespace asio::ip; - socket_->bind( - udp::endpoint(address::from_string(bind_address), bind_port)); - } - - state_ = State::CONNECTED; - - remote_endpoint_ = Endpoint(remote_endpoint_send_); - local_endpoint_ = Endpoint(socket_->local_endpoint()); - - doRecvPacket(); - -#ifdef LINUX - send_timer_.expires_from_now(std::chrono::microseconds(50)); - send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, - std::placeholders::_1)); -#endif - } -} - -void UdpTunnelConnector::send(Packet &packet) { - strand_->post([this, pkt{packet.shared_from_this()}]() { - bool write_in_progress = !output_buffer_.empty(); - output_buffer_.push_back(std::move(pkt)); - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - if (!write_in_progress) { - doSendPacket(); - } - } else { - data_available_ = true; - } - }); -} - -void UdpTunnelConnector::send(const uint8_t *packet, std::size_t len) {} - -void UdpTunnelConnector::close() { - DLOG_IF(INFO, VLOG_IS_ON(2)) << "UDPTunnelConnector::close"; - state_ = State::CLOSED; - bool is_socket_owned = socket_.use_count() == 1; - if (is_socket_owned) { - io_service_.dispatch([this]() { - this->socket_->close(); - // on_close_callback_(shared_from_this()); - }); - } -} - -void UdpTunnelConnector::doSendPacket() { -#ifdef LINUX - send_timer_.expires_from_now(std::chrono::microseconds(50)); - send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, - std::placeholders::_1)); -#else - auto packet = output_buffer_.front().get(); - auto array = std::vector<asio::const_buffer>(); - - const ::utils::MemBuf *current = packet; - do { - array.push_back(asio::const_buffer(current->data(), current->length())); - current = current->next(); - } while (current != packet); - - socket_->async_send_to( - std::move(array), remote_endpoint_send_, - strand_->wrap([this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - sent_callback_(this, make_error_code(forwarder_error::success)); - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - sendFailed(); - sent_callback_(this, ec); - } - - output_buffer_.pop_front(); - if (!output_buffer_.empty()) { - doSendPacket(); - } - })); -#endif -} - -#ifdef LINUX -void UdpTunnelConnector::writeHandler(std::error_code ec) { - if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) { - return; - } - - auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst)); - - if (len) { - int m = 0; - for (auto &p : output_buffer_) { - auto packet = p.get(); - ::utils::MemBuf *current = packet; - int b = 0; - do { - // array.push_back(asio::const_buffer(current->data(), - // current->length())); - tx_iovecs_[m][b].iov_base = current->writableData(); - tx_iovecs_[m][b].iov_len = current->length(); - current = current->next(); - b++; - } while (current != packet); - - tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m]; - tx_msgs_[m].msg_hdr.msg_iovlen = b; - tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data(); - tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size(); - m++; - - if (--len == 0) { - break; - } - } - - int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT); - if (retval > 0) { - while (retval--) { - output_buffer_.pop_front(); - } - } else if (retval != EWOULDBLOCK && retval != EAGAIN) { - LOG(ERROR) << "Error sending messages! " << strerror(errno) - << " << retval"; - return; - } - } - - if (!output_buffer_.empty()) { - send_timer_.expires_from_now(std::chrono::microseconds(50)); - send_timer_.async_wait(std::bind(&UdpTunnelConnector::writeHandler, this, - std::placeholders::_1)); - } -} - -void UdpTunnelConnector::readHandler(std::error_code ec) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; - - if (TRANSPORT_EXPECT_TRUE(!ec)) { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - if (current_position_ == 0) { - for (int i = 0; i < max_burst; i++) { - auto read_buffer = getRawBuffer(); - rx_iovecs_[i][0].iov_base = read_buffer.first; - rx_iovecs_[i][0].iov_len = read_buffer.second; - rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i]; - rx_msgs_[i].msg_hdr.msg_iovlen = 1; - } - } - - int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_, - max_burst - current_position_, MSG_DONTWAIT, nullptr); - if (res < 0) { - LOG(ERROR) << "Error receiving messages! " << strerror(errno) << " " - << res; - return; - } - - for (int i = 0; i < res; i++) { - auto packet = getPacketFromBuffer( - reinterpret_cast<uint8_t *>( - rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), - rx_msgs_[current_position_].msg_len); - receiveSuccess(*packet); - receive_callback_(this, *packet, - make_error_code(forwarder_error::success)); - ++current_position_; - } - - doRecvPacket(); - } else { - LOG(ERROR) - << "Error in UDP: Receiving packets from a not connected socket."; - } - } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - // receive_callback_(this, *read_msg_, ec); - LOG(ERROR) << "Error in UDP connector: " << ec.value() << " " - << ec.message(); - } else { - LOG(ERROR) << "Error in connector while not connected. " << ec.value() - << " " << ec.message(); - } - } -} -#endif - -void UdpTunnelConnector::doRecvPacket() { -#ifdef LINUX - if (state_ == State::CONNECTED) { -#if ((ASIO_VERSION / 100 % 1000) < 11) - socket_->async_receive(asio::null_buffers(), -#else - socket_->async_wait(asio::ip::tcp::socket::wait_read, -#endif - std::bind(&UdpTunnelConnector::readHandler, this, - std::placeholders::_1)); - } -#else - DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; - read_msg_ = getRawBuffer(); - socket_->async_receive_from( - asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_, - [this](std::error_code ec, std::size_t length) { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "UdpTunnelConnector received packet length=" << length; - if (TRANSPORT_EXPECT_TRUE(!ec)) { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - auto packet = getPacketFromBuffer(read_msg_.first, length); - receiveSuccess(*packet); - receive_callback_(this, *packet, - make_error_code(forwarder_error::success)); - doRecvPacket(); - } else { - LOG(ERROR) << "Error in UDP: Receiving packets from a not " - "connected socket."; - } - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { - LOG(ERROR) << "Error in UDP connector: " << ec.value() - << ec.message(); - } else { - LOG(ERROR) << "Error while not connected"; - } - } - }); -#endif -} - -void UdpTunnelConnector::doConnect() { - asio::async_connect( - *socket_, endpoint_iterator_, - [this](std::error_code ec, asio::ip::udp::resolver::iterator) { - if (!ec) { - state_ = State::CONNECTED; - doRecvPacket(); - - if (data_available_) { - data_available_ = false; - doSendPacket(); - } - } else { - LOG(ERROR) << "UDP Connection failed!!!"; - timer_.expires_from_now(std::chrono::milliseconds(500)); - timer_.async_wait(std::bind(&UdpTunnelConnector::doConnect, this)); - } - }); -} - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel.h b/libtransport/src/io_modules/forwarder/udp_tunnel.h deleted file mode 100644 index 4f044f93f..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel.h +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#pragma once - -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/connector.h> -#include <hicn/transport/portability/platform.h> -#include <io_modules/forwarder/errors.h> - -#include <iostream> -#include <memory> - -namespace transport { -namespace core { - -class UdpTunnelListener; - -class UdpTunnelConnector : public Connector { - friend class UdpTunnelListener; - - public: - template <typename ReceiveCallback, typename SentCallback, typename OnClose, - typename OnReconnect> - UdpTunnelConnector(asio::io_service &io_service, - ReceiveCallback &&receive_callback, - SentCallback &&packet_sent, OnClose &&on_close_callback, - OnReconnect &&on_reconnect) - : Connector(receive_callback, packet_sent, on_close_callback, - on_reconnect), - io_service_(io_service), - strand_(std::make_shared<asio::io_service::strand>(io_service_)), - socket_(std::make_shared<asio::ip::udp::socket>(io_service_)), - resolver_(io_service_), - timer_(io_service_), -#ifdef LINUX - send_timer_(io_service_), - tx_iovecs_{0}, - tx_msgs_{0}, - rx_iovecs_{0}, - rx_msgs_{0}, - current_position_(0), -#else - read_msg_(nullptr, 0), -#endif - data_available_(false) { - } - - template <typename ReceiveCallback, typename SentCallback, typename OnClose, - typename OnReconnect, typename EndpointType> - UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket, - std::shared_ptr<asio::io_service::strand> &strand, - ReceiveCallback &&receive_callback, - SentCallback &&packet_sent, OnClose &&on_close_callback, - OnReconnect &&on_reconnect, EndpointType &&remote_endpoint) - : Connector(receive_callback, packet_sent, on_close_callback, - on_reconnect), -#if ((ASIO_VERSION / 100 % 1000) < 12) - io_service_(socket->get_io_service()), -#else - io_service_((asio::io_context &)(socket->get_executor().context())), -#endif - strand_(strand), - socket_(socket), - resolver_(io_service_), - remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)), - timer_(io_service_), -#ifdef LINUX - send_timer_(io_service_), - tx_iovecs_{0}, - tx_msgs_{0}, - rx_iovecs_{0}, - rx_msgs_{0}, - current_position_(0), -#else - read_msg_(nullptr, 0), -#endif - data_available_(false) { - if (socket_->is_open()) { - state_ = State::CONNECTED; - remote_endpoint_ = Endpoint(remote_endpoint_send_); - local_endpoint_ = socket_->local_endpoint(); - } - } - - ~UdpTunnelConnector() override; - - void send(Packet &packet) override; - - void send(const uint8_t *packet, std::size_t len) override; - - void close() override; - - void connect(const std::string &hostname, std::uint16_t port, - const std::string &bind_address = "", - std::uint16_t bind_port = 0); - - auto shared_from_this() { return utils::shared_from(this); } - - private: - void doConnect(); - void doRecvPacket(); - - void doRecvPacket(utils::MemBuf &buffer) { - receive_callback_(this, buffer, make_error_code(forwarder_error::success)); - } - -#ifdef LINUX - void readHandler(std::error_code ec); - void writeHandler(std::error_code ec); -#endif - - void setConnected() { state_ = State::CONNECTED; } - - void doSendPacket(); - void doClose(); - - private: - asio::io_service &io_service_; - std::shared_ptr<asio::io_service::strand> strand_; - std::shared_ptr<asio::ip::udp::socket> socket_; - asio::ip::udp::resolver resolver_; - asio::ip::udp::resolver::iterator endpoint_iterator_; - asio::ip::udp::endpoint remote_endpoint_send_; - asio::ip::udp::endpoint remote_endpoint_recv_; - - asio::steady_timer timer_; - -#ifdef LINUX - asio::steady_timer send_timer_; - struct iovec tx_iovecs_[max_burst][8]; - struct mmsghdr tx_msgs_[max_burst]; - struct iovec rx_iovecs_[max_burst][8]; - struct mmsghdr rx_msgs_[max_burst]; - std::uint8_t current_position_; -#else - std::pair<uint8_t *, std::size_t> read_msg_; -#endif - - bool data_available_; -}; - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc deleted file mode 100644 index d047cc568..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.cc +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#include <glog/logging.h> -#include <hicn/transport/utils/hash.h> -#include <io_modules/forwarder/udp_tunnel.h> -#include <io_modules/forwarder/udp_tunnel_listener.h> - -#ifndef LINUX -namespace std { -size_t hash<asio::ip::udp::endpoint>::operator()( - const asio::ip::udp::endpoint &endpoint) const { - auto hash_ip = endpoint.address().is_v4() - ? endpoint.address().to_v4().to_ulong() - : utils::hash::fnv32_buf( - endpoint.address().to_v6().to_bytes().data(), 16); - uint16_t port = endpoint.port(); - return utils::hash::fnv32_buf(&port, 2, hash_ip); -} -} // namespace std -#endif - -namespace transport { -namespace core { - -UdpTunnelListener::~UdpTunnelListener() {} - -void UdpTunnelListener::close() { - strand_->post([this]() { - if (socket_->is_open()) { - socket_->close(); - } - }); -} - -#ifdef LINUX -void UdpTunnelListener::readHandler(std::error_code ec) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; - - if (TRANSPORT_EXPECT_TRUE(!ec)) { - if (current_position_ == 0) { - for (int i = 0; i < Connector::max_burst; i++) { - auto read_buffer = Connector::getRawBuffer(); - iovecs_[i][0].iov_base = read_buffer.first; - iovecs_[i][0].iov_len = read_buffer.second; - msgs_[i].msg_hdr.msg_iov = iovecs_[i]; - msgs_[i].msg_hdr.msg_iovlen = 1; - msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i]; - msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]); - } - } - - int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_, - Connector::max_burst - current_position_, MSG_DONTWAIT, - nullptr); - if (res < 0) { - LOG(ERROR) << "Error in recvmmsg."; - return; - } - - for (int i = 0; i < res; i++) { - auto packet = Connector::getPacketFromBuffer( - reinterpret_cast<uint8_t *>( - msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), - msgs_[current_position_].msg_len); - auto connector_id = - utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name, - msgs_[current_position_].msg_hdr.msg_namelen); - - auto connector = connectors_.find(connector_id); - if (connector == connectors_.end()) { - // Create new connector corresponding to new client - - /* - * Get the remote endpoint for this particular message - */ - using namespace asio::ip; - if (local_endpoint_.address().is_v4()) { - auto addr = reinterpret_cast<struct sockaddr_in *>( - &remote_endpoints_[current_position_]); - address_v4::bytes_type address_bytes; - std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr), - address_bytes.size(), address_bytes.begin()); - address_v4 address(address_bytes); - remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port)); - } else { - auto addr = reinterpret_cast<struct sockaddr_in6 *>( - &remote_endpoints_[current_position_]); - address_v6::bytes_type address_bytes; - std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr), - address_bytes.size(), address_bytes.begin()); - address_v6 address(address_bytes); - remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port)); - } - - /** - * Create new connector sharing the same socket of this listener. - */ - auto ret = connectors_.emplace( - connector_id, - std::make_shared<UdpTunnelConnector>( - socket_, strand_, receive_callback_, - [](Connector *, const std::error_code &) {}, [](Connector *) {}, - [](Connector *) {}, std::move(remote_endpoint_))); - connector = ret.first; - connector->second->setConnectorId(connector_id); - } - - /** - * Use connector callback to process incoming message. - */ - UdpTunnelConnector *c = - dynamic_cast<UdpTunnelConnector *>(connector->second.get()); - c->doRecvPacket(*packet); - - ++current_position_; - } - - doRecvPacket(); - } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - } -} -#endif - -void UdpTunnelListener::doRecvPacket() { -#ifdef LINUX -#if ((ASIO_VERSION / 100 % 1000) < 11) - socket_->async_receive( - asio::null_buffers(), -#else - socket_->async_wait( - asio::ip::tcp::socket::wait_read, -#endif - std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1)); -#else - read_msg_ = Connector::getRawBuffer(); - socket_->async_receive_from( - asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_, - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - auto packet = Connector::getPacketFromBuffer(read_msg_.first, length); - auto connector_id = - std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_); - auto connector = connectors_.find(connector_id); - if (connector == connectors_.end()) { - // Create new connector corresponding to new client - auto ret = connectors_.emplace( - connector_id, std::make_shared<UdpTunnelConnector>( - socket_, strand_, receive_callback_, - [](Connector *, const std::error_code &) {}, - [](Connector *) {}, [](Connector *) {}, - std::move(remote_endpoint_))); - connector = ret.first; - connector->second->setConnectorId(connector_id); - } - - UdpTunnelConnector *c = - dynamic_cast<UdpTunnelConnector *>(connector->second.get()); - c->doRecvPacket(*packet); - doRecvPacket(); - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - LOG(ERROR) << "The connection has been closed by the application."; - return; - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - } - }); -#endif -} -} // namespace core -} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h b/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h deleted file mode 100644 index 5d197dcb0..000000000 --- a/libtransport/src/io_modules/forwarder/udp_tunnel_listener.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - */ - -#pragma once - -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/connector.h> -#include <hicn/transport/portability/platform.h> - -#include <unordered_map> - -namespace std { -template <> -struct hash<asio::ip::udp::endpoint> { - size_t operator()(const asio::ip::udp::endpoint &endpoint) const; -}; -} // namespace std - -namespace transport { -namespace core { - -class UdpTunnelListener - : public std::enable_shared_from_this<UdpTunnelListener> { - using PacketReceivedCallback = Connector::PacketReceivedCallback; - using EndpointId = std::pair<uint32_t, uint16_t>; - - static constexpr uint16_t default_port = 5004; - - public: - using Ptr = std::shared_ptr<UdpTunnelListener>; - - template <typename ReceiveCallback> - UdpTunnelListener(asio::io_service &io_service, - ReceiveCallback &&receive_callback, - asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint( - asio::ip::udp::v4(), default_port)) - : io_service_(io_service), - strand_(std::make_shared<asio::io_service::strand>(io_service_)), - socket_(std::make_shared<asio::ip::udp::socket>(io_service_, - endpoint.protocol())), - local_endpoint_(endpoint), - receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)), -#ifndef LINUX - read_msg_(nullptr, 0) -#else - iovecs_{0}, - msgs_{0}, - current_position_(0) -#endif - { - if (endpoint.protocol() == asio::ip::udp::v6()) { - std::error_code ec; - socket_->set_option(asio::ip::v6_only(false), ec); - // Call succeeds only on dual stack systems. - } - socket_->bind(local_endpoint_); - io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this)); - } - - ~UdpTunnelListener(); - - void close(); - - int deleteConnector(Connector *connector) { - return connectors_.erase(connector->getConnectorId()); - } - - template <typename ReceiveCallback> - void setReceiveCallback(ReceiveCallback &&callback) { - receive_callback_ = std::forward<ReceiveCallback &&>(callback); - } - - Connector *findConnector(Connector::Id connId) { - auto it = connectors_.find(connId); - if (it != connectors_.end()) { - return it->second.get(); - } - - return nullptr; - } - - private: - void doRecvPacket(); - - void readHandler(std::error_code ec); - - asio::io_service &io_service_; - std::shared_ptr<asio::io_service::strand> strand_; - std::shared_ptr<asio::ip::udp::socket> socket_; - asio::ip::udp::endpoint local_endpoint_; - asio::ip::udp::endpoint remote_endpoint_; - std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_; - - PacketReceivedCallback receive_callback_; - -#ifdef LINUX - struct iovec iovecs_[Connector::max_burst][8]; - struct mmsghdr msgs_[Connector::max_burst]; - struct sockaddr_storage remote_endpoints_[Connector::max_burst]; - std::uint8_t current_position_; -#else - std::pair<uint8_t *, std::size_t> read_msg_; -#endif -}; - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/hicn-light-ng/CMakeLists.txt b/libtransport/src/io_modules/hicn-light-ng/CMakeLists.txt new file mode 100644 index 000000000..325a8bd1d --- /dev/null +++ b/libtransport/src/io_modules/hicn-light-ng/CMakeLists.txt @@ -0,0 +1,62 @@ +# Copyright (c) 2021-2022 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) + find_package(Libhicnctrl ${CURRENT_VERSION} REQUIRED NO_MODULE) + + if (DISABLE_SHARED_LIBRARIES) + set(LIBTYPE static) + else() + set(LIBTYPE shared) + endif() + + list(APPEND LIBHICNCTRL_LIBRARIES hicn::hicnctrl.${LIBTYPE}) +else() + if (DISABLE_SHARED_LIBRARIES) + if (WIN32) + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC}) + else () + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_STATIC} log) + endif () + list(APPEND DEPENDENCIES + ${LIBHICNCTRL_STATIC} + ) + else() + set(LIBHICNCTRL_LIBRARIES ${LIBHICNCTRL_SHARED}) + list(APPEND DEPENDENCIES + ${LIBHICNCTRL_SHARED} + ) + endif() +endif() + +list(APPEND MODULE_HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.h +) + +list(APPEND MODULE_SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.cc +) + +build_module(hicnlightng_module + SHARED + SOURCES ${MODULE_SOURCE_FILES} + DEPENDS ${DEPENDENCIES} + COMPONENT ${LIBTRANSPORT_COMPONENT} + LINK_LIBRARIES PRIVATE ${LIBHICNCTRL_LIBRARIES} + INCLUDE_DIRS + PRIVATE + ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + ${Libhicnctrl_INCLUDE_DIRS} + DEFINITIONS ${COMPILER_DEFINITIONS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} +) diff --git a/libtransport/src/io_modules/hicn-light-ng/hicn_forwarder_module.cc b/libtransport/src/io_modules/hicn-light-ng/hicn_forwarder_module.cc new file mode 100644 index 000000000..f67bd9447 --- /dev/null +++ b/libtransport/src/io_modules/hicn-light-ng/hicn_forwarder_module.cc @@ -0,0 +1,264 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/udp_connector.h> +#include <io_modules/hicn-light-ng/hicn_forwarder_module.h> + +extern "C" { +#include <hicn/ctrl/hicn-light-ng.h> +} + +namespace transport { + +namespace core { + +HicnForwarderModule::HicnForwarderModule() + : IoModule(), connector_(nullptr), seq_(0) {} + +HicnForwarderModule::~HicnForwarderModule() {} + +void HicnForwarderModule::connect(bool is_consumer) { + connector_->connect("localhost", 9695); + connector_->setRole(is_consumer ? Connector::Role::CONSUMER + : Connector::Role::PRODUCER); +} + +bool HicnForwarderModule::isConnected() { return connector_->isConnected(); } + +void HicnForwarderModule::send(Packet &packet) { + IoModule::send(packet); + packet.setChecksum(); + connector_->send(packet); +} + +void HicnForwarderModule::send(const utils::MemBuf::Ptr &packet) { + counters_.tx_packets++; + counters_.tx_bytes += packet->length(); + + // Perfect forwarding + connector_->send(packet); +} + +void HicnForwarderModule::registerRoute(const Prefix &prefix) { + auto command = createCommandRoute(prefix.toSockaddr(), + (uint8_t)prefix.getPrefixLength()); + if (!command) { + // TODO error + return; + } + send(command); +} + +void HicnForwarderModule::sendMapme() { + auto command = createCommandMapmeSendUpdate(); + if (!command) { + // TODO error + return; + } + send(command); +} + +void HicnForwarderModule::setForwardingStrategy(const Prefix &prefix, + std::string &strategy) { + auto command = createCommandSetForwardingStrategy( + prefix.toSockaddr(), (uint8_t)prefix.getPrefixLength(), strategy); + if (!command) { + // TODO error + return; + } + send(command); +} + +void HicnForwarderModule::closeConnection() { + auto command = createCommandDeleteConnection(); + if (!command) { + // TODO error + return; + } + + connector_->setSentCallback([](Connector *c, const std::error_code &ec) { + if (!ec) { + c->close(); + } + }); + + send(command); +} + +void HicnForwarderModule::init( + Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, + Connector::OnReconnectCallback &&reconnect_callback, + asio::io_service &io_service, const std::string &app_name) { + if (!connector_) { + connector_.reset(new UdpTunnelConnector( + io_service, std::move(receive_callback), std::move(sent_callback), + nullptr, std::move(reconnect_callback))); + } +} + +void HicnForwarderModule::processControlMessageReply( + utils::MemBuf &packet_buffer) { + if (packet_buffer.data()[0] == NACK_LIGHT) { + throw errors::RuntimeException( + "Received Nack message from hicn light forwarder."); + } +} + +std::uint32_t HicnForwarderModule::getMtu() { return interface_mtu; } + +bool HicnForwarderModule::isControlMessage(utils::MemBuf &packet_buffer) { + return packet_buffer.data()[0] == ACK_LIGHT || + packet_buffer.data()[0] == NACK_LIGHT; +} + +/** + * @return A valid msg_route_add_t structure if the command was successful, or + * with .command_id == COMMAND_TYPE_UNDEFINED in case of error. + */ +utils::MemBuf::Ptr HicnForwarderModule::createCommandRoute( + std::unique_ptr<sockaddr> &&addr, uint8_t prefix_length) { + utils::MemBuf::Ptr ret = utils::MemBuf::create(sizeof(msg_route_add_t)); + auto command = reinterpret_cast<msg_route_add_t *>(ret->writableData()); + ret->append(sizeof(msg_route_add_t)); + std::memset(command, 0, sizeof(*command)); + + if (!IS_VALID_FAMILY(addr->sa_family)) return nullptr; + + *command = { + .header = + { + .message_type = REQUEST_LIGHT, + .command_id = COMMAND_TYPE_ROUTE_ADD, + .length = 1, + .seq_num = 0, + }, + .payload = + { + .cost = 1, + .family = (uint8_t)addr->sa_family, + .len = prefix_length, + }, + }; + + switch (addr->sa_family) { + case AF_INET: + command->payload.address.v4.as_inaddr = + ((sockaddr_in *)addr.get())->sin_addr; + break; + case AF_INET6: + command->payload.address.v6.as_in6addr = + ((sockaddr_in6 *)addr.get())->sin6_addr; + break; + } + snprintf(command->payload.symbolic_or_connid, SYMBOLIC_NAME_LEN, "%s", + "SELF"); + + return ret; +} + +utils::MemBuf::Ptr HicnForwarderModule::createCommandDeleteConnection() { + utils::MemBuf::Ptr ret = + utils::MemBuf::create(sizeof(msg_connection_remove_t)); + auto command = + reinterpret_cast<msg_connection_remove_t *>(ret->writableData()); + ret->append(sizeof(msg_connection_remove_t)); + std::memset(command, 0, sizeof(*command)); + + *command = { + .header = + { + .message_type = REQUEST_LIGHT, + .command_id = COMMAND_TYPE_CONNECTION_REMOVE, + .length = 1, + .seq_num = 0, + }, + }; + + snprintf(command->payload.symbolic_or_connid, SYMBOLIC_NAME_LEN, "%s", + "SELF"); + + return ret; +} + +utils::MemBuf::Ptr HicnForwarderModule::createCommandMapmeSendUpdate() { + utils::MemBuf::Ptr ret = + utils::MemBuf::create(sizeof(msg_mapme_send_update_t)); + auto command = + reinterpret_cast<msg_mapme_send_update_t *>(ret->writableData()); + ret->append(sizeof(msg_mapme_send_update_t)); + std::memset(command, 0, sizeof(*command)); + + *command = { + .header = + { + .message_type = REQUEST_LIGHT, + .command_id = COMMAND_TYPE_MAPME_SEND_UPDATE, + .length = 1, + .seq_num = seq_++, + }, + }; + + return ret; +} + +utils::MemBuf::Ptr HicnForwarderModule::createCommandSetForwardingStrategy( + std::unique_ptr<sockaddr> &&addr, uint32_t prefix_len, + std::string strategy) { + utils::MemBuf::Ptr ret = utils::MemBuf::create(sizeof(msg_strategy_set_t)); + auto command = reinterpret_cast<msg_strategy_set_t *>(ret->writableData()); + ret->append(sizeof(msg_strategy_set_t)); + std::memset(command, 0, sizeof(*command)); + + if (!IS_VALID_FAMILY(addr->sa_family)) return nullptr; + + strategy_type_t strategy_type = strategy_type_from_str(strategy.c_str()); + if (strategy_type == STRATEGY_TYPE_UNDEFINED) return nullptr; + + *command = { + .header = + { + .message_type = REQUEST_LIGHT, + .command_id = COMMAND_TYPE_STRATEGY_SET, + .length = 1, + .seq_num = seq_++, + }, + .payload = + { + .family = (uint8_t)addr->sa_family, + .len = (uint8_t)prefix_len, + .type = (uint8_t)strategy_type, + }, + }; + + switch (addr->sa_family) { + case AF_INET: + command->payload.address.v4.as_inaddr = + ((sockaddr_in *)addr.get())->sin_addr; + break; + case AF_INET6: + command->payload.address.v6.as_in6addr = + ((sockaddr_in6 *)addr.get())->sin6_addr; + break; + } + + return ret; +} + +extern "C" IoModule *create_module(void) { return new HicnForwarderModule(); } + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.h b/libtransport/src/io_modules/hicn-light-ng/hicn_forwarder_module.h index 845db73bf..0bf82757d 100644 --- a/libtransport/src/io_modules/udp/hicn_forwarder_module.h +++ b/libtransport/src/io_modules/hicn-light-ng/hicn_forwarder_module.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -18,15 +18,17 @@ #include <hicn/transport/core/io_module.h> #include <hicn/transport/core/prefix.h> +extern "C" { +#include <hicn/ctrl/hicn-light-ng.h> +} + namespace transport { namespace core { -class UdpSocketConnector; +class UdpTunnelConnector; class HicnForwarderModule : public IoModule { - static constexpr uint8_t ack_code = 0xc2; - static constexpr uint8_t nack_code = 0xc3; static constexpr std::uint16_t interface_mtu = 1500; public: @@ -56,27 +58,45 @@ class HicnForwarderModule : public IoModule { void connect(bool is_consumer) override; void send(Packet &packet) override; - void send(const uint8_t *packet, std::size_t len) override; + void send(const utils::MemBuf::Ptr &buffer) override; bool isConnected() override; void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name = "Libtransport") override; void registerRoute(const Prefix &prefix) override; + void sendMapme() override; + + void setForwardingStrategy(const Prefix &prefix, + std::string &strategy) override; + std::uint32_t getMtu() override; - bool isControlMessage(const uint8_t *message) override; + bool isControlMessage(utils::MemBuf &packet_buffer) override; void processControlMessageReply(utils::MemBuf &packet_buffer) override; void closeConnection() override; private: - UdpSocketConnector *connector_; + utils::MemBuf::Ptr createCommandRoute(std::unique_ptr<sockaddr> &&addr, + uint8_t prefix_length); + utils::MemBuf::Ptr createCommandDeleteConnection(); + utils::MemBuf::Ptr createCommandMapmeSendUpdate(); + utils::MemBuf::Ptr createCommandSetForwardingStrategy( + std::unique_ptr<sockaddr> &&addr, uint32_t prefix_len, + std::string strategy); + + private: + std::shared_ptr<UdpTunnelConnector> connector_; + + /* Sequence number used for sending control messages */ + uint32_t seq_; }; extern "C" IoModule *create_module(void); diff --git a/libtransport/src/io_modules/loopback/CMakeLists.txt b/libtransport/src/io_modules/loopback/CMakeLists.txt index b5ae0b7f7..817effb3b 100644 --- a/libtransport/src/io_modules/loopback/CMakeLists.txt +++ b/libtransport/src/io_modules/loopback/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -20,11 +20,11 @@ list(APPEND MODULE_SOURCE_FILES ) build_module(loopback_module - SHARED SOURCES ${MODULE_SOURCE_FILES} DEPENDS ${DEPENDENCIES} COMPONENT ${LIBTRANSPORT_COMPONENT} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + INCLUDE_DIRS + PRIVATE ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} ${Libhicn_INCLUDE_DIRS} DEFINITIONS ${COMPILER_DEFINITIONS} - COMPILE_OPTIONS ${COMPILE_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ) diff --git a/libtransport/src/io_modules/loopback/local_face.cc b/libtransport/src/io_modules/loopback/local_face.cc index b73444330..7ef3f1a59 100644 --- a/libtransport/src/io_modules/loopback/local_face.cc +++ b/libtransport/src/io_modules/loopback/local_face.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: diff --git a/libtransport/src/io_modules/loopback/local_face.h b/libtransport/src/io_modules/loopback/local_face.h index 1f4101447..f54f38afa 100644 --- a/libtransport/src/io_modules/loopback/local_face.h +++ b/libtransport/src/io_modules/loopback/local_face.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: diff --git a/libtransport/src/io_modules/loopback/loopback_module.cc b/libtransport/src/io_modules/loopback/loopback_module.cc index f7dd5e7b0..5b7ed5f61 100644 --- a/libtransport/src/io_modules/loopback/loopback_module.cc +++ b/libtransport/src/io_modules/loopback/loopback_module.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -41,7 +41,7 @@ void LoopbackModule::send(Packet &packet) { local_faces_.at(1 - local_id_)->send(packet); } -void LoopbackModule::send(const uint8_t *packet, std::size_t len) { +void LoopbackModule::send(const utils::MemBuf::Ptr &buffer) { // not supported throw errors::NotImplementedException(); } @@ -57,6 +57,7 @@ void LoopbackModule::closeConnection() { } void LoopbackModule::init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name) { @@ -64,8 +65,9 @@ void LoopbackModule::init(Connector::PacketReceivedCallback &&receive_callback, local_id_ = global_counter_++; local_faces_.emplace( local_faces_.begin() + local_id_, - new LocalConnector(io_service, std::move(receive_callback), nullptr, - nullptr, std::move(reconnect_callback))); + new LocalConnector(io_service, std::move(receive_callback), + std::move(sent_callback), nullptr, + std::move(reconnect_callback))); } } @@ -75,7 +77,9 @@ void LoopbackModule::processControlMessageReply(utils::MemBuf &packet_buffer) { std::uint32_t LoopbackModule::getMtu() { return interface_mtu; } -bool LoopbackModule::isControlMessage(const uint8_t *message) { return false; } +bool LoopbackModule::isControlMessage(utils::MemBuf &packet_buffer) { + return false; +} extern "C" IoModule *create_module(void) { return new LoopbackModule(); } diff --git a/libtransport/src/io_modules/loopback/loopback_module.h b/libtransport/src/io_modules/loopback/loopback_module.h index 219fa8841..2779ae7e3 100644 --- a/libtransport/src/io_modules/loopback/loopback_module.h +++ b/libtransport/src/io_modules/loopback/loopback_module.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -36,11 +36,12 @@ class LoopbackModule : public IoModule { void connect(bool is_consumer) override; void send(Packet &packet) override; - void send(const uint8_t *packet, std::size_t len) override; + void send(const utils::MemBuf::Ptr &buffer) override; bool isConnected() override; void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name = "Libtransport") override; @@ -49,7 +50,7 @@ class LoopbackModule : public IoModule { std::uint32_t getMtu() override; - bool isControlMessage(const uint8_t *message) override; + bool isControlMessage(utils::MemBuf &packet_buffer) override; void processControlMessageReply(utils::MemBuf &packet_buffer) override; diff --git a/libtransport/src/io_modules/memif/CMakeLists.txt b/libtransport/src/io_modules/memif/CMakeLists.txt index fc1c1f135..134ac1db6 100644 --- a/libtransport/src/io_modules/memif/CMakeLists.txt +++ b/libtransport/src/io_modules/memif/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. +# Copyright (c) 2021-2022 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -11,18 +11,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -find_package(Vpp REQUIRED) -find_package(Libmemif REQUIRED) + +############################################################## +# Dependencies and third party libs +############################################################## +find_package(Vpp ${VPP_DEFAULT_VERSION} EXACT REQUIRED) if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) - find_package(HicnPlugin REQUIRED) - find_package(SafeVapi REQUIRED) + find_package(HicnPlugin ${CURRENT_VERSION} REQUIRED) + find_package(SafeVapi ${CURRENT_VERSION} REQUIRED) else() list(APPEND DEPENDENCIES ${SAFE_VAPI_SHARED} ) endif() +list(APPEND DEPENDENCIES + ${MEMIF_THIRD_PARTY_DEPENDENCIES} +) + +############################################################## +# Sources +############################################################## list(APPEND MODULE_HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.h ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h @@ -32,23 +42,23 @@ list(APPEND MODULE_HEADER_FILES list(APPEND MODULE_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.c - ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.c ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_module.cc ) build_module(memif_module - SHARED SOURCES ${MODULE_SOURCE_FILES} DEPENDS ${DEPENDENCIES} COMPONENT ${LIBTRANSPORT_COMPONENT}-io-modules - LINK_LIBRARIES ${LIBMEMIF_LIBRARIES} ${SAFE_VAPI_LIBRARIES} + OBJECT_LIBRARIES ${MEMIF_THIRD_PARTY_OBJECT_LIBRARIES} + LINK_LIBRARIES PRIVATE ${HICN_LIBRARIES} ${SAFE_VAPI_LIBRARIES} INCLUDE_DIRS - ${LIBTRANSPORT_INCLUDE_DIRS} - ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} - ${VPP_INCLUDE_DIRS} - ${LIBMEMIF_INCLUDE_DIRS} - ${SAFE_VAPI_INCLUDE_DIRS} + PUBLIC + ${MEMIF_THIRD_PARTY_INCLUDE_DIRS} + ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} + ${VPP_INCLUDE_DIRS} + ${LIBMEMIF_INCLUDE_DIRS} + ${SAFE_VAPI_INCLUDE_DIRS} DEFINITIONS ${COMPILER_DEFINITIONS} - COMPILE_OPTIONS ${COMPILE_FLAGS} + COMPILE_OPTIONS ${COMPILER_OPTIONS} ${MARCH_COMPILER_OPTIONS} ) diff --git a/libtransport/src/io_modules/memif/hicn_vapi.c b/libtransport/src/io_modules/memif/hicn_vapi.c index 6d78026ab..753679f54 100644 --- a/libtransport/src/io_modules/memif/hicn_vapi.c +++ b/libtransport/src/io_modules/memif/hicn_vapi.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -27,8 +27,6 @@ #include <vlibapi/api.h> #include <vlibmemory/api.h> #include <vnet/ip/format.h> -#include <vnet/ip/ip4_packet.h> -#include <vnet/ip/ip6_packet.h> #include <vpp_plugins/hicn/error.h> #include <vppinfra/error.h> @@ -46,9 +44,6 @@ u8 *format_vl_api_address_union(u8 *s, va_list *args) { return NULL; } /*********************************************************************************/ -DEFINE_VAPI_MSG_IDS_HICN_API_JSON -DEFINE_VAPI_MSG_IDS_IP_API_JSON - static vapi_error_e register_prod_app_cb( vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, vapi_payload_hicn_api_register_prod_app_reply *reply) { @@ -78,7 +73,7 @@ int hicn_vapi_register_prod_app(vapi_ctx_t ctx, vapi_msg_hicn_api_register_prod_app *msg = vapi_alloc_hicn_api_register_prod_app(ctx); - if (ip46_address_is_ip4((ip46_address_t *)&input_params->prefix->address)) { + if (ip_address_is_v4((ip_address_t *)&input_params->prefix->address)) { memcpy(&msg->payload.prefix.address.un.ip4, &input_params->prefix->address, sizeof(ip4_address_t)); msg->payload.prefix.address.af = ADDRESS_IP4; @@ -190,7 +185,7 @@ int hicn_vapi_register_route(vapi_ctx_t ctx, vapi_msg_ip_route_add_del *msg = vapi_alloc_ip_route_add_del(ctx, 1); msg->payload.is_add = 1; - if (ip46_address_is_ip4((ip46_address_t *)(input_params->prod_addr))) { + if (ip_address_is_v4((ip_address_t *)(input_params->prod_addr))) { memcpy(&msg->payload.route.prefix.address.un.ip4, &input_params->prefix->address.v4, sizeof(ip4_address_t)); msg->payload.route.prefix.address.af = ADDRESS_IP4; @@ -204,7 +199,7 @@ int hicn_vapi_register_route(vapi_ctx_t ctx, msg->payload.route.paths[0].sw_if_index = ~0; msg->payload.route.paths[0].table_id = 0; - if (ip46_address_is_ip4((ip46_address_t *)(input_params->prod_addr))) { + if (ip_address_is_v4((ip_address_t *)(input_params->prod_addr))) { memcpy(&(msg->payload.route.paths[0].nh.address.ip4), input_params->prod_addr->v4.as_u8, sizeof(ip4_address_t)); msg->payload.route.paths[0].proto = FIB_API_PATH_NH_PROTO_IP4; @@ -214,7 +209,7 @@ int hicn_vapi_register_route(vapi_ctx_t ctx, msg->payload.route.paths[0].proto = FIB_API_PATH_NH_PROTO_IP6; } - msg->payload.route.paths[0].type = FIB_API_PATH_FLAG_NONE; + msg->payload.route.paths[0].type = FIB_API_PATH_TYPE_NORMAL; msg->payload.route.paths[0].flags = FIB_API_PATH_FLAG_NONE; int ret = vapi_ip_route_add_del(ctx, msg, reigster_route_cb, NULL); diff --git a/libtransport/src/io_modules/memif/hicn_vapi.h b/libtransport/src/io_modules/memif/hicn_vapi.h index e94c97749..967179f68 100644 --- a/libtransport/src/io_modules/memif/hicn_vapi.h +++ b/libtransport/src/io_modules/memif/hicn_vapi.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: diff --git a/libtransport/src/io_modules/memif/memif_connector.cc b/libtransport/src/io_modules/memif/memif_connector.cc deleted file mode 100644 index 68ad52b63..000000000 --- a/libtransport/src/io_modules/memif/memif_connector.cc +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <glog/logging.h> -#include <hicn/transport/errors/not_implemented_exception.h> -#include <io_modules/memif/memif_connector.h> -#include <sys/epoll.h> - -#include <cstdlib> - -extern "C" { -#include <memif/libmemif.h> -}; - -#define CANCEL_TIMER 1 - -namespace transport { - -namespace core { - -struct memif_connection { - uint16_t index; - /* memif conenction handle */ - memif_conn_handle_t conn; - /* transmit queue id */ - uint16_t tx_qid; - /* tx buffers */ - memif_buffer_t *tx_bufs; - /* allocated tx buffers counter */ - /* number of tx buffers pointing to shared memory */ - uint16_t tx_buf_num; - /* rx buffers */ - memif_buffer_t *rx_bufs; - /* allcoated rx buffers counter */ - /* number of rx buffers pointing to shared memory */ - uint16_t rx_buf_num; - /* interface ip address */ - uint8_t ip_addr[4]; -}; - -std::once_flag MemifConnector::flag_; -utils::EpollEventReactor MemifConnector::main_event_reactor_; - -MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, - PacketSentCallback &&packet_sent, - OnCloseCallback &&close_callback, - OnReconnectCallback &&on_reconnect, - asio::io_service &io_service, - std::string app_name) - : Connector(std::move(receive_callback), std::move(packet_sent), - std::move(close_callback), std::move(on_reconnect)), - memif_worker_(nullptr), - timer_set_(false), - send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), - disconnect_timer_( - std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), - io_service_(io_service), - work_(asio::make_work_guard(io_service_)), - memif_connection_(std::make_unique<memif_connection_t>()), - tx_buf_counter_(0), - is_reconnection_(false), - data_available_(false), - app_name_(app_name), - socket_filename_("") { - std::call_once(MemifConnector::flag_, &MemifConnector::init, this); -} - -MemifConnector::~MemifConnector() { close(); } - -void MemifConnector::init() { - /* initialize memory interface */ - int err = memif_init(controlFdUpdate, const_cast<char *>(app_name_.c_str()), - nullptr, nullptr, nullptr); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_init: " << memif_strerror(err); - } -} - -void MemifConnector::connect(uint32_t memif_id, long memif_mode) { - state_ = State::CONNECTING; - - memif_id_ = memif_id; - socket_filename_ = "/run/vpp/memif.sock"; - - createMemif(memif_id, memif_mode, nullptr); - - while (state_ != State::CONNECTED) { - MemifConnector::main_event_reactor_.runOneEvent(); - } - - int err; - - /* get interrupt queue id */ - int fd = -1; - err = memif_get_queue_efd(memif_connection_->conn, 0, &fd); - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_get_queue_efd: " << memif_strerror(err); - return; - } - - // Remove fd from main epoll - main_event_reactor_.delFileDescriptor(fd); - - // Add fd to epoll of instance - event_reactor_.addFileDescriptor( - fd, EPOLLIN, [this](const utils::Event &evt) -> int { - return onInterrupt(memif_connection_->conn, this, 0); - }); - - memif_worker_ = std::make_unique<std::thread>( - std::bind(&MemifConnector::threadMain, this)); -} - -int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) { - memif_connection_t *c = memif_connection_.get(); - - /* setting memif connection arguments */ - memif_conn_args_t args; - memset(&args, 0, sizeof(args)); - - args.is_master = mode; - args.log2_ring_size = MEMIF_LOG2_RING_SIZE; - args.buffer_size = MEMIF_BUF_SIZE; - args.num_s2m_rings = 1; - args.num_m2s_rings = 1; - strncpy((char *)args.interface_name, IF_NAME, strlen(IF_NAME) + 1); - args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP; - - int err; - - err = memif_create_socket(&args.socket, socket_filename_.c_str(), nullptr); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - throw errors::RuntimeException(memif_strerror(err)); - } - - args.interface_id = index; - /* last argument for memif_create (void * private_ctx) is used by user - to identify connection. this context is returned with callbacks */ - - /* default interrupt */ - if (s == nullptr) { - err = memif_create(&c->conn, &args, onConnect, onDisconnect, onInterrupt, - this); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - throw errors::RuntimeException(memif_strerror(err)); - } - } - - c->index = (uint16_t)index; - c->tx_qid = 0; - /* alloc memif buffers */ - c->rx_buf_num = 0; - c->rx_bufs = static_cast<memif_buffer_t *>( - malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS)); - c->tx_buf_num = 0; - c->tx_bufs = static_cast<memif_buffer_t *>( - malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS)); - - // memif_set_rx_mode (c->conn, MEMIF_RX_MODE_POLLING, 0); - - return 0; -} - -int MemifConnector::deleteMemif() { - memif_connection_t *c = memif_connection_.get(); - - if (c->rx_bufs) { - free(c->rx_bufs); - } - - c->rx_bufs = nullptr; - c->rx_buf_num = 0; - - if (c->tx_bufs) { - free(c->tx_bufs); - } - - c->tx_bufs = nullptr; - c->tx_buf_num = 0; - - int err; - /* disconenct then delete memif connection */ - err = memif_delete(&c->conn); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_delete: " << memif_strerror(err); - } - - if (TRANSPORT_EXPECT_FALSE(c->conn != nullptr)) { - LOG(ERROR) << "memif delete fail"; - } - - return 0; -} - -int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) { - /* convert memif event definitions to epoll events */ - if (events & MEMIF_FD_EVENT_DEL) { - return MemifConnector::main_event_reactor_.delFileDescriptor(fd); - } - - uint32_t evt = 0; - - if (events & MEMIF_FD_EVENT_READ) { - evt |= EPOLLIN; - } - - if (events & MEMIF_FD_EVENT_WRITE) { - evt |= EPOLLOUT; - } - - if (events & MEMIF_FD_EVENT_MOD) { - return MemifConnector::main_event_reactor_.modFileDescriptor(fd, evt); - } - - return MemifConnector::main_event_reactor_.addFileDescriptor( - fd, evt, [](const utils::Event &evt) -> int { - uint32_t event = 0; - int memif_err = 0; - - if (evt.events & EPOLLIN) { - event |= MEMIF_FD_EVENT_READ; - } - - if (evt.events & EPOLLOUT) { - event |= MEMIF_FD_EVENT_WRITE; - } - - if (evt.events & EPOLLERR) { - event |= MEMIF_FD_EVENT_ERROR; - } - - memif_err = memif_control_fd_handler(evt.data.fd, event); - - if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_control_fd_handler: " - << memif_strerror(memif_err); - } - - return 0; - }); -} - -int MemifConnector::bufferAlloc(long n, uint16_t qid) { - memif_connection_t *c = memif_connection_.get(); - int err; - uint16_t r; - /* set data pointer to shared memory and set buffer_len to shared mmeory - * buffer len */ - err = memif_buffer_alloc(c->conn, qid, c->tx_bufs, n, &r, 2000); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_buffer_alloc: " << memif_strerror(err); - return -1; - } - - c->tx_buf_num += r; - return r; -} - -int MemifConnector::txBurst(uint16_t qid) { - memif_connection_t *c = memif_connection_.get(); - int err; - uint16_t r; - /* inform peer memif interface about data in shared memory buffers */ - /* mark memif buffers as free */ - err = memif_tx_burst(c->conn, qid, c->tx_bufs, c->tx_buf_num, &r); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_tx_burst: " << memif_strerror(err); - } - - // err = memif_refill_queue(c->conn, qid, r, 0); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_tx_burst: " << memif_strerror(err); - c->tx_buf_num -= r; - return -1; - } - - c->tx_buf_num -= r; - return 0; -} - -void MemifConnector::sendCallback(const std::error_code &ec) { - timer_set_ = false; - - if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) { - doSend(); - } -} - -void MemifConnector::processInputBuffer(std::uint16_t total_packets) { - utils::MemBuf::Ptr ptr; - - for (; total_packets > 0; total_packets--) { - if (input_buffer_.pop(ptr)) { - receive_callback_(this, *ptr, std::make_error_code(std::errc(0))); - } - } -} - -/* informs user about connected status. private_ctx is used by user to identify - connection (multiple connections WIP) */ -int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { - MemifConnector *connector = (MemifConnector *)private_ctx; - connector->state_ = State::CONNECTED; - memif_refill_queue(conn, 0, -1, 0); - - return 0; -} - -/* informs user about disconnected status. private_ctx is used by user to - identify connection (multiple connections WIP) */ -int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) { - MemifConnector *connector = (MemifConnector *)private_ctx; - connector->state_ = State::CLOSED; - return 0; -} - -void MemifConnector::threadMain() { event_reactor_.runEventLoop(1000); } - -int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, - uint16_t qid) { - MemifConnector *connector = (MemifConnector *)private_ctx; - - memif_connection_t *c = connector->memif_connection_.get(); - int err = MEMIF_ERR_SUCCESS, ret_val; - uint16_t total_packets = 0; - uint16_t rx; - - do { - err = memif_rx_burst(conn, qid, c->rx_bufs, MAX_MEMIF_BUFS, &rx); - ret_val = err; - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS && - err != MEMIF_ERR_NOBUF)) { - LOG(ERROR) << "memif_rx_burst: " << memif_strerror(err); - goto error; - } - - c->rx_buf_num += rx; - - if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) { - LOG(ERROR) << "socket stopped: ignoring " << rx << " packets"; - goto error; - } - - std::size_t packet_length; - for (int i = 0; i < rx; i++) { - auto buffer = connector->getRawBuffer(); - packet_length = (c->rx_bufs + i)->len; - std::memcpy(buffer.first, (c->rx_bufs + i)->data, packet_length); - auto packet = connector->getPacketFromBuffer(buffer.first, packet_length); - - if (!connector->input_buffer_.push(std::move(packet))) { - LOG(ERROR) << "Error pushing packet. Ring buffer full."; - - // TODO Here we should consider the possibility to signal the congestion - // to the application, that would react properly (e.g. slow down - // message) - } - } - - /* mark memif buffers and shared memory buffers as free */ - /* free processed buffers */ - - err = memif_refill_queue(conn, qid, rx, 0); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err); - } - - c->rx_buf_num -= rx; - total_packets += rx; - - } while (ret_val == MEMIF_ERR_NOBUF); - - connector->io_service_.post( - std::bind(&MemifConnector::processInputBuffer, connector, total_packets)); - - return 0; - -error: - err = memif_refill_queue(c->conn, qid, rx, 0); - - if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { - LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err); - } - c->rx_buf_num -= rx; - - return 0; -} - -void MemifConnector::close() { - if (state_ != State::CLOSED) { - disconnect_timer_->expiresFromNow(std::chrono::microseconds(50)); - disconnect_timer_->asyncWait([this](const std::error_code &ec) { - deleteMemif(); - event_reactor_.stop(); - }); - - if (memif_worker_ && memif_worker_->joinable()) { - memif_worker_->join(); - } - } -} - -void MemifConnector::send(Packet &packet) { - { - utils::SpinLock::Acquire locked(write_msgs_lock_); - output_buffer_.push_back(packet.shared_from_this()); - } -#if CANCEL_TIMER - if (!timer_set_) { - timer_set_ = true; - send_timer_->expiresFromNow(std::chrono::microseconds(50)); - send_timer_->asyncWait( - std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1)); - } -#endif -} - -int MemifConnector::doSend() { - std::size_t max = 0; - int32_t n = 0; - std::size_t size = 0; - - { - utils::SpinLock::Acquire locked(write_msgs_lock_); - size = output_buffer_.size(); - } - - do { - max = size < MAX_MEMIF_BUFS ? size : MAX_MEMIF_BUFS; - n = bufferAlloc(max, memif_connection_->tx_qid); - - if (TRANSPORT_EXPECT_FALSE(n < 0)) { - LOG(ERROR) << "Error allocating buffers."; - return -1; - } - - for (uint16_t i = 0; i < n; i++) { - utils::SpinLock::Acquire locked(write_msgs_lock_); - - auto packet = output_buffer_.front().get(); - const utils::MemBuf *current = packet; - std::size_t offset = 0; - uint8_t *shared_buffer = - reinterpret_cast<uint8_t *>(memif_connection_->tx_bufs[i].data); - do { - std::memcpy(shared_buffer + offset, current->data(), current->length()); - offset += current->length(); - current = current->next(); - } while (current != packet); - - memif_connection_->tx_bufs[i].len = uint32_t(offset); - - output_buffer_.pop_front(); - } - - txBurst(memif_connection_->tx_qid); - - utils::SpinLock::Acquire locked(write_msgs_lock_); - size = output_buffer_.size(); - } while (size > 0); - - return 0; -} - -void MemifConnector::send(const uint8_t *packet, std::size_t len) { - throw errors::NotImplementedException(); -} - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/io_modules/memif/memif_connector.h b/libtransport/src/io_modules/memif/memif_connector.h deleted file mode 100644 index 0a189f893..000000000 --- a/libtransport/src/io_modules/memif/memif_connector.h +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/config.h> -#include <hicn/transport/core/connector.h> -#include <hicn/transport/portability/portability.h> -#include <hicn/transport/utils/ring_buffer.h> -//#include <hicn/transport/core/hicn_vapi.h> -#include <hicn/transport/core/asio_wrapper.h> -#include <utils/epoll_event_reactor.h> -#include <utils/fd_deadline_timer.h> - -#include <deque> -#include <mutex> -#include <thread> - -#define _Static_assert static_assert - -namespace transport { - -namespace core { - -typedef struct memif_connection memif_connection_t; - -#define APP_NAME "libtransport" -#define IF_NAME "vpp_connection" - -#define MEMIF_BUF_SIZE 2048 -#define MEMIF_LOG2_RING_SIZE 13 -#define MAX_MEMIF_BUFS (1 << MEMIF_LOG2_RING_SIZE) - -class MemifConnector : public Connector { - using memif_conn_handle_t = void *; - using PacketRing = utils::CircularFifo<utils::MemBuf::Ptr, queue_size>; - - public: - MemifConnector(PacketReceivedCallback &&receive_callback, - PacketSentCallback &&packet_sent, - OnCloseCallback &&close_callback, - OnReconnectCallback &&on_reconnect, - asio::io_service &io_service, - std::string app_name = "Libtransport"); - - ~MemifConnector() override; - - void send(Packet &packet) override; - - void send(const uint8_t *packet, std::size_t len) override; - - void close() override; - - void connect(uint32_t memif_id, long memif_mode); - - TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; }; - - private: - void init(); - - int doSend(); - - int createMemif(uint32_t index, uint8_t mode, char *s); - - uint32_t getMemifConfiguration(); - - int deleteMemif(); - - static int controlFdUpdate(int fd, uint8_t events, void *private_ctx); - - static int onConnect(memif_conn_handle_t conn, void *private_ctx); - - static int onDisconnect(memif_conn_handle_t conn, void *private_ctx); - - static int onInterrupt(memif_conn_handle_t conn, void *private_ctx, - uint16_t qid); - - void threadMain(); - - int txBurst(uint16_t qid); - - int bufferAlloc(long n, uint16_t qid); - - void sendCallback(const std::error_code &ec); - - void processInputBuffer(std::uint16_t total_packets); - - private: - static utils::EpollEventReactor main_event_reactor_; - static std::unique_ptr<std::thread> main_worker_; - - int epfd; - std::unique_ptr<std::thread> memif_worker_; - utils::EpollEventReactor event_reactor_; - std::atomic_bool timer_set_; - std::unique_ptr<utils::FdDeadlineTimer> send_timer_; - std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_; - asio::io_service &io_service_; - asio::executor_work_guard<asio::io_context::executor_type> work_; - std::unique_ptr<memif_connection_t> memif_connection_; - uint16_t tx_buf_counter_; - - PacketRing input_buffer_; - bool is_reconnection_; - bool data_available_; - uint32_t memif_id_; - uint8_t memif_mode_; - std::string app_name_; - uint16_t transmission_index_; - utils::SpinLock write_msgs_lock_; - std::string socket_filename_; - - static std::once_flag flag_; -}; - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/io_modules/memif/memif_vapi.c b/libtransport/src/io_modules/memif/memif_vapi.c index b3da2b012..54e2c3134 100644 --- a/libtransport/src/io_modules/memif/memif_vapi.c +++ b/libtransport/src/io_modules/memif/memif_vapi.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -22,8 +22,6 @@ #include <vapi/vapi_safe.h> #include <vppinfra/clib.h> -DEFINE_VAPI_MSG_IDS_MEMIF_API_JSON - static vapi_error_e memif_details_cb(vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, vapi_payload_memif_details *reply) { @@ -45,6 +43,9 @@ static vapi_error_e memif_details_cb(vapi_ctx_t ctx, void *callback_ctx, int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id) { vapi_lock(); vapi_msg_memif_dump *msg = vapi_alloc_memif_dump(ctx); + + // Initialize memif id to 0 + *memif_id = 0; int ret = vapi_memif_dump(ctx, msg, memif_details_cb, memif_id); vapi_unlock(); return ret; diff --git a/libtransport/src/io_modules/memif/memif_vapi.h b/libtransport/src/io_modules/memif/memif_vapi.h index bcf06ed43..f5f0639e7 100644 --- a/libtransport/src/io_modules/memif/memif_vapi.h +++ b/libtransport/src/io_modules/memif/memif_vapi.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -44,6 +44,10 @@ typedef struct memif_output_params_s { int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id); +int memif_vapi_create_socket(vapi_ctx_t ctx, + memif_create_params_t *input_params, + memif_output_params_t *output_params); + int memif_vapi_create_memif(vapi_ctx_t ctx, memif_create_params_t *input_params, memif_output_params_t *output_params); diff --git a/libtransport/src/io_modules/memif/vpp_forwarder_module.cc b/libtransport/src/io_modules/memif/vpp_forwarder_module.cc index 44c8376df..65260077a 100644 --- a/libtransport/src/io_modules/memif/vpp_forwarder_module.cc +++ b/libtransport/src/io_modules/memif/vpp_forwarder_module.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,16 +13,16 @@ * limitations under the License. */ +#include <core/memif_connector.h> #include <glog/logging.h> #include <hicn/transport/config.h> #include <hicn/transport/errors/not_implemented_exception.h> #include <io_modules/memif/hicn_vapi.h> -#include <io_modules/memif/memif_connector.h> #include <io_modules/memif/memif_vapi.h> #include <io_modules/memif/vpp_forwarder_module.h> extern "C" { -#include <memif/libmemif.h> +#include <libmemif.h> }; typedef enum { MASTER = 0, SLAVE = 1 } memif_role_t; @@ -39,21 +39,24 @@ namespace core { VPPForwarderModule::VPPForwarderModule() : IoModule(), connector_(nullptr), + memif_id_(0), sw_if_index_(~0), face_id1_(~0), face_id2_(~0), is_consumer_(false) {} -VPPForwarderModule::~VPPForwarderModule() { delete connector_; } +VPPForwarderModule::~VPPForwarderModule() {} void VPPForwarderModule::init( Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name) { if (!connector_) { - connector_ = - new MemifConnector(std::move(receive_callback), 0, 0, - std::move(reconnect_callback), io_service, app_name); + connector_ = std::make_unique<MemifConnector>( + std::move(receive_callback), std::move(sent_callback), + Connector::OnCloseCallback(0), std::move(reconnect_callback), + io_service, app_name); } } @@ -62,7 +65,7 @@ void VPPForwarderModule::processControlMessageReply( throw errors::NotImplementedException(); } -bool VPPForwarderModule::isControlMessage(const uint8_t *message) { +bool VPPForwarderModule::isControlMessage(utils::MemBuf &packet_buffer) { return false; } @@ -73,12 +76,12 @@ void VPPForwarderModule::send(Packet &packet) { connector_->send(packet); } -void VPPForwarderModule::send(const uint8_t *packet, std::size_t len) { +void VPPForwarderModule::send(const utils::MemBuf::Ptr &buffer) { counters_.tx_packets++; - counters_.tx_bytes += len; + counters_.tx_bytes += buffer->length(); // Perfect forwarding - connector_->send(packet, len); + connector_->send(buffer); } std::uint32_t VPPForwarderModule::getMtu() { return interface_mtu; } @@ -170,7 +173,8 @@ void VPPForwarderModule::connect(bool is_consumer) { consumerConnection(); } - connector_->connect(memif_id_, 0); + connector_->connect(memif_id_, 0 /* is_master = false */, + memif_socket_filename); connector_->setRole(is_consumer_ ? Connector::Role::CONSUMER : Connector::Role::PRODUCER); } @@ -207,7 +211,8 @@ void VPPForwarderModule::registerRoute(const Prefix &prefix) { throw errors::RuntimeException(hicn_vapi_get_error_string(ret)); } - inet6_address_ = *output.prod_addr; + std::memcpy(inet6_address_.v6.as_u8, output.prod_addr->v6.as_u8, + sizeof(inet6_address_)); face_id1_ = output.face_id; } else { @@ -228,8 +233,6 @@ void VPPForwarderModule::registerRoute(const Prefix &prefix) { void VPPForwarderModule::closeConnection() { if (VPPForwarderModule::sock_) { - connector_->close(); - if (is_consumer_) { hicn_del_face_app_input_params params; params.face_id = face_id1_; @@ -242,6 +245,8 @@ void VPPForwarderModule::closeConnection() { hicn_vapi_face_prod_del(VPPForwarderModule::sock_, ¶ms); } + connector_->close(); + if (sw_if_index_ != uint32_t(~0)) { int ret = memif_vapi_delete_memif(VPPForwarderModule::sock_, sw_if_index_); diff --git a/libtransport/src/io_modules/memif/vpp_forwarder_module.h b/libtransport/src/io_modules/memif/vpp_forwarder_module.h index 8c4114fed..162ee0ca5 100644 --- a/libtransport/src/io_modules/memif/vpp_forwarder_module.h +++ b/libtransport/src/io_modules/memif/vpp_forwarder_module.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -32,7 +32,8 @@ namespace core { class MemifConnector; class VPPForwarderModule : public IoModule { - static constexpr std::uint16_t interface_mtu = 1500; + static inline std::uint16_t interface_mtu = 1500; + static inline std::string const memif_socket_filename = "/run/vpp/memif.sock"; public: VPPForwarderModule(); @@ -41,11 +42,12 @@ class VPPForwarderModule : public IoModule { void connect(bool is_consumer) override; void send(Packet &packet) override; - void send(const uint8_t *packet, std::size_t len) override; + void send(const utils::MemBuf::Ptr &buffer) override; bool isConnected() override; void init(Connector::PacketReceivedCallback &&receive_callback, + Connector::PacketSentCallback &&sent_callback, Connector::OnReconnectCallback &&reconnect_callback, asio::io_service &io_service, const std::string &app_name = "Libtransport") override; @@ -54,7 +56,7 @@ class VPPForwarderModule : public IoModule { std::uint32_t getMtu() override; - bool isControlMessage(const uint8_t *message) override; + bool isControlMessage(utils::MemBuf &packet_buffer) override; void processControlMessageReply(utils::MemBuf &packet_buffer) override; @@ -66,7 +68,7 @@ class VPPForwarderModule : public IoModule { void producerConnection(); private: - MemifConnector *connector_; + std::shared_ptr<MemifConnector> connector_; uint32_t memif_id_; uint32_t sw_if_index_; // A consumer socket in vpp has two faces (ipv4 and ipv6) diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc b/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc deleted file mode 100644 index 62efdc3a5..000000000 --- a/libtransport/src/io_modules/raw_socket/raw_socket_connector.cc +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <core/raw_socket_connector.h> -#include <hicn/transport/utils/conversions.h> -#include <net/if.h> -#include <netdb.h> -#include <stdio.h> -#include <string.h> -#include <sys/ioctl.h> -#include <sys/socket.h> - -#define MY_DEST_MAC0 0x0a -#define MY_DEST_MAC1 0x7b -#define MY_DEST_MAC2 0x7c -#define MY_DEST_MAC3 0x1c -#define MY_DEST_MAC4 0x4a -#define MY_DEST_MAC5 0x14 - -namespace transport { - -namespace core { - -RawSocketConnector::RawSocketConnector( - PacketReceivedCallback &&receive_callback, - OnReconnect &&on_reconnect_callback, asio::io_service &io_service, - std::string app_name) - : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), - io_service_(io_service), - socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)), - // resolver_(io_service_), - timer_(io_service_), - read_msg_(packet_pool_.makePtr(nullptr)), - data_available_(false), - app_name_(app_name) { - memset(&link_layer_address_, 0, sizeof(link_layer_address_)); -} - -RawSocketConnector::~RawSocketConnector() {} - -void RawSocketConnector::connect(const std::string &interface_name, - const std::string &mac_address_str) { - state_ = ConnectorState::CONNECTING; - memset(ðernet_header_, 0, sizeof(ethernet_header_)); - struct ifreq ifr; - struct ifreq if_mac; - uint8_t mac_address[6]; - - utils::convertStringToMacAddress(mac_address_str, mac_address); - - // Get interface mac address - int fd = static_cast<int>(socket_.native_handle()); - - /* Get the index of the interface to send on */ - memset(&ifr, 0, sizeof(struct ifreq)); - strncpy(ifr.ifr_name, interface_name.c_str(), interface_name.size()); - - // if (ioctl(fd, SIOCGIFINDEX, &if_idx) < 0) { - // perror("SIOCGIFINDEX"); - // } - - /* Get the MAC address of the interface to send on */ - memset(&if_mac, 0, sizeof(struct ifreq)); - strncpy(if_mac.ifr_name, interface_name.c_str(), interface_name.size()); - if (ioctl(fd, SIOCGIFHWADDR, &if_mac) < 0) { - perror("SIOCGIFHWADDR"); - throw errors::RuntimeException("Interface does not exist"); - } - - /* Ethernet header */ - for (int i = 0; i < 6; i++) { - ethernet_header_.ether_shost[i] = - ((uint8_t *)&if_mac.ifr_hwaddr.sa_data)[i]; - ethernet_header_.ether_dhost[i] = mac_address[i]; - } - - /* Ethertype field */ - ethernet_header_.ether_type = htons(ETH_P_IPV6); - - strcpy(ifr.ifr_name, interface_name.c_str()); - - if (0 == ioctl(fd, SIOCGIFHWADDR, &ifr)) { - memcpy(link_layer_address_.sll_addr, ifr.ifr_hwaddr.sa_data, 6); - } - - // memset(&ifr, 0, sizeof(ifr)); - // ioctl(fd, SIOCGIFFLAGS, &ifr); - // ifr.ifr_flags |= IFF_PROMISC; - // ioctl(fd, SIOCSIFFLAGS, &ifr); - - link_layer_address_.sll_family = AF_PACKET; - link_layer_address_.sll_protocol = htons(ETH_P_ALL); - link_layer_address_.sll_ifindex = if_nametoindex(interface_name.c_str()); - link_layer_address_.sll_hatype = 1; - link_layer_address_.sll_halen = 6; - - // startConnectionTimer(); - doConnect(); - doRecvPacket(); -} - -void RawSocketConnector::send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent) { - if (packet_sent != 0) { - socket_.async_send( - asio::buffer(packet, len), - [packet_sent](std::error_code ec, std::size_t /*length*/) { - packet_sent(); - }); - } else { - if (state_ == ConnectorState::CONNECTED) { - socket_.send(asio::buffer(packet, len)); - } - } -} - -void RawSocketConnector::send(const Packet::MemBufPtr &packet) { - io_service_.post([this, packet]() { - bool write_in_progress = !output_buffer_.empty(); - output_buffer_.push_back(std::move(packet)); - if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { - if (!write_in_progress) { - doSendPacket(); - } else { - // Tell the handle connect it has data to write - data_available_ = true; - } - } - }); -} - -void RawSocketConnector::close() { - io_service_.post([this]() { socket_.close(); }); -} - -void RawSocketConnector::doSendPacket() { - auto packet = output_buffer_.front().get(); - auto array = std::vector<asio::const_buffer>(); - - const utils::MemBuf *current = packet; - do { - array.push_back(asio::const_buffer(current->data(), current->length())); - current = current->next(); - } while (current != packet); - - socket_.async_send( - std::move(array), - [this /*, packet*/](std::error_code ec, std::size_t bytes_transferred) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - output_buffer_.pop_front(); - if (!output_buffer_.empty()) { - doSendPacket(); - } - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - } - }); -} - -void RawSocketConnector::doRecvPacket() { - read_msg_ = getPacket(); - socket_.async_receive( - asio::buffer(read_msg_->writableData(), packet_size), - [this](std::error_code ec, std::size_t bytes_transferred) mutable { - if (!ec) { - // Ignore packets that are not for us - uint8_t *dst_mac_address = const_cast<uint8_t *>(read_msg_->data()); - if (!std::memcmp(dst_mac_address, ethernet_header_.ether_shost, - ETHER_ADDR_LEN)) { - read_msg_->append(bytes_transferred); - read_msg_->trimStart(sizeof(struct ether_header)); - receive_callback_(std::move(read_msg_)); - } - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - } - doRecvPacket(); - }); -} - -void RawSocketConnector::doConnect() { - state_ = ConnectorState::CONNECTED; - socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_))); -} - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_connector.h b/libtransport/src/io_modules/raw_socket/raw_socket_connector.h deleted file mode 100644 index 06892b3d8..000000000 --- a/libtransport/src/io_modules/raw_socket/raw_socket_connector.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <core/connector.h> -#include <hicn/transport/config.h> -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/name.h> -#include <linux/if_packet.h> -#include <net/ethernet.h> -#include <sys/socket.h> - -#include <deque> - -namespace transport { - -namespace core { - -using asio::generic::raw_protocol; -using raw_endpoint = asio::generic::basic_endpoint<raw_protocol>; - -class RawSocketConnector : public Connector { - public: - RawSocketConnector(PacketReceivedCallback &&receive_callback, - OnReconnect &&reconnect_callback, - asio::io_service &io_service, - std::string app_name = "Libtransport"); - - ~RawSocketConnector() override; - - void send(const Packet::MemBufPtr &packet) override; - - void send(const uint8_t *packet, std::size_t len, - const PacketSentCallback &packet_sent = 0) override; - - void close() override; - - void connect(const std::string &interface_name, - const std::string &mac_address_str); - - private: - void doConnect(); - - void doRecvPacket(); - - void doSendPacket(); - - private: - asio::io_service &io_service_; - raw_protocol::socket socket_; - - struct ether_header ethernet_header_; - - struct sockaddr_ll link_layer_address_; - - asio::steady_timer timer_; - - utils::ObjectPool<utils::MemBuf>::Ptr read_msg_; - - bool data_available_; - std::string app_name_; -}; - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc b/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc deleted file mode 100644 index dcf489f59..000000000 --- a/libtransport/src/io_modules/raw_socket/raw_socket_interface.cc +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <core/raw_socket_interface.h> -#include <hicn/transport/utils/linux.h> - -#include <fstream> - -namespace transport { - -namespace core { - -static std::string config_folder_path = "/etc/transport/interface.conf.d"; - -RawSocketInterface::RawSocketInterface(RawSocketConnector &connector) - : ForwarderInterface<RawSocketInterface, RawSocketConnector>(connector) {} - -RawSocketInterface::~RawSocketInterface() {} - -void RawSocketInterface::connect(bool is_consumer) { - std::string complete_filename = - config_folder_path + std::string("/") + output_interface_; - - std::ifstream is(complete_filename); - std::string interface; - - if (is) { - is >> remote_mac_address_; - } - - // Get interface ip address - struct sockaddr_in6 address = {0}; - utils::retrieveInterfaceAddress(output_interface_, &address); - - std::memcpy(&inet6_address_.v6.as_u8, &address.sin6_addr, - sizeof(address.sin6_addr)); - connector_.connect(output_interface_, remote_mac_address_); -} - -void RawSocketInterface::registerRoute(Prefix &prefix) { return; } - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/raw_socket/raw_socket_interface.h b/libtransport/src/io_modules/raw_socket/raw_socket_interface.h deleted file mode 100644 index 7036cac7e..000000000 --- a/libtransport/src/io_modules/raw_socket/raw_socket_interface.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <core/forwarder_interface.h> -#include <core/raw_socket_connector.h> -#include <hicn/transport/core/prefix.h> - -#include <atomic> -#include <deque> - -namespace transport { - -namespace core { - -class RawSocketInterface - : public ForwarderInterface<RawSocketInterface, RawSocketConnector> { - public: - typedef RawSocketConnector ConnectorType; - - RawSocketInterface(RawSocketConnector &connector); - - ~RawSocketInterface(); - - void connect(bool is_consumer); - - void registerRoute(Prefix &prefix); - - std::uint16_t getMtu() { return interface_mtu; } - - TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( - const uint8_t *message) { - return false; - } - - TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( - Packet::MemBufPtr &&packet_buffer) {} - - TRANSPORT_ALWAYS_INLINE void closeConnection(){}; - - private: - static constexpr std::uint16_t interface_mtu = 1500; - std::string remote_mac_address_; -}; - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/udp/CMakeLists.txt b/libtransport/src/io_modules/udp/CMakeLists.txt deleted file mode 100644 index b9c19d063..000000000 --- a/libtransport/src/io_modules/udp/CMakeLists.txt +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (c) 2021 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -list(APPEND MODULE_HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.h - ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h -) - -list(APPEND MODULE_SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_module.cc - ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc -) - -build_module(hicnlight_module - SHARED - SOURCES ${MODULE_SOURCE_FILES} - DEPENDS ${DEPENDENCIES} - COMPONENT ${LIBTRANSPORT_COMPONENT} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} ${LIBTRANSPORT_INTERNAL_INCLUDE_DIRS} - DEFINITIONS ${COMPILER_DEFINITIONS} - COMPILE_OPTIONS ${COMPILE_FLAGS} -) diff --git a/libtransport/src/io_modules/udp/hicn_forwarder_module.cc b/libtransport/src/io_modules/udp/hicn_forwarder_module.cc deleted file mode 100644 index ba08dd8c0..000000000 --- a/libtransport/src/io_modules/udp/hicn_forwarder_module.cc +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <io_modules/udp/hicn_forwarder_module.h> -#include <io_modules/udp/udp_socket_connector.h> - -union AddressLight { - uint32_t ipv4; - struct in6_addr ipv6; -}; - -typedef struct { - uint8_t message_type; - uint8_t command_id; - uint16_t length; - uint32_t seq_num; -} CommandHeader; - -typedef struct { - uint8_t message_type; - uint8_t command_id; - uint16_t length; - uint32_t seq_num; - char symbolic_or_connid[16]; - union AddressLight address; - uint16_t cost; - uint8_t address_type; - uint8_t len; -} RouteToSelfCommand; - -typedef struct { - uint8_t message_type; - uint8_t command_id; - uint16_t length; - uint32_t seq_num; - char symbolic_or_connid[16]; -} DeleteSelfConnectionCommand; - -namespace { -static constexpr uint8_t addr_inet = 1; -static constexpr uint8_t addr_inet6 = 2; -static constexpr uint8_t add_route_command = 3; -static constexpr uint8_t delete_connection_command = 5; -static constexpr uint8_t request_light = 0xc0; -static constexpr char identifier[] = "SELF"; - -void fillCommandHeader(CommandHeader *header) { - // Allocate and fill the header - header->message_type = request_light; - header->length = 1; -} - -RouteToSelfCommand createCommandRoute(std::unique_ptr<sockaddr> &&addr, - uint8_t prefix_length) { - RouteToSelfCommand command = {0}; - - // check and set IP address - if (addr->sa_family == AF_INET) { - command.address_type = addr_inet; - command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; - } else if (addr->sa_family == AF_INET6) { - command.address_type = addr_inet6; - command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; - } - - // Fill remaining payload fields -#ifndef _WIN32 - strcpy(command.symbolic_or_connid, identifier); -#else - strcpy_s(command.symbolic_or_connid, 16, identifier); -#endif - command.cost = 1; - command.len = (uint8_t)prefix_length; - - // Allocate and fill the header - command.command_id = add_route_command; - fillCommandHeader((CommandHeader *)&command); - - return command; -} - -DeleteSelfConnectionCommand createCommandDeleteConnection() { - DeleteSelfConnectionCommand command = {0}; - fillCommandHeader((CommandHeader *)&command); - command.command_id = delete_connection_command; - -#ifndef _WIN32 - strcpy(command.symbolic_or_connid, identifier); -#else - strcpy_s(command.symbolic_or_connid, 16, identifier); -#endif - - return command; -} - -} // namespace - -namespace transport { - -namespace core { - -HicnForwarderModule::HicnForwarderModule() : IoModule(), connector_(nullptr) {} - -HicnForwarderModule::~HicnForwarderModule() {} - -void HicnForwarderModule::connect(bool is_consumer) { - connector_->connect(); - connector_->setRole(is_consumer ? Connector::Role::CONSUMER - : Connector::Role::PRODUCER); -} - -bool HicnForwarderModule::isConnected() { return connector_->isConnected(); } - -void HicnForwarderModule::send(Packet &packet) { - IoModule::send(packet); - packet.setChecksum(); - connector_->send(packet); -} - -void HicnForwarderModule::send(const uint8_t *packet, std::size_t len) { - counters_.tx_packets++; - counters_.tx_bytes += len; - - // Perfect forwarding - connector_->send(packet, len); -} - -void HicnForwarderModule::registerRoute(const Prefix &prefix) { - auto command = createCommandRoute(prefix.toSockaddr(), - (uint8_t)prefix.getPrefixLength()); - send((uint8_t *)&command, sizeof(RouteToSelfCommand)); -} - -void HicnForwarderModule::closeConnection() { - auto command = createCommandDeleteConnection(); - send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand)); - connector_->close(); -} - -void HicnForwarderModule::init( - Connector::PacketReceivedCallback &&receive_callback, - Connector::OnReconnectCallback &&reconnect_callback, - asio::io_service &io_service, const std::string &app_name) { - if (!connector_) { - connector_ = new UdpSocketConnector(std::move(receive_callback), nullptr, - nullptr, std::move(reconnect_callback), - io_service, app_name); - } -} - -void HicnForwarderModule::processControlMessageReply( - utils::MemBuf &packet_buffer) { - if (packet_buffer.data()[0] == nack_code) { - throw errors::RuntimeException( - "Received Nack message from hicn light forwarder."); - } -} - -std::uint32_t HicnForwarderModule::getMtu() { return interface_mtu; } - -bool HicnForwarderModule::isControlMessage(const uint8_t *message) { - return message[0] == ack_code || message[0] == nack_code; -} - -extern "C" IoModule *create_module(void) { return new HicnForwarderModule(); } - -} // namespace core - -} // namespace transport diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.cc b/libtransport/src/io_modules/udp/udp_socket_connector.cc deleted file mode 100644 index 1412d8c07..000000000 --- a/libtransport/src/io_modules/udp/udp_socket_connector.cc +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifdef _WIN32 -#include <hicn/transport/portability/win_portability.h> -#endif - -#include <glog/logging.h> -#include <hicn/transport/errors/errors.h> -#include <hicn/transport/utils/object_pool.h> -#include <io_modules/udp/udp_socket_connector.h> - -#include <thread> -#include <vector> - -namespace transport { - -namespace core { - -UdpSocketConnector::UdpSocketConnector( - PacketReceivedCallback &&receive_callback, PacketSentCallback &&packet_sent, - OnCloseCallback &&close_callback, OnReconnectCallback &&on_reconnect, - asio::io_service &io_service, std::string app_name) - : Connector(std::move(receive_callback), std::move(packet_sent), - std::move(close_callback), std::move(on_reconnect)), - io_service_(io_service), - socket_(io_service_), - resolver_(io_service_), - connection_timer_(io_service_), - read_msg_(std::make_pair(nullptr, 0)), - is_reconnection_(false), - data_available_(false), - app_name_(app_name) {} - -UdpSocketConnector::~UdpSocketConnector() {} - -void UdpSocketConnector::connect(std::string ip_address, std::string port) { - endpoint_iterator_ = resolver_.resolve( - {ip_address, port, asio::ip::resolver_query_base::numeric_service}); - - state_ = Connector::State::CONNECTING; - doConnect(); -} - -void UdpSocketConnector::send(const uint8_t *packet, std::size_t len) { - socket_.async_send(asio::buffer(packet, len), - [this](std::error_code ec, std::size_t /*length*/) { - if (sent_callback_) { - sent_callback_(this, ec); - } - }); -} - -void UdpSocketConnector::send(Packet &packet) { - io_service_.post([this, _packet{packet.shared_from_this()}]() { - bool write_in_progress = !output_buffer_.empty(); - output_buffer_.push_back(std::move(_packet)); - if (TRANSPORT_EXPECT_TRUE(state_ == Connector::State::CONNECTED)) { - if (!write_in_progress) { - doWrite(); - } - } else { - // Tell the handle connect it has data to write - data_available_ = true; - } - }); -} - -void UdpSocketConnector::close() { - if (io_service_.stopped()) { - doClose(); - } else { - io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this)); - } -} - -void UdpSocketConnector::doClose() { - if (state_ != Connector::State::CLOSED) { - state_ = Connector::State::CLOSED; - if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - } -} - -void UdpSocketConnector::doWrite() { - auto packet = output_buffer_.front().get(); - auto array = std::vector<asio::const_buffer>(); - - const utils::MemBuf *current = packet; - do { - array.push_back(asio::const_buffer(current->data(), current->length())); - current = current->next(); - } while (current != packet); - - socket_.async_send(std::move(array), [this](std::error_code ec, - std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - output_buffer_.pop_front(); - if (!output_buffer_.empty()) { - doWrite(); - } - } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - tryReconnect(); - } - }); -} - -void UdpSocketConnector::doRead() { - read_msg_ = getRawBuffer(); - socket_.async_receive( - asio::buffer(read_msg_.first, read_msg_.second), - [this](std::error_code ec, std::size_t length) { - if (TRANSPORT_EXPECT_TRUE(!ec)) { - auto packet = getPacketFromBuffer(read_msg_.first, length); - receive_callback_(this, *packet, std::make_error_code(std::errc(0))); - doRead(); - } else if (ec.value() == - static_cast<int>(std::errc::operation_canceled)) { - // The connection has been closed by the application. - return; - } else { - LOG(ERROR) << ec.value() << " " << ec.message(); - tryReconnect(); - } - }); -} - -void UdpSocketConnector::tryReconnect() { - if (state_ == Connector::State::CONNECTED) { - LOG(ERROR) << "Connection lost. Trying to reconnect..."; - state_ = Connector::State::CONNECTING; - is_reconnection_ = true; - io_service_.post([this]() { - if (socket_.is_open()) { - socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); - socket_.close(); - } - - doConnect(); - startConnectionTimer(); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - }); - } -} - -void UdpSocketConnector::doConnect() { - asio::async_connect( - socket_, endpoint_iterator_, - [this](std::error_code ec, udp::resolver::iterator) { - if (!ec) { - connection_timer_.cancel(); - state_ = Connector::State::CONNECTED; - doRead(); - - if (data_available_) { - data_available_ = false; - doWrite(); - } - - if (is_reconnection_) { - is_reconnection_ = false; - } - - on_reconnect_callback_(this); - } else { - doConnect(); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - }); -} - -bool UdpSocketConnector::checkConnected() { - return state_ == Connector::State::CONNECTED; -} - -void UdpSocketConnector::startConnectionTimer() { - connection_timer_.expires_from_now(std::chrono::seconds(60)); - connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, - this, std::placeholders::_1)); -} - -void UdpSocketConnector::handleDeadline(const std::error_code &ec) { - if (!ec) { - io_service_.post([this]() { - socket_.close(); - LOG(ERROR) << "Error connecting. Is the forwarder running?"; - }); - } -} - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/io_modules/udp/udp_socket_connector.h b/libtransport/src/io_modules/udp/udp_socket_connector.h deleted file mode 100644 index c483e14aa..000000000 --- a/libtransport/src/io_modules/udp/udp_socket_connector.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/config.h> -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/connector.h> -#include <hicn/transport/core/content_object.h> -#include <hicn/transport/core/global_object_pool.h> -#include <hicn/transport/core/interest.h> -#include <hicn/transport/core/name.h> -#include <hicn/transport/core/packet.h> -#include <hicn/transport/utils/branch_prediction.h> - -#include <deque> - -namespace transport { -namespace core { - -using asio::ip::udp; - -class UdpSocketConnector : public Connector { - public: - UdpSocketConnector(PacketReceivedCallback &&receive_callback, - PacketSentCallback &&packet_sent, - OnCloseCallback &&close_callback, - OnReconnectCallback &&on_reconnect, - asio::io_service &io_service, - std::string app_name = "Libtransport"); - - ~UdpSocketConnector() override; - - void send(Packet &packet) override; - - void send(const uint8_t *packet, std::size_t len) override; - - void close() override; - - void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); - - private: - void doConnect(); - - void doRead(); - - void doWrite(); - - void doClose(); - - bool checkConnected(); - - private: - void handleDeadline(const std::error_code &ec); - - void startConnectionTimer(); - - void tryReconnect(); - - asio::io_service &io_service_; - asio::ip::udp::socket socket_; - asio::ip::udp::resolver resolver_; - asio::ip::udp::resolver::iterator endpoint_iterator_; - asio::steady_timer connection_timer_; - - std::pair<uint8_t *, std::size_t> read_msg_; - - bool is_reconnection_; - bool data_available_; - - std::string app_name_; -}; - -} // end namespace core - -} // end namespace transport |