diff options
Diffstat (limited to 'utils/src/hiperf.cc')
-rw-r--r-- | utils/src/hiperf.cc | 1683 |
1 files changed, 0 insertions, 1683 deletions
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc deleted file mode 100644 index 9a1cf6236..000000000 --- a/utils/src/hiperf.cc +++ /dev/null @@ -1,1683 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/config.h> -#include <hicn/transport/core/content_object.h> -#include <hicn/transport/core/interest.h> -#include <hicn/transport/interfaces/global_conf_interface.h> -#include <hicn/transport/interfaces/p2psecure_socket_consumer.h> -#include <hicn/transport/interfaces/p2psecure_socket_producer.h> -#include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/interfaces/socket_producer.h> -#include <hicn/transport/auth/identity.h> -#include <hicn/transport/auth/signer.h> -#include <hicn/transport/utils/chrono_typedefs.h> -#include <hicn/transport/utils/literals.h> - -#ifndef _WIN32 -#include <hicn/transport/utils/daemonizator.h> -#endif - -#include <asio.hpp> -#include <cmath> -#include <fstream> -#include <iomanip> -#include <sstream> -#include <unordered_set> - -#ifdef __linux__ -#ifndef __ANDROID__ -#include <mcheck.h> -#endif -#endif - -#ifdef _WIN32 -#include <hicn/transport/portability/win_portability.h> -#endif - -namespace transport { -namespace interface { - -#ifndef ERROR_SUCCESS -#define ERROR_SUCCESS 0 -#endif -#define ERROR_SETUP -5 -#define MIN_PROBE_SEQ 0xefffffff - -struct packet_t { - uint64_t timestamp; - uint32_t size; -}; - -inline uint64_t _ntohll(const uint64_t *input) { - uint64_t return_val; - uint8_t *tmp = (uint8_t *)&return_val; - - tmp[0] = (uint8_t)(*input >> 56); - tmp[1] = (uint8_t)(*input >> 48); - tmp[2] = (uint8_t)(*input >> 40); - tmp[3] = (uint8_t)(*input >> 32); - tmp[4] = (uint8_t)(*input >> 24); - tmp[5] = (uint8_t)(*input >> 16); - tmp[6] = (uint8_t)(*input >> 8); - tmp[7] = (uint8_t)(*input >> 0); - - return return_val; -} - -inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } - -struct nack_packet_t { - uint64_t timestamp; - uint32_t prod_rate; - uint32_t prod_seg; - - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } - - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } - - inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } - inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } -}; - -/** - * Container for command line configuration for hiperf client. - */ -struct ClientConfiguration { - ClientConfiguration() - : name("b001::abcd", 0), - beta(-1.f), - drop_factor(-1.f), - window(-1), - producer_certificate(""), - passphrase(""), - receive_buffer(nullptr), - receive_buffer_size_(128 * 1024), - download_size(0), - report_interval_milliseconds_(1000), - transport_protocol_(CBR), - rtc_(false), - test_mode_(false), - secure_(false), - producer_prefix_(), - interest_lifetime_(500) {} - - Name name; - double beta; - double drop_factor; - double window; - std::string producer_certificate; - std::string passphrase; - std::shared_ptr<utils::MemBuf> receive_buffer; - std::size_t receive_buffer_size_; - std::size_t download_size; - std::uint32_t report_interval_milliseconds_; - TransportProtocolAlgorithms transport_protocol_; - bool rtc_; - bool test_mode_; - bool secure_; - Prefix producer_prefix_; - uint32_t interest_lifetime_; -}; - -/** - * Class for handling the production rate for the RTC producer. - */ -class Rate { - public: - Rate() : rate_kbps_(0) {} - - Rate(const std::string &rate) { - std::size_t found = rate.find("kbps"); - if (found != std::string::npos) { - rate_kbps_ = std::stof(rate.substr(0, found)); - } else { - throw std::runtime_error("Format " + rate + " not correct"); - } - } - - Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {} - - Rate &operator=(const std::string &rate) { - std::size_t found = rate.find("kbps"); - if (found != std::string::npos) { - rate_kbps_ = std::stof(rate.substr(0, found)); - } else { - throw std::runtime_error("Format " + rate + " not correct"); - } - - return *this; - } - - std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { - return std::chrono::microseconds( - (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); - } - - private: - float rate_kbps_; -}; - -/** - * Container for command line configuration for hiperf server. - */ -struct ServerConfiguration { - ServerConfiguration() - : name("b001::abcd/64"), - virtual_producer(true), - manifest(false), - live_production(false), - sign(false), - content_lifetime(600000000_U32), - download_size(20 * 1024 * 1024), - hash_algorithm(auth::CryptoHashType::SHA_256), - keystore_name(""), - passphrase(""), - keystore_password("cisco"), - multiphase_produce_(false), - rtc_(false), - interactive_(false), - trace_based_(false), - trace_index_(0), - trace_file_(nullptr), - production_rate_(std::string("2048kbps")), - payload_size_(1400), - secure_(false) {} - - Prefix name; - bool virtual_producer; - bool manifest; - bool live_production; - bool sign; - std::uint32_t content_lifetime; - std::uint32_t download_size; - auth::CryptoHashType hash_algorithm; - std::string keystore_name; - std::string passphrase; - std::string keystore_password; - bool multiphase_produce_; - bool rtc_; - bool interactive_; - bool trace_based_; - std::uint32_t trace_index_; - char *trace_file_; - Rate production_rate_; - std::size_t payload_size_; - bool secure_; - std::vector<struct packet_t> trace_; -}; - -/** - * Forward declaration of client Read callbacks. - */ -class RTCCallback; -class Callback; -class KeyCallback; - -/** - * Hiperf client class: configure and setup an hicn consumer following the - * ClientConfiguration. - */ -class HIperfClient { - typedef std::chrono::time_point<std::chrono::steady_clock> Time; - typedef std::chrono::microseconds TimeDuration; - - friend class Callback; - friend class KeyCallback; - friend class RTCCallback; - - public: - HIperfClient(const ClientConfiguration &conf) - : configuration_(conf), - total_duration_milliseconds_(0), - old_bytes_value_(0), - old_interest_tx_value_(0), - old_fec_interest_tx_value_(0), - old_fec_data_rx_value_(0), - old_lost_data_value_(0), - old_bytes_recovered_value_(0), - old_retx_value_(0), - old_sent_int_value_(0), - old_received_nacks_value_(0), - avg_data_delay_(0), - delay_sample_(0), - received_bytes_(0), - received_data_pkt_(0), - signals_(io_service_), - expected_seg_(0), - lost_packets_(std::unordered_set<uint32_t>()), - rtc_callback_(*this), - callback_(*this), - key_callback_(*this) {} - - ~HIperfClient() {} - - void checkReceivedRtcContent(ConsumerSocket &c, - const ContentObject &contentObject) { - if (!configuration_.test_mode_) return; - - uint32_t receivedSeg = contentObject.getName().getSuffix(); - auto payload = contentObject.getPayload(); - - if ((uint32_t)payload->length() == 16) { // 16 is the size of the NACK - struct nack_packet_t *nack_pkt = - (struct nack_packet_t *)contentObject.getPayload()->data(); - uint32_t productionSeg = nack_pkt->getProductionSegement(); - uint32_t productionRate = nack_pkt->getProductionRate(); - - // uint32_t *payloadPtr = (uint32_t *)payload->data(); - // uint32_t productionSeg = *(payloadPtr); - // uint32_t productionRate = *(++payloadPtr); - - if (productionRate == 0) { - std::cout << "[STOP] producer is not producing content" << std::endl; - return; - } - - if (receivedSeg < productionSeg) { - std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg - << ". Next expected packet " << productionSeg + 1 - << std::endl; - expected_seg_ = productionSeg; - } else if (receivedSeg > productionSeg && receivedSeg < MIN_PROBE_SEQ) { - std::cout << "[WINDOW TOO LARGE] received NACK for " << receivedSeg - << ". Next expected packet " << productionSeg << std::endl; - } else if (receivedSeg >= MIN_PROBE_SEQ) { - std::cout << "[PROBE] probe number = " << receivedSeg << std::endl; - } - return; - } - - received_bytes_ += (uint32_t)(payload->length() - 12); - received_data_pkt_++; - - // collecting delay stats. Just for performance testing - // XXX we should probably get the transport header (12) somewhere - uint64_t *senderTimeStamp = (uint64_t *)(payload->data() + 12); - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - double new_delay = (double)(now - *senderTimeStamp); - if (*senderTimeStamp > now) - new_delay = -1 * (double)(*senderTimeStamp - now); - - delay_sample_++; - avg_data_delay_ = - avg_data_delay_ + (new_delay - avg_data_delay_) / delay_sample_; - - if (receivedSeg > expected_seg_ && expected_seg_ != 0) { - for (uint32_t i = expected_seg_; i < receivedSeg; i++) { - std::cout << "[LOSS] lost packet " << i << std::endl; - lost_packets_.insert(i); - } - expected_seg_ = receivedSeg + 1; - return; - } else if (receivedSeg < expected_seg_) { - auto it = lost_packets_.find(receivedSeg); - if (it != lost_packets_.end()) { - std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl; - lost_packets_.erase(it); - } else { - std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " - << expected_seg_ << std::endl; - } - return; - } - expected_seg_ = receivedSeg + 1; - } - - void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} - - void handleTimerExpiration(ConsumerSocket &c, - const TransportStatistics &stats) { - const char separator = ' '; - const int width = 15; - - utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = - std::chrono::duration_cast<utils::Milliseconds>(t2 - t_stats_); - - std::stringstream interval; - interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + - exact_duration.count() / 1000; - - std::stringstream bytes_transferred; - bytes_transferred << std::fixed << std::setprecision(3) - << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator) << "[MBytes]"; - - std::stringstream bandwidth; - bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / - (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; - - std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Int]"; - - std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; - - if (configuration_.rtc_) { - // we get rtc stats more often, thus we need ms in the interval - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - - std::stringstream lost_data; - lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream bytes_recovered_data; - bytes_recovered_data << stats.getBytesRecoveredData() - - old_bytes_recovered_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream data_delay; - data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; - - std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator) - << "[pkt]"; - - std::stringstream goodput; - goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; - - std::stringstream loss_rate; - loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator) - << "[%]"; - - std::stringstream retx_sent; - retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream interest_sent; - interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream nacks; - nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator) << "[pkt]"; - - // statistics not yet available in the transport - // std::stringstream interest_fec_tx; - // interest_fec_tx << stats.getInterestFecTxCount() - - // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; - // std::stringstream bytes_fec_recv; - // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ - // << std::setfill(separator) << "[bytes]"; - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "RecvData"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Goodput"; - std::cout << std::left << std::setw(width) << "LossRate"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "InterestSent"; - std::cout << std::left << std::setw(width) << "ReceivedNacks"; - std::cout << std::left << std::setw(width) << "SyncWnd"; - std::cout << std::left << std::setw(width) << "MinRtt"; - std::cout << std::left << std::setw(width) << "LostData"; - std::cout << std::left << std::setw(width) << "RecoveredData"; - std::cout << std::left << std::setw(width) << "State"; - std::cout << std::left << std::setw(width) << "DataDelay" << std::endl; - - std::cout << std::left << std::setw(width) << interval_ms.str(); - std::cout << std::left << std::setw(width) << received_data_pkt.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << goodput.str(); - std::cout << std::left << std::setw(width) << loss_rate.str(); - std::cout << std::left << std::setw(width) << retx_sent.str(); - std::cout << std::left << std::setw(width) << interest_sent.str(); - std::cout << std::left << std::setw(width) << nacks.str(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str(); - std::cout << std::left << std::setw(width) << lost_data.str(); - std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::left << std::setw(width) << stats.getCCStatus(); - std::cout << std::left << std::setw(width) << data_delay.str(); - std::cout << std::endl; - - // statistics not yet available in the transport - // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); - } else { - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; - } - total_duration_milliseconds_ += (uint32_t)exact_duration.count(); - old_bytes_value_ = stats.getBytesRecv(); - old_lost_data_value_ = stats.getLostData(); - old_bytes_recovered_value_ = stats.getBytesRecoveredData(); - old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); - old_fec_data_rx_value_ = stats.getBytesFecRecv(); - old_retx_value_ = (uint32_t)stats.getRetxCount(); - old_sent_int_value_ = (uint32_t)stats.getInterestTx(); - old_received_nacks_value_ = stats.getReceivedNacks(); - delay_sample_ = 0; - avg_data_delay_ = 0; - received_bytes_ = 0; - received_data_pkt_ = 0; - - t_stats_ = utils::SteadyClock::now(); - } - - int setup() { - int ret; - - if (configuration_.rtc_) { - configuration_.transport_protocol_ = RTC; - } else if (configuration_.window < 0) { - configuration_.transport_protocol_ = RAAQM; - } else { - configuration_.transport_protocol_ = CBR; - } - - if (configuration_.secure_) { - consumer_socket_ = std::make_shared<P2PSecureConsumerSocket>( - RAAQM, configuration_.transport_protocol_); - if (configuration_.producer_prefix_.getPrefixLength() == 0) { - std::cerr << "ERROR -- Missing producer prefix on which perform the " - "handshake." - << std::endl; - } else { - P2PSecureConsumerSocket &secure_consumer_socket = - *(static_cast<P2PSecureConsumerSocket *>(consumer_socket_.get())); - secure_consumer_socket.registerPrefix(configuration_.producer_prefix_); - } - } else { - consumer_socket_ = - std::make_shared<ConsumerSocket>(configuration_.transport_protocol_); - } - - consumer_socket_->setSocketOption( - GeneralTransportOptions::INTEREST_LIFETIME, - configuration_.interest_lifetime_); - -#if defined(DEBUG) && defined(__linux__) - std::shared_ptr<transport::BasePortal> portal; - consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - signals_ = - std::make_unique<asio::signal_set>(portal->getIoService(), SIGUSR1); - signals_->async_wait([this](const std::error_code &, const int &) { - std::cout << "Signal SIGUSR1!" << std::endl; - mtrace(); - }); -#endif - - if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE, - configuration_.window) == - SOCKET_OPTION_NOT_SET) { - std::cerr << "ERROR -- Impossible to set the size of the window." - << std::endl; - return ERROR_SETUP; - } - - if (configuration_.transport_protocol_ == RAAQM && - configuration_.beta != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - configuration_.beta) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - if (configuration_.transport_protocol_ == RAAQM && - configuration_.drop_factor != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - configuration_.drop_factor) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - if (!configuration_.producer_certificate.empty()) { - std::shared_ptr<auth::Verifier> verifier = - std::make_shared<auth::AsymmetricVerifier>( - configuration_.producer_certificate); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; - } - - if (!configuration_.passphrase.empty()) { - std::shared_ptr<auth::Verifier> verifier = - std::make_shared<auth::SymmetricVerifier>(configuration_.passphrase); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; - } - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &HIperfClient::processLeavingInterest, this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (!configuration_.rtc_) { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &callback_); - } else { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_); - } - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (configuration_.rtc_) { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - (ConsumerContentObjectCallback)std::bind( - &HIperfClient::checkReceivedRtcContent, this, - std::placeholders::_1, std::placeholders::_2)); - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - if (configuration_.rtc_) { - std::shared_ptr<TransportStatistics> transport_stats; - consumer_socket_->getSocketOption( - OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); - transport_stats->setAlpha(0.0); - } - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::STATS_SUMMARY, - (ConsumerTimerCallback)std::bind(&HIperfClient::handleTimerExpiration, - this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (consumer_socket_->setSocketOption( - GeneralTransportOptions::STATS_INTERVAL, - configuration_.report_interval_milliseconds_) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - consumer_socket_->connect(); - - return ERROR_SUCCESS; - } - - int run() { - std::cout << "Starting download of " << configuration_.name << std::endl; - - signals_.add(SIGINT); - signals_.async_wait( - [this](const std::error_code &, const int &) { io_service_.stop(); }); - - t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name); - io_service_.run(); - - consumer_socket_->stop(); - - return ERROR_SUCCESS; - } - - private: - class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = 1500; - - public: - RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); - } - - bool isBufferMovable() noexcept override { return false; } - - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = mtu; - } - - void readDataAvailable(std::size_t length) noexcept override {} - - size_t maxBufferSize() const override { return mtu; } - - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error while reading from RTC socket" << std::endl; - client_.io_service_.stop(); - } - - void readSuccess(std::size_t total_size) noexcept override { - std::cout << "Data successfully read" << std::endl; - } - - private: - HIperfClient &client_; - }; - - class Callback : public ConsumerSocket::ReadCallback { - public: - Callback(HIperfClient &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = - utils::MemBuf::create(client_.configuration_.receive_buffer_size_); - } - - bool isBufferMovable() noexcept override { return false; } - - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = client_.configuration_.receive_buffer_size_; - } - - void readDataAvailable(std::size_t length) noexcept override {} - - void readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept override {} - - size_t maxBufferSize() const override { - return client_.configuration_.receive_buffer_size_; - } - - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error " << ec.message() << " while reading from socket" - << std::endl; - client_.io_service_.stop(); - } - - void readSuccess(std::size_t total_size) noexcept override { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast<TimeDuration>(t2 - client_.t_download_); - long usec = (long)dt.count(); - - std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" - << std::endl; - - std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " - << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" - << std::endl; - - client_.io_service_.stop(); - } - - private: - HIperfClient &client_; - }; - - class KeyCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t read_size = 16 * 1024; - - public: - KeyCallback(HIperfClient &hiperf_client) - : client_(hiperf_client), key_(nullptr) {} - - bool isBufferMovable() noexcept override { return true; } - - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override {} - - void readDataAvailable(std::size_t length) noexcept override {} - - void readBufferAvailable( - std::unique_ptr<utils::MemBuf> &&buffer) noexcept override { - key_ = std::make_unique<std::string>((const char *)buffer->data(), - buffer->length()); - std::cout << "Key: " << *key_ << std::endl; - } - - size_t maxBufferSize() const override { return read_size; } - - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error " << ec.message() << " while reading from socket" - << std::endl; - client_.io_service_.stop(); - } - - bool validateKey() { return !key_->empty(); } - - void readSuccess(std::size_t total_size) noexcept override { - std::cout << "Key size: " << total_size << " bytes" << std::endl; - } - - void setConsumer(std::shared_ptr<ConsumerSocket> consumer_socket) { - consumer_socket_ = consumer_socket; - } - - private: - HIperfClient &client_; - std::unique_ptr<std::string> key_; - std::shared_ptr<ConsumerSocket> consumer_socket_; - }; - - ClientConfiguration configuration_; - Time t_stats_; - Time t_download_; - uint32_t total_duration_milliseconds_; - uint64_t old_bytes_value_; - uint64_t old_interest_tx_value_; - uint64_t old_fec_interest_tx_value_; - uint64_t old_fec_data_rx_value_; - uint64_t old_lost_data_value_; - uint64_t old_bytes_recovered_value_; - uint32_t old_retx_value_; - uint32_t old_sent_int_value_; - uint32_t old_received_nacks_value_; - - // IMPORTANT: to be used only for performance testing, when consumer and - // producer are synchronized. Used for rtc only at the moment - double avg_data_delay_; - uint32_t delay_sample_; - - uint32_t received_bytes_; - uint32_t received_data_pkt_; - - asio::io_service io_service_; - asio::signal_set signals_; - uint32_t expected_seg_; - std::unordered_set<uint32_t> lost_packets_; - RTCCallback rtc_callback_; - Callback callback_; - KeyCallback key_callback_; - std::shared_ptr<ConsumerSocket> consumer_socket_; -}; // namespace interface - -/** - * Hiperf server class: configure and setup an hicn producer following the - * ServerConfiguration. - */ -class HIperfServer { - const std::size_t log2_content_object_buffer_size = 8; - - public: - HIperfServer(ServerConfiguration &conf) - : configuration_(conf), - signals_(io_service_), - rtc_timer_(io_service_), - unsatisfied_interests_(), - content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), - content_objects_index_(0), - mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), - last_segment_(0), -#ifndef _WIN32 - ptr_last_segment_(&last_segment_), - input_(io_service_), - rtc_running_(false), -#else - ptr_last_segment_(&last_segment_), -#endif - flow_name_(configuration_.name.getName()) { - std::string buffer(configuration_.payload_size_, 'X'); - std::cout << "Producing contents under name " << conf.name.getName() - << std::endl; -#ifndef _WIN32 - if (configuration_.interactive_) { - input_.assign(::dup(STDIN_FILENO)); - } -#endif - - for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = ContentObject::Ptr( - new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, - (const uint8_t *)buffer.data(), buffer.size())); - content_objects_[i]->setLifetime( - default_values::content_object_expiry_time); - } - } - - void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { - // std::cout << "Received interest " << interest.getName() << std::endl; - content_objects_[content_objects_index_ & mask_]->setName( - interest.getName()); - producer_socket_->produce( - *content_objects_[content_objects_index_++ & mask_]); - } - - void processInterest(ProducerSocket &p, const Interest &interest) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)VOID_HANDLER); - p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 5000000_U32); - - produceContent(p, interest.getName(), interest.getName().getSuffix()); - std::cout << "Received interest " << interest.getName().getSuffix() - << std::endl; - } - - void asyncProcessInterest(ProducerSocket &p, const Interest &interest) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind( - &HIperfServer::cacheMiss, this, std::placeholders::_1, - std::placeholders::_2)); - p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 5000000_U32); - uint32_t suffix = interest.getName().getSuffix(); - - if (suffix == 0) { - last_segment_ = 0; - ptr_last_segment_ = &last_segment_; - unsatisfied_interests_.clear(); - } - - // The suffix will either be the one from the received interest or the - // smallest suffix of a previous interest not satisfed - if (!unsatisfied_interests_.empty()) { - auto it = - std::lower_bound(unsatisfied_interests_.begin(), - unsatisfied_interests_.end(), *ptr_last_segment_); - if (it != unsatisfied_interests_.end()) { - suffix = *it; - } - unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it); - } - - std::cout << "Received interest " << interest.getName().getSuffix() - << ", starting production at " << suffix << std::endl; - std::cout << unsatisfied_interests_.size() << " interests still unsatisfied" - << std::endl; - produceContentAsync(p, interest.getName(), suffix); - } - - void produceContent(ProducerSocket &p, const Name &content_name, - uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - uint32_t total; - - utils::TimePoint t0 = utils::SteadyClock::now(); - total = p.produceStream(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix); - utils::TimePoint t1 = utils::SteadyClock::now(); - - std::cout - << "Written " << total - << " data packets in output buffer (Segmentation time: " - << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count() - << " us)" << std::endl; - } - - void produceContentAsync(ProducerSocket &p, Name content_name, - uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - - p.asyncProduce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix, - &ptr_last_segment_); - } - - void cacheMiss(ProducerSocket &p, const Interest &interest) { - unsatisfied_interests_.push_back(interest.getName().getSuffix()); - } - - void onContentProduced(ProducerSocket &p, const std::error_code &err, - uint64_t bytes_written) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind( - &HIperfServer::asyncProcessInterest, this, - std::placeholders::_1, std::placeholders::_2)); - } - - std::shared_ptr<auth::Identity> getProducerIdentity( - std::string &keystore_path, std::string &keystore_pwd, - auth::CryptoHashType &hash_type) { - if (access(keystore_path.c_str(), F_OK) != -1) { - return std::make_shared<auth::Identity>(keystore_path, keystore_pwd, - hash_type); - } - return std::make_shared<auth::Identity>(keystore_path, keystore_pwd, - auth::CryptoSuite::RSA_SHA256, 1024, - 365, "producer-test"); - } - - int setup() { - int ret; - int production_protocol; - - if (configuration_.secure_) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - producer_socket_ = std::make_unique<P2PSecureProducerSocket>( - configuration_.rtc_, identity); - } else { - if (!configuration_.rtc_) { - production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; - } else { - production_protocol = ProductionProtocolAlgorithms::RTC_PROD; - } - - producer_socket_ = std::make_unique<ProducerSocket>(production_protocol); - } - - if (configuration_.sign) { - std::shared_ptr<auth::Signer> signer; - - if (!configuration_.passphrase.empty()) { - signer = std::make_shared<auth::SymmetricSigner>( - auth::CryptoSuite::HMAC_SHA256, configuration_.passphrase); - } else if (!configuration_.keystore_name.empty()) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - signer = identity->getSigner(); - } else { - return ERROR_SETUP; - } - - if (producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - uint32_t rtc_header_size = 0; - if (configuration_.rtc_) rtc_header_size = 12; - producer_socket_->setSocketOption( - GeneralTransportOptions::DATA_PACKET_SIZE, - (uint32_t)( - configuration_.payload_size_ + rtc_header_size + - (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); - producer_socket_->registerPrefix(configuration_.name); - producer_socket_->connect(); - - if (configuration_.rtc_) { - std::cout << "Running RTC producer: the prefix length will be ignored." - " Use /128 by default in RTC mode" - << std::endl; - return ERROR_SUCCESS; - } - - if (!configuration_.virtual_producer) { - if (producer_socket_->setSocketOption( - GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (producer_socket_->setSocketOption( - GeneralTransportOptions::MAKE_MANIFEST, - configuration_.manifest) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (producer_socket_->setSocketOption( - GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (!configuration_.live_production) { - produceContent(*producer_socket_, configuration_.name.getName(), 0); - } else { - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&HIperfServer::asyncProcessInterest, - this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - } else { - ret = producer_socket_->setSocketOption( - GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&HIperfServer::virtualProcessInterest, - this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CONTENT_PRODUCED, - (ProducerContentCallback)bind( - &HIperfServer::onContentProduced, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); - - return ERROR_SUCCESS; - } - - void sendRTCContentObjectCallback(std::error_code ec) { - if (ec) return; - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait(std::bind(&HIperfServer::sendRTCContentObjectCallback, - this, std::placeholders::_1)); - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); - - // this is used to compute the data packet delay - // Used only for performance evaluation - // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - - producer_socket_->produceDatagram( - flow_name_, payload->data(), - payload->length() < 1400 ? payload->length() : 1400); - } - - void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { - if (ec) return; - - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); - - uint32_t packet_len = - configuration_.trace_[configuration_.trace_index_].size; - - // this is used to compute the data packet delay - // used only for performance evaluation - // it requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - - if (packet_len > payload->length()) packet_len = (uint32_t)payload->length(); - if (packet_len > 1400) packet_len = 1400; - - producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); - - uint32_t next_index = configuration_.trace_index_ + 1; - uint64_t schedule_next; - if (next_index < configuration_.trace_.size()) { - schedule_next = - configuration_.trace_[next_index].timestamp - - configuration_.trace_[configuration_.trace_index_].timestamp; - } else { - // here we need to loop, schedule in a random time - schedule_next = 1000; - } - - configuration_.trace_index_ = - (configuration_.trace_index_ + 1) % configuration_.trace_.size(); - rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, this, - std::placeholders::_1)); - } - -#ifndef _WIN32 - void handleInput(const std::error_code &error, std::size_t length) { - if (error) { - producer_socket_->stop(); - io_service_.stop(); - } - - if (rtc_running_) { - std::cout << "stop real time content production" << std::endl; - rtc_running_ = false; - rtc_timer_.cancel(); - } else { - std::cout << "start real time content production" << std::endl; - rtc_running_ = true; - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallback, this, - std::placeholders::_1)); - } - - input_buffer_.consume(length); // Remove newline from input. - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); - } -#endif - - int parseTraceFile() { - std::ifstream trace(configuration_.trace_file_); - if (trace.fail()) { - return -1; - } - std::string line; - while (std::getline(trace, line)) { - std::istringstream iss(line); - struct packet_t packet; - iss >> packet.timestamp >> packet.size; - configuration_.trace_.push_back(packet); - } - return 0; - } - - int run() { - std::cerr << "Starting to serve consumers" << std::endl; - - signals_.add(SIGINT); - signals_.async_wait([this](const std::error_code &, const int &) { - std::cout << "STOPPING!!" << std::endl; - producer_socket_->stop(); - io_service_.stop(); - }); - - if (configuration_.rtc_) { -#ifndef _WIN32 - if (configuration_.interactive_) { - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); - } else if (configuration_.trace_based_) { - std::cout << "trace-based mode enabled" << std::endl; - if (configuration_.trace_file_ == nullptr) { - std::cout << "cannot find the trace file" << std::endl; - return ERROR_SETUP; - } - if (parseTraceFile() < 0) { - std::cout << "cannot parse the trace file" << std::endl; - return ERROR_SETUP; - } - rtc_running_ = true; - rtc_timer_.expires_from_now(std::chrono::milliseconds(1)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, - this, std::placeholders::_1)); - } else { - rtc_running_ = true; - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallback, this, - std::placeholders::_1)); - } -#else - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallback, this, - std::placeholders::_1)); -#endif - } - - io_service_.run(); - - return ERROR_SUCCESS; - } - - private: - ServerConfiguration configuration_; - asio::io_service io_service_; - asio::signal_set signals_; - asio::steady_timer rtc_timer_; - std::vector<uint32_t> unsatisfied_interests_; - std::vector<std::shared_ptr<ContentObject>> content_objects_; - std::uint16_t content_objects_index_; - std::uint16_t mask_; - std::uint32_t last_segment_; - std::uint32_t *ptr_last_segment_; - std::unique_ptr<ProducerSocket> producer_socket_; -#ifndef _WIN32 - asio::posix::stream_descriptor input_; - asio::streambuf input_buffer_; - bool rtc_running_; - -#endif - core::Name flow_name_; -}; // namespace interface - -void usage() { - std::cerr << "HIPERF - A tool for performing network throughput " - "measurements with hICN" - << std::endl; - std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl; - std::cerr << std::endl; - std::cerr << "SERVER OR CLIENT:" << std::endl; -#ifndef _WIN32 - std::cerr << "-D\t\t\t\t\t" - << "Run as a daemon" << std::endl; - std::cerr << "-R\t\t\t\t\t" - << "Run RTC protocol (client or server)" << std::endl; - std::cerr << "-f\t<filename>\t\t\t" - << "Log file" << std::endl; - std::cerr << "-z\t<io_module>\t\t\t" - << "IO module to use. Default: hicnlight_module" << std::endl; -#endif - std::cerr << std::endl; - std::cerr << "SERVER SPECIFIC:" << std::endl; - std::cerr << "-A\t<content_size>\t\t\t" - "Size of the content to publish. This " - "is not the size of the packet (see -s for it)." - << std::endl; - std::cerr << "-s\t<packet_size>\t\t\tSize of the payload of each data packet." - << std::endl; - std::cerr << "-r\t\t\t\t\t" - << "Produce real content of <content_size> bytes" << std::endl; - std::cerr << "-m\t\t\t\t\t" - << "Produce transport manifest" << std::endl; - std::cerr << "-l\t\t\t\t\t" - << "Start producing content upon the reception of the " - "first interest" - << std::endl; - std::cerr << "-K\t<keystore_path>\t\t\t" - << "Path of p12 file containing the " - "crypto material used for signing packets" - << std::endl; - std::cerr << "-k\t<passphrase>\t\t\t" - << "String from which a 128-bit symmetric key will be " - "derived for signing packets" - << std::endl; - std::cerr << "-y\t<hash_algorithm>\t\t" - << "Use the selected hash algorithm for " - "calculating manifest digests" - << std::endl; - std::cerr << "-p\t<password>\t\t\t" - << "Password for p12 keystore" << std::endl; - std::cerr << "-x\t\t\t\t\t" - << "Produce a content of <content_size>, then after downloading " - "it produce a new content of" - << "\n\t\t\t\t\t<content_size> without resetting " - "the suffix to 0." - << std::endl; - std::cerr << "-B\t<bitrate>\t\t\t" - << "Bitrate for RTC producer, to be used with the -R option." - << std::endl; -#ifndef _WIN32 - std::cerr << "-I\t\t\t\t\t" - "Interactive mode, start/stop real time content production " - "by pressing return. To be used with the -R option" - << std::endl; - std::cerr - << "-T\t<filename>\t\t\t" - "Trace based mode, hiperf takes as input a file with a trace. " - "Each line of the file indicates the timestamp and the size of " - "the packet to generate. To be used with the -R option. -B and -I " - "will be ignored." - << std::endl; - std::cerr << "-E\t\t\t\t\t" - << "Enable encrypted communication. Requires the path to a p12 " - "file containing the " - "crypto material used for the TLS handshake" - << std::endl; -#endif - std::cerr << std::endl; - std::cerr << "CLIENT SPECIFIC:" << std::endl; - std::cerr << "-b\t<beta_parameter>\t\t" - << "RAAQM beta parameter" << std::endl; - std::cerr << "-d\t<drop_factor_parameter>\t\t" - << "RAAQM drop factor " - "parameter" - << std::endl; - std::cerr << "-L\t<interest lifetime>\t\t" - << "Set interest lifetime." << std::endl; - std::cerr << "-M\t<input_buffer_size>\t\t" - << "Size of consumer input buffer. If 0, reassembly of packets " - "will be disabled." - << std::endl; - std::cerr << "-W\t<window_size>\t\t\t" - << "Use a fixed congestion window " - "for retrieving the data." - << std::endl; - std::cerr << "-i\t<stats_interval>\t\t" - << "Show the statistics every <stats_interval> milliseconds." - << std::endl; - std::cerr << "-c\t<certificate_path>\t\t" - << "Path of the producer certificate to be used for verifying the " - "origin of the packets received." - << std::endl; - std::cerr << "-k\t<passphrase>\t\t\t" - << "String from which is derived the symmetric key used by the " - "producer to sign packets and by the consumer to verify them." - << std::endl; - std::cerr << "-t\t\t\t\t\t" - "Test mode, check if the client is receiving the " - "correct data. This is an RTC specific option, to be " - "used with the -R (default false)" - << std::endl; - std::cerr << "-P\t\t\t\t\t" - << "Prefix of the producer where to do the handshake" << std::endl; -} - -int main(int argc, char *argv[]) { -#ifndef _WIN32 - // Common - bool daemon = false; -#else - WSADATA wsaData = {0}; - WSAStartup(MAKEWORD(2, 2), &wsaData); -#endif - - // -1 server, 0 undefined, 1 client - int role = 0; - int options = 0; - - char *log_file = nullptr; - interface::global_config::IoModuleConfiguration config; - std::string conf_file; - config.name = "hicnlight_module"; - - // Consumer - ClientConfiguration client_configuration; - - // Producer - ServerConfiguration server_configuration; - - int opt; -#ifndef _WIN32 - while ((opt = getopt( - argc, argv, - "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:")) != -1) { - switch (opt) { - // Common - case 'D': { - daemon = true; - break; - } - case 'I': { - server_configuration.interactive_ = true; - server_configuration.trace_based_ = false; - break; - } - case 'T': { - server_configuration.interactive_ = false; - server_configuration.trace_based_ = true; - server_configuration.trace_file_ = optarg; - break; - } -#else - while ((opt = getopt(argc, argv, - "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:")) != - -1) { - switch (opt) { -#endif - case 'f': { - log_file = optarg; - break; - } - case 'R': { - client_configuration.rtc_ = true; - server_configuration.rtc_ = true; - break; - } - case 'z': { - config.name = optarg; - break; - } - case 'F': { - conf_file = optarg; - break; - } - - // Server or Client - case 'S': { - role -= 1; - break; - } - case 'C': { - role += 1; - break; - } - case 'k': { - server_configuration.passphrase = std::string(optarg); - client_configuration.passphrase = std::string(optarg); - server_configuration.sign = true; - break; - } - - // Client specifc - case 'b': { - client_configuration.beta = std::stod(optarg); - options = 1; - break; - } - case 'd': { - client_configuration.drop_factor = std::stod(optarg); - options = 1; - break; - } - case 'W': { - client_configuration.window = std::stod(optarg); - options = 1; - break; - } - case 'M': { - client_configuration.receive_buffer_size_ = std::stoull(optarg); - options = 1; - break; - } - case 'P': { - client_configuration.producer_prefix_ = Prefix(optarg); - client_configuration.secure_ = true; - break; - } - case 'c': { - client_configuration.producer_certificate = std::string(optarg); - options = 1; - break; - } - case 'i': { - client_configuration.report_interval_milliseconds_ = std::stoul(optarg); - options = 1; - break; - } - case 't': { - client_configuration.test_mode_ = true; - options = 1; - break; - } - case 'L': { - client_configuration.interest_lifetime_ = std::stoul(optarg); - options = 1; - break; - } - // Server specific - case 'A': { - server_configuration.download_size = std::stoul(optarg); - options = -1; - break; - } - case 's': { - server_configuration.payload_size_ = std::stoul(optarg); - options = -1; - break; - } - case 'r': { - server_configuration.virtual_producer = false; - options = -1; - break; - } - case 'm': { - server_configuration.manifest = true; - options = -1; - break; - } - case 'l': { - server_configuration.live_production = true; - options = -1; - break; - } - case 'K': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } - case 'y': { - if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = auth::CryptoHashType::SHA_256; - } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = auth::CryptoHashType::SHA_512; - } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = auth::CryptoHashType::CRC32C; - } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; - } - options = -1; - break; - } - case 'p': { - server_configuration.keystore_password = std::string(optarg); - options = -1; - break; - } - case 'x': { - server_configuration.multiphase_produce_ = true; - options = -1; - break; - } - case 'B': { - auto str = std::string(optarg); - std::transform(str.begin(), str.end(), str.begin(), ::tolower); - server_configuration.production_rate_ = str; - options = -1; - break; - } - case 'E': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.secure_ = true; - break; - } - case 'h': - default: - usage(); - return EXIT_FAILURE; - } - } - - if (options > 0 && role < 0) { - std::cerr << "Client options cannot be used when using the " - "software in server mode" - << std::endl; - usage(); - return EXIT_FAILURE; - } else if (options < 0 && role > 0) { - std::cerr << "Server options cannot be used when using the " - "software in client mode" - << std::endl; - usage(); - return EXIT_FAILURE; - } else if (!role) { - std::cerr << "Please specify if running hiperf as client " - "or server." - << std::endl; - usage(); - return EXIT_FAILURE; - } - - if (argv[optind] == 0) { - std::cerr << "Please specify the name/prefix to use." << std::endl; - usage(); - return EXIT_FAILURE; - } else { - if (role > 0) { - client_configuration.name = Name(argv[optind]); - } else { - server_configuration.name = Prefix(argv[optind]); - } - } - - if (log_file) { -#ifndef _WIN32 - int fd = open(log_file, O_WRONLY | O_APPEND | O_CREAT, S_IWUSR | S_IRUSR); - dup2(fd, STDOUT_FILENO); - dup2(STDOUT_FILENO, STDERR_FILENO); - close(fd); -#else - int fd = - _open(log_file, _O_WRONLY | _O_APPEND | _O_CREAT, _S_IWRITE | _S_IREAD); - _dup2(fd, _fileno(stdout)); - _dup2(_fileno(stdout), _fileno(stderr)); - _close(fd); -#endif - } - -#ifndef _WIN32 - if (daemon) { - utils::Daemonizator::daemonize(false); - } -#endif - - /** - * IO module configuration - */ - config.set(); - - // Parse config file - transport::interface::global_config::parseConfigurationFile(conf_file); - - if (role > 0) { - HIperfClient c(client_configuration); - if (c.setup() != ERROR_SETUP) { - c.run(); - } - } else if (role < 0) { - HIperfServer s(server_configuration); - if (s.setup() != ERROR_SETUP) { - s.run(); - } - } else { - usage(); - return EXIT_FAILURE; - } - -#ifdef _WIN32 - WSACleanup(); -#endif - - return 0; -} - -} // end namespace interface -} // end namespace transport - -int main(int argc, char *argv[]) { - return transport::interface::main(argc, argv); -} |