diff options
author | Mauro Sardara <msardara@cisco.com> | 2020-02-21 11:52:28 +0100 |
---|---|---|
committer | Mauro Sardara <msardara@cisco.com> | 2020-02-26 13:19:16 +0100 |
commit | f4433f28b509a9f67ca85d79000ccf9c2f4b7a24 (patch) | |
tree | 0f754bc9d8222f3ace11849165753acd85be3b38 /libtransport/src/protocols | |
parent | 0e7669445b6be1163189521eabed7dd0124043c8 (diff) |
[HICN-534] Major rework on libtransport organization
Change-Id: I361b83a18b4fd59be136d5f0817fc28e17e89884
Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/protocols')
37 files changed, 5277 insertions, 0 deletions
diff --git a/libtransport/src/protocols/CMakeLists.txt b/libtransport/src/protocols/CMakeLists.txt new file mode 100644 index 000000000..3156d9ae9 --- /dev/null +++ b/libtransport/src/protocols/CMakeLists.txt @@ -0,0 +1,75 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.h + ${CMAKE_CURRENT_SOURCE_DIR}/protocol.h + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.h + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h + ${CMAKE_CURRENT_SOURCE_DIR}/errors.h + ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc + ${CMAKE_CURRENT_SOURCE_DIR}/raaqm_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc + ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc + ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc +) + +set(RAAQM_CONFIG_INSTALL_PREFIX +${CMAKE_INSTALL_PREFIX}/etc/hicn +) + +set(raaqm_config_path + ${RAAQM_CONFIG_INSTALL_PREFIX}/consumer.conf + PARENT_SCOPE +) + +set(TRANSPORT_CONFIG + ${CMAKE_CURRENT_SOURCE_DIR}/consumer.conf +) + +install( + FILES ${TRANSPORT_CONFIG} + DESTINATION ${CMAKE_INSTALL_FULL_SYSCONFDIR}/hicn + COMPONENT lib${LIBTRANSPORT} +) + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE)
\ No newline at end of file diff --git a/libtransport/src/protocols/byte_stream_reassembly.cc b/libtransport/src/protocols/byte_stream_reassembly.cc new file mode 100644 index 000000000..c2996ebc1 --- /dev/null +++ b/libtransport/src/protocols/byte_stream_reassembly.cc @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/utils/array.h> +#include <hicn/transport/utils/membuf.h> + +#include <implementation/socket_consumer.h> +#include <protocols/byte_stream_reassembly.h> +#include <protocols/errors.h> +#include <protocols/indexer.h> +#include <protocols/protocol.h> + +namespace transport { + +namespace protocol { + +using namespace core; +using ReadCallback = interface::ConsumerSocket::ReadCallback; + +ByteStreamReassembly::ByteStreamReassembly( + implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : Reassembly(icn_socket, transport_protocol), + index_(IndexManager::invalid_index), + download_complete_(false) {} + +void ByteStreamReassembly::reassemble( + std::unique_ptr<ContentObjectManifest> &&manifest) { + if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) { + received_packets_.emplace( + std::make_pair(manifest->getName().getSuffix(), nullptr)); + assembleContent(); + } +} + +void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { + if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { + received_packets_.emplace(std::make_pair( + content_object->getName().getSuffix(), std::move(content_object))); + assembleContent(); + } +} + +void ByteStreamReassembly::assembleContent() { + if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) { + index_ = index_manager_->getNextReassemblySegment(); + if (index_ == IndexManager::invalid_index) { + return; + } + } + + auto it = received_packets_.find((const unsigned int)index_); + while (it != received_packets_.end()) { + // Check if valid packet + if (it->second) { + copyContent(*it->second); + } + + received_packets_.erase(it); + index_ = index_manager_->getNextReassemblySegment(); + it = received_packets_.find((const unsigned int)index_); + } + + if (!download_complete_ && index_ != IndexManager::invalid_index) { + transport_protocol_->onReassemblyFailed(index_); + } +} + +void ByteStreamReassembly::copyContent(const ContentObject &content_object) { + auto a = content_object.getPayload(); + auto payload_length = a->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } + + download_complete_ = + index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); + + if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + notifyApplication(); + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::success)); + } +} + +void ByteStreamReassembly::reInitialize() { + index_ = IndexManager::invalid_index; + download_complete_ = false; + + received_packets_.clear(); + + // reset read buffer + ReadCallback *read_callback; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/byte_stream_reassembly.h b/libtransport/src/protocols/byte_stream_reassembly.h new file mode 100644 index 000000000..5e5c9ec6b --- /dev/null +++ b/libtransport/src/protocols/byte_stream_reassembly.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +class ByteStreamReassembly : public Reassembly { + public: + ByteStreamReassembly(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + protected: + virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) override; + + virtual void copyContent(const core::ContentObject &content_object); + + virtual void reInitialize() override; + + private: + void assembleContent(); + + protected: + // The consumer socket + // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_; + // std::unique_ptr<ManifestIndexManager> manifest_index_manager_; + // IndexVerificationManager *index_manager_; + std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_; + uint32_t index_; + bool download_complete_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/cbr.cc b/libtransport/src/protocols/cbr.cc new file mode 100644 index 000000000..5df55bd5c --- /dev/null +++ b/libtransport/src/protocols/cbr.cc @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_consumer.h> + +#include <protocols/cbr.h> + +namespace transport { + +namespace protocol { + +using namespace interface; + +CbrTransportProtocol::CbrTransportProtocol( + implementation::ConsumerSocket *icnet_socket) + : RaaqmTransportProtocol(icnet_socket) {} + +int CbrTransportProtocol::start() { return RaaqmTransportProtocol::start(); } + +void CbrTransportProtocol::reset() { + RaaqmTransportProtocol::reset(); + socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); +} + +void CbrTransportProtocol::afterDataUnsatisfied(uint64_t segment) {} + +void CbrTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + auto segment = content_object.getName().getSuffix(); + auto now = utils::SteadyClock::now(); + auto rtt = std::chrono::duration_cast<utils::Microseconds>( + now - interest_timepoints_[segment & mask]); + // Update stats + updateStats(segment, rtt.count(), now); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/cbr.h b/libtransport/src/protocols/cbr.h new file mode 100644 index 000000000..20129f6a3 --- /dev/null +++ b/libtransport/src/protocols/cbr.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/raaqm.h> + +namespace transport { + +namespace protocol { + +class CbrTransportProtocol : public RaaqmTransportProtocol { + public: + CbrTransportProtocol(implementation::ConsumerSocket *icnet_socket); + + int start() override; + + void reset() override; + + private: + void afterContentReception(const Interest &interest, + const ContentObject &content_object) override; + void afterDataUnsatisfied(uint64_t segment) override; +}; + +} // end namespace protocol + +} // end namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/congestion_window_protocol.h b/libtransport/src/protocols/congestion_window_protocol.h new file mode 100644 index 000000000..36ac6eb17 --- /dev/null +++ b/libtransport/src/protocols/congestion_window_protocol.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace transport { + +namespace protocol { + +class CWindowProtocol { + protected: + virtual void increaseWindow() = 0; + virtual void decreaseWindow() = 0; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/consumer.conf b/libtransport/src/protocols/consumer.conf new file mode 100644 index 000000000..1a366f32f --- /dev/null +++ b/libtransport/src/protocols/consumer.conf @@ -0,0 +1,21 @@ +; this file contais the parameters for RAAQM +autotune = no +lifetime = 500 +retransmissions = 128 +beta = 0.99 +drop = 0.003 +beta_wifi_ = 0.99 +drop_wifi_ = 0.6 +beta_lte_ = 0.99 +drop_lte_ = 0.003 +wifi_delay_ = 200 +lte_delay_ = 9000 + +alpha = 0.95 +batching_parameter = 200 + +;Choice of rate estimator: +;0 --> an estimation each $(batching_parameter) packets +;1 --> an estimation "a la TCP", estimation at the end of the download of the segment + +rate_estimator = 0 diff --git a/libtransport/src/protocols/data_processing_events.h b/libtransport/src/protocols/data_processing_events.h new file mode 100644 index 000000000..8975c2b4a --- /dev/null +++ b/libtransport/src/protocols/data_processing_events.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> + +namespace transport { +namespace protocol { + +class ContentObjectProcessingEventCallback { + public: + virtual ~ContentObjectProcessingEventCallback() = default; + virtual void onPacketDropped(core::Interest::Ptr &&i, + core::ContentObject::Ptr &&c) = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; +}; + +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/datagram_reassembly.cc b/libtransport/src/protocols/datagram_reassembly.cc new file mode 100644 index 000000000..abd7e984d --- /dev/null +++ b/libtransport/src/protocols/datagram_reassembly.cc @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/datagram_reassembly.h> + +namespace transport { + +namespace protocol { + +DatagramReassembly::DatagramReassembly( + implementation::ConsumerSocket* icn_socket, + TransportProtocol* transport_protocol) + : Reassembly(icn_socket, transport_protocol) {} + +void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) { + read_buffer_ = content_object->getPayload(); + Reassembly::notifyApplication(); +} + +void DatagramReassembly::reInitialize() {} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/datagram_reassembly.h b/libtransport/src/protocols/datagram_reassembly.h new file mode 100644 index 000000000..2427ae62f --- /dev/null +++ b/libtransport/src/protocols/datagram_reassembly.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +class DatagramReassembly : public Reassembly { + public: + DatagramReassembly(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + virtual void reInitialize() override; + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) override { + return; + } +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/errors.cc b/libtransport/src/protocols/errors.cc new file mode 100644 index 000000000..eefb6f957 --- /dev/null +++ b/libtransport/src/protocols/errors.cc @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/errors.h> + +namespace transport { +namespace protocol { + +const std::error_category& protocol_category() { + static protocol_category_impl instance; + + return instance; +} + +const char* protocol_category_impl::name() const throw() { + return "transport::protocol::error"; +} + +std::string protocol_category_impl::message(int ev) const { + switch (static_cast<protocol_error>(ev)) { + case protocol_error::success: { + return "Success"; + } + case protocol_error::signature_verification_failed: { + return "Signature verification failed."; + } + case protocol_error::integrity_verification_failed: { + return "Integrity verification failed"; + } + case protocol_error::no_verifier_provided: { + return "Transport cannot get any verifier for the given data."; + } + case protocol_error::io_error: { + return "Conectivity error between transport and local forwarder"; + } + case protocol_error::max_retransmissions_error: { + return "Transport protocol reached max number of retransmissions allowed " + "for the same interest."; + } + case protocol_error::session_aborted: { + return "The session has been aborted by the application."; + } + default: { return "Unknown protocol error"; } + } +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/errors.h b/libtransport/src/protocols/errors.h new file mode 100644 index 000000000..cb3d3474e --- /dev/null +++ b/libtransport/src/protocols/errors.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> + +namespace transport { +namespace protocol { + +/** + * @brief Get the default server error category. + * @return The default server error category instance. + * + * @warning The first call to this function is thread-safe only starting with + * C++11. + */ +const std::error_category& protocol_category(); + +/** + * The list of errors. + */ +enum class protocol_error { + success = 0, + signature_verification_failed, + integrity_verification_failed, + no_verifier_provided, + io_error, + max_retransmissions_error, + session_aborted, +}; + +/** + * @brief Create an error_code instance for the given error. + * @param error The error. + * @return The error_code instance. + */ +inline std::error_code make_error_code(protocol_error error) { + return std::error_code(static_cast<int>(error), protocol_category()); +} + +/** + * @brief Create an error_condition instance for the given error. + * @param error The error. + * @return The error_condition instance. + */ +inline std::error_condition make_error_condition(protocol_error error) { + return std::error_condition(static_cast<int>(error), protocol_category()); +} + +/** + * @brief A server error category. + */ +class protocol_category_impl : public std::error_category { + public: + /** + * @brief Get the name of the category. + * @return The name of the category. + */ + virtual const char* name() const throw(); + + /** + * @brief Get the error message for a given error. + * @param ev The error numeric value. + * @return The message associated to the error. + */ + virtual std::string message(int ev) const; +}; +} // namespace protocol +} // namespace transport + +namespace std { +// namespace system { +template <> +struct is_error_code_enum<::transport::protocol::protocol_error> + : public std::true_type {}; +// } // namespace system +} // namespace std
\ No newline at end of file diff --git a/libtransport/src/protocols/incremental_indexer.cc b/libtransport/src/protocols/incremental_indexer.cc new file mode 100644 index 000000000..e590b4fee --- /dev/null +++ b/libtransport/src/protocols/incremental_indexer.cc @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/incremental_indexer.h> + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <protocols/protocol.h> + +namespace transport { +namespace protocol { + +void IncrementalIndexer::onContentObject( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + using namespace interface; + + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { + final_suffix_ = content_object->getName().getSuffix(); + } + + auto ret = verification_manager_->onPacketToVerify(*content_object); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + reassembly_->reassemble(std::move(content_object)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(interest), + std::move(content_object)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/protocols/incremental_indexer.h b/libtransport/src/protocols/incremental_indexer.h new file mode 100644 index 000000000..20c5e4759 --- /dev/null +++ b/libtransport/src/protocols/incremental_indexer.h @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/errors/runtime_exception.h> +#include <hicn/transport/errors/unexpected_manifest_exception.h> +#include <hicn/transport/utils/literals.h> + +#include <protocols/indexer.h> +#include <protocols/reassembly.h> +#include <protocols/verification_manager.h> + +#include <deque> + +namespace transport { + +namespace interface { +class ConsumerSocket; +} + +namespace protocol { + +class Reassembly; +class TransportProtocol; + +class IncrementalIndexer : public Indexer { + public: + IncrementalIndexer(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly) + : socket_(icn_socket), + reassembly_(reassembly), + transport_protocol_(transport), + final_suffix_(std::numeric_limits<uint32_t>::max()), + first_suffix_(0), + next_download_suffix_(0), + next_reassembly_suffix_(0), + verification_manager_( + std::make_unique<SignatureVerificationManager>(icn_socket)) { + if (reassembly_) { + reassembly_->setIndexer(this); + } + } + + IncrementalIndexer(const IncrementalIndexer &) = delete; + + IncrementalIndexer(IncrementalIndexer &&other) + : socket_(other.socket_), + reassembly_(other.reassembly_), + transport_protocol_(other.transport_protocol_), + final_suffix_(other.final_suffix_), + first_suffix_(other.first_suffix_), + next_download_suffix_(other.next_download_suffix_), + next_reassembly_suffix_(other.next_reassembly_suffix_), + verification_manager_(std::move(other.verification_manager_)) { + if (reassembly_) { + reassembly_->setIndexer(this); + } + } + + /** + * + */ + virtual ~IncrementalIndexer() {} + + TRANSPORT_ALWAYS_INLINE virtual void reset( + std::uint32_t offset = 0) override { + final_suffix_ = std::numeric_limits<uint32_t>::max(); + next_download_suffix_ = offset; + next_reassembly_suffix_ = offset; + } + + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextSuffix() override { + return next_download_suffix_ <= final_suffix_ ? next_download_suffix_++ + : IndexManager::invalid_index; + } + + TRANSPORT_ALWAYS_INLINE virtual void setFirstSuffix( + uint32_t suffix) override { + first_suffix_ = suffix; + } + + /** + * Retrive the next segment to be reassembled. + */ + TRANSPORT_ALWAYS_INLINE virtual uint32_t getNextReassemblySegment() override { + return next_reassembly_suffix_ <= final_suffix_ + ? next_reassembly_suffix_++ + : IndexManager::invalid_index; + } + + TRANSPORT_ALWAYS_INLINE virtual bool isFinalSuffixDiscovered() override { + return final_suffix_ != std::numeric_limits<uint32_t>::max(); + } + + TRANSPORT_ALWAYS_INLINE virtual uint32_t getFinalSuffix() override { + return final_suffix_; + } + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) { + reassembly_ = reassembly; + + if (reassembly_) { + reassembly_->setIndexer(this); + } + } + + TRANSPORT_ALWAYS_INLINE bool onKeyToVerify() override { + return verification_manager_->onKeyToVerify(); + } + + protected: + implementation::ConsumerSocket *socket_; + Reassembly *reassembly_; + TransportProtocol *transport_protocol_; + uint32_t final_suffix_; + uint32_t first_suffix_; + uint32_t next_download_suffix_; + uint32_t next_reassembly_suffix_; + std::unique_ptr<VerificationManager> verification_manager_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/indexer.cc b/libtransport/src/protocols/indexer.cc new file mode 100644 index 000000000..ca12330a6 --- /dev/null +++ b/libtransport/src/protocols/indexer.cc @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/utils/branch_prediction.h> + +#include <protocols/incremental_indexer.h> +#include <protocols/indexer.h> +#include <protocols/manifest_incremental_indexer.h> +#include <protocols/protocol.h> + +namespace transport { +namespace protocol { + +IndexManager::IndexManager(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly) + : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport, + reassembly)), + first_segment_received_(false), + icn_socket_(icn_socket), + transport_(transport), + reassembly_(reassembly) {} + +void IndexManager::onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) { + if (first_segment_received_) { + indexer_->onContentObject(std::move(interest), std::move(content_object)); + } else { + std::uint32_t segment_number = interest->getName().getSuffix(); + + if (segment_number == 0) { + // Check if manifest + if (content_object->getPayloadType() == PayloadType::MANIFEST) { + IncrementalIndexer *indexer = + static_cast<IncrementalIndexer *>(indexer_.release()); + indexer_ = + std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer)); + delete indexer; + } + + indexer_->onContentObject(std::move(interest), std::move(content_object)); + auto it = interest_data_set_.begin(); + while (it != interest_data_set_.end()) { + indexer_->onContentObject( + std::move(const_cast<core::Interest::Ptr &&>(it->first)), + std::move(const_cast<core::ContentObject::Ptr &&>(it->second))); + it = interest_data_set_.erase(it); + } + + first_segment_received_ = true; + } else { + interest_data_set_.emplace(std::move(interest), + std::move(content_object)); + } + } +} + +bool IndexManager::onKeyToVerify() { return indexer_->onKeyToVerify(); } + +void IndexManager::reset(std::uint32_t offset) { + indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_, + reassembly_); + first_segment_received_ = false; + interest_data_set_.clear(); +} + +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/indexer.h b/libtransport/src/protocols/indexer.h new file mode 100644 index 000000000..8213a1503 --- /dev/null +++ b/libtransport/src/protocols/indexer.h @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> + +#include <set> + +namespace transport { + +namespace implementation { +class ConsumerSocket; +} + +namespace protocol { + +class Reassembly; +class TransportProtocol; + +class Indexer { + public: + /** + * + */ + virtual ~Indexer() = default; + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + virtual uint32_t getNextSuffix() = 0; + + virtual void setFirstSuffix(uint32_t suffix) = 0; + + /** + * Retrive the next segment to be reassembled. + */ + virtual uint32_t getNextReassemblySegment() = 0; + + virtual bool isFinalSuffixDiscovered() = 0; + + virtual uint32_t getFinalSuffix() = 0; + + virtual void reset(std::uint32_t offset = 0) = 0; + + virtual void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) = 0; + + virtual bool onKeyToVerify() = 0; +}; + +class IndexManager : Indexer { + public: + static constexpr uint32_t invalid_index = ~0; + + IndexManager(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly); + + uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); } + + void setFirstSuffix(uint32_t suffix) override { + indexer_->setFirstSuffix(suffix); + } + + uint32_t getNextReassemblySegment() override { + return indexer_->getNextReassemblySegment(); + } + + bool isFinalSuffixDiscovered() override { + return indexer_->isFinalSuffixDiscovered(); + } + + uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); } + + void reset(std::uint32_t offset = 0) override; + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + bool onKeyToVerify() override; + + private: + std::unique_ptr<Indexer> indexer_; + bool first_segment_received_; + std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> + interest_data_set_; + implementation::ConsumerSocket *icn_socket_; + TransportProtocol *transport_; + Reassembly *reassembly_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/manifest_incremental_indexer.cc b/libtransport/src/protocols/manifest_incremental_indexer.cc new file mode 100644 index 000000000..1a2f9dec3 --- /dev/null +++ b/libtransport/src/protocols/manifest_incremental_indexer.cc @@ -0,0 +1,234 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <implementation/socket_consumer.h> + +#include <protocols/manifest_incremental_indexer.h> +#include <protocols/protocol.h> + +#include <cmath> +#include <deque> + +namespace transport { + +namespace protocol { + +using namespace interface; + +ManifestIncrementalIndexer::ManifestIncrementalIndexer( + implementation::ConsumerSocket *icn_socket, TransportProtocol *transport, + Reassembly *reassembly) + : IncrementalIndexer(icn_socket, transport, reassembly), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + NextSegmentCalculationStrategy::INCREMENTAL, next_download_suffix_, + 0)) {} + +void ManifestIncrementalIndexer::onContentObject( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + // Check if mainfiest or not + if (content_object->getPayloadType() == PayloadType::MANIFEST) { + onUntrustedManifest(std::move(interest), std::move(content_object)); + } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + onUntrustedContentObject(std::move(interest), std::move(content_object)); + } +} + +void ManifestIncrementalIndexer::onUntrustedManifest( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + auto ret = verification_manager_->onPacketToVerify(*content_object); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + processTrustedManifest(std::move(content_object)); + break; + } + case VerificationPolicy::DROP_PACKET: + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } +} + +void ManifestIncrementalIndexer::processTrustedManifest( + ContentObject::Ptr &&content_object) { + auto manifest = + std::make_unique<ContentObjectManifest>(std::move(*content_object)); + manifest->decode(); + + if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != + core::ManifestVersion::VERSION_1)) { + throw errors::RuntimeException("Received manifest with unknown version."); + } + + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + auto _it = manifest->getSuffixList().begin(); + auto _end = manifest->getSuffixList().end(); + + suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber()); + + for (; _it != _end; _it++) { + auto hash = + std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + + if (!checkUnverifiedSegments(_it->first, hash)) { + suffix_hash_map_[_it->first] = std::move(hash); + } + } + + reassembly_->reassemble(std::move(manifest)); + + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } +} + +bool ManifestIncrementalIndexer::checkUnverifiedSegments( + std::uint32_t suffix, const HashEntry &hash) { + auto it = unverified_segments_.find(suffix); + + if (it != unverified_segments_.end()) { + auto ret = verifyContentObject(hash, *it->second.second); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + reassembly_->reassemble(std::move(it->second.second)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(it->second.first), + std::move(it->second.second)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + + unverified_segments_.erase(it); + return true; + } + + return false; +} + +VerificationPolicy ManifestIncrementalIndexer::verifyContentObject( + const HashEntry &manifest_hash, const ContentObject &content_object) { + VerificationPolicy ret; + + auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second); + auto data_packet_digest = content_object.computeDigest(manifest_hash.second); + auto data_packet_digest_bytes = + data_packet_digest.getDigest<uint8_t>().data(); + const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first; + + if (utils::CryptoHash::compareBinaryDigest( + data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) { + ret = VerificationPolicy::ACCEPT_PACKET; + } else { + ConsumerContentObjectVerificationFailedCallback + *verification_failed_callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback); + ret = (*verification_failed_callback)( + *socket_->getInterface(), content_object, + make_error_code(protocol_error::integrity_verification_failed)); + } + + return ret; +} + +void ManifestIncrementalIndexer::onUntrustedContentObject( + Interest::Ptr &&i, ContentObject::Ptr &&c) { + auto suffix = c->getName().getSuffix(); + auto it = suffix_hash_map_.find(suffix); + + if (it != suffix_hash_map_.end()) { + auto ret = verifyContentObject(it->second, *c); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + suffix_hash_map_.erase(it); + reassembly_->reassemble(std::move(c)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(i), std::move(c)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + } else { + unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c)); + } +} + +uint32_t ManifestIncrementalIndexer::getNextSuffix() { + auto ret = suffix_strategy_->getNextSuffix(); + + if (ret <= suffix_strategy_->getFinalSuffix() && + ret != utils::SuffixStrategy::INVALID_SUFFIX) { + suffix_queue_.push(ret); + return ret; + } + + return IndexManager::invalid_index; +} + +uint32_t ManifestIncrementalIndexer::getFinalSuffix() { + return suffix_strategy_->getFinalSuffix(); +} + +bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() { + return IncrementalIndexer::isFinalSuffixDiscovered(); +} + +uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() { + if (suffix_queue_.empty()) { + return IndexManager::invalid_index; + } + + auto ret = suffix_queue_.front(); + suffix_queue_.pop(); + return ret; +} + +void ManifestIncrementalIndexer::reset(std::uint32_t offset) { + IncrementalIndexer::reset(offset); + suffix_hash_map_.clear(); + unverified_segments_.clear(); + SuffixQueue empty; + std::swap(suffix_queue_, empty); + suffix_strategy_->reset(offset); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/manifest_incremental_indexer.h b/libtransport/src/protocols/manifest_incremental_indexer.h new file mode 100644 index 000000000..88ae1720b --- /dev/null +++ b/libtransport/src/protocols/manifest_incremental_indexer.h @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <implementation/socket.h> +#include <protocols/incremental_indexer.h> +#include <utils/suffix_strategy.h> + +#include <list> + +namespace transport { + +namespace protocol { + +class ManifestIncrementalIndexer : public IncrementalIndexer { + static constexpr double alpha = 0.3; + + public: + using SuffixQueue = std::queue<uint32_t>; + using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; + + ManifestIncrementalIndexer(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport, + Reassembly *reassembly); + + ManifestIncrementalIndexer(IncrementalIndexer &&indexer) + : IncrementalIndexer(std::move(indexer)), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + core::NextSegmentCalculationStrategy::INCREMENTAL, + next_download_suffix_, 0)) { + for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) { + suffix_queue_.push(i); + } + } + + virtual ~ManifestIncrementalIndexer() = default; + + void reset(std::uint32_t offset = 0) override; + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + uint32_t getNextSuffix() override; + + uint32_t getNextReassemblySegment() override; + + bool isFinalSuffixDiscovered() override; + + uint32_t getFinalSuffix() override; + + private: + void onUntrustedManifest(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object); + void onUntrustedContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object); + void processTrustedManifest(core::ContentObject::Ptr &&content_object); + void onManifestReceived(core::Interest::Ptr &&i, + core::ContentObject::Ptr &&c); + void onManifestTimeout(core::Interest::Ptr &&i); + VerificationPolicy verifyContentObject( + const HashEntry &manifest_hash, + const core::ContentObject &content_object); + bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash); + + protected: + std::unique_ptr<utils::SuffixStrategy> suffix_strategy_; + SuffixQueue suffix_queue_; + + // Hash verification + std::unordered_map<uint32_t, HashEntry> suffix_hash_map_; + + std::unordered_map<uint32_t, + std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> + unverified_segments_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/packet_manager.h b/libtransport/src/protocols/packet_manager.h new file mode 100644 index 000000000..a552607ea --- /dev/null +++ b/libtransport/src/protocols/packet_manager.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/object_pool.h> + +namespace transport { + +namespace protocol { + +using namespace core; + +template <typename PacketType, std::size_t packet_pool_size = 4096> +class PacketManager { + static_assert(std::is_base_of<Packet, PacketType>::value, + "The packet manager support just Interest and Data."); + + public: + PacketManager(std::size_t size = packet_pool_size) : size_(0) { + // Create pool of interests + increasePoolSize(size); + } + + TRANSPORT_ALWAYS_INLINE void increasePoolSize(std::size_t size) { + for (std::size_t i = 0; i < size; i++) { + interest_pool_.add(new PacketType()); + } + + size_ += size; + } + + TRANSPORT_ALWAYS_INLINE typename PacketType::Ptr getPacket() { + auto result = interest_pool_.get(); + + while (TRANSPORT_EXPECT_FALSE(!result.first)) { + // Add packets to the pool + increasePoolSize(size_); + result = interest_pool_.get(); + } + + result.second->resetPayload(); + return std::move(result.second); + } + + private: + utils::ObjectPool<PacketType> interest_pool_; + std::size_t size_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/protocol.cc b/libtransport/src/protocols/protocol.cc new file mode 100644 index 000000000..aa290bef8 --- /dev/null +++ b/libtransport/src/protocols/protocol.cc @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> + +#include <implementation/socket_consumer.h> +#include <protocols/protocol.h> + +namespace transport { + +namespace protocol { + +using namespace interface; + +TransportProtocol::TransportProtocol(implementation::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol) + : socket_(icn_socket), + reassembly_protocol_(reassembly_protocol), + index_manager_( + std::make_unique<IndexManager>(socket_, this, reassembly_protocol)), + is_running_(false), + is_first_(false) { + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); + socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); +} + +int TransportProtocol::start() { + // If the protocol is already running, return otherwise set as running + if (is_running_) return -1; + + // Reset the protocol state machine + reset(); + + // Set it is the first time we schedule an interest + is_first_ = true; + + // Schedule next interests + scheduleNextInterests(); + + is_first_ = false; + + // Set the protocol as running + is_running_ = true; + + // Start Event loop + portal_->runEventsLoop(); + + // Not running anymore + is_running_ = false; + + return 0; +} + +void TransportProtocol::stop() { + is_running_ = false; + portal_->stopEventsLoop(); +} + +void TransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; + + scheduleNextInterests(); + + portal_->runEventsLoop(); + + is_running_ = false; +} + +void TransportProtocol::onContentReassembled(std::error_code ec) { + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (!on_payload) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload->readSuccess(stats_->getBytesRecv()); + } else { + on_payload->readError(ec); + } + + stop(); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/protocol.h b/libtransport/src/protocols/protocol.h new file mode 100644 index 000000000..c094adaae --- /dev/null +++ b/libtransport/src/protocols/protocol.h @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <atomic> + +#include <hicn/transport/interfaces/statistics.h> +#include <hicn/transport/utils/object_pool.h> + +#include <implementation/socket.h> +#include <protocols/data_processing_events.h> +#include <protocols/indexer.h> +#include <protocols/packet_manager.h> +#include <protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +using namespace core; + +class IndexVerificationManager; + +class TransportProtocolCallback { + virtual void onContentObject(const core::Interest &interest, + const core::ContentObject &content_object) = 0; + virtual void onTimeout(const core::Interest &interest) = 0; +}; + +class TransportProtocol : public implementation::BasePortal::ConsumerCallback, + public PacketManager<Interest>, + public ContentObjectProcessingEventCallback { + static constexpr std::size_t interest_pool_size = 4096; + + friend class ManifestIndexManager; + + public: + TransportProtocol(implementation::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol); + + virtual ~TransportProtocol() = default; + + TRANSPORT_ALWAYS_INLINE bool isRunning() { return is_running_; } + + virtual int start(); + + virtual void stop(); + + virtual void resume(); + + virtual bool verifyKeyPackets() = 0; + + virtual void scheduleNextInterests() = 0; + + // Events generated by the indexing + virtual void onContentReassembled(std::error_code ec); + virtual void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; + + protected: + // Consumer Callback + virtual void reset() = 0; + virtual void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) = 0; + virtual void onTimeout(Interest::Ptr &&i) = 0; + + protected: + implementation::ConsumerSocket *socket_; + std::unique_ptr<Reassembly> reassembly_protocol_; + std::unique_ptr<IndexManager> index_manager_; + std::shared_ptr<implementation::BasePortal> portal_; + std::atomic<bool> is_running_; + // True if it si the first time we schedule an interest + std::atomic<bool> is_first_; + interface::TransportStatistics *stats_; +}; + +} // end namespace protocol +} // end namespace transport diff --git a/libtransport/src/protocols/raaqm.cc b/libtransport/src/protocols/raaqm.cc new file mode 100644 index 000000000..8a38f8ccf --- /dev/null +++ b/libtransport/src/protocols/raaqm.cc @@ -0,0 +1,714 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> + +#include <implementation/socket_consumer.h> +#include <protocols/errors.h> +#include <protocols/indexer.h> +#include <protocols/raaqm.h> + +#include <cstdlib> +#include <fstream> + +namespace transport { + +namespace protocol { + +using namespace interface; + +RaaqmTransportProtocol::RaaqmTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), + current_window_size_(1), + interests_in_flight_(0), + cur_path_(nullptr), + t0_(utils::SteadyClock::now()), + rate_estimator_(nullptr) { + init(); +} + +RaaqmTransportProtocol::~RaaqmTransportProtocol() { + if (rate_estimator_) { + delete rate_estimator_; + } +} + +int RaaqmTransportProtocol::start() { + if (rate_estimator_) { + rate_estimator_->onStart(); + } + + if (!cur_path_) { + // RAAQM + double drop_factor; + double minimum_drop_probability; + uint32_t sample_number; + uint32_t interest_lifetime; + + socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); + socket_->getSocketOption(RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY, + minimum_drop_probability); + socket_->getSocketOption(RaaqmTransportOptions::SAMPLE_NUMBER, + sample_number); + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interest_lifetime); + + // Rate Estimation + double alpha = 0.0; + uint32_t batching_param = 0; + uint32_t choice_param = 0; + socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, + alpha); + socket_->getSocketOption( + RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param); + socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, + choice_param); + + if (choice_param == 1) { + rate_estimator_ = new ALaTcpEstimator(); + } else { + rate_estimator_ = new SimpleEstimator(alpha, batching_param); + } + + socket_->getSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, + &rate_estimator_->observer_); + + // Current path + auto cur_path = std::make_unique<RaaqmDataPath>( + drop_factor, minimum_drop_probability, interest_lifetime * 1000, + sample_number); + cur_path_ = cur_path.get(); + path_table_[default_values::path_id] = std::move(cur_path); + } + + portal_->setConsumerCallback(this); + return TransportProtocol::start(); +} + +void RaaqmTransportProtocol::resume() { return TransportProtocol::resume(); } + +void RaaqmTransportProtocol::reset() { + // Set first segment to retrieve + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + index_manager_->reset(); + index_manager_->setFirstSuffix(name->getSuffix()); + std::queue<Interest::Ptr> empty; + std::swap(interest_to_retransmit_, empty); + stats_->reset(); + + // Reset reassembly component + reassembly_protocol_->reInitialize(); + + // Reset protocol variables + interests_in_flight_ = 0; + t0_ = utils::SteadyClock::now(); +} + +bool RaaqmTransportProtocol::verifyKeyPackets() { + return index_manager_->onKeyToVerify(); +} + +void RaaqmTransportProtocol::increaseWindow() { + // return; + double max_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MAX_WINDOW_SIZE, + max_window_size); + if (current_window_size_ < max_window_size) { + double gamma = 0.; + socket_->getSocketOption(RaaqmTransportOptions::GAMMA_VALUE, gamma); + + current_window_size_ += gamma / current_window_size_; + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + // return; + double min_window_size = 0.; + socket_->getSocketOption(GeneralTransportOptions::MIN_WINDOW_SIZE, + min_window_size); + if (current_window_size_ > min_window_size) { + double beta = 0.; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->setSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + current_window_size_); + } + rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().getSuffix()); + rate_estimator_->onDataReceived((int)content_object.payloadSize() + + (int)content_object.headerSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + +void RaaqmTransportProtocol::init() { + std::ifstream is(RAAQM_CONFIG_PATH); + + std::string line; + raaqm_autotune_ = false; + default_beta_ = default_values::beta_value; + default_drop_ = default_values::drop_factor; + beta_wifi_ = default_values::beta_value; + drop_wifi_ = default_values::drop_factor; + beta_lte_ = default_values::beta_value; + drop_lte_ = default_values::drop_factor; + wifi_delay_ = 1000; + lte_delay_ = 15000; + + if (!is) { + TRANSPORT_LOGW( + "WARNING: RAAQM parameters not found at %s, set default values", + RAAQM_CONFIG_PATH); + return; + } + + while (getline(is, line)) { + std::string command; + std::istringstream line_s(line); + + line_s >> command; + + if (command == ";") { + continue; + } + + if (command == "autotune") { + std::string tmp; + std::string val; + line_s >> tmp >> val; + if (val == "yes") { + raaqm_autotune_ = true; + } else { + raaqm_autotune_ = false; + } + continue; + } + + if (command == "lifetime") { + std::string tmp; + uint32_t lifetime; + line_s >> tmp >> lifetime; + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + lifetime); + continue; + } + + if (command == "retransmissions") { + std::string tmp; + uint32_t rtx; + line_s >> tmp >> rtx; + socket_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, rtx); + continue; + } + + if (command == "beta") { + std::string tmp; + line_s >> tmp >> default_beta_; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, + default_beta_); + continue; + } + + if (command == "drop") { + std::string tmp; + line_s >> tmp >> default_drop_; + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, + default_drop_); + continue; + } + + if (command == "beta_wifi_") { + std::string tmp; + line_s >> tmp >> beta_wifi_; + continue; + } + + if (command == "drop_wifi_") { + std::string tmp; + line_s >> tmp >> drop_wifi_; + continue; + } + + if (command == "beta_lte_") { + std::string tmp; + line_s >> tmp >> beta_lte_; + continue; + } + + if (command == "drop_lte_") { + std::string tmp; + line_s >> tmp >> drop_lte_; + continue; + } + + if (command == "wifi_delay_") { + std::string tmp; + line_s >> tmp >> wifi_delay_; + continue; + } + + if (command == "lte_delay_") { + std::string tmp; + line_s >> tmp >> lte_delay_; + continue; + } + if (command == "alpha") { + std::string tmp; + double rate_alpha = 0.0; + line_s >> tmp >> rate_alpha; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, + rate_alpha); + continue; + } + + if (command == "batching_parameter") { + std::string tmp; + uint32_t batching_param = 0; + line_s >> tmp >> batching_param; + socket_->setSocketOption( + RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, + batching_param); + continue; + } + + if (command == "rate_estimator") { + std::string tmp; + uint32_t choice_param = 0; + line_s >> tmp >> choice_param; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, + choice_param); + continue; + } + } + + is.close(); +} + +void RaaqmTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + // Check whether makes sense to continue + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + // Call application-defined callbacks + ConsumerContentObjectCallback *callback_content_object = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &callback_content_object); + if (*callback_content_object) { + (*callback_content_object)(*socket_->getInterface(), *content_object); + } + + ConsumerInterestCallback *callback_interest = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_SATISFIED, + &callback_interest); + if (*callback_interest) { + (*callback_interest)(*socket_->getInterface(), *interest); + } + + if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + stats_->updateBytesRecv(content_object->payloadSize()); + } + + onContentSegment(std::move(interest), std::move(content_object)); + scheduleNextInterests(); +} + +void RaaqmTransportProtocol::onContentSegment( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + + // Decrease in-flight interests + interests_in_flight_--; + + // Update stats + if (!interest_retransmissions_[incremental_suffix & mask]) { + afterContentReception(*interest, *content_object); + } + + index_manager_->onContentObject(std::move(interest), + std::move(content_object)); +} + +void RaaqmTransportProtocol::onPacketDropped( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t max_rtx = 0; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); + + uint64_t segment = interest->getName().getSuffix(); + ConsumerInterestCallback *callback = VOID_HANDLER; + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < + max_rtx)) { + stats_->updateRetxCount(1); + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &callback); + if (*callback) { + (*callback)(*socket_->getInterface(), *interest); + } + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback) { + (*callback)(*socket_->getInterface(), *interest); + } + + if (!is_running_) { + return; + } + + interest_retransmissions_[segment & mask]++; + + interest_to_retransmit_.push(std::move(interest)); + } else { + TRANSPORT_LOGE( + "Stop: received not trusted packet %llu times", + (unsigned long long)interest_retransmissions_[segment & mask]); + onContentReassembled( + make_error_code(protocol_error::max_retransmissions_error)); + } +} + +void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { + +} + +void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { + checkForStalePaths(); + + const Name &n = interest->getName(); + + TRANSPORT_LOGW("Timeout on content %s", n.toString().c_str()); + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + interests_in_flight_--; + + uint64_t segment = n.getSuffix(); + + // Do not retransmit interests asking contents that do not exist. + if (segment > index_manager_->getFinalSuffix()) { + return; + } + + ConsumerInterestCallback *callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_EXPIRED, + &callback); + if (*callback) { + (*callback)(*socket_->getInterface(), *interest); + } + + afterDataUnsatisfied(segment); + + uint32_t max_rtx = 0; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); + + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < + max_rtx)) { + stats_->updateRetxCount(1); + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &callback); + if (*callback) { + (*callback)(*socket_->getInterface(), *interest); + } + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback) { + (*callback)(*socket_->getInterface(), *interest); + } + + if (!is_running_) { + return; + } + + interest_retransmissions_[segment & mask]++; + + interest_to_retransmit_.push(std::move(interest)); + + scheduleNextInterests(); + } else { + TRANSPORT_LOGE("Stop: reached max retx limit."); + onContentReassembled(std::make_error_code(std::errc(std::errc::io_error))); + } +} + +void RaaqmTransportProtocol::scheduleNextInterests() { + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + return; + } + + if (TRANSPORT_EXPECT_FALSE(interests_in_flight_ >= current_window_size_ && + interest_to_retransmit_.size() > 0)) { + // send at least one interest if there are retransmissions to perform and + // there is no space left in the window + sendInterest(std::move(interest_to_retransmit_.front())); + TRANSPORT_LOGD("Window full, retransmit one content interest"); + interest_to_retransmit_.pop(); + } + + uint32_t index = IndexManager::invalid_index; + + // Send the interest needed for filling the window + while (interests_in_flight_ < current_window_size_) { + if (interest_to_retransmit_.size() > 0) { + sendInterest(std::move(interest_to_retransmit_.front())); + TRANSPORT_LOGD("Retransmit content interest"); + interest_to_retransmit_.pop(); + } else { + index = index_manager_->getNextSuffix(); + if (index == IndexManager::invalid_index) { + break; + } + + sendInterest(index); + TRANSPORT_LOGD("Send content interest %u", index); + } + } +} + +void RaaqmTransportProtocol::sendInterest(std::uint64_t next_suffix) { + auto interest = getPacket(); + core::Name *name; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + name->setSuffix((uint32_t)next_suffix); + interest->setName(*name); + + uint32_t interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interest_lifetime); + interest->setLifetime(interest_lifetime); + + ConsumerInterestCallback *callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback) { + callback->operator()(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + return; + } + + // This is set to ~0 so that the next interest_retransmissions_ + 1, + // performed by sendInterest, will result in 0 + interest_retransmissions_[next_suffix & mask] = ~0; + interest_timepoints_[next_suffix & mask] = utils::SteadyClock::now(); + sendInterest(std::move(interest)); +} + +void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { + interests_in_flight_++; + interest_retransmissions_[interest->getName().getSuffix() & mask]++; + + portal_->sendInterest(std::move(interest)); +} + +void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { + rate_estimator_->onDownloadFinished(); + TransportProtocol::onContentReassembled(ec); +} + +void RaaqmTransportProtocol::updateRtt(uint64_t segment) { + if (TRANSPORT_EXPECT_FALSE(!cur_path_)) { + throw std::runtime_error("RAAQM ERROR: no current path found, exit"); + } else { + auto now = utils::SteadyClock::now(); + utils::Microseconds rtt = std::chrono::duration_cast<utils::Microseconds>( + now - interest_timepoints_[segment & mask]); + + // Update stats + updateStats((uint32_t)segment, rtt.count(), now); + + if (rate_estimator_) { + rate_estimator_->onRttUpdate((double)rtt.count()); + } + + cur_path_->insertNewRtt(rtt.count()); + cur_path_->smoothTimer(); + + if (cur_path_->newPropagationDelayAvailable()) { + checkDropProbability(); + } + } +} + +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + throw errors::RuntimeException("ERROR: no current path found, exit"); + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + double coin = ((double)rand() / (RAND_MAX)); + if (coin <= cur_path_->getDropProb()) { + decreaseWindow(); + } + } +} + +void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now) { + // Update RTT statistics + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); + + // Call statistics callback + ConsumerTimerCallback *stats_callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_callback); + if (*stats_callback) { + auto dt = std::chrono::duration_cast<utils::Milliseconds>(now - t0_); + + uint32_t timer_interval_milliseconds = 0; + socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, + timer_interval_milliseconds); + if (dt.count() > timer_interval_milliseconds) { + (*stats_callback)(*socket_->getInterface(), *stats_); + t0_ = utils::SteadyClock::now(); + } + } +} + +void RaaqmTransportProtocol::updatePathTable( + const ContentObject &content_object) { + uint32_t path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (TRANSPORT_EXPECT_TRUE(cur_path_ != nullptr)) { + // Create a new path with some default param + + if (TRANSPORT_EXPECT_FALSE(path_table_.empty())) { + throw errors::RuntimeException( + "[RAAQM] No path initialized for path table, error could be in " + "default path initialization."); + } + + // Initiate the new path default param + auto new_path = std::make_unique<RaaqmDataPath>( + *(path_table_.at(default_values::path_id))); + + // Insert the new path into hash table + path_table_[path_id] = std::move(new_path); + } else { + throw errors::RuntimeException( + "UNEXPECTED ERROR: when running,current path not found."); + } + } + + cur_path_ = path_table_[path_id].get(); + + size_t header_size = content_object.headerSize(); + size_t data_size = content_object.payloadSize(); + + // Update measurements for path + cur_path_->updateReceivedStats(header_size + data_size, data_size); +} + +void RaaqmTransportProtocol::checkDropProbability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + PathTable::iterator it; + for (auto it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && + it->second->getPropagationDelay() != UINT_MAX && + !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { // only ethernet paths + drop_prob = default_drop_; + beta = default_beta_; + } else if (max_pd < lte_delay_) { // at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { // at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + socket_->getSocketOption(RaaqmTransportOptions::BETA_VALUE, old_beta); + socket_->getSocketOption(RaaqmTransportOptions::DROP_FACTOR, old_drop_prob); + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_prob); + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportProtocol::checkForStalePaths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + PathTable::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + checkDropProbability(); + } +} + +} // end namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/raaqm.h b/libtransport/src/protocols/raaqm.h new file mode 100644 index 000000000..412967770 --- /dev/null +++ b/libtransport/src/protocols/raaqm.h @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/chrono_typedefs.h> + +#include <protocols/byte_stream_reassembly.h> +#include <protocols/congestion_window_protocol.h> +#include <protocols/protocol.h> +#include <protocols/raaqm_data_path.h> +#include <protocols/rate_estimation.h> + +#include <queue> +#include <vector> + +namespace transport { + +namespace protocol { + +class RaaqmTransportProtocol : public TransportProtocol, + public CWindowProtocol { + public: + RaaqmTransportProtocol(implementation::ConsumerSocket *icnet_socket); + + ~RaaqmTransportProtocol(); + + int start() override; + + void resume() override; + + void reset() override; + + virtual bool verifyKeyPackets() override; + + protected: + static constexpr uint32_t buffer_size = + 1 << interface::default_values::log_2_default_buffer_size; + static constexpr uint16_t mask = buffer_size - 1; + using PathTable = + std::unordered_map<uint32_t, std::unique_ptr<RaaqmDataPath>>; + + void increaseWindow() override; + void decreaseWindow() override; + + virtual void afterContentReception(const Interest &interest, + const ContentObject &content_object); + virtual void afterDataUnsatisfied(uint64_t segment); + + virtual void updateStats(uint32_t suffix, uint64_t rtt, + utils::TimePoint &now); + + private: + void init(); + + void onContentObject(Interest::Ptr &&i, ContentObject::Ptr &&c) override; + + void onContentSegment(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object); + + void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override; + + void onReassemblyFailed(std::uint32_t missing_segment) override; + + void onTimeout(Interest::Ptr &&i) override; + + virtual void scheduleNextInterests() override; + + void sendInterest(std::uint64_t next_suffix); + + void sendInterest(Interest::Ptr &&interest); + + void onContentReassembled(std::error_code ec) override; + + void updateRtt(uint64_t segment); + + void RAAQM(); + + void updatePathTable(const ContentObject &content_object); + + void checkDropProbability(); + + void checkForStalePaths(); + + void printRtt(); + + protected: + // Congestion window management + double current_window_size_; + // Protocol management + uint64_t interests_in_flight_; + std::array<std::uint32_t, buffer_size> interest_retransmissions_; + std::array<utils::TimePoint, buffer_size> interest_timepoints_; + std::queue<Interest::Ptr> interest_to_retransmit_; + + private: + /** + * Current download path + */ + RaaqmDataPath *cur_path_; + + /** + * Hash table for path: each entry is a pair path ID(key) - path object + */ + PathTable path_table_; + + // TimePoints for statistic + utils::TimePoint t0_; + + bool set_interest_filter_; + + // for rate-estimation at packet level + IcnRateEstimator *rate_estimator_; + + // params for autotuning + bool raaqm_autotune_; + double default_beta_; + double default_drop_; + double beta_wifi_; + double drop_wifi_; + double beta_lte_; + double drop_lte_; + unsigned int wifi_delay_; + unsigned int lte_delay_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/raaqm_data_path.cc b/libtransport/src/protocols/raaqm_data_path.cc new file mode 100644 index 000000000..439549c85 --- /dev/null +++ b/libtransport/src/protocols/raaqm_data_path.cc @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/utils/chrono_typedefs.h> + +#include <protocols/raaqm_data_path.h> + +namespace transport { + +namespace protocol { + +RaaqmDataPath::RaaqmDataPath(double drop_factor, + double minimum_drop_probability, + unsigned new_timer, unsigned int samples, + uint64_t new_rtt, uint64_t new_rtt_min, + uint64_t new_rtt_max, unsigned new_pd) + + : drop_factor_(drop_factor), + minimum_drop_probability_(minimum_drop_probability), + timer_(new_timer), + samples_(samples), + rtt_(new_rtt), + rtt_min_(new_rtt_min), + rtt_max_(new_rtt_max), + prop_delay_(new_pd), + new_prop_delay_(false), + drop_prob_(0), + packets_received_(0), + last_packets_received_(0), + m_packets_bytes_received_(0), + last_packets_bytes_received_(0), + raw_data_bytes_received_(0), + last_raw_data_bytes_received_(0), + rtt_samples_(samples_), + last_received_pkt_(utils::SteadyClock::now()), + average_rtt_(0), + alpha_(ALPHA) {} + +RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { + rtt_ = new_rtt; + rtt_samples_.pushBack(new_rtt); + + rtt_max_ = rtt_samples_.rBegin(); + rtt_min_ = rtt_samples_.begin(); + + if (rtt_min_ < prop_delay_) { + new_prop_delay_ = true; + prop_delay_ = rtt_min_; + } + + last_received_pkt_ = utils::SteadyClock::now(); + + return *this; +} + +RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, + std::size_t data_size) { + packets_received_++; + m_packets_bytes_received_ += packet_size; + raw_data_bytes_received_ += data_size; + + return *this; +} + +double RaaqmDataPath::getDropFactor() { return drop_factor_; } + +double RaaqmDataPath::getDropProb() { return drop_prob_; } + +RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) { + drop_prob_ = dropProb; + + return *this; +} + +double RaaqmDataPath::getMinimumDropProbability() { + return minimum_drop_probability_; +} + +double RaaqmDataPath::getTimer() { return timer_; } + +RaaqmDataPath &RaaqmDataPath::smoothTimer() { + timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ + + (TIMEOUT_SMOOTHER)*rtt_ * (TIMEOUT_RATIO); + + return *this; +} + +double RaaqmDataPath::getRtt() { return (double)rtt_; } + +double RaaqmDataPath::getAverageRtt() { return average_rtt_; } + +double RaaqmDataPath::getRttMax() { return (double)rtt_max_; } + +double RaaqmDataPath::getRttMin() { return (double)rtt_min_; } + +unsigned RaaqmDataPath::getSampleValue() { return samples_; } + +unsigned RaaqmDataPath::getRttQueueSize() { + return static_cast<unsigned>(rtt_samples_.size()); +} + +RaaqmDataPath &RaaqmDataPath::updateDropProb() { + drop_prob_ = 0.0; + + if (getSampleValue() == getRttQueueSize()) { + if (rtt_max_ == rtt_min_) { + drop_prob_ = minimum_drop_probability_; + } else { + drop_prob_ = minimum_drop_probability_ + + drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_); + } + } + + return *this; +} + +void RaaqmDataPath::setAlpha(double alpha) { + if (alpha >= 0 && alpha <= 1) { + alpha_ = alpha; + } +} + +bool RaaqmDataPath::newPropagationDelayAvailable() { + bool r = new_prop_delay_; + new_prop_delay_ = false; + return r; +} + +unsigned int RaaqmDataPath::getPropagationDelay() { + return (unsigned int)prop_delay_; +} + +bool RaaqmDataPath::isStale() { + utils::TimePoint now = utils::SteadyClock::now(); + auto time = + std::chrono::duration_cast<utils::Microseconds>(now - last_received_pkt_) + .count(); + if (time > 2000000) { + return true; + } + return false; +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/raaqm_data_path.h b/libtransport/src/protocols/raaqm_data_path.h new file mode 100644 index 000000000..6f2afde72 --- /dev/null +++ b/libtransport/src/protocols/raaqm_data_path.h @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/utils/chrono_typedefs.h> + +#include <utils/min_filter.h> + +#include <chrono> +#include <climits> +#include <iostream> + +#define TIMEOUT_SMOOTHER 0.1 +#define TIMEOUT_RATIO 10 +#define ALPHA 0.8 + +namespace transport { + +namespace protocol { + +class RaaqmDataPath { + public: + RaaqmDataPath(double drop_factor, double minimum_drop_probability, + unsigned new_timer, unsigned int samples, + uint64_t new_rtt = 1000, uint64_t new_rtt_min = 1000, + uint64_t new_rtt_max = 1000, unsigned new_pd = UINT_MAX); + + public: + /* + * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is + * full, and thus need overwrite. Also it maintains the validity of min and + * max of RTT. + * @param new_rtt is the value of the new RTT + */ + RaaqmDataPath &insertNewRtt(uint64_t new_rtt); + + /** + * @brief Update the path statistics + * @param packet_size the size of the packet received, including the ICN + * header + * @param data_size the size of the data received, without the ICN header + */ + RaaqmDataPath &updateReceivedStats(std::size_t packet_size, + std::size_t data_size); + + /** + * @brief Get the value of the drop factor parameter + */ + double getDropFactor(); + + /** + * @brief Get the value of the drop probability + */ + double getDropProb(); + + /** + * @brief Set the value pf the drop probability + * @param drop_prob is the value of the drop probability + */ + RaaqmDataPath &setDropProb(double drop_prob); + + /** + * @brief Get the minimum drop probability + */ + double getMinimumDropProbability(); + + /** + * @brief Get last RTT + */ + double getRtt(); + + /** + * @brief Get average RTT + */ + double getAverageRtt(); + + /** + * @brief Get the current m_timer value + */ + double getTimer(); + + /** + * @brief Smooth he value of the m_timer accordingly with the last RTT + * measured + */ + RaaqmDataPath &smoothTimer(); + + /** + * @brief Get the maximum RTT among the last samples + */ + double getRttMax(); + + /** + * @brief Get the minimum RTT among the last samples + */ + double getRttMin(); + + /** + * @brief Get the number of saved samples + */ + unsigned getSampleValue(); + + /** + * @brief Get the size og the RTT queue + */ + unsigned getRttQueueSize(); + + /* + * @brief Change drop probability according to RTT statistics + * Invoked in RAAQM(), before control window size update. + */ + RaaqmDataPath &updateDropProb(); + + void setAlpha(double alpha); + + /** + * @brief Returns the smallest RTT registered so far for this path + */ + + unsigned int getPropagationDelay(); + + bool newPropagationDelayAvailable(); + + bool isStale(); + + private: + /** + * The value of the drop factor + */ + double drop_factor_; + + /** + * The minumum drop probability + */ + double minimum_drop_probability_; + + /** + * The timer, expressed in milliseconds + */ + double timer_; + + /** + * The number of samples to store for computing the protocol measurements + */ + const unsigned int samples_; + + /** + * The last, the minimum and the maximum value of the RTT (among the last + * m_samples samples) + */ + uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_; + + bool new_prop_delay_; + + /** + * The current drop probability + */ + double drop_prob_; + + /** + * The number of packets received in this path + */ + intmax_t packets_received_; + + /** + * The first packet received after the statistics print + */ + intmax_t last_packets_received_; + + /** + * Total number of bytes received including the ICN header + */ + intmax_t m_packets_bytes_received_; + + /** + * The amount of packet bytes received at the last path summary computation + */ + intmax_t last_packets_bytes_received_; + + /** + * Total number of bytes received without including the ICN header + */ + intmax_t raw_data_bytes_received_; + + /** + * The amount of raw dat bytes received at the last path summary computation + */ + intmax_t last_raw_data_bytes_received_; + + class byArrival; + + class byOrder; + + /** + * Double ended queue for the RTTs + */ + + typedef utils::MinFilter<uint64_t> RTTQueue; + + RTTQueue rtt_samples_; + + /** + * Time of the last call to the path reporter method + */ + utils::TimePoint last_received_pkt_; + + double average_rtt_; + double alpha_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rate_estimation.cc b/libtransport/src/protocols/rate_estimation.cc new file mode 100644 index 000000000..a2cf1aefe --- /dev/null +++ b/libtransport/src/protocols/rate_estimation.cc @@ -0,0 +1,356 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_options_default_values.h> +#include <hicn/transport/utils/log.h> + +#include <protocols/rate_estimation.h> + +#include <thread> + +namespace transport { + +namespace protocol { + +void *Timer(void *data) { + InterRttEstimator *estimator = (InterRttEstimator *)data; + + double dat_rtt, my_avg_win, my_avg_rtt; + int my_win_change, number_of_packets, max_packet_size; + + pthread_mutex_lock(&(estimator->mutex_)); + dat_rtt = estimator->rtt_; + pthread_mutex_unlock(&(estimator->mutex_)); + + while (estimator->is_running_) { + std::this_thread::sleep_for(std::chrono::microseconds( + (uint64_t)(interface::default_values::kv * dat_rtt))); + + pthread_mutex_lock(&(estimator->mutex_)); + + dat_rtt = estimator->rtt_; + my_avg_win = estimator->avg_win_; + my_avg_rtt = estimator->avg_rtt_; + my_win_change = (int)(estimator->win_change_); + number_of_packets = estimator->number_of_packets_; + max_packet_size = estimator->max_packet_size_; + estimator->avg_rtt_ = estimator->rtt_; + estimator->avg_win_ = 0; + estimator->win_change_ = 0; + estimator->number_of_packets_ = 1; + + pthread_mutex_unlock(&(estimator->mutex_)); + + if (number_of_packets == 0 || my_win_change == 0) { + continue; + } + if (estimator->estimation_ == 0) { + estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 / + (1.0 * my_win_change)) / + (my_avg_rtt / (1.0 * number_of_packets)); + } + + estimator->estimation_ = + estimator->alpha_ * estimator->estimation_ + + (1 - estimator->alpha_) * ((my_avg_win * 8.0 * max_packet_size * + 1000000.0 / (1.0 * my_win_change)) / + (my_avg_rtt / (1.0 * number_of_packets))); + + if (estimator->observer_) { + estimator->observer_->notifyStats(estimator->estimation_); + } + } + + return nullptr; +} + +InterRttEstimator::InterRttEstimator(double alpha_arg) { + this->estimated_ = false; + this->observer_ = NULL; + this->alpha_ = alpha_arg; + this->thread_is_running_ = false; + this->my_th_ = NULL; + this->is_running_ = true; + this->avg_rtt_ = 0.0; + this->estimation_ = 0.0; + this->avg_win_ = 0.0; + this->rtt_ = 0.0; + this->win_change_ = 0; + this->number_of_packets_ = 0; + this->max_packet_size_ = 0; + this->win_current_ = 1.0; + + pthread_mutex_init(&(this->mutex_), NULL); + this->start_time_ = std::chrono::steady_clock::now(); + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +InterRttEstimator::~InterRttEstimator() { + this->is_running_ = false; + if (this->my_th_) { + pthread_join(*(this->my_th_), NULL); + } + this->my_th_ = NULL; + pthread_mutex_destroy(&(this->mutex_)); +} + +void InterRttEstimator::onRttUpdate(double rtt) { + pthread_mutex_lock(&(this->mutex_)); + this->rtt_ = rtt; + this->number_of_packets_++; + this->avg_rtt_ += rtt; + pthread_mutex_unlock(&(this->mutex_)); + + if (!thread_is_running_) { + my_th_ = (pthread_t *)malloc(sizeof(pthread_t)); + if (!my_th_) { + TRANSPORT_LOGE("Error allocating thread."); + my_th_ = NULL; + } + if (/*int err = */ pthread_create(my_th_, NULL, transport::protocol::Timer, + (void *)this)) { + TRANSPORT_LOGE("Error creating the thread"); + my_th_ = NULL; + } + thread_is_running_ = true; + } +} + +void InterRttEstimator::onWindowIncrease(double win_current) { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->begin_batch_) + .count(); + + pthread_mutex_lock(&(this->mutex_)); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + pthread_mutex_unlock(&(this->mutex_)); + + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +void InterRttEstimator::onWindowDecrease(double win_current) { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->begin_batch_) + .count(); + + pthread_mutex_lock(&(this->mutex_)); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + pthread_mutex_unlock(&(this->mutex_)); + + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +ALaTcpEstimator::ALaTcpEstimator() { + this->estimation_ = 0.0; + this->observer_ = NULL; + this->start_time_ = std::chrono::steady_clock::now(); + this->totalSize_ = 0.0; +} + +void ALaTcpEstimator::onStart() { + this->totalSize_ = 0.0; + this->start_time_ = std::chrono::steady_clock::now(); +} + +void ALaTcpEstimator::onDownloadFinished() { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->start_time_).count(); + this->estimation_ = this->totalSize_ * 8 * 1000000 / delay; + if (observer_) { + observer_->notifyStats(this->estimation_); + } +} + +void ALaTcpEstimator::onDataReceived(int packet_size) { + this->totalSize_ += packet_size; +} + +SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) { + this->estimation_ = 0.0; + this->estimated_ = false; + this->observer_ = nullptr; + this->batching_param_ = batching_param; + this->total_size_ = 0.0; + this->number_of_packets_ = 0; + this->base_alpha_ = alphaArg; + this->alpha_ = alphaArg; + this->start_time_ = std::chrono::steady_clock::now(); + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +void SimpleEstimator::onStart() { + this->estimated_ = false; + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + this->start_time_ = std::chrono::steady_clock::now(); + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +void SimpleEstimator::onDownloadFinished() { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->start_time_).count(); + if (observer_) { + observer_->notifyDownloadTime((double)delay); + } + if (!this->estimated_) { + // Assuming all packets carry max_packet_size_ bytes of data + // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = + alpha_ * this->estimation_ + + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) / + ((double)this->batching_param_)); + } else { + if (this->number_of_packets_ >= + (int)(75.0 * (double)this->batching_param_ / 100.0)) { + delay = std::chrono::duration_cast<Microseconds>(end - this->begin_batch_) + .count(); + // Assuming all packets carry max_packet_size_ bytes of data + // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = + alpha_ * this->estimation_ + + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_ * (((double)this->number_of_packets_) / + ((double)this->batching_param_)); + } + } + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + this->start_time_ = std::chrono::steady_clock::now(); + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +void SimpleEstimator::onDataReceived(int packet_size) { + this->total_size_ += packet_size; +} + +void SimpleEstimator::onRttUpdate(double rtt) { + this->number_of_packets_++; + + if (this->number_of_packets_ == this->batching_param_) { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->begin_batch_) + .count(); + // Assuming all packets carry max_packet_size_ bytes of data + // (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = + alpha_ * this->estimation_ + + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_; + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + this->begin_batch_ = std::chrono::steady_clock::now(); + } +} + +BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg, + int param) { + this->estimated_ = false; + this->observer_ = NULL; + this->alpha_ = alpha_arg; + this->batching_param_ = param; + this->number_of_packets_ = 0; + this->avg_win_ = 0.0; + this->avg_rtt_ = 0.0; + this->win_change_ = 0.0; + this->max_packet_size_ = 0; + this->estimation_ = 0.0; + this->win_current_ = 1.0; + this->begin_batch_ = std::chrono::steady_clock::now(); + this->start_time_ = std::chrono::steady_clock::now(); +} + +void BatchingPacketsEstimator::onRttUpdate(double rtt) { + this->number_of_packets_++; + this->avg_rtt_ += rtt; + + if (number_of_packets_ == this->batching_param_) { + if (estimation_ == 0) { + estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / + (1.0 * win_change_)) / + (avg_rtt_ / (1.0 * number_of_packets_)); + } else { + estimation_ = alpha_ * estimation_ + + (1 - alpha_) * ((avg_win_ * 8.0 * max_packet_size_ * + 1000000.0 / (1.0 * win_change_)) / + (avg_rtt_ / (1.0 * number_of_packets_))); + } + + if (observer_) { + observer_->notifyStats(estimation_); + } + + this->number_of_packets_ = 0; + this->avg_win_ = 0.0; + this->avg_rtt_ = 0.0; + this->win_change_ = 0.0; + } +} + +void BatchingPacketsEstimator::onWindowIncrease(double win_current) { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->begin_batch_) + .count(); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +void BatchingPacketsEstimator::onWindowDecrease(double win_current) { + TimePoint end = std::chrono::steady_clock::now(); + auto delay = + std::chrono::duration_cast<Microseconds>(end - this->begin_batch_) + .count(); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + this->begin_batch_ = std::chrono::steady_clock::now(); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rate_estimation.h b/libtransport/src/protocols/rate_estimation.h new file mode 100644 index 000000000..17f39e0b9 --- /dev/null +++ b/libtransport/src/protocols/rate_estimation.h @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/statistics.h> + +#include <protocols/raaqm_data_path.h> + +#include <chrono> + +namespace transport { + +namespace protocol { + +class IcnRateEstimator { + public: + using TimePoint = std::chrono::steady_clock::time_point; + using Microseconds = std::chrono::microseconds; + + IcnRateEstimator(){}; + + virtual ~IcnRateEstimator(){}; + + virtual void onRttUpdate(double rtt){}; + + virtual void onDataReceived(int packetSize){}; + + virtual void onWindowIncrease(double winCurrent){}; + + virtual void onWindowDecrease(double winCurrent){}; + + virtual void onStart(){}; + + virtual void onDownloadFinished(){}; + + virtual void setObserver(interface::IcnObserver *observer) { + this->observer_ = observer; + }; + interface::IcnObserver *observer_; + TimePoint start_time_; + TimePoint begin_batch_; + double base_alpha_; + double alpha_; + double estimation_; + int number_of_packets_; + // this boolean is to make sure at least one estimation of the BW is done + bool estimated_; +}; + +// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT) + +class InterRttEstimator : public IcnRateEstimator { + public: + InterRttEstimator(double alpha_arg); + + ~InterRttEstimator(); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size) { + if (packet_size > this->max_packet_size_) { + this->max_packet_size_ = packet_size; + } + }; + + void onWindowIncrease(double win_current); + + void onWindowDecrease(double win_current); + + void onStart(){}; + + void onDownloadFinished(){}; + + // private: should be done by using getters + pthread_t *my_th_; + bool thread_is_running_; + double rtt_; + bool is_running_; + pthread_mutex_t mutex_; + double avg_rtt_; + double avg_win_; + int max_packet_size_; + double win_change_; + double win_current_; +}; + +// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT) + +class BatchingPacketsEstimator : public IcnRateEstimator { + public: + BatchingPacketsEstimator(double alpha_arg, int batchingParam); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size) { + if (packet_size > this->max_packet_size_) { + this->max_packet_size_ = packet_size; + } + }; + + void onWindowIncrease(double win_current); + + void onWindowDecrease(double win_current); + + void onStart(){}; + + void onDownloadFinished(){}; + + private: + int batching_param_; + double avg_rtt_; + double avg_win_; + double win_change_; + int max_packet_size_; + double win_current_; +}; + +// Segment Estimator + +class ALaTcpEstimator : public IcnRateEstimator { + public: + ALaTcpEstimator(); + + void onDataReceived(int packet_size); + void onStart(); + void onDownloadFinished(); + + private: + double totalSize_; +}; + +// A Rate estimator, this one is the simplest: counting batching_param_ packets +// and then divide the sum of the size of these packets by the time taken to DL +// them. Should be the one used + +class SimpleEstimator : public IcnRateEstimator { + public: + SimpleEstimator(double alpha, int batching_param); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size); + + void onWindowIncrease(double win_current){}; + + void onWindowDecrease(double win_current){}; + + void onStart(); + + void onDownloadFinished(); + + private: + int batching_param_; + double total_size_; +}; + +void *Timer(void *data); + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/reassembly.cc b/libtransport/src/protocols/reassembly.cc new file mode 100644 index 000000000..c6602153c --- /dev/null +++ b/libtransport/src/protocols/reassembly.cc @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/utils/array.h> +#include <hicn/transport/utils/membuf.h> + +#include <implementation/socket_consumer.h> +#include <protocols/errors.h> +#include <protocols/indexer.h> +#include <protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +void Reassembly::notifyApplication() { + interface::ConsumerSocket::ReadCallback *read_callback = nullptr; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + if (TRANSPORT_EXPECT_FALSE(!read_callback)) { + TRANSPORT_LOGE("Read callback not installed!"); + return; + } + + if (read_callback->isBufferMovable()) { + // No need to perform an additional copy. The whole buffer will be + // tranferred to the application. + + read_callback->readBufferAvailable(std::move(read_buffer_)); + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); + } else { + // The buffer will be copied into the application-provided buffer + uint8_t *buffer; + std::size_t length; + std::size_t total_length = read_buffer_->length(); + + while (read_buffer_->length()) { + buffer = nullptr; + length = 0; + read_callback->getReadBuffer(&buffer, &length); + + if (!buffer || !length) { + throw errors::RuntimeException( + "Invalid buffer provided by the application."); + } + + auto to_copy = std::min(read_buffer_->length(), length); + std::memcpy(buffer, read_buffer_->data(), to_copy); + read_buffer_->trimStart(to_copy); + } + + read_callback->readDataAvailable(total_length); + read_buffer_->clear(); + } +} + +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/protocols/reassembly.h b/libtransport/src/protocols/reassembly.h new file mode 100644 index 000000000..fdc9f2a05 --- /dev/null +++ b/libtransport/src/protocols/reassembly.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <core/facade.h> + +namespace transport { + +namespace implementation { +class ConsumerReadCallback; +class ConsumerSocket; +} // namespace implementation + +namespace protocol { + +class TransportProtocol; +class Indexer; + +// Forward Declaration +class ManifestManager; + +class Reassembly { + public: + class ContentReassembledCallback { + public: + virtual void onContentReassembled(std::error_code ec) = 0; + }; + + Reassembly(implementation::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : reassembly_consumer_socket_(icn_socket), + transport_protocol_(transport_protocol) {} + + virtual ~Reassembly() = default; + + virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0; + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0; + virtual void reInitialize() = 0; + virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; } + + protected: + virtual void notifyApplication(); + + protected: + implementation::ConsumerSocket *reassembly_consumer_socket_; + TransportProtocol *transport_protocol_; + Indexer *index_manager_; + std::unique_ptr<utils::MemBuf> read_buffer_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc.cc b/libtransport/src/protocols/rtc.cc new file mode 100644 index 000000000..0ac3839dd --- /dev/null +++ b/libtransport/src/protocols/rtc.cc @@ -0,0 +1,1017 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc.h> + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <implementation/socket_consumer.h> + +#include <math.h> +#include <random> + +namespace transport { + +namespace protocol { + +using namespace interface; + +RTCTransportProtocol::RTCTransportProtocol( + implementation::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), + inflightInterests_(1 << default_values::log_2_default_buffer_size), + modMask_((1 << default_values::log_2_default_buffer_size) - 1) { + icn_socket->getSocketOption(PORTAL, portal_); + rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + sentinel_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); + round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + reset(); +} + +RTCTransportProtocol::~RTCTransportProtocol() { + if (is_running_) { + stop(); + } +} + +int RTCTransportProtocol::start() { + if (is_running_) return -1; + + reset(); + is_first_ = true; + + probeRtt(); + sentinelTimer(); + newRound(); + scheduleNextInterests(); + + is_first_ = false; + is_running_ = true; + portal_->runEventsLoop(); + is_running_ = false; + + return 0; +} + +void RTCTransportProtocol::stop() { + if (!is_running_) return; + + is_running_ = false; + portal_->stopEventsLoop(); +} + +void RTCTransportProtocol::resume() { + if (is_running_) return; + + is_running_ = true; + inflightInterestsCount_ = 0; + + probeRtt(); + sentinelTimer(); + newRound(); + scheduleNextInterests(); + + portal_->runEventsLoop(); + is_running_ = false; +} + +// private +void RTCTransportProtocol::reset() { + portal_->setConsumerCallback(this); + // controller var + currentState_ = HICN_RTC_SYNC_STATE; + + // cwin var + currentCWin_ = HICN_INITIAL_CWIN; + maxCWin_ = HICN_INITIAL_CWIN_MAX; + + // names/packets var + actualSegment_ = 0; + inflightInterestsCount_ = 0; + interestRetransmissions_.clear(); + lastSegNacked_ = 0; + lastReceived_ = 0; + lastReceivedTime_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + lastEvent_ = lastReceivedTime_; + highestReceived_ = 0; + firstSequenceInRound_ = 0; + + rtx_timer_used_ = false; + for (int i = 0; i < (1 << default_values::log_2_default_buffer_size); i++) { + inflightInterests_[i] = {0}; + } + + // stats + firstPckReceived_ = false; + receivedBytes_ = 0; + sentInterest_ = 0; + receivedData_ = 0; + packetLost_ = 0; + lossRecovered_ = 0; + avgPacketSize_ = HICN_INIT_PACKET_SIZE; + gotNack_ = false; + gotFutureNack_ = 0; + rounds_ = 0; + roundsWithoutNacks_ = 0; + pathTable_.clear(); + + // CC var + estimatedBw_ = 0.0; + lossRate_ = 0.0; + queuingDelay_ = 0.0; + protocolState_ = HICN_RTC_NORMAL_STATE; + + producerPathLabels_[0] = 0; + producerPathLabels_[1] = 0; + initied = false; + + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + (uint32_t)HICN_RTC_INTEREST_LIFETIME); + // XXX this should be done by the application +} + +uint32_t max(uint32_t a, uint32_t b) { + if (a > b) + return a; + else + return b; +} + +uint32_t min(uint32_t a, uint32_t b) { + if (a < b) + return a; + else + return b; +} + +void RTCTransportProtocol::newRound() { + round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN)); + round_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + updateStats(HICN_ROUND_LEN); + newRound(); + }); +} + +void RTCTransportProtocol::updateDelayStats( + const ContentObject &content_object) { + uint32_t segmentNumber = content_object.getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + if (inflightInterests_[pkt].state != sent_) return; + + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) + // this packet was rtx at least once + return; + + uint32_t pathLabel = content_object.getPathLabel(); + + if (pathTable_.find(pathLabel) == pathTable_.end()) { + // found a new path + std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>(); + pathTable_[pathLabel] = newPath; + } + + // RTT measurements are useful both from NACKs and data packets + uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + inflightInterests_[pkt].transmissionTime; + + pathTable_[pathLabel]->insertRttSample(RTT); + auto payload = content_object.getPayload(); + + // we collect OWD only for datapackets + if (payload->length() != HICN_NACK_HEADER_SIZE) { + uint64_t *senderTimeStamp = (uint64_t *)payload->data(); + int64_t OWD = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + *senderTimeStamp; + + pathTable_[pathLabel]->insertOwdSample(OWD); + pathTable_[pathLabel]->computeInterArrivalGap(segmentNumber); + } else { + pathTable_[pathLabel]->receivedNack(); + } +} + +void RTCTransportProtocol::updateStats(uint32_t round_duration) { + if (pathTable_.empty()) return; + + if (receivedBytes_ != 0) { + double bytesPerSec = + (double)(receivedBytes_ * + ((double)HICN_MILLI_IN_A_SEC / (double)round_duration)); + estimatedBw_ = (estimatedBw_ * HICN_ESTIMATED_BW_ALPHA) + + ((1 - HICN_ESTIMATED_BW_ALPHA) * bytesPerSec); + } + + uint64_t minRtt = UINT_MAX; + uint64_t maxRtt = 0; + + for (auto it = pathTable_.begin(); it != pathTable_.end(); it++) { + it->second->roundEnd(); + if (it->second->isActive()) { + if (it->second->getMinRtt() < minRtt) { + minRtt = it->second->getMinRtt(); + producerPathLabels_[0] = it->first; + } + if (it->second->getMinRtt() > maxRtt) { + maxRtt = it->second->getMinRtt(); + producerPathLabels_[1] = it->first; + } + } + } + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) + return; // this should not happen + + // as a queuing delay we keep the lowest one among the two paths + // if one path is congested the forwarder should decide to do not + // use it so it does not make sense to inform the application + // that maybe we have a problem + if (pathTable_[producerPathLabels_[0]]->getQueuingDealy() < + pathTable_[producerPathLabels_[1]]->getQueuingDealy()) + queuingDelay_ = pathTable_[producerPathLabels_[0]]->getQueuingDealy(); + else + queuingDelay_ = pathTable_[producerPathLabels_[1]]->getQueuingDealy(); + + if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { + uint32_t numberTheoricallyReceivedPackets_ = + highestReceived_ - firstSequenceInRound_; + double lossRate = 0; + if (numberTheoricallyReceivedPackets_ != 0) + lossRate = (double)((double)(packetLost_ - lossRecovered_) / + (double)numberTheoricallyReceivedPackets_); + + if (lossRate < 0) lossRate = 0; + + if (initied) { + lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + + (lossRate * (1 - HICN_ESTIMATED_LOSSES_ALPHA)); + } else { + lossRate_ = lossRate; + initied = true; + } + } + + if (avgPacketSize_ == 0) avgPacketSize_ = HICN_INIT_PACKET_SIZE; + + // for the BDP we use the max rtt, so that we calibrate the window on the + // RTT of the slowest path. In this way we are sure that the window will + // never be too small + uint32_t BDP = (uint32_t)ceil( + (estimatedBw_ * + (double)((double)pathTable_[producerPathLabels_[1]]->getMinRtt() / + (double)HICN_MILLI_IN_A_SEC) * + HICN_BANDWIDTH_SLACK_FACTOR) / + avgPacketSize_); + uint32_t BW = (uint32_t)ceil(estimatedBw_); + computeMaxWindow(BW, BDP); + + ConsumerTimerCallback *stats_callback = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::STATS_SUMMARY, + &stats_callback); + if (*stats_callback) { + // Send the stats to the app + stats_->updateQueuingDelay(queuingDelay_); + stats_->updateLossRatio(lossRate_); + stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); + (*stats_callback)(*socket_->getInterface(), *stats_); + } + // bound also by interest lifitime* production rate + if (!gotNack_) { + roundsWithoutNacks_++; + if (currentState_ == HICN_RTC_SYNC_STATE && + roundsWithoutNacks_ >= HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH) { + currentState_ = HICN_RTC_NORMAL_STATE; + } + } else { + roundsWithoutNacks_ = 0; + } + + updateCCState(); + updateWindow(); + + if (queuingDelay_ > 25.0) { + // this indicates that the client will go soon out of synch, + // switch to synch mode + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + computeMaxWindow(BW, 0); + increaseWindow(); + } + + // in any case we reset all the counters + + gotNack_ = false; + gotFutureNack_ = 0; + receivedBytes_ = 0; + sentInterest_ = 0; + receivedData_ = 0; + packetLost_ = 0; + lossRecovered_ = 0; + rounds_++; + firstSequenceInRound_ = highestReceived_; +} + +void RTCTransportProtocol::updateCCState() { + // TODO +} + +void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, + uint32_t BDPWin) { + if (productionRate == + 0) // we have no info about the producer, keep the previous maxCWin + return; + + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + uint32_t maxWaintingInterest = (uint32_t)ceil( + (productionRate / avgPacketSize_) * + (double)((double)(interestLifetime * + HICN_INTEREST_LIFETIME_REDUCTION_FACTOR) / + (double)HICN_MILLI_IN_A_SEC)); + + if (currentState_ == HICN_RTC_SYNC_STATE) { + // in this case we do not limit the window with the BDP, beacuse most + // likely it is wrong + maxCWin_ = maxWaintingInterest; + return; + } + + // currentState = RTC_NORMAL_STATE + if (BDPWin != 0) { + maxCWin_ = (uint32_t)ceil((double)BDPWin + + (((double)BDPWin * 30.0) / 100.0)); // BDP + 30% + } else { + maxCWin_ = min(maxWaintingInterest, maxCWin_); + } + + if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN; +} + +void RTCTransportProtocol::updateWindow() { + if (currentState_ == HICN_RTC_SYNC_STATE) return; + + if (currentCWin_ < maxCWin_ * 0.9) { + currentCWin_ = + min(maxCWin_, (uint32_t)(currentCWin_ * HICN_WIN_INCREASE_FACTOR)); + } else if (currentCWin_ > maxCWin_) { + currentCWin_ = + max((uint32_t)(currentCWin_ * HICN_WIN_DECREASE_FACTOR), HICN_MIN_CWIN); + } +} + +void RTCTransportProtocol::decreaseWindow() { + // this is used only in SYNC mode + if (currentState_ == HICN_RTC_NORMAL_STATE) return; + + if (gotFutureNack_ == 1) + currentCWin_ = min((currentCWin_ - 1), + (uint32_t)ceil((double)maxCWin_ * 0.66)); // 2/3 + else + currentCWin_--; + + currentCWin_ = max(currentCWin_, HICN_MIN_CWIN); +} + +void RTCTransportProtocol::increaseWindow() { + // this is used only in SYNC mode + if (currentState_ == HICN_RTC_NORMAL_STATE) return; + + // we need to be carefull to do not increase the window to much + if (currentCWin_ < ((double)maxCWin_ * 0.7)) { + currentCWin_ = currentCWin_ + 1; // exponential + } else { + currentCWin_ = min( + maxCWin_, + (uint32_t)ceil(currentCWin_ + (1.0 / (double)currentCWin_))); // linear + } +} + +void RTCTransportProtocol::probeRtt() { + probe_timer_->expires_from_now(std::chrono::milliseconds(1000)); + probe_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + probeRtt(); + }); + + // To avoid sending the first probe, because the transport is not running yet + if (is_first_ && !is_running_) return; + + time_sent_probe_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + // get a random numbe in the probe seq range + std::default_random_engine eng((std::random_device())()); + std::uniform_int_distribution<uint32_t> idis(HICN_MIN_PROBE_SEQ, + HICN_MAX_PROBE_SEQ); + probe_seq_number_ = idis(eng); + interest_name->setSuffix(probe_seq_number_); + + // we considere the probe as a rtx so that we do not incresea inFlightInt + received_probe_ = false; + TRANSPORT_LOGD("Send content interest %u (probeRtt)", + interest_name->getSuffix()); + sendInterest(interest_name, true); +} + +void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) { + auto interest = getPacket(); + interest->setName(*interest_name); + + uint32_t interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interestLifetime); + interest->setLifetime(uint32_t(interestLifetime)); + + ConsumerInterestCallback *on_interest_output = nullptr; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &on_interest_output); + + if (*on_interest_output) { + (*on_interest_output)(*socket_->getInterface(), *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_ && !is_first_)) { + return; + } + + portal_->sendInterest(std::move(interest)); + + sentInterest_++; + + if (!rtx) { + packets_in_window_[interest_name->getSuffix()] = 0; + inflightInterestsCount_++; + } +} + +void RTCTransportProtocol::scheduleNextInterests() { + if (!is_running_ && !is_first_) return; + + TRANSPORT_LOGD("----- [window %u - inflight_interests %u = %d] -----", + currentCWin_, inflightInterestsCount_, + currentCWin_ - inflightInterestsCount_); + + while (inflightInterestsCount_ < currentCWin_) { + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + + interest_name->setSuffix(actualSegment_); + + // if the producer socket is not stated (does not reply even with nacks) + // we keep asking for something without marking anything as lost (see + // timeout). In this way when the producer socket will start the + // consumer socket will not miss any packet + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { + uint32_t pkt = actualSegment_ & modMask_; + inflightInterests_[pkt].state = sent_; + inflightInterests_[pkt].sequence = actualSegment_; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + TRANSPORT_LOGD( + "Send content interest %u (scheduleNextInterests no replies)", + interest_name->getSuffix()); + sendInterest(interest_name, false); + return; + } + + // we send the packet only if it is not pending yet + // notice that this is not true for rtx packets + if (portal_->interestIsPending(*interest_name)) { + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + uint32_t pkt = actualSegment_ & modMask_; + // if we already reacevied the content we don't ask it again + if (inflightInterests_[pkt].state == received_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + // same if the packet is lost + if (inflightInterests_[pkt].state == lost_ && + inflightInterests_[pkt].sequence == actualSegment_) { + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + continue; + } + + inflightInterests_[pkt].transmissionTime = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + // here the packet can be in any state except for lost or recevied + inflightInterests_[pkt].state = sent_; + inflightInterests_[pkt].sequence = actualSegment_; + actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ; + + TRANSPORT_LOGD("Send content interest %u (scheduleNextInterests)", + interest_name->getSuffix()); + sendInterest(interest_name, false); + } + + TRANSPORT_LOGD("----- end of scheduleNextInterest -----"); +} + +bool RTCTransportProtocol::verifyKeyPackets() { + // Not yet implemented + return false; +} + +void RTCTransportProtocol::sentinelTimer() { + uint32_t wait = 50; + + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) { + // we have all the info to set the timers + wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); + if (wait == 0) wait = 1; + } + + sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); + sentinel_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) { + // we have no info, so we send again + + for (auto it = packets_in_window_.begin(); it != packets_in_window_.end(); + it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); + } + } + } else { + uint64_t max_waiting_time = // wait at least 50ms + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); + + if ((currentState_ == HICN_RTC_NORMAL_STATE) && + (inflightInterestsCount_ >= currentCWin_) && + ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) { + uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); + + for (auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first && + ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); + } + } + } + } + + sentinelTimer(); + }); +} +void RTCTransportProtocol::addRetransmissions(uint32_t val) { + // add only val in the rtx list + addRetransmissions(val, val + 1); +} + +void RTCTransportProtocol::addRetransmissions(uint32_t start, uint32_t stop) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + bool new_rtx = false; + for (uint32_t i = start; i < stop; i++) { + auto it = interestRetransmissions_.find(i); + if (it == interestRetransmissions_.end()) { + uint32_t pkt = i & modMask_; + if (lastSegNacked_ <= i && inflightInterests_[pkt].state != received_) { + // it must be larger than the last past nack received + packetLost_++; + interestRetransmissions_[i] = 0; + uint32_t pkt = i & modMask_; + // we reset the transmission time setting to now, so that rtx will + // happne in one RTT on waint one inter arrival gap + inflightInterests_[pkt].transmissionTime = now; + new_rtx = true; + } + } // if the retransmission is already there the rtx timer will + // take care of it + } + + // in case a new rtx is added to the map we need to run checkRtx() + if (new_rtx) { + if (rtx_timer_used_) { + // if a timer is pending we need to delete it + rtx_timer_->cancel(); + rtx_timer_used_ = false; + } + checkRtx(); + } +} + +uint64_t RTCTransportProtocol::retransmit() { + auto it = interestRetransmissions_.begin(); + + // cut len to max HICN_MAX_RTX_SIZE + // since we use a map, the smaller (and so the older) sequence number are at + // the beginnin of the map + while (interestRetransmissions_.size() > HICN_MAX_RTX_SIZE) { + it = interestRetransmissions_.erase(it); + } + + it = interestRetransmissions_.begin(); + uint64_t smallest_timeout = ULONG_MAX; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + while (it != interestRetransmissions_.end()) { + uint32_t pkt = it->first & modMask_; + + if (inflightInterests_[pkt].sequence != it->first) { + // this packet is not anymore in the inflight buffer, erase it + it = interestRetransmissions_.erase(it); + continue; + } + + // we retransmitted the packet too many times + if (it->second >= HICN_MAX_RTX) { + it = interestRetransmissions_.erase(it); + continue; + } + + // this packet is too old + if ((lastReceived_ > it->first) && + (lastReceived_ - it->first) > HICN_MAX_RTX_MAX_AGE) { + it = interestRetransmissions_.erase(it); + continue; + } + + uint64_t rtx_time = now; + + if (it->second == 0) { + // first rtx + if (producerPathLabels_[0] != producerPathLabels_[1]) { + // multipath + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { + rtx_time = lastReceivedTime_ + + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer, send it immediatly + } else { + // single path + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() < + HICN_MIN_INTER_ARRIVAL_GAP)) { + rtx_time = lastReceivedTime_ + + pathTable_[producerPathLabels_[0]]->getInterArrivalGap(); + } // else low rate producer send immediatly + } + } else { + // second or plus rtx, wait for the min rtt + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end()) { + uint64_t sent_time = inflightInterests_[pkt].transmissionTime; + rtx_time = sent_time + pathTable_[producerPathLabels_[0]]->getMinRtt(); + } // if we don't have info we send it immediatly + } + + if (now >= rtx_time) { + inflightInterests_[pkt].transmissionTime = now; + it->second++; + + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + TRANSPORT_LOGD("Send content interest %u (retransmit)", + interest_name->getSuffix()); + sendInterest(interest_name, true); + } else if (rtx_time < smallest_timeout) { + smallest_timeout = rtx_time; + } + + ++it; + } + return smallest_timeout; +} + +void RTCTransportProtocol::checkRtx() { + if (interestRetransmissions_.empty()) { + rtx_timer_used_ = false; + return; + } + + uint64_t next_timeout = retransmit(); + uint64_t wait = 1; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (next_timeout != ULONG_MAX && now < next_timeout) { + wait = next_timeout - now; + } + rtx_timer_used_ = true; + rtx_timer_->expires_from_now(std::chrono::milliseconds(wait)); + rtx_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + rtx_timer_used_ = false; + checkRtx(); + }); +} + +void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { + uint32_t segmentNumber = interest->getName().getSuffix(); + + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + // this is a timeout on a probe, do nothing + return; + } + + uint32_t pkt = segmentNumber & modMask_; + + if (TRANSPORT_EXPECT_FALSE(!firstPckReceived_)) { + // we do nothing, and we keep asking the same stuff over + // and over until we get at least a packet + inflightInterestsCount_--; + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + scheduleNextInterests(); + return; + } + + if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + inflightInterestsCount_--; + } + + // check how many times we sent this packet + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end() && it->second >= HICN_MAX_RTX) { + inflightInterests_[pkt].state = lost_; + } + + if (inflightInterests_[pkt].state == sent_) { + inflightInterests_[pkt].state = timeout1_; + } else if (inflightInterests_[pkt].state == timeout1_) { + inflightInterests_[pkt].state = timeout2_; + } else if (inflightInterests_[pkt].state == timeout2_) { + inflightInterests_[pkt].state = lost_; + } + + if (inflightInterests_[pkt].state == lost_) { + interestRetransmissions_.erase(segmentNumber); + } else { + addRetransmissions(segmentNumber); + } + + scheduleNextInterests(); +} + +bool RTCTransportProtocol::onNack(const ContentObject &content_object, + bool rtx) { + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); + uint32_t productionSeg = *payload; + uint32_t productionRate = *(++payload); + uint32_t nackSegment = content_object.getName().getSuffix(); + + bool old_nack = false; + + // if we did not received anything between lastReceived_ + 1 and productionSeg + // most likelly some packets got lost + if (lastReceived_ != 0) { + addRetransmissions(lastReceived_ + 1, productionSeg); + } + + if (!rtx) { + gotNack_ = true; + // we synch the estimated production rate with the actual one + estimatedBw_ = (double)productionRate; + } + + if (productionSeg > nackSegment) { + // we are asking for stuff produced in the past + actualSegment_ = max(productionSeg, actualSegment_) % HICN_MIN_PROBE_SEQ; + + if (!rtx) { + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + + computeMaxWindow(productionRate, 0); + increaseWindow(); + } + + lastSegNacked_ = productionSeg; + old_nack = true; + + } else if (productionSeg < nackSegment) { + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; + + if (!rtx) { + // we are asking stuff in the future + gotFutureNack_++; + computeMaxWindow(productionRate, 0); + decreaseWindow(); + + if (currentState_ == HICN_RTC_SYNC_STATE) { + currentState_ = HICN_RTC_NORMAL_STATE; + } + } + } else { + // we are asking the right thing, but the producer is slow + // keep doing the same until the packet is produced + actualSegment_ = productionSeg % HICN_MIN_PROBE_SEQ; + } + + return old_nack; +} + +void RTCTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + // as soon as we get a packet firstPckReceived_ will never be false + firstPckReceived_ = true; + + auto payload = content_object->getPayload(); + uint32_t payload_size = (uint32_t)payload->length(); + uint32_t segmentNumber = content_object->getName().getSuffix(); + uint32_t pkt = segmentNumber & modMask_; + + ConsumerContentObjectCallback *callback_content_object = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &callback_content_object); + if (*callback_content_object) { + (*callback_content_object)(*socket_->getInterface(), *content_object); + } + + if (segmentNumber >= HICN_MIN_PROBE_SEQ) { + TRANSPORT_LOGD("Received probe %u", segmentNumber); + if (segmentNumber == probe_seq_number_ && !received_probe_) { + received_probe_ = true; + + uint32_t pathLabel = content_object->getPathLabel(); + if (pathTable_.find(pathLabel) == pathTable_.end()) { + std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>(); + pathTable_[pathLabel] = newPath; + } + + // this is the expected probe, update the RTT and drop the packet + uint64_t RTT = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count() - + time_sent_probe_; + + pathTable_[pathLabel]->insertRttSample(RTT); + pathTable_[pathLabel]->receivedNack(); + } + return; + } + + // check if the packet is a rtx + bool is_rtx = false; + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) { + is_rtx = true; + } else { + auto it_win = packets_in_window_.find(segmentNumber); + if (it_win != packets_in_window_.end() && it_win->second != 0) + is_rtx = true; + } + + if (payload_size == HICN_NACK_HEADER_SIZE) { + TRANSPORT_LOGD("Received nack %u", segmentNumber); + + if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + inflightInterestsCount_--; + } + + bool old_nack = false; + + if (!is_rtx) { + // this is not a retransmitted packet + old_nack = onNack(*content_object, false); + updateDelayStats(*content_object); + } else { + old_nack = onNack(*content_object, true); + } + + // the nacked_ state is used only to avoid to decrease + // inflightInterestsCount_ multiple times. In fact, every time that we + // receive an event related to an interest (timeout, nacked, content) we + // cange the state. In this way we are sure that we do not decrease twice + // the counter + if (old_nack) { + inflightInterests_[pkt].state = lost_; + interestRetransmissions_.erase(segmentNumber); + } else { + inflightInterests_[pkt].state = nacked_; + } + + } else { + TRANSPORT_LOGD("Received content %u", segmentNumber); + + avgPacketSize_ = (HICN_ESTIMATED_PACKET_SIZE * avgPacketSize_) + + ((1 - HICN_ESTIMATED_PACKET_SIZE) * payload->length()); + + receivedBytes_ += (uint32_t)(content_object->headerSize() + + content_object->payloadSize()); + + if (inflightInterests_[pkt].state == sent_) { + lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + packets_in_window_.erase(segmentNumber); + inflightInterestsCount_--; // packet sent without timeouts + } + + if (inflightInterests_[pkt].state == sent_ && !is_rtx) { + // delay stats are computed only for non retransmitted data + updateDelayStats(*content_object); + } + + addRetransmissions(lastReceived_ + 1, segmentNumber); + if (segmentNumber > highestReceived_) { + highestReceived_ = segmentNumber; + } + if (segmentNumber > lastReceived_) { + lastReceived_ = segmentNumber; + lastReceivedTime_ = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } + receivedData_++; + inflightInterests_[pkt].state = received_; + + auto it = interestRetransmissions_.find(segmentNumber); + if (it != interestRetransmissions_.end()) lossRecovered_++; + + interestRetransmissions_.erase(segmentNumber); + + reassemble(std::move(content_object)); + increaseWindow(); + } + + scheduleNextInterests(); +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc.h b/libtransport/src/protocols/rtc.h new file mode 100644 index 000000000..f15cdd1eb --- /dev/null +++ b/libtransport/src/protocols/rtc.h @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiTC_SYNC_STATE + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <protocols/datagram_reassembly.h> +#include <protocols/protocol.h> +#include <protocols/rtc_data_path.h> + +#include <map> +#include <queue> +#include <unordered_map> + +// algorithm state +#define HICN_RTC_SYNC_STATE 0 +#define HICN_RTC_NORMAL_STATE 1 +#define HICN_ROUNDS_IN_SYNC_BEFORE_SWITCH 3 + +// packet constants +#define HICN_INIT_PACKET_SIZE 1300 // bytes +#define HICN_PACKET_HEADER_SIZE 60 // bytes ipv6+tcp +#define HICN_NACK_HEADER_SIZE 8 // bytes +#define HICN_TIMESTAMP_SIZE 8 // bytes +#define HICN_RTC_INTEREST_LIFETIME 1000 // ms + +// rtt measurement +// normal interests for data goes from 0 to +// HICN_MIN_PROBE_SEQ, the rest is reserverd for +// probes +#define HICN_MIN_PROBE_SEQ 0xefffffff +#define HICN_MAX_PROBE_SEQ 0xffffffff + +// controller constant +#define HICN_ROUND_LEN \ + 200 // ms interval of time on which + // we take decisions / measurements +#define HICN_MAX_RTX 10 +#define HICN_MAX_RTX_SIZE 1024 +#define HICN_MAX_RTX_MAX_AGE 10000 +#define HICN_MIN_RTT_WIN 30 // rounds +#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms + +// cwin +#define HICN_INITIAL_CWIN 1 // packets +#define HICN_INITIAL_CWIN_MAX 100000 // packets +#define HICN_MIN_CWIN 10 // packets +#define HICN_WIN_INCREASE_FACTOR 1.5 +#define HICN_WIN_DECREASE_FACTOR 0.9 + +// statistics constants +#define HICN_BANDWIDTH_SLACK_FACTOR 1.8 +#define HICN_ESTIMATED_BW_ALPHA 0.7 +#define HICN_ESTIMATED_PACKET_SIZE 0.7 +#define HICN_ESTIMATED_LOSSES_ALPHA 0.8 +#define HICN_INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 + +// other constants +#define HICN_NANO_IN_A_SEC 1000000000 +#define HICN_MICRO_IN_A_SEC 1000000 +#define HICN_MILLI_IN_A_SEC 1000 + +namespace transport { + +namespace protocol { + +enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ }; + +typedef enum packetState packetState_t; + +struct sentInterest { + uint64_t transmissionTime; + uint32_t sequence; // sequence number of the interest sent + // to handle seq % buffer_size + packetState_t state; // see packet state +}; + +class RTCTransportProtocol : public TransportProtocol, + public DatagramReassembly { + public: + RTCTransportProtocol(implementation::ConsumerSocket *icnet_socket); + + ~RTCTransportProtocol(); + + int start() override; + + void stop() override; + + void resume() override; + + bool verifyKeyPackets() override; + + private: + // algo functions + void reset() override; + + // CC functions + void updateDelayStats(const ContentObject &content_object); + void updateStats(uint32_t round_duration); + void updateCCState(); + void computeMaxWindow(uint32_t productionRate, uint32_t BDPWin); + void updateWindow(); + void decreaseWindow(); + void increaseWindow(); + void resetPreviousWindow(); + + // packet functions + void sendInterest(Name *interest_name, bool rtx); + void scheduleNextInterests() override; + void sentinelTimer(); + void addRetransmissions(uint32_t val); + void addRetransmissions(uint32_t start, uint32_t stop); + uint64_t retransmit(); + void checkRtx(); + void probeRtt(); + void newRound(); + void onTimeout(Interest::Ptr &&interest) override; + bool onNack(const ContentObject &content_object, bool rtx); + void onContentObject(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override; + void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override {} + void onReassemblyFailed(std::uint32_t missing_segment) override {} + + TRANSPORT_ALWAYS_INLINE virtual void reassemble( + ContentObject::Ptr &&content_object) override { + auto read_buffer = content_object->getPayload(); + read_buffer->trimStart(HICN_TIMESTAMP_SIZE); + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); + } + + // controller var + std::unique_ptr<asio::steady_timer> round_timer_; + unsigned currentState_; + + // cwin var + uint32_t currentCWin_; + uint32_t maxCWin_; + + // names/packets var + uint32_t actualSegment_; + uint32_t inflightInterestsCount_; + // map seq to rtx + std::map<uint32_t, uint8_t> interestRetransmissions_; + bool rtx_timer_used_; + std::unique_ptr<asio::steady_timer> rtx_timer_; + std::vector<sentInterest> inflightInterests_; + uint32_t lastSegNacked_; // indicates the segment id in the last received + // past Nack. we do not ask for retransmissions + // for samething that is older than this value. + uint32_t lastReceived_; // segment of the last content object received + // indicates the base of the window on the client + uint64_t lastReceivedTime_; // time at which we recevied the + // lastReceived_ packet + + // sentinel + // if all packets in the window get lost we need something that + // wakes up our consumer socket. Interest timeouts set to 1 sec + // expire too late. This timers expire much sooner and if it + // detects that all the interest in the window may be lost + // it sends all of them again + std::unique_ptr<asio::steady_timer> sentinel_timer_; + uint64_t lastEvent_; // time at which we removed a pending + // interest from the window + std::unordered_map<uint32_t, uint8_t> packets_in_window_; + + // rtt probes + // the RTC transport tends to overestimate the RTT + // du to the production time on the server side + // once per second we send an interest for wich we know + // we will get a nack. This nack will keep our estimation + // close to the reality + std::unique_ptr<asio::steady_timer> probe_timer_; + uint64_t time_sent_probe_; + uint32_t probe_seq_number_; + bool received_probe_; + + uint32_t modMask_; + + // stats + bool firstPckReceived_; + uint32_t receivedBytes_; + uint32_t sentInterest_; + uint32_t receivedData_; + int32_t packetLost_; + int32_t lossRecovered_; + uint32_t firstSequenceInRound_; + uint32_t highestReceived_; + double avgPacketSize_; + bool gotNack_; + uint32_t gotFutureNack_; + uint32_t rounds_; + uint32_t roundsWithoutNacks_; + + // we keep track of up two paths (if only one path is in use + // the two values in the vector will be the same) + // position 0 stores the path with minRTT + // position 1 stores the path with maxRTT + uint32_t producerPathLabels_[2]; + + std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_; + uint32_t roundCounter_; + + // CC var + double estimatedBw_; + double lossRate_; + double queuingDelay_; + unsigned protocolState_; + + bool initied; +}; + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/protocols/rtc_data_path.cc b/libtransport/src/protocols/rtc_data_path.cc new file mode 100644 index 000000000..30644e939 --- /dev/null +++ b/libtransport/src/protocols/rtc_data_path.cc @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <protocols/rtc_data_path.h> + +#include <cfloat> +#include <chrono> + +#define MAX_ROUNDS_WITHOUT_PKTS 10 // 2sec + +namespace transport { + +namespace protocol { + +RTCDataPath::RTCDataPath() + : min_rtt(UINT_MAX), + prev_min_rtt(UINT_MAX), + min_owd(INT_MAX), // this is computed like in LEDBAT, so it is not the + // real OWD, but the measured one, that depends on the + // clock of sender and receiver. the only meaningful + // value is is the queueing delay. for this reason we + // keep both RTT (for the windowd calculation) and OWD + // (for congestion/quality control) + prev_min_owd(INT_MAX), + avg_owd(0.0), + queuing_delay(DBL_MAX), + lastRecvSeq_(0), + lastRecvTime_(0), + avg_inter_arrival_(DBL_MAX), + received_nacks_(false), + received_packets_(false), + rounds_without_packets_(0), + RTThistory_(HISTORY_LEN), + OWDhistory_(HISTORY_LEN){}; + +void RTCDataPath::insertRttSample(uint64_t rtt) { + // for the rtt we only keep track of the min one + if (rtt < min_rtt) min_rtt = rtt; +} + +void RTCDataPath::insertOwdSample(int64_t owd) { + // for owd we use both min and avg + if (owd < min_owd) min_owd = owd; + + if (avg_owd != DBL_MAX) + avg_owd = (avg_owd * (1 - ALPHA_RTC)) + (owd * ALPHA_RTC); + else { + avg_owd = owd; + } + + // owd is computed only for valid data packets so we count only + // this for decide if we recevie traffic or not + received_packets_ = true; +} + +void RTCDataPath::computeInterArrivalGap(uint32_t segmentNumber) { + // got packet in sequence, compute gap + if (lastRecvSeq_ == (segmentNumber - 1)) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + uint64_t delta = now - lastRecvTime_; + lastRecvSeq_ = segmentNumber; + lastRecvTime_ = now; + if (avg_inter_arrival_ == DBL_MAX) + avg_inter_arrival_ = delta; + else + avg_inter_arrival_ = + (avg_inter_arrival_ * (1 - ALPHA_RTC)) + (delta * ALPHA_RTC); + return; + } + + // ooo packet, update the stasts if needed + if (lastRecvSeq_ <= segmentNumber) { + lastRecvSeq_ = segmentNumber; + lastRecvTime_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + } +} + +void RTCDataPath::receivedNack() { received_nacks_ = true; } + +double RTCDataPath::getInterArrivalGap() { + if (avg_inter_arrival_ == DBL_MAX) return 0; + return avg_inter_arrival_; +} + +bool RTCDataPath::isActive() { + if (received_nacks_ && rounds_without_packets_ < MAX_ROUNDS_WITHOUT_PKTS) + return true; + return false; +} + +void RTCDataPath::roundEnd() { + // reset min_rtt and add it to the history + if (min_rtt != UINT_MAX) { + prev_min_rtt = min_rtt; + } else { + // this may happen if we do not receive any packet + // from this path in the last round. in this case + // we use the measure from the previuos round + min_rtt = prev_min_rtt; + } + + if (min_rtt == 0) min_rtt = 1; + + RTThistory_.pushBack(min_rtt); + min_rtt = UINT_MAX; + + // do the same for min owd + if (min_owd != INT_MAX) { + prev_min_owd = min_owd; + } else { + min_owd = prev_min_owd; + } + + if (min_owd != INT_MAX) { + OWDhistory_.pushBack(min_owd); + min_owd = INT_MAX; + + // compute queuing delay + queuing_delay = avg_owd - getMinOwd(); + + } else { + queuing_delay = 0.0; + } + + if (!received_packets_) + rounds_without_packets_++; + else + rounds_without_packets_ = 0; + received_packets_ = false; +} + +double RTCDataPath::getQueuingDealy() { return queuing_delay; } + +uint64_t RTCDataPath::getMinRtt() { return RTThistory_.begin(); } + +int64_t RTCDataPath::getMinOwd() { return OWDhistory_.begin(); } + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/rtc_data_path.h b/libtransport/src/protocols/rtc_data_path.h new file mode 100644 index 000000000..9076b355f --- /dev/null +++ b/libtransport/src/protocols/rtc_data_path.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <stdint.h> +#include <climits> + +#include <utils/min_filter.h> + +#define ALPHA_RTC 0.125 +#define HISTORY_LEN 20 // 4 sec + +namespace transport { + +namespace protocol { + +class RTCDataPath { + public: + RTCDataPath(); + + public: + void insertRttSample(uint64_t rtt); + void insertOwdSample(int64_t owd); + void computeInterArrivalGap(uint32_t segmentNumber); + void receivedNack(); + + uint64_t getMinRtt(); + double getQueuingDealy(); + double getInterArrivalGap(); + bool isActive(); + + void roundEnd(); + + private: + int64_t getMinOwd(); + + uint64_t min_rtt; + uint64_t prev_min_rtt; + + int64_t min_owd; + int64_t prev_min_owd; + + double avg_owd; + + double queuing_delay; + + uint32_t lastRecvSeq_; + uint64_t lastRecvTime_; + double avg_inter_arrival_; + + // flags to check if a path is active + // we considere a path active if it reaches a producer + //(not a cache) --aka we got at least one nack on this path-- + // and if we receives packets + bool received_nacks_; + bool received_packets_; + uint8_t rounds_without_packets_; // if we don't get any packet + // for MAX_ROUNDS_WITHOUT_PKTS + // we consider the path inactive + + utils::MinFilter<uint64_t> RTThistory_; + utils::MinFilter<int64_t> OWDhistory_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/test/CMakeLists.txt b/libtransport/src/protocols/test/CMakeLists.txt new file mode 100644 index 000000000..6f9fdb9aa --- /dev/null +++ b/libtransport/src/protocols/test/CMakeLists.txt @@ -0,0 +1,10 @@ +# Enable gcov output for the tests +add_definitions(--coverage) +set(CMAKE_EXE_LINKER_FLAGS ${CMAKE_EXE_LINKER_FLAGS} " --coverage") + +set(TestsExpectedToPass + test_transport_producer) + +foreach(test ${TestsExpectedToPass}) + AddTest(${test}) +endforeach()
\ No newline at end of file diff --git a/libtransport/src/protocols/test/test_transport_producer.cc b/libtransport/src/protocols/test/test_transport_producer.cc new file mode 100644 index 000000000..204f2cbe2 --- /dev/null +++ b/libtransport/src/protocols/test/test_transport_producer.cc @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gtest/gtest.h> + +#include "../socket_producer.h" +#include "literals.h" + +#include <test.h> +#include <random> + +namespace transport { + +namespace protocol { + +namespace { +// The fixture for testing class Foo. +class ProducerTest : public ::testing::Test { + protected: + ProducerTest() : name_("b001::123|321"), producer_(io_service_) { + // You can do set-up work for each test here. + } + + virtual ~ProducerTest() { + // You can do clean-up work that doesn't throw exceptions here. + } + + // If the constructor and destructor are not enough for setting up + // and cleaning up each test, you can define the following methods: + + virtual void SetUp() { + // Code here will be called immediately after the constructor (right + // before each test). + } + + virtual void TearDown() { + // Code here will be called immediately after each test (right + // before the destructor). + } + + Name name_; + asio::io_service io_service_; + ProducerSocket producer_; +}; + +} // namespace + +// Tests that the Foo::Bar() method does Abc. +TEST_F(ProducerTest, ProduceContent) { + std::string content(250000, '?'); + + producer_.registerPrefix(Prefix("b001::/64")); + producer_.produce(name_, reinterpret_cast<const uint8_t *>(content.data()), + content.size(), true); + producer_.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, + 500000000_U32); + producer_.attach(); + producer_.serveForever(); +} + +} // namespace protocol + +} // namespace transport + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
\ No newline at end of file diff --git a/libtransport/src/protocols/verification_manager.cc b/libtransport/src/protocols/verification_manager.cc new file mode 100644 index 000000000..8eedd6106 --- /dev/null +++ b/libtransport/src/protocols/verification_manager.cc @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/security/verifier.h> + +#include <implementation/socket_consumer.h> +#include <protocols/verification_manager.h> + +namespace transport { + +namespace protocol { + +interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify( + const Packet& packet) { + using namespace interface; + + bool verify_signature = false, key_content = false; + VerificationPolicy ret = VerificationPolicy::DROP_PACKET; + + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, + verify_signature); + icn_socket_->getSocketOption(GeneralTransportOptions::KEY_CONTENT, + key_content); + + if (!verify_signature) { + return VerificationPolicy::ACCEPT_PACKET; + } + + if (key_content) { + key_packets_.push(copyPacket(packet)); + return VerificationPolicy::ACCEPT_PACKET; + } else if (!key_packets_.empty()) { + std::queue<ContentObjectPtr>().swap(key_packets_); + } + + ConsumerContentObjectVerificationFailedCallback* + verification_failed_callback = VOID_HANDLER; + icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback); + + if (!verification_failed_callback) { + throw errors::RuntimeException( + "No verification failed callback provided by application. " + "Aborting."); + } + + std::shared_ptr<utils::Verifier> verifier; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + if (TRANSPORT_EXPECT_FALSE(!verifier)) { + ret = (*verification_failed_callback)( + *icn_socket_->getInterface(), + dynamic_cast<const ContentObject&>(packet), + make_error_code(protocol_error::no_verifier_provided)); + return ret; + } + + if (!verifier->verify(packet)) { + ret = (*verification_failed_callback)( + *icn_socket_->getInterface(), + dynamic_cast<const ContentObject&>(packet), + make_error_code(protocol_error::signature_verification_failed)); + } else { + ret = VerificationPolicy::ACCEPT_PACKET; + } + + return ret; +} + +bool SignatureVerificationManager::onKeyToVerify() { + if (TRANSPORT_EXPECT_FALSE(key_packets_.empty())) { + throw errors::RuntimeException("No key to verify."); + } + + while (!key_packets_.empty()) { + ContentObjectPtr packet_to_verify = key_packets_.front(); + key_packets_.pop(); + if (onPacketToVerify(*packet_to_verify) != + VerificationPolicy::ACCEPT_PACKET) + return false; + } + + return true; +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/protocols/verification_manager.h b/libtransport/src/protocols/verification_manager.h new file mode 100644 index 000000000..7d8a00a65 --- /dev/null +++ b/libtransport/src/protocols/verification_manager.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/verification_policy.h> + +#include <hicn/transport/core/content_object.h> + +#include <protocols/errors.h> + +#include <queue> + +namespace transport { + +namespace interface { +class ConsumerSocket; +} + +namespace protocol { + +using Packet = core::Packet; +using interface::VerificationPolicy; +using ContentObjectPtr = std::shared_ptr<core::ContentObject>; + +class VerificationManager { + public: + virtual ~VerificationManager() = default; + virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0; + virtual bool onKeyToVerify() { return false; } +}; + +class SignatureVerificationManager : public VerificationManager { + public: + SignatureVerificationManager(implementation::ConsumerSocket* icn_socket) + : icn_socket_(icn_socket), key_packets_() {} + + interface::VerificationPolicy onPacketToVerify(const Packet& packet) override; + bool onKeyToVerify() override; + + private: + implementation::ConsumerSocket* icn_socket_; + std::queue<ContentObjectPtr> key_packets_; + + ContentObjectPtr copyPacket(const Packet& packet) { + std::shared_ptr<utils::MemBuf> packet_copy = + packet.acquireMemBufReference(); + ContentObjectPtr content_object_copy = + std::make_shared<core::ContentObject>(std::move(packet_copy)); + std::unique_ptr<utils::MemBuf> payload_copy = packet.getPayload(); + content_object_copy->appendPayload(std::move(payload_copy)); + return content_object_copy; + } +}; + +} // end namespace protocol + +} // end namespace transport |