From a644414fd2c3a3f7f41e716b6875a78981e4cfe1 Mon Sep 17 00:00:00 2001 From: jacko Date: Fri, 23 Jun 2017 16:12:18 +0200 Subject: adding mpd live handling + automatic mpd fetching Change-Id: I2c05bdf6a4d940ad22bb8632268f4b63a08a80a8 Signed-off-by: jacko --- Input/DASHReceiver.cpp | 291 ++++++++++++++++++++++++------------------------- 1 file changed, 141 insertions(+), 150 deletions(-) (limited to 'Input/DASHReceiver.cpp') diff --git a/Input/DASHReceiver.cpp b/Input/DASHReceiver.cpp index bbfe3e35..aaa75205 100644 --- a/Input/DASHReceiver.cpp +++ b/Input/DASHReceiver.cpp @@ -19,44 +19,47 @@ using namespace dash::mpd; using duration_in_seconds = std::chrono::duration >; -DASHReceiver::DASHReceiver (IMPD *mpd, IDASHReceiverObserver *obs, Buffer *buffer, uint32_t bufferSize, bool icnEnabled, double icnAlpha, float beta, float drop) : - mpd (mpd), - period (NULL), - adaptationSet (NULL), - representation (NULL), - adaptationSetStream (NULL), - representationStream (NULL), - segmentNumber (0), - observer (obs), - buffer (buffer), - bufferSize (bufferSize), - isBuffering (false), - withFeedBack (false), - icn (icnEnabled), - icnAlpha (icnAlpha), - previousQuality (0), - isPaused (false), - threadComplete (false), - isScheduledPaced (false), - targetDownload (0.0), - downloadingTime (0.0), - bufferLevelAtUpdate (0), - isBufferBased (false), - isLooping (false), - beta (beta), - drop (drop) +DASHReceiver::DASHReceiver (viper::managers::StreamType type, MPDWrapper *mpdWrapper, IDASHReceiverObserver *obs, Buffer *buffer, uint32_t bufferSize, bool icnEnabled, double icnAlpha, float beta, float drop) : + type (type), + mpdWrapper (mpdWrapper), +// period (NULL), +// adaptationSet (NULL), +// representation (NULL), + adaptationSetStream (NULL), +// representationStream (NULL), + segmentNumber (0), + observer (obs), + buffer (buffer), + bufferSize (bufferSize), + isBuffering (false), + withFeedBack (false), + icn (icnEnabled), + icnAlpha (icnAlpha), + previousQuality (0), + isPaused (false), + threadComplete (false), + isScheduledPaced (false), + targetDownload (0.0), + downloadingTime (0.0), + bufferLevelAtUpdate (0), + isBufferBased (false), + isLooping (false), + beta (beta), + drop (drop), + bufferingThread (NULL), + mpdFetcherThread (NULL) { readMax = 32768; readBuffer = (uint8_t*)malloc(sizeof(uint8_t)*readMax); - this->period = this->mpd->GetPeriods().at(0); - this->adaptationSet = this->period->GetAdaptationSets().at(0); - this->representation = this->adaptationSet->GetRepresentation().at(0); +// this->period = this->mpd->GetPeriods().at(0); +// this->adaptationSet = this->period->GetAdaptationSets().at(0); +// this->representation = this->adaptationSet->GetRepresentation().at(0); - this->adaptationSetStream = new AdaptationSetStream(mpd, period, adaptationSet); - this->representationStream = adaptationSetStream->getRepresentationStream(this->representation); + this->adaptationSetStream = new AdaptationSetStream(type, mpdWrapper); +// this->representationStream = adaptationSetStream->getRepresentationStream(this->representation); this->segmentOffset = CalculateSegmentOffset(); - this->representationStream->setSegmentOffset(this->segmentOffset); - +// this->representationStream->setSegmentOffset(this->segmentOffset); + this->mpdWrapper->setSegmentOffset(type, this->segmentOffset); this->conn = NULL; this->initConn = NULL; readMax = 32768; @@ -103,7 +106,15 @@ bool DASHReceiver::Start () this->isBuffering = false; return false; } - + //if dynamic, set up the fetching loop + if(!strcmp(this->mpdWrapper->getType().c_str(), "dynamic")) + { + this->mpdFetcherThread = createThreadPortable(DoMPDFetching, this); + if(this->mpdFetcherThread == NULL) + { + std::cout << "mpd Fetcher thread is NULL. Need to think of how to handle this?" << std::endl; + } + } return true; } void DASHReceiver::Stop() @@ -119,83 +130,43 @@ void DASHReceiver::Stop() JoinThread(this->bufferingThread); destroyThreadPortable(this->bufferingThread); } - this->period = this->mpd->GetPeriods().at(0); - this->adaptationSet = this->period->GetAdaptationSets().at(0); - this->representation = this->adaptationSet->GetRepresentation().at(0); + if(this->mpdFetcherThread != NULL) + { + JoinThread(this->mpdFetcherThread); + destroyThreadPortable(this->mpdFetcherThread); + } } MediaObject* DASHReceiver::GetNextSegment () { - ISegment *seg = NULL; EnterCriticalSection(&this->monitorPausedMutex); while(this->isPaused) SleepConditionVariableCS(&this->paused, &this->monitorPausedMutex, INFINITE); - if(!strcmp(this->mpd->GetType().c_str(), "static")) - { - if(this->segmentNumber >= this->representationStream->getSize()) - { - qDebug("looping? : %s\n", this->isLooping ? "YES" : "NO"); - if(this->isLooping) - { - this->segmentNumber = 0; - } - else - { - LeaveCriticalSection(&this->monitorPausedMutex); - return NULL; - } - } - } - seg = this->representationStream->getMediaSegment(this->segmentNumber); - - if (seg != NULL) - { - std::vector rep = this->adaptationSet->GetRepresentation(); - - this->NotifyQualityDownloading(this->representation->GetBandwidth()); + MediaObject *media = this->mpdWrapper->getNextSegment(type, isLooping, this->segmentNumber, withFeedBack); - MediaObject *media = new MediaObject(seg, this->representation,this->withFeedBack); - this->segmentNumber++; - LeaveCriticalSection(&this->monitorPausedMutex); - return media; - } + if(media) + this->NotifyQualityDownloading(media->GetRepresentationBandwidth()); LeaveCriticalSection(&this->monitorPausedMutex); - return NULL; + return media; } MediaObject* DASHReceiver::GetSegment (uint32_t segNum) { - ISegment *seg = NULL; - - if(segNum >= this->representationStream->getSize()) - return NULL; - - seg = this->representationStream->getMediaSegment(segNum); - - if (seg != NULL) - { - MediaObject *media = new MediaObject(seg, this->representation); - return media; - } - - return NULL; + return this->mpdWrapper->getSegment(type, segNum); } + MediaObject* DASHReceiver::GetInitSegment () { - ISegment *seg = NULL; - - seg = this->representationStream->getInitializationSegment(); - - if (seg != NULL) - { - MediaObject *media = new MediaObject(seg, this->representation); - return media; - } + return this->mpdWrapper->getInitSegment(type); +} - return NULL; +MediaObject* DASHReceiver::GetInitSegmentWithoutLock () +{ + return this->mpdWrapper->getInitSegmentWithoutLock(type); } -MediaObject* DASHReceiver::FindInitSegment (dash::mpd::IRepresentation *representation) + +MediaObject* DASHReceiver::FindInitSegment (int representation) { if (!this->InitSegmentExists(representation)) return NULL; @@ -224,45 +195,9 @@ void DASHReceiver::NotifyQualityDownloading (uint32_t quality) this->observer->notifyQualityDownloading(quality); } -void DASHReceiver::SetRepresentation (IPeriod *period, IAdaptationSet *adaptationSet, IRepresentation *representation) +void DASHReceiver::SetRepresentation () { - EnterCriticalSection(&this->monitorMutex); - - bool periodChanged = false; - - if (this->representation == representation) - { - LeaveCriticalSection(&this->monitorMutex); - return; - } - - this->representation = representation; - - if (this->adaptationSet != adaptationSet) - { - this->adaptationSet = adaptationSet; - - if (this->period != period) - { - this->period = period; - periodChanged = true; - } - - delete this->adaptationSetStream; - this->adaptationSetStream = NULL; - - this->adaptationSetStream = new AdaptationSetStream(this->mpd, this->period, this->adaptationSet); - } - - this->representationStream = this->adaptationSetStream->getRepresentationStream(this->representation); - this->DownloadInitSegment(this->representation); - - if (periodChanged) - { - this->segmentNumber = 0; - this->CalculateSegmentOffset(); - } - LeaveCriticalSection(&this->monitorMutex); + this->DownloadInitSegmentWithoutLock(); } libdash::framework::adaptation::IAdaptationLogic* DASHReceiver::GetAdaptationLogic () @@ -271,19 +206,13 @@ libdash::framework::adaptation::IAdaptationLogic* DASHReceiver::GetAdaptationLog } dash::mpd::IRepresentation* DASHReceiver::GetRepresentation () { - return this->representation; + return NULL; } uint32_t DASHReceiver::CalculateSegmentOffset () { - if (mpd->GetType() == "static") - return 0; - - uint32_t firstSegNum = this->representationStream->getFirstSegmentNumber(); - uint32_t currSegNum = this->representationStream->getCurrentSegmentNumber(); - uint32_t startSegNum = currSegNum - 2*bufferSize; - - return (startSegNum > firstSegNum) ? startSegNum : firstSegNum; + return this->mpdWrapper->calculateSegmentOffset(type, bufferSize); } + void DASHReceiver::NotifySegmentDownloaded () { this->observer->onSegmentDownloaded(); @@ -291,14 +220,32 @@ void DASHReceiver::NotifySegmentDownloaded () void DASHReceiver::NotifyBitrateChange(dash::mpd::IRepresentation *representation) { - if(this->representation != representation) +// if(this->representation != representation) +// { +// this->representation = representation; +// this->SetRepresentation(this->period,this->adaptationSet,this->representation); +// } +} +void DASHReceiver::DownloadInitSegmentWithoutLock () +{ + int rep = std::stoi(this->mpdWrapper->getRepresentationIDWithoutLock(type).c_str()); + if (this->InitSegmentExists(rep)) + return; + + MediaObject *initSeg = NULL; + initSeg = this->GetInitSegmentWithoutLock(); + + if (initSeg) { - this->representation = representation; - this->SetRepresentation(this->period,this->adaptationSet,this->representation); + initSeg->StartDownload(this->initConn); + this->initSegments[rep] = initSeg; + initSeg->WaitFinished(); } } -void DASHReceiver::DownloadInitSegment (IRepresentation* rep) + +void DASHReceiver::DownloadInitSegment () { + int rep = std::stoi(this->mpdWrapper->getRepresentationID(type).c_str()); if (this->InitSegmentExists(rep)) return; @@ -312,7 +259,8 @@ void DASHReceiver::DownloadInitSegment (IRepresentatio initSeg->WaitFinished(); } } -bool DASHReceiver::InitSegmentExists (IRepresentation* rep) + +bool DASHReceiver::InitSegmentExists (int rep) { if (this->initSegments.find(rep) != this->initSegments.end()) return true; @@ -358,20 +306,18 @@ void DASHReceiver::OnEOS(bool value) bool DASHReceiver::PushBack(MediaObject *mediaObject) { - MediaObject *init = this->FindInitSegment(mediaObject->GetRepresentation()); + MediaObject *init = this->FindInitSegment(mediaObject->GetRepresentationID()); mediaObject->AddInitSegment(init); //TODO the read should be in a function //Grab the infos for the analytics: bitrate, fps - dash::mpd::IRepresentation* datRep = mediaObject->GetRepresentation(); uint32_t bitrate = 0; int fps = 0; uint32_t quality = 0; - bitrate = datRep->GetBandwidth(); - quality = datRep->GetHeight(); + bitrate = mediaObject->GetRepresentationBandwidth(); + quality = mediaObject->GetRepresentationHeight(); fps = this->bufferLevelAtUpdate; this->observer->notifyStatistics((int)this->segmentNumber - 1, bitrate, fps, quality); - return(this->buffer->pushBack(mediaObject)); } @@ -380,7 +326,7 @@ void* DASHReceiver::DoBuffering (void *recei { DASHReceiver *dashReceiver = (DASHReceiver *) receiver; - dashReceiver->DownloadInitSegment(dashReceiver->GetRepresentation()); + dashReceiver->DownloadInitSegment(); MediaObject *media = dashReceiver->GetNextSegment(); dashReceiver->NotifyCheckedAdaptationLogic(); @@ -426,6 +372,25 @@ void* DASHReceiver::DoBuffering (void *recei return NULL; } +void* DASHReceiver::DoMPDFetching (void* receiver) +{ + DASHReceiver* dashReceiver = (DASHReceiver*) receiver; + uint32_t currTime = TimeResolver::getCurrentTimeInSec(); + uint32_t publishedTime = TimeResolver::getUTCDateTimeInSec(dashReceiver->mpdWrapper->getPublishTime()); + uint32_t period = TimeResolver::getDurationInSec(dashReceiver->mpdWrapper->getMinimumUpdatePeriod()); + while(dashReceiver->isBuffering) + { + while(dashReceiver->isBuffering && currTime < publishedTime + period) + { + usleep(((publishedTime + period) - currTime) * 1000000); + currTime = TimeResolver::getCurrentTimeInSec(); + } + dashReceiver->observer->fetchMPD(); + publishedTime = TimeResolver::getUTCDateTimeInSec(dashReceiver->mpdWrapper->getPublishTime()); + period = TimeResolver::getDurationInSec(dashReceiver->mpdWrapper->getMinimumUpdatePeriod()); + } +} + //can Push video to buffer in the renderer bool DASHReceiver::CanPush () { @@ -454,4 +419,30 @@ void DASHReceiver::SetDrop (float drop) this->drop = drop; } +void DASHReceiver::updateMPD(IMPD* mpd) +{ +// EnterCriticalSection(&this->monitorMutex); + //First we need to find the new segmentNumber -> what is the segment time now? +// uint32_t time = this->representationStream->getTime(this->segmentNumber); +// printf("old segmentNumber!: %d\n", this->segmentNumber); +// printf("time: %u\n", time); + //Second, replace the MPD with the new one +// delete(this->mpd); +// this->mpd = mpd; +// + //Third, Update all the structures associated to the mpd +// this->period = this->mpd->GetPeriods().at(0); +// this->adaptationSet = this->period->GetAdaptationSets().at(0); +// this->representation = this->adaptationSet->GetRepresentation().at(0); +// delete(this->adaptationSetStream); +// this->adaptationSetStream = new AdaptationSetStream(mpd, period, adaptationSet); +// this->representationStream = adaptationSetStream->getRepresentationStream(this->representation); +// this->segmentOffset = CalculateSegmentOffset(); +// this->representationStream->setSegmentOffset(this->segmentOffset); +// + //Fourth, Set the new segmentNumber by finding the index of the segment associated to 'uint32_t time' in the new mpd +// this->segmentNumber = this->representationStream->getSegmentNumber(time); +// printf("new segmentNumber!: %d\n", this->segmentNumber); +// LeaveCriticalSection(&this->monitorMutex); +} -- cgit 1.2.3-korg