aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/protocols/rtc.cc
diff options
context:
space:
mode:
authorMauro Sardara <msardara@cisco.com>2020-02-07 20:00:06 +0100
committerMauro Sardara <msardara@cisco.com>2020-02-12 18:40:52 +0100
commit3bce9bfdce707313de4f9cccdc867abd9edf82df (patch)
treebd7d75a7251888a3fc269fadebd59842c46a14a1 /libtransport/src/hicn/transport/protocols/rtc.cc
parentf9243a2bf823086404be1c41c7bcc1b27cfab7de (diff)
[HICN-508] [HICN-509] [HICN-506] Manifest rework
Change-Id: I992205148910be008d66b5acb7f6f1365770f9e8 Signed-off-by: Mauro Sardara <msardara@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/protocols/rtc.cc')
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc210
1 files changed, 80 insertions, 130 deletions
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 559e86592..e371217f8 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -13,11 +13,12 @@
* limitations under the License.
*/
-#include <math.h>
-#include <random>
+#include <hicn/transport/protocols/rtc.h>
#include <hicn/transport/interfaces/socket_consumer.h>
-#include <hicn/transport/protocols/rtc.h>
+
+#include <math.h>
+#include <random>
namespace transport {
@@ -26,14 +27,16 @@ namespace protocol {
using namespace interface;
RTCTransportProtocol::RTCTransportProtocol(
- interface::ConsumerSocket *icnet_socket)
- : TransportProtocol(icnet_socket),
+ interface::ConsumerSocket *icn_socket)
+ : TransportProtocol(icn_socket, nullptr),
+ DatagramReassembly(icn_socket, this),
inflightInterests_(1 << default_values::log_2_default_buffer_size),
modMask_((1 << default_values::log_2_default_buffer_size) - 1) {
- icnet_socket->getSocketOption(PORTAL, portal_);
+ icn_socket->getSocketOption(PORTAL, portal_);
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
- sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ sentinel_timer_ =
+ std::make_unique<asio::steady_timer>(portal_->getIoService());
round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
reset();
}
@@ -147,8 +150,7 @@ uint32_t min(uint32_t a, uint32_t b) {
}
void RTCTransportProtocol::newRound() {
- round_timer_->expires_from_now(std::chrono::milliseconds(
- HICN_ROUND_LEN));
+ round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN));
round_timer_->async_wait([this](std::error_code ec) {
if (ec) return;
updateStats(HICN_ROUND_LEN);
@@ -281,10 +283,10 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
&stats_callback);
if (*stats_callback) {
// Send the stats to the app
- stats_.updateQueuingDelay(queuingDelay_);
- stats_.updateLossRatio(lossRate_);
- stats_.updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
- (*stats_callback)(*socket_, stats_);
+ stats_->updateQueuingDelay(queuingDelay_);
+ stats_->updateLossRatio(lossRate_);
+ stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt());
+ (*stats_callback)(*socket_, *stats_);
}
// bound also by interest lifitime* production rate
@@ -301,9 +303,9 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
updateCCState();
updateWindow();
- if(queuingDelay_ > 25.0){
- //this indicates that the client will go soon out of synch,
- //switch to synch mode
+ if (queuingDelay_ > 25.0) {
+ // this indicates that the client will go soon out of synch,
+ // switch to synch mode
if (currentState_ == HICN_RTC_NORMAL_STATE) {
currentState_ = HICN_RTC_SYNC_STATE;
}
@@ -358,8 +360,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate,
maxCWin_ = min(maxWaintingInterest, maxCWin_);
}
- if(maxCWin_ < HICN_MIN_CWIN)
- maxCWin_ = HICN_MIN_CWIN;
+ if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN;
}
void RTCTransportProtocol::updateWindow() {
@@ -518,68 +519,64 @@ void RTCTransportProtocol::scheduleNextInterests() {
}
}
-void RTCTransportProtocol::sentinelTimer(){
+void RTCTransportProtocol::sentinelTimer() {
uint32_t wait = 50;
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
- //we have all the info to set the timers
+ if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) {
+ // we have all the info to set the timers
wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
- if(wait == 0)
- wait = 1;
+ if (wait == 0) wait = 1;
}
sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
sentinel_timer_->async_wait([this](std::error_code ec) {
-
if (ec) return;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
-
- if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
- pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){
- //we have no info, so we send again
-
- for(auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++){
- uint32_t pkt = it->first & modMask_;
- if (inflightInterests_[pkt].sequence == it->first) {
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
- }
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) {
+ // we have no info, so we send again
+
+ for (auto it = packets_in_window_.begin(); it != packets_in_window_.end();
+ it++) {
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
}
- }else{
- uint64_t max_waiting_time = //wait at least 50ms
- (pathTable_[producerPathLabels_[1]]->getMinRtt() -
- pathTable_[producerPathLabels_[0]]->getMinRtt()) +
- (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
+ }
+ } else {
+ uint64_t max_waiting_time = // wait at least 50ms
+ (pathTable_[producerPathLabels_[1]]->getMinRtt() -
+ pathTable_[producerPathLabels_[0]]->getMinRtt()) +
+ (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50);
- if((currentState_ == HICN_RTC_NORMAL_STATE) &&
+ if ((currentState_ == HICN_RTC_NORMAL_STATE) &&
(inflightInterestsCount_ >= currentCWin_) &&
- ((now - lastEvent_) > max_waiting_time) &&
- (lossRate_ >= 0.05)){
+ ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) {
+ uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
- uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt();
-
- for(auto it = packets_in_window_.begin();
- it != packets_in_window_.end(); it++){
+ for (auto it = packets_in_window_.begin();
+ it != packets_in_window_.end(); it++) {
uint32_t pkt = it->first & modMask_;
if (inflightInterests_[pkt].sequence == it->first &&
- ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){
- inflightInterests_[pkt].transmissionTime = now;
- Name *interest_name = nullptr;
- socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
- &interest_name);
- interest_name->setSuffix(it->first);
- it->second++;
- sendInterest(interest_name, true);
+ ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
}
}
}
@@ -754,8 +751,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
// and over until we get at least a packet
inflightInterestsCount_--;
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
scheduleNextInterests();
return;
@@ -763,8 +760,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
@@ -890,30 +887,29 @@ void RTCTransportProtocol::onContentObject(
return;
}
- //check if the packet is a rtx
+ // check if the packet is a rtx
bool is_rtx = false;
- if(interestRetransmissions_.find(segmentNumber) !=
- interestRetransmissions_.end()){
+ if (interestRetransmissions_.find(segmentNumber) !=
+ interestRetransmissions_.end()) {
is_rtx = true;
- }else{
+ } else {
auto it_win = packets_in_window_.find(segmentNumber);
- if(it_win != packets_in_window_.end() &&
- it_win->second != 0)
- is_rtx = true;
+ if (it_win != packets_in_window_.end() && it_win->second != 0)
+ is_rtx = true;
}
if (payload_size == HICN_NACK_HEADER_SIZE) {
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--;
}
bool old_nack = false;
- if (!is_rtx){
+ if (!is_rtx) {
// this is not a retransmitted packet
old_nack = onNack(*content_object, false);
updateDelayStats(*content_object);
@@ -924,8 +920,8 @@ void RTCTransportProtocol::onContentObject(
// the nacked_ state is used only to avoid to decrease
// inflightInterestsCount_ multiple times. In fact, every time that we
// receive an event related to an interest (timeout, nacked, content) we
- // cange the state. In this way we are sure that we do not decrease twice the
- // counter
+ // cange the state. In this way we are sure that we do not decrease twice
+ // the counter
if (old_nack) {
inflightInterests_[pkt].state = lost_;
interestRetransmissions_.erase(segmentNumber);
@@ -942,13 +938,13 @@ void RTCTransportProtocol::onContentObject(
if (inflightInterests_[pkt].state == sent_) {
lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
packets_in_window_.erase(segmentNumber);
inflightInterestsCount_--; // packet sent without timeouts
}
- if (inflightInterests_[pkt].state == sent_ && !is_rtx){
+ if (inflightInterests_[pkt].state == sent_ && !is_rtx) {
// delay stats are computed only for non retransmitted data
updateDelayStats(*content_object);
}
@@ -979,52 +975,6 @@ void RTCTransportProtocol::onContentObject(
scheduleNextInterests();
}
-void RTCTransportProtocol::returnContentToApplication(
- const ContentObject &content_object) {
- // return content to the user
- auto read_buffer = content_object.getPayload();
-
- read_buffer->trimStart(HICN_TIMESTAMP_SIZE);
-
- interface::ConsumerSocket::ReadCallback *read_callback = nullptr;
- socket_->getSocketOption(READ_CALLBACK, &read_callback);
-
- if (read_callback == nullptr) {
- throw errors::RuntimeException(
- "The read callback must be installed in the transport before starting "
- "the content retrieval.");
- }
-
- if (read_callback->isBufferMovable()) {
- read_callback->readBufferAvailable(
- utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length()));
- } else {
- // The buffer will be copied into the application-provided buffer
- uint8_t *buffer;
- std::size_t length;
- std::size_t total_length = read_buffer->length();
-
- while (read_buffer->length()) {
- buffer = nullptr;
- length = 0;
- read_callback->getReadBuffer(&buffer, &length);
-
- if (!buffer || !length) {
- throw errors::RuntimeException(
- "Invalid buffer provided by the application.");
- }
-
- auto to_copy = std::min(read_buffer->length(), length);
-
- std::memcpy(buffer, read_buffer->data(), to_copy);
- read_buffer->trimStart(to_copy);
- }
-
- read_callback->readDataAvailable(total_length);
- read_buffer->clear();
- }
-}
-
} // end namespace protocol
} // end namespace transport