From f4433f28b509a9f67ca85d79000ccf9c2f4b7a24 Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Fri, 21 Feb 2020 11:52:28 +0100 Subject: [HICN-534] Major rework on libtransport organization Change-Id: I361b83a18b4fd59be136d5f0817fc28e17e89884 Signed-off-by: Mauro Sardara --- .../src/hicn/transport/protocols/CMakeLists.txt | 77 -- .../transport/protocols/byte_stream_reassembly.cc | 121 --- .../transport/protocols/byte_stream_reassembly.h | 54 -- libtransport/src/hicn/transport/protocols/cbr.cc | 51 - libtransport/src/hicn/transport/protocols/cbr.h | 40 - .../protocols/congestion_window_protocol.h | 30 - .../src/hicn/transport/protocols/consumer.conf | 21 - .../transport/protocols/data_processing_events.h | 33 - .../transport/protocols/datagram_reassembly.cc | 35 - .../hicn/transport/protocols/datagram_reassembly.h | 39 - .../hicn/transport/protocols/download_observer.h | 32 - .../src/hicn/transport/protocols/errors.cc | 60 -- libtransport/src/hicn/transport/protocols/errors.h | 91 -- .../transport/protocols/incremental_indexer.cc | 52 - .../hicn/transport/protocols/incremental_indexer.h | 143 --- .../src/hicn/transport/protocols/indexer.cc | 78 -- .../src/hicn/transport/protocols/indexer.h | 106 -- .../protocols/manifest_incremental_indexer.cc | 232 ----- .../protocols/manifest_incremental_indexer.h | 91 -- .../protocols/manifest_indexing_manager.cc | 297 ------ .../src/hicn/transport/protocols/packet_manager.h | 67 -- .../src/hicn/transport/protocols/protocol.cc | 105 -- .../src/hicn/transport/protocols/protocol.h | 91 -- libtransport/src/hicn/transport/protocols/raaqm.cc | 712 -------------- libtransport/src/hicn/transport/protocols/raaqm.h | 141 --- .../hicn/transport/protocols/raaqm_data_path.cc | 157 --- .../src/hicn/transport/protocols/raaqm_data_path.h | 225 ----- .../hicn/transport/protocols/rate_estimation.cc | 355 ------- .../src/hicn/transport/protocols/rate_estimation.h | 173 ---- .../src/hicn/transport/protocols/reassembly.cc | 70 -- .../src/hicn/transport/protocols/reassembly.h | 67 -- libtransport/src/hicn/transport/protocols/rtc.cc | 1016 -------------------- libtransport/src/hicn/transport/protocols/rtc.h | 227 ----- .../src/hicn/transport/protocols/rtc_data_path.cc | 160 --- .../src/hicn/transport/protocols/rtc_data_path.h | 79 -- .../src/hicn/transport/protocols/statistics.h | 113 --- .../hicn/transport/protocols/test/CMakeLists.txt | 10 - .../protocols/test/test_transport_producer.cc | 80 -- .../transport/protocols/verification_manager.cc | 96 -- .../transport/protocols/verification_manager.h | 67 -- 40 files changed, 5694 deletions(-) delete mode 100644 libtransport/src/hicn/transport/protocols/CMakeLists.txt delete mode 100644 libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc delete mode 100644 libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h delete mode 100644 libtransport/src/hicn/transport/protocols/cbr.cc delete mode 100644 libtransport/src/hicn/transport/protocols/cbr.h delete mode 100644 libtransport/src/hicn/transport/protocols/congestion_window_protocol.h delete mode 100644 libtransport/src/hicn/transport/protocols/consumer.conf delete mode 100644 libtransport/src/hicn/transport/protocols/data_processing_events.h delete mode 100644 libtransport/src/hicn/transport/protocols/datagram_reassembly.cc delete mode 100644 libtransport/src/hicn/transport/protocols/datagram_reassembly.h delete mode 100644 libtransport/src/hicn/transport/protocols/download_observer.h delete mode 100644 libtransport/src/hicn/transport/protocols/errors.cc delete mode 100644 libtransport/src/hicn/transport/protocols/errors.h delete mode 100644 libtransport/src/hicn/transport/protocols/incremental_indexer.cc delete mode 100644 libtransport/src/hicn/transport/protocols/incremental_indexer.h delete mode 100644 libtransport/src/hicn/transport/protocols/indexer.cc delete mode 100644 libtransport/src/hicn/transport/protocols/indexer.h delete mode 100644 libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc delete mode 100644 libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h delete mode 100644 libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc delete mode 100644 libtransport/src/hicn/transport/protocols/packet_manager.h delete mode 100644 libtransport/src/hicn/transport/protocols/protocol.cc delete mode 100644 libtransport/src/hicn/transport/protocols/protocol.h delete mode 100644 libtransport/src/hicn/transport/protocols/raaqm.cc delete mode 100644 libtransport/src/hicn/transport/protocols/raaqm.h delete mode 100644 libtransport/src/hicn/transport/protocols/raaqm_data_path.cc delete mode 100644 libtransport/src/hicn/transport/protocols/raaqm_data_path.h delete mode 100644 libtransport/src/hicn/transport/protocols/rate_estimation.cc delete mode 100644 libtransport/src/hicn/transport/protocols/rate_estimation.h delete mode 100644 libtransport/src/hicn/transport/protocols/reassembly.cc delete mode 100644 libtransport/src/hicn/transport/protocols/reassembly.h delete mode 100644 libtransport/src/hicn/transport/protocols/rtc.cc delete mode 100644 libtransport/src/hicn/transport/protocols/rtc.h delete mode 100644 libtransport/src/hicn/transport/protocols/rtc_data_path.cc delete mode 100644 libtransport/src/hicn/transport/protocols/rtc_data_path.h delete mode 100644 libtransport/src/hicn/transport/protocols/statistics.h delete mode 100644 libtransport/src/hicn/transport/protocols/test/CMakeLists.txt delete mode 100644 libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc delete mode 100644 libtransport/src/hicn/transport/protocols/verification_manager.cc delete mode 100644 libtransport/src/hicn/transport/protocols/verification_manager.h (limited to 'libtransport/src/hicn/transport/protocols') diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt deleted file mode 100644 index 06515e0e2..000000000 --- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) - -list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h - ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h - ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h - ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h - ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h - ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h - ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h - ${CMAKE_CURRENT_SOURCE_DIR}/statistics.h - ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h - ${CMAKE_CURRENT_SOURCE_DIR}/download_observer.h - ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h - ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h - ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h - ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h - ${CMAKE_CURRENT_SOURCE_DIR}/errors.h - ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h - ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h -) - -list(APPEND SOURCE_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc - ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc - ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc - ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc - ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc - ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc - ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc - ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc - ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc - ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc - ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc -) - -set(RAAQM_CONFIG_INSTALL_PREFIX -${CMAKE_INSTALL_PREFIX}/etc/hicn -) - -set(raaqm_config_path - ${RAAQM_CONFIG_INSTALL_PREFIX}/consumer.conf - PARENT_SCOPE -) - -set(TRANSPORT_CONFIG - ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf -) - -install( - FILES ${TRANSPORT_CONFIG} - DESTINATION ${CMAKE_INSTALL_FULL_SYSCONFDIR}/hicn - COMPONENT lib${LIBTRANSPORT} -) - -set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) -set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc deleted file mode 100644 index 2f1e5d8fd..000000000 --- a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include -#include -#include - -namespace transport { - -namespace protocol { - -ByteStreamReassembly::ByteStreamReassembly( - interface::ConsumerSocket *icn_socket, - TransportProtocol *transport_protocol) - : Reassembly(icn_socket, transport_protocol), - index_(IndexManager::invalid_index), - download_complete_(false) {} - -void ByteStreamReassembly::reassemble( - std::unique_ptr &&manifest) { - if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) { - received_packets_.emplace( - std::make_pair(manifest->getName().getSuffix(), nullptr)); - assembleContent(); - } -} - -void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { - received_packets_.emplace(std::make_pair( - content_object->getName().getSuffix(), std::move(content_object))); - assembleContent(); - } -} - -void ByteStreamReassembly::assembleContent() { - if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) { - index_ = index_manager_->getNextReassemblySegment(); - if (index_ == IndexManager::invalid_index) { - return; - } - } - - auto it = received_packets_.find((const unsigned int)index_); - while (it != received_packets_.end()) { - // Check if valid packet - if (it->second) { - copyContent(*it->second); - } - - received_packets_.erase(it); - index_ = index_manager_->getNextReassemblySegment(); - it = received_packets_.find((const unsigned int)index_); - } - - if (!download_complete_ && index_ != IndexManager::invalid_index) { - transport_protocol_->onReassemblyFailed(index_); - } -} - -void ByteStreamReassembly::copyContent(const ContentObject &content_object) { - auto a = content_object.getPayload(); - auto payload_length = a->length(); - auto write_size = std::min(payload_length, read_buffer_->tailroom()); - auto additional_bytes = payload_length > read_buffer_->tailroom() - ? payload_length - read_buffer_->tailroom() - : 0; - - std::memcpy(read_buffer_->writableTail(), a->data(), write_size); - read_buffer_->append(write_size); - - if (!read_buffer_->tailroom()) { - notifyApplication(); - std::memcpy(read_buffer_->writableTail(), a->data() + write_size, - additional_bytes); - read_buffer_->append(additional_bytes); - } - - download_complete_ = - index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); - - if (TRANSPORT_EXPECT_FALSE(download_complete_)) { - notifyApplication(); - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::success)); - } -} - -void ByteStreamReassembly::reInitialize() { - index_ = IndexManager::invalid_index; - download_complete_ = false; - - received_packets_.clear(); - - // reset read buffer - interface::ConsumerSocket::ReadCallback *read_callback; - reassembly_consumer_socket_->getSocketOption( - interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); - - read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); -} - -} // namespace protocol - -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h deleted file mode 100644 index 7c77d486f..000000000 --- a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace transport { - -namespace protocol { - -class ByteStreamReassembly : public Reassembly { - public: - ByteStreamReassembly(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport_protocol); - - protected: - virtual void reassemble(core::ContentObject::Ptr &&content_object) override; - - virtual void reassemble( - std::unique_ptr &&manifest) override; - - virtual void copyContent(const core::ContentObject &content_object); - - virtual void reInitialize() override; - - private: - void assembleContent(); - - protected: - // The consumer socket - // std::unique_ptr incremental_index_manager_; - // std::unique_ptr manifest_index_manager_; - // IndexVerificationManager *index_manager_; - std::unordered_map received_packets_; - uint32_t index_; - bool download_complete_; -}; - -} // namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/cbr.cc b/libtransport/src/hicn/transport/protocols/cbr.cc deleted file mode 100644 index 02bc7b5e4..000000000 --- a/libtransport/src/hicn/transport/protocols/cbr.cc +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -CbrTransportProtocol::CbrTransportProtocol( - interface::ConsumerSocket *icnet_socket) - : RaaqmTransportProtocol(icnet_socket) {} - -int CbrTransportProtocol::start() { return RaaqmTransportProtocol::start(); } - -void CbrTransportProtocol::reset() { - RaaqmTransportProtocol::reset(); - socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); -} - -void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {} - -void CbrTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - auto segment = content_object.getName().getSuffix(); - auto now = utils::SteadyClock::now(); - auto rtt = std::chrono::duration_cast( - now - interest_timepoints_[segment & mask]); - // Update stats - updateStats(segment, rtt.count(), now); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/cbr.h b/libtransport/src/hicn/transport/protocols/cbr.h deleted file mode 100644 index e80da14f5..000000000 --- a/libtransport/src/hicn/transport/protocols/cbr.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace transport { - -namespace protocol { - -class CbrTransportProtocol : public RaaqmTransportProtocol { - public: - CbrTransportProtocol(interface::ConsumerSocket *icnet_socket); - - int start() override; - - void reset() override; - - private: - void afterContentReception(const Interest &interest, - const ContentObject &content_object) override; - void afterDataUnsatisfied(uint64_t segment) override; -}; - -} // end namespace protocol - -} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h b/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h deleted file mode 100644 index 36ac6eb17..000000000 --- a/libtransport/src/hicn/transport/protocols/congestion_window_protocol.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -namespace transport { - -namespace protocol { - -class CWindowProtocol { - protected: - virtual void increaseWindow() = 0; - virtual void decreaseWindow() = 0; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/consumer.conf b/libtransport/src/hicn/transport/protocols/consumer.conf deleted file mode 100644 index 1a366f32f..000000000 --- a/libtransport/src/hicn/transport/protocols/consumer.conf +++ /dev/null @@ -1,21 +0,0 @@ -; this file contais the parameters for RAAQM -autotune = no -lifetime = 500 -retransmissions = 128 -beta = 0.99 -drop = 0.003 -beta_wifi_ = 0.99 -drop_wifi_ = 0.6 -beta_lte_ = 0.99 -drop_lte_ = 0.003 -wifi_delay_ = 200 -lte_delay_ = 9000 - -alpha = 0.95 -batching_parameter = 200 - -;Choice of rate estimator: -;0 --> an estimation each $(batching_parameter) packets -;1 --> an estimation "a la TCP", estimation at the end of the download of the segment - -rate_estimator = 0 diff --git a/libtransport/src/hicn/transport/protocols/data_processing_events.h b/libtransport/src/hicn/transport/protocols/data_processing_events.h deleted file mode 100644 index 8975c2b4a..000000000 --- a/libtransport/src/hicn/transport/protocols/data_processing_events.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -namespace transport { -namespace protocol { - -class ContentObjectProcessingEventCallback { - public: - virtual ~ContentObjectProcessingEventCallback() = default; - virtual void onPacketDropped(core::Interest::Ptr &&i, - core::ContentObject::Ptr &&c) = 0; - virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; -}; - -} // namespace protocol -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc deleted file mode 100644 index 7b01ad4bc..000000000 --- a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -namespace transport { - -namespace protocol { - -DatagramReassembly::DatagramReassembly(interface::ConsumerSocket* icn_socket, - TransportProtocol* transport_protocol) - : Reassembly(icn_socket, transport_protocol) {} - -void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) { - read_buffer_ = content_object->getPayload(); - Reassembly::notifyApplication(); -} - -void DatagramReassembly::reInitialize() {} - -} // namespace protocol - -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h deleted file mode 100644 index 923b6f2c1..000000000 --- a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace transport { - -namespace protocol { - -class DatagramReassembly : public Reassembly { - public: - DatagramReassembly(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport_protocol); - - virtual void reassemble(core::ContentObject::Ptr &&content_object) override; - virtual void reInitialize() override; - virtual void reassemble( - std::unique_ptr &&manifest) override { - return; - } -}; - -} // namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/download_observer.h b/libtransport/src/hicn/transport/protocols/download_observer.h deleted file mode 100644 index 6d24fe6fd..000000000 --- a/libtransport/src/hicn/transport/protocols/download_observer.h +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -namespace transport { - -namespace protocol { - -class IcnObserver { - public: - virtual ~IcnObserver(){}; - - virtual void notifyStats(double throughput) = 0; - virtual void notifyDownloadTime(double downloadTime) = 0; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/errors.cc b/libtransport/src/hicn/transport/protocols/errors.cc deleted file mode 100644 index c2249ed4a..000000000 --- a/libtransport/src/hicn/transport/protocols/errors.cc +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -namespace transport { -namespace protocol { - -const std::error_category& protocol_category() { - static protocol_category_impl instance; - - return instance; -} - -const char* protocol_category_impl::name() const throw() { - return "transport::protocol::error"; -} - -std::string protocol_category_impl::message(int ev) const { - switch (static_cast(ev)) { - case protocol_error::success: { - return "Success"; - } - case protocol_error::signature_verification_failed: { - return "Signature verification failed."; - } - case protocol_error::integrity_verification_failed: { - return "Integrity verification failed"; - } - case protocol_error::no_verifier_provided: { - return "Transport cannot get any verifier for the given data."; - } - case protocol_error::io_error: { - return "Conectivity error between transport and local forwarder"; - } - case protocol_error::max_retransmissions_error: { - return "Transport protocol reached max number of retransmissions allowed " - "for the same interest."; - } - case protocol_error::session_aborted: { - return "The session has been aborted by the application."; - } - default: { return "Unknown protocol error"; } - } -} - -} // namespace protocol -} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/errors.h b/libtransport/src/hicn/transport/protocols/errors.h deleted file mode 100644 index cb3d3474e..000000000 --- a/libtransport/src/hicn/transport/protocols/errors.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -namespace transport { -namespace protocol { - -/** - * @brief Get the default server error category. - * @return The default server error category instance. - * - * @warning The first call to this function is thread-safe only starting with - * C++11. - */ -const std::error_category& protocol_category(); - -/** - * The list of errors. - */ -enum class protocol_error { - success = 0, - signature_verification_failed, - integrity_verification_failed, - no_verifier_provided, - io_error, - max_retransmissions_error, - session_aborted, -}; - -/** - * @brief Create an error_code instance for the given error. - * @param error The error. - * @return The error_code instance. - */ -inline std::error_code make_error_code(protocol_error error) { - return std::error_code(static_cast(error), protocol_category()); -} - -/** - * @brief Create an error_condition instance for the given error. - * @param error The error. - * @return The error_condition instance. - */ -inline std::error_condition make_error_condition(protocol_error error) { - return std::error_condition(static_cast(error), protocol_category()); -} - -/** - * @brief A server error category. - */ -class protocol_category_impl : public std::error_category { - public: - /** - * @brief Get the name of the category. - * @return The name of the category. - */ - virtual const char* name() const throw(); - - /** - * @brief Get the error message for a given error. - * @param ev The error numeric value. - * @return The message associated to the error. - */ - virtual std::string message(int ev) const; -}; -} // namespace protocol -} // namespace transport - -namespace std { -// namespace system { -template <> -struct is_error_code_enum<::transport::protocol::protocol_error> - : public std::true_type {}; -// } // namespace system -} // namespace std \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc deleted file mode 100644 index 5a8046daa..000000000 --- a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include - -namespace transport { -namespace protocol { - -void IncrementalIndexer::onContentObject( - core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - using namespace interface; - - if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { - final_suffix_ = content_object->getName().getSuffix(); - } - - auto ret = verification_manager_->onPacketToVerify(*content_object); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - reassembly_->reassemble(std::move(content_object)); - break; - } - case VerificationPolicy::DROP_PACKET: { - transport_protocol_->onPacketDropped(std::move(interest), - std::move(content_object)); - break; - } - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } - } -} - -} // namespace protocol -} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h deleted file mode 100644 index b587a8332..000000000 --- a/libtransport/src/hicn/transport/protocols/incremental_indexer.h +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include -#include -#include -#include - -#include - -namespace transport { - -namespace interface { -class ConsumerSocket; -} - -namespace protocol { - -class Reassembly; -class TransportProtocol; - -class IncrementalIndexer : public Indexer { - public: - IncrementalIndexer(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport, Reassembly *reassembly) - : socket_(icn_socket), - reassembly_(reassembly), - transport_protocol_(transport), - final_suffix_(std::numeric_limits::max()), - first_suffix_(0), - next_download_suffix_(0), - next_reassembly_suffix_(0), - verification_manager_( - std::make_unique(icn_socket)) { - if (reassembly_) { - reassembly_->setIndexer(this); - } - } - - IncrementalIndexer(const IncrementalIndexer &) = delete; - - IncrementalIndexer(IncrementalIndexer &&other) - : socket_(other.socket_), - reassembly_(other.reassembly_), - transport_protocol_(other.transport_protocol_), - final_suffix_(other.final_suffix_), - first_suffix_(other.first_suffix_), - next_download_suffix_(other.next_download_suffix_), - next_reassembly_suffix_(other.next_reassembly_suffix_), - verification_manager_(std::move(other.verification_manager_)) { - if (reassembly_) { - reassembly_->setIndexer(this); - } - } - - /** - * - */ - virtual ~IncrementalIndexer() {} - - TRANSPORT_ALWAYS_INLINE virtual void reset( - std::uint32_t offset = 0) override { - final_suffix_ = std::numeric_limits::max(); - next_download_suffix_ = offset; - next_reassembly_suffix_ = offset; - } - - /** - * Retrieve from the manifest the next suffix to retrieve. - */ - TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override { - return next_download_suffix_ <= final_suffix_ ? next_download_suffix_++ - : IndexManager::invalid_index; - } - - TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix( - uint32_t suffix) override { - first_suffix_ = suffix; - } - - /** - * Retrive the next segment to be reassembled. - */ - TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override { - return next_reassembly_suffix_ <= final_suffix_ - ? next_reassembly_suffix_++ - : IndexManager::invalid_index; - } - - TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override { - return final_suffix_ != std::numeric_limits::max(); - } - - TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override { - return final_suffix_; - } - - void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) override; - - TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) { - reassembly_ = reassembly; - - if (reassembly_) { - reassembly_->setIndexer(this); - } - } - - TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override { - return verification_manager_->onKeyToVerify(); - } - - protected: - interface::ConsumerSocket *socket_; - Reassembly *reassembly_; - TransportProtocol *transport_protocol_; - uint32_t final_suffix_; - uint32_t first_suffix_; - uint32_t next_download_suffix_; - uint32_t next_reassembly_suffix_; - std::unique_ptr verification_manager_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc deleted file mode 100644 index d95c10ff9..000000000 --- a/libtransport/src/hicn/transport/protocols/indexer.cc +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include -#include - -namespace transport { -namespace protocol { - -IndexManager::IndexManager(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport, Reassembly *reassembly) - : indexer_(std::make_unique(icn_socket, transport, - reassembly)), - first_segment_received_(false), - icn_socket_(icn_socket), - transport_(transport), - reassembly_(reassembly) {} - -void IndexManager::onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) { - if (first_segment_received_) { - indexer_->onContentObject(std::move(interest), std::move(content_object)); - } else { - std::uint32_t segment_number = interest->getName().getSuffix(); - - if (segment_number == 0) { - // Check if manifest - if (content_object->getPayloadType() == PayloadType::MANIFEST) { - IncrementalIndexer *indexer = - static_cast(indexer_.release()); - indexer_ = - std::make_unique(std::move(*indexer)); - delete indexer; - } - - indexer_->onContentObject(std::move(interest), std::move(content_object)); - auto it = interest_data_set_.begin(); - while (it != interest_data_set_.end()) { - indexer_->onContentObject(std::move(const_cast(it->first)), std::move(const_cast(it->second))); - it = interest_data_set_.erase(it); - } - - first_segment_received_ = true; - } else { - interest_data_set_.emplace(std::move(interest), std::move(content_object)); - } - } -} - -bool IndexManager::onKeyToVerify() { - return indexer_->onKeyToVerify(); -} - -void IndexManager::reset(std::uint32_t offset) { - indexer_ = std::make_unique(icn_socket_, transport_, - reassembly_); - first_segment_received_ = false; - interest_data_set_.clear(); -} - -} // namespace protocol -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h deleted file mode 100644 index 87cf9b307..000000000 --- a/libtransport/src/hicn/transport/protocols/indexer.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include - -namespace transport { - -namespace interface { -class ConsumerSocket; -} - -namespace protocol { - -class Reassembly; -class TransportProtocol; - -class Indexer { - public: - /** - * - */ - virtual ~Indexer() = default; - /** - * Retrieve from the manifest the next suffix to retrieve. - */ - virtual uint32_t getNextSuffix() = 0; - - virtual void setFirstSuffix(uint32_t suffix) = 0; - - /** - * Retrive the next segment to be reassembled. - */ - virtual uint32_t getNextReassemblySegment() = 0; - - virtual bool isFinalSuffixDiscovered() = 0; - - virtual uint32_t getFinalSuffix() = 0; - - virtual void reset(std::uint32_t offset = 0) = 0; - - virtual void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) = 0; - - virtual bool onKeyToVerify() = 0; -}; - -class IndexManager : Indexer { - public: - static constexpr uint32_t invalid_index = ~0; - - IndexManager(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport, Reassembly *reassembly); - - uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); } - - void setFirstSuffix(uint32_t suffix) override { - indexer_->setFirstSuffix(suffix); - } - - uint32_t getNextReassemblySegment() override { - return indexer_->getNextReassemblySegment(); - } - - bool isFinalSuffixDiscovered() override { - return indexer_->isFinalSuffixDiscovered(); - } - - uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); } - - void reset(std::uint32_t offset = 0) override; - - void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) override; - - bool onKeyToVerify() override; - - private: - std::unique_ptr indexer_; - bool first_segment_received_; - std::set> - interest_data_set_; - interface::ConsumerSocket *icn_socket_; - TransportProtocol *transport_; - Reassembly *reassembly_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc deleted file mode 100644 index 592daa4d4..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -ManifestIncrementalIndexer::ManifestIncrementalIndexer( - interface::ConsumerSocket *icn_socket, TransportProtocol *transport, - Reassembly *reassembly) - : IncrementalIndexer(icn_socket, transport, reassembly), - suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( - NextSegmentCalculationStrategy::INCREMENTAL, - next_download_suffix_, 0)) {} - -void ManifestIncrementalIndexer::onContentObject( - core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - // Check if mainfiest or not - if (content_object->getPayloadType() == PayloadType::MANIFEST) { - onUntrustedManifest(std::move(interest), std::move(content_object)); - } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - onUntrustedContentObject(std::move(interest), std::move(content_object)); - } -} - -void ManifestIncrementalIndexer::onUntrustedManifest( - core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { - auto ret = verification_manager_->onPacketToVerify(*content_object); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - processTrustedManifest(std::move(content_object)); - break; - } - case VerificationPolicy::DROP_PACKET: - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } - } -} - -void ManifestIncrementalIndexer::processTrustedManifest( - ContentObject::Ptr &&content_object) { - auto manifest = - std::make_unique(std::move(*content_object)); - manifest->decode(); - - if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != - core::ManifestVersion::VERSION_1)) { - throw errors::RuntimeException("Received manifest with unknown version."); - } - - switch (manifest->getManifestType()) { - case core::ManifestType::INLINE_MANIFEST: { - auto _it = manifest->getSuffixList().begin(); - auto _end = manifest->getSuffixList().end(); - - suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber()); - - for (; _it != _end; _it++) { - auto hash = - std::make_pair(std::vector(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - - if (!checkUnverifiedSegments(_it->first, hash)) { - suffix_hash_map_[_it->first] = std::move(hash); - } - } - - reassembly_->reassemble(std::move(manifest)); - - break; - } - case core::ManifestType::FLIC_MANIFEST: { - throw errors::NotImplementedException(); - } - case core::ManifestType::FINAL_CHUNK_NUMBER: { - throw errors::NotImplementedException(); - } - } -} - -bool ManifestIncrementalIndexer::checkUnverifiedSegments( - std::uint32_t suffix, const HashEntry &hash) { - auto it = unverified_segments_.find(suffix); - - if (it != unverified_segments_.end()) { - auto ret = verifyContentObject(hash, *it->second.second); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - reassembly_->reassemble(std::move(it->second.second)); - break; - } - case VerificationPolicy::DROP_PACKET: { - transport_protocol_->onPacketDropped(std::move(it->second.first), - std::move(it->second.second)); - break; - } - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } - } - - unverified_segments_.erase(it); - return true; - } - - return false; -} - -VerificationPolicy ManifestIncrementalIndexer::verifyContentObject( - const HashEntry &manifest_hash, const ContentObject &content_object) { - VerificationPolicy ret; - - auto hash_type = static_cast(manifest_hash.second); - auto data_packet_digest = content_object.computeDigest(manifest_hash.second); - auto data_packet_digest_bytes = - data_packet_digest.getDigest().data(); - const std::vector &manifest_digest_bytes = manifest_hash.first; - - if (utils::CryptoHash::compareBinaryDigest( - data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) { - ret = VerificationPolicy::ACCEPT_PACKET; - } else { - ConsumerContentObjectVerificationFailedCallback - *verification_failed_callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, - &verification_failed_callback); - ret = (*verification_failed_callback)( - *socket_, content_object, - make_error_code(protocol_error::integrity_verification_failed)); - } - - return ret; -} - -void ManifestIncrementalIndexer::onUntrustedContentObject( - Interest::Ptr &&i, ContentObject::Ptr &&c) { - auto suffix = c->getName().getSuffix(); - auto it = suffix_hash_map_.find(suffix); - - if (it != suffix_hash_map_.end()) { - auto ret = verifyContentObject(it->second, *c); - - switch (ret) { - case VerificationPolicy::ACCEPT_PACKET: { - suffix_hash_map_.erase(it); - reassembly_->reassemble(std::move(c)); - break; - } - case VerificationPolicy::DROP_PACKET: { - transport_protocol_->onPacketDropped(std::move(i), std::move(c)); - break; - } - case VerificationPolicy::ABORT_SESSION: { - transport_protocol_->onContentReassembled( - make_error_code(protocol_error::session_aborted)); - break; - } - } - } else { - unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c)); - } -} - -uint32_t ManifestIncrementalIndexer::getNextSuffix() { - auto ret = suffix_strategy_->getNextSuffix(); - - if (ret <= suffix_strategy_->getFinalSuffix() && - ret != utils::SuffixStrategy::INVALID_SUFFIX) { - suffix_queue_.push(ret); - return ret; - } - - return IndexManager::invalid_index; -} - -uint32_t ManifestIncrementalIndexer::getFinalSuffix() { - return suffix_strategy_->getFinalSuffix(); -} - -bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() { - return IncrementalIndexer::isFinalSuffixDiscovered(); -} - -uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() { - if (suffix_queue_.empty()) { - return IndexManager::invalid_index; - } - - auto ret = suffix_queue_.front(); - suffix_queue_.pop(); - return ret; -} - -void ManifestIncrementalIndexer::reset(std::uint32_t offset) { - IncrementalIndexer::reset(offset); - suffix_hash_map_.clear(); - unverified_segments_.clear(); - SuffixQueue empty; - std::swap(suffix_queue_, empty); - suffix_strategy_->reset(offset); -} - -} // namespace protocol - -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h deleted file mode 100644 index 6e991f86f..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include - -namespace transport { - -namespace protocol { - -class ManifestIncrementalIndexer : public IncrementalIndexer { - static constexpr double alpha = 0.3; - - public: - using SuffixQueue = std::queue; - using HashEntry = std::pair, core::HashAlgorithm>; - - ManifestIncrementalIndexer(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport, Reassembly *reassembly); - - ManifestIncrementalIndexer(IncrementalIndexer &&indexer) - : IncrementalIndexer(std::move(indexer)), - suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( - core::NextSegmentCalculationStrategy::INCREMENTAL, - next_download_suffix_, 0)) { - for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) { - suffix_queue_.push(i); - } - } - - virtual ~ManifestIncrementalIndexer() = default; - - void reset(std::uint32_t offset = 0) override; - - void onContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object) override; - - uint32_t getNextSuffix() override; - - uint32_t getNextReassemblySegment() override; - - bool isFinalSuffixDiscovered() override; - - uint32_t getFinalSuffix() override; - - private: - void onUntrustedManifest(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object); - void onUntrustedContentObject(core::Interest::Ptr &&interest, - core::ContentObject::Ptr &&content_object); - void processTrustedManifest(core::ContentObject::Ptr &&content_object); - void onManifestReceived(core::Interest::Ptr &&i, - core::ContentObject::Ptr &&c); - void onManifestTimeout(core::Interest::Ptr &&i); - VerificationPolicy verifyContentObject( - const HashEntry &manifest_hash, - const core::ContentObject &content_object); - bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash); - - protected: - std::unique_ptr suffix_strategy_; - SuffixQueue suffix_queue_; - - // Hash verification - std::unordered_map suffix_hash_map_; - - std::unordered_map> - unverified_segments_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc deleted file mode 100644 index 5bf9c89f7..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -ManifestIndexManager::ManifestIndexManager( - interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) - : IncrementalIndexManager(icn_socket), - PacketManager(1024), - next_to_retrieve_segment_(suffix_queue_.end()), - suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), - next_reassembly_segment_( - core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true), - ignored_segments_(), - next_interest_(next_interest) {} - -bool ManifestIndexManager::onManifest( - core::ContentObject::Ptr &&content_object) { - auto manifest = - std::make_unique(std::move(*content_object)); - bool manifest_verified = verification_manager_->onPacketToVerify(*manifest); - - if (manifest_verified) { - manifest->decode(); - - if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != - core::ManifestVersion::VERSION_1)) { - throw errors::RuntimeException("Received manifest with unknown version."); - } - - switch (manifest->getManifestType()) { - case core::ManifestType::INLINE_MANIFEST: { - auto _it = manifest->getSuffixList().begin(); - auto _end = manifest->getSuffixList().end(); - size_t nb_segments = std::distance(_it, _end); - final_suffix_ = manifest->getFinalBlockNumber(); // final block number - - TRANSPORT_LOGD("Received manifest %u", - manifest->getWritableName().getSuffix()); - suffix_hash_map_[_it->first] = - std::make_pair(std::vector(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push_back(_it->first); - - // If the transport protocol finished the list of segments to retrieve, - // reset the next_to_retrieve_segment_ iterator to the next segment - // provided by this manifest. - if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == - suffix_queue_.end())) { - next_to_retrieve_segment_ = --suffix_queue_.end(); - } - - std::advance(_it, 1); - for (; _it != _end; _it++) { - suffix_hash_map_[_it->first] = std::make_pair( - std::vector(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push_back(_it->first); - } - - if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { - core::NextSegmentCalculationStrategy strategy = - manifest->getNextSegmentCalculationStrategy(); - - suffix_manifest_.reset(0); - suffix_manifest_.setNbSegments(nb_segments); - suffix_manifest_.setSuffixStrategy(strategy); - TRANSPORT_LOGD("Capacity of 1st manifest %zu", - suffix_manifest_.getNbSegments()); - - next_reassembly_segment_.reset(*suffix_queue_.begin()); - next_reassembly_segment_.setNbSegments(nb_segments); - suffix_manifest_.setSuffixStrategy(strategy); - } - - // If the manifest is not full, we add the suffixes of missing segments - // to the list of segments to ignore when computing the next reassembly - // index. - if (TRANSPORT_EXPECT_FALSE( - suffix_manifest_.getNbSegments() - nb_segments > 0)) { - auto start = manifest->getSuffixList().begin(); - auto last = --_end; - for (uint32_t i = last->first + 1; - i < start->first + suffix_manifest_.getNbSegments(); i++) { - ignored_segments_.push_back(i); - } - } - - if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) { - fillWindow(manifest->getWritableName(), - manifest->getName().getSuffix()); - } - - break; - } - case core::ManifestType::FLIC_MANIFEST: { - throw errors::NotImplementedException(); - } - case core::ManifestType::FINAL_CHUNK_NUMBER: { - throw errors::NotImplementedException(); - } - } - } - - return manifest_verified; -} - -void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, - ContentObject::Ptr &&c) { - onManifest(std::move(c)); - if (next_interest_) { - next_interest_->scheduleNextInterests(); - } -} - -void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) { - const Name &n = i->getName(); - uint32_t segment = n.getSuffix(); - - if (segment > final_suffix_) { - return; - } - - TRANSPORT_LOGD("Timeout on manifest %u", segment); - // Get portal - std::shared_ptr portal; - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - - // Send requests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(i), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); -} - -void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) { - /* Send as many manifest as required for filling window. */ - uint32_t interest_lifetime; - double window_size; - std::shared_ptr portal; - Interest::Ptr interest; - uint32_t current_segment = *next_to_retrieve_segment_; - // suffix_manifest_ now points to the next manifest to request - uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix(); - - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - window_size); - - if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) { - suffix_manifest_.updateSuffix(last_requested_manifest); - return; - } - - if (current_segment + window_size < suffix_manifest_.getSuffix() && - current_manifest != last_requested_manifest) { - suffix_manifest_.updateSuffix(last_requested_manifest); - return; - } - - do { - interest = getPacket(); - name.setSuffix(suffix_manifest_.getSuffix()); - interest->setName(name); - interest->setLifetime(interest_lifetime); - - // Send interests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(interest), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); - TRANSPORT_LOGD("Send manifest interest %u", name.getSuffix()); - - last_requested_manifest = (suffix_manifest_++).getSuffix(); - } while (current_segment + window_size >= suffix_manifest_.getSuffix() && - suffix_manifest_.getSuffix() < final_suffix_); - - // suffix_manifest_ now points to the last requested manifest - suffix_manifest_.updateSuffix(last_requested_manifest); -} - -bool ManifestIndexManager::onContentObject( - const core::ContentObject &content_object) { - bool verify_signature; - socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, - verify_signature); - - if (!verify_signature) { - return true; - } - - uint64_t segment = content_object.getName().getSuffix(); - - bool ret = false; - - auto it = suffix_hash_map_.find((const unsigned int)segment); - if (it != suffix_hash_map_.end()) { - auto hash_type = static_cast(it->second.second); - auto data_packet_digest = content_object.computeDigest(it->second.second); - auto data_packet_digest_bytes = - data_packet_digest.getDigest().data(); - std::vector &manifest_digest_bytes = it->second.first; - - if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, - manifest_digest_bytes.data(), - hash_type)) { - suffix_hash_map_.erase(it); - ret = true; - } else { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - } - - return ret; -} - -uint32_t ManifestIndexManager::getNextSuffix() { - if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == - suffix_queue_.end())) { - return invalid_index; - } - - return *next_to_retrieve_segment_++; -} - -uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; } - -bool ManifestIndexManager::isFinalSuffixDiscovered() { - return IncrementalIndexManager::isFinalSuffixDiscovered(); -} - -uint32_t ManifestIndexManager::getNextReassemblySegment() { - uint32_t current_reassembly_segment; - - while (true) { - current_reassembly_segment = next_reassembly_segment_.getSuffix(); - next_reassembly_segment_++; - - if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) { - return invalid_index; - } - - if (ignored_segments_.empty()) break; - - auto is_ignored = - std::find(ignored_segments_.begin(), ignored_segments_.end(), - current_reassembly_segment); - - if (is_ignored == ignored_segments_.end()) break; - - ignored_segments_.erase(is_ignored); - } - - return current_reassembly_segment; -} - -void ManifestIndexManager::reset() { - IncrementalIndexManager::reset(); - suffix_manifest_.reset(0); - suffix_queue_.clear(); - suffix_hash_map_.clear(); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/packet_manager.h b/libtransport/src/hicn/transport/protocols/packet_manager.h deleted file mode 100644 index 4d4011ecf..000000000 --- a/libtransport/src/hicn/transport/protocols/packet_manager.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace transport { - -namespace protocol { - -using namespace core; - -template -class PacketManager { - static_assert(std::is_base_of::value, - "The packet manager support just Interest and Data."); - - static constexpr std::size_t packet_pool_size = 4096; - - public: - PacketManager(std::size_t size = packet_pool_size) : size_(0) { - // Create pool of interests - increasePoolSize(size); - } - - TRANSPORT_ALWAYS_INLINE void increasePoolSize(std::size_t size) { - for (std::size_t i = 0; i < size; i++) { - interest_pool_.add(new PacketType()); - } - - size_ += size; - } - - TRANSPORT_ALWAYS_INLINE typename PacketType::Ptr getPacket() { - auto result = interest_pool_.get(); - - while (TRANSPORT_EXPECT_FALSE(!result.first)) { - // Add packets to the pool - increasePoolSize(size_); - result = interest_pool_.get(); - } - - result.second->resetPayload(); - return std::move(result.second); - } - - private: - utils::ObjectPool interest_pool_; - std::size_t size_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc deleted file mode 100644 index db461a66f..000000000 --- a/libtransport/src/hicn/transport/protocols/protocol.cc +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket, - Reassembly *reassembly_protocol) - : socket_(icn_socket), - reassembly_protocol_(reassembly_protocol), - index_manager_( - std::make_unique(socket_, this, reassembly_protocol)), - is_running_(false), - is_first_(false) { - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); - socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); -} - -int TransportProtocol::start() { - // If the protocol is already running, return otherwise set as running - if (is_running_) return -1; - - // Reset the protocol state machine - reset(); - - // Set it is the first time we schedule an interest - is_first_ = true; - - // Schedule next interests - scheduleNextInterests(); - - is_first_ = false; - - // Set the protocol as running - is_running_ = true; - - // Start Event loop - portal_->runEventsLoop(); - - // Not running anymore - is_running_ = false; - - return 0; -} - -void TransportProtocol::stop() { - is_running_ = false; - portal_->stopEventsLoop(); -} - -void TransportProtocol::resume() { - if (is_running_) return; - - is_running_ = true; - - scheduleNextInterests(); - - portal_->runEventsLoop(); - - is_running_ = false; -} - -void TransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); - - if (!on_payload) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before " - "starting " - "the content retrieval."); - } - - if (!ec) { - on_payload->readSuccess(stats_->getBytesRecv()); - } else { - on_payload->readError(ec); - } - - stop(); - - on_payload->afterRead(); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h deleted file mode 100644 index 4897da902..000000000 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace transport { - -namespace protocol { - -using namespace core; - -class IndexVerificationManager; - -class TransportProtocolCallback { - virtual void onContentObject(const core::Interest &interest, - const core::ContentObject &content_object) = 0; - virtual void onTimeout(const core::Interest &interest) = 0; -}; - -class TransportProtocol : public interface::BasePortal::ConsumerCallback, - public PacketManager, - public ContentObjectProcessingEventCallback { - static constexpr std::size_t interest_pool_size = 4096; - - friend class ManifestIndexManager; - - public: - TransportProtocol(interface::ConsumerSocket *icn_socket, - Reassembly *reassembly_protocol); - - virtual ~TransportProtocol() = default; - - TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } - - virtual int start(); - - virtual void stop(); - - virtual void resume(); - - virtual bool verifyKeyPackets() = 0; - - virtual void scheduleNextInterests() = 0; - - // Events generated by the indexing - virtual void onContentReassembled(std::error_code ec); - virtual void onPacketDropped(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object) = 0; - virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; - - protected: - // Consumer Callback - virtual void reset() = 0; - virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; - virtual void onTimeout(Interest::Ptr &&i) = 0; - - protected: - interface::ConsumerSocket *socket_; - std::unique_ptr reassembly_protocol_; - std::unique_ptr index_manager_; - std::shared_ptr portal_; - std::atomic is_running_; - // True if it si the first time we schedule an interest - std::atomic is_first_; - TransportStatistics *stats_; -}; - -} // end namespace protocol -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc deleted file mode 100644 index 984470edb..000000000 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ /dev/null @@ -1,712 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket) - : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), - current_window_size_(1), - interests_in_flight_(0), - cur_path_(nullptr), - t0_(utils::SteadyClock::now()), - rate_estimator_(nullptr) { - init(); -} - -RaaqmTransportProtocol::~RaaqmTransportProtocol() { - if (rate_estimator_) { - delete rate_estimator_; - } -} - -int RaaqmTransportProtocol::start() { - if (rate_estimator_) { - rate_estimator_->onStart(); - } - - if (!cur_path_) { - // RAAQM - double drop_factor; - double minimum_drop_probability; - uint32_t sample_number; - uint32_t interest_lifetime; - - socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); - socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, - minimum_drop_probability); - socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, - sample_number); - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - - // Rate Estimation - double alpha = 0.0; - uint32_t batching_param = 0; - uint32_t choice_param = 0; - socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, - alpha); - socket_->getSocketOption( - RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param); - socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, - choice_param); - - if (choice_param == 1) { - rate_estimator_ = new ALaTcpEstimator(); - } else { - rate_estimator_ = new SimpleEstimator(alpha, batching_param); - } - - socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, - &rate_estimator_->observer_); - - // Current path - auto cur_path = std::make_unique( - drop_factor, minimum_drop_probability, interest_lifetime * 1000, - sample_number); - cur_path_ = cur_path.get(); - path_table_[default_values::path_id] = std::move(cur_path); - } - - portal_->setConsumerCallback(this); - return TransportProtocol::start(); -} - -void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } - -void RaaqmTransportProtocol::reset() { - // Set first segment to retrieve - core::Name *name; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); - index_manager_->reset(); - index_manager_->setFirstSuffix(name->getSuffix()); - std::queue empty; - std::swap(interest_to_retransmit_, empty); - stats_->reset(); - - // Reset reassembly component - reassembly_protocol_->reInitialize(); - - // Reset protocol variables - interests_in_flight_ = 0; - t0_ = utils::SteadyClock::now(); -} - -bool RaaqmTransportProtocol::verifyKeyPackets() { - return index_manager_->onKeyToVerify(); -} - -void RaaqmTransportProtocol::increaseWindow() { - // return; - double max_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, - max_window_size); - if (current_window_size_ < max_window_size) { - double gamma = 0.; - socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); - - current_window_size_ += gamma / current_window_size_; - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowIncrease(current_window_size_); -} - -void RaaqmTransportProtocol::decreaseWindow() { - // return; - double min_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, - min_window_size); - if (current_window_size_ > min_window_size) { - double beta = 0.; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - - current_window_size_ = current_window_size_ * beta; - if (current_window_size_ < min_window_size) { - current_window_size_ = min_window_size; - } - - socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size_); - } - rate_estimator_->onWindowDecrease(current_window_size_); -} - -void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { - // Decrease the window because the timeout happened - decreaseWindow(); -} - -void RaaqmTransportProtocol::afterContentReception( - const Interest &interest, const ContentObject &content_object) { - updatePathTable(content_object); - increaseWindow(); - updateRtt(interest.getName().getSuffix()); - rate_estimator_->onDataReceived((int)content_object.payloadSize() + - (int)content_object.headerSize()); - // Set drop probablility and window size accordingly - RAAQM(); -} - -void RaaqmTransportProtocol::init() { - std::ifstream is(RAAQM_CONFIG_PATH); - - std::string line; - raaqm_autotune_ = false; - default_beta_ = default_values::beta_value; - default_drop_ = default_values::drop_factor; - beta_wifi_ = default_values::beta_value; - drop_wifi_ = default_values::drop_factor; - beta_lte_ = default_values::beta_value; - drop_lte_ = default_values::drop_factor; - wifi_delay_ = 1000; - lte_delay_ = 15000; - - if (!is) { - TRANSPORT_LOGW( - "WARNING: RAAQM parameters not found at %s, set default values", - RAAQM_CONFIG_PATH); - return; - } - - while (getline(is, line)) { - std::string command; - std::istringstream line_s(line); - - line_s >> command; - - if (command == ";") { - continue; - } - - if (command == "autotune") { - std::string tmp; - std::string val; - line_s >> tmp >> val; - if (val == "yes") { - raaqm_autotune_ = true; - } else { - raaqm_autotune_ = false; - } - continue; - } - - if (command == "lifetime") { - std::string tmp; - uint32_t lifetime; - line_s >> tmp >> lifetime; - socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - lifetime); - continue; - } - - if (command == "retransmissions") { - std::string tmp; - uint32_t rtx; - line_s >> tmp >> rtx; - socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx); - continue; - } - - if (command == "beta") { - std::string tmp; - line_s >> tmp >> default_beta_; - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - default_beta_); - continue; - } - - if (command == "drop") { - std::string tmp; - line_s >> tmp >> default_drop_; - socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - default_drop_); - continue; - } - - if (command == "beta_wifi_") { - std::string tmp; - line_s >> tmp >> beta_wifi_; - continue; - } - - if (command == "drop_wifi_") { - std::string tmp; - line_s >> tmp >> drop_wifi_; - continue; - } - - if (command == "beta_lte_") { - std::string tmp; - line_s >> tmp >> beta_lte_; - continue; - } - - if (command == "drop_lte_") { - std::string tmp; - line_s >> tmp >> drop_lte_; - continue; - } - - if (command == "wifi_delay_") { - std::string tmp; - line_s >> tmp >> wifi_delay_; - continue; - } - - if (command == "lte_delay_") { - std::string tmp; - line_s >> tmp >> lte_delay_; - continue; - } - if (command == "alpha") { - std::string tmp; - double rate_alpha = 0.0; - line_s >> tmp >> rate_alpha; - socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, - rate_alpha); - continue; - } - - if (command == "batching_parameter") { - std::string tmp; - uint32_t batching_param = 0; - line_s >> tmp >> batching_param; - socket_->setSocketOption( - RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, - batching_param); - continue; - } - - if (command == "rate_estimator") { - std::string tmp; - uint32_t choice_param = 0; - line_s >> tmp >> choice_param; - socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, - choice_param); - continue; - } - } - - is.close(); -} - -void RaaqmTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // Check whether makes sense to continue - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - // Call application-defined callbacks - ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_, *content_object); - } - - ConsumerInterestCallback *callback_interest = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, - &callback_interest); - if (*callback_interest) { - (*callback_interest)(*socket_, *interest); - } - - if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - stats_->updateBytesRecv(content_object->payloadSize()); - } - - onContentSegment(std::move(interest), std::move(content_object)); - scheduleNextInterests(); -} - -void RaaqmTransportProtocol::onContentSegment( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - - // Decrease in-flight interests - interests_in_flight_--; - - // Update stats - if (!interest_retransmissions_[incremental_suffix & mask]) { - afterContentReception(*interest, *content_object); - } - - index_manager_->onContentObject(std::move(interest), - std::move(content_object)); -} - -void RaaqmTransportProtocol::onPacketDropped( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t max_rtx = 0; - socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - - uint64_t segment = interest->getName().getSuffix(); - ConsumerInterestCallback *callback = VOID_HANDLER; - if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < - max_rtx)) { - stats_->updateRetxCount(1); - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - if (!is_running_) { - return; - } - - interest_retransmissions_[segment & mask]++; - - interest_to_retransmit_.push(std::move(interest)); - } else { - TRANSPORT_LOGE( - "Stop: received not trusted packet %llu times", - (unsigned long long)interest_retransmissions_[segment & mask]); - onContentReassembled( - make_error_code(protocol_error::max_retransmissions_error)); - } -} - -void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { - -} - -void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { - checkForStalePaths(); - - const Name &n = interest->getName(); - - TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); - - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { - return; - } - - interests_in_flight_--; - - uint64_t segment = n.getSuffix(); - - // Do not retransmit interests asking contents that do not exist. - if (segment > index_manager_->getFinalSuffix()) { - return; - } - - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - afterDataUnsatisfied(segment); - - uint32_t max_rtx = 0; - socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - - if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < - max_rtx)) { - stats_->updateRetxCount(1); - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - (*callback)(*socket_, *interest); - } - - if (!is_running_) { - return; - } - - interest_retransmissions_[segment & mask]++; - - interest_to_retransmit_.push(std::move(interest)); - - scheduleNextInterests(); - } else { - TRANSPORT_LOGE("Stop: reached max retx limit."); - onContentReassembled(std::make_error_code(std::errc(std::errc::io_error))); - } -} - -void RaaqmTransportProtocol::scheduleNextInterests() { - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ && - interest_to_retransmit_.size() > 0)) { - // send at least one interest if there are retransmissions to perform and - // there is no space left in the window - sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Window full, retransmit one content interest"); - interest_to_retransmit_.pop(); - } - - uint32_t index = IndexManager::invalid_index; - - // Send the interest needed for filling the window - while (interests_in_flight_ < current_window_size_) { - if (interest_to_retransmit_.size() > 0) { - sendInterest(std::move(interest_to_retransmit_.front())); - TRANSPORT_LOGD("Retransmit content interest"); - interest_to_retransmit_.pop(); - } else { - index = index_manager_->getNextSuffix(); - if (index == IndexManager::invalid_index) { - TRANSPORT_LOGE("INVALID INDEX %d", index); - break; - } - - sendInterest(index); - TRANSPORT_LOGD("Send content interest %u", index); - } - } -} - -void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { - auto interest = getPacket(); - core::Name *name; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); - name->setSuffix((uint32_t)next_suffix); - interest->setName(*name); - - uint32_t interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - interest->setLifetime(interest_lifetime); - - ConsumerInterestCallback *callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &callback); - if (*callback) { - callback->operator()(*socket_, *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - // This is set to ~0 so that the next interest_retransmissions_ + 1, - // performed by sendInterest, will result in 0 - interest_retransmissions_[next_suffix & mask] = ~0; - interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); - sendInterest(std::move(interest)); -} - -void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { - interests_in_flight_++; - interest_retransmissions_[interest->getName().getSuffix() & mask]++; - - portal_->sendInterest(std::move(interest)); -} - -void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - rate_estimator_->onDownloadFinished(); - TransportProtocol::onContentReassembled(ec); -} - -void RaaqmTransportProtocol::updateRtt(uint64_t segment) { - if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { - throw std::runtime_error("RAAQM ERROR: no current path found, exit"); - } else { - auto now = utils::SteadyClock::now(); - utils::Microseconds rtt = std::chrono::duration_cast( - now - interest_timepoints_[segment & mask]); - - // Update stats - updateStats((uint32_t)segment, rtt.count(), now); - - if (rate_estimator_) { - rate_estimator_->onRttUpdate((double)rtt.count()); - } - - cur_path_->insertNewRtt(rtt.count()); - cur_path_->smoothTimer(); - - if (cur_path_->newPropagationDelayAvailable()) { - checkDropProbability(); - } - } -} - -void RaaqmTransportProtocol::RAAQM() { - if (!cur_path_) { - throw errors::RuntimeException("ERROR: no current path found, exit"); - exit(EXIT_FAILURE); - } else { - // Change drop probability according to RTT statistics - cur_path_->updateDropProb(); - - double coin = ((double)rand() / (RAND_MAX)); - if (coin <= cur_path_->getDropProb()) { - decreaseWindow(); - } - } -} - -void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now) { - // Update RTT statistics - stats_->updateAverageRtt(rtt); - stats_->updateAverageWindowSize(current_window_size_); - - // Call statistics callback - ConsumerTimerCallback *stats_callback = VOID_HANDLER; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { - auto dt = std::chrono::duration_cast(now - t0_); - - uint32_t timer_interval_milliseconds = 0; - socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, - timer_interval_milliseconds); - if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_, *stats_); - t0_ = utils::SteadyClock::now(); - } - } -} - -void RaaqmTransportProtocol::updatePathTable( - const ContentObject &content_object) { - uint32_t path_id = content_object.getPathLabel(); - - if (path_table_.find(path_id) == path_table_.end()) { - if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { - // Create a new path with some default param - - if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { - throw errors::RuntimeException( - "[RAAQM] No path initialized for path table, error could be in " - "default path initialization."); - } - - // Initiate the new path default param - auto new_path = std::make_unique( - *(path_table_.at(default_values::path_id))); - - // Insert the new path into hash table - path_table_[path_id] = std::move(new_path); - } else { - throw errors::RuntimeException( - "UNEXPECTED ERROR: when running,current path not found."); - } - } - - cur_path_ = path_table_[path_id].get(); - - size_t header_size = content_object.headerSize(); - size_t data_size = content_object.payloadSize(); - - // Update measurements for path - cur_path_->updateReceivedStats(header_size + data_size, data_size); -} - -void RaaqmTransportProtocol::checkDropProbability() { - if (!raaqm_autotune_) { - return; - } - - unsigned int max_pd = 0; - PathTable::iterator it; - for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->getPropagationDelay() > max_pd && - it->second->getPropagationDelay() != UINT_MAX && - !it->second->isStale()) { - max_pd = it->second->getPropagationDelay(); - } - } - - double drop_prob = 0; - double beta = 0; - if (max_pd < wifi_delay_) { // only ethernet paths - drop_prob = default_drop_; - beta = default_beta_; - } else if (max_pd < lte_delay_) { // at least one wifi path - drop_prob = drop_wifi_; - beta = beta_wifi_; - } else { // at least one lte path - drop_prob = drop_lte_; - beta = beta_lte_; - } - - double old_drop_prob = 0; - double old_beta = 0; - socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); - socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); - - if (drop_prob == old_drop_prob && beta == old_beta) { - return; - } - - socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); - socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); - - for (it = path_table_.begin(); it != path_table_.end(); it++) { - it->second->setDropProb(drop_prob); - } -} - -void RaaqmTransportProtocol::checkForStalePaths() { - if (!raaqm_autotune_) { - return; - } - - bool stale = false; - PathTable::iterator it; - for (it = path_table_.begin(); it != path_table_.end(); ++it) { - if (it->second->isStale()) { - stale = true; - break; - } - } - if (stale) { - checkDropProbability(); - } -} - -} // end namespace protocol - -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h deleted file mode 100644 index f2d819ec5..000000000 --- a/libtransport/src/hicn/transport/protocols/raaqm.h +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace transport { - -namespace protocol { - -class RaaqmTransportProtocol : public TransportProtocol, - public CWindowProtocol { - public: - RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket); - - ~RaaqmTransportProtocol(); - - int start() override; - - void resume() override; - - void reset() override; - - virtual bool verifyKeyPackets() override; - - protected: - static constexpr uint32_t buffer_size = - 1 << interface::default_values::log_2_default_buffer_size; - static constexpr uint16_t mask = buffer_size - 1; - using PathTable = - std::unordered_map>; - - void increaseWindow() override; - void decreaseWindow() override; - - virtual void afterContentReception(const Interest &interest, - const ContentObject &content_object); - virtual void afterDataUnsatisfied(uint64_t segment); - - virtual void updateStats(uint32_t suffix, uint64_t rtt, - utils::TimePoint &now); - - private: - void init(); - - void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override; - - void onContentSegment(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object); - - void onPacketDropped(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object) override; - - void onReassemblyFailed(std::uint32_t missing_segment) override; - - void onTimeout(Interest::Ptr &&i) override; - - virtual void scheduleNextInterests() override; - - void sendInterest(std::uint64_t next_suffix); - - void sendInterest(Interest::Ptr &&interest); - - void onContentReassembled(std::error_code ec) override; - - void updateRtt(uint64_t segment); - - void RAAQM(); - - void updatePathTable(const ContentObject &content_object); - - void checkDropProbability(); - - void checkForStalePaths(); - - void printRtt(); - - protected: - // Congestion window management - double current_window_size_; - // Protocol management - uint64_t interests_in_flight_; - std::array interest_retransmissions_; - std::array interest_timepoints_; - std::queue interest_to_retransmit_; - - private: - /** - * Current download path - */ - RaaqmDataPath *cur_path_; - - /** - * Hash table for path: each entry is a pair path ID(key) - path object - */ - PathTable path_table_; - - // TimePoints for statistic - utils::TimePoint t0_; - - bool set_interest_filter_; - - // for rate-estimation at packet level - IcnRateEstimator *rate_estimator_; - - // params for autotuning - bool raaqm_autotune_; - double default_beta_; - double default_drop_; - double beta_wifi_; - double drop_wifi_; - double beta_lte_; - double drop_lte_; - unsigned int wifi_delay_; - unsigned int lte_delay_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc b/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc deleted file mode 100644 index e25646205..000000000 --- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.cc +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -namespace transport { - -namespace protocol { - -RaaqmDataPath::RaaqmDataPath(double drop_factor, - double minimum_drop_probability, - unsigned new_timer, unsigned int samples, - uint64_t new_rtt, uint64_t new_rtt_min, - uint64_t new_rtt_max, unsigned new_pd) - - : drop_factor_(drop_factor), - minimum_drop_probability_(minimum_drop_probability), - timer_(new_timer), - samples_(samples), - rtt_(new_rtt), - rtt_min_(new_rtt_min), - rtt_max_(new_rtt_max), - prop_delay_(new_pd), - new_prop_delay_(false), - drop_prob_(0), - packets_received_(0), - last_packets_received_(0), - m_packets_bytes_received_(0), - last_packets_bytes_received_(0), - raw_data_bytes_received_(0), - last_raw_data_bytes_received_(0), - rtt_samples_(samples_), - last_received_pkt_(utils::SteadyClock::now()), - average_rtt_(0), - alpha_(ALPHA) {} - -RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { - rtt_ = new_rtt; - rtt_samples_.pushBack(new_rtt); - - rtt_max_ = rtt_samples_.rBegin(); - rtt_min_ = rtt_samples_.begin(); - - if (rtt_min_ < prop_delay_) { - new_prop_delay_ = true; - prop_delay_ = rtt_min_; - } - - last_received_pkt_ = utils::SteadyClock::now(); - - return *this; -} - -RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, - std::size_t data_size) { - packets_received_++; - m_packets_bytes_received_ += packet_size; - raw_data_bytes_received_ += data_size; - - return *this; -} - -double RaaqmDataPath::getDropFactor() { return drop_factor_; } - -double RaaqmDataPath::getDropProb() { return drop_prob_; } - -RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) { - drop_prob_ = dropProb; - - return *this; -} - -double RaaqmDataPath::getMinimumDropProbability() { - return minimum_drop_probability_; -} - -double RaaqmDataPath::getTimer() { return timer_; } - -RaaqmDataPath &RaaqmDataPath::smoothTimer() { - timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ + - (TIMEOUT_SMOOTHER)*rtt_ * (TIMEOUT_RATIO); - - return *this; -} - -double RaaqmDataPath::getRtt() { return (double)rtt_; } - -double RaaqmDataPath::getAverageRtt() { return average_rtt_; } - -double RaaqmDataPath::getRttMax() { return (double)rtt_max_; } - -double RaaqmDataPath::getRttMin() { return (double)rtt_min_; } - -unsigned RaaqmDataPath::getSampleValue() { return samples_; } - -unsigned RaaqmDataPath::getRttQueueSize() { - return static_cast(rtt_samples_.size()); -} - -RaaqmDataPath &RaaqmDataPath::updateDropProb() { - drop_prob_ = 0.0; - - if (getSampleValue() == getRttQueueSize()) { - if (rtt_max_ == rtt_min_) { - drop_prob_ = minimum_drop_probability_; - } else { - drop_prob_ = minimum_drop_probability_ + - drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_); - } - } - - return *this; -} - -void RaaqmDataPath::setAlpha(double alpha) { - if (alpha >= 0 && alpha <= 1) { - alpha_ = alpha; - } -} - -bool RaaqmDataPath::newPropagationDelayAvailable() { - bool r = new_prop_delay_; - new_prop_delay_ = false; - return r; -} - -unsigned int RaaqmDataPath::getPropagationDelay() { - return (unsigned int)prop_delay_; -} - -bool RaaqmDataPath::isStale() { - utils::TimePoint now = utils::SteadyClock::now(); - auto time = - std::chrono::duration_cast(now - last_received_pkt_) - .count(); - if (time > 2000000) { - return true; - } - return false; -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h b/libtransport/src/hicn/transport/protocols/raaqm_data_path.h deleted file mode 100644 index 9e4accfa5..000000000 --- a/libtransport/src/hicn/transport/protocols/raaqm_data_path.h +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include -#include -#include - -#define TIMEOUT_SMOOTHER 0.1 -#define TIMEOUT_RATIO 10 -#define ALPHA 0.8 - -namespace transport { - -namespace protocol { - -class RaaqmDataPath { - public: - RaaqmDataPath(double drop_factor, double minimum_drop_probability, - unsigned new_timer, unsigned int samples, - uint64_t new_rtt = 1000, uint64_t new_rtt_min = 1000, - uint64_t new_rtt_max = 1000, unsigned new_pd = UINT_MAX); - - public: - /* - * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is - * full, and thus need overwrite. Also it maintains the validity of min and - * max of RTT. - * @param new_rtt is the value of the new RTT - */ - RaaqmDataPath &insertNewRtt(uint64_t new_rtt); - - /** - * @brief Update the path statistics - * @param packet_size the size of the packet received, including the ICN - * header - * @param data_size the size of the data received, without the ICN header - */ - RaaqmDataPath &updateReceivedStats(std::size_t packet_size, - std::size_t data_size); - - /** - * @brief Get the value of the drop factor parameter - */ - double getDropFactor(); - - /** - * @brief Get the value of the drop probability - */ - double getDropProb(); - - /** - * @brief Set the value pf the drop probability - * @param drop_prob is the value of the drop probability - */ - RaaqmDataPath &setDropProb(double drop_prob); - - /** - * @brief Get the minimum drop probability - */ - double getMinimumDropProbability(); - - /** - * @brief Get last RTT - */ - double getRtt(); - - /** - * @brief Get average RTT - */ - double getAverageRtt(); - - /** - * @brief Get the current m_timer value - */ - double getTimer(); - - /** - * @brief Smooth he value of the m_timer accordingly with the last RTT - * measured - */ - RaaqmDataPath &smoothTimer(); - - /** - * @brief Get the maximum RTT among the last samples - */ - double getRttMax(); - - /** - * @brief Get the minimum RTT among the last samples - */ - double getRttMin(); - - /** - * @brief Get the number of saved samples - */ - unsigned getSampleValue(); - - /** - * @brief Get the size og the RTT queue - */ - unsigned getRttQueueSize(); - - /* - * @brief Change drop probability according to RTT statistics - * Invoked in RAAQM(), before control window size update. - */ - RaaqmDataPath &updateDropProb(); - - void setAlpha(double alpha); - - /** - * @brief Returns the smallest RTT registered so far for this path - */ - - unsigned int getPropagationDelay(); - - bool newPropagationDelayAvailable(); - - bool isStale(); - - private: - /** - * The value of the drop factor - */ - double drop_factor_; - - /** - * The minumum drop probability - */ - double minimum_drop_probability_; - - /** - * The timer, expressed in milliseconds - */ - double timer_; - - /** - * The number of samples to store for computing the protocol measurements - */ - const unsigned int samples_; - - /** - * The last, the minimum and the maximum value of the RTT (among the last - * m_samples samples) - */ - uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_; - - bool new_prop_delay_; - - /** - * The current drop probability - */ - double drop_prob_; - - /** - * The number of packets received in this path - */ - intmax_t packets_received_; - - /** - * The first packet received after the statistics print - */ - intmax_t last_packets_received_; - - /** - * Total number of bytes received including the ICN header - */ - intmax_t m_packets_bytes_received_; - - /** - * The amount of packet bytes received at the last path summary computation - */ - intmax_t last_packets_bytes_received_; - - /** - * Total number of bytes received without including the ICN header - */ - intmax_t raw_data_bytes_received_; - - /** - * The amount of raw dat bytes received at the last path summary computation - */ - intmax_t last_raw_data_bytes_received_; - - class byArrival; - - class byOrder; - - /** - * Double ended queue for the RTTs - */ - - typedef utils::MinFilter RTTQueue; - - RTTQueue rtt_samples_; - - /** - * Time of the last call to the path reporter method - */ - utils::TimePoint last_received_pkt_; - - double average_rtt_; - double alpha_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.cc b/libtransport/src/hicn/transport/protocols/rate_estimation.cc deleted file mode 100644 index 50306e6e5..000000000 --- a/libtransport/src/hicn/transport/protocols/rate_estimation.cc +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -#include - -namespace transport { - -namespace protocol { - -void *Timer(void *data) { - InterRttEstimator *estimator = (InterRttEstimator *)data; - - double dat_rtt, my_avg_win, my_avg_rtt; - int my_win_change, number_of_packets, max_packet_size; - - pthread_mutex_lock(&(estimator->mutex_)); - dat_rtt = estimator->rtt_; - pthread_mutex_unlock(&(estimator->mutex_)); - - while (estimator->is_running_) { - std::this_thread::sleep_for(std::chrono::microseconds( - (uint64_t)(interface::default_values::kv * dat_rtt))); - - pthread_mutex_lock(&(estimator->mutex_)); - - dat_rtt = estimator->rtt_; - my_avg_win = estimator->avg_win_; - my_avg_rtt = estimator->avg_rtt_; - my_win_change = (int)(estimator->win_change_); - number_of_packets = estimator->number_of_packets_; - max_packet_size = estimator->max_packet_size_; - estimator->avg_rtt_ = estimator->rtt_; - estimator->avg_win_ = 0; - estimator->win_change_ = 0; - estimator->number_of_packets_ = 1; - - pthread_mutex_unlock(&(estimator->mutex_)); - - if (number_of_packets == 0 || my_win_change == 0) { - continue; - } - if (estimator->estimation_ == 0) { - estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 / - (1.0 * my_win_change)) / - (my_avg_rtt / (1.0 * number_of_packets)); - } - - estimator->estimation_ = - estimator->alpha_ * estimator->estimation_ + - (1 - estimator->alpha_) * ((my_avg_win * 8.0 * max_packet_size * - 1000000.0 / (1.0 * my_win_change)) / - (my_avg_rtt / (1.0 * number_of_packets))); - - if (estimator->observer_) { - estimator->observer_->notifyStats(estimator->estimation_); - } - } - - return nullptr; -} - -InterRttEstimator::InterRttEstimator(double alpha_arg) { - this->estimated_ = false; - this->observer_ = NULL; - this->alpha_ = alpha_arg; - this->thread_is_running_ = false; - this->my_th_ = NULL; - this->is_running_ = true; - this->avg_rtt_ = 0.0; - this->estimation_ = 0.0; - this->avg_win_ = 0.0; - this->rtt_ = 0.0; - this->win_change_ = 0; - this->number_of_packets_ = 0; - this->max_packet_size_ = 0; - this->win_current_ = 1.0; - - pthread_mutex_init(&(this->mutex_), NULL); - this->start_time_ = std::chrono::steady_clock::now(); - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -InterRttEstimator::~InterRttEstimator() { - this->is_running_ = false; - if (this->my_th_) { - pthread_join(*(this->my_th_), NULL); - } - this->my_th_ = NULL; - pthread_mutex_destroy(&(this->mutex_)); -} - -void InterRttEstimator::onRttUpdate(double rtt) { - pthread_mutex_lock(&(this->mutex_)); - this->rtt_ = rtt; - this->number_of_packets_++; - this->avg_rtt_ += rtt; - pthread_mutex_unlock(&(this->mutex_)); - - if (!thread_is_running_) { - my_th_ = (pthread_t *)malloc(sizeof(pthread_t)); - if (!my_th_) { - TRANSPORT_LOGE("Error allocating thread."); - my_th_ = NULL; - } - if (/*int err = */ pthread_create(my_th_, NULL, transport::protocol::Timer, - (void *)this)) { - TRANSPORT_LOGE("Error creating the thread"); - my_th_ = NULL; - } - thread_is_running_ = true; - } -} - -void InterRttEstimator::onWindowIncrease(double win_current) { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->begin_batch_) - .count(); - - pthread_mutex_lock(&(this->mutex_)); - this->avg_win_ += this->win_current_ * delay; - this->win_current_ = win_current; - this->win_change_ += delay; - pthread_mutex_unlock(&(this->mutex_)); - - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -void InterRttEstimator::onWindowDecrease(double win_current) { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->begin_batch_) - .count(); - - pthread_mutex_lock(&(this->mutex_)); - this->avg_win_ += this->win_current_ * delay; - this->win_current_ = win_current; - this->win_change_ += delay; - pthread_mutex_unlock(&(this->mutex_)); - - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -ALaTcpEstimator::ALaTcpEstimator() { - this->estimation_ = 0.0; - this->observer_ = NULL; - this->start_time_ = std::chrono::steady_clock::now(); - this->totalSize_ = 0.0; -} - -void ALaTcpEstimator::onStart() { - this->totalSize_ = 0.0; - this->start_time_ = std::chrono::steady_clock::now(); -} - -void ALaTcpEstimator::onDownloadFinished() { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->start_time_).count(); - this->estimation_ = this->totalSize_ * 8 * 1000000 / delay; - if (observer_) { - observer_->notifyStats(this->estimation_); - } -} - -void ALaTcpEstimator::onDataReceived(int packet_size) { - this->totalSize_ += packet_size; -} - -SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) { - this->estimation_ = 0.0; - this->estimated_ = false; - this->observer_ = nullptr; - this->batching_param_ = batching_param; - this->total_size_ = 0.0; - this->number_of_packets_ = 0; - this->base_alpha_ = alphaArg; - this->alpha_ = alphaArg; - this->start_time_ = std::chrono::steady_clock::now(); - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -void SimpleEstimator::onStart() { - this->estimated_ = false; - this->number_of_packets_ = 0; - this->total_size_ = 0.0; - this->start_time_ = std::chrono::steady_clock::now(); - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -void SimpleEstimator::onDownloadFinished() { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->start_time_).count(); - if (observer_) { - observer_->notifyDownloadTime((double)delay); - } - if (!this->estimated_) { - // Assuming all packets carry max_packet_size_ bytes of data - // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds - if (this->estimation_) { - this->estimation_ = - alpha_ * this->estimation_ + - (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); - } else { - this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); - } - if (observer_) { - observer_->notifyStats(this->estimation_); - } - this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) / - ((double)this->batching_param_)); - } else { - if (this->number_of_packets_ >= - (int)(75.0 * (double)this->batching_param_ / 100.0)) { - delay = std::chrono::duration_cast(end - this->begin_batch_) - .count(); - // Assuming all packets carry max_packet_size_ bytes of data - // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds - if (this->estimation_) { - this->estimation_ = - alpha_ * this->estimation_ + - (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); - } else { - this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); - } - if (observer_) { - observer_->notifyStats(this->estimation_); - } - this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) / - ((double)this->batching_param_)); - } - } - this->number_of_packets_ = 0; - this->total_size_ = 0.0; - this->start_time_ = std::chrono::steady_clock::now(); - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -void SimpleEstimator::onDataReceived(int packet_size) { - this->total_size_ += packet_size; -} - -void SimpleEstimator::onRttUpdate(double rtt) { - this->number_of_packets_++; - - if (this->number_of_packets_ == this->batching_param_) { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->begin_batch_) - .count(); - // Assuming all packets carry max_packet_size_ bytes of data - // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds - if (this->estimation_) { - this->estimation_ = - alpha_ * this->estimation_ + - (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); - } else { - this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); - } - if (observer_) { - observer_->notifyStats(this->estimation_); - } - this->alpha_ = this->base_alpha_; - this->number_of_packets_ = 0; - this->total_size_ = 0.0; - this->begin_batch_ = std::chrono::steady_clock::now(); - } -} - -BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg, - int param) { - this->estimated_ = false; - this->observer_ = NULL; - this->alpha_ = alpha_arg; - this->batching_param_ = param; - this->number_of_packets_ = 0; - this->avg_win_ = 0.0; - this->avg_rtt_ = 0.0; - this->win_change_ = 0.0; - this->max_packet_size_ = 0; - this->estimation_ = 0.0; - this->win_current_ = 1.0; - this->begin_batch_ = std::chrono::steady_clock::now(); - this->start_time_ = std::chrono::steady_clock::now(); -} - -void BatchingPacketsEstimator::onRttUpdate(double rtt) { - this->number_of_packets_++; - this->avg_rtt_ += rtt; - - if (number_of_packets_ == this->batching_param_) { - if (estimation_ == 0) { - estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / - (1.0 * win_change_)) / - (avg_rtt_ / (1.0 * number_of_packets_)); - } else { - estimation_ = alpha_ * estimation_ + - (1 - alpha_) * ((avg_win_ * 8.0 * max_packet_size_ * - 1000000.0 / (1.0 * win_change_)) / - (avg_rtt_ / (1.0 * number_of_packets_))); - } - - if (observer_) { - observer_->notifyStats(estimation_); - } - - this->number_of_packets_ = 0; - this->avg_win_ = 0.0; - this->avg_rtt_ = 0.0; - this->win_change_ = 0.0; - } -} - -void BatchingPacketsEstimator::onWindowIncrease(double win_current) { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->begin_batch_) - .count(); - this->avg_win_ += this->win_current_ * delay; - this->win_current_ = win_current; - this->win_change_ += delay; - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -void BatchingPacketsEstimator::onWindowDecrease(double win_current) { - TimePoint end = std::chrono::steady_clock::now(); - auto delay = - std::chrono::duration_cast(end - this->begin_batch_) - .count(); - this->avg_win_ += this->win_current_ * delay; - this->win_current_ = win_current; - this->win_change_ += delay; - this->begin_batch_ = std::chrono::steady_clock::now(); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rate_estimation.h b/libtransport/src/hicn/transport/protocols/rate_estimation.h deleted file mode 100644 index 616501b24..000000000 --- a/libtransport/src/hicn/transport/protocols/rate_estimation.h +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include - -namespace transport { - -namespace protocol { - -class IcnRateEstimator { - public: - using TimePoint = std::chrono::steady_clock::time_point; - using Microseconds = std::chrono::microseconds; - - IcnRateEstimator(){}; - - virtual ~IcnRateEstimator(){}; - - virtual void onRttUpdate(double rtt){}; - - virtual void onDataReceived(int packetSize){}; - - virtual void onWindowIncrease(double winCurrent){}; - - virtual void onWindowDecrease(double winCurrent){}; - - virtual void onStart(){}; - - virtual void onDownloadFinished(){}; - - virtual void setObserver(IcnObserver *observer) { - this->observer_ = observer; - }; - IcnObserver *observer_; - TimePoint start_time_; - TimePoint begin_batch_; - double base_alpha_; - double alpha_; - double estimation_; - int number_of_packets_; - // this boolean is to make sure at least one estimation of the BW is done - bool estimated_; -}; - -// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT) - -class InterRttEstimator : public IcnRateEstimator { - public: - InterRttEstimator(double alpha_arg); - - ~InterRttEstimator(); - - void onRttUpdate(double rtt); - - void onDataReceived(int packet_size) { - if (packet_size > this->max_packet_size_) { - this->max_packet_size_ = packet_size; - } - }; - - void onWindowIncrease(double win_current); - - void onWindowDecrease(double win_current); - - void onStart(){}; - - void onDownloadFinished(){}; - - // private: should be done by using getters - pthread_t *my_th_; - bool thread_is_running_; - double rtt_; - bool is_running_; - pthread_mutex_t mutex_; - double avg_rtt_; - double avg_win_; - int max_packet_size_; - double win_change_; - double win_current_; -}; - -// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT) - -class BatchingPacketsEstimator : public IcnRateEstimator { - public: - BatchingPacketsEstimator(double alpha_arg, int batchingParam); - - void onRttUpdate(double rtt); - - void onDataReceived(int packet_size) { - if (packet_size > this->max_packet_size_) { - this->max_packet_size_ = packet_size; - } - }; - - void onWindowIncrease(double win_current); - - void onWindowDecrease(double win_current); - - void onStart(){}; - - void onDownloadFinished(){}; - - private: - int batching_param_; - double avg_rtt_; - double avg_win_; - double win_change_; - int max_packet_size_; - double win_current_; -}; - -// Segment Estimator - -class ALaTcpEstimator : public IcnRateEstimator { - public: - ALaTcpEstimator(); - - void onDataReceived(int packet_size); - void onStart(); - void onDownloadFinished(); - - private: - double totalSize_; -}; - -// A Rate estimator, this one is the simplest: counting batching_param_ packets -// and then divide the sum of the size of these packets by the time taken to DL -// them. Should be the one used - -class SimpleEstimator : public IcnRateEstimator { - public: - SimpleEstimator(double alpha, int batching_param); - - void onRttUpdate(double rtt); - - void onDataReceived(int packet_size); - - void onWindowIncrease(double win_current){}; - - void onWindowDecrease(double win_current){}; - - void onStart(); - - void onDownloadFinished(); - - private: - int batching_param_; - double total_size_; -}; - -void *Timer(void *data); - -} // namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc deleted file mode 100644 index 9682d338d..000000000 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include - -namespace transport { - -namespace protocol { - -void Reassembly::notifyApplication() { - interface::ConsumerSocket::ReadCallback *read_callback = nullptr; - reassembly_consumer_socket_->getSocketOption( - interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); - - if (TRANSPORT_EXPECT_FALSE(!read_callback)) { - TRANSPORT_LOGE("Read callback not installed!"); - return; - } - - if (read_callback->isBufferMovable()) { - // No need to perform an additional copy. The whole buffer will be - // tranferred to the application. - - read_callback->readBufferAvailable(std::move(read_buffer_)); - read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); - } else { - // The buffer will be copied into the application-provided buffer - uint8_t *buffer; - std::size_t length; - std::size_t total_length = read_buffer_->length(); - - while (read_buffer_->length()) { - buffer = nullptr; - length = 0; - read_callback->getReadBuffer(&buffer, &length); - - if (!buffer || !length) { - throw errors::RuntimeException( - "Invalid buffer provided by the application."); - } - - auto to_copy = std::min(read_buffer_->length(), length); - std::memcpy(buffer, read_buffer_->data(), to_copy); - read_buffer_->trimStart(to_copy); - } - - read_callback->readDataAvailable(total_length); - read_buffer_->clear(); - } -} - -} // namespace protocol -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h deleted file mode 100644 index 34af2a70a..000000000 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -namespace transport { - -namespace interface { -class ConsumerReadCallback; -class ConsumerSocket; -} // namespace interface - -namespace protocol { - -class TransportProtocol; -class Indexer; - -// Forward Declaration -class ManifestManager; - -class Reassembly { - public: - class ContentReassembledCallback { - public: - virtual void onContentReassembled(std::error_code ec) = 0; - }; - - Reassembly(interface::ConsumerSocket *icn_socket, - TransportProtocol *transport_protocol) - : reassembly_consumer_socket_(icn_socket), - transport_protocol_(transport_protocol) {} - - virtual ~Reassembly() = default; - - virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0; - virtual void reassemble( - std::unique_ptr &&manifest) = 0; - virtual void reInitialize() = 0; - virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; } - - protected: - virtual void notifyApplication(); - - protected: - interface::ConsumerSocket *reassembly_consumer_socket_; - TransportProtocol *transport_protocol_; - Indexer *index_manager_; - std::unique_ptr read_buffer_; -}; - -} // namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc deleted file mode 100644 index fece95d03..000000000 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ /dev/null @@ -1,1016 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include - -#include -#include - -namespace transport { - -namespace protocol { - -using namespace interface; - -RTCTransportProtocol::RTCTransportProtocol( - interface::ConsumerSocket *icn_socket) - : TransportProtocol(icn_socket, nullptr), - DatagramReassembly(icn_socket, this), - inflightInterests_(1 << default_values::log_2_default_buffer_size), - modMask_((1 << default_values::log_2_default_buffer_size) - 1) { - icn_socket->getSocketOption(PORTAL, portal_); - rtx_timer_ = std::make_unique(portal_->getIoService()); - probe_timer_ = std::make_unique(portal_->getIoService()); - sentinel_timer_ = - std::make_unique(portal_->getIoService()); - round_timer_ = std::make_unique(portal_->getIoService()); - reset(); -} - -RTCTransportProtocol::~RTCTransportProtocol() { - if (is_running_) { - stop(); - } -} - -int RTCTransportProtocol::start() { - if (is_running_) return -1; - - reset(); - is_first_ = true; - - probeRtt(); - sentinelTimer(); - newRound(); - scheduleNextInterests(); - - is_first_ = false; - is_running_ = true; - portal_->runEventsLoop(); - is_running_ = false; - - return 0; -} - -void RTCTransportProtocol::stop() { - if (!is_running_) return; - - is_running_ = false; - portal_->stopEventsLoop(); -} - -void RTCTransportProtocol::resume() { - if (is_running_) return; - - is_running_ = true; - inflightInterestsCount_ = 0; - - probeRtt(); - sentinelTimer(); - newRound(); - scheduleNextInterests(); - - portal_->runEventsLoop(); - is_running_ = false; -} - -// private -void RTCTransportProtocol::reset() { - portal_->setConsumerCallback(this); - // controller var - currentState_ = HICN_RTC_SYNC_STATE; - - // cwin var - currentCWin_ = HICN_INITIAL_CWIN; - maxCWin_ = HICN_INITIAL_CWIN_MAX; - - // names/packets var - actualSegment_ = 0; - inflightInterestsCount_ = 0; - interestRetransmissions_.clear(); - lastSegNacked_ = 0; - lastReceived_ = 0; - lastReceivedTime_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - lastEvent_ = lastReceivedTime_; - highestReceived_ = 0; - firstSequenceInRound_ = 0; - - rtx_timer_used_ = false; - for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) { - inflightInterests_[i] = {0}; - } - - // stats - firstPckReceived_ = false; - receivedBytes_ = 0; - sentInterest_ = 0; - receivedData_ = 0; - packetLost_ = 0; - lossRecovered_ = 0; - avgPacketSize_ = HICN_INIT_PACKET_SIZE; - gotNack_ = false; - gotFutureNack_ = 0; - rounds_ = 0; - roundsWithoutNacks_ = 0; - pathTable_.clear(); - - // CC var - estimatedBw_ = 0.0; - lossRate_ = 0.0; - queuingDelay_ = 0.0; - protocolState_ = HICN_RTC_NORMAL_STATE; - - producerPathLabels_[0] = 0; - producerPathLabels_[1] = 0; - initied = false; - - socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - (uint32_t)HICN_RTC_INTEREST_LIFETIME); - // XXX this should be done by the application -} - -uint32_t max(uint32_t a, uint32_t b) { - if (a > b) - return a; - else - return b; -} - -uint32_t min(uint32_t a, uint32_t b) { - if (a < b) - return a; - else - return b; -} - -void RTCTransportProtocol::newRound() { - round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN)); - round_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - updateStats(HICN_ROUND_LEN); - newRound(); - }); -} - -void RTCTransportProtocol::updateDelayStats( - const ContentObject &content_object) { - uint32_t segmentNumber = content_object.getName().getSuffix(); - uint32_t pkt = segmentNumber & modMask_; - - if (inflightInterests_[pkt].state != sent_) return; - - if (interestRetransmissions_.find(segmentNumber) != - interestRetransmissions_.end()) - // this packet was rtx at least once - return; - - uint32_t pathLabel = content_object.getPathLabel(); - - if (pathTable_.find(pathLabel) == pathTable_.end()) { - // found a new path - std::shared_ptr newPath = std::make_shared(); - pathTable_[pathLabel] = newPath; - } - - // RTT measurements are useful both from NACKs and data packets - uint64_t RTT = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - - inflightInterests_[pkt].transmissionTime; - - pathTable_[pathLabel]->insertRttSample(RTT); - auto payload = content_object.getPayload(); - - // we collect OWD only for datapackets - if (payload->length() != HICN_NACK_HEADER_SIZE) { - uint64_t *senderTimeStamp = (uint64_t *)payload->data(); - int64_t OWD = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - - *senderTimeStamp; - - pathTable_[pathLabel]->insertOwdSample(OWD); - pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); - } else { - pathTable_[pathLabel]->receivedNack(); - } -} - -void RTCTransportProtocol::updateStats(uint32_t round_duration) { - if (pathTable_.empty()) return; - - if (receivedBytes_ != 0) { - double bytesPerSec = - (double)(receivedBytes_ * - ((double)HICN_MILLI_IN_A_SEC / (double)round_duration)); - estimatedBw_ = (estimatedBw_ * HICN_ESTIMATED_BW_ALPHA) + - ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec); - } - - uint64_t minRtt = UINT_MAX; - uint64_t maxRtt = 0; - - for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { - it->second->roundEnd(); - if (it->second->isActive()) { - if (it->second->getMinRtt() < minRtt) { - minRtt = it->second->getMinRtt(); - producerPathLabels_[0] = it->first; - } - if (it->second->getMinRtt() > maxRtt) { - maxRtt = it->second->getMinRtt(); - producerPathLabels_[1] = it->first; - } - } - } - - if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || - pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) - return; // this should not happen - - // as a queuing delay we keep the lowest one among the two paths - // if one path is congested the forwarder should decide to do not - // use it so it does not make sense to inform the application - // that maybe we have a problem - if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() < - pathTable_[producerPathLabels_[1]]->getQueuingDealy()) - queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); - else - queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); - - if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { - uint32_t numberTheoricallyReceivedPackets_ = - highestReceived_ - firstSequenceInRound_; - double lossRate = 0; - if (numberTheoricallyReceivedPackets_ != 0) - lossRate = (double)((double)(packetLost_ - lossRecovered_) / - (double)numberTheoricallyReceivedPackets_); - - if (lossRate < 0) lossRate = 0; - - if (initied) { - lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + - (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); - } else { - lossRate_ = lossRate; - initied = true; - } - } - - if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; - - // for the BDP we use the max rtt, so that we calibrate the window on the - // RTT of the slowest path. In this way we are sure that the window will - // never be too small - uint32_t BDP = (uint32_t)ceil( - (estimatedBw_ * - (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() / - (double)HICN_MILLI_IN_A_SEC) * - HICN_BANDWIDTH_SLACK_FACTOR) / - avgPacketSize_); - uint32_t BW = (uint32_t)ceil(estimatedBw_); - computeMaxWindow(BW, BDP); - - ConsumerTimerCallback *stats_callback = nullptr; - socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, - &stats_callback); - if (*stats_callback) { - // Send the stats to the app - stats_->updateQueuingDelay(queuingDelay_); - stats_->updateLossRatio(lossRate_); - stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); - (*stats_callback)(*socket_, *stats_); - } - // bound also by interest lifitime* production rate - if (!gotNack_) { - roundsWithoutNacks_++; - if (currentState_ == HICN_RTC_SYNC_STATE && - roundsWithoutNacks_ >= HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH) { - currentState_ = HICN_RTC_NORMAL_STATE; - } - } else { - roundsWithoutNacks_ = 0; - } - - updateCCState(); - updateWindow(); - - if (queuingDelay_ > 25.0) { - // this indicates that the client will go soon out of synch, - // switch to synch mode - if (currentState_ == HICN_RTC_NORMAL_STATE) { - currentState_ = HICN_RTC_SYNC_STATE; - } - computeMaxWindow(BW, 0); - increaseWindow(); - } - - // in any case we reset all the counters - - gotNack_ = false; - gotFutureNack_ = 0; - receivedBytes_ = 0; - sentInterest_ = 0; - receivedData_ = 0; - packetLost_ = 0; - lossRecovered_ = 0; - rounds_++; - firstSequenceInRound_ = highestReceived_; -} - -void RTCTransportProtocol::updateCCState() { - // TODO -} - -void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, - uint32_t BDPWin) { - if (productionRate == - 0) // we have no info about the producer, keep the previous maxCWin - return; - - uint32_t interestLifetime = default_values::interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interestLifetime); - uint32_t maxWaintingInterest = (uint32_t)ceil( - (productionRate / avgPacketSize_) * - (double)((double)(interestLifetime * - HICN_INTEREST_LIFETIME_REDUCTION_FACTOR) / - (double)HICN_MILLI_IN_A_SEC)); - - if (currentState_ == HICN_RTC_SYNC_STATE) { - // in this case we do not limit the window with the BDP, beacuse most - // likely it is wrong - maxCWin_ = maxWaintingInterest; - return; - } - - // currentState = RTC_NORMAL_STATE - if (BDPWin != 0) { - maxCWin_ = (uint32_t)ceil((double)BDPWin + - (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% - } else { - maxCWin_ = min(maxWaintingInterest, maxCWin_); - } - - if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN; -} - -void RTCTransportProtocol::updateWindow() { - if (currentState_ == HICN_RTC_SYNC_STATE) return; - - if (currentCWin_ < maxCWin_ * 0.9) { - currentCWin_ = - min(maxCWin_, (uint32_t)(currentCWin_ * HICN_WIN_INCREASE_FACTOR)); - } else if (currentCWin_ > maxCWin_) { - currentCWin_ = - max((uint32_t)(currentCWin_ * HICN_WIN_DECREASE_FACTOR), HICN_MIN_CWIN); - } -} - -void RTCTransportProtocol::decreaseWindow() { - // this is used only in SYNC mode - if (currentState_ == HICN_RTC_NORMAL_STATE) return; - - if (gotFutureNack_ == 1) - currentCWin_ = min((currentCWin_ - 1), - (uint32_t)ceil((double)maxCWin_ * 0.66)); // 2/3 - else - currentCWin_--; - - currentCWin_ = max(currentCWin_, HICN_MIN_CWIN); -} - -void RTCTransportProtocol::increaseWindow() { - // this is used only in SYNC mode - if (currentState_ == HICN_RTC_NORMAL_STATE) return; - - // we need to be carefull to do not increase the window to much - if (currentCWin_ < ((double)maxCWin_ * 0.7)) { - currentCWin_ = currentCWin_ + 1; // exponential - } else { - currentCWin_ = min( - maxCWin_, - (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear - } -} - -void RTCTransportProtocol::probeRtt() { - probe_timer_->expires_from_now(std::chrono::milliseconds(1000)); - probe_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - probeRtt(); - }); - - // To avoid sending the first probe, because the transport is not running yet - if (is_first_ && !is_running_) return; - - time_sent_probe_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - // get a random numbe in the probe seq range - std::default_random_engine eng((std::random_device())()); - std::uniform_int_distribution idis(HICN_MIN_PROBE_SEQ, - HICN_MAX_PROBE_SEQ); - probe_seq_number_ = idis(eng); - interest_name->setSuffix(probe_seq_number_); - - // we considere the probe as a rtx so that we do not incresea inFlightInt - received_probe_ = false; - TRANSPORT_LOGD("Send content interest %u (probeRtt)", - interest_name->getSuffix()); - sendInterest(interest_name, true); -} - -void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { - auto interest = getPacket(); - interest->setName(*interest_name); - - uint32_t interestLifetime = default_values::interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interestLifetime); - interest->setLifetime(uint32_t(interestLifetime)); - - ConsumerInterestCallback *on_interest_output = nullptr; - - socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, - &on_interest_output); - - if (*on_interest_output) { - (*on_interest_output)(*socket_, *interest); - } - - if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { - return; - } - - portal_->sendInterest(std::move(interest)); - - sentInterest_++; - - if (!rtx) { - packets_in_window_[interest_name->getSuffix()] = 0; - inflightInterestsCount_++; - } -} - -void RTCTransportProtocol::scheduleNextInterests() { - if (!is_running_ && !is_first_) return; - - TRANSPORT_LOGD("----- [window %u - inflight_interests %u = %d] -----", - currentCWin_, inflightInterestsCount_, - currentCWin_ - inflightInterestsCount_); - - while (inflightInterestsCount_ < currentCWin_) { - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - - interest_name->setSuffix(actualSegment_); - - // if the producer socket is not stated (does not reply even with nacks) - // we keep asking for something without marking anything as lost (see - // timeout). In this way when the producer socket will start the - // consumer socket will not miss any packet - if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { - uint32_t pkt = actualSegment_ & modMask_; - inflightInterests_[pkt].state = sent_; - inflightInterests_[pkt].sequence = actualSegment_; - actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; - TRANSPORT_LOGD( - "Send content interest %u (scheduleNextInterests no replies)", - interest_name->getSuffix()); - sendInterest(interest_name, false); - return; - } - - // we send the packet only if it is not pending yet - // notice that this is not true for rtx packets - if (portal_->interestIsPending(*interest_name)) { - actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; - continue; - } - - uint32_t pkt = actualSegment_ & modMask_; - // if we already reacevied the content we don't ask it again - if (inflightInterests_[pkt].state == received_ && - inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; - continue; - } - - // same if the packet is lost - if (inflightInterests_[pkt].state == lost_ && - inflightInterests_[pkt].sequence == actualSegment_) { - actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; - continue; - } - - inflightInterests_[pkt].transmissionTime = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - // here the packet can be in any state except for lost or recevied - inflightInterests_[pkt].state = sent_; - inflightInterests_[pkt].sequence = actualSegment_; - actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; - - TRANSPORT_LOGD("Send content interest %u (scheduleNextInterests)", - interest_name->getSuffix()); - sendInterest(interest_name, false); - } - - TRANSPORT_LOGD("----- end of scheduleNextInterest -----"); -} - -bool RTCTransportProtocol::verifyKeyPackets() { - // Not yet implemented - return false; -} - -void RTCTransportProtocol::sentinelTimer() { - uint32_t wait = 50; - - if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) { - // we have all the info to set the timers - wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); - if (wait == 0) wait = 1; - } - - sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); - sentinel_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || - pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) { - // we have no info, so we send again - - for (auto it = packets_in_window_.begin(); it != packets_in_window_.end(); - it++) { - uint32_t pkt = it->first & modMask_; - if (inflightInterests_[pkt].sequence == it->first) { - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); - } - } - } else { - uint64_t max_waiting_time = // wait at least 50ms - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); - - if ((currentState_ == HICN_RTC_NORMAL_STATE) && - (inflightInterestsCount_ >= currentCWin_) && - ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) { - uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - - for (auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++) { - uint32_t pkt = it->first & modMask_; - if (inflightInterests_[pkt].sequence == it->first && - ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) { - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); - } - } - } - } - - sentinelTimer(); - }); -} -void RTCTransportProtocol::addRetransmissions(uint32_t val) { - // add only val in the rtx list - addRetransmissions(val, val + 1); -} - -void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - bool new_rtx = false; - for (uint32_t i = start; i < stop; i++) { - auto it = interestRetransmissions_.find(i); - if (it == interestRetransmissions_.end()) { - uint32_t pkt = i & modMask_; - if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) { - // it must be larger than the last past nack received - packetLost_++; - interestRetransmissions_[i] = 0; - uint32_t pkt = i & modMask_; - // we reset the transmission time setting to now, so that rtx will - // happne in one RTT on waint one inter arrival gap - inflightInterests_[pkt].transmissionTime = now; - new_rtx = true; - } - } // if the retransmission is already there the rtx timer will - // take care of it - } - - // in case a new rtx is added to the map we need to run checkRtx() - if (new_rtx) { - if (rtx_timer_used_) { - // if a timer is pending we need to delete it - rtx_timer_->cancel(); - rtx_timer_used_ = false; - } - checkRtx(); - } -} - -uint64_t RTCTransportProtocol::retransmit() { - auto it = interestRetransmissions_.begin(); - - // cut len to max HICN_MAX_RTX_SIZE - // since we use a map, the smaller (and so the older) sequence number are at - // the beginnin of the map - while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) { - it = interestRetransmissions_.erase(it); - } - - it = interestRetransmissions_.begin(); - uint64_t smallest_timeout = ULONG_MAX; - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - while (it != interestRetransmissions_.end()) { - uint32_t pkt = it->first & modMask_; - - if (inflightInterests_[pkt].sequence != it->first) { - // this packet is not anymore in the inflight buffer, erase it - it = interestRetransmissions_.erase(it); - continue; - } - - // we retransmitted the packet too many times - if (it->second >= HICN_MAX_RTX) { - it = interestRetransmissions_.erase(it); - continue; - } - - // this packet is too old - if ((lastReceived_ > it->first) && - (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) { - it = interestRetransmissions_.erase(it); - continue; - } - - uint64_t rtx_time = now; - - if (it->second == 0) { - // first rtx - if (producerPathLabels_[0] != producerPathLabels_[1]) { - // multipath - if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && - (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < - HICN_MIN_INTER_ARRIVAL_GAP)) { - rtx_time = lastReceivedTime_ + - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); - } // else low rate producer, send it immediatly - } else { - // single path - if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < - HICN_MIN_INTER_ARRIVAL_GAP)) { - rtx_time = lastReceivedTime_ + - pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); - } // else low rate producer send immediatly - } - } else { - // second or plus rtx, wait for the min rtt - if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) { - uint64_t sent_time = inflightInterests_[pkt].transmissionTime; - rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); - } // if we don't have info we send it immediatly - } - - if (now >= rtx_time) { - inflightInterests_[pkt].transmissionTime = now; - it->second++; - - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - TRANSPORT_LOGD("Send content interest %u (retransmit)", - interest_name->getSuffix()); - sendInterest(interest_name, true); - } else if (rtx_time < smallest_timeout) { - smallest_timeout = rtx_time; - } - - ++it; - } - return smallest_timeout; -} - -void RTCTransportProtocol::checkRtx() { - if (interestRetransmissions_.empty()) { - rtx_timer_used_ = false; - return; - } - - uint64_t next_timeout = retransmit(); - uint64_t wait = 1; - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - if (next_timeout != ULONG_MAX && now < next_timeout) { - wait = next_timeout - now; - } - rtx_timer_used_ = true; - rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); - rtx_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - rtx_timer_used_ = false; - checkRtx(); - }); -} - -void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { - uint32_t segmentNumber = interest->getName().getSuffix(); - - if (segmentNumber >= HICN_MIN_PROBE_SEQ) { - // this is a timeout on a probe, do nothing - return; - } - - uint32_t pkt = segmentNumber & modMask_; - - if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { - // we do nothing, and we keep asking the same stuff over - // and over until we get at least a packet - inflightInterestsCount_--; - lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - packets_in_window_.erase(segmentNumber); - scheduleNextInterests(); - return; - } - - if (inflightInterests_[pkt].state == sent_) { - lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - packets_in_window_.erase(segmentNumber); - inflightInterestsCount_--; - } - - // check how many times we sent this packet - auto it = interestRetransmissions_.find(segmentNumber); - if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) { - inflightInterests_[pkt].state = lost_; - } - - if (inflightInterests_[pkt].state == sent_) { - inflightInterests_[pkt].state = timeout1_; - } else if (inflightInterests_[pkt].state == timeout1_) { - inflightInterests_[pkt].state = timeout2_; - } else if (inflightInterests_[pkt].state == timeout2_) { - inflightInterests_[pkt].state = lost_; - } - - if (inflightInterests_[pkt].state == lost_) { - interestRetransmissions_.erase(segmentNumber); - } else { - addRetransmissions(segmentNumber); - } - - scheduleNextInterests(); -} - -bool RTCTransportProtocol::onNack(const ContentObject &content_object, - bool rtx) { - uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); - uint32_t productionSeg = *payload; - uint32_t productionRate = *(++payload); - uint32_t nackSegment = content_object.getName().getSuffix(); - - bool old_nack = false; - - // if we did not received anything between lastReceived_ + 1 and productionSeg - // most likelly some packets got lost - if (lastReceived_ != 0) { - addRetransmissions(lastReceived_ + 1, productionSeg); - } - - if (!rtx) { - gotNack_ = true; - // we synch the estimated production rate with the actual one - estimatedBw_ = (double)productionRate; - } - - if (productionSeg > nackSegment) { - // we are asking for stuff produced in the past - actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ; - - if (!rtx) { - if (currentState_ == HICN_RTC_NORMAL_STATE) { - currentState_ = HICN_RTC_SYNC_STATE; - } - - computeMaxWindow(productionRate, 0); - increaseWindow(); - } - - lastSegNacked_ = productionSeg; - old_nack = true; - - } else if (productionSeg < nackSegment) { - actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; - - if (!rtx) { - // we are asking stuff in the future - gotFutureNack_++; - computeMaxWindow(productionRate, 0); - decreaseWindow(); - - if (currentState_ == HICN_RTC_SYNC_STATE) { - currentState_ = HICN_RTC_NORMAL_STATE; - } - } - } else { - // we are asking the right thing, but the producer is slow - // keep doing the same until the packet is produced - actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; - } - - return old_nack; -} - -void RTCTransportProtocol::onContentObject( - Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - // as soon as we get a packet firstPckReceived_ will never be false - firstPckReceived_ = true; - - auto payload = content_object->getPayload(); - uint32_t payload_size = (uint32_t)payload->length(); - uint32_t segmentNumber = content_object->getName().getSuffix(); - uint32_t pkt = segmentNumber & modMask_; - - ConsumerContentObjectCallback *callback_content_object = nullptr; - socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - &callback_content_object); - if (*callback_content_object) { - (*callback_content_object)(*socket_, *content_object); - } - - if (segmentNumber >= HICN_MIN_PROBE_SEQ) { - TRANSPORT_LOGD("Received probe %u", segmentNumber); - if (segmentNumber == probe_seq_number_ && !received_probe_) { - received_probe_ = true; - - uint32_t pathLabel = content_object->getPathLabel(); - if (pathTable_.find(pathLabel) == pathTable_.end()) { - std::shared_ptr newPath = std::make_shared(); - pathTable_[pathLabel] = newPath; - } - - // this is the expected probe, update the RTT and drop the packet - uint64_t RTT = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - - time_sent_probe_; - - pathTable_[pathLabel]->insertRttSample(RTT); - pathTable_[pathLabel]->receivedNack(); - } - return; - } - - // check if the packet is a rtx - bool is_rtx = false; - if (interestRetransmissions_.find(segmentNumber) != - interestRetransmissions_.end()) { - is_rtx = true; - } else { - auto it_win = packets_in_window_.find(segmentNumber); - if (it_win != packets_in_window_.end() && it_win->second != 0) - is_rtx = true; - } - - if (payload_size == HICN_NACK_HEADER_SIZE) { - TRANSPORT_LOGD("Received nack %u", segmentNumber); - - if (inflightInterests_[pkt].state == sent_) { - lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - packets_in_window_.erase(segmentNumber); - inflightInterestsCount_--; - } - - bool old_nack = false; - - if (!is_rtx) { - // this is not a retransmitted packet - old_nack = onNack(*content_object, false); - updateDelayStats(*content_object); - } else { - old_nack = onNack(*content_object, true); - } - - // the nacked_ state is used only to avoid to decrease - // inflightInterestsCount_ multiple times. In fact, every time that we - // receive an event related to an interest (timeout, nacked, content) we - // cange the state. In this way we are sure that we do not decrease twice - // the counter - if (old_nack) { - inflightInterests_[pkt].state = lost_; - interestRetransmissions_.erase(segmentNumber); - } else { - inflightInterests_[pkt].state = nacked_; - } - - } else { - TRANSPORT_LOGD("Received content %u", segmentNumber); - - avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + - ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); - - receivedBytes_ += (uint32_t)(content_object->headerSize() + - content_object->payloadSize()); - - if (inflightInterests_[pkt].state == sent_) { - lastEvent_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - packets_in_window_.erase(segmentNumber); - inflightInterestsCount_--; // packet sent without timeouts - } - - if (inflightInterests_[pkt].state == sent_ && !is_rtx) { - // delay stats are computed only for non retransmitted data - updateDelayStats(*content_object); - } - - addRetransmissions(lastReceived_ + 1, segmentNumber); - if (segmentNumber > highestReceived_) { - highestReceived_ = segmentNumber; - } - if (segmentNumber > lastReceived_) { - lastReceived_ = segmentNumber; - lastReceivedTime_ = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - } - receivedData_++; - inflightInterests_[pkt].state = received_; - - auto it = interestRetransmissions_.find(segmentNumber); - if (it != interestRetransmissions_.end()) lossRecovered_++; - - interestRetransmissions_.erase(segmentNumber); - - reassemble(std::move(content_object)); - increaseWindow(); - } - - scheduleNextInterests(); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h deleted file mode 100644 index f34afbb5f..000000000 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiTC_SYNC_STATE - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#include -#include -#include - -// algorithm state -#define HICN_RTC_SYNC_STATE 0 -#define HICN_RTC_NORMAL_STATE 1 -#define HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH 3 - -// packet constants -#define HICN_INIT_PACKET_SIZE 1300 // bytes -#define HICN_PACKET_HEADER_SIZE 60 // bytes ipv6+tcp -#define HICN_NACK_HEADER_SIZE 8 // bytes -#define HICN_TIMESTAMP_SIZE 8 // bytes -#define HICN_RTC_INTEREST_LIFETIME 1000 // ms - -// rtt measurement -// normal interests for data goes from 0 to -// HICN_MIN_PROBE_SEQ, the rest is reserverd for -// probes -#define HICN_MIN_PROBE_SEQ 0xefffffff -#define HICN_MAX_PROBE_SEQ 0xffffffff - -// controller constant -#define HICN_ROUND_LEN \ - 200 // ms interval of time on which - // we take decisions / measurements -#define HICN_MAX_RTX 10 -#define HICN_MAX_RTX_SIZE 1024 -#define HICN_MAX_RTX_MAX_AGE 10000 -#define HICN_MIN_RTT_WIN 30 // rounds -#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms - -// cwin -#define HICN_INITIAL_CWIN 1 // packets -#define HICN_INITIAL_CWIN_MAX 100000 // packets -#define HICN_MIN_CWIN 10 // packets -#define HICN_WIN_INCREASE_FACTOR 1.5 -#define HICN_WIN_DECREASE_FACTOR 0.9 - -// statistics constants -#define HICN_BANDWIDTH_SLACK_FACTOR 1.8 -#define HICN_ESTIMATED_BW_ALPHA 0.7 -#define HICN_ESTIMATED_PACKET_SIZE 0.7 -#define HICN_ESTIMATED_LOSSES_ALPHA 0.8 -#define HICN_INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 - -// other constants -#define HICN_NANO_IN_A_SEC 1000000000 -#define HICN_MICRO_IN_A_SEC 1000000 -#define HICN_MILLI_IN_A_SEC 1000 - -namespace transport { - -namespace protocol { - -enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ }; - -typedef enum packetState packetState_t; - -struct sentInterest { - uint64_t transmissionTime; - uint32_t sequence; // sequence number of the interest sent - // to handle seq % buffer_size - packetState_t state; // see packet state -}; - -class RTCTransportProtocol : public TransportProtocol, - public DatagramReassembly { - public: - RTCTransportProtocol(interface::ConsumerSocket *icnet_socket); - - ~RTCTransportProtocol(); - - int start() override; - - void stop() override; - - void resume() override; - - bool verifyKeyPackets() override; - - private: - // algo functions - void reset() override; - - // CC functions - void updateDelayStats(const ContentObject &content_object); - void updateStats(uint32_t round_duration); - void updateCCState(); - void computeMaxWindow(uint32_t productionRate, uint32_t BDPWin); - void updateWindow(); - void decreaseWindow(); - void increaseWindow(); - void resetPreviousWindow(); - - // packet functions - void sendInterest(Name *interest_name, bool rtx); - void scheduleNextInterests() override; - void sentinelTimer(); - void addRetransmissions(uint32_t val); - void addRetransmissions(uint32_t start, uint32_t stop); - uint64_t retransmit(); - void checkRtx(); - void probeRtt(); - void newRound(); - void onTimeout(Interest::Ptr &&interest) override; - bool onNack(const ContentObject &content_object, bool rtx); - void onContentObject(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object) override; - void onPacketDropped(Interest::Ptr &&interest, - ContentObject::Ptr &&content_object) override {} - void onReassemblyFailed(std::uint32_t missing_segment) override {} - - TRANSPORT_ALWAYS_INLINE virtual void reassemble( - ContentObject::Ptr &&content_object) override { - auto read_buffer = content_object->getPayload(); - read_buffer->trimStart(HICN_TIMESTAMP_SIZE); - Reassembly::read_buffer_ = std::move(read_buffer); - Reassembly::notifyApplication(); - } - - // controller var - std::unique_ptr round_timer_; - unsigned currentState_; - - // cwin var - uint32_t currentCWin_; - uint32_t maxCWin_; - - // names/packets var - uint32_t actualSegment_; - uint32_t inflightInterestsCount_; - // map seq to rtx - std::map interestRetransmissions_; - bool rtx_timer_used_; - std::unique_ptr rtx_timer_; - std::vector inflightInterests_; - uint32_t lastSegNacked_; // indicates the segment id in the last received - // past Nack. we do not ask for retransmissions - // for samething that is older than this value. - uint32_t lastReceived_; // segment of the last content object received - // indicates the base of the window on the client - uint64_t lastReceivedTime_; // time at which we recevied the - // lastReceived_ packet - - // sentinel - // if all packets in the window get lost we need something that - // wakes up our consumer socket. Interest timeouts set to 1 sec - // expire too late. This timers expire much sooner and if it - // detects that all the interest in the window may be lost - // it sends all of them again - std::unique_ptr sentinel_timer_; - uint64_t lastEvent_; // time at which we removed a pending - // interest from the window - std::unordered_map packets_in_window_; - - // rtt probes - // the RTC transport tends to overestimate the RTT - // du to the production time on the server side - // once per second we send an interest for wich we know - // we will get a nack. This nack will keep our estimation - // close to the reality - std::unique_ptr probe_timer_; - uint64_t time_sent_probe_; - uint32_t probe_seq_number_; - bool received_probe_; - - uint32_t modMask_; - - // stats - bool firstPckReceived_; - uint32_t receivedBytes_; - uint32_t sentInterest_; - uint32_t receivedData_; - int32_t packetLost_; - int32_t lossRecovered_; - uint32_t firstSequenceInRound_; - uint32_t highestReceived_; - double avgPacketSize_; - bool gotNack_; - uint32_t gotFutureNack_; - uint32_t rounds_; - uint32_t roundsWithoutNacks_; - - // we keep track of up two paths (if only one path is in use - // the two values in the vector will be the same) - // position 0 stores the path with minRTT - // position 1 stores the path with maxRTT - uint32_t producerPathLabels_[2]; - - std::unordered_map> pathTable_; - uint32_t roundCounter_; - - // CC var - double estimatedBw_; - double lossRate_; - double queuingDelay_; - unsigned protocolState_; - - bool initied; -}; - -} // namespace protocol - -} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc b/libtransport/src/hicn/transport/protocols/rtc_data_path.cc deleted file mode 100644 index 0cbff0e3c..000000000 --- a/libtransport/src/hicn/transport/protocols/rtc_data_path.cc +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include - -#define MAX_ROUNDS_WITHOUT_PKTS 10 //2sec - -namespace transport { - -namespace protocol { - -RTCDataPath::RTCDataPath() - : min_rtt(UINT_MAX), - prev_min_rtt(UINT_MAX), - min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the - // real OWD, but the measured one, that depends on the - // clock of sender and receiver. the only meaningful - // value is is the queueing delay. for this reason we - // keep both RTT (for the windowd calculation) and OWD - // (for congestion/quality control) - prev_min_owd(INT_MAX), - avg_owd(0.0), - queuing_delay(DBL_MAX), - lastRecvSeq_(0), - lastRecvTime_(0), - avg_inter_arrival_(DBL_MAX), - received_nacks_(false), - received_packets_(false), - rounds_without_packets_(0), - RTThistory_(HISTORY_LEN), - OWDhistory_(HISTORY_LEN){}; - -void RTCDataPath::insertRttSample(uint64_t rtt) { - // for the rtt we only keep track of the min one - if (rtt < min_rtt) min_rtt = rtt; -} - -void RTCDataPath::insertOwdSample(int64_t owd) { - // for owd we use both min and avg - if (owd < min_owd) min_owd = owd; - - if(avg_owd != DBL_MAX) - avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); - else { - avg_owd = owd; - } - - //owd is computed only for valid data packets so we count only - //this for decide if we recevie traffic or not - received_packets_ = true; -} - -void RTCDataPath::computeInterArrivalGap(uint32_t segmentNumber){ - - //got packet in sequence, compute gap - if(lastRecvSeq_ == (segmentNumber - 1)){ - uint64_t now = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - uint64_t delta = now - lastRecvTime_; - lastRecvSeq_ = segmentNumber; - lastRecvTime_ = now; - if(avg_inter_arrival_ == DBL_MAX) - avg_inter_arrival_ = delta; - else - avg_inter_arrival_ = (avg_inter_arrival_ * (1 -ALPHA_RTC)) - + (delta * ALPHA_RTC); - return; - } - - //ooo packet, update the stasts if needed - if(lastRecvSeq_ <= segmentNumber){ - lastRecvSeq_ = segmentNumber; - lastRecvTime_ = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - } -} - -void RTCDataPath::receivedNack(){ - received_nacks_ = true; -} - -double RTCDataPath::getInterArrivalGap(){ - if(avg_inter_arrival_ == DBL_MAX) - return 0; - return avg_inter_arrival_; -} - -bool RTCDataPath::isActive(){ - if(received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) - return true; - return false; -} - -void RTCDataPath::roundEnd() { - // reset min_rtt and add it to the history - if (min_rtt != UINT_MAX) { - prev_min_rtt = min_rtt; - } else { - // this may happen if we do not receive any packet - // from this path in the last round. in this case - // we use the measure from the previuos round - min_rtt = prev_min_rtt; - } - - if(min_rtt == 0) - min_rtt = 1; - - RTThistory_.pushBack(min_rtt); - min_rtt = UINT_MAX; - - // do the same for min owd - if (min_owd != INT_MAX) { - prev_min_owd = min_owd; - } else { - min_owd = prev_min_owd; - } - - if (min_owd != INT_MAX) { - OWDhistory_.pushBack(min_owd); - min_owd = INT_MAX; - - // compute queuing delay - queuing_delay = avg_owd - getMinOwd(); - - } else { - queuing_delay = 0.0; - } - - if(!received_packets_) - rounds_without_packets_++; - else - rounds_without_packets_ = 0; - received_packets_ = false; -} - -double RTCDataPath::getQueuingDealy() { return queuing_delay; } - -uint64_t RTCDataPath::getMinRtt() { return RTThistory_.begin(); } - -int64_t RTCDataPath::getMinOwd() { return OWDhistory_.begin(); } - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h deleted file mode 100644 index 48a67c525..000000000 --- a/libtransport/src/hicn/transport/protocols/rtc_data_path.h +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -#define ALPHA_RTC 0.125 -#define HISTORY_LEN 20 //4 sec - -namespace transport { - -namespace protocol { - -class RTCDataPath { - public: - RTCDataPath(); - - public: - void insertRttSample(uint64_t rtt); - void insertOwdSample(int64_t owd); - void computeInterArrivalGap(uint32_t segmentNumber); - void receivedNack(); - - uint64_t getMinRtt(); - double getQueuingDealy(); - double getInterArrivalGap(); - bool isActive(); - - void roundEnd(); - - private: - int64_t getMinOwd(); - - uint64_t min_rtt; - uint64_t prev_min_rtt; - - int64_t min_owd; - int64_t prev_min_owd; - - double avg_owd; - - double queuing_delay; - - uint32_t lastRecvSeq_; - uint64_t lastRecvTime_; - double avg_inter_arrival_; - - //flags to check if a path is active - //we considere a path active if it reaches a producer - //(not a cache) --aka we got at least one nack on this path-- - //and if we receives packets - bool received_nacks_; - bool received_packets_; - uint8_t rounds_without_packets_; //if we don't get any packet - //for MAX_ROUNDS_WITHOUT_PKTS - //we consider the path inactive - - utils::MinFilter RTThistory_; - utils::MinFilter OWDhistory_; -}; - -} // namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/statistics.h b/libtransport/src/hicn/transport/protocols/statistics.h deleted file mode 100644 index c92940ab4..000000000 --- a/libtransport/src/hicn/transport/protocols/statistics.h +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include - -namespace transport { - -namespace protocol { - -class TransportStatistics { - static constexpr double default_alpha = 0.7; - - public: - TransportStatistics(double alpha = default_alpha) - : retx_count_(0), - bytes_received_(0), - average_rtt_(0), - avg_window_size_(0), - interest_tx_(0), - alpha_(alpha), - loss_ratio_(0.0), - queuing_delay_(0.0) {} - - TRANSPORT_ALWAYS_INLINE void updateRetxCount(uint64_t retx) { - retx_count_ += retx; - } - - TRANSPORT_ALWAYS_INLINE void updateBytesRecv(uint64_t bytes) { - bytes_received_ += bytes; - } - - TRANSPORT_ALWAYS_INLINE void updateAverageRtt(uint64_t rtt) { - average_rtt_ = (alpha_ * average_rtt_) + ((1. - alpha_) * double(rtt)); - } - - TRANSPORT_ALWAYS_INLINE void updateAverageWindowSize(double current_window) { - avg_window_size_ = - (alpha_ * avg_window_size_) + ((1. - alpha_) * current_window); - } - - TRANSPORT_ALWAYS_INLINE void updateInterestTx(uint64_t int_tx) { - interest_tx_ += int_tx; - } - - TRANSPORT_ALWAYS_INLINE void updateLossRatio(double loss_ratio) { - loss_ratio_ = loss_ratio; - } - - TRANSPORT_ALWAYS_INLINE void updateQueuingDelay(double queuing_delay) { - queuing_delay_ = queuing_delay; - } - - TRANSPORT_ALWAYS_INLINE uint64_t getRetxCount() const { return retx_count_; } - - TRANSPORT_ALWAYS_INLINE uint64_t getBytesRecv() const { - return bytes_received_; - } - - TRANSPORT_ALWAYS_INLINE double getAverageRtt() const { return average_rtt_; } - - TRANSPORT_ALWAYS_INLINE double getAverageWindowSize() const { - return avg_window_size_; - } - - TRANSPORT_ALWAYS_INLINE uint64_t getInterestTx() const { - return interest_tx_; - } - - TRANSPORT_ALWAYS_INLINE double getLossRatio() const { return loss_ratio_; } - - TRANSPORT_ALWAYS_INLINE double getQueuingDelay() const { - return queuing_delay_; - } - - TRANSPORT_ALWAYS_INLINE void reset() { - retx_count_ = 0; - bytes_received_ = 0; - average_rtt_ = 0; - avg_window_size_ = 0; - interest_tx_ = 0; - loss_ratio_ = 0; - } - - private: - uint64_t retx_count_; - uint64_t bytes_received_; - double average_rtt_; - double avg_window_size_; - uint64_t interest_tx_; - double alpha_; - double loss_ratio_; - double queuing_delay_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt deleted file mode 100644 index 6f9fdb9aa..000000000 --- a/libtransport/src/hicn/transport/protocols/test/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -# Enable gcov output for the tests -add_definitions(--coverage) -set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage") - -set(TestsExpectedToPass - test_transport_producer) - -foreach(test ${TestsExpectedToPass}) - AddTest(${test}) -endforeach() \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc b/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc deleted file mode 100644 index 204f2cbe2..000000000 --- a/libtransport/src/hicn/transport/protocols/test/test_transport_producer.cc +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include "../socket_producer.h" -#include "literals.h" - -#include -#include - -namespace transport { - -namespace protocol { - -namespace { -// The fixture for testing class Foo. -class ProducerTest : public ::testing::Test { - protected: - ProducerTest() : name_("b001::123|321"), producer_(io_service_) { - // You can do set-up work for each test here. - } - - virtual ~ProducerTest() { - // You can do clean-up work that doesn't throw exceptions here. - } - - // If the constructor and destructor are not enough for setting up - // and cleaning up each test, you can define the following methods: - - virtual void SetUp() { - // Code here will be called immediately after the constructor (right - // before each test). - } - - virtual void TearDown() { - // Code here will be called immediately after each test (right - // before the destructor). - } - - Name name_; - asio::io_service io_service_; - ProducerSocket producer_; -}; - -} // namespace - -// Tests that the Foo::Bar() method does Abc. -TEST_F(ProducerTest, ProduceContent) { - std::string content(250000, '?'); - - producer_.registerPrefix(Prefix("b001::/64")); - producer_.produce(name_, reinterpret_cast(content.data()), - content.size(), true); - producer_.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 500000000_U32); - producer_.attach(); - producer_.serveForever(); -} - -} // namespace protocol - -} // namespace transport - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc deleted file mode 100644 index 74faf0521..000000000 --- a/libtransport/src/hicn/transport/protocols/verification_manager.cc +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -namespace transport { - -namespace protocol { - -interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify( - const Packet& packet) { - using namespace interface; - - bool verify_signature = false, key_content = false; - VerificationPolicy ret = VerificationPolicy::DROP_PACKET; - - icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, - verify_signature); - icn_socket_->getSocketOption(GeneralTransportOptions::KEY_CONTENT, - key_content); - - if (!verify_signature) { - return VerificationPolicy::ACCEPT_PACKET; - } - - if (key_content) { - key_packets_.push(copyPacket(packet)); - return VerificationPolicy::ACCEPT_PACKET; - } else if (!key_packets_.empty()) { - std::queue().swap(key_packets_); - } - - ConsumerContentObjectVerificationFailedCallback* - verification_failed_callback = VOID_HANDLER; - icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, - &verification_failed_callback); - - if (!verification_failed_callback) { - throw errors::RuntimeException( - "No verification failed callback provided by application. " - "Aborting."); - } - - std::shared_ptr verifier; - icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - - if (TRANSPORT_EXPECT_FALSE(!verifier)) { - ret = (*verification_failed_callback)( - *icn_socket_, dynamic_cast(packet), - make_error_code(protocol_error::no_verifier_provided)); - return ret; - } - - if (!verifier->verify(packet)) { - ret = (*verification_failed_callback)( - *icn_socket_, dynamic_cast(packet), - make_error_code(protocol_error::signature_verification_failed)); - } else { - ret = VerificationPolicy::ACCEPT_PACKET; - } - - return ret; -} - -bool SignatureVerificationManager::onKeyToVerify() { - if (TRANSPORT_EXPECT_FALSE(key_packets_.empty())) { - throw errors::RuntimeException("No key to verify."); - } - - while (!key_packets_.empty()) { - ContentObjectPtr packet_to_verify = key_packets_.front(); - key_packets_.pop(); - if (onPacketToVerify(*packet_to_verify) != - VerificationPolicy::ACCEPT_PACKET) - return false; - } - - return true; -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h deleted file mode 100644 index 293e8103a..000000000 --- a/libtransport/src/hicn/transport/protocols/verification_manager.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -namespace transport { - -namespace interface { -class ConsumerSocket; -} - -namespace protocol { - -using Packet = core::Packet; -using interface::ConsumerSocket; -using interface::VerificationPolicy; -using ContentObjectPtr = std::shared_ptr; - -class VerificationManager { - public: - virtual ~VerificationManager() = default; - virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0; - virtual bool onKeyToVerify() { return false; } -}; - -class SignatureVerificationManager : public VerificationManager { - public: - SignatureVerificationManager(interface::ConsumerSocket* icn_socket) - : icn_socket_(icn_socket), key_packets_() {} - - interface::VerificationPolicy onPacketToVerify(const Packet& packet) override; - bool onKeyToVerify() override; - - private: - ConsumerSocket* icn_socket_; - std::queue key_packets_; - - ContentObjectPtr copyPacket(const Packet& packet) { - std::shared_ptr packet_copy = - packet.acquireMemBufReference(); - ContentObjectPtr content_object_copy = - std::make_shared(std::move(packet_copy)); - std::unique_ptr payload_copy = packet.getPayload(); - content_object_copy->appendPayload(std::move(payload_copy)); - return content_object_copy; - } -}; - -} // end namespace protocol - -} // end namespace transport -- cgit 1.2.3-korg