1 // Taken from https://github.com/MartinNowak/lock-free/blob/master/src/lock_free/rwqueue.d
2 module voxelman.utils.lockfreequeue;
3 
4 import core.atomic;
5 
6 /**
7  * A lock-free single-reader, single-writer FIFO queue.
8  */
9 shared struct ProducerConsumerQueue(T, size_t capacity = roundPow2!(PAGE_SIZE / T.sizeof))
10 {
11 	static assert(capacity > 0, "Cannot have a capacity of 0.");
12 	static assert(roundPow2!capacity == capacity, "The capacity must be a power of 2");
13 
14 	@property size_t length() shared const
15 	{
16 		return atomicLoad!(MemoryOrder.acq)(_wpos) - atomicLoad!(MemoryOrder.acq)(_rpos);
17 	}
18 
19 	@property bool empty() shared const
20 	{
21 		return !length;
22 	}
23 
24 	@property bool full() const
25 	{
26 		return length == capacity;
27 	}
28 
29 	void push(shared(T) t)
30 	in { assert(!full); }
31 	body
32 	{
33 		immutable pos = atomicLoad!(MemoryOrder.acq)(_wpos);
34 		_data[pos & mask] = t;
35 		atomicStore!(MemoryOrder.rel)(_wpos, pos + 1);
36 	}
37 
38 	shared(T) pop()
39 	in { assert(!empty); }
40 	body
41 	{
42 		immutable pos = atomicLoad!(MemoryOrder.acq)(_rpos);
43 		auto res = _data[pos & mask];
44 		atomicStore!(MemoryOrder.rel)(_rpos, pos + 1);
45 		return res;
46 	}
47 
48 private:
49 	//    import std.algorithm; // move
50 
51 	enum mask = capacity - 1;
52 
53 	size_t _wpos;
54 	size_t _rpos;
55 	T[capacity] _data;
56 }
57 
58 private:
59 
60 enum PAGE_SIZE = 4096;
61 
62 template roundPow2(size_t v)
63 {
64 	import core.bitop : bsr;
65 	enum roundPow2 = v ? cast(size_t)1 << bsr(v) : 0;
66 }
67 
68 static assert(roundPow2!0 == 0);
69 static assert(roundPow2!3 == 2);
70 static assert(roundPow2!4 == 4);
71 
72 version (unittest)
73 {
74 	import core.thread, std.concurrency;
75 	enum amount = 10_000;
76 
77 	void push(T)(ref shared(ProducerConsumerQueue!T) queue)
78 	{
79 		foreach (i; 0 .. amount)
80 		{
81 			while (queue.full)
82 				Thread.yield();
83 			queue.push(cast(shared T)i);
84 		}
85 	}
86 
87 	void pop(T)(ref shared(ProducerConsumerQueue!T) queue)
88 	{
89 		foreach (i; 0 .. amount)
90 		{
91 			while (queue.empty)
92 				Thread.yield();
93 			assert(queue.pop() == cast(shared T)i);
94 		}
95 	}
96 }
97 
98 unittest
99 {
100 	shared(ProducerConsumerQueue!size_t) queue;
101 	auto t0 = new Thread({push(queue);}),
102 		t1 = new Thread({pop(queue);});
103 	t0.start(); t1.start();
104 	t0.join(); t1.join();
105 }
106 
107 unittest
108 {
109 	static struct Data { size_t i; }
110 	shared(ProducerConsumerQueue!Data) queue;
111 	auto t0 = new Thread({push(queue);}),
112 		t1 = new Thread({pop(queue);});
113 	t0.start(); t1.start();
114 	t0.join(); t1.join();
115 }