From bac3da61644515f05663789b122554dc77549286 Mon Sep 17 00:00:00 2001 From: Luca Muscariello Date: Thu, 17 Jan 2019 13:47:57 +0100 Subject: This is the first commit of the hicn project Change-Id: I6f2544ad9b9f8891c88cc4bcce3cf19bd3cc863f Signed-off-by: Luca Muscariello --- .../src/hicn/transport/interfaces/CMakeLists.txt | 38 + .../hicn/transport/interfaces/async_transport.h | 640 ++++++++++++++ .../transport/interfaces/full_duplex_socket.cc | 490 +++++++++++ .../hicn/transport/interfaces/full_duplex_socket.h | 254 ++++++ .../transport/interfaces/publication_options.h | 34 + .../transport/interfaces/rtc_socket_consumer.cc | 35 + .../transport/interfaces/rtc_socket_consumer.h | 35 + .../transport/interfaces/rtc_socket_producer.cc | 157 ++++ .../transport/interfaces/rtc_socket_producer.h | 60 ++ .../src/hicn/transport/interfaces/socket.h | 270 ++++++ .../hicn/transport/interfaces/socket_consumer.cc | 735 ++++++++++++++++ .../hicn/transport/interfaces/socket_consumer.h | 259 ++++++ .../interfaces/socket_options_default_values.h | 68 ++ .../transport/interfaces/socket_options_keys.h | 108 +++ .../hicn/transport/interfaces/socket_producer.cc | 948 +++++++++++++++++++++ .../hicn/transport/interfaces/socket_producer.h | 269 ++++++ 16 files changed, 4400 insertions(+) create mode 100755 libtransport/src/hicn/transport/interfaces/CMakeLists.txt create mode 100755 libtransport/src/hicn/transport/interfaces/async_transport.h create mode 100755 libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc create mode 100755 libtransport/src/hicn/transport/interfaces/full_duplex_socket.h create mode 100755 libtransport/src/hicn/transport/interfaces/publication_options.h create mode 100755 libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc create mode 100755 libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h create mode 100755 libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc create mode 100755 libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h create mode 100755 libtransport/src/hicn/transport/interfaces/socket.h create mode 100755 libtransport/src/hicn/transport/interfaces/socket_consumer.cc create mode 100755 libtransport/src/hicn/transport/interfaces/socket_consumer.h create mode 100755 libtransport/src/hicn/transport/interfaces/socket_options_default_values.h create mode 100755 libtransport/src/hicn/transport/interfaces/socket_options_keys.h create mode 100755 libtransport/src/hicn/transport/interfaces/socket_producer.cc create mode 100755 libtransport/src/hicn/transport/interfaces/socket_producer.h (limited to 'libtransport/src/hicn/transport/interfaces') diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt new file mode 100755 index 000000000..cbf371bac --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -0,0 +1,38 @@ +# Copyright (c) 2017-2019 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.5 FATAL_ERROR) + +list(APPEND HEADER_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/socket.h + ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h + ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.h + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h + ${CMAKE_CURRENT_SOURCE_DIR}/async_transport.h + ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.h + ${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h + ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h + ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h +) + +list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc +) + +set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE) +set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/async_transport.h b/libtransport/src/hicn/transport/interfaces/async_transport.h new file mode 100755 index 000000000..492b4ec26 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/async_transport.h @@ -0,0 +1,640 @@ + +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace transport { + +namespace interface { + +/* + * flags given by the application for write* calls + */ +enum class WriteFlags : uint32_t { + NONE = 0x00, + /* + * Whether to delay the output until a subsequent non-corked write. + * (Note: may not be supported in all subclasses or on all platforms.) + */ + CORK = 0x01, + /* + * for a socket that has ACK latency enabled, it will cause the kernel + * to fire a TCP ESTATS event when the last byte of the given write call + * will be acknowledged. + */ + EOR = 0x02, + /* + * this indicates that only the write side of socket should be shutdown + */ + WRITE_SHUTDOWN = 0x04, + /* + * use msg zerocopy if allowed + */ + WRITE_MSG_ZEROCOPY = 0x08, +}; + +/* + * union operator + */ +TRANSPORT_ALWAYS_INLINE WriteFlags operator|(WriteFlags a, WriteFlags b) { + return static_cast(static_cast(a) | + static_cast(b)); +} + +/* + * compound assignment union operator + */ +TRANSPORT_ALWAYS_INLINE WriteFlags &operator|=(WriteFlags &a, WriteFlags b) { + a = a | b; + return a; +} + +/* + * intersection operator + */ +TRANSPORT_ALWAYS_INLINE WriteFlags operator&(WriteFlags a, WriteFlags b) { + return static_cast(static_cast(a) & + static_cast(b)); +} + +/* + * compound assignment intersection operator + */ +TRANSPORT_ALWAYS_INLINE WriteFlags &operator&=(WriteFlags &a, WriteFlags b) { + a = a & b; + return a; +} + +/* + * exclusion parameter + */ +TRANSPORT_ALWAYS_INLINE WriteFlags operator~(WriteFlags a) { + return static_cast(~static_cast(a)); +} + +/* + * unset operator + */ +TRANSPORT_ALWAYS_INLINE WriteFlags unSet(WriteFlags a, WriteFlags b) { + return a & ~b; +} + +/* + * inclusion operator + */ +TRANSPORT_ALWAYS_INLINE bool isSet(WriteFlags a, WriteFlags b) { + return (a & b) == b; +} + +class ConnectCallback { + public: + virtual ~ConnectCallback() = default; + + /** + * connectSuccess() will be invoked when the connection has been + * successfully established. + */ + virtual void connectSuccess() noexcept = 0; + + /** + * connectErr() will be invoked if the connection attempt fails. + * + * @param ex An exception describing the error that occurred. + */ + virtual void connectErr(const std::error_code ec) noexcept = 0; +}; + +/** + * AsyncSocket defines an asynchronous API for streaming I/O. + * + * This class provides an API to for asynchronously waiting for data + * on a streaming transport, and for asynchronously sending data. + * + * The APIs for reading and writing are intentionally asymmetric. Waiting for + * data to read is a persistent API: a callback is installed, and is notified + * whenever new data is available. It continues to be notified of new events + * until it is uninstalled. + * + * AsyncSocket does not provide read timeout functionality, because it + * typically cannot determine when the timeout should be active. Generally, a + * timeout should only be enabled when processing is blocked waiting on data + * from the remote endpoint. For server-side applications, the timeout should + * not be active if the server is currently processing one or more outstanding + * requests on this transport. For client-side applications, the timeout + * should not be active if there are no requests pending on the transport. + * Additionally, if a client has multiple pending requests, it will ususally + * want a separate timeout for each request, rather than a single read timeout. + * + * The write API is fairly intuitive: a user can request to send a block of + * data, and a callback will be informed once the entire block has been + * transferred to the kernel, or on error. AsyncSocket does provide a send + * timeout, since most callers want to give up if the remote end stops + * responding and no further progress can be made sending the data. + */ +class AsyncSocket { + public: + /** + * Close the transport. + * + * This gracefully closes the transport, waiting for all pending write + * requests to complete before actually closing the underlying transport. + * + * If a read callback is set, readEOF() will be called immediately. If there + * are outstanding write requests, the close will be delayed until all + * remaining writes have completed. No new writes may be started after + * close() has been called. + */ + virtual void close() = 0; + + /** + * Close the transport immediately. + * + * This closes the transport immediately, dropping any outstanding data + * waiting to be written. + * + * If a read callback is set, readEOF() will be called immediately. + * If there are outstanding write requests, these requests will be aborted + * and writeError() will be invoked immediately on all outstanding write + * callbacks. + */ + virtual void closeNow() = 0; + + /** + * Perform a half-shutdown of the write side of the transport. + * + * The caller should not make any more calls to write() or writev() after + * shutdownWrite() is called. Any future write attempts will fail + * immediately. + * + * Not all transport types support half-shutdown. If the underlying + * transport does not support half-shutdown, it will fully shutdown both the + * read and write sides of the transport. (Fully shutting down the socket is + * better than doing nothing at all, since the caller may rely on the + * shutdownWrite() call to notify the other end of the connection that no + * more data can be read.) + * + * If there is pending data still waiting to be written on the transport, + * the actual shutdown will be delayed until the pending data has been + * written. + * + * Note: There is no corresponding shutdownRead() equivalent. Simply + * uninstall the read callback if you wish to stop reading. (On TCP sockets + * at least, shutting down the read side of the socket is a no-op anyway.) + */ + virtual void shutdownWrite() = 0; + + /** + * Perform a half-shutdown of the write side of the transport. + * + * shutdownWriteNow() is identical to shutdownWrite(), except that it + * immediately performs the shutdown, rather than waiting for pending writes + * to complete. Any pending write requests will be immediately failed when + * shutdownWriteNow() is called. + */ + virtual void shutdownWriteNow() = 0; + + /** + * Determine if transport is open and ready to read or write. + * + * Note that this function returns false on EOF; you must also call error() + * to distinguish between an EOF and an error. + * + * @return true iff the transport is open and ready, false otherwise. + */ + virtual bool good() const = 0; + + /** + * Determine if the transport is readable or not. + * + * @return true iff the transport is readable, false otherwise. + */ + virtual bool readable() const = 0; + + /** + * Determine if the transport is writable or not. + * + * @return true iff the transport is writable, false otherwise. + */ + virtual bool writable() const { + // By default return good() - leave it to implementers to override. + return good(); + } + + /** + * Determine if the there is pending data on the transport. + * + * @return true iff the if the there is pending data, false otherwise. + */ + virtual bool isPending() const { return readable(); } + + /** + * Determine if transport is connected to the endpoint + * + * @return false iff the transport is connected, otherwise true + */ + virtual bool connected() const = 0; + + /** + * Determine if an error has occurred with this transport. + * + * @return true iff an error has occurred (not EOF). + */ + virtual bool error() const = 0; + + // /** + // * Attach the transport to a EventBase. + // * + // * This may only be called if the transport is not currently attached to a + // * EventBase (by an earlier call to detachEventBase()). + // * + // * This method must be invoked in the EventBase's thread. + // */ + // virtual void attachEventBase(EventBase* eventBase) = 0; + + // /** + // * Detach the transport from its EventBase. + // * + // * This may only be called when the transport is idle and has no reads or + // * writes pending. Once detached, the transport may not be used again + // until + // * it is re-attached to a EventBase by calling attachEventBase(). + // * + // * This method must be called from the current EventBase's thread. + // */ + // virtual void detachEventBase() = 0; + + // /** + // * Determine if the transport can be detached. + // * + // * This method must be called from the current EventBase's thread. + // */ + // virtual bool isDetachable() const = 0; + + /** + * Set the send timeout. + * + * If write requests do not make any progress for more than the specified + * number of milliseconds, fail all pending writes and close the transport. + * + * If write requests are currently pending when setSendTimeout() is called, + * the timeout interval is immediately restarted using the new value. + * + * @param milliseconds The timeout duration, in milliseconds. If 0, no + * timeout will be used. + */ + virtual void setSendTimeout(uint32_t milliseconds) = 0; + + /** + * Get the send timeout. + * + * @return Returns the current send timeout, in milliseconds. A return value + * of 0 indicates that no timeout is set. + */ + virtual uint32_t getSendTimeout() const = 0; + + virtual void connect(ConnectCallback *callback, + const core::Prefix &prefix_) = 0; + + // /** + // * Get the address of the local endpoint of this transport. + // * + // * This function may throw AsyncSocketException on error. + // * + // * @param address The local address will be stored in the specified + // * SocketAddress. + // */ + // virtual void getLocalAddress(* address) const = 0; + + virtual size_t getAppBytesWritten() const = 0; + virtual size_t getRawBytesWritten() const = 0; + virtual size_t getAppBytesReceived() const = 0; + virtual size_t getRawBytesReceived() const = 0; + + class BufferCallback { + public: + virtual ~BufferCallback() {} + virtual void onEgressBuffered() = 0; + virtual void onEgressBufferCleared() = 0; + }; + + ~AsyncSocket() = default; +}; + +class AsyncAcceptor { + public: + class AcceptCallback { + public: + virtual ~AcceptCallback() = default; + + /** + * connectionAccepted() is called whenever a new client connection is + * received. + * + * The AcceptCallback will remain installed after connectionAccepted() + * returns. + * + * @param fd The newly accepted client socket. The AcceptCallback + * assumes ownership of this socket, and is responsible + * for closing it when done. The newly accepted file + * descriptor will have already been put into + * non-blocking mode. + * @param clientAddr A reference to a SocketAddress struct containing the + * client's address. This struct is only guaranteed to + * remain valid until connectionAccepted() returns. + */ + virtual void connectionAccepted( + const core::Name &subscriber_name) noexcept = 0; + + /** + * acceptError() is called if an error occurs while accepting. + * + * The AcceptCallback will remain installed even after an accept error, + * as the errors are typically somewhat transient, such as being out of + * file descriptors. The server socket must be explicitly stopped if you + * wish to stop accepting after an error. + * + * @param ex An exception representing the error. + */ + virtual void acceptError(const std::exception &ex) noexcept = 0; + + /** + * acceptStarted() will be called in the callback's EventBase thread + * after this callback has been added to the AsyncServerSocket. + * + * acceptStarted() will be called before any calls to connectionAccepted() + * or acceptError() are made on this callback. + * + * acceptStarted() makes it easier for callbacks to perform initialization + * inside the callback thread. (The call to addAcceptCallback() must + * always be made from the AsyncServerSocket's primary EventBase thread. + * acceptStarted() provides a hook that will always be invoked in the + * callback's thread.) + * + * Note that the call to acceptStarted() is made once the callback is + * added, regardless of whether or not the AsyncServerSocket is actually + * accepting at the moment. acceptStarted() will be called even if the + * AsyncServerSocket is paused when the callback is added (including if + * the initial call to startAccepting() on the AsyncServerSocket has not + * been made yet). + */ + virtual void acceptStarted() noexcept {} + + /** + * acceptStopped() will be called when this AcceptCallback is removed from + * the AsyncServerSocket, or when the AsyncServerSocket is destroyed, + * whichever occurs first. + * + * No more calls to connectionAccepted() or acceptError() will be made + * after acceptStopped() is invoked. + */ + virtual void acceptStopped() noexcept {} + }; + + /** + * Wait for subscribers + * + */ + virtual void waitForSubscribers(AcceptCallback *cb) = 0; +}; + +class AsyncReader { + public: + class ReadCallback { + public: + virtual ~ReadCallback() = default; + + /** + * When data becomes available, getReadBuffer() will be invoked to get the + * buffer into which data should be read. + * + * This method allows the ReadCallback to delay buffer allocation until + * data becomes available. This allows applications to manage large + * numbers of idle connections, without having to maintain a separate read + * buffer for each idle connection. + * + * It is possible that in some cases, getReadBuffer() may be called + * multiple times before readDataAvailable() is invoked. In this case, the + * data will be written to the buffer returned from the most recent call to + * readDataAvailable(). If the previous calls to readDataAvailable() + * returned different buffers, the ReadCallback is responsible for ensuring + * that they are not leaked. + * + * If getReadBuffer() throws an exception, returns a nullptr buffer, or + * returns a 0 length, the ReadCallback will be uninstalled and its + * readError() method will be invoked. + * + * getReadBuffer() is not allowed to change the transport state before it + * returns. (For example, it should never uninstall the read callback, or + * set a different read callback.) + * + * @param bufReturn getReadBuffer() should update *bufReturn to contain the + * address of the read buffer. This parameter will never + * be nullptr. + * @param lenReturn getReadBuffer() should update *lenReturn to contain the + * maximum number of bytes that may be written to the read + * buffer. This parameter will never be nullptr. + * + * + * XXX TODO this does not seems to be completely true Checlk i/. + */ + virtual void getReadBuffer(void **bufReturn, size_t *lenReturn) = 0; + + /** + * readDataAvailable() will be invoked when data has been successfully read + * into the buffer returned by the last call to getReadBuffer(). + * + * The read callback remains installed after readDataAvailable() returns. + * It must be explicitly uninstalled to stop receiving read events. + * getReadBuffer() will be called at least once before each call to + * readDataAvailable(). getReadBuffer() will also be called before any + * call to readEOF(). + * + * @param len The number of bytes placed in the buffer. + */ + + virtual void readDataAvailable(size_t len) noexcept = 0; + + /** + * When data becomes available, isBufferMovable() will be invoked to figure + * out which API will be used, readBufferAvailable() or + * readDataAvailable(). If isBufferMovable() returns true, that means + * ReadCallback supports the IOBuf ownership transfer and + * readBufferAvailable() will be used. Otherwise, not. + + * By default, isBufferMovable() always return false. If + * readBufferAvailable() is implemented and to be invoked, You should + * overwrite isBufferMovable() and return true in the inherited class. + * + * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by + * itself until data becomes available. Compared with the pre/post buffer + * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable() + * has two advantages. First, this can avoid memcpy. E.g., in + * AsyncSSLSocket, the decrypted data was copied from the openssl internal + * buffer to the readbuf buffer. With the buffer ownership transfer, the + * internal buffer can be directly "moved" to ReadCallback. Second, the + * memory allocation can be more precise. The reason is + * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size + * because they have more context about the available data than + * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes + * buffer, but the available data is always 16KB (max OpenSSL record size). + */ + + virtual bool isBufferMovable() noexcept { return false; } + + /** + * Suggested buffer size, allocated for read operations, + * if callback is movable and supports folly::IOBuf + */ + + virtual size_t maxBufferSize() const { + return 64 * 1024; // 64K + } + + /** + * readBufferAvailable() will be invoked when data has been successfully + * read. + * + * Note that only either readBufferAvailable() or readDataAvailable() will + * be invoked according to the return value of isBufferMovable(). The timing + * and aftereffect of readBufferAvailable() are the same as + * readDataAvailable() + * + * @param readBuf The unique pointer of read buffer. + */ + + // virtual void readBufferAvailable(uint8_t** buffer, std::size_t + // *buf_length) noexcept {} + + virtual void readBufferAvailable( + utils::SharableVector &&buffer) noexcept {} + + // virtual void readBufferAvailable(utils::SharableBuffer&& buffer) + // noexcept {} + + /** + * readEOF() will be invoked when the transport is closed. + * + * The read callback will be automatically uninstalled immediately before + * readEOF() is invoked. + */ + virtual void readEOF() noexcept = 0; + + /** + * readError() will be invoked if an error occurs reading from the + * transport. + * + * The read callback will be automatically uninstalled immediately before + * readError() is invoked. + * + * @param ex An exception describing the error that occurred. + */ + virtual void readErr(const std::error_code ec) noexcept = 0; + }; + + // Read methods that aren't part of AsyncTransport. + virtual void setReadCB(ReadCallback *callback) = 0; + virtual ReadCallback *getReadCallback() const = 0; + + protected: + virtual ~AsyncReader() = default; +}; + +class AsyncWriter { + public: + class WriteCallback { + public: + virtual ~WriteCallback() = default; + + /** + * writeSuccess() will be invoked when all of the data has been + * successfully written. + * + * Note that this mainly signals that the buffer containing the data to + * write is no longer needed and may be freed or re-used. It does not + * guarantee that the data has been fully transmitted to the remote + * endpoint. For example, on socket-based transports, writeSuccess() only + * indicates that the data has been given to the kernel for eventual + * transmission. + */ + virtual void writeSuccess() noexcept = 0; + + /** + * writeError() will be invoked if an error occurs writing the data. + * + * @param bytesWritten The number of bytes that were successfull + * @param ex An exception describing the error that occurred. + */ + virtual void writeErr(size_t bytesWritten) noexcept = 0; + }; + + /** + * If you supply a non-null WriteCallback, exactly one of writeSuccess() + * or writeErr() will be invoked when the write completes. If you supply + * the same WriteCallback object for multiple write() calls, it will be + * invoked exactly once per call. The only way to cancel outstanding + * write requests is to close the socket (e.g., with closeNow() or + * shutdownWriteNow()). When closing the socket this way, writeErr() will + * still be invoked once for each outstanding write operation. + */ + virtual void write(WriteCallback *callback, const void *buf, size_t bytes, + const PublicationOptions &options, + WriteFlags flags = WriteFlags::NONE) = 0; + + /** + * If you supply a non-null WriteCallback, exactly one of writeSuccess() + * or writeErr() will be invoked when the write completes. If you supply + * the same WriteCallback object for multiple write() calls, it will be + * invoked exactly once per call. The only way to cancel outstanding + * write requests is to close the socket (e.g., with closeNow() or + * shutdownWriteNow()). When closing the socket this way, writeErr() will + * still be invoked once for each outstanding write operation. + */ + virtual void write(WriteCallback *callback, + utils::SharableVector &&output_buffer, + const PublicationOptions &options, + WriteFlags flags = WriteFlags::NONE) = 0; + + // /** + // * If you supply a non-null WriteCallback, exactly one of writeSuccess() + // * or writeErr() will be invoked when the write completes. If you supply + // * the same WriteCallback object for multiple write() calls, it will be + // * invoked exactly once per call. The only way to cancel outstanding + // * write requests is to close the socket (e.g., with closeNow() or + // * shutdownWriteNow()). When closing the socket this way, writeErr() will + // * still be invoked once for each outstanding write operation. + // */ + // virtual void writeChain( + // WriteCallback* callback, + // std::unique_ptr&& buf, + // WriteFlags flags = WriteFlags::NONE) = 0; + + virtual void setWriteCB(WriteCallback *callback) = 0; + virtual WriteCallback *getWriteCallback() const = 0; + + protected: + virtual ~AsyncWriter() = default; +}; + +} // namespace interface + +} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc new file mode 100755 index 000000000..7b6342262 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc @@ -0,0 +1,490 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +namespace transport { + +namespace interface { + +static const std::string producer_identity = "producer_socket"; + +AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator) + : AsyncFullDuplexSocket(locator, internal_io_service_) {} + +AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator, + asio::io_service &io_service) + : locator_(locator), + incremental_suffix_(0), + io_service_(io_service), + work_(io_service), + producer_(std::make_unique(io_service_)), + consumer_(std::make_unique( + TransportProtocolAlgorithms::RAAQM /* , io_service_ */)), + read_callback_(nullptr), + write_callback_(nullptr), + connect_callback_(nullptr), + accept_callback_(nullptr), + internal_connect_callback_(new OnConnectCallback(*this)), + internal_signal_callback_(new OnSignalCallback(*this)), + send_timeout_milliseconds_(~0), + counters_({0}), + receive_buffer_(std::make_shared>()) { + using namespace transport; + using namespace std::placeholders; + producer_->registerPrefix(locator); + + producer_->setSocketOption( + ProducerCallbacksOptions::CACHE_MISS, + std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2)); + + producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE, + uint32_t{150000}); + + producer_->setSocketOption( + ProducerCallbacksOptions::CONTENT_PRODUCED, + std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3)); + + producer_->connect(); + + consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, + (ConsumerContentObjectVerificationCallback)[]( + ConsumerSocket & s, const ContentObject &c) + ->bool { return true; }); + + consumer_->setSocketOption( + ConsumerCallbacksOptions::CONTENT_RETRIEVED, + std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3)); + + consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, + uint32_t{4}); + + consumer_->connect(); +} + +void AsyncFullDuplexSocket::close() { + this->consumer_->stop(); + this->producer_->stop(); +} + +void AsyncFullDuplexSocket::closeNow() { close(); } + +void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); } + +void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); } + +bool AsyncFullDuplexSocket::good() const { return true; } + +bool AsyncFullDuplexSocket::readable() const { + // TODO return status of consumer socket + return true; +} + +bool AsyncFullDuplexSocket::writable() const { + // TODO return status of producer socket + return true; +} + +bool AsyncFullDuplexSocket::isPending() const { + // TODO save if there are production operation in the ops queue + // in producer socket + return true; +} + +bool AsyncFullDuplexSocket::connected() const { + // No real connection here (ICN world). Return good + return good(); +} + +bool AsyncFullDuplexSocket::error() const { return !good(); } + +void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) { + // TODO if production takes too much to complete + // let's abort the operation. + + // Normally with hicn this should be done for content + // pull, not for production. + + send_timeout_milliseconds_ = milliseconds; +} + +uint32_t AsyncFullDuplexSocket::getSendTimeout() const { + return send_timeout_milliseconds_; +} + +size_t AsyncFullDuplexSocket::getAppBytesWritten() const { + return counters_.app_bytes_written_; +} + +size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; } + +size_t AsyncFullDuplexSocket::getAppBytesReceived() const { + return counters_.app_bytes_read_; +} + +size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; } + +void AsyncFullDuplexSocket::connect(ConnectCallback *callback, + const core::Prefix &prefix) { + connect_callback_ = callback; + + // Create an interest for a subscription + auto interest = + core::Interest::Ptr(new core::Interest(prefix.makeRandomName())); + auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); + _payload->append(sizeof(ActionMessage)); + auto payload = _payload->writableData(); + ActionMessage *subscription_message = + reinterpret_cast(payload); + subscription_message->header.msg_type = MessageType::ACTION; + subscription_message->action = Action::SUBSCRIBE; + subscription_message->header.reserved[0] = 0; + subscription_message->header.reserved[1] = 0; + + // Set the name the other part should use for notifying a content production + sync_notification_ = std::move(locator_.makeRandomName()); + sync_notification_.copyToDestination( + reinterpret_cast(subscription_message->name)); + + TRANSPORT_LOGI( + "Trying to connect. Sending interest: %s, name for notifications: %s", + prefix.getName().toString().c_str(), + sync_notification_.toString().c_str()); + + interest->setLifetime(1000); + interest->appendPayload(std::move(_payload)); + consumer_->asyncSendInterest(std::move(interest), + internal_connect_callback_.get()); +} + +void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf, + size_t bytes, + const PublicationOptions &options, + WriteFlags flags) { + using namespace transport; + + // 1 asynchronously write the content. I assume here the + // buffer contains the whole application frame. FIXME: check + // if this is true and fix it accordingly + std::cout << "Size of the PAYLOAD: " << bytes << std::endl; + + if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) { + TRANSPORT_LOGI("Producing content with name %s", + options.name.toString().c_str()); + producer_->asyncProduce(options.name, + reinterpret_cast(buf), bytes); + signalProductionToSubscribers(options.name); + } else { + TRANSPORT_LOGI("Sending payload through interest"); + piggybackPayloadToSubscribers( + options.name, reinterpret_cast(buf), bytes); + } +} + +void AsyncFullDuplexSocket::write( + WriteCallback *callback, utils::SharableVector &&output_buffer, + const PublicationOptions &options, WriteFlags flags) { + using namespace transport; + + // 1 asynchronously write the content. I assume here the + // buffer contains the whole application frame. FIXME: check + // if this is true and fix it accordingly + std::cout << "Size of the PAYLOAD: " << output_buffer.size() << std::endl; + + if (output_buffer.size() > + core::Packet::default_mtu - sizeof(PayloadMessage)) { + TRANSPORT_LOGI("Producing content with name %s", + options.name.toString().c_str()); + producer_->asyncProduce(options.name, std::move(output_buffer)); + signalProductionToSubscribers(options.name); + } else { + TRANSPORT_LOGI("Sending payload through interest"); + piggybackPayloadToSubscribers(options.name, &output_buffer[0], + output_buffer.size()); + } +} + +void AsyncFullDuplexSocket::piggybackPayloadToSubscribers( + const core::Name &name, const uint8_t *buffer, std::size_t bytes) { + for (auto &sub : subscribers_) { + auto interest = core::Interest::Ptr(new core::Interest(name)); + auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage)); + _payload->append(bytes + sizeof(PayloadMessage)); + auto payload = _payload->writableData(); + + PayloadMessage *interest_payload = + reinterpret_cast(payload); + interest_payload->header.msg_type = MessageType::PAYLOAD; + interest_payload->header.reserved[0] = 0; + interest_payload->header.reserved[1] = 0; + interest_payload->reserved[0] = 0; + std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes); + interest->appendPayload(std::move(_payload)); + + // Set the timeout of 0.2 second + interest->setLifetime(1000); + interest->setName(sub); + interest->getWritableName().setSuffix(incremental_suffix_++); + // TRANSPORT_LOGI("Sending signalization to %s", + // interest->getName().toString().c_str()); + + consumer_->asyncSendInterest(std::move(interest), + internal_signal_callback_.get()); + } +} + +void AsyncFullDuplexSocket::signalProductionToSubscribers( + const core::Name &name) { + // Signal the other part we are producing a content + // Create an interest for a subscription + + for (auto &sub : subscribers_) { + auto interest = core::Interest::Ptr(new core::Interest(name)); + // Todo consider using preallocated pool of membufs + auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); + _payload->append(sizeof(ActionMessage)); + auto payload = const_cast(interest->getPayload().data()); + + ActionMessage *produce_notification = + reinterpret_cast(payload); + produce_notification->header.msg_type = MessageType::ACTION; + produce_notification->action = Action::SIGNAL_PRODUCTION; + produce_notification->header.reserved[0] = 0; + produce_notification->header.reserved[1] = 0; + name.copyToDestination( + reinterpret_cast(produce_notification->name)); + interest->appendPayload(std::move(_payload)); + + // Set the timeout of 0.2 second + interest->setLifetime(1000); + interest->setName(sub); + interest->getWritableName().setSuffix(incremental_suffix_++); + // TRANSPORT_LOGI("Sending signalization to %s", + // interest->getName().toString().c_str()); + + consumer_->asyncSendInterest(std::move(interest), + internal_signal_callback_.get()); + } +} + +void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) { + accept_callback_ = cb; +} + +std::shared_ptr +AsyncFullDuplexSocket::decodeSynchronizationMessage( + const core::Interest &interest) { + auto mesg = interest.getPayload(); + const MessageHeader *header = + reinterpret_cast(mesg.data()); + + switch (header->msg_type) { + case MessageType::ACTION: { + // Check what is the action to perform + const ActionMessage *message = + reinterpret_cast(header); + + if (message->action == Action::SUBSCRIBE) { + // Add consumer to list on consumers to be notified + auto ret = + subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0); + TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str()); + if (ret.second) { + accept_callback_->connectionAccepted(*ret.first); + } + + TRANSPORT_LOGI("Connection success!"); + + sync_notification_ = std::move(locator_.makeRandomName()); + return createSubscriptionResponse(sync_notification_); + + } else if (message->action == Action::CANCEL_SUBSCRIPTION) { + // XXX Modify name!!! Each allocated name allocates a 128 bit array. + subscribers_.erase( + core::Name(AF_INET6, (const uint8_t *)message->name, 0)); + return createAck(); + } else if (message->action == Action::SIGNAL_PRODUCTION) { + // trigger a reverse pull for the name contained in the message + core::Name n(AF_INET6, (const uint8_t *)message->name, 0); + std::cout << "PROD NOTIFICATION: Content to retrieve: " << n + << std::endl; + std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName() + << std::endl; // << " compared to " << sync_notification_ << + // std::endl; + + if (sync_notification_.equals(interest.getName(), false)) { + std::cout << "Starting reverse pull for " << n << std::endl; + consumer_->asyncConsume(n, receive_buffer_); + return createAck(); + } + } else { + TRANSPORT_LOGE("Received unknown message. Dropping it."); + } + + break; + } + case MessageType::RESPONSE: { + throw errors::RuntimeException( + "The response should be a content object!!"); + } + case MessageType::PAYLOAD: { + // The interest contains the payload directly. + // We saved one round trip :) + + auto buffer = std::make_shared>(); + const uint8_t *data = mesg.data() + sizeof(PayloadMessage); + buffer->assign(data, data + mesg.length() - sizeof(PayloadMessage)); + read_callback_->readBufferAvailable(std::move(*buffer)); + return createAck(); + } + default: { + return std::shared_ptr(nullptr); + } + } + + return std::shared_ptr(nullptr); +} + +void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s, + const core::Interest &i) { + auto payload = i.getPayload(); + if (payload.length()) { + // Try to decode payload and see if starting an async pull operation + auto response = decodeSynchronizationMessage(i); + if (response) { + response->setName(i.getName()); + s.produce(*response); + } + } +} + +void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer, + const std::error_code &ec, + uint64_t bytes_written) { + if (write_callback_) { + if (!ec) { + write_callback_->writeSuccess(); + } else { + write_callback_->writeErr(bytes_written); + } + } +} + +void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s, + std::size_t size, + const std::error_code &ec) { + // Sanity check + if (size != receive_buffer_->size()) { + TRANSPORT_LOGE( + "Received content size differs from size retrieved from the buffer."); + return; + } + + TRANSPORT_LOGI("Received content with size %lu", size); + if (!ec) { + read_callback_->readBufferAvailable(std::move(*receive_buffer_)); + } else { + TRANSPORT_LOGE("Error retrieving content."); + } + // consumer_->stop(); +} + +void AsyncFullDuplexSocket::OnConnectCallback::onContentObject( + core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { + // The ack message should contain the name to be used for notifying + // the production of the content to the other part + + if (content_object->getPayload().length() == 0) { + TRANSPORT_LOGW("Connection response message empty...."); + return; + } + + SubscriptionResponseMessage *response = + reinterpret_cast( + content_object->getPayload().writableData()); + + if (response->response.header.msg_type == MessageType::RESPONSE) { + if (response->response.return_code == ReturnCode::OK) { + auto ret = + socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0); + TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s", + ret.first->toString().c_str()); + socket_.connect_callback_->connectSuccess(); + } + } +} + +void AsyncFullDuplexSocket::OnSignalCallback::onContentObject( + core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) { + return; +} + +void AsyncFullDuplexSocket::OnSignalCallback::onTimeout( + core::Interest::Ptr &&interest) { + TRANSPORT_LOGE("Retransmitting signalization interest to %s!!", + interest->getName().toString().c_str()); + socket_.consumer_->asyncSendInterest(std::move(interest), + socket_.internal_signal_callback_.get()); +} + +void AsyncFullDuplexSocket::OnConnectCallback::onTimeout( + core::Interest::Ptr &&interest) { + socket_.connect_callback_->connectErr( + std::make_error_code(std::errc::not_connected)); +} + +std::shared_ptr AsyncFullDuplexSocket::createAck() { + // Send the response back + core::Name name("b001::abcd"); + auto response = std::make_shared(name); + auto _payload = utils::MemBuf::create(sizeof(ActionMessage)); + _payload->append(sizeof(ResponseMessage)); + auto payload = response->getPayload().data(); + ResponseMessage *response_message = (ResponseMessage *)payload; + response_message->header.msg_type = MessageType::RESPONSE; + response_message->header.reserved[0] = 0; + response_message->header.reserved[1] = 0; + response_message->return_code = ReturnCode::OK; + response->appendPayload(std::move(_payload)); + response->setLifetime(0); + return response; +} + +std::shared_ptr +AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) { + // Send the response back + core::Name tmp_name("b001::abcd"); + auto response = std::make_shared(tmp_name); + auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage)); + _payload->append(sizeof(SubscriptionResponseMessage)); + auto payload = _payload->data(); + SubscriptionResponseMessage *response_message = + (SubscriptionResponseMessage *)payload; + response_message->response.header.msg_type = MessageType::RESPONSE; + response_message->response.header.reserved[0] = 0; + response_message->response.header.reserved[1] = 0; + response_message->response.return_code = ReturnCode::OK; + name.copyToDestination(reinterpret_cast(response_message->name)); + response->appendPayload(std::move(_payload)); + response->setLifetime(0); + return response; +} + +} // namespace interface +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h new file mode 100755 index 000000000..f881bea54 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * This class is created for sending/receiving data over an ICN network. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace transport { + +namespace interface { + +enum class MessageType : uint8_t { ACTION, RESPONSE, PAYLOAD }; + +enum class Action : uint8_t { + SUBSCRIBE, + CANCEL_SUBSCRIPTION, + SIGNAL_PRODUCTION, +}; + +enum class ReturnCode : uint8_t { + OK, + FAILED, +}; + +struct MessageHeader { + MessageType msg_type; + uint8_t reserved[2]; +}; + +struct ActionMessage { + MessageHeader header; + Action action; + uint64_t name[2]; +}; + +struct ResponseMessage { + MessageHeader header; + ReturnCode return_code; +}; + +struct SubscriptionResponseMessage { + ResponseMessage response; + uint64_t name[2]; +}; + +struct PayloadMessage { + MessageHeader header; + uint8_t reserved[1]; +}; + +// struct NotificationMessage { +// Action action; +// uint8_t reserved[3]; +// uint64_t +// } + +using core::Prefix; + +class AsyncFullDuplexSocket : public AsyncSocket, + public AsyncReader, + public AsyncWriter, + public AsyncAcceptor { + private: + struct Counters { + uint64_t app_bytes_written_; + uint64_t app_bytes_read_; + + TRANSPORT_ALWAYS_INLINE void updateBytesWritten(uint64_t bytes) { + app_bytes_written_ += bytes; + } + + TRANSPORT_ALWAYS_INLINE void updateBytesRead(uint64_t bytes) { + app_bytes_read_ += bytes; + } + }; + + public: + using UniquePtr = std::unique_ptr; + using SharedPtr = std::unique_ptr; + + AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service); + AsyncFullDuplexSocket(const core::Prefix &locator); + + ~AsyncFullDuplexSocket() { + TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!"); + }; + + using ReadCallback = AsyncReader::ReadCallback; + using WriteCallback = AsyncWriter::WriteCallback; + + TRANSPORT_ALWAYS_INLINE void setReadCB(ReadCallback *callback) override { + read_callback_ = callback; + } + + TRANSPORT_ALWAYS_INLINE ReadCallback *getReadCallback() const override { + return read_callback_; + } + + TRANSPORT_ALWAYS_INLINE void setWriteCB(WriteCallback *callback) override { + write_callback_ = callback; + } + + TRANSPORT_ALWAYS_INLINE WriteCallback *getWriteCallback() const override { + return write_callback_; + } + + TRANSPORT_ALWAYS_INLINE const core::Prefix &getLocator() { return locator_; } + + void connect(ConnectCallback *callback, const core::Prefix &prefix) override; + + void write(WriteCallback *callback, const void *buf, size_t bytes, + const PublicationOptions &options, + WriteFlags flags = WriteFlags::NONE) override; + + virtual void write(WriteCallback *callback, + utils::SharableVector &&output_buffer, + const PublicationOptions &options, + WriteFlags flags = WriteFlags::NONE) override; + + void waitForSubscribers(AcceptCallback *cb) override; + + // void writev( + // WriteCallback* callback, + // const iovec* vec, + // size_t count, + // Name &&content_to_publish_name, + // WriteFlags flags = WriteFlags::NONE) override; + + void close() override; + + void closeNow() override; + + void shutdownWrite() override; + + void shutdownWriteNow() override; + + bool good() const override; + + bool readable() const override; + + bool writable() const override; + + bool isPending() const override; + + bool connected() const override; + + bool error() const override; + + void setSendTimeout(uint32_t milliseconds) override; + + size_t getAppBytesWritten() const override; + size_t getRawBytesWritten() const override; + size_t getAppBytesReceived() const override; + size_t getRawBytesReceived() const override; + + uint32_t getSendTimeout() const override; + + private: + std::shared_ptr decodeSynchronizationMessage( + const core::Interest &interest); + + class OnConnectCallback : public BasePortal::ConsumerCallback { + public: + OnConnectCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; + virtual ~OnConnectCallback() = default; + void onContentObject(core::Interest::Ptr &&, + core::ContentObject::Ptr &&content_object) override; + void onTimeout(core::Interest::Ptr &&interest) override; + + private: + AsyncFullDuplexSocket &socket_; + }; + + class OnSignalCallback : public BasePortal::ConsumerCallback { + public: + OnSignalCallback(AsyncFullDuplexSocket &socket) : socket_(socket){}; + virtual ~OnSignalCallback() = default; + void onContentObject(core::Interest::Ptr &&, + core::ContentObject::Ptr &&content_object); + void onTimeout(core::Interest::Ptr &&interest); + + private: + AsyncFullDuplexSocket &socket_; + }; + + void onControlInterest(ProducerSocket &s, const core::Interest &i); + void onContentProduced(ProducerSocket &producer, const std::error_code &ec, + uint64_t bytes_written); + void onContentRetrieved(ConsumerSocket &s, std::size_t size, + const std::error_code &ec); + + void signalProductionToSubscribers(const core::Name &name); + void piggybackPayloadToSubscribers(const core::Name &name, + const uint8_t *buffer, std::size_t bytes); + + std::shared_ptr createAck(); + std::shared_ptr createSubscriptionResponse( + const core::Name &name); + + core::Prefix locator_; + uint32_t incremental_suffix_; + core::Name sync_notification_; + // std::unique_ptr portal_; + asio::io_service internal_io_service_; + asio::io_service &io_service_; + asio::io_service::work work_; + + // These names represent the "locator" of a certain + // peer that subscribed to this. + std::unordered_set subscribers_; + + // Useful for publishing / Retrieving data + std::unique_ptr producer_; + std::unique_ptr consumer_; + + ReadCallback *read_callback_; + WriteCallback *write_callback_; + ConnectCallback *connect_callback_; + AcceptCallback *accept_callback_; + + std::unique_ptr internal_connect_callback_; + std::unique_ptr internal_signal_callback_; + + uint32_t send_timeout_milliseconds_; + struct Counters counters_; + std::shared_ptr> receive_buffer_; +}; + +} // namespace interface +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/publication_options.h b/libtransport/src/hicn/transport/interfaces/publication_options.h new file mode 100755 index 000000000..ae5366ce7 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/publication_options.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace transport { + +namespace interface { + +class PublicationOptions { + public: + core::Name name; + uint32_t content_lifetime_milliseconds; + // TODO Signature +}; +} // namespace interface + +} // namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc new file mode 100755 index 000000000..de3e84417 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace transport { + +namespace interface { + +RTCConsumerSocket::RTCConsumerSocket(int protocol, asio::io_service &io_service) + : ConsumerSocket(protocol, io_service) {} + +RTCConsumerSocket::~RTCConsumerSocket() {} + +void RTCConsumerSocket::handleRTCPPacket(uint8_t *packet, size_t len) { + RTCTransportProtocol *transport = dynamic_cast( + ConsumerSocket::transport_protocol_.get()); + if (transport) transport->onRTCPPacket(packet, len); +} + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h new file mode 100755 index 000000000..86ccf6e22 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace transport { + +namespace interface { + +class RTCConsumerSocket : public ConsumerSocket { + public: + explicit RTCConsumerSocket(int protocol, asio::io_service &io_service); + + ~RTCConsumerSocket(); + + void handleRTCPPacket(uint8_t *packet, size_t len); +}; + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc new file mode 100755 index 000000000..d8a9d53b9 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#define NACK_HEADER_SIZE 8 // bytes +#define TIMESTAMP_LEN 8 // bytes +#define TCP_HEADER_SIZE 20 +#define IP6_HEADER_SIZE 40 +#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps) +#define STATS_INTERVAL_DURATION 500 // ms +#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8 + +// NACK HEADER +// +-----------------------------------------+ +// | 4 bytes: current segment in production | +// +-----------------------------------------+ +// | 4 bytes: production rate (bytes x sec) | +// +-----------------------------------------+ +// may require additional field (Rate for multiple qualities, ...) +// + +namespace transport { + +namespace interface { + +RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service) + : ProducerSocket(io_service), + currentSeg_(1), + nack_(std::make_shared()), + producedBytes_(0), + producedPackets_(0), + bytesProductionRate_(0), + packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE), + perSecondFactor_(1000 / STATS_INTERVAL_DURATION) { + nack_->appendPayload(utils::MemBuf::create(NACK_HEADER_SIZE)); + lastStats_ = std::chrono::steady_clock::now(); + srand(time(NULL)); + prodLabel_ = ((rand() % 255) << 24UL); +} + +RTCProducerSocket::~RTCProducerSocket() {} + +void RTCProducerSocket::registerName(Prefix &producer_namespace) { + ProducerSocket::registerPrefix(producer_namespace); + + flowName_ = producer_namespace.getName(); + + if (flowName_.getType() == HNT_CONTIGUOUS_V4 || + flowName_.getType() == HNT_IOV_V4) { + headerSize_ = sizeof(hicn_v6_hdr_t::ip); + } else if (flowName_.getType() == HNT_CONTIGUOUS_V6 || + flowName_.getType() == HNT_IOV_V6) { + headerSize_ = sizeof(hicn_v4_hdr_t::ip); + } else { + throw errors::RuntimeException("Unknown name format."); + } + + headerSize_ += TCP_HEADER_SIZE; +} + +void RTCProducerSocket::updateStats(uint32_t packet_size) { + producedBytes_ += packet_size; + producedPackets_++; + std::chrono::steady_clock::duration duration = + std::chrono::steady_clock::now() - lastStats_; + if (std::chrono::duration_cast(duration).count() >= + STATS_INTERVAL_DURATION) { + lastStats_ = std::chrono::steady_clock::now(); + bytesProductionRate_ = producedBytes_ * perSecondFactor_; + packetsProductionRate_ = producedPackets_ * perSecondFactor_; + producedBytes_ = 0; + producedPackets_ = 0; + } +} + +void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) { + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) { + return; + } + + if (TRANSPORT_EXPECT_FALSE((buffer_size + headerSize_ + TIMESTAMP_LEN) > + data_packet_size_)) { + return; + } + + updateStats(buffer_size + headerSize_ + TIMESTAMP_LEN); + + std::shared_ptr content_object = + std::make_shared(flowName_.setSuffix(currentSeg_)); + auto payload = utils::MemBuf::copyBuffer(buf, buffer_size, TIMESTAMP_LEN); + + // content_object->setLifetime(content_object_expiry_time_); + content_object->setLifetime(1000); // XXX this should be set by the APP + + uint64_t timestamp = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + payload->prepend(TIMESTAMP_LEN); + uint8_t *payloadPointer = payload->writableData(); + *(uint64_t *)payloadPointer = timestamp; + content_object->appendPayload(std::move(payload)); + + content_object->setPathLabel(prodLabel_); + portal_->sendContentObject(*content_object); + + currentSeg_++; +} + +void RTCProducerSocket::onInterest(Interest::Ptr &&interest) { + uint32_t interestSeg = interest->getName().getSuffix(); + uint32_t lifetime = interest->getLifetime(); + uint32_t max_gap; + + // XXX + // packetsProductionRate_ is modified by another thread in updateStats + // this should be safe since I just read here. but, you never know. + max_gap = + floor((double)((double)((double)lifetime * + INTEREST_LIFETIME_REDUCTION_FACTOR / 1000.0) * + (double)packetsProductionRate_)); + + if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) { + sendNack(*interest); + } + // else drop packet +} + +void RTCProducerSocket::sendNack(const Interest &interest) { + nack_->setName(interest.getName()); + uint32_t *payload_ptr = (uint32_t *)nack_->getPayload().data(); + *payload_ptr = currentSeg_; + *(++payload_ptr) = bytesProductionRate_; + + nack_->setLifetime(0); + nack_->setPathLabel(prodLabel_); + portal_->sendContentObject(*nack_); +} + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h new file mode 100755 index 000000000..1a42bdc56 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include + +namespace transport { + +namespace interface { + +class RTCProducerSocket : public ProducerSocket { + public: + RTCProducerSocket(asio::io_service &io_service); + ~RTCProducerSocket(); + + void registerName(Prefix &producer_namespace); + + void produce(const uint8_t *buffer, size_t buffer_size); + + void onInterest(Interest::Ptr &&interest) override; + + private: + void sendNack(const Interest &interest); + void updateStats(uint32_t packet_size); + + // std::map pendingInterests_; + uint32_t currentSeg_; + uint32_t prodLabel_; + uint16_t headerSize_; + Name flowName_; + // bool produceInSynch_; + std::shared_ptr nack_; + uint32_t producedBytes_; + uint32_t producedPackets_; + uint32_t bytesProductionRate_; + uint32_t packetsProductionRate_; + uint32_t perSecondFactor_; + std::chrono::steady_clock::time_point lastStats_; +}; + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h new file mode 100755 index 000000000..22757810a --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket.h @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SOCKET_OPTION_GET 0 +#define SOCKET_OPTION_NOT_GET 1 +#define SOCKET_OPTION_SET 2 +#define SOCKET_OPTION_NOT_SET 3 +#define SOCKET_OPTION_DEFAULT 12345 + +#define VOID_HANDLER 0 + +namespace transport { + +namespace protocol { +class IcnObserver; +} + +namespace interface { + +template +class Socket; +class ConsumerSocket; +class ProducerSocket; + +// using Interest = core::Interest; +// using ContentObject = core::ContentObject; +// using Name = core::Name; +// using HashAlgorithm = core::HashAlgorithm; +// using CryptoSuite = utils::CryptoSuite; +// using Identity = utils::Identity; +// using Verifier = utils::Verifier; + +using HicnForwarderPortal = core::HicnForwarderPortal; + +#ifdef __linux__ +#ifndef __ANDROID__ +using RawSocketPortal = core::RawSocketPortal; +#endif +#endif + +#ifdef __vpp__ +using VPPForwarderPortal = core::VPPForwarderPortal; +using BaseSocket = Socket; +using BasePortal = VPPForwarderPortal; +#else +using BaseSocket = Socket; +using BasePortal = HicnForwarderPortal; +#endif + +using PayloadType = core::PayloadType; +using Prefix = core::Prefix; +using Array = utils::Array; + +using ConsumerInterestCallback = + std::function; + +using ConsumerContentCallback = + std::function; + +using ConsumerTimerCallback = + std::function; + +using ProducerContentCallback = std::function; + +using ConsumerContentObjectCallback = + std::function; + +using ConsumerContentObjectVerificationCallback = + std::function; + +using ConsumerManifestCallback = + std::function; + +using ProducerContentObjectCallback = + std::function; + +using ProducerInterestCallback = + std::function; + +using ProducerInterestCallback = + std::function; + +using namespace protocol; + +template +class Socket { + static_assert(std::is_same::value +#ifdef __linux__ +#ifndef __ANDROID__ + || std::is_same::value +#ifdef __vpp__ + || std::is_same::value +#endif +#endif + , +#else + , + +#endif + "This class is not allowed as Portal"); + + public: + using Portal = PortalType; + + virtual asio::io_service &getIoService() = 0; + + virtual void connect() = 0; + + virtual int setSocketOption(int socket_option_key, + uint32_t socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + double socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + bool socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + core::Name socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + std::list socket_option_value) = 0; + + virtual int setSocketOption( + int socket_option_key, + ProducerContentObjectCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value) = 0; + + virtual int setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) = 0; + + virtual int setSocketOption( + int socket_option_key, + ConsumerContentObjectCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + ConsumerContentCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + core::HashAlgorithm socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + const utils::Identity &socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, + const std::string &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + uint32_t &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + double &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + bool &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + core::Name &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + std::list &socket_option_value) = 0; + + virtual int getSocketOption( + int socket_option_key, + ProducerContentObjectCallback &socket_option_value) = 0; + + virtual int getSocketOption( + int socket_option_key, ProducerInterestCallback &socket_option_value) = 0; + + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) = 0; + + virtual int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback &socket_option_value) = 0; + + virtual int getSocketOption( + int socket_option_key, ConsumerInterestCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + ConsumerContentCallback &socket_option_value) = 0; + + virtual int getSocketOption( + int socket_option_key, ConsumerManifestCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + ProducerContentCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + std::shared_ptr &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + core::HashAlgorithm &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + utils::Identity &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + std::string &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + ConsumerTimerCallback &socket_option_value) = 0; + + protected: + virtual ~Socket(){}; + + protected: + std::string output_interface_; +}; + +} // namespace interface + +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc new file mode 100755 index 000000000..8109d0e99 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -0,0 +1,735 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace transport { + +namespace interface { + +ConsumerSocket::ConsumerSocket(int protocol) + : ConsumerSocket(protocol, internal_io_service_) {} + +ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service) + : io_service_(io_service), + portal_(std::make_shared(io_service_)), + async_downloader_(), + interest_lifetime_(default_values::interest_lifetime), + min_window_size_(default_values::min_window_size), + max_window_size_(default_values::max_window_size), + current_window_size_(-1), + max_retransmissions_( + default_values::transport_protocol_max_retransmissions), + /****** RAAQM Parameters ******/ + minimum_drop_probability_(default_values::minimum_drop_probability), + sample_number_(default_values::sample_number), + gamma_(default_values::gamma_value), + beta_(default_values::beta_value), + drop_factor_(default_values::drop_factor), + /****** END RAAQM Parameters ******/ + rate_estimation_alpha_(default_values::rate_alpha), + rate_estimation_observer_(nullptr), + rate_estimation_choice_(0), + is_async_(false), + verify_signature_(false), + content_buffer_(nullptr), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + on_content_object_verification_(VOID_HANDLER), + on_content_object_(VOID_HANDLER), + on_manifest_(VOID_HANDLER), + on_payload_retrieved_(VOID_HANDLER), + virtual_download_(false), + rtt_stats_(false), + timer_(portal_->getIoService()), + timer_interval_milliseconds_(0) { + switch (protocol) { + case TransportProtocolAlgorithms::VEGAS: + transport_protocol_ = std::make_shared(this); + break; + case TransportProtocolAlgorithms::CBR: + transport_protocol_ = std::make_shared(this); + break; + case TransportProtocolAlgorithms::RTC: + transport_protocol_ = std::make_shared(this); + break; + case TransportProtocolAlgorithms::RAAQM: + default: + transport_protocol_ = std::make_shared(this); + break; + } +} + +ConsumerSocket::~ConsumerSocket() { + stop(); + + async_downloader_.stop(); + + transport_protocol_.reset(); + portal_.reset(); +} + +void ConsumerSocket::connect() { portal_->connect(); } + +int ConsumerSocket::consume(const Name &name, + utils::SharableVector &receive_buffer) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + content_buffer_ = receive_buffer.shared_from_this(); + + network_name_ = name; + network_name_.setSuffix(0); + is_async_ = false; + + transport_protocol_->start(receive_buffer); + + return CONSUMER_READY; +} + +int ConsumerSocket::asyncConsume( + const Name &name, + std::shared_ptr> receive_buffer) { + // XXX Try to move the name here, instead of copying it!! + if (!async_downloader_.stopped()) { + async_downloader_.add([this, receive_buffer, name]() { + network_name_ = std::move(name); + network_name_.setSuffix(0); + is_async_ = true; + transport_protocol_->start(*receive_buffer); + }); + } + + return CONSUMER_READY; +} + +void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest, + Portal::ConsumerCallback *callback) { + if (!async_downloader_.stopped()) { + // TODO Workaround, to be fixed! + auto i = interest.release(); + async_downloader_.add([this, i, callback]() mutable { + Interest::Ptr _interest(i); + portal_->setConsumerCallback(callback); + portal_->sendInterest(std::move(_interest)); + portal_->runEventsLoop(); + }); + } +} + +void ConsumerSocket::stop() { + if (transport_protocol_->isRunning()) { + transport_protocol_->stop(); + } + + //is_running_ = false; +} + +void ConsumerSocket::resume() { + if(!transport_protocol_->isRunning()){ + transport_protocol_->resume(); + } +} + +asio::io_service &ConsumerSocket::getIoService() { + return portal_->getIoService(); +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + double socket_option_value) { + switch (socket_option_key) { + case MIN_WINDOW_SIZE: + min_window_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case MAX_WINDOW_SIZE: + max_window_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case CURRENT_WINDOW_SIZE: + current_window_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GAMMA_VALUE: + gamma_ = socket_option_value; + return SOCKET_OPTION_SET; + + case BETA_VALUE: + beta_ = socket_option_value; + return SOCKET_OPTION_SET; + + case DROP_FACTOR: + drop_factor_ = socket_option_value; + return SOCKET_OPTION_SET; + + case MINIMUM_DROP_PROBABILITY: + minimum_drop_probability_ = socket_option_value; + return SOCKET_OPTION_SET; + + case RATE_ESTIMATION_ALPHA: + if (socket_option_value >= 0 && socket_option_value < 1) { + rate_estimation_alpha_ = socket_option_value; + } else { + rate_estimation_alpha_ = ALPHA; + } + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + input_buffer_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::MAX_INTEREST_RETX: + max_retransmissions_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::INTEREST_LIFETIME: + interest_lifetime_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + if (socket_option_value == VOID_HANDLER) { + on_interest_retransmission_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + if (socket_option_value == VOID_HANDLER) { + on_interest_timeout_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_output_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_input_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_verification_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::CONTENT_RETRIEVED: + if (socket_option_value == VOID_HANDLER) { + on_payload_retrieved_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + if (socket_option_value > 0) { + rate_estimation_batching_parameter_ = socket_option_value; + } else { + rate_estimation_batching_parameter_ = BATCH; + } + return SOCKET_OPTION_SET; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + if (socket_option_value > 0) { + rate_estimation_choice_ = socket_option_value; + } else { + rate_estimation_choice_ = RATE_CHOICE; + } + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::TIMER_INTERVAL: + timer_interval_milliseconds_ = socket_option_value; + TRANSPORT_LOGD("Ok set %d", timer_interval_milliseconds_); + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + switch (socket_option_key) { + case OtherOptions::VIRTUAL_DOWNLOAD: + virtual_download_ = socket_option_value; + return SOCKET_OPTION_SET; + + case RaaqmTransportOptions::RTT_STATS: + rtt_stats_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + verify_signature_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + Name socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + network_name_ = socket_option_value; + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + std::list socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + on_content_object_input_ = socket_option_value; + ; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + on_content_object_verification_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerInterestCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + on_interest_retransmission_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + on_interest_output_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + on_interest_timeout_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + on_interest_satisfied_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerContentCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_RETRIEVED: + on_payload_retrieved_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ConsumerManifestCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + on_manifest_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) { + if (socket_option_key == RateEstimationOptions::RATE_ESTIMATION_OBSERVER) { + rate_estimation_observer_ = socket_option_value; + return SOCKET_OPTION_SET; + } + + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption( + int socket_option_key, const utils::Identity &socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CERTIFICATE: + key_id_ = verifier_.addKeyFromCertificate(socket_option_value); + + if (key_id_ != nullptr) { + return SOCKET_OPTION_SET; + } + + break; + + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + return SOCKET_OPTION_SET; + } + + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::TIMER_EXPIRES: + on_timer_expires_ = socket_option_value; + return SOCKET_OPTION_SET; + } + + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + double &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MIN_WINDOW_SIZE: + socket_option_value = min_window_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::MAX_WINDOW_SIZE: + socket_option_value = max_window_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::CURRENT_WINDOW_SIZE: + socket_option_value = current_window_size_; + return SOCKET_OPTION_GET; + + // RAAQM parameters + + case RaaqmTransportOptions::GAMMA_VALUE: + socket_option_value = gamma_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::BETA_VALUE: + socket_option_value = beta_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::DROP_FACTOR: + socket_option_value = drop_factor_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: + socket_option_value = minimum_drop_probability_; + return SOCKET_OPTION_GET; + + case RateEstimationOptions::RATE_ESTIMATION_ALPHA: + socket_option_value = rate_estimation_alpha_; + return SOCKET_OPTION_GET; + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + socket_option_value = input_buffer_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = output_buffer_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::MAX_INTEREST_RETX: + socket_option_value = max_retransmissions_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::INTEREST_LIFETIME: + socket_option_value = interest_lifetime_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::SAMPLE_NUMBER: + socket_option_value = sample_number_; + return SOCKET_OPTION_GET; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + socket_option_value = rate_estimation_batching_parameter_; + return SOCKET_OPTION_GET; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + socket_option_value = rate_estimation_choice_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::RUNNING: + socket_option_value = transport_protocol_->isRunning(); + return SOCKET_OPTION_GET; + + case OtherOptions::VIRTUAL_DOWNLOAD: + socket_option_value = virtual_download_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::RTT_STATS: + socket_option_value = rtt_stats_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::VERIFY_SIGNATURE: + socket_option_value = verify_signature_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + Name &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + socket_option_value = network_name_; + return SOCKET_OPTION_GET; + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + std::list &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerContentObjectCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + socket_option_value = on_content_object_input_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ProducerContentObjectCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + socket_option_value = on_content_object_verification_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerInterestCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + socket_option_value = on_interest_retransmission_; + return SOCKET_OPTION_GET; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + socket_option_value = on_interest_output_; + return SOCKET_OPTION_GET; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + socket_option_value = on_interest_timeout_; + return SOCKET_OPTION_GET; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + socket_option_value = on_interest_satisfied_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerContentCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_RETRIEVED: + socket_option_value = on_payload_retrieved_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerManifestCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + socket_option_value = on_manifest_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) { + if (socket_option_key == RATE_ESTIMATION_OBSERVER) { + *socket_option_value = (rate_estimation_observer_); + return SOCKET_OPTION_GET; + } + + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + utils::Identity &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + return SOCKET_OPTION_GET; + } + + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption( + int socket_option_key, ConsumerTimerCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::TIMER_EXPIRES: + socket_option_value = on_timer_expires_; + return SOCKET_OPTION_GET; + } + + return SOCKET_OPTION_NOT_GET; +} + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h new file mode 100755 index 000000000..9e309aae8 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include +#include + +#define CONSUMER_READY 0 +#define CONSUMER_BUSY 1 + +namespace transport { + +namespace interface { + +class ConsumerSocket : public BaseSocket { + friend class protocol::TransportProtocol; + friend class protocol::VegasTransportProtocol; + friend class protocol::RaaqmTransportProtocol; + friend class protocol::CbrTransportProtocol; + + public: + explicit ConsumerSocket(int protocol); + explicit ConsumerSocket(int protocol, asio::io_service &io_service); + + ~ConsumerSocket(); + + void connect() override; + + int consume(const Name &name, utils::SharableVector &receive_buffer); + + int asyncConsume( + const Name &name, + std::shared_ptr> receive_buffer); + + void asyncSendInterest(Interest::Ptr &&interest, + Portal::ConsumerCallback *callback); + + void stop(); + + void resume(); + + asio::io_service &getIoService() override; + + int setSocketOption(int socket_option_key, + uint32_t socket_option_value) override; + + int setSocketOption(int socket_option_key, + double socket_option_value) override; + + int setSocketOption(int socket_option_key, bool socket_option_value) override; + + int setSocketOption(int socket_option_key, Name socket_option_value) override; + + int setSocketOption(int socket_option_key, + std::list socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ProducerContentObjectCallback socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ConsumerContentObjectCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerContentCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) override; + + int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) override; + + int setSocketOption(int socket_option_key, + utils::CryptoSuite crypto_suite) override; + + int setSocketOption(int socket_option_key, + const utils::Identity &crypto_suite) override; + + int setSocketOption(int socket_option_key, + const std::string &socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value) override; + + int getSocketOption(int socket_option_key, + uint32_t &socket_option_value) override; + + int getSocketOption(int socket_option_key, + double &socket_option_value) override; + + int getSocketOption(int socket_option_key, + bool &socket_option_value) override; + + int getSocketOption(int socket_option_key, + Name &socket_option_value) override; + + int getSocketOption(int socket_option_key, + std::list &socket_option_value) override; + + int getSocketOption( + int socket_option_key, + ProducerContentObjectCallback &socket_option_value) override; + + int getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) override; + + int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerInterestCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ProducerInterestCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerContentCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerManifestCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + std::shared_ptr &socket_option_value) override; + + int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) override; + + int getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) override; + + int getSocketOption(int socket_option_key, + utils::CryptoSuite &crypto_suite) override; + + int getSocketOption(int socket_option_key, + utils::Identity &crypto_suite) override; + + int getSocketOption(int socket_option_key, + std::string &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerTimerCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ProducerContentCallback &socket_option_value) override; + + protected: + std::shared_ptr transport_protocol_; + + private: + // context inner state variables + asio::io_service internal_io_service_; + asio::io_service &io_service_; + + std::shared_ptr portal_; + + utils::EventThread async_downloader_; + + Name network_name_; + + int interest_lifetime_; + + double min_window_size_; + double max_window_size_; + double current_window_size_; + uint32_t max_retransmissions_; + size_t output_buffer_size_; + size_t input_buffer_size_; + + // RAAQM Parameters + + double minimum_drop_probability_; + unsigned int sample_number_; + double gamma_; + double beta_; + double drop_factor_; + + // Rate estimation parameters + double rate_estimation_alpha_; + IcnObserver *rate_estimation_observer_; + int rate_estimation_batching_parameter_; + int rate_estimation_choice_; + + bool is_async_; + + utils::Verifier verifier_; + PARCKeyId *key_id_; + bool verify_signature_; + + std::shared_ptr> content_buffer_; + + ConsumerInterestCallback on_interest_retransmission_; + ConsumerInterestCallback on_interest_output_; + ConsumerInterestCallback on_interest_timeout_; + ConsumerInterestCallback on_interest_satisfied_; + + ConsumerContentObjectCallback on_content_object_input_; + ConsumerContentObjectVerificationCallback on_content_object_verification_; + + ConsumerContentObjectCallback on_content_object_; + ConsumerManifestCallback on_manifest_; + + ConsumerContentCallback on_payload_retrieved_; + + ConsumerTimerCallback on_timer_expires_; + + // Virtual download for traffic generator + + bool virtual_download_; + bool rtt_stats_; + + Time t0_; + Time t1_; + asio::steady_timer timer_; + uint32_t timer_interval_milliseconds_; +}; + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h new file mode 100755 index 000000000..5fae1c484 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace transport { + +namespace interface { + +namespace default_values { + +const uint32_t interest_lifetime = 1001; // milliseconds +const uint32_t content_object_expiry_time = + 0xffff; // milliseconds -> 50 seconds +const uint32_t content_object_packet_size = 1500; // The ethernet MTU +const uint32_t producer_socket_input_buffer_size = 150000; // Interests +const uint32_t producer_socket_output_buffer_size = 150000; // Content Object +const uint32_t log_2_default_buffer_size = 12; +const uint32_t signature_size = 260; // bytes +const uint32_t key_locator_size = 60; // bytes +const uint32_t limit_guard = 80; // bytes +const uint32_t min_window_size = 1; // Interests +const uint32_t max_window_size = 128000; // Interests +const uint32_t digest_size = 34; // bytes +const uint32_t max_out_of_order_segments = 3; // content object +const uint32_t never_expire_time = 0x0000ffff << 0x0f; + +// RAAQM +const int sample_number = 30; +const double gamma_value = 1; +const double beta_value = 0.8; +const double drop_factor = 0.2; +const double minimum_drop_probability = 0.00001; +const int path_id = 0; +const double rate_alpha = 0.8; + +// Vegas +const double alpha = 1 / 8; +const double beta = 1 / 4; +const uint16_t k = 4; +const std::chrono::milliseconds clock_granularity = + std::chrono::milliseconds(100); + +// maximum allowed values +const uint32_t transport_protocol_min_retransmissions = 0; +const uint32_t transport_protocol_max_retransmissions = 128; +const uint32_t max_content_object_size = 8096; + +} // namespace default_values + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h new file mode 100755 index 000000000..1afad2b48 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace transport { + +namespace interface { + +typedef enum { + RAAQM = 0, + VEGAS = 1, + CBR = 3, + RTC = 4, +} TransportProtocolAlgorithms; + +typedef enum { + INPUT_BUFFER_SIZE = 101, + OUTPUT_BUFFER_SIZE = 102, + NETWORK_NAME = 103, + NAME_SUFFIX = 104, + MAX_INTEREST_RETX = 105, + DATA_PACKET_SIZE = 106, + INTEREST_LIFETIME = 107, + CONTENT_OBJECT_EXPIRY_TIME = 108, + KEY_LOCATOR = 110, + SIGNATURE_TYPE = 111, + MIN_WINDOW_SIZE = 112, + MAX_WINDOW_SIZE = 113, + CURRENT_WINDOW_SIZE = 114, + ASYNC_MODE = 115, + MAKE_MANIFEST = 116, + PORTAL = 117, + RUNNING = 118, + HASH_ALGORITHM = 119, + CRYPTO_SUITE = 120, + IDENTITY = 121, + CERTIFICATE = 122, + VERIFY_SIGNATURE = 123, + TIMER_INTERVAL = 124 +} GeneralTransportOptions; + +typedef enum { + SAMPLE_NUMBER = 201, + GAMMA_VALUE = 202, + BETA_VALUE = 203, + DROP_FACTOR = 204, + MINIMUM_DROP_PROBABILITY = 205, + PATH_ID = 206, + RTT_STATS = 207, +} RaaqmTransportOptions; + +typedef enum { + RATE_ESTIMATION_ALPHA = 301, + RATE_ESTIMATION_OBSERVER = 302, + RATE_ESTIMATION_BATCH_PARAMETER = 303, + RATE_ESTIMATION_CHOICE = 304, +} RateEstimationOptions; + +typedef enum { + INTEREST_OUTPUT = 401, + INTEREST_RETRANSMISSION = 402, + INTEREST_EXPIRED = 403, + INTEREST_SATISFIED = 404, + CONTENT_OBJECT_INPUT = 411, + MANIFEST_INPUT = 412, + CONTENT_OBJECT_TO_VERIFY = 413, + CONTENT_RETRIEVED = 414, + TIMER_EXPIRES = 415 +} ConsumerCallbacksOptions; + +typedef enum { + INTEREST_INPUT = 501, + INTEREST_DROP = 502, + INTEREST_PASS = 503, + CACHE_HIT = 506, + CACHE_MISS = 508, + NEW_CONTENT_OBJECT = 509, + CONTENT_OBJECT_SIGN = 513, + CONTENT_OBJECT_READY = 510, + CONTENT_OBJECT_OUTPUT = 511, + CONTENT_PRODUCED = 512 +} ProducerCallbacksOptions; + +typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions; + +typedef enum { VIRTUAL_DOWNLOAD = 601, USE_CFG_FILE = 603 } OtherOptions; + +typedef enum { + SHA_256 = 701, + RSA_256 = 702, +} SignatureType; + +} // namespace interface + +} // end namespace transport \ No newline at end of file diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc new file mode 100755 index 000000000..69adc2b3f --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -0,0 +1,948 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace transport { + +namespace interface { + +typedef std::chrono::time_point Time; +typedef std::chrono::microseconds TimeDuration; + +ProducerSocket::ProducerSocket() : ProducerSocket(internal_io_service_) {} + +ProducerSocket::ProducerSocket(asio::io_service &io_service) + : io_service_(io_service), + portal_(std::make_shared(io_service_)), + data_packet_size_(default_values::content_object_packet_size), + content_object_expiry_time_(default_values::content_object_expiry_time), + output_buffer_(default_values::producer_socket_output_buffer_size), + async_thread_(), + registration_status_(REGISTRATION_NOT_ATTEMPTED), + making_manifest_(false), + signature_type_(SHA_256), + hash_algorithm_(HashAlgorithm::SHA_256), + input_buffer_capacity_(default_values::producer_socket_input_buffer_size), + input_buffer_size_(0), + processing_thread_stop_(false), + listening_thread_stop_(false), + on_interest_input_(VOID_HANDLER), + on_interest_dropped_input_buffer_(VOID_HANDLER), + on_interest_inserted_input_buffer_(VOID_HANDLER), + on_interest_satisfied_output_buffer_(VOID_HANDLER), + on_interest_process_(VOID_HANDLER), + on_new_segment_(VOID_HANDLER), + on_content_object_to_sign_(VOID_HANDLER), + on_content_object_in_output_buffer_(VOID_HANDLER), + on_content_object_output_(VOID_HANDLER), + on_content_object_evicted_from_output_buffer_(VOID_HANDLER), + on_content_produced_(VOID_HANDLER) { + listening_thread_stop_ = false; +} + +ProducerSocket::~ProducerSocket() { + TRANSPORT_LOGI("Destroying the ProducerSocket"); + processing_thread_stop_ = true; + portal_->stopEventsLoop(); + + if (processing_thread_.joinable()) { + processing_thread_.join(); + } + + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +void ProducerSocket::connect() { + portal_->connect(false); + listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); +} + +void ProducerSocket::serveForever() { + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +void ProducerSocket::stop() { + TRANSPORT_LOGI("Calling stop for ProducerSocket"); + portal_->killConnection(); + portal_->stopEventsLoop(); +} + +void ProducerSocket::registerPrefix(const Prefix &producer_namespace) { + served_namespaces_.push_back(producer_namespace); +} + +void ProducerSocket::listen() { + registration_status_ = REGISTRATION_IN_PROGRESS; + bool first = true; + + for (core::Prefix &producer_namespace : served_namespaces_) { + if (first) { + core::BindConfig bind_config(producer_namespace, 1000); + portal_->bind(bind_config); + portal_->setProducerCallback(this); + first = !first; + } else { + portal_->registerRoute(producer_namespace); + } + } + + portal_->runEventsLoop(); +} + +void ProducerSocket::passContentObjectToCallbacks( + const std::shared_ptr &content_object) { + if (content_object) { + if (on_new_segment_ != VOID_HANDLER) { + on_new_segment_(*this, *content_object); + } + + if (on_content_object_to_sign_ != VOID_HANDLER) { + on_content_object_to_sign_(*this, *content_object); + } + + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, *content_object); + } + + output_buffer_.insert(content_object); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + +#ifndef PUSH_API + std::unordered_map>::iterator it; + + { + std::lock_guard lock(pending_interests_mtx_); + it = pending_interests_.find(content_object->getName()); + } + + if (it != pending_interests_.end()) { + content_object->setLocator(it->second->getLocator()); + portal_->sendContentObject(*content_object); + std::lock_guard lock(pending_interests_mtx_); + pending_interests_.erase(it); + } +#else + portal_->sendContentObject(*content_object); +#endif + } +} + +void ProducerSocket::produce(ContentObject &content_object) { + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, content_object); + } + + output_buffer_.insert(std::static_pointer_cast( + content_object.shared_from_this())); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, content_object); + } + +#ifndef PUSH_API + std::unordered_map>::iterator it; + + { + std::lock_guard lock(pending_interests_mtx_); + it = pending_interests_.find(content_object.getName()); + } + + if (it != pending_interests_.end()) { + content_object.setLocator(it->second->getLocator()); + portal_->sendContentObject(content_object); + std::lock_guard lock(pending_interests_mtx_); + pending_interests_.erase(it); + } +#else + portal_->sendContentObject(content_object); +#endif +} + +uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf, + size_t buffer_size, bool is_last, + uint32_t start_offset) { + if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) { + return 0; + } + + const std::size_t hash_size = 32; + + int bytes_segmented = 0; + std::size_t header_size; + std::size_t manifest_header_size = 0; + std::size_t signature_length = 0; + std::uint32_t final_block_number = 0; + + uint64_t free_space_for_content = 0; + + core::Packet::Format format; + + uint32_t current_segment = start_offset; + std::shared_ptr manifest; + bool is_last_manifest = false; + std::unique_ptr zero_hash; + + // TODO Manifest may still be used for indexing + if (making_manifest_ && !identity_) { + throw errors::RuntimeException( + "Making manifests without setting producer identity. Aborting."); + } + + core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; + core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC; + if (content_name.getType() == HNT_CONTIGUOUS_V4 || + content_name.getType() == HNT_IOV_V4) { + hf_format = core::Packet::Format::HF_INET_TCP; + hf_format_ah = core::Packet::Format::HF_INET_TCP_AH; + } else if (content_name.getType() == HNT_CONTIGUOUS_V6 || + content_name.getType() == HNT_IOV_V6) { + hf_format = core::Packet::Format::HF_INET6_TCP; + hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH; + } else { + throw errors::RuntimeException("Unknown name format."); + } + + format = hf_format; + if (making_manifest_) { + format = hf_format; + manifest_header_size = core::Packet::getHeaderSizeFromFormat( + hf_format_ah, identity_->getSignatureLength()); + } else if (identity_) { + format = hf_format_ah; + signature_length = identity_->getSignatureLength(); + } + + header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length); + + free_space_for_content = data_packet_size_ - header_size; + + uint32_t number_of_segments = + uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content))); + + if (free_space_for_content * number_of_segments < buffer_size) { + number_of_segments++; + } + + if (making_manifest_) { + auto segment_in_manifest = static_cast( + std::floor(double(data_packet_size_ - manifest_header_size - + ContentObjectManifest::getManifestHeaderSize()) / + (4.0 + 32.0)) - + 1.0); + auto number_of_manifests = static_cast( + std::ceil(float(number_of_segments) / segment_in_manifest)); + final_block_number = number_of_segments + number_of_manifests - 1; + + manifest.reset(ContentObjectManifest::createManifest( + content_name.setSuffix(current_segment++), + core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, + hash_algorithm_, is_last_manifest, content_name, + core::NextSegmentCalculationStrategy::INCREMENTAL, + identity_->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time_); + + if (is_last) { + manifest->setFinalBlockNumber(final_block_number); + } else { + manifest->setFinalBlockNumber(std::numeric_limits::max()); + } + + uint8_t hash[hash_size]; + std::memset(hash, 0, hash_size); + zero_hash = std::make_unique( + hash, hash_size, static_cast(hash_algorithm_)); + } + + for (unsigned int packaged_segments = 0; + packaged_segments < number_of_segments; packaged_segments++) { + if (making_manifest_) { + if (manifest->estimateManifestSize(2) > + data_packet_size_ - manifest_header_size) { + // Add next manifest + manifest->addSuffixHash(current_segment, *zero_hash); + + // Send the current manifest + manifest->encode(); + + identity_->getSigner().sign(*manifest); + + passContentObjectToCallbacks(manifest); + + // Create new manifest. The reference to the last manifest has been + // acquired in the passContentObjectToCallbacks function, so we can + // safely release this reference + manifest.reset(ContentObjectManifest::createManifest( + content_name.setSuffix(current_segment), + core::ManifestVersion::VERSION_1, + core::ManifestType::INLINE_MANIFEST, hash_algorithm_, + is_last_manifest, content_name, + core::NextSegmentCalculationStrategy::INCREMENTAL, + identity_->getSignatureLength())); + manifest->setLifetime(content_object_expiry_time_); + if (is_last) { + manifest->setFinalBlockNumber(final_block_number); + } else { + manifest->setFinalBlockNumber(std::numeric_limits::max()); + } + current_segment++; + } + } + + auto content_object = std::make_shared( + content_name.setSuffix(current_segment), format); + content_object->setLifetime(content_object_expiry_time_); + + if (!making_manifest_ && identity_) { + content_object->setSignatureSize(signature_length); + } + + if (packaged_segments == number_of_segments - 1) { + content_object->appendPayload(&buf[bytes_segmented], + buffer_size - bytes_segmented); + bytes_segmented += buffer_size - bytes_segmented; + + if (is_last && making_manifest_) { + is_last_manifest = true; + } else if (is_last) { + content_object->setRst(); + } + + } else { + content_object->appendPayload(&buf[bytes_segmented], + free_space_for_content); + bytes_segmented += free_space_for_content; + } + + if (making_manifest_) { + using namespace std::chrono_literals; + utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_); + manifest->addSuffixHash(current_segment, hash); + } else if (identity_) { + identity_->getSigner().sign(*content_object); + } + + current_segment++; + passContentObjectToCallbacks(content_object); + } + + if (making_manifest_) { + if (is_last_manifest) { + manifest->setFinalManifest(is_last_manifest); + } + manifest->encode(); + // Time t0 = std::chrono::steady_clock::now(); + identity_->getSigner().sign(*manifest); + passContentObjectToCallbacks(manifest); + } + + if (on_content_produced_ != VOID_HANDLER) { + on_content_produced_(*this, std::make_error_code(std::errc(0)), + buffer_size); + } + + return current_segment; +} + +void ProducerSocket::asyncProduce(ContentObject &content_object) { + if (!async_thread_.stopped()) { + // async_thread_.add(std::bind(&ProducerSocket::produce, this, + // content_object)); + } +} + +// void ProducerSocket::asyncProduce(const Name &suffix, +// const uint8_t *buf, +// size_t buffer_size, +// AsyncProduceCallback && handler) { +// if (!async_thread_.stopped()) { +// async_thread_.add([this, buffer = buf, size = buffer_size, cb = +// std::move(handler)] () { +// uint64_t bytes_written = produce(suff, buffer, size, 0, false); +// auto ec = std::make_errc(0); +// cb(*this, ec, bytes_written); +// }); +// } +// } + +void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf, + size_t buffer_size) { + if (!async_thread_.stopped()) { + async_thread_.add( + [this, suff = suffix, buffer = buf, size = buffer_size]() { + produce(suff, buffer, size, true); + }); + } +} + +void ProducerSocket::asyncProduce( + const Name &suffix, utils::SharableVector &&output_buffer) { + if (!async_thread_.stopped()) { + async_thread_.add( + [this, suff = suffix, buffer = std::move(output_buffer)]() { + TRANSPORT_LOGI("FOR REAL!!!!!! --> Producing content with name %s", + suff.toString().c_str()); + produce(suff, &buffer[0], buffer.size(), true); + }); + } +} + +void ProducerSocket::onInterest(const Interest &interest) { + if (on_interest_input_ != VOID_HANDLER) { + on_interest_input_(*this, interest); + } + + const std::shared_ptr content_object = + output_buffer_.find(interest); + + if (content_object) { + if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + on_interest_satisfied_output_buffer_(*this, interest); + } + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + + portal_->sendContentObject(*content_object); + } else { +#ifndef PUSH_API + { + std::lock_guard lock(pending_interests_mtx_); + pending_interests_[interest.getName()] = + std::static_pointer_cast(interest.shared_from_this()); + } +#endif + + if (on_interest_process_ != VOID_HANDLER) { + // external_io_service_.post([this, &interest] () { + on_interest_process_(*this, interest); + // }); + } + } +} + +asio::io_service &ProducerSocket::getIoService() { return io_service_; } + +int ProducerSocket::setSocketOption(int socket_option_key, + uint32_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && + socket_option_value > 0) { + data_packet_size_ = socket_option_value; + return SOCKET_OPTION_SET; + } else { + return SOCKET_OPTION_NOT_SET; + } + + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + if (socket_option_value >= 1) { + input_buffer_capacity_ = socket_option_value; + return SOCKET_OPTION_SET; + } else { + return SOCKET_OPTION_NOT_SET; + } + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_.setLimit(socket_option_value); + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::SIGNATURE_TYPE: + if (socket_option_value == SOCKET_OPTION_DEFAULT) { + signature_type_ = SHA_256; + } else { + signature_type_ = socket_option_value; + } + + if (signature_type_ == SHA_256 || signature_type_ == RSA_256) { + signature_size_ = 32; + } + + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, + double socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, + Name socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + std::list socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + served_namespaces_ = socket_option_value; + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentObjectCallback socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerInterestCallback socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ProducerContentCallback socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + on_content_produced_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ConsumerInterestCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ConsumerContentCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, ConsumerManifestCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + hash_algorithm_ = socket_option_value; + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::CRYPTO_SUITE: + crypto_suite_ = socket_option_value; + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption( + int socket_option_key, const utils::Identity &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: + identity_.reset(); + identity_ = std::make_unique(socket_option_value); + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, + const std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + output_interface_ = socket_option_value; + portal_->setOutputInterface(output_interface_); + return SOCKET_OPTION_SET; + } + + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption( + int socket_option_key, + interface::ConsumerTimerCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + uint32_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + socket_option_value = input_buffer_capacity_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = output_buffer_.getLimit(); + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = data_packet_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::SIGNATURE_TYPE: + socket_option_value = signature_type_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, + double &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, + Name &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::list &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NETWORK_NAME: + + socket_option_value = served_namespaces_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerContentObjectCallback &socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + socket_option_value = on_new_segment_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + socket_option_value = on_content_object_to_sign_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + socket_option_value = on_content_object_in_output_buffer_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + socket_option_value = on_content_object_output_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerContentCallback &socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::CONTENT_PRODUCED: + socket_option_value = on_content_produced_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ProducerInterestCallback &socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + socket_option_value = on_interest_input_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::INTEREST_DROP: + socket_option_value = on_interest_dropped_input_buffer_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::INTEREST_PASS: + socket_option_value = on_interest_inserted_input_buffer_; + return SOCKET_OPTION_GET; + + case CACHE_HIT: + socket_option_value = on_interest_satisfied_output_buffer_; + return SOCKET_OPTION_GET; + + case CACHE_MISS: + socket_option_value = on_interest_process_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ConsumerContentObjectCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ConsumerInterestCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ConsumerContentCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, ConsumerManifestCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + return SOCKET_OPTION_GET; + } + + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + IcnObserver *socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = hash_algorithm_; + return SOCKET_OPTION_GET; + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::HASH_ALGORITHM: + socket_option_value = crypto_suite_; + return SOCKET_OPTION_GET; + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, + utils::Identity &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::IDENTITY: + if (identity_) { + socket_option_value = *identity_; + return SOCKET_OPTION_SET; + } + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, + std::string &socket_option_value) { + switch (socket_option_key) { + case DataLinkOptions::OUTPUT_INTERFACE: + socket_option_value = output_interface_; + return SOCKET_OPTION_GET; + } + + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption( + int socket_option_key, + interface::ConsumerTimerCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +} // namespace interface + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h new file mode 100755 index 000000000..06c47d973 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -0,0 +1,269 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#define PUSH_API 1 + +#define REGISTRATION_NOT_ATTEMPTED 0 +#define REGISTRATION_SUCCESS 1 +#define REGISTRATION_FAILURE 2 +#define REGISTRATION_IN_PROGRESS 3 + +namespace transport { + +namespace interface { + +using namespace core; + +class ProducerSocket : public Socket, + public BasePortal::ProducerCallback { + public: + explicit ProducerSocket(); + explicit ProducerSocket(asio::io_service &io_service); + + ~ProducerSocket(); + + void connect() override; + + uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size, + bool is_last = true, uint32_t start_offset = 0); + + void produce(ContentObject &content_object); + + void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size); + + void asyncProduce(const Name &suffix, + utils::SharableVector &&output_buffer); + + void asyncProduce(ContentObject &content_object); + + void registerPrefix(const Prefix &producer_namespace); + + void serveForever(); + + void stop(); + + asio::io_service &getIoService() override; + + virtual void onInterest(const Interest &interest); + + virtual void onInterest(Interest::Ptr &&interest) override { + onInterest(*interest); + }; + + int setSocketOption(int socket_option_key, + uint32_t socket_option_value) override; + + int setSocketOption(int socket_option_key, + double socket_option_value) override; + + int setSocketOption(int socket_option_key, bool socket_option_value) override; + + int setSocketOption(int socket_option_key, Name socket_option_value) override; + + int setSocketOption(int socket_option_key, + std::list socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ProducerContentObjectCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ProducerInterestCallback socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) override; + + int setSocketOption( + int socket_option_key, + ConsumerContentObjectCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerInterestCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerContentCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerManifestCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, IcnObserver *obs) override; + + int setSocketOption(int socket_option_key, + HashAlgorithm socket_option_value) override; + + int setSocketOption(int socket_option_key, + utils::CryptoSuite socket_option_value) override; + + int setSocketOption(int socket_option_key, + const utils::Identity &socket_option_value) override; + + int setSocketOption(int socket_option_key, + const std::string &socket_option_value) override; + + int setSocketOption(int socket_option_key, + ConsumerTimerCallback socket_option_value) override; + + int setSocketOption(int socket_option_key, + ProducerContentCallback socket_option_value) override; + + int getSocketOption(int socket_option_key, + uint32_t &socket_option_value) override; + + int getSocketOption(int socket_option_key, + double &socket_option_value) override; + + int getSocketOption(int socket_option_key, + bool &socket_option_value) override; + + int getSocketOption(int socket_option_key, + Name &socket_option_value) override; + + int getSocketOption(int socket_option_key, + std::list &socket_option_value) override; + + int getSocketOption( + int socket_option_key, + ProducerContentObjectCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ProducerInterestCallback &socket_option_value) override; + + int getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) override; + + int getSocketOption( + int socket_option_key, + ConsumerContentObjectCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerInterestCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerContentCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerManifestCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + std::shared_ptr &socket_option_value) override; + + int getSocketOption(int socket_option_key, + IcnObserver **socket_option_value) override; + + int getSocketOption(int socket_option_key, + HashAlgorithm &socket_option_value) override; + + int getSocketOption(int socket_option_key, + utils::CryptoSuite &socket_option_value) override; + + int getSocketOption(int socket_option_key, + utils::Identity &socket_option_value) override; + + int getSocketOption(int socket_option_key, + std::string &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ProducerContentCallback &socket_option_value) override; + + int getSocketOption(int socket_option_key, + ConsumerTimerCallback &socket_option_value) override; + + protected: + asio::io_service internal_io_service_; + asio::io_service &io_service_; + std::shared_ptr portal_; + std::size_t data_packet_size_; + std::list served_namespaces_; + uint32_t content_object_expiry_time_; + + // buffers + utils::ContentStore output_buffer_; + + private: + utils::EventThread async_thread_; + + int registration_status_; + + bool making_manifest_; + + // map for storing sequence numbers for several calls of the publish function + std::unordered_map> seq_number_map_; + + int signature_type_; + int signature_size_; + + HashAlgorithm hash_algorithm_; + utils::CryptoSuite crypto_suite_; + std::unique_ptr identity_; + // utils::Signer& signer_; + + // buffers + + std::queue> input_buffer_; + std::atomic_size_t input_buffer_capacity_; + std::atomic_size_t input_buffer_size_; + +#ifndef PUSH_API + std::mutex pending_interests_mtx_; + std::unordered_map> pending_interests_; +#endif + + // threads + std::thread listening_thread_; + std::thread processing_thread_; + volatile bool processing_thread_stop_; + volatile bool listening_thread_stop_; + + // callbacks + protected: + ProducerInterestCallback on_interest_input_; + ProducerInterestCallback on_interest_dropped_input_buffer_; + ProducerInterestCallback on_interest_inserted_input_buffer_; + ProducerInterestCallback on_interest_satisfied_output_buffer_; + ProducerInterestCallback on_interest_process_; + + ProducerContentObjectCallback on_new_segment_; + ProducerContentObjectCallback on_content_object_to_sign_; + ProducerContentObjectCallback on_content_object_in_output_buffer_; + ProducerContentObjectCallback on_content_object_output_; + ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; + + ProducerContentCallback on_content_produced_; + + private: + void listen(); + + void passContentObjectToCallbacks( + const std::shared_ptr &content_object); +}; + +} // namespace interface + +} // end namespace transport -- cgit 1.2.3-korg