/* * Buffer.h ***************************************************************************** * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved * * Email: libdash-dev@vicky.bitmovin.net * * This source code and its use and distribution, is subject to the terms * and conditions of the applicable license agreement. *****************************************************************************/ #ifndef LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ #define LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ #include "../Portable/MultiThreading.h" #include "IBufferObserver.h" #include #include namespace libdash { namespace framework { namespace buffer { template class Buffer { public: Buffer(uint32_t maxcapacity, BufferType type); virtual ~Buffer(); bool pushBack(T *element); T* front(); T* getFront(); void popFront(); void clearTail(); void clear(); void setEOS(bool value); uint32_t length(); uint32_t capacity(); void attachObserver(IBufferObserver *observer); void notify(); private: std::deque objects; std::vector observer; bool eos; uint32_t maxcapacity; mutable CRITICAL_SECTION monitorMutex; mutable CONDITION_VARIABLE full; mutable CONDITION_VARIABLE empty; BufferType type; }; } } } using namespace libdash::framework::buffer; template Buffer::Buffer(uint32_t maxcapacity, BufferType type) : type (type), eos (false), maxcapacity (maxcapacity) { InitializeConditionVariable (&this->full); InitializeConditionVariable (&this->empty); InitializeCriticalSection (&this->monitorMutex); } template Buffer::~Buffer() { this->clear(); DeleteConditionVariable (&this->full); DeleteConditionVariable (&this->empty); DeleteCriticalSection (&this->monitorMutex); } template bool Buffer::pushBack(T *object) { EnterCriticalSection(&this->monitorMutex); while(this->objects.size() >= this->maxcapacity && !this->eos) SleepConditionVariableCS(&this->empty, &this->monitorMutex, INFINITE); if(this->objects.size() >= this->maxcapacity) { LeaveCriticalSection(&this->monitorMutex); return false; } this->objects.push_back(object); WakeAllConditionVariable(&this->full); LeaveCriticalSection(&this->monitorMutex); this->notify(); return true; } template T* Buffer::front() { EnterCriticalSection(&this->monitorMutex); while(this->objects.size() == 0 && !this->eos) SleepConditionVariableCS(&this->full, &this->monitorMutex, INFINITE); if(this->objects.size() == 0) { LeaveCriticalSection(&this->monitorMutex); return NULL; } T *object = this->objects.front(); LeaveCriticalSection(&this->monitorMutex); return object; } template T* Buffer::getFront() { EnterCriticalSection(&this->monitorMutex); while(this->objects.size() == 0 && !this->eos) SleepConditionVariableCS(&this->full, &this->monitorMutex, INFINITE); if(this->objects.size() == 0) { LeaveCriticalSection(&this->monitorMutex); return NULL; } T *object = this->objects.front(); this->objects.pop_front(); this->objects.shrink_to_fit(); WakeAllConditionVariable(&this->empty); LeaveCriticalSection(&this->monitorMutex); this->notify(); return object; } template uint32_t Buffer::length() { EnterCriticalSection(&this->monitorMutex); uint32_t ret = this->objects.size(); LeaveCriticalSection(&this->monitorMutex); return ret; } template void Buffer::popFront() { EnterCriticalSection(&this->monitorMutex); this->objects.pop_front(); this->objects.shrink_to_fit(); WakeAllConditionVariable(&this->empty); LeaveCriticalSection(&this->monitorMutex); this->Notify(); } template void Buffer::setEOS(bool value) { EnterCriticalSection(&this->monitorMutex); this->eos = value; WakeAllConditionVariable(&this->empty); WakeAllConditionVariable(&this->full); LeaveCriticalSection(&this->monitorMutex); } template void Buffer::attachObserver(IBufferObserver *observer) { this->observer.push_back(observer); } template void Buffer::notify() { for(size_t i = 0; i < this->observer.size(); i++) this->observer.at(i)->onBufferStateChanged(this->type, (uint32_t)((double)this->objects.size()/(double)this->maxcapacity*100.0), this->maxcapacity); } template void Buffer::clearTail() { EnterCriticalSection(&this->monitorMutex); int size = this->objects.size() - 1; if (size < 1) { LeaveCriticalSection(&this->monitorMutex); return; } T* object = this->objects.front(); this->objects.pop_front(); for(int i=0; i < size; i++) { delete this->objects.front(); this->objects.pop_front(); } this->objects.shrink_to_fit(); this->objects.push_back(object); WakeAllConditionVariable(&this->empty); WakeAllConditionVariable(&this->full); LeaveCriticalSection(&this->monitorMutex); this->Notify(); } template void Buffer::clear() { EnterCriticalSection(&this->monitorMutex); for(int i = 0; i < this->objects.size(); i++) delete this->objects[i]; this->objects.clear(); this->objects.shrink_to_fit(); WakeAllConditionVariable(&this->empty); WakeAllConditionVariable(&this->full); LeaveCriticalSection(&this->monitorMutex); this->notify(); } template uint32_t Buffer::capacity() { return this->maxcapacity; } #endif /* LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ */