diff options
59 files changed, 2091 insertions, 1415 deletions
diff --git a/apps/http-proxy/src/ATSConnector.h b/apps/http-proxy/src/ATSConnector.h index 1ef2fa252..dbec30353 100644 --- a/apps/http-proxy/src/ATSConnector.h +++ b/apps/http-proxy/src/ATSConnector.h @@ -36,7 +36,7 @@ typedef std::deque< BufferQueue; class ATSConnector { - static constexpr uint32_t buffer_size = 1024 * 64; + static constexpr uint32_t buffer_size = 1024 * 512; enum class ConnectorState { CLOSED, diff --git a/apps/http-proxy/src/IcnReceiver.cc b/apps/http-proxy/src/IcnReceiver.cc index 18553d84b..8d0fb4917 100644 --- a/apps/http-proxy/src/IcnReceiver.cc +++ b/apps/http-proxy/src/IcnReceiver.cc @@ -80,6 +80,13 @@ AsyncConsumerProducer::AsyncConsumerProducer( } ret = producer_socket_.setSocketOption( + interface::GeneralTransportOptions::MAKE_MANIFEST, true); + + if (ret != SOCKET_OPTION_SET) { + TRANSPORT_LOGD("Warning: impossible to enable signatures."); + } + + ret = producer_socket_.setSocketOption( interface::GeneralTransportOptions::DATA_PACKET_SIZE, mtu_); if (ret != SOCKET_OPTION_SET) { @@ -130,6 +137,8 @@ void AsyncConsumerProducer::manageIncomingInterest( auto _it = chunk_number_map_.find(name); auto _end = chunk_number_map_.end(); + std::cout << "Received interest " << seg << std::endl; + if (_it != _end) { if (_it->second.second) { // Content is in production @@ -137,7 +146,7 @@ void AsyncConsumerProducer::manageIncomingInterest( } if (seg >= _it->second.first) { - TRANSPORT_LOGD( + TRANSPORT_LOGI( "Ignoring interest with name %s for a content object which does not " "exist. (Request: %u, max: %u)", name.toString().c_str(), (uint32_t)seg, (uint32_t)_it->second.first); @@ -145,6 +154,8 @@ void AsyncConsumerProducer::manageIncomingInterest( } } + std::cout << "Received interest " << seg << std::endl; + bool is_mpd = HTTPMessageFastParser::isMpdRequest(payload->data(), payload->length()); @@ -194,6 +205,7 @@ void AsyncConsumerProducer::publishContent(const uint8_t* data, if (headers) { request_counter_++; } + it->second.first += producer_socket_.produce(name, data, size, is_last, start_suffix); diff --git a/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h b/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h index c55529894..d8e5329b3 100644 --- a/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h +++ b/ctrl/libhicnctrl/includes/hicn/ctrl/commands.h @@ -37,6 +37,7 @@ #endif /* WITH_POLICY */ #define SYMBOLIC_NAME_LEN 16 +#define MAX_FWD_STRATEGY_RELATED_PREFIXES 10 typedef struct in6_addr ipv6_addr_t; typedef uint32_t ipv4_addr_t; @@ -69,6 +70,7 @@ typedef enum { MAPME_DISCOVERY, MAPME_TIMESCALE, MAPME_RETX, + MAPME_SEND_UPDATE, CONNECTION_SET_ADMIN_STATE, #ifdef WITH_POLICY ADD_POLICY, @@ -251,9 +253,13 @@ typedef struct { uint8_t strategyType; uint8_t addressType; uint8_t len; + uint8_t related_prefixes; + ip_address_t addresses[MAX_FWD_STRATEGY_RELATED_PREFIXES]; + uint8_t lens[MAX_FWD_STRATEGY_RELATED_PREFIXES]; + uint8_t addresses_type[MAX_FWD_STRATEGY_RELATED_PREFIXES]; } set_strategy_command; -// SIZE=20 +// SIZE=208 //========== [11] SET WLDR ========== @@ -303,6 +309,12 @@ typedef struct { uint32_t timePeriod; } mapme_timing_command; +typedef struct { + ip_address_t address; + uint8_t addressType; + uint8_t len; +} mapme_send_update_command; + // SIZE=1 typedef struct { @@ -395,6 +407,8 @@ static inline int payloadLengthDaemon(command_id id) { return sizeof(mapme_timing_command); case MAPME_RETX: return sizeof(mapme_timing_command); + case MAPME_SEND_UPDATE: + return sizeof(mapme_send_update_command); case CONNECTION_SET_ADMIN_STATE: return sizeof(connection_set_admin_state_command); #ifdef WITH_POLICY diff --git a/docs/README.md b/docs/README.md index c99cf879b..d7a6c6cc8 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,27 +1,24 @@ -Building Documents +# Building Documents These instructions show how documentation sources are built. To build your files, you can either Create a Virtual Environment using virtualenv, which installs all the required applications for you. -Create a Virtual Environment using virtualenv -============================ +# Create a Virtual Environment using virtualenv For more information on how to use the Python virtual environment check -out https://packaging.python.org/guides/installing-using-pip-and-virtualenv +out <https://packaging.python.org/guides/installing-using-pip-and-virtualenv> -Get the Documents ------------------------------- +# Get the Documents -For example start with a clone of the vpp. +For example start with a clone of the hicn. -$ git clone https://gerrit.fd.io/r/hicn +$ git clone <https://gerrit.fd.io/r/hicn> $ cd hicn -Install the virtual environment ----------------------------------------------- +# Install the virtual environment $ python3 -m pip install --user virtualenv $ python3 -m virtualenv env @@ -33,16 +30,14 @@ Which installs all the required applications into it's own, isolated, virtual environment, so as to not interfere with other builds that may use different versions of software. -Build the html files ----------------------------- +# Build the html files Be sure you are in your hicn/docs directory, since that is where Sphinx will look for your conf.py file, and build the documents into an index.html file $ make html -View the results ------------------------- +# View the results If there are no errors during the build process, you should now have an index.html file in your hicn/docs/build/html directory, which you can diff --git a/docs/source/apps.md b/docs/source/apps.md index eefb68be1..9b2384f29 100644 --- a/docs/source/apps.md +++ b/docs/source/apps.md @@ -10,7 +10,7 @@ higet and hicn-http-proxy are two application examples that use hicn stack. Build dependencies: -- c++14 ( clang++ / g++ ) +- C++14 ( clang++ / g++ ) - CMake 3.5 or higher Basic dependencies: @@ -283,22 +283,3 @@ Retrieve a web page: ```bash client$ /usr/bin/higet -O - http://webserver/index.html -P c001 ``` - -## License - -This software is distributed under the following license: - -```text -Copyright (c) 2017-2019 Cisco and/or its affiliates. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at: - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -``` diff --git a/docs/source/control.md b/docs/source/control.md index 0940fff67..f94d3f3b9 100644 --- a/docs/source/control.md +++ b/docs/source/control.md @@ -1,23 +1,33 @@ -# Sysrepo plugin for hicn-plugin (2019) +# Sysrepo plugin for hicn-plugin These plugins serve as a data management agent. They provide yang models via -NETCONF to allow the management of hicn-light, and hicn-plugin which runs in VPP -instance from out-of-box. +NETCONF to allow the management of hicn-light, and hicn VPP plugin. ## Software Requirement - VPP +- sysrepo +- hicn-plugin +- hicn-light +- libyang - sysrepo +- libnetconf +- netopeer2 -- hicn-plugin +To install libyang, sysrepo, libnetconf and netopeer2 for Ubuntu18 amd64/arm64 +and ad-hoc repository is available and maintained in bintray. -- hicn-light +```shell +echo "deb [trusted=yes] https://dl.bintray.com/icn-team/apt-hicn-extras bionic main" | tee -a /etc/apt/sources.list +apt-get update && apt-get install -y libyang sysrepo libnetconf2 netopeer2-server +``` ## hICN yang model You can install the yang model using the following bash script: -``` + +```shell EXIT_CODE=0 command -v sysrepoctl > /dev/null if [ $? != 0 ]; then @@ -27,6 +37,7 @@ else sysrepoctl --install --yang=path_to_hicn_yang_model fi ``` + hicn.yang can be found in the yang-model. It consists of two container nodes: hicn-conf and hicn-state. One is used to hold the configuration data (i.e., hicn-conf) and one for providing the state data (i.e., hicn-state). The @@ -40,7 +51,8 @@ to allow controller to communicate with the hicn-plugin as well as update the st data in hicn-state. To setup the startup configuration you can use the following script: -``` + +```shell EXIT_CODE=0 command -v sysrepocfg > /dev/null if [ $? != 0 ]; then @@ -52,7 +64,8 @@ fi ``` startup.xml is placed in the yang-model. Here you can find the content: -``` + +```shell <hicn-conf xmlns="urn:sysrepo:hicn"> <params> <enable_disable>false</enable_disable> @@ -65,6 +78,7 @@ startup.xml is placed in the yang-model. Here you can find the content: </params> </hicn-conf> ``` + As can be seen, it contains the leaves of the params in hicn-conf node which is used as the startup configuration. This configuration can be changed through the controller by subscribing which changes the target to the running state. hicn @@ -74,7 +88,8 @@ state data. In order to run different RPCs from controller you can use the examples in the controler_rpcs_instances.xml in the yang-model. Here you can find the content: -``` + +```shell <node-params-get xmlns="urn:sysrepo:hicn"/> <node-stat-get xmlns="urn:sysrepo:hicn"/> @@ -164,8 +179,10 @@ In order to connect through the netopeer client run the netopeer2-cli. Then, fol - get (you can get the configuration and operational data) - get-config (you can get the configuration data) - edit-config --target running --config - you can modify the configuration but it needs an xml configuration input -``` + +You can modify the configuration but it needs an xml configuration input + +```shell <hicn-conf xmlns="urn:sysrepo:hicn"> <params> <enable_disable>false</enable_disable> @@ -178,6 +195,7 @@ In order to connect through the netopeer client run the netopeer2-cli. Then, fol </params> </hicn-conf> ``` + - user-rpc (you can call one of the rpc proposed by hicn model but it needs an xml input) ## Connect from OpenDaylight (ODL) controller @@ -191,10 +209,11 @@ In order to connect through the OpenDaylight follow these procedure: - run a rest client program (e.g., postman or RESTClient) - mount the remote netopeer2-server to the OpenDaylight by the following REST API: -PUT http://localhost:8181/restconf/config/network-topology:network-topology/topology/topology-netconf/node/hicn-node +PUT <http://localhost:8181/restconf/config/network-topology:network-topology/topology/topology-netconf/node/hicn-node> with the following body -``` + +```shell <node xmlns="urn:TBD:params:xml:ns:yang:network-topology"> <node-id>hicn-node</node-id> <host xmlns="urn:opendaylight:netconf-node-topology">Remote_NETCONF_SERVER_IP</host> @@ -205,11 +224,12 @@ with the following body <keepalive-delay xmlns="urn:opendaylight:netconf-node-topology">1</keepalive-delay> </node> ``` + Note that the header files must be set to Content-Type: application/xml, Accept: application/xml. - send the operation through the following REST API: -POST http://localhost:8181/restconf/operations/network-topology:network-topology/topology/topology-netconf/node/hicn-node/yang-ext:mount/ietf-netconf:edit-config +POST <http://localhost:8181/restconf/operations/network-topology:network-topology/topology/topology-netconf/node/hicn-node/yang-ext:mount/ietf-netconf:edit-config> The body can be used the same as edit-config in netopeer2-cli. @@ -238,4 +258,4 @@ At this point, we are able to connect to the remote device. ## Release note -The current version is compatible with the 19.01 VPP stable and sysrepo 0.7.7.
\ No newline at end of file +The current version is compatible with the 20.01 VPP stable and sysrepo devel.
\ No newline at end of file diff --git a/docs/source/hicn-light.md b/docs/source/hicn-light.md index e321b78e5..a68af7f05 100644 --- a/docs/source/hicn-light.md +++ b/docs/source/hicn-light.md @@ -1,5 +1,4 @@ -The Hybrid ICN Portable Forwarder -============================== +# The Hybrid ICN Portable Forwarder ## Introduction @@ -7,24 +6,11 @@ hicn-light is a socket based forwarder ## Using hicn-light -### Platforms - -hicn-light has been tested in: - -- Ubuntu 16.04 / 18.04 (x86_64) -- Debian Testing -- Centos 7 -- MacOSX 10.12 -- Android -- iOS - -Other platforms and architectures may work. - ### Dependencies Build dependencies: -- c99 ( clang / gcc ) +- C99 ( clang / gcc ) - CMake 3.4 Basic dependencies: @@ -49,7 +35,7 @@ Use the `-h` option to display the help messages The command `hicn-light-daemon` runs the hicn-light forwarder. The forwarder can be executed with the following options: -``` +```shell hicn-light-daemon [--port port] [--daemon] [--capacity objectStoreSize] [--log facility=level] [--log-file filename] [--config file] @@ -81,7 +67,7 @@ default port for hicn-light is 9695. Commands are expected on port 2001. `hicn-light-control` can be used to send command to the hicn-light forwarder and configure it. The command can be executed in the following way: -``` +```shell hicn-light-control [commands] Options: @@ -101,7 +87,7 @@ Information about the commands are also available in the `hicn-light-control` he For local connections (application to hicn-light) we expect a TCP listener. The default port for the local listener is 9695. -``` +```shell add listener <protocol> <symbolic> <local_adress> <local_port> <symbolic> :User defined name for listener, must start with alpha and bealphanum @@ -113,7 +99,7 @@ add listener <protocol> <symbolic> <local_adress> <local_port> `add listener hicn`: creates a hicn listener with the specified options on the local forwarder. -``` +```shell add listener hicn <symbolic> <local_adress> <symbolic> :User defined name for listener, must start with alpha and be alphanum @@ -123,7 +109,7 @@ add listener hicn <symbolic> <local_adress> `add connection`: creates a TCP or UDP connection on the local forwarder with the specified options. -``` +```shell add connection <protocol> <symbolic> <remote_ip> <remote_port> <local_ip> <local_port> <protocol> : tcp | udp @@ -134,9 +120,10 @@ add connection <protocol> <symbolic> <remote_ip> <remote_port> <local_ip> <local <local_port> : local TCP/UDP port ``` + `add connection hicn`: creates an hicn connection on the local forwarder with the specified options. -``` +```shell add connection hicn <symbolic> <remote_ip> <local_ip> <symbolic> : symbolic name, e.g. 'conn1' (must be unique, start with alpha) @@ -144,14 +131,17 @@ add connection hicn <symbolic> <remote_ip> <local_ip> <local_ip> : local IP address to bind to ``` + `list`: lists the connections, routes or listeners available on the local hicn-light forwarder -``` + +```shell list <connections | routes | listeners> ``` + `add route`: adds a route to the specified connection -``` +```shell add route <symbolic | connid> <prefix> <cost> <symbolic> :The symbolic name for an exgress (must be unique, start with alpha) @@ -163,7 +153,7 @@ add route <symbolic | connid> <prefix> <cost> `remove connection`: removes the specified connection. At the moment, this commands is available only for UDP connections, TCP is ignored. -``` +```shell remove connection <protocol> <symbolic | connid> <protocol> : tcp | upd. This is the protocol used to create the connection. @@ -174,7 +164,7 @@ remove connection <protocol> <symbolic | connid> `remove route`: remove the specified prefix for a local connection -``` +```shell remove route <symbolic | connid> <prefix> <connid> : the alphanumeric name of a local connection @@ -183,22 +173,24 @@ remove route <symbolic | connid> <prefix> `cache serve`: enables/disables replies from local content store (if available) -``` +```shell cache serve <on|off> ``` + `cache store`: enables/disables the storage of incoming data packets in the local content store (if available) -``` +```shell cache store <on|off> ``` + `cache clear`: removes all the cached data form the local content store (if available) -``` +```shell cache clear - ``` + `set strategy`: sets the forwarding strategy for a give prefix. There are 4 different strategies implemented in hicn-light: @@ -211,12 +203,13 @@ implemented in hicn-light: ICNP 2013. - low_latency: uses the face with the lowest latency. In case more faces have similar latency the strategy uses them in parallel -``` +```shell set strategy <prefix> <strategy> <preifx> : the prefix to which apply the forwarding strategy <strategy> : random | loadbalancer | low_latency ``` + `set wldr`: turns on/off WLDR on the specified connection. WLDR (Wireless Loss Detiection and Recovery) is a protocol that can be used to recover losses generated by unreliable wireless connections, such as WIFI. More information on WLDR are available in: @@ -225,42 +218,46 @@ set strategy <prefix> <strategy> ICN 2016. Notice that WLDR is currently available only for UDP connections. In order to work properly, WLDR needs to be activated on both side of the connection. -``` +```shell set wldr <on|off> <symbolic | connid> <symbolic> :The symbolic name for an exgress (must be unique, start with alpha) <connid>: :The egress connection id (see 'help list connections') ``` + `add punting`: Add punting rules to the forwarders. -``` +```shell add punting <symbolic> <prefix> <symbolic> : listener symbolic name <address> : prefix to add as a punting rule. (example 1234::0/64) ``` + `mapme enable`: enables/disables mapme -``` +```shell mapme enable <on|off> ``` `mapme discovery`: enables/disables mapme discovery -``` +```shell mapme discovery <on|off> ``` `mapme timescale`: set the timescale value expressed in millisencods -``` +```shell mapme timescale <milliseconds> ``` + `mapme retx`: set the retrasmission time value expressed in millisecond -``` +```shell mapme retx <milliseconds> ``` + `quit`: Exits the interactive shell ### hicn-light Configuration File Example @@ -268,7 +265,7 @@ mapme retx <milliseconds> This is an example of a simple configuration file for hicn-light. It can be loaded by running the command `hicn-light-daemon --config configFile.cfg`, assuming the file name is configFile.cfg -``` +```shell #create a local listener on port 9199. This will be used by the applications to talk with the forwarder add listener udp local0 192.168.0.1 9199 @@ -279,22 +276,3 @@ add connection udp conn0 192.168.0.20 12345 192.168.0.1 9199 #add a route toward the remote node add route conn0 c001::/64 1 ``` - -## License - -This software is distributed under the following license: - -``` -Copyright (c) 2017-2019 Cisco and/or its affiliates. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at: - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -``` diff --git a/docs/source/index.rst b/docs/source/index.rst index 43a380b5f..9123734b4 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -2,7 +2,8 @@ Hybrid ICN software distribution ================================= .. toctree:: - + + started lib vpp-plugin transport diff --git a/docs/source/lib.md b/docs/source/lib.md index 58f23b510..727d496bf 100644 --- a/docs/source/lib.md +++ b/docs/source/lib.md @@ -1,6 +1,5 @@ -The Hybrid ICN Core Library -============================== +# The Hybrid ICN Core Library ## Introduction @@ -16,6 +15,7 @@ userspace according to the available APIs and permissions that each system offers. The library consists in several layers: + - the core library (hicn.h) provides a standard hICN packet format, as well as an API allowing manipulation of packet headers; - an hICN helper, allowing an hICN stack to be built in userspace in a portable @@ -32,54 +32,39 @@ either a consumer, a producer, or a simple forwarder. ## Folder content -CMakeLists.txt CMkake global build file -doc Package documentation -README.md This file -src - base.h Base definitions for hICN implementation - CMakeLists.txt CMake library build file - common.{h,c} Harmonization layer across supported platforms - compat.{h,c} Compatibility layer for former API - error.{h,c} Error management files - header.h hICN header definitions - hicn.h Master include file - mapme.{h,c} MAP-Me : anchorless producer mobility mechanisms - name.{h,c} hICN naming conventions and name processing + IP helpers - ops.{h,c} Protocol-independent hICN operations - protocol/* Protocol headers + protocol-dependent implementations - protocol.h Common file for protocols +```shell +. ++-- CMakeLists.txt CMkake global build file ++-- doc Package documentation ++-- README.md This file ++-- src +| +-- base.h Base definitions for hICN implementation +| +-- CMakeLists.txt CMake library build file +| +-- common.{h,c} Harmonization layer across supported platforms +| +-- compat.{h,c} Compatibility layer for former API +| +-- error.{h,c} Error management files +| +-- header.h hICN header definitions +| +-- hicn.h Master include file +| +-- mapme.{h,c} MAP-Me : anchorless producer mobility mechanisms +| +-- name.{h,c} hICN naming conventions and name processing + IP helpers +| +-- ops.{h,c} Protocol-independent hICN operations +| +-- protocol/* Protocol headers + protocol-dependent implementations +| +-- protocol.h Common file for protocols +``` ## Using libhicn -### Platforms ### - -libhicn has been tested in: - -- Ubuntu 16.04 LTS (x86_64) -- Ubuntu 18.04 LTS (x86_64) -- Debian Stable/Testing -- Red Hat Enterprise Linux 7 -- CentOS 7 -- Android 8 -- iOS 12 -- macOS 10.12 -- Windows 10 - -Other platforms and architectures may work. - ### Dependencies Build dependencies: -- c11 ( clang / gcc ) +- C11 ( clang / gcc ) - CMake 3.4 Basic dependencies: None ## Installation -You can either use released packages, or compile libhicn from sources. - ### Release mode mkdir build @@ -95,22 +80,3 @@ cd debug cmake .. -DCMAKE_BUILD_TYPE=Debug make sudo make install - -## License - -This software is distributed under the following license: - -``` -Copyright (c) 2017-2019 Cisco and/or its affiliates. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at: - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -``` diff --git a/docs/source/started.md b/docs/source/started.md new file mode 100644 index 000000000..099b5f4d9 --- /dev/null +++ b/docs/source/started.md @@ -0,0 +1,72 @@ +# Getting started + +The Hybrid ICN software distribution can be installed for several platforms. +The network stack comes in two different implementations: one scalable based +on VPP and one portable based on IPC and sockets. + +The transport stack is a unique library that is used for both the scalable +and portable network stacks. + +## Platforms + +- Ubuntu 18.04 LTS (amd64, arm64) +- Debian Stable/Testing +- Red Hat Enterprise Linux 7 +- CentOS 7 +- Android 10 (amd64, arm64) +- iOS 13 +- macOS 10.15 +- Windows 10 + +Other platforms and architectures may work. +You can either use released packages, or compile hicn from sources. + +### Ubuntu 18.04/16.04 amd64/arm64 + +```shell +curl -s https://packagecloud.io/install/repositories/fdio/release/script.deb.sh | sudo bash +``` + +### CentOS 7 amd64 + +```shell +curl -s https://packagecloud.io/install/repositories/fdio/release/script.rpm.sh | sudo bash +``` + +### macOS + +```shell +brew install hicn +``` + +### Android + +Install the applications via the Google Play Store +<https://play.google.com/store/apps/developer?id=ICN+Team> + +### iOS + +Coming soon. + +### Windows 10 + +Coming soon. + +## License + +This software is distributed under the following license: + +```shell +Copyright (c) 2017-2020 Cisco and/or its affiliates. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +``` diff --git a/docs/source/transport.md b/docs/source/transport.md index e7fc267ee..7f8293a78 100644 --- a/docs/source/transport.md +++ b/docs/source/transport.md @@ -1,7 +1,6 @@ -Libtransport: data transport library for hICN -==================================================== +# The Transport Library -## Introduction ## +## Introduction This library provides transport services and socket API for applications willing to communicate using the hICN protocol stack. @@ -14,20 +13,12 @@ Overview: - Transport services (authentication, integrity, segmentation, reassembly, naming) - Interfaces for Applications (from low-level interfaces for interest-data interaction to high level interfaces for Application Data Unit interaction) -## Build Dependencies ## +## Build Dependencies - libparc - libmemif (linux only, if compiling with VPP support) - libasio -### Ubuntu 16.04 and Ubuntu 18.04 ### - -```bash - $ echo "deb [trusted=yes] https://nexus.fd.io/content/repositories/fd.io.master.ubuntu.$(lsb_release -sc).main/ ./" \ - | sudo tee -a /etc/apt/sources.list.d/99fd.io.list - $ sudo apt-get install libparc libasio-dev -``` - If you wish to use the library for connecting to the vpp hicn-plugin, you will need to also install vpp, the vpp libraries and the libmemif libraries: - DEB packages: @@ -39,39 +30,40 @@ You can get them either from from the vpp packages ot the source code. Check the Libmemif is in the vpp-lib and vpp-dev packages. -### Mac OSX ### +### macOS We recommend to use [HomeBrew](https://brew.sh/) for installing the libasio dependency: ```bash - $ brew install asio + brew install asio ``` Download, compile and install libparc: ```bash - $ git clone -b cframework/master https://gerrit.fd.io/r/cicn cframework && cd cframework - $ mkdir -p libparc.build && cd libparc.build - $ cmake ../libparc - $ make - $ make install + git clone -b cframework/master https://gerrit.fd.io/r/cicn cframework && cd cframework + mkdir -p libparc.build && cd libparc.build + cmake ../libparc + make + make install ``` Libparc will be installed by default under `/usr/local/lib` and `/usr/local/include`. -Since VPP does not support MAC OS, the hicn-plugin connector is not built. +Since VPP does not support macOS, the hicn-plugin connector is not built. -## Build The library ## +## Build the library From the project root folder: ```bash - $ cd libtransport - $ mkdir build && cd build - $ cmake .. - $ make + cd libtransport + mkdir build && cd build + cmake .. + make ``` -### Compile options ### + +### Compile options The build process can be customized with the following options: @@ -85,42 +77,10 @@ The build process can be customized with the following options: An option can be set using cmake -D`OPTION`=`VALUE`. -Install the library -------------------- +### Install the library For installing the library, from the cmake build folder: ```bash - $ sudo make install -``` - -## Supported platforms - -- Ubuntu 16.04 LTS (x86_64) -- Ubuntu 18.04 LTS (x86_64) -- Debian Stable/Testing -- Red Hat Enterprise Linux 7 -- CentOS 7 -- Android 8 -- iOS 12 -- macOS 10.12 -- Windows 10 - -## License ## - -This software is distributed under the following license: - -``` -Copyright (c) 2017-2019 Cisco and/or its affiliates. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at: - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + sudo make install ```
\ No newline at end of file diff --git a/docs/source/utils.md b/docs/source/utils.md index 0d81ea426..5b9178b56 100644 --- a/docs/source/utils.md +++ b/docs/source/utils.md @@ -6,22 +6,11 @@ hicn-ping-server, hicn-ping-client and hiperf are three utility applications for ## Using hICN Utils applications -### Platforms - -hicn-light has been tested in: - -- Ubuntu 16.04 / 18.04 (x86_64) -- Debian Testing -- Centos 7 -- MacOSX 10.12 -- Android -- iOS - ### Dependencies Build dependencies: -- c++14 ( clang++ / g++ ) +- C++14 ( clang++ / g++ ) - CMake 3.4 Basic dependencies: @@ -41,7 +30,7 @@ The utility applications are a set of binary executables consisting of a client/ The command `hicn-ping-server` runs the server side ping application. hicn-ping-server can be executed with the following options: -``` +```shell usage: hicn-ping-server [options] Options: @@ -66,7 +55,7 @@ hicn-ping-server -n c001::/64 The command `hicn-ping-client` runs the client side ping application. hicn-ping-client can be executed with the following options: -``` +```shell usage: hicn-ping-client [options] Options: @@ -93,7 +82,7 @@ hicn-ping-client -n c001::1 The command `hiperf` is a tool for performing network throughput measurements with hicn. It can be executed as server or client using the following options: -``` +```shell usage: hiperf [-S|-C] [options] [prefix|name] Options: @@ -110,9 +99,9 @@ Server specific: -k <keystore_path> = path of p12 file containing the crypto material used for signing the packets -y <hash_algorithm> = use the selected hash algorithm for calculating manifest digests -p <password> = password for p12 keystore --x = produce a content of <download_size>, then after downloading it produces a new content of - <download_size> without resetting the suffix to 0 --B <bitrate> = bitrate for RTC producer, to be used with the -R option, in kbps (example: 64kbps) +-x = produce a content of <download_size>, then after downloading it produces a new + content of <download_size> without resetting the suffix to 0 +-B <bitrate> = bitrate for RTC producer, to be used with the -R option, in kbps (example: 64kbps) Client specific: -b <beta_parameter> = RAAQM beta parameter @@ -229,7 +218,7 @@ This will run the client with a fixed window of 50 interests. For sending hICN packets directly over the network, using hicn faces, change the configuration of the two forwarders and restart them. -##### Server Configuration +#### Server ```bash server$ mkdir -p ${HICN_ROOT}/etc @@ -241,7 +230,7 @@ add listener hicn list0 ${LOCAL_IP} EOF ``` -#### Client Configuration +#### Client ```bash client$ mkdir -p ${HICN_ROOT}/etc @@ -284,42 +273,23 @@ $ export HICN_ROOT=${PWD}/../hicn-install Make sure vpp is running: ```bash -$ sudo systemctl restart vpp +sudo systemctl restart vpp ``` Run the hicn-plugin: ```bash -$ vppctl hicn control start +vppctl hicn control start ``` Run hiperf server: ```bash -$ hiperf -S b001::/64 +hiperf -S b001::/64 ``` Run hiperf client: ```bash -$ hiperf -C b001::1 -W 300 -``` - -## License - -This software is distributed under the following license: - -``` -Copyright (c) 2017-2019 Cisco and/or its affiliates. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at: - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +hiperf -C b001::1 -W 300 ``` diff --git a/docs/source/vpp-plugin.md b/docs/source/vpp-plugin.md index ed9d122d6..772f9a730 100644 --- a/docs/source/vpp-plugin.md +++ b/docs/source/vpp-plugin.md @@ -1,21 +1,21 @@ -VPP plugin -============================== +# VPP plugin The hICN-plugin forwarder -## Introduction ## +## Introduction A high-performance Hybrid ICN forwarder as a plugin to VPP. The plugin provides the following functionalities: - - Fast packet processing - - Interest aggregation - - Content caching - - Forwarding strategies +- Fast packet processing +- Interest aggregation +- Content caching +- Forwarding strategies -## Quick Start ## -``` +## Quick Start + +```bash From the code tree root (VPP installed with DEB pkg) @@ -47,42 +47,32 @@ CMAKE variables: - VPP_HOME -- set the directory containing the include and lib directories of vpp. ``` -## Using hICN plugin ## - -### Platforms ### - -hICN-plugin has been tested in: +## Using hICN plugin -- Ubuntu 16.04 LTS (x86_64) -- Ubuntu 18.04 LTS (x86_64) -- Debian Stable/Testing -- Red Hat Enterprise Linux 7 -- CentOS 7 - - -### Dependencies ### +### Dependencies Build dependencies: -- VPP 19.08 - - DEB packages (can be found https://packagecloud.io/fdio/release/install): +- VPP 20.01 + - DEB packages (can be found <https://packagecloud.io/fdio/release/install):> - vpp - libvppinfra-dev - vpp-dev Runtime dependencies: -- VPP 19.08 - - DEB packages (can be found https://packagecloud.io/fdio/release/install): +- VPP 20.01 + - DEB packages (can be found <https://packagecloud.io/fdio/release/install):> - vpp - vpp-plugin-core - vpp-plugin-dpdk (only to use DPDK compatible nics) Hardware support (not mandatory): -- [DPDK](http://DPDK.org/) compatible nics +- [DPDK](http://DPDK.org/) compatible NICs + +## Getting started -## Getting started ## In order to start, the hICN plugin requires a running instance of VPP The steps required to successfully start hICN are: @@ -94,33 +84,36 @@ The steps required to successfully start hICN are: Detailed information for configuring VPP can be found at [https://wiki.fd.io/view/VPP](https://wiki.fd.io/view/VPP). -### Setup the host for VPP ### +### Setup the host for VPP Hugepages must be enabled in the system -``` -$ sudo sysctl -w vm.nr_hugepages=1024 +```shell +sudo sysctl -w vm.nr_hugepages=1024 ``` In order to use a DPDK interface, the `uio` and `uio_pci_generic` or `vfio_pci` modules need to be loaded in the kernel -``` -$ sudo modprobe uio -$ sudo modprobe uio_pci_generic -$ sudo modprobe vfio_pci +```shell +sudo modprobe uio +sudo modprobe uio_pci_generic +sudo modprobe vfio_pci ``` If the DPDK interface we want to assign to VPP is up, we must bring it down +```bash +sudo ifconfig <interface_name> down ``` -$ sudo ifconfig <interface_name> down -``` + or + +```bash +sudo ip link set <interface_name> down ``` -$ sudo ip link set <interface_name> down -``` -### Configure VPP ### +### Configure VPP + The file /etc/VPP/startup.conf contains a set of parameters to setup VPP at startup. The following example sets up VPP to use a DPDK interfaces: @@ -156,9 +149,10 @@ plugins { # plugin acl_plugin.so { disable } } ``` + Where `0000:08:00.0` must be replaced with the actual PCI address of the DPDK interface -### Start VPP ### +### Start VPP VPP can be started as a process or a service: @@ -171,10 +165,11 @@ $ sudo vpp -c /etc/vpp/startup.conf ``` -### Configure hICN plugin ### +### Configure hICN plugin + The hICN plugin can be configured either using the VPP command-line interface (CLI), through a configuration file or through the VPP binary api -#### hICN plugin CLI #### +#### hICN plugin CLI The CLI commands for the hICN plugin start all with the hicn keyword. To see the full list of command available type: @@ -185,9 +180,9 @@ vpp# hicn ? `hicn control param`: configures the internal parameter of the hICN plugin. This command must be run before hicn control start. -``` +```shell hicn control param { pit { size <entries> | { dfltlife | minlife | maxlife } <seconds> } | cs {size <entries> | app <portion to reserved to app>} } - <entries> :set the maximum number of entry in the PIT or CS. Default for PIT is 131072, for CS is 4096. CS size cannot be grater than PIT size. Moreover CS size must be smaller than (# of vlib buffer - 8196). + <entries> :set the maximum number of entry in the PIT or CS. Default for PIT is 131072, for CS is 4096. CS size cannot be grater than PIT size. Moreover CS size must be smaller than (# of vlib buffer - 8196). <seconds> :set the default, maximum or minimum lifetime of pit entries. Default value 2s (default), 0.2s (minumum), 20s (maximum) <portion to reserved to app> :set the portion of CS to reserve to application running locally on the forwarder. Default is 30% of the cs size. ``` @@ -198,7 +193,7 @@ hicn control param { pit { size <entries> | { dfltlife | minlife | maxlife } <se `hicn face app` : manipulates producer and consumer application faces in the forwarder. -``` +```shell hicn face app {add intfc <sw_if> {prod prefix <hicn_prefix> cs_size <size_in_packets>} {cons}} | {del <face_id>} <sw_if> :software interface existing in vpp on top of which to create an application face <hicn_prefix> :prefix to bound to the producer application face. Only content matching the prefix will be allowed through such face. @@ -209,7 +204,7 @@ hicn face app {add intfc <sw_if> {prod prefix <hicn_prefix> cs_size <size_in_pac `hicn face ip`: manipulates ip application faces in the forwarder. -``` +```shell hicn face ip {add [local <src_address>] remote <dst_address> intfc <sw_if>} | {del id <face_id>} <src_address> :the IPv4 or IPv6 local IP address to bind to (not mandatory, if not specified the local address is one of the address assigned to sw_if) <dst_address> :the IPv4 or IPv6 address of the remote system @@ -217,10 +212,9 @@ hicn face ip {add [local <src_address>] remote <dst_address> intfc <sw_if>} | {d <face_id> :id of the face to remove ``` - `hicn face show`: list the available faces in the forwarder. -``` +```shell hicn face show [<face_id>| type <ip/udp>] <face_id> :face id of which we want to display the informations <ip/udp> :shows all the ip or udp faces available @@ -228,7 +222,7 @@ hicn face show [<face_id>| type <ip/udp>] `hicn face udp`: manipulates udp application faces in the forwarder. -``` +```shell hicn face udp {add src_addr <src_address> port <src_port > dst_addr <dst_address> port <dst_port>} intfc <sw_if> | {del id <face_id>} <src_address> :the IPv4 or IPv6 local IP address to bind to <src_port> :the local UDP port @@ -241,7 +235,7 @@ hicn face udp {add src_addr <src_address> port <src_port > dst_addr <dst_address `hicn fib`: manipulates hicn fib entries. -``` +```shell hicn fib {{add | delete } prefix <prefix> face <face_id> } | set strategy <strategy_id> prefix <prefix> <prefix> :prefix to add to the FIB <face_id> :face id to add as nexto hop in the FIB entry @@ -250,7 +244,7 @@ hicn fib {{add | delete } prefix <prefix> face <face_id> } | set strategy <strat `hicn pgen client`: set an vpp forwarder as an hicn packet generator client -``` +```shell hicn pgen client fwd <ip|hicn> src <addr> n_ifaces <n_ifaces> name <prefix> lifetime <interest-lifetime> intfc <data in-interface> max_seq <max sequence number> n_flows <number of flows> <ip|hicn> :set if the underlying forwarder is configured as ip or hicn <src_addr> :source address to use in the interests, i.e., the locator for routing the data packet back @@ -264,7 +258,7 @@ hicn pgen client fwd <ip|hicn> src <addr> n_ifaces <n_ifaces> name <prefix> life `hicn pgen server`: set an vpp forwarder as an hicn packet generator client -``` +```shell hicn pgen server fwd <ip|hicn> name <prefix> intfc <interest in-interface> size <payload_size> <ip|hicn> :set if the underlying forwarder is configured as ip or hicn <prefix> :prefix to use to reply to interest @@ -274,7 +268,7 @@ hicn pgen server fwd <ip|hicn> name <prefix> intfc <interest in-interface> size `hicn punting`: manipulates punting rules -``` +```shell hicn punting {add|delete} prefix <prefix> intfc <sw_if> {type ip | type <udp4|udp6> src_port <src_port> dst_port <dst_port>} <prefix> :prefix to punt to the hICN plugin <sw_if> :software interface where to apply the punting @@ -284,7 +278,8 @@ hicn punting {add|delete} prefix <prefix> intfc <sw_if> {type ip | type <udp4|ud ``` `hicn show`: show forwarder information. -``` + +```shell hicn show [detail] [strategies] <detail> :shows additional details as pit,cs entries allocation/deallocation <strategies> :shows only the available strategies int he forwarder @@ -292,18 +287,18 @@ hicn show [detail] [strategies] `hicn strategy mw set`: set the weight for a face. -``` +```shell hicn strategy mw set prefix <prefix> face <face_id> weight <weight> <prefix> :prefix to which the strategy applies <face_id> :id of the face to set the weight <weight> :weight ``` -#### hICN plugin configuration file #### +#### hICN plugin configuration file A configuration can be use to setup the hicn plugin when vpp starts. The configuration file is made of a list of CLI commands. In order to set vpp to read the configuration file, the file /etc/vpp/startup.conf needs to be modified as follows: -``` +```shell unix { nodaemon log /tmp/vpp.log @@ -311,15 +306,16 @@ unix { startup-config <path to configuration file> } ``` -#### hICN plugin binary api #### + +#### hICN plugin binary API The binary api, or the vapi, can be used as well to configure the hicn plugin. For each cli command there is a corresponding message in the binary api. The list of messages is available in the file hicn.api (located in hicn/hicn-plugin/src/) -### Example: consumer and producer Ping ### +### Example: consumer and producer Ping In this example, we connect two vpp forwarders, A and B, each of them running the hicn plugin. On top of forwarder A we run the ping_client application, on top of forwarder B we run the ping_server application. Each application connects to the underlying forwarder through a memif-interface. The two forwarders are connected through a dpdk link. -#### Forwarder A #### +#### Forwarder A ```shell $ sudo vppctl @@ -331,7 +327,7 @@ vpp# hicn fib add prefix b002::1/64 face 0 vpp# hicn punting add prefix b002::1/64 intfc TenGigabitEtherneta/0/0 type ip ``` -#### Forwarder B #### +#### Forwarder B ```shell $ sudo vppctl @@ -344,30 +340,11 @@ vpp# hicn punting add prefix b002::1/64 intfc TenGigabitEtherneta/0/1 type ip Once the two forwarder are started, run the ping_server application on the host where the forwarder B is running ```shell -$ sudo ping_server -n b002::1 +sudo ping_server -n b002::1 ``` and the client on the host where forwarder B is running ```shell -$ sudo ping_client -n b002::1 -``` - -## License ## - -This software is distributed under the following license: - -``` -Copyright (c) 2017-2019 Cisco and/or its affiliates. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at: - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +sudo ping_client -n b002::1 ``` diff --git a/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c b/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c index f27eb3ba5..0a8e01f65 100644 --- a/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c +++ b/hicn-light/src/hicn/command_line/controller/hicnLightControl_main.c @@ -79,6 +79,7 @@ static int payloadLengthController[LAST_COMMAND_VALUE] = { sizeof(mapme_activator_command), sizeof(mapme_timing_command), sizeof(mapme_timing_command), + sizeof(mapme_send_update_command), sizeof(connection_set_admin_state_command), #ifdef WITH_POLICY sizeof(add_policy_command), diff --git a/hicn-light/src/hicn/config/configuration.c b/hicn-light/src/hicn/config/configuration.c index 509ad10ba..a5be84f32 100644 --- a/hicn-light/src/hicn/config/configuration.c +++ b/hicn-light/src/hicn/config/configuration.c @@ -1094,10 +1094,7 @@ struct iovec *configuration_MapMeEnable(Configuration *config, parcMemory_Deallocate((void **)&result); parcBufferComposer_Release(&composer); - struct iovec *response = - utils_CreateAck(header, control, sizeof(mapme_activator_command)); - - return response; + return utils_CreateAck(header, control, sizeof(mapme_timing_command)); } struct iovec *configuration_MapMeDiscovery(Configuration *config, @@ -1118,10 +1115,7 @@ struct iovec *configuration_MapMeDiscovery(Configuration *config, parcMemory_Deallocate((void **)&result); parcBufferComposer_Release(&composer); - struct iovec *response = - utils_CreateAck(header, control, sizeof(mapme_activator_command)); - - return response; + return utils_CreateAck(header, control, sizeof(mapme_timing_command)); } struct iovec *configuration_MapMeTimescale(Configuration *config, @@ -1141,10 +1135,7 @@ struct iovec *configuration_MapMeTimescale(Configuration *config, parcMemory_Deallocate((void **)&result); parcBufferComposer_Release(&composer); - struct iovec *response = - utils_CreateAck(header, control, sizeof(mapme_timing_command)); - - return response; + return utils_CreateAck(header, control, sizeof(mapme_timing_command)); } struct iovec *configuration_MapMeRetx(Configuration *config, @@ -1164,12 +1155,44 @@ struct iovec *configuration_MapMeRetx(Configuration *config, parcMemory_Deallocate((void **)&result); parcBufferComposer_Release(&composer); - struct iovec *response = - utils_CreateAck(header, control, sizeof(mapme_timing_command)); + return utils_CreateAck(header, control, sizeof(mapme_timing_command)); +} - return response; +struct iovec * configuration_MapMeSendUpdate(Configuration *config, + struct iovec *request, unsigned ingressId) { + header_control_message *header = request[0].iov_base; + mapme_send_update_command *control = request[1].iov_base; + + FIB * fib = forwarder_getFib(config->forwarder); + if (!fib) + goto ERR; + Name *prefix = name_CreateFromAddress(control->addressType, control->address, + control->len); + if (!prefix) + goto ERR; + FibEntry *entry = fib_Contains(fib, prefix); + name_Release(&prefix); + if (!entry) + goto ERR; + + const NumberSet * nexthops = fibEntry_GetNexthops(entry); + unsigned size = (unsigned) numberSet_Length(nexthops); + + /* The command is accepted iif triggered by (one of) the producer of this prefix */ + for (unsigned i = 0; i < size; i++) { + unsigned nhop = numberSet_GetItem(nexthops, i); + if (nhop == ingressId) { + MapMe * mapme = forwarder_getMapmeInstance(config->forwarder); + mapme_send_updates(mapme, entry, nexthops); + return utils_CreateAck(header, control, sizeof(mapme_timing_command)); + } + } + +ERR: + return utils_CreateNack(header, control, sizeof(connection_set_admin_state_command)); } + struct iovec *configuration_ConnectionSetAdminState(Configuration *config, struct iovec *request) { header_control_message *header = request[0].iov_base; @@ -1418,6 +1441,10 @@ struct iovec *configuration_DispatchCommand(Configuration *config, response = configuration_MapMeRetx(config, control); break; + case MAPME_SEND_UPDATE: + response = configuration_MapMeSendUpdate(config, control, ingressId); + break; + case CONNECTION_SET_ADMIN_STATE: response = configuration_ConnectionSetAdminState(config, control); break; diff --git a/hicn-light/src/hicn/core/mapme.c b/hicn-light/src/hicn/core/mapme.c index 93a01bb0d..a22d01ae7 100644 --- a/hicn-light/src/hicn/core/mapme.c +++ b/hicn-light/src/hicn/core/mapme.c @@ -482,14 +482,6 @@ static bool mapme_hasLocalNextHops(const MapMe *mapme, void mapme_send_updates(const MapMe * mapme, FibEntry * fibEntry, const NumberSet * nexthops) { - /* Detect change */ - NumberSet * previous_nexthops = fibEntry_GetPreviousNextHops(fibEntry); - if (numberSet_Equals(nexthops, previous_nexthops)) { - INFO(mapme, "[MAP-Me] No change in nexthops"); - return; - } - fibEntry_SetPreviousNextHops(fibEntry, nexthops); - if (!TFIB(fibEntry)) /* Create TFIB associated to FIB entry */ mapme_CreateTFIB(fibEntry); TFIB(fibEntry)->seq++; @@ -507,6 +499,21 @@ mapme_send_updates(const MapMe * mapme, FibEntry * fibEntry, const NumberSet * n free(name_str); } + +void +mapme_maybe_send_updates(const MapMe * mapme, FibEntry * fibEntry, const NumberSet * nexthops) +{ + /* Detect change */ + NumberSet * previous_nexthops = fibEntry_GetPreviousNextHops(fibEntry); + if (numberSet_Equals(nexthops, previous_nexthops)) { + INFO(mapme, "[MAP-Me] No change in nexthops"); + return; + } + fibEntry_SetPreviousNextHops(fibEntry, nexthops); + + mapme_send_updates(mapme, fibEntry, nexthops); +} + void mapme_reconsiderFibEntry(const MapMe *mapme, FibEntry * fibEntry) { diff --git a/hicn-light/src/hicn/core/mapme.h b/hicn-light/src/hicn/core/mapme.h index 503b22568..72f8d536a 100644 --- a/hicn-light/src/hicn/core/mapme.h +++ b/hicn-light/src/hicn/core/mapme.h @@ -70,7 +70,7 @@ void mapme_Process(const MapMe *mapme, const uint8_t *msgBuffer, /** * @function mapme_send_updates - * @abstract Trigger the update for specified FIB entry and nexthops + * @abstract Trigger (if needed) the update for specified FIB entry and nexthops * @param [in] mapme - Pointer to the MAP-Me data structure. * @param [in] fibEntry - The FIB entry to consider * @param [in] nexthops - NumberSet holding the next hops on which to send the @@ -79,6 +79,16 @@ void mapme_Process(const MapMe *mapme, const uint8_t *msgBuffer, void mapme_send_updates(const MapMe * mapme, FibEntry * fibEntry, const NumberSet * nexthops); /** + * @function mapme_send_updates + * @abstract Trigger the update for specified FIB entry and nexthops, only if needed + * @param [in] mapme - Pointer to the MAP-Me data structure. + * @param [in] fibEntry - The FIB entry to consider + * @param [in] nexthops - NumberSet holding the next hops on which to send the + * update. + */ +void mapme_maybe_send_updates(const MapMe * mapme, FibEntry * fibEntry, const NumberSet * nexthops); + +/** * @function mapme_reconsiderFibEntry * @abstract Process a fib entry for changes that might trigger new updates * @param [in] mapme - Pointer to the MAP-Me data structure. diff --git a/hicn-light/src/hicn/strategies/lowLatency.c b/hicn-light/src/hicn/strategies/lowLatency.c index 47de73538..61bffe243 100644 --- a/hicn-light/src/hicn/strategies/lowLatency.c +++ b/hicn-light/src/hicn/strategies/lowLatency.c @@ -192,7 +192,7 @@ static void strategyLowLatency_SendMapmeUpdate(StrategyLowLatency *ll, for(unsigned i = 0; i < ll->related_prefixes_len; i++){ FibEntry *fibEntry = fib_MatchName(fib, ll->related_prefixes[i]); if(fibEntry != NULL){ - mapme_send_updates(mapme, fibEntry, nexthops); + mapme_maybe_send_updates(mapme, fibEntry, nexthops); } } } diff --git a/hicn-light/src/hicn/utils/commands.h b/hicn-light/src/hicn/utils/commands.h index 1cce8edd3..d8e5329b3 100644 --- a/hicn-light/src/hicn/utils/commands.h +++ b/hicn-light/src/hicn/utils/commands.h @@ -70,6 +70,7 @@ typedef enum { MAPME_DISCOVERY, MAPME_TIMESCALE, MAPME_RETX, + MAPME_SEND_UPDATE, CONNECTION_SET_ADMIN_STATE, #ifdef WITH_POLICY ADD_POLICY, @@ -308,6 +309,12 @@ typedef struct { uint32_t timePeriod; } mapme_timing_command; +typedef struct { + ip_address_t address; + uint8_t addressType; + uint8_t len; +} mapme_send_update_command; + // SIZE=1 typedef struct { @@ -400,6 +407,8 @@ static inline int payloadLengthDaemon(command_id id) { return sizeof(mapme_timing_command); case MAPME_RETX: return sizeof(mapme_timing_command); + case MAPME_SEND_UPDATE: + return sizeof(mapme_send_update_command); case CONNECTION_SET_ADMIN_STATE: return sizeof(connection_set_admin_state_command); #ifdef WITH_POLICY diff --git a/libtransport/src/hicn/transport/core/manifest_format.h b/libtransport/src/hicn/transport/core/manifest_format.h index 451e3db6a..9b6777270 100644 --- a/libtransport/src/hicn/transport/core/manifest_format.h +++ b/libtransport/src/hicn/transport/core/manifest_format.h @@ -51,8 +51,18 @@ enum class HashAlgorithm : uint8_t { CRC32C = static_cast<uint8_t>(utils::CryptoHashType::CRC32C), }; +/** + * INCREMENTAL: Manifests will be received inline with the data with no specific + * assumption regarding the manifest capacity. Consumers can send interests + * using a +1 heuristic. + * + * MANIFEST_CAPACITY_BASED: manifests with capacity N have a suffix multiple of + * N+1: 0, N+1, 2(N+1) etc. Contents have a suffix incremented by 1 except when + * it conflicts with a manifest: 1, 2, ..., N, N+2, N+3, ..., 2N+1, 2N+3 + */ enum class NextSegmentCalculationStrategy : uint8_t { INCREMENTAL = 1, + MANIFEST_CAPACITY_BASED = 2, }; template <typename T> diff --git a/libtransport/src/hicn/transport/http/client_connection.cc b/libtransport/src/hicn/transport/http/client_connection.cc index 32269d49d..aa9cb0463 100644 --- a/libtransport/src/hicn/transport/http/client_connection.cc +++ b/libtransport/src/hicn/transport/http/client_connection.cc @@ -16,6 +16,8 @@ #include <hicn/transport/http/client_connection.h> #include <hicn/transport/utils/hash.h> +#include <fstream> + #define DEFAULT_BETA 0.99 #define DEFAULT_GAMMA 0.07 @@ -38,6 +40,12 @@ HTTPClientConnection::HTTPClientConnection() std::placeholders::_2)); consumer_.setSocketOption(ConsumerCallbacksOptions::READ_CALLBACK, this); + consumer_.setSocketOption( + ConsumerCallbacksOptions::VERIFICATION_FAILED, + (ConsumerContentObjectVerificationFailedCallback)std::bind( + &HTTPClientConnection::onSignatureVerificationFailed, this, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + consumer_.setSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, false); consumer_.connect(); std::shared_ptr<typename ConsumerSocket::Portal> portal; @@ -87,6 +95,10 @@ HTTPClientConnection::RC HTTPClientConnection::sendRequest( return return_code_; } +void HTTPClientConnection::verifyPacketSignature(bool verify) { + consumer_.setSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, verify); +} + void HTTPClientConnection::sendRequestGetReply( const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response, std::string &ipv6_first_word) { @@ -186,6 +198,12 @@ HTTPClientConnection &HTTPClientConnection::setCertificate( return *this; } +VerificationPolicy HTTPClientConnection::onSignatureVerificationFailed( + ConsumerSocket &consumer, const core::ContentObject &content_object, + std::error_code reason) { + return VerificationPolicy::ACCEPT_PACKET; +} + // Read buffer management void HTTPClientConnection::readBufferAvailable( std::unique_ptr<utils::MemBuf> &&buffer) noexcept { diff --git a/libtransport/src/hicn/transport/http/client_connection.h b/libtransport/src/hicn/transport/http/client_connection.h index 5bcf9c4c7..e001653ab 100644 --- a/libtransport/src/hicn/transport/http/client_connection.h +++ b/libtransport/src/hicn/transport/http/client_connection.h @@ -20,6 +20,7 @@ #include <hicn/transport/http/response.h> #include <hicn/transport/interfaces/socket_consumer.h> #include <hicn/transport/interfaces/socket_producer.h> +#include <hicn/transport/interfaces/verification_policy.h> #include <hicn/transport/utils/uri.h> #include <vector> @@ -68,6 +69,8 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback { HTTPClientConnection &setCertificate(const std::string &cert_path); + void verifyPacketSignature(bool verify); + private: void sendRequestGetReply(const HTTPRequest &request, std::shared_ptr<HTTPResponse> &response, @@ -80,6 +83,10 @@ class HTTPClientConnection : public ConsumerSocket::ReadCallback { const core::Interest &interest, std::string &payload); + VerificationPolicy onSignatureVerificationFailed( + ConsumerSocket &consumer, const core::ContentObject &content_object, + std::error_code reason); + // Read callback bool isBufferMovable() noexcept override { return true; } void getReadBuffer(uint8_t **application_buffer, diff --git a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt index 0c2c73623..1f3c29b1f 100644 --- a/libtransport/src/hicn/transport/interfaces/CMakeLists.txt +++ b/libtransport/src/hicn/transport/interfaces/CMakeLists.txt @@ -22,6 +22,7 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_default_values.h ${CMAKE_CURRENT_SOURCE_DIR}/socket_options_keys.h ${CMAKE_CURRENT_SOURCE_DIR}/callbacks.h + ${CMAKE_CURRENT_SOURCE_DIR}/verification_policy.h ) list(APPEND SOURCE_FILES diff --git a/libtransport/src/hicn/transport/interfaces/callbacks.h b/libtransport/src/hicn/transport/interfaces/callbacks.h index 7194cca42..6de48d14b 100644 --- a/libtransport/src/hicn/transport/interfaces/callbacks.h +++ b/libtransport/src/hicn/transport/interfaces/callbacks.h @@ -15,11 +15,12 @@ #pragma once +#include <hicn/transport/core/facade.h> +#include <hicn/transport/interfaces/verification_policy.h> + #include <functional> #include <system_error> -#include <hicn/transport/core/facade.h> - namespace utils { class MemBuf; } @@ -85,6 +86,16 @@ using ConsumerContentObjectVerificationCallback = std::function<bool(ConsumerSocket &, const core::ContentObject &)>; /** + * The ConsumerContentObjectVerificationFailedCallback will be caled by the + * transport if a data packet (either manifest or content object) cannot be + * verified. The application here decides what to do by returning a + * VerificationFailedPolicy object. + */ +using ConsumerContentObjectVerificationFailedCallback = + std::function<VerificationPolicy( + ConsumerSocket &, const core::ContentObject &, std::error_code ec)>; + +/** * The ConsumerManifestCallback will be called by the consumer socket when a * manifest is received. */ diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc index fbe4bed1a..fba972fe5 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.cc @@ -417,6 +417,28 @@ int ConsumerSocket::setSocketOption( }); } +int ConsumerSocket::setSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) { + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + verification_failed_callback_ = socket_option_value; + break; + + default: + return SOCKET_OPTION_NOT_SET; + } + + return SOCKET_OPTION_SET; + }); +} + int ConsumerSocket::setSocketOption(int socket_option_key, IcnObserver *socket_option_value) { utils::SpinLock::Acquire locked(guard_raaqm_params_); @@ -712,6 +734,29 @@ int ConsumerSocket::getSocketOption( } int ConsumerSocket::getSocketOption( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) { + // Reschedule the function on the io_service to avoid race condition in case + // setSocketOption is called while the io_service is running. + return rescheduleOnIOService( + socket_option_key, socket_option_value, + [this]( + int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value) + -> int { + switch (socket_option_key) { + case ConsumerCallbacksOptions::VERIFICATION_FAILED: + *socket_option_value = &verification_failed_callback_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; + }); +} + +int ConsumerSocket::getSocketOption( int socket_option_key, std::shared_ptr<Portal> &socket_option_value) { switch (socket_option_key) { case PORTAL: @@ -767,6 +812,19 @@ int ConsumerSocket::getSocketOption(int socket_option_key, return SOCKET_OPTION_GET; } +int ConsumerSocket::getSocketOption(int socket_option_key, + TransportStatistics **socket_option_value) { + switch (socket_option_key) { + case OtherOptions::STATISTICS: + *socket_option_value = &stats_; + break; + default: + return SOCKET_OPTION_NOT_GET; + } + + return SOCKET_OPTION_GET; +} + int ConsumerSocket::getSocketOption( int socket_option_key, ConsumerTimerCallback **socket_option_value) { // Reschedule the function on the io_service to avoid race condition in case @@ -789,4 +847,4 @@ int ConsumerSocket::getSocketOption( } // namespace interface -} // end namespace transport +} // namespace transport diff --git a/libtransport/src/hicn/transport/interfaces/socket_consumer.h b/libtransport/src/hicn/transport/interfaces/socket_consumer.h index 0f83fd38f..acce28c1d 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_consumer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_consumer.h @@ -18,6 +18,7 @@ #include <hicn/transport/interfaces/socket.h> #include <hicn/transport/interfaces/socket_options_default_values.h> #include <hicn/transport/protocols/protocol.h> +#include <hicn/transport/protocols/statistics.h> #include <hicn/transport/utils/event_thread.h> #include <hicn/transport/utils/verifier.h> @@ -224,6 +225,10 @@ class ConsumerSocket : public BaseSocket { virtual int setSocketOption( int socket_option_key, + ConsumerContentObjectVerificationFailedCallback socket_option_value); + + virtual int setSocketOption( + int socket_option_key, ConsumerContentObjectVerificationCallback socket_option_value); virtual int setSocketOption(int socket_option_key, @@ -262,6 +267,10 @@ class ConsumerSocket : public BaseSocket { virtual int getSocketOption( int socket_option_key, + ConsumerContentObjectVerificationFailedCallback **socket_option_value); + + virtual int getSocketOption( + int socket_option_key, ConsumerContentObjectVerificationCallback **socket_option_value); virtual int getSocketOption(int socket_option_key, @@ -286,6 +295,9 @@ class ConsumerSocket : public BaseSocket { virtual int getSocketOption(int socket_option_key, ConsumerTimerCallback **socket_option_value); + virtual int getSocketOption(int socket_option_key, + TransportStatistics **socket_option_value); + protected: // If the thread calling lambda_func is not the same of io_service, this // function reschedule the function on it @@ -364,6 +376,7 @@ class ConsumerSocket : public BaseSocket { ConsumerContentObjectCallback on_content_object_; ConsumerManifestCallback on_manifest_; ConsumerTimerCallback stats_summary_; + ConsumerContentObjectVerificationFailedCallback verification_failed_callback_; ReadCallback *read_callback_; @@ -375,6 +388,9 @@ class ConsumerSocket : public BaseSocket { // Transport protocol std::unique_ptr<TransportProtocol> transport_protocol_; + // Statistic + TransportStatistics stats_; + utils::SpinLock guard_raaqm_params_; }; diff --git a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h index e14f0f412..b25bacbb9 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_options_keys.h +++ b/libtransport/src/hicn/transport/interfaces/socket_options_keys.h @@ -77,8 +77,9 @@ typedef enum { CONTENT_OBJECT_INPUT = 411, MANIFEST_INPUT = 412, CONTENT_OBJECT_TO_VERIFY = 413, - READ_CALLBACK = 414, - STATS_SUMMARY = 415 + VERIFICATION_FAILED = 414, + READ_CALLBACK = 415, + STATS_SUMMARY = 416 } ConsumerCallbacksOptions; typedef enum { @@ -96,7 +97,11 @@ typedef enum { typedef enum { OUTPUT_INTERFACE = 601 } DataLinkOptions; -typedef enum { VIRTUAL_DOWNLOAD = 701, USE_CFG_FILE = 702 } OtherOptions; +typedef enum { + VIRTUAL_DOWNLOAD = 701, + USE_CFG_FILE = 702, + STATISTICS +} OtherOptions; typedef enum { SHA_256 = 801, diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.cc b/libtransport/src/hicn/transport/interfaces/socket_producer.cc index 6782000ac..4fef5d1e2 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.cc +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.cc @@ -38,8 +38,7 @@ ProducerSocket::ProducerSocket(asio::io_service &io_service) registration_status_(REGISTRATION_NOT_ATTEMPTED), making_manifest_(false), hash_algorithm_(HashAlgorithm::SHA_256), - suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), - suffix_content_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), + suffix_strategy_(core::NextSegmentCalculationStrategy::INCREMENTAL), on_interest_input_(VOID_HANDLER), on_interest_dropped_input_buffer_(VOID_HANDLER), on_interest_inserted_input_buffer_(VOID_HANDLER), @@ -159,8 +158,8 @@ uint32_t ProducerSocket::produce(Name content_name, uint32_t content_object_expiry_time = content_object_expiry_time_; HashAlgorithm hash_algo = hash_algorithm_; bool making_manifest = making_manifest_; - utils::SuffixContent suffix_content = suffix_content_; - utils::SuffixManifest suffix_manifest = suffix_manifest_; + auto suffix_strategy = utils::SuffixStrategyFactory::getSuffixStrategy( + suffix_strategy_, start_offset); std::shared_ptr<utils::Identity> identity; getSocketOption(GeneralTransportOptions::IDENTITY, identity); @@ -169,19 +168,16 @@ uint32_t ProducerSocket::produce(Name content_name, std::size_t header_size; std::size_t manifest_header_size = 0; std::size_t signature_length = 0; - std::uint32_t final_block_number = 0; + std::uint32_t final_block_number = start_offset; uint64_t free_space_for_content = 0; core::Packet::Format format; std::shared_ptr<ContentObjectManifest> manifest; bool is_last_manifest = false; - suffix_content.updateSuffix(start_offset); - suffix_content.setUsingManifest(making_manifest); // TODO Manifest may still be used for indexing if (making_manifest && !identity) { - throw errors::RuntimeException( - "Making manifests without setting producer identity. Aborting."); + TRANSPORT_LOGD("Making manifests without setting producer identity."); } core::Packet::Format hf_format = core::Packet::Format::HF_UNSPEC; @@ -200,9 +196,9 @@ uint32_t ProducerSocket::produce(Name content_name, format = hf_format; if (making_manifest) { - format = hf_format; manifest_header_size = core::Packet::getHeaderSizeFromFormat( - hf_format_ah, identity->getSignatureLength()); + identity ? hf_format_ah : hf_format, + identity ? identity->getSignatureLength() : 0); } else if (identity) { format = hf_format_ah; signature_length = identity->getSignatureLength(); @@ -225,28 +221,20 @@ uint32_t ProducerSocket::produce(Name content_name, 1.0); uint32_t number_of_manifests = static_cast<uint32_t>( std::ceil(float(number_of_segments) / segment_in_manifest)); - final_block_number = number_of_segments + number_of_manifests - 1; - - suffix_manifest.updateSuffix(start_offset); - suffix_manifest.setNbSegments(segment_in_manifest); - suffix_content.updateSuffix(start_offset + 1); - suffix_content.setNbSegments(segment_in_manifest); + final_block_number += number_of_segments + number_of_manifests - 1; manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_manifest.getSuffix()), + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, - hash_algo, is_last_manifest, content_name, - core::NextSegmentCalculationStrategy::INCREMENTAL, - identity->getSignatureLength())); + hash_algo, is_last_manifest, content_name, suffix_strategy_, + identity ? identity->getSignatureLength() : 0)); manifest->setLifetime(content_object_expiry_time); - suffix_manifest++; if (is_last) { manifest->setFinalBlockNumber(final_block_number); } else { - manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max()); + manifest->setFinalBlockNumber(utils::SuffixStrategy::INVALID_SUFFIX); } - } for (unsigned int packaged_segments = 0; @@ -256,7 +244,12 @@ uint32_t ProducerSocket::produce(Name content_name, data_packet_size - manifest_header_size) { // Send the current manifest manifest->encode(); - identity->getSigner().sign(*manifest); + + // If identity set, sign manifest + if (identity) { + identity->getSigner().sign(*manifest); + } + passContentObjectToCallbacks(manifest); // Send content objects stored in the queue @@ -269,25 +262,22 @@ uint32_t ProducerSocket::produce(Name content_name, // acquired in the passContentObjectToCallbacks function, so we can // safely release this reference manifest.reset(ContentObjectManifest::createManifest( - content_name.setSuffix(suffix_manifest.getSuffix()), + content_name.setSuffix(suffix_strategy->getNextManifestSuffix()), core::ManifestVersion::VERSION_1, core::ManifestType::INLINE_MANIFEST, hash_algo, is_last_manifest, - content_name, core::NextSegmentCalculationStrategy::INCREMENTAL, - identity->getSignatureLength())); - manifest->setLifetime(content_object_expiry_time); + content_name, suffix_strategy_, + identity ? identity->getSignatureLength() : 0)); - if (is_last) { - manifest->setFinalBlockNumber(final_block_number); - } else { - manifest->setFinalBlockNumber(std::numeric_limits<uint32_t>::max()); - } - - suffix_manifest++; + manifest->setLifetime(content_object_expiry_time); + manifest->setFinalBlockNumber( + is_last ? final_block_number + : utils::SuffixStrategy::INVALID_SUFFIX); } } + auto content_suffix = suffix_strategy->getNextContentSuffix(); auto content_object = std::make_shared<ContentObject>( - content_name.setSuffix(suffix_content.getSuffix()), format); + content_name.setSuffix(content_suffix), format); content_object->setLifetime(content_object_expiry_time); auto b = buffer->cloneOne(); @@ -314,7 +304,7 @@ uint32_t ProducerSocket::produce(Name content_name, if (making_manifest) { using namespace std::chrono_literals; utils::CryptoHash hash = content_object->computeDigest(hash_algo); - manifest->addSuffixHash(suffix_content.getSuffix(), hash); + manifest->addSuffixHash(content_suffix, hash); content_queue_.push(content_object); } else { if (identity) { @@ -322,8 +312,6 @@ uint32_t ProducerSocket::produce(Name content_name, } passContentObjectToCallbacks(content_object); } - - suffix_content++; } if (making_manifest) { @@ -332,7 +320,10 @@ uint32_t ProducerSocket::produce(Name content_name, } manifest->encode(); - identity->getSigner().sign(*manifest); + if (identity) { + identity->getSigner().sign(*manifest); + } + passContentObjectToCallbacks(manifest); while (!content_queue_.empty()) { passContentObjectToCallbacks(content_queue_.front()); @@ -347,7 +338,7 @@ uint32_t ProducerSocket::produce(Name content_name, }); } - return suffix_content.getSuffix() - start_offset; + return suffix_strategy->getTotalCount(); } void ProducerSocket::asyncProduce(ContentObject &content_object) { diff --git a/libtransport/src/hicn/transport/interfaces/socket_producer.h b/libtransport/src/hicn/transport/interfaces/socket_producer.h index 83d0f73f3..ff6f49723 100644 --- a/libtransport/src/hicn/transport/interfaces/socket_producer.h +++ b/libtransport/src/hicn/transport/interfaces/socket_producer.h @@ -193,8 +193,7 @@ class ProducerSocket : public Socket<BasePortal>, std::atomic<utils::CryptoSuite> crypto_suite_; utils::SpinLock identity_lock_; std::shared_ptr<utils::Identity> identity_; - utils::SuffixManifest suffix_manifest_; - utils::SuffixContent suffix_content_; + core::NextSegmentCalculationStrategy suffix_strategy_; // While manifests are being built, contents are stored in a queue std::queue<std::shared_ptr<ContentObject>> content_queue_; diff --git a/libtransport/src/hicn/transport/interfaces/verification_policy.h b/libtransport/src/hicn/transport/interfaces/verification_policy.h new file mode 100644 index 000000000..cb5140ac1 --- /dev/null +++ b/libtransport/src/hicn/transport/interfaces/verification_policy.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> + +namespace transport { +namespace interface { + +/** + * This policy allows the application to tell the transport what to do in case + * the verification of a content object fails. + */ +enum class VerificationPolicy : std::uint8_t { + DROP_PACKET, + ACCEPT_PACKET, + ABORT_SESSION +}; +} // namespace interface +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/CMakeLists.txt b/libtransport/src/hicn/transport/protocols/CMakeLists.txt index 23aeca9bf..06515e0e2 100644 --- a/libtransport/src/hicn/transport/protocols/CMakeLists.txt +++ b/libtransport/src/hicn/transport/protocols/CMakeLists.txt @@ -14,8 +14,12 @@ cmake_minimum_required(VERSION 3.5 FATAL_ERROR) list(APPEND HEADER_FILES - ${CMAKE_CURRENT_SOURCE_DIR}/indexing_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.h + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.h ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.h + ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.h ${CMAKE_CURRENT_SOURCE_DIR}/congestion_window_protocol.h ${CMAKE_CURRENT_SOURCE_DIR}/packet_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/statistics.h @@ -27,11 +31,18 @@ list(APPEND HEADER_FILES ${CMAKE_CURRENT_SOURCE_DIR}/cbr.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc.h ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.h - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/errors.h + ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/data_processing_events.h ) list(APPEND SOURCE_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/incremental_indexer.cc + ${CMAKE_CURRENT_SOURCE_DIR}/manifest_incremental_indexer.cc ${CMAKE_CURRENT_SOURCE_DIR}/reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/datagram_reassembly.cc + ${CMAKE_CURRENT_SOURCE_DIR}/byte_stream_reassembly.cc ${CMAKE_CURRENT_SOURCE_DIR}/protocol.cc ${CMAKE_CURRENT_SOURCE_DIR}/raaqm.cc ${CMAKE_CURRENT_SOURCE_DIR}/rate_estimation.cc @@ -39,7 +50,8 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/cbr.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc.cc ${CMAKE_CURRENT_SOURCE_DIR}/rtc_data_path.cc - ${CMAKE_CURRENT_SOURCE_DIR}/manifest_indexing_manager.cc + ${CMAKE_CURRENT_SOURCE_DIR}/errors.cc + ${CMAKE_CURRENT_SOURCE_DIR}/verification_manager.cc ) set(RAAQM_CONFIG_INSTALL_PREFIX diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc new file mode 100644 index 000000000..2f1e5d8fd --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.cc @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/byte_stream_reassembly.h> + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/errors.h> +#include <hicn/transport/protocols/indexer.h> +#include <hicn/transport/utils/array.h> +#include <hicn/transport/utils/membuf.h> + +namespace transport { + +namespace protocol { + +ByteStreamReassembly::ByteStreamReassembly( + interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : Reassembly(icn_socket, transport_protocol), + index_(IndexManager::invalid_index), + download_complete_(false) {} + +void ByteStreamReassembly::reassemble( + std::unique_ptr<ContentObjectManifest> &&manifest) { + if (TRANSPORT_EXPECT_TRUE(manifest != nullptr)) { + received_packets_.emplace( + std::make_pair(manifest->getName().getSuffix(), nullptr)); + assembleContent(); + } +} + +void ByteStreamReassembly::reassemble(ContentObject::Ptr &&content_object) { + if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { + received_packets_.emplace(std::make_pair( + content_object->getName().getSuffix(), std::move(content_object))); + assembleContent(); + } +} + +void ByteStreamReassembly::assembleContent() { + if (TRANSPORT_EXPECT_FALSE(index_ == IndexManager::invalid_index)) { + index_ = index_manager_->getNextReassemblySegment(); + if (index_ == IndexManager::invalid_index) { + return; + } + } + + auto it = received_packets_.find((const unsigned int)index_); + while (it != received_packets_.end()) { + // Check if valid packet + if (it->second) { + copyContent(*it->second); + } + + received_packets_.erase(it); + index_ = index_manager_->getNextReassemblySegment(); + it = received_packets_.find((const unsigned int)index_); + } + + if (!download_complete_ && index_ != IndexManager::invalid_index) { + transport_protocol_->onReassemblyFailed(index_); + } +} + +void ByteStreamReassembly::copyContent(const ContentObject &content_object) { + auto a = content_object.getPayload(); + auto payload_length = a->length(); + auto write_size = std::min(payload_length, read_buffer_->tailroom()); + auto additional_bytes = payload_length > read_buffer_->tailroom() + ? payload_length - read_buffer_->tailroom() + : 0; + + std::memcpy(read_buffer_->writableTail(), a->data(), write_size); + read_buffer_->append(write_size); + + if (!read_buffer_->tailroom()) { + notifyApplication(); + std::memcpy(read_buffer_->writableTail(), a->data() + write_size, + additional_bytes); + read_buffer_->append(additional_bytes); + } + + download_complete_ = + index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); + + if (TRANSPORT_EXPECT_FALSE(download_complete_)) { + notifyApplication(); + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::success)); + } +} + +void ByteStreamReassembly::reInitialize() { + index_ = IndexManager::invalid_index; + download_complete_ = false; + + received_packets_.clear(); + + // reset read buffer + interface::ConsumerSocket::ReadCallback *read_callback; + reassembly_consumer_socket_->getSocketOption( + interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); + + read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h new file mode 100644 index 000000000..7c77d486f --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/byte_stream_reassembly.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +class ByteStreamReassembly : public Reassembly { + public: + ByteStreamReassembly(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + protected: + virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) override; + + virtual void copyContent(const core::ContentObject &content_object); + + virtual void reInitialize() override; + + private: + void assembleContent(); + + protected: + // The consumer socket + // std::unique_ptr<IncrementalIndexManager> incremental_index_manager_; + // std::unique_ptr<ManifestIndexManager> manifest_index_manager_; + // IndexVerificationManager *index_manager_; + std::unordered_map<std::uint32_t, core::ContentObject::Ptr> received_packets_; + uint32_t index_; + bool download_complete_; +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/data_processing_events.h b/libtransport/src/hicn/transport/protocols/data_processing_events.h new file mode 100644 index 000000000..8975c2b4a --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/data_processing_events.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> + +namespace transport { +namespace protocol { + +class ContentObjectProcessingEventCallback { + public: + virtual ~ContentObjectProcessingEventCallback() = default; + virtual void onPacketDropped(core::Interest::Ptr &&i, + core::ContentObject::Ptr &&c) = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; +}; + +} // namespace protocol +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc new file mode 100644 index 000000000..7b01ad4bc --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/datagram_reassembly.h> + +namespace transport { + +namespace protocol { + +DatagramReassembly::DatagramReassembly(interface::ConsumerSocket* icn_socket, + TransportProtocol* transport_protocol) + : Reassembly(icn_socket, transport_protocol) {} + +void DatagramReassembly::reassemble(core::ContentObject::Ptr&& content_object) { + read_buffer_ = content_object->getPayload(); + Reassembly::notifyApplication(); +} + +void DatagramReassembly::reInitialize() {} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/datagram_reassembly.h b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h new file mode 100644 index 000000000..923b6f2c1 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/datagram_reassembly.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/protocols/reassembly.h> + +namespace transport { + +namespace protocol { + +class DatagramReassembly : public Reassembly { + public: + DatagramReassembly(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol); + + virtual void reassemble(core::ContentObject::Ptr &&content_object) override; + virtual void reInitialize() override; + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) override { + return; + } +}; + +} // namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/errors.cc b/libtransport/src/hicn/transport/protocols/errors.cc new file mode 100644 index 000000000..c2249ed4a --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/errors.cc @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/errors.h> + +namespace transport { +namespace protocol { + +const std::error_category& protocol_category() { + static protocol_category_impl instance; + + return instance; +} + +const char* protocol_category_impl::name() const throw() { + return "transport::protocol::error"; +} + +std::string protocol_category_impl::message(int ev) const { + switch (static_cast<protocol_error>(ev)) { + case protocol_error::success: { + return "Success"; + } + case protocol_error::signature_verification_failed: { + return "Signature verification failed."; + } + case protocol_error::integrity_verification_failed: { + return "Integrity verification failed"; + } + case protocol_error::no_verifier_provided: { + return "Transport cannot get any verifier for the given data."; + } + case protocol_error::io_error: { + return "Conectivity error between transport and local forwarder"; + } + case protocol_error::max_retransmissions_error: { + return "Transport protocol reached max number of retransmissions allowed " + "for the same interest."; + } + case protocol_error::session_aborted: { + return "The session has been aborted by the application."; + } + default: { return "Unknown protocol error"; } + } +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/errors.h b/libtransport/src/hicn/transport/protocols/errors.h new file mode 100644 index 000000000..cb3d3474e --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/errors.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> + +namespace transport { +namespace protocol { + +/** + * @brief Get the default server error category. + * @return The default server error category instance. + * + * @warning The first call to this function is thread-safe only starting with + * C++11. + */ +const std::error_category& protocol_category(); + +/** + * The list of errors. + */ +enum class protocol_error { + success = 0, + signature_verification_failed, + integrity_verification_failed, + no_verifier_provided, + io_error, + max_retransmissions_error, + session_aborted, +}; + +/** + * @brief Create an error_code instance for the given error. + * @param error The error. + * @return The error_code instance. + */ +inline std::error_code make_error_code(protocol_error error) { + return std::error_code(static_cast<int>(error), protocol_category()); +} + +/** + * @brief Create an error_condition instance for the given error. + * @param error The error. + * @return The error_condition instance. + */ +inline std::error_condition make_error_condition(protocol_error error) { + return std::error_condition(static_cast<int>(error), protocol_category()); +} + +/** + * @brief A server error category. + */ +class protocol_category_impl : public std::error_category { + public: + /** + * @brief Get the name of the category. + * @return The name of the category. + */ + virtual const char* name() const throw(); + + /** + * @brief Get the error message for a given error. + * @param ev The error numeric value. + * @return The message associated to the error. + */ + virtual std::string message(int ev) const; +}; +} // namespace protocol +} // namespace transport + +namespace std { +// namespace system { +template <> +struct is_error_code_enum<::transport::protocol::protocol_error> + : public std::true_type {}; +// } // namespace system +} // namespace std
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc new file mode 100644 index 000000000..5a8046daa --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.cc @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/incremental_indexer.h> + +#include <hicn/transport/interfaces/socket_consumer.h> + +namespace transport { +namespace protocol { + +void IncrementalIndexer::onContentObject( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + using namespace interface; + + if (TRANSPORT_EXPECT_FALSE(content_object->testRst())) { + final_suffix_ = content_object->getName().getSuffix(); + } + + auto ret = verification_manager_->onPacketToVerify(*content_object); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + reassembly_->reassemble(std::move(content_object)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(interest), + std::move(content_object)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/indexing_manager.h b/libtransport/src/hicn/transport/protocols/incremental_indexer.h index b6b8bb4a6..ea84d645a 100644 --- a/libtransport/src/hicn/transport/protocols/indexing_manager.h +++ b/libtransport/src/hicn/transport/protocols/incremental_indexer.h @@ -15,9 +15,11 @@ #pragma once +#include <hicn/transport/protocols/indexer.h> + #include <hicn/transport/errors/runtime_exception.h> #include <hicn/transport/errors/unexpected_manifest_exception.h> -#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/protocols/verification_manager.h> #include <hicn/transport/utils/literals.h> @@ -25,74 +27,59 @@ namespace transport { -namespace protocol { - -class IndexManager { - public: - static constexpr uint32_t invalid_index = ~0; - - /** - * - */ - virtual ~IndexManager() = default; - /** - * Retrieve from the manifest the next suffix to retrieve. - */ - virtual uint32_t getNextSuffix() = 0; - - virtual void setFirstSuffix(uint32_t suffix) = 0; - - /** - * Retrive the next segment to be reassembled. - */ - virtual uint32_t getNextReassemblySegment() = 0; - - virtual bool isFinalSuffixDiscovered() = 0; - - virtual uint32_t getFinalSuffix() = 0; - - virtual void reset() = 0; -}; +namespace interface { +class ConsumerSocket; +} -class IndexVerificationManager : public IndexManager { - public: - /** - * - */ - virtual ~IndexVerificationManager() = default; +namespace protocol { - /** - * The ownership of the ContentObjectManifest is moved - * from the caller to the VerificationManager - */ - virtual bool onManifest(core::ContentObject::Ptr &&content_object) = 0; +class Reassembly; +class TransportProtocol; - /** - * The content object must just be verified; the ownership is still of the - * caller. - */ - virtual bool onContentObject(const core::ContentObject &content_object) = 0; -}; - -class IncrementalIndexManager : public IndexVerificationManager { +class IncrementalIndexer : public Indexer { public: - IncrementalIndexManager(interface::ConsumerSocket *icn_socket) + IncrementalIndexer(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly) : socket_(icn_socket), + reassembly_(reassembly), + transport_protocol_(transport), final_suffix_(std::numeric_limits<uint32_t>::max()), + first_suffix_(0), next_download_suffix_(0), next_reassembly_suffix_(0), verification_manager_( - std::make_unique<SignatureVerificationManager>(icn_socket)) {} + std::make_unique<SignatureVerificationManager>(icn_socket)) { + if (reassembly_) { + reassembly_->setIndexer(this); + } + } + + IncrementalIndexer(const IncrementalIndexer &) = delete; + + IncrementalIndexer(IncrementalIndexer &&other) + : socket_(other.socket_), + reassembly_(other.reassembly_), + transport_protocol_(other.transport_protocol_), + final_suffix_(other.final_suffix_), + first_suffix_(other.first_suffix_), + next_download_suffix_(other.next_download_suffix_), + next_reassembly_suffix_(other.next_reassembly_suffix_), + verification_manager_(std::move(other.verification_manager_)) { + if (reassembly_) { + reassembly_->setIndexer(this); + } + } /** * */ - virtual ~IncrementalIndexManager() {} + virtual ~IncrementalIndexer() {} - TRANSPORT_ALWAYS_INLINE virtual void reset() override { + TRANSPORT_ALWAYS_INLINE virtual void reset( + std::uint32_t offset = 0) override { final_suffix_ = std::numeric_limits<uint32_t>::max(); - next_download_suffix_ = first_suffix_; - next_reassembly_suffix_ = 0; + next_download_suffix_ = offset; + next_reassembly_suffix_ = offset; } /** @@ -125,24 +112,21 @@ class IncrementalIndexManager : public IndexVerificationManager { return final_suffix_; } - TRANSPORT_ALWAYS_INLINE bool onManifest( - core::ContentObject::Ptr &&content_object) override { - throw errors::UnexpectedManifestException(); - } + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; - TRANSPORT_ALWAYS_INLINE bool onContentObject( - const core::ContentObject &content_object) override { - auto ret = verification_manager_->onPacketToVerify(content_object); + TRANSPORT_ALWAYS_INLINE void setReassembly(Reassembly *reassembly) { + reassembly_ = reassembly; - if (TRANSPORT_EXPECT_FALSE(content_object.testRst())) { - final_suffix_ = content_object.getName().getSuffix(); + if (reassembly_) { + reassembly_->setIndexer(this); } - - return ret; } protected: interface::ConsumerSocket *socket_; + Reassembly *reassembly_; + TransportProtocol *transport_protocol_; uint32_t final_suffix_; uint32_t first_suffix_; uint32_t next_download_suffix_; diff --git a/libtransport/src/hicn/transport/protocols/indexer.cc b/libtransport/src/hicn/transport/protocols/indexer.cc new file mode 100644 index 000000000..c50c4236b --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/indexer.cc @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/indexer.h> + +#include <hicn/transport/protocols/incremental_indexer.h> +#include <hicn/transport/protocols/manifest_incremental_indexer.h> +#include <hicn/transport/protocols/protocol.h> +#include <hicn/transport/utils/branch_prediction.h> + +namespace transport { +namespace protocol { + +IndexManager::IndexManager(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly) + : indexer_(std::make_unique<IncrementalIndexer>(icn_socket, transport, + reassembly)), + first_segment_received_(false), + icn_socket_(icn_socket), + transport_(transport), + reassembly_(reassembly) {} + +void IndexManager::onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) { + if (first_segment_received_) { + indexer_->onContentObject(std::move(interest), std::move(content_object)); + } else { + std::uint32_t segment_number = interest->getName().getSuffix(); + + if (segment_number == 0) { + // Check if manifest + if (content_object->getPayloadType() == PayloadType::MANIFEST) { + IncrementalIndexer *indexer = + static_cast<IncrementalIndexer *>(indexer_.release()); + indexer_ = + std::make_unique<ManifestIncrementalIndexer>(std::move(*indexer)); + delete indexer; + } + + indexer_->onContentObject(std::move(interest), std::move(content_object)); + auto it = interest_data_set_.begin(); + while (it != interest_data_set_.end()) { + indexer_->onContentObject(std::move(const_cast<core::Interest::Ptr &&>(it->first)), std::move(const_cast<core::ContentObject::Ptr &&>(it->second))); + it = interest_data_set_.erase(it); + } + + first_segment_received_ = true; + } else { + interest_data_set_.emplace(std::move(interest), std::move(content_object)); + } + } +} + +void IndexManager::reset(std::uint32_t offset) { + indexer_ = std::make_unique<IncrementalIndexer>(icn_socket_, transport_, + reassembly_); + first_segment_received_ = false; + interest_data_set_.clear(); +} + +} // namespace protocol +} // namespace transport
\ No newline at end of file diff --git a/libtransport/src/hicn/transport/protocols/indexer.h b/libtransport/src/hicn/transport/protocols/indexer.h new file mode 100644 index 000000000..89751095e --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/indexer.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/core/content_object.h> +#include <hicn/transport/core/interest.h> + +#include <set> + +namespace transport { + +namespace interface { +class ConsumerSocket; +} + +namespace protocol { + +class Reassembly; +class TransportProtocol; + +class Indexer { + public: + /** + * + */ + virtual ~Indexer() = default; + /** + * Retrieve from the manifest the next suffix to retrieve. + */ + virtual uint32_t getNextSuffix() = 0; + + virtual void setFirstSuffix(uint32_t suffix) = 0; + + /** + * Retrive the next segment to be reassembled. + */ + virtual uint32_t getNextReassemblySegment() = 0; + + virtual bool isFinalSuffixDiscovered() = 0; + + virtual uint32_t getFinalSuffix() = 0; + + virtual void reset(std::uint32_t offset = 0) = 0; + + virtual void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) = 0; +}; + +class IndexManager : Indexer { + public: + static constexpr uint32_t invalid_index = ~0; + + IndexManager(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly); + + uint32_t getNextSuffix() override { return indexer_->getNextSuffix(); } + + void setFirstSuffix(uint32_t suffix) override { + indexer_->setFirstSuffix(suffix); + } + + uint32_t getNextReassemblySegment() override { + return indexer_->getNextReassemblySegment(); + } + + bool isFinalSuffixDiscovered() override { + return indexer_->isFinalSuffixDiscovered(); + } + + uint32_t getFinalSuffix() override { return indexer_->getFinalSuffix(); } + + void reset(std::uint32_t offset = 0) override; + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + private: + std::unique_ptr<Indexer> indexer_; + bool first_segment_received_; + std::set<std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> + interest_data_set_; + interface::ConsumerSocket *icn_socket_; + TransportProtocol *transport_; + Reassembly *reassembly_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc new file mode 100644 index 000000000..592daa4d4 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.cc @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/interfaces/socket_consumer.h> +#include <hicn/transport/protocols/manifest_incremental_indexer.h> + +#include <cmath> +#include <deque> + +namespace transport { + +namespace protocol { + +using namespace interface; + +ManifestIncrementalIndexer::ManifestIncrementalIndexer( + interface::ConsumerSocket *icn_socket, TransportProtocol *transport, + Reassembly *reassembly) + : IncrementalIndexer(icn_socket, transport, reassembly), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + NextSegmentCalculationStrategy::INCREMENTAL, + next_download_suffix_, 0)) {} + +void ManifestIncrementalIndexer::onContentObject( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + // Check if mainfiest or not + if (content_object->getPayloadType() == PayloadType::MANIFEST) { + onUntrustedManifest(std::move(interest), std::move(content_object)); + } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + onUntrustedContentObject(std::move(interest), std::move(content_object)); + } +} + +void ManifestIncrementalIndexer::onUntrustedManifest( + core::Interest::Ptr &&interest, core::ContentObject::Ptr &&content_object) { + auto ret = verification_manager_->onPacketToVerify(*content_object); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + processTrustedManifest(std::move(content_object)); + break; + } + case VerificationPolicy::DROP_PACKET: + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } +} + +void ManifestIncrementalIndexer::processTrustedManifest( + ContentObject::Ptr &&content_object) { + auto manifest = + std::make_unique<ContentObjectManifest>(std::move(*content_object)); + manifest->decode(); + + if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != + core::ManifestVersion::VERSION_1)) { + throw errors::RuntimeException("Received manifest with unknown version."); + } + + switch (manifest->getManifestType()) { + case core::ManifestType::INLINE_MANIFEST: { + auto _it = manifest->getSuffixList().begin(); + auto _end = manifest->getSuffixList().end(); + + suffix_strategy_->setFinalSuffix(manifest->getFinalBlockNumber()); + + for (; _it != _end; _it++) { + auto hash = + std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), + manifest->getHashAlgorithm()); + + if (!checkUnverifiedSegments(_it->first, hash)) { + suffix_hash_map_[_it->first] = std::move(hash); + } + } + + reassembly_->reassemble(std::move(manifest)); + + break; + } + case core::ManifestType::FLIC_MANIFEST: { + throw errors::NotImplementedException(); + } + case core::ManifestType::FINAL_CHUNK_NUMBER: { + throw errors::NotImplementedException(); + } + } +} + +bool ManifestIncrementalIndexer::checkUnverifiedSegments( + std::uint32_t suffix, const HashEntry &hash) { + auto it = unverified_segments_.find(suffix); + + if (it != unverified_segments_.end()) { + auto ret = verifyContentObject(hash, *it->second.second); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + reassembly_->reassemble(std::move(it->second.second)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(it->second.first), + std::move(it->second.second)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + + unverified_segments_.erase(it); + return true; + } + + return false; +} + +VerificationPolicy ManifestIncrementalIndexer::verifyContentObject( + const HashEntry &manifest_hash, const ContentObject &content_object) { + VerificationPolicy ret; + + auto hash_type = static_cast<utils::CryptoHashType>(manifest_hash.second); + auto data_packet_digest = content_object.computeDigest(manifest_hash.second); + auto data_packet_digest_bytes = + data_packet_digest.getDigest<uint8_t>().data(); + const std::vector<uint8_t> &manifest_digest_bytes = manifest_hash.first; + + if (utils::CryptoHash::compareBinaryDigest( + data_packet_digest_bytes, manifest_digest_bytes.data(), hash_type)) { + ret = VerificationPolicy::ACCEPT_PACKET; + } else { + ConsumerContentObjectVerificationFailedCallback + *verification_failed_callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback); + ret = (*verification_failed_callback)( + *socket_, content_object, + make_error_code(protocol_error::integrity_verification_failed)); + } + + return ret; +} + +void ManifestIncrementalIndexer::onUntrustedContentObject( + Interest::Ptr &&i, ContentObject::Ptr &&c) { + auto suffix = c->getName().getSuffix(); + auto it = suffix_hash_map_.find(suffix); + + if (it != suffix_hash_map_.end()) { + auto ret = verifyContentObject(it->second, *c); + + switch (ret) { + case VerificationPolicy::ACCEPT_PACKET: { + suffix_hash_map_.erase(it); + reassembly_->reassemble(std::move(c)); + break; + } + case VerificationPolicy::DROP_PACKET: { + transport_protocol_->onPacketDropped(std::move(i), std::move(c)); + break; + } + case VerificationPolicy::ABORT_SESSION: { + transport_protocol_->onContentReassembled( + make_error_code(protocol_error::session_aborted)); + break; + } + } + } else { + unverified_segments_[suffix] = std::make_pair(std::move(i), std::move(c)); + } +} + +uint32_t ManifestIncrementalIndexer::getNextSuffix() { + auto ret = suffix_strategy_->getNextSuffix(); + + if (ret <= suffix_strategy_->getFinalSuffix() && + ret != utils::SuffixStrategy::INVALID_SUFFIX) { + suffix_queue_.push(ret); + return ret; + } + + return IndexManager::invalid_index; +} + +uint32_t ManifestIncrementalIndexer::getFinalSuffix() { + return suffix_strategy_->getFinalSuffix(); +} + +bool ManifestIncrementalIndexer::isFinalSuffixDiscovered() { + return IncrementalIndexer::isFinalSuffixDiscovered(); +} + +uint32_t ManifestIncrementalIndexer::getNextReassemblySegment() { + if (suffix_queue_.empty()) { + return IndexManager::invalid_index; + } + + auto ret = suffix_queue_.front(); + suffix_queue_.pop(); + return ret; +} + +void ManifestIncrementalIndexer::reset(std::uint32_t offset) { + IncrementalIndexer::reset(offset); + suffix_hash_map_.clear(); + unverified_segments_.clear(); + SuffixQueue empty; + std::swap(suffix_queue_, empty); + suffix_strategy_->reset(offset); +} + +} // namespace protocol + +} // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h new file mode 100644 index 000000000..6e991f86f --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/manifest_incremental_indexer.h @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <hicn/transport/interfaces/socket.h> +#include <hicn/transport/protocols/incremental_indexer.h> +#include <hicn/transport/utils/suffix_strategy.h> + +#include <list> + +namespace transport { + +namespace protocol { + +class ManifestIncrementalIndexer : public IncrementalIndexer { + static constexpr double alpha = 0.3; + + public: + using SuffixQueue = std::queue<uint32_t>; + using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; + + ManifestIncrementalIndexer(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport, Reassembly *reassembly); + + ManifestIncrementalIndexer(IncrementalIndexer &&indexer) + : IncrementalIndexer(std::move(indexer)), + suffix_strategy_(utils::SuffixStrategyFactory::getSuffixStrategy( + core::NextSegmentCalculationStrategy::INCREMENTAL, + next_download_suffix_, 0)) { + for (uint32_t i = first_suffix_; i < next_download_suffix_; i++) { + suffix_queue_.push(i); + } + } + + virtual ~ManifestIncrementalIndexer() = default; + + void reset(std::uint32_t offset = 0) override; + + void onContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object) override; + + uint32_t getNextSuffix() override; + + uint32_t getNextReassemblySegment() override; + + bool isFinalSuffixDiscovered() override; + + uint32_t getFinalSuffix() override; + + private: + void onUntrustedManifest(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object); + void onUntrustedContentObject(core::Interest::Ptr &&interest, + core::ContentObject::Ptr &&content_object); + void processTrustedManifest(core::ContentObject::Ptr &&content_object); + void onManifestReceived(core::Interest::Ptr &&i, + core::ContentObject::Ptr &&c); + void onManifestTimeout(core::Interest::Ptr &&i); + VerificationPolicy verifyContentObject( + const HashEntry &manifest_hash, + const core::ContentObject &content_object); + bool checkUnverifiedSegments(std::uint32_t suffix, const HashEntry &hash); + + protected: + std::unique_ptr<utils::SuffixStrategy> suffix_strategy_; + SuffixQueue suffix_queue_; + + // Hash verification + std::unordered_map<uint32_t, HashEntry> suffix_hash_map_; + + std::unordered_map<uint32_t, + std::pair<core::Interest::Ptr, core::ContentObject::Ptr>> + unverified_segments_; +}; + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc deleted file mode 100644 index ea13bf9e6..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.cc +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/manifest_indexing_manager.h> - -#include <cmath> -#include <deque> - -namespace transport { - -namespace protocol { - -using namespace interface; - -ManifestIndexManager::ManifestIndexManager( - interface::ConsumerSocket *icn_socket, TransportProtocol *next_interest) - : IncrementalIndexManager(icn_socket), - PacketManager<Interest>(1024), - next_to_retrieve_segment_(suffix_queue_.end()), - suffix_manifest_(core::NextSegmentCalculationStrategy::INCREMENTAL, 0), - next_reassembly_segment_( - core::NextSegmentCalculationStrategy::INCREMENTAL, 1, true), - ignored_segments_(), - next_interest_(next_interest) {} - -bool ManifestIndexManager::onManifest( - core::ContentObject::Ptr &&content_object) { - auto manifest = - std::make_unique<ContentObjectManifest>(std::move(*content_object)); - bool manifest_verified = verification_manager_->onPacketToVerify(*manifest); - - if (manifest_verified) { - manifest->decode(); - - if (TRANSPORT_EXPECT_FALSE(manifest->getVersion() != - core::ManifestVersion::VERSION_1)) { - throw errors::RuntimeException("Received manifest with unknown version."); - } - - switch (manifest->getManifestType()) { - case core::ManifestType::INLINE_MANIFEST: { - auto _it = manifest->getSuffixList().begin(); - auto _end = manifest->getSuffixList().end(); - size_t nb_segments = std::distance(_it, _end); - final_suffix_ = manifest->getFinalBlockNumber(); // final block number - - suffix_hash_map_[_it->first] = - std::make_pair(std::vector<uint8_t>(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push_back(_it->first); - - // If the transport protocol finished the list of segments to retrieve, - // reset the next_to_retrieve_segment_ iterator to the next segment - // provided by this manifest. - if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == - suffix_queue_.end())) { - next_to_retrieve_segment_ = --suffix_queue_.end(); - } - - std::advance(_it, 1); - for (; _it != _end; _it++) { - suffix_hash_map_[_it->first] = std::make_pair( - std::vector<uint8_t>(_it->second, _it->second + 32), - manifest->getHashAlgorithm()); - suffix_queue_.push_back(_it->first); - } - - if (TRANSPORT_EXPECT_FALSE(manifest->getName().getSuffix()) == 0) { - core::NextSegmentCalculationStrategy strategy = - manifest->getNextSegmentCalculationStrategy(); - - suffix_manifest_.reset(0); - suffix_manifest_.setNbSegments(nb_segments); - suffix_manifest_.setSuffixStrategy(strategy); - TRANSPORT_LOGD("Capacity of 1st manifest %zu", - suffix_manifest_.getNbSegments()); - - next_reassembly_segment_.reset(*suffix_queue_.begin()); - next_reassembly_segment_.setNbSegments(nb_segments); - suffix_manifest_.setSuffixStrategy(strategy); - } - - // If the manifest is not full, we add the suffixes of missing segments - // to the list of segments to ignore when computing the next reassembly - // index. - if (TRANSPORT_EXPECT_FALSE( - suffix_manifest_.getNbSegments() - nb_segments > 0)) { - auto start = manifest->getSuffixList().begin(); - auto last = --_end; - for (uint32_t i = last->first + 1; - i < start->first + suffix_manifest_.getNbSegments(); i++) { - ignored_segments_.push_back(i); - } - } - - if (TRANSPORT_EXPECT_FALSE(manifest->isFinalManifest()) == 0) { - fillWindow(manifest->getWritableName(), - manifest->getName().getSuffix()); - } - - break; - } - case core::ManifestType::FLIC_MANIFEST: { - throw errors::NotImplementedException(); - } - case core::ManifestType::FINAL_CHUNK_NUMBER: { - throw errors::NotImplementedException(); - } - } - } - - return manifest_verified; -} - -void ManifestIndexManager::onManifestReceived(Interest::Ptr &&i, - ContentObject::Ptr &&c) { - onManifest(std::move(c)); - if (next_interest_) { - next_interest_->scheduleNextInterests(); - } -} - -void ManifestIndexManager::onManifestTimeout(Interest::Ptr &&i) { - const Name &n = i->getName(); - uint32_t segment = n.getSuffix(); - - if (segment > final_suffix_) { - return; - } - - // Get portal - std::shared_ptr<interface::BasePortal> portal; - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - - // Send requests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(i), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); -} - -void ManifestIndexManager::fillWindow(Name &name, uint32_t current_manifest) { - /* Send as many manifest as required for filling window. */ - uint32_t interest_lifetime; - double window_size; - std::shared_ptr<interface::BasePortal> portal; - Interest::Ptr interest; - uint32_t current_segment = *next_to_retrieve_segment_; - // suffix_manifest_ now points to the next manifest to request - uint32_t last_requested_manifest = (suffix_manifest_++).getSuffix(); - - socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal); - socket_->getSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, - interest_lifetime); - socket_->getSocketOption(GeneralTransportOptions::CURRENT_WINDOW_SIZE, - window_size); - - if (TRANSPORT_EXPECT_FALSE(suffix_manifest_.getSuffix() >= final_suffix_)) { - suffix_manifest_.updateSuffix(last_requested_manifest); - return; - } - - if (current_segment + window_size < suffix_manifest_.getSuffix() && - current_manifest != last_requested_manifest) { - suffix_manifest_.updateSuffix(last_requested_manifest); - return; - } - - do { - interest = getPacket(); - name.setSuffix(suffix_manifest_.getSuffix()); - interest->setName(name); - interest->setLifetime(interest_lifetime); - - // Send interests for manifest out of the congestion window (no - // in_flight_interests++) - portal->sendInterest( - std::move(interest), - std::bind(&ManifestIndexManager::onManifestReceived, this, - std::placeholders::_1, std::placeholders::_2), - std::bind(&ManifestIndexManager::onManifestTimeout, this, - std::placeholders::_1)); - - last_requested_manifest = (suffix_manifest_++).getSuffix(); - } while (current_segment + window_size >= suffix_manifest_.getSuffix() && - suffix_manifest_.getSuffix() < final_suffix_); - - // suffix_manifest_ now points to the last requested manifest - suffix_manifest_.updateSuffix(last_requested_manifest); -} - -bool ManifestIndexManager::onContentObject( - const core::ContentObject &content_object) { - bool verify_signature; - socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, - verify_signature); - - if (!verify_signature) { - return true; - } - - uint64_t segment = content_object.getName().getSuffix(); - - bool ret = false; - - auto it = suffix_hash_map_.find((const unsigned int)segment); - if (it != suffix_hash_map_.end()) { - auto hash_type = static_cast<utils::CryptoHashType>(it->second.second); - auto data_packet_digest = content_object.computeDigest(it->second.second); - auto data_packet_digest_bytes = - data_packet_digest.getDigest<uint8_t>().data(); - std::vector<uint8_t> &manifest_digest_bytes = it->second.first; - - if (utils::CryptoHash::compareBinaryDigest(data_packet_digest_bytes, - manifest_digest_bytes.data(), - hash_type)) { - suffix_hash_map_.erase(it); - ret = true; - } else { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - } - - return ret; -} - -uint32_t ManifestIndexManager::getNextSuffix() { - if (TRANSPORT_EXPECT_FALSE(next_to_retrieve_segment_ == - suffix_queue_.end())) { - return invalid_index; - } - - return *next_to_retrieve_segment_++; -} - -uint32_t ManifestIndexManager::getFinalSuffix() { return final_suffix_; } - -bool ManifestIndexManager::isFinalSuffixDiscovered() { - return IncrementalIndexManager::isFinalSuffixDiscovered(); -} - -uint32_t ManifestIndexManager::getNextReassemblySegment() { - uint32_t current_reassembly_segment; - - while (true) { - current_reassembly_segment = next_reassembly_segment_.getSuffix(); - next_reassembly_segment_++; - - if (TRANSPORT_EXPECT_FALSE(current_reassembly_segment > final_suffix_)) { - return invalid_index; - } - - if (ignored_segments_.empty()) break; - - auto is_ignored = - std::find(ignored_segments_.begin(), ignored_segments_.end(), - current_reassembly_segment); - - if (is_ignored == ignored_segments_.end()) break; - - ignored_segments_.erase(is_ignored); - } - - return current_reassembly_segment; -} - -void ManifestIndexManager::reset() { - IncrementalIndexManager::reset(); - suffix_manifest_.reset(0); - suffix_queue_.clear(); - suffix_hash_map_.clear(); -} - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h b/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h deleted file mode 100644 index 645b20e9a..000000000 --- a/libtransport/src/hicn/transport/protocols/manifest_indexing_manager.h +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <hicn/transport/interfaces/socket.h> -#include <hicn/transport/protocols/indexing_manager.h> -#include <hicn/transport/utils/suffix_strategy.h> - -#include <list> - -namespace transport { - -namespace protocol { - -class ManifestIndexManager : public IncrementalIndexManager, - public PacketManager<Interest> { - static constexpr double alpha = 0.3; - - public: - using SuffixQueue = std::list<uint32_t>; - using HashEntry = std::pair<std::vector<uint8_t>, core::HashAlgorithm>; - - ManifestIndexManager(interface::ConsumerSocket *icn_socket, - TransportProtocol *next_interest); - - virtual ~ManifestIndexManager() = default; - - void reset() override; - - bool onManifest(core::ContentObject::Ptr &&content_object) override; - - bool onContentObject(const core::ContentObject &content_object) override; - - uint32_t getNextSuffix() override; - - uint32_t getNextReassemblySegment() override; - - bool isFinalSuffixDiscovered() override; - - uint32_t getFinalSuffix() override; - - private: - void onManifestReceived(Interest::Ptr &&i, ContentObject::Ptr &&c); - void onManifestTimeout(Interest::Ptr &&i); - void fillWindow(Name &name, uint32_t current_manifest); - - protected: - SuffixQueue suffix_queue_; - SuffixQueue::iterator next_to_retrieve_segment_; - utils::SuffixManifest suffix_manifest_; - utils::SuffixContent next_reassembly_segment_; - - // Holds segments that should not be requested. Useful when - // computing the next reassembly segment because some manifests - // may be incomplete. - std::vector<uint32_t> ignored_segments_; - - // Hash verification - std::unordered_map<uint32_t, - std::pair<std::vector<uint8_t>, core::HashAlgorithm>> - suffix_hash_map_; - - // (temporary) To call scheduleNextInterests() after receiving a manifest - TransportProtocol *next_interest_; -}; - -} // end namespace protocol - -} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.cc b/libtransport/src/hicn/transport/protocols/protocol.cc index 8da9529d6..a0f847453 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.cc +++ b/libtransport/src/hicn/transport/protocols/protocol.cc @@ -22,9 +22,16 @@ namespace protocol { using namespace interface; -TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket) - : socket_(icn_socket), is_running_(false), is_first_(false) { +TransportProtocol::TransportProtocol(interface::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol) + : socket_(icn_socket), + reassembly_protocol_(reassembly_protocol), + index_manager_( + std::make_unique<IndexManager>(socket_, this, reassembly_protocol)), + is_running_(false), + is_first_(false) { socket_->getSocketOption(GeneralTransportOptions::PORTAL, portal_); + socket_->getSocketOption(OtherOptions::STATISTICS, &stats_); } int TransportProtocol::start() { @@ -71,6 +78,26 @@ void TransportProtocol::resume() { is_running_ = false; } +void TransportProtocol::onContentReassembled(std::error_code ec) { + interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; + socket_->getSocketOption(READ_CALLBACK, &on_payload); + + if (!on_payload) { + throw errors::RuntimeException( + "The read callback must be installed in the transport before " + "starting " + "the content retrieval."); + } + + if (!ec) { + on_payload->readSuccess(stats_->getBytesRecv()); + } else { + on_payload->readError(ec); + } + + stop(); +} + } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/protocol.h b/libtransport/src/hicn/transport/protocols/protocol.h index e4821b6a0..87fab588b 100644 --- a/libtransport/src/hicn/transport/protocols/protocol.h +++ b/libtransport/src/hicn/transport/protocols/protocol.h @@ -18,7 +18,10 @@ #include <atomic> #include <hicn/transport/interfaces/socket.h> +#include <hicn/transport/protocols/data_processing_events.h> +#include <hicn/transport/protocols/indexer.h> #include <hicn/transport/protocols/packet_manager.h> +#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/protocols/statistics.h> #include <hicn/transport/utils/object_pool.h> @@ -28,6 +31,8 @@ namespace protocol { using namespace core; +class IndexVerificationManager; + class TransportProtocolCallback { virtual void onContentObject(const core::Interest &interest, const core::ContentObject &content_object) = 0; @@ -35,11 +40,15 @@ class TransportProtocolCallback { }; class TransportProtocol : public interface::BasePortal::ConsumerCallback, - public PacketManager<Interest> { + public PacketManager<Interest>, + public ContentObjectProcessingEventCallback { static constexpr std::size_t interest_pool_size = 4096; + friend class ManifestIndexManager; + public: - TransportProtocol(interface::ConsumerSocket *icn_socket); + TransportProtocol(interface::ConsumerSocket *icn_socket, + Reassembly *reassembly_protocol); virtual ~TransportProtocol() = default; @@ -53,6 +62,12 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, virtual void scheduleNextInterests() = 0; + // Events generated by the indexing + virtual void onContentReassembled(std::error_code ec); + virtual void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) = 0; + virtual void onReassemblyFailed(std::uint32_t missing_segment) = 0; + protected: // Consumer Callback virtual void reset() = 0; @@ -61,13 +76,14 @@ class TransportProtocol : public interface::BasePortal::ConsumerCallback, protected: interface::ConsumerSocket *socket_; + std::unique_ptr<Reassembly> reassembly_protocol_; + std::unique_ptr<IndexManager> index_manager_; std::shared_ptr<interface::BasePortal> portal_; std::atomic<bool> is_running_; // True if it si the first time we schedule an interest std::atomic<bool> is_first_; - TransportStatistics stats_; + TransportStatistics *stats_; }; } // end namespace protocol - } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/raaqm.cc b/libtransport/src/hicn/transport/protocols/raaqm.cc index a57eb7cd9..641ae45c3 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.cc +++ b/libtransport/src/hicn/transport/protocols/raaqm.cc @@ -14,8 +14,9 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/manifest_indexing_manager.h> +#include <hicn/transport/protocols/indexer.h> #include <hicn/transport/protocols/raaqm.h> +#include <hicn/transport/protocols/errors.h> #include <cstdlib> #include <fstream> @@ -26,9 +27,8 @@ namespace protocol { using namespace interface; -RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icnet_socket) - : TransportProtocol(icnet_socket), - BaseReassembly(icnet_socket, this, this), +RaaqmTransportProtocol::RaaqmTransportProtocol(ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, new ByteStreamReassembly(icn_socket, this)), current_window_size_(1), interests_in_flight_(0), cur_path_(nullptr), @@ -101,13 +101,14 @@ void RaaqmTransportProtocol::reset() { // Set first segment to retrieve core::Name *name; socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, &name); + index_manager_->reset(); index_manager_->setFirstSuffix(name->getSuffix()); std::queue<Interest::Ptr> empty; std::swap(interest_to_retransmit_, empty); - stats_.reset(); + stats_->reset(); // Reset reassembly component - BaseReassembly::reset(); + reassembly_protocol_->reInitialize(); // Reset protocol variables interests_in_flight_ = 0; @@ -309,8 +310,6 @@ void RaaqmTransportProtocol::init() { void RaaqmTransportProtocol::onContentObject( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { - uint32_t incremental_suffix = content_object->getName().getSuffix(); - // Check whether makes sense to continue if (TRANSPORT_EXPECT_FALSE(!is_running_)) { return; @@ -331,27 +330,17 @@ void RaaqmTransportProtocol::onContentObject( (*callback_interest)(*socket_, *interest); } - if (TRANSPORT_EXPECT_FALSE(content_object->getPayloadType() == - PayloadType::MANIFEST)) { - if (TRANSPORT_EXPECT_FALSE(incremental_suffix == 0)) { - index_manager_ = manifest_index_manager_.get(); - interests_in_flight_--; - } - - index_manager_->onManifest(std::move(content_object)); - - } else if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { - onContentSegment(std::move(interest), std::move(content_object)); + if (content_object->getPayloadType() == PayloadType::CONTENT_OBJECT) { + stats_->updateBytesRecv(content_object->payloadSize()); } + onContentSegment(std::move(interest), std::move(content_object)); scheduleNextInterests(); } void RaaqmTransportProtocol::onContentSegment( Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { uint32_t incremental_suffix = content_object->getName().getSuffix(); - bool virtual_download = false; - socket_->getSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, virtual_download); // Decrease in-flight interests interests_in_flight_--; @@ -361,28 +350,55 @@ void RaaqmTransportProtocol::onContentSegment( afterContentReception(*interest, *content_object); } - if (index_manager_->onContentObject(*content_object)) { - stats_.updateBytesRecv(content_object->payloadSize()); + index_manager_->onContentObject(std::move(interest), + std::move(content_object)); +} - if (!virtual_download) { - reassemble(std::move(content_object)); - } else if (TRANSPORT_EXPECT_FALSE(incremental_suffix == - index_manager_->getFinalSuffix())) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); +void RaaqmTransportProtocol::onPacketDropped( + Interest::Ptr &&interest, ContentObject::Ptr &&content_object) { + uint32_t max_rtx = 0; + socket_->getSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, max_rtx); - if (on_payload) { - on_payload->readSuccess(stats_.getBytesRecv()); - } + uint64_t segment = interest->getName().getSuffix(); + ConsumerInterestCallback *callback = VOID_HANDLER; + if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < + max_rtx)) { + stats_->updateRetxCount(1); + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, + &callback); + if (*callback) { + (*callback)(*socket_, *interest); } + + callback = VOID_HANDLER; + socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + &callback); + if (*callback) { + (*callback)(*socket_, *interest); + } + + if (!is_running_) { + return; + } + + interest_retransmissions_[segment & mask]++; + + interest_to_retransmit_.push(std::move(interest)); } else { - // TODO Application policy check - // unverified_segments_.emplace( - // std::make_pair(incremental_suffix, std::move(content_object))); - TRANSPORT_LOGE("Received not trusted segment."); + TRANSPORT_LOGE( + "Stop: received not trusted packet %llu times", + (unsigned long long)interest_retransmissions_[segment & mask]); + onContentReassembled( + make_error_code(protocol_error::max_retransmissions_error)); } } +void RaaqmTransportProtocol::onReassemblyFailed(std::uint32_t missing_segment) { + +} + void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { checkForStalePaths(); @@ -399,7 +415,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { uint64_t segment = n.getSuffix(); // Do not retransmit interests asking contents that do not exist. - if (segment >= index_manager_->getFinalSuffix()) { + if (segment > index_manager_->getFinalSuffix()) { return; } @@ -417,7 +433,7 @@ void RaaqmTransportProtocol::onTimeout(Interest::Ptr &&interest) { if (TRANSPORT_EXPECT_TRUE(interest_retransmissions_[segment & mask] < max_rtx)) { - stats_.updateRetxCount(1); + stats_->updateRetxCount(1); callback = VOID_HANDLER; socket_->getSocketOption(ConsumerCallbacksOptions::INTEREST_RETRANSMISSION, @@ -515,24 +531,8 @@ void RaaqmTransportProtocol::sendInterest(Interest::Ptr &&interest) { } void RaaqmTransportProtocol::onContentReassembled(std::error_code ec) { - interface::ConsumerSocket::ReadCallback *on_payload = VOID_HANDLER; - socket_->getSocketOption(READ_CALLBACK, &on_payload); - - if (!on_payload) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before " - "starting " - "the content retrieval."); - } - - if (!ec) { - on_payload->readSuccess(stats_.getBytesRecv()); - } else { - on_payload->readError(ec); - } - rate_estimator_->onDownloadFinished(); - stop(); + TransportProtocol::onContentReassembled(ec); } void RaaqmTransportProtocol::updateRtt(uint64_t segment) { @@ -567,7 +567,7 @@ void RaaqmTransportProtocol::RAAQM() { // Change drop probability according to RTT statistics cur_path_->updateDropProb(); - double coin = ((double) rand() / (RAND_MAX)); + double coin = ((double)rand() / (RAND_MAX)); if (coin <= cur_path_->getDropProb()) { decreaseWindow(); } @@ -577,8 +577,8 @@ void RaaqmTransportProtocol::RAAQM() { void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, utils::TimePoint &now) { // Update RTT statistics - stats_.updateAverageRtt(rtt); - stats_.updateAverageWindowSize(current_window_size_); + stats_->updateAverageRtt(rtt); + stats_->updateAverageWindowSize(current_window_size_); // Call statistics callback ConsumerTimerCallback *stats_callback = VOID_HANDLER; @@ -591,7 +591,7 @@ void RaaqmTransportProtocol::updateStats(uint32_t suffix, uint64_t rtt, socket_->getSocketOption(GeneralTransportOptions::STATS_INTERVAL, timer_interval_milliseconds); if (dt.count() > timer_interval_milliseconds) { - (*stats_callback)(*socket_, stats_); + (*stats_callback)(*socket_, *stats_); t0_ = utils::SteadyClock::now(); } } diff --git a/libtransport/src/hicn/transport/protocols/raaqm.h b/libtransport/src/hicn/transport/protocols/raaqm.h index 09d22cd4f..7fc540c9f 100644 --- a/libtransport/src/hicn/transport/protocols/raaqm.h +++ b/libtransport/src/hicn/transport/protocols/raaqm.h @@ -15,11 +15,11 @@ #pragma once +#include <hicn/transport/protocols/byte_stream_reassembly.h> #include <hicn/transport/protocols/congestion_window_protocol.h> #include <hicn/transport/protocols/protocol.h> #include <hicn/transport/protocols/raaqm_data_path.h> #include <hicn/transport/protocols/rate_estimation.h> -#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/utils/chrono_typedefs.h> #include <queue> @@ -29,11 +29,8 @@ namespace transport { namespace protocol { -class RaaqmTransportProtocol - : public TransportProtocol, - public BaseReassembly, - public CWindowProtocol, - public BaseReassembly::ContentReassembledCallback { +class RaaqmTransportProtocol : public TransportProtocol, + public CWindowProtocol { public: RaaqmTransportProtocol(interface::ConsumerSocket *icnet_socket); @@ -70,6 +67,11 @@ class RaaqmTransportProtocol void onContentSegment(Interest::Ptr &&interest, ContentObject::Ptr &&content_object); + void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override; + + void onReassemblyFailed(std::uint32_t missing_segment) override; + void onTimeout(Interest::Ptr &&i) override; virtual void scheduleNextInterests() override; diff --git a/libtransport/src/hicn/transport/protocols/reassembly.cc b/libtransport/src/hicn/transport/protocols/reassembly.cc index c45d876a0..9682d338d 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.cc +++ b/libtransport/src/hicn/transport/protocols/reassembly.cc @@ -14,7 +14,8 @@ */ #include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/indexing_manager.h> +#include <hicn/transport/protocols/errors.h> +#include <hicn/transport/protocols/indexer.h> #include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/utils/array.h> #include <hicn/transport/utils/membuf.h> @@ -23,66 +24,7 @@ namespace transport { namespace protocol { -BaseReassembly::BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback, - TransportProtocol *next_interest) - : reassembly_consumer_socket_(icn_socket), - incremental_index_manager_( - std::make_unique<IncrementalIndexManager>(icn_socket)), - manifest_index_manager_( - std::make_unique<ManifestIndexManager>(icn_socket, next_interest)), - index_manager_(incremental_index_manager_.get()), - index_(0), - read_buffer_(nullptr) { - setContentCallback(content_callback); -} - -void BaseReassembly::reassemble(ContentObject::Ptr &&content_object) { - if (TRANSPORT_EXPECT_TRUE(content_object != nullptr)) { - received_packets_.emplace(std::make_pair( - content_object->getName().getSuffix(), std::move(content_object))); - } - - auto it = received_packets_.find((const unsigned int)index_); - while (it != received_packets_.end()) { - if (it->second->getPayloadType() == PayloadType::CONTENT_OBJECT) { - copyContent(*it->second); - received_packets_.erase(it); - } - - index_ = index_manager_->getNextReassemblySegment(); - it = received_packets_.find((const unsigned int)index_); - } -} - -void BaseReassembly::copyContent(const ContentObject &content_object) { - auto a = content_object.getPayload(); - auto payload_length = a->length(); - auto write_size = std::min(payload_length, read_buffer_->tailroom()); - auto additional_bytes = payload_length > read_buffer_->tailroom() - ? payload_length - read_buffer_->tailroom() - : 0; - - std::memcpy(read_buffer_->writableTail(), a->data(), write_size); - read_buffer_->append(write_size); - - if (!read_buffer_->tailroom()) { - notifyApplication(); - std::memcpy(read_buffer_->writableTail(), a->data() + write_size, - additional_bytes); - read_buffer_->append(additional_bytes); - } - - bool download_completed = - index_manager_->getFinalSuffix() == content_object.getName().getSuffix(); - - if (TRANSPORT_EXPECT_FALSE(download_completed)) { - notifyApplication(); - content_callback_->onContentReassembled(std::make_error_code(std::errc(0))); - } -} - -void BaseReassembly::notifyApplication() { +void Reassembly::notifyApplication() { interface::ConsumerSocket::ReadCallback *read_callback = nullptr; reassembly_consumer_socket_->getSocketOption( interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); @@ -124,21 +66,5 @@ void BaseReassembly::notifyApplication() { } } -void BaseReassembly::reset() { - manifest_index_manager_->reset(); - incremental_index_manager_->reset(); - index_ = index_manager_->getNextReassemblySegment(); - - received_packets_.clear(); - - // reset read buffer - interface::ConsumerSocket::ReadCallback *read_callback; - reassembly_consumer_socket_->getSocketOption( - interface::ConsumerCallbacksOptions::READ_CALLBACK, &read_callback); - - read_buffer_ = utils::MemBuf::create(read_callback->maxBufferSize()); -} - } // namespace protocol - } // namespace transport diff --git a/libtransport/src/hicn/transport/protocols/reassembly.h b/libtransport/src/hicn/transport/protocols/reassembly.h index e859ca294..34af2a70a 100644 --- a/libtransport/src/hicn/transport/protocols/reassembly.h +++ b/libtransport/src/hicn/transport/protocols/reassembly.h @@ -15,17 +15,20 @@ #pragma once -#include <hicn/transport/core/content_object.h> -#include <hicn/transport/protocols/manifest_indexing_manager.h> +#include <hicn/transport/core/facade.h> namespace transport { namespace interface { class ConsumerReadCallback; -} +class ConsumerSocket; +} // namespace interface namespace protocol { +class TransportProtocol; +class Indexer; + // Forward Declaration class ManifestManager; @@ -36,41 +39,26 @@ class Reassembly { virtual void onContentReassembled(std::error_code ec) = 0; }; - virtual void reassemble(ContentObject::Ptr &&content_object) = 0; - virtual void reset() = 0; - virtual void setContentCallback(ContentReassembledCallback *callback) { - content_callback_ = callback; - } + Reassembly(interface::ConsumerSocket *icn_socket, + TransportProtocol *transport_protocol) + : reassembly_consumer_socket_(icn_socket), + transport_protocol_(transport_protocol) {} - protected: - ContentReassembledCallback *content_callback_; -}; + virtual ~Reassembly() = default; -class BaseReassembly : public Reassembly { - public: - BaseReassembly(interface::ConsumerSocket *icn_socket, - ContentReassembledCallback *content_callback, - TransportProtocol *next_interest); + virtual void reassemble(core::ContentObject::Ptr &&content_object) = 0; + virtual void reassemble( + std::unique_ptr<core::ContentObjectManifest> &&manifest) = 0; + virtual void reInitialize() = 0; + virtual void setIndexer(Indexer *indexer) { index_manager_ = indexer; } protected: - virtual void reassemble(ContentObject::Ptr &&content_object) override; - - virtual void copyContent(const ContentObject &content_object); - - virtual void reset() override; - - private: - void notifyApplication(); + virtual void notifyApplication(); protected: - // The consumer socket interface::ConsumerSocket *reassembly_consumer_socket_; - std::unique_ptr<IncrementalIndexManager> incremental_index_manager_; - std::unique_ptr<ManifestIndexManager> manifest_index_manager_; - IndexVerificationManager *index_manager_; - std::unordered_map<std::uint32_t, ContentObject::Ptr> received_packets_; - - uint32_t index_; + TransportProtocol *transport_protocol_; + Indexer *index_manager_; std::unique_ptr<utils::MemBuf> read_buffer_; }; diff --git a/libtransport/src/hicn/transport/protocols/rtc.cc b/libtransport/src/hicn/transport/protocols/rtc.cc index 559e86592..e371217f8 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.cc +++ b/libtransport/src/hicn/transport/protocols/rtc.cc @@ -13,11 +13,12 @@ * limitations under the License. */ -#include <math.h> -#include <random> +#include <hicn/transport/protocols/rtc.h> #include <hicn/transport/interfaces/socket_consumer.h> -#include <hicn/transport/protocols/rtc.h> + +#include <math.h> +#include <random> namespace transport { @@ -26,14 +27,16 @@ namespace protocol { using namespace interface; RTCTransportProtocol::RTCTransportProtocol( - interface::ConsumerSocket *icnet_socket) - : TransportProtocol(icnet_socket), + interface::ConsumerSocket *icn_socket) + : TransportProtocol(icn_socket, nullptr), + DatagramReassembly(icn_socket, this), inflightInterests_(1 << default_values::log_2_default_buffer_size), modMask_((1 << default_values::log_2_default_buffer_size) - 1) { - icnet_socket->getSocketOption(PORTAL, portal_); + icn_socket->getSocketOption(PORTAL, portal_); rtx_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); probe_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); - sentinel_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); + sentinel_timer_ = + std::make_unique<asio::steady_timer>(portal_->getIoService()); round_timer_ = std::make_unique<asio::steady_timer>(portal_->getIoService()); reset(); } @@ -147,8 +150,7 @@ uint32_t min(uint32_t a, uint32_t b) { } void RTCTransportProtocol::newRound() { - round_timer_->expires_from_now(std::chrono::milliseconds( - HICN_ROUND_LEN)); + round_timer_->expires_from_now(std::chrono::milliseconds(HICN_ROUND_LEN)); round_timer_->async_wait([this](std::error_code ec) { if (ec) return; updateStats(HICN_ROUND_LEN); @@ -281,10 +283,10 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { &stats_callback); if (*stats_callback) { // Send the stats to the app - stats_.updateQueuingDelay(queuingDelay_); - stats_.updateLossRatio(lossRate_); - stats_.updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); - (*stats_callback)(*socket_, stats_); + stats_->updateQueuingDelay(queuingDelay_); + stats_->updateLossRatio(lossRate_); + stats_->updateAverageRtt(pathTable_[producerPathLabels_[1]]->getMinRtt()); + (*stats_callback)(*socket_, *stats_); } // bound also by interest lifitime* production rate @@ -301,9 +303,9 @@ void RTCTransportProtocol::updateStats(uint32_t round_duration) { updateCCState(); updateWindow(); - if(queuingDelay_ > 25.0){ - //this indicates that the client will go soon out of synch, - //switch to synch mode + if (queuingDelay_ > 25.0) { + // this indicates that the client will go soon out of synch, + // switch to synch mode if (currentState_ == HICN_RTC_NORMAL_STATE) { currentState_ = HICN_RTC_SYNC_STATE; } @@ -358,8 +360,7 @@ void RTCTransportProtocol::computeMaxWindow(uint32_t productionRate, maxCWin_ = min(maxWaintingInterest, maxCWin_); } - if(maxCWin_ < HICN_MIN_CWIN) - maxCWin_ = HICN_MIN_CWIN; + if (maxCWin_ < HICN_MIN_CWIN) maxCWin_ = HICN_MIN_CWIN; } void RTCTransportProtocol::updateWindow() { @@ -518,68 +519,64 @@ void RTCTransportProtocol::scheduleNextInterests() { } } -void RTCTransportProtocol::sentinelTimer(){ +void RTCTransportProtocol::sentinelTimer() { uint32_t wait = 50; - if(pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && - pathTable_.find(producerPathLabels_[1]) != pathTable_.end()){ - //we have all the info to set the timers + if (pathTable_.find(producerPathLabels_[0]) != pathTable_.end() && + pathTable_.find(producerPathLabels_[1]) != pathTable_.end()) { + // we have all the info to set the timers wait = round(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()); - if(wait == 0) - wait = 1; + if (wait == 0) wait = 1; } sentinel_timer_->expires_from_now(std::chrono::milliseconds(wait)); sentinel_timer_->async_wait([this](std::error_code ec) { - if (ec) return; - uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); - - if(pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || - pathTable_.find(producerPathLabels_[1]) == pathTable_.end()){ - //we have no info, so we send again - - for(auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++){ - uint32_t pkt = it->first & modMask_; - if (inflightInterests_[pkt].sequence == it->first) { - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); - } + uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + + if (pathTable_.find(producerPathLabels_[0]) == pathTable_.end() || + pathTable_.find(producerPathLabels_[1]) == pathTable_.end()) { + // we have no info, so we send again + + for (auto it = packets_in_window_.begin(); it != packets_in_window_.end(); + it++) { + uint32_t pkt = it->first & modMask_; + if (inflightInterests_[pkt].sequence == it->first) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); } - }else{ - uint64_t max_waiting_time = //wait at least 50ms - (pathTable_[producerPathLabels_[1]]->getMinRtt() - - pathTable_[producerPathLabels_[0]]->getMinRtt()) + - (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); + } + } else { + uint64_t max_waiting_time = // wait at least 50ms + (pathTable_[producerPathLabels_[1]]->getMinRtt() - + pathTable_[producerPathLabels_[0]]->getMinRtt()) + + (ceil(pathTable_[producerPathLabels_[0]]->getInterArrivalGap()) * 50); - if((currentState_ == HICN_RTC_NORMAL_STATE) && + if ((currentState_ == HICN_RTC_NORMAL_STATE) && (inflightInterestsCount_ >= currentCWin_) && - ((now - lastEvent_) > max_waiting_time) && - (lossRate_ >= 0.05)){ + ((now - lastEvent_) > max_waiting_time) && (lossRate_ >= 0.05)) { + uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - uint64_t RTT = pathTable_[producerPathLabels_[1]]->getMinRtt(); - - for(auto it = packets_in_window_.begin(); - it != packets_in_window_.end(); it++){ + for (auto it = packets_in_window_.begin(); + it != packets_in_window_.end(); it++) { uint32_t pkt = it->first & modMask_; if (inflightInterests_[pkt].sequence == it->first && - ((now - inflightInterests_[pkt].transmissionTime) >= RTT)){ - inflightInterests_[pkt].transmissionTime = now; - Name *interest_name = nullptr; - socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, - &interest_name); - interest_name->setSuffix(it->first); - it->second++; - sendInterest(interest_name, true); + ((now - inflightInterests_[pkt].transmissionTime) >= RTT)) { + inflightInterests_[pkt].transmissionTime = now; + Name *interest_name = nullptr; + socket_->getSocketOption(GeneralTransportOptions::NETWORK_NAME, + &interest_name); + interest_name->setSuffix(it->first); + it->second++; + sendInterest(interest_name, true); } } } @@ -754,8 +751,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { // and over until we get at least a packet inflightInterestsCount_--; lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); scheduleNextInterests(); return; @@ -763,8 +760,8 @@ void RTCTransportProtocol::onTimeout(Interest::Ptr &&interest) { if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } @@ -890,30 +887,29 @@ void RTCTransportProtocol::onContentObject( return; } - //check if the packet is a rtx + // check if the packet is a rtx bool is_rtx = false; - if(interestRetransmissions_.find(segmentNumber) != - interestRetransmissions_.end()){ + if (interestRetransmissions_.find(segmentNumber) != + interestRetransmissions_.end()) { is_rtx = true; - }else{ + } else { auto it_win = packets_in_window_.find(segmentNumber); - if(it_win != packets_in_window_.end() && - it_win->second != 0) - is_rtx = true; + if (it_win != packets_in_window_.end() && it_win->second != 0) + is_rtx = true; } if (payload_size == HICN_NACK_HEADER_SIZE) { if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; } bool old_nack = false; - if (!is_rtx){ + if (!is_rtx) { // this is not a retransmitted packet old_nack = onNack(*content_object, false); updateDelayStats(*content_object); @@ -924,8 +920,8 @@ void RTCTransportProtocol::onContentObject( // the nacked_ state is used only to avoid to decrease // inflightInterestsCount_ multiple times. In fact, every time that we // receive an event related to an interest (timeout, nacked, content) we - // cange the state. In this way we are sure that we do not decrease twice the - // counter + // cange the state. In this way we are sure that we do not decrease twice + // the counter if (old_nack) { inflightInterests_[pkt].state = lost_; interestRetransmissions_.erase(segmentNumber); @@ -942,13 +938,13 @@ void RTCTransportProtocol::onContentObject( if (inflightInterests_[pkt].state == sent_) { lastEvent_ = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count(); + std::chrono::steady_clock::now().time_since_epoch()) + .count(); packets_in_window_.erase(segmentNumber); inflightInterestsCount_--; // packet sent without timeouts } - if (inflightInterests_[pkt].state == sent_ && !is_rtx){ + if (inflightInterests_[pkt].state == sent_ && !is_rtx) { // delay stats are computed only for non retransmitted data updateDelayStats(*content_object); } @@ -979,52 +975,6 @@ void RTCTransportProtocol::onContentObject( scheduleNextInterests(); } -void RTCTransportProtocol::returnContentToApplication( - const ContentObject &content_object) { - // return content to the user - auto read_buffer = content_object.getPayload(); - - read_buffer->trimStart(HICN_TIMESTAMP_SIZE); - - interface::ConsumerSocket::ReadCallback *read_callback = nullptr; - socket_->getSocketOption(READ_CALLBACK, &read_callback); - - if (read_callback == nullptr) { - throw errors::RuntimeException( - "The read callback must be installed in the transport before starting " - "the content retrieval."); - } - - if (read_callback->isBufferMovable()) { - read_callback->readBufferAvailable( - utils::MemBuf::copyBuffer(read_buffer->data(), read_buffer->length())); - } else { - // The buffer will be copied into the application-provided buffer - uint8_t *buffer; - std::size_t length; - std::size_t total_length = read_buffer->length(); - - while (read_buffer->length()) { - buffer = nullptr; - length = 0; - read_callback->getReadBuffer(&buffer, &length); - - if (!buffer || !length) { - throw errors::RuntimeException( - "Invalid buffer provided by the application."); - } - - auto to_copy = std::min(read_buffer->length(), length); - - std::memcpy(buffer, read_buffer->data(), to_copy); - read_buffer->trimStart(to_copy); - } - - read_callback->readDataAvailable(total_length); - read_buffer->clear(); - } -} - } // end namespace protocol } // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/rtc.h b/libtransport/src/hicn/transport/protocols/rtc.h index 2b9ed10a6..9e1731e96 100644 --- a/libtransport/src/hicn/transport/protocols/rtc.h +++ b/libtransport/src/hicn/transport/protocols/rtc.h @@ -15,12 +15,12 @@ #pragma once -#include <queue> #include <map> +#include <queue> #include <unordered_map> +#include <hicn/transport/protocols/datagram_reassembly.h> #include <hicn/transport/protocols/protocol.h> -#include <hicn/transport/protocols/reassembly.h> #include <hicn/transport/protocols/rtc_data_path.h> // algorithm state @@ -35,26 +35,27 @@ #define HICN_TIMESTAMP_SIZE 8 // bytes #define HICN_RTC_INTEREST_LIFETIME 1000 // ms -//rtt measurement -//normal interests for data goes from 0 to -//HICN_MIN_PROBE_SEQ, the rest is reserverd for -//probes +// rtt measurement +// normal interests for data goes from 0 to +// HICN_MIN_PROBE_SEQ, the rest is reserverd for +// probes #define HICN_MIN_PROBE_SEQ 0xefffffff #define HICN_MAX_PROBE_SEQ 0xffffffff // controller constant -#define HICN_ROUND_LEN 200 // ms interval of time on which - // we take decisions / measurements +#define HICN_ROUND_LEN \ + 200 // ms interval of time on which + // we take decisions / measurements #define HICN_MAX_RTX 10 #define HICN_MAX_RTX_SIZE 1024 #define HICN_MAX_RTX_MAX_AGE 10000 -#define HICN_MIN_RTT_WIN 30 // rounds -#define HICN_MIN_INTER_ARRIVAL_GAP 100 //ms +#define HICN_MIN_RTT_WIN 30 // rounds +#define HICN_MIN_INTER_ARRIVAL_GAP 100 // ms // cwin #define HICN_INITIAL_CWIN 1 // packets #define HICN_INITIAL_CWIN_MAX 100000 // packets -#define HICN_MIN_CWIN 10 // packets +#define HICN_MIN_CWIN 10 // packets #define HICN_WIN_INCREASE_FACTOR 1.5 #define HICN_WIN_DECREASE_FACTOR 0.9 @@ -70,30 +71,23 @@ #define HICN_MICRO_IN_A_SEC 1000000 #define HICN_MILLI_IN_A_SEC 1000 - namespace transport { namespace protocol { -enum packetState { - sent_, - nacked_, - received_, - timeout1_, - timeout2_, - lost_ -}; +enum packetState { sent_, nacked_, received_, timeout1_, timeout2_, lost_ }; typedef enum packetState packetState_t; struct sentInterest { uint64_t transmissionTime; - uint32_t sequence; //sequence number of the interest sent - //to handle seq % buffer_size - packetState_t state; //see packet state + uint32_t sequence; // sequence number of the interest sent + // to handle seq % buffer_size + packetState_t state; // see packet state }; -class RTCTransportProtocol : public TransportProtocol, public Reassembly { +class RTCTransportProtocol : public TransportProtocol, + public DatagramReassembly { public: RTCTransportProtocol(interface::ConsumerSocket *icnet_socket); @@ -133,11 +127,16 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { bool onNack(const ContentObject &content_object, bool rtx); void onContentObject(Interest::Ptr &&interest, ContentObject::Ptr &&content_object) override; - void returnContentToApplication(const ContentObject &content_object); + void onPacketDropped(Interest::Ptr &&interest, + ContentObject::Ptr &&content_object) override {} + void onReassemblyFailed(std::uint32_t missing_segment) override {} TRANSPORT_ALWAYS_INLINE virtual void reassemble( ContentObject::Ptr &&content_object) override { - returnContentToApplication(*content_object); + auto read_buffer = content_object->getPayload(); + read_buffer->trimStart(HICN_TIMESTAMP_SIZE); + Reassembly::read_buffer_ = std::move(read_buffer); + Reassembly::notifyApplication(); } // controller var @@ -151,36 +150,36 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { // names/packets var uint32_t actualSegment_; uint32_t inflightInterestsCount_; - //map seq to rtx + // map seq to rtx std::map<uint32_t, uint8_t> interestRetransmissions_; bool rtx_timer_used_; std::unique_ptr<asio::steady_timer> rtx_timer_; std::vector<sentInterest> inflightInterests_; - uint32_t lastSegNacked_; //indicates the segment id in the last received - // past Nack. we do not ask for retransmissions - //for samething that is older than this value. - uint32_t lastReceived_; //segment of the last content object received - //indicates the base of the window on the client - uint64_t lastReceivedTime_; //time at which we recevied the - //lastReceived_ packet - - //sentinel - //if all packets in the window get lost we need something that - //wakes up our consumer socket. Interest timeouts set to 1 sec - //expire too late. This timers expire much sooner and if it - //detects that all the interest in the window may be lost - //it sends all of them again + uint32_t lastSegNacked_; // indicates the segment id in the last received + // past Nack. we do not ask for retransmissions + // for samething that is older than this value. + uint32_t lastReceived_; // segment of the last content object received + // indicates the base of the window on the client + uint64_t lastReceivedTime_; // time at which we recevied the + // lastReceived_ packet + + // sentinel + // if all packets in the window get lost we need something that + // wakes up our consumer socket. Interest timeouts set to 1 sec + // expire too late. This timers expire much sooner and if it + // detects that all the interest in the window may be lost + // it sends all of them again std::unique_ptr<asio::steady_timer> sentinel_timer_; - uint64_t lastEvent_; //time at which we removed a pending - //interest from the window + uint64_t lastEvent_; // time at which we removed a pending + // interest from the window std::unordered_map<uint32_t, uint8_t> packets_in_window_; - //rtt probes - //the RTC transport tends to overestimate the RTT - //du to the production time on the server side - //once per second we send an interest for wich we know - //we will get a nack. This nack will keep our estimation - //close to the reality + // rtt probes + // the RTC transport tends to overestimate the RTT + // du to the production time on the server side + // once per second we send an interest for wich we know + // we will get a nack. This nack will keep our estimation + // close to the reality std::unique_ptr<asio::steady_timer> probe_timer_; uint64_t time_sent_probe_; uint32_t probe_seq_number_; @@ -203,10 +202,10 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { uint32_t rounds_; uint32_t roundsWithoutNacks_; - //we keep track of up two paths (if only one path is in use - //the two values in the vector will be the same) - //position 0 stores the path with minRTT - //position 1 stores the path with maxRTT + // we keep track of up two paths (if only one path is in use + // the two values in the vector will be the same) + // position 0 stores the path with minRTT + // position 1 stores the path with maxRTT uint32_t producerPathLabels_[2]; std::unordered_map<uint32_t, std::shared_ptr<RTCDataPath>> pathTable_; @@ -219,7 +218,6 @@ class RTCTransportProtocol : public TransportProtocol, public Reassembly { unsigned protocolState_; bool initied; - TransportStatistics stats_; }; } // namespace protocol diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.cc b/libtransport/src/hicn/transport/protocols/verification_manager.cc new file mode 100644 index 000000000..f45cab743 --- /dev/null +++ b/libtransport/src/hicn/transport/protocols/verification_manager.cc @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017-2019 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <hicn/transport/protocols/verification_manager.h> + +#include <hicn/transport/interfaces/socket_consumer.h> + +namespace transport { + +namespace protocol { + +interface::VerificationPolicy SignatureVerificationManager::onPacketToVerify( + const Packet& packet) { + using namespace interface; + + bool verify_signature; + VerificationPolicy ret = VerificationPolicy::DROP_PACKET; + + ConsumerContentObjectVerificationFailedCallback* + verification_failed_callback = VOID_HANDLER; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, + verify_signature); + + if (!verify_signature) { + return VerificationPolicy::ACCEPT_PACKET; + } + + icn_socket_->getSocketOption(ConsumerCallbacksOptions::VERIFICATION_FAILED, + &verification_failed_callback); + if (!verification_failed_callback) { + throw errors::RuntimeException( + "No verification failed callback provided by application. " + "Aborting."); + } + + std::shared_ptr<utils::Verifier> verifier; + icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); + + if (TRANSPORT_EXPECT_FALSE(!verifier)) { + ret = (*verification_failed_callback)( + *icn_socket_, dynamic_cast<const ContentObject&>(packet), + make_error_code(protocol_error::no_verifier_provided)); + return ret; + } + + if (!verifier->verify(packet)) { + ret = (*verification_failed_callback)( + *icn_socket_, dynamic_cast<const ContentObject&>(packet), + make_error_code(protocol_error::signature_verification_failed)); + } else { + ret = VerificationPolicy::ACCEPT_PACKET; + } + + return ret; +} + +} // end namespace protocol + +} // end namespace transport diff --git a/libtransport/src/hicn/transport/protocols/verification_manager.h b/libtransport/src/hicn/transport/protocols/verification_manager.h index da67e86f8..6e5d32127 100644 --- a/libtransport/src/hicn/transport/protocols/verification_manager.h +++ b/libtransport/src/hicn/transport/protocols/verification_manager.h @@ -15,56 +15,37 @@ #pragma once -#include <hicn/transport/interfaces/socket_consumer.h> - -#include <deque> +#include <hicn/transport/interfaces/callbacks.h> +#include <hicn/transport/interfaces/verification_policy.h> +#include <hicn/transport/protocols/errors.h> namespace transport { +namespace interface { +class ConsumerSocket; +} + namespace protocol { +using Packet = core::Packet; +using interface::ConsumerSocket; +using interface::VerificationPolicy; + class VerificationManager { public: virtual ~VerificationManager() = default; - virtual bool onPacketToVerify(const Packet& packet) = 0; + virtual VerificationPolicy onPacketToVerify(const Packet& packet) = 0; }; class SignatureVerificationManager : public VerificationManager { public: - SignatureVerificationManager(interface::ConsumerSocket* icn_socket) + SignatureVerificationManager(ConsumerSocket* icn_socket) : icn_socket_(icn_socket) {} - TRANSPORT_ALWAYS_INLINE bool onPacketToVerify(const Packet& packet) override { - using namespace interface; - - bool verify_signature, ret = false; - icn_socket_->getSocketOption(GeneralTransportOptions::VERIFY_SIGNATURE, - verify_signature); - - if (!verify_signature) { - return true; - } - - std::shared_ptr<utils::Verifier> verifier; - icn_socket_->getSocketOption(GeneralTransportOptions::VERIFIER, verifier); - - if (TRANSPORT_EXPECT_FALSE(!verifier)) { - throw errors::RuntimeException( - "No certificate provided by the application."); - } - - ret = verifier->verify(packet); - - if (!ret) { - throw errors::RuntimeException( - "Verification failure policy has to be implemented."); - } - - return ret; - } + interface::VerificationPolicy onPacketToVerify(const Packet& packet) override; private: - interface::ConsumerSocket* icn_socket_; + ConsumerSocket* icn_socket_; }; } // end namespace protocol diff --git a/libtransport/src/hicn/transport/utils/CMakeLists.txt b/libtransport/src/hicn/transport/utils/CMakeLists.txt index cbbca86ed..5a7dbe9cc 100644 --- a/libtransport/src/hicn/transport/utils/CMakeLists.txt +++ b/libtransport/src/hicn/transport/utils/CMakeLists.txt @@ -19,7 +19,6 @@ list(APPEND SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/signer.cc ${CMAKE_CURRENT_SOURCE_DIR}/verifier.cc ${CMAKE_CURRENT_SOURCE_DIR}/identity.cc - ${CMAKE_CURRENT_SOURCE_DIR}/suffix_strategy.cc ${CMAKE_CURRENT_SOURCE_DIR}/log.cc ${CMAKE_CURRENT_SOURCE_DIR}/membuf.cc ${CMAKE_CURRENT_SOURCE_DIR}/content_store.cc diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.cc b/libtransport/src/hicn/transport/utils/suffix_strategy.cc deleted file mode 100644 index f3bcc4562..000000000 --- a/libtransport/src/hicn/transport/utils/suffix_strategy.cc +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (c) 2017-2019 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <hicn/transport/utils/suffix_strategy.h> - -using transport::core::NextSegmentCalculationStrategy; - -namespace utils { -std::uint32_t SuffixManifest::getNextSuffix() { - uint32_t next_suffix; - - switch (suffix_stragegy_) { - case NextSegmentCalculationStrategy::INCREMENTAL: - if (!nb_segments_) { - throw errors::RuntimeException( - "The number of segments in a manifest must be set " - "before assigning incremental suffixes."); - } - /* The current manifest's suffix + the number of segments in a */ - /* manifest give the suffix of the last segment in the manifest. */ - /* The next manifest's suffix is therefore that number plus one. */ - next_suffix = suffix_ + nb_segments_ + 1; - break; - - default: - throw errors::RuntimeException("Unknown suffix strategy."); - } - - return next_suffix; -} - -std::uint32_t SuffixContent::getNextSuffix() { - uint32_t next_suffix; - - switch (suffix_stragegy_) { - case NextSegmentCalculationStrategy::INCREMENTAL: - next_suffix = suffix_ + 1; - if (making_manifest_) { - if (!nb_segments_) { - throw errors::RuntimeException( - "The number of segments in a manifest must be set " - "before assigning incremental suffixes."); - } - - content_counter_++; - /* If the counter have reached the manifest's capacity, - * it means that the next suffix will be a manifest, so we skip it. */ - if (content_counter_ % nb_segments_ == 0) { - next_suffix++; - content_counter_ = 0; - } - } - break; - - default: - throw errors::RuntimeException("Unknown suffix strategy."); - } - - return next_suffix; -} -} // namespace utils diff --git a/libtransport/src/hicn/transport/utils/suffix_strategy.h b/libtransport/src/hicn/transport/utils/suffix_strategy.h index 3014855f6..0ed3c5b0e 100644 --- a/libtransport/src/hicn/transport/utils/suffix_strategy.h +++ b/libtransport/src/hicn/transport/utils/suffix_strategy.h @@ -18,111 +18,148 @@ #include <hicn/transport/core/manifest_format.h> namespace utils { + +using transport::core::NextSegmentCalculationStrategy; + class SuffixStrategy { public: - SuffixStrategy( - transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset) - : suffix_stragegy_(suffix_stragegy), - suffix_(start_offset), - nb_segments_(0) {} - - transport::core::NextSegmentCalculationStrategy getSuffixStrategy() { - return suffix_stragegy_; - } + static constexpr uint32_t INVALID_SUFFIX = + std::numeric_limits<uint32_t>::max(); + + SuffixStrategy(NextSegmentCalculationStrategy strategy) + : suffix_stragegy_(strategy), + total_count_(0), + final_suffix_(INVALID_SUFFIX) {} + + virtual ~SuffixStrategy() = default; + + virtual uint32_t getNextSuffix() = 0; + + virtual uint32_t getFinalSuffix() { return final_suffix_; } - void setSuffixStrategy( - transport::core::NextSegmentCalculationStrategy strategy) { - suffix_stragegy_ = strategy; + virtual void setFinalSuffix(std::uint32_t final_suffix) { + if (final_suffix != INVALID_SUFFIX) { + final_suffix_ = final_suffix; + } } - std::uint32_t getSuffix() { return suffix_; } + virtual uint32_t getNextManifestSuffix() = 0; - void updateSuffix(std::uint32_t new_suffix) { suffix_ = new_suffix; } + virtual uint32_t getNextContentSuffix() = 0; - std::size_t getNbSegments() { return nb_segments_; } + virtual void reset(uint32_t offset = 0) = 0; - void setNbSegments(std::size_t nb_segments) { nb_segments_ = nb_segments; } + virtual uint32_t getManifestCapacity() = 0; - void reset(std::uint32_t reset_suffix) { - suffix_ = reset_suffix; - nb_segments_ = 0; + virtual void setManifestCapacity(uint32_t capacity) = 0; + + virtual uint32_t getTotalCount() { return total_count_; }; + + NextSegmentCalculationStrategy getSuffixStrategy() { + return suffix_stragegy_; } - ~SuffixStrategy() {} + protected: + inline void incrementTotalCount() { total_count_++; }; protected: - transport::core::NextSegmentCalculationStrategy suffix_stragegy_; - std::uint32_t suffix_; - std::size_t nb_segments_; - virtual std::uint32_t getNextSuffix() = 0; + NextSegmentCalculationStrategy suffix_stragegy_; + std::uint32_t total_count_; + std::uint32_t final_suffix_; }; -class SuffixManifest : public SuffixStrategy { +class IncrementalSuffixStrategy : public SuffixStrategy { public: - SuffixManifest( - transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset) - : SuffixStrategy(suffix_stragegy, start_offset) {} - - SuffixManifest operator++() { - updateSuffix(getNextSuffix()); - SuffixManifest temp_suffix(suffix_stragegy_, suffix_); - temp_suffix.setNbSegments(getNbSegments()); - return temp_suffix; + IncrementalSuffixStrategy(std::uint32_t start_offset) + : SuffixStrategy(NextSegmentCalculationStrategy::INCREMENTAL), + next_suffix_(start_offset) {} + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextSuffix() override { + incrementTotalCount(); + return next_suffix_++; + } + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextContentSuffix() override { + return getNextSuffix(); } - SuffixManifest operator++(int) { - SuffixManifest temp_suffix(suffix_stragegy_, suffix_); - temp_suffix.setNbSegments(getNbSegments()); - updateSuffix(getNextSuffix()); - return temp_suffix; + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextManifestSuffix() override { + return getNextSuffix(); } + uint32_t getManifestCapacity() override { + throw errors::RuntimeException( + "No manifest capacity in IncrementalSuffixStrategy."); + } + + void setManifestCapacity(uint32_t capacity) override { + throw errors::RuntimeException( + "No manifest capacity in IncrementalSuffixStrategy."); + } + + void reset(std::uint32_t offset = 0) override { next_suffix_ = offset; } + protected: - std::uint32_t getNextSuffix(); + std::uint32_t next_suffix_; }; -class SuffixContent : public SuffixStrategy { +class CapacityBasedSuffixStrategy : public SuffixStrategy { public: - SuffixContent(transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset, bool making_manifest) - : SuffixStrategy(suffix_stragegy, start_offset), - making_manifest_(making_manifest), - content_counter_(0) {} - - SuffixContent(transport::core::NextSegmentCalculationStrategy suffix_stragegy, - std::uint32_t start_offset) - : SuffixContent(suffix_stragegy, start_offset, false) {} - - SuffixContent operator++() { - updateSuffix(getNextSuffix()); - SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_); - temp_suffix.setNbSegments(getNbSegments()); - temp_suffix.content_counter_ = content_counter_; - return temp_suffix; + CapacityBasedSuffixStrategy(std::uint32_t start_offset, + std::uint32_t manifest_capacity) + : SuffixStrategy(NextSegmentCalculationStrategy::INCREMENTAL), + next_suffix_(start_offset), + segments_in_manifest_(manifest_capacity), + current_manifest_iteration_(0) {} + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextSuffix() override { + incrementTotalCount(); + return next_suffix_++; + } + + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextContentSuffix() override { + incrementTotalCount(); + return next_suffix_ % segments_in_manifest_ == 0 ? next_suffix_++ + : ++next_suffix_; } - SuffixContent operator++(int) { - SuffixContent temp_suffix(suffix_stragegy_, suffix_, making_manifest_); - temp_suffix.setNbSegments(getNbSegments()); - temp_suffix.content_counter_ = content_counter_; - updateSuffix(getNextSuffix()); - return temp_suffix; + TRANSPORT_ALWAYS_INLINE std::uint32_t getNextManifestSuffix() override { + incrementTotalCount(); + return (current_manifest_iteration_++) * (segments_in_manifest_ + 1); } - void setUsingManifest(bool value) { making_manifest_ = value; } + TRANSPORT_ALWAYS_INLINE uint32_t getManifestCapacity() override { + return segments_in_manifest_; + } - void reset(std::uint32_t reset_suffix) { - SuffixStrategy::reset(reset_suffix); - content_counter_ = 0; + TRANSPORT_ALWAYS_INLINE void setManifestCapacity(uint32_t capacity) override { + segments_in_manifest_ = capacity; } + void reset(std::uint32_t offset = 0) override { next_suffix_ = offset; } + protected: - bool making_manifest_; - /* content_counter_ keeps track of the number of segments */ - /* between two manifests */ - uint32_t content_counter_; - std::uint32_t getNextSuffix(); + std::uint32_t next_suffix_; + std::uint32_t segments_in_manifest_; + std::uint32_t current_manifest_iteration_; }; + +class SuffixStrategyFactory { + public: + static std::unique_ptr<SuffixStrategy> getSuffixStrategy( + NextSegmentCalculationStrategy strategy, uint32_t start_offset, + uint32_t manifest_capacity = 0) { + switch (strategy) { + case NextSegmentCalculationStrategy::INCREMENTAL: + return std::make_unique<IncrementalSuffixStrategy>(start_offset); + case NextSegmentCalculationStrategy::MANIFEST_CAPACITY_BASED: + return std::make_unique<CapacityBasedSuffixStrategy>(start_offset, + manifest_capacity); + default: + throw errors::RuntimeException( + "No valid NextSegmentCalculationStrategy specified."); + } + } +}; + } // namespace utils |