aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/core')
-rw-r--r--libtransport/src/core/CMakeLists.txt26
-rw-r--r--libtransport/src/core/constructor.cc36
-rw-r--r--libtransport/src/core/content_object.cc87
-rw-r--r--libtransport/src/core/errors.cc11
-rw-r--r--libtransport/src/core/errors.h7
-rw-r--r--libtransport/src/core/facade.h8
-rw-r--r--libtransport/src/core/global_configuration.cc1
-rw-r--r--libtransport/src/core/global_id_counter.h (renamed from libtransport/src/core/manifest.cc)30
-rw-r--r--libtransport/src/core/global_module_manager.h94
-rw-r--r--libtransport/src/core/global_workers.h45
-rw-r--r--libtransport/src/core/interest.cc143
-rw-r--r--libtransport/src/core/io_module.cc52
-rw-r--r--libtransport/src/core/local_connector.cc46
-rw-r--r--libtransport/src/core/local_connector.h50
-rw-r--r--libtransport/src/core/manifest.h151
-rw-r--r--libtransport/src/core/manifest_format.h168
-rw-r--r--libtransport/src/core/manifest_format_fixed.cc370
-rw-r--r--libtransport/src/core/manifest_format_fixed.h250
-rw-r--r--libtransport/src/core/manifest_inline.h142
-rw-r--r--libtransport/src/core/memif_connector.cc503
-rw-r--r--libtransport/src/core/memif_connector.h154
-rw-r--r--libtransport/src/core/name.cc86
-rw-r--r--libtransport/src/core/packet.cc679
-rw-r--r--libtransport/src/core/pending_interest.cc74
-rw-r--r--libtransport/src/core/pending_interest.h72
-rw-r--r--libtransport/src/core/portal.cc14
-rw-r--r--libtransport/src/core/portal.h646
-rw-r--r--libtransport/src/core/prefix.cc305
-rw-r--r--libtransport/src/core/tcp_socket_connector.cc16
-rw-r--r--libtransport/src/core/tcp_socket_connector.h2
-rw-r--r--libtransport/src/core/udp_connector.cc371
-rw-r--r--libtransport/src/core/udp_connector.h145
-rw-r--r--libtransport/src/core/udp_listener.cc182
-rw-r--r--libtransport/src/core/udp_listener.h109
34 files changed, 3313 insertions, 1762 deletions
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt
index e442bb863..777772a04 100644
--- a/libtransport/src/core/CMakeLists.txt
+++ b/libtransport/src/core/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -14,14 +14,18 @@
list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/facade.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest.h
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_inline.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest_format_fixed.h
${CMAKE_CURRENT_SOURCE_DIR}/manifest_format.h
${CMAKE_CURRENT_SOURCE_DIR}/pending_interest.h
${CMAKE_CURRENT_SOURCE_DIR}/portal.h
${CMAKE_CURRENT_SOURCE_DIR}/errors.h
${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/global_id_counter.h
${CMAKE_CURRENT_SOURCE_DIR}/local_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/global_workers.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_listener.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/global_module_manager.h
)
list(APPEND SOURCE_FILES
@@ -35,8 +39,22 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/portal.cc
${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.cc
${CMAKE_CURRENT_SOURCE_DIR}/io_module.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/local_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_listener.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/constructor.cc
)
+if (NOT ${CMAKE_SYSTEM_NAME} MATCHES Android)
+ if (UNIX AND NOT APPLE)
+ list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc
+ )
+
+ list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h
+ )
+ endif()
+endif()
+
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
-set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
diff --git a/libtransport/src/core/constructor.cc b/libtransport/src/core/constructor.cc
new file mode 100644
index 000000000..0c7f0dfa8
--- /dev/null
+++ b/libtransport/src/core/constructor.cc
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/global_configuration.h>
+#include <core/global_module_manager.h>
+#include <core/global_workers.h>
+#include <hicn/transport/core/global_object_pool.h>
+
+namespace transport {
+namespace core {
+
+void __attribute__((constructor)) libtransportInit() {
+ // First the global module manager is initialized
+ GlobalModuleManager::getInstance();
+ // Then the packet allocator is initialized
+ PacketManager<>::getInstance();
+ // Then the global configuration is initialized
+ GlobalConfiguration::getInstance();
+ // Then the global workers are initialized
+ GlobalWorkers::getInstance();
+}
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc
index 411494fdf..7ed6c57ab 100644
--- a/libtransport/src/core/content_object.cc
+++ b/libtransport/src/core/content_object.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/portability/endianess.h>
#include <hicn/transport/utils/branch_prediction.h>
extern "C" {
@@ -34,30 +35,29 @@ namespace core {
ContentObject::ContentObject(const Name &name, Packet::Format format,
std::size_t additional_header_size)
- : Packet(format, additional_header_size) {
- if (TRANSPORT_EXPECT_FALSE(
- hicn_data_set_name(format, packet_start_, &name.name_) < 0)) {
+ : Packet(HICN_PACKET_TYPE_DATA, format, additional_header_size) {
+ if (TRANSPORT_EXPECT_FALSE(hicn_data_set_name(&pkbuf_, &name.name_) < 0)) {
throw errors::RuntimeException("Error filling the packet name.");
}
- if (TRANSPORT_EXPECT_FALSE(hicn_data_get_name(format_, packet_start_,
- name_.getStructReference()) <
- 0)) {
+ if (TRANSPORT_EXPECT_FALSE(
+ hicn_data_get_name(&pkbuf_, &name_.getStructReference()) < 0)) {
throw errors::MalformedPacketException();
}
}
-#ifdef __ANDROID__
-ContentObject::ContentObject(hicn_format_t format,
+ContentObject::ContentObject(hicn_packet_format_t format,
std::size_t additional_header_size)
- : ContentObject(Name("0::0|0"), format, additional_header_size) {}
+ : ContentObject(
+#ifdef __ANDROID__
+ Name("0::0|0"),
#else
-ContentObject::ContentObject(hicn_format_t format,
- std::size_t additional_header_size)
- : ContentObject(Packet::base_name, format, additional_header_size) {}
+ Packet::base_name,
#endif
+ format, additional_header_size) {
+}
-ContentObject::ContentObject(const Name &name, hicn_format_t format,
+ContentObject::ContentObject(const Name &name, hicn_packet_format_t format,
std::size_t additional_header_size,
const uint8_t *payload, std::size_t size)
: ContentObject(name, format, additional_header_size) {
@@ -80,9 +80,8 @@ ContentObject::~ContentObject() {}
const Name &ContentObject::getName() const {
if (!name_) {
- if (hicn_data_get_name(format_, packet_start_,
- (hicn_name_t *)name_.getConstStructReference()) <
- 0) {
+ if (hicn_data_get_name(
+ &pkbuf_, (hicn_name_t *)&name_.getConstStructReference()) < 0) {
throw errors::MalformedPacketException();
}
}
@@ -93,31 +92,27 @@ const Name &ContentObject::getName() const {
Name &ContentObject::getWritableName() { return const_cast<Name &>(getName()); }
void ContentObject::setName(const Name &name) {
- if (hicn_data_set_name(format_, packet_start_,
- name.getConstStructReference()) < 0) {
+ if (hicn_data_set_name(&pkbuf_, &name.getConstStructReference()) < 0) {
throw errors::RuntimeException("Error setting content object name.");
}
- if (hicn_data_get_name(format_, packet_start_, name_.getStructReference()) <
- 0) {
+ if (hicn_data_get_name(&pkbuf_, &name_.getStructReference()) < 0) {
throw errors::MalformedPacketException();
}
}
-uint32_t ContentObject::getPathLabel() const {
- uint32_t path_label;
- if (hicn_data_get_path_label(packet_start_, &path_label) < 0) {
+hicn_path_label_t ContentObject::getPathLabel() const {
+ hicn_path_label_t path_label;
+ if (hicn_data_get_path_label(&pkbuf_, &path_label) < 0) {
throw errors::RuntimeException(
"Error retrieving the path label from content object");
}
- return ntohl(path_label);
+ return path_label;
}
-ContentObject &ContentObject::setPathLabel(uint32_t path_label) {
- path_label = htonl(path_label);
- if (hicn_data_set_path_label((hicn_header_t *)packet_start_, path_label) <
- 0) {
+ContentObject &ContentObject::setPathLabel(hicn_path_label_t path_label) {
+ if (hicn_data_set_path_label(&pkbuf_, path_label) < 0) {
throw errors::RuntimeException(
"Error setting the path label from content object");
}
@@ -125,18 +120,18 @@ ContentObject &ContentObject::setPathLabel(uint32_t path_label) {
return *this;
}
-void ContentObject::setLocator(const ip_address_t &ip_address) {
- if (hicn_data_set_locator(format_, packet_start_, &ip_address) < 0) {
+void ContentObject::setLocator(const hicn_ip_address_t &ip_address) {
+ if (hicn_data_set_locator(&pkbuf_, &ip_address) < 0) {
throw errors::RuntimeException("Error setting content object locator");
}
return;
}
-ip_address_t ContentObject::getLocator() const {
- ip_address_t ip;
+hicn_ip_address_t ContentObject::getLocator() const {
+ hicn_ip_address_t ip;
- if (hicn_data_get_locator(format_, packet_start_, &ip) < 0) {
+ if (hicn_data_get_locator(&pkbuf_, &ip) < 0) {
throw errors::RuntimeException("Error getting content object locator.");
}
@@ -144,7 +139,7 @@ ip_address_t ContentObject::getLocator() const {
}
void ContentObject::setLifetime(uint32_t lifetime) {
- if (hicn_data_set_expiry_time(packet_start_, lifetime) < 0) {
+ if (hicn_data_set_expiry_time(&pkbuf_, lifetime) < 0) {
throw errors::MalformedPacketException();
}
}
@@ -152,7 +147,7 @@ void ContentObject::setLifetime(uint32_t lifetime) {
uint32_t ContentObject::getLifetime() const {
uint32_t lifetime = 0;
- if (hicn_data_get_expiry_time(packet_start_, &lifetime) < 0) {
+ if (hicn_data_get_expiry_time(&pkbuf_, &lifetime) < 0) {
throw errors::MalformedPacketException();
}
@@ -160,13 +155,29 @@ uint32_t ContentObject::getLifetime() const {
}
void ContentObject::resetForHash() {
- if (hicn_data_reset_for_hash(
- format_, reinterpret_cast<hicn_header_t *>(packet_start_)) < 0) {
+ if (hicn_data_reset_for_hash(&pkbuf_) < 0) {
throw errors::RuntimeException(
"Error resetting content object fields for hash computation.");
}
}
+bool ContentObject::isLast() const {
+ int is_last = 0;
+ if (hicn_data_is_last(&pkbuf_, &is_last) < 0) {
+ throw errors::RuntimeException(
+ "Impossible to get last data flag from packet header.");
+ }
+
+ return is_last;
+}
+
+void ContentObject::setLast() {
+ if (hicn_data_set_last(&pkbuf_) < 0) {
+ throw errors::RuntimeException(
+ "Impossible to set last data flag to packet header.");
+ }
+}
+
} // end namespace core
} // end namespace transport
diff --git a/libtransport/src/core/errors.cc b/libtransport/src/core/errors.cc
index 82647a60b..68fd7bf38 100644
--- a/libtransport/src/core/errors.cc
+++ b/libtransport/src/core/errors.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -39,6 +39,15 @@ std::string core_category_impl::message(int ev) const {
case core_error::configuration_not_applied: {
return "Configuration was not applied due to wrong parameters.";
}
+ case core_error::send_failed: {
+ return "Error sending data to socket.";
+ }
+ case core_error::send_buffer_allocation_failed: {
+ return "Error allocating buffers to send data.";
+ }
+ case core_error::receive_failed: {
+ return "Error receiving data from socket.";
+ }
default: {
return "Unknown core error";
}
diff --git a/libtransport/src/core/errors.h b/libtransport/src/core/errors.h
index a46f1dbcd..4532e6dc5 100644
--- a/libtransport/src/core/errors.h
+++ b/libtransport/src/core/errors.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -36,7 +36,10 @@ const std::error_category& core_category();
enum class core_error {
success = 0,
configuration_parse_failed,
- configuration_not_applied
+ configuration_not_applied,
+ send_failed,
+ send_buffer_allocation_failed,
+ receive_failed
};
/**
diff --git a/libtransport/src/core/facade.h b/libtransport/src/core/facade.h
index 199081271..77c1d16d2 100644
--- a/libtransport/src/core/facade.h
+++ b/libtransport/src/core/facade.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,16 +15,16 @@
#pragma once
+#include <core/manifest.h>
#include <core/manifest_format_fixed.h>
-#include <core/manifest_inline.h>
#include <core/portal.h>
namespace transport {
namespace core {
-using ContentObjectManifest = core::ManifestInline<ContentObject, Fixed>;
-using InterestManifest = core::ManifestInline<Interest, Fixed>;
+using ContentObjectManifest = core::Manifest<Fixed>;
+using InterestManifest = core::Manifest<Fixed>;
} // namespace core
diff --git a/libtransport/src/core/global_configuration.cc b/libtransport/src/core/global_configuration.cc
index 9da37c2fa..f53e1f0e2 100644
--- a/libtransport/src/core/global_configuration.cc
+++ b/libtransport/src/core/global_configuration.cc
@@ -68,6 +68,7 @@ void GlobalConfiguration::parseConfiguration(const std::string& path) {
// variable comes first.
std::unique_lock<std::mutex> lck(cp_mtx_);
if (const char* env_c = std::getenv(GlobalConfiguration::conf_file)) {
+ conf_file_path_ = env_c;
parseTransportConfig(env_c);
} else if (!path.empty()) {
conf_file_path_ = path;
diff --git a/libtransport/src/core/manifest.cc b/libtransport/src/core/global_id_counter.h
index 3f890f3d0..0a67b76d5 100644
--- a/libtransport/src/core/manifest.cc
+++ b/libtransport/src/core/global_id_counter.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,21 +13,27 @@
* limitations under the License.
*/
-#include <hicn/transport/core/manifest.h>
+#pragma once
-namespace transport {
+#include <hicn/transport/utils/singleton.h>
-namespace core {
+#include <atomic>
+#include <mutex>
-std::string ManifestEncoding::manifest_type = std::string("manifest_type");
+namespace transport {
-std::map<ManifestType, std::string> ManifestEncoding::manifest_types = {
- {FINAL_CHUNK_NUMBER, "FinalChunkNumber"}, {NAME_LIST, "NameList"}};
+namespace core {
-std::string ManifestEncoding::final_chunk_number =
- std::string("final_chunk_number");
-std::string ManifestEncoding::content_name = std::string("content_name");
+template <typename T = uint64_t>
+class GlobalCounter : public utils::Singleton<GlobalCounter<T>> {
+ public:
+ friend class utils::Singleton<GlobalCounter>;
+ T getNext() { return counter_++; }
-} // end namespace core
+ private:
+ GlobalCounter() : counter_(0) {}
+ std::atomic<T> counter_;
+};
-} // end namespace transport \ No newline at end of file
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/global_module_manager.h b/libtransport/src/core/global_module_manager.h
new file mode 100644
index 000000000..c9d272cdb
--- /dev/null
+++ b/libtransport/src/core/global_module_manager.h
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2022 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+#include <hicn/transport/utils/singleton.h>
+
+#ifndef _WIN32
+#include <dlfcn.h>
+#endif
+
+#include <atomic>
+#include <iostream>
+#include <mutex>
+#include <unordered_map>
+
+namespace transport {
+namespace core {
+
+class GlobalModuleManager : public utils::Singleton<GlobalModuleManager> {
+ public:
+ friend class utils::Singleton<GlobalModuleManager>;
+
+ ~GlobalModuleManager() {
+ for (const auto &[key, value] : modules_) {
+ unload(value);
+ }
+ }
+
+ void *loadModule(const std::string &module_name) {
+ void *handle = nullptr;
+ const char *error = nullptr;
+
+ // Lock
+ std::unique_lock lck(mtx_);
+
+ auto it = modules_.find(module_name);
+ if (it != modules_.end()) {
+ return it->second;
+ }
+
+ // open module
+ handle = dlopen(module_name.c_str(), RTLD_NOW);
+ if (!handle) {
+ if ((error = dlerror()) != nullptr) {
+ LOG(ERROR) << error;
+ }
+ return nullptr;
+ }
+
+ auto ret = modules_.try_emplace(module_name, handle);
+ DCHECK(ret.second);
+
+ return handle;
+ }
+
+ void unload(void *handle) {
+ // destroy object and close module
+ dlclose(handle);
+ }
+
+ bool unloadModule(const std::string &module_name) {
+ // Lock
+ std::unique_lock lck(mtx_);
+ auto it = modules_.find(module_name);
+ if (it != modules_.end()) {
+ unload(it->second);
+ return true;
+ }
+
+ return false;
+ }
+
+ private:
+ GlobalModuleManager() = default;
+ std::mutex mtx_;
+ std::unordered_map<std::string, void *> modules_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/global_workers.h b/libtransport/src/core/global_workers.h
new file mode 100644
index 000000000..c5d794ef2
--- /dev/null
+++ b/libtransport/src/core/global_workers.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/singleton.h>
+#include <hicn/transport/utils/thread_pool.h>
+
+#include <atomic>
+#include <mutex>
+
+namespace transport {
+namespace core {
+
+class GlobalWorkers : public utils::Singleton<GlobalWorkers> {
+ public:
+ friend class utils::Singleton<GlobalWorkers>;
+
+ ::utils::EventThread& getWorker() {
+ return thread_pool_.getWorker(counter_++ % thread_pool_.getNThreads());
+ }
+
+ auto& getWorkers() { return thread_pool_.getWorkers(); }
+
+ private:
+ GlobalWorkers() : counter_(0), thread_pool_() {}
+
+ std::atomic_uint16_t counter_;
+ ::utils::ThreadPool thread_pool_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc
index 9d868ced0..c3eb7c379 100644
--- a/libtransport/src/core/interest.cc
+++ b/libtransport/src/core/interest.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -21,7 +21,9 @@ extern "C" {
#ifndef _WIN32
TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat")
#endif
+#include <hicn/base.h>
#include <hicn/hicn.h>
+#include <hicn/interest_manifest.h>
}
#include <cstring>
@@ -33,29 +35,30 @@ namespace core {
Interest::Interest(const Name &interest_name, Packet::Format format,
std::size_t additional_header_size)
- : Packet(format, additional_header_size) {
- if (hicn_interest_set_name(format_, packet_start_,
- interest_name.getConstStructReference()) < 0) {
+ : Packet(HICN_PACKET_TYPE_INTEREST, format, additional_header_size) {
+ if (hicn_interest_set_name(&pkbuf_,
+ &interest_name.getConstStructReference()) < 0) {
throw errors::MalformedPacketException();
}
- if (hicn_interest_get_name(format_, packet_start_,
- name_.getStructReference()) < 0) {
+ if (hicn_interest_get_name(&pkbuf_, &name_.getStructReference()) < 0) {
throw errors::MalformedPacketException();
}
}
+Interest::Interest(hicn_packet_format_t format,
+ std::size_t additional_header_size)
+ : Interest(
#ifdef __ANDROID__
-Interest::Interest(hicn_format_t format, std::size_t additional_header_size)
- : Interest(Name("0::0|0"), format, additional_header_size) {}
+ Name("0::0|0"),
#else
-Interest::Interest(hicn_format_t format, std::size_t additional_header_size)
- : Interest(base_name, format, additional_header_size) {}
+ base_name,
#endif
+ format, additional_header_size) {
+}
Interest::Interest(MemBuf &&buffer) : Packet(std::move(buffer)) {
- if (hicn_interest_get_name(format_, packet_start_,
- name_.getStructReference()) < 0) {
+ if (hicn_interest_get_name(&pkbuf_, &name_.getStructReference()) < 0) {
throw errors::MalformedPacketException();
}
}
@@ -73,15 +76,13 @@ Interest &Interest::operator=(const Interest &other) {
return (Interest &)Packet::operator=(other);
}
-Interest::~Interest() {}
+Interest::~Interest() = default;
const Name &Interest::getName() const {
- if (!name_) {
- if (hicn_interest_get_name(format_, packet_start_,
- (hicn_name_t *)name_.getConstStructReference()) <
- 0) {
- throw errors::MalformedPacketException();
- }
+ if (!name_ &&
+ hicn_interest_get_name(
+ &pkbuf_, (hicn_name_t *)&name_.getConstStructReference()) < 0) {
+ throw errors::MalformedPacketException();
}
return name_;
@@ -90,29 +91,27 @@ const Name &Interest::getName() const {
Name &Interest::getWritableName() { return const_cast<Name &>(getName()); }
void Interest::setName(const Name &name) {
- if (hicn_interest_set_name(format_, packet_start_,
- name.getConstStructReference()) < 0) {
+ if (hicn_interest_set_name(&pkbuf_, &name.getConstStructReference()) < 0) {
throw errors::RuntimeException("Error setting interest name.");
}
- if (hicn_interest_get_name(format_, packet_start_,
- name_.getStructReference()) < 0) {
+ if (hicn_interest_get_name(&pkbuf_, &name_.getStructReference()) < 0) {
throw errors::MalformedPacketException();
}
}
-void Interest::setLocator(const ip_address_t &ip_address) {
- if (hicn_interest_set_locator(format_, packet_start_, &ip_address) < 0) {
+void Interest::setLocator(const hicn_ip_address_t &ip_address) {
+ if (hicn_interest_set_locator(&pkbuf_, &ip_address) < 0) {
throw errors::RuntimeException("Error setting interest locator.");
}
return;
}
-ip_address_t Interest::getLocator() const {
- ip_address_t ip;
+hicn_ip_address_t Interest::getLocator() const {
+ hicn_ip_address_t ip;
- if (hicn_interest_get_locator(format_, packet_start_, &ip) < 0) {
+ if (hicn_interest_get_locator(&pkbuf_, &ip) < 0) {
throw errors::RuntimeException("Error getting interest locator.");
}
@@ -120,7 +119,7 @@ ip_address_t Interest::getLocator() const {
}
void Interest::setLifetime(uint32_t lifetime) {
- if (hicn_interest_set_lifetime(packet_start_, lifetime) < 0) {
+ if (hicn_interest_set_lifetime(&pkbuf_, lifetime) < 0) {
throw errors::MalformedPacketException();
}
}
@@ -128,7 +127,7 @@ void Interest::setLifetime(uint32_t lifetime) {
uint32_t Interest::getLifetime() const {
uint32_t lifetime = 0;
- if (hicn_interest_get_lifetime(packet_start_, &lifetime) < 0) {
+ if (hicn_interest_get_lifetime(&pkbuf_, &lifetime) < 0) {
throw errors::MalformedPacketException();
}
@@ -136,14 +135,20 @@ uint32_t Interest::getLifetime() const {
}
void Interest::resetForHash() {
- if (hicn_interest_reset_for_hash(
- format_, reinterpret_cast<hicn_header_t *>(packet_start_)) < 0) {
+ if (hicn_interest_reset_for_hash(&pkbuf_) < 0) {
throw errors::RuntimeException(
"Error resetting interest fields for hash computation.");
}
+
+ // Reset request bitmap in manifest
+ if (hasManifest()) {
+ auto int_manifest_header =
+ (interest_manifest_header_t *)(writableData() + headerSize());
+ memset(int_manifest_header->request_bitmap, 0, BITMAP_SIZE * sizeof(u32));
+ }
}
-bool Interest::hasManifest() {
+bool Interest::hasManifest() const {
return (getPayloadType() == PayloadType::MANIFEST);
}
@@ -162,19 +167,43 @@ void Interest::encodeSuffixes() {
// We assume interest does not hold signature for the moment.
auto int_manifest_header =
- (InterestManifestHeader *)(writableData() + headerSize());
- int_manifest_header->n_suffixes = suffix_set_.size();
+ (interest_manifest_header_t *)(writableData() + headerSize());
+
+ interest_manifest_init(int_manifest_header, name_.getSuffix());
+ for (auto it = suffix_set_.begin(); it != suffix_set_.end(); it++) {
+ interest_manifest_add_suffix(int_manifest_header, *it);
+ }
+
std::size_t additional_length =
- sizeof(InterestManifestHeader) +
+ sizeof(interest_manifest_header_t) +
int_manifest_header->n_suffixes * sizeof(uint32_t);
- uint32_t *suffix = (uint32_t *)(int_manifest_header + 1);
- for (auto it = suffix_set_.begin(); it != suffix_set_.end(); it++, suffix++) {
- *suffix = *it;
+ append(additional_length);
+ updateLength();
+}
+
+void Interest::serializeSuffixes() {
+ if (!hasManifest()) {
+ return;
}
- append(additional_length);
- updateLength(additional_length);
+ // We assume interest does not hold signature for the moment.
+ auto int_manifest_header =
+ (interest_manifest_header_t *)(writableData() + headerSize());
+ // Serialize interest manifest
+ interest_manifest_serialize(int_manifest_header);
+}
+
+void Interest::deserializeSuffixes() {
+ if (!hasManifest()) {
+ return;
+ }
+
+ // We assume interest does not hold signature for the moment.
+ auto int_manifest_header =
+ (interest_manifest_header_t *)(writableData() + headerSize());
+ // Serialize interest manifest
+ interest_manifest_deserialize(int_manifest_header);
}
uint32_t *Interest::firstSuffix() {
@@ -182,7 +211,7 @@ uint32_t *Interest::firstSuffix() {
return nullptr;
}
- auto ret = (InterestManifestHeader *)(writableData() + headerSize());
+ auto ret = (interest_manifest_header_t *)(writableData() + headerSize());
ret += 1;
return (uint32_t *)ret;
@@ -193,11 +222,39 @@ uint32_t Interest::numberOfSuffixes() {
return 0;
}
- auto header = (InterestManifestHeader *)(writableData() + headerSize());
+ auto header = (interest_manifest_header_t *)(writableData() + headerSize());
return header->n_suffixes;
}
+hicn_uword *Interest::getRequestBitmap() {
+ if (!hasManifest()) return nullptr;
+
+ auto header = (interest_manifest_header_t *)(writableData() + headerSize());
+ return header->request_bitmap;
+}
+
+interest_manifest_header_t *Interest::getIntManifestHeader() {
+ if (!hasManifest()) return nullptr;
+
+ auto header = (interest_manifest_header_t *)(writableData() + headerSize());
+ return header;
+};
+
+void Interest::setRequestBitmap(const uint32_t *request_bitmap) {
+ if (!hasManifest()) return;
+
+ auto header = (interest_manifest_header_t *)(writableData() + headerSize());
+ memcpy(header->request_bitmap, request_bitmap,
+ BITMAP_SIZE * sizeof(uint32_t));
+}
+
+bool Interest::isValid() {
+ if (!hasManifest()) return true;
+ auto header = (interest_manifest_header_t *)(writableData() + headerSize());
+ return interest_manifest_is_valid(header, payloadSize());
+}
+
} // end namespace core
} // end namespace transport
diff --git a/libtransport/src/core/io_module.cc b/libtransport/src/core/io_module.cc
index a751eabf5..0fdb735c4 100644
--- a/libtransport/src/core/io_module.cc
+++ b/libtransport/src/core/io_module.cc
@@ -16,11 +16,14 @@
#ifndef _WIN32
#include <dlfcn.h>
#endif
+#include <core/global_module_manager.h>
#include <glog/logging.h>
#include <hicn/transport/core/io_module.h>
+#include <iostream>
+
#ifdef ANDROID
-#include <io_modules/udp/hicn_forwarder_module.h>
+#include <io_modules/hicn-light/hicn_forwarder_module.h>
#elif _WIN32
#include <hicn/util/windows/windows_utils.h>
#endif
@@ -36,53 +39,28 @@ IoModule *IoModule::load(const char *module_name) {
#ifdef ANDROID
return new HicnForwarderModule();
#else
- void *handle = 0;
- IoModule *module = 0;
- IoModule *(*creator)(void) = 0;
- const char *error = 0;
+ IoModule *iomodule = nullptr;
+ IoModule *(*creator)(void) = nullptr;
+ const char *error = nullptr;
- // open module
- handle = dlopen(module_name, RTLD_NOW);
- if (!handle) {
- if ((error = dlerror()) != 0) {
- LOG(ERROR) << error;
- }
- return 0;
- }
+ auto handle = GlobalModuleManager::getInstance().loadModule(module_name);
// get factory method
creator = (IoModule * (*)(void)) dlsym(handle, "create_module");
if (!creator) {
- if ((error = dlerror()) != 0) {
- LOG(ERROR) << error;
- return 0;
+ if ((error = dlerror()) != nullptr) {
+ LOG(ERROR) << error << ": " << module_name;
}
+
+ return nullptr;
}
// create object and return it
- module = (*creator)();
- module->handle_ = handle;
-
- return module;
-#endif
-}
-
-bool IoModule::unload(IoModule *module) {
- if (!module) {
- return false;
- }
+ iomodule = (*creator)();
-#ifdef ANDROID
- delete module;
-#else
- // destroy object and close module
- void *handle = module->handle_;
- delete module;
- dlclose(handle);
+ return iomodule;
#endif
-
- return true;
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/core/local_connector.cc b/libtransport/src/core/local_connector.cc
deleted file mode 100644
index 50dadc677..000000000
--- a/libtransport/src/core/local_connector.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <core/local_connector.h>
-#include <glog/logging.h>
-#include <hicn/transport/core/asio_wrapper.h>
-#include <hicn/transport/core/content_object.h>
-#include <hicn/transport/core/interest.h>
-#include <hicn/transport/errors/not_implemented_exception.h>
-
-namespace transport {
-namespace core {
-
-LocalConnector::~LocalConnector() {}
-
-void LocalConnector::close() { state_ = State::CLOSED; }
-
-void LocalConnector::send(Packet &packet) {
- if (!isConnected()) {
- return;
- }
-
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending packet to local socket.";
- io_service_.get().post([this, p{packet.shared_from_this()}]() mutable {
- receive_callback_(this, *p, std::make_error_code(std::errc(0)));
- });
-}
-
-void LocalConnector::send(const uint8_t *packet, std::size_t len) {
- throw errors::NotImplementedException();
-}
-
-} // namespace core
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/local_connector.h b/libtransport/src/core/local_connector.h
index 0e2d8f676..bf64b71d6 100644
--- a/libtransport/src/core/local_connector.h
+++ b/libtransport/src/core/local_connector.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,10 +15,11 @@
#pragma once
+#include <core/errors.h>
#include <hicn/transport/core/asio_wrapper.h>
#include <hicn/transport/core/connector.h>
#include <hicn/transport/core/global_object_pool.h>
-#include <hicn/transport/utils/move_wrapper.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
#include <hicn/transport/utils/shared_ptr_utils.h>
#include <io_modules/forwarder/errors.h>
@@ -34,19 +35,48 @@ class LocalConnector : public Connector {
OnClose &&close_callback, OnReconnect &&on_reconnect)
: Connector(receive_callback, packet_sent, close_callback, on_reconnect),
io_service_(io_service),
- io_service_work_(io_service_.get()) {
- state_ = State::CONNECTED;
- }
+ io_service_work_(io_service_.get()) {}
- ~LocalConnector() override;
+ ~LocalConnector() override = default;
- void send(Packet &packet) override;
+ auto shared_from_this() { return utils::shared_from(this); }
- void send(const uint8_t *packet, std::size_t len) override;
+ void send(Packet &packet) override { send(packet.shared_from_this()); }
- void close() override;
+ void send(const utils::MemBuf::Ptr &buffer) override {
+ throw errors::NotImplementedException();
+ }
- auto shared_from_this() { return utils::shared_from(this); }
+ void receive(const std::vector<utils::MemBuf::Ptr> &buffers) override {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending packet to local socket.";
+ std::weak_ptr<LocalConnector> self = shared_from_this();
+ io_service_.get().post([self, _buffers{std::move(buffers)}]() mutable {
+ if (auto ptr = self.lock()) {
+ ptr->receive_callback_(ptr.get(), _buffers,
+ make_error_code(core_error::success));
+ }
+ });
+ }
+
+ void reconnect() override {
+ state_ = State::CONNECTED;
+ std::weak_ptr<LocalConnector> self = shared_from_this();
+ io_service_.get().post([self]() {
+ if (auto ptr = self.lock()) {
+ ptr->on_reconnect_callback_(ptr.get(),
+ make_error_code(core_error::success));
+ }
+ });
+ }
+
+ void close() override {
+ std::weak_ptr<LocalConnector> self = shared_from_this();
+ io_service_.get().post([self]() mutable {
+ if (auto ptr = self.lock()) {
+ ptr->on_close_callback_(ptr.get());
+ }
+ });
+ }
private:
std::reference_wrapper<asio::io_service> io_service_;
diff --git a/libtransport/src/core/manifest.h b/libtransport/src/core/manifest.h
index 9b25ebd67..40832bb6b 100644
--- a/libtransport/src/core/manifest.h
+++ b/libtransport/src/core/manifest.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -16,138 +16,73 @@
#pragma once
#include <core/manifest_format.h>
-#include <hicn/transport/core/content_object.h>
-#include <hicn/transport/core/name.h>
-
-#include <set>
+#include <glog/logging.h>
+#include <hicn/transport/auth/verifier.h>
+#include <hicn/transport/core/global_object_pool.h>
+#include <hicn/transport/core/packet.h>
namespace transport {
-
namespace core {
-using typename core::Name;
-using typename core::Packet;
-using typename core::PayloadType;
-
-template <typename Base, typename FormatTraits, typename ManifestImpl>
-class Manifest : public Base {
- static_assert(std::is_base_of<Packet, Base>::value,
- "Base must inherit from packet!");
-
+template <typename FormatTraits>
+class Manifest : public FormatTraits::Encoder, public FormatTraits::Decoder {
public:
- // core::ContentObjectManifest::Ptr
+ using Ptr = std::shared_ptr<Manifest>;
using Encoder = typename FormatTraits::Encoder;
using Decoder = typename FormatTraits::Decoder;
- Manifest(std::size_t signature_size = 0)
- : Base(HF_INET6_TCP_AH, signature_size),
- encoder_(*this, signature_size),
- decoder_(*this) {
- Base::setPayloadType(PayloadType::MANIFEST);
- }
-
- Manifest(const core::Name &name, std::size_t signature_size = 0)
- : Base(name, HF_INET6_TCP_AH, signature_size),
- encoder_(*this, signature_size),
- decoder_(*this) {
- Base::setPayloadType(PayloadType::MANIFEST);
- }
+ using Hash = typename FormatTraits::Hash;
+ using HashType = typename FormatTraits::HashType;
+ using Suffix = typename FormatTraits::Suffix;
+ using SuffixList = typename FormatTraits::SuffixList;
+ using HashEntry = std::pair<auth::CryptoHashType, std::vector<uint8_t>>;
- template <typename T>
- Manifest(T &&base)
- : Base(std::forward<T &&>(base)),
- encoder_(*this, 0, false),
- decoder_(*this) {
- Base::setPayloadType(PayloadType::MANIFEST);
+ Manifest(Packet::Ptr packet, bool clear = false)
+ : Encoder(packet, clear), Decoder(packet), packet_(packet) {
+ packet->setPayloadType(PayloadType::MANIFEST);
}
virtual ~Manifest() = default;
- std::size_t estimateManifestSize(std::size_t additional_entries = 0) {
- return static_cast<ManifestImpl &>(*this).estimateManifestSizeImpl(
- additional_entries);
- }
+ Packet::Ptr getPacket() const { return packet_; }
- /*
- * After the call to encode, users MUST call clear before adding data
- * to the manifest.
- */
- Manifest &encode() { return static_cast<ManifestImpl &>(*this).encodeImpl(); }
-
- Manifest &decode() {
- Manifest::decoder_.decode();
-
- manifest_type_ = decoder_.getManifestType();
- hash_algorithm_ = decoder_.getHashAlgorithm();
- is_last_ = decoder_.getIsFinalManifest();
-
- return static_cast<ManifestImpl &>(*this).decodeImpl();
+ void setHeaders(ManifestType type, uint8_t max_capacity, HashType hash_algo,
+ bool is_last, const Name &base_name) {
+ Encoder::setType(type);
+ Encoder::setMaxCapacity(max_capacity);
+ Encoder::setHashAlgorithm(hash_algo);
+ Encoder::setIsLast(is_last);
+ Encoder::setBaseName(base_name);
}
- static std::size_t getManifestHeaderSize() {
- return Encoder::getManifestHeaderSize();
- }
+ auth::Verifier::SuffixMap getSuffixMap() const {
+ auth::Verifier::SuffixMap suffix_map;
- static std::size_t getManifestEntrySize() {
- return Encoder::getManifestEntrySize();
- }
+ HashType hash_algo = Decoder::getHashAlgorithm();
+ SuffixList suffix_list = Decoder::getEntries();
- Manifest &setManifestType(ManifestType type) {
- manifest_type_ = type;
- encoder_.setManifestType(manifest_type_);
- return *this;
- }
+ for (auto it = suffix_list.begin(); it != suffix_list.end(); ++it) {
+ Hash hash(it->second, Hash::getSize(hash_algo), hash_algo);
+ suffix_map[it->first] = hash;
+ }
- Manifest &setHashAlgorithm(auth::CryptoHashType hash_algorithm) {
- hash_algorithm_ = hash_algorithm;
- encoder_.setHashAlgorithm(hash_algorithm_);
- return *this;
+ return suffix_map;
}
- auth::CryptoHashType getHashAlgorithm() { return hash_algorithm_; }
-
- ManifestType getManifestType() const { return manifest_type_; }
-
- bool isFinalManifest() const { return is_last_; }
-
- Manifest &setVersion(ManifestVersion version) {
- encoder_.setVersion(version);
- return *this;
- }
-
- Manifest &setFinalBlockNumber(std::uint32_t final_block_number) {
- encoder_.setFinalBlockNumber(final_block_number);
- return *this;
- }
-
- uint32_t getFinalBlockNumber() const {
- return decoder_.getFinalBlockNumber();
- }
-
- ManifestVersion getVersion() const { return decoder_.getVersion(); }
-
- Manifest &setFinalManifest(bool is_final_manifest) {
- encoder_.setIsFinalManifest(is_final_manifest);
- is_last_ = is_final_manifest;
- return *this;
- }
-
- Manifest &clear() {
- encoder_.clear();
- decoder_.clear();
- return *this;
- }
+ static Manifest::Ptr createContentManifest(Packet::Format format,
+ const core::Name &manifest_name,
+ std::size_t signature_size) {
+ ContentObject::Ptr content_object =
+ core::PacketManager<>::getInstance().getPacket<ContentObject>(
+ format, signature_size);
+ content_object->setName(manifest_name);
+ return std::make_shared<Manifest>(content_object, true);
+ };
protected:
- ManifestType manifest_type_;
- auth::CryptoHashType hash_algorithm_;
- bool is_last_;
-
- Encoder encoder_;
- Decoder decoder_;
+ Packet::Ptr packet_;
};
} // end namespace core
-
} // end namespace transport
diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h
index 90d221f5e..89412316a 100644
--- a/libtransport/src/core/manifest_format.h
+++ b/libtransport/src/core/manifest_format.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -17,46 +17,40 @@
#include <hicn/transport/auth/crypto_hash.h>
#include <hicn/transport/core/name.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+#include <protocols/fec_utils.h>
#include <cinttypes>
#include <type_traits>
#include <unordered_map>
namespace transport {
-
namespace core {
-enum class ManifestFields : uint8_t {
- VERSION,
- HASH_ALGORITHM,
- SEGMENT_CALCULATION_STRATEGY,
- FINAL_MANIFEST,
- NAME_HASH_LIST,
- BASE_NAME
-};
-
-enum class ManifestVersion : uint8_t {
- VERSION_1 = 1,
-};
-
enum class ManifestType : uint8_t {
INLINE_MANIFEST = 1,
FINAL_CHUNK_NUMBER = 2,
FLIC_MANIFEST = 3,
};
-/**
- * INCREMENTAL: Manifests will be received inline with the data with no specific
- * assumption regarding the manifest capacity. Consumers can send interests
- * using a +1 heuristic.
- *
- * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of
- * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when
- * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3
- */
-enum class NextSegmentCalculationStrategy : uint8_t {
- INCREMENTAL = 1,
- MANIFEST_CAPACITY_BASED = 2,
+struct ParamsRTC {
+ std::uint64_t timestamp;
+ std::uint32_t prod_rate;
+ std::uint32_t prod_seg;
+ protocol::fec::FECType fec_type;
+
+ bool operator==(const ParamsRTC &other) const {
+ return (timestamp == other.timestamp && prod_rate == other.prod_rate &&
+ prod_seg == other.prod_seg && fec_type == other.fec_type);
+ }
+};
+
+struct ParamsBytestream {
+ std::uint32_t final_segment;
+
+ bool operator==(const ParamsBytestream &other) const {
+ return (final_segment == other.final_segment);
+ }
};
template <typename T>
@@ -84,22 +78,25 @@ class ManifestEncoder {
return static_cast<Implementation &>(*this).clearImpl();
}
- ManifestEncoder &setManifestType(ManifestType type) {
- return static_cast<Implementation &>(*this).setManifestTypeImpl(type);
+ bool isEncoded() const {
+ return static_cast<const Implementation &>(*this).isEncodedImpl();
}
- ManifestEncoder &setHashAlgorithm(auth::CryptoHashType hash) {
- return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash);
+ ManifestEncoder &setType(ManifestType type) {
+ return static_cast<Implementation &>(*this).setTypeImpl(type);
}
- ManifestEncoder &setFinalChunkNumber(uint32_t final_chunk) {
- return static_cast<Implementation &>(*this).setFinalChunkImpl(final_chunk);
+ ManifestEncoder &setMaxCapacity(uint8_t max_capacity) {
+ return static_cast<Implementation &>(*this).setMaxCapacityImpl(
+ max_capacity);
}
- ManifestEncoder &setNextSegmentCalculationStrategy(
- NextSegmentCalculationStrategy strategy) {
- return static_cast<Implementation &>(*this)
- .setNextSegmentCalculationStrategyImpl(strategy);
+ ManifestEncoder &setHashAlgorithm(auth::CryptoHashType hash) {
+ return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash);
+ }
+
+ ManifestEncoder &setIsLast(bool is_last) {
+ return static_cast<Implementation &>(*this).setIsLastImpl(is_last);
}
template <
@@ -110,40 +107,36 @@ class ManifestEncoder {
return static_cast<Implementation &>(*this).setBaseNameImpl(name);
}
- template <typename Hash>
- ManifestEncoder &addSuffixAndHash(uint32_t suffix, Hash &&hash) {
- return static_cast<Implementation &>(*this).addSuffixAndHashImpl(
- suffix, std::forward<Hash &&>(hash));
+ ManifestEncoder &setParamsBytestream(const ParamsBytestream &params) {
+ return static_cast<Implementation &>(*this).setParamsBytestreamImpl(params);
}
- ManifestEncoder &setIsFinalManifest(bool is_last) {
- return static_cast<Implementation &>(*this).setIsFinalManifestImpl(is_last);
+ ManifestEncoder &setParamsRTC(const ParamsRTC &params) {
+ return static_cast<Implementation &>(*this).setParamsRTCImpl(params);
}
- ManifestEncoder &setVersion(ManifestVersion version) {
- return static_cast<Implementation &>(*this).setVersionImpl(version);
- }
-
- std::size_t estimateSerializedLength(std::size_t number_of_entries) {
- return static_cast<Implementation &>(*this).estimateSerializedLengthImpl(
- number_of_entries);
+ template <typename Hash>
+ ManifestEncoder &addEntry(uint32_t suffix, Hash &&hash) {
+ return static_cast<Implementation &>(*this).addEntryImpl(
+ suffix, std::forward<Hash>(hash));
}
- ManifestEncoder &update() {
- return static_cast<Implementation &>(*this).updateImpl();
+ ManifestEncoder &removeEntry(uint32_t suffix) {
+ return static_cast<Implementation &>(*this).removeEntryImpl(suffix);
}
- ManifestEncoder &setFinalBlockNumber(std::uint32_t final_block_number) {
- return static_cast<Implementation &>(*this).setFinalBlockNumberImpl(
- final_block_number);
+ std::size_t manifestHeaderSize() const {
+ return static_cast<const Implementation &>(*this).manifestHeaderSizeImpl();
}
- static std::size_t getManifestHeaderSize() {
- return Implementation::getManifestHeaderSizeImpl();
+ std::size_t manifestPayloadSize(size_t additional_entries = 0) const {
+ return static_cast<const Implementation &>(*this).manifestPayloadSizeImpl(
+ additional_entries);
}
- static std::size_t getManifestEntrySize() {
- return Implementation::getManifestEntrySizeImpl();
+ std::size_t manifestSize(size_t additional_entries = 0) const {
+ return static_cast<const Implementation &>(*this).manifestSizeImpl(
+ additional_entries);
}
};
@@ -152,55 +145,68 @@ class ManifestDecoder {
public:
virtual ~ManifestDecoder() = default;
+ ManifestDecoder &decode() {
+ return static_cast<Implementation &>(*this).decodeImpl();
+ }
+
ManifestDecoder &clear() {
return static_cast<Implementation &>(*this).clearImpl();
}
- void decode() { static_cast<Implementation &>(*this).decodeImpl(); }
+ bool isDecoded() const {
+ return static_cast<const Implementation &>(*this).isDecodedImpl();
+ }
- ManifestType getManifestType() const {
- return static_cast<const Implementation &>(*this).getManifestTypeImpl();
+ ManifestType getType() const {
+ return static_cast<const Implementation &>(*this).getTypeImpl();
}
- auth::CryptoHashType getHashAlgorithm() const {
- return static_cast<const Implementation &>(*this).getHashAlgorithmImpl();
+ interface::ProductionProtocolAlgorithms getTransportType() const {
+ return static_cast<const Implementation &>(*this).getTransportTypeImpl();
+ }
+
+ uint8_t getMaxCapacity() const {
+ return static_cast<const Implementation &>(*this).getMaxCapacityImpl();
}
- uint32_t getFinalChunkNumber() const {
- return static_cast<const Implementation &>(*this).getFinalChunkImpl();
+ auth::CryptoHashType getHashAlgorithm() const {
+ return static_cast<const Implementation &>(*this).getHashAlgorithmImpl();
}
- NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() const {
- return static_cast<const Implementation &>(*this)
- .getNextSegmentCalculationStrategyImpl();
+ bool getIsLast() const {
+ return static_cast<const Implementation &>(*this).getIsLastImpl();
}
core::Name getBaseName() const {
return static_cast<const Implementation &>(*this).getBaseNameImpl();
}
- auto getSuffixHashList() {
- return static_cast<Implementation &>(*this).getSuffixHashListImpl();
+ ParamsBytestream getParamsBytestream() const {
+ return static_cast<const Implementation &>(*this).getParamsBytestreamImpl();
+ }
+
+ ParamsRTC getParamsRTC() const {
+ return static_cast<const Implementation &>(*this).getParamsRTCImpl();
}
- bool getIsFinalManifest() const {
- return static_cast<const Implementation &>(*this).getIsFinalManifestImpl();
+ auto getEntries() const {
+ return static_cast<const Implementation &>(*this).getEntriesImpl();
}
- ManifestVersion getVersion() const {
- return static_cast<const Implementation &>(*this).getVersionImpl();
+ std::size_t manifestHeaderSize() const {
+ return static_cast<const Implementation &>(*this).manifestHeaderSizeImpl();
}
- std::size_t estimateSerializedLength(std::size_t number_of_entries) const {
- return static_cast<const Implementation &>(*this)
- .estimateSerializedLengthImpl(number_of_entries);
+ std::size_t manifestPayloadSize(size_t additional_entries = 0) const {
+ return static_cast<const Implementation &>(*this).manifestPayloadSizeImpl(
+ additional_entries);
}
- uint32_t getFinalBlockNumber() const {
- return static_cast<const Implementation &>(*this).getFinalBlockNumberImpl();
+ std::size_t manifestSize(size_t additional_entries = 0) const {
+ return static_cast<const Implementation &>(*this).manifestSizeImpl(
+ additional_entries);
}
};
} // namespace core
-
} // namespace transport
diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc
index 11d4a56cb..bda666c0c 100644
--- a/libtransport/src/core/manifest_format_fixed.cc
+++ b/libtransport/src/core/manifest_format_fixed.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -18,211 +18,331 @@
#include <hicn/transport/utils/literals.h>
namespace transport {
-
namespace core {
-// TODO use preallocated pool of membufs
-FixedManifestEncoder::FixedManifestEncoder(Packet &packet,
- std::size_t signature_size,
- bool clear)
+// ---------------------------------------------------------
+// FixedManifest
+// ---------------------------------------------------------
+size_t FixedManifest::manifestHeaderSize(
+ interface::ProductionProtocolAlgorithms transport_type) {
+ uint32_t params_size = 0;
+
+ switch (transport_type) {
+ case interface::ProductionProtocolAlgorithms::BYTE_STREAM:
+ params_size = MANIFEST_PARAMS_BYTESTREAM_SIZE;
+ break;
+ case interface::ProductionProtocolAlgorithms::RTC_PROD:
+ params_size = MANIFEST_PARAMS_RTC_SIZE;
+ break;
+ default:
+ break;
+ }
+
+ return MANIFEST_META_SIZE + MANIFEST_ENTRY_META_SIZE + params_size;
+}
+
+size_t FixedManifest::manifestPayloadSize(size_t nb_entries) {
+ return nb_entries * MANIFEST_ENTRY_SIZE;
+}
+
+// ---------------------------------------------------------
+// FixedManifestEncoder
+// ---------------------------------------------------------
+FixedManifestEncoder::FixedManifestEncoder(Packet::Ptr packet, bool clear)
: packet_(packet),
- max_size_(Packet::default_mtu - packet_.headerSize()),
- manifest_header_(reinterpret_cast<ManifestHeader *>(
- packet_.writableData() + packet_.headerSize())),
- manifest_entries_(
- reinterpret_cast<ManifestEntry *>(manifest_header_ + 1)),
- current_entry_(0),
- signature_size_(signature_size) {
+ transport_type_(interface::ProductionProtocolAlgorithms::UNKNOWN),
+ encoded_(false) {
+ manifest_meta_ = reinterpret_cast<ManifestMeta *>(packet_->writableData() +
+ packet_->headerSize());
+ manifest_entry_meta_ =
+ reinterpret_cast<ManifestEntryMeta *>(manifest_meta_ + 1);
+
if (clear) {
- *manifest_header_ = {0};
+ *manifest_meta_ = {0};
+ *manifest_entry_meta_ = {0};
}
}
FixedManifestEncoder::~FixedManifestEncoder() {}
FixedManifestEncoder &FixedManifestEncoder::encodeImpl() {
- packet_.append(sizeof(ManifestHeader) +
- manifest_header_->number_of_entries * sizeof(ManifestEntry));
- packet_.updateLength();
+ if (encoded_) {
+ return *this;
+ }
+
+ // Copy manifest header
+ manifest_meta_->transport_type = static_cast<uint8_t>(transport_type_);
+ manifest_entry_meta_->nb_entries = manifest_entries_.size();
+
+ packet_->append(manifestHeaderSizeImpl());
+
+ packet_->updateLength();
+ auto params = reinterpret_cast<uint8_t *>(manifest_entry_meta_ + 1);
+
+ switch (transport_type_) {
+ case interface::ProductionProtocolAlgorithms::BYTE_STREAM: {
+ auto bytestream = reinterpret_cast<const uint8_t *>(&params_bytestream_);
+ std::memcpy(params, bytestream, MANIFEST_PARAMS_BYTESTREAM_SIZE);
+ break;
+ }
+ case interface::ProductionProtocolAlgorithms::RTC_PROD: {
+ auto rtc = reinterpret_cast<const uint8_t *>(&params_rtc_);
+ std::memcpy(params, rtc, MANIFEST_PARAMS_RTC_SIZE);
+ break;
+ }
+ default:
+ break;
+ }
+
+ // Copy manifest entries
+ auto payload = reinterpret_cast<const uint8_t *>(manifest_entries_.data());
+ packet_->appendPayload(payload, manifestPayloadSizeImpl());
+
+ packet_->updateLength();
+ if (TRANSPORT_EXPECT_FALSE(packet_->payloadSize() < manifestSizeImpl())) {
+ throw errors::RuntimeException("Error encoding the manifest");
+ }
+
+ encoded_ = true;
return *this;
}
FixedManifestEncoder &FixedManifestEncoder::clearImpl() {
- packet_.trimEnd(sizeof(ManifestHeader) +
- manifest_header_->number_of_entries * sizeof(ManifestEntry));
- current_entry_ = 0;
- *manifest_header_ = {0};
+ if (encoded_) {
+ packet_->trimEnd(manifestSizeImpl());
+ }
+
+ transport_type_ = interface::ProductionProtocolAlgorithms::UNKNOWN;
+ encoded_ = false;
+ *manifest_meta_ = {0};
+ *manifest_entry_meta_ = {0};
+ params_bytestream_ = {0};
+ params_rtc_ = {0};
+ manifest_entries_.clear();
+
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl(
- auth::CryptoHashType algorithm) {
- manifest_header_->hash_algorithm = static_cast<uint8_t>(algorithm);
+bool FixedManifestEncoder::isEncodedImpl() const { return encoded_; }
+
+FixedManifestEncoder &FixedManifestEncoder::setTypeImpl(
+ ManifestType manifest_type) {
+ manifest_meta_->type = static_cast<uint8_t>(manifest_type);
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::setManifestTypeImpl(
- ManifestType manifest_type) {
- manifest_header_->manifest_type = static_cast<uint8_t>(manifest_type);
+FixedManifestEncoder &FixedManifestEncoder::setMaxCapacityImpl(
+ uint8_t max_capacity) {
+ manifest_meta_->max_capacity = max_capacity;
+ return *this;
+}
+
+FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl(
+ auth::CryptoHashType algorithm) {
+ manifest_meta_->hash_algorithm = static_cast<uint8_t>(algorithm);
return *this;
}
-FixedManifestEncoder &
-FixedManifestEncoder::setNextSegmentCalculationStrategyImpl(
- NextSegmentCalculationStrategy strategy) {
- manifest_header_->next_segment_strategy = static_cast<uint8_t>(strategy);
+FixedManifestEncoder &FixedManifestEncoder::setIsLastImpl(bool is_last) {
+ manifest_meta_->is_last = static_cast<uint8_t>(is_last);
return *this;
}
FixedManifestEncoder &FixedManifestEncoder::setBaseNameImpl(
const core::Name &base_name) {
- base_name.copyToDestination(
- reinterpret_cast<uint8_t *>(&manifest_header_->prefix[0]), false);
- manifest_header_->flags.ipv6 =
+ manifest_entry_meta_->is_ipv6 =
base_name.getAddressFamily() == AF_INET6 ? 1_U8 : 0_U8;
+ base_name.copyPrefixToDestination(
+ reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix[0]));
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl(
- uint32_t suffix, const auth::CryptoHash &hash) {
- auto _hash = hash.getDigest();
- addSuffixHashBytes(suffix, _hash.data(), _hash.size());
+FixedManifestEncoder &FixedManifestEncoder::setParamsBytestreamImpl(
+ const ParamsBytestream &params) {
+ transport_type_ = interface::ProductionProtocolAlgorithms::BYTE_STREAM;
+ params_bytestream_ = TransportParamsBytestream{
+ .final_segment = params.final_segment,
+ };
return *this;
}
-void FixedManifestEncoder::addSuffixHashBytes(uint32_t suffix,
- const uint8_t *hash,
- std::size_t length) {
- manifest_entries_[current_entry_].suffix = htonl(suffix);
- // std::copy(hash, hash + length,
- // manifest_entries_[current_entry_].hash);
- std::memcpy(
- reinterpret_cast<uint8_t *>(manifest_entries_[current_entry_].hash), hash,
- length);
+FixedManifestEncoder &FixedManifestEncoder::setParamsRTCImpl(
+ const ParamsRTC &params) {
+ transport_type_ = interface::ProductionProtocolAlgorithms::RTC_PROD;
+ params_rtc_ = TransportParamsRTC{
+ .timestamp = params.timestamp,
+ .prod_rate = params.prod_rate,
+ .prod_seg = params.prod_seg,
+ .fec_type = static_cast<uint32_t>(params.fec_type),
+ };
+ return *this;
+}
- manifest_header_->number_of_entries++;
- current_entry_++;
+FixedManifestEncoder &FixedManifestEncoder::addEntryImpl(
+ uint32_t suffix, const auth::CryptoHash &hash) {
+ ManifestEntry last_entry = {
+ .suffix = portability::host_to_net(suffix),
+ .hash = {0},
+ };
- if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) {
- throw errors::RuntimeException("Manifest size exceeded the packet MTU!");
- }
-}
+ auto last_hash = reinterpret_cast<uint8_t *>(last_entry.hash);
+ std::memcpy(last_hash, hash.getDigest()->data(), hash.getSize());
-FixedManifestEncoder &FixedManifestEncoder::setIsFinalManifestImpl(
- bool is_last) {
- manifest_header_->flags.is_last = static_cast<uint8_t>(is_last);
+ manifest_entries_.push_back(last_entry);
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::setVersionImpl(
- ManifestVersion version) {
- manifest_header_->version = static_cast<uint8_t>(version);
+FixedManifestEncoder &FixedManifestEncoder::removeEntryImpl(uint32_t suffix) {
+ for (auto it = manifest_entries_.begin(); it != manifest_entries_.end();) {
+ if (it->suffix == suffix)
+ it = manifest_entries_.erase(it);
+ else
+ ++it;
+ }
return *this;
}
-std::size_t FixedManifestEncoder::estimateSerializedLengthImpl(
- std::size_t additional_entries) {
- return sizeof(ManifestHeader) +
- (manifest_header_->number_of_entries + additional_entries) *
- sizeof(ManifestEntry);
+size_t FixedManifestEncoder::manifestHeaderSizeImpl() const {
+ return FixedManifest::manifestHeaderSize(transport_type_);
}
-FixedManifestEncoder &FixedManifestEncoder::updateImpl() {
- max_size_ = Packet::default_mtu - packet_.headerSize() - signature_size_;
- return *this;
+size_t FixedManifestEncoder::manifestPayloadSizeImpl(
+ size_t additional_entries) const {
+ return FixedManifest::manifestPayloadSize(manifest_entries_.size() +
+ additional_entries);
}
-FixedManifestEncoder &FixedManifestEncoder::setFinalBlockNumberImpl(
- std::uint32_t final_block_number) {
- manifest_header_->final_block_number = htonl(final_block_number);
- return *this;
+size_t FixedManifestEncoder::manifestSizeImpl(size_t additional_entries) const {
+ return manifestHeaderSizeImpl() + manifestPayloadSizeImpl(additional_entries);
}
-std::size_t FixedManifestEncoder::getManifestHeaderSizeImpl() {
- return sizeof(ManifestHeader);
+// ---------------------------------------------------------
+// FixedManifestDecoder
+// ---------------------------------------------------------
+FixedManifestDecoder::FixedManifestDecoder(Packet::Ptr packet)
+ : packet_(packet), decoded_(false) {
+ manifest_meta_ =
+ reinterpret_cast<ManifestMeta *>(packet_->getPayload()->writableData());
+ manifest_entry_meta_ =
+ reinterpret_cast<ManifestEntryMeta *>(manifest_meta_ + 1);
}
-std::size_t FixedManifestEncoder::getManifestEntrySizeImpl() {
- return sizeof(ManifestEntry);
-}
-
-FixedManifestDecoder::FixedManifestDecoder(Packet &packet)
- : packet_(packet),
- manifest_header_(reinterpret_cast<ManifestHeader *>(
- packet_.getPayload()->writableData())),
- manifest_entries_(reinterpret_cast<ManifestEntry *>(
- packet_.getPayload()->writableData() + sizeof(ManifestHeader))) {}
-
FixedManifestDecoder::~FixedManifestDecoder() {}
-void FixedManifestDecoder::decodeImpl() {
- std::size_t packet_size = packet_.payloadSize();
+FixedManifestDecoder &FixedManifestDecoder::decodeImpl() {
+ if (decoded_) {
+ return *this;
+ }
- if (packet_size < sizeof(ManifestHeader) ||
- packet_size < estimateSerializedLengthImpl()) {
+ if (packet_->payloadSize() < manifestSizeImpl()) {
throw errors::RuntimeException(
- "The packet does not match expected manifest size.");
+ "The packet payload size does not match expected manifest size");
}
-}
-FixedManifestDecoder &FixedManifestDecoder::clearImpl() { return *this; }
+ switch (getTransportTypeImpl()) {
+ case interface::ProductionProtocolAlgorithms::BYTE_STREAM:
+ params_bytestream_ = reinterpret_cast<TransportParamsBytestream *>(
+ manifest_entry_meta_ + 1);
+ manifest_entries_ =
+ reinterpret_cast<ManifestEntry *>(params_bytestream_ + 1);
+ break;
+ case interface::ProductionProtocolAlgorithms::RTC_PROD:
+ params_rtc_ =
+ reinterpret_cast<TransportParamsRTC *>(manifest_entry_meta_ + 1);
+ manifest_entries_ = reinterpret_cast<ManifestEntry *>(params_rtc_ + 1);
+ break;
+ default:
+ manifest_entries_ =
+ reinterpret_cast<ManifestEntry *>(manifest_entry_meta_ + 1);
+ break;
+ }
-ManifestType FixedManifestDecoder::getManifestTypeImpl() const {
- return static_cast<ManifestType>(manifest_header_->manifest_type);
+ decoded_ = true;
+ return *this;
}
-auth::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const {
- return static_cast<auth::CryptoHashType>(manifest_header_->hash_algorithm);
+FixedManifestDecoder &FixedManifestDecoder::clearImpl() {
+ decoded_ = false;
+ return *this;
}
-NextSegmentCalculationStrategy
-FixedManifestDecoder::getNextSegmentCalculationStrategyImpl() const {
- return static_cast<NextSegmentCalculationStrategy>(
- manifest_header_->next_segment_strategy);
+bool FixedManifestDecoder::isDecodedImpl() const { return decoded_; }
+
+ManifestType FixedManifestDecoder::getTypeImpl() const {
+ return static_cast<ManifestType>(manifest_meta_->type);
}
-typename Fixed::SuffixList FixedManifestDecoder::getSuffixHashListImpl() {
- typename Fixed::SuffixList hash_list;
+interface::ProductionProtocolAlgorithms
+FixedManifestDecoder::getTransportTypeImpl() const {
+ return static_cast<interface::ProductionProtocolAlgorithms>(
+ manifest_meta_->transport_type);
+}
- for (int i = 0; i < manifest_header_->number_of_entries; i++) {
- hash_list.insert(hash_list.end(),
- std::make_pair(ntohl(manifest_entries_[i].suffix),
- reinterpret_cast<uint8_t *>(
- &manifest_entries_[i].hash[0])));
- }
+uint8_t FixedManifestDecoder::getMaxCapacityImpl() const {
+ return manifest_meta_->max_capacity;
+}
- return hash_list;
+auth::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const {
+ return static_cast<auth::CryptoHashType>(manifest_meta_->hash_algorithm);
+}
+
+bool FixedManifestDecoder::getIsLastImpl() const {
+ return static_cast<bool>(manifest_meta_->is_last);
}
core::Name FixedManifestDecoder::getBaseNameImpl() const {
- if (static_cast<bool>(manifest_header_->flags.ipv6)) {
- return core::Name(AF_INET6,
- reinterpret_cast<uint8_t *>(&manifest_header_->prefix));
+ if (static_cast<bool>(manifest_entry_meta_->is_ipv6)) {
+ return core::Name(
+ AF_INET6, reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix));
} else {
- return core::Name(AF_INET,
- reinterpret_cast<uint8_t *>(&manifest_header_->prefix));
+ return core::Name(
+ AF_INET, reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix));
}
}
-bool FixedManifestDecoder::getIsFinalManifestImpl() const {
- return static_cast<bool>(manifest_header_->flags.is_last);
+ParamsBytestream FixedManifestDecoder::getParamsBytestreamImpl() const {
+ return ParamsBytestream{
+ .final_segment = params_bytestream_->final_segment,
+ };
+}
+
+ParamsRTC FixedManifestDecoder::getParamsRTCImpl() const {
+ return ParamsRTC{
+ .timestamp = params_rtc_->timestamp,
+ .prod_rate = params_rtc_->prod_rate,
+ .prod_seg = params_rtc_->prod_seg,
+ .fec_type = static_cast<protocol::fec::FECType>(params_rtc_->fec_type),
+ };
}
-ManifestVersion FixedManifestDecoder::getVersionImpl() const {
- return static_cast<ManifestVersion>(manifest_header_->version);
+typename Fixed::SuffixList FixedManifestDecoder::getEntriesImpl() const {
+ typename Fixed::SuffixList hash_list;
+
+ for (int i = 0; i < manifest_entry_meta_->nb_entries; i++) {
+ hash_list.insert(
+ hash_list.end(),
+ std::make_pair(
+ portability::net_to_host(manifest_entries_[i].suffix),
+ reinterpret_cast<uint8_t *>(&manifest_entries_[i].hash[0])));
+ }
+
+ return hash_list;
}
-std::size_t FixedManifestDecoder::estimateSerializedLengthImpl(
- std::size_t additional_entries) const {
- return sizeof(ManifestHeader) +
- (additional_entries + manifest_header_->number_of_entries) *
- sizeof(ManifestEntry);
+size_t FixedManifestDecoder::manifestHeaderSizeImpl() const {
+ interface::ProductionProtocolAlgorithms type = getTransportTypeImpl();
+ return FixedManifest::manifestHeaderSize(type);
}
-uint32_t FixedManifestDecoder::getFinalBlockNumberImpl() const {
- return ntohl(manifest_header_->final_block_number);
+size_t FixedManifestDecoder::manifestPayloadSizeImpl(
+ size_t additional_entries) const {
+ size_t nb_entries = manifest_entry_meta_->nb_entries + additional_entries;
+ return FixedManifest::manifestPayloadSize(nb_entries);
}
-} // end namespace core
+size_t FixedManifestDecoder::manifestSizeImpl(size_t additional_entries) const {
+ return manifestHeaderSizeImpl() + manifestPayloadSizeImpl(additional_entries);
+}
+} // end namespace core
} // end namespace transport
diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h
index 56ad4ef6d..7ab371974 100644
--- a/libtransport/src/core/manifest_format_fixed.h
+++ b/libtransport/src/core/manifest_format_fixed.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -24,26 +24,72 @@ namespace transport {
namespace core {
-// 0 1 2 3
-// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// |Version| MType |HashAlg|NextStr| Flags |NumberOfEntries|
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// | Final Block Number |
-// +---------------------------------------------------------------|
-// | |
-// + +
-// | |
-// + Prefix +
-// | |
-// + +
-// | |
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// | Suffix |
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// | Hash Value |
-// | |
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// Manifest Metadata:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Type | TTYpe | Max Capacity | Hash Algo |L| Reserved |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Entry Metadata:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Nb entries |I| Reserved |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | |
+// + +
+// | |
+// + Prefix +
+// | |
+// + +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Transport Parameters - Bytestream:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Final Segment |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Transport Parameters - RTC:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | |
+// + Timestamp +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Production Rate |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Current Segment |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | FEC Type |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Entry:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Packet Suffix |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | |
+// + +
+// | |
+// + +
+// | |
+// + +
+// | |
+// + Packet Digest +
+// | |
+// + +
+// | |
+// + +
+// | |
+// + +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
class FixedManifestEncoder;
class FixedManifestDecoder;
@@ -58,113 +104,143 @@ struct Fixed {
using SuffixList = std::list<std::pair<uint32_t, uint8_t *>>;
};
-struct Flags {
- std::uint8_t ipv6 : 1;
- std::uint8_t is_last : 1;
- std::uint8_t unused : 6;
+const size_t MANIFEST_META_SIZE = 4;
+struct __attribute__((__packed__)) ManifestMeta {
+ std::uint8_t type : 4;
+ std::uint8_t transport_type : 4;
+ std::uint8_t max_capacity;
+ std::uint8_t hash_algorithm;
+ std::uint8_t is_last;
};
+static_assert(sizeof(ManifestMeta) == MANIFEST_META_SIZE);
-struct ManifestEntry {
+const size_t MANIFEST_ENTRY_META_SIZE = 20;
+struct __attribute__((__packed__)) ManifestEntryMeta {
+ std::uint8_t nb_entries;
+ std::uint8_t is_ipv6;
+ std::uint16_t unused;
+ std::uint32_t prefix[4];
+};
+static_assert(sizeof(ManifestEntryMeta) == MANIFEST_ENTRY_META_SIZE);
+
+const size_t MANIFEST_PARAMS_BYTESTREAM_SIZE = 4;
+struct __attribute__((__packed__)) TransportParamsBytestream {
+ std::uint32_t final_segment;
+};
+static_assert(sizeof(TransportParamsBytestream) ==
+ MANIFEST_PARAMS_BYTESTREAM_SIZE);
+
+const size_t MANIFEST_PARAMS_RTC_SIZE = 20;
+struct __attribute__((__packed__)) TransportParamsRTC {
+ std::uint64_t timestamp;
+ std::uint32_t prod_rate;
+ std::uint32_t prod_seg;
+ std::uint32_t fec_type;
+};
+static_assert(sizeof(TransportParamsRTC) == MANIFEST_PARAMS_RTC_SIZE);
+
+const size_t MANIFEST_ENTRY_SIZE = 36;
+struct __attribute__((__packed__)) ManifestEntry {
std::uint32_t suffix;
std::uint32_t hash[8];
};
+static_assert(sizeof(ManifestEntry) == MANIFEST_ENTRY_SIZE);
-struct ManifestHeader {
- std::uint8_t version : 4;
- std::uint8_t manifest_type : 4;
- std::uint8_t hash_algorithm : 4;
- std::uint8_t next_segment_strategy : 4;
- Flags flags;
- std::uint8_t number_of_entries;
- std::uint32_t final_block_number;
- std::uint32_t prefix[4];
- ManifestEntry entries[0];
+class FixedManifest {
+ public:
+ static size_t manifestHeaderSize(
+ interface::ProductionProtocolAlgorithms transport_type);
+ static size_t manifestPayloadSize(size_t nb_entries);
};
-static const constexpr std::uint8_t manifest_version = 1;
-
class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> {
public:
- FixedManifestEncoder(Packet &packet, std::size_t signature_size = 0,
- bool clear = true);
+ FixedManifestEncoder(Packet::Ptr packet, bool clear = false);
~FixedManifestEncoder();
FixedManifestEncoder &encodeImpl();
-
FixedManifestEncoder &clearImpl();
+ bool isEncodedImpl() const;
- FixedManifestEncoder &setManifestTypeImpl(ManifestType manifest_type);
-
+ // ManifestMeta
+ FixedManifestEncoder &setTypeImpl(ManifestType manifest_type);
+ FixedManifestEncoder &setMaxCapacityImpl(uint8_t max_capacity);
FixedManifestEncoder &setHashAlgorithmImpl(Fixed::HashType algorithm);
+ FixedManifestEncoder &setIsLastImpl(bool is_last);
- FixedManifestEncoder &setNextSegmentCalculationStrategyImpl(
- NextSegmentCalculationStrategy strategy);
-
+ // ManifestEntryMeta
FixedManifestEncoder &setBaseNameImpl(const core::Name &base_name);
- FixedManifestEncoder &addSuffixAndHashImpl(uint32_t suffix,
- const Fixed::Hash &hash);
-
- FixedManifestEncoder &setIsFinalManifestImpl(bool is_last);
+ // TransportParams
+ FixedManifestEncoder &setParamsBytestreamImpl(const ParamsBytestream &params);
+ FixedManifestEncoder &setParamsRTCImpl(const ParamsRTC &params);
- FixedManifestEncoder &setVersionImpl(ManifestVersion version);
+ // ManifestEntry
+ FixedManifestEncoder &addEntryImpl(uint32_t suffix, const Fixed::Hash &hash);
+ FixedManifestEncoder &removeEntryImpl(uint32_t suffix);
- std::size_t estimateSerializedLengthImpl(std::size_t additional_entries = 0);
-
- FixedManifestEncoder &updateImpl();
-
- FixedManifestEncoder &setFinalBlockNumberImpl(
- std::uint32_t final_block_number);
-
- static std::size_t getManifestHeaderSizeImpl();
-
- static std::size_t getManifestEntrySizeImpl();
+ size_t manifestHeaderSizeImpl() const;
+ size_t manifestPayloadSizeImpl(size_t additional_entries = 0) const;
+ size_t manifestSizeImpl(size_t additional_entries = 0) const;
private:
- void addSuffixHashBytes(uint32_t suffix, const uint8_t *hash,
- std::size_t length);
-
- Packet &packet_;
- std::size_t max_size_;
- ManifestHeader *manifest_header_;
- ManifestEntry *manifest_entries_;
- std::size_t current_entry_;
- std::size_t signature_size_;
+ Packet::Ptr packet_;
+ interface::ProductionProtocolAlgorithms transport_type_;
+ bool encoded_;
+
+ // Manifest Header
+ ManifestMeta *manifest_meta_;
+ ManifestEntryMeta *manifest_entry_meta_;
+ TransportParamsBytestream params_bytestream_;
+ TransportParamsRTC params_rtc_;
+
+ // Manifest Entries
+ std::vector<ManifestEntry> manifest_entries_;
};
class FixedManifestDecoder : public ManifestDecoder<FixedManifestDecoder> {
public:
- FixedManifestDecoder(Packet &packet);
+ FixedManifestDecoder(Packet::Ptr packet);
~FixedManifestDecoder();
- void decodeImpl();
-
+ FixedManifestDecoder &decodeImpl();
FixedManifestDecoder &clearImpl();
+ bool isDecodedImpl() const;
- ManifestType getManifestTypeImpl() const;
-
+ // ManifestMeta
+ ManifestType getTypeImpl() const;
+ interface::ProductionProtocolAlgorithms getTransportTypeImpl() const;
+ uint8_t getMaxCapacityImpl() const;
Fixed::HashType getHashAlgorithmImpl() const;
+ bool getIsLastImpl() const;
- NextSegmentCalculationStrategy getNextSegmentCalculationStrategyImpl() const;
-
- typename Fixed::SuffixList getSuffixHashListImpl();
-
+ // ManifestEntryMeta
core::Name getBaseNameImpl() const;
- bool getIsFinalManifestImpl() const;
+ // TransportParams
+ ParamsBytestream getParamsBytestreamImpl() const;
+ ParamsRTC getParamsRTCImpl() const;
- std::size_t estimateSerializedLengthImpl(
- std::size_t additional_entries = 0) const;
+ // ManifestEntry
+ typename Fixed::SuffixList getEntriesImpl() const;
- ManifestVersion getVersionImpl() const;
-
- uint32_t getFinalBlockNumberImpl() const;
+ size_t manifestHeaderSizeImpl() const;
+ size_t manifestPayloadSizeImpl(size_t additional_entries = 0) const;
+ size_t manifestSizeImpl(size_t additional_entries = 0) const;
private:
- Packet &packet_;
- ManifestHeader *manifest_header_;
+ Packet::Ptr packet_;
+ bool decoded_;
+
+ // Manifest Header
+ ManifestMeta *manifest_meta_;
+ ManifestEntryMeta *manifest_entry_meta_;
+ TransportParamsBytestream *params_bytestream_;
+ TransportParamsRTC *params_rtc_;
+
+ // Manifest Entries
ManifestEntry *manifest_entries_;
};
diff --git a/libtransport/src/core/manifest_inline.h b/libtransport/src/core/manifest_inline.h
deleted file mode 100644
index a487ccfe3..000000000
--- a/libtransport/src/core/manifest_inline.h
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <core/manifest.h>
-#include <core/manifest_format.h>
-#include <hicn/transport/portability/portability.h>
-
-#include <set>
-
-namespace transport {
-
-namespace core {
-
-template <typename Base, typename FormatTraits>
-class ManifestInline
- : public Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>> {
- using ManifestBase =
- Manifest<Base, FormatTraits, ManifestInline<Base, FormatTraits>>;
-
- using Hash = typename FormatTraits::Hash;
- using HashType = typename FormatTraits::HashType;
- using Suffix = typename FormatTraits::Suffix;
- using SuffixList = typename FormatTraits::SuffixList;
- using HashEntry = std::pair<auth::CryptoHashType, std::vector<uint8_t>>;
-
- public:
- ManifestInline() : ManifestBase() {}
-
- ManifestInline(const core::Name &name, std::size_t signature_size = 0)
- : ManifestBase(name, signature_size) {}
-
- template <typename T>
- ManifestInline(T &&base) : ManifestBase(std::forward<T &&>(base)) {}
-
- static TRANSPORT_ALWAYS_INLINE ManifestInline *createManifest(
- const core::Name &manifest_name, ManifestVersion version,
- ManifestType type, HashType algorithm, bool is_last,
- const Name &base_name, NextSegmentCalculationStrategy strategy,
- std::size_t signature_size) {
- auto manifest = new ManifestInline(manifest_name, signature_size);
- manifest->setVersion(version);
- manifest->setManifestType(type);
- manifest->setHashAlgorithm(algorithm);
- manifest->setFinalManifest(is_last);
- manifest->setBaseName(base_name);
- manifest->setNextSegmentCalculationStrategy(strategy);
-
- return manifest;
- }
-
- ManifestInline &encodeImpl() {
- ManifestBase::encoder_.encode();
- return *this;
- }
-
- ManifestInline &decodeImpl() {
- base_name_ = ManifestBase::decoder_.getBaseName();
- next_segment_strategy_ =
- ManifestBase::decoder_.getNextSegmentCalculationStrategy();
- suffix_hash_map_ = ManifestBase::decoder_.getSuffixHashList();
-
- return *this;
- }
-
- std::size_t estimateManifestSizeImpl(std::size_t additional_entries = 0) {
- return ManifestBase::encoder_.estimateSerializedLength(additional_entries);
- }
-
- ManifestInline &setBaseName(const Name &name) {
- base_name_ = name;
- ManifestBase::encoder_.setBaseName(base_name_);
- return *this;
- }
-
- const Name &getBaseName() { return base_name_; }
-
- ManifestInline &addSuffixHash(Suffix suffix, const Hash &hash) {
- ManifestBase::encoder_.addSuffixAndHash(suffix, hash);
- return *this;
- }
-
- // Call this function only after the decode function!
- const SuffixList &getSuffixList() { return suffix_hash_map_; }
-
- ManifestInline &setNextSegmentCalculationStrategy(
- NextSegmentCalculationStrategy strategy) {
- next_segment_strategy_ = strategy;
- ManifestBase::encoder_.setNextSegmentCalculationStrategy(
- next_segment_strategy_);
- return *this;
- }
-
- NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() {
- return next_segment_strategy_;
- }
-
- // Convert several manifests into a single map from suffixes to packet hashes.
- // All manifests must have been decoded beforehand.
- static std::unordered_map<Suffix, Hash> getSuffixMap(
- const std::vector<ManifestInline *> &manifests) {
- std::unordered_map<Suffix, Hash> suffix_map;
-
- for (auto manifest_ptr : manifests) {
- HashType hash_type = manifest_ptr->getHashAlgorithm();
- SuffixList suffix_list = manifest_ptr->getSuffixList();
-
- for (auto it = suffix_list.begin(); it != suffix_list.end(); ++it) {
- Hash hash(it->second, Hash::getSize(hash_type), hash_type);
- suffix_map[it->first] = hash;
- }
- }
-
- return suffix_map;
- }
-
- static std::unordered_map<Suffix, Hash> getSuffixMap(
- ManifestInline *manifest) {
- return getSuffixMap(std::vector<ManifestInline *>{manifest});
- }
-
- private:
- core::Name base_name_;
- NextSegmentCalculationStrategy next_segment_strategy_;
- SuffixList suffix_hash_map_;
-};
-
-} // namespace core
-} // namespace transport
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
new file mode 100644
index 000000000..a224beb11
--- /dev/null
+++ b/libtransport/src/core/memif_connector.cc
@@ -0,0 +1,503 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/errors.h>
+#include <core/memif_connector.h>
+#include <glog/logging.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <sys/epoll.h>
+
+#include <cstdlib>
+
+/* sstrncpy */
+#include <hicn/util/sstrncpy.h>
+
+#define CANCEL_TIMER 1
+
+namespace transport {
+
+namespace core {
+
+MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ event_reactor_(),
+ memif_worker_(std::bind(&MemifConnector::threadMain, this)),
+ timer_set_(false),
+ send_timer_(event_reactor_),
+ disconnect_timer_(event_reactor_),
+ io_service_(io_service),
+ work_(asio::make_work_guard(io_service_)),
+ memif_connection_({0}),
+ tx_buf_counter_(0),
+ is_reconnection_(false),
+ data_available_(false),
+ app_name_(app_name),
+ socket_filename_(""),
+ buffer_size_(kbuf_size),
+ log2_ring_size_(klog2_ring_size),
+ max_memif_bufs_(1 << klog2_ring_size) {}
+
+MemifConnector::~MemifConnector() {
+ try {
+ close();
+ } catch (errors::RuntimeException &e) {
+ // do nothing
+ }
+}
+
+void MemifConnector::connect(uint32_t memif_id, long memif_mode,
+ const std::string &socket_filename,
+ std::size_t buffer_size,
+ std::size_t log2_ring_size) {
+ state_ = State::CONNECTING;
+
+ memif_id_ = memif_id;
+ socket_filename_ = socket_filename;
+ buffer_size_ = buffer_size;
+ log2_ring_size_ = log2_ring_size;
+ max_memif_bufs_ = 1 << log2_ring_size;
+ createMemif(memif_id, memif_mode);
+}
+
+int MemifConnector::createMemif(uint32_t index, uint8_t is_master) {
+ int err = MEMIF_ERR_SUCCESS;
+
+ memif_socket_args_t socket_args;
+ memif_conn_args_t args;
+ memset(&socket_args, 0, sizeof(memif_socket_args_t));
+ memset(&args, 0, sizeof(memif_conn_args_t));
+
+ // Setup memif socket first
+
+ int rc = strcpy_s(socket_args.path, sizeof(socket_args.path) - 1,
+ socket_filename_.c_str());
+ if (rc != EOK) {
+ std::string error = "Provided socket path is larger than " +
+ std::to_string(sizeof(socket_args.path)) + " bytes.";
+ throw errors::RuntimeException(error);
+ }
+
+ rc = strcpy_s(socket_args.app_name, sizeof(socket_args.app_name) - 1,
+ app_name_.c_str());
+ if (rc != EOK) {
+ std::string error = "Provided app_name is larger than " +
+ std::to_string(sizeof(socket_args.app_name)) +
+ " bytes.";
+ throw errors::RuntimeException(error);
+ }
+
+ socket_args.on_control_fd_update = controlFdUpdate;
+ socket_args.alloc = nullptr;
+ socket_args.realloc = nullptr;
+ socket_args.free = nullptr;
+
+ err = memif_create_socket(&args.socket, &socket_args, this);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+
+ // Setup memif connection using provided memif_socket_handle_t
+ args.is_master = is_master;
+ args.log2_ring_size = log2_ring_size_;
+ args.buffer_size = buffer_size_;
+ args.num_s2m_rings = 1;
+ args.num_m2s_rings = 1;
+ strcpy_s((char *)args.interface_name, sizeof(args.interface_name), IF_NAME);
+ args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
+ args.interface_id = index;
+ err = memif_create(&memif_connection_.conn, &args, onConnect, onDisconnect,
+ onInterrupt, this);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+
+ memif_connection_.index = (uint16_t)index;
+ memif_connection_.tx_qid = 0;
+ /* alloc memif buffers */
+ memif_connection_.rx_buf_num = 0;
+ memif_connection_.rx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * max_memif_bufs_));
+ memif_connection_.tx_buf_num = 0;
+ memif_connection_.tx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * max_memif_bufs_));
+
+ return 0;
+}
+
+int MemifConnector::deleteMemif() {
+ if (memif_connection_.rx_bufs) {
+ free(memif_connection_.rx_bufs);
+ }
+
+ memif_connection_.rx_bufs = nullptr;
+ memif_connection_.rx_buf_num = 0;
+
+ if (memif_connection_.tx_bufs) {
+ free(memif_connection_.tx_bufs);
+ }
+
+ memif_connection_.tx_bufs = nullptr;
+ memif_connection_.tx_buf_num = 0;
+
+ int err;
+ /* disconenct then delete memif connection */
+ err = memif_delete(&memif_connection_.conn);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_delete: " << memif_strerror(err);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(memif_connection_.conn != nullptr)) {
+ LOG(ERROR) << "memif delete fail";
+ }
+
+ state_ = State::CLOSED;
+
+ return 0;
+}
+
+int MemifConnector::controlFdUpdate(memif_fd_event_t fde, void *private_ctx) {
+ auto self = reinterpret_cast<MemifConnector *>(private_ctx);
+ uint32_t evt = 0;
+
+ /* convert memif event definitions to epoll events */
+ auto events = fde.type;
+ auto fd = fde.fd;
+
+ if (events & MEMIF_FD_EVENT_ERROR) {
+ LOG(ERROR) << "memif fd event: Error";
+ return -1;
+ }
+
+ if (events & MEMIF_FD_EVENT_DEL) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: DEL fd " << fd;
+ return self->event_reactor_.delFileDescriptor(fd);
+ }
+
+ if (events & MEMIF_FD_EVENT_MOD) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: MOD fd " << fd;
+ return self->event_reactor_.modFileDescriptor(fd, evt);
+ }
+
+ if (events & MEMIF_FD_EVENT_READ) {
+ evt |= EPOLLIN;
+ }
+
+ if (events & MEMIF_FD_EVENT_WRITE) {
+ evt |= EPOLLOUT;
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: ADD fd " << fd;
+ return self->event_reactor_.addFileDescriptor(
+ fd, evt, [fde](const utils::Event &evt) -> int {
+ int event = 0;
+ int memif_err = 0;
+
+ if (evt.events & EPOLLIN) {
+ event |= MEMIF_FD_EVENT_READ;
+ }
+
+ if (evt.events & EPOLLOUT) {
+ event |= MEMIF_FD_EVENT_WRITE;
+ }
+
+ if (evt.events & EPOLLERR) {
+ event |= MEMIF_FD_EVENT_ERROR;
+ }
+
+ memif_err = memif_control_fd_handler(fde.private_ctx,
+ memif_fd_event_type_t(event));
+
+ if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_control_fd_handler: "
+ << memif_strerror(memif_err);
+ }
+
+ return 0;
+ });
+}
+
+uint16_t MemifConnector::bufferAlloc(long n, uint16_t qid,
+ std::error_code &ec) {
+ int err;
+ uint16_t r = 0;
+ /* set data pointer to shared memory and set buffer_len to shared mmeory
+ * buffer len */
+ err = memif_buffer_alloc(memif_connection_.conn, qid,
+ memif_connection_.tx_bufs, n, &r, buffer_size_);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ ec = make_error_code(core_error::send_buffer_allocation_failed);
+ }
+
+ memif_connection_.tx_buf_num += r;
+ return r;
+}
+
+uint16_t MemifConnector::txBurst(uint16_t qid, std::error_code &ec) {
+ int err = MEMIF_ERR_SUCCESS;
+ ec = make_error_code(core_error::success);
+ uint16_t tx = 0;
+
+ /* inform peer memif interface about data in shared memory buffers */
+ /* mark memif buffers as free */
+ err = memif_tx_burst(memif_connection_.conn, qid, memif_connection_.tx_bufs,
+ memif_connection_.tx_buf_num, &tx);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ ec = make_error_code(core_error::send_failed);
+ }
+
+ memif_connection_.tx_buf_num -= tx;
+ return tx;
+}
+
+void MemifConnector::scheduleSend(std::uint64_t delay) {
+ if (!timer_set_) {
+ timer_set_ = true;
+ send_timer_.expiresFromNow(std::chrono::microseconds(delay));
+ send_timer_.asyncWait(
+ std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
+ }
+}
+
+void MemifConnector::sendCallback(const std::error_code &ec) {
+ timer_set_ = false;
+
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) {
+ doSend();
+ }
+}
+
+/* informs user about connected status. private_ctx is used by user to identify
+ connection (multiple connections WIP) */
+int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
+ auto self = reinterpret_cast<MemifConnector *>(private_ctx);
+ self->state_ = State::CONNECTED;
+ memif_refill_queue(conn, 0, -1, 0);
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Memif " << self->app_name_ << " connected";
+
+ // We are connected. Notify higher layers.
+ self->io_service_.post([self]() {
+ self->on_reconnect_callback_(self, make_error_code(core_error::success));
+ });
+
+ self->doSend();
+
+ return 0;
+}
+
+/* informs user about disconnected status. private_ctx is used by user to
+ identify connection (multiple connections WIP) */
+int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Memif " << connector->app_name_ << " disconnected";
+ return 0;
+}
+
+void MemifConnector::threadMain() { event_reactor_.runEventLoop(200); }
+
+int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+
+ Details &c = connector->memif_connection_;
+ std::weak_ptr<MemifConnector> self = connector->shared_from_this();
+ std::vector<::utils::MemBuf::Ptr> v;
+ std::error_code ec = make_error_code(core_error::success);
+
+ int err = MEMIF_ERR_SUCCESS, ret_val;
+ uint16_t rx = 0;
+
+ do {
+ err = memif_rx_burst(conn, qid, c.rx_bufs, max_burst, &rx);
+ ret_val = err;
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS &&
+ err != MEMIF_ERR_NOBUF)) {
+ ec = make_error_code(core_error::receive_failed);
+ LOG(ERROR) << "memif_rx_burst: " << memif_strerror(err);
+ goto error;
+ }
+
+ c.rx_buf_num += rx;
+
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
+ LOG(ERROR) << "socket stopped: ignoring " << rx << " packets";
+ goto error;
+ }
+
+ std::size_t packet_length;
+ v.reserve(rx);
+ for (int i = 0; i < rx; i++) {
+ auto buffer = connector->getRawBuffer();
+ packet_length = (c.rx_bufs + i)->len;
+ std::memcpy(buffer.first, (c.rx_bufs + i)->data, packet_length);
+ auto packet = connector->getPacketFromBuffer(buffer.first, packet_length);
+ v.emplace_back(std::move(packet));
+ }
+
+ /* mark memif buffers and shared memory buffers as free */
+ /* free processed buffers */
+
+ err = memif_refill_queue(conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err);
+ }
+
+ c.rx_buf_num -= rx;
+
+ } while (ret_val == MEMIF_ERR_NOBUF);
+
+ connector->io_service_.post([self, buffers = std::move(v)]() {
+ if (auto c = self.lock()) {
+ c->receive_callback_(c.get(), buffers,
+ std::make_error_code(std::errc(0)));
+ }
+ });
+
+ return 0;
+
+error:
+ err = memif_refill_queue(c.conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err);
+ }
+ c.rx_buf_num -= rx;
+
+ connector->io_service_.post([self, ec]() {
+ if (auto c = self.lock()) {
+ c->receive_callback_(c.get(), {}, ec);
+ }
+ });
+
+ return 0;
+}
+
+void MemifConnector::close() {
+ if (state_ != State::CLOSED) {
+ disconnect_timer_.expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_.asyncWait([this](const std::error_code &ec) {
+ deleteMemif();
+ event_reactor_.stop();
+ });
+ }
+
+ if (memif_worker_.joinable()) {
+ memif_worker_.join();
+ }
+}
+
+void MemifConnector::send(Packet &packet) { send(packet.shared_from_this()); }
+
+void MemifConnector::send(const utils::MemBuf::Ptr &buffer) {
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ output_buffer_.push_back(buffer);
+ }
+#if CANCEL_TIMER
+ scheduleSend(50);
+#endif
+}
+
+int MemifConnector::doSend() {
+ std::size_t max = 0;
+ std::size_t size = 0;
+ std::error_code ec = make_error_code(core_error::success);
+ int ret = 0;
+ uint64_t delay = 50; // microseconds
+
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+
+ // Check if there are pending buffers to send
+ if (memif_connection_.tx_buf_num > 0) {
+ ret = txBurst(memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ delay = 200;
+ goto done;
+ }
+ }
+
+ // Continue trying to send buffers in output_buffer_
+ size = output_buffer_.size();
+ max = size < max_burst ? size : max_burst;
+
+ ret = bufferAlloc(max, memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool() && ret == 0)) {
+ delay = 200;
+ goto done;
+ }
+
+ // Fill allocated buffers and remove them from output_buffer_
+ for (uint16_t i = 0; i < ret; i++) {
+ auto packet = output_buffer_.front().get();
+ const utils::MemBuf *current = packet;
+ std::size_t offset = 0;
+ uint8_t *shared_buffer =
+ reinterpret_cast<uint8_t *>(memif_connection_.tx_bufs[i].data);
+ do {
+ std::memcpy(shared_buffer + offset, current->data(), current->length());
+ offset += current->length();
+ current = current->next();
+ } while (current != packet);
+
+ memif_connection_.tx_bufs[i].len = uint32_t(offset);
+ output_buffer_.pop_front();
+ }
+
+ // Try to send them
+ ret = txBurst(memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ LOG(ERROR) << "Tx burst failed " << ec.message();
+ delay = 200;
+ goto done;
+ }
+
+done:
+ memif_refill_queue(memif_connection_.conn, memif_connection_.tx_qid, ret, 0);
+
+ // If there are still packets to send, schedule another send
+ if (memif_connection_.tx_buf_num > 0 || !output_buffer_.empty()) {
+ scheduleSend(delay);
+ }
+
+ // If error, signal to upper layers
+ if (ec.operator bool()) {
+ std::weak_ptr<MemifConnector> self = shared_from_this();
+ io_service_.post([self, ec]() {
+ if (auto c = self.lock()) {
+ c->sent_callback_(c.get(), ec);
+ }
+ });
+ }
+
+ return 0;
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h
new file mode 100644
index 000000000..d36be4616
--- /dev/null
+++ b/libtransport/src/core/memif_connector.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/ring_buffer.h>
+//#include <hicn/transport/core/hicn_vapi.h>
+#include <hicn/transport/core/asio_wrapper.h>
+#include <utils/epoll_event_reactor.h>
+#include <utils/fd_deadline_timer.h>
+
+#include <deque>
+#include <future>
+#include <mutex>
+#include <thread>
+
+extern "C" {
+#include <libmemif.h>
+};
+
+#define _Static_assert static_assert
+
+namespace transport {
+
+namespace core {
+
+#define APP_NAME "libtransport"
+#define IF_NAME "vpp_connection"
+
+class MemifConnector : public Connector {
+ static inline std::size_t kbuf_size = 2048;
+ static inline std::size_t klog2_ring_size = 13;
+
+ using PacketRing = utils::CircularFifo<utils::MemBuf::Ptr, queue_size>;
+ struct Details {
+ // index
+ uint16_t index;
+ // memif conenction handle
+ memif_conn_handle_t conn;
+ // transmit queue id
+ uint16_t tx_qid;
+ // tx buffers
+ memif_buffer_t *tx_bufs;
+ // allocated tx buffers counter
+ // number of tx buffers pointing to shared memory
+ uint16_t tx_buf_num;
+ // rx buffers
+ memif_buffer_t *rx_bufs;
+ // allocated rx buffers counter
+ // number of rx buffers pointing to shared memory
+ uint16_t rx_buf_num;
+ // interface ip address
+ uint8_t ip_addr[4];
+ };
+
+ public:
+ MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~MemifConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const utils::MemBuf::Ptr &buffer) override;
+
+ void close() override;
+
+ void connect(uint32_t memif_id, long memif_mode,
+ const std::string &socket_filename,
+ std::size_t buffer_size = kbuf_size,
+ std::size_t log2_ring_size = klog2_ring_size);
+
+ TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
+
+ private:
+ void init();
+
+ int doSend();
+
+ int createMemif(uint32_t index, uint8_t is_master);
+
+ uint32_t getMemifConfiguration();
+
+ int deleteMemif();
+
+ static int controlFdUpdate(memif_fd_event_t fde, void *private_ctx);
+
+ static int onConnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onDisconnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid);
+
+ void threadMain();
+
+ uint16_t txBurst(uint16_t qid, std::error_code &ec);
+
+ uint16_t bufferAlloc(long n, uint16_t qid, std::error_code &ec);
+
+ void scheduleSend(std::uint64_t delay);
+
+ void sendCallback(const std::error_code &ec);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ int epfd;
+ utils::EpollEventReactor event_reactor_;
+ std::thread memif_worker_;
+ std::atomic_bool timer_set_;
+ utils::FdDeadlineTimer send_timer_;
+ utils::FdDeadlineTimer disconnect_timer_;
+ asio::io_service &io_service_;
+ asio::executor_work_guard<asio::io_context::executor_type> work_;
+ Details memif_connection_;
+ uint16_t tx_buf_counter_;
+
+ PacketRing input_buffer_;
+ bool is_reconnection_;
+ bool data_available_;
+ uint32_t memif_id_;
+ uint8_t memif_mode_;
+ std::string app_name_;
+ uint16_t transmission_index_;
+ utils::SpinLock write_msgs_lock_;
+ std::string socket_filename_;
+ std::size_t buffer_size_;
+ std::size_t log2_ring_size_;
+ std::size_t max_memif_bufs_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc
index 795c8a697..4f8ba7873 100644
--- a/libtransport/src/core/name.cc
+++ b/libtransport/src/core/name.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -14,6 +14,7 @@
*/
#include <core/manifest_format.h>
+#include <hicn/name.h>
#include <hicn/transport/core/name.h>
#include <hicn/transport/errors/errors.h>
#include <hicn/transport/errors/tokenizer_exception.h>
@@ -24,32 +25,31 @@ namespace transport {
namespace core {
-Name::Name() { name_ = {}; }
+Name::Name() { std::memset(&name_, 0, sizeof(name_)); }
+/**
+ * XXX This function does not use the name API provided by libhicn
+ */
Name::Name(int family, const uint8_t *ip_address, std::uint32_t suffix)
: name_({}) {
- name_.type = HNT_UNSPEC;
std::size_t length;
uint8_t *dst = NULL;
if (family == AF_INET) {
- dst = name_.ip4.prefix_as_u8;
+ dst = name_.prefix.v4.as_u8;
length = IPV4_ADDR_LEN;
- name_.type = HNT_CONTIGUOUS_V4;
} else if (family == AF_INET6) {
- dst = name_.ip6.prefix_as_u8;
+ dst = name_.prefix.v6.as_u8;
length = IPV6_ADDR_LEN;
- name_.type = HNT_CONTIGUOUS_V6;
} else {
throw errors::RuntimeException("Specified name family does not exist.");
}
std::memcpy(dst, ip_address, length);
- *reinterpret_cast<std::uint32_t *>(dst + length) = suffix;
+ name_.suffix = suffix;
}
Name::Name(const char *name, uint32_t segment) {
- name_.type = HNT_UNSPEC;
if (hicn_name_create(name, segment, &name_) < 0) {
throw errors::InvalidIpAddressException();
}
@@ -59,7 +59,6 @@ Name::Name(const std::string &uri, uint32_t segment)
: Name(uri.c_str(), segment) {}
Name::Name(const std::string &uri) {
- name_.type = HNT_UNSPEC;
utils::StringTokenizer tokenizer(uri, "|");
std::string ip_address;
std::string seq_number;
@@ -80,9 +79,13 @@ Name::Name(const std::string &uri) {
Name::Name(const Name &name) { this->name_ = name.name_; }
+Name::~Name() {}
+
Name &Name::operator=(const Name &name) {
- if (hicn_name_copy(&this->name_, &name.name_) < 0) {
- throw errors::MalformedNameException();
+ if (this != &name) {
+ if (hicn_name_copy(&this->name_, &name.name_) < 0) {
+ throw errors::MalformedNameException();
+ }
}
return *this;
@@ -122,16 +125,20 @@ std::string Name::toString() const {
}
uint32_t Name::getHash32(bool consider_suffix) const {
- uint32_t hash;
- if (hicn_name_hash(&name_, &hash, consider_suffix) < 0) {
+ uint32_t hash = _hicn_name_get_hash(&name_, consider_suffix);
+ if (hash < 0) {
throw errors::RuntimeException("Error computing the hash of the name!");
}
return hash;
}
-void Name::clear() { name_.type = HNT_UNSPEC; };
+void Name::clear() { std::memset(&name_, 0, sizeof(name_)); };
-Name::Type Name::getType() const { return name_.type; }
+Name::Type Name::getType() const {
+ int family;
+ hicn_name_get_family(&name_, &family);
+ return family == AF_INET ? Name::Type::V4 : Name::Type::V6;
+}
uint32_t Name::getSuffix() const {
uint32_t ret = 0;
@@ -142,49 +149,32 @@ uint32_t Name::getSuffix() const {
return ret;
}
-Name &Name::setSuffix(uint32_t seq_number) {
- if (hicn_name_set_seq_number(&name_, seq_number) < 0) {
- throw errors::RuntimeException(
- "Impossible to set the sequence number to the name.");
+Name &Name::setSuffix(hicn_name_suffix_t suffix) {
+ if (hicn_name_set_suffix(&name_, suffix) < 0) {
+ throw errors::RuntimeException("Impossible to set name suffix.");
}
return *this;
}
-std::shared_ptr<Sockaddr> Name::getAddress() const {
- Sockaddr *ret = nullptr;
-
- switch (name_.type) {
- case HNT_CONTIGUOUS_V4:
- case HNT_IOV_V4:
- ret = (Sockaddr *)new Sockaddr4;
- break;
- case HNT_CONTIGUOUS_V6:
- case HNT_IOV_V6:
- ret = (Sockaddr *)new Sockaddr6;
- break;
- default:
- throw errors::MalformedNameException();
- }
-
- if (hicn_name_to_sockaddr_address((hicn_name_t *)&name_, ret) < 0) {
- throw errors::MalformedNameException();
- }
-
- return std::shared_ptr<Sockaddr>(ret);
-}
-
-ip_prefix_t Name::toIpAddress() const {
- ip_prefix_t ret;
+hicn_ip_prefix_t Name::toIpAddress() const {
+ hicn_ip_prefix_t ret;
std::memset(&ret, 0, sizeof(ret));
- if (hicn_name_to_ip_prefix(&name_, &ret) < 0) {
+ if (hicn_name_to_hicn_ip_prefix(&name_, &ret) < 0) {
throw errors::InvalidIpAddressException();
}
return ret;
}
+std::string Name::getPrefix() const {
+ char prefix[MAXSZ_HICN_NAME];
+ hicn_name_no_suffix_snprintf(prefix, MAXSZ_HICN_NAME, &name_);
+
+ return std::string(prefix);
+}
+
int Name::getAddressFamily() const {
int ret = 0;
@@ -195,8 +185,8 @@ int Name::getAddressFamily() const {
return ret;
}
-void Name::copyToDestination(uint8_t *destination, bool include_suffix) const {
- if (hicn_name_copy_to_destination(destination, &name_, include_suffix) < 0) {
+void Name::copyPrefixToDestination(uint8_t *destination) const {
+ if (hicn_name_copy_prefix_to_destination(destination, &name_) < 0) {
throw errors::RuntimeException(
"Impossibe to copy the name into the "
"provided destination");
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index 51337201f..bcd0b8498 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#include <glog/logging.h>
#include <hicn/transport/auth/crypto_hash.h>
+#include <hicn/transport/core/global_object_pool.h>
#include <hicn/transport/core/packet.h>
#include <hicn/transport/errors/malformed_packet_exception.h>
#include <hicn/transport/utils/hash.h>
@@ -23,6 +24,7 @@ extern "C" {
#ifndef _WIN32
TRANSPORT_CLANG_DISABLE_WARNING("-Wextern-c-compat")
#endif
+#include <hicn/base.h>
#include <hicn/error.h>
}
@@ -32,64 +34,70 @@ namespace core {
const core::Name Packet::base_name("0::0|0");
-Packet::Packet(Format format, std::size_t additional_header_size)
+Packet::Packet(Type type, Format format, std::size_t additional_header_size)
: utils::MemBuf(utils::MemBuf(CREATE, 2048)),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(0),
- format_(format),
payload_type_(PayloadType::UNSPECIFIED) {
- setFormat(format_, additional_header_size);
+ /*
+ * We define the format and the storage area of the packet buffer we
+ * manipulate
+ */
+ setFormat(format);
+ setBuffer();
+ initializeType(type); // type requires packet format
+ initialize(additional_header_size);
}
-Packet::Packet(MemBuf &&buffer)
- : utils::MemBuf(std::move(buffer)),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(0),
- format_(getFormatFromBuffer(data(), length())),
- payload_type_(PayloadType::UNSPECIFIED) {}
-
Packet::Packet(CopyBufferOp, const uint8_t *buffer, std::size_t size)
: utils::MemBuf(COPY_BUFFER, buffer, size),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(0),
- format_(getFormatFromBuffer(data(), length())),
- payload_type_(PayloadType::UNSPECIFIED) {}
+ payload_type_(PayloadType::UNSPECIFIED) {
+ setBuffer();
+ analyze();
+}
Packet::Packet(WrapBufferOp, uint8_t *buffer, std::size_t length,
std::size_t size)
: utils::MemBuf(WRAP_BUFFER, buffer, length, size),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(0),
- format_(getFormatFromBuffer(this->data(), this->length())),
- payload_type_(PayloadType::UNSPECIFIED) {}
+ payload_type_(PayloadType::UNSPECIFIED) {
+ setBuffer();
+ analyze();
+}
-Packet::Packet(CreateOp, uint8_t *buffer, std::size_t length, std::size_t size,
- Format format, std::size_t additional_header_size)
+Packet::Packet(CreateOp, Type type, uint8_t *buffer, std::size_t length,
+ std::size_t size, Format format,
+ std::size_t additional_header_size)
: utils::MemBuf(WRAP_BUFFER, buffer, length, size),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(0),
- format_(format),
payload_type_(PayloadType::UNSPECIFIED) {
clear();
- setFormat(format_, additional_header_size);
+ setType(type);
+ setFormat(format);
+ setBuffer();
+ initialize(additional_header_size);
}
-Packet::Packet(const Packet &other)
- : utils::MemBuf(other),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(other.header_offset_),
- format_(other.format_),
- payload_type_(PayloadType::UNSPECIFIED) {}
+Packet::Packet(MemBuf &&buffer)
+ : utils::MemBuf(std::move(buffer)),
+ payload_type_(PayloadType::UNSPECIFIED) {
+ setBuffer();
+ analyze();
+}
+
+/*
+ * In the two following constructors, we inherit the pkbuf and only need to
+ * recompute the pointer fields, aka the buffer.
+ */
Packet::Packet(Packet &&other)
: utils::MemBuf(std::move(other)),
- packet_start_(other.packet_start_),
- header_offset_(other.header_offset_),
- format_(other.format_),
+ pkbuf_(other.pkbuf_),
+ payload_type_(PayloadType::UNSPECIFIED) {
+ hicn_packet_reset(&other.pkbuf_);
+}
+
+Packet::Packet(const Packet &other)
+ : utils::MemBuf(other),
+ pkbuf_(other.pkbuf_),
payload_type_(PayloadType::UNSPECIFIED) {
- other.packet_start_ = nullptr;
- other.format_ = HF_UNSPEC;
- other.header_offset_ = 0;
+ setBuffer();
}
Packet::~Packet() {}
@@ -97,86 +105,82 @@ Packet::~Packet() {}
Packet &Packet::operator=(const Packet &other) {
if (this != &other) {
*this = other;
- packet_start_ = reinterpret_cast<hicn_header_t *>(writableData());
+ setBuffer();
}
return *this;
}
-std::size_t Packet::getHeaderSizeFromBuffer(Format format,
- const uint8_t *buffer) {
- size_t header_length;
-
- if (hicn_packet_get_header_length(format, (hicn_header_t *)buffer,
- &header_length) < 0) {
- throw errors::MalformedPacketException();
- }
-
- return header_length;
+std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() {
+ return std::static_pointer_cast<utils::MemBuf>(shared_from_this());
}
-bool Packet::isInterest(const uint8_t *buffer) {
- bool is_interest = false;
-
- if (TRANSPORT_EXPECT_FALSE(hicn_packet_test_ece(HF_INET6_TCP,
- (const hicn_header_t *)buffer,
- &is_interest) < 0)) {
- throw errors::RuntimeException(
- "Impossible to retrieve ece flag from packet");
- }
+Packet::Format Packet::getFormat() const {
+ return hicn_packet_get_format(&pkbuf_);
+}
- return !is_interest;
+void Packet::setFormat(Packet::Format format) {
+ hicn_packet_set_format(&pkbuf_, format);
}
-void Packet::setFormat(Packet::Format format,
- std::size_t additional_header_size) {
- format_ = format;
- if (hicn_packet_init_header(format_, packet_start_) < 0) {
+void Packet::initialize(std::size_t additional_header_size) {
+ if (hicn_packet_init_header(&pkbuf_, additional_header_size) < 0) {
throw errors::RuntimeException("Unexpected error initializing the packet.");
}
- auto header_size = getHeaderSizeFromFormat(format_);
- assert(header_size <= tailroom());
+ auto header_size = getHeaderSizeFromFormat(getFormat());
+ DCHECK(header_size <= tailroom());
append(header_size);
-
- assert(additional_header_size <= tailroom());
+ DCHECK(additional_header_size <= tailroom());
append(additional_header_size);
+}
- header_offset_ = length();
+void Packet::analyze() {
+ if (hicn_packet_analyze(&pkbuf_) < 0)
+ throw errors::MalformedPacketException();
}
-bool Packet::isInterest() { return Packet::isInterest(data()); }
+Packet::Type Packet::getType() const { return hicn_packet_get_type(&pkbuf_); }
-std::size_t Packet::getPayloadSizeFromBuffer(Format format,
- const uint8_t *buffer) {
- std::size_t payload_length;
- if (TRANSPORT_EXPECT_FALSE(
- hicn_packet_get_payload_length(format, (hicn_header_t *)buffer,
- &payload_length) < 0)) {
- throw errors::MalformedPacketException();
- }
+void Packet::initializeType(Packet::Type type) {
+ hicn_packet_initialize_type(&pkbuf_, type);
+}
- return payload_length;
+void Packet::setType(Packet::Type type) { hicn_packet_set_type(&pkbuf_, type); }
+
+void Packet::setBuffer() {
+ hicn_packet_set_buffer(&pkbuf_, writableData(),
+ this->capacity() - this->headroom(), this->length());
}
-std::size_t Packet::payloadSize() const {
- std::size_t ret = 0;
+PayloadType Packet::getPayloadType() const {
+ if (payload_type_ == PayloadType::UNSPECIFIED) {
+ hicn_payload_type_t ret;
- if (length()) {
- ret = getPayloadSizeFromBuffer(format_,
- reinterpret_cast<uint8_t *>(packet_start_));
+ if (hicn_packet_get_payload_type(&pkbuf_, &ret) < 0) {
+ throw errors::RuntimeException("Impossible to retrieve payload type.");
+ }
+
+ payload_type_ = (PayloadType)ret;
}
- return ret;
+ return payload_type_;
}
-std::size_t Packet::headerSize() const {
- if (header_offset_ == 0 && length()) {
- const_cast<Packet *>(this)->header_offset_ = getHeaderSizeFromBuffer(
- format_, reinterpret_cast<uint8_t *>(packet_start_));
+Packet &Packet::setPayloadType(PayloadType payload_type) {
+ if (hicn_packet_set_payload_type(&pkbuf_, hicn_payload_type_t(payload_type)) <
+ 0) {
+ throw errors::RuntimeException("Error setting payload type of the packet.");
}
- return header_offset_;
+ payload_type_ = payload_type;
+ return *this;
+}
+
+std::unique_ptr<utils::MemBuf> Packet::getPayload() const {
+ auto ret = clone();
+ ret->trimStart(headerSize());
+ return ret;
}
Packet &Packet::appendPayload(std::unique_ptr<utils::MemBuf> &&payload) {
@@ -196,70 +200,71 @@ Packet &Packet::appendPayload(const uint8_t *buffer, std::size_t length) {
return *this;
}
-std::unique_ptr<utils::MemBuf> Packet::getPayload() const {
- auto ret = clone();
- ret->trimStart(headerSize());
- return ret;
+std::size_t Packet::headerSize() const {
+ std::size_t len;
+ hicn_packet_get_header_len(&pkbuf_, &len);
+ return len;
}
-Packet &Packet::updateLength(std::size_t length) {
- std::size_t total_length = length;
+std::size_t Packet::payloadSize() const {
+ std::size_t len;
+ hicn_packet_get_payload_len(&pkbuf_, &len);
+ return len;
+}
- const utils::MemBuf *current = this;
- do {
- total_length += current->length();
- current = current->next();
- } while (current != this);
+auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const {
+ auth::CryptoHash hash;
+ hash.setType(algorithm);
- total_length -= headerSize();
+ // Copy IP+TCP/ICMP header before zeroing them
+ u8 header_copy[HICN_HDRLEN_MAX];
+ size_t header_len;
+ hicn_packet_save_header(&pkbuf_, header_copy, &header_len,
+ /* copy_ah */ false);
+ const_cast<Packet *>(this)->resetForHash();
- if (hicn_packet_set_payload_length(format_, packet_start_, total_length) <
- 0) {
- throw errors::RuntimeException("Error setting the packet payload.");
- }
+ hash.computeDigest(this);
+ hicn_packet_load_header(&pkbuf_, header_copy, header_len);
- return *this;
+ return hash;
}
-PayloadType Packet::getPayloadType() const {
- if (payload_type_ == PayloadType::UNSPECIFIED) {
- hicn_payload_type_t ret;
- if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) {
- throw errors::RuntimeException("Impossible to retrieve payload type.");
- }
+void Packet::reset() {
+ clear();
+ hicn_packet_reset(&pkbuf_);
+ setBuffer();
+ payload_type_ = PayloadType::UNSPECIFIED;
+ name_.clear();
- payload_type_ = (PayloadType)ret;
+ if (isChained()) {
+ separateChain(next(), prev());
}
-
- return payload_type_;
}
-Packet &Packet::setPayloadType(PayloadType payload_type) {
- if (hicn_packet_set_payload_type(packet_start_,
- hicn_payload_type_t(payload_type)) < 0) {
- throw errors::RuntimeException("Error setting payload type of the packet.");
- }
+bool Packet::isInterest() {
+ return hicn_packet_get_type(&pkbuf_) == HICN_PACKET_TYPE_INTEREST;
+}
- payload_type_ = payload_type;
+Packet &Packet::updateLength(std::size_t length) {
+ std::size_t total_length = length;
- return *this;
-}
+ const utils::MemBuf *current = this;
+ do {
+ total_length += current->length();
+ current = current->next();
+ } while (current != this);
-Packet::Format Packet::getFormat() const {
- /**
- * We check packet start because after a movement it will result in a nullptr
- */
- if (format_ == HF_UNSPEC && length()) {
- if (hicn_packet_get_format(packet_start_, &format_) < 0) {
- LOG(ERROR) << "Unexpected packet format HF_UNSPEC.";
- }
+ if (hicn_packet_set_len(&pkbuf_, total_length) < 0) {
+ throw errors::RuntimeException("Error setting the packet length.");
}
- return format_;
-}
+ total_length -= headerSize();
-std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() {
- return std::static_pointer_cast<utils::MemBuf>(shared_from_this());
+ if (hicn_packet_set_payload_length(&pkbuf_, total_length) < 0) {
+ throw errors::RuntimeException("Error setting the packet payload length.");
+ }
+
+ return *this;
}
void Packet::dump() const {
@@ -274,364 +279,282 @@ void Packet::dump() const {
} while (current != this);
}
-void Packet::dump(uint8_t *buffer, std::size_t length) {
- hicn_packet_dump(buffer, length);
-}
+void Packet::setChecksum() {
+ size_t header_len = 0;
+ if (hicn_packet_get_header_len(&pkbuf_, &header_len) < 0)
+ throw errors::RuntimeException(
+ "Error setting getting packet header length.");
-void Packet::setSignatureSize(std::size_t size_bytes) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
+ uint16_t partial_csum = csum(data() + header_len, length() - header_len, 0);
- int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes);
+ for (utils::MemBuf *current = next(); current != this;
+ current = current->next()) {
+ partial_csum = csum(current->data(), current->length(), ~partial_csum);
+ }
- if (ret < 0) {
- throw errors::RuntimeException("Error setting signature size.");
+ if (hicn_packet_compute_header_checksum(&pkbuf_, partial_csum) < 0) {
+ throw errors::MalformedPacketException();
}
}
-void Packet::setSignatureSizeGap(std::size_t size_bytes) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
+bool Packet::checkIntegrity() const {
+ size_t header_len = 0;
+ if (hicn_packet_get_header_len(&pkbuf_, &header_len) < 0)
+ throw errors::RuntimeException(
+ "Error setting getting packet header length.");
- int ret = hicn_packet_set_signature_gap(format_, packet_start_,
- (uint8_t)size_bytes);
+ uint16_t partial_csum = csum(data() + header_len, length() - header_len, 0);
- if (ret < 0) {
- throw errors::RuntimeException("Error setting signature size.");
+ for (const utils::MemBuf *current = next(); current != this;
+ current = current->next()) {
+ partial_csum = csum(current->data(), current->length(), ~partial_csum);
+ }
+
+ if (hicn_packet_check_integrity_no_payload(&pkbuf_, partial_csum) < 0) {
+ return false;
}
+
+ return true;
}
-uint8_t *Packet::getSignature() const {
- if (!authenticationHeader()) {
+bool Packet::hasAH() const { return HICN_PACKET_FORMAT_IS_AH(getFormat()); }
+
+utils::MemBuf::Ptr Packet::getSignature() const {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
uint8_t *signature;
- int ret = hicn_packet_get_signature(format_, packet_start_, &signature);
+ int ret = hicn_packet_get_signature(&pkbuf_, &signature);
if (ret < 0) {
throw errors::RuntimeException("Error getting signature.");
}
- return signature;
+ utils::MemBuf::Ptr membuf = PacketManager<>::getInstance().getMemBuf();
+ membuf->append(getSignatureFieldSize());
+ memcpy(membuf->writableData(), signature, getSignatureFieldSize());
+
+ return membuf;
}
-void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
- if (!authenticationHeader()) {
+std::size_t Packet::getSignatureFieldSize() const {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
- int ret =
- hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
-
+ size_t field_size;
+ int ret = hicn_packet_get_signature_size(&pkbuf_, &field_size);
if (ret < 0) {
- throw errors::RuntimeException("Error setting the signature timestamp.");
+ throw errors::RuntimeException("Error reading signature field size");
}
+ return field_size;
}
-uint64_t Packet::getSignatureTimestamp() const {
- if (!authenticationHeader()) {
+std::size_t Packet::getSignatureSize() const {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
- uint64_t return_value;
- int ret = hicn_packet_get_signature_timestamp(format_, packet_start_,
- &return_value);
-
+ size_t padding;
+ int ret = hicn_packet_get_signature_padding(&pkbuf_, &padding);
if (ret < 0) {
- throw errors::RuntimeException("Error getting the signature timestamp.");
+ throw errors::RuntimeException("Error reading signature padding");
+ }
+
+ size_t size = getSignatureFieldSize() - padding;
+ if (size < 0) {
+ throw errors::RuntimeException("Error reading signature size");
}
- return return_value;
+ return size;
}
-void Packet::setValidationAlgorithm(
- const auth::CryptoSuite &validation_algorithm) {
- if (!authenticationHeader()) {
+uint64_t Packet::getSignatureTimestamp() const {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
- int ret = hicn_packet_set_validation_algorithm(format_, packet_start_,
- uint8_t(validation_algorithm));
-
+ uint64_t timestamp;
+ int ret = hicn_packet_get_signature_timestamp(&pkbuf_, &timestamp);
if (ret < 0) {
- throw errors::RuntimeException("Error setting the validation algorithm.");
+ throw errors::RuntimeException("Error getting the signature timestamp.");
}
+ return timestamp;
}
-auth::CryptoSuite Packet::getValidationAlgorithm() const {
- if (!authenticationHeader()) {
+auth::KeyId Packet::getKeyId() const {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
- uint8_t return_value;
- int ret = hicn_packet_get_validation_algorithm(format_, packet_start_,
- &return_value);
-
+ auth::KeyId key_id;
+ int ret = hicn_packet_get_key_id(&pkbuf_, &key_id.first, &key_id.second);
if (ret < 0) {
throw errors::RuntimeException("Error getting the validation algorithm.");
}
-
- return auth::CryptoSuite(return_value);
+ return key_id;
}
-void Packet::setKeyId(const auth::KeyId &key_id) {
- if (!authenticationHeader()) {
+auth::CryptoSuite Packet::getValidationAlgorithm() const {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
- int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first);
-
+ uint8_t return_value;
+ int ret = hicn_packet_get_validation_algorithm(&pkbuf_, &return_value);
if (ret < 0) {
- throw errors::RuntimeException("Error setting the key id.");
+ throw errors::RuntimeException("Error getting the validation algorithm.");
}
+ return auth::CryptoSuite(return_value);
}
-auth::KeyId Packet::getKeyId() const {
- if (!authenticationHeader()) {
+void Packet::setSignature(const utils::MemBuf::Ptr &signature) {
+ if (!hasAH()) {
throw errors::RuntimeException("Packet without Authentication Header.");
}
- auth::KeyId return_value;
- int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first,
- &return_value.second);
-
+ uint8_t *signature_field;
+ int ret = hicn_packet_get_signature(&pkbuf_, &signature_field);
if (ret < 0) {
- throw errors::RuntimeException("Error getting the validation algorithm.");
+ throw errors::RuntimeException("Error getting signature.");
}
-
- return return_value;
-}
-
-auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const {
- auth::CryptoHash hash;
- hash.setType(algorithm);
-
- // Copy IP+TCP/ICMP header before zeroing them
- hicn_header_t header_copy;
-
- hicn_packet_copy_header(format_, packet_start_, &header_copy, false);
-
- const_cast<Packet *>(this)->resetForHash();
-
- hash.computeDigest(this);
- hicn_packet_copy_header(format_, &header_copy, packet_start_, false);
-
- return hash;
+ memcpy(signature_field, signature->data(), signature->length());
}
-bool Packet::checkIntegrity() const {
- uint16_t partial_csum =
- csum(data() + HICN_V6_TCP_HDRLEN, length() - HICN_V6_TCP_HDRLEN, 0);
-
- for (const utils::MemBuf *current = next(); current != this;
- current = current->next()) {
- partial_csum = csum(current->data(), current->length(), ~partial_csum);
- }
-
- if (hicn_packet_check_integrity_no_payload(format_, packet_start_,
- partial_csum) < 0) {
- return false;
+void Packet::setSignatureFieldSize(std::size_t size) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
}
- return true;
-}
-
-void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) {
- auto last = prev();
- auto to_copy = std::min(*size, last->tailroom());
- std::memcpy(last->writableTail(), *buffer, to_copy);
- last->append(to_copy);
- *size -= to_copy;
- *buffer += to_copy;
-}
-
-Packet &Packet::setSyn() {
- if (hicn_packet_set_syn(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error setting syn bit in the packet.");
+ int ret = hicn_packet_set_signature_size(&pkbuf_, size);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting signature size.");
}
-
- return *this;
}
-Packet &Packet::resetSyn() {
- if (hicn_packet_reset_syn(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error resetting syn bit in the packet.");
+void Packet::setSignatureSize(std::size_t size) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
}
- return *this;
-}
-
-bool Packet::testSyn() const {
- bool res = false;
- if (hicn_packet_test_syn(format_, packet_start_, &res) < 0) {
- throw errors::RuntimeException("Error testing syn bit in the packet.");
+ size_t padding = getSignatureFieldSize() - size;
+ if (padding < 0) {
+ throw errors::RuntimeException("Error setting signature padding.");
}
- return res;
-}
-
-Packet &Packet::setAck() {
- if (hicn_packet_set_ack(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error setting ack bit in the packet.");
+ int ret = hicn_packet_set_signature_padding(&pkbuf_, padding);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting signature padding.");
}
-
- return *this;
}
-Packet &Packet::resetAck() {
- if (hicn_packet_reset_ack(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error resetting ack bit in the packet.");
+void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
}
- return *this;
-}
-
-bool Packet::testAck() const {
- bool res = false;
- if (hicn_packet_test_ack(format_, packet_start_, &res) < 0) {
- throw errors::RuntimeException("Error testing ack bit in the packet.");
+ int ret = hicn_packet_set_signature_timestamp(&pkbuf_, timestamp);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting the signature timestamp.");
}
-
- return res;
}
-Packet &Packet::setRst() {
- if (hicn_packet_set_rst(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error setting rst bit in the packet.");
+void Packet::setKeyId(const auth::KeyId &key_id) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
}
- return *this;
-}
-
-Packet &Packet::resetRst() {
- if (hicn_packet_reset_rst(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error resetting rst bit in the packet.");
+ int ret = hicn_packet_set_key_id(&pkbuf_, key_id.first, key_id.second);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting the key id.");
}
-
- return *this;
}
-bool Packet::testRst() const {
- bool res = false;
- if (hicn_packet_test_rst(format_, packet_start_, &res) < 0) {
- throw errors::RuntimeException("Error testing rst bit in the packet.");
+void Packet::setValidationAlgorithm(
+ const auth::CryptoSuite &validation_algorithm) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
}
- return res;
-}
-
-Packet &Packet::setFin() {
- if (hicn_packet_set_fin(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error setting fin bit in the packet.");
+ int ret = hicn_packet_set_validation_algorithm(&pkbuf_,
+ uint8_t(validation_algorithm));
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting the validation algorithm.");
}
-
- return *this;
}
-Packet &Packet::resetFin() {
- if (hicn_packet_reset_fin(format_, packet_start_) < 0) {
- throw errors::RuntimeException("Error resetting fin bit in the packet.");
- }
-
- return *this;
+Packet::Format Packet::toAHFormat(const Format &format) {
+ return hicn_get_ah_format(format);
}
-bool Packet::testFin() const {
- bool res = false;
- if (hicn_packet_test_fin(format_, packet_start_, &res) < 0) {
- throw errors::RuntimeException("Error testing fin bit in the packet.");
- }
+Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer,
+ std::size_t length) {
+ hicn_packet_buffer_t pkbuf;
+ /* un-const to be able to use pkbuf API */
+ hicn_packet_set_buffer(&pkbuf, (uint8_t *)buffer, length, length);
+ if (hicn_packet_analyze(&pkbuf) < 0) throw errors::MalformedPacketException();
- return res;
+ return hicn_packet_get_format(&pkbuf);
}
-Packet &Packet::resetFlags() {
- resetSyn();
- resetAck();
- resetRst();
- resetFin();
-
- return *this;
+std::size_t Packet::getHeaderSizeFromFormat(Format format,
+ std::size_t signature_size) {
+ std::size_t header_length;
+ hicn_packet_get_header_length_from_format(format, &header_length);
+ int is_ah = HICN_PACKET_FORMAT_IS_AH(format);
+ return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
}
-std::string Packet::printFlags() const {
- std::string flags;
-
- if (testSyn()) {
- flags += "S";
- }
-
- if (testAck()) {
- flags += "A";
- }
+std::size_t Packet::getHeaderSizeFromBuffer(const uint8_t *buffer,
+ std::size_t length) {
+ size_t header_length;
- if (testRst()) {
- flags += "R";
- }
+ hicn_packet_buffer_t pkbuf;
+ /* un-const to be able to use pkbuf API */
+ hicn_packet_set_buffer(&pkbuf, (uint8_t *)buffer, length, length);
+ if (hicn_packet_analyze(&pkbuf) < 0) throw errors::MalformedPacketException();
- if (testFin()) {
- flags += "F";
- }
+ int rc = hicn_packet_get_header_len(&pkbuf, &header_length);
+ if (TRANSPORT_EXPECT_FALSE(rc < 0)) throw errors::MalformedPacketException();
- return flags;
+ return header_length;
}
-Packet &Packet::setSrcPort(uint16_t srcPort) {
- if (hicn_packet_set_src_port(format_, packet_start_, srcPort) < 0) {
- throw errors::RuntimeException("Error setting source port in the packet.");
- }
+std::size_t Packet::getPayloadSizeFromBuffer(const uint8_t *buffer,
+ std::size_t length) {
+ std::size_t payload_length;
- return *this;
-}
+ hicn_packet_buffer_t pkbuf;
+ /* un-const to be able to use pkbuf API */
+ hicn_packet_set_buffer(&pkbuf, (uint8_t *)buffer, length, length);
+ if (hicn_packet_analyze(&pkbuf) < 0) throw errors::MalformedPacketException();
-Packet &Packet::setDstPort(uint16_t dstPort) {
- if (hicn_packet_set_dst_port(format_, packet_start_, dstPort) < 0) {
- throw errors::RuntimeException(
- "Error setting destination port in the packet.");
- }
+ int rc = hicn_packet_get_payload_len(&pkbuf, &payload_length);
+ if (TRANSPORT_EXPECT_FALSE(rc < 0)) throw errors::MalformedPacketException();
- return *this;
+ return payload_length;
}
-uint16_t Packet::getSrcPort() const {
- uint16_t port = 0;
-
- if (hicn_packet_get_src_port(format_, packet_start_, &port) < 0) {
- throw errors::RuntimeException("Error reading source port in the packet.");
- }
-
- return port;
+void Packet::dump(uint8_t *buffer, std::size_t length) {
+ hicn_packet_dump(buffer, length);
}
-uint16_t Packet::getDstPort() const {
- uint16_t port = 0;
-
- if (hicn_packet_get_dst_port(format_, packet_start_, &port) < 0) {
- throw errors::RuntimeException(
- "Error reading destination port in the packet.");
- }
-
- return port;
+void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) {
+ auto last = prev();
+ auto to_copy = std::min(*size, last->tailroom());
+ std::memcpy(last->writableTail(), *buffer, to_copy);
+ last->append(to_copy);
+ *size -= to_copy;
+ *buffer += to_copy;
}
-Packet &Packet::setTTL(uint8_t hops) {
- if (hicn_packet_set_hoplimit(packet_start_, hops) < 0) {
- throw errors::RuntimeException("Error setting TTL.");
- }
-
- return *this;
+void Packet::saveHeader(u8 *header, size_t *header_len) {
+ hicn_packet_save_header(&pkbuf_, header, header_len, /* copy_ah */ false);
}
-uint8_t Packet::getTTL() const {
- uint8_t hops = 0;
- if (hicn_packet_get_hoplimit(packet_start_, &hops) < 0) {
- throw errors::RuntimeException("Error reading TTL.");
- }
-
- return hops;
+void Packet::loadHeader(u8 *header, size_t header_len) {
+ hicn_packet_load_header(&pkbuf_, header, header_len);
}
} // end namespace core
diff --git a/libtransport/src/core/pending_interest.cc b/libtransport/src/core/pending_interest.cc
deleted file mode 100644
index fbe98cab5..000000000
--- a/libtransport/src/core/pending_interest.cc
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <core/pending_interest.h>
-
-namespace transport {
-
-namespace core {
-
-PendingInterest::PendingInterest()
- : interest_(nullptr, nullptr),
- timer_(),
- on_content_object_callback_(),
- on_interest_timeout_callback_() {}
-
-PendingInterest::PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
- on_content_object_callback_(),
- on_interest_timeout_callback_() {}
-
-PendingInterest::PendingInterest(
- Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object,
- OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
- on_content_object_callback_(std::move(on_content_object)),
- on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
-
-PendingInterest::~PendingInterest() {}
-
-void PendingInterest::cancelTimer() { timer_->cancel(); }
-
-void PendingInterest::setInterest(Interest::Ptr &&interest) {
- interest_ = std::move(interest);
-}
-
-Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); }
-
-const OnContentObjectCallback &PendingInterest::getOnDataCallback() const {
- return on_content_object_callback_;
-}
-
-void PendingInterest::setOnContentObjectCallback(
- OnContentObjectCallback &&on_content_object) {
- PendingInterest::on_content_object_callback_ = on_content_object;
-}
-
-const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const {
- return on_interest_timeout_callback_;
-}
-
-void PendingInterest::setOnTimeoutCallback(
- OnInterestTimeoutCallback &&on_interest_timeout) {
- PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
-}
-
-} // end namespace core
-
-} // end namespace transport
diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h
index 99a8bd327..f49348bac 100644
--- a/libtransport/src/core/pending_interest.h
+++ b/libtransport/src/core/pending_interest.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -42,70 +42,60 @@ class PendingInterest {
public:
using Ptr = utils::ObjectPool<PendingInterest>::Ptr;
- // PendingInterest()
- // : interest_(nullptr, nullptr),
- // timer_(),
- // on_content_object_callback_(),
- // on_interest_timeout_callback_() {}
-
- PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
- on_content_object_callback_(),
- on_interest_timeout_callback_() {}
-
- PendingInterest(Interest::Ptr &&interest,
+
+ PendingInterest(asio::io_service &io_service, const Interest::Ptr &interest)
+ : interest_(interest), timer_(io_service) {}
+
+ PendingInterest(asio::io_service &io_service, const Interest::Ptr &interest,
OnContentObjectCallback &&on_content_object,
- OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
+ OnInterestTimeoutCallback &&on_interest_timeout)
+ : interest_(interest),
+ timer_(io_service),
on_content_object_callback_(std::move(on_content_object)),
on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
~PendingInterest() = default;
template <typename Handler>
- TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) {
- timer_->expires_from_now(
- std::chrono::milliseconds(interest_->getLifetime()));
- timer_->async_wait(std::forward<Handler &&>(cb));
+ void startCountdown(uint32_t lifetime, Handler &&cb) {
+ timer_.expires_from_now(std::chrono::milliseconds(lifetime));
+ timer_.async_wait(std::forward<Handler>(cb));
}
- TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); }
-
- TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() {
- return std::move(interest_);
+ void cancelTimer() {
+ try {
+ timer_.cancel();
+ } catch (asio::system_error &e) {
+ // do nothing
+ }
}
- TRANSPORT_ALWAYS_INLINE void setInterest(Interest::Ptr &interest) {
- interest_ = interest;
- }
+ Interest::Ptr &&getInterest() { return std::move(interest_); }
+
+ const Interest::Ptr &getInterestReference() const { return interest_; }
+
+ void setInterest(const Interest::Ptr &interest) { interest_ = interest; }
- TRANSPORT_ALWAYS_INLINE const OnContentObjectCallback &getOnDataCallback()
- const {
+ const OnContentObjectCallback &getOnDataCallback() const {
return on_content_object_callback_;
}
- TRANSPORT_ALWAYS_INLINE void setOnContentObjectCallback(
- OnContentObjectCallback &&on_content_object) {
- PendingInterest::on_content_object_callback_ = on_content_object;
+ void setOnContentObjectCallback(OnContentObjectCallback &&on_content_object) {
+ PendingInterest::on_content_object_callback_ = std::move(on_content_object);
}
- TRANSPORT_ALWAYS_INLINE const OnInterestTimeoutCallback &
- getOnTimeoutCallback() const {
+ const OnInterestTimeoutCallback &getOnTimeoutCallback() const {
return on_interest_timeout_callback_;
}
- TRANSPORT_ALWAYS_INLINE void setOnTimeoutCallback(
- OnInterestTimeoutCallback &&on_interest_timeout) {
- PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
+ void setOnTimeoutCallback(OnInterestTimeoutCallback &&on_interest_timeout) {
+ PendingInterest::on_interest_timeout_callback_ =
+ std::move(on_interest_timeout);
}
private:
Interest::Ptr interest_;
- std::unique_ptr<asio::steady_timer> timer_;
+ asio::steady_timer timer_;
OnContentObjectCallback on_content_object_callback_;
OnInterestTimeoutCallback on_interest_timeout_callback_;
};
diff --git a/libtransport/src/core/portal.cc b/libtransport/src/core/portal.cc
index c4c0cf8ba..72b6a6f37 100644
--- a/libtransport/src/core/portal.cc
+++ b/libtransport/src/core/portal.cc
@@ -43,12 +43,14 @@ std::string Portal::io_module_path_ = defaultIoModule();
std::string Portal::defaultIoModule() {
using namespace std::placeholders;
GlobalConfiguration::getInstance().registerConfigurationParser(
- io_module_section,
+ IoModuleConfiguration::section,
std::bind(&Portal::parseIoModuleConfiguration, _1, _2));
GlobalConfiguration::getInstance().registerConfigurationGetter(
- io_module_section, std::bind(&Portal::getModuleConfiguration, _1, _2));
+ IoModuleConfiguration::section,
+ std::bind(&Portal::getModuleConfiguration, _1, _2));
GlobalConfiguration::getInstance().registerConfigurationSetter(
- io_module_section, std::bind(&Portal::setModuleConfiguration, _1, _2));
+ IoModuleConfiguration::section,
+ std::bind(&Portal::setModuleConfiguration, _1, _2));
// return default
conf_.name = default_module;
@@ -57,7 +59,7 @@ std::string Portal::defaultIoModule() {
void Portal::getModuleConfiguration(ConfigurationObject& object,
std::error_code& ec) {
- assert(object.getKey() == io_module_section);
+ DCHECK(object.getKey() == IoModuleConfiguration::section);
auto conf = dynamic_cast<const IoModuleConfiguration&>(object);
conf = conf_;
@@ -103,7 +105,7 @@ std::string getIoModulePath(const std::string& name,
void Portal::setModuleConfiguration(const ConfigurationObject& object,
std::error_code& ec) {
- assert(object.getKey() == io_module_section);
+ DCHECK(object.getKey() == IoModuleConfiguration::section);
const IoModuleConfiguration& conf =
dynamic_cast<const IoModuleConfiguration&>(object);
@@ -147,4 +149,4 @@ void Portal::parseIoModuleConfiguration(const libconfig::Setting& io_config,
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index f6a9ce85b..c4ba096b9 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#pragma once
+#include <core/global_workers.h>
#include <core/pending_interest.h>
#include <glog/logging.h>
#include <hicn/transport/config.h>
@@ -28,6 +29,7 @@
#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
#include <future>
@@ -54,11 +56,11 @@ class HandlerMemory {
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
+ void *allocate(std::size_t /* size */) {
return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock();
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
+ void deallocate(void *pointer) {
utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock(
pointer);
}
@@ -69,13 +71,9 @@ class HandlerMemory {
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return ::operator new(size);
- }
+ void *allocate(std::size_t size) { return ::operator new(size); }
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
- ::operator delete(pointer);
- }
+ void deallocate(void *pointer) { ::operator delete(pointer); }
#endif
};
@@ -92,21 +90,19 @@ class HandlerAllocator {
HandlerAllocator(const HandlerAllocator<U> &other) noexcept
: memory_(other.memory_) {}
- TRANSPORT_ALWAYS_INLINE bool operator==(
- const HandlerAllocator &other) const noexcept {
+ bool operator==(const HandlerAllocator &other) const noexcept {
return &memory_ == &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE bool operator!=(
- const HandlerAllocator &other) const noexcept {
+ bool operator!=(const HandlerAllocator &other) const noexcept {
return &memory_ != &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const {
+ T *allocate(std::size_t n) const {
return static_cast<T *>(memory_.allocate(sizeof(T) * n));
}
- TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const {
+ void deallocate(T *p, std::size_t /*n*/) const {
return memory_.deallocate(p);
}
@@ -135,7 +131,7 @@ class CustomAllocatorHandler {
}
template <typename... Args>
- void operator()(Args &&... args) {
+ void operator()(Args &&...args) {
handler_(std::forward<Args>(args)...);
}
@@ -151,49 +147,16 @@ inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler(
return CustomAllocatorHandler<Handler>(m, h);
}
-class Pool {
- public:
- Pool(asio::io_service &io_service) : io_service_(io_service) {
- increasePendingInterestPool();
- }
-
- TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
- // Create pool of pending interests to reuse
- for (uint32_t i = 0; i < pit_size; i++) {
- pending_interests_pool_.add(new PendingInterest(
- Interest::Ptr(nullptr),
- std::make_unique<asio::steady_timer>(io_service_)));
- }
- }
- PendingInterest::Ptr getPendingInterest() {
- auto res = pending_interests_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increasePendingInterestPool();
- res = pending_interests_pool_.get();
- }
-
- return std::move(res.second);
- }
-
- private:
- utils::ObjectPool<PendingInterest> pending_interests_pool_;
- asio::io_service &io_service_;
-};
-
} // namespace portal_details
class PortalConfiguration;
-using PendingInterestHashTable =
- std::unordered_map<uint32_t, PendingInterest::Ptr>;
-
-using interface::BindConfig;
+using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest>;
/**
* Portal is a opaque class which is used for sending/receiving interest/data
- * packets over multiple kind of connector. The connector itself is defined by
- * the template ForwarderInt, which is resolved at compile time. It is then not
- * possible to decide at runtime what the connector will be.
+ * packets over multiple kind of io_modules. The io_module itself is an external
+ * module loaded at runtime.
*
* The tasks performed by portal are the following:
* - Sending/Receiving Interest packets
@@ -210,83 +173,117 @@ using interface::BindConfig;
* users of this class.
*/
-class Portal {
- public:
- using ConsumerCallback = interface::Portal::ConsumerCallback;
- using ProducerCallback = interface::Portal::ProducerCallback;
+class Portal : public ::utils::NonCopyable,
+ public std::enable_shared_from_this<Portal> {
+ private:
+ Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {}
+ Portal(::utils::EventThread &worker)
+ : io_module_(nullptr),
+ worker_(worker),
+ app_name_("libtransport_application"),
+ transport_callback_(nullptr),
+ is_consumer_(false) {}
+
+ public:
+ using TransportCallback = interface::Portal::TransportCallback;
friend class PortalConfiguration;
- Portal() : Portal(internal_io_service_) {}
+ static std::shared_ptr<Portal> createShared() {
+ return std::shared_ptr<Portal>(new Portal());
+ }
- Portal(asio::io_service &io_service)
- : io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
- io_service_(io_service),
- packet_pool_(io_service),
- app_name_("libtransport_application"),
- consumer_callback_(nullptr),
- producer_callback_(nullptr),
- is_consumer_(false) {
- /**
- * This workaroung allows to initialize memory for packet buffers *before*
- * any static variables that may be initialized in the io_modules. In this
- * way static variables in modules will be destroyed before the packet
- * memory.
- */
- PacketManager<>::getInstance();
+ static std::shared_ptr<Portal> createShared(::utils::EventThread &worker) {
+ return std::shared_ptr<Portal>(new Portal(worker));
}
+
+ bool isConnected() const { return io_module_.get() != nullptr; }
+
/**
- * Set the consumer callback.
+ * Set the transport callback. Must be called from the same worker thread.
*
- * @param consumer_callback - The pointer to the ConsumerCallback object.
+ * @param consumer_callback - The pointer to the TransportCallback object.
*/
- void setConsumerCallback(ConsumerCallback *consumer_callback) {
- consumer_callback_ = consumer_callback;
+ void registerTransportCallback(TransportCallback *transport_callback) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ transport_callback_ = transport_callback;
}
/**
- * Set the producer callback.
- *
- * @param producer_callback - The pointer to the ProducerCallback object.
+ * Unset the consumer callback. Must be called from the same worker thread.
*/
- void setProducerCallback(ProducerCallback *producer_callback) {
- producer_callback_ = producer_callback;
+ void unregisterTransportCallback() {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ transport_callback_ = nullptr;
}
/**
- * Specify the output interface to use. This method will be useful in a future
- * scenario where the library will be able to forward packets without
+ * Specify the output interface to use. This method will be useful in a
+ * future scenario where the library will be able to forward packets without
* connecting to a local forwarder. Now it is not used.
*
* @param output_interface - The output interface to use for
* forwarding/receiving packets.
*/
- TRANSPORT_ALWAYS_INLINE void setOutputInterface(
- const std::string &output_interface) {
- io_module_->setOutputInterface(output_interface);
+ void setOutputInterface(const std::string &output_interface) {
+ if (io_module_) {
+ io_module_->setOutputInterface(output_interface);
+ }
}
/**
* Connect the transport to the local hicn forwarder.
*
- * @param is_consumer - Boolean specifying if the application on top of portal
- * is a consumer or a producer.
+ * @param is_consumer - Boolean specifying if the application on top of
+ * portal is a consumer or a producer.
*/
- TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
- if (!io_module_) {
- pending_interest_hash_table_.reserve(portal_details::pit_size);
- io_module_.reset(IoModule::load(io_module_path_.c_str()));
-
- CHECK(io_module_);
-
- io_module_->init(std::bind(&Portal::processIncomingMessages, this,
- std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3),
- std::bind(&Portal::setLocalRoutes, this), io_service_,
- app_name_);
- io_module_->connect(is_consumer);
- is_consumer_ = is_consumer;
+ void connect(bool is_consumer = true) {
+ if (isConnected()) {
+ return;
}
+
+ worker_.addAndWaitForExecution([this, is_consumer]() {
+ if (!io_module_) {
+ pending_interest_hash_table_.reserve(portal_details::pit_size);
+ io_module_.reset(IoModule::load(io_module_path_.c_str()));
+
+ CHECK(io_module_);
+
+ std::weak_ptr<Portal> self(shared_from_this());
+
+ io_module_->init(
+ [self](Connector *c, const std::vector<utils::MemBuf::Ptr> &buffers,
+ const std::error_code &ec) {
+ if (auto ptr = self.lock()) {
+ ptr->processIncomingMessages(c, buffers, ec);
+ }
+ },
+ [self](Connector *c, const std::error_code &ec) {
+ if (!ec) {
+ return;
+ }
+ auto ptr = self.lock();
+ if (ptr && ptr->transport_callback_) {
+ ptr->transport_callback_->onError(ec);
+ }
+ },
+ [self]([[maybe_unused]] Connector *c) { /* Nothing to do here */ },
+ [self](Connector *c, const std::error_code &ec) {
+ auto ptr = self.lock();
+ if (ptr) {
+ if (ec && ptr->transport_callback_) {
+ ptr->transport_callback_->onError(ec);
+ return;
+ }
+ ptr->setLocalRoutes();
+ }
+ },
+ worker_.getIoService(), app_name_);
+
+ io_module_->connect(is_consumer);
+ is_consumer_ = is_consumer;
+ }
+ });
}
/**
@@ -295,18 +292,13 @@ class Portal {
~Portal() { killConnection(); }
/**
- * Compute name hash
- */
- TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) {
- return name.getHash32(false) + name.getSuffix();
- }
-
- /**
* Check if there is already a pending interest for a given name.
*
* @param name - The interest name.
*/
- TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
+ bool interestIsPending(const Name &name) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
auto it = pending_interest_hash_table_.find(getHash(name));
if (it != pending_interest_hash_table_.end()) {
return true;
@@ -316,58 +308,53 @@ class Portal {
}
/**
- * Send an interest through to the local forwarder.
+ * @brief Add interest to PIT
*
- * @param interest - The pointer to the interest. The ownership of the
- * interest is transferred by the caller to portal.
- *
- * @param on_content_object_callback - If the caller wishes to use a different
- * callback to be called for this interest, it can set this parameter.
- * Otherwise ConsumerCallback::onContentObject will be used.
- *
- * @param on_interest_timeout_callback - If the caller wishes to use a
- * different callback to be called for this interest, it can set this
- * parameter. Otherwise ConsumerCallback::onTimeout will be used.
*/
- TRANSPORT_ALWAYS_INLINE void sendInterest(
- Interest::Ptr &&interest,
+ void addInterestToPIT(
+ const Interest::Ptr &interest, uint32_t lifetime,
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
OnInterestTimeoutCallback &&on_interest_timeout_callback =
UNSET_CALLBACK) {
- // Send it
- interest->encodeSuffixes();
- io_module_->send(*interest);
-
uint32_t initial_hash = interest->getName().getHash32(false);
auto hash = initial_hash + interest->getName().getSuffix();
uint32_t seq = interest->getName().getSuffix();
- uint32_t *suffix = interest->firstSuffix();
- auto n_suffixes = interest->numberOfSuffixes();
+ const uint32_t *suffix = interest->firstSuffix() != nullptr
+ ? interest->firstSuffix() + 1
+ : nullptr;
+ auto n_suffixes =
+ interest->numberOfSuffixes() > 0 ? interest->numberOfSuffixes() - 1 : 0;
uint32_t counter = 0;
// Set timers
do {
- auto pending_interest = packet_pool_.getPendingInterest();
- pending_interest->setInterest(interest);
- pending_interest->setOnContentObjectCallback(
+ auto pend_int = pending_interest_hash_table_.try_emplace(
+ hash, worker_.getIoService(), interest);
+ PendingInterest &pending_interest = pend_int.first->second;
+ if (!pend_int.second) {
+ // element was already in map
+ pend_int.first->second.cancelTimer();
+ pending_interest.setInterest(interest);
+ }
+
+ pending_interest.setOnContentObjectCallback(
std::move(on_content_object_callback));
- pending_interest->setOnTimeoutCallback(
+ pending_interest.setOnTimeoutCallback(
std::move(on_interest_timeout_callback));
- pending_interest->startCountdown(
- portal_details::makeCustomAllocatorHandler(
- async_callback_memory_,
- std::bind(&Portal::timerHandler, this, std::placeholders::_1,
- hash, seq)));
-
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
-
- // Get reference to interest packet in order to have it destroyed.
- auto _int = it->second->getInterest();
- it->second = std::move(pending_interest);
- } else {
- pending_interest_hash_table_[hash] = std::move(pending_interest);
+ if (is_consumer_) {
+ auto self = weak_from_this();
+ pending_interest.startCountdown(
+ lifetime, portal_details::makeCustomAllocatorHandler(
+ async_callback_memory_,
+ [self, hash, seq](const std::error_code &ec) {
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ return;
+ }
+
+ if (auto ptr = self.lock()) {
+ ptr->timerHandler(hash, seq);
+ }
+ }));
}
if (suffix) {
@@ -375,224 +362,261 @@ class Portal {
seq = *suffix;
suffix++;
}
-
} while (counter++ < n_suffixes);
}
- /**
- * Handler fot the timer set when the interest is sent.
- *
- * @param ec - Error code which says whether the timer expired or has been
- * canceled upon data packet reception.
- *
- * @param hash - The index of the interest in the pending interest hash table.
- */
- TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
- uint32_t hash, uint32_t seq) {
- bool is_stopped = io_service_.stopped();
- if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
- return;
- }
+ void matchContentObjectInPIT(ContentObject &content_object) {
+ uint32_t hash = getHash(content_object.getName());
+ auto it = pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
+
+ PendingInterest &pend_interest = it->second;
+ pend_interest.cancelTimer();
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnDataCallback();
+ pending_interest_hash_table_.erase(it);
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- PendingInterestHashTable::iterator it =
- pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- PendingInterest::Ptr ptr = std::move(it->second);
- pending_interest_hash_table_.erase(it);
- auto _int = ptr->getInterest();
- Name &name = const_cast<Name &>(_int->getName());
- name.setSuffix(seq);
-
- if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) {
- ptr->on_interest_timeout_callback_(_int, name);
- } else if (consumer_callback_) {
- consumer_callback_->onTimeout(_int, name);
+ if (is_consumer_) {
+ // Send object is for the app
+ if (callback != UNSET_CALLBACK) {
+ callback(*_int, content_object);
+ } else if (transport_callback_) {
+ transport_callback_->onContentObject(*_int, content_object);
}
+ } else {
+ // Send content object to the network
+ io_module_->send(content_object);
}
+ } else if (is_consumer_) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "No interest pending for received content object.";
}
}
/**
- * Register a producer name to the local forwarder and optionally set the
- * content store size in a per-face manner.
+ * Send an interest through to the local forwarder.
*
- * @param config - The configuration for the local forwarder binding.
- */
- TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
- assert(io_module_);
- io_module_->setContentStoreSize(config.csReserved());
- served_namespaces_.push_back(config.prefix());
- setLocalRoutes();
- }
-
- /**
- * Start the event loop. This function blocks here and calls the callback set
- * by the application upon interest/data received or timeout.
+ * @param interest - The pointer to the interest. The ownership of the
+ * interest is transferred by the caller to portal.
+ *
+ * @param on_content_object_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onContentObject will be used.
+ *
+ * @param on_interest_timeout_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onTimeout will be used.
*/
- TRANSPORT_ALWAYS_INLINE void runEventsLoop() {
- if (io_service_.stopped()) {
- io_service_.reset(); // ensure that run()/poll() will do some work
- }
+ void sendInterest(
+ Interest::Ptr &interest, uint32_t lifetime,
+ OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
+ OnInterestTimeoutCallback &&on_interest_timeout_callback =
+ UNSET_CALLBACK) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
- io_service_.run();
+ addInterestToPIT(interest, lifetime, std::move(on_content_object_callback),
+ std::move(on_interest_timeout_callback));
+ interest->serializeSuffixes();
+ io_module_->send(*interest);
}
/**
- * Run one event and return.
+ * Handler fot the timer set when the interest is sent.
+ *
+ * @param ec - Error code which says whether the timer expired or has been
+ * canceled upon data packet reception.
+ *
+ * @param hash - The index of the interest in the pending interest hash
+ * table.
*/
- TRANSPORT_ALWAYS_INLINE void runOneEvent() {
- if (io_service_.stopped()) {
- io_service_.reset(); // ensure that run()/poll() will do some work
- }
+ void timerHandler(uint32_t hash, uint32_t seq) {
+ PendingInterestHashTable::iterator it =
+ pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ PendingInterest &pend_interest = it->second;
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnTimeoutCallback();
+ pending_interest_hash_table_.erase(it);
+ Name &name = const_cast<Name &>(_int->getName());
+ name.setSuffix(seq);
- io_service_.run_one();
+ if (callback != UNSET_CALLBACK) {
+ callback(_int, name);
+ } else if (transport_callback_) {
+ transport_callback_->onTimeout(_int, name);
+ }
+ }
}
/**
- * Send a data packet to the local forwarder. As opposite to sendInterest, the
- * ownership of the content object is not transferred to the portal.
+ * Send a data packet to the local forwarder.
*
* @param content_object - The data packet.
*/
- TRANSPORT_ALWAYS_INLINE void sendContentObject(
- ContentObject &content_object) {
- io_module_->send(content_object);
+ void sendContentObject(ContentObject &content_object) {
+ DCHECK(io_module_);
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ matchContentObjectInPIT(content_object);
}
/**
- * Stop the event loop, canceling all the pending events in the event queue.
- *
- * Beware that stopping the event loop DOES NOT disconnect the transport from
- * the local forwarder, the connector underneath will stay connected.
+ * Disconnect the transport from the local forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
- if (!io_service_.stopped()) {
- io_service_.dispatch([this]() {
- clear();
- io_service_.stop();
- });
+ void killConnection() {
+ if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) {
+ io_module_->closeConnection();
}
}
/**
- * Disconnect the transport from the local forwarder.
+ * Clear the pending interest hash table.
*/
- TRANSPORT_ALWAYS_INLINE void killConnection() {
- io_module_->closeConnection();
+ void clear() {
+ worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); });
}
/**
- * Clear the pending interest hash table.
+ * Get a reference to the io_service object.
*/
- TRANSPORT_ALWAYS_INLINE void clear() {
- if (!io_service_.stopped()) {
- io_service_.dispatch(std::bind(&Portal::doClear, this));
- } else {
- doClear();
- }
- }
+ utils::EventThread &getThread() { return worker_; }
/**
- * Remove one pending interest.
+ * Register a route to the local forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) {
- if (!io_service_.stopped()) {
- io_service_.dispatch(std::bind(&Portal::doClearOne, this, name));
- } else {
- doClearOne(name);
- }
+ void registerRoute(const Prefix &prefix) {
+ std::weak_ptr<Portal> self = shared_from_this();
+ worker_.tryRunHandlerNow([self, prefix]() {
+ if (auto ptr = self.lock()) {
+ auto ret = ptr->served_namespaces_.insert(prefix);
+ if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) {
+ ptr->io_module_->registerRoute(prefix);
+ }
+ }
+ });
}
/**
- * Get a reference to the io_service object.
+ * Send a MAP-Me update to traverse NATs.
*/
- TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
- return io_service_;
+ TRANSPORT_ALWAYS_INLINE void sendMapme() {
+ if (io_module_->isConnected()) {
+ io_module_->sendMapme();
+ }
}
/**
- * Register a route to the local forwarder.
+ * set forwarding strategy
*/
- TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
- served_namespaces_.push_back(prefix);
+ TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix,
+ std::string &strategy) {
if (io_module_->isConnected()) {
- io_module_->registerRoute(prefix);
+ io_module_->setForwardingStrategy(prefix, strategy);
}
}
/**
* Check if the transport is connected to a forwarder or not
*/
- TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd() {
+ bool isConnectedToFwd() {
std::string mod = io_module_path_.substr(0, io_module_path_.find("."));
if (mod == "forwarder_module") return false;
return true;
}
+ auto &getServedNamespaces() { return served_namespaces_; }
+
private:
/**
+ * Compute name hash
+ */
+ uint32_t getHash(const Name &name) {
+ return name.getHash32(false) + name.getSuffix();
+ }
+
+ /**
* Clear the pending interest hash table.
*/
- TRANSPORT_ALWAYS_INLINE void doClear() {
+ void doClear() {
for (auto &pend_interest : pending_interest_hash_table_) {
- pend_interest.second->cancelTimer();
-
- // Get interest packet from pending interest and do nothing with it. It
- // will get destroyed as it goes out of scope.
- auto _int = pend_interest.second->getInterest();
+ pend_interest.second.cancelTimer();
}
pending_interest_hash_table_.clear();
}
- /**
- * Remove one pending interest.
- */
- TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) {
- auto it = pending_interest_hash_table_.find(getHash(name));
-
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
+ void dumpPIT() {
+ std::vector<Name> sorted_elements;
+ for (const auto &[key, value] : pending_interest_hash_table_) {
+ sorted_elements.push_back(value.getInterestReference()->getName());
+ }
- // Get interest packet from pending interest and do nothing with it. It
- // will get destroyed as it goes out of scope.
- auto _int = it->second->getInterest();
+ std::sort(sorted_elements.begin(), sorted_elements.end(),
+ [](const Name &a, const Name &b) {
+ return a.getSuffix() < b.getSuffix();
+ });
- pending_interest_hash_table_.erase(it);
+ for (auto &elt : sorted_elements) {
+ LOG(INFO) << elt;
}
}
/**
- * Callback called by the underlying connector upon reception of a packet from
- * the local forwarder.
+ * Callback called by the underlying connector upon reception of a packet
+ * from the local forwarder.
*
* @param packet_buffer - The bytes of the packet.
*/
- TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
- Connector *c, utils::MemBuf &buffer, const std::error_code &ec) {
- bool is_stopped = io_service_.stopped();
- if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
+ void processIncomingMessages(Connector *c,
+ const std::vector<utils::MemBuf::Ptr> &buffers,
+ const std::error_code &ec) {
+ if (!transport_callback_) {
return;
}
- if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
- processControlMessage(buffer);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ // Error receiving from underlying infra.
+ if (transport_callback_) {
+ transport_callback_->onError(ec);
+ }
+
return;
}
- // The buffer is a base class for an interest or a content object
- Packet &packet_buffer = static_cast<Packet &>(buffer);
+ for (auto &buffer_ptr : buffers) {
+ auto &buffer = *buffer_ptr;
- auto format = packet_buffer.getFormat();
- if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (is_consumer_) {
- processContentObject(static_cast<ContentObject &>(packet_buffer));
- } else {
- processInterest(static_cast<Interest &>(packet_buffer));
+ if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) {
+ processControlMessage(buffer);
+ return;
+ }
+
+ // The buffer is a base class for an interest or a content object
+ Packet &packet_buffer = static_cast<Packet &>(buffer);
+
+ switch (packet_buffer.getType()) {
+ case HICN_PACKET_TYPE_INTEREST:
+ if (!is_consumer_) {
+ processInterest(static_cast<Interest &>(packet_buffer));
+ } else {
+ LOG(ERROR) << "Received an Interest packet with name "
+ << packet_buffer.getName()
+ << " in a consumer transport. Ignoring it.";
+ }
+ break;
+ case HICN_PACKET_TYPE_DATA:
+ if (is_consumer_) {
+ processContentObject(static_cast<ContentObject &>(packet_buffer));
+ } else {
+ LOG(ERROR) << "Received a Data packet with name "
+ << packet_buffer.getName()
+ << " in a producer transport. Ignoring it.";
+ }
+ break;
+ default:
+ LOG(ERROR) << "Received not supported packet. Ignoring it.";
+ break;
}
- } else {
- LOG(ERROR) << "Received not supported packet. Ignoring it.";
}
}
@@ -601,19 +625,25 @@ class Portal {
* It register the prefixes in the served_namespaces_ list to the local
* forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
+ void setLocalRoutes() {
+ DCHECK(io_module_);
+ DCHECK(io_module_->isConnected());
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
for (auto &prefix : served_namespaces_) {
- if (io_module_->isConnected()) {
- io_module_->registerRoute(prefix);
- }
+ io_module_->registerRoute(prefix);
}
}
- TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) {
+ void processInterest(Interest &interest) {
// Interest for a producer
DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName();
- if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) {
- producer_callback_->onInterest(interest);
+
+ // Save interest in PIT
+ interest.deserializeSuffixes();
+ addInterestToPIT(interest.shared_from_this(), interest.getLifetime());
+ if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) {
+ transport_callback_->onInterest(interest);
}
}
@@ -625,57 +655,33 @@ class Portal {
*
* @param content_object - The data packet
*/
- TRANSPORT_ALWAYS_INLINE void processContentObject(
- ContentObject &content_object) {
+ void processContentObject(ContentObject &content_object) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "processContentObject " << content_object.getName();
- uint32_t hash = getHash(content_object.getName());
-
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
-
- PendingInterest::Ptr interest_ptr = std::move(it->second);
- pending_interest_hash_table_.erase(it);
- interest_ptr->cancelTimer();
- auto _int = interest_ptr->getInterest();
-
- if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
- interest_ptr->on_content_object_callback_(*_int, content_object);
- } else if (consumer_callback_) {
- consumer_callback_->onContentObject(*_int, content_object);
- }
- } else {
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "No interest pending for received content object.";
- }
+ matchContentObjectInPIT(content_object);
}
/**
- * Process a control message. Control messages are different depending on the
- * connector, then the forwarder_interface will do the job of understanding
- * them.
+ * Process a control message. Control messages are different depending on
+ * the connector, then the forwarder_interface will do the job of
+ * understanding them.
*/
- TRANSPORT_ALWAYS_INLINE void processControlMessage(
- utils::MemBuf &packet_buffer) {
+ void processControlMessage(utils::MemBuf &packet_buffer) {
io_module_->processControlMessageReply(packet_buffer);
}
private:
portal_details::HandlerMemory async_callback_memory_;
- std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
+ std::unique_ptr<IoModule> io_module_;
- asio::io_service &io_service_;
- asio::io_service internal_io_service_;
- portal_details::Pool packet_pool_;
+ ::utils::EventThread &worker_;
std::string app_name_;
PendingInterestHashTable pending_interest_hash_table_;
- std::list<Prefix> served_namespaces_;
+ std::set<Prefix> served_namespaces_;
- ConsumerCallback *consumer_callback_;
- ProducerCallback *producer_callback_;
+ TransportCallback *transport_callback_;
bool is_consumer_;
diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc
index d598cff75..8c5b153bc 100644
--- a/libtransport/src/core/prefix.cc
+++ b/libtransport/src/core/prefix.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -13,8 +13,10 @@
* limitations under the License.
*/
+#include <glog/logging.h>
#include <hicn/transport/core/prefix.h>
#include <hicn/transport/errors/errors.h>
+#include <hicn/transport/portability/endianess.h>
#include <hicn/transport/utils/string_tokenizer.h>
#ifndef _WIN32
@@ -35,11 +37,7 @@ namespace transport {
namespace core {
-Prefix::Prefix() { std::memset(&ip_prefix_, 0, sizeof(ip_prefix_t)); }
-
-Prefix::Prefix(const char *prefix) : Prefix(std::string(prefix)) {}
-
-Prefix::Prefix(std::string &&prefix) : Prefix(prefix) {}
+Prefix::Prefix() { std::memset(&hicn_ip_prefix_, 0, sizeof(hicn_ip_prefix_t)); }
Prefix::Prefix(const std::string &prefix) {
utils::StringTokenizer st(prefix, "/");
@@ -56,7 +54,7 @@ Prefix::Prefix(const std::string &prefix) {
buildPrefix(ip_address, uint16_t(atoi(prefix_length.c_str())), family);
}
-Prefix::Prefix(std::string &prefix, uint16_t prefix_length) {
+Prefix::Prefix(const std::string &prefix, uint16_t prefix_length) {
int family = get_addr_family(prefix.c_str());
buildPrefix(prefix, prefix_length, family);
}
@@ -68,24 +66,28 @@ Prefix::Prefix(const core::Name &content_name, uint16_t prefix_length) {
throw errors::InvalidIpAddressException();
}
- ip_prefix_ = content_name.toIpAddress();
- ip_prefix_.len = (u8)prefix_length;
- ip_prefix_.family = family;
+ hicn_ip_prefix_ = content_name.toIpAddress();
+ hicn_ip_prefix_.len = (u8)prefix_length;
+ hicn_ip_prefix_.family = family;
}
-void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length,
+void Prefix::buildPrefix(const std::string &prefix, uint16_t prefix_length,
int family) {
if (!checkPrefixLengthAndAddressFamily(prefix_length, family)) {
throw errors::InvalidIpAddressException();
}
+ std::memset(&hicn_ip_prefix_, 0, sizeof(hicn_ip_prefix_t));
+
int ret;
switch (family) {
case AF_INET:
- ret = inet_pton(AF_INET, prefix.c_str(), ip_prefix_.address.v4.buffer);
+ ret =
+ inet_pton(AF_INET, prefix.c_str(), hicn_ip_prefix_.address.v4.buffer);
break;
case AF_INET6:
- ret = inet_pton(AF_INET6, prefix.c_str(), ip_prefix_.address.v6.buffer);
+ ret = inet_pton(AF_INET6, prefix.c_str(),
+ hicn_ip_prefix_.address.v6.buffer);
break;
default:
throw errors::InvalidIpAddressException();
@@ -95,14 +97,22 @@ void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length,
throw errors::InvalidIpAddressException();
}
- ip_prefix_.len = (u8)prefix_length;
- ip_prefix_.family = family;
+ hicn_ip_prefix_.len = (u8)prefix_length;
+ hicn_ip_prefix_.family = family;
+}
+
+bool Prefix::operator<(const Prefix &other) const {
+ return hicn_ip_prefix_cmp(&hicn_ip_prefix_, &other.hicn_ip_prefix_) < 0;
+}
+
+bool Prefix::operator==(const Prefix &other) const {
+ return hicn_ip_prefix_cmp(&hicn_ip_prefix_, &other.hicn_ip_prefix_) == 0;
}
std::unique_ptr<Sockaddr> Prefix::toSockaddr() const {
Sockaddr *ret = nullptr;
- switch (ip_prefix_.family) {
+ switch (hicn_ip_prefix_.family) {
case AF_INET6:
ret = (Sockaddr *)new Sockaddr6;
break;
@@ -113,72 +123,79 @@ std::unique_ptr<Sockaddr> Prefix::toSockaddr() const {
throw errors::InvalidIpAddressException();
}
- if (ip_prefix_to_sockaddr(&ip_prefix_, ret) < 0) {
+ if (hicn_ip_prefix_to_sockaddr(&hicn_ip_prefix_, ret) < 0) {
throw errors::InvalidIpAddressException();
}
return std::unique_ptr<Sockaddr>(ret);
}
-uint16_t Prefix::getPrefixLength() const { return ip_prefix_.len; }
+uint16_t Prefix::getPrefixLength() const { return hicn_ip_prefix_.len; }
Prefix &Prefix::setPrefixLength(uint16_t prefix_length) {
- ip_prefix_.len = (u8)prefix_length;
- return *this;
-}
-
-int Prefix::getAddressFamily() const { return ip_prefix_.family; }
+ if (!checkPrefixLengthAndAddressFamily(prefix_length,
+ hicn_ip_prefix_.family)) {
+ throw errors::InvalidIpAddressException();
+ }
-Prefix &Prefix::setAddressFamily(int address_family) {
- ip_prefix_.family = address_family;
+ hicn_ip_prefix_.len = (u8)prefix_length;
return *this;
}
+int Prefix::getAddressFamily() const { return hicn_ip_prefix_.family; }
+
std::string Prefix::getNetwork() const {
- if (!checkPrefixLengthAndAddressFamily(ip_prefix_.len, ip_prefix_.family)) {
+ if (!checkPrefixLengthAndAddressFamily(hicn_ip_prefix_.len,
+ hicn_ip_prefix_.family)) {
throw errors::InvalidIpAddressException();
}
- std::size_t size =
- ip_prefix_.family == 4 + AF_INET ? INET_ADDRSTRLEN : INET6_ADDRSTRLEN;
-
- std::string network(size, 0);
+ char buffer[INET6_ADDRSTRLEN];
- if (ip_prefix_ntop_short(&ip_prefix_, (char *)network.c_str(), size) < 0) {
+ if (hicn_ip_prefix_ntop_short(&hicn_ip_prefix_, buffer, INET6_ADDRSTRLEN) <
+ 0) {
throw errors::RuntimeException(
"Impossible to retrieve network from ip address.");
}
- return network;
+ return buffer;
}
-int Prefix::contains(const ip_address_t &content_name) const {
- int res =
- ip_address_cmp(&content_name, &(ip_prefix_.address), ip_prefix_.family);
-
- if (ip_prefix_.len != (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN_BITS
- : IPV4_ADDR_LEN_BITS)) {
- const u8 *ip_prefix_buffer =
- ip_address_get_buffer(&(ip_prefix_.address), ip_prefix_.family);
- const u8 *content_name_buffer =
- ip_address_get_buffer(&content_name, ip_prefix_.family);
- uint8_t mask = 0xFF >> (ip_prefix_.len % 8);
- mask = ~mask;
-
- res += (ip_prefix_buffer[ip_prefix_.len] & mask) ==
- (content_name_buffer[ip_prefix_.len] & mask);
+bool Prefix::contains(const hicn_ip_address_t &content_name) const {
+ uint64_t mask[2] = {0, 0};
+ auto content_name_copy = content_name;
+ auto network_copy = hicn_ip_prefix_.address;
+
+ auto prefix_length = getPrefixLength();
+ if (hicn_ip_prefix_.family == AF_INET) {
+ prefix_length += 3 * IPV4_ADDR_LEN_BITS;
}
- return res;
-}
+ if (prefix_length == 0) {
+ mask[0] = mask[1] = 0;
+ } else if (prefix_length <= 64) {
+ mask[0] = portability::host_to_net((uint64_t)(~0) << (64 - prefix_length));
+ mask[1] = 0;
+ } else if (prefix_length == 128) {
+ mask[0] = mask[1] = 0xffffffffffffffff;
+ } else {
+ prefix_length -= 64;
+ mask[0] = 0xffffffffffffffff;
+ mask[1] = portability::host_to_net((uint64_t)(~0) << (64 - prefix_length));
+ }
-int Prefix::contains(const core::Name &content_name) const {
- return contains(content_name.toIpAddress().address);
+ // Apply mask
+ content_name_copy.v6.as_u64[0] &= mask[0];
+ content_name_copy.v6.as_u64[1] &= mask[1];
+
+ network_copy.v6.as_u64[0] &= mask[0];
+ network_copy.v6.as_u64[1] &= mask[1];
+
+ return hicn_ip_address_cmp(&network_copy, &content_name_copy) == 0;
}
-Name Prefix::getName() const {
- std::string s(getNetwork());
- return Name(s);
+bool Prefix::contains(const core::Name &content_name) const {
+ return contains(content_name.toIpAddress().address);
}
/*
@@ -187,23 +204,25 @@ Name Prefix::getName() const {
*/
Name Prefix::getName(const core::Name &mask, const core::Name &components,
const core::Name &content_name) const {
- if (ip_prefix_.family != mask.getAddressFamily() ||
- ip_prefix_.family != components.getAddressFamily() ||
- ip_prefix_.family != content_name.getAddressFamily())
+ if (hicn_ip_prefix_.family != mask.getAddressFamily() ||
+ hicn_ip_prefix_.family != components.getAddressFamily() ||
+ hicn_ip_prefix_.family != content_name.getAddressFamily())
throw errors::RuntimeException(
- "Prefix, mask, components and content name are not of the same address "
- "family");
-
- ip_address_t mask_ip = mask.toIpAddress().address;
- ip_address_t component_ip = components.toIpAddress().address;
- ip_address_t name_ip = content_name.toIpAddress().address;
- const u8 *mask_ip_buffer = ip_address_get_buffer(&mask_ip, ip_prefix_.family);
+ "Prefix, mask, components and content name are not of the same"
+ "address family");
+
+ hicn_ip_address_t mask_ip = mask.toIpAddress().address;
+ hicn_ip_address_t component_ip = components.toIpAddress().address;
+ hicn_ip_address_t name_ip = content_name.toIpAddress().address;
+ const u8 *mask_ip_buffer =
+ hicn_ip_address_get_buffer(&mask_ip, hicn_ip_prefix_.family);
const u8 *component_ip_buffer =
- ip_address_get_buffer(&component_ip, ip_prefix_.family);
- u8 *name_ip_buffer =
- const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family));
+ hicn_ip_address_get_buffer(&component_ip, hicn_ip_prefix_.family);
+ u8 *name_ip_buffer = const_cast<u8 *>(
+ hicn_ip_address_get_buffer(&name_ip, hicn_ip_prefix_.family));
- int addr_len = ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN;
+ int addr_len =
+ hicn_ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN;
for (int i = 0; i < addr_len; i++) {
if (mask_ip_buffer[i]) {
@@ -211,108 +230,98 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components,
}
}
- // if (this->contains(name_ip))
- // throw errors::RuntimeException("Mask overrides the prefix");
- return Name(ip_prefix_.family, (uint8_t *)&name_ip);
-}
-
-Name Prefix::getRandomName() const {
- ip_address_t name_ip = ip_prefix_.address;
- u8 *name_ip_buffer =
- const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family));
-
- int addr_len =
- (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN * 8 : IPV4_ADDR_LEN * 8) -
- ip_prefix_.len;
-
- size_t size = (size_t)ceil((float)addr_len / 8.0);
- uint8_t *buffer = (uint8_t *)malloc(sizeof(uint8_t) * size);
-
- RAND_bytes(buffer, (int)size);
-
- int j = 0;
- for (uint8_t i = (uint8_t)ceil((float)ip_prefix_.len / 8.0);
- i < (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN : IPV4_ADDR_LEN);
- i++) {
- name_ip_buffer[i] = buffer[j];
- j++;
- }
- free(buffer);
-
- return Name(ip_prefix_.family, (uint8_t *)&name_ip);
+ return Name(hicn_ip_prefix_.family, (uint8_t *)&name_ip);
}
/*
* Map a name in a different name prefix to this name prefix
*/
Name Prefix::mapName(const core::Name &content_name) const {
- if (ip_prefix_.family != content_name.getAddressFamily())
+ if (hicn_ip_prefix_.family != content_name.getAddressFamily())
throw errors::RuntimeException(
"Prefix content name are not of the same address "
"family");
- ip_address_t name_ip = content_name.toIpAddress().address;
- const u8 *ip_prefix_buffer =
- ip_address_get_buffer(&(ip_prefix_.address), ip_prefix_.family);
- u8 *name_ip_buffer =
- const_cast<u8 *>(ip_address_get_buffer(&name_ip, ip_prefix_.family));
-
- memcpy(name_ip_buffer, ip_prefix_buffer, ip_prefix_.len / 8);
-
- if (ip_prefix_.len != (ip_prefix_.family == AF_INET6 ? IPV6_ADDR_LEN_BITS
- : IPV4_ADDR_LEN_BITS)) {
- uint8_t mask = 0xFF >> (ip_prefix_.len % 8);
- name_ip_buffer[ip_prefix_.len / 8 + 1] =
- (name_ip_buffer[ip_prefix_.len / 8 + 1] & mask) |
- (ip_prefix_buffer[ip_prefix_.len / 8 + 1] & ~mask);
+ hicn_ip_address_t name_ip = content_name.toIpAddress().address;
+ const u8 *hicn_ip_prefix_buffer = hicn_ip_address_get_buffer(
+ &(hicn_ip_prefix_.address), hicn_ip_prefix_.family);
+ u8 *name_ip_buffer = const_cast<u8 *>(
+ hicn_ip_address_get_buffer(&name_ip, hicn_ip_prefix_.family));
+
+ memcpy(name_ip_buffer, hicn_ip_prefix_buffer, hicn_ip_prefix_.len / 8);
+
+ if (hicn_ip_prefix_.len != (hicn_ip_prefix_.family == AF_INET6
+ ? IPV6_ADDR_LEN_BITS
+ : IPV4_ADDR_LEN_BITS)) {
+ uint8_t mask = 0xFF >> (hicn_ip_prefix_.len % 8);
+ name_ip_buffer[hicn_ip_prefix_.len / 8 + 1] =
+ (name_ip_buffer[hicn_ip_prefix_.len / 8 + 1] & mask) |
+ (hicn_ip_prefix_buffer[hicn_ip_prefix_.len / 8 + 1] & ~mask);
}
- return Name(ip_prefix_.family, (uint8_t *)&name_ip);
+ return Name(hicn_ip_prefix_.family, (uint8_t *)&name_ip);
}
-Prefix &Prefix::setNetwork(std::string &network) {
- if (!inet_pton(AF_INET6, network.c_str(), ip_prefix_.address.v6.buffer)) {
+Prefix &Prefix::setNetwork(const std::string &network) {
+ if (hicn_ip_address_pton(network.c_str(), &hicn_ip_prefix_.address) < 0) {
throw errors::RuntimeException("The network name is not valid.");
}
return *this;
}
+Name Prefix::makeName() const { return makeNameWithIndex(0); }
+
Name Prefix::makeRandomName() const {
- srand((unsigned int)time(nullptr));
-
- if (ip_prefix_.family == AF_INET6) {
- std::default_random_engine eng((std::random_device())());
- std::uniform_int_distribution<uint32_t> idis(
- 0, std::numeric_limits<uint32_t>::max());
- uint64_t random_number = idis(eng);
-
- uint32_t hash_size_bits = IPV6_ADDR_LEN_BITS - ip_prefix_.len;
- uint64_t ip_address[2];
- memcpy(ip_address, ip_prefix_.address.v6.buffer, sizeof(uint64_t));
- memcpy(ip_address + 1, ip_prefix_.address.v6.buffer + 8, sizeof(uint64_t));
- std::string network(IPV6_ADDR_LEN * 3, 0);
-
- // Let's do the magic ;)
- int shift_size = hash_size_bits > sizeof(random_number) * 8
- ? sizeof(random_number) * 8
- : hash_size_bits;
-
- ip_address[1] >>= shift_size;
- ip_address[1] <<= shift_size;
-
- ip_address[1] |= random_number >> (sizeof(uint64_t) * 8 - shift_size);
-
- if (!inet_ntop(ip_prefix_.family, ip_address, (char *)network.c_str(),
- IPV6_ADDR_LEN * 3)) {
- throw errors::RuntimeException(
- "Impossible to retrieve network from ip address.");
- }
+ std::default_random_engine eng((std::random_device())());
+ std::uniform_int_distribution<uint32_t> idis(
+ 0, std::numeric_limits<uint32_t>::max());
+ uint64_t random_number = idis(eng);
+
+ return makeNameWithIndex(random_number);
+}
+
+Name Prefix::makeNameWithIndex(std::uint64_t index) const {
+ uint16_t prefix_length = getPrefixLength();
+
+ Name ret;
- return Name(network);
+ // Adjust prefix length depending on the address family
+ if (getAddressFamily() == AF_INET) {
+ // Sanity check
+ DCHECK(prefix_length <= 32);
+ // Convert prefix length to ip46_address_t prefix length
+ prefix_length += IPV4_ADDR_LEN_BITS * 3;
}
- return Name();
+ std::memcpy(ret.getStructReference().prefix.v6.as_u8,
+ hicn_ip_prefix_.address.v6.as_u8, sizeof(hicn_ip_address_t));
+
+ // Convert index in network byte order
+ index = portability::host_to_net(index);
+
+ // Apply mask
+ uint64_t mask;
+ if (prefix_length == 0) {
+ mask = 0;
+ } else if (prefix_length <= 64) {
+ mask = 0;
+ } else if (prefix_length == 128) {
+ mask = 0xffffffffffffffff;
+ } else {
+ prefix_length -= 64;
+ mask = portability::host_to_net((uint64_t)(~0) << (64 - prefix_length));
+ }
+
+ ret.getStructReference().prefix.v6.as_u64[1] &= mask;
+ // Eventually truncate index if too big
+ index &= ~mask;
+
+ // Apply index
+ ret.getStructReference().prefix.v6.as_u64[1] |= index;
+
+ // Done
+ return ret;
}
bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length,
@@ -332,7 +341,9 @@ bool Prefix::checkPrefixLengthAndAddressFamily(uint16_t prefix_length,
return true;
}
-const ip_prefix_t &Prefix::toIpPrefixStruct() const { return ip_prefix_; }
+const hicn_ip_prefix_t &Prefix::toIpPrefixStruct() const {
+ return hicn_ip_prefix_;
+}
} // namespace core
diff --git a/libtransport/src/core/tcp_socket_connector.cc b/libtransport/src/core/tcp_socket_connector.cc
index a30264271..7758e2cf2 100644
--- a/libtransport/src/core/tcp_socket_connector.cc
+++ b/libtransport/src/core/tcp_socket_connector.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -85,8 +85,10 @@ void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
if (packet_sent != 0) {
asio::async_write(socket_, asio::buffer(packet, len),
- [packet_sent](std::error_code ec,
- std::size_t /*length*/) { packet_sent(); });
+ [packet_sent](const std::error_code &ec)
+ std::size_t /*length*/) {
+ packet_sent();
+ });
} else {
if (state_ == ConnectorState::CONNECTED) {
asio::write(socket_, asio::buffer(packet, len));
@@ -151,7 +153,7 @@ void TcpSocketConnector::doWrite() {
asio::async_write(
socket_, std::move(array),
- [this, packet_store = std::move(packet_store)](std::error_code ec,
+ [this, packet_store = std::move(packet_store)](const std::error_code &ec,
std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
if (!output_buffer_.empty()) {
@@ -172,7 +174,7 @@ void TcpSocketConnector::doReadBody(std::size_t body_length) {
asio::async_read(
socket_, asio::buffer(read_msg_->writableTail(), body_length),
asio::transfer_exactly(body_length),
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
read_msg_->append(length);
if (TRANSPORT_EXPECT_TRUE(!ec)) {
receive_callback_(std::move(read_msg_));
@@ -195,7 +197,7 @@ void TcpSocketConnector::doReadHeader() {
asio::buffer(read_msg_->writableData(),
NetworkMessage::fixed_header_length),
asio::transfer_exactly(NetworkMessage::fixed_header_length),
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
read_msg_->append(NetworkMessage::fixed_header_length);
std::size_t body_length = 0;
@@ -235,7 +237,7 @@ void TcpSocketConnector::tryReconnect() {
void TcpSocketConnector::doConnect() {
asio::async_connect(
socket_, endpoint_iterator_,
- [this](std::error_code ec, tcp::resolver::iterator) {
+ [this](const std::error_code &ec, tcp::resolver::iterator) {
if (!ec) {
timer_.cancel();
state_ = ConnectorState::CONNECTED;
diff --git a/libtransport/src/core/tcp_socket_connector.h b/libtransport/src/core/tcp_socket_connector.h
index 21db8301e..2901a5c9d 100644
--- a/libtransport/src/core/tcp_socket_connector.h
+++ b/libtransport/src/core/tcp_socket_connector.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/libtransport/src/core/udp_connector.cc b/libtransport/src/core/udp_connector.cc
new file mode 100644
index 000000000..5f620b1a8
--- /dev/null
+++ b/libtransport/src/core/udp_connector.cc
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#include <core/errors.h>
+#include <core/udp_connector.h>
+#include <glog/logging.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <iostream>
+#include <thread>
+#include <vector>
+
+namespace transport {
+namespace core {
+
+UdpTunnelConnector::~UdpTunnelConnector() {}
+
+void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port,
+ const std::string &bind_address,
+ uint16_t bind_port) {
+ if (state_ == State::CLOSED) {
+ state_ = State::CONNECTING;
+
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), hostname,
+ std::to_string(port));
+
+ endpoint_iterator_ = resolver_.resolve(query);
+ remote_endpoint_send_ = *endpoint_iterator_;
+ socket_->open(remote_endpoint_send_.protocol());
+
+ if (!bind_address.empty() && bind_port != 0) {
+ using namespace asio::ip;
+
+ auto address = address::from_string(bind_address);
+ if (address.is_v6()) {
+ std::error_code ec;
+ socket_->set_option(asio::ip::v6_only(false), ec);
+ // Call succeeds only on dual stack systems.
+ }
+
+ socket_->bind(udp::endpoint(address, bind_port));
+ }
+
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = Endpoint(socket_->local_endpoint());
+
+ auto self = shared_from_this();
+ doConnect(self);
+ }
+}
+
+void UdpTunnelConnector::send(Packet &packet) {
+ send(packet.shared_from_this());
+}
+
+void UdpTunnelConnector::send(const utils::MemBuf::Ptr &buffer) {
+ auto self = shared_from_this();
+ io_service_.post([self, buffer]() {
+ bool write_in_progress = !self->output_buffer_.empty();
+ self->output_buffer_.push_back(std::move(buffer));
+ if (TRANSPORT_EXPECT_TRUE(self->state_ == State::CONNECTED)) {
+ if (!write_in_progress) {
+ self->doSendPacket(self);
+ }
+ } else {
+ self->data_available_ = true;
+ }
+ });
+}
+
+void UdpTunnelConnector::close() {
+ DLOG_IF(INFO, VLOG_IS_ON(2)) << "UDPTunnelConnector::close";
+ state_ = State::CLOSED;
+ bool is_socket_owned = socket_.use_count() == 1;
+ if (is_socket_owned) {
+ // Here we use a shared ptr to keep the object alive until we call the close
+ // function
+ auto self = shared_from_this();
+ io_service_.dispatch([this, self]() {
+ socket_->close();
+ // on_close_callback_(shared_from_this());
+ });
+ }
+}
+
+void UdpTunnelConnector::doSendPacket(
+ const std::shared_ptr<UdpTunnelConnector> &self) {
+#ifdef LINUX
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait([self](const std::error_code &ec) {
+ if (ec) {
+ return;
+ }
+
+ self->writeHandler();
+ });
+#else
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const ::utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_->async_send(std::move(array), [this, self](const std::error_code &ec,
+ std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ sent_callback_(this, make_error_code(core_error::success));
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ sendFailed();
+ sent_callback_(this, ec);
+ }
+
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doSendPacket(self);
+ }
+ });
+#endif
+}
+
+void UdpTunnelConnector::retryConnection() {
+ // The connection was refused. In this case let's retry to reconnect.
+ connection_reattempts_++;
+ LOG(ERROR) << "Error in UDP: Connection refused. Retrying...";
+ state_ = State::CONNECTING;
+ timer_.expires_from_now(std::chrono::milliseconds(500));
+ std::weak_ptr<UdpTunnelConnector> self = shared_from_this();
+ timer_.async_wait([self, this](const std::error_code &ec) {
+ if (ec) {
+ }
+ if (auto ptr = self.lock()) {
+ doConnect(ptr);
+ }
+ });
+ return;
+}
+
+#ifdef LINUX
+void UdpTunnelConnector::writeHandler() {
+ if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) {
+ return;
+ }
+
+ auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst));
+
+ if (len) {
+ int m = 0;
+ for (auto &p : output_buffer_) {
+ auto packet = p.get();
+ ::utils::MemBuf *current = packet;
+ int b = 0;
+ do {
+ // array.push_back(asio::const_buffer(current->data(),
+ // current->length()));
+ tx_iovecs_[m][b].iov_base = current->writableData();
+ tx_iovecs_[m][b].iov_len = current->length();
+ current = current->next();
+ b++;
+ } while (current != packet);
+
+ tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m];
+ tx_msgs_[m].msg_hdr.msg_iovlen = b;
+ tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data();
+ tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size();
+ m++;
+
+ if (--len == 0) {
+ break;
+ }
+ }
+
+ int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT);
+ if (retval > 0) {
+ while (retval--) {
+ output_buffer_.pop_front();
+ }
+ } else if (errno != EWOULDBLOCK && errno != EAGAIN) { // NOSONAR
+ LOG(ERROR) << "Error sending messages: " << strerror(errno);
+ sent_callback_(this, make_error_code(core_error::send_failed));
+ return;
+ }
+ }
+
+ if (!output_buffer_.empty()) {
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ std::weak_ptr<UdpTunnelConnector> self = shared_from_this();
+ send_timer_.async_wait([self](const std::error_code &ec) {
+ if (ec) {
+ return;
+ }
+ if (auto ptr = self.lock()) {
+ ptr->writeHandler();
+ }
+ });
+ } else {
+ sent_callback_(this, make_error_code(core_error::success));
+ }
+}
+
+void UdpTunnelConnector::readHandler(const std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
+
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < max_burst; i++) {
+ auto read_buffer = getRawBuffer();
+ rx_iovecs_[i][0].iov_base = read_buffer.first;
+ rx_iovecs_[i][0].iov_len = read_buffer.second;
+ rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i];
+ rx_msgs_[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_,
+ max_burst - current_position_, MSG_DONTWAIT, nullptr);
+ if (res < 0) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) { // NOSONAR
+ // Try again later
+ return;
+ }
+
+ if (errno == ECONNREFUSED &&
+ connection_reattempts_ < max_reconnection_reattempts) {
+ retryConnection();
+ return;
+ }
+
+ LOG(ERROR) << "Error receiving messages! " << strerror(errno) << " "
+ << res;
+ std::vector<utils::MemBuf::Ptr> v;
+ auto ec = make_error_code(core_error::receive_failed);
+
+ receive_callback_(this, v, ec);
+ return;
+ }
+
+ std::vector<utils::MemBuf::Ptr> v;
+ v.reserve(res);
+ for (int i = 0; i < res; i++) {
+ auto packet = getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ rx_msgs_[current_position_].msg_len);
+ receiveSuccess(*packet);
+ v.push_back(std::move(packet));
+ ++current_position_;
+ }
+
+ receive_callback_(this, v, make_error_code(core_error::success));
+
+ doRecvPacket();
+ } else {
+ LOG(ERROR) << "Error in UDP: Receiving packets from a not "
+ "connected socket.";
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ // receive_callback_(this, *read_msg_, ec);
+ LOG(ERROR) << "Error in UDP connector: " << ec.value() << " "
+ << ec.message();
+ } else {
+ LOG(ERROR) << "Error in connector while not connected. " << ec.value()
+ << " " << ec.message();
+ }
+ }
+}
+#endif
+
+void UdpTunnelConnector::doRecvPacket() {
+ std::weak_ptr<UdpTunnelConnector> self = shared_from_this();
+#ifdef LINUX
+ if (state_ == State::CONNECTED) {
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(asio::null_buffers(),
+#else
+ socket_->async_wait(asio::ip::tcp::socket::wait_read,
+#endif
+ [self](const std::error_code &ec) {
+ if (ec) {
+ LOG(ERROR)
+ << "Error in UDP connector: " << ec.value()
+ << " " << ec.message();
+ return;
+ }
+ if (auto ptr = self.lock()) {
+ ptr->readHandler(ec);
+ }
+ });
+ }
+#else
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
+ read_msg_ = getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_,
+ [this, self](const std::error_code &ec, std::size_t length) {
+ if (auto ptr = self.lock()) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "UdpTunnelConnector received packet length=" << length;
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ auto packet = getPacketFromBuffer(read_msg_.first, length);
+ receiveSuccess(*packet);
+ std::vector<utils::MemBuf::Ptr> v{std::move(packet)};
+ receive_callback_(this, v, make_error_code(core_error::success));
+ doRecvPacket();
+ } else {
+ LOG(ERROR) << "Error in UDP: Receiving packets from a not "
+ "connected socket.";
+ }
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::connection_refused)) {
+ if (connection_reattempts_ < max_reconnection_reattempts) {
+ retryConnection();
+ }
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ LOG(ERROR) << "Error in UDP connector: " << ec.value()
+ << ec.message();
+ } else {
+ LOG(ERROR) << "Error while not connected";
+ }
+ }
+ }
+ });
+#endif
+}
+
+void UdpTunnelConnector::doConnect(
+ std::shared_ptr<UdpTunnelConnector> &self_shared) {
+ std::weak_ptr<UdpTunnelConnector> self = self_shared;
+ asio::async_connect(*socket_, endpoint_iterator_,
+ [this, self](const std::error_code &ec,
+ asio::ip::udp::resolver::iterator) {
+ if (auto ptr = self.lock()) {
+ if (!ec) {
+ state_ = State::CONNECTED;
+ doRecvPacket();
+
+ if (data_available_) {
+ data_available_ = false;
+ doSendPacket(ptr);
+ }
+
+ on_reconnect_callback_(
+ this, make_error_code(core_error::success));
+ } else {
+ LOG(ERROR) << "UDP Connection failed!!!";
+ retryConnection();
+ }
+ }
+ });
+}
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/core/udp_connector.h b/libtransport/src/core/udp_connector.h
new file mode 100644
index 000000000..002f4ca9f
--- /dev/null
+++ b/libtransport/src/core/udp_connector.h
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <core/errors.h>
+#include <hicn/transport/core/asio_wrapper.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+
+#include <iostream>
+#include <memory>
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener;
+
+class UdpTunnelConnector : public Connector {
+ friend class UdpTunnelListener;
+
+ public:
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect>
+ UdpTunnelConnector(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+ io_service_(io_service),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_)),
+ resolver_(io_service_),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{},
+ tx_msgs_{},
+ rx_iovecs_{},
+ rx_msgs_{},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ }
+
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect, typename EndpointType>
+ UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket,
+ std::shared_ptr<asio::io_service::strand> &strand,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect, EndpointType &&remote_endpoint)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ io_service_(socket->get_io_service()),
+#else
+ io_service_((asio::io_context &)(socket->get_executor().context())),
+#endif
+ socket_(socket),
+ resolver_(io_service_),
+ remote_endpoint_send_(std::forward<EndpointType>(remote_endpoint)),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{},
+ tx_msgs_{},
+ rx_iovecs_{},
+ rx_msgs_{},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ if (socket_->is_open()) {
+ state_ = State::CONNECTED;
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = socket_->local_endpoint();
+ }
+ }
+
+ ~UdpTunnelConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const utils::MemBuf::Ptr &buffer) override;
+
+ void close() override;
+
+ void connect(const std::string &hostname, std::uint16_t port,
+ const std::string &bind_address = "",
+ std::uint16_t bind_port = 0);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ void retryConnection();
+ void doConnect(std::shared_ptr<UdpTunnelConnector> &self);
+ void doRecvPacket();
+
+ void doRecvPacket(utils::MemBuf::Ptr &buffer) {
+ std::vector<utils::MemBuf::Ptr> v{std::move(buffer)};
+ receive_callback_(this, v, make_error_code(core_error::success));
+ }
+
+#ifdef LINUX
+ void readHandler(const std::error_code &ec);
+ void writeHandler();
+#endif
+
+ void setConnected() { state_ = State::CONNECTED; }
+
+ void doSendPacket(const std::shared_ptr<UdpTunnelConnector> &self);
+ void doClose();
+
+ private:
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::ip::udp::endpoint remote_endpoint_send_;
+ asio::ip::udp::endpoint remote_endpoint_recv_;
+
+ asio::steady_timer timer_;
+
+#ifdef LINUX
+ asio::steady_timer send_timer_;
+ struct iovec tx_iovecs_[max_burst][8];
+ struct mmsghdr tx_msgs_[max_burst];
+ struct iovec rx_iovecs_[max_burst][8];
+ struct mmsghdr rx_msgs_[max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+
+ bool data_available_;
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/core/udp_listener.cc b/libtransport/src/core/udp_listener.cc
new file mode 100644
index 000000000..caa97e0ee
--- /dev/null
+++ b/libtransport/src/core/udp_listener.cc
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#include <core/udp_connector.h>
+#include <core/udp_listener.h>
+#include <glog/logging.h>
+#include <hicn/transport/portability/endianess.h>
+#include <hicn/transport/utils/hash.h>
+
+#ifndef LINUX
+namespace std {
+size_t hash<asio::ip::udp::endpoint>::operator()(
+ const asio::ip::udp::endpoint &endpoint) const {
+ auto hash_ip = endpoint.address().is_v4()
+ ? endpoint.address().to_v4().to_ulong()
+ : utils::hash::fnv32_buf(
+ endpoint.address().to_v6().to_bytes().data(), 16);
+ uint16_t port = endpoint.port();
+ return utils::hash::fnv32_buf(&port, 2, (unsigned int)hash_ip);
+}
+} // namespace std
+#endif
+
+namespace transport {
+namespace core {
+
+UdpTunnelListener::~UdpTunnelListener() {}
+
+void UdpTunnelListener::close() {
+ strand_->post([this]() {
+ if (socket_->is_open()) {
+ socket_->close();
+ }
+ });
+}
+
+#ifdef LINUX
+void UdpTunnelListener::readHandler(const std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
+
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < Connector::max_burst; i++) {
+ auto read_buffer = Connector::getRawBuffer();
+ iovecs_[i][0].iov_base = read_buffer.first;
+ iovecs_[i][0].iov_len = read_buffer.second;
+ msgs_[i].msg_hdr.msg_iov = iovecs_[i];
+ msgs_[i].msg_hdr.msg_iovlen = 1;
+ msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i];
+ msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]);
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_,
+ Connector::max_burst - current_position_, MSG_DONTWAIT,
+ nullptr);
+ if (res < 0) {
+ LOG(ERROR) << "Error in recvmmsg.";
+ return;
+ }
+
+ for (int i = 0; i < res; i++) {
+ auto packet = Connector::getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ msgs_[current_position_].msg_len);
+ auto connector_id =
+ utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name,
+ msgs_[current_position_].msg_hdr.msg_namelen);
+
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+
+ /*
+ * Get the remote endpoint for this particular message
+ */
+ using namespace asio::ip;
+ if (local_endpoint_.address().is_v4()) {
+ auto addr = reinterpret_cast<struct sockaddr_in *>(
+ &remote_endpoints_[current_position_]);
+ address_v4::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v4 address(address_bytes);
+ remote_endpoint_ =
+ udp::endpoint(address, portability::net_to_host(addr->sin_port));
+ } else {
+ auto addr = reinterpret_cast<struct sockaddr_in6 *>(
+ &remote_endpoints_[current_position_]);
+ address_v6::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v6 address(address_bytes);
+ remote_endpoint_ =
+ udp::endpoint(address, portability::net_to_host(addr->sin6_port));
+ }
+
+ /**
+ * Create new connector sharing the same socket of this listener.
+ */
+ auto ret = connectors_.emplace(
+ connector_id,
+ std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {}, [](Connector *) {},
+ [](Connector *, const std::error_code &) {},
+ std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ /**
+ * Use connector callback to process incoming message.
+ */
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(packet);
+
+ ++current_position_;
+ }
+
+ doRecvPacket();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else {
+ LOG(ERROR) << ec.value() << " " << ec.message();
+ }
+}
+#endif
+
+void UdpTunnelListener::doRecvPacket() {
+#ifdef LINUX
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(
+ asio::null_buffers(),
+#else
+ socket_->async_wait(
+ asio::ip::tcp::socket::wait_read,
+#endif
+ std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1));
+#else
+ read_msg_ = Connector::getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_,
+ [this](const std::error_code &ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ auto packet = Connector::getPacketFromBuffer(read_msg_.first, length);
+ auto connector_id =
+ std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_);
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+ auto ret = connectors_.emplace(
+ connector_id, std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {},
+ [](Connector *) {},
+ [](Connector *, const std::error_code &) {},
+ std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(packet);
+ doRecvPacket();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else {
+ LOG(ERROR) << ec.value() << " " << ec.message();
+ }
+ });
+#endif
+}
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/udp_listener.h b/libtransport/src/core/udp_listener.h
new file mode 100644
index 000000000..d8095a262
--- /dev/null
+++ b/libtransport/src/core/udp_listener.h
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/asio_wrapper.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+
+#include <unordered_map>
+
+namespace std {
+template <>
+struct hash<asio::ip::udp::endpoint> {
+ size_t operator()(const asio::ip::udp::endpoint &endpoint) const;
+};
+} // namespace std
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener
+ : public std::enable_shared_from_this<UdpTunnelListener> {
+ using PacketReceivedCallback = Connector::PacketReceivedCallback;
+ using EndpointId = std::pair<uint32_t, uint16_t>;
+
+ static constexpr uint16_t default_port = 5004;
+
+ public:
+ using Ptr = std::shared_ptr<UdpTunnelListener>;
+
+ template <typename ReceiveCallback>
+ UdpTunnelListener(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint(
+ asio::ip::udp::v4(), default_port))
+ : io_service_(io_service),
+ strand_(std::make_shared<asio::io_service::strand>(io_service_)),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_,
+ endpoint.protocol())),
+ local_endpoint_(endpoint),
+ receive_callback_(std::forward<ReceiveCallback>(receive_callback)),
+#ifndef LINUX
+ read_msg_(nullptr, 0)
+#else
+ iovecs_{},
+ msgs_{},
+ current_position_(0)
+#endif
+ {
+ if (endpoint.protocol() == asio::ip::udp::v6()) {
+ std::error_code ec;
+ socket_->set_option(asio::ip::v6_only(false), ec);
+ // Call succeeds only on dual stack systems.
+ }
+ socket_->bind(local_endpoint_);
+ io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this));
+ }
+
+ ~UdpTunnelListener();
+
+ void close();
+
+ int deleteConnector(Connector *connector) {
+ return (int)connectors_.erase(connector->getConnectorId());
+ }
+
+ template <typename ReceiveCallback>
+ void setReceiveCallback(ReceiveCallback &&callback) {
+ receive_callback_ = std::forward<ReceiveCallback>(callback);
+ }
+
+ Connector *findConnector(Connector::Id connId) {
+ auto it = connectors_.find(connId);
+ if (it != connectors_.end()) {
+ return it->second.get();
+ }
+
+ return nullptr;
+ }
+
+ private:
+ void doRecvPacket();
+
+ void readHandler(const std::error_code &ec);
+
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::io_service::strand> strand_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::endpoint local_endpoint_;
+ asio::ip::udp::endpoint remote_endpoint_;
+ std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_;
+
+ PacketReceivedCallback receive_callback_;
+
+#ifdef LINUX
+ struct iovec iovecs_[Connector::max_burst][8];
+ struct mmsghdr msgs_[Connector::max_burst];
+ struct sockaddr_storage remote_endpoints_[Connector::max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+};
+
+} // namespace core
+
+} // namespace transport