1 module ddebug.common.queue;
2 
3 import core.sync.condition;
4 import core.sync.mutex;
5 
/**
 * Unbounded, thread-safe FIFO queue with blocking reads.
 *
 * Producers call `put`; consumers call `get` / `getAll`, which block until
 * data arrives, the timeout expires, or the queue is closed. Once `close()`
 * has been called, `put` is a no-op and `get` / `getAll` return `false`.
 *
 * Items are kept in a growable array; `_readPos` .. `_writePos` is the live
 * range. All fields except the `_closed` fast-path check are accessed only
 * while holding `_mutex`.
 */
class BlockingQueue(T) {

	private Mutex _mutex;
	private Condition _condition;
	private T[] _buffer;
	private int _readPos;
	private int _writePos;
	private shared bool _closed;

	/// Creates an empty, open queue.
	this() {
		_mutex = new Mutex();
		_condition = new Condition(_mutex);
		_readPos = 0;
		_writePos = 0;
	}

	/**
	 * Marks the queue as closed and wakes every blocked consumer.
	 *
	 * Safe to call more than once. The mutex and condition are intentionally
	 * NOT destroyed here: other threads may still be blocked on them or about
	 * to lock them, and tearing them down would turn a racing put()/get()
	 * into a null dereference. They are released together with the queue.
	 */
	void close() {
		if (_closed)
			return;
		if (_mutex is null) {
			// Object was already torn down (e.g. via destroy()); just record the state.
			_closed = true;
			return;
		}
		synchronized(_mutex) {
			_closed = true;
			if (_condition !is null)
				_condition.notifyAll();
		}
	}

	~this() {
		// Only flip the flag here. When the destructor is run by the GC,
		// _mutex and _condition may already have been finalized, so locking
		// or notifying them is not safe. Call close() explicitly before
		// dropping the queue if consumers may still be waiting.
		_closed = true;
	}

	/// Compacts the live range to the start of the buffer once the consumed
	/// prefix is large (more than 1024 slots and more than 3/4 of the buffer).
	private void move() {
		if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) {
			// move buffer data
			for (int i = 0; _readPos + i < _writePos; i++)
				_buffer[i] = _buffer[_readPos + i];
			_writePos -= _readPos;
			_readPos = 0;
		}
	}

	/// Stores one item at the write position, compacting or growing the
	/// buffer first when it is full. Caller must hold `_mutex`.
	private void append(ref T item) {
		if (_writePos >= _buffer.length) {
			move();
			// Grow only if compaction did not free any room.
			if (_writePos >= _buffer.length)
				_buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2;
		}
		_buffer[_writePos++] = item;
	}

	/**
	 * Blocks until at least one item is available, the queue is closed, or
	 * the timeout expires. Caller must hold `_mutex`.
	 *
	 * The wait is a loop so that spurious wakeups and items taken by another
	 * consumer do not end the wait early.
	 *
	 * Params:
	 *   timeoutMillis = maximum wait in milliseconds; `<= 0` waits indefinitely
	 * Returns: `true` if the queue is open and has data to read.
	 */
	private bool waitForData(int timeoutMillis) {
		import core.time : Duration, MonoTime;

		if (timeoutMillis <= 0) {
			while (!_closed && _readPos >= _writePos)
				_condition.wait(); // no timeout
		} else {
			immutable deadline = MonoTime.currTime + dur!"msecs"(timeoutMillis);
			while (!_closed && _readPos >= _writePos) {
				immutable remaining = deadline - MonoTime.currTime;
				if (remaining <= Duration.zero || !_condition.wait(remaining))
					break; // timeout
			}
		}
		return !_closed && _readPos < _writePos;
	}

	/// Appends a single item and wakes waiting consumers. Ignored after close().
	void put(T item) {
		if (_closed)
			return;
		synchronized(_mutex) {
			if (_closed)
				return;
			append(item);
			_condition.notifyAll();
		}
	}

	/// Appends all items in order and wakes waiting consumers. Ignored after close().
	void put(T[] items) {
		if (_closed)
			return;
		synchronized(_mutex) {
			if (_closed)
				return;
			foreach(ref item; items) {
				append(item);
			}
			_condition.notifyAll();
		}
	}

	/**
	 * Removes the oldest item from the queue.
	 *
	 * Params:
	 *   value = receives the item on success; left untouched otherwise
	 *   timeoutMillis = maximum wait in milliseconds; `<= 0` waits indefinitely
	 * Returns: `true` if an item was read, `false` on timeout or if the queue
	 *          is (or becomes) closed.
	 */
	bool get(ref T value, int timeoutMillis) {
		if (_closed)
			return false;
		bool received = false;
		synchronized(_mutex) {
			if (waitForData(timeoutMillis)) {
				value = _buffer[_readPos++];
				if (_readPos == _writePos) {
					// Queue drained: rewind so the buffer is reused from the start.
					_readPos = 0;
					_writePos = 0;
				}
				received = true;
			}
		}
		return received;
	}

	/**
	 * Removes every queued item at once.
	 *
	 * Params:
	 *   values = reset to empty and then filled with the items in FIFO order;
	 *            left untouched if the queue is already closed on entry
	 *   timeoutMillis = maximum wait in milliseconds; `<= 0` waits indefinitely
	 * Returns: `true` if at least one item was read, `false` on timeout or if
	 *          the queue is (or becomes) closed.
	 */
	bool getAll(ref T[] values, int timeoutMillis) {
		if (_closed)
			return false;
		bool received = false;
		synchronized(_mutex) {
			if (!_closed) {
				values.length = 0;
				if (waitForData(timeoutMillis)) {
					values ~= _buffer[_readPos .. _writePos];
					_readPos = 0;
					_writePos = 0;
					received = true;
				}
			}
		}
		return received;
	}
}