source: trunk/GDE/SINA/builddir/include/spdlog/details/mpmc_blocking_q.h

Last change on this file was 19170, checked in by westram, 2 years ago
  • sina source
    • unpack + remove tarball
    • no longer ignore sina builddir.
File size: 3.4 KB
Line 
1#pragma once
2
3//
4// Copyright(c) 2018 Gabi Melman.
5// Distributed under the MIT License (http://opensource.org/licenses/MIT)
6//
7
8// multi producer-multi consumer blocking queue.
9// enqueue(..) - will block until room found to put the new message.
10// enqueue_nowait(..) - will return immediately with false if no room left in
11// the queue.
12// dequeue_for(..) - will block until the queue is not empty or timeout have
13// passed.
14
15#include "spdlog/details/circular_q.h"
16
17#include <condition_variable>
18#include <mutex>
19
20namespace spdlog {
21namespace details {
22
23template<typename T>
24class mpmc_blocking_queue
25{
26public:
27    using item_type = T;
28    explicit mpmc_blocking_queue(size_t max_items)
29        : q_(max_items)
30    {
31    }
32
33#ifndef __MINGW32__
34    // try to enqueue and block if no room left
35    void enqueue(T &&item)
36    {
37        {
38            std::unique_lock<std::mutex> lock(queue_mutex_);
39            pop_cv_.wait(lock, [this] { return !this->q_.full(); });
40            q_.push_back(std::move(item));
41        }
42        push_cv_.notify_one();
43    }
44
45    // enqueue immediately. overrun oldest message in the queue if no room left.
46    void enqueue_nowait(T &&item)
47    {
48        {
49            std::unique_lock<std::mutex> lock(queue_mutex_);
50            q_.push_back(std::move(item));
51        }
52        push_cv_.notify_one();
53    }
54
55    // try to dequeue item. if no item found. wait upto timeout and try again
56    // Return true, if succeeded dequeue item, false otherwise
57    bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
58    {
59        {
60            std::unique_lock<std::mutex> lock(queue_mutex_);
61            if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
62            {
63                return false;
64            }
65            q_.pop_front(popped_item);
66        }
67        pop_cv_.notify_one();
68        return true;
69    }
70
71#else
72    // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
73    // so release the mutex at the very end each function.
74
75    // try to enqueue and block if no room left
76    void enqueue(T &&item)
77    {
78        std::unique_lock<std::mutex> lock(queue_mutex_);
79        pop_cv_.wait(lock, [this] { return !this->q_.full(); });
80        q_.push_back(std::move(item));
81        push_cv_.notify_one();
82    }
83
84    // enqueue immediately. overrun oldest message in the queue if no room left.
85    void enqueue_nowait(T &&item)
86    {
87        std::unique_lock<std::mutex> lock(queue_mutex_);
88        q_.push_back(std::move(item));
89        push_cv_.notify_one();
90    }
91
92    // try to dequeue item. if no item found. wait upto timeout and try again
93    // Return true, if succeeded dequeue item, false otherwise
94    bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
95    {
96        std::unique_lock<std::mutex> lock(queue_mutex_);
97        if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
98        {
99            return false;
100        }
101        q_.pop_front(popped_item);
102        pop_cv_.notify_one();
103        return true;
104    }
105
106#endif
107
108    size_t overrun_counter()
109    {
110        std::unique_lock<std::mutex> lock(queue_mutex_);
111        return q_.overrun_counter();
112    }
113
114private:
115    std::mutex queue_mutex_;
116    std::condition_variable push_cv_;
117    std::condition_variable pop_cv_;
118    spdlog::details::circular_q<T> q_;
119};
120} // namespace details
121} // namespace spdlog
Note: See TracBrowser for help on using the repository browser.