/* * Copyright (c) 2017-2019 Cisco and/or its affiliates. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include namespace transport { namespace interface { static const std::string producer_identity = "producer_socket"; AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator) : AsyncFullDuplexSocket(locator, internal_io_service_) {} AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service) : locator_(locator), incremental_suffix_(0), io_service_(io_service), work_(io_service), producer_(std::make_unique(io_service_)), consumer_(std::make_unique( TransportProtocolAlgorithms::RAAQM /* , io_service_ */)), read_callback_(nullptr), write_callback_(nullptr), connect_callback_(nullptr), accept_callback_(nullptr), internal_connect_callback_(new OnConnectCallback(*this)), internal_signal_callback_(new OnSignalCallback(*this)), send_timeout_milliseconds_(~0), counters_({0}), receive_buffer_(ContentBuffer()) { using namespace transport; using namespace std::placeholders; producer_->registerPrefix(locator); producer_->setSocketOption( ProducerCallbacksOptions::CACHE_MISS, std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2)); producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, uint32_t{150000}); ProducerContentCallback producer_callback = std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3); producer_->setSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, producer_callback); producer_->connect(); consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, (ConsumerContentObjectVerificationCallback)[]( ConsumerSocket & s, const ContentObject &c) ->bool { return true; }); ConsumerContentCallback consumer_callback = std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3); consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, consumer_callback); consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, uint32_t{4}); consumer_->connect(); } void AsyncFullDuplexSocket::close() { this->consumer_->stop(); this->producer_->stop(); } void AsyncFullDuplexSocket::closeNow() { close(); } void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); } void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); } bool AsyncFullDuplexSocket::good() const { return true; } bool AsyncFullDuplexSocket::readable() const { // TODO return status of consumer socket return true; } bool AsyncFullDuplexSocket::writable() const { // TODO return status of producer socket return true; } bool AsyncFullDuplexSocket::isPending() const { // TODO save if there are production operation in the ops queue // in producer socket return true; } bool AsyncFullDuplexSocket::connected() const { // No real connection here (ICN world). Return good return good(); } bool AsyncFullDuplexSocket::error() const { return !good(); } void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) { // TODO if production takes too much to complete // let's abort the operation. // Normally with hicn this should be done for content // pull, not for production. send_timeout_milliseconds_ = milliseconds; } uint32_t AsyncFullDuplexSocket::getSendTimeout() const { return send_timeout_milliseconds_; } size_t AsyncFullDuplexSocket::getAppBytesWritten() const { return counters_.app_bytes_written_; } size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; } size_t AsyncFullDuplexSocket::getAppBytesReceived() const { return counters_.app_bytes_read_; } size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; } void AsyncFullDuplexSocket::connect(ConnectCallback *callback, const core::Prefix &prefix) { connect_callback_ = callback; // Create an interest for a subscription auto interest = core::Interest::Ptr(new core::Interest(prefix.makeRandomName())); auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); _payload->append(sizeof(ActionMessage)); auto payload = _payload->writableData(); ActionMessage *subscription_message = reinterpret_cast(payload); subscription_message->header.msg_type = MessageType::ACTION; subscription_message->action = Action::SUBSCRIBE; subscription_message->header.reserved[0] = 0; subscription_message->header.reserved[1] = 0; // Set the name the other part should use for notifying a content production sync_notification_ = std::move(locator_.makeRandomName()); sync_notification_.copyToDestination( reinterpret_cast(subscription_message->name)); TRANSPORT_LOGI( "Trying to connect. Sending interest: %s, name for notifications: %s", prefix.getName().toString().c_str(), sync_notification_.toString().c_str()); interest->setLifetime(1000); interest->appendPayload(std::move(_payload)); consumer_->asyncSendInterest(std::move(interest), internal_connect_callback_.get()); } void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf, size_t bytes, const PublicationOptions &options, WriteFlags flags) { using namespace transport; // 1 asynchronously write the content. I assume here the // buffer contains the whole application frame. FIXME: check // if this is true and fix it accordingly std::cout << "Size of the PAYLOAD: " << bytes << std::endl; if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) { TRANSPORT_LOGI("Producing content with name %s", options.getName().toString().c_str()); producer_->asyncProduce(options.getName(), reinterpret_cast(buf), bytes); signalProductionToSubscribers(options.getName()); } else { TRANSPORT_LOGI("Sending payload through interest"); piggybackPayloadToSubscribers( options.getName(), reinterpret_cast(buf), bytes); } } void AsyncFullDuplexSocket::write(WriteCallback *callback, ContentBuffer &&output_buffer, const PublicationOptions &options, WriteFlags flags) { using namespace transport; // 1 asynchronously write the content. I assume here the // buffer contains the whole application frame. FIXME: check // if this is true and fix it accordingly std::cout << "Size of the PAYLOAD: " << output_buffer->size() << std::endl; if (output_buffer->size() > core::Packet::default_mtu - sizeof(PayloadMessage)) { TRANSPORT_LOGI("Producing content with name %s", options.getName().toString().c_str()); producer_->asyncProduce(options.getName(), std::move(output_buffer)); signalProductionToSubscribers(options.getName()); } else { TRANSPORT_LOGI("Sending payload through interest"); piggybackPayloadToSubscribers(options.getName(), &(*output_buffer)[0], output_buffer->size()); } } void AsyncFullDuplexSocket::piggybackPayloadToSubscribers( const core::Name &name, const uint8_t *buffer, std::size_t bytes) { for (auto &sub : subscribers_) { auto interest = core::Interest::Ptr(new core::Interest(name)); auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage)); _payload->append(bytes + sizeof(PayloadMessage)); auto payload = _payload->writableData(); PayloadMessage *interest_payload = reinterpret_cast(payload); interest_payload->header.msg_type = MessageType::PAYLOAD; interest_payload->header.reserved[0] = 0; interest_payload->header.reserved[1] = 0; interest_payload->reserved[0] = 0; std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes); interest->appendPayload(std::move(_payload)); // Set the timeout of 0.2 second interest->setLifetime(1000); interest->setName(sub); interest->getWritableName().setSuffix(incremental_suffix_++); // TRANSPORT_LOGI("Sending signalization to %s", // interest->getName().toString().c_str()); consumer_->asyncSendInterest(std::move(interest), internal_signal_callback_.get()); } } void AsyncFullDuplexSocket::signalProductionToSubscribers( const core::Name &name) { // Signal the other part we are producing a content // Create an interest for a subscription for (auto &sub : subscribers_) { auto interest = core::Interest::Ptr(new core::Interest(name)); // Todo consider using preallocated pool of membufs auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); _payload->append(sizeof(ActionMessage)); auto payload = interest->getPayload()->writableData(); ActionMessage *produce_notification = reinterpret_cast(payload); produce_notification->header.msg_type = MessageType::ACTION; produce_notification->action = Action::SIGNAL_PRODUCTION; produce_notification->header.reserved[0] = 0; produce_notification->header.reserved[1] = 0; name.copyToDestination( reinterpret_cast(produce_notification->name)); interest->appendPayload(std::move(_payload)); // Set the timeout of 0.2 second interest->setLifetime(1000); interest->setName(sub); interest->getWritableName().setSuffix(incremental_suffix_++); // TRANSPORT_LOGI("Sending signalization to %s", // interest->getName().toString().c_str()); consumer_->asyncSendInterest(std::move(interest), internal_signal_callback_.get()); } } void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) { accept_callback_ = cb; } std::shared_ptr AsyncFullDuplexSocket::decodeSynchronizationMessage( const core::Interest &interest) { auto mesg = interest.getPayload(); const MessageHeader *header = reinterpret_cast(mesg->data()); switch (header->msg_type) { case MessageType::ACTION: { // Check what is the action to perform const ActionMessage *message = reinterpret_cast(header); if (message->action == Action::SUBSCRIBE) { // Add consumer to list on consumers to be notified auto ret = subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0); TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str()); if (ret.second) { accept_callback_->connectionAccepted(*ret.first); } TRANSPORT_LOGI("Connection success!"); sync_notification_ = std::move(locator_.makeRandomName()); return createSubscriptionResponse(sync_notification_); } else if (message->action == Action::CANCEL_SUBSCRIPTION) { // XXX Modify name!!! Each allocated name allocates a 128 bit array. subscribers_.erase( core::Name(AF_INET6, (const uint8_t *)message->name, 0)); return createAck(); } else if (message->action == Action::SIGNAL_PRODUCTION) { // trigger a reverse pull for the name contained in the message core::Name n(AF_INET6, (const uint8_t *)message->name, 0); std::cout << "PROD NOTIFICATION: Content to retrieve: " << n << std::endl; std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName() << std::endl; // << " compared to " << sync_notification_ << // std::endl; if (sync_notification_.equals(interest.getName(), false)) { std::cout << "Starting reverse pull for " << n << std::endl; consumer_->asyncConsume(n, receive_buffer_); return createAck(); } } else { TRANSPORT_LOGE("Received unknown message. Dropping it."); } break; } case MessageType::RESPONSE: { throw errors::RuntimeException( "The response should be a content object!!"); } case MessageType::PAYLOAD: { // The interest contains the payload directly. // We saved one round trip :) auto buffer = ContentBuffer(); const uint8_t *data = mesg->data() + sizeof(PayloadMessage); buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage)); read_callback_->readBufferAvailable(std::move(buffer)); return createAck(); } default: { return std::shared_ptr(nullptr); } } return std::shared_ptr(nullptr); } void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s, const core::Interest &i) { auto payload = i.getPayload(); if (payload->length()) { // Try to decode payload and see if starting an async pull operation auto response = decodeSynchronizationMessage(i); if (response) { response->setName(i.getName()); s.produce(*response); } } } void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer, const std::error_code &ec, uint64_t bytes_written) { if (write_callback_) { if (!ec) { write_callback_->writeSuccess(); } else { write_callback_->writeErr(bytes_written); } } } void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, std::size_t size, const std::error_code &ec) { // Sanity check if (size != receive_buffer_->size()) { TRANSPORT_LOGE( "Received content size differs from size retrieved from the buffer."); return; } TRANSPORT_LOGI("Received content with size %zu", size); if (!ec) { read_callback_->readBufferAvailable(std::move(receive_buffer_)); } else { TRANSPORT_LOGE("Error retrieving content."); } // consumer_->stop(); } void AsyncFullDuplexSocket::OnConnectCallback::onContentObject( core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { // The ack message should contain the name to be used for notifying // the production of the content to the other part if (content_object->getPayload()->length() == 0) { TRANSPORT_LOGW("Connection response message empty...."); return; } SubscriptionResponseMessage *response = reinterpret_cast( content_object->getPayload()->writableData()); if (response->response.header.msg_type == MessageType::RESPONSE) { if (response->response.return_code == ReturnCode::OK) { auto ret = socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0); TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s", ret.first->toString().c_str()); socket_.connect_callback_->connectSuccess(); } } } void AsyncFullDuplexSocket::OnSignalCallback::onContentObject( core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { return; } void AsyncFullDuplexSocket::OnSignalCallback::onTimeout( core::Interest::Ptr &&interest) { TRANSPORT_LOGE("Retransmitting signalization interest to %s!!", interest->getName().toString().c_str()); socket_.consumer_->asyncSendInterest(std::move(interest), socket_.internal_signal_callback_.get()); } void AsyncFullDuplexSocket::OnConnectCallback::onTimeout( core::Interest::Ptr &&interest) { socket_.connect_callback_->connectErr( std::make_error_code(std::errc::not_connected)); } std::shared_ptr AsyncFullDuplexSocket::createAck() { // Send the response back core::Name name("b001::abcd"); auto response = std::make_shared(name); auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); _payload->append(sizeof(ResponseMessage)); auto payload = response->getPayload()->data(); ResponseMessage *response_message = (ResponseMessage *)payload; response_message->header.msg_type = MessageType::RESPONSE; response_message->header.reserved[0] = 0; response_message->header.reserved[1] = 0; response_message->return_code = ReturnCode::OK; response->appendPayload(std::move(_payload)); response->setLifetime(0); return response; } std::shared_ptr AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) { // Send the response back core::Name tmp_name("b001::abcd"); auto response = std::make_shared(tmp_name); auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage)); _payload->append(sizeof(SubscriptionResponseMessage)); auto payload = _payload->data(); SubscriptionResponseMessage *response_message = (SubscriptionResponseMessage *)payload; response_message->response.header.msg_type = MessageType::RESPONSE; response_message->response.header.reserved[0] = 0; response_message->response.header.reserved[1] = 0; response_message->response.return_code = ReturnCode::OK; name.copyToDestination(reinterpret_cast(response_message->name)); response->appendPayload(std::move(_payload)); response->setLifetime(0); return response; } } // namespace interface } // namespace transport