diff options
Diffstat (limited to 'libtransport/src/core')
39 files changed, 6781 insertions, 0 deletions
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt new file mode 100644 index 000000000..12ef9cfe4 --- /dev/null +++ b/libtransport/src/core/CMakeLists.txt @@ -0,0 +1,76 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/facade.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format.h + ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.h + ${CMAKE_CURRENT_SOURCE_DIR}/portal.h + ${CMAKE_CURRENT_SOURCE_DIR}/connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/content_object.cc + ${CMAKE_CURRENT_SOURCE_DIR}/interest.cc + ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.cc + ${CMAKE_CURRENT_SOURCE_DIR}/packet.cc + ${CMAKE_CURRENT_SOURCE_DIR}/name.cc + ${CMAKE_CURRENT_SOURCE_DIR}/prefix.cc + ${CMAKE_CURRENT_SOURCE_DIR}/tcp_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_forwarder_interface.cc + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.cc + ${CMAKE_CURRENT_SOURCE_DIR}/connector.cc +) + +if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") + if (BUILD_WITH_VPP OR BUILD_HICNPLUGIN) + list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.h + ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.h + ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.h + ) + + list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/vpp_forwarder_interface.cc + ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/hicn_vapi.c + ${CMAKE_CURRENT_SOURCE_DIR}/memif_vapi.c + ) + endif() + + list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_interface.h + ) + + list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/raw_socket_interface.cc + ) +endif() + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
\ No newline at end of file diff --git a/libtransport/src/core/connector.cc b/libtransport/src/core/connector.cc new file mode 100644 index 000000000..63919537d --- /dev/null +++ b/libtransport/src/core/connector.cc @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/connector.h> + +namespace transport { + +namespace core { + +std::once_flag Connector::init_flag_; + +Connector::Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback) + : packet_pool_(), + receive_callback_(std::move(receive_callback)), + on_reconnect_callback_(std::move(reconnect_callback)), + state_(ConnectorState::CLOSED) { + init(); +} + +void Connector::init() { increasePoolSize(); } + +void Connector::increasePoolSize(std::size_t size) { + // Allocate space for receiving packets + const auto capacity = packet_size * size; + uint8_t *buffer = static_cast<uint8_t *>(malloc(capacity)); + std::unique_ptr<utils::MemBuf> buffer0 = + utils::MemBuf::takeOwnership(buffer, capacity, 0, nullptr, nullptr, true); + + for (std::size_t i = 1; i < size; i++) { + auto b = buffer0->cloneOne(); + b->advance(i * packet_size); + packet_pool_.add(b.release()); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/connector.h b/libtransport/src/core/connector.h new file mode 100644 index 000000000..f2bbe5dcd --- /dev/null +++ b/libtransport/src/core/connector.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/packet.h> +#include <hicn/transport/utils/membuf.h> +#include <hicn/transport/utils/object_pool.h> +#include <hicn/transport/utils/ring_buffer.h> + +#include <deque> +#include <functional> + +namespace transport { + +namespace core { + +enum class ConnectorType : uint8_t { + SOCKET_CONNECTOR, + RAW_SOCKET_CONNECTOR, + VPP_CONNECTOR, +}; + +class Connector { + protected: + enum class ConnectorState { + CLOSED, + CONNECTING, + CONNECTED, + }; + + public: + static constexpr std::size_t packet_size = 2048; + static constexpr std::size_t queue_size = 4096; + static constexpr std::size_t packet_pool_size = 4096; + + using PacketRing = utils::CircularFifo<Packet::MemBufPtr, queue_size>; + using PacketQueue = std::deque<Packet::MemBufPtr>; + using PacketReceivedCallback = std::function<void(Packet::MemBufPtr &&)>; + using OnReconnect = std::function<void()>; + using PacketSentCallback = std::function<void()>; + + Connector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback); + + virtual ~Connector(){}; + + virtual void send(const Packet::MemBufPtr &packet) = 0; + + virtual void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) = 0; + + virtual void close() = 0; + + virtual ConnectorState state() { return state_; }; + + virtual bool isConnected() { return state_ == ConnectorState::CONNECTED; } + + protected: + void increasePoolSize(std::size_t size = packet_pool_size); + + TRANSPORT_ALWAYS_INLINE utils::ObjectPool<utils::MemBuf>::Ptr getPacket() { + auto result = packet_pool_.get(); + + while (TRANSPORT_EXPECT_FALSE(!result.first)) { + // Add packets to the pool + increasePoolSize(); + result = packet_pool_.get(); + } + + if (result.second->isChained()) { + result.second->separateChain(result.second->next(), + result.second->prev()); + } + + result.second->trimEnd(result.second->length()); + return std::move(result.second); + } + + private: + void init(); + + protected: + static std::once_flag init_flag_; + utils::ObjectPool<utils::MemBuf> packet_pool_; + PacketQueue output_buffer_; + + // Connector events + PacketReceivedCallback receive_callback_; + OnReconnect on_reconnect_callback_; + + // Connector state + ConnectorState state_; +}; +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc new file mode 100644 index 000000000..6cbcdb29e --- /dev/null +++ b/libtransport/src/core/content_object.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/branch_prediction.h> + +extern "C" { +#ifndef _WIN32 +TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat") +#endif +#include <hicn/hicn.h> +#include <hicn/util/ip_address.h> +} + +#include <cstring> +#include <memory> + +namespace transport { + +namespace core { + +ContentObject::ContentObject(const Name &name, Packet::Format format) + : Packet(format) { + if (TRANSPORT_EXPECT_FALSE( + hicn_data_set_name(format, packet_start_, &name.name_) < 0)) { + throw errors::RuntimeException("Error filling the packet name."); + } + + if (TRANSPORT_EXPECT_FALSE(hicn_data_get_name(format_, packet_start_, + name_.getStructReference()) < + 0)) { + throw errors::MalformedPacketException(); + } +} + +#ifdef __ANDROID__ +ContentObject::ContentObject(hicn_format_t format) + : ContentObject(Name("0::0|0"), format) {} +#else +ContentObject::ContentObject(hicn_format_t format) + : ContentObject(Packet::base_name, format) {} +#endif + +ContentObject::ContentObject(const Name &name, hicn_format_t format, + const uint8_t *payload, std::size_t size) + : ContentObject(name, format) { + appendPayload(payload, size); +} + +ContentObject::ContentObject(const uint8_t *buffer, std::size_t size) + : Packet(buffer, size) { + if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < + 0) { + throw errors::RuntimeException("Error getting name from content object."); + } +} + +ContentObject::ContentObject(MemBufPtr &&buffer) : Packet(std::move(buffer)) { + if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < + 0) { + throw errors::RuntimeException("Error getting name from content object."); + } +} + +ContentObject::ContentObject(ContentObject &&other) : Packet(std::move(other)) { + name_ = std::move(other.name_); + + if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < + 0) { + throw errors::MalformedPacketException(); + } +} + +ContentObject::~ContentObject() {} + +void ContentObject::replace(MemBufPtr &&buffer) { + Packet::replace(std::move(buffer)); + + if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < + 0) { + throw errors::RuntimeException("Error getting name from content object."); + } +} + +const Name &ContentObject::getName() const { + if (!name_) { + if (hicn_data_get_name(format_, packet_start_, + (hicn_name_t *)name_.getConstStructReference()) < + 0) { + throw errors::MalformedPacketException(); + } + } + + return name_; +} + +Name &ContentObject::getWritableName() { return const_cast<Name &>(getName()); } + +void ContentObject::setName(const Name &name) { + if (hicn_data_set_name(format_, packet_start_, + name.getConstStructReference()) < 0) { + throw errors::RuntimeException("Error setting content object name."); + } + + if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < + 0) { + throw errors::MalformedPacketException(); + } +} + +void ContentObject::setName(Name &&name) { + if (hicn_data_set_name(format_, packet_start_, name.getStructReference()) < + 0) { + throw errors::RuntimeException( + "Error getting the payload length from content object."); + } + + if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < + 0) { + throw errors::MalformedPacketException(); + } +} + +uint32_t ContentObject::getPathLabel() const { + uint32_t path_label; + if (hicn_data_get_path_label(packet_start_, &path_label) < 0) { + throw errors::RuntimeException( + "Error retrieving the path label from content object"); + } + + return path_label; +} + +ContentObject &ContentObject::setPathLabel(uint32_t path_label) { + if (hicn_data_set_path_label((hicn_header_t *)packet_start_, path_label) < + 0) { + throw errors::RuntimeException( + "Error setting the path label from content object"); + } + + return *this; +} + +void ContentObject::setLocator(const ip_address_t &ip_address) { + if (hicn_data_set_locator(format_, packet_start_, &ip_address) < 0) { + throw errors::RuntimeException("Error setting content object locator"); + } + + return; +} + +ip_address_t ContentObject::getLocator() const { + ip_address_t ip; + + if (hicn_data_get_locator(format_, packet_start_, &ip) < 0) { + throw errors::RuntimeException("Error getting content object locator."); + } + + return ip; +} + +void ContentObject::setLifetime(uint32_t lifetime) { + if (hicn_data_set_expiry_time(packet_start_, lifetime) < 0) { + throw errors::MalformedPacketException(); + } +} + +uint32_t ContentObject::getLifetime() const { + uint32_t lifetime = 0; + + if (hicn_data_get_expiry_time(packet_start_, &lifetime) < 0) { + throw errors::MalformedPacketException(); + } + + return lifetime; +} + +void ContentObject::resetForHash() { + if (hicn_data_reset_for_hash( + format_, reinterpret_cast<hicn_header_t *>(packet_start_)) < 0) { + throw errors::RuntimeException( + "Error resetting content object fields for hash computation."); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/facade.h b/libtransport/src/core/facade.h new file mode 100644 index 000000000..04f643f63 --- /dev/null +++ b/libtransport/src/core/facade.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/forwarder_interface.h> +#include <core/hicn_forwarder_interface.h> +#include <core/manifest_format_fixed.h> +#include <core/manifest_inline.h> +#include <core/portal.h> + +#ifdef __linux__ +#ifndef __ANDROID__ +#include <core/raw_socket_interface.h> +#ifdef __vpp__ +#include <core/vpp_forwarder_interface.h> +#endif +#endif +#endif + +namespace transport { + +namespace core { + +using HicnForwarderPortal = Portal<HicnForwarderInterface>; + +#ifdef __linux__ +#ifndef __ANDROID__ +using RawSocketPortal = Portal<RawSocketInterface>; +#endif +#ifdef __vpp__ +using VPPForwarderPortal = Portal<VPPForwarderInterface>; +#endif +#endif + +using ContentObjectManifest = core::ManifestInline<ContentObject, Fixed>; +using InterestManifest = core::ManifestInline<Interest, Fixed>; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/forwarder_interface.h b/libtransport/src/core/forwarder_interface.h new file mode 100644 index 000000000..3e70e221d --- /dev/null +++ b/libtransport/src/core/forwarder_interface.h @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/udp_socket_connector.h> +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/chrono_typedefs.h> + +#include <deque> + +namespace transport { + +namespace core { + +typedef struct { + uint64_t rx_packets; + uint64_t tx_packets; + uint64_t rx_bytes; + uint64_t tx_bytes; + uint64_t rx_errors; + uint64_t tx_errors; +} Counters; + +template <typename Implementation, typename ConnectorType> +class ForwarderInterface { + static_assert(std::is_base_of<Connector, ConnectorType>::value, + "T must inherit from connector!"); + + static constexpr uint32_t standard_cs_reserved = 5000; + + protected: + ForwarderInterface(ConnectorType &c) + : connector_(c), + inet_address_({}), + inet6_address_({}), + mtu_(1500), + output_interface_(""), + content_store_reserved_(standard_cs_reserved) {} + + public: + virtual ~ForwarderInterface() {} + + TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { + static_cast<Implementation &>(*this).connect(is_consumer); + } + + TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { + static_cast<Implementation &>(*this).registerRoute(); + } + + TRANSPORT_ALWAYS_INLINE std::uint32_t getMtu() { + return static_cast<Implementation &>(*this).getMtu(); + } + + TRANSPORT_ALWAYS_INLINE static bool isControlMessage(const uint8_t *message) { + return Implementation::isControlMessageImpl(message); + } + + template <typename R> + TRANSPORT_ALWAYS_INLINE void processControlMessageReply(R &&packet_buffer) { + return static_cast<Implementation &>(*this).processControlMessageReplyImpl( + std::forward<R &&>(packet_buffer)); + } + + TRANSPORT_ALWAYS_INLINE void closeConnection() { + return static_cast<Implementation &>(*this).closeConnection(); + } + + template < + typename R, + typename = std::enable_if_t< + std::is_base_of<Packet, typename std::remove_reference_t<R>>::value, + R>> + TRANSPORT_ALWAYS_INLINE void send(R &&packet) { + counters_.tx_packets++; + counters_.tx_bytes += packet.payloadSize() + packet.headerSize(); + + if (_is_ipv4(packet.getFormat())) { + packet.setLocator(inet_address_); + } else { + packet.setLocator(inet6_address_); + } + + // TRANSPORT_LOGI("Sending packet %s at %lu", + // packet.getName().toString().c_str(), + // utils::SteadyClock::now().time_since_epoch().count()); + packet.setChecksum(); + connector_.send(packet.acquireMemBufReference()); + } + + TRANSPORT_ALWAYS_INLINE void send(const uint8_t *packet, std::size_t len) { + // ASIO_COMPLETION_HANDLER_CHECK(Handler, packet_sent) type_check; + counters_.tx_packets++; + counters_.tx_bytes += len; + + // Perfect forwarding + connector_.send(packet, len); + } + + TRANSPORT_ALWAYS_INLINE void shutdown() { connector_.close(); } + + TRANSPORT_ALWAYS_INLINE Connector &getConnector() { return connector_; } + + TRANSPORT_ALWAYS_INLINE void setContentStoreSize(uint32_t cs_size) { + content_store_reserved_ = cs_size; + } + + TRANSPORT_ALWAYS_INLINE uint32_t getContentStoreSize() const { + return content_store_reserved_; + } + + TRANSPORT_ALWAYS_INLINE void setOutputInterface( + const std::string &interface) { + output_interface_ = interface; + } + + TRANSPORT_ALWAYS_INLINE std::string &getOutputInterface() { + return output_interface_; + } + + protected: + ConnectorType &connector_; + ip_address_t inet_address_; + ip_address_t inet6_address_; + uint16_t mtu_; + std::string output_interface_; + uint32_t content_store_reserved_; + Counters counters_; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/hicn_forwarder_interface.cc b/libtransport/src/core/hicn_forwarder_interface.cc new file mode 100644 index 000000000..810daba3a --- /dev/null +++ b/libtransport/src/core/hicn_forwarder_interface.cc @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/hicn_forwarder_interface.h> + +union AddressLight { + uint32_t ipv4; + struct in6_addr ipv6; +}; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; +} CommandHeader; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; + union AddressLight address; + uint16_t cost; + uint8_t address_type; + uint8_t len; +} RouteToSelfCommand; + +typedef struct { + uint8_t message_type; + uint8_t command_id; + uint16_t length; + uint32_t seq_num; + char symbolic_or_connid[16]; +} DeleteSelfConnectionCommand; + +namespace { +static constexpr uint8_t addr_inet = 1; +static constexpr uint8_t addr_inet6 = 2; +static constexpr uint8_t add_route_command = 3; +static constexpr uint8_t delete_connection_command = 5; +static constexpr uint8_t request_light = 0xc0; +static constexpr char identifier[] = "SELF"; + +void fillCommandHeader(CommandHeader *header) { + // Allocate and fill the header + header->message_type = request_light; + header->length = 1; +} + +RouteToSelfCommand createCommandRoute(std::unique_ptr<sockaddr> &&addr, + uint8_t prefix_length) { + RouteToSelfCommand command = {0}; + + // check and set IP address + if (addr->sa_family == AF_INET) { + command.address_type = addr_inet; + command.address.ipv4 = ((sockaddr_in *)addr.get())->sin_addr.s_addr; + } else if (addr->sa_family == AF_INET6) { + command.address_type = addr_inet6; + command.address.ipv6 = ((sockaddr_in6 *)addr.get())->sin6_addr; + } + + // Fill remaining payload fields +#ifndef _WIN32 + strcpy(command.symbolic_or_connid, identifier); +#else + strcpy_s(command.symbolic_or_connid, 16, identifier); +#endif + command.cost = 1; + command.len = (uint8_t)prefix_length; + + // Allocate and fill the header + command.command_id = add_route_command; + fillCommandHeader((CommandHeader *)&command); + + return command; +} + +DeleteSelfConnectionCommand createCommandDeleteConnection() { + DeleteSelfConnectionCommand command = {0}; + fillCommandHeader((CommandHeader *)&command); + command.command_id = delete_connection_command; + +#ifndef _WIN32 + strcpy(command.symbolic_or_connid, identifier); +#else + strcpy_s(command.symbolic_or_connid, 16, identifier); +#endif + + return command; +} + +} // namespace + +namespace transport { + +namespace core { + +HicnForwarderInterface::HicnForwarderInterface(UdpSocketConnector &connector) + : ForwarderInterface<HicnForwarderInterface, UdpSocketConnector>( + connector) {} + +HicnForwarderInterface::~HicnForwarderInterface() {} + +void HicnForwarderInterface::connect(bool is_consumer) { connector_.connect(); } + +void HicnForwarderInterface::registerRoute(Prefix &prefix) { + auto command = createCommandRoute(prefix.toSockaddr(), + (uint8_t)prefix.getPrefixLength()); + send((uint8_t *)&command, sizeof(RouteToSelfCommand)); +} + +void HicnForwarderInterface::closeConnection() { + auto command = createCommandDeleteConnection(); + send((uint8_t *)&command, sizeof(DeleteSelfConnectionCommand)); + connector_.close(); +} + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/hicn_forwarder_interface.h b/libtransport/src/core/hicn_forwarder_interface.h new file mode 100644 index 000000000..6969f4a6b --- /dev/null +++ b/libtransport/src/core/hicn_forwarder_interface.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/prefix.h> + +#include <core/forwarder_interface.h> +#include <core/udp_socket_connector.h> + +#include <deque> + +namespace transport { + +namespace core { + +class HicnForwarderInterface + : public ForwarderInterface<HicnForwarderInterface, UdpSocketConnector> { + static constexpr uint8_t ack_code = 0xc2; + static constexpr uint8_t nack_code = 0xc3; + + public: + union addressLight { + uint32_t ipv4; + struct in6_addr ipv6; + }; + + struct route_to_self_command { + uint8_t messageType; + uint8_t commandID; + uint16_t length; + uint32_t seqNum; + char symbolicOrConnid[16]; + union addressLight address; + uint16_t cost; + uint8_t addressType; + uint8_t len; + }; + + using route_to_self_command = struct route_to_self_command; + using ConnectorType = UdpSocketConnector; + + HicnForwarderInterface(UdpSocketConnector &connector); + + ~HicnForwarderInterface(); + + void connect(bool is_consumer); + + void registerRoute(Prefix &prefix); + + std::uint16_t getMtu() { return interface_mtu; } + + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return message[0] == ack_code || message[0] == nack_code; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) { + if (packet_buffer->data()[0] == nack_code) { + throw errors::RuntimeException( + "Received Nack message from hicn light forwarder."); + } + } + + void closeConnection(); + + private: + static constexpr std::uint16_t interface_mtu = 1500; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/hicn_vapi.c b/libtransport/src/core/hicn_vapi.c new file mode 100644 index 000000000..d19e36346 --- /dev/null +++ b/libtransport/src/core/hicn_vapi.c @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/config.h> + +#ifdef __vpp__ + +#include <hicn/transport/utils/log.h> + +#include <core/hicn_vapi.h> + +#define HICN_VPP_PLUGIN +#include <hicn/name.h> +#undef HICN_VPP_PLUGIN + +#include <vapi/vapi_safe.h> +#include <vlib/vlib.h> +#include <vlibapi/api.h> +#include <vlibmemory/api.h> +#include <vppinfra/error.h> + +#include <vnet/fib/fib_types.h> +#include <vnet/ip/format.h> +#include <vnet/ip/ip4_packet.h> +#include <vnet/ip/ip6_packet.h> + +#include <vapi/hicn.api.vapi.h> +#include <vpp_plugins/hicn/error.h> + +///////////////////////////////////////////////////// +const char *HICN_ERROR_STRING[] = { +#define _(a, b, c) c, + foreach_hicn_error +#undef _ +}; +///////////////////////////////////////////////////// + +/*********************** Missing Symbol in vpp libraries + * *************************/ +u8 *format_vl_api_address_union(u8 *s, va_list *args) { return NULL; } + +/*********************************************************************************/ + +DEFINE_VAPI_MSG_IDS_HICN_API_JSON + +static vapi_error_e register_prod_app_cb( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_register_prod_app_reply *reply) { + hicn_producer_output_params *output_params = + (hicn_producer_output_params *)callback_ctx; + + if (reply == NULL) return rv; + + output_params->cs_reserved = reply->cs_reserved; + output_params->prod_addr = (ip_address_t *)malloc(sizeof(ip_address_t)); + memset(output_params->prod_addr, 0, sizeof(ip_address_t)); + if (reply->prod_addr.af == ADDRESS_IP6) + memcpy(&output_params->prod_addr->v6, reply->prod_addr.un.ip6, + sizeof(ip6_address_t)); + else + memcpy(&output_params->prod_addr->v4, reply->prod_addr.un.ip4, + sizeof(ip4_address_t)); + output_params->face_id = reply->faceid; + + return reply->retval; +} + +int hicn_vapi_register_prod_app(vapi_ctx_t ctx, + hicn_producer_input_params *input_params, + hicn_producer_output_params *output_params) { + vapi_lock(); + vapi_msg_hicn_api_register_prod_app *msg = + vapi_alloc_hicn_api_register_prod_app(ctx); + + if (ip46_address_is_ip4((ip46_address_t *)&input_params->prefix->address)) { + memcpy(&msg->payload.prefix.address.un.ip4, &input_params->prefix->address, + sizeof(ip4_address_t)); + msg->payload.prefix.address.af = ADDRESS_IP4; + } else { + memcpy(&msg->payload.prefix.address.un.ip6, &input_params->prefix->address, + sizeof(ip6_address_t)); + msg->payload.prefix.address.af = ADDRESS_IP6; + } + msg->payload.prefix.len = input_params->prefix->len; + + msg->payload.swif = input_params->swif; + msg->payload.cs_reserved = input_params->cs_reserved; + + int ret = vapi_hicn_api_register_prod_app(ctx, msg, register_prod_app_cb, + output_params); + vapi_unlock(); + return ret; +} + +static vapi_error_e face_prod_del_cb( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_face_prod_del_reply *reply) { + if (reply == NULL) return rv; + + return reply->retval; +} + +int hicn_vapi_face_prod_del(vapi_ctx_t ctx, + hicn_del_face_app_input_params *input_params) { + vapi_lock(); + vapi_msg_hicn_api_face_prod_del *msg = vapi_alloc_hicn_api_face_prod_del(ctx); + + msg->payload.faceid = input_params->face_id; + + int ret = vapi_hicn_api_face_prod_del(ctx, msg, face_prod_del_cb, NULL); + vapi_unlock(); + return ret; +} + +static vapi_error_e register_cons_app_cb( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_register_cons_app_reply *reply) { + hicn_consumer_output_params *output_params = + (hicn_consumer_output_params *)callback_ctx; + + if (reply == NULL) return rv; + + output_params->src6 = (ip_address_t *)malloc(sizeof(ip_address_t)); + output_params->src4 = (ip_address_t *)malloc(sizeof(ip_address_t)); + memset(output_params->src6, 0, sizeof(ip_address_t)); + memset(output_params->src4, 0, sizeof(ip_address_t)); + memcpy(&output_params->src6->v6, &reply->src_addr6.un.ip6, + sizeof(ip6_address_t)); + memcpy(&output_params->src4->v4, &reply->src_addr4.un.ip4, + sizeof(ip4_address_t)); + + output_params->face_id1 = reply->faceid1; + output_params->face_id2 = reply->faceid2; + + return reply->retval; +} + +int hicn_vapi_register_cons_app(vapi_ctx_t ctx, + hicn_consumer_input_params *input_params, + hicn_consumer_output_params *output_params) { + vapi_lock(); + vapi_msg_hicn_api_register_cons_app *msg = + vapi_alloc_hicn_api_register_cons_app(ctx); + + msg->payload.swif = input_params->swif; + + int ret = vapi_hicn_api_register_cons_app(ctx, msg, register_cons_app_cb, + output_params); + vapi_unlock(); + return ret; +} + +static vapi_error_e face_cons_del_cb( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_face_cons_del_reply *reply) { + if (reply == NULL) return rv; + + return reply->retval; +} + +int hicn_vapi_face_cons_del(vapi_ctx_t ctx, + hicn_del_face_app_input_params *input_params) { + vapi_lock(); + vapi_msg_hicn_api_face_cons_del *msg = vapi_alloc_hicn_api_face_cons_del(ctx); + + msg->payload.faceid = input_params->face_id; + + int ret = vapi_hicn_api_face_cons_del(ctx, msg, face_cons_del_cb, NULL); + vapi_unlock(); + return ret; +} + +static vapi_error_e reigster_route_cb( + vapi_ctx_t ctx, void *callback_ctx, vapi_error_e rv, bool is_last, + vapi_payload_hicn_api_route_nhops_add_reply *reply) { + if (reply == NULL) return rv; + + return reply->retval; +} + +int hicn_vapi_register_route(vapi_ctx_t ctx, + hicn_producer_set_route_params *input_params) { + vapi_lock(); + vapi_msg_hicn_api_route_nhops_add *msg = + vapi_alloc_hicn_api_route_nhops_add(ctx); + + fib_prefix_t prefix; + memcpy(&prefix.fp_addr, &input_params->prefix->address, + sizeof(ip46_address_t)); + prefix.fp_len = input_params->prefix->len; + msg->payload.face_ids[0] = input_params->face_id; + msg->payload.n_faces = 1; + + int ret = vapi_hicn_api_route_nhops_add(ctx, msg, reigster_route_cb, NULL); + + vapi_unlock(); + return ret; +} + +char *hicn_vapi_get_error_string(int ret_val) { + return get_error_string(ret_val); +} + +#endif // __vpp__ diff --git a/libtransport/src/core/hicn_vapi.h b/libtransport/src/core/hicn_vapi.h new file mode 100644 index 000000000..f2718e6f5 --- /dev/null +++ b/libtransport/src/core/hicn_vapi.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/util/ip_address.h> + +#ifdef __vpp__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <vapi/vapi.h> + +#include "stdint.h" + +typedef struct { + ip_prefix_t* prefix; + uint32_t swif; + uint32_t cs_reserved; +} hicn_producer_input_params; + +typedef struct { + uint32_t swif; +} hicn_consumer_input_params; + +typedef struct { + uint32_t face_id; +} hicn_del_face_app_input_params; + +typedef struct { + uint32_t cs_reserved; + ip_address_t* prod_addr; + uint32_t face_id; +} hicn_producer_output_params; + +typedef struct { + ip_address_t* src4; + ip_address_t* src6; + uint32_t face_id1; + uint32_t face_id2; +} hicn_consumer_output_params; + +typedef struct { + ip_prefix_t* prefix; + uint32_t face_id; +} hicn_producer_set_route_params; + +int hicn_vapi_register_prod_app( + vapi_ctx_t ctx, hicn_producer_input_params* input_params, + hicn_producer_output_params* output_params); + +int hicn_vapi_register_cons_app( + vapi_ctx_t ctx, hicn_consumer_input_params* input_params, + hicn_consumer_output_params* output_params); + +int hicn_vapi_register_route( + vapi_ctx_t ctx, hicn_producer_set_route_params* input_params); + +int hicn_vapi_face_cons_del( + vapi_ctx_t ctx, hicn_del_face_app_input_params *input_params); + +int hicn_vapi_face_prod_del( + vapi_ctx_t ctx, hicn_del_face_app_input_params *input_params); + +char* hicn_vapi_get_error_string(int ret_val); + +#ifdef __cplusplus +} +#endif + +#endif // __vpp__ diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc new file mode 100644 index 000000000..166632f0a --- /dev/null +++ b/libtransport/src/core/interest.cc @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/interest.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/hash.h> + +extern "C" { +#ifndef _WIN32 +TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat") +#endif +#include <hicn/hicn.h> +} + +#include <cstring> +#include <memory> + +namespace transport { + +namespace core { + +Interest::Interest(const Name &interest_name, Packet::Format format) + : Packet(format) { + if (hicn_interest_set_name(format_, packet_start_, + interest_name.getConstStructReference()) < 0) { + throw errors::MalformedPacketException(); + } + + if (hicn_interest_get_name(format_, packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + +#ifdef __ANDROID__ +Interest::Interest(hicn_format_t format) : Interest(Name("0::0|0"), format) {} +#else +Interest::Interest(hicn_format_t format) : Interest(base_name, format) {} +#endif + +Interest::Interest(const uint8_t *buffer, std::size_t size) + : Packet(buffer, size) { + if (hicn_interest_get_name(format_, packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + +Interest::Interest(MemBufPtr &&buffer) : Packet(std::move(buffer)) { + if (hicn_interest_get_name(format_, packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + +Interest::Interest(Interest &&other_interest) + : Packet(std::move(other_interest)) { + name_ = std::move(other_interest.name_); +} + +Interest::~Interest() {} + +void Interest::replace(MemBufPtr &&buffer) { + Packet::replace(std::move(buffer)); + + if (hicn_interest_get_name(format_, packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + +const Name &Interest::getName() const { + if (!name_) { + if (hicn_interest_get_name(format_, packet_start_, + (hicn_name_t *)name_.getConstStructReference()) < + 0) { + throw errors::MalformedPacketException(); + } + } + + return name_; +} + +Name &Interest::getWritableName() { return const_cast<Name &>(getName()); } + +void Interest::setName(const Name &name) { + if (hicn_interest_set_name(format_, packet_start_, + name.getConstStructReference()) < 0) { + throw errors::RuntimeException("Error setting interest name."); + } + + if (hicn_interest_get_name(format_, packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + +void Interest::setName(Name &&name) { + if (hicn_interest_set_name(format_, packet_start_, + name.getStructReference()) < 0) { + throw errors::RuntimeException("Error setting interest name."); + } + + if (hicn_interest_get_name(format_, packet_start_, + name_.getStructReference()) < 0) { + throw errors::MalformedPacketException(); + } +} + +void Interest::setLocator(const ip_address_t &ip_address) { + if (hicn_interest_set_locator(format_, packet_start_, &ip_address) < 0) { + throw errors::RuntimeException("Error setting interest locator."); + } + + return; +} + +ip_address_t Interest::getLocator() const { + ip_address_t ip; + + if (hicn_interest_get_locator(format_, packet_start_, &ip) < 0) { + throw errors::RuntimeException("Error getting interest locator."); + } + + return ip; +} + +void Interest::setLifetime(uint32_t lifetime) { + if (hicn_interest_set_lifetime(packet_start_, lifetime) < 0) { + throw errors::MalformedPacketException(); + } +} + +uint32_t Interest::getLifetime() const { + uint32_t lifetime = 0; + + if (hicn_interest_get_lifetime(packet_start_, &lifetime) < 0) { + throw errors::MalformedPacketException(); + } + + return lifetime; +} + +void Interest::resetForHash() { + if (hicn_interest_reset_for_hash( + format_, reinterpret_cast<hicn_header_t *>(packet_start_)) < 0) { + throw errors::RuntimeException( + "Error resetting interest fields for hash computation."); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/manifest.cc b/libtransport/src/core/manifest.cc new file mode 100644 index 000000000..3f890f3d0 --- /dev/null +++ b/libtransport/src/core/manifest.cc @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/manifest.h> + +namespace transport { + +namespace core { + +std::string ManifestEncoding::manifest_type = std::string("manifest_type"); + +std::map<ManifestType, std::string> ManifestEncoding::manifest_types = { + {FINAL_CHUNK_NUMBER, "FinalChunkNumber"}, {NAME_LIST, "NameList"}}; + +std::string ManifestEncoding::final_chunk_number = + std::string("final_chunk_number"); +std::string ManifestEncoding::content_name = std::string("content_name"); + +} // end namespace core + +} // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/manifest.h b/libtransport/src/core/manifest.h new file mode 100644 index 000000000..b4875bf22 --- /dev/null +++ b/libtransport/src/core/manifest.h @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/name.h> + +#include <core/manifest_format.h> + +#include <set> + +namespace transport { + +namespace core { + +using typename core::Name; +using typename core::Packet; +using typename core::PayloadType; + +template <typename Base, typename FormatTraits, typename ManifestImpl> +class Manifest : public Base { + static_assert(std::is_base_of<Packet, Base>::value, + "Base must inherit from packet!"); + + public: + using Encoder = typename FormatTraits::Encoder; + using Decoder = typename FormatTraits::Decoder; + + Manifest(std::size_t signature_size = 0) + : Base(HF_INET6_TCP_AH), + encoder_(*this, signature_size), + decoder_(*this) { + Base::setPayloadType(PayloadType::MANIFEST); + } + + Manifest(const core::Name &name, std::size_t signature_size = 0) + : Base(name, HF_INET6_TCP_AH), + encoder_(*this, signature_size), + decoder_(*this) { + Base::setPayloadType(PayloadType::MANIFEST); + } + + template <typename T> + Manifest(T &&base) + : Base(std::forward<T &&>(base)), encoder_(*this), decoder_(*this) { + Base::setPayloadType(PayloadType::MANIFEST); + } + + virtual ~Manifest() = default; + + std::size_t estimateManifestSize(std::size_t additional_entries = 0) { + return static_cast<ManifestImpl &>(*this).estimateManifestSizeImpl( + additional_entries); + } + + /* + * After the call to encode, users MUST call clear before adding data + * to the manifest. + */ + Manifest &encode() { return static_cast<ManifestImpl &>(*this).encodeImpl(); } + + Manifest &decode() { + Manifest::decoder_.decode(); + + manifest_type_ = decoder_.getManifestType(); + hash_algorithm_ = decoder_.getHashAlgorithm(); + is_last_ = decoder_.getIsFinalManifest(); + + return static_cast<ManifestImpl &>(*this).decodeImpl(); + } + + static std::size_t getManifestHeaderSize() { + return Encoder::getManifestHeaderSize(); + } + + static std::size_t getManifestEntrySize() { + return Encoder::getManifestEntrySize(); + } + + Manifest &setManifestType(ManifestType type) { + manifest_type_ = type; + encoder_.setManifestType(manifest_type_); + return *this; + } + + Manifest &setHashAlgorithm(HashAlgorithm hash_algorithm) { + hash_algorithm_ = hash_algorithm; + encoder_.setHashAlgorithm(hash_algorithm_); + return *this; + } + + HashAlgorithm getHashAlgorithm() { return hash_algorithm_; } + + ManifestType getManifestType() const { return manifest_type_; } + + bool isFinalManifest() const { return is_last_; } + + Manifest &setVersion(ManifestVersion version) { + encoder_.setVersion(version); + return *this; + } + + Manifest &setFinalBlockNumber(std::uint32_t final_block_number) { + encoder_.setFinalBlockNumber(final_block_number); + return *this; + } + + uint32_t getFinalBlockNumber() const { + return decoder_.getFinalBlockNumber(); + } + + ManifestVersion getVersion() const { return decoder_.getVersion(); } + + Manifest &setFinalManifest(bool is_final_manifest) { + encoder_.setIsFinalManifest(is_final_manifest); + is_last_ = is_final_manifest; + return *this; + } + + Manifest &clear() { + encoder_.clear(); + decoder_.clear(); + return *this; + } + + protected: + ManifestType manifest_type_; + HashAlgorithm hash_algorithm_; + bool is_last_; + + Encoder encoder_; + Decoder decoder_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h new file mode 100644 index 000000000..f95d19aa8 --- /dev/null +++ b/libtransport/src/core/manifest_format.h @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/name.h> +#include <hicn/transport/security/crypto_hasher.h> + +#include <cinttypes> +#include <type_traits> +#include <unordered_map> + +namespace transport { + +namespace core { + +enum class ManifestFields : uint8_t { + VERSION, + HASH_ALGORITHM, + SEGMENT_CALCULATION_STRATEGY, + FINAL_MANIFEST, + NAME_HASH_LIST, + BASE_NAME +}; + +enum class ManifestVersion : uint8_t { + VERSION_1 = 1, +}; + +enum class ManifestType : uint8_t { + INLINE_MANIFEST = 1, + FINAL_CHUNK_NUMBER = 2, + FLIC_MANIFEST = 3, +}; + +enum class HashAlgorithm : uint8_t { + SHA_256 = static_cast<uint8_t>(utils::CryptoHashType::SHA_256), + SHA_512 = static_cast<uint8_t>(utils::CryptoHashType::SHA_512), + CRC32C = static_cast<uint8_t>(utils::CryptoHashType::CRC32C), +}; + +/** + * INCREMENTAL: Manifests will be received inline with the data with no specific + * assumption regarding the manifest capacity. Consumers can send interests + * using a +1 heuristic. + * + * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of + * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when + * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3 + */ +enum class NextSegmentCalculationStrategy : uint8_t { + INCREMENTAL = 1, + MANIFEST_CAPACITY_BASED = 2, +}; + +template <typename T> +struct format_traits { + using Encoder = typename T::Encoder; + using Decoder = typename T::Decoder; + using HashType = typename T::HashType; + using HashList = typename T::HashList; +}; + +class Packet; + +template <typename Implementation> +class ManifestEncoder { + public: + virtual ~ManifestEncoder() = default; + + ManifestEncoder encode() { + return static_cast<Implementation &>(*this).encodeImpl(); + } + + ManifestEncoder &clear() { + return static_cast<Implementation &>(*this).clearImpl(); + } + + ManifestEncoder &setManifestType(ManifestType type) { + return static_cast<Implementation &>(*this).setManifestTypeImpl(type); + } + + ManifestEncoder &setHashAlgorithm(HashAlgorithm hash) { + return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash); + } + + ManifestEncoder &setFinalChunkNumber(uint32_t final_chunk) { + return static_cast<Implementation &>(*this).setFinalChunkImpl(final_chunk); + } + + ManifestEncoder &setNextSegmentCalculationStrategy( + NextSegmentCalculationStrategy strategy) { + return static_cast<Implementation &>(*this) + .setNextSegmentCalculationStrategyImpl(strategy); + } + + template < + typename T, + typename = std::enable_if_t<std::is_same< + std::remove_const_t<std::remove_reference_t<T>>, core::Name>::value>> + ManifestEncoder &setBaseName(T &&name) { + return static_cast<Implementation &>(*this).setBaseNameImpl(name); + } + + template <typename Hash> + ManifestEncoder &addSuffixAndHash(uint32_t suffix, Hash &&hash) { + return static_cast<Implementation &>(*this).addSuffixAndHashImpl( + suffix, std::forward<Hash &&>(hash)); + } + + ManifestEncoder &setIsFinalManifest(bool is_last) { + return static_cast<Implementation &>(*this).setIsFinalManifestImpl(is_last); + } + + ManifestEncoder &setVersion(ManifestVersion version) { + return static_cast<Implementation &>(*this).setVersionImpl(version); + } + + std::size_t estimateSerializedLength(std::size_t number_of_entries) { + return static_cast<Implementation &>(*this).estimateSerializedLengthImpl( + number_of_entries); + } + + ManifestEncoder &update() { + return static_cast<Implementation &>(*this).updateImpl(); + } + + ManifestEncoder &setFinalBlockNumber(std::uint32_t final_block_number) { + return static_cast<Implementation &>(*this).setFinalBlockNumberImpl( + final_block_number); + } + + static std::size_t getManifestHeaderSize() { + return Implementation::getManifestHeaderSizeImpl(); + } + + static std::size_t getManifestEntrySize() { + return Implementation::getManifestEntrySizeImpl(); + } +}; + +template <typename Implementation> +class ManifestDecoder { + public: + virtual ~ManifestDecoder() = default; + + ManifestDecoder &clear() { + return static_cast<Implementation &>(*this).clearImpl(); + } + + void decode() { static_cast<Implementation &>(*this).decodeImpl(); } + + ManifestType getManifestType() const { + return static_cast<const Implementation &>(*this).getManifestTypeImpl(); + } + + HashAlgorithm getHashAlgorithm() const { + return static_cast<const Implementation &>(*this).getHashAlgorithmImpl(); + } + + uint32_t getFinalChunkNumber() const { + return static_cast<const Implementation &>(*this).getFinalChunkImpl(); + } + + NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() const { + return static_cast<const Implementation &>(*this) + .getNextSegmentCalculationStrategyImpl(); + } + + core::Name getBaseName() const { + return static_cast<const Implementation &>(*this).getBaseNameImpl(); + } + + auto getSuffixHashList() { + return static_cast<Implementation &>(*this).getSuffixHashListImpl(); + } + + bool getIsFinalManifest() const { + return static_cast<const Implementation &>(*this).getIsFinalManifestImpl(); + } + + ManifestVersion getVersion() const { + return static_cast<const Implementation &>(*this).getVersionImpl(); + } + + std::size_t estimateSerializedLength(std::size_t number_of_entries) const { + return static_cast<const Implementation &>(*this) + .estimateSerializedLengthImpl(number_of_entries); + } + + uint32_t getFinalBlockNumber() const { + return static_cast<const Implementation &>(*this).getFinalBlockNumberImpl(); + } +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc new file mode 100644 index 000000000..4073a5d26 --- /dev/null +++ b/libtransport/src/core/manifest_format_fixed.cc @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/packet.h> +#include <hicn/transport/utils/literals.h> + +#include <core/manifest_format_fixed.h> + +namespace transport { + +namespace core { + +// TODO use preallocated pool of membufs +FixedManifestEncoder::FixedManifestEncoder(Packet &packet, + std::size_t signature_size) + : packet_(packet), + max_size_(Packet::default_mtu - packet_.headerSize() - signature_size), + manifest_( + utils::MemBuf::create(Packet::default_mtu - packet_.headerSize())), + manifest_header_( + reinterpret_cast<ManifestHeader *>(manifest_->writableData())), + manifest_entries_(reinterpret_cast<ManifestEntry *>( + manifest_->writableData() + sizeof(ManifestHeader))), + current_entry_(0), + signature_size_(signature_size) { + *manifest_header_ = {0}; +} + +FixedManifestEncoder::~FixedManifestEncoder() {} + +FixedManifestEncoder &FixedManifestEncoder::encodeImpl() { + manifest_->append(sizeof(ManifestHeader) + + manifest_header_->number_of_entries * + sizeof(ManifestEntry)); + packet_.appendPayload(std::move(manifest_)); + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::clearImpl() { + manifest_ = utils::MemBuf::create(Packet::default_mtu - packet_.headerSize() - + signature_size_); + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl( + HashAlgorithm algorithm) { + manifest_header_->hash_algorithm = static_cast<uint8_t>(algorithm); + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::setManifestTypeImpl( + ManifestType manifest_type) { + manifest_header_->manifest_type = static_cast<uint8_t>(manifest_type); + return *this; +} + +FixedManifestEncoder & +FixedManifestEncoder::setNextSegmentCalculationStrategyImpl( + NextSegmentCalculationStrategy strategy) { + manifest_header_->next_segment_strategy = static_cast<uint8_t>(strategy); + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::setBaseNameImpl( + const core::Name &base_name) { + base_name.copyToDestination( + reinterpret_cast<uint8_t *>(&manifest_header_->prefix[0]), false); + manifest_header_->flags.ipv6 = + base_name.getAddressFamily() == AF_INET6 ? 1_U8 : 0_U8; + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl( + uint32_t suffix, const utils::CryptoHash &hash) { + auto _hash = hash.getDigest<std::uint8_t>(); + addSuffixHashBytes(suffix, _hash.data(), _hash.length()); + return *this; +} + +void FixedManifestEncoder::addSuffixHashBytes(uint32_t suffix, + const uint8_t *hash, + std::size_t length) { + manifest_entries_[current_entry_].suffix = htonl(suffix); + // std::copy(hash, hash + length, + // manifest_entries_[current_entry_].hash); + std::memcpy( + reinterpret_cast<uint8_t *>(manifest_entries_[current_entry_].hash), hash, + length); + + manifest_header_->number_of_entries++; + current_entry_++; + + if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) { + throw errors::RuntimeException("Manifest size exceeded the packet MTU!"); + } +} + +FixedManifestEncoder &FixedManifestEncoder::setIsFinalManifestImpl( + bool is_last) { + manifest_header_->flags.is_last = static_cast<uint8_t>(is_last); + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::setVersionImpl( + ManifestVersion version) { + manifest_header_->version = static_cast<uint8_t>(version); + return *this; +} + +std::size_t FixedManifestEncoder::estimateSerializedLengthImpl( + std::size_t additional_entries) { + return sizeof(ManifestHeader) + + (manifest_header_->number_of_entries + additional_entries) * + sizeof(ManifestEntry); +} + +FixedManifestEncoder &FixedManifestEncoder::updateImpl() { + max_size_ = Packet::default_mtu - packet_.headerSize() - signature_size_; + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::setFinalBlockNumberImpl( + std::uint32_t final_block_number) { + manifest_header_->final_block_number = htonl(final_block_number); + return *this; +} + +std::size_t FixedManifestEncoder::getManifestHeaderSizeImpl() { + return sizeof(ManifestHeader); +} + +std::size_t FixedManifestEncoder::getManifestEntrySizeImpl() { + return sizeof(ManifestEntry); +} + +FixedManifestDecoder::FixedManifestDecoder(Packet &packet) + : packet_(packet), + manifest_header_(reinterpret_cast<ManifestHeader *>( + packet_.getPayload()->writableData())), + manifest_entries_(reinterpret_cast<ManifestEntry *>( + packet_.getPayload()->writableData() + sizeof(ManifestHeader))) {} + +FixedManifestDecoder::~FixedManifestDecoder() {} + +void FixedManifestDecoder::decodeImpl() { + std::size_t packet_size = packet_.payloadSize(); + + if (packet_size < sizeof(ManifestHeader) || + packet_size < estimateSerializedLengthImpl()) { + throw errors::RuntimeException( + "The packet does not match expected manifest size."); + } +} + +FixedManifestDecoder &FixedManifestDecoder::clearImpl() { return *this; } + +ManifestType FixedManifestDecoder::getManifestTypeImpl() const { + return static_cast<ManifestType>(manifest_header_->manifest_type); +} + +HashAlgorithm FixedManifestDecoder::getHashAlgorithmImpl() const { + return static_cast<HashAlgorithm>(manifest_header_->hash_algorithm); +} + +NextSegmentCalculationStrategy +FixedManifestDecoder::getNextSegmentCalculationStrategyImpl() const { + return static_cast<NextSegmentCalculationStrategy>( + manifest_header_->next_segment_strategy); +} + +typename Fixed::SuffixList FixedManifestDecoder::getSuffixHashListImpl() { + typename Fixed::SuffixList hash_list; + + for (int i = 0; i < manifest_header_->number_of_entries; i++) { + hash_list.insert(hash_list.end(), + std::make_pair(ntohl(manifest_entries_[i].suffix), + reinterpret_cast<uint8_t *>( + &manifest_entries_[i].hash[0]))); + } + + return hash_list; +} + +core::Name FixedManifestDecoder::getBaseNameImpl() const { + if (static_cast<bool>(manifest_header_->flags.ipv6)) { + return core::Name(AF_INET6, + reinterpret_cast<uint8_t *>(&manifest_header_->prefix)); + } else { + return core::Name(AF_INET, + reinterpret_cast<uint8_t *>(&manifest_header_->prefix)); + } +} + +bool FixedManifestDecoder::getIsFinalManifestImpl() const { + return static_cast<bool>(manifest_header_->flags.is_last); +} + +ManifestVersion FixedManifestDecoder::getVersionImpl() const { + return static_cast<ManifestVersion>(manifest_header_->version); +} + +std::size_t FixedManifestDecoder::estimateSerializedLengthImpl( + std::size_t additional_entries) const { + return sizeof(ManifestHeader) + + (additional_entries + manifest_header_->number_of_entries) * + sizeof(ManifestEntry); +} + +uint32_t FixedManifestDecoder::getFinalBlockNumberImpl() const { + return ntohl(manifest_header_->final_block_number); +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h new file mode 100644 index 000000000..9cacb3bc2 --- /dev/null +++ b/libtransport/src/core/manifest_format_fixed.h @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/packet.h> + +#include <core/manifest_format.h> + +#include <string> + +namespace transport { + +namespace core { + +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// |Version| MType |HashAlg|NextStr| Flags |NumberOfEntries| +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Final Block Number | +// +---------------------------------------------------------------| +// | | +// + + +// | | +// + Prefix + +// | | +// + + +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Suffix | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Hash Value | +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +class FixedManifestEncoder; +class FixedManifestDecoder; +class Packet; + +struct Fixed { + using Encoder = FixedManifestEncoder; + using Decoder = FixedManifestDecoder; + using HashType = utils::CryptoHash; + using SuffixList = std::list<std::pair<std::uint32_t, std::uint8_t *>>; +}; + +struct Flags { + std::uint8_t ipv6 : 1; + std::uint8_t is_last : 1; + std::uint8_t unused : 6; +}; + +struct ManifestEntry { + std::uint32_t suffix; + std::uint32_t hash[8]; +}; + +struct ManifestHeader { + std::uint8_t version : 4; + std::uint8_t manifest_type : 4; + std::uint8_t hash_algorithm : 4; + std::uint8_t next_segment_strategy : 4; + Flags flags; + std::uint8_t number_of_entries; + std::uint32_t final_block_number; + std::uint32_t prefix[4]; + ManifestEntry entries[0]; +}; + +static const constexpr std::uint8_t manifest_version = 1; + +class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> { + public: + FixedManifestEncoder(Packet &packet, std::size_t signature_size = 0); + + ~FixedManifestEncoder(); + + FixedManifestEncoder &encodeImpl(); + + FixedManifestEncoder &clearImpl(); + + FixedManifestEncoder &setManifestTypeImpl(ManifestType manifest_type); + + FixedManifestEncoder &setHashAlgorithmImpl(HashAlgorithm algorithm); + + FixedManifestEncoder &setNextSegmentCalculationStrategyImpl( + NextSegmentCalculationStrategy strategy); + + FixedManifestEncoder &setBaseNameImpl(const core::Name &base_name); + + FixedManifestEncoder &addSuffixAndHashImpl(uint32_t suffix, + const utils::CryptoHash &hash); + + FixedManifestEncoder &setIsFinalManifestImpl(bool is_last); + + FixedManifestEncoder &setVersionImpl(ManifestVersion version); + + std::size_t estimateSerializedLengthImpl(std::size_t additional_entries = 0); + + FixedManifestEncoder &updateImpl(); + + FixedManifestEncoder &setFinalBlockNumberImpl( + std::uint32_t final_block_number); + + static std::size_t getManifestHeaderSizeImpl(); + + static std::size_t getManifestEntrySizeImpl(); + + private: + void addSuffixHashBytes(uint32_t suffix, const uint8_t *hash, + std::size_t length); + + Packet &packet_; + std::size_t max_size_; + std::unique_ptr<utils::MemBuf> manifest_; + ManifestHeader *manifest_header_; + ManifestEntry *manifest_entries_; + std::size_t current_entry_; + std::size_t signature_size_; +}; + +class FixedManifestDecoder : public ManifestDecoder<FixedManifestDecoder> { + public: + FixedManifestDecoder(Packet &packet); + + ~FixedManifestDecoder(); + + void decodeImpl(); + + FixedManifestDecoder &clearImpl(); + + ManifestType getManifestTypeImpl() const; + + HashAlgorithm getHashAlgorithmImpl() const; + + NextSegmentCalculationStrategy getNextSegmentCalculationStrategyImpl() const; + + typename Fixed::SuffixList getSuffixHashListImpl(); + + core::Name getBaseNameImpl() const; + + bool getIsFinalManifestImpl() const; + + std::size_t estimateSerializedLengthImpl( + std::size_t additional_entries = 0) const; + + ManifestVersion getVersionImpl() const; + + uint32_t getFinalBlockNumberImpl() const; + + private: + Packet &packet_; + ManifestHeader *manifest_header_; + ManifestEntry *manifest_entries_; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/manifest_inline.h b/libtransport/src/core/manifest_inline.h new file mode 100644 index 000000000..235c6f3a0 --- /dev/null +++ b/libtransport/src/core/manifest_inline.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/portability/portability.h> + +#include <core/manifest.h> +#include <core/manifest_format.h> +#include <set> + +namespace transport { + +namespace core { + +template <typename Base, typename FormatTraits> +class ManifestInline + : public Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>> { + using ManifestBase = + Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>>; + using HashType = typename FormatTraits::HashType; + using SuffixList = typename FormatTraits::SuffixList; + + public: + ManifestInline() : ManifestBase() {} + + ManifestInline(const core::Name &name, std::size_t signature_size = 0) + : ManifestBase(name, signature_size) {} + + template <typename T> + ManifestInline(T &&base) : ManifestBase(std::forward<T &&>(base)) {} + + static TRANSPORT_ALWAYS_INLINE ManifestInline *createManifest( + const core::Name &manifest_name, ManifestVersion version, + ManifestType type, HashAlgorithm algorithm, bool is_last, + const Name &base_name, NextSegmentCalculationStrategy strategy, + std::size_t signature_size) { + auto manifest = new ManifestInline(manifest_name, signature_size); + manifest->setVersion(version); + manifest->setManifestType(type); + manifest->setHashAlgorithm(algorithm); + manifest->setFinalManifest(is_last); + manifest->setBaseName(base_name); + manifest->setNextSegmentCalculationStrategy(strategy); + + return manifest; + } + + ManifestInline &encodeImpl() { + ManifestBase::encoder_.encode(); + return *this; + } + + ManifestInline &decodeImpl() { + base_name_ = ManifestBase::decoder_.getBaseName(); + next_segment_strategy_ = + ManifestBase::decoder_.getNextSegmentCalculationStrategy(); + suffix_hash_map_ = ManifestBase::decoder_.getSuffixHashList(); + + return *this; + } + + std::size_t estimateManifestSizeImpl(std::size_t additional_entries = 0) { + return ManifestBase::encoder_.estimateSerializedLength(additional_entries); + } + + ManifestInline &setBaseName(const Name &name) { + base_name_ = name; + ManifestBase::encoder_.setBaseName(base_name_); + return *this; + } + + const Name &getBaseName() { return base_name_; } + + ManifestInline &addSuffixHash(uint32_t suffix, const HashType &hash) { + ManifestBase::encoder_.addSuffixAndHash(suffix, hash); + return *this; + } + + // Call this function only after the decode function! + const SuffixList &getSuffixList() { return suffix_hash_map_; } + + ManifestInline &setNextSegmentCalculationStrategy( + NextSegmentCalculationStrategy strategy) { + next_segment_strategy_ = strategy; + ManifestBase::encoder_.setNextSegmentCalculationStrategy( + next_segment_strategy_); + return *this; + } + + NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() { + return next_segment_strategy_; + } + + private: + core::Name base_name_; + NextSegmentCalculationStrategy next_segment_strategy_; + SuffixList suffix_hash_map_; +}; + +} // end namespace core + +} // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc new file mode 100644 index 000000000..2292e9b41 --- /dev/null +++ b/libtransport/src/core/memif_connector.cc @@ -0,0 +1,494 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/errors/not_implemented_exception.h> + +#include <core/memif_connector.h> + +#ifdef __vpp__ + +#include <sys/epoll.h> +#include <cstdlib> + +extern "C" { +#include <memif/libmemif.h> +}; + +#define CANCEL_TIMER 1 + +namespace transport { + +namespace core { + +struct memif_connection { + uint16_t index; + /* memif conenction handle */ + memif_conn_handle_t conn; + /* transmit queue id */ + uint16_t tx_qid; + /* tx buffers */ + memif_buffer_t *tx_bufs; + /* allocated tx buffers counter */ + /* number of tx buffers pointing to shared memory */ + uint16_t tx_buf_num; + /* rx buffers */ + memif_buffer_t *rx_bufs; + /* allcoated rx buffers counter */ + /* number of rx buffers pointing to shared memory */ + uint16_t rx_buf_num; + /* interface ip address */ + uint8_t ip_addr[4]; +}; + +std::once_flag MemifConnector::flag_; +utils::EpollEventReactor MemifConnector::main_event_reactor_; + +MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, + asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + memif_worker_(nullptr), + timer_set_(false), + send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), + disconnect_timer_( + std::make_unique<utils::FdDeadlineTimer>(event_reactor_)), + io_service_(io_service), + packet_counter_(0), + memif_connection_(std::make_unique<memif_connection_t>()), + tx_buf_counter_(0), + is_reconnection_(false), + data_available_(false), + app_name_(app_name), + socket_filename_("") { + std::call_once(MemifConnector::flag_, &MemifConnector::init, this); +} + +MemifConnector::~MemifConnector() { close(); } + +void MemifConnector::init() { + /* initialize memory interface */ + int err = memif_init(controlFdUpdate, const_cast<char *>(app_name_.c_str()), + nullptr, nullptr, nullptr); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_init: %s", memif_strerror(err)); + } +} + +void MemifConnector::connect(uint32_t memif_id, long memif_mode) { + state_ = ConnectorState::CONNECTING; + + memif_id_ = memif_id; + socket_filename_ = "/run/vpp/memif.sock"; + + createMemif(memif_id, memif_mode, nullptr); + + work_ = std::make_unique<asio::io_service::work>(io_service_); + + while (state_ != ConnectorState::CONNECTED) { + MemifConnector::main_event_reactor_.runOneEvent(); + } + + int err; + + /* get interrupt queue id */ + int fd = -1; + err = memif_get_queue_efd(memif_connection_->conn, 0, &fd); + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_get_queue_efd: %s", memif_strerror(err)); + return; + } + + // Remove fd from main epoll + main_event_reactor_.delFileDescriptor(fd); + + // Add fd to epoll of instance + event_reactor_.addFileDescriptor( + fd, EPOLLIN, [this](const utils::Event &evt) -> int { + return onInterrupt(memif_connection_->conn, this, 0); + }); + + memif_worker_ = std::make_unique<std::thread>( + std::bind(&MemifConnector::threadMain, this)); +} + +int MemifConnector::createMemif(uint32_t index, uint8_t mode, char *s) { + memif_connection_t *c = memif_connection_.get(); + + /* setting memif connection arguments */ + memif_conn_args_t args; + memset(&args, 0, sizeof(args)); + + args.is_master = mode; + args.log2_ring_size = MEMIF_LOG2_RING_SIZE; + args.buffer_size = MEMIF_BUF_SIZE; + args.num_s2m_rings = 1; + args.num_m2s_rings = 1; + strncpy((char *)args.interface_name, IF_NAME, strlen(IF_NAME) + 1); + args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP; + + int err; + + err = memif_create_socket(&args.socket, socket_filename_.c_str(), nullptr); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + throw errors::RuntimeException(memif_strerror(err)); + } + + args.interface_id = index; + /* last argument for memif_create (void * private_ctx) is used by user + to identify connection. this context is returned with callbacks */ + + /* default interrupt */ + if (s == nullptr) { + err = memif_create(&c->conn, &args, onConnect, onDisconnect, onInterrupt, + this); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + throw errors::RuntimeException(memif_strerror(err)); + } + } + + c->index = (uint16_t)index; + c->tx_qid = 0; + /* alloc memif buffers */ + c->rx_buf_num = 0; + c->rx_bufs = static_cast<memif_buffer_t *>( + malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS)); + c->tx_buf_num = 0; + c->tx_bufs = static_cast<memif_buffer_t *>( + malloc(sizeof(memif_buffer_t) * MAX_MEMIF_BUFS)); + + // memif_set_rx_mode (c->conn, MEMIF_RX_MODE_POLLING, 0); + + return 0; +} + +int MemifConnector::deleteMemif() { + memif_connection_t *c = memif_connection_.get(); + + if (c->rx_bufs) { + free(c->rx_bufs); + } + + c->rx_bufs = nullptr; + c->rx_buf_num = 0; + + if (c->tx_bufs) { + free(c->tx_bufs); + } + + c->tx_bufs = nullptr; + c->tx_buf_num = 0; + + int err; + /* disconenct then delete memif connection */ + err = memif_delete(&c->conn); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_delete: %s", memif_strerror(err)); + } + + if (TRANSPORT_EXPECT_FALSE(c->conn != nullptr)) { + TRANSPORT_LOGE("memif delete fail"); + } + + return 0; +} + +int MemifConnector::controlFdUpdate(int fd, uint8_t events, void *private_ctx) { + /* convert memif event definitions to epoll events */ + if (events & MEMIF_FD_EVENT_DEL) { + return MemifConnector::main_event_reactor_.delFileDescriptor(fd); + } + + uint32_t evt = 0; + + if (events & MEMIF_FD_EVENT_READ) { + evt |= EPOLLIN; + } + + if (events & MEMIF_FD_EVENT_WRITE) { + evt |= EPOLLOUT; + } + + if (events & MEMIF_FD_EVENT_MOD) { + return MemifConnector::main_event_reactor_.modFileDescriptor(fd, evt); + } + + return MemifConnector::main_event_reactor_.addFileDescriptor( + fd, evt, [](const utils::Event &evt) -> int { + uint32_t event = 0; + int memif_err = 0; + + if (evt.events & EPOLLIN) { + event |= MEMIF_FD_EVENT_READ; + } + + if (evt.events & EPOLLOUT) { + event |= MEMIF_FD_EVENT_WRITE; + } + + if (evt.events & EPOLLERR) { + event |= MEMIF_FD_EVENT_ERROR; + } + + memif_err = memif_control_fd_handler(evt.data.fd, event); + + if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_control_fd_handler: %s", + memif_strerror(memif_err)); + } + + return 0; + }); +} + +int MemifConnector::bufferAlloc(long n, uint16_t qid) { + memif_connection_t *c = memif_connection_.get(); + int err; + uint16_t r; + /* set data pointer to shared memory and set buffer_len to shared mmeory + * buffer len */ + err = memif_buffer_alloc(c->conn, qid, c->tx_bufs, n, &r, 2000); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_buffer_alloc: %s", memif_strerror(err)); + } + + c->tx_buf_num += r; + return r; +} + +int MemifConnector::txBurst(uint16_t qid) { + memif_connection_t *c = memif_connection_.get(); + int err; + uint16_t r; + /* inform peer memif interface about data in shared memory buffers */ + /* mark memif buffers as free */ + err = memif_tx_burst(c->conn, qid, c->tx_bufs, c->tx_buf_num, &r); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err)); + } + + // err = memif_refill_queue(c->conn, qid, r, 0); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_tx_burst: %s", memif_strerror(err)); + c->tx_buf_num -= r; + return -1; + } + + c->tx_buf_num -= r; + return 0; +} + +void MemifConnector::sendCallback(const std::error_code &ec) { + timer_set_ = false; + + if (TRANSPORT_EXPECT_TRUE(!ec && state_ == ConnectorState::CONNECTED)) { + doSend(); + } +} + +void MemifConnector::processInputBuffer() { + Packet::MemBufPtr ptr; + + while (input_buffer_.pop(ptr)) { + receive_callback_(std::move(ptr)); + } +} + +/* informs user about connected status. private_ctx is used by user to identify + connection (multiple connections WIP) */ +int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { + MemifConnector *connector = (MemifConnector *)private_ctx; + connector->state_ = ConnectorState::CONNECTED; + memif_refill_queue(conn, 0, -1, 0); + + return 0; +} + +/* informs user about disconnected status. private_ctx is used by user to + identify connection (multiple connections WIP) */ +int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) { + MemifConnector *connector = (MemifConnector *)private_ctx; + connector->state_ = ConnectorState::CLOSED; + return 0; +} + +void MemifConnector::threadMain() { event_reactor_.runEventLoop(1000); } + +int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, + uint16_t qid) { + MemifConnector *connector = (MemifConnector *)private_ctx; + + memif_connection_t *c = connector->memif_connection_.get(); + int err = MEMIF_ERR_SUCCESS, ret_val; + uint16_t rx; + + do { + err = memif_rx_burst(conn, qid, c->rx_bufs, MAX_MEMIF_BUFS, &rx); + ret_val = err; + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS && + err != MEMIF_ERR_NOBUF)) { + TRANSPORT_LOGE("memif_rx_burst: %s", memif_strerror(err)); + goto error; + } + + c->rx_buf_num += rx; + + if (TRANSPORT_EXPECT_TRUE(connector->io_service_.stopped())) { + TRANSPORT_LOGE("socket stopped: ignoring %u packets", rx); + goto error; + } + + std::size_t packet_length; + for (int i = 0; i < rx; i++) { + auto packet = connector->getPacket(); + packet_length = (c->rx_bufs + i)->len; + std::memcpy(packet->writableData(), + reinterpret_cast<const uint8_t *>((c->rx_bufs + i)->data), + packet_length); + packet->append(packet_length); + + if (!connector->input_buffer_.push(std::move(packet))) { + TRANSPORT_LOGE("Error pushing packet. Ring buffer full."); + + // TODO Here we should consider the possibility to signal the congestion + // to the application, that would react properly (e.g. slow down + // message) + } + } + + /* mark memif buffers and shared memory buffers as free */ + /* free processed buffers */ + + err = memif_refill_queue(conn, qid, rx, 0); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err)); + } + + c->rx_buf_num -= rx; + + } while (ret_val == MEMIF_ERR_NOBUF); + + connector->io_service_.post( + std::bind(&MemifConnector::processInputBuffer, connector)); + + return 0; + +error: + err = memif_refill_queue(c->conn, qid, rx, 0); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + TRANSPORT_LOGE("memif_buffer_free: %s", memif_strerror(err)); + } + c->rx_buf_num -= rx; + + return 0; +} + +void MemifConnector::close() { + if (state_ != ConnectorState::CLOSED) { + disconnect_timer_->expiresFromNow(std::chrono::microseconds(50)); + disconnect_timer_->asyncWait([this](const std::error_code &ec) { + deleteMemif(); + event_reactor_.stop(); + work_.reset(); + }); + + if (memif_worker_ && memif_worker_->joinable()) { + memif_worker_->join(); + } + } +} + +void MemifConnector::send(const Packet::MemBufPtr &packet) { + { + utils::SpinLock::Acquire locked(write_msgs_lock_); + output_buffer_.push_back(packet); + } +#if CANCEL_TIMER + if (!timer_set_) { + timer_set_ = true; + send_timer_->expiresFromNow(std::chrono::microseconds(50)); + send_timer_->asyncWait( + std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1)); + } +#endif +} + +int MemifConnector::doSend() { + std::size_t max = 0; + uint16_t n = 0; + std::size_t size = 0; + + { + utils::SpinLock::Acquire locked(write_msgs_lock_); + size = output_buffer_.size(); + } + + do { + max = size < MAX_MEMIF_BUFS ? size : MAX_MEMIF_BUFS; + + if (TRANSPORT_EXPECT_FALSE( + (n = bufferAlloc(max, memif_connection_->tx_qid)) < 0)) { + TRANSPORT_LOGE("Error allocating buffers."); + return -1; + } + + for (uint16_t i = 0; i < n; i++) { + utils::SpinLock::Acquire locked(write_msgs_lock_); + + auto packet = output_buffer_.front().get(); + const utils::MemBuf *current = packet; + std::size_t offset = 0; + uint8_t *shared_buffer = + reinterpret_cast<uint8_t *>(memif_connection_->tx_bufs[i].data); + do { + std::memcpy(shared_buffer + offset, current->data(), current->length()); + offset += current->length(); + current = current->next(); + } while (current != packet); + + memif_connection_->tx_bufs[i].len = uint32_t(offset); + + output_buffer_.pop_front(); + } + + txBurst(memif_connection_->tx_qid); + + utils::SpinLock::Acquire locked(write_msgs_lock_); + size = output_buffer_.size(); + } while (size > 0); + + return 0; +} + +void MemifConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + throw errors::NotImplementedException(); +} + +} // end namespace core + +} // end namespace transport + +#endif // __vpp__ diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h new file mode 100644 index 000000000..aafef1e56 --- /dev/null +++ b/libtransport/src/core/memif_connector.h @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/ring_buffer.h> + +#include <core/connector.h> +//#include <hicn/transport/core/hicn_vapi.h> +#include <utils/epoll_event_reactor.h> +#include <utils/fd_deadline_timer.h> + +#include <asio.hpp> +#include <deque> +#include <mutex> +#include <thread> + +#ifdef __vpp__ + +#define _Static_assert static_assert + +namespace transport { + +namespace core { + +typedef struct memif_connection memif_connection_t; + +#define APP_NAME "libtransport" +#define IF_NAME "vpp_connection" + +#define MEMIF_BUF_SIZE 2048 +#define MEMIF_LOG2_RING_SIZE 11 +#define MAX_MEMIF_BUFS (1 << MEMIF_LOG2_RING_SIZE) + +class MemifConnector : public Connector { + typedef void *memif_conn_handle_t; + + public: + MemifConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~MemifConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void connect(uint32_t memif_id, long memif_mode); + + TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; }; + + private: + void init(); + + int doSend(); + + int createMemif(uint32_t index, uint8_t mode, char *s); + + uint32_t getMemifConfiguration(); + + int deleteMemif(); + + static int controlFdUpdate(int fd, uint8_t events, void *private_ctx); + + static int onConnect(memif_conn_handle_t conn, void *private_ctx); + + static int onDisconnect(memif_conn_handle_t conn, void *private_ctx); + + static int onInterrupt(memif_conn_handle_t conn, void *private_ctx, + uint16_t qid); + + void threadMain(); + + int txBurst(uint16_t qid); + + int bufferAlloc(long n, uint16_t qid); + + void sendCallback(const std::error_code &ec); + + void processInputBuffer(); + + private: + static utils::EpollEventReactor main_event_reactor_; + static std::unique_ptr<std::thread> main_worker_; + + int epfd; + std::unique_ptr<std::thread> memif_worker_; + utils::EpollEventReactor event_reactor_; + std::atomic_bool timer_set_; + std::unique_ptr<utils::FdDeadlineTimer> send_timer_; + std::unique_ptr<utils::FdDeadlineTimer> disconnect_timer_; + asio::io_service &io_service_; + std::unique_ptr<asio::io_service::work> work_; + uint32_t packet_counter_; + std::unique_ptr<memif_connection_t> memif_connection_; + uint16_t tx_buf_counter_; + + PacketRing input_buffer_; + bool is_reconnection_; + bool data_available_; + uint32_t memif_id_; + uint8_t memif_mode_; + std::string app_name_; + uint16_t transmission_index_; + utils::SpinLock write_msgs_lock_; + std::string socket_filename_; + + static std::once_flag flag_; +}; + +} // end namespace core + +} // end namespace transport + +#endif // __vpp__
\ No newline at end of file diff --git a/libtransport/src/core/memif_vapi.c b/libtransport/src/core/memif_vapi.c new file mode 100644 index 000000000..ea3513306 --- /dev/null +++ b/libtransport/src/core/memif_vapi.c @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <hicn/transport/config.h> + +#ifdef __vpp__ + +#include <core/memif_vapi.h> + +#include <fcntl.h> +#include <inttypes.h> +#include <semaphore.h> +#include <string.h> +#include <sys/stat.h> +#include <vapi/vapi_safe.h> +#include <vppinfra/clib.h> + +DEFINE_VAPI_MSG_IDS_MEMIF_API_JSON + +static vapi_error_e memif_details_cb(vapi_ctx_t ctx, void *callback_ctx, + vapi_error_e rv, bool is_last, + vapi_payload_memif_details *reply) { + uint32_t *last_memif_id = (uint32_t *)callback_ctx; + uint32_t current_memif_id = 0; + if (reply != NULL) { + current_memif_id = reply->id; + } else { + return rv; + } + + if (current_memif_id >= *last_memif_id) { + *last_memif_id = current_memif_id + 1; + } + + return rv; +} + +int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, uint32_t *memif_id) { + vapi_lock(); + vapi_msg_memif_dump *msg = vapi_alloc_memif_dump(ctx); + int ret = vapi_memif_dump(ctx, msg, memif_details_cb, memif_id); + vapi_unlock(); + return ret; +} + +static vapi_error_e memif_create_cb(vapi_ctx_t ctx, void *callback_ctx, + vapi_error_e rv, bool is_last, + vapi_payload_memif_create_reply *reply) { + memif_output_params_t *output_params = (memif_output_params_t *)callback_ctx; + + if (reply == NULL) return rv; + + output_params->sw_if_index = reply->sw_if_index; + + return rv; +} + +int memif_vapi_create_memif(vapi_ctx_t ctx, memif_create_params_t *input_params, + memif_output_params_t *output_params) { + vapi_lock(); + vapi_msg_memif_create *msg = vapi_alloc_memif_create(ctx); + + int ret = 0; + if (input_params->socket_id == ~0) { + // invalid socket-id + ret = -1; + goto END; + } + + if (!is_pow2(input_params->ring_size)) { + // ring size must be power of 2 + ret = -1; + goto END; + } + + if (input_params->rx_queues > 255 || input_params->rx_queues < 1) { + // rx queue must be between 1 - 255 + ret = -1; + goto END; + } + + if (input_params->tx_queues > 255 || input_params->tx_queues < 1) { + // tx queue must be between 1 - 255 + ret = -1; + goto END; + } + + msg->payload.role = input_params->role; + msg->payload.mode = input_params->mode; + msg->payload.rx_queues = input_params->rx_queues; + msg->payload.tx_queues = input_params->tx_queues; + msg->payload.id = input_params->id; + msg->payload.socket_id = input_params->socket_id; + msg->payload.ring_size = input_params->ring_size; + msg->payload.buffer_size = input_params->buffer_size; + + ret = vapi_memif_create(ctx, msg, memif_create_cb, output_params); +END: + vapi_unlock(); + return ret; +} + +static vapi_error_e memif_delete_cb(vapi_ctx_t ctx, void *callback_ctx, + vapi_error_e rv, bool is_last, + vapi_payload_memif_delete_reply *reply) { + if (reply == NULL) return rv; + + return reply->retval; +} + +int memif_vapi_delete_memif(vapi_ctx_t ctx, uint32_t sw_if_index) { + vapi_lock(); + vapi_msg_memif_delete *msg = vapi_alloc_memif_delete(ctx); + + msg->payload.sw_if_index = sw_if_index; + + int ret = vapi_memif_delete(ctx, msg, memif_delete_cb, NULL); + vapi_unlock(); + return ret; +} + +#endif // __vpp__ diff --git a/libtransport/src/core/memif_vapi.h b/libtransport/src/core/memif_vapi.h new file mode 100644 index 000000000..c045cf093 --- /dev/null +++ b/libtransport/src/core/memif_vapi.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> + +#ifdef __vpp__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <vapi/memif.api.vapi.h> +#include "stdint.h" + +typedef struct memif_create_params_s { + uint8_t role; + uint8_t mode; + uint8_t rx_queues; + uint8_t tx_queues; + uint32_t id; + uint32_t socket_id; + uint8_t secret[24]; + uint32_t ring_size; + uint16_t buffer_size; + uint8_t hw_addr[6]; +} memif_create_params_t; + +typedef struct memif_output_params_s { + uint32_t sw_if_index; +} memif_output_params_t; + +int memif_vapi_get_next_memif_id(vapi_ctx_t ctx, + uint32_t *memif_id); + +int memif_vapi_create_memif(vapi_ctx_t ctx, + memif_create_params_t *input_params, + memif_output_params_t *output_params); + +int memif_vapi_delete_memif(vapi_ctx_t ctx, + uint32_t sw_if_index); + +#ifdef __cplusplus +} +#endif + +#endif // __vpp__
\ No newline at end of file diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc new file mode 100644 index 000000000..811e93b87 --- /dev/null +++ b/libtransport/src/core/name.cc @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/name.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/errors/tokenizer_exception.h> +#include <hicn/transport/utils/hash.h> +#include <hicn/transport/utils/string_tokenizer.h> + +#include <core/manifest_format.h> + +namespace transport { + +namespace core { + +Name::Name() { name_ = {}; } + +Name::Name(int family, const uint8_t *ip_address, std::uint32_t suffix) + : name_({}) { + name_.type = HNT_UNSPEC; + std::size_t length; + uint8_t *dst = NULL; + + if (family == AF_INET) { + dst = name_.ip4.prefix_as_u8; + length = IPV4_ADDR_LEN; + name_.type = HNT_CONTIGUOUS_V4; + } else if (family == AF_INET6) { + dst = name_.ip6.prefix_as_u8; + length = IPV6_ADDR_LEN; + name_.type = HNT_CONTIGUOUS_V6; + } else { + throw errors::RuntimeException("Specified name family does not exist."); + } + + std::memcpy(dst, ip_address, length); + *reinterpret_cast<std::uint32_t *>(dst + length) = suffix; +} + +Name::Name(const char *name, uint32_t segment) { + name_.type = HNT_UNSPEC; + if (hicn_name_create(name, segment, &name_) < 0) { + throw errors::InvalidIpAddressException(); + } +} + +Name::Name(const std::string &uri, uint32_t segment) + : Name(uri.c_str(), segment) {} + +Name::Name(const std::string &uri) { + name_.type = HNT_UNSPEC; + utils::StringTokenizer tokenizer(uri, "|"); + std::string ip_address; + std::string seq_number; + + ip_address = tokenizer.nextToken(); + + try { + seq_number = tokenizer.nextToken(); + } catch (errors::TokenizerException &) { + seq_number = "0"; + } + + if (hicn_name_create(ip_address.c_str(), (uint32_t)atoi(seq_number.c_str()), + &name_) < 0) { + throw errors::InvalidIpAddressException(); + } +} + +Name::Name(const Name &name) { this->name_ = name.name_; } + +Name &Name::operator=(const Name &name) { + if (hicn_name_copy(&this->name_, &name.name_) < 0) { + throw errors::MalformedNameException(); + } + + return *this; +} + +bool Name::operator==(const Name &name) const { + return this->equals(name, true); +} + +bool Name::operator!=(const Name &name) const { + return !this->operator==(name); +} + +Name::operator bool() const { + return bool(hicn_name_empty((hicn_name_t *)&name_)); +} + +bool Name::equals(const Name &name, bool consider_segment) const { + return !hicn_name_compare(&name_, &name.name_, consider_segment); +} + +std::string Name::toString() const { + char *name = new char[100]; + int ret = hicn_name_ntop(&name_, name, standard_name_string_length); + if (ret < 0) { + throw errors::MalformedNameException(); + } + std::string name_string(name); + delete[] name; + + return name_string; +} + +uint32_t Name::getHash32(bool consider_suffix) const { + uint32_t hash; + if (hicn_name_hash(&name_, &hash, consider_suffix) < 0) { + throw errors::RuntimeException("Error computing the hash of the name!"); + } + return hash; +} + +void Name::clear() { name_.type = HNT_UNSPEC; }; + +Name::Type Name::getType() const { return name_.type; } + +uint32_t Name::getSuffix() const { + uint32_t ret = 0; + if (hicn_name_get_seq_number((hicn_name_t *)&name_, &ret) < 0) { + throw errors::RuntimeException( + "Impossible to retrieve the sequence number from the name."); + } + return ret; +} + +Name &Name::setSuffix(uint32_t seq_number) { + if (hicn_name_set_seq_number(&name_, seq_number) < 0) { + throw errors::RuntimeException( + "Impossible to set the sequence number to the name."); + } + + return *this; +} + +std::shared_ptr<Sockaddr> Name::getAddress() const { + Sockaddr *ret = nullptr; + + switch (name_.type) { + case HNT_CONTIGUOUS_V4: + case HNT_IOV_V4: + ret = (Sockaddr *)new Sockaddr4; + break; + case HNT_CONTIGUOUS_V6: + case HNT_IOV_V6: + ret = (Sockaddr *)new Sockaddr6; + break; + default: + throw errors::MalformedNameException(); + } + + if (hicn_name_to_sockaddr_address((hicn_name_t *)&name_, ret) < 0) { + throw errors::MalformedNameException(); + } + + return std::shared_ptr<Sockaddr>(ret); +} + +ip_prefix_t Name::toIpAddress() const { + ip_prefix_t ret; + std::memset(&ret, 0, sizeof(ret)); + + if (hicn_name_to_ip_prefix(&name_, &ret) < 0) { + throw errors::InvalidIpAddressException(); + } + + return ret; +} + +int Name::getAddressFamily() const { + int ret = 0; + + if (hicn_name_get_family(&name_, &ret) < 0) { + throw errors::InvalidIpAddressException(); + } + + return ret; +} + +void Name::copyToDestination(uint8_t *destination, bool include_suffix) const { + if (hicn_name_copy_to_destination(destination, &name_, include_suffix) < 0) { + throw errors::RuntimeException( + "Impossibe to copy the name into the " + "provided destination"); + } +} + +std::ostream &operator<<(std::ostream &os, const Name &name) { + const std::string &str = name.toString(); + // os << "core:/"; + os << str; + + return os; +} + +size_t hash<transport::core::Name>::operator()( + const transport::core::Name &name) const { + return name.getHash32(false); +} + +size_t compare2<transport::core::Name>::operator()( + const transport::core::Name &name1, + const transport::core::Name &name2) const { + return name1.equals(name2, false); +} + +} // end namespace core + +} // end namespace transport + +namespace std { +size_t hash<transport::core::Name>::operator()( + const transport::core::Name &name) const { + return name.getHash32(); +} + +} // end namespace std diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc new file mode 100644 index 000000000..817f8de66 --- /dev/null +++ b/libtransport/src/core/packet.cc @@ -0,0 +1,607 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/packet.h> +#include <hicn/transport/errors/malformed_packet_exception.h> +#include <hicn/transport/utils/hash.h> +#include <hicn/transport/utils/log.h> + +extern "C" { +#ifndef _WIN32 +TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat") +#endif +#include <hicn/error.h> +} + +namespace transport { + +namespace core { + +const core::Name Packet::base_name("0::0|0"); + +Packet::Packet(Format format) + : packet_(utils::MemBuf::create(getHeaderSizeFromFormat(format, 256)) + .release()), + packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())), + header_head_(packet_.get()), + payload_head_(nullptr), + format_(format) { + if (hicn_packet_init_header(format, packet_start_) < 0) { + throw errors::RuntimeException("Unexpected error initializing the packet."); + } + + packet_->append(getHeaderSizeFromFormat(format_)); +} + +Packet::Packet(MemBufPtr &&buffer) + : packet_(std::move(buffer)), + packet_start_(reinterpret_cast<hicn_header_t *>(packet_->writableData())), + header_head_(packet_.get()), + payload_head_(nullptr), + format_(getFormatFromBuffer(packet_->writableData())) {} + +Packet::Packet(const uint8_t *buffer, std::size_t size) + : Packet(MemBufPtr(utils::MemBuf::copyBuffer(buffer, size).release())) {} + +Packet::Packet(Packet &&other) + : packet_(std::move(other.packet_)), + packet_start_(other.packet_start_), + header_head_(other.header_head_), + payload_head_(other.payload_head_), + format_(other.format_) { + other.packet_start_ = nullptr; + other.header_head_ = nullptr; + other.payload_head_ = nullptr; + other.format_ = HF_UNSPEC; +} + +Packet::~Packet() {} + +std::size_t Packet::getHeaderSizeFromFormat(Format format, + size_t signature_size) { + std::size_t header_length; + hicn_packet_get_header_length_from_format(format, &header_length); + int is_ah = _is_ah(format); + return is_ah * (header_length + signature_size) + (!is_ah) * header_length; +} + +std::size_t Packet::getHeaderSizeFromBuffer(Format format, + const uint8_t *buffer) { + size_t header_length; + if (hicn_packet_get_header_length(format, (hicn_header_t *)buffer, + &header_length) < 0) { + throw errors::MalformedPacketException(); + } + return header_length; +} + +bool Packet::isInterest(const uint8_t *buffer) { + bool is_interest = false; + + if (TRANSPORT_EXPECT_FALSE(hicn_packet_test_ece((const hicn_header_t *)buffer, + &is_interest) < 0)) { + throw errors::RuntimeException( + "Impossible to retrieve ece flag from packet"); + } + + return !is_interest; +} + +Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer) { + Format format = HF_UNSPEC; + + if (TRANSPORT_EXPECT_FALSE( + hicn_packet_get_format((const hicn_header_t *)buffer, &format) < 0)) { + throw errors::MalformedPacketException(); + } + + return format; +} + +std::size_t Packet::getPayloadSizeFromBuffer(Format format, + const uint8_t *buffer) { + std::size_t payload_length; + if (TRANSPORT_EXPECT_FALSE( + hicn_packet_get_payload_length(format, (hicn_header_t *)buffer, + &payload_length) < 0)) { + throw errors::MalformedPacketException(); + } + + return payload_length; +} + +void Packet::replace(MemBufPtr &&buffer) { + packet_ = std::move(buffer); + packet_start_ = reinterpret_cast<hicn_header_t *>(packet_->writableData()); + header_head_ = packet_.get(); + payload_head_ = nullptr; + format_ = getFormatFromBuffer(reinterpret_cast<uint8_t *>(packet_start_)); +} + +std::size_t Packet::payloadSize() const { + return getPayloadSizeFromBuffer(format_, + reinterpret_cast<uint8_t *>(packet_start_)); +} + +std::size_t Packet::headerSize() const { + return getHeaderSizeFromBuffer(format_, + reinterpret_cast<uint8_t *>(packet_start_)); +} + +Packet &Packet::appendPayload(std::unique_ptr<utils::MemBuf> &&payload) { + separateHeaderPayload(); + + if (!payload_head_) { + payload_head_ = payload.get(); + } + + header_head_->prependChain(std::move(payload)); + updateLength(); + return *this; +} + +Packet &Packet::appendPayload(const uint8_t *buffer, std::size_t length) { + return appendPayload(utils::MemBuf::copyBuffer(buffer, length)); +} + +Packet &Packet::appendHeader(std::unique_ptr<utils::MemBuf> &&header) { + separateHeaderPayload(); + + if (!payload_head_) { + header_head_->prependChain(std::move(header)); + } else { + payload_head_->prependChain(std::move(header)); + } + + updateLength(); + return *this; +} + +Packet &Packet::appendHeader(const uint8_t *buffer, std::size_t length) { + return appendHeader(utils::MemBuf::copyBuffer(buffer, length)); +} + +std::unique_ptr<utils::MemBuf> Packet::getPayload() const { + const_cast<Packet *>(this)->separateHeaderPayload(); + + // Hopefully the payload is contiguous + if (TRANSPORT_EXPECT_FALSE(payload_head_ && + payload_head_->next() != header_head_)) { + payload_head_->gather(payloadSize()); + } + + return payload_head_->cloneOne(); +} + +Packet &Packet::updateLength(std::size_t length) { + std::size_t total_length = length; + + for (utils::MemBuf *current = payload_head_; + current && current != header_head_; current = current->next()) { + total_length += current->length(); + } + + if (hicn_packet_set_payload_length(format_, packet_start_, total_length) < + 0) { + throw errors::RuntimeException("Error setting the packet payload."); + } + + return *this; +} + +PayloadType Packet::getPayloadType() const { + hicn_payload_type_t ret = HPT_UNSPEC; + + if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) { + throw errors::RuntimeException("Impossible to retrieve payload type."); + } + + return PayloadType(ret); +} + +Packet &Packet::setPayloadType(PayloadType payload_type) { + if (hicn_packet_set_payload_type(packet_start_, + hicn_payload_type_t(payload_type)) < 0) { + throw errors::RuntimeException("Error setting payload type of the packet."); + } + + return *this; +} + +Packet::Format Packet::getFormat() const { + if (format_ == HF_UNSPEC) { + if (hicn_packet_get_format(packet_start_, &format_) < 0) { + throw errors::MalformedPacketException(); + } + } + + return format_; +} + +const std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() const { + return packet_; +} + +void Packet::dump() const { + const_cast<Packet *>(this)->separateHeaderPayload(); + + std::cout << "HEADER -- Length: " << headerSize() << std::endl; + hicn_packet_dump((uint8_t *)header_head_->data(), headerSize()); + + std::cout << std::endl << "PAYLOAD -- Length: " << payloadSize() << std::endl; + for (utils::MemBuf *current = payload_head_; + current && current != header_head_; current = current->next()) { + std::cout << "MemBuf Length: " << current->length() << std::endl; + hicn_packet_dump((uint8_t *)current->data(), current->length()); + } +} + +void Packet::setSignatureSize(std::size_t size_bytes) { + int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes); + + if (ret < 0) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + + packet_->append(size_bytes); + updateLength(); +} + +uint8_t *Packet::getSignature() const { + uint8_t *signature; + int ret = hicn_packet_get_signature(format_, packet_start_, &signature); + + if (ret < 0) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + + return signature; +} + +std::size_t Packet::getSignatureSize() const { + size_t size_bytes; + int ret = hicn_packet_get_signature_size(format_, packet_start_, &size_bytes); + + if (ret < 0) { + throw errors::RuntimeException("Packet without Authentication Header."); + } + + return size_bytes; +} + +void Packet::setSignatureTimestamp(const uint64_t ×tamp) { + int ret = + hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp); + + if (ret < 0) { + throw errors::RuntimeException("Error setting the signature timestamp."); + } +} + +uint64_t Packet::getSignatureTimestamp() const { + uint64_t return_value; + int ret = hicn_packet_get_signature_timestamp(format_, packet_start_, + &return_value); + + if (ret < 0) { + throw errors::RuntimeException("Error getting the signature timestamp."); + } + + return return_value; +} + +void Packet::setValidationAlgorithm( + const utils::CryptoSuite &validation_algorithm) { + int ret = hicn_packet_set_validation_algorithm(format_, packet_start_, + uint8_t(validation_algorithm)); + + if (ret < 0) { + throw errors::RuntimeException("Error setting the validation algorithm."); + } +} + +utils::CryptoSuite Packet::getValidationAlgorithm() const { + uint8_t return_value; + int ret = hicn_packet_get_validation_algorithm(format_, packet_start_, + &return_value); + + if (ret < 0) { + throw errors::RuntimeException("Error getting the validation algorithm."); + } + + return utils::CryptoSuite(return_value); +} + +void Packet::setKeyId(const utils::KeyId &key_id) { + int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first); + + if (ret < 0) { + throw errors::RuntimeException("Error setting the key id."); + } +} + +utils::KeyId Packet::getKeyId() const { + utils::KeyId return_value; + int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first, + &return_value.second); + + if (ret < 0) { + throw errors::RuntimeException("Error getting the validation algorithm."); + } + + return return_value; +} + +utils::CryptoHash Packet::computeDigest(HashAlgorithm algorithm) const { + utils::CryptoHasher hasher(static_cast<utils::CryptoHashType>(algorithm)); + hasher.init(); + + // Copy IP+TCP/ICMP header before zeroing them + hicn_header_t header_copy; + + hicn_packet_copy_header(format_, packet_start_, &header_copy, false); + + const_cast<Packet *>(this)->resetForHash(); + + auto current = header_head_; + do { + hasher.updateBytes(current->data(), current->length()); + current = current->next(); + } while (current != header_head_); + + hicn_packet_copy_header(format_, &header_copy, packet_start_, false); + + return hasher.finalize(); +} + +void Packet::setChecksum() { + uint16_t partial_csum = 0; + + for (utils::MemBuf *current = header_head_->next(); + current && current != header_head_; current = current->next()) { + if (partial_csum != 0) { + partial_csum = ~partial_csum; + } + partial_csum = csum(current->data(), current->length(), partial_csum); + } + if (hicn_packet_compute_header_checksum(format_, packet_start_, + partial_csum) < 0) { + throw errors::MalformedPacketException(); + } +} + +bool Packet::checkIntegrity() const { + if (hicn_packet_check_integrity(format_, packet_start_) < 0) { + return false; + } + + return true; +} + +Packet &Packet::setSyn() { + if (hicn_packet_set_syn(packet_start_) < 0) { + throw errors::RuntimeException("Error setting syn bit in the packet."); + } + + return *this; +} + +Packet &Packet::resetSyn() { + if (hicn_packet_reset_syn(packet_start_) < 0) { + throw errors::RuntimeException("Error resetting syn bit in the packet."); + } + + return *this; +} + +bool Packet::testSyn() const { + bool res = false; + if (hicn_packet_test_syn(packet_start_, &res) < 0) { + throw errors::RuntimeException("Error testing syn bit in the packet."); + } + + return res; +} + +Packet &Packet::setAck() { + if (hicn_packet_set_ack(packet_start_) < 0) { + throw errors::RuntimeException("Error setting ack bit in the packet."); + } + + return *this; +} + +Packet &Packet::resetAck() { + if (hicn_packet_reset_ack(packet_start_) < 0) { + throw errors::RuntimeException("Error resetting ack bit in the packet."); + } + + return *this; +} + +bool Packet::testAck() const { + bool res = false; + if (hicn_packet_test_ack(packet_start_, &res) < 0) { + throw errors::RuntimeException("Error testing ack bit in the packet."); + } + + return res; +} + +Packet &Packet::setRst() { + if (hicn_packet_set_rst(packet_start_) < 0) { + throw errors::RuntimeException("Error setting rst bit in the packet."); + } + + return *this; +} + +Packet &Packet::resetRst() { + if (hicn_packet_reset_rst(packet_start_) < 0) { + throw errors::RuntimeException("Error resetting rst bit in the packet."); + } + + return *this; +} + +bool Packet::testRst() const { + bool res = false; + if (hicn_packet_test_rst(packet_start_, &res) < 0) { + throw errors::RuntimeException("Error testing rst bit in the packet."); + } + + return res; +} + +Packet &Packet::setFin() { + if (hicn_packet_set_fin(packet_start_) < 0) { + throw errors::RuntimeException("Error setting fin bit in the packet."); + } + + return *this; +} + +Packet &Packet::resetFin() { + if (hicn_packet_reset_fin(packet_start_) < 0) { + throw errors::RuntimeException("Error resetting fin bit in the packet."); + } + + return *this; +} + +bool Packet::testFin() const { + bool res = false; + if (hicn_packet_test_fin(packet_start_, &res) < 0) { + throw errors::RuntimeException("Error testing fin bit in the packet."); + } + + return res; +} + +Packet &Packet::resetFlags() { + resetSyn(); + resetAck(); + resetRst(); + resetFin(); + + return *this; +} + +std::string Packet::printFlags() const { + std::string flags = ""; + if (testSyn()) { + flags += "S"; + } + if (testAck()) { + flags += "A"; + } + if (testRst()) { + flags += "R"; + } + if (testFin()) { + flags += "F"; + } + return flags; +} + +Packet &Packet::setSrcPort(uint16_t srcPort) { + if (hicn_packet_set_src_port(packet_start_, srcPort) < 0) { + throw errors::RuntimeException("Error setting source port in the packet."); + } + + return *this; +} + +Packet &Packet::setDstPort(uint16_t dstPort) { + if (hicn_packet_set_dst_port(packet_start_, dstPort) < 0) { + throw errors::RuntimeException( + "Error setting destination port in the packet."); + } + + return *this; +} + +uint16_t Packet::getSrcPort() const { + uint16_t port = 0; + + if (hicn_packet_get_src_port(packet_start_, &port) < 0) { + throw errors::RuntimeException("Error reading source port in the packet."); + } + + return port; +} + +uint16_t Packet::getDstPort() const { + uint16_t port = 0; + + if (hicn_packet_get_dst_port(packet_start_, &port) < 0) { + throw errors::RuntimeException( + "Error reading destination port in the packet."); + } + + return port; +} + +Packet &Packet::setTTL(uint8_t hops) { + if (hicn_packet_set_hoplimit(packet_start_, hops) < 0) { + throw errors::RuntimeException("Error setting TTL."); + } + + return *this; +} + +uint8_t Packet::getTTL() const { + uint8_t hops = 0; + if (hicn_packet_get_hoplimit(packet_start_, &hops) < 0) { + throw errors::RuntimeException("Error reading TTL."); + } + + return hops; +} + +void Packet::separateHeaderPayload() { + if (payload_head_) { + return; + } + + int signature_size = 0; + if (_is_ah(format_)) { + signature_size = (uint32_t)getSignatureSize(); + } + + auto header_size = getHeaderSizeFromFormat(format_, signature_size); + auto payload_length = packet_->length() - header_size; + + packet_->trimEnd(packet_->length()); + + auto payload = packet_->cloneOne(); + payload_head_ = payload.get(); + payload_head_->advance(header_size); + payload_head_->append(payload_length); + packet_->prependChain(std::move(payload)); + packet_->append(header_size); +} + +void Packet::resetPayload() { + if (packet_->isChained()) { + packet_->separateChain(packet_->next(), packet_->prev()); + payload_head_ = nullptr; + updateLength(); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/pending_interest.cc b/libtransport/src/core/pending_interest.cc new file mode 100644 index 000000000..fbe98cab5 --- /dev/null +++ b/libtransport/src/core/pending_interest.cc @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/pending_interest.h> + +namespace transport { + +namespace core { + +PendingInterest::PendingInterest() + : interest_(nullptr, nullptr), + timer_(), + on_content_object_callback_(), + on_interest_timeout_callback_() {} + +PendingInterest::PendingInterest(Interest::Ptr &&interest, + std::unique_ptr<asio::steady_timer> &&timer) + : interest_(std::move(interest)), + timer_(std::move(timer)), + on_content_object_callback_(), + on_interest_timeout_callback_() {} + +PendingInterest::PendingInterest( + Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object, + OnInterestTimeoutCallback &&on_interest_timeout, + std::unique_ptr<asio::steady_timer> &&timer) + : interest_(std::move(interest)), + timer_(std::move(timer)), + on_content_object_callback_(std::move(on_content_object)), + on_interest_timeout_callback_(std::move(on_interest_timeout)) {} + +PendingInterest::~PendingInterest() {} + +void PendingInterest::cancelTimer() { timer_->cancel(); } + +void PendingInterest::setInterest(Interest::Ptr &&interest) { + interest_ = std::move(interest); +} + +Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); } + +const OnContentObjectCallback &PendingInterest::getOnDataCallback() const { + return on_content_object_callback_; +} + +void PendingInterest::setOnContentObjectCallback( + OnContentObjectCallback &&on_content_object) { + PendingInterest::on_content_object_callback_ = on_content_object; +} + +const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const { + return on_interest_timeout_callback_; +} + +void PendingInterest::setOnTimeoutCallback( + OnInterestTimeoutCallback &&on_interest_timeout) { + PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h new file mode 100644 index 000000000..87fed5073 --- /dev/null +++ b/libtransport/src/core/pending_interest.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/portability/portability.h> + +#include <utils/deadline_timer.h> + +#include <asio/steady_timer.hpp> + +namespace transport { + +namespace core { + +class HicnForwarderInterface; +class VPPForwarderInterface; +class RawSocketInterface; + +template <typename ForwarderInt> +class Portal; + +typedef std::function<void(Interest::Ptr &&, ContentObject::Ptr &&)> + OnContentObjectCallback; +typedef std::function<void(Interest::Ptr &&)> OnInterestTimeoutCallback; +typedef std::function<void(const std::error_code &)> TimerCallback; + +class PendingInterest { + friend class Portal<HicnForwarderInterface>; + friend class Portal<VPPForwarderInterface>; + friend class Portal<RawSocketInterface>; + + public: + using Ptr = utils::ObjectPool<PendingInterest>::Ptr; + PendingInterest(); + + PendingInterest(Interest::Ptr &&interest, + std::unique_ptr<asio::steady_timer> &&timer); + + PendingInterest(Interest::Ptr &&interest, + OnContentObjectCallback &&on_content_object, + OnInterestTimeoutCallback &&on_interest_timeout, + std::unique_ptr<asio::steady_timer> &&timer); + + ~PendingInterest(); + + template <typename Handler> + TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) { + timer_->expires_from_now( + std::chrono::milliseconds(interest_->getLifetime())); + timer_->async_wait(std::forward<Handler &&>(cb)); + } + + void cancelTimer(); + + Interest::Ptr &&getInterest(); + + void setInterest(Interest::Ptr &&interest); + + const OnContentObjectCallback &getOnDataCallback() const; + + void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object); + + const OnInterestTimeoutCallback &getOnTimeoutCallback() const; + + void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout); + + private: + Interest::Ptr interest_; + std::unique_ptr<asio::steady_timer> timer_; + OnContentObjectCallback on_content_object_callback_; + OnInterestTimeoutCallback on_interest_timeout_callback_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h new file mode 100644 index 000000000..d7c463dfd --- /dev/null +++ b/libtransport/src/core/portal.h @@ -0,0 +1,696 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/log.h> + +#include <core/forwarder_interface.h> +#include <core/pending_interest.h> +#include <core/udp_socket_connector.h> + +#ifdef __vpp__ +#include <core/memif_connector.h> +#endif + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <future> +#include <memory> +#include <queue> +#include <unordered_map> + +#define UNSET_CALLBACK 0 + +namespace transport { +namespace core { + +namespace portal_details { + +static constexpr uint32_t pool_size = 2048; + +class HandlerMemory { +#ifdef __vpp__ + static constexpr std::size_t memory_size = 1024 * 1024; + + public: + HandlerMemory() : index_(0) {} + + HandlerMemory(const HandlerMemory &) = delete; + HandlerMemory &operator=(const HandlerMemory &) = delete; + + TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { + return &storage_[index_++ % memory_size]; + } + + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {} + + private: + // Storage space used for handler-based custom memory allocation. + typename std::aligned_storage<128>::type storage_[memory_size]; + uint32_t index_; +#else + public: + HandlerMemory() {} + + HandlerMemory(const HandlerMemory &) = delete; + HandlerMemory &operator=(const HandlerMemory &) = delete; + + TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { + return ::operator new(size); + } + + TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { + ::operator delete(pointer); + } +#endif +}; + +// The allocator to be associated with the handler objects. This allocator only +// needs to satisfy the C++11 minimal allocator requirements. +template <typename T> +class HandlerAllocator { + public: + using value_type = T; + + explicit HandlerAllocator(HandlerMemory &mem) : memory_(mem) {} + + template <typename U> + HandlerAllocator(const HandlerAllocator<U> &other) noexcept + : memory_(other.memory_) {} + + TRANSPORT_ALWAYS_INLINE bool operator==(const HandlerAllocator &other) const + noexcept { + return &memory_ == &other.memory_; + } + + TRANSPORT_ALWAYS_INLINE bool operator!=(const HandlerAllocator &other) const + noexcept { + return &memory_ != &other.memory_; + } + + TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const { + return static_cast<T *>(memory_.allocate(sizeof(T) * n)); + } + + TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const { + return memory_.deallocate(p); + } + + private: + template <typename> + friend class HandlerAllocator; + + // The underlying memory. + HandlerMemory &memory_; +}; + +// Wrapper class template for handler objects to allow handler memory +// allocation to be customised. The allocator_type type and get_allocator() +// member function are used by the asynchronous operations to obtain the +// allocator. Calls to operator() are forwarded to the encapsulated handler. +template <typename Handler> +class CustomAllocatorHandler { + public: + using allocator_type = HandlerAllocator<Handler>; + + CustomAllocatorHandler(HandlerMemory &m, Handler h) + : memory_(m), handler_(h) {} + + allocator_type get_allocator() const noexcept { + return allocator_type(memory_); + } + + template <typename... Args> + void operator()(Args &&... args) { + handler_(std::forward<Args>(args)...); + } + + private: + HandlerMemory &memory_; + Handler handler_; +}; + +// Helper function to wrap a handler object to add custom allocation. +template <typename Handler> +inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( + HandlerMemory &m, Handler h) { + return CustomAllocatorHandler<Handler>(m, h); +} + +class Pool { + public: + Pool(asio::io_service &io_service) : io_service_(io_service) { + increasePendingInterestPool(); + increaseInterestPool(); + increaseContentObjectPool(); + } + + TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { + // Create pool of pending interests to reuse + for (uint32_t i = 0; i < pool_size; i++) { + pending_interests_pool_.add(new PendingInterest( + Interest::Ptr(nullptr), + std::make_unique<asio::steady_timer>(io_service_))); + } + } + + TRANSPORT_ALWAYS_INLINE void increaseInterestPool() { + // Create pool of interests to reuse + for (uint32_t i = 0; i < pool_size; i++) { + interest_pool_.add(new Interest()); + } + } + + TRANSPORT_ALWAYS_INLINE void increaseContentObjectPool() { + // Create pool of content object to reuse + for (uint32_t i = 0; i < pool_size; i++) { + content_object_pool_.add(new ContentObject()); + } + } + + PendingInterest::Ptr getPendingInterest() { + auto res = pending_interests_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increasePendingInterestPool(); + res = pending_interests_pool_.get(); + } + + return std::move(res.second); + } + + TRANSPORT_ALWAYS_INLINE ContentObject::Ptr getContentObject() { + auto res = content_object_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increaseContentObjectPool(); + res = content_object_pool_.get(); + } + + return std::move(res.second); + } + + TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() { + auto res = interest_pool_.get(); + while (TRANSPORT_EXPECT_FALSE(!res.first)) { + increaseInterestPool(); + res = interest_pool_.get(); + } + + return std::move(res.second); + } + + private: + utils::ObjectPool<PendingInterest> pending_interests_pool_; + utils::ObjectPool<ContentObject> content_object_pool_; + utils::ObjectPool<Interest> interest_pool_; + asio::io_service &io_service_; +}; + +} // namespace portal_details + +using PendingInterestHashTable = + std::unordered_map<uint32_t, PendingInterest::Ptr>; + +template <typename PrefixType> +class BasicBindConfig { + static_assert(std::is_same<Prefix, PrefixType>::value, + "Prefix must be a Prefix type."); + + const uint32_t standard_cs_reserved = 5000; + + public: + template <typename T> + BasicBindConfig(T &&prefix) + : prefix_(std::forward<T &&>(prefix)), + content_store_reserved_(standard_cs_reserved) {} + + template <typename T> + BasicBindConfig(T &&prefix, uint32_t cs_reserved) + : prefix_(std::forward<T &&>(prefix)), + content_store_reserved_(cs_reserved) {} + + TRANSPORT_ALWAYS_INLINE const PrefixType &prefix() const { return prefix_; } + + TRANSPORT_ALWAYS_INLINE uint32_t csReserved() const { + return content_store_reserved_; + } + + private: + PrefixType prefix_; + uint32_t content_store_reserved_; +}; + +using BindConfig = BasicBindConfig<Prefix>; + +/** + * Portal is a opaque class which is used for sending/receiving interest/data + * packets over multiple kind of connector. The connector itself is defined by + * the template ForwarderInt, which is resolved at compile time. It is then not + * possible to decide at runtime what the connector will be. + * + * The tasks performed by portal are the following: + * - Sending/Receiving Interest packets + * - Sending/Receiving Data packets + * - Set timers (one per interest), in order to trigger events if an interest is + * not satisfied + * - Register a producer prefix to the local forwarder + * + * The way of working of portal is event-based, which means that data and + * interests are sent/received in a asynchronous manner, and the notifications + * are performed through callbacks. + * + * The portal class is not thread safe, appropriate locking is required by the + * users of this class. + */ +template <typename ForwarderInt> +class Portal { + static_assert( + std::is_base_of<ForwarderInterface<ForwarderInt, + typename ForwarderInt::ConnectorType>, + ForwarderInt>::value, + "ForwarderInt must inherit from ForwarderInterface!"); + + public: + /** + * Consumer callback is an abstract class containing two methods to be + * implemented by a consumer application. + */ + class ConsumerCallback { + public: + virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; + virtual void onTimeout(Interest::Ptr &&i) = 0; + }; + + /** + * Producer callback is an abstract class containing two methods to be + * implemented by a producer application. + */ + class ProducerCallback { + public: + virtual void onInterest(Interest::Ptr &&i) = 0; + }; + + Portal() : Portal(internal_io_service_) {} + + Portal(asio::io_service &io_service) + : io_service_(io_service), + packet_pool_(io_service), + app_name_("libtransport_application"), + consumer_callback_(nullptr), + producer_callback_(nullptr), + connector_(std::bind(&Portal::processIncomingMessages, this, + std::placeholders::_1), + std::bind(&Portal::setLocalRoutes, this), io_service_, + app_name_), + forwarder_interface_(connector_) {} + + /** + * Set the consumer callback. + * + * @param consumer_callback - The pointer to the ConsumerCallback object. + */ + void setConsumerCallback(ConsumerCallback *consumer_callback) { + consumer_callback_ = consumer_callback; + } + + /** + * Set the producer callback. + * + * @param producer_callback - The pointer to the ProducerCallback object. + */ + void setProducerCallback(ProducerCallback *producer_callback) { + producer_callback_ = producer_callback; + } + + /** + * Specify the output interface to use. This method will be useful in a future + * scenario where the library will be able to forward packets without + * connecting to a local forwarder. Now it is not used. + * + * @param output_interface - The output interface to use for + * forwarding/receiving packets. + */ + TRANSPORT_ALWAYS_INLINE void setOutputInterface( + const std::string &output_interface) { + forwarder_interface_.setOutputInterface(output_interface); + } + + /** + * Connect the transport to the local hicn forwarder. + * + * @param is_consumer - Boolean specifying if the application on top of portal + * is a consumer or a producer. + */ + TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { + pending_interest_hash_table_.reserve(portal_details::pool_size); + forwarder_interface_.connect(is_consumer); + } + + /** + * Destructor. + */ + ~Portal() { killConnection(); } + + /** + * Check if there is already a pending interest for a given name. + * + * @param name - The interest name. + */ + TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { + auto it = + pending_interest_hash_table_.find(name.getHash32() + name.getSuffix()); + if (it != pending_interest_hash_table_.end()) { + return true; + } + + return false; + } + + /** + * Send an interest through to the local forwarder. + * + * @param interest - The pointer to the interest. The ownership of the + * interest is transferred by the caller to portal. + * + * @param on_content_object_callback - If the caller wishes to use a different + * callback to be called for this interest, it can set this parameter. + * Otherwise ConsumerCallback::onContentObject will be used. + * + * @param on_interest_timeout_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onTimeout will be used. + */ + TRANSPORT_ALWAYS_INLINE void sendInterest( + Interest::Ptr &&interest, + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + uint32_t hash = + interest->getName().getHash32() + interest->getName().getSuffix(); + // Send it + forwarder_interface_.send(*interest); + + auto pending_interest = packet_pool_.getPendingInterest(); + pending_interest->setInterest(std::move(interest)); + pending_interest->setOnContentObjectCallback( + std::move(on_content_object_callback)); + pending_interest->setOnTimeoutCallback( + std::move(on_interest_timeout_callback)); + pending_interest->startCountdown(portal_details::makeCustomAllocatorHandler( + async_callback_memory_, std::bind(&Portal<ForwarderInt>::timerHandler, + this, std::placeholders::_1, hash))); + + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + it->second->cancelTimer(); + + // Get reference to interest packet in order to have it destroyed. + auto _int = it->second->getInterest(); + it->second = std::move(pending_interest); + } else { + pending_interest_hash_table_[hash] = std::move(pending_interest); + } + } + + /** + * Handler fot the timer set when the interest is sent. + * + * @param ec - Error code which says whether the timer expired or has been + * canceled upon data packet reception. + * + * @param hash - The index of the interest in the pending interest hash table. + */ + TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, + uint32_t hash) { + bool is_stopped = io_service_.stopped(); + if (TRANSPORT_EXPECT_FALSE(is_stopped)) { + return; + } + + if (TRANSPORT_EXPECT_TRUE(!ec)) { + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest::Ptr ptr = std::move(it->second); + pending_interest_hash_table_.erase(it); + auto _int = ptr->getInterest(); + + if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) { + ptr->on_interest_timeout_callback_(std::move(_int)); + } else if (consumer_callback_) { + consumer_callback_->onTimeout(std::move(_int)); + } + } + } + } + + /** + * Register a producer name to the local forwarder and optionally set the + * content store size in a per-face manner. + * + * @param config - The configuration for the local forwarder binding. + */ + TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { + forwarder_interface_.setContentStoreSize(config.csReserved()); + served_namespaces_.push_back(config.prefix()); + + setLocalRoutes(); + } + + /** + * Start the event loop. This function blocks here and calls the callback set + * by the application upon interest/data received or timeout. + */ + TRANSPORT_ALWAYS_INLINE void runEventsLoop() { + if (io_service_.stopped()) { + io_service_.reset(); // ensure that run()/poll() will do some work + } + + io_service_.run(); + } + + /** + * Run one event and return. + */ + TRANSPORT_ALWAYS_INLINE void runOneEvent() { + if (io_service_.stopped()) { + io_service_.reset(); // ensure that run()/poll() will do some work + } + + io_service_.run_one(); + } + + /** + * Send a data packet to the local forwarder. As opposite to sendInterest, the + * ownership of the content object is not transferred to the portal. + * + * @param content_object - The data packet. + */ + TRANSPORT_ALWAYS_INLINE void sendContentObject( + ContentObject &content_object) { + forwarder_interface_.send(content_object); + } + + /** + * Stop the event loop, canceling all the pending events in the event queue. + * + * Beware that stopping the event loop DOES NOT disconnect the transport from + * the local forwarder, the connector underneath will stay connected. + */ + TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { + if (!io_service_.stopped()) { + io_service_.dispatch([this]() { + clear(); + io_service_.stop(); + }); + } + } + + /** + * Disconnect the transport from the local forwarder. + */ + TRANSPORT_ALWAYS_INLINE void killConnection() { + forwarder_interface_.closeConnection(); + } + + /** + * Clear the pending interest hash table. + */ + TRANSPORT_ALWAYS_INLINE void clear() { + if (!io_service_.stopped()) { + io_service_.dispatch(std::bind(&Portal::doClear, this)); + } else { + doClear(); + } + } + + /** + * Get a reference to the io_service object. + */ + TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { + return io_service_; + } + + /** + * Register a route to the local forwarder. + */ + TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { + served_namespaces_.push_back(prefix); + if (connector_.isConnected()) { + forwarder_interface_.registerRoute(prefix); + } + } + + private: + /** + * Clear the pending interest hash table. + */ + TRANSPORT_ALWAYS_INLINE void doClear() { + for (auto &pend_interest : pending_interest_hash_table_) { + pend_interest.second->cancelTimer(); + + // Get interest packet from pending interest and do nothing with it. It + // will get destroyed as it goes out of scope. + auto _int = pend_interest.second->getInterest(); + } + + pending_interest_hash_table_.clear(); + } + + /** + * Callback called by the underlying connector upon reception of a packet from + * the local forwarder. + * + * @param packet_buffer - The bytes of the packet. + */ + TRANSPORT_ALWAYS_INLINE void processIncomingMessages( + Packet::MemBufPtr &&packet_buffer) { + bool is_stopped = io_service_.stopped(); + if (TRANSPORT_EXPECT_FALSE(is_stopped)) { + return; + } + + if (TRANSPORT_EXPECT_FALSE( + ForwarderInt::isControlMessage(packet_buffer->data()))) { + processControlMessage(std::move(packet_buffer)); + return; + } + + Packet::Format format = Packet::getFormatFromBuffer(packet_buffer->data()); + + if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { + if (!Packet::isInterest(packet_buffer->data())) { + auto content_object = packet_pool_.getContentObject(); + content_object->replace(std::move(packet_buffer)); + processContentObject(std::move(content_object)); + } else { + auto interest = packet_pool_.getInterest(); + interest->replace(std::move(packet_buffer)); + processInterest(std::move(interest)); + } + } else { + TRANSPORT_LOGE("Received not supported packet. Ignoring it."); + } + } + + /** + * Callback called by the transport upon connection to the local forwarder. + * It register the prefixes in the served_namespaces_ list to the local + * forwarder. + */ + TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { + for (auto &prefix : served_namespaces_) { + if (connector_.isConnected()) { + forwarder_interface_.registerRoute(prefix); + } + } + } + + TRANSPORT_ALWAYS_INLINE void processInterest(Interest::Ptr &&interest) { + // Interest for a producer + if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { + producer_callback_->onInterest(std::move(interest)); + } + } + + /** + * Process a content object: + * - Check if the data packet was effectively requested by portal + * - Delete its timer + * - Pass packet to application + * + * @param content_object - The data packet + */ + TRANSPORT_ALWAYS_INLINE void processContentObject( + ContentObject::Ptr &&content_object) { + uint32_t hash = content_object->getName().getHash32() + + content_object->getName().getSuffix(); + + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest::Ptr interest_ptr = std::move(it->second); + pending_interest_hash_table_.erase(it); + interest_ptr->cancelTimer(); + auto _int = interest_ptr->getInterest(); + + if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { + interest_ptr->on_content_object_callback_(std::move(_int), + std::move(content_object)); + } else if (consumer_callback_) { + consumer_callback_->onContentObject(std::move(_int), + std::move(content_object)); + } + } + } + + /** + * Process a control message. Control messages are different depending on the + * connector, then the forwarder_interface will do the job of understanding + * them. + */ + TRANSPORT_ALWAYS_INLINE void processControlMessage( + Packet::MemBufPtr &&packet_buffer) { + forwarder_interface_.processControlMessageReply(std::move(packet_buffer)); + } + + private: + asio::io_service &io_service_; + asio::io_service internal_io_service_; + portal_details::Pool packet_pool_; + + std::string app_name_; + + PendingInterestHashTable pending_interest_hash_table_; + std::list<Prefix> served_namespaces_; + + ConsumerCallback *consumer_callback_; + ProducerCallback *producer_callback_; + + portal_details::HandlerMemory async_callback_memory_; + + typename ForwarderInt::ConnectorType connector_; + ForwarderInt forwarder_interface_; +}; + +} // namespace core + +} // end namespace transport diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc new file mode 100644 index 000000000..59898ab70 --- /dev/null +++ b/libtransport/src/core/prefix.cc @@ -0,0 +1,338 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/core/prefix.h> +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/string_tokenizer.h> + +#ifndef _WIN32 +extern "C" { +#include <arpa/inet.h> +} +#else +#include <hicn/transport/portability/win_portability.h> +#endif + +#include <cstring> +#include <memory> +#include <random> + +#include <openssl/rand.h> + +namespace transport { + +namespace core { + +Prefix::Prefix() { std::memset(&ip_prefix_, 0, sizeof(ip_prefix_t)); } + +Prefix::Prefix(const char *prefix) : Prefix(std::string(prefix)) {} + +Prefix::Prefix(std::string &&prefix) : Prefix(prefix) {} + +Prefix::Prefix(const std::string &prefix) { + utils::StringTokenizer st(prefix, "/"); + + std::string ip_address = st.nextToken(); + int family = get_addr_family(ip_address.c_str()); + + std::string prefix_length = family == AF_INET6 ? "128" : "32"; + + if (st.hasMoreTokens()) { + prefix_length = st.nextToken(); + } + + buildPrefix(ip_address, uint16_t(atoi(prefix_length.c_str())), family); +} + +Prefix::Prefix(std::string &prefix, uint16_t prefix_length) { + int family = get_addr_family(prefix.c_str()); + buildPrefix(prefix, prefix_length, family); +} + +Prefix::Prefix(const core::Name &content_name, uint16_t prefix_length) { + int family = content_name.getAddressFamily(); + + if (!checkPrefixLengthAndAddressFamily(prefix_length, family)) { + throw errors::InvalidIpAddressException(); + } + + ip_prefix_ = content_name.toIpAddress(); + ip_prefix_.len = prefix_length; + ip_prefix_.family = family; +} + +void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length, + int family) { + if (!checkPrefixLengthAndAddressFamily(prefix_length, family)) { + throw errors::InvalidIpAddressException(); + } + + int ret; + switch (family) { + case AF_INET: + ret = inet_pton(AF_INET, prefix.c_str(), ip_prefix_.address.v4.buffer); + break; + case AF_INET6: + ret = inet_pton(AF_INET6, prefix.c_str(), ip_prefix_.address.v6.buffer); + break; + default: + throw errors::InvalidIpAddressException(); + } + + if (ret != 1) { + throw errors::InvalidIpAddressException(); + } + + ip_prefix_.len = prefix_length; + ip_prefix_.family = family; +} + +std::unique_ptr<Sockaddr> Prefix::toSockaddr() { + Sockaddr *ret = nullptr; + + switch (ip_prefix_.family) { + case AF_INET6: + ret = (Sockaddr *)new Sockaddr6; + break; + case AF_INET: + ret = (Sockaddr *)new Sockaddr4; + break; + default: + throw errors::InvalidIpAddressException(); + } + + if (ip_prefix_to_sockaddr(&ip_prefix_, ret) < 0) { + throw errors::InvalidIpAddressException(); + } + + return std::unique_ptr<Sockaddr>(ret); +} + +uint16_t Prefix::getPrefixLength() { return ip_prefix_.len; } + +Prefix &Prefix::setPrefixLength(uint16_t prefix_length) { + ip_prefix_.len = prefix_length; + return *this; +} + +int Prefix::getAddressFamily() { return ip_prefix_.family; } + +Prefix &Prefix::setAddressFamily(int address_family) { + ip_prefix_.family = address_family; + return *this; +} + +std::string Prefix::getNetwork() const { + if (!checkPrefixLengthAndAddressFamily(ip_prefix_.len, ip_prefix_.family)) { + throw errors::InvalidIpAddressException(); + } + + std::size_t size = + ip_prefix_.family == 4 + AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN; + + std::string network(size, 0); + + if (ip_prefix_ntop_short(&ip_prefix_, (char *)network.c_str(), size) < 0) { + throw errors::RuntimeException( + "Impossible to retrieve network from ip address."); + } + + return network; +} + +int Prefix::contains(const ip_address_t &content_name) const { + int res = + ip_address_cmp(&content_name, &(ip_prefix_.address), ip_prefix_.family); + + if (ip_prefix_.len != (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN_BITS + : IPV4_ADDR_LEN_BITS)) { + const u8 *ip_prefix_buffer = + ip_address_get_buffer(&(ip_prefix_.address), ip_prefix_.family); + const u8 *content_name_buffer = + ip_address_get_buffer(&content_name, ip_prefix_.family); + uint8_t mask = 0xFF >> (ip_prefix_.len % 8); + mask = ~mask; + + res += (ip_prefix_buffer[ip_prefix_.len] & mask) == + (content_name_buffer[ip_prefix_.len] & mask); + } + + return res; +} + +int Prefix::contains(const core::Name &content_name) const { + return contains(content_name.toIpAddress().address); +} + +Name Prefix::getName() const { + std::string s(getNetwork()); + return Name(s); +} + +/* + * Mask is used to apply the components to a content name that belong to this + * prefix + */ +Name Prefix::getName(const core::Name &mask, const core::Name &components, + const core::Name &content_name) const { + if (ip_prefix_.family != mask.getAddressFamily() || + ip_prefix_.family != components.getAddressFamily() || + ip_prefix_.family != content_name.getAddressFamily()) + throw errors::RuntimeException( + "Prefix, mask, components and content name are not of the same address " + "family"); + + ip_address_t mask_ip = mask.toIpAddress().address; + ip_address_t component_ip = components.toIpAddress().address; + ip_address_t name_ip = content_name.toIpAddress().address; + const u8 *mask_ip_buffer = ip_address_get_buffer(&mask_ip, ip_prefix_.family); + const u8 *component_ip_buffer = + ip_address_get_buffer(&component_ip, ip_prefix_.family); + u8 *name_ip_buffer = + const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family)); + + int addr_len = ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN; + + for (int i = 0; i < addr_len; i++) { + if (mask_ip_buffer[i]) { + name_ip_buffer[i] = component_ip_buffer[i] & mask_ip_buffer[i]; + } + } + + if (this->contains(name_ip)) + throw errors::RuntimeException("Mask overrides the prefix"); + return Name(ip_prefix_.family, (uint8_t *)&name_ip); +} + +Name Prefix::getRandomName() const { + ip_address_t name_ip = ip_prefix_.address; + u8 *name_ip_buffer = + const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family)); + + int addr_len = + (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN * 8 : IPV4_ADDR_LEN * 8) - + ip_prefix_.len; + + size_t size = (size_t)ceil((float)addr_len / 8.0); + uint8_t buffer[size]; + + RAND_bytes(buffer, size); + + int j = 0; + for (uint8_t i = (uint8_t)ceil((float)ip_prefix_.len / 8.0); + i < (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN); + i++) { + name_ip_buffer[i] = buffer[j]; + j++; + } + + return Name(ip_prefix_.family, (uint8_t *)&name_ip); +} + +/* + * Map a name in a different name prefix to this name prefix + */ +Name Prefix::mapName(const core::Name &content_name) const { + if (ip_prefix_.family != content_name.getAddressFamily()) + throw errors::RuntimeException( + "Prefix content name are not of the same address " + "family"); + + ip_address_t name_ip = content_name.toIpAddress().address; + const u8 *ip_prefix_buffer = + ip_address_get_buffer(&(ip_prefix_.address), ip_prefix_.family); + u8 *name_ip_buffer = + const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family)); + + memcpy(name_ip_buffer, ip_prefix_buffer, ip_prefix_.len / 8); + + if (ip_prefix_.len != (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN_BITS + : IPV4_ADDR_LEN_BITS)) { + uint8_t mask = 0xFF >> (ip_prefix_.len % 8); + name_ip_buffer[ip_prefix_.len / 8 + 1] = + (name_ip_buffer[ip_prefix_.len / 8 + 1] & mask) | + (ip_prefix_buffer[ip_prefix_.len / 8 + 1] & ~mask); + } + + return Name(ip_prefix_.family, (uint8_t *)&name_ip); +} + +Prefix &Prefix::setNetwork(std::string &network) { + if (!inet_pton(AF_INET6, network.c_str(), ip_prefix_.address.v6.buffer)) { + throw errors::RuntimeException("The network name is not valid."); + } + + return *this; +} + +Name Prefix::makeRandomName() const { + srand((unsigned int)time(nullptr)); + + if (ip_prefix_.family == AF_INET6) { + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution<uint32_t> idis( + 0, std::numeric_limits<uint32_t>::max()); + uint64_t random_number = idis(eng); + + uint32_t hash_size_bits = IPV6_ADDR_LEN_BITS - ip_prefix_.len; + uint64_t ip_address[2]; + memcpy(ip_address, ip_prefix_.address.v6.buffer, sizeof(uint64_t)); + memcpy(ip_address + 1, ip_prefix_.address.v6.buffer + 8, sizeof(uint64_t)); + std::string network(IPV6_ADDR_LEN * 3, 0); + + // Let's do the magic ;) + int shift_size = hash_size_bits > sizeof(random_number) * 8 + ? sizeof(random_number) * 8 + : hash_size_bits; + + ip_address[1] >>= shift_size; + ip_address[1] <<= shift_size; + + ip_address[1] |= random_number >> (sizeof(uint64_t) * 8 - shift_size); + + if (!inet_ntop(ip_prefix_.family, ip_address, (char *)network.c_str(), + IPV6_ADDR_LEN * 3)) { + throw errors::RuntimeException( + "Impossible to retrieve network from ip address."); + } + + return Name(network); + } + + return Name(); +} + +bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length, + int family) { + // First check the family + if (family != AF_INET6 && family != AF_INET) { + return false; + } + + int max_addr_len_bits = + family == AF_INET6 ? IPV6_ADDR_LEN_BITS : IPV4_ADDR_LEN_BITS; + + if (prefix_length > max_addr_len_bits) { + return false; + } + + return true; +} + +ip_prefix_t &Prefix::toIpPrefixStruct() { return ip_prefix_; } + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/raw_socket_connector.cc b/libtransport/src/core/raw_socket_connector.cc new file mode 100644 index 000000000..4d780959b --- /dev/null +++ b/libtransport/src/core/raw_socket_connector.cc @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/utils/conversions.h> +#include <hicn/transport/utils/log.h> + +#include <core/raw_socket_connector.h> + +#include <net/if.h> +#include <netdb.h> +#include <stdio.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/socket.h> + +#define MY_DEST_MAC0 0x0a +#define MY_DEST_MAC1 0x7b +#define MY_DEST_MAC2 0x7c +#define MY_DEST_MAC3 0x1c +#define MY_DEST_MAC4 0x4a +#define MY_DEST_MAC5 0x14 + +namespace transport { + +namespace core { + +RawSocketConnector::RawSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_, raw_protocol(PF_PACKET, SOCK_RAW)), + // resolver_(io_service_), + timer_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + data_available_(false), + app_name_(app_name) { + memset(&link_layer_address_, 0, sizeof(link_layer_address_)); +} + +RawSocketConnector::~RawSocketConnector() {} + +void RawSocketConnector::connect(const std::string &interface_name, + const std::string &mac_address_str) { + state_ = ConnectorState::CONNECTING; + memset(ðernet_header_, 0, sizeof(ethernet_header_)); + struct ifreq ifr; + struct ifreq if_mac; + uint8_t mac_address[6]; + + utils::convertStringToMacAddress(mac_address_str, mac_address); + + // Get interface mac address + int fd = static_cast<int>(socket_.native_handle()); + + /* Get the index of the interface to send on */ + memset(&ifr, 0, sizeof(struct ifreq)); + strncpy(ifr.ifr_name, interface_name.c_str(), interface_name.size()); + + // if (ioctl(fd, SIOCGIFINDEX, &if_idx) < 0) { + // perror("SIOCGIFINDEX"); + // } + + /* Get the MAC address of the interface to send on */ + memset(&if_mac, 0, sizeof(struct ifreq)); + strncpy(if_mac.ifr_name, interface_name.c_str(), interface_name.size()); + if (ioctl(fd, SIOCGIFHWADDR, &if_mac) < 0) { + perror("SIOCGIFHWADDR"); + throw errors::RuntimeException("Interface does not exist"); + } + + /* Ethernet header */ + for (int i = 0; i < 6; i++) { + ethernet_header_.ether_shost[i] = + ((uint8_t *)&if_mac.ifr_hwaddr.sa_data)[i]; + ethernet_header_.ether_dhost[i] = mac_address[i]; + } + + /* Ethertype field */ + ethernet_header_.ether_type = htons(ETH_P_IPV6); + + strcpy(ifr.ifr_name, interface_name.c_str()); + + if (0 == ioctl(fd, SIOCGIFHWADDR, &ifr)) { + memcpy(link_layer_address_.sll_addr, ifr.ifr_hwaddr.sa_data, 6); + } + + // memset(&ifr, 0, sizeof(ifr)); + // ioctl(fd, SIOCGIFFLAGS, &ifr); + // ifr.ifr_flags |= IFF_PROMISC; + // ioctl(fd, SIOCSIFFLAGS, &ifr); + + link_layer_address_.sll_family = AF_PACKET; + link_layer_address_.sll_protocol = htons(ETH_P_ALL); + link_layer_address_.sll_ifindex = if_nametoindex(interface_name.c_str()); + link_layer_address_.sll_hatype = 1; + link_layer_address_.sll_halen = 6; + + // startConnectionTimer(); + doConnect(); + doRecvPacket(); +} + +void RawSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + if (packet_sent != 0) { + socket_.async_send( + asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); + } else { + if (state_ == ConnectorState::CONNECTED) { + socket_.send(asio::buffer(packet, len)); + } + } +} + +void RawSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doSendPacket(); + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + } + }); +} + +void RawSocketConnector::close() { + io_service_.post([this]() { socket_.close(); }); +} + +void RawSocketConnector::doSendPacket() { + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_.async_send( + std::move(array), + [this /*, packet*/](std::error_code ec, std::size_t bytes_transferred) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doSendPacket(); + } + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + } + }); +} + +void RawSocketConnector::doRecvPacket() { + read_msg_ = getPacket(); + socket_.async_receive( + asio::buffer(read_msg_->writableData(), packet_size), + [this](std::error_code ec, std::size_t bytes_transferred) mutable { + if (!ec) { + // Ignore packets that are not for us + uint8_t *dst_mac_address = const_cast<uint8_t *>(read_msg_->data()); + if (!std::memcmp(dst_mac_address, ethernet_header_.ether_shost, + ETHER_ADDR_LEN)) { + read_msg_->append(bytes_transferred); + read_msg_->trimStart(sizeof(struct ether_header)); + receive_callback_(std::move(read_msg_)); + } + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + } + doRecvPacket(); + }); +} + +void RawSocketConnector::doConnect() { + state_ = ConnectorState::CONNECTED; + socket_.bind(raw_endpoint(&link_layer_address_, sizeof(link_layer_address_))); +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/raw_socket_connector.h b/libtransport/src/core/raw_socket_connector.h new file mode 100644 index 000000000..1d4e9cb39 --- /dev/null +++ b/libtransport/src/core/raw_socket_connector.h @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/name.h> + +#include <core/connector.h> + +#include <linux/if_packet.h> +#include <net/ethernet.h> +#include <sys/socket.h> +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <deque> + +namespace transport { + +namespace core { + +using asio::generic::raw_protocol; +using raw_endpoint = asio::generic::basic_endpoint<raw_protocol>; + +class RawSocketConnector : public Connector { + public: + RawSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~RawSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void connect(const std::string &interface_name, + const std::string &mac_address_str); + + private: + void doConnect(); + + void doRecvPacket(); + + void doSendPacket(); + + private: + asio::io_service &io_service_; + raw_protocol::socket socket_; + + struct ether_header ethernet_header_; + + struct sockaddr_ll link_layer_address_; + + asio::steady_timer timer_; + + utils::ObjectPool<utils::MemBuf>::Ptr read_msg_; + + bool data_available_; + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/raw_socket_interface.cc b/libtransport/src/core/raw_socket_interface.cc new file mode 100644 index 000000000..7ee2a844d --- /dev/null +++ b/libtransport/src/core/raw_socket_interface.cc @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/utils/linux.h> + +#include <core/raw_socket_interface.h> + +#include <fstream> + +namespace transport { + +namespace core { + +static std::string config_folder_path = "/etc/transport/interface.conf.d"; + +RawSocketInterface::RawSocketInterface(RawSocketConnector &connector) + : ForwarderInterface<RawSocketInterface, RawSocketConnector>(connector) {} + +RawSocketInterface::~RawSocketInterface() {} + +void RawSocketInterface::connect(bool is_consumer) { + std::string complete_filename = + config_folder_path + std::string("/") + output_interface_; + + std::ifstream is(complete_filename); + std::string interface; + + if (is) { + is >> remote_mac_address_; + } + + // Get interface ip address + struct sockaddr_in6 address = {0}; + utils::retrieveInterfaceAddress(output_interface_, &address); + + std::memcpy(&inet6_address_.v6.as_u8, &address.sin6_addr, + sizeof(address.sin6_addr)); + connector_.connect(output_interface_, remote_mac_address_); +} + +void RawSocketInterface::registerRoute(Prefix &prefix) { return; } + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/raw_socket_interface.h b/libtransport/src/core/raw_socket_interface.h new file mode 100644 index 000000000..c06d14637 --- /dev/null +++ b/libtransport/src/core/raw_socket_interface.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/prefix.h> + +#include <core/forwarder_interface.h> +#include <core/raw_socket_connector.h> + +#include <atomic> +#include <deque> + +namespace transport { + +namespace core { + +class RawSocketInterface + : public ForwarderInterface<RawSocketInterface, RawSocketConnector> { + public: + typedef RawSocketConnector ConnectorType; + + RawSocketInterface(RawSocketConnector &connector); + + ~RawSocketInterface(); + + void connect(bool is_consumer); + + void registerRoute(Prefix &prefix); + + std::uint16_t getMtu() { return interface_mtu; } + + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + TRANSPORT_ALWAYS_INLINE void closeConnection(){}; + + private: + static constexpr std::uint16_t interface_mtu = 1500; + std::string remote_mac_address_; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/tcp_socket_connector.cc b/libtransport/src/core/tcp_socket_connector.cc new file mode 100644 index 000000000..58df8fb08 --- /dev/null +++ b/libtransport/src/core/tcp_socket_connector.cc @@ -0,0 +1,282 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/tcp_socket_connector.h> +#ifdef _WIN32 +#include <portability/win_portability.h> +#endif + +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/object_pool.h> + +#include <thread> +#include <vector> + +namespace transport { + +namespace core { + +namespace { +class NetworkMessage { + public: + static constexpr std::size_t fixed_header_length = 10; + + static std::size_t decodeHeader(const uint8_t *packet) { + // General checks + // CCNX Control packet format + uint8_t first_byte = packet[0]; + uint8_t ip_format = (packet[0] & 0xf0) >> 4; + + if (TRANSPORT_EXPECT_FALSE(first_byte == 102)) { + // Get packet length + return 44; + } else if (TRANSPORT_EXPECT_TRUE(ip_format == 6 || ip_format == 4)) { + Packet::Format format = Packet::getFormatFromBuffer(packet); + return Packet::getHeaderSizeFromBuffer(format, packet) + + Packet::getPayloadSizeFromBuffer(format, packet); + } + + return 0; + } +}; +} // namespace + +TcpSocketConnector::TcpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + timer_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_reconnection_(false), + data_available_(false), + app_name_(app_name) {} + +TcpSocketConnector::~TcpSocketConnector() {} + +void TcpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + state_ = ConnectorState::CONNECTING; + doConnect(); +} + +void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + if (packet_sent != 0) { + asio::async_write(socket_, asio::buffer(packet, len), + [packet_sent](std::error_code ec, + std::size_t /*length*/) { packet_sent(); }); + } else { + if (state_ == ConnectorState::CONNECTED) { + asio::write(socket_, asio::buffer(packet, len)); + } + } +} + +void TcpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void TcpSocketConnector::close() { + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + } +} + +void TcpSocketConnector::doWrite() { +#if 1 + auto array = std::vector<asio::const_buffer>(); + std::vector<Packet::MemBufPtr> packet_store(packet_store_size); + uint8_t i = 0; + + utils::MemBuf *packet = nullptr; + const utils::MemBuf *current = nullptr; + // Send vectors of 32 packets + while (!output_buffer_.empty() && i++ < packet_store_size) { + packet_store[i] = output_buffer_.front(); + output_buffer_.pop_front(); + packet = packet_store[i].get(); + current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + } +#else + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); +#endif + + asio::async_write( + socket_, std::move(array), + [this, packet_store = std::move(packet_store)](std::error_code ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::doReadBody(std::size_t body_length) { + asio::async_read( + socket_, asio::buffer(read_msg_->writableTail(), body_length), + asio::transfer_exactly(body_length), + [this](std::error_code ec, std::size_t length) { + read_msg_->append(length); + if (TRANSPORT_EXPECT_TRUE(!ec)) { + receive_callback_(std::move(read_msg_)); + doReadHeader(); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::doReadHeader() { + read_msg_ = getPacket(); + asio::async_read( + socket_, + asio::buffer(read_msg_->writableData(), + NetworkMessage::fixed_header_length), + asio::transfer_exactly(NetworkMessage::fixed_header_length), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(NetworkMessage::fixed_header_length); + std::size_t body_length = 0; + if ((body_length = NetworkMessage::decodeHeader(read_msg_->data())) > + 0) { + doReadBody(body_length - length); + } else { + TRANSPORT_LOGE("Decoding error. Ignoring packet."); + } + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void TcpSocketConnector::tryReconnect() { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + startConnectionTimer(); + doConnect(); + }); + } +} + +void TcpSocketConnector::doConnect() { + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + state_ = ConnectorState::CONNECTED; + asio::ip::tcp::no_delay noDelayOption(true); + socket_.set_option(noDelayOption); + doReadHeader(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + TRANSPORT_LOGI("Connection recovered!\n"); + on_reconnect_callback_(); + } + } else { + doConnect(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); +} + +bool TcpSocketConnector::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} + +void TcpSocketConnector::startConnectionTimer() { + timer_.expires_from_now(std::chrono::seconds(60)); + timer_.async_wait(std::bind(&TcpSocketConnector::handleDeadline, this, + std::placeholders::_1)); +} + +void TcpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/tcp_socket_connector.h b/libtransport/src/core/tcp_socket_connector.h new file mode 100644 index 000000000..c57123e9f --- /dev/null +++ b/libtransport/src/core/tcp_socket_connector.h @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/utils/branch_prediction.h> + +#include <core/connector.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <deque> + +namespace transport { +namespace core { + +using asio::ip::tcp; + +class TcpSocketConnector : public Connector { + static constexpr uint16_t packet_store_size = 32; + + public: + TcpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~TcpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(std::size_t body_length); + + void doWrite(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::tcp::socket socket_; + asio::ip::tcp::resolver resolver_; + asio::ip::tcp::resolver::iterator endpoint_iterator_; + asio::steady_timer timer_; + + utils::ObjectPool<utils::MemBuf>::Ptr read_msg_; + + bool is_reconnection_; + bool data_available_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/test/CMakeLists.txt b/libtransport/src/core/test/CMakeLists.txt new file mode 100644 index 000000000..48c50e9b0 --- /dev/null +++ b/libtransport/src/core/test/CMakeLists.txt @@ -0,0 +1,10 @@ +# Enable gcov output for the tests +add_definitions(--coverage) +set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage") + +set(TestsExpectedToPass + test_core_manifest) + +foreach(test ${TestsExpectedToPass}) + AddTest(${test}) +endforeach()
\ No newline at end of file diff --git a/libtransport/src/core/test/test_core_manifest.cc b/libtransport/src/core/test/test_core_manifest.cc new file mode 100644 index 000000000..58563d8f9 --- /dev/null +++ b/libtransport/src/core/test/test_core_manifest.cc @@ -0,0 +1,296 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gtest/gtest.h> + +#include "../manifest_format_fixed.h" +#include "../manifest_inline.h" + +#include <test.h> +#include <random> +#include <vector> + +namespace transport { + +namespace core { + +namespace { +// The fixture for testing class Foo. +class ManifestTest : public ::testing::Test { + protected: + using ContentObjectManifest = ManifestInline<ContentObject, Fixed>; + + ManifestTest() : name_("b001::123|321"), manifest1_(name_) { + // You can do set-up work for each test here. + } + + virtual ~ManifestTest() { + // You can do clean-up work that doesn't throw exceptions here. + } + + // If the constructor and destructor are not enough for setting up + // and cleaning up each test, you can define the following methods: + + virtual void SetUp() { + // Code here will be called immediately after the constructor (right + // before each test). + } + + virtual void TearDown() { + // Code here will be called immediately after each test (right + // before the destructor). + } + + Name name_; + ContentObjectManifest manifest1_; + + std::vector<uint8_t> manifest_payload = { + 0x11, 0x11, 0x01, 0x00, 0xb0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xde, 0xad // , 0x00, 0x00, + // 0x00, 0x45, 0xa3, + // 0xd1, 0xf2, 0x2b, + // 0x94, 0x41, 0x22, + // 0xc9, 0x00, 0x00, + // 0x00, 0x44, 0xa3, + // 0xd1, 0xf2, 0x2b, + // 0x94, 0x41, 0x22, + // 0xc8 + }; +}; + +} // namespace + +TEST_F(ManifestTest, ManifestCreate) { + ContentObjectManifest manifest2(name_); + ContentObjectManifest manifest3 = manifest2; + + EXPECT_EQ(manifest1_, manifest2); + EXPECT_EQ(manifest1_, manifest3); +} + +TEST_F(ManifestTest, ManifestCreateFromBase) { + ContentObject content_object(name_); + content_object.setPayload(manifest_payload.data(), manifest_payload.size()); + ContentObjectManifest manifest(std::move(content_object)); + + auto manifest4 = ContentObjectManifest::createManifest( + name_, core::ManifestVersion::VERSION_1, + core::ManifestType::INLINE_MANIFEST, HashAlgorithm::SHA_256, true, + core::Name("b001::dead"), + core::NextSegmentCalculationStrategy::INCREMENTAL, 128); + + manifest4->encode(); + manifest4->dump(); + manifest.dump(); + + EXPECT_EQ(manifest1_, manifest); + // EXPECT_EQ(manifest1_, manifest3); +} + +TEST_F(ManifestTest, SetLastManifest) { + manifest1_.clear(); + + manifest1_.setFinalManifest(true); + manifest1_.encode(); + manifest1_.decode(); + bool fcn = manifest1_.isFinalManifest(); + + ASSERT_TRUE(fcn); +} + +TEST_F(ManifestTest, SetManifestType) { + manifest1_.clear(); + + ManifestType type1 = ManifestType::INLINE_MANIFEST; + ManifestType type2 = ManifestType::FLIC_MANIFEST; + + manifest1_.setManifestType(type1); + manifest1_.encode(); + manifest1_.decode(); + ManifestType type_returned1 = manifest1_.getManifestType(); + + manifest1_.clear(); + + manifest1_.setManifestType(type2); + manifest1_.encode(); + manifest1_.decode(); + ManifestType type_returned2 = manifest1_.getManifestType(); + + ASSERT_EQ(type1, type_returned1); + ASSERT_EQ(type2, type_returned2); +} + +TEST_F(ManifestTest, SetHashAlgorithm) { + manifest1_.clear(); + + HashAlgorithm hash1 = HashAlgorithm::SHA_512; + HashAlgorithm hash2 = HashAlgorithm::CRC32C; + HashAlgorithm hash3 = HashAlgorithm::SHA_256; + + manifest1_.setHashAlgorithm(hash1); + manifest1_.encode(); + manifest1_.decode(); + HashAlgorithm type_returned1 = manifest1_.getHashAlgorithm(); + + manifest1_.clear(); + + manifest1_.setHashAlgorithm(hash2); + manifest1_.encode(); + manifest1_.decode(); + HashAlgorithm type_returned2 = manifest1_.getHashAlgorithm(); + + manifest1_.clear(); + + manifest1_.setHashAlgorithm(hash3); + manifest1_.encode(); + manifest1_.decode(); + HashAlgorithm type_returned3 = manifest1_.getHashAlgorithm(); + + ASSERT_EQ(hash1, type_returned1); + ASSERT_EQ(hash2, type_returned2); + ASSERT_EQ(hash3, type_returned3); +} + +TEST_F(ManifestTest, SetNextSegmentCalculationStrategy) { + manifest1_.clear(); + + NextSegmentCalculationStrategy strategy1 = + NextSegmentCalculationStrategy::INCREMENTAL; + + manifest1_.setNextSegmentCalculationStrategy(strategy1); + manifest1_.encode(); + manifest1_.decode(); + NextSegmentCalculationStrategy type_returned1 = + manifest1_.getNextSegmentCalculationStrategy(); + + ASSERT_EQ(strategy1, type_returned1); +} + +TEST_F(ManifestTest, SetBaseName) { + manifest1_.clear(); + + core::Name base_name("b001::dead"); + manifest1_.setBaseName(base_name); + manifest1_.encode(); + manifest1_.decode(); + core::Name ret_name = manifest1_.getBaseName(); + + ASSERT_EQ(base_name, ret_name); +} + +TEST_F(ManifestTest, SetSuffixList) { + manifest1_.clear(); + + core::Name base_name("b001::dead"); + + using random_bytes_engine = + std::independent_bits_engine<std::default_random_engine, CHAR_BIT, + unsigned char>; + random_bytes_engine rbe; + + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution<uint64_t> idis( + 0, std::numeric_limits<uint32_t>::max()); + + auto entries = new std::pair<uint32_t, utils::CryptoHash>[3]; + uint32_t suffixes[3]; + std::vector<unsigned char> data[3]; + + for (int i = 0; i < 3; i++) { + data[i].resize(32); + std::generate(std::begin(data[i]), std::end(data[i]), std::ref(rbe)); + suffixes[i] = idis(eng); + entries[i] = std::make_pair( + suffixes[i], utils::CryptoHash(data[i].data(), data[i].size(), + utils::CryptoHashType::SHA_256)); + manifest1_.addSuffixHash(entries[i].first, entries[i].second); + } + + manifest1_.setBaseName(base_name); + + manifest1_.encode(); + manifest1_.decode(); + + core::Name ret_name = manifest1_.getBaseName(); + + // auto & hash_list = manifest1_.getSuffixHashList(); + + bool cond; + int i = 0; + + // for (auto & item : manifest1_.getSuffixList()) { + // auto hash = manifest1_.getHash(suffixes[i]); + // cond = utils::CryptoHash::compareBinaryDigest(hash, + // entries[i].second.getDigest<uint8_t>().data(), + // entries[i].second.getType()); + // ASSERT_TRUE(cond); + // i++; + // } + + ASSERT_EQ(base_name, ret_name); + + delete[] entries; +} + +TEST_F(ManifestTest, EstimateSize) { + manifest1_.clear(); + + HashAlgorithm hash1 = HashAlgorithm::SHA_256; + NextSegmentCalculationStrategy strategy1 = + NextSegmentCalculationStrategy::INCREMENTAL; + ManifestType type1 = ManifestType::INLINE_MANIFEST; + core::Name base_name1("b001:abcd:fede:baba:cece:d0d0:face:dead"); + + manifest1_.setFinalManifest(true); + manifest1_.setBaseName(base_name1); + manifest1_.setNextSegmentCalculationStrategy(strategy1); + manifest1_.setHashAlgorithm(hash1); + manifest1_.setManifestType(type1); + + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution<uint64_t> idis( + 0, std::numeric_limits<uint64_t>::max()); + + using random_bytes_engine = + std::independent_bits_engine<std::default_random_engine, CHAR_BIT, + unsigned char>; + random_bytes_engine rbe; + + while (manifest1_.estimateManifestSize(1) < 1440) { + uint32_t suffix = static_cast<std::uint32_t>(idis(eng)); + std::vector<unsigned char> data(32); + std::generate(std::begin(data), std::end(data), std::ref(rbe)); + auto hash = utils::CryptoHash(data.data(), data.size(), + utils::CryptoHashType::SHA_256); + manifest1_.addSuffixHash(suffix, hash); + } + + manifest1_.encode(); + manifest1_.decode(); + + manifest1_.dump(); + + ASSERT_GT(manifest1_.estimateManifestSize(), 0); + ASSERT_LT(manifest1_.estimateManifestSize(), 1500); +} + +} // namespace core + +} // namespace transport + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
\ No newline at end of file diff --git a/libtransport/src/core/udp_socket_connector.cc b/libtransport/src/core/udp_socket_connector.cc new file mode 100644 index 000000000..ec59c2e64 --- /dev/null +++ b/libtransport/src/core/udp_socket_connector.cc @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include <portability/win_portability.h> +#endif + +#include <hicn/transport/errors/errors.h> +#include <hicn/transport/utils/log.h> +#include <hicn/transport/utils/object_pool.h> + +#include <core/udp_socket_connector.h> + +#include <thread> +#include <vector> + +namespace transport { + +namespace core { + +UdpSocketConnector::UdpSocketConnector( + PacketReceivedCallback &&receive_callback, + OnReconnect &&on_reconnect_callback, asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(on_reconnect_callback)), + io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + connection_timer_(io_service_), + read_msg_(packet_pool_.makePtr(nullptr)), + is_reconnection_(false), + data_available_(false), + app_name_(app_name) {} + +UdpSocketConnector::~UdpSocketConnector() {} + +void UdpSocketConnector::connect(std::string ip_address, std::string port) { + endpoint_iterator_ = resolver_.resolve( + {ip_address, port, asio::ip::resolver_query_base::numeric_service}); + + state_ = ConnectorState::CONNECTING; + doConnect(); +} + +void UdpSocketConnector::send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent) { + if (packet_sent != 0) { + socket_.async_send( + asio::buffer(packet, len), + [packet_sent](std::error_code ec, std::size_t /*length*/) { + packet_sent(); + }); + } else { + if (state_ == ConnectorState::CONNECTED) { + try { + socket_.send(asio::buffer(packet, len)); + } catch (std::system_error &err) { + TRANSPORT_LOGE( + "Sending of disconnect message to forwarder failed. Reason: %s", + err.what()); + } + } + } +} + +void UdpSocketConnector::send(const Packet::MemBufPtr &packet) { + io_service_.post([this, packet]() { + bool write_in_progress = !output_buffer_.empty(); + output_buffer_.push_back(std::move(packet)); + if (TRANSPORT_EXPECT_TRUE(state_ == ConnectorState::CONNECTED)) { + if (!write_in_progress) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void UdpSocketConnector::close() { + if (io_service_.stopped()) { + doClose(); + } else { + io_service_.dispatch(std::bind(&UdpSocketConnector::doClose, this)); + } +} + +void UdpSocketConnector::doClose() { + if (state_ != ConnectorState::CLOSED) { + state_ = ConnectorState::CLOSED; + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + } +} + +void UdpSocketConnector::doWrite() { + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_.async_send(std::move(array), [this](std::error_code ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doWrite(); + } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::doRead() { + read_msg_ = getPacket(); + socket_.async_receive( + asio::buffer(read_msg_->writableData(), Connector::packet_size), + [this](std::error_code ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + read_msg_->append(length); + receive_callback_(std::move(read_msg_)); + doRead(); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str()); + tryReconnect(); + } + }); +} + +void UdpSocketConnector::tryReconnect() { + if (state_ == ConnectorState::CONNECTED) { + TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n"); + state_ = ConnectorState::CONNECTING; + is_reconnection_ = true; + io_service_.post([this]() { + if (socket_.is_open()) { + socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both); + socket_.close(); + } + + doConnect(); + startConnectionTimer(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + }); + } +} + +void UdpSocketConnector::doConnect() { + asio::async_connect( + socket_, endpoint_iterator_, + [this](std::error_code ec, udp::resolver::iterator) { + if (!ec) { + connection_timer_.cancel(); + state_ = ConnectorState::CONNECTED; + doRead(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + } + + on_reconnect_callback_(); + } else { + doConnect(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + }); +} + +bool UdpSocketConnector::checkConnected() { + return state_ == ConnectorState::CONNECTED; +} + +void UdpSocketConnector::startConnectionTimer() { + connection_timer_.expires_from_now(std::chrono::seconds(60)); + connection_timer_.async_wait(std::bind(&UdpSocketConnector::handleDeadline, + this, std::placeholders::_1)); +} + +void UdpSocketConnector::handleDeadline(const std::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + TRANSPORT_LOGE("Error connecting. Is the forwarder running?\n"); + io_service_.stop(); + }); + } +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/udp_socket_connector.h b/libtransport/src/core/udp_socket_connector.h new file mode 100644 index 000000000..5fdb6aeec --- /dev/null +++ b/libtransport/src/core/udp_socket_connector.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/name.h> +#include <hicn/transport/utils/branch_prediction.h> + +#include <core/connector.h> + +#include <asio.hpp> +#include <asio/steady_timer.hpp> +#include <deque> + +namespace transport { +namespace core { + +using asio::ip::udp; + +class UdpSocketConnector : public Connector { + public: + UdpSocketConnector(PacketReceivedCallback &&receive_callback, + OnReconnect &&reconnect_callback, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~UdpSocketConnector() override; + + void send(const Packet::MemBufPtr &packet) override; + + void send(const uint8_t *packet, std::size_t len, + const PacketSentCallback &packet_sent = 0) override; + + void close() override; + + void connect(std::string ip_address = "127.0.0.1", std::string port = "9695"); + + private: + void doConnect(); + + void doRead(); + + void doWrite(); + + void doClose(); + + bool checkConnected(); + + private: + void handleDeadline(const std::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + asio::io_service &io_service_; + asio::ip::udp::socket socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::steady_timer connection_timer_; + + utils::ObjectPool<utils::MemBuf>::Ptr read_msg_; + + bool is_reconnection_; + bool data_available_; + + std::string app_name_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/vpp_forwarder_interface.cc b/libtransport/src/core/vpp_forwarder_interface.cc new file mode 100644 index 000000000..7b4298592 --- /dev/null +++ b/libtransport/src/core/vpp_forwarder_interface.cc @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/config.h> + +#ifdef __vpp__ + +#include <core/hicn_vapi.h> +#include <core/memif_vapi.h> +#include <core/vpp_forwarder_interface.h> + +extern "C" { +#include <memif/libmemif.h> +}; + +typedef enum { MASTER = 0, SLAVE = 1 } memif_role_t; + +#define MEMIF_DEFAULT_RING_SIZE 2048 +#define MEMIF_DEFAULT_RX_QUEUES 1 +#define MEMIF_DEFAULT_TX_QUEUES 1 +#define MEMIF_DEFAULT_BUFFER_SIZE 2048 + +namespace transport { + +namespace core { + +std::mutex VPPForwarderInterface::global_lock_; + +VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector) + : ForwarderInterface<VPPForwarderInterface, MemifConnector>(connector), + sw_if_index_(~0), + face_id1_(~0), + face_id2_(~0), + is_consumer_(false) {} + +VPPForwarderInterface::~VPPForwarderInterface() {} + +/** + * @brief Create a memif interface in the local VPP forwarder. + */ +uint32_t VPPForwarderInterface::getMemifConfiguration() { + memif_create_params_t input_params = {0}; + + int ret = + memif_vapi_get_next_memif_id(VPPForwarderInterface::sock_, &memif_id_); + + if (ret < 0) { + throw errors::RuntimeException( + "Error getting next memif id. Could not create memif interface."); + } + + input_params.id = memif_id_; + input_params.role = memif_role_t::MASTER; + input_params.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP; + input_params.rx_queues = MEMIF_DEFAULT_RX_QUEUES; + input_params.tx_queues = MEMIF_DEFAULT_TX_QUEUES; + input_params.ring_size = MEMIF_DEFAULT_RING_SIZE; + input_params.buffer_size = MEMIF_DEFAULT_BUFFER_SIZE; + + memif_output_params_t output_params = {0}; + + ret = memif_vapi_create_memif(VPPForwarderInterface::sock_, &input_params, + &output_params); + + if (ret < 0) { + throw errors::RuntimeException( + "Error creating memif interface in the local VPP forwarder."); + } + + return output_params.sw_if_index; +} + +void VPPForwarderInterface::consumerConnection() { + hicn_consumer_input_params input = {0}; + hicn_consumer_output_params output = {0}; + ip_address_t ip4_address; + ip_address_t ip6_address; + + output.src4 = &ip4_address; + output.src6 = &ip6_address; + input.swif = sw_if_index_; + + int ret = hicn_vapi_register_cons_app(VPPForwarderInterface::sock_, &input, + &output); + + if (ret < 0) { + throw errors::RuntimeException(hicn_vapi_get_error_string(ret)); + } + + face_id1_ = output.face_id1; + face_id2_ = output.face_id2; + + std::memcpy(inet_address_.v4.as_u8, output.src4->v4.as_u8, IPV4_ADDR_LEN); + + std::memcpy(inet6_address_.v6.as_u8, output.src6->v6.as_u8, IPV6_ADDR_LEN); +} + +void VPPForwarderInterface::producerConnection() { + // Producer connection will be set when we set the first route. +} + +void VPPForwarderInterface::connect(bool is_consumer) { + std::lock_guard<std::mutex> connection_lock(global_lock_); + + vapi_connect_safe(&sock_, 0); + + sw_if_index_ = getMemifConfiguration(); + + is_consumer_ = is_consumer; + if (is_consumer_) { + consumerConnection(); + } + + connector_.connect(memif_id_, 0); +} + +void VPPForwarderInterface::registerRoute(Prefix &prefix) { + ip_prefix_t &addr = prefix.toIpPrefixStruct(); + + ip_prefix_t producer_prefix; + ip_address_t producer_locator; + + if (face_id1_ == uint32_t(~0)) { + hicn_producer_input_params input; + std::memset(&input, 0, sizeof(input)); + + hicn_producer_output_params output; + std::memset(&output, 0, sizeof(output)); + + input.prefix = &producer_prefix; + output.prod_addr = &producer_locator; + + // Here we have to ask to the actual connector what is the + // memif_id, since this function should be called after the + // memif creation. + input.swif = sw_if_index_; + input.prefix->address = addr.address; + input.prefix->family = addr.family; + input.prefix->len = addr.len; + input.cs_reserved = content_store_reserved_; + + int ret = hicn_vapi_register_prod_app(VPPForwarderInterface::sock_, &input, + &output); + + if (ret < 0) { + throw errors::RuntimeException(hicn_vapi_get_error_string(ret)); + } + + inet6_address_ = *output.prod_addr; + + face_id1_ = output.face_id; + } else { + hicn_producer_set_route_params params; + params.prefix = &producer_prefix; + params.prefix->address = addr.address; + params.prefix->family = addr.family; + params.prefix->len = addr.len; + params.face_id = face_id1_; + + int ret = hicn_vapi_register_route(VPPForwarderInterface::sock_, ¶ms); + + if (ret < 0) { + throw errors::RuntimeException(hicn_vapi_get_error_string(ret)); + } + } +} + +void VPPForwarderInterface::closeConnection() { + if (VPPForwarderInterface::sock_) { + connector_.close(); + + if (is_consumer_) { + hicn_del_face_app_input_params params; + params.face_id = face_id1_; + hicn_vapi_face_cons_del(VPPForwarderInterface::sock_, ¶ms); + params.face_id = face_id2_; + hicn_vapi_face_cons_del(VPPForwarderInterface::sock_, ¶ms); + } else { + hicn_del_face_app_input_params params; + params.face_id = face_id1_; + hicn_vapi_face_prod_del(VPPForwarderInterface::sock_, ¶ms); + } + + if (sw_if_index_ != uint32_t(~0)) { + int ret = + memif_vapi_delete_memif(VPPForwarderInterface::sock_, sw_if_index_); + if (ret < 0) { + TRANSPORT_LOGE("Error deleting memif with sw idx %u.", sw_if_index_); + } + } + + vapi_disconnect_safe(); + VPPForwarderInterface::sock_ = nullptr; + } +} + +} // namespace core + +} // namespace transport + +#endif diff --git a/libtransport/src/core/vpp_forwarder_interface.h b/libtransport/src/core/vpp_forwarder_interface.h new file mode 100644 index 000000000..eb759f8bc --- /dev/null +++ b/libtransport/src/core/vpp_forwarder_interface.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> + +#ifdef __vpp__ + +#include <hicn/transport/core/prefix.h> + +#ifdef always_inline +#undef always_inline +#endif +extern "C" { +#include <vapi/vapi_safe.h> +}; + +#include <core/forwarder_interface.h> +#include <core/memif_connector.h> + +#include <deque> + +namespace transport { + +namespace core { + +class VPPForwarderInterface + : public ForwarderInterface<VPPForwarderInterface, MemifConnector> { + static constexpr std::uint16_t interface_mtu = 1500; + + public: + VPPForwarderInterface(MemifConnector &connector); + + typedef MemifConnector ConnectorType; + + ~VPPForwarderInterface(); + + void connect(bool is_consumer); + + void registerRoute(Prefix &prefix); + + TRANSPORT_ALWAYS_INLINE std::uint16_t getMtu() { return interface_mtu; } + + TRANSPORT_ALWAYS_INLINE static bool isControlMessageImpl( + const uint8_t *message) { + return false; + } + + TRANSPORT_ALWAYS_INLINE void processControlMessageReplyImpl( + Packet::MemBufPtr &&packet_buffer) {} + + void closeConnection(); + + private: + uint32_t getMemifConfiguration(); + + void consumerConnection(); + + void producerConnection(); + + uint32_t memif_id_; + uint32_t sw_if_index_; + // A consumer socket in vpp has two faces (ipv4 and ipv6) + uint32_t face_id1_; + uint32_t face_id2_; + bool is_consumer_; + vapi_ctx_t sock_; + static std::mutex global_lock_; +}; + +} // namespace core + +} // namespace transport + +#endif |