From 69a7a13dab43f774c8624ecd51994ca5ef322cc6 Mon Sep 17 00:00:00 2001 From: michele papalini Date: Tue, 2 Apr 2019 17:41:45 +0200 Subject: [HICN-159] RTC traffic in hiperf Change-Id: I6a6d791e97446aab03ea463046d24f98444e9a1c Signed-off-by: michele papalini --- utils/src/hiperf.cc | 418 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 258 insertions(+), 160 deletions(-) diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc index 34cb79b3f..425b91bca 100644 --- a/utils/src/hiperf.cc +++ b/utils/src/hiperf.cc @@ -46,11 +46,17 @@ using Identity = utils::Identity; struct ClientConfiguration { ClientConfiguration() - : name("b001::abcd", 0), verify(false), beta(-1.f), drop_factor(-1.f), - window(-1), virtual_download(true), + : name("b001::abcd", 0), + verify(false), + beta(-1.f), + drop_factor(-1.f), + window(-1), + virtual_download(true), producer_certificate("/tmp/rsa_certificate.pem"), receive_buffer(std::make_shared>()), - download_size(0), report_interval_milliseconds_(1000), rtc_(false) {} + download_size(0), + report_interval_milliseconds_(1000), + rtc_(false) {} Name name; bool verify; @@ -67,7 +73,7 @@ struct ClientConfiguration { }; class Rate { -public: + public: Rate() : rate_kbps_(0) {} Rate(const std::string &rate) { @@ -97,19 +103,28 @@ public: packet_size * long(std::round(1000.0 * 8.0 / rate_kbps_))); } -private: + private: float rate_kbps_; }; struct ServerConfiguration { ServerConfiguration() - : name("b001::abcd/64"), virtual_producer(true), manifest(false), - live_production(false), sign(false), content_lifetime(600000000_U32), - content_object_size(1440), download_size(20 * 1024 * 1024), + : name("b001::abcd/64"), + virtual_producer(true), + manifest(false), + live_production(false), + sign(false), + content_lifetime(600000000_U32), + content_object_size(1440), + download_size(20 * 1024 * 1024), hash_algorithm(HashAlgorithm::SHA_256), keystore_name("/tmp/rsa_crypto_material.p12"), - keystore_password("cisco"), multiphase_produce_(false), rtc_(false), - production_rate_(std::string("2048kbps")), payload_size_(1400) {} + keystore_password("cisco"), + multiphase_produce_(false), + rtc_(false), + interactive_(false), + production_rate_(std::string("2048kbps")), + payload_size_(1400) {} Prefix name; bool virtual_producer; @@ -124,6 +139,7 @@ struct ServerConfiguration { std::string keystore_password; bool multiphase_produce_; bool rtc_; + bool interactive_; Rate production_rate_; std::size_t payload_size_; }; @@ -132,10 +148,12 @@ class HIperfClient { typedef std::chrono::time_point Time; typedef std::chrono::microseconds TimeDuration; -public: + public: HIperfClient(const ClientConfiguration &conf) - : configuration_(conf), total_duration_milliseconds_(0), - old_bytes_value_(0), signals_(io_service_, SIGINT) {} + : configuration_(conf), + total_duration_milliseconds_(0), + old_bytes_value_(0), + signals_(io_service_, SIGINT) {} void processPayload(ConsumerSocket &c, std::size_t bytes_transferred, const std::error_code &ec) { @@ -154,6 +172,11 @@ public: io_service_.stop(); } + void processPayloadRtc(ConsumerSocket &c, std::size_t bytes_transferred, + const std::error_code &ec) { + configuration_.receive_buffer->clear(); + } + bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { if (contentObject.getPayloadType() == PayloadType::CONTENT_OBJECT) { std::cout << "VERIFY CONTENT" << std::endl; @@ -298,11 +321,19 @@ public: return ERROR_SETUP; } - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_RETRIEVED, - (ConsumerContentCallback)std::bind( - &HIperfClient::processPayload, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + if (!configuration_.rtc_) { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback)std::bind( + &HIperfClient::processPayload, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + } else { + ret = consumer_socket_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback)std::bind( + &HIperfClient::processPayloadRtc, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + } if (ret == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; @@ -346,7 +377,7 @@ public: return ERROR_SUCCESS; } -private: + private: ClientConfiguration configuration_; Time t_stats_; Time t_download_; @@ -360,13 +391,19 @@ private: class HIperfServer { const std::size_t log2_content_object_buffer_size = 8; -public: + public: HIperfServer(ServerConfiguration &conf) - : configuration_(conf), signals_(io_service_, SIGINT), + : configuration_(conf), + signals_(io_service_, SIGINT), rtc_timer_(io_service_), content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), content_objects_index_(0), - mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1) { + mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), +#ifndef _WIN32 + input_(io_service_, ::dup(STDIN_FILENO)), + rtc_running_(false) +#endif + { std::string buffer(configuration_.payload_size_, 'X'); std::cout << "Producing contents under name " << conf.name.getName() << std::endl; @@ -414,10 +451,9 @@ public: << std::endl; } - std::shared_ptr - setProducerIdentity(std::string &keystore_name, - std::string &keystore_password, - HashAlgorithm &hash_algorithm) { + std::shared_ptr setProducerIdentity( + std::string &keystore_name, std::string &keystore_password, + HashAlgorithm &hash_algorithm) { if (access(keystore_name.c_str(), F_OK) != -1) { return std::make_shared(keystore_name, keystore_password, hash_algorithm); @@ -527,6 +563,36 @@ public: } } +#ifndef _WIN32 + void handleInput(const std::error_code &error, std::size_t length) { + if (error) { + producer_socket_->stop(); + io_service_.stop(); + } + + if (rtc_running_) { + std::cout << "stop real time content production" << std::endl; + rtc_running_ = false; + rtc_timer_.cancel(); + } else { + std::cout << "start real time content production" << std::endl; + rtc_running_ = true; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } + + input_buffer_.consume(length); // Remove newline from input. + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } +#endif + int run() { std::cerr << "Starting to serve consumers" << std::endl; @@ -537,12 +603,29 @@ public: }); if (configuration_.rtc_) { +#ifndef _WIN32 + if (configuration_.interactive_) { + asio::async_read_until( + input_, input_buffer_, '\n', + std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, + std::placeholders::_2)); + } else { + rtc_running_ = true; + rtc_timer_.expires_from_now( + configuration_.production_rate_.getMicrosecondsForPacket( + configuration_.payload_size_)); + rtc_timer_.async_wait( + std::bind(&HIperfServer::sendRTCContentObjectCallback, this, + std::placeholders::_1)); + } +#else rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( configuration_.payload_size_)); rtc_timer_.async_wait( std::bind(&HIperfServer::sendRTCContentObjectCallback, this, std::placeholders::_1)); +#endif } io_service_.run(); @@ -550,7 +633,7 @@ public: return ERROR_SUCCESS; } -private: + private: ServerConfiguration configuration_; asio::io_service io_service_; asio::signal_set signals_; @@ -559,6 +642,11 @@ private: std::uint16_t content_objects_index_; std::uint16_t mask_; std::unique_ptr producer_socket_; +#ifndef _WIN32 + asio::posix::stream_descriptor input_; + asio::streambuf input_buffer_; + bool rtc_running_; +#endif }; void usage() { @@ -607,10 +695,16 @@ void usage() { << std::endl; std::cerr << " without " "resetting the suffix to 0" - << std::endl; - std::cerr << "-B = bitrate for RTC " + << std::endl; + std::cerr << "-B = bitrate for RTC " "producer, to be used with the -R option" << std::endl; +#ifndef _WIN32 + std::cerr << "-I = interactive mode," + "start/stop real time content production " + "by pressing return. To be used with the -R option" + << std::endl; +#endif std::cerr << std::endl; std::cerr << "Client specific:" << std::endl; std::cerr << "-b = RAAQM beta parameter" @@ -657,146 +751,150 @@ int main(int argc, char *argv[]) { int opt; #ifndef _WIN32 - while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != + while ((opt = getopt(argc, argv, "DSCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:I")) != -1) { switch (opt) { - // Common - case 'D': { - daemon = true; - break; - } + // Common + case 'D': { + daemon = true; + break; + } + case 'I': { + server_configuration.interactive_ = true; + break; + } #else while ((opt = getopt(argc, argv, "SCf:b:d:W:RMc:vA:s:rmlk:y:p:hi:xB:")) != -1) { switch (opt) { #endif - case 'f': { - log_file = optarg; - break; - } - case 'R': { - client_configuration.rtc_ = true; - server_configuration.rtc_ = true; - break; - } + case 'f': { + log_file = optarg; + break; + } + case 'R': { + client_configuration.rtc_ = true; + server_configuration.rtc_ = true; + break; + } - // Server or Client - case 'S': { - role -= 1; - break; - } - case 'C': { - role += 1; - break; - } + // Server or Client + case 'S': { + role -= 1; + break; + } + case 'C': { + role += 1; + break; + } - // Client specifc - case 'b': { - client_configuration.beta = std::stod(optarg); - options = 1; - break; - } - case 'd': { - client_configuration.drop_factor = std::stod(optarg); - options = 1; - break; - } - case 'W': { - client_configuration.window = std::stod(optarg); - options = 1; - break; - } - case 'M': { - client_configuration.virtual_download = false; - options = 1; - break; - } - case 'c': { - client_configuration.producer_certificate = std::string(optarg); - options = 1; - break; - } - case 'v': { - client_configuration.verify = true; - options = 1; - break; - } - case 'i': { - client_configuration.report_interval_milliseconds_ = std::stoul(optarg); - options = 1; - break; - } + // Client specifc + case 'b': { + client_configuration.beta = std::stod(optarg); + options = 1; + break; + } + case 'd': { + client_configuration.drop_factor = std::stod(optarg); + options = 1; + break; + } + case 'W': { + client_configuration.window = std::stod(optarg); + options = 1; + break; + } + case 'M': { + client_configuration.virtual_download = false; + options = 1; + break; + } + case 'c': { + client_configuration.producer_certificate = std::string(optarg); + options = 1; + break; + } + case 'v': { + client_configuration.verify = true; + options = 1; + break; + } + case 'i': { + client_configuration.report_interval_milliseconds_ = std::stoul(optarg); + options = 1; + break; + } - // Server specific - case 'A': { - server_configuration.download_size = std::stoul(optarg); - options = -1; - break; - } - case 's': { - server_configuration.payload_size_ = std::stoul(optarg); - options = -1; - break; - } - case 'r': { - server_configuration.virtual_producer = false; - options = -1; - break; - } - case 'm': { - server_configuration.manifest = true; - options = -1; - break; - } - case 'l': { - server_configuration.live_production = true; - options = -1; - break; - } - case 'k': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } - case 'y': { - if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = HashAlgorithm::SHA_256; - } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = HashAlgorithm::SHA_512; - } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = HashAlgorithm::CRC32C; - } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; + // Server specific + case 'A': { + server_configuration.download_size = std::stoul(optarg); + options = -1; + break; } - options = -1; - break; - } - case 'p': { - server_configuration.keystore_password = std::string(optarg); - options = -1; - break; - } - case 'x': { - server_configuration.multiphase_produce_ = true; - options = -1; - break; - } - case 'B': { - auto str = std::string(optarg); - std::transform(str.begin(), str.end(), str.begin(), ::tolower); - std::cout << "---------------------------------------------------------" - "---------------------->" - << str << std::endl; - server_configuration.production_rate_ = str; - options = -1; - break; - } - case 'h': - default: - usage(); - return EXIT_FAILURE; + case 's': { + server_configuration.payload_size_ = std::stoul(optarg); + options = -1; + break; + } + case 'r': { + server_configuration.virtual_producer = false; + options = -1; + break; + } + case 'm': { + server_configuration.manifest = true; + options = -1; + break; + } + case 'l': { + server_configuration.live_production = true; + options = -1; + break; + } + case 'k': { + server_configuration.keystore_name = std::string(optarg); + server_configuration.sign = true; + options = -1; + break; + } + case 'y': { + if (strncasecmp(optarg, "sha256", 6) == 0) { + server_configuration.hash_algorithm = HashAlgorithm::SHA_256; + } else if (strncasecmp(optarg, "sha512", 6) == 0) { + server_configuration.hash_algorithm = HashAlgorithm::SHA_512; + } else if (strncasecmp(optarg, "crc32", 5) == 0) { + server_configuration.hash_algorithm = HashAlgorithm::CRC32C; + } else { + std::cerr << "Ignored unknown hash algorithm. Using SHA 256." + << std::endl; + } + options = -1; + break; + } + case 'p': { + server_configuration.keystore_password = std::string(optarg); + options = -1; + break; + } + case 'x': { + server_configuration.multiphase_produce_ = true; + options = -1; + break; + } + case 'B': { + auto str = std::string(optarg); + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + std::cout << "---------------------------------------------------------" + "---------------------->" + << str << std::endl; + server_configuration.production_rate_ = str; + options = -1; + break; + } + case 'h': + default: + usage(); + return EXIT_FAILURE; } } @@ -875,9 +973,9 @@ int main(int argc, char *argv[]) { return 0; } -} // end namespace interface +} // end namespace interface -} // end namespace transport +} // end namespace transport int main(int argc, char *argv[]) { return transport::interface::main(argc, argv); -- cgit 1.2.3-korg