diff options
10 files changed, 125 insertions, 37 deletions
diff --git a/libtransport/src/hicn/transport/core/pending_interest.cc b/libtransport/src/hicn/transport/core/pending_interest.cc index 8f6de1839..a2df9ba44 100644 --- a/libtransport/src/hicn/transport/core/pending_interest.cc +++ b/libtransport/src/hicn/transport/core/pending_interest.cc @@ -20,12 +20,28 @@ namespace transport { namespace core { PendingInterest::PendingInterest() - : interest_(nullptr, nullptr), timer_(), received_(false) {} + : interest_(nullptr, nullptr), + timer_(), + on_content_object_callback_(), + on_interest_timeout_callback_(), + received_(false) {} PendingInterest::PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer) : interest_(std::move(interest)), timer_(std::move(timer)), + on_content_object_callback_(), + on_interest_timeout_callback_(), + received_(false) {} + +PendingInterest::PendingInterest(Interest::Ptr &&interest, + const OnContentObjectCallback &&on_content_object, + const OnInterestTimeoutCallback &&on_interest_timeout, + std::unique_ptr<asio::steady_timer> &&timer) + : interest_(std::move(interest)), + timer_(std::move(timer)), + on_content_object_callback_(std::move(on_content_object)), + on_interest_timeout_callback_(std::move(on_interest_timeout)), received_(false) {} PendingInterest::~PendingInterest() { @@ -34,16 +50,28 @@ PendingInterest::~PendingInterest() { void PendingInterest::cancelTimer() { timer_->cancel(); } -bool PendingInterest::isReceived() const { return received_; } - void PendingInterest::setReceived() { received_ = true; } +bool PendingInterest::isReceived() const { return received_; } + Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); } -void PendingInterest::setReceived(bool received) { - PendingInterest::received_ = received; +const OnContentObjectCallback &PendingInterest::getOnDataCallback() const { + return on_content_object_callback_; +} + +void PendingInterest::setOnDataCallback(const OnContentObjectCallback &on_content_object) { + PendingInterest::on_content_object_callback_ = on_content_object; +} + +const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const { + return on_interest_timeout_callback_; +} + +void PendingInterest::setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout) { + PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; } } // end namespace core -} // end namespace transport
\ No newline at end of file +} // end namespace transport diff --git a/libtransport/src/hicn/transport/core/pending_interest.h b/libtransport/src/hicn/transport/core/pending_interest.h index cbcafb5d9..58b51db79 100644 --- a/libtransport/src/hicn/transport/core/pending_interest.h +++ b/libtransport/src/hicn/transport/core/pending_interest.h @@ -34,6 +34,8 @@ class RawSocketInterface; template <typename ForwarderInt> class Portal; +typedef std::function<void(Interest::Ptr&&, ContentObject::Ptr&&)> OnContentObjectCallback; +typedef std::function<void(Interest::Ptr&&)> OnInterestTimeoutCallback; typedef std::function<void(const std::error_code &)> TimerCallback; class PendingInterest { @@ -47,9 +49,12 @@ class PendingInterest { PendingInterest(Interest::Ptr &&interest, std::unique_ptr<asio::steady_timer> &&timer); - ~PendingInterest(); + PendingInterest(Interest::Ptr &&interest, + const OnContentObjectCallback &&on_content_object, + const OnInterestTimeoutCallback &&on_interest_timeout, + std::unique_ptr<asio::steady_timer> &&timer); - bool isReceived() const; + ~PendingInterest(); template <typename Handler> TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) { @@ -62,17 +67,23 @@ class PendingInterest { void setReceived(); + bool isReceived() const; + Interest::Ptr &&getInterest(); - void setReceived(bool received); + const OnContentObjectCallback &getOnDataCallback() const; + + void setOnDataCallback(const OnContentObjectCallback &on_content_object); - bool isValid() const; + const OnInterestTimeoutCallback &getOnTimeoutCallback() const; - void setValid(bool valid); + void setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout); private: Interest::Ptr interest_; std::unique_ptr<asio::steady_timer> timer_; + OnContentObjectCallback on_content_object_callback_; + OnInterestTimeoutCallback on_interest_timeout_callback_; bool received_; }; diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h index a79717037..52a721a35 100644 --- a/libtransport/src/hicn/transport/core/portal.h +++ b/libtransport/src/hicn/transport/core/portal.h @@ -103,7 +103,6 @@ class Portal { Portal(asio::io_service &io_service) : io_service_(io_service), - is_running_(false), app_name_("libtransport_application"), consumer_callback_(nullptr), producer_callback_(nullptr), @@ -157,9 +156,30 @@ class Portal { std::placeholders::_1, name)); } + TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest, + const OnContentObjectCallback &&on_content_object_callback, + const OnInterestTimeoutCallback &&on_interest_timeout_callback) { + + const Name name(interest->getName(), true); + + // Send it + forwarder_interface_.send(*interest); + + pending_interest_hash_table_[name] = std::make_unique<PendingInterest>( + std::move(interest), std::move(on_content_object_callback), + std::move(on_interest_timeout_callback), + std::make_unique<asio::steady_timer>(io_service_)); + + pending_interest_hash_table_[name]->startCountdown( + std::bind(&Portal<ForwarderInt>::timerHandler, + this, std::placeholders::_1, name)); + + } + TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec, const Name &name) { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + bool is_stopped = io_service_.stopped(); + if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } @@ -170,7 +190,9 @@ class Portal { std::unique_ptr<PendingInterest> ptr = std::move(it->second); pending_interest_hash_table_.erase(it); - if (consumer_callback_) { + if(ptr->getOnTimeoutCallback() != UNSET_CALLBACK){ + ptr->on_interest_timeout_callback_(std::move(ptr->getInterest())); + }else if (consumer_callback_) { consumer_callback_->onTimeout(std::move(ptr->getInterest())); } } @@ -189,9 +211,7 @@ class Portal { io_service_.reset(); // ensure that run()/poll() will do some work } - is_running_ = true; this->io_service_.run(); - is_running_ = false; } TRANSPORT_ALWAYS_INLINE void runOneEvent() { @@ -199,9 +219,7 @@ class Portal { io_service_.reset(); // ensure that run()/poll() will do some work } - is_running_ = true; this->io_service_.run_one(); - is_running_ = false; } TRANSPORT_ALWAYS_INLINE void sendContentObject( @@ -210,7 +228,6 @@ class Portal { } TRANSPORT_ALWAYS_INLINE void stopEventsLoop() { - is_running_ = false; internal_work_.reset(); for (auto &pend_interest : pending_interest_hash_table_) { @@ -242,7 +259,8 @@ class Portal { private: TRANSPORT_ALWAYS_INLINE void processIncomingMessages( Packet::MemBufPtr &&packet_buffer) { - if (TRANSPORT_EXPECT_FALSE(!is_running_)) { + bool is_stopped = io_service_.stopped(); + if (TRANSPORT_EXPECT_FALSE(is_stopped)) { return; } @@ -293,7 +311,11 @@ class Portal { interest_ptr->setReceived(); pending_interest_hash_table_.erase(content_object->getName()); - if (consumer_callback_) { + if(interest_ptr->getOnDataCallback() != UNSET_CALLBACK){ + interest_ptr->on_content_object_callback_( + std::move(interest_ptr->getInterest()), + std::move(content_object)); + }else if (consumer_callback_) { consumer_callback_->onContentObject( std::move(interest_ptr->getInterest()), std::move(content_object)); @@ -320,8 +342,6 @@ class Portal { asio::io_service internal_io_service_; std::unique_ptr<asio::io_service::work> internal_work_; - volatile bool is_running_; - std::string app_name_; PendingInterestHashTable pending_interest_hash_table_; diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h index d7a05aab4..b1757e59a 100644 --- a/libtransport/src/hicn/transport/core/socket_connector.h +++ b/libtransport/src/hicn/transport/core/socket_connector.h @@ -19,6 +19,7 @@ #include <hicn/transport/core/name.h> #include <hicn/transport/utils/branch_prediction.h> +#include <asio/steady_timer.hpp> #include <asio.hpp> #include <deque> diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc index 37e1d7b3e..e06858cc3 100644 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc @@ -397,7 +397,7 @@ void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, return; } - TRANSPORT_LOGI("Received content with size %lu", size); + TRANSPORT_LOGI("Received content with size %zu", size); if (!ec) { read_callback_->readBufferAvailable(std::move(*receive_buffer_)); } else { diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc index de3e84417..cc4f478af 100755 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc @@ -22,6 +22,9 @@ namespace interface { RTCConsumerSocket::RTCConsumerSocket(int protocol, asio::io_service &io_service) : ConsumerSocket(protocol, io_service) {} +RTCConsumerSocket::RTCConsumerSocket(int protocol) + : ConsumerSocket(protocol) {} + RTCConsumerSocket::~RTCConsumerSocket() {} void RTCConsumerSocket::handleRTCPPacket(uint8_t *packet, size_t len) { diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h index 86ccf6e22..cfde3128d 100755 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h @@ -25,6 +25,8 @@ class RTCConsumerSocket : public ConsumerSocket { public: explicit RTCConsumerSocket(int protocol, asio::io_service &io_service); + explicit RTCConsumerSocket(int protocol); + ~RTCConsumerSocket(); void handleRTCPPacket(uint8_t *packet, size_t len); diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc index ec8c4c69e..f19502dee 100644 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -47,12 +47,32 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) bytesProductionRate_(0), packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { - nack_->appendPayload(utils::MemBuf::create(NACK_HEADER_SIZE)); + auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); + nack_payload->append(NACK_HEADER_SIZE); + nack_->appendPayload(std::move(nack_payload)); lastStats_ = std::chrono::steady_clock::now(); srand(time(NULL)); prodLabel_ = ((rand() % 255) << 24UL); } +RTCProducerSocket::RTCProducerSocket() + : ProducerSocket(), + currentSeg_(1), + nack_(std::make_shared<ContentObject>()), + producedBytes_(0), + producedPackets_(0), + bytesProductionRate_(0), + packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), + perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE); + nack_payload->append(NACK_HEADER_SIZE); + nack_->appendPayload(std::move(nack_payload)); + lastStats_ = std::chrono::steady_clock::now(); + srand(time(NULL)); + prodLabel_ = ((rand() % 255) << 24UL); +} + + RTCProducerSocket::~RTCProducerSocket() {} void RTCProducerSocket::registerName(Prefix &producer_namespace) { @@ -100,24 +120,23 @@ void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) { updateStats(buffer_size + headerSize_ + TIMESTAMP_LEN); - std::shared_ptr<ContentObject> content_object = - std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_)); - auto payload = utils::MemBuf::copyBuffer(buf, buffer_size, TIMESTAMP_LEN); - - // content_object->setLifetime(content_object_expiry_time_); - content_object->setLifetime(1000); // XXX this should be set by the APP + ContentObject content_object(flowName_.setSuffix(currentSeg_)); uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch()) .count(); - payload->prepend(TIMESTAMP_LEN); - uint8_t *payloadPointer = payload->writableData(); - *(uint64_t *)payloadPointer = timestamp; - content_object->appendPayload(std::move(payload)); + auto payload = utils::MemBuf::create(buffer_size + TIMESTAMP_LEN); + + memcpy(payload->writableData(), ×tamp, TIMESTAMP_LEN); + memcpy(payload->writableData() + TIMESTAMP_LEN, buf, buffer_size); + payload->append(buffer_size + TIMESTAMP_LEN); + content_object.appendPayload(std::move(payload)); + + content_object.setLifetime(1000); // XXX this should be set by the APP - content_object->setPathLabel(prodLabel_); - portal_->sendContentObject(*content_object); + content_object.setPathLabel(prodLabel_); + portal_->sendContentObject(content_object); currentSeg_++; } diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h index 1a42bdc56..f1bcaa9e8 100755 --- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -28,6 +28,9 @@ namespace interface { class RTCProducerSocket : public ProducerSocket { public: RTCProducerSocket(asio::io_service &io_service); + + RTCProducerSocket(); + ~RTCProducerSocket(); void registerName(Prefix &producer_namespace); diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index ee90e5d9e..f73b40d1b 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -102,6 +102,7 @@ void RTCTransportProtocol::onRTCPPacket(uint8_t *packet, size_t len) { // private void RTCTransportProtocol::reset() { + portal_->setConsumerCallback(this); // controller var lastRoundBegin_ = std::chrono::steady_clock::now(); currentState_ = RTC_SYNC_STATE; |