1 module ddebug.common.queue;
2 
3 import core.sync.condition;
4 import core.sync.mutex;
5 
/**
 * Unbounded FIFO queue guarded by a mutex, with blocking reads.
 *
 * Producers call `put`; consumers call `get` / `getAll`, which block on a
 * condition variable until data arrives, the queue is closed, or the
 * optional timeout elapses. After `close()` every operation becomes a
 * no-op: `put` drops its argument and the getters return `false`.
 */
class BlockingQueue(T) {

    private Mutex _mutex;
    private Condition _condition;
    /// backing storage; pending items occupy `_buffer[_readPos .. _writePos]`
    private T[] _buffer;
    /// index of the next item to hand out
    private int _readPos;
    /// index of the first free slot
    private int _writePos;
    /// set once by `close()`; checked by every public operation
    private shared bool _closed;

    /// creates an empty, open queue
    this() {
        _mutex = new Mutex();
        _condition = new Condition(_mutex);
        _readPos = 0;
        _writePos = 0;
    }

    /// closes the queue and releases the synchronization primitives
    ~this() {
        // NOTE(review): when this destructor is run by the GC (rather than by
        // an explicit destroy()), _mutex and _condition may already have been
        // finalized - confirm that owners always destroy the queue explicitly.
        close();
        if (_condition) {
            destroy(_condition);
            _condition = null;
        }
        if (_mutex) {
            destroy(_mutex);
            _mutex = null;
        }
    }

    /// marks the queue as closed and wakes up every thread blocked in `get` / `getAll`
    void close() {
        if (_mutex && !_closed) {
            synchronized(_mutex) {
                _closed = true;
                if (_condition !is null)
                    _condition.notifyAll();
            }
        } else {
            _closed = true;
        }
    }

    /// returns true if queue is closed
    @property bool closed() {
        return _closed;
    }

    /// Compacts pending items to the start of the buffer once more than 1024
    /// slots and over 3/4 of the buffer have been consumed. Caller holds `_mutex`.
    private void move() {
        if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) {
            // move buffer data
            for (int i = 0; _readPos + i < _writePos; i++)
                _buffer[i] = _buffer[_readPos + i];
            _writePos -= _readPos;
            _readPos = 0;
        }
    }

    /// Stores one item at the write position, making room first if needed.
    /// Caller holds `_mutex`.
    private void append(ref T item) {
        if (_writePos >= _buffer.length) {
            move();
            // grow only if compaction did not free a slot; growing
            // unconditionally makes the buffer double forever on a busy queue
            if (_writePos >= _buffer.length)
                _buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2;
        }
        _buffer[_writePos++] = item;
    }

    /// Rewinds both indices once every pending item has been consumed, so the
    /// start of the buffer is reused. Caller holds `_mutex`.
    private void rewindIfDrained() {
        if (_readPos == _writePos) {
            _readPos = 0;
            _writePos = 0;
        }
    }

    /**
     * Blocks until at least one item is pending. Caller holds `_mutex`.
     *
     * The wait is re-checked in a loop so that a spurious wakeup, or another
     * consumer taking the item first, does not end the wait prematurely.
     *
     * Params:
     *   timeoutMillis = maximum total wait in milliseconds; `<= 0` waits
     *                   until data arrives or the queue is closed
     * Returns: `true` if an item is pending, `false` if the queue was closed
     *          while empty or the timeout elapsed.
     */
    private bool waitForItems(int timeoutMillis) {
        import core.time : Duration, MonoTime, dur;

        if (timeoutMillis <= 0) {
            while (_readPos >= _writePos) {
                if (_closed)
                    return false;
                _condition.wait(); // no timeout
            }
            return true;
        }
        // a fixed deadline keeps the total wait bounded across repeated wakeups
        const deadline = MonoTime.currTime + dur!"msecs"(timeoutMillis);
        while (_readPos >= _writePos) {
            if (_closed)
                return false;
            const remaining = deadline - MonoTime.currTime;
            if (remaining <= Duration.zero)
                return false; // timeout
            _condition.wait(remaining);
        }
        return true;
    }

    /// appends a single item and wakes up waiting consumers; ignored if the queue is closed
    void put(T item) {
        if (_closed)
            return;
        synchronized(_mutex) {
            if (_closed)
                return;
            append(item);
            _condition.notifyAll();
        }
    }

    /// appends all items in order and wakes up waiting consumers; ignored if the queue is closed
    void put(T[] items) {
        if (_closed)
            return;
        synchronized(_mutex) {
            if (_closed)
                return;
            foreach(ref item; items) {
                append(item);
            }
            _condition.notifyAll();
        }
    }

    /**
     * Removes the oldest item from the queue, waiting for one if necessary.
     *
     * Params:
     *   value         = receives the item; left untouched when `false` is returned
     *   timeoutMillis = maximum wait in milliseconds; `<= 0` waits until an
     *                   item arrives or the queue is closed
     * Returns: `true` if an item was stored in `value`; `false` if the queue
     *          is closed or the timeout elapsed.
     */
    bool get(ref T value, int timeoutMillis = 0) {
        if (_closed)
            return false;
        bool taken = false;
        synchronized(_mutex) {
            if (!_closed) {
                bool available = _readPos < _writePos;
                if (!available) {
                    try {
                        available = waitForItems(timeoutMillis);
                    } catch (Exception) {
                        // best effort: a failed wait is not fatal, hand out
                        // whatever is pending at this point
                        available = _readPos < _writePos;
                    }
                }
                if (available) {
                    value = _buffer[_readPos++];
                    rewindIfDrained();
                    taken = true;
                }
            }
        }
        return taken;
    }

    /**
     * Removes every pending item at once, waiting for at least one if necessary.
     *
     * Params:
     *   values        = reset to empty (unless the queue is already closed),
     *                   then filled with the pending items in FIFO order
     *   timeoutMillis = maximum wait in milliseconds; `<= 0` waits until an
     *                   item arrives or the queue is closed
     * Returns: `true` if at least one item was stored in `values`; `false` if
     *          the queue is closed or the timeout elapsed.
     */
    bool getAll(ref T[] values, int timeoutMillis) {
        if (_closed)
            return false;
        bool gotAny = false;
        synchronized(_mutex) {
            if (!_closed) {
                values.length = 0;
                if (waitForItems(timeoutMillis)) {
                    values ~= _buffer[_readPos .. _writePos];
                    // everything was handed out: reuse the buffer from the start
                    _readPos = 0;
                    _writePos = 0;
                    gotAny = true;
                }
            }
        }
        return gotAny;
    }
}