From a51b4a6dc22dbd8d14f1d50d6c74986b8cb7ffca Mon Sep 17 00:00:00 2001 From: michele papalini Date: Mon, 15 Apr 2019 10:46:56 +0200 Subject: [HICN-163] check inside hiperf if the client receives all the packets in case of RTC Change-Id: Ie418b4cdba3e4e17cfe302c0e79e9d786e40cbd4 Signed-off-by: michele papalini --- utils/src/hiperf.cc | 93 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 86 insertions(+), 7 deletions(-) diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index d546cfca9..7594d1f94 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -21,6 +21,7 @@ #endif #include +#include #include #include @@ -56,7 +57,8 @@ struct ClientConfiguration { receive_buffer(std::make_shared>()), download_size(0), report_interval_milliseconds_(1000), - rtc_(false) {} + rtc_(false), + test_mode_(false) {} Name name; bool verify; @@ -70,6 +72,7 @@ struct ClientConfiguration { std::uint32_t report_interval_milliseconds_; TransportProtocolAlgorithms transport_protocol_; bool rtc_; + bool test_mode_; }; class Rate { @@ -153,7 +156,9 @@ class HIperfClient { : configuration_(conf), total_duration_milliseconds_(0), old_bytes_value_(0), - signals_(io_service_, SIGINT) {} + signals_(io_service_, SIGINT), + expected_seg_(0), + lost_packets_(std::unordered_set()) {} void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, const std::error_code &ec) { @@ -177,6 +182,57 @@ class HIperfClient { configuration_.receive_buffer->clear(); } + void checkReceivedRtcContent(ConsumerSocket &c, + const ContentObject &contentObject) { + if(!configuration_.test_mode_) + return; + + uint32_t receivedSeg = contentObject.getName().getSuffix(); + auto payload = contentObject.getPayload(); + + if((uint32_t)payload->length() == 8){ //8 is the size of the NACK payload + uint32_t *payloadPtr = (uint32_t *)payload->data(); + uint32_t productionSeg = *(payloadPtr); + uint32_t productionRate = *(++payloadPtr); + + if(productionRate == 0){ + std::cout << "[STOP] producer is not producing content" + << std::endl; + return; + } + + if(receivedSeg < productionSeg){ + std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg << + ". Next expected packet " << productionSeg + 1 << std::endl; + expected_seg_ = productionSeg; + } else if(receivedSeg > productionSeg){ + std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg << + ". Next expected packet " << productionSeg << std::endl; + } + return; + } + + if(receivedSeg > expected_seg_){ + for(uint32_t i = expected_seg_; i < receivedSeg; i++){ + std::cout << "[LOSS] lost packet " << i << std::endl; + lost_packets_.insert(i); + } + expected_seg_ = receivedSeg + 1; + return; + }else if (receivedSeg < expected_seg_){ + auto it = lost_packets_.find(receivedSeg); + if(it != lost_packets_.end()){ + std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl; + lost_packets_.erase(it); + }else{ + std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " << + expected_seg_ << std::endl; + } + return; + } + expected_seg_ = receivedSeg + 1; + } + bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { std::cout << "VERIFY CONTENT" << std::endl; @@ -339,6 +395,17 @@ class HIperfClient { return ERROR_SETUP; } + if(configuration_.rtc_){ + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, + (ConsumerContentObjectCallback)std::bind( + &HIperfClient::checkReceivedRtcContent, this, + std::placeholders::_1, std::placeholders::_2)); + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::STATS_SUMMARY, (ConsumerTimerCallback)std::bind(&HIperfClient::handleTimerExpiration, @@ -386,6 +453,8 @@ class HIperfClient { asio::io_service io_service_; asio::signal_set signals_; std::unique_ptr consumer_socket_; + uint32_t expected_seg_; + std::unordered_set lost_packets_; }; class HIperfServer { @@ -489,8 +558,8 @@ class HIperfServer { producer_socket_->connect(); if (configuration_.rtc_) { - std::cout << "Running RTC producer: all other options (with the " - "exception of the bitrate) will be ignored." + std::cout << "Running RTC producer: the prefix length will be ignored." + " Use /128 by default in RTC mode" << std::endl; return ERROR_SUCCESS; } @@ -726,6 +795,12 @@ void usage() { std::cout << "-v = Enable verification of received data" << std::endl; + std::cout + << "-t = Test mode, check if the client is " + "receiving the correct data. This is an RTC specific option, to be " + "used with the -R (default false)" + << std::endl; + } int main(int argc, char *argv[]) { @@ -751,7 +826,7 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:I")) != + while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:It")) != -1) { switch (opt) { // Common @@ -764,7 +839,7 @@ int main(int argc, char *argv[]) { break; } #else - while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:t")) != -1) { switch (opt) { #endif @@ -824,7 +899,11 @@ int main(int argc, char *argv[]) { options = 1; break; } - + case 't':{ + client_configuration.test_mode_ = true; + options = 1; + break; + } // Server specific case 'A': { server_configuration.download_size = std::stoul(optarg); -- cgit 1.2.3-korg