diff options
author | Luca Muscariello <lumuscar+fdio@cisco.com> | 2019-01-17 13:47:57 +0100 |
---|---|---|
committer | Luca Muscariello <lumuscar+fdio@cisco.com> | 2019-01-17 16:32:51 +0100 |
commit | bac3da61644515f05663789b122554dc77549286 (patch) | |
tree | 898210bc8e70371d77de7d446a26c5dd4fd1165a /libtransport/src/hicn/transport/protocols/vegas.cc | |
parent | d5165246787301d0f13b646fda5e8a8567aef5ac (diff) |
This is the first commit of the hicn projectv19.01
Change-Id: I6f2544ad9b9f8891c88cc4bcce3cf19bd3cc863f
Signed-off-by: Luca Muscariello <lumuscar+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/vegas.cc')
-rwxr-xr-x | libtransport/src/hicn/transport/protocols/vegas.cc | 630 |
1 files changed, 630 insertions, 0 deletions
diff --git a/libtransport/src/hicn/transport/protocols/vegas.cc b/libtransport/src/hicn/transport/protocols/vegas.cc new file mode 100755 index 000000000..b6d79bfcc --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/vegas.cc @@ -0,0 +1,630 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/errors/not_implemented_exception.h> +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/vegas.h> +#include <hicn/transport/utils/literals.h> + +#include <cmath> + +namespace transport { + +namespace protocol { + +using namespace interface; + +VegasTransportProtocol::VegasTransportProtocol(BaseSocket *icnet_socket) + : TransportProtocol(icnet_socket), + is_final_block_number_discovered_(false), + final_block_number_(std::numeric_limits<uint32_t>::max()), + last_reassembled_segment_(0), + content_buffer_size_(0), + current_window_size_(default_values::min_window_size), + interests_in_flight_(0), + next_suffix_(0), + interest_retransmissions_(1 << default_values::log_2_default_buffer_size), + interest_timepoints_(1 << default_values::log_2_default_buffer_size), + retx_count_(0), + receive_buffer_(1 << default_values::log_2_default_buffer_size), + unverified_segments_(1 << default_values::log_2_default_buffer_size), + verified_manifests_(1 << default_values::log_2_default_buffer_size), + mask_((1 << default_values::log_2_default_buffer_size) - 1), + incremental_suffix_index_(0), + suffix_queue_completed_(false), + download_with_manifest_(false), + next_manifest_interval_(0_U16), + interest_tx_(0), + interest_count_(0), + byte_count_(0), + average_rtt_(0.0) { + portal_ = socket_->portal_; + incremental_suffix_index_++; +} + +VegasTransportProtocol::~VegasTransportProtocol() { stop(); } + +void VegasTransportProtocol::reset() { + portal_->setConsumerCallback(this); + + is_final_block_number_discovered_ = false; + interest_pool_index_ = 0; + final_block_number_ = std::numeric_limits<uint32_t>::max(); + next_suffix_ = 0; + interests_in_flight_ = 0; + last_reassembled_segment_ = 0; + content_buffer_size_ = 0; + content_buffer_->clear(); + interest_retransmissions_.clear(); + interest_retransmissions_.resize( + 1 << default_values::log_2_default_buffer_size, 0); + interest_timepoints_.clear(); + interest_timepoints_.resize(1 << default_values::log_2_default_buffer_size, + std::chrono::steady_clock::time_point()); + receive_buffer_.clear(); + unverified_segments_.clear(); + verified_manifests_.clear(); + next_manifest_interval_ = 0; + next_manifest_ = 0; + download_with_manifest_ = false; + incremental_suffix_index_ = 0; + + interest_tx_ = 0; + interest_count_ = 0; + byte_count_ = 0; + average_rtt_ = 0; + + // asio::io_service &io_service = portal_->getIoService(); + + // if (io_service.stopped()) { + // io_service.reset(); + // } +} + +void VegasTransportProtocol::start( + utils::SharableVector<uint8_t> &content_buffer) { + + if(is_running_) + return; + + socket_->t0_ = std::chrono::steady_clock::now(); + + is_running_ = true; + content_buffer_ = content_buffer.shared_from_this(); + + reset(); + + sendInterest(next_suffix_++); + portal_->runEventsLoop(); + removeAllPendingInterests(); + is_running_ = false; + +} + +void VegasTransportProtocol::resume(){ + if(is_running_) + return; + + is_running_ = true; + sendInterest(next_suffix_++); + portal_->runEventsLoop(); + removeAllPendingInterests(); + is_running_ = false; +} + +void VegasTransportProtocol::sendInterest(std::uint64_t next_suffix) { + auto interest = getInterest(); + socket_->network_name_.setSuffix(next_suffix); + interest->setName(socket_->network_name_); + + interest->setLifetime(uint32_t(socket_->interest_lifetime_)); + + if (socket_->on_interest_output_ != VOID_HANDLER) { + socket_->on_interest_output_(*socket_, *interest); + } + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + interests_in_flight_++; + interest_retransmissions_[next_suffix & mask_] = 0; + interest_timepoints_[next_suffix & mask_] = std::chrono::steady_clock::now(); + + using namespace std::placeholders; + portal_->sendInterest(std::move(interest)); +} + +void VegasTransportProtocol::stop() { + is_running_ = false; + portal_->stopEventsLoop(); +} + +void VegasTransportProtocol::onContentSegment( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + bool virtual_download = socket_->virtual_download_; + + if (verifyContentObject(*content_object)) { + byte_count_ += content_object->getPayload().length(); + + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { + is_final_block_number_discovered_ = true; + final_block_number_ = incremental_suffix; + } + + if (!virtual_download) { + receive_buffer_.emplace( + std::make_pair(incremental_suffix, std::move(content_object))); + reassemble(); + } else if (TRANSPORT_EXPECT_FALSE(is_final_block_number_discovered_ && + incremental_suffix == + final_block_number_)) { + returnContentToUser(); + } + } else { + unverified_segments_.emplace( + std::make_pair(incremental_suffix, std::move(content_object))); + } +} + +void VegasTransportProtocol::afterContentReception( + const Interest &interest, const ContentObject &content_object) { + increaseWindow(); +} + +void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + decreaseWindow(); +} + +void VegasTransportProtocol::scheduleNextInterests() { + if (is_running_) { + uint32_t next_suffix; + while (interests_in_flight_ < current_window_size_) { + if (download_with_manifest_) { + if (suffix_queue_.size() * 2 < current_window_size_ && + next_manifest_ < final_block_number_ && next_manifest_interval_) { + next_manifest_ += next_manifest_interval_; + sendInterest(next_manifest_); + continue; + } + + if (suffix_queue_.pop(next_suffix)) { + // next_suffix = suffix_queue_.front(); + sendInterest(next_suffix); + // suffix_queue_.pop_front(); + } else { + if (!suffix_queue_completed_) { + TRANSPORT_LOGE("Empty queue!!!!!!"); + } + break; + } + } else { + if (is_final_block_number_discovered_) { + if (next_suffix_ > final_block_number_) { + return; + } + } + + sendInterest(next_suffix_++); + } + } + } +} + +void VegasTransportProtocol::decreaseWindow() { + if (current_window_size_ > socket_->min_window_size_) { + current_window_size_ = std::ceil(current_window_size_ / 2); + socket_->current_window_size_ = current_window_size_; + } +} + +void VegasTransportProtocol::increaseWindow() { + if (current_window_size_ < socket_->max_window_size_) { + current_window_size_++; + socket_->max_window_size_ = current_window_size_; + } +}; + +void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) { + std::chrono::steady_clock::duration duration = + std::chrono::steady_clock::now() - interest_timepoints_[segment]; + rtt_estimator_.addMeasurement( + std::chrono::duration_cast<std::chrono::microseconds>(duration)); + + RtoEstimator::Duration rto = rtt_estimator_.computeRto(); + std::chrono::milliseconds lifetime = + std::chrono::duration_cast<std::chrono::milliseconds>(rto); + + socket_->interest_lifetime_ = lifetime.count(); +} + +void VegasTransportProtocol::returnContentToUser() { + if (socket_->on_payload_retrieved_ != VOID_HANDLER) { + socket_->on_payload_retrieved_(*socket_, byte_count_, + std::make_error_code(std::errc(0))); + } + + stop(); +} + +void VegasTransportProtocol::onManifest( + std::unique_ptr<ContentObjectManifest> &&manifest) { + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + download_with_manifest_ = true; + + uint32_t segment = manifest->getName().getSuffix(); + + if (verifyManifest(*manifest)) { + manifest->decode(); + + if (TRANSPORT_EXPECT_TRUE(manifest->getVersion() == + core::ManifestVersion::VERSION_1)) { + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + auto _it = manifest->getSuffixList().begin(); + auto _end = --manifest->getSuffixList().end(); + + if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) { + _end++; + } + + // Get final block number + is_final_block_number_discovered_ = true; + final_block_number_ = manifest->getFinalBlockNumber(); + + for (; _it != _end; _it++) { + suffix_hash_map_[_it->first] = std::make_pair( + std::vector<uint8_t>(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + suffix_queue_.push(_it->first); + } + + next_manifest_interval_ = manifest->getSuffixList().size(); + + if (manifest->isFinalManifest()) { + suffix_queue_completed_ = true; + // Give it a try + if (verifier_thread_) { + asio::io_service &io_service = portal_->getIoService(); + io_service.post([this]() { scheduleNextInterests(); }); + } + } + + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } + } + + if (!socket_->virtual_download_) { + receive_buffer_.emplace( + std::make_pair(segment, std::move(manifest->getPacket()))); + reassemble(); + } else { + if (segment >= final_block_number_) { + stop(); + } + } + } +} + +bool VegasTransportProtocol::verifyManifest( + const ContentObjectManifest &manifest) { + if (!socket_->verify_signature_) { + return true; + } + + bool is_data_secure = false; + + if (socket_->on_content_object_verification_ == VOID_HANDLER) { + is_data_secure = static_cast<bool>(socket_->verifier_.verify(manifest)); + } else if (socket_->on_content_object_verification_(*socket_, manifest)) { + is_data_secure = true; + } + + if (TRANSPORT_EXPECT_FALSE(!is_data_secure)) { + TRANSPORT_LOGE("Verification failed for %s\n", + manifest.getName().toString().c_str()); + } + + return is_data_secure; +} + +// TODO Add the name in the digest computation! +void VegasTransportProtocol::onContentObject( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t incremental_suffix = content_object->getName().getSuffix(); + + std::chrono::microseconds rtt; + Time now = std::chrono::steady_clock::now(); + std::chrono::steady_clock::duration duration = + now - interest_timepoints_[incremental_suffix & mask_]; + rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration); + + average_rtt_ = (0.7 * average_rtt_) + (0.3 * (double)rtt.count()); + + if (socket_->on_timer_expires_ != VOID_HANDLER) { + auto dt = std::chrono::duration_cast<TimeDuration>(now - socket_->t0_); + if (dt.count() > socket_->timer_interval_milliseconds_) { + socket_->on_timer_expires_(*socket_, byte_count_, dt, + current_window_size_, retx_count_, + std::round(average_rtt_)); + socket_->t0_ = std::chrono::steady_clock::now(); + } + } + + interests_in_flight_--; + + if (TRANSPORT_EXPECT_FALSE(!is_running_ || incremental_suffix == ~0_U64 || + receive_buffer_.find(incremental_suffix) != + receive_buffer_.end())) { + return; + } + + changeInterestLifetime(incremental_suffix); + + if (socket_->on_content_object_input_ != VOID_HANDLER) { + socket_->on_content_object_input_(*socket_, *content_object); + } + + if (socket_->on_interest_satisfied_ != VOID_HANDLER) { + socket_->on_interest_satisfied_(*socket_, *interest); + } + + if (!interest_retransmissions_[incremental_suffix & mask_]) { + afterContentReception(*interest, *content_object); + } + + if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == + PayloadType::MANIFEST)) { + // TODO Fix manifest!! + auto manifest = + std::make_unique<ContentObjectManifest>(std::move(content_object)); + + if (verifier_thread_ && incremental_suffix != 0) { + // verifier_thread_->add(std::bind(&VegasTransportProtocol::onManifest, + // this, std::move(manifest))); + } else { + onManifest(std::move(manifest)); + } + } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + if (verifier_thread_) { + // verifier_thread_->add(std::bind(&VegasTransportProtocol::onContentSegment, + // this, std::move(content_object))); + } else { + onContentSegment(std::move(interest), std::move(content_object)); + } + } + + scheduleNextInterests(); +} + +bool VegasTransportProtocol::verifyContentObject( + const ContentObject &content_object) { + if (!dynamic_cast<ConsumerSocket *>(socket_)->verify_signature_) { + return true; + } + + uint64_t segment = content_object.getName().getSuffix(); + + bool ret = false; + + if (download_with_manifest_) { + auto it = suffix_hash_map_.find(segment); + if (it != suffix_hash_map_.end()) { + auto hash_type = static_cast<utils::CryptoHashType>(it->second.second); + auto data_packet_digest = content_object.computeDigest(it->second.second); + auto data_packet_digest_bytes = + data_packet_digest.getDigest<uint8_t>().data(); + std::vector<uint8_t> &manifest_digest_bytes = it->second.first; + + if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, + manifest_digest_bytes.data(), + hash_type)) { + suffix_hash_map_.erase(it); + ret = true; + } else { + throw errors::RuntimeException( + "Verification failure policy has to be implemented."); + } + } + } else { + ret = static_cast<bool>( + dynamic_cast<ConsumerSocket *>(socket_)->verifier_.verify( + content_object)); + + if (!ret) { + throw errors::RuntimeException( + "Verification failure policy has to be implemented."); + } + } + + return ret; + ; +} + +void VegasTransportProtocol::onTimeout(Interest::Ptr &&interest) { + TRANSPORT_LOGW("Timeout on %s", interest->getName().toString().c_str()); + + if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + return; + } + + interests_in_flight_--; + + uint64_t segment = interest->getName().getSuffix(); + + // Do not retransmit interests asking contents that do not exist. + if (is_final_block_number_discovered_) { + if (segment > final_block_number_) { + return; + } + } + + if (socket_->on_interest_timeout_ != VOID_HANDLER) { + socket_->on_interest_timeout_(*socket_, *interest); + } + + afterDataUnsatisfied(segment); + + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask_] < + socket_->max_retransmissions_)) { + retx_count_++; + + if (socket_->on_interest_retransmission_ != VOID_HANDLER) { + socket_->on_interest_retransmission_(*socket_, *interest); + } + + if (socket_->on_interest_output_ != VOID_HANDLER) { + socket_->on_interest_output_(*socket_, *interest); + } + + if (!is_running_) { + return; + } + + // retransmit + interests_in_flight_++; + interest_retransmissions_[segment & mask_]++; + + using namespace std::placeholders; + portal_->sendInterest(std::move(interest)); + } else { + TRANSPORT_LOGE("Stop: reached max retx limit."); + partialDownload(); + stop(); + } +} + +void VegasTransportProtocol::copyContent(const ContentObject &content_object) { + Array a = content_object.getPayload(); + + content_buffer_->insert(content_buffer_->end(), (uint8_t *)a.data(), + (uint8_t *)a.data() + a.length()); + + bool download_completed = + is_final_block_number_discovered_ && + content_object.getName().getSuffix() == final_block_number_; + + if (TRANSPORT_EXPECT_FALSE(download_completed || !is_running_)) { + // asio::io_service& io_service = portal_->getIoService(); + // io_service.post([this] () { + returnContentToUser(); + // }); + } +} + +void VegasTransportProtocol::reassemble() { + uint64_t index = last_reassembled_segment_; + auto it = receive_buffer_.find(index); + + do { + if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) { + copyContent(*it->second); + receive_buffer_.erase(it); + } + + index = ++last_reassembled_segment_; + it = receive_buffer_.find(index); + } while (it != receive_buffer_.end()); +} + +void VegasTransportProtocol::partialDownload() { + if (!socket_->virtual_download_) { + reassemble(); + } + + if (socket_->on_payload_retrieved_ != VOID_HANDLER) { + socket_->on_payload_retrieved_( + *socket_, byte_count_, + std::make_error_code(std::errc(std::errc::io_error))); + } +} + +// TODO Check vegas protocol +// void VegasTransportProtocol::checkForFastRetransmission(const Interest +// &interest) { +// uint64_t segNumber = interest.getName().getSuffix(); +// received_segments_[segNumber] = true; +// fast_retransmitted_segments.erase(segNumber); + +// uint64_t possibly_lost_segment = 0; +// uint64_t highest_received_segment = received_segments_.rbegin()->first; + +// for (uint64_t i = 0; i <= highest_received_segment; i++) { +// if (received_segments_.find(i) == received_segments_.end()) { +// if (fast_retransmitted_segments.find(i) == +// fast_retransmitted_segments.end()) { +// possibly_lost_segment = i; +// uint8_t out_of_order_segments = 0; +// for (uint64_t j = i; j <= highest_received_segment; j++) { +// if (received_segments_.find(j) != received_segments_.end()) { +// out_of_order_segments++; +// if (out_of_order_segments >= +// default_values::max_out_of_order_segments) { +// fast_retransmitted_segments[possibly_lost_segment] = true; +// fastRetransmit(interest, possibly_lost_segment); +// } +// } +// } +// } +// } +// } +// } + +// void VegasTransportProtocol::fastRetransmit(const Interest &interest, +// uint32_t chunk_number) { +// if (interest_retransmissions_[chunk_number & mask_] < +// socket_->max_retransmissions_) { +// Name name = interest.getName(); +// name.setSuffix(chunk_number); + +// std::shared_ptr<Interest> retx_interest = +// std::make_shared<Interest>(name); + +// if (socket_->on_interest_retransmission_ != VOID_HANDLER) { +// socket_->on_interest_retransmission_(*socket_, *retx_interest); +// } + +// if (socket_->on_interest_output_ != VOID_HANDLER) { +// socket_->on_interest_output_(*socket_, *retx_interest); +// } + +// if (!is_running_) { +// return; +// } + +// interests_in_flight_++; +// interest_retransmissions_[chunk_number & mask_]++; + +// using namespace std::placeholders; +// portal_->sendInterest(std::move(retx_interest)); +// } +// } + +void VegasTransportProtocol::removeAllPendingInterests() { portal_->clear(); } + +} // end namespace protocol + +} // namespace transport |