diff options
Diffstat (limited to 'libtransport/src/core')
34 files changed, 3313 insertions, 1762 deletions
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt index e442bb863..777772a04 100644 --- a/libtransport/src/core/CMakeLists.txt +++ b/libtransport/src/core/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Copyright (c) 2021 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: @@ -14,14 +14,18 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/facade.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest.h - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h ${CMAKE_CURRENT_SOURCE_DIR}/manifest_format.h ${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.h ${CMAKE_CURRENT_SOURCE_DIR}/portal.h ${CMAKE_CURRENT_SOURCE_DIR}/errors.h ${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.h + ${CMAKE_CURRENT_SOURCE_DIR}/global_id_counter.h ${CMAKE_CURRENT_SOURCE_DIR}/local_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/global_workers.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_connector.h + ${CMAKE_CURRENT_SOURCE_DIR}/udp_listener.h + ${CMAKE_CURRENT_SOURCE_DIR}/global_module_manager.h ) list(APPEND SOURCE_FILES @@ -35,8 +39,22 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/portal.cc ${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.cc ${CMAKE_CURRENT_SOURCE_DIR}/io_module.cc - ${CMAKE_CURRENT_SOURCE_DIR}/local_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_connector.cc + ${CMAKE_CURRENT_SOURCE_DIR}/udp_listener.cc + ${CMAKE_CURRENT_SOURCE_DIR}/constructor.cc ) +if (NOT ${CMAKE_SYSTEM_NAME} MATCHES Android) + if (UNIX AND NOT APPLE) + list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc + ) + + list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h + ) + endif() +endif() + set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) -set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
\ No newline at end of file +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) diff --git a/libtransport/src/core/constructor.cc b/libtransport/src/core/constructor.cc new file mode 100644 index 000000000..0c7f0dfa8 --- /dev/null +++ b/libtransport/src/core/constructor.cc @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/global_configuration.h> +#include <core/global_module_manager.h> +#include <core/global_workers.h> +#include <hicn/transport/core/global_object_pool.h> + +namespace transport { +namespace core { + +void __attribute__((constructor)) libtransportInit() { + // First the global module manager is initialized + GlobalModuleManager::getInstance(); + // Then the packet allocator is initialized + PacketManager<>::getInstance(); + // Then the global configuration is initialized + GlobalConfiguration::getInstance(); + // Then the global workers are initialized + GlobalWorkers::getInstance(); +} + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc index 411494fdf..7ed6c57ab 100644 --- a/libtransport/src/core/content_object.cc +++ b/libtransport/src/core/content_object.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,6 +15,7 @@ #include <hicn/transport/core/content_object.h> #include <hicn/transport/errors/errors.h> +#include <hicn/transport/portability/endianess.h> #include <hicn/transport/utils/branch_prediction.h> extern "C" { @@ -34,30 +35,29 @@ namespace core { ContentObject::ContentObject(const Name &name, Packet::Format format, std::size_t additional_header_size) - : Packet(format, additional_header_size) { - if (TRANSPORT_EXPECT_FALSE( - hicn_data_set_name(format, packet_start_, &name.name_) < 0)) { + : Packet(HICN_PACKET_TYPE_DATA, format, additional_header_size) { + if (TRANSPORT_EXPECT_FALSE(hicn_data_set_name(&pkbuf_, &name.name_) < 0)) { throw errors::RuntimeException("Error filling the packet name."); } - if (TRANSPORT_EXPECT_FALSE(hicn_data_get_name(format_, packet_start_, - name_.getStructReference()) < - 0)) { + if (TRANSPORT_EXPECT_FALSE( + hicn_data_get_name(&pkbuf_, &name_.getStructReference()) < 0)) { throw errors::MalformedPacketException(); } } -#ifdef __ANDROID__ -ContentObject::ContentObject(hicn_format_t format, +ContentObject::ContentObject(hicn_packet_format_t format, std::size_t additional_header_size) - : ContentObject(Name("0::0|0"), format, additional_header_size) {} + : ContentObject( +#ifdef __ANDROID__ + Name("0::0|0"), #else -ContentObject::ContentObject(hicn_format_t format, - std::size_t additional_header_size) - : ContentObject(Packet::base_name, format, additional_header_size) {} + Packet::base_name, #endif + format, additional_header_size) { +} -ContentObject::ContentObject(const Name &name, hicn_format_t format, +ContentObject::ContentObject(const Name &name, hicn_packet_format_t format, std::size_t additional_header_size, const uint8_t *payload, std::size_t size) : ContentObject(name, format, additional_header_size) { @@ -80,9 +80,8 @@ ContentObject::~ContentObject() {} const Name &ContentObject::getName() const { if (!name_) { - if (hicn_data_get_name(format_, packet_start_, - (hicn_name_t *)name_.getConstStructReference()) < - 0) { + if (hicn_data_get_name( + &pkbuf_, (hicn_name_t *)&name_.getConstStructReference()) < 0) { throw errors::MalformedPacketException(); } } @@ -93,31 +92,27 @@ const Name &ContentObject::getName() const { Name &ContentObject::getWritableName() { return const_cast<Name &>(getName()); } void ContentObject::setName(const Name &name) { - if (hicn_data_set_name(format_, packet_start_, - name.getConstStructReference()) < 0) { + if (hicn_data_set_name(&pkbuf_, &name.getConstStructReference()) < 0) { throw errors::RuntimeException("Error setting content object name."); } - if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) < - 0) { + if (hicn_data_get_name(&pkbuf_, &name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } } -uint32_t ContentObject::getPathLabel() const { - uint32_t path_label; - if (hicn_data_get_path_label(packet_start_, &path_label) < 0) { +hicn_path_label_t ContentObject::getPathLabel() const { + hicn_path_label_t path_label; + if (hicn_data_get_path_label(&pkbuf_, &path_label) < 0) { throw errors::RuntimeException( "Error retrieving the path label from content object"); } - return ntohl(path_label); + return path_label; } -ContentObject &ContentObject::setPathLabel(uint32_t path_label) { - path_label = htonl(path_label); - if (hicn_data_set_path_label((hicn_header_t *)packet_start_, path_label) < - 0) { +ContentObject &ContentObject::setPathLabel(hicn_path_label_t path_label) { + if (hicn_data_set_path_label(&pkbuf_, path_label) < 0) { throw errors::RuntimeException( "Error setting the path label from content object"); } @@ -125,18 +120,18 @@ ContentObject &ContentObject::setPathLabel(uint32_t path_label) { return *this; } -void ContentObject::setLocator(const ip_address_t &ip_address) { - if (hicn_data_set_locator(format_, packet_start_, &ip_address) < 0) { +void ContentObject::setLocator(const hicn_ip_address_t &ip_address) { + if (hicn_data_set_locator(&pkbuf_, &ip_address) < 0) { throw errors::RuntimeException("Error setting content object locator"); } return; } -ip_address_t ContentObject::getLocator() const { - ip_address_t ip; +hicn_ip_address_t ContentObject::getLocator() const { + hicn_ip_address_t ip; - if (hicn_data_get_locator(format_, packet_start_, &ip) < 0) { + if (hicn_data_get_locator(&pkbuf_, &ip) < 0) { throw errors::RuntimeException("Error getting content object locator."); } @@ -144,7 +139,7 @@ ip_address_t ContentObject::getLocator() const { } void ContentObject::setLifetime(uint32_t lifetime) { - if (hicn_data_set_expiry_time(packet_start_, lifetime) < 0) { + if (hicn_data_set_expiry_time(&pkbuf_, lifetime) < 0) { throw errors::MalformedPacketException(); } } @@ -152,7 +147,7 @@ void ContentObject::setLifetime(uint32_t lifetime) { uint32_t ContentObject::getLifetime() const { uint32_t lifetime = 0; - if (hicn_data_get_expiry_time(packet_start_, &lifetime) < 0) { + if (hicn_data_get_expiry_time(&pkbuf_, &lifetime) < 0) { throw errors::MalformedPacketException(); } @@ -160,13 +155,29 @@ uint32_t ContentObject::getLifetime() const { } void ContentObject::resetForHash() { - if (hicn_data_reset_for_hash( - format_, reinterpret_cast<hicn_header_t *>(packet_start_)) < 0) { + if (hicn_data_reset_for_hash(&pkbuf_) < 0) { throw errors::RuntimeException( "Error resetting content object fields for hash computation."); } } +bool ContentObject::isLast() const { + int is_last = 0; + if (hicn_data_is_last(&pkbuf_, &is_last) < 0) { + throw errors::RuntimeException( + "Impossible to get last data flag from packet header."); + } + + return is_last; +} + +void ContentObject::setLast() { + if (hicn_data_set_last(&pkbuf_) < 0) { + throw errors::RuntimeException( + "Impossible to set last data flag to packet header."); + } +} + } // end namespace core } // end namespace transport diff --git a/libtransport/src/core/errors.cc b/libtransport/src/core/errors.cc index 82647a60b..68fd7bf38 100644 --- a/libtransport/src/core/errors.cc +++ b/libtransport/src/core/errors.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -39,6 +39,15 @@ std::string core_category_impl::message(int ev) const { case core_error::configuration_not_applied: { return "Configuration was not applied due to wrong parameters."; } + case core_error::send_failed: { + return "Error sending data to socket."; + } + case core_error::send_buffer_allocation_failed: { + return "Error allocating buffers to send data."; + } + case core_error::receive_failed: { + return "Error receiving data from socket."; + } default: { return "Unknown core error"; } diff --git a/libtransport/src/core/errors.h b/libtransport/src/core/errors.h index a46f1dbcd..4532e6dc5 100644 --- a/libtransport/src/core/errors.h +++ b/libtransport/src/core/errors.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -36,7 +36,10 @@ const std::error_category& core_category(); enum class core_error { success = 0, configuration_parse_failed, - configuration_not_applied + configuration_not_applied, + send_failed, + send_buffer_allocation_failed, + receive_failed }; /** diff --git a/libtransport/src/core/facade.h b/libtransport/src/core/facade.h index 199081271..77c1d16d2 100644 --- a/libtransport/src/core/facade.h +++ b/libtransport/src/core/facade.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,16 +15,16 @@ #pragma once +#include <core/manifest.h> #include <core/manifest_format_fixed.h> -#include <core/manifest_inline.h> #include <core/portal.h> namespace transport { namespace core { -using ContentObjectManifest = core::ManifestInline<ContentObject, Fixed>; -using InterestManifest = core::ManifestInline<Interest, Fixed>; +using ContentObjectManifest = core::Manifest<Fixed>; +using InterestManifest = core::Manifest<Fixed>; } // namespace core diff --git a/libtransport/src/core/global_configuration.cc b/libtransport/src/core/global_configuration.cc index 9da37c2fa..f53e1f0e2 100644 --- a/libtransport/src/core/global_configuration.cc +++ b/libtransport/src/core/global_configuration.cc @@ -68,6 +68,7 @@ void GlobalConfiguration::parseConfiguration(const std::string& path) { // variable comes first. std::unique_lock<std::mutex> lck(cp_mtx_); if (const char* env_c = std::getenv(GlobalConfiguration::conf_file)) { + conf_file_path_ = env_c; parseTransportConfig(env_c); } else if (!path.empty()) { conf_file_path_ = path; diff --git a/libtransport/src/core/manifest.cc b/libtransport/src/core/global_id_counter.h index 3f890f3d0..0a67b76d5 100644 --- a/libtransport/src/core/manifest.cc +++ b/libtransport/src/core/global_id_counter.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,21 +13,27 @@ * limitations under the License. */ -#include <hicn/transport/core/manifest.h> +#pragma once -namespace transport { +#include <hicn/transport/utils/singleton.h> -namespace core { +#include <atomic> +#include <mutex> -std::string ManifestEncoding::manifest_type = std::string("manifest_type"); +namespace transport { -std::map<ManifestType, std::string> ManifestEncoding::manifest_types = { - {FINAL_CHUNK_NUMBER, "FinalChunkNumber"}, {NAME_LIST, "NameList"}}; +namespace core { -std::string ManifestEncoding::final_chunk_number = - std::string("final_chunk_number"); -std::string ManifestEncoding::content_name = std::string("content_name"); +template <typename T = uint64_t> +class GlobalCounter : public utils::Singleton<GlobalCounter<T>> { + public: + friend class utils::Singleton<GlobalCounter>; + T getNext() { return counter_++; } -} // end namespace core + private: + GlobalCounter() : counter_(0) {} + std::atomic<T> counter_; +}; -} // end namespace transport
\ No newline at end of file +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/global_module_manager.h b/libtransport/src/core/global_module_manager.h new file mode 100644 index 000000000..c9d272cdb --- /dev/null +++ b/libtransport/src/core/global_module_manager.h @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <glog/logging.h> +#include <hicn/transport/utils/singleton.h> + +#ifndef _WIN32 +#include <dlfcn.h> +#endif + +#include <atomic> +#include <iostream> +#include <mutex> +#include <unordered_map> + +namespace transport { +namespace core { + +class GlobalModuleManager : public utils::Singleton<GlobalModuleManager> { + public: + friend class utils::Singleton<GlobalModuleManager>; + + ~GlobalModuleManager() { + for (const auto &[key, value] : modules_) { + unload(value); + } + } + + void *loadModule(const std::string &module_name) { + void *handle = nullptr; + const char *error = nullptr; + + // Lock + std::unique_lock lck(mtx_); + + auto it = modules_.find(module_name); + if (it != modules_.end()) { + return it->second; + } + + // open module + handle = dlopen(module_name.c_str(), RTLD_NOW); + if (!handle) { + if ((error = dlerror()) != nullptr) { + LOG(ERROR) << error; + } + return nullptr; + } + + auto ret = modules_.try_emplace(module_name, handle); + DCHECK(ret.second); + + return handle; + } + + void unload(void *handle) { + // destroy object and close module + dlclose(handle); + } + + bool unloadModule(const std::string &module_name) { + // Lock + std::unique_lock lck(mtx_); + auto it = modules_.find(module_name); + if (it != modules_.end()) { + unload(it->second); + return true; + } + + return false; + } + + private: + GlobalModuleManager() = default; + std::mutex mtx_; + std::unordered_map<std::string, void *> modules_; +}; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/global_workers.h b/libtransport/src/core/global_workers.h new file mode 100644 index 000000000..c5d794ef2 --- /dev/null +++ b/libtransport/src/core/global_workers.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/singleton.h> +#include <hicn/transport/utils/thread_pool.h> + +#include <atomic> +#include <mutex> + +namespace transport { +namespace core { + +class GlobalWorkers : public utils::Singleton<GlobalWorkers> { + public: + friend class utils::Singleton<GlobalWorkers>; + + ::utils::EventThread& getWorker() { + return thread_pool_.getWorker(counter_++ % thread_pool_.getNThreads()); + } + + auto& getWorkers() { return thread_pool_.getWorkers(); } + + private: + GlobalWorkers() : counter_(0), thread_pool_() {} + + std::atomic_uint16_t counter_; + ::utils::ThreadPool thread_pool_; +}; + +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc index 9d868ced0..c3eb7c379 100644 --- a/libtransport/src/core/interest.cc +++ b/libtransport/src/core/interest.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -21,7 +21,9 @@ extern "C" { #ifndef _WIN32 TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat") #endif +#include <hicn/base.h> #include <hicn/hicn.h> +#include <hicn/interest_manifest.h> } #include <cstring> @@ -33,29 +35,30 @@ namespace core { Interest::Interest(const Name &interest_name, Packet::Format format, std::size_t additional_header_size) - : Packet(format, additional_header_size) { - if (hicn_interest_set_name(format_, packet_start_, - interest_name.getConstStructReference()) < 0) { + : Packet(HICN_PACKET_TYPE_INTEREST, format, additional_header_size) { + if (hicn_interest_set_name(&pkbuf_, + &interest_name.getConstStructReference()) < 0) { throw errors::MalformedPacketException(); } - if (hicn_interest_get_name(format_, packet_start_, - name_.getStructReference()) < 0) { + if (hicn_interest_get_name(&pkbuf_, &name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } } +Interest::Interest(hicn_packet_format_t format, + std::size_t additional_header_size) + : Interest( #ifdef __ANDROID__ -Interest::Interest(hicn_format_t format, std::size_t additional_header_size) - : Interest(Name("0::0|0"), format, additional_header_size) {} + Name("0::0|0"), #else -Interest::Interest(hicn_format_t format, std::size_t additional_header_size) - : Interest(base_name, format, additional_header_size) {} + base_name, #endif + format, additional_header_size) { +} Interest::Interest(MemBuf &&buffer) : Packet(std::move(buffer)) { - if (hicn_interest_get_name(format_, packet_start_, - name_.getStructReference()) < 0) { + if (hicn_interest_get_name(&pkbuf_, &name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } } @@ -73,15 +76,13 @@ Interest &Interest::operator=(const Interest &other) { return (Interest &)Packet::operator=(other); } -Interest::~Interest() {} +Interest::~Interest() = default; const Name &Interest::getName() const { - if (!name_) { - if (hicn_interest_get_name(format_, packet_start_, - (hicn_name_t *)name_.getConstStructReference()) < - 0) { - throw errors::MalformedPacketException(); - } + if (!name_ && + hicn_interest_get_name( + &pkbuf_, (hicn_name_t *)&name_.getConstStructReference()) < 0) { + throw errors::MalformedPacketException(); } return name_; @@ -90,29 +91,27 @@ const Name &Interest::getName() const { Name &Interest::getWritableName() { return const_cast<Name &>(getName()); } void Interest::setName(const Name &name) { - if (hicn_interest_set_name(format_, packet_start_, - name.getConstStructReference()) < 0) { + if (hicn_interest_set_name(&pkbuf_, &name.getConstStructReference()) < 0) { throw errors::RuntimeException("Error setting interest name."); } - if (hicn_interest_get_name(format_, packet_start_, - name_.getStructReference()) < 0) { + if (hicn_interest_get_name(&pkbuf_, &name_.getStructReference()) < 0) { throw errors::MalformedPacketException(); } } -void Interest::setLocator(const ip_address_t &ip_address) { - if (hicn_interest_set_locator(format_, packet_start_, &ip_address) < 0) { +void Interest::setLocator(const hicn_ip_address_t &ip_address) { + if (hicn_interest_set_locator(&pkbuf_, &ip_address) < 0) { throw errors::RuntimeException("Error setting interest locator."); } return; } -ip_address_t Interest::getLocator() const { - ip_address_t ip; +hicn_ip_address_t Interest::getLocator() const { + hicn_ip_address_t ip; - if (hicn_interest_get_locator(format_, packet_start_, &ip) < 0) { + if (hicn_interest_get_locator(&pkbuf_, &ip) < 0) { throw errors::RuntimeException("Error getting interest locator."); } @@ -120,7 +119,7 @@ ip_address_t Interest::getLocator() const { } void Interest::setLifetime(uint32_t lifetime) { - if (hicn_interest_set_lifetime(packet_start_, lifetime) < 0) { + if (hicn_interest_set_lifetime(&pkbuf_, lifetime) < 0) { throw errors::MalformedPacketException(); } } @@ -128,7 +127,7 @@ void Interest::setLifetime(uint32_t lifetime) { uint32_t Interest::getLifetime() const { uint32_t lifetime = 0; - if (hicn_interest_get_lifetime(packet_start_, &lifetime) < 0) { + if (hicn_interest_get_lifetime(&pkbuf_, &lifetime) < 0) { throw errors::MalformedPacketException(); } @@ -136,14 +135,20 @@ uint32_t Interest::getLifetime() const { } void Interest::resetForHash() { - if (hicn_interest_reset_for_hash( - format_, reinterpret_cast<hicn_header_t *>(packet_start_)) < 0) { + if (hicn_interest_reset_for_hash(&pkbuf_) < 0) { throw errors::RuntimeException( "Error resetting interest fields for hash computation."); } + + // Reset request bitmap in manifest + if (hasManifest()) { + auto int_manifest_header = + (interest_manifest_header_t *)(writableData() + headerSize()); + memset(int_manifest_header->request_bitmap, 0, BITMAP_SIZE * sizeof(u32)); + } } -bool Interest::hasManifest() { +bool Interest::hasManifest() const { return (getPayloadType() == PayloadType::MANIFEST); } @@ -162,19 +167,43 @@ void Interest::encodeSuffixes() { // We assume interest does not hold signature for the moment. auto int_manifest_header = - (InterestManifestHeader *)(writableData() + headerSize()); - int_manifest_header->n_suffixes = suffix_set_.size(); + (interest_manifest_header_t *)(writableData() + headerSize()); + + interest_manifest_init(int_manifest_header, name_.getSuffix()); + for (auto it = suffix_set_.begin(); it != suffix_set_.end(); it++) { + interest_manifest_add_suffix(int_manifest_header, *it); + } + std::size_t additional_length = - sizeof(InterestManifestHeader) + + sizeof(interest_manifest_header_t) + int_manifest_header->n_suffixes * sizeof(uint32_t); - uint32_t *suffix = (uint32_t *)(int_manifest_header + 1); - for (auto it = suffix_set_.begin(); it != suffix_set_.end(); it++, suffix++) { - *suffix = *it; + append(additional_length); + updateLength(); +} + +void Interest::serializeSuffixes() { + if (!hasManifest()) { + return; } - append(additional_length); - updateLength(additional_length); + // We assume interest does not hold signature for the moment. + auto int_manifest_header = + (interest_manifest_header_t *)(writableData() + headerSize()); + // Serialize interest manifest + interest_manifest_serialize(int_manifest_header); +} + +void Interest::deserializeSuffixes() { + if (!hasManifest()) { + return; + } + + // We assume interest does not hold signature for the moment. + auto int_manifest_header = + (interest_manifest_header_t *)(writableData() + headerSize()); + // Serialize interest manifest + interest_manifest_deserialize(int_manifest_header); } uint32_t *Interest::firstSuffix() { @@ -182,7 +211,7 @@ uint32_t *Interest::firstSuffix() { return nullptr; } - auto ret = (InterestManifestHeader *)(writableData() + headerSize()); + auto ret = (interest_manifest_header_t *)(writableData() + headerSize()); ret += 1; return (uint32_t *)ret; @@ -193,11 +222,39 @@ uint32_t Interest::numberOfSuffixes() { return 0; } - auto header = (InterestManifestHeader *)(writableData() + headerSize()); + auto header = (interest_manifest_header_t *)(writableData() + headerSize()); return header->n_suffixes; } +hicn_uword *Interest::getRequestBitmap() { + if (!hasManifest()) return nullptr; + + auto header = (interest_manifest_header_t *)(writableData() + headerSize()); + return header->request_bitmap; +} + +interest_manifest_header_t *Interest::getIntManifestHeader() { + if (!hasManifest()) return nullptr; + + auto header = (interest_manifest_header_t *)(writableData() + headerSize()); + return header; +}; + +void Interest::setRequestBitmap(const uint32_t *request_bitmap) { + if (!hasManifest()) return; + + auto header = (interest_manifest_header_t *)(writableData() + headerSize()); + memcpy(header->request_bitmap, request_bitmap, + BITMAP_SIZE * sizeof(uint32_t)); +} + +bool Interest::isValid() { + if (!hasManifest()) return true; + auto header = (interest_manifest_header_t *)(writableData() + headerSize()); + return interest_manifest_is_valid(header, payloadSize()); +} + } // end namespace core } // end namespace transport diff --git a/libtransport/src/core/io_module.cc b/libtransport/src/core/io_module.cc index a751eabf5..0fdb735c4 100644 --- a/libtransport/src/core/io_module.cc +++ b/libtransport/src/core/io_module.cc @@ -16,11 +16,14 @@ #ifndef _WIN32 #include <dlfcn.h> #endif +#include <core/global_module_manager.h> #include <glog/logging.h> #include <hicn/transport/core/io_module.h> +#include <iostream> + #ifdef ANDROID -#include <io_modules/udp/hicn_forwarder_module.h> +#include <io_modules/hicn-light/hicn_forwarder_module.h> #elif _WIN32 #include <hicn/util/windows/windows_utils.h> #endif @@ -36,53 +39,28 @@ IoModule *IoModule::load(const char *module_name) { #ifdef ANDROID return new HicnForwarderModule(); #else - void *handle = 0; - IoModule *module = 0; - IoModule *(*creator)(void) = 0; - const char *error = 0; + IoModule *iomodule = nullptr; + IoModule *(*creator)(void) = nullptr; + const char *error = nullptr; - // open module - handle = dlopen(module_name, RTLD_NOW); - if (!handle) { - if ((error = dlerror()) != 0) { - LOG(ERROR) << error; - } - return 0; - } + auto handle = GlobalModuleManager::getInstance().loadModule(module_name); // get factory method creator = (IoModule * (*)(void)) dlsym(handle, "create_module"); if (!creator) { - if ((error = dlerror()) != 0) { - LOG(ERROR) << error; - return 0; + if ((error = dlerror()) != nullptr) { + LOG(ERROR) << error << ": " << module_name; } + + return nullptr; } // create object and return it - module = (*creator)(); - module->handle_ = handle; - - return module; -#endif -} - -bool IoModule::unload(IoModule *module) { - if (!module) { - return false; - } + iomodule = (*creator)(); -#ifdef ANDROID - delete module; -#else - // destroy object and close module - void *handle = module->handle_; - delete module; - dlclose(handle); + return iomodule; #endif - - return true; } } // namespace core -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/core/local_connector.cc b/libtransport/src/core/local_connector.cc deleted file mode 100644 index 50dadc677..000000000 --- a/libtransport/src/core/local_connector.cc +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <core/local_connector.h> -#include <glog/logging.h> -#include <hicn/transport/core/asio_wrapper.h> -#include <hicn/transport/core/content_object.h> -#include <hicn/transport/core/interest.h> -#include <hicn/transport/errors/not_implemented_exception.h> - -namespace transport { -namespace core { - -LocalConnector::~LocalConnector() {} - -void LocalConnector::close() { state_ = State::CLOSED; } - -void LocalConnector::send(Packet &packet) { - if (!isConnected()) { - return; - } - - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending packet to local socket."; - io_service_.get().post([this, p{packet.shared_from_this()}]() mutable { - receive_callback_(this, *p, std::make_error_code(std::errc(0))); - }); -} - -void LocalConnector::send(const uint8_t *packet, std::size_t len) { - throw errors::NotImplementedException(); -} - -} // namespace core -} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/local_connector.h b/libtransport/src/core/local_connector.h index 0e2d8f676..bf64b71d6 100644 --- a/libtransport/src/core/local_connector.h +++ b/libtransport/src/core/local_connector.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2020 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,10 +15,11 @@ #pragma once +#include <core/errors.h> #include <hicn/transport/core/asio_wrapper.h> #include <hicn/transport/core/connector.h> #include <hicn/transport/core/global_object_pool.h> -#include <hicn/transport/utils/move_wrapper.h> +#include <hicn/transport/errors/not_implemented_exception.h> #include <hicn/transport/utils/shared_ptr_utils.h> #include <io_modules/forwarder/errors.h> @@ -34,19 +35,48 @@ class LocalConnector : public Connector { OnClose &&close_callback, OnReconnect &&on_reconnect) : Connector(receive_callback, packet_sent, close_callback, on_reconnect), io_service_(io_service), - io_service_work_(io_service_.get()) { - state_ = State::CONNECTED; - } + io_service_work_(io_service_.get()) {} - ~LocalConnector() override; + ~LocalConnector() override = default; - void send(Packet &packet) override; + auto shared_from_this() { return utils::shared_from(this); } - void send(const uint8_t *packet, std::size_t len) override; + void send(Packet &packet) override { send(packet.shared_from_this()); } - void close() override; + void send(const utils::MemBuf::Ptr &buffer) override { + throw errors::NotImplementedException(); + } - auto shared_from_this() { return utils::shared_from(this); } + void receive(const std::vector<utils::MemBuf::Ptr> &buffers) override { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending packet to local socket."; + std::weak_ptr<LocalConnector> self = shared_from_this(); + io_service_.get().post([self, _buffers{std::move(buffers)}]() mutable { + if (auto ptr = self.lock()) { + ptr->receive_callback_(ptr.get(), _buffers, + make_error_code(core_error::success)); + } + }); + } + + void reconnect() override { + state_ = State::CONNECTED; + std::weak_ptr<LocalConnector> self = shared_from_this(); + io_service_.get().post([self]() { + if (auto ptr = self.lock()) { + ptr->on_reconnect_callback_(ptr.get(), + make_error_code(core_error::success)); + } + }); + } + + void close() override { + std::weak_ptr<LocalConnector> self = shared_from_this(); + io_service_.get().post([self]() mutable { + if (auto ptr = self.lock()) { + ptr->on_close_callback_(ptr.get()); + } + }); + } private: std::reference_wrapper<asio::io_service> io_service_; diff --git a/libtransport/src/core/manifest.h b/libtransport/src/core/manifest.h index 9b25ebd67..40832bb6b 100644 --- a/libtransport/src/core/manifest.h +++ b/libtransport/src/core/manifest.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -16,138 +16,73 @@ #pragma once #include <core/manifest_format.h> -#include <hicn/transport/core/content_object.h> -#include <hicn/transport/core/name.h> - -#include <set> +#include <glog/logging.h> +#include <hicn/transport/auth/verifier.h> +#include <hicn/transport/core/global_object_pool.h> +#include <hicn/transport/core/packet.h> namespace transport { - namespace core { -using typename core::Name; -using typename core::Packet; -using typename core::PayloadType; - -template <typename Base, typename FormatTraits, typename ManifestImpl> -class Manifest : public Base { - static_assert(std::is_base_of<Packet, Base>::value, - "Base must inherit from packet!"); - +template <typename FormatTraits> +class Manifest : public FormatTraits::Encoder, public FormatTraits::Decoder { public: - // core::ContentObjectManifest::Ptr + using Ptr = std::shared_ptr<Manifest>; using Encoder = typename FormatTraits::Encoder; using Decoder = typename FormatTraits::Decoder; - Manifest(std::size_t signature_size = 0) - : Base(HF_INET6_TCP_AH, signature_size), - encoder_(*this, signature_size), - decoder_(*this) { - Base::setPayloadType(PayloadType::MANIFEST); - } - - Manifest(const core::Name &name, std::size_t signature_size = 0) - : Base(name, HF_INET6_TCP_AH, signature_size), - encoder_(*this, signature_size), - decoder_(*this) { - Base::setPayloadType(PayloadType::MANIFEST); - } + using Hash = typename FormatTraits::Hash; + using HashType = typename FormatTraits::HashType; + using Suffix = typename FormatTraits::Suffix; + using SuffixList = typename FormatTraits::SuffixList; + using HashEntry = std::pair<auth::CryptoHashType, std::vector<uint8_t>>; - template <typename T> - Manifest(T &&base) - : Base(std::forward<T &&>(base)), - encoder_(*this, 0, false), - decoder_(*this) { - Base::setPayloadType(PayloadType::MANIFEST); + Manifest(Packet::Ptr packet, bool clear = false) + : Encoder(packet, clear), Decoder(packet), packet_(packet) { + packet->setPayloadType(PayloadType::MANIFEST); } virtual ~Manifest() = default; - std::size_t estimateManifestSize(std::size_t additional_entries = 0) { - return static_cast<ManifestImpl &>(*this).estimateManifestSizeImpl( - additional_entries); - } + Packet::Ptr getPacket() const { return packet_; } - /* - * After the call to encode, users MUST call clear before adding data - * to the manifest. - */ - Manifest &encode() { return static_cast<ManifestImpl &>(*this).encodeImpl(); } - - Manifest &decode() { - Manifest::decoder_.decode(); - - manifest_type_ = decoder_.getManifestType(); - hash_algorithm_ = decoder_.getHashAlgorithm(); - is_last_ = decoder_.getIsFinalManifest(); - - return static_cast<ManifestImpl &>(*this).decodeImpl(); + void setHeaders(ManifestType type, uint8_t max_capacity, HashType hash_algo, + bool is_last, const Name &base_name) { + Encoder::setType(type); + Encoder::setMaxCapacity(max_capacity); + Encoder::setHashAlgorithm(hash_algo); + Encoder::setIsLast(is_last); + Encoder::setBaseName(base_name); } - static std::size_t getManifestHeaderSize() { - return Encoder::getManifestHeaderSize(); - } + auth::Verifier::SuffixMap getSuffixMap() const { + auth::Verifier::SuffixMap suffix_map; - static std::size_t getManifestEntrySize() { - return Encoder::getManifestEntrySize(); - } + HashType hash_algo = Decoder::getHashAlgorithm(); + SuffixList suffix_list = Decoder::getEntries(); - Manifest &setManifestType(ManifestType type) { - manifest_type_ = type; - encoder_.setManifestType(manifest_type_); - return *this; - } + for (auto it = suffix_list.begin(); it != suffix_list.end(); ++it) { + Hash hash(it->second, Hash::getSize(hash_algo), hash_algo); + suffix_map[it->first] = hash; + } - Manifest &setHashAlgorithm(auth::CryptoHashType hash_algorithm) { - hash_algorithm_ = hash_algorithm; - encoder_.setHashAlgorithm(hash_algorithm_); - return *this; + return suffix_map; } - auth::CryptoHashType getHashAlgorithm() { return hash_algorithm_; } - - ManifestType getManifestType() const { return manifest_type_; } - - bool isFinalManifest() const { return is_last_; } - - Manifest &setVersion(ManifestVersion version) { - encoder_.setVersion(version); - return *this; - } - - Manifest &setFinalBlockNumber(std::uint32_t final_block_number) { - encoder_.setFinalBlockNumber(final_block_number); - return *this; - } - - uint32_t getFinalBlockNumber() const { - return decoder_.getFinalBlockNumber(); - } - - ManifestVersion getVersion() const { return decoder_.getVersion(); } - - Manifest &setFinalManifest(bool is_final_manifest) { - encoder_.setIsFinalManifest(is_final_manifest); - is_last_ = is_final_manifest; - return *this; - } - - Manifest &clear() { - encoder_.clear(); - decoder_.clear(); - return *this; - } + static Manifest::Ptr createContentManifest(Packet::Format format, + const core::Name &manifest_name, + std::size_t signature_size) { + ContentObject::Ptr content_object = + core::PacketManager<>::getInstance().getPacket<ContentObject>( + format, signature_size); + content_object->setName(manifest_name); + return std::make_shared<Manifest>(content_object, true); + }; protected: - ManifestType manifest_type_; - auth::CryptoHashType hash_algorithm_; - bool is_last_; - - Encoder encoder_; - Decoder decoder_; + Packet::Ptr packet_; }; } // end namespace core - } // end namespace transport diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h index 90d221f5e..89412316a 100644 --- a/libtransport/src/core/manifest_format.h +++ b/libtransport/src/core/manifest_format.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -17,46 +17,40 @@ #include <hicn/transport/auth/crypto_hash.h> #include <hicn/transport/core/name.h> +#include <hicn/transport/interfaces/socket_options_keys.h> +#include <protocols/fec_utils.h> #include <cinttypes> #include <type_traits> #include <unordered_map> namespace transport { - namespace core { -enum class ManifestFields : uint8_t { - VERSION, - HASH_ALGORITHM, - SEGMENT_CALCULATION_STRATEGY, - FINAL_MANIFEST, - NAME_HASH_LIST, - BASE_NAME -}; - -enum class ManifestVersion : uint8_t { - VERSION_1 = 1, -}; - enum class ManifestType : uint8_t { INLINE_MANIFEST = 1, FINAL_CHUNK_NUMBER = 2, FLIC_MANIFEST = 3, }; -/** - * INCREMENTAL: Manifests will be received inline with the data with no specific - * assumption regarding the manifest capacity. Consumers can send interests - * using a +1 heuristic. - * - * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of - * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when - * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3 - */ -enum class NextSegmentCalculationStrategy : uint8_t { - INCREMENTAL = 1, - MANIFEST_CAPACITY_BASED = 2, +struct ParamsRTC { + std::uint64_t timestamp; + std::uint32_t prod_rate; + std::uint32_t prod_seg; + protocol::fec::FECType fec_type; + + bool operator==(const ParamsRTC &other) const { + return (timestamp == other.timestamp && prod_rate == other.prod_rate && + prod_seg == other.prod_seg && fec_type == other.fec_type); + } +}; + +struct ParamsBytestream { + std::uint32_t final_segment; + + bool operator==(const ParamsBytestream &other) const { + return (final_segment == other.final_segment); + } }; template <typename T> @@ -84,22 +78,25 @@ class ManifestEncoder { return static_cast<Implementation &>(*this).clearImpl(); } - ManifestEncoder &setManifestType(ManifestType type) { - return static_cast<Implementation &>(*this).setManifestTypeImpl(type); + bool isEncoded() const { + return static_cast<const Implementation &>(*this).isEncodedImpl(); } - ManifestEncoder &setHashAlgorithm(auth::CryptoHashType hash) { - return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash); + ManifestEncoder &setType(ManifestType type) { + return static_cast<Implementation &>(*this).setTypeImpl(type); } - ManifestEncoder &setFinalChunkNumber(uint32_t final_chunk) { - return static_cast<Implementation &>(*this).setFinalChunkImpl(final_chunk); + ManifestEncoder &setMaxCapacity(uint8_t max_capacity) { + return static_cast<Implementation &>(*this).setMaxCapacityImpl( + max_capacity); } - ManifestEncoder &setNextSegmentCalculationStrategy( - NextSegmentCalculationStrategy strategy) { - return static_cast<Implementation &>(*this) - .setNextSegmentCalculationStrategyImpl(strategy); + ManifestEncoder &setHashAlgorithm(auth::CryptoHashType hash) { + return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash); + } + + ManifestEncoder &setIsLast(bool is_last) { + return static_cast<Implementation &>(*this).setIsLastImpl(is_last); } template < @@ -110,40 +107,36 @@ class ManifestEncoder { return static_cast<Implementation &>(*this).setBaseNameImpl(name); } - template <typename Hash> - ManifestEncoder &addSuffixAndHash(uint32_t suffix, Hash &&hash) { - return static_cast<Implementation &>(*this).addSuffixAndHashImpl( - suffix, std::forward<Hash &&>(hash)); + ManifestEncoder &setParamsBytestream(const ParamsBytestream ¶ms) { + return static_cast<Implementation &>(*this).setParamsBytestreamImpl(params); } - ManifestEncoder &setIsFinalManifest(bool is_last) { - return static_cast<Implementation &>(*this).setIsFinalManifestImpl(is_last); + ManifestEncoder &setParamsRTC(const ParamsRTC ¶ms) { + return static_cast<Implementation &>(*this).setParamsRTCImpl(params); } - ManifestEncoder &setVersion(ManifestVersion version) { - return static_cast<Implementation &>(*this).setVersionImpl(version); - } - - std::size_t estimateSerializedLength(std::size_t number_of_entries) { - return static_cast<Implementation &>(*this).estimateSerializedLengthImpl( - number_of_entries); + template <typename Hash> + ManifestEncoder &addEntry(uint32_t suffix, Hash &&hash) { + return static_cast<Implementation &>(*this).addEntryImpl( + suffix, std::forward<Hash>(hash)); } - ManifestEncoder &update() { - return static_cast<Implementation &>(*this).updateImpl(); + ManifestEncoder &removeEntry(uint32_t suffix) { + return static_cast<Implementation &>(*this).removeEntryImpl(suffix); } - ManifestEncoder &setFinalBlockNumber(std::uint32_t final_block_number) { - return static_cast<Implementation &>(*this).setFinalBlockNumberImpl( - final_block_number); + std::size_t manifestHeaderSize() const { + return static_cast<const Implementation &>(*this).manifestHeaderSizeImpl(); } - static std::size_t getManifestHeaderSize() { - return Implementation::getManifestHeaderSizeImpl(); + std::size_t manifestPayloadSize(size_t additional_entries = 0) const { + return static_cast<const Implementation &>(*this).manifestPayloadSizeImpl( + additional_entries); } - static std::size_t getManifestEntrySize() { - return Implementation::getManifestEntrySizeImpl(); + std::size_t manifestSize(size_t additional_entries = 0) const { + return static_cast<const Implementation &>(*this).manifestSizeImpl( + additional_entries); } }; @@ -152,55 +145,68 @@ class ManifestDecoder { public: virtual ~ManifestDecoder() = default; + ManifestDecoder &decode() { + return static_cast<Implementation &>(*this).decodeImpl(); + } + ManifestDecoder &clear() { return static_cast<Implementation &>(*this).clearImpl(); } - void decode() { static_cast<Implementation &>(*this).decodeImpl(); } + bool isDecoded() const { + return static_cast<const Implementation &>(*this).isDecodedImpl(); + } - ManifestType getManifestType() const { - return static_cast<const Implementation &>(*this).getManifestTypeImpl(); + ManifestType getType() const { + return static_cast<const Implementation &>(*this).getTypeImpl(); } - auth::CryptoHashType getHashAlgorithm() const { - return static_cast<const Implementation &>(*this).getHashAlgorithmImpl(); + interface::ProductionProtocolAlgorithms getTransportType() const { + return static_cast<const Implementation &>(*this).getTransportTypeImpl(); + } + + uint8_t getMaxCapacity() const { + return static_cast<const Implementation &>(*this).getMaxCapacityImpl(); } - uint32_t getFinalChunkNumber() const { - return static_cast<const Implementation &>(*this).getFinalChunkImpl(); + auth::CryptoHashType getHashAlgorithm() const { + return static_cast<const Implementation &>(*this).getHashAlgorithmImpl(); } - NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() const { - return static_cast<const Implementation &>(*this) - .getNextSegmentCalculationStrategyImpl(); + bool getIsLast() const { + return static_cast<const Implementation &>(*this).getIsLastImpl(); } core::Name getBaseName() const { return static_cast<const Implementation &>(*this).getBaseNameImpl(); } - auto getSuffixHashList() { - return static_cast<Implementation &>(*this).getSuffixHashListImpl(); + ParamsBytestream getParamsBytestream() const { + return static_cast<const Implementation &>(*this).getParamsBytestreamImpl(); + } + + ParamsRTC getParamsRTC() const { + return static_cast<const Implementation &>(*this).getParamsRTCImpl(); } - bool getIsFinalManifest() const { - return static_cast<const Implementation &>(*this).getIsFinalManifestImpl(); + auto getEntries() const { + return static_cast<const Implementation &>(*this).getEntriesImpl(); } - ManifestVersion getVersion() const { - return static_cast<const Implementation &>(*this).getVersionImpl(); + std::size_t manifestHeaderSize() const { + return static_cast<const Implementation &>(*this).manifestHeaderSizeImpl(); } - std::size_t estimateSerializedLength(std::size_t number_of_entries) const { - return static_cast<const Implementation &>(*this) - .estimateSerializedLengthImpl(number_of_entries); + std::size_t manifestPayloadSize(size_t additional_entries = 0) const { + return static_cast<const Implementation &>(*this).manifestPayloadSizeImpl( + additional_entries); } - uint32_t getFinalBlockNumber() const { - return static_cast<const Implementation &>(*this).getFinalBlockNumberImpl(); + std::size_t manifestSize(size_t additional_entries = 0) const { + return static_cast<const Implementation &>(*this).manifestSizeImpl( + additional_entries); } }; } // namespace core - } // namespace transport diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc index 11d4a56cb..bda666c0c 100644 --- a/libtransport/src/core/manifest_format_fixed.cc +++ b/libtransport/src/core/manifest_format_fixed.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -18,211 +18,331 @@ #include <hicn/transport/utils/literals.h> namespace transport { - namespace core { -// TODO use preallocated pool of membufs -FixedManifestEncoder::FixedManifestEncoder(Packet &packet, - std::size_t signature_size, - bool clear) +// --------------------------------------------------------- +// FixedManifest +// --------------------------------------------------------- +size_t FixedManifest::manifestHeaderSize( + interface::ProductionProtocolAlgorithms transport_type) { + uint32_t params_size = 0; + + switch (transport_type) { + case interface::ProductionProtocolAlgorithms::BYTE_STREAM: + params_size = MANIFEST_PARAMS_BYTESTREAM_SIZE; + break; + case interface::ProductionProtocolAlgorithms::RTC_PROD: + params_size = MANIFEST_PARAMS_RTC_SIZE; + break; + default: + break; + } + + return MANIFEST_META_SIZE + MANIFEST_ENTRY_META_SIZE + params_size; +} + +size_t FixedManifest::manifestPayloadSize(size_t nb_entries) { + return nb_entries * MANIFEST_ENTRY_SIZE; +} + +// --------------------------------------------------------- +// FixedManifestEncoder +// --------------------------------------------------------- +FixedManifestEncoder::FixedManifestEncoder(Packet::Ptr packet, bool clear) : packet_(packet), - max_size_(Packet::default_mtu - packet_.headerSize()), - manifest_header_(reinterpret_cast<ManifestHeader *>( - packet_.writableData() + packet_.headerSize())), - manifest_entries_( - reinterpret_cast<ManifestEntry *>(manifest_header_ + 1)), - current_entry_(0), - signature_size_(signature_size) { + transport_type_(interface::ProductionProtocolAlgorithms::UNKNOWN), + encoded_(false) { + manifest_meta_ = reinterpret_cast<ManifestMeta *>(packet_->writableData() + + packet_->headerSize()); + manifest_entry_meta_ = + reinterpret_cast<ManifestEntryMeta *>(manifest_meta_ + 1); + if (clear) { - *manifest_header_ = {0}; + *manifest_meta_ = {0}; + *manifest_entry_meta_ = {0}; } } FixedManifestEncoder::~FixedManifestEncoder() {} FixedManifestEncoder &FixedManifestEncoder::encodeImpl() { - packet_.append(sizeof(ManifestHeader) + - manifest_header_->number_of_entries * sizeof(ManifestEntry)); - packet_.updateLength(); + if (encoded_) { + return *this; + } + + // Copy manifest header + manifest_meta_->transport_type = static_cast<uint8_t>(transport_type_); + manifest_entry_meta_->nb_entries = manifest_entries_.size(); + + packet_->append(manifestHeaderSizeImpl()); + + packet_->updateLength(); + auto params = reinterpret_cast<uint8_t *>(manifest_entry_meta_ + 1); + + switch (transport_type_) { + case interface::ProductionProtocolAlgorithms::BYTE_STREAM: { + auto bytestream = reinterpret_cast<const uint8_t *>(¶ms_bytestream_); + std::memcpy(params, bytestream, MANIFEST_PARAMS_BYTESTREAM_SIZE); + break; + } + case interface::ProductionProtocolAlgorithms::RTC_PROD: { + auto rtc = reinterpret_cast<const uint8_t *>(¶ms_rtc_); + std::memcpy(params, rtc, MANIFEST_PARAMS_RTC_SIZE); + break; + } + default: + break; + } + + // Copy manifest entries + auto payload = reinterpret_cast<const uint8_t *>(manifest_entries_.data()); + packet_->appendPayload(payload, manifestPayloadSizeImpl()); + + packet_->updateLength(); + if (TRANSPORT_EXPECT_FALSE(packet_->payloadSize() < manifestSizeImpl())) { + throw errors::RuntimeException("Error encoding the manifest"); + } + + encoded_ = true; return *this; } FixedManifestEncoder &FixedManifestEncoder::clearImpl() { - packet_.trimEnd(sizeof(ManifestHeader) + - manifest_header_->number_of_entries * sizeof(ManifestEntry)); - current_entry_ = 0; - *manifest_header_ = {0}; + if (encoded_) { + packet_->trimEnd(manifestSizeImpl()); + } + + transport_type_ = interface::ProductionProtocolAlgorithms::UNKNOWN; + encoded_ = false; + *manifest_meta_ = {0}; + *manifest_entry_meta_ = {0}; + params_bytestream_ = {0}; + params_rtc_ = {0}; + manifest_entries_.clear(); + return *this; } -FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl( - auth::CryptoHashType algorithm) { - manifest_header_->hash_algorithm = static_cast<uint8_t>(algorithm); +bool FixedManifestEncoder::isEncodedImpl() const { return encoded_; } + +FixedManifestEncoder &FixedManifestEncoder::setTypeImpl( + ManifestType manifest_type) { + manifest_meta_->type = static_cast<uint8_t>(manifest_type); return *this; } -FixedManifestEncoder &FixedManifestEncoder::setManifestTypeImpl( - ManifestType manifest_type) { - manifest_header_->manifest_type = static_cast<uint8_t>(manifest_type); +FixedManifestEncoder &FixedManifestEncoder::setMaxCapacityImpl( + uint8_t max_capacity) { + manifest_meta_->max_capacity = max_capacity; + return *this; +} + +FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl( + auth::CryptoHashType algorithm) { + manifest_meta_->hash_algorithm = static_cast<uint8_t>(algorithm); return *this; } -FixedManifestEncoder & -FixedManifestEncoder::setNextSegmentCalculationStrategyImpl( - NextSegmentCalculationStrategy strategy) { - manifest_header_->next_segment_strategy = static_cast<uint8_t>(strategy); +FixedManifestEncoder &FixedManifestEncoder::setIsLastImpl(bool is_last) { + manifest_meta_->is_last = static_cast<uint8_t>(is_last); return *this; } FixedManifestEncoder &FixedManifestEncoder::setBaseNameImpl( const core::Name &base_name) { - base_name.copyToDestination( - reinterpret_cast<uint8_t *>(&manifest_header_->prefix[0]), false); - manifest_header_->flags.ipv6 = + manifest_entry_meta_->is_ipv6 = base_name.getAddressFamily() == AF_INET6 ? 1_U8 : 0_U8; + base_name.copyPrefixToDestination( + reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix[0])); return *this; } -FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl( - uint32_t suffix, const auth::CryptoHash &hash) { - auto _hash = hash.getDigest(); - addSuffixHashBytes(suffix, _hash.data(), _hash.size()); +FixedManifestEncoder &FixedManifestEncoder::setParamsBytestreamImpl( + const ParamsBytestream ¶ms) { + transport_type_ = interface::ProductionProtocolAlgorithms::BYTE_STREAM; + params_bytestream_ = TransportParamsBytestream{ + .final_segment = params.final_segment, + }; return *this; } -void FixedManifestEncoder::addSuffixHashBytes(uint32_t suffix, - const uint8_t *hash, - std::size_t length) { - manifest_entries_[current_entry_].suffix = htonl(suffix); - // std::copy(hash, hash + length, - // manifest_entries_[current_entry_].hash); - std::memcpy( - reinterpret_cast<uint8_t *>(manifest_entries_[current_entry_].hash), hash, - length); +FixedManifestEncoder &FixedManifestEncoder::setParamsRTCImpl( + const ParamsRTC ¶ms) { + transport_type_ = interface::ProductionProtocolAlgorithms::RTC_PROD; + params_rtc_ = TransportParamsRTC{ + .timestamp = params.timestamp, + .prod_rate = params.prod_rate, + .prod_seg = params.prod_seg, + .fec_type = static_cast<uint32_t>(params.fec_type), + }; + return *this; +} - manifest_header_->number_of_entries++; - current_entry_++; +FixedManifestEncoder &FixedManifestEncoder::addEntryImpl( + uint32_t suffix, const auth::CryptoHash &hash) { + ManifestEntry last_entry = { + .suffix = portability::host_to_net(suffix), + .hash = {0}, + }; - if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) { - throw errors::RuntimeException("Manifest size exceeded the packet MTU!"); - } -} + auto last_hash = reinterpret_cast<uint8_t *>(last_entry.hash); + std::memcpy(last_hash, hash.getDigest()->data(), hash.getSize()); -FixedManifestEncoder &FixedManifestEncoder::setIsFinalManifestImpl( - bool is_last) { - manifest_header_->flags.is_last = static_cast<uint8_t>(is_last); + manifest_entries_.push_back(last_entry); return *this; } -FixedManifestEncoder &FixedManifestEncoder::setVersionImpl( - ManifestVersion version) { - manifest_header_->version = static_cast<uint8_t>(version); +FixedManifestEncoder &FixedManifestEncoder::removeEntryImpl(uint32_t suffix) { + for (auto it = manifest_entries_.begin(); it != manifest_entries_.end();) { + if (it->suffix == suffix) + it = manifest_entries_.erase(it); + else + ++it; + } return *this; } -std::size_t FixedManifestEncoder::estimateSerializedLengthImpl( - std::size_t additional_entries) { - return sizeof(ManifestHeader) + - (manifest_header_->number_of_entries + additional_entries) * - sizeof(ManifestEntry); +size_t FixedManifestEncoder::manifestHeaderSizeImpl() const { + return FixedManifest::manifestHeaderSize(transport_type_); } -FixedManifestEncoder &FixedManifestEncoder::updateImpl() { - max_size_ = Packet::default_mtu - packet_.headerSize() - signature_size_; - return *this; +size_t FixedManifestEncoder::manifestPayloadSizeImpl( + size_t additional_entries) const { + return FixedManifest::manifestPayloadSize(manifest_entries_.size() + + additional_entries); } -FixedManifestEncoder &FixedManifestEncoder::setFinalBlockNumberImpl( - std::uint32_t final_block_number) { - manifest_header_->final_block_number = htonl(final_block_number); - return *this; +size_t FixedManifestEncoder::manifestSizeImpl(size_t additional_entries) const { + return manifestHeaderSizeImpl() + manifestPayloadSizeImpl(additional_entries); } -std::size_t FixedManifestEncoder::getManifestHeaderSizeImpl() { - return sizeof(ManifestHeader); +// --------------------------------------------------------- +// FixedManifestDecoder +// --------------------------------------------------------- +FixedManifestDecoder::FixedManifestDecoder(Packet::Ptr packet) + : packet_(packet), decoded_(false) { + manifest_meta_ = + reinterpret_cast<ManifestMeta *>(packet_->getPayload()->writableData()); + manifest_entry_meta_ = + reinterpret_cast<ManifestEntryMeta *>(manifest_meta_ + 1); } -std::size_t FixedManifestEncoder::getManifestEntrySizeImpl() { - return sizeof(ManifestEntry); -} - -FixedManifestDecoder::FixedManifestDecoder(Packet &packet) - : packet_(packet), - manifest_header_(reinterpret_cast<ManifestHeader *>( - packet_.getPayload()->writableData())), - manifest_entries_(reinterpret_cast<ManifestEntry *>( - packet_.getPayload()->writableData() + sizeof(ManifestHeader))) {} - FixedManifestDecoder::~FixedManifestDecoder() {} -void FixedManifestDecoder::decodeImpl() { - std::size_t packet_size = packet_.payloadSize(); +FixedManifestDecoder &FixedManifestDecoder::decodeImpl() { + if (decoded_) { + return *this; + } - if (packet_size < sizeof(ManifestHeader) || - packet_size < estimateSerializedLengthImpl()) { + if (packet_->payloadSize() < manifestSizeImpl()) { throw errors::RuntimeException( - "The packet does not match expected manifest size."); + "The packet payload size does not match expected manifest size"); } -} -FixedManifestDecoder &FixedManifestDecoder::clearImpl() { return *this; } + switch (getTransportTypeImpl()) { + case interface::ProductionProtocolAlgorithms::BYTE_STREAM: + params_bytestream_ = reinterpret_cast<TransportParamsBytestream *>( + manifest_entry_meta_ + 1); + manifest_entries_ = + reinterpret_cast<ManifestEntry *>(params_bytestream_ + 1); + break; + case interface::ProductionProtocolAlgorithms::RTC_PROD: + params_rtc_ = + reinterpret_cast<TransportParamsRTC *>(manifest_entry_meta_ + 1); + manifest_entries_ = reinterpret_cast<ManifestEntry *>(params_rtc_ + 1); + break; + default: + manifest_entries_ = + reinterpret_cast<ManifestEntry *>(manifest_entry_meta_ + 1); + break; + } -ManifestType FixedManifestDecoder::getManifestTypeImpl() const { - return static_cast<ManifestType>(manifest_header_->manifest_type); + decoded_ = true; + return *this; } -auth::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const { - return static_cast<auth::CryptoHashType>(manifest_header_->hash_algorithm); +FixedManifestDecoder &FixedManifestDecoder::clearImpl() { + decoded_ = false; + return *this; } -NextSegmentCalculationStrategy -FixedManifestDecoder::getNextSegmentCalculationStrategyImpl() const { - return static_cast<NextSegmentCalculationStrategy>( - manifest_header_->next_segment_strategy); +bool FixedManifestDecoder::isDecodedImpl() const { return decoded_; } + +ManifestType FixedManifestDecoder::getTypeImpl() const { + return static_cast<ManifestType>(manifest_meta_->type); } -typename Fixed::SuffixList FixedManifestDecoder::getSuffixHashListImpl() { - typename Fixed::SuffixList hash_list; +interface::ProductionProtocolAlgorithms +FixedManifestDecoder::getTransportTypeImpl() const { + return static_cast<interface::ProductionProtocolAlgorithms>( + manifest_meta_->transport_type); +} - for (int i = 0; i < manifest_header_->number_of_entries; i++) { - hash_list.insert(hash_list.end(), - std::make_pair(ntohl(manifest_entries_[i].suffix), - reinterpret_cast<uint8_t *>( - &manifest_entries_[i].hash[0]))); - } +uint8_t FixedManifestDecoder::getMaxCapacityImpl() const { + return manifest_meta_->max_capacity; +} - return hash_list; +auth::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const { + return static_cast<auth::CryptoHashType>(manifest_meta_->hash_algorithm); +} + +bool FixedManifestDecoder::getIsLastImpl() const { + return static_cast<bool>(manifest_meta_->is_last); } core::Name FixedManifestDecoder::getBaseNameImpl() const { - if (static_cast<bool>(manifest_header_->flags.ipv6)) { - return core::Name(AF_INET6, - reinterpret_cast<uint8_t *>(&manifest_header_->prefix)); + if (static_cast<bool>(manifest_entry_meta_->is_ipv6)) { + return core::Name( + AF_INET6, reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix)); } else { - return core::Name(AF_INET, - reinterpret_cast<uint8_t *>(&manifest_header_->prefix)); + return core::Name( + AF_INET, reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix)); } } -bool FixedManifestDecoder::getIsFinalManifestImpl() const { - return static_cast<bool>(manifest_header_->flags.is_last); +ParamsBytestream FixedManifestDecoder::getParamsBytestreamImpl() const { + return ParamsBytestream{ + .final_segment = params_bytestream_->final_segment, + }; +} + +ParamsRTC FixedManifestDecoder::getParamsRTCImpl() const { + return ParamsRTC{ + .timestamp = params_rtc_->timestamp, + .prod_rate = params_rtc_->prod_rate, + .prod_seg = params_rtc_->prod_seg, + .fec_type = static_cast<protocol::fec::FECType>(params_rtc_->fec_type), + }; } -ManifestVersion FixedManifestDecoder::getVersionImpl() const { - return static_cast<ManifestVersion>(manifest_header_->version); +typename Fixed::SuffixList FixedManifestDecoder::getEntriesImpl() const { + typename Fixed::SuffixList hash_list; + + for (int i = 0; i < manifest_entry_meta_->nb_entries; i++) { + hash_list.insert( + hash_list.end(), + std::make_pair( + portability::net_to_host(manifest_entries_[i].suffix), + reinterpret_cast<uint8_t *>(&manifest_entries_[i].hash[0]))); + } + + return hash_list; } -std::size_t FixedManifestDecoder::estimateSerializedLengthImpl( - std::size_t additional_entries) const { - return sizeof(ManifestHeader) + - (additional_entries + manifest_header_->number_of_entries) * - sizeof(ManifestEntry); +size_t FixedManifestDecoder::manifestHeaderSizeImpl() const { + interface::ProductionProtocolAlgorithms type = getTransportTypeImpl(); + return FixedManifest::manifestHeaderSize(type); } -uint32_t FixedManifestDecoder::getFinalBlockNumberImpl() const { - return ntohl(manifest_header_->final_block_number); +size_t FixedManifestDecoder::manifestPayloadSizeImpl( + size_t additional_entries) const { + size_t nb_entries = manifest_entry_meta_->nb_entries + additional_entries; + return FixedManifest::manifestPayloadSize(nb_entries); } -} // end namespace core +size_t FixedManifestDecoder::manifestSizeImpl(size_t additional_entries) const { + return manifestHeaderSizeImpl() + manifestPayloadSizeImpl(additional_entries); +} +} // end namespace core } // end namespace transport diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h index 56ad4ef6d..7ab371974 100644 --- a/libtransport/src/core/manifest_format_fixed.h +++ b/libtransport/src/core/manifest_format_fixed.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -24,26 +24,72 @@ namespace transport { namespace core { -// 0 1 2 3 -// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// |Version| MType |HashAlg|NextStr| Flags |NumberOfEntries| -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | Final Block Number | -// +---------------------------------------------------------------| -// | | -// + + -// | | -// + Prefix + -// | | -// + + -// | | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | Suffix | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | Hash Value | -// | | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// Manifest Metadata: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Type | TTYpe | Max Capacity | Hash Algo |L| Reserved | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +// Manifest Entry Metadata: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Nb entries |I| Reserved | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | | +// + + +// | | +// + Prefix + +// | | +// + + +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +// Manifest Transport Parameters - Bytestream: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Final Segment | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +// Manifest Transport Parameters - RTC: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | | +// + Timestamp + +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Production Rate | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Current Segment | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | FEC Type | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +// Manifest Entry: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | Packet Suffix | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +// | | +// + + +// | | +// + + +// | | +// + + +// | | +// + Packet Digest + +// | | +// + + +// | | +// + + +// | | +// + + +// | | +// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ class FixedManifestEncoder; class FixedManifestDecoder; @@ -58,113 +104,143 @@ struct Fixed { using SuffixList = std::list<std::pair<uint32_t, uint8_t *>>; }; -struct Flags { - std::uint8_t ipv6 : 1; - std::uint8_t is_last : 1; - std::uint8_t unused : 6; +const size_t MANIFEST_META_SIZE = 4; +struct __attribute__((__packed__)) ManifestMeta { + std::uint8_t type : 4; + std::uint8_t transport_type : 4; + std::uint8_t max_capacity; + std::uint8_t hash_algorithm; + std::uint8_t is_last; }; +static_assert(sizeof(ManifestMeta) == MANIFEST_META_SIZE); -struct ManifestEntry { +const size_t MANIFEST_ENTRY_META_SIZE = 20; +struct __attribute__((__packed__)) ManifestEntryMeta { + std::uint8_t nb_entries; + std::uint8_t is_ipv6; + std::uint16_t unused; + std::uint32_t prefix[4]; +}; +static_assert(sizeof(ManifestEntryMeta) == MANIFEST_ENTRY_META_SIZE); + +const size_t MANIFEST_PARAMS_BYTESTREAM_SIZE = 4; +struct __attribute__((__packed__)) TransportParamsBytestream { + std::uint32_t final_segment; +}; +static_assert(sizeof(TransportParamsBytestream) == + MANIFEST_PARAMS_BYTESTREAM_SIZE); + +const size_t MANIFEST_PARAMS_RTC_SIZE = 20; +struct __attribute__((__packed__)) TransportParamsRTC { + std::uint64_t timestamp; + std::uint32_t prod_rate; + std::uint32_t prod_seg; + std::uint32_t fec_type; +}; +static_assert(sizeof(TransportParamsRTC) == MANIFEST_PARAMS_RTC_SIZE); + +const size_t MANIFEST_ENTRY_SIZE = 36; +struct __attribute__((__packed__)) ManifestEntry { std::uint32_t suffix; std::uint32_t hash[8]; }; +static_assert(sizeof(ManifestEntry) == MANIFEST_ENTRY_SIZE); -struct ManifestHeader { - std::uint8_t version : 4; - std::uint8_t manifest_type : 4; - std::uint8_t hash_algorithm : 4; - std::uint8_t next_segment_strategy : 4; - Flags flags; - std::uint8_t number_of_entries; - std::uint32_t final_block_number; - std::uint32_t prefix[4]; - ManifestEntry entries[0]; +class FixedManifest { + public: + static size_t manifestHeaderSize( + interface::ProductionProtocolAlgorithms transport_type); + static size_t manifestPayloadSize(size_t nb_entries); }; -static const constexpr std::uint8_t manifest_version = 1; - class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> { public: - FixedManifestEncoder(Packet &packet, std::size_t signature_size = 0, - bool clear = true); + FixedManifestEncoder(Packet::Ptr packet, bool clear = false); ~FixedManifestEncoder(); FixedManifestEncoder &encodeImpl(); - FixedManifestEncoder &clearImpl(); + bool isEncodedImpl() const; - FixedManifestEncoder &setManifestTypeImpl(ManifestType manifest_type); - + // ManifestMeta + FixedManifestEncoder &setTypeImpl(ManifestType manifest_type); + FixedManifestEncoder &setMaxCapacityImpl(uint8_t max_capacity); FixedManifestEncoder &setHashAlgorithmImpl(Fixed::HashType algorithm); + FixedManifestEncoder &setIsLastImpl(bool is_last); - FixedManifestEncoder &setNextSegmentCalculationStrategyImpl( - NextSegmentCalculationStrategy strategy); - + // ManifestEntryMeta FixedManifestEncoder &setBaseNameImpl(const core::Name &base_name); - FixedManifestEncoder &addSuffixAndHashImpl(uint32_t suffix, - const Fixed::Hash &hash); - - FixedManifestEncoder &setIsFinalManifestImpl(bool is_last); + // TransportParams + FixedManifestEncoder &setParamsBytestreamImpl(const ParamsBytestream ¶ms); + FixedManifestEncoder &setParamsRTCImpl(const ParamsRTC ¶ms); - FixedManifestEncoder &setVersionImpl(ManifestVersion version); + // ManifestEntry + FixedManifestEncoder &addEntryImpl(uint32_t suffix, const Fixed::Hash &hash); + FixedManifestEncoder &removeEntryImpl(uint32_t suffix); - std::size_t estimateSerializedLengthImpl(std::size_t additional_entries = 0); - - FixedManifestEncoder &updateImpl(); - - FixedManifestEncoder &setFinalBlockNumberImpl( - std::uint32_t final_block_number); - - static std::size_t getManifestHeaderSizeImpl(); - - static std::size_t getManifestEntrySizeImpl(); + size_t manifestHeaderSizeImpl() const; + size_t manifestPayloadSizeImpl(size_t additional_entries = 0) const; + size_t manifestSizeImpl(size_t additional_entries = 0) const; private: - void addSuffixHashBytes(uint32_t suffix, const uint8_t *hash, - std::size_t length); - - Packet &packet_; - std::size_t max_size_; - ManifestHeader *manifest_header_; - ManifestEntry *manifest_entries_; - std::size_t current_entry_; - std::size_t signature_size_; + Packet::Ptr packet_; + interface::ProductionProtocolAlgorithms transport_type_; + bool encoded_; + + // Manifest Header + ManifestMeta *manifest_meta_; + ManifestEntryMeta *manifest_entry_meta_; + TransportParamsBytestream params_bytestream_; + TransportParamsRTC params_rtc_; + + // Manifest Entries + std::vector<ManifestEntry> manifest_entries_; }; class FixedManifestDecoder : public ManifestDecoder<FixedManifestDecoder> { public: - FixedManifestDecoder(Packet &packet); + FixedManifestDecoder(Packet::Ptr packet); ~FixedManifestDecoder(); - void decodeImpl(); - + FixedManifestDecoder &decodeImpl(); FixedManifestDecoder &clearImpl(); + bool isDecodedImpl() const; - ManifestType getManifestTypeImpl() const; - + // ManifestMeta + ManifestType getTypeImpl() const; + interface::ProductionProtocolAlgorithms getTransportTypeImpl() const; + uint8_t getMaxCapacityImpl() const; Fixed::HashType getHashAlgorithmImpl() const; + bool getIsLastImpl() const; - NextSegmentCalculationStrategy getNextSegmentCalculationStrategyImpl() const; - - typename Fixed::SuffixList getSuffixHashListImpl(); - + // ManifestEntryMeta core::Name getBaseNameImpl() const; - bool getIsFinalManifestImpl() const; + // TransportParams + ParamsBytestream getParamsBytestreamImpl() const; + ParamsRTC getParamsRTCImpl() const; - std::size_t estimateSerializedLengthImpl( - std::size_t additional_entries = 0) const; + // ManifestEntry + typename Fixed::SuffixList getEntriesImpl() const; - ManifestVersion getVersionImpl() const; - - uint32_t getFinalBlockNumberImpl() const; + size_t manifestHeaderSizeImpl() const; + size_t manifestPayloadSizeImpl(size_t additional_entries = 0) const; + size_t manifestSizeImpl(size_t additional_entries = 0) const; private: - Packet &packet_; - ManifestHeader *manifest_header_; + Packet::Ptr packet_; + bool decoded_; + + // Manifest Header + ManifestMeta *manifest_meta_; + ManifestEntryMeta *manifest_entry_meta_; + TransportParamsBytestream *params_bytestream_; + TransportParamsRTC *params_rtc_; + + // Manifest Entries ManifestEntry *manifest_entries_; }; diff --git a/libtransport/src/core/manifest_inline.h b/libtransport/src/core/manifest_inline.h deleted file mode 100644 index a487ccfe3..000000000 --- a/libtransport/src/core/manifest_inline.h +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <core/manifest.h> -#include <core/manifest_format.h> -#include <hicn/transport/portability/portability.h> - -#include <set> - -namespace transport { - -namespace core { - -template <typename Base, typename FormatTraits> -class ManifestInline - : public Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>> { - using ManifestBase = - Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>>; - - using Hash = typename FormatTraits::Hash; - using HashType = typename FormatTraits::HashType; - using Suffix = typename FormatTraits::Suffix; - using SuffixList = typename FormatTraits::SuffixList; - using HashEntry = std::pair<auth::CryptoHashType, std::vector<uint8_t>>; - - public: - ManifestInline() : ManifestBase() {} - - ManifestInline(const core::Name &name, std::size_t signature_size = 0) - : ManifestBase(name, signature_size) {} - - template <typename T> - ManifestInline(T &&base) : ManifestBase(std::forward<T &&>(base)) {} - - static TRANSPORT_ALWAYS_INLINE ManifestInline *createManifest( - const core::Name &manifest_name, ManifestVersion version, - ManifestType type, HashType algorithm, bool is_last, - const Name &base_name, NextSegmentCalculationStrategy strategy, - std::size_t signature_size) { - auto manifest = new ManifestInline(manifest_name, signature_size); - manifest->setVersion(version); - manifest->setManifestType(type); - manifest->setHashAlgorithm(algorithm); - manifest->setFinalManifest(is_last); - manifest->setBaseName(base_name); - manifest->setNextSegmentCalculationStrategy(strategy); - - return manifest; - } - - ManifestInline &encodeImpl() { - ManifestBase::encoder_.encode(); - return *this; - } - - ManifestInline &decodeImpl() { - base_name_ = ManifestBase::decoder_.getBaseName(); - next_segment_strategy_ = - ManifestBase::decoder_.getNextSegmentCalculationStrategy(); - suffix_hash_map_ = ManifestBase::decoder_.getSuffixHashList(); - - return *this; - } - - std::size_t estimateManifestSizeImpl(std::size_t additional_entries = 0) { - return ManifestBase::encoder_.estimateSerializedLength(additional_entries); - } - - ManifestInline &setBaseName(const Name &name) { - base_name_ = name; - ManifestBase::encoder_.setBaseName(base_name_); - return *this; - } - - const Name &getBaseName() { return base_name_; } - - ManifestInline &addSuffixHash(Suffix suffix, const Hash &hash) { - ManifestBase::encoder_.addSuffixAndHash(suffix, hash); - return *this; - } - - // Call this function only after the decode function! - const SuffixList &getSuffixList() { return suffix_hash_map_; } - - ManifestInline &setNextSegmentCalculationStrategy( - NextSegmentCalculationStrategy strategy) { - next_segment_strategy_ = strategy; - ManifestBase::encoder_.setNextSegmentCalculationStrategy( - next_segment_strategy_); - return *this; - } - - NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() { - return next_segment_strategy_; - } - - // Convert several manifests into a single map from suffixes to packet hashes. - // All manifests must have been decoded beforehand. - static std::unordered_map<Suffix, Hash> getSuffixMap( - const std::vector<ManifestInline *> &manifests) { - std::unordered_map<Suffix, Hash> suffix_map; - - for (auto manifest_ptr : manifests) { - HashType hash_type = manifest_ptr->getHashAlgorithm(); - SuffixList suffix_list = manifest_ptr->getSuffixList(); - - for (auto it = suffix_list.begin(); it != suffix_list.end(); ++it) { - Hash hash(it->second, Hash::getSize(hash_type), hash_type); - suffix_map[it->first] = hash; - } - } - - return suffix_map; - } - - static std::unordered_map<Suffix, Hash> getSuffixMap( - ManifestInline *manifest) { - return getSuffixMap(std::vector<ManifestInline *>{manifest}); - } - - private: - core::Name base_name_; - NextSegmentCalculationStrategy next_segment_strategy_; - SuffixList suffix_hash_map_; -}; - -} // namespace core -} // namespace transport diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc new file mode 100644 index 000000000..a224beb11 --- /dev/null +++ b/libtransport/src/core/memif_connector.cc @@ -0,0 +1,503 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <core/errors.h> +#include <core/memif_connector.h> +#include <glog/logging.h> +#include <hicn/transport/errors/not_implemented_exception.h> +#include <sys/epoll.h> + +#include <cstdlib> + +/* sstrncpy */ +#include <hicn/util/sstrncpy.h> + +#define CANCEL_TIMER 1 + +namespace transport { + +namespace core { + +MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback, + PacketSentCallback &&packet_sent, + OnCloseCallback &&close_callback, + OnReconnectCallback &&on_reconnect, + asio::io_service &io_service, + std::string app_name) + : Connector(std::move(receive_callback), std::move(packet_sent), + std::move(close_callback), std::move(on_reconnect)), + event_reactor_(), + memif_worker_(std::bind(&MemifConnector::threadMain, this)), + timer_set_(false), + send_timer_(event_reactor_), + disconnect_timer_(event_reactor_), + io_service_(io_service), + work_(asio::make_work_guard(io_service_)), + memif_connection_({0}), + tx_buf_counter_(0), + is_reconnection_(false), + data_available_(false), + app_name_(app_name), + socket_filename_(""), + buffer_size_(kbuf_size), + log2_ring_size_(klog2_ring_size), + max_memif_bufs_(1 << klog2_ring_size) {} + +MemifConnector::~MemifConnector() { + try { + close(); + } catch (errors::RuntimeException &e) { + // do nothing + } +} + +void MemifConnector::connect(uint32_t memif_id, long memif_mode, + const std::string &socket_filename, + std::size_t buffer_size, + std::size_t log2_ring_size) { + state_ = State::CONNECTING; + + memif_id_ = memif_id; + socket_filename_ = socket_filename; + buffer_size_ = buffer_size; + log2_ring_size_ = log2_ring_size; + max_memif_bufs_ = 1 << log2_ring_size; + createMemif(memif_id, memif_mode); +} + +int MemifConnector::createMemif(uint32_t index, uint8_t is_master) { + int err = MEMIF_ERR_SUCCESS; + + memif_socket_args_t socket_args; + memif_conn_args_t args; + memset(&socket_args, 0, sizeof(memif_socket_args_t)); + memset(&args, 0, sizeof(memif_conn_args_t)); + + // Setup memif socket first + + int rc = strcpy_s(socket_args.path, sizeof(socket_args.path) - 1, + socket_filename_.c_str()); + if (rc != EOK) { + std::string error = "Provided socket path is larger than " + + std::to_string(sizeof(socket_args.path)) + " bytes."; + throw errors::RuntimeException(error); + } + + rc = strcpy_s(socket_args.app_name, sizeof(socket_args.app_name) - 1, + app_name_.c_str()); + if (rc != EOK) { + std::string error = "Provided app_name is larger than " + + std::to_string(sizeof(socket_args.app_name)) + + " bytes."; + throw errors::RuntimeException(error); + } + + socket_args.on_control_fd_update = controlFdUpdate; + socket_args.alloc = nullptr; + socket_args.realloc = nullptr; + socket_args.free = nullptr; + + err = memif_create_socket(&args.socket, &socket_args, this); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + throw errors::RuntimeException(memif_strerror(err)); + } + + // Setup memif connection using provided memif_socket_handle_t + args.is_master = is_master; + args.log2_ring_size = log2_ring_size_; + args.buffer_size = buffer_size_; + args.num_s2m_rings = 1; + args.num_m2s_rings = 1; + strcpy_s((char *)args.interface_name, sizeof(args.interface_name), IF_NAME); + args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP; + args.interface_id = index; + err = memif_create(&memif_connection_.conn, &args, onConnect, onDisconnect, + onInterrupt, this); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + throw errors::RuntimeException(memif_strerror(err)); + } + + memif_connection_.index = (uint16_t)index; + memif_connection_.tx_qid = 0; + /* alloc memif buffers */ + memif_connection_.rx_buf_num = 0; + memif_connection_.rx_bufs = static_cast<memif_buffer_t *>( + malloc(sizeof(memif_buffer_t) * max_memif_bufs_)); + memif_connection_.tx_buf_num = 0; + memif_connection_.tx_bufs = static_cast<memif_buffer_t *>( + malloc(sizeof(memif_buffer_t) * max_memif_bufs_)); + + return 0; +} + +int MemifConnector::deleteMemif() { + if (memif_connection_.rx_bufs) { + free(memif_connection_.rx_bufs); + } + + memif_connection_.rx_bufs = nullptr; + memif_connection_.rx_buf_num = 0; + + if (memif_connection_.tx_bufs) { + free(memif_connection_.tx_bufs); + } + + memif_connection_.tx_bufs = nullptr; + memif_connection_.tx_buf_num = 0; + + int err; + /* disconenct then delete memif connection */ + err = memif_delete(&memif_connection_.conn); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + LOG(ERROR) << "memif_delete: " << memif_strerror(err); + } + + if (TRANSPORT_EXPECT_FALSE(memif_connection_.conn != nullptr)) { + LOG(ERROR) << "memif delete fail"; + } + + state_ = State::CLOSED; + + return 0; +} + +int MemifConnector::controlFdUpdate(memif_fd_event_t fde, void *private_ctx) { + auto self = reinterpret_cast<MemifConnector *>(private_ctx); + uint32_t evt = 0; + + /* convert memif event definitions to epoll events */ + auto events = fde.type; + auto fd = fde.fd; + + if (events & MEMIF_FD_EVENT_ERROR) { + LOG(ERROR) << "memif fd event: Error"; + return -1; + } + + if (events & MEMIF_FD_EVENT_DEL) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: DEL fd " << fd; + return self->event_reactor_.delFileDescriptor(fd); + } + + if (events & MEMIF_FD_EVENT_MOD) { + DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: MOD fd " << fd; + return self->event_reactor_.modFileDescriptor(fd, evt); + } + + if (events & MEMIF_FD_EVENT_READ) { + evt |= EPOLLIN; + } + + if (events & MEMIF_FD_EVENT_WRITE) { + evt |= EPOLLOUT; + } + + DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: ADD fd " << fd; + return self->event_reactor_.addFileDescriptor( + fd, evt, [fde](const utils::Event &evt) -> int { + int event = 0; + int memif_err = 0; + + if (evt.events & EPOLLIN) { + event |= MEMIF_FD_EVENT_READ; + } + + if (evt.events & EPOLLOUT) { + event |= MEMIF_FD_EVENT_WRITE; + } + + if (evt.events & EPOLLERR) { + event |= MEMIF_FD_EVENT_ERROR; + } + + memif_err = memif_control_fd_handler(fde.private_ctx, + memif_fd_event_type_t(event)); + + if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) { + LOG(ERROR) << "memif_control_fd_handler: " + << memif_strerror(memif_err); + } + + return 0; + }); +} + +uint16_t MemifConnector::bufferAlloc(long n, uint16_t qid, + std::error_code &ec) { + int err; + uint16_t r = 0; + /* set data pointer to shared memory and set buffer_len to shared mmeory + * buffer len */ + err = memif_buffer_alloc(memif_connection_.conn, qid, + memif_connection_.tx_bufs, n, &r, buffer_size_); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + ec = make_error_code(core_error::send_buffer_allocation_failed); + } + + memif_connection_.tx_buf_num += r; + return r; +} + +uint16_t MemifConnector::txBurst(uint16_t qid, std::error_code &ec) { + int err = MEMIF_ERR_SUCCESS; + ec = make_error_code(core_error::success); + uint16_t tx = 0; + + /* inform peer memif interface about data in shared memory buffers */ + /* mark memif buffers as free */ + err = memif_tx_burst(memif_connection_.conn, qid, memif_connection_.tx_bufs, + memif_connection_.tx_buf_num, &tx); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + ec = make_error_code(core_error::send_failed); + } + + memif_connection_.tx_buf_num -= tx; + return tx; +} + +void MemifConnector::scheduleSend(std::uint64_t delay) { + if (!timer_set_) { + timer_set_ = true; + send_timer_.expiresFromNow(std::chrono::microseconds(delay)); + send_timer_.asyncWait( + std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1)); + } +} + +void MemifConnector::sendCallback(const std::error_code &ec) { + timer_set_ = false; + + if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) { + doSend(); + } +} + +/* informs user about connected status. private_ctx is used by user to identify + connection (multiple connections WIP) */ +int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) { + auto self = reinterpret_cast<MemifConnector *>(private_ctx); + self->state_ = State::CONNECTED; + memif_refill_queue(conn, 0, -1, 0); + + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Memif " << self->app_name_ << " connected"; + + // We are connected. Notify higher layers. + self->io_service_.post([self]() { + self->on_reconnect_callback_(self, make_error_code(core_error::success)); + }); + + self->doSend(); + + return 0; +} + +/* informs user about disconnected status. private_ctx is used by user to + identify connection (multiple connections WIP) */ +int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) { + MemifConnector *connector = (MemifConnector *)private_ctx; + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Memif " << connector->app_name_ << " disconnected"; + return 0; +} + +void MemifConnector::threadMain() { event_reactor_.runEventLoop(200); } + +int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx, + uint16_t qid) { + MemifConnector *connector = (MemifConnector *)private_ctx; + + Details &c = connector->memif_connection_; + std::weak_ptr<MemifConnector> self = connector->shared_from_this(); + std::vector<::utils::MemBuf::Ptr> v; + std::error_code ec = make_error_code(core_error::success); + + int err = MEMIF_ERR_SUCCESS, ret_val; + uint16_t rx = 0; + + do { + err = memif_rx_burst(conn, qid, c.rx_bufs, max_burst, &rx); + ret_val = err; + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS && + err != MEMIF_ERR_NOBUF)) { + ec = make_error_code(core_error::receive_failed); + LOG(ERROR) << "memif_rx_burst: " << memif_strerror(err); + goto error; + } + + c.rx_buf_num += rx; + + if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) { + LOG(ERROR) << "socket stopped: ignoring " << rx << " packets"; + goto error; + } + + std::size_t packet_length; + v.reserve(rx); + for (int i = 0; i < rx; i++) { + auto buffer = connector->getRawBuffer(); + packet_length = (c.rx_bufs + i)->len; + std::memcpy(buffer.first, (c.rx_bufs + i)->data, packet_length); + auto packet = connector->getPacketFromBuffer(buffer.first, packet_length); + v.emplace_back(std::move(packet)); + } + + /* mark memif buffers and shared memory buffers as free */ + /* free processed buffers */ + + err = memif_refill_queue(conn, qid, rx, 0); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err); + } + + c.rx_buf_num -= rx; + + } while (ret_val == MEMIF_ERR_NOBUF); + + connector->io_service_.post([self, buffers = std::move(v)]() { + if (auto c = self.lock()) { + c->receive_callback_(c.get(), buffers, + std::make_error_code(std::errc(0))); + } + }); + + return 0; + +error: + err = memif_refill_queue(c.conn, qid, rx, 0); + + if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) { + LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err); + } + c.rx_buf_num -= rx; + + connector->io_service_.post([self, ec]() { + if (auto c = self.lock()) { + c->receive_callback_(c.get(), {}, ec); + } + }); + + return 0; +} + +void MemifConnector::close() { + if (state_ != State::CLOSED) { + disconnect_timer_.expiresFromNow(std::chrono::microseconds(50)); + disconnect_timer_.asyncWait([this](const std::error_code &ec) { + deleteMemif(); + event_reactor_.stop(); + }); + } + + if (memif_worker_.joinable()) { + memif_worker_.join(); + } +} + +void MemifConnector::send(Packet &packet) { send(packet.shared_from_this()); } + +void MemifConnector::send(const utils::MemBuf::Ptr &buffer) { + { + utils::SpinLock::Acquire locked(write_msgs_lock_); + output_buffer_.push_back(buffer); + } +#if CANCEL_TIMER + scheduleSend(50); +#endif +} + +int MemifConnector::doSend() { + std::size_t max = 0; + std::size_t size = 0; + std::error_code ec = make_error_code(core_error::success); + int ret = 0; + uint64_t delay = 50; // microseconds + + utils::SpinLock::Acquire locked(write_msgs_lock_); + + // Check if there are pending buffers to send + if (memif_connection_.tx_buf_num > 0) { + ret = txBurst(memif_connection_.tx_qid, ec); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + delay = 200; + goto done; + } + } + + // Continue trying to send buffers in output_buffer_ + size = output_buffer_.size(); + max = size < max_burst ? size : max_burst; + + ret = bufferAlloc(max, memif_connection_.tx_qid, ec); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool() && ret == 0)) { + delay = 200; + goto done; + } + + // Fill allocated buffers and remove them from output_buffer_ + for (uint16_t i = 0; i < ret; i++) { + auto packet = output_buffer_.front().get(); + const utils::MemBuf *current = packet; + std::size_t offset = 0; + uint8_t *shared_buffer = + reinterpret_cast<uint8_t *>(memif_connection_.tx_bufs[i].data); + do { + std::memcpy(shared_buffer + offset, current->data(), current->length()); + offset += current->length(); + current = current->next(); + } while (current != packet); + + memif_connection_.tx_bufs[i].len = uint32_t(offset); + output_buffer_.pop_front(); + } + + // Try to send them + ret = txBurst(memif_connection_.tx_qid, ec); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + LOG(ERROR) << "Tx burst failed " << ec.message(); + delay = 200; + goto done; + } + +done: + memif_refill_queue(memif_connection_.conn, memif_connection_.tx_qid, ret, 0); + + // If there are still packets to send, schedule another send + if (memif_connection_.tx_buf_num > 0 || !output_buffer_.empty()) { + scheduleSend(delay); + } + + // If error, signal to upper layers + if (ec.operator bool()) { + std::weak_ptr<MemifConnector> self = shared_from_this(); + io_service_.post([self, ec]() { + if (auto c = self.lock()) { + c->sent_callback_(c.get(), ec); + } + }); + } + + return 0; +} + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h new file mode 100644 index 000000000..d36be4616 --- /dev/null +++ b/libtransport/src/core/memif_connector.h @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/config.h> +#include <hicn/transport/core/connector.h> +#include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/ring_buffer.h> +//#include <hicn/transport/core/hicn_vapi.h> +#include <hicn/transport/core/asio_wrapper.h> +#include <utils/epoll_event_reactor.h> +#include <utils/fd_deadline_timer.h> + +#include <deque> +#include <future> +#include <mutex> +#include <thread> + +extern "C" { +#include <libmemif.h> +}; + +#define _Static_assert static_assert + +namespace transport { + +namespace core { + +#define APP_NAME "libtransport" +#define IF_NAME "vpp_connection" + +class MemifConnector : public Connector { + static inline std::size_t kbuf_size = 2048; + static inline std::size_t klog2_ring_size = 13; + + using PacketRing = utils::CircularFifo<utils::MemBuf::Ptr, queue_size>; + struct Details { + // index + uint16_t index; + // memif conenction handle + memif_conn_handle_t conn; + // transmit queue id + uint16_t tx_qid; + // tx buffers + memif_buffer_t *tx_bufs; + // allocated tx buffers counter + // number of tx buffers pointing to shared memory + uint16_t tx_buf_num; + // rx buffers + memif_buffer_t *rx_bufs; + // allocated rx buffers counter + // number of rx buffers pointing to shared memory + uint16_t rx_buf_num; + // interface ip address + uint8_t ip_addr[4]; + }; + + public: + MemifConnector(PacketReceivedCallback &&receive_callback, + PacketSentCallback &&packet_sent, + OnCloseCallback &&close_callback, + OnReconnectCallback &&on_reconnect, + asio::io_service &io_service, + std::string app_name = "Libtransport"); + + ~MemifConnector() override; + + void send(Packet &packet) override; + + void send(const utils::MemBuf::Ptr &buffer) override; + + void close() override; + + void connect(uint32_t memif_id, long memif_mode, + const std::string &socket_filename, + std::size_t buffer_size = kbuf_size, + std::size_t log2_ring_size = klog2_ring_size); + + TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; }; + + private: + void init(); + + int doSend(); + + int createMemif(uint32_t index, uint8_t is_master); + + uint32_t getMemifConfiguration(); + + int deleteMemif(); + + static int controlFdUpdate(memif_fd_event_t fde, void *private_ctx); + + static int onConnect(memif_conn_handle_t conn, void *private_ctx); + + static int onDisconnect(memif_conn_handle_t conn, void *private_ctx); + + static int onInterrupt(memif_conn_handle_t conn, void *private_ctx, + uint16_t qid); + + void threadMain(); + + uint16_t txBurst(uint16_t qid, std::error_code &ec); + + uint16_t bufferAlloc(long n, uint16_t qid, std::error_code &ec); + + void scheduleSend(std::uint64_t delay); + + void sendCallback(const std::error_code &ec); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + int epfd; + utils::EpollEventReactor event_reactor_; + std::thread memif_worker_; + std::atomic_bool timer_set_; + utils::FdDeadlineTimer send_timer_; + utils::FdDeadlineTimer disconnect_timer_; + asio::io_service &io_service_; + asio::executor_work_guard<asio::io_context::executor_type> work_; + Details memif_connection_; + uint16_t tx_buf_counter_; + + PacketRing input_buffer_; + bool is_reconnection_; + bool data_available_; + uint32_t memif_id_; + uint8_t memif_mode_; + std::string app_name_; + uint16_t transmission_index_; + utils::SpinLock write_msgs_lock_; + std::string socket_filename_; + std::size_t buffer_size_; + std::size_t log2_ring_size_; + std::size_t max_memif_bufs_; +}; + +} // end namespace core + +} // end namespace transport diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc index 795c8a697..4f8ba7873 100644 --- a/libtransport/src/core/name.cc +++ b/libtransport/src/core/name.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -14,6 +14,7 @@ */ #include <core/manifest_format.h> +#include <hicn/name.h> #include <hicn/transport/core/name.h> #include <hicn/transport/errors/errors.h> #include <hicn/transport/errors/tokenizer_exception.h> @@ -24,32 +25,31 @@ namespace transport { namespace core { -Name::Name() { name_ = {}; } +Name::Name() { std::memset(&name_, 0, sizeof(name_)); } +/** + * XXX This function does not use the name API provided by libhicn + */ Name::Name(int family, const uint8_t *ip_address, std::uint32_t suffix) : name_({}) { - name_.type = HNT_UNSPEC; std::size_t length; uint8_t *dst = NULL; if (family == AF_INET) { - dst = name_.ip4.prefix_as_u8; + dst = name_.prefix.v4.as_u8; length = IPV4_ADDR_LEN; - name_.type = HNT_CONTIGUOUS_V4; } else if (family == AF_INET6) { - dst = name_.ip6.prefix_as_u8; + dst = name_.prefix.v6.as_u8; length = IPV6_ADDR_LEN; - name_.type = HNT_CONTIGUOUS_V6; } else { throw errors::RuntimeException("Specified name family does not exist."); } std::memcpy(dst, ip_address, length); - *reinterpret_cast<std::uint32_t *>(dst + length) = suffix; + name_.suffix = suffix; } Name::Name(const char *name, uint32_t segment) { - name_.type = HNT_UNSPEC; if (hicn_name_create(name, segment, &name_) < 0) { throw errors::InvalidIpAddressException(); } @@ -59,7 +59,6 @@ Name::Name(const std::string &uri, uint32_t segment) : Name(uri.c_str(), segment) {} Name::Name(const std::string &uri) { - name_.type = HNT_UNSPEC; utils::StringTokenizer tokenizer(uri, "|"); std::string ip_address; std::string seq_number; @@ -80,9 +79,13 @@ Name::Name(const std::string &uri) { Name::Name(const Name &name) { this->name_ = name.name_; } +Name::~Name() {} + Name &Name::operator=(const Name &name) { - if (hicn_name_copy(&this->name_, &name.name_) < 0) { - throw errors::MalformedNameException(); + if (this != &name) { + if (hicn_name_copy(&this->name_, &name.name_) < 0) { + throw errors::MalformedNameException(); + } } return *this; @@ -122,16 +125,20 @@ std::string Name::toString() const { } uint32_t Name::getHash32(bool consider_suffix) const { - uint32_t hash; - if (hicn_name_hash(&name_, &hash, consider_suffix) < 0) { + uint32_t hash = _hicn_name_get_hash(&name_, consider_suffix); + if (hash < 0) { throw errors::RuntimeException("Error computing the hash of the name!"); } return hash; } -void Name::clear() { name_.type = HNT_UNSPEC; }; +void Name::clear() { std::memset(&name_, 0, sizeof(name_)); }; -Name::Type Name::getType() const { return name_.type; } +Name::Type Name::getType() const { + int family; + hicn_name_get_family(&name_, &family); + return family == AF_INET ? Name::Type::V4 : Name::Type::V6; +} uint32_t Name::getSuffix() const { uint32_t ret = 0; @@ -142,49 +149,32 @@ uint32_t Name::getSuffix() const { return ret; } -Name &Name::setSuffix(uint32_t seq_number) { - if (hicn_name_set_seq_number(&name_, seq_number) < 0) { - throw errors::RuntimeException( - "Impossible to set the sequence number to the name."); +Name &Name::setSuffix(hicn_name_suffix_t suffix) { + if (hicn_name_set_suffix(&name_, suffix) < 0) { + throw errors::RuntimeException("Impossible to set name suffix."); } return *this; } -std::shared_ptr<Sockaddr> Name::getAddress() const { - Sockaddr *ret = nullptr; - - switch (name_.type) { - case HNT_CONTIGUOUS_V4: - case HNT_IOV_V4: - ret = (Sockaddr *)new Sockaddr4; - break; - case HNT_CONTIGUOUS_V6: - case HNT_IOV_V6: - ret = (Sockaddr *)new Sockaddr6; - break; - default: - throw errors::MalformedNameException(); - } - - if (hicn_name_to_sockaddr_address((hicn_name_t *)&name_, ret) < 0) { - throw errors::MalformedNameException(); - } - - return std::shared_ptr<Sockaddr>(ret); -} - -ip_prefix_t Name::toIpAddress() const { - ip_prefix_t ret; +hicn_ip_prefix_t Name::toIpAddress() const { + hicn_ip_prefix_t ret; std::memset(&ret, 0, sizeof(ret)); - if (hicn_name_to_ip_prefix(&name_, &ret) < 0) { + if (hicn_name_to_hicn_ip_prefix(&name_, &ret) < 0) { throw errors::InvalidIpAddressException(); } return ret; } +std::string Name::getPrefix() const { + char prefix[MAXSZ_HICN_NAME]; + hicn_name_no_suffix_snprintf(prefix, MAXSZ_HICN_NAME, &name_); + + return std::string(prefix); +} + int Name::getAddressFamily() const { int ret = 0; @@ -195,8 +185,8 @@ int Name::getAddressFamily() const { return ret; } -void Name::copyToDestination(uint8_t *destination, bool include_suffix) const { - if (hicn_name_copy_to_destination(destination, &name_, include_suffix) < 0) { +void Name::copyPrefixToDestination(uint8_t *destination) const { + if (hicn_name_copy_prefix_to_destination(destination, &name_) < 0) { throw errors::RuntimeException( "Impossibe to copy the name into the " "provided destination"); diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc index 51337201f..bcd0b8498 100644 --- a/libtransport/src/core/packet.cc +++ b/libtransport/src/core/packet.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,6 +15,7 @@ #include <glog/logging.h> #include <hicn/transport/auth/crypto_hash.h> +#include <hicn/transport/core/global_object_pool.h> #include <hicn/transport/core/packet.h> #include <hicn/transport/errors/malformed_packet_exception.h> #include <hicn/transport/utils/hash.h> @@ -23,6 +24,7 @@ extern "C" { #ifndef _WIN32 TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat") #endif +#include <hicn/base.h> #include <hicn/error.h> } @@ -32,64 +34,70 @@ namespace core { const core::Name Packet::base_name("0::0|0"); -Packet::Packet(Format format, std::size_t additional_header_size) +Packet::Packet(Type type, Format format, std::size_t additional_header_size) : utils::MemBuf(utils::MemBuf(CREATE, 2048)), - packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), - header_offset_(0), - format_(format), payload_type_(PayloadType::UNSPECIFIED) { - setFormat(format_, additional_header_size); + /* + * We define the format and the storage area of the packet buffer we + * manipulate + */ + setFormat(format); + setBuffer(); + initializeType(type); // type requires packet format + initialize(additional_header_size); } -Packet::Packet(MemBuf &&buffer) - : utils::MemBuf(std::move(buffer)), - packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), - header_offset_(0), - format_(getFormatFromBuffer(data(), length())), - payload_type_(PayloadType::UNSPECIFIED) {} - Packet::Packet(CopyBufferOp, const uint8_t *buffer, std::size_t size) : utils::MemBuf(COPY_BUFFER, buffer, size), - packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), - header_offset_(0), - format_(getFormatFromBuffer(data(), length())), - payload_type_(PayloadType::UNSPECIFIED) {} + payload_type_(PayloadType::UNSPECIFIED) { + setBuffer(); + analyze(); +} Packet::Packet(WrapBufferOp, uint8_t *buffer, std::size_t length, std::size_t size) : utils::MemBuf(WRAP_BUFFER, buffer, length, size), - packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), - header_offset_(0), - format_(getFormatFromBuffer(this->data(), this->length())), - payload_type_(PayloadType::UNSPECIFIED) {} + payload_type_(PayloadType::UNSPECIFIED) { + setBuffer(); + analyze(); +} -Packet::Packet(CreateOp, uint8_t *buffer, std::size_t length, std::size_t size, - Format format, std::size_t additional_header_size) +Packet::Packet(CreateOp, Type type, uint8_t *buffer, std::size_t length, + std::size_t size, Format format, + std::size_t additional_header_size) : utils::MemBuf(WRAP_BUFFER, buffer, length, size), - packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), - header_offset_(0), - format_(format), payload_type_(PayloadType::UNSPECIFIED) { clear(); - setFormat(format_, additional_header_size); + setType(type); + setFormat(format); + setBuffer(); + initialize(additional_header_size); } -Packet::Packet(const Packet &other) - : utils::MemBuf(other), - packet_start_(reinterpret_cast<hicn_header_t *>(writableData())), - header_offset_(other.header_offset_), - format_(other.format_), - payload_type_(PayloadType::UNSPECIFIED) {} +Packet::Packet(MemBuf &&buffer) + : utils::MemBuf(std::move(buffer)), + payload_type_(PayloadType::UNSPECIFIED) { + setBuffer(); + analyze(); +} + +/* + * In the two following constructors, we inherit the pkbuf and only need to + * recompute the pointer fields, aka the buffer. + */ Packet::Packet(Packet &&other) : utils::MemBuf(std::move(other)), - packet_start_(other.packet_start_), - header_offset_(other.header_offset_), - format_(other.format_), + pkbuf_(other.pkbuf_), + payload_type_(PayloadType::UNSPECIFIED) { + hicn_packet_reset(&other.pkbuf_); +} + +Packet::Packet(const Packet &other) + : utils::MemBuf(other), + pkbuf_(other.pkbuf_), payload_type_(PayloadType::UNSPECIFIED) { - other.packet_start_ = nullptr; - other.format_ = HF_UNSPEC; - other.header_offset_ = 0; + setBuffer(); } Packet::~Packet() {} @@ -97,86 +105,82 @@ Packet::~Packet() {} Packet &Packet::operator=(const Packet &other) { if (this != &other) { *this = other; - packet_start_ = reinterpret_cast<hicn_header_t *>(writableData()); + setBuffer(); } return *this; } -std::size_t Packet::getHeaderSizeFromBuffer(Format format, - const uint8_t *buffer) { - size_t header_length; - - if (hicn_packet_get_header_length(format, (hicn_header_t *)buffer, - &header_length) < 0) { - throw errors::MalformedPacketException(); - } - - return header_length; +std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() { + return std::static_pointer_cast<utils::MemBuf>(shared_from_this()); } -bool Packet::isInterest(const uint8_t *buffer) { - bool is_interest = false; - - if (TRANSPORT_EXPECT_FALSE(hicn_packet_test_ece(HF_INET6_TCP, - (const hicn_header_t *)buffer, - &is_interest) < 0)) { - throw errors::RuntimeException( - "Impossible to retrieve ece flag from packet"); - } +Packet::Format Packet::getFormat() const { + return hicn_packet_get_format(&pkbuf_); +} - return !is_interest; +void Packet::setFormat(Packet::Format format) { + hicn_packet_set_format(&pkbuf_, format); } -void Packet::setFormat(Packet::Format format, - std::size_t additional_header_size) { - format_ = format; - if (hicn_packet_init_header(format_, packet_start_) < 0) { +void Packet::initialize(std::size_t additional_header_size) { + if (hicn_packet_init_header(&pkbuf_, additional_header_size) < 0) { throw errors::RuntimeException("Unexpected error initializing the packet."); } - auto header_size = getHeaderSizeFromFormat(format_); - assert(header_size <= tailroom()); + auto header_size = getHeaderSizeFromFormat(getFormat()); + DCHECK(header_size <= tailroom()); append(header_size); - - assert(additional_header_size <= tailroom()); + DCHECK(additional_header_size <= tailroom()); append(additional_header_size); +} - header_offset_ = length(); +void Packet::analyze() { + if (hicn_packet_analyze(&pkbuf_) < 0) + throw errors::MalformedPacketException(); } -bool Packet::isInterest() { return Packet::isInterest(data()); } +Packet::Type Packet::getType() const { return hicn_packet_get_type(&pkbuf_); } -std::size_t Packet::getPayloadSizeFromBuffer(Format format, - const uint8_t *buffer) { - std::size_t payload_length; - if (TRANSPORT_EXPECT_FALSE( - hicn_packet_get_payload_length(format, (hicn_header_t *)buffer, - &payload_length) < 0)) { - throw errors::MalformedPacketException(); - } +void Packet::initializeType(Packet::Type type) { + hicn_packet_initialize_type(&pkbuf_, type); +} - return payload_length; +void Packet::setType(Packet::Type type) { hicn_packet_set_type(&pkbuf_, type); } + +void Packet::setBuffer() { + hicn_packet_set_buffer(&pkbuf_, writableData(), + this->capacity() - this->headroom(), this->length()); } -std::size_t Packet::payloadSize() const { - std::size_t ret = 0; +PayloadType Packet::getPayloadType() const { + if (payload_type_ == PayloadType::UNSPECIFIED) { + hicn_payload_type_t ret; - if (length()) { - ret = getPayloadSizeFromBuffer(format_, - reinterpret_cast<uint8_t *>(packet_start_)); + if (hicn_packet_get_payload_type(&pkbuf_, &ret) < 0) { + throw errors::RuntimeException("Impossible to retrieve payload type."); + } + + payload_type_ = (PayloadType)ret; } - return ret; + return payload_type_; } -std::size_t Packet::headerSize() const { - if (header_offset_ == 0 && length()) { - const_cast<Packet *>(this)->header_offset_ = getHeaderSizeFromBuffer( - format_, reinterpret_cast<uint8_t *>(packet_start_)); +Packet &Packet::setPayloadType(PayloadType payload_type) { + if (hicn_packet_set_payload_type(&pkbuf_, hicn_payload_type_t(payload_type)) < + 0) { + throw errors::RuntimeException("Error setting payload type of the packet."); } - return header_offset_; + payload_type_ = payload_type; + return *this; +} + +std::unique_ptr<utils::MemBuf> Packet::getPayload() const { + auto ret = clone(); + ret->trimStart(headerSize()); + return ret; } Packet &Packet::appendPayload(std::unique_ptr<utils::MemBuf> &&payload) { @@ -196,70 +200,71 @@ Packet &Packet::appendPayload(const uint8_t *buffer, std::size_t length) { return *this; } -std::unique_ptr<utils::MemBuf> Packet::getPayload() const { - auto ret = clone(); - ret->trimStart(headerSize()); - return ret; +std::size_t Packet::headerSize() const { + std::size_t len; + hicn_packet_get_header_len(&pkbuf_, &len); + return len; } -Packet &Packet::updateLength(std::size_t length) { - std::size_t total_length = length; +std::size_t Packet::payloadSize() const { + std::size_t len; + hicn_packet_get_payload_len(&pkbuf_, &len); + return len; +} - const utils::MemBuf *current = this; - do { - total_length += current->length(); - current = current->next(); - } while (current != this); +auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const { + auth::CryptoHash hash; + hash.setType(algorithm); - total_length -= headerSize(); + // Copy IP+TCP/ICMP header before zeroing them + u8 header_copy[HICN_HDRLEN_MAX]; + size_t header_len; + hicn_packet_save_header(&pkbuf_, header_copy, &header_len, + /* copy_ah */ false); + const_cast<Packet *>(this)->resetForHash(); - if (hicn_packet_set_payload_length(format_, packet_start_, total_length) < - 0) { - throw errors::RuntimeException("Error setting the packet payload."); - } + hash.computeDigest(this); + hicn_packet_load_header(&pkbuf_, header_copy, header_len); - return *this; + return hash; } -PayloadType Packet::getPayloadType() const { - if (payload_type_ == PayloadType::UNSPECIFIED) { - hicn_payload_type_t ret; - if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) { - throw errors::RuntimeException("Impossible to retrieve payload type."); - } +void Packet::reset() { + clear(); + hicn_packet_reset(&pkbuf_); + setBuffer(); + payload_type_ = PayloadType::UNSPECIFIED; + name_.clear(); - payload_type_ = (PayloadType)ret; + if (isChained()) { + separateChain(next(), prev()); } - - return payload_type_; } -Packet &Packet::setPayloadType(PayloadType payload_type) { - if (hicn_packet_set_payload_type(packet_start_, - hicn_payload_type_t(payload_type)) < 0) { - throw errors::RuntimeException("Error setting payload type of the packet."); - } +bool Packet::isInterest() { + return hicn_packet_get_type(&pkbuf_) == HICN_PACKET_TYPE_INTEREST; +} - payload_type_ = payload_type; +Packet &Packet::updateLength(std::size_t length) { + std::size_t total_length = length; - return *this; -} + const utils::MemBuf *current = this; + do { + total_length += current->length(); + current = current->next(); + } while (current != this); -Packet::Format Packet::getFormat() const { - /** - * We check packet start because after a movement it will result in a nullptr - */ - if (format_ == HF_UNSPEC && length()) { - if (hicn_packet_get_format(packet_start_, &format_) < 0) { - LOG(ERROR) << "Unexpected packet format HF_UNSPEC."; - } + if (hicn_packet_set_len(&pkbuf_, total_length) < 0) { + throw errors::RuntimeException("Error setting the packet length."); } - return format_; -} + total_length -= headerSize(); -std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() { - return std::static_pointer_cast<utils::MemBuf>(shared_from_this()); + if (hicn_packet_set_payload_length(&pkbuf_, total_length) < 0) { + throw errors::RuntimeException("Error setting the packet payload length."); + } + + return *this; } void Packet::dump() const { @@ -274,364 +279,282 @@ void Packet::dump() const { } while (current != this); } -void Packet::dump(uint8_t *buffer, std::size_t length) { - hicn_packet_dump(buffer, length); -} +void Packet::setChecksum() { + size_t header_len = 0; + if (hicn_packet_get_header_len(&pkbuf_, &header_len) < 0) + throw errors::RuntimeException( + "Error setting getting packet header length."); -void Packet::setSignatureSize(std::size_t size_bytes) { - if (!authenticationHeader()) { - throw errors::RuntimeException("Packet without Authentication Header."); - } + uint16_t partial_csum = csum(data() + header_len, length() - header_len, 0); - int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes); + for (utils::MemBuf *current = next(); current != this; + current = current->next()) { + partial_csum = csum(current->data(), current->length(), ~partial_csum); + } - if (ret < 0) { - throw errors::RuntimeException("Error setting signature size."); + if (hicn_packet_compute_header_checksum(&pkbuf_, partial_csum) < 0) { + throw errors::MalformedPacketException(); } } -void Packet::setSignatureSizeGap(std::size_t size_bytes) { - if (!authenticationHeader()) { - throw errors::RuntimeException("Packet without Authentication Header."); - } +bool Packet::checkIntegrity() const { + size_t header_len = 0; + if (hicn_packet_get_header_len(&pkbuf_, &header_len) < 0) + throw errors::RuntimeException( + "Error setting getting packet header length."); - int ret = hicn_packet_set_signature_gap(format_, packet_start_, - (uint8_t)size_bytes); + uint16_t partial_csum = csum(data() + header_len, length() - header_len, 0); - if (ret < 0) { - throw errors::RuntimeException("Error setting signature size."); + for (const utils::MemBuf *current = next(); current != this; + current = current->next()) { + partial_csum = csum(current->data(), current->length(), ~partial_csum); + } + + if (hicn_packet_check_integrity_no_payload(&pkbuf_, partial_csum) < 0) { + return false; } + + return true; } -uint8_t *Packet::getSignature() const { - if (!authenticationHeader()) { +bool Packet::hasAH() const { return HICN_PACKET_FORMAT_IS_AH(getFormat()); } + +utils::MemBuf::Ptr Packet::getSignature() const { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } uint8_t *signature; - int ret = hicn_packet_get_signature(format_, packet_start_, &signature); + int ret = hicn_packet_get_signature(&pkbuf_, &signature); if (ret < 0) { throw errors::RuntimeException("Error getting signature."); } - return signature; + utils::MemBuf::Ptr membuf = PacketManager<>::getInstance().getMemBuf(); + membuf->append(getSignatureFieldSize()); + memcpy(membuf->writableData(), signature, getSignatureFieldSize()); + + return membuf; } -void Packet::setSignatureTimestamp(const uint64_t ×tamp) { - if (!authenticationHeader()) { +std::size_t Packet::getSignatureFieldSize() const { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } - int ret = - hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp); - + size_t field_size; + int ret = hicn_packet_get_signature_size(&pkbuf_, &field_size); if (ret < 0) { - throw errors::RuntimeException("Error setting the signature timestamp."); + throw errors::RuntimeException("Error reading signature field size"); } + return field_size; } -uint64_t Packet::getSignatureTimestamp() const { - if (!authenticationHeader()) { +std::size_t Packet::getSignatureSize() const { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } - uint64_t return_value; - int ret = hicn_packet_get_signature_timestamp(format_, packet_start_, - &return_value); - + size_t padding; + int ret = hicn_packet_get_signature_padding(&pkbuf_, &padding); if (ret < 0) { - throw errors::RuntimeException("Error getting the signature timestamp."); + throw errors::RuntimeException("Error reading signature padding"); + } + + size_t size = getSignatureFieldSize() - padding; + if (size < 0) { + throw errors::RuntimeException("Error reading signature size"); } - return return_value; + return size; } -void Packet::setValidationAlgorithm( - const auth::CryptoSuite &validation_algorithm) { - if (!authenticationHeader()) { +uint64_t Packet::getSignatureTimestamp() const { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } - int ret = hicn_packet_set_validation_algorithm(format_, packet_start_, - uint8_t(validation_algorithm)); - + uint64_t timestamp; + int ret = hicn_packet_get_signature_timestamp(&pkbuf_, ×tamp); if (ret < 0) { - throw errors::RuntimeException("Error setting the validation algorithm."); + throw errors::RuntimeException("Error getting the signature timestamp."); } + return timestamp; } -auth::CryptoSuite Packet::getValidationAlgorithm() const { - if (!authenticationHeader()) { +auth::KeyId Packet::getKeyId() const { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } - uint8_t return_value; - int ret = hicn_packet_get_validation_algorithm(format_, packet_start_, - &return_value); - + auth::KeyId key_id; + int ret = hicn_packet_get_key_id(&pkbuf_, &key_id.first, &key_id.second); if (ret < 0) { throw errors::RuntimeException("Error getting the validation algorithm."); } - - return auth::CryptoSuite(return_value); + return key_id; } -void Packet::setKeyId(const auth::KeyId &key_id) { - if (!authenticationHeader()) { +auth::CryptoSuite Packet::getValidationAlgorithm() const { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } - int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first); - + uint8_t return_value; + int ret = hicn_packet_get_validation_algorithm(&pkbuf_, &return_value); if (ret < 0) { - throw errors::RuntimeException("Error setting the key id."); + throw errors::RuntimeException("Error getting the validation algorithm."); } + return auth::CryptoSuite(return_value); } -auth::KeyId Packet::getKeyId() const { - if (!authenticationHeader()) { +void Packet::setSignature(const utils::MemBuf::Ptr &signature) { + if (!hasAH()) { throw errors::RuntimeException("Packet without Authentication Header."); } - auth::KeyId return_value; - int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first, - &return_value.second); - + uint8_t *signature_field; + int ret = hicn_packet_get_signature(&pkbuf_, &signature_field); if (ret < 0) { - throw errors::RuntimeException("Error getting the validation algorithm."); + throw errors::RuntimeException("Error getting signature."); } - - return return_value; -} - -auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const { - auth::CryptoHash hash; - hash.setType(algorithm); - - // Copy IP+TCP/ICMP header before zeroing them - hicn_header_t header_copy; - - hicn_packet_copy_header(format_, packet_start_, &header_copy, false); - - const_cast<Packet *>(this)->resetForHash(); - - hash.computeDigest(this); - hicn_packet_copy_header(format_, &header_copy, packet_start_, false); - - return hash; + memcpy(signature_field, signature->data(), signature->length()); } -bool Packet::checkIntegrity() const { - uint16_t partial_csum = - csum(data() + HICN_V6_TCP_HDRLEN, length() - HICN_V6_TCP_HDRLEN, 0); - - for (const utils::MemBuf *current = next(); current != this; - current = current->next()) { - partial_csum = csum(current->data(), current->length(), ~partial_csum); - } - - if (hicn_packet_check_integrity_no_payload(format_, packet_start_, - partial_csum) < 0) { - return false; +void Packet::setSignatureFieldSize(std::size_t size) { + if (!hasAH()) { + throw errors::RuntimeException("Packet without Authentication Header."); } - return true; -} - -void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) { - auto last = prev(); - auto to_copy = std::min(*size, last->tailroom()); - std::memcpy(last->writableTail(), *buffer, to_copy); - last->append(to_copy); - *size -= to_copy; - *buffer += to_copy; -} - -Packet &Packet::setSyn() { - if (hicn_packet_set_syn(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error setting syn bit in the packet."); + int ret = hicn_packet_set_signature_size(&pkbuf_, size); + if (ret < 0) { + throw errors::RuntimeException("Error setting signature size."); } - - return *this; } -Packet &Packet::resetSyn() { - if (hicn_packet_reset_syn(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error resetting syn bit in the packet."); +void Packet::setSignatureSize(std::size_t size) { + if (!hasAH()) { + throw errors::RuntimeException("Packet without Authentication Header."); } - return *this; -} - -bool Packet::testSyn() const { - bool res = false; - if (hicn_packet_test_syn(format_, packet_start_, &res) < 0) { - throw errors::RuntimeException("Error testing syn bit in the packet."); + size_t padding = getSignatureFieldSize() - size; + if (padding < 0) { + throw errors::RuntimeException("Error setting signature padding."); } - return res; -} - -Packet &Packet::setAck() { - if (hicn_packet_set_ack(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error setting ack bit in the packet."); + int ret = hicn_packet_set_signature_padding(&pkbuf_, padding); + if (ret < 0) { + throw errors::RuntimeException("Error setting signature padding."); } - - return *this; } -Packet &Packet::resetAck() { - if (hicn_packet_reset_ack(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error resetting ack bit in the packet."); +void Packet::setSignatureTimestamp(const uint64_t ×tamp) { + if (!hasAH()) { + throw errors::RuntimeException("Packet without Authentication Header."); } - return *this; -} - -bool Packet::testAck() const { - bool res = false; - if (hicn_packet_test_ack(format_, packet_start_, &res) < 0) { - throw errors::RuntimeException("Error testing ack bit in the packet."); + int ret = hicn_packet_set_signature_timestamp(&pkbuf_, timestamp); + if (ret < 0) { + throw errors::RuntimeException("Error setting the signature timestamp."); } - - return res; } -Packet &Packet::setRst() { - if (hicn_packet_set_rst(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error setting rst bit in the packet."); +void Packet::setKeyId(const auth::KeyId &key_id) { + if (!hasAH()) { + throw errors::RuntimeException("Packet without Authentication Header."); } - return *this; -} - -Packet &Packet::resetRst() { - if (hicn_packet_reset_rst(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error resetting rst bit in the packet."); + int ret = hicn_packet_set_key_id(&pkbuf_, key_id.first, key_id.second); + if (ret < 0) { + throw errors::RuntimeException("Error setting the key id."); } - - return *this; } -bool Packet::testRst() const { - bool res = false; - if (hicn_packet_test_rst(format_, packet_start_, &res) < 0) { - throw errors::RuntimeException("Error testing rst bit in the packet."); +void Packet::setValidationAlgorithm( + const auth::CryptoSuite &validation_algorithm) { + if (!hasAH()) { + throw errors::RuntimeException("Packet without Authentication Header."); } - return res; -} - -Packet &Packet::setFin() { - if (hicn_packet_set_fin(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error setting fin bit in the packet."); + int ret = hicn_packet_set_validation_algorithm(&pkbuf_, + uint8_t(validation_algorithm)); + if (ret < 0) { + throw errors::RuntimeException("Error setting the validation algorithm."); } - - return *this; } -Packet &Packet::resetFin() { - if (hicn_packet_reset_fin(format_, packet_start_) < 0) { - throw errors::RuntimeException("Error resetting fin bit in the packet."); - } - - return *this; +Packet::Format Packet::toAHFormat(const Format &format) { + return hicn_get_ah_format(format); } -bool Packet::testFin() const { - bool res = false; - if (hicn_packet_test_fin(format_, packet_start_, &res) < 0) { - throw errors::RuntimeException("Error testing fin bit in the packet."); - } +Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer, + std::size_t length) { + hicn_packet_buffer_t pkbuf; + /* un-const to be able to use pkbuf API */ + hicn_packet_set_buffer(&pkbuf, (uint8_t *)buffer, length, length); + if (hicn_packet_analyze(&pkbuf) < 0) throw errors::MalformedPacketException(); - return res; + return hicn_packet_get_format(&pkbuf); } -Packet &Packet::resetFlags() { - resetSyn(); - resetAck(); - resetRst(); - resetFin(); - - return *this; +std::size_t Packet::getHeaderSizeFromFormat(Format format, + std::size_t signature_size) { + std::size_t header_length; + hicn_packet_get_header_length_from_format(format, &header_length); + int is_ah = HICN_PACKET_FORMAT_IS_AH(format); + return is_ah * (header_length + signature_size) + (!is_ah) * header_length; } -std::string Packet::printFlags() const { - std::string flags; - - if (testSyn()) { - flags += "S"; - } - - if (testAck()) { - flags += "A"; - } +std::size_t Packet::getHeaderSizeFromBuffer(const uint8_t *buffer, + std::size_t length) { + size_t header_length; - if (testRst()) { - flags += "R"; - } + hicn_packet_buffer_t pkbuf; + /* un-const to be able to use pkbuf API */ + hicn_packet_set_buffer(&pkbuf, (uint8_t *)buffer, length, length); + if (hicn_packet_analyze(&pkbuf) < 0) throw errors::MalformedPacketException(); - if (testFin()) { - flags += "F"; - } + int rc = hicn_packet_get_header_len(&pkbuf, &header_length); + if (TRANSPORT_EXPECT_FALSE(rc < 0)) throw errors::MalformedPacketException(); - return flags; + return header_length; } -Packet &Packet::setSrcPort(uint16_t srcPort) { - if (hicn_packet_set_src_port(format_, packet_start_, srcPort) < 0) { - throw errors::RuntimeException("Error setting source port in the packet."); - } +std::size_t Packet::getPayloadSizeFromBuffer(const uint8_t *buffer, + std::size_t length) { + std::size_t payload_length; - return *this; -} + hicn_packet_buffer_t pkbuf; + /* un-const to be able to use pkbuf API */ + hicn_packet_set_buffer(&pkbuf, (uint8_t *)buffer, length, length); + if (hicn_packet_analyze(&pkbuf) < 0) throw errors::MalformedPacketException(); -Packet &Packet::setDstPort(uint16_t dstPort) { - if (hicn_packet_set_dst_port(format_, packet_start_, dstPort) < 0) { - throw errors::RuntimeException( - "Error setting destination port in the packet."); - } + int rc = hicn_packet_get_payload_len(&pkbuf, &payload_length); + if (TRANSPORT_EXPECT_FALSE(rc < 0)) throw errors::MalformedPacketException(); - return *this; + return payload_length; } -uint16_t Packet::getSrcPort() const { - uint16_t port = 0; - - if (hicn_packet_get_src_port(format_, packet_start_, &port) < 0) { - throw errors::RuntimeException("Error reading source port in the packet."); - } - - return port; +void Packet::dump(uint8_t *buffer, std::size_t length) { + hicn_packet_dump(buffer, length); } -uint16_t Packet::getDstPort() const { - uint16_t port = 0; - - if (hicn_packet_get_dst_port(format_, packet_start_, &port) < 0) { - throw errors::RuntimeException( - "Error reading destination port in the packet."); - } - - return port; +void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) { + auto last = prev(); + auto to_copy = std::min(*size, last->tailroom()); + std::memcpy(last->writableTail(), *buffer, to_copy); + last->append(to_copy); + *size -= to_copy; + *buffer += to_copy; } -Packet &Packet::setTTL(uint8_t hops) { - if (hicn_packet_set_hoplimit(packet_start_, hops) < 0) { - throw errors::RuntimeException("Error setting TTL."); - } - - return *this; +void Packet::saveHeader(u8 *header, size_t *header_len) { + hicn_packet_save_header(&pkbuf_, header, header_len, /* copy_ah */ false); } -uint8_t Packet::getTTL() const { - uint8_t hops = 0; - if (hicn_packet_get_hoplimit(packet_start_, &hops) < 0) { - throw errors::RuntimeException("Error reading TTL."); - } - - return hops; +void Packet::loadHeader(u8 *header, size_t header_len) { + hicn_packet_load_header(&pkbuf_, header, header_len); } } // end namespace core diff --git a/libtransport/src/core/pending_interest.cc b/libtransport/src/core/pending_interest.cc deleted file mode 100644 index fbe98cab5..000000000 --- a/libtransport/src/core/pending_interest.cc +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <core/pending_interest.h> - -namespace transport { - -namespace core { - -PendingInterest::PendingInterest() - : interest_(nullptr, nullptr), - timer_(), - on_content_object_callback_(), - on_interest_timeout_callback_() {} - -PendingInterest::PendingInterest(Interest::Ptr &&interest, - std::unique_ptr<asio::steady_timer> &&timer) - : interest_(std::move(interest)), - timer_(std::move(timer)), - on_content_object_callback_(), - on_interest_timeout_callback_() {} - -PendingInterest::PendingInterest( - Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object, - OnInterestTimeoutCallback &&on_interest_timeout, - std::unique_ptr<asio::steady_timer> &&timer) - : interest_(std::move(interest)), - timer_(std::move(timer)), - on_content_object_callback_(std::move(on_content_object)), - on_interest_timeout_callback_(std::move(on_interest_timeout)) {} - -PendingInterest::~PendingInterest() {} - -void PendingInterest::cancelTimer() { timer_->cancel(); } - -void PendingInterest::setInterest(Interest::Ptr &&interest) { - interest_ = std::move(interest); -} - -Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); } - -const OnContentObjectCallback &PendingInterest::getOnDataCallback() const { - return on_content_object_callback_; -} - -void PendingInterest::setOnContentObjectCallback( - OnContentObjectCallback &&on_content_object) { - PendingInterest::on_content_object_callback_ = on_content_object; -} - -const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const { - return on_interest_timeout_callback_; -} - -void PendingInterest::setOnTimeoutCallback( - OnInterestTimeoutCallback &&on_interest_timeout) { - PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; -} - -} // end namespace core - -} // end namespace transport diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h index 99a8bd327..f49348bac 100644 --- a/libtransport/src/core/pending_interest.h +++ b/libtransport/src/core/pending_interest.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -42,70 +42,60 @@ class PendingInterest { public: using Ptr = utils::ObjectPool<PendingInterest>::Ptr; - // PendingInterest() - // : interest_(nullptr, nullptr), - // timer_(), - // on_content_object_callback_(), - // on_interest_timeout_callback_() {} - - PendingInterest(Interest::Ptr &&interest, - std::unique_ptr<asio::steady_timer> &&timer) - : interest_(std::move(interest)), - timer_(std::move(timer)), - on_content_object_callback_(), - on_interest_timeout_callback_() {} - - PendingInterest(Interest::Ptr &&interest, + + PendingInterest(asio::io_service &io_service, const Interest::Ptr &interest) + : interest_(interest), timer_(io_service) {} + + PendingInterest(asio::io_service &io_service, const Interest::Ptr &interest, OnContentObjectCallback &&on_content_object, - OnInterestTimeoutCallback &&on_interest_timeout, - std::unique_ptr<asio::steady_timer> &&timer) - : interest_(std::move(interest)), - timer_(std::move(timer)), + OnInterestTimeoutCallback &&on_interest_timeout) + : interest_(interest), + timer_(io_service), on_content_object_callback_(std::move(on_content_object)), on_interest_timeout_callback_(std::move(on_interest_timeout)) {} ~PendingInterest() = default; template <typename Handler> - TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) { - timer_->expires_from_now( - std::chrono::milliseconds(interest_->getLifetime())); - timer_->async_wait(std::forward<Handler &&>(cb)); + void startCountdown(uint32_t lifetime, Handler &&cb) { + timer_.expires_from_now(std::chrono::milliseconds(lifetime)); + timer_.async_wait(std::forward<Handler>(cb)); } - TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); } - - TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() { - return std::move(interest_); + void cancelTimer() { + try { + timer_.cancel(); + } catch (asio::system_error &e) { + // do nothing + } } - TRANSPORT_ALWAYS_INLINE void setInterest(Interest::Ptr &interest) { - interest_ = interest; - } + Interest::Ptr &&getInterest() { return std::move(interest_); } + + const Interest::Ptr &getInterestReference() const { return interest_; } + + void setInterest(const Interest::Ptr &interest) { interest_ = interest; } - TRANSPORT_ALWAYS_INLINE const OnContentObjectCallback &getOnDataCallback() - const { + const OnContentObjectCallback &getOnDataCallback() const { return on_content_object_callback_; } - TRANSPORT_ALWAYS_INLINE void setOnContentObjectCallback( - OnContentObjectCallback &&on_content_object) { - PendingInterest::on_content_object_callback_ = on_content_object; + void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object) { + PendingInterest::on_content_object_callback_ = std::move(on_content_object); } - TRANSPORT_ALWAYS_INLINE const OnInterestTimeoutCallback & - getOnTimeoutCallback() const { + const OnInterestTimeoutCallback &getOnTimeoutCallback() const { return on_interest_timeout_callback_; } - TRANSPORT_ALWAYS_INLINE void setOnTimeoutCallback( - OnInterestTimeoutCallback &&on_interest_timeout) { - PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; + void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout) { + PendingInterest::on_interest_timeout_callback_ = + std::move(on_interest_timeout); } private: Interest::Ptr interest_; - std::unique_ptr<asio::steady_timer> timer_; + asio::steady_timer timer_; OnContentObjectCallback on_content_object_callback_; OnInterestTimeoutCallback on_interest_timeout_callback_; }; diff --git a/libtransport/src/core/portal.cc b/libtransport/src/core/portal.cc index c4c0cf8ba..72b6a6f37 100644 --- a/libtransport/src/core/portal.cc +++ b/libtransport/src/core/portal.cc @@ -43,12 +43,14 @@ std::string Portal::io_module_path_ = defaultIoModule(); std::string Portal::defaultIoModule() { using namespace std::placeholders; GlobalConfiguration::getInstance().registerConfigurationParser( - io_module_section, + IoModuleConfiguration::section, std::bind(&Portal::parseIoModuleConfiguration, _1, _2)); GlobalConfiguration::getInstance().registerConfigurationGetter( - io_module_section, std::bind(&Portal::getModuleConfiguration, _1, _2)); + IoModuleConfiguration::section, + std::bind(&Portal::getModuleConfiguration, _1, _2)); GlobalConfiguration::getInstance().registerConfigurationSetter( - io_module_section, std::bind(&Portal::setModuleConfiguration, _1, _2)); + IoModuleConfiguration::section, + std::bind(&Portal::setModuleConfiguration, _1, _2)); // return default conf_.name = default_module; @@ -57,7 +59,7 @@ std::string Portal::defaultIoModule() { void Portal::getModuleConfiguration(ConfigurationObject& object, std::error_code& ec) { - assert(object.getKey() == io_module_section); + DCHECK(object.getKey() == IoModuleConfiguration::section); auto conf = dynamic_cast<const IoModuleConfiguration&>(object); conf = conf_; @@ -103,7 +105,7 @@ std::string getIoModulePath(const std::string& name, void Portal::setModuleConfiguration(const ConfigurationObject& object, std::error_code& ec) { - assert(object.getKey() == io_module_section); + DCHECK(object.getKey() == IoModuleConfiguration::section); const IoModuleConfiguration& conf = dynamic_cast<const IoModuleConfiguration&>(object); @@ -147,4 +149,4 @@ void Portal::parseIoModuleConfiguration(const libconfig::Setting& io_config, } } // namespace core -} // namespace transport
\ No newline at end of file +} // namespace transport diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h index f6a9ce85b..c4ba096b9 100644 --- a/libtransport/src/core/portal.h +++ b/libtransport/src/core/portal.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -15,6 +15,7 @@ #pragma once +#include <core/global_workers.h> #include <core/pending_interest.h> #include <glog/logging.h> #include <hicn/transport/config.h> @@ -28,6 +29,7 @@ #include <hicn/transport/interfaces/global_conf_interface.h> #include <hicn/transport/interfaces/portal.h> #include <hicn/transport/portability/portability.h> +#include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/fixed_block_allocator.h> #include <future> @@ -54,11 +56,11 @@ class HandlerMemory { HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { + void *allocate(std::size_t /* size */) { return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock(); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { + void deallocate(void *pointer) { utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock( pointer); } @@ -69,13 +71,9 @@ class HandlerMemory { HandlerMemory(const HandlerMemory &) = delete; HandlerMemory &operator=(const HandlerMemory &) = delete; - TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) { - return ::operator new(size); - } + void *allocate(std::size_t size) { return ::operator new(size); } - TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) { - ::operator delete(pointer); - } + void deallocate(void *pointer) { ::operator delete(pointer); } #endif }; @@ -92,21 +90,19 @@ class HandlerAllocator { HandlerAllocator(const HandlerAllocator<U> &other) noexcept : memory_(other.memory_) {} - TRANSPORT_ALWAYS_INLINE bool operator==( - const HandlerAllocator &other) const noexcept { + bool operator==(const HandlerAllocator &other) const noexcept { return &memory_ == &other.memory_; } - TRANSPORT_ALWAYS_INLINE bool operator!=( - const HandlerAllocator &other) const noexcept { + bool operator!=(const HandlerAllocator &other) const noexcept { return &memory_ != &other.memory_; } - TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const { + T *allocate(std::size_t n) const { return static_cast<T *>(memory_.allocate(sizeof(T) * n)); } - TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const { + void deallocate(T *p, std::size_t /*n*/) const { return memory_.deallocate(p); } @@ -135,7 +131,7 @@ class CustomAllocatorHandler { } template <typename... Args> - void operator()(Args &&... args) { + void operator()(Args &&...args) { handler_(std::forward<Args>(args)...); } @@ -151,49 +147,16 @@ inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler( return CustomAllocatorHandler<Handler>(m, h); } -class Pool { - public: - Pool(asio::io_service &io_service) : io_service_(io_service) { - increasePendingInterestPool(); - } - - TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() { - // Create pool of pending interests to reuse - for (uint32_t i = 0; i < pit_size; i++) { - pending_interests_pool_.add(new PendingInterest( - Interest::Ptr(nullptr), - std::make_unique<asio::steady_timer>(io_service_))); - } - } - PendingInterest::Ptr getPendingInterest() { - auto res = pending_interests_pool_.get(); - while (TRANSPORT_EXPECT_FALSE(!res.first)) { - increasePendingInterestPool(); - res = pending_interests_pool_.get(); - } - - return std::move(res.second); - } - - private: - utils::ObjectPool<PendingInterest> pending_interests_pool_; - asio::io_service &io_service_; -}; - } // namespace portal_details class PortalConfiguration; -using PendingInterestHashTable = - std::unordered_map<uint32_t, PendingInterest::Ptr>; - -using interface::BindConfig; +using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest>; /** * Portal is a opaque class which is used for sending/receiving interest/data - * packets over multiple kind of connector. The connector itself is defined by - * the template ForwarderInt, which is resolved at compile time. It is then not - * possible to decide at runtime what the connector will be. + * packets over multiple kind of io_modules. The io_module itself is an external + * module loaded at runtime. * * The tasks performed by portal are the following: * - Sending/Receiving Interest packets @@ -210,83 +173,117 @@ using interface::BindConfig; * users of this class. */ -class Portal { - public: - using ConsumerCallback = interface::Portal::ConsumerCallback; - using ProducerCallback = interface::Portal::ProducerCallback; +class Portal : public ::utils::NonCopyable, + public std::enable_shared_from_this<Portal> { + private: + Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {} + Portal(::utils::EventThread &worker) + : io_module_(nullptr), + worker_(worker), + app_name_("libtransport_application"), + transport_callback_(nullptr), + is_consumer_(false) {} + + public: + using TransportCallback = interface::Portal::TransportCallback; friend class PortalConfiguration; - Portal() : Portal(internal_io_service_) {} + static std::shared_ptr<Portal> createShared() { + return std::shared_ptr<Portal>(new Portal()); + } - Portal(asio::io_service &io_service) - : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }), - io_service_(io_service), - packet_pool_(io_service), - app_name_("libtransport_application"), - consumer_callback_(nullptr), - producer_callback_(nullptr), - is_consumer_(false) { - /** - * This workaroung allows to initialize memory for packet buffers *before* - * any static variables that may be initialized in the io_modules. In this - * way static variables in modules will be destroyed before the packet - * memory. - */ - PacketManager<>::getInstance(); + static std::shared_ptr<Portal> createShared(::utils::EventThread &worker) { + return std::shared_ptr<Portal>(new Portal(worker)); } + + bool isConnected() const { return io_module_.get() != nullptr; } + /** - * Set the consumer callback. + * Set the transport callback. Must be called from the same worker thread. * - * @param consumer_callback - The pointer to the ConsumerCallback object. + * @param consumer_callback - The pointer to the TransportCallback object. */ - void setConsumerCallback(ConsumerCallback *consumer_callback) { - consumer_callback_ = consumer_callback; + void registerTransportCallback(TransportCallback *transport_callback) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + transport_callback_ = transport_callback; } /** - * Set the producer callback. - * - * @param producer_callback - The pointer to the ProducerCallback object. + * Unset the consumer callback. Must be called from the same worker thread. */ - void setProducerCallback(ProducerCallback *producer_callback) { - producer_callback_ = producer_callback; + void unregisterTransportCallback() { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + transport_callback_ = nullptr; } /** - * Specify the output interface to use. This method will be useful in a future - * scenario where the library will be able to forward packets without + * Specify the output interface to use. This method will be useful in a + * future scenario where the library will be able to forward packets without * connecting to a local forwarder. Now it is not used. * * @param output_interface - The output interface to use for * forwarding/receiving packets. */ - TRANSPORT_ALWAYS_INLINE void setOutputInterface( - const std::string &output_interface) { - io_module_->setOutputInterface(output_interface); + void setOutputInterface(const std::string &output_interface) { + if (io_module_) { + io_module_->setOutputInterface(output_interface); + } } /** * Connect the transport to the local hicn forwarder. * - * @param is_consumer - Boolean specifying if the application on top of portal - * is a consumer or a producer. + * @param is_consumer - Boolean specifying if the application on top of + * portal is a consumer or a producer. */ - TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) { - if (!io_module_) { - pending_interest_hash_table_.reserve(portal_details::pit_size); - io_module_.reset(IoModule::load(io_module_path_.c_str())); - - CHECK(io_module_); - - io_module_->init(std::bind(&Portal::processIncomingMessages, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3), - std::bind(&Portal::setLocalRoutes, this), io_service_, - app_name_); - io_module_->connect(is_consumer); - is_consumer_ = is_consumer; + void connect(bool is_consumer = true) { + if (isConnected()) { + return; } + + worker_.addAndWaitForExecution([this, is_consumer]() { + if (!io_module_) { + pending_interest_hash_table_.reserve(portal_details::pit_size); + io_module_.reset(IoModule::load(io_module_path_.c_str())); + + CHECK(io_module_); + + std::weak_ptr<Portal> self(shared_from_this()); + + io_module_->init( + [self](Connector *c, const std::vector<utils::MemBuf::Ptr> &buffers, + const std::error_code &ec) { + if (auto ptr = self.lock()) { + ptr->processIncomingMessages(c, buffers, ec); + } + }, + [self](Connector *c, const std::error_code &ec) { + if (!ec) { + return; + } + auto ptr = self.lock(); + if (ptr && ptr->transport_callback_) { + ptr->transport_callback_->onError(ec); + } + }, + [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ }, + [self](Connector *c, const std::error_code &ec) { + auto ptr = self.lock(); + if (ptr) { + if (ec && ptr->transport_callback_) { + ptr->transport_callback_->onError(ec); + return; + } + ptr->setLocalRoutes(); + } + }, + worker_.getIoService(), app_name_); + + io_module_->connect(is_consumer); + is_consumer_ = is_consumer; + } + }); } /** @@ -295,18 +292,13 @@ class Portal { ~Portal() { killConnection(); } /** - * Compute name hash - */ - TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) { - return name.getHash32(false) + name.getSuffix(); - } - - /** * Check if there is already a pending interest for a given name. * * @param name - The interest name. */ - TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) { + bool interestIsPending(const Name &name) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + auto it = pending_interest_hash_table_.find(getHash(name)); if (it != pending_interest_hash_table_.end()) { return true; @@ -316,58 +308,53 @@ class Portal { } /** - * Send an interest through to the local forwarder. + * @brief Add interest to PIT * - * @param interest - The pointer to the interest. The ownership of the - * interest is transferred by the caller to portal. - * - * @param on_content_object_callback - If the caller wishes to use a different - * callback to be called for this interest, it can set this parameter. - * Otherwise ConsumerCallback::onContentObject will be used. - * - * @param on_interest_timeout_callback - If the caller wishes to use a - * different callback to be called for this interest, it can set this - * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ - TRANSPORT_ALWAYS_INLINE void sendInterest( - Interest::Ptr &&interest, + void addInterestToPIT( + const Interest::Ptr &interest, uint32_t lifetime, OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, OnInterestTimeoutCallback &&on_interest_timeout_callback = UNSET_CALLBACK) { - // Send it - interest->encodeSuffixes(); - io_module_->send(*interest); - uint32_t initial_hash = interest->getName().getHash32(false); auto hash = initial_hash + interest->getName().getSuffix(); uint32_t seq = interest->getName().getSuffix(); - uint32_t *suffix = interest->firstSuffix(); - auto n_suffixes = interest->numberOfSuffixes(); + const uint32_t *suffix = interest->firstSuffix() != nullptr + ? interest->firstSuffix() + 1 + : nullptr; + auto n_suffixes = + interest->numberOfSuffixes() > 0 ? interest->numberOfSuffixes() - 1 : 0; uint32_t counter = 0; // Set timers do { - auto pending_interest = packet_pool_.getPendingInterest(); - pending_interest->setInterest(interest); - pending_interest->setOnContentObjectCallback( + auto pend_int = pending_interest_hash_table_.try_emplace( + hash, worker_.getIoService(), interest); + PendingInterest &pending_interest = pend_int.first->second; + if (!pend_int.second) { + // element was already in map + pend_int.first->second.cancelTimer(); + pending_interest.setInterest(interest); + } + + pending_interest.setOnContentObjectCallback( std::move(on_content_object_callback)); - pending_interest->setOnTimeoutCallback( + pending_interest.setOnTimeoutCallback( std::move(on_interest_timeout_callback)); - pending_interest->startCountdown( - portal_details::makeCustomAllocatorHandler( - async_callback_memory_, - std::bind(&Portal::timerHandler, this, std::placeholders::_1, - hash, seq))); - - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); - - // Get reference to interest packet in order to have it destroyed. - auto _int = it->second->getInterest(); - it->second = std::move(pending_interest); - } else { - pending_interest_hash_table_[hash] = std::move(pending_interest); + if (is_consumer_) { + auto self = weak_from_this(); + pending_interest.startCountdown( + lifetime, portal_details::makeCustomAllocatorHandler( + async_callback_memory_, + [self, hash, seq](const std::error_code &ec) { + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + return; + } + + if (auto ptr = self.lock()) { + ptr->timerHandler(hash, seq); + } + })); } if (suffix) { @@ -375,224 +362,261 @@ class Portal { seq = *suffix; suffix++; } - } while (counter++ < n_suffixes); } - /** - * Handler fot the timer set when the interest is sent. - * - * @param ec - Error code which says whether the timer expired or has been - * canceled upon data packet reception. - * - * @param hash - The index of the interest in the pending interest hash table. - */ - TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, - uint32_t hash, uint32_t seq) { - bool is_stopped = io_service_.stopped(); - if (TRANSPORT_EXPECT_FALSE(is_stopped)) { - return; - } + void matchContentObjectInPIT(ContentObject &content_object) { + uint32_t hash = getHash(content_object.getName()); + auto it = pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; + + PendingInterest &pend_interest = it->second; + pend_interest.cancelTimer(); + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnDataCallback(); + pending_interest_hash_table_.erase(it); - if (TRANSPORT_EXPECT_TRUE(!ec)) { - PendingInterestHashTable::iterator it = - pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - PendingInterest::Ptr ptr = std::move(it->second); - pending_interest_hash_table_.erase(it); - auto _int = ptr->getInterest(); - Name &name = const_cast<Name &>(_int->getName()); - name.setSuffix(seq); - - if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) { - ptr->on_interest_timeout_callback_(_int, name); - } else if (consumer_callback_) { - consumer_callback_->onTimeout(_int, name); + if (is_consumer_) { + // Send object is for the app + if (callback != UNSET_CALLBACK) { + callback(*_int, content_object); + } else if (transport_callback_) { + transport_callback_->onContentObject(*_int, content_object); } + } else { + // Send content object to the network + io_module_->send(content_object); } + } else if (is_consumer_) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "No interest pending for received content object."; } } /** - * Register a producer name to the local forwarder and optionally set the - * content store size in a per-face manner. + * Send an interest through to the local forwarder. * - * @param config - The configuration for the local forwarder binding. - */ - TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) { - assert(io_module_); - io_module_->setContentStoreSize(config.csReserved()); - served_namespaces_.push_back(config.prefix()); - setLocalRoutes(); - } - - /** - * Start the event loop. This function blocks here and calls the callback set - * by the application upon interest/data received or timeout. + * @param interest - The pointer to the interest. The ownership of the + * interest is transferred by the caller to portal. + * + * @param on_content_object_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onContentObject will be used. + * + * @param on_interest_timeout_callback - If the caller wishes to use a + * different callback to be called for this interest, it can set this + * parameter. Otherwise ConsumerCallback::onTimeout will be used. */ - TRANSPORT_ALWAYS_INLINE void runEventsLoop() { - if (io_service_.stopped()) { - io_service_.reset(); // ensure that run()/poll() will do some work - } + void sendInterest( + Interest::Ptr &interest, uint32_t lifetime, + OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK, + OnInterestTimeoutCallback &&on_interest_timeout_callback = + UNSET_CALLBACK) { + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); - io_service_.run(); + addInterestToPIT(interest, lifetime, std::move(on_content_object_callback), + std::move(on_interest_timeout_callback)); + interest->serializeSuffixes(); + io_module_->send(*interest); } /** - * Run one event and return. + * Handler fot the timer set when the interest is sent. + * + * @param ec - Error code which says whether the timer expired or has been + * canceled upon data packet reception. + * + * @param hash - The index of the interest in the pending interest hash + * table. */ - TRANSPORT_ALWAYS_INLINE void runOneEvent() { - if (io_service_.stopped()) { - io_service_.reset(); // ensure that run()/poll() will do some work - } + void timerHandler(uint32_t hash, uint32_t seq) { + PendingInterestHashTable::iterator it = + pending_interest_hash_table_.find(hash); + if (it != pending_interest_hash_table_.end()) { + PendingInterest &pend_interest = it->second; + auto _int = pend_interest.getInterest(); + auto callback = pend_interest.getOnTimeoutCallback(); + pending_interest_hash_table_.erase(it); + Name &name = const_cast<Name &>(_int->getName()); + name.setSuffix(seq); - io_service_.run_one(); + if (callback != UNSET_CALLBACK) { + callback(_int, name); + } else if (transport_callback_) { + transport_callback_->onTimeout(_int, name); + } + } } /** - * Send a data packet to the local forwarder. As opposite to sendInterest, the - * ownership of the content object is not transferred to the portal. + * Send a data packet to the local forwarder. * * @param content_object - The data packet. */ - TRANSPORT_ALWAYS_INLINE void sendContentObject( - ContentObject &content_object) { - io_module_->send(content_object); + void sendContentObject(ContentObject &content_object) { + DCHECK(io_module_); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + matchContentObjectInPIT(content_object); } /** - * Stop the event loop, canceling all the pending events in the event queue. - * - * Beware that stopping the event loop DOES NOT disconnect the transport from - * the local forwarder, the connector underneath will stay connected. + * Disconnect the transport from the local forwarder. */ - TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { - if (!io_service_.stopped()) { - io_service_.dispatch([this]() { - clear(); - io_service_.stop(); - }); + void killConnection() { + if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) { + io_module_->closeConnection(); } } /** - * Disconnect the transport from the local forwarder. + * Clear the pending interest hash table. */ - TRANSPORT_ALWAYS_INLINE void killConnection() { - io_module_->closeConnection(); + void clear() { + worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); }); } /** - * Clear the pending interest hash table. + * Get a reference to the io_service object. */ - TRANSPORT_ALWAYS_INLINE void clear() { - if (!io_service_.stopped()) { - io_service_.dispatch(std::bind(&Portal::doClear, this)); - } else { - doClear(); - } - } + utils::EventThread &getThread() { return worker_; } /** - * Remove one pending interest. + * Register a route to the local forwarder. */ - TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) { - if (!io_service_.stopped()) { - io_service_.dispatch(std::bind(&Portal::doClearOne, this, name)); - } else { - doClearOne(name); - } + void registerRoute(const Prefix &prefix) { + std::weak_ptr<Portal> self = shared_from_this(); + worker_.tryRunHandlerNow([self, prefix]() { + if (auto ptr = self.lock()) { + auto ret = ptr->served_namespaces_.insert(prefix); + if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) { + ptr->io_module_->registerRoute(prefix); + } + } + }); } /** - * Get a reference to the io_service object. + * Send a MAP-Me update to traverse NATs. */ - TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() { - return io_service_; + TRANSPORT_ALWAYS_INLINE void sendMapme() { + if (io_module_->isConnected()) { + io_module_->sendMapme(); + } } /** - * Register a route to the local forwarder. + * set forwarding strategy */ - TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) { - served_namespaces_.push_back(prefix); + TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix, + std::string &strategy) { if (io_module_->isConnected()) { - io_module_->registerRoute(prefix); + io_module_->setForwardingStrategy(prefix, strategy); } } /** * Check if the transport is connected to a forwarder or not */ - TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd() { + bool isConnectedToFwd() { std::string mod = io_module_path_.substr(0, io_module_path_.find(".")); if (mod == "forwarder_module") return false; return true; } + auto &getServedNamespaces() { return served_namespaces_; } + private: /** + * Compute name hash + */ + uint32_t getHash(const Name &name) { + return name.getHash32(false) + name.getSuffix(); + } + + /** * Clear the pending interest hash table. */ - TRANSPORT_ALWAYS_INLINE void doClear() { + void doClear() { for (auto &pend_interest : pending_interest_hash_table_) { - pend_interest.second->cancelTimer(); - - // Get interest packet from pending interest and do nothing with it. It - // will get destroyed as it goes out of scope. - auto _int = pend_interest.second->getInterest(); + pend_interest.second.cancelTimer(); } pending_interest_hash_table_.clear(); } - /** - * Remove one pending interest. - */ - TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) { - auto it = pending_interest_hash_table_.find(getHash(name)); - - if (it != pending_interest_hash_table_.end()) { - it->second->cancelTimer(); + void dumpPIT() { + std::vector<Name> sorted_elements; + for (const auto &[key, value] : pending_interest_hash_table_) { + sorted_elements.push_back(value.getInterestReference()->getName()); + } - // Get interest packet from pending interest and do nothing with it. It - // will get destroyed as it goes out of scope. - auto _int = it->second->getInterest(); + std::sort(sorted_elements.begin(), sorted_elements.end(), + [](const Name &a, const Name &b) { + return a.getSuffix() < b.getSuffix(); + }); - pending_interest_hash_table_.erase(it); + for (auto &elt : sorted_elements) { + LOG(INFO) << elt; } } /** - * Callback called by the underlying connector upon reception of a packet from - * the local forwarder. + * Callback called by the underlying connector upon reception of a packet + * from the local forwarder. * * @param packet_buffer - The bytes of the packet. */ - TRANSPORT_ALWAYS_INLINE void processIncomingMessages( - Connector *c, utils::MemBuf &buffer, const std::error_code &ec) { - bool is_stopped = io_service_.stopped(); - if (TRANSPORT_EXPECT_FALSE(is_stopped)) { + void processIncomingMessages(Connector *c, + const std::vector<utils::MemBuf::Ptr> &buffers, + const std::error_code &ec) { + if (!transport_callback_) { return; } - if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) { - processControlMessage(buffer); + if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) { + // Error receiving from underlying infra. + if (transport_callback_) { + transport_callback_->onError(ec); + } + return; } - // The buffer is a base class for an interest or a content object - Packet &packet_buffer = static_cast<Packet &>(buffer); + for (auto &buffer_ptr : buffers) { + auto &buffer = *buffer_ptr; - auto format = packet_buffer.getFormat(); - if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) { - if (is_consumer_) { - processContentObject(static_cast<ContentObject &>(packet_buffer)); - } else { - processInterest(static_cast<Interest &>(packet_buffer)); + if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) { + processControlMessage(buffer); + return; + } + + // The buffer is a base class for an interest or a content object + Packet &packet_buffer = static_cast<Packet &>(buffer); + + switch (packet_buffer.getType()) { + case HICN_PACKET_TYPE_INTEREST: + if (!is_consumer_) { + processInterest(static_cast<Interest &>(packet_buffer)); + } else { + LOG(ERROR) << "Received an Interest packet with name " + << packet_buffer.getName() + << " in a consumer transport. Ignoring it."; + } + break; + case HICN_PACKET_TYPE_DATA: + if (is_consumer_) { + processContentObject(static_cast<ContentObject &>(packet_buffer)); + } else { + LOG(ERROR) << "Received a Data packet with name " + << packet_buffer.getName() + << " in a producer transport. Ignoring it."; + } + break; + default: + LOG(ERROR) << "Received not supported packet. Ignoring it."; + break; } - } else { - LOG(ERROR) << "Received not supported packet. Ignoring it."; } } @@ -601,19 +625,25 @@ class Portal { * It register the prefixes in the served_namespaces_ list to the local * forwarder. */ - TRANSPORT_ALWAYS_INLINE void setLocalRoutes() { + void setLocalRoutes() { + DCHECK(io_module_); + DCHECK(io_module_->isConnected()); + DCHECK(std::this_thread::get_id() == worker_.getThreadId()); + for (auto &prefix : served_namespaces_) { - if (io_module_->isConnected()) { - io_module_->registerRoute(prefix); - } + io_module_->registerRoute(prefix); } } - TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) { + void processInterest(Interest &interest) { // Interest for a producer DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName(); - if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) { - producer_callback_->onInterest(interest); + + // Save interest in PIT + interest.deserializeSuffixes(); + addInterestToPIT(interest.shared_from_this(), interest.getLifetime()); + if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) { + transport_callback_->onInterest(interest); } } @@ -625,57 +655,33 @@ class Portal { * * @param content_object - The data packet */ - TRANSPORT_ALWAYS_INLINE void processContentObject( - ContentObject &content_object) { + void processContentObject(ContentObject &content_object) { DLOG_IF(INFO, VLOG_IS_ON(3)) << "processContentObject " << content_object.getName(); - uint32_t hash = getHash(content_object.getName()); - - auto it = pending_interest_hash_table_.find(hash); - if (it != pending_interest_hash_table_.end()) { - DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest."; - - PendingInterest::Ptr interest_ptr = std::move(it->second); - pending_interest_hash_table_.erase(it); - interest_ptr->cancelTimer(); - auto _int = interest_ptr->getInterest(); - - if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) { - interest_ptr->on_content_object_callback_(*_int, content_object); - } else if (consumer_callback_) { - consumer_callback_->onContentObject(*_int, content_object); - } - } else { - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "No interest pending for received content object."; - } + matchContentObjectInPIT(content_object); } /** - * Process a control message. Control messages are different depending on the - * connector, then the forwarder_interface will do the job of understanding - * them. + * Process a control message. Control messages are different depending on + * the connector, then the forwarder_interface will do the job of + * understanding them. */ - TRANSPORT_ALWAYS_INLINE void processControlMessage( - utils::MemBuf &packet_buffer) { + void processControlMessage(utils::MemBuf &packet_buffer) { io_module_->processControlMessageReply(packet_buffer); } private: portal_details::HandlerMemory async_callback_memory_; - std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_; + std::unique_ptr<IoModule> io_module_; - asio::io_service &io_service_; - asio::io_service internal_io_service_; - portal_details::Pool packet_pool_; + ::utils::EventThread &worker_; std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; - std::list<Prefix> served_namespaces_; + std::set<Prefix> served_namespaces_; - ConsumerCallback *consumer_callback_; - ProducerCallback *producer_callback_; + TransportCallback *transport_callback_; bool is_consumer_; diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc index d598cff75..8c5b153bc 100644 --- a/libtransport/src/core/prefix.cc +++ b/libtransport/src/core/prefix.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021-2022 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -13,8 +13,10 @@ * limitations under the License. */ +#include <glog/logging.h> #include <hicn/transport/core/prefix.h> #include <hicn/transport/errors/errors.h> +#include <hicn/transport/portability/endianess.h> #include <hicn/transport/utils/string_tokenizer.h> #ifndef _WIN32 @@ -35,11 +37,7 @@ namespace transport { namespace core { -Prefix::Prefix() { std::memset(&ip_prefix_, 0, sizeof(ip_prefix_t)); } - -Prefix::Prefix(const char *prefix) : Prefix(std::string(prefix)) {} - -Prefix::Prefix(std::string &&prefix) : Prefix(prefix) {} +Prefix::Prefix() { std::memset(&hicn_ip_prefix_, 0, sizeof(hicn_ip_prefix_t)); } Prefix::Prefix(const std::string &prefix) { utils::StringTokenizer st(prefix, "/"); @@ -56,7 +54,7 @@ Prefix::Prefix(const std::string &prefix) { buildPrefix(ip_address, uint16_t(atoi(prefix_length.c_str())), family); } -Prefix::Prefix(std::string &prefix, uint16_t prefix_length) { +Prefix::Prefix(const std::string &prefix, uint16_t prefix_length) { int family = get_addr_family(prefix.c_str()); buildPrefix(prefix, prefix_length, family); } @@ -68,24 +66,28 @@ Prefix::Prefix(const core::Name &content_name, uint16_t prefix_length) { throw errors::InvalidIpAddressException(); } - ip_prefix_ = content_name.toIpAddress(); - ip_prefix_.len = (u8)prefix_length; - ip_prefix_.family = family; + hicn_ip_prefix_ = content_name.toIpAddress(); + hicn_ip_prefix_.len = (u8)prefix_length; + hicn_ip_prefix_.family = family; } -void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length, +void Prefix::buildPrefix(const std::string &prefix, uint16_t prefix_length, int family) { if (!checkPrefixLengthAndAddressFamily(prefix_length, family)) { throw errors::InvalidIpAddressException(); } + std::memset(&hicn_ip_prefix_, 0, sizeof(hicn_ip_prefix_t)); + int ret; switch (family) { case AF_INET: - ret = inet_pton(AF_INET, prefix.c_str(), ip_prefix_.address.v4.buffer); + ret = + inet_pton(AF_INET, prefix.c_str(), hicn_ip_prefix_.address.v4.buffer); break; case AF_INET6: - ret = inet_pton(AF_INET6, prefix.c_str(), ip_prefix_.address.v6.buffer); + ret = inet_pton(AF_INET6, prefix.c_str(), + hicn_ip_prefix_.address.v6.buffer); break; default: throw errors::InvalidIpAddressException(); @@ -95,14 +97,22 @@ void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length, throw errors::InvalidIpAddressException(); } - ip_prefix_.len = (u8)prefix_length; - ip_prefix_.family = family; + hicn_ip_prefix_.len = (u8)prefix_length; + hicn_ip_prefix_.family = family; +} + +bool Prefix::operator<(const Prefix &other) const { + return hicn_ip_prefix_cmp(&hicn_ip_prefix_, &other.hicn_ip_prefix_) < 0; +} + +bool Prefix::operator==(const Prefix &other) const { + return hicn_ip_prefix_cmp(&hicn_ip_prefix_, &other.hicn_ip_prefix_) == 0; } std::unique_ptr<Sockaddr> Prefix::toSockaddr() const { Sockaddr *ret = nullptr; - switch (ip_prefix_.family) { + switch (hicn_ip_prefix_.family) { case AF_INET6: ret = (Sockaddr *)new Sockaddr6; break; @@ -113,72 +123,79 @@ std::unique_ptr<Sockaddr> Prefix::toSockaddr() const { throw errors::InvalidIpAddressException(); } - if (ip_prefix_to_sockaddr(&ip_prefix_, ret) < 0) { + if (hicn_ip_prefix_to_sockaddr(&hicn_ip_prefix_, ret) < 0) { throw errors::InvalidIpAddressException(); } return std::unique_ptr<Sockaddr>(ret); } -uint16_t Prefix::getPrefixLength() const { return ip_prefix_.len; } +uint16_t Prefix::getPrefixLength() const { return hicn_ip_prefix_.len; } Prefix &Prefix::setPrefixLength(uint16_t prefix_length) { - ip_prefix_.len = (u8)prefix_length; - return *this; -} - -int Prefix::getAddressFamily() const { return ip_prefix_.family; } + if (!checkPrefixLengthAndAddressFamily(prefix_length, + hicn_ip_prefix_.family)) { + throw errors::InvalidIpAddressException(); + } -Prefix &Prefix::setAddressFamily(int address_family) { - ip_prefix_.family = address_family; + hicn_ip_prefix_.len = (u8)prefix_length; return *this; } +int Prefix::getAddressFamily() const { return hicn_ip_prefix_.family; } + std::string Prefix::getNetwork() const { - if (!checkPrefixLengthAndAddressFamily(ip_prefix_.len, ip_prefix_.family)) { + if (!checkPrefixLengthAndAddressFamily(hicn_ip_prefix_.len, + hicn_ip_prefix_.family)) { throw errors::InvalidIpAddressException(); } - std::size_t size = - ip_prefix_.family == 4 + AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN; - - std::string network(size, 0); + char buffer[INET6_ADDRSTRLEN]; - if (ip_prefix_ntop_short(&ip_prefix_, (char *)network.c_str(), size) < 0) { + if (hicn_ip_prefix_ntop_short(&hicn_ip_prefix_, buffer, INET6_ADDRSTRLEN) < + 0) { throw errors::RuntimeException( "Impossible to retrieve network from ip address."); } - return network; + return buffer; } -int Prefix::contains(const ip_address_t &content_name) const { - int res = - ip_address_cmp(&content_name, &(ip_prefix_.address), ip_prefix_.family); - - if (ip_prefix_.len != (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN_BITS - : IPV4_ADDR_LEN_BITS)) { - const u8 *ip_prefix_buffer = - ip_address_get_buffer(&(ip_prefix_.address), ip_prefix_.family); - const u8 *content_name_buffer = - ip_address_get_buffer(&content_name, ip_prefix_.family); - uint8_t mask = 0xFF >> (ip_prefix_.len % 8); - mask = ~mask; - - res += (ip_prefix_buffer[ip_prefix_.len] & mask) == - (content_name_buffer[ip_prefix_.len] & mask); +bool Prefix::contains(const hicn_ip_address_t &content_name) const { + uint64_t mask[2] = {0, 0}; + auto content_name_copy = content_name; + auto network_copy = hicn_ip_prefix_.address; + + auto prefix_length = getPrefixLength(); + if (hicn_ip_prefix_.family == AF_INET) { + prefix_length += 3 * IPV4_ADDR_LEN_BITS; } - return res; -} + if (prefix_length == 0) { + mask[0] = mask[1] = 0; + } else if (prefix_length <= 64) { + mask[0] = portability::host_to_net((uint64_t)(~0) << (64 - prefix_length)); + mask[1] = 0; + } else if (prefix_length == 128) { + mask[0] = mask[1] = 0xffffffffffffffff; + } else { + prefix_length -= 64; + mask[0] = 0xffffffffffffffff; + mask[1] = portability::host_to_net((uint64_t)(~0) << (64 - prefix_length)); + } -int Prefix::contains(const core::Name &content_name) const { - return contains(content_name.toIpAddress().address); + // Apply mask + content_name_copy.v6.as_u64[0] &= mask[0]; + content_name_copy.v6.as_u64[1] &= mask[1]; + + network_copy.v6.as_u64[0] &= mask[0]; + network_copy.v6.as_u64[1] &= mask[1]; + + return hicn_ip_address_cmp(&network_copy, &content_name_copy) == 0; } -Name Prefix::getName() const { - std::string s(getNetwork()); - return Name(s); +bool Prefix::contains(const core::Name &content_name) const { + return contains(content_name.toIpAddress().address); } /* @@ -187,23 +204,25 @@ Name Prefix::getName() const { */ Name Prefix::getName(const core::Name &mask, const core::Name &components, const core::Name &content_name) const { - if (ip_prefix_.family != mask.getAddressFamily() || - ip_prefix_.family != components.getAddressFamily() || - ip_prefix_.family != content_name.getAddressFamily()) + if (hicn_ip_prefix_.family != mask.getAddressFamily() || + hicn_ip_prefix_.family != components.getAddressFamily() || + hicn_ip_prefix_.family != content_name.getAddressFamily()) throw errors::RuntimeException( - "Prefix, mask, components and content name are not of the same address " - "family"); - - ip_address_t mask_ip = mask.toIpAddress().address; - ip_address_t component_ip = components.toIpAddress().address; - ip_address_t name_ip = content_name.toIpAddress().address; - const u8 *mask_ip_buffer = ip_address_get_buffer(&mask_ip, ip_prefix_.family); + "Prefix, mask, components and content name are not of the same" + "address family"); + + hicn_ip_address_t mask_ip = mask.toIpAddress().address; + hicn_ip_address_t component_ip = components.toIpAddress().address; + hicn_ip_address_t name_ip = content_name.toIpAddress().address; + const u8 *mask_ip_buffer = + hicn_ip_address_get_buffer(&mask_ip, hicn_ip_prefix_.family); const u8 *component_ip_buffer = - ip_address_get_buffer(&component_ip, ip_prefix_.family); - u8 *name_ip_buffer = - const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family)); + hicn_ip_address_get_buffer(&component_ip, hicn_ip_prefix_.family); + u8 *name_ip_buffer = const_cast<u8 *>( + hicn_ip_address_get_buffer(&name_ip, hicn_ip_prefix_.family)); - int addr_len = ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN; + int addr_len = + hicn_ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN; for (int i = 0; i < addr_len; i++) { if (mask_ip_buffer[i]) { @@ -211,108 +230,98 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components, } } - // if (this->contains(name_ip)) - // throw errors::RuntimeException("Mask overrides the prefix"); - return Name(ip_prefix_.family, (uint8_t *)&name_ip); -} - -Name Prefix::getRandomName() const { - ip_address_t name_ip = ip_prefix_.address; - u8 *name_ip_buffer = - const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family)); - - int addr_len = - (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN * 8 : IPV4_ADDR_LEN * 8) - - ip_prefix_.len; - - size_t size = (size_t)ceil((float)addr_len / 8.0); - uint8_t *buffer = (uint8_t *)malloc(sizeof(uint8_t) * size); - - RAND_bytes(buffer, (int)size); - - int j = 0; - for (uint8_t i = (uint8_t)ceil((float)ip_prefix_.len / 8.0); - i < (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN); - i++) { - name_ip_buffer[i] = buffer[j]; - j++; - } - free(buffer); - - return Name(ip_prefix_.family, (uint8_t *)&name_ip); + return Name(hicn_ip_prefix_.family, (uint8_t *)&name_ip); } /* * Map a name in a different name prefix to this name prefix */ Name Prefix::mapName(const core::Name &content_name) const { - if (ip_prefix_.family != content_name.getAddressFamily()) + if (hicn_ip_prefix_.family != content_name.getAddressFamily()) throw errors::RuntimeException( "Prefix content name are not of the same address " "family"); - ip_address_t name_ip = content_name.toIpAddress().address; - const u8 *ip_prefix_buffer = - ip_address_get_buffer(&(ip_prefix_.address), ip_prefix_.family); - u8 *name_ip_buffer = - const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family)); - - memcpy(name_ip_buffer, ip_prefix_buffer, ip_prefix_.len / 8); - - if (ip_prefix_.len != (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN_BITS - : IPV4_ADDR_LEN_BITS)) { - uint8_t mask = 0xFF >> (ip_prefix_.len % 8); - name_ip_buffer[ip_prefix_.len / 8 + 1] = - (name_ip_buffer[ip_prefix_.len / 8 + 1] & mask) | - (ip_prefix_buffer[ip_prefix_.len / 8 + 1] & ~mask); + hicn_ip_address_t name_ip = content_name.toIpAddress().address; + const u8 *hicn_ip_prefix_buffer = hicn_ip_address_get_buffer( + &(hicn_ip_prefix_.address), hicn_ip_prefix_.family); + u8 *name_ip_buffer = const_cast<u8 *>( + hicn_ip_address_get_buffer(&name_ip, hicn_ip_prefix_.family)); + + memcpy(name_ip_buffer, hicn_ip_prefix_buffer, hicn_ip_prefix_.len / 8); + + if (hicn_ip_prefix_.len != (hicn_ip_prefix_.family == AF_INET6 + ? IPV6_ADDR_LEN_BITS + : IPV4_ADDR_LEN_BITS)) { + uint8_t mask = 0xFF >> (hicn_ip_prefix_.len % 8); + name_ip_buffer[hicn_ip_prefix_.len / 8 + 1] = + (name_ip_buffer[hicn_ip_prefix_.len / 8 + 1] & mask) | + (hicn_ip_prefix_buffer[hicn_ip_prefix_.len / 8 + 1] & ~mask); } - return Name(ip_prefix_.family, (uint8_t *)&name_ip); + return Name(hicn_ip_prefix_.family, (uint8_t *)&name_ip); } -Prefix &Prefix::setNetwork(std::string &network) { - if (!inet_pton(AF_INET6, network.c_str(), ip_prefix_.address.v6.buffer)) { +Prefix &Prefix::setNetwork(const std::string &network) { + if (hicn_ip_address_pton(network.c_str(), &hicn_ip_prefix_.address) < 0) { throw errors::RuntimeException("The network name is not valid."); } return *this; } +Name Prefix::makeName() const { return makeNameWithIndex(0); } + Name Prefix::makeRandomName() const { - srand((unsigned int)time(nullptr)); - - if (ip_prefix_.family == AF_INET6) { - std::default_random_engine eng((std::random_device())()); - std::uniform_int_distribution<uint32_t> idis( - 0, std::numeric_limits<uint32_t>::max()); - uint64_t random_number = idis(eng); - - uint32_t hash_size_bits = IPV6_ADDR_LEN_BITS - ip_prefix_.len; - uint64_t ip_address[2]; - memcpy(ip_address, ip_prefix_.address.v6.buffer, sizeof(uint64_t)); - memcpy(ip_address + 1, ip_prefix_.address.v6.buffer + 8, sizeof(uint64_t)); - std::string network(IPV6_ADDR_LEN * 3, 0); - - // Let's do the magic ;) - int shift_size = hash_size_bits > sizeof(random_number) * 8 - ? sizeof(random_number) * 8 - : hash_size_bits; - - ip_address[1] >>= shift_size; - ip_address[1] <<= shift_size; - - ip_address[1] |= random_number >> (sizeof(uint64_t) * 8 - shift_size); - - if (!inet_ntop(ip_prefix_.family, ip_address, (char *)network.c_str(), - IPV6_ADDR_LEN * 3)) { - throw errors::RuntimeException( - "Impossible to retrieve network from ip address."); - } + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution<uint32_t> idis( + 0, std::numeric_limits<uint32_t>::max()); + uint64_t random_number = idis(eng); + + return makeNameWithIndex(random_number); +} + +Name Prefix::makeNameWithIndex(std::uint64_t index) const { + uint16_t prefix_length = getPrefixLength(); + + Name ret; - return Name(network); + // Adjust prefix length depending on the address family + if (getAddressFamily() == AF_INET) { + // Sanity check + DCHECK(prefix_length <= 32); + // Convert prefix length to ip46_address_t prefix length + prefix_length += IPV4_ADDR_LEN_BITS * 3; } - return Name(); + std::memcpy(ret.getStructReference().prefix.v6.as_u8, + hicn_ip_prefix_.address.v6.as_u8, sizeof(hicn_ip_address_t)); + + // Convert index in network byte order + index = portability::host_to_net(index); + + // Apply mask + uint64_t mask; + if (prefix_length == 0) { + mask = 0; + } else if (prefix_length <= 64) { + mask = 0; + } else if (prefix_length == 128) { + mask = 0xffffffffffffffff; + } else { + prefix_length -= 64; + mask = portability::host_to_net((uint64_t)(~0) << (64 - prefix_length)); + } + + ret.getStructReference().prefix.v6.as_u64[1] &= mask; + // Eventually truncate index if too big + index &= ~mask; + + // Apply index + ret.getStructReference().prefix.v6.as_u64[1] |= index; + + // Done + return ret; } bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length, @@ -332,7 +341,9 @@ bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length, return true; } -const ip_prefix_t &Prefix::toIpPrefixStruct() const { return ip_prefix_; } +const hicn_ip_prefix_t &Prefix::toIpPrefixStruct() const { + return hicn_ip_prefix_; +} } // namespace core diff --git a/libtransport/src/core/tcp_socket_connector.cc b/libtransport/src/core/tcp_socket_connector.cc index a30264271..7758e2cf2 100644 --- a/libtransport/src/core/tcp_socket_connector.cc +++ b/libtransport/src/core/tcp_socket_connector.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: @@ -85,8 +85,10 @@ void TcpSocketConnector::send(const uint8_t *packet, std::size_t len, const PacketSentCallback &packet_sent) { if (packet_sent != 0) { asio::async_write(socket_, asio::buffer(packet, len), - [packet_sent](std::error_code ec, - std::size_t /*length*/) { packet_sent(); }); + [packet_sent](const std::error_code &ec) + std::size_t /*length*/) { + packet_sent(); + }); } else { if (state_ == ConnectorState::CONNECTED) { asio::write(socket_, asio::buffer(packet, len)); @@ -151,7 +153,7 @@ void TcpSocketConnector::doWrite() { asio::async_write( socket_, std::move(array), - [this, packet_store = std::move(packet_store)](std::error_code ec, + [this, packet_store = std::move(packet_store)](const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { if (!output_buffer_.empty()) { @@ -172,7 +174,7 @@ void TcpSocketConnector::doReadBody(std::size_t body_length) { asio::async_read( socket_, asio::buffer(read_msg_->writableTail(), body_length), asio::transfer_exactly(body_length), - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { read_msg_->append(length); if (TRANSPORT_EXPECT_TRUE(!ec)) { receive_callback_(std::move(read_msg_)); @@ -195,7 +197,7 @@ void TcpSocketConnector::doReadHeader() { asio::buffer(read_msg_->writableData(), NetworkMessage::fixed_header_length), asio::transfer_exactly(NetworkMessage::fixed_header_length), - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (TRANSPORT_EXPECT_TRUE(!ec)) { read_msg_->append(NetworkMessage::fixed_header_length); std::size_t body_length = 0; @@ -235,7 +237,7 @@ void TcpSocketConnector::tryReconnect() { void TcpSocketConnector::doConnect() { asio::async_connect( socket_, endpoint_iterator_, - [this](std::error_code ec, tcp::resolver::iterator) { + [this](const std::error_code &ec, tcp::resolver::iterator) { if (!ec) { timer_.cancel(); state_ = ConnectorState::CONNECTED; diff --git a/libtransport/src/core/tcp_socket_connector.h b/libtransport/src/core/tcp_socket_connector.h index 21db8301e..2901a5c9d 100644 --- a/libtransport/src/core/tcp_socket_connector.h +++ b/libtransport/src/core/tcp_socket_connector.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Copyright (c) 2021 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: diff --git a/libtransport/src/core/udp_connector.cc b/libtransport/src/core/udp_connector.cc new file mode 100644 index 000000000..5f620b1a8 --- /dev/null +++ b/libtransport/src/core/udp_connector.cc @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +#include <core/errors.h> +#include <core/udp_connector.h> +#include <glog/logging.h> +#include <hicn/transport/utils/branch_prediction.h> + +#include <iostream> +#include <thread> +#include <vector> + +namespace transport { +namespace core { + +UdpTunnelConnector::~UdpTunnelConnector() {} + +void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port, + const std::string &bind_address, + uint16_t bind_port) { + if (state_ == State::CLOSED) { + state_ = State::CONNECTING; + + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), hostname, + std::to_string(port)); + + endpoint_iterator_ = resolver_.resolve(query); + remote_endpoint_send_ = *endpoint_iterator_; + socket_->open(remote_endpoint_send_.protocol()); + + if (!bind_address.empty() && bind_port != 0) { + using namespace asio::ip; + + auto address = address::from_string(bind_address); + if (address.is_v6()) { + std::error_code ec; + socket_->set_option(asio::ip::v6_only(false), ec); + // Call succeeds only on dual stack systems. + } + + socket_->bind(udp::endpoint(address, bind_port)); + } + + remote_endpoint_ = Endpoint(remote_endpoint_send_); + local_endpoint_ = Endpoint(socket_->local_endpoint()); + + auto self = shared_from_this(); + doConnect(self); + } +} + +void UdpTunnelConnector::send(Packet &packet) { + send(packet.shared_from_this()); +} + +void UdpTunnelConnector::send(const utils::MemBuf::Ptr &buffer) { + auto self = shared_from_this(); + io_service_.post([self, buffer]() { + bool write_in_progress = !self->output_buffer_.empty(); + self->output_buffer_.push_back(std::move(buffer)); + if (TRANSPORT_EXPECT_TRUE(self->state_ == State::CONNECTED)) { + if (!write_in_progress) { + self->doSendPacket(self); + } + } else { + self->data_available_ = true; + } + }); +} + +void UdpTunnelConnector::close() { + DLOG_IF(INFO, VLOG_IS_ON(2)) << "UDPTunnelConnector::close"; + state_ = State::CLOSED; + bool is_socket_owned = socket_.use_count() == 1; + if (is_socket_owned) { + // Here we use a shared ptr to keep the object alive until we call the close + // function + auto self = shared_from_this(); + io_service_.dispatch([this, self]() { + socket_->close(); + // on_close_callback_(shared_from_this()); + }); + } +} + +void UdpTunnelConnector::doSendPacket( + const std::shared_ptr<UdpTunnelConnector> &self) { +#ifdef LINUX + send_timer_.expires_from_now(std::chrono::microseconds(50)); + send_timer_.async_wait([self](const std::error_code &ec) { + if (ec) { + return; + } + + self->writeHandler(); + }); +#else + auto packet = output_buffer_.front().get(); + auto array = std::vector<asio::const_buffer>(); + + const ::utils::MemBuf *current = packet; + do { + array.push_back(asio::const_buffer(current->data(), current->length())); + current = current->next(); + } while (current != packet); + + socket_->async_send(std::move(array), [this, self](const std::error_code &ec, + std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + sent_callback_(this, make_error_code(core_error::success)); + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + // The connection has been closed by the application. + return; + } else { + sendFailed(); + sent_callback_(this, ec); + } + + output_buffer_.pop_front(); + if (!output_buffer_.empty()) { + doSendPacket(self); + } + }); +#endif +} + +void UdpTunnelConnector::retryConnection() { + // The connection was refused. In this case let's retry to reconnect. + connection_reattempts_++; + LOG(ERROR) << "Error in UDP: Connection refused. Retrying..."; + state_ = State::CONNECTING; + timer_.expires_from_now(std::chrono::milliseconds(500)); + std::weak_ptr<UdpTunnelConnector> self = shared_from_this(); + timer_.async_wait([self, this](const std::error_code &ec) { + if (ec) { + } + if (auto ptr = self.lock()) { + doConnect(ptr); + } + }); + return; +} + +#ifdef LINUX +void UdpTunnelConnector::writeHandler() { + if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) { + return; + } + + auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst)); + + if (len) { + int m = 0; + for (auto &p : output_buffer_) { + auto packet = p.get(); + ::utils::MemBuf *current = packet; + int b = 0; + do { + // array.push_back(asio::const_buffer(current->data(), + // current->length())); + tx_iovecs_[m][b].iov_base = current->writableData(); + tx_iovecs_[m][b].iov_len = current->length(); + current = current->next(); + b++; + } while (current != packet); + + tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m]; + tx_msgs_[m].msg_hdr.msg_iovlen = b; + tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data(); + tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size(); + m++; + + if (--len == 0) { + break; + } + } + + int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT); + if (retval > 0) { + while (retval--) { + output_buffer_.pop_front(); + } + } else if (errno != EWOULDBLOCK && errno != EAGAIN) { // NOSONAR + LOG(ERROR) << "Error sending messages: " << strerror(errno); + sent_callback_(this, make_error_code(core_error::send_failed)); + return; + } + } + + if (!output_buffer_.empty()) { + send_timer_.expires_from_now(std::chrono::microseconds(50)); + std::weak_ptr<UdpTunnelConnector> self = shared_from_this(); + send_timer_.async_wait([self](const std::error_code &ec) { + if (ec) { + return; + } + if (auto ptr = self.lock()) { + ptr->writeHandler(); + } + }); + } else { + sent_callback_(this, make_error_code(core_error::success)); + } +} + +void UdpTunnelConnector::readHandler(const std::error_code &ec) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; + + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + if (current_position_ == 0) { + for (int i = 0; i < max_burst; i++) { + auto read_buffer = getRawBuffer(); + rx_iovecs_[i][0].iov_base = read_buffer.first; + rx_iovecs_[i][0].iov_len = read_buffer.second; + rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i]; + rx_msgs_[i].msg_hdr.msg_iovlen = 1; + } + } + + int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_, + max_burst - current_position_, MSG_DONTWAIT, nullptr); + if (res < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { // NOSONAR + // Try again later + return; + } + + if (errno == ECONNREFUSED && + connection_reattempts_ < max_reconnection_reattempts) { + retryConnection(); + return; + } + + LOG(ERROR) << "Error receiving messages! " << strerror(errno) << " " + << res; + std::vector<utils::MemBuf::Ptr> v; + auto ec = make_error_code(core_error::receive_failed); + + receive_callback_(this, v, ec); + return; + } + + std::vector<utils::MemBuf::Ptr> v; + v.reserve(res); + for (int i = 0; i < res; i++) { + auto packet = getPacketFromBuffer( + reinterpret_cast<uint8_t *>( + rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), + rx_msgs_[current_position_].msg_len); + receiveSuccess(*packet); + v.push_back(std::move(packet)); + ++current_position_; + } + + receive_callback_(this, v, make_error_code(core_error::success)); + + doRecvPacket(); + } else { + LOG(ERROR) << "Error in UDP: Receiving packets from a not " + "connected socket."; + } + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + LOG(ERROR) << "The connection has been closed by the application."; + return; + } else { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + // receive_callback_(this, *read_msg_, ec); + LOG(ERROR) << "Error in UDP connector: " << ec.value() << " " + << ec.message(); + } else { + LOG(ERROR) << "Error in connector while not connected. " << ec.value() + << " " << ec.message(); + } + } +} +#endif + +void UdpTunnelConnector::doRecvPacket() { + std::weak_ptr<UdpTunnelConnector> self = shared_from_this(); +#ifdef LINUX + if (state_ == State::CONNECTED) { +#if ((ASIO_VERSION / 100 % 1000) < 11) + socket_->async_receive(asio::null_buffers(), +#else + socket_->async_wait(asio::ip::tcp::socket::wait_read, +#endif + [self](const std::error_code &ec) { + if (ec) { + LOG(ERROR) + << "Error in UDP connector: " << ec.value() + << " " << ec.message(); + return; + } + if (auto ptr = self.lock()) { + ptr->readHandler(ec); + } + }); + } +#else + DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; + read_msg_ = getRawBuffer(); + socket_->async_receive_from( + asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_, + [this, self](const std::error_code &ec, std::size_t length) { + if (auto ptr = self.lock()) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "UdpTunnelConnector received packet length=" << length; + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + auto packet = getPacketFromBuffer(read_msg_.first, length); + receiveSuccess(*packet); + std::vector<utils::MemBuf::Ptr> v{std::move(packet)}; + receive_callback_(this, v, make_error_code(core_error::success)); + doRecvPacket(); + } else { + LOG(ERROR) << "Error in UDP: Receiving packets from a not " + "connected socket."; + } + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + LOG(ERROR) << "The connection has been closed by the application."; + return; + } else if (ec.value() == + static_cast<int>(std::errc::connection_refused)) { + if (connection_reattempts_ < max_reconnection_reattempts) { + retryConnection(); + } + } else { + if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) { + LOG(ERROR) << "Error in UDP connector: " << ec.value() + << ec.message(); + } else { + LOG(ERROR) << "Error while not connected"; + } + } + } + }); +#endif +} + +void UdpTunnelConnector::doConnect( + std::shared_ptr<UdpTunnelConnector> &self_shared) { + std::weak_ptr<UdpTunnelConnector> self = self_shared; + asio::async_connect(*socket_, endpoint_iterator_, + [this, self](const std::error_code &ec, + asio::ip::udp::resolver::iterator) { + if (auto ptr = self.lock()) { + if (!ec) { + state_ = State::CONNECTED; + doRecvPacket(); + + if (data_available_) { + data_available_ = false; + doSendPacket(ptr); + } + + on_reconnect_callback_( + this, make_error_code(core_error::success)); + } else { + LOG(ERROR) << "UDP Connection failed!!!"; + retryConnection(); + } + } + }); +} + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/udp_connector.h b/libtransport/src/core/udp_connector.h new file mode 100644 index 000000000..002f4ca9f --- /dev/null +++ b/libtransport/src/core/udp_connector.h @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +#pragma once + +#include <core/errors.h> +#include <hicn/transport/core/asio_wrapper.h> +#include <hicn/transport/core/connector.h> +#include <hicn/transport/portability/platform.h> + +#include <iostream> +#include <memory> + +namespace transport { +namespace core { + +class UdpTunnelListener; + +class UdpTunnelConnector : public Connector { + friend class UdpTunnelListener; + + public: + template <typename ReceiveCallback, typename SentCallback, typename OnClose, + typename OnReconnect> + UdpTunnelConnector(asio::io_service &io_service, + ReceiveCallback &&receive_callback, + SentCallback &&packet_sent, OnClose &&on_close_callback, + OnReconnect &&on_reconnect) + : Connector(receive_callback, packet_sent, on_close_callback, + on_reconnect), + io_service_(io_service), + socket_(std::make_shared<asio::ip::udp::socket>(io_service_)), + resolver_(io_service_), + timer_(io_service_), +#ifdef LINUX + send_timer_(io_service_), + tx_iovecs_{}, + tx_msgs_{}, + rx_iovecs_{}, + rx_msgs_{}, + current_position_(0), +#else + read_msg_(nullptr, 0), +#endif + data_available_(false) { + } + + template <typename ReceiveCallback, typename SentCallback, typename OnClose, + typename OnReconnect, typename EndpointType> + UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket, + std::shared_ptr<asio::io_service::strand> &strand, + ReceiveCallback &&receive_callback, + SentCallback &&packet_sent, OnClose &&on_close_callback, + OnReconnect &&on_reconnect, EndpointType &&remote_endpoint) + : Connector(receive_callback, packet_sent, on_close_callback, + on_reconnect), +#if ((ASIO_VERSION / 100 % 1000) < 12) + io_service_(socket->get_io_service()), +#else + io_service_((asio::io_context &)(socket->get_executor().context())), +#endif + socket_(socket), + resolver_(io_service_), + remote_endpoint_send_(std::forward<EndpointType>(remote_endpoint)), + timer_(io_service_), +#ifdef LINUX + send_timer_(io_service_), + tx_iovecs_{}, + tx_msgs_{}, + rx_iovecs_{}, + rx_msgs_{}, + current_position_(0), +#else + read_msg_(nullptr, 0), +#endif + data_available_(false) { + if (socket_->is_open()) { + state_ = State::CONNECTED; + remote_endpoint_ = Endpoint(remote_endpoint_send_); + local_endpoint_ = socket_->local_endpoint(); + } + } + + ~UdpTunnelConnector() override; + + void send(Packet &packet) override; + + void send(const utils::MemBuf::Ptr &buffer) override; + + void close() override; + + void connect(const std::string &hostname, std::uint16_t port, + const std::string &bind_address = "", + std::uint16_t bind_port = 0); + + auto shared_from_this() { return utils::shared_from(this); } + + private: + void retryConnection(); + void doConnect(std::shared_ptr<UdpTunnelConnector> &self); + void doRecvPacket(); + + void doRecvPacket(utils::MemBuf::Ptr &buffer) { + std::vector<utils::MemBuf::Ptr> v{std::move(buffer)}; + receive_callback_(this, v, make_error_code(core_error::success)); + } + +#ifdef LINUX + void readHandler(const std::error_code &ec); + void writeHandler(); +#endif + + void setConnected() { state_ = State::CONNECTED; } + + void doSendPacket(const std::shared_ptr<UdpTunnelConnector> &self); + void doClose(); + + private: + asio::io_service &io_service_; + std::shared_ptr<asio::ip::udp::socket> socket_; + asio::ip::udp::resolver resolver_; + asio::ip::udp::resolver::iterator endpoint_iterator_; + asio::ip::udp::endpoint remote_endpoint_send_; + asio::ip::udp::endpoint remote_endpoint_recv_; + + asio::steady_timer timer_; + +#ifdef LINUX + asio::steady_timer send_timer_; + struct iovec tx_iovecs_[max_burst][8]; + struct mmsghdr tx_msgs_[max_burst]; + struct iovec rx_iovecs_[max_burst][8]; + struct mmsghdr rx_msgs_[max_burst]; + std::uint8_t current_position_; +#else + std::pair<uint8_t *, std::size_t> read_msg_; +#endif + + bool data_available_; +}; + +} // namespace core + +} // namespace transport diff --git a/libtransport/src/core/udp_listener.cc b/libtransport/src/core/udp_listener.cc new file mode 100644 index 000000000..caa97e0ee --- /dev/null +++ b/libtransport/src/core/udp_listener.cc @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +#include <core/udp_connector.h> +#include <core/udp_listener.h> +#include <glog/logging.h> +#include <hicn/transport/portability/endianess.h> +#include <hicn/transport/utils/hash.h> + +#ifndef LINUX +namespace std { +size_t hash<asio::ip::udp::endpoint>::operator()( + const asio::ip::udp::endpoint &endpoint) const { + auto hash_ip = endpoint.address().is_v4() + ? endpoint.address().to_v4().to_ulong() + : utils::hash::fnv32_buf( + endpoint.address().to_v6().to_bytes().data(), 16); + uint16_t port = endpoint.port(); + return utils::hash::fnv32_buf(&port, 2, (unsigned int)hash_ip); +} +} // namespace std +#endif + +namespace transport { +namespace core { + +UdpTunnelListener::~UdpTunnelListener() {} + +void UdpTunnelListener::close() { + strand_->post([this]() { + if (socket_->is_open()) { + socket_->close(); + } + }); +} + +#ifdef LINUX +void UdpTunnelListener::readHandler(const std::error_code &ec) { + DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet"; + + if (TRANSPORT_EXPECT_TRUE(!ec)) { + if (current_position_ == 0) { + for (int i = 0; i < Connector::max_burst; i++) { + auto read_buffer = Connector::getRawBuffer(); + iovecs_[i][0].iov_base = read_buffer.first; + iovecs_[i][0].iov_len = read_buffer.second; + msgs_[i].msg_hdr.msg_iov = iovecs_[i]; + msgs_[i].msg_hdr.msg_iovlen = 1; + msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i]; + msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]); + } + } + + int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_, + Connector::max_burst - current_position_, MSG_DONTWAIT, + nullptr); + if (res < 0) { + LOG(ERROR) << "Error in recvmmsg."; + return; + } + + for (int i = 0; i < res; i++) { + auto packet = Connector::getPacketFromBuffer( + reinterpret_cast<uint8_t *>( + msgs_[current_position_].msg_hdr.msg_iov[0].iov_base), + msgs_[current_position_].msg_len); + auto connector_id = + utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name, + msgs_[current_position_].msg_hdr.msg_namelen); + + auto connector = connectors_.find(connector_id); + if (connector == connectors_.end()) { + // Create new connector corresponding to new client + + /* + * Get the remote endpoint for this particular message + */ + using namespace asio::ip; + if (local_endpoint_.address().is_v4()) { + auto addr = reinterpret_cast<struct sockaddr_in *>( + &remote_endpoints_[current_position_]); + address_v4::bytes_type address_bytes; + std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr), + address_bytes.size(), address_bytes.begin()); + address_v4 address(address_bytes); + remote_endpoint_ = + udp::endpoint(address, portability::net_to_host(addr->sin_port)); + } else { + auto addr = reinterpret_cast<struct sockaddr_in6 *>( + &remote_endpoints_[current_position_]); + address_v6::bytes_type address_bytes; + std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr), + address_bytes.size(), address_bytes.begin()); + address_v6 address(address_bytes); + remote_endpoint_ = + udp::endpoint(address, portability::net_to_host(addr->sin6_port)); + } + + /** + * Create new connector sharing the same socket of this listener. + */ + auto ret = connectors_.emplace( + connector_id, + std::make_shared<UdpTunnelConnector>( + socket_, strand_, receive_callback_, + [](Connector *, const std::error_code &) {}, [](Connector *) {}, + [](Connector *, const std::error_code &) {}, + std::move(remote_endpoint_))); + connector = ret.first; + connector->second->setConnectorId(connector_id); + } + + /** + * Use connector callback to process incoming message. + */ + UdpTunnelConnector *c = + dynamic_cast<UdpTunnelConnector *>(connector->second.get()); + c->doRecvPacket(packet); + + ++current_position_; + } + + doRecvPacket(); + } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) { + LOG(ERROR) << "The connection has been closed by the application."; + return; + } else { + LOG(ERROR) << ec.value() << " " << ec.message(); + } +} +#endif + +void UdpTunnelListener::doRecvPacket() { +#ifdef LINUX +#if ((ASIO_VERSION / 100 % 1000) < 11) + socket_->async_receive( + asio::null_buffers(), +#else + socket_->async_wait( + asio::ip::tcp::socket::wait_read, +#endif + std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1)); +#else + read_msg_ = Connector::getRawBuffer(); + socket_->async_receive_from( + asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_, + [this](const std::error_code &ec, std::size_t length) { + if (TRANSPORT_EXPECT_TRUE(!ec)) { + auto packet = Connector::getPacketFromBuffer(read_msg_.first, length); + auto connector_id = + std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_); + auto connector = connectors_.find(connector_id); + if (connector == connectors_.end()) { + // Create new connector corresponding to new client + auto ret = connectors_.emplace( + connector_id, std::make_shared<UdpTunnelConnector>( + socket_, strand_, receive_callback_, + [](Connector *, const std::error_code &) {}, + [](Connector *) {}, + [](Connector *, const std::error_code &) {}, + std::move(remote_endpoint_))); + connector = ret.first; + connector->second->setConnectorId(connector_id); + } + + UdpTunnelConnector *c = + dynamic_cast<UdpTunnelConnector *>(connector->second.get()); + c->doRecvPacket(packet); + doRecvPacket(); + } else if (ec.value() == + static_cast<int>(std::errc::operation_canceled)) { + LOG(ERROR) << "The connection has been closed by the application."; + return; + } else { + LOG(ERROR) << ec.value() << " " << ec.message(); + } + }); +#endif +} +} // namespace core +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/core/udp_listener.h b/libtransport/src/core/udp_listener.h new file mode 100644 index 000000000..d8095a262 --- /dev/null +++ b/libtransport/src/core/udp_listener.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 Cisco and/or its affiliates. + */ + +#pragma once + +#include <hicn/transport/core/asio_wrapper.h> +#include <hicn/transport/core/connector.h> +#include <hicn/transport/portability/platform.h> + +#include <unordered_map> + +namespace std { +template <> +struct hash<asio::ip::udp::endpoint> { + size_t operator()(const asio::ip::udp::endpoint &endpoint) const; +}; +} // namespace std + +namespace transport { +namespace core { + +class UdpTunnelListener + : public std::enable_shared_from_this<UdpTunnelListener> { + using PacketReceivedCallback = Connector::PacketReceivedCallback; + using EndpointId = std::pair<uint32_t, uint16_t>; + + static constexpr uint16_t default_port = 5004; + + public: + using Ptr = std::shared_ptr<UdpTunnelListener>; + + template <typename ReceiveCallback> + UdpTunnelListener(asio::io_service &io_service, + ReceiveCallback &&receive_callback, + asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint( + asio::ip::udp::v4(), default_port)) + : io_service_(io_service), + strand_(std::make_shared<asio::io_service::strand>(io_service_)), + socket_(std::make_shared<asio::ip::udp::socket>(io_service_, + endpoint.protocol())), + local_endpoint_(endpoint), + receive_callback_(std::forward<ReceiveCallback>(receive_callback)), +#ifndef LINUX + read_msg_(nullptr, 0) +#else + iovecs_{}, + msgs_{}, + current_position_(0) +#endif + { + if (endpoint.protocol() == asio::ip::udp::v6()) { + std::error_code ec; + socket_->set_option(asio::ip::v6_only(false), ec); + // Call succeeds only on dual stack systems. + } + socket_->bind(local_endpoint_); + io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this)); + } + + ~UdpTunnelListener(); + + void close(); + + int deleteConnector(Connector *connector) { + return (int)connectors_.erase(connector->getConnectorId()); + } + + template <typename ReceiveCallback> + void setReceiveCallback(ReceiveCallback &&callback) { + receive_callback_ = std::forward<ReceiveCallback>(callback); + } + + Connector *findConnector(Connector::Id connId) { + auto it = connectors_.find(connId); + if (it != connectors_.end()) { + return it->second.get(); + } + + return nullptr; + } + + private: + void doRecvPacket(); + + void readHandler(const std::error_code &ec); + + asio::io_service &io_service_; + std::shared_ptr<asio::io_service::strand> strand_; + std::shared_ptr<asio::ip::udp::socket> socket_; + asio::ip::udp::endpoint local_endpoint_; + asio::ip::udp::endpoint remote_endpoint_; + std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_; + + PacketReceivedCallback receive_callback_; + +#ifdef LINUX + struct iovec iovecs_[Connector::max_burst][8]; + struct mmsghdr msgs_[Connector::max_burst]; + struct sockaddr_storage remote_endpoints_[Connector::max_burst]; + std::uint8_t current_position_; +#else + std::pair<uint8_t *, std::size_t> read_msg_; +#endif +}; + +} // namespace core + +} // namespace transport |