aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2019-02-04 11:06:18 +0100
committerAlberto Compagno <acompagn+fdio@cisco.com>2019-03-05 09:56:19 +0000
commit6d7704c1b497341fd6dd3c27e3f64d0db062ccc2 (patch)
tree668c6820653cd84da8474d330d2807a8765f96b5 /libtransport/src/hicn/transport/protocols
parentca66305af16e2f8d8f271218ea71f132e6c21916 (diff)
[HICN-11] Rework on transport protocols improving components modularity
Change-Id: I6683ec5b494238dc93591c103d25275e89b9f267 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/protocols')
-rw-r--r--libtransport/src/hicn/transport/protocols/CMakeLists.txt27
-rw-r--r--libtransport/src/hicn/transport/protocols/cbr.cc30
-rw-r--r--libtransport/src/hicn/transport/protocols/cbr.h18
-rw-r--r--libtransport/src/hicn/transport/protocols/congestion_window_protocol.h (renamed from libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h)28
-rw-r--r--libtransport/src/hicn/transport/protocols/indexing_manager.h193
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc217
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h69
-rw-r--r--libtransport/src/hicn/transport/protocols/packet_manager.h66
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.cc52
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h37
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc575
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.h79
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm_data_path.cc16
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm_data_path.h12
-rw-r--r--libtransport/src/hicn/transport/protocols/rate_estimation.cc5
-rw-r--r--libtransport/src/hicn/transport/protocols/rate_estimation.h5
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc83
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h71
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc74
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h28
-rw-r--r--libtransport/src/hicn/transport/protocols/statistics.h94
-rw-r--r--libtransport/src/hicn/transport/protocols/vegas.cc627
-rw-r--r--libtransport/src/hicn/transport/protocols/vegas.h161
-rw-r--r--libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc57
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.h72
25 files changed, 1505 insertions, 1191 deletions
diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
index 1c3b76c24..84a4d5279 100644
--- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt
+++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
@@ -14,33 +14,52 @@
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/indexing_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/statistics.h
${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h
${CMAKE_CURRENT_SOURCE_DIR}/download_observer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/vegas.h
${CMAKE_CURRENT_SOURCE_DIR}/protocol.h
${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h
- ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.h
${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h
${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.h
)
list(APPEND SOURCE_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/vegas.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc
${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/vegas_rto_estimator.cc
${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc
${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.cc
+)
+
+set(RAAQM_CONFIG_INSTALL_PREFIX
+${CMAKE_INSTALL_PREFIX}/etc/hicn
+)
+
+set(raaqm_config_path
+ ${RAAQM_CONFIG_INSTALL_PREFIX}/consumer.conf
+ PARENT_SCOPE
)
set(TRANSPORT_CONFIG
${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf
)
+install(
+ FILES ${TRANSPORT_CONFIG}
+ DESTINATION etc/hicn
+ COMPONENT lib${LIBTRANSPORT}
+)
+
set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/cbr.cc b/libtransport/src/hicn/transport/protocols/cbr.cc
index 3da4819c3..efd2149ad 100644
--- a/libtransport/src/hicn/transport/protocols/cbr.cc
+++ b/libtransport/src/hicn/transport/protocols/cbr.cc
@@ -22,25 +22,27 @@ namespace protocol {
using namespace interface;
-CbrTransportProtocol::CbrTransportProtocol(BaseSocket *icnet_socket)
- : VegasTransportProtocol(icnet_socket) {}
-
-void CbrTransportProtocol::start(
- utils::SharableVector<uint8_t> &receive_buffer) {
- current_window_size_ = socket_->current_window_size_;
- VegasTransportProtocol::start(receive_buffer);
+CbrTransportProtocol::CbrTransportProtocol(
+ interface::ConsumerSocket *icnet_socket)
+ : RaaqmTransportProtocol(icnet_socket) {}
+
+int CbrTransportProtocol::start() {
+ socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ return RaaqmTransportProtocol::start();
}
-void CbrTransportProtocol::changeInterestLifetime(uint64_t segment) { return; }
-
-void CbrTransportProtocol::increaseWindow() {}
-
-void CbrTransportProtocol::decreaseWindow() {}
-
void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {}
void CbrTransportProtocol::afterContentReception(
- const Interest &interest, const ContentObject &content_object) {}
+ const Interest &interest, const ContentObject &content_object) {
+ auto segment = content_object.getName().getSuffix();
+ auto now = utils::SteadyClock::now();
+ auto rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
+ // Update stats
+ updateStats(segment, rtt.count(), now);
+}
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/cbr.h b/libtransport/src/hicn/transport/protocols/cbr.h
index 0a572292a..a8eff2182 100644
--- a/libtransport/src/hicn/transport/protocols/cbr.h
+++ b/libtransport/src/hicn/transport/protocols/cbr.h
@@ -15,32 +15,22 @@
#pragma once
-#include <hicn/transport/protocols/raaqm_data_path.h>
-#include <hicn/transport/protocols/rate_estimation.h>
-#include <hicn/transport/protocols/vegas.h>
-#include <hicn/transport/protocols/vegas_rto_estimator.h>
+#include <hicn/transport/protocols/raaqm.h>
namespace transport {
namespace protocol {
-class CbrTransportProtocol : public VegasTransportProtocol {
+class CbrTransportProtocol : public RaaqmTransportProtocol {
public:
- CbrTransportProtocol(interface::BaseSocket *icnet_socket);
+ CbrTransportProtocol(interface::ConsumerSocket *icnet_socket);
- void start(utils::SharableVector<uint8_t> &receive_buffer) override;
+ int start() override;
private:
void afterContentReception(const Interest &interest,
const ContentObject &content_object) override;
-
void afterDataUnsatisfied(uint64_t segment) override;
-
- void increaseWindow() override;
-
- void decreaseWindow() override;
-
- void changeInterestLifetime(uint64_t segment) override;
};
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h b/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h
index e84afc49c..36ac6eb17 100644
--- a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.h
+++ b/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h
@@ -15,34 +15,16 @@
#pragma once
-#include <chrono>
-
-// Implementation inspired from RFC6298
-// (https://tools.ietf.org/search/rfc6298#ref-JK88)
-
namespace transport {
namespace protocol {
-class RtoEstimator {
- public:
- typedef std::chrono::microseconds Duration;
-
- static Duration getInitialRtt() { return std::chrono::seconds(1); }
-
- RtoEstimator(Duration min_rto = std::chrono::seconds(1));
-
- void addMeasurement(Duration measure);
-
- Duration computeRto() const;
-
- private:
- double smoothed_rtt_;
- double rtt_variation_;
- bool first_measurement_;
- double last_rto_;
+class CWindowProtocol {
+ protected:
+ virtual void increaseWindow() = 0;
+ virtual void decreaseWindow() = 0;
};
} // end namespace protocol
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/indexing_manager.h
new file mode 100644
index 000000000..888b17d9d
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/indexing_manager.h
@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/errors/runtime_exception.h>
+#include <hicn/transport/errors/unexpected_manifest_exception.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/verification_manager.h>
+#include <hicn/transport/utils/literals.h>
+
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+class IndexManager {
+ public:
+ static constexpr uint32_t invalid_index = ~0;
+
+ /**
+ *
+ */
+ virtual ~IndexManager() = default;
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ virtual uint32_t getNextSuffix() = 0;
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ virtual uint32_t getNextReassemblySegment() = 0;
+
+ virtual bool isFinalSuffixDiscovered() = 0;
+
+ virtual uint32_t getFinalSuffix() = 0;
+
+ virtual void reset() = 0;
+};
+
+class IndexVerificationManager : public IndexManager {
+ public:
+ /**
+ *
+ */
+ virtual ~IndexVerificationManager() = default;
+
+ /**
+ * The ownership of the ContentObjectManifest is moved
+ * from the caller to the VerificationManager
+ */
+ virtual bool onManifest(core::ContentObject::Ptr &&content_object) = 0;
+
+ /**
+ * The content object must just be verified; the ownership is still of the
+ * caller.
+ */
+ virtual bool onContentObject(const core::ContentObject &content_object) = 0;
+};
+
+class ZeroIndexManager : public IndexVerificationManager {
+ public:
+ ZeroIndexManager() : reset_(true) {}
+
+ TRANSPORT_ALWAYS_INLINE virtual void reset() override { reset_ = true; }
+
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override {
+ uint32_t ret = reset_ ? 0 : IndexManager::invalid_index;
+ reset_ = false;
+ return ret;
+ }
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override {
+ return IndexManager::invalid_index;
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override {
+ return false;
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override {
+ return IndexManager::invalid_index;
+ }
+
+ TRANSPORT_ALWAYS_INLINE bool onManifest(
+ core::ContentObject::Ptr &&content_object) override {
+ throw errors::UnexpectedManifestException();
+ }
+
+ TRANSPORT_ALWAYS_INLINE bool onContentObject(
+ const core::ContentObject &content_object) override {
+ throw errors::RuntimeException(
+ "Called onContentObject on a ZeroIndexManager, which is not able to "
+ "process packets.");
+ }
+
+ private:
+ bool reset_;
+};
+
+class IncrementalIndexManager : public IndexVerificationManager {
+ public:
+ IncrementalIndexManager(interface::ConsumerSocket *icn_socket)
+ : socket_(icn_socket),
+ final_suffix_(std::numeric_limits<uint64_t>::max()),
+ next_download_suffix_(0),
+ next_reassembly_suffix_(0),
+ verification_manager_(
+ std::make_unique<SignatureVerificationManager>(icn_socket)) {}
+
+ /**
+ *
+ */
+ virtual ~IncrementalIndexManager() {}
+
+ TRANSPORT_ALWAYS_INLINE virtual void reset() override {
+ final_suffix_ = std::numeric_limits<uint64_t>::max();
+ next_download_suffix_ = 0;
+ next_reassembly_suffix_ = 0;
+ }
+
+ /**
+ * Retrieve from the manifest the next suffix to retrieve.
+ */
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override {
+ return next_download_suffix_ <= final_suffix_ ? next_download_suffix_++
+ : IndexManager::invalid_index;
+ }
+
+ /**
+ * Retrive the next segment to be reassembled.
+ */
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override {
+ return next_reassembly_suffix_ <= final_suffix_
+ ? next_reassembly_suffix_++
+ : IndexManager::invalid_index;
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override {
+ return final_suffix_ != std::numeric_limits<uint64_t>::max();
+ }
+
+ TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override {
+ return final_suffix_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE bool onManifest(
+ core::ContentObject::Ptr &&content_object) override {
+ throw errors::UnexpectedManifestException();
+ }
+
+ TRANSPORT_ALWAYS_INLINE bool onContentObject(
+ const core::ContentObject &content_object) override {
+ auto ret = verification_manager_->onPacketToVerify(content_object);
+
+ if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) {
+ final_suffix_ = content_object.getName().getSuffix();
+ }
+
+ return ret;
+ }
+
+ protected:
+ interface::ConsumerSocket *socket_;
+ uint64_t final_suffix_;
+ uint64_t next_download_suffix_;
+ uint64_t next_reassembly_suffix_;
+ std::unique_ptr<VerificationManager> verification_manager_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
new file mode 100644
index 000000000..1afb4eaac
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/manifest_indexing_manager.h>
+
+#include <cmath>
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace interface;
+
+ManifestIndexManager::ManifestIndexManager(
+ interface::ConsumerSocket *icn_socket)
+ : IncrementalIndexManager(icn_socket),
+ PacketManager<Interest>(1024),
+ next_reassembly_segment_(suffix_queue_.end()),
+ next_to_retrieve_segment_(suffix_queue_.end()),
+ next_manifest_(0) {}
+
+bool ManifestIndexManager::onManifest(
+ core::ContentObject::Ptr &&content_object) {
+ auto manifest =
+ std::make_unique<ContentObjectManifest>(std::move(*content_object));
+
+ bool manifest_verified = verification_manager_->onPacketToVerify(*manifest);
+
+ if (manifest_verified) {
+ manifest->decode();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
+ core::ManifestVersion::VERSION_1)) {
+ throw errors::RuntimeException("Received manifest with unknown version.");
+ }
+
+ switch (manifest->getManifestType()) {
+ case core::ManifestType::INLINE_MANIFEST: {
+ auto _it = manifest->getSuffixList().begin();
+ auto _end = --manifest->getSuffixList().end();
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) {
+ _end++;
+ }
+
+ // Get final block number
+ final_suffix_ = manifest->getFinalBlockNumber();
+
+ suffix_hash_map_[_it->first] =
+ std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+ suffix_queue_.push_back(_it->first);
+
+ // If the transport protocol finished the list of segments to retrieve,
+ // reset the next_to_retrieve_segment_ iterator to the next segment
+ // provided by this manifest.
+ if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
+ suffix_queue_.end())) {
+ next_to_retrieve_segment_ = --suffix_queue_.end();
+ }
+
+ std::advance(_it, 1);
+ for (; _it != _end; _it++) {
+ suffix_hash_map_[_it->first] = std::make_pair(
+ std::vector<uint8_t>(_it->second, _it->second + 32),
+ manifest->getHashAlgorithm());
+ suffix_queue_.push_back(_it->first);
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
+ // Set the iterators to the beginning of the suffix queue
+ next_reassembly_segment_ = suffix_queue_.begin();
+ }
+
+ if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() ||
+ next_manifest_ > final_suffix_)) {
+ break;
+ }
+
+ // Get current window size
+ double current_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size);
+
+ // Get portal
+ std::shared_ptr<interface::BasePortal> portal;
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
+
+ // Number of segments in manifest
+ std::size_t segments_in_manifest = std::distance(
+ manifest->getSuffixList().begin(), manifest->getSuffixList().end());
+ std::size_t segment_count = 0;
+
+ // Manifest namespace
+ Name &name = manifest->getWritableName();
+
+ // Send as many manifest as required for filling window.
+ do {
+ segment_count += segments_in_manifest;
+ next_manifest_ += segments_in_manifest;
+
+ Interest::Ptr interest = getPacket();
+ name.setSuffix(next_manifest_);
+ interest->setName(name);
+
+ uint32_t interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+ interest->setLifetime(interest_lifetime);
+
+ // Send requests for manifest out of the congestion window (no
+ // in_flight_interest++)
+ portal->sendInterest(std::move(interest));
+ } while (segment_count < current_window_size &&
+ next_manifest_ < final_suffix_);
+
+ break;
+ }
+ case core::ManifestType::FLIC_MANIFEST: {
+ throw errors::NotImplementedException();
+ }
+ case core::ManifestType::FINAL_CHUNK_NUMBER: {
+ throw errors::NotImplementedException();
+ }
+ }
+ }
+
+ return manifest_verified;
+}
+
+bool ManifestIndexManager::onContentObject(
+ const core::ContentObject &content_object) {
+ bool verify_signature;
+ socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
+ verify_signature);
+
+ if (!verify_signature) {
+ return true;
+ }
+
+ uint64_t segment = content_object.getName().getSuffix();
+
+ bool ret = false;
+
+ auto it = suffix_hash_map_.find(segment);
+ if (it != suffix_hash_map_.end()) {
+ auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
+ auto data_packet_digest = content_object.computeDigest(it->second.second);
+ auto data_packet_digest_bytes =
+ data_packet_digest.getDigest<uint8_t>().data();
+ std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
+
+ if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
+ manifest_digest_bytes.data(),
+ hash_type)) {
+ suffix_hash_map_.erase(it);
+ ret = true;
+ } else {
+ throw errors::RuntimeException(
+ "Verification failure policy has to be implemented.");
+ }
+ }
+
+ return ret;
+}
+
+uint32_t ManifestIndexManager::getNextSuffix() {
+ if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
+ suffix_queue_.end())) {
+ return invalid_index;
+ }
+
+ return *next_to_retrieve_segment_++;
+}
+
+uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; }
+
+bool ManifestIndexManager::isFinalSuffixDiscovered() {
+ return IncrementalIndexManager::isFinalSuffixDiscovered();
+}
+
+uint32_t ManifestIndexManager::getNextReassemblySegment() {
+ if (TRANSPORT_EXPECT_FALSE(next_reassembly_segment_ == suffix_queue_.end())) {
+ return invalid_index;
+ }
+
+ if (TRANSPORT_EXPECT_TRUE(next_reassembly_segment_ !=
+ suffix_queue_.begin())) {
+ suffix_queue_.erase(std::prev(next_reassembly_segment_));
+ }
+
+ return *next_reassembly_segment_++;
+}
+
+void ManifestIndexManager::reset() {
+ IncrementalIndexManager::reset();
+ suffix_queue_.clear();
+ suffix_hash_map_.clear();
+}
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
new file mode 100644
index 000000000..ee2c531c7
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/protocols/indexing_manager.h>
+
+#include <list>
+
+namespace transport {
+
+namespace protocol {
+
+class ManifestIndexManager : public IncrementalIndexManager,
+ public PacketManager<Interest> {
+ static constexpr double alpha = 0.3;
+
+ public:
+ using SuffixQueue = std::list<uint32_t>;
+ using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
+
+ ManifestIndexManager(interface::ConsumerSocket *icn_socket);
+
+ virtual ~ManifestIndexManager() = default;
+
+ void reset() override;
+
+ bool onManifest(core::ContentObject::Ptr &&content_object) override;
+
+ bool onContentObject(const core::ContentObject &content_object) override;
+
+ uint32_t getNextSuffix() override;
+
+ uint32_t getNextReassemblySegment() override;
+
+ bool isFinalSuffixDiscovered() override;
+
+ uint32_t getFinalSuffix() override;
+
+ protected:
+ SuffixQueue suffix_queue_;
+ SuffixQueue::iterator next_reassembly_segment_;
+ SuffixQueue::iterator next_to_retrieve_segment_;
+
+ // Hash verification
+ std::unordered_map<uint32_t,
+ std::pair<std::vector<uint8_t>, core::HashAlgorithm>>
+ suffix_hash_map_;
+
+ // Next Manifest
+ std::uint32_t next_manifest_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/packet_manager.h b/libtransport/src/hicn/transport/protocols/packet_manager.h
new file mode 100644
index 000000000..53486edde
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/packet_manager.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/utils/object_pool.h>
+
+namespace transport {
+
+namespace protocol {
+
+using namespace core;
+
+template <typename PacketType>
+class PacketManager {
+ static_assert(std::is_base_of<Packet, PacketType>::value,
+ "The packet manager support just Interest and Data.");
+
+ static constexpr std::size_t packet_pool_size = 4096;
+
+ public:
+ PacketManager(std::size_t size = packet_pool_size) : size_(0) {
+ // Create pool of interests
+ increasePoolSize(size);
+ }
+
+ TRANSPORT_ALWAYS_INLINE void increasePoolSize(std::size_t size) {
+ for (std::size_t i = 0; i < size; i++) {
+ interest_pool_.add(new PacketType());
+ }
+
+ size_ += size;
+ }
+
+ TRANSPORT_ALWAYS_INLINE typename PacketType::Ptr getPacket() {
+ auto result = interest_pool_.get();
+
+ while (TRANSPORT_EXPECT_FALSE(!result.first)) {
+ // Add packets to the pool
+ increasePoolSize(size_);
+ result = interest_pool_.get();
+ }
+
+ return std::move(result.second);
+ }
+
+ private:
+ utils::ObjectPool<PacketType> interest_pool_;
+ std::size_t size_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
index ea4fd6dbf..9caa2eca7 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.cc
+++ b/libtransport/src/hicn/transport/protocols/protocol.cc
@@ -20,24 +20,50 @@ namespace transport {
namespace protocol {
-TransportProtocol::TransportProtocol(interface::BaseSocket *icn_socket)
- : socket_(dynamic_cast<interface::ConsumerSocket *>(icn_socket)),
- is_running_(false),
- interest_pool_() {
- // Create pool of interests
- increasePoolSize();
+using namespace interface;
+
+TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket)
+ : socket_(icn_socket), is_running_(false) {
+ socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
+}
+
+int TransportProtocol::start() {
+ // If the protocol is already running, return
+ if (is_running_) return -1;
+
+ // Set the protocol as running
+ is_running_ = true;
+
+ // Reset the protocol state machine
+ reset();
+
+ // Schedule next interests
+ scheduleNextInterests();
+
+ // Start Event loop
+ portal_->runEventsLoop();
+
+ // Not running anymore
+ is_running_ = false;
+
+ return 0;
}
-TransportProtocol::~TransportProtocol() {}
+void TransportProtocol::stop() {
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void TransportProtocol::resume() {
+ if (is_running_) return;
+
+ is_running_ = true;
-void TransportProtocol::updatePortal() { portal_ = socket_->portal_; }
+ scheduleNextInterests();
-bool TransportProtocol::isRunning() { return is_running_; }
+ portal_->runEventsLoop();
-void TransportProtocol::increasePoolSize(std::size_t size) {
- for (std::size_t i = 0; i < size; i++) {
- interest_pool_.add(new Interest());
- }
+ is_running_ = false;
}
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
index 56c57e025..6a3b23753 100644
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ b/libtransport/src/hicn/transport/protocols/protocol.h
@@ -16,8 +16,9 @@
#pragma once
#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/protocols/packet_manager.h>
+#include <hicn/transport/protocols/statistics.h>
#include <hicn/transport/utils/object_pool.h>
-#include <hicn/transport/utils/sharable_vector.h>
namespace transport {
@@ -31,39 +32,28 @@ class TransportProtocolCallback {
virtual void onTimeout(const core::Interest &interest) = 0;
};
-class TransportProtocol : public interface::BasePortal::ConsumerCallback {
+class TransportProtocol : public interface::BasePortal::ConsumerCallback,
+ public PacketManager<Interest> {
static constexpr std::size_t interest_pool_size = 4096;
public:
- TransportProtocol(interface::BaseSocket *icn_socket);
+ TransportProtocol(interface::ConsumerSocket *icn_socket);
- virtual ~TransportProtocol();
+ virtual ~TransportProtocol() { stop(); };
- void updatePortal();
+ TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; }
- bool isRunning();
+ virtual int start();
- virtual void start(utils::SharableVector<uint8_t> &content_buffer) = 0;
+ virtual void stop();
- virtual void stop() = 0;
+ virtual void resume();
- virtual void resume() = 0;
+ virtual void scheduleNextInterests() = 0;
protected:
- virtual void increasePoolSize(std::size_t size = interest_pool_size);
-
- TRANSPORT_ALWAYS_INLINE Interest::Ptr getInterest() {
- auto result = interest_pool_.get();
-
- while (TRANSPORT_EXPECT_FALSE(!result.first)) {
- // Add packets to the pool
- increasePoolSize();
- result = interest_pool_.get();
- }
-
- return std::move(result.second);
- }
// Consumer Callback
+ virtual void reset() = 0;
virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0;
virtual void onTimeout(Interest::Ptr &&i) = 0;
@@ -71,7 +61,8 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback {
interface::ConsumerSocket *socket_;
std::shared_ptr<interface::BasePortal> portal_;
volatile bool is_running_;
- utils::ObjectPool<Interest> interest_pool_;
+ TransportStatistics stats_;
+ std::shared_ptr<std::vector<uint8_t>> content_buffer_;
};
} // end namespace protocol
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index 3e04689e9..f5eb48bd8 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -14,8 +14,10 @@
*/
#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/manifest_indexing_manager.h>
#include <hicn/transport/protocols/raaqm.h>
+#include <cstdlib>
#include <fstream>
namespace transport {
@@ -24,8 +26,14 @@ namespace protocol {
using namespace interface;
-RaaqmTransportProtocol::RaaqmTransportProtocol(BaseSocket *icnet_socket)
- : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) {
+RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket)
+ : TransportProtocol(icnet_socket),
+ BaseReassembly(icnet_socket, this),
+ current_window_size_(1),
+ interests_in_flight_(0),
+ cur_path_(nullptr),
+ t0_(utils::SteadyClock::now()),
+ rate_estimator_(nullptr) {
init();
}
@@ -35,16 +43,123 @@ RaaqmTransportProtocol::~RaaqmTransportProtocol() {
}
}
+int RaaqmTransportProtocol::start() {
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
+ // RAAQM
+ double drop_factor;
+ double minimum_drop_probability;
+ uint32_t sample_number;
+ uint32_t interest_lifetime;
+
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor);
+ socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY,
+ minimum_drop_probability);
+ socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER,
+ sample_number);
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+
+ // Rate Estimation
+ double alpha = 0.0;
+ uint32_t batching_param = 0;
+ uint32_t choice_param = 0;
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
+ alpha);
+ socket_->getSocketOption(
+ RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
+ choice_param);
+
+ if (choice_param == 1) {
+ this->rate_estimator_ = new ALaTcpEstimator();
+ } else {
+ this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ }
+
+ socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
+ &this->rate_estimator_->observer_);
+
+ // Current path
+ auto cur_path = std::make_unique<RaaqmDataPath>(
+ drop_factor, minimum_drop_probability, interest_lifetime * 1000,
+ sample_number);
+ cur_path_ = cur_path.get();
+ path_table_[default_values::path_id] = std::move(cur_path);
+ }
+
+ portal_->setConsumerCallback(this);
+ return TransportProtocol::start();
+}
+
+void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); }
+
+void RaaqmTransportProtocol::reset() {
+ // Reset reassembly component
+ BaseReassembly::reset();
+
+ // Reset protocol variables
+ interests_in_flight_ = 0;
+ t0_ = utils::SteadyClock::now();
+}
+
+void RaaqmTransportProtocol::increaseWindow() {
+ double max_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
+ max_window_size);
+ if (current_window_size_ < max_window_size) {
+ double gamma = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma);
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ this->rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ double min_window_size = 0.;
+ socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
+ min_window_size);
+ if (current_window_size_ > min_window_size) {
+ double beta = 0.;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
+ current_window_size_);
+ }
+ this->rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(
+ const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().getSuffix());
+ this->rate_estimator_->onDataReceived((int)content_object.payloadSize() +
+ content_object.headerSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
void RaaqmTransportProtocol::init() {
std::ifstream is(RAAQM_CONFIG_PATH);
std::string line;
-
- socket_->beta_ = default_values::beta_value;
- socket_->drop_factor_ = default_values::drop_factor;
- socket_->interest_lifetime_ = default_values::interest_lifetime;
- socket_->max_retransmissions_ =
- default_values::transport_protocol_max_retransmissions;
raaqm_autotune_ = false;
default_beta_ = default_values::beta_value;
default_drop_ = default_values::drop_factor;
@@ -56,7 +171,9 @@ void RaaqmTransportProtocol::init() {
lte_delay_ = 15000;
if (!is) {
- TRANSPORT_LOGW("WARNING: RAAQM parameters not found, set default values");
+ TRANSPORT_LOGW(
+ "WARNING: RAAQM parameters not found at %s, set default values",
+ RAAQM_CONFIG_PATH);
return;
}
@@ -86,7 +203,8 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t lifetime;
line_s >> tmp >> lifetime;
- socket_->interest_lifetime_ = lifetime;
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ lifetime);
continue;
}
@@ -94,21 +212,23 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t rtx;
line_s >> tmp >> rtx;
- socket_->max_retransmissions_ = rtx;
+ socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx);
continue;
}
if (command == "beta") {
std::string tmp;
line_s >> tmp >> default_beta_;
- socket_->beta_ = default_beta_;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
+ default_beta_);
continue;
}
if (command == "drop") {
std::string tmp;
line_s >> tmp >> default_drop_;
- socket_->drop_factor_ = default_drop_;
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
+ default_drop_);
continue;
}
@@ -151,7 +271,8 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
double rate_alpha = 0.0;
line_s >> tmp >> rate_alpha;
- socket_->rate_estimation_alpha_ = rate_alpha;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
+ rate_alpha);
continue;
}
@@ -159,7 +280,9 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t batching_param = 0;
line_s >> tmp >> batching_param;
- socket_->rate_estimation_batching_parameter_ = batching_param;
+ socket_->setSocketOption(
+ RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER,
+ batching_param);
continue;
}
@@ -167,133 +290,332 @@ void RaaqmTransportProtocol::init() {
std::string tmp;
uint32_t choice_param = 0;
line_s >> tmp >> choice_param;
- socket_->rate_estimation_choice_ = choice_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
+ choice_param);
continue;
}
}
+
is.close();
}
-void RaaqmTransportProtocol::start(
- utils::SharableVector<uint8_t> &content_buffer) {
- if (this->rate_estimator_) {
- this->rate_estimator_->onStart();
+void RaaqmTransportProtocol::onContentObject(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+
+ // Check whether makes sense to continue
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
}
- if (!cur_path_) {
- double drop_factor;
- double minimum_drop_probability;
- uint32_t sample_number;
- uint32_t interest_lifetime;
- // double beta;
+ // Call application-defined callbacks
+ ConsumerContentObjectCallback *callback_content_object = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &callback_content_object);
+ if (*callback_content_object != VOID_HANDLER) {
+ (*callback_content_object)(*socket_, *content_object);
+ }
- drop_factor = socket_->drop_factor_;
- minimum_drop_probability = socket_->minimum_drop_probability_;
- sample_number = socket_->sample_number_;
- interest_lifetime = socket_->interest_lifetime_;
- // beta = socket_->beta_;
+ ConsumerInterestCallback *callback_interest = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
+ &callback_interest);
+ if (*callback_content_object != VOID_HANDLER) {
+ (*callback_interest)(*socket_, *interest);
+ }
- double alpha = 0.0;
- uint32_t batching_param = 0;
- uint32_t choice_param = 0;
- alpha = socket_->rate_estimation_alpha_;
- batching_param = socket_->rate_estimation_batching_parameter_;
- choice_param = socket_->rate_estimation_choice_;
+ if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
+ PayloadType::MANIFEST)) {
+ if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
+ index_manager_ = manifest_index_manager_.get();
+ interests_in_flight_--;
+ }
- if (choice_param == 1) {
- this->rate_estimator_ = new ALaTcpEstimator();
- } else {
- this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ index_manager_->onManifest(std::move(content_object));
+
+ } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
+ index_manager_ = incremental_index_manager_.get();
}
- this->rate_estimator_->observer_ = socket_->rate_estimation_observer_;
+ onContentSegment(std::move(interest), std::move(content_object));
+ }
- cur_path_ = std::make_shared<RaaqmDataPath>(
- drop_factor, minimum_drop_probability, interest_lifetime * 1000,
- sample_number);
- path_table_[default_values::path_id] = cur_path_;
+ if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) {
+ BaseReassembly::index_ = index_manager_->getNextReassemblySegment();
}
- VegasTransportProtocol::start(content_buffer);
+ scheduleNextInterests();
}
-void RaaqmTransportProtocol::copyContent(const ContentObject &content_object) {
- if (TRANSPORT_EXPECT_FALSE(
- (content_object.getName().getSuffix() == final_block_number_) ||
- !(is_running_))) {
- this->rate_estimator_->onDownloadFinished();
+void RaaqmTransportProtocol::onContentSegment(
+ Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
+ uint32_t incremental_suffix = content_object->getName().getSuffix();
+ bool virtual_download = false;
+ socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download);
+
+ // Decrease in-flight interests
+ interests_in_flight_--;
+
+ // Update stats
+ if (!interest_retransmissions_[incremental_suffix & mask]) {
+ afterContentReception(*interest, *content_object);
+ }
+
+ if (index_manager_->onContentObject(*content_object)) {
+ stats_.updateBytesRecv(content_object->payloadSize());
+
+ if (!virtual_download) {
+ reassemble(std::move(content_object));
+ } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix ==
+ index_manager_->getFinalSuffix())) {
+ onContentReassembled(std::make_error_code(std::errc(0)));
+ }
+ } else {
+ // TODO Application policy check
+ // unverified_segments_.emplace(
+ // std::make_pair(incremental_suffix, std::move(content_object)));
+ TRANSPORT_LOGE("Received not trusted segment.");
}
- VegasTransportProtocol::copyContent(content_object);
}
-void RaaqmTransportProtocol::updatePathTable(
- const ContentObject &content_object) {
- uint32_t path_id = content_object.getPathLabel();
+void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
+ checkForStalePaths();
- if (path_table_.find(path_id) == path_table_.end()) {
- if (cur_path_) {
- // Create a new path with some default param
- if (path_table_.empty()) {
- throw errors::RuntimeException(
- "No path initialized for path table, error could be in default "
- "path initialization.");
- } else {
- // Initiate the new path default param
- std::shared_ptr<RaaqmDataPath> new_path =
- std::make_shared<RaaqmDataPath>(
- *(path_table_.at(default_values::path_id)));
- // Insert the new path into hash table
- path_table_[path_id] = new_path;
- }
+ const Name &n = interest->getName();
+
+ TRANSPORT_LOGW("Timeout on %s", n.toString().c_str());
+
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ uint64_t segment = n.getSuffix();
+
+ // Do not retransmit interests asking contents that do not exist.
+ if (segment >= index_manager_->getFinalSuffix()) {
+ return;
+ }
+
+ ConsumerInterestCallback *callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
+ &callback);
+ if (*callback != VOID_HANDLER) {
+ (*callback)(*socket_, *interest);
+ }
+
+ afterDataUnsatisfied(segment);
+
+ uint32_t max_rtx = 0;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
+
+ if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
+ max_rtx)) {
+ stats_.updateRetxCount(1);
+
+ callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ &callback);
+ if (*callback != VOID_HANDLER) {
+ (*callback)(*socket_, *interest);
+ }
+
+ callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if ((*callback) != VOID_HANDLER) {
+ (*callback)(*socket_, *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interest_retransmissions_[segment & mask]++;
+
+ interest_to_retransmit_.push(std::move(interest));
+
+ scheduleNextInterests();
+ } else {
+ TRANSPORT_LOGE("Stop: reached max retx limit.");
+ onContentReassembled(std::make_error_code(std::errc(std::errc::io_error)));
+ }
+}
+
+void RaaqmTransportProtocol::scheduleNextInterests() {
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
+
+ uint32_t index = IndexManager::invalid_index;
+ // Send the interest needed for filling the window
+ while (interests_in_flight_ < current_window_size_) {
+ if (interest_to_retransmit_.size() > 0) {
+ sendInterest(std::move(interest_to_retransmit_.front()));
+ interest_to_retransmit_.pop();
} else {
- throw errors::RuntimeException(
- "UNEXPECTED ERROR: when running,current path not found.");
+ index = index_manager_->getNextSuffix();
+ if (index == IndexManager::invalid_index) {
+ break;
+ }
+ sendInterest(index);
}
}
+}
- cur_path_ = path_table_[path_id];
+void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
+ auto interest = getPacket();
+ core::Name *name;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
+ name->setSuffix(next_suffix);
+ interest->setName(*name);
+
+ uint32_t interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
+ interest_lifetime);
+ interest->setLifetime(interest_lifetime);
+
+ ConsumerInterestCallback *callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
+ &callback);
+ if (*callback != VOID_HANDLER) {
+ callback->operator()(*socket_, *interest);
+ }
- size_t header_size = content_object.headerSize();
- size_t data_size = content_object.payloadSize();
+ if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ return;
+ }
- // Update measurements for path
- cur_path_->updateReceivedStats(header_size + data_size, data_size);
+ interest_retransmissions_[next_suffix & mask] = ~0;
+ interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
+ sendInterest(std::move(interest));
+}
+
+void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
+ interests_in_flight_++;
+ interest_retransmissions_[interest->getName().getSuffix() & mask]++;
+
+ portal_->sendInterest(std::move(interest));
+}
+
+void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
+ interface::ConsumerContentCallback *on_payload = nullptr;
+ socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
+ if (*on_payload != VOID_HANDLER) {
+ std::shared_ptr<std::vector<uint8_t>> content_buffer;
+ socket_->getSocketOption(
+ interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
+ (*on_payload)(*socket_, content_buffer->size(), ec);
+ }
+
+ this->rate_estimator_->onDownloadFinished();
+ stop();
}
void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
- throw std::runtime_error("ERROR: no current path found, exit");
+ throw std::runtime_error("RAAQM ERROR: no current path found, exit");
} else {
- std::chrono::microseconds rtt;
+ auto now = utils::SteadyClock::now();
+ utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
+ now - interest_timepoints_[segment & mask]);
- std::chrono::steady_clock::duration duration =
- std::chrono::steady_clock::now() -
- interest_timepoints_[segment & mask_];
- rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
+ // Update stats
+ updateStats(segment, rtt.count(), now);
if (this->rate_estimator_) {
this->rate_estimator_->onRttUpdate((double)rtt.count());
}
+
cur_path_->insertNewRtt(rtt.count());
cur_path_->smoothTimer();
if (cur_path_->newPropagationDelayAvailable()) {
- check_drop_probability();
+ checkDropProbability();
}
}
}
-void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) {
- return;
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ throw errors::RuntimeException("ERROR: no current path found, exit");
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ if (std::rand() % 10000 <= cur_path_->getDropProb() * 10000) {
+ decreaseWindow();
+ }
+ }
}
-void RaaqmTransportProtocol::check_drop_probability() {
+void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now) {
+ // Update RTT statistics
+ stats_.updateAverageRtt(rtt);
+ stats_.updateAverageWindowSize(current_window_size_);
+
+ // Call statistics callback
+ ConsumerTimerCallback *stats_callback = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
+ &stats_callback);
+ if (*stats_callback != VOID_HANDLER) {
+ auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
+
+ uint32_t timer_interval_milliseconds = 0;
+ socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
+ timer_interval_milliseconds);
+ if (dt.count() > timer_interval_milliseconds) {
+ (*stats_callback)(*socket_, stats_);
+ t0_ = utils::SteadyClock::now();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::updatePathTable(
+ const ContentObject &content_object) {
+ uint32_t path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) {
+ // Create a new path with some default param
+
+ if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) {
+ throw errors::RuntimeException(
+ "[RAAQM] No path initialized for path table, error could be in "
+ "default path initialization.");
+ }
+
+ // Initiate the new path default param
+ auto new_path = std::make_unique<RaaqmDataPath>(
+ *(path_table_.at(default_values::path_id)));
+
+ // Insert the new path into hash table
+ path_table_[path_id] = std::move(new_path);
+ } else {
+ throw errors::RuntimeException(
+ "UNEXPECTED ERROR: when running,current path not found.");
+ }
+ }
+
+ cur_path_ = path_table_[path_id].get();
+
+ size_t header_size = content_object.headerSize();
+ size_t data_size = content_object.payloadSize();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(header_size + data_size, data_size);
+}
+
+void RaaqmTransportProtocol::checkDropProbability() {
if (!raaqm_autotune_) {
return;
}
unsigned int max_pd = 0;
- std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>>::iterator it;
+ PathTable::iterator it;
for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
if (it->second->getPropagationDelay() > max_pd &&
it->second->getPropagationDelay() != UINT_MAX &&
@@ -317,28 +639,28 @@ void RaaqmTransportProtocol::check_drop_probability() {
double old_drop_prob = 0;
double old_beta = 0;
- old_beta = socket_->beta_;
- old_drop_prob = socket_->drop_factor_;
+ socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta);
+ socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob);
if (drop_prob == old_drop_prob && beta == old_beta) {
return;
}
- socket_->beta_ = beta;
- socket_->drop_factor_ = drop_prob;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob);
for (it = path_table_.begin(); it != path_table_.end(); it++) {
it->second->setDropProb(drop_prob);
}
}
-void RaaqmTransportProtocol::check_for_stale_paths() {
+void RaaqmTransportProtocol::checkForStalePaths() {
if (!raaqm_autotune_) {
return;
}
bool stale = false;
- std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>>::iterator it;
+ PathTable::iterator it;
for (it = path_table_.begin(); it != path_table_.end(); ++it) {
if (it->second->isStale()) {
stale = true;
@@ -346,71 +668,10 @@ void RaaqmTransportProtocol::check_for_stale_paths() {
}
}
if (stale) {
- check_drop_probability();
+ checkDropProbability();
}
}
-void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- check_for_stale_paths();
- VegasTransportProtocol::onTimeout(std::move(interest));
-}
-
-void RaaqmTransportProtocol::increaseWindow() {
- double max_window_size = socket_->max_window_size_;
- if (current_window_size_ < max_window_size) {
- double gamma = socket_->gamma_;
-
- current_window_size_ += gamma / current_window_size_;
- socket_->current_window_size_ = current_window_size_;
- }
- this->rate_estimator_->onWindowIncrease(current_window_size_);
-}
-
-void RaaqmTransportProtocol::decreaseWindow() {
- double min_window_size = socket_->min_window_size_;
- if (current_window_size_ > min_window_size) {
- double beta = socket_->beta_;
-
- current_window_size_ = current_window_size_ * beta;
- if (current_window_size_ < min_window_size) {
- current_window_size_ = min_window_size;
- }
-
- socket_->current_window_size_ = current_window_size_;
- }
- this->rate_estimator_->onWindowDecrease(current_window_size_);
-}
-
-void RaaqmTransportProtocol::RAAQM() {
- if (!cur_path_) {
- throw errors::RuntimeException("ERROR: no current path found, exit");
- exit(EXIT_FAILURE);
- } else {
- // Change drop probability according to RTT statistics
- cur_path_->updateDropProb();
-
- if (rand() % 10000 <= cur_path_->getDropProb() * 10000) {
- decreaseWindow();
- }
- }
-}
-
-void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
- // Decrease the window because the timeout happened
- decreaseWindow();
-}
-
-void RaaqmTransportProtocol::afterContentReception(
- const Interest &interest, const ContentObject &content_object) {
- updatePathTable(content_object);
- increaseWindow();
- updateRtt(interest.getName().getSuffix());
- this->rate_estimator_->onDataReceived(
- (int)(content_object.payloadSize() + content_object.headerSize()));
- // Set drop probablility and window size accordingly
- RAAQM();
-}
-
} // end namespace protocol
} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h
index 6ca410251..09d22cd4f 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.h
+++ b/libtransport/src/hicn/transport/protocols/raaqm.h
@@ -15,65 +15,108 @@
#pragma once
+#include <hicn/transport/protocols/congestion_window_protocol.h>
+#include <hicn/transport/protocols/protocol.h>
#include <hicn/transport/protocols/raaqm_data_path.h>
#include <hicn/transport/protocols/rate_estimation.h>
-#include <hicn/transport/protocols/vegas.h>
-#include <hicn/transport/protocols/vegas_rto_estimator.h>
+#include <hicn/transport/protocols/reassembly.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
+
+#include <queue>
+#include <vector>
namespace transport {
namespace protocol {
-class RaaqmTransportProtocol : public VegasTransportProtocol {
+class RaaqmTransportProtocol
+ : public TransportProtocol,
+ public BaseReassembly,
+ public CWindowProtocol,
+ public BaseReassembly::ContentReassembledCallback {
public:
- RaaqmTransportProtocol(interface::BaseSocket *icnet_socket);
+ RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket);
~RaaqmTransportProtocol();
- void start(utils::SharableVector<uint8_t> &content_buffer) override;
+ int start() override;
+
+ void resume() override;
+
+ void reset() override;
protected:
- void copyContent(const ContentObject &content_object) override;
+ static constexpr uint32_t buffer_size =
+ 1 << interface::default_values::log_2_default_buffer_size;
+ static constexpr uint16_t mask = buffer_size - 1;
+ using PathTable =
+ std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>;
+
+ void increaseWindow() override;
+ void decreaseWindow() override;
+
+ virtual void afterContentReception(const Interest &interest,
+ const ContentObject &content_object);
+ virtual void afterDataUnsatisfied(uint64_t segment);
+
+ virtual void updateStats(uint32_t suffix, uint64_t rtt,
+ utils::TimePoint &now);
private:
void init();
- void afterContentReception(const Interest &interest,
- const ContentObject &content_object) override;
+ void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override;
- void afterDataUnsatisfied(uint64_t segment) override;
+ void onContentSegment(Interest::Ptr &&interest,
+ ContentObject::Ptr &&content_object);
- void increaseWindow() override;
+ void onTimeout(Interest::Ptr &&i) override;
- void updateRtt(uint64_t segment);
+ virtual void scheduleNextInterests() override;
- void decreaseWindow() override;
+ void sendInterest(std::uint64_t next_suffix);
- void changeInterestLifetime(uint64_t segment) override;
+ void sendInterest(Interest::Ptr &&interest);
- void onTimeout(Interest::Ptr &&interest) override;
+ void onContentReassembled(std::error_code ec) override;
+
+ void updateRtt(uint64_t segment);
void RAAQM();
void updatePathTable(const ContentObject &content_object);
- void check_drop_probability();
+ void checkDropProbability();
- void check_for_stale_paths();
+ void checkForStalePaths();
void printRtt();
+ protected:
+ // Congestion window management
+ double current_window_size_;
+ // Protocol management
+ uint64_t interests_in_flight_;
+ std::array<std::uint32_t, buffer_size> interest_retransmissions_;
+ std::array<utils::TimePoint, buffer_size> interest_timepoints_;
+ std::queue<Interest::Ptr> interest_to_retransmit_;
+
+ private:
/**
* Current download path
*/
- std::shared_ptr<RaaqmDataPath> cur_path_;
+ RaaqmDataPath *cur_path_;
/**
* Hash table for path: each entry is a pair path ID(key) - path object
*/
- std::unordered_map<uint32_t, std::shared_ptr<RaaqmDataPath>> path_table_;
+ PathTable path_table_;
+
+ // TimePoints for statistic
+ utils::TimePoint t0_;
bool set_interest_filter_;
+
// for rate-estimation at packet level
IcnRateEstimator *rate_estimator_;
diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
index ef26eabb5..e25646205 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
@@ -14,6 +14,7 @@
*/
#include <hicn/transport/protocols/raaqm_data_path.h>
+#include <hicn/transport/utils/chrono_typedefs.h>
namespace transport {
@@ -42,7 +43,7 @@ RaaqmDataPath::RaaqmDataPath(double drop_factor,
raw_data_bytes_received_(0),
last_raw_data_bytes_received_(0),
rtt_samples_(samples_),
- last_received_pkt_(std::chrono::steady_clock::now()),
+ last_received_pkt_(utils::SteadyClock::now()),
average_rtt_(0),
alpha_(ALPHA) {}
@@ -58,7 +59,7 @@ RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
prop_delay_ = rtt_min_;
}
- last_received_pkt_ = std::chrono::steady_clock::now();
+ last_received_pkt_ = utils::SteadyClock::now();
return *this;
}
@@ -124,10 +125,6 @@ RaaqmDataPath &RaaqmDataPath::updateDropProb() {
return *this;
}
-double RaaqmDataPath::getMicroSeconds(struct timeval &time) {
- return (double)(time.tv_sec) * 1000000 + (double)(time.tv_usec);
-}
-
void RaaqmDataPath::setAlpha(double alpha) {
if (alpha >= 0 && alpha <= 1) {
alpha_ = alpha;
@@ -145,9 +142,10 @@ unsigned int RaaqmDataPath::getPropagationDelay() {
}
bool RaaqmDataPath::isStale() {
- TimePoint now = std::chrono::steady_clock::now();
- auto time = std::chrono::duration_cast<Microseconds>(now - last_received_pkt_)
- .count();
+ utils::TimePoint now = utils::SteadyClock::now();
+ auto time =
+ std::chrono::duration_cast<utils::Microseconds>(now - last_received_pkt_)
+ .count();
if (time > 2000000) {
return true;
}
diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
index a0b9ec9ca..9e4accfa5 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
+++ b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
@@ -15,6 +15,7 @@
#pragma once
+#include <hicn/transport/utils/chrono_typedefs.h>
#include <hicn/transport/utils/min_filter.h>
#include <chrono>
@@ -30,9 +31,6 @@ namespace transport {
namespace protocol {
class RaaqmDataPath {
- using TimePoint = std::chrono::steady_clock::time_point;
- using Microseconds = std::chrono::microseconds;
-
public:
RaaqmDataPath(double drop_factor, double minimum_drop_probability,
unsigned new_timer, unsigned int samples,
@@ -125,12 +123,6 @@ class RaaqmDataPath {
*/
RaaqmDataPath &updateDropProb();
- /**
- * @brief This function convert the time from struct timeval to its value in
- * microseconds
- */
- static double getMicroSeconds(struct timeval &time);
-
void setAlpha(double alpha);
/**
@@ -222,7 +214,7 @@ class RaaqmDataPath {
/**
* Time of the last call to the path reporter method
*/
- TimePoint last_received_pkt_;
+ utils::TimePoint last_received_pkt_;
double average_rtt_;
double alpha_;
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
index a0bb69153..99fc78c3e 100644
--- a/libtransport/src/hicn/transport/protocols/rate_estimation.cc
+++ b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
@@ -13,6 +13,7 @@
* limitations under the License.
*/
+#include <hicn/transport/interfaces/socket_options_default_values.h>
#include <hicn/transport/protocols/rate_estimation.h>
#include <hicn/transport/utils/log.h>
@@ -33,8 +34,8 @@ void *Timer(void *data) {
pthread_mutex_unlock(&(estimator->mutex_));
while (estimator->is_running_) {
- std::this_thread::sleep_for(
- std::chrono::microseconds((uint64_t)(KV * dat_rtt)));
+ std::this_thread::sleep_for(std::chrono::microseconds(
+ (uint64_t)(interface::default_values::kv * dat_rtt)));
pthread_mutex_lock(&(estimator->mutex_));
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.h b/libtransport/src/hicn/transport/protocols/rate_estimation.h
index 91964ec1d..616501b24 100644
--- a/libtransport/src/hicn/transport/protocols/rate_estimation.h
+++ b/libtransport/src/hicn/transport/protocols/rate_estimation.h
@@ -20,11 +20,6 @@
#include <chrono>
-#define BATCH 50
-#define KV 20
-#define ALPHA 0.8
-#define RATE_CHOICE 0
-
namespace transport {
namespace protocol {
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
new file mode 100644
index 000000000..36cfb89a7
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/reassembly.cc
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/protocols/indexing_manager.h>
+#include <hicn/transport/protocols/reassembly.h>
+#include <hicn/transport/utils/array.h>
+
+namespace transport {
+
+namespace protocol {
+
+BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket,
+ ContentReassembledCallback *content_callback)
+ : reassembly_consumer_socket_(icn_socket),
+ zero_index_manager_(std::make_unique<ZeroIndexManager>()),
+ incremental_index_manager_(
+ std::make_unique<IncrementalIndexManager>(icn_socket)),
+ manifest_index_manager_(
+ std::make_unique<ManifestIndexManager>(icn_socket)),
+ index_manager_(zero_index_manager_.get()),
+ index_(0) {
+ setContentCallback(content_callback);
+}
+
+void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) {
+ if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
+ received_packets_.emplace(std::make_pair(
+ content_object->getName().getSuffix(), std::move(content_object)));
+ }
+
+ auto it = received_packets_.find(index_);
+ while (it != received_packets_.end()) {
+ if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) {
+ copyContent(*it->second);
+ received_packets_.erase(it);
+ }
+
+ index_ = index_manager_->getNextReassemblySegment();
+ it = received_packets_.find(index_);
+ }
+}
+
+void BaseReassembly::copyContent(const ContentObject &content_object) {
+ utils::Array<> a = content_object.getPayload();
+
+ std::shared_ptr<std::vector<uint8_t>> content_buffer;
+ reassembly_consumer_socket_->getSocketOption(
+ interface::GeneralTransportOptions::APPLICATION_BUFFER, content_buffer);
+
+ content_buffer->insert(content_buffer->end(), (uint8_t *)a.data(),
+ (uint8_t *)a.data() + a.length());
+
+ bool download_completed =
+ index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
+
+ if (TRANSPORT_EXPECT_FALSE(download_completed)) {
+ content_callback_->onContentReassembled(std::make_error_code(std::errc(0)));
+ }
+}
+
+void BaseReassembly::reset() {
+ manifest_index_manager_->reset();
+ incremental_index_manager_->reset();
+
+ received_packets_.clear();
+}
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
new file mode 100644
index 000000000..ef3e99fc5
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/reassembly.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/protocols/manifest_indexing_manager.h>
+
+namespace transport {
+
+namespace protocol {
+
+// Forward Declaration
+class ManifestManager;
+
+class Reassembly {
+ public:
+ class ContentReassembledCallback {
+ public:
+ virtual void onContentReassembled(std::error_code ec) = 0;
+ };
+
+ virtual void reassemble(ContentObject::Ptr &&content_object) = 0;
+ virtual void reset() = 0;
+ virtual void setContentCallback(ContentReassembledCallback *callback) {
+ content_callback_ = callback;
+ }
+
+ protected:
+ ContentReassembledCallback *content_callback_;
+};
+
+class BaseReassembly : public Reassembly {
+ public:
+ BaseReassembly(interface::ConsumerSocket *icn_socket,
+ ContentReassembledCallback *content_callback);
+
+ protected:
+ virtual void reassemble(ContentObject::Ptr &&content_object) override;
+
+ virtual void copyContent(const ContentObject &content_object);
+
+ virtual void reset() override;
+
+ protected:
+ // The consumer socket
+ interface::ConsumerSocket *reassembly_consumer_socket_;
+ std::unique_ptr<ZeroIndexManager> zero_index_manager_;
+ std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
+ std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
+ IndexVerificationManager *index_manager_;
+ std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_;
+
+ uint64_t index_;
+};
+
+} // namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 1356ad566..c2323345f 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -22,8 +22,8 @@
* TODO
* 2) start/constructor/rest variable implementation
* 3) interest retransmission: now I always recover, we should recover only if
- * we have enough time 4) returnContentToUser: rememeber to remove the first
- * 32bits from the payload
+ * we have enough time 4) returnContentToApplication: rememeber to remove the
+ * first 32bits from the payload
*/
namespace transport {
@@ -32,7 +32,8 @@ namespace protocol {
using namespace interface;
-RTCTransportProtocol::RTCTransportProtocol(BaseSocket *icnet_socket)
+RTCTransportProtocol::RTCTransportProtocol(
+ interface::ConsumerSocket *icnet_socket)
: TransportProtocol(icnet_socket),
inflightInterests_(1 << default_values::log_2_default_buffer_size),
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
@@ -46,19 +47,7 @@ RTCTransportProtocol::~RTCTransportProtocol() {
}
}
-void RTCTransportProtocol::start(
- utils::SharableVector<uint8_t> &content_buffer) {
- if (is_running_) return;
-
- is_running_ = true;
- content_buffer_ = content_buffer.shared_from_this();
-
- reset();
- scheduleNextInterest();
-
- portal_->runEventsLoop();
- is_running_ = false;
-}
+int RTCTransportProtocol::start() { return TransportProtocol::start(); }
void RTCTransportProtocol::stop() {
if (!is_running_) return;
@@ -74,9 +63,8 @@ void RTCTransportProtocol::resume() {
lastRoundBegin_ = std::chrono::steady_clock::now();
inflightInterestsCount_ = 0;
- if (content_buffer_) content_buffer_->clear();
- scheduleNextInterest();
+ scheduleNextInterests();
portal_->runEventsLoop();
@@ -117,7 +105,6 @@ void RTCTransportProtocol::reset() {
while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
nackedByProducer_.clear();
nackedByProducerMaxSize_ = 512;
- if (content_buffer_) content_buffer_->clear();
// stats
receivedBytes_ = 0;
@@ -364,9 +351,9 @@ void RTCTransportProtocol::increaseWindow() {
}
void RTCTransportProtocol::sendInterest() {
- Name interest_name;
+ Name *interest_name = nullptr;
socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- interest_name);
+ &interest_name);
bool isRTX = false;
// uint32_t sentInt = 0;
@@ -382,11 +369,11 @@ void RTCTransportProtocol::sendInterest() {
packetLost_++;
uint32_t pkt = rtxSeg & modMask_;
- interest_name.setSuffix(rtxSeg);
+ interest_name->setSuffix(rtxSeg);
// if the interest is not pending anymore we encrease the retrasnmission
// counter in order to avoid to handle a recovered packt as a normal one
- if (!portal_->interestIsPending(interest_name)) {
+ if (!portal_->interestIsPending(*interest_name)) {
inflightInterests_[pkt].retransmissions++;
}
@@ -394,8 +381,8 @@ void RTCTransportProtocol::sendInterest() {
isRTX = true;
} else {
// in this case we send the packet only if it is not pending yet
- interest_name.setSuffix(actualSegment_);
- if (portal_->interestIsPending(interest_name)) {
+ interest_name->setSuffix(actualSegment_);
+ if (portal_->interestIsPending(*interest_name)) {
actualSegment_++;
return;
}
@@ -410,21 +397,21 @@ void RTCTransportProtocol::sendInterest() {
actualSegment_++;
}
- auto interest = getInterest();
- interest->setName(interest_name);
+ auto interest = getPacket();
+ interest->setName(*interest_name);
uint32_t interestLifetime = default_values::interest_lifetime;
socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
interestLifetime);
interest->setLifetime(uint32_t(interestLifetime));
- ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+ ConsumerInterestCallback *on_interest_output = nullptr;
socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- on_interest_output);
+ &on_interest_output);
- if (on_interest_output != VOID_HANDLER) {
- on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), *interest);
+ if (*on_interest_output != VOID_HANDLER) {
+ (*on_interest_output)(*socket_, *interest);
}
if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
@@ -441,7 +428,7 @@ void RTCTransportProtocol::sendInterest() {
}
}
-void RTCTransportProtocol::scheduleNextInterest() {
+void RTCTransportProtocol::scheduleNextInterests() {
checkRound();
if (!is_running_) return;
@@ -467,7 +454,7 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
interestRetransmissions_.push(nacks[i]);
}
- scheduleNextInterest();
+ scheduleNextInterests();
}
void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
// packetLost_++;
@@ -483,7 +470,7 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
interestRetransmissions_.push(segmentNumber);
}
- scheduleNextInterest();
+ scheduleNextInterests();
}
void RTCTransportProtocol::onNack(const ContentObject &content_object) {
@@ -573,14 +560,14 @@ void RTCTransportProtocol::onContentObject(
updateDelayStats(*content_object);
}
- returnContentToUser(*content_object);
+ reassemble(std::move(content_object));
increaseWindow();
}
- scheduleNextInterest();
+ scheduleNextInterests();
}
-void RTCTransportProtocol::returnContentToUser(
+void RTCTransportProtocol::returnContentToApplication(
const ContentObject &content_object) {
// return content to the user
Array a = content_object.getPayload();
@@ -592,13 +579,14 @@ void RTCTransportProtocol::returnContentToUser(
uint16_t rtp_seq = ntohs(*(((uint16_t *)start) + 1));
RTPhICN_offset_ = content_object.getName().getSuffix() - rtp_seq;
- content_buffer_->insert(content_buffer_->end(), start, start + size);
+ std::shared_ptr<std::vector<uint8_t>> content_buffer;
+ socket_->getSocketOption(APPLICATION_BUFFER, content_buffer);
+ content_buffer->insert(content_buffer_->end(), start, start + size);
- ConsumerContentCallback on_payload = VOID_HANDLER;
- socket_->getSocketOption(CONTENT_RETRIEVED, on_payload);
- if (on_payload != VOID_HANDLER) {
- on_payload(*dynamic_cast<ConsumerSocket *>(socket_), size,
- std::make_error_code(std::errc(0)));
+ ConsumerContentCallback *on_payload = nullptr;
+ socket_->getSocketOption(CONTENT_RETRIEVED, &on_payload);
+ if ((*on_payload) != VOID_HANDLER) {
+ (*on_payload)(*socket_, size, std::make_error_code(std::errc(0)));
}
}
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 61590fc8e..58c143988 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -20,6 +20,7 @@
#include <unordered_map>
#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/protocols/reassembly.h>
#include <hicn/transport/protocols/rtc_data_path.h>
// algorithm state
@@ -86,23 +87,23 @@ struct sentInterest {
uint8_t retransmissions;
};
-class RTCTransportProtocol : public TransportProtocol {
+class RTCTransportProtocol : public TransportProtocol, public Reassembly {
public:
- RTCTransportProtocol(interface::BaseSocket *icnet_socket);
+ RTCTransportProtocol(interface::ConsumerSocket *icnet_socket);
~RTCTransportProtocol();
- void start(utils::SharableVector<uint8_t> &content_buffer);
+ int start() override;
- void stop();
+ void stop() override;
- void resume();
+ void resume() override;
void onRTCPPacket(uint8_t *packet, size_t len);
private:
// algo functions
- void reset();
+ void reset() override;
void checkRound();
// CC functions
@@ -117,13 +118,18 @@ class RTCTransportProtocol : public TransportProtocol {
// packet functions
void sendInterest();
- void scheduleNextInterest();
+ void scheduleNextInterests() override;
void scheduleAppNackRtx(std::vector<uint32_t> &nacks);
- void onTimeout(Interest::Ptr &&interest);
+ void onTimeout(Interest::Ptr &&interest) override;
void onNack(const ContentObject &content_object);
void onContentObject(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object);
- void returnContentToUser(const ContentObject &content_object);
+ ContentObject::Ptr &&content_object) override;
+ void returnContentToApplication(const ContentObject &content_object);
+
+ TRANSPORT_ALWAYS_INLINE virtual void reassemble(
+ ContentObject::Ptr &&content_object) override {
+ returnContentToApplication(*content_object);
+ }
// RTCP functions
uint32_t hICN2RTP(uint32_t hicn_seq);
@@ -161,7 +167,7 @@ class RTCTransportProtocol : public TransportProtocol {
// application for pakets for which we already got a
// past NACK by the producer these packet are too old,
// they will never be retrived
- std::shared_ptr<utils::SharableVector<uint8_t>> content_buffer_;
+
uint32_t modMask_;
// stats
diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h
new file mode 100644
index 000000000..47d164158
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/statistics.h
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/portability/c_portability.h>
+
+#include <cstdint>
+
+namespace transport {
+
+namespace protocol {
+
+class TransportStatistics {
+ static constexpr double default_alpha = 0.7;
+
+ public:
+ TransportStatistics(double alpha = default_alpha)
+ : retx_count_(0),
+ bytes_received_(0),
+ average_rtt_(0),
+ avg_window_size_(0),
+ interest_tx_(0),
+ alpha_(alpha) {}
+
+ TRANSPORT_ALWAYS_INLINE void updateRetxCount(uint64_t retx) {
+ retx_count_ += retx;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void updateBytesRecv(uint64_t bytes) {
+ bytes_received_ += bytes;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void updateAverageRtt(uint64_t rtt) {
+ average_rtt_ = (alpha_ * average_rtt_) + ((1. - alpha_) * double(rtt));
+ }
+
+ TRANSPORT_ALWAYS_INLINE void updateAverageWindowSize(double current_window) {
+ avg_window_size_ =
+ (alpha_ * avg_window_size_) + ((1. - alpha_) * current_window);
+ }
+
+ TRANSPORT_ALWAYS_INLINE void updateInterestTx(uint64_t int_tx) {
+ interest_tx_ += int_tx;
+ }
+
+ TRANSPORT_ALWAYS_INLINE uint64_t getRetxCount() const { return retx_count_; }
+
+ TRANSPORT_ALWAYS_INLINE uint64_t getBytesRecv() const {
+ return bytes_received_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE double getAverageRtt() const { return average_rtt_; }
+
+ TRANSPORT_ALWAYS_INLINE double getAverageWindowSize() const {
+ return avg_window_size_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE uint64_t getInterestTx() const {
+ return interest_tx_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void reset() {
+ retx_count_ = 0;
+ bytes_received_ = 0;
+ average_rtt_ = 0;
+ avg_window_size_ = 0;
+ interest_tx_ = 0;
+ }
+
+ private:
+ uint64_t retx_count_;
+ uint64_t bytes_received_;
+ double average_rtt_;
+ double avg_window_size_;
+ uint64_t interest_tx_;
+ double alpha_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/vegas.cc b/libtransport/src/hicn/transport/protocols/vegas.cc
deleted file mode 100644
index f99eb83e0..000000000
--- a/libtransport/src/hicn/transport/protocols/vegas.cc
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/errors/not_implemented_exception.h>
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/vegas.h>
-#include <hicn/transport/utils/literals.h>
-
-#include <cmath>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-VegasTransportProtocol::VegasTransportProtocol(BaseSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
- is_final_block_number_discovered_(false),
- final_block_number_(std::numeric_limits<uint32_t>::max()),
- last_reassembled_segment_(0),
- content_buffer_size_(0),
- current_window_size_(default_values::min_window_size),
- interests_in_flight_(0),
- next_suffix_(0),
- interest_retransmissions_(1 << default_values::log_2_default_buffer_size),
- interest_timepoints_(1 << default_values::log_2_default_buffer_size),
- retx_count_(0),
- receive_buffer_(1 << default_values::log_2_default_buffer_size),
- unverified_segments_(1 << default_values::log_2_default_buffer_size),
- verified_manifests_(1 << default_values::log_2_default_buffer_size),
- mask_((1 << default_values::log_2_default_buffer_size) - 1),
- incremental_suffix_index_(0),
- suffix_queue_completed_(false),
- download_with_manifest_(false),
- next_manifest_interval_(0_U16),
- interest_tx_(0),
- interest_count_(0),
- byte_count_(0),
- average_rtt_(0.0) {
- portal_ = socket_->portal_;
- incremental_suffix_index_++;
-}
-
-VegasTransportProtocol::~VegasTransportProtocol() { stop(); }
-
-void VegasTransportProtocol::reset() {
- portal_->setConsumerCallback(this);
-
- is_final_block_number_discovered_ = false;
- interest_pool_index_ = 0;
- final_block_number_ = std::numeric_limits<uint32_t>::max();
- next_suffix_ = 0;
- interests_in_flight_ = 0;
- last_reassembled_segment_ = 0;
- content_buffer_size_ = 0;
- content_buffer_->clear();
- interest_retransmissions_.clear();
- interest_retransmissions_.resize(
- 1 << default_values::log_2_default_buffer_size, 0);
- interest_timepoints_.clear();
- interest_timepoints_.resize(1 << default_values::log_2_default_buffer_size,
- std::chrono::steady_clock::time_point());
- receive_buffer_.clear();
- unverified_segments_.clear();
- verified_manifests_.clear();
- next_manifest_interval_ = 0;
- next_manifest_ = 0;
- download_with_manifest_ = false;
- incremental_suffix_index_ = 0;
-
- interest_tx_ = 0;
- interest_count_ = 0;
- byte_count_ = 0;
- average_rtt_ = 0;
-
- // asio::io_service &io_service = portal_->getIoService();
-
- // if (io_service.stopped()) {
- // io_service.reset();
- // }
-}
-
-void VegasTransportProtocol::start(
- utils::SharableVector<uint8_t> &content_buffer) {
- if (is_running_) return;
-
- socket_->t0_ = std::chrono::steady_clock::now();
-
- is_running_ = true;
- content_buffer_ = content_buffer.shared_from_this();
-
- reset();
-
- sendInterest(next_suffix_++);
- portal_->runEventsLoop();
- removeAllPendingInterests();
- is_running_ = false;
-}
-
-void VegasTransportProtocol::resume() {
- if (is_running_) return;
-
- is_running_ = true;
- sendInterest(next_suffix_++);
- portal_->runEventsLoop();
- removeAllPendingInterests();
- is_running_ = false;
-}
-
-void VegasTransportProtocol::sendInterest(std::uint64_t next_suffix) {
- auto interest = getInterest();
- socket_->network_name_.setSuffix((uint32_t)next_suffix);
- interest->setName(socket_->network_name_);
-
- interest->setLifetime(uint32_t(socket_->interest_lifetime_));
-
- if (socket_->on_interest_output_ != VOID_HANDLER) {
- socket_->on_interest_output_(*socket_, *interest);
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
-
- interests_in_flight_++;
- interest_retransmissions_[next_suffix & mask_] = 0;
- interest_timepoints_[next_suffix & mask_] = std::chrono::steady_clock::now();
-
- using namespace std::placeholders;
- portal_->sendInterest(std::move(interest));
-}
-
-void VegasTransportProtocol::stop() {
- is_running_ = false;
- portal_->stopEventsLoop();
-}
-
-void VegasTransportProtocol::onContentSegment(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
- bool virtual_download = socket_->virtual_download_;
-
- if (verifyContentObject(*content_object)) {
- byte_count_ += content_object->getPayload().length();
-
- if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
- is_final_block_number_discovered_ = true;
- final_block_number_ = incremental_suffix;
- }
-
- if (!virtual_download) {
- receive_buffer_.emplace(
- std::make_pair(incremental_suffix, std::move(content_object)));
- reassemble();
- } else if (TRANSPORT_EXPECT_FALSE(is_final_block_number_discovered_ &&
- incremental_suffix ==
- final_block_number_)) {
- returnContentToUser();
- }
- } else {
- unverified_segments_.emplace(
- std::make_pair(incremental_suffix, std::move(content_object)));
- }
-}
-
-void VegasTransportProtocol::afterContentReception(
- const Interest &interest, const ContentObject &content_object) {
- increaseWindow();
-}
-
-void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
- decreaseWindow();
-}
-
-void VegasTransportProtocol::scheduleNextInterests() {
- if (is_running_) {
- uint32_t next_suffix;
- while (interests_in_flight_ < current_window_size_) {
- if (download_with_manifest_) {
- if (suffix_queue_.size() * 2 < current_window_size_ &&
- next_manifest_ < final_block_number_ && next_manifest_interval_) {
- next_manifest_ += next_manifest_interval_;
- sendInterest(next_manifest_);
- continue;
- }
-
- if (suffix_queue_.pop(next_suffix)) {
- // next_suffix = suffix_queue_.front();
- sendInterest(next_suffix);
- // suffix_queue_.pop_front();
- } else {
- if (!suffix_queue_completed_) {
- TRANSPORT_LOGE("Empty queue!!!!!!");
- }
- break;
- }
- } else {
- if (is_final_block_number_discovered_) {
- if (next_suffix_ > final_block_number_) {
- return;
- }
- }
-
- sendInterest(next_suffix_++);
- }
- }
- }
-}
-
-void VegasTransportProtocol::decreaseWindow() {
- if (current_window_size_ > socket_->min_window_size_) {
- current_window_size_ = std::ceil(current_window_size_ / 2);
- socket_->current_window_size_ = current_window_size_;
- }
-}
-
-void VegasTransportProtocol::increaseWindow() {
- if (current_window_size_ < socket_->max_window_size_) {
- current_window_size_++;
- socket_->max_window_size_ = current_window_size_;
- }
-};
-
-void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) {
- std::chrono::steady_clock::duration duration =
- std::chrono::steady_clock::now() - interest_timepoints_[segment];
- rtt_estimator_.addMeasurement(
- std::chrono::duration_cast<std::chrono::microseconds>(duration));
-
- RtoEstimator::Duration rto = rtt_estimator_.computeRto();
- std::chrono::milliseconds lifetime =
- std::chrono::duration_cast<std::chrono::milliseconds>(rto);
-
- socket_->interest_lifetime_ = (int)lifetime.count();
-}
-
-void VegasTransportProtocol::returnContentToUser() {
- if (socket_->on_payload_retrieved_ != VOID_HANDLER) {
- socket_->on_payload_retrieved_(*socket_, byte_count_,
- std::make_error_code(std::errc(0)));
- }
-
- stop();
-}
-
-void VegasTransportProtocol::onManifest(
- std::unique_ptr<ContentObjectManifest> &&manifest) {
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
-
- download_with_manifest_ = true;
-
- uint32_t segment = manifest->getName().getSuffix();
-
- if (verifyManifest(*manifest)) {
- manifest->decode();
-
- if (TRANSPORT_EXPECT_TRUE(manifest->getVersion() ==
- core::ManifestVersion::VERSION_1)) {
- switch (manifest->getManifestType()) {
- case core::ManifestType::INLINE_MANIFEST: {
- auto _it = manifest->getSuffixList().begin();
- auto _end = --manifest->getSuffixList().end();
-
- if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) {
- _end++;
- }
-
- // Get final block number
- is_final_block_number_discovered_ = true;
- final_block_number_ = manifest->getFinalBlockNumber();
-
- for (; _it != _end; _it++) {
- suffix_hash_map_[_it->first] = std::make_pair(
- std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push(_it->first);
- }
-
- next_manifest_interval_ =
- (unsigned short)manifest->getSuffixList().size();
-
- if (manifest->isFinalManifest()) {
- suffix_queue_completed_ = true;
- // Give it a try
- if (verifier_thread_) {
- asio::io_service &io_service = portal_->getIoService();
- io_service.post([this]() { scheduleNextInterests(); });
- }
- }
-
- break;
- }
- case core::ManifestType::FLIC_MANIFEST: {
- throw errors::NotImplementedException();
- }
- case core::ManifestType::FINAL_CHUNK_NUMBER: {
- throw errors::NotImplementedException();
- }
- }
- }
-
- if (!socket_->virtual_download_) {
- receive_buffer_.emplace(
- std::make_pair(segment, std::move(manifest->getPacket())));
- reassemble();
- } else {
- if (segment >= final_block_number_) {
- stop();
- }
- }
- }
-}
-
-bool VegasTransportProtocol::verifyManifest(
- const ContentObjectManifest &manifest) {
- if (!socket_->verify_signature_) {
- return true;
- }
-
- bool is_data_secure = false;
-
- if (socket_->on_content_object_verification_ == VOID_HANDLER) {
- is_data_secure = static_cast<bool>(socket_->verifier_.verify(manifest));
- } else if (socket_->on_content_object_verification_(*socket_, manifest)) {
- is_data_secure = true;
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_data_secure)) {
- TRANSPORT_LOGE("Verification failed for %s\n",
- manifest.getName().toString().c_str());
- }
-
- return is_data_secure;
-}
-
-// TODO Add the name in the digest computation!
-void VegasTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
-
- std::chrono::microseconds rtt;
- Time now = std::chrono::steady_clock::now();
- std::chrono::steady_clock::duration duration =
- now - interest_timepoints_[incremental_suffix & mask_];
- rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
-
- average_rtt_ = (0.7 * average_rtt_) + (0.3 * (double)rtt.count());
-
- if (socket_->on_timer_expires_ != VOID_HANDLER) {
- auto dt = std::chrono::duration_cast<TimeDuration>(now - socket_->t0_);
- if (dt.count() > socket_->timer_interval_milliseconds_) {
- socket_->on_timer_expires_(*socket_, byte_count_, dt,
- (float)current_window_size_, retx_count_,
- (uint32_t)std::round(average_rtt_));
- socket_->t0_ = std::chrono::steady_clock::now();
- }
- }
-
- interests_in_flight_--;
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ || incremental_suffix == ~0_U64 ||
- receive_buffer_.find(incremental_suffix) !=
- receive_buffer_.end())) {
- return;
- }
-
- changeInterestLifetime(incremental_suffix);
-
- if (socket_->on_content_object_input_ != VOID_HANDLER) {
- socket_->on_content_object_input_(*socket_, *content_object);
- }
-
- if (socket_->on_interest_satisfied_ != VOID_HANDLER) {
- socket_->on_interest_satisfied_(*socket_, *interest);
- }
-
- if (!interest_retransmissions_[incremental_suffix & mask_]) {
- afterContentReception(*interest, *content_object);
- }
-
- if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() ==
- PayloadType::MANIFEST)) {
- // TODO Fix manifest!!
- auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(content_object));
-
- if (verifier_thread_ && incremental_suffix != 0) {
- // verifier_thread_->add(std::bind(&VegasTransportProtocol::onManifest,
- // this, std::move(manifest)));
- } else {
- onManifest(std::move(manifest));
- }
- } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- if (verifier_thread_) {
- // verifier_thread_->add(std::bind(&VegasTransportProtocol::onContentSegment,
- // this, std::move(content_object)));
- } else {
- onContentSegment(std::move(interest), std::move(content_object));
- }
- }
-
- scheduleNextInterests();
-}
-
-bool VegasTransportProtocol::verifyContentObject(
- const ContentObject &content_object) {
- if (!dynamic_cast<ConsumerSocket *>(socket_)->verify_signature_) {
- return true;
- }
-
- uint64_t segment = content_object.getName().getSuffix();
-
- bool ret = false;
-
- if (download_with_manifest_) {
- auto it = suffix_hash_map_.find((const unsigned int)segment);
- if (it != suffix_hash_map_.end()) {
- auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
- auto data_packet_digest = content_object.computeDigest(it->second.second);
- auto data_packet_digest_bytes =
- data_packet_digest.getDigest<uint8_t>().data();
- std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
-
- if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
- manifest_digest_bytes.data(),
- hash_type)) {
- suffix_hash_map_.erase(it);
- ret = true;
- } else {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
- }
- } else {
- ret = static_cast<bool>(
- dynamic_cast<ConsumerSocket *>(socket_)->verifier_.verify(
- content_object));
-
- if (!ret) {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
- }
-
- return ret;
- ;
-}
-
-void VegasTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- TRANSPORT_LOGW("Timeout on %s", interest->getName().toString().c_str());
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
-
- interests_in_flight_--;
-
- uint64_t segment = interest->getName().getSuffix();
-
- // Do not retransmit interests asking contents that do not exist.
- if (is_final_block_number_discovered_) {
- if (segment > final_block_number_) {
- return;
- }
- }
-
- if (socket_->on_interest_timeout_ != VOID_HANDLER) {
- socket_->on_interest_timeout_(*socket_, *interest);
- }
-
- afterDataUnsatisfied(segment);
-
- if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask_] <
- socket_->max_retransmissions_)) {
- retx_count_++;
-
- if (socket_->on_interest_retransmission_ != VOID_HANDLER) {
- socket_->on_interest_retransmission_(*socket_, *interest);
- }
-
- if (socket_->on_interest_output_ != VOID_HANDLER) {
- socket_->on_interest_output_(*socket_, *interest);
- }
-
- if (!is_running_) {
- return;
- }
-
- // retransmit
- interests_in_flight_++;
- interest_retransmissions_[segment & mask_]++;
-
- using namespace std::placeholders;
- portal_->sendInterest(std::move(interest));
- } else {
- TRANSPORT_LOGE("Stop: reached max retx limit.");
- partialDownload();
- stop();
- }
-}
-
-void VegasTransportProtocol::copyContent(const ContentObject &content_object) {
- Array a = content_object.getPayload();
-
- content_buffer_->insert(content_buffer_->end(), (uint8_t *)a.data(),
- (uint8_t *)a.data() + a.length());
-
- bool download_completed =
- is_final_block_number_discovered_ &&
- content_object.getName().getSuffix() == final_block_number_;
-
- if (TRANSPORT_EXPECT_FALSE(download_completed || !is_running_)) {
- // asio::io_service& io_service = portal_->getIoService();
- // io_service.post([this] () {
- returnContentToUser();
- // });
- }
-}
-
-void VegasTransportProtocol::reassemble() {
- uint64_t index = last_reassembled_segment_;
- auto it = receive_buffer_.find((const unsigned int)index);
-
- while (it != receive_buffer_.end()) {
- if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- copyContent(*it->second);
- receive_buffer_.erase(it);
- }
-
- index = ++last_reassembled_segment_;
- it = receive_buffer_.find((const unsigned int)index);
- }
-}
-
-void VegasTransportProtocol::partialDownload() {
- if (!socket_->virtual_download_) {
- reassemble();
- }
-
- if (socket_->on_payload_retrieved_ != VOID_HANDLER) {
- socket_->on_payload_retrieved_(
- *socket_, byte_count_,
- std::make_error_code(std::errc(std::errc::io_error)));
- }
-}
-
-// TODO Check vegas protocol
-// void VegasTransportProtocol::checkForFastRetransmission(const Interest
-// &interest) {
-// uint64_t segNumber = interest.getName().getSuffix();
-// received_segments_[segNumber] = true;
-// fast_retransmitted_segments.erase(segNumber);
-
-// uint64_t possibly_lost_segment = 0;
-// uint64_t highest_received_segment = received_segments_.rbegin()->first;
-
-// for (uint64_t i = 0; i <= highest_received_segment; i++) {
-// if (received_segments_.find(i) == received_segments_.end()) {
-// if (fast_retransmitted_segments.find(i) ==
-// fast_retransmitted_segments.end()) {
-// possibly_lost_segment = i;
-// uint8_t out_of_order_segments = 0;
-// for (uint64_t j = i; j <= highest_received_segment; j++) {
-// if (received_segments_.find(j) != received_segments_.end()) {
-// out_of_order_segments++;
-// if (out_of_order_segments >=
-// default_values::max_out_of_order_segments) {
-// fast_retransmitted_segments[possibly_lost_segment] = true;
-// fastRetransmit(interest, possibly_lost_segment);
-// }
-// }
-// }
-// }
-// }
-// }
-// }
-
-// void VegasTransportProtocol::fastRetransmit(const Interest &interest,
-// uint32_t chunk_number) {
-// if (interest_retransmissions_[chunk_number & mask_] <
-// socket_->max_retransmissions_) {
-// Name name = interest.getName();
-// name.setSuffix(chunk_number);
-
-// std::shared_ptr<Interest> retx_interest =
-// std::make_shared<Interest>(name);
-
-// if (socket_->on_interest_retransmission_ != VOID_HANDLER) {
-// socket_->on_interest_retransmission_(*socket_, *retx_interest);
-// }
-
-// if (socket_->on_interest_output_ != VOID_HANDLER) {
-// socket_->on_interest_output_(*socket_, *retx_interest);
-// }
-
-// if (!is_running_) {
-// return;
-// }
-
-// interests_in_flight_++;
-// interest_retransmissions_[chunk_number & mask_]++;
-
-// using namespace std::placeholders;
-// portal_->sendInterest(std::move(retx_interest));
-// }
-// }
-
-void VegasTransportProtocol::removeAllPendingInterests() { portal_->clear(); }
-
-} // end namespace protocol
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/vegas.h b/libtransport/src/hicn/transport/protocols/vegas.h
deleted file mode 100644
index 7791ffc94..000000000
--- a/libtransport/src/hicn/transport/protocols/vegas.h
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/protocols/vegas_rto_estimator.h>
-#include <hicn/transport/utils/event_thread.h>
-#include <hicn/transport/utils/ring_buffer.h>
-#include <hicn/transport/utils/sharable_vector.h>
-
-#include <map>
-
-namespace transport {
-
-namespace protocol {
-
-typedef utils::CircularFifo<uint32_t, 1024 * 128> SuffixQueue;
-typedef std::chrono::time_point<std::chrono::steady_clock> Time;
-typedef std::chrono::milliseconds TimeDuration;
-
-class VegasTransportProtocol : public TransportProtocol {
- public:
- VegasTransportProtocol(interface::BaseSocket *icnet_socket);
-
- virtual ~VegasTransportProtocol();
-
- virtual void start(utils::SharableVector<uint8_t> &content_buffer) override;
-
- void stop() override;
-
- void resume() override;
-
- protected:
- void reset();
-
- void sendInterest(std::uint64_t next_suffix);
-
- void onContentSegment(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object);
-
- bool verifyContentObject(const ContentObject &content_object);
-
- bool verifyManifest(const interface::ContentObjectManifest &manifest);
-
- virtual void onTimeout(Interest::Ptr &&interest) override;
-
- void onManifest(std::unique_ptr<interface::ContentObjectManifest> &&manifest);
-
- void onContentObject(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object) override;
-
- virtual void changeInterestLifetime(uint64_t segment);
-
- void scheduleNextInterests();
-
- virtual void decreaseWindow();
-
- virtual void increaseWindow();
-
- virtual void afterContentReception(const Interest &interest,
- const ContentObject &content_object);
-
- virtual void afterDataUnsatisfied(uint64_t segment);
-
- void reassemble();
-
- void returnContentToUser();
-
- void partialDownload();
-
- virtual void copyContent(const ContentObject &content_object);
-
- // virtual void checkForFastRetransmission(const Interest &interest);
-
- // void fastRetransmit(const Interest &interest, uint32_t chunk_number);
-
- void removeAllPendingInterests();
-
- protected:
- void handleTimeout(const std::error_code &ec);
-
- // reassembly variables
- volatile bool is_final_block_number_discovered_;
- std::atomic<uint64_t> final_block_number_;
- uint64_t last_reassembled_segment_;
- std::shared_ptr<utils::SharableVector<uint8_t>> content_buffer_;
- size_t content_buffer_size_;
-
- // transmission variablesis_final_block_number_discovered_
- double current_window_size_;
- double pending_window_size_;
- uint64_t interests_in_flight_;
- uint64_t next_suffix_;
- std::vector<std::uint32_t> interest_retransmissions_;
- std::vector<std::chrono::steady_clock::time_point> interest_timepoints_;
- RtoEstimator rtt_estimator_;
-
- uint32_t retx_count_;
-
- // buffers
- std::unordered_map<std::uint32_t, ContentObject::Ptr>
- receive_buffer_; // verified segments by segment number
- std::unordered_map<std::uint32_t, ContentObject::Ptr>
- unverified_segments_; // used with embedded manifests
- std::unordered_map<std::uint32_t, ContentObject::Ptr>
- verified_manifests_; // by segment number
-
- std::uint16_t interest_pool_index_;
- std::uint16_t mask_;
-
- // suffix randomization: since the suffixes in the manifests could not be in a
- // sequential order, we need to map those suffixes into an ordered sequence.
- std::unordered_map<std::uint64_t, std::uint64_t>
- incremental_suffix_to_real_suffix_map_;
- std::unordered_map<std::uint64_t, std::uint64_t>
- real_suffix_to_incremental_suffix_map_;
- std::uint32_t incremental_suffix_index_;
-
- // verification
- std::unordered_map<uint32_t, std::pair<std::vector<uint8_t>, HashAlgorithm>>
- suffix_hash_map_;
-
- // Fast Retransmission
- std::map<uint64_t, bool> received_segments_;
- std::unordered_map<uint64_t, bool> fast_retransmitted_segments;
-
- // Suffix queue
- volatile bool suffix_queue_completed_;
- SuffixQueue suffix_queue_;
-
- volatile bool download_with_manifest_;
- uint32_t next_manifest_;
- std::atomic<uint16_t> next_manifest_interval_;
-
- std::unique_ptr<utils::EventThread> verifier_thread_;
-
- uint32_t interest_tx_;
- uint32_t interest_count_;
-
- uint64_t byte_count_;
- double average_rtt_;
-
- std::unordered_map<uint32_t, uint64_t> sign_time_;
-};
-
-} // namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc b/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc
deleted file mode 100644
index a61fd05f0..000000000
--- a/libtransport/src/hicn/transport/protocols/vegas_rto_estimator.cc
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_options_default_values.h>
-#include <hicn/transport/protocols/vegas_rto_estimator.h>
-
-#include <algorithm>
-#include <cmath>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-RtoEstimator::RtoEstimator(Duration min_rto)
- : smoothed_rtt_((double)RtoEstimator::getInitialRtt().count()),
- rtt_variation_(0),
- first_measurement_(true),
- last_rto_((double)min_rto.count()) {}
-
-void RtoEstimator::addMeasurement(Duration rtt) {
- double duration = static_cast<double>(rtt.count());
- if (first_measurement_) {
- smoothed_rtt_ = duration;
- rtt_variation_ = duration / 2;
- first_measurement_ = false;
- } else {
- rtt_variation_ = (1 - default_values::beta) * rtt_variation_ +
- default_values::beta * std::abs(smoothed_rtt_ - duration);
- smoothed_rtt_ = (1 - default_values::alpha) * smoothed_rtt_ +
- default_values::alpha * duration;
- }
-}
-
-RtoEstimator::Duration RtoEstimator::computeRto() const {
- double rto = smoothed_rtt_ +
- std::max(double(default_values::clock_granularity.count()),
- default_values::k* rtt_variation_);
- return Duration(static_cast<Duration::rep>(rto));
-}
-
-} // end namespace protocol
-
-} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h
new file mode 100644
index 000000000..da67e86f8
--- /dev/null
+++ b/libtransport/src/hicn/transport/protocols/verification_manager.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+#include <deque>
+
+namespace transport {
+
+namespace protocol {
+
+class VerificationManager {
+ public:
+ virtual ~VerificationManager() = default;
+ virtual bool onPacketToVerify(const Packet& packet) = 0;
+};
+
+class SignatureVerificationManager : public VerificationManager {
+ public:
+ SignatureVerificationManager(interface::ConsumerSocket* icn_socket)
+ : icn_socket_(icn_socket) {}
+
+ TRANSPORT_ALWAYS_INLINE bool onPacketToVerify(const Packet& packet) override {
+ using namespace interface;
+
+ bool verify_signature, ret = false;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
+ verify_signature);
+
+ if (!verify_signature) {
+ return true;
+ }
+
+ std::shared_ptr<utils::Verifier> verifier;
+ icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
+
+ if (TRANSPORT_EXPECT_FALSE(!verifier)) {
+ throw errors::RuntimeException(
+ "No certificate provided by the application.");
+ }
+
+ ret = verifier->verify(packet);
+
+ if (!ret) {
+ throw errors::RuntimeException(
+ "Verification failure policy has to be implemented.");
+ }
+
+ return ret;
+ }
+
+ private:
+ interface::ConsumerSocket* icn_socket_;
+};
+
+} // end namespace protocol
+
+} // end namespace transport