aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/core
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar@cisco.com>2022-03-30 22:29:28 +0200
committerMauro Sardara <msardara@cisco.com>2022-03-31 19:51:47 +0200
commitc46e5df56b67bb8ea7a068d39324c640084ead2b (patch)
treeeddeb17785938e09bc42eec98ee09b8a28846de6 /libtransport/src/core
parent18fa668f25d3cc5463417ce7df6637e31578e898 (diff)
feat: boostrap hicn 22.02
The current patch provides several new features, improvements, bug fixes and also complete rewrite of entire components. - lib The hicn packet parser has been improved with a new packet format fully based on UDP. The TCP header is still temporarily supported but the UDP header will replace completely the new hicn packet format. Improvements have been made to make sure every packet parsing operation is made via this library. The current new header can be used as header between the payload and the UDP header or as trailer in the UDP surplus area to be tested when UDP options will start to be used. - hicn-light The portable packet forwarder has been completely rewritten from scratch with the twofold objective to improve performance and code size but also to drop dependencies such as libparc which is now removed by the current implementation. - hicn control the control library is the agent that is used to program the packet forwarders via their binary API. This component has benefited from significant improvements in terms of interaction model which is now event driven and more robust to failures. - VPP plugin has been updated to support VPP 22.02 - transport Major improvement have been made to the RTC protocol, to the support of IO modules and to the security sub system. Signed manifests are the default data authenticity and integrity framework. Confidentiality can be enabled by sharing the encryption key to the prod/cons layer. The library has been tested with group key based applications such as broadcast/multicast and real-time on-line meetings with trusted server keys or MLS. - testing Unit testing has been introduced using GoogleTest. One third of the code base is covered by unit testing with priority on critical features. Functional testing has also been introduce using Docker, linux bridging and Robot Framework to define test with Less Code techniques to facilitate the extension of the coverage. Co-authored-by: Mauro Sardara <msardara@cisco.com> Co-authored-by: Jordan Augé <jordan.auge+fdio@cisco.com> Co-authored-by: Michele Papalini <micpapal@cisco.com> Co-authored-by: Angelo Mantellini <manangel@cisco.com> Co-authored-by: Jacques Samain <jsamain@cisco.com> Co-authored-by: Olivier Roques <oroques+fdio@cisco.com> Co-authored-by: Enrico Loparco <eloparco@cisco.com> Co-authored-by: Giulio Grassi <gigrassi@cisco.com> Change-Id: I75d0ef70f86d921e3ef503c99271216ff583c215 Signed-off-by: Luca Muscariello <muscariello@ieee.org> Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/core')
-rw-r--r--libtransport/src/core/CMakeLists.txt19
-rw-r--r--libtransport/src/core/content_object.cc39
-rw-r--r--libtransport/src/core/errors.cc11
-rw-r--r--libtransport/src/core/errors.h7
-rw-r--r--libtransport/src/core/facade.h2
-rw-r--r--libtransport/src/core/global_workers.h43
-rw-r--r--libtransport/src/core/interest.cc19
-rw-r--r--libtransport/src/core/io_module.cc9
-rw-r--r--libtransport/src/core/local_connector.cc12
-rw-r--r--libtransport/src/core/local_connector.h4
-rw-r--r--libtransport/src/core/manifest.cc2
-rw-r--r--libtransport/src/core/manifest.h74
-rw-r--r--libtransport/src/core/manifest_format.h108
-rw-r--r--libtransport/src/core/manifest_format_fixed.cc318
-rw-r--r--libtransport/src/core/manifest_format_fixed.h228
-rw-r--r--libtransport/src/core/manifest_inline.h42
-rw-r--r--libtransport/src/core/memif_connector.cc497
-rw-r--r--libtransport/src/core/memif_connector.h154
-rw-r--r--libtransport/src/core/name.cc60
-rw-r--r--libtransport/src/core/packet.cc618
-rw-r--r--libtransport/src/core/pending_interest.cc74
-rw-r--r--libtransport/src/core/pending_interest.h26
-rw-r--r--libtransport/src/core/portal.cc10
-rw-r--r--libtransport/src/core/portal.h550
-rw-r--r--libtransport/src/core/prefix.cc14
-rw-r--r--libtransport/src/core/tcp_socket_connector.cc16
-rw-r--r--libtransport/src/core/tcp_socket_connector.h2
-rw-r--r--libtransport/src/core/udp_connector.cc371
-rw-r--r--libtransport/src/core/udp_connector.h145
-rw-r--r--libtransport/src/core/udp_listener.cc179
-rw-r--r--libtransport/src/core/udp_listener.h109
31 files changed, 2740 insertions, 1022 deletions
diff --git a/libtransport/src/core/CMakeLists.txt b/libtransport/src/core/CMakeLists.txt
index e442bb863..b9b024d60 100644
--- a/libtransport/src/core/CMakeLists.txt
+++ b/libtransport/src/core/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
@@ -22,6 +22,9 @@ list(APPEND HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/errors.h
${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.h
${CMAKE_CURRENT_SOURCE_DIR}/local_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/global_workers.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_connector.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_listener.h
)
list(APPEND SOURCE_FILES
@@ -36,7 +39,21 @@ list(APPEND SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/global_configuration.cc
${CMAKE_CURRENT_SOURCE_DIR}/io_module.cc
${CMAKE_CURRENT_SOURCE_DIR}/local_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_connector.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_listener.cc
)
+if (NOT ${CMAKE_SYSTEM_NAME} MATCHES Android)
+ if (UNIX AND NOT APPLE)
+ list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.cc
+ )
+
+ list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/memif_connector.h
+ )
+ endif()
+endif()
+
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
diff --git a/libtransport/src/core/content_object.cc b/libtransport/src/core/content_object.cc
index 411494fdf..643e0388e 100644
--- a/libtransport/src/core/content_object.cc
+++ b/libtransport/src/core/content_object.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -35,6 +35,11 @@ namespace core {
ContentObject::ContentObject(const Name &name, Packet::Format format,
std::size_t additional_header_size)
: Packet(format, additional_header_size) {
+ if (TRANSPORT_EXPECT_FALSE(hicn_packet_set_data(format_, packet_start_) <
+ 0)) {
+ throw errors::MalformedPacketException();
+ }
+
if (TRANSPORT_EXPECT_FALSE(
hicn_data_set_name(format, packet_start_, &name.name_) < 0)) {
throw errors::RuntimeException("Error filling the packet name.");
@@ -47,15 +52,20 @@ ContentObject::ContentObject(const Name &name, Packet::Format format,
}
}
-#ifdef __ANDROID__
ContentObject::ContentObject(hicn_format_t format,
std::size_t additional_header_size)
- : ContentObject(Name("0::0|0"), format, additional_header_size) {}
+ : ContentObject(
+#ifdef __ANDROID__
+ Name("0::0|0"),
#else
-ContentObject::ContentObject(hicn_format_t format,
- std::size_t additional_header_size)
- : ContentObject(Packet::base_name, format, additional_header_size) {}
+ Packet::base_name,
#endif
+ format, additional_header_size) {
+ if (TRANSPORT_EXPECT_FALSE(hicn_packet_set_data(format_, packet_start_) <
+ 0)) {
+ throw errors::MalformedPacketException();
+ }
+}
ContentObject::ContentObject(const Name &name, hicn_format_t format,
std::size_t additional_header_size,
@@ -167,6 +177,23 @@ void ContentObject::resetForHash() {
}
}
+bool ContentObject::isLast() const {
+ int is_last = 0;
+ if (hicn_data_is_last(format_, packet_start_, &is_last) < 0) {
+ throw errors::RuntimeException(
+ "Impossible to get last data flag from packet header.");
+ }
+
+ return is_last;
+}
+
+void ContentObject::setLast() {
+ if (hicn_data_set_last(format_, packet_start_) < 0) {
+ throw errors::RuntimeException(
+ "Impossible to set last data flag to packet header.");
+ }
+}
+
} // end namespace core
} // end namespace transport
diff --git a/libtransport/src/core/errors.cc b/libtransport/src/core/errors.cc
index 82647a60b..68fd7bf38 100644
--- a/libtransport/src/core/errors.cc
+++ b/libtransport/src/core/errors.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -39,6 +39,15 @@ std::string core_category_impl::message(int ev) const {
case core_error::configuration_not_applied: {
return "Configuration was not applied due to wrong parameters.";
}
+ case core_error::send_failed: {
+ return "Error sending data to socket.";
+ }
+ case core_error::send_buffer_allocation_failed: {
+ return "Error allocating buffers to send data.";
+ }
+ case core_error::receive_failed: {
+ return "Error receiving data from socket.";
+ }
default: {
return "Unknown core error";
}
diff --git a/libtransport/src/core/errors.h b/libtransport/src/core/errors.h
index a46f1dbcd..4532e6dc5 100644
--- a/libtransport/src/core/errors.h
+++ b/libtransport/src/core/errors.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -36,7 +36,10 @@ const std::error_category& core_category();
enum class core_error {
success = 0,
configuration_parse_failed,
- configuration_not_applied
+ configuration_not_applied,
+ send_failed,
+ send_buffer_allocation_failed,
+ receive_failed
};
/**
diff --git a/libtransport/src/core/facade.h b/libtransport/src/core/facade.h
index 199081271..1ad4437e2 100644
--- a/libtransport/src/core/facade.h
+++ b/libtransport/src/core/facade.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/libtransport/src/core/global_workers.h b/libtransport/src/core/global_workers.h
new file mode 100644
index 000000000..1ac254188
--- /dev/null
+++ b/libtransport/src/core/global_workers.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/singleton.h>
+#include <hicn/transport/utils/thread_pool.h>
+
+#include <atomic>
+#include <mutex>
+
+namespace transport {
+namespace core {
+
+class GlobalWorkers : public utils::Singleton<GlobalWorkers> {
+ public:
+ friend class utils::Singleton<GlobalWorkers>;
+
+ ::utils::EventThread& getWorker() {
+ return thread_pool_.getWorker(counter_++ % thread_pool_.getNThreads());
+ }
+
+ private:
+ GlobalWorkers() : counter_(0), thread_pool_() {}
+
+ std::atomic_uint16_t counter_;
+ ::utils::ThreadPool thread_pool_;
+};
+
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/interest.cc b/libtransport/src/core/interest.cc
index 9d868ced0..b7719b3ed 100644
--- a/libtransport/src/core/interest.cc
+++ b/libtransport/src/core/interest.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -34,6 +34,10 @@ namespace core {
Interest::Interest(const Name &interest_name, Packet::Format format,
std::size_t additional_header_size)
: Packet(format, additional_header_size) {
+ if (hicn_packet_set_interest(format_, packet_start_) < 0) {
+ throw errors::MalformedPacketException();
+ }
+
if (hicn_interest_set_name(format_, packet_start_,
interest_name.getConstStructReference()) < 0) {
throw errors::MalformedPacketException();
@@ -45,13 +49,18 @@ Interest::Interest(const Name &interest_name, Packet::Format format,
}
}
-#ifdef __ANDROID__
Interest::Interest(hicn_format_t format, std::size_t additional_header_size)
- : Interest(Name("0::0|0"), format, additional_header_size) {}
+ : Interest(
+#ifdef __ANDROID__
+ Name("0::0|0"),
#else
-Interest::Interest(hicn_format_t format, std::size_t additional_header_size)
- : Interest(base_name, format, additional_header_size) {}
+ base_name,
#endif
+ format, additional_header_size) {
+ if (hicn_packet_set_interest(format_, packet_start_) < 0) {
+ throw errors::MalformedPacketException();
+ }
+}
Interest::Interest(MemBuf &&buffer) : Packet(std::move(buffer)) {
if (hicn_interest_get_name(format_, packet_start_,
diff --git a/libtransport/src/core/io_module.cc b/libtransport/src/core/io_module.cc
index a751eabf5..69e4e8bcf 100644
--- a/libtransport/src/core/io_module.cc
+++ b/libtransport/src/core/io_module.cc
@@ -19,8 +19,10 @@
#include <glog/logging.h>
#include <hicn/transport/core/io_module.h>
+#include <iostream>
+
#ifdef ANDROID
-#include <io_modules/udp/hicn_forwarder_module.h>
+#include <io_modules/hicn-light-ng/hicn_forwarder_module.h>
#elif _WIN32
#include <hicn/util/windows/windows_utils.h>
#endif
@@ -55,8 +57,9 @@ IoModule *IoModule::load(const char *module_name) {
if (!creator) {
if ((error = dlerror()) != 0) {
LOG(ERROR) << error;
- return 0;
}
+
+ return 0;
}
// create object and return it
@@ -85,4 +88,4 @@ bool IoModule::unload(IoModule *module) {
}
} // namespace core
-} // namespace transport \ No newline at end of file
+} // namespace transport
diff --git a/libtransport/src/core/local_connector.cc b/libtransport/src/core/local_connector.cc
index 50dadc677..f27be2e5c 100644
--- a/libtransport/src/core/local_connector.cc
+++ b/libtransport/src/core/local_connector.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -32,13 +32,17 @@ void LocalConnector::send(Packet &packet) {
return;
}
+ auto buffer =
+ std::static_pointer_cast<utils::MemBuf>(packet.shared_from_this());
+
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending packet to local socket.";
- io_service_.get().post([this, p{packet.shared_from_this()}]() mutable {
- receive_callback_(this, *p, std::make_error_code(std::errc(0)));
+ io_service_.get().post([this, buffer]() mutable {
+ std::vector<utils::MemBuf::Ptr> v{std::move(buffer)};
+ receive_callback_(this, v, std::make_error_code(std::errc(0)));
});
}
-void LocalConnector::send(const uint8_t *packet, std::size_t len) {
+void LocalConnector::send(const utils::MemBuf::Ptr &buffer) {
throw errors::NotImplementedException();
}
diff --git a/libtransport/src/core/local_connector.h b/libtransport/src/core/local_connector.h
index 0e2d8f676..eede89e74 100644
--- a/libtransport/src/core/local_connector.h
+++ b/libtransport/src/core/local_connector.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2020 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -42,7 +42,7 @@ class LocalConnector : public Connector {
void send(Packet &packet) override;
- void send(const uint8_t *packet, std::size_t len) override;
+ void send(const utils::MemBuf::Ptr &buffer) override;
void close() override;
diff --git a/libtransport/src/core/manifest.cc b/libtransport/src/core/manifest.cc
index 3f890f3d0..da2689426 100644
--- a/libtransport/src/core/manifest.cc
+++ b/libtransport/src/core/manifest.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/libtransport/src/core/manifest.h b/libtransport/src/core/manifest.h
index 9b25ebd67..5bdbfc6ff 100644
--- a/libtransport/src/core/manifest.h
+++ b/libtransport/src/core/manifest.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -16,6 +16,7 @@
#pragma once
#include <core/manifest_format.h>
+#include <glog/logging.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/name.h>
@@ -40,17 +41,20 @@ class Manifest : public Base {
using Encoder = typename FormatTraits::Encoder;
using Decoder = typename FormatTraits::Decoder;
- Manifest(std::size_t signature_size = 0)
- : Base(HF_INET6_TCP_AH, signature_size),
+ Manifest(Packet::Format format, std::size_t signature_size = 0)
+ : Base(format, signature_size),
encoder_(*this, signature_size),
decoder_(*this) {
+ DCHECK(_is_ah(format));
Base::setPayloadType(PayloadType::MANIFEST);
}
- Manifest(const core::Name &name, std::size_t signature_size = 0)
- : Base(name, HF_INET6_TCP_AH, signature_size),
+ Manifest(Packet::Format format, const core::Name &name,
+ std::size_t signature_size = 0)
+ : Base(name, format, signature_size),
encoder_(*this, signature_size),
decoder_(*this) {
+ DCHECK(_is_ah(format));
Base::setPayloadType(PayloadType::MANIFEST);
}
@@ -62,6 +66,11 @@ class Manifest : public Base {
Base::setPayloadType(PayloadType::MANIFEST);
}
+ // Useful for decoding manifests while avoiding packet copy
+ template <typename T>
+ Manifest(T &base)
+ : Base(base.getFormat()), encoder_(base, 0, false), decoder_(base) {}
+
virtual ~Manifest() = default;
std::size_t estimateManifestSize(std::size_t additional_entries = 0) {
@@ -78,24 +87,27 @@ class Manifest : public Base {
Manifest &decode() {
Manifest::decoder_.decode();
- manifest_type_ = decoder_.getManifestType();
+ manifest_type_ = decoder_.getType();
+ manifest_transport_type_ = decoder_.getTransportType();
hash_algorithm_ = decoder_.getHashAlgorithm();
- is_last_ = decoder_.getIsFinalManifest();
+ is_last_ = decoder_.getIsLast();
return static_cast<ManifestImpl &>(*this).decodeImpl();
}
- static std::size_t getManifestHeaderSize() {
- return Encoder::getManifestHeaderSize();
+ static std::size_t manifestHeaderSize(
+ interface::ProductionProtocolAlgorithms transport_type =
+ interface::ProductionProtocolAlgorithms::UNKNOWN) {
+ return Encoder::manifestHeaderSize(transport_type);
}
- static std::size_t getManifestEntrySize() {
- return Encoder::getManifestEntrySize();
+ static std::size_t manifestEntrySize() {
+ return Encoder::manifestEntrySize();
}
- Manifest &setManifestType(ManifestType type) {
+ Manifest &setType(ManifestType type) {
manifest_type_ = type;
- encoder_.setManifestType(manifest_type_);
+ encoder_.setType(manifest_type_);
return *this;
}
@@ -105,31 +117,46 @@ class Manifest : public Base {
return *this;
}
- auth::CryptoHashType getHashAlgorithm() { return hash_algorithm_; }
+ auth::CryptoHashType getHashAlgorithm() const { return hash_algorithm_; }
- ManifestType getManifestType() const { return manifest_type_; }
+ ManifestType getType() const { return manifest_type_; }
+
+ interface::ProductionProtocolAlgorithms getTransportType() const {
+ return manifest_transport_type_;
+ }
- bool isFinalManifest() const { return is_last_; }
+ bool getIsLast() const { return is_last_; }
Manifest &setVersion(ManifestVersion version) {
encoder_.setVersion(version);
return *this;
}
- Manifest &setFinalBlockNumber(std::uint32_t final_block_number) {
- encoder_.setFinalBlockNumber(final_block_number);
+ Manifest &setParamsBytestream(const ParamsBytestream &params) {
+ manifest_transport_type_ =
+ interface::ProductionProtocolAlgorithms::BYTE_STREAM;
+ encoder_.setParamsBytestream(params);
return *this;
}
- uint32_t getFinalBlockNumber() const {
- return decoder_.getFinalBlockNumber();
+ Manifest &setParamsRTC(const ParamsRTC &params) {
+ manifest_transport_type_ =
+ interface::ProductionProtocolAlgorithms::RTC_PROD;
+ encoder_.setParamsRTC(params);
+ return *this;
+ }
+
+ ParamsBytestream getParamsBytestream() const {
+ return decoder_.getParamsBytestream();
}
+ ParamsRTC getParamsRTC() const { return decoder_.getParamsRTC(); }
+
ManifestVersion getVersion() const { return decoder_.getVersion(); }
- Manifest &setFinalManifest(bool is_final_manifest) {
- encoder_.setIsFinalManifest(is_final_manifest);
- is_last_ = is_final_manifest;
+ Manifest &setIsLast(bool is_last) {
+ encoder_.setIsLast(is_last);
+ is_last_ = is_last;
return *this;
}
@@ -141,6 +168,7 @@ class Manifest : public Base {
protected:
ManifestType manifest_type_;
+ interface::ProductionProtocolAlgorithms manifest_transport_type_;
auth::CryptoHashType hash_algorithm_;
bool is_last_;
diff --git a/libtransport/src/core/manifest_format.h b/libtransport/src/core/manifest_format.h
index 90d221f5e..38f26067e 100644
--- a/libtransport/src/core/manifest_format.h
+++ b/libtransport/src/core/manifest_format.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -17,6 +17,7 @@
#include <hicn/transport/auth/crypto_hash.h>
#include <hicn/transport/core/name.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
#include <cinttypes>
#include <type_traits>
@@ -26,15 +27,6 @@ namespace transport {
namespace core {
-enum class ManifestFields : uint8_t {
- VERSION,
- HASH_ALGORITHM,
- SEGMENT_CALCULATION_STRATEGY,
- FINAL_MANIFEST,
- NAME_HASH_LIST,
- BASE_NAME
-};
-
enum class ManifestVersion : uint8_t {
VERSION_1 = 1,
};
@@ -45,18 +37,24 @@ enum class ManifestType : uint8_t {
FLIC_MANIFEST = 3,
};
-/**
- * INCREMENTAL: Manifests will be received inline with the data with no specific
- * assumption regarding the manifest capacity. Consumers can send interests
- * using a +1 heuristic.
- *
- * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of
- * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when
- * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3
- */
-enum class NextSegmentCalculationStrategy : uint8_t {
- INCREMENTAL = 1,
- MANIFEST_CAPACITY_BASED = 2,
+struct ParamsRTC {
+ std::uint64_t timestamp;
+ std::uint32_t prod_rate;
+ std::uint32_t prod_seg;
+ std::uint32_t support_fec;
+
+ bool operator==(const ParamsRTC &other) const {
+ return (timestamp == other.timestamp && prod_rate == other.prod_rate &&
+ prod_seg == other.prod_seg && support_fec == other.support_fec);
+ }
+};
+
+struct ParamsBytestream {
+ std::uint32_t final_segment;
+
+ bool operator==(const ParamsBytestream &other) const {
+ return (final_segment == other.final_segment);
+ }
};
template <typename T>
@@ -84,24 +82,14 @@ class ManifestEncoder {
return static_cast<Implementation &>(*this).clearImpl();
}
- ManifestEncoder &setManifestType(ManifestType type) {
- return static_cast<Implementation &>(*this).setManifestTypeImpl(type);
+ ManifestEncoder &setType(ManifestType type) {
+ return static_cast<Implementation &>(*this).setTypeImpl(type);
}
ManifestEncoder &setHashAlgorithm(auth::CryptoHashType hash) {
return static_cast<Implementation &>(*this).setHashAlgorithmImpl(hash);
}
- ManifestEncoder &setFinalChunkNumber(uint32_t final_chunk) {
- return static_cast<Implementation &>(*this).setFinalChunkImpl(final_chunk);
- }
-
- ManifestEncoder &setNextSegmentCalculationStrategy(
- NextSegmentCalculationStrategy strategy) {
- return static_cast<Implementation &>(*this)
- .setNextSegmentCalculationStrategyImpl(strategy);
- }
-
template <
typename T,
typename = std::enable_if_t<std::is_same<
@@ -116,8 +104,8 @@ class ManifestEncoder {
suffix, std::forward<Hash &&>(hash));
}
- ManifestEncoder &setIsFinalManifest(bool is_last) {
- return static_cast<Implementation &>(*this).setIsFinalManifestImpl(is_last);
+ ManifestEncoder &setIsLast(bool is_last) {
+ return static_cast<Implementation &>(*this).setIsLastImpl(is_last);
}
ManifestEncoder &setVersion(ManifestVersion version) {
@@ -133,17 +121,22 @@ class ManifestEncoder {
return static_cast<Implementation &>(*this).updateImpl();
}
- ManifestEncoder &setFinalBlockNumber(std::uint32_t final_block_number) {
- return static_cast<Implementation &>(*this).setFinalBlockNumberImpl(
- final_block_number);
+ ManifestEncoder &setParamsBytestream(const ParamsBytestream &params) {
+ return static_cast<Implementation &>(*this).setParamsBytestreamImpl(params);
+ }
+
+ ManifestEncoder &setParamsRTC(const ParamsRTC &params) {
+ return static_cast<Implementation &>(*this).setParamsRTCImpl(params);
}
- static std::size_t getManifestHeaderSize() {
- return Implementation::getManifestHeaderSizeImpl();
+ static std::size_t manifestHeaderSize(
+ interface::ProductionProtocolAlgorithms transport_type =
+ interface::ProductionProtocolAlgorithms::UNKNOWN) {
+ return Implementation::manifestHeaderSizeImpl(transport_type);
}
- static std::size_t getManifestEntrySize() {
- return Implementation::getManifestEntrySizeImpl();
+ static std::size_t manifestEntrySize() {
+ return Implementation::manifestEntrySizeImpl();
}
};
@@ -158,21 +151,16 @@ class ManifestDecoder {
void decode() { static_cast<Implementation &>(*this).decodeImpl(); }
- ManifestType getManifestType() const {
- return static_cast<const Implementation &>(*this).getManifestTypeImpl();
- }
-
- auth::CryptoHashType getHashAlgorithm() const {
- return static_cast<const Implementation &>(*this).getHashAlgorithmImpl();
+ ManifestType getType() const {
+ return static_cast<const Implementation &>(*this).getTypeImpl();
}
- uint32_t getFinalChunkNumber() const {
- return static_cast<const Implementation &>(*this).getFinalChunkImpl();
+ interface::ProductionProtocolAlgorithms getTransportType() const {
+ return static_cast<const Implementation &>(*this).getTransportTypeImpl();
}
- NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() const {
- return static_cast<const Implementation &>(*this)
- .getNextSegmentCalculationStrategyImpl();
+ auth::CryptoHashType getHashAlgorithm() const {
+ return static_cast<const Implementation &>(*this).getHashAlgorithmImpl();
}
core::Name getBaseName() const {
@@ -183,8 +171,8 @@ class ManifestDecoder {
return static_cast<Implementation &>(*this).getSuffixHashListImpl();
}
- bool getIsFinalManifest() const {
- return static_cast<const Implementation &>(*this).getIsFinalManifestImpl();
+ bool getIsLast() const {
+ return static_cast<const Implementation &>(*this).getIsLastImpl();
}
ManifestVersion getVersion() const {
@@ -196,8 +184,12 @@ class ManifestDecoder {
.estimateSerializedLengthImpl(number_of_entries);
}
- uint32_t getFinalBlockNumber() const {
- return static_cast<const Implementation &>(*this).getFinalBlockNumberImpl();
+ ParamsBytestream getParamsBytestream() const {
+ return static_cast<const Implementation &>(*this).getParamsBytestreamImpl();
+ }
+
+ ParamsRTC getParamsRTC() const {
+ return static_cast<const Implementation &>(*this).getParamsRTCImpl();
}
};
diff --git a/libtransport/src/core/manifest_format_fixed.cc b/libtransport/src/core/manifest_format_fixed.cc
index 11d4a56cb..4c8a5e031 100644
--- a/libtransport/src/core/manifest_format_fixed.cc
+++ b/libtransport/src/core/manifest_format_fixed.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -27,164 +27,288 @@ FixedManifestEncoder::FixedManifestEncoder(Packet &packet,
bool clear)
: packet_(packet),
max_size_(Packet::default_mtu - packet_.headerSize()),
- manifest_header_(reinterpret_cast<ManifestHeader *>(
- packet_.writableData() + packet_.headerSize())),
- manifest_entries_(
- reinterpret_cast<ManifestEntry *>(manifest_header_ + 1)),
- current_entry_(0),
- signature_size_(signature_size) {
+ signature_size_(signature_size),
+ transport_type_(interface::ProductionProtocolAlgorithms::UNKNOWN),
+ encoded_(false),
+ params_bytestream_({0}),
+ params_rtc_({0}) {
+ manifest_meta_ = reinterpret_cast<ManifestMeta *>(packet_.writableData() +
+ packet_.headerSize());
+ manifest_entry_meta_ =
+ reinterpret_cast<ManifestEntryMeta *>(manifest_meta_ + 1);
+
if (clear) {
- *manifest_header_ = {0};
+ *manifest_meta_ = {0};
+ *manifest_entry_meta_ = {0};
}
}
FixedManifestEncoder::~FixedManifestEncoder() {}
FixedManifestEncoder &FixedManifestEncoder::encodeImpl() {
- packet_.append(sizeof(ManifestHeader) +
- manifest_header_->number_of_entries * sizeof(ManifestEntry));
+ if (encoded_) {
+ return *this;
+ }
+
+ manifest_meta_->transport_type = static_cast<uint8_t>(transport_type_);
+ manifest_entry_meta_->nb_entries = manifest_entries_.size();
+
+ packet_.append(FixedManifestEncoder::manifestHeaderSizeImpl());
packet_.updateLength();
+
+ switch (transport_type_) {
+ case interface::ProductionProtocolAlgorithms::BYTE_STREAM:
+ packet_.appendPayload(
+ reinterpret_cast<const uint8_t *>(&params_bytestream_),
+ MANIFEST_PARAMS_BYTESTREAM_SIZE);
+ break;
+ case interface::ProductionProtocolAlgorithms::RTC_PROD:
+ packet_.appendPayload(reinterpret_cast<const uint8_t *>(&params_rtc_),
+ MANIFEST_PARAMS_RTC_SIZE);
+ break;
+ default:
+ break;
+ }
+
+ packet_.appendPayload(
+ reinterpret_cast<const uint8_t *>(manifest_entries_.data()),
+ manifest_entries_.size() * FixedManifestEncoder::manifestEntrySizeImpl());
+
+ if (TRANSPORT_EXPECT_FALSE(packet_.payloadSize() <
+ estimateSerializedLengthImpl())) {
+ throw errors::RuntimeException("Error encoding the manifest");
+ }
+
+ encoded_ = true;
return *this;
}
FixedManifestEncoder &FixedManifestEncoder::clearImpl() {
- packet_.trimEnd(sizeof(ManifestHeader) +
- manifest_header_->number_of_entries * sizeof(ManifestEntry));
- current_entry_ = 0;
- *manifest_header_ = {0};
+ if (encoded_) {
+ packet_.trimEnd(FixedManifestEncoder::manifestHeaderSizeImpl() +
+ manifest_entries_.size() *
+ FixedManifestEncoder::manifestEntrySizeImpl());
+ }
+
+ transport_type_ = interface::ProductionProtocolAlgorithms::UNKNOWN;
+ encoded_ = false;
+ params_bytestream_ = {0};
+ params_rtc_ = {0};
+ *manifest_meta_ = {0};
+ *manifest_entry_meta_ = {0};
+ manifest_entries_.clear();
+
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl(
- auth::CryptoHashType algorithm) {
- manifest_header_->hash_algorithm = static_cast<uint8_t>(algorithm);
+FixedManifestEncoder &FixedManifestEncoder::updateImpl() {
+ max_size_ = Packet::default_mtu - packet_.headerSize() - signature_size_;
+ return *this;
+}
+
+FixedManifestEncoder &FixedManifestEncoder::setVersionImpl(
+ ManifestVersion version) {
+ manifest_meta_->version = static_cast<uint8_t>(version);
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::setManifestTypeImpl(
+FixedManifestEncoder &FixedManifestEncoder::setTypeImpl(
ManifestType manifest_type) {
- manifest_header_->manifest_type = static_cast<uint8_t>(manifest_type);
+ manifest_meta_->type = static_cast<uint8_t>(manifest_type);
+ return *this;
+}
+
+FixedManifestEncoder &FixedManifestEncoder::setHashAlgorithmImpl(
+ auth::CryptoHashType algorithm) {
+ manifest_meta_->hash_algorithm = static_cast<uint8_t>(algorithm);
return *this;
}
-FixedManifestEncoder &
-FixedManifestEncoder::setNextSegmentCalculationStrategyImpl(
- NextSegmentCalculationStrategy strategy) {
- manifest_header_->next_segment_strategy = static_cast<uint8_t>(strategy);
+FixedManifestEncoder &FixedManifestEncoder::setIsLastImpl(bool is_last) {
+ manifest_meta_->is_last = static_cast<uint8_t>(is_last);
return *this;
}
FixedManifestEncoder &FixedManifestEncoder::setBaseNameImpl(
const core::Name &base_name) {
- base_name.copyToDestination(
- reinterpret_cast<uint8_t *>(&manifest_header_->prefix[0]), false);
- manifest_header_->flags.ipv6 =
+ manifest_entry_meta_->is_ipv6 =
base_name.getAddressFamily() == AF_INET6 ? 1_U8 : 0_U8;
+ base_name.copyPrefixToDestination(
+ reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix[0]));
return *this;
}
-FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl(
- uint32_t suffix, const auth::CryptoHash &hash) {
- auto _hash = hash.getDigest();
- addSuffixHashBytes(suffix, _hash.data(), _hash.size());
+FixedManifestEncoder &FixedManifestEncoder::setParamsBytestreamImpl(
+ const ParamsBytestream &params) {
+ transport_type_ = interface::ProductionProtocolAlgorithms::BYTE_STREAM;
+ params_bytestream_ = TransportParamsBytestream{
+ .final_segment = params.final_segment,
+ };
return *this;
}
-void FixedManifestEncoder::addSuffixHashBytes(uint32_t suffix,
- const uint8_t *hash,
- std::size_t length) {
- manifest_entries_[current_entry_].suffix = htonl(suffix);
- // std::copy(hash, hash + length,
- // manifest_entries_[current_entry_].hash);
- std::memcpy(
- reinterpret_cast<uint8_t *>(manifest_entries_[current_entry_].hash), hash,
- length);
+FixedManifestEncoder &FixedManifestEncoder::setParamsRTCImpl(
+ const ParamsRTC &params) {
+ transport_type_ = interface::ProductionProtocolAlgorithms::RTC_PROD;
+ params_rtc_ = TransportParamsRTC{
+ .timestamp = params.timestamp,
+ .prod_rate = params.prod_rate,
+ .prod_seg = params.prod_seg,
+ .support_fec = params.support_fec,
+ };
+ return *this;
+}
- manifest_header_->number_of_entries++;
- current_entry_++;
+FixedManifestEncoder &FixedManifestEncoder::addSuffixAndHashImpl(
+ uint32_t suffix, const auth::CryptoHash &hash) {
+ std::vector<uint8_t> _hash = hash.getDigest();
+
+ manifest_entries_.push_back(ManifestEntry{
+ .suffix = htonl(suffix),
+ .hash = {0},
+ });
+
+ std::memcpy(reinterpret_cast<uint8_t *>(manifest_entries_.back().hash),
+ _hash.data(), _hash.size());
if (TRANSPORT_EXPECT_FALSE(estimateSerializedLengthImpl() > max_size_)) {
throw errors::RuntimeException("Manifest size exceeded the packet MTU!");
}
-}
-
-FixedManifestEncoder &FixedManifestEncoder::setIsFinalManifestImpl(
- bool is_last) {
- manifest_header_->flags.is_last = static_cast<uint8_t>(is_last);
- return *this;
-}
-FixedManifestEncoder &FixedManifestEncoder::setVersionImpl(
- ManifestVersion version) {
- manifest_header_->version = static_cast<uint8_t>(version);
return *this;
}
std::size_t FixedManifestEncoder::estimateSerializedLengthImpl(
std::size_t additional_entries) {
- return sizeof(ManifestHeader) +
- (manifest_header_->number_of_entries + additional_entries) *
- sizeof(ManifestEntry);
+ return FixedManifestEncoder::manifestHeaderSizeImpl(transport_type_) +
+ (manifest_entries_.size() + additional_entries) *
+ FixedManifestEncoder::manifestEntrySizeImpl();
}
-FixedManifestEncoder &FixedManifestEncoder::updateImpl() {
- max_size_ = Packet::default_mtu - packet_.headerSize() - signature_size_;
- return *this;
-}
-
-FixedManifestEncoder &FixedManifestEncoder::setFinalBlockNumberImpl(
- std::uint32_t final_block_number) {
- manifest_header_->final_block_number = htonl(final_block_number);
- return *this;
-}
+std::size_t FixedManifestEncoder::manifestHeaderSizeImpl(
+ interface::ProductionProtocolAlgorithms transport_type) {
+ uint32_t params_size = 0;
+
+ switch (transport_type) {
+ case interface::ProductionProtocolAlgorithms::BYTE_STREAM:
+ params_size = MANIFEST_PARAMS_BYTESTREAM_SIZE;
+ break;
+ case interface::ProductionProtocolAlgorithms::RTC_PROD:
+ params_size = MANIFEST_PARAMS_RTC_SIZE;
+ break;
+ default:
+ break;
+ }
-std::size_t FixedManifestEncoder::getManifestHeaderSizeImpl() {
- return sizeof(ManifestHeader);
+ return MANIFEST_META_SIZE + MANIFEST_ENTRY_META_SIZE + params_size;
}
-std::size_t FixedManifestEncoder::getManifestEntrySizeImpl() {
- return sizeof(ManifestEntry);
+std::size_t FixedManifestEncoder::manifestEntrySizeImpl() {
+ return MANIFEST_ENTRY_SIZE;
}
FixedManifestDecoder::FixedManifestDecoder(Packet &packet)
- : packet_(packet),
- manifest_header_(reinterpret_cast<ManifestHeader *>(
- packet_.getPayload()->writableData())),
- manifest_entries_(reinterpret_cast<ManifestEntry *>(
- packet_.getPayload()->writableData() + sizeof(ManifestHeader))) {}
+ : packet_(packet), decoded_(false) {
+ manifest_meta_ =
+ reinterpret_cast<ManifestMeta *>(packet_.getPayload()->writableData());
+ manifest_entry_meta_ =
+ reinterpret_cast<ManifestEntryMeta *>(manifest_meta_ + 1);
+ transport_type_ = getTransportTypeImpl();
+
+ switch (transport_type_) {
+ case interface::ProductionProtocolAlgorithms::BYTE_STREAM:
+ params_bytestream_ = reinterpret_cast<TransportParamsBytestream *>(
+ manifest_entry_meta_ + 1);
+ manifest_entries_ =
+ reinterpret_cast<ManifestEntry *>(params_bytestream_ + 1);
+ break;
+ case interface::ProductionProtocolAlgorithms::RTC_PROD:
+ params_rtc_ =
+ reinterpret_cast<TransportParamsRTC *>(manifest_entry_meta_ + 1);
+ manifest_entries_ = reinterpret_cast<ManifestEntry *>(params_rtc_ + 1);
+ break;
+ default:
+ manifest_entries_ =
+ reinterpret_cast<ManifestEntry *>(manifest_entry_meta_ + 1);
+ break;
+ }
+}
FixedManifestDecoder::~FixedManifestDecoder() {}
void FixedManifestDecoder::decodeImpl() {
+ if (decoded_) {
+ return;
+ }
+
std::size_t packet_size = packet_.payloadSize();
- if (packet_size < sizeof(ManifestHeader) ||
+ if (packet_size <
+ FixedManifestEncoder::manifestHeaderSizeImpl(transport_type_) ||
packet_size < estimateSerializedLengthImpl()) {
throw errors::RuntimeException(
"The packet does not match expected manifest size.");
}
+
+ decoded_ = true;
+}
+
+FixedManifestDecoder &FixedManifestDecoder::clearImpl() {
+ decoded_ = false;
+ return *this;
}
-FixedManifestDecoder &FixedManifestDecoder::clearImpl() { return *this; }
+ManifestType FixedManifestDecoder::getTypeImpl() const {
+ return static_cast<ManifestType>(manifest_meta_->type);
+}
-ManifestType FixedManifestDecoder::getManifestTypeImpl() const {
- return static_cast<ManifestType>(manifest_header_->manifest_type);
+ManifestVersion FixedManifestDecoder::getVersionImpl() const {
+ return static_cast<ManifestVersion>(manifest_meta_->version);
+}
+
+interface::ProductionProtocolAlgorithms
+FixedManifestDecoder::getTransportTypeImpl() const {
+ return static_cast<interface::ProductionProtocolAlgorithms>(
+ manifest_meta_->transport_type);
}
auth::CryptoHashType FixedManifestDecoder::getHashAlgorithmImpl() const {
- return static_cast<auth::CryptoHashType>(manifest_header_->hash_algorithm);
+ return static_cast<auth::CryptoHashType>(manifest_meta_->hash_algorithm);
}
-NextSegmentCalculationStrategy
-FixedManifestDecoder::getNextSegmentCalculationStrategyImpl() const {
- return static_cast<NextSegmentCalculationStrategy>(
- manifest_header_->next_segment_strategy);
+bool FixedManifestDecoder::getIsLastImpl() const {
+ return static_cast<bool>(manifest_meta_->is_last);
+}
+
+core::Name FixedManifestDecoder::getBaseNameImpl() const {
+ if (static_cast<bool>(manifest_entry_meta_->is_ipv6)) {
+ return core::Name(
+ AF_INET6, reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix));
+ } else {
+ return core::Name(
+ AF_INET, reinterpret_cast<uint8_t *>(&manifest_entry_meta_->prefix));
+ }
+}
+
+ParamsBytestream FixedManifestDecoder::getParamsBytestreamImpl() const {
+ return ParamsBytestream{
+ .final_segment = params_bytestream_->final_segment,
+ };
+}
+
+ParamsRTC FixedManifestDecoder::getParamsRTCImpl() const {
+ return ParamsRTC{
+ .timestamp = params_rtc_->timestamp,
+ .prod_rate = params_rtc_->prod_rate,
+ .prod_seg = params_rtc_->prod_seg,
+ .support_fec = params_rtc_->support_fec,
+ };
}
typename Fixed::SuffixList FixedManifestDecoder::getSuffixHashListImpl() {
typename Fixed::SuffixList hash_list;
- for (int i = 0; i < manifest_header_->number_of_entries; i++) {
+ for (int i = 0; i < manifest_entry_meta_->nb_entries; i++) {
hash_list.insert(hash_list.end(),
std::make_pair(ntohl(manifest_entries_[i].suffix),
reinterpret_cast<uint8_t *>(
@@ -194,33 +318,11 @@ typename Fixed::SuffixList FixedManifestDecoder::getSuffixHashListImpl() {
return hash_list;
}
-core::Name FixedManifestDecoder::getBaseNameImpl() const {
- if (static_cast<bool>(manifest_header_->flags.ipv6)) {
- return core::Name(AF_INET6,
- reinterpret_cast<uint8_t *>(&manifest_header_->prefix));
- } else {
- return core::Name(AF_INET,
- reinterpret_cast<uint8_t *>(&manifest_header_->prefix));
- }
-}
-
-bool FixedManifestDecoder::getIsFinalManifestImpl() const {
- return static_cast<bool>(manifest_header_->flags.is_last);
-}
-
-ManifestVersion FixedManifestDecoder::getVersionImpl() const {
- return static_cast<ManifestVersion>(manifest_header_->version);
-}
-
std::size_t FixedManifestDecoder::estimateSerializedLengthImpl(
std::size_t additional_entries) const {
- return sizeof(ManifestHeader) +
- (additional_entries + manifest_header_->number_of_entries) *
- sizeof(ManifestEntry);
-}
-
-uint32_t FixedManifestDecoder::getFinalBlockNumberImpl() const {
- return ntohl(manifest_header_->final_block_number);
+ return FixedManifestEncoder::manifestHeaderSizeImpl(transport_type_) +
+ (manifest_entry_meta_->nb_entries + additional_entries) *
+ FixedManifestEncoder::manifestEntrySizeImpl();
}
} // end namespace core
diff --git a/libtransport/src/core/manifest_format_fixed.h b/libtransport/src/core/manifest_format_fixed.h
index 56ad4ef6d..ade4bf02c 100644
--- a/libtransport/src/core/manifest_format_fixed.h
+++ b/libtransport/src/core/manifest_format_fixed.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -24,26 +24,74 @@ namespace transport {
namespace core {
-// 0 1 2 3
-// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// |Version| MType |HashAlg|NextStr| Flags |NumberOfEntries|
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// | Final Block Number |
-// +---------------------------------------------------------------|
-// | |
-// + +
-// | |
-// + Prefix +
-// | |
-// + +
-// | |
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// | Suffix |
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-// | Hash Value |
-// | |
-// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// Manifest Metadata:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// |Version| Type | Transport Type| Hash Algorithm|L| Reserved |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Entry Metadata:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Nb entries |I| Reserved |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | |
+// + +
+// | |
+// + Prefix +
+// | |
+// + +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Transport Parameters - Bytestream:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Final Segment |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Transport Parameters - RTC:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | |
+// + Timestamp +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Production Rate |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Current Segment |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// |F| |
+// + Reserved for future parameters +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+// Manifest Entry:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Packet Suffix |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | |
+// + +
+// | |
+// + +
+// | |
+// + +
+// | |
+// + Packet Digest +
+// | |
+// + +
+// | |
+// + +
+// | |
+// + +
+// | |
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
class FixedManifestEncoder;
class FixedManifestDecoder;
@@ -58,28 +106,47 @@ struct Fixed {
using SuffixList = std::list<std::pair<uint32_t, uint8_t *>>;
};
-struct Flags {
- std::uint8_t ipv6 : 1;
- std::uint8_t is_last : 1;
- std::uint8_t unused : 6;
+const size_t MANIFEST_META_SIZE = 4;
+struct __attribute__((__packed__)) ManifestMeta {
+ std::uint8_t version : 4;
+ std::uint8_t type : 4;
+ std::uint8_t transport_type;
+ std::uint8_t hash_algorithm;
+ std::uint8_t is_last;
};
+static_assert(sizeof(ManifestMeta) == MANIFEST_META_SIZE);
-struct ManifestEntry {
- std::uint32_t suffix;
- std::uint32_t hash[8];
+const size_t MANIFEST_ENTRY_META_SIZE = 20;
+struct __attribute__((__packed__)) ManifestEntryMeta {
+ std::uint8_t nb_entries;
+ std::uint8_t is_ipv6;
+ std::uint16_t unused;
+ std::uint32_t prefix[4];
};
+static_assert(sizeof(ManifestEntryMeta) == MANIFEST_ENTRY_META_SIZE);
-struct ManifestHeader {
- std::uint8_t version : 4;
- std::uint8_t manifest_type : 4;
- std::uint8_t hash_algorithm : 4;
- std::uint8_t next_segment_strategy : 4;
- Flags flags;
- std::uint8_t number_of_entries;
- std::uint32_t final_block_number;
- std::uint32_t prefix[4];
- ManifestEntry entries[0];
+const size_t MANIFEST_PARAMS_BYTESTREAM_SIZE = 4;
+struct __attribute__((__packed__)) TransportParamsBytestream {
+ std::uint32_t final_segment;
+};
+static_assert(sizeof(TransportParamsBytestream) ==
+ MANIFEST_PARAMS_BYTESTREAM_SIZE);
+
+const size_t MANIFEST_PARAMS_RTC_SIZE = 20;
+struct __attribute__((__packed__)) TransportParamsRTC {
+ std::uint64_t timestamp;
+ std::uint32_t prod_rate;
+ std::uint32_t prod_seg;
+ std::uint32_t support_fec;
+};
+static_assert(sizeof(TransportParamsRTC) == MANIFEST_PARAMS_RTC_SIZE);
+
+const size_t MANIFEST_ENTRY_SIZE = 36;
+struct __attribute__((__packed__)) ManifestEntry {
+ std::uint32_t suffix;
+ std::uint32_t hash[8];
};
+static_assert(sizeof(ManifestEntry) == MANIFEST_ENTRY_SIZE);
static const constexpr std::uint8_t manifest_version = 1;
@@ -91,46 +158,48 @@ class FixedManifestEncoder : public ManifestEncoder<FixedManifestEncoder> {
~FixedManifestEncoder();
FixedManifestEncoder &encodeImpl();
-
FixedManifestEncoder &clearImpl();
+ FixedManifestEncoder &updateImpl();
- FixedManifestEncoder &setManifestTypeImpl(ManifestType manifest_type);
-
+ // ManifestMeta
+ FixedManifestEncoder &setVersionImpl(ManifestVersion version);
+ FixedManifestEncoder &setTypeImpl(ManifestType manifest_type);
FixedManifestEncoder &setHashAlgorithmImpl(Fixed::HashType algorithm);
+ FixedManifestEncoder &setIsLastImpl(bool is_last);
- FixedManifestEncoder &setNextSegmentCalculationStrategyImpl(
- NextSegmentCalculationStrategy strategy);
-
+ // ManifestEntryMeta
FixedManifestEncoder &setBaseNameImpl(const core::Name &base_name);
+ // TransportParams
+ FixedManifestEncoder &setParamsBytestreamImpl(const ParamsBytestream &params);
+ FixedManifestEncoder &setParamsRTCImpl(const ParamsRTC &params);
+
+ // ManifestEntry
FixedManifestEncoder &addSuffixAndHashImpl(uint32_t suffix,
const Fixed::Hash &hash);
- FixedManifestEncoder &setIsFinalManifestImpl(bool is_last);
-
- FixedManifestEncoder &setVersionImpl(ManifestVersion version);
-
std::size_t estimateSerializedLengthImpl(std::size_t additional_entries = 0);
- FixedManifestEncoder &updateImpl();
-
- FixedManifestEncoder &setFinalBlockNumberImpl(
- std::uint32_t final_block_number);
-
- static std::size_t getManifestHeaderSizeImpl();
-
- static std::size_t getManifestEntrySizeImpl();
+ static std::size_t manifestHeaderSizeImpl(
+ interface::ProductionProtocolAlgorithms transport_type =
+ interface::ProductionProtocolAlgorithms::UNKNOWN);
+ static std::size_t manifestEntrySizeImpl();
private:
- void addSuffixHashBytes(uint32_t suffix, const uint8_t *hash,
- std::size_t length);
-
Packet &packet_;
std::size_t max_size_;
- ManifestHeader *manifest_header_;
- ManifestEntry *manifest_entries_;
- std::size_t current_entry_;
std::size_t signature_size_;
+ interface::ProductionProtocolAlgorithms transport_type_;
+ bool encoded_;
+
+ // Manifest Header
+ ManifestMeta *manifest_meta_;
+ ManifestEntryMeta *manifest_entry_meta_;
+ TransportParamsBytestream params_bytestream_;
+ TransportParamsRTC params_rtc_;
+
+ // Manifest Entries
+ std::vector<ManifestEntry> manifest_entries_;
};
class FixedManifestDecoder : public ManifestDecoder<FixedManifestDecoder> {
@@ -140,31 +209,40 @@ class FixedManifestDecoder : public ManifestDecoder<FixedManifestDecoder> {
~FixedManifestDecoder();
void decodeImpl();
-
FixedManifestDecoder &clearImpl();
- ManifestType getManifestTypeImpl() const;
-
+ // ManifestMeta
+ ManifestVersion getVersionImpl() const;
+ ManifestType getTypeImpl() const;
+ interface::ProductionProtocolAlgorithms getTransportTypeImpl() const;
Fixed::HashType getHashAlgorithmImpl() const;
+ bool getIsLastImpl() const;
- NextSegmentCalculationStrategy getNextSegmentCalculationStrategyImpl() const;
-
- typename Fixed::SuffixList getSuffixHashListImpl();
-
+ // ManifestEntryMeta
core::Name getBaseNameImpl() const;
- bool getIsFinalManifestImpl() const;
+ // TransportParams
+ ParamsBytestream getParamsBytestreamImpl() const;
+ ParamsRTC getParamsRTCImpl() const;
+
+ // ManifestEntry
+ typename Fixed::SuffixList getSuffixHashListImpl();
std::size_t estimateSerializedLengthImpl(
std::size_t additional_entries = 0) const;
- ManifestVersion getVersionImpl() const;
-
- uint32_t getFinalBlockNumberImpl() const;
-
private:
Packet &packet_;
- ManifestHeader *manifest_header_;
+ interface::ProductionProtocolAlgorithms transport_type_;
+ bool decoded_;
+
+ // Manifest Header
+ ManifestMeta *manifest_meta_;
+ ManifestEntryMeta *manifest_entry_meta_;
+ TransportParamsBytestream *params_bytestream_;
+ TransportParamsRTC *params_rtc_;
+
+ // Manifest Entries
ManifestEntry *manifest_entries_;
};
diff --git a/libtransport/src/core/manifest_inline.h b/libtransport/src/core/manifest_inline.h
index a487ccfe3..ca48a4a79 100644
--- a/libtransport/src/core/manifest_inline.h
+++ b/libtransport/src/core/manifest_inline.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -40,25 +40,26 @@ class ManifestInline
public:
ManifestInline() : ManifestBase() {}
- ManifestInline(const core::Name &name, std::size_t signature_size = 0)
- : ManifestBase(name, signature_size) {}
+ ManifestInline(Packet::Format format, const core::Name &name,
+ std::size_t signature_size = 0)
+ : ManifestBase(format, name, signature_size) {}
template <typename T>
ManifestInline(T &&base) : ManifestBase(std::forward<T &&>(base)) {}
+ template <typename T>
+ ManifestInline(T &base) : ManifestBase(base) {}
+
static TRANSPORT_ALWAYS_INLINE ManifestInline *createManifest(
- const core::Name &manifest_name, ManifestVersion version,
- ManifestType type, HashType algorithm, bool is_last,
- const Name &base_name, NextSegmentCalculationStrategy strategy,
- std::size_t signature_size) {
- auto manifest = new ManifestInline(manifest_name, signature_size);
+ Packet::Format format, const core::Name &manifest_name,
+ ManifestVersion version, ManifestType type, bool is_last,
+ const Name &base_name, HashType hash_algo, std::size_t signature_size) {
+ auto manifest = new ManifestInline(format, manifest_name, signature_size);
manifest->setVersion(version);
- manifest->setManifestType(type);
- manifest->setHashAlgorithm(algorithm);
- manifest->setFinalManifest(is_last);
+ manifest->setType(type);
+ manifest->setHashAlgorithm(hash_algo);
+ manifest->setIsLast(is_last);
manifest->setBaseName(base_name);
- manifest->setNextSegmentCalculationStrategy(strategy);
-
return manifest;
}
@@ -69,8 +70,6 @@ class ManifestInline
ManifestInline &decodeImpl() {
base_name_ = ManifestBase::decoder_.getBaseName();
- next_segment_strategy_ =
- ManifestBase::decoder_.getNextSegmentCalculationStrategy();
suffix_hash_map_ = ManifestBase::decoder_.getSuffixHashList();
return *this;
@@ -96,18 +95,6 @@ class ManifestInline
// Call this function only after the decode function!
const SuffixList &getSuffixList() { return suffix_hash_map_; }
- ManifestInline &setNextSegmentCalculationStrategy(
- NextSegmentCalculationStrategy strategy) {
- next_segment_strategy_ = strategy;
- ManifestBase::encoder_.setNextSegmentCalculationStrategy(
- next_segment_strategy_);
- return *this;
- }
-
- NextSegmentCalculationStrategy getNextSegmentCalculationStrategy() {
- return next_segment_strategy_;
- }
-
// Convert several manifests into a single map from suffixes to packet hashes.
// All manifests must have been decoded beforehand.
static std::unordered_map<Suffix, Hash> getSuffixMap(
@@ -134,7 +121,6 @@ class ManifestInline
private:
core::Name base_name_;
- NextSegmentCalculationStrategy next_segment_strategy_;
SuffixList suffix_hash_map_;
};
diff --git a/libtransport/src/core/memif_connector.cc b/libtransport/src/core/memif_connector.cc
new file mode 100644
index 000000000..fb38a6e23
--- /dev/null
+++ b/libtransport/src/core/memif_connector.cc
@@ -0,0 +1,497 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <core/errors.h>
+#include <core/memif_connector.h>
+#include <glog/logging.h>
+#include <hicn/transport/errors/not_implemented_exception.h>
+#include <sys/epoll.h>
+
+#include <cstdlib>
+
+/* sstrncpy */
+#include <hicn/util/sstrncpy.h>
+
+#define CANCEL_TIMER 1
+
+namespace transport {
+
+namespace core {
+
+MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name)
+ : Connector(std::move(receive_callback), std::move(packet_sent),
+ std::move(close_callback), std::move(on_reconnect)),
+ event_reactor_(),
+ memif_worker_(std::bind(&MemifConnector::threadMain, this)),
+ timer_set_(false),
+ send_timer_(event_reactor_),
+ disconnect_timer_(event_reactor_),
+ io_service_(io_service),
+ work_(asio::make_work_guard(io_service_)),
+ memif_connection_({0}),
+ tx_buf_counter_(0),
+ is_reconnection_(false),
+ data_available_(false),
+ app_name_(app_name),
+ socket_filename_(""),
+ buffer_size_(kbuf_size),
+ log2_ring_size_(klog2_ring_size),
+ max_memif_bufs_(1 << klog2_ring_size) {}
+
+MemifConnector::~MemifConnector() { close(); }
+
+void MemifConnector::connect(uint32_t memif_id, long memif_mode,
+ const std::string &socket_filename,
+ std::size_t buffer_size,
+ std::size_t log2_ring_size) {
+ state_ = State::CONNECTING;
+
+ memif_id_ = memif_id;
+ socket_filename_ = socket_filename;
+ buffer_size_ = buffer_size;
+ log2_ring_size_ = log2_ring_size;
+ max_memif_bufs_ = 1 << log2_ring_size;
+ createMemif(memif_id, memif_mode);
+}
+
+int MemifConnector::createMemif(uint32_t index, uint8_t is_master) {
+ int err = MEMIF_ERR_SUCCESS;
+
+ memif_socket_args_t socket_args;
+ memif_conn_args_t args;
+ memset(&socket_args, 0, sizeof(memif_socket_args_t));
+ memset(&args, 0, sizeof(memif_conn_args_t));
+
+ // Setup memif socket first
+
+ int rc = strcpy_s(socket_args.path, sizeof(socket_args.path) - 1,
+ socket_filename_.c_str());
+ if (rc != EOK) {
+ std::string error = "Provided socket path is larger than " +
+ std::to_string(sizeof(socket_args.path)) + " bytes.";
+ throw errors::RuntimeException(error);
+ }
+
+ rc = strcpy_s(socket_args.app_name, sizeof(socket_args.app_name) - 1,
+ app_name_.c_str());
+ if (rc != EOK) {
+ std::string error = "Provided app_name is larger than " +
+ std::to_string(sizeof(socket_args.app_name)) +
+ " bytes.";
+ throw errors::RuntimeException(error);
+ }
+
+ socket_args.on_control_fd_update = controlFdUpdate;
+ socket_args.alloc = nullptr;
+ socket_args.realloc = nullptr;
+ socket_args.free = nullptr;
+
+ err = memif_create_socket(&args.socket, &socket_args, this);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+
+ // Setup memif connection using provided memif_socket_handle_t
+ args.is_master = is_master;
+ args.log2_ring_size = log2_ring_size_;
+ args.buffer_size = buffer_size_;
+ args.num_s2m_rings = 1;
+ args.num_m2s_rings = 1;
+ strcpy_s((char *)args.interface_name, sizeof(args.interface_name), IF_NAME);
+ args.mode = memif_interface_mode_t::MEMIF_INTERFACE_MODE_IP;
+ args.interface_id = index;
+ err = memif_create(&memif_connection_.conn, &args, onConnect, onDisconnect,
+ onInterrupt, this);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ throw errors::RuntimeException(memif_strerror(err));
+ }
+
+ memif_connection_.index = (uint16_t)index;
+ memif_connection_.tx_qid = 0;
+ /* alloc memif buffers */
+ memif_connection_.rx_buf_num = 0;
+ memif_connection_.rx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * max_memif_bufs_));
+ memif_connection_.tx_buf_num = 0;
+ memif_connection_.tx_bufs = static_cast<memif_buffer_t *>(
+ malloc(sizeof(memif_buffer_t) * max_memif_bufs_));
+
+ return 0;
+}
+
+int MemifConnector::deleteMemif() {
+ if (memif_connection_.rx_bufs) {
+ free(memif_connection_.rx_bufs);
+ }
+
+ memif_connection_.rx_bufs = nullptr;
+ memif_connection_.rx_buf_num = 0;
+
+ if (memif_connection_.tx_bufs) {
+ free(memif_connection_.tx_bufs);
+ }
+
+ memif_connection_.tx_bufs = nullptr;
+ memif_connection_.tx_buf_num = 0;
+
+ int err;
+ /* disconenct then delete memif connection */
+ err = memif_delete(&memif_connection_.conn);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_delete: " << memif_strerror(err);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(memif_connection_.conn != nullptr)) {
+ LOG(ERROR) << "memif delete fail";
+ }
+
+ state_ = State::CLOSED;
+
+ return 0;
+}
+
+int MemifConnector::controlFdUpdate(memif_fd_event_t fde, void *private_ctx) {
+ auto self = reinterpret_cast<MemifConnector *>(private_ctx);
+ uint32_t evt = 0;
+
+ /* convert memif event definitions to epoll events */
+ auto events = fde.type;
+ auto fd = fde.fd;
+
+ if (events & MEMIF_FD_EVENT_ERROR) {
+ LOG(ERROR) << "memif fd event: Error";
+ return -1;
+ }
+
+ if (events & MEMIF_FD_EVENT_DEL) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: DEL fd " << fd;
+ return self->event_reactor_.delFileDescriptor(fd);
+ }
+
+ if (events & MEMIF_FD_EVENT_MOD) {
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: MOD fd " << fd;
+ return self->event_reactor_.modFileDescriptor(fd, evt);
+ }
+
+ if (events & MEMIF_FD_EVENT_READ) {
+ evt |= EPOLLIN;
+ }
+
+ if (events & MEMIF_FD_EVENT_WRITE) {
+ evt |= EPOLLOUT;
+ }
+
+ DLOG_IF(INFO, VLOG_IS_ON(4)) << "memif fd event: ADD fd " << fd;
+ return self->event_reactor_.addFileDescriptor(
+ fd, evt, [fde](const utils::Event &evt) -> int {
+ int event = 0;
+ int memif_err = 0;
+
+ if (evt.events & EPOLLIN) {
+ event |= MEMIF_FD_EVENT_READ;
+ }
+
+ if (evt.events & EPOLLOUT) {
+ event |= MEMIF_FD_EVENT_WRITE;
+ }
+
+ if (evt.events & EPOLLERR) {
+ event |= MEMIF_FD_EVENT_ERROR;
+ }
+
+ memif_err = memif_control_fd_handler(fde.private_ctx,
+ memif_fd_event_type_t(event));
+
+ if (TRANSPORT_EXPECT_FALSE(memif_err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_control_fd_handler: "
+ << memif_strerror(memif_err);
+ }
+
+ return 0;
+ });
+}
+
+uint16_t MemifConnector::bufferAlloc(long n, uint16_t qid,
+ std::error_code &ec) {
+ int err;
+ uint16_t r = 0;
+ /* set data pointer to shared memory and set buffer_len to shared mmeory
+ * buffer len */
+ err = memif_buffer_alloc(memif_connection_.conn, qid,
+ memif_connection_.tx_bufs, n, &r, buffer_size_);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ ec = make_error_code(core_error::send_buffer_allocation_failed);
+ }
+
+ memif_connection_.tx_buf_num += r;
+ return r;
+}
+
+uint16_t MemifConnector::txBurst(uint16_t qid, std::error_code &ec) {
+ int err = MEMIF_ERR_SUCCESS;
+ ec = make_error_code(core_error::success);
+ uint16_t tx = 0;
+
+ /* inform peer memif interface about data in shared memory buffers */
+ /* mark memif buffers as free */
+ err = memif_tx_burst(memif_connection_.conn, qid, memif_connection_.tx_bufs,
+ memif_connection_.tx_buf_num, &tx);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ ec = make_error_code(core_error::send_failed);
+ }
+
+ memif_connection_.tx_buf_num -= tx;
+ return tx;
+}
+
+void MemifConnector::scheduleSend(std::uint64_t delay) {
+ if (!timer_set_) {
+ timer_set_ = true;
+ send_timer_.expiresFromNow(std::chrono::microseconds(delay));
+ send_timer_.asyncWait(
+ std::bind(&MemifConnector::sendCallback, this, std::placeholders::_1));
+ }
+}
+
+void MemifConnector::sendCallback(const std::error_code &ec) {
+ timer_set_ = false;
+
+ if (TRANSPORT_EXPECT_TRUE(!ec && state_ == State::CONNECTED)) {
+ doSend();
+ }
+}
+
+/* informs user about connected status. private_ctx is used by user to identify
+ connection (multiple connections WIP) */
+int MemifConnector::onConnect(memif_conn_handle_t conn, void *private_ctx) {
+ auto self = reinterpret_cast<MemifConnector *>(private_ctx);
+ self->state_ = State::CONNECTED;
+ memif_refill_queue(conn, 0, -1, 0);
+
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "Memif " << self->app_name_ << " connected";
+
+ // We are connected. Notify higher layers.
+ self->io_service_.post([self]() {
+ self->on_reconnect_callback_(self, make_error_code(core_error::success));
+ });
+
+ self->doSend();
+
+ return 0;
+}
+
+/* informs user about disconnected status. private_ctx is used by user to
+ identify connection (multiple connections WIP) */
+int MemifConnector::onDisconnect(memif_conn_handle_t conn, void *private_ctx) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Memif " << connector->app_name_ << " disconnected";
+ return 0;
+}
+
+void MemifConnector::threadMain() { event_reactor_.runEventLoop(200); }
+
+int MemifConnector::onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid) {
+ MemifConnector *connector = (MemifConnector *)private_ctx;
+
+ Details &c = connector->memif_connection_;
+ std::weak_ptr<MemifConnector> self = connector->shared_from_this();
+ std::vector<::utils::MemBuf::Ptr> v;
+ std::error_code ec = make_error_code(core_error::success);
+
+ int err = MEMIF_ERR_SUCCESS, ret_val;
+ uint16_t rx = 0;
+
+ do {
+ err = memif_rx_burst(conn, qid, c.rx_bufs, max_burst, &rx);
+ ret_val = err;
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS &&
+ err != MEMIF_ERR_NOBUF)) {
+ ec = make_error_code(core_error::receive_failed);
+ LOG(ERROR) << "memif_rx_burst: " << memif_strerror(err);
+ goto error;
+ }
+
+ c.rx_buf_num += rx;
+
+ if (TRANSPORT_EXPECT_FALSE(connector->io_service_.stopped())) {
+ LOG(ERROR) << "socket stopped: ignoring " << rx << " packets";
+ goto error;
+ }
+
+ std::size_t packet_length;
+ v.reserve(rx);
+ for (int i = 0; i < rx; i++) {
+ auto buffer = connector->getRawBuffer();
+ packet_length = (c.rx_bufs + i)->len;
+ std::memcpy(buffer.first, (c.rx_bufs + i)->data, packet_length);
+ auto packet = connector->getPacketFromBuffer(buffer.first, packet_length);
+ v.emplace_back(std::move(packet));
+ }
+
+ /* mark memif buffers and shared memory buffers as free */
+ /* free processed buffers */
+
+ err = memif_refill_queue(conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err);
+ }
+
+ c.rx_buf_num -= rx;
+
+ } while (ret_val == MEMIF_ERR_NOBUF);
+
+ connector->io_service_.post([self, buffers = std::move(v)]() {
+ if (auto c = self.lock()) {
+ c->receive_callback_(c.get(), buffers,
+ std::make_error_code(std::errc(0)));
+ }
+ });
+
+ return 0;
+
+error:
+ err = memif_refill_queue(c.conn, qid, rx, 0);
+
+ if (TRANSPORT_EXPECT_FALSE(err != MEMIF_ERR_SUCCESS)) {
+ LOG(ERROR) << "memif_buffer_free: " << memif_strerror(err);
+ }
+ c.rx_buf_num -= rx;
+
+ connector->io_service_.post([self, ec]() {
+ if (auto c = self.lock()) {
+ c->receive_callback_(c.get(), {}, ec);
+ }
+ });
+
+ return 0;
+}
+
+void MemifConnector::close() {
+ if (state_ != State::CLOSED) {
+ disconnect_timer_.expiresFromNow(std::chrono::microseconds(50));
+ disconnect_timer_.asyncWait([this](const std::error_code &ec) {
+ deleteMemif();
+ event_reactor_.stop();
+ });
+ }
+
+ if (memif_worker_.joinable()) {
+ memif_worker_.join();
+ }
+}
+
+void MemifConnector::send(Packet &packet) { send(packet.shared_from_this()); }
+
+void MemifConnector::send(const utils::MemBuf::Ptr &buffer) {
+ {
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+ output_buffer_.push_back(buffer);
+ }
+#if CANCEL_TIMER
+ scheduleSend(50);
+#endif
+}
+
+int MemifConnector::doSend() {
+ std::size_t max = 0;
+ std::size_t size = 0;
+ std::error_code ec = make_error_code(core_error::success);
+ int ret = 0;
+ uint64_t delay = 50; // microseconds
+
+ utils::SpinLock::Acquire locked(write_msgs_lock_);
+
+ // Check if there are pending buffers to send
+ if (memif_connection_.tx_buf_num > 0) {
+ ret = txBurst(memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ delay = 200;
+ goto done;
+ }
+ }
+
+ // Continue trying to send buffers in output_buffer_
+ size = output_buffer_.size();
+ max = size < max_burst ? size : max_burst;
+
+ ret = bufferAlloc(max, memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool() && ret == 0)) {
+ delay = 200;
+ goto done;
+ }
+
+ // Fill allocated buffers and remove them from output_buffer_
+ for (uint16_t i = 0; i < ret; i++) {
+ auto packet = output_buffer_.front().get();
+ const utils::MemBuf *current = packet;
+ std::size_t offset = 0;
+ uint8_t *shared_buffer =
+ reinterpret_cast<uint8_t *>(memif_connection_.tx_bufs[i].data);
+ do {
+ std::memcpy(shared_buffer + offset, current->data(), current->length());
+ offset += current->length();
+ current = current->next();
+ } while (current != packet);
+
+ memif_connection_.tx_bufs[i].len = uint32_t(offset);
+ output_buffer_.pop_front();
+ }
+
+ // Try to send them
+ ret = txBurst(memif_connection_.tx_qid, ec);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ LOG(ERROR) << "Tx burst failed " << ec.message();
+ delay = 200;
+ goto done;
+ }
+
+done:
+ memif_refill_queue(memif_connection_.conn, memif_connection_.tx_qid, ret, 0);
+
+ // If there are still packets to send, schedule another send
+ if (memif_connection_.tx_buf_num > 0 || !output_buffer_.empty()) {
+ scheduleSend(delay);
+ }
+
+ // If error, signal to upper layers
+ if (ec.operator bool()) {
+ std::weak_ptr<MemifConnector> self = shared_from_this();
+ io_service_.post([self, ec]() {
+ if (auto c = self.lock()) {
+ c->sent_callback_(c.get(), ec);
+ }
+ });
+ }
+
+ return 0;
+}
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/core/memif_connector.h b/libtransport/src/core/memif_connector.h
new file mode 100644
index 000000000..d36be4616
--- /dev/null
+++ b/libtransport/src/core/memif_connector.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/ring_buffer.h>
+//#include <hicn/transport/core/hicn_vapi.h>
+#include <hicn/transport/core/asio_wrapper.h>
+#include <utils/epoll_event_reactor.h>
+#include <utils/fd_deadline_timer.h>
+
+#include <deque>
+#include <future>
+#include <mutex>
+#include <thread>
+
+extern "C" {
+#include <libmemif.h>
+};
+
+#define _Static_assert static_assert
+
+namespace transport {
+
+namespace core {
+
+#define APP_NAME "libtransport"
+#define IF_NAME "vpp_connection"
+
+class MemifConnector : public Connector {
+ static inline std::size_t kbuf_size = 2048;
+ static inline std::size_t klog2_ring_size = 13;
+
+ using PacketRing = utils::CircularFifo<utils::MemBuf::Ptr, queue_size>;
+ struct Details {
+ // index
+ uint16_t index;
+ // memif conenction handle
+ memif_conn_handle_t conn;
+ // transmit queue id
+ uint16_t tx_qid;
+ // tx buffers
+ memif_buffer_t *tx_bufs;
+ // allocated tx buffers counter
+ // number of tx buffers pointing to shared memory
+ uint16_t tx_buf_num;
+ // rx buffers
+ memif_buffer_t *rx_bufs;
+ // allocated rx buffers counter
+ // number of rx buffers pointing to shared memory
+ uint16_t rx_buf_num;
+ // interface ip address
+ uint8_t ip_addr[4];
+ };
+
+ public:
+ MemifConnector(PacketReceivedCallback &&receive_callback,
+ PacketSentCallback &&packet_sent,
+ OnCloseCallback &&close_callback,
+ OnReconnectCallback &&on_reconnect,
+ asio::io_service &io_service,
+ std::string app_name = "Libtransport");
+
+ ~MemifConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const utils::MemBuf::Ptr &buffer) override;
+
+ void close() override;
+
+ void connect(uint32_t memif_id, long memif_mode,
+ const std::string &socket_filename,
+ std::size_t buffer_size = kbuf_size,
+ std::size_t log2_ring_size = klog2_ring_size);
+
+ TRANSPORT_ALWAYS_INLINE uint32_t getMemifId() { return memif_id_; };
+
+ private:
+ void init();
+
+ int doSend();
+
+ int createMemif(uint32_t index, uint8_t is_master);
+
+ uint32_t getMemifConfiguration();
+
+ int deleteMemif();
+
+ static int controlFdUpdate(memif_fd_event_t fde, void *private_ctx);
+
+ static int onConnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onDisconnect(memif_conn_handle_t conn, void *private_ctx);
+
+ static int onInterrupt(memif_conn_handle_t conn, void *private_ctx,
+ uint16_t qid);
+
+ void threadMain();
+
+ uint16_t txBurst(uint16_t qid, std::error_code &ec);
+
+ uint16_t bufferAlloc(long n, uint16_t qid, std::error_code &ec);
+
+ void scheduleSend(std::uint64_t delay);
+
+ void sendCallback(const std::error_code &ec);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ int epfd;
+ utils::EpollEventReactor event_reactor_;
+ std::thread memif_worker_;
+ std::atomic_bool timer_set_;
+ utils::FdDeadlineTimer send_timer_;
+ utils::FdDeadlineTimer disconnect_timer_;
+ asio::io_service &io_service_;
+ asio::executor_work_guard<asio::io_context::executor_type> work_;
+ Details memif_connection_;
+ uint16_t tx_buf_counter_;
+
+ PacketRing input_buffer_;
+ bool is_reconnection_;
+ bool data_available_;
+ uint32_t memif_id_;
+ uint8_t memif_mode_;
+ std::string app_name_;
+ uint16_t transmission_index_;
+ utils::SpinLock write_msgs_lock_;
+ std::string socket_filename_;
+ std::size_t buffer_size_;
+ std::size_t log2_ring_size_;
+ std::size_t max_memif_bufs_;
+};
+
+} // end namespace core
+
+} // end namespace transport
diff --git a/libtransport/src/core/name.cc b/libtransport/src/core/name.cc
index 795c8a697..98091eea5 100644
--- a/libtransport/src/core/name.cc
+++ b/libtransport/src/core/name.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -26,30 +26,28 @@ namespace core {
Name::Name() { name_ = {}; }
+/**
+ * XXX This function does not use the name API provided by libhicn
+ */
Name::Name(int family, const uint8_t *ip_address, std::uint32_t suffix)
: name_({}) {
- name_.type = HNT_UNSPEC;
std::size_t length;
uint8_t *dst = NULL;
if (family == AF_INET) {
- dst = name_.ip4.prefix_as_u8;
+ dst = name_.prefix.v4.as_u8;
length = IPV4_ADDR_LEN;
- name_.type = HNT_CONTIGUOUS_V4;
} else if (family == AF_INET6) {
- dst = name_.ip6.prefix_as_u8;
+ dst = name_.prefix.v6.as_u8;
length = IPV6_ADDR_LEN;
- name_.type = HNT_CONTIGUOUS_V6;
} else {
throw errors::RuntimeException("Specified name family does not exist.");
}
std::memcpy(dst, ip_address, length);
- *reinterpret_cast<std::uint32_t *>(dst + length) = suffix;
+ name_.suffix = suffix;
}
-
Name::Name(const char *name, uint32_t segment) {
- name_.type = HNT_UNSPEC;
if (hicn_name_create(name, segment, &name_) < 0) {
throw errors::InvalidIpAddressException();
}
@@ -59,7 +57,6 @@ Name::Name(const std::string &uri, uint32_t segment)
: Name(uri.c_str(), segment) {}
Name::Name(const std::string &uri) {
- name_.type = HNT_UNSPEC;
utils::StringTokenizer tokenizer(uri, "|");
std::string ip_address;
std::string seq_number;
@@ -80,9 +77,13 @@ Name::Name(const std::string &uri) {
Name::Name(const Name &name) { this->name_ = name.name_; }
+Name::~Name() {}
+
Name &Name::operator=(const Name &name) {
- if (hicn_name_copy(&this->name_, &name.name_) < 0) {
- throw errors::MalformedNameException();
+ if (this != &name) {
+ if (hicn_name_copy(&this->name_, &name.name_) < 0) {
+ throw errors::MalformedNameException();
+ }
}
return *this;
@@ -129,9 +130,13 @@ uint32_t Name::getHash32(bool consider_suffix) const {
return hash;
}
-void Name::clear() { name_.type = HNT_UNSPEC; };
+void Name::clear() { std::memset(&name_, 0, sizeof(name_)); };
-Name::Type Name::getType() const { return name_.type; }
+Name::Type Name::getType() const {
+ int family;
+ hicn_name_get_family(&name_, &family);
+ return family == AF_INET ? Name::Type::V4 : Name::Type::V6;
+}
uint32_t Name::getSuffix() const {
uint32_t ret = 0;
@@ -151,29 +156,6 @@ Name &Name::setSuffix(uint32_t seq_number) {
return *this;
}
-std::shared_ptr<Sockaddr> Name::getAddress() const {
- Sockaddr *ret = nullptr;
-
- switch (name_.type) {
- case HNT_CONTIGUOUS_V4:
- case HNT_IOV_V4:
- ret = (Sockaddr *)new Sockaddr4;
- break;
- case HNT_CONTIGUOUS_V6:
- case HNT_IOV_V6:
- ret = (Sockaddr *)new Sockaddr6;
- break;
- default:
- throw errors::MalformedNameException();
- }
-
- if (hicn_name_to_sockaddr_address((hicn_name_t *)&name_, ret) < 0) {
- throw errors::MalformedNameException();
- }
-
- return std::shared_ptr<Sockaddr>(ret);
-}
-
ip_prefix_t Name::toIpAddress() const {
ip_prefix_t ret;
std::memset(&ret, 0, sizeof(ret));
@@ -195,8 +177,8 @@ int Name::getAddressFamily() const {
return ret;
}
-void Name::copyToDestination(uint8_t *destination, bool include_suffix) const {
- if (hicn_name_copy_to_destination(destination, &name_, include_suffix) < 0) {
+void Name::copyPrefixToDestination(uint8_t *destination) const {
+ if (hicn_name_copy_prefix_to_destination(destination, &name_) < 0) {
throw errors::RuntimeException(
"Impossibe to copy the name into the "
"provided destination");
diff --git a/libtransport/src/core/packet.cc b/libtransport/src/core/packet.cc
index 51337201f..df27444af 100644
--- a/libtransport/src/core/packet.cc
+++ b/libtransport/src/core/packet.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -41,13 +41,6 @@ Packet::Packet(Format format, std::size_t additional_header_size)
setFormat(format_, additional_header_size);
}
-Packet::Packet(MemBuf &&buffer)
- : utils::MemBuf(std::move(buffer)),
- packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(0),
- format_(getFormatFromBuffer(data(), length())),
- payload_type_(PayloadType::UNSPECIFIED) {}
-
Packet::Packet(CopyBufferOp, const uint8_t *buffer, std::size_t size)
: utils::MemBuf(COPY_BUFFER, buffer, size),
packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
@@ -74,11 +67,11 @@ Packet::Packet(CreateOp, uint8_t *buffer, std::size_t length, std::size_t size,
setFormat(format_, additional_header_size);
}
-Packet::Packet(const Packet &other)
- : utils::MemBuf(other),
+Packet::Packet(MemBuf &&buffer)
+ : utils::MemBuf(std::move(buffer)),
packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
- header_offset_(other.header_offset_),
- format_(other.format_),
+ header_offset_(0),
+ format_(getFormatFromBuffer(data(), length())),
payload_type_(PayloadType::UNSPECIFIED) {}
Packet::Packet(Packet &&other)
@@ -92,6 +85,13 @@ Packet::Packet(Packet &&other)
other.header_offset_ = 0;
}
+Packet::Packet(const Packet &other)
+ : utils::MemBuf(other),
+ packet_start_(reinterpret_cast<hicn_header_t *>(writableData())),
+ header_offset_(other.header_offset_),
+ format_(other.format_),
+ payload_type_(PayloadType::UNSPECIFIED) {}
+
Packet::~Packet() {}
Packet &Packet::operator=(const Packet &other) {
@@ -103,29 +103,19 @@ Packet &Packet::operator=(const Packet &other) {
return *this;
}
-std::size_t Packet::getHeaderSizeFromBuffer(Format format,
- const uint8_t *buffer) {
- size_t header_length;
-
- if (hicn_packet_get_header_length(format, (hicn_header_t *)buffer,
- &header_length) < 0) {
- throw errors::MalformedPacketException();
- }
-
- return header_length;
+std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() {
+ return std::static_pointer_cast<utils::MemBuf>(shared_from_this());
}
-bool Packet::isInterest(const uint8_t *buffer) {
- bool is_interest = false;
-
- if (TRANSPORT_EXPECT_FALSE(hicn_packet_test_ece(HF_INET6_TCP,
- (const hicn_header_t *)buffer,
- &is_interest) < 0)) {
- throw errors::RuntimeException(
- "Impossible to retrieve ece flag from packet");
+Packet::Format Packet::getFormat() const {
+ // We check packet start because after a movement it will result in a nullptr
+ if (format_ == HF_UNSPEC && length()) {
+ if (hicn_packet_get_format(packet_start_, &format_) < 0) {
+ LOG(ERROR) << "Unexpected packet format HF_UNSPEC.";
+ }
}
- return !is_interest;
+ return format_;
}
void Packet::setFormat(Packet::Format format,
@@ -136,47 +126,43 @@ void Packet::setFormat(Packet::Format format,
}
auto header_size = getHeaderSizeFromFormat(format_);
- assert(header_size <= tailroom());
+ DCHECK(header_size <= tailroom());
append(header_size);
- assert(additional_header_size <= tailroom());
+ DCHECK(additional_header_size <= tailroom());
append(additional_header_size);
header_offset_ = length();
}
-bool Packet::isInterest() { return Packet::isInterest(data()); }
+PayloadType Packet::getPayloadType() const {
+ if (payload_type_ == PayloadType::UNSPECIFIED) {
+ hicn_payload_type_t ret;
-std::size_t Packet::getPayloadSizeFromBuffer(Format format,
- const uint8_t *buffer) {
- std::size_t payload_length;
- if (TRANSPORT_EXPECT_FALSE(
- hicn_packet_get_payload_length(format, (hicn_header_t *)buffer,
- &payload_length) < 0)) {
- throw errors::MalformedPacketException();
+ if (hicn_packet_get_payload_type(format_, packet_start_, &ret) < 0) {
+ throw errors::RuntimeException("Impossible to retrieve payload type.");
+ }
+
+ payload_type_ = (PayloadType)ret;
}
- return payload_length;
+ return payload_type_;
}
-std::size_t Packet::payloadSize() const {
- std::size_t ret = 0;
-
- if (length()) {
- ret = getPayloadSizeFromBuffer(format_,
- reinterpret_cast<uint8_t *>(packet_start_));
+Packet &Packet::setPayloadType(PayloadType payload_type) {
+ if (hicn_packet_set_payload_type(format_, packet_start_,
+ hicn_payload_type_t(payload_type)) < 0) {
+ throw errors::RuntimeException("Error setting payload type of the packet.");
}
- return ret;
+ payload_type_ = payload_type;
+ return *this;
}
-std::size_t Packet::headerSize() const {
- if (header_offset_ == 0 && length()) {
- const_cast<Packet *>(this)->header_offset_ = getHeaderSizeFromBuffer(
- format_, reinterpret_cast<uint8_t *>(packet_start_));
- }
-
- return header_offset_;
+std::unique_ptr<utils::MemBuf> Packet::getPayload() const {
+ auto ret = clone();
+ ret->trimStart(headerSize());
+ return ret;
}
Packet &Packet::appendPayload(std::unique_ptr<utils::MemBuf> &&payload) {
@@ -196,12 +182,56 @@ Packet &Packet::appendPayload(const uint8_t *buffer, std::size_t length) {
return *this;
}
-std::unique_ptr<utils::MemBuf> Packet::getPayload() const {
- auto ret = clone();
- ret->trimStart(headerSize());
+std::size_t Packet::headerSize() const {
+ if (header_offset_ == 0 && length()) {
+ const_cast<Packet *>(this)->header_offset_ = getHeaderSizeFromBuffer(
+ format_, reinterpret_cast<uint8_t *>(packet_start_));
+ }
+
+ return header_offset_;
+}
+
+std::size_t Packet::payloadSize() const {
+ std::size_t ret = 0;
+
+ if (length()) {
+ ret = getPayloadSizeFromBuffer(format_,
+ reinterpret_cast<uint8_t *>(packet_start_));
+ }
+
return ret;
}
+auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const {
+ auth::CryptoHash hash;
+ hash.setType(algorithm);
+
+ // Copy IP+TCP/ICMP header before zeroing them
+ hicn_header_t header_copy;
+ hicn_packet_copy_header(format_, packet_start_, &header_copy, false);
+ const_cast<Packet *>(this)->resetForHash();
+
+ hash.computeDigest(this);
+ hicn_packet_copy_header(format_, &header_copy, packet_start_, false);
+
+ return hash;
+}
+
+void Packet::reset() {
+ clear();
+ packet_start_ = reinterpret_cast<hicn_header_t *>(writableData());
+ header_offset_ = 0;
+ format_ = HF_UNSPEC;
+ payload_type_ = PayloadType::UNSPECIFIED;
+ name_.clear();
+
+ if (isChained()) {
+ separateChain(next(), prev());
+ }
+}
+
+bool Packet::isInterest() { return Packet::isInterest(data(), format_); }
+
Packet &Packet::updateLength(std::size_t length) {
std::size_t total_length = length;
@@ -221,47 +251,6 @@ Packet &Packet::updateLength(std::size_t length) {
return *this;
}
-PayloadType Packet::getPayloadType() const {
- if (payload_type_ == PayloadType::UNSPECIFIED) {
- hicn_payload_type_t ret;
- if (hicn_packet_get_payload_type(packet_start_, &ret) < 0) {
- throw errors::RuntimeException("Impossible to retrieve payload type.");
- }
-
- payload_type_ = (PayloadType)ret;
- }
-
- return payload_type_;
-}
-
-Packet &Packet::setPayloadType(PayloadType payload_type) {
- if (hicn_packet_set_payload_type(packet_start_,
- hicn_payload_type_t(payload_type)) < 0) {
- throw errors::RuntimeException("Error setting payload type of the packet.");
- }
-
- payload_type_ = payload_type;
-
- return *this;
-}
-
-Packet::Format Packet::getFormat() const {
- /**
- * We check packet start because after a movement it will result in a nullptr
- */
- if (format_ == HF_UNSPEC && length()) {
- if (hicn_packet_get_format(packet_start_, &format_) < 0) {
- LOG(ERROR) << "Unexpected packet format HF_UNSPEC.";
- }
- }
-
- return format_;
-}
-
-std::shared_ptr<utils::MemBuf> Packet::acquireMemBufReference() {
- return std::static_pointer_cast<utils::MemBuf>(shared_from_this());
-}
-
void Packet::dump() const {
LOG(INFO) << "HEADER -- Length: " << headerSize();
LOG(INFO) << "PAYLOAD -- Length: " << payloadSize();
@@ -274,180 +263,42 @@ void Packet::dump() const {
} while (current != this);
}
-void Packet::dump(uint8_t *buffer, std::size_t length) {
- hicn_packet_dump(buffer, length);
-}
-
-void Packet::setSignatureSize(std::size_t size_bytes) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- int ret = hicn_packet_set_signature_size(format_, packet_start_, size_bytes);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error setting signature size.");
- }
-}
-
-void Packet::setSignatureSizeGap(std::size_t size_bytes) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- int ret = hicn_packet_set_signature_gap(format_, packet_start_,
- (uint8_t)size_bytes);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error setting signature size.");
- }
-}
-
-uint8_t *Packet::getSignature() const {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- uint8_t *signature;
- int ret = hicn_packet_get_signature(format_, packet_start_, &signature);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error getting signature.");
- }
-
- return signature;
-}
-
-void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- int ret =
- hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error setting the signature timestamp.");
- }
-}
-
-uint64_t Packet::getSignatureTimestamp() const {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- uint64_t return_value;
- int ret = hicn_packet_get_signature_timestamp(format_, packet_start_,
- &return_value);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error getting the signature timestamp.");
- }
-
- return return_value;
-}
-
-void Packet::setValidationAlgorithm(
- const auth::CryptoSuite &validation_algorithm) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- int ret = hicn_packet_set_validation_algorithm(format_, packet_start_,
- uint8_t(validation_algorithm));
-
- if (ret < 0) {
- throw errors::RuntimeException("Error setting the validation algorithm.");
- }
-}
-
-auth::CryptoSuite Packet::getValidationAlgorithm() const {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- uint8_t return_value;
- int ret = hicn_packet_get_validation_algorithm(format_, packet_start_,
- &return_value);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error getting the validation algorithm.");
- }
-
- return auth::CryptoSuite(return_value);
-}
-
-void Packet::setKeyId(const auth::KeyId &key_id) {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
-
- int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first);
-
- if (ret < 0) {
- throw errors::RuntimeException("Error setting the key id.");
- }
-}
-
-auth::KeyId Packet::getKeyId() const {
- if (!authenticationHeader()) {
- throw errors::RuntimeException("Packet without Authentication Header.");
- }
+void Packet::setChecksum() {
+ if (_is_tcp(format_)) {
+ uint16_t partial_csum =
+ csum(data() + HICN_V6_TCP_HDRLEN, length() - HICN_V6_TCP_HDRLEN, 0);
- auth::KeyId return_value;
- int ret = hicn_packet_get_key_id(format_, packet_start_, &return_value.first,
- &return_value.second);
+ for (utils::MemBuf *current = next(); current != this;
+ current = current->next()) {
+ partial_csum = csum(current->data(), current->length(), ~partial_csum);
+ }
- if (ret < 0) {
- throw errors::RuntimeException("Error getting the validation algorithm.");
+ if (hicn_packet_compute_header_checksum(format_, packet_start_,
+ partial_csum) < 0) {
+ throw errors::MalformedPacketException();
+ }
}
-
- return return_value;
-}
-
-auth::CryptoHash Packet::computeDigest(auth::CryptoHashType algorithm) const {
- auth::CryptoHash hash;
- hash.setType(algorithm);
-
- // Copy IP+TCP/ICMP header before zeroing them
- hicn_header_t header_copy;
-
- hicn_packet_copy_header(format_, packet_start_, &header_copy, false);
-
- const_cast<Packet *>(this)->resetForHash();
-
- hash.computeDigest(this);
- hicn_packet_copy_header(format_, &header_copy, packet_start_, false);
-
- return hash;
}
bool Packet::checkIntegrity() const {
- uint16_t partial_csum =
- csum(data() + HICN_V6_TCP_HDRLEN, length() - HICN_V6_TCP_HDRLEN, 0);
+ if (_is_tcp(format_)) {
+ uint16_t partial_csum =
+ csum(data() + HICN_V6_TCP_HDRLEN, length() - HICN_V6_TCP_HDRLEN, 0);
- for (const utils::MemBuf *current = next(); current != this;
- current = current->next()) {
- partial_csum = csum(current->data(), current->length(), ~partial_csum);
- }
+ for (const utils::MemBuf *current = next(); current != this;
+ current = current->next()) {
+ partial_csum = csum(current->data(), current->length(), ~partial_csum);
+ }
- if (hicn_packet_check_integrity_no_payload(format_, packet_start_,
- partial_csum) < 0) {
- return false;
+ if (hicn_packet_check_integrity_no_payload(format_, packet_start_,
+ partial_csum) < 0) {
+ return false;
+ }
}
return true;
}
-void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) {
- auto last = prev();
- auto to_copy = std::min(*size, last->tailroom());
- std::memcpy(last->writableTail(), *buffer, to_copy);
- last->append(to_copy);
- *size -= to_copy;
- *buffer += to_copy;
-}
-
Packet &Packet::setSyn() {
if (hicn_packet_set_syn(format_, packet_start_) < 0) {
throw errors::RuntimeException("Error setting syn bit in the packet.");
@@ -553,29 +404,23 @@ Packet &Packet::resetFlags() {
resetAck();
resetRst();
resetFin();
-
return *this;
}
std::string Packet::printFlags() const {
std::string flags;
-
if (testSyn()) {
flags += "S";
}
-
if (testAck()) {
flags += "A";
}
-
if (testRst()) {
flags += "R";
}
-
if (testFin()) {
flags += "F";
}
-
return flags;
}
@@ -634,6 +479,245 @@ uint8_t Packet::getTTL() const {
return hops;
}
+bool Packet::hasAH() const { return _is_ah(format_); }
+
+std::vector<uint8_t> Packet::getSignature() const {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ uint8_t *signature;
+ int ret = hicn_packet_get_signature(format_, packet_start_, &signature);
+
+ if (ret < 0) {
+ throw errors::RuntimeException("Error getting signature.");
+ }
+
+ return std::vector<uint8_t>(signature, signature + getSignatureFieldSize());
+}
+
+std::size_t Packet::getSignatureFieldSize() const {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ size_t field_size;
+ int ret = hicn_packet_get_signature_size(format_, packet_start_, &field_size);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error reading signature field size");
+ }
+ return field_size;
+}
+
+std::size_t Packet::getSignatureSize() const {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ size_t padding;
+ int ret = hicn_packet_get_signature_padding(format_, packet_start_, &padding);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error reading signature padding");
+ }
+
+ size_t size = getSignatureFieldSize() - padding;
+ if (size < 0) {
+ throw errors::RuntimeException("Error reading signature size");
+ }
+
+ return size;
+}
+
+uint64_t Packet::getSignatureTimestamp() const {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ uint64_t timestamp;
+ int ret =
+ hicn_packet_get_signature_timestamp(format_, packet_start_, &timestamp);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error getting the signature timestamp.");
+ }
+ return timestamp;
+}
+
+auth::KeyId Packet::getKeyId() const {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ auth::KeyId key_id;
+ int ret = hicn_packet_get_key_id(format_, packet_start_, &key_id.first,
+ &key_id.second);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error getting the validation algorithm.");
+ }
+ return key_id;
+}
+
+auth::CryptoSuite Packet::getValidationAlgorithm() const {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ uint8_t return_value;
+ int ret = hicn_packet_get_validation_algorithm(format_, packet_start_,
+ &return_value);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error getting the validation algorithm.");
+ }
+ return auth::CryptoSuite(return_value);
+}
+
+void Packet::setSignature(const std::vector<uint8_t> &signature) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ uint8_t *signature_field;
+ int ret = hicn_packet_get_signature(format_, packet_start_, &signature_field);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error getting signature.");
+ }
+ memcpy(signature_field, signature.data(), signature.size());
+}
+
+void Packet::setSignatureFieldSize(std::size_t size) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ int ret = hicn_packet_set_signature_size(format_, packet_start_, size);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting signature size.");
+ }
+}
+
+void Packet::setSignatureSize(std::size_t size) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ size_t padding = getSignatureFieldSize() - size;
+ if (padding < 0) {
+ throw errors::RuntimeException("Error setting signature padding.");
+ }
+
+ int ret = hicn_packet_set_signature_padding(format_, packet_start_, padding);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting signature padding.");
+ }
+}
+
+void Packet::setSignatureTimestamp(const uint64_t &timestamp) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ int ret =
+ hicn_packet_set_signature_timestamp(format_, packet_start_, timestamp);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting the signature timestamp.");
+ }
+}
+
+void Packet::setKeyId(const auth::KeyId &key_id) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ int ret = hicn_packet_set_key_id(format_, packet_start_, key_id.first);
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting the key id.");
+ }
+}
+
+void Packet::setValidationAlgorithm(
+ const auth::CryptoSuite &validation_algorithm) {
+ if (!hasAH()) {
+ throw errors::RuntimeException("Packet without Authentication Header.");
+ }
+
+ int ret = hicn_packet_set_validation_algorithm(format_, packet_start_,
+ uint8_t(validation_algorithm));
+ if (ret < 0) {
+ throw errors::RuntimeException("Error setting the validation algorithm.");
+ }
+}
+
+Packet::Format Packet::toAHFormat(const Format &format) {
+ return hicn_get_ah_format(format);
+}
+
+Packet::Format Packet::getFormatFromBuffer(const uint8_t *buffer,
+ std::size_t /* length */) {
+ Packet::Format format = HF_UNSPEC;
+ hicn_packet_get_format((const hicn_header_t *)buffer, &format);
+ return format;
+}
+
+std::size_t Packet::getHeaderSizeFromFormat(Format format,
+ std::size_t signature_size) {
+ std::size_t header_length;
+ hicn_packet_get_header_length_from_format(format, &header_length);
+ int is_ah = _is_ah(format);
+ return is_ah * (header_length + signature_size) + (!is_ah) * header_length;
+}
+
+std::size_t Packet::getHeaderSizeFromBuffer(Format format,
+ const uint8_t *buffer) {
+ size_t header_length;
+
+ if (hicn_packet_get_header_length(format, (hicn_header_t *)buffer,
+ &header_length) < 0) {
+ throw errors::MalformedPacketException();
+ }
+
+ return header_length;
+}
+
+std::size_t Packet::getPayloadSizeFromBuffer(Format format,
+ const uint8_t *buffer) {
+ std::size_t payload_length;
+ if (TRANSPORT_EXPECT_FALSE(
+ hicn_packet_get_payload_length(format, (hicn_header_t *)buffer,
+ &payload_length) < 0)) {
+ throw errors::MalformedPacketException();
+ }
+
+ return payload_length;
+}
+
+bool Packet::isInterest(const uint8_t *buffer, Format format) {
+ int is_interest = 0;
+
+ if (TRANSPORT_EXPECT_FALSE(format == Format::HF_UNSPEC)) {
+ format = getFormatFromBuffer(buffer, /* Unused length */ 128);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(
+ hicn_packet_is_interest(format, (const hicn_header_t *)buffer,
+ &is_interest) < 0)) {
+ throw errors::RuntimeException("Error reading ece flag from packet");
+ }
+
+ return is_interest;
+}
+
+void Packet::dump(uint8_t *buffer, std::size_t length) {
+ hicn_packet_dump(buffer, length);
+}
+
+void Packet::prependPayload(const uint8_t **buffer, std::size_t *size) {
+ auto last = prev();
+ auto to_copy = std::min(*size, last->tailroom());
+ std::memcpy(last->writableTail(), *buffer, to_copy);
+ last->append(to_copy);
+ *size -= to_copy;
+ *buffer += to_copy;
+}
+
} // end namespace core
} // end namespace transport
diff --git a/libtransport/src/core/pending_interest.cc b/libtransport/src/core/pending_interest.cc
deleted file mode 100644
index fbe98cab5..000000000
--- a/libtransport/src/core/pending_interest.cc
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <core/pending_interest.h>
-
-namespace transport {
-
-namespace core {
-
-PendingInterest::PendingInterest()
- : interest_(nullptr, nullptr),
- timer_(),
- on_content_object_callback_(),
- on_interest_timeout_callback_() {}
-
-PendingInterest::PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
- on_content_object_callback_(),
- on_interest_timeout_callback_() {}
-
-PendingInterest::PendingInterest(
- Interest::Ptr &&interest, OnContentObjectCallback &&on_content_object,
- OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
- on_content_object_callback_(std::move(on_content_object)),
- on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
-
-PendingInterest::~PendingInterest() {}
-
-void PendingInterest::cancelTimer() { timer_->cancel(); }
-
-void PendingInterest::setInterest(Interest::Ptr &&interest) {
- interest_ = std::move(interest);
-}
-
-Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); }
-
-const OnContentObjectCallback &PendingInterest::getOnDataCallback() const {
- return on_content_object_callback_;
-}
-
-void PendingInterest::setOnContentObjectCallback(
- OnContentObjectCallback &&on_content_object) {
- PendingInterest::on_content_object_callback_ = on_content_object;
-}
-
-const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const {
- return on_interest_timeout_callback_;
-}
-
-void PendingInterest::setOnTimeoutCallback(
- OnInterestTimeoutCallback &&on_interest_timeout) {
- PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
-}
-
-} // end namespace core
-
-} // end namespace transport
diff --git a/libtransport/src/core/pending_interest.h b/libtransport/src/core/pending_interest.h
index 99a8bd327..f8a4ba10e 100644
--- a/libtransport/src/core/pending_interest.h
+++ b/libtransport/src/core/pending_interest.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -48,19 +48,17 @@ class PendingInterest {
// on_content_object_callback_(),
// on_interest_timeout_callback_() {}
- PendingInterest(Interest::Ptr &&interest,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
+ PendingInterest(asio::io_service &io_service, const Interest::Ptr &interest)
+ : interest_(interest),
+ timer_(io_service),
on_content_object_callback_(),
on_interest_timeout_callback_() {}
- PendingInterest(Interest::Ptr &&interest,
+ PendingInterest(asio::io_service &io_service, const Interest::Ptr &interest,
OnContentObjectCallback &&on_content_object,
- OnInterestTimeoutCallback &&on_interest_timeout,
- std::unique_ptr<asio::steady_timer> &&timer)
- : interest_(std::move(interest)),
- timer_(std::move(timer)),
+ OnInterestTimeoutCallback &&on_interest_timeout)
+ : interest_(interest),
+ timer_(io_service),
on_content_object_callback_(std::move(on_content_object)),
on_interest_timeout_callback_(std::move(on_interest_timeout)) {}
@@ -68,12 +66,12 @@ class PendingInterest {
template <typename Handler>
TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) {
- timer_->expires_from_now(
+ timer_.expires_from_now(
std::chrono::milliseconds(interest_->getLifetime()));
- timer_->async_wait(std::forward<Handler &&>(cb));
+ timer_.async_wait(std::forward<Handler &&>(cb));
}
- TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_->cancel(); }
+ TRANSPORT_ALWAYS_INLINE void cancelTimer() { timer_.cancel(); }
TRANSPORT_ALWAYS_INLINE Interest::Ptr &&getInterest() {
return std::move(interest_);
@@ -105,7 +103,7 @@ class PendingInterest {
private:
Interest::Ptr interest_;
- std::unique_ptr<asio::steady_timer> timer_;
+ asio::steady_timer timer_;
OnContentObjectCallback on_content_object_callback_;
OnInterestTimeoutCallback on_interest_timeout_callback_;
};
diff --git a/libtransport/src/core/portal.cc b/libtransport/src/core/portal.cc
index c4c0cf8ba..d8e8d78ea 100644
--- a/libtransport/src/core/portal.cc
+++ b/libtransport/src/core/portal.cc
@@ -30,11 +30,11 @@ namespace core {
#ifdef ANDROID
static const constexpr char default_module[] = "";
#elif defined(MACINTOSH)
-static const constexpr char default_module[] = "hicnlight_module.dylib";
+static const constexpr char default_module[] = "hicnlightng_module.dylib";
#elif defined(LINUX)
-static const constexpr char default_module[] = "hicnlight_module.so";
+static const constexpr char default_module[] = "hicnlightng_module.so";
#elif defined(WINDOWS)
-static const constexpr char default_module[] = "hicnlight_module.lib";
+static const constexpr char default_module[] = "hicnlightng_module.lib";
#endif
IoModuleConfiguration Portal::conf_;
@@ -57,7 +57,7 @@ std::string Portal::defaultIoModule() {
void Portal::getModuleConfiguration(ConfigurationObject& object,
std::error_code& ec) {
- assert(object.getKey() == io_module_section);
+ DCHECK(object.getKey() == io_module_section);
auto conf = dynamic_cast<const IoModuleConfiguration&>(object);
conf = conf_;
@@ -103,7 +103,7 @@ std::string getIoModulePath(const std::string& name,
void Portal::setModuleConfiguration(const ConfigurationObject& object,
std::error_code& ec) {
- assert(object.getKey() == io_module_section);
+ DCHECK(object.getKey() == io_module_section);
const IoModuleConfiguration& conf =
dynamic_cast<const IoModuleConfiguration&>(object);
diff --git a/libtransport/src/core/portal.h b/libtransport/src/core/portal.h
index f6a9ce85b..aae4c573e 100644
--- a/libtransport/src/core/portal.h
+++ b/libtransport/src/core/portal.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#pragma once
+#include <core/global_workers.h>
#include <core/pending_interest.h>
#include <glog/logging.h>
#include <hicn/transport/config.h>
@@ -28,6 +29,7 @@
#include <hicn/transport/interfaces/global_conf_interface.h>
#include <hicn/transport/interfaces/portal.h>
#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/fixed_block_allocator.h>
#include <future>
@@ -54,11 +56,11 @@ class HandlerMemory {
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
+ void *allocate(std::size_t size) {
return utils::FixedBlockAllocator<128, 8192>::getInstance().allocateBlock();
}
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
+ void deallocate(void *pointer) {
utils::FixedBlockAllocator<128, 8192>::getInstance().deallocateBlock(
pointer);
}
@@ -69,13 +71,9 @@ class HandlerMemory {
HandlerMemory(const HandlerMemory &) = delete;
HandlerMemory &operator=(const HandlerMemory &) = delete;
- TRANSPORT_ALWAYS_INLINE void *allocate(std::size_t size) {
- return ::operator new(size);
- }
+ void *allocate(std::size_t size) { return ::operator new(size); }
- TRANSPORT_ALWAYS_INLINE void deallocate(void *pointer) {
- ::operator delete(pointer);
- }
+ void deallocate(void *pointer) { ::operator delete(pointer); }
#endif
};
@@ -92,21 +90,19 @@ class HandlerAllocator {
HandlerAllocator(const HandlerAllocator<U> &other) noexcept
: memory_(other.memory_) {}
- TRANSPORT_ALWAYS_INLINE bool operator==(
- const HandlerAllocator &other) const noexcept {
+ bool operator==(const HandlerAllocator &other) const noexcept {
return &memory_ == &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE bool operator!=(
- const HandlerAllocator &other) const noexcept {
+ bool operator!=(const HandlerAllocator &other) const noexcept {
return &memory_ != &other.memory_;
}
- TRANSPORT_ALWAYS_INLINE T *allocate(std::size_t n) const {
+ T *allocate(std::size_t n) const {
return static_cast<T *>(memory_.allocate(sizeof(T) * n));
}
- TRANSPORT_ALWAYS_INLINE void deallocate(T *p, std::size_t /*n*/) const {
+ void deallocate(T *p, std::size_t /*n*/) const {
return memory_.deallocate(p);
}
@@ -135,7 +131,7 @@ class CustomAllocatorHandler {
}
template <typename... Args>
- void operator()(Args &&... args) {
+ void operator()(Args &&...args) {
handler_(std::forward<Args>(args)...);
}
@@ -151,49 +147,16 @@ inline CustomAllocatorHandler<Handler> makeCustomAllocatorHandler(
return CustomAllocatorHandler<Handler>(m, h);
}
-class Pool {
- public:
- Pool(asio::io_service &io_service) : io_service_(io_service) {
- increasePendingInterestPool();
- }
-
- TRANSPORT_ALWAYS_INLINE void increasePendingInterestPool() {
- // Create pool of pending interests to reuse
- for (uint32_t i = 0; i < pit_size; i++) {
- pending_interests_pool_.add(new PendingInterest(
- Interest::Ptr(nullptr),
- std::make_unique<asio::steady_timer>(io_service_)));
- }
- }
- PendingInterest::Ptr getPendingInterest() {
- auto res = pending_interests_pool_.get();
- while (TRANSPORT_EXPECT_FALSE(!res.first)) {
- increasePendingInterestPool();
- res = pending_interests_pool_.get();
- }
-
- return std::move(res.second);
- }
-
- private:
- utils::ObjectPool<PendingInterest> pending_interests_pool_;
- asio::io_service &io_service_;
-};
-
} // namespace portal_details
class PortalConfiguration;
-using PendingInterestHashTable =
- std::unordered_map<uint32_t, PendingInterest::Ptr>;
-
-using interface::BindConfig;
+using PendingInterestHashTable = std::unordered_map<uint32_t, PendingInterest>;
/**
* Portal is a opaque class which is used for sending/receiving interest/data
- * packets over multiple kind of connector. The connector itself is defined by
- * the template ForwarderInt, which is resolved at compile time. It is then not
- * possible to decide at runtime what the connector will be.
+ * packets over multiple kind of io_modules. The io_module itself is an external
+ * module loaded at runtime.
*
* The tasks performed by portal are the following:
* - Sending/Receiving Interest packets
@@ -210,22 +173,16 @@ using interface::BindConfig;
* users of this class.
*/
-class Portal {
- public:
- using ConsumerCallback = interface::Portal::ConsumerCallback;
- using ProducerCallback = interface::Portal::ProducerCallback;
-
- friend class PortalConfiguration;
-
- Portal() : Portal(internal_io_service_) {}
+class Portal : public ::utils::NonCopyable,
+ public std::enable_shared_from_this<Portal> {
+ private:
+ Portal() : Portal(GlobalWorkers::getInstance().getWorker()) {}
- Portal(asio::io_service &io_service)
+ Portal(::utils::EventThread &worker)
: io_module_(nullptr, [](IoModule *module) { IoModule::unload(module); }),
- io_service_(io_service),
- packet_pool_(io_service),
+ worker_(worker),
app_name_("libtransport_application"),
- consumer_callback_(nullptr),
- producer_callback_(nullptr),
+ transport_callback_(nullptr),
is_consumer_(false) {
/**
* This workaroung allows to initialize memory for packet buffers *before*
@@ -235,58 +192,105 @@ class Portal {
*/
PacketManager<>::getInstance();
}
+
+ public:
+ using TransportCallback = interface::Portal::TransportCallback;
+ friend class PortalConfiguration;
+
+ static std::shared_ptr<Portal> createShared() {
+ return std::shared_ptr<Portal>(new Portal());
+ }
+
+ static std::shared_ptr<Portal> createShared(::utils::EventThread &worker) {
+ return std::shared_ptr<Portal>(new Portal(worker));
+ }
+
+ bool isConnected() const { return io_module_.get() != nullptr; }
+
/**
- * Set the consumer callback.
+ * Set the transport callback. Must be called from the same worker thread.
*
- * @param consumer_callback - The pointer to the ConsumerCallback object.
+ * @param consumer_callback - The pointer to the TransportCallback object.
*/
- void setConsumerCallback(ConsumerCallback *consumer_callback) {
- consumer_callback_ = consumer_callback;
+ void registerTransportCallback(TransportCallback *transport_callback) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ transport_callback_ = transport_callback;
}
/**
- * Set the producer callback.
- *
- * @param producer_callback - The pointer to the ProducerCallback object.
+ * Unset the consumer callback. Must be called from the same worker thread.
*/
- void setProducerCallback(ProducerCallback *producer_callback) {
- producer_callback_ = producer_callback;
+ void unregisterTransportCallback() {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+ transport_callback_ = nullptr;
}
/**
- * Specify the output interface to use. This method will be useful in a future
- * scenario where the library will be able to forward packets without
+ * Specify the output interface to use. This method will be useful in a
+ * future scenario where the library will be able to forward packets without
* connecting to a local forwarder. Now it is not used.
*
* @param output_interface - The output interface to use for
* forwarding/receiving packets.
*/
- TRANSPORT_ALWAYS_INLINE void setOutputInterface(
- const std::string &output_interface) {
- io_module_->setOutputInterface(output_interface);
+ void setOutputInterface(const std::string &output_interface) {
+ if (io_module_) {
+ io_module_->setOutputInterface(output_interface);
+ }
}
/**
* Connect the transport to the local hicn forwarder.
*
- * @param is_consumer - Boolean specifying if the application on top of portal
- * is a consumer or a producer.
+ * @param is_consumer - Boolean specifying if the application on top of
+ * portal is a consumer or a producer.
*/
- TRANSPORT_ALWAYS_INLINE void connect(bool is_consumer = true) {
- if (!io_module_) {
- pending_interest_hash_table_.reserve(portal_details::pit_size);
- io_module_.reset(IoModule::load(io_module_path_.c_str()));
-
- CHECK(io_module_);
-
- io_module_->init(std::bind(&Portal::processIncomingMessages, this,
- std::placeholders::_1, std::placeholders::_2,
- std::placeholders::_3),
- std::bind(&Portal::setLocalRoutes, this), io_service_,
- app_name_);
- io_module_->connect(is_consumer);
- is_consumer_ = is_consumer;
+ void connect(bool is_consumer = true) {
+ if (isConnected()) {
+ return;
}
+
+ worker_.addAndWaitForExecution([this, is_consumer]() {
+ if (!io_module_) {
+ pending_interest_hash_table_.reserve(portal_details::pit_size);
+ io_module_.reset(IoModule::load(io_module_path_.c_str()));
+
+ CHECK(io_module_);
+
+ std::weak_ptr<Portal> self(shared_from_this());
+
+ io_module_->init(
+ [self](Connector *c, const std::vector<utils::MemBuf::Ptr> &buffers,
+ const std::error_code &ec) {
+ if (auto ptr = self.lock()) {
+ ptr->processIncomingMessages(c, buffers, ec);
+ }
+ },
+ [self](Connector *c, const std::error_code &ec) {
+ if (!ec) {
+ return;
+ }
+ auto ptr = self.lock();
+ if (ptr && ptr->transport_callback_) {
+ ptr->transport_callback_->onError(ec);
+ }
+ },
+ [self](Connector *c, const std::error_code &ec) {
+ auto ptr = self.lock();
+ if (ptr) {
+ if (ec && ptr->transport_callback_) {
+ ptr->transport_callback_->onError(ec);
+ return;
+ }
+ ptr->setLocalRoutes();
+ }
+ },
+ worker_.getIoService(), app_name_);
+
+ io_module_->connect(is_consumer);
+ is_consumer_ = is_consumer;
+ }
+ });
}
/**
@@ -295,18 +299,13 @@ class Portal {
~Portal() { killConnection(); }
/**
- * Compute name hash
- */
- TRANSPORT_ALWAYS_INLINE uint32_t getHash(const Name &name) {
- return name.getHash32(false) + name.getSuffix();
- }
-
- /**
* Check if there is already a pending interest for a given name.
*
* @param name - The interest name.
*/
- TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
+ bool interestIsPending(const Name &name) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
auto it = pending_interest_hash_table_.find(getHash(name));
if (it != pending_interest_hash_table_.end()) {
return true;
@@ -321,19 +320,21 @@ class Portal {
* @param interest - The pointer to the interest. The ownership of the
* interest is transferred by the caller to portal.
*
- * @param on_content_object_callback - If the caller wishes to use a different
- * callback to be called for this interest, it can set this parameter.
- * Otherwise ConsumerCallback::onContentObject will be used.
+ * @param on_content_object_callback - If the caller wishes to use a
+ * different callback to be called for this interest, it can set this
+ * parameter. Otherwise ConsumerCallback::onContentObject will be used.
*
* @param on_interest_timeout_callback - If the caller wishes to use a
* different callback to be called for this interest, it can set this
* parameter. Otherwise ConsumerCallback::onTimeout will be used.
*/
- TRANSPORT_ALWAYS_INLINE void sendInterest(
+ void sendInterest(
Interest::Ptr &&interest,
OnContentObjectCallback &&on_content_object_callback = UNSET_CALLBACK,
OnInterestTimeoutCallback &&on_interest_timeout_callback =
UNSET_CALLBACK) {
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
// Send it
interest->encodeSuffixes();
io_module_->send(*interest);
@@ -346,35 +347,42 @@ class Portal {
uint32_t counter = 0;
// Set timers
do {
- auto pending_interest = packet_pool_.getPendingInterest();
- pending_interest->setInterest(interest);
+ if (suffix) {
+ hash = initial_hash + *suffix;
+ seq = *suffix;
+ suffix++;
+ }
+
+ auto it = pending_interest_hash_table_.find(hash);
+ PendingInterest *pending_interest = nullptr;
+ if (it != pending_interest_hash_table_.end()) {
+ it->second.cancelTimer();
+ pending_interest = &it->second;
+ pending_interest->setInterest(interest);
+ } else {
+ auto pend_int = pending_interest_hash_table_.try_emplace(
+ hash, worker_.getIoService(), interest);
+ pending_interest = &pend_int.first->second;
+ }
+
pending_interest->setOnContentObjectCallback(
std::move(on_content_object_callback));
pending_interest->setOnTimeoutCallback(
std::move(on_interest_timeout_callback));
+ auto self = weak_from_this();
pending_interest->startCountdown(
portal_details::makeCustomAllocatorHandler(
async_callback_memory_,
- std::bind(&Portal::timerHandler, this, std::placeholders::_1,
- hash, seq)));
+ [self, hash, seq](const std::error_code &ec) {
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ return;
+ }
- auto it = pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
-
- // Get reference to interest packet in order to have it destroyed.
- auto _int = it->second->getInterest();
- it->second = std::move(pending_interest);
- } else {
- pending_interest_hash_table_[hash] = std::move(pending_interest);
- }
-
- if (suffix) {
- hash = initial_hash + *suffix;
- seq = *suffix;
- suffix++;
- }
+ if (auto ptr = self.lock()) {
+ ptr->timerHandler(hash, seq);
+ }
+ }));
} while (counter++ < n_suffixes);
}
@@ -385,214 +393,174 @@ class Portal {
* @param ec - Error code which says whether the timer expired or has been
* canceled upon data packet reception.
*
- * @param hash - The index of the interest in the pending interest hash table.
+ * @param hash - The index of the interest in the pending interest hash
+ * table.
*/
- TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
- uint32_t hash, uint32_t seq) {
- bool is_stopped = io_service_.stopped();
- if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
- return;
- }
+ void timerHandler(uint32_t hash, uint32_t seq) {
+ PendingInterestHashTable::iterator it =
+ pending_interest_hash_table_.find(hash);
+ if (it != pending_interest_hash_table_.end()) {
+ PendingInterest &pend_interest = it->second;
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnTimeoutCallback();
+ pending_interest_hash_table_.erase(it);
+ Name &name = const_cast<Name &>(_int->getName());
+ name.setSuffix(seq);
- if (TRANSPORT_EXPECT_TRUE(!ec)) {
- PendingInterestHashTable::iterator it =
- pending_interest_hash_table_.find(hash);
- if (it != pending_interest_hash_table_.end()) {
- PendingInterest::Ptr ptr = std::move(it->second);
- pending_interest_hash_table_.erase(it);
- auto _int = ptr->getInterest();
- Name &name = const_cast<Name &>(_int->getName());
- name.setSuffix(seq);
-
- if (ptr->getOnTimeoutCallback() != UNSET_CALLBACK) {
- ptr->on_interest_timeout_callback_(_int, name);
- } else if (consumer_callback_) {
- consumer_callback_->onTimeout(_int, name);
- }
+ if (callback != UNSET_CALLBACK) {
+ callback(_int, name);
+ } else if (transport_callback_) {
+ transport_callback_->onTimeout(_int, name);
}
}
}
/**
- * Register a producer name to the local forwarder and optionally set the
- * content store size in a per-face manner.
- *
- * @param config - The configuration for the local forwarder binding.
- */
- TRANSPORT_ALWAYS_INLINE void bind(const BindConfig &config) {
- assert(io_module_);
- io_module_->setContentStoreSize(config.csReserved());
- served_namespaces_.push_back(config.prefix());
- setLocalRoutes();
- }
-
- /**
- * Start the event loop. This function blocks here and calls the callback set
- * by the application upon interest/data received or timeout.
- */
- TRANSPORT_ALWAYS_INLINE void runEventsLoop() {
- if (io_service_.stopped()) {
- io_service_.reset(); // ensure that run()/poll() will do some work
- }
-
- io_service_.run();
- }
-
- /**
- * Run one event and return.
- */
- TRANSPORT_ALWAYS_INLINE void runOneEvent() {
- if (io_service_.stopped()) {
- io_service_.reset(); // ensure that run()/poll() will do some work
- }
-
- io_service_.run_one();
- }
-
- /**
- * Send a data packet to the local forwarder. As opposite to sendInterest, the
- * ownership of the content object is not transferred to the portal.
+ * Send a data packet to the local forwarder.
*
* @param content_object - The data packet.
*/
- TRANSPORT_ALWAYS_INLINE void sendContentObject(
- ContentObject &content_object) {
+ void sendContentObject(ContentObject &content_object) {
+ DCHECK(io_module_);
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
io_module_->send(content_object);
}
/**
- * Stop the event loop, canceling all the pending events in the event queue.
- *
- * Beware that stopping the event loop DOES NOT disconnect the transport from
- * the local forwarder, the connector underneath will stay connected.
+ * Disconnect the transport from the local forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
- if (!io_service_.stopped()) {
- io_service_.dispatch([this]() {
- clear();
- io_service_.stop();
- });
+ void killConnection() {
+ if (TRANSPORT_EXPECT_TRUE(io_module_ != nullptr)) {
+ io_module_->closeConnection();
}
}
/**
- * Disconnect the transport from the local forwarder.
+ * Clear the pending interest hash table.
*/
- TRANSPORT_ALWAYS_INLINE void killConnection() {
- io_module_->closeConnection();
+ void clear() {
+ worker_.tryRunHandlerNow([self{shared_from_this()}]() { self->doClear(); });
}
/**
- * Clear the pending interest hash table.
+ * Get a reference to the io_service object.
*/
- TRANSPORT_ALWAYS_INLINE void clear() {
- if (!io_service_.stopped()) {
- io_service_.dispatch(std::bind(&Portal::doClear, this));
- } else {
- doClear();
- }
- }
+ utils::EventThread &getThread() { return worker_; }
/**
- * Remove one pending interest.
+ * Register a route to the local forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void clearOne(const Name &name) {
- if (!io_service_.stopped()) {
- io_service_.dispatch(std::bind(&Portal::doClearOne, this, name));
- } else {
- doClearOne(name);
- }
+ void registerRoute(const Prefix &prefix) {
+ std::weak_ptr<Portal> self = shared_from_this();
+ worker_.tryRunHandlerNow([self, prefix]() {
+ if (auto ptr = self.lock()) {
+ auto ret = ptr->served_namespaces_.insert(prefix);
+ if (ret.second && ptr->io_module_ && ptr->io_module_->isConnected()) {
+ ptr->io_module_->registerRoute(prefix);
+ }
+ }
+ });
}
/**
- * Get a reference to the io_service object.
+ * Send a MAP-Me update to traverse NATs.
*/
- TRANSPORT_ALWAYS_INLINE asio::io_service &getIoService() {
- return io_service_;
+ TRANSPORT_ALWAYS_INLINE void sendMapme() {
+ if (io_module_->isConnected()) {
+ io_module_->sendMapme();
+ }
}
/**
- * Register a route to the local forwarder.
+ * set forwarding strategy
*/
- TRANSPORT_ALWAYS_INLINE void registerRoute(Prefix &prefix) {
- served_namespaces_.push_back(prefix);
+ TRANSPORT_ALWAYS_INLINE void setForwardingStrategy(Prefix &prefix,
+ std::string &strategy) {
if (io_module_->isConnected()) {
- io_module_->registerRoute(prefix);
+ io_module_->setForwardingStrategy(prefix, strategy);
}
}
/**
* Check if the transport is connected to a forwarder or not
*/
- TRANSPORT_ALWAYS_INLINE bool isConnectedToFwd() {
+ bool isConnectedToFwd() {
std::string mod = io_module_path_.substr(0, io_module_path_.find("."));
if (mod == "forwarder_module") return false;
return true;
}
+ auto &getServedNamespaces() { return served_namespaces_; }
+
private:
/**
- * Clear the pending interest hash table.
+ * Compute name hash
*/
- TRANSPORT_ALWAYS_INLINE void doClear() {
- for (auto &pend_interest : pending_interest_hash_table_) {
- pend_interest.second->cancelTimer();
-
- // Get interest packet from pending interest and do nothing with it. It
- // will get destroyed as it goes out of scope.
- auto _int = pend_interest.second->getInterest();
- }
-
- pending_interest_hash_table_.clear();
+ uint32_t getHash(const Name &name) {
+ return name.getHash32(false) + name.getSuffix();
}
/**
- * Remove one pending interest.
+ * Clear the pending interest hash table.
*/
- TRANSPORT_ALWAYS_INLINE void doClearOne(const Name &name) {
- auto it = pending_interest_hash_table_.find(getHash(name));
-
- if (it != pending_interest_hash_table_.end()) {
- it->second->cancelTimer();
-
- // Get interest packet from pending interest and do nothing with it. It
- // will get destroyed as it goes out of scope.
- auto _int = it->second->getInterest();
-
- pending_interest_hash_table_.erase(it);
+ void doClear() {
+ for (auto &pend_interest : pending_interest_hash_table_) {
+ pend_interest.second.cancelTimer();
}
+
+ pending_interest_hash_table_.clear();
}
/**
- * Callback called by the underlying connector upon reception of a packet from
- * the local forwarder.
+ * Callback called by the underlying connector upon reception of a packet
+ * from the local forwarder.
*
* @param packet_buffer - The bytes of the packet.
*/
- TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
- Connector *c, utils::MemBuf &buffer, const std::error_code &ec) {
- bool is_stopped = io_service_.stopped();
- if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
+ void processIncomingMessages(Connector *c,
+ const std::vector<utils::MemBuf::Ptr> &buffers,
+ const std::error_code &ec) {
+ if (!transport_callback_) {
return;
}
- if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer.data()))) {
- processControlMessage(buffer);
+ if (TRANSPORT_EXPECT_FALSE(ec.operator bool())) {
+ // Error receiving from underlying infra.
+ if (transport_callback_) {
+ transport_callback_->onError(ec);
+ }
+
return;
}
- // The buffer is a base class for an interest or a content object
- Packet &packet_buffer = static_cast<Packet &>(buffer);
+ for (auto &buffer_ptr : buffers) {
+ auto &buffer = *buffer_ptr;
+
+ if (TRANSPORT_EXPECT_FALSE(io_module_->isControlMessage(buffer))) {
+ processControlMessage(buffer);
+ return;
+ }
- auto format = packet_buffer.getFormat();
- if (TRANSPORT_EXPECT_TRUE(_is_tcp(format))) {
- if (is_consumer_) {
- processContentObject(static_cast<ContentObject &>(packet_buffer));
+ auto format = Packet::getFormatFromBuffer(buffer.data(), buffer.length());
+ if (TRANSPORT_EXPECT_TRUE(_is_cmpr(format) || _is_tcp(format))) {
+ // The buffer is a base class for an interest or a content object
+ Packet &packet_buffer = static_cast<Packet &>(buffer);
+ if (is_consumer_ && !packet_buffer.isInterest()) {
+ processContentObject(static_cast<ContentObject &>(packet_buffer));
+ } else if (!is_consumer_ && packet_buffer.isInterest()) {
+ processInterest(static_cast<Interest &>(packet_buffer));
+ } else {
+ auto packet_type =
+ packet_buffer.isInterest() ? "Interest" : "ContentObject";
+ auto socket_type = is_consumer_ ? "consumer " : "producer ";
+ LOG(ERROR) << "Received a " << packet_type << " packet with name "
+ << packet_buffer.getName() << " in a " << socket_type
+ << " transport. Ignoring it.";
+ }
} else {
- processInterest(static_cast<Interest &>(packet_buffer));
+ LOG(ERROR) << "Received not supported packet. Ignoring it.";
}
- } else {
- LOG(ERROR) << "Received not supported packet. Ignoring it.";
}
}
@@ -601,19 +569,21 @@ class Portal {
* It register the prefixes in the served_namespaces_ list to the local
* forwarder.
*/
- TRANSPORT_ALWAYS_INLINE void setLocalRoutes() {
+ void setLocalRoutes() {
+ DCHECK(io_module_);
+ DCHECK(io_module_->isConnected());
+ DCHECK(std::this_thread::get_id() == worker_.getThreadId());
+
for (auto &prefix : served_namespaces_) {
- if (io_module_->isConnected()) {
- io_module_->registerRoute(prefix);
- }
+ io_module_->registerRoute(prefix);
}
}
- TRANSPORT_ALWAYS_INLINE void processInterest(Interest &interest) {
+ void processInterest(Interest &interest) {
// Interest for a producer
DLOG_IF(INFO, VLOG_IS_ON(3)) << "processInterest " << interest.getName();
- if (TRANSPORT_EXPECT_TRUE(producer_callback_ != nullptr)) {
- producer_callback_->onInterest(interest);
+ if (TRANSPORT_EXPECT_TRUE(transport_callback_ != nullptr)) {
+ transport_callback_->onInterest(interest);
}
}
@@ -625,8 +595,7 @@ class Portal {
*
* @param content_object - The data packet
*/
- TRANSPORT_ALWAYS_INLINE void processContentObject(
- ContentObject &content_object) {
+ void processContentObject(ContentObject &content_object) {
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "processContentObject " << content_object.getName();
uint32_t hash = getHash(content_object.getName());
@@ -635,15 +604,16 @@ class Portal {
if (it != pending_interest_hash_table_.end()) {
DLOG_IF(INFO, VLOG_IS_ON(3)) << "Found pending interest.";
- PendingInterest::Ptr interest_ptr = std::move(it->second);
+ PendingInterest &pend_interest = it->second;
+ pend_interest.cancelTimer();
+ auto _int = pend_interest.getInterest();
+ auto callback = pend_interest.getOnDataCallback();
pending_interest_hash_table_.erase(it);
- interest_ptr->cancelTimer();
- auto _int = interest_ptr->getInterest();
- if (interest_ptr->getOnDataCallback() != UNSET_CALLBACK) {
- interest_ptr->on_content_object_callback_(*_int, content_object);
- } else if (consumer_callback_) {
- consumer_callback_->onContentObject(*_int, content_object);
+ if (callback != UNSET_CALLBACK) {
+ callback(*_int, content_object);
+ } else if (transport_callback_) {
+ transport_callback_->onContentObject(*_int, content_object);
}
} else {
DLOG_IF(INFO, VLOG_IS_ON(3))
@@ -652,12 +622,11 @@ class Portal {
}
/**
- * Process a control message. Control messages are different depending on the
- * connector, then the forwarder_interface will do the job of understanding
- * them.
+ * Process a control message. Control messages are different depending on
+ * the connector, then the forwarder_interface will do the job of
+ * understanding them.
*/
- TRANSPORT_ALWAYS_INLINE void processControlMessage(
- utils::MemBuf &packet_buffer) {
+ void processControlMessage(utils::MemBuf &packet_buffer) {
io_module_->processControlMessageReply(packet_buffer);
}
@@ -665,17 +634,14 @@ class Portal {
portal_details::HandlerMemory async_callback_memory_;
std::unique_ptr<IoModule, void (*)(IoModule *)> io_module_;
- asio::io_service &io_service_;
- asio::io_service internal_io_service_;
- portal_details::Pool packet_pool_;
+ ::utils::EventThread &worker_;
std::string app_name_;
PendingInterestHashTable pending_interest_hash_table_;
- std::list<Prefix> served_namespaces_;
+ std::set<Prefix> served_namespaces_;
- ConsumerCallback *consumer_callback_;
- ProducerCallback *producer_callback_;
+ TransportCallback *transport_callback_;
bool is_consumer_;
diff --git a/libtransport/src/core/prefix.cc b/libtransport/src/core/prefix.cc
index d598cff75..4c1e191e9 100644
--- a/libtransport/src/core/prefix.cc
+++ b/libtransport/src/core/prefix.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -99,6 +99,14 @@ void Prefix::buildPrefix(std::string &prefix, uint16_t prefix_length,
ip_prefix_.family = family;
}
+bool Prefix::operator<(const Prefix &other) const {
+ return ip_prefix_cmp(&ip_prefix_, &other.ip_prefix_) < 0;
+}
+
+bool Prefix::operator==(const Prefix &other) const {
+ return ip_prefix_cmp(&ip_prefix_, &other.ip_prefix_) == 0;
+}
+
std::unique_ptr<Sockaddr> Prefix::toSockaddr() const {
Sockaddr *ret = nullptr;
@@ -211,8 +219,6 @@ Name Prefix::getName(const core::Name &mask, const core::Name &components,
}
}
- // if (this->contains(name_ip))
- // throw errors::RuntimeException("Mask overrides the prefix");
return Name(ip_prefix_.family, (uint8_t *)&name_ip);
}
@@ -279,8 +285,6 @@ Prefix &Prefix::setNetwork(std::string &network) {
}
Name Prefix::makeRandomName() const {
- srand((unsigned int)time(nullptr));
-
if (ip_prefix_.family == AF_INET6) {
std::default_random_engine eng((std::random_device())());
std::uniform_int_distribution<uint32_t> idis(
diff --git a/libtransport/src/core/tcp_socket_connector.cc b/libtransport/src/core/tcp_socket_connector.cc
index a30264271..7758e2cf2 100644
--- a/libtransport/src/core/tcp_socket_connector.cc
+++ b/libtransport/src/core/tcp_socket_connector.cc
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -85,8 +85,10 @@ void TcpSocketConnector::send(const uint8_t *packet, std::size_t len,
const PacketSentCallback &packet_sent) {
if (packet_sent != 0) {
asio::async_write(socket_, asio::buffer(packet, len),
- [packet_sent](std::error_code ec,
- std::size_t /*length*/) { packet_sent(); });
+ [packet_sent](const std::error_code &ec)
+ std::size_t /*length*/) {
+ packet_sent();
+ });
} else {
if (state_ == ConnectorState::CONNECTED) {
asio::write(socket_, asio::buffer(packet, len));
@@ -151,7 +153,7 @@ void TcpSocketConnector::doWrite() {
asio::async_write(
socket_, std::move(array),
- [this, packet_store = std::move(packet_store)](std::error_code ec,
+ [this, packet_store = std::move(packet_store)](const std::error_code &ec,
std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
if (!output_buffer_.empty()) {
@@ -172,7 +174,7 @@ void TcpSocketConnector::doReadBody(std::size_t body_length) {
asio::async_read(
socket_, asio::buffer(read_msg_->writableTail(), body_length),
asio::transfer_exactly(body_length),
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
read_msg_->append(length);
if (TRANSPORT_EXPECT_TRUE(!ec)) {
receive_callback_(std::move(read_msg_));
@@ -195,7 +197,7 @@ void TcpSocketConnector::doReadHeader() {
asio::buffer(read_msg_->writableData(),
NetworkMessage::fixed_header_length),
asio::transfer_exactly(NetworkMessage::fixed_header_length),
- [this](std::error_code ec, std::size_t length) {
+ [this](const std::error_code &ec, std::size_t length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
read_msg_->append(NetworkMessage::fixed_header_length);
std::size_t body_length = 0;
@@ -235,7 +237,7 @@ void TcpSocketConnector::tryReconnect() {
void TcpSocketConnector::doConnect() {
asio::async_connect(
socket_, endpoint_iterator_,
- [this](std::error_code ec, tcp::resolver::iterator) {
+ [this](const std::error_code &ec, tcp::resolver::iterator) {
if (!ec) {
timer_.cancel();
state_ = ConnectorState::CONNECTED;
diff --git a/libtransport/src/core/tcp_socket_connector.h b/libtransport/src/core/tcp_socket_connector.h
index 21db8301e..2901a5c9d 100644
--- a/libtransport/src/core/tcp_socket_connector.h
+++ b/libtransport/src/core/tcp_socket_connector.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Copyright (c) 2021 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
diff --git a/libtransport/src/core/udp_connector.cc b/libtransport/src/core/udp_connector.cc
new file mode 100644
index 000000000..ee0c7ea9c
--- /dev/null
+++ b/libtransport/src/core/udp_connector.cc
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#include <core/errors.h>
+#include <core/udp_connector.h>
+#include <glog/logging.h>
+#include <hicn/transport/utils/branch_prediction.h>
+
+#include <iostream>
+#include <thread>
+#include <vector>
+
+namespace transport {
+namespace core {
+
+UdpTunnelConnector::~UdpTunnelConnector() {}
+
+void UdpTunnelConnector::connect(const std::string &hostname, uint16_t port,
+ const std::string &bind_address,
+ uint16_t bind_port) {
+ if (state_ == State::CLOSED) {
+ state_ = State::CONNECTING;
+
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), hostname,
+ std::to_string(port));
+
+ endpoint_iterator_ = resolver_.resolve(query);
+ remote_endpoint_send_ = *endpoint_iterator_;
+ socket_->open(remote_endpoint_send_.protocol());
+
+ if (!bind_address.empty() && bind_port != 0) {
+ using namespace asio::ip;
+
+ auto address = address::from_string(bind_address);
+ if (address.is_v6()) {
+ std::error_code ec;
+ socket_->set_option(asio::ip::v6_only(false), ec);
+ // Call succeeds only on dual stack systems.
+ }
+
+ socket_->bind(udp::endpoint(address, bind_port));
+ }
+
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = Endpoint(socket_->local_endpoint());
+
+ auto self = shared_from_this();
+ doConnect(self);
+ }
+}
+
+void UdpTunnelConnector::send(Packet &packet) {
+ send(packet.shared_from_this());
+}
+
+void UdpTunnelConnector::send(const utils::MemBuf::Ptr &buffer) {
+ auto self = shared_from_this();
+ io_service_.post([self, pkt{buffer}]() {
+ bool write_in_progress = !self->output_buffer_.empty();
+ self->output_buffer_.push_back(std::move(pkt));
+ if (TRANSPORT_EXPECT_TRUE(self->state_ == State::CONNECTED)) {
+ if (!write_in_progress) {
+ self->doSendPacket(self);
+ }
+ } else {
+ self->data_available_ = true;
+ }
+ });
+}
+
+void UdpTunnelConnector::close() {
+ DLOG_IF(INFO, VLOG_IS_ON(2)) << "UDPTunnelConnector::close";
+ state_ = State::CLOSED;
+ bool is_socket_owned = socket_.use_count() == 1;
+ if (is_socket_owned) {
+ // Here we use a shared ptr to keep the object alive until we call the close
+ // function
+ auto self = shared_from_this();
+ io_service_.dispatch([this, self]() {
+ socket_->close();
+ // on_close_callback_(shared_from_this());
+ });
+ }
+}
+
+void UdpTunnelConnector::doSendPacket(
+ const std::shared_ptr<UdpTunnelConnector> &self) {
+#ifdef LINUX
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ send_timer_.async_wait([self](const std::error_code &ec) {
+ if (ec) {
+ return;
+ }
+
+ self->writeHandler();
+ });
+#else
+ auto packet = output_buffer_.front().get();
+ auto array = std::vector<asio::const_buffer>();
+
+ const ::utils::MemBuf *current = packet;
+ do {
+ array.push_back(asio::const_buffer(current->data(), current->length()));
+ current = current->next();
+ } while (current != packet);
+
+ socket_->async_send_to(
+ std::move(array), remote_endpoint_send_,
+ [this, self](const std::error_code &ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ sent_callback_(this, make_error_code(core_error::success));
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
+ } else {
+ sendFailed();
+ sent_callback_(this, ec);
+ }
+
+ output_buffer_.pop_front();
+ if (!output_buffer_.empty()) {
+ doSendPacket(self);
+ }
+ });
+#endif
+}
+
+void UdpTunnelConnector::retryConnection() {
+ // The connection was refused. In this case let's retry to reconnect.
+ connection_reattempts_++;
+ LOG(ERROR) << "Error in UDP: Connection refused. Retrying...";
+ state_ = State::CONNECTING;
+ timer_.expires_from_now(std::chrono::milliseconds(500));
+ std::weak_ptr<UdpTunnelConnector> self = shared_from_this();
+ timer_.async_wait([self, this](const std::error_code &ec) {
+ if (ec) {
+ }
+ if (auto ptr = self.lock()) {
+ doConnect(ptr);
+ }
+ });
+ return;
+}
+
+#ifdef LINUX
+void UdpTunnelConnector::writeHandler() {
+ if (TRANSPORT_EXPECT_FALSE(state_ != State::CONNECTED)) {
+ return;
+ }
+
+ auto len = std::min(output_buffer_.size(), std::size_t(Connector::max_burst));
+
+ if (len) {
+ int m = 0;
+ for (auto &p : output_buffer_) {
+ auto packet = p.get();
+ ::utils::MemBuf *current = packet;
+ int b = 0;
+ do {
+ // array.push_back(asio::const_buffer(current->data(),
+ // current->length()));
+ tx_iovecs_[m][b].iov_base = current->writableData();
+ tx_iovecs_[m][b].iov_len = current->length();
+ current = current->next();
+ b++;
+ } while (current != packet);
+
+ tx_msgs_[m].msg_hdr.msg_iov = tx_iovecs_[m];
+ tx_msgs_[m].msg_hdr.msg_iovlen = b;
+ tx_msgs_[m].msg_hdr.msg_name = remote_endpoint_send_.data();
+ tx_msgs_[m].msg_hdr.msg_namelen = remote_endpoint_send_.size();
+ m++;
+
+ if (--len == 0) {
+ break;
+ }
+ }
+
+ int retval = sendmmsg(socket_->native_handle(), tx_msgs_, m, MSG_DONTWAIT);
+ if (retval > 0) {
+ while (retval--) {
+ output_buffer_.pop_front();
+ }
+ } else if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ LOG(ERROR) << "Error sending messages: " << strerror(errno);
+ sent_callback_(this, make_error_code(core_error::send_failed));
+ return;
+ }
+ }
+
+ if (!output_buffer_.empty()) {
+ send_timer_.expires_from_now(std::chrono::microseconds(50));
+ std::weak_ptr<UdpTunnelConnector> self = shared_from_this();
+ send_timer_.async_wait([self](const std::error_code &ec) {
+ if (ec) {
+ return;
+ }
+ if (auto ptr = self.lock()) {
+ ptr->writeHandler();
+ }
+ });
+ }
+}
+
+void UdpTunnelConnector::readHandler(const std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
+
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < max_burst; i++) {
+ auto read_buffer = getRawBuffer();
+ rx_iovecs_[i][0].iov_base = read_buffer.first;
+ rx_iovecs_[i][0].iov_len = read_buffer.second;
+ rx_msgs_[i].msg_hdr.msg_iov = rx_iovecs_[i];
+ rx_msgs_[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), rx_msgs_ + current_position_,
+ max_burst - current_position_, MSG_DONTWAIT, nullptr);
+ if (res < 0) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ // Try again later
+ return;
+ }
+
+ if (errno == ECONNREFUSED &&
+ connection_reattempts_ < max_reconnection_reattempts) {
+ retryConnection();
+ return;
+ }
+
+ LOG(ERROR) << "Error receiving messages! " << strerror(errno) << " "
+ << res;
+ std::vector<utils::MemBuf::Ptr> v;
+ auto ec = make_error_code(core_error::receive_failed);
+
+ receive_callback_(this, v, ec);
+ return;
+ }
+
+ std::vector<utils::MemBuf::Ptr> v;
+ v.reserve(res);
+ for (int i = 0; i < res; i++) {
+ auto packet = getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ rx_msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ rx_msgs_[current_position_].msg_len);
+ receiveSuccess(*packet);
+ v.push_back(std::move(packet));
+ ++current_position_;
+ }
+
+ receive_callback_(this, v, make_error_code(core_error::success));
+
+ doRecvPacket();
+ } else {
+ LOG(ERROR) << "Error in UDP: Receiving packets from a not "
+ "connected socket.";
+ }
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ // receive_callback_(this, *read_msg_, ec);
+ LOG(ERROR) << "Error in UDP connector: " << ec.value() << " "
+ << ec.message();
+ } else {
+ LOG(ERROR) << "Error in connector while not connected. " << ec.value()
+ << " " << ec.message();
+ }
+ }
+}
+#endif
+
+void UdpTunnelConnector::doRecvPacket() {
+ std::weak_ptr<UdpTunnelConnector> self = shared_from_this();
+#ifdef LINUX
+ if (state_ == State::CONNECTED) {
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(asio::null_buffers(),
+#else
+ socket_->async_wait(asio::ip::tcp::socket::wait_read,
+#endif
+ [self](const std::error_code &ec) {
+ if (ec) {
+ LOG(ERROR)
+ << "Error in UDP connector: " << ec.value()
+ << " " << ec.message();
+ return;
+ }
+ if (auto ptr = self.lock()) {
+ ptr->readHandler(ec);
+ }
+ });
+ }
+#else
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
+ read_msg_ = getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_recv_,
+ [this, self](const std::error_code &ec, std::size_t length) {
+ if (auto ptr = self.lock()) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "UdpTunnelConnector received packet length=" << length;
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ auto packet = getPacketFromBuffer(read_msg_.first, length);
+ receiveSuccess(*packet);
+ std::vector<utils::MemBuf::Ptr> v{std::move(packet)};
+ receive_callback_(this, v, make_error_code(core_error::success));
+ doRecvPacket();
+ } else {
+ LOG(ERROR) << "Error in UDP: Receiving packets from a not "
+ "connected socket.";
+ }
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::connection_refused)) {
+ if (connection_reattempts_ < max_reconnection_reattempts) {
+ retryConnection();
+ }
+ } else {
+ if (TRANSPORT_EXPECT_TRUE(state_ == State::CONNECTED)) {
+ LOG(ERROR) << "Error in UDP connector: " << ec.value()
+ << ec.message();
+ } else {
+ LOG(ERROR) << "Error while not connected";
+ }
+ }
+ }
+ });
+#endif
+}
+
+void UdpTunnelConnector::doConnect(
+ std::shared_ptr<UdpTunnelConnector> &self_shared) {
+ std::weak_ptr<UdpTunnelConnector> self = self_shared;
+ asio::async_connect(*socket_, endpoint_iterator_,
+ [this, self](const std::error_code &ec,
+ asio::ip::udp::resolver::iterator) {
+ if (auto ptr = self.lock()) {
+ if (!ec) {
+ state_ = State::CONNECTED;
+ doRecvPacket();
+
+ if (data_available_) {
+ data_available_ = false;
+ doSendPacket(ptr);
+ }
+
+ on_reconnect_callback_(
+ this, make_error_code(core_error::success));
+ } else {
+ LOG(ERROR) << "UDP Connection failed!!!";
+ retryConnection();
+ }
+ }
+ });
+}
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/core/udp_connector.h b/libtransport/src/core/udp_connector.h
new file mode 100644
index 000000000..65821852d
--- /dev/null
+++ b/libtransport/src/core/udp_connector.h
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <core/errors.h>
+#include <hicn/transport/core/asio_wrapper.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+
+#include <iostream>
+#include <memory>
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener;
+
+class UdpTunnelConnector : public Connector {
+ friend class UdpTunnelListener;
+
+ public:
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect>
+ UdpTunnelConnector(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+ io_service_(io_service),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_)),
+ resolver_(io_service_),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{},
+ tx_msgs_{},
+ rx_iovecs_{},
+ rx_msgs_{},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ }
+
+ template <typename ReceiveCallback, typename SentCallback, typename OnClose,
+ typename OnReconnect, typename EndpointType>
+ UdpTunnelConnector(std::shared_ptr<asio::ip::udp::socket> &socket,
+ std::shared_ptr<asio::io_service::strand> &strand,
+ ReceiveCallback &&receive_callback,
+ SentCallback &&packet_sent, OnClose &&on_close_callback,
+ OnReconnect &&on_reconnect, EndpointType &&remote_endpoint)
+ : Connector(receive_callback, packet_sent, on_close_callback,
+ on_reconnect),
+#if ((ASIO_VERSION / 100 % 1000) < 12)
+ io_service_(socket->get_io_service()),
+#else
+ io_service_((asio::io_context &)(socket->get_executor().context())),
+#endif
+ socket_(socket),
+ resolver_(io_service_),
+ remote_endpoint_send_(std::forward<EndpointType &&>(remote_endpoint)),
+ timer_(io_service_),
+#ifdef LINUX
+ send_timer_(io_service_),
+ tx_iovecs_{},
+ tx_msgs_{},
+ rx_iovecs_{},
+ rx_msgs_{},
+ current_position_(0),
+#else
+ read_msg_(nullptr, 0),
+#endif
+ data_available_(false) {
+ if (socket_->is_open()) {
+ state_ = State::CONNECTED;
+ remote_endpoint_ = Endpoint(remote_endpoint_send_);
+ local_endpoint_ = socket_->local_endpoint();
+ }
+ }
+
+ ~UdpTunnelConnector() override;
+
+ void send(Packet &packet) override;
+
+ void send(const utils::MemBuf::Ptr &buffer) override;
+
+ void close() override;
+
+ void connect(const std::string &hostname, std::uint16_t port,
+ const std::string &bind_address = "",
+ std::uint16_t bind_port = 0);
+
+ auto shared_from_this() { return utils::shared_from(this); }
+
+ private:
+ void retryConnection();
+ void doConnect(std::shared_ptr<UdpTunnelConnector> &self);
+ void doRecvPacket();
+
+ void doRecvPacket(utils::MemBuf::Ptr &buffer) {
+ std::vector<utils::MemBuf::Ptr> v{std::move(buffer)};
+ receive_callback_(this, v, make_error_code(core_error::success));
+ }
+
+#ifdef LINUX
+ void readHandler(const std::error_code &ec);
+ void writeHandler();
+#endif
+
+ void setConnected() { state_ = State::CONNECTED; }
+
+ void doSendPacket(const std::shared_ptr<UdpTunnelConnector> &self);
+ void doClose();
+
+ private:
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::resolver resolver_;
+ asio::ip::udp::resolver::iterator endpoint_iterator_;
+ asio::ip::udp::endpoint remote_endpoint_send_;
+ asio::ip::udp::endpoint remote_endpoint_recv_;
+
+ asio::steady_timer timer_;
+
+#ifdef LINUX
+ asio::steady_timer send_timer_;
+ struct iovec tx_iovecs_[max_burst][8];
+ struct mmsghdr tx_msgs_[max_burst];
+ struct iovec rx_iovecs_[max_burst][8];
+ struct mmsghdr rx_msgs_[max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+
+ bool data_available_;
+};
+
+} // namespace core
+
+} // namespace transport
diff --git a/libtransport/src/core/udp_listener.cc b/libtransport/src/core/udp_listener.cc
new file mode 100644
index 000000000..c67673392
--- /dev/null
+++ b/libtransport/src/core/udp_listener.cc
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#include <core/udp_connector.h>
+#include <core/udp_listener.h>
+#include <glog/logging.h>
+#include <hicn/transport/utils/hash.h>
+
+#ifndef LINUX
+namespace std {
+size_t hash<asio::ip::udp::endpoint>::operator()(
+ const asio::ip::udp::endpoint &endpoint) const {
+ auto hash_ip = endpoint.address().is_v4()
+ ? endpoint.address().to_v4().to_ulong()
+ : utils::hash::fnv32_buf(
+ endpoint.address().to_v6().to_bytes().data(), 16);
+ uint16_t port = endpoint.port();
+ return utils::hash::fnv32_buf(&port, 2, hash_ip);
+}
+} // namespace std
+#endif
+
+namespace transport {
+namespace core {
+
+UdpTunnelListener::~UdpTunnelListener() {}
+
+void UdpTunnelListener::close() {
+ strand_->post([this]() {
+ if (socket_->is_open()) {
+ socket_->close();
+ }
+ });
+}
+
+#ifdef LINUX
+void UdpTunnelListener::readHandler(const std::error_code &ec) {
+ DLOG_IF(INFO, VLOG_IS_ON(3)) << "UdpTunnelConnector receive packet";
+
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ if (current_position_ == 0) {
+ for (int i = 0; i < Connector::max_burst; i++) {
+ auto read_buffer = Connector::getRawBuffer();
+ iovecs_[i][0].iov_base = read_buffer.first;
+ iovecs_[i][0].iov_len = read_buffer.second;
+ msgs_[i].msg_hdr.msg_iov = iovecs_[i];
+ msgs_[i].msg_hdr.msg_iovlen = 1;
+ msgs_[i].msg_hdr.msg_name = &remote_endpoints_[i];
+ msgs_[i].msg_hdr.msg_namelen = sizeof(remote_endpoints_[i]);
+ }
+ }
+
+ int res = recvmmsg(socket_->native_handle(), msgs_ + current_position_,
+ Connector::max_burst - current_position_, MSG_DONTWAIT,
+ nullptr);
+ if (res < 0) {
+ LOG(ERROR) << "Error in recvmmsg.";
+ return;
+ }
+
+ for (int i = 0; i < res; i++) {
+ auto packet = Connector::getPacketFromBuffer(
+ reinterpret_cast<uint8_t *>(
+ msgs_[current_position_].msg_hdr.msg_iov[0].iov_base),
+ msgs_[current_position_].msg_len);
+ auto connector_id =
+ utils::hash::fnv64_buf(msgs_[current_position_].msg_hdr.msg_name,
+ msgs_[current_position_].msg_hdr.msg_namelen);
+
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+
+ /*
+ * Get the remote endpoint for this particular message
+ */
+ using namespace asio::ip;
+ if (local_endpoint_.address().is_v4()) {
+ auto addr = reinterpret_cast<struct sockaddr_in *>(
+ &remote_endpoints_[current_position_]);
+ address_v4::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v4 address(address_bytes);
+ remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin_port));
+ } else {
+ auto addr = reinterpret_cast<struct sockaddr_in6 *>(
+ &remote_endpoints_[current_position_]);
+ address_v6::bytes_type address_bytes;
+ std::copy_n(reinterpret_cast<uint8_t *>(&addr->sin6_addr),
+ address_bytes.size(), address_bytes.begin());
+ address_v6 address(address_bytes);
+ remote_endpoint_ = udp::endpoint(address, ntohs(addr->sin6_port));
+ }
+
+ /**
+ * Create new connector sharing the same socket of this listener.
+ */
+ auto ret = connectors_.emplace(
+ connector_id,
+ std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {}, [](Connector *) {},
+ [](Connector *, const std::error_code &) {},
+ std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ /**
+ * Use connector callback to process incoming message.
+ */
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(packet);
+
+ ++current_position_;
+ }
+
+ doRecvPacket();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else {
+ LOG(ERROR) << ec.value() << " " << ec.message();
+ }
+}
+#endif
+
+void UdpTunnelListener::doRecvPacket() {
+#ifdef LINUX
+#if ((ASIO_VERSION / 100 % 1000) < 11)
+ socket_->async_receive(
+ asio::null_buffers(),
+#else
+ socket_->async_wait(
+ asio::ip::tcp::socket::wait_read,
+#endif
+ std::bind(&UdpTunnelListener::readHandler, this, std::placeholders::_1));
+#else
+ read_msg_ = Connector::getRawBuffer();
+ socket_->async_receive_from(
+ asio::buffer(read_msg_.first, read_msg_.second), remote_endpoint_,
+ [this](const std::error_code &ec, std::size_t length) {
+ if (TRANSPORT_EXPECT_TRUE(!ec)) {
+ auto packet = Connector::getPacketFromBuffer(read_msg_.first, length);
+ auto connector_id =
+ std::hash<asio::ip::udp::endpoint>{}(remote_endpoint_);
+ auto connector = connectors_.find(connector_id);
+ if (connector == connectors_.end()) {
+ // Create new connector corresponding to new client
+ auto ret = connectors_.emplace(
+ connector_id, std::make_shared<UdpTunnelConnector>(
+ socket_, strand_, receive_callback_,
+ [](Connector *, const std::error_code &) {},
+ [](Connector *) {},
+ [](Connector *, const std::error_code &) {},
+ std::move(remote_endpoint_)));
+ connector = ret.first;
+ connector->second->setConnectorId(connector_id);
+ }
+
+ UdpTunnelConnector *c =
+ dynamic_cast<UdpTunnelConnector *>(connector->second.get());
+ c->doRecvPacket(packet);
+ doRecvPacket();
+ } else if (ec.value() ==
+ static_cast<int>(std::errc::operation_canceled)) {
+ LOG(ERROR) << "The connection has been closed by the application.";
+ return;
+ } else {
+ LOG(ERROR) << ec.value() << " " << ec.message();
+ }
+ });
+#endif
+}
+} // namespace core
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/core/udp_listener.h b/libtransport/src/core/udp_listener.h
new file mode 100644
index 000000000..813520309
--- /dev/null
+++ b/libtransport/src/core/udp_listener.h
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 Cisco and/or its affiliates.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/asio_wrapper.h>
+#include <hicn/transport/core/connector.h>
+#include <hicn/transport/portability/platform.h>
+
+#include <unordered_map>
+
+namespace std {
+template <>
+struct hash<asio::ip::udp::endpoint> {
+ size_t operator()(const asio::ip::udp::endpoint &endpoint) const;
+};
+} // namespace std
+
+namespace transport {
+namespace core {
+
+class UdpTunnelListener
+ : public std::enable_shared_from_this<UdpTunnelListener> {
+ using PacketReceivedCallback = Connector::PacketReceivedCallback;
+ using EndpointId = std::pair<uint32_t, uint16_t>;
+
+ static constexpr uint16_t default_port = 5004;
+
+ public:
+ using Ptr = std::shared_ptr<UdpTunnelListener>;
+
+ template <typename ReceiveCallback>
+ UdpTunnelListener(asio::io_service &io_service,
+ ReceiveCallback &&receive_callback,
+ asio::ip::udp::endpoint endpoint = asio::ip::udp::endpoint(
+ asio::ip::udp::v4(), default_port))
+ : io_service_(io_service),
+ strand_(std::make_shared<asio::io_service::strand>(io_service_)),
+ socket_(std::make_shared<asio::ip::udp::socket>(io_service_,
+ endpoint.protocol())),
+ local_endpoint_(endpoint),
+ receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
+#ifndef LINUX
+ read_msg_(nullptr, 0)
+#else
+ iovecs_{},
+ msgs_{},
+ current_position_(0)
+#endif
+ {
+ if (endpoint.protocol() == asio::ip::udp::v6()) {
+ std::error_code ec;
+ socket_->set_option(asio::ip::v6_only(false), ec);
+ // Call succeeds only on dual stack systems.
+ }
+ socket_->bind(local_endpoint_);
+ io_service_.post(std::bind(&UdpTunnelListener::doRecvPacket, this));
+ }
+
+ ~UdpTunnelListener();
+
+ void close();
+
+ int deleteConnector(Connector *connector) {
+ return connectors_.erase(connector->getConnectorId());
+ }
+
+ template <typename ReceiveCallback>
+ void setReceiveCallback(ReceiveCallback &&callback) {
+ receive_callback_ = std::forward<ReceiveCallback &&>(callback);
+ }
+
+ Connector *findConnector(Connector::Id connId) {
+ auto it = connectors_.find(connId);
+ if (it != connectors_.end()) {
+ return it->second.get();
+ }
+
+ return nullptr;
+ }
+
+ private:
+ void doRecvPacket();
+
+ void readHandler(const std::error_code &ec);
+
+ asio::io_service &io_service_;
+ std::shared_ptr<asio::io_service::strand> strand_;
+ std::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::endpoint local_endpoint_;
+ asio::ip::udp::endpoint remote_endpoint_;
+ std::unordered_map<Connector::Id, std::shared_ptr<Connector>> connectors_;
+
+ PacketReceivedCallback receive_callback_;
+
+#ifdef LINUX
+ struct iovec iovecs_[Connector::max_burst][8];
+ struct mmsghdr msgs_[Connector::max_burst];
+ struct sockaddr_storage remote_endpoints_[Connector::max_burst];
+ std::uint8_t current_position_;
+#else
+ std::pair<uint8_t *, std::size_t> read_msg_;
+#endif
+};
+
+} // namespace core
+
+} // namespace transport