summaryrefslogtreecommitdiffstats
path: root/libtransport
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport')
-rwxr-xr-xlibtransport/cmake/Modules/Packaging.cmake24
-rw-r--r--libtransport/src/hicn/transport/core/memif_connector.cc5
-rw-r--r--libtransport/src/hicn/transport/core/pending_interest.cc40
-rw-r--r--libtransport/src/hicn/transport/core/pending_interest.h21
-rw-r--r--libtransport/src/hicn/transport/core/portal.h60
-rw-r--r--libtransport/src/hicn/transport/core/raw_socket_interface.cc2
-rw-r--r--libtransport/src/hicn/transport/core/socket_connector.cc19
-rw-r--r--libtransport/src/hicn/transport/core/socket_connector.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc2
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc3
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h2
-rw-r--r--libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc18
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_producer.h3
-rw-r--r--libtransport/src/hicn/transport/interfaces/socket_consumer.cc4
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_consumer.h3
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_producer.cc6
16 files changed, 155 insertions, 59 deletions
diff --git a/libtransport/cmake/Modules/Packaging.cmake b/libtransport/cmake/Modules/Packaging.cmake
index 6dc0eb457..64d1dd325 100755
--- a/libtransport/cmake/Modules/Packaging.cmake
+++ b/libtransport/cmake/Modules/Packaging.cmake
@@ -32,48 +32,44 @@ set(lib${LIBTRANSPORT}-devel_DESCRIPTION ${lib${LIBTRANSPORT}_DESCRIPTION}
if ((BUILD_MEMIF_CONNECTOR OR BUILD_VPP_PLUGIN) AND "${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
set(lib${LIBTRANSPORT}_DEB_DEPENDENCIES
- "libhicn (>= 1.0), libparc (>= 1.0), vpp-lib (== 19.01-release)"
+ "libhicn (>= stable_version), libparc (>= 1.0), vpp-lib (== stable_version-release)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(lib${LIBTRANSPORT}_RPM_DEPENDENCIES
- "libhicn >= 1.0, libparc >= 1.0, vpp-lib = 19.01-release"
+ "libhicn >= stable_version, libparc >= 1.0, vpp-lib = stable_version-release"
CACHE STRING "Dependencies for deb/rpm package."
)
set(lib${LIBTRANSPORT}-dev_DEB_DEPENDENCIES
- "libtransport (>= 1.0), libasio-dev (>= 1.10), libhicn-dev (>= 1.0), libparc-dev (>= 1.0), vpp-dev (== 19.01-release)"
+ "lib${LIBTRANSPORT} (>= stable_version), libasio-dev (>= 1.10), libhicn-dev (>= stable_version), libparc-dev (>= 1.0), vpp-dev (== stable_version-release)"
CACHE STRING "Dependencies for deb/rpm package."
)
- set(lib${LIBTRANSPORT}-devel_RPM_DEPENDENCIES
- "libtransport >= 1.0, asio-devel >= 1.10, libhicn-devel >= 1.0, libparc-devel >= 1.0, vpp-devel = 19.01-release"
+ set(lib${LIBTRANSPORT}-dev_RPM_DEPENDENCIES
+ "lib${LIBTRANSPORT} >= stable_version, asio-devel >= 1.10, libhicn-devel >= stable_version, libparc-devel >= 1.0, vpp-devel = stable_version-release"
CACHE STRING "Dependencies for deb/rpm package."
)
else()
-<<<<<<< HEAD
-
-=======
->>>>>>> 691dfde... [HICN-3] First version of packaging system based on cmake.
set(lib${LIBTRANSPORT}_DEB_DEPENDENCIES
- "libhicn (>= 1.0), libparc (>= 1.0)"
+ "libhicn (>= stable_version), libparc (>= 1.0)"
CACHE STRING "Dependencies for deb/rpm package."
)
set(lib${LIBTRANSPORT}_RPM_DEPENDENCIES
- "libhicn >= 1.0, libparc >= 1.0"
+ "libhicn >= stable_version, libparc >= 1.0"
CACHE STRING "Dependencies for deb/rpm package."
)
set(lib${LIBTRANSPORT}-dev_DEB_DEPENDENCIES
- "libtransport (>= 1.0), libasio-dev (>= 1.10), libhicn-dev (>= 1.0), libparc-dev (>= 1.0)"
+ "lib${LIBTRANSPORT} (>= stable_version), libasio-dev (>= 1.10), libhicn-dev (>= stable_version), libparc-dev (>= 1.0)"
CACHE STRING "Dependencies for deb/rpm package."
)
- set(lib${LIBTRANSPORT}-devel_RPM_DEPENDENCIES
- "libtransport >= 1.0, asio-devel >= 1.10, libhicn-devel >= 1.0, libparc-devel >= 1.0"
+ set(lib${LIBTRANSPORT}-dev_RPM_DEPENDENCIES
+ "lib${LIBTRANSPORT} >= stable_version, asio-devel >= 1.10, libhicn-devel >= stable_version, libparc-devel >= 1.0"
CACHE STRING "Dependencies for deb/rpm package."
)
diff --git a/libtransport/src/hicn/transport/core/memif_connector.cc b/libtransport/src/hicn/transport/core/memif_connector.cc
index b3785e5c3..6c5f2ff5f 100644
--- a/libtransport/src/hicn/transport/core/memif_connector.cc
+++ b/libtransport/src/hicn/transport/core/memif_connector.cc
@@ -38,7 +38,6 @@ MemifConnector::MemifConnector(PacketReceivedCallback &&receive_callback,
timer_set_(false),
send_timer_(std::make_unique<utils::FdDeadlineTimer>(event_reactor_)),
io_service_(io_service),
- work_(std::make_unique<asio::io_service::work>(io_service_)),
packet_counter_(0),
memif_connection_({}),
tx_buf_counter_(0),
@@ -74,6 +73,8 @@ void MemifConnector::connect(uint32_t memif_id, long memif_mode) {
createMemif(memif_id, memif_mode, nullptr);
+ work_ = std::make_unique<asio::io_service::work>(io_service_);
+
while (is_connecting_) {
MemifConnector::main_event_reactor_.runOneEvent();
}
@@ -402,7 +403,7 @@ void MemifConnector::close() {
if (!closed_) {
closed_ = true;
event_reactor_.stop();
- io_service_.stop();
+ work_.reset();
if (memif_worker_ && memif_worker_->joinable()) {
memif_worker_->join();
diff --git a/libtransport/src/hicn/transport/core/pending_interest.cc b/libtransport/src/hicn/transport/core/pending_interest.cc
index 8f6de1839..a2df9ba44 100644
--- a/libtransport/src/hicn/transport/core/pending_interest.cc
+++ b/libtransport/src/hicn/transport/core/pending_interest.cc
@@ -20,12 +20,28 @@ namespace transport {
namespace core {
PendingInterest::PendingInterest()
- : interest_(nullptr, nullptr), timer_(), received_(false) {}
+ : interest_(nullptr, nullptr),
+ timer_(),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_(),
+ received_(false) {}
PendingInterest::PendingInterest(Interest::Ptr &&interest,
std::unique_ptr<asio::steady_timer> &&timer)
: interest_(std::move(interest)),
timer_(std::move(timer)),
+ on_content_object_callback_(),
+ on_interest_timeout_callback_(),
+ received_(false) {}
+
+PendingInterest::PendingInterest(Interest::Ptr &&interest,
+ const OnContentObjectCallback &&on_content_object,
+ const OnInterestTimeoutCallback &&on_interest_timeout,
+ std::unique_ptr<asio::steady_timer> &&timer)
+ : interest_(std::move(interest)),
+ timer_(std::move(timer)),
+ on_content_object_callback_(std::move(on_content_object)),
+ on_interest_timeout_callback_(std::move(on_interest_timeout)),
received_(false) {}
PendingInterest::~PendingInterest() {
@@ -34,16 +50,28 @@ PendingInterest::~PendingInterest() {
void PendingInterest::cancelTimer() { timer_->cancel(); }
-bool PendingInterest::isReceived() const { return received_; }
-
void PendingInterest::setReceived() { received_ = true; }
+bool PendingInterest::isReceived() const { return received_; }
+
Interest::Ptr &&PendingInterest::getInterest() { return std::move(interest_); }
-void PendingInterest::setReceived(bool received) {
- PendingInterest::received_ = received;
+const OnContentObjectCallback &PendingInterest::getOnDataCallback() const {
+ return on_content_object_callback_;
+}
+
+void PendingInterest::setOnDataCallback(const OnContentObjectCallback &on_content_object) {
+ PendingInterest::on_content_object_callback_ = on_content_object;
+}
+
+const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const {
+ return on_interest_timeout_callback_;
+}
+
+void PendingInterest::setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout) {
+ PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
}
} // end namespace core
-} // end namespace transport \ No newline at end of file
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/core/pending_interest.h b/libtransport/src/hicn/transport/core/pending_interest.h
index cbcafb5d9..58b51db79 100644
--- a/libtransport/src/hicn/transport/core/pending_interest.h
+++ b/libtransport/src/hicn/transport/core/pending_interest.h
@@ -34,6 +34,8 @@ class RawSocketInterface;
template <typename ForwarderInt>
class Portal;
+typedef std::function<void(Interest::Ptr&&, ContentObject::Ptr&&)> OnContentObjectCallback;
+typedef std::function<void(Interest::Ptr&&)> OnInterestTimeoutCallback;
typedef std::function<void(const std::error_code &)> TimerCallback;
class PendingInterest {
@@ -47,9 +49,12 @@ class PendingInterest {
PendingInterest(Interest::Ptr &&interest,
std::unique_ptr<asio::steady_timer> &&timer);
- ~PendingInterest();
+ PendingInterest(Interest::Ptr &&interest,
+ const OnContentObjectCallback &&on_content_object,
+ const OnInterestTimeoutCallback &&on_interest_timeout,
+ std::unique_ptr<asio::steady_timer> &&timer);
- bool isReceived() const;
+ ~PendingInterest();
template <typename Handler>
TRANSPORT_ALWAYS_INLINE void startCountdown(Handler &&cb) {
@@ -62,17 +67,23 @@ class PendingInterest {
void setReceived();
+ bool isReceived() const;
+
Interest::Ptr &&getInterest();
- void setReceived(bool received);
+ const OnContentObjectCallback &getOnDataCallback() const;
+
+ void setOnDataCallback(const OnContentObjectCallback &on_content_object);
- bool isValid() const;
+ const OnInterestTimeoutCallback &getOnTimeoutCallback() const;
- void setValid(bool valid);
+ void setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout);
private:
Interest::Ptr interest_;
std::unique_ptr<asio::steady_timer> timer_;
+ OnContentObjectCallback on_content_object_callback_;
+ OnInterestTimeoutCallback on_interest_timeout_callback_;
bool received_;
};
diff --git a/libtransport/src/hicn/transport/core/portal.h b/libtransport/src/hicn/transport/core/portal.h
index a79717037..58406bbde 100644
--- a/libtransport/src/hicn/transport/core/portal.h
+++ b/libtransport/src/hicn/transport/core/portal.h
@@ -97,13 +97,10 @@ class Portal {
virtual void onInterest(Interest::Ptr &&i) = 0;
};
- Portal() : Portal(internal_io_service_) {
- internal_work_ = std::make_unique<asio::io_service::work>(io_service_);
- }
+ Portal() : Portal(internal_io_service_) {}
Portal(asio::io_service &io_service)
: io_service_(io_service),
- is_running_(false),
app_name_("libtransport_application"),
consumer_callback_(nullptr),
producer_callback_(nullptr),
@@ -131,8 +128,7 @@ class Portal {
}
~Portal() {
- connector_.close();
- stopEventsLoop();
+ stopEventsLoop(true);
}
TRANSPORT_ALWAYS_INLINE bool interestIsPending(const Name &name) {
@@ -157,9 +153,30 @@ class Portal {
std::placeholders::_1, name));
}
+ TRANSPORT_ALWAYS_INLINE void sendInterest(Interest::Ptr &&interest,
+ const OnContentObjectCallback &&on_content_object_callback,
+ const OnInterestTimeoutCallback &&on_interest_timeout_callback) {
+
+ const Name name(interest->getName(), true);
+
+ // Send it
+ forwarder_interface_.send(*interest);
+
+ pending_interest_hash_table_[name] = std::make_unique<PendingInterest>(
+ std::move(interest), std::move(on_content_object_callback),
+ std::move(on_interest_timeout_callback),
+ std::make_unique<asio::steady_timer>(io_service_));
+
+ pending_interest_hash_table_[name]->startCountdown(
+ std::bind(&Portal<ForwarderInt>::timerHandler,
+ this, std::placeholders::_1, name));
+
+ }
+
TRANSPORT_ALWAYS_INLINE void timerHandler(const std::error_code &ec,
const Name &name) {
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ bool is_stopped = io_service_.stopped();
+ if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
return;
}
@@ -170,7 +187,9 @@ class Portal {
std::unique_ptr<PendingInterest> ptr = std::move(it->second);
pending_interest_hash_table_.erase(it);
- if (consumer_callback_) {
+ if(ptr->getOnTimeoutCallback() != UNSET_CALLBACK){
+ ptr->on_interest_timeout_callback_(std::move(ptr->getInterest()));
+ }else if (consumer_callback_) {
consumer_callback_->onTimeout(std::move(ptr->getInterest()));
}
}
@@ -189,9 +208,7 @@ class Portal {
io_service_.reset(); // ensure that run()/poll() will do some work
}
- is_running_ = true;
this->io_service_.run();
- is_running_ = false;
}
TRANSPORT_ALWAYS_INLINE void runOneEvent() {
@@ -199,9 +216,7 @@ class Portal {
io_service_.reset(); // ensure that run()/poll() will do some work
}
- is_running_ = true;
this->io_service_.run_one();
- is_running_ = false;
}
TRANSPORT_ALWAYS_INLINE void sendContentObject(
@@ -209,16 +224,17 @@ class Portal {
forwarder_interface_.send(content_object);
}
- TRANSPORT_ALWAYS_INLINE void stopEventsLoop() {
- is_running_ = false;
- internal_work_.reset();
-
+ TRANSPORT_ALWAYS_INLINE void stopEventsLoop(bool kill_connection = false) {
for (auto &pend_interest : pending_interest_hash_table_) {
pend_interest.second->cancelTimer();
}
clear();
+ if(kill_connection) {
+ connector_.close();
+ }
+
io_service_.post([this]() { io_service_.stop(); });
}
@@ -242,7 +258,8 @@ class Portal {
private:
TRANSPORT_ALWAYS_INLINE void processIncomingMessages(
Packet::MemBufPtr &&packet_buffer) {
- if (TRANSPORT_EXPECT_FALSE(!is_running_)) {
+ bool is_stopped = io_service_.stopped();
+ if (TRANSPORT_EXPECT_FALSE(is_stopped)) {
return;
}
@@ -293,7 +310,11 @@ class Portal {
interest_ptr->setReceived();
pending_interest_hash_table_.erase(content_object->getName());
- if (consumer_callback_) {
+ if(interest_ptr->getOnDataCallback() != UNSET_CALLBACK){
+ interest_ptr->on_content_object_callback_(
+ std::move(interest_ptr->getInterest()),
+ std::move(content_object));
+ }else if (consumer_callback_) {
consumer_callback_->onContentObject(
std::move(interest_ptr->getInterest()),
std::move(content_object));
@@ -318,9 +339,6 @@ class Portal {
private:
asio::io_service &io_service_;
asio::io_service internal_io_service_;
- std::unique_ptr<asio::io_service::work> internal_work_;
-
- volatile bool is_running_;
std::string app_name_;
diff --git a/libtransport/src/hicn/transport/core/raw_socket_interface.cc b/libtransport/src/hicn/transport/core/raw_socket_interface.cc
index 37aaff7e0..4cf7b2ca6 100644
--- a/libtransport/src/hicn/transport/core/raw_socket_interface.cc
+++ b/libtransport/src/hicn/transport/core/raw_socket_interface.cc
@@ -41,7 +41,7 @@ void RawSocketInterface::connect(bool is_consumer) {
}
// Get interface ip address
- struct sockaddr_in6 address;
+ struct sockaddr_in6 address = {0};
utils::retrieveInterfaceAddress(output_interface_, &address);
inet6_address_.family = address.sin6_family;
diff --git a/libtransport/src/hicn/transport/core/socket_connector.cc b/libtransport/src/hicn/transport/core/socket_connector.cc
index 332b87ec7..704d3badb 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.cc
+++ b/libtransport/src/hicn/transport/core/socket_connector.cc
@@ -62,6 +62,7 @@ SocketConnector::SocketConnector(PacketReceivedCallback &&receive_callback,
is_connecting_(false),
is_reconnection_(false),
data_available_(false),
+ is_closed_(false),
receive_callback_(receive_callback),
on_reconnect_callback_(on_reconnect_callback),
app_name_(app_name) {}
@@ -102,7 +103,11 @@ void SocketConnector::send(const Packet::MemBufPtr &packet) {
}
void SocketConnector::close() {
- io_service_.post([this]() { socket_.close(); });
+ io_service_.dispatch([this]() {
+ is_closed_ = true;
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
+ socket_.close();
+ });
}
void SocketConnector::doWrite() {
@@ -125,6 +130,9 @@ void SocketConnector::doWrite() {
if (!output_buffer_.empty()) {
doWrite();
}
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
@@ -141,6 +149,9 @@ void SocketConnector::doReadBody(std::size_t body_length) {
if (TRANSPORT_EXPECT_TRUE(!ec)) {
receive_callback_(std::move(read_msg_));
doReadHeader();
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
@@ -165,6 +176,9 @@ void SocketConnector::doReadHeader() {
} else {
TRANSPORT_LOGE("Decoding error. Ignoring packet.");
}
+ } else if (ec.value() == static_cast<int>(std::errc::operation_canceled)) {
+ // The connection has been closed by the application.
+ return;
} else {
TRANSPORT_LOGE("%d %s", ec.value(), ec.message().c_str());
tryReconnect();
@@ -173,11 +187,12 @@ void SocketConnector::doReadHeader() {
}
void SocketConnector::tryReconnect() {
- if (!is_connecting_) {
+ if (!is_connecting_ && !is_closed_) {
TRANSPORT_LOGE("Connection lost. Trying to reconnect...\n");
is_connecting_ = true;
is_reconnection_ = true;
io_service_.post([this]() {
+ socket_.shutdown(asio::ip::tcp::socket::shutdown_type::shutdown_both);
socket_.close();
startConnectionTimer();
doConnect();
diff --git a/libtransport/src/hicn/transport/core/socket_connector.h b/libtransport/src/hicn/transport/core/socket_connector.h
index d7a05aab4..e014111e2 100644
--- a/libtransport/src/hicn/transport/core/socket_connector.h
+++ b/libtransport/src/hicn/transport/core/socket_connector.h
@@ -19,6 +19,7 @@
#include <hicn/transport/core/name.h>
#include <hicn/transport/utils/branch_prediction.h>
+#include <asio/steady_timer.hpp>
#include <asio.hpp>
#include <deque>
@@ -78,6 +79,7 @@ class SocketConnector : public Connector {
bool is_connecting_;
bool is_reconnection_;
bool data_available_;
+ bool is_closed_;
PacketReceivedCallback receive_callback_;
OnReconnect on_reconnect_callback_;
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
index 37e1d7b3e..e06858cc3 100644
--- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
+++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
@@ -397,7 +397,7 @@ void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s,
return;
}
- TRANSPORT_LOGI("Received content with size %lu", size);
+ TRANSPORT_LOGI("Received content with size %zu", size);
if (!ec) {
read_callback_->readBufferAvailable(std::move(*receive_buffer_));
} else {
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc
index de3e84417..cc4f478af 100755
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc
@@ -22,6 +22,9 @@ namespace interface {
RTCConsumerSocket::RTCConsumerSocket(int protocol, asio::io_service &io_service)
: ConsumerSocket(protocol, io_service) {}
+RTCConsumerSocket::RTCConsumerSocket(int protocol)
+ : ConsumerSocket(protocol) {}
+
RTCConsumerSocket::~RTCConsumerSocket() {}
void RTCConsumerSocket::handleRTCPPacket(uint8_t *packet, size_t len) {
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h
index 86ccf6e22..cfde3128d 100755
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h
@@ -25,6 +25,8 @@ class RTCConsumerSocket : public ConsumerSocket {
public:
explicit RTCConsumerSocket(int protocol, asio::io_service &io_service);
+ explicit RTCConsumerSocket(int protocol);
+
~RTCConsumerSocket();
void handleRTCPPacket(uint8_t *packet, size_t len);
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
index 7b39e7ac9..f19502dee 100644
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -55,6 +55,24 @@ RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
prodLabel_ = ((rand() % 255) << 24UL);
}
+RTCProducerSocket::RTCProducerSocket()
+ : ProducerSocket(),
+ currentSeg_(1),
+ nack_(std::make_shared<ContentObject>()),
+ producedBytes_(0),
+ producedPackets_(0),
+ bytesProductionRate_(0),
+ packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
+ perSecondFactor_(1000 / STATS_INTERVAL_DURATION) {
+ auto nack_payload = utils::MemBuf::create(NACK_HEADER_SIZE);
+ nack_payload->append(NACK_HEADER_SIZE);
+ nack_->appendPayload(std::move(nack_payload));
+ lastStats_ = std::chrono::steady_clock::now();
+ srand(time(NULL));
+ prodLabel_ = ((rand() % 255) << 24UL);
+}
+
+
RTCProducerSocket::~RTCProducerSocket() {}
void RTCProducerSocket::registerName(Prefix &producer_namespace) {
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
index 1a42bdc56..f1bcaa9e8 100755
--- a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -28,6 +28,9 @@ namespace interface {
class RTCProducerSocket : public ProducerSocket {
public:
RTCProducerSocket(asio::io_service &io_service);
+
+ RTCProducerSocket();
+
~RTCProducerSocket();
void registerName(Prefix &producer_namespace);
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
index 27ed4e65f..89411e92c 100644
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -99,7 +99,7 @@ int ConsumerSocket::consume(const Name &name,
transport_protocol_->start(receive_buffer);
- return CONSUMER_READY;
+ return CONSUMER_FINISHED;
}
int ConsumerSocket::asyncConsume(
@@ -115,7 +115,7 @@ int ConsumerSocket::asyncConsume(
});
}
- return CONSUMER_READY;
+ return CONSUMER_RUNNING;
}
void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
index 9e309aae8..536d2fde3 100755
--- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -26,8 +26,9 @@
#include <hicn/transport/utils/event_thread.h>
#include <hicn/transport/utils/sharable_vector.h>
-#define CONSUMER_READY 0
+#define CONSUMER_FINISHED 0
#define CONSUMER_BUSY 1
+#define CONSUMER_RUNNING 2
namespace transport {
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
index 69adc2b3f..d9204f111 100755
--- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -54,9 +54,9 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service)
}
ProducerSocket::~ProducerSocket() {
- TRANSPORT_LOGI("Destroying the ProducerSocket");
+
processing_thread_stop_ = true;
- portal_->stopEventsLoop();
+ portal_->stopEventsLoop(true);
if (processing_thread_.joinable()) {
processing_thread_.join();
@@ -79,8 +79,6 @@ void ProducerSocket::serveForever() {
}
void ProducerSocket::stop() {
- TRANSPORT_LOGI("Calling stop for ProducerSocket");
- portal_->killConnection();
portal_->stopEventsLoop();
}