diff options
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.cc | 23 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder.h | 2 | ||||
-rw-r--r-- | libtransport/src/io_modules/forwarder/forwarder_module.cc | 2 | ||||
-rw-r--r-- | libtransport/src/protocols/prod_protocol_rtc.cc | 4 | ||||
-rw-r--r-- | libtransport/src/test/CMakeLists.txt | 4 | ||||
-rw-r--r-- | libtransport/src/test/test_consumer_producer_rtc.cc | 32 |
6 files changed, 40 insertions, 27 deletions
diff --git a/libtransport/src/io_modules/forwarder/forwarder.cc b/libtransport/src/io_modules/forwarder/forwarder.cc index bfe4dd5de..d5f0b589e 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.cc +++ b/libtransport/src/io_modules/forwarder/forwarder.cc @@ -161,19 +161,24 @@ void Forwarder::onPacketReceived(Connector *connector, // Forward packet to local connectors } -void Forwarder::send(Packet &packet) { +void Forwarder::send(Packet &packet, Connector::Id connector_id) { // TODo Here a nice PIT/CS / FIB would be required:) // For now let's just forward the packet on the remote connector we get - if (remote_connectors_.begin() == remote_connectors_.end()) { - return; + for (auto &c : remote_connectors_) { + auto remote_endpoint = c.second->getRemoteEndpoint(); + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending packet to: " << remote_endpoint.getAddress() << ":" + << remote_endpoint.getPort(); + c.second->send(packet); } - auto remote_endpoint = - remote_connectors_.begin()->second->getRemoteEndpoint(); - DLOG_IF(INFO, VLOG_IS_ON(3)) - << "Sending packet to: " << remote_endpoint.getAddress() << ":" - << remote_endpoint.getPort(); - remote_connectors_.begin()->second->send(packet); + for (auto &c : local_connectors_) { + if (c.first != connector_id) { + DLOG_IF(INFO, VLOG_IS_ON(3)) + << "Sending packet to local connector " << c.first << std::endl; + c.second->receive({packet.shared_from_this()}); + } + } } void Forwarder::onPacketSent(Connector *connector, const std::error_code &ec) {} diff --git a/libtransport/src/io_modules/forwarder/forwarder.h b/libtransport/src/io_modules/forwarder/forwarder.h index 9ad989fcd..1022bf81b 100644 --- a/libtransport/src/io_modules/forwarder/forwarder.h +++ b/libtransport/src/io_modules/forwarder/forwarder.h @@ -54,7 +54,7 @@ class Forwarder { Connector::Ptr getConnector(Connector::Id id); - void send(Packet &packet); + void send(Packet &packet, Connector::Id id); void stop(); diff --git a/libtransport/src/io_modules/forwarder/forwarder_module.cc b/libtransport/src/io_modules/forwarder/forwarder_module.cc index 77d2b5e6a..ca9466f01 100644 --- a/libtransport/src/io_modules/forwarder/forwarder_module.cc +++ b/libtransport/src/io_modules/forwarder/forwarder_module.cc @@ -34,7 +34,7 @@ bool ForwarderModule::isConnected() { return true; } void ForwarderModule::send(Packet &packet) { IoModule::send(packet); - forwarder_.send(packet); + forwarder_.send(packet, connector_id_); DLOG_IF(INFO, VLOG_IS_ON(3)) << "Sending from " << connector_id_ << " to " << 1 - connector_id_; } diff --git a/libtransport/src/protocols/prod_protocol_rtc.cc b/libtransport/src/protocols/prod_protocol_rtc.cc index 3d1562801..aebad23d6 100644 --- a/libtransport/src/protocols/prod_protocol_rtc.cc +++ b/libtransport/src/protocols/prod_protocol_rtc.cc @@ -699,8 +699,6 @@ void RTCProductionProtocol::sendContentObject( signer_->signPacket(content_object.get()); } - portal_->sendContentObject(*content_object); - // Compute and save data packet digest if (manifest_max_capacity_ && !is_ah) { auth::CryptoHashType hash_algo; @@ -709,6 +707,8 @@ void RTCProductionProtocol::sendContentObject( manifest_entries_.push({content_object->getName().getSuffix(), content_object->computeDigest(hash_algo)}); } + + portal_->sendContentObject(*content_object); } void RTCProductionProtocol::onFecPackets(fec::BufferArray &packets) { diff --git a/libtransport/src/test/CMakeLists.txt b/libtransport/src/test/CMakeLists.txt index 864006e5d..56edcd102 100644 --- a/libtransport/src/test/CMakeLists.txt +++ b/libtransport/src/test/CMakeLists.txt @@ -17,8 +17,8 @@ list(APPEND TESTS_SRC main.cc test_aggregated_header.cc - #######test_auth.cc - # test_consumer_producer_rtc.cc + test_auth.cc + test_consumer_producer_rtc.cc test_core_manifest.cc # test_event_thread.cc test_fec_base_rs.cc diff --git a/libtransport/src/test/test_consumer_producer_rtc.cc b/libtransport/src/test/test_consumer_producer_rtc.cc index b11a6a388..d7378fc72 100644 --- a/libtransport/src/test/test_consumer_producer_rtc.cc +++ b/libtransport/src/test/test_consumer_producer_rtc.cc @@ -25,6 +25,17 @@ namespace interface { namespace { +class IoModuleInit { + public: + IoModuleInit() { + global_config::IoModuleConfiguration config; + config.name = "forwarder_module"; + config.set(); + } +}; + +static IoModuleInit init; + class ConsumerProducerTest : public ::testing::Test, public ConsumerSocket::ReadCallback { static const constexpr char prefix[] = "b001::1/128"; @@ -40,16 +51,12 @@ class ConsumerProducerTest : public ::testing::Test, : io_service_(), rtc_timer_(io_service_), stop_timer_(io_service_), - consumer_(TransportProtocolAlgorithms::RTC, io_service_), - producer_(ProductionProtocolAlgorithms::RTC_PROD, thread_), + consumer_(TransportProtocolAlgorithms::RTC), + producer_(ProductionProtocolAlgorithms::RTC_PROD), producer_prefix_(prefix), consumer_name_(name), packets_sent_(0), - packets_received_(0) { - global_config::IoModuleConfiguration config; - config.name = "loopback_module"; - config.set(); - } + packets_received_(0) {} virtual ~ConsumerProducerTest() { // You can do clean-up work that doesn't throw exceptions here. @@ -69,6 +76,7 @@ class ConsumerProducerTest : public ::testing::Test, consumer_.connect(); producer_.registerPrefix(producer_prefix_); producer_.connect(); + producer_.start(); } virtual void TearDown() override { @@ -116,7 +124,9 @@ class ConsumerProducerTest : public ::testing::Test, *max_length = receive_buffer_size; } - void readDataAvailable(std::size_t length) noexcept override {} + void readDataAvailable(std::size_t length) noexcept override { + packets_received_++; + } size_t maxBufferSize() const override { return receive_buffer_size; } @@ -125,9 +135,7 @@ class ConsumerProducerTest : public ::testing::Test, FAIL() << "Error while reading from RTC socket"; } - void readSuccess(std::size_t total_size) noexcept override { - packets_received_++; - } + void readSuccess(std::size_t total_size) noexcept override {} asio::io_service io_service_; asio::steady_timer rtc_timer_; @@ -148,7 +156,7 @@ const char ConsumerProducerTest::name[]; } // namespace -TEST_F(ConsumerProducerTest, DISABLED_EndToEnd) { +TEST_F(ConsumerProducerTest, EndToEnd) { produceRTCPacket(std::error_code()); consumer_.consume(consumer_name_); setStopTimer(); |