aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/protocols/production_protocol.cc
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/src/protocols/production_protocol.cc')
-rw-r--r--libtransport/src/protocols/production_protocol.cc141
1 files changed, 66 insertions, 75 deletions
diff --git a/libtransport/src/protocols/production_protocol.cc b/libtransport/src/protocols/production_protocol.cc
index 6b317d47d..8b781e38a 100644
--- a/libtransport/src/protocols/production_protocol.cc
+++ b/libtransport/src/protocols/production_protocol.cc
@@ -24,8 +24,8 @@ using namespace interface;
ProductionProtocol::ProductionProtocol(
implementation::ProducerSocket *icn_socket)
- : socket_(icn_socket),
- is_running_(false),
+ : Protocol(),
+ socket_(icn_socket),
fec_encoder_(nullptr),
on_interest_input_(VOID_HANDLER),
on_interest_dropped_input_buffer_(VOID_HANDLER),
@@ -38,101 +38,92 @@ ProductionProtocol::ProductionProtocol(
on_content_object_output_(VOID_HANDLER),
on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
on_content_produced_(VOID_HANDLER),
+ producer_callback_(VOID_HANDLER),
fec_type_(fec::FECType::UNKNOWN) {
socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_);
// TODO add statistics for producer
// socket_->getSocketOption(OtherOptions::STATISTICS, &stats_);
}
-ProductionProtocol::~ProductionProtocol() {
- if (!is_async_ && is_running_) {
- stop();
- }
+ProductionProtocol::~ProductionProtocol() {}
- if (listening_thread_.joinable()) {
- listening_thread_.join();
+int ProductionProtocol::start() {
+ if (isRunning()) {
+ return -1;
}
-}
-int ProductionProtocol::start() {
- socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
- &on_interest_input_);
- socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_DROP,
- &on_interest_dropped_input_buffer_);
- socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_PASS,
- &on_interest_inserted_input_buffer_);
- socket_->getSocketOption(ProducerCallbacksOptions::CACHE_HIT,
- &on_interest_satisfied_output_buffer_);
- socket_->getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
- &on_interest_process_);
- socket_->getSocketOption(ProducerCallbacksOptions::NEW_CONTENT_OBJECT,
- &on_new_segment_);
- socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_READY,
- &on_content_object_in_output_buffer_);
- socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT,
- &on_content_object_output_);
- socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN,
- &on_content_object_to_sign_);
- socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
- &on_content_produced_);
-
- socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
- socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_);
- socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
- making_manifest_);
-
- bool first = true;
-
- for (core::Prefix &producer_namespace : served_namespaces_) {
- if (first) {
- core::BindConfig bind_config(producer_namespace, 1000);
- portal_->bind(bind_config);
- portal_->setProducerCallback(this);
- first = !first;
- } else {
- portal_->registerRoute(producer_namespace);
+ portal_->getThread().addAndWaitForExecution([this]() {
+ socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT,
+ &on_interest_input_);
+ socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_DROP,
+ &on_interest_dropped_input_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_PASS,
+ &on_interest_inserted_input_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CACHE_HIT,
+ &on_interest_satisfied_output_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CACHE_MISS,
+ &on_interest_process_);
+ socket_->getSocketOption(ProducerCallbacksOptions::NEW_CONTENT_OBJECT,
+ &on_new_segment_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_READY,
+ &on_content_object_in_output_buffer_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT,
+ &on_content_object_output_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN,
+ &on_content_object_to_sign_);
+ socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED,
+ &on_content_produced_);
+ socket_->getSocketOption(ProducerCallbacksOptions::PRODUCER_CALLBACK,
+ &producer_callback_);
+
+ socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_);
+ socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_);
+ socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST,
+ making_manifest_);
+
+ std::string fec_type_str = "";
+ socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str);
+ if (fec_type_str != "") {
+ fec_type_ = fec::FECUtils::fecTypeFromString(fec_type_str.c_str());
}
- }
- is_running_ = true;
+ portal_->registerTransportCallback(this);
+ setProducerParam();
- if (!is_async_) {
- listening_thread_ = std::thread([this]() { portal_->runEventsLoop(); });
- }
+ setRunning();
+ });
return 0;
}
-void ProductionProtocol::stop() {
- is_running_ = false;
-
- if (!is_async_) {
- portal_->stopEventsLoop();
- } else {
- portal_->clear();
- }
-}
-
void ProductionProtocol::produce(ContentObject &content_object) {
- if (*on_content_object_in_output_buffer_) {
- on_content_object_in_output_buffer_->operator()(*socket_->getInterface(),
- content_object);
- }
+ auto content_object_ptr = content_object.shared_from_this();
+ portal_->getThread().add([this, co = std::move(content_object_ptr)]() {
+ if (*on_content_object_in_output_buffer_) {
+ on_content_object_in_output_buffer_->operator()(*socket_->getInterface(),
+ *co);
+ }
- output_buffer_.insert(std::static_pointer_cast<ContentObject>(
- content_object.shared_from_this()));
+ output_buffer_.insert(co);
- if (*on_content_object_output_) {
- on_content_object_output_->operator()(*socket_->getInterface(),
- content_object);
- }
+ if (*on_content_object_output_) {
+ on_content_object_output_->operator()(*socket_->getInterface(), *co);
+ }
- portal_->sendContentObject(content_object);
+ portal_->sendContentObject(*co);
+ });
}
-void ProductionProtocol::registerNamespaceWithNetwork(
- const Prefix &producer_namespace) {
- served_namespaces_.push_back(producer_namespace);
+void ProductionProtocol::sendMapme() { portal_->sendMapme(); }
+
+void ProductionProtocol::onError(const std::error_code &ec) {
+ // Stop production protocol
+ stop();
+
+ // Call error callback
+ if (producer_callback_) {
+ producer_callback_->produceError(ec);
+ }
}
} // namespace protocol