aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport/protocols')
-rw-r--r--libtransport/src/hicn/transport/protocols/CMakeLists.txt77
-rw-r--r--libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc121
-rw-r--r--libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h54
-rw-r--r--libtransport/src/hicn/transport/protocols/cbr.cc51
-rw-r--r--libtransport/src/hicn/transport/protocols/cbr.h40
-rw-r--r--libtransport/src/hicn/transport/protocols/congestion_window_protocol.h30
-rw-r--r--libtransport/src/hicn/transport/protocols/consumer.conf21
-rw-r--r--libtransport/src/hicn/transport/protocols/data_processing_events.h33
-rw-r--r--libtransport/src/hicn/transport/protocols/datagram_reassembly.cc35
-rw-r--r--libtransport/src/hicn/transport/protocols/datagram_reassembly.h39
-rw-r--r--libtransport/src/hicn/transport/protocols/download_observer.h32
-rw-r--r--libtransport/src/hicn/transport/protocols/errors.cc60
-rw-r--r--libtransport/src/hicn/transport/protocols/errors.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.cc52
-rw-r--r--libtransport/src/hicn/transport/protocols/incremental_indexer.h143
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.cc78
-rw-r--r--libtransport/src/hicn/transport/protocols/indexer.h106
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc232
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc297
-rw-r--r--libtransport/src/hicn/transport/protocols/packet_manager.h67
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.cc105
-rw-r--r--libtransport/src/hicn/transport/protocols/protocol.h91
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc712
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.h141
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm_data_path.cc157
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm_data_path.h225
-rw-r--r--libtransport/src/hicn/transport/protocols/rate_estimation.cc355
-rw-r--r--libtransport/src/hicn/transport/protocols/rate_estimation.h173
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.cc70
-rw-r--r--libtransport/src/hicn/transport/protocols/reassembly.h67
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc1016
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h227
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc_data_path.cc160
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc_data_path.h79
-rw-r--r--libtransport/src/hicn/transport/protocols/statistics.h113
-rw-r--r--libtransport/src/hicn/transport/protocols/test/CMakeLists.txt10
-rw-r--r--libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc80
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.cc96
-rw-r--r--libtransport/src/hicn/transport/protocols/verification_manager.h67
40 files changed, 0 insertions, 5694 deletions
diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt
deleted file mode 100644
index 06515e0e2..000000000
--- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright (c) 2017-2019 Cisco and/or its affiliates.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at:
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
-
-list(APPEND HEADER_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h
- ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h
- ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h
- ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h
- ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h
- ${CMAKE_CURRENT_SOURCE_DIR}/statistics.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h
- ${CMAKE_CURRENT_SOURCE_DIR}/download_observer.h
- ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h
- ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h
- ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h
- ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h
- ${CMAKE_CURRENT_SOURCE_DIR}/errors.h
- ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h
- ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h
-)
-
-list(APPEND SOURCE_FILES
- ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc
- ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc
-)
-
-set(RAAQM_CONFIG_INSTALL_PREFIX
-${CMAKE_INSTALL_PREFIX}/etc/hicn
-)
-
-set(raaqm_config_path
- ${RAAQM_CONFIG_INSTALL_PREFIX}/consumer.conf
- PARENT_SCOPE
-)
-
-set(TRANSPORT_CONFIG
- ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf
-)
-
-install(
- FILES ${TRANSPORT_CONFIG}
- DESTINATION ${CMAKE_INSTALL_FULL_SYSCONFDIR}/hicn
- COMPONENT lib${LIBTRANSPORT}
-)
-
-set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
-set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc
deleted file mode 100644
index 2f1e5d8fd..000000000
--- a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/byte_stream_reassembly.h>
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/errors.h>
-#include <hicn/transport/protocols/indexer.h>
-#include <hicn/transport/utils/array.h>
-#include <hicn/transport/utils/membuf.h>
-
-namespace transport {
-
-namespace protocol {
-
-ByteStreamReassembly::ByteStreamReassembly(
- interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport_protocol)
- : Reassembly(icn_socket, transport_protocol),
- index_(IndexManager::invalid_index),
- download_complete_(false) {}
-
-void ByteStreamReassembly::reassemble(
- std::unique_ptr<ContentObjectManifest> &&manifest) {
- if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) {
- received_packets_.emplace(
- std::make_pair(manifest->getName().getSuffix(), nullptr));
- assembleContent();
- }
-}
-
-void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) {
- if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) {
- received_packets_.emplace(std::make_pair(
- content_object->getName().getSuffix(), std::move(content_object)));
- assembleContent();
- }
-}
-
-void ByteStreamReassembly::assembleContent() {
- if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) {
- index_ = index_manager_->getNextReassemblySegment();
- if (index_ == IndexManager::invalid_index) {
- return;
- }
- }
-
- auto it = received_packets_.find((const unsigned int)index_);
- while (it != received_packets_.end()) {
- // Check if valid packet
- if (it->second) {
- copyContent(*it->second);
- }
-
- received_packets_.erase(it);
- index_ = index_manager_->getNextReassemblySegment();
- it = received_packets_.find((const unsigned int)index_);
- }
-
- if (!download_complete_ && index_ != IndexManager::invalid_index) {
- transport_protocol_->onReassemblyFailed(index_);
- }
-}
-
-void ByteStreamReassembly::copyContent(const ContentObject &content_object) {
- auto a = content_object.getPayload();
- auto payload_length = a->length();
- auto write_size = std::min(payload_length, read_buffer_->tailroom());
- auto additional_bytes = payload_length > read_buffer_->tailroom()
- ? payload_length - read_buffer_->tailroom()
- : 0;
-
- std::memcpy(read_buffer_->writableTail(), a->data(), write_size);
- read_buffer_->append(write_size);
-
- if (!read_buffer_->tailroom()) {
- notifyApplication();
- std::memcpy(read_buffer_->writableTail(), a->data() + write_size,
- additional_bytes);
- read_buffer_->append(additional_bytes);
- }
-
- download_complete_ =
- index_manager_->getFinalSuffix() == content_object.getName().getSuffix();
-
- if (TRANSPORT_EXPECT_FALSE(download_complete_)) {
- notifyApplication();
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::success));
- }
-}
-
-void ByteStreamReassembly::reInitialize() {
- index_ = IndexManager::invalid_index;
- download_complete_ = false;
-
- received_packets_.clear();
-
- // reset read buffer
- interface::ConsumerSocket::ReadCallback *read_callback;
- reassembly_consumer_socket_->getSocketOption(
- interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
-
- read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
-}
-
-} // namespace protocol
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h
deleted file mode 100644
index 7c77d486f..000000000
--- a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/reassembly.h>
-
-namespace transport {
-
-namespace protocol {
-
-class ByteStreamReassembly : public Reassembly {
- public:
- ByteStreamReassembly(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport_protocol);
-
- protected:
- virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
-
- virtual void reassemble(
- std::unique_ptr<core::ContentObjectManifest> &&manifest) override;
-
- virtual void copyContent(const core::ContentObject &content_object);
-
- virtual void reInitialize() override;
-
- private:
- void assembleContent();
-
- protected:
- // The consumer socket
- // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_;
- // std::unique_ptr<ManifestIndexManager> manifest_index_manager_;
- // IndexVerificationManager *index_manager_;
- std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_;
- uint32_t index_;
- bool download_complete_;
-};
-
-} // namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/cbr.cc b/libtransport/src/hicn/transport/protocols/cbr.cc
deleted file mode 100644
index 02bc7b5e4..000000000
--- a/libtransport/src/hicn/transport/protocols/cbr.cc
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/cbr.h>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-CbrTransportProtocol::CbrTransportProtocol(
- interface::ConsumerSocket *icnet_socket)
- : RaaqmTransportProtocol(icnet_socket) {}
-
-int CbrTransportProtocol::start() { return RaaqmTransportProtocol::start(); }
-
-void CbrTransportProtocol::reset() {
- RaaqmTransportProtocol::reset();
- socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
- current_window_size_);
-}
-
-void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {}
-
-void CbrTransportProtocol::afterContentReception(
- const Interest &interest, const ContentObject &content_object) {
- auto segment = content_object.getName().getSuffix();
- auto now = utils::SteadyClock::now();
- auto rtt = std::chrono::duration_cast<utils::Microseconds>(
- now - interest_timepoints_[segment & mask]);
- // Update stats
- updateStats(segment, rtt.count(), now);
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/cbr.h b/libtransport/src/hicn/transport/protocols/cbr.h
deleted file mode 100644
index e80da14f5..000000000
--- a/libtransport/src/hicn/transport/protocols/cbr.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/raaqm.h>
-
-namespace transport {
-
-namespace protocol {
-
-class CbrTransportProtocol : public RaaqmTransportProtocol {
- public:
- CbrTransportProtocol(interface::ConsumerSocket *icnet_socket);
-
- int start() override;
-
- void reset() override;
-
- private:
- void afterContentReception(const Interest &interest,
- const ContentObject &content_object) override;
- void afterDataUnsatisfied(uint64_t segment) override;
-};
-
-} // end namespace protocol
-
-} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h b/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h
deleted file mode 100644
index 36ac6eb17..000000000
--- a/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-namespace transport {
-
-namespace protocol {
-
-class CWindowProtocol {
- protected:
- virtual void increaseWindow() = 0;
- virtual void decreaseWindow() = 0;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/consumer.conf b/libtransport/src/hicn/transport/protocols/consumer.conf
deleted file mode 100644
index 1a366f32f..000000000
--- a/libtransport/src/hicn/transport/protocols/consumer.conf
+++ /dev/null
@@ -1,21 +0,0 @@
-; this file contais the parameters for RAAQM
-autotune = no
-lifetime = 500
-retransmissions = 128
-beta = 0.99
-drop = 0.003
-beta_wifi_ = 0.99
-drop_wifi_ = 0.6
-beta_lte_ = 0.99
-drop_lte_ = 0.003
-wifi_delay_ = 200
-lte_delay_ = 9000
-
-alpha = 0.95
-batching_parameter = 200
-
-;Choice of rate estimator:
-;0 --> an estimation each $(batching_parameter) packets
-;1 --> an estimation "a la TCP", estimation at the end of the download of the segment
-
-rate_estimator = 0
diff --git a/libtransport/src/hicn/transport/protocols/data_processing_events.h b/libtransport/src/hicn/transport/protocols/data_processing_events.h
deleted file mode 100644
index 8975c2b4a..000000000
--- a/libtransport/src/hicn/transport/protocols/data_processing_events.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/core/content_object.h>
-#include <hicn/transport/core/interest.h>
-
-namespace transport {
-namespace protocol {
-
-class ContentObjectProcessingEventCallback {
- public:
- virtual ~ContentObjectProcessingEventCallback() = default;
- virtual void onPacketDropped(core::Interest::Ptr &&i,
- core::ContentObject::Ptr &&c) = 0;
- virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
-};
-
-} // namespace protocol
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc
deleted file mode 100644
index 7b01ad4bc..000000000
--- a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/datagram_reassembly.h>
-
-namespace transport {
-
-namespace protocol {
-
-DatagramReassembly::DatagramReassembly(interface::ConsumerSocket* icn_socket,
- TransportProtocol* transport_protocol)
- : Reassembly(icn_socket, transport_protocol) {}
-
-void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) {
- read_buffer_ = content_object->getPayload();
- Reassembly::notifyApplication();
-}
-
-void DatagramReassembly::reInitialize() {}
-
-} // namespace protocol
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h
deleted file mode 100644
index 923b6f2c1..000000000
--- a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/reassembly.h>
-
-namespace transport {
-
-namespace protocol {
-
-class DatagramReassembly : public Reassembly {
- public:
- DatagramReassembly(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport_protocol);
-
- virtual void reassemble(core::ContentObject::Ptr &&content_object) override;
- virtual void reInitialize() override;
- virtual void reassemble(
- std::unique_ptr<core::ContentObjectManifest> &&manifest) override {
- return;
- }
-};
-
-} // namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/download_observer.h b/libtransport/src/hicn/transport/protocols/download_observer.h
deleted file mode 100644
index 6d24fe6fd..000000000
--- a/libtransport/src/hicn/transport/protocols/download_observer.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-namespace transport {
-
-namespace protocol {
-
-class IcnObserver {
- public:
- virtual ~IcnObserver(){};
-
- virtual void notifyStats(double throughput) = 0;
- virtual void notifyDownloadTime(double downloadTime) = 0;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/errors.cc b/libtransport/src/hicn/transport/protocols/errors.cc
deleted file mode 100644
index c2249ed4a..000000000
--- a/libtransport/src/hicn/transport/protocols/errors.cc
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/errors.h>
-
-namespace transport {
-namespace protocol {
-
-const std::error_category& protocol_category() {
- static protocol_category_impl instance;
-
- return instance;
-}
-
-const char* protocol_category_impl::name() const throw() {
- return "transport::protocol::error";
-}
-
-std::string protocol_category_impl::message(int ev) const {
- switch (static_cast<protocol_error>(ev)) {
- case protocol_error::success: {
- return "Success";
- }
- case protocol_error::signature_verification_failed: {
- return "Signature verification failed.";
- }
- case protocol_error::integrity_verification_failed: {
- return "Integrity verification failed";
- }
- case protocol_error::no_verifier_provided: {
- return "Transport cannot get any verifier for the given data.";
- }
- case protocol_error::io_error: {
- return "Conectivity error between transport and local forwarder";
- }
- case protocol_error::max_retransmissions_error: {
- return "Transport protocol reached max number of retransmissions allowed "
- "for the same interest.";
- }
- case protocol_error::session_aborted: {
- return "The session has been aborted by the application.";
- }
- default: { return "Unknown protocol error"; }
- }
-}
-
-} // namespace protocol
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/errors.h b/libtransport/src/hicn/transport/protocols/errors.h
deleted file mode 100644
index cb3d3474e..000000000
--- a/libtransport/src/hicn/transport/protocols/errors.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2020 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <string>
-#include <system_error>
-
-namespace transport {
-namespace protocol {
-
-/**
- * @brief Get the default server error category.
- * @return The default server error category instance.
- *
- * @warning The first call to this function is thread-safe only starting with
- * C++11.
- */
-const std::error_category& protocol_category();
-
-/**
- * The list of errors.
- */
-enum class protocol_error {
- success = 0,
- signature_verification_failed,
- integrity_verification_failed,
- no_verifier_provided,
- io_error,
- max_retransmissions_error,
- session_aborted,
-};
-
-/**
- * @brief Create an error_code instance for the given error.
- * @param error The error.
- * @return The error_code instance.
- */
-inline std::error_code make_error_code(protocol_error error) {
- return std::error_code(static_cast<int>(error), protocol_category());
-}
-
-/**
- * @brief Create an error_condition instance for the given error.
- * @param error The error.
- * @return The error_condition instance.
- */
-inline std::error_condition make_error_condition(protocol_error error) {
- return std::error_condition(static_cast<int>(error), protocol_category());
-}
-
-/**
- * @brief A server error category.
- */
-class protocol_category_impl : public std::error_category {
- public:
- /**
- * @brief Get the name of the category.
- * @return The name of the category.
- */
- virtual const char* name() const throw();
-
- /**
- * @brief Get the error message for a given error.
- * @param ev The error numeric value.
- * @return The message associated to the error.
- */
- virtual std::string message(int ev) const;
-};
-} // namespace protocol
-} // namespace transport
-
-namespace std {
-// namespace system {
-template <>
-struct is_error_code_enum<::transport::protocol::protocol_error>
- : public std::true_type {};
-// } // namespace system
-} // namespace std \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc
deleted file mode 100644
index 5a8046daa..000000000
--- a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/incremental_indexer.h>
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-
-namespace transport {
-namespace protocol {
-
-void IncrementalIndexer::onContentObject(
- core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- using namespace interface;
-
- if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) {
- final_suffix_ = content_object->getName().getSuffix();
- }
-
- auto ret = verification_manager_->onPacketToVerify(*content_object);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- reassembly_->reassemble(std::move(content_object));
- break;
- }
- case VerificationPolicy::DROP_PACKET: {
- transport_protocol_->onPacketDropped(std::move(interest),
- std::move(content_object));
- break;
- }
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
- }
-}
-
-} // namespace protocol
-} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h
deleted file mode 100644
index b587a8332..000000000
--- a/libtransport/src/hicn/transport/protocols/incremental_indexer.h
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/indexer.h>
-
-#include <hicn/transport/errors/runtime_exception.h>
-#include <hicn/transport/errors/unexpected_manifest_exception.h>
-#include <hicn/transport/protocols/reassembly.h>
-#include <hicn/transport/protocols/verification_manager.h>
-#include <hicn/transport/utils/literals.h>
-
-#include <deque>
-
-namespace transport {
-
-namespace interface {
-class ConsumerSocket;
-}
-
-namespace protocol {
-
-class Reassembly;
-class TransportProtocol;
-
-class IncrementalIndexer : public Indexer {
- public:
- IncrementalIndexer(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport, Reassembly *reassembly)
- : socket_(icn_socket),
- reassembly_(reassembly),
- transport_protocol_(transport),
- final_suffix_(std::numeric_limits<uint32_t>::max()),
- first_suffix_(0),
- next_download_suffix_(0),
- next_reassembly_suffix_(0),
- verification_manager_(
- std::make_unique<SignatureVerificationManager>(icn_socket)) {
- if (reassembly_) {
- reassembly_->setIndexer(this);
- }
- }
-
- IncrementalIndexer(const IncrementalIndexer &) = delete;
-
- IncrementalIndexer(IncrementalIndexer &&other)
- : socket_(other.socket_),
- reassembly_(other.reassembly_),
- transport_protocol_(other.transport_protocol_),
- final_suffix_(other.final_suffix_),
- first_suffix_(other.first_suffix_),
- next_download_suffix_(other.next_download_suffix_),
- next_reassembly_suffix_(other.next_reassembly_suffix_),
- verification_manager_(std::move(other.verification_manager_)) {
- if (reassembly_) {
- reassembly_->setIndexer(this);
- }
- }
-
- /**
- *
- */
- virtual ~IncrementalIndexer() {}
-
- TRANSPORT_ALWAYS_INLINE virtual void reset(
- std::uint32_t offset = 0) override {
- final_suffix_ = std::numeric_limits<uint32_t>::max();
- next_download_suffix_ = offset;
- next_reassembly_suffix_ = offset;
- }
-
- /**
- * Retrieve from the manifest the next suffix to retrieve.
- */
- TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override {
- return next_download_suffix_ <= final_suffix_ ? next_download_suffix_++
- : IndexManager::invalid_index;
- }
-
- TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix(
- uint32_t suffix) override {
- first_suffix_ = suffix;
- }
-
- /**
- * Retrive the next segment to be reassembled.
- */
- TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override {
- return next_reassembly_suffix_ <= final_suffix_
- ? next_reassembly_suffix_++
- : IndexManager::invalid_index;
- }
-
- TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override {
- return final_suffix_ != std::numeric_limits<uint32_t>::max();
- }
-
- TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override {
- return final_suffix_;
- }
-
- void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) override;
-
- TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) {
- reassembly_ = reassembly;
-
- if (reassembly_) {
- reassembly_->setIndexer(this);
- }
- }
-
- TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override {
- return verification_manager_->onKeyToVerify();
- }
-
- protected:
- interface::ConsumerSocket *socket_;
- Reassembly *reassembly_;
- TransportProtocol *transport_protocol_;
- uint32_t final_suffix_;
- uint32_t first_suffix_;
- uint32_t next_download_suffix_;
- uint32_t next_reassembly_suffix_;
- std::unique_ptr<VerificationManager> verification_manager_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc
deleted file mode 100644
index d95c10ff9..000000000
--- a/libtransport/src/hicn/transport/protocols/indexer.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/indexer.h>
-
-#include <hicn/transport/protocols/incremental_indexer.h>
-#include <hicn/transport/protocols/manifest_incremental_indexer.h>
-#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/utils/branch_prediction.h>
-
-namespace transport {
-namespace protocol {
-
-IndexManager::IndexManager(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport, Reassembly *reassembly)
- : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport,
- reassembly)),
- first_segment_received_(false),
- icn_socket_(icn_socket),
- transport_(transport),
- reassembly_(reassembly) {}
-
-void IndexManager::onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) {
- if (first_segment_received_) {
- indexer_->onContentObject(std::move(interest), std::move(content_object));
- } else {
- std::uint32_t segment_number = interest->getName().getSuffix();
-
- if (segment_number == 0) {
- // Check if manifest
- if (content_object->getPayloadType() == PayloadType::MANIFEST) {
- IncrementalIndexer *indexer =
- static_cast<IncrementalIndexer *>(indexer_.release());
- indexer_ =
- std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer));
- delete indexer;
- }
-
- indexer_->onContentObject(std::move(interest), std::move(content_object));
- auto it = interest_data_set_.begin();
- while (it != interest_data_set_.end()) {
- indexer_->onContentObject(std::move(const_cast<core::Interest::Ptr &&>(it->first)), std::move(const_cast<core::ContentObject::Ptr &&>(it->second)));
- it = interest_data_set_.erase(it);
- }
-
- first_segment_received_ = true;
- } else {
- interest_data_set_.emplace(std::move(interest), std::move(content_object));
- }
- }
-}
-
-bool IndexManager::onKeyToVerify() {
- return indexer_->onKeyToVerify();
-}
-
-void IndexManager::reset(std::uint32_t offset) {
- indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_,
- reassembly_);
- first_segment_received_ = false;
- interest_data_set_.clear();
-}
-
-} // namespace protocol
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h
deleted file mode 100644
index 87cf9b307..000000000
--- a/libtransport/src/hicn/transport/protocols/indexer.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/core/content_object.h>
-#include <hicn/transport/core/interest.h>
-
-#include <set>
-
-namespace transport {
-
-namespace interface {
-class ConsumerSocket;
-}
-
-namespace protocol {
-
-class Reassembly;
-class TransportProtocol;
-
-class Indexer {
- public:
- /**
- *
- */
- virtual ~Indexer() = default;
- /**
- * Retrieve from the manifest the next suffix to retrieve.
- */
- virtual uint32_t getNextSuffix() = 0;
-
- virtual void setFirstSuffix(uint32_t suffix) = 0;
-
- /**
- * Retrive the next segment to be reassembled.
- */
- virtual uint32_t getNextReassemblySegment() = 0;
-
- virtual bool isFinalSuffixDiscovered() = 0;
-
- virtual uint32_t getFinalSuffix() = 0;
-
- virtual void reset(std::uint32_t offset = 0) = 0;
-
- virtual void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) = 0;
-
- virtual bool onKeyToVerify() = 0;
-};
-
-class IndexManager : Indexer {
- public:
- static constexpr uint32_t invalid_index = ~0;
-
- IndexManager(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport, Reassembly *reassembly);
-
- uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); }
-
- void setFirstSuffix(uint32_t suffix) override {
- indexer_->setFirstSuffix(suffix);
- }
-
- uint32_t getNextReassemblySegment() override {
- return indexer_->getNextReassemblySegment();
- }
-
- bool isFinalSuffixDiscovered() override {
- return indexer_->isFinalSuffixDiscovered();
- }
-
- uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); }
-
- void reset(std::uint32_t offset = 0) override;
-
- void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) override;
-
- bool onKeyToVerify() override;
-
- private:
- std::unique_ptr<Indexer> indexer_;
- bool first_segment_received_;
- std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
- interest_data_set_;
- interface::ConsumerSocket *icn_socket_;
- TransportProtocol *transport_;
- Reassembly *reassembly_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc
deleted file mode 100644
index 592daa4d4..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_incremental_indexer.h>
-
-#include <cmath>
-#include <deque>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-ManifestIncrementalIndexer::ManifestIncrementalIndexer(
- interface::ConsumerSocket *icn_socket, TransportProtocol *transport,
- Reassembly *reassembly)
- : IncrementalIndexer(icn_socket, transport, reassembly),
- suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
- NextSegmentCalculationStrategy::INCREMENTAL,
- next_download_suffix_, 0)) {}
-
-void ManifestIncrementalIndexer::onContentObject(
- core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- // Check if mainfiest or not
- if (content_object->getPayloadType() == PayloadType::MANIFEST) {
- onUntrustedManifest(std::move(interest), std::move(content_object));
- } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- onUntrustedContentObject(std::move(interest), std::move(content_object));
- }
-}
-
-void ManifestIncrementalIndexer::onUntrustedManifest(
- core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) {
- auto ret = verification_manager_->onPacketToVerify(*content_object);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- processTrustedManifest(std::move(content_object));
- break;
- }
- case VerificationPolicy::DROP_PACKET:
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
- }
-}
-
-void ManifestIncrementalIndexer::processTrustedManifest(
- ContentObject::Ptr &&content_object) {
- auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(*content_object));
- manifest->decode();
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
- core::ManifestVersion::VERSION_1)) {
- throw errors::RuntimeException("Received manifest with unknown version.");
- }
-
- switch (manifest->getManifestType()) {
- case core::ManifestType::INLINE_MANIFEST: {
- auto _it = manifest->getSuffixList().begin();
- auto _end = manifest->getSuffixList().end();
-
- suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber());
-
- for (; _it != _end; _it++) {
- auto hash =
- std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
-
- if (!checkUnverifiedSegments(_it->first, hash)) {
- suffix_hash_map_[_it->first] = std::move(hash);
- }
- }
-
- reassembly_->reassemble(std::move(manifest));
-
- break;
- }
- case core::ManifestType::FLIC_MANIFEST: {
- throw errors::NotImplementedException();
- }
- case core::ManifestType::FINAL_CHUNK_NUMBER: {
- throw errors::NotImplementedException();
- }
- }
-}
-
-bool ManifestIncrementalIndexer::checkUnverifiedSegments(
- std::uint32_t suffix, const HashEntry &hash) {
- auto it = unverified_segments_.find(suffix);
-
- if (it != unverified_segments_.end()) {
- auto ret = verifyContentObject(hash, *it->second.second);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- reassembly_->reassemble(std::move(it->second.second));
- break;
- }
- case VerificationPolicy::DROP_PACKET: {
- transport_protocol_->onPacketDropped(std::move(it->second.first),
- std::move(it->second.second));
- break;
- }
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
- }
-
- unverified_segments_.erase(it);
- return true;
- }
-
- return false;
-}
-
-VerificationPolicy ManifestIncrementalIndexer::verifyContentObject(
- const HashEntry &manifest_hash, const ContentObject &content_object) {
- VerificationPolicy ret;
-
- auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second);
- auto data_packet_digest = content_object.computeDigest(manifest_hash.second);
- auto data_packet_digest_bytes =
- data_packet_digest.getDigest<uint8_t>().data();
- const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first;
-
- if (utils::CryptoHash::compareBinaryDigest(
- data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) {
- ret = VerificationPolicy::ACCEPT_PACKET;
- } else {
- ConsumerContentObjectVerificationFailedCallback
- *verification_failed_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
- &verification_failed_callback);
- ret = (*verification_failed_callback)(
- *socket_, content_object,
- make_error_code(protocol_error::integrity_verification_failed));
- }
-
- return ret;
-}
-
-void ManifestIncrementalIndexer::onUntrustedContentObject(
- Interest::Ptr &&i, ContentObject::Ptr &&c) {
- auto suffix = c->getName().getSuffix();
- auto it = suffix_hash_map_.find(suffix);
-
- if (it != suffix_hash_map_.end()) {
- auto ret = verifyContentObject(it->second, *c);
-
- switch (ret) {
- case VerificationPolicy::ACCEPT_PACKET: {
- suffix_hash_map_.erase(it);
- reassembly_->reassemble(std::move(c));
- break;
- }
- case VerificationPolicy::DROP_PACKET: {
- transport_protocol_->onPacketDropped(std::move(i), std::move(c));
- break;
- }
- case VerificationPolicy::ABORT_SESSION: {
- transport_protocol_->onContentReassembled(
- make_error_code(protocol_error::session_aborted));
- break;
- }
- }
- } else {
- unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c));
- }
-}
-
-uint32_t ManifestIncrementalIndexer::getNextSuffix() {
- auto ret = suffix_strategy_->getNextSuffix();
-
- if (ret <= suffix_strategy_->getFinalSuffix() &&
- ret != utils::SuffixStrategy::INVALID_SUFFIX) {
- suffix_queue_.push(ret);
- return ret;
- }
-
- return IndexManager::invalid_index;
-}
-
-uint32_t ManifestIncrementalIndexer::getFinalSuffix() {
- return suffix_strategy_->getFinalSuffix();
-}
-
-bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() {
- return IncrementalIndexer::isFinalSuffixDiscovered();
-}
-
-uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() {
- if (suffix_queue_.empty()) {
- return IndexManager::invalid_index;
- }
-
- auto ret = suffix_queue_.front();
- suffix_queue_.pop();
- return ret;
-}
-
-void ManifestIncrementalIndexer::reset(std::uint32_t offset) {
- IncrementalIndexer::reset(offset);
- suffix_hash_map_.clear();
- unverified_segments_.clear();
- SuffixQueue empty;
- std::swap(suffix_queue_, empty);
- suffix_strategy_->reset(offset);
-}
-
-} // namespace protocol
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h
deleted file mode 100644
index 6e991f86f..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/protocols/incremental_indexer.h>
-#include <hicn/transport/utils/suffix_strategy.h>
-
-#include <list>
-
-namespace transport {
-
-namespace protocol {
-
-class ManifestIncrementalIndexer : public IncrementalIndexer {
- static constexpr double alpha = 0.3;
-
- public:
- using SuffixQueue = std::queue<uint32_t>;
- using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>;
-
- ManifestIncrementalIndexer(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport, Reassembly *reassembly);
-
- ManifestIncrementalIndexer(IncrementalIndexer &&indexer)
- : IncrementalIndexer(std::move(indexer)),
- suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy(
- core::NextSegmentCalculationStrategy::INCREMENTAL,
- next_download_suffix_, 0)) {
- for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) {
- suffix_queue_.push(i);
- }
- }
-
- virtual ~ManifestIncrementalIndexer() = default;
-
- void reset(std::uint32_t offset = 0) override;
-
- void onContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object) override;
-
- uint32_t getNextSuffix() override;
-
- uint32_t getNextReassemblySegment() override;
-
- bool isFinalSuffixDiscovered() override;
-
- uint32_t getFinalSuffix() override;
-
- private:
- void onUntrustedManifest(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object);
- void onUntrustedContentObject(core::Interest::Ptr &&interest,
- core::ContentObject::Ptr &&content_object);
- void processTrustedManifest(core::ContentObject::Ptr &&content_object);
- void onManifestReceived(core::Interest::Ptr &&i,
- core::ContentObject::Ptr &&c);
- void onManifestTimeout(core::Interest::Ptr &&i);
- VerificationPolicy verifyContentObject(
- const HashEntry &manifest_hash,
- const core::ContentObject &content_object);
- bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash);
-
- protected:
- std::unique_ptr<utils::SuffixStrategy> suffix_strategy_;
- SuffixQueue suffix_queue_;
-
- // Hash verification
- std::unordered_map<uint32_t, HashEntry> suffix_hash_map_;
-
- std::unordered_map<uint32_t,
- std::pair<core::Interest::Ptr, core::ContentObject::Ptr>>
- unverified_segments_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
deleted file mode 100644
index 5bf9c89f7..000000000
--- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/manifest_indexing_manager.h>
-
-#include <cmath>
-#include <deque>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-ManifestIndexManager::ManifestIndexManager(
- interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest)
- : IncrementalIndexManager(icn_socket),
- PacketManager<Interest>(1024),
- next_to_retrieve_segment_(suffix_queue_.end()),
- suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0),
- next_reassembly_segment_(
- core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true),
- ignored_segments_(),
- next_interest_(next_interest) {}
-
-bool ManifestIndexManager::onManifest(
- core::ContentObject::Ptr &&content_object) {
- auto manifest =
- std::make_unique<ContentObjectManifest>(std::move(*content_object));
- bool manifest_verified = verification_manager_->onPacketToVerify(*manifest);
-
- if (manifest_verified) {
- manifest->decode();
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() !=
- core::ManifestVersion::VERSION_1)) {
- throw errors::RuntimeException("Received manifest with unknown version.");
- }
-
- switch (manifest->getManifestType()) {
- case core::ManifestType::INLINE_MANIFEST: {
- auto _it = manifest->getSuffixList().begin();
- auto _end = manifest->getSuffixList().end();
- size_t nb_segments = std::distance(_it, _end);
- final_suffix_ = manifest->getFinalBlockNumber(); // final block number
-
- TRANSPORT_LOGD("Received manifest %u",
- manifest->getWritableName().getSuffix());
- suffix_hash_map_[_it->first] =
- std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push_back(_it->first);
-
- // If the transport protocol finished the list of segments to retrieve,
- // reset the next_to_retrieve_segment_ iterator to the next segment
- // provided by this manifest.
- if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
- suffix_queue_.end())) {
- next_to_retrieve_segment_ = --suffix_queue_.end();
- }
-
- std::advance(_it, 1);
- for (; _it != _end; _it++) {
- suffix_hash_map_[_it->first] = std::make_pair(
- std::vector<uint8_t>(_it->second, _it->second + 32),
- manifest->getHashAlgorithm());
- suffix_queue_.push_back(_it->first);
- }
-
- if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) {
- core::NextSegmentCalculationStrategy strategy =
- manifest->getNextSegmentCalculationStrategy();
-
- suffix_manifest_.reset(0);
- suffix_manifest_.setNbSegments(nb_segments);
- suffix_manifest_.setSuffixStrategy(strategy);
- TRANSPORT_LOGD("Capacity of 1st manifest %zu",
- suffix_manifest_.getNbSegments());
-
- next_reassembly_segment_.reset(*suffix_queue_.begin());
- next_reassembly_segment_.setNbSegments(nb_segments);
- suffix_manifest_.setSuffixStrategy(strategy);
- }
-
- // If the manifest is not full, we add the suffixes of missing segments
- // to the list of segments to ignore when computing the next reassembly
- // index.
- if (TRANSPORT_EXPECT_FALSE(
- suffix_manifest_.getNbSegments() - nb_segments > 0)) {
- auto start = manifest->getSuffixList().begin();
- auto last = --_end;
- for (uint32_t i = last->first + 1;
- i < start->first + suffix_manifest_.getNbSegments(); i++) {
- ignored_segments_.push_back(i);
- }
- }
-
- if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) {
- fillWindow(manifest->getWritableName(),
- manifest->getName().getSuffix());
- }
-
- break;
- }
- case core::ManifestType::FLIC_MANIFEST: {
- throw errors::NotImplementedException();
- }
- case core::ManifestType::FINAL_CHUNK_NUMBER: {
- throw errors::NotImplementedException();
- }
- }
- }
-
- return manifest_verified;
-}
-
-void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i,
- ContentObject::Ptr &&c) {
- onManifest(std::move(c));
- if (next_interest_) {
- next_interest_->scheduleNextInterests();
- }
-}
-
-void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) {
- const Name &n = i->getName();
- uint32_t segment = n.getSuffix();
-
- if (segment > final_suffix_) {
- return;
- }
-
- TRANSPORT_LOGD("Timeout on manifest %u", segment);
- // Get portal
- std::shared_ptr<interface::BasePortal> portal;
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
-
- // Send requests for manifest out of the congestion window (no
- // in_flight_interests++)
- portal->sendInterest(
- std::move(i),
- std::bind(&ManifestIndexManager::onManifestReceived, this,
- std::placeholders::_1, std::placeholders::_2),
- std::bind(&ManifestIndexManager::onManifestTimeout, this,
- std::placeholders::_1));
-}
-
-void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) {
- /* Send as many manifest as required for filling window. */
- uint32_t interest_lifetime;
- double window_size;
- std::shared_ptr<interface::BasePortal> portal;
- Interest::Ptr interest;
- uint32_t current_segment = *next_to_retrieve_segment_;
- // suffix_manifest_ now points to the next manifest to request
- uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix();
-
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal);
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interest_lifetime);
- socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
- window_size);
-
- if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) {
- suffix_manifest_.updateSuffix(last_requested_manifest);
- return;
- }
-
- if (current_segment + window_size < suffix_manifest_.getSuffix() &&
- current_manifest != last_requested_manifest) {
- suffix_manifest_.updateSuffix(last_requested_manifest);
- return;
- }
-
- do {
- interest = getPacket();
- name.setSuffix(suffix_manifest_.getSuffix());
- interest->setName(name);
- interest->setLifetime(interest_lifetime);
-
- // Send interests for manifest out of the congestion window (no
- // in_flight_interests++)
- portal->sendInterest(
- std::move(interest),
- std::bind(&ManifestIndexManager::onManifestReceived, this,
- std::placeholders::_1, std::placeholders::_2),
- std::bind(&ManifestIndexManager::onManifestTimeout, this,
- std::placeholders::_1));
- TRANSPORT_LOGD("Send manifest interest %u", name.getSuffix());
-
- last_requested_manifest = (suffix_manifest_++).getSuffix();
- } while (current_segment + window_size >= suffix_manifest_.getSuffix() &&
- suffix_manifest_.getSuffix() < final_suffix_);
-
- // suffix_manifest_ now points to the last requested manifest
- suffix_manifest_.updateSuffix(last_requested_manifest);
-}
-
-bool ManifestIndexManager::onContentObject(
- const core::ContentObject &content_object) {
- bool verify_signature;
- socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
- verify_signature);
-
- if (!verify_signature) {
- return true;
- }
-
- uint64_t segment = content_object.getName().getSuffix();
-
- bool ret = false;
-
- auto it = suffix_hash_map_.find((const unsigned int)segment);
- if (it != suffix_hash_map_.end()) {
- auto hash_type = static_cast<utils::CryptoHashType>(it->second.second);
- auto data_packet_digest = content_object.computeDigest(it->second.second);
- auto data_packet_digest_bytes =
- data_packet_digest.getDigest<uint8_t>().data();
- std::vector<uint8_t> &manifest_digest_bytes = it->second.first;
-
- if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes,
- manifest_digest_bytes.data(),
- hash_type)) {
- suffix_hash_map_.erase(it);
- ret = true;
- } else {
- throw errors::RuntimeException(
- "Verification failure policy has to be implemented.");
- }
- }
-
- return ret;
-}
-
-uint32_t ManifestIndexManager::getNextSuffix() {
- if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ ==
- suffix_queue_.end())) {
- return invalid_index;
- }
-
- return *next_to_retrieve_segment_++;
-}
-
-uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; }
-
-bool ManifestIndexManager::isFinalSuffixDiscovered() {
- return IncrementalIndexManager::isFinalSuffixDiscovered();
-}
-
-uint32_t ManifestIndexManager::getNextReassemblySegment() {
- uint32_t current_reassembly_segment;
-
- while (true) {
- current_reassembly_segment = next_reassembly_segment_.getSuffix();
- next_reassembly_segment_++;
-
- if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) {
- return invalid_index;
- }
-
- if (ignored_segments_.empty()) break;
-
- auto is_ignored =
- std::find(ignored_segments_.begin(), ignored_segments_.end(),
- current_reassembly_segment);
-
- if (is_ignored == ignored_segments_.end()) break;
-
- ignored_segments_.erase(is_ignored);
- }
-
- return current_reassembly_segment;
-}
-
-void ManifestIndexManager::reset() {
- IncrementalIndexManager::reset();
- suffix_manifest_.reset(0);
- suffix_queue_.clear();
- suffix_hash_map_.clear();
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/packet_manager.h b/libtransport/src/hicn/transport/protocols/packet_manager.h
deleted file mode 100644
index 4d4011ecf..000000000
--- a/libtransport/src/hicn/transport/protocols/packet_manager.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/utils/object_pool.h>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace core;
-
-template <typename PacketType>
-class PacketManager {
- static_assert(std::is_base_of<Packet, PacketType>::value,
- "The packet manager support just Interest and Data.");
-
- static constexpr std::size_t packet_pool_size = 4096;
-
- public:
- PacketManager(std::size_t size = packet_pool_size) : size_(0) {
- // Create pool of interests
- increasePoolSize(size);
- }
-
- TRANSPORT_ALWAYS_INLINE void increasePoolSize(std::size_t size) {
- for (std::size_t i = 0; i < size; i++) {
- interest_pool_.add(new PacketType());
- }
-
- size_ += size;
- }
-
- TRANSPORT_ALWAYS_INLINE typename PacketType::Ptr getPacket() {
- auto result = interest_pool_.get();
-
- while (TRANSPORT_EXPECT_FALSE(!result.first)) {
- // Add packets to the pool
- increasePoolSize(size_);
- result = interest_pool_.get();
- }
-
- result.second->resetPayload();
- return std::move(result.second);
- }
-
- private:
- utils::ObjectPool<PacketType> interest_pool_;
- std::size_t size_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc
deleted file mode 100644
index db461a66f..000000000
--- a/libtransport/src/hicn/transport/protocols/protocol.cc
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/protocol.h>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket,
- Reassembly *reassembly_protocol)
- : socket_(icn_socket),
- reassembly_protocol_(reassembly_protocol),
- index_manager_(
- std::make_unique<IndexManager>(socket_, this, reassembly_protocol)),
- is_running_(false),
- is_first_(false) {
- socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
- socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
-}
-
-int TransportProtocol::start() {
- // If the protocol is already running, return otherwise set as running
- if (is_running_) return -1;
-
- // Reset the protocol state machine
- reset();
-
- // Set it is the first time we schedule an interest
- is_first_ = true;
-
- // Schedule next interests
- scheduleNextInterests();
-
- is_first_ = false;
-
- // Set the protocol as running
- is_running_ = true;
-
- // Start Event loop
- portal_->runEventsLoop();
-
- // Not running anymore
- is_running_ = false;
-
- return 0;
-}
-
-void TransportProtocol::stop() {
- is_running_ = false;
- portal_->stopEventsLoop();
-}
-
-void TransportProtocol::resume() {
- if (is_running_) return;
-
- is_running_ = true;
-
- scheduleNextInterests();
-
- portal_->runEventsLoop();
-
- is_running_ = false;
-}
-
-void TransportProtocol::onContentReassembled(std::error_code ec) {
- interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
- socket_->getSocketOption(READ_CALLBACK, &on_payload);
-
- if (!on_payload) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before "
- "starting "
- "the content retrieval.");
- }
-
- if (!ec) {
- on_payload->readSuccess(stats_->getBytesRecv());
- } else {
- on_payload->readError(ec);
- }
-
- stop();
-
- on_payload->afterRead();
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h
deleted file mode 100644
index 4897da902..000000000
--- a/libtransport/src/hicn/transport/protocols/protocol.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <atomic>
-
-#include <hicn/transport/interfaces/socket.h>
-#include <hicn/transport/protocols/data_processing_events.h>
-#include <hicn/transport/protocols/indexer.h>
-#include <hicn/transport/protocols/packet_manager.h>
-#include <hicn/transport/protocols/reassembly.h>
-#include <hicn/transport/protocols/statistics.h>
-#include <hicn/transport/utils/object_pool.h>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace core;
-
-class IndexVerificationManager;
-
-class TransportProtocolCallback {
- virtual void onContentObject(const core::Interest &interest,
- const core::ContentObject &content_object) = 0;
- virtual void onTimeout(const core::Interest &interest) = 0;
-};
-
-class TransportProtocol : public interface::BasePortal::ConsumerCallback,
- public PacketManager<Interest>,
- public ContentObjectProcessingEventCallback {
- static constexpr std::size_t interest_pool_size = 4096;
-
- friend class ManifestIndexManager;
-
- public:
- TransportProtocol(interface::ConsumerSocket *icn_socket,
- Reassembly *reassembly_protocol);
-
- virtual ~TransportProtocol() = default;
-
- TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; }
-
- virtual int start();
-
- virtual void stop();
-
- virtual void resume();
-
- virtual bool verifyKeyPackets() = 0;
-
- virtual void scheduleNextInterests() = 0;
-
- // Events generated by the indexing
- virtual void onContentReassembled(std::error_code ec);
- virtual void onPacketDropped(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object) = 0;
- virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0;
-
- protected:
- // Consumer Callback
- virtual void reset() = 0;
- virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0;
- virtual void onTimeout(Interest::Ptr &&i) = 0;
-
- protected:
- interface::ConsumerSocket *socket_;
- std::unique_ptr<Reassembly> reassembly_protocol_;
- std::unique_ptr<IndexManager> index_manager_;
- std::shared_ptr<interface::BasePortal> portal_;
- std::atomic<bool> is_running_;
- // True if it si the first time we schedule an interest
- std::atomic<bool> is_first_;
- TransportStatistics *stats_;
-};
-
-} // end namespace protocol
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
deleted file mode 100644
index 984470edb..000000000
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ /dev/null
@@ -1,712 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/indexer.h>
-#include <hicn/transport/protocols/raaqm.h>
-#include <hicn/transport/protocols/errors.h>
-
-#include <cstdlib>
-#include <fstream>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket)
- : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)),
- current_window_size_(1),
- interests_in_flight_(0),
- cur_path_(nullptr),
- t0_(utils::SteadyClock::now()),
- rate_estimator_(nullptr) {
- init();
-}
-
-RaaqmTransportProtocol::~RaaqmTransportProtocol() {
- if (rate_estimator_) {
- delete rate_estimator_;
- }
-}
-
-int RaaqmTransportProtocol::start() {
- if (rate_estimator_) {
- rate_estimator_->onStart();
- }
-
- if (!cur_path_) {
- // RAAQM
- double drop_factor;
- double minimum_drop_probability;
- uint32_t sample_number;
- uint32_t interest_lifetime;
-
- socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor);
- socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY,
- minimum_drop_probability);
- socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER,
- sample_number);
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interest_lifetime);
-
- // Rate Estimation
- double alpha = 0.0;
- uint32_t batching_param = 0;
- uint32_t choice_param = 0;
- socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
- alpha);
- socket_->getSocketOption(
- RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
- socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
- choice_param);
-
- if (choice_param == 1) {
- rate_estimator_ = new ALaTcpEstimator();
- } else {
- rate_estimator_ = new SimpleEstimator(alpha, batching_param);
- }
-
- socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER,
- &rate_estimator_->observer_);
-
- // Current path
- auto cur_path = std::make_unique<RaaqmDataPath>(
- drop_factor, minimum_drop_probability, interest_lifetime * 1000,
- sample_number);
- cur_path_ = cur_path.get();
- path_table_[default_values::path_id] = std::move(cur_path);
- }
-
- portal_->setConsumerCallback(this);
- return TransportProtocol::start();
-}
-
-void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); }
-
-void RaaqmTransportProtocol::reset() {
- // Set first segment to retrieve
- core::Name *name;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
- index_manager_->reset();
- index_manager_->setFirstSuffix(name->getSuffix());
- std::queue<Interest::Ptr> empty;
- std::swap(interest_to_retransmit_, empty);
- stats_->reset();
-
- // Reset reassembly component
- reassembly_protocol_->reInitialize();
-
- // Reset protocol variables
- interests_in_flight_ = 0;
- t0_ = utils::SteadyClock::now();
-}
-
-bool RaaqmTransportProtocol::verifyKeyPackets() {
- return index_manager_->onKeyToVerify();
-}
-
-void RaaqmTransportProtocol::increaseWindow() {
- // return;
- double max_window_size = 0.;
- socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE,
- max_window_size);
- if (current_window_size_ < max_window_size) {
- double gamma = 0.;
- socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma);
-
- current_window_size_ += gamma / current_window_size_;
- socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
- current_window_size_);
- }
- rate_estimator_->onWindowIncrease(current_window_size_);
-}
-
-void RaaqmTransportProtocol::decreaseWindow() {
- // return;
- double min_window_size = 0.;
- socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE,
- min_window_size);
- if (current_window_size_ > min_window_size) {
- double beta = 0.;
- socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
-
- current_window_size_ = current_window_size_ * beta;
- if (current_window_size_ < min_window_size) {
- current_window_size_ = min_window_size;
- }
-
- socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE,
- current_window_size_);
- }
- rate_estimator_->onWindowDecrease(current_window_size_);
-}
-
-void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
- // Decrease the window because the timeout happened
- decreaseWindow();
-}
-
-void RaaqmTransportProtocol::afterContentReception(
- const Interest &interest, const ContentObject &content_object) {
- updatePathTable(content_object);
- increaseWindow();
- updateRtt(interest.getName().getSuffix());
- rate_estimator_->onDataReceived((int)content_object.payloadSize() +
- (int)content_object.headerSize());
- // Set drop probablility and window size accordingly
- RAAQM();
-}
-
-void RaaqmTransportProtocol::init() {
- std::ifstream is(RAAQM_CONFIG_PATH);
-
- std::string line;
- raaqm_autotune_ = false;
- default_beta_ = default_values::beta_value;
- default_drop_ = default_values::drop_factor;
- beta_wifi_ = default_values::beta_value;
- drop_wifi_ = default_values::drop_factor;
- beta_lte_ = default_values::beta_value;
- drop_lte_ = default_values::drop_factor;
- wifi_delay_ = 1000;
- lte_delay_ = 15000;
-
- if (!is) {
- TRANSPORT_LOGW(
- "WARNING: RAAQM parameters not found at %s, set default values",
- RAAQM_CONFIG_PATH);
- return;
- }
-
- while (getline(is, line)) {
- std::string command;
- std::istringstream line_s(line);
-
- line_s >> command;
-
- if (command == ";") {
- continue;
- }
-
- if (command == "autotune") {
- std::string tmp;
- std::string val;
- line_s >> tmp >> val;
- if (val == "yes") {
- raaqm_autotune_ = true;
- } else {
- raaqm_autotune_ = false;
- }
- continue;
- }
-
- if (command == "lifetime") {
- std::string tmp;
- uint32_t lifetime;
- line_s >> tmp >> lifetime;
- socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- lifetime);
- continue;
- }
-
- if (command == "retransmissions") {
- std::string tmp;
- uint32_t rtx;
- line_s >> tmp >> rtx;
- socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx);
- continue;
- }
-
- if (command == "beta") {
- std::string tmp;
- line_s >> tmp >> default_beta_;
- socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE,
- default_beta_);
- continue;
- }
-
- if (command == "drop") {
- std::string tmp;
- line_s >> tmp >> default_drop_;
- socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR,
- default_drop_);
- continue;
- }
-
- if (command == "beta_wifi_") {
- std::string tmp;
- line_s >> tmp >> beta_wifi_;
- continue;
- }
-
- if (command == "drop_wifi_") {
- std::string tmp;
- line_s >> tmp >> drop_wifi_;
- continue;
- }
-
- if (command == "beta_lte_") {
- std::string tmp;
- line_s >> tmp >> beta_lte_;
- continue;
- }
-
- if (command == "drop_lte_") {
- std::string tmp;
- line_s >> tmp >> drop_lte_;
- continue;
- }
-
- if (command == "wifi_delay_") {
- std::string tmp;
- line_s >> tmp >> wifi_delay_;
- continue;
- }
-
- if (command == "lte_delay_") {
- std::string tmp;
- line_s >> tmp >> lte_delay_;
- continue;
- }
- if (command == "alpha") {
- std::string tmp;
- double rate_alpha = 0.0;
- line_s >> tmp >> rate_alpha;
- socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA,
- rate_alpha);
- continue;
- }
-
- if (command == "batching_parameter") {
- std::string tmp;
- uint32_t batching_param = 0;
- line_s >> tmp >> batching_param;
- socket_->setSocketOption(
- RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER,
- batching_param);
- continue;
- }
-
- if (command == "rate_estimator") {
- std::string tmp;
- uint32_t choice_param = 0;
- line_s >> tmp >> choice_param;
- socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE,
- choice_param);
- continue;
- }
- }
-
- is.close();
-}
-
-void RaaqmTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- // Check whether makes sense to continue
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
-
- // Call application-defined callbacks
- ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_, *content_object);
- }
-
- ConsumerInterestCallback *callback_interest = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED,
- &callback_interest);
- if (*callback_interest) {
- (*callback_interest)(*socket_, *interest);
- }
-
- if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) {
- stats_->updateBytesRecv(content_object->payloadSize());
- }
-
- onContentSegment(std::move(interest), std::move(content_object));
- scheduleNextInterests();
-}
-
-void RaaqmTransportProtocol::onContentSegment(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t incremental_suffix = content_object->getName().getSuffix();
-
- // Decrease in-flight interests
- interests_in_flight_--;
-
- // Update stats
- if (!interest_retransmissions_[incremental_suffix & mask]) {
- afterContentReception(*interest, *content_object);
- }
-
- index_manager_->onContentObject(std::move(interest),
- std::move(content_object));
-}
-
-void RaaqmTransportProtocol::onPacketDropped(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- uint32_t max_rtx = 0;
- socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
-
- uint64_t segment = interest->getName().getSuffix();
- ConsumerInterestCallback *callback = VOID_HANDLER;
- if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
- max_rtx)) {
- stats_->updateRetxCount(1);
-
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_, *interest);
- }
-
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_, *interest);
- }
-
- if (!is_running_) {
- return;
- }
-
- interest_retransmissions_[segment & mask]++;
-
- interest_to_retransmit_.push(std::move(interest));
- } else {
- TRANSPORT_LOGE(
- "Stop: received not trusted packet %llu times",
- (unsigned long long)interest_retransmissions_[segment & mask]);
- onContentReassembled(
- make_error_code(protocol_error::max_retransmissions_error));
- }
-}
-
-void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) {
-
-}
-
-void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- checkForStalePaths();
-
- const Name &n = interest->getName();
-
- TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str());
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
- return;
- }
-
- interests_in_flight_--;
-
- uint64_t segment = n.getSuffix();
-
- // Do not retransmit interests asking contents that do not exist.
- if (segment > index_manager_->getFinalSuffix()) {
- return;
- }
-
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED,
- &callback);
- if (*callback) {
- (*callback)(*socket_, *interest);
- }
-
- afterDataUnsatisfied(segment);
-
- uint32_t max_rtx = 0;
- socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx);
-
- if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] <
- max_rtx)) {
- stats_->updateRetxCount(1);
-
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
- &callback);
- if (*callback) {
- (*callback)(*socket_, *interest);
- }
-
- callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- (*callback)(*socket_, *interest);
- }
-
- if (!is_running_) {
- return;
- }
-
- interest_retransmissions_[segment & mask]++;
-
- interest_to_retransmit_.push(std::move(interest));
-
- scheduleNextInterests();
- } else {
- TRANSPORT_LOGE("Stop: reached max retx limit.");
- onContentReassembled(std::make_error_code(std::errc(std::errc::io_error)));
- }
-}
-
-void RaaqmTransportProtocol::scheduleNextInterests() {
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
- }
-
- if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ &&
- interest_to_retransmit_.size() > 0)) {
- // send at least one interest if there are retransmissions to perform and
- // there is no space left in the window
- sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Window full, retransmit one content interest");
- interest_to_retransmit_.pop();
- }
-
- uint32_t index = IndexManager::invalid_index;
-
- // Send the interest needed for filling the window
- while (interests_in_flight_ < current_window_size_) {
- if (interest_to_retransmit_.size() > 0) {
- sendInterest(std::move(interest_to_retransmit_.front()));
- TRANSPORT_LOGD("Retransmit content interest");
- interest_to_retransmit_.pop();
- } else {
- index = index_manager_->getNextSuffix();
- if (index == IndexManager::invalid_index) {
- TRANSPORT_LOGE("INVALID INDEX %d", index);
- break;
- }
-
- sendInterest(index);
- TRANSPORT_LOGD("Send content interest %u", index);
- }
- }
-}
-
-void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) {
- auto interest = getPacket();
- core::Name *name;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name);
- name->setSuffix((uint32_t)next_suffix);
- interest->setName(*name);
-
- uint32_t interest_lifetime;
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interest_lifetime);
- interest->setLifetime(interest_lifetime);
-
- ConsumerInterestCallback *callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &callback);
- if (*callback) {
- callback->operator()(*socket_, *interest);
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
- }
-
- // This is set to ~0 so that the next interest_retransmissions_ + 1,
- // performed by sendInterest, will result in 0
- interest_retransmissions_[next_suffix & mask] = ~0;
- interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now();
- sendInterest(std::move(interest));
-}
-
-void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) {
- interests_in_flight_++;
- interest_retransmissions_[interest->getName().getSuffix() & mask]++;
-
- portal_->sendInterest(std::move(interest));
-}
-
-void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
- rate_estimator_->onDownloadFinished();
- TransportProtocol::onContentReassembled(ec);
-}
-
-void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
- if (TRANSPORT_EXPECT_FALSE(!cur_path_)) {
- throw std::runtime_error("RAAQM ERROR: no current path found, exit");
- } else {
- auto now = utils::SteadyClock::now();
- utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>(
- now - interest_timepoints_[segment & mask]);
-
- // Update stats
- updateStats((uint32_t)segment, rtt.count(), now);
-
- if (rate_estimator_) {
- rate_estimator_->onRttUpdate((double)rtt.count());
- }
-
- cur_path_->insertNewRtt(rtt.count());
- cur_path_->smoothTimer();
-
- if (cur_path_->newPropagationDelayAvailable()) {
- checkDropProbability();
- }
- }
-}
-
-void RaaqmTransportProtocol::RAAQM() {
- if (!cur_path_) {
- throw errors::RuntimeException("ERROR: no current path found, exit");
- exit(EXIT_FAILURE);
- } else {
- // Change drop probability according to RTT statistics
- cur_path_->updateDropProb();
-
- double coin = ((double)rand() / (RAND_MAX));
- if (coin <= cur_path_->getDropProb()) {
- decreaseWindow();
- }
- }
-}
-
-void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt,
- utils::TimePoint &now) {
- // Update RTT statistics
- stats_->updateAverageRtt(rtt);
- stats_->updateAverageWindowSize(current_window_size_);
-
- // Call statistics callback
- ConsumerTimerCallback *stats_callback = VOID_HANDLER;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
- auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_);
-
- uint32_t timer_interval_milliseconds = 0;
- socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL,
- timer_interval_milliseconds);
- if (dt.count() > timer_interval_milliseconds) {
- (*stats_callback)(*socket_, *stats_);
- t0_ = utils::SteadyClock::now();
- }
- }
-}
-
-void RaaqmTransportProtocol::updatePathTable(
- const ContentObject &content_object) {
- uint32_t path_id = content_object.getPathLabel();
-
- if (path_table_.find(path_id) == path_table_.end()) {
- if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) {
- // Create a new path with some default param
-
- if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) {
- throw errors::RuntimeException(
- "[RAAQM] No path initialized for path table, error could be in "
- "default path initialization.");
- }
-
- // Initiate the new path default param
- auto new_path = std::make_unique<RaaqmDataPath>(
- *(path_table_.at(default_values::path_id)));
-
- // Insert the new path into hash table
- path_table_[path_id] = std::move(new_path);
- } else {
- throw errors::RuntimeException(
- "UNEXPECTED ERROR: when running,current path not found.");
- }
- }
-
- cur_path_ = path_table_[path_id].get();
-
- size_t header_size = content_object.headerSize();
- size_t data_size = content_object.payloadSize();
-
- // Update measurements for path
- cur_path_->updateReceivedStats(header_size + data_size, data_size);
-}
-
-void RaaqmTransportProtocol::checkDropProbability() {
- if (!raaqm_autotune_) {
- return;
- }
-
- unsigned int max_pd = 0;
- PathTable::iterator it;
- for (auto it = path_table_.begin(); it != path_table_.end(); ++it) {
- if (it->second->getPropagationDelay() > max_pd &&
- it->second->getPropagationDelay() != UINT_MAX &&
- !it->second->isStale()) {
- max_pd = it->second->getPropagationDelay();
- }
- }
-
- double drop_prob = 0;
- double beta = 0;
- if (max_pd < wifi_delay_) { // only ethernet paths
- drop_prob = default_drop_;
- beta = default_beta_;
- } else if (max_pd < lte_delay_) { // at least one wifi path
- drop_prob = drop_wifi_;
- beta = beta_wifi_;
- } else { // at least one lte path
- drop_prob = drop_lte_;
- beta = beta_lte_;
- }
-
- double old_drop_prob = 0;
- double old_beta = 0;
- socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta);
- socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob);
-
- if (drop_prob == old_drop_prob && beta == old_beta) {
- return;
- }
-
- socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta);
- socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob);
-
- for (it = path_table_.begin(); it != path_table_.end(); it++) {
- it->second->setDropProb(drop_prob);
- }
-}
-
-void RaaqmTransportProtocol::checkForStalePaths() {
- if (!raaqm_autotune_) {
- return;
- }
-
- bool stale = false;
- PathTable::iterator it;
- for (it = path_table_.begin(); it != path_table_.end(); ++it) {
- if (it->second->isStale()) {
- stale = true;
- break;
- }
- }
- if (stale) {
- checkDropProbability();
- }
-}
-
-} // end namespace protocol
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h
deleted file mode 100644
index f2d819ec5..000000000
--- a/libtransport/src/hicn/transport/protocols/raaqm.h
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/byte_stream_reassembly.h>
-#include <hicn/transport/protocols/congestion_window_protocol.h>
-#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/protocols/raaqm_data_path.h>
-#include <hicn/transport/protocols/rate_estimation.h>
-#include <hicn/transport/utils/chrono_typedefs.h>
-
-#include <queue>
-#include <vector>
-
-namespace transport {
-
-namespace protocol {
-
-class RaaqmTransportProtocol : public TransportProtocol,
- public CWindowProtocol {
- public:
- RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket);
-
- ~RaaqmTransportProtocol();
-
- int start() override;
-
- void resume() override;
-
- void reset() override;
-
- virtual bool verifyKeyPackets() override;
-
- protected:
- static constexpr uint32_t buffer_size =
- 1 << interface::default_values::log_2_default_buffer_size;
- static constexpr uint16_t mask = buffer_size - 1;
- using PathTable =
- std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>;
-
- void increaseWindow() override;
- void decreaseWindow() override;
-
- virtual void afterContentReception(const Interest &interest,
- const ContentObject &content_object);
- virtual void afterDataUnsatisfied(uint64_t segment);
-
- virtual void updateStats(uint32_t suffix, uint64_t rtt,
- utils::TimePoint &now);
-
- private:
- void init();
-
- void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override;
-
- void onContentSegment(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object);
-
- void onPacketDropped(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object) override;
-
- void onReassemblyFailed(std::uint32_t missing_segment) override;
-
- void onTimeout(Interest::Ptr &&i) override;
-
- virtual void scheduleNextInterests() override;
-
- void sendInterest(std::uint64_t next_suffix);
-
- void sendInterest(Interest::Ptr &&interest);
-
- void onContentReassembled(std::error_code ec) override;
-
- void updateRtt(uint64_t segment);
-
- void RAAQM();
-
- void updatePathTable(const ContentObject &content_object);
-
- void checkDropProbability();
-
- void checkForStalePaths();
-
- void printRtt();
-
- protected:
- // Congestion window management
- double current_window_size_;
- // Protocol management
- uint64_t interests_in_flight_;
- std::array<std::uint32_t, buffer_size> interest_retransmissions_;
- std::array<utils::TimePoint, buffer_size> interest_timepoints_;
- std::queue<Interest::Ptr> interest_to_retransmit_;
-
- private:
- /**
- * Current download path
- */
- RaaqmDataPath *cur_path_;
-
- /**
- * Hash table for path: each entry is a pair path ID(key) - path object
- */
- PathTable path_table_;
-
- // TimePoints for statistic
- utils::TimePoint t0_;
-
- bool set_interest_filter_;
-
- // for rate-estimation at packet level
- IcnRateEstimator *rate_estimator_;
-
- // params for autotuning
- bool raaqm_autotune_;
- double default_beta_;
- double default_drop_;
- double beta_wifi_;
- double drop_wifi_;
- double beta_lte_;
- double drop_lte_;
- unsigned int wifi_delay_;
- unsigned int lte_delay_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
deleted file mode 100644
index e25646205..000000000
--- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/raaqm_data_path.h>
-#include <hicn/transport/utils/chrono_typedefs.h>
-
-namespace transport {
-
-namespace protocol {
-
-RaaqmDataPath::RaaqmDataPath(double drop_factor,
- double minimum_drop_probability,
- unsigned new_timer, unsigned int samples,
- uint64_t new_rtt, uint64_t new_rtt_min,
- uint64_t new_rtt_max, unsigned new_pd)
-
- : drop_factor_(drop_factor),
- minimum_drop_probability_(minimum_drop_probability),
- timer_(new_timer),
- samples_(samples),
- rtt_(new_rtt),
- rtt_min_(new_rtt_min),
- rtt_max_(new_rtt_max),
- prop_delay_(new_pd),
- new_prop_delay_(false),
- drop_prob_(0),
- packets_received_(0),
- last_packets_received_(0),
- m_packets_bytes_received_(0),
- last_packets_bytes_received_(0),
- raw_data_bytes_received_(0),
- last_raw_data_bytes_received_(0),
- rtt_samples_(samples_),
- last_received_pkt_(utils::SteadyClock::now()),
- average_rtt_(0),
- alpha_(ALPHA) {}
-
-RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
- rtt_ = new_rtt;
- rtt_samples_.pushBack(new_rtt);
-
- rtt_max_ = rtt_samples_.rBegin();
- rtt_min_ = rtt_samples_.begin();
-
- if (rtt_min_ < prop_delay_) {
- new_prop_delay_ = true;
- prop_delay_ = rtt_min_;
- }
-
- last_received_pkt_ = utils::SteadyClock::now();
-
- return *this;
-}
-
-RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size,
- std::size_t data_size) {
- packets_received_++;
- m_packets_bytes_received_ += packet_size;
- raw_data_bytes_received_ += data_size;
-
- return *this;
-}
-
-double RaaqmDataPath::getDropFactor() { return drop_factor_; }
-
-double RaaqmDataPath::getDropProb() { return drop_prob_; }
-
-RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) {
- drop_prob_ = dropProb;
-
- return *this;
-}
-
-double RaaqmDataPath::getMinimumDropProbability() {
- return minimum_drop_probability_;
-}
-
-double RaaqmDataPath::getTimer() { return timer_; }
-
-RaaqmDataPath &RaaqmDataPath::smoothTimer() {
- timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ +
- (TIMEOUT_SMOOTHER)*rtt_ * (TIMEOUT_RATIO);
-
- return *this;
-}
-
-double RaaqmDataPath::getRtt() { return (double)rtt_; }
-
-double RaaqmDataPath::getAverageRtt() { return average_rtt_; }
-
-double RaaqmDataPath::getRttMax() { return (double)rtt_max_; }
-
-double RaaqmDataPath::getRttMin() { return (double)rtt_min_; }
-
-unsigned RaaqmDataPath::getSampleValue() { return samples_; }
-
-unsigned RaaqmDataPath::getRttQueueSize() {
- return static_cast<unsigned>(rtt_samples_.size());
-}
-
-RaaqmDataPath &RaaqmDataPath::updateDropProb() {
- drop_prob_ = 0.0;
-
- if (getSampleValue() == getRttQueueSize()) {
- if (rtt_max_ == rtt_min_) {
- drop_prob_ = minimum_drop_probability_;
- } else {
- drop_prob_ = minimum_drop_probability_ +
- drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_);
- }
- }
-
- return *this;
-}
-
-void RaaqmDataPath::setAlpha(double alpha) {
- if (alpha >= 0 && alpha <= 1) {
- alpha_ = alpha;
- }
-}
-
-bool RaaqmDataPath::newPropagationDelayAvailable() {
- bool r = new_prop_delay_;
- new_prop_delay_ = false;
- return r;
-}
-
-unsigned int RaaqmDataPath::getPropagationDelay() {
- return (unsigned int)prop_delay_;
-}
-
-bool RaaqmDataPath::isStale() {
- utils::TimePoint now = utils::SteadyClock::now();
- auto time =
- std::chrono::duration_cast<utils::Microseconds>(now - last_received_pkt_)
- .count();
- if (time > 2000000) {
- return true;
- }
- return false;
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
deleted file mode 100644
index 9e4accfa5..000000000
--- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/utils/chrono_typedefs.h>
-#include <hicn/transport/utils/min_filter.h>
-
-#include <chrono>
-#include <climits>
-#include <iostream>
-
-#define TIMEOUT_SMOOTHER 0.1
-#define TIMEOUT_RATIO 10
-#define ALPHA 0.8
-
-namespace transport {
-
-namespace protocol {
-
-class RaaqmDataPath {
- public:
- RaaqmDataPath(double drop_factor, double minimum_drop_probability,
- unsigned new_timer, unsigned int samples,
- uint64_t new_rtt = 1000, uint64_t new_rtt_min = 1000,
- uint64_t new_rtt_max = 1000, unsigned new_pd = UINT_MAX);
-
- public:
- /*
- * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is
- * full, and thus need overwrite. Also it maintains the validity of min and
- * max of RTT.
- * @param new_rtt is the value of the new RTT
- */
- RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
-
- /**
- * @brief Update the path statistics
- * @param packet_size the size of the packet received, including the ICN
- * header
- * @param data_size the size of the data received, without the ICN header
- */
- RaaqmDataPath &updateReceivedStats(std::size_t packet_size,
- std::size_t data_size);
-
- /**
- * @brief Get the value of the drop factor parameter
- */
- double getDropFactor();
-
- /**
- * @brief Get the value of the drop probability
- */
- double getDropProb();
-
- /**
- * @brief Set the value pf the drop probability
- * @param drop_prob is the value of the drop probability
- */
- RaaqmDataPath &setDropProb(double drop_prob);
-
- /**
- * @brief Get the minimum drop probability
- */
- double getMinimumDropProbability();
-
- /**
- * @brief Get last RTT
- */
- double getRtt();
-
- /**
- * @brief Get average RTT
- */
- double getAverageRtt();
-
- /**
- * @brief Get the current m_timer value
- */
- double getTimer();
-
- /**
- * @brief Smooth he value of the m_timer accordingly with the last RTT
- * measured
- */
- RaaqmDataPath &smoothTimer();
-
- /**
- * @brief Get the maximum RTT among the last samples
- */
- double getRttMax();
-
- /**
- * @brief Get the minimum RTT among the last samples
- */
- double getRttMin();
-
- /**
- * @brief Get the number of saved samples
- */
- unsigned getSampleValue();
-
- /**
- * @brief Get the size og the RTT queue
- */
- unsigned getRttQueueSize();
-
- /*
- * @brief Change drop probability according to RTT statistics
- * Invoked in RAAQM(), before control window size update.
- */
- RaaqmDataPath &updateDropProb();
-
- void setAlpha(double alpha);
-
- /**
- * @brief Returns the smallest RTT registered so far for this path
- */
-
- unsigned int getPropagationDelay();
-
- bool newPropagationDelayAvailable();
-
- bool isStale();
-
- private:
- /**
- * The value of the drop factor
- */
- double drop_factor_;
-
- /**
- * The minumum drop probability
- */
- double minimum_drop_probability_;
-
- /**
- * The timer, expressed in milliseconds
- */
- double timer_;
-
- /**
- * The number of samples to store for computing the protocol measurements
- */
- const unsigned int samples_;
-
- /**
- * The last, the minimum and the maximum value of the RTT (among the last
- * m_samples samples)
- */
- uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_;
-
- bool new_prop_delay_;
-
- /**
- * The current drop probability
- */
- double drop_prob_;
-
- /**
- * The number of packets received in this path
- */
- intmax_t packets_received_;
-
- /**
- * The first packet received after the statistics print
- */
- intmax_t last_packets_received_;
-
- /**
- * Total number of bytes received including the ICN header
- */
- intmax_t m_packets_bytes_received_;
-
- /**
- * The amount of packet bytes received at the last path summary computation
- */
- intmax_t last_packets_bytes_received_;
-
- /**
- * Total number of bytes received without including the ICN header
- */
- intmax_t raw_data_bytes_received_;
-
- /**
- * The amount of raw dat bytes received at the last path summary computation
- */
- intmax_t last_raw_data_bytes_received_;
-
- class byArrival;
-
- class byOrder;
-
- /**
- * Double ended queue for the RTTs
- */
-
- typedef utils::MinFilter<uint64_t> RTTQueue;
-
- RTTQueue rtt_samples_;
-
- /**
- * Time of the last call to the path reporter method
- */
- utils::TimePoint last_received_pkt_;
-
- double average_rtt_;
- double alpha_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc
deleted file mode 100644
index 50306e6e5..000000000
--- a/libtransport/src/hicn/transport/protocols/rate_estimation.cc
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_options_default_values.h>
-#include <hicn/transport/protocols/rate_estimation.h>
-#include <hicn/transport/utils/log.h>
-
-#include <thread>
-
-namespace transport {
-
-namespace protocol {
-
-void *Timer(void *data) {
- InterRttEstimator *estimator = (InterRttEstimator *)data;
-
- double dat_rtt, my_avg_win, my_avg_rtt;
- int my_win_change, number_of_packets, max_packet_size;
-
- pthread_mutex_lock(&(estimator->mutex_));
- dat_rtt = estimator->rtt_;
- pthread_mutex_unlock(&(estimator->mutex_));
-
- while (estimator->is_running_) {
- std::this_thread::sleep_for(std::chrono::microseconds(
- (uint64_t)(interface::default_values::kv * dat_rtt)));
-
- pthread_mutex_lock(&(estimator->mutex_));
-
- dat_rtt = estimator->rtt_;
- my_avg_win = estimator->avg_win_;
- my_avg_rtt = estimator->avg_rtt_;
- my_win_change = (int)(estimator->win_change_);
- number_of_packets = estimator->number_of_packets_;
- max_packet_size = estimator->max_packet_size_;
- estimator->avg_rtt_ = estimator->rtt_;
- estimator->avg_win_ = 0;
- estimator->win_change_ = 0;
- estimator->number_of_packets_ = 1;
-
- pthread_mutex_unlock(&(estimator->mutex_));
-
- if (number_of_packets == 0 || my_win_change == 0) {
- continue;
- }
- if (estimator->estimation_ == 0) {
- estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 /
- (1.0 * my_win_change)) /
- (my_avg_rtt / (1.0 * number_of_packets));
- }
-
- estimator->estimation_ =
- estimator->alpha_ * estimator->estimation_ +
- (1 - estimator->alpha_) * ((my_avg_win * 8.0 * max_packet_size *
- 1000000.0 / (1.0 * my_win_change)) /
- (my_avg_rtt / (1.0 * number_of_packets)));
-
- if (estimator->observer_) {
- estimator->observer_->notifyStats(estimator->estimation_);
- }
- }
-
- return nullptr;
-}
-
-InterRttEstimator::InterRttEstimator(double alpha_arg) {
- this->estimated_ = false;
- this->observer_ = NULL;
- this->alpha_ = alpha_arg;
- this->thread_is_running_ = false;
- this->my_th_ = NULL;
- this->is_running_ = true;
- this->avg_rtt_ = 0.0;
- this->estimation_ = 0.0;
- this->avg_win_ = 0.0;
- this->rtt_ = 0.0;
- this->win_change_ = 0;
- this->number_of_packets_ = 0;
- this->max_packet_size_ = 0;
- this->win_current_ = 1.0;
-
- pthread_mutex_init(&(this->mutex_), NULL);
- this->start_time_ = std::chrono::steady_clock::now();
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-InterRttEstimator::~InterRttEstimator() {
- this->is_running_ = false;
- if (this->my_th_) {
- pthread_join(*(this->my_th_), NULL);
- }
- this->my_th_ = NULL;
- pthread_mutex_destroy(&(this->mutex_));
-}
-
-void InterRttEstimator::onRttUpdate(double rtt) {
- pthread_mutex_lock(&(this->mutex_));
- this->rtt_ = rtt;
- this->number_of_packets_++;
- this->avg_rtt_ += rtt;
- pthread_mutex_unlock(&(this->mutex_));
-
- if (!thread_is_running_) {
- my_th_ = (pthread_t *)malloc(sizeof(pthread_t));
- if (!my_th_) {
- TRANSPORT_LOGE("Error allocating thread.");
- my_th_ = NULL;
- }
- if (/*int err = */ pthread_create(my_th_, NULL, transport::protocol::Timer,
- (void *)this)) {
- TRANSPORT_LOGE("Error creating the thread");
- my_th_ = NULL;
- }
- thread_is_running_ = true;
- }
-}
-
-void InterRttEstimator::onWindowIncrease(double win_current) {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
- .count();
-
- pthread_mutex_lock(&(this->mutex_));
- this->avg_win_ += this->win_current_ * delay;
- this->win_current_ = win_current;
- this->win_change_ += delay;
- pthread_mutex_unlock(&(this->mutex_));
-
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-void InterRttEstimator::onWindowDecrease(double win_current) {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
- .count();
-
- pthread_mutex_lock(&(this->mutex_));
- this->avg_win_ += this->win_current_ * delay;
- this->win_current_ = win_current;
- this->win_change_ += delay;
- pthread_mutex_unlock(&(this->mutex_));
-
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-ALaTcpEstimator::ALaTcpEstimator() {
- this->estimation_ = 0.0;
- this->observer_ = NULL;
- this->start_time_ = std::chrono::steady_clock::now();
- this->totalSize_ = 0.0;
-}
-
-void ALaTcpEstimator::onStart() {
- this->totalSize_ = 0.0;
- this->start_time_ = std::chrono::steady_clock::now();
-}
-
-void ALaTcpEstimator::onDownloadFinished() {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->start_time_).count();
- this->estimation_ = this->totalSize_ * 8 * 1000000 / delay;
- if (observer_) {
- observer_->notifyStats(this->estimation_);
- }
-}
-
-void ALaTcpEstimator::onDataReceived(int packet_size) {
- this->totalSize_ += packet_size;
-}
-
-SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) {
- this->estimation_ = 0.0;
- this->estimated_ = false;
- this->observer_ = nullptr;
- this->batching_param_ = batching_param;
- this->total_size_ = 0.0;
- this->number_of_packets_ = 0;
- this->base_alpha_ = alphaArg;
- this->alpha_ = alphaArg;
- this->start_time_ = std::chrono::steady_clock::now();
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-void SimpleEstimator::onStart() {
- this->estimated_ = false;
- this->number_of_packets_ = 0;
- this->total_size_ = 0.0;
- this->start_time_ = std::chrono::steady_clock::now();
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-void SimpleEstimator::onDownloadFinished() {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->start_time_).count();
- if (observer_) {
- observer_->notifyDownloadTime((double)delay);
- }
- if (!this->estimated_) {
- // Assuming all packets carry max_packet_size_ bytes of data
- // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
- if (this->estimation_) {
- this->estimation_ =
- alpha_ * this->estimation_ +
- (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
- } else {
- this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
- }
- if (observer_) {
- observer_->notifyStats(this->estimation_);
- }
- this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) /
- ((double)this->batching_param_));
- } else {
- if (this->number_of_packets_ >=
- (int)(75.0 * (double)this->batching_param_ / 100.0)) {
- delay = std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
- .count();
- // Assuming all packets carry max_packet_size_ bytes of data
- // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
- if (this->estimation_) {
- this->estimation_ =
- alpha_ * this->estimation_ +
- (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
- } else {
- this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
- }
- if (observer_) {
- observer_->notifyStats(this->estimation_);
- }
- this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) /
- ((double)this->batching_param_));
- }
- }
- this->number_of_packets_ = 0;
- this->total_size_ = 0.0;
- this->start_time_ = std::chrono::steady_clock::now();
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-void SimpleEstimator::onDataReceived(int packet_size) {
- this->total_size_ += packet_size;
-}
-
-void SimpleEstimator::onRttUpdate(double rtt) {
- this->number_of_packets_++;
-
- if (this->number_of_packets_ == this->batching_param_) {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
- .count();
- // Assuming all packets carry max_packet_size_ bytes of data
- // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
- if (this->estimation_) {
- this->estimation_ =
- alpha_ * this->estimation_ +
- (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
- } else {
- this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
- }
- if (observer_) {
- observer_->notifyStats(this->estimation_);
- }
- this->alpha_ = this->base_alpha_;
- this->number_of_packets_ = 0;
- this->total_size_ = 0.0;
- this->begin_batch_ = std::chrono::steady_clock::now();
- }
-}
-
-BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg,
- int param) {
- this->estimated_ = false;
- this->observer_ = NULL;
- this->alpha_ = alpha_arg;
- this->batching_param_ = param;
- this->number_of_packets_ = 0;
- this->avg_win_ = 0.0;
- this->avg_rtt_ = 0.0;
- this->win_change_ = 0.0;
- this->max_packet_size_ = 0;
- this->estimation_ = 0.0;
- this->win_current_ = 1.0;
- this->begin_batch_ = std::chrono::steady_clock::now();
- this->start_time_ = std::chrono::steady_clock::now();
-}
-
-void BatchingPacketsEstimator::onRttUpdate(double rtt) {
- this->number_of_packets_++;
- this->avg_rtt_ += rtt;
-
- if (number_of_packets_ == this->batching_param_) {
- if (estimation_ == 0) {
- estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 /
- (1.0 * win_change_)) /
- (avg_rtt_ / (1.0 * number_of_packets_));
- } else {
- estimation_ = alpha_ * estimation_ +
- (1 - alpha_) * ((avg_win_ * 8.0 * max_packet_size_ *
- 1000000.0 / (1.0 * win_change_)) /
- (avg_rtt_ / (1.0 * number_of_packets_)));
- }
-
- if (observer_) {
- observer_->notifyStats(estimation_);
- }
-
- this->number_of_packets_ = 0;
- this->avg_win_ = 0.0;
- this->avg_rtt_ = 0.0;
- this->win_change_ = 0.0;
- }
-}
-
-void BatchingPacketsEstimator::onWindowIncrease(double win_current) {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
- .count();
- this->avg_win_ += this->win_current_ * delay;
- this->win_current_ = win_current;
- this->win_change_ += delay;
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-void BatchingPacketsEstimator::onWindowDecrease(double win_current) {
- TimePoint end = std::chrono::steady_clock::now();
- auto delay =
- std::chrono::duration_cast<Microseconds>(end - this->begin_batch_)
- .count();
- this->avg_win_ += this->win_current_ * delay;
- this->win_current_ = win_current;
- this->win_change_ += delay;
- this->begin_batch_ = std::chrono::steady_clock::now();
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.h b/libtransport/src/hicn/transport/protocols/rate_estimation.h
deleted file mode 100644
index 616501b24..000000000
--- a/libtransport/src/hicn/transport/protocols/rate_estimation.h
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/protocols/download_observer.h>
-#include <hicn/transport/protocols/raaqm_data_path.h>
-
-#include <chrono>
-
-namespace transport {
-
-namespace protocol {
-
-class IcnRateEstimator {
- public:
- using TimePoint = std::chrono::steady_clock::time_point;
- using Microseconds = std::chrono::microseconds;
-
- IcnRateEstimator(){};
-
- virtual ~IcnRateEstimator(){};
-
- virtual void onRttUpdate(double rtt){};
-
- virtual void onDataReceived(int packetSize){};
-
- virtual void onWindowIncrease(double winCurrent){};
-
- virtual void onWindowDecrease(double winCurrent){};
-
- virtual void onStart(){};
-
- virtual void onDownloadFinished(){};
-
- virtual void setObserver(IcnObserver *observer) {
- this->observer_ = observer;
- };
- IcnObserver *observer_;
- TimePoint start_time_;
- TimePoint begin_batch_;
- double base_alpha_;
- double alpha_;
- double estimation_;
- int number_of_packets_;
- // this boolean is to make sure at least one estimation of the BW is done
- bool estimated_;
-};
-
-// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT)
-
-class InterRttEstimator : public IcnRateEstimator {
- public:
- InterRttEstimator(double alpha_arg);
-
- ~InterRttEstimator();
-
- void onRttUpdate(double rtt);
-
- void onDataReceived(int packet_size) {
- if (packet_size > this->max_packet_size_) {
- this->max_packet_size_ = packet_size;
- }
- };
-
- void onWindowIncrease(double win_current);
-
- void onWindowDecrease(double win_current);
-
- void onStart(){};
-
- void onDownloadFinished(){};
-
- // private: should be done by using getters
- pthread_t *my_th_;
- bool thread_is_running_;
- double rtt_;
- bool is_running_;
- pthread_mutex_t mutex_;
- double avg_rtt_;
- double avg_win_;
- int max_packet_size_;
- double win_change_;
- double win_current_;
-};
-
-// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT)
-
-class BatchingPacketsEstimator : public IcnRateEstimator {
- public:
- BatchingPacketsEstimator(double alpha_arg, int batchingParam);
-
- void onRttUpdate(double rtt);
-
- void onDataReceived(int packet_size) {
- if (packet_size > this->max_packet_size_) {
- this->max_packet_size_ = packet_size;
- }
- };
-
- void onWindowIncrease(double win_current);
-
- void onWindowDecrease(double win_current);
-
- void onStart(){};
-
- void onDownloadFinished(){};
-
- private:
- int batching_param_;
- double avg_rtt_;
- double avg_win_;
- double win_change_;
- int max_packet_size_;
- double win_current_;
-};
-
-// Segment Estimator
-
-class ALaTcpEstimator : public IcnRateEstimator {
- public:
- ALaTcpEstimator();
-
- void onDataReceived(int packet_size);
- void onStart();
- void onDownloadFinished();
-
- private:
- double totalSize_;
-};
-
-// A Rate estimator, this one is the simplest: counting batching_param_ packets
-// and then divide the sum of the size of these packets by the time taken to DL
-// them. Should be the one used
-
-class SimpleEstimator : public IcnRateEstimator {
- public:
- SimpleEstimator(double alpha, int batching_param);
-
- void onRttUpdate(double rtt);
-
- void onDataReceived(int packet_size);
-
- void onWindowIncrease(double win_current){};
-
- void onWindowDecrease(double win_current){};
-
- void onStart();
-
- void onDownloadFinished();
-
- private:
- int batching_param_;
- double total_size_;
-};
-
-void *Timer(void *data);
-
-} // namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc
deleted file mode 100644
index 9682d338d..000000000
--- a/libtransport/src/hicn/transport/protocols/reassembly.cc
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/errors.h>
-#include <hicn/transport/protocols/indexer.h>
-#include <hicn/transport/protocols/reassembly.h>
-#include <hicn/transport/utils/array.h>
-#include <hicn/transport/utils/membuf.h>
-
-namespace transport {
-
-namespace protocol {
-
-void Reassembly::notifyApplication() {
- interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
- reassembly_consumer_socket_->getSocketOption(
- interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback);
-
- if (TRANSPORT_EXPECT_FALSE(!read_callback)) {
- TRANSPORT_LOGE("Read callback not installed!");
- return;
- }
-
- if (read_callback->isBufferMovable()) {
- // No need to perform an additional copy. The whole buffer will be
- // tranferred to the application.
-
- read_callback->readBufferAvailable(std::move(read_buffer_));
- read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize());
- } else {
- // The buffer will be copied into the application-provided buffer
- uint8_t *buffer;
- std::size_t length;
- std::size_t total_length = read_buffer_->length();
-
- while (read_buffer_->length()) {
- buffer = nullptr;
- length = 0;
- read_callback->getReadBuffer(&buffer, &length);
-
- if (!buffer || !length) {
- throw errors::RuntimeException(
- "Invalid buffer provided by the application.");
- }
-
- auto to_copy = std::min(read_buffer_->length(), length);
- std::memcpy(buffer, read_buffer_->data(), to_copy);
- read_buffer_->trimStart(to_copy);
- }
-
- read_callback->readDataAvailable(total_length);
- read_buffer_->clear();
- }
-}
-
-} // namespace protocol
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h
deleted file mode 100644
index 34af2a70a..000000000
--- a/libtransport/src/hicn/transport/protocols/reassembly.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/core/facade.h>
-
-namespace transport {
-
-namespace interface {
-class ConsumerReadCallback;
-class ConsumerSocket;
-} // namespace interface
-
-namespace protocol {
-
-class TransportProtocol;
-class Indexer;
-
-// Forward Declaration
-class ManifestManager;
-
-class Reassembly {
- public:
- class ContentReassembledCallback {
- public:
- virtual void onContentReassembled(std::error_code ec) = 0;
- };
-
- Reassembly(interface::ConsumerSocket *icn_socket,
- TransportProtocol *transport_protocol)
- : reassembly_consumer_socket_(icn_socket),
- transport_protocol_(transport_protocol) {}
-
- virtual ~Reassembly() = default;
-
- virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0;
- virtual void reassemble(
- std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0;
- virtual void reInitialize() = 0;
- virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; }
-
- protected:
- virtual void notifyApplication();
-
- protected:
- interface::ConsumerSocket *reassembly_consumer_socket_;
- TransportProtocol *transport_protocol_;
- Indexer *index_manager_;
- std::unique_ptr<utils::MemBuf> read_buffer_;
-};
-
-} // namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
deleted file mode 100644
index fece95d03..000000000
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ /dev/null
@@ -1,1016 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/rtc.h>
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-
-#include <math.h>
-#include <random>
-
-namespace transport {
-
-namespace protocol {
-
-using namespace interface;
-
-RTCTransportProtocol::RTCTransportProtocol(
- interface::ConsumerSocket *icn_socket)
- : TransportProtocol(icn_socket, nullptr),
- DatagramReassembly(icn_socket, this),
- inflightInterests_(1 << default_values::log_2_default_buffer_size),
- modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
- icn_socket->getSocketOption(PORTAL, portal_);
- rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- sentinel_timer_ =
- std::make_unique<asio::steady_timer>(portal_->getIoService());
- round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- reset();
-}
-
-RTCTransportProtocol::~RTCTransportProtocol() {
- if (is_running_) {
- stop();
- }
-}
-
-int RTCTransportProtocol::start() {
- if (is_running_) return -1;
-
- reset();
- is_first_ = true;
-
- probeRtt();
- sentinelTimer();
- newRound();
- scheduleNextInterests();
-
- is_first_ = false;
- is_running_ = true;
- portal_->runEventsLoop();
- is_running_ = false;
-
- return 0;
-}
-
-void RTCTransportProtocol::stop() {
- if (!is_running_) return;
-
- is_running_ = false;
- portal_->stopEventsLoop();
-}
-
-void RTCTransportProtocol::resume() {
- if (is_running_) return;
-
- is_running_ = true;
- inflightInterestsCount_ = 0;
-
- probeRtt();
- sentinelTimer();
- newRound();
- scheduleNextInterests();
-
- portal_->runEventsLoop();
- is_running_ = false;
-}
-
-// private
-void RTCTransportProtocol::reset() {
- portal_->setConsumerCallback(this);
- // controller var
- currentState_ = HICN_RTC_SYNC_STATE;
-
- // cwin var
- currentCWin_ = HICN_INITIAL_CWIN;
- maxCWin_ = HICN_INITIAL_CWIN_MAX;
-
- // names/packets var
- actualSegment_ = 0;
- inflightInterestsCount_ = 0;
- interestRetransmissions_.clear();
- lastSegNacked_ = 0;
- lastReceived_ = 0;
- lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- lastEvent_ = lastReceivedTime_;
- highestReceived_ = 0;
- firstSequenceInRound_ = 0;
-
- rtx_timer_used_ = false;
- for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) {
- inflightInterests_[i] = {0};
- }
-
- // stats
- firstPckReceived_ = false;
- receivedBytes_ = 0;
- sentInterest_ = 0;
- receivedData_ = 0;
- packetLost_ = 0;
- lossRecovered_ = 0;
- avgPacketSize_ = HICN_INIT_PACKET_SIZE;
- gotNack_ = false;
- gotFutureNack_ = 0;
- rounds_ = 0;
- roundsWithoutNacks_ = 0;
- pathTable_.clear();
-
- // CC var
- estimatedBw_ = 0.0;
- lossRate_ = 0.0;
- queuingDelay_ = 0.0;
- protocolState_ = HICN_RTC_NORMAL_STATE;
-
- producerPathLabels_[0] = 0;
- producerPathLabels_[1] = 0;
- initied = false;
-
- socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- (uint32_t)HICN_RTC_INTEREST_LIFETIME);
- // XXX this should be done by the application
-}
-
-uint32_t max(uint32_t a, uint32_t b) {
- if (a > b)
- return a;
- else
- return b;
-}
-
-uint32_t min(uint32_t a, uint32_t b) {
- if (a < b)
- return a;
- else
- return b;
-}
-
-void RTCTransportProtocol::newRound() {
- round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN));
- round_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- updateStats(HICN_ROUND_LEN);
- newRound();
- });
-}
-
-void RTCTransportProtocol::updateDelayStats(
- const ContentObject &content_object) {
- uint32_t segmentNumber = content_object.getName().getSuffix();
- uint32_t pkt = segmentNumber & modMask_;
-
- if (inflightInterests_[pkt].state != sent_) return;
-
- if (interestRetransmissions_.find(segmentNumber) !=
- interestRetransmissions_.end())
- // this packet was rtx at least once
- return;
-
- uint32_t pathLabel = content_object.getPathLabel();
-
- if (pathTable_.find(pathLabel) == pathTable_.end()) {
- // found a new path
- std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>();
- pathTable_[pathLabel] = newPath;
- }
-
- // RTT measurements are useful both from NACKs and data packets
- uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count() -
- inflightInterests_[pkt].transmissionTime;
-
- pathTable_[pathLabel]->insertRttSample(RTT);
- auto payload = content_object.getPayload();
-
- // we collect OWD only for datapackets
- if (payload->length() != HICN_NACK_HEADER_SIZE) {
- uint64_t *senderTimeStamp = (uint64_t *)payload->data();
- int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count() -
- *senderTimeStamp;
-
- pathTable_[pathLabel]->insertOwdSample(OWD);
- pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber);
- } else {
- pathTable_[pathLabel]->receivedNack();
- }
-}
-
-void RTCTransportProtocol::updateStats(uint32_t round_duration) {
- if (pathTable_.empty()) return;
-
- if (receivedBytes_ != 0) {
- double bytesPerSec =
- (double)(receivedBytes_ *
- ((double)HICN_MILLI_IN_A_SEC / (double)round_duration));
- estimatedBw_ = (estimatedBw_ * HICN_ESTIMATED_BW_ALPHA) +
- ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec);
- }
-
- uint64_t minRtt = UINT_MAX;
- uint64_t maxRtt = 0;
-
- for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) {
- it->second->roundEnd();
- if (it->second->isActive()) {
- if (it->second->getMinRtt() < minRtt) {
- minRtt = it->second->getMinRtt();
- producerPathLabels_[0] = it->first;
- }
- if (it->second->getMinRtt() > maxRtt) {
- maxRtt = it->second->getMinRtt();
- producerPathLabels_[1] = it->first;
- }
- }
- }
-
- if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
- pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
- return; // this should not happen
-
- // as a queuing delay we keep the lowest one among the two paths
- // if one path is congested the forwarder should decide to do not
- // use it so it does not make sense to inform the application
- // that maybe we have a problem
- if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() <
- pathTable_[producerPathLabels_[1]]->getQueuingDealy())
- queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy();
- else
- queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy();
-
- if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) {
- uint32_t numberTheoricallyReceivedPackets_ =
- highestReceived_ - firstSequenceInRound_;
- double lossRate = 0;
- if (numberTheoricallyReceivedPackets_ != 0)
- lossRate = (double)((double)(packetLost_ - lossRecovered_) /
- (double)numberTheoricallyReceivedPackets_);
-
- if (lossRate < 0) lossRate = 0;
-
- if (initied) {
- lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA +
- (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA));
- } else {
- lossRate_ = lossRate;
- initied = true;
- }
- }
-
- if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE;
-
- // for the BDP we use the max rtt, so that we calibrate the window on the
- // RTT of the slowest path. In this way we are sure that the window will
- // never be too small
- uint32_t BDP = (uint32_t)ceil(
- (estimatedBw_ *
- (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() /
- (double)HICN_MILLI_IN_A_SEC) *
- HICN_BANDWIDTH_SLACK_FACTOR) /
- avgPacketSize_);
- uint32_t BW = (uint32_t)ceil(estimatedBw_);
- computeMaxWindow(BW, BDP);
-
- ConsumerTimerCallback *stats_callback = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY,
- &stats_callback);
- if (*stats_callback) {
- // Send the stats to the app
- stats_->updateQueuingDelay(queuingDelay_);
- stats_->updateLossRatio(lossRate_);
- stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_, *stats_);
- }
- // bound also by interest lifitime* production rate
- if (!gotNack_) {
- roundsWithoutNacks_++;
- if (currentState_ == HICN_RTC_SYNC_STATE &&
- roundsWithoutNacks_ >= HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH) {
- currentState_ = HICN_RTC_NORMAL_STATE;
- }
- } else {
- roundsWithoutNacks_ = 0;
- }
-
- updateCCState();
- updateWindow();
-
- if (queuingDelay_ > 25.0) {
- // this indicates that the client will go soon out of synch,
- // switch to synch mode
- if (currentState_ == HICN_RTC_NORMAL_STATE) {
- currentState_ = HICN_RTC_SYNC_STATE;
- }
- computeMaxWindow(BW, 0);
- increaseWindow();
- }
-
- // in any case we reset all the counters
-
- gotNack_ = false;
- gotFutureNack_ = 0;
- receivedBytes_ = 0;
- sentInterest_ = 0;
- receivedData_ = 0;
- packetLost_ = 0;
- lossRecovered_ = 0;
- rounds_++;
- firstSequenceInRound_ = highestReceived_;
-}
-
-void RTCTransportProtocol::updateCCState() {
- // TODO
-}
-
-void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
- uint32_t BDPWin) {
- if (productionRate ==
- 0) // we have no info about the producer, keep the previous maxCWin
- return;
-
- uint32_t interestLifetime = default_values::interest_lifetime;
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interestLifetime);
- uint32_t maxWaintingInterest = (uint32_t)ceil(
- (productionRate / avgPacketSize_) *
- (double)((double)(interestLifetime *
- HICN_INTEREST_LIFETIME_REDUCTION_FACTOR) /
- (double)HICN_MILLI_IN_A_SEC));
-
- if (currentState_ == HICN_RTC_SYNC_STATE) {
- // in this case we do not limit the window with the BDP, beacuse most
- // likely it is wrong
- maxCWin_ = maxWaintingInterest;
- return;
- }
-
- // currentState = RTC_NORMAL_STATE
- if (BDPWin != 0) {
- maxCWin_ = (uint32_t)ceil((double)BDPWin +
- (((double)BDPWin * 30.0) / 100.0)); // BDP + 30%
- } else {
- maxCWin_ = min(maxWaintingInterest, maxCWin_);
- }
-
- if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN;
-}
-
-void RTCTransportProtocol::updateWindow() {
- if (currentState_ == HICN_RTC_SYNC_STATE) return;
-
- if (currentCWin_ < maxCWin_ * 0.9) {
- currentCWin_ =
- min(maxCWin_, (uint32_t)(currentCWin_ * HICN_WIN_INCREASE_FACTOR));
- } else if (currentCWin_ > maxCWin_) {
- currentCWin_ =
- max((uint32_t)(currentCWin_ * HICN_WIN_DECREASE_FACTOR), HICN_MIN_CWIN);
- }
-}
-
-void RTCTransportProtocol::decreaseWindow() {
- // this is used only in SYNC mode
- if (currentState_ == HICN_RTC_NORMAL_STATE) return;
-
- if (gotFutureNack_ == 1)
- currentCWin_ = min((currentCWin_ - 1),
- (uint32_t)ceil((double)maxCWin_ * 0.66)); // 2/3
- else
- currentCWin_--;
-
- currentCWin_ = max(currentCWin_, HICN_MIN_CWIN);
-}
-
-void RTCTransportProtocol::increaseWindow() {
- // this is used only in SYNC mode
- if (currentState_ == HICN_RTC_NORMAL_STATE) return;
-
- // we need to be carefull to do not increase the window to much
- if (currentCWin_ < ((double)maxCWin_ * 0.7)) {
- currentCWin_ = currentCWin_ + 1; // exponential
- } else {
- currentCWin_ = min(
- maxCWin_,
- (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear
- }
-}
-
-void RTCTransportProtocol::probeRtt() {
- probe_timer_->expires_from_now(std::chrono::milliseconds(1000));
- probe_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- probeRtt();
- });
-
- // To avoid sending the first probe, because the transport is not running yet
- if (is_first_ && !is_running_) return;
-
- time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- // get a random numbe in the probe seq range
- std::default_random_engine eng((std::random_device())());
- std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ,
- HICN_MAX_PROBE_SEQ);
- probe_seq_number_ = idis(eng);
- interest_name->setSuffix(probe_seq_number_);
-
- // we considere the probe as a rtx so that we do not incresea inFlightInt
- received_probe_ = false;
- TRANSPORT_LOGD("Send content interest %u (probeRtt)",
- interest_name->getSuffix());
- sendInterest(interest_name, true);
-}
-
-void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
- auto interest = getPacket();
- interest->setName(*interest_name);
-
- uint32_t interestLifetime = default_values::interest_lifetime;
- socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME,
- interestLifetime);
- interest->setLifetime(uint32_t(interestLifetime));
-
- ConsumerInterestCallback *on_interest_output = nullptr;
-
- socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT,
- &on_interest_output);
-
- if (*on_interest_output) {
- (*on_interest_output)(*socket_, *interest);
- }
-
- if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) {
- return;
- }
-
- portal_->sendInterest(std::move(interest));
-
- sentInterest_++;
-
- if (!rtx) {
- packets_in_window_[interest_name->getSuffix()] = 0;
- inflightInterestsCount_++;
- }
-}
-
-void RTCTransportProtocol::scheduleNextInterests() {
- if (!is_running_ && !is_first_) return;
-
- TRANSPORT_LOGD("----- [window %u - inflight_interests %u = %d] -----",
- currentCWin_, inflightInterestsCount_,
- currentCWin_ - inflightInterestsCount_);
-
- while (inflightInterestsCount_ < currentCWin_) {
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
-
- interest_name->setSuffix(actualSegment_);
-
- // if the producer socket is not stated (does not reply even with nacks)
- // we keep asking for something without marking anything as lost (see
- // timeout). In this way when the producer socket will start the
- // consumer socket will not miss any packet
- if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
- uint32_t pkt = actualSegment_ & modMask_;
- inflightInterests_[pkt].state = sent_;
- inflightInterests_[pkt].sequence = actualSegment_;
- actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
- TRANSPORT_LOGD(
- "Send content interest %u (scheduleNextInterests no replies)",
- interest_name->getSuffix());
- sendInterest(interest_name, false);
- return;
- }
-
- // we send the packet only if it is not pending yet
- // notice that this is not true for rtx packets
- if (portal_->interestIsPending(*interest_name)) {
- actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
- continue;
- }
-
- uint32_t pkt = actualSegment_ & modMask_;
- // if we already reacevied the content we don't ask it again
- if (inflightInterests_[pkt].state == received_ &&
- inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
- continue;
- }
-
- // same if the packet is lost
- if (inflightInterests_[pkt].state == lost_ &&
- inflightInterests_[pkt].sequence == actualSegment_) {
- actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
- continue;
- }
-
- inflightInterests_[pkt].transmissionTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- // here the packet can be in any state except for lost or recevied
- inflightInterests_[pkt].state = sent_;
- inflightInterests_[pkt].sequence = actualSegment_;
- actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
-
- TRANSPORT_LOGD("Send content interest %u (scheduleNextInterests)",
- interest_name->getSuffix());
- sendInterest(interest_name, false);
- }
-
- TRANSPORT_LOGD("----- end of scheduleNextInterest -----");
-}
-
-bool RTCTransportProtocol::verifyKeyPackets() {
- // Not yet implemented
- return false;
-}
-
-void RTCTransportProtocol::sentinelTimer() {
- uint32_t wait = 50;
-
- if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) {
- // we have all the info to set the timers
- wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
- if (wait == 0) wait = 1;
- }
-
- sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
- sentinel_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
-
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
- pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) {
- // we have no info, so we send again
-
- for (auto it = packets_in_window_.begin(); it != packets_in_window_.end();
- it++) {
- uint32_t pkt = it->first & modMask_;
- if (inflightInterests_[pkt].sequence == it->first) {
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
- }
- }
- } else {
- uint64_t max_waiting_time = // wait at least 50ms
- (pathTable_[producerPathLabels_[1]]->getMinRtt() -
- pathTable_[producerPathLabels_[0]]->getMinRtt()) +
- (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
-
- if ((currentState_ == HICN_RTC_NORMAL_STATE) &&
- (inflightInterestsCount_ >= currentCWin_) &&
- ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) {
- uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
-
- for (auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++) {
- uint32_t pkt = it->first & modMask_;
- if (inflightInterests_[pkt].sequence == it->first &&
- ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) {
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
- }
- }
- }
- }
-
- sentinelTimer();
- });
-}
-void RTCTransportProtocol::addRetransmissions(uint32_t val) {
- // add only val in the rtx list
- addRetransmissions(val, val + 1);
-}
-
-void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) {
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- bool new_rtx = false;
- for (uint32_t i = start; i < stop; i++) {
- auto it = interestRetransmissions_.find(i);
- if (it == interestRetransmissions_.end()) {
- uint32_t pkt = i & modMask_;
- if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) {
- // it must be larger than the last past nack received
- packetLost_++;
- interestRetransmissions_[i] = 0;
- uint32_t pkt = i & modMask_;
- // we reset the transmission time setting to now, so that rtx will
- // happne in one RTT on waint one inter arrival gap
- inflightInterests_[pkt].transmissionTime = now;
- new_rtx = true;
- }
- } // if the retransmission is already there the rtx timer will
- // take care of it
- }
-
- // in case a new rtx is added to the map we need to run checkRtx()
- if (new_rtx) {
- if (rtx_timer_used_) {
- // if a timer is pending we need to delete it
- rtx_timer_->cancel();
- rtx_timer_used_ = false;
- }
- checkRtx();
- }
-}
-
-uint64_t RTCTransportProtocol::retransmit() {
- auto it = interestRetransmissions_.begin();
-
- // cut len to max HICN_MAX_RTX_SIZE
- // since we use a map, the smaller (and so the older) sequence number are at
- // the beginnin of the map
- while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) {
- it = interestRetransmissions_.erase(it);
- }
-
- it = interestRetransmissions_.begin();
- uint64_t smallest_timeout = ULONG_MAX;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- while (it != interestRetransmissions_.end()) {
- uint32_t pkt = it->first & modMask_;
-
- if (inflightInterests_[pkt].sequence != it->first) {
- // this packet is not anymore in the inflight buffer, erase it
- it = interestRetransmissions_.erase(it);
- continue;
- }
-
- // we retransmitted the packet too many times
- if (it->second >= HICN_MAX_RTX) {
- it = interestRetransmissions_.erase(it);
- continue;
- }
-
- // this packet is too old
- if ((lastReceived_ > it->first) &&
- (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) {
- it = interestRetransmissions_.erase(it);
- continue;
- }
-
- uint64_t rtx_time = now;
-
- if (it->second == 0) {
- // first rtx
- if (producerPathLabels_[0] != producerPathLabels_[1]) {
- // multipath
- if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end() &&
- (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() <
- HICN_MIN_INTER_ARRIVAL_GAP)) {
- rtx_time = lastReceivedTime_ +
- (pathTable_[producerPathLabels_[1]]->getMinRtt() -
- pathTable_[producerPathLabels_[0]]->getMinRtt()) +
- pathTable_[producerPathLabels_[0]]->getInterArrivalGap();
- } // else low rate producer, send it immediatly
- } else {
- // single path
- if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() <
- HICN_MIN_INTER_ARRIVAL_GAP)) {
- rtx_time = lastReceivedTime_ +
- pathTable_[producerPathLabels_[0]]->getInterArrivalGap();
- } // else low rate producer send immediatly
- }
- } else {
- // second or plus rtx, wait for the min rtt
- if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) {
- uint64_t sent_time = inflightInterests_[pkt].transmissionTime;
- rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt();
- } // if we don't have info we send it immediatly
- }
-
- if (now >= rtx_time) {
- inflightInterests_[pkt].transmissionTime = now;
- it->second++;
-
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- TRANSPORT_LOGD("Send content interest %u (retransmit)",
- interest_name->getSuffix());
- sendInterest(interest_name, true);
- } else if (rtx_time < smallest_timeout) {
- smallest_timeout = rtx_time;
- }
-
- ++it;
- }
- return smallest_timeout;
-}
-
-void RTCTransportProtocol::checkRtx() {
- if (interestRetransmissions_.empty()) {
- rtx_timer_used_ = false;
- return;
- }
-
- uint64_t next_timeout = retransmit();
- uint64_t wait = 1;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- if (next_timeout != ULONG_MAX && now < next_timeout) {
- wait = next_timeout - now;
- }
- rtx_timer_used_ = true;
- rtx_timer_->expires_from_now(std::chrono::milliseconds(wait));
- rtx_timer_->async_wait([this](std::error_code ec) {
- if (ec) return;
- rtx_timer_used_ = false;
- checkRtx();
- });
-}
-
-void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
- uint32_t segmentNumber = interest->getName().getSuffix();
-
- if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
- // this is a timeout on a probe, do nothing
- return;
- }
-
- uint32_t pkt = segmentNumber & modMask_;
-
- if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) {
- // we do nothing, and we keep asking the same stuff over
- // and over until we get at least a packet
- inflightInterestsCount_--;
- lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- packets_in_window_.erase(segmentNumber);
- scheduleNextInterests();
- return;
- }
-
- if (inflightInterests_[pkt].state == sent_) {
- lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- packets_in_window_.erase(segmentNumber);
- inflightInterestsCount_--;
- }
-
- // check how many times we sent this packet
- auto it = interestRetransmissions_.find(segmentNumber);
- if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) {
- inflightInterests_[pkt].state = lost_;
- }
-
- if (inflightInterests_[pkt].state == sent_) {
- inflightInterests_[pkt].state = timeout1_;
- } else if (inflightInterests_[pkt].state == timeout1_) {
- inflightInterests_[pkt].state = timeout2_;
- } else if (inflightInterests_[pkt].state == timeout2_) {
- inflightInterests_[pkt].state = lost_;
- }
-
- if (inflightInterests_[pkt].state == lost_) {
- interestRetransmissions_.erase(segmentNumber);
- } else {
- addRetransmissions(segmentNumber);
- }
-
- scheduleNextInterests();
-}
-
-bool RTCTransportProtocol::onNack(const ContentObject &content_object,
- bool rtx) {
- uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
- uint32_t productionSeg = *payload;
- uint32_t productionRate = *(++payload);
- uint32_t nackSegment = content_object.getName().getSuffix();
-
- bool old_nack = false;
-
- // if we did not received anything between lastReceived_ + 1 and productionSeg
- // most likelly some packets got lost
- if (lastReceived_ != 0) {
- addRetransmissions(lastReceived_ + 1, productionSeg);
- }
-
- if (!rtx) {
- gotNack_ = true;
- // we synch the estimated production rate with the actual one
- estimatedBw_ = (double)productionRate;
- }
-
- if (productionSeg > nackSegment) {
- // we are asking for stuff produced in the past
- actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ;
-
- if (!rtx) {
- if (currentState_ == HICN_RTC_NORMAL_STATE) {
- currentState_ = HICN_RTC_SYNC_STATE;
- }
-
- computeMaxWindow(productionRate, 0);
- increaseWindow();
- }
-
- lastSegNacked_ = productionSeg;
- old_nack = true;
-
- } else if (productionSeg < nackSegment) {
- actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
-
- if (!rtx) {
- // we are asking stuff in the future
- gotFutureNack_++;
- computeMaxWindow(productionRate, 0);
- decreaseWindow();
-
- if (currentState_ == HICN_RTC_SYNC_STATE) {
- currentState_ = HICN_RTC_NORMAL_STATE;
- }
- }
- } else {
- // we are asking the right thing, but the producer is slow
- // keep doing the same until the packet is produced
- actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ;
- }
-
- return old_nack;
-}
-
-void RTCTransportProtocol::onContentObject(
- Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
- // as soon as we get a packet firstPckReceived_ will never be false
- firstPckReceived_ = true;
-
- auto payload = content_object->getPayload();
- uint32_t payload_size = (uint32_t)payload->length();
- uint32_t segmentNumber = content_object->getName().getSuffix();
- uint32_t pkt = segmentNumber & modMask_;
-
- ConsumerContentObjectCallback *callback_content_object = nullptr;
- socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
- &callback_content_object);
- if (*callback_content_object) {
- (*callback_content_object)(*socket_, *content_object);
- }
-
- if (segmentNumber >= HICN_MIN_PROBE_SEQ) {
- TRANSPORT_LOGD("Received probe %u", segmentNumber);
- if (segmentNumber == probe_seq_number_ && !received_probe_) {
- received_probe_ = true;
-
- uint32_t pathLabel = content_object->getPathLabel();
- if (pathTable_.find(pathLabel) == pathTable_.end()) {
- std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>();
- pathTable_[pathLabel] = newPath;
- }
-
- // this is the expected probe, update the RTT and drop the packet
- uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count() -
- time_sent_probe_;
-
- pathTable_[pathLabel]->insertRttSample(RTT);
- pathTable_[pathLabel]->receivedNack();
- }
- return;
- }
-
- // check if the packet is a rtx
- bool is_rtx = false;
- if (interestRetransmissions_.find(segmentNumber) !=
- interestRetransmissions_.end()) {
- is_rtx = true;
- } else {
- auto it_win = packets_in_window_.find(segmentNumber);
- if (it_win != packets_in_window_.end() && it_win->second != 0)
- is_rtx = true;
- }
-
- if (payload_size == HICN_NACK_HEADER_SIZE) {
- TRANSPORT_LOGD("Received nack %u", segmentNumber);
-
- if (inflightInterests_[pkt].state == sent_) {
- lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- packets_in_window_.erase(segmentNumber);
- inflightInterestsCount_--;
- }
-
- bool old_nack = false;
-
- if (!is_rtx) {
- // this is not a retransmitted packet
- old_nack = onNack(*content_object, false);
- updateDelayStats(*content_object);
- } else {
- old_nack = onNack(*content_object, true);
- }
-
- // the nacked_ state is used only to avoid to decrease
- // inflightInterestsCount_ multiple times. In fact, every time that we
- // receive an event related to an interest (timeout, nacked, content) we
- // cange the state. In this way we are sure that we do not decrease twice
- // the counter
- if (old_nack) {
- inflightInterests_[pkt].state = lost_;
- interestRetransmissions_.erase(segmentNumber);
- } else {
- inflightInterests_[pkt].state = nacked_;
- }
-
- } else {
- TRANSPORT_LOGD("Received content %u", segmentNumber);
-
- avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) +
- ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length());
-
- receivedBytes_ += (uint32_t)(content_object->headerSize() +
- content_object->payloadSize());
-
- if (inflightInterests_[pkt].state == sent_) {
- lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- packets_in_window_.erase(segmentNumber);
- inflightInterestsCount_--; // packet sent without timeouts
- }
-
- if (inflightInterests_[pkt].state == sent_ && !is_rtx) {
- // delay stats are computed only for non retransmitted data
- updateDelayStats(*content_object);
- }
-
- addRetransmissions(lastReceived_ + 1, segmentNumber);
- if (segmentNumber > highestReceived_) {
- highestReceived_ = segmentNumber;
- }
- if (segmentNumber > lastReceived_) {
- lastReceived_ = segmentNumber;
- lastReceivedTime_ =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- }
- receivedData_++;
- inflightInterests_[pkt].state = received_;
-
- auto it = interestRetransmissions_.find(segmentNumber);
- if (it != interestRetransmissions_.end()) lossRecovered_++;
-
- interestRetransmissions_.erase(segmentNumber);
-
- reassemble(std::move(content_object));
- increaseWindow();
- }
-
- scheduleNextInterests();
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
deleted file mode 100644
index f34afbb5f..000000000
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiTC_SYNC_STATE
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <map>
-#include <queue>
-#include <unordered_map>
-
-#include <hicn/transport/protocols/datagram_reassembly.h>
-#include <hicn/transport/protocols/protocol.h>
-#include <hicn/transport/protocols/rtc_data_path.h>
-
-// algorithm state
-#define HICN_RTC_SYNC_STATE 0
-#define HICN_RTC_NORMAL_STATE 1
-#define HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH 3
-
-// packet constants
-#define HICN_INIT_PACKET_SIZE 1300 // bytes
-#define HICN_PACKET_HEADER_SIZE 60 // bytes ipv6+tcp
-#define HICN_NACK_HEADER_SIZE 8 // bytes
-#define HICN_TIMESTAMP_SIZE 8 // bytes
-#define HICN_RTC_INTEREST_LIFETIME 1000 // ms
-
-// rtt measurement
-// normal interests for data goes from 0 to
-// HICN_MIN_PROBE_SEQ, the rest is reserverd for
-// probes
-#define HICN_MIN_PROBE_SEQ 0xefffffff
-#define HICN_MAX_PROBE_SEQ 0xffffffff
-
-// controller constant
-#define HICN_ROUND_LEN \
- 200 // ms interval of time on which
- // we take decisions / measurements
-#define HICN_MAX_RTX 10
-#define HICN_MAX_RTX_SIZE 1024
-#define HICN_MAX_RTX_MAX_AGE 10000
-#define HICN_MIN_RTT_WIN 30 // rounds
-#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms
-
-// cwin
-#define HICN_INITIAL_CWIN 1 // packets
-#define HICN_INITIAL_CWIN_MAX 100000 // packets
-#define HICN_MIN_CWIN 10 // packets
-#define HICN_WIN_INCREASE_FACTOR 1.5
-#define HICN_WIN_DECREASE_FACTOR 0.9
-
-// statistics constants
-#define HICN_BANDWIDTH_SLACK_FACTOR 1.8
-#define HICN_ESTIMATED_BW_ALPHA 0.7
-#define HICN_ESTIMATED_PACKET_SIZE 0.7
-#define HICN_ESTIMATED_LOSSES_ALPHA 0.8
-#define HICN_INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
-
-// other constants
-#define HICN_NANO_IN_A_SEC 1000000000
-#define HICN_MICRO_IN_A_SEC 1000000
-#define HICN_MILLI_IN_A_SEC 1000
-
-namespace transport {
-
-namespace protocol {
-
-enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ };
-
-typedef enum packetState packetState_t;
-
-struct sentInterest {
- uint64_t transmissionTime;
- uint32_t sequence; // sequence number of the interest sent
- // to handle seq % buffer_size
- packetState_t state; // see packet state
-};
-
-class RTCTransportProtocol : public TransportProtocol,
- public DatagramReassembly {
- public:
- RTCTransportProtocol(interface::ConsumerSocket *icnet_socket);
-
- ~RTCTransportProtocol();
-
- int start() override;
-
- void stop() override;
-
- void resume() override;
-
- bool verifyKeyPackets() override;
-
- private:
- // algo functions
- void reset() override;
-
- // CC functions
- void updateDelayStats(const ContentObject &content_object);
- void updateStats(uint32_t round_duration);
- void updateCCState();
- void computeMaxWindow(uint32_t productionRate, uint32_t BDPWin);
- void updateWindow();
- void decreaseWindow();
- void increaseWindow();
- void resetPreviousWindow();
-
- // packet functions
- void sendInterest(Name *interest_name, bool rtx);
- void scheduleNextInterests() override;
- void sentinelTimer();
- void addRetransmissions(uint32_t val);
- void addRetransmissions(uint32_t start, uint32_t stop);
- uint64_t retransmit();
- void checkRtx();
- void probeRtt();
- void newRound();
- void onTimeout(Interest::Ptr &&interest) override;
- bool onNack(const ContentObject &content_object, bool rtx);
- void onContentObject(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object) override;
- void onPacketDropped(Interest::Ptr &&interest,
- ContentObject::Ptr &&content_object) override {}
- void onReassemblyFailed(std::uint32_t missing_segment) override {}
-
- TRANSPORT_ALWAYS_INLINE virtual void reassemble(
- ContentObject::Ptr &&content_object) override {
- auto read_buffer = content_object->getPayload();
- read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
- Reassembly::read_buffer_ = std::move(read_buffer);
- Reassembly::notifyApplication();
- }
-
- // controller var
- std::unique_ptr<asio::steady_timer> round_timer_;
- unsigned currentState_;
-
- // cwin var
- uint32_t currentCWin_;
- uint32_t maxCWin_;
-
- // names/packets var
- uint32_t actualSegment_;
- uint32_t inflightInterestsCount_;
- // map seq to rtx
- std::map<uint32_t, uint8_t> interestRetransmissions_;
- bool rtx_timer_used_;
- std::unique_ptr<asio::steady_timer> rtx_timer_;
- std::vector<sentInterest> inflightInterests_;
- uint32_t lastSegNacked_; // indicates the segment id in the last received
- // past Nack. we do not ask for retransmissions
- // for samething that is older than this value.
- uint32_t lastReceived_; // segment of the last content object received
- // indicates the base of the window on the client
- uint64_t lastReceivedTime_; // time at which we recevied the
- // lastReceived_ packet
-
- // sentinel
- // if all packets in the window get lost we need something that
- // wakes up our consumer socket. Interest timeouts set to 1 sec
- // expire too late. This timers expire much sooner and if it
- // detects that all the interest in the window may be lost
- // it sends all of them again
- std::unique_ptr<asio::steady_timer> sentinel_timer_;
- uint64_t lastEvent_; // time at which we removed a pending
- // interest from the window
- std::unordered_map<uint32_t, uint8_t> packets_in_window_;
-
- // rtt probes
- // the RTC transport tends to overestimate the RTT
- // du to the production time on the server side
- // once per second we send an interest for wich we know
- // we will get a nack. This nack will keep our estimation
- // close to the reality
- std::unique_ptr<asio::steady_timer> probe_timer_;
- uint64_t time_sent_probe_;
- uint32_t probe_seq_number_;
- bool received_probe_;
-
- uint32_t modMask_;
-
- // stats
- bool firstPckReceived_;
- uint32_t receivedBytes_;
- uint32_t sentInterest_;
- uint32_t receivedData_;
- int32_t packetLost_;
- int32_t lossRecovered_;
- uint32_t firstSequenceInRound_;
- uint32_t highestReceived_;
- double avgPacketSize_;
- bool gotNack_;
- uint32_t gotFutureNack_;
- uint32_t rounds_;
- uint32_t roundsWithoutNacks_;
-
- // we keep track of up two paths (if only one path is in use
- // the two values in the vector will be the same)
- // position 0 stores the path with minRTT
- // position 1 stores the path with maxRTT
- uint32_t producerPathLabels_[2];
-
- std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_;
- uint32_t roundCounter_;
-
- // CC var
- double estimatedBw_;
- double lossRate_;
- double queuingDelay_;
- unsigned protocolState_;
-
- bool initied;
-};
-
-} // namespace protocol
-
-} // namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
deleted file mode 100644
index 0cbff0e3c..000000000
--- a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/protocols/rtc_data_path.h>
-#include <chrono>
-#include <cfloat>
-
-#define MAX_ROUNDS_WITHOUT_PKTS 10 //2sec
-
-namespace transport {
-
-namespace protocol {
-
-RTCDataPath::RTCDataPath()
- : min_rtt(UINT_MAX),
- prev_min_rtt(UINT_MAX),
- min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the
- // real OWD, but the measured one, that depends on the
- // clock of sender and receiver. the only meaningful
- // value is is the queueing delay. for this reason we
- // keep both RTT (for the windowd calculation) and OWD
- // (for congestion/quality control)
- prev_min_owd(INT_MAX),
- avg_owd(0.0),
- queuing_delay(DBL_MAX),
- lastRecvSeq_(0),
- lastRecvTime_(0),
- avg_inter_arrival_(DBL_MAX),
- received_nacks_(false),
- received_packets_(false),
- rounds_without_packets_(0),
- RTThistory_(HISTORY_LEN),
- OWDhistory_(HISTORY_LEN){};
-
-void RTCDataPath::insertRttSample(uint64_t rtt) {
- // for the rtt we only keep track of the min one
- if (rtt < min_rtt) min_rtt = rtt;
-}
-
-void RTCDataPath::insertOwdSample(int64_t owd) {
- // for owd we use both min and avg
- if (owd < min_owd) min_owd = owd;
-
- if(avg_owd != DBL_MAX)
- avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC);
- else {
- avg_owd = owd;
- }
-
- //owd is computed only for valid data packets so we count only
- //this for decide if we recevie traffic or not
- received_packets_ = true;
-}
-
-void RTCDataPath::computeInterArrivalGap(uint32_t segmentNumber){
-
- //got packet in sequence, compute gap
- if(lastRecvSeq_ == (segmentNumber - 1)){
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- uint64_t delta = now - lastRecvTime_;
- lastRecvSeq_ = segmentNumber;
- lastRecvTime_ = now;
- if(avg_inter_arrival_ == DBL_MAX)
- avg_inter_arrival_ = delta;
- else
- avg_inter_arrival_ = (avg_inter_arrival_ * (1 -ALPHA_RTC))
- + (delta * ALPHA_RTC);
- return;
- }
-
- //ooo packet, update the stasts if needed
- if(lastRecvSeq_ <= segmentNumber){
- lastRecvSeq_ = segmentNumber;
- lastRecvTime_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- }
-}
-
-void RTCDataPath::receivedNack(){
- received_nacks_ = true;
-}
-
-double RTCDataPath::getInterArrivalGap(){
- if(avg_inter_arrival_ == DBL_MAX)
- return 0;
- return avg_inter_arrival_;
-}
-
-bool RTCDataPath::isActive(){
- if(received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS)
- return true;
- return false;
-}
-
-void RTCDataPath::roundEnd() {
- // reset min_rtt and add it to the history
- if (min_rtt != UINT_MAX) {
- prev_min_rtt = min_rtt;
- } else {
- // this may happen if we do not receive any packet
- // from this path in the last round. in this case
- // we use the measure from the previuos round
- min_rtt = prev_min_rtt;
- }
-
- if(min_rtt == 0)
- min_rtt = 1;
-
- RTThistory_.pushBack(min_rtt);
- min_rtt = UINT_MAX;
-
- // do the same for min owd
- if (min_owd != INT_MAX) {
- prev_min_owd = min_owd;
- } else {
- min_owd = prev_min_owd;
- }
-
- if (min_owd != INT_MAX) {
- OWDhistory_.pushBack(min_owd);
- min_owd = INT_MAX;
-
- // compute queuing delay
- queuing_delay = avg_owd - getMinOwd();
-
- } else {
- queuing_delay = 0.0;
- }
-
- if(!received_packets_)
- rounds_without_packets_++;
- else
- rounds_without_packets_ = 0;
- received_packets_ = false;
-}
-
-double RTCDataPath::getQueuingDealy() { return queuing_delay; }
-
-uint64_t RTCDataPath::getMinRtt() { return RTThistory_.begin(); }
-
-int64_t RTCDataPath::getMinOwd() { return OWDhistory_.begin(); }
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h
deleted file mode 100644
index 48a67c525..000000000
--- a/libtransport/src/hicn/transport/protocols/rtc_data_path.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/utils/min_filter.h>
-#include <stdint.h>
-#include <climits>
-
-#define ALPHA_RTC 0.125
-#define HISTORY_LEN 20 //4 sec
-
-namespace transport {
-
-namespace protocol {
-
-class RTCDataPath {
- public:
- RTCDataPath();
-
- public:
- void insertRttSample(uint64_t rtt);
- void insertOwdSample(int64_t owd);
- void computeInterArrivalGap(uint32_t segmentNumber);
- void receivedNack();
-
- uint64_t getMinRtt();
- double getQueuingDealy();
- double getInterArrivalGap();
- bool isActive();
-
- void roundEnd();
-
- private:
- int64_t getMinOwd();
-
- uint64_t min_rtt;
- uint64_t prev_min_rtt;
-
- int64_t min_owd;
- int64_t prev_min_owd;
-
- double avg_owd;
-
- double queuing_delay;
-
- uint32_t lastRecvSeq_;
- uint64_t lastRecvTime_;
- double avg_inter_arrival_;
-
- //flags to check if a path is active
- //we considere a path active if it reaches a producer
- //(not a cache) --aka we got at least one nack on this path--
- //and if we receives packets
- bool received_nacks_;
- bool received_packets_;
- uint8_t rounds_without_packets_; //if we don't get any packet
- //for MAX_ROUNDS_WITHOUT_PKTS
- //we consider the path inactive
-
- utils::MinFilter<uint64_t> RTThistory_;
- utils::MinFilter<int64_t> OWDhistory_;
-};
-
-} // namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h
deleted file mode 100644
index c92940ab4..000000000
--- a/libtransport/src/hicn/transport/protocols/statistics.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/portability/c_portability.h>
-
-#include <cstdint>
-
-namespace transport {
-
-namespace protocol {
-
-class TransportStatistics {
- static constexpr double default_alpha = 0.7;
-
- public:
- TransportStatistics(double alpha = default_alpha)
- : retx_count_(0),
- bytes_received_(0),
- average_rtt_(0),
- avg_window_size_(0),
- interest_tx_(0),
- alpha_(alpha),
- loss_ratio_(0.0),
- queuing_delay_(0.0) {}
-
- TRANSPORT_ALWAYS_INLINE void updateRetxCount(uint64_t retx) {
- retx_count_ += retx;
- }
-
- TRANSPORT_ALWAYS_INLINE void updateBytesRecv(uint64_t bytes) {
- bytes_received_ += bytes;
- }
-
- TRANSPORT_ALWAYS_INLINE void updateAverageRtt(uint64_t rtt) {
- average_rtt_ = (alpha_ * average_rtt_) + ((1. - alpha_) * double(rtt));
- }
-
- TRANSPORT_ALWAYS_INLINE void updateAverageWindowSize(double current_window) {
- avg_window_size_ =
- (alpha_ * avg_window_size_) + ((1. - alpha_) * current_window);
- }
-
- TRANSPORT_ALWAYS_INLINE void updateInterestTx(uint64_t int_tx) {
- interest_tx_ += int_tx;
- }
-
- TRANSPORT_ALWAYS_INLINE void updateLossRatio(double loss_ratio) {
- loss_ratio_ = loss_ratio;
- }
-
- TRANSPORT_ALWAYS_INLINE void updateQueuingDelay(double queuing_delay) {
- queuing_delay_ = queuing_delay;
- }
-
- TRANSPORT_ALWAYS_INLINE uint64_t getRetxCount() const { return retx_count_; }
-
- TRANSPORT_ALWAYS_INLINE uint64_t getBytesRecv() const {
- return bytes_received_;
- }
-
- TRANSPORT_ALWAYS_INLINE double getAverageRtt() const { return average_rtt_; }
-
- TRANSPORT_ALWAYS_INLINE double getAverageWindowSize() const {
- return avg_window_size_;
- }
-
- TRANSPORT_ALWAYS_INLINE uint64_t getInterestTx() const {
- return interest_tx_;
- }
-
- TRANSPORT_ALWAYS_INLINE double getLossRatio() const { return loss_ratio_; }
-
- TRANSPORT_ALWAYS_INLINE double getQueuingDelay() const {
- return queuing_delay_;
- }
-
- TRANSPORT_ALWAYS_INLINE void reset() {
- retx_count_ = 0;
- bytes_received_ = 0;
- average_rtt_ = 0;
- avg_window_size_ = 0;
- interest_tx_ = 0;
- loss_ratio_ = 0;
- }
-
- private:
- uint64_t retx_count_;
- uint64_t bytes_received_;
- double average_rtt_;
- double avg_window_size_;
- uint64_t interest_tx_;
- double alpha_;
- double loss_ratio_;
- double queuing_delay_;
-};
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt
deleted file mode 100644
index 6f9fdb9aa..000000000
--- a/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-# Enable gcov output for the tests
-add_definitions(--coverage)
-set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage")
-
-set(TestsExpectedToPass
- test_transport_producer)
-
-foreach(test ${TestsExpectedToPass})
- AddTest(${test})
-endforeach() \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc b/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc
deleted file mode 100644
index 204f2cbe2..000000000
--- a/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <gtest/gtest.h>
-
-#include "../socket_producer.h"
-#include "literals.h"
-
-#include <test.h>
-#include <random>
-
-namespace transport {
-
-namespace protocol {
-
-namespace {
-// The fixture for testing class Foo.
-class ProducerTest : public ::testing::Test {
- protected:
- ProducerTest() : name_("b001::123|321"), producer_(io_service_) {
- // You can do set-up work for each test here.
- }
-
- virtual ~ProducerTest() {
- // You can do clean-up work that doesn't throw exceptions here.
- }
-
- // If the constructor and destructor are not enough for setting up
- // and cleaning up each test, you can define the following methods:
-
- virtual void SetUp() {
- // Code here will be called immediately after the constructor (right
- // before each test).
- }
-
- virtual void TearDown() {
- // Code here will be called immediately after each test (right
- // before the destructor).
- }
-
- Name name_;
- asio::io_service io_service_;
- ProducerSocket producer_;
-};
-
-} // namespace
-
-// Tests that the Foo::Bar() method does Abc.
-TEST_F(ProducerTest, ProduceContent) {
- std::string content(250000, '?');
-
- producer_.registerPrefix(Prefix("b001::/64"));
- producer_.produce(name_, reinterpret_cast<const uint8_t *>(content.data()),
- content.size(), true);
- producer_.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 500000000_U32);
- producer_.attach();
- producer_.serveForever();
-}
-
-} // namespace protocol
-
-} // namespace transport
-
-int main(int argc, char **argv) {
- ::testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-} \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc
deleted file mode 100644
index 74faf0521..000000000
--- a/libtransport/src/hicn/transport/protocols/verification_manager.cc
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/verification_manager.h>
-
-namespace transport {
-
-namespace protocol {
-
-interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify(
- const Packet& packet) {
- using namespace interface;
-
- bool verify_signature = false, key_content = false;
- VerificationPolicy ret = VerificationPolicy::DROP_PACKET;
-
- icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE,
- verify_signature);
- icn_socket_->getSocketOption(GeneralTransportOptions::KEY_CONTENT,
- key_content);
-
- if (!verify_signature) {
- return VerificationPolicy::ACCEPT_PACKET;
- }
-
- if (key_content) {
- key_packets_.push(copyPacket(packet));
- return VerificationPolicy::ACCEPT_PACKET;
- } else if (!key_packets_.empty()) {
- std::queue<ContentObjectPtr>().swap(key_packets_);
- }
-
- ConsumerContentObjectVerificationFailedCallback*
- verification_failed_callback = VOID_HANDLER;
- icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED,
- &verification_failed_callback);
-
- if (!verification_failed_callback) {
- throw errors::RuntimeException(
- "No verification failed callback provided by application. "
- "Aborting.");
- }
-
- std::shared_ptr<utils::Verifier> verifier;
- icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier);
-
- if (TRANSPORT_EXPECT_FALSE(!verifier)) {
- ret = (*verification_failed_callback)(
- *icn_socket_, dynamic_cast<const ContentObject&>(packet),
- make_error_code(protocol_error::no_verifier_provided));
- return ret;
- }
-
- if (!verifier->verify(packet)) {
- ret = (*verification_failed_callback)(
- *icn_socket_, dynamic_cast<const ContentObject&>(packet),
- make_error_code(protocol_error::signature_verification_failed));
- } else {
- ret = VerificationPolicy::ACCEPT_PACKET;
- }
-
- return ret;
-}
-
-bool SignatureVerificationManager::onKeyToVerify() {
- if (TRANSPORT_EXPECT_FALSE(key_packets_.empty())) {
- throw errors::RuntimeException("No key to verify.");
- }
-
- while (!key_packets_.empty()) {
- ContentObjectPtr packet_to_verify = key_packets_.front();
- key_packets_.pop();
- if (onPacketToVerify(*packet_to_verify) !=
- VerificationPolicy::ACCEPT_PACKET)
- return false;
- }
-
- return true;
-}
-
-} // end namespace protocol
-
-} // end namespace transport
diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h
deleted file mode 100644
index 293e8103a..000000000
--- a/libtransport/src/hicn/transport/protocols/verification_manager.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2017-2019 Cisco and/or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <hicn/transport/interfaces/callbacks.h>
-#include <hicn/transport/interfaces/verification_policy.h>
-#include <hicn/transport/protocols/errors.h>
-
-namespace transport {
-
-namespace interface {
-class ConsumerSocket;
-}
-
-namespace protocol {
-
-using Packet = core::Packet;
-using interface::ConsumerSocket;
-using interface::VerificationPolicy;
-using ContentObjectPtr = std::shared_ptr<core::ContentObject>;
-
-class VerificationManager {
- public:
- virtual ~VerificationManager() = default;
- virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0;
- virtual bool onKeyToVerify() { return false; }
-};
-
-class SignatureVerificationManager : public VerificationManager {
- public:
- SignatureVerificationManager(interface::ConsumerSocket* icn_socket)
- : icn_socket_(icn_socket), key_packets_() {}
-
- interface::VerificationPolicy onPacketToVerify(const Packet& packet) override;
- bool onKeyToVerify() override;
-
- private:
- ConsumerSocket* icn_socket_;
- std::queue<ContentObjectPtr> key_packets_;
-
- ContentObjectPtr copyPacket(const Packet& packet) {
- std::shared_ptr<utils::MemBuf> packet_copy =
- packet.acquireMemBufReference();
- ContentObjectPtr content_object_copy =
- std::make_shared<core::ContentObject>(std::move(packet_copy));
- std::unique_ptr<utils::MemBuf> payload_copy = packet.getPayload();
- content_object_copy->appendPayload(std::move(payload_copy));
- return content_object_copy;
- }
-};
-
-} // end namespace protocol
-
-} // end namespace transport