aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hicn-plugin/src/faces/app/face_prod.c40
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc15
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.cc66
-rw-r--r--libtransport/src/hicn/transport/protocols/rtc.h7
-rw-r--r--utils/src/hiperf.cc8
5 files changed, 90 insertions, 46 deletions
diff --git a/hicn-plugin/src/faces/app/face_prod.c b/hicn-plugin/src/faces/app/face_prod.c
index 834b35da9..7e6117b84 100644
--- a/hicn-plugin/src/faces/app/face_prod.c
+++ b/hicn-plugin/src/faces/app/face_prod.c
@@ -131,7 +131,8 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved,
hicn_main_t *hm = &hicn_main;
- ip46_address_t app_ip;
+ ip46_address_t local_app_ip;
+ ip46_address_t remote_app_ip;
u32 if_flags = 0;
if (!hm->is_enabled)
@@ -195,7 +196,7 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved,
*/
face->shared.flags &= HICN_FACE_FLAGS_DELETED;
hicn_face_prod_t *prod_face = (hicn_face_prod_t *) face->data;
- app_ip = prod_face->ip_face.local_addr;
+ local_app_ip = prod_face->ip_face.local_addr;
}
}
else
@@ -207,31 +208,38 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved,
* Otherwise retrieve an ip address to assign as a
* local ip addr.
*/
- ip4_address_t app_ip4 = get_ip4_address ();
+ ip4_address_t local_app_ip4;
+ ip4_address_t remote_app_ip4;
+ get_two_ip4_addresses (&local_app_ip4, &remote_app_ip4);
ip4_add_del_interface_address (vm,
sw_if,
- &app_ip4, 32, 0 /* is_del */ );
- app_ip = to_ip46 ( /* isv6 */ 0, app_ip4.as_u8);
+ &local_app_ip4, 31, 0 /* is_del */ );
+ local_app_ip = to_ip46 ( /* isv6 */ 0, local_app_ip4.as_u8);
+ remote_app_ip = to_ip46 ( /* isv6 */ 0, remote_app_ip4.as_u8);
+
+ ret =
+ hicn_face_ip_add (&local_app_ip, &remote_app_ip, sw_if, faceid);
}
else
{
- ip6_address_t app_ip6 = get_ip6_address ();
+ ip6_address_t local_app_ip6;
+ ip6_address_t remote_app_ip6;
+ get_two_ip6_addresses (&local_app_ip6, &remote_app_ip6);
u8 *s0;
- s0 = format (0, "Prefix %U", format_ip6_address, &app_ip6);
+ s0 = format (0, "Prefix %U", format_ip6_address, &local_app_ip6);
vlib_cli_output (vm, "Setting ip address %s\n", s0);
ip6_add_del_interface_address (vm,
sw_if,
- &app_ip6, 128, 0 /* is_del */ );
- app_ip = to_ip46 ( /* isv6 */ 1, app_ip6.as_u8);
- }
+ &local_app_ip6, 127,
+ 0 /* is_del */ );
+ local_app_ip = to_ip46 ( /* isv6 */ 1, local_app_ip6.as_u8);
+ remote_app_ip = to_ip46 ( /* isv6 */ 1, remote_app_ip6.as_u8);
- /*
- * Special case: the nh_addr in the face is the appif ip
- * address
- */
- ret = hicn_face_ip_add (&app_ip, &(prefix->name), sw_if, faceid);
+ ret =
+ hicn_face_ip_add (&local_app_ip, &remote_app_ip, sw_if, faceid);
+ }
face = hicn_dpoi_get_from_idx (*faceid);
@@ -259,7 +267,7 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved,
ret = hicn_route_add (faceid, 1, &(prefix->name), prefix->len);
}
- *prod_addr = app_ip;
+ *prod_addr = local_app_ip;
/* Cleanup in case of something went wrong. */
if (ret)
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 67fcc83e3..cea421703 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -145,9 +145,14 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
payload->append(buffer_size + TIMESTAMP_LEN);
content_object.appendPayload(std::move(payload));
- content_object.setLifetime(1000); // XXX this should be set by the APP
+ content_object.setLifetime(500); // XXX this should be set by the APP
content_object.setPathLabel(prodLabel_);
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, content_object);
+ }
+
portal_->sendContentObject(content_object);
currentSeg_++;
@@ -178,8 +183,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
max_gap = (uint32_t)floor(
(double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR /
- 1000.0) *
- (double)packetsProductionRate_.load()));
+ 1000.0) * (double)packetsProductionRate_.load()));
if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
sendNack(*interest);
@@ -200,6 +204,11 @@ void RTCProducerSocket::sendNack(const Interest &interest) {
nack_->setLifetime(0);
nack_->setPathLabel(prodLabel_);
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *nack_);
+ }
+
portal_->sendContentObject(*nack_);
}
diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc
index 98abbe35b..13c688d00 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.cc
+++ b/libtransport/src/hicn/transport/protocols/rtc.cc
@@ -105,6 +105,7 @@ void RTCTransportProtocol::reset() {
actualSegment_ = 0;
inflightInterestsCount_ = 0;
while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ lastSegNacked_ = 0;
nackedByProducer_.clear();
nackedByProducerMaxSize_ = 512;
@@ -433,8 +434,8 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) {
continue;
}
// packetLost_++;
- // XXX here I need to avoid the retrasmission for packet that were nacked by
- // the network
+ // XXX here I need to avoid the retrasmission for packet that were
+ // nacked by the network
interestRetransmissions_.push(nacks[i]);
}
@@ -450,7 +451,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) {
inflightInterestsCount_--;
}
- if (inflightInterests_[pkt].retransmissions < HICN_MAX_RTX) {
+ if (inflightInterests_[pkt].retransmissions < HICN_MAX_RTX &&
+ lastSegNacked_ <= segmentNumber) {
interestRetransmissions_.push(segmentNumber);
}
@@ -466,7 +468,9 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
if (productionRate == 0) {
// the producer socket is not active
// in this case we consider only the first nack
- if (nack_timer_used_) return false;
+ if (nack_timer_used_) {
+ return false;
+ }
nack_timer_used_ = true;
// actualSegment_ should be the one in the nack, which will the next in
@@ -482,7 +486,6 @@ bool RTCTransportProtocol::checkIfProducerIsActive(
});
return false;
}
-
return true;
}
@@ -492,6 +495,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
uint32_t productionRate = *(++payload);
uint32_t nackSegment = content_object.getName().getSuffix();
+
gotNack_ = true;
// we synch the estimated production rate with the actual one
estimatedBw_ = (double)productionRate;
@@ -506,25 +510,18 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
computeMaxWindow(productionRate, 0);
increaseWindow();
+ while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ lastSegNacked_ = productionSeg;
+
if (nackedByProducer_.size() >= nackedByProducerMaxSize_)
nackedByProducer_.erase(nackedByProducer_.begin());
nackedByProducer_.insert(nackSegment);
} else if (productionSeg < nackSegment) {
- gotFutureNack_++;
// we are asking stuff in the future
- // example
- // 10 12 13 14 15 16 17
- // ^ ^ ^
- // in prod nack actual
- // in this example we sent up to segment 17 and we get a nack for segment 15
- // this means that we will get nack also for 16 17
- // and valid data for 13 14
- // so the next segment to ask is 15, because 13 and 14 will can back anyway
- // we go back only in the case that the actual segment is really bigger than
- // nack segment, other we do nothing
-
- actualSegment_ = min(actualSegment_, nackSegment);
+ gotFutureNack_++;
+
+ actualSegment_ = productionSeg + 1;
computeMaxWindow(productionRate, 0);
decreaseWindow();
@@ -535,6 +532,24 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) {
} // equal should not happen
}
+void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) {
+ uint32_t *payload = (uint32_t *)content_object.getPayload()->data();
+ uint32_t productionSeg = *payload;
+ uint32_t nackSegment = content_object.getName().getSuffix();
+
+ if (productionSeg > nackSegment) {
+ // we are asking for stuff produced in the past
+ actualSegment_ = max(productionSeg + 1, actualSegment_);
+
+ while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop();
+ lastSegNacked_ = productionSeg;
+
+ } else if (productionSeg < nackSegment) {
+
+ actualSegment_ = productionSeg + 1;
+ } // equal should not happen
+}
+
void RTCTransportProtocol::onContentObject(
Interest::Ptr &&interest, ContentObject::Ptr &&content_object) {
auto payload = content_object->getPayload();
@@ -543,19 +558,26 @@ void RTCTransportProtocol::onContentObject(
uint32_t pkt = segmentNumber & modMask_;
bool schedule_next_interest = true;
+ ConsumerContentObjectCallback *callback_content_object = nullptr;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT,
+ &callback_content_object);
+ if (*callback_content_object != VOID_HANDLER) {
+ (*callback_content_object)(*socket_, *content_object);
+ }
+
if (payload_size == HICN_NACK_HEADER_SIZE) {
// Nacks always come form the producer, so we set the producerPathLabel_;
producerPathLabel_ = content_object->getPathLabel();
+ schedule_next_interest = checkIfProducerIsActive(*content_object);
if (inflightInterests_[pkt].retransmissions == 0) {
// discard nacks for rtx packets
inflightInterestsCount_--;
- schedule_next_interest = checkIfProducerIsActive(*content_object);
- // if checkIfProducerIsActive returns true, we did all we need to do
+ // if checkIfProducerIsActive returns false, we did all we need to do
// inside that function, no need to call onNack
- if (!schedule_next_interest) onNack(*content_object);
+ if (schedule_next_interest) onNack(*content_object);
updateDelayStats(*content_object);
} else {
- schedule_next_interest = checkIfProducerIsActive(*content_object);
+ if (schedule_next_interest) onNackForRtx(*content_object);
}
} else {
diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h
index 0bb9d9b2e..51a288574 100644
--- a/libtransport/src/hicn/transport/protocols/rtc.h
+++ b/libtransport/src/hicn/transport/protocols/rtc.h
@@ -38,7 +38,7 @@
// controller constant
#define HICN_ROUND_LEN \
200 // ms interval of time on which we take decisions / measurements
-#define HICN_MAX_RTX 128
+#define HICN_MAX_RTX 3
#define HICN_MIN_RTT_WIN 30 // rounds
// cwin
@@ -126,6 +126,8 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
// is not active)
bool checkIfProducerIsActive(const ContentObject &content_object);
void onNack(const ContentObject &content_object);
+ //funtcion used to handle nacks for retransmitted interests
+ void onNackForRtx(const ContentObject &content_object);
void onContentObject(Interest::Ptr &&interest,
ContentObject::Ptr &&content_object) override;
void returnContentToApplication(const ContentObject &content_object);
@@ -158,6 +160,9 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly {
uint32_t inflightInterestsCount_;
std::queue<uint32_t> interestRetransmissions_;
std::vector<sentInterest> inflightInterests_;
+ uint32_t lastSegNacked_; //indicates the last segment id in a past Nack.
+ //we do not ask for retransmissions for samething
+ //that is older than this value.
uint32_t nackedByProducerMaxSize_;
std::set<uint32_t>
nackedByProducer_; // this is used to avoid retransmissions from the
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 425b91bca..d546cfca9 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -100,7 +100,7 @@ class Rate {
std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) {
return std::chrono::microseconds(
- packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_)));
+ (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_));
}
private:
@@ -551,15 +551,15 @@ class HIperfServer {
void sendRTCContentObjectCallback(std::error_code ec) {
if (!ec) {
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
- producer_socket_->produce(payload->data(), payload->length());
rtc_timer_.expires_from_now(
configuration_.production_rate_.getMicrosecondsForPacket(
configuration_.payload_size_));
rtc_timer_.async_wait(
std::bind(&HIperfServer::sendRTCContentObjectCallback, this,
std::placeholders::_1));
+ auto payload =
+ content_objects_[content_objects_index_++ & mask_]->getPayload();
+ producer_socket_->produce(payload->data(), payload->length());
}
}