diff options
author | 2017-02-22 14:37:37 +0100 | |
---|---|---|
committer | 2017-02-22 13:46:08 +0000 | |
commit | ba8541cad3a4069886444abbd1848b6ef3fff72c (patch) | |
tree | 39226dd9b036ca7e513c2cccd8e71e15e09b86bc /apps/consumers | |
parent | 9b30fc10fb1cbebe651e5a107e8ca5b24de54675 (diff) |
Initial Commit: libicnet
Change-Id: I10a72cb0d84b76553a85c168416b847f6a4ff5f6
Signed-off-by: Mauro Sardara <msardara+fdio@cisco.com>
Diffstat (limited to 'apps/consumers')
-rwxr-xr-x | apps/consumers/CMakeLists.txt | 30 | ||||
-rwxr-xr-x | apps/consumers/icnet_consumer_dash.cc | 217 | ||||
-rwxr-xr-x | apps/consumers/icnet_consumer_test.cc | 208 | ||||
-rwxr-xr-x | apps/consumers/icnet_iget.cc | 159 |
4 files changed, 614 insertions, 0 deletions
diff --git a/apps/consumers/CMakeLists.txt b/apps/consumers/CMakeLists.txt new file mode 100755 index 00000000..de9c254f --- /dev/null +++ b/apps/consumers/CMakeLists.txt @@ -0,0 +1,30 @@ +# Copyright (c) 2017 Cisco and/or its affiliates. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.4) + +set(CONSUMER_SOURCE_FILES icnet_consumer_test.cc) +set(IGET_SOURCE_FILES icnet_iget.cc) +set(CONSUMERDASH_SOURCE_FILES icnet_consumer_dash.cc) + +add_executable(consumer-test ${CONSUMER_SOURCE_FILES}) +add_executable(iget ${IGET_SOURCE_FILES}) +add_executable(consumer-dash ${CONSUMERDASH_SOURCE_FILES}) + +target_link_libraries(consumer-test icnet ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) +target_link_libraries(iget icnet ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) +target_link_libraries(consumer-dash icnet ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) + +install(TARGETS consumer-test DESTINATION ${CMAKE_INSTALL_PREFIX}/bin) +install(TARGETS iget DESTINATION ${CMAKE_INSTALL_PREFIX}/bin) +install(TARGETS consumer-dash DESTINATION ${CMAKE_INSTALL_PREFIX}/bin) diff --git a/apps/consumers/icnet_consumer_dash.cc b/apps/consumers/icnet_consumer_dash.cc new file mode 100755 index 00000000..4ec4a2c9 --- /dev/null +++ b/apps/consumers/icnet_consumer_dash.cc @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_socket_consumer.h" + +#define DEFAULT_BETA 0.99 +#define DEFAULT_GAMMA 0.07 + +namespace icnet { + +class CallbackContainer { + public: + CallbackContainer() + : work_(new boost::asio::io_service::work(io_service_)), + handler_(std::async(std::launch::async, [this]() { io_service_.run(); })) { + seen_manifest_segments_ = 0; + seen_data_segments_ = 0; + byte_counter_ = 0; + } + + ~CallbackContainer() { + work_.reset(); + } + + void processPayload(ConsumerSocket &c, const uint8_t *buffer, size_t bufferSize) { + std::cout << "Content retrieved!! Size: " << bufferSize << std::endl; + } + + bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { + if (contentObject.getContentType() == PayloadType::DATA) { + std::cout << "VERIFY CONTENT" << std::endl; + } else if (contentObject.getContentType() == PayloadType::MANIFEST) { + std::cout << "VERIFY MANIFEST" << std::endl; + } + + return true; + } + + void processLeavingInterest(ConsumerSocket &c, const Interest &interest) { + // std::cout << "LEAVES " << interest.getName().toUri() << std::endl; + } + + private: + int seen_manifest_segments_; + int seen_data_segments_; + int byte_counter_; + boost::asio::io_service io_service_; + std::shared_ptr<boost::asio::io_service::work> work_; + std::future<void> handler_; +}; + +class Verificator { + public: + Verificator() { + }; + + ~Verificator() { + } + + bool onPacket(ConsumerSocket &c, const ContentObject &contentObject) { + return true; + } + +}; + +void becomeDaemon() { + pid_t process_id = 0; + pid_t sid = 0; + + // Create child process + process_id = fork(); + + // Indication of fork() failure + if (process_id < 0) { + printf("fork failed!\n"); + // Return failure in exit status + exit(EXIT_FAILURE); + } + + // PARENT PROCESS. Need to kill it. + if (process_id > 0) { + printf("process_id of child process %d \n", process_id); + // return success in exit status + exit(EXIT_SUCCESS); + } + + //unmask the file mode + umask(0); + + //set new session + sid = setsid(); + if (sid < 0) { + // Return failure + exit(EXIT_FAILURE); + } + + // Change the current working directory to root. + chdir("/"); + + // Close stdin. stdout and stderr + close(STDIN_FILENO); + close(STDOUT_FILENO); + close(STDERR_FILENO); + + // Really start application +} + +int main(int argc, char **argv) { + double beta = DEFAULT_BETA; + double drop_factor = DEFAULT_GAMMA; + bool daemon = false; + bool rtt_stats = false; + int n_segment = 427; + bool looping = false; + + int opt; + while ((opt = getopt(argc, argv, "b:d:DRn:l")) != -1) { + switch (opt) { + case 'b': + beta = std::stod(optarg); + break; + case 'd': + drop_factor = std::stod(optarg); + break; + case 'D': + daemon = true; + break; + case 'R': + rtt_stats = true; + break; + case 'n': + n_segment = std::stoi(optarg); + break; + case 'l': + looping = true; + break; + default: + exit(EXIT_FAILURE); + } + } + + std::string name = "ccnx:/webserver/get/sintel/18000"; + + if (argv[optind] == 0) { + std::cerr << "Using default name ccnx:/webserver/sintel/18000" << std::endl; + } else { + name = argv[optind]; + } + + if (daemon) { + becomeDaemon(); + } + + ConsumerSocket c(Name(name.c_str()), TransportProtocolAlgorithms::RAAQM); + + CallbackContainer stubs; + Verificator verificator; + + c.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, 1001); + c.setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + c.setSocketOption(RaaqmTransportOptions::DROP_FACTOR, drop_factor); + c.setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, 10); + c.setSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, true); + c.setSocketOption(RaaqmTransportOptions::RTT_STATS, rtt_stats); + + c.setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, + (ConsumerContentObjectVerificationCallback) std::bind(&Verificator::onPacket, + &verificator, + std::placeholders::_1, + std::placeholders::_2)); + + c.setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback) std::bind(&CallbackContainer::processPayload, + &stubs, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3)); + + c.setSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback) std::bind(&CallbackContainer::processLeavingInterest, + &stubs, + std::placeholders::_1, + std::placeholders::_2)); + + do { + std::stringstream ss; + for (int i = 1; i < n_segment; i++) { + ss << "ccnx:/seg_" << i << ".m4s"; + auto str = ss.str(); + c.consume(Name(str)); + ss.str(""); + } + } while (looping); + + c.stop(); + + return 0; + +} + +} // end namespace icnet + +int main(int argc, char **argv) { + return icnet::main(argc, argv); +} diff --git a/apps/consumers/icnet_consumer_test.cc b/apps/consumers/icnet_consumer_test.cc new file mode 100755 index 00000000..d5b57d6f --- /dev/null +++ b/apps/consumers/icnet_consumer_test.cc @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_socket_consumer.h" + +#define DEFAULT_BETA 0.99 +#define DEFAULT_GAMMA 0.07 + +namespace icnet { + +class CallbackContainer { + public: + CallbackContainer() + : work_(new boost::asio::io_service::work(io_service_)), + handler_(std::async(std::launch::async, [this]() { io_service_.run(); })) { + seen_manifest_segments_ = 0; + seen_data_segments_ = 0; + byte_counter_ = 0; + } + + ~CallbackContainer() { + work_.reset(); + } + + void processPayload(ConsumerSocket &c, const uint8_t *buffer, size_t bufferSize) { + std::cout << "Content retrieved!! Size: " << bufferSize << std::endl; + + io_service_.dispatch([buffer, bufferSize]() { + std::ofstream file("ciao.txt", std::ofstream::binary); + file.write((char *) buffer, bufferSize); + file.close(); + }); + } + + bool verifyData(ConsumerSocket &c, const ContentObject &contentObject) { + if (contentObject.getContentType() == PayloadType::DATA) { + std::cout << "VERIFY CONTENT" << std::endl; + } else if (contentObject.getContentType() == PayloadType::MANIFEST) { + std::cout << "VERIFY MANIFEST" << std::endl; + } + + return true; + } + + void processLeavingInterest(ConsumerSocket &c, const Interest &interest) { + // std::cout << "LEAVES " << interest.getName().toUri() << std::endl; + } + + private: + int seen_manifest_segments_; + int seen_data_segments_; + int byte_counter_; + boost::asio::io_service io_service_; + std::shared_ptr<boost::asio::io_service::work> work_; + std::future<void> handler_; +}; + +class Verificator { + public: + Verificator() { + }; + + ~Verificator() { + // m_keyChain.deleteIdentity(Name(IDENTITY_NAME)); + } + + bool onPacket(ConsumerSocket &c, const ContentObject &contentObject) { + + return true; + } +}; + +void becomeDaemon() { + pid_t process_id = 0; + pid_t sid = 0; + + // Create child process + process_id = fork(); + + // Indication of fork() failure + if (process_id < 0) { + printf("fork failed!\n"); + // Return failure in exit status + exit(EXIT_FAILURE); + } + + // PARENT PROCESS. Need to kill it. + if (process_id > 0) { + printf("process_id of child process %d \n", process_id); + // return success in exit status + exit(EXIT_SUCCESS); + } + + //unmask the file mode + umask(0); + + //set new session + sid = setsid(); + if (sid < 0) { + // Return failure + exit(EXIT_FAILURE); + } + + // Change the current working directory to root. + chdir("/"); + + // Close stdin. stdout and stderr + close(STDIN_FILENO); + close(STDOUT_FILENO); + close(STDERR_FILENO); + + // Really start application +} + +int main(int argc, char *argv[]) { + double beta = DEFAULT_BETA; + double dropFactor = DEFAULT_GAMMA; + bool daemon = false; + bool rttStats = false; + + int opt; + while ((opt = getopt(argc, argv, "b:d:DR")) != -1) { + switch (opt) { + case 'b': + beta = std::stod(optarg); + break; + case 'd': + dropFactor = std::stod(optarg); + break; + case 'D': + daemon = true; + break; + case 'R': + rttStats = true; + break; + default: + exit(EXIT_FAILURE); + } + } + + std::string name = "ccnx:/ccnxtest"; + + if (argv[optind] == 0) { + std::cerr << "Using default name ccnx:/ccnxtest" << std::endl; + } else { + name = argv[optind]; + } + + if (daemon) { + becomeDaemon(); + } + + ConsumerSocket c(Name(name.c_str()), TransportProtocolAlgorithms::RAAQM); + + CallbackContainer stubs; + Verificator verificator; + + c.setSocketOption(GeneralTransportOptions::INTEREST_LIFETIME, 1001); + c.setSocketOption(RaaqmTransportOptions::BETA_VALUE, beta); + c.setSocketOption(RaaqmTransportOptions::DROP_FACTOR, dropFactor); + c.setSocketOption(GeneralTransportOptions::MAX_INTEREST_RETX, 200); + c.setSocketOption(OtherOptions::VIRTUAL_DOWNLOAD, true); + c.setSocketOption(RaaqmTransportOptions::RTT_STATS, rttStats); + + c.setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, + (ConsumerContentObjectVerificationCallback) std::bind(&Verificator::onPacket, + &verificator, + std::placeholders::_1, + std::placeholders::_2)); + + c.setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback) std::bind(&CallbackContainer::processPayload, + &stubs, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3)); + + c.setSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback) std::bind(&CallbackContainer::processLeavingInterest, + &stubs, + std::placeholders::_1, + std::placeholders::_2)); + + c.consume(Name()); + + c.stop(); + + return 0; + +} + +} // end namespace icnet + +int main(int argc, char *argv[]) { + return icnet::main(argc, argv); +} diff --git a/apps/consumers/icnet_iget.cc b/apps/consumers/icnet_iget.cc new file mode 100755 index 00000000..db5ef173 --- /dev/null +++ b/apps/consumers/icnet_iget.cc @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "icnet_socket_consumer.h" + +typedef std::chrono::time_point<std::chrono::system_clock> Time; +typedef std::chrono::milliseconds TimeDuration; + +Time t1 = std::chrono::system_clock::now(); + +#define DEFAULT_BETA 0.99 +#define DEFAULT_GAMMA 0.07 + +namespace icnet { + +class CallbackContainer { + public: + CallbackContainer() + : work_(new boost::asio::io_service::work(io_service_)), + handler_(std::async(std::launch::async, [this]() { io_service_.run(); })) { + seen_manifest_segments_ = 0; + seen_data_segments_ = 0; + byte_counter_ = 0; + } + + ~CallbackContainer() { + work_.reset(); + } + + void processPayload(ConsumerSocket &c, const uint8_t *buffer, size_t buffer_size) { + Name m_name; + c.getSocketOption(GeneralTransportOptions::NAME_PREFIX, m_name); + std::string filename = m_name.toString().substr(1 + m_name.toString().find_last_of("/")); + io_service_.dispatch([buffer, buffer_size, filename]() { + std::cout << "Saving to: " << filename << " " << buffer_size / 1024 << "kB" << std::endl; + Time t3 = std::chrono::system_clock::now();; + std::ofstream file(filename.c_str(), std::ofstream::binary); + file.write((char *) buffer, buffer_size); + file.close(); + Time t2 = std::chrono::system_clock::now();; + TimeDuration dt = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1); + TimeDuration dt3 = std::chrono::duration_cast<std::chrono::milliseconds>(t3 - t1); + long msec = dt.count(); + long msec3 = dt3.count(); + std::cout << "Elapsed Time: " << msec / 1000.0 << " seconds -- " << buffer_size * 8 / msec / 1000.0 + << "[Mbps] -- " << buffer_size * 8 / msec3 / 1000.0 << "[Mbps]" << std::endl; + }); + } + + bool verifyData(ConsumerSocket &c, const ContentObject &content_object) { + if (content_object.getContentType() == PayloadType::DATA) { + std::cout << "VERIFY CONTENT" << std::endl; + } else if (content_object.getContentType() == PayloadType::MANIFEST) { + std::cout << "VERIFY MANIFEST" << std::endl; + } + + return true; + } + + void processLeavingInterest(ConsumerSocket &c, const Interest &interest) { + // std::cout << "OUTPUT: " << interest.getName() << std::endl; + } + + private: + int seen_manifest_segments_; + int seen_data_segments_; + int byte_counter_; + boost::asio::io_service io_service_; + std::shared_ptr<boost::asio::io_service::work> work_; + std::future<void> handler_; +}; + +/* + * The client signature verification is currently being reworked with the new API. + * The implementation is disabled for the moment. + */ + +class Verificator { + public: + Verificator() { + }; + + ~Verificator() { + // m_keyChain.deleteIdentity(Name(IDENTITY_NAME)); + } + + bool onPacket(ConsumerSocket &c, const ContentObject &contentObject) { + return true; + } +}; + +int main(int argc, char **argv) { + + std::string url = ""; + std::string locator = ""; + std::string path = ""; + std::string name = "ccnx:/locator/get/path"; + size_t found = 0; + size_t path_begin = 0; + + if (argv[optind] == 0) { + std::cerr << "Missing URL" << std::endl; + return 0; + } else { + url = argv[optind]; + std::cout << "Iget " << url << std::endl; + } + + found = url.find("//"); + path_begin = url.find('/', found + 2); + locator = url.substr(found + 2, path_begin - (found + 2)); + path = url.substr(path_begin, std::string::npos); + std::cout << "locator " << locator << std::endl; + std::cout << "path " << path << std::endl; + name = "ccnx:/" + locator + "/get" + path; + std::cout << "Iget ccnx name: " << name << std::endl; + + ConsumerSocket c(Name(name.c_str()), TransportProtocolAlgorithms::RAAQM); + CallbackContainer stubs; + Verificator verificator; + + c.setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, + (ConsumerContentObjectVerificationCallback) std::bind(&Verificator::onPacket, + &verificator, + std::placeholders::_1, + std::placeholders::_2)); + c.setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, + (ConsumerContentCallback) std::bind(&CallbackContainer::processPayload, + &stubs, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3)); + c.setSocketOption(ConsumerCallbacksOptions::INTEREST_OUTPUT, + (ConsumerInterestCallback) std::bind(&CallbackContainer::processLeavingInterest, + &stubs, + std::placeholders::_1, + std::placeholders::_2)); + c.consume(Name()); + c.stop(); + return 0; +} + +} // end namespace icnet + +int main(int argc, char **argv) { + return icnet::main(argc, argv); +} |