diff options
Diffstat (limited to 'Input/ICNConnectionConsumerApi.cpp')
-rw-r--r-- | Input/ICNConnectionConsumerApi.cpp | 283 |
1 files changed, 283 insertions, 0 deletions
diff --git a/Input/ICNConnectionConsumerApi.cpp b/Input/ICNConnectionConsumerApi.cpp new file mode 100644 index 00000000..9de82908 --- /dev/null +++ b/Input/ICNConnectionConsumerApi.cpp @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2017 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ICNICPDOWNLOAD + +#include "ICNConnectionConsumerApi.h" + +#define DEFAULT_LIFETIME 250 +#define RETRY_TIMEOUTS 5 + +using namespace dash; +using namespace dash::network; +using namespace dash::metrics; +using namespace icnet; + +using std::bind; +using std::placeholders::_1; +using std::placeholders::_2; +using std::placeholders::_3; + +using duration_in_seconds = std::chrono::duration<double, std::ratio<1, 1> >; + + +namespace libdash { +namespace framework { +namespace input { +ICNConnectionConsumerApi::ICNConnectionConsumerApi(double alpha, float beta, float drop) : + m_recv_name(ccnx::Name()), + m_first(1), + m_isFinished(false), + sizeDownloaded (0), + cumulativeBytesReceived(0), + icnAlpha(alpha), + beta(beta), + drop(drop) +{ + gamma = 1; + this->speed = 0.0; + this->dnltime = 0.0; + this->deezData = NULL; + this->deezDataSize = 0; + this->datSize = 0; + this->dataPos = 0; + InitializeConditionVariable (&this->contentRetrieved); + InitializeCriticalSection (&this->monitorMutex); + + this->myConsumer = new ConsumerSocket(ccnx::Name(), TransportProtocolAlgorithms::RAAQM); + this->myConsumer->setSocketOption(RaaqmTransportOptions::GAMMA_VALUE, (int)gamma); + + bool configFile = false; + //CHECK if we are not going to override the configuration file. (if !autotune) + if(FILE *fp = fopen("/usr/etc/consumer.conf", "r")) + { + fclose(fp); + configFile = true; + } + if(!configFile) + { + qDebug("beta %f, drop %f", this->beta, this->drop); + this->myConsumer->setSocketOption(RaaqmTransportOptions::BETA_VALUE, this->beta); + this->myConsumer->setSocketOption(RaaqmTransportOptions::DROP_FACTOR, this->drop); + } + this->myConsumer->setSocketOption(RateEstimationOptions::RATE_ESTIMATION_OBSERVER, this); + this->myConsumer->setSocketOption(ConsumerCallbacksOptions::CONTENT_RETRIEVED, (ConsumerContentCallback) bind(&ICNConnectionConsumerApi::processPayload, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + this->myConsumer->setSocketOption(ConsumerCallbacksOptions::CONTENT_OBJECT_TO_VERIFY, (ConsumerContentObjectVerificationCallback)bind(&ICNConnectionConsumerApi::onPacket, this, std::placeholders::_1, std::placeholders::_2)); +#ifdef NO_GUI + if(this->icnAlpha != 20) + this->icnRateBased = true; +#else + this->icnRateBased = true; +#endif + Debug("ICN class created\n"); + +} + +ICNConnectionConsumerApi::~ICNConnectionConsumerApi() { + delete this->myConsumer; + if(this->deezData) + { + free(this->deezData); + this->deezData = NULL; + } + DeleteConditionVariable (&this->contentRetrieved); + DeleteCriticalSection (&this->monitorMutex); +} + +void ICNConnectionConsumerApi::Init(IChunk *chunk) { + Debug("ICN Connection: STARTING\n"); + m_first = 1; + sizeDownloaded = 0; + m_name = "ccnx:/" + chunk->Host() + chunk->Path(); + m_isFinished = false; + + res = false; + dataPos = 0; + datSize = 0; + if(this->deezData) + { + memset(this->deezData, 0, this->deezDataSize); + } + + qDebug("ICN_Connection:\tINTIATED_to_name %s\n", m_name.c_str()); + qDebug("ICN_Connection:\tSTARTING DOWNLOAD %s\n", m_name.c_str()); +} + +void ICNConnectionConsumerApi::InitForMPD(const std::string& url) +{ + m_first = 1; + sizeDownloaded = 0; + + if(url.find("//") != std::string::npos) + { + int pos = url.find("//"); + char* myName = (char*)malloc(strlen(url.c_str()) - 1); + strncpy(myName, url.c_str(), pos + 1); + strncpy(myName + pos + 1, url.c_str() + pos + 2, strlen(url.c_str()) - pos - 2); + m_name = std::string(myName); + free(myName); + } + else + { + m_name = url; + } + m_isFinished = false; + + res = false; + dataPos = 0; + datSize = 0; + Debug("ICN_Connection:\tINTIATED_for_mpd %s\n", m_name.c_str()); +} + +int ICNConnectionConsumerApi::Read(uint8_t* data, size_t len, IChunk *chunk) +{ + return this->Read(data, len); +} + +int ICNConnectionConsumerApi::Read(uint8_t *data, size_t len) +{ + if(!res) + m_start_time = std::chrono::system_clock::now(); + + if(res) + { + if(this->dataPos == this->datSize) + { + this->dnltime = std::chrono::duration_cast<duration_in_seconds>(std::chrono::system_clock::now() - m_start_time).count(); + if(speed == 0 || !this->icnRateBased) + speed = (double) (sizeDownloaded * 8 / this->dnltime); + cumulativeBytesReceived += sizeDownloaded; + Debug("ICN_Connection:\tFINISHED DOWNLOADING %s Average_DL: %f size: %lu cumulative: %lu Throughput: %f\n", m_name.c_str(), speed, sizeDownloaded, cumulativeBytesReceived, (double) (sizeDownloaded * 8 / this->dnltime)); + return 0; + } + if((this->datSize - this->dataPos) > (int)len) + { + memcpy(data, this->deezData + this->dataPos, len); + this->dataPos += len; + sizeDownloaded += len; + return len; + } + else + { + assert(this->datSize - this->dataPos > 0); + memcpy(data, this->deezData + this->dataPos, this->datSize - this->dataPos); + int temp = this->datSize - this->dataPos; + this->dataPos += this->datSize - this->dataPos; + sizeDownloaded += temp; + return temp; + } + } + + Debug("will consume: %s\n", m_name.c_str()); + this->myConsumer->consume(m_name); + EnterCriticalSection(&this->monitorMutex); + + while(this->m_isFinished == false) + SleepConditionVariableCS(&this->contentRetrieved, &this->monitorMutex, INFINITE); + + assert(this->datSize != 0); + this->res = true; + LeaveCriticalSection(&this->monitorMutex); + if(this->datSize > (int)len) + { + memcpy(data, this->deezData, len); + this->dataPos += len; + sizeDownloaded += len; + return len; + } + else + { + memcpy(data, this->deezData, this->datSize); + this->dataPos += this->datSize; + sizeDownloaded += this->datSize; + return this->datSize; + } +} + +int ICNConnectionConsumerApi::Peek(uint8_t *data, size_t len, IChunk *chunk) { + return -1; +} + +bool ICNConnectionConsumerApi::onPacket(ConsumerSocket& c, const ContentObject& data) +{ + return true; +} + +void ICNConnectionConsumerApi::processPayload(ConsumerSocket& c, const uint8_t* buffer, size_t bufferSize) +{ + EnterCriticalSection(&this->monitorMutex); + if(this->deezData == NULL) + { + this->deezData = (char *)malloc(bufferSize*sizeof(uint8_t)); + this->deezDataSize = bufferSize; + } + else + { + if(bufferSize > this->deezDataSize) + { + this->deezData = (char *)realloc(this->deezData, bufferSize * (sizeof(uint8_t))); + this->deezDataSize = bufferSize; + } + } + memcpy(this->deezData, buffer, bufferSize*sizeof(uint8_t)); + this->m_isFinished = true; + this->datSize = (int) bufferSize; + WakeAllConditionVariable(&this->contentRetrieved); + LeaveCriticalSection(&this->monitorMutex); +} + +double ICNConnectionConsumerApi::GetAverageDownloadingSpeed() +{ + Debug("ICNConnection: DL speed is %f\n", this->speed); + return this->speed; +} + +double ICNConnectionConsumerApi::GetDownloadingTime() +{ + Debug("ICNConnection: DL time is %f\n", this->dnltime); + return this->dnltime; +} + +const std::vector<ITCPConnection *> &ICNConnectionConsumerApi::GetTCPConnectionList() const { + return tcpConnections; +} + +const std::vector<IHTTPTransaction *> &ICNConnectionConsumerApi::GetHTTPTransactionList() const { + return httpTransactions; +} + +void ICNConnectionConsumerApi::notifyStats(double winSize) +{ + this->speed = (winSize); // * 1000000 * 1400 * 8; + Debug("ICNConnection:\tNotificationICPDL\t%f\t%f\n", winSize, speed); +} + +void ICNConnectionConsumerApi::SetBeta(float beta) +{ + this->beta = beta; +} + + +void ICNConnectionConsumerApi::SetDrop(float drop) +{ + this->drop = drop; +} + +} /* namespace input */ +} /* namespace framework */ +} /* namespace libdash */ + + +#endif //NDEF ICNICPDOWNLOAD |