1 // Taken from https://github.com/MartinNowak/lock-free/blob/master/src/lock_free/rwqueue.d 2 module voxelman.utils.lockfreequeue; 3 4 import core.atomic; 5 6 /** 7 * A lock-free single-reader, single-writer FIFO queue. 8 */ 9 shared struct ProducerConsumerQueue(T, size_t capacity = roundPow2!(PAGE_SIZE / T.sizeof)) 10 { 11 static assert(capacity > 0, "Cannot have a capacity of 0."); 12 static assert(roundPow2!capacity == capacity, "The capacity must be a power of 2"); 13 14 @property size_t length() shared const 15 { 16 return atomicLoad!(MemoryOrder.acq)(_wpos) - atomicLoad!(MemoryOrder.acq)(_rpos); 17 } 18 19 @property bool empty() shared const 20 { 21 return !length; 22 } 23 24 @property bool full() const 25 { 26 return length == capacity; 27 } 28 29 void push(shared(T) t) 30 in { assert(!full); } 31 body 32 { 33 immutable pos = atomicLoad!(MemoryOrder.acq)(_wpos); 34 _data[pos & mask] = t; 35 atomicStore!(MemoryOrder.rel)(_wpos, pos + 1); 36 } 37 38 shared(T) pop() 39 in { assert(!empty); } 40 body 41 { 42 immutable pos = atomicLoad!(MemoryOrder.acq)(_rpos); 43 auto res = _data[pos & mask]; 44 atomicStore!(MemoryOrder.rel)(_rpos, pos + 1); 45 return res; 46 } 47 48 private: 49 // import std.algorithm; // move 50 51 enum mask = capacity - 1; 52 53 size_t _wpos; 54 size_t _rpos; 55 T[capacity] _data; 56 } 57 58 private: 59 60 enum PAGE_SIZE = 4096; 61 62 template roundPow2(size_t v) 63 { 64 import core.bitop : bsr; 65 enum roundPow2 = v ? cast(size_t)1 << bsr(v) : 0; 66 } 67 68 static assert(roundPow2!0 == 0); 69 static assert(roundPow2!3 == 2); 70 static assert(roundPow2!4 == 4); 71 72 version (unittest) 73 { 74 import core.thread, std.concurrency; 75 enum amount = 10_000; 76 77 void push(T)(ref shared(ProducerConsumerQueue!T) queue) 78 { 79 foreach (i; 0 .. amount) 80 { 81 while (queue.full) 82 Thread.yield(); 83 queue.push(cast(shared T)i); 84 } 85 } 86 87 void pop(T)(ref shared(ProducerConsumerQueue!T) queue) 88 { 89 foreach (i; 0 .. amount) 90 { 91 while (queue.empty) 92 Thread.yield(); 93 assert(queue.pop() == cast(shared T)i); 94 } 95 } 96 } 97 98 unittest 99 { 100 shared(ProducerConsumerQueue!size_t) queue; 101 auto t0 = new Thread({push(queue);}), 102 t1 = new Thread({pop(queue);}); 103 t0.start(); t1.start(); 104 t0.join(); t1.join(); 105 } 106 107 unittest 108 { 109 static struct Data { size_t i; } 110 shared(ProducerConsumerQueue!Data) queue; 111 auto t0 = new Thread({push(queue);}), 112 t1 = new Thread({pop(queue);}); 113 t0.start(); t1.start(); 114 t0.join(); t1.join(); 115 }