aboutsummaryrefslogtreecommitdiffstats
path: root/src/libdash/source/network
diff options
context:
space:
mode:
Diffstat (limited to 'src/libdash/source/network')
-rw-r--r--src/libdash/source/network/AbstractChunk.cpp271
-rw-r--r--src/libdash/source/network/AbstractChunk.h100
-rw-r--r--src/libdash/source/network/DownloadStateManager.cpp100
-rw-r--r--src/libdash/source/network/DownloadStateManager.h50
4 files changed, 521 insertions, 0 deletions
diff --git a/src/libdash/source/network/AbstractChunk.cpp b/src/libdash/source/network/AbstractChunk.cpp
new file mode 100644
index 00000000..35774efe
--- /dev/null
+++ b/src/libdash/source/network/AbstractChunk.cpp
@@ -0,0 +1,271 @@
+/*
+ * AbstractChunk.cpp
+ *****************************************************************************
+ * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved
+ *
+ * Email: libdash-dev@vicky.bitmovin.net
+ *
+ * This source code and its use and distribution, is subject to the terms
+ * and conditions of the applicable license agreement.
+ *****************************************************************************/
+
+#include "AbstractChunk.h"
+
+using namespace dash::network;
+using namespace dash::helpers;
+using namespace dash::metrics;
+
+uint32_t AbstractChunk::BLOCKSIZE = 32768;
+
+AbstractChunk::AbstractChunk () :
+ connection (NULL),
+ dlThread (NULL),
+ bytesDownloaded (0)
+{
+}
+AbstractChunk::~AbstractChunk ()
+{
+ this->AbortDownload();
+ this->blockStream.Clear();
+ DestroyThreadPortable(this->dlThread);
+}
+
+void AbstractChunk::AbortDownload ()
+{
+ this->stateManager.CheckAndSet(IN_PROGRESS, REQUEST_ABORT);
+ this->stateManager.CheckAndWait(REQUEST_ABORT, ABORTED);
+}
+bool AbstractChunk::StartDownload ()
+{
+ if(this->stateManager.State() != NOT_STARTED)
+ return false;
+ curl_global_init(CURL_GLOBAL_ALL);
+ this->curlm = curl_multi_init();
+
+ this->curl = curl_easy_init();
+ curl_easy_setopt(this->curl, CURLOPT_URL, this->AbsoluteURI().c_str());
+ curl_easy_setopt(this->curl, CURLOPT_WRITEFUNCTION, CurlResponseCallback);
+ curl_easy_setopt(this->curl, CURLOPT_WRITEDATA, (void *)this);
+ /* Debug Callback */
+ curl_easy_setopt(this->curl, CURLOPT_VERBOSE, 1L);
+ curl_easy_setopt(this->curl, CURLOPT_DEBUGFUNCTION, CurlDebugCallback);
+ curl_easy_setopt(this->curl, CURLOPT_DEBUGDATA, (void *)this);
+ curl_easy_setopt(this->curl, CURLOPT_FAILONERROR, true);
+
+ if(this->HasByteRange())
+ curl_easy_setopt(this->curl, CURLOPT_RANGE, this->Range().c_str());
+
+ curl_multi_add_handle(this->curlm, this->curl);
+ this->dlThread = CreateThreadPortable (DownloadInternalConnection, this);
+
+ if(this->dlThread == NULL)
+ return false;
+
+ this->stateManager.State(IN_PROGRESS);
+
+ return true;
+}
+bool AbstractChunk::StartDownload (IConnection *connection)
+{
+ if(this->stateManager.State() != NOT_STARTED)
+ return false;
+
+ this->connection = connection;
+ this->dlThread = CreateThreadPortable (DownloadExternalConnection, this);
+
+ if(this->dlThread == NULL)
+ return false;
+
+ this->stateManager.State(IN_PROGRESS);
+
+
+ return true;
+}
+int AbstractChunk::Read (uint8_t *data, size_t len)
+{
+ return this->blockStream.GetBytes(data, len);
+}
+int AbstractChunk::Peek (uint8_t *data, size_t len)
+{
+ return this->blockStream.PeekBytes(data, len);
+}
+int AbstractChunk::Peek (uint8_t *data, size_t len, size_t offset)
+{
+ return this->blockStream.PeekBytes(data, len, offset);
+}
+void AbstractChunk::AttachDownloadObserver (IDownloadObserver *observer)
+{
+ this->observers.push_back(observer);
+ this->stateManager.Attach(observer);
+}
+void AbstractChunk::DetachDownloadObserver (IDownloadObserver *observer)
+{
+ uint32_t pos = -1;
+
+ for(size_t i = 0; i < this->observers.size(); i++)
+ if(this->observers.at(i) == observer)
+ pos = i;
+
+ if(pos != -1)
+ this->observers.erase(this->observers.begin() + pos);
+
+ this->stateManager.Detach(observer);
+}
+void* AbstractChunk::DownloadExternalConnection (void *abstractchunk)
+{
+ AbstractChunk *chunk = (AbstractChunk *) abstractchunk;
+ block_t *block = AllocBlock(chunk->BLOCKSIZE);
+ int ret = 0;
+
+ int count = 0;
+ do
+ {
+ ret = chunk->connection->Read(block->data, block->len, chunk);
+ if(ret > 0)
+ {
+ block_t *streamblock = AllocBlock(ret);
+ memcpy(streamblock->data, block->data, ret);
+ chunk->blockStream.PushBack(streamblock);
+ chunk->bytesDownloaded += ret;
+
+ // chunk->NotifyDownloadRateChanged();
+ }
+ if(chunk->stateManager.State() == REQUEST_ABORT)
+ ret = 0;
+ count += ret;
+ }while(ret);
+
+ double speed = chunk->connection->GetAverageDownloadingSpeed();
+ double time = chunk->connection->GetDownloadingTime();
+ chunk->NotifyDownloadRateChanged(speed);
+ chunk->NotifyDownloadTimeChanged(time);
+ DeleteBlock(block);
+
+ if(chunk->stateManager.State() == REQUEST_ABORT)
+ chunk->stateManager.State(ABORTED);
+ else
+ chunk->stateManager.State(COMPLETED);
+
+ chunk->blockStream.SetEOS(true);
+
+ return NULL;
+}
+void* AbstractChunk::DownloadInternalConnection (void *abstractchunk)
+{
+ AbstractChunk *chunk = (AbstractChunk *) abstractchunk;
+
+ //chunk->response = curl_easy_perform(chunk->curl);
+ int u =1;
+
+ while(chunk->stateManager.State() != REQUEST_ABORT && u)
+ {
+ curl_multi_perform(chunk->curlm, &u);
+ }
+ double speed;
+ double size;
+ double time;
+ curl_easy_getinfo(chunk->curl, CURLINFO_SPEED_DOWNLOAD,&speed);
+ curl_easy_getinfo(chunk->curl, CURLINFO_SIZE_DOWNLOAD, &size);
+ curl_easy_getinfo(chunk->curl, CURLINFO_TOTAL_TIME, &time);
+
+ //Speed is in Bps ==> *8 for the bps
+ speed = 8*speed;
+ //size = 8*size; //Uncomment for the size in bits.
+ chunk->NotifyDownloadRateChanged(speed);
+ chunk->NotifyDownloadTimeChanged(time);
+ curl_easy_cleanup(chunk->curl);
+ //curl_global_cleanup();
+
+ curl_multi_cleanup(chunk->curlm);
+ if(chunk->stateManager.State() == REQUEST_ABORT)
+ {
+ chunk->stateManager.State(ABORTED);
+ }
+ else
+ {
+ chunk->stateManager.State(COMPLETED);
+ }
+
+ chunk->blockStream.SetEOS(true);
+
+ return NULL;
+}
+void AbstractChunk::NotifyDownloadRateChanged (double bitrate)
+{
+ for(size_t i = 0; i < this->observers.size(); i++)
+ this->observers.at(i)->OnDownloadRateChanged((uint64_t)bitrate);
+}
+void AbstractChunk::NotifyDownloadTimeChanged (double dnltime)
+{
+ for(size_t i = 0; i < this->observers.size(); i++)
+ this->observers.at(i)->OnDownloadTimeChanged(dnltime);
+}
+size_t AbstractChunk::CurlResponseCallback (void *contents, size_t size, size_t nmemb, void *userp)
+{
+ size_t realsize = size * nmemb;
+ AbstractChunk *chunk = (AbstractChunk *)userp;
+
+ if(chunk->stateManager.State() == REQUEST_ABORT)
+ return 0;
+
+ block_t *block = AllocBlock(realsize);
+
+ memcpy(block->data, contents, realsize);
+ chunk->blockStream.PushBack(block);
+
+ chunk->bytesDownloaded += realsize;
+// chunk->NotifyDownloadRateChanged();
+
+ return realsize;
+}
+size_t AbstractChunk::CurlDebugCallback (CURL *url, curl_infotype infoType, char * data, size_t length, void *userdata)
+{
+ AbstractChunk *chunk = (AbstractChunk *)userdata;
+
+ switch (infoType) {
+ case CURLINFO_TEXT:
+ break;
+ case CURLINFO_HEADER_OUT:
+ chunk->HandleHeaderOutCallback();
+ break;
+ case CURLINFO_HEADER_IN:
+ chunk->HandleHeaderInCallback(std::string(data));
+ break;
+ case CURLINFO_DATA_IN:
+ break;
+ default:
+ return 0;
+ }
+ return 0;
+}
+void AbstractChunk::HandleHeaderOutCallback ()
+{
+ HTTPTransaction *httpTransaction = new HTTPTransaction();
+
+ httpTransaction->SetOriginalUrl(this->AbsoluteURI());
+ httpTransaction->SetRange(this->Range());
+ httpTransaction->SetType(this->GetType());
+ httpTransaction->SetRequestSentTime(Time::GetCurrentUTCTimeStr());
+
+ this->httpTransactions.push_back(httpTransaction);
+}
+void AbstractChunk::HandleHeaderInCallback (std::string data)
+{
+ HTTPTransaction *httpTransaction = this->httpTransactions.at(this->httpTransactions.size()-1);
+
+ if (data.substr(0,4) == "HTTP")
+ {
+ httpTransaction->SetResponseReceivedTime(Time::GetCurrentUTCTimeStr());
+ httpTransaction->SetResponseCode(strtoul(data.substr(9,3).c_str(), NULL, 10));
+ }
+
+ httpTransaction->AddHTTPHeaderLine(data);
+}
+const std::vector<ITCPConnection *>& AbstractChunk::GetTCPConnectionList () const
+{
+ return (std::vector<ITCPConnection *> &) this->tcpConnections;
+}
+const std::vector<IHTTPTransaction *>& AbstractChunk::GetHTTPTransactionList () const
+{
+ return (std::vector<IHTTPTransaction *> &) this->httpTransactions;
+}
diff --git a/src/libdash/source/network/AbstractChunk.h b/src/libdash/source/network/AbstractChunk.h
new file mode 100644
index 00000000..794469fd
--- /dev/null
+++ b/src/libdash/source/network/AbstractChunk.h
@@ -0,0 +1,100 @@
+/*
+ * AbstractChunk.h
+ *****************************************************************************
+ * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved
+ *
+ * Email: libdash-dev@vicky.bitmovin.net
+ *
+ * This source code and its use and distribution, is subject to the terms
+ * and conditions of the applicable license agreement.
+ *****************************************************************************/
+
+#ifndef ABSTRACTCHUNK_H_
+#define ABSTRACTCHUNK_H_
+
+#include "config.h"
+
+#include "IDownloadableChunk.h"
+#include "DownloadStateManager.h"
+#include "../helpers/SyncedBlockStream.h"
+#include "../portable/Networking.h"
+#include <curl/curl.h>
+#include "../metrics/HTTPTransaction.h"
+#include "../metrics/TCPConnection.h"
+#include "../metrics/ThroughputMeasurement.h"
+#include "../helpers/Time.h"
+
+#include <chrono>
+
+namespace dash
+{
+ namespace network
+ {
+ class AbstractChunk : public virtual IDownloadableChunk
+ {
+ public:
+ AbstractChunk ();
+ virtual ~AbstractChunk ();
+
+ /*
+ * Pure virtual IChunk Interface
+ */
+ virtual std::string& AbsoluteURI () = 0;
+ virtual std::string& Host () = 0;
+ virtual size_t Port () = 0;
+ virtual std::string& Path () = 0;
+ virtual std::string& Range () = 0;
+ virtual size_t StartByte () = 0;
+ virtual size_t EndByte () = 0;
+ virtual bool HasByteRange () = 0;
+ virtual dash::metrics::HTTPTransactionType GetType() = 0;
+ /*
+ * IDownloadableChunk Interface
+ */
+ virtual bool StartDownload (IConnection *connection);
+ virtual bool StartDownload ();
+ virtual void AbortDownload ();
+ virtual int Read (uint8_t *data, size_t len);
+ virtual int Peek (uint8_t *data, size_t len);
+ virtual int Peek (uint8_t *data, size_t len, size_t offset);
+ virtual void AttachDownloadObserver (IDownloadObserver *observer);
+ virtual void DetachDownloadObserver (IDownloadObserver *observer);
+ /*
+ * Observer Notification
+ */
+ void NotifyDownloadRateChanged (double bitrate);
+ void NotifyDownloadTimeChanged (double dnltime);
+ /*
+ * IDASHMetrics
+ */
+ const std::vector<dash::metrics::ITCPConnection *>& GetTCPConnectionList () const;
+ const std::vector<dash::metrics::IHTTPTransaction *>& GetHTTPTransactionList () const;
+
+ private:
+ std::vector<IDownloadObserver *> observers;
+ THREAD_HANDLE dlThread;
+ IConnection *connection;
+ helpers::SyncedBlockStream blockStream;
+ CURL *curl;
+ CURLM *curlm;
+ CURLcode response;
+ uint64_t bytesDownloaded;
+ DownloadStateManager stateManager;
+
+ std::vector<dash::metrics::TCPConnection *> tcpConnections;
+ std::vector<dash::metrics::HTTPTransaction *> httpTransactions;
+
+ static uint32_t BLOCKSIZE;
+
+ static void* DownloadExternalConnection (void *chunk);
+ static void* DownloadInternalConnection (void *chunk);
+ static size_t CurlResponseCallback (void *contents, size_t size, size_t nmemb, void *userp);
+ static size_t CurlHeaderCallback (void *headerData, size_t size, size_t nmemb, void *userdata);
+ static size_t CurlDebugCallback (CURL *url, curl_infotype infoType, char * data, size_t length, void *userdata);
+ void HandleHeaderOutCallback ();
+ void HandleHeaderInCallback (std::string data);
+ };
+ }
+}
+
+#endif /* ABSTRACTCHUNK_H_ */
diff --git a/src/libdash/source/network/DownloadStateManager.cpp b/src/libdash/source/network/DownloadStateManager.cpp
new file mode 100644
index 00000000..5117c099
--- /dev/null
+++ b/src/libdash/source/network/DownloadStateManager.cpp
@@ -0,0 +1,100 @@
+/*
+ * DownloadStateManager.cpp
+ *****************************************************************************
+ * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved
+ *
+ * Email: libdash-dev@vicky.bitmovin.net
+ *
+ * This source code and its use and distribution, is subject to the terms
+ * and conditions of the applicable license agreement.
+ *****************************************************************************/
+
+#include "DownloadStateManager.h"
+
+using namespace dash::network;
+
+DownloadStateManager::DownloadStateManager () :
+ state (NOT_STARTED)
+{
+ InitializeConditionVariable (&this->stateChanged);
+ InitializeCriticalSection (&this->stateLock);
+}
+DownloadStateManager::~DownloadStateManager ()
+{
+ DeleteConditionVariable (&this->stateChanged);
+ DeleteCriticalSection (&this->stateLock);
+}
+
+DownloadState DownloadStateManager::State () const
+{
+ EnterCriticalSection(&this->stateLock);
+
+ DownloadState ret = this->state;
+
+ LeaveCriticalSection(&this->stateLock);
+
+ return ret;
+}
+void DownloadStateManager::State (DownloadState state)
+{
+ EnterCriticalSection(&this->stateLock);
+
+ this->state = state;
+
+ this->Notify();
+ WakeAllConditionVariable(&this->stateChanged);
+ LeaveCriticalSection(&this->stateLock);
+}
+void DownloadStateManager::WaitState (DownloadState state) const
+{
+ EnterCriticalSection(&this->stateLock);
+
+ while(this->state != state)
+ SleepConditionVariableCS(&this->stateChanged, &this->stateLock, INFINITE);
+
+ LeaveCriticalSection(&this->stateLock);
+}
+void DownloadStateManager::CheckAndWait (DownloadState check, DownloadState wait) const
+{
+ EnterCriticalSection(&this->stateLock);
+
+ if(this->state == check)
+ while(this->state != wait)
+ SleepConditionVariableCS(&this->stateChanged, &this->stateLock, INFINITE);
+
+ LeaveCriticalSection(&this->stateLock);
+}
+void DownloadStateManager::Attach (IDownloadObserver *observer)
+{
+ EnterCriticalSection(&this->stateLock);
+ this->observers.push_back(observer);
+ LeaveCriticalSection(&this->stateLock);
+}
+void DownloadStateManager::Detach (IDownloadObserver *observer)
+{
+ EnterCriticalSection(&this->stateLock);
+
+ uint32_t pos = -1;
+
+ for(size_t i = 0; i < this->observers.size(); i++)
+ if(this->observers.at(i) == observer)
+ pos = i;
+
+ if(pos != -1)
+ this->observers.erase(this->observers.begin() + pos);
+
+ LeaveCriticalSection(&this->stateLock);
+}
+void DownloadStateManager::Notify ()
+{
+ for(size_t i = 0; i < this->observers.size(); i++)
+ this->observers.at(i)->OnDownloadStateChanged(this->state);
+}
+void DownloadStateManager::CheckAndSet (DownloadState check, DownloadState set)
+{
+ EnterCriticalSection(&this->stateLock);
+
+ if(this->state == check)
+ this->state = set;
+ LeaveCriticalSection(&this->stateLock);
+}
diff --git a/src/libdash/source/network/DownloadStateManager.h b/src/libdash/source/network/DownloadStateManager.h
new file mode 100644
index 00000000..90dd770d
--- /dev/null
+++ b/src/libdash/source/network/DownloadStateManager.h
@@ -0,0 +1,50 @@
+/*
+ * DownloadStateManager.h
+ *****************************************************************************
+ * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved
+ *
+ * Email: libdash-dev@vicky.bitmovin.net
+ *
+ * This source code and its use and distribution, is subject to the terms
+ * and conditions of the applicable license agreement.
+ *****************************************************************************/
+
+#ifndef DOWNLOADSTATEMANAGER_H_
+#define DOWNLOADSTATEMANAGER_H_
+
+#include "config.h"
+
+#include "IDownloadObserver.h"
+#include "../portable/MultiThreading.h"
+
+namespace dash
+{
+ namespace network
+ {
+ class DownloadStateManager
+ {
+ public:
+ DownloadStateManager ();
+ virtual ~DownloadStateManager ();
+
+ DownloadState State () const;
+ void WaitState (DownloadState state) const;
+ void CheckAndWait (DownloadState check, DownloadState wait) const;
+ void CheckAndSet (DownloadState check, DownloadState set);
+ void State (DownloadState state);
+ void Attach (IDownloadObserver *observer);
+ void Detach (IDownloadObserver *observer);
+
+ private:
+ DownloadState state;
+ mutable CRITICAL_SECTION stateLock;
+ mutable CONDITION_VARIABLE stateChanged;
+
+ std::vector<IDownloadObserver *> observers;
+
+ void Notify ();
+ };
+ }
+}
+
+#endif /* DOWNLOADSTATEMANAGER_H_ */