diff options
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc')
-rw-r--r-- | libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc | 490 |
1 files changed, 0 insertions, 490 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc deleted file mode 100644 index fdd422dee..000000000 --- a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/interfaces/full_duplex_socket.h> -#include <hicn/transport/interfaces/socket_options_default_values.h> - -#include <memory> - -namespace transport { - -namespace interface { - -static const std::string producer_identity = "producer_socket"; - -AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator) - : AsyncFullDuplexSocket(locator, internal_io_service_) {} - -AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator, - asio::io_service &io_service) - : locator_(locator), - incremental_suffix_(0), - io_service_(io_service), - work_(io_service), - producer_(std::make_unique<ProducerSocket>(io_service_)), - consumer_(std::make_unique<ConsumerSocket>( - TransportProtocolAlgorithms::RAAQM /* , io_service_ */)), - read_callback_(nullptr), - write_callback_(nullptr), - connect_callback_(nullptr), - accept_callback_(nullptr), - internal_connect_callback_(new OnConnectCallback(*this)), - internal_signal_callback_(new OnSignalCallback(*this)), - send_timeout_milliseconds_(~0), - counters_({0}), - receive_buffer_(ContentBuffer()) { - using namespace transport; - using namespace std::placeholders; - producer_->registerPrefix(locator); - - producer_->setSocketOption( - ProducerCallbacksOptions::CACHE_MISS, - std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2)); - - producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, - uint32_t{150000}); - - ProducerContentCallback producer_callback = - std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3); - producer_->setSocketOption(ProducerCallbacksOptions::CONTENT_PRODUCED, - producer_callback); - - producer_->connect(); - - consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, - (ConsumerContentObjectVerificationCallback)[]( - ConsumerSocket & s, const ContentObject &c) - ->bool { return true; }); - - ConsumerContentCallback consumer_callback = - std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3); - consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, - consumer_callback); - - consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, - uint32_t{4}); - - consumer_->connect(); -} - -void AsyncFullDuplexSocket::close() { - this->consumer_->stop(); - this->producer_->stop(); -} - -void AsyncFullDuplexSocket::closeNow() { close(); } - -void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); } - -void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); } - -bool AsyncFullDuplexSocket::good() const { return true; } - -bool AsyncFullDuplexSocket::readable() const { - // TODO return status of consumer socket - return true; -} - -bool AsyncFullDuplexSocket::writable() const { - // TODO return status of producer socket - return true; -} - -bool AsyncFullDuplexSocket::isPending() const { - // TODO save if there are production operation in the ops queue - // in producer socket - return true; -} - -bool AsyncFullDuplexSocket::connected() const { - // No real connection here (ICN world). Return good - return good(); -} - -bool AsyncFullDuplexSocket::error() const { return !good(); } - -void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) { - // TODO if production takes too much to complete - // let's abort the operation. - - // Normally with hicn this should be done for content - // pull, not for production. - - send_timeout_milliseconds_ = milliseconds; -} - -uint32_t AsyncFullDuplexSocket::getSendTimeout() const { - return send_timeout_milliseconds_; -} - -size_t AsyncFullDuplexSocket::getAppBytesWritten() const { - return counters_.app_bytes_written_; -} - -size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; } - -size_t AsyncFullDuplexSocket::getAppBytesReceived() const { - return counters_.app_bytes_read_; -} - -size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; } - -void AsyncFullDuplexSocket::connect(ConnectCallback *callback, - const core::Prefix &prefix) { - connect_callback_ = callback; - - // Create an interest for a subscription - auto interest = - core::Interest::Ptr(new core::Interest(prefix.makeRandomName())); - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ActionMessage)); - auto payload = _payload->writableData(); - ActionMessage *subscription_message = - reinterpret_cast<ActionMessage *>(payload); - subscription_message->header.msg_type = MessageType::ACTION; - subscription_message->action = Action::SUBSCRIBE; - subscription_message->header.reserved[0] = 0; - subscription_message->header.reserved[1] = 0; - - // Set the name the other part should use for notifying a content production - sync_notification_ = std::move(locator_.makeRandomName()); - sync_notification_.copyToDestination( - reinterpret_cast<uint8_t *>(subscription_message->name)); - - TRANSPORT_LOGI( - "Trying to connect. Sending interest: %s, name for notifications: %s", - prefix.getName().toString().c_str(), - sync_notification_.toString().c_str()); - - interest->setLifetime(1000); - interest->appendPayload(std::move(_payload)); - consumer_->asyncSendInterest(std::move(interest), - internal_connect_callback_.get()); -} - -void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf, - size_t bytes, - const PublicationOptions &options, - WriteFlags flags) { - using namespace transport; - - // 1 asynchronously write the content. I assume here the - // buffer contains the whole application frame. FIXME: check - // if this is true and fix it accordingly - std::cout << "Size of the PAYLOAD: " << bytes << std::endl; - - if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) { - TRANSPORT_LOGI("Producing content with name %s", - options.getName().toString().c_str()); - producer_->asyncProduce(options.getName(), - reinterpret_cast<const uint8_t *>(buf), bytes); - signalProductionToSubscribers(options.getName()); - } else { - TRANSPORT_LOGI("Sending payload through interest"); - piggybackPayloadToSubscribers( - options.getName(), reinterpret_cast<const uint8_t *>(buf), bytes); - } -} - -void AsyncFullDuplexSocket::write(WriteCallback *callback, - ContentBuffer &&output_buffer, - const PublicationOptions &options, - WriteFlags flags) { - using namespace transport; - - // 1 asynchronously write the content. I assume here the - // buffer contains the whole application frame. FIXME: check - // if this is true and fix it accordingly - std::cout << "Size of the PAYLOAD: " << output_buffer->size() << std::endl; - - if (output_buffer->size() > - core::Packet::default_mtu - sizeof(PayloadMessage)) { - TRANSPORT_LOGI("Producing content with name %s", - options.getName().toString().c_str()); - producer_->asyncProduce(options.getName(), std::move(output_buffer)); - signalProductionToSubscribers(options.getName()); - } else { - TRANSPORT_LOGI("Sending payload through interest"); - piggybackPayloadToSubscribers(options.getName(), &(*output_buffer)[0], - output_buffer->size()); - } -} - -void AsyncFullDuplexSocket::piggybackPayloadToSubscribers( - const core::Name &name, const uint8_t *buffer, std::size_t bytes) { - for (auto &sub : subscribers_) { - auto interest = core::Interest::Ptr(new core::Interest(name)); - auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage)); - _payload->append(bytes + sizeof(PayloadMessage)); - auto payload = _payload->writableData(); - - PayloadMessage *interest_payload = - reinterpret_cast<PayloadMessage *>(payload); - interest_payload->header.msg_type = MessageType::PAYLOAD; - interest_payload->header.reserved[0] = 0; - interest_payload->header.reserved[1] = 0; - interest_payload->reserved[0] = 0; - std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes); - interest->appendPayload(std::move(_payload)); - - // Set the timeout of 0.2 second - interest->setLifetime(1000); - interest->setName(sub); - interest->getWritableName().setSuffix(incremental_suffix_++); - // TRANSPORT_LOGI("Sending signalization to %s", - // interest->getName().toString().c_str()); - - consumer_->asyncSendInterest(std::move(interest), - internal_signal_callback_.get()); - } -} - -void AsyncFullDuplexSocket::signalProductionToSubscribers( - const core::Name &name) { - // Signal the other part we are producing a content - // Create an interest for a subscription - - for (auto &sub : subscribers_) { - auto interest = core::Interest::Ptr(new core::Interest(name)); - // Todo consider using preallocated pool of membufs - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ActionMessage)); - auto payload = interest->getPayload()->writableData(); - - ActionMessage *produce_notification = - reinterpret_cast<ActionMessage *>(payload); - produce_notification->header.msg_type = MessageType::ACTION; - produce_notification->action = Action::SIGNAL_PRODUCTION; - produce_notification->header.reserved[0] = 0; - produce_notification->header.reserved[1] = 0; - name.copyToDestination( - reinterpret_cast<uint8_t *>(produce_notification->name)); - interest->appendPayload(std::move(_payload)); - - // Set the timeout of 0.2 second - interest->setLifetime(1000); - interest->setName(sub); - interest->getWritableName().setSuffix(incremental_suffix_++); - // TRANSPORT_LOGI("Sending signalization to %s", - // interest->getName().toString().c_str()); - - consumer_->asyncSendInterest(std::move(interest), - internal_signal_callback_.get()); - } -} - -void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) { - accept_callback_ = cb; -} - -std::shared_ptr<core::ContentObject> -AsyncFullDuplexSocket::decodeSynchronizationMessage( - const core::Interest &interest) { - auto mesg = interest.getPayload(); - const MessageHeader *header = - reinterpret_cast<const MessageHeader *>(mesg->data()); - - switch (header->msg_type) { - case MessageType::ACTION: { - // Check what is the action to perform - const ActionMessage *message = - reinterpret_cast<const ActionMessage *>(header); - - if (message->action == Action::SUBSCRIBE) { - // Add consumer to list on consumers to be notified - auto ret = - subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0); - TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str()); - if (ret.second) { - accept_callback_->connectionAccepted(*ret.first); - } - - TRANSPORT_LOGI("Connection success!"); - - sync_notification_ = std::move(locator_.makeRandomName()); - return createSubscriptionResponse(sync_notification_); - - } else if (message->action == Action::CANCEL_SUBSCRIPTION) { - // XXX Modify name!!! Each allocated name allocates a 128 bit array. - subscribers_.erase( - core::Name(AF_INET6, (const uint8_t *)message->name, 0)); - return createAck(); - } else if (message->action == Action::SIGNAL_PRODUCTION) { - // trigger a reverse pull for the name contained in the message - core::Name n(AF_INET6, (const uint8_t *)message->name, 0); - std::cout << "PROD NOTIFICATION: Content to retrieve: " << n - << std::endl; - std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName() - << std::endl; // << " compared to " << sync_notification_ << - // std::endl; - - if (sync_notification_.equals(interest.getName(), false)) { - std::cout << "Starting reverse pull for " << n << std::endl; - consumer_->asyncConsume(n, receive_buffer_); - return createAck(); - } - } else { - TRANSPORT_LOGE("Received unknown message. Dropping it."); - } - - break; - } - case MessageType::RESPONSE: { - throw errors::RuntimeException( - "The response should be a content object!!"); - } - case MessageType::PAYLOAD: { - // The interest contains the payload directly. - // We saved one round trip :) - - auto buffer = ContentBuffer(); - const uint8_t *data = mesg->data() + sizeof(PayloadMessage); - buffer->assign(data, data + mesg->length() - sizeof(PayloadMessage)); - read_callback_->readBufferAvailable(std::move(buffer)); - return createAck(); - } - default: { return std::shared_ptr<core::ContentObject>(nullptr); } - } - - return std::shared_ptr<core::ContentObject>(nullptr); -} - -void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s, - const core::Interest &i) { - auto payload = i.getPayload(); - if (payload->length()) { - // Try to decode payload and see if starting an async pull operation - auto response = decodeSynchronizationMessage(i); - if (response) { - response->setName(i.getName()); - s.produce(*response); - } - } -} - -void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer, - const std::error_code &ec, - uint64_t bytes_written) { - if (write_callback_) { - if (!ec) { - write_callback_->writeSuccess(); - } else { - write_callback_->writeErr(bytes_written); - } - } -} - -void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, - std::size_t size, - const std::error_code &ec) { - // Sanity check - if (size != receive_buffer_->size()) { - TRANSPORT_LOGE( - "Received content size differs from size retrieved from the buffer."); - return; - } - - TRANSPORT_LOGI("Received content with size %zu", size); - if (!ec) { - read_callback_->readBufferAvailable(std::move(receive_buffer_)); - } else { - TRANSPORT_LOGE("Error retrieving content."); - } - // consumer_->stop(); -} - -void AsyncFullDuplexSocket::OnConnectCallback::onContentObject( - core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { - // The ack message should contain the name to be used for notifying - // the production of the content to the other part - - if (content_object->getPayload()->length() == 0) { - TRANSPORT_LOGW("Connection response message empty...."); - return; - } - - SubscriptionResponseMessage *response = - reinterpret_cast<SubscriptionResponseMessage *>( - content_object->getPayload()->writableData()); - - if (response->response.header.msg_type == MessageType::RESPONSE) { - if (response->response.return_code == ReturnCode::OK) { - auto ret = - socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0); - TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s", - ret.first->toString().c_str()); - socket_.connect_callback_->connectSuccess(); - } - } -} - -void AsyncFullDuplexSocket::OnSignalCallback::onContentObject( - core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { - return; -} - -void AsyncFullDuplexSocket::OnSignalCallback::onTimeout( - core::Interest::Ptr &&interest) { - TRANSPORT_LOGE("Retransmitting signalization interest to %s!!", - interest->getName().toString().c_str()); - socket_.consumer_->asyncSendInterest(std::move(interest), - socket_.internal_signal_callback_.get()); -} - -void AsyncFullDuplexSocket::OnConnectCallback::onTimeout( - core::Interest::Ptr &&interest) { - socket_.connect_callback_->connectErr( - std::make_error_code(std::errc::not_connected)); -} - -std::shared_ptr<core::ContentObject> AsyncFullDuplexSocket::createAck() { - // Send the response back - core::Name name("b001::abcd"); - auto response = std::make_shared<core::ContentObject>(name); - auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); - _payload->append(sizeof(ResponseMessage)); - auto payload = response->getPayload()->data(); - ResponseMessage *response_message = (ResponseMessage *)payload; - response_message->header.msg_type = MessageType::RESPONSE; - response_message->header.reserved[0] = 0; - response_message->header.reserved[1] = 0; - response_message->return_code = ReturnCode::OK; - response->appendPayload(std::move(_payload)); - response->setLifetime(0); - return response; -} - -std::shared_ptr<core::ContentObject> -AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) { - // Send the response back - core::Name tmp_name("b001::abcd"); - auto response = std::make_shared<core::ContentObject>(tmp_name); - auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage)); - _payload->append(sizeof(SubscriptionResponseMessage)); - auto payload = _payload->data(); - SubscriptionResponseMessage *response_message = - (SubscriptionResponseMessage *)payload; - response_message->response.header.msg_type = MessageType::RESPONSE; - response_message->response.header.reserved[0] = 0; - response_message->response.header.reserved[1] = 0; - response_message->response.return_code = ReturnCode::OK; - name.copyToDestination(reinterpret_cast<uint8_t *>(response_message->name)); - response->appendPayload(std::move(_payload)); - response->setLifetime(0); - return response; -} - -} // namespace interface -} // namespace transport |