diff options
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc | 71 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h | 10 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.cc | 73 | ||||
-rw-r--r-- | libtransport/src/hicn/transport/protocols/rtc.h | 16 | ||||
-rw-r--r-- | utils/src/hiperf.cc | 418 |
5 files changed, 368 insertions, 220 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index 00cc82543..67fcc83e3 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -24,6 +24,11 @@ #define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps) #define STATS_INTERVAL_DURATION 500 // ms #define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 +#define INACTIVE_TIME 100 // ms opus generates ~50 packets per seocnd, one + // every +// 20ms. to be safe we use 20ms*5 as timer for an +// inactive socket +#define MILLI_IN_A_SEC 1000 // ms in a second // NACK HEADER // +-----------------------------------------+ @@ -46,11 +51,14 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) producedPackets_(0), bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), - perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + active_(false) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); nack_->appendPayload(std::move(nack_payload)); - lastStats_ = std::chrono::steady_clock::now(); + lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } @@ -63,11 +71,14 @@ RTCProducerSocket::RTCProducerSocket() producedPackets_(0), bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), - perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + perSecondFactor_(MILLI_IN_A_SEC / STATS_INTERVAL_DURATION), + active_(false) { auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); nack_payload->append(NACK_HEADER_SIZE); nack_->appendPayload(std::move(nack_payload)); - lastStats_ = std::chrono::steady_clock::now(); + lastStats_ = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); srand((unsigned int)time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } @@ -92,16 +103,15 @@ void RTCProducerSocket::registerPrefix(const Prefix &producer_namespace) { } } -void RTCProducerSocket::updateStats(uint32_t packet_size) { +void RTCProducerSocket::updateStats(uint32_t packet_size, uint64_t now) { producedBytes_ += packet_size; producedPackets_++; - std::chrono::steady_clock::duration duration = - std::chrono::steady_clock::now() - lastStats_; - if (std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() >= - STATS_INTERVAL_DURATION) { - lastStats_ = std::chrono::steady_clock::now(); + uint64_t duration = now - lastStats_; + if (duration >= STATS_INTERVAL_DURATION) { + lastStats_ = now; bytesProductionRate_ = producedBytes_ * perSecondFactor_; packetsProductionRate_ = producedPackets_ * perSecondFactor_; + if (packetsProductionRate_.load() == 0) packetsProductionRate_ = 1; producedBytes_ = 0; producedPackets_ = 0; } @@ -117,17 +127,20 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) { return; } - updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN)); + active_ = true; + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); - ContentObject content_object(flowName_.setSuffix(currentSeg_)); + lastProduced_ = now; + + updateStats((uint32_t)(buffer_size + headerSize_ + TIMESTAMP_LEN), now); - uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + ContentObject content_object(flowName_.setSuffix(currentSeg_)); auto payload = utils::MemBuf::create(buffer_size + TIMESTAMP_LEN); - memcpy(payload->writableData(), ×tamp, TIMESTAMP_LEN); + memcpy(payload->writableData(), &now, TIMESTAMP_LEN); memcpy(payload->writableData() + TIMESTAMP_LEN, buf, buffer_size); payload->append(buffer_size + TIMESTAMP_LEN); content_object.appendPayload(std::move(payload)); @@ -149,23 +162,41 @@ void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { on_interest_input_(*this, *interest); } - // packetsProductionRate_ is modified by another thread in updateStats - // this should be safe since I just read here. + if (active_.load()) { + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (now - lastProduced_.load() >= INACTIVE_TIME) { + active_ = false; + } + } + + if (TRANSPORT_EXPECT_FALSE(!active_.load())) { + sendNack(*interest); + return; + } + max_gap = (uint32_t)floor( (double)((double)((double)lifetime * INTEREST_LIFETIME_REDUCTION_FACTOR / 1000.0) * - (double)packetsProductionRate_)); + (double)packetsProductionRate_.load())); if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { sendNack(*interest); } + // else drop packet } void RTCProducerSocket::sendNack(const Interest &interest) { nack_->setName(interest.getName()); uint32_t *payload_ptr = (uint32_t *)nack_->getPayload()->data(); *payload_ptr = currentSeg_; - *(++payload_ptr) = bytesProductionRate_; + + if (active_.load()) { + *(++payload_ptr) = bytesProductionRate_; + } else { + *(++payload_ptr) = 0; + } nack_->setLifetime(0); nack_->setPathLabel(prodLabel_); diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index bc54be4bb..cb09ef991 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -18,6 +18,7 @@ #include <hicn/transport/interfaces/socket_producer.h> #include <hicn/transport/utils/content_store.h> +#include <atomic> #include <map> #include <mutex> @@ -41,7 +42,7 @@ class RTCProducerSocket : public ProducerSocket { private: void sendNack(const Interest &interest); - void updateStats(uint32_t packet_size); + void updateStats(uint32_t packet_size, uint64_t now); // std::map<uint32_t, uint64_t> pendingInterests_; uint32_t currentSeg_; @@ -53,9 +54,12 @@ class RTCProducerSocket : public ProducerSocket { uint32_t producedBytes_; uint32_t producedPackets_; uint32_t bytesProductionRate_; - uint32_t packetsProductionRate_; + std::atomic<uint32_t> packetsProductionRate_; uint32_t perSecondFactor_; - std::chrono::steady_clock::time_point lastStats_; + uint64_t lastStats_; + // std::chrono::steady_clock::time_point lastProduced_; + std::atomic<uint64_t> lastProduced_; + std::atomic<bool> active_; }; } // namespace interface diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index a993d596b..98abbe35b 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -38,6 +38,8 @@ RTCTransportProtocol::RTCTransportProtocol( inflightInterests_(1 << default_values::log_2_default_buffer_size), modMask_((1 << default_values::log_2_default_buffer_size) - 1) { icnet_socket->getSocketOption(PORTAL, portal_); + nack_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + nack_timer_used_ = false; reset(); } @@ -211,8 +213,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { auto it = pathTable_.find(producerPathLabel_); if (it == pathTable_.end()) return; - // double maxAvgRTT = it->second->getAverageRtt(); - // double minRTT = it->second->getMinRtt(); minRtt_ = it->second->getMinRtt(); queuingDelay_ = it->second->getQueuingDealy(); @@ -222,22 +222,6 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { it->second->roundEnd(); } - // this is inefficient but the window is supposed to be small, so it - // probably makes sense to leave it like this - // if(minRTT == 0) - // minRTT = 1; - - // minRTTwin_[roundCounter_ % MIN_RTT_WIN] = minRTT; - // minRtt_ = minRTT; - // for (int i = 0; i < MIN_RTT_WIN; i++) - // if(minRtt_ > minRTTwin_[i]) - // minRtt_ = minRTTwin_[i]; - - // roundCounter_++; - - // std::cout << "min RTT " << minRtt_ << " queuing " << queuingDelay_ << - // std::endl; - if (sentInterest_ != 0 && currentState_ == HICN_RTC_NORMAL_STATE) { double lossRate = (double)((double)packetLost_ / (double)sentInterest_); lossRate_ = lossRate_ * HICN_ESTIMATED_LOSSES_ALPHA + @@ -473,23 +457,45 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { scheduleNextInterests(); } +bool RTCTransportProtocol::checkIfProducerIsActive( + const ContentObject &content_object) { + uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); + uint32_t productionSeg = *payload; + uint32_t productionRate = *(++payload); + + if (productionRate == 0) { + // the producer socket is not active + // in this case we consider only the first nack + if (nack_timer_used_) return false; + + nack_timer_used_ = true; + // actualSegment_ should be the one in the nack, which will the next in + // production + actualSegment_ = productionSeg; + // all the rest (win size should not change) + // we wait a bit before pull the socket again + nack_timer_->expires_from_now(std::chrono::milliseconds(500)); + nack_timer_->async_wait([this](std::error_code ec) { + if (ec) return; + nack_timer_used_ = false; + scheduleNextInterests(); + }); + return false; + } + + return true; +} + void RTCTransportProtocol::onNack(const ContentObject &content_object) { uint32_t *payload = (uint32_t *)content_object.getPayload()->data(); uint32_t productionSeg = *payload; uint32_t productionRate = *(++payload); uint32_t nackSegment = content_object.getName().getSuffix(); + gotNack_ = true; // we synch the estimated production rate with the actual one estimatedBw_ = (double)productionRate; - // if(inflightInterests_[segmentNumber % - // default_values::default_buffer_size].retransmissions != 0){ ignore nacks - // for retransmissions - // return; - //} - - gotNack_ = true; - if (productionSeg > nackSegment) { // we are asking for stuff produced in the past actualSegment_ = max(productionSeg + 1, actualSegment_); @@ -535,14 +541,21 @@ void RTCTransportProtocol::onContentObject( uint32_t payload_size = (uint32_t)payload->length(); uint32_t segmentNumber = content_object->getName().getSuffix(); uint32_t pkt = segmentNumber & modMask_; + bool schedule_next_interest = true; if (payload_size == HICN_NACK_HEADER_SIZE) { - // Nacks always come form the producer, so we set the producerePathLabel_; + // Nacks always come form the producer, so we set the producerPathLabel_; producerPathLabel_ = content_object->getPathLabel(); if (inflightInterests_[pkt].retransmissions == 0) { + // discard nacks for rtx packets inflightInterestsCount_--; - onNack(*content_object); + schedule_next_interest = checkIfProducerIsActive(*content_object); + // if checkIfProducerIsActive returns true, we did all we need to do + // inside that function, no need to call onNack + if (!schedule_next_interest) onNack(*content_object); updateDelayStats(*content_object); + } else { + schedule_next_interest = checkIfProducerIsActive(*content_object); } } else { @@ -564,7 +577,9 @@ void RTCTransportProtocol::onContentObject( increaseWindow(); } - scheduleNextInterests(); + if (schedule_next_interest) { + scheduleNextInterests(); + } } void RTCTransportProtocol::returnContentToApplication( diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 58c143988..0bb9d9b2e 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -121,6 +121,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { void scheduleNextInterests() override; void scheduleAppNackRtx(std::vector<uint32_t> &nacks); void onTimeout(Interest::Ptr &&interest) override; + // checkIfProducerIsActive: return true if we need to schedule an interest + // immediatly after, false otherwise (this happens when the producer socket + // is not active) + bool checkIfProducerIsActive(const ContentObject &content_object); void onNack(const ContentObject &content_object); void onContentObject(Interest::Ptr &&interest, ContentObject::Ptr &&content_object) override; @@ -142,18 +146,11 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // controller var std::chrono::steady_clock::time_point lastRoundBegin_; - // bool allPacketsInSync_; - // unsigned numberOfRoundsInSync_; - // unsigned numberOfCatchUpRounds_; - // bool catchUpPhase_; unsigned currentState_; - // uint32_t inProduction_; - // cwin var uint32_t currentCWin_; uint32_t maxCWin_; - // uint32_t previousCWin_; // names/packets var uint32_t actualSegment_; @@ -167,6 +164,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // application for pakets for which we already got a // past NACK by the producer these packet are too old, // they will never be retrived + bool nack_timer_used_; + std::unique_ptr<asio::steady_timer> nack_timer_; // timer used to schedule + // a nack retransmission in + // of inactive prod socket uint32_t modMask_; @@ -185,7 +186,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // vector std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_; uint32_t roundCounter_; - // std::vector<uint64_t> minRTTwin_; uint64_t minRtt_; // CC var diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 34cb79b3f..425b91bca 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -46,11 +46,17 @@ using Identity = utils::Identity; struct ClientConfiguration { ClientConfiguration() - : name("b001::abcd", 0), verify(false), beta(-1.f), drop_factor(-1.f), - window(-1), virtual_download(true), + : name("b001::abcd", 0), + verify(false), + beta(-1.f), + drop_factor(-1.f), + window(-1), + virtual_download(true), producer_certificate("/tmp/rsa_certificate.pem"), receive_buffer(std::make_shared<std::vector<uint8_t>>()), - download_size(0), report_interval_milliseconds_(1000), rtc_(false) {} + download_size(0), + report_interval_milliseconds_(1000), + rtc_(false) {} Name name; bool verify; @@ -67,7 +73,7 @@ struct ClientConfiguration { }; class Rate { -public: + public: Rate() : rate_kbps_(0) {} Rate(const std::string &rate) { @@ -97,19 +103,28 @@ public: packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_))); } -private: + private: float rate_kbps_; }; struct ServerConfiguration { ServerConfiguration() - : name("b001::abcd/64"), virtual_producer(true), manifest(false), - live_production(false), sign(false), content_lifetime(600000000_U32), - content_object_size(1440), download_size(20 * 1024 * 1024), + : name("b001::abcd/64"), + virtual_producer(true), + manifest(false), + live_production(false), + sign(false), + content_lifetime(600000000_U32), + content_object_size(1440), + download_size(20 * 1024 * 1024), hash_algorithm(HashAlgorithm::SHA_256), keystore_name("/tmp/rsa_crypto_material.p12"), - keystore_password("cisco"), multiphase_produce_(false), rtc_(false), - production_rate_(std::string("2048kbps")), payload_size_(1400) {} + keystore_password("cisco"), + multiphase_produce_(false), + rtc_(false), + interactive_(false), + production_rate_(std::string("2048kbps")), + payload_size_(1400) {} Prefix name; bool virtual_producer; @@ -124,6 +139,7 @@ struct ServerConfiguration { std::string keystore_password; bool multiphase_produce_; bool rtc_; + bool interactive_; Rate production_rate_; std::size_t payload_size_; }; @@ -132,10 +148,12 @@ class HIperfClient { typedef std::chrono::time_point<std::chrono::steady_clock> Time; typedef std::chrono::microseconds TimeDuration; -public: + public: HIperfClient(const ClientConfiguration &conf) - : configuration_(conf), total_duration_milliseconds_(0), - old_bytes_value_(0), signals_(io_service_, SIGINT) {} + : configuration_(conf), + total_duration_milliseconds_(0), + old_bytes_value_(0), + signals_(io_service_, SIGINT) {} void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, const std::error_code &ec) { @@ -154,6 +172,11 @@ public: io_service_.stop(); } + void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred, + const std::error_code &ec) { + configuration_.receive_buffer->clear(); + } + bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { std::cout << "VERIFY CONTENT" << std::endl; @@ -298,11 +321,19 @@ public: return ERROR_SETUP; } - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HIperfClient::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + if (!configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback)std::bind( + &HIperfClient::processPayload, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + } else { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback)std::bind( + &HIperfClient::processPayloadRtc, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + } if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; @@ -346,7 +377,7 @@ public: return ERROR_SUCCESS; } -private: + private: ClientConfiguration configuration_; Time t_stats_; Time t_download_; @@ -360,13 +391,19 @@ private: class HIperfServer { const std::size_t log2_content_object_buffer_size = 8; -public: + public: HIperfServer(ServerConfiguration &conf) - : configuration_(conf), signals_(io_service_, SIGINT), + : configuration_(conf), + signals_(io_service_, SIGINT), rtc_timer_(io_service_), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), - mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) { + mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), +#ifndef _WIN32 + input_(io_service_, ::dup(STDIN_FILENO)), + rtc_running_(false) +#endif + { std::string buffer(configuration_.payload_size_, 'X'); std::cout << "Producing contents under name " << conf.name.getName() << std::endl; @@ -414,10 +451,9 @@ public: << std::endl; } - std::shared_ptr<utils::Identity> - setProducerIdentity(std::string &keystore_name, - std::string &keystore_password, - HashAlgorithm &hash_algorithm) { + std::shared_ptr<utils::Identity> setProducerIdentity( + std::string &keystore_name, std::string &keystore_password, + HashAlgorithm &hash_algorithm) { if (access(keystore_name.c_str(), F_OK) != -1) { return std::make_shared<utils::Identity>(keystore_name, keystore_password, hash_algorithm); @@ -527,6 +563,36 @@ public: } } +#ifndef _WIN32 + void handleInput(const std::error_code &error, std::size_t length) { + if (error) { + producer_socket_->stop(); + io_service_.stop(); + } + + if (rtc_running_) { + std::cout << "stop real time content production" << std::endl; + rtc_running_ = false; + rtc_timer_.cancel(); + } else { + std::cout << "start real time content production" << std::endl; + rtc_running_ = true; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + + input_buffer_.consume(length); // Remove newline from input. + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } +#endif + int run() { std::cerr << "Starting to serve consumers" << std::endl; @@ -537,12 +603,29 @@ public: }); if (configuration_.rtc_) { +#ifndef _WIN32 + if (configuration_.interactive_) { + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } else { + rtc_running_ = true; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } +#else rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( configuration_.payload_size_)); rtc_timer_.async_wait( std::bind(&HIperfServer::sendRTCContentObjectCallback, this, std::placeholders::_1)); +#endif } io_service_.run(); @@ -550,7 +633,7 @@ public: return ERROR_SUCCESS; } -private: + private: ServerConfiguration configuration_; asio::io_service io_service_; asio::signal_set signals_; @@ -559,6 +642,11 @@ private: std::uint16_t content_objects_index_; std::uint16_t mask_; std::unique_ptr<ProducerSocket> producer_socket_; +#ifndef _WIN32 + asio::posix::stream_descriptor input_; + asio::streambuf input_buffer_; + bool rtc_running_; +#endif }; void usage() { @@ -607,10 +695,16 @@ void usage() { << std::endl; std::cerr << " <download_size> without " "resetting the suffix to 0" - << std::endl; - std::cerr << "-B <bitrate> = bitrate for RTC " + << std::endl; + std::cerr << "-B <bitrate> = bitrate for RTC " "producer, to be used with the -R option" << std::endl; +#ifndef _WIN32 + std::cerr << "-I = interactive mode," + "start/stop real time content production " + "by pressing return. To be used with the -R option" + << std::endl; +#endif std::cerr << std::endl; std::cerr << "Client specific:" << std::endl; std::cerr << "-b <beta_parameter> = RAAQM beta parameter" @@ -657,146 +751,150 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:I")) != -1) { switch (opt) { - // Common - case 'D': { - daemon = true; - break; - } + // Common + case 'D': { + daemon = true; + break; + } + case 'I': { + server_configuration.interactive_ = true; + break; + } #else while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != -1) { switch (opt) { #endif - case 'f': { - log_file = optarg; - break; - } - case 'R': { - client_configuration.rtc_ = true; - server_configuration.rtc_ = true; - break; - } + case 'f': { + log_file = optarg; + break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } - // Server or Client - case 'S': { - role -= 1; - break; - } - case 'C': { - role += 1; - break; - } + // Server or Client + case 'S': { + role -= 1; + break; + } + case 'C': { + role += 1; + break; + } - // Client specifc - case 'b': { - client_configuration.beta = std::stod(optarg); - options = 1; - break; - } - case 'd': { - client_configuration.drop_factor = std::stod(optarg); - options = 1; - break; - } - case 'W': { - client_configuration.window = std::stod(optarg); - options = 1; - break; - } - case 'M': { - client_configuration.virtual_download = false; - options = 1; - break; - } - case 'c': { - client_configuration.producer_certificate = std::string(optarg); - options = 1; - break; - } - case 'v': { - client_configuration.verify = true; - options = 1; - break; - } - case 'i': { - client_configuration.report_interval_milliseconds_ = std::stoul(optarg); - options = 1; - break; - } + // Client specifc + case 'b': { + client_configuration.beta = std::stod(optarg); + options = 1; + break; + } + case 'd': { + client_configuration.drop_factor = std::stod(optarg); + options = 1; + break; + } + case 'W': { + client_configuration.window = std::stod(optarg); + options = 1; + break; + } + case 'M': { + client_configuration.virtual_download = false; + options = 1; + break; + } + case 'c': { + client_configuration.producer_certificate = std::string(optarg); + options = 1; + break; + } + case 'v': { + client_configuration.verify = true; + options = 1; + break; + } + case 'i': { + client_configuration.report_interval_milliseconds_ = std::stoul(optarg); + options = 1; + break; + } - // Server specific - case 'A': { - server_configuration.download_size = std::stoul(optarg); - options = -1; - break; - } - case 's': { - server_configuration.payload_size_ = std::stoul(optarg); - options = -1; - break; - } - case 'r': { - server_configuration.virtual_producer = false; - options = -1; - break; - } - case 'm': { - server_configuration.manifest = true; - options = -1; - break; - } - case 'l': { - server_configuration.live_production = true; - options = -1; - break; - } - case 'k': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } - case 'y': { - if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = HashAlgorithm::SHA_256; - } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = HashAlgorithm::SHA_512; - } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = HashAlgorithm::CRC32C; - } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; + // Server specific + case 'A': { + server_configuration.download_size = std::stoul(optarg); + options = -1; + break; } - options = -1; - break; - } - case 'p': { - server_configuration.keystore_password = std::string(optarg); - options = -1; - break; - } - case 'x': { - server_configuration.multiphase_produce_ = true; - options = -1; - break; - } - case 'B': { - auto str = std::string(optarg); - std::transform(str.begin(), str.end(), str.begin(), ::tolower); - std::cout << "---------------------------------------------------------" - "---------------------->" - << str << std::endl; - server_configuration.production_rate_ = str; - options = -1; - break; - } - case 'h': - default: - usage(); - return EXIT_FAILURE; + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { + server_configuration.virtual_producer = false; + options = -1; + break; + } + case 'm': { + server_configuration.manifest = true; + options = -1; + break; + } + case 'l': { + server_configuration.live_production = true; + options = -1; + break; + } + case 'k': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.sign = true; + options = -1; + break; + } + case 'y': { + if (strncasecmp(optarg, "sha256", 6) == 0) { + server_configuration.hash_algorithm = HashAlgorithm::SHA_256; + } else if (strncasecmp(optarg, "sha512", 6) == 0) { + server_configuration.hash_algorithm = HashAlgorithm::SHA_512; + } else if (strncasecmp(optarg, "crc32", 5) == 0) { + server_configuration.hash_algorithm = HashAlgorithm::CRC32C; + } else { + std::cerr << "Ignored unknown hash algorithm. Using SHA 256." + << std::endl; + } + options = -1; + break; + } + case 'p': { + server_configuration.keystore_password = std::string(optarg); + options = -1; + break; + } + case 'x': { + server_configuration.multiphase_produce_ = true; + options = -1; + break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + std::cout << "---------------------------------------------------------" + "---------------------->" + << str << std::endl; + server_configuration.production_rate_ = str; + options = -1; + break; + } + case 'h': + default: + usage(); + return EXIT_FAILURE; } } @@ -875,9 +973,9 @@ int main(int argc, char *argv[]) { return 0; } -} // end namespace interface +} // end namespace interface -} // end namespace transport +} // end namespace transport int main(int argc, char *argv[]) { return transport::interface::main(argc, argv); |