From 08233d44a6cfde878d7e10bca38ae935ed1c8fd5 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 30 Jun 2021 07:57:22 +0000 Subject: [HICN-713] Transport Library Major Refactoring 2 Co-authored-by: Luca Muscariello Co-authored-by: Michele Papalini Co-authored-by: Olivier Roques Co-authored-by: Giulio Grassi Signed-off-by: Mauro Sardara Change-Id: I5b2c667bad66feb45abdb5effe22ed0f6c85d1c2 --- utils/.clang-format | 14 - utils/CMakeLists.txt | 98 -- utils/cmake/Modules/Packaging.cmake | 28 - utils/src/hiperf.cc | 1683 ----------------------------------- utils/src/ping_client.cc | 449 ---------- utils/src/ping_server.cc | 340 ------- 6 files changed, 2612 deletions(-) delete mode 100644 utils/.clang-format delete mode 100644 utils/CMakeLists.txt delete mode 100644 utils/cmake/Modules/Packaging.cmake delete mode 100644 utils/src/hiperf.cc delete mode 100644 utils/src/ping_client.cc delete mode 100644 utils/src/ping_server.cc (limited to 'utils') diff --git a/utils/.clang-format b/utils/.clang-format deleted file mode 100644 index cd21e2017..000000000 --- a/utils/.clang-format +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (c) 2017-2021 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -BasedOnStyle: Google diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt deleted file mode 100644 index 953b46339..000000000 --- a/utils/CMakeLists.txt +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cmake_minimum_required(VERSION 3.5 FATAL_ERROR) -set(CMAKE_CXX_STANDARD 14) - -project(utils) - -if (WIN32) - set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /NODEFAULTLIB:\"LIBCMT\"" ) -endif() - -set(CMAKE_MODULE_PATH - ${CMAKE_MODULE_PATH} - "${CMAKE_CURRENT_SOURCE_DIR}/../cmake/Modules" - "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules" -) - -if (NOT CMAKE_BUILD_TYPE) - message(STATUS "${PROJECT_NAME}: No build type selected, default to Release") - set(CMAKE_BUILD_TYPE "Release") -endif () - -include(BuildMacros) -include(WindowsMacros) - -set(HICN_UTILS hicn-utils CACHE INTERNAL "" FORCE) - -if(CMAKE_SOURCE_DIR STREQUAL PROJECT_SOURCE_DIR) - find_package(Libtransport REQUIRED) -else() - if (DISABLE_SHARED_LIBRARIES) - set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_STATIC}) - set(DEPENDENCIES ${LIBTRANSPORT_STATIC}) - else () - set(LIBTRANSPORT_LIBRARIES ${LIBTRANSPORT_SHARED}) - set(DEPENDENCIES ${LIBTRANSPORT_SHARED}) - endif () - -endif() - -set(SUFFIX "") -if (${LIBTRANSPORT_LIBRARIES} MATCHES ".*-memif.*") - set(SUFFIX "-memif") - set(LINK_FLAGS "-Wl,-unresolved-symbols=ignore-in-shared-libs") -endif() - -set(HICN_UTILS "${HICN_UTILS}${SUFFIX}") - -set (COMPILER_DEFINITIONS "") - -if (WIN32) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4200 /wd4996") -endif () - -include(Packaging) - -if (NOT DISABLE_EXECUTABLES) - build_executable(hiperf - SOURCES src/hiperf.cc - LINK_LIBRARIES ${LIBTRANSPORT_LIBRARIES} ${WSOCK32_LIBRARY} ${WS2_32_LIBRARY} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} - DEPENDS ${DEPENDENCIES} - COMPONENT ${HICN_UTILS} - DEFINITIONS ${COMPILER_DEFINITIONS} - LINK_FLAGS ${LINK_FLAGS} - ) - - build_executable(hicn-ping-server - SOURCES src/ping_server.cc - LINK_LIBRARIES ${LIBTRANSPORT_LIBRARIES} ${WSOCK32_LIBRARY} ${WS2_32_LIBRARY} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} - DEPENDS ${DEPENDENCIES} - COMPONENT ${HICN_UTILS} - DEFINITIONS ${COMPILER_DEFINITIONS} - LINK_FLAGS ${LINK_FLAGS} - ) - - build_executable(hicn-ping-client - SOURCES src/ping_client.cc - LINK_LIBRARIES ${LIBTRANSPORT_LIBRARIES} ${WSOCK32_LIBRARY} ${WS2_32_LIBRARY} - INCLUDE_DIRS ${LIBTRANSPORT_INCLUDE_DIRS} - DEPENDS ${DEPENDENCIES} - COMPONENT ${HICN_UTILS} - DEFINITIONS ${COMPILER_DEFINITIONS} - LINK_FLAGS ${LINK_FLAGS} - ) -endif () diff --git a/utils/cmake/Modules/Packaging.cmake b/utils/cmake/Modules/Packaging.cmake deleted file mode 100644 index 783aa432a..000000000 --- a/utils/cmake/Modules/Packaging.cmake +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2017-2019 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set(${HICN_UTILS}_DESCRIPTION -"Hicn utils provide the hicn-ping and hiperf applications, \ -useful for testing and debugging within a hicn network." - CACHE STRING "Description for deb/rpm package." -) - -set(${HICN_UTILS}_DEB_DEPENDENCIES - "lib${LIBTRANSPORT} (>= stable_version)" - CACHE STRING "Dependencies for deb/rpm package." -) - -set(${HICN_UTILS}_RPM_DEPENDENCIES - "lib${LIBTRANSPORT} >= stable_version" - CACHE STRING "Dependencies for deb/rpm package." -) \ No newline at end of file diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc deleted file mode 100644 index 9a1cf6236..000000000 --- a/utils/src/hiperf.cc +++ /dev/null @@ -1,1683 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#ifndef _WIN32 -#include -#endif - -#include -#include -#include -#include -#include -#include - -#ifdef __linux__ -#ifndef __ANDROID__ -#include -#endif -#endif - -#ifdef _WIN32 -#include -#endif - -namespace transport { -namespace interface { - -#ifndef ERROR_SUCCESS -#define ERROR_SUCCESS 0 -#endif -#define ERROR_SETUP -5 -#define MIN_PROBE_SEQ 0xefffffff - -struct packet_t { - uint64_t timestamp; - uint32_t size; -}; - -inline uint64_t _ntohll(const uint64_t *input) { - uint64_t return_val; - uint8_t *tmp = (uint8_t *)&return_val; - - tmp[0] = (uint8_t)(*input >> 56); - tmp[1] = (uint8_t)(*input >> 48); - tmp[2] = (uint8_t)(*input >> 40); - tmp[3] = (uint8_t)(*input >> 32); - tmp[4] = (uint8_t)(*input >> 24); - tmp[5] = (uint8_t)(*input >> 16); - tmp[6] = (uint8_t)(*input >> 8); - tmp[7] = (uint8_t)(*input >> 0); - - return return_val; -} - -inline uint64_t _htonll(const uint64_t *input) { return (_ntohll(input)); } - -struct nack_packet_t { - uint64_t timestamp; - uint32_t prod_rate; - uint32_t prod_seg; - - inline uint64_t getTimestamp() const { return _ntohll(×tamp); } - inline void setTimestamp(uint64_t time) { timestamp = _htonll(&time); } - - inline uint32_t getProductionRate() const { return ntohl(prod_rate); } - inline void setProductionRate(uint32_t rate) { prod_rate = htonl(rate); } - - inline uint32_t getProductionSegement() const { return ntohl(prod_seg); } - inline void setProductionSegement(uint32_t seg) { prod_seg = htonl(seg); } -}; - -/** - * Container for command line configuration for hiperf client. - */ -struct ClientConfiguration { - ClientConfiguration() - : name("b001::abcd", 0), - beta(-1.f), - drop_factor(-1.f), - window(-1), - producer_certificate(""), - passphrase(""), - receive_buffer(nullptr), - receive_buffer_size_(128 * 1024), - download_size(0), - report_interval_milliseconds_(1000), - transport_protocol_(CBR), - rtc_(false), - test_mode_(false), - secure_(false), - producer_prefix_(), - interest_lifetime_(500) {} - - Name name; - double beta; - double drop_factor; - double window; - std::string producer_certificate; - std::string passphrase; - std::shared_ptr receive_buffer; - std::size_t receive_buffer_size_; - std::size_t download_size; - std::uint32_t report_interval_milliseconds_; - TransportProtocolAlgorithms transport_protocol_; - bool rtc_; - bool test_mode_; - bool secure_; - Prefix producer_prefix_; - uint32_t interest_lifetime_; -}; - -/** - * Class for handling the production rate for the RTC producer. - */ -class Rate { - public: - Rate() : rate_kbps_(0) {} - - Rate(const std::string &rate) { - std::size_t found = rate.find("kbps"); - if (found != std::string::npos) { - rate_kbps_ = std::stof(rate.substr(0, found)); - } else { - throw std::runtime_error("Format " + rate + " not correct"); - } - } - - Rate(const Rate &other) : rate_kbps_(other.rate_kbps_) {} - - Rate &operator=(const std::string &rate) { - std::size_t found = rate.find("kbps"); - if (found != std::string::npos) { - rate_kbps_ = std::stof(rate.substr(0, found)); - } else { - throw std::runtime_error("Format " + rate + " not correct"); - } - - return *this; - } - - std::chrono::microseconds getMicrosecondsForPacket(std::size_t packet_size) { - return std::chrono::microseconds( - (uint32_t)std::round(packet_size * 1000.0 * 8.0 / (double)rate_kbps_)); - } - - private: - float rate_kbps_; -}; - -/** - * Container for command line configuration for hiperf server. - */ -struct ServerConfiguration { - ServerConfiguration() - : name("b001::abcd/64"), - virtual_producer(true), - manifest(false), - live_production(false), - sign(false), - content_lifetime(600000000_U32), - download_size(20 * 1024 * 1024), - hash_algorithm(auth::CryptoHashType::SHA_256), - keystore_name(""), - passphrase(""), - keystore_password("cisco"), - multiphase_produce_(false), - rtc_(false), - interactive_(false), - trace_based_(false), - trace_index_(0), - trace_file_(nullptr), - production_rate_(std::string("2048kbps")), - payload_size_(1400), - secure_(false) {} - - Prefix name; - bool virtual_producer; - bool manifest; - bool live_production; - bool sign; - std::uint32_t content_lifetime; - std::uint32_t download_size; - auth::CryptoHashType hash_algorithm; - std::string keystore_name; - std::string passphrase; - std::string keystore_password; - bool multiphase_produce_; - bool rtc_; - bool interactive_; - bool trace_based_; - std::uint32_t trace_index_; - char *trace_file_; - Rate production_rate_; - std::size_t payload_size_; - bool secure_; - std::vector trace_; -}; - -/** - * Forward declaration of client Read callbacks. - */ -class RTCCallback; -class Callback; -class KeyCallback; - -/** - * Hiperf client class: configure and setup an hicn consumer following the - * ClientConfiguration. - */ -class HIperfClient { - typedef std::chrono::time_point Time; - typedef std::chrono::microseconds TimeDuration; - - friend class Callback; - friend class KeyCallback; - friend class RTCCallback; - - public: - HIperfClient(const ClientConfiguration &conf) - : configuration_(conf), - total_duration_milliseconds_(0), - old_bytes_value_(0), - old_interest_tx_value_(0), - old_fec_interest_tx_value_(0), - old_fec_data_rx_value_(0), - old_lost_data_value_(0), - old_bytes_recovered_value_(0), - old_retx_value_(0), - old_sent_int_value_(0), - old_received_nacks_value_(0), - avg_data_delay_(0), - delay_sample_(0), - received_bytes_(0), - received_data_pkt_(0), - signals_(io_service_), - expected_seg_(0), - lost_packets_(std::unordered_set()), - rtc_callback_(*this), - callback_(*this), - key_callback_(*this) {} - - ~HIperfClient() {} - - void checkReceivedRtcContent(ConsumerSocket &c, - const ContentObject &contentObject) { - if (!configuration_.test_mode_) return; - - uint32_t receivedSeg = contentObject.getName().getSuffix(); - auto payload = contentObject.getPayload(); - - if ((uint32_t)payload->length() == 16) { // 16 is the size of the NACK - struct nack_packet_t *nack_pkt = - (struct nack_packet_t *)contentObject.getPayload()->data(); - uint32_t productionSeg = nack_pkt->getProductionSegement(); - uint32_t productionRate = nack_pkt->getProductionRate(); - - // uint32_t *payloadPtr = (uint32_t *)payload->data(); - // uint32_t productionSeg = *(payloadPtr); - // uint32_t productionRate = *(++payloadPtr); - - if (productionRate == 0) { - std::cout << "[STOP] producer is not producing content" << std::endl; - return; - } - - if (receivedSeg < productionSeg) { - std::cout << "[OUT OF SYNCH] received NACK for " << receivedSeg - << ". Next expected packet " << productionSeg + 1 - << std::endl; - expected_seg_ = productionSeg; - } else if (receivedSeg > productionSeg && receivedSeg < MIN_PROBE_SEQ) { - std::cout << "[WINDOW TOO LARGE] received NACK for " << receivedSeg - << ". Next expected packet " << productionSeg << std::endl; - } else if (receivedSeg >= MIN_PROBE_SEQ) { - std::cout << "[PROBE] probe number = " << receivedSeg << std::endl; - } - return; - } - - received_bytes_ += (uint32_t)(payload->length() - 12); - received_data_pkt_++; - - // collecting delay stats. Just for performance testing - // XXX we should probably get the transport header (12) somewhere - uint64_t *senderTimeStamp = (uint64_t *)(payload->data() + 12); - uint64_t now = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - double new_delay = (double)(now - *senderTimeStamp); - if (*senderTimeStamp > now) - new_delay = -1 * (double)(*senderTimeStamp - now); - - delay_sample_++; - avg_data_delay_ = - avg_data_delay_ + (new_delay - avg_data_delay_) / delay_sample_; - - if (receivedSeg > expected_seg_ && expected_seg_ != 0) { - for (uint32_t i = expected_seg_; i < receivedSeg; i++) { - std::cout << "[LOSS] lost packet " << i << std::endl; - lost_packets_.insert(i); - } - expected_seg_ = receivedSeg + 1; - return; - } else if (receivedSeg < expected_seg_) { - auto it = lost_packets_.find(receivedSeg); - if (it != lost_packets_.end()) { - std::cout << "[RECOVER] recovered packet " << receivedSeg << std::endl; - lost_packets_.erase(it); - } else { - std::cout << "[OUT OF ORDER] recevied " << receivedSeg << " expedted " - << expected_seg_ << std::endl; - } - return; - } - expected_seg_ = receivedSeg + 1; - } - - void processLeavingInterest(ConsumerSocket &c, const Interest &interest) {} - - void handleTimerExpiration(ConsumerSocket &c, - const TransportStatistics &stats) { - const char separator = ' '; - const int width = 15; - - utils::TimePoint t2 = utils::SteadyClock::now(); - auto exact_duration = - std::chrono::duration_cast(t2 - t_stats_); - - std::stringstream interval; - interval << total_duration_milliseconds_ / 1000 << "-" - << total_duration_milliseconds_ / 1000 + - exact_duration.count() / 1000; - - std::stringstream bytes_transferred; - bytes_transferred << std::fixed << std::setprecision(3) - << (stats.getBytesRecv() - old_bytes_value_) / 1000000.0 - << std::setfill(separator) << "[MBytes]"; - - std::stringstream bandwidth; - bandwidth << ((stats.getBytesRecv() - old_bytes_value_) * 8) / - (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; - - std::stringstream window; - window << stats.getAverageWindowSize() << std::setfill(separator) - << "[Int]"; - - std::stringstream avg_rtt; - avg_rtt << stats.getAverageRtt() << std::setfill(separator) << "[ms]"; - - if (configuration_.rtc_) { - // we get rtc stats more often, thus we need ms in the interval - std::stringstream interval_ms; - interval_ms << total_duration_milliseconds_ << "-" - << total_duration_milliseconds_ + exact_duration.count(); - - std::stringstream lost_data; - lost_data << stats.getLostData() - old_lost_data_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream bytes_recovered_data; - bytes_recovered_data << stats.getBytesRecoveredData() - - old_bytes_recovered_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream data_delay; - data_delay << avg_data_delay_ << std::setfill(separator) << "[ms]"; - - std::stringstream received_data_pkt; - received_data_pkt << received_data_pkt_ << std::setfill(separator) - << "[pkt]"; - - std::stringstream goodput; - goodput << (received_bytes_ * 8.0) / (exact_duration.count()) / 1000.0 - << std::setfill(separator) << "[Mbps]"; - - std::stringstream loss_rate; - loss_rate << std::fixed << std::setprecision(2) - << stats.getLossRatio() * 100.0 << std::setfill(separator) - << "[%]"; - - std::stringstream retx_sent; - retx_sent << stats.getRetxCount() - old_retx_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream interest_sent; - interest_sent << stats.getInterestTx() - old_sent_int_value_ - << std::setfill(separator) << "[pkt]"; - - std::stringstream nacks; - nacks << stats.getReceivedNacks() - old_received_nacks_value_ - << std::setfill(separator) << "[pkt]"; - - // statistics not yet available in the transport - // std::stringstream interest_fec_tx; - // interest_fec_tx << stats.getInterestFecTxCount() - - // old_fec_interest_tx_value_ << std::setfill(separator) << "[pkt]"; - // std::stringstream bytes_fec_recv; - // bytes_fec_recv << stats.getBytesFecRecv() - old_fec_data_rx_value_ - // << std::setfill(separator) << "[bytes]"; - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "RecvData"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Goodput"; - std::cout << std::left << std::setw(width) << "LossRate"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "InterestSent"; - std::cout << std::left << std::setw(width) << "ReceivedNacks"; - std::cout << std::left << std::setw(width) << "SyncWnd"; - std::cout << std::left << std::setw(width) << "MinRtt"; - std::cout << std::left << std::setw(width) << "LostData"; - std::cout << std::left << std::setw(width) << "RecoveredData"; - std::cout << std::left << std::setw(width) << "State"; - std::cout << std::left << std::setw(width) << "DataDelay" << std::endl; - - std::cout << std::left << std::setw(width) << interval_ms.str(); - std::cout << std::left << std::setw(width) << received_data_pkt.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << goodput.str(); - std::cout << std::left << std::setw(width) << loss_rate.str(); - std::cout << std::left << std::setw(width) << retx_sent.str(); - std::cout << std::left << std::setw(width) << interest_sent.str(); - std::cout << std::left << std::setw(width) << nacks.str(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str(); - std::cout << std::left << std::setw(width) << lost_data.str(); - std::cout << std::left << std::setw(width) << bytes_recovered_data.str(); - std::cout << std::left << std::setw(width) << stats.getCCStatus(); - std::cout << std::left << std::setw(width) << data_delay.str(); - std::cout << std::endl; - - // statistics not yet available in the transport - // std::cout << std::left << std::setw(width) << interest_fec_tx.str(); - // std::cout << std::left << std::setw(width) << bytes_fec_recv.str(); - } else { - std::cout << std::left << std::setw(width) << "Interval"; - std::cout << std::left << std::setw(width) << "Transfer"; - std::cout << std::left << std::setw(width) << "Bandwidth"; - std::cout << std::left << std::setw(width) << "Retr"; - std::cout << std::left << std::setw(width) << "Cwnd"; - std::cout << std::left << std::setw(width) << "AvgRtt" << std::endl; - - std::cout << std::left << std::setw(width) << interval.str(); - std::cout << std::left << std::setw(width) << bytes_transferred.str(); - std::cout << std::left << std::setw(width) << bandwidth.str(); - std::cout << std::left << std::setw(width) << stats.getRetxCount(); - std::cout << std::left << std::setw(width) << window.str(); - std::cout << std::left << std::setw(width) << avg_rtt.str() << std::endl; - std::cout << std::endl; - } - total_duration_milliseconds_ += (uint32_t)exact_duration.count(); - old_bytes_value_ = stats.getBytesRecv(); - old_lost_data_value_ = stats.getLostData(); - old_bytes_recovered_value_ = stats.getBytesRecoveredData(); - old_fec_interest_tx_value_ = stats.getInterestFecTxCount(); - old_fec_data_rx_value_ = stats.getBytesFecRecv(); - old_retx_value_ = (uint32_t)stats.getRetxCount(); - old_sent_int_value_ = (uint32_t)stats.getInterestTx(); - old_received_nacks_value_ = stats.getReceivedNacks(); - delay_sample_ = 0; - avg_data_delay_ = 0; - received_bytes_ = 0; - received_data_pkt_ = 0; - - t_stats_ = utils::SteadyClock::now(); - } - - int setup() { - int ret; - - if (configuration_.rtc_) { - configuration_.transport_protocol_ = RTC; - } else if (configuration_.window < 0) { - configuration_.transport_protocol_ = RAAQM; - } else { - configuration_.transport_protocol_ = CBR; - } - - if (configuration_.secure_) { - consumer_socket_ = std::make_shared( - RAAQM, configuration_.transport_protocol_); - if (configuration_.producer_prefix_.getPrefixLength() == 0) { - std::cerr << "ERROR -- Missing producer prefix on which perform the " - "handshake." - << std::endl; - } else { - P2PSecureConsumerSocket &secure_consumer_socket = - *(static_cast(consumer_socket_.get())); - secure_consumer_socket.registerPrefix(configuration_.producer_prefix_); - } - } else { - consumer_socket_ = - std::make_shared(configuration_.transport_protocol_); - } - - consumer_socket_->setSocketOption( - GeneralTransportOptions::INTEREST_LIFETIME, - configuration_.interest_lifetime_); - -#if defined(DEBUG) && defined(__linux__) - std::shared_ptr portal; - consumer_socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - signals_ = - std::make_unique(portal->getIoService(), SIGUSR1); - signals_->async_wait([this](const std::error_code &, const int &) { - std::cout << "Signal SIGUSR1!" << std::endl; - mtrace(); - }); -#endif - - if (consumer_socket_->setSocketOption(CURRENT_WINDOW_SIZE, - configuration_.window) == - SOCKET_OPTION_NOT_SET) { - std::cerr << "ERROR -- Impossible to set the size of the window." - << std::endl; - return ERROR_SETUP; - } - - if (configuration_.transport_protocol_ == RAAQM && - configuration_.beta != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, - configuration_.beta) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - if (configuration_.transport_protocol_ == RAAQM && - configuration_.drop_factor != -1.f) { - if (consumer_socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, - configuration_.drop_factor) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - if (!configuration_.producer_certificate.empty()) { - std::shared_ptr verifier = - std::make_shared( - configuration_.producer_certificate); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; - } - - if (!configuration_.passphrase.empty()) { - std::shared_ptr verifier = - std::make_shared(configuration_.passphrase); - if (consumer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER, - verifier) == SOCKET_OPTION_NOT_SET) - return ERROR_SETUP; - } - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::INTEREST_OUTPUT, - (ConsumerInterestCallback)std::bind( - &HIperfClient::processLeavingInterest, this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (!configuration_.rtc_) { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &callback_); - } else { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::READ_CALLBACK, &rtc_callback_); - } - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (configuration_.rtc_) { - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, - (ConsumerContentObjectCallback)std::bind( - &HIperfClient::checkReceivedRtcContent, this, - std::placeholders::_1, std::placeholders::_2)); - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - if (configuration_.rtc_) { - std::shared_ptr transport_stats; - consumer_socket_->getSocketOption( - OtherOptions::STATISTICS, (TransportStatistics **)&transport_stats); - transport_stats->setAlpha(0.0); - } - - ret = consumer_socket_->setSocketOption( - ConsumerCallbacksOptions::STATS_SUMMARY, - (ConsumerTimerCallback)std::bind(&HIperfClient::handleTimerExpiration, - this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (consumer_socket_->setSocketOption( - GeneralTransportOptions::STATS_INTERVAL, - configuration_.report_interval_milliseconds_) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - consumer_socket_->connect(); - - return ERROR_SUCCESS; - } - - int run() { - std::cout << "Starting download of " << configuration_.name << std::endl; - - signals_.add(SIGINT); - signals_.async_wait( - [this](const std::error_code &, const int &) { io_service_.stop(); }); - - t_download_ = t_stats_ = std::chrono::steady_clock::now(); - consumer_socket_->asyncConsume(configuration_.name); - io_service_.run(); - - consumer_socket_->stop(); - - return ERROR_SUCCESS; - } - - private: - class RTCCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t mtu = 1500; - - public: - RTCCallback(HIperfClient &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = utils::MemBuf::create(mtu); - } - - bool isBufferMovable() noexcept override { return false; } - - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = mtu; - } - - void readDataAvailable(std::size_t length) noexcept override {} - - size_t maxBufferSize() const override { return mtu; } - - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error while reading from RTC socket" << std::endl; - client_.io_service_.stop(); - } - - void readSuccess(std::size_t total_size) noexcept override { - std::cout << "Data successfully read" << std::endl; - } - - private: - HIperfClient &client_; - }; - - class Callback : public ConsumerSocket::ReadCallback { - public: - Callback(HIperfClient &hiperf_client) : client_(hiperf_client) { - client_.configuration_.receive_buffer = - utils::MemBuf::create(client_.configuration_.receive_buffer_size_); - } - - bool isBufferMovable() noexcept override { return false; } - - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override { - *application_buffer = - client_.configuration_.receive_buffer->writableData(); - *max_length = client_.configuration_.receive_buffer_size_; - } - - void readDataAvailable(std::size_t length) noexcept override {} - - void readBufferAvailable( - std::unique_ptr &&buffer) noexcept override {} - - size_t maxBufferSize() const override { - return client_.configuration_.receive_buffer_size_; - } - - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error " << ec.message() << " while reading from socket" - << std::endl; - client_.io_service_.stop(); - } - - void readSuccess(std::size_t total_size) noexcept override { - Time t2 = std::chrono::steady_clock::now(); - TimeDuration dt = - std::chrono::duration_cast(t2 - client_.t_download_); - long usec = (long)dt.count(); - - std::cout << "Content retrieved. Size: " << total_size << " [Bytes]" - << std::endl; - - std::cerr << "Elapsed Time: " << usec / 1000000.0 << " seconds -- " - << (total_size * 8) * 1.0 / usec * 1.0 << " [Mbps]" - << std::endl; - - client_.io_service_.stop(); - } - - private: - HIperfClient &client_; - }; - - class KeyCallback : public ConsumerSocket::ReadCallback { - static constexpr std::size_t read_size = 16 * 1024; - - public: - KeyCallback(HIperfClient &hiperf_client) - : client_(hiperf_client), key_(nullptr) {} - - bool isBufferMovable() noexcept override { return true; } - - void getReadBuffer(uint8_t **application_buffer, - size_t *max_length) override {} - - void readDataAvailable(std::size_t length) noexcept override {} - - void readBufferAvailable( - std::unique_ptr &&buffer) noexcept override { - key_ = std::make_unique((const char *)buffer->data(), - buffer->length()); - std::cout << "Key: " << *key_ << std::endl; - } - - size_t maxBufferSize() const override { return read_size; } - - void readError(const std::error_code ec) noexcept override { - std::cerr << "Error " << ec.message() << " while reading from socket" - << std::endl; - client_.io_service_.stop(); - } - - bool validateKey() { return !key_->empty(); } - - void readSuccess(std::size_t total_size) noexcept override { - std::cout << "Key size: " << total_size << " bytes" << std::endl; - } - - void setConsumer(std::shared_ptr consumer_socket) { - consumer_socket_ = consumer_socket; - } - - private: - HIperfClient &client_; - std::unique_ptr key_; - std::shared_ptr consumer_socket_; - }; - - ClientConfiguration configuration_; - Time t_stats_; - Time t_download_; - uint32_t total_duration_milliseconds_; - uint64_t old_bytes_value_; - uint64_t old_interest_tx_value_; - uint64_t old_fec_interest_tx_value_; - uint64_t old_fec_data_rx_value_; - uint64_t old_lost_data_value_; - uint64_t old_bytes_recovered_value_; - uint32_t old_retx_value_; - uint32_t old_sent_int_value_; - uint32_t old_received_nacks_value_; - - // IMPORTANT: to be used only for performance testing, when consumer and - // producer are synchronized. Used for rtc only at the moment - double avg_data_delay_; - uint32_t delay_sample_; - - uint32_t received_bytes_; - uint32_t received_data_pkt_; - - asio::io_service io_service_; - asio::signal_set signals_; - uint32_t expected_seg_; - std::unordered_set lost_packets_; - RTCCallback rtc_callback_; - Callback callback_; - KeyCallback key_callback_; - std::shared_ptr consumer_socket_; -}; // namespace interface - -/** - * Hiperf server class: configure and setup an hicn producer following the - * ServerConfiguration. - */ -class HIperfServer { - const std::size_t log2_content_object_buffer_size = 8; - - public: - HIperfServer(ServerConfiguration &conf) - : configuration_(conf), - signals_(io_service_), - rtc_timer_(io_service_), - unsatisfied_interests_(), - content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)), - content_objects_index_(0), - mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), - last_segment_(0), -#ifndef _WIN32 - ptr_last_segment_(&last_segment_), - input_(io_service_), - rtc_running_(false), -#else - ptr_last_segment_(&last_segment_), -#endif - flow_name_(configuration_.name.getName()) { - std::string buffer(configuration_.payload_size_, 'X'); - std::cout << "Producing contents under name " << conf.name.getName() - << std::endl; -#ifndef _WIN32 - if (configuration_.interactive_) { - input_.assign(::dup(STDIN_FILENO)); - } -#endif - - for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = ContentObject::Ptr( - new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, - (const uint8_t *)buffer.data(), buffer.size())); - content_objects_[i]->setLifetime( - default_values::content_object_expiry_time); - } - } - - void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { - // std::cout << "Received interest " << interest.getName() << std::endl; - content_objects_[content_objects_index_ & mask_]->setName( - interest.getName()); - producer_socket_->produce( - *content_objects_[content_objects_index_++ & mask_]); - } - - void processInterest(ProducerSocket &p, const Interest &interest) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)VOID_HANDLER); - p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 5000000_U32); - - produceContent(p, interest.getName(), interest.getName().getSuffix()); - std::cout << "Received interest " << interest.getName().getSuffix() - << std::endl; - } - - void asyncProcessInterest(ProducerSocket &p, const Interest &interest) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind( - &HIperfServer::cacheMiss, this, std::placeholders::_1, - std::placeholders::_2)); - p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - 5000000_U32); - uint32_t suffix = interest.getName().getSuffix(); - - if (suffix == 0) { - last_segment_ = 0; - ptr_last_segment_ = &last_segment_; - unsatisfied_interests_.clear(); - } - - // The suffix will either be the one from the received interest or the - // smallest suffix of a previous interest not satisfed - if (!unsatisfied_interests_.empty()) { - auto it = - std::lower_bound(unsatisfied_interests_.begin(), - unsatisfied_interests_.end(), *ptr_last_segment_); - if (it != unsatisfied_interests_.end()) { - suffix = *it; - } - unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it); - } - - std::cout << "Received interest " << interest.getName().getSuffix() - << ", starting production at " << suffix << std::endl; - std::cout << unsatisfied_interests_.size() << " interests still unsatisfied" - << std::endl; - produceContentAsync(p, interest.getName(), suffix); - } - - void produceContent(ProducerSocket &p, const Name &content_name, - uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - uint32_t total; - - utils::TimePoint t0 = utils::SteadyClock::now(); - total = p.produceStream(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix); - utils::TimePoint t1 = utils::SteadyClock::now(); - - std::cout - << "Written " << total - << " data packets in output buffer (Segmentation time: " - << std::chrono::duration_cast(t1 - t0).count() - << " us)" << std::endl; - } - - void produceContentAsync(ProducerSocket &p, Name content_name, - uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - - p.asyncProduce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix, - &ptr_last_segment_); - } - - void cacheMiss(ProducerSocket &p, const Interest &interest) { - unsatisfied_interests_.push_back(interest.getName().getSuffix()); - } - - void onContentProduced(ProducerSocket &p, const std::error_code &err, - uint64_t bytes_written) { - p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind( - &HIperfServer::asyncProcessInterest, this, - std::placeholders::_1, std::placeholders::_2)); - } - - std::shared_ptr getProducerIdentity( - std::string &keystore_path, std::string &keystore_pwd, - auth::CryptoHashType &hash_type) { - if (access(keystore_path.c_str(), F_OK) != -1) { - return std::make_shared(keystore_path, keystore_pwd, - hash_type); - } - return std::make_shared(keystore_path, keystore_pwd, - auth::CryptoSuite::RSA_SHA256, 1024, - 365, "producer-test"); - } - - int setup() { - int ret; - int production_protocol; - - if (configuration_.secure_) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - producer_socket_ = std::make_unique( - configuration_.rtc_, identity); - } else { - if (!configuration_.rtc_) { - production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; - } else { - production_protocol = ProductionProtocolAlgorithms::RTC_PROD; - } - - producer_socket_ = std::make_unique(production_protocol); - } - - if (configuration_.sign) { - std::shared_ptr signer; - - if (!configuration_.passphrase.empty()) { - signer = std::make_shared( - auth::CryptoSuite::HMAC_SHA256, configuration_.passphrase); - } else if (!configuration_.keystore_name.empty()) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - signer = identity->getSigner(); - } else { - return ERROR_SETUP; - } - - if (producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - uint32_t rtc_header_size = 0; - if (configuration_.rtc_) rtc_header_size = 12; - producer_socket_->setSocketOption( - GeneralTransportOptions::DATA_PACKET_SIZE, - (uint32_t)( - configuration_.payload_size_ + rtc_header_size + - (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); - producer_socket_->registerPrefix(configuration_.name); - producer_socket_->connect(); - - if (configuration_.rtc_) { - std::cout << "Running RTC producer: the prefix length will be ignored." - " Use /128 by default in RTC mode" - << std::endl; - return ERROR_SUCCESS; - } - - if (!configuration_.virtual_producer) { - if (producer_socket_->setSocketOption( - GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME, - configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (producer_socket_->setSocketOption( - GeneralTransportOptions::MAKE_MANIFEST, - configuration_.manifest) == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (producer_socket_->setSocketOption( - GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) == - SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - if (!configuration_.live_production) { - produceContent(*producer_socket_, configuration_.name.getName(), 0); - } else { - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&HIperfServer::asyncProcessInterest, - this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - } else { - ret = producer_socket_->setSocketOption( - GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&HIperfServer::virtualProcessInterest, - this, std::placeholders::_1, - std::placeholders::_2)); - - if (ret == SOCKET_OPTION_NOT_SET) { - return ERROR_SETUP; - } - } - - ret = producer_socket_->setSocketOption( - ProducerCallbacksOptions::CONTENT_PRODUCED, - (ProducerContentCallback)bind( - &HIperfServer::onContentProduced, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); - - return ERROR_SUCCESS; - } - - void sendRTCContentObjectCallback(std::error_code ec) { - if (ec) return; - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait(std::bind(&HIperfServer::sendRTCContentObjectCallback, - this, std::placeholders::_1)); - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); - - // this is used to compute the data packet delay - // Used only for performance evaluation - // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - - producer_socket_->produceDatagram( - flow_name_, payload->data(), - payload->length() < 1400 ? payload->length() : 1400); - } - - void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { - if (ec) return; - - auto payload = - content_objects_[content_objects_index_++ & mask_]->getPayload(); - - uint32_t packet_len = - configuration_.trace_[configuration_.trace_index_].size; - - // this is used to compute the data packet delay - // used only for performance evaluation - // it requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - - if (packet_len > payload->length()) packet_len = (uint32_t)payload->length(); - if (packet_len > 1400) packet_len = 1400; - - producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); - - uint32_t next_index = configuration_.trace_index_ + 1; - uint64_t schedule_next; - if (next_index < configuration_.trace_.size()) { - schedule_next = - configuration_.trace_[next_index].timestamp - - configuration_.trace_[configuration_.trace_index_].timestamp; - } else { - // here we need to loop, schedule in a random time - schedule_next = 1000; - } - - configuration_.trace_index_ = - (configuration_.trace_index_ + 1) % configuration_.trace_.size(); - rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, this, - std::placeholders::_1)); - } - -#ifndef _WIN32 - void handleInput(const std::error_code &error, std::size_t length) { - if (error) { - producer_socket_->stop(); - io_service_.stop(); - } - - if (rtc_running_) { - std::cout << "stop real time content production" << std::endl; - rtc_running_ = false; - rtc_timer_.cancel(); - } else { - std::cout << "start real time content production" << std::endl; - rtc_running_ = true; - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallback, this, - std::placeholders::_1)); - } - - input_buffer_.consume(length); // Remove newline from input. - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); - } -#endif - - int parseTraceFile() { - std::ifstream trace(configuration_.trace_file_); - if (trace.fail()) { - return -1; - } - std::string line; - while (std::getline(trace, line)) { - std::istringstream iss(line); - struct packet_t packet; - iss >> packet.timestamp >> packet.size; - configuration_.trace_.push_back(packet); - } - return 0; - } - - int run() { - std::cerr << "Starting to serve consumers" << std::endl; - - signals_.add(SIGINT); - signals_.async_wait([this](const std::error_code &, const int &) { - std::cout << "STOPPING!!" << std::endl; - producer_socket_->stop(); - io_service_.stop(); - }); - - if (configuration_.rtc_) { -#ifndef _WIN32 - if (configuration_.interactive_) { - asio::async_read_until( - input_, input_buffer_, '\n', - std::bind(&HIperfServer::handleInput, this, std::placeholders::_1, - std::placeholders::_2)); - } else if (configuration_.trace_based_) { - std::cout << "trace-based mode enabled" << std::endl; - if (configuration_.trace_file_ == nullptr) { - std::cout << "cannot find the trace file" << std::endl; - return ERROR_SETUP; - } - if (parseTraceFile() < 0) { - std::cout << "cannot parse the trace file" << std::endl; - return ERROR_SETUP; - } - rtc_running_ = true; - rtc_timer_.expires_from_now(std::chrono::milliseconds(1)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallbackWithTrace, - this, std::placeholders::_1)); - } else { - rtc_running_ = true; - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallback, this, - std::placeholders::_1)); - } -#else - rtc_timer_.expires_from_now( - configuration_.production_rate_.getMicrosecondsForPacket( - configuration_.payload_size_)); - rtc_timer_.async_wait( - std::bind(&HIperfServer::sendRTCContentObjectCallback, this, - std::placeholders::_1)); -#endif - } - - io_service_.run(); - - return ERROR_SUCCESS; - } - - private: - ServerConfiguration configuration_; - asio::io_service io_service_; - asio::signal_set signals_; - asio::steady_timer rtc_timer_; - std::vector unsatisfied_interests_; - std::vector> content_objects_; - std::uint16_t content_objects_index_; - std::uint16_t mask_; - std::uint32_t last_segment_; - std::uint32_t *ptr_last_segment_; - std::unique_ptr producer_socket_; -#ifndef _WIN32 - asio::posix::stream_descriptor input_; - asio::streambuf input_buffer_; - bool rtc_running_; - -#endif - core::Name flow_name_; -}; // namespace interface - -void usage() { - std::cerr << "HIPERF - A tool for performing network throughput " - "measurements with hICN" - << std::endl; - std::cerr << "usage: hiperf [-S|-C] [options] [prefix|name]" << std::endl; - std::cerr << std::endl; - std::cerr << "SERVER OR CLIENT:" << std::endl; -#ifndef _WIN32 - std::cerr << "-D\t\t\t\t\t" - << "Run as a daemon" << std::endl; - std::cerr << "-R\t\t\t\t\t" - << "Run RTC protocol (client or server)" << std::endl; - std::cerr << "-f\t\t\t\t" - << "Log file" << std::endl; - std::cerr << "-z\t\t\t\t" - << "IO module to use. Default: hicnlight_module" << std::endl; -#endif - std::cerr << std::endl; - std::cerr << "SERVER SPECIFIC:" << std::endl; - std::cerr << "-A\t\t\t\t" - "Size of the content to publish. This " - "is not the size of the packet (see -s for it)." - << std::endl; - std::cerr << "-s\t\t\t\tSize of the payload of each data packet." - << std::endl; - std::cerr << "-r\t\t\t\t\t" - << "Produce real content of bytes" << std::endl; - std::cerr << "-m\t\t\t\t\t" - << "Produce transport manifest" << std::endl; - std::cerr << "-l\t\t\t\t\t" - << "Start producing content upon the reception of the " - "first interest" - << std::endl; - std::cerr << "-K\t\t\t\t" - << "Path of p12 file containing the " - "crypto material used for signing packets" - << std::endl; - std::cerr << "-k\t\t\t\t" - << "String from which a 128-bit symmetric key will be " - "derived for signing packets" - << std::endl; - std::cerr << "-y\t\t\t" - << "Use the selected hash algorithm for " - "calculating manifest digests" - << std::endl; - std::cerr << "-p\t\t\t\t" - << "Password for p12 keystore" << std::endl; - std::cerr << "-x\t\t\t\t\t" - << "Produce a content of , then after downloading " - "it produce a new content of" - << "\n\t\t\t\t\t without resetting " - "the suffix to 0." - << std::endl; - std::cerr << "-B\t\t\t\t" - << "Bitrate for RTC producer, to be used with the -R option." - << std::endl; -#ifndef _WIN32 - std::cerr << "-I\t\t\t\t\t" - "Interactive mode, start/stop real time content production " - "by pressing return. To be used with the -R option" - << std::endl; - std::cerr - << "-T\t\t\t\t" - "Trace based mode, hiperf takes as input a file with a trace. " - "Each line of the file indicates the timestamp and the size of " - "the packet to generate. To be used with the -R option. -B and -I " - "will be ignored." - << std::endl; - std::cerr << "-E\t\t\t\t\t" - << "Enable encrypted communication. Requires the path to a p12 " - "file containing the " - "crypto material used for the TLS handshake" - << std::endl; -#endif - std::cerr << std::endl; - std::cerr << "CLIENT SPECIFIC:" << std::endl; - std::cerr << "-b\t\t\t" - << "RAAQM beta parameter" << std::endl; - std::cerr << "-d\t\t\t" - << "RAAQM drop factor " - "parameter" - << std::endl; - std::cerr << "-L\t\t\t" - << "Set interest lifetime." << std::endl; - std::cerr << "-M\t\t\t" - << "Size of consumer input buffer. If 0, reassembly of packets " - "will be disabled." - << std::endl; - std::cerr << "-W\t\t\t\t" - << "Use a fixed congestion window " - "for retrieving the data." - << std::endl; - std::cerr << "-i\t\t\t" - << "Show the statistics every milliseconds." - << std::endl; - std::cerr << "-c\t\t\t" - << "Path of the producer certificate to be used for verifying the " - "origin of the packets received." - << std::endl; - std::cerr << "-k\t\t\t\t" - << "String from which is derived the symmetric key used by the " - "producer to sign packets and by the consumer to verify them." - << std::endl; - std::cerr << "-t\t\t\t\t\t" - "Test mode, check if the client is receiving the " - "correct data. This is an RTC specific option, to be " - "used with the -R (default false)" - << std::endl; - std::cerr << "-P\t\t\t\t\t" - << "Prefix of the producer where to do the handshake" << std::endl; -} - -int main(int argc, char *argv[]) { -#ifndef _WIN32 - // Common - bool daemon = false; -#else - WSADATA wsaData = {0}; - WSAStartup(MAKEWORD(2, 2), &wsaData); -#endif - - // -1 server, 0 undefined, 1 client - int role = 0; - int options = 0; - - char *log_file = nullptr; - interface::global_config::IoModuleConfiguration config; - std::string conf_file; - config.name = "hicnlight_module"; - - // Consumer - ClientConfiguration client_configuration; - - // Producer - ServerConfiguration server_configuration; - - int opt; -#ifndef _WIN32 - while ((opt = getopt( - argc, argv, - "DSCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xE:P:B:ItL:z:T:F:")) != -1) { - switch (opt) { - // Common - case 'D': { - daemon = true; - break; - } - case 'I': { - server_configuration.interactive_ = true; - server_configuration.trace_based_ = false; - break; - } - case 'T': { - server_configuration.interactive_ = false; - server_configuration.trace_based_ = true; - server_configuration.trace_file_ = optarg; - break; - } -#else - while ((opt = getopt(argc, argv, - "SCf:b:d:W:RM:c:vA:s:rmlK:k:y:p:hi:xB:E:P:tL:z:F:")) != - -1) { - switch (opt) { -#endif - case 'f': { - log_file = optarg; - break; - } - case 'R': { - client_configuration.rtc_ = true; - server_configuration.rtc_ = true; - break; - } - case 'z': { - config.name = optarg; - break; - } - case 'F': { - conf_file = optarg; - break; - } - - // Server or Client - case 'S': { - role -= 1; - break; - } - case 'C': { - role += 1; - break; - } - case 'k': { - server_configuration.passphrase = std::string(optarg); - client_configuration.passphrase = std::string(optarg); - server_configuration.sign = true; - break; - } - - // Client specifc - case 'b': { - client_configuration.beta = std::stod(optarg); - options = 1; - break; - } - case 'd': { - client_configuration.drop_factor = std::stod(optarg); - options = 1; - break; - } - case 'W': { - client_configuration.window = std::stod(optarg); - options = 1; - break; - } - case 'M': { - client_configuration.receive_buffer_size_ = std::stoull(optarg); - options = 1; - break; - } - case 'P': { - client_configuration.producer_prefix_ = Prefix(optarg); - client_configuration.secure_ = true; - break; - } - case 'c': { - client_configuration.producer_certificate = std::string(optarg); - options = 1; - break; - } - case 'i': { - client_configuration.report_interval_milliseconds_ = std::stoul(optarg); - options = 1; - break; - } - case 't': { - client_configuration.test_mode_ = true; - options = 1; - break; - } - case 'L': { - client_configuration.interest_lifetime_ = std::stoul(optarg); - options = 1; - break; - } - // Server specific - case 'A': { - server_configuration.download_size = std::stoul(optarg); - options = -1; - break; - } - case 's': { - server_configuration.payload_size_ = std::stoul(optarg); - options = -1; - break; - } - case 'r': { - server_configuration.virtual_producer = false; - options = -1; - break; - } - case 'm': { - server_configuration.manifest = true; - options = -1; - break; - } - case 'l': { - server_configuration.live_production = true; - options = -1; - break; - } - case 'K': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.sign = true; - options = -1; - break; - } - case 'y': { - if (strncasecmp(optarg, "sha256", 6) == 0) { - server_configuration.hash_algorithm = auth::CryptoHashType::SHA_256; - } else if (strncasecmp(optarg, "sha512", 6) == 0) { - server_configuration.hash_algorithm = auth::CryptoHashType::SHA_512; - } else if (strncasecmp(optarg, "crc32", 5) == 0) { - server_configuration.hash_algorithm = auth::CryptoHashType::CRC32C; - } else { - std::cerr << "Ignored unknown hash algorithm. Using SHA 256." - << std::endl; - } - options = -1; - break; - } - case 'p': { - server_configuration.keystore_password = std::string(optarg); - options = -1; - break; - } - case 'x': { - server_configuration.multiphase_produce_ = true; - options = -1; - break; - } - case 'B': { - auto str = std::string(optarg); - std::transform(str.begin(), str.end(), str.begin(), ::tolower); - server_configuration.production_rate_ = str; - options = -1; - break; - } - case 'E': { - server_configuration.keystore_name = std::string(optarg); - server_configuration.secure_ = true; - break; - } - case 'h': - default: - usage(); - return EXIT_FAILURE; - } - } - - if (options > 0 && role < 0) { - std::cerr << "Client options cannot be used when using the " - "software in server mode" - << std::endl; - usage(); - return EXIT_FAILURE; - } else if (options < 0 && role > 0) { - std::cerr << "Server options cannot be used when using the " - "software in client mode" - << std::endl; - usage(); - return EXIT_FAILURE; - } else if (!role) { - std::cerr << "Please specify if running hiperf as client " - "or server." - << std::endl; - usage(); - return EXIT_FAILURE; - } - - if (argv[optind] == 0) { - std::cerr << "Please specify the name/prefix to use." << std::endl; - usage(); - return EXIT_FAILURE; - } else { - if (role > 0) { - client_configuration.name = Name(argv[optind]); - } else { - server_configuration.name = Prefix(argv[optind]); - } - } - - if (log_file) { -#ifndef _WIN32 - int fd = open(log_file, O_WRONLY | O_APPEND | O_CREAT, S_IWUSR | S_IRUSR); - dup2(fd, STDOUT_FILENO); - dup2(STDOUT_FILENO, STDERR_FILENO); - close(fd); -#else - int fd = - _open(log_file, _O_WRONLY | _O_APPEND | _O_CREAT, _S_IWRITE | _S_IREAD); - _dup2(fd, _fileno(stdout)); - _dup2(_fileno(stdout), _fileno(stderr)); - _close(fd); -#endif - } - -#ifndef _WIN32 - if (daemon) { - utils::Daemonizator::daemonize(false); - } -#endif - - /** - * IO module configuration - */ - config.set(); - - // Parse config file - transport::interface::global_config::parseConfigurationFile(conf_file); - - if (role > 0) { - HIperfClient c(client_configuration); - if (c.setup() != ERROR_SETUP) { - c.run(); - } - } else if (role < 0) { - HIperfServer s(server_configuration); - if (s.setup() != ERROR_SETUP) { - s.run(); - } - } else { - usage(); - return EXIT_FAILURE; - } - -#ifdef _WIN32 - WSACleanup(); -#endif - - return 0; -} - -} // end namespace interface -} // end namespace transport - -int main(int argc, char *argv[]) { - return transport::interface::main(argc, argv); -} diff --git a/utils/src/ping_client.cc b/utils/src/ping_client.cc deleted file mode 100644 index e7a9228f2..000000000 --- a/utils/src/ping_client.cc +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include - -// Let's make the linker happy -#if !TRANSPORT_LOG_EXTERN_GLOBAL_OUTPUT_LEVEL -#ifndef TRANSPORT_LOG_DEFINE_GLOBAL_OUTPUT_LEVEL -TRANSPORT_LOG_DEFINE_GLOBAL_OUTPUT_LEVEL = 0; -#endif -#endif - -#include -#include -#include -#include - -#define SYN_STATE 1 -#define ACK_STATE 2 - -namespace transport { - -namespace core { - -namespace ping { - -typedef std::map SendTimeMap; -typedef auth::AsymmetricVerifier Verifier; - -class Configuration { - public: - uint64_t interestLifetime_; - uint64_t pingInterval_; - uint64_t maxPing_; - uint64_t first_suffix_; - std::string name_; - std::string certificate_; - uint16_t srcPort_; - uint16_t dstPort_; - bool verbose_; - bool dump_; - bool jump_; - bool open_; - bool always_syn_; - bool always_ack_; - bool quiet_; - uint32_t jump_freq_; - uint32_t jump_size_; - uint8_t ttl_; - - Configuration() { - interestLifetime_ = 500; // ms - pingInterval_ = 1000000; // us - maxPing_ = 10; // number of interests - first_suffix_ = 0; - name_ = "b001::1"; // string - srcPort_ = 9695; - dstPort_ = 8080; - verbose_ = false; - dump_ = false; - jump_ = false; - open_ = false; - always_syn_ = false; - always_ack_ = false; - quiet_ = false; - jump_freq_ = 0; - jump_size_ = 0; - ttl_ = 64; - } -}; - -class Client : interface::Portal::ConsumerCallback { - public: - Client(Configuration *c) - : portal_(), signals_(portal_.getIoService(), SIGINT) { - // Let the main thread to catch SIGINT - portal_.connect(); - portal_.setConsumerCallback(this); - - signals_.async_wait(std::bind(&Client::afterSignal, this)); - - timer_.reset(new asio::steady_timer(portal_.getIoService())); - config_ = c; - sequence_number_ = config_->first_suffix_; - last_jump_ = 0; - processed_ = 0; - state_ = SYN_STATE; - sent_ = 0; - received_ = 0; - timedout_ = 0; - if (!c->certificate_.empty()) { - verifier_.setCertificate(c->certificate_); - } - } - - virtual ~Client() {} - - void ping() { - std::cout << "start ping" << std::endl; - doPing(); - portal_.runEventsLoop(); - } - - void onContentObject(Interest &interest, ContentObject &object) override { - uint64_t rtt = 0; - - if (!config_->certificate_.empty()) { - auto t0 = std::chrono::steady_clock::now(); - if (verifier_.verifyPacket(&object)) { - auto t1 = std::chrono::steady_clock::now(); - auto dt = - std::chrono::duration_cast(t1 - t0); - std::cout << "Verification time: " << dt.count() << std::endl; - std::cout << "<<< Signature Ok." << std::endl; - } else { - std::cout << "<<< Signature verification failed!" << std::endl; - } - } - - auto it = send_timestamps_.find(interest.getName().getSuffix()); - if (it != send_timestamps_.end()) { - rtt = std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - - it->second; - send_timestamps_.erase(it); - } - - if (config_->verbose_) { - std::cout << "<<< recevied object. " << std::endl; - std::cout << "<<< interest name: " << interest.getName() - << " src port: " << interest.getSrcPort() - << " dst port: " << interest.getDstPort() - << " flags: " << interest.printFlags() << std::endl; - std::cout << "<<< object name: " << object.getName() - << " src port: " << object.getSrcPort() - << " dst port: " << object.getDstPort() - << " flags: " << object.printFlags() << " path label " - << object.getPathLabel() << " (" - << (object.getPathLabel() >> 24) << ")" - << " TTL: " << (int)object.getTTL() << std::endl; - } else if (!config_->quiet_) { - std::cout << "<<< received object. " << std::endl; - std::cout << "<<< round trip: " << rtt << " [us]" << std::endl; - std::cout << "<<< interest name: " << interest.getName() << std::endl; - std::cout << "<<< object name: " << object.getName() << std::endl; - std::cout << "<<< content object size: " - << object.payloadSize() + object.headerSize() << " [bytes]" - << std::endl; - } - - if (config_->dump_) { - std::cout << "----- interest dump -----" << std::endl; - interest.dump(); - std::cout << "-------------------------" << std::endl; - std::cout << "----- object dump -------" << std::endl; - object.dump(); - std::cout << "-------------------------" << std::endl; - } - - if (!config_->quiet_) std::cout << std::endl; - - if (!config_->always_syn_) { - if (object.testSyn() && object.testAck() && state_ == SYN_STATE) { - state_ = ACK_STATE; - } - } - - received_++; - processed_++; - if (processed_ >= config_->maxPing_) { - afterSignal(); - } - } - - void onTimeout(Interest::Ptr &&interest) override { - if (config_->verbose_) { - std::cout << "### timeout for " << interest->getName() - << " src port: " << interest->getSrcPort() - << " dst port: " << interest->getDstPort() - << " flags: " << interest->printFlags() << std::endl; - } else if (!config_->quiet_) { - std::cout << "### timeout for " << interest->getName() << std::endl; - } - - if (config_->dump_) { - std::cout << "----- interest dump -----" << std::endl; - interest->dump(); - std::cout << "-------------------------" << std::endl; - } - - if (!config_->quiet_) std::cout << std::endl; - - timedout_++; - processed_++; - if (processed_ >= config_->maxPing_) { - afterSignal(); - } - } - - void onError(std::error_code ec) override {} - - void doPing() { - const Name interest_name(config_->name_, (uint32_t)sequence_number_); - hicn_format_t format; - if (interest_name.getAddressFamily() == AF_INET) { - format = HF_INET_TCP; - } else { - format = HF_INET6_TCP; - } - - auto interest = std::make_shared(interest_name, format); - - interest->setLifetime(uint32_t(config_->interestLifetime_)); - interest->resetFlags(); - - if (config_->open_ || config_->always_syn_) { - if (state_ == SYN_STATE) { - interest->setSyn(); - } else if (state_ == ACK_STATE) { - interest->setAck(); - } - } else if (config_->always_ack_) { - interest->setAck(); - } - - interest->setSrcPort(config_->srcPort_); - interest->setDstPort(config_->dstPort_); - interest->setTTL(config_->ttl_); - - if (config_->verbose_) { - std::cout << ">>> send interest " << interest->getName() - << " src port: " << interest->getSrcPort() - << " dst port: " << interest->getDstPort() - << " flags: " << interest->printFlags() - << " TTL: " << (int)interest->getTTL() << std::endl; - } else if (!config_->quiet_) { - std::cout << ">>> send interest " << interest->getName() << std::endl; - } - - if (config_->dump_) { - std::cout << "----- interest dump -----" << std::endl; - interest->dump(); - std::cout << "-------------------------" << std::endl; - } - - if (!config_->quiet_) std::cout << std::endl; - - send_timestamps_[sequence_number_] = - std::chrono::duration_cast( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - portal_.sendInterest(std::move(interest)); - - sequence_number_++; - sent_++; - - if (sent_ < config_->maxPing_) { - this->timer_->expires_from_now( - std::chrono::microseconds(config_->pingInterval_)); - this->timer_->async_wait([this](const std::error_code e) { doPing(); }); - } - } - - void afterSignal() { - std::cout << "Stop ping" << std::endl; - std::cout << "Sent: " << sent_ << " Received: " << received_ - << " Timeouts: " << timedout_ << std::endl; - portal_.stopEventsLoop(); - } - - void reset() { - timer_.reset(new asio::steady_timer(portal_.getIoService())); - sequence_number_ = config_->first_suffix_; - last_jump_ = 0; - processed_ = 0; - state_ = SYN_STATE; - sent_ = 0; - received_ = 0; - timedout_ = 0; - } - - private: - SendTimeMap send_timestamps_; - interface::Portal portal_; - asio::signal_set signals_; - uint64_t sequence_number_; - uint64_t last_jump_; - uint64_t processed_; - uint32_t state_; - uint32_t sent_; - uint32_t received_; - uint32_t timedout_; - std::unique_ptr timer_; - Configuration *config_; - Verifier verifier_; -}; - -void help() { - std::cout << "usage: hicn-consumer-ping [options]" << std::endl; - std::cout << "PING options" << std::endl; - std::cout - << "-i ping interval in microseconds (default 1000000ms)" - << std::endl; - std::cout << "-m maximum number of pings to send (default 10)" - << std::endl; - std::cout << "-s sorce port (default 9695)" << std::endl; - std::cout << "-d destination port (default 8080)" << std::endl; - std::cout << "-t set packet ttl (default 64)" << std::endl; - std::cout << "-O open tcp connection (three way handshake) " - "(default false)" - << std::endl; - std::cout << "-S send always syn messages (default false)" - << std::endl; - std::cout << "-A send always ack messages (default false)" - << std::endl; - std::cout << "HICN options" << std::endl; - std::cout << "-n hicn name (default b001::1)" << std::endl; - std::cout - << "-l interest lifetime in milliseconds (default 500ms)" - << std::endl; - std::cout << "OUTPUT options" << std::endl; - std::cout << "-V verbose, prints statistics about the " - "messagges sent and received (default false)" - << std::endl; - std::cout << "-D dump, dumps sent and received packets " - "(default false)" - << std::endl; - std::cout << "-q quiet, not prints (default false)" - << std::endl; - std::cout << "-H prints this message" << std::endl; -} - -int main(int argc, char *argv[]) { -#ifdef _WIN32 - WSADATA wsaData = {0}; - WSAStartup(MAKEWORD(2, 2), &wsaData); -#endif - - Configuration *c = new Configuration(); - int opt; - std::string producer_certificate = ""; - - while ((opt = getopt(argc, argv, "j::t:i:m:s:d:n:l:f:c:SAOqVDH")) != -1) { - switch (opt) { - case 't': - c->ttl_ = (uint8_t)std::stoi(optarg); - break; - case 'i': - c->pingInterval_ = std::stoi(optarg); - break; - case 'm': - c->maxPing_ = std::stoi(optarg); - break; - case 'f': - c->first_suffix_ = std::stoul(optarg); - break; - case 's': - c->srcPort_ = std::stoi(optarg); - break; - case 'd': - c->dstPort_ = std::stoi(optarg); - break; - case 'n': - c->name_ = optarg; - break; - case 'l': - c->interestLifetime_ = std::stoi(optarg); - break; - case 'V': - c->verbose_ = true; - ; - break; - case 'D': - c->dump_ = true; - break; - case 'O': - c->always_syn_ = false; - c->always_ack_ = false; - c->open_ = true; - break; - case 'S': - c->always_syn_ = true; - c->always_ack_ = false; - c->open_ = false; - break; - case 'A': - c->always_syn_ = false; - c->always_ack_ = true; - c->open_ = false; - break; - case 'q': - c->quiet_ = true; - c->verbose_ = false; - c->dump_ = false; - break; - case 'c': - c->certificate_ = std::string(optarg); - break; - case 'H': - default: - help(); - exit(EXIT_FAILURE); - } - } - - auto ping = std::make_unique(c); - - auto t0 = std::chrono::steady_clock::now(); - ping->ping(); - auto t1 = std::chrono::steady_clock::now(); - - std::cout - << "Elapsed time: " - << std::chrono::duration_cast(t1 - t0).count() - << std::endl; - -#ifdef _WIN32 - WSACleanup(); -#endif - return 0; -} - -} // namespace ping - -} // namespace core - -} // namespace transport - -int main(int argc, char *argv[]) { - return transport::core::ping::main(argc, argv); -} diff --git a/utils/src/ping_server.cc b/utils/src/ping_server.cc deleted file mode 100644 index 79c662231..000000000 --- a/utils/src/ping_server.cc +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#ifndef _WIN32 -#include -#include -#else -#include -#endif - -#include -#include -#include -#include -#include - -#include - -namespace transport { - -namespace interface { - -using HashAlgorithm = core::HashAlgorithm; -using CryptoSuite = auth::CryptoSuite; - -auth::Identity setProducerIdentity(std::string keystore_name, - std::string keystore_password, - auth::CryptoHashType hash_algorithm) { - if (access(keystore_name.c_str(), F_OK) != -1) { - return auth::Identity(keystore_name, keystore_password, hash_algorithm); - } else { - return auth::Identity(keystore_name, keystore_password, - CryptoSuite::RSA_SHA256, 1024, 365, "producer-test"); - } -} - -class CallbackContainer { - const std::size_t log2_content_object_buffer_size = 12; - - public: - CallbackContainer(const Name &prefix, uint32_t object_size, bool verbose, - bool dump, bool quite, bool flags, bool reset, uint8_t ttl, - auth::Identity *identity, bool sign, uint32_t lifetime) - : buffer_(object_size, 'X'), - content_objects_((std::uint32_t)(1 << log2_content_object_buffer_size)), - mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), - content_objects_index_(0), - verbose_(verbose), - dump_(dump), - quite_(quite), - flags_(flags), - reset_(reset), - ttl_(ttl), - identity_(identity), - sign_(sign) { - core::Packet::Format format; - - if (prefix.getAddressFamily() == AF_INET) { - format = core::Packet::Format::HF_INET_TCP; - if (sign_) { - format = core::Packet::Format::HF_INET_TCP_AH; - } - } else { - format = core::Packet::Format::HF_INET6_TCP; - if (sign_) { - format = core::Packet::Format::HF_INET6_TCP_AH; - } - } - - for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = std::make_shared( - prefix, format, 0, (const uint8_t *)buffer_.data(), buffer_.size()); - content_objects_[i]->setLifetime(lifetime); - } - } - - void processInterest(ProducerSocket &p, const Interest &interest, - uint32_t lifetime) { - if (verbose_) { - std::cout << "<<< received interest " << interest.getName() - << " src port: " << interest.getSrcPort() - << " dst port: " << interest.getDstPort() - << " flags: " << interest.printFlags() - << "TTL: " << (int)interest.getTTL() << std::endl; - } else if (!quite_) { - std::cout << "<<< received interest " << interest.getName() << std::endl; - } - - if (dump_) { - std::cout << "----- interest dump -----" << std::endl; - interest.dump(); - std::cout << "-------------------------" << std::endl; - } - - if (interest.testRst()) { - std::cout << "!!!got a reset, I don't reply" << std::endl; - } else { - auto &content_object = content_objects_[content_objects_index_++ & mask_]; - - content_object->setName(interest.getName()); - content_object->setLifetime(lifetime); - content_object->setLocator(interest.getLocator()); - content_object->setSrcPort(interest.getDstPort()); - content_object->setDstPort(interest.getSrcPort()); - content_object->setTTL(ttl_); - - if (!sign_) { - content_object->resetFlags(); - } - - if (flags_) { - if (interest.testSyn()) { - content_object->setSyn(); - content_object->setAck(); - } else if (interest.testAck()) { - content_object->setAck(); - } // here I may need to handle the FIN flag; - } else if (reset_) { - content_object->setRst(); - } - - if (verbose_) { - std::cout << ">>> send object " << content_object->getName() - << " src port: " << content_object->getSrcPort() - << " dst port: " << content_object->getDstPort() - << " flags: " << content_object->printFlags() - << " TTL: " << (int)content_object->getTTL() << std::endl; - } else if (!quite_) { - std::cout << ">>> send object " << content_object->getName() - << std::endl; - } - - if (dump_) { - std::cout << "----- object dump -----" << std::endl; - content_object->dump(); - std::cout << "-----------------------" << std::endl; - } - - if (!quite_) std::cout << std::endl; - - if (sign_) { - identity_->getSigner()->signPacket(content_object.get()); - } - - p.produce(*content_object); - } - } - - private: - std::string buffer_; - std::vector> content_objects_; - std::uint16_t mask_; - std::uint16_t content_objects_index_; - bool verbose_; - bool dump_; - bool quite_; - bool flags_; - bool reset_; - uint8_t ttl_; - auth::Identity *identity_; - bool sign_; -}; - -void help() { - std::cout << "usage: hicn-preoducer-ping [options]" << std::endl; - std::cout << "PING options" << std::endl; - std::cout << "-s object content size (default 1350B)" << std::endl; - std::cout << "-n hicn name (default b001::/64)" << std::endl; - std::cout << "-f set tcp flags according to the flag received " - "(default false)" - << std::endl; - std::cout << "-l data lifetime" << std::endl; - std::cout << "-r always reply with a reset flag (default false)" - << std::endl; - std::cout << "-t set ttl (default 64)" << std::endl; - std::cout << "OUTPUT options" << std::endl; - std::cout << "-V verbose, prints statistics about the messagges sent " - "and received (default false)" - << std::endl; - std::cout << "-D dump, dumps sent and received packets (default false)" - << std::endl; - std::cout << "-q quite, not prints (default false)" << std::endl; -#ifndef _WIN32 - std::cout << "-d daemon mode" << std::endl; -#endif - std::cout << "-H prints this message" << std::endl; -} - -int main(int argc, char **argv) { -#ifdef _WIN32 - WSADATA wsaData = {0}; - WSAStartup(MAKEWORD(2, 2), &wsaData); -#else - bool daemon = false; -#endif - std::string name_prefix = "b001::0/64"; - std::string delimiter = "/"; - bool verbose = false; - bool dump = false; - bool quite = false; - bool flags = false; - bool reset = false; - uint32_t object_size = 1250; - uint8_t ttl = 64; - std::string keystore_path = "./rsa_crypto_material.p12"; - std::string keystore_password = "cisco"; - bool sign = false; - uint32_t data_lifetime = default_values::content_object_expiry_time; - - int opt; -#ifndef _WIN32 - while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDdHk:p:")) != -1) { -#else - while ((opt = getopt(argc, argv, "s:n:t:l:qfrVDHk:p:")) != -1) { -#endif - switch (opt) { - case 's': - object_size = std::stoi(optarg); - break; - case 'n': - name_prefix = optarg; - break; - case 't': - ttl = (uint8_t)std::stoi(optarg); - break; - case 'l': - data_lifetime = std::stoi(optarg); - break; - case 'V': - verbose = true; - break; - case 'D': - dump = true; - break; - case 'q': - verbose = false; - dump = false; - quite = true; - break; -#ifndef _WIN32 - case 'd': - daemon = true; - break; -#endif - case 'f': - flags = true; - break; - case 'r': - reset = true; - break; - case 'k': - keystore_path = optarg; - sign = true; - break; - case 'p': - keystore_password = optarg; - break; - case 'H': - default: - help(); - exit(EXIT_FAILURE); - } - } - -#ifndef _WIN32 - if (daemon) { - utils::Daemonizator::daemonize(); - } -#endif - - core::Prefix producer_namespace(name_prefix); - - utils::StringTokenizer tokenizer(name_prefix, delimiter); - std::string ip_address = tokenizer.nextToken(); - Name n(ip_address); - - if (object_size > 1350) object_size = 1350; - - CallbackContainer *stubs; - auth::Identity identity = setProducerIdentity( - keystore_path, keystore_password, auth::CryptoHashType::SHA_256); - - if (sign) { - stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags, - reset, ttl, &identity, sign, data_lifetime); - } else { - auth::Identity *identity = nullptr; - stubs = new CallbackContainer(n, object_size, verbose, dump, quite, flags, - reset, ttl, identity, sign, data_lifetime); - } - - ProducerSocket p; - p.registerPrefix(producer_namespace); - - p.setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U); - p.setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - (ProducerInterestCallback)bind(&CallbackContainer::processInterest, stubs, - std::placeholders::_1, - std::placeholders::_2, data_lifetime)); - - p.connect(); - - asio::io_service io_service; - asio::signal_set signal_set(io_service, SIGINT); - signal_set.async_wait( - [&p, &io_service](const std::error_code &, const int &) { - std::cout << "STOPPING!!" << std::endl; - p.stop(); - io_service.stop(); - }); - - io_service.run(); - -#ifdef _WIN32 - WSACleanup(); -#endif - return 0; -} - -} // namespace interface - -} // end namespace transport - -int main(int argc, char **argv) { - return transport::interface::main(argc, argv); -} -- cgit 1.2.3-korg