diff options
Diffstat (limited to 'libtransport/src/protocols/production_protocol.cc')
-rw-r--r-- | libtransport/src/protocols/production_protocol.cc | 141 |
1 files changed, 66 insertions, 75 deletions
diff --git a/libtransport/src/protocols/production_protocol.cc b/libtransport/src/protocols/production_protocol.cc index 6b317d47d..8b781e38a 100644 --- a/libtransport/src/protocols/production_protocol.cc +++ b/libtransport/src/protocols/production_protocol.cc @@ -24,8 +24,8 @@ using namespace interface; ProductionProtocol::ProductionProtocol( implementation::ProducerSocket *icn_socket) - : socket_(icn_socket), - is_running_(false), + : Protocol(), + socket_(icn_socket), fec_encoder_(nullptr), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), @@ -38,101 +38,92 @@ ProductionProtocol::ProductionProtocol( on_content_object_output_(VOID_HANDLER), on_content_object_evicted_from_output_buffer_(VOID_HANDLER), on_content_produced_(VOID_HANDLER), + producer_callback_(VOID_HANDLER), fec_type_(fec::FECType::UNKNOWN) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); // TODO add statistics for producer // socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); } -ProductionProtocol::~ProductionProtocol() { - if (!is_async_ && is_running_) { - stop(); - } +ProductionProtocol::~ProductionProtocol() {} - if (listening_thread_.joinable()) { - listening_thread_.join(); +int ProductionProtocol::start() { + if (isRunning()) { + return -1; } -} -int ProductionProtocol::start() { - socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, - &on_interest_input_); - socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_DROP, - &on_interest_dropped_input_buffer_); - socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_PASS, - &on_interest_inserted_input_buffer_); - socket_->getSocketOption(ProducerCallbacksOptions::CACHE_HIT, - &on_interest_satisfied_output_buffer_); - socket_->getSocketOption(ProducerCallbacksOptions::CACHE_MISS, - &on_interest_process_); - socket_->getSocketOption(ProducerCallbacksOptions::NEW_CONTENT_OBJECT, - &on_new_segment_); - socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_READY, - &on_content_object_in_output_buffer_); - socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT, - &on_content_object_output_); - socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN, - &on_content_object_to_sign_); - socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - &on_content_produced_); - - socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); - socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_); - socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST, - making_manifest_); - - bool first = true; - - for (core::Prefix &producer_namespace : served_namespaces_) { - if (first) { - core::BindConfig bind_config(producer_namespace, 1000); - portal_->bind(bind_config); - portal_->setProducerCallback(this); - first = !first; - } else { - portal_->registerRoute(producer_namespace); + portal_->getThread().addAndWaitForExecution([this]() { + socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_INPUT, + &on_interest_input_); + socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_DROP, + &on_interest_dropped_input_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::INTEREST_PASS, + &on_interest_inserted_input_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::CACHE_HIT, + &on_interest_satisfied_output_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::CACHE_MISS, + &on_interest_process_); + socket_->getSocketOption(ProducerCallbacksOptions::NEW_CONTENT_OBJECT, + &on_new_segment_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_READY, + &on_content_object_in_output_buffer_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT, + &on_content_object_output_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_OBJECT_TO_SIGN, + &on_content_object_to_sign_); + socket_->getSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, + &on_content_produced_); + socket_->getSocketOption(ProducerCallbacksOptions::PRODUCER_CALLBACK, + &producer_callback_); + + socket_->getSocketOption(GeneralTransportOptions::ASYNC_MODE, is_async_); + socket_->getSocketOption(GeneralTransportOptions::SIGNER, signer_); + socket_->getSocketOption(GeneralTransportOptions::MAKE_MANIFEST, + making_manifest_); + + std::string fec_type_str = ""; + socket_->getSocketOption(GeneralTransportOptions::FEC_TYPE, fec_type_str); + if (fec_type_str != "") { + fec_type_ = fec::FECUtils::fecTypeFromString(fec_type_str.c_str()); } - } - is_running_ = true; + portal_->registerTransportCallback(this); + setProducerParam(); - if (!is_async_) { - listening_thread_ = std::thread([this]() { portal_->runEventsLoop(); }); - } + setRunning(); + }); return 0; } -void ProductionProtocol::stop() { - is_running_ = false; - - if (!is_async_) { - portal_->stopEventsLoop(); - } else { - portal_->clear(); - } -} - void ProductionProtocol::produce(ContentObject &content_object) { - if (*on_content_object_in_output_buffer_) { - on_content_object_in_output_buffer_->operator()(*socket_->getInterface(), - content_object); - } + auto content_object_ptr = content_object.shared_from_this(); + portal_->getThread().add([this, co = std::move(content_object_ptr)]() { + if (*on_content_object_in_output_buffer_) { + on_content_object_in_output_buffer_->operator()(*socket_->getInterface(), + *co); + } - output_buffer_.insert(std::static_pointer_cast<ContentObject>( - content_object.shared_from_this())); + output_buffer_.insert(co); - if (*on_content_object_output_) { - on_content_object_output_->operator()(*socket_->getInterface(), - content_object); - } + if (*on_content_object_output_) { + on_content_object_output_->operator()(*socket_->getInterface(), *co); + } - portal_->sendContentObject(content_object); + portal_->sendContentObject(*co); + }); } -void ProductionProtocol::registerNamespaceWithNetwork( - const Prefix &producer_namespace) { - served_namespaces_.push_back(producer_namespace); +void ProductionProtocol::sendMapme() { portal_->sendMapme(); } + +void ProductionProtocol::onError(const std::error_code &ec) { + // Stop production protocol + stop(); + + // Call error callback + if (producer_callback_) { + producer_callback_->produceError(ec); + } } } // namespace protocol |