aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/includes/hicn/transport/core/connector.h
diff options
context:
space:
mode:
Diffstat (limited to 'libtransport/includes/hicn/transport/core/connector.h')
-rw-r--r--libtransport/includes/hicn/transport/core/connector.h88
1 files changed, 55 insertions, 33 deletions
diff --git a/libtransport/includes/hicn/transport/core/connector.h b/libtransport/includes/hicn/transport/core/connector.h
index dcf38cdc8..7882b285d 100644
--- a/libtransport/includes/hicn/transport/core/connector.h
+++ b/libtransport/includes/hicn/transport/core/connector.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Cisco and/or its affiliates.
+ * Copyright (c) 2021-2022 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
@@ -15,6 +15,7 @@
#pragma once
+#include <glog/logging.h>
#include <hicn/transport/core/connector_stats.h>
#include <hicn/transport/core/content_object.h>
#include <hicn/transport/core/endpoint.h>
@@ -29,6 +30,7 @@
#include <deque>
#include <functional>
+#include <system_error>
namespace transport {
@@ -50,54 +52,51 @@ class Connector : public std::enable_shared_from_this<Connector> {
enum class Role : std::uint8_t { CONSUMER, PRODUCER };
- public:
static constexpr std::size_t queue_size = 4096;
static constexpr std::uint32_t invalid_connector = ~0;
-
-#ifdef LINUX
+ static constexpr std::uint32_t max_reconnection_reattempts = 5;
static constexpr std::uint16_t max_burst = 256;
-#endif
using Ptr = std::shared_ptr<Connector>;
- using PacketQueue = std::deque<Packet::Ptr>;
+ using ReceptionBuffer = std::vector<utils::MemBuf::Ptr>;
+ using PacketQueue = std::deque<utils::MemBuf::Ptr>;
using PacketReceivedCallback = std::function<void(
- Connector *, utils::MemBuf &, const std::error_code &)>;
+ Connector *, const ReceptionBuffer &, const std::error_code &)>;
using PacketSentCallback =
std::function<void(Connector *, const std::error_code &)>;
using OnCloseCallback = std::function<void(Connector *)>;
- using OnReconnectCallback = std::function<void(Connector *)>;
+ using OnReconnectCallback =
+ std::function<void(Connector *, const std::error_code &)>;
using Id = std::uint64_t;
template <typename ReceiveCallback, typename SentCallback, typename OnClose,
typename OnReconnect>
Connector(ReceiveCallback &&receive_callback, SentCallback &&packet_sent,
OnClose &&close_callback, OnReconnect &&on_reconnect)
- : receive_callback_(std::forward<ReceiveCallback &&>(receive_callback)),
- sent_callback_(std::forward<SentCallback &&>(packet_sent)),
- on_close_callback_(std::forward<OnClose &&>(close_callback)),
- on_reconnect_callback_(std::forward<OnReconnect &&>(on_reconnect)),
- state_(State::CLOSED),
- connector_id_(invalid_connector) {}
+ : receive_callback_(std::forward<ReceiveCallback>(receive_callback)),
+ sent_callback_(std::forward<SentCallback>(packet_sent)),
+ on_close_callback_(std::forward<OnClose>(close_callback)),
+ on_reconnect_callback_(std::forward<OnReconnect>(on_reconnect)) {}
virtual ~Connector(){};
template <typename ReceiveCallback>
void setReceiveCallback(ReceiveCallback &&callback) {
- receive_callback_ = std::forward<ReceiveCallback &&>(callback);
+ receive_callback_ = std::forward<ReceiveCallback>(callback);
}
template <typename SentCallback>
void setSentCallback(SentCallback &&callback) {
- sent_callback_ = std::forward<SentCallback &&>(callback);
+ sent_callback_ = std::forward<SentCallback>(callback);
}
template <typename OnClose>
void setOnCloseCallback(OnClose &&callback) {
- on_close_callback_ = std::forward<OnClose &&>(callback);
+ on_close_callback_ = std::forward<OnClose>(callback);
}
template <typename OnReconnect>
- void setReconnectCallback(const OnReconnect &&callback) {
+ void setReconnectCallback(OnReconnect &&callback) {
on_reconnect_callback_ = std::forward<OnReconnect>(callback);
}
@@ -115,7 +114,15 @@ class Connector : public std::enable_shared_from_this<Connector> {
virtual void send(Packet &packet) = 0;
- virtual void send(const uint8_t *packet, std::size_t len) = 0;
+ virtual void send(const utils::MemBuf::Ptr &buffer) = 0;
+
+ virtual void receive(const std::vector<utils::MemBuf::Ptr> &buffers) {
+ receive_callback_(this, buffers, std::make_error_code(std::errc()));
+ }
+
+ virtual void reconnect() {
+ on_reconnect_callback_(this, std::make_error_code(std::errc()));
+ }
virtual void close() = 0;
@@ -127,36 +134,48 @@ class Connector : public std::enable_shared_from_this<Connector> {
Id getConnectorId() { return connector_id_; }
- void setConnectorName(std::string connector_name) {
+ void setConnectorName(const std::string &connector_name) {
connector_name_ = connector_name;
}
- std::string getConnectorName() { return connector_name_; }
+ std::string getConnectorName() const { return connector_name_; }
+
+ template <typename EP>
+ void setLocalEndpoint(EP &&endpoint) {
+ local_endpoint_ = std::forward<EP>(endpoint);
+ }
- Endpoint getLocalEndpoint() { return local_endpoint_; }
+ Endpoint getLocalEndpoint() const { return local_endpoint_; }
- Endpoint getRemoteEndpoint() { return remote_endpoint_; }
+ Endpoint getRemoteEndpoint() const { return remote_endpoint_; }
void setRole(Role r) { role_ = r; }
- Role getRole() { return role_; }
+ Role getRole() const { return role_; }
static utils::MemBuf::Ptr getPacketFromBuffer(uint8_t *buffer,
std::size_t size) {
utils::MemBuf::Ptr ret;
- auto format = Packet::getFormatFromBuffer(buffer, size);
+ hicn_packet_buffer_t pkbuf;
+ hicn_packet_set_buffer(&pkbuf, buffer, size, size);
+ hicn_packet_analyze(&pkbuf);
+ hicn_packet_type_t type = hicn_packet_get_type(&pkbuf);
- if (TRANSPORT_EXPECT_TRUE(format != HF_UNSPEC && !_is_icmp(format))) {
- if (Packet::isInterest(buffer)) {
+ // XXX reuse pkbuf when creating the packet, to avoid reanalyzing it
+
+ switch (type) {
+ case HICN_PACKET_TYPE_INTEREST:
ret = core::PacketManager<>::getInstance()
.getPacketFromExistingBuffer<Interest>(buffer, size);
- } else {
+ break;
+ case HICN_PACKET_TYPE_DATA:
ret = core::PacketManager<>::getInstance()
.getPacketFromExistingBuffer<ContentObject>(buffer, size);
- }
- } else {
- ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size);
+ break;
+ default:
+ ret = core::PacketManager<>::getInstance().getMemBuf(buffer, size);
+ break;
}
return ret;
@@ -191,8 +210,8 @@ class Connector : public std::enable_shared_from_this<Connector> {
OnReconnectCallback on_reconnect_callback_;
// Connector state
- std::atomic<State> state_;
- Id connector_id_;
+ std::atomic<State> state_ = State::CLOSED;
+ Id connector_id_ = invalid_connector;
// Endpoints
Endpoint local_endpoint_;
@@ -206,6 +225,9 @@ class Connector : public std::enable_shared_from_this<Connector> {
// Stats
AtomicConnectorStats stats_;
+
+ // Connection attempts
+ std::uint32_t connection_reattempts_ = 0;
};
} // namespace core