diff options
Diffstat (limited to 'libtransport')
5 files changed, 93 insertions, 99 deletions
diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc index 05cabc60d..0c3fd76cf 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc @@ -29,7 +29,6 @@ ManifestIndexManager::ManifestIndexManager( interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) : IncrementalIndexManager(icn_socket), PacketManager<Interest>(1024), - manifests_in_flight_(0), next_reassembly_segment_(suffix_queue_.end()), next_to_retrieve_segment_(suffix_queue_.end()), suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), @@ -39,7 +38,6 @@ bool ManifestIndexManager::onManifest( core::ContentObject::Ptr &&content_object) { auto manifest = std::make_unique<ContentObjectManifest>(std::move(*content_object)); - bool manifest_verified = verification_manager_->onPacketToVerify(*manifest); if (manifest_verified) { @@ -54,14 +52,12 @@ bool ManifestIndexManager::onManifest( case core::ManifestType::INLINE_MANIFEST: { auto _it = manifest->getSuffixList().begin(); auto _end = --manifest->getSuffixList().end(); + final_suffix_ = manifest->getFinalBlockNumber(); // final block number if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest())) { _end++; } - // Get final block number - final_suffix_ = manifest->getFinalBlockNumber(); - suffix_hash_map_[_it->first] = std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), manifest->getHashAlgorithm()); @@ -93,62 +89,13 @@ bool ManifestIndexManager::onManifest( 1); suffix_manifest_.setSuffixStrategy( manifest->getNextSegmentCalculationStrategy()); - } else if (manifests_in_flight_) { - manifests_in_flight_--; } - if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest() || - suffix_manifest_.getSuffix() > - final_suffix_)) { - break; + if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) { + fillWindow(manifest->getWritableName(), + manifest->getName().getSuffix()); } - // Get current window size - double current_window_size = 0.; - socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - current_window_size); - - // Get portal - std::shared_ptr<interface::BasePortal> portal; - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - - // Number of segments in manifest - std::size_t segment_count = 0; - - // Manifest namespace - Name &name = manifest->getWritableName(); - - if (manifests_in_flight_ >= MAX_MANIFESTS_IN_FLIGHT) { - break; - } - - // Send as many manifest as required for filling window. - do { - segment_count += suffix_manifest_.getNbSegments(); - suffix_manifest_++; - - Interest::Ptr interest = getPacket(); - name.setSuffix(suffix_manifest_.getSuffix()); - interest->setName(name); - - uint32_t interest_lifetime; - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - interest->setLifetime(interest_lifetime); - - // Send requests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(interest), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); - manifests_in_flight_++; - } while (segment_count < current_window_size && - suffix_manifest_.getSuffix() < final_suffix_ && - manifests_in_flight_ < MAX_MANIFESTS_IN_FLIGHT); - break; } case core::ManifestType::FLIC_MANIFEST: { @@ -193,6 +140,56 @@ void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) { std::placeholders::_1)); } +void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) { + /* Send as many manifest as required for filling window. */ + uint32_t interest_lifetime; + double window_size; + std::shared_ptr<interface::BasePortal> portal; + Interest::Ptr interest; + uint32_t current_segment = *next_to_retrieve_segment_; + // suffix_manifest_ now points to the next manifest to request + uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix(); + + socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, + interest_lifetime); + socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, + window_size); + + if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) { + suffix_manifest_.updateSuffix(last_requested_manifest); + return; + } + + if (current_segment + window_size < suffix_manifest_.getSuffix() && + current_manifest != last_requested_manifest) { + suffix_manifest_.updateSuffix(last_requested_manifest); + return; + } + + do { + interest = getPacket(); + name.setSuffix(suffix_manifest_.getSuffix()); + interest->setName(name); + interest->setLifetime(interest_lifetime); + + // Send interests for manifest out of the congestion window (no + // in_flight_interests++) + portal->sendInterest( + std::move(interest), + std::bind(&ManifestIndexManager::onManifestReceived, this, + std::placeholders::_1, std::placeholders::_2), + std::bind(&ManifestIndexManager::onManifestTimeout, this, + std::placeholders::_1)); + + last_requested_manifest = (suffix_manifest_++).getSuffix(); + } while (current_segment + window_size >= suffix_manifest_.getSuffix() && + suffix_manifest_.getSuffix() < final_suffix_); + + // suffix_manifest_ now points to the last requested manifest + suffix_manifest_.updateSuffix(last_requested_manifest); +} + bool ManifestIndexManager::onContentObject( const core::ContentObject &content_object) { bool verify_signature; diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h index 74c86eb60..cb88940d5 100644 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h @@ -21,9 +21,6 @@ #include <list> -/* #define MAX_MANIFESTS_IN_FLIGHT std::numeric_limits<uint32_t>::max() */ -#define MAX_MANIFESTS_IN_FLIGHT 10 - namespace transport { namespace protocol { @@ -58,9 +55,9 @@ class ManifestIndexManager : public IncrementalIndexManager, private: void onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c); void onManifestTimeout(Interest::Ptr &&i); + void fillWindow(Name &name, uint32_t current_manifest); protected: - uint32_t manifests_in_flight_; SuffixQueue suffix_queue_; SuffixQueue::iterator next_reassembly_segment_; SuffixQueue::iterator next_to_retrieve_segment_; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index aeee48bac..4d8e4c514 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -300,6 +300,16 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { updateCCState(); updateWindow(); + if(queuingDelay_ > 100.0){ + //this indicates that the client will go soon out of synch, + //switch to synch mode + if (currentState_ == HICN_RTC_NORMAL_STATE) { + currentState_ = HICN_RTC_SYNC_STATE; + } + computeMaxWindow(BW, 0); + increaseWindow(); + } + // in any case we reset all the counters gotNack_ = false; @@ -351,7 +361,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, void RTCTransportProtocol::updateWindow() { if (currentState_ == HICN_RTC_SYNC_STATE) return; - if (currentCWin_ < maxCWin_ * 0.7) { + if (currentCWin_ < maxCWin_ * 0.9) { currentCWin_ = min(maxCWin_, (uint32_t)(currentCWin_ * HICN_WIN_INCREASE_FACTOR)); } else if (currentCWin_ > maxCWin_) { @@ -378,7 +388,7 @@ void RTCTransportProtocol::increaseWindow() { if (currentState_ == HICN_RTC_NORMAL_STATE) return; // we need to be carefull to do not increase the window to much - if (currentCWin_ < ((double)maxCWin_ * 0.5)) { + if (currentCWin_ < ((double)maxCWin_ * 0.7)) { currentCWin_ = currentCWin_ + 1; // exponential } else { currentCWin_ = min( @@ -542,14 +552,15 @@ void RTCTransportProtocol::sentinelTimer(){ } } }else{ - uint64_t max_waiting_time = - round((pathTable_[producerPathLabels_[1]]->getMinRtt() - + uint64_t max_waiting_time = //wait at least 50ms + (pathTable_[producerPathLabels_[1]]->getMinRtt() - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - (pathTable_[producerPathLabels_[0]]->getInterArrivalGap() * 10)); + (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); if((currentState_ == HICN_RTC_NORMAL_STATE) && (inflightInterestsCount_ >= currentCWin_) && - ((now - lastEvent_) > max_waiting_time)){ + ((now - lastEvent_) > max_waiting_time) && + (lossRate_ > 10.0)){ uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); @@ -859,8 +870,8 @@ void RTCTransportProtocol::onContentObject( uint32_t pathLabel = content_object->getPathLabel(); if (pathTable_.find(pathLabel) == pathTable_.end()) { - // if this path does not exists we cannot create a new one so drop - return; + std::shared_ptr<RTCDataPath> newPath = std::make_shared<RTCDataPath>(); + pathTable_[pathLabel] = newPath; } // this is the expected probe, update the RTT and drop the packet diff --git a/libtransport/src/hicn/transport/protocols/rtc_data_path.h b/libtransport/src/hicn/transport/protocols/rtc_data_path.h index c8a049368..48a67c525 100644 --- a/libtransport/src/hicn/transport/protocols/rtc_data_path.h +++ b/libtransport/src/hicn/transport/protocols/rtc_data_path.h @@ -20,7 +20,7 @@ #include <climits> #define ALPHA_RTC 0.125 -#define HISTORY_LEN 30 +#define HISTORY_LEN 20 //4 sec namespace transport { diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.h b/libtransport/src/hicn/transport/utils/suffix_strategy.h index 4358d12f0..3014855f6 100644 --- a/libtransport/src/hicn/transport/utils/suffix_strategy.h +++ b/libtransport/src/hicn/transport/utils/suffix_strategy.h @@ -38,8 +38,6 @@ class SuffixStrategy { std::uint32_t getSuffix() { return suffix_; } - virtual std::uint32_t getNextSuffix() = 0; - void updateSuffix(std::uint32_t new_suffix) { suffix_ = new_suffix; } std::size_t getNbSegments() { return nb_segments_; } @@ -57,6 +55,7 @@ class SuffixStrategy { transport::core::NextSegmentCalculationStrategy suffix_stragegy_; std::uint32_t suffix_; std::size_t nb_segments_; + virtual std::uint32_t getNextSuffix() = 0; }; class SuffixManifest : public SuffixStrategy { @@ -66,27 +65,22 @@ class SuffixManifest : public SuffixStrategy { std::uint32_t start_offset) : SuffixStrategy(suffix_stragegy, start_offset) {} - std::uint32_t getNextSuffix(); - SuffixManifest operator++() { - uint32_t next_suffix = getNextSuffix(); - updateSuffix(next_suffix); - return SuffixManifest(suffix_stragegy_, next_suffix); + updateSuffix(getNextSuffix()); + SuffixManifest temp_suffix(suffix_stragegy_, suffix_); + temp_suffix.setNbSegments(getNbSegments()); + return temp_suffix; } SuffixManifest operator++(int) { SuffixManifest temp_suffix(suffix_stragegy_, suffix_); - uint32_t next_suffix = getNextSuffix(); - updateSuffix(next_suffix); + temp_suffix.setNbSegments(getNbSegments()); + updateSuffix(getNextSuffix()); return temp_suffix; } - SuffixManifest operator+(uint32_t shift) { - for (uint32_t i = 0; i < shift; i++) { - updateSuffix(getNextSuffix()); - } - return SuffixManifest(suffix_stragegy_, getSuffix()); - } + protected: + std::uint32_t getNextSuffix(); }; class SuffixContent : public SuffixStrategy { @@ -101,28 +95,22 @@ class SuffixContent : public SuffixStrategy { std::uint32_t start_offset) : SuffixContent(suffix_stragegy, start_offset, false) {} - std::uint32_t getNextSuffix(); - SuffixContent operator++() { - uint32_t next_suffix = getNextSuffix(); - updateSuffix(next_suffix); - return SuffixContent(suffix_stragegy_, next_suffix, making_manifest_); + updateSuffix(getNextSuffix()); + SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_); + temp_suffix.setNbSegments(getNbSegments()); + temp_suffix.content_counter_ = content_counter_; + return temp_suffix; } SuffixContent operator++(int) { SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_); - uint32_t next_suffix = getNextSuffix(); - updateSuffix(next_suffix); + temp_suffix.setNbSegments(getNbSegments()); + temp_suffix.content_counter_ = content_counter_; + updateSuffix(getNextSuffix()); return temp_suffix; } - SuffixContent operator+(uint32_t shift) { - for (uint32_t i = 0; i < shift; i++) { - updateSuffix(getNextSuffix()); - } - return SuffixContent(suffix_stragegy_, getSuffix(), making_manifest_); - } - void setUsingManifest(bool value) { making_manifest_ = value; } void reset(std::uint32_t reset_suffix) { @@ -135,5 +123,6 @@ class SuffixContent : public SuffixStrategy { /* content_counter_ keeps track of the number of segments */ /* between two manifests */ uint32_t content_counter_; + std::uint32_t getNextSuffix(); }; } // namespace utils |