diff options
Diffstat (limited to 'utils/src/hiperf.cc')
-rw-r--r-- | utils/src/hiperf.cc | 635 |
1 files changed, 422 insertions, 213 deletions
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index f4764e096..203c2acb9 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -16,19 +16,16 @@ #include <hicn/transport/config.h> #include <hicn/transport/core/content_object.h> #include <hicn/transport/core/interest.h> -#include <hicn/transport/interfaces/rtc_socket_producer.h> +#include <hicn/transport/interfaces/global_conf_interface.h> +#include <hicn/transport/interfaces/p2psecure_socket_consumer.h> +#include <hicn/transport/interfaces/p2psecure_socket_producer.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_producer.h> -#include <hicn/transport/security/identity.h> -#include <hicn/transport/security/signer.h> +#include <hicn/transport/auth/identity.h> +#include <hicn/transport/auth/signer.h> #include <hicn/transport/utils/chrono_typedefs.h> #include <hicn/transport/utils/literals.h> -#ifdef SECURE_HICNTRANSPORT -#include <hicn/transport/interfaces/p2psecure_socket_consumer.h> -#include <hicn/transport/interfaces/p2psecure_socket_producer.h> -#endif - #ifndef _WIN32 #include <hicn/transport/utils/daemonizator.h> #endif @@ -37,6 +34,7 @@ #include <cmath> #include <fstream> #include <iomanip> +#include <sstream> #include <unordered_set> #ifdef __linux__ @@ -50,7 +48,6 @@ #endif namespace transport { - namespace interface { #ifndef ERROR_SUCCESS @@ -59,13 +56,50 @@ namespace interface { #define ERROR_SETUP -5 #define MIN_PROBE_SEQ 0xefffffff +struct packet_t { + uint64_t timestamp; + uint32_t size; +}; + +inline uint64_t _ntohll(const uint64_t *input) { + uint64_t return_val; + uint8_t *tmp = (uint8_t *)&return_val; + + tmp[0] = *input >> 56; + tmp[1] = *input >> 48; + tmp[2] = *input >> 40; + tmp[3] = *input >> 32; + tmp[4] = *input >> 24; + tmp[5] = *input >> 16; + tmp[6] = *input >> 8; + tmp[7] = *input >> 0; + + return return_val; +} + +inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } + +struct nack_packet_t { + uint64_t timestamp; + uint32_t prod_rate; + uint32_t prod_seg; + + inline uint64_t getTimestamp() const { return _ntohll(×tamp); } + inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } + + inline uint32_t getProductionRate() const { return ntohl(prod_rate); } + inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } + + inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } + inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } +}; + /** * Container for command line configuration for hiperf client. */ struct ClientConfiguration { ClientConfiguration() : name("b001::abcd", 0), - verify(false), beta(-1.f), drop_factor(-1.f), window(-1), @@ -78,15 +112,11 @@ struct ClientConfiguration { transport_protocol_(CBR), rtc_(false), test_mode_(false), -#ifdef SECURE_HICNTRANSPORT secure_(false), -#endif producer_prefix_(), - interest_lifetime_(500) { - } + interest_lifetime_(500) {} Name name; - bool verify; double beta; double drop_factor; double window; @@ -99,9 +129,7 @@ struct ClientConfiguration { TransportProtocolAlgorithms transport_protocol_; bool rtc_; bool test_mode_; -#ifdef SECURE_HICNTRANSPORT bool secure_; -#endif Prefix producer_prefix_; uint32_t interest_lifetime_; }; @@ -156,21 +184,19 @@ struct ServerConfiguration { sign(false), content_lifetime(600000000_U32), download_size(20 * 1024 * 1024), - hash_algorithm(utils::CryptoHashType::SHA_256), + hash_algorithm(auth::CryptoHashType::SHA_256), keystore_name(""), passphrase(""), keystore_password("cisco"), multiphase_produce_(false), rtc_(false), interactive_(false), + trace_based_(false), + trace_index_(0), + trace_file_(nullptr), production_rate_(std::string("2048kbps")), - payload_size_(1400) -#ifdef SECURE_HICNTRANSPORT - , - secure_(false) -#endif - { - } + payload_size_(1400), + secure_(false) {} Prefix name; bool virtual_producer; @@ -179,18 +205,20 @@ struct ServerConfiguration { bool sign; std::uint32_t content_lifetime; std::uint32_t download_size; - utils::CryptoHashType hash_algorithm; + auth::CryptoHashType hash_algorithm; std::string keystore_name; std::string passphrase; std::string keystore_password; bool multiphase_produce_; bool rtc_; bool interactive_; + bool trace_based_; + std::uint32_t trace_index_; + char *trace_file_; Rate production_rate_; std::size_t payload_size_; -#ifdef SECURE_HICNTRANSPORT bool secure_; -#endif + std::vector<struct packet_t> trace_; }; /** @@ -217,18 +245,26 @@ class HIperfClient { : configuration_(conf), total_duration_milliseconds_(0), old_bytes_value_(0), + old_interest_tx_value_(0), + old_fec_interest_tx_value_(0), + old_fec_data_rx_value_(0), + old_lost_data_value_(0), + old_bytes_recovered_value_(0), + old_retx_value_(0), + old_sent_int_value_(0), + old_received_nacks_value_(0), + avg_data_delay_(0), + delay_sample_(0), + received_bytes_(0), + received_data_pkt_(0), signals_(io_service_), expected_seg_(0), lost_packets_(std::unordered_set<uint32_t>()), - rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr), - callback_(configuration_.rtc_ ? nullptr : new Callback(*this)), - key_callback_(configuration_.rtc_ ? nullptr : new KeyCallback(*this)) {} - - ~HIperfClient() { - delete callback_; - delete key_callback_; - delete rtc_callback_; - } + rtc_callback_(*this), + callback_(*this), + key_callback_(*this) {} + + ~HIperfClient() {} void checkReceivedRtcContent(ConsumerSocket &c, const ContentObject &contentObject) { @@ -237,11 +273,15 @@ class HIperfClient { uint32_t receivedSeg = contentObject.getName().getSuffix(); auto payload = contentObject.getPayload(); - if ((uint32_t)payload->length() == 8) { // 8 is the size of the NACK - // payload - uint32_t *payloadPtr = (uint32_t *)payload->data(); - uint32_t productionSeg = *(payloadPtr); - uint32_t productionRate = *(++payloadPtr); + if ((uint32_t)payload->length() == 16) { // 16 is the size of the NACK + struct nack_packet_t *nack_pkt = + (struct nack_packet_t *)contentObject.getPayload()->data(); + uint32_t productionSeg = nack_pkt->getProductionSegement(); + uint32_t productionRate = nack_pkt->getProductionRate(); + + // uint32_t *payloadPtr = (uint32_t *)payload->data(); + // uint32_t productionSeg = *(payloadPtr); + // uint32_t productionRate = *(++payloadPtr); if (productionRate == 0) { std::cout << "[STOP] producer is not producing content" << std::endl; @@ -254,7 +294,7 @@ class HIperfClient { << std::endl; expected_seg_ = productionSeg; } else if (receivedSeg > productionSeg && receivedSeg < MIN_PROBE_SEQ) { - std::cout << "[WINDOW TO LARGE] received NACK for " << receivedSeg + std::cout << "[WINDOW TOO LARGE] received NACK for " << receivedSeg << ". Next expected packet " << productionSeg << std::endl; } else if (receivedSeg >= MIN_PROBE_SEQ) { std::cout << "[PROBE] probe number = " << receivedSeg << std::endl; @@ -262,7 +302,24 @@ class HIperfClient { return; } - if (receivedSeg > expected_seg_) { + received_bytes_ += (payload->length() - 12); + received_data_pkt_++; + + // collecting delay stats. Just for performance testing + // XXX we should probably get the transport header (12) somewhere + uint64_t *senderTimeStamp = (uint64_t *)(payload->data() + 12); + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + double new_delay = (double)(now - *senderTimeStamp); + if (*senderTimeStamp > now) + new_delay = -1 * (double)(*senderTimeStamp - now); + + delay_sample_++; + avg_data_delay_ = + avg_data_delay_ + (new_delay - avg_data_delay_) / delay_sample_; + + if (receivedSeg > expected_seg_ && expected_seg_ != 0) { for (uint32_t i = expected_seg_; i < receivedSeg; i++) { std::cout << "[LOSS] lost packet " << i << std::endl; lost_packets_.insert(i); @@ -283,24 +340,12 @@ class HIperfClient { expected_seg_ = receivedSeg + 1; } - bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { - if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { - std::cout << "VERIFY CONTENT" << std::endl; - } else if (contentObject.getPayloadType() == PayloadType::MANIFEST) { - std::cout << "VERIFY MANIFEST" << std::endl; - } - - return true; - } - void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} void handleTimerExpiration(ConsumerSocket &c, const TransportStatistics &stats) { - if (configuration_.rtc_) return; - const char separator = ' '; - const int width = 20; + const int width = 15; utils::TimePoint t2 = utils::SteadyClock::now(); auto exact_duration = @@ -323,28 +368,125 @@ class HIperfClient { std::stringstream window; window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Interest]"; + << "[Int]"; std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[us]"; - - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; + avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; + if (configuration_.rtc_) { + // we get rtc stats more often, thus we need ms in the interval + std::stringstream interval_ms; + interval_ms << total_duration_milliseconds_ << "-" + << total_duration_milliseconds_ + exact_duration.count(); + + std::stringstream lost_data; + lost_data << stats.getLostData() - old_lost_data_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream bytes_recovered_data; + bytes_recovered_data << stats.getBytesRecoveredData() - + old_bytes_recovered_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream data_delay; + data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; + + std::stringstream received_data_pkt; + received_data_pkt << received_data_pkt_ << std::setfill(separator) + << "[pkt]"; + + std::stringstream goodput; + goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 + << std::setfill(separator) << "[Mbps]"; + + std::stringstream loss_rate; + loss_rate << std::fixed << std::setprecision(2) + << stats.getLossRatio() * 100.0 << std::setfill(separator) + << "[%]"; + + std::stringstream retx_sent; + retx_sent << stats.getRetxCount() - old_retx_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream interest_sent; + interest_sent << stats.getInterestTx() - old_sent_int_value_ + << std::setfill(separator) << "[pkt]"; + + std::stringstream nacks; + nacks << stats.getReceivedNacks() - old_received_nacks_value_ + << std::setfill(separator) << "[pkt]"; + + // statistics not yet available in the transport + // std::stringstream interest_fec_tx; + // interest_fec_tx << stats.getInterestFecTxCount() - + // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; + // std::stringstream bytes_fec_recv; + // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ + // << std::setfill(separator) << "[bytes]"; + std::cout << std::left << std::setw(width) << "Interval"; + std::cout << std::left << std::setw(width) << "RecvData"; + std::cout << std::left << std::setw(width) << "Bandwidth"; + std::cout << std::left << std::setw(width) << "Goodput"; + std::cout << std::left << std::setw(width) << "LossRate"; + std::cout << std::left << std::setw(width) << "Retr"; + std::cout << std::left << std::setw(width) << "InterestSent"; + std::cout << std::left << std::setw(width) << "ReceivedNacks"; + std::cout << std::left << std::setw(width) << "SyncWnd"; + std::cout << std::left << std::setw(width) << "MinRtt"; + std::cout << std::left << std::setw(width) << "LostData"; + std::cout << std::left << std::setw(width) << "RecoveredData"; + std::cout << std::left << std::setw(width) << "State"; + std::cout << std::left << std::setw(width) << "DataDelay" << std::endl; + + std::cout << std::left << std::setw(width) << interval_ms.str(); + std::cout << std::left << std::setw(width) << received_data_pkt.str(); + std::cout << std::left << std::setw(width) << bandwidth.str(); + std::cout << std::left << std::setw(width) << goodput.str(); + std::cout << std::left << std::setw(width) << loss_rate.str(); + std::cout << std::left << std::setw(width) << retx_sent.str(); + std::cout << std::left << std::setw(width) << interest_sent.str(); + std::cout << std::left << std::setw(width) << nacks.str(); + std::cout << std::left << std::setw(width) << window.str(); + std::cout << std::left << std::setw(width) << avg_rtt.str(); + std::cout << std::left << std::setw(width) << lost_data.str(); + std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); + std::cout << std::left << std::setw(width) << stats.getCCStatus(); + std::cout << std::left << std::setw(width) << data_delay.str(); + std::cout << std::endl; + + // statistics not yet available in the transport + // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); + // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); + } else { + std::cout << std::left << std::setw(width) << "Interval"; + std::cout << std::left << std::setw(width) << "Transfer"; + std::cout << std::left << std::setw(width) << "Bandwidth"; + std::cout << std::left << std::setw(width) << "Retr"; + std::cout << std::left << std::setw(width) << "Cwnd"; + std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; + + std::cout << std::left << std::setw(width) << interval.str(); + std::cout << std::left << std::setw(width) << bytes_transferred.str(); + std::cout << std::left << std::setw(width) << bandwidth.str(); + std::cout << std::left << std::setw(width) << stats.getRetxCount(); + std::cout << std::left << std::setw(width) << window.str(); + std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; + std::cout << std::endl; + } total_duration_milliseconds_ += (uint32_t)exact_duration.count(); old_bytes_value_ = stats.getBytesRecv(); + old_lost_data_value_ = stats.getLostData(); + old_bytes_recovered_value_ = stats.getBytesRecoveredData(); + old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); + old_fec_data_rx_value_ = stats.getBytesFecRecv(); + old_retx_value_ = stats.getRetxCount(); + old_sent_int_value_ = stats.getInterestTx(); + old_received_nacks_value_ = stats.getReceivedNacks(); + delay_sample_ = 0; + avg_data_delay_ = 0; + received_bytes_ = 0; + received_data_pkt_ = 0; + t_stats_ = utils::SteadyClock::now(); } @@ -359,7 +501,6 @@ class HIperfClient { configuration_.transport_protocol_ = CBR; } -#ifdef SECURE_HICNTRANSPORT if (configuration_.secure_) { consumer_socket_ = std::make_shared<P2PSecureConsumerSocket>( RAAQM, configuration_.transport_protocol_); @@ -373,12 +514,9 @@ class HIperfClient { secure_consumer_socket.registerPrefix(configuration_.producer_prefix_); } } else { -#endif consumer_socket_ = std::make_shared<ConsumerSocket>(configuration_.transport_protocol_); -#ifdef SECURE_HICNTRANSPORT } -#endif consumer_socket_->setSocketOption( GeneralTransportOptions::INTEREST_LIFETIME, @@ -421,34 +559,21 @@ class HIperfClient { } } - if (configuration_.verify) { - std::shared_ptr<utils::Verifier> verifier = - std::make_shared<utils::Verifier>(); - PARCKeyId *key_id_; - - if (!configuration_.producer_certificate.empty()) { - key_id_ = verifier->addKeyFromCertificate( - configuration_.producer_certificate); - if (key_id_ == nullptr) return ERROR_SETUP; - } - - if (!configuration_.passphrase.empty()) { - key_id_ = verifier->addKeyFromPassphrase( - configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256); - if (key_id_ == nullptr) return ERROR_SETUP; - } - + if (!configuration_.producer_certificate.empty()) { + std::shared_ptr<auth::Verifier> verifier = + std::make_shared<auth::AsymmetricVerifier>( + configuration_.producer_certificate); if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == - SOCKET_OPTION_NOT_SET) { + verifier) == SOCKET_OPTION_NOT_SET) return ERROR_SETUP; - } } - if (consumer_socket_->setSocketOption( - GeneralTransportOptions::VERIFY_SIGNATURE, configuration_.verify) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; + if (!configuration_.passphrase.empty()) { + std::shared_ptr<auth::Verifier> verifier = + std::make_shared<auth::SymmetricVerifier>(configuration_.passphrase); + if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, + verifier) == SOCKET_OPTION_NOT_SET) + return ERROR_SETUP; } ret = consumer_socket_->setSocketOption( @@ -462,16 +587,11 @@ class HIperfClient { } if (!configuration_.rtc_) { - /* key_callback_->setConsumer(consumer_socket_); */ - /* consumer_socket_->setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, - * key_callback_); */ - /* consumer_socket_->setSocketOption(GeneralTransportOptions::KEY_CONTENT, - * true); */ ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, callback_); + ConsumerCallbacksOptions::READ_CALLBACK, &callback_); } else { ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, rtc_callback_); + ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_); } if (ret == SOCKET_OPTION_NOT_SET) { @@ -489,6 +609,13 @@ class HIperfClient { } } + if (configuration_.rtc_) { + std::shared_ptr<TransportStatistics> transport_stats; + consumer_socket_->getSocketOption( + OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); + transport_stats->setAlpha(0.0); + } + ret = consumer_socket_->setSocketOption( ConsumerCallbacksOptions::STATS_SUMMARY, (ConsumerTimerCallback)std::bind(&HIperfClient::handleTimerExpiration, @@ -515,15 +642,15 @@ class HIperfClient { std::cout << "Starting download of " << configuration_.name << std::endl; signals_.add(SIGINT); - signals_.async_wait([this](const std::error_code &, const int &) { - consumer_socket_->stop(); - io_service_.stop(); - }); + signals_.async_wait( + [this](const std::error_code &, const int &) { io_service_.stop(); }); t_download_ = t_stats_ = std::chrono::steady_clock::now(); consumer_socket_->asyncConsume(configuration_.name); io_service_.run(); + consumer_socket_->stop(); + return ERROR_SUCCESS; } @@ -648,36 +775,6 @@ class HIperfClient { std::cout << "Key size: " << total_size << " bytes" << std::endl; } - void readKey() { - std::shared_ptr<utils::Verifier> verifier = - std::make_shared<utils::Verifier>(); - verifier->addKeyFromPassphrase(*key_, utils::CryptoSuite::HMAC_SHA256); - - if (consumer_socket_) { - consumer_socket_->setSocketOption(GeneralTransportOptions::KEY_CONTENT, - false); - consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier); - } else { - std::cout << "Consumer socket not set" << std::endl; - return; - } - - if (validateKey()) { - std::cout << "Key has been authenticated" << std::endl; - } else { - std::cout << "Key could not be authenticated" << std::endl; - return; - } - - if (consumer_socket_->verifyKeyPackets()) { - std::cout << "Signatures of key packets are valid" << std::endl; - } else { - std::cout << "Signatures of key packets are not valid" << std::endl; - return; - } - } - void setConsumer(std::shared_ptr<ConsumerSocket> consumer_socket) { consumer_socket_ = consumer_socket; } @@ -693,14 +790,31 @@ class HIperfClient { Time t_download_; uint32_t total_duration_milliseconds_; uint64_t old_bytes_value_; + uint64_t old_interest_tx_value_; + uint64_t old_fec_interest_tx_value_; + uint64_t old_fec_data_rx_value_; + uint64_t old_lost_data_value_; + uint64_t old_bytes_recovered_value_; + uint32_t old_retx_value_; + uint32_t old_sent_int_value_; + uint32_t old_received_nacks_value_; + + // IMPORTANT: to be used only for performance testing, when consumer and + // producer are synchronized. Used for rtc only at the moment + double avg_data_delay_; + uint32_t delay_sample_; + + uint32_t received_bytes_; + uint32_t received_data_pkt_; + asio::io_service io_service_; asio::signal_set signals_; - std::shared_ptr<ConsumerSocket> consumer_socket_; uint32_t expected_seg_; std::unordered_set<uint32_t> lost_packets_; - RTCCallback *rtc_callback_; - Callback *callback_; - KeyCallback *key_callback_; + RTCCallback rtc_callback_; + Callback callback_; + KeyCallback key_callback_; + std::shared_ptr<ConsumerSocket> consumer_socket_; }; // namespace interface /** @@ -723,11 +837,11 @@ class HIperfServer { #ifndef _WIN32 ptr_last_segment_(&last_segment_), input_(io_service_), - rtc_running_(false) + rtc_running_(false), #else - ptr_last_segment_(&last_segment_) + ptr_last_segment_(&last_segment_), #endif - { + flow_name_(configuration_.name.getName()) { std::string buffer(configuration_.payload_size_, 'X'); std::cout << "Producing contents under name " << conf.name.getName() << std::endl; @@ -738,9 +852,9 @@ class HIperfServer { #endif for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = std::make_shared<ContentObject>( - conf.name.getName(), HF_INET6_TCP, (const uint8_t *)buffer.data(), - buffer.size()); + content_objects_[i] = ContentObject::Ptr( + new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, + (const uint8_t *)buffer.data(), buffer.size())); content_objects_[i]->setLifetime( default_values::content_object_expiry_time); } @@ -799,15 +913,16 @@ class HIperfServer { produceContentAsync(p, interest.getName(), suffix); } - void produceContent(ProducerSocket &p, Name content_name, uint32_t suffix) { + void produceContent(ProducerSocket &p, const Name &content_name, + uint32_t suffix) { auto b = utils::MemBuf::create(configuration_.download_size); std::memset(b->writableData(), '?', configuration_.download_size); b->append(configuration_.download_size); uint32_t total; utils::TimePoint t0 = utils::SteadyClock::now(); - total = p.produce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix); + total = p.produceStream(content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); utils::TimePoint t1 = utils::SteadyClock::now(); std::cout @@ -822,11 +937,6 @@ class HIperfServer { auto b = utils::MemBuf::create(configuration_.download_size); std::memset(b->writableData(), '?', configuration_.download_size); b->append(configuration_.download_size); - /* std::string passphrase = "hunter2"; */ - /* auto b = utils::MemBuf::create(passphrase.length() + 1); */ - /* std::memcpy(b->writableData(), passphrase.c_str(), passphrase.length() + - * 1); */ - /* b->append(passphrase.length() + 1); */ p.asyncProduce(content_name, std::move(b), !configuration_.multiphase_produce_, suffix, @@ -845,23 +955,22 @@ class HIperfServer { std::placeholders::_1, std::placeholders::_2)); } - std::shared_ptr<utils::Identity> getProducerIdentity( - std::string &keystore_name, std::string &keystore_password, - utils::CryptoHashType &hash_algorithm) { - if (access(keystore_name.c_str(), F_OK) != -1) { - return std::make_shared<utils::Identity>(keystore_name, keystore_password, - hash_algorithm); - } else { - return std::make_shared<utils::Identity>(keystore_name, keystore_password, - utils::CryptoSuite::RSA_SHA256, - 1024, 365, "producer-test"); + std::shared_ptr<auth::Identity> getProducerIdentity( + std::string &keystore_path, std::string &keystore_pwd, + auth::CryptoHashType &hash_type) { + if (access(keystore_path.c_str(), F_OK) != -1) { + return std::make_shared<auth::Identity>(keystore_path, keystore_pwd, + hash_type); } + return std::make_shared<auth::Identity>(keystore_path, keystore_pwd, + auth::CryptoSuite::RSA_SHA256, 1024, + 365, "producer-test"); } int setup() { int ret; + int production_protocol; -#ifdef SECURE_HICNTRANSPORT if (configuration_.secure_) { auto identity = getProducerIdentity(configuration_.keystore_name, configuration_.keystore_password, @@ -869,22 +978,21 @@ class HIperfServer { producer_socket_ = std::make_unique<P2PSecureProducerSocket>( configuration_.rtc_, identity); } else { -#endif - if (configuration_.rtc_) { - producer_socket_ = std::make_unique<RTCProducerSocket>(); + if (!configuration_.rtc_) { + production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; } else { - producer_socket_ = std::make_unique<ProducerSocket>(); + production_protocol = ProductionProtocolAlgorithms::RTC_PROD; } -#ifdef SECURE_HICNTRANSPORT + + producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); } -#endif if (configuration_.sign) { - std::shared_ptr<utils::Signer> signer; + std::shared_ptr<auth::Signer> signer; if (!configuration_.passphrase.empty()) { - signer = std::make_shared<utils::Signer>( - configuration_.passphrase, utils::CryptoSuite::HMAC_SHA256); + signer = std::make_shared<auth::SymmetricSigner>( + auth::CryptoSuite::HMAC_SHA256, configuration_.passphrase); } else if (!configuration_.keystore_name.empty()) { auto identity = getProducerIdentity(configuration_.keystore_name, configuration_.keystore_password, @@ -901,8 +1009,7 @@ class HIperfServer { } uint32_t rtc_header_size = 0; - if(configuration_.rtc_) - rtc_header_size = 8; + if (configuration_.rtc_) rtc_header_size = 12; producer_socket_->setSocketOption( GeneralTransportOptions::DATA_PACKET_SIZE, (uint32_t)( @@ -987,8 +1094,61 @@ class HIperfServer { this, std::placeholders::_1)); auto payload = content_objects_[content_objects_index_++ & mask_]->getPayload(); - producer_socket_->produce( - payload->data(), payload->length() < 1400 ? payload->length() : 1400); + + // this is used to compute the data packet delay + // Used only for performance evaluation + // It requires clock synchronization between producer and consumer + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); + + producer_socket_->produceDatagram( + flow_name_, payload->data(), + payload->length() < 1400 ? payload->length() : 1400); + } + + void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { + if (ec) return; + + auto payload = + content_objects_[content_objects_index_++ & mask_]->getPayload(); + + uint32_t packet_len = + configuration_.trace_[configuration_.trace_index_].size; + + // this is used to compute the data packet delay + // used only for performance evaluation + // it requires clock synchronization between producer and consumer + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); + + if (packet_len > payload->length()) packet_len = payload->length(); + if (packet_len > 1400) packet_len = 1400; + + producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); + + uint32_t next_index = configuration_.trace_index_ + 1; + uint64_t schedule_next; + if (next_index < configuration_.trace_.size()) { + schedule_next = + configuration_.trace_[next_index].timestamp - + configuration_.trace_[configuration_.trace_index_].timestamp; + } else { + // here we need to loop, schedule in a random time + schedule_next = 1000; + } + + configuration_.trace_index_ = + (configuration_.trace_index_ + 1) % configuration_.trace_.size(); + rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, this, + std::placeholders::_1)); } #ifndef _WIN32 @@ -1021,6 +1181,21 @@ class HIperfServer { } #endif + int parseTraceFile() { + std::ifstream trace(configuration_.trace_file_); + if (trace.fail()) { + return -1; + } + std::string line; + while (std::getline(trace, line)) { + std::istringstream iss(line); + struct packet_t packet; + iss >> packet.timestamp >> packet.size; + configuration_.trace_.push_back(packet); + } + return 0; + } + int run() { std::cerr << "Starting to serve consumers" << std::endl; @@ -1038,6 +1213,21 @@ class HIperfServer { input_, input_buffer_, '\n', std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, std::placeholders::_2)); + } else if (configuration_.trace_based_) { + std::cout << "trace-based mode enabled" << std::endl; + if (configuration_.trace_file_ == nullptr) { + std::cout << "cannot find the trace file" << std::endl; + return ERROR_SETUP; + } + if (parseTraceFile() < 0) { + std::cout << "cannot parse the trace file" << std::endl; + return ERROR_SETUP; + } + rtc_running_ = true; + rtc_timer_.expires_from_now(std::chrono::milliseconds(1)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, + this, std::placeholders::_1)); } else { rtc_running_ = true; rtc_timer_.expires_from_now( @@ -1078,6 +1268,7 @@ class HIperfServer { asio::posix::stream_descriptor input_; asio::streambuf input_buffer_; bool rtc_running_; + core::Name flow_name_; #endif }; // namespace interface @@ -1095,6 +1286,8 @@ void usage() { << "Run RTC protocol (client or server)" << std::endl; std::cerr << "-f\t<filename>\t\t\t" << "Log file" << std::endl; + std::cerr << "-z\t<io_module>\t\t\t" + << "IO module to use. Default: hicnlight_module" << std::endl; #endif std::cerr << std::endl; std::cerr << "SERVER SPECIFIC:" << std::endl; @@ -1140,14 +1333,19 @@ void usage() { "Interactive mode, start/stop real time content production " "by pressing return. To be used with the -R option" << std::endl; -#ifdef SECURE_HICNTRANSPORT + std::cerr + << "-T\t<filename>\t\t\t" + "Trace based mode, hiperf takes as input a file with a trace. " + "Each line of the file indicates the timestamp and the size of " + "the packet to generate. To be used with the -R option. -B and -I " + "will be ignored." + << std::endl; std::cerr << "-E\t\t\t\t\t" << "Enable encrypted communication. Requires the path to a p12 " "file containing the " "crypto material used for the TLS handshake" << std::endl; #endif -#endif std::cerr << std::endl; std::cerr << "CLIENT SPECIFIC:" << std::endl; std::cerr << "-b\t<beta_parameter>\t\t" @@ -1169,26 +1367,21 @@ void usage() { std::cerr << "-i\t<stats_interval>\t\t" << "Show the statistics every <stats_interval> milliseconds." << std::endl; - std::cerr << "-v\t\t\t\t\t" - << "Enable verification of received data" << std::endl; std::cerr << "-c\t<certificate_path>\t\t" << "Path of the producer certificate to be used for verifying the " - "origin of the packets received. Must be used with -v." + "origin of the packets received." << std::endl; std::cerr << "-k\t<passphrase>\t\t\t" << "String from which is derived the symmetric key used by the " - "producer to sign packets and by the consumer to verify them. " - "Must be used with -v." + "producer to sign packets and by the consumer to verify them." << std::endl; std::cerr << "-t\t\t\t\t\t" "Test mode, check if the client is receiving the " "correct data. This is an RTC specific option, to be " "used with the -R (default false)" << std::endl; -#ifdef SECURE_HICNTRANSPORT std::cerr << "-P\t\t\t\t\t" << "Prefix of the producer where to do the handshake" << std::endl; -#endif } int main(int argc, char *argv[]) { @@ -1205,6 +1398,9 @@ int main(int argc, char *argv[]) { int options = 0; char *log_file = nullptr; + interface::global_config::IoModuleConfiguration config; + std::string conf_file; + config.name = "hicnlight_module"; // Consumer ClientConfiguration client_configuration; @@ -1214,9 +1410,9 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, - "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:")) != - -1) { + while ((opt = getopt( + argc, argv, + "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:")) != -1) { switch (opt) { // Common case 'D': { @@ -1225,11 +1421,19 @@ int main(int argc, char *argv[]) { } case 'I': { server_configuration.interactive_ = true; + server_configuration.trace_based_ = false; + break; + } + case 'T': { + server_configuration.interactive_ = false; + server_configuration.trace_based_ = true; + server_configuration.trace_file_ = optarg; break; } #else while ((opt = getopt(argc, argv, - "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:")) != -1) { + "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:")) != + -1) { switch (opt) { #endif case 'f': { @@ -1241,6 +1445,14 @@ int main(int argc, char *argv[]) { server_configuration.rtc_ = true; break; } + case 'z': { + config.name = optarg; + break; + } + case 'F': { + conf_file = optarg; + break; + } // Server or Client case 'S': { @@ -1255,7 +1467,6 @@ int main(int argc, char *argv[]) { server_configuration.passphrase = std::string(optarg); client_configuration.passphrase = std::string(optarg); server_configuration.sign = true; - options = -1; break; } @@ -1280,23 +1491,16 @@ int main(int argc, char *argv[]) { options = 1; break; } -#ifdef SECURE_HICNTRANSPORT case 'P': { client_configuration.producer_prefix_ = Prefix(optarg); client_configuration.secure_ = true; break; } -#endif case 'c': { client_configuration.producer_certificate = std::string(optarg); options = 1; break; } - case 'v': { - client_configuration.verify = true; - options = 1; - break; - } case 'i': { client_configuration.report_interval_milliseconds_ = std::stoul(optarg); options = 1; @@ -1346,11 +1550,11 @@ int main(int argc, char *argv[]) { } case 'y': { if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::SHA_256; + server_configuration.hash_algorithm = auth::CryptoHashType::SHA_256; } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::SHA_512; + server_configuration.hash_algorithm = auth::CryptoHashType::SHA_512; } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = utils::CryptoHashType::CRC32C; + server_configuration.hash_algorithm = auth::CryptoHashType::CRC32C; } else { std::cerr << "Ignored unknown hash algorithm. Using SHA 256." << std::endl; @@ -1375,13 +1579,11 @@ int main(int argc, char *argv[]) { options = -1; break; } -#ifdef SECURE_HICNTRANSPORT case 'E': { server_configuration.keystore_name = std::string(optarg); server_configuration.secure_ = true; break; } -#endif case 'h': default: usage(); @@ -1395,7 +1597,6 @@ int main(int argc, char *argv[]) { << std::endl; usage(); return EXIT_FAILURE; - } else if (options < 0 && role > 0) { std::cerr << "Server options cannot be used when using the " "software in client mode" @@ -1443,6 +1644,14 @@ int main(int argc, char *argv[]) { } #endif + /** + * IO module configuration + */ + config.set(); + + // Parse config file + transport::interface::global_config::parseConfigurationFile(conf_file); + if (role > 0) { HIperfClient c(client_configuration); if (c.setup() != ERROR_SETUP) { @@ -1461,11 +1670,11 @@ int main(int argc, char *argv[]) { #ifdef _WIN32 WSACleanup(); #endif + return 0; } } // end namespace interface - } // end namespace transport int main(int argc, char *argv[]) { |