diff options
-rw-r--r-- | hicn-plugin/src/faces/app/face_prod.c | 40 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc | 15 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.cc | 66 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.h | 7 | ||||
-rw-r--r-- | utils/src/hiperf.cc | 8 |
5 files changed, 90 insertions, 46 deletions
diff --git a/hicn-plugin/src/faces/app/face_prod.c b/hicn-plugin/src/faces/app/face_prod.c index 834b35da9..7e6117b84 100644 --- a/hicn-plugin/src/faces/app/face_prod.c +++ b/hicn-plugin/src/faces/app/face_prod.c @@ -131,7 +131,8 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved, hicn_main_t *hm = &hicn_main; - ip46_address_t app_ip; + ip46_address_t local_app_ip; + ip46_address_t remote_app_ip; u32 if_flags = 0; if (!hm->is_enabled) @@ -195,7 +196,7 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved, */ face->shared.flags &= HICN_FACE_FLAGS_DELETED; hicn_face_prod_t *prod_face = (hicn_face_prod_t *) face->data; - app_ip = prod_face->ip_face.local_addr; + local_app_ip = prod_face->ip_face.local_addr; } } else @@ -207,31 +208,38 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved, * Otherwise retrieve an ip address to assign as a * local ip addr. */ - ip4_address_t app_ip4 = get_ip4_address (); + ip4_address_t local_app_ip4; + ip4_address_t remote_app_ip4; + get_two_ip4_addresses (&local_app_ip4, &remote_app_ip4); ip4_add_del_interface_address (vm, sw_if, - &app_ip4, 32, 0 /* is_del */ ); - app_ip = to_ip46 ( /* isv6 */ 0, app_ip4.as_u8); + &local_app_ip4, 31, 0 /* is_del */ ); + local_app_ip = to_ip46 ( /* isv6 */ 0, local_app_ip4.as_u8); + remote_app_ip = to_ip46 ( /* isv6 */ 0, remote_app_ip4.as_u8); + + ret = + hicn_face_ip_add (&local_app_ip, &remote_app_ip, sw_if, faceid); } else { - ip6_address_t app_ip6 = get_ip6_address (); + ip6_address_t local_app_ip6; + ip6_address_t remote_app_ip6; + get_two_ip6_addresses (&local_app_ip6, &remote_app_ip6); u8 *s0; - s0 = format (0, "Prefix %U", format_ip6_address, &app_ip6); + s0 = format (0, "Prefix %U", format_ip6_address, &local_app_ip6); vlib_cli_output (vm, "Setting ip address %s\n", s0); ip6_add_del_interface_address (vm, sw_if, - &app_ip6, 128, 0 /* is_del */ ); - app_ip = to_ip46 ( /* isv6 */ 1, app_ip6.as_u8); - } + &local_app_ip6, 127, + 0 /* is_del */ ); + local_app_ip = to_ip46 ( /* isv6 */ 1, local_app_ip6.as_u8); + remote_app_ip = to_ip46 ( /* isv6 */ 1, remote_app_ip6.as_u8); - /* - * Special case: the nh_addr in the face is the appif ip - * address - */ - ret = hicn_face_ip_add (&app_ip, &(prefix->name), sw_if, faceid); + ret = + hicn_face_ip_add (&local_app_ip, &remote_app_ip, sw_if, faceid); + } face = hicn_dpoi_get_from_idx (*faceid); @@ -259,7 +267,7 @@ hicn_face_prod_add (hicn_prefix_t * prefix, u32 sw_if, u32 * cs_reserved, ret = hicn_route_add (faceid, 1, &(prefix->name), prefix->len); } - *prod_addr = app_ip; + *prod_addr = local_app_ip; /* Cleanup in case of something went wrong. */ if (ret) diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 67fcc83e3..cea421703 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -145,9 +145,14 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) { payload->append(buffer_size + TIMESTAMP_LEN); content_object.appendPayload(std::move(payload)); - content_object.setLifetime(1000); // XXX this should be set by the APP + content_object.setLifetime(500); // XXX this should be set by the APP content_object.setPathLabel(prodLabel_); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, content_object); + } + portal_->sendContentObject(content_object); currentSeg_++; @@ -178,8 +183,7 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { max_gap = (uint32_t)floor( (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR / - 1000.0) * - (double)packetsProductionRate_.load())); + 1000.0) * (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { sendNack(*interest); @@ -200,6 +204,11 @@ void RTCProducerSocket::sendNack(const Interest &interest) { nack_->setLifetime(0); nack_->setPathLabel(prodLabel_); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *nack_); + } + portal_->sendContentObject(*nack_); } diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 98abbe35b..13c688d00 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -105,6 +105,7 @@ void RTCTransportProtocol::reset() { actualSegment_ = 0; inflightInterestsCount_ = 0; while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + lastSegNacked_ = 0; nackedByProducer_.clear(); nackedByProducerMaxSize_ = 512; @@ -433,8 +434,8 @@ void RTCTransportProtocol::scheduleAppNackRtx(std::vector<uint32_t> &nacks) { continue; } // packetLost_++; - // XXX here I need to avoid the retrasmission for packet that were nacked by - // the network + // XXX here I need to avoid the retrasmission for packet that were + // nacked by the network interestRetransmissions_.push(nacks[i]); } @@ -450,7 +451,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { inflightInterestsCount_--; } - if (inflightInterests_[pkt].retransmissions < HICN_MAX_RTX) { + if (inflightInterests_[pkt].retransmissions < HICN_MAX_RTX && + lastSegNacked_ <= segmentNumber) { interestRetransmissions_.push(segmentNumber); } @@ -466,7 +468,9 @@ bool RTCTransportProtocol::checkIfProducerIsActive( if (productionRate == 0) { // the producer socket is not active // in this case we consider only the first nack - if (nack_timer_used_) return false; + if (nack_timer_used_) { + return false; + } nack_timer_used_ = true; // actualSegment_ should be the one in the nack, which will the next in @@ -482,7 +486,6 @@ bool RTCTransportProtocol::checkIfProducerIsActive( }); return false; } - return true; } @@ -492,6 +495,7 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); + gotNack_ = true; // we synch the estimated production rate with the actual one estimatedBw_ = (double)productionRate; @@ -506,25 +510,18 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { computeMaxWindow(productionRate, 0); increaseWindow(); + while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + lastSegNacked_ = productionSeg; + if (nackedByProducer_.size() >= nackedByProducerMaxSize_) nackedByProducer_.erase(nackedByProducer_.begin()); nackedByProducer_.insert(nackSegment); } else if (productionSeg < nackSegment) { - gotFutureNack_++; // we are asking stuff in the future - // example - // 10 12 13 14 15 16 17 - // ^ ^ ^ - // in prod nack actual - // in this example we sent up to segment 17 and we get a nack for segment 15 - // this means that we will get nack also for 16 17 - // and valid data for 13 14 - // so the next segment to ask is 15, because 13 and 14 will can back anyway - // we go back only in the case that the actual segment is really bigger than - // nack segment, other we do nothing - - actualSegment_ = min(actualSegment_, nackSegment); + gotFutureNack_++; + + actualSegment_ = productionSeg + 1; computeMaxWindow(productionRate, 0); decreaseWindow(); @@ -535,6 +532,24 @@ void RTCTransportProtocol::onNack(const ContentObject &content_object) { } // equal should not happen } +void RTCTransportProtocol::onNackForRtx(const ContentObject &content_object) { + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); + uint32_t productionSeg = *payload; + uint32_t nackSegment = content_object.getName().getSuffix(); + + if (productionSeg > nackSegment) { + // we are asking for stuff produced in the past + actualSegment_ = max(productionSeg + 1, actualSegment_); + + while (interestRetransmissions_.size() != 0) interestRetransmissions_.pop(); + lastSegNacked_ = productionSeg; + + } else if (productionSeg < nackSegment) { + + actualSegment_ = productionSeg + 1; + } // equal should not happen +} + void RTCTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { auto payload = content_object->getPayload(); @@ -543,19 +558,26 @@ void RTCTransportProtocol::onContentObject( uint32_t pkt = segmentNumber & modMask_; bool schedule_next_interest = true; + ConsumerContentObjectCallback *callback_content_object = nullptr; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + &callback_content_object); + if (*callback_content_object != VOID_HANDLER) { + (*callback_content_object)(*socket_, *content_object); + } + if (payload_size == HICN_NACK_HEADER_SIZE) { // Nacks always come form the producer, so we set the producerPathLabel_; producerPathLabel_ = content_object->getPathLabel(); + schedule_next_interest = checkIfProducerIsActive(*content_object); if (inflightInterests_[pkt].retransmissions == 0) { // discard nacks for rtx packets inflightInterestsCount_--; - schedule_next_interest = checkIfProducerIsActive(*content_object); - // if checkIfProducerIsActive returns true, we did all we need to do + // if checkIfProducerIsActive returns false, we did all we need to do // inside that function, no need to call onNack - if (!schedule_next_interest) onNack(*content_object); + if (schedule_next_interest) onNack(*content_object); updateDelayStats(*content_object); } else { - schedule_next_interest = checkIfProducerIsActive(*content_object); + if (schedule_next_interest) onNackForRtx(*content_object); } } else { diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 0bb9d9b2e..51a288574 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -38,7 +38,7 @@ // controller constant #define HICN_ROUND_LEN \ 200 // ms interval of time on which we take decisions / measurements -#define HICN_MAX_RTX 128 +#define HICN_MAX_RTX 3 #define HICN_MIN_RTT_WIN 30 // rounds // cwin @@ -126,6 +126,8 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // is not active) bool checkIfProducerIsActive(const ContentObject &content_object); void onNack(const ContentObject &content_object); + //funtcion used to handle nacks for retransmitted interests + void onNackForRtx(const ContentObject &content_object); void onContentObject(Interest::Ptr &&interest, ContentObject::Ptr &&content_object) override; void returnContentToApplication(const ContentObject &content_object); @@ -158,6 +160,9 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { uint32_t inflightInterestsCount_; std::queue<uint32_t> interestRetransmissions_; std::vector<sentInterest> inflightInterests_; + uint32_t lastSegNacked_; //indicates the last segment id in a past Nack. + //we do not ask for retransmissions for samething + //that is older than this value. uint32_t nackedByProducerMaxSize_; std::set<uint32_t> nackedByProducer_; // this is used to avoid retransmissions from the diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 425b91bca..d546cfca9 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -100,7 +100,7 @@ class Rate { std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { return std::chrono::microseconds( - packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_))); + (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); } private: @@ -551,15 +551,15 @@ class HIperfServer { void sendRTCContentObjectCallback(std::error_code ec) { if (!ec) { - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); - producer_socket_->produce(payload->data(), payload->length()); rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( configuration_.payload_size_)); rtc_timer_.async_wait( std::bind(&HIperfServer::sendRTCContentObjectCallback, this, std::placeholders::_1)); + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + producer_socket_->produce(payload->data(), payload->length()); } } |