1 module ddebug.common.queue; 2 3 import core.sync.condition; 4 import core.sync.mutex; 5 6 class BlockingQueue(T) { 7 8 private Mutex _mutex; 9 private Condition _condition; 10 private T[] _buffer; 11 private int _readPos; 12 private int _writePos; 13 private shared bool _closed; 14 15 this() { 16 _mutex = new Mutex(); 17 _condition = new Condition(_mutex); 18 _readPos = 0; 19 _writePos = 0; 20 } 21 22 void close() { 23 if (_mutex && !_closed) { 24 synchronized(_mutex) { 25 _closed = true; 26 if (_condition !is null) 27 _condition.notifyAll(); 28 } 29 } else { 30 _closed = true; 31 } 32 if (_condition) { 33 destroy(_condition); 34 _condition = null; 35 } 36 if (_mutex) { 37 destroy(_mutex); 38 _mutex = null; 39 } 40 } 41 42 ~this() { 43 // TODO: destroy mutex? 44 close(); 45 } 46 47 private void move() { 48 if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) { 49 // move buffer data 50 for (int i = 0; _readPos + i < _writePos; i++) 51 _buffer[i] = _buffer[_readPos + i]; 52 _writePos -= _readPos; 53 _readPos = 0; 54 } 55 } 56 57 private void append(ref T item) { 58 if (_writePos >= _buffer.length) { 59 move(); 60 _buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2; 61 } 62 _buffer[_writePos++] = item; 63 } 64 65 void put(T item) { 66 if (_closed) 67 return; 68 synchronized(_mutex) { 69 if (_closed) 70 return; 71 append(item); 72 _condition.notifyAll(); 73 } 74 } 75 76 void put(T[] items) { 77 if (_closed) 78 return; 79 synchronized(_mutex) { 80 if (_closed) 81 return; 82 foreach(ref item; items) { 83 append(item); 84 } 85 _condition.notifyAll(); 86 } 87 } 88 89 bool get(ref T value, int timeoutMillis) { 90 if (_closed) 91 return false; 92 synchronized(_mutex) { 93 if (_closed) 94 return false; 95 if (_readPos < _writePos) { 96 value = _buffer[_readPos++]; 97 return true; 98 } 99 if (timeoutMillis <= 0) 100 _condition.wait(); // no timeout 101 else if (!_condition.wait(dur!"msecs"(timeoutMillis))) 102 return false; // timeout 103 if (_readPos < _writePos) { 104 value = _buffer[_readPos++]; 105 return true; 106 } 107 } 108 return false; 109 } 110 111 bool getAll(ref T[] values, int timeoutMillis) { 112 if (_closed) 113 return false; 114 synchronized(_mutex) { 115 if (_closed) 116 return false; 117 values.length = 0; 118 while (_readPos < _writePos) 119 values ~= _buffer[_readPos++]; 120 if (values.length > 0) 121 return true; 122 if (timeoutMillis <= 0) 123 _condition.wait(); // no timeout 124 else if (!_condition.wait(dur!"msecs"(timeoutMillis))) 125 return false; // timeout 126 while (_readPos < _writePos) 127 values ~= _buffer[_readPos++]; 128 if (values.length > 0) 129 return true; 130 } 131 return false; 132 } 133 }