aboutsummaryrefslogtreecommitdiffstats
path: root/libtransport/src/hicn/transport/interfaces
diff options
context:
space:
mode:
authorLuca Muscariello <lumuscar+fdio@cisco.com>2019-01-17 13:47:57 +0100
committerLuca Muscariello <lumuscar+fdio@cisco.com>2019-01-17 16:32:51 +0100
commitbac3da61644515f05663789b122554dc77549286 (patch)
tree898210bc8e70371d77de7d446a26c5dd4fd1165a /libtransport/src/hicn/transport/interfaces
parentd5165246787301d0f13b646fda5e8a8567aef5ac (diff)
This is the first commit of the hicn projectv19.01
Change-Id: I6f2544ad9b9f8891c88cc4bcce3cf19bd3cc863f Signed-off-by: Luca Muscariello <lumuscar+fdio@cisco.com>
Diffstat (limited to 'libtransport/src/hicn/transport/interfaces')
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/CMakeLists.txt38
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/async_transport.h640
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/full_duplex_socket.cc490
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/full_duplex_socket.h254
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/publication_options.h34
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc35
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h35
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc157
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/rtc_socket_producer.h60
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket.h270
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_consumer.cc735
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_consumer.h259
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_options_default_values.h68
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_options_keys.h108
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_producer.cc948
-rwxr-xr-xlibtransport/src/hicn/transport/interfaces/socket_producer.h269
16 files changed, 4400 insertions, 0 deletions
diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
new file mode 100755
index 000000000..cbf371bac
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt
@@ -0,0 +1,38 @@
+# Copyright (c) 2017-2019 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
+
+list(APPEND HEADER_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/async_transport.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/publication_options.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h
+)
+
+list(APPEND SOURCE_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/full_duplex_socket.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_consumer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/rtc_socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_producer.cc
+ ${CMAKE_CURRENT_SOURCE_DIR}/socket_consumer.cc
+)
+
+set(SOURCE_FILES ${SOURCE_FILES} PARENT_SCOPE)
+set(HEADER_FILES ${HEADER_FILES} PARENT_SCOPE) \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/async_transport.h b/libtransport/src/hicn/transport/interfaces/async_transport.h
new file mode 100755
index 000000000..492b4ec26
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/async_transport.h
@@ -0,0 +1,640 @@
+
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/publication_options.h>
+#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+#include <sys/uio.h>
+#include <memory>
+
+namespace transport {
+
+namespace interface {
+
+/*
+ * flags given by the application for write* calls
+ */
+enum class WriteFlags : uint32_t {
+ NONE = 0x00,
+ /*
+ * Whether to delay the output until a subsequent non-corked write.
+ * (Note: may not be supported in all subclasses or on all platforms.)
+ */
+ CORK = 0x01,
+ /*
+ * for a socket that has ACK latency enabled, it will cause the kernel
+ * to fire a TCP ESTATS event when the last byte of the given write call
+ * will be acknowledged.
+ */
+ EOR = 0x02,
+ /*
+ * this indicates that only the write side of socket should be shutdown
+ */
+ WRITE_SHUTDOWN = 0x04,
+ /*
+ * use msg zerocopy if allowed
+ */
+ WRITE_MSG_ZEROCOPY = 0x08,
+};
+
+/*
+ * union operator
+ */
+TRANSPORT_ALWAYS_INLINE WriteFlags operator|(WriteFlags a, WriteFlags b) {
+ return static_cast<WriteFlags>(static_cast<uint32_t>(a) |
+ static_cast<uint32_t>(b));
+}
+
+/*
+ * compound assignment union operator
+ */
+TRANSPORT_ALWAYS_INLINE WriteFlags &operator|=(WriteFlags &a, WriteFlags b) {
+ a = a | b;
+ return a;
+}
+
+/*
+ * intersection operator
+ */
+TRANSPORT_ALWAYS_INLINE WriteFlags operator&(WriteFlags a, WriteFlags b) {
+ return static_cast<WriteFlags>(static_cast<uint32_t>(a) &
+ static_cast<uint32_t>(b));
+}
+
+/*
+ * compound assignment intersection operator
+ */
+TRANSPORT_ALWAYS_INLINE WriteFlags &operator&=(WriteFlags &a, WriteFlags b) {
+ a = a & b;
+ return a;
+}
+
+/*
+ * exclusion parameter
+ */
+TRANSPORT_ALWAYS_INLINE WriteFlags operator~(WriteFlags a) {
+ return static_cast<WriteFlags>(~static_cast<uint32_t>(a));
+}
+
+/*
+ * unset operator
+ */
+TRANSPORT_ALWAYS_INLINE WriteFlags unSet(WriteFlags a, WriteFlags b) {
+ return a & ~b;
+}
+
+/*
+ * inclusion operator
+ */
+TRANSPORT_ALWAYS_INLINE bool isSet(WriteFlags a, WriteFlags b) {
+ return (a & b) == b;
+}
+
+class ConnectCallback {
+ public:
+ virtual ~ConnectCallback() = default;
+
+ /**
+ * connectSuccess() will be invoked when the connection has been
+ * successfully established.
+ */
+ virtual void connectSuccess() noexcept = 0;
+
+ /**
+ * connectErr() will be invoked if the connection attempt fails.
+ *
+ * @param ex An exception describing the error that occurred.
+ */
+ virtual void connectErr(const std::error_code ec) noexcept = 0;
+};
+
+/**
+ * AsyncSocket defines an asynchronous API for streaming I/O.
+ *
+ * This class provides an API to for asynchronously waiting for data
+ * on a streaming transport, and for asynchronously sending data.
+ *
+ * The APIs for reading and writing are intentionally asymmetric. Waiting for
+ * data to read is a persistent API: a callback is installed, and is notified
+ * whenever new data is available. It continues to be notified of new events
+ * until it is uninstalled.
+ *
+ * AsyncSocket does not provide read timeout functionality, because it
+ * typically cannot determine when the timeout should be active. Generally, a
+ * timeout should only be enabled when processing is blocked waiting on data
+ * from the remote endpoint. For server-side applications, the timeout should
+ * not be active if the server is currently processing one or more outstanding
+ * requests on this transport. For client-side applications, the timeout
+ * should not be active if there are no requests pending on the transport.
+ * Additionally, if a client has multiple pending requests, it will ususally
+ * want a separate timeout for each request, rather than a single read timeout.
+ *
+ * The write API is fairly intuitive: a user can request to send a block of
+ * data, and a callback will be informed once the entire block has been
+ * transferred to the kernel, or on error. AsyncSocket does provide a send
+ * timeout, since most callers want to give up if the remote end stops
+ * responding and no further progress can be made sending the data.
+ */
+class AsyncSocket {
+ public:
+ /**
+ * Close the transport.
+ *
+ * This gracefully closes the transport, waiting for all pending write
+ * requests to complete before actually closing the underlying transport.
+ *
+ * If a read callback is set, readEOF() will be called immediately. If there
+ * are outstanding write requests, the close will be delayed until all
+ * remaining writes have completed. No new writes may be started after
+ * close() has been called.
+ */
+ virtual void close() = 0;
+
+ /**
+ * Close the transport immediately.
+ *
+ * This closes the transport immediately, dropping any outstanding data
+ * waiting to be written.
+ *
+ * If a read callback is set, readEOF() will be called immediately.
+ * If there are outstanding write requests, these requests will be aborted
+ * and writeError() will be invoked immediately on all outstanding write
+ * callbacks.
+ */
+ virtual void closeNow() = 0;
+
+ /**
+ * Perform a half-shutdown of the write side of the transport.
+ *
+ * The caller should not make any more calls to write() or writev() after
+ * shutdownWrite() is called. Any future write attempts will fail
+ * immediately.
+ *
+ * Not all transport types support half-shutdown. If the underlying
+ * transport does not support half-shutdown, it will fully shutdown both the
+ * read and write sides of the transport. (Fully shutting down the socket is
+ * better than doing nothing at all, since the caller may rely on the
+ * shutdownWrite() call to notify the other end of the connection that no
+ * more data can be read.)
+ *
+ * If there is pending data still waiting to be written on the transport,
+ * the actual shutdown will be delayed until the pending data has been
+ * written.
+ *
+ * Note: There is no corresponding shutdownRead() equivalent. Simply
+ * uninstall the read callback if you wish to stop reading. (On TCP sockets
+ * at least, shutting down the read side of the socket is a no-op anyway.)
+ */
+ virtual void shutdownWrite() = 0;
+
+ /**
+ * Perform a half-shutdown of the write side of the transport.
+ *
+ * shutdownWriteNow() is identical to shutdownWrite(), except that it
+ * immediately performs the shutdown, rather than waiting for pending writes
+ * to complete. Any pending write requests will be immediately failed when
+ * shutdownWriteNow() is called.
+ */
+ virtual void shutdownWriteNow() = 0;
+
+ /**
+ * Determine if transport is open and ready to read or write.
+ *
+ * Note that this function returns false on EOF; you must also call error()
+ * to distinguish between an EOF and an error.
+ *
+ * @return true iff the transport is open and ready, false otherwise.
+ */
+ virtual bool good() const = 0;
+
+ /**
+ * Determine if the transport is readable or not.
+ *
+ * @return true iff the transport is readable, false otherwise.
+ */
+ virtual bool readable() const = 0;
+
+ /**
+ * Determine if the transport is writable or not.
+ *
+ * @return true iff the transport is writable, false otherwise.
+ */
+ virtual bool writable() const {
+ // By default return good() - leave it to implementers to override.
+ return good();
+ }
+
+ /**
+ * Determine if the there is pending data on the transport.
+ *
+ * @return true iff the if the there is pending data, false otherwise.
+ */
+ virtual bool isPending() const { return readable(); }
+
+ /**
+ * Determine if transport is connected to the endpoint
+ *
+ * @return false iff the transport is connected, otherwise true
+ */
+ virtual bool connected() const = 0;
+
+ /**
+ * Determine if an error has occurred with this transport.
+ *
+ * @return true iff an error has occurred (not EOF).
+ */
+ virtual bool error() const = 0;
+
+ // /**
+ // * Attach the transport to a EventBase.
+ // *
+ // * This may only be called if the transport is not currently attached to a
+ // * EventBase (by an earlier call to detachEventBase()).
+ // *
+ // * This method must be invoked in the EventBase's thread.
+ // */
+ // virtual void attachEventBase(EventBase* eventBase) = 0;
+
+ // /**
+ // * Detach the transport from its EventBase.
+ // *
+ // * This may only be called when the transport is idle and has no reads or
+ // * writes pending. Once detached, the transport may not be used again
+ // until
+ // * it is re-attached to a EventBase by calling attachEventBase().
+ // *
+ // * This method must be called from the current EventBase's thread.
+ // */
+ // virtual void detachEventBase() = 0;
+
+ // /**
+ // * Determine if the transport can be detached.
+ // *
+ // * This method must be called from the current EventBase's thread.
+ // */
+ // virtual bool isDetachable() const = 0;
+
+ /**
+ * Set the send timeout.
+ *
+ * If write requests do not make any progress for more than the specified
+ * number of milliseconds, fail all pending writes and close the transport.
+ *
+ * If write requests are currently pending when setSendTimeout() is called,
+ * the timeout interval is immediately restarted using the new value.
+ *
+ * @param milliseconds The timeout duration, in milliseconds. If 0, no
+ * timeout will be used.
+ */
+ virtual void setSendTimeout(uint32_t milliseconds) = 0;
+
+ /**
+ * Get the send timeout.
+ *
+ * @return Returns the current send timeout, in milliseconds. A return value
+ * of 0 indicates that no timeout is set.
+ */
+ virtual uint32_t getSendTimeout() const = 0;
+
+ virtual void connect(ConnectCallback *callback,
+ const core::Prefix &prefix_) = 0;
+
+ // /**
+ // * Get the address of the local endpoint of this transport.
+ // *
+ // * This function may throw AsyncSocketException on error.
+ // *
+ // * @param address The local address will be stored in the specified
+ // * SocketAddress.
+ // */
+ // virtual void getLocalAddress(* address) const = 0;
+
+ virtual size_t getAppBytesWritten() const = 0;
+ virtual size_t getRawBytesWritten() const = 0;
+ virtual size_t getAppBytesReceived() const = 0;
+ virtual size_t getRawBytesReceived() const = 0;
+
+ class BufferCallback {
+ public:
+ virtual ~BufferCallback() {}
+ virtual void onEgressBuffered() = 0;
+ virtual void onEgressBufferCleared() = 0;
+ };
+
+ ~AsyncSocket() = default;
+};
+
+class AsyncAcceptor {
+ public:
+ class AcceptCallback {
+ public:
+ virtual ~AcceptCallback() = default;
+
+ /**
+ * connectionAccepted() is called whenever a new client connection is
+ * received.
+ *
+ * The AcceptCallback will remain installed after connectionAccepted()
+ * returns.
+ *
+ * @param fd The newly accepted client socket. The AcceptCallback
+ * assumes ownership of this socket, and is responsible
+ * for closing it when done. The newly accepted file
+ * descriptor will have already been put into
+ * non-blocking mode.
+ * @param clientAddr A reference to a SocketAddress struct containing the
+ * client's address. This struct is only guaranteed to
+ * remain valid until connectionAccepted() returns.
+ */
+ virtual void connectionAccepted(
+ const core::Name &subscriber_name) noexcept = 0;
+
+ /**
+ * acceptError() is called if an error occurs while accepting.
+ *
+ * The AcceptCallback will remain installed even after an accept error,
+ * as the errors are typically somewhat transient, such as being out of
+ * file descriptors. The server socket must be explicitly stopped if you
+ * wish to stop accepting after an error.
+ *
+ * @param ex An exception representing the error.
+ */
+ virtual void acceptError(const std::exception &ex) noexcept = 0;
+
+ /**
+ * acceptStarted() will be called in the callback's EventBase thread
+ * after this callback has been added to the AsyncServerSocket.
+ *
+ * acceptStarted() will be called before any calls to connectionAccepted()
+ * or acceptError() are made on this callback.
+ *
+ * acceptStarted() makes it easier for callbacks to perform initialization
+ * inside the callback thread. (The call to addAcceptCallback() must
+ * always be made from the AsyncServerSocket's primary EventBase thread.
+ * acceptStarted() provides a hook that will always be invoked in the
+ * callback's thread.)
+ *
+ * Note that the call to acceptStarted() is made once the callback is
+ * added, regardless of whether or not the AsyncServerSocket is actually
+ * accepting at the moment. acceptStarted() will be called even if the
+ * AsyncServerSocket is paused when the callback is added (including if
+ * the initial call to startAccepting() on the AsyncServerSocket has not
+ * been made yet).
+ */
+ virtual void acceptStarted() noexcept {}
+
+ /**
+ * acceptStopped() will be called when this AcceptCallback is removed from
+ * the AsyncServerSocket, or when the AsyncServerSocket is destroyed,
+ * whichever occurs first.
+ *
+ * No more calls to connectionAccepted() or acceptError() will be made
+ * after acceptStopped() is invoked.
+ */
+ virtual void acceptStopped() noexcept {}
+ };
+
+ /**
+ * Wait for subscribers
+ *
+ */
+ virtual void waitForSubscribers(AcceptCallback *cb) = 0;
+};
+
+class AsyncReader {
+ public:
+ class ReadCallback {
+ public:
+ virtual ~ReadCallback() = default;
+
+ /**
+ * When data becomes available, getReadBuffer() will be invoked to get the
+ * buffer into which data should be read.
+ *
+ * This method allows the ReadCallback to delay buffer allocation until
+ * data becomes available. This allows applications to manage large
+ * numbers of idle connections, without having to maintain a separate read
+ * buffer for each idle connection.
+ *
+ * It is possible that in some cases, getReadBuffer() may be called
+ * multiple times before readDataAvailable() is invoked. In this case, the
+ * data will be written to the buffer returned from the most recent call to
+ * readDataAvailable(). If the previous calls to readDataAvailable()
+ * returned different buffers, the ReadCallback is responsible for ensuring
+ * that they are not leaked.
+ *
+ * If getReadBuffer() throws an exception, returns a nullptr buffer, or
+ * returns a 0 length, the ReadCallback will be uninstalled and its
+ * readError() method will be invoked.
+ *
+ * getReadBuffer() is not allowed to change the transport state before it
+ * returns. (For example, it should never uninstall the read callback, or
+ * set a different read callback.)
+ *
+ * @param bufReturn getReadBuffer() should update *bufReturn to contain the
+ * address of the read buffer. This parameter will never
+ * be nullptr.
+ * @param lenReturn getReadBuffer() should update *lenReturn to contain the
+ * maximum number of bytes that may be written to the read
+ * buffer. This parameter will never be nullptr.
+ *
+ *
+ * XXX TODO this does not seems to be completely true Checlk i/.
+ */
+ virtual void getReadBuffer(void **bufReturn, size_t *lenReturn) = 0;
+
+ /**
+ * readDataAvailable() will be invoked when data has been successfully read
+ * into the buffer returned by the last call to getReadBuffer().
+ *
+ * The read callback remains installed after readDataAvailable() returns.
+ * It must be explicitly uninstalled to stop receiving read events.
+ * getReadBuffer() will be called at least once before each call to
+ * readDataAvailable(). getReadBuffer() will also be called before any
+ * call to readEOF().
+ *
+ * @param len The number of bytes placed in the buffer.
+ */
+
+ virtual void readDataAvailable(size_t len) noexcept = 0;
+
+ /**
+ * When data becomes available, isBufferMovable() will be invoked to figure
+ * out which API will be used, readBufferAvailable() or
+ * readDataAvailable(). If isBufferMovable() returns true, that means
+ * ReadCallback supports the IOBuf ownership transfer and
+ * readBufferAvailable() will be used. Otherwise, not.
+
+ * By default, isBufferMovable() always return false. If
+ * readBufferAvailable() is implemented and to be invoked, You should
+ * overwrite isBufferMovable() and return true in the inherited class.
+ *
+ * This method allows the AsyncSocket/AsyncSSLSocket do buffer allocation by
+ * itself until data becomes available. Compared with the pre/post buffer
+ * allocation in getReadBuffer()/readDataAvailabe(), readBufferAvailable()
+ * has two advantages. First, this can avoid memcpy. E.g., in
+ * AsyncSSLSocket, the decrypted data was copied from the openssl internal
+ * buffer to the readbuf buffer. With the buffer ownership transfer, the
+ * internal buffer can be directly "moved" to ReadCallback. Second, the
+ * memory allocation can be more precise. The reason is
+ * AsyncSocket/AsyncSSLSocket can allocate the memory of precise size
+ * because they have more context about the available data than
+ * ReadCallback. Think about the getReadBuffer() pre-allocate 4072 bytes
+ * buffer, but the available data is always 16KB (max OpenSSL record size).
+ */
+
+ virtual bool isBufferMovable() noexcept { return false; }
+
+ /**
+ * Suggested buffer size, allocated for read operations,
+ * if callback is movable and supports folly::IOBuf
+ */
+
+ virtual size_t maxBufferSize() const {
+ return 64 * 1024; // 64K
+ }
+
+ /**
+ * readBufferAvailable() will be invoked when data has been successfully
+ * read.
+ *
+ * Note that only either readBufferAvailable() or readDataAvailable() will
+ * be invoked according to the return value of isBufferMovable(). The timing
+ * and aftereffect of readBufferAvailable() are the same as
+ * readDataAvailable()
+ *
+ * @param readBuf The unique pointer of read buffer.
+ */
+
+ // virtual void readBufferAvailable(uint8_t** buffer, std::size_t
+ // *buf_length) noexcept {}
+
+ virtual void readBufferAvailable(
+ utils::SharableVector<uint8_t> &&buffer) noexcept {}
+
+ // virtual void readBufferAvailable(utils::SharableBuffer<uint8_t>&& buffer)
+ // noexcept {}
+
+ /**
+ * readEOF() will be invoked when the transport is closed.
+ *
+ * The read callback will be automatically uninstalled immediately before
+ * readEOF() is invoked.
+ */
+ virtual void readEOF() noexcept = 0;
+
+ /**
+ * readError() will be invoked if an error occurs reading from the
+ * transport.
+ *
+ * The read callback will be automatically uninstalled immediately before
+ * readError() is invoked.
+ *
+ * @param ex An exception describing the error that occurred.
+ */
+ virtual void readErr(const std::error_code ec) noexcept = 0;
+ };
+
+ // Read methods that aren't part of AsyncTransport.
+ virtual void setReadCB(ReadCallback *callback) = 0;
+ virtual ReadCallback *getReadCallback() const = 0;
+
+ protected:
+ virtual ~AsyncReader() = default;
+};
+
+class AsyncWriter {
+ public:
+ class WriteCallback {
+ public:
+ virtual ~WriteCallback() = default;
+
+ /**
+ * writeSuccess() will be invoked when all of the data has been
+ * successfully written.
+ *
+ * Note that this mainly signals that the buffer containing the data to
+ * write is no longer needed and may be freed or re-used. It does not
+ * guarantee that the data has been fully transmitted to the remote
+ * endpoint. For example, on socket-based transports, writeSuccess() only
+ * indicates that the data has been given to the kernel for eventual
+ * transmission.
+ */
+ virtual void writeSuccess() noexcept = 0;
+
+ /**
+ * writeError() will be invoked if an error occurs writing the data.
+ *
+ * @param bytesWritten The number of bytes that were successfull
+ * @param ex An exception describing the error that occurred.
+ */
+ virtual void writeErr(size_t bytesWritten) noexcept = 0;
+ };
+
+ /**
+ * If you supply a non-null WriteCallback, exactly one of writeSuccess()
+ * or writeErr() will be invoked when the write completes. If you supply
+ * the same WriteCallback object for multiple write() calls, it will be
+ * invoked exactly once per call. The only way to cancel outstanding
+ * write requests is to close the socket (e.g., with closeNow() or
+ * shutdownWriteNow()). When closing the socket this way, writeErr() will
+ * still be invoked once for each outstanding write operation.
+ */
+ virtual void write(WriteCallback *callback, const void *buf, size_t bytes,
+ const PublicationOptions &options,
+ WriteFlags flags = WriteFlags::NONE) = 0;
+
+ /**
+ * If you supply a non-null WriteCallback, exactly one of writeSuccess()
+ * or writeErr() will be invoked when the write completes. If you supply
+ * the same WriteCallback object for multiple write() calls, it will be
+ * invoked exactly once per call. The only way to cancel outstanding
+ * write requests is to close the socket (e.g., with closeNow() or
+ * shutdownWriteNow()). When closing the socket this way, writeErr() will
+ * still be invoked once for each outstanding write operation.
+ */
+ virtual void write(WriteCallback *callback,
+ utils::SharableVector<uint8_t> &&output_buffer,
+ const PublicationOptions &options,
+ WriteFlags flags = WriteFlags::NONE) = 0;
+
+ // /**
+ // * If you supply a non-null WriteCallback, exactly one of writeSuccess()
+ // * or writeErr() will be invoked when the write completes. If you supply
+ // * the same WriteCallback object for multiple write() calls, it will be
+ // * invoked exactly once per call. The only way to cancel outstanding
+ // * write requests is to close the socket (e.g., with closeNow() or
+ // * shutdownWriteNow()). When closing the socket this way, writeErr() will
+ // * still be invoked once for each outstanding write operation.
+ // */
+ // virtual void writeChain(
+ // WriteCallback* callback,
+ // std::unique_ptr<IOBuf>&& buf,
+ // WriteFlags flags = WriteFlags::NONE) = 0;
+
+ virtual void setWriteCB(WriteCallback *callback) = 0;
+ virtual WriteCallback *getWriteCallback() const = 0;
+
+ protected:
+ virtual ~AsyncWriter() = default;
+};
+
+} // namespace interface
+
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
new file mode 100755
index 000000000..7b6342262
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.cc
@@ -0,0 +1,490 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/full_duplex_socket.h>
+#include <hicn/transport/interfaces/socket_options_default_values.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+#include <memory>
+
+namespace transport {
+
+namespace interface {
+
+static const std::string producer_identity = "producer_socket";
+
+AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator)
+ : AsyncFullDuplexSocket(locator, internal_io_service_) {}
+
+AsyncFullDuplexSocket::AsyncFullDuplexSocket(const Prefix &locator,
+ asio::io_service &io_service)
+ : locator_(locator),
+ incremental_suffix_(0),
+ io_service_(io_service),
+ work_(io_service),
+ producer_(std::make_unique<ProducerSocket>(io_service_)),
+ consumer_(std::make_unique<ConsumerSocket>(
+ TransportProtocolAlgorithms::RAAQM /* , io_service_ */)),
+ read_callback_(nullptr),
+ write_callback_(nullptr),
+ connect_callback_(nullptr),
+ accept_callback_(nullptr),
+ internal_connect_callback_(new OnConnectCallback(*this)),
+ internal_signal_callback_(new OnSignalCallback(*this)),
+ send_timeout_milliseconds_(~0),
+ counters_({0}),
+ receive_buffer_(std::make_shared<utils::SharableVector<uint8_t>>()) {
+ using namespace transport;
+ using namespace std::placeholders;
+ producer_->registerPrefix(locator);
+
+ producer_->setSocketOption(
+ ProducerCallbacksOptions::CACHE_MISS,
+ std::bind(&AsyncFullDuplexSocket::onControlInterest, this, _1, _2));
+
+ producer_->setSocketOption(GeneralTransportOptions::OUTPUT_BUFFER_SIZE,
+ uint32_t{150000});
+
+ producer_->setSocketOption(
+ ProducerCallbacksOptions::CONTENT_PRODUCED,
+ std::bind(&AsyncFullDuplexSocket::onContentProduced, this, _1, _2, _3));
+
+ producer_->connect();
+
+ consumer_->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY,
+ (ConsumerContentObjectVerificationCallback)[](
+ ConsumerSocket & s, const ContentObject &c)
+ ->bool { return true; });
+
+ consumer_->setSocketOption(
+ ConsumerCallbacksOptions::CONTENT_RETRIEVED,
+ std::bind(&AsyncFullDuplexSocket::onContentRetrieved, this, _1, _2, _3));
+
+ consumer_->setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX,
+ uint32_t{4});
+
+ consumer_->connect();
+}
+
+void AsyncFullDuplexSocket::close() {
+ this->consumer_->stop();
+ this->producer_->stop();
+}
+
+void AsyncFullDuplexSocket::closeNow() { close(); }
+
+void AsyncFullDuplexSocket::shutdownWrite() { producer_->stop(); }
+
+void AsyncFullDuplexSocket::shutdownWriteNow() { shutdownWrite(); }
+
+bool AsyncFullDuplexSocket::good() const { return true; }
+
+bool AsyncFullDuplexSocket::readable() const {
+ // TODO return status of consumer socket
+ return true;
+}
+
+bool AsyncFullDuplexSocket::writable() const {
+ // TODO return status of producer socket
+ return true;
+}
+
+bool AsyncFullDuplexSocket::isPending() const {
+ // TODO save if there are production operation in the ops queue
+ // in producer socket
+ return true;
+}
+
+bool AsyncFullDuplexSocket::connected() const {
+ // No real connection here (ICN world). Return good
+ return good();
+}
+
+bool AsyncFullDuplexSocket::error() const { return !good(); }
+
+void AsyncFullDuplexSocket::setSendTimeout(uint32_t milliseconds) {
+ // TODO if production takes too much to complete
+ // let's abort the operation.
+
+ // Normally with hicn this should be done for content
+ // pull, not for production.
+
+ send_timeout_milliseconds_ = milliseconds;
+}
+
+uint32_t AsyncFullDuplexSocket::getSendTimeout() const {
+ return send_timeout_milliseconds_;
+}
+
+size_t AsyncFullDuplexSocket::getAppBytesWritten() const {
+ return counters_.app_bytes_written_;
+}
+
+size_t AsyncFullDuplexSocket::getRawBytesWritten() const { return 0; }
+
+size_t AsyncFullDuplexSocket::getAppBytesReceived() const {
+ return counters_.app_bytes_read_;
+}
+
+size_t AsyncFullDuplexSocket::getRawBytesReceived() const { return 0; }
+
+void AsyncFullDuplexSocket::connect(ConnectCallback *callback,
+ const core::Prefix &prefix) {
+ connect_callback_ = callback;
+
+ // Create an interest for a subscription
+ auto interest =
+ core::Interest::Ptr(new core::Interest(prefix.makeRandomName()));
+ auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
+ _payload->append(sizeof(ActionMessage));
+ auto payload = _payload->writableData();
+ ActionMessage *subscription_message =
+ reinterpret_cast<ActionMessage *>(payload);
+ subscription_message->header.msg_type = MessageType::ACTION;
+ subscription_message->action = Action::SUBSCRIBE;
+ subscription_message->header.reserved[0] = 0;
+ subscription_message->header.reserved[1] = 0;
+
+ // Set the name the other part should use for notifying a content production
+ sync_notification_ = std::move(locator_.makeRandomName());
+ sync_notification_.copyToDestination(
+ reinterpret_cast<uint8_t *>(subscription_message->name));
+
+ TRANSPORT_LOGI(
+ "Trying to connect. Sending interest: %s, name for notifications: %s",
+ prefix.getName().toString().c_str(),
+ sync_notification_.toString().c_str());
+
+ interest->setLifetime(1000);
+ interest->appendPayload(std::move(_payload));
+ consumer_->asyncSendInterest(std::move(interest),
+ internal_connect_callback_.get());
+}
+
+void AsyncFullDuplexSocket::write(WriteCallback *callback, const void *buf,
+ size_t bytes,
+ const PublicationOptions &options,
+ WriteFlags flags) {
+ using namespace transport;
+
+ // 1 asynchronously write the content. I assume here the
+ // buffer contains the whole application frame. FIXME: check
+ // if this is true and fix it accordingly
+ std::cout << "Size of the PAYLOAD: " << bytes << std::endl;
+
+ if (bytes > core::Packet::default_mtu - sizeof(PayloadMessage)) {
+ TRANSPORT_LOGI("Producing content with name %s",
+ options.name.toString().c_str());
+ producer_->asyncProduce(options.name,
+ reinterpret_cast<const uint8_t *>(buf), bytes);
+ signalProductionToSubscribers(options.name);
+ } else {
+ TRANSPORT_LOGI("Sending payload through interest");
+ piggybackPayloadToSubscribers(
+ options.name, reinterpret_cast<const uint8_t *>(buf), bytes);
+ }
+}
+
+void AsyncFullDuplexSocket::write(
+ WriteCallback *callback, utils::SharableVector<uint8_t> &&output_buffer,
+ const PublicationOptions &options, WriteFlags flags) {
+ using namespace transport;
+
+ // 1 asynchronously write the content. I assume here the
+ // buffer contains the whole application frame. FIXME: check
+ // if this is true and fix it accordingly
+ std::cout << "Size of the PAYLOAD: " << output_buffer.size() << std::endl;
+
+ if (output_buffer.size() >
+ core::Packet::default_mtu - sizeof(PayloadMessage)) {
+ TRANSPORT_LOGI("Producing content with name %s",
+ options.name.toString().c_str());
+ producer_->asyncProduce(options.name, std::move(output_buffer));
+ signalProductionToSubscribers(options.name);
+ } else {
+ TRANSPORT_LOGI("Sending payload through interest");
+ piggybackPayloadToSubscribers(options.name, &output_buffer[0],
+ output_buffer.size());
+ }
+}
+
+void AsyncFullDuplexSocket::piggybackPayloadToSubscribers(
+ const core::Name &name, const uint8_t *buffer, std::size_t bytes) {
+ for (auto &sub : subscribers_) {
+ auto interest = core::Interest::Ptr(new core::Interest(name));
+ auto _payload = utils::MemBuf::create(bytes + sizeof(PayloadMessage));
+ _payload->append(bytes + sizeof(PayloadMessage));
+ auto payload = _payload->writableData();
+
+ PayloadMessage *interest_payload =
+ reinterpret_cast<PayloadMessage *>(payload);
+ interest_payload->header.msg_type = MessageType::PAYLOAD;
+ interest_payload->header.reserved[0] = 0;
+ interest_payload->header.reserved[1] = 0;
+ interest_payload->reserved[0] = 0;
+ std::memcpy(payload + sizeof(PayloadMessage), buffer, bytes);
+ interest->appendPayload(std::move(_payload));
+
+ // Set the timeout of 0.2 second
+ interest->setLifetime(1000);
+ interest->setName(sub);
+ interest->getWritableName().setSuffix(incremental_suffix_++);
+ // TRANSPORT_LOGI("Sending signalization to %s",
+ // interest->getName().toString().c_str());
+
+ consumer_->asyncSendInterest(std::move(interest),
+ internal_signal_callback_.get());
+ }
+}
+
+void AsyncFullDuplexSocket::signalProductionToSubscribers(
+ const core::Name &name) {
+ // Signal the other part we are producing a content
+ // Create an interest for a subscription
+
+ for (auto &sub : subscribers_) {
+ auto interest = core::Interest::Ptr(new core::Interest(name));
+ // Todo consider using preallocated pool of membufs
+ auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
+ _payload->append(sizeof(ActionMessage));
+ auto payload = const_cast<uint8_t *>(interest->getPayload().data());
+
+ ActionMessage *produce_notification =
+ reinterpret_cast<ActionMessage *>(payload);
+ produce_notification->header.msg_type = MessageType::ACTION;
+ produce_notification->action = Action::SIGNAL_PRODUCTION;
+ produce_notification->header.reserved[0] = 0;
+ produce_notification->header.reserved[1] = 0;
+ name.copyToDestination(
+ reinterpret_cast<uint8_t *>(produce_notification->name));
+ interest->appendPayload(std::move(_payload));
+
+ // Set the timeout of 0.2 second
+ interest->setLifetime(1000);
+ interest->setName(sub);
+ interest->getWritableName().setSuffix(incremental_suffix_++);
+ // TRANSPORT_LOGI("Sending signalization to %s",
+ // interest->getName().toString().c_str());
+
+ consumer_->asyncSendInterest(std::move(interest),
+ internal_signal_callback_.get());
+ }
+}
+
+void AsyncFullDuplexSocket::waitForSubscribers(AcceptCallback *cb) {
+ accept_callback_ = cb;
+}
+
+std::shared_ptr<core::ContentObject>
+AsyncFullDuplexSocket::decodeSynchronizationMessage(
+ const core::Interest &interest) {
+ auto mesg = interest.getPayload();
+ const MessageHeader *header =
+ reinterpret_cast<const MessageHeader *>(mesg.data());
+
+ switch (header->msg_type) {
+ case MessageType::ACTION: {
+ // Check what is the action to perform
+ const ActionMessage *message =
+ reinterpret_cast<const ActionMessage *>(header);
+
+ if (message->action == Action::SUBSCRIBE) {
+ // Add consumer to list on consumers to be notified
+ auto ret =
+ subscribers_.emplace(AF_INET6, (const uint8_t *)message->name, 0);
+ TRANSPORT_LOGI("Added subscriber %s :)", ret.first->toString().c_str());
+ if (ret.second) {
+ accept_callback_->connectionAccepted(*ret.first);
+ }
+
+ TRANSPORT_LOGI("Connection success!");
+
+ sync_notification_ = std::move(locator_.makeRandomName());
+ return createSubscriptionResponse(sync_notification_);
+
+ } else if (message->action == Action::CANCEL_SUBSCRIPTION) {
+ // XXX Modify name!!! Each allocated name allocates a 128 bit array.
+ subscribers_.erase(
+ core::Name(AF_INET6, (const uint8_t *)message->name, 0));
+ return createAck();
+ } else if (message->action == Action::SIGNAL_PRODUCTION) {
+ // trigger a reverse pull for the name contained in the message
+ core::Name n(AF_INET6, (const uint8_t *)message->name, 0);
+ std::cout << "PROD NOTIFICATION: Content to retrieve: " << n
+ << std::endl;
+ std::cout << "PROD NOTIFICATION: Interest name: " << interest.getName()
+ << std::endl; // << " compared to " << sync_notification_ <<
+ // std::endl;
+
+ if (sync_notification_.equals(interest.getName(), false)) {
+ std::cout << "Starting reverse pull for " << n << std::endl;
+ consumer_->asyncConsume(n, receive_buffer_);
+ return createAck();
+ }
+ } else {
+ TRANSPORT_LOGE("Received unknown message. Dropping it.");
+ }
+
+ break;
+ }
+ case MessageType::RESPONSE: {
+ throw errors::RuntimeException(
+ "The response should be a content object!!");
+ }
+ case MessageType::PAYLOAD: {
+ // The interest contains the payload directly.
+ // We saved one round trip :)
+
+ auto buffer = std::make_shared<utils::SharableVector<uint8_t>>();
+ const uint8_t *data = mesg.data() + sizeof(PayloadMessage);
+ buffer->assign(data, data + mesg.length() - sizeof(PayloadMessage));
+ read_callback_->readBufferAvailable(std::move(*buffer));
+ return createAck();
+ }
+ default: {
+ return std::shared_ptr<core::ContentObject>(nullptr);
+ }
+ }
+
+ return std::shared_ptr<core::ContentObject>(nullptr);
+}
+
+void AsyncFullDuplexSocket::onControlInterest(ProducerSocket &s,
+ const core::Interest &i) {
+ auto payload = i.getPayload();
+ if (payload.length()) {
+ // Try to decode payload and see if starting an async pull operation
+ auto response = decodeSynchronizationMessage(i);
+ if (response) {
+ response->setName(i.getName());
+ s.produce(*response);
+ }
+ }
+}
+
+void AsyncFullDuplexSocket::onContentProduced(ProducerSocket &producer,
+ const std::error_code &ec,
+ uint64_t bytes_written) {
+ if (write_callback_) {
+ if (!ec) {
+ write_callback_->writeSuccess();
+ } else {
+ write_callback_->writeErr(bytes_written);
+ }
+ }
+}
+
+void AsyncFullDuplexSocket::onContentRetrieved(ConsumerSocket &s,
+ std::size_t size,
+ const std::error_code &ec) {
+ // Sanity check
+ if (size != receive_buffer_->size()) {
+ TRANSPORT_LOGE(
+ "Received content size differs from size retrieved from the buffer.");
+ return;
+ }
+
+ TRANSPORT_LOGI("Received content with size %lu", size);
+ if (!ec) {
+ read_callback_->readBufferAvailable(std::move(*receive_buffer_));
+ } else {
+ TRANSPORT_LOGE("Error retrieving content.");
+ }
+ // consumer_->stop();
+}
+
+void AsyncFullDuplexSocket::OnConnectCallback::onContentObject(
+ core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) {
+ // The ack message should contain the name to be used for notifying
+ // the production of the content to the other part
+
+ if (content_object->getPayload().length() == 0) {
+ TRANSPORT_LOGW("Connection response message empty....");
+ return;
+ }
+
+ SubscriptionResponseMessage *response =
+ reinterpret_cast<SubscriptionResponseMessage *>(
+ content_object->getPayload().writableData());
+
+ if (response->response.header.msg_type == MessageType::RESPONSE) {
+ if (response->response.return_code == ReturnCode::OK) {
+ auto ret =
+ socket_.subscribers_.emplace(AF_INET6, (uint8_t *)response->name, 0);
+ TRANSPORT_LOGI("Successfully connected!!!! Subscriber added: %s",
+ ret.first->toString().c_str());
+ socket_.connect_callback_->connectSuccess();
+ }
+ }
+}
+
+void AsyncFullDuplexSocket::OnSignalCallback::onContentObject(
+ core::Interest::Ptr &&, core::ContentObject::Ptr &&content_object) {
+ return;
+}
+
+void AsyncFullDuplexSocket::OnSignalCallback::onTimeout(
+ core::Interest::Ptr &&interest) {
+ TRANSPORT_LOGE("Retransmitting signalization interest to %s!!",
+ interest->getName().toString().c_str());
+ socket_.consumer_->asyncSendInterest(std::move(interest),
+ socket_.internal_signal_callback_.get());
+}
+
+void AsyncFullDuplexSocket::OnConnectCallback::onTimeout(
+ core::Interest::Ptr &&interest) {
+ socket_.connect_callback_->connectErr(
+ std::make_error_code(std::errc::not_connected));
+}
+
+std::shared_ptr<core::ContentObject> AsyncFullDuplexSocket::createAck() {
+ // Send the response back
+ core::Name name("b001::abcd");
+ auto response = std::make_shared<core::ContentObject>(name);
+ auto _payload = utils::MemBuf::create(sizeof(ActionMessage));
+ _payload->append(sizeof(ResponseMessage));
+ auto payload = response->getPayload().data();
+ ResponseMessage *response_message = (ResponseMessage *)payload;
+ response_message->header.msg_type = MessageType::RESPONSE;
+ response_message->header.reserved[0] = 0;
+ response_message->header.reserved[1] = 0;
+ response_message->return_code = ReturnCode::OK;
+ response->appendPayload(std::move(_payload));
+ response->setLifetime(0);
+ return response;
+}
+
+std::shared_ptr<core::ContentObject>
+AsyncFullDuplexSocket::createSubscriptionResponse(const core::Name &name) {
+ // Send the response back
+ core::Name tmp_name("b001::abcd");
+ auto response = std::make_shared<core::ContentObject>(tmp_name);
+ auto _payload = utils::MemBuf::create(sizeof(SubscriptionResponseMessage));
+ _payload->append(sizeof(SubscriptionResponseMessage));
+ auto payload = _payload->data();
+ SubscriptionResponseMessage *response_message =
+ (SubscriptionResponseMessage *)payload;
+ response_message->response.header.msg_type = MessageType::RESPONSE;
+ response_message->response.header.reserved[0] = 0;
+ response_message->response.header.reserved[1] = 0;
+ response_message->response.return_code = ReturnCode::OK;
+ name.copyToDestination(reinterpret_cast<uint8_t *>(response_message->name));
+ response->appendPayload(std::move(_payload));
+ response->setLifetime(0);
+ return response;
+}
+
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
new file mode 100755
index 000000000..f881bea54
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/full_duplex_socket.h
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class is created for sending/receiving data over an ICN network.
+ */
+
+#pragma once
+
+#include <hicn/transport/core/prefix.h>
+#include <hicn/transport/interfaces/async_transport.h>
+#include <hicn/transport/interfaces/socket_consumer.h>
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/portability/portability.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+#include <unordered_set>
+#include <vector>
+
+namespace transport {
+
+namespace interface {
+
+enum class MessageType : uint8_t { ACTION, RESPONSE, PAYLOAD };
+
+enum class Action : uint8_t {
+ SUBSCRIBE,
+ CANCEL_SUBSCRIPTION,
+ SIGNAL_PRODUCTION,
+};
+
+enum class ReturnCode : uint8_t {
+ OK,
+ FAILED,
+};
+
+struct MessageHeader {
+ MessageType msg_type;
+ uint8_t reserved[2];
+};
+
+struct ActionMessage {
+ MessageHeader header;
+ Action action;
+ uint64_t name[2];
+};
+
+struct ResponseMessage {
+ MessageHeader header;
+ ReturnCode return_code;
+};
+
+struct SubscriptionResponseMessage {
+ ResponseMessage response;
+ uint64_t name[2];
+};
+
+struct PayloadMessage {
+ MessageHeader header;
+ uint8_t reserved[1];
+};
+
+// struct NotificationMessage {
+// Action action;
+// uint8_t reserved[3];
+// uint64_t
+// }
+
+using core::Prefix;
+
+class AsyncFullDuplexSocket : public AsyncSocket,
+ public AsyncReader,
+ public AsyncWriter,
+ public AsyncAcceptor {
+ private:
+ struct Counters {
+ uint64_t app_bytes_written_;
+ uint64_t app_bytes_read_;
+
+ TRANSPORT_ALWAYS_INLINE void updateBytesWritten(uint64_t bytes) {
+ app_bytes_written_ += bytes;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void updateBytesRead(uint64_t bytes) {
+ app_bytes_read_ += bytes;
+ }
+ };
+
+ public:
+ using UniquePtr = std::unique_ptr<AsyncFullDuplexSocket>;
+ using SharedPtr = std::unique_ptr<AsyncFullDuplexSocket>;
+
+ AsyncFullDuplexSocket(const Prefix &locator, asio::io_service &io_service);
+ AsyncFullDuplexSocket(const core::Prefix &locator);
+
+ ~AsyncFullDuplexSocket() {
+ TRANSPORT_LOGI("Adios AsyncFullDuplexSocket!!!");
+ };
+
+ using ReadCallback = AsyncReader::ReadCallback;
+ using WriteCallback = AsyncWriter::WriteCallback;
+
+ TRANSPORT_ALWAYS_INLINE void setReadCB(ReadCallback *callback) override {
+ read_callback_ = callback;
+ }
+
+ TRANSPORT_ALWAYS_INLINE ReadCallback *getReadCallback() const override {
+ return read_callback_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE void setWriteCB(WriteCallback *callback) override {
+ write_callback_ = callback;
+ }
+
+ TRANSPORT_ALWAYS_INLINE WriteCallback *getWriteCallback() const override {
+ return write_callback_;
+ }
+
+ TRANSPORT_ALWAYS_INLINE const core::Prefix &getLocator() { return locator_; }
+
+ void connect(ConnectCallback *callback, const core::Prefix &prefix) override;
+
+ void write(WriteCallback *callback, const void *buf, size_t bytes,
+ const PublicationOptions &options,
+ WriteFlags flags = WriteFlags::NONE) override;
+
+ virtual void write(WriteCallback *callback,
+ utils::SharableVector<uint8_t> &&output_buffer,
+ const PublicationOptions &options,
+ WriteFlags flags = WriteFlags::NONE) override;
+
+ void waitForSubscribers(AcceptCallback *cb) override;
+
+ // void writev(
+ // WriteCallback* callback,
+ // const iovec* vec,
+ // size_t count,
+ // Name &&content_to_publish_name,
+ // WriteFlags flags = WriteFlags::NONE) override;
+
+ void close() override;
+
+ void closeNow() override;
+
+ void shutdownWrite() override;
+
+ void shutdownWriteNow() override;
+
+ bool good() const override;
+
+ bool readable() const override;
+
+ bool writable() const override;
+
+ bool isPending() const override;
+
+ bool connected() const override;
+
+ bool error() const override;
+
+ void setSendTimeout(uint32_t milliseconds) override;
+
+ size_t getAppBytesWritten() const override;
+ size_t getRawBytesWritten() const override;
+ size_t getAppBytesReceived() const override;
+ size_t getRawBytesReceived() const override;
+
+ uint32_t getSendTimeout() const override;
+
+ private:
+ std::shared_ptr<core::ContentObject> decodeSynchronizationMessage(
+ const core::Interest &interest);
+
+ class OnConnectCallback : public BasePortal::ConsumerCallback {
+ public:
+ OnConnectCallback(AsyncFullDuplexSocket &socket) : socket_(socket){};
+ virtual ~OnConnectCallback() = default;
+ void onContentObject(core::Interest::Ptr &&,
+ core::ContentObject::Ptr &&content_object) override;
+ void onTimeout(core::Interest::Ptr &&interest) override;
+
+ private:
+ AsyncFullDuplexSocket &socket_;
+ };
+
+ class OnSignalCallback : public BasePortal::ConsumerCallback {
+ public:
+ OnSignalCallback(AsyncFullDuplexSocket &socket) : socket_(socket){};
+ virtual ~OnSignalCallback() = default;
+ void onContentObject(core::Interest::Ptr &&,
+ core::ContentObject::Ptr &&content_object);
+ void onTimeout(core::Interest::Ptr &&interest);
+
+ private:
+ AsyncFullDuplexSocket &socket_;
+ };
+
+ void onControlInterest(ProducerSocket &s, const core::Interest &i);
+ void onContentProduced(ProducerSocket &producer, const std::error_code &ec,
+ uint64_t bytes_written);
+ void onContentRetrieved(ConsumerSocket &s, std::size_t size,
+ const std::error_code &ec);
+
+ void signalProductionToSubscribers(const core::Name &name);
+ void piggybackPayloadToSubscribers(const core::Name &name,
+ const uint8_t *buffer, std::size_t bytes);
+
+ std::shared_ptr<core::ContentObject> createAck();
+ std::shared_ptr<core::ContentObject> createSubscriptionResponse(
+ const core::Name &name);
+
+ core::Prefix locator_;
+ uint32_t incremental_suffix_;
+ core::Name sync_notification_;
+ // std::unique_ptr<BasePortal> portal_;
+ asio::io_service internal_io_service_;
+ asio::io_service &io_service_;
+ asio::io_service::work work_;
+
+ // These names represent the "locator" of a certain
+ // peer that subscribed to this.
+ std::unordered_set<core::Name> subscribers_;
+
+ // Useful for publishing / Retrieving data
+ std::unique_ptr<ProducerSocket> producer_;
+ std::unique_ptr<ConsumerSocket> consumer_;
+
+ ReadCallback *read_callback_;
+ WriteCallback *write_callback_;
+ ConnectCallback *connect_callback_;
+ AcceptCallback *accept_callback_;
+
+ std::unique_ptr<OnConnectCallback> internal_connect_callback_;
+ std::unique_ptr<OnSignalCallback> internal_signal_callback_;
+
+ uint32_t send_timeout_milliseconds_;
+ struct Counters counters_;
+ std::shared_ptr<utils::SharableVector<uint8_t>> receive_buffer_;
+};
+
+} // namespace interface
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/publication_options.h b/libtransport/src/hicn/transport/interfaces/publication_options.h
new file mode 100755
index 000000000..ae5366ce7
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/publication_options.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <sstream>
+#include <vector>
+
+namespace transport {
+
+namespace interface {
+
+class PublicationOptions {
+ public:
+ core::Name name;
+ uint32_t content_lifetime_milliseconds;
+ // TODO Signature
+};
+} // namespace interface
+
+} // namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc
new file mode 100755
index 000000000..de3e84417
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.cc
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/rtc_socket_consumer.h>
+
+namespace transport {
+
+namespace interface {
+
+RTCConsumerSocket::RTCConsumerSocket(int protocol, asio::io_service &io_service)
+ : ConsumerSocket(protocol, io_service) {}
+
+RTCConsumerSocket::~RTCConsumerSocket() {}
+
+void RTCConsumerSocket::handleRTCPPacket(uint8_t *packet, size_t len) {
+ RTCTransportProtocol *transport = dynamic_cast<RTCTransportProtocol *>(
+ ConsumerSocket::transport_protocol_.get());
+ if (transport) transport->onRTCPPacket(packet, len);
+}
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h
new file mode 100755
index 000000000..86ccf6e22
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_consumer.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+namespace transport {
+
+namespace interface {
+
+class RTCConsumerSocket : public ConsumerSocket {
+ public:
+ explicit RTCConsumerSocket(int protocol, asio::io_service &io_service);
+
+ ~RTCConsumerSocket();
+
+ void handleRTCPPacket(uint8_t *packet, size_t len);
+};
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
new file mode 100755
index 000000000..d8a9d53b9
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.cc
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <time.h>
+#include <hicn/transport/interfaces/rtc_socket_producer.h>
+
+#define NACK_HEADER_SIZE 8 // bytes
+#define TIMESTAMP_LEN 8 // bytes
+#define TCP_HEADER_SIZE 20
+#define IP6_HEADER_SIZE 40
+#define INIT_PACKET_PRODUCTION_RATE 100 // pps random value (almost 1Mbps)
+#define STATS_INTERVAL_DURATION 500 // ms
+#define INTEREST_LIFETIME_REDUCTION_FACTOR 0.8
+
+// NACK HEADER
+// +-----------------------------------------+
+// | 4 bytes: current segment in production |
+// +-----------------------------------------+
+// | 4 bytes: production rate (bytes x sec) |
+// +-----------------------------------------+
+// may require additional field (Rate for multiple qualities, ...)
+//
+
+namespace transport {
+
+namespace interface {
+
+RTCProducerSocket::RTCProducerSocket(asio::io_service &io_service)
+ : ProducerSocket(io_service),
+ currentSeg_(1),
+ nack_(std::make_shared<ContentObject>()),
+ producedBytes_(0),
+ producedPackets_(0),
+ bytesProductionRate_(0),
+ packetsProductionRate_(INIT_PACKET_PRODUCTION_RATE),
+ perSecondFactor_(1000 / STATS_INTERVAL_DURATION) {
+ nack_->appendPayload(utils::MemBuf::create(NACK_HEADER_SIZE));
+ lastStats_ = std::chrono::steady_clock::now();
+ srand(time(NULL));
+ prodLabel_ = ((rand() % 255) << 24UL);
+}
+
+RTCProducerSocket::~RTCProducerSocket() {}
+
+void RTCProducerSocket::registerName(Prefix &producer_namespace) {
+ ProducerSocket::registerPrefix(producer_namespace);
+
+ flowName_ = producer_namespace.getName();
+
+ if (flowName_.getType() == HNT_CONTIGUOUS_V4 ||
+ flowName_.getType() == HNT_IOV_V4) {
+ headerSize_ = sizeof(hicn_v6_hdr_t::ip);
+ } else if (flowName_.getType() == HNT_CONTIGUOUS_V6 ||
+ flowName_.getType() == HNT_IOV_V6) {
+ headerSize_ = sizeof(hicn_v4_hdr_t::ip);
+ } else {
+ throw errors::RuntimeException("Unknown name format.");
+ }
+
+ headerSize_ += TCP_HEADER_SIZE;
+}
+
+void RTCProducerSocket::updateStats(uint32_t packet_size) {
+ producedBytes_ += packet_size;
+ producedPackets_++;
+ std::chrono::steady_clock::duration duration =
+ std::chrono::steady_clock::now() - lastStats_;
+ if (std::chrono::duration_cast<std::chrono::milliseconds>(duration).count() >=
+ STATS_INTERVAL_DURATION) {
+ lastStats_ = std::chrono::steady_clock::now();
+ bytesProductionRate_ = producedBytes_ * perSecondFactor_;
+ packetsProductionRate_ = producedPackets_ * perSecondFactor_;
+ producedBytes_ = 0;
+ producedPackets_ = 0;
+ }
+}
+
+void RTCProducerSocket::produce(const uint8_t *buf, size_t buffer_size) {
+ if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) {
+ return;
+ }
+
+ if (TRANSPORT_EXPECT_FALSE((buffer_size + headerSize_ + TIMESTAMP_LEN) >
+ data_packet_size_)) {
+ return;
+ }
+
+ updateStats(buffer_size + headerSize_ + TIMESTAMP_LEN);
+
+ std::shared_ptr<ContentObject> content_object =
+ std::make_shared<ContentObject>(flowName_.setSuffix(currentSeg_));
+ auto payload = utils::MemBuf::copyBuffer(buf, buffer_size, TIMESTAMP_LEN);
+
+ // content_object->setLifetime(content_object_expiry_time_);
+ content_object->setLifetime(1000); // XXX this should be set by the APP
+
+ uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ payload->prepend(TIMESTAMP_LEN);
+ uint8_t *payloadPointer = payload->writableData();
+ *(uint64_t *)payloadPointer = timestamp;
+ content_object->appendPayload(std::move(payload));
+
+ content_object->setPathLabel(prodLabel_);
+ portal_->sendContentObject(*content_object);
+
+ currentSeg_++;
+}
+
+void RTCProducerSocket::onInterest(Interest::Ptr &&interest) {
+ uint32_t interestSeg = interest->getName().getSuffix();
+ uint32_t lifetime = interest->getLifetime();
+ uint32_t max_gap;
+
+ // XXX
+ // packetsProductionRate_ is modified by another thread in updateStats
+ // this should be safe since I just read here. but, you never know.
+ max_gap =
+ floor((double)((double)((double)lifetime *
+ INTEREST_LIFETIME_REDUCTION_FACTOR / 1000.0) *
+ (double)packetsProductionRate_));
+
+ if (interestSeg < currentSeg_ || interestSeg > (max_gap + currentSeg_)) {
+ sendNack(*interest);
+ }
+ // else drop packet
+}
+
+void RTCProducerSocket::sendNack(const Interest &interest) {
+ nack_->setName(interest.getName());
+ uint32_t *payload_ptr = (uint32_t *)nack_->getPayload().data();
+ *payload_ptr = currentSeg_;
+ *(++payload_ptr) = bytesProductionRate_;
+
+ nack_->setLifetime(0);
+ nack_->setPathLabel(prodLabel_);
+ portal_->sendContentObject(*nack_);
+}
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
new file mode 100755
index 000000000..1a42bdc56
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/rtc_socket_producer.h
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket_producer.h>
+#include <hicn/transport/utils/content_store.h>
+
+#include <map>
+#include <mutex>
+
+namespace transport {
+
+namespace interface {
+
+class RTCProducerSocket : public ProducerSocket {
+ public:
+ RTCProducerSocket(asio::io_service &io_service);
+ ~RTCProducerSocket();
+
+ void registerName(Prefix &producer_namespace);
+
+ void produce(const uint8_t *buffer, size_t buffer_size);
+
+ void onInterest(Interest::Ptr &&interest) override;
+
+ private:
+ void sendNack(const Interest &interest);
+ void updateStats(uint32_t packet_size);
+
+ // std::map<uint32_t, uint64_t> pendingInterests_;
+ uint32_t currentSeg_;
+ uint32_t prodLabel_;
+ uint16_t headerSize_;
+ Name flowName_;
+ // bool produceInSynch_;
+ std::shared_ptr<ContentObject> nack_;
+ uint32_t producedBytes_;
+ uint32_t producedPackets_;
+ uint32_t bytesProductionRate_;
+ uint32_t packetsProductionRate_;
+ uint32_t perSecondFactor_;
+ std::chrono::steady_clock::time_point lastStats_;
+};
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket.h b/libtransport/src/hicn/transport/interfaces/socket.h
new file mode 100755
index 000000000..22757810a
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket.h
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/config.h>
+#include <hicn/transport/core/content_object.h>
+#include <hicn/transport/core/facade.h>
+#include <hicn/transport/core/interest.h>
+#include <hicn/transport/core/manifest_format_fixed.h>
+#include <hicn/transport/core/manifest_inline.h>
+#include <hicn/transport/core/name.h>
+#include <hicn/transport/interfaces/socket_options_default_values.h>
+#include <hicn/transport/interfaces/socket_options_keys.h>
+#include <hicn/transport/utils/crypto_suite.h>
+#include <hicn/transport/utils/identity.h>
+#include <hicn/transport/utils/verifier.h>
+
+#define SOCKET_OPTION_GET 0
+#define SOCKET_OPTION_NOT_GET 1
+#define SOCKET_OPTION_SET 2
+#define SOCKET_OPTION_NOT_SET 3
+#define SOCKET_OPTION_DEFAULT 12345
+
+#define VOID_HANDLER 0
+
+namespace transport {
+
+namespace protocol {
+class IcnObserver;
+}
+
+namespace interface {
+
+template <typename PortalType>
+class Socket;
+class ConsumerSocket;
+class ProducerSocket;
+
+// using Interest = core::Interest;
+// using ContentObject = core::ContentObject;
+// using Name = core::Name;
+// using HashAlgorithm = core::HashAlgorithm;
+// using CryptoSuite = utils::CryptoSuite;
+// using Identity = utils::Identity;
+// using Verifier = utils::Verifier;
+
+using HicnForwarderPortal = core::HicnForwarderPortal;
+
+#ifdef __linux__
+#ifndef __ANDROID__
+using RawSocketPortal = core::RawSocketPortal;
+#endif
+#endif
+
+#ifdef __vpp__
+using VPPForwarderPortal = core::VPPForwarderPortal;
+using BaseSocket = Socket<VPPForwarderPortal>;
+using BasePortal = VPPForwarderPortal;
+#else
+using BaseSocket = Socket<HicnForwarderPortal>;
+using BasePortal = HicnForwarderPortal;
+#endif
+
+using PayloadType = core::PayloadType;
+using Prefix = core::Prefix;
+using Array = utils::Array<uint8_t>;
+
+using ConsumerInterestCallback =
+ std::function<void(ConsumerSocket &, const core::Interest &)>;
+
+using ConsumerContentCallback =
+ std::function<void(ConsumerSocket &, std::size_t, const std::error_code &)>;
+
+using ConsumerTimerCallback =
+ std::function<void(ConsumerSocket &, std::size_t,
+ std::chrono::milliseconds &, float, uint32_t, uint32_t)>;
+
+using ProducerContentCallback = std::function<void(
+ ProducerSocket &, const std::error_code &, uint64_t bytes_written)>;
+
+using ConsumerContentObjectCallback =
+ std::function<void(ConsumerSocket &, const core::ContentObject &)>;
+
+using ConsumerContentObjectVerificationCallback =
+ std::function<bool(ConsumerSocket &, const core::ContentObject &)>;
+
+using ConsumerManifestCallback =
+ std::function<void(ConsumerSocket &, const core::ContentObjectManifest &)>;
+
+using ProducerContentObjectCallback =
+ std::function<void(ProducerSocket &, core::ContentObject &)>;
+
+using ProducerInterestCallback =
+ std::function<void(ProducerSocket &, const core::Interest &)>;
+
+using ProducerInterestCallback =
+ std::function<void(ProducerSocket &, const core::Interest &)>;
+
+using namespace protocol;
+
+template <typename PortalType>
+class Socket {
+ static_assert(std::is_same<PortalType, HicnForwarderPortal>::value
+#ifdef __linux__
+#ifndef __ANDROID__
+ || std::is_same<PortalType, RawSocketPortal>::value
+#ifdef __vpp__
+ || std::is_same<PortalType, VPPForwarderPortal>::value
+#endif
+#endif
+ ,
+#else
+ ,
+
+#endif
+ "This class is not allowed as Portal");
+
+ public:
+ using Portal = PortalType;
+
+ virtual asio::io_service &getIoService() = 0;
+
+ virtual void connect() = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ double socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ bool socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ core::Name socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) = 0;
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerInterestCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerContentCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerManifestCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ core::HashAlgorithm socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ const utils::Identity &socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ double &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ bool &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ core::Name &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) = 0;
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(
+ int socket_option_key, ProducerInterestCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(
+ int socket_option_key, ConsumerInterestCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerContentCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(
+ int socket_option_key, ConsumerManifestCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ ProducerContentCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ core::HashAlgorithm &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ utils::Identity &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ std::string &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerTimerCallback &socket_option_value) = 0;
+
+ protected:
+ virtual ~Socket(){};
+
+ protected:
+ std::string output_interface_;
+};
+
+} // namespace interface
+
+} // namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
new file mode 100755
index 000000000..8109d0e99
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc
@@ -0,0 +1,735 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_consumer.h>
+
+namespace transport {
+
+namespace interface {
+
+ConsumerSocket::ConsumerSocket(int protocol)
+ : ConsumerSocket(protocol, internal_io_service_) {}
+
+ConsumerSocket::ConsumerSocket(int protocol, asio::io_service &io_service)
+ : io_service_(io_service),
+ portal_(std::make_shared<Portal>(io_service_)),
+ async_downloader_(),
+ interest_lifetime_(default_values::interest_lifetime),
+ min_window_size_(default_values::min_window_size),
+ max_window_size_(default_values::max_window_size),
+ current_window_size_(-1),
+ max_retransmissions_(
+ default_values::transport_protocol_max_retransmissions),
+ /****** RAAQM Parameters ******/
+ minimum_drop_probability_(default_values::minimum_drop_probability),
+ sample_number_(default_values::sample_number),
+ gamma_(default_values::gamma_value),
+ beta_(default_values::beta_value),
+ drop_factor_(default_values::drop_factor),
+ /****** END RAAQM Parameters ******/
+ rate_estimation_alpha_(default_values::rate_alpha),
+ rate_estimation_observer_(nullptr),
+ rate_estimation_choice_(0),
+ is_async_(false),
+ verify_signature_(false),
+ content_buffer_(nullptr),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ on_content_object_(VOID_HANDLER),
+ on_manifest_(VOID_HANDLER),
+ on_payload_retrieved_(VOID_HANDLER),
+ virtual_download_(false),
+ rtt_stats_(false),
+ timer_(portal_->getIoService()),
+ timer_interval_milliseconds_(0) {
+ switch (protocol) {
+ case TransportProtocolAlgorithms::VEGAS:
+ transport_protocol_ = std::make_shared<VegasTransportProtocol>(this);
+ break;
+ case TransportProtocolAlgorithms::CBR:
+ transport_protocol_ = std::make_shared<CbrTransportProtocol>(this);
+ break;
+ case TransportProtocolAlgorithms::RTC:
+ transport_protocol_ = std::make_shared<RTCTransportProtocol>(this);
+ break;
+ case TransportProtocolAlgorithms::RAAQM:
+ default:
+ transport_protocol_ = std::make_shared<RaaqmTransportProtocol>(this);
+ break;
+ }
+}
+
+ConsumerSocket::~ConsumerSocket() {
+ stop();
+
+ async_downloader_.stop();
+
+ transport_protocol_.reset();
+ portal_.reset();
+}
+
+void ConsumerSocket::connect() { portal_->connect(); }
+
+int ConsumerSocket::consume(const Name &name,
+ utils::SharableVector<uint8_t> &receive_buffer) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ content_buffer_ = receive_buffer.shared_from_this();
+
+ network_name_ = name;
+ network_name_.setSuffix(0);
+ is_async_ = false;
+
+ transport_protocol_->start(receive_buffer);
+
+ return CONSUMER_READY;
+}
+
+int ConsumerSocket::asyncConsume(
+ const Name &name,
+ std::shared_ptr<utils::SharableVector<uint8_t>> receive_buffer) {
+ // XXX Try to move the name here, instead of copying it!!
+ if (!async_downloader_.stopped()) {
+ async_downloader_.add([this, receive_buffer, name]() {
+ network_name_ = std::move(name);
+ network_name_.setSuffix(0);
+ is_async_ = true;
+ transport_protocol_->start(*receive_buffer);
+ });
+ }
+
+ return CONSUMER_READY;
+}
+
+void ConsumerSocket::asyncSendInterest(Interest::Ptr &&interest,
+ Portal::ConsumerCallback *callback) {
+ if (!async_downloader_.stopped()) {
+ // TODO Workaround, to be fixed!
+ auto i = interest.release();
+ async_downloader_.add([this, i, callback]() mutable {
+ Interest::Ptr _interest(i);
+ portal_->setConsumerCallback(callback);
+ portal_->sendInterest(std::move(_interest));
+ portal_->runEventsLoop();
+ });
+ }
+}
+
+void ConsumerSocket::stop() {
+ if (transport_protocol_->isRunning()) {
+ transport_protocol_->stop();
+ }
+
+ //is_running_ = false;
+}
+
+void ConsumerSocket::resume() {
+ if(!transport_protocol_->isRunning()){
+ transport_protocol_->resume();
+ }
+}
+
+asio::io_service &ConsumerSocket::getIoService() {
+ return portal_->getIoService();
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ double socket_option_value) {
+ switch (socket_option_key) {
+ case MIN_WINDOW_SIZE:
+ min_window_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case MAX_WINDOW_SIZE:
+ max_window_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case CURRENT_WINDOW_SIZE:
+ current_window_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GAMMA_VALUE:
+ gamma_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case BETA_VALUE:
+ beta_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case DROP_FACTOR:
+ drop_factor_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case MINIMUM_DROP_PROBABILITY:
+ minimum_drop_probability_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case RATE_ESTIMATION_ALPHA:
+ if (socket_option_value >= 0 && socket_option_value < 1) {
+ rate_estimation_alpha_ = socket_option_value;
+ } else {
+ rate_estimation_alpha_ = ALPHA;
+ }
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ input_buffer_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ max_retransmissions_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ interest_lifetime_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_retransmission_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_timeout_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_output_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_input_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_verification_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_payload_retrieved_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ if (socket_option_value > 0) {
+ rate_estimation_batching_parameter_ = socket_option_value;
+ } else {
+ rate_estimation_batching_parameter_ = BATCH;
+ }
+ return SOCKET_OPTION_SET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ if (socket_option_value > 0) {
+ rate_estimation_choice_ = socket_option_value;
+ } else {
+ rate_estimation_choice_ = RATE_CHOICE;
+ }
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::TIMER_INTERVAL:
+ timer_interval_milliseconds_ = socket_option_value;
+ TRANSPORT_LOGD("Ok set %d", timer_interval_milliseconds_);
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ switch (socket_option_key) {
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ virtual_download_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case RaaqmTransportOptions::RTT_STATS:
+ rtt_stats_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::VERIFY_SIGNATURE:
+ verify_signature_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ Name socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ network_name_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ on_content_object_input_ = socket_option_value;
+ ;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ on_content_object_verification_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerInterestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ on_interest_retransmission_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ on_interest_output_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ on_interest_timeout_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ on_interest_satisfied_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerContentCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
+ on_payload_retrieved_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ConsumerManifestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::MANIFEST_INPUT:
+ on_manifest_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value) {
+ if (socket_option_key == RateEstimationOptions::RATE_ESTIMATION_OBSERVER) {
+ rate_estimation_observer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ }
+
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(
+ int socket_option_key, const utils::Identity &socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CERTIFICATE:
+ key_id_ = verifier_.addKeyFromCertificate(socket_option_value);
+
+ if (key_id_ != nullptr) {
+ return SOCKET_OPTION_SET;
+ }
+
+ break;
+
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ output_interface_ = socket_option_value;
+ portal_->setOutputInterface(output_interface_);
+ return SOCKET_OPTION_SET;
+ }
+
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::TIMER_EXPIRES:
+ on_timer_expires_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ }
+
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ double &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MIN_WINDOW_SIZE:
+ socket_option_value = min_window_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::MAX_WINDOW_SIZE:
+ socket_option_value = max_window_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
+ socket_option_value = current_window_size_;
+ return SOCKET_OPTION_GET;
+
+ // RAAQM parameters
+
+ case RaaqmTransportOptions::GAMMA_VALUE:
+ socket_option_value = gamma_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::BETA_VALUE:
+ socket_option_value = beta_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::DROP_FACTOR:
+ socket_option_value = drop_factor_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
+ socket_option_value = minimum_drop_probability_;
+ return SOCKET_OPTION_GET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
+ socket_option_value = rate_estimation_alpha_;
+ return SOCKET_OPTION_GET;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ socket_option_value = max_retransmissions_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ socket_option_value = interest_lifetime_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::SAMPLE_NUMBER:
+ socket_option_value = sample_number_;
+ return SOCKET_OPTION_GET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ socket_option_value = rate_estimation_batching_parameter_;
+ return SOCKET_OPTION_GET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ socket_option_value = rate_estimation_choice_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::RUNNING:
+ socket_option_value = transport_protocol_->isRunning();
+ return SOCKET_OPTION_GET;
+
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ socket_option_value = virtual_download_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::RTT_STATS:
+ socket_option_value = rtt_stats_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::VERIFY_SIGNATURE:
+ socket_option_value = verify_signature_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ Name &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ socket_option_value = network_name_;
+ return SOCKET_OPTION_GET;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ socket_option_value = on_content_object_input_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ProducerContentObjectCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ socket_option_value = on_content_object_verification_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerInterestCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ socket_option_value = on_interest_retransmission_;
+ return SOCKET_OPTION_GET;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ socket_option_value = on_interest_output_;
+ return SOCKET_OPTION_GET;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ socket_option_value = on_interest_timeout_;
+ return SOCKET_OPTION_GET;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ socket_option_value = on_interest_satisfied_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ProducerInterestCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerContentCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
+ socket_option_value = on_payload_retrieved_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerManifestCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::MANIFEST_INPUT:
+ socket_option_value = on_manifest_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) {
+ if (socket_option_key == RATE_ESTIMATION_OBSERVER) {
+ *socket_option_value = (rate_estimation_observer_);
+ return SOCKET_OPTION_GET;
+ }
+
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ utils::Identity &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ socket_option_value = output_interface_;
+ return SOCKET_OPTION_GET;
+ }
+
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(
+ int socket_option_key, ConsumerTimerCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::TIMER_EXPIRES:
+ socket_option_value = on_timer_expires_;
+ return SOCKET_OPTION_GET;
+ }
+
+ return SOCKET_OPTION_NOT_GET;
+}
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
new file mode 100755
index 000000000..9e309aae8
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h
@@ -0,0 +1,259 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket.h>
+
+#include <hicn/transport/protocols/cbr.h>
+#include <hicn/transport/protocols/protocol.h>
+#include <hicn/transport/protocols/raaqm.h>
+#include <hicn/transport/protocols/rtc.h>
+#include <hicn/transport/protocols/vegas.h>
+
+#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+#define CONSUMER_READY 0
+#define CONSUMER_BUSY 1
+
+namespace transport {
+
+namespace interface {
+
+class ConsumerSocket : public BaseSocket {
+ friend class protocol::TransportProtocol;
+ friend class protocol::VegasTransportProtocol;
+ friend class protocol::RaaqmTransportProtocol;
+ friend class protocol::CbrTransportProtocol;
+
+ public:
+ explicit ConsumerSocket(int protocol);
+ explicit ConsumerSocket(int protocol, asio::io_service &io_service);
+
+ ~ConsumerSocket();
+
+ void connect() override;
+
+ int consume(const Name &name, utils::SharableVector<uint8_t> &receive_buffer);
+
+ int asyncConsume(
+ const Name &name,
+ std::shared_ptr<utils::SharableVector<uint8_t>> receive_buffer);
+
+ void asyncSendInterest(Interest::Ptr &&interest,
+ Portal::ConsumerCallback *callback);
+
+ void stop();
+
+ void resume();
+
+ asio::io_service &getIoService() override;
+
+ int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ double socket_option_value) override;
+
+ int setSocketOption(int socket_option_key, bool socket_option_value) override;
+
+ int setSocketOption(int socket_option_key, Name socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerInterestCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerContentCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerManifestCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ utils::CryptoSuite crypto_suite) override;
+
+ int setSocketOption(int socket_option_key,
+ const utils::Identity &crypto_suite) override;
+
+ int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ double &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ bool &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ Name &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) override;
+
+ int getSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback &socket_option_value) override;
+
+ int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) override;
+
+ int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerInterestCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ProducerInterestCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerContentCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerManifestCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ utils::CryptoSuite &crypto_suite) override;
+
+ int getSocketOption(int socket_option_key,
+ utils::Identity &crypto_suite) override;
+
+ int getSocketOption(int socket_option_key,
+ std::string &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerTimerCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ProducerContentCallback &socket_option_value) override;
+
+ protected:
+ std::shared_ptr<TransportProtocol> transport_protocol_;
+
+ private:
+ // context inner state variables
+ asio::io_service internal_io_service_;
+ asio::io_service &io_service_;
+
+ std::shared_ptr<Portal> portal_;
+
+ utils::EventThread async_downloader_;
+
+ Name network_name_;
+
+ int interest_lifetime_;
+
+ double min_window_size_;
+ double max_window_size_;
+ double current_window_size_;
+ uint32_t max_retransmissions_;
+ size_t output_buffer_size_;
+ size_t input_buffer_size_;
+
+ // RAAQM Parameters
+
+ double minimum_drop_probability_;
+ unsigned int sample_number_;
+ double gamma_;
+ double beta_;
+ double drop_factor_;
+
+ // Rate estimation parameters
+ double rate_estimation_alpha_;
+ IcnObserver *rate_estimation_observer_;
+ int rate_estimation_batching_parameter_;
+ int rate_estimation_choice_;
+
+ bool is_async_;
+
+ utils::Verifier verifier_;
+ PARCKeyId *key_id_;
+ bool verify_signature_;
+
+ std::shared_ptr<utils::SharableVector<uint8_t>> content_buffer_;
+
+ ConsumerInterestCallback on_interest_retransmission_;
+ ConsumerInterestCallback on_interest_output_;
+ ConsumerInterestCallback on_interest_timeout_;
+ ConsumerInterestCallback on_interest_satisfied_;
+
+ ConsumerContentObjectCallback on_content_object_input_;
+ ConsumerContentObjectVerificationCallback on_content_object_verification_;
+
+ ConsumerContentObjectCallback on_content_object_;
+ ConsumerManifestCallback on_manifest_;
+
+ ConsumerContentCallback on_payload_retrieved_;
+
+ ConsumerTimerCallback on_timer_expires_;
+
+ // Virtual download for traffic generator
+
+ bool virtual_download_;
+ bool rtt_stats_;
+
+ Time t0_;
+ Time t1_;
+ asio::steady_timer timer_;
+ uint32_t timer_interval_milliseconds_;
+};
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
new file mode 100755
index 000000000..5fae1c484
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_default_values.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+
+namespace transport {
+
+namespace interface {
+
+namespace default_values {
+
+const uint32_t interest_lifetime = 1001; // milliseconds
+const uint32_t content_object_expiry_time =
+ 0xffff; // milliseconds -> 50 seconds
+const uint32_t content_object_packet_size = 1500; // The ethernet MTU
+const uint32_t producer_socket_input_buffer_size = 150000; // Interests
+const uint32_t producer_socket_output_buffer_size = 150000; // Content Object
+const uint32_t log_2_default_buffer_size = 12;
+const uint32_t signature_size = 260; // bytes
+const uint32_t key_locator_size = 60; // bytes
+const uint32_t limit_guard = 80; // bytes
+const uint32_t min_window_size = 1; // Interests
+const uint32_t max_window_size = 128000; // Interests
+const uint32_t digest_size = 34; // bytes
+const uint32_t max_out_of_order_segments = 3; // content object
+const uint32_t never_expire_time = 0x0000ffff << 0x0f;
+
+// RAAQM
+const int sample_number = 30;
+const double gamma_value = 1;
+const double beta_value = 0.8;
+const double drop_factor = 0.2;
+const double minimum_drop_probability = 0.00001;
+const int path_id = 0;
+const double rate_alpha = 0.8;
+
+// Vegas
+const double alpha = 1 / 8;
+const double beta = 1 / 4;
+const uint16_t k = 4;
+const std::chrono::milliseconds clock_granularity =
+ std::chrono::milliseconds(100);
+
+// maximum allowed values
+const uint32_t transport_protocol_min_retransmissions = 0;
+const uint32_t transport_protocol_max_retransmissions = 128;
+const uint32_t max_content_object_size = 8096;
+
+} // namespace default_values
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
new file mode 100755
index 000000000..1afad2b48
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace transport {
+
+namespace interface {
+
+typedef enum {
+ RAAQM = 0,
+ VEGAS = 1,
+ CBR = 3,
+ RTC = 4,
+} TransportProtocolAlgorithms;
+
+typedef enum {
+ INPUT_BUFFER_SIZE = 101,
+ OUTPUT_BUFFER_SIZE = 102,
+ NETWORK_NAME = 103,
+ NAME_SUFFIX = 104,
+ MAX_INTEREST_RETX = 105,
+ DATA_PACKET_SIZE = 106,
+ INTEREST_LIFETIME = 107,
+ CONTENT_OBJECT_EXPIRY_TIME = 108,
+ KEY_LOCATOR = 110,
+ SIGNATURE_TYPE = 111,
+ MIN_WINDOW_SIZE = 112,
+ MAX_WINDOW_SIZE = 113,
+ CURRENT_WINDOW_SIZE = 114,
+ ASYNC_MODE = 115,
+ MAKE_MANIFEST = 116,
+ PORTAL = 117,
+ RUNNING = 118,
+ HASH_ALGORITHM = 119,
+ CRYPTO_SUITE = 120,
+ IDENTITY = 121,
+ CERTIFICATE = 122,
+ VERIFY_SIGNATURE = 123,
+ TIMER_INTERVAL = 124
+} GeneralTransportOptions;
+
+typedef enum {
+ SAMPLE_NUMBER = 201,
+ GAMMA_VALUE = 202,
+ BETA_VALUE = 203,
+ DROP_FACTOR = 204,
+ MINIMUM_DROP_PROBABILITY = 205,
+ PATH_ID = 206,
+ RTT_STATS = 207,
+} RaaqmTransportOptions;
+
+typedef enum {
+ RATE_ESTIMATION_ALPHA = 301,
+ RATE_ESTIMATION_OBSERVER = 302,
+ RATE_ESTIMATION_BATCH_PARAMETER = 303,
+ RATE_ESTIMATION_CHOICE = 304,
+} RateEstimationOptions;
+
+typedef enum {
+ INTEREST_OUTPUT = 401,
+ INTEREST_RETRANSMISSION = 402,
+ INTEREST_EXPIRED = 403,
+ INTEREST_SATISFIED = 404,
+ CONTENT_OBJECT_INPUT = 411,
+ MANIFEST_INPUT = 412,
+ CONTENT_OBJECT_TO_VERIFY = 413,
+ CONTENT_RETRIEVED = 414,
+ TIMER_EXPIRES = 415
+} ConsumerCallbacksOptions;
+
+typedef enum {
+ INTEREST_INPUT = 501,
+ INTEREST_DROP = 502,
+ INTEREST_PASS = 503,
+ CACHE_HIT = 506,
+ CACHE_MISS = 508,
+ NEW_CONTENT_OBJECT = 509,
+ CONTENT_OBJECT_SIGN = 513,
+ CONTENT_OBJECT_READY = 510,
+ CONTENT_OBJECT_OUTPUT = 511,
+ CONTENT_PRODUCED = 512
+} ProducerCallbacksOptions;
+
+typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions;
+
+typedef enum { VIRTUAL_DOWNLOAD = 601, USE_CFG_FILE = 603 } OtherOptions;
+
+typedef enum {
+ SHA_256 = 701,
+ RSA_256 = 702,
+} SignatureType;
+
+} // namespace interface
+
+} // end namespace transport \ No newline at end of file
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
new file mode 100755
index 000000000..69adc2b3f
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc
@@ -0,0 +1,948 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <hicn/transport/interfaces/socket_producer.h>
+
+namespace transport {
+
+namespace interface {
+
+typedef std::chrono::time_point<std::chrono::steady_clock> Time;
+typedef std::chrono::microseconds TimeDuration;
+
+ProducerSocket::ProducerSocket() : ProducerSocket(internal_io_service_) {}
+
+ProducerSocket::ProducerSocket(asio::io_service &io_service)
+ : io_service_(io_service),
+ portal_(std::make_shared<Portal>(io_service_)),
+ data_packet_size_(default_values::content_object_packet_size),
+ content_object_expiry_time_(default_values::content_object_expiry_time),
+ output_buffer_(default_values::producer_socket_output_buffer_size),
+ async_thread_(),
+ registration_status_(REGISTRATION_NOT_ATTEMPTED),
+ making_manifest_(false),
+ signature_type_(SHA_256),
+ hash_algorithm_(HashAlgorithm::SHA_256),
+ input_buffer_capacity_(default_values::producer_socket_input_buffer_size),
+ input_buffer_size_(0),
+ processing_thread_stop_(false),
+ listening_thread_stop_(false),
+ on_interest_input_(VOID_HANDLER),
+ on_interest_dropped_input_buffer_(VOID_HANDLER),
+ on_interest_inserted_input_buffer_(VOID_HANDLER),
+ on_interest_satisfied_output_buffer_(VOID_HANDLER),
+ on_interest_process_(VOID_HANDLER),
+ on_new_segment_(VOID_HANDLER),
+ on_content_object_to_sign_(VOID_HANDLER),
+ on_content_object_in_output_buffer_(VOID_HANDLER),
+ on_content_object_output_(VOID_HANDLER),
+ on_content_object_evicted_from_output_buffer_(VOID_HANDLER),
+ on_content_produced_(VOID_HANDLER) {
+ listening_thread_stop_ = false;
+}
+
+ProducerSocket::~ProducerSocket() {
+ TRANSPORT_LOGI("Destroying the ProducerSocket");
+ processing_thread_stop_ = true;
+ portal_->stopEventsLoop();
+
+ if (processing_thread_.joinable()) {
+ processing_thread_.join();
+ }
+
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+}
+
+void ProducerSocket::connect() {
+ portal_->connect(false);
+ listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+}
+
+void ProducerSocket::serveForever() {
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+}
+
+void ProducerSocket::stop() {
+ TRANSPORT_LOGI("Calling stop for ProducerSocket");
+ portal_->killConnection();
+ portal_->stopEventsLoop();
+}
+
+void ProducerSocket::registerPrefix(const Prefix &producer_namespace) {
+ served_namespaces_.push_back(producer_namespace);
+}
+
+void ProducerSocket::listen() {
+ registration_status_ = REGISTRATION_IN_PROGRESS;
+ bool first = true;
+
+ for (core::Prefix &producer_namespace : served_namespaces_) {
+ if (first) {
+ core::BindConfig bind_config(producer_namespace, 1000);
+ portal_->bind(bind_config);
+ portal_->setProducerCallback(this);
+ first = !first;
+ } else {
+ portal_->registerRoute(producer_namespace);
+ }
+ }
+
+ portal_->runEventsLoop();
+}
+
+void ProducerSocket::passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object) {
+ if (content_object) {
+ if (on_new_segment_ != VOID_HANDLER) {
+ on_new_segment_(*this, *content_object);
+ }
+
+ if (on_content_object_to_sign_ != VOID_HANDLER) {
+ on_content_object_to_sign_(*this, *content_object);
+ }
+
+ if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
+ on_content_object_in_output_buffer_(*this, *content_object);
+ }
+
+ output_buffer_.insert(content_object);
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *content_object);
+ }
+
+#ifndef PUSH_API
+ std::unordered_map<Name, std::shared_ptr<const Interest>>::iterator it;
+
+ {
+ std::lock_guard<std::mutex> lock(pending_interests_mtx_);
+ it = pending_interests_.find(content_object->getName());
+ }
+
+ if (it != pending_interests_.end()) {
+ content_object->setLocator(it->second->getLocator());
+ portal_->sendContentObject(*content_object);
+ std::lock_guard<std::mutex> lock(pending_interests_mtx_);
+ pending_interests_.erase(it);
+ }
+#else
+ portal_->sendContentObject(*content_object);
+#endif
+ }
+}
+
+void ProducerSocket::produce(ContentObject &content_object) {
+ if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
+ on_content_object_in_output_buffer_(*this, content_object);
+ }
+
+ output_buffer_.insert(std::static_pointer_cast<ContentObject>(
+ content_object.shared_from_this()));
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, content_object);
+ }
+
+#ifndef PUSH_API
+ std::unordered_map<Name, std::shared_ptr<const Interest>>::iterator it;
+
+ {
+ std::lock_guard<std::mutex> lock(pending_interests_mtx_);
+ it = pending_interests_.find(content_object.getName());
+ }
+
+ if (it != pending_interests_.end()) {
+ content_object.setLocator(it->second->getLocator());
+ portal_->sendContentObject(content_object);
+ std::lock_guard<std::mutex> lock(pending_interests_mtx_);
+ pending_interests_.erase(it);
+ }
+#else
+ portal_->sendContentObject(content_object);
+#endif
+}
+
+uint32_t ProducerSocket::produce(Name content_name, const uint8_t *buf,
+ size_t buffer_size, bool is_last,
+ uint32_t start_offset) {
+ if (TRANSPORT_EXPECT_FALSE(buffer_size == 0)) {
+ return 0;
+ }
+
+ const std::size_t hash_size = 32;
+
+ int bytes_segmented = 0;
+ std::size_t header_size;
+ std::size_t manifest_header_size = 0;
+ std::size_t signature_length = 0;
+ std::uint32_t final_block_number = 0;
+
+ uint64_t free_space_for_content = 0;
+
+ core::Packet::Format format;
+
+ uint32_t current_segment = start_offset;
+ std::shared_ptr<ContentObjectManifest> manifest;
+ bool is_last_manifest = false;
+ std::unique_ptr<utils::CryptoHash> zero_hash;
+
+ // TODO Manifest may still be used for indexing
+ if (making_manifest_ && !identity_) {
+ throw errors::RuntimeException(
+ "Making manifests without setting producer identity. Aborting.");
+ }
+
+ core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC;
+ core::Packet::Format hf_format_ah = core::Packet::Format::HF_UNSPEC;
+ if (content_name.getType() == HNT_CONTIGUOUS_V4 ||
+ content_name.getType() == HNT_IOV_V4) {
+ hf_format = core::Packet::Format::HF_INET_TCP;
+ hf_format_ah = core::Packet::Format::HF_INET_TCP_AH;
+ } else if (content_name.getType() == HNT_CONTIGUOUS_V6 ||
+ content_name.getType() == HNT_IOV_V6) {
+ hf_format = core::Packet::Format::HF_INET6_TCP;
+ hf_format_ah = core::Packet::Format::HF_INET6_TCP_AH;
+ } else {
+ throw errors::RuntimeException("Unknown name format.");
+ }
+
+ format = hf_format;
+ if (making_manifest_) {
+ format = hf_format;
+ manifest_header_size = core::Packet::getHeaderSizeFromFormat(
+ hf_format_ah, identity_->getSignatureLength());
+ } else if (identity_) {
+ format = hf_format_ah;
+ signature_length = identity_->getSignatureLength();
+ }
+
+ header_size = core::Packet::getHeaderSizeFromFormat(format, signature_length);
+
+ free_space_for_content = data_packet_size_ - header_size;
+
+ uint32_t number_of_segments =
+ uint32_t(std::ceil(double(buffer_size) / double(free_space_for_content)));
+
+ if (free_space_for_content * number_of_segments < buffer_size) {
+ number_of_segments++;
+ }
+
+ if (making_manifest_) {
+ auto segment_in_manifest = static_cast<float>(
+ std::floor(double(data_packet_size_ - manifest_header_size -
+ ContentObjectManifest::getManifestHeaderSize()) /
+ (4.0 + 32.0)) -
+ 1.0);
+ auto number_of_manifests = static_cast<uint32_t>(
+ std::ceil(float(number_of_segments) / segment_in_manifest));
+ final_block_number = number_of_segments + number_of_manifests - 1;
+
+ manifest.reset(ContentObjectManifest::createManifest(
+ content_name.setSuffix(current_segment++),
+ core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST,
+ hash_algorithm_, is_last_manifest, content_name,
+ core::NextSegmentCalculationStrategy::INCREMENTAL,
+ identity_->getSignatureLength()));
+ manifest->setLifetime(content_object_expiry_time_);
+
+ if (is_last) {
+ manifest->setFinalBlockNumber(final_block_number);
+ } else {
+ manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
+ }
+
+ uint8_t hash[hash_size];
+ std::memset(hash, 0, hash_size);
+ zero_hash = std::make_unique<utils::CryptoHash>(
+ hash, hash_size, static_cast<utils::CryptoHashType>(hash_algorithm_));
+ }
+
+ for (unsigned int packaged_segments = 0;
+ packaged_segments < number_of_segments; packaged_segments++) {
+ if (making_manifest_) {
+ if (manifest->estimateManifestSize(2) >
+ data_packet_size_ - manifest_header_size) {
+ // Add next manifest
+ manifest->addSuffixHash(current_segment, *zero_hash);
+
+ // Send the current manifest
+ manifest->encode();
+
+ identity_->getSigner().sign(*manifest);
+
+ passContentObjectToCallbacks(manifest);
+
+ // Create new manifest. The reference to the last manifest has been
+ // acquired in the passContentObjectToCallbacks function, so we can
+ // safely release this reference
+ manifest.reset(ContentObjectManifest::createManifest(
+ content_name.setSuffix(current_segment),
+ core::ManifestVersion::VERSION_1,
+ core::ManifestType::INLINE_MANIFEST, hash_algorithm_,
+ is_last_manifest, content_name,
+ core::NextSegmentCalculationStrategy::INCREMENTAL,
+ identity_->getSignatureLength()));
+ manifest->setLifetime(content_object_expiry_time_);
+ if (is_last) {
+ manifest->setFinalBlockNumber(final_block_number);
+ } else {
+ manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max());
+ }
+ current_segment++;
+ }
+ }
+
+ auto content_object = std::make_shared<ContentObject>(
+ content_name.setSuffix(current_segment), format);
+ content_object->setLifetime(content_object_expiry_time_);
+
+ if (!making_manifest_ && identity_) {
+ content_object->setSignatureSize(signature_length);
+ }
+
+ if (packaged_segments == number_of_segments - 1) {
+ content_object->appendPayload(&buf[bytes_segmented],
+ buffer_size - bytes_segmented);
+ bytes_segmented += buffer_size - bytes_segmented;
+
+ if (is_last && making_manifest_) {
+ is_last_manifest = true;
+ } else if (is_last) {
+ content_object->setRst();
+ }
+
+ } else {
+ content_object->appendPayload(&buf[bytes_segmented],
+ free_space_for_content);
+ bytes_segmented += free_space_for_content;
+ }
+
+ if (making_manifest_) {
+ using namespace std::chrono_literals;
+ utils::CryptoHash hash = content_object->computeDigest(hash_algorithm_);
+ manifest->addSuffixHash(current_segment, hash);
+ } else if (identity_) {
+ identity_->getSigner().sign(*content_object);
+ }
+
+ current_segment++;
+ passContentObjectToCallbacks(content_object);
+ }
+
+ if (making_manifest_) {
+ if (is_last_manifest) {
+ manifest->setFinalManifest(is_last_manifest);
+ }
+ manifest->encode();
+ // Time t0 = std::chrono::steady_clock::now();
+ identity_->getSigner().sign(*manifest);
+ passContentObjectToCallbacks(manifest);
+ }
+
+ if (on_content_produced_ != VOID_HANDLER) {
+ on_content_produced_(*this, std::make_error_code(std::errc(0)),
+ buffer_size);
+ }
+
+ return current_segment;
+}
+
+void ProducerSocket::asyncProduce(ContentObject &content_object) {
+ if (!async_thread_.stopped()) {
+ // async_thread_.add(std::bind(&ProducerSocket::produce, this,
+ // content_object));
+ }
+}
+
+// void ProducerSocket::asyncProduce(const Name &suffix,
+// const uint8_t *buf,
+// size_t buffer_size,
+// AsyncProduceCallback && handler) {
+// if (!async_thread_.stopped()) {
+// async_thread_.add([this, buffer = buf, size = buffer_size, cb =
+// std::move(handler)] () {
+// uint64_t bytes_written = produce(suff, buffer, size, 0, false);
+// auto ec = std::make_errc(0);
+// cb(*this, ec, bytes_written);
+// });
+// }
+// }
+
+void ProducerSocket::asyncProduce(const Name &suffix, const uint8_t *buf,
+ size_t buffer_size) {
+ if (!async_thread_.stopped()) {
+ async_thread_.add(
+ [this, suff = suffix, buffer = buf, size = buffer_size]() {
+ produce(suff, buffer, size, true);
+ });
+ }
+}
+
+void ProducerSocket::asyncProduce(
+ const Name &suffix, utils::SharableVector<uint8_t> &&output_buffer) {
+ if (!async_thread_.stopped()) {
+ async_thread_.add(
+ [this, suff = suffix, buffer = std::move(output_buffer)]() {
+ TRANSPORT_LOGI("FOR REAL!!!!!! --> Producing content with name %s",
+ suff.toString().c_str());
+ produce(suff, &buffer[0], buffer.size(), true);
+ });
+ }
+}
+
+void ProducerSocket::onInterest(const Interest &interest) {
+ if (on_interest_input_ != VOID_HANDLER) {
+ on_interest_input_(*this, interest);
+ }
+
+ const std::shared_ptr<ContentObject> content_object =
+ output_buffer_.find(interest);
+
+ if (content_object) {
+ if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_(*this, interest);
+ }
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+ } else {
+#ifndef PUSH_API
+ {
+ std::lock_guard<std::mutex> lock(pending_interests_mtx_);
+ pending_interests_[interest.getName()] =
+ std::static_pointer_cast<const Interest>(interest.shared_from_this());
+ }
+#endif
+
+ if (on_interest_process_ != VOID_HANDLER) {
+ // external_io_service_.post([this, &interest] () {
+ on_interest_process_(*this, interest);
+ // });
+ }
+ }
+}
+
+asio::io_service &ProducerSocket::getIoService() { return io_service_; }
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ if (socket_option_value < default_values::max_content_object_size &&
+ socket_option_value > 0) {
+ data_packet_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ } else {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ if (socket_option_value >= 1) {
+ input_buffer_capacity_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ } else {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_.setLimit(socket_option_value);
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::SIGNATURE_TYPE:
+ if (socket_option_value == SOCKET_OPTION_DEFAULT) {
+ signature_type_ = SHA_256;
+ } else {
+ signature_type_ = socket_option_value;
+ }
+
+ if (signature_type_ == SHA_256 || signature_type_ == RSA_256) {
+ signature_size_ = 32;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_input_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_dropped_input_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_inserted_input_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_process_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_new_segment_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_in_output_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_output_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ double socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ bool socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ Name socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+ served_namespaces_ = socket_option_value;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ return SOCKET_OPTION_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ on_new_segment_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ on_content_object_in_output_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ on_content_object_output_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ProducerInterestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ProducerContentCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ on_content_produced_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ConsumerInterestCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ConsumerContentCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, ConsumerManifestCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ hash_algorithm_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::CRYPTO_SUITE:
+ crypto_suite_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key, const utils::Identity &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::IDENTITY:
+ identity_.reset();
+ identity_ = std::make_unique<utils::Identity>(socket_option_value);
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ output_interface_ = socket_option_value;
+ portal_->setOutputInterface(output_interface_);
+ return SOCKET_OPTION_SET;
+ }
+
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(
+ int socket_option_key,
+ interface::ConsumerTimerCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_capacity_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_.getLimit();
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ socket_option_value = data_packet_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ socket_option_value = content_object_expiry_time_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::SIGNATURE_TYPE:
+ socket_option_value = signature_type_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ double &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ Name &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NETWORK_NAME:
+
+ socket_option_value = served_namespaces_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentObjectCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ socket_option_value = on_new_segment_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ socket_option_value = on_content_object_to_sign_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ socket_option_value = on_content_object_in_output_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ socket_option_value = on_content_object_output_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ProducerContentCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::CONTENT_PRODUCED:
+ socket_option_value = on_content_produced_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ProducerInterestCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ socket_option_value = on_interest_input_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ socket_option_value = on_interest_dropped_input_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ socket_option_value = on_interest_inserted_input_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case CACHE_HIT:
+ socket_option_value = on_interest_satisfied_output_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case CACHE_MISS:
+ socket_option_value = on_interest_process_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ConsumerContentObjectCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ConsumerInterestCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ConsumerContentCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, ConsumerManifestCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ return SOCKET_OPTION_GET;
+ }
+
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ IcnObserver *socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = hash_algorithm_;
+ return SOCKET_OPTION_GET;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::HASH_ALGORITHM:
+ socket_option_value = crypto_suite_;
+ return SOCKET_OPTION_GET;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ utils::Identity &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::IDENTITY:
+ if (identity_) {
+ socket_option_value = *identity_;
+ return SOCKET_OPTION_SET;
+ }
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ std::string &socket_option_value) {
+ switch (socket_option_key) {
+ case DataLinkOptions::OUTPUT_INTERFACE:
+ socket_option_value = output_interface_;
+ return SOCKET_OPTION_GET;
+ }
+
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(
+ int socket_option_key,
+ interface::ConsumerTimerCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+} // namespace interface
+
+} // end namespace transport
diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h
new file mode 100755
index 000000000..06c47d973
--- /dev/null
+++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h
@@ -0,0 +1,269 @@
+/*
+ * Copyright (c) 2017-2019 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <hicn/transport/interfaces/socket.h>
+#include <hicn/transport/utils/content_store.h>
+#include <hicn/transport/utils/event_thread.h>
+#include <hicn/transport/utils/sharable_vector.h>
+
+#include <atomic>
+#include <cmath>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+#define PUSH_API 1
+
+#define REGISTRATION_NOT_ATTEMPTED 0
+#define REGISTRATION_SUCCESS 1
+#define REGISTRATION_FAILURE 2
+#define REGISTRATION_IN_PROGRESS 3
+
+namespace transport {
+
+namespace interface {
+
+using namespace core;
+
+class ProducerSocket : public Socket<BasePortal>,
+ public BasePortal::ProducerCallback {
+ public:
+ explicit ProducerSocket();
+ explicit ProducerSocket(asio::io_service &io_service);
+
+ ~ProducerSocket();
+
+ void connect() override;
+
+ uint32_t produce(Name content_name, const uint8_t *buffer, size_t buffer_size,
+ bool is_last = true, uint32_t start_offset = 0);
+
+ void produce(ContentObject &content_object);
+
+ void asyncProduce(const Name &suffix, const uint8_t *buf, size_t buffer_size);
+
+ void asyncProduce(const Name &suffix,
+ utils::SharableVector<uint8_t> &&output_buffer);
+
+ void asyncProduce(ContentObject &content_object);
+
+ void registerPrefix(const Prefix &producer_namespace);
+
+ void serveForever();
+
+ void stop();
+
+ asio::io_service &getIoService() override;
+
+ virtual void onInterest(const Interest &interest);
+
+ virtual void onInterest(Interest::Ptr &&interest) override {
+ onInterest(*interest);
+ };
+
+ int setSocketOption(int socket_option_key,
+ uint32_t socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ double socket_option_value) override;
+
+ int setSocketOption(int socket_option_key, bool socket_option_value) override;
+
+ int setSocketOption(int socket_option_key, Name socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ std::list<Prefix> socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ProducerInterestCallback socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) override;
+
+ int setSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerInterestCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerContentCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerManifestCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key, IcnObserver *obs) override;
+
+ int setSocketOption(int socket_option_key,
+ HashAlgorithm socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ utils::CryptoSuite socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ const utils::Identity &socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ const std::string &socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ConsumerTimerCallback socket_option_value) override;
+
+ int setSocketOption(int socket_option_key,
+ ProducerContentCallback socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ uint32_t &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ double &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ bool &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ Name &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ std::list<Prefix> &socket_option_value) override;
+
+ int getSocketOption(
+ int socket_option_key,
+ ProducerContentObjectCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ProducerInterestCallback &socket_option_value) override;
+
+ int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) override;
+
+ int getSocketOption(
+ int socket_option_key,
+ ConsumerContentObjectCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerInterestCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerContentCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerManifestCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ std::shared_ptr<Portal> &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ IcnObserver **socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ HashAlgorithm &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ utils::CryptoSuite &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ utils::Identity &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ std::string &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ProducerContentCallback &socket_option_value) override;
+
+ int getSocketOption(int socket_option_key,
+ ConsumerTimerCallback &socket_option_value) override;
+
+ protected:
+ asio::io_service internal_io_service_;
+ asio::io_service &io_service_;
+ std::shared_ptr<Portal> portal_;
+ std::size_t data_packet_size_;
+ std::list<Prefix> served_namespaces_;
+ uint32_t content_object_expiry_time_;
+
+ // buffers
+ utils::ContentStore output_buffer_;
+
+ private:
+ utils::EventThread async_thread_;
+
+ int registration_status_;
+
+ bool making_manifest_;
+
+ // map for storing sequence numbers for several calls of the publish function
+ std::unordered_map<Name, std::unordered_map<int, uint32_t>> seq_number_map_;
+
+ int signature_type_;
+ int signature_size_;
+
+ HashAlgorithm hash_algorithm_;
+ utils::CryptoSuite crypto_suite_;
+ std::unique_ptr<utils::Identity> identity_;
+ // utils::Signer& signer_;
+
+ // buffers
+
+ std::queue<std::shared_ptr<const Interest>> input_buffer_;
+ std::atomic_size_t input_buffer_capacity_;
+ std::atomic_size_t input_buffer_size_;
+
+#ifndef PUSH_API
+ std::mutex pending_interests_mtx_;
+ std::unordered_map<Name, std::shared_ptr<const Interest>> pending_interests_;
+#endif
+
+ // threads
+ std::thread listening_thread_;
+ std::thread processing_thread_;
+ volatile bool processing_thread_stop_;
+ volatile bool listening_thread_stop_;
+
+ // callbacks
+ protected:
+ ProducerInterestCallback on_interest_input_;
+ ProducerInterestCallback on_interest_dropped_input_buffer_;
+ ProducerInterestCallback on_interest_inserted_input_buffer_;
+ ProducerInterestCallback on_interest_satisfied_output_buffer_;
+ ProducerInterestCallback on_interest_process_;
+
+ ProducerContentObjectCallback on_new_segment_;
+ ProducerContentObjectCallback on_content_object_to_sign_;
+ ProducerContentObjectCallback on_content_object_in_output_buffer_;
+ ProducerContentObjectCallback on_content_object_output_;
+ ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
+
+ ProducerContentCallback on_content_produced_;
+
+ private:
+ void listen();
+
+ void passContentObjectToCallbacks(
+ const std::shared_ptr<ContentObject> &content_object);
+};
+
+} // namespace interface
+
+} // end namespace transport