/**
Copyright: Copyright (c) 2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/

// Based on Martin Nowak's lock-free package.
module voxelman.utils.sharedqueue;

import core.atomic;
import core.thread : Thread;
import std.experimental.allocator.mallocator;
import std.experimental.logger;

//version = DBG_QUEUE;
private enum PAGE_SIZE = 4096;
/// Single-producer single-consumer fixed-size circular buffer queue.
shared struct SharedQueue {
	size_t capacity;

	void alloc(string debugName = null, size_t _capacity = roundPow2(PAGE_SIZE)) shared {
		_debugName = debugName;
		capacity = _capacity;
		assert(capacity > 0, "Cannot have a capacity of 0.");
		assert(roundPow2(capacity) == capacity, "The capacity must be a power of 2.");
		_data = cast(shared ubyte[])Mallocator.instance.allocate(capacity);
		assert(_data, "Cannot allocate memory for queue.");
	}

	void free() shared {
		Mallocator.instance.deallocate(cast(ubyte[])_data);
	}

	@property bool empty() shared const {
		return !length;
	}

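	// _writePos and _readPos are monotonically increasing byte counters;
	// they are reduced modulo capacity only when indexing _data, so their
	// difference is always the number of unread bytes in the queue.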
	@property size_t length() shared const {
		return atomicLoad!(MemoryOrder.acq)(_writePos) - atomicLoad!(MemoryOrder.acq)(_readPos);
	}

	@property size_t space() shared const {
		return capacity - length;
	}

	@property bool full() shared const {
		return length == capacity;
	}

	void pushItem(I)(I item) shared {
		immutable writePosition = atomicLoad!(MemoryOrder.acq)(_writePos);
		// wait while free space < I.sizeof
		while (capacity - writePosition + atomicLoad!(MemoryOrder.acq)(_readPos) < I.sizeof) {
			yield();
		}
		setItem(item, writePosition);
		atomicStore!(MemoryOrder.rel)(_writePos, writePosition + I.sizeof);
		version(DBG_QUEUE) printTrace!"pushItem"(item);
	}

	I popItem(I)() shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");

		immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
		I res;
		getItem(res, pos);
		atomicStore!(MemoryOrder.rel)(_readPos, pos + I.sizeof);
		version(DBG_QUEUE) printTrace!"popItem"(res);
		return res;
	}

	void popItem(I)(out I item) shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");

		immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
		getItem(item, pos);
		atomicStore!(MemoryOrder.rel)(_readPos, pos + I.sizeof);
		version(DBG_QUEUE) printTrace!"popItem"(item);
	}

	I peekItem(I)() shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");

		immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
		I res;
		getItem(res, pos);
		version(DBG_QUEUE) printTrace!"peekItem"(res);
		return res;
	}

	void peekItem(I)(out I item) shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");

		immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
		getItem(item, pos);
		version(DBG_QUEUE) printTrace!"peekItem"(item);
	}

	void dropItem(I)() shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");

		immutable pos = atomicLoad!(MemoryOrder.acq)(_readPos);
		atomicStore!(MemoryOrder.rel)(_readPos, pos + I.sizeof);
		version(DBG_QUEUE) printTrace!"dropItem"();
	}

	private void getItem(I)(out I item, const size_t at) shared const {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");
		ubyte[] itemData = (*cast(ubyte[I.sizeof]*)&item);

		size_t start = at & (capacity - 1);
		size_t end = (at + I.sizeof) & (capacity - 1);
		if (end > start)
		{
			//             item[0] v          v item[$]
			//         ...........|...item...|..........
			// data[0] ^     start ^          ^ end     ^ data[$]
			itemData[0..$] = _data[start..end];
		}
		else
		{
			//                 item[$] v       item[0] v
			//          |...itemEnd...|...............|...itemStart...|
			//  _data[0] ^         end ^         start ^               ^ _data[$]
			size_t firstPart = I.sizeof - end;
			itemData[0..firstPart] = _data[start..$];
			itemData[firstPart..$] = _data[0..end];
		}
	}

	void setItem(I)(auto const ref I item, const size_t at) shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");
		ubyte[] itemData = (*cast(ubyte[I.sizeof]*)&item);

		size_t start = at & (capacity - 1);
		size_t end = (at + I.sizeof) & (capacity - 1);
		if (end > start)
		{
			//             item[0] v          v item[$]
			//         ...........|...item...|..........
			// data[0] ^     start ^          ^ end     ^ data[$]
			_data[start..end] = itemData[0..$];
		}
		else
		{
			//               item[$] v       item[0] v
			//        |...itemEnd...|...............|...itemStart...|
			// data[0] ^         end ^         start ^       data[$] ^
			size_t firstPart = I.sizeof - end;
			_data[start..$] = itemData[0..firstPart];
			_data[0..end] = itemData[firstPart..$];
		}
		atomicFence();
	}

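	// Multipart message mode lets the producer assemble a message from several
	// parts and publish it to the consumer all at once via endMessage.
	// A sketch of producer-side usage (names below are illustrative):
	//
	//   queue.startMessage();
	//   size_t lenPos = queue.skipMessageItem!size_t(); // reserve a length slot
	//   queue.pushMessagePart(partA);
	//   queue.pushMessagePart(partB);
	//   queue.setItem(messageLength, lenPos);           // fill the reserved slot
	//   queue.endMessage();                             // make message visible
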
	// enter multipart message mode
	void startMessage() shared {
		_msgWritePos = atomicLoad!(MemoryOrder.acq)(_writePos);
		version(DBG_QUEUE) printTrace!"startMessage"();
	}

	// exit multipart message mode, publishing all parts to the consumer
	void endMessage() shared {
		atomicStore!(MemoryOrder.rel)(_writePos, _msgWritePos);
		version(DBG_QUEUE) printTrace!"endMessage"();
	}

	// reserve space for an item to fill in later with setItem,
	// while in multipart message mode; returns the reserved position
	size_t skipMessageItem(I)() shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");
		size_t skippedItemPos = cast(size_t)_msgWritePos;
		// wait while free space < I.sizeof
		while (capacity - _msgWritePos + atomicLoad!(MemoryOrder.acq)(_readPos) < I.sizeof) {
			yield();
		}
		cast(size_t)_msgWritePos += I.sizeof;
		version(DBG_QUEUE) printTrace!"skipMessageItem"();
		return skippedItemPos;
	}

	// push in multipart message mode.
	// Can deadlock if the consumer is waiting for the producer:
	// make sure there is enough space, or keep consuming on the other side.
	void pushMessagePart(I)(auto const ref I item) shared {
		//static assert(I.sizeof <= capacity, "Item size is greater than capacity");
		// wait while free space < I.sizeof
		while (capacity - _msgWritePos + atomicLoad!(MemoryOrder.acq)(_readPos) < I.sizeof) {
			yield();
		}
		setItem(item, _msgWritePos);
		cast(size_t)_msgWritePos += I.sizeof;
		version(DBG_QUEUE) printTrace!"pushMessagePart"(item);
	}

	static void yield() {
		Thread.yield();
		//infof("yield");
	}

private:
	size_t _msgWritePos;
	size_t _writePos;
	size_t _readPos;
	ubyte[] _data;
	string _debugName;

	import std.concurrency : thisTid;
	void printTrace(string funname, D)(D data) {
		version(DBG_QUEUE) tracef("%s.%s."~funname~"(%s)\n\tmwp %s, wp %s, rp %s\n",
			thisTid, _debugName, data, _msgWritePos, cast(size_t)_writePos, cast(size_t)_readPos);
	}
	void printTrace(string funname)() {
		version(DBG_QUEUE) tracef("%s.%s."~funname~"()\n\tmwp %s, wp %s, rp %s\n",
			thisTid, _debugName, _msgWritePos, cast(size_t)_writePos, cast(size_t)_readPos);
	}
}
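
// A minimal single-threaded sketch of basic usage. The queue targets one
// producer thread and one consumer thread, but its byte-level push/pop logic
// can be exercised from a single thread; the capacity of 64 is arbitrary.
unittest {
	shared SharedQueue q;
	q.alloc("test", 64);
	assert(q.empty && q.space == 64);

	q.pushItem!int(42);
	assert(q.length == int.sizeof);
	assert(q.peekItem!int() == 42);
	assert(q.popItem!int() == 42);
	assert(q.empty);

	// Force an item to straddle the end of the buffer: advance both
	// positions to 62, then push a 4-byte int spanning bytes 62..66.
	foreach (i; 0..62) q.pushItem!ubyte(cast(ubyte)i);
	foreach (i; 0..62) q.popItem!ubyte();
	q.pushItem!int(0xBEEF);
	assert(q.popItem!int() == 0xBEEF);

	q.free();
}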

/// Rounds v down to the nearest power of 2; 0 maps to 0.
size_t roundPow2(size_t v) {
	import core.bitop : bsr;
	return v ? cast(size_t)1 << bsr(v) : 0;
}
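
// Sanity check: roundPow2 rounds down (bsr finds the highest set bit).
unittest {
	assert(roundPow2(0) == 0);
	assert(roundPow2(1) == 1);
	assert(roundPow2(5) == 4);
	assert(roundPow2(4096) == 4096);
	assert(roundPow2(5000) == 4096);
}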