aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/hicn/transport')
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc31
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.h29
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.cc64
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_producer.h41
-rw-r--r--libtransport/src/hicn/transport/protocols/raaqm.cc2
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc71
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h4
7 files changed, 105 insertions, 137 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index 14cd27b6b..64b60101d 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -121,37 +121,6 @@ asio::io_service &ConsumerSocket::getIoService() {
return portal_->getIoService();
}
-// If the thread calling lambda_func is not the same of io_service, this
-// function reschedule the function on it
-template <typename Lambda, typename arg2>
-int ConsumerSocket::rescheduleOnIOService(int socket_option_key,
- arg2 socket_option_value,
- Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (transport_protocol_->isRunning()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
- bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx,
- &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
- });
- std::unique_lock<std::mutex> lck(mtx);
- if (!done) {
- cv.wait(lck);
- }
- } else {
- result = func(socket_option_key, socket_option_value);
- }
-
- return result;
-}
-
int ConsumerSocket::setSocketOption(int socket_option_key,
ReadCallback *socket_option_value) {
// Reschedule the function on the io_service to avoid race condition in case
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index e3620b269..eceee2d34 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -282,9 +282,36 @@ class ConsumerSocket : public BaseSocket {
ConsumerTimerCallback **socket_option_value);
protected:
+ // If the thread calling lambda_func is not the same of io_service, this
+ // function reschedule the function on it
template <typename Lambda, typename arg2>
int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func);
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (transport_protocol_->isRunning()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx, &cv,
+ &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ cv.notify_all();
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+ }
private:
asio::io_service internal_io_service_;
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index f90197490..8f8fc1a79 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -391,70 +391,6 @@ void ProducerSocket::onInterest(Interest &interest) {
}
}
-// If the thread calling lambda_func is not the same of io_service, this
-// function reschedule the function on it
-template <typename Lambda, typename arg2>
-int ProducerSocket::rescheduleOnIOService(int socket_option_key,
- arg2 socket_option_value,
- Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != listening_thread_.get_id()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
- bool done = false;
- io_service_.dispatch([&socket_option_key, &socket_option_value, &mtx,
- &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
- });
- std::unique_lock<std::mutex> lck(mtx);
- if (!done) {
- cv.wait(lck);
- }
- } else {
- result = func(socket_option_key, socket_option_value);
- }
-
- return result;
-}
-
-// If the thread calling lambda_func is not the same of io_service, this
-// function reschedule the function on it
-template <typename Lambda, typename arg2>
-int ProducerSocket::rescheduleOnIOServiceWithReference(
- int socket_option_key, arg2 &socket_option_value, Lambda lambda_func) {
- // To enforce type check
- std::function<int(int, arg2 &)> func = lambda_func;
- int result = SOCKET_OPTION_SET;
- if (listening_thread_.joinable() &&
- std::this_thread::get_id() != this->listening_thread_.get_id()) {
- std::mutex mtx;
- /* Condition variable for the wait */
- std::condition_variable cv;
- std::unique_lock<std::mutex> lck(mtx);
- bool done = false;
- io_service_.dispatch([this, &socket_option_key, &socket_option_value, &mtx,
- &cv, &result, &done, &func]() {
- std::unique_lock<std::mutex> lck(mtx);
- done = true;
- result = func(socket_option_key, socket_option_value);
-
- if (!done) {
- cv.wait(lck);
- }
- });
- } else {
- result = func(socket_option_key, socket_option_value);
- }
-
- return result;
-}
-
int ProducerSocket::setSocketOption(int socket_option_key,
uint32_t socket_option_value) {
switch (socket_option_key) {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
index 5c617d761..709a2582b 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -160,15 +160,6 @@ class ProducerSocket : public Socket<BasePortal>,
virtual int getSocketOption(int socket_option_key,
std::string &socket_option_value);
- template <typename Lambda, typename arg2>
- int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
- Lambda lambda_func);
-
- template <typename Lambda, typename arg2>
- int rescheduleOnIOServiceWithReference(int socket_option_key,
- arg2 &socket_option_value,
- Lambda lambda_func);
-
protected:
// Threads
std::thread listening_thread_;
@@ -215,6 +206,38 @@ class ProducerSocket : public Socket<BasePortal>,
ProducerContentCallback on_content_produced_;
+ // If the thread calling lambda_func is not the same of io_service, this
+ // function reschedule the function on it
+ template <typename Lambda, typename arg2>
+ int rescheduleOnIOService(int socket_option_key, arg2 socket_option_value,
+ Lambda lambda_func) {
+ // To enforce type check
+ std::function<int(int, arg2)> func = lambda_func;
+ int result = SOCKET_OPTION_SET;
+ if (listening_thread_.joinable() &&
+ std::this_thread::get_id() != listening_thread_.get_id()) {
+ std::mutex mtx;
+ /* Condition variable for the wait */
+ std::condition_variable cv;
+ bool done = false;
+ io_service_.dispatch([&socket_option_key, &socket_option_value,
+ &mtx, &cv, &result, &done, &func]() {
+ std::unique_lock<std::mutex> lck(mtx);
+ done = true;
+ result = func(socket_option_key, socket_option_value);
+ cv.notify_all();
+ });
+ std::unique_lock<std::mutex> lck(mtx);
+ if (!done) {
+ cv.wait(lck);
+ }
+ } else {
+ result = func(socket_option_key, socket_option_value);
+ }
+
+ return result;
+ }
+
private:
void listen();
diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc
index c816158f9..779f9a9a1 100644
--- a/libtransport/src/hicn/transport/protocols/raaqm.cc
+++ b/libtransport/src/hicn/transport/protocols/raaqm.cc
@@ -519,7 +519,7 @@ void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) {
interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER;
socket_->getSocketOption(READ_CALLBACK, &on_payload);
- if (on_payload) {
+ if (!on_payload) {
throw errors::RuntimeException(
"The read callback must be installed in the transport before "
"starting "
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index f52494aba..1a3511003 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -34,6 +34,7 @@ RTCTransportProtocol::RTCTransportProtocol(
rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
+ round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService());
reset();
}
@@ -45,6 +46,8 @@ RTCTransportProtocol::~RTCTransportProtocol() {
int RTCTransportProtocol::start() {
probeRtt();
+ sentinelTimer();
+ newRound();
return TransportProtocol::start();
}
@@ -60,10 +63,11 @@ void RTCTransportProtocol::resume() {
is_running_ = true;
- lastRoundBegin_ = std::chrono::steady_clock::now();
inflightInterestsCount_ = 0;
probeRtt();
+ sentinelTimer();
+ newRound();
scheduleNextInterests();
portal_->runEventsLoop();
@@ -75,7 +79,6 @@ void RTCTransportProtocol::resume() {
void RTCTransportProtocol::reset() {
portal_->setConsumerCallback(this);
// controller var
- lastRoundBegin_ = std::chrono::steady_clock::now();
currentState_ = HICN_RTC_SYNC_STATE;
// cwin var
@@ -143,15 +146,14 @@ uint32_t min(uint32_t a, uint32_t b) {
return b;
}
-void RTCTransportProtocol::checkRound() {
- uint32_t duration =
- (uint32_t)std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now() - lastRoundBegin_)
- .count();
- if (duration >= HICN_ROUND_LEN) {
- lastRoundBegin_ = std::chrono::steady_clock::now();
- updateStats(duration); // update stats and window
- }
+void RTCTransportProtocol::newRound() {
+ round_timer_->expires_from_now(std::chrono::milliseconds(
+ HICN_ROUND_LEN));
+ round_timer_->async_wait([this](std::error_code ec) {
+ if (ec) return;
+ updateStats(HICN_ROUND_LEN);
+ newRound();
+ });
}
void RTCTransportProtocol::updateDelayStats(
@@ -231,11 +233,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) {
pathTable_.find(producerPathLabels_[1]) == pathTable_.end())
return; // this should not happen
- //set sentinel timer if needed
- if(rounds_ == 0){
- sentinelTimer();
- }
-
// as a queuing delay we keep the lowest one among the two paths
// if one path is congested the forwarder should decide to do not
// use it so it does not make sense to inform the application
@@ -449,7 +446,6 @@ void RTCTransportProtocol::sendInterest(Name *interest_name, bool rtx) {
}
void RTCTransportProtocol::scheduleNextInterests() {
- checkRound();
if (!is_running_ && !is_first_) return;
while (inflightInterestsCount_ < currentCWin_) {
@@ -505,29 +501,47 @@ void RTCTransportProtocol::scheduleNextInterests() {
actualSegment_ = (actualSegment_ + 1) % HICN_MIN_PROBE_SEQ;
sendInterest(interest_name, false);
- checkRound();
}
}
void RTCTransportProtocol::sentinelTimer(){
- uint32_t wait = 1;
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end()){
- wait = round(
- pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
+ uint32_t wait = 10;
+
+ if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() &&
+ pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
+ //we have all the info to set the timers
+ wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap());
+ if(wait == 0)
+ wait = 1;
}
- if(wait == 0)
- wait = 1;
sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait));
sentinel_timer_->async_wait([this](std::error_code ec) {
+
if (ec) return;
- uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+
+ uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() ||
- pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){
+ if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() ||
+ pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){
+ //we have no info, so we send again
+ for(auto it = packets_in_window_.begin();
+ it != packets_in_window_.end(); it++){
+ uint32_t pkt = it->first & modMask_;
+ if (inflightInterests_[pkt].sequence == it->first) {
+ inflightInterests_[pkt].transmissionTime = now;
+ Name *interest_name = nullptr;
+ socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME,
+ &interest_name);
+ interest_name->setSuffix(it->first);
+ it->second++;
+ sendInterest(interest_name, true);
+ }
+ }
+ }else{
uint64_t max_waiting_time =
round((pathTable_[producerPathLabels_[1]]->getMinRtt() -
pathTable_[producerPathLabels_[0]]->getMinRtt()) +
@@ -554,12 +568,11 @@ void RTCTransportProtocol::sentinelTimer(){
}
}
}
- }//esle not enough info to resend the packet, schedule the timer agian
+ }
sentinelTimer();
});
}
-
void RTCTransportProtocol::addRetransmissions(uint32_t val) {
// add only val in the rtx list
addRetransmissions(val, val + 1);
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 908be017a..46063d041 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -108,7 +108,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
private:
// algo functions
void reset() override;
- void checkRound();
// CC functions
void updateDelayStats(const ContentObject &content_object);
@@ -129,6 +128,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint64_t retransmit();
void checkRtx();
void probeRtt();
+ void newRound();
void onTimeout(Interest::Ptr &&interest) override;
bool onNack(const ContentObject &content_object, bool rtx);
void onContentObject(Interest::Ptr &&interest,
@@ -141,7 +141,7 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
}
// controller var
- std::chrono::steady_clock::time_point lastRoundBegin_;
+ std::unique_ptr<asio::steady_timer> round_timer_;
unsigned currentState_;
// cwin var