aboutsummaryrefslogtreecommitdiffstats
path: root/Input/DASHReceiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Input/DASHReceiver.cpp')
-rw-r--r--Input/DASHReceiver.cpp291
1 files changed, 141 insertions, 150 deletions
diff --git a/Input/DASHReceiver.cpp b/Input/DASHReceiver.cpp
index bbfe3e35..aaa75205 100644
--- a/Input/DASHReceiver.cpp
+++ b/Input/DASHReceiver.cpp
@@ -19,44 +19,47 @@ using namespace dash::mpd;
using duration_in_seconds = std::chrono::duration<double, std::ratio<1, 1> >;
-DASHReceiver::DASHReceiver (IMPD *mpd, IDASHReceiverObserver *obs, Buffer<MediaObject> *buffer, uint32_t bufferSize, bool icnEnabled, double icnAlpha, float beta, float drop) :
- mpd (mpd),
- period (NULL),
- adaptationSet (NULL),
- representation (NULL),
- adaptationSetStream (NULL),
- representationStream (NULL),
- segmentNumber (0),
- observer (obs),
- buffer (buffer),
- bufferSize (bufferSize),
- isBuffering (false),
- withFeedBack (false),
- icn (icnEnabled),
- icnAlpha (icnAlpha),
- previousQuality (0),
- isPaused (false),
- threadComplete (false),
- isScheduledPaced (false),
- targetDownload (0.0),
- downloadingTime (0.0),
- bufferLevelAtUpdate (0),
- isBufferBased (false),
- isLooping (false),
- beta (beta),
- drop (drop)
+DASHReceiver::DASHReceiver (viper::managers::StreamType type, MPDWrapper *mpdWrapper, IDASHReceiverObserver *obs, Buffer<MediaObject> *buffer, uint32_t bufferSize, bool icnEnabled, double icnAlpha, float beta, float drop) :
+ type (type),
+ mpdWrapper (mpdWrapper),
+// period (NULL),
+// adaptationSet (NULL),
+// representation (NULL),
+ adaptationSetStream (NULL),
+// representationStream (NULL),
+ segmentNumber (0),
+ observer (obs),
+ buffer (buffer),
+ bufferSize (bufferSize),
+ isBuffering (false),
+ withFeedBack (false),
+ icn (icnEnabled),
+ icnAlpha (icnAlpha),
+ previousQuality (0),
+ isPaused (false),
+ threadComplete (false),
+ isScheduledPaced (false),
+ targetDownload (0.0),
+ downloadingTime (0.0),
+ bufferLevelAtUpdate (0),
+ isBufferBased (false),
+ isLooping (false),
+ beta (beta),
+ drop (drop),
+ bufferingThread (NULL),
+ mpdFetcherThread (NULL)
{
readMax = 32768;
readBuffer = (uint8_t*)malloc(sizeof(uint8_t)*readMax);
- this->period = this->mpd->GetPeriods().at(0);
- this->adaptationSet = this->period->GetAdaptationSets().at(0);
- this->representation = this->adaptationSet->GetRepresentation().at(0);
+// this->period = this->mpd->GetPeriods().at(0);
+// this->adaptationSet = this->period->GetAdaptationSets().at(0);
+// this->representation = this->adaptationSet->GetRepresentation().at(0);
- this->adaptationSetStream = new AdaptationSetStream(mpd, period, adaptationSet);
- this->representationStream = adaptationSetStream->getRepresentationStream(this->representation);
+ this->adaptationSetStream = new AdaptationSetStream(type, mpdWrapper);
+// this->representationStream = adaptationSetStream->getRepresentationStream(this->representation);
this->segmentOffset = CalculateSegmentOffset();
- this->representationStream->setSegmentOffset(this->segmentOffset);
-
+// this->representationStream->setSegmentOffset(this->segmentOffset);
+ this->mpdWrapper->setSegmentOffset(type, this->segmentOffset);
this->conn = NULL;
this->initConn = NULL;
readMax = 32768;
@@ -103,7 +106,15 @@ bool DASHReceiver::Start ()
this->isBuffering = false;
return false;
}
-
+ //if dynamic, set up the fetching loop
+ if(!strcmp(this->mpdWrapper->getType().c_str(), "dynamic"))
+ {
+ this->mpdFetcherThread = createThreadPortable(DoMPDFetching, this);
+ if(this->mpdFetcherThread == NULL)
+ {
+ std::cout << "mpd Fetcher thread is NULL. Need to think of how to handle this?" << std::endl;
+ }
+ }
return true;
}
void DASHReceiver::Stop()
@@ -119,83 +130,43 @@ void DASHReceiver::Stop()
JoinThread(this->bufferingThread);
destroyThreadPortable(this->bufferingThread);
}
- this->period = this->mpd->GetPeriods().at(0);
- this->adaptationSet = this->period->GetAdaptationSets().at(0);
- this->representation = this->adaptationSet->GetRepresentation().at(0);
+ if(this->mpdFetcherThread != NULL)
+ {
+ JoinThread(this->mpdFetcherThread);
+ destroyThreadPortable(this->mpdFetcherThread);
+ }
}
MediaObject* DASHReceiver::GetNextSegment ()
{
- ISegment *seg = NULL;
EnterCriticalSection(&this->monitorPausedMutex);
while(this->isPaused)
SleepConditionVariableCS(&this->paused, &this->monitorPausedMutex, INFINITE);
- if(!strcmp(this->mpd->GetType().c_str(), "static"))
- {
- if(this->segmentNumber >= this->representationStream->getSize())
- {
- qDebug("looping? : %s\n", this->isLooping ? "YES" : "NO");
- if(this->isLooping)
- {
- this->segmentNumber = 0;
- }
- else
- {
- LeaveCriticalSection(&this->monitorPausedMutex);
- return NULL;
- }
- }
- }
- seg = this->representationStream->getMediaSegment(this->segmentNumber);
-
- if (seg != NULL)
- {
- std::vector<IRepresentation *> rep = this->adaptationSet->GetRepresentation();
-
- this->NotifyQualityDownloading(this->representation->GetBandwidth());
+ MediaObject *media = this->mpdWrapper->getNextSegment(type, isLooping, this->segmentNumber, withFeedBack);
- MediaObject *media = new MediaObject(seg, this->representation,this->withFeedBack);
- this->segmentNumber++;
- LeaveCriticalSection(&this->monitorPausedMutex);
- return media;
- }
+ if(media)
+ this->NotifyQualityDownloading(media->GetRepresentationBandwidth());
LeaveCriticalSection(&this->monitorPausedMutex);
- return NULL;
+ return media;
}
MediaObject* DASHReceiver::GetSegment (uint32_t segNum)
{
- ISegment *seg = NULL;
-
- if(segNum >= this->representationStream->getSize())
- return NULL;
-
- seg = this->representationStream->getMediaSegment(segNum);
-
- if (seg != NULL)
- {
- MediaObject *media = new MediaObject(seg, this->representation);
- return media;
- }
-
- return NULL;
+ return this->mpdWrapper->getSegment(type, segNum);
}
+
MediaObject* DASHReceiver::GetInitSegment ()
{
- ISegment *seg = NULL;
-
- seg = this->representationStream->getInitializationSegment();
-
- if (seg != NULL)
- {
- MediaObject *media = new MediaObject(seg, this->representation);
- return media;
- }
+ return this->mpdWrapper->getInitSegment(type);
+}
- return NULL;
+MediaObject* DASHReceiver::GetInitSegmentWithoutLock ()
+{
+ return this->mpdWrapper->getInitSegmentWithoutLock(type);
}
-MediaObject* DASHReceiver::FindInitSegment (dash::mpd::IRepresentation *representation)
+
+MediaObject* DASHReceiver::FindInitSegment (int representation)
{
if (!this->InitSegmentExists(representation))
return NULL;
@@ -224,45 +195,9 @@ void DASHReceiver::NotifyQualityDownloading (uint32_t quality)
this->observer->notifyQualityDownloading(quality);
}
-void DASHReceiver::SetRepresentation (IPeriod *period, IAdaptationSet *adaptationSet, IRepresentation *representation)
+void DASHReceiver::SetRepresentation ()
{
- EnterCriticalSection(&this->monitorMutex);
-
- bool periodChanged = false;
-
- if (this->representation == representation)
- {
- LeaveCriticalSection(&this->monitorMutex);
- return;
- }
-
- this->representation = representation;
-
- if (this->adaptationSet != adaptationSet)
- {
- this->adaptationSet = adaptationSet;
-
- if (this->period != period)
- {
- this->period = period;
- periodChanged = true;
- }
-
- delete this->adaptationSetStream;
- this->adaptationSetStream = NULL;
-
- this->adaptationSetStream = new AdaptationSetStream(this->mpd, this->period, this->adaptationSet);
- }
-
- this->representationStream = this->adaptationSetStream->getRepresentationStream(this->representation);
- this->DownloadInitSegment(this->representation);
-
- if (periodChanged)
- {
- this->segmentNumber = 0;
- this->CalculateSegmentOffset();
- }
- LeaveCriticalSection(&this->monitorMutex);
+ this->DownloadInitSegmentWithoutLock();
}
libdash::framework::adaptation::IAdaptationLogic* DASHReceiver::GetAdaptationLogic ()
@@ -271,19 +206,13 @@ libdash::framework::adaptation::IAdaptationLogic* DASHReceiver::GetAdaptationLog
}
dash::mpd::IRepresentation* DASHReceiver::GetRepresentation ()
{
- return this->representation;
+ return NULL;
}
uint32_t DASHReceiver::CalculateSegmentOffset ()
{
- if (mpd->GetType() == "static")
- return 0;
-
- uint32_t firstSegNum = this->representationStream->getFirstSegmentNumber();
- uint32_t currSegNum = this->representationStream->getCurrentSegmentNumber();
- uint32_t startSegNum = currSegNum - 2*bufferSize;
-
- return (startSegNum > firstSegNum) ? startSegNum : firstSegNum;
+ return this->mpdWrapper->calculateSegmentOffset(type, bufferSize);
}
+
void DASHReceiver::NotifySegmentDownloaded ()
{
this->observer->onSegmentDownloaded();
@@ -291,14 +220,32 @@ void DASHReceiver::NotifySegmentDownloaded ()
void DASHReceiver::NotifyBitrateChange(dash::mpd::IRepresentation *representation)
{
- if(this->representation != representation)
+// if(this->representation != representation)
+// {
+// this->representation = representation;
+// this->SetRepresentation(this->period,this->adaptationSet,this->representation);
+// }
+}
+void DASHReceiver::DownloadInitSegmentWithoutLock ()
+{
+ int rep = std::stoi(this->mpdWrapper->getRepresentationIDWithoutLock(type).c_str());
+ if (this->InitSegmentExists(rep))
+ return;
+
+ MediaObject *initSeg = NULL;
+ initSeg = this->GetInitSegmentWithoutLock();
+
+ if (initSeg)
{
- this->representation = representation;
- this->SetRepresentation(this->period,this->adaptationSet,this->representation);
+ initSeg->StartDownload(this->initConn);
+ this->initSegments[rep] = initSeg;
+ initSeg->WaitFinished();
}
}
-void DASHReceiver::DownloadInitSegment (IRepresentation* rep)
+
+void DASHReceiver::DownloadInitSegment ()
{
+ int rep = std::stoi(this->mpdWrapper->getRepresentationID(type).c_str());
if (this->InitSegmentExists(rep))
return;
@@ -312,7 +259,8 @@ void DASHReceiver::DownloadInitSegment (IRepresentatio
initSeg->WaitFinished();
}
}
-bool DASHReceiver::InitSegmentExists (IRepresentation* rep)
+
+bool DASHReceiver::InitSegmentExists (int rep)
{
if (this->initSegments.find(rep) != this->initSegments.end())
return true;
@@ -358,20 +306,18 @@ void DASHReceiver::OnEOS(bool value)
bool DASHReceiver::PushBack(MediaObject *mediaObject)
{
- MediaObject *init = this->FindInitSegment(mediaObject->GetRepresentation());
+ MediaObject *init = this->FindInitSegment(mediaObject->GetRepresentationID());
mediaObject->AddInitSegment(init);
//TODO the read should be in a function
//Grab the infos for the analytics: bitrate, fps
- dash::mpd::IRepresentation* datRep = mediaObject->GetRepresentation();
uint32_t bitrate = 0;
int fps = 0;
uint32_t quality = 0;
- bitrate = datRep->GetBandwidth();
- quality = datRep->GetHeight();
+ bitrate = mediaObject->GetRepresentationBandwidth();
+ quality = mediaObject->GetRepresentationHeight();
fps = this->bufferLevelAtUpdate;
this->observer->notifyStatistics((int)this->segmentNumber - 1, bitrate, fps, quality);
-
return(this->buffer->pushBack(mediaObject));
}
@@ -380,7 +326,7 @@ void* DASHReceiver::DoBuffering (void *recei
{
DASHReceiver *dashReceiver = (DASHReceiver *) receiver;
- dashReceiver->DownloadInitSegment(dashReceiver->GetRepresentation());
+ dashReceiver->DownloadInitSegment();
MediaObject *media = dashReceiver->GetNextSegment();
dashReceiver->NotifyCheckedAdaptationLogic();
@@ -426,6 +372,25 @@ void* DASHReceiver::DoBuffering (void *recei
return NULL;
}
+void* DASHReceiver::DoMPDFetching (void* receiver)
+{
+ DASHReceiver* dashReceiver = (DASHReceiver*) receiver;
+ uint32_t currTime = TimeResolver::getCurrentTimeInSec();
+ uint32_t publishedTime = TimeResolver::getUTCDateTimeInSec(dashReceiver->mpdWrapper->getPublishTime());
+ uint32_t period = TimeResolver::getDurationInSec(dashReceiver->mpdWrapper->getMinimumUpdatePeriod());
+ while(dashReceiver->isBuffering)
+ {
+ while(dashReceiver->isBuffering && currTime < publishedTime + period)
+ {
+ usleep(((publishedTime + period) - currTime) * 1000000);
+ currTime = TimeResolver::getCurrentTimeInSec();
+ }
+ dashReceiver->observer->fetchMPD();
+ publishedTime = TimeResolver::getUTCDateTimeInSec(dashReceiver->mpdWrapper->getPublishTime());
+ period = TimeResolver::getDurationInSec(dashReceiver->mpdWrapper->getMinimumUpdatePeriod());
+ }
+}
+
//can Push video to buffer in the renderer
bool DASHReceiver::CanPush ()
{
@@ -454,4 +419,30 @@ void DASHReceiver::SetDrop (float drop)
this->drop = drop;
}
+void DASHReceiver::updateMPD(IMPD* mpd)
+{
+// EnterCriticalSection(&this->monitorMutex);
+ //First we need to find the new segmentNumber -> what is the segment time now?
+// uint32_t time = this->representationStream->getTime(this->segmentNumber);
+// printf("old segmentNumber!: %d\n", this->segmentNumber);
+// printf("time: %u\n", time);
+ //Second, replace the MPD with the new one
+// delete(this->mpd);
+// this->mpd = mpd;
+//
+ //Third, Update all the structures associated to the mpd
+// this->period = this->mpd->GetPeriods().at(0);
+// this->adaptationSet = this->period->GetAdaptationSets().at(0);
+// this->representation = this->adaptationSet->GetRepresentation().at(0);
+// delete(this->adaptationSetStream);
+// this->adaptationSetStream = new AdaptationSetStream(mpd, period, adaptationSet);
+// this->representationStream = adaptationSetStream->getRepresentationStream(this->representation);
+// this->segmentOffset = CalculateSegmentOffset();
+// this->representationStream->setSegmentOffset(this->segmentOffset);
+//
+ //Fourth, Set the new segmentNumber by finding the index of the segment associated to 'uint32_t time' in the new mpd
+// this->segmentNumber = this->representationStream->getSegmentNumber(time);
+// printf("new segmentNumber!: %d\n", this->segmentNumber);
+// LeaveCriticalSection(&this->monitorMutex);
+}