diff options
Diffstat (limited to 'apps/hiperf/src/server.cc')
-rw-r--r-- | apps/hiperf/src/server.cc | 209 |
1 files changed, 131 insertions, 78 deletions
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc index 968d42e2c..7101e7a4a 100644 --- a/apps/hiperf/src/server.cc +++ b/apps/hiperf/src/server.cc @@ -21,7 +21,7 @@ namespace hiperf { * Hiperf server class: configure and setup an hicn producer following the * ServerConfiguration. */ -class HIperfServer::Impl { +class HIperfServer::Impl : public ProducerSocket::Callback { const std::size_t log2_content_object_buffer_size = 8; public: @@ -35,11 +35,8 @@ class HIperfServer::Impl { mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1), last_segment_(0), #ifndef _WIN32 - ptr_last_segment_(&last_segment_), input_(io_service_), rtc_running_(false), -#else - ptr_last_segment_(&last_segment_), #endif flow_name_(configuration_.name.getName()), socket_(io_service_), @@ -54,14 +51,16 @@ class HIperfServer::Impl { #endif for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) { - content_objects_[i] = ContentObject::Ptr( - new ContentObject(conf.name.getName(), HF_INET6_TCP, 0, - (const uint8_t *)buffer.data(), buffer.size())); + content_objects_[i] = ContentObject::Ptr(new ContentObject( + configuration_.name.getName(), configuration_.packet_format_, 0, + (const uint8_t *)buffer.data(), buffer.size())); content_objects_[i]->setLifetime( default_values::content_object_expiry_time); } } + virtual ~Impl() {} + void virtualProcessInterest(ProducerSocket &p, const Interest &interest) { content_objects_[content_objects_index_ & mask_]->setName( interest.getName()); @@ -91,16 +90,14 @@ class HIperfServer::Impl { if (suffix == 0) { last_segment_ = 0; - ptr_last_segment_ = &last_segment_; unsatisfied_interests_.clear(); } - // The suffix will either be the one from the received interest or the - // smallest suffix of a previous interest not satisfed + // The suffix will either come from the received interest or will be set to + // the smallest suffix of a previous interest not satisfied if (!unsatisfied_interests_.empty()) { - auto it = - std::lower_bound(unsatisfied_interests_.begin(), - unsatisfied_interests_.end(), *ptr_last_segment_); + auto it = std::lower_bound(unsatisfied_interests_.begin(), + unsatisfied_interests_.end(), last_segment_); if (it != unsatisfied_interests_.end()) { suffix = *it; } @@ -121,27 +118,28 @@ class HIperfServer::Impl { b->append(configuration_.download_size); uint32_t total; - utils::TimePoint t0 = utils::SteadyClock::now(); + utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now(); total = p.produceStream(content_name, std::move(b), !configuration_.multiphase_produce_, suffix); - utils::TimePoint t1 = utils::SteadyClock::now(); + utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now(); - std::cout - << "Written " << total - << " data packets in output buffer (Segmentation time: " - << std::chrono::duration_cast<utils::Microseconds>(t1 - t0).count() - << " us)" << std::endl; + std::cout << "Written " << total + << " data packets in output buffer (Segmentation time: " + << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)" + << std::endl; } void produceContentAsync(ProducerSocket &p, Name content_name, uint32_t suffix) { - auto b = utils::MemBuf::create(configuration_.download_size); - std::memset(b->writableData(), '?', configuration_.download_size); - b->append(configuration_.download_size); - - p.asyncProduce(content_name, std::move(b), - !configuration_.multiphase_produce_, suffix, - &ptr_last_segment_); + produce_thread_.add([this, suffix, content_name]() { + auto b = utils::MemBuf::create(configuration_.download_size); + std::memset(b->writableData(), '?', configuration_.download_size); + b->append(configuration_.download_size); + + last_segment_ = suffix + producer_socket_->produceStream( + content_name, std::move(b), + !configuration_.multiphase_produce_, suffix); + }); } void cacheMiss(ProducerSocket &p, const Interest &interest) { @@ -156,27 +154,22 @@ class HIperfServer::Impl { std::placeholders::_1, std::placeholders::_2)); } - std::shared_ptr<Identity> getProducerIdentity(std::string &keystore_path, - std::string &keystore_pwd, - CryptoHashType &hash_type) { - if (access(keystore_path.c_str(), F_OK) != -1) { - return std::make_shared<Identity>(keystore_path, keystore_pwd, hash_type); - } - return std::make_shared<Identity>(keystore_path, keystore_pwd, - CryptoSuite::RSA_SHA256, 1024, 365, - "producer-test"); + void produceError(const std::error_code &err) noexcept override { + std::cerr << "Error from producer transport: " << err.message() + << std::endl; + producer_socket_->stop(); + io_service_.stop(); } int setup() { int ret; int production_protocol; + std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>(); if (configuration_.secure_) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); producer_socket_ = std::make_unique<P2PSecureProducerSocket>( - configuration_.rtc_, identity); + configuration_.rtc_, configuration_.keystore_name, + configuration_.keystore_password); } else { if (!configuration_.rtc_) { production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM; @@ -188,36 +181,79 @@ class HIperfServer::Impl { } if (producer_socket_->setSocketOption( + ProducerCallbacksOptions::PRODUCER_CALLBACK, this) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "Failed to set producer callback." << std::endl; + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption( GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) == SOCKET_OPTION_NOT_SET) { return ERROR_SETUP; } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::HASH_ALGORITHM, + configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + + if (producer_socket_->setSocketOption(PACKET_FORMAT, + configuration_.packet_format_) == + SOCKET_OPTION_NOT_SET) { + std::cerr << "ERROR -- Impossible to set the packet format." << std::endl; + return ERROR_SETUP; + } + if (!configuration_.passphrase.empty()) { - std::shared_ptr<Signer> signer = std::make_shared<SymmetricSigner>( - CryptoSuite::HMAC_SHA256, configuration_.passphrase); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); + signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256, + configuration_.passphrase); } if (!configuration_.keystore_name.empty()) { - auto identity = getProducerIdentity(configuration_.keystore_name, - configuration_.keystore_password, - configuration_.hash_algorithm); - std::shared_ptr<Signer> signer = identity->getSigner(); - producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, - signer); + signer = std::make_shared<AsymmetricSigner>( + configuration_.keystore_name, configuration_.keystore_password); + } + + producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer); + + // Compute maximum payload size + Packet::Format format = PayloadSize::getFormatFromName( + configuration_.name.getName(), !configuration_.manifest); + payload_size_max_ = PayloadSize(format).getPayloadSizeMax( + configuration_.rtc_ ? RTC_HEADER_SIZE : 0, + configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE, + !configuration_.manifest ? signer->getSignatureFieldSize() : 0); + + if (configuration_.payload_size_ > payload_size_max_) { + std::cerr << "WARNING: Payload has size " << configuration_.payload_size_ + << ", maximum is " << payload_size_max_ + << ". Payload will be truncated to fit." << std::endl; + } + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption( + RtcTransportOptions::AGGREGATED_DATA, + configuration_.aggregated_data_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + } + + if (configuration_.rtc_) { + ret = producer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE, + configuration_.fec_type_); + + if (ret == SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } } - uint32_t rtc_header_size = 0; - if (configuration_.rtc_) rtc_header_size = 12; - producer_socket_->setSocketOption( - GeneralTransportOptions::DATA_PACKET_SIZE, - (uint32_t)( - configuration_.payload_size_ + rtc_header_size + - (configuration_.name.getAddressFamily() == AF_INET ? 40 : 60))); producer_socket_->registerPrefix(configuration_.name); producer_socket_->connect(); + producer_socket_->start(); if (configuration_.rtc_) { std::cout << "Running RTC producer: the prefix length will be ignored." @@ -239,6 +275,13 @@ class HIperfServer::Impl { return ERROR_SETUP; } + if (producer_socket_->setSocketOption( + GeneralTransportOptions::MAX_SEGMENT_SIZE, + static_cast<uint32_t>(configuration_.payload_size_)) == + SOCKET_OPTION_NOT_SET) { + return ERROR_SETUP; + } + if (!configuration_.live_production) { produceContent(*producer_socket_, configuration_.name.getName(), 0); } else { @@ -283,7 +326,7 @@ class HIperfServer::Impl { void receiveStream() { socket_.async_receive_from( asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_, - [this](std::error_code ec, std::size_t length) { + [this](const std::error_code &ec, std::size_t length) { if (ec) return; sendRTCContentFromStream(recv_buffer_.first, length); receiveStream(); @@ -296,9 +339,8 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SystemTime::nowMs().count(); + uint8_t *start = (uint8_t *)payload->writableData(); std::memcpy(start, &now, sizeof(uint64_t)); std::memcpy(start + sizeof(uint64_t), buff, len); @@ -306,7 +348,7 @@ class HIperfServer::Impl { len + sizeof(uint64_t)); } - void sendRTCContentObjectCallback(std::error_code ec) { + void sendRTCContentObjectCallback(const std::error_code &ec) { if (ec) return; rtc_timer_.expires_from_now( configuration_.production_rate_.getMicrosecondsForPacket( @@ -319,18 +361,17 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // Used only for performance evaluation // It requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); + uint64_t now = utils::SystemTime::nowMs().count(); std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); - producer_socket_->produceDatagram( - flow_name_, payload->data(), - payload->length() < 1400 ? payload->length() : 1400); + producer_socket_->produceDatagram(flow_name_, payload->data(), + payload->length() < payload_size_max_ + ? payload->length() + : payload_size_max_); } - void sendRTCContentObjectCallbackWithTrace(std::error_code ec) { + void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) { if (ec) return; auto payload = @@ -342,14 +383,11 @@ class HIperfServer::Impl { // this is used to compute the data packet delay // used only for performance evaluation // it requires clock synchronization between producer and consumer - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - + uint64_t now = utils::SystemTime::nowMs().count(); std::memcpy(payload->writableData(), &now, sizeof(uint64_t)); if (packet_len > payload->length()) packet_len = payload->length(); - if (packet_len > 1400) packet_len = 1400; + if (packet_len > payload_size_max_) packet_len = payload_size_max_; producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len); @@ -450,13 +488,13 @@ class HIperfServer::Impl { std::placeholders::_1)); } else if (configuration_.input_stream_mode_) { rtc_running_ = true; - // crate socket + // create socket remote_ = asio::ip::udp::endpoint( asio::ip::address::from_string("127.0.0.1"), configuration_.port_); socket_.open(asio::ip::udp::v4()); socket_.bind(remote_); - recv_buffer_.first = (uint8_t *)malloc(1500); - recv_buffer_.second = 1500; + recv_buffer_.first = (uint8_t *)malloc(HIPERF_MTU); + recv_buffer_.second = HIPERF_MTU; receiveStream(); } else { rtc_running_ = true; @@ -490,8 +528,9 @@ class HIperfServer::Impl { std::uint16_t content_objects_index_; std::uint16_t mask_; std::uint32_t last_segment_; - std::uint32_t *ptr_last_segment_; std::unique_ptr<ProducerSocket> producer_socket_; + ::utils::EventThread produce_thread_; + std::size_t payload_size_max_; #ifndef _WIN32 asio::posix::stream_descriptor input_; asio::streambuf input_buffer_; @@ -507,6 +546,20 @@ HIperfServer::HIperfServer(const ServerConfiguration &conf) { impl_ = new Impl(conf); } +HIperfServer::HIperfServer(HIperfServer &&other) { + impl_ = other.impl_; + other.impl_ = nullptr; +} + +HIperfServer &HIperfServer::operator=(HIperfServer &&other) { + if (this != &other) { + impl_ = other.impl_; + other.impl_ = nullptr; + } + + return *this; +} + HIperfServer::~HIperfServer() { delete impl_; } int HIperfServer::setup() { return impl_->setup(); } |