aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hicn-plugin/vapi/vapi_safe.c16
-rw-r--r--libtransport/src/core/vpp_forwarder_interface.cc21
-rw-r--r--libtransport/src/core/vpp_forwarder_interface.h1
-rw-r--r--libtransport/src/utils/epoll_event_reactor.cc7
-rw-r--r--utils/src/hiperf.cc6
5 files changed, 34 insertions, 17 deletions
diff --git a/hicn-plugin/vapi/vapi_safe.c b/hicn-plugin/vapi/vapi_safe.c
index c1d66c0ac..bd9cfe61c 100644
--- a/hicn-plugin/vapi/vapi_safe.c
+++ b/hicn-plugin/vapi/vapi_safe.c
@@ -7,21 +7,21 @@
#define RESPONSE_QUEUE_SIZE 2
pthread_mutex_t *mutex = NULL;
-vapi_ctx_t g_vapi_ctx_instance = NULL;
u32 count = 0;
int lock = 0;
vapi_error_e vapi_connect_safe(vapi_ctx_t *vapi_ctx_ret, int async) {
vapi_error_e rv = VAPI_OK;
+ vapi_ctx_t g_vapi_ctx_instance = NULL;
while (!__sync_bool_compare_and_swap(&lock, 0, 1));
- if (!g_vapi_ctx_instance && !mutex)
- {
- rv = vapi_ctx_alloc(&g_vapi_ctx_instance);
- if (rv != VAPI_OK)
- goto err;
+ rv = vapi_ctx_alloc(&g_vapi_ctx_instance);
+ if (rv != VAPI_OK)
+ goto err;
+ if (!mutex)
+ {
mutex = malloc(sizeof(pthread_mutex_t));
if (!mutex)
goto err_mutex_alloc;
@@ -49,11 +49,11 @@ vapi_error_e vapi_connect_safe(vapi_ctx_t *vapi_ctx_ret, int async) {
while (!__sync_bool_compare_and_swap(&lock, 1, 0));
return rv;
- err_vapi:
- vapi_ctx_free(g_vapi_ctx_instance);
err_mutex_init:
free(mutex);
err_mutex_alloc:
+ err_vapi:
+ vapi_ctx_free(g_vapi_ctx_instance);
err:
while (!__sync_bool_compare_and_swap(&lock, 1, 0));
return VAPI_ENOMEM;
diff --git a/libtransport/src/core/vpp_forwarder_interface.cc b/libtransport/src/core/vpp_forwarder_interface.cc
index 28a2560b3..9f7beeb37 100644
--- a/libtransport/src/core/vpp_forwarder_interface.cc
+++ b/libtransport/src/core/vpp_forwarder_interface.cc
@@ -36,8 +36,6 @@ namespace transport {
namespace core {
-std::mutex VPPForwarderInterface::global_lock_;
-
VPPForwarderInterface::VPPForwarderInterface(MemifConnector &connector)
: ForwarderInterface<VPPForwarderInterface, MemifConnector>(connector),
sw_if_index_(~0),
@@ -112,9 +110,24 @@ void VPPForwarderInterface::producerConnection() {
}
void VPPForwarderInterface::connect(bool is_consumer) {
- std::lock_guard<std::mutex> connection_lock(global_lock_);
+ int retry = 20;
+
+ TRANSPORT_LOGI("Connecting to VPP through vapi.");
+ vapi_error_e ret = vapi_connect_safe(&sock_, 0);
+
+ while (ret != VAPI_OK && retry > 0) {
+ TRANSPORT_LOGE("Error connecting to VPP through vapi. Retrying..");
+ --retry;
+ ret = vapi_connect_safe(&sock_, 0);
+ }
+
+ if (ret != VAPI_OK) {
+ throw std::runtime_error(
+ "Impossible to connect to forwarder. Is VPP running?");
+ }
+
- vapi_connect_safe(&sock_, 0);
+ TRANSPORT_LOGI("Connected to VPP through vapi.");
sw_if_index_ = getMemifConfiguration();
diff --git a/libtransport/src/core/vpp_forwarder_interface.h b/libtransport/src/core/vpp_forwarder_interface.h
index bc83f476e..31d23b40d 100644
--- a/libtransport/src/core/vpp_forwarder_interface.h
+++ b/libtransport/src/core/vpp_forwarder_interface.h
@@ -79,7 +79,6 @@ class VPPForwarderInterface
uint32_t face_id2_;
bool is_consumer_;
vapi_ctx_t sock_;
- static std::mutex global_lock_;
};
} // namespace core
diff --git a/libtransport/src/utils/epoll_event_reactor.cc b/libtransport/src/utils/epoll_event_reactor.cc
index 0e6590d0e..63c08df95 100644
--- a/libtransport/src/utils/epoll_event_reactor.cc
+++ b/libtransport/src/utils/epoll_event_reactor.cc
@@ -104,12 +104,15 @@ void EpollEventReactor::runEventLoop(int timeout) {
while (run_event_loop_) {
memset(&evt, 0, sizeof(evt));
-
en = epoll_pwait(epoll_fd_, evt, 128, timeout, &sigset);
if (TRANSPORT_EXPECT_FALSE(en < 0)) {
TRANSPORT_LOGE("epoll_pwait: %s", strerror(errno));
- return;
+ if (errno == EINTR) {
+ continue;
+ } else {
+ return;
+ }
}
for (int i = 0; i < en; i++) {
diff --git a/utils/src/hiperf.cc b/utils/src/hiperf.cc
index 967241250..f4764e096 100644
--- a/utils/src/hiperf.cc
+++ b/utils/src/hiperf.cc
@@ -217,7 +217,7 @@ class HIperfClient {
: configuration_(conf),
total_duration_milliseconds_(0),
old_bytes_value_(0),
- signals_(io_service_, SIGINT),
+ signals_(io_service_),
expected_seg_(0),
lost_packets_(std::unordered_set<uint32_t>()),
rtc_callback_(configuration_.rtc_ ? new RTCCallback(*this) : nullptr),
@@ -514,6 +514,7 @@ class HIperfClient {
int run() {
std::cout << "Starting download of " << configuration_.name << std::endl;
+ signals_.add(SIGINT);
signals_.async_wait([this](const std::error_code &, const int &) {
consumer_socket_->stop();
io_service_.stop();
@@ -712,7 +713,7 @@ class HIperfServer {
public:
HIperfServer(ServerConfiguration &conf)
: configuration_(conf),
- signals_(io_service_, SIGINT),
+ signals_(io_service_),
rtc_timer_(io_service_),
unsatisfied_interests_(),
content_objects_((std::uint16_t)(1 << log2_content_object_buffer_size)),
@@ -1023,6 +1024,7 @@ class HIperfServer {
int run() {
std::cerr << "Starting to serve consumers" << std::endl;
+ signals_.add(SIGINT);
signals_.async_wait([this](const std::error_code &, const int &) {
std::cout << "STOPPING!!" << std::endl;
producer_socket_->stop();