summaryrefslogtreecommitdiffstats
path: root/Buffer/Buffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'Buffer/Buffer.h')
-rw-r--r--Buffer/Buffer.h238
1 files changed, 238 insertions, 0 deletions
diff --git a/Buffer/Buffer.h b/Buffer/Buffer.h
new file mode 100644
index 00000000..1a95821c
--- /dev/null
+++ b/Buffer/Buffer.h
@@ -0,0 +1,238 @@
+/*
+ * Buffer.h
+ *****************************************************************************
+ * Copyright (C) 2012, bitmovin Softwareentwicklung OG, All Rights Reserved
+ *
+ * Email: libdash-dev@vicky.bitmovin.net
+ *
+ * This source code and its use and distribution, is subject to the terms
+ * and conditions of the applicable license agreement.
+ *****************************************************************************/
+
+#ifndef LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_
+#define LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_
+
+#include "../Portable/MultiThreading.h"
+#include "IBufferObserver.h"
+#include <deque>
+#include <vector>
+
+namespace libdash
+{
+namespace framework
+{
+
+namespace buffer
+{
+template <class T>
+class Buffer
+{
+public:
+ Buffer(uint32_t maxcapacity, BufferType type);
+ virtual ~Buffer();
+
+ bool pushBack(T *element);
+ T* front();
+ T* getFront();
+ void popFront();
+ void clearTail();
+ void clear();
+ void setEOS(bool value);
+ uint32_t length();
+ uint32_t capacity();
+ void attachObserver(IBufferObserver *observer);
+ void notify();
+
+private:
+ std::deque<T*> objects;
+ std::vector<IBufferObserver *> observer;
+ bool eos;
+ uint32_t maxcapacity;
+ mutable CRITICAL_SECTION monitorMutex;
+ mutable CONDITION_VARIABLE full;
+ mutable CONDITION_VARIABLE empty;
+ BufferType type;
+};
+}
+}
+}
+
+using namespace libdash::framework::buffer;
+
+template <class T>
+Buffer<T>::Buffer(uint32_t maxcapacity, BufferType type) :
+ type (type),
+ eos (false),
+ maxcapacity (maxcapacity)
+{
+ InitializeConditionVariable (&this->full);
+ InitializeConditionVariable (&this->empty);
+ InitializeCriticalSection (&this->monitorMutex);
+}
+
+template <class T> Buffer<T>::~Buffer()
+{
+ this->clear();
+
+ DeleteConditionVariable (&this->full);
+ DeleteConditionVariable (&this->empty);
+ DeleteCriticalSection (&this->monitorMutex);
+}
+
+template <class T> bool Buffer<T>::pushBack(T *object)
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ while(this->objects.size() >= this->maxcapacity && !this->eos)
+ SleepConditionVariableCS(&this->empty, &this->monitorMutex, INFINITE);
+
+ if(this->objects.size() >= this->maxcapacity)
+ {
+ LeaveCriticalSection(&this->monitorMutex);
+ return false;
+ }
+
+ this->objects.push_back(object);
+
+ WakeAllConditionVariable(&this->full);
+ LeaveCriticalSection(&this->monitorMutex);
+ this->notify();
+ return true;
+}
+
+template <class T> T* Buffer<T>::front()
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ while(this->objects.size() == 0 && !this->eos)
+ SleepConditionVariableCS(&this->full, &this->monitorMutex, INFINITE);
+
+ if(this->objects.size() == 0)
+ {
+ LeaveCriticalSection(&this->monitorMutex);
+ return NULL;
+ }
+
+ T *object = this->objects.front();
+
+ LeaveCriticalSection(&this->monitorMutex);
+
+ return object;
+}
+
+template <class T> T* Buffer<T>::getFront()
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ while(this->objects.size() == 0 && !this->eos)
+ SleepConditionVariableCS(&this->full, &this->monitorMutex, INFINITE);
+
+ if(this->objects.size() == 0)
+ {
+ LeaveCriticalSection(&this->monitorMutex);
+ return NULL;
+ }
+
+ T *object = this->objects.front();
+ this->objects.pop_front();
+ this->objects.shrink_to_fit();
+ WakeAllConditionVariable(&this->empty);
+ LeaveCriticalSection(&this->monitorMutex);
+ this->notify();
+
+ return object;
+}
+
+template <class T> uint32_t Buffer<T>::length()
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ uint32_t ret = this->objects.size();
+
+ LeaveCriticalSection(&this->monitorMutex);
+
+ return ret;
+}
+
+template <class T> void Buffer<T>::popFront()
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ this->objects.pop_front();
+ this->objects.shrink_to_fit();
+
+ WakeAllConditionVariable(&this->empty);
+ LeaveCriticalSection(&this->monitorMutex);
+ this->Notify();
+}
+
+template <class T> void Buffer<T>::setEOS(bool value)
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ this->eos = value;
+ WakeAllConditionVariable(&this->empty);
+ WakeAllConditionVariable(&this->full);
+ LeaveCriticalSection(&this->monitorMutex);
+}
+
+template <class T> void Buffer<T>::attachObserver(IBufferObserver *observer)
+{
+ this->observer.push_back(observer);
+}
+
+template <class T> void Buffer<T>::notify()
+{
+ for(size_t i = 0; i < this->observer.size(); i++)
+ this->observer.at(i)->onBufferStateChanged(this->type, (uint32_t)((double)this->objects.size()/(double)this->maxcapacity*100.0), this->maxcapacity);
+}
+
+template <class T> void Buffer<T>::clearTail()
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ int size = this->objects.size() - 1;
+
+ if (size < 1)
+ {
+ LeaveCriticalSection(&this->monitorMutex);
+ return;
+ }
+
+ T* object = this->objects.front();
+ this->objects.pop_front();
+ for(int i=0; i < size; i++)
+ {
+ delete this->objects.front();
+ this->objects.pop_front();
+ }
+ this->objects.shrink_to_fit();
+
+ this->objects.push_back(object);
+ WakeAllConditionVariable(&this->empty);
+ WakeAllConditionVariable(&this->full);
+ LeaveCriticalSection(&this->monitorMutex);
+ this->Notify();
+}
+
+template <class T> void Buffer<T>::clear()
+{
+ EnterCriticalSection(&this->monitorMutex);
+
+ for(int i = 0; i < this->objects.size(); i++)
+ delete this->objects[i];
+
+ this->objects.clear();
+ this->objects.shrink_to_fit();
+ WakeAllConditionVariable(&this->empty);
+ WakeAllConditionVariable(&this->full);
+ LeaveCriticalSection(&this->monitorMutex);
+ this->notify();
+}
+
+template <class T> uint32_t Buffer<T>::capacity()
+{
+ return this->maxcapacity;
+}
+
+#endif /* LIBDASH_FRAMEWORK_BUFFER_BUFFER_H_ */