/**
Copyright: Copyright (c) 2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/

// Based on Martin Nowak's lock-free package.
module voxelman.utils.sharedqueue;

import core.atomic;
import core.thread : Thread;
import std.experimental.allocator.mallocator;
import std.experimental.logger;

//version = DBG_QUEUE;
private enum PAGE_SIZE = 4096;

/// Single-producer single-consumer fixed size circular buffer queue.
///
/// Items are stored as raw bytes. `_writePos` and `_readPos` are byte counters
/// that only ever grow; they are reduced to a buffer index with
/// `pos & (capacity - 1)`, which is why `capacity` must be a power of two.
///
/// None of the pop/peek/drop functions check that the queue actually holds an
/// item; the caller has to consult `empty` / `length` first.
shared struct SharedQueue {
    /// Size of the buffer in bytes. Set by `alloc`.
    size_t capacity;

    /// Allocates the buffer with Mallocator.
    /// Params:
    ///   debugName = name printed in DBG_QUEUE traces.
    ///   _capacity = buffer size in bytes; must be a non-zero power of two.
    void alloc(string debugName = null, size_t _capacity = roundPow2(PAGE_SIZE)) shared {
        _debugName = debugName;
        capacity = _capacity;
        assert(capacity > 0, "Cannot have a capacity of 0.");
        assert(roundPow2(capacity) == capacity, "The capacity must be a power of 2");
        _data = cast(shared ubyte[])Mallocator.instance.allocate(capacity);
        assert(_data, "Cannot allocate memory for queue");
    }

    /// Releases the buffer obtained in `alloc`.
    /// The slice is cleared afterwards, so calling `free` again is harmless
    /// instead of being a double free.
    void free() shared {
        Mallocator.instance.deallocate(cast(ubyte[])_data);
        _data = null;
    }

    /// True when there are no unread bytes.
    @property bool empty() shared const {
        return !length;
    }

    /// Number of published bytes that were not consumed yet.
    @property size_t length() shared const {
        return atomicLoad!(MemoryOrder.acq)(_writePos) - atomicLoad!(MemoryOrder.acq)(_readPos);
    }

    /// Number of bytes that can be written before the buffer is full.
    @property size_t space() shared const {
        return capacity - length;
    }

    /// True when every byte of the buffer holds unread data.
    @property bool full() shared const {
        return length == capacity;
    }

    /// Copies `item` into the queue and publishes it.
    /// Yields the thread in a loop until there is room for `I.sizeof` bytes.
    void pushItem(I)(I item) shared {
        immutable writePosition = atomicLoad!(MemoryOrder.acq)(_writePos);
        waitForSpace(writePosition, I.sizeof);
        setItem(item, writePosition);
        atomicStore!(MemoryOrder.rel)(_writePos, writePosition + I.sizeof);
        version(DBG_QUEUE) printTrace!"pushItem"(item);
    }

    /// Reads an item of type `I` at the read position, advances the read
    /// position past it and returns the item.
    I popItem(I)() shared {
        immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
        I res;
        getItem(res, pos);
        atomicStore!(MemoryOrder.rel)(_readPos, pos + I.sizeof);
        version(DBG_QUEUE) printTrace!"popItem"(res);
        return res;
    }

    /// Same as the returning overload, but stores the result in `item`.
    void popItem(I)(out I item) shared {
        immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
        getItem(item, pos);
        atomicStore!(MemoryOrder.rel)(_readPos, pos + I.sizeof);
        version(DBG_QUEUE) printTrace!"popItem"(item);
    }

    /// Reads an item of type `I` at the read position without consuming it.
    I peekItem(I)() shared {
        immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
        I res;
        getItem(res, pos);
        version(DBG_QUEUE) printTrace!"peekItem"(res);
        return res;
    }

    /// Same as the returning overload, but stores the result in `item`.
    void peekItem(I)(out I item) shared {
        immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
        getItem(item, pos);
        version(DBG_QUEUE) printTrace!"peekItem"(item);
    }

    /// Advances the read position by `I.sizeof` bytes without reading them.
    void dropItem(I)() shared {
        immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
        atomicStore!(MemoryOrder.rel)(_readPos, pos + I.sizeof);
        version(DBG_QUEUE) printTrace!"dropItem"();
    }

    /// Copies `I.sizeof` bytes starting at byte counter `at` into `item`,
    /// joining the two pieces when the item wraps around the end of the buffer.
    private void getItem(I)(out I item, const size_t at) shared const {
        // capacity is a runtime value, so this cannot be a static assert.
        assert(I.sizeof <= capacity, "Item size is greater than capacity");
        ubyte[] itemData = (*cast(ubyte[I.sizeof]*)&item);

        size_t start = at & (capacity - 1);
        size_t end = (at + I.sizeof) & (capacity - 1);
        if (end > start)
        {
            //         item[0] v          v item[$]
            //  ...............|...item...|...............
            //  _data[0] ^     ^ start    ^ end          ^ _data[$]
            itemData[0..$] = _data[start..end];
        }
        else
        {
            //        item[$] v                 item[0] v
            //  |...itemEnd...|.................|...itemStart...|
            //  ^ _data[0]    ^ end             ^ start         ^ _data[$]
            // `end` bytes live at the front of the buffer, the rest at the back.
            size_t firstPart = I.sizeof - end;
            itemData[0..firstPart] = _data[start..$];
            itemData[firstPart..$] = _data[0..end];
        }
    }

    /// Copies the bytes of `item` into the buffer at byte counter `at`,
    /// splitting the item when it wraps around the end of the buffer.
    /// Does not move the write position. Public so that a slot reserved with
    /// `skipMessageItem` can be filled in later.
    void setItem(I)(auto const ref I item, const size_t at) shared {
        // capacity is a runtime value, so this cannot be a static assert.
        assert(I.sizeof <= capacity, "Item size is greater than capacity");
        ubyte[] itemData = (*cast(ubyte[I.sizeof]*)&item);

        size_t start = at & (capacity - 1);
        size_t end = (at + I.sizeof) & (capacity - 1);
        if (end > start)
        {
            //         item[0] v          v item[$]
            //  ...............|...item...|...............
            //  _data[0] ^     ^ start    ^ end          ^ _data[$]
            _data[start..end] = itemData[0..$];
        }
        else
        {
            //        item[$] v                 item[0] v
            //  |...itemEnd...|.................|...itemStart...|
            //  ^ _data[0]    ^ end             ^ start         ^ _data[$]
            size_t firstPart = I.sizeof - end;
            _data[start..$] = itemData[0..firstPart];
            _data[0..end] = itemData[firstPart..$];
        }
        atomicFence();
    }

    /// Enters multipart message mode: remembers the current write position in
    /// `_msgWritePos`. Parts written afterwards are published by `endMessage`.
    void startMessage() shared {
        _msgWritePos = atomicLoad!(MemoryOrder.acq)(_writePos);
        version(DBG_QUEUE) printTrace!"startMessage"();
    }

    /// Exits multipart message mode: publishes everything written since
    /// `startMessage` by storing `_msgWritePos` into the write position.
    void endMessage() shared {
        atomicStore!(MemoryOrder.rel)(_writePos, _msgWritePos);
        version(DBG_QUEUE) printTrace!"endMessage"();
    }

    /// Reserves room for an `I` in the current multipart message without
    /// writing it; fill it in later with `setItem`.
    /// Returns: the byte counter of the reserved slot, to be passed to `setItem`.
    size_t skipMessageItem(I)() shared {
        size_t skippedItemPos = cast(size_t)_msgWritePos;
        waitForSpace(skippedItemPos, I.sizeof);
        cast(size_t)_msgWritePos += I.sizeof;
        version(DBG_QUEUE) printTrace!"skipMessageItem"();
        return skippedItemPos;
    }

    /// Appends `item` to the current multipart message.
    ///
    /// Can cause dead-lock if consumer is waiting for producer.
    /// Make sure that there is enough space. Or else keep consuming.
    void pushMessagePart(I)(auto const ref I item) shared {
        waitForSpace(_msgWritePos, I.sizeof);
        setItem(item, _msgWritePos);
        cast(size_t)_msgWritePos += I.sizeof;
        version(DBG_QUEUE) printTrace!"pushMessagePart"(item);
    }

    /// Gives up the rest of the current thread's time slice.
    static void yield() {
        Thread.yield();
        //infof("yield");
    }

private:
    /// Spins (yielding) until `size` bytes fit between `writePosition` and the
    /// current read position. An item bigger than the whole buffer could never
    /// fit and would spin forever, so that is rejected up front.
    void waitForSpace(const size_t writePosition, const size_t size) shared const {
        assert(size <= capacity, "Item size is greater than capacity");
        // space < size
        while (capacity - writePosition + atomicLoad!(MemoryOrder.acq)(_readPos) < size) {
            yield();
        }
    }

    size_t _msgWritePos; // write position of the multipart message in progress
    size_t _writePos;    // published write position (byte counter)
    size_t _readPos;     // read position (byte counter)
    ubyte[] _data;       // buffer of `capacity` bytes, allocated in alloc
    string _debugName;   // label used by printTrace

    import std.concurrency : thisTid;

    /// Logs the queue positions together with `data`; only active with DBG_QUEUE.
    void printTrace(string funname, D)(D data) {
        version(DBG_QUEUE) tracef("%s.%s."~funname~"(%s)\n\tmwp %s, wp %s, rp %s\n",
            thisTid, _debugName, data, _msgWritePos, cast(size_t)_writePos, cast(size_t)_readPos);
    }

    /// Logs the queue positions; only active with DBG_QUEUE.
    void printTrace(string funname)() {
        version(DBG_QUEUE) tracef("%s.%s."~funname~"()\n\tmwp %s, wp %s, rp %s\n",
            thisTid, _debugName, _msgWritePos, cast(size_t)_writePos, cast(size_t)_readPos);
    }
}

/// Rounds `v` down to a power of two: returns the value of the highest set
/// bit of `v`, or 0 when `v` is 0.
size_t roundPow2(size_t v) {
    import core.bitop : bsr;
    return v ? cast(size_t)1 << bsr(v) : 0;
}