aboutsummaryrefslogtreecommitdiffstats
path: root/apps/hiperf/src/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'apps/hiperf/src/server.cc')
-rw-r--r--apps/hiperf/src/server.cc816
1 files changed, 457 insertions, 359 deletions
diff --git a/apps/hiperf/src/server.cc b/apps/hiperf/src/server.cc
index 7101e7a4a..afaf5423b 100644
--- a/apps/hiperf/src/server.cc
+++ b/apps/hiperf/src/server.cc
@@ -21,156 +21,79 @@ namespace hiperf {
* Hiperf server class: configure and setup an hicn producer following the
* ServerConfiguration.
*/
-class HIperfServer::Impl : public ProducerSocket::Callback {
- const std::size_t log2_content_object_buffer_size = 8;
-
- public:
- Impl(const hiperf::ServerConfiguration &conf)
- : configuration_(conf),
- signals_(io_service_),
- rtc_timer_(io_service_),
- unsatisfied_interests_(),
- content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
- content_objects_index_(0),
- mask_((std::uint16_t)(1 << log2_content_object_buffer_size) - 1),
- last_segment_(0),
-#ifndef _WIN32
- input_(io_service_),
- rtc_running_(false),
-#endif
- flow_name_(configuration_.name.getName()),
- socket_(io_service_),
- recv_buffer_(nullptr, 0) {
- std::string buffer(configuration_.payload_size_, 'X');
- std::cout << "Producing contents under name " << conf.name.getName()
- << std::endl;
-#ifndef _WIN32
- if (configuration_.interactive_) {
- input_.assign(::dup(STDIN_FILENO));
- }
-#endif
-
- for (int i = 0; i < (1 << log2_content_object_buffer_size); i++) {
- content_objects_[i] = ContentObject::Ptr(new ContentObject(
- configuration_.name.getName(), configuration_.packet_format_, 0,
- (const uint8_t *)buffer.data(), buffer.size()));
- content_objects_[i]->setLifetime(
- default_values::content_object_expiry_time);
- }
+class HIperfServer::Impl {
+ static inline constexpr std::size_t klog2_content_object_buffer_size() {
+ return 8;
}
-
- virtual ~Impl() {}
-
- void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
- content_objects_[content_objects_index_ & mask_]->setName(
- interest.getName());
- producer_socket_->produce(
- *content_objects_[content_objects_index_++ & mask_]);
+ static inline constexpr std::size_t kcontent_object_buffer_size() {
+ return (1 << klog2_content_object_buffer_size());
}
-
- void processInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)VOID_HANDLER);
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
-
- produceContent(p, interest.getName(), interest.getName().getSuffix());
- std::cout << "Received interest " << interest.getName().getSuffix()
- << std::endl;
+ static inline constexpr std::size_t kmask() {
+ return (kcontent_object_buffer_size() - 1);
}
- void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::cacheMiss, this,
- std::placeholders::_1,
- std::placeholders::_2));
- p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- 5000000_U32);
- uint32_t suffix = interest.getName().getSuffix();
-
- if (suffix == 0) {
- last_segment_ = 0;
- unsatisfied_interests_.clear();
- }
-
- // The suffix will either come from the received interest or will be set to
- // the smallest suffix of a previous interest not satisfied
- if (!unsatisfied_interests_.empty()) {
- auto it = std::lower_bound(unsatisfied_interests_.begin(),
- unsatisfied_interests_.end(), last_segment_);
- if (it != unsatisfied_interests_.end()) {
- suffix = *it;
+ /**
+ * @brief As we can (potentially) setup many producer sockets, we need to keep
+ * a separate context for each one of them. The context contains parameters
+ * and variable that are specific to a single producer socket.
+ */
+ class ProducerContext
+ : public Base<ProducerContext, ServerConfiguration, Impl>,
+ public ProducerSocket::Callback {
+ public:
+ using ConfType = ServerConfiguration;
+ using ParentType = typename HIperfServer::Impl;
+ static inline const auto getContextType() { return "ProducerContext"; }
+
+ ProducerContext(HIperfServer::Impl &server, int producer_identifier)
+ : Base(server, server.io_service_, producer_identifier) {
+ // Allocate buffer to copy as content objects payload
+ std::string buffer(configuration_.payload_size_, 'X');
+
+ // Allocate array of content objects. They are share_ptr so that the
+ // transport will only capture a reference to them instead of performing
+ // an hard copy.
+ for (std::size_t i = 0; i < kcontent_object_buffer_size(); i++) {
+ const auto &element =
+ content_objects_.emplace_back(std::make_shared<ContentObject>(
+ configuration_.name_.makeName(), configuration_.packet_format_,
+ 0, (const uint8_t *)buffer.data(), buffer.size()));
+ element->setLifetime(default_values::content_object_expiry_time);
}
- unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
- std::cout << "Received interest " << interest.getName().getSuffix()
- << ", starting production at " << suffix << std::endl;
- std::cout << unsatisfied_interests_.size() << " interests still unsatisfied"
- << std::endl;
- produceContentAsync(p, interest.getName(), suffix);
- }
+ // To make vector happy (move or copy constructor is needed when vector
+ // resizes)
+ ProducerContext(ProducerContext &&other) noexcept
+ : Base(std::move(other)),
+ content_objects_(std::move(other.content_objects_)),
+ unsatisfied_interests_(std::move(other.unsatisfied_interests_)),
+ last_segment_(other.last_segment_),
+ producer_socket_(std::move(other.producer_socket_)),
+ content_objects_index_(other.content_objects_index_),
+ payload_size_max_(other.payload_size_max_) {}
- void produceContent(ProducerSocket &p, const Name &content_name,
- uint32_t suffix) {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
- uint32_t total;
-
- utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
- total = p.produceStream(content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
- utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
-
- std::cout << "Written " << total
- << " data packets in output buffer (Segmentation time: "
- << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
- << std::endl;
- }
+ virtual ~ProducerContext() = default;
- void produceContentAsync(ProducerSocket &p, Name content_name,
- uint32_t suffix) {
- produce_thread_.add([this, suffix, content_name]() {
- auto b = utils::MemBuf::create(configuration_.download_size);
- std::memset(b->writableData(), '?', configuration_.download_size);
- b->append(configuration_.download_size);
-
- last_segment_ = suffix + producer_socket_->produceStream(
- content_name, std::move(b),
- !configuration_.multiphase_produce_, suffix);
- });
- }
+ /**
+ * @brief Produce datagram
+ */
+ void produceDatagram(const uint8_t *buffer, std::size_t buffer_size) const {
+ assert(producer_socket_);
- void cacheMiss(ProducerSocket &p, const Interest &interest) {
- unsatisfied_interests_.push_back(interest.getName().getSuffix());
- }
+ auto size = std::min(buffer_size, payload_size_max_);
- void onContentProduced(ProducerSocket &p, const std::error_code &err,
- uint64_t bytes_written) {
- p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(
- &Impl::asyncProcessInterest, this,
- std::placeholders::_1, std::placeholders::_2));
- }
+ producer_socket_->produceDatagram(flow_name_, buffer, size);
+ }
- void produceError(const std::error_code &err) noexcept override {
- std::cerr << "Error from producer transport: " << err.message()
- << std::endl;
- producer_socket_->stop();
- io_service_.stop();
- }
+ /**
+ * @brief Create and setup the producer socket
+ */
+ int setup() {
+ int ret;
+ int production_protocol;
+ std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>();
- int setup() {
- int ret;
- int production_protocol;
- std::shared_ptr<Signer> signer = std::make_shared<VoidSigner>();
-
- if (configuration_.secure_) {
- producer_socket_ = std::make_unique<P2PSecureProducerSocket>(
- configuration_.rtc_, configuration_.keystore_name,
- configuration_.keystore_password);
- } else {
if (!configuration_.rtc_) {
production_protocol = ProductionProtocolAlgorithms::BYTE_STREAM;
} else {
@@ -178,255 +101,466 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
}
producer_socket_ = std::make_unique<ProducerSocket>(production_protocol);
- }
-
- if (producer_socket_->setSocketOption(
- ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "Failed to set producer callback." << std::endl;
- return ERROR_SETUP;
- }
-
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::MAKE_MANIFEST, configuration_.manifest) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::HASH_ALGORITHM,
- configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
-
- if (producer_socket_->setSocketOption(PACKET_FORMAT,
- configuration_.packet_format_) ==
- SOCKET_OPTION_NOT_SET) {
- std::cerr << "ERROR -- Impossible to set the packet format." << std::endl;
- return ERROR_SETUP;
- }
+ if (producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::PRODUCER_CALLBACK, this) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "Failed to set producer callback." << std::endl;
+ return ERROR_SETUP;
+ }
- if (!configuration_.passphrase.empty()) {
- signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256,
- configuration_.passphrase);
- }
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::HASH_ALGORITHM,
+ configuration_.hash_algorithm_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- if (!configuration_.keystore_name.empty()) {
- signer = std::make_shared<AsymmetricSigner>(
- configuration_.keystore_name, configuration_.keystore_password);
- }
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MANIFEST_MAX_CAPACITY,
+ configuration_.manifest_max_capacity_) == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER, signer);
+ if (producer_socket_->setSocketOption(PACKET_FORMAT,
+ configuration_.packet_format_) ==
+ SOCKET_OPTION_NOT_SET) {
+ getOutputStream() << "ERROR -- Impossible to set the packet format."
+ << std::endl;
+ return ERROR_SETUP;
+ }
- // Compute maximum payload size
- Packet::Format format = PayloadSize::getFormatFromName(
- configuration_.name.getName(), !configuration_.manifest);
- payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
- configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
- configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
- !configuration_.manifest ? signer->getSignatureFieldSize() : 0);
+ if (!configuration_.passphrase_.empty()) {
+ signer = std::make_shared<SymmetricSigner>(CryptoSuite::HMAC_SHA256,
+ configuration_.passphrase_);
+ }
- if (configuration_.payload_size_ > payload_size_max_) {
- std::cerr << "WARNING: Payload has size " << configuration_.payload_size_
- << ", maximum is " << payload_size_max_
- << ". Payload will be truncated to fit." << std::endl;
- }
+ if (!configuration_.keystore_name_.empty()) {
+ signer = std::make_shared<AsymmetricSigner>(
+ configuration_.keystore_name_, configuration_.keystore_password_);
+ }
- if (configuration_.rtc_) {
- ret = producer_socket_->setSocketOption(
- RtcTransportOptions::AGGREGATED_DATA,
- configuration_.aggregated_data_);
+ producer_socket_->setSocketOption(GeneralTransportOptions::SIGNER,
+ signer);
+
+ // Compute maximum payload size
+ Packet::Format format = PayloadSize::getFormatFromPrefix(
+ configuration_.name_, !configuration_.manifest_max_capacity_);
+ payload_size_max_ = PayloadSize(format).getPayloadSizeMax(
+ configuration_.rtc_ ? RTC_HEADER_SIZE : 0,
+ configuration_.fec_type_.empty() ? 0 : FEC_HEADER_MAX_SIZE,
+ !configuration_.manifest_max_capacity_
+ ? signer->getSignatureFieldSize()
+ : 0);
+
+ if (configuration_.payload_size_ > payload_size_max_) {
+ getOutputStream() << "WARNING: Payload has size "
+ << configuration_.payload_size_ << ", maximum is "
+ << payload_size_max_
+ << ". Payload will be truncated to fit." << std::endl;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ // Verifier for aggregated interests
+ std::shared_ptr<Verifier> verifier = std::make_shared<VoidVerifier>();
+ if (!configuration_.aggr_interest_passphrase_.empty()) {
+ verifier = std::make_unique<SymmetricVerifier>(
+ configuration_.aggr_interest_passphrase_);
}
- }
+ ret = producer_socket_->setSocketOption(GeneralTransportOptions::VERIFIER,
+ verifier);
+ if (ret == SOCKET_OPTION_NOT_SET) return ERROR_SETUP;
- if (configuration_.rtc_) {
- ret = producer_socket_->setSocketOption(GeneralTransportOptions::FEC_TYPE,
- configuration_.fec_type_);
+ if (configuration_.rtc_) {
+ ret = producer_socket_->setSocketOption(
+ RtcTransportOptions::AGGREGATED_DATA,
+ configuration_.aggregated_data_);
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
- producer_socket_->registerPrefix(configuration_.name);
- producer_socket_->connect();
- producer_socket_->start();
+ ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::FEC_TYPE, configuration_.fec_type_);
- if (configuration_.rtc_) {
- std::cout << "Running RTC producer: the prefix length will be ignored."
- " Use /128 by default in RTC mode"
- << std::endl;
- return ERROR_SUCCESS;
- }
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
- if (!configuration_.virtual_producer) {
if (producer_socket_->setSocketOption(
GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
- configuration_.content_lifetime) == SOCKET_OPTION_NOT_SET) {
+ configuration_.content_lifetime_) == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
- }
+ producer_socket_->registerPrefix(Prefix(flow_name_, 128));
+ producer_socket_->connect();
+ producer_socket_->start();
- if (producer_socket_->setSocketOption(
- GeneralTransportOptions::MAX_SEGMENT_SIZE,
- static_cast<uint32_t>(configuration_.payload_size_)) ==
- SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ if (configuration_.rtc_) {
+ return ERROR_SUCCESS;
}
- if (!configuration_.live_production) {
- produceContent(*producer_socket_, configuration_.name.getName(), 0);
+ if (!configuration_.virtual_producer_) {
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 200000U) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (producer_socket_->setSocketOption(
+ GeneralTransportOptions::MAX_SEGMENT_SIZE,
+ static_cast<uint32_t>(configuration_.payload_size_)) ==
+ SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ if (!configuration_.live_production_) {
+ produceContent(*producer_socket_, configuration_.name_.makeName(), 0);
+ } else {
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+ }
} else {
ret = producer_socket_->setSocketOption(
+ GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+
+ if (ret == SOCKET_OPTION_NOT_SET) {
+ return ERROR_SETUP;
+ }
+
+ ret = producer_socket_->setSocketOption(
ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::asyncProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ (ProducerInterestCallback)bind(
+ &ProducerContext::virtualProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
}
- } else {
- ret = producer_socket_->setSocketOption(
- GeneralTransportOptions::OUTPUT_BUFFER_SIZE, 0U);
+ ret = producer_socket_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ (ProducerContentCallback)bind(
+ &ProducerContext::onContentProduced, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3));
if (ret == SOCKET_OPTION_NOT_SET) {
return ERROR_SETUP;
}
- ret = producer_socket_->setSocketOption(
- ProducerCallbacksOptions::CACHE_MISS,
- (ProducerInterestCallback)bind(&Impl::virtualProcessInterest, this,
- std::placeholders::_1,
- std::placeholders::_2));
+ return ERROR_SUCCESS;
+ }
- if (ret == SOCKET_OPTION_NOT_SET) {
- return ERROR_SETUP;
+ int run() {
+ getOutputStream() << "started to serve consumers with name " << flow_name_
+ << std::endl;
+ return ERROR_SUCCESS;
+ }
+
+ void stop() {
+ getOutputStream() << "stopped to serve consumers" << std::endl;
+ producer_socket_->stop();
+ }
+
+ private:
+ /**
+ * @brief Produce an existing content object. Set the name as the
+ * interest.
+ */
+ void virtualProcessInterest(ProducerSocket &p, const Interest &interest) {
+ content_objects_[content_objects_index_ & kmask()]->setName(
+ interest.getName());
+ p.produce(*content_objects_[content_objects_index_++ & kmask()]);
+ }
+
+ /**
+ * @brief Create and produce a buffer of configuration_.download_size_
+ * length.
+ */
+ void produceContent(ProducerSocket &p, const Name &content_name,
+ uint32_t suffix) const {
+ uint32_t total;
+
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ utils::SteadyTime::TimePoint t0 = utils::SteadyTime::Clock::now();
+ total = p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_, suffix);
+ utils::SteadyTime::TimePoint t1 = utils::SteadyTime::Clock::now();
+
+ Logger() << "Written " << total
+ << " data packets in output buffer (Segmentation time: "
+ << utils::SteadyTime::getDurationUs(t0, t1).count() << " us)"
+ << std::endl;
+ }
+
+ /**
+ * @brief Synchronously produce content upon reception of one interest
+ */
+ void processInterest(ProducerSocket &p, const Interest &interest) const {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)VOID_HANDLER);
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
+
+ produceContent(p, interest.getName(), interest.getName().getSuffix());
+ Logger() << "Received interest " << interest.getName().getSuffix()
+ << std::endl;
+ }
+
+ /**
+ * @brief Async create and produce a buffer of
+ * configuration_.download_size_ length.
+ */
+ void produceContentAsync(ProducerSocket &p, Name content_name,
+ uint32_t suffix) {
+ parent_.produce_thread_.add([this, suffix, content_name, &p]() {
+ auto b = utils::MemBuf::create(configuration_.download_size_);
+ std::memset(b->writableData(), '?', configuration_.download_size_);
+ b->append(configuration_.download_size_);
+
+ last_segment_ =
+ suffix + p.produceStream(content_name, std::move(b),
+ !configuration_.multiphase_produce_,
+ suffix);
+ });
+ }
+
+ /**
+ * @brief Asynchronously produce content upon reception of one interest
+ */
+ void asyncProcessInterest(ProducerSocket &p, const Interest &interest) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::cacheMiss, this,
+ std::placeholders::_1, std::placeholders::_2));
+ p.setSocketOption(GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME,
+ configuration_.content_lifetime_);
+ uint32_t suffix = interest.getName().getSuffix();
+
+ if (suffix == 0) {
+ last_segment_ = 0;
+ unsatisfied_interests_.clear();
+ }
+
+ // The suffix will either come from the received interest or will be set
+ // to the smallest suffix of a previous interest not satisfied
+ if (!unsatisfied_interests_.empty()) {
+ auto it = std::lower_bound(unsatisfied_interests_.begin(),
+ unsatisfied_interests_.end(), last_segment_);
+ if (it != unsatisfied_interests_.end()) {
+ suffix = *it;
+ }
+ unsatisfied_interests_.erase(unsatisfied_interests_.begin(), it);
}
+
+ getOutputStream() << " Received interest "
+ << interest.getName().getSuffix()
+ << ", starting production at " << suffix << end_mod_
+ << std::endl;
+ getOutputStream() << unsatisfied_interests_.size()
+ << " interests still unsatisfied" << end_mod_
+ << std::endl;
+ produceContentAsync(p, interest.getName(), suffix);
}
- ret = producer_socket_->setSocketOption(
- ProducerCallbacksOptions::CONTENT_PRODUCED,
- (ProducerContentCallback)bind(
- &Impl::onContentProduced, this, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3));
+ /**
+ * @brief Register cache miss events
+ */
+ void cacheMiss([[maybe_unused]] const ProducerSocket &p,
+ const Interest &interest) {
+ unsatisfied_interests_.push_back(interest.getName().getSuffix());
+ }
- return ERROR_SUCCESS;
+ /**
+ * @brief When content is produced, set cache miss callback so that we can
+ * register any cache miss happening after the production.
+ */
+ void onContentProduced(ProducerSocket &p,
+ [[maybe_unused]] const std::error_code &err,
+ [[maybe_unused]] uint64_t bytes_written) {
+ p.setSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ (ProducerInterestCallback)bind(
+ &ProducerContext::asyncProcessInterest, this,
+ std::placeholders::_1, std::placeholders::_2));
+ }
+
+ /**
+ * @brief Internal producer error. When this callback is triggered
+ * something important happened. Here we stop the program.
+ */
+ void produceError(const std::error_code &err) noexcept override {
+ getOutputStream() << "Error from producer transport: " << err.message()
+ << std::endl;
+ parent_.stop();
+ }
+
+ // Members initialized in constructor
+ std::vector<ContentObject::Ptr> content_objects_;
+
+ // Members initialized by in-class initializer
+ std::vector<uint32_t> unsatisfied_interests_;
+ std::uint32_t last_segment_{0};
+ std::unique_ptr<ProducerSocket> producer_socket_{nullptr};
+ std::uint16_t content_objects_index_{0};
+ std::size_t payload_size_max_{0};
+ };
+
+ public:
+ explicit Impl(const hiperf::ServerConfiguration &conf) : config_(conf) {
+#ifndef _WIN32
+ if (config_.interactive_) {
+ input_.assign(::dup(STDIN_FILENO));
+ }
+#endif
+
+ std::memset(rtc_payload_.data(), 'X', rtc_payload_.size());
+ }
+
+ ~Impl() = default;
+
+ int setup() {
+ int ret = ensureFlows(config_.name_, config_.parallel_flows_);
+ if (ret != ERROR_SUCCESS) {
+ return ret;
+ }
+
+ producer_contexts_.reserve(config_.parallel_flows_);
+ for (uint32_t i = 0; i < config_.parallel_flows_; i++) {
+ auto &ctx = producer_contexts_.emplace_back(*this, i);
+ ret = ctx.setup();
+
+ if (ret) {
+ break;
+ }
+ }
+
+ return ret;
}
void receiveStream() {
socket_.async_receive_from(
- asio::buffer(recv_buffer_.first, recv_buffer_.second), remote_,
- [this](const std::error_code &ec, std::size_t length) {
+ asio::buffer(recv_buffer_.writableData(), recv_buffer_.capacity()),
+ remote_, [this](const std::error_code &ec, std::size_t length) {
if (ec) return;
- sendRTCContentFromStream(recv_buffer_.first, length);
+ sendRTCContentFromStream(recv_buffer_.writableData(), length);
receiveStream();
});
}
- void sendRTCContentFromStream(uint8_t *buff, std::size_t len) {
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+ void sendRTCContentFromStream(const uint8_t *buff, std::size_t len) {
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = utils::SystemTime::nowMs().count();
+ auto now = utils::SystemTime::nowMs().count();
- uint8_t *start = (uint8_t *)payload->writableData();
+ auto start = rtc_payload_.data();
std::memcpy(start, &now, sizeof(uint64_t));
std::memcpy(start + sizeof(uint64_t), buff, len);
- producer_socket_->produceDatagram(flow_name_, start,
- len + sizeof(uint64_t));
+
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, len + sizeof(uint64_t));
+ }
}
void sendRTCContentObjectCallback(const std::error_code &ec) {
if (ec) return;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
+
+ auto start = rtc_payload_.data();
// this is used to compute the data packet delay
// Used only for performance evaluation
// It requires clock synchronization between producer and consumer
- uint64_t now = utils::SystemTime::nowMs().count();
-
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ std::memcpy(start, &now, sizeof(uint64_t));
- producer_socket_->produceDatagram(flow_name_, payload->data(),
- payload->length() < payload_size_max_
- ? payload->length()
- : payload_size_max_);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, config_.payload_size_);
+ }
}
void sendRTCContentObjectCallbackWithTrace(const std::error_code &ec) {
if (ec) return;
- auto payload =
- content_objects_[content_objects_index_++ & mask_]->getPayload();
-
- uint32_t packet_len =
- configuration_.trace_[configuration_.trace_index_].size;
+ std::size_t packet_len = config_.trace_[config_.trace_index_].size;
// this is used to compute the data packet delay
// used only for performance evaluation
// it requires clock synchronization between producer and consumer
- uint64_t now = utils::SystemTime::nowMs().count();
- std::memcpy(payload->writableData(), &now, sizeof(uint64_t));
+ auto now = utils::SystemTime::nowMs().count();
+ auto start = rtc_payload_.data();
+ std::memcpy(start, &now, sizeof(uint64_t));
- if (packet_len > payload->length()) packet_len = payload->length();
- if (packet_len > payload_size_max_) packet_len = payload_size_max_;
+ if (packet_len > config_.payload_size_) {
+ packet_len = config_.payload_size_;
+ }
- producer_socket_->produceDatagram(flow_name_, payload->data(), packet_len);
+ for (const auto &producer_context : producer_contexts_) {
+ producer_context.produceDatagram(start, packet_len);
+ }
- uint32_t next_index = configuration_.trace_index_ + 1;
+ uint32_t next_index = config_.trace_index_ + 1;
uint64_t schedule_next;
- if (next_index < configuration_.trace_.size()) {
- schedule_next =
- configuration_.trace_[next_index].timestamp -
- configuration_.trace_[configuration_.trace_index_].timestamp;
+ if (next_index < config_.trace_.size()) {
+ schedule_next = config_.trace_[next_index].timestamp -
+ config_.trace_[config_.trace_index_].timestamp;
} else {
// here we need to loop, schedule in a random time
schedule_next = 1000;
}
- configuration_.trace_index_ =
- (configuration_.trace_index_ + 1) % configuration_.trace_.size();
+ config_.trace_index_ = (config_.trace_index_ + 1) % config_.trace_.size();
rtc_timer_.expires_from_now(std::chrono::microseconds(schedule_next));
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
}
+ int parseTraceFile() {
+ std::ifstream trace(config_.trace_file_);
+ if (trace.fail()) {
+ return -1;
+ }
+ std::string line;
+ while (std::getline(trace, line)) {
+ std::istringstream iss(line);
+ hiperf::packet_t packet;
+ iss >> packet.timestamp >> packet.size;
+ config_.trace_.push_back(packet);
+ }
+ return 0;
+ }
+
#ifndef _WIN32
void handleInput(const std::error_code &error, std::size_t length) {
if (error) {
- producer_socket_->stop();
- io_service_.stop();
+ stop();
}
if (rtc_running_) {
- std::cout << "stop real time content production" << std::endl;
+ Logger() << "stop real time content production" << std::endl;
rtc_running_ = false;
rtc_timer_.cancel();
} else {
- std::cout << "start real time content production" << std::endl;
+ Logger() << "start real time content production" << std::endl;
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
std::placeholders::_1));
}
@@ -439,46 +573,33 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
}
#endif
- int parseTraceFile() {
- std::ifstream trace(configuration_.trace_file_);
- if (trace.fail()) {
- return -1;
- }
- std::string line;
- while (std::getline(trace, line)) {
- std::istringstream iss(line);
- hiperf::packet_t packet;
- iss >> packet.timestamp >> packet.size;
- configuration_.trace_.push_back(packet);
+ void stop() {
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.stop();
}
- return 0;
+
+ io_service_.stop();
}
int run() {
- std::cerr << "Starting to serve consumers" << std::endl;
-
signals_.add(SIGINT);
- signals_.async_wait([this](const std::error_code &, const int &) {
- std::cout << "STOPPING!!" << std::endl;
- producer_socket_->stop();
- io_service_.stop();
- });
+ signals_.async_wait(
+ [this](const std::error_code &, const int &) { stop(); });
- if (configuration_.rtc_) {
-#ifndef _WIN32
- if (configuration_.interactive_) {
+ if (config_.rtc_) {
+ if (config_.interactive_) {
asio::async_read_until(
input_, input_buffer_, '\n',
std::bind(&Impl::handleInput, this, std::placeholders::_1,
std::placeholders::_2));
- } else if (configuration_.trace_based_) {
- std::cout << "trace-based mode enabled" << std::endl;
- if (configuration_.trace_file_ == nullptr) {
- std::cout << "cannot find the trace file" << std::endl;
+ } else if (config_.trace_based_) {
+ Logger() << "trace-based mode enabled" << std::endl;
+ if (config_.trace_file_ == nullptr) {
+ Logger() << "cannot find the trace file" << std::endl;
return ERROR_SETUP;
}
if (parseTraceFile() < 0) {
- std::cout << "cannot parse the trace file" << std::endl;
+ Logger() << "cannot parse the trace file" << std::endl;
return ERROR_SETUP;
}
rtc_running_ = true;
@@ -486,31 +607,26 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
rtc_timer_.async_wait(
std::bind(&Impl::sendRTCContentObjectCallbackWithTrace, this,
std::placeholders::_1));
- } else if (configuration_.input_stream_mode_) {
+ } else if (config_.input_stream_mode_) {
rtc_running_ = true;
// create socket
remote_ = asio::ip::udp::endpoint(
- asio::ip::address::from_string("127.0.0.1"), configuration_.port_);
+ asio::ip::address::from_string("127.0.0.1"), config_.port_);
socket_.open(asio::ip::udp::v4());
socket_.bind(remote_);
- recv_buffer_.first = (uint8_t *)malloc(HIPERF_MTU);
- recv_buffer_.second = HIPERF_MTU;
receiveStream();
} else {
rtc_running_ = true;
rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
+ config_.production_rate_.getMicrosecondsForPacket(
+ config_.payload_size_));
rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback,
this, std::placeholders::_1));
}
-#else
- rtc_timer_.expires_from_now(
- configuration_.production_rate_.getMicrosecondsForPacket(
- configuration_.payload_size_));
- rtc_timer_.async_wait(std::bind(&Impl::sendRTCContentObjectCallback, this,
- std::placeholders::_1));
-#endif
+ }
+
+ for (auto &producer_context : producer_contexts_) {
+ producer_context.run();
}
io_service_.run();
@@ -518,49 +634,31 @@ class HIperfServer::Impl : public ProducerSocket::Callback {
return ERROR_SUCCESS;
}
+ ServerConfiguration &getConfig() { return config_; }
+
private:
- hiperf::ServerConfiguration configuration_;
+ // Variables initialized by the constructor.
+ ServerConfiguration config_;
+
+ // Variable initialized in the in-class initializer list.
asio::io_service io_service_;
- asio::signal_set signals_;
- asio::steady_timer rtc_timer_;
- std::vector<uint32_t> unsatisfied_interests_;
- std::vector<std::shared_ptr<ContentObject>> content_objects_;
- std::uint16_t content_objects_index_;
- std::uint16_t mask_;
- std::uint32_t last_segment_;
- std::unique_ptr<ProducerSocket> producer_socket_;
+ asio::signal_set signals_{io_service_};
+ asio::steady_timer rtc_timer_{io_service_};
+ asio::posix::stream_descriptor input_{io_service_};
+ asio::ip::udp::socket socket_{io_service_};
+ std::vector<ProducerContext> producer_contexts_;
::utils::EventThread produce_thread_;
- std::size_t payload_size_max_;
-#ifndef _WIN32
- asio::posix::stream_descriptor input_;
asio::streambuf input_buffer_;
- bool rtc_running_;
- Name flow_name_;
- asio::ip::udp::socket socket_;
+ bool rtc_running_{false};
asio::ip::udp::endpoint remote_;
- std::pair<uint8_t *, std::size_t> recv_buffer_;
-#endif
+ utils::MemBuf recv_buffer_{utils::MemBuf::CREATE, HIPERF_MTU};
+ std::array<uint8_t, HIPERF_MTU> rtc_payload_;
};
-HIperfServer::HIperfServer(const ServerConfiguration &conf) {
- impl_ = new Impl(conf);
-}
-
-HIperfServer::HIperfServer(HIperfServer &&other) {
- impl_ = other.impl_;
- other.impl_ = nullptr;
-}
-
-HIperfServer &HIperfServer::operator=(HIperfServer &&other) {
- if (this != &other) {
- impl_ = other.impl_;
- other.impl_ = nullptr;
- }
-
- return *this;
-}
+HIperfServer::HIperfServer(const ServerConfiguration &conf)
+ : impl_(std::make_unique<Impl>(conf)) {}
-HIperfServer::~HIperfServer() { delete impl_; }
+HIperfServer::~HIperfServer() = default;
int HIperfServer::setup() { return impl_->setup(); }