fixed blocking queue

This commit is contained in:
Neeflix
2018-07-11 14:53:20 +02:00
parent 08d969b9b8
commit 3e08697607
2 changed files with 49 additions and 25 deletions

View File

@@ -10,6 +10,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <future>
#include <boost/heap/priority_queue.hpp> #include <boost/heap/priority_queue.hpp>
using namespace boost; using namespace boost;
@@ -18,32 +19,55 @@ template <typename T>
class BlockingQueue class BlockingQueue
{ {
private: private:
std::mutex d_mutex; std::mutex mutex;
std::condition_variable d_condition;
heap::priority_queue<T, heap::stable<true>> p_queue; heap::priority_queue<T, heap::stable<true>> p_queue;
std::promise<T> promise;
std::shared_future<T> future_pop;
public: public:
void push(T const& value) BlockingQueue()
{ {
std::unique_lock<std::mutex> lock(this->d_mutex); this->future_pop = promise.get_future();
}
void push(const T& value)
{
std::lock_guard lock(this->mutex);
if(this->p_queue.empty())
{
this->promise.set_value(value);
return;
}
p_queue.push(value); p_queue.push(value);
this->d_condition.notify_one();
} }
T pop() T pop()
{ {
// TODO die queue funzt net weil wir nich pushen koennen wenn wir im pop warten std::unique_lock lock(this->mutex);
// TODO denk ma ueber future nach
std::unique_lock<std::mutex> lock(this->d_mutex); auto status = future_pop.wait_for(std::chrono::seconds(0));
this->d_condition.wait(lock, [=]{ return !this->p_queue.empty(); }); if(status == std::future_status::ready)
T rc = *this->p_queue.end(); {
this->p_queue.pop(); return future_pop.get();
return rc;
} }
void release() if (not this->p_queue.empty())
{ {
this->d_condition.notify_one(); auto element = p_queue.top();
p_queue.pop();
return element;
}
lock.unlock();
return future_pop.get();
}
private:
T get_first()
{
std::unique_lock lock(this->mutex);
} }
}; };

View File

@@ -6,7 +6,7 @@
*/ */
#include "../catch.hpp" #include "catch.hpp"
#include "fakeit.hpp" #include "fakeit.hpp"
#include <map> #include <map>
@@ -61,7 +61,7 @@ SCENARIO("Creating a Detector object", "")
AND_WHEN("an event can be found at gpio interface") AND_WHEN("an event can be found at gpio interface")
{ {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); std::this_thread::sleep_for(std::chrono::milliseconds(50));
THEN("only the fitting event should be distributed by event notifier") THEN("after some time the only the fitting event should be distributed by event notifier")
{ {
REQUIRE((bool)Verify(Method(event_notifier_mock, distribute_event).Using(event2))); REQUIRE((bool)Verify(Method(event_notifier_mock, distribute_event).Using(event2)));
} }