aboutsummaryrefslogtreecommitdiffstats
path: root/icnet/transport/icnet_transport_vegas.cc
diff options
context:
space:
mode:
Diffstat (limited to 'icnet/transport/icnet_transport_vegas.cc')
-rw-r--r--icnet/transport/icnet_transport_vegas.cc488
1 files changed, 488 insertions, 0 deletions
diff --git a/icnet/transport/icnet_transport_vegas.cc b/icnet/transport/icnet_transport_vegas.cc
new file mode 100644
index 00000000..fae29bf8
--- /dev/null
+++ b/icnet/transport/icnet_transport_vegas.cc
@@ -0,0 +1,488 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport_vegas.h"
+#include "icnet_socket_consumer.h"
+
+namespace icnet {
+
+VegasTransportProtocol::VegasTransportProtocol(Socket *icnet_socket)
+ : TransportProtocol(icnet_socket),
+ is_final_block_number_discovered_(false),
+ final_block_number_(std::numeric_limits<uint64_t>::max()),
+ last_reassembled_segment_(0),
+ content_buffer_size_(0),
+ current_window_size_(default_values::min_window_size),
+ interests_in_flight_(0),
+ segment_number_(0),
+ interest_retransmissions_(default_values::max_window_size),
+ interest_timepoints_(default_values::default_buffer_size),
+ receive_buffer_(default_values::default_buffer_size),
+ unverified_segments_(default_values::default_buffer_size),
+ verified_manifests_(default_values::default_buffer_size) {
+ icnet_socket->getSocketOption(PORTAL, portal_);
+}
+
+VegasTransportProtocol::~VegasTransportProtocol() {
+ stop();
+}
+
+void VegasTransportProtocol::start() {
+ is_running_ = true;
+ is_final_block_number_discovered_ = false;
+ final_block_number_ = std::numeric_limits<uint64_t>::max();
+ segment_number_ = 0;
+ interests_in_flight_ = 0;
+ last_reassembled_segment_ = 0;
+ content_buffer_size_ = 0;
+ content_buffer_.clear();
+ interest_retransmissions_.clear();
+ receive_buffer_.clear();
+ unverified_segments_.clear();
+ verified_manifests_.clear();
+
+ sendInterest();
+
+ bool isAsync = false;
+ socket_->getSocketOption(ASYNC_MODE, isAsync);
+
+ bool isContextRunning = false;
+ socket_->getSocketOption(RUNNING, isContextRunning);
+
+ if (!isAsync && !isContextRunning) {
+ socket_->setSocketOption(RUNNING, true);
+ portal_->runEventsLoop();
+
+ // If portal returns, the download (maybe) is finished, so we can remove the pending interests
+
+ removeAllPendingInterests();
+ }
+}
+
+// TODO Reuse this function for sending an arbitrary interest
+void VegasTransportProtocol::sendInterest() {
+ Name prefix;
+ socket_->getSocketOption(GeneralTransportOptions::NAME_PREFIX, prefix);
+
+ Name suffix;
+ socket_->getSocketOption(GeneralTransportOptions::NAME_SUFFIX, suffix);
+
+ if (!suffix.empty()) {
+ prefix.append(suffix);
+ }
+
+ prefix.appendSegment(segment_number_);
+
+ std::shared_ptr<Interest> interest = std::make_shared<Interest>(std::move(prefix));
+
+ int interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interestLifetime);
+ interest->setInterestLifetime(uint32_t(interestLifetime));
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output);
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interests_in_flight_++;
+ interest_retransmissions_[segment_number_ % default_values::default_buffer_size] = 0;
+ interest_timepoints_[segment_number_ % default_values::default_buffer_size] = std::chrono::steady_clock::now();
+
+ portal_->sendInterest(*interest,
+ bind(&VegasTransportProtocol::onContentSegment, this, _1, _2),
+ bind(&VegasTransportProtocol::onTimeout, this, _1));
+ segment_number_++;
+}
+
+void VegasTransportProtocol::stop() {
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void VegasTransportProtocol::onContentSegment(const Interest &interest, ContentObject &content_object) {
+ uint64_t segment = interest.getName().get(-1).toSegment();
+
+ if (is_running_ == false /*|| input_buffer_[segment]*/) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ changeInterestLifetime(segment);
+ ConsumerContentObjectCallback on_data_input = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, on_data_input);
+ if (on_data_input != VOID_HANDLER) {
+ on_data_input(*dynamic_cast<ConsumerSocket *>(socket_), content_object);
+ }
+
+ ConsumerInterestCallback on_interest_satisfied = VOID_HANDLER;
+ socket_->getSocketOption(INTEREST_SATISFIED, on_interest_satisfied);
+ if (on_interest_satisfied != VOID_HANDLER) {
+ on_interest_satisfied(*dynamic_cast<ConsumerSocket *>(socket_), const_cast<Interest &>(interest));
+ }
+
+ if (content_object.getContentType() == PayloadType::MANIFEST) {
+ onManifest(interest, content_object);
+ } else if (content_object.getContentType() == PayloadType::DATA) {
+ onContentObject(interest, content_object);
+ } // TODO InterestReturn
+
+ scheduleNextInterests();
+}
+
+void VegasTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) {
+ increaseWindow();
+}
+
+void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ decreaseWindow();
+}
+
+void VegasTransportProtocol::scheduleNextInterests() {
+ if (segment_number_ == 0) {
+ current_window_size_ = final_block_number_;
+
+ double maxWindowSize = -1;
+ socket_->getSocketOption(MAX_WINDOW_SIZE, maxWindowSize);
+
+ if (current_window_size_ > maxWindowSize) {
+ current_window_size_ = maxWindowSize;
+ }
+
+ while (interests_in_flight_ < current_window_size_) {
+ if (is_final_block_number_discovered_) {
+ if (segment_number_ <= final_block_number_) {
+ sendInterest();
+ } else {
+ break;
+ }
+ } else {
+ sendInterest();
+ }
+ }
+ } else {
+ if (is_running_) {
+ while (interests_in_flight_ < current_window_size_) {
+ if (is_final_block_number_discovered_) {
+ if (segment_number_ <= final_block_number_) {
+ sendInterest();
+ } else {
+ break;
+ }
+ } else {
+ sendInterest();
+ }
+ }
+ }
+ }
+}
+
+void VegasTransportProtocol::decreaseWindow() {
+ double min_window_size = -1;
+ socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size);
+ if (current_window_size_ > min_window_size) {
+ current_window_size_ = std::ceil(current_window_size_ / 2);
+
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+}
+
+void VegasTransportProtocol::increaseWindow() {
+ double max_window_size = -1;
+ socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size);
+ if (current_window_size_ < max_window_size) // don't expand window above max level
+ {
+ current_window_size_++;
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+};
+
+void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) {
+ std::chrono::steady_clock::duration duration = std::chrono::steady_clock::now() - interest_timepoints_[segment];
+ rtt_estimator_.addMeasurement(std::chrono::duration_cast<std::chrono::microseconds>(duration));
+
+ RtoEstimator::Duration rto = rtt_estimator_.computeRto();
+ std::chrono::milliseconds lifetime = std::chrono::duration_cast<std::chrono::milliseconds>(rto);
+
+ socket_->setSocketOption(INTEREST_LIFETIME, (int) lifetime.count());
+}
+
+void VegasTransportProtocol::onManifest(const Interest &interest, ContentObject &content_object) {
+ if (!is_running_) {
+ return;
+ }
+
+ if (verifyManifest(content_object)) {
+ // TODO Retrieve piece of data using manifest
+ }
+}
+
+bool VegasTransportProtocol::verifyManifest(ContentObject &content_object) {
+ ConsumerContentObjectVerificationCallback on_manifest_to_verify = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, on_manifest_to_verify);
+
+ bool is_data_secure = false;
+
+ if (on_manifest_to_verify == VOID_HANDLER) {
+ // TODO Perform manifest verification
+ } else if (on_manifest_to_verify(*dynamic_cast<ConsumerSocket *>(socket_), content_object)) {
+ is_data_secure = true;
+ }
+
+ return is_data_secure;
+}
+
+bool VegasTransportProtocol::requireInterestWithHash(const Interest &interest,
+ const ContentObject &content_object,
+ Manifest &manifest) {
+ // TODO Require content object with specific hash.
+ return true;
+}
+
+// TODO Add the name in the digest computation!
+void VegasTransportProtocol::onContentObject(const Interest &interest, ContentObject &content_object) {
+ if (verifyContentObject(interest, content_object)) {
+ checkForFastRetransmission(interest);
+
+ uint64_t segment = interest.getName().get(-1).toSegment();
+
+ if (interest_retransmissions_[segment % default_values::default_buffer_size] == 0) {
+ afterContentReception(interest, content_object);
+ }
+
+ if (content_object.hasFinalChunkNumber()) {
+ is_final_block_number_discovered_ = true;
+ final_block_number_ = content_object.getFinalChunkNumber();
+ }
+
+ bool virtualDownload = false;
+
+ socket_->getSocketOption(VIRTUAL_DOWNLOAD, virtualDownload);
+
+ if (!virtualDownload) {
+ receive_buffer_[segment % default_values::default_buffer_size] = content_object.shared_from_this();
+ reassemble();
+ } else {
+ if (segment == final_block_number_) {
+ portal_->stopEventsLoop();
+ }
+ }
+ }
+}
+
+bool VegasTransportProtocol::verifyContentObject(const Interest &interest, ContentObject &content_object) {
+ // TODO Check content object using manifest
+ return true;
+}
+
+// TODO move inside manifest
+bool VegasTransportProtocol::pointsToManifest(ContentObject &content_object) {
+ // TODO Check content objects using manifest
+ return true;
+}
+
+void VegasTransportProtocol::onTimeout(const Interest &interest) {
+ if (!is_running_) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ std::cerr << "Timeout on " << interest.getName() << std::endl;
+
+ ConsumerInterestCallback on_interest_timeout = VOID_HANDLER;
+ socket_->getSocketOption(INTEREST_EXPIRED, on_interest_timeout);
+ if (on_interest_timeout != VOID_HANDLER) {
+ on_interest_timeout(*dynamic_cast<ConsumerSocket *>(socket_), const_cast<Interest &>(interest));
+ }
+
+ uint64_t segment = interest.getName().get(-1).toSegment();
+
+ // Do not retransmit interests asking contents that do not exist.
+ if (is_final_block_number_discovered_) {
+ if (segment > final_block_number_) {
+ return;
+ }
+ }
+
+ afterDataUnsatisfied(segment);
+
+ int max_retransmissions;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, max_retransmissions);
+
+ if (interest_retransmissions_[segment % default_values::default_buffer_size] < max_retransmissions) {
+
+ ConsumerInterestCallback on_interest_retransmission = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, on_interest_retransmission);
+
+ if (on_interest_retransmission != VOID_HANDLER) {
+ on_interest_retransmission(*dynamic_cast<ConsumerSocket *>(socket_), interest);
+ }
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output);
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ //retransmit
+ interests_in_flight_++;
+ interest_retransmissions_[segment % default_values::default_buffer_size]++;
+
+ portal_->sendInterest(interest,
+ bind(&VegasTransportProtocol::onContentSegment, this, _1, _2),
+ bind(&VegasTransportProtocol::onTimeout, this, _1));
+ } else {
+ is_running_ = false;
+
+ bool virtual_download = false;
+ socket_->getSocketOption(VIRTUAL_DOWNLOAD, virtual_download);
+
+ if (!virtual_download) {
+ reassemble();
+ }
+
+ portal_->stopEventsLoop();
+ }
+
+}
+
+void VegasTransportProtocol::copyContent(ContentObject &content_object) {
+ Array a = content_object.getContent();
+
+ content_buffer_.insert(content_buffer_.end(), (uint8_t *) a.data(), (uint8_t *) a.data() + a.size());
+
+ if ((content_object.getName().get(-1).toSegment() == final_block_number_) || (!is_running_)) {
+
+ // return content to the user
+ ConsumerContentCallback on_payload = VOID_HANDLER;
+ socket_->getSocketOption(CONTENT_RETRIEVED, on_payload);
+ if (on_payload != VOID_HANDLER) {
+ on_payload(*dynamic_cast<ConsumerSocket *>(socket_),
+ (uint8_t *) (content_buffer_.data()),
+ content_buffer_.size());
+ }
+
+ //reduce window size to prevent its speculative growth in case when consume() is called in loop
+ int current_window_size = -1;
+ socket_->getSocketOption(CURRENT_WINDOW_SIZE, current_window_size);
+ if ((uint64_t) current_window_size > final_block_number_) {
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, (int) (final_block_number_));
+ }
+
+ is_running_ = false;
+ portal_->stopEventsLoop();
+ }
+}
+
+void VegasTransportProtocol::reassemble() {
+ uint64_t index = last_reassembled_segment_ % default_values::default_buffer_size;
+
+ while (receive_buffer_[index % default_values::default_buffer_size]) {
+ if (receive_buffer_[index % default_values::default_buffer_size]->getContentType() == PayloadType::DATA) {
+ copyContent(*receive_buffer_[index % default_values::default_buffer_size]);
+ }
+
+ receive_buffer_[index % default_values::default_buffer_size].reset();
+
+ last_reassembled_segment_++;
+ index = last_reassembled_segment_ % default_values::default_buffer_size;
+ }
+}
+
+bool VegasTransportProtocol::verifySegmentUsingManifest(Manifest &manifestSegment, ContentObject &content_object) {
+ // TODO Content object verification exploiting manifest
+ return true;
+}
+
+void VegasTransportProtocol::checkForFastRetransmission(const Interest &interest) {
+ uint64_t segNumber = interest.getName().get(-1).toSegment();
+ received_segments_[segNumber] = true;
+ fast_retransmitted_segments.erase(segNumber);
+
+ uint64_t possibly_lost_segment = 0;
+ uint64_t highest_received_segment = received_segments_.rbegin()->first;
+
+ for (uint64_t i = 0; i <= highest_received_segment; i++) {
+ if (received_segments_.find(i) == received_segments_.end()) {
+ if (fast_retransmitted_segments.find(i) == fast_retransmitted_segments.end()) {
+ possibly_lost_segment = i;
+ uint8_t out_of_order_segments = 0;
+ for (uint64_t j = i; j <= highest_received_segment; j++) {
+ if (received_segments_.find(j) != received_segments_.end()) {
+ out_of_order_segments++;
+ if (out_of_order_segments >= default_values::max_out_of_order_segments) {
+ fast_retransmitted_segments[possibly_lost_segment] = true;
+ fastRetransmit(interest, possibly_lost_segment);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+void VegasTransportProtocol::fastRetransmit(const Interest &interest, uint64_t chunk_number) {
+ int max_retransmissions;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_retransmissions);
+
+ if (interest_retransmissions_[chunk_number % default_values::default_buffer_size] < max_retransmissions) {
+ Name name = interest.getName().getPrefix(-1);
+ name.appendSegment(chunk_number);
+
+ std::shared_ptr<Interest> retx_interest = std::make_shared<Interest>(name);
+
+ ConsumerInterestCallback on_interest_retransmission = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, on_interest_retransmission);
+
+ if (on_interest_retransmission != VOID_HANDLER) {
+ on_interest_retransmission(*dynamic_cast<ConsumerSocket *>(socket_), *retx_interest);
+ }
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output);
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), *retx_interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interests_in_flight_++;
+ interest_retransmissions_[chunk_number % default_values::default_buffer_size]++;
+ portal_->sendInterest(*retx_interest,
+ bind(&VegasTransportProtocol::onContentSegment, this, _1, _2),
+ bind(&VegasTransportProtocol::onTimeout, this, _1));
+ }
+}
+
+void VegasTransportProtocol::removeAllPendingInterests() {
+ portal_->clear();
+}
+
+} // namespace icn-interface