summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichele Papalini <micpapal@cisco.com>2022-09-29 13:53:38 +0000
committerGerrit Code Review <gerrit@fd.io>2022-09-29 13:53:38 +0000
commit29647f687c8dadc90e2ba4d3a772eee09a1a4f1b (patch)
tree82e321ccbe834e0c0d5faa3c175e10f93ccacbff
parentd0c49cbf2887412e3b661d84032077db35c6d236 (diff)
parent7fd052761d21bfa38839a27cc9d03ef77a01f411 (diff)
Merge "fix(rtc-production-protocol): do not modify packet just after sending it"
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.cc23
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder.h2
-rw-r--r--libtransport/src/io_modules/forwarder/forwarder_module.cc2
-rw-r--r--libtransport/src/protocols/prod_protocol_rtc.cc4
-rw-r--r--libtransport/src/test/CMakeLists.txt4
-rw-r--r--libtransport/src/test/test_consumer_producer_rtc.cc32
6 files changed, 40 insertions, 27 deletions
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc
index bfe4dd5de..d5f0b589e 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder.cc
@@ -161,19 +161,24 @@ void Forwarder::onPacketReceived(Connector *connector,
// Forward packet to local connectors
}
-void Forwarder::send(Packet &packet) {
+void Forwarder::send(Packet &packet, Connector::Id connector_id) {
// TODo Here a nice PIT/CS / FIB would be required:)
// For now let's just forward the packet on the remote connector we get
- if (remote_connectors_.begin() == remote_connectors_.end()) {
- return;
+ for (auto &c : remote_connectors_) {
+ auto remote_endpoint = c.second->getRemoteEndpoint();
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending packet to: " << remote_endpoint.getAddress() << ":"
+ << remote_endpoint.getPort();
+ c.second->send(packet);
}
- auto remote_endpoint =
- remote_connectors_.begin()->second->getRemoteEndpoint();
- DLOG_IF(INFO, VLOG_IS_ON(3))
- << "Sending packet to: " << remote_endpoint.getAddress() << ":"
- << remote_endpoint.getPort();
- remote_connectors_.begin()->second->send(packet);
+ for (auto &c : local_connectors_) {
+ if (c.first != connector_id) {
+ DLOG_IF(INFO, VLOG_IS_ON(3))
+ << "Sending packet to local connector " << c.first << std::endl;
+ c.second->receive({packet.shared_from_this()});
+ }
+ }
}
void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {}
diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h
index 9ad989fcd..1022bf81b 100644
--- a/libtransport/src/io_modules/forwarder/forwarder.h
+++ b/libtransport/src/io_modules/forwarder/forwarder.h
@@ -54,7 +54,7 @@ class Forwarder {
Connector::Ptr getConnector(Connector::Id id);
- void send(Packet &packet);
+ void send(Packet &packet, Connector::Id id);
void stop();
diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc
index 77d2b5e6a..ca9466f01 100644
--- a/libtransport/src/io_modules/forwarder/forwarder_module.cc
+++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc
@@ -34,7 +34,7 @@ bool ForwarderModule::isConnected() { return true; }
void ForwarderModule::send(Packet &packet) {
IoModule::send(packet);
- forwarder_.send(packet);
+ forwarder_.send(packet, connector_id_);
DLOG_IF(INFO, VLOG_IS_ON(3))
<< "Sending from " << connector_id_ << " to " << 1 - connector_id_;
}
diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc
index 3d1562801..aebad23d6 100644
--- a/libtransport/src/protocols/prod_protocol_rtc.cc
+++ b/libtransport/src/protocols/prod_protocol_rtc.cc
@@ -699,8 +699,6 @@ void RTCProductionProtocol::sendContentObject(
signer_->signPacket(content_object.get());
}
- portal_->sendContentObject(*content_object);
-
// Compute and save data packet digest
if (manifest_max_capacity_ && !is_ah) {
auth::CryptoHashType hash_algo;
@@ -709,6 +707,8 @@ void RTCProductionProtocol::sendContentObject(
manifest_entries_.push({content_object->getName().getSuffix(),
content_object->computeDigest(hash_algo)});
}
+
+ portal_->sendContentObject(*content_object);
}
void RTCProductionProtocol::onFecPackets(fec::BufferArray &packets) {
diff --git a/libtransport/src/test/CMakeLists.txt b/libtransport/src/test/CMakeLists.txt
index 864006e5d..56edcd102 100644
--- a/libtransport/src/test/CMakeLists.txt
+++ b/libtransport/src/test/CMakeLists.txt
@@ -17,8 +17,8 @@
list(APPEND TESTS_SRC
main.cc
test_aggregated_header.cc
- #######test_auth.cc
- # test_consumer_producer_rtc.cc
+ test_auth.cc
+ test_consumer_producer_rtc.cc
test_core_manifest.cc
# test_event_thread.cc
test_fec_base_rs.cc
diff --git a/libtransport/src/test/test_consumer_producer_rtc.cc b/libtransport/src/test/test_consumer_producer_rtc.cc
index b11a6a388..d7378fc72 100644
--- a/libtransport/src/test/test_consumer_producer_rtc.cc
+++ b/libtransport/src/test/test_consumer_producer_rtc.cc
@@ -25,6 +25,17 @@ namespace interface {
namespace {
+class IoModuleInit {
+ public:
+ IoModuleInit() {
+ global_config::IoModuleConfiguration config;
+ config.name = "forwarder_module";
+ config.set();
+ }
+};
+
+static IoModuleInit init;
+
class ConsumerProducerTest : public ::testing::Test,
public ConsumerSocket::ReadCallback {
static const constexpr char prefix[] = "b001::1/128";
@@ -40,16 +51,12 @@ class ConsumerProducerTest : public ::testing::Test,
: io_service_(),
rtc_timer_(io_service_),
stop_timer_(io_service_),
- consumer_(TransportProtocolAlgorithms::RTC, io_service_),
- producer_(ProductionProtocolAlgorithms::RTC_PROD, thread_),
+ consumer_(TransportProtocolAlgorithms::RTC),
+ producer_(ProductionProtocolAlgorithms::RTC_PROD),
producer_prefix_(prefix),
consumer_name_(name),
packets_sent_(0),
- packets_received_(0) {
- global_config::IoModuleConfiguration config;
- config.name = "loopback_module";
- config.set();
- }
+ packets_received_(0) {}
virtual ~ConsumerProducerTest() {
// You can do clean-up work that doesn't throw exceptions here.
@@ -69,6 +76,7 @@ class ConsumerProducerTest : public ::testing::Test,
consumer_.connect();
producer_.registerPrefix(producer_prefix_);
producer_.connect();
+ producer_.start();
}
virtual void TearDown() override {
@@ -116,7 +124,9 @@ class ConsumerProducerTest : public ::testing::Test,
*max_length = receive_buffer_size;
}
- void readDataAvailable(std::size_t length) noexcept override {}
+ void readDataAvailable(std::size_t length) noexcept override {
+ packets_received_++;
+ }
size_t maxBufferSize() const override { return receive_buffer_size; }
@@ -125,9 +135,7 @@ class ConsumerProducerTest : public ::testing::Test,
FAIL() << "Error while reading from RTC socket";
}
- void readSuccess(std::size_t total_size) noexcept override {
- packets_received_++;
- }
+ void readSuccess(std::size_t total_size) noexcept override {}
asio::io_service io_service_;
asio::steady_timer rtc_timer_;
@@ -148,7 +156,7 @@ const char ConsumerProducerTest::name[];
} // namespace
-TEST_F(ConsumerProducerTest, DISABLED_EndToEnd) {
+TEST_F(ConsumerProducerTest, EndToEnd) {
produceRTCPacket(std::error_code());
consumer_.consume(consumer_name_);
setStopTimer();