From ba8541cad3a4069886444abbd1848b6ef3fff72c Mon Sep 17 00:00:00 2001 From: Mauro Sardara Date: Wed, 22 Feb 2017 14:37:37 +0100 Subject: Initial Commit: libicnet Change-Id: I10a72cb0d84b76553a85c168416b847f6a4ff5f6 Signed-off-by: Mauro Sardara --- icnet/CMakeLists.txt | 60 ++ icnet/ccnx/icnet_ccnx_common.h | 49 ++ icnet/ccnx/icnet_ccnx_content_object.cc | 202 ++++++ icnet/ccnx/icnet_ccnx_content_object.h | 121 ++++ icnet/ccnx/icnet_ccnx_facade.h | 27 + icnet/ccnx/icnet_ccnx_interest.cc | 147 +++++ icnet/ccnx/icnet_ccnx_interest.h | 92 +++ icnet/ccnx/icnet_ccnx_key_locator.cc | 40 ++ icnet/ccnx/icnet_ccnx_key_locator.h | 52 ++ icnet/ccnx/icnet_ccnx_key_locator_type.h | 22 + icnet/ccnx/icnet_ccnx_local_connector.cc | 195 ++++++ icnet/ccnx/icnet_ccnx_local_connector.h | 98 +++ icnet/ccnx/icnet_ccnx_manifest.cc | 78 +++ icnet/ccnx/icnet_ccnx_manifest.h | 55 ++ icnet/ccnx/icnet_ccnx_name.cc | 218 +++++++ icnet/ccnx/icnet_ccnx_name.h | 120 ++++ icnet/ccnx/icnet_ccnx_network_message.cc | 88 +++ icnet/ccnx/icnet_ccnx_network_message.h | 70 ++ icnet/ccnx/icnet_ccnx_payload_type.h | 37 ++ icnet/ccnx/icnet_ccnx_pending_interest.cc | 94 +++ icnet/ccnx/icnet_ccnx_pending_interest.h | 93 +++ icnet/ccnx/icnet_ccnx_portal.cc | 204 ++++++ icnet/ccnx/icnet_ccnx_portal.h | 105 +++ icnet/ccnx/icnet_ccnx_segment.cc | 78 +++ icnet/ccnx/icnet_ccnx_segment.h | 71 ++ icnet/transport/consumer.conf | 21 + icnet/transport/icnet_common.h | 50 ++ icnet/transport/icnet_content_store.cc | 75 +++ icnet/transport/icnet_content_store.h | 60 ++ icnet/transport/icnet_download_observer.h | 32 + icnet/transport/icnet_rate_estimation.cc | 324 ++++++++++ icnet/transport/icnet_rate_estimation.h | 187 ++++++ icnet/transport/icnet_socket.h | 124 ++++ icnet/transport/icnet_socket_consumer.cc | 613 ++++++++++++++++++ icnet/transport/icnet_socket_consumer.h | 161 +++++ .../icnet_socket_options_default_values.h | 61 ++ icnet/transport/icnet_socket_options_keys.h | 95 +++ icnet/transport/icnet_socket_producer.cc | 717 +++++++++++++++++++++ icnet/transport/icnet_socket_producer.h | 172 +++++ icnet/transport/icnet_transport.cc | 31 + icnet/transport/icnet_transport.h | 43 ++ icnet/transport/icnet_transport_raaqm.cc | 466 +++++++++++++ icnet/transport/icnet_transport_raaqm.h | 100 +++ icnet/transport/icnet_transport_raaqm_data_path.cc | 197 ++++++ icnet/transport/icnet_transport_raaqm_data_path.h | 242 +++++++ icnet/transport/icnet_transport_vegas.cc | 488 ++++++++++++++ icnet/transport/icnet_transport_vegas.h | 109 ++++ .../icnet_transport_vegas_rto_estimator.cc | 46 ++ .../icnet_transport_vegas_rto_estimator.h | 49 ++ 49 files changed, 6879 insertions(+) create mode 100644 icnet/CMakeLists.txt create mode 100644 icnet/ccnx/icnet_ccnx_common.h create mode 100644 icnet/ccnx/icnet_ccnx_content_object.cc create mode 100644 icnet/ccnx/icnet_ccnx_content_object.h create mode 100644 icnet/ccnx/icnet_ccnx_facade.h create mode 100644 icnet/ccnx/icnet_ccnx_interest.cc create mode 100644 icnet/ccnx/icnet_ccnx_interest.h create mode 100644 icnet/ccnx/icnet_ccnx_key_locator.cc create mode 100644 icnet/ccnx/icnet_ccnx_key_locator.h create mode 100644 icnet/ccnx/icnet_ccnx_key_locator_type.h create mode 100644 icnet/ccnx/icnet_ccnx_local_connector.cc create mode 100644 icnet/ccnx/icnet_ccnx_local_connector.h create mode 100644 icnet/ccnx/icnet_ccnx_manifest.cc create mode 100644 icnet/ccnx/icnet_ccnx_manifest.h create mode 100644 icnet/ccnx/icnet_ccnx_name.cc create mode 100644 icnet/ccnx/icnet_ccnx_name.h create mode 100644 icnet/ccnx/icnet_ccnx_network_message.cc create mode 100644 icnet/ccnx/icnet_ccnx_network_message.h create mode 100644 icnet/ccnx/icnet_ccnx_payload_type.h create mode 100644 icnet/ccnx/icnet_ccnx_pending_interest.cc create mode 100644 icnet/ccnx/icnet_ccnx_pending_interest.h create mode 100644 icnet/ccnx/icnet_ccnx_portal.cc create mode 100644 icnet/ccnx/icnet_ccnx_portal.h create mode 100644 icnet/ccnx/icnet_ccnx_segment.cc create mode 100644 icnet/ccnx/icnet_ccnx_segment.h create mode 100644 icnet/transport/consumer.conf create mode 100644 icnet/transport/icnet_common.h create mode 100644 icnet/transport/icnet_content_store.cc create mode 100644 icnet/transport/icnet_content_store.h create mode 100644 icnet/transport/icnet_download_observer.h create mode 100644 icnet/transport/icnet_rate_estimation.cc create mode 100644 icnet/transport/icnet_rate_estimation.h create mode 100644 icnet/transport/icnet_socket.h create mode 100644 icnet/transport/icnet_socket_consumer.cc create mode 100644 icnet/transport/icnet_socket_consumer.h create mode 100644 icnet/transport/icnet_socket_options_default_values.h create mode 100644 icnet/transport/icnet_socket_options_keys.h create mode 100644 icnet/transport/icnet_socket_producer.cc create mode 100644 icnet/transport/icnet_socket_producer.h create mode 100644 icnet/transport/icnet_transport.cc create mode 100644 icnet/transport/icnet_transport.h create mode 100644 icnet/transport/icnet_transport_raaqm.cc create mode 100644 icnet/transport/icnet_transport_raaqm.h create mode 100644 icnet/transport/icnet_transport_raaqm_data_path.cc create mode 100644 icnet/transport/icnet_transport_raaqm_data_path.h create mode 100644 icnet/transport/icnet_transport_vegas.cc create mode 100644 icnet/transport/icnet_transport_vegas.h create mode 100644 icnet/transport/icnet_transport_vegas_rto_estimator.cc create mode 100644 icnet/transport/icnet_transport_vegas_rto_estimator.h (limited to 'icnet') diff --git a/icnet/CMakeLists.txt b/icnet/CMakeLists.txt new file mode 100644 index 00000000..76e45f69 --- /dev/null +++ b/icnet/CMakeLists.txt @@ -0,0 +1,60 @@ +# Copyright (c) 2017 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.2) + +file(GLOB HEADER_FILES "ccnx/*.h") +file(GLOB SOURCE_FILES "ccnx/*.cc") + +set(CP_API_HEADER_FILES + ${CMAKE_BINARY_DIR}/config.hpp + transport/icnet_rate_estimation.h + transport/icnet_download_observer.h + transport/icnet_socket_consumer.h + transport/icnet_socket.h + transport/icnet_socket_options_default_values.h + transport/icnet_socket_options_keys.h + transport/icnet_common.h + transport/icnet_socket_producer.h + transport/icnet_content_store.h + transport/icnet_transport_vegas.h + transport/icnet_transport.h + transport/icnet_transport_raaqm.h + transport/icnet_transport_vegas_rto_estimator.h + transport/icnet_transport_raaqm_data_path.h) + +set(CP_API_SOURCE_FILES + transport/icnet_socket_producer.cc + transport/icnet_socket_consumer.cc + transport/icnet_transport_vegas.cc + transport/icnet_transport.cc + transport/icnet_content_store.cc + transport/icnet_transport_raaqm.cc + transport/icnet_transport_vegas_rto_estimator.cc + transport/icnet_rate_estimation.cc + transport/icnet_transport_raaqm_data_path.cc) + +set(CP_API_CONFIG + transport/consumer.conf) + +set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib") + +add_library(icnet SHARED ${SOURCE_FILES} ${CP_API_SOURCE_FILES}) +target_link_libraries(icnet ${LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) + +install(TARGETS icnet DESTINATION ${CMAKE_INSTALL_PREFIX}/lib) + +install(FILES ${COMMON_HEADER_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/icnet) +install(FILES ${HEADER_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/icnet) +install(FILES ${CP_API_HEADER_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/include/icnet) +install(FILES ${CP_API_CONFIG} DESTINATION ${CMAKE_INSTALL_PREFIX}/etc/) diff --git a/icnet/ccnx/icnet_ccnx_common.h b/icnet/ccnx/icnet_ccnx_common.h new file mode 100644 index 00000000..d8020fe7 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_common.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_COMMON_H_ +#define ICNET_CCNX_COMMON_H_ + +// require C++11 +#if __cplusplus < 201103L && !defined(__GXX_EXPERIMENTAL_CXX0X__) +# error "libconsumer-producer must be compiled using the C++11 standard" +#endif + +// Avoid use of C keyword restrict +#define restrict __restrict__ + +// Each ccnx name has to start with ccnx: +#define CCNX_START_PREFIX "ccnx:" + +// #include "config.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#if defined(__GNUC__) || defined(__clang__) +# define DEPRECATED(func) func __attribute__ ((deprecated)) +#elif defined(_MSC_VER) +# define DEPRECATED(func) __declspec(deprecated) func +#else +# pragma message("DEPRECATED not implemented") +# define DEPRECATED(func) func +#endif + +#endif // ICNET_CCNX_COMMON_H_ diff --git a/icnet/ccnx/icnet_ccnx_content_object.cc b/icnet/ccnx/icnet_ccnx_content_object.cc new file mode 100644 index 00000000..1a7fa3c6 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_content_object.cc @@ -0,0 +1,202 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_content_object.h" + +extern "C" { +#include +}; + +namespace icnet { + +namespace ccnx { + +ContentObject::ContentObject() + : name_(ccnxName_Create()), + ccnx_content_object_(ccnxContentObject_CreateWithNameAndPayload(name_.getWrappedStructure(), NULL)), + content_type_(PayloadType::DATA) { +} + +ContentObject::ContentObject(const Name &name, uint8_t *payload, std::size_t size) + : name_(name), content_type_(PayloadType::DATA) { + PARCBuffer *buffer = parcBuffer_CreateFromArray(payload, size); + buffer = parcBuffer_Flip(buffer); + ccnx_content_object_ = ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), buffer); + parcBuffer_Release(&buffer); +} + +ContentObject::ContentObject(const CCNxContentObjectStructure *content_object) : name_(ccnxContentObject_GetName( + content_object)), + ccnx_content_object_( + ccnxContentObject_Acquire( + content_object)), + content_type_((PayloadType) ccnxContentObject_GetPayloadType( + content_object)) { +} + +ContentObject::ContentObject(const Name &name) + : name_(name), + ccnx_content_object_(ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), NULL)), + content_type_(PayloadType::DATA) { +} + +ContentObject::ContentObject(Name &&name) + : name_(std::move(name)), + ccnx_content_object_(ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), NULL)), + content_type_(PayloadType::DATA) { +} + +ContentObject::~ContentObject() { + ccnxContentObject_Release(&ccnx_content_object_); +} + +bool ContentObject::operator==(const ContentObject &content_object) { + return ccnxContentObject_Equals(ccnx_content_object_, content_object.ccnx_content_object_); +} + +PayloadType ContentObject::getContentType() const { + return (PayloadType) ccnxContentObject_GetPayloadType(ccnx_content_object_); +} + +void ContentObject::setContentType(PayloadType payload_type) { + content_type_ = payload_type; +} + +bool ContentObject::setContent(PayloadType content_type, const uint8_t *buffer, size_t buffer_size) { + content_type_ = content_type; + return setContent(buffer, buffer_size); +} + +bool ContentObject::setContent(const uint8_t *buffer, size_t buffer_size) { + bool ret; + PARCBuffer *parc_buffer = parcBuffer_CreateFromArray(buffer, buffer_size); + parc_buffer = parcBuffer_Flip(parc_buffer); + + if (content_type_ != PayloadType::DATA) { + ret = ccnxContentObject_SetPayload(ccnx_content_object_, (CCNxPayloadType) content_type_, parc_buffer); + } else { + ret = ccnxContentObject_SetPayload(ccnx_content_object_, (CCNxPayloadType) PayloadType::DATA, parc_buffer); + } + + parcBuffer_Release(&parc_buffer); + + return ret; +} + +Array ContentObject::getContent() const { + PARCBuffer *buffer = ccnxContentObject_GetPayload(ccnx_content_object_); + return Array(parcBuffer_Overlay(buffer, 0), parcBuffer_Remaining(buffer)); +} + +void ContentObject::setSignature() { + +} + +void ContentObject::signWithSha256(KeyLocator &key_locator) { + // ccnxValidationCRC32C_Set(ccnx_content_object_); +} + +void ContentObject::setFinalChunkNumber(uint64_t final_chunk_number) { + ccnxContentObject_SetFinalChunkNumber(ccnx_content_object_, final_chunk_number); +} + +bool ContentObject::hasFinalChunkNumber() { + return ccnxContentObject_HasFinalChunkNumber(ccnx_content_object_); +} + +uint64_t ContentObject::getFinalChunkNumber() { + return ccnxContentObject_GetFinalChunkNumber(ccnx_content_object_); +} + +void ContentObject::setExpiryTime(uint64_t expiry_time) { + std::chrono::milliseconds + ms = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()); + uint64_t expiration = ms.count() + expiry_time; + ccnxContentObject_SetExpiryTime(ccnx_content_object_, expiration); +} + +uint64_t ContentObject::getExpiryTime() { + return ccnxContentObject_GetExpiryTime(ccnx_content_object_); +} + +const Name &ContentObject::getName() const { + return name_; +} + +std::size_t ContentObject::getPacketSize() const { + PARCBuffer *packet = ccnxWireFormatMessage_GetWireFormatBuffer(ccnx_content_object_); + std::size_t ret = parcBuffer_Remaining(packet); + return ret; +} + +void ContentObject::setName(const Name &name) { + PARCBuffer *buffer = parcBuffer_Acquire(ccnxContentObject_GetPayload(ccnx_content_object_)); + ccnxContentObject_Release(&ccnx_content_object_); + ccnx_content_object_ = ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), buffer); + parcBuffer_Release(&buffer); + name_ = std::move(name); +} + +void ContentObject::setName(Name &&name) { + PARCBuffer *buffer = parcBuffer_Acquire(ccnxContentObject_GetPayload(ccnx_content_object_)); + ccnxContentObject_Release(&ccnx_content_object_); + ccnx_content_object_ = ccnxContentObject_CreateWithNameAndPayload(name.getWrappedStructure(), buffer); + parcBuffer_Release(&buffer); + name_ = std::move(name); +} + +CCNxContentObjectStructure *ContentObject::getWrappedStructure() { + return ccnx_content_object_; +} + +uint8_t ContentObject::getPathLabel() const { + if (ccnxContentObject_HasPathLabel(ccnx_content_object_)) { + return (uint8_t) ccnxContentObject_GetPathLabel(ccnx_content_object_); + } + + return 0; +} + +Array::Array(const void *array, size_t size) { + this->array_ = array; + this->size_ = size; +} + +Array::Array() { + this->array_ = nullptr; + this->size_ = 0; +} + +const void *Array::data() { + return array_; +} + +std::size_t Array::size() { + return size_; +} + +Array &Array::setData(const void *data) { + array_ = data; + return *this; +} + +Array &Array::setSize(std::size_t size) { + size_ = size; + return *this; +} + +} // end namespace ccnx + +} // end namespace icnet diff --git a/icnet/ccnx/icnet_ccnx_content_object.h b/icnet/ccnx/icnet_ccnx_content_object.h new file mode 100644 index 00000000..148587bb --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_content_object.h @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_CONTENT_OBJECT_H_ +#define ICNET_CCNX_CONTENT_OBJECT_H_ + +#include "icnet_ccnx_common.h" +#include "icnet_ccnx_name.h" +#include "icnet_ccnx_key_locator.h" +#include "icnet_ccnx_payload_type.h" + +extern "C" { +#include +#include +#include +}; + +namespace icnet { + +namespace ccnx { + +typedef CCNxContentObject CCNxContentObjectStructure; + +// This class is used just to transfer buffer pointers +// without making a copy, as std::vector<> would do + +class Array { + public: + explicit Array(const void *array, size_t size); + + Array(); + + const void *data(); + + std::size_t size(); + + Array &setData(const void *data); + + Array &setSize(std::size_t size); + + private: + std::size_t size_; + const void *array_; +}; + +class ContentObject : public std::enable_shared_from_this { + public: + ContentObject(); + + ContentObject(const Name &name, uint8_t *payload, std::size_t size); + + ContentObject(const CCNxContentObjectStructure *content_object); + + ContentObject(const Name &name); + + ContentObject(Name &&name); + + ~ContentObject(); + + bool operator==(const ContentObject &content_object); + + PayloadType getContentType() const; + + bool setContent(PayloadType content_type, const uint8_t *buffer, size_t buffer_size); + + bool setContent(const uint8_t *buffer, size_t buffer_size); + + void setContentType(PayloadType payload_type); + + Array getContent() const; + + void setSignature(); + + void signWithSha256(KeyLocator &key_locator); + + void setFinalChunkNumber(uint64_t final_chunk_number); + + bool hasFinalChunkNumber(); + + uint64_t getFinalChunkNumber(); + + void setExpiryTime(uint64_t expiry_time); + + uint64_t getExpiryTime(); + + const Name &getName() const; + + std::size_t getPacketSize() const; + + void setName(const Name &name); + + void setName(Name &&name); + + CCNxContentObjectStructure *getWrappedStructure(); + + uint8_t getPathLabel() const; + + protected: + + Name name_; + CCNxContentObjectStructure *ccnx_content_object_; + PayloadType content_type_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif //CP_API_CCNXDATA_H_ diff --git a/icnet/ccnx/icnet_ccnx_facade.h b/icnet/ccnx/icnet_ccnx_facade.h new file mode 100644 index 00000000..13a5fd43 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_facade.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_FACADE_H_ +#define ICNET_CCNX_FACADE_H_ + +#include "icnet_ccnx_segment.h" +#include "icnet_ccnx_name.h" +#include "icnet_ccnx_content_object.h" +#include "icnet_ccnx_interest.h" +#include "icnet_ccnx_portal.h" +#include "icnet_ccnx_manifest.h" +#include "icnet_ccnx_key_locator.h" + +#endif // ICNET_CCNX_FACADE_H_ diff --git a/icnet/ccnx/icnet_ccnx_interest.cc b/icnet/ccnx/icnet_ccnx_interest.cc new file mode 100644 index 00000000..a2bcf72e --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_interest.cc @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_interest.h" + +namespace icnet { + +namespace ccnx { + +Interest::Interest(const Name &interest_name) + : name_(interest_name), interest_(ccnxInterest_CreateSimple(name_.getWrappedStructure())) { +} + +Interest::Interest(Name &&interestName) + : name_(std::move(interestName)), interest_(ccnxInterest_CreateSimple(name_.getWrappedStructure())) { +} + +Interest::Interest(CCNxInterestStruct *interest) + : name_(ccnxInterest_GetName(interest)), interest_(ccnxInterest_Acquire(interest)) { +} + +Interest::Interest(const Interest &other_interest) : name_(other_interest.name_), + interest_(ccnxInterest_CreateSimple(other_interest.name_ + .getWrappedStructure())) { + PARCBuffer *buffer = nullptr; + + // Key Id restriction + buffer = ccnxInterest_GetKeyIdRestriction(other_interest.interest_); + if (buffer) { + ccnxInterest_SetKeyIdRestriction(interest_, buffer); + } + + // Content Hash restriction + buffer = ccnxInterest_GetContentObjectHashRestriction(other_interest.interest_); + if (buffer) { + ccnxInterest_SetContentObjectHashRestriction(interest_, buffer); + } + + // Optional Payload + buffer = ccnxInterest_GetPayload(other_interest.interest_); + if (buffer) { + ccnxInterest_SetPayload(interest_, buffer); + } + + ccnxInterest_SetHopLimit(interest_, ccnxInterest_GetHopLimit(other_interest.interest_)); + ccnxInterest_SetLifetime(interest_, ccnxInterest_GetLifetime(other_interest.interest_)); +} + +Interest::Interest(Interest &&other_interest) : name_(std::move(other_interest.name_)), + interest_(ccnxInterest_Acquire(other_interest.interest_)) { +} + +Interest &Interest::operator=(const Interest &other_interest) { + ccnxInterest_Release(&interest_); + name_ = other_interest.name_; + interest_ = ccnxInterest_CreateSimple(name_.getWrappedStructure()); + return *this; +} + +Interest::~Interest() { + ccnxInterest_Release(&interest_); +} + +bool Interest::operator==(const Interest &interest) { + return ccnxInterest_Equals(interest_, interest.interest_); +} + +const Name &Interest::getName() const { + return name_; +} + +void Interest::setInterestLifetime(uint32_t lifetime) { + ccnxInterest_SetLifetime(interest_, lifetime); +} + +const uint32_t Interest::getInterestLifetime() const { + return ccnxInterest_GetLifetime(interest_); +} + +bool Interest::setKeyId(const PARCBuffer *keyId) { + return ccnxInterest_SetKeyIdRestriction(interest_, keyId); +} + +PARCBuffer *Interest::getKeyId() { + return ccnxInterest_GetKeyIdRestriction(interest_); +} + +PARCBuffer *Interest::getContentHash() { + return ccnxInterest_GetContentObjectHashRestriction(interest_); +} + +bool Interest::setContentHash(PARCBuffer *hash) { + return ccnxInterest_SetContentObjectHashRestriction(interest_, hash); +} + +std::string Interest::toString() { + char *str = ccnxInterest_ToString(interest_); + std::string ret(str); + + free(str); + + return ret; +} + +bool Interest::setPayload(const PARCBuffer *payload) { + return ccnxInterest_SetPayload(interest_, payload); +} + +bool Interest::setPayloadAndId(const PARCBuffer *payload) { + return ccnxInterest_SetPayloadAndId(interest_, payload); +} + +bool Interest::setPayloadWithId(const PARCBuffer *payload, const CCNxInterestPayloadId *payload_id) { + return ccnxInterest_SetPayloadWithId(interest_, payload, payload_id); +} + +PARCBuffer *Interest::getPayload() { + return ccnxInterest_GetPayload(interest_); +} + +void Interest::setHopLimit(uint32_t hop_limit) { + ccnxInterest_SetHopLimit(interest_, hop_limit); +} + +uint32_t Interest::getHopLimit() { + return ccnxInterest_GetHopLimit(interest_); +} + +CCNxInterestStruct *Interest::getWrappedStructure() const { + return interest_; +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_interest.h b/icnet/ccnx/icnet_ccnx_interest.h new file mode 100644 index 00000000..2156b56f --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_interest.h @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_INTEREST_H_ +#define ICNET_CCNX_INTEREST_H_ + +#include "icnet_ccnx_common.h" + +extern "C" { +#include +}; + +//#include "interest.hpp" +#include "icnet_ccnx_name.h" + +namespace icnet { + +namespace ccnx { + +typedef CCNxInterest CCNxInterestStruct; + +class Interest : public std::enable_shared_from_this { + public: + Interest(const Name &interest_name); + + Interest(Name &&name); + + Interest(CCNxInterestStruct *interest); + + Interest(const Interest &other_interest); + + Interest(Interest &&other_interest); + + ~Interest(); + + bool operator==(const Interest &interest); + + Interest &operator=(const Interest &other_interest); + + const Name &getName() const; + + void setInterestLifetime(uint32_t lifetime); + + const uint32_t getInterestLifetime() const; + + bool setKeyId(const PARCBuffer *keyId); + + PARCBuffer *getKeyId(); + + PARCBuffer *getContentHash(); + + bool setContentHash(PARCBuffer *hash); + + std::string toString(); + + bool setPayload(const PARCBuffer *payload); + + bool setPayloadAndId(const PARCBuffer *payload); + + bool setPayloadWithId(const PARCBuffer *payload, const CCNxInterestPayloadId *payload_id); + + PARCBuffer *getPayload(); + + void setHopLimit(uint32_t hop_limit); + + uint32_t getHopLimit(); + + CCNxInterestStruct *getWrappedStructure() const; + + private: + + Name name_; + CCNxInterestStruct *interest_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_INTEREST_H_ diff --git a/icnet/ccnx/icnet_ccnx_key_locator.cc b/icnet/ccnx/icnet_ccnx_key_locator.cc new file mode 100644 index 00000000..193573a9 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_key_locator.cc @@ -0,0 +1,40 @@ +// +// Created by msardara on 01/11/16. +// + +#include "icnet_ccnx_key_locator.h" + +namespace icnet { + +namespace ccnx { + +KeyLocator::KeyLocator() : type_(KeyLocatorType::UNKNOWN) { +} + +KeyLocator::KeyLocator(KeyLocatorType type, Name &name) : type_(type), name_(name) { +} + +Name &KeyLocator::getName() { + return name_; +} + +void KeyLocator::setName(Name &name) { + name_ = name; +} + +void KeyLocator::setType(KeyLocatorType type) { + type_ = type; +} + +KeyLocatorType KeyLocator::getType() { + return type_; +} + +void KeyLocator::clear() { + type_ = KeyLocatorType::UNKNOWN; + name_.clear(); +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_key_locator.h b/icnet/ccnx/icnet_ccnx_key_locator.h new file mode 100644 index 00000000..c55ff5f1 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_key_locator.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_KEY_LOCATOR_H_ +#define ICNET_CCNX_KEY_LOCATOR_H_ + +#include "icnet_ccnx_common.h" +#include "icnet_ccnx_key_locator_type.h" +#include "icnet_ccnx_name.h" + +namespace icnet { + +namespace ccnx { + +class KeyLocator : public std::enable_shared_from_this { + public: + KeyLocator(); + + KeyLocator(KeyLocatorType type, Name &name); + + KeyLocatorType getType(); + + void setType(KeyLocatorType type); + + void setName(Name &name); + + Name &getName(); + + void clear(); + + private: + KeyLocatorType type_; + Name name_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_KEY_LOCATOR_H_ diff --git a/icnet/ccnx/icnet_ccnx_key_locator_type.h b/icnet/ccnx/icnet_ccnx_key_locator_type.h new file mode 100644 index 00000000..477870d3 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_key_locator_type.h @@ -0,0 +1,22 @@ +// +// Created by msardara on 16/02/2017. +// + +#ifndef ICNET_CCNX_KEY_LOCATOR_TYPE_H_ +#define ICNET_CCNX_KEY_LOCATOR_TYPE_H_ + +namespace icnet { + +namespace ccnx { + +enum Type { + NAME = 0, KEY_DIGEST = 1, UNKNOWN = 255 +}; + +typedef enum Type KeyLocatorType; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_KEY_LOCATOR_TYPE_H_ diff --git a/icnet/ccnx/icnet_ccnx_local_connector.cc b/icnet/ccnx/icnet_ccnx_local_connector.cc new file mode 100644 index 00000000..2a47c117 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_local_connector.cc @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_local_connector.h" + +namespace icnet { + +namespace ccnx { + +LocalConnector::LocalConnector(boost::asio::io_service &io_service, + std::string &ip_address, + std::string &port, + MessageReceivedCallback receive_callback, + std::list &name_list) + : io_service_(io_service), + socket_(io_service_), + resolver_(io_service_), + endpoint_iterator_(resolver_.resolve({ip_address, port})), + timer_(io_service), + is_connecting_(false), + is_reconnection_(false), + data_available_(false), + receive_callback_(receive_callback), + served_name_list_(name_list) { + startConnectionTimer(); + doConnect(); +} + +LocalConnector::~LocalConnector() { +} + +void LocalConnector::bind(Name &name) { + CCNxControl *control = ccnxControl_CreateAddRouteToSelfRequest(name.getWrappedStructure()); + CCNxMetaMessage *message = ccnxMetaMessage_CreateFromControl(control); + ccnxControl_Release(&control); + + send(message); + + ccnxMetaMessage_Release((CCNxMetaMessage **) &message); + +} + +void LocalConnector::send(CCNxMetaMessage *message) { + CCNxMetaMessage *msg = ccnxMetaMessage_Acquire(message); + + io_service_.post([this, msg]() { + bool write_in_progres = !write_msgs_.empty(); + write_msgs_.push_back(msg); + if (!is_connecting_) { + if (!write_in_progres) { + doWrite(); + } + } else { + // Tell the handle connect it has data to write + data_available_ = true; + } + }); +} + +void LocalConnector::close() { + io_service_.post([this]() { socket_.close(); }); +} + +void LocalConnector::doWrite() { + CCNxMetaMessage *message = write_msgs_.front(); + CCNxCodecNetworkBufferIoVec *network_buffer = ccnxCodecSchemaV1PacketEncoder_DictionaryEncode(message, NULL); + const iovec *iov = ccnxCodecNetworkBufferIoVec_GetArray(network_buffer); + + boost::asio::async_write(socket_, + boost::asio::buffer(iov->iov_base, iov->iov_len), + [this, network_buffer, message](boost::system::error_code ec, std::size_t /*length*/) { + if (!ec) { + ccnxMetaMessage_Release((CCNxMetaMessage **) &message); + write_msgs_.pop_front(); + + if (!write_msgs_.empty()) { + doWrite(); + } + } else { + tryReconnect(); + } + + ccnxCodecNetworkBufferIoVec_Release((CCNxCodecNetworkBufferIoVec **) &network_buffer); + + }); + +} + +void LocalConnector::doReadBody() { + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.body(), read_msg_.bodyLength()), + boost::asio::transfer_exactly(read_msg_.bodyLength()), + [this](boost::system::error_code ec, std::size_t length) { + if (!ec) { + receive_callback_(read_msg_.decodeMessage()); + doReadHeader(); + } else { + tryReconnect(); + } + }); +} + +void LocalConnector::doReadHeader() { + boost::asio::async_read(socket_, + boost::asio::buffer(read_msg_.data(), TransportMessage::header_length), + boost::asio::transfer_exactly(TransportMessage::header_length), + [this](boost::system::error_code ec, std::size_t /*length*/) { + if (!ec) { + if (read_msg_.decodeHeader()) { + doReadBody(); + } else { + std::cerr << "Decoding error" << std::endl; + } + } else { + tryReconnect(); + } + }); +} + +void LocalConnector::tryReconnect() { + if (!is_connecting_) { + std::cerr << "Connection lost. Trying to reconnect..." << std::endl; + is_connecting_ = true; + is_reconnection_ = true; + io_service_.post([this]() { + socket_.close(); + startConnectionTimer(); + doConnect(); + }); + } +} + +void LocalConnector::doConnect() { + boost::asio::async_connect(socket_, + endpoint_iterator_, + [this](boost::system::error_code ec, tcp::resolver::iterator) { + if (!ec) { + timer_.cancel(); + is_connecting_ = false; + doReadHeader(); + + if (data_available_) { + data_available_ = false; + doWrite(); + } + + if (is_reconnection_) { + is_reconnection_ = false; + std::cout << "Connection recovered!" << std::endl; + for (auto &name : served_name_list_) { + bind(name); + } + } + + } else { + sleep(1); + doConnect(); + } + }); +} + +bool LocalConnector::checkConnected() { + return !is_connecting_; +} + +void LocalConnector::startConnectionTimer() { + timer_.expires_from_now(boost::posix_time::seconds(20)); + timer_.async_wait(std::bind(&LocalConnector::handleDeadline, this, std::placeholders::_1)); +} + +void LocalConnector::handleDeadline(const boost::system::error_code &ec) { + if (!ec) { + io_service_.post([this]() { + socket_.close(); + std::cerr << "Error connecting. Is the forwarder running?" << std::endl; + io_service_.stop(); + }); + } +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_local_connector.h b/icnet/ccnx/icnet_ccnx_local_connector.h new file mode 100644 index 00000000..b08d1c5a --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_local_connector.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_LOCAL_CONNECTOR_H_ +#define ICNET_CCNX_LOCAL_CONNECTOR_H_ + +#include "icnet_ccnx_common.h" +#include "icnet_ccnx_network_message.h" +#include "icnet_ccnx_name.h" +#include +#include + +extern "C" { +#include +#include +#include +#include +#include +#include +}; + +namespace icnet { + +namespace ccnx { + +using boost::asio::ip::tcp; +typedef std::deque CcnxTransportMessageQueue; +typedef std::function MessageReceivedCallback; + +class LocalConnector { + public: + LocalConnector(boost::asio::io_service &io_service, + std::string &ip_address, + std::string &port, + MessageReceivedCallback receive_callback, + std::list &name_list); + + ~LocalConnector(); + + void send(CCNxMetaMessage *message); + + void bind(Name &name); + + void close(); + + private: + void doConnect(); + + void doReadHeader(); + + void doReadBody(); + + void doWrite(); + + bool checkConnected(); + + private: + + void handleDeadline(const boost::system::error_code &ec); + + void startConnectionTimer(); + + void tryReconnect(); + + boost::asio::io_service &io_service_; + boost::asio::ip::tcp::socket socket_; + boost::asio::ip::tcp::resolver resolver_; + boost::asio::ip::tcp::resolver::iterator endpoint_iterator_; + boost::asio::deadline_timer timer_; + + TransportMessage read_msg_; + CcnxTransportMessageQueue write_msgs_; + + bool is_connecting_; + bool is_reconnection_; + bool data_available_; + + MessageReceivedCallback receive_callback_; + std::list &served_name_list_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_LOCAL_CONNECTOR_H_ diff --git a/icnet/ccnx/icnet_ccnx_manifest.cc b/icnet/ccnx/icnet_ccnx_manifest.cc new file mode 100644 index 00000000..c818f110 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_manifest.cc @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_manifest.h" +#include + +namespace icnet { + +namespace ccnx { + +Manifest::Manifest(Name &name) : ContentObject(name) { +} + +std::size_t Manifest::estimateManifestSize() { + std::size_t capacity = 0; + + // for (auto pair : m_mapNameDigest) + // capacity += pair.first.size() + pair.second.data().size(); + + return capacity; +} + +Manifest::Manifest(ContentObject &content_object) { + decode(); +} + +void Manifest::encode() { + std::stringstream ss; + //boost::property_tree::write_json(ss, m_mapNameDigest); + + std::string json_string = ss.str(); + + ContentObject::setContent(PayloadType::MANIFEST, (uint8_t *) json_string.c_str(), json_string.size()); +} + +void Manifest::decode() { + PARCBuffer *payload = ccnxContentObject_GetPayload(ccnx_content_object_); + char *buffer = parcBuffer_ToString(payload); + + std::stringstream ss; + ss << buffer; + + //boost::property_tree::read_json(ss, m_mapNameDigest); + + free(buffer); +} + +std::string Manifest::getDigest(ContentObject &content_object) { + // ContentObject &ccnxData = (ContentObject &) content_object; + // for (auto pair : m_mapNameDigest) + // if (pair.second.content_object() == ccnxData.getName().toUri()) { + // return pair.second.content_object(); + // } + + return std::string(); +} + +void Manifest::addNameToCatalogue(Name &name, uint8_t *digest, std::size_t digest_size) { + // Name &ccnxName = (Name &) name; + // m_mapNameDigest.put(ccnxName.toUri(), std::string((char *) digest, digest_size)); + return; +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_manifest.h b/icnet/ccnx/icnet_ccnx_manifest.h new file mode 100644 index 00000000..92118777 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_manifest.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_MANIFEST_H_ +#define ICNET_CCNX_MANIFEST_H_ + +#include "icnet_ccnx_common.h" +//#include + +#include "icnet_ccnx_name.h" +#include "icnet_ccnx_content_object.h" + +namespace icnet { + +namespace ccnx { + +class Manifest : public ContentObject // public api::Manifest +{ + public: + Manifest(Name &name); + + Manifest(ContentObject &content_object); + + std::size_t estimateManifestSize(); + + void encode(); + + void decode(); + + std::string getDigest(ContentObject &content_object); + + void addNameToCatalogue(Name &name, uint8_t *digest, std::size_t digest_size); + + private: + // boost::property_tree::ptree map_name_digest_; +}; + +} // end namespace ccnx + +} // end namespace icnet + + +#endif // ICNET_CCNX_MANIFEST_H_ diff --git a/icnet/ccnx/icnet_ccnx_name.cc b/icnet/ccnx/icnet_ccnx_name.cc new file mode 100644 index 00000000..2ea0d8ab --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_name.cc @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_name.h" + +namespace icnet { + +namespace ccnx { + +Name::Name() { + name_ = ccnxName_Create(); +} + +Name::Name(const char *uri) : name_(ccnxName_CreateFromCString(uri)) { + ccnxName_AssertValid(name_); +} + +Name::Name(std::string uri) : Name(uri.c_str()) { +} + +Name::Name(const CCNxNameStructure *name) : name_(ccnxName_Acquire(name)) { + ccnxName_AssertValid(name_); +} + +Name::Name(const Name &name) : name_(ccnxName_Copy(name.name_)) { +} + +Name::Name(Name &&otherName) : name_(ccnxName_Acquire(otherName.name_)) { +} + +Name &Name::operator=(const Name &name) { + ccnxName_Release(&this->name_); + this->name_ = ccnxName_Copy(name.name_); + return *this; +} + +Name &Name::operator=(Name &&name) { + ccnxName_Release(&this->name_); + this->name_ = ccnxName_Acquire(name.name_); + return *this; +} + +bool Name::operator==(const Name &name) const { + return ccnxName_Equals(this->name_, name.name_); +} + +Name::~Name() { + ccnxName_Release(&name_); +} + +bool Name::equals(const Name &name) { + return ccnxName_Equals(this->name_, name.name_); +} + +bool Name::isValid() { + return ccnxName_IsValid(name_); +} + +const std::string &Name::toString() { + char *str = ccnxName_ToString(name_); + name_string_ = std::string(str); + free(str); + return name_string_; +} + +void Name::appendComponent(const Segment &suffix) { + ccnxName_Append(name_, suffix.getWrappedStructure()); +} + +void Name::append(const Name &suffix) { + if (ccnxName_IsValid(suffix.name_)) { + + size_t number_of_components = ccnxName_GetSegmentCount(suffix.getWrappedStructure()); + + for (uint32_t i = 0; i < number_of_components; i++) { + ccnxName_Append(name_, ccnxName_GetSegment(suffix.getWrappedStructure(), i)); + } + } +} + +Name Name::getPrefix(ssize_t number_of_components) const { + std::size_t segment_count = ccnxName_GetSegmentCount(name_); + + if (number_of_components >= 0) { + assert((std::size_t) number_of_components < segment_count); + return getSubName(0, number_of_components); + } else { + assert(segment_count + number_of_components >= 0); + return getSubName(0, number_of_components + segment_count); + } + +} + +Segment Name::get(ssize_t index) const { + std::size_t segment_count = ccnxName_GetSegmentCount(name_); + size_t componentIndex = 0; + + if (index >= 0) { + assert((size_t) index < segment_count); + componentIndex = (size_t) index; + } else { + assert(segment_count + index >= 0); + componentIndex = segment_count + index; + } + + CCNxNameSegment *segment = ccnxName_GetSegment(name_, componentIndex); + Segment ret(segment); + + return ret; +} + +Name Name::getSubName(ssize_t start_component, ssize_t number_of_components) const { + + size_t name_size = ccnxName_GetSegmentCount(name_); + size_t begin; + + if (start_component >= 0) { + assert((size_t) start_component < name_size); + begin = static_cast(start_component); + } else { + assert((ssize_t) (start_component + name_size) >= 0); + begin = start_component + name_size; + } + + size_t end = begin; + end += number_of_components < 0 ? name_size : static_cast(number_of_components); + + if (end >= name_size) { + end = name_size; + } + + CCNxName *name = ccnxName_Create(); + + for (size_t i = begin; i < end; i++) { + ccnxName_Append(name, ccnxName_GetSegment(name_, i)); + } + + Name ret(name); + ccnxName_Release(&name); + + return ret; +} + +bool Name::isPrefixOf(const Name &name) { + Name &ccnx_name = (Name &) name; + return ccnxName_StartsWith(ccnx_name.name_, name_); +} + +Name &Name::appendSegment(const uint64_t chunk_number) { + CCNxNameSegmentStructure *ns = ccnxNameSegmentNumber_Create(CCNxNameLabelType_CHUNK, chunk_number); + name_ = ccnxName_Append(name_, ns); + ccnxNameSegment_Release(&ns); + + return *this; +} + +bool Name::empty() const { + return ccnxName_GetSegmentCount(name_) == 0; +} + +void Name::clear() { + ccnxName_Release(&name_); + name_ = ccnxName_Create(); +} + +std::size_t Name::getSegmentCount() { + return ccnxName_GetSegmentCount(name_); +} + +std::size_t Name::size() { + std::size_t number_of_segments = ccnxName_GetSegmentCount(name_); + std::size_t name_bytes = 0; + + for (std::size_t i = 0; i < number_of_segments; i++) { + name_bytes += ccnxNameSegment_Length(ccnxName_GetSegment(name_, i)); + } + + return name_bytes; +} + +std::ostream &operator<<(std::ostream &os, const Name &name) { + const std::string &str = const_cast(name).toString(); + + if (name.empty()) { + os << "ccnx:/"; + } else { + os << str; + } + + return os; +} + +CCNxNameStructure *Name::getWrappedStructure() const { + return name_; +} + +} // end namespace ccnx + +} // end namespace icnet + +namespace std { +size_t hash::operator()(const icnet::ccnx::Name &name) const { + return ccnxName_HashCode(name.getWrappedStructure());; +} + +} // end namespace std \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_name.h b/icnet/ccnx/icnet_ccnx_name.h new file mode 100644 index 00000000..68c4d19e --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_name.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_NAME_H_ +#define ICNET_CCNX_NAME_H_ + +#include "icnet_ccnx_common.h" + +#include +#include +#include + +extern "C" { +#include +}; + +//#include "name.hpp" +#include "icnet_ccnx_segment.h" +#include + +typedef CCNxName CCNxNameStructure; + +namespace icnet { + +namespace ccnx { + +class Name : public std::enable_shared_from_this // : public api::Name +{ + public: + Name(); + + /** + * @brief Create name + * @param uri The null-terminated URI string + */ + Name(const char *uri); + + /** + * @brief Create name from @p uri (ICN URI scheme) + * @param uri The URI string + */ + Name(std::string uri); + + Name(const Name &name); + + Name(Name &&otherName); + + Name(const CCNxNameStructure *name); + + Name &operator=(const Name &name); + + Name &operator=(Name &&name); + + bool operator==(const Name &name) const; + + bool isValid(); + + const std::string &toString(); + + void appendComponent(const Segment &component); + + void append(const Name &suffix); + + Name getPrefix(ssize_t number_of_components) const; + + Name &appendSegment(const uint64_t chunk_number); + + bool isPrefixOf(const Name &name); + + bool equals(const Name &name); + + Segment get(ssize_t index) const; + + Name getSubName(ssize_t start_component, ssize_t number_of_components = -1) const; + + bool empty() const; + + void clear(); + + std::size_t getSegmentCount(); + + std::size_t size(); + + ~Name(); + + CCNxNameStructure *getWrappedStructure() const; + + private: + + CCNxNameStructure *name_; + std::string name_string_; +}; + +std::ostream &operator<<(std::ostream &os, const Name &name); + +} // end namespace ccnx + +} // end namespace icnet + +namespace std { +template<> +struct hash { + size_t operator()(const icnet::ccnx::Name &name) const; +}; + +} // end namespace std + +#endif // ICNET_CCNX_NAME_H_ diff --git a/icnet/ccnx/icnet_ccnx_network_message.cc b/icnet/ccnx/icnet_ccnx_network_message.cc new file mode 100644 index 00000000..70dff5f7 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_network_message.cc @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_network_message.h" + +namespace icnet { + +namespace ccnx { + +TransportMessage::TransportMessage() : packet_length_(0) { +} + +const uint8_t *TransportMessage::data() const { + return data_; +} + +uint8_t *TransportMessage::data() { + return data_; +} + +std::size_t TransportMessage::length() const { + return packet_length_; +} + +const uint8_t *TransportMessage::body() const { + return data_ + header_length; +} + +uint8_t *TransportMessage::body() { + return data_ + header_length; +} + +std::size_t TransportMessage::bodyLength() const { + return packet_length_ - header_length; +} + +void TransportMessage::bodyLength(std::size_t new_length) { + packet_length_ = new_length; + if (packet_length_ > max_packet_length) { + packet_length_ = max_packet_length; + } +} + +bool TransportMessage::decodeHeader() { + // General checks + + uint8_t message_version = data_[0]; + + if (message_version != 1) { + std::cerr << "Illegal packet version " << message_version << std::endl; + return false; + } + + // Get packet length + + packet_length_ = data_[2]; + packet_length_ <<= 8; + packet_length_ |= data_[3]; + + return true; +} + +CCNxMetaMessage *TransportMessage::decodeMessage() { + std::size_t total_length = packet_length_; + PARCBuffer *buffer = parcBuffer_CreateFromArray((void *) data_, total_length); + buffer = parcBuffer_Flip(buffer); + CCNxMetaMessage *ret = ccnxMetaMessage_CreateFromWireFormatBuffer(buffer); + parcBuffer_Release((PARCBuffer **) &buffer); + + return ret; + +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_network_message.h b/icnet/ccnx/icnet_ccnx_network_message.h new file mode 100644 index 00000000..a5cbfccc --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_network_message.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_NETWORK_MESSAGE_H_ +#define ICNET_CCNX_NETWORK_MESSAGE_H_ + +#include "icnet_ccnx_common.h" +#include + +extern "C" { +#include +#include +#include +}; + +namespace icnet { + +namespace ccnx { + +class TransportMessage { + public: + enum { + header_length = 8 + }; + enum { + max_packet_length = 1500 + }; + + TransportMessage(); + + const uint8_t *data() const; + + uint8_t *data(); + + std::size_t length() const; + + const uint8_t *body() const; + + uint8_t *body(); + + std::size_t bodyLength() const; + + void bodyLength(std::size_t new_length); + + bool decodeHeader(); + + CCNxMetaMessage *decodeMessage(); + + private: + uint8_t data_[max_packet_length]; + std::size_t packet_length_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_NETWORK_MESSAGE_H_ diff --git a/icnet/ccnx/icnet_ccnx_payload_type.h b/icnet/ccnx/icnet_ccnx_payload_type.h new file mode 100644 index 00000000..36231c52 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_payload_type.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_PAYLOAD_TYPE_H_ +#define ICNET_CCNX_PAYLOAD_TYPE_H_ + +#include + +namespace icnet { + +namespace ccnx { + +typedef enum { + DATA = CCNxPayloadType_DATA, + KEY = CCNxPayloadType_KEY, + LINK = CCNxPayloadType_LINK, + MANIFEST = CCNxPayloadType_MANIFEST, + NONE = 999 +} PayloadType; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_PAYLOAD_TYPE_H_ diff --git a/icnet/ccnx/icnet_ccnx_pending_interest.cc b/icnet/ccnx/icnet_ccnx_pending_interest.cc new file mode 100644 index 00000000..9536e4fb --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_pending_interest.cc @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_pending_interest.h" + +namespace icnet { + +namespace ccnx { + +PendingInterest::PendingInterest(std::shared_ptr &interest, + boost::asio::io_service &portal_io_service, + const OnContentObjectCallback &on_content_object, + const OnInterestTimeoutCallback &on_interest_timeout) + : interest_(interest), + io_service_(portal_io_service), + timer_(io_service_), + on_content_object_callback_(on_content_object), + on_interest_timeout_callback_(on_interest_timeout), + received_(false), + valid_(true) { +} + +PendingInterest::~PendingInterest() { + interest_.reset(); +} + +void PendingInterest::startCountdown(BoostCallback &cb) { + timer_.expires_from_now(boost::posix_time::milliseconds(interest_->getInterestLifetime())); + timer_.async_wait(cb); +} + +void PendingInterest::cancelTimer() { + timer_.cancel(); +} + +bool PendingInterest::isReceived() const { + return received_; +} + +void PendingInterest::setReceived() { + received_ = true; +} + +const std::shared_ptr &PendingInterest::getInterest() const { + return interest_; +} + +void PendingInterest::setInterest(const std::shared_ptr &interest) { + PendingInterest::interest_ = interest; +} + +const OnContentObjectCallback &PendingInterest::getOnDataCallback() const { + return on_content_object_callback_; +} + +void PendingInterest::setOnDataCallback(const OnContentObjectCallback &on_content_object) { + PendingInterest::on_content_object_callback_ = on_content_object; +} + +const OnInterestTimeoutCallback &PendingInterest::getOnTimeoutCallback() const { + return on_interest_timeout_callback_; +} + +void PendingInterest::setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout) { + PendingInterest::on_interest_timeout_callback_ = on_interest_timeout; +} + +void PendingInterest::setReceived(bool received) { + PendingInterest::received_ = received; +} + +bool PendingInterest::isValid() const { + return valid_; +} + +void PendingInterest::setValid(bool valid) { + PendingInterest::valid_ = valid; +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_pending_interest.h b/icnet/ccnx/icnet_ccnx_pending_interest.h new file mode 100644 index 00000000..692c63e7 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_pending_interest.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_PENDING_INTEREST_H_ +#define ICNET_CCNX_PENDING_INTEREST_H_ + +#include "icnet_ccnx_interest.h" +#include "icnet_ccnx_content_object.h" +#include "icnet_ccnx_name.h" + +#include + +namespace icnet { + +namespace ccnx { + +typedef std::function +OnContentObjectCallback; +typedef std::function +OnInterestTimeoutCallback; +typedef std::function +OnInterestCallback; +typedef std::function BoostCallback; + +class PendingInterest { + public: + + friend class Portal; + + PendingInterest(std::shared_ptr &interest, + boost::asio::io_service &portal_io_service, + const OnContentObjectCallback &on_content_object, + const OnInterestTimeoutCallback &on_interest_timeout); + + ~PendingInterest(); + + bool isReceived() const; + + void startCountdown(BoostCallback &cb); + + void cancelTimer(); + + void setReceived(); + + const std::shared_ptr &getInterest() const; + + void setInterest(const std::shared_ptr &interest); + + const OnContentObjectCallback &getOnDataCallback() const; + + void setOnDataCallback(const OnContentObjectCallback &on_content_object); + + const OnInterestTimeoutCallback &getOnTimeoutCallback() const; + + void setOnTimeoutCallback(const OnInterestTimeoutCallback &on_interest_timeout); + + void setReceived(bool received); + + bool isValid() const; + + void setValid(bool valid); + + private: + std::shared_ptr interest_; + boost::asio::io_service &io_service_; + boost::asio::deadline_timer timer_; + + private: + OnContentObjectCallback on_content_object_callback_; + OnInterestTimeoutCallback on_interest_timeout_callback_; + bool received_; + bool valid_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_PENDING_INTEREST_H_ diff --git a/icnet/ccnx/icnet_ccnx_portal.cc b/icnet/ccnx/icnet_ccnx_portal.cc new file mode 100644 index 00000000..5b14ace3 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_portal.cc @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_portal.h" + +#define UNSET_CALLBACK 0 +#define MAX_ARRAY_SIZE 16000 + +namespace icnet { + +namespace ccnx { + +Portal::Portal(std::string forwarder_ip_address, std::string forwarder_port) + : is_running_(true), + clear_(false), + on_interest_callback_(UNSET_CALLBACK), + connector_(io_service_, + forwarder_ip_address, + forwarder_port, + std::bind(&Portal::processIncomingMessages, this, std::placeholders::_1), + served_name_list_) { + io_service_.reset(); +} + +Portal::~Portal() { + connector_.close(); + stopEventsLoop(); + clear(); +} + +void Portal::sendInterest(const Interest &interest, + const OnContentObjectCallback &on_content_object, + const OnInterestTimeoutCallback &on_interest_timeout) { + std::shared_ptr _interest = const_cast(interest).shared_from_this(); + + // Create new message + CCNxMetaMessage *message = ccnxMetaMessage_CreateFromInterest(_interest->getWrappedStructure()); + + // Send it + connector_.send(message); + clear_ = false; + std::function timer_callback; + + PendingInterest *pend_interest = new PendingInterest(_interest, io_service_, on_content_object, on_interest_timeout); + const Name &name = _interest->getName(); + + pending_interest_hash_table_[name] = std::unique_ptr(pend_interest); + + timer_callback = [this, name](const boost::system::error_code &ec) { + + if (clear_ || !is_running_) { + return; + } + + if (ec.value() != boost::system::errc::operation_canceled) { + std::unordered_map>::iterator it = pending_interest_hash_table_.find(name); + if (it != pending_interest_hash_table_.end()) { + it->second->getOnTimeoutCallback()(*it->second->getInterest()); + } else { + std::cerr << "Timeout on interest already received_! [" << it->second->getInterest()->getName() << "]" + << std::endl; + } + } + }; + + pend_interest->startCountdown(timer_callback); + + ccnxMetaMessage_Release(&message); +} + +void Portal::bind(Name &name, const OnInterestCallback &on_interest_callback) { + on_interest_callback_ = on_interest_callback; + served_name_list_.push_back(name); + work_ = std::shared_ptr(new boost::asio::io_service::work(io_service_)); + connector_.bind(name); +} + +void Portal::sendContentObject(const ContentObject &content_object) { + ContentObject &ccnx_data = const_cast(content_object); + CCNxMetaMessageStructure *message = ccnxMetaMessage_CreateFromContentObject(ccnx_data.getWrappedStructure()); + + ccnxContentObject_AssertValid(ccnx_data.getWrappedStructure()); + + connector_.send(message); + + ccnxMetaMessage_Release(&message); +} + +void Portal::runEventsLoop() { + if (io_service_.stopped()) { + io_service_.reset(); // ensure that run()/poll() will do some work + } + + is_running_ = true; + this->io_service_.run(); +} + +void Portal::stopEventsLoop() { + is_running_ = false; + work_.reset(); + io_service_.stop(); +} + +void Portal::clear() { + pending_interest_hash_table_.clear(); + clear_ = true; +} + +void Portal::processInterest(CCNxMetaMessage *response) { + // Interest for a producer + CCNxInterest *interest_ptr = ccnxInterest_Acquire(ccnxMetaMessage_GetInterest(response)); + + if (on_interest_callback_ != UNSET_CALLBACK) { + + Interest interest(interest_ptr); + if (on_interest_callback_) { + on_interest_callback_(interest.getName(), interest); + } + ccnxInterest_Release((CCNxInterest **) &interest_ptr); + } +} + +void Portal::processControlMessage(CCNxMetaMessage *response) { + // Control message as response to the route set by a producer + + CCNxControl *control_message = ccnxMetaMessage_GetControl(response); + + if (ccnxControl_IsACK(control_message)) { + std::cout << "Route set correctly!" << std::endl; + } else { + std::cout << "Failed to set the route." << std::endl; + } +} + +void Portal::processContentObject(CCNxMetaMessage *response) { + // Content object for a consumer + + CCNxContentObject *content_object = ccnxContentObject_Acquire(ccnxMetaMessage_GetContentObject(response)); + + CCNxName *n = ccnxContentObject_GetName(content_object); + size_t n_components = ccnxName_GetSegmentCount(n); + CCNxNameSegment *last_segment = ccnxName_GetSegment(n, n_components - 1); + + bool has_chunk_number = ccnxNameSegmentNumber_IsValid(last_segment); + + PendingInterestHashTable::iterator it = pending_interest_hash_table_.find(Name(n)); + + if (it != pending_interest_hash_table_.end()) { + + std::unique_ptr &interest_ptr = it->second; + + interest_ptr->cancelTimer(); + std::shared_ptr data_ptr = std::make_shared(content_object); + + if (!interest_ptr->isReceived()) { + interest_ptr->setReceived(); + interest_ptr->getOnDataCallback()(*interest_ptr->getInterest(), *data_ptr); + + if (!has_chunk_number) { + pending_interest_hash_table_.erase(interest_ptr->getInterest()->getName()); + } + } + } + + ccnxContentObject_Release((CCNxContentObject **) &content_object); +} + +void Portal::processIncomingMessages(CCNxMetaMessage *response) { + if (clear_ || !is_running_) { + return; + } + + if (response) { + if (ccnxMetaMessage_IsContentObject(response)) { + processContentObject(response); + } else if (ccnxMetaMessage_IsInterest(response)) { + processInterest(response); + } else if (ccnxMetaMessage_IsControl(response)) { + processControlMessage(response); + } + ccnxMetaMessage_Release(&response); + } + +} + +boost::asio::io_service &Portal::getIoService() { + return io_service_; +} + +} // end namespace ccnx + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/ccnx/icnet_ccnx_portal.h b/icnet/ccnx/icnet_ccnx_portal.h new file mode 100644 index 00000000..5076fcd9 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_portal.h @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_PORTAL_H_ +#define ICNET_CCNX_PORTAL_H_ + +#include "icnet_ccnx_common.h" + +#include +#include +#include +#include +#include +#include + +extern "C" { +#include +#include +#include +#include +#include +#include +#include +#include +}; + +#include "icnet_ccnx_interest.h" +#include "icnet_ccnx_pending_interest.h" +#include "icnet_ccnx_name.h" +#include "icnet_ccnx_content_object.h" +#include "icnet_ccnx_local_connector.h" + +namespace icnet { + +namespace ccnx { + +typedef std::unordered_map> PendingInterestHashTable; +typedef uint64_t PendingInterestId; +typedef CCNxMetaMessage CCNxMetaMessageStructure; + +class Portal : public std::enable_shared_from_this // : public api::Face +{ + public: + Portal(std::string forwarder_ip_address = "127.0.0.1", std::string forwarder_port = "9695"); + + ~Portal(); + + void sendInterest(const Interest &interest, + const OnContentObjectCallback &on_content_object_callback, + const OnInterestTimeoutCallback &on_interest_timeout_callback); + + void bind(Name &name, const OnInterestCallback &on_interest_callback); + + void runEventsLoop(); + + void sendContentObject(const ContentObject &content_object); + + void stopEventsLoop(); + + void clear(); + + boost::asio::io_service &getIoService(); + + private: + + void processIncomingMessages(CCNxMetaMessageStructure *response); + + void processInterest(CCNxMetaMessage *response); + + void processContentObject(CCNxMetaMessage *response); + + void processControlMessage(CCNxMetaMessage *response); + + volatile bool is_running_; + bool clear_; + + boost::asio::io_service io_service_; + + std::shared_ptr work_; + + PendingInterestHashTable pending_interest_hash_table_; + + OnInterestCallback on_interest_callback_; + std::list served_name_list_; + + LocalConnector connector_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_PORTAL_H_ diff --git a/icnet/ccnx/icnet_ccnx_segment.cc b/icnet/ccnx/icnet_ccnx_segment.cc new file mode 100644 index 00000000..4215650a --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_segment.cc @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_ccnx_segment.h" + +namespace icnet { + +namespace ccnx { + +Segment::Segment(CCNxNameLabelType type, std::string &segment_value) : name_segment_( + ccnxNameSegment_CreateTypeValueArray(type, segment_value.length(), segment_value.c_str())) { +} + +Segment::Segment(CCNxNameSegmentStructure *segment) : name_segment_(ccnxNameSegment_Acquire(segment)) { +} + +Segment::Segment(const Segment &segment) : name_segment_(ccnxNameSegment_Copy(segment.name_segment_)) { +} + +Segment::Segment(Segment &&otherSegment) : name_segment_(ccnxNameSegment_Acquire(otherSegment.name_segment_)) { +} + +Segment::~Segment() { + ccnxNameSegment_Release(&name_segment_); +} + +Segment &Segment::operator=(const Segment &segment) { + ccnxNameSegment_Release(&name_segment_); + this->name_segment_ = ccnxNameSegment_Copy(segment.name_segment_); + return *this; +} + +bool Segment::operator==(const Segment &segment) { + return ccnxNameSegment_Equals(this->name_segment_, segment.name_segment_); +} + +const std::string &Segment::toString() { + char *str = ccnxNameSegment_ToString(name_segment_); + name_segment_string_ = std::string(str); + free(str); + return name_segment_string_; +} + +std::size_t Segment::getSize() { + return ccnxNameSegment_Length(name_segment_); +} + +CCNxNameLabelType Segment::getType() { + return ccnxNameSegment_GetType(name_segment_); +} + +CCNxNameSegmentStructure *Segment::getWrappedStructure() const { + return this->name_segment_; +} + +bool Segment::isSegment() const { + return ccnxNameSegmentNumber_IsValid(name_segment_); +} + +uint64_t Segment::toSegment() const { + return ccnxNameSegmentNumber_Value(name_segment_); +} + +} // end namespace ccnx + +} // end namespace icnet diff --git a/icnet/ccnx/icnet_ccnx_segment.h b/icnet/ccnx/icnet_ccnx_segment.h new file mode 100644 index 00000000..6620ee74 --- /dev/null +++ b/icnet/ccnx/icnet_ccnx_segment.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CCNX_SEGMENT_H_ +#define ICNET_CCNX_SEGMENT_H_ + +#include "icnet_ccnx_common.h" + +extern "C" { +#include +#include +#include +}; + +typedef CCNxNameSegment CCNxNameSegmentStructure; + +namespace icnet { + +namespace ccnx { + +class Segment : public std::enable_shared_from_this // : public api::Component +{ + public: + Segment(CCNxNameLabelType type, std::string &segment_value); + + Segment(const Segment &segment); + + Segment(Segment &&otherSegment); + + Segment(CCNxNameSegmentStructure *segment); + + ~Segment(); + + Segment &operator=(const Segment &segment); + + bool operator==(const Segment &segment); + + const std::string &toString(); + + std::size_t getSize(); + + CCNxNameLabelType getType(); + + CCNxNameSegmentStructure *getWrappedStructure() const; + + bool isSegment() const; + + uint64_t toSegment() const; + + private: + CCNxNameSegmentStructure *name_segment_; + std::string name_segment_string_; +}; + +} // end namespace ccnx + +} // end namespace icnet + +#endif // ICNET_CCNX_PORTAL_H_ diff --git a/icnet/transport/consumer.conf b/icnet/transport/consumer.conf new file mode 100644 index 00000000..1a366f32 --- /dev/null +++ b/icnet/transport/consumer.conf @@ -0,0 +1,21 @@ +; this file contais the parameters for RAAQM +autotune = no +lifetime = 500 +retransmissions = 128 +beta = 0.99 +drop = 0.003 +beta_wifi_ = 0.99 +drop_wifi_ = 0.6 +beta_lte_ = 0.99 +drop_lte_ = 0.003 +wifi_delay_ = 200 +lte_delay_ = 9000 + +alpha = 0.95 +batching_parameter = 200 + +;Choice of rate estimator: +;0 --> an estimation each $(batching_parameter) packets +;1 --> an estimation "a la TCP", estimation at the end of the download of the segment + +rate_estimator = 0 diff --git a/icnet/transport/icnet_common.h b/icnet/transport/icnet_common.h new file mode 100644 index 00000000..d507b324 --- /dev/null +++ b/icnet/transport/icnet_common.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_COMMON_H_ +#define ICNET_COMMON_H_ + +// require C++11 +#if __cplusplus < 201103L && !defined(__GXX_EXPERIMENTAL_CXX0X__) +#error "icnet needs to be compiled using the C++11 standard" +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.hpp" +#include "icnet_ccnx_facade.h" + +#if defined(__GNUC__) || defined(__clang__) +# define DEPRECATED(func) func __attribute__ ((deprecated)) +#elif defined(_MSC_VER) +# define DEPRECATED(func) __declspec(deprecated) func +#else +# pragma message("DEPRECATED not implemented") +# define DEPRECATED(func) func +#endif + +#endif // ICNET_COMMON_H_ diff --git a/icnet/transport/icnet_content_store.cc b/icnet/transport/icnet_content_store.cc new file mode 100644 index 00000000..884ad2e7 --- /dev/null +++ b/icnet/transport/icnet_content_store.cc @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_content_store.h" + +namespace icnet { + +ContentStore::ContentStore(std::size_t max_packets) : max_content_store_size_(max_packets) { +} + +ContentStore::~ContentStore() { + content_store_hash_table_.clear(); +} + +void ContentStore::insert(const std::shared_ptr &content_object) { + std::unique_lock lock(cs_mutex_); + if (content_store_hash_table_.size() >= max_content_store_size_) { + // Evict item + content_store_hash_table_.erase(lru_list_.back()); + lru_list_.pop_back(); + } + + // Insert new item + lru_list_.push_back(std::cref(content_object->getName())); + LRUList::iterator pos = lru_list_.end(); + content_store_hash_table_[content_object->getName()] = CcnxContentStoreEntry(content_object, pos); + +} + +const std::shared_ptr &ContentStore::find(const Interest &interest) { + std::unique_lock lock(cs_mutex_); + ContentStoreHashTable::iterator it = content_store_hash_table_.find(interest.getName()); + if (it != content_store_hash_table_.end()) { + if (it->second.second != lru_list_.begin()) { + // Move element to the top of the LRU list + lru_list_.splice(lru_list_.begin(), lru_list_, it->second.second); + } + return it->second.first; + } else { + return empty_reference_; + } +} + +void ContentStore::erase(const Name &exact_name) { + std::unique_lock lock(cs_mutex_); + ContentStoreHashTable::iterator it = content_store_hash_table_.find(exact_name); + lru_list_.erase(it->second.second); + content_store_hash_table_.erase(exact_name); +} + +void ContentStore::setLimit(size_t max_packets) { + max_content_store_size_ = max_packets; +} + +std::size_t ContentStore::getLimit() const { + return max_content_store_size_; +} + +std::size_t ContentStore::size() const { + return content_store_hash_table_.size(); +} + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/transport/icnet_content_store.h b/icnet/transport/icnet_content_store.h new file mode 100644 index 00000000..a626055d --- /dev/null +++ b/icnet/transport/icnet_content_store.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CONTENT_STORE_H_ +#define ICNET_CONTENT_STORE_H_ + +#include "icnet_socket.h" + +#include + +namespace icnet { + +typedef std::pair, std::list>::iterator> + CcnxContentStoreEntry; +typedef std::list> LRUList; +typedef std::unordered_map ContentStoreHashTable; + +class ContentStore { + public: + + explicit ContentStore(std::size_t max_packets = 65536); + + ~ContentStore(); + + void insert(const std::shared_ptr &content_object); + + const std::shared_ptr &find(const Interest &interest); + + void erase(const Name &exact_name); + + void setLimit(size_t max_packets); + + size_t getLimit() const; + + size_t size() const; + + private: + ContentStoreHashTable content_store_hash_table_; + LRUList lru_list_; + std::shared_ptr empty_reference_; + std::size_t max_content_store_size_; + std::mutex cs_mutex_; +}; + +} // end namespace icnet + + +#endif // ICNET_CONTENT_STORE_H_ diff --git a/icnet/transport/icnet_download_observer.h b/icnet/transport/icnet_download_observer.h new file mode 100644 index 00000000..7b640a1c --- /dev/null +++ b/icnet/transport/icnet_download_observer.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_DOWNLOAD_OBSERVER_H_ +#define ICNET_DOWNLOAD_OBSERVER_H_ + +namespace icnet { + +class IcnObserver { + public: + virtual ~IcnObserver() { + }; + + virtual void notifyStats(double throughput) = 0; +}; + +} // end namespace icnet + +#endif // ICNET_DOWNLOAD_OBSERVER_H_ + diff --git a/icnet/transport/icnet_rate_estimation.cc b/icnet/transport/icnet_rate_estimation.cc new file mode 100644 index 00000000..b378da06 --- /dev/null +++ b/icnet/transport/icnet_rate_estimation.cc @@ -0,0 +1,324 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "icnet_rate_estimation.h" + +namespace icnet { + +void *Timer(void *data) { + InterRttEstimator *estimator = (InterRttEstimator *) data; + + double dat_rtt, my_avg_win, my_avg_rtt; + int my_win_change, number_of_packets, max_packet_size; + + pthread_mutex_lock(&(estimator->mutex_)); + dat_rtt = estimator->rtt_; + pthread_mutex_unlock(&(estimator->mutex_)); + + while (estimator->is_running_) { + usleep(KV * dat_rtt); + + pthread_mutex_lock(&(estimator->mutex_)); + + dat_rtt = estimator->rtt_; + my_avg_win = estimator->avg_win_; + my_avg_rtt = estimator->avg_rtt_; + my_win_change = estimator->win_change_; + number_of_packets = estimator->number_of_packets_; + max_packet_size = estimator->max_packet_size_; + estimator->avg_rtt_ = estimator->rtt_; + estimator->avg_win_ = 0; + estimator->win_change_ = 0; + estimator->number_of_packets_ = 1; + + pthread_mutex_unlock(&(estimator->mutex_)); + + if (number_of_packets == 0 || my_win_change == 0) { + continue; + } + if (estimator->estimation_ == 0) { + estimator->estimation_ = (my_avg_win * 8.0 * max_packet_size * 1000000.0 / (1.0 * my_win_change)) + / (my_avg_rtt / (1.0 * number_of_packets)); + } + + estimator->estimation_ = estimator->alpha_ * estimator->estimation_ + (1 - estimator->alpha_) + * ((my_avg_win * 8.0 * max_packet_size * 1000000.0 / (1.0 * my_win_change)) + / (my_avg_rtt / (1.0 * number_of_packets))); + + if (estimator->observer_) { + estimator->observer_->notifyStats(estimator->estimation_); + } + } + + return nullptr; +} + +InterRttEstimator::InterRttEstimator(double alpha_arg) { + this->estimated_ = false; + this->observer_ = NULL; + this->alpha_ = alpha_arg; + this->thread_is_running_ = false; + this->my_th_ = NULL; + this->is_running_ = true; + this->avg_rtt_ = 0.0; + this->estimation_ = 0.0; + this->avg_win_ = 0.0; + this->rtt_ = 0.0; + this->win_change_ = 0; + this->number_of_packets_ = 0; + this->max_packet_size_ = 0; + this->win_current_ = 1.0; + + pthread_mutex_init(&(this->mutex_), NULL); + gettimeofday(&(this->begin_), 0); +} + +InterRttEstimator::~InterRttEstimator() { + this->is_running_ = false; + if (this->my_th_) { + pthread_join(*(this->my_th_), NULL); + } + this->my_th_ = NULL; + pthread_mutex_destroy(&(this->mutex_)); +} + +void InterRttEstimator::onRttUpdate(double rtt) { + pthread_mutex_lock(&(this->mutex_)); + this->rtt_ = rtt; + this->number_of_packets_++; + this->avg_rtt_ += rtt; + pthread_mutex_unlock(&(this->mutex_)); + + if (!thread_is_running_) { + my_th_ = (pthread_t *) malloc(sizeof(pthread_t)); + if (!my_th_) { + std::cerr << "Error allocating thread." << std::endl; + my_th_ = NULL; + } + if (/*int err = */pthread_create(my_th_, NULL, icnet::Timer, (void *) this)) { + std::cerr << "Error creating the thread" << std::endl; + my_th_ = NULL; + } + thread_is_running_ = true; + } +} + +void InterRttEstimator::onWindowIncrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + + pthread_mutex_lock(&(this->mutex_)); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + pthread_mutex_unlock(&(this->mutex_)); + + gettimeofday(&(this->begin_), 0); +} + +void InterRttEstimator::onWindowDecrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + + pthread_mutex_lock(&(this->mutex_)); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + pthread_mutex_unlock(&(this->mutex_)); + + gettimeofday(&(this->begin_), 0); +} + +ALaTcpEstimator::ALaTcpEstimator() { + this->estimation_ = 0.0; + this->observer_ = NULL; + gettimeofday(&(this->begin_), 0); + this->totalSize_ = 0.0; +} + +void ALaTcpEstimator::onStart() { + this->totalSize_ = 0.0; + gettimeofday(&(this->begin_), 0); +} + +void ALaTcpEstimator::onDownloadFinished() { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + this->estimation_ = this->totalSize_ * 8 * 1000000 / delay; + if (observer_) { + observer_->notifyStats(this->estimation_); + } +} + +void ALaTcpEstimator::onDataReceived(int packet_size) { + this->totalSize_ += packet_size; +} + +SimpleEstimator::SimpleEstimator(double alphaArg, int batching_param) { + this->estimation_ = 0.0; + this->estimated_ = false; + this->observer_ = NULL; + this->batching_param_ = batching_param; + this->total_size_ = 0.0; + this->number_of_packets_ = 0; + this->base_alpha_ = alphaArg; + this->alpha_ = alphaArg; + gettimeofday(&(this->begin_), 0); +} + +void SimpleEstimator::onStart() { + this->estimated_ = false; + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + gettimeofday(&(this->begin_), 0); +} + +void SimpleEstimator::onDownloadFinished() { + if (!this->estimated_) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + + //Assuming all packets carry max_packet_size_ bytes of data (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = alpha_ * this->estimation_ + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_ * (((double) this->number_of_packets_) / ((double) this->batching_param_)); + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + gettimeofday(&(this->begin_), 0); + } else { + if (this->number_of_packets_ >= (int) (75.0 * (double) this->batching_param_ / 100.0)) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + + //Assuming all packets carry max_packet_size_ bytes of data (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = alpha_ * this->estimation_ + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_ * (((double) this->number_of_packets_) / ((double) this->batching_param_)); + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + + gettimeofday(&(this->begin_), 0); + + } + } +} + +void SimpleEstimator::onDataReceived(int packet_size) { + this->total_size_ += packet_size; +} + +void SimpleEstimator::onRttUpdate(double rtt) { + this->number_of_packets_++; + + if (number_of_packets_ == this->batching_param_) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + //Assuming all packets carry max_packet_size_ bytes of data (8*max_packet_size_ bits); 1000000 factor to convert us to seconds + if (this->estimation_) { + this->estimation_ = alpha_ * this->estimation_ + (1 - alpha_) * (total_size_ * 8 * 1000000.0 / (delay)); + } else { + this->estimation_ = total_size_ * 8 * 1000000.0 / (delay); + } + if (observer_) { + observer_->notifyStats(this->estimation_); + } + this->alpha_ = this->base_alpha_; + this->number_of_packets_ = 0; + this->total_size_ = 0.0; + gettimeofday(&(this->begin_), 0); + } +} + +BatchingPacketsEstimator::BatchingPacketsEstimator(double alpha_arg, int param) { + this->estimated_ = false; + this->observer_ = NULL; + this->alpha_ = alpha_arg; + this->batching_param_ = param; + this->number_of_packets_ = 0; + this->avg_win_ = 0.0; + this->avg_rtt_ = 0.0; + this->win_change_ = 0.0; + this->max_packet_size_ = 0; + this->estimation_ = 0.0; + this->win_current_ = 1.0; + gettimeofday(&(this->begin_), 0); +} + +void BatchingPacketsEstimator::onRttUpdate(double rtt) { + this->number_of_packets_++; + this->avg_rtt_ += rtt; + + if (number_of_packets_ == this->batching_param_) { + if (estimation_ == 0) { + estimation_ = (avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / (1.0 * win_change_)) + / (avg_rtt_ / (1.0 * number_of_packets_)); + } else { + estimation_ = alpha_ * estimation_ + (1 - alpha_) + * ((avg_win_ * 8.0 * max_packet_size_ * 1000000.0 / (1.0 * win_change_)) + / (avg_rtt_ / (1.0 * number_of_packets_))); + } + + if (observer_) { + observer_->notifyStats(estimation_); + } + + this->number_of_packets_ = 0; + this->avg_win_ = 0.0; + this->avg_rtt_ = 0.0; + this->win_change_ = 0.0; + } +} + +void BatchingPacketsEstimator::onWindowIncrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + gettimeofday(&(this->begin_), 0); +} + +void BatchingPacketsEstimator::onWindowDecrease(double win_current) { + timeval end; + gettimeofday(&end, 0); + double delay = RaaqmDataPath::getMicroSeconds(end) - RaaqmDataPath::getMicroSeconds(this->begin_); + this->avg_win_ += this->win_current_ * delay; + this->win_current_ = win_current; + this->win_change_ += delay; + gettimeofday(&(this->begin_), 0); +} + +} // end namespace icnet + diff --git a/icnet/transport/icnet_rate_estimation.h b/icnet/transport/icnet_rate_estimation.h new file mode 100644 index 00000000..86b879c2 --- /dev/null +++ b/icnet/transport/icnet_rate_estimation.h @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_RATE_ESTIMATION_H_ +#define ICNET_RATE_ESTIMATION_H_ + +#include + +#include "icnet_transport_raaqm_data_path.h" +#include "icnet_download_observer.h" + +#define BATCH 50 +#define KV 20 +#define ALPHA 0.8 +#define RATE_CHOICE 0 + +namespace icnet { + +class IcnRateEstimator { + public: + IcnRateEstimator() { + }; + + virtual ~IcnRateEstimator() { + }; + + virtual void onRttUpdate(double rtt) { + }; + + virtual void onDataReceived(int packetSize) { + }; + + virtual void onWindowIncrease(double winCurrent) { + }; + + virtual void onWindowDecrease(double winCurrent) { + }; + + virtual void onStart() { + }; + + virtual void onDownloadFinished() { + }; + + virtual void setObserver(IcnObserver *observer) { + this->observer_ = observer; + }; + IcnObserver *observer_; + struct timeval begin_; + double base_alpha_; + double alpha_; + double estimation_; + int number_of_packets_; + // this boolean is to make sure at least one estimation of the BW is done + bool estimated_; +}; + +// A rate estimator RTT-based. Computes EWMA(WinSize)/EWMA(RTT) + +class InterRttEstimator : public IcnRateEstimator { + public: + InterRttEstimator(double alpha_arg); + + ~InterRttEstimator(); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size) { + if (packet_size > this->max_packet_size_) { + this->max_packet_size_ = packet_size; + } + }; + + void onWindowIncrease(double win_current); + + void onWindowDecrease(double win_current); + + void onStart() { + }; + + void onDownloadFinished() { + }; + + // private: should be done by using getters + pthread_t *my_th_; + bool thread_is_running_; + double rtt_; + bool is_running_; + pthread_mutex_t mutex_; + double avg_rtt_; + double avg_win_; + int max_packet_size_; + double win_change_; + double win_current_; +}; + +// A rate estimator, Batching Packets based. Computes EWMA(WinSize)/EWMA(RTT) + +class BatchingPacketsEstimator : public IcnRateEstimator { + public: + BatchingPacketsEstimator(double alpha_arg, int batchingParam); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size) { + if (packet_size > this->max_packet_size_) { + this->max_packet_size_ = packet_size; + } + }; + + void onWindowIncrease(double win_current); + + void onWindowDecrease(double win_current); + + void onStart() { + }; + + void onDownloadFinished() { + }; + + private: + int batching_param_; + double avg_rtt_; + double avg_win_; + double win_change_; + int max_packet_size_; + double win_current_; +}; + +//Segment Estimator + +class ALaTcpEstimator : public IcnRateEstimator { + public: + ALaTcpEstimator(); + + void onDataReceived(int packet_size); + void onStart(); + void onDownloadFinished(); + + private: + double totalSize_; +}; + +// A Rate estimator, this one is the simplest: counting batching_param_ packets and then divide the sum of the size of these packets by the time taken to DL them. +// Should be the one used + +class SimpleEstimator : public IcnRateEstimator { + public: + SimpleEstimator(double alpha, int batching_param); + + void onRttUpdate(double rtt); + + void onDataReceived(int packet_size); + + void onWindowIncrease(double win_current) { + }; + + void onWindowDecrease(double win_current) { + }; + + void onStart(); + + void onDownloadFinished(); + + private: + int batching_param_; + double total_size_; +}; + +void *Timer(void *data); + +}// end namespace icnet + +#endif // ICNET_RATE_ESTIMATION_H_ + diff --git a/icnet/transport/icnet_socket.h b/icnet/transport/icnet_socket.h new file mode 100644 index 00000000..f1ce8da0 --- /dev/null +++ b/icnet/transport/icnet_socket.h @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_SOCKET_H_ +#define ICNET_SOCKET_H_ + +#include "icnet_common.h" +#include "icnet_socket_options_keys.h" +#include "icnet_socket_options_default_values.h" +#include "icnet_download_observer.h" + +#define SOCKET_OPTION_GET 0 +#define SOCKET_OPTION_NOT_GET 1 +#define SOCKET_OPTION_SET 2 +#define SOCKET_OPTION_NOT_SET 3 +#define SOCKET_OPTION_DEFAULT 12345 + +#define VOID_HANDLER 0 + +namespace icnet { + +class ConsumerSocket; +class ProducerSocket; + +typedef ccnx::Interest Interest; +typedef ccnx::ContentObject ContentObject; +typedef ccnx::Name Name; +typedef ccnx::Manifest Manifest; +typedef ccnx::Portal Portal; +typedef ccnx::KeyLocator KeyLocator; +typedef ccnx::Segment Segment; +typedef ccnx::PayloadType PayloadType; +typedef ccnx::Array Array; + +typedef std::function ConsumerInterestCallback; +typedef std::function ConsumerContentCallback; +typedef std::function ConsumerContentObjectCallback; +typedef std::function ConsumerContentObjectVerificationCallback; +typedef std::function ConsumerManifestCallback; +typedef std::function ProducerContentObjectCallback; +typedef std::function ProducerInterestCallback; + +class Socket { + public: + + virtual int setSocketOption(int socket_option_key, int socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, double socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, size_t socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, bool socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, Name socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, KeyLocator socket_option_value) = 0; + + virtual int setSocketOption(int socket_option_key, IcnObserver *socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, int &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, double &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, size_t &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, bool &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, Name &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, KeyLocator &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, std::shared_ptr &socket_option_value) = 0; + + virtual int getSocketOption(int socket_option_key, IcnObserver **socket_option_value) = 0; + + protected: + virtual ~Socket() { + }; +}; + +} // namespace icnet + +#endif // ICNET_SOCKET_H_ diff --git a/icnet/transport/icnet_socket_consumer.cc b/icnet/transport/icnet_socket_consumer.cc new file mode 100644 index 00000000..b6c928c6 --- /dev/null +++ b/icnet/transport/icnet_socket_consumer.cc @@ -0,0 +1,613 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_socket_consumer.h" + +namespace icnet { + +ConsumerSocket::ConsumerSocket(Name prefix, int protocol) + : is_running_(false), + name_prefix_(prefix), + interest_lifetime_(default_values::interest_lifetime), + min_window_size_(default_values::min_window_size), + max_window_size_(default_values::max_window_size), + current_window_size_(-1), + /****** RAAQM Parameters ******/ + minimum_drop_probability_(default_values::minimum_drop_probability), + sample_number_(default_values::sample_number), + gamma_(default_values::gamma_value), + beta_(default_values::beta_value), + drop_factor_(default_values::drop_factor), + /****** END RAAQM Parameters ******/ + rate_estimation_alpha_(default_values::rate_alpha), + rate_estimation_observer_(NULL), + is_async_(false), + on_interest_output_(VOID_HANDLER), + on_interest_timeout_(VOID_HANDLER), + on_interest_satisfied_(VOID_HANDLER), + on_content_object_input_(VOID_HANDLER), + on_content_object_verification_(VOID_HANDLER), + on_content_object_(VOID_HANDLER), + on_manifest_(VOID_HANDLER), + on_payload_retrieved_(VOID_HANDLER), + virtual_download_(false), + rtt_stats_(false) { + + portal_ = std::make_shared(); + + switch (protocol) { + case TransportProtocolAlgorithms::VEGAS: { + transport_protocol_ = std::make_shared(this); + break; + } + case TransportProtocolAlgorithms::RAAQM: { + transport_protocol_ = std::make_shared(this); + break; + } + } +} + +ConsumerSocket::~ConsumerSocket() { + stop(); + transport_protocol_.reset(); + portal_.reset(); +} + +int ConsumerSocket::consume(Name suffix) { + if (is_running_) { + portal_->getIoService().post(std::bind(&ConsumerSocket::postponedConsume, this, suffix)); + return CONSUMER_BUSY; + } + + if (is_async_) { + portal_ = std::make_shared(); + transport_protocol_->updatePortal(); + } + + name_suffix_ = suffix; + is_async_ = false; + transport_protocol_->start(); + is_running_ = false; + return CONSUMER_READY; +} + +void ConsumerSocket::postponedConsume(Name name_suffix) { + if (is_async_) { + portal_ = std::make_shared(); + transport_protocol_->updatePortal(); + } + + name_suffix_ = name_suffix; + is_async_ = false; + transport_protocol_->start(); +} + +int ConsumerSocket::asyncConsume(Name suffix) { + if (transport_protocol_->isRunning()) { + return CONSUMER_BUSY; + } + + name_suffix_ = suffix; + is_async_ = true; + transport_protocol_->start(); + return CONSUMER_READY; +} + +void ConsumerSocket::stop() { + if (transport_protocol_->isRunning()) { + transport_protocol_->stop(); + } + + is_running_ = false; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, double socket_option_value) { + switch (socket_option_key) { + case MIN_WINDOW_SIZE: + min_window_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case MAX_WINDOW_SIZE: + max_window_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case CURRENT_WINDOW_SIZE: + current_window_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GAMMA_VALUE: + gamma_ = socket_option_value; + return SOCKET_OPTION_SET; + + case BETA_VALUE: + beta_ = socket_option_value; + return SOCKET_OPTION_SET; + + case DROP_FACTOR: + drop_factor_ = socket_option_value; + return SOCKET_OPTION_SET; + + case MINIMUM_DROP_PROBABILITY: + minimum_drop_probability_ = socket_option_value; + return SOCKET_OPTION_SET; + + case RATE_ESTIMATION_ALPHA: + if (socket_option_value >= 0 && socket_option_value < 1) { + rate_estimation_alpha_ = socket_option_value; + } else { + rate_estimation_alpha_ = ALPHA; + } + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, int socket_option_value) { + switch (socket_option_key) { + + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + input_buffer_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::MAX_INTEREST_RETX: + max_retransmissions_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::INTEREST_LIFETIME: + interest_lifetime_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + if (socket_option_value == VOID_HANDLER) { + on_interest_retransmission_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + if (socket_option_value == VOID_HANDLER) { + on_interest_timeout_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_output_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_input_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_verification_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ConsumerCallbacksOptions::CONTENT_RETRIEVED: + if (socket_option_value == VOID_HANDLER) { + on_payload_retrieved_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + if (socket_option_value > 0) { + rate_estimation_batching_parameter_ = socket_option_value; + } else { + rate_estimation_batching_parameter_ = BATCH; + } + return SOCKET_OPTION_SET; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + if (socket_option_value > 0) { + rate_estimation_choice_ = socket_option_value; + } else { + rate_estimation_choice_ = RATE_CHOICE; + } + return SOCKET_OPTION_SET; + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, size_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + input_buffer_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + output_buffer_size_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, bool socket_option_value) { + switch (socket_option_key) { + + case GeneralTransportOptions::RUNNING: + is_running_ = socket_option_value; + return SOCKET_OPTION_SET; + + case OtherOptions::VIRTUAL_DOWNLOAD: + virtual_download_ = socket_option_value; + return SOCKET_OPTION_SET; + + case RaaqmTransportOptions::RTT_STATS: + rtt_stats_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, Name socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NAME_PREFIX: + name_prefix_ = socket_option_value;; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::NAME_SUFFIX: + name_suffix_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + on_content_object_input_ = socket_option_value;; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + on_content_object_verification_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + on_interest_retransmission_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + on_interest_output_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + on_interest_timeout_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + on_interest_satisfied_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_RETRIEVED: + on_payload_retrieved_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + on_manifest_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::setSocketOption(int socket_option_key, KeyLocator socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ConsumerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { + if (socket_option_key == RateEstimationOptions::RATE_ESTIMATION_OBSERVER) { + rate_estimation_observer_ = socket_option_value; + return SOCKET_OPTION_SET; + } else { + return SOCKET_OPTION_NOT_SET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, double &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MIN_WINDOW_SIZE: + socket_option_value = min_window_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::MAX_WINDOW_SIZE: + socket_option_value = max_window_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::CURRENT_WINDOW_SIZE: + socket_option_value = current_window_size_; + return SOCKET_OPTION_GET; + + // RAAQM parameters + + case RaaqmTransportOptions::GAMMA_VALUE: + socket_option_value = gamma_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::BETA_VALUE: + socket_option_value = beta_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::DROP_FACTOR: + socket_option_value = drop_factor_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::MINIMUM_DROP_PROBABILITY: + socket_option_value = minimum_drop_probability_; + return SOCKET_OPTION_GET; + + case RateEstimationOptions::RATE_ESTIMATION_ALPHA: + socket_option_value = rate_estimation_alpha_; + return SOCKET_OPTION_GET; + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, int &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + socket_option_value = input_buffer_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = output_buffer_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::MAX_INTEREST_RETX: + socket_option_value = max_retransmissions_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::INTEREST_LIFETIME: + socket_option_value = interest_lifetime_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::SAMPLE_NUMBER: + socket_option_value = sample_number_; + return SOCKET_OPTION_GET; + + case RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER: + socket_option_value = rate_estimation_batching_parameter_; + return SOCKET_OPTION_GET; + + case RateEstimationOptions::RATE_ESTIMATION_CHOICE: + socket_option_value = rate_estimation_choice_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, size_t &socket_option_value) { + switch (socket_option_key) { + case INPUT_BUFFER_SIZE: + socket_option_value = input_buffer_size_; + return SOCKET_OPTION_GET; + + case OUTPUT_BUFFER_SIZE: + socket_option_value = output_buffer_size_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::ASYNC_MODE: + socket_option_value = is_async_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::RUNNING: + socket_option_value = is_running_; + return SOCKET_OPTION_GET; + + case OtherOptions::VIRTUAL_DOWNLOAD: + socket_option_value = virtual_download_; + return SOCKET_OPTION_GET; + + case RaaqmTransportOptions::RTT_STATS: + socket_option_value = rtt_stats_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, Name &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NAME_PREFIX: + socket_option_value = name_prefix_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::NAME_SUFFIX: + socket_option_value = name_suffix_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT: + socket_option_value = on_content_object_input_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY: + socket_option_value = on_content_object_verification_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::INTEREST_RETRANSMISSION: + socket_option_value = on_interest_retransmission_; + return SOCKET_OPTION_GET; + + case ConsumerCallbacksOptions::INTEREST_OUTPUT: + socket_option_value = on_interest_output_; + return SOCKET_OPTION_GET; + + case ConsumerCallbacksOptions::INTEREST_EXPIRED: + socket_option_value = on_interest_timeout_; + return SOCKET_OPTION_GET; + + case ConsumerCallbacksOptions::INTEREST_SATISFIED: + socket_option_value = on_interest_satisfied_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::CONTENT_RETRIEVED: + socket_option_value = on_payload_retrieved_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value) { + switch (socket_option_key) { + case ConsumerCallbacksOptions::MANIFEST_INPUT: + socket_option_value = on_manifest_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, KeyLocator &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::KEY_LOCATOR: + socket_option_value = key_locator_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ConsumerSocket::getSocketOption(int socket_option_key, IcnObserver **socket_option_value) { + if (socket_option_key == RATE_ESTIMATION_OBSERVER) { + *socket_option_value = (rate_estimation_observer_); + return SOCKET_OPTION_GET; + } else { + return SOCKET_OPTION_NOT_GET; + } +} + +} // end namespace icnet diff --git a/icnet/transport/icnet_socket_consumer.h b/icnet/transport/icnet_socket_consumer.h new file mode 100644 index 00000000..6b9ec811 --- /dev/null +++ b/icnet/transport/icnet_socket_consumer.h @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_CONSUMER_SOCKET_H_ +#define ICNET_CONSUMER_SOCKET_H_ + +#include "icnet_common.h" +#include "icnet_socket.h" +#include "icnet_transport.h" +#include "icnet_transport_raaqm.h" +#include "icnet_transport_vegas.h" + +#define CONSUMER_READY 0 +#define CONSUMER_BUSY 1 + +namespace icnet { + +class ConsumerSocket : public Socket { + public: + explicit ConsumerSocket(const Name prefix, int protocol); + + ~ConsumerSocket(); + + int consume(Name suffix); + + int asyncConsume(Name suffix); + + void stop(); + + int setSocketOption(int socket_option_key, int socket_option_value); + + int setSocketOption(int socket_option_key, double socket_option_value); + + int setSocketOption(int socket_option_key, bool socket_option_value); + + int setSocketOption(int socket_option_key, size_t socket_option_value); + + int setSocketOption(int socket_option_key, Name socket_option_value); + + int setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value); + + int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value); + + int setSocketOption(int socket_option_key, KeyLocator socket_option_value); + + int setSocketOption(int socket_option_key, IcnObserver *socket_option_value); + + int getSocketOption(int socket_option_key, int &socket_option_value); + + int getSocketOption(int socket_option_key, double &socket_option_value); + + int getSocketOption(int socket_option_key, size_t &socket_option_value); + + int getSocketOption(int socket_option_key, bool &socket_option_value); + + int getSocketOption(int socket_option_key, Name &socket_option_value); + + int getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value); + + int getSocketOption(int socket_option_key, KeyLocator &socket_option_value); + + int getSocketOption(int socket_option_key, std::shared_ptr &socket_option_value); + + int getSocketOption(int socket_option_key, IcnObserver **socket_option_value); + + private: + + void postponedConsume(Name name_suffix); + + private: + // context inner state variables + bool is_running_; + std::shared_ptr portal_; + std::shared_ptr transport_protocol_; + + Name name_prefix_; + Name name_suffix_; + + int interest_lifetime_; + + double min_window_size_; + double max_window_size_; + double current_window_size_; + int max_retransmissions_; + size_t output_buffer_size_; + size_t input_buffer_size_; + + // RAAQM Parameters + + double minimum_drop_probability_; + unsigned int sample_number_; + double gamma_; + double beta_; + double drop_factor_; + + //Rate estimation parameters + double rate_estimation_alpha_; + IcnObserver *rate_estimation_observer_; + int rate_estimation_batching_parameter_; + int rate_estimation_choice_; + + bool is_async_; + + KeyLocator key_locator_; + + ConsumerInterestCallback on_interest_retransmission_; + ConsumerInterestCallback on_interest_output_; + ConsumerInterestCallback on_interest_timeout_; + ConsumerInterestCallback on_interest_satisfied_; + + ConsumerContentObjectCallback on_content_object_input_; + ConsumerContentObjectVerificationCallback on_content_object_verification_; + + ConsumerContentObjectCallback on_content_object_; + ConsumerManifestCallback on_manifest_; + + ConsumerContentCallback on_payload_retrieved_; + + // Virtual download for traffic generator + + bool virtual_download_; + bool rtt_stats_; +}; + +} // end namespace icnet + +#endif // ICNET_CONSUMER_SOCKET_H_ diff --git a/icnet/transport/icnet_socket_options_default_values.h b/icnet/transport/icnet_socket_options_default_values.h new file mode 100644 index 00000000..0f830a54 --- /dev/null +++ b/icnet/transport/icnet_socket_options_default_values.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_SOCKET_OPTIONS_DEFAULT_VALUES_H_ +#define ICNET_SOCKET_OPTIONS_DEFAULT_VALUES_H_ + +namespace icnet { + +namespace default_values { + +const int interest_lifetime = 1000; // milliseconds +const int content_object_expiry_time = 50000; // milliseconds -> 50 seconds +const int content_object_packet_size = 1500; // The ethernet MTU +const int producer_socket_input_buffer_size = 150000; // Interests +const int producer_socket_output_buffer_size = 150000; // Content Object +const int default_buffer_size = 8096 * 8096 * 2; +const int signature_size = 260; // bytes +const int key_locator_size = 60; // bytes +const int limit_guard = 80; // bytes +const int min_window_size = 1; // Interests +const int max_window_size = 128000; // Interests +const int digest_size = 34; // bytes +const int max_out_of_order_segments = 3; // content object + +// RAAQM +const int sample_number = 30; +const double gamma_value = 1; +const double beta_value = 0.8; +const double drop_factor = 0.2; +const double minimum_drop_probability = 0.00001; +const int path_id = 0; +const double rate_alpha = 0.8; + +// Vegas +const double alpha = 1 / 8; +const double beta = 1 / 4; +const uint16_t k = 4; +const std::chrono::milliseconds clock_granularity = std::chrono::milliseconds(100); + +// maximum allowed values +const int transport_protocol_min_retransmissions = 0; +const int transport_protocol_max_retransmissions = 128; +const int max_content_object_size = 8096; + +} // end namespace default_values + +} // end namespace icnet + +#endif // ICNET_SOCKET_OPTIONS_DEFAULT_VALUES_H_ diff --git a/icnet/transport/icnet_socket_options_keys.h b/icnet/transport/icnet_socket_options_keys.h new file mode 100644 index 00000000..4b82f67a --- /dev/null +++ b/icnet/transport/icnet_socket_options_keys.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_TRANSPORT_OPTIONS_KEYS_H_ +#define ICNET_TRANSPORT_OPTIONS_KEYS_H_ + +namespace icnet { + +typedef enum { + RAAQM = 0, VEGAS = 1 +} TransportProtocolAlgorithms; + +typedef enum { + INPUT_BUFFER_SIZE = 101, + OUTPUT_BUFFER_SIZE = 102, + NAME_PREFIX = 103, + NAME_SUFFIX = 104, + MAX_INTEREST_RETX = 105, + DATA_PACKET_SIZE = 106, + INTEREST_LIFETIME = 107, + CONTENT_OBJECT_EXPIRY_TIME = 108, + KEY_LOCATOR = 110, + SIGNATURE_TYPE = 111, + MIN_WINDOW_SIZE = 112, + MAX_WINDOW_SIZE = 113, + CURRENT_WINDOW_SIZE = 114, + ASYNC_MODE = 115, + MAKE_MANIFEST = 116, + PORTAL = 117, + RUNNING = 118, +} GeneralTransportOptions; + +typedef enum { + SAMPLE_NUMBER = 201, + GAMMA_VALUE = 202, + BETA_VALUE = 203, + DROP_FACTOR = 204, + MINIMUM_DROP_PROBABILITY = 205, + PATH_ID = 206, + RTT_STATS = 207, +} RaaqmTransportOptions; + +typedef enum { + RATE_ESTIMATION_ALPHA = 301, + RATE_ESTIMATION_OBSERVER = 302, + RATE_ESTIMATION_BATCH_PARAMETER = 303, + RATE_ESTIMATION_CHOICE = 304, +} RateEstimationOptions; + +typedef enum { + INTEREST_OUTPUT = 401, + INTEREST_RETRANSMISSION = 402, + INTEREST_EXPIRED = 403, + INTEREST_SATISFIED = 404, + CONTENT_OBJECT_INPUT = 411, + MANIFEST_INPUT = 412, + CONTENT_OBJECT_TO_VERIFY = 413, + CONTENT_RETRIEVED = 414, +} ConsumerCallbacksOptions; + +typedef enum { + INTEREST_INPUT = 501, + INTEREST_DROP = 502, + INTEREST_PASS = 503, + CACHE_HIT = 506, + CACHE_MISS = 508, + NEW_CONTENT_OBJECT = 509, + CONTENT_OBJECT_SIGN = 513, + CONTENT_OBJECT_READY = 510, + CONTENT_OBJECT_OUTPUT = 511, +} ProducerCallbacksOptions; + +typedef enum { + VIRTUAL_DOWNLOAD = 601, USE_CFG_FILE = 603, +} OtherOptions; + +typedef enum { + SHA_256 = 701, RSA_256 = 702, +} SignatureType; + +} // end namespace icnet + +#endif // ICNET_TRANSPORT_OPTIONS_KEYS_H_ diff --git a/icnet/transport/icnet_socket_producer.cc b/icnet/transport/icnet_socket_producer.cc new file mode 100644 index 00000000..9a870e5c --- /dev/null +++ b/icnet/transport/icnet_socket_producer.cc @@ -0,0 +1,717 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_socket_producer.h" + +namespace icnet { + +ProducerSocket::ProducerSocket(Name prefix) + : portal_(new Portal()), + name_prefix_(prefix), + data_packet_size_(default_values::content_object_packet_size), + content_object_expiry_time_(default_values::content_object_expiry_time), + registration_status_(REGISTRATION_NOT_ATTEMPTED), + making_manifest_(false), + signature_type_(SHA_256), + key_locator_size_(default_values::key_locator_size), + output_buffer_(default_values::producer_socket_output_buffer_size), + input_buffer_capacity_(default_values::producer_socket_input_buffer_size), + input_buffer_size_(0), + processing_thread_stop_(false), + listening_thread_stop_(false), + on_interest_input_(VOID_HANDLER), + on_interest_dropped_input_buffer_(VOID_HANDLER), + on_interest_inserted_input_buffer_(VOID_HANDLER), + on_interest_satisfied_output_buffer_(VOID_HANDLER), + on_interest_process_(VOID_HANDLER), + on_new_segment_(VOID_HANDLER), + on_content_object_to_sign_(VOID_HANDLER), + on_content_object_in_output_buffer_(VOID_HANDLER), + on_content_object_output_(VOID_HANDLER), + on_content_object_evicted_from_output_buffer_(VOID_HANDLER) { + listening_thread_stop_ = false; + key_locator_size_ = default_values::key_locator_size; +} + +ProducerSocket::~ProducerSocket() { + processing_thread_stop_ = true; + portal_->stopEventsLoop(); + processing_thread_.join(); + + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +void ProducerSocket::attach() { + listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); + // processing_thread_ = boost::thread(bind(&ProducerTransport::processIncomingInterest, this)); +} + +void ProducerSocket::serveForever() { + if (listening_thread_.joinable()) { + listening_thread_.join(); + } +} + +void ProducerSocket::dispatch() { + // Check that the INTEREST_INPUT callback is set. + if (on_interest_input_ == VOID_HANDLER) { + std::cerr << "Warning: the dispatcher function needs a dispatcher callback! " + "You need to set INTEREST_INPUT callback" << std::endl; + } + + listening_thread_ = std::thread(std::bind(&ProducerSocket::listen, this)); +} + +void ProducerSocket::listen() { + registration_status_ = REGISTRATION_IN_PROGRESS; + + portal_ + ->bind(name_prefix_, std::bind(&ProducerSocket::onInterest, this, std::placeholders::_1, std::placeholders::_2)); + + portal_->runEventsLoop(); +} + +void ProducerSocket::passContentObjectToCallbacks(const std::shared_ptr &content_object) { + if (content_object) { + if (on_new_segment_ != VOID_HANDLER) { + on_new_segment_(*this, *content_object); + } + + if (on_content_object_to_sign_ != VOID_HANDLER) { + if (!making_manifest_) { + on_content_object_to_sign_(*this, *content_object); + } else { + if (content_object->getContentType() == PayloadType::MANIFEST) { + on_content_object_to_sign_(*this, *content_object); + } else { + content_object->signWithSha256(key_locator_); + } + } + } else { + content_object->signWithSha256(key_locator_); + } + + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, *content_object); + } + + output_buffer_.insert(content_object); + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + + if (content_object->getName().get(-1).toSegment() == 0) { + portal_->sendContentObject(*content_object); + } + } +} + +void ProducerSocket::produce(ContentObject &content_object) { + if (!name_prefix_.isPrefixOf(content_object.getName())) { + return; + } + + if (on_content_object_in_output_buffer_ != VOID_HANDLER) { + on_content_object_in_output_buffer_(*this, content_object); + } + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, content_object); + } + + portal_->sendContentObject(content_object); +} + +void ProducerSocket::produce(Name suffix, const uint8_t *buf, size_t buffer_size, const int response_id, bool is_last) { + + if (buffer_size == 0) { + return; + } + + int bytes_segmented = 0; + + Name name(name_prefix_); + + if (!suffix.empty()) { + name.append(suffix); + } + + size_t bytes_occupied_by_name = name.size(); + + int digestSize = default_values::digest_size; // SHA_256 as default + int signatureSize = default_values::signature_size; + uint64_t free_space_for_content = 0; + + free_space_for_content = data_packet_size_ - bytes_occupied_by_name - digestSize - default_values::limit_guard; + + uint64_t number_of_segments = uint64_t(std::ceil(double(buffer_size) / double(free_space_for_content))); + + if (free_space_for_content * number_of_segments < buffer_size) { + number_of_segments++; + } + + uint64_t current_segment = 0; + + if (seq_number_map_.find(name.toString()) != seq_number_map_.end() + && seq_number_map_[name.toString()].find(response_id) != seq_number_map_[name.toString()].end()) { + current_segment = seq_number_map_[name.toString()][response_id]; + } else { + seq_number_map_[name.toString()][response_id] = current_segment; + } + + if (making_manifest_) { + + std::shared_ptr content_object_segment; + std::shared_ptr manifest_segment; + bool manifest_segment_needed = true; + + uint64_t free_space_for_manifest = + data_packet_size_ - bytes_occupied_by_name - signatureSize - default_values::limit_guard; + + for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments;) { + + if (manifest_segment_needed) { + + Name manifest_name(name_prefix_); + + if (!suffix.empty()) { + manifest_name.append(suffix); + } + + manifest_name.appendSegment(current_segment); + + if (manifest_segment) { + manifest_segment->encode(); + passContentObjectToCallbacks(manifest_segment); + } + + manifest_segment = std::make_shared(manifest_name); + + if (is_last) { + manifest_segment->setFinalChunkNumber(current_segment + number_of_segments - packaged_segments); + } + + // finalSegment = current_segment; + manifest_segment_needed = false; + current_segment++; + + key_locator_.clear(); + key_locator_.setName(const_cast(manifest_segment->getName())); + } + + Name full_name = name; + + content_object_segment = std::make_shared(std::move(full_name.appendSegment(current_segment))); + // content_object_segment->setExpiryTime((uint64_t) m_dataFreshness); + + if (packaged_segments == number_of_segments - 1) { + content_object_segment->setContent(&buf[bytes_segmented], buffer_size - bytes_segmented); + bytes_segmented += buffer_size - bytes_segmented; + } else { + content_object_segment->setContent(&buf[bytes_segmented], free_space_for_content); + bytes_segmented += free_space_for_content; + } + + if (is_last) { + content_object_segment->setFinalChunkNumber(current_segment + number_of_segments - packaged_segments - 1); + } + + passContentObjectToCallbacks(content_object_segment); + + size_t manifestSize = manifest_segment->estimateManifestSize(); + Name &content_object_name = (Name &) content_object_segment->getName(); + size_t fullNameSize = content_object_name.size(); + + // TODO Signature + + if (manifestSize + 2 * fullNameSize > free_space_for_manifest) { + manifest_segment_needed = true; + } + + // TODO Manifest and hashes! + // Array block = content_object_segment->getContent(); + // icn-interface::ConstBufferPtr implicitDigest = icn-interface::crypto::sha256(block.data(), block.size()); + // + // //add implicit digest to the manifest + // manifest_segment->addNameToCatalogue(Name(std::to_string(current_segment)), implicitDigest->buf(), + // implicitDigest->size()); + + packaged_segments++; + current_segment++; + + if (packaged_segments == number_of_segments) { + manifest_segment->encode(); + passContentObjectToCallbacks(manifest_segment); + } + } + } else { + + for (unsigned int packaged_segments = 0; packaged_segments < number_of_segments; packaged_segments++) { + Name fullName = name; + + std::shared_ptr + content_object = std::make_shared(std::move(fullName.appendSegment(current_segment))); + + // TODO If we set the throughput will decrease.. to investigate + // content_object->setExpiryTime((uint64_t)m_dataFreshness); + + if (is_last) { + content_object->setFinalChunkNumber(current_segment + number_of_segments - packaged_segments - 1); + } + + if (packaged_segments == number_of_segments - 1) { + content_object->setContent(&buf[bytes_segmented], buffer_size - bytes_segmented); + bytes_segmented += buffer_size - bytes_segmented; + } else { + content_object->setContent(&buf[bytes_segmented], free_space_for_content); + bytes_segmented += free_space_for_content; + } + + current_segment++; + passContentObjectToCallbacks(content_object); + } + } + + seq_number_map_[name.toString()][response_id] = current_segment; + + if (is_last) { + seq_number_map_[name.toString()].erase(response_id); + if (seq_number_map_[name.toString()].empty()) { + seq_number_map_.erase(name.toString()); + } + } +} + +void ProducerSocket::asyncProduce(ContentObject &content_object) { + std::shared_ptr c_object = std::make_shared(content_object); + std::thread t([c_object, this]() { produce(*c_object); }); + t.detach(); +} + +void ProducerSocket::asyncProduce(Name suffix, + const uint8_t *buf, + size_t buffer_size, + const int response_id, + bool is_last) { + std::thread t([suffix, buf, buffer_size, response_id, is_last, this]() { + produce(suffix, buf, buffer_size, response_id, is_last); + }); + t.detach(); +} + +void ProducerSocket::onInterest(const Name &name, const Interest &interest) { + if (on_interest_input_ != VOID_HANDLER) { + on_interest_input_(*this, interest); + } + + const std::shared_ptr &content_object = output_buffer_.find(interest); + + if (content_object) { + + if (on_interest_satisfied_output_buffer_ != VOID_HANDLER) { + on_interest_satisfied_output_buffer_(*this, interest); + } + + if (on_content_object_output_ != VOID_HANDLER) { + on_content_object_output_(*this, *content_object); + } + + portal_->sendContentObject(*content_object); + } else { + if (on_interest_process_ != VOID_HANDLER) { + on_interest_process_(*this, interest); + } + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, int socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::DATA_PACKET_SIZE: + if (socket_option_value < default_values::max_content_object_size && socket_option_value > 0) { + data_packet_size_ = socket_option_value; + return SOCKET_OPTION_SET; + } else { + return SOCKET_OPTION_NOT_SET; + } + + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + if (socket_option_value >= 1) { + input_buffer_capacity_ = socket_option_value; + return SOCKET_OPTION_SET; + } else { + return SOCKET_OPTION_NOT_SET; + } + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + if (socket_option_value >= 0) { + output_buffer_.setLimit(socket_option_value); + return SOCKET_OPTION_SET; + } else { + return SOCKET_OPTION_NOT_SET; + } + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + content_object_expiry_time_ = socket_option_value; + return SOCKET_OPTION_SET; + + case GeneralTransportOptions::SIGNATURE_TYPE: + if (socket_option_value == SOCKET_OPTION_DEFAULT) { + signature_type_ = SHA_256; + } else { + signature_type_ = socket_option_value; + } + + if (signature_type_ == SHA_256 || signature_type_ == RSA_256) { + signature_size_ = 32; + } + + case ProducerCallbacksOptions::INTEREST_INPUT: + if (socket_option_value == VOID_HANDLER) { + on_interest_input_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::INTEREST_DROP: + if (socket_option_value == VOID_HANDLER) { + on_interest_dropped_input_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::INTEREST_PASS: + if (socket_option_value == VOID_HANDLER) { + on_interest_inserted_input_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CACHE_HIT: + if (socket_option_value == VOID_HANDLER) { + on_interest_satisfied_output_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CACHE_MISS: + if (socket_option_value == VOID_HANDLER) { + on_interest_process_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + if (socket_option_value == VOID_HANDLER) { + on_new_segment_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + if (socket_option_value == VOID_HANDLER) { + on_content_object_to_sign_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + if (socket_option_value == VOID_HANDLER) { + on_content_object_in_output_buffer_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + if (socket_option_value == VOID_HANDLER) { + on_content_object_output_ = VOID_HANDLER; + return SOCKET_OPTION_SET; + } + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, double socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, bool socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + making_manifest_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, Name socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NAME_PREFIX: + name_prefix_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + on_new_segment_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + on_content_object_to_sign_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + on_content_object_in_output_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + on_content_object_output_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + on_interest_input_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_DROP: + on_interest_dropped_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::INTEREST_PASS: + on_interest_inserted_input_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_HIT: + on_interest_satisfied_output_buffer_ = socket_option_value; + return SOCKET_OPTION_SET; + + case ProducerCallbacksOptions::CACHE_MISS: + on_interest_process_ = socket_option_value; + return SOCKET_OPTION_SET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, + ConsumerContentObjectVerificationCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, KeyLocator socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, int &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + socket_option_value = input_buffer_capacity_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = output_buffer_.getLimit(); + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::DATA_PACKET_SIZE: + socket_option_value = data_packet_size_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::CONTENT_OBJECT_EXPIRY_TIME: + socket_option_value = content_object_expiry_time_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::SIGNATURE_TYPE: + socket_option_value = signature_type_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, double &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, bool &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::MAKE_MANIFEST: + socket_option_value = making_manifest_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, Name &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::NAME_PREFIX: + socket_option_value = name_prefix_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::NEW_CONTENT_OBJECT: + socket_option_value = on_new_segment_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_SIGN: + socket_option_value = on_content_object_to_sign_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_READY: + socket_option_value = on_content_object_in_output_buffer_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::CONTENT_OBJECT_OUTPUT: + socket_option_value = on_content_object_output_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value) { + switch (socket_option_key) { + case ProducerCallbacksOptions::INTEREST_INPUT: + socket_option_value = on_interest_input_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::INTEREST_DROP: + socket_option_value = on_interest_dropped_input_buffer_; + return SOCKET_OPTION_GET; + + case ProducerCallbacksOptions::INTEREST_PASS: + socket_option_value = on_interest_inserted_input_buffer_; + return SOCKET_OPTION_GET; + + case CACHE_HIT: + socket_option_value = on_interest_satisfied_output_buffer_; + return SOCKET_OPTION_GET; + + case CACHE_MISS: + socket_option_value = on_interest_process_; + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, + ConsumerContentObjectVerificationCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, size_t socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + if (input_buffer_capacity_ >= 1) { + input_buffer_capacity_ = socket_option_value; + return SOCKET_OPTION_SET; + } + + default: + return SOCKET_OPTION_NOT_SET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, size_t &socket_option_value) { + switch (socket_option_key) { + case GeneralTransportOptions::INPUT_BUFFER_SIZE: + socket_option_value = input_buffer_capacity_; + return SOCKET_OPTION_GET; + + case GeneralTransportOptions::OUTPUT_BUFFER_SIZE: + socket_option_value = output_buffer_.size(); + return SOCKET_OPTION_GET; + + default: + return SOCKET_OPTION_NOT_GET; + } +} + +int ProducerSocket::getSocketOption(int socket_option_key, KeyLocator &socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, std::shared_ptr &socket_option_value) { + switch (socket_option_key) { + case PORTAL: + socket_option_value = portal_; + return SOCKET_OPTION_GET; + } + + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::getSocketOption(int socket_option_key, IcnObserver **socket_option_value) { + return SOCKET_OPTION_NOT_GET; +} + +int ProducerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { + return SOCKET_OPTION_NOT_SET; +} + +} // end namespace icnet diff --git a/icnet/transport/icnet_socket_producer.h b/icnet/transport/icnet_socket_producer.h new file mode 100644 index 00000000..d709e305 --- /dev/null +++ b/icnet/transport/icnet_socket_producer.h @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_PRODUCER_SOCKET_H_ +#define ICNET_PRODUCER_SOCKET_H_ + +#include "icnet_socket.h" +#include "icnet_content_store.h" + +#include +#include +#include +#include + +#define REGISTRATION_NOT_ATTEMPTED 0 +#define REGISTRATION_SUCCESS 1 +#define REGISTRATION_FAILURE 2 +#define REGISTRATION_IN_PROGRESS 3 + +namespace icnet { + +class ProducerSocket : public Socket { + public: + + explicit ProducerSocket(Name prefix); + + ~ProducerSocket(); + + void attach(); + + void dispatch(); + + void produce(Name suffix, const uint8_t *buffer, size_t buffer_size, const int request_id = 0, bool is_last = false); + + void produce(ContentObject &content_object); + + void asyncProduce(Name suffix, const uint8_t *buf, size_t buffer_size, const int response_id, bool is_last); + + void asyncProduce(ContentObject &content_object); + + void serveForever(); + + void onInterest(const Name &name, const Interest &interest); + + int setSocketOption(int socket_option_key, int socket_option_value); + + int setSocketOption(int socket_option_key, double socket_option_value); + + int setSocketOption(int socket_option_key, bool socket_option_value); + + int setSocketOption(int socket_option_key, size_t socket_option_value); + + int setSocketOption(int socket_option_key, Name socket_option_value); + + int setSocketOption(int socket_option_key, ProducerContentObjectCallback socket_option_value); + + int setSocketOption(int socket_option_key, ProducerInterestCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerContentObjectCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerInterestCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerContentCallback socket_option_value); + + int setSocketOption(int socket_option_key, ConsumerManifestCallback socket_option_value); + + int setSocketOption(int socket_option_key, KeyLocator socket_option_value); + + int setSocketOption(int socket_option_key, IcnObserver *obs); + + int getSocketOption(int socket_option_key, int &socket_option_value); + + int getSocketOption(int socket_option_key, double &socket_option_value); + + int getSocketOption(int socket_option_key, bool &socket_option_value); + + int getSocketOption(int socket_option_key, size_t &socket_option_value); + + int getSocketOption(int socket_option_key, Name &socket_option_value); + + int getSocketOption(int socket_option_key, ProducerContentObjectCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ProducerInterestCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerContentObjectVerificationCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerContentObjectCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerInterestCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerContentCallback &socket_option_value); + + int getSocketOption(int socket_option_key, ConsumerManifestCallback &socket_option_value); + + int getSocketOption(int socket_option_key, KeyLocator &socket_option_value); + + int getSocketOption(int socket_option_key, std::shared_ptr &socket_option_value); + + int getSocketOption(int socket_option_key, IcnObserver **socket_option_value); + + private: + + std::shared_ptr portal_; + boost::asio::io_service io_service_; + + Name name_prefix_; + + int data_packet_size_; + int content_object_expiry_time_; + int registration_status_; + + bool making_manifest_; + + // map for storing sequence numbers for several calls of the publish function + std::unordered_map> seq_number_map_; + + int signature_type_; + int signature_size_; + int key_locator_size_; + KeyLocator key_locator_; + + // buffers + ContentStore output_buffer_; + + std::queue > input_buffer_; + std::mutex input_buffer_mutex_; + std::atomic_size_t input_buffer_capacity_; + std::atomic_size_t input_buffer_size_; + + // threads + std::thread listening_thread_; + std::thread processing_thread_; + volatile bool processing_thread_stop_; + volatile bool listening_thread_stop_; + + // callbacks + ProducerInterestCallback on_interest_input_; + ProducerInterestCallback on_interest_dropped_input_buffer_; + ProducerInterestCallback on_interest_inserted_input_buffer_; + ProducerInterestCallback on_interest_satisfied_output_buffer_; + ProducerInterestCallback on_interest_process_; + + ProducerContentObjectCallback on_new_segment_; + ProducerContentObjectCallback on_content_object_to_sign_; + ProducerContentObjectCallback on_content_object_in_output_buffer_; + ProducerContentObjectCallback on_content_object_output_; + ProducerContentObjectCallback on_content_object_evicted_from_output_buffer_; + + private: + void listen(); + + void passContentObjectToCallbacks(const std::shared_ptr &content_object); + +}; + +} + +#endif // ICNET_PRODUCER_SOCKET_H_ diff --git a/icnet/transport/icnet_transport.cc b/icnet/transport/icnet_transport.cc new file mode 100644 index 00000000..c85f02b4 --- /dev/null +++ b/icnet/transport/icnet_transport.cc @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_transport.h" + +namespace icnet { + +TransportProtocol::TransportProtocol(Socket *icn_socket) : socket_(icn_socket), is_running_(false) { +} + +void TransportProtocol::updatePortal() { + socket_->getSocketOption(PORTAL, portal_); +} + +bool TransportProtocol::isRunning() { + return is_running_; +} + +} // end namespace icnet diff --git a/icnet/transport/icnet_transport.h b/icnet/transport/icnet_transport.h new file mode 100644 index 00000000..738634fb --- /dev/null +++ b/icnet/transport/icnet_transport.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_TRANSPORT_PROTOCOL_H_ +#define ICNET_TRANSPORT_PROTOCOL_H_ + +#include "icnet_socket.h" + +namespace icnet { + +class TransportProtocol { + public: + TransportProtocol(Socket *icn_socket); + + void updatePortal(); + + bool isRunning(); + + virtual void start() = 0; + + virtual void stop() = 0; + + protected: + Socket *socket_; + std::shared_ptr portal_; + bool is_running_; +}; + +} + +#endif // ICNET_TRANSPORT_PROTOCOL_H_ diff --git a/icnet/transport/icnet_transport_raaqm.cc b/icnet/transport/icnet_transport_raaqm.cc new file mode 100644 index 00000000..9a1b77b4 --- /dev/null +++ b/icnet/transport/icnet_transport_raaqm.cc @@ -0,0 +1,466 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_transport_raaqm.h" + +namespace icnet { + +RaaqmTransportProtocol::RaaqmTransportProtocol(Socket *icnet_socket) + : VegasTransportProtocol(icnet_socket), rate_estimator_(NULL) { + init(); +} + +RaaqmTransportProtocol::~RaaqmTransportProtocol() { + if (this->rate_estimator_) { + delete this->rate_estimator_; + } +} + +void RaaqmTransportProtocol::init() { + std::ifstream is(RAAQM_CONFIG_PATH); + + std::string line; + + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_values::beta_value); + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_values::drop_factor); + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, default_values::interest_lifetime); + socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + default_values::transport_protocol_max_retransmissions); + raaqm_autotune_ = false; + default_beta_ = default_values::beta_value; + default_drop_ = default_values::drop_factor; + beta_wifi_ = default_values::beta_value; + drop_wifi_ = default_values::drop_factor; + beta_lte_ = default_values::beta_value; + drop_lte_ = default_values::drop_factor; + wifi_delay_ = 1000; + lte_delay_ = 15000; + avg_rtt_ = 0.0; + + if (!is) { + std::cout << "WARNING: RAAQM parameters not found, set default values" << std::endl; + return; + } + + std::cout << "Setting RAAQM parameters:" << std::endl; + while (getline(is, line)) { + std::string command; + std::istringstream line_s(line); + + line_s >> command; + + if (command == ";") { + continue; + } + + if (command == "autotune") { + std::string tmp; + std::string val; + line_s >> tmp >> val; + if (val == "yes") { + raaqm_autotune_ = true; + } else { + raaqm_autotune_ = false; + } + std::cout << "params: autotune = " << raaqm_autotune_ << std::endl; + continue; + } + + if (command == "lifetime") { + std::string tmp; + int lifetime; + line_s >> tmp >> lifetime; + std::cout << "params: lifetime = " << lifetime << std::endl; + socket_->setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, lifetime); + continue; + } + + if (command == "retransmissions") { + std::string tmp; + int rtx; + line_s >> tmp >> rtx; + std::cout << "params: retransmissions = " << rtx << std::endl; + socket_->setSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, rtx); + continue; + } + + if (command == "beta") { + std::string tmp; + line_s >> tmp >> default_beta_; + std::cout << "params: beta = " << default_beta_ << std::endl; + socket_->setSocketOption(RaaqmTransportOptions::BETA_VALUE, default_beta_); + continue; + } + + if (command == "drop") { + std::string tmp; + line_s >> tmp >> default_drop_; + std::cout << "params: drop = " << default_drop_ << std::endl; + socket_->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, default_drop_); + continue; + } + + if (command == "beta_wifi_") { + std::string tmp; + line_s >> tmp >> beta_wifi_; + std::cout << "params: beta_wifi_ = " << beta_wifi_ << std::endl; + continue; + } + + if (command == "drop_wifi_") { + std::string tmp; + line_s >> tmp >> drop_wifi_; + std::cout << "params: drop_wifi_ = " << drop_wifi_ << std::endl; + continue; + } + + if (command == "beta_lte_") { + std::string tmp; + line_s >> tmp >> beta_lte_; + std::cout << "params: beta_lte_ = " << beta_lte_ << std::endl; + continue; + } + + if (command == "drop_lte_") { + std::string tmp; + line_s >> tmp >> drop_lte_; + std::cout << "params: drop_lte_ = " << drop_lte_ << std::endl; + continue; + } + + if (command == "wifi_delay_") { + std::string tmp; + line_s >> tmp >> wifi_delay_; + std::cout << "params: wifi_delay_ = " << wifi_delay_ << std::endl; + continue; + } + + if (command == "lte_delay_") { + std::string tmp; + line_s >> tmp >> lte_delay_; + std::cout << "params: lte_delay_ = " << lte_delay_ << std::endl; + continue; + } + if (command == "alpha") { + std::string tmp; + double rate_alpha = 0.0; + line_s >> tmp >> rate_alpha; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_ALPHA, rate_alpha); + std::cout << "params: alpha = " << rate_alpha << std::endl; + continue; + } + + if (command == "batching_parameter") { + std::string tmp; + int batching_param = 0; + line_s >> tmp >> batching_param; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_BATCH_PARAMETER, batching_param); + std::cout << "params: batching = " << batching_param << std::endl; + continue; + } + + if (command == "rate_estimator") { + std::string tmp; + int choice_param = 0; + line_s >> tmp >> choice_param; + socket_->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_CHOICE, choice_param); + std::cout << "params: choice = " << choice_param << std::endl; + continue; + } + } + is.close(); + std::cout << "init done" << std::endl; +} + +void RaaqmTransportProtocol::reset() { + is_final_block_number_discovered_ = false; + final_block_number_ = std::numeric_limits::max(); + segment_number_ = 0; + interests_in_flight_ = 0; + last_reassembled_segment_ = 0; + content_buffer_size_ = 0; + content_buffer_.clear(); + interest_retransmissions_.clear(); + receive_buffer_.clear(); + unverified_segments_.clear(); + verified_manifests_.clear(); +} + +void RaaqmTransportProtocol::start() { + if (this->rate_estimator_) { + this->rate_estimator_->onStart(); + } + + if (!cur_path_) { + + double drop_factor; + double minimum_drop_probability; + int sample_number; + int interest_lifetime; + double beta; + + socket_->getSocketOption(DROP_FACTOR, drop_factor); + socket_->getSocketOption(MINIMUM_DROP_PROBABILITY, minimum_drop_probability); + socket_->getSocketOption(SAMPLE_NUMBER, sample_number); + socket_->getSocketOption(INTEREST_LIFETIME, interest_lifetime); + socket_->getSocketOption(BETA_VALUE, beta); + + std::cout << "Drop Factor: " << drop_factor << std::endl; + std::cout << "Minimum drop prob: " << minimum_drop_probability << std::endl; + std::cout << "Sample number: " << sample_number << std::endl; + std::cout << "lifetime: " << interest_lifetime << std::endl; + std::cout << "beta: " << beta << std::endl; + + double alpha = 0.0; + int batching_param = 0; + int choice_param = 0; + socket_->getSocketOption(RATE_ESTIMATION_ALPHA, alpha); + socket_->getSocketOption(RATE_ESTIMATION_BATCH_PARAMETER, batching_param); + socket_->getSocketOption(RATE_ESTIMATION_CHOICE, choice_param); + if (choice_param == 1) { + this->rate_estimator_ = new ALaTcpEstimator(); + } else { + this->rate_estimator_ = new SimpleEstimator(alpha, batching_param); + } + + socket_->getSocketOption(RATE_ESTIMATION_OBSERVER, &(this->rate_estimator_->observer_)); + + cur_path_ = + std::make_shared(drop_factor, minimum_drop_probability, interest_lifetime * 1000, sample_number); + path_table_[default_values::path_id] = cur_path_; + } + + VegasTransportProtocol::start(); +} + +void RaaqmTransportProtocol::copyContent(ContentObject &content_object) { + if ((content_object.getName().get(-1).toSegment() == final_block_number_) || !(is_running_)) { + this->rate_estimator_->onDownloadFinished(); + } + VegasTransportProtocol::copyContent(content_object); +} + +void RaaqmTransportProtocol::updatePathTable(const ContentObject &content_object) { + unsigned char path_id = content_object.getPathLabel(); + + if (path_table_.find(path_id) == path_table_.end()) { + if (cur_path_) { + // Create a new path with some default param + if (path_table_.empty()) { + std::cerr << "No path initialized for path table, error could be in default path initialization." << std::endl; + exit(EXIT_FAILURE); + } else { + // Initiate the new path default param + std::shared_ptr + new_path = std::make_shared(*(path_table_.at(default_values::path_id))); + // Insert the new path into hash table + path_table_[path_id] = new_path; + } + } else { + std::cerr << "UNEXPECTED ERROR: when running,current path not found." << std::endl; + exit(EXIT_FAILURE); + } + } + + cur_path_ = path_table_[path_id]; + + size_t packet_size = content_object.getPacketSize(); + size_t data_size = content_object.getContent().size(); + + // Update measurements for path + cur_path_->updateReceivedStats(packet_size, data_size); +} + +void RaaqmTransportProtocol::updateRtt(uint64_t segment) { + if (!cur_path_) { + throw std::runtime_error("ERROR: no current path found, exit"); + } else { + std::chrono::microseconds rtt; + + std::chrono::steady_clock::duration duration = + std::chrono::steady_clock::now() - interest_timepoints_[segment % default_values::default_buffer_size]; + rtt = std::chrono::duration_cast(duration); + if (this->rate_estimator_) { + this->rate_estimator_->onRttUpdate(rtt.count()); + } + cur_path_->insertNewRtt(rtt.count()); + cur_path_->smoothTimer(); + + avg_rtt_ = (avg_rtt_ * 0.99) + ((double) rtt.count() * 0.01); + if (cur_path_->newPropagationDelayAvailable()) { + check_drop_probability(); + } + } +} + +void RaaqmTransportProtocol::changeInterestLifetime(uint64_t segment) { + return; +} + +void RaaqmTransportProtocol::check_drop_probability() { + if (!raaqm_autotune_) { + return; + } + + unsigned int max_pd = 0; + std::unordered_map>::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->getPropagationDelay() > max_pd && it->second->getPropagationDelay() != UINT_MAX + && !it->second->isStale()) { + max_pd = it->second->getPropagationDelay(); + } + } + + double drop_prob = 0; + double beta = 0; + if (max_pd < wifi_delay_) { //only ethernet paths + drop_prob = default_drop_; + beta = default_beta_; + } else if (max_pd < lte_delay_) { //at least one wifi path + drop_prob = drop_wifi_; + beta = beta_wifi_; + } else { //at least one lte path + drop_prob = drop_lte_; + beta = beta_lte_; + } + + double old_drop_prob = 0; + double old_beta = 0; + socket_->getSocketOption(BETA_VALUE, old_beta); + socket_->getSocketOption(DROP_FACTOR, old_drop_prob); + + if (drop_prob == old_drop_prob && beta == old_beta) { + return; + } + + std::cout << "*************[RAAQM TUNING] new beta = " << beta << " new drop = " << drop_prob << " max pd = " + << max_pd << std::endl; + + socket_->setSocketOption(BETA_VALUE, beta); + socket_->setSocketOption(DROP_FACTOR, drop_prob); + + for (it = path_table_.begin(); it != path_table_.end(); it++) { + it->second->setDropProb(drop_prob); + } +} + +void RaaqmTransportProtocol::check_for_stale_paths() { + if (!raaqm_autotune_) { + return; + } + + bool stale = false; + std::unordered_map>::iterator it; + for (it = path_table_.begin(); it != path_table_.end(); ++it) { + if (it->second->isStale()) { + stale = true; + break; + } + } + if (stale) { + check_drop_probability(); + } +} + +void RaaqmTransportProtocol::onTimeout(const Interest &interest) { + check_for_stale_paths(); + VegasTransportProtocol::onTimeout(interest); +} + +void RaaqmTransportProtocol::increaseWindow() { + double max_window_size = -1; + socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size); + if (current_window_size_ < max_window_size) // don't expand window above max level + { + double gamma = -1; + socket_->getSocketOption(GAMMA_VALUE, gamma); + + current_window_size_ += gamma / current_window_size_; + socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_); + } + this->rate_estimator_->onWindowIncrease(current_window_size_); +} + +void RaaqmTransportProtocol::decreaseWindow() { + double min_window_size = -1; + socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size); + if (current_window_size_ > min_window_size) // don't shrink window below minimum level + { + double beta = -1; + socket_->getSocketOption(BETA_VALUE, beta); + + current_window_size_ = current_window_size_ * beta; + if (current_window_size_ < min_window_size) { + current_window_size_ = min_window_size; + } + + socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_); + } + this->rate_estimator_->onWindowDecrease(current_window_size_); +} + +void RaaqmTransportProtocol::RAAQM() { + if (!cur_path_) { + std::cerr << "ERROR: no current path found, exit" << std::endl; + exit(EXIT_FAILURE); + } else { + // Change drop probability according to RTT statistics + cur_path_->updateDropProb(); + + if (rand() % 10000 <= cur_path_->getDropProb() * 10000) { + decreaseWindow(); + } + } +} + +void RaaqmTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + // Decrease the window because the timeout happened + decreaseWindow(); +} + +void RaaqmTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) { + updatePathTable(content_object); + increaseWindow(); + updateRtt(interest.getName().get(-1).toSegment()); + this->rate_estimator_->onDataReceived((int) content_object.getPacketSize()); + // Set drop probablility and window size accordingly + RAAQM(); +} + +void RaaqmTransportProtocol::checkForFastRetransmission(const Interest &interest) { +} + +#if 0 +void +RaaqmTransportProtocol::onInterest(const Interest &interest) +{ + bool mobility = interest.get_MobilityLossFlag(); + + if(mobility){ + const Name &name = interest.getName(); + uint64_t segment = name[-1].toSegment(); + timeval now; + gettimeofday(&now, 0); + std::cout << (long) now.tv_sec << "." << (unsigned) now.tv_usec << " RAAQM: M-Interest " << + segment << " " << interest.getName() << std::endl; + NackSet::iterator it = m_nackSet.find(segment); + if(it == m_nackSet.end()){ + m_nackSet.insert(segment); + } + } +} +#endif + +} // end namespace icnet diff --git a/icnet/transport/icnet_transport_raaqm.h b/icnet/transport/icnet_transport_raaqm.h new file mode 100644 index 00000000..dc5e72bd --- /dev/null +++ b/icnet/transport/icnet_transport_raaqm.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_RAAQM_TRANSPORT_PROTOCOL_H_ +#define ICNET_RAAQM_TRANSPORT_PROTOCOL_H_ + +#include "icnet_transport_vegas.h" +#include "icnet_transport_vegas_rto_estimator.h" +#include "icnet_transport_raaqm_data_path.h" +#include "icnet_rate_estimation.h" + +namespace icnet { + +class RaaqmTransportProtocol : public VegasTransportProtocol { + public: + RaaqmTransportProtocol(Socket *icnet_socket); + + ~RaaqmTransportProtocol(); + + void start(); + + protected: + void copyContent(ContentObject &content_object); + + private: + + void init(); + + void reset(); + + void afterContentReception(const Interest &interest, const ContentObject &content_object); + + void afterDataUnsatisfied(uint64_t segment); + + void increaseWindow(); + + void updateRtt(uint64_t segment); + + void decreaseWindow(); + + void changeInterestLifetime(uint64_t segment); + + void onTimeout(const Interest &interest); + + void RAAQM(); + + void updatePathTable(const ContentObject &content_object); + + void checkForFastRetransmission(const Interest &interest); + + void check_drop_probability(); + + void check_for_stale_paths(); + + void printRtt(); + + /** + * Current download path + */ + std::shared_ptr cur_path_; + + /** + * Hash table for path: each entry is a pair path ID(key) - path object + */ + std::unordered_map> path_table_; + + bool set_interest_filter_; + //for rate-estimation at packet level + IcnRateEstimator *rate_estimator_; + + //params for autotuning + bool raaqm_autotune_; + double default_beta_; + double default_drop_; + double beta_wifi_; + double drop_wifi_; + double beta_lte_; + double drop_lte_; + unsigned int wifi_delay_; + unsigned int lte_delay_; + + //RTT stats + double avg_rtt_; +}; + +} // end namespace icnet + +#endif // ICNET_RAAQM_TRANSPORT_PROTOCOL_H_ diff --git a/icnet/transport/icnet_transport_raaqm_data_path.cc b/icnet/transport/icnet_transport_raaqm_data_path.cc new file mode 100644 index 00000000..dfef6762 --- /dev/null +++ b/icnet/transport/icnet_transport_raaqm_data_path.cc @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_transport_raaqm_data_path.h" + +namespace icnet { + +RaaqmDataPath::RaaqmDataPath(double drop_factor, + double minimum_drop_probability, + unsigned new_timer, + unsigned int samples, + uint64_t new_rtt, + uint64_t new_rtt_min, + uint64_t new_rtt_max, + unsigned new_pd) + + : drop_factor_(drop_factor), + minimum_drop_probability_(minimum_drop_probability), + timer_(new_timer), + samples_(samples), + rtt_(new_rtt), + rtt_min_(new_rtt_min), + rtt_max_(new_rtt_max), + prop_delay_(new_pd), + new_prop_delay_(false), + drop_prob_(0), + packets_received_(0), + last_packets_received_(0), + m_packets_bytes_received_(0), + last_packets_bytes_received_(0), + raw_data_bytes_received_(0), + last_raw_data_bytes_received_(0), + average_rtt_(0), + alpha_(ALPHA) { + gettimeofday(&previous_call_of_path_reporter_, 0); + gettimeofday(&m_last_received_pkt_, 0); +} + +RaaqmDataPath &RaaqmDataPath::pathReporter() { + struct timeval now; + gettimeofday(&now, 0); + + double rate, delta_t; + + delta_t = getMicroSeconds(now) - getMicroSeconds(previous_call_of_path_reporter_); + rate = (m_packets_bytes_received_ - last_packets_bytes_received_) * 8 / delta_t; // MB/s + std::cout << "RaaqmDataPath status report: " << "at time " << (long) now.tv_sec << "." << (unsigned) now.tv_usec + << " sec:\n" << (void *) this << " path\n" << "Packets Received: " + << (packets_received_ - last_packets_received_) << "\n" << "delta_t " << delta_t << " [us]\n" << "rate " + << rate << " [Mbps]\n" << "Last RTT " << rtt_ << " [us]\n" << "Max RTT " << rtt_max_ << " [us]\n" + << "Min RTT " << rtt_min_ << " [us]\n" << "Prop delay " << prop_delay_ << " [us]\n"; + last_packets_received_ = packets_received_; + last_packets_bytes_received_ = m_packets_bytes_received_; + gettimeofday(&previous_call_of_path_reporter_, 0); + + return *this; +} + +RaaqmDataPath &RaaqmDataPath::insertNewRtt(uint64_t new_rtt) { + rtt_ = new_rtt; + rtt_samples_.get().push_back(new_rtt); + + if (rtt_samples_.get().size() > samples_) { + rtt_samples_.get().pop_front(); + } + + rtt_max_ = *(rtt_samples_.get().rbegin()); + rtt_min_ = *(rtt_samples_.get().begin()); + + if (rtt_min_ < prop_delay_) { + new_prop_delay_ = true; + prop_delay_ = rtt_min_; + } + + gettimeofday(&m_last_received_pkt_, 0); + + return *this; +} + +RaaqmDataPath &RaaqmDataPath::updateReceivedStats(std::size_t packet_size, std::size_t data_size) { + packets_received_++; + m_packets_bytes_received_ += packet_size; + raw_data_bytes_received_ += data_size; + + return *this; +} + +double RaaqmDataPath::getDropFactor() { + return drop_factor_; +} + +double RaaqmDataPath::getDropProb() { + return drop_prob_; +} + +RaaqmDataPath &RaaqmDataPath::setDropProb(double dropProb) { + drop_prob_ = dropProb; + + return *this; +} + +double RaaqmDataPath::getMinimumDropProbability() { + return minimum_drop_probability_; +} + +double RaaqmDataPath::getTimer() { + return timer_; +} + +RaaqmDataPath &RaaqmDataPath::smoothTimer() { + timer_ = (1 - TIMEOUT_SMOOTHER) * timer_ + (TIMEOUT_SMOOTHER) * rtt_ * (TIMEOUT_RATIO); + + return *this; +} + +double RaaqmDataPath::getRtt() { + return rtt_; +} + +double RaaqmDataPath::getAverageRtt() { + return average_rtt_; +} + +double RaaqmDataPath::getRttMax() { + return rtt_max_; +} + +double RaaqmDataPath::getRttMin() { + return rtt_min_; +} + +unsigned RaaqmDataPath::getSampleValue() { + return samples_; +} + +unsigned RaaqmDataPath::getRttQueueSize() { + return rtt_samples_.get().size(); +} + +RaaqmDataPath &RaaqmDataPath::updateDropProb() { + + drop_prob_ = 0.0; + + if (getSampleValue() == getRttQueueSize()) { + if (rtt_max_ == rtt_min_) { + drop_prob_ = minimum_drop_probability_; + } else { + drop_prob_ = minimum_drop_probability_ + drop_factor_ * (rtt_ - rtt_min_) / (rtt_max_ - rtt_min_); + } + } + + return *this; +} + +double RaaqmDataPath::getMicroSeconds(struct timeval &time) { + return (double) (time.tv_sec) * 1000000 + (double) (time.tv_usec); +} + +void RaaqmDataPath::setAlpha(double alpha) { + if (alpha >= 0 && alpha <= 1) { + alpha_ = alpha; + } +} + +bool RaaqmDataPath::newPropagationDelayAvailable() { + bool r = new_prop_delay_; + new_prop_delay_ = false; + return r; +} + +unsigned int RaaqmDataPath::getPropagationDelay() { + return prop_delay_; +} + +bool RaaqmDataPath::isStale() { + struct timeval now; + gettimeofday(&now, 0); + double time = getMicroSeconds(now) - getMicroSeconds(m_last_received_pkt_); + if (time > 2000000) { + return true; + } + return false; +} + +} // end namespace icnet diff --git a/icnet/transport/icnet_transport_raaqm_data_path.h b/icnet/transport/icnet_transport_raaqm_data_path.h new file mode 100644 index 00000000..0093f84b --- /dev/null +++ b/icnet/transport/icnet_transport_raaqm_data_path.h @@ -0,0 +1,242 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_RAAQM_DATA_PATH_H_ +#define ICNET_RAAQM_DATA_PATH_H_ + +#include + +#include +#include +#include +#include +#include +#include + +#define TIMEOUT_SMOOTHER 0.1 +#define TIMEOUT_RATIO 10 +#define ALPHA 0.8 + +namespace icnet { + +class RaaqmDataPath { + public: + + RaaqmDataPath(double drop_factor, + double minimum_drop_probability, + unsigned new_timer, + unsigned int samples, + uint64_t new_rtt = 1000, + uint64_t new_rtt_min = 1000, + uint64_t new_rtt_max = 1000, + unsigned new_pd = UINT_MAX); + + public: + + /* + * @brief Print Path Status + */ + RaaqmDataPath &pathReporter(); + + /* + * @brief Add a new RTT to the RTT queue of the path, check if RTT queue is full, and thus need overwrite. + * Also it maintains the validity of min and max of RTT. + * @param new_rtt is the value of the new RTT + */ + RaaqmDataPath &insertNewRtt(uint64_t new_rtt); + + /** + * @brief Update the path statistics + * @param packet_size the size of the packet received, including the ICN header + * @param data_size the size of the data received, without the ICN header + */ + RaaqmDataPath &updateReceivedStats(std::size_t packet_size, std::size_t data_size); + + /** + * @brief Get the value of the drop factor parameter + */ + double getDropFactor(); + + /** + * @brief Get the value of the drop probability + */ + double getDropProb(); + + /** + * @brief Set the value pf the drop probability + * @param drop_prob is the value of the drop probability + */ + RaaqmDataPath &setDropProb(double drop_prob); + + /** + * @brief Get the minimum drop probability + */ + double getMinimumDropProbability(); + + /** + * @brief Get last RTT + */ + double getRtt(); + + /** + * @brief Get average RTT + */ + double getAverageRtt(); + + /** + * @brief Get the current m_timer value + */ + double getTimer(); + + /** + * @brief Smooth he value of the m_timer accordingly with the last RTT measured + */ + RaaqmDataPath &smoothTimer(); + + /** + * @brief Get the maximum RTT among the last samples + */ + double getRttMax(); + + /** + * @brief Get the minimum RTT among the last samples + */ + double getRttMin(); + + /** + * @brief Get the number of saved samples + */ + unsigned getSampleValue(); + + /** + * @brief Get the size og the RTT queue + */ + unsigned getRttQueueSize(); + + /* + * @brief Change drop probability according to RTT statistics + * Invoked in RAAQM(), before control window size update. + */ + RaaqmDataPath &updateDropProb(); + + /** + * @brief This function convert the time from struct timeval to its value in microseconds + */ + static double getMicroSeconds(struct timeval &time); + + void setAlpha(double alpha); + + /** + * @brief Returns the smallest RTT registered so far for this path + */ + + unsigned int getPropagationDelay(); + + bool newPropagationDelayAvailable(); + + bool isStale(); + + private: + + /** + * The value of the drop factor + */ + double drop_factor_; + + /** + * The minumum drop probability + */ + double minimum_drop_probability_; + + /** + * The timer, expressed in milliseconds + */ + double timer_; + + /** + * The number of samples to store for computing the protocol measurements + */ + const unsigned int samples_; + + /** + * The last, the minimum and the maximum value of the RTT (among the last m_samples samples) + */ + uint64_t rtt_, rtt_min_, rtt_max_, prop_delay_; + + bool new_prop_delay_; + + /** + * The current drop probability + */ + double drop_prob_; + + /** + * The number of packets received in this path + */ + intmax_t packets_received_; + + /** + * The first packet received after the statistics print + */ + intmax_t last_packets_received_; + + /** + * Total number of bytes received including the ICN header + */ + intmax_t m_packets_bytes_received_; + + /** + * The amount of packet bytes received at the last path summary computation + */ + intmax_t last_packets_bytes_received_; + + /** + * Total number of bytes received without including the ICN header + */ + intmax_t raw_data_bytes_received_; + + /** + * The amount of raw dat bytes received at the last path summary computation + */ + intmax_t last_raw_data_bytes_received_; + + class byArrival; + + class byOrder; + + /** + * Double ended queue for the RTTs + */ + typedef boost::multi_index_container >, + // index by ascending order + boost::multi_index::ordered_non_unique, + boost::multi_index::identity > > > RTTQueue; + RTTQueue rtt_samples_; + + /** + * Time of the last call to the path reporter method + */ + struct timeval previous_call_of_path_reporter_; + struct timeval m_last_received_pkt_; + + double average_rtt_; + double alpha_; +}; + +} // end namespace icnet + +#endif // ICNET_RAAQM_DATA_PATH_H_ diff --git a/icnet/transport/icnet_transport_vegas.cc b/icnet/transport/icnet_transport_vegas.cc new file mode 100644 index 00000000..fae29bf8 --- /dev/null +++ b/icnet/transport/icnet_transport_vegas.cc @@ -0,0 +1,488 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_transport_vegas.h" +#include "icnet_socket_consumer.h" + +namespace icnet { + +VegasTransportProtocol::VegasTransportProtocol(Socket *icnet_socket) + : TransportProtocol(icnet_socket), + is_final_block_number_discovered_(false), + final_block_number_(std::numeric_limits::max()), + last_reassembled_segment_(0), + content_buffer_size_(0), + current_window_size_(default_values::min_window_size), + interests_in_flight_(0), + segment_number_(0), + interest_retransmissions_(default_values::max_window_size), + interest_timepoints_(default_values::default_buffer_size), + receive_buffer_(default_values::default_buffer_size), + unverified_segments_(default_values::default_buffer_size), + verified_manifests_(default_values::default_buffer_size) { + icnet_socket->getSocketOption(PORTAL, portal_); +} + +VegasTransportProtocol::~VegasTransportProtocol() { + stop(); +} + +void VegasTransportProtocol::start() { + is_running_ = true; + is_final_block_number_discovered_ = false; + final_block_number_ = std::numeric_limits::max(); + segment_number_ = 0; + interests_in_flight_ = 0; + last_reassembled_segment_ = 0; + content_buffer_size_ = 0; + content_buffer_.clear(); + interest_retransmissions_.clear(); + receive_buffer_.clear(); + unverified_segments_.clear(); + verified_manifests_.clear(); + + sendInterest(); + + bool isAsync = false; + socket_->getSocketOption(ASYNC_MODE, isAsync); + + bool isContextRunning = false; + socket_->getSocketOption(RUNNING, isContextRunning); + + if (!isAsync && !isContextRunning) { + socket_->setSocketOption(RUNNING, true); + portal_->runEventsLoop(); + + // If portal returns, the download (maybe) is finished, so we can remove the pending interests + + removeAllPendingInterests(); + } +} + +// TODO Reuse this function for sending an arbitrary interest +void VegasTransportProtocol::sendInterest() { + Name prefix; + socket_->getSocketOption(GeneralTransportOptions::NAME_PREFIX, prefix); + + Name suffix; + socket_->getSocketOption(GeneralTransportOptions::NAME_SUFFIX, suffix); + + if (!suffix.empty()) { + prefix.append(suffix); + } + + prefix.appendSegment(segment_number_); + + std::shared_ptr interest = std::make_shared(std::move(prefix)); + + int interestLifetime = default_values::interest_lifetime; + socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, interestLifetime); + interest->setInterestLifetime(uint32_t(interestLifetime)); + + ConsumerInterestCallback on_interest_output = VOID_HANDLER; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output); + if (on_interest_output != VOID_HANDLER) { + on_interest_output(*dynamic_cast(socket_), *interest); + } + + if (!is_running_) { + return; + } + + interests_in_flight_++; + interest_retransmissions_[segment_number_ % default_values::default_buffer_size] = 0; + interest_timepoints_[segment_number_ % default_values::default_buffer_size] = std::chrono::steady_clock::now(); + + portal_->sendInterest(*interest, + bind(&VegasTransportProtocol::onContentSegment, this, _1, _2), + bind(&VegasTransportProtocol::onTimeout, this, _1)); + segment_number_++; +} + +void VegasTransportProtocol::stop() { + is_running_ = false; + portal_->stopEventsLoop(); +} + +void VegasTransportProtocol::onContentSegment(const Interest &interest, ContentObject &content_object) { + uint64_t segment = interest.getName().get(-1).toSegment(); + + if (is_running_ == false /*|| input_buffer_[segment]*/) { + return; + } + + interests_in_flight_--; + + changeInterestLifetime(segment); + ConsumerContentObjectCallback on_data_input = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_INPUT, on_data_input); + if (on_data_input != VOID_HANDLER) { + on_data_input(*dynamic_cast(socket_), content_object); + } + + ConsumerInterestCallback on_interest_satisfied = VOID_HANDLER; + socket_->getSocketOption(INTEREST_SATISFIED, on_interest_satisfied); + if (on_interest_satisfied != VOID_HANDLER) { + on_interest_satisfied(*dynamic_cast(socket_), const_cast(interest)); + } + + if (content_object.getContentType() == PayloadType::MANIFEST) { + onManifest(interest, content_object); + } else if (content_object.getContentType() == PayloadType::DATA) { + onContentObject(interest, content_object); + } // TODO InterestReturn + + scheduleNextInterests(); +} + +void VegasTransportProtocol::afterContentReception(const Interest &interest, const ContentObject &content_object) { + increaseWindow(); +} + +void VegasTransportProtocol::afterDataUnsatisfied(uint64_t segment) { + decreaseWindow(); +} + +void VegasTransportProtocol::scheduleNextInterests() { + if (segment_number_ == 0) { + current_window_size_ = final_block_number_; + + double maxWindowSize = -1; + socket_->getSocketOption(MAX_WINDOW_SIZE, maxWindowSize); + + if (current_window_size_ > maxWindowSize) { + current_window_size_ = maxWindowSize; + } + + while (interests_in_flight_ < current_window_size_) { + if (is_final_block_number_discovered_) { + if (segment_number_ <= final_block_number_) { + sendInterest(); + } else { + break; + } + } else { + sendInterest(); + } + } + } else { + if (is_running_) { + while (interests_in_flight_ < current_window_size_) { + if (is_final_block_number_discovered_) { + if (segment_number_ <= final_block_number_) { + sendInterest(); + } else { + break; + } + } else { + sendInterest(); + } + } + } + } +} + +void VegasTransportProtocol::decreaseWindow() { + double min_window_size = -1; + socket_->getSocketOption(MIN_WINDOW_SIZE, min_window_size); + if (current_window_size_ > min_window_size) { + current_window_size_ = std::ceil(current_window_size_ / 2); + + socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_); + } +} + +void VegasTransportProtocol::increaseWindow() { + double max_window_size = -1; + socket_->getSocketOption(MAX_WINDOW_SIZE, max_window_size); + if (current_window_size_ < max_window_size) // don't expand window above max level + { + current_window_size_++; + socket_->setSocketOption(CURRENT_WINDOW_SIZE, current_window_size_); + } +}; + +void VegasTransportProtocol::changeInterestLifetime(uint64_t segment) { + std::chrono::steady_clock::duration duration = std::chrono::steady_clock::now() - interest_timepoints_[segment]; + rtt_estimator_.addMeasurement(std::chrono::duration_cast(duration)); + + RtoEstimator::Duration rto = rtt_estimator_.computeRto(); + std::chrono::milliseconds lifetime = std::chrono::duration_cast(rto); + + socket_->setSocketOption(INTEREST_LIFETIME, (int) lifetime.count()); +} + +void VegasTransportProtocol::onManifest(const Interest &interest, ContentObject &content_object) { + if (!is_running_) { + return; + } + + if (verifyManifest(content_object)) { + // TODO Retrieve piece of data using manifest + } +} + +bool VegasTransportProtocol::verifyManifest(ContentObject &content_object) { + ConsumerContentObjectVerificationCallback on_manifest_to_verify = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, on_manifest_to_verify); + + bool is_data_secure = false; + + if (on_manifest_to_verify == VOID_HANDLER) { + // TODO Perform manifest verification + } else if (on_manifest_to_verify(*dynamic_cast(socket_), content_object)) { + is_data_secure = true; + } + + return is_data_secure; +} + +bool VegasTransportProtocol::requireInterestWithHash(const Interest &interest, + const ContentObject &content_object, + Manifest &manifest) { + // TODO Require content object with specific hash. + return true; +} + +// TODO Add the name in the digest computation! +void VegasTransportProtocol::onContentObject(const Interest &interest, ContentObject &content_object) { + if (verifyContentObject(interest, content_object)) { + checkForFastRetransmission(interest); + + uint64_t segment = interest.getName().get(-1).toSegment(); + + if (interest_retransmissions_[segment % default_values::default_buffer_size] == 0) { + afterContentReception(interest, content_object); + } + + if (content_object.hasFinalChunkNumber()) { + is_final_block_number_discovered_ = true; + final_block_number_ = content_object.getFinalChunkNumber(); + } + + bool virtualDownload = false; + + socket_->getSocketOption(VIRTUAL_DOWNLOAD, virtualDownload); + + if (!virtualDownload) { + receive_buffer_[segment % default_values::default_buffer_size] = content_object.shared_from_this(); + reassemble(); + } else { + if (segment == final_block_number_) { + portal_->stopEventsLoop(); + } + } + } +} + +bool VegasTransportProtocol::verifyContentObject(const Interest &interest, ContentObject &content_object) { + // TODO Check content object using manifest + return true; +} + +// TODO move inside manifest +bool VegasTransportProtocol::pointsToManifest(ContentObject &content_object) { + // TODO Check content objects using manifest + return true; +} + +void VegasTransportProtocol::onTimeout(const Interest &interest) { + if (!is_running_) { + return; + } + + interests_in_flight_--; + + std::cerr << "Timeout on " << interest.getName() << std::endl; + + ConsumerInterestCallback on_interest_timeout = VOID_HANDLER; + socket_->getSocketOption(INTEREST_EXPIRED, on_interest_timeout); + if (on_interest_timeout != VOID_HANDLER) { + on_interest_timeout(*dynamic_cast(socket_), const_cast(interest)); + } + + uint64_t segment = interest.getName().get(-1).toSegment(); + + // Do not retransmit interests asking contents that do not exist. + if (is_final_block_number_discovered_) { + if (segment > final_block_number_) { + return; + } + } + + afterDataUnsatisfied(segment); + + int max_retransmissions; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, max_retransmissions); + + if (interest_retransmissions_[segment % default_values::default_buffer_size] < max_retransmissions) { + + ConsumerInterestCallback on_interest_retransmission = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, on_interest_retransmission); + + if (on_interest_retransmission != VOID_HANDLER) { + on_interest_retransmission(*dynamic_cast(socket_), interest); + } + + ConsumerInterestCallback on_interest_output = VOID_HANDLER; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output); + if (on_interest_output != VOID_HANDLER) { + on_interest_output(*dynamic_cast(socket_), interest); + } + + if (!is_running_) { + return; + } + + //retransmit + interests_in_flight_++; + interest_retransmissions_[segment % default_values::default_buffer_size]++; + + portal_->sendInterest(interest, + bind(&VegasTransportProtocol::onContentSegment, this, _1, _2), + bind(&VegasTransportProtocol::onTimeout, this, _1)); + } else { + is_running_ = false; + + bool virtual_download = false; + socket_->getSocketOption(VIRTUAL_DOWNLOAD, virtual_download); + + if (!virtual_download) { + reassemble(); + } + + portal_->stopEventsLoop(); + } + +} + +void VegasTransportProtocol::copyContent(ContentObject &content_object) { + Array a = content_object.getContent(); + + content_buffer_.insert(content_buffer_.end(), (uint8_t *) a.data(), (uint8_t *) a.data() + a.size()); + + if ((content_object.getName().get(-1).toSegment() == final_block_number_) || (!is_running_)) { + + // return content to the user + ConsumerContentCallback on_payload = VOID_HANDLER; + socket_->getSocketOption(CONTENT_RETRIEVED, on_payload); + if (on_payload != VOID_HANDLER) { + on_payload(*dynamic_cast(socket_), + (uint8_t *) (content_buffer_.data()), + content_buffer_.size()); + } + + //reduce window size to prevent its speculative growth in case when consume() is called in loop + int current_window_size = -1; + socket_->getSocketOption(CURRENT_WINDOW_SIZE, current_window_size); + if ((uint64_t) current_window_size > final_block_number_) { + socket_->setSocketOption(CURRENT_WINDOW_SIZE, (int) (final_block_number_)); + } + + is_running_ = false; + portal_->stopEventsLoop(); + } +} + +void VegasTransportProtocol::reassemble() { + uint64_t index = last_reassembled_segment_ % default_values::default_buffer_size; + + while (receive_buffer_[index % default_values::default_buffer_size]) { + if (receive_buffer_[index % default_values::default_buffer_size]->getContentType() == PayloadType::DATA) { + copyContent(*receive_buffer_[index % default_values::default_buffer_size]); + } + + receive_buffer_[index % default_values::default_buffer_size].reset(); + + last_reassembled_segment_++; + index = last_reassembled_segment_ % default_values::default_buffer_size; + } +} + +bool VegasTransportProtocol::verifySegmentUsingManifest(Manifest &manifestSegment, ContentObject &content_object) { + // TODO Content object verification exploiting manifest + return true; +} + +void VegasTransportProtocol::checkForFastRetransmission(const Interest &interest) { + uint64_t segNumber = interest.getName().get(-1).toSegment(); + received_segments_[segNumber] = true; + fast_retransmitted_segments.erase(segNumber); + + uint64_t possibly_lost_segment = 0; + uint64_t highest_received_segment = received_segments_.rbegin()->first; + + for (uint64_t i = 0; i <= highest_received_segment; i++) { + if (received_segments_.find(i) == received_segments_.end()) { + if (fast_retransmitted_segments.find(i) == fast_retransmitted_segments.end()) { + possibly_lost_segment = i; + uint8_t out_of_order_segments = 0; + for (uint64_t j = i; j <= highest_received_segment; j++) { + if (received_segments_.find(j) != received_segments_.end()) { + out_of_order_segments++; + if (out_of_order_segments >= default_values::max_out_of_order_segments) { + fast_retransmitted_segments[possibly_lost_segment] = true; + fastRetransmit(interest, possibly_lost_segment); + } + } + } + } + } + } +} + +void VegasTransportProtocol::fastRetransmit(const Interest &interest, uint64_t chunk_number) { + int max_retransmissions; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_retransmissions); + + if (interest_retransmissions_[chunk_number % default_values::default_buffer_size] < max_retransmissions) { + Name name = interest.getName().getPrefix(-1); + name.appendSegment(chunk_number); + + std::shared_ptr retx_interest = std::make_shared(name); + + ConsumerInterestCallback on_interest_retransmission = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, on_interest_retransmission); + + if (on_interest_retransmission != VOID_HANDLER) { + on_interest_retransmission(*dynamic_cast(socket_), *retx_interest); + } + + ConsumerInterestCallback on_interest_output = VOID_HANDLER; + + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, on_interest_output); + if (on_interest_output != VOID_HANDLER) { + on_interest_output(*dynamic_cast(socket_), *retx_interest); + } + + if (!is_running_) { + return; + } + + interests_in_flight_++; + interest_retransmissions_[chunk_number % default_values::default_buffer_size]++; + portal_->sendInterest(*retx_interest, + bind(&VegasTransportProtocol::onContentSegment, this, _1, _2), + bind(&VegasTransportProtocol::onTimeout, this, _1)); + } +} + +void VegasTransportProtocol::removeAllPendingInterests() { + portal_->clear(); +} + +} // namespace icn-interface diff --git a/icnet/transport/icnet_transport_vegas.h b/icnet/transport/icnet_transport_vegas.h new file mode 100644 index 00000000..a47050d8 --- /dev/null +++ b/icnet/transport/icnet_transport_vegas.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_VEGAS_TRANSPORT_PROTOCOL_H_ +#define ICNET_VEGAS_TRANSPORT_PROTOCOL_H_ + +#include "icnet_transport.h" +#include "icnet_transport_vegas_rto_estimator.h" + +namespace icnet { + +class VegasTransportProtocol : public TransportProtocol { + public: + + VegasTransportProtocol(Socket *icnet_socket); + + virtual ~VegasTransportProtocol(); + + virtual void start(); + + void stop(); + + protected: + + void sendInterest(); + + void onContentSegment(const Interest &interest, ContentObject &content_object); + + bool verifyContentObject(const Interest &interest, ContentObject &content_object); + + bool verifyManifest(ContentObject &content_object); + + virtual void onTimeout(const Interest &interest); + + void onManifest(const Interest &interest, ContentObject &content_object); + + void onContentObject(const Interest &interest, ContentObject &content_object); + + virtual void changeInterestLifetime(uint64_t segment); + + void scheduleNextInterests(); + + virtual void decreaseWindow(); + + virtual void increaseWindow(); + + virtual void afterContentReception(const Interest &interest, const ContentObject &content_object); + + virtual void afterDataUnsatisfied(uint64_t segment); + + void reassemble(); + + virtual void copyContent(ContentObject &content_object); + + bool pointsToManifest(ContentObject &content_object); + + bool requireInterestWithHash(const Interest &interest, const ContentObject &content_object, Manifest &manifest); + + bool verifySegmentUsingManifest(Manifest &manifestSegment, ContentObject &content_object); + + virtual void checkForFastRetransmission(const Interest &interest); + + void fastRetransmit(const Interest &interest, uint64_t chunk_number); + + void removeAllPendingInterests(); + + protected: + // reassembly variables + bool is_final_block_number_discovered_; + uint64_t final_block_number_; + uint64_t last_reassembled_segment_; + std::vector content_buffer_; + size_t content_buffer_size_; + + // transmission variables + double current_window_size_; + double pending_window_size_; + uint64_t interests_in_flight_; + uint64_t segment_number_; + std::vector interest_retransmissions_; + std::vector interest_timepoints_; + RtoEstimator rtt_estimator_; + + // buffers + std::vector > receive_buffer_; // verified segments by segment number + std::vector > unverified_segments_; // used with embedded manifests + std::vector > verified_manifests_; // by segment number + + // Fast Retransmission + std::map received_segments_; + std::unordered_map fast_retransmitted_segments; +}; + +} // end namespace icnet + + +#endif // ICNET_VEGAS_TRANSPORT_PROTOCOL_H_ diff --git a/icnet/transport/icnet_transport_vegas_rto_estimator.cc b/icnet/transport/icnet_transport_vegas_rto_estimator.cc new file mode 100644 index 00000000..889a6bd3 --- /dev/null +++ b/icnet/transport/icnet_transport_vegas_rto_estimator.cc @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_transport_vegas_rto_estimator.h" +#include "icnet_socket_options_default_values.h" + +namespace icnet { + +RtoEstimator::RtoEstimator(Duration min_rto) : smoothed_rtt_(RtoEstimator::getInitialRtt().count()), + rtt_variation_(0), + first_measurement_(true), + last_rto_(min_rto.count()) { +} + +void RtoEstimator::addMeasurement(Duration rtt) { + double duration = static_cast(rtt.count()); + if (first_measurement_) { + smoothed_rtt_ = duration; + rtt_variation_ = duration / 2; + first_measurement_ = false; + } else { + rtt_variation_ = + (1 - default_values::beta) * rtt_variation_ + default_values::beta * std::abs(smoothed_rtt_ - duration); + smoothed_rtt_ = (1 - default_values::alpha) * smoothed_rtt_ + default_values::alpha * duration; + } +} + +RtoEstimator::Duration RtoEstimator::computeRto() const { + double rto = + smoothed_rtt_ + std::max(double(default_values::clock_granularity.count()), default_values::k * rtt_variation_); + return Duration(static_cast(rto)); +} + +} // end namespace icnet \ No newline at end of file diff --git a/icnet/transport/icnet_transport_vegas_rto_estimator.h b/icnet/transport/icnet_transport_vegas_rto_estimator.h new file mode 100644 index 00000000..7b18533c --- /dev/null +++ b/icnet/transport/icnet_transport_vegas_rto_estimator.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNET_VEGAS_TRANSPORT_PROTOCOL_RTT_ESTIMATOR_H_ +#define ICNET_VEGAS_TRANSPORT_PROTOCOL_RTT_ESTIMATOR_H_ + +#include "icnet_common.h" + +// Implementation inspired from RFC6298 (https://tools.ietf.org/search/rfc6298#ref-JK88) + +namespace icnet { + +class RtoEstimator { + public: + typedef std::chrono::microseconds Duration; + + static Duration getInitialRtt() { + return std::chrono::seconds(1); + } + + RtoEstimator(Duration min_rto = std::chrono::seconds(1)); + + void addMeasurement(Duration measure); + + Duration computeRto() const; + + private: + double smoothed_rtt_; + double rtt_variation_; + bool first_measurement_; + double last_rto_; +}; + +} // end namespace icnet + + +#endif // ICNET_VEGAS_TRANSPORT_PROTOCOL_RTT_ESTIMATOR_H_ -- cgit 1.2.3-korg