summaryrefslogtreecommitdiffstats
path: root/icnet
diff options
context:
space:
mode:
authorMauro Sardara <msardara+fdio@cisco.com>2017-02-22 14:37:37 +0100
committerMauro Sardara <msardara+fdio@cisco.com>2017-02-22 13:46:08 +0000
commitba8541cad3a4069886444abbd1848b6ef3fff72c (patch)
tree39226dd9b036ca7e513c2cccd8e71e15e09b86bc /icnet
parent9b30fc10fb1cbebe651e5a107e8ca5b24de54675 (diff)
Initial Commit: libicnet
Change-Id: I10a72cb0d84b76553a85c168416b847f6a4ff5f6 Signed-off-by: Mauro Sardara <msardara+fdio@cisco.com>
Diffstat (limited to 'icnet')
-rw-r--r--icnet/CMakeLists.txt60
-rw-r--r--icnet/ccnx/icnet_ccnx_common.h49
-rw-r--r--icnet/ccnx/icnet_ccnx_content_object.cc202
-rw-r--r--icnet/ccnx/icnet_ccnx_content_object.h121
-rw-r--r--icnet/ccnx/icnet_ccnx_facade.h27
-rw-r--r--icnet/ccnx/icnet_ccnx_interest.cc147
-rw-r--r--icnet/ccnx/icnet_ccnx_interest.h92
-rw-r--r--icnet/ccnx/icnet_ccnx_key_locator.cc40
-rw-r--r--icnet/ccnx/icnet_ccnx_key_locator.h52
-rw-r--r--icnet/ccnx/icnet_ccnx_key_locator_type.h22
-rw-r--r--icnet/ccnx/icnet_ccnx_local_connector.cc195
-rw-r--r--icnet/ccnx/icnet_ccnx_local_connector.h98
-rw-r--r--icnet/ccnx/icnet_ccnx_manifest.cc78
-rw-r--r--icnet/ccnx/icnet_ccnx_manifest.h55
-rw-r--r--icnet/ccnx/icnet_ccnx_name.cc218
-rw-r--r--icnet/ccnx/icnet_ccnx_name.h120
-rw-r--r--icnet/ccnx/icnet_ccnx_network_message.cc88
-rw-r--r--icnet/ccnx/icnet_ccnx_network_message.h70
-rw-r--r--icnet/ccnx/icnet_ccnx_payload_type.h37
-rw-r--r--icnet/ccnx/icnet_ccnx_pending_interest.cc94
-rw-r--r--icnet/ccnx/icnet_ccnx_pending_interest.h93
-rw-r--r--icnet/ccnx/icnet_ccnx_portal.cc204
-rw-r--r--icnet/ccnx/icnet_ccnx_portal.h105
-rw-r--r--icnet/ccnx/icnet_ccnx_segment.cc78
-rw-r--r--icnet/ccnx/icnet_ccnx_segment.h71
-rw-r--r--icnet/transport/consumer.conf21
-rw-r--r--icnet/transport/icnet_common.h50
-rw-r--r--icnet/transport/icnet_content_store.cc75
-rw-r--r--icnet/transport/icnet_content_store.h60
-rw-r--r--icnet/transport/icnet_download_observer.h32
-rw-r--r--icnet/transport/icnet_rate_estimation.cc324
-rw-r--r--icnet/transport/icnet_rate_estimation.h187
-rw-r--r--icnet/transport/icnet_socket.h124
-rw-r--r--icnet/transport/icnet_socket_consumer.cc613
-rw-r--r--icnet/transport/icnet_socket_consumer.h161
-rw-r--r--icnet/transport/icnet_socket_options_default_values.h61
-rw-r--r--icnet/transport/icnet_socket_options_keys.h95
-rw-r--r--icnet/transport/icnet_socket_producer.cc717
-rw-r--r--icnet/transport/icnet_socket_producer.h172
-rw-r--r--icnet/transport/icnet_transport.cc31
-rw-r--r--icnet/transport/icnet_transport.h43
-rw-r--r--icnet/transport/icnet_transport_raaqm.cc466
-rw-r--r--icnet/transport/icnet_transport_raaqm.h100
-rw-r--r--icnet/transport/icnet_transport_raaqm_data_path.cc197
-rw-r--r--icnet/transport/icnet_transport_raaqm_data_path.h242
-rw-r--r--icnet/transport/icnet_transport_vegas.cc488
-rw-r--r--icnet/transport/icnet_transport_vegas.h109
-rw-r--r--icnet/transport/icnet_transport_vegas_rto_estimator.cc46
-rw-r--r--icnet/transport/icnet_transport_vegas_rto_estimator.h49
49 files changed, 6879 insertions, 0 deletions
diff --git a/icnet/CMakeLists.txt b/icnet/CMakeLists.txt
new file mode 100644
index 00000000..76e45f69
--- /dev/null
+++ b/icnet/CMakeLists.txt
@@ -0,0 +1,60 @@
+# Copyright (c) 2017 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cmake_minimum_required(VERSION 3.2)
+
+file(GLOB HEADER_FILES "ccnx/*.h")
+file(GLOB SOURCE_FILES "ccnx/*.cc")
+
+set(CP_API_HEADER_FILES
+ ${CMAKE_BINARY_DIR}/config.hpp
+ transport/icnet_rate_estimation.h
+ transport/icnet_download_observer.h
+ transport/icnet_socket_consumer.h
+ transport/icnet_socket.h
+ transport/icnet_socket_options_default_values.h
+ transport/icnet_socket_options_keys.h
+ transport/icnet_common.h
+ transport/icnet_socket_producer.h
+ transport/icnet_content_store.h
+ transport/icnet_transport_vegas.h
+ transport/icnet_transport.h
+ transport/icnet_transport_raaqm.h
+ transport/icnet_transport_vegas_rto_estimator.h
+ transport/icnet_transport_raaqm_data_path.h)
+
+set(CP_API_SOURCE_FILES
+ transport/icnet_socket_producer.cc
+ transport/icnet_socket_consumer.cc
+ transport/icnet_transport_vegas.cc
+ transport/icnet_transport.cc
+ transport/icnet_content_store.cc
+ transport/icnet_transport_raaqm.cc
+ transport/icnet_transport_vegas_rto_estimator.cc
+ transport/icnet_rate_estimation.cc
+ transport/icnet_transport_raaqm_data_path.cc)
+
+set(CP_API_CONFIG
+ transport/consumer.conf)
+
+set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib")
+
+add_library(icnet SHARED ${SOURCE_FILES} ${CP_API_SOURCE_FILES})
+target_link_libraries(icnet ${LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES})
+
+install(TARGETS icnet DESTINATION ${CMAKE_INSTALL_PREFIX}/lib)
+
+install(FILES ${COMMON_HEADER_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/icnet)
+install(FILES ${HEADER_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/icnet)
+install(FILES ${CP_API_HEADER_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/icnet)
+install(FILES ${CP_API_CONFIG} DESTINATION ${CMAKE_INSTALL_PREFIX}/etc/)
diff --git a/icnet/ccnx/icnet_ccnx_common.h b/icnet/ccnx/icnet_ccnx_common.h
new file mode 100644
index 00000000..d8020fe7
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_common.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_COMMON_H_
+#define ICNET_CCNX_COMMON_H_
+
+// require C++11
+#if __cplusplus < 201103L && !defined(__GXX_EXPERIMENTAL_CXX0X__)
+# error "libconsumer-producer must be compiled using the C++11 standard"
+#endif
+
+// Avoid use of C keyword restrict
+#define restrict __restrict__
+
+// Each ccnx name has to start with ccnx:
+#define CCNX_START_PREFIX "ccnx:"
+
+// #include "config.hpp"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <iostream>
+#include <string>
+#include <map>
+#include <memory>
+#include <chrono>
+
+#if defined(__GNUC__) || defined(__clang__)
+# define DEPRECATED(func) func __attribute__ ((deprecated))
+#elif defined(_MSC_VER)
+# define DEPRECATED(func) __declspec(deprecated) func
+#else
+# pragma message("DEPRECATED not implemented")
+# define DEPRECATED(func) func
+#endif
+
+#endif // ICNET_CCNX_COMMON_H_
diff --git a/icnet/ccnx/icnet_ccnx_content_object.cc b/icnet/ccnx/icnet_ccnx_content_object.cc
new file mode 100644
index 00000000..1a7fa3c6
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_content_object.cc
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_content_object.h"
+
+extern "C" {
+#include <ccnx/common/ccnx_WireFormatMessage.h>
+};
+
+namespace icnet {
+
+namespace ccnx {
+
+ContentObject::ContentObject()
+ : name_(ccnxName_Create()),
+ ccnx_content_object_(ccnxContentObject_CreateWithNameAndPayload(name_.getWrappedStructure(), NULL)),
+ content_type_(PayloadType::DATA) {
+}
+
+ContentObject::ContentObject(const Name &name, uint8_t *payload, std::size_t size)
+ : name_(name), content_type_(PayloadType::DATA) {
+ PARCBuffer *buffer = parcBuffer_CreateFromArray(payload, size);
+ buffer = parcBuffer_Flip(buffer);
+ ccnx_content_object_ = ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), buffer);
+ parcBuffer_Release(&buffer);
+}
+
+ContentObject::ContentObject(const CCNxContentObjectStructure *content_object) : name_(ccnxContentObject_GetName(
+ content_object)),
+ ccnx_content_object_(
+ ccnxContentObject_Acquire(
+ content_object)),
+ content_type_((PayloadType) ccnxContentObject_GetPayloadType(
+ content_object)) {
+}
+
+ContentObject::ContentObject(const Name &name)
+ : name_(name),
+ ccnx_content_object_(ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), NULL)),
+ content_type_(PayloadType::DATA) {
+}
+
+ContentObject::ContentObject(Name &&name)
+ : name_(std::move(name)),
+ ccnx_content_object_(ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), NULL)),
+ content_type_(PayloadType::DATA) {
+}
+
+ContentObject::~ContentObject() {
+ ccnxContentObject_Release(&ccnx_content_object_);
+}
+
+bool ContentObject::operator==(const ContentObject &content_object) {
+ return ccnxContentObject_Equals(ccnx_content_object_, content_object.ccnx_content_object_);
+}
+
+PayloadType ContentObject::getContentType() const {
+ return (PayloadType) ccnxContentObject_GetPayloadType(ccnx_content_object_);
+}
+
+void ContentObject::setContentType(PayloadType payload_type) {
+ content_type_ = payload_type;
+}
+
+bool ContentObject::setContent(PayloadType content_type, const uint8_t *buffer, size_t buffer_size) {
+ content_type_ = content_type;
+ return setContent(buffer, buffer_size);
+}
+
+bool ContentObject::setContent(const uint8_t *buffer, size_t buffer_size) {
+ bool ret;
+ PARCBuffer *parc_buffer = parcBuffer_CreateFromArray(buffer, buffer_size);
+ parc_buffer = parcBuffer_Flip(parc_buffer);
+
+ if (content_type_ != PayloadType::DATA) {
+ ret = ccnxContentObject_SetPayload(ccnx_content_object_, (CCNxPayloadType) content_type_, parc_buffer);
+ } else {
+ ret = ccnxContentObject_SetPayload(ccnx_content_object_, (CCNxPayloadType) PayloadType::DATA, parc_buffer);
+ }
+
+ parcBuffer_Release(&parc_buffer);
+
+ return ret;
+}
+
+Array ContentObject::getContent() const {
+ PARCBuffer *buffer = ccnxContentObject_GetPayload(ccnx_content_object_);
+ return Array(parcBuffer_Overlay(buffer, 0), parcBuffer_Remaining(buffer));
+}
+
+void ContentObject::setSignature() {
+
+}
+
+void ContentObject::signWithSha256(KeyLocator &key_locator) {
+ // ccnxValidationCRC32C_Set(ccnx_content_object_);
+}
+
+void ContentObject::setFinalChunkNumber(uint64_t final_chunk_number) {
+ ccnxContentObject_SetFinalChunkNumber(ccnx_content_object_, final_chunk_number);
+}
+
+bool ContentObject::hasFinalChunkNumber() {
+ return ccnxContentObject_HasFinalChunkNumber(ccnx_content_object_);
+}
+
+uint64_t ContentObject::getFinalChunkNumber() {
+ return ccnxContentObject_GetFinalChunkNumber(ccnx_content_object_);
+}
+
+void ContentObject::setExpiryTime(uint64_t expiry_time) {
+ std::chrono::milliseconds
+ ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
+ uint64_t expiration = ms.count() + expiry_time;
+ ccnxContentObject_SetExpiryTime(ccnx_content_object_, expiration);
+}
+
+uint64_t ContentObject::getExpiryTime() {
+ return ccnxContentObject_GetExpiryTime(ccnx_content_object_);
+}
+
+const Name &ContentObject::getName() const {
+ return name_;
+}
+
+std::size_t ContentObject::getPacketSize() const {
+ PARCBuffer *packet = ccnxWireFormatMessage_GetWireFormatBuffer(ccnx_content_object_);
+ std::size_t ret = parcBuffer_Remaining(packet);
+ return ret;
+}
+
+void ContentObject::setName(const Name &name) {
+ PARCBuffer *buffer = parcBuffer_Acquire(ccnxContentObject_GetPayload(ccnx_content_object_));
+ ccnxContentObject_Release(&ccnx_content_object_);
+ ccnx_content_object_ = ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), buffer);
+ parcBuffer_Release(&buffer);
+ name_ = std::move(name);
+}
+
+void ContentObject::setName(Name &&name) {
+ PARCBuffer *buffer = parcBuffer_Acquire(ccnxContentObject_GetPayload(ccnx_content_object_));
+ ccnxContentObject_Release(&ccnx_content_object_);
+ ccnx_content_object_ = ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), buffer);
+ parcBuffer_Release(&buffer);
+ name_ = std::move(name);
+}
+
+CCNxContentObjectStructure *ContentObject::getWrappedStructure() {
+ return ccnx_content_object_;
+}
+
+uint8_t ContentObject::getPathLabel() const {
+ if (ccnxContentObject_HasPathLabel(ccnx_content_object_)) {
+ return (uint8_t) ccnxContentObject_GetPathLabel(ccnx_content_object_);
+ }
+
+ return 0;
+}
+
+Array::Array(const void *array, size_t size) {
+ this->array_ = array;
+ this->size_ = size;
+}
+
+Array::Array() {
+ this->array_ = nullptr;
+ this->size_ = 0;
+}
+
+const void *Array::data() {
+ return array_;
+}
+
+std::size_t Array::size() {
+ return size_;
+}
+
+Array &Array::setData(const void *data) {
+ array_ = data;
+ return *this;
+}
+
+Array &Array::setSize(std::size_t size) {
+ size_ = size;
+ return *this;
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet
diff --git a/icnet/ccnx/icnet_ccnx_content_object.h b/icnet/ccnx/icnet_ccnx_content_object.h
new file mode 100644
index 00000000..148587bb
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_content_object.h
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_CONTENT_OBJECT_H_
+#define ICNET_CCNX_CONTENT_OBJECT_H_
+
+#include "icnet_ccnx_common.h"
+#include "icnet_ccnx_name.h"
+#include "icnet_ccnx_key_locator.h"
+#include "icnet_ccnx_payload_type.h"
+
+extern "C" {
+#include <ccnx/common/ccnx_ContentObject.h>
+#include <ccnx/common/validation/ccnxValidation_CRC32C.h>
+#include <ccnx/common/codec/ccnxCodec_TlvPacket.h>
+};
+
+namespace icnet {
+
+namespace ccnx {
+
+typedef CCNxContentObject CCNxContentObjectStructure;
+
+// This class is used just to transfer buffer pointers
+// without making a copy, as std::vector<> would do
+
+class Array {
+ public:
+ explicit Array(const void *array, size_t size);
+
+ Array();
+
+ const void *data();
+
+ std::size_t size();
+
+ Array &setData(const void *data);
+
+ Array &setSize(std::size_t size);
+
+ private:
+ std::size_t size_;
+ const void *array_;
+};
+
+class ContentObject : public std::enable_shared_from_this<ContentObject> {
+ public:
+ ContentObject();
+
+ ContentObject(const Name &name, uint8_t *payload, std::size_t size);
+
+ ContentObject(const CCNxContentObjectStructure *content_object);
+
+ ContentObject(const Name &name);
+
+ ContentObject(Name &&name);
+
+ ~ContentObject();
+
+ bool operator==(const ContentObject &content_object);
+
+ PayloadType getContentType() const;
+
+ bool setContent(PayloadType content_type, const uint8_t *buffer, size_t buffer_size);
+
+ bool setContent(const uint8_t *buffer, size_t buffer_size);
+
+ void setContentType(PayloadType payload_type);
+
+ Array getContent() const;
+
+ void setSignature();
+
+ void signWithSha256(KeyLocator &key_locator);
+
+ void setFinalChunkNumber(uint64_t final_chunk_number);
+
+ bool hasFinalChunkNumber();
+
+ uint64_t getFinalChunkNumber();
+
+ void setExpiryTime(uint64_t expiry_time);
+
+ uint64_t getExpiryTime();
+
+ const Name &getName() const;
+
+ std::size_t getPacketSize() const;
+
+ void setName(const Name &name);
+
+ void setName(Name &&name);
+
+ CCNxContentObjectStructure *getWrappedStructure();
+
+ uint8_t getPathLabel() const;
+
+ protected:
+
+ Name name_;
+ CCNxContentObjectStructure *ccnx_content_object_;
+ PayloadType content_type_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif //CP_API_CCNXDATA_H_
diff --git a/icnet/ccnx/icnet_ccnx_facade.h b/icnet/ccnx/icnet_ccnx_facade.h
new file mode 100644
index 00000000..13a5fd43
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_facade.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_FACADE_H_
+#define ICNET_CCNX_FACADE_H_
+
+#include "icnet_ccnx_segment.h"
+#include "icnet_ccnx_name.h"
+#include "icnet_ccnx_content_object.h"
+#include "icnet_ccnx_interest.h"
+#include "icnet_ccnx_portal.h"
+#include "icnet_ccnx_manifest.h"
+#include "icnet_ccnx_key_locator.h"
+
+#endif // ICNET_CCNX_FACADE_H_
diff --git a/icnet/ccnx/icnet_ccnx_interest.cc b/icnet/ccnx/icnet_ccnx_interest.cc
new file mode 100644
index 00000000..a2bcf72e
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_interest.cc
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_interest.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+Interest::Interest(const Name &interest_name)
+ : name_(interest_name), interest_(ccnxInterest_CreateSimple(name_.getWrappedStructure())) {
+}
+
+Interest::Interest(Name &&interestName)
+ : name_(std::move(interestName)), interest_(ccnxInterest_CreateSimple(name_.getWrappedStructure())) {
+}
+
+Interest::Interest(CCNxInterestStruct *interest)
+ : name_(ccnxInterest_GetName(interest)), interest_(ccnxInterest_Acquire(interest)) {
+}
+
+Interest::Interest(const Interest &other_interest) : name_(other_interest.name_),
+ interest_(ccnxInterest_CreateSimple(other_interest.name_
+ .getWrappedStructure())) {
+ PARCBuffer *buffer = nullptr;
+
+ // Key Id restriction
+ buffer = ccnxInterest_GetKeyIdRestriction(other_interest.interest_);
+ if (buffer) {
+ ccnxInterest_SetKeyIdRestriction(interest_, buffer);
+ }
+
+ // Content Hash restriction
+ buffer = ccnxInterest_GetContentObjectHashRestriction(other_interest.interest_);
+ if (buffer) {
+ ccnxInterest_SetContentObjectHashRestriction(interest_, buffer);
+ }
+
+ // Optional Payload
+ buffer = ccnxInterest_GetPayload(other_interest.interest_);
+ if (buffer) {
+ ccnxInterest_SetPayload(interest_, buffer);
+ }
+
+ ccnxInterest_SetHopLimit(interest_, ccnxInterest_GetHopLimit(other_interest.interest_));
+ ccnxInterest_SetLifetime(interest_, ccnxInterest_GetLifetime(other_interest.interest_));
+}
+
+Interest::Interest(Interest &&other_interest) : name_(std::move(other_interest.name_)),
+ interest_(ccnxInterest_Acquire(other_interest.interest_)) {
+}
+
+Interest &Interest::operator=(const Interest &other_interest) {
+ ccnxInterest_Release(&interest_);
+ name_ = other_interest.name_;
+ interest_ = ccnxInterest_CreateSimple(name_.getWrappedStructure());
+ return *this;
+}
+
+Interest::~Interest() {
+ ccnxInterest_Release(&interest_);
+}
+
+bool Interest::operator==(const Interest &interest) {
+ return ccnxInterest_Equals(interest_, interest.interest_);
+}
+
+const Name &Interest::getName() const {
+ return name_;
+}
+
+void Interest::setInterestLifetime(uint32_t lifetime) {
+ ccnxInterest_SetLifetime(interest_, lifetime);
+}
+
+const uint32_t Interest::getInterestLifetime() const {
+ return ccnxInterest_GetLifetime(interest_);
+}
+
+bool Interest::setKeyId(const PARCBuffer *keyId) {
+ return ccnxInterest_SetKeyIdRestriction(interest_, keyId);
+}
+
+PARCBuffer *Interest::getKeyId() {
+ return ccnxInterest_GetKeyIdRestriction(interest_);
+}
+
+PARCBuffer *Interest::getContentHash() {
+ return ccnxInterest_GetContentObjectHashRestriction(interest_);
+}
+
+bool Interest::setContentHash(PARCBuffer *hash) {
+ return ccnxInterest_SetContentObjectHashRestriction(interest_, hash);
+}
+
+std::string Interest::toString() {
+ char *str = ccnxInterest_ToString(interest_);
+ std::string ret(str);
+
+ free(str);
+
+ return ret;
+}
+
+bool Interest::setPayload(const PARCBuffer *payload) {
+ return ccnxInterest_SetPayload(interest_, payload);
+}
+
+bool Interest::setPayloadAndId(const PARCBuffer *payload) {
+ return ccnxInterest_SetPayloadAndId(interest_, payload);
+}
+
+bool Interest::setPayloadWithId(const PARCBuffer *payload, const CCNxInterestPayloadId *payload_id) {
+ return ccnxInterest_SetPayloadWithId(interest_, payload, payload_id);
+}
+
+PARCBuffer *Interest::getPayload() {
+ return ccnxInterest_GetPayload(interest_);
+}
+
+void Interest::setHopLimit(uint32_t hop_limit) {
+ ccnxInterest_SetHopLimit(interest_, hop_limit);
+}
+
+uint32_t Interest::getHopLimit() {
+ return ccnxInterest_GetHopLimit(interest_);
+}
+
+CCNxInterestStruct *Interest::getWrappedStructure() const {
+ return interest_;
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_interest.h b/icnet/ccnx/icnet_ccnx_interest.h
new file mode 100644
index 00000000..2156b56f
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_interest.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_INTEREST_H_
+#define ICNET_CCNX_INTEREST_H_
+
+#include "icnet_ccnx_common.h"
+
+extern "C" {
+#include <ccnx/common/ccnx_Interest.h>
+};
+
+//#include "interest.hpp"
+#include "icnet_ccnx_name.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+typedef CCNxInterest CCNxInterestStruct;
+
+class Interest : public std::enable_shared_from_this<Interest> {
+ public:
+ Interest(const Name &interest_name);
+
+ Interest(Name &&name);
+
+ Interest(CCNxInterestStruct *interest);
+
+ Interest(const Interest &other_interest);
+
+ Interest(Interest &&other_interest);
+
+ ~Interest();
+
+ bool operator==(const Interest &interest);
+
+ Interest &operator=(const Interest &other_interest);
+
+ const Name &getName() const;
+
+ void setInterestLifetime(uint32_t lifetime);
+
+ const uint32_t getInterestLifetime() const;
+
+ bool setKeyId(const PARCBuffer *keyId);
+
+ PARCBuffer *getKeyId();
+
+ PARCBuffer *getContentHash();
+
+ bool setContentHash(PARCBuffer *hash);
+
+ std::string toString();
+
+ bool setPayload(const PARCBuffer *payload);
+
+ bool setPayloadAndId(const PARCBuffer *payload);
+
+ bool setPayloadWithId(const PARCBuffer *payload, const CCNxInterestPayloadId *payload_id);
+
+ PARCBuffer *getPayload();
+
+ void setHopLimit(uint32_t hop_limit);
+
+ uint32_t getHopLimit();
+
+ CCNxInterestStruct *getWrappedStructure() const;
+
+ private:
+
+ Name name_;
+ CCNxInterestStruct *interest_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_INTEREST_H_
diff --git a/icnet/ccnx/icnet_ccnx_key_locator.cc b/icnet/ccnx/icnet_ccnx_key_locator.cc
new file mode 100644
index 00000000..193573a9
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_key_locator.cc
@@ -0,0 +1,40 @@
+//
+// Created by msardara on 01/11/16.
+//
+
+#include "icnet_ccnx_key_locator.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+KeyLocator::KeyLocator() : type_(KeyLocatorType::UNKNOWN) {
+}
+
+KeyLocator::KeyLocator(KeyLocatorType type, Name &name) : type_(type), name_(name) {
+}
+
+Name &KeyLocator::getName() {
+ return name_;
+}
+
+void KeyLocator::setName(Name &name) {
+ name_ = name;
+}
+
+void KeyLocator::setType(KeyLocatorType type) {
+ type_ = type;
+}
+
+KeyLocatorType KeyLocator::getType() {
+ return type_;
+}
+
+void KeyLocator::clear() {
+ type_ = KeyLocatorType::UNKNOWN;
+ name_.clear();
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_key_locator.h b/icnet/ccnx/icnet_ccnx_key_locator.h
new file mode 100644
index 00000000..c55ff5f1
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_key_locator.h
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_KEY_LOCATOR_H_
+#define ICNET_CCNX_KEY_LOCATOR_H_
+
+#include "icnet_ccnx_common.h"
+#include "icnet_ccnx_key_locator_type.h"
+#include "icnet_ccnx_name.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+class KeyLocator : public std::enable_shared_from_this<KeyLocator> {
+ public:
+ KeyLocator();
+
+ KeyLocator(KeyLocatorType type, Name &name);
+
+ KeyLocatorType getType();
+
+ void setType(KeyLocatorType type);
+
+ void setName(Name &name);
+
+ Name &getName();
+
+ void clear();
+
+ private:
+ KeyLocatorType type_;
+ Name name_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_KEY_LOCATOR_H_
diff --git a/icnet/ccnx/icnet_ccnx_key_locator_type.h b/icnet/ccnx/icnet_ccnx_key_locator_type.h
new file mode 100644
index 00000000..477870d3
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_key_locator_type.h
@@ -0,0 +1,22 @@
+//
+// Created by msardara on 16/02/2017.
+//
+
+#ifndef ICNET_CCNX_KEY_LOCATOR_TYPE_H_
+#define ICNET_CCNX_KEY_LOCATOR_TYPE_H_
+
+namespace icnet {
+
+namespace ccnx {
+
+enum Type {
+ NAME = 0, KEY_DIGEST = 1, UNKNOWN = 255
+};
+
+typedef enum Type KeyLocatorType;
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_KEY_LOCATOR_TYPE_H_
diff --git a/icnet/ccnx/icnet_ccnx_local_connector.cc b/icnet/ccnx/icnet_ccnx_local_connector.cc
new file mode 100644
index 00000000..2a47c117
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_local_connector.cc
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_local_connector.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+LocalConnector::LocalConnector(boost::asio::io_service &io_service,
+ std::string &ip_address,
+ std::string &port,
+ MessageReceivedCallback receive_callback,
+ std::list<Name> &name_list)
+ : io_service_(io_service),
+ socket_(io_service_),
+ resolver_(io_service_),
+ endpoint_iterator_(resolver_.resolve({ip_address, port})),
+ timer_(io_service),
+ is_connecting_(false),
+ is_reconnection_(false),
+ data_available_(false),
+ receive_callback_(receive_callback),
+ served_name_list_(name_list) {
+ startConnectionTimer();
+ doConnect();
+}
+
+LocalConnector::~LocalConnector() {
+}
+
+void LocalConnector::bind(Name &name) {
+ CCNxControl *control = ccnxControl_CreateAddRouteToSelfRequest(name.getWrappedStructure());
+ CCNxMetaMessage *message = ccnxMetaMessage_CreateFromControl(control);
+ ccnxControl_Release(&control);
+
+ send(message);
+
+ ccnxMetaMessage_Release((CCNxMetaMessage **) &message);
+
+}
+
+void LocalConnector::send(CCNxMetaMessage *message) {
+ CCNxMetaMessage *msg = ccnxMetaMessage_Acquire(message);
+
+ io_service_.post([this, msg]() {
+ bool write_in_progres = !write_msgs_.empty();
+ write_msgs_.push_back(msg);
+ if (!is_connecting_) {
+ if (!write_in_progres) {
+ doWrite();
+ }
+ } else {
+ // Tell the handle connect it has data to write
+ data_available_ = true;
+ }
+ });
+}
+
+void LocalConnector::close() {
+ io_service_.post([this]() { socket_.close(); });
+}
+
+void LocalConnector::doWrite() {
+ CCNxMetaMessage *message = write_msgs_.front();
+ CCNxCodecNetworkBufferIoVec *network_buffer = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(message, NULL);
+ const iovec *iov = ccnxCodecNetworkBufferIoVec_GetArray(network_buffer);
+
+ boost::asio::async_write(socket_,
+ boost::asio::buffer(iov->iov_base, iov->iov_len),
+ [this, network_buffer, message](boost::system::error_code ec, std::size_t /*length*/) {
+ if (!ec) {
+ ccnxMetaMessage_Release((CCNxMetaMessage **) &message);
+ write_msgs_.pop_front();
+
+ if (!write_msgs_.empty()) {
+ doWrite();
+ }
+ } else {
+ tryReconnect();
+ }
+
+ ccnxCodecNetworkBufferIoVec_Release((CCNxCodecNetworkBufferIoVec **) &network_buffer);
+
+ });
+
+}
+
+void LocalConnector::doReadBody() {
+ boost::asio::async_read(socket_,
+ boost::asio::buffer(read_msg_.body(), read_msg_.bodyLength()),
+ boost::asio::transfer_exactly(read_msg_.bodyLength()),
+ [this](boost::system::error_code ec, std::size_t length) {
+ if (!ec) {
+ receive_callback_(read_msg_.decodeMessage());
+ doReadHeader();
+ } else {
+ tryReconnect();
+ }
+ });
+}
+
+void LocalConnector::doReadHeader() {
+ boost::asio::async_read(socket_,
+ boost::asio::buffer(read_msg_.data(), TransportMessage::header_length),
+ boost::asio::transfer_exactly(TransportMessage::header_length),
+ [this](boost::system::error_code ec, std::size_t /*length*/) {
+ if (!ec) {
+ if (read_msg_.decodeHeader()) {
+ doReadBody();
+ } else {
+ std::cerr << "Decoding error" << std::endl;
+ }
+ } else {
+ tryReconnect();
+ }
+ });
+}
+
+void LocalConnector::tryReconnect() {
+ if (!is_connecting_) {
+ std::cerr << "Connection lost. Trying to reconnect..." << std::endl;
+ is_connecting_ = true;
+ is_reconnection_ = true;
+ io_service_.post([this]() {
+ socket_.close();
+ startConnectionTimer();
+ doConnect();
+ });
+ }
+}
+
+void LocalConnector::doConnect() {
+ boost::asio::async_connect(socket_,
+ endpoint_iterator_,
+ [this](boost::system::error_code ec, tcp::resolver::iterator) {
+ if (!ec) {
+ timer_.cancel();
+ is_connecting_ = false;
+ doReadHeader();
+
+ if (data_available_) {
+ data_available_ = false;
+ doWrite();
+ }
+
+ if (is_reconnection_) {
+ is_reconnection_ = false;
+ std::cout << "Connection recovered!" << std::endl;
+ for (auto &name : served_name_list_) {
+ bind(name);
+ }
+ }
+
+ } else {
+ sleep(1);
+ doConnect();
+ }
+ });
+}
+
+bool LocalConnector::checkConnected() {
+ return !is_connecting_;
+}
+
+void LocalConnector::startConnectionTimer() {
+ timer_.expires_from_now(boost::posix_time::seconds(20));
+ timer_.async_wait(std::bind(&LocalConnector::handleDeadline, this, std::placeholders::_1));
+}
+
+void LocalConnector::handleDeadline(const boost::system::error_code &ec) {
+ if (!ec) {
+ io_service_.post([this]() {
+ socket_.close();
+ std::cerr << "Error connecting. Is the forwarder running?" << std::endl;
+ io_service_.stop();
+ });
+ }
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_local_connector.h b/icnet/ccnx/icnet_ccnx_local_connector.h
new file mode 100644
index 00000000..b08d1c5a
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_local_connector.h
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_LOCAL_CONNECTOR_H_
+#define ICNET_CCNX_LOCAL_CONNECTOR_H_
+
+#include "icnet_ccnx_common.h"
+#include "icnet_ccnx_network_message.h"
+#include "icnet_ccnx_name.h"
+#include <boost/asio.hpp>
+#include <deque>
+
+extern "C" {
+#include <ccnx/common/ccnx_Interest.h>
+#include <ccnx/common/ccnx_ContentObject.h>
+#include <parc/security/parc_Security.h>
+#include <ccnx/api/ccnx_Portal/ccnx_Portal.h>
+#include <ccnx/api/ccnx_Portal/ccnx_PortalRTA.h>
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_PacketEncoder.h>
+};
+
+namespace icnet {
+
+namespace ccnx {
+
+using boost::asio::ip::tcp;
+typedef std::deque<CCNxMetaMessage *> CcnxTransportMessageQueue;
+typedef std::function<void(CCNxMetaMessage *)> MessageReceivedCallback;
+
+class LocalConnector {
+ public:
+ LocalConnector(boost::asio::io_service &io_service,
+ std::string &ip_address,
+ std::string &port,
+ MessageReceivedCallback receive_callback,
+ std::list<Name> &name_list);
+
+ ~LocalConnector();
+
+ void send(CCNxMetaMessage *message);
+
+ void bind(Name &name);
+
+ void close();
+
+ private:
+ void doConnect();
+
+ void doReadHeader();
+
+ void doReadBody();
+
+ void doWrite();
+
+ bool checkConnected();
+
+ private:
+
+ void handleDeadline(const boost::system::error_code &ec);
+
+ void startConnectionTimer();
+
+ void tryReconnect();
+
+ boost::asio::io_service &io_service_;
+ boost::asio::ip::tcp::socket socket_;
+ boost::asio::ip::tcp::resolver resolver_;
+ boost::asio::ip::tcp::resolver::iterator endpoint_iterator_;
+ boost::asio::deadline_timer timer_;
+
+ TransportMessage read_msg_;
+ CcnxTransportMessageQueue write_msgs_;
+
+ bool is_connecting_;
+ bool is_reconnection_;
+ bool data_available_;
+
+ MessageReceivedCallback receive_callback_;
+ std::list<Name> &served_name_list_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_LOCAL_CONNECTOR_H_
diff --git a/icnet/ccnx/icnet_ccnx_manifest.cc b/icnet/ccnx/icnet_ccnx_manifest.cc
new file mode 100644
index 00000000..c818f110
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_manifest.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_manifest.h"
+#include <boost/property_tree/json_parser.hpp>
+
+namespace icnet {
+
+namespace ccnx {
+
+Manifest::Manifest(Name &name) : ContentObject(name) {
+}
+
+std::size_t Manifest::estimateManifestSize() {
+ std::size_t capacity = 0;
+
+ // for (auto pair : m_mapNameDigest)
+ // capacity += pair.first.size() + pair.second.data().size();
+
+ return capacity;
+}
+
+Manifest::Manifest(ContentObject &content_object) {
+ decode();
+}
+
+void Manifest::encode() {
+ std::stringstream ss;
+ //boost::property_tree::write_json(ss, m_mapNameDigest);
+
+ std::string json_string = ss.str();
+
+ ContentObject::setContent(PayloadType::MANIFEST, (uint8_t *) json_string.c_str(), json_string.size());
+}
+
+void Manifest::decode() {
+ PARCBuffer *payload = ccnxContentObject_GetPayload(ccnx_content_object_);
+ char *buffer = parcBuffer_ToString(payload);
+
+ std::stringstream ss;
+ ss << buffer;
+
+ //boost::property_tree::read_json(ss, m_mapNameDigest);
+
+ free(buffer);
+}
+
+std::string Manifest::getDigest(ContentObject &content_object) {
+ // ContentObject &ccnxData = (ContentObject &) content_object;
+ // for (auto pair : m_mapNameDigest)
+ // if (pair.second.content_object() == ccnxData.getName().toUri()) {
+ // return pair.second.content_object();
+ // }
+
+ return std::string();
+}
+
+void Manifest::addNameToCatalogue(Name &name, uint8_t *digest, std::size_t digest_size) {
+ // Name &ccnxName = (Name &) name;
+ // m_mapNameDigest.put(ccnxName.toUri(), std::string((char *) digest, digest_size));
+ return;
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_manifest.h b/icnet/ccnx/icnet_ccnx_manifest.h
new file mode 100644
index 00000000..92118777
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_manifest.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_MANIFEST_H_
+#define ICNET_CCNX_MANIFEST_H_
+
+#include "icnet_ccnx_common.h"
+//#include <boost/property_tree/ptree.hpp>
+
+#include "icnet_ccnx_name.h"
+#include "icnet_ccnx_content_object.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+class Manifest : public ContentObject // public api::Manifest
+{
+ public:
+ Manifest(Name &name);
+
+ Manifest(ContentObject &content_object);
+
+ std::size_t estimateManifestSize();
+
+ void encode();
+
+ void decode();
+
+ std::string getDigest(ContentObject &content_object);
+
+ void addNameToCatalogue(Name &name, uint8_t *digest, std::size_t digest_size);
+
+ private:
+ // boost::property_tree::ptree map_name_digest_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+
+#endif // ICNET_CCNX_MANIFEST_H_
diff --git a/icnet/ccnx/icnet_ccnx_name.cc b/icnet/ccnx/icnet_ccnx_name.cc
new file mode 100644
index 00000000..2ea0d8ab
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_name.cc
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_name.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+Name::Name() {
+ name_ = ccnxName_Create();
+}
+
+Name::Name(const char *uri) : name_(ccnxName_CreateFromCString(uri)) {
+ ccnxName_AssertValid(name_);
+}
+
+Name::Name(std::string uri) : Name(uri.c_str()) {
+}
+
+Name::Name(const CCNxNameStructure *name) : name_(ccnxName_Acquire(name)) {
+ ccnxName_AssertValid(name_);
+}
+
+Name::Name(const Name &name) : name_(ccnxName_Copy(name.name_)) {
+}
+
+Name::Name(Name &&otherName) : name_(ccnxName_Acquire(otherName.name_)) {
+}
+
+Name &Name::operator=(const Name &name) {
+ ccnxName_Release(&this->name_);
+ this->name_ = ccnxName_Copy(name.name_);
+ return *this;
+}
+
+Name &Name::operator=(Name &&name) {
+ ccnxName_Release(&this->name_);
+ this->name_ = ccnxName_Acquire(name.name_);
+ return *this;
+}
+
+bool Name::operator==(const Name &name) const {
+ return ccnxName_Equals(this->name_, name.name_);
+}
+
+Name::~Name() {
+ ccnxName_Release(&name_);
+}
+
+bool Name::equals(const Name &name) {
+ return ccnxName_Equals(this->name_, name.name_);
+}
+
+bool Name::isValid() {
+ return ccnxName_IsValid(name_);
+}
+
+const std::string &Name::toString() {
+ char *str = ccnxName_ToString(name_);
+ name_string_ = std::string(str);
+ free(str);
+ return name_string_;
+}
+
+void Name::appendComponent(const Segment &suffix) {
+ ccnxName_Append(name_, suffix.getWrappedStructure());
+}
+
+void Name::append(const Name &suffix) {
+ if (ccnxName_IsValid(suffix.name_)) {
+
+ size_t number_of_components = ccnxName_GetSegmentCount(suffix.getWrappedStructure());
+
+ for (uint32_t i = 0; i < number_of_components; i++) {
+ ccnxName_Append(name_, ccnxName_GetSegment(suffix.getWrappedStructure(), i));
+ }
+ }
+}
+
+Name Name::getPrefix(ssize_t number_of_components) const {
+ std::size_t segment_count = ccnxName_GetSegmentCount(name_);
+
+ if (number_of_components >= 0) {
+ assert((std::size_t) number_of_components < segment_count);
+ return getSubName(0, number_of_components);
+ } else {
+ assert(segment_count + number_of_components >= 0);
+ return getSubName(0, number_of_components + segment_count);
+ }
+
+}
+
+Segment Name::get(ssize_t index) const {
+ std::size_t segment_count = ccnxName_GetSegmentCount(name_);
+ size_t componentIndex = 0;
+
+ if (index >= 0) {
+ assert((size_t) index < segment_count);
+ componentIndex = (size_t) index;
+ } else {
+ assert(segment_count + index >= 0);
+ componentIndex = segment_count + index;
+ }
+
+ CCNxNameSegment *segment = ccnxName_GetSegment(name_, componentIndex);
+ Segment ret(segment);
+
+ return ret;
+}
+
+Name Name::getSubName(ssize_t start_component, ssize_t number_of_components) const {
+
+ size_t name_size = ccnxName_GetSegmentCount(name_);
+ size_t begin;
+
+ if (start_component >= 0) {
+ assert((size_t) start_component < name_size);
+ begin = static_cast<size_t>(start_component);
+ } else {
+ assert((ssize_t) (start_component + name_size) >= 0);
+ begin = start_component + name_size;
+ }
+
+ size_t end = begin;
+ end += number_of_components < 0 ? name_size : static_cast<size_t>(number_of_components);
+
+ if (end >= name_size) {
+ end = name_size;
+ }
+
+ CCNxName *name = ccnxName_Create();
+
+ for (size_t i = begin; i < end; i++) {
+ ccnxName_Append(name, ccnxName_GetSegment(name_, i));
+ }
+
+ Name ret(name);
+ ccnxName_Release(&name);
+
+ return ret;
+}
+
+bool Name::isPrefixOf(const Name &name) {
+ Name &ccnx_name = (Name &) name;
+ return ccnxName_StartsWith(ccnx_name.name_, name_);
+}
+
+Name &Name::appendSegment(const uint64_t chunk_number) {
+ CCNxNameSegmentStructure *ns = ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, chunk_number);
+ name_ = ccnxName_Append(name_, ns);
+ ccnxNameSegment_Release(&ns);
+
+ return *this;
+}
+
+bool Name::empty() const {
+ return ccnxName_GetSegmentCount(name_) == 0;
+}
+
+void Name::clear() {
+ ccnxName_Release(&name_);
+ name_ = ccnxName_Create();
+}
+
+std::size_t Name::getSegmentCount() {
+ return ccnxName_GetSegmentCount(name_);
+}
+
+std::size_t Name::size() {
+ std::size_t number_of_segments = ccnxName_GetSegmentCount(name_);
+ std::size_t name_bytes = 0;
+
+ for (std::size_t i = 0; i < number_of_segments; i++) {
+ name_bytes += ccnxNameSegment_Length(ccnxName_GetSegment(name_, i));
+ }
+
+ return name_bytes;
+}
+
+std::ostream &operator<<(std::ostream &os, const Name &name) {
+ const std::string &str = const_cast<Name &>(name).toString();
+
+ if (name.empty()) {
+ os << "ccnx:/";
+ } else {
+ os << str;
+ }
+
+ return os;
+}
+
+CCNxNameStructure *Name::getWrappedStructure() const {
+ return name_;
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+namespace std {
+size_t hash<icnet::ccnx::Name>::operator()(const icnet::ccnx::Name &name) const {
+ return ccnxName_HashCode(name.getWrappedStructure());;
+}
+
+} // end namespace std \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_name.h b/icnet/ccnx/icnet_ccnx_name.h
new file mode 100644
index 00000000..68c4d19e
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_name.h
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_NAME_H_
+#define ICNET_CCNX_NAME_H_
+
+#include "icnet_ccnx_common.h"
+
+#include <unordered_map>
+#include <assert.h>
+#include <list>
+
+extern "C" {
+#include <ccnx/common/ccnx_Name.h>
+};
+
+//#include "name.hpp"
+#include "icnet_ccnx_segment.h"
+#include <vector>
+
+typedef CCNxName CCNxNameStructure;
+
+namespace icnet {
+
+namespace ccnx {
+
+class Name : public std::enable_shared_from_this<Name> // : public api::Name
+{
+ public:
+ Name();
+
+ /**
+ * @brief Create name
+ * @param uri The null-terminated URI string
+ */
+ Name(const char *uri);
+
+ /**
+ * @brief Create name from @p uri (ICN URI scheme)
+ * @param uri The URI string
+ */
+ Name(std::string uri);
+
+ Name(const Name &name);
+
+ Name(Name &&otherName);
+
+ Name(const CCNxNameStructure *name);
+
+ Name &operator=(const Name &name);
+
+ Name &operator=(Name &&name);
+
+ bool operator==(const Name &name) const;
+
+ bool isValid();
+
+ const std::string &toString();
+
+ void appendComponent(const Segment &component);
+
+ void append(const Name &suffix);
+
+ Name getPrefix(ssize_t number_of_components) const;
+
+ Name &appendSegment(const uint64_t chunk_number);
+
+ bool isPrefixOf(const Name &name);
+
+ bool equals(const Name &name);
+
+ Segment get(ssize_t index) const;
+
+ Name getSubName(ssize_t start_component, ssize_t number_of_components = -1) const;
+
+ bool empty() const;
+
+ void clear();
+
+ std::size_t getSegmentCount();
+
+ std::size_t size();
+
+ ~Name();
+
+ CCNxNameStructure *getWrappedStructure() const;
+
+ private:
+
+ CCNxNameStructure *name_;
+ std::string name_string_;
+};
+
+std::ostream &operator<<(std::ostream &os, const Name &name);
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+namespace std {
+template<>
+struct hash<icnet::ccnx::Name> {
+ size_t operator()(const icnet::ccnx::Name &name) const;
+};
+
+} // end namespace std
+
+#endif // ICNET_CCNX_NAME_H_
diff --git a/icnet/ccnx/icnet_ccnx_network_message.cc b/icnet/ccnx/icnet_ccnx_network_message.cc
new file mode 100644
index 00000000..70dff5f7
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_network_message.cc
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_network_message.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+TransportMessage::TransportMessage() : packet_length_(0) {
+}
+
+const uint8_t *TransportMessage::data() const {
+ return data_;
+}
+
+uint8_t *TransportMessage::data() {
+ return data_;
+}
+
+std::size_t TransportMessage::length() const {
+ return packet_length_;
+}
+
+const uint8_t *TransportMessage::body() const {
+ return data_ + header_length;
+}
+
+uint8_t *TransportMessage::body() {
+ return data_ + header_length;
+}
+
+std::size_t TransportMessage::bodyLength() const {
+ return packet_length_ - header_length;
+}
+
+void TransportMessage::bodyLength(std::size_t new_length) {
+ packet_length_ = new_length;
+ if (packet_length_ > max_packet_length) {
+ packet_length_ = max_packet_length;
+ }
+}
+
+bool TransportMessage::decodeHeader() {
+ // General checks
+
+ uint8_t message_version = data_[0];
+
+ if (message_version != 1) {
+ std::cerr << "Illegal packet version " << message_version << std::endl;
+ return false;
+ }
+
+ // Get packet length
+
+ packet_length_ = data_[2];
+ packet_length_ <<= 8;
+ packet_length_ |= data_[3];
+
+ return true;
+}
+
+CCNxMetaMessage *TransportMessage::decodeMessage() {
+ std::size_t total_length = packet_length_;
+ PARCBuffer *buffer = parcBuffer_CreateFromArray((void *) data_, total_length);
+ buffer = parcBuffer_Flip(buffer);
+ CCNxMetaMessage *ret = ccnxMetaMessage_CreateFromWireFormatBuffer(buffer);
+ parcBuffer_Release((PARCBuffer **) &buffer);
+
+ return ret;
+
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_network_message.h b/icnet/ccnx/icnet_ccnx_network_message.h
new file mode 100644
index 00000000..a5cbfccc
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_network_message.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_NETWORK_MESSAGE_H_
+#define ICNET_CCNX_NETWORK_MESSAGE_H_
+
+#include "icnet_ccnx_common.h"
+#include <iostream>
+
+extern "C" {
+#include <ccnx/common/codec/schema_v1/ccnxCodecSchemaV1_PacketDecoder.h>
+#include <parc/algol/parc_Buffer.h>
+#include <ccnx/transport/common/transport_MetaMessage.h>
+};
+
+namespace icnet {
+
+namespace ccnx {
+
+class TransportMessage {
+ public:
+ enum {
+ header_length = 8
+ };
+ enum {
+ max_packet_length = 1500
+ };
+
+ TransportMessage();
+
+ const uint8_t *data() const;
+
+ uint8_t *data();
+
+ std::size_t length() const;
+
+ const uint8_t *body() const;
+
+ uint8_t *body();
+
+ std::size_t bodyLength() const;
+
+ void bodyLength(std::size_t new_length);
+
+ bool decodeHeader();
+
+ CCNxMetaMessage *decodeMessage();
+
+ private:
+ uint8_t data_[max_packet_length];
+ std::size_t packet_length_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_NETWORK_MESSAGE_H_
diff --git a/icnet/ccnx/icnet_ccnx_payload_type.h b/icnet/ccnx/icnet_ccnx_payload_type.h
new file mode 100644
index 00000000..36231c52
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_payload_type.h
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_PAYLOAD_TYPE_H_
+#define ICNET_CCNX_PAYLOAD_TYPE_H_
+
+#include <ccnx/common/ccnx_PayloadType.h>
+
+namespace icnet {
+
+namespace ccnx {
+
+typedef enum {
+ DATA = CCNxPayloadType_DATA,
+ KEY = CCNxPayloadType_KEY,
+ LINK = CCNxPayloadType_LINK,
+ MANIFEST = CCNxPayloadType_MANIFEST,
+ NONE = 999
+} PayloadType;
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_PAYLOAD_TYPE_H_
diff --git a/icnet/ccnx/icnet_ccnx_pending_interest.cc b/icnet/ccnx/icnet_ccnx_pending_interest.cc
new file mode 100644
index 00000000..9536e4fb
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_pending_interest.cc
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_pending_interest.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+PendingInterest::PendingInterest(std::shared_ptr<Interest> &interest,
+ boost::asio::io_service &portal_io_service,
+ const OnContentObjectCallback &on_content_object,
+ const OnInterestTimeoutCallback &on_interest_timeout)
+ : interest_(interest),
+ io_service_(portal_io_service),
+ timer_(io_service_),
+ on_content_object_callback_(on_content_object),
+ on_interest_timeout_callback_(on_interest_timeout),
+ received_(false),
+ valid_(true) {
+}
+
+PendingInterest::~PendingInterest() {
+ interest_.reset();
+}
+
+void PendingInterest::startCountdown(BoostCallback &cb) {
+ timer_.expires_from_now(boost::posix_time::milliseconds(interest_->getInterestLifetime()));
+ timer_.async_wait(cb);
+}
+
+void PendingInterest::cancelTimer() {
+ timer_.cancel();
+}
+
+bool PendingInterest::isReceived() const {
+ return received_;
+}
+
+void PendingInterest::setReceived() {
+ received_ = true;
+}
+
+const std::shared_ptr<Interest> &PendingInterest::getInterest() const {
+ return interest_;
+}
+
+void PendingInterest::setInterest(const std::shared_ptr<Interest> &interest) {
+ PendingInterest::interest_ = interest;
+}
+
+const OnContentObjectCallback &PendingInterest::getOnDataCallback() const {
+ return on_content_object_callback_;
+}
+
+void PendingInterest::setOnDataCallback(const OnContentObjectCallback &on_content_object) {
+ PendingInterest::on_content_object_callback_ = on_content_object;
+}
+
+const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const {
+ return on_interest_timeout_callback_;
+}
+
+void PendingInterest::setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout) {
+ PendingInterest::on_interest_timeout_callback_ = on_interest_timeout;
+}
+
+void PendingInterest::setReceived(bool received) {
+ PendingInterest::received_ = received;
+}
+
+bool PendingInterest::isValid() const {
+ return valid_;
+}
+
+void PendingInterest::setValid(bool valid) {
+ PendingInterest::valid_ = valid;
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_pending_interest.h b/icnet/ccnx/icnet_ccnx_pending_interest.h
new file mode 100644
index 00000000..692c63e7
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_pending_interest.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_PENDING_INTEREST_H_
+#define ICNET_CCNX_PENDING_INTEREST_H_
+
+#include "icnet_ccnx_interest.h"
+#include "icnet_ccnx_content_object.h"
+#include "icnet_ccnx_name.h"
+
+#include <boost/asio.hpp>
+
+namespace icnet {
+
+namespace ccnx {
+
+typedef std::function<void(const Interest &, ContentObject &)>
+OnContentObjectCallback;
+typedef std::function<void( const Interest
+&)>
+OnInterestTimeoutCallback;
+typedef std::function<void(const Name &, const Interest
+&)>
+OnInterestCallback;
+typedef std::function<void(const boost::system::error_code &)> BoostCallback;
+
+class PendingInterest {
+ public:
+
+ friend class Portal;
+
+ PendingInterest(std::shared_ptr<Interest> &interest,
+ boost::asio::io_service &portal_io_service,
+ const OnContentObjectCallback &on_content_object,
+ const OnInterestTimeoutCallback &on_interest_timeout);
+
+ ~PendingInterest();
+
+ bool isReceived() const;
+
+ void startCountdown(BoostCallback &cb);
+
+ void cancelTimer();
+
+ void setReceived();
+
+ const std::shared_ptr<Interest> &getInterest() const;
+
+ void setInterest(const std::shared_ptr<Interest> &interest);
+
+ const OnContentObjectCallback &getOnDataCallback() const;
+
+ void setOnDataCallback(const OnContentObjectCallback &on_content_object);
+
+ const OnInterestTimeoutCallback &getOnTimeoutCallback() const;
+
+ void setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout);
+
+ void setReceived(bool received);
+
+ bool isValid() const;
+
+ void setValid(bool valid);
+
+ private:
+ std::shared_ptr<Interest> interest_;
+ boost::asio::io_service &io_service_;
+ boost::asio::deadline_timer timer_;
+
+ private:
+ OnContentObjectCallback on_content_object_callback_;
+ OnInterestTimeoutCallback on_interest_timeout_callback_;
+ bool received_;
+ bool valid_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_PENDING_INTEREST_H_
diff --git a/icnet/ccnx/icnet_ccnx_portal.cc b/icnet/ccnx/icnet_ccnx_portal.cc
new file mode 100644
index 00000000..5b14ace3
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_portal.cc
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_portal.h"
+
+#define UNSET_CALLBACK 0
+#define MAX_ARRAY_SIZE 16000
+
+namespace icnet {
+
+namespace ccnx {
+
+Portal::Portal(std::string forwarder_ip_address, std::string forwarder_port)
+ : is_running_(true),
+ clear_(false),
+ on_interest_callback_(UNSET_CALLBACK),
+ connector_(io_service_,
+ forwarder_ip_address,
+ forwarder_port,
+ std::bind(&Portal::processIncomingMessages, this, std::placeholders::_1),
+ served_name_list_) {
+ io_service_.reset();
+}
+
+Portal::~Portal() {
+ connector_.close();
+ stopEventsLoop();
+ clear();
+}
+
+void Portal::sendInterest(const Interest &interest,
+ const OnContentObjectCallback &on_content_object,
+ const OnInterestTimeoutCallback &on_interest_timeout) {
+ std::shared_ptr<Interest> _interest = const_cast<Interest &>(interest).shared_from_this();
+
+ // Create new message
+ CCNxMetaMessage *message = ccnxMetaMessage_CreateFromInterest(_interest->getWrappedStructure());
+
+ // Send it
+ connector_.send(message);
+ clear_ = false;
+ std::function<void(const boost::system::error_code &)> timer_callback;
+
+ PendingInterest *pend_interest = new PendingInterest(_interest, io_service_, on_content_object, on_interest_timeout);
+ const Name &name = _interest->getName();
+
+ pending_interest_hash_table_[name] = std::unique_ptr<PendingInterest>(pend_interest);
+
+ timer_callback = [this, name](const boost::system::error_code &ec) {
+
+ if (clear_ || !is_running_) {
+ return;
+ }
+
+ if (ec.value() != boost::system::errc::operation_canceled) {
+ std::unordered_map<Name, std::unique_ptr<PendingInterest>>::iterator it = pending_interest_hash_table_.find(name);
+ if (it != pending_interest_hash_table_.end()) {
+ it->second->getOnTimeoutCallback()(*it->second->getInterest());
+ } else {
+ std::cerr << "Timeout on interest already received_! [" << it->second->getInterest()->getName() << "]"
+ << std::endl;
+ }
+ }
+ };
+
+ pend_interest->startCountdown(timer_callback);
+
+ ccnxMetaMessage_Release(&message);
+}
+
+void Portal::bind(Name &name, const OnInterestCallback &on_interest_callback) {
+ on_interest_callback_ = on_interest_callback;
+ served_name_list_.push_back(name);
+ work_ = std::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(io_service_));
+ connector_.bind(name);
+}
+
+void Portal::sendContentObject(const ContentObject &content_object) {
+ ContentObject &ccnx_data = const_cast<ContentObject &>(content_object);
+ CCNxMetaMessageStructure *message = ccnxMetaMessage_CreateFromContentObject(ccnx_data.getWrappedStructure());
+
+ ccnxContentObject_AssertValid(ccnx_data.getWrappedStructure());
+
+ connector_.send(message);
+
+ ccnxMetaMessage_Release(&message);
+}
+
+void Portal::runEventsLoop() {
+ if (io_service_.stopped()) {
+ io_service_.reset(); // ensure that run()/poll() will do some work
+ }
+
+ is_running_ = true;
+ this->io_service_.run();
+}
+
+void Portal::stopEventsLoop() {
+ is_running_ = false;
+ work_.reset();
+ io_service_.stop();
+}
+
+void Portal::clear() {
+ pending_interest_hash_table_.clear();
+ clear_ = true;
+}
+
+void Portal::processInterest(CCNxMetaMessage *response) {
+ // Interest for a producer
+ CCNxInterest *interest_ptr = ccnxInterest_Acquire(ccnxMetaMessage_GetInterest(response));
+
+ if (on_interest_callback_ != UNSET_CALLBACK) {
+
+ Interest interest(interest_ptr);
+ if (on_interest_callback_) {
+ on_interest_callback_(interest.getName(), interest);
+ }
+ ccnxInterest_Release((CCNxInterest **) &interest_ptr);
+ }
+}
+
+void Portal::processControlMessage(CCNxMetaMessage *response) {
+ // Control message as response to the route set by a producer
+
+ CCNxControl *control_message = ccnxMetaMessage_GetControl(response);
+
+ if (ccnxControl_IsACK(control_message)) {
+ std::cout << "Route set correctly!" << std::endl;
+ } else {
+ std::cout << "Failed to set the route." << std::endl;
+ }
+}
+
+void Portal::processContentObject(CCNxMetaMessage *response) {
+ // Content object for a consumer
+
+ CCNxContentObject *content_object = ccnxContentObject_Acquire(ccnxMetaMessage_GetContentObject(response));
+
+ CCNxName *n = ccnxContentObject_GetName(content_object);
+ size_t n_components = ccnxName_GetSegmentCount(n);
+ CCNxNameSegment *last_segment = ccnxName_GetSegment(n, n_components - 1);
+
+ bool has_chunk_number = ccnxNameSegmentNumber_IsValid(last_segment);
+
+ PendingInterestHashTable::iterator it = pending_interest_hash_table_.find(Name(n));
+
+ if (it != pending_interest_hash_table_.end()) {
+
+ std::unique_ptr<PendingInterest> &interest_ptr = it->second;
+
+ interest_ptr->cancelTimer();
+ std::shared_ptr<ContentObject> data_ptr = std::make_shared<ContentObject>(content_object);
+
+ if (!interest_ptr->isReceived()) {
+ interest_ptr->setReceived();
+ interest_ptr->getOnDataCallback()(*interest_ptr->getInterest(), *data_ptr);
+
+ if (!has_chunk_number) {
+ pending_interest_hash_table_.erase(interest_ptr->getInterest()->getName());
+ }
+ }
+ }
+
+ ccnxContentObject_Release((CCNxContentObject **) &content_object);
+}
+
+void Portal::processIncomingMessages(CCNxMetaMessage *response) {
+ if (clear_ || !is_running_) {
+ return;
+ }
+
+ if (response) {
+ if (ccnxMetaMessage_IsContentObject(response)) {
+ processContentObject(response);
+ } else if (ccnxMetaMessage_IsInterest(response)) {
+ processInterest(response);
+ } else if (ccnxMetaMessage_IsControl(response)) {
+ processControlMessage(response);
+ }
+ ccnxMetaMessage_Release(&response);
+ }
+
+}
+
+boost::asio::io_service &Portal::getIoService() {
+ return io_service_;
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/ccnx/icnet_ccnx_portal.h b/icnet/ccnx/icnet_ccnx_portal.h
new file mode 100644
index 00000000..5076fcd9
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_portal.h
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_PORTAL_H_
+#define ICNET_CCNX_PORTAL_H_
+
+#include "icnet_ccnx_common.h"
+
+#include <unordered_map>
+#include <memory>
+#include <boost/asio/deadline_timer.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/asio.hpp>
+#include <future>
+
+extern "C" {
+#include <ccnx/api/ccnx_Portal/ccnx_Portal.h>
+#include <ccnx/api/ccnx_Portal/ccnx_PortalRTA.h>
+#include <ccnx/api/control/cpi_Acks.h>
+#include <ccnx/common/ccnx_ContentObject.h>
+#include <parc/security/parc_Security.h>
+#include <parc/security/parc_IdentityFile.h>
+#include <parc/security/parc_Pkcs12KeyStore.h>
+#include <parc/algol/parc_Memory.h>
+};
+
+#include "icnet_ccnx_interest.h"
+#include "icnet_ccnx_pending_interest.h"
+#include "icnet_ccnx_name.h"
+#include "icnet_ccnx_content_object.h"
+#include "icnet_ccnx_local_connector.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+typedef std::unordered_map<Name, std::unique_ptr<PendingInterest>> PendingInterestHashTable;
+typedef uint64_t PendingInterestId;
+typedef CCNxMetaMessage CCNxMetaMessageStructure;
+
+class Portal : public std::enable_shared_from_this<Portal> // : public api::Face
+{
+ public:
+ Portal(std::string forwarder_ip_address = "127.0.0.1", std::string forwarder_port = "9695");
+
+ ~Portal();
+
+ void sendInterest(const Interest &interest,
+ const OnContentObjectCallback &on_content_object_callback,
+ const OnInterestTimeoutCallback &on_interest_timeout_callback);
+
+ void bind(Name &name, const OnInterestCallback &on_interest_callback);
+
+ void runEventsLoop();
+
+ void sendContentObject(const ContentObject &content_object);
+
+ void stopEventsLoop();
+
+ void clear();
+
+ boost::asio::io_service &getIoService();
+
+ private:
+
+ void processIncomingMessages(CCNxMetaMessageStructure *response);
+
+ void processInterest(CCNxMetaMessage *response);
+
+ void processContentObject(CCNxMetaMessage *response);
+
+ void processControlMessage(CCNxMetaMessage *response);
+
+ volatile bool is_running_;
+ bool clear_;
+
+ boost::asio::io_service io_service_;
+
+ std::shared_ptr<boost::asio::io_service::work> work_;
+
+ PendingInterestHashTable pending_interest_hash_table_;
+
+ OnInterestCallback on_interest_callback_;
+ std::list<Name> served_name_list_;
+
+ LocalConnector connector_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_PORTAL_H_
diff --git a/icnet/ccnx/icnet_ccnx_segment.cc b/icnet/ccnx/icnet_ccnx_segment.cc
new file mode 100644
index 00000000..4215650a
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_segment.cc
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_ccnx_segment.h"
+
+namespace icnet {
+
+namespace ccnx {
+
+Segment::Segment(CCNxNameLabelType type, std::string &segment_value) : name_segment_(
+ ccnxNameSegment_CreateTypeValueArray(type, segment_value.length(), segment_value.c_str())) {
+}
+
+Segment::Segment(CCNxNameSegmentStructure *segment) : name_segment_(ccnxNameSegment_Acquire(segment)) {
+}
+
+Segment::Segment(const Segment &segment) : name_segment_(ccnxNameSegment_Copy(segment.name_segment_)) {
+}
+
+Segment::Segment(Segment &&otherSegment) : name_segment_(ccnxNameSegment_Acquire(otherSegment.name_segment_)) {
+}
+
+Segment::~Segment() {
+ ccnxNameSegment_Release(&name_segment_);
+}
+
+Segment &Segment::operator=(const Segment &segment) {
+ ccnxNameSegment_Release(&name_segment_);
+ this->name_segment_ = ccnxNameSegment_Copy(segment.name_segment_);
+ return *this;
+}
+
+bool Segment::operator==(const Segment &segment) {
+ return ccnxNameSegment_Equals(this->name_segment_, segment.name_segment_);
+}
+
+const std::string &Segment::toString() {
+ char *str = ccnxNameSegment_ToString(name_segment_);
+ name_segment_string_ = std::string(str);
+ free(str);
+ return name_segment_string_;
+}
+
+std::size_t Segment::getSize() {
+ return ccnxNameSegment_Length(name_segment_);
+}
+
+CCNxNameLabelType Segment::getType() {
+ return ccnxNameSegment_GetType(name_segment_);
+}
+
+CCNxNameSegmentStructure *Segment::getWrappedStructure() const {
+ return this->name_segment_;
+}
+
+bool Segment::isSegment() const {
+ return ccnxNameSegmentNumber_IsValid(name_segment_);
+}
+
+uint64_t Segment::toSegment() const {
+ return ccnxNameSegmentNumber_Value(name_segment_);
+}
+
+} // end namespace ccnx
+
+} // end namespace icnet
diff --git a/icnet/ccnx/icnet_ccnx_segment.h b/icnet/ccnx/icnet_ccnx_segment.h
new file mode 100644
index 00000000..6620ee74
--- /dev/null
+++ b/icnet/ccnx/icnet_ccnx_segment.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CCNX_SEGMENT_H_
+#define ICNET_CCNX_SEGMENT_H_
+
+#include "icnet_ccnx_common.h"
+
+extern "C" {
+#include <ccnx/common/ccnx_NameLabel.h>
+#include <ccnx/common/ccnx_NameSegment.h>
+#include <ccnx/common/ccnx_NameSegmentNumber.h>
+};
+
+typedef CCNxNameSegment CCNxNameSegmentStructure;
+
+namespace icnet {
+
+namespace ccnx {
+
+class Segment : public std::enable_shared_from_this<Segment> // : public api::Component
+{
+ public:
+ Segment(CCNxNameLabelType type, std::string &segment_value);
+
+ Segment(const Segment &segment);
+
+ Segment(Segment &&otherSegment);
+
+ Segment(CCNxNameSegmentStructure *segment);
+
+ ~Segment();
+
+ Segment &operator=(const Segment &segment);
+
+ bool operator==(const Segment &segment);
+
+ const std::string &toString();
+
+ std::size_t getSize();
+
+ CCNxNameLabelType getType();
+
+ CCNxNameSegmentStructure *getWrappedStructure() const;
+
+ bool isSegment() const;
+
+ uint64_t toSegment() const;
+
+ private:
+ CCNxNameSegmentStructure *name_segment_;
+ std::string name_segment_string_;
+};
+
+} // end namespace ccnx
+
+} // end namespace icnet
+
+#endif // ICNET_CCNX_PORTAL_H_
diff --git a/icnet/transport/consumer.conf b/icnet/transport/consumer.conf
new file mode 100644
index 00000000..1a366f32
--- /dev/null
+++ b/icnet/transport/consumer.conf
@@ -0,0 +1,21 @@
+; this file contais the parameters for RAAQM
+autotune = no
+lifetime = 500
+retransmissions = 128
+beta = 0.99
+drop = 0.003
+beta_wifi_ = 0.99
+drop_wifi_ = 0.6
+beta_lte_ = 0.99
+drop_lte_ = 0.003
+wifi_delay_ = 200
+lte_delay_ = 9000
+
+alpha = 0.95
+batching_parameter = 200
+
+;Choice of rate estimator:
+;0 --> an estimation each $(batching_parameter) packets
+;1 --> an estimation "a la TCP", estimation at the end of the download of the segment
+
+rate_estimator = 0
diff --git a/icnet/transport/icnet_common.h b/icnet/transport/icnet_common.h
new file mode 100644
index 00000000..d507b324
--- /dev/null
+++ b/icnet/transport/icnet_common.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_COMMON_H_
+#define ICNET_COMMON_H_
+
+// require C++11
+#if __cplusplus < 201103L && !defined(__GXX_EXPERIMENTAL_CXX0X__)
+#error "icnet needs to be compiled using the C++11 standard"
+#endif
+
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <functional>
+#include <limits>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <type_traits>
+#include <unistd.h>
+#include <utility>
+#include <fstream>
+#include <chrono>
+
+#include "config.hpp"
+#include "icnet_ccnx_facade.h"
+
+#if defined(__GNUC__) || defined(__clang__)
+# define DEPRECATED(func) func __attribute__ ((deprecated))
+#elif defined(_MSC_VER)
+# define DEPRECATED(func) __declspec(deprecated) func
+#else
+# pragma message("DEPRECATED not implemented")
+# define DEPRECATED(func) func
+#endif
+
+#endif // ICNET_COMMON_H_
diff --git a/icnet/transport/icnet_content_store.cc b/icnet/transport/icnet_content_store.cc
new file mode 100644
index 00000000..884ad2e7
--- /dev/null
+++ b/icnet/transport/icnet_content_store.cc
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_content_store.h"
+
+namespace icnet {
+
+ContentStore::ContentStore(std::size_t max_packets) : max_content_store_size_(max_packets) {
+}
+
+ContentStore::~ContentStore() {
+ content_store_hash_table_.clear();
+}
+
+void ContentStore::insert(const std::shared_ptr<ContentObject> &content_object) {
+ std::unique_lock<std::mutex> lock(cs_mutex_);
+ if (content_store_hash_table_.size() >= max_content_store_size_) {
+ // Evict item
+ content_store_hash_table_.erase(lru_list_.back());
+ lru_list_.pop_back();
+ }
+
+ // Insert new item
+ lru_list_.push_back(std::cref(content_object->getName()));
+ LRUList::iterator pos = lru_list_.end();
+ content_store_hash_table_[content_object->getName()] = CcnxContentStoreEntry(content_object, pos);
+
+}
+
+const std::shared_ptr<ContentObject> &ContentStore::find(const Interest &interest) {
+ std::unique_lock<std::mutex> lock(cs_mutex_);
+ ContentStoreHashTable::iterator it = content_store_hash_table_.find(interest.getName());
+ if (it != content_store_hash_table_.end()) {
+ if (it->second.second != lru_list_.begin()) {
+ // Move element to the top of the LRU list
+ lru_list_.splice(lru_list_.begin(), lru_list_, it->second.second);
+ }
+ return it->second.first;
+ } else {
+ return empty_reference_;
+ }
+}
+
+void ContentStore::erase(const Name &exact_name) {
+ std::unique_lock<std::mutex> lock(cs_mutex_);
+ ContentStoreHashTable::iterator it = content_store_hash_table_.find(exact_name);
+ lru_list_.erase(it->second.second);
+ content_store_hash_table_.erase(exact_name);
+}
+
+void ContentStore::setLimit(size_t max_packets) {
+ max_content_store_size_ = max_packets;
+}
+
+std::size_t ContentStore::getLimit() const {
+ return max_content_store_size_;
+}
+
+std::size_t ContentStore::size() const {
+ return content_store_hash_table_.size();
+}
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/transport/icnet_content_store.h b/icnet/transport/icnet_content_store.h
new file mode 100644
index 00000000..a626055d
--- /dev/null
+++ b/icnet/transport/icnet_content_store.h
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CONTENT_STORE_H_
+#define ICNET_CONTENT_STORE_H_
+
+#include "icnet_socket.h"
+
+#include <mutex>
+
+namespace icnet {
+
+typedef std::pair<std::shared_ptr<ContentObject>, std::list<std::reference_wrapper<const Name>>::iterator>
+ CcnxContentStoreEntry;
+typedef std::list<std::reference_wrapper<const Name>> LRUList;
+typedef std::unordered_map<Name, CcnxContentStoreEntry> ContentStoreHashTable;
+
+class ContentStore {
+ public:
+
+ explicit ContentStore(std::size_t max_packets = 65536);
+
+ ~ContentStore();
+
+ void insert(const std::shared_ptr<ContentObject> &content_object);
+
+ const std::shared_ptr<ContentObject> &find(const Interest &interest);
+
+ void erase(const Name &exact_name);
+
+ void setLimit(size_t max_packets);
+
+ size_t getLimit() const;
+
+ size_t size() const;
+
+ private:
+ ContentStoreHashTable content_store_hash_table_;
+ LRUList lru_list_;
+ std::shared_ptr<ContentObject> empty_reference_;
+ std::size_t max_content_store_size_;
+ std::mutex cs_mutex_;
+};
+
+} // end namespace icnet
+
+
+#endif // ICNET_CONTENT_STORE_H_
diff --git a/icnet/transport/icnet_download_observer.h b/icnet/transport/icnet_download_observer.h
new file mode 100644
index 00000000..7b640a1c
--- /dev/null
+++ b/icnet/transport/icnet_download_observer.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_DOWNLOAD_OBSERVER_H_
+#define ICNET_DOWNLOAD_OBSERVER_H_
+
+namespace icnet {
+
+class IcnObserver {
+ public:
+ virtual ~IcnObserver() {
+ };
+
+ virtual void notifyStats(double throughput) = 0;
+};
+
+} // end namespace icnet
+
+#endif // ICNET_DOWNLOAD_OBSERVER_H_
+
diff --git a/icnet/transport/icnet_rate_estimation.cc b/icnet/transport/icnet_rate_estimation.cc
new file mode 100644
index 00000000..b378da06
--- /dev/null
+++ b/icnet/transport/icnet_rate_estimation.cc
@@ -0,0 +1,324 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "icnet_rate_estimation.h"
+
+namespace icnet {
+
+void *Timer(void *data) {
+ InterRttEstimator *estimator = (InterRttEstimator *) data;
+
+ double dat_rtt, my_avg_win, my_avg_rtt;
+ int my_win_change, number_of_packets, max_packet_size;
+
+ pthread_mutex_lock(&(estimator->mutex_));
+ dat_rtt = estimator->rtt_;
+ pthread_mutex_unlock(&(estimator->mutex_));
+
+ while (estimator->is_running_) {
+ usleep(KV * dat_rtt);
+
+ pthread_mutex_lock(&(estimator->mutex_));
+
+ dat_rtt = estimator->rtt_;
+ my_avg_win = estimator->avg_win_;
+ my_avg_rtt = estimator->avg_rtt_;
+ my_win_change = estimator->win_change_;
+ number_of_packets = estimator->number_of_packets_;
+ max_packet_size = estimator->max_packet_size_;
+ estimator->avg_rtt_ = estimator->rtt_;
+ estimator->avg_win_ = 0;
+ estimator->win_change_ = 0;
+ estimator->number_of_packets_ = 1;
+
+ pthread_mutex_unlock(&(estimator->mutex_));
+
+ if (number_of_packets == 0 || my_win_change == 0) {
+ continue;
+ }
+ if (estimator->estimation_ == 0) {
+ estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 / (1.0 * my_win_change))
+ / (my_avg_rtt / (1.0 * number_of_packets));
+ }
+
+ estimator->estimation_ = estimator->alpha_ * estimator->estimation_ + (1 - estimator->alpha_)
+ * ((my_avg_win * 8.0 * max_packet_size * 1000000.0 / (1.0 * my_win_change))
+ / (my_avg_rtt / (1.0 * number_of_packets)));
+
+ if (estimator->observer_) {
+ estimator->observer_->notifyStats(estimator->estimation_);
+ }
+ }
+
+ return nullptr;
+}
+
+InterRttEstimator::InterRttEstimator(double alpha_arg) {
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->alpha_ = alpha_arg;
+ this->thread_is_running_ = false;
+ this->my_th_ = NULL;
+ this->is_running_ = true;
+ this->avg_rtt_ = 0.0;
+ this->estimation_ = 0.0;
+ this->avg_win_ = 0.0;
+ this->rtt_ = 0.0;
+ this->win_change_ = 0;
+ this->number_of_packets_ = 0;
+ this->max_packet_size_ = 0;
+ this->win_current_ = 1.0;
+
+ pthread_mutex_init(&(this->mutex_), NULL);
+ gettimeofday(&(this->begin_), 0);
+}
+
+InterRttEstimator::~InterRttEstimator() {
+ this->is_running_ = false;
+ if (this->my_th_) {
+ pthread_join(*(this->my_th_), NULL);
+ }
+ this->my_th_ = NULL;
+ pthread_mutex_destroy(&(this->mutex_));
+}
+
+void InterRttEstimator::onRttUpdate(double rtt) {
+ pthread_mutex_lock(&(this->mutex_));
+ this->rtt_ = rtt;
+ this->number_of_packets_++;
+ this->avg_rtt_ += rtt;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ if (!thread_is_running_) {
+ my_th_ = (pthread_t *) malloc(sizeof(pthread_t));
+ if (!my_th_) {
+ std::cerr << "Error allocating thread." << std::endl;
+ my_th_ = NULL;
+ }
+ if (/*int err = */pthread_create(my_th_, NULL, icnet::Timer, (void *) this)) {
+ std::cerr << "Error creating the thread" << std::endl;
+ my_th_ = NULL;
+ }
+ thread_is_running_ = true;
+ }
+}
+
+void InterRttEstimator::onWindowIncrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+
+ pthread_mutex_lock(&(this->mutex_));
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ gettimeofday(&(this->begin_), 0);
+}
+
+void InterRttEstimator::onWindowDecrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+
+ pthread_mutex_lock(&(this->mutex_));
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ pthread_mutex_unlock(&(this->mutex_));
+
+ gettimeofday(&(this->begin_), 0);
+}
+
+ALaTcpEstimator::ALaTcpEstimator() {
+ this->estimation_ = 0.0;
+ this->observer_ = NULL;
+ gettimeofday(&(this->begin_), 0);
+ this->totalSize_ = 0.0;
+}
+
+void ALaTcpEstimator::onStart() {
+ this->totalSize_ = 0.0;
+ gettimeofday(&(this->begin_), 0);
+}
+
+void ALaTcpEstimator::onDownloadFinished() {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+ this->estimation_ = this->totalSize_ * 8 * 1000000 / delay;
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+}
+
+void ALaTcpEstimator::onDataReceived(int packet_size) {
+ this->totalSize_ += packet_size;
+}
+
+SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) {
+ this->estimation_ = 0.0;
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->batching_param_ = batching_param;
+ this->total_size_ = 0.0;
+ this->number_of_packets_ = 0;
+ this->base_alpha_ = alphaArg;
+ this->alpha_ = alphaArg;
+ gettimeofday(&(this->begin_), 0);
+}
+
+void SimpleEstimator::onStart() {
+ this->estimated_ = false;
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ gettimeofday(&(this->begin_), 0);
+}
+
+void SimpleEstimator::onDownloadFinished() {
+ if (!this->estimated_) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+
+ //Assuming all packets carry max_packet_size_ bytes of data (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ = alpha_ * this->estimation_ + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_ * (((double) this->number_of_packets_) / ((double) this->batching_param_));
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ gettimeofday(&(this->begin_), 0);
+ } else {
+ if (this->number_of_packets_ >= (int) (75.0 * (double) this->batching_param_ / 100.0)) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+
+ //Assuming all packets carry max_packet_size_ bytes of data (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ = alpha_ * this->estimation_ + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_ * (((double) this->number_of_packets_) / ((double) this->batching_param_));
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+
+ gettimeofday(&(this->begin_), 0);
+
+ }
+ }
+}
+
+void SimpleEstimator::onDataReceived(int packet_size) {
+ this->total_size_ += packet_size;
+}
+
+void SimpleEstimator::onRttUpdate(double rtt) {
+ this->number_of_packets_++;
+
+ if (number_of_packets_ == this->batching_param_) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+ //Assuming all packets carry max_packet_size_ bytes of data (8*max_packet_size_ bits); 1000000 factor to convert us to seconds
+ if (this->estimation_) {
+ this->estimation_ = alpha_ * this->estimation_ + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay));
+ } else {
+ this->estimation_ = total_size_ * 8 * 1000000.0 / (delay);
+ }
+ if (observer_) {
+ observer_->notifyStats(this->estimation_);
+ }
+ this->alpha_ = this->base_alpha_;
+ this->number_of_packets_ = 0;
+ this->total_size_ = 0.0;
+ gettimeofday(&(this->begin_), 0);
+ }
+}
+
+BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg, int param) {
+ this->estimated_ = false;
+ this->observer_ = NULL;
+ this->alpha_ = alpha_arg;
+ this->batching_param_ = param;
+ this->number_of_packets_ = 0;
+ this->avg_win_ = 0.0;
+ this->avg_rtt_ = 0.0;
+ this->win_change_ = 0.0;
+ this->max_packet_size_ = 0;
+ this->estimation_ = 0.0;
+ this->win_current_ = 1.0;
+ gettimeofday(&(this->begin_), 0);
+}
+
+void BatchingPacketsEstimator::onRttUpdate(double rtt) {
+ this->number_of_packets_++;
+ this->avg_rtt_ += rtt;
+
+ if (number_of_packets_ == this->batching_param_) {
+ if (estimation_ == 0) {
+ estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / (1.0 * win_change_))
+ / (avg_rtt_ / (1.0 * number_of_packets_));
+ } else {
+ estimation_ = alpha_ * estimation_ + (1 - alpha_)
+ * ((avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / (1.0 * win_change_))
+ / (avg_rtt_ / (1.0 * number_of_packets_)));
+ }
+
+ if (observer_) {
+ observer_->notifyStats(estimation_);
+ }
+
+ this->number_of_packets_ = 0;
+ this->avg_win_ = 0.0;
+ this->avg_rtt_ = 0.0;
+ this->win_change_ = 0.0;
+ }
+}
+
+void BatchingPacketsEstimator::onWindowIncrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ gettimeofday(&(this->begin_), 0);
+}
+
+void BatchingPacketsEstimator::onWindowDecrease(double win_current) {
+ timeval end;
+ gettimeofday(&end, 0);
+ double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_);
+ this->avg_win_ += this->win_current_ * delay;
+ this->win_current_ = win_current;
+ this->win_change_ += delay;
+ gettimeofday(&(this->begin_), 0);
+}
+
+} // end namespace icnet
+
diff --git a/icnet/transport/icnet_rate_estimation.h b/icnet/transport/icnet_rate_estimation.h
new file mode 100644
index 00000000..86b879c2
--- /dev/null
+++ b/icnet/transport/icnet_rate_estimation.h
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_RATE_ESTIMATION_H_
+#define ICNET_RATE_ESTIMATION_H_
+
+#include <unistd.h>
+
+#include "icnet_transport_raaqm_data_path.h"
+#include "icnet_download_observer.h"
+
+#define BATCH 50
+#define KV 20
+#define ALPHA 0.8
+#define RATE_CHOICE 0
+
+namespace icnet {
+
+class IcnRateEstimator {
+ public:
+ IcnRateEstimator() {
+ };
+
+ virtual ~IcnRateEstimator() {
+ };
+
+ virtual void onRttUpdate(double rtt) {
+ };
+
+ virtual void onDataReceived(int packetSize) {
+ };
+
+ virtual void onWindowIncrease(double winCurrent) {
+ };
+
+ virtual void onWindowDecrease(double winCurrent) {
+ };
+
+ virtual void onStart() {
+ };
+
+ virtual void onDownloadFinished() {
+ };
+
+ virtual void setObserver(IcnObserver *observer) {
+ this->observer_ = observer;
+ };
+ IcnObserver *observer_;
+ struct timeval begin_;
+ double base_alpha_;
+ double alpha_;
+ double estimation_;
+ int number_of_packets_;
+ // this boolean is to make sure at least one estimation of the BW is done
+ bool estimated_;
+};
+
+// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT)
+
+class InterRttEstimator : public IcnRateEstimator {
+ public:
+ InterRttEstimator(double alpha_arg);
+
+ ~InterRttEstimator();
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size) {
+ if (packet_size > this->max_packet_size_) {
+ this->max_packet_size_ = packet_size;
+ }
+ };
+
+ void onWindowIncrease(double win_current);
+
+ void onWindowDecrease(double win_current);
+
+ void onStart() {
+ };
+
+ void onDownloadFinished() {
+ };
+
+ // private: should be done by using getters
+ pthread_t *my_th_;
+ bool thread_is_running_;
+ double rtt_;
+ bool is_running_;
+ pthread_mutex_t mutex_;
+ double avg_rtt_;
+ double avg_win_;
+ int max_packet_size_;
+ double win_change_;
+ double win_current_;
+};
+
+// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT)
+
+class BatchingPacketsEstimator : public IcnRateEstimator {
+ public:
+ BatchingPacketsEstimator(double alpha_arg, int batchingParam);
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size) {
+ if (packet_size > this->max_packet_size_) {
+ this->max_packet_size_ = packet_size;
+ }
+ };
+
+ void onWindowIncrease(double win_current);
+
+ void onWindowDecrease(double win_current);
+
+ void onStart() {
+ };
+
+ void onDownloadFinished() {
+ };
+
+ private:
+ int batching_param_;
+ double avg_rtt_;
+ double avg_win_;
+ double win_change_;
+ int max_packet_size_;
+ double win_current_;
+};
+
+//Segment Estimator
+
+class ALaTcpEstimator : public IcnRateEstimator {
+ public:
+ ALaTcpEstimator();
+
+ void onDataReceived(int packet_size);
+ void onStart();
+ void onDownloadFinished();
+
+ private:
+ double totalSize_;
+};
+
+// A Rate estimator, this one is the simplest: counting batching_param_ packets and then divide the sum of the size of these packets by the time taken to DL them.
+// Should be the one used
+
+class SimpleEstimator : public IcnRateEstimator {
+ public:
+ SimpleEstimator(double alpha, int batching_param);
+
+ void onRttUpdate(double rtt);
+
+ void onDataReceived(int packet_size);
+
+ void onWindowIncrease(double win_current) {
+ };
+
+ void onWindowDecrease(double win_current) {
+ };
+
+ void onStart();
+
+ void onDownloadFinished();
+
+ private:
+ int batching_param_;
+ double total_size_;
+};
+
+void *Timer(void *data);
+
+}// end namespace icnet
+
+#endif // ICNET_RATE_ESTIMATION_H_
+
diff --git a/icnet/transport/icnet_socket.h b/icnet/transport/icnet_socket.h
new file mode 100644
index 00000000..f1ce8da0
--- /dev/null
+++ b/icnet/transport/icnet_socket.h
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_SOCKET_H_
+#define ICNET_SOCKET_H_
+
+#include "icnet_common.h"
+#include "icnet_socket_options_keys.h"
+#include "icnet_socket_options_default_values.h"
+#include "icnet_download_observer.h"
+
+#define SOCKET_OPTION_GET 0
+#define SOCKET_OPTION_NOT_GET 1
+#define SOCKET_OPTION_SET 2
+#define SOCKET_OPTION_NOT_SET 3
+#define SOCKET_OPTION_DEFAULT 12345
+
+#define VOID_HANDLER 0
+
+namespace icnet {
+
+class ConsumerSocket;
+class ProducerSocket;
+
+typedef ccnx::Interest Interest;
+typedef ccnx::ContentObject ContentObject;
+typedef ccnx::Name Name;
+typedef ccnx::Manifest Manifest;
+typedef ccnx::Portal Portal;
+typedef ccnx::KeyLocator KeyLocator;
+typedef ccnx::Segment Segment;
+typedef ccnx::PayloadType PayloadType;
+typedef ccnx::Array Array;
+
+typedef std::function<void(ConsumerSocket &, const Interest &)> ConsumerInterestCallback;
+typedef std::function<void(ConsumerSocket &, const uint8_t *, size_t)> ConsumerContentCallback;
+typedef std::function<void(ConsumerSocket &, const ContentObject &)> ConsumerContentObjectCallback;
+typedef std::function<bool(ConsumerSocket &, const ContentObject &)> ConsumerContentObjectVerificationCallback;
+typedef std::function<void(ConsumerSocket &, const Manifest &)> ConsumerManifestCallback;
+typedef std::function<void(ProducerSocket &, ContentObject &)> ProducerContentObjectCallback;
+typedef std::function<void(ProducerSocket &, const Interest &)> ProducerInterestCallback;
+
+class Socket {
+ public:
+
+ virtual int setSocketOption(int socket_option_key, int socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, double socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, size_t socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, bool socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, Name socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, KeyLocator socket_option_value) = 0;
+
+ virtual int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, int &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, double &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, size_t &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, bool &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, Name &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, KeyLocator &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, std::shared_ptr<Portal> &socket_option_value) = 0;
+
+ virtual int getSocketOption(int socket_option_key, IcnObserver **socket_option_value) = 0;
+
+ protected:
+ virtual ~Socket() {
+ };
+};
+
+} // namespace icnet
+
+#endif // ICNET_SOCKET_H_
diff --git a/icnet/transport/icnet_socket_consumer.cc b/icnet/transport/icnet_socket_consumer.cc
new file mode 100644
index 00000000..b6c928c6
--- /dev/null
+++ b/icnet/transport/icnet_socket_consumer.cc
@@ -0,0 +1,613 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_socket_consumer.h"
+
+namespace icnet {
+
+ConsumerSocket::ConsumerSocket(Name prefix, int protocol)
+ : is_running_(false),
+ name_prefix_(prefix),
+ interest_lifetime_(default_values::interest_lifetime),
+ min_window_size_(default_values::min_window_size),
+ max_window_size_(default_values::max_window_size),
+ current_window_size_(-1),
+ /****** RAAQM Parameters ******/
+ minimum_drop_probability_(default_values::minimum_drop_probability),
+ sample_number_(default_values::sample_number),
+ gamma_(default_values::gamma_value),
+ beta_(default_values::beta_value),
+ drop_factor_(default_values::drop_factor),
+ /****** END RAAQM Parameters ******/
+ rate_estimation_alpha_(default_values::rate_alpha),
+ rate_estimation_observer_(NULL),
+ is_async_(false),
+ on_interest_output_(VOID_HANDLER),
+ on_interest_timeout_(VOID_HANDLER),
+ on_interest_satisfied_(VOID_HANDLER),
+ on_content_object_input_(VOID_HANDLER),
+ on_content_object_verification_(VOID_HANDLER),
+ on_content_object_(VOID_HANDLER),
+ on_manifest_(VOID_HANDLER),
+ on_payload_retrieved_(VOID_HANDLER),
+ virtual_download_(false),
+ rtt_stats_(false) {
+
+ portal_ = std::make_shared<Portal>();
+
+ switch (protocol) {
+ case TransportProtocolAlgorithms::VEGAS: {
+ transport_protocol_ = std::make_shared<VegasTransportProtocol>(this);
+ break;
+ }
+ case TransportProtocolAlgorithms::RAAQM: {
+ transport_protocol_ = std::make_shared<RaaqmTransportProtocol>(this);
+ break;
+ }
+ }
+}
+
+ConsumerSocket::~ConsumerSocket() {
+ stop();
+ transport_protocol_.reset();
+ portal_.reset();
+}
+
+int ConsumerSocket::consume(Name suffix) {
+ if (is_running_) {
+ portal_->getIoService().post(std::bind(&ConsumerSocket::postponedConsume, this, suffix));
+ return CONSUMER_BUSY;
+ }
+
+ if (is_async_) {
+ portal_ = std::make_shared<Portal>();
+ transport_protocol_->updatePortal();
+ }
+
+ name_suffix_ = suffix;
+ is_async_ = false;
+ transport_protocol_->start();
+ is_running_ = false;
+ return CONSUMER_READY;
+}
+
+void ConsumerSocket::postponedConsume(Name name_suffix) {
+ if (is_async_) {
+ portal_ = std::make_shared<Portal>();
+ transport_protocol_->updatePortal();
+ }
+
+ name_suffix_ = name_suffix;
+ is_async_ = false;
+ transport_protocol_->start();
+}
+
+int ConsumerSocket::asyncConsume(Name suffix) {
+ if (transport_protocol_->isRunning()) {
+ return CONSUMER_BUSY;
+ }
+
+ name_suffix_ = suffix;
+ is_async_ = true;
+ transport_protocol_->start();
+ return CONSUMER_READY;
+}
+
+void ConsumerSocket::stop() {
+ if (transport_protocol_->isRunning()) {
+ transport_protocol_->stop();
+ }
+
+ is_running_ = false;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, double socket_option_value) {
+ switch (socket_option_key) {
+ case MIN_WINDOW_SIZE:
+ min_window_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case MAX_WINDOW_SIZE:
+ max_window_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case CURRENT_WINDOW_SIZE:
+ current_window_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GAMMA_VALUE:
+ gamma_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case BETA_VALUE:
+ beta_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case DROP_FACTOR:
+ drop_factor_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case MINIMUM_DROP_PROBABILITY:
+ minimum_drop_probability_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case RATE_ESTIMATION_ALPHA:
+ if (socket_option_value >= 0 && socket_option_value < 1) {
+ rate_estimation_alpha_ = socket_option_value;
+ } else {
+ rate_estimation_alpha_ = ALPHA;
+ }
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, int socket_option_value) {
+ switch (socket_option_key) {
+
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ input_buffer_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ max_retransmissions_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ interest_lifetime_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_retransmission_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_timeout_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_output_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_input_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_verification_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
+ if (socket_option_value == VOID_HANDLER) {
+ on_payload_retrieved_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ if (socket_option_value > 0) {
+ rate_estimation_batching_parameter_ = socket_option_value;
+ } else {
+ rate_estimation_batching_parameter_ = BATCH;
+ }
+ return SOCKET_OPTION_SET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ if (socket_option_value > 0) {
+ rate_estimation_choice_ = socket_option_value;
+ } else {
+ rate_estimation_choice_ = RATE_CHOICE;
+ }
+ return SOCKET_OPTION_SET;
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, size_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ input_buffer_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ output_buffer_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, bool socket_option_value) {
+ switch (socket_option_key) {
+
+ case GeneralTransportOptions::RUNNING:
+ is_running_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ virtual_download_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case RaaqmTransportOptions::RTT_STATS:
+ rtt_stats_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, Name socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NAME_PREFIX:
+ name_prefix_ = socket_option_value;;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::NAME_SUFFIX:
+ name_suffix_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ on_content_object_input_ = socket_option_value;;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ on_content_object_verification_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ on_interest_retransmission_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ on_interest_output_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ on_interest_timeout_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ on_interest_satisfied_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
+ on_payload_retrieved_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::MANIFEST_INPUT:
+ on_manifest_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, KeyLocator socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ConsumerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) {
+ if (socket_option_key == RateEstimationOptions::RATE_ESTIMATION_OBSERVER) {
+ rate_estimation_observer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ } else {
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, double &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MIN_WINDOW_SIZE:
+ socket_option_value = min_window_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::MAX_WINDOW_SIZE:
+ socket_option_value = max_window_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::CURRENT_WINDOW_SIZE:
+ socket_option_value = current_window_size_;
+ return SOCKET_OPTION_GET;
+
+ // RAAQM parameters
+
+ case RaaqmTransportOptions::GAMMA_VALUE:
+ socket_option_value = gamma_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::BETA_VALUE:
+ socket_option_value = beta_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::DROP_FACTOR:
+ socket_option_value = drop_factor_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY:
+ socket_option_value = minimum_drop_probability_;
+ return SOCKET_OPTION_GET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_ALPHA:
+ socket_option_value = rate_estimation_alpha_;
+ return SOCKET_OPTION_GET;
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, int &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::MAX_INTEREST_RETX:
+ socket_option_value = max_retransmissions_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::INTEREST_LIFETIME:
+ socket_option_value = interest_lifetime_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::SAMPLE_NUMBER:
+ socket_option_value = sample_number_;
+ return SOCKET_OPTION_GET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER:
+ socket_option_value = rate_estimation_batching_parameter_;
+ return SOCKET_OPTION_GET;
+
+ case RateEstimationOptions::RATE_ESTIMATION_CHOICE:
+ socket_option_value = rate_estimation_choice_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, size_t &socket_option_value) {
+ switch (socket_option_key) {
+ case INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_size_;
+ return SOCKET_OPTION_GET;
+
+ case OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_size_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::ASYNC_MODE:
+ socket_option_value = is_async_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::RUNNING:
+ socket_option_value = is_running_;
+ return SOCKET_OPTION_GET;
+
+ case OtherOptions::VIRTUAL_DOWNLOAD:
+ socket_option_value = virtual_download_;
+ return SOCKET_OPTION_GET;
+
+ case RaaqmTransportOptions::RTT_STATS:
+ socket_option_value = rtt_stats_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, Name &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NAME_PREFIX:
+ socket_option_value = name_prefix_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::NAME_SUFFIX:
+ socket_option_value = name_suffix_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT:
+ socket_option_value = on_content_object_input_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY:
+ socket_option_value = on_content_object_verification_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION:
+ socket_option_value = on_interest_retransmission_;
+ return SOCKET_OPTION_GET;
+
+ case ConsumerCallbacksOptions::INTEREST_OUTPUT:
+ socket_option_value = on_interest_output_;
+ return SOCKET_OPTION_GET;
+
+ case ConsumerCallbacksOptions::INTEREST_EXPIRED:
+ socket_option_value = on_interest_timeout_;
+ return SOCKET_OPTION_GET;
+
+ case ConsumerCallbacksOptions::INTEREST_SATISFIED:
+ socket_option_value = on_interest_satisfied_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::CONTENT_RETRIEVED:
+ socket_option_value = on_payload_retrieved_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ConsumerCallbacksOptions::MANIFEST_INPUT:
+ socket_option_value = on_manifest_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, KeyLocator &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::KEY_LOCATOR:
+ socket_option_value = key_locator_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ConsumerSocket::getSocketOption(int socket_option_key, IcnObserver **socket_option_value) {
+ if (socket_option_key == RATE_ESTIMATION_OBSERVER) {
+ *socket_option_value = (rate_estimation_observer_);
+ return SOCKET_OPTION_GET;
+ } else {
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+} // end namespace icnet
diff --git a/icnet/transport/icnet_socket_consumer.h b/icnet/transport/icnet_socket_consumer.h
new file mode 100644
index 00000000..6b9ec811
--- /dev/null
+++ b/icnet/transport/icnet_socket_consumer.h
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_CONSUMER_SOCKET_H_
+#define ICNET_CONSUMER_SOCKET_H_
+
+#include "icnet_common.h"
+#include "icnet_socket.h"
+#include "icnet_transport.h"
+#include "icnet_transport_raaqm.h"
+#include "icnet_transport_vegas.h"
+
+#define CONSUMER_READY 0
+#define CONSUMER_BUSY 1
+
+namespace icnet {
+
+class ConsumerSocket : public Socket {
+ public:
+ explicit ConsumerSocket(const Name prefix, int protocol);
+
+ ~ConsumerSocket();
+
+ int consume(Name suffix);
+
+ int asyncConsume(Name suffix);
+
+ void stop();
+
+ int setSocketOption(int socket_option_key, int socket_option_value);
+
+ int setSocketOption(int socket_option_key, double socket_option_value);
+
+ int setSocketOption(int socket_option_key, bool socket_option_value);
+
+ int setSocketOption(int socket_option_key, size_t socket_option_value);
+
+ int setSocketOption(int socket_option_key, Name socket_option_value);
+
+ int setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, KeyLocator socket_option_value);
+
+ int setSocketOption(int socket_option_key, IcnObserver *socket_option_value);
+
+ int getSocketOption(int socket_option_key, int &socket_option_value);
+
+ int getSocketOption(int socket_option_key, double &socket_option_value);
+
+ int getSocketOption(int socket_option_key, size_t &socket_option_value);
+
+ int getSocketOption(int socket_option_key, bool &socket_option_value);
+
+ int getSocketOption(int socket_option_key, Name &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, KeyLocator &socket_option_value);
+
+ int getSocketOption(int socket_option_key, std::shared_ptr<Portal> &socket_option_value);
+
+ int getSocketOption(int socket_option_key, IcnObserver **socket_option_value);
+
+ private:
+
+ void postponedConsume(Name name_suffix);
+
+ private:
+ // context inner state variables
+ bool is_running_;
+ std::shared_ptr<Portal> portal_;
+ std::shared_ptr<TransportProtocol> transport_protocol_;
+
+ Name name_prefix_;
+ Name name_suffix_;
+
+ int interest_lifetime_;
+
+ double min_window_size_;
+ double max_window_size_;
+ double current_window_size_;
+ int max_retransmissions_;
+ size_t output_buffer_size_;
+ size_t input_buffer_size_;
+
+ // RAAQM Parameters
+
+ double minimum_drop_probability_;
+ unsigned int sample_number_;
+ double gamma_;
+ double beta_;
+ double drop_factor_;
+
+ //Rate estimation parameters
+ double rate_estimation_alpha_;
+ IcnObserver *rate_estimation_observer_;
+ int rate_estimation_batching_parameter_;
+ int rate_estimation_choice_;
+
+ bool is_async_;
+
+ KeyLocator key_locator_;
+
+ ConsumerInterestCallback on_interest_retransmission_;
+ ConsumerInterestCallback on_interest_output_;
+ ConsumerInterestCallback on_interest_timeout_;
+ ConsumerInterestCallback on_interest_satisfied_;
+
+ ConsumerContentObjectCallback on_content_object_input_;
+ ConsumerContentObjectVerificationCallback on_content_object_verification_;
+
+ ConsumerContentObjectCallback on_content_object_;
+ ConsumerManifestCallback on_manifest_;
+
+ ConsumerContentCallback on_payload_retrieved_;
+
+ // Virtual download for traffic generator
+
+ bool virtual_download_;
+ bool rtt_stats_;
+};
+
+} // end namespace icnet
+
+#endif // ICNET_CONSUMER_SOCKET_H_
diff --git a/icnet/transport/icnet_socket_options_default_values.h b/icnet/transport/icnet_socket_options_default_values.h
new file mode 100644
index 00000000..0f830a54
--- /dev/null
+++ b/icnet/transport/icnet_socket_options_default_values.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_SOCKET_OPTIONS_DEFAULT_VALUES_H_
+#define ICNET_SOCKET_OPTIONS_DEFAULT_VALUES_H_
+
+namespace icnet {
+
+namespace default_values {
+
+const int interest_lifetime = 1000; // milliseconds
+const int content_object_expiry_time = 50000; // milliseconds -> 50 seconds
+const int content_object_packet_size = 1500; // The ethernet MTU
+const int producer_socket_input_buffer_size = 150000; // Interests
+const int producer_socket_output_buffer_size = 150000; // Content Object
+const int default_buffer_size = 8096 * 8096 * 2;
+const int signature_size = 260; // bytes
+const int key_locator_size = 60; // bytes
+const int limit_guard = 80; // bytes
+const int min_window_size = 1; // Interests
+const int max_window_size = 128000; // Interests
+const int digest_size = 34; // bytes
+const int max_out_of_order_segments = 3; // content object
+
+// RAAQM
+const int sample_number = 30;
+const double gamma_value = 1;
+const double beta_value = 0.8;
+const double drop_factor = 0.2;
+const double minimum_drop_probability = 0.00001;
+const int path_id = 0;
+const double rate_alpha = 0.8;
+
+// Vegas
+const double alpha = 1 / 8;
+const double beta = 1 / 4;
+const uint16_t k = 4;
+const std::chrono::milliseconds clock_granularity = std::chrono::milliseconds(100);
+
+// maximum allowed values
+const int transport_protocol_min_retransmissions = 0;
+const int transport_protocol_max_retransmissions = 128;
+const int max_content_object_size = 8096;
+
+} // end namespace default_values
+
+} // end namespace icnet
+
+#endif // ICNET_SOCKET_OPTIONS_DEFAULT_VALUES_H_
diff --git a/icnet/transport/icnet_socket_options_keys.h b/icnet/transport/icnet_socket_options_keys.h
new file mode 100644
index 00000000..4b82f67a
--- /dev/null
+++ b/icnet/transport/icnet_socket_options_keys.h
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_TRANSPORT_OPTIONS_KEYS_H_
+#define ICNET_TRANSPORT_OPTIONS_KEYS_H_
+
+namespace icnet {
+
+typedef enum {
+ RAAQM = 0, VEGAS = 1
+} TransportProtocolAlgorithms;
+
+typedef enum {
+ INPUT_BUFFER_SIZE = 101,
+ OUTPUT_BUFFER_SIZE = 102,
+ NAME_PREFIX = 103,
+ NAME_SUFFIX = 104,
+ MAX_INTEREST_RETX = 105,
+ DATA_PACKET_SIZE = 106,
+ INTEREST_LIFETIME = 107,
+ CONTENT_OBJECT_EXPIRY_TIME = 108,
+ KEY_LOCATOR = 110,
+ SIGNATURE_TYPE = 111,
+ MIN_WINDOW_SIZE = 112,
+ MAX_WINDOW_SIZE = 113,
+ CURRENT_WINDOW_SIZE = 114,
+ ASYNC_MODE = 115,
+ MAKE_MANIFEST = 116,
+ PORTAL = 117,
+ RUNNING = 118,
+} GeneralTransportOptions;
+
+typedef enum {
+ SAMPLE_NUMBER = 201,
+ GAMMA_VALUE = 202,
+ BETA_VALUE = 203,
+ DROP_FACTOR = 204,
+ MINIMUM_DROP_PROBABILITY = 205,
+ PATH_ID = 206,
+ RTT_STATS = 207,
+} RaaqmTransportOptions;
+
+typedef enum {
+ RATE_ESTIMATION_ALPHA = 301,
+ RATE_ESTIMATION_OBSERVER = 302,
+ RATE_ESTIMATION_BATCH_PARAMETER = 303,
+ RATE_ESTIMATION_CHOICE = 304,
+} RateEstimationOptions;
+
+typedef enum {
+ INTEREST_OUTPUT = 401,
+ INTEREST_RETRANSMISSION = 402,
+ INTEREST_EXPIRED = 403,
+ INTEREST_SATISFIED = 404,
+ CONTENT_OBJECT_INPUT = 411,
+ MANIFEST_INPUT = 412,
+ CONTENT_OBJECT_TO_VERIFY = 413,
+ CONTENT_RETRIEVED = 414,
+} ConsumerCallbacksOptions;
+
+typedef enum {
+ INTEREST_INPUT = 501,
+ INTEREST_DROP = 502,
+ INTEREST_PASS = 503,
+ CACHE_HIT = 506,
+ CACHE_MISS = 508,
+ NEW_CONTENT_OBJECT = 509,
+ CONTENT_OBJECT_SIGN = 513,
+ CONTENT_OBJECT_READY = 510,
+ CONTENT_OBJECT_OUTPUT = 511,
+} ProducerCallbacksOptions;
+
+typedef enum {
+ VIRTUAL_DOWNLOAD = 601, USE_CFG_FILE = 603,
+} OtherOptions;
+
+typedef enum {
+ SHA_256 = 701, RSA_256 = 702,
+} SignatureType;
+
+} // end namespace icnet
+
+#endif // ICNET_TRANSPORT_OPTIONS_KEYS_H_
diff --git a/icnet/transport/icnet_socket_producer.cc b/icnet/transport/icnet_socket_producer.cc
new file mode 100644
index 00000000..9a870e5c
--- /dev/null
+++ b/icnet/transport/icnet_socket_producer.cc
@@ -0,0 +1,717 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_socket_producer.h"
+
+namespace icnet {
+
+ProducerSocket::ProducerSocket(Name prefix)
+ : portal_(new Portal()),
+ name_prefix_(prefix),
+ data_packet_size_(default_values::content_object_packet_size),
+ content_object_expiry_time_(default_values::content_object_expiry_time),
+ registration_status_(REGISTRATION_NOT_ATTEMPTED),
+ making_manifest_(false),
+ signature_type_(SHA_256),
+ key_locator_size_(default_values::key_locator_size),
+ output_buffer_(default_values::producer_socket_output_buffer_size),
+ input_buffer_capacity_(default_values::producer_socket_input_buffer_size),
+ input_buffer_size_(0),
+ processing_thread_stop_(false),
+ listening_thread_stop_(false),
+ on_interest_input_(VOID_HANDLER),
+ on_interest_dropped_input_buffer_(VOID_HANDLER),
+ on_interest_inserted_input_buffer_(VOID_HANDLER),
+ on_interest_satisfied_output_buffer_(VOID_HANDLER),
+ on_interest_process_(VOID_HANDLER),
+ on_new_segment_(VOID_HANDLER),
+ on_content_object_to_sign_(VOID_HANDLER),
+ on_content_object_in_output_buffer_(VOID_HANDLER),
+ on_content_object_output_(VOID_HANDLER),
+ on_content_object_evicted_from_output_buffer_(VOID_HANDLER) {
+ listening_thread_stop_ = false;
+ key_locator_size_ = default_values::key_locator_size;
+}
+
+ProducerSocket::~ProducerSocket() {
+ processing_thread_stop_ = true;
+ portal_->stopEventsLoop();
+ processing_thread_.join();
+
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+}
+
+void ProducerSocket::attach() {
+ listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+ // processing_thread_ = boost::thread(bind(&ProducerTransport::processIncomingInterest, this));
+}
+
+void ProducerSocket::serveForever() {
+ if (listening_thread_.joinable()) {
+ listening_thread_.join();
+ }
+}
+
+void ProducerSocket::dispatch() {
+ // Check that the INTEREST_INPUT callback is set.
+ if (on_interest_input_ == VOID_HANDLER) {
+ std::cerr << "Warning: the dispatcher function needs a dispatcher callback! "
+ "You need to set INTEREST_INPUT callback" << std::endl;
+ }
+
+ listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this));
+}
+
+void ProducerSocket::listen() {
+ registration_status_ = REGISTRATION_IN_PROGRESS;
+
+ portal_
+ ->bind(name_prefix_, std::bind(&ProducerSocket::onInterest, this, std::placeholders::_1, std::placeholders::_2));
+
+ portal_->runEventsLoop();
+}
+
+void ProducerSocket::passContentObjectToCallbacks(const std::shared_ptr<ContentObject> &content_object) {
+ if (content_object) {
+ if (on_new_segment_ != VOID_HANDLER) {
+ on_new_segment_(*this, *content_object);
+ }
+
+ if (on_content_object_to_sign_ != VOID_HANDLER) {
+ if (!making_manifest_) {
+ on_content_object_to_sign_(*this, *content_object);
+ } else {
+ if (content_object->getContentType() == PayloadType::MANIFEST) {
+ on_content_object_to_sign_(*this, *content_object);
+ } else {
+ content_object->signWithSha256(key_locator_);
+ }
+ }
+ } else {
+ content_object->signWithSha256(key_locator_);
+ }
+
+ if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
+ on_content_object_in_output_buffer_(*this, *content_object);
+ }
+
+ output_buffer_.insert(content_object);
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *content_object);
+ }
+
+ if (content_object->getName().get(-1).toSegment() == 0) {
+ portal_->sendContentObject(*content_object);
+ }
+ }
+}
+
+void ProducerSocket::produce(ContentObject &content_object) {
+ if (!name_prefix_.isPrefixOf(content_object.getName())) {
+ return;
+ }
+
+ if (on_content_object_in_output_buffer_ != VOID_HANDLER) {
+ on_content_object_in_output_buffer_(*this, content_object);
+ }
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, content_object);
+ }
+
+ portal_->sendContentObject(content_object);
+}
+
+void ProducerSocket::produce(Name suffix, const uint8_t *buf, size_t buffer_size, const int response_id, bool is_last) {
+
+ if (buffer_size == 0) {
+ return;
+ }
+
+ int bytes_segmented = 0;
+
+ Name name(name_prefix_);
+
+ if (!suffix.empty()) {
+ name.append(suffix);
+ }
+
+ size_t bytes_occupied_by_name = name.size();
+
+ int digestSize = default_values::digest_size; // SHA_256 as default
+ int signatureSize = default_values::signature_size;
+ uint64_t free_space_for_content = 0;
+
+ free_space_for_content = data_packet_size_ - bytes_occupied_by_name - digestSize - default_values::limit_guard;
+
+ uint64_t number_of_segments = uint64_t(std::ceil(double(buffer_size) / double(free_space_for_content)));
+
+ if (free_space_for_content * number_of_segments < buffer_size) {
+ number_of_segments++;
+ }
+
+ uint64_t current_segment = 0;
+
+ if (seq_number_map_.find(name.toString()) != seq_number_map_.end()
+ && seq_number_map_[name.toString()].find(response_id) != seq_number_map_[name.toString()].end()) {
+ current_segment = seq_number_map_[name.toString()][response_id];
+ } else {
+ seq_number_map_[name.toString()][response_id] = current_segment;
+ }
+
+ if (making_manifest_) {
+
+ std::shared_ptr<ContentObject> content_object_segment;
+ std::shared_ptr<Manifest> manifest_segment;
+ bool manifest_segment_needed = true;
+
+ uint64_t free_space_for_manifest =
+ data_packet_size_ - bytes_occupied_by_name - signatureSize - default_values::limit_guard;
+
+ for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments;) {
+
+ if (manifest_segment_needed) {
+
+ Name manifest_name(name_prefix_);
+
+ if (!suffix.empty()) {
+ manifest_name.append(suffix);
+ }
+
+ manifest_name.appendSegment(current_segment);
+
+ if (manifest_segment) {
+ manifest_segment->encode();
+ passContentObjectToCallbacks(manifest_segment);
+ }
+
+ manifest_segment = std::make_shared<Manifest>(manifest_name);
+
+ if (is_last) {
+ manifest_segment->setFinalChunkNumber(current_segment + number_of_segments - packaged_segments);
+ }
+
+ // finalSegment = current_segment;
+ manifest_segment_needed = false;
+ current_segment++;
+
+ key_locator_.clear();
+ key_locator_.setName(const_cast<Name &>(manifest_segment->getName()));
+ }
+
+ Name full_name = name;
+
+ content_object_segment = std::make_shared<ContentObject>(std::move(full_name.appendSegment(current_segment)));
+ // content_object_segment->setExpiryTime((uint64_t) m_dataFreshness);
+
+ if (packaged_segments == number_of_segments - 1) {
+ content_object_segment->setContent(&buf[bytes_segmented], buffer_size - bytes_segmented);
+ bytes_segmented += buffer_size - bytes_segmented;
+ } else {
+ content_object_segment->setContent(&buf[bytes_segmented], free_space_for_content);
+ bytes_segmented += free_space_for_content;
+ }
+
+ if (is_last) {
+ content_object_segment->setFinalChunkNumber(current_segment + number_of_segments - packaged_segments - 1);
+ }
+
+ passContentObjectToCallbacks(content_object_segment);
+
+ size_t manifestSize = manifest_segment->estimateManifestSize();
+ Name &content_object_name = (Name &) content_object_segment->getName();
+ size_t fullNameSize = content_object_name.size();
+
+ // TODO Signature
+
+ if (manifestSize + 2 * fullNameSize > free_space_for_manifest) {
+ manifest_segment_needed = true;
+ }
+
+ // TODO Manifest and hashes!
+ // Array block = content_object_segment->getContent();
+ // icn-interface::ConstBufferPtr implicitDigest = icn-interface::crypto::sha256(block.data(), block.size());
+ //
+ // //add implicit digest to the manifest
+ // manifest_segment->addNameToCatalogue(Name(std::to_string(current_segment)), implicitDigest->buf(),
+ // implicitDigest->size());
+
+ packaged_segments++;
+ current_segment++;
+
+ if (packaged_segments == number_of_segments) {
+ manifest_segment->encode();
+ passContentObjectToCallbacks(manifest_segment);
+ }
+ }
+ } else {
+
+ for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) {
+ Name fullName = name;
+
+ std::shared_ptr<ContentObject>
+ content_object = std::make_shared<ContentObject>(std::move(fullName.appendSegment(current_segment)));
+
+ // TODO If we set the throughput will decrease.. to investigate
+ // content_object->setExpiryTime((uint64_t)m_dataFreshness);
+
+ if (is_last) {
+ content_object->setFinalChunkNumber(current_segment + number_of_segments - packaged_segments - 1);
+ }
+
+ if (packaged_segments == number_of_segments - 1) {
+ content_object->setContent(&buf[bytes_segmented], buffer_size - bytes_segmented);
+ bytes_segmented += buffer_size - bytes_segmented;
+ } else {
+ content_object->setContent(&buf[bytes_segmented], free_space_for_content);
+ bytes_segmented += free_space_for_content;
+ }
+
+ current_segment++;
+ passContentObjectToCallbacks(content_object);
+ }
+ }
+
+ seq_number_map_[name.toString()][response_id] = current_segment;
+
+ if (is_last) {
+ seq_number_map_[name.toString()].erase(response_id);
+ if (seq_number_map_[name.toString()].empty()) {
+ seq_number_map_.erase(name.toString());
+ }
+ }
+}
+
+void ProducerSocket::asyncProduce(ContentObject &content_object) {
+ std::shared_ptr<ContentObject> c_object = std::make_shared<ContentObject>(content_object);
+ std::thread t([c_object, this]() { produce(*c_object); });
+ t.detach();
+}
+
+void ProducerSocket::asyncProduce(Name suffix,
+ const uint8_t *buf,
+ size_t buffer_size,
+ const int response_id,
+ bool is_last) {
+ std::thread t([suffix, buf, buffer_size, response_id, is_last, this]() {
+ produce(suffix, buf, buffer_size, response_id, is_last);
+ });
+ t.detach();
+}
+
+void ProducerSocket::onInterest(const Name &name, const Interest &interest) {
+ if (on_interest_input_ != VOID_HANDLER) {
+ on_interest_input_(*this, interest);
+ }
+
+ const std::shared_ptr<ContentObject> &content_object = output_buffer_.find(interest);
+
+ if (content_object) {
+
+ if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_(*this, interest);
+ }
+
+ if (on_content_object_output_ != VOID_HANDLER) {
+ on_content_object_output_(*this, *content_object);
+ }
+
+ portal_->sendContentObject(*content_object);
+ } else {
+ if (on_interest_process_ != VOID_HANDLER) {
+ on_interest_process_(*this, interest);
+ }
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, int socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ if (socket_option_value < default_values::max_content_object_size && socket_option_value > 0) {
+ data_packet_size_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ } else {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ if (socket_option_value >= 1) {
+ input_buffer_capacity_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ } else {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ if (socket_option_value >= 0) {
+ output_buffer_.setLimit(socket_option_value);
+ return SOCKET_OPTION_SET;
+ } else {
+ return SOCKET_OPTION_NOT_SET;
+ }
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ content_object_expiry_time_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case GeneralTransportOptions::SIGNATURE_TYPE:
+ if (socket_option_value == SOCKET_OPTION_DEFAULT) {
+ signature_type_ = SHA_256;
+ } else {
+ signature_type_ = socket_option_value;
+ }
+
+ if (signature_type_ == SHA_256 || signature_type_ == RSA_256) {
+ signature_size_ = 32;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_input_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_dropped_input_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_inserted_input_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_satisfied_output_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ if (socket_option_value == VOID_HANDLER) {
+ on_interest_process_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_new_segment_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_to_sign_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_in_output_buffer_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ if (socket_option_value == VOID_HANDLER) {
+ on_content_object_output_ = VOID_HANDLER;
+ return SOCKET_OPTION_SET;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, double socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, bool socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ making_manifest_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, Name socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NAME_PREFIX:
+ name_prefix_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ on_new_segment_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ on_content_object_to_sign_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ on_content_object_in_output_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ on_content_object_output_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ on_interest_input_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ on_interest_dropped_input_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ on_interest_inserted_input_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CACHE_HIT:
+ on_interest_satisfied_output_buffer_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ case ProducerCallbacksOptions::CACHE_MISS:
+ on_interest_process_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key,
+ ConsumerContentObjectVerificationCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, KeyLocator socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, int &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_capacity_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_.getLimit();
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::DATA_PACKET_SIZE:
+ socket_option_value = data_packet_size_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME:
+ socket_option_value = content_object_expiry_time_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::SIGNATURE_TYPE:
+ socket_option_value = signature_type_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, double &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, bool &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::MAKE_MANIFEST:
+ socket_option_value = making_manifest_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, Name &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::NAME_PREFIX:
+ socket_option_value = name_prefix_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::NEW_CONTENT_OBJECT:
+ socket_option_value = on_new_segment_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN:
+ socket_option_value = on_content_object_to_sign_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_READY:
+ socket_option_value = on_content_object_in_output_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT:
+ socket_option_value = on_content_object_output_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value) {
+ switch (socket_option_key) {
+ case ProducerCallbacksOptions::INTEREST_INPUT:
+ socket_option_value = on_interest_input_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::INTEREST_DROP:
+ socket_option_value = on_interest_dropped_input_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case ProducerCallbacksOptions::INTEREST_PASS:
+ socket_option_value = on_interest_inserted_input_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case CACHE_HIT:
+ socket_option_value = on_interest_satisfied_output_buffer_;
+ return SOCKET_OPTION_GET;
+
+ case CACHE_MISS:
+ socket_option_value = on_interest_process_;
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key,
+ ConsumerContentObjectVerificationCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, size_t socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ if (input_buffer_capacity_ >= 1) {
+ input_buffer_capacity_ = socket_option_value;
+ return SOCKET_OPTION_SET;
+ }
+
+ default:
+ return SOCKET_OPTION_NOT_SET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, size_t &socket_option_value) {
+ switch (socket_option_key) {
+ case GeneralTransportOptions::INPUT_BUFFER_SIZE:
+ socket_option_value = input_buffer_capacity_;
+ return SOCKET_OPTION_GET;
+
+ case GeneralTransportOptions::OUTPUT_BUFFER_SIZE:
+ socket_option_value = output_buffer_.size();
+ return SOCKET_OPTION_GET;
+
+ default:
+ return SOCKET_OPTION_NOT_GET;
+ }
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, KeyLocator &socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, std::shared_ptr<Portal> &socket_option_value) {
+ switch (socket_option_key) {
+ case PORTAL:
+ socket_option_value = portal_;
+ return SOCKET_OPTION_GET;
+ }
+
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::getSocketOption(int socket_option_key, IcnObserver **socket_option_value) {
+ return SOCKET_OPTION_NOT_GET;
+}
+
+int ProducerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) {
+ return SOCKET_OPTION_NOT_SET;
+}
+
+} // end namespace icnet
diff --git a/icnet/transport/icnet_socket_producer.h b/icnet/transport/icnet_socket_producer.h
new file mode 100644
index 00000000..d709e305
--- /dev/null
+++ b/icnet/transport/icnet_socket_producer.h
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_PRODUCER_SOCKET_H_
+#define ICNET_PRODUCER_SOCKET_H_
+
+#include "icnet_socket.h"
+#include "icnet_content_store.h"
+
+#include <queue>
+#include <mutex>
+#include <atomic>
+#include <thread>
+
+#define REGISTRATION_NOT_ATTEMPTED 0
+#define REGISTRATION_SUCCESS 1
+#define REGISTRATION_FAILURE 2
+#define REGISTRATION_IN_PROGRESS 3
+
+namespace icnet {
+
+class ProducerSocket : public Socket {
+ public:
+
+ explicit ProducerSocket(Name prefix);
+
+ ~ProducerSocket();
+
+ void attach();
+
+ void dispatch();
+
+ void produce(Name suffix, const uint8_t *buffer, size_t buffer_size, const int request_id = 0, bool is_last = false);
+
+ void produce(ContentObject &content_object);
+
+ void asyncProduce(Name suffix, const uint8_t *buf, size_t buffer_size, const int response_id, bool is_last);
+
+ void asyncProduce(ContentObject &content_object);
+
+ void serveForever();
+
+ void onInterest(const Name &name, const Interest &interest);
+
+ int setSocketOption(int socket_option_key, int socket_option_value);
+
+ int setSocketOption(int socket_option_key, double socket_option_value);
+
+ int setSocketOption(int socket_option_key, bool socket_option_value);
+
+ int setSocketOption(int socket_option_key, size_t socket_option_value);
+
+ int setSocketOption(int socket_option_key, Name socket_option_value);
+
+ int setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value);
+
+ int setSocketOption(int socket_option_key, KeyLocator socket_option_value);
+
+ int setSocketOption(int socket_option_key, IcnObserver *obs);
+
+ int getSocketOption(int socket_option_key, int &socket_option_value);
+
+ int getSocketOption(int socket_option_key, double &socket_option_value);
+
+ int getSocketOption(int socket_option_key, bool &socket_option_value);
+
+ int getSocketOption(int socket_option_key, size_t &socket_option_value);
+
+ int getSocketOption(int socket_option_key, Name &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value);
+
+ int getSocketOption(int socket_option_key, KeyLocator &socket_option_value);
+
+ int getSocketOption(int socket_option_key, std::shared_ptr<Portal> &socket_option_value);
+
+ int getSocketOption(int socket_option_key, IcnObserver **socket_option_value);
+
+ private:
+
+ std::shared_ptr<Portal> portal_;
+ boost::asio::io_service io_service_;
+
+ Name name_prefix_;
+
+ int data_packet_size_;
+ int content_object_expiry_time_;
+ int registration_status_;
+
+ bool making_manifest_;
+
+ // map for storing sequence numbers for several calls of the publish function
+ std::unordered_map<std::string, std::unordered_map<int, uint64_t>> seq_number_map_;
+
+ int signature_type_;
+ int signature_size_;
+ int key_locator_size_;
+ KeyLocator key_locator_;
+
+ // buffers
+ ContentStore output_buffer_;
+
+ std::queue<std::shared_ptr<const Interest> > input_buffer_;
+ std::mutex input_buffer_mutex_;
+ std::atomic_size_t input_buffer_capacity_;
+ std::atomic_size_t input_buffer_size_;
+
+ // threads
+ std::thread listening_thread_;
+ std::thread processing_thread_;
+ volatile bool processing_thread_stop_;
+ volatile bool listening_thread_stop_;
+
+ // callbacks
+ ProducerInterestCallback on_interest_input_;
+ ProducerInterestCallback on_interest_dropped_input_buffer_;
+ ProducerInterestCallback on_interest_inserted_input_buffer_;
+ ProducerInterestCallback on_interest_satisfied_output_buffer_;
+ ProducerInterestCallback on_interest_process_;
+
+ ProducerContentObjectCallback on_new_segment_;
+ ProducerContentObjectCallback on_content_object_to_sign_;
+ ProducerContentObjectCallback on_content_object_in_output_buffer_;
+ ProducerContentObjectCallback on_content_object_output_;
+ ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_;
+
+ private:
+ void listen();
+
+ void passContentObjectToCallbacks(const std::shared_ptr<ContentObject> &content_object);
+
+};
+
+}
+
+#endif // ICNET_PRODUCER_SOCKET_H_
diff --git a/icnet/transport/icnet_transport.cc b/icnet/transport/icnet_transport.cc
new file mode 100644
index 00000000..c85f02b4
--- /dev/null
+++ b/icnet/transport/icnet_transport.cc
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport.h"
+
+namespace icnet {
+
+TransportProtocol::TransportProtocol(Socket *icn_socket) : socket_(icn_socket), is_running_(false) {
+}
+
+void TransportProtocol::updatePortal() {
+ socket_->getSocketOption(PORTAL, portal_);
+}
+
+bool TransportProtocol::isRunning() {
+ return is_running_;
+}
+
+} // end namespace icnet
diff --git a/icnet/transport/icnet_transport.h b/icnet/transport/icnet_transport.h
new file mode 100644
index 00000000..738634fb
--- /dev/null
+++ b/icnet/transport/icnet_transport.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_TRANSPORT_PROTOCOL_H_
+#define ICNET_TRANSPORT_PROTOCOL_H_
+
+#include "icnet_socket.h"
+
+namespace icnet {
+
+class TransportProtocol {
+ public:
+ TransportProtocol(Socket *icn_socket);
+
+ void updatePortal();
+
+ bool isRunning();
+
+ virtual void start() = 0;
+
+ virtual void stop() = 0;
+
+ protected:
+ Socket *socket_;
+ std::shared_ptr<Portal> portal_;
+ bool is_running_;
+};
+
+}
+
+#endif // ICNET_TRANSPORT_PROTOCOL_H_
diff --git a/icnet/transport/icnet_transport_raaqm.cc b/icnet/transport/icnet_transport_raaqm.cc
new file mode 100644
index 00000000..9a1b77b4
--- /dev/null
+++ b/icnet/transport/icnet_transport_raaqm.cc
@@ -0,0 +1,466 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport_raaqm.h"
+
+namespace icnet {
+
+RaaqmTransportProtocol::RaaqmTransportProtocol(Socket *icnet_socket)
+ : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) {
+ init();
+}
+
+RaaqmTransportProtocol::~RaaqmTransportProtocol() {
+ if (this->rate_estimator_) {
+ delete this->rate_estimator_;
+ }
+}
+
+void RaaqmTransportProtocol::init() {
+ std::ifstream is(RAAQM_CONFIG_PATH);
+
+ std::string line;
+
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_values::beta_value);
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_values::drop_factor);
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, default_values::interest_lifetime);
+ socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION,
+ default_values::transport_protocol_max_retransmissions);
+ raaqm_autotune_ = false;
+ default_beta_ = default_values::beta_value;
+ default_drop_ = default_values::drop_factor;
+ beta_wifi_ = default_values::beta_value;
+ drop_wifi_ = default_values::drop_factor;
+ beta_lte_ = default_values::beta_value;
+ drop_lte_ = default_values::drop_factor;
+ wifi_delay_ = 1000;
+ lte_delay_ = 15000;
+ avg_rtt_ = 0.0;
+
+ if (!is) {
+ std::cout << "WARNING: RAAQM parameters not found, set default values" << std::endl;
+ return;
+ }
+
+ std::cout << "Setting RAAQM parameters:" << std::endl;
+ while (getline(is, line)) {
+ std::string command;
+ std::istringstream line_s(line);
+
+ line_s >> command;
+
+ if (command == ";") {
+ continue;
+ }
+
+ if (command == "autotune") {
+ std::string tmp;
+ std::string val;
+ line_s >> tmp >> val;
+ if (val == "yes") {
+ raaqm_autotune_ = true;
+ } else {
+ raaqm_autotune_ = false;
+ }
+ std::cout << "params: autotune = " << raaqm_autotune_ << std::endl;
+ continue;
+ }
+
+ if (command == "lifetime") {
+ std::string tmp;
+ int lifetime;
+ line_s >> tmp >> lifetime;
+ std::cout << "params: lifetime = " << lifetime << std::endl;
+ socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime);
+ continue;
+ }
+
+ if (command == "retransmissions") {
+ std::string tmp;
+ int rtx;
+ line_s >> tmp >> rtx;
+ std::cout << "params: retransmissions = " << rtx << std::endl;
+ socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, rtx);
+ continue;
+ }
+
+ if (command == "beta") {
+ std::string tmp;
+ line_s >> tmp >> default_beta_;
+ std::cout << "params: beta = " << default_beta_ << std::endl;
+ socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta_);
+ continue;
+ }
+
+ if (command == "drop") {
+ std::string tmp;
+ line_s >> tmp >> default_drop_;
+ std::cout << "params: drop = " << default_drop_ << std::endl;
+ socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_drop_);
+ continue;
+ }
+
+ if (command == "beta_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> beta_wifi_;
+ std::cout << "params: beta_wifi_ = " << beta_wifi_ << std::endl;
+ continue;
+ }
+
+ if (command == "drop_wifi_") {
+ std::string tmp;
+ line_s >> tmp >> drop_wifi_;
+ std::cout << "params: drop_wifi_ = " << drop_wifi_ << std::endl;
+ continue;
+ }
+
+ if (command == "beta_lte_") {
+ std::string tmp;
+ line_s >> tmp >> beta_lte_;
+ std::cout << "params: beta_lte_ = " << beta_lte_ << std::endl;
+ continue;
+ }
+
+ if (command == "drop_lte_") {
+ std::string tmp;
+ line_s >> tmp >> drop_lte_;
+ std::cout << "params: drop_lte_ = " << drop_lte_ << std::endl;
+ continue;
+ }
+
+ if (command == "wifi_delay_") {
+ std::string tmp;
+ line_s >> tmp >> wifi_delay_;
+ std::cout << "params: wifi_delay_ = " << wifi_delay_ << std::endl;
+ continue;
+ }
+
+ if (command == "lte_delay_") {
+ std::string tmp;
+ line_s >> tmp >> lte_delay_;
+ std::cout << "params: lte_delay_ = " << lte_delay_ << std::endl;
+ continue;
+ }
+ if (command == "alpha") {
+ std::string tmp;
+ double rate_alpha = 0.0;
+ line_s >> tmp >> rate_alpha;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, rate_alpha);
+ std::cout << "params: alpha = " << rate_alpha << std::endl;
+ continue;
+ }
+
+ if (command == "batching_parameter") {
+ std::string tmp;
+ int batching_param = 0;
+ line_s >> tmp >> batching_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ std::cout << "params: batching = " << batching_param << std::endl;
+ continue;
+ }
+
+ if (command == "rate_estimator") {
+ std::string tmp;
+ int choice_param = 0;
+ line_s >> tmp >> choice_param;
+ socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, choice_param);
+ std::cout << "params: choice = " << choice_param << std::endl;
+ continue;
+ }
+ }
+ is.close();
+ std::cout << "init done" << std::endl;
+}
+
+void RaaqmTransportProtocol::reset() {
+ is_final_block_number_discovered_ = false;
+ final_block_number_ = std::numeric_limits<uint64_t>::max();
+ segment_number_ = 0;
+ interests_in_flight_ = 0;
+ last_reassembled_segment_ = 0;
+ content_buffer_size_ = 0;
+ content_buffer_.clear();
+ interest_retransmissions_.clear();
+ receive_buffer_.clear();
+ unverified_segments_.clear();
+ verified_manifests_.clear();
+}
+
+void RaaqmTransportProtocol::start() {
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onStart();
+ }
+
+ if (!cur_path_) {
+
+ double drop_factor;
+ double minimum_drop_probability;
+ int sample_number;
+ int interest_lifetime;
+ double beta;
+
+ socket_->getSocketOption(DROP_FACTOR, drop_factor);
+ socket_->getSocketOption(MINIMUM_DROP_PROBABILITY, minimum_drop_probability);
+ socket_->getSocketOption(SAMPLE_NUMBER, sample_number);
+ socket_->getSocketOption(INTEREST_LIFETIME, interest_lifetime);
+ socket_->getSocketOption(BETA_VALUE, beta);
+
+ std::cout << "Drop Factor: " << drop_factor << std::endl;
+ std::cout << "Minimum drop prob: " << minimum_drop_probability << std::endl;
+ std::cout << "Sample number: " << sample_number << std::endl;
+ std::cout << "lifetime: " << interest_lifetime << std::endl;
+ std::cout << "beta: " << beta << std::endl;
+
+ double alpha = 0.0;
+ int batching_param = 0;
+ int choice_param = 0;
+ socket_->getSocketOption(RATE_ESTIMATION_ALPHA, alpha);
+ socket_->getSocketOption(RATE_ESTIMATION_BATCH_PARAMETER, batching_param);
+ socket_->getSocketOption(RATE_ESTIMATION_CHOICE, choice_param);
+ if (choice_param == 1) {
+ this->rate_estimator_ = new ALaTcpEstimator();
+ } else {
+ this->rate_estimator_ = new SimpleEstimator(alpha, batching_param);
+ }
+
+ socket_->getSocketOption(RATE_ESTIMATION_OBSERVER, &(this->rate_estimator_->observer_));
+
+ cur_path_ =
+ std::make_shared<RaaqmDataPath>(drop_factor, minimum_drop_probability, interest_lifetime * 1000, sample_number);
+ path_table_[default_values::path_id] = cur_path_;
+ }
+
+ VegasTransportProtocol::start();
+}
+
+void RaaqmTransportProtocol::copyContent(ContentObject &content_object) {
+ if ((content_object.getName().get(-1).toSegment() == final_block_number_) || !(is_running_)) {
+ this->rate_estimator_->onDownloadFinished();
+ }
+ VegasTransportProtocol::copyContent(content_object);
+}
+
+void RaaqmTransportProtocol::updatePathTable(const ContentObject &content_object) {
+ unsigned char path_id = content_object.getPathLabel();
+
+ if (path_table_.find(path_id) == path_table_.end()) {
+ if (cur_path_) {
+ // Create a new path with some default param
+ if (path_table_.empty()) {
+ std::cerr << "No path initialized for path table, error could be in default path initialization." << std::endl;
+ exit(EXIT_FAILURE);
+ } else {
+ // Initiate the new path default param
+ std::shared_ptr<RaaqmDataPath>
+ new_path = std::make_shared<RaaqmDataPath>(*(path_table_.at(default_values::path_id)));
+ // Insert the new path into hash table
+ path_table_[path_id] = new_path;
+ }
+ } else {
+ std::cerr << "UNEXPECTED ERROR: when running,current path not found." << std::endl;
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ cur_path_ = path_table_[path_id];
+
+ size_t packet_size = content_object.getPacketSize();
+ size_t data_size = content_object.getContent().size();
+
+ // Update measurements for path
+ cur_path_->updateReceivedStats(packet_size, data_size);
+}
+
+void RaaqmTransportProtocol::updateRtt(uint64_t segment) {
+ if (!cur_path_) {
+ throw std::runtime_error("ERROR: no current path found, exit");
+ } else {
+ std::chrono::microseconds rtt;
+
+ std::chrono::steady_clock::duration duration =
+ std::chrono::steady_clock::now() - interest_timepoints_[segment % default_values::default_buffer_size];
+ rtt = std::chrono::duration_cast<std::chrono::microseconds>(duration);
+ if (this->rate_estimator_) {
+ this->rate_estimator_->onRttUpdate(rtt.count());
+ }
+ cur_path_->insertNewRtt(rtt.count());
+ cur_path_->smoothTimer();
+
+ avg_rtt_ = (avg_rtt_ * 0.99) + ((double) rtt.count() * 0.01);
+ if (cur_path_->newPropagationDelayAvailable()) {
+ check_drop_probability();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) {
+ return;
+}
+
+void RaaqmTransportProtocol::check_drop_probability() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ unsigned int max_pd = 0;
+ std::unordered_map<unsigned char, std::shared_ptr<RaaqmDataPath >>::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->getPropagationDelay() > max_pd && it->second->getPropagationDelay() != UINT_MAX
+ && !it->second->isStale()) {
+ max_pd = it->second->getPropagationDelay();
+ }
+ }
+
+ double drop_prob = 0;
+ double beta = 0;
+ if (max_pd < wifi_delay_) { //only ethernet paths
+ drop_prob = default_drop_;
+ beta = default_beta_;
+ } else if (max_pd < lte_delay_) { //at least one wifi path
+ drop_prob = drop_wifi_;
+ beta = beta_wifi_;
+ } else { //at least one lte path
+ drop_prob = drop_lte_;
+ beta = beta_lte_;
+ }
+
+ double old_drop_prob = 0;
+ double old_beta = 0;
+ socket_->getSocketOption(BETA_VALUE, old_beta);
+ socket_->getSocketOption(DROP_FACTOR, old_drop_prob);
+
+ if (drop_prob == old_drop_prob && beta == old_beta) {
+ return;
+ }
+
+ std::cout << "*************[RAAQM TUNING] new beta = " << beta << " new drop = " << drop_prob << " max pd = "
+ << max_pd << std::endl;
+
+ socket_->setSocketOption(BETA_VALUE, beta);
+ socket_->setSocketOption(DROP_FACTOR, drop_prob);
+
+ for (it = path_table_.begin(); it != path_table_.end(); it++) {
+ it->second->setDropProb(drop_prob);
+ }
+}
+
+void RaaqmTransportProtocol::check_for_stale_paths() {
+ if (!raaqm_autotune_) {
+ return;
+ }
+
+ bool stale = false;
+ std::unordered_map<unsigned char, std::shared_ptr<RaaqmDataPath >>::iterator it;
+ for (it = path_table_.begin(); it != path_table_.end(); ++it) {
+ if (it->second->isStale()) {
+ stale = true;
+ break;
+ }
+ }
+ if (stale) {
+ check_drop_probability();
+ }
+}
+
+void RaaqmTransportProtocol::onTimeout(const Interest &interest) {
+ check_for_stale_paths();
+ VegasTransportProtocol::onTimeout(interest);
+}
+
+void RaaqmTransportProtocol::increaseWindow() {
+ double max_window_size = -1;
+ socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size);
+ if (current_window_size_ < max_window_size) // don't expand window above max level
+ {
+ double gamma = -1;
+ socket_->getSocketOption(GAMMA_VALUE, gamma);
+
+ current_window_size_ += gamma / current_window_size_;
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+ this->rate_estimator_->onWindowIncrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::decreaseWindow() {
+ double min_window_size = -1;
+ socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size);
+ if (current_window_size_ > min_window_size) // don't shrink window below minimum level
+ {
+ double beta = -1;
+ socket_->getSocketOption(BETA_VALUE, beta);
+
+ current_window_size_ = current_window_size_ * beta;
+ if (current_window_size_ < min_window_size) {
+ current_window_size_ = min_window_size;
+ }
+
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+ this->rate_estimator_->onWindowDecrease(current_window_size_);
+}
+
+void RaaqmTransportProtocol::RAAQM() {
+ if (!cur_path_) {
+ std::cerr << "ERROR: no current path found, exit" << std::endl;
+ exit(EXIT_FAILURE);
+ } else {
+ // Change drop probability according to RTT statistics
+ cur_path_->updateDropProb();
+
+ if (rand() % 10000 <= cur_path_->getDropProb() * 10000) {
+ decreaseWindow();
+ }
+ }
+}
+
+void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ // Decrease the window because the timeout happened
+ decreaseWindow();
+}
+
+void RaaqmTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) {
+ updatePathTable(content_object);
+ increaseWindow();
+ updateRtt(interest.getName().get(-1).toSegment());
+ this->rate_estimator_->onDataReceived((int) content_object.getPacketSize());
+ // Set drop probablility and window size accordingly
+ RAAQM();
+}
+
+void RaaqmTransportProtocol::checkForFastRetransmission(const Interest &interest) {
+}
+
+#if 0
+void
+RaaqmTransportProtocol::onInterest(const Interest &interest)
+{
+ bool mobility = interest.get_MobilityLossFlag();
+
+ if(mobility){
+ const Name &name = interest.getName();
+ uint64_t segment = name[-1].toSegment();
+ timeval now;
+ gettimeofday(&now, 0);
+ std::cout << (long) now.tv_sec << "." << (unsigned) now.tv_usec << " RAAQM: M-Interest " <<
+ segment << " " << interest.getName() << std::endl;
+ NackSet::iterator it = m_nackSet.find(segment);
+ if(it == m_nackSet.end()){
+ m_nackSet.insert(segment);
+ }
+ }
+}
+#endif
+
+} // end namespace icnet
diff --git a/icnet/transport/icnet_transport_raaqm.h b/icnet/transport/icnet_transport_raaqm.h
new file mode 100644
index 00000000..dc5e72bd
--- /dev/null
+++ b/icnet/transport/icnet_transport_raaqm.h
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_RAAQM_TRANSPORT_PROTOCOL_H_
+#define ICNET_RAAQM_TRANSPORT_PROTOCOL_H_
+
+#include "icnet_transport_vegas.h"
+#include "icnet_transport_vegas_rto_estimator.h"
+#include "icnet_transport_raaqm_data_path.h"
+#include "icnet_rate_estimation.h"
+
+namespace icnet {
+
+class RaaqmTransportProtocol : public VegasTransportProtocol {
+ public:
+ RaaqmTransportProtocol(Socket *icnet_socket);
+
+ ~RaaqmTransportProtocol();
+
+ void start();
+
+ protected:
+ void copyContent(ContentObject &content_object);
+
+ private:
+
+ void init();
+
+ void reset();
+
+ void afterContentReception(const Interest &interest, const ContentObject &content_object);
+
+ void afterDataUnsatisfied(uint64_t segment);
+
+ void increaseWindow();
+
+ void updateRtt(uint64_t segment);
+
+ void decreaseWindow();
+
+ void changeInterestLifetime(uint64_t segment);
+
+ void onTimeout(const Interest &interest);
+
+ void RAAQM();
+
+ void updatePathTable(const ContentObject &content_object);
+
+ void checkForFastRetransmission(const Interest &interest);
+
+ void check_drop_probability();
+
+ void check_for_stale_paths();
+
+ void printRtt();
+
+ /**
+ * Current download path
+ */
+ std::shared_ptr<RaaqmDataPath> cur_path_;
+
+ /**
+ * Hash table for path: each entry is a pair path ID(key) - path object
+ */
+ std::unordered_map<unsigned char, std::shared_ptr<RaaqmDataPath>> path_table_;
+
+ bool set_interest_filter_;
+ //for rate-estimation at packet level
+ IcnRateEstimator *rate_estimator_;
+
+ //params for autotuning
+ bool raaqm_autotune_;
+ double default_beta_;
+ double default_drop_;
+ double beta_wifi_;
+ double drop_wifi_;
+ double beta_lte_;
+ double drop_lte_;
+ unsigned int wifi_delay_;
+ unsigned int lte_delay_;
+
+ //RTT stats
+ double avg_rtt_;
+};
+
+} // end namespace icnet
+
+#endif // ICNET_RAAQM_TRANSPORT_PROTOCOL_H_
diff --git a/icnet/transport/icnet_transport_raaqm_data_path.cc b/icnet/transport/icnet_transport_raaqm_data_path.cc
new file mode 100644
index 00000000..dfef6762
--- /dev/null
+++ b/icnet/transport/icnet_transport_raaqm_data_path.cc
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport_raaqm_data_path.h"
+
+namespace icnet {
+
+RaaqmDataPath::RaaqmDataPath(double drop_factor,
+ double minimum_drop_probability,
+ unsigned new_timer,
+ unsigned int samples,
+ uint64_t new_rtt,
+ uint64_t new_rtt_min,
+ uint64_t new_rtt_max,
+ unsigned new_pd)
+
+ : drop_factor_(drop_factor),
+ minimum_drop_probability_(minimum_drop_probability),
+ timer_(new_timer),
+ samples_(samples),
+ rtt_(new_rtt),
+ rtt_min_(new_rtt_min),
+ rtt_max_(new_rtt_max),
+ prop_delay_(new_pd),
+ new_prop_delay_(false),
+ drop_prob_(0),
+ packets_received_(0),
+ last_packets_received_(0),
+ m_packets_bytes_received_(0),
+ last_packets_bytes_received_(0),
+ raw_data_bytes_received_(0),
+ last_raw_data_bytes_received_(0),
+ average_rtt_(0),
+ alpha_(ALPHA) {
+ gettimeofday(&previous_call_of_path_reporter_, 0);
+ gettimeofday(&m_last_received_pkt_, 0);
+}
+
+RaaqmDataPath &RaaqmDataPath::pathReporter() {
+ struct timeval now;
+ gettimeofday(&now, 0);
+
+ double rate, delta_t;
+
+ delta_t = getMicroSeconds(now) - getMicroSeconds(previous_call_of_path_reporter_);
+ rate = (m_packets_bytes_received_ - last_packets_bytes_received_) * 8 / delta_t; // MB/s
+ std::cout << "RaaqmDataPath status report: " << "at time " << (long) now.tv_sec << "." << (unsigned) now.tv_usec
+ << " sec:\n" << (void *) this << " path\n" << "Packets Received: "
+ << (packets_received_ - last_packets_received_) << "\n" << "delta_t " << delta_t << " [us]\n" << "rate "
+ << rate << " [Mbps]\n" << "Last RTT " << rtt_ << " [us]\n" << "Max RTT " << rtt_max_ << " [us]\n"
+ << "Min RTT " << rtt_min_ << " [us]\n" << "Prop delay " << prop_delay_ << " [us]\n";
+ last_packets_received_ = packets_received_;
+ last_packets_bytes_received_ = m_packets_bytes_received_;
+ gettimeofday(&previous_call_of_path_reporter_, 0);
+
+ return *this;
+}
+
+RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) {
+ rtt_ = new_rtt;
+ rtt_samples_.get<byArrival>().push_back(new_rtt);
+
+ if (rtt_samples_.get<byArrival>().size() > samples_) {
+ rtt_samples_.get<byArrival>().pop_front();
+ }
+
+ rtt_max_ = *(rtt_samples_.get<byOrder>().rbegin());
+ rtt_min_ = *(rtt_samples_.get<byOrder>().begin());
+
+ if (rtt_min_ < prop_delay_) {
+ new_prop_delay_ = true;
+ prop_delay_ = rtt_min_;
+ }
+
+ gettimeofday(&m_last_received_pkt_, 0);
+
+ return *this;
+}
+
+RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, std::size_t data_size) {
+ packets_received_++;
+ m_packets_bytes_received_ += packet_size;
+ raw_data_bytes_received_ += data_size;
+
+ return *this;
+}
+
+double RaaqmDataPath::getDropFactor() {
+ return drop_factor_;
+}
+
+double RaaqmDataPath::getDropProb() {
+ return drop_prob_;
+}
+
+RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) {
+ drop_prob_ = dropProb;
+
+ return *this;
+}
+
+double RaaqmDataPath::getMinimumDropProbability() {
+ return minimum_drop_probability_;
+}
+
+double RaaqmDataPath::getTimer() {
+ return timer_;
+}
+
+RaaqmDataPath &RaaqmDataPath::smoothTimer() {
+ timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ + (TIMEOUT_SMOOTHER) * rtt_ * (TIMEOUT_RATIO);
+
+ return *this;
+}
+
+double RaaqmDataPath::getRtt() {
+ return rtt_;
+}
+
+double RaaqmDataPath::getAverageRtt() {
+ return average_rtt_;
+}
+
+double RaaqmDataPath::getRttMax() {
+ return rtt_max_;
+}
+
+double RaaqmDataPath::getRttMin() {
+ return rtt_min_;
+}
+
+unsigned RaaqmDataPath::getSampleValue() {
+ return samples_;
+}
+
+unsigned RaaqmDataPath::getRttQueueSize() {
+ return rtt_samples_.get<byArrival>().size();
+}
+
+RaaqmDataPath &RaaqmDataPath::updateDropProb() {
+
+ drop_prob_ = 0.0;
+
+ if (getSampleValue() == getRttQueueSize()) {
+ if (rtt_max_ == rtt_min_) {
+ drop_prob_ = minimum_drop_probability_;
+ } else {
+ drop_prob_ = minimum_drop_probability_ + drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_);
+ }
+ }
+
+ return *this;
+}
+
+double RaaqmDataPath::getMicroSeconds(struct timeval &time) {
+ return (double) (time.tv_sec) * 1000000 + (double) (time.tv_usec);
+}
+
+void RaaqmDataPath::setAlpha(double alpha) {
+ if (alpha >= 0 && alpha <= 1) {
+ alpha_ = alpha;
+ }
+}
+
+bool RaaqmDataPath::newPropagationDelayAvailable() {
+ bool r = new_prop_delay_;
+ new_prop_delay_ = false;
+ return r;
+}
+
+unsigned int RaaqmDataPath::getPropagationDelay() {
+ return prop_delay_;
+}
+
+bool RaaqmDataPath::isStale() {
+ struct timeval now;
+ gettimeofday(&now, 0);
+ double time = getMicroSeconds(now) - getMicroSeconds(m_last_received_pkt_);
+ if (time > 2000000) {
+ return true;
+ }
+ return false;
+}
+
+} // end namespace icnet
diff --git a/icnet/transport/icnet_transport_raaqm_data_path.h b/icnet/transport/icnet_transport_raaqm_data_path.h
new file mode 100644
index 00000000..0093f84b
--- /dev/null
+++ b/icnet/transport/icnet_transport_raaqm_data_path.h
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_RAAQM_DATA_PATH_H_
+#define ICNET_RAAQM_DATA_PATH_H_
+
+#include <sys/time.h>
+
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/sequenced_index.hpp>
+#include <boost/foreach.hpp>
+#include <iostream>
+#include <climits>
+
+#define TIMEOUT_SMOOTHER 0.1
+#define TIMEOUT_RATIO 10
+#define ALPHA 0.8
+
+namespace icnet {
+
+class RaaqmDataPath {
+ public:
+
+ RaaqmDataPath(double drop_factor,
+ double minimum_drop_probability,
+ unsigned new_timer,
+ unsigned int samples,
+ uint64_t new_rtt = 1000,
+ uint64_t new_rtt_min = 1000,
+ uint64_t new_rtt_max = 1000,
+ unsigned new_pd = UINT_MAX);
+
+ public:
+
+ /*
+ * @brief Print Path Status
+ */
+ RaaqmDataPath &pathReporter();
+
+ /*
+ * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is full, and thus need overwrite.
+ * Also it maintains the validity of min and max of RTT.
+ * @param new_rtt is the value of the new RTT
+ */
+ RaaqmDataPath &insertNewRtt(uint64_t new_rtt);
+
+ /**
+ * @brief Update the path statistics
+ * @param packet_size the size of the packet received, including the ICN header
+ * @param data_size the size of the data received, without the ICN header
+ */
+ RaaqmDataPath &updateReceivedStats(std::size_t packet_size, std::size_t data_size);
+
+ /**
+ * @brief Get the value of the drop factor parameter
+ */
+ double getDropFactor();
+
+ /**
+ * @brief Get the value of the drop probability
+ */
+ double getDropProb();
+
+ /**
+ * @brief Set the value pf the drop probability
+ * @param drop_prob is the value of the drop probability
+ */
+ RaaqmDataPath &setDropProb(double drop_prob);
+
+ /**
+ * @brief Get the minimum drop probability
+ */
+ double getMinimumDropProbability();
+
+ /**
+ * @brief Get last RTT
+ */
+ double getRtt();
+
+ /**
+ * @brief Get average RTT
+ */
+ double getAverageRtt();
+
+ /**
+ * @brief Get the current m_timer value
+ */
+ double getTimer();
+
+ /**
+ * @brief Smooth he value of the m_timer accordingly with the last RTT measured
+ */
+ RaaqmDataPath &smoothTimer();
+
+ /**
+ * @brief Get the maximum RTT among the last samples
+ */
+ double getRttMax();
+
+ /**
+ * @brief Get the minimum RTT among the last samples
+ */
+ double getRttMin();
+
+ /**
+ * @brief Get the number of saved samples
+ */
+ unsigned getSampleValue();
+
+ /**
+ * @brief Get the size og the RTT queue
+ */
+ unsigned getRttQueueSize();
+
+ /*
+ * @brief Change drop probability according to RTT statistics
+ * Invoked in RAAQM(), before control window size update.
+ */
+ RaaqmDataPath &updateDropProb();
+
+ /**
+ * @brief This function convert the time from struct timeval to its value in microseconds
+ */
+ static double getMicroSeconds(struct timeval &time);
+
+ void setAlpha(double alpha);
+
+ /**
+ * @brief Returns the smallest RTT registered so far for this path
+ */
+
+ unsigned int getPropagationDelay();
+
+ bool newPropagationDelayAvailable();
+
+ bool isStale();
+
+ private:
+
+ /**
+ * The value of the drop factor
+ */
+ double drop_factor_;
+
+ /**
+ * The minumum drop probability
+ */
+ double minimum_drop_probability_;
+
+ /**
+ * The timer, expressed in milliseconds
+ */
+ double timer_;
+
+ /**
+ * The number of samples to store for computing the protocol measurements
+ */
+ const unsigned int samples_;
+
+ /**
+ * The last, the minimum and the maximum value of the RTT (among the last m_samples samples)
+ */
+ uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_;
+
+ bool new_prop_delay_;
+
+ /**
+ * The current drop probability
+ */
+ double drop_prob_;
+
+ /**
+ * The number of packets received in this path
+ */
+ intmax_t packets_received_;
+
+ /**
+ * The first packet received after the statistics print
+ */
+ intmax_t last_packets_received_;
+
+ /**
+ * Total number of bytes received including the ICN header
+ */
+ intmax_t m_packets_bytes_received_;
+
+ /**
+ * The amount of packet bytes received at the last path summary computation
+ */
+ intmax_t last_packets_bytes_received_;
+
+ /**
+ * Total number of bytes received without including the ICN header
+ */
+ intmax_t raw_data_bytes_received_;
+
+ /**
+ * The amount of raw dat bytes received at the last path summary computation
+ */
+ intmax_t last_raw_data_bytes_received_;
+
+ class byArrival;
+
+ class byOrder;
+
+ /**
+ * Double ended queue for the RTTs
+ */
+ typedef boost::multi_index_container<unsigned, boost::multi_index::indexed_by<
+ // by arrival (FIFO)
+ boost::multi_index::sequenced<boost::multi_index::tag<byArrival> >,
+ // index by ascending order
+ boost::multi_index::ordered_non_unique<boost::multi_index::tag<byOrder>,
+ boost::multi_index::identity<unsigned> > > > RTTQueue;
+ RTTQueue rtt_samples_;
+
+ /**
+ * Time of the last call to the path reporter method
+ */
+ struct timeval previous_call_of_path_reporter_;
+ struct timeval m_last_received_pkt_;
+
+ double average_rtt_;
+ double alpha_;
+};
+
+} // end namespace icnet
+
+#endif // ICNET_RAAQM_DATA_PATH_H_
diff --git a/icnet/transport/icnet_transport_vegas.cc b/icnet/transport/icnet_transport_vegas.cc
new file mode 100644
index 00000000..fae29bf8
--- /dev/null
+++ b/icnet/transport/icnet_transport_vegas.cc
@@ -0,0 +1,488 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport_vegas.h"
+#include "icnet_socket_consumer.h"
+
+namespace icnet {
+
+VegasTransportProtocol::VegasTransportProtocol(Socket *icnet_socket)
+ : TransportProtocol(icnet_socket),
+ is_final_block_number_discovered_(false),
+ final_block_number_(std::numeric_limits<uint64_t>::max()),
+ last_reassembled_segment_(0),
+ content_buffer_size_(0),
+ current_window_size_(default_values::min_window_size),
+ interests_in_flight_(0),
+ segment_number_(0),
+ interest_retransmissions_(default_values::max_window_size),
+ interest_timepoints_(default_values::default_buffer_size),
+ receive_buffer_(default_values::default_buffer_size),
+ unverified_segments_(default_values::default_buffer_size),
+ verified_manifests_(default_values::default_buffer_size) {
+ icnet_socket->getSocketOption(PORTAL, portal_);
+}
+
+VegasTransportProtocol::~VegasTransportProtocol() {
+ stop();
+}
+
+void VegasTransportProtocol::start() {
+ is_running_ = true;
+ is_final_block_number_discovered_ = false;
+ final_block_number_ = std::numeric_limits<uint64_t>::max();
+ segment_number_ = 0;
+ interests_in_flight_ = 0;
+ last_reassembled_segment_ = 0;
+ content_buffer_size_ = 0;
+ content_buffer_.clear();
+ interest_retransmissions_.clear();
+ receive_buffer_.clear();
+ unverified_segments_.clear();
+ verified_manifests_.clear();
+
+ sendInterest();
+
+ bool isAsync = false;
+ socket_->getSocketOption(ASYNC_MODE, isAsync);
+
+ bool isContextRunning = false;
+ socket_->getSocketOption(RUNNING, isContextRunning);
+
+ if (!isAsync && !isContextRunning) {
+ socket_->setSocketOption(RUNNING, true);
+ portal_->runEventsLoop();
+
+ // If portal returns, the download (maybe) is finished, so we can remove the pending interests
+
+ removeAllPendingInterests();
+ }
+}
+
+// TODO Reuse this function for sending an arbitrary interest
+void VegasTransportProtocol::sendInterest() {
+ Name prefix;
+ socket_->getSocketOption(GeneralTransportOptions::NAME_PREFIX, prefix);
+
+ Name suffix;
+ socket_->getSocketOption(GeneralTransportOptions::NAME_SUFFIX, suffix);
+
+ if (!suffix.empty()) {
+ prefix.append(suffix);
+ }
+
+ prefix.appendSegment(segment_number_);
+
+ std::shared_ptr<Interest> interest = std::make_shared<Interest>(std::move(prefix));
+
+ int interestLifetime = default_values::interest_lifetime;
+ socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interestLifetime);
+ interest->setInterestLifetime(uint32_t(interestLifetime));
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output);
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), *interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interests_in_flight_++;
+ interest_retransmissions_[segment_number_ % default_values::default_buffer_size] = 0;
+ interest_timepoints_[segment_number_ % default_values::default_buffer_size] = std::chrono::steady_clock::now();
+
+ portal_->sendInterest(*interest,
+ bind(&VegasTransportProtocol::onContentSegment, this, _1, _2),
+ bind(&VegasTransportProtocol::onTimeout, this, _1));
+ segment_number_++;
+}
+
+void VegasTransportProtocol::stop() {
+ is_running_ = false;
+ portal_->stopEventsLoop();
+}
+
+void VegasTransportProtocol::onContentSegment(const Interest &interest, ContentObject &content_object) {
+ uint64_t segment = interest.getName().get(-1).toSegment();
+
+ if (is_running_ == false /*|| input_buffer_[segment]*/) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ changeInterestLifetime(segment);
+ ConsumerContentObjectCallback on_data_input = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, on_data_input);
+ if (on_data_input != VOID_HANDLER) {
+ on_data_input(*dynamic_cast<ConsumerSocket *>(socket_), content_object);
+ }
+
+ ConsumerInterestCallback on_interest_satisfied = VOID_HANDLER;
+ socket_->getSocketOption(INTEREST_SATISFIED, on_interest_satisfied);
+ if (on_interest_satisfied != VOID_HANDLER) {
+ on_interest_satisfied(*dynamic_cast<ConsumerSocket *>(socket_), const_cast<Interest &>(interest));
+ }
+
+ if (content_object.getContentType() == PayloadType::MANIFEST) {
+ onManifest(interest, content_object);
+ } else if (content_object.getContentType() == PayloadType::DATA) {
+ onContentObject(interest, content_object);
+ } // TODO InterestReturn
+
+ scheduleNextInterests();
+}
+
+void VegasTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) {
+ increaseWindow();
+}
+
+void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) {
+ decreaseWindow();
+}
+
+void VegasTransportProtocol::scheduleNextInterests() {
+ if (segment_number_ == 0) {
+ current_window_size_ = final_block_number_;
+
+ double maxWindowSize = -1;
+ socket_->getSocketOption(MAX_WINDOW_SIZE, maxWindowSize);
+
+ if (current_window_size_ > maxWindowSize) {
+ current_window_size_ = maxWindowSize;
+ }
+
+ while (interests_in_flight_ < current_window_size_) {
+ if (is_final_block_number_discovered_) {
+ if (segment_number_ <= final_block_number_) {
+ sendInterest();
+ } else {
+ break;
+ }
+ } else {
+ sendInterest();
+ }
+ }
+ } else {
+ if (is_running_) {
+ while (interests_in_flight_ < current_window_size_) {
+ if (is_final_block_number_discovered_) {
+ if (segment_number_ <= final_block_number_) {
+ sendInterest();
+ } else {
+ break;
+ }
+ } else {
+ sendInterest();
+ }
+ }
+ }
+ }
+}
+
+void VegasTransportProtocol::decreaseWindow() {
+ double min_window_size = -1;
+ socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size);
+ if (current_window_size_ > min_window_size) {
+ current_window_size_ = std::ceil(current_window_size_ / 2);
+
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+}
+
+void VegasTransportProtocol::increaseWindow() {
+ double max_window_size = -1;
+ socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size);
+ if (current_window_size_ < max_window_size) // don't expand window above max level
+ {
+ current_window_size_++;
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_);
+ }
+};
+
+void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) {
+ std::chrono::steady_clock::duration duration = std::chrono::steady_clock::now() - interest_timepoints_[segment];
+ rtt_estimator_.addMeasurement(std::chrono::duration_cast<std::chrono::microseconds>(duration));
+
+ RtoEstimator::Duration rto = rtt_estimator_.computeRto();
+ std::chrono::milliseconds lifetime = std::chrono::duration_cast<std::chrono::milliseconds>(rto);
+
+ socket_->setSocketOption(INTEREST_LIFETIME, (int) lifetime.count());
+}
+
+void VegasTransportProtocol::onManifest(const Interest &interest, ContentObject &content_object) {
+ if (!is_running_) {
+ return;
+ }
+
+ if (verifyManifest(content_object)) {
+ // TODO Retrieve piece of data using manifest
+ }
+}
+
+bool VegasTransportProtocol::verifyManifest(ContentObject &content_object) {
+ ConsumerContentObjectVerificationCallback on_manifest_to_verify = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, on_manifest_to_verify);
+
+ bool is_data_secure = false;
+
+ if (on_manifest_to_verify == VOID_HANDLER) {
+ // TODO Perform manifest verification
+ } else if (on_manifest_to_verify(*dynamic_cast<ConsumerSocket *>(socket_), content_object)) {
+ is_data_secure = true;
+ }
+
+ return is_data_secure;
+}
+
+bool VegasTransportProtocol::requireInterestWithHash(const Interest &interest,
+ const ContentObject &content_object,
+ Manifest &manifest) {
+ // TODO Require content object with specific hash.
+ return true;
+}
+
+// TODO Add the name in the digest computation!
+void VegasTransportProtocol::onContentObject(const Interest &interest, ContentObject &content_object) {
+ if (verifyContentObject(interest, content_object)) {
+ checkForFastRetransmission(interest);
+
+ uint64_t segment = interest.getName().get(-1).toSegment();
+
+ if (interest_retransmissions_[segment % default_values::default_buffer_size] == 0) {
+ afterContentReception(interest, content_object);
+ }
+
+ if (content_object.hasFinalChunkNumber()) {
+ is_final_block_number_discovered_ = true;
+ final_block_number_ = content_object.getFinalChunkNumber();
+ }
+
+ bool virtualDownload = false;
+
+ socket_->getSocketOption(VIRTUAL_DOWNLOAD, virtualDownload);
+
+ if (!virtualDownload) {
+ receive_buffer_[segment % default_values::default_buffer_size] = content_object.shared_from_this();
+ reassemble();
+ } else {
+ if (segment == final_block_number_) {
+ portal_->stopEventsLoop();
+ }
+ }
+ }
+}
+
+bool VegasTransportProtocol::verifyContentObject(const Interest &interest, ContentObject &content_object) {
+ // TODO Check content object using manifest
+ return true;
+}
+
+// TODO move inside manifest
+bool VegasTransportProtocol::pointsToManifest(ContentObject &content_object) {
+ // TODO Check content objects using manifest
+ return true;
+}
+
+void VegasTransportProtocol::onTimeout(const Interest &interest) {
+ if (!is_running_) {
+ return;
+ }
+
+ interests_in_flight_--;
+
+ std::cerr << "Timeout on " << interest.getName() << std::endl;
+
+ ConsumerInterestCallback on_interest_timeout = VOID_HANDLER;
+ socket_->getSocketOption(INTEREST_EXPIRED, on_interest_timeout);
+ if (on_interest_timeout != VOID_HANDLER) {
+ on_interest_timeout(*dynamic_cast<ConsumerSocket *>(socket_), const_cast<Interest &>(interest));
+ }
+
+ uint64_t segment = interest.getName().get(-1).toSegment();
+
+ // Do not retransmit interests asking contents that do not exist.
+ if (is_final_block_number_discovered_) {
+ if (segment > final_block_number_) {
+ return;
+ }
+ }
+
+ afterDataUnsatisfied(segment);
+
+ int max_retransmissions;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, max_retransmissions);
+
+ if (interest_retransmissions_[segment % default_values::default_buffer_size] < max_retransmissions) {
+
+ ConsumerInterestCallback on_interest_retransmission = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, on_interest_retransmission);
+
+ if (on_interest_retransmission != VOID_HANDLER) {
+ on_interest_retransmission(*dynamic_cast<ConsumerSocket *>(socket_), interest);
+ }
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output);
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ //retransmit
+ interests_in_flight_++;
+ interest_retransmissions_[segment % default_values::default_buffer_size]++;
+
+ portal_->sendInterest(interest,
+ bind(&VegasTransportProtocol::onContentSegment, this, _1, _2),
+ bind(&VegasTransportProtocol::onTimeout, this, _1));
+ } else {
+ is_running_ = false;
+
+ bool virtual_download = false;
+ socket_->getSocketOption(VIRTUAL_DOWNLOAD, virtual_download);
+
+ if (!virtual_download) {
+ reassemble();
+ }
+
+ portal_->stopEventsLoop();
+ }
+
+}
+
+void VegasTransportProtocol::copyContent(ContentObject &content_object) {
+ Array a = content_object.getContent();
+
+ content_buffer_.insert(content_buffer_.end(), (uint8_t *) a.data(), (uint8_t *) a.data() + a.size());
+
+ if ((content_object.getName().get(-1).toSegment() == final_block_number_) || (!is_running_)) {
+
+ // return content to the user
+ ConsumerContentCallback on_payload = VOID_HANDLER;
+ socket_->getSocketOption(CONTENT_RETRIEVED, on_payload);
+ if (on_payload != VOID_HANDLER) {
+ on_payload(*dynamic_cast<ConsumerSocket *>(socket_),
+ (uint8_t *) (content_buffer_.data()),
+ content_buffer_.size());
+ }
+
+ //reduce window size to prevent its speculative growth in case when consume() is called in loop
+ int current_window_size = -1;
+ socket_->getSocketOption(CURRENT_WINDOW_SIZE, current_window_size);
+ if ((uint64_t) current_window_size > final_block_number_) {
+ socket_->setSocketOption(CURRENT_WINDOW_SIZE, (int) (final_block_number_));
+ }
+
+ is_running_ = false;
+ portal_->stopEventsLoop();
+ }
+}
+
+void VegasTransportProtocol::reassemble() {
+ uint64_t index = last_reassembled_segment_ % default_values::default_buffer_size;
+
+ while (receive_buffer_[index % default_values::default_buffer_size]) {
+ if (receive_buffer_[index % default_values::default_buffer_size]->getContentType() == PayloadType::DATA) {
+ copyContent(*receive_buffer_[index % default_values::default_buffer_size]);
+ }
+
+ receive_buffer_[index % default_values::default_buffer_size].reset();
+
+ last_reassembled_segment_++;
+ index = last_reassembled_segment_ % default_values::default_buffer_size;
+ }
+}
+
+bool VegasTransportProtocol::verifySegmentUsingManifest(Manifest &manifestSegment, ContentObject &content_object) {
+ // TODO Content object verification exploiting manifest
+ return true;
+}
+
+void VegasTransportProtocol::checkForFastRetransmission(const Interest &interest) {
+ uint64_t segNumber = interest.getName().get(-1).toSegment();
+ received_segments_[segNumber] = true;
+ fast_retransmitted_segments.erase(segNumber);
+
+ uint64_t possibly_lost_segment = 0;
+ uint64_t highest_received_segment = received_segments_.rbegin()->first;
+
+ for (uint64_t i = 0; i <= highest_received_segment; i++) {
+ if (received_segments_.find(i) == received_segments_.end()) {
+ if (fast_retransmitted_segments.find(i) == fast_retransmitted_segments.end()) {
+ possibly_lost_segment = i;
+ uint8_t out_of_order_segments = 0;
+ for (uint64_t j = i; j <= highest_received_segment; j++) {
+ if (received_segments_.find(j) != received_segments_.end()) {
+ out_of_order_segments++;
+ if (out_of_order_segments >= default_values::max_out_of_order_segments) {
+ fast_retransmitted_segments[possibly_lost_segment] = true;
+ fastRetransmit(interest, possibly_lost_segment);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+void VegasTransportProtocol::fastRetransmit(const Interest &interest, uint64_t chunk_number) {
+ int max_retransmissions;
+ socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_retransmissions);
+
+ if (interest_retransmissions_[chunk_number % default_values::default_buffer_size] < max_retransmissions) {
+ Name name = interest.getName().getPrefix(-1);
+ name.appendSegment(chunk_number);
+
+ std::shared_ptr<Interest> retx_interest = std::make_shared<Interest>(name);
+
+ ConsumerInterestCallback on_interest_retransmission = VOID_HANDLER;
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, on_interest_retransmission);
+
+ if (on_interest_retransmission != VOID_HANDLER) {
+ on_interest_retransmission(*dynamic_cast<ConsumerSocket *>(socket_), *retx_interest);
+ }
+
+ ConsumerInterestCallback on_interest_output = VOID_HANDLER;
+
+ socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output);
+ if (on_interest_output != VOID_HANDLER) {
+ on_interest_output(*dynamic_cast<ConsumerSocket *>(socket_), *retx_interest);
+ }
+
+ if (!is_running_) {
+ return;
+ }
+
+ interests_in_flight_++;
+ interest_retransmissions_[chunk_number % default_values::default_buffer_size]++;
+ portal_->sendInterest(*retx_interest,
+ bind(&VegasTransportProtocol::onContentSegment, this, _1, _2),
+ bind(&VegasTransportProtocol::onTimeout, this, _1));
+ }
+}
+
+void VegasTransportProtocol::removeAllPendingInterests() {
+ portal_->clear();
+}
+
+} // namespace icn-interface
diff --git a/icnet/transport/icnet_transport_vegas.h b/icnet/transport/icnet_transport_vegas.h
new file mode 100644
index 00000000..a47050d8
--- /dev/null
+++ b/icnet/transport/icnet_transport_vegas.h
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_VEGAS_TRANSPORT_PROTOCOL_H_
+#define ICNET_VEGAS_TRANSPORT_PROTOCOL_H_
+
+#include "icnet_transport.h"
+#include "icnet_transport_vegas_rto_estimator.h"
+
+namespace icnet {
+
+class VegasTransportProtocol : public TransportProtocol {
+ public:
+
+ VegasTransportProtocol(Socket *icnet_socket);
+
+ virtual ~VegasTransportProtocol();
+
+ virtual void start();
+
+ void stop();
+
+ protected:
+
+ void sendInterest();
+
+ void onContentSegment(const Interest &interest, ContentObject &content_object);
+
+ bool verifyContentObject(const Interest &interest, ContentObject &content_object);
+
+ bool verifyManifest(ContentObject &content_object);
+
+ virtual void onTimeout(const Interest &interest);
+
+ void onManifest(const Interest &interest, ContentObject &content_object);
+
+ void onContentObject(const Interest &interest, ContentObject &content_object);
+
+ virtual void changeInterestLifetime(uint64_t segment);
+
+ void scheduleNextInterests();
+
+ virtual void decreaseWindow();
+
+ virtual void increaseWindow();
+
+ virtual void afterContentReception(const Interest &interest, const ContentObject &content_object);
+
+ virtual void afterDataUnsatisfied(uint64_t segment);
+
+ void reassemble();
+
+ virtual void copyContent(ContentObject &content_object);
+
+ bool pointsToManifest(ContentObject &content_object);
+
+ bool requireInterestWithHash(const Interest &interest, const ContentObject &content_object, Manifest &manifest);
+
+ bool verifySegmentUsingManifest(Manifest &manifestSegment, ContentObject &content_object);
+
+ virtual void checkForFastRetransmission(const Interest &interest);
+
+ void fastRetransmit(const Interest &interest, uint64_t chunk_number);
+
+ void removeAllPendingInterests();
+
+ protected:
+ // reassembly variables
+ bool is_final_block_number_discovered_;
+ uint64_t final_block_number_;
+ uint64_t last_reassembled_segment_;
+ std::vector<uint8_t> content_buffer_;
+ size_t content_buffer_size_;
+
+ // transmission variables
+ double current_window_size_;
+ double pending_window_size_;
+ uint64_t interests_in_flight_;
+ uint64_t segment_number_;
+ std::vector<int> interest_retransmissions_;
+ std::vector<std::chrono::steady_clock::time_point> interest_timepoints_;
+ RtoEstimator rtt_estimator_;
+
+ // buffers
+ std::vector<std::shared_ptr<ContentObject> > receive_buffer_; // verified segments by segment number
+ std::vector<std::shared_ptr<ContentObject> > unverified_segments_; // used with embedded manifests
+ std::vector<std::shared_ptr<Manifest> > verified_manifests_; // by segment number
+
+ // Fast Retransmission
+ std::map<uint64_t, bool> received_segments_;
+ std::unordered_map<uint64_t, bool> fast_retransmitted_segments;
+};
+
+} // end namespace icnet
+
+
+#endif // ICNET_VEGAS_TRANSPORT_PROTOCOL_H_
diff --git a/icnet/transport/icnet_transport_vegas_rto_estimator.cc b/icnet/transport/icnet_transport_vegas_rto_estimator.cc
new file mode 100644
index 00000000..889a6bd3
--- /dev/null
+++ b/icnet/transport/icnet_transport_vegas_rto_estimator.cc
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "icnet_transport_vegas_rto_estimator.h"
+#include "icnet_socket_options_default_values.h"
+
+namespace icnet {
+
+RtoEstimator::RtoEstimator(Duration min_rto) : smoothed_rtt_(RtoEstimator::getInitialRtt().count()),
+ rtt_variation_(0),
+ first_measurement_(true),
+ last_rto_(min_rto.count()) {
+}
+
+void RtoEstimator::addMeasurement(Duration rtt) {
+ double duration = static_cast<double>(rtt.count());
+ if (first_measurement_) {
+ smoothed_rtt_ = duration;
+ rtt_variation_ = duration / 2;
+ first_measurement_ = false;
+ } else {
+ rtt_variation_ =
+ (1 - default_values::beta) * rtt_variation_ + default_values::beta * std::abs(smoothed_rtt_ - duration);
+ smoothed_rtt_ = (1 - default_values::alpha) * smoothed_rtt_ + default_values::alpha * duration;
+ }
+}
+
+RtoEstimator::Duration RtoEstimator::computeRto() const {
+ double rto =
+ smoothed_rtt_ + std::max(double(default_values::clock_granularity.count()), default_values::k * rtt_variation_);
+ return Duration(static_cast<Duration::rep>(rto));
+}
+
+} // end namespace icnet \ No newline at end of file
diff --git a/icnet/transport/icnet_transport_vegas_rto_estimator.h b/icnet/transport/icnet_transport_vegas_rto_estimator.h
new file mode 100644
index 00000000..7b18533c
--- /dev/null
+++ b/icnet/transport/icnet_transport_vegas_rto_estimator.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ICNET_VEGAS_TRANSPORT_PROTOCOL_RTT_ESTIMATOR_H_
+#define ICNET_VEGAS_TRANSPORT_PROTOCOL_RTT_ESTIMATOR_H_
+
+#include "icnet_common.h"
+
+// Implementation inspired from RFC6298 (https://tools.ietf.org/search/rfc6298#ref-JK88)
+
+namespace icnet {
+
+class RtoEstimator {
+ public:
+ typedef std::chrono::microseconds Duration;
+
+ static Duration getInitialRtt() {
+ return std::chrono::seconds(1);
+ }
+
+ RtoEstimator(Duration min_rto = std::chrono::seconds(1));
+
+ void addMeasurement(Duration measure);
+
+ Duration computeRto() const;
+
+ private:
+ double smoothed_rtt_;
+ double rtt_variation_;
+ bool first_measurement_;
+ double last_rto_;
+};
+
+} // end namespace icnet
+
+
+#endif // ICNET_VEGAS_TRANSPORT_PROTOCOL_RTT_ESTIMATOR_H_