diff options
Diffstat (limited to 'Buffer/Buffer.h')
-rw-r--r-- | Buffer/Buffer.h | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/Buffer/Buffer.h b/Buffer/Buffer.h new file mode 100644 index 00000000..1a95821c --- /dev/null +++ b/Buffer/Buffer.h @@ -0,0 +1,238 @@ +/* + * Buffer.h + ***************************************************************************** + * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved + * + * Email: libdash-dev@vicky.bitmovin.net + * + * This source code and its use and distribution, is subject to the terms + * and conditions of the applicable license agreement. + *****************************************************************************/ + +#ifndef LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ +#define LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ + +#include "../Portable/MultiThreading.h" +#include "IBufferObserver.h" +#include <deque> +#include <vector> + +namespace libdash +{ +namespace framework +{ + +namespace buffer +{ +template <class T> +class Buffer +{ +public: + Buffer(uint32_t maxcapacity, BufferType type); + virtual ~Buffer(); + + bool pushBack(T *element); + T* front(); + T* getFront(); + void popFront(); + void clearTail(); + void clear(); + void setEOS(bool value); + uint32_t length(); + uint32_t capacity(); + void attachObserver(IBufferObserver *observer); + void notify(); + +private: + std::deque<T*> objects; + std::vector<IBufferObserver *> observer; + bool eos; + uint32_t maxcapacity; + mutable CRITICAL_SECTION monitorMutex; + mutable CONDITION_VARIABLE full; + mutable CONDITION_VARIABLE empty; + BufferType type; +}; +} +} +} + +using namespace libdash::framework::buffer; + +template <class T> +Buffer<T>::Buffer(uint32_t maxcapacity, BufferType type) : + type (type), + eos (false), + maxcapacity (maxcapacity) +{ + InitializeConditionVariable (&this->full); + InitializeConditionVariable (&this->empty); + InitializeCriticalSection (&this->monitorMutex); +} + +template <class T> Buffer<T>::~Buffer() +{ + this->clear(); + + DeleteConditionVariable (&this->full); + DeleteConditionVariable (&this->empty); + DeleteCriticalSection (&this->monitorMutex); +} + +template <class T> bool Buffer<T>::pushBack(T *object) +{ + EnterCriticalSection(&this->monitorMutex); + + while(this->objects.size() >= this->maxcapacity && !this->eos) + SleepConditionVariableCS(&this->empty, &this->monitorMutex, INFINITE); + + if(this->objects.size() >= this->maxcapacity) + { + LeaveCriticalSection(&this->monitorMutex); + return false; + } + + this->objects.push_back(object); + + WakeAllConditionVariable(&this->full); + LeaveCriticalSection(&this->monitorMutex); + this->notify(); + return true; +} + +template <class T> T* Buffer<T>::front() +{ + EnterCriticalSection(&this->monitorMutex); + + while(this->objects.size() == 0 && !this->eos) + SleepConditionVariableCS(&this->full, &this->monitorMutex, INFINITE); + + if(this->objects.size() == 0) + { + LeaveCriticalSection(&this->monitorMutex); + return NULL; + } + + T *object = this->objects.front(); + + LeaveCriticalSection(&this->monitorMutex); + + return object; +} + +template <class T> T* Buffer<T>::getFront() +{ + EnterCriticalSection(&this->monitorMutex); + + while(this->objects.size() == 0 && !this->eos) + SleepConditionVariableCS(&this->full, &this->monitorMutex, INFINITE); + + if(this->objects.size() == 0) + { + LeaveCriticalSection(&this->monitorMutex); + return NULL; + } + + T *object = this->objects.front(); + this->objects.pop_front(); + this->objects.shrink_to_fit(); + WakeAllConditionVariable(&this->empty); + LeaveCriticalSection(&this->monitorMutex); + this->notify(); + + return object; +} + +template <class T> uint32_t Buffer<T>::length() +{ + EnterCriticalSection(&this->monitorMutex); + + uint32_t ret = this->objects.size(); + + LeaveCriticalSection(&this->monitorMutex); + + return ret; +} + +template <class T> void Buffer<T>::popFront() +{ + EnterCriticalSection(&this->monitorMutex); + + this->objects.pop_front(); + this->objects.shrink_to_fit(); + + WakeAllConditionVariable(&this->empty); + LeaveCriticalSection(&this->monitorMutex); + this->Notify(); +} + +template <class T> void Buffer<T>::setEOS(bool value) +{ + EnterCriticalSection(&this->monitorMutex); + + this->eos = value; + WakeAllConditionVariable(&this->empty); + WakeAllConditionVariable(&this->full); + LeaveCriticalSection(&this->monitorMutex); +} + +template <class T> void Buffer<T>::attachObserver(IBufferObserver *observer) +{ + this->observer.push_back(observer); +} + +template <class T> void Buffer<T>::notify() +{ + for(size_t i = 0; i < this->observer.size(); i++) + this->observer.at(i)->onBufferStateChanged(this->type, (uint32_t)((double)this->objects.size()/(double)this->maxcapacity*100.0), this->maxcapacity); +} + +template <class T> void Buffer<T>::clearTail() +{ + EnterCriticalSection(&this->monitorMutex); + + int size = this->objects.size() - 1; + + if (size < 1) + { + LeaveCriticalSection(&this->monitorMutex); + return; + } + + T* object = this->objects.front(); + this->objects.pop_front(); + for(int i=0; i < size; i++) + { + delete this->objects.front(); + this->objects.pop_front(); + } + this->objects.shrink_to_fit(); + + this->objects.push_back(object); + WakeAllConditionVariable(&this->empty); + WakeAllConditionVariable(&this->full); + LeaveCriticalSection(&this->monitorMutex); + this->Notify(); +} + +template <class T> void Buffer<T>::clear() +{ + EnterCriticalSection(&this->monitorMutex); + + for(int i = 0; i < this->objects.size(); i++) + delete this->objects[i]; + + this->objects.clear(); + this->objects.shrink_to_fit(); + WakeAllConditionVariable(&this->empty); + WakeAllConditionVariable(&this->full); + LeaveCriticalSection(&this->monitorMutex); + this->notify(); +} + +template <class T> uint32_t Buffer<T>::capacity() +{ + return this->maxcapacity; +} + +#endif /* LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ */ |