/**
Copyright: Copyright (c) 2014-2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.storageworker;

import std.experimental.logger;
import std.conv : to;
import std.datetime : MonoTime, Duration, usecs, dur, seconds;
import core.atomic;
import core.sync.condition;

import cbor;

import voxelman.block.utils;
import voxelman.world.gen.generators;
import voxelman.world.gen.utils;
import voxelman.core.config;
import voxelman.utils.compression;
import voxelman.utils.worker;
import voxelman.world.storage.chunk;
import voxelman.world.storage.chunkprovider;
import voxelman.world.storage.coordinates;
import voxelman.world.worlddb;

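/// Simple hierarchical timer used to measure how long storage tasks take.
/// Measurers form a tree: `nested` points to the first child measurer and
/// `next` to the next sibling; reset() recurses through the whole tree.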
struct TimeMeasurer
{
	TimeMeasurer* nested;
	TimeMeasurer* next;
	MonoTime startTime;
	Duration takenTime;
	string taskName;
	bool wasRun = false;

	void reset()
	{
		wasRun = false;
		takenTime = Duration.zero;
		if (nested) nested.reset();
		if (next) next.reset();
	}

	void startTaskTiming(string name)
	{
		taskName = name;
		startTime = MonoTime.currTime;
	}

	void endTaskTiming()
	{
		wasRun = true;
		takenTime = MonoTime.currTime - startTime;
	}

	void printTime(bool isNested = false)
	{
		//int seconds; short msecs; short usecs;
		//takenTime.split!("seconds", "msecs", "usecs")(seconds, msecs, usecs);
		//if (msecs > 10 || seconds > 0 || isNested)
		//{
		//	if (wasRun)
		//		tracef("%s%s %s.%s,%ss", isNested?"  ":"", taskName, seconds, msecs, usecs);
		//	if (nested) nested.printTime(true);
		//	if (next) next.printTime(isNested);
		//}
	}
}
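
/// A minimal usage sketch (illustrative only): this mirrors how storageWorker
/// below chains its timers via `nested`/`next` and resets them per task.
unittest
{
	TimeMeasurer taskTime;
	TimeMeasurer readTime;
	TimeMeasurer workTime;
	taskTime.nested = &readTime;
	readTime.next = &workTime;

	taskTime.startTaskTiming("task");
	readTime.startTaskTiming("read");
	readTime.endTaskTiming();
	taskTime.endTaskTiming();
	taskTime.printTime(); // currently a no-op, see the commented-out body above

	taskTime.reset();
	assert(!taskTime.wasRun && !readTime.wasRun && !workTime.wasRun);
}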
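
/// Distributes chunk generation tasks between the gen workers.
/// Normally picks the worker with the shortest task queue, but consecutive
/// tasks from the same chunk column go to the same worker (see sendGenTask).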
struct GenWorkerControl
{
	this(shared Worker[] genWorkers_)
	{
		genWorkers = genWorkers_;
		queueLengths.length = genWorkers.length;
	}

	shared Worker[] genWorkers;

	// Last worker we sent work to.
	size_t lastWorker;
	// Last chunk position we sent.
	ChunkWorldPos lastCwp;
	static struct QLen {size_t i; size_t len;}
	QLen[] queueLengths;

	// Returns the index of the worker with the smallest task queue.
	size_t getWorker()
	{
		import std.algorithm : sort;
		foreach(i; 0..genWorkers.length)
		{
			queueLengths[i].i = i;
			queueLengths[i].len = genWorkers[i].taskQueue.length;
		}
		sort!((a,b) => a.len < b.len)(queueLengths); // balance worker queues
		return queueLengths[0].i;
	}

	// Sends chunks with the same x and z to the same worker,
	// because each worker keeps a thread-local heightmap cache.
	void sendGenTask(ulong cwp)
	{
		auto _cwp = ChunkWorldPos(cwp);
		size_t workerIndex;
		// Send tasks from the same chunk column
		// to the same worker to improve the cache hit rate.
		if (_cwp.x == lastCwp.x && _cwp.z == lastCwp.z)
		{
			workerIndex = lastWorker;
		}
		else
		{
			workerIndex = getWorker();
		}
		shared(Worker)* worker = &genWorkers[workerIndex];
		worker.taskQueue.pushItem!ulong(cwp);
		worker.taskQueue.pushItem(generators[_cwp.w % $]);
		worker.notify();
		lastWorker = workerIndex;
		lastCwp = _cwp;
	}
}

//version = DBG_OUT;
//version = DBG_COMPR;
void storageWorker(
			immutable WorldDb _worldDb,
			shared bool* workerRunning,
			shared Mutex workAvaliableMutex,
			shared Condition workAvaliable,
			shared SharedQueue* loadResQueue,
			shared SharedQueue* saveResQueue,
			shared SharedQueue* loadTaskQueue,
			shared SharedQueue* saveTaskQueue,
			shared Worker[] genWorkers,
			)
{
	version(DBG_OUT)infof("Storage worker started");
	infof("genWorkers.length %s", genWorkers.length);
	// Declared before the try block so it stays in scope for the
	// version(DBG_OUT) log after the catch below.
	uint numReceived;
	try
	{
	ubyte[] compressBuffer = new ubyte[](4096*16);
	ubyte[] buffer = new ubyte[](4096*16);
	WorldDb worldDb = cast(WorldDb)_worldDb;
	scope(exit) worldDb.close();

	TimeMeasurer taskTime;
	TimeMeasurer workTime;
	TimeMeasurer readTime;
	taskTime.nested = &readTime;
	readTime.next = &workTime;

	auto workerControl = GenWorkerControl(genWorkers);

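	// Serializes one chunk into `buffer` as CBOR, stores it in worldDb and
	// reports the saved layer timestamps back through saveResQueue.
	// Record layout, as produced below: numLayers, then for each layer its
	// timestamp, layerId, metadata and StorageType, followed by either
	// (uniformData, dataLength) for uniform layers or a compressed byte array
	// for array layers (fullArray data is compressed before being written).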
	void writeChunk()
	{
		taskTime.reset();
		taskTime.startTaskTiming("WR");

		ChunkHeaderItem header = saveTaskQueue.popItem!ChunkHeaderItem();

		saveResQueue.startMessage();
		saveResQueue.pushMessagePart(header);
		try
		{
			size_t encodedSize = encodeCbor(buffer[], header.numLayers);

			foreach(_; 0..header.numLayers)
			{
				ChunkLayerItem layer = saveTaskQueue.popItem!ChunkLayerItem();

				encodedSize += encodeCbor(buffer[encodedSize..$], layer.timestamp);
				encodedSize += encodeCbor(buffer[encodedSize..$], layer.layerId);
				encodedSize += encodeCbor(buffer[encodedSize..$], layer.metadata);
				if (layer.type == StorageType.uniform)
				{
					encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.uniform);
					encodedSize += encodeCbor(buffer[encodedSize..$], layer.uniformData);
					encodedSize += encodeCbor(buffer[encodedSize..$], layer.dataLength);
				}
				else if (layer.type == StorageType.fullArray)
				{
					encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray);
					ubyte[] compactBlocks = compressLayerData(layer.getArray!ubyte, compressBuffer);
					encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks);
					version(DBG_COMPR)infof("Store1 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks);
				}
				else if (layer.type == StorageType.compressedArray)
				{
					encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray);
					ubyte[] compactBlocks = layer.getArray!ubyte;
					encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks);
					version(DBG_COMPR)infof("Store2 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks);
				}

				saveResQueue.pushMessagePart(ChunkLayerTimestampItem(layer.timestamp, layer.layerId));
			}

			worldDb.putPerChunkValue(header.cwp.asUlong, buffer[0..encodedSize]);
		}
		catch(Exception e) errorf("storage exception %s", e.to!string);
		saveResQueue.endMessage();
		taskTime.endTaskTiming();
		taskTime.printTime();
		version(DBG_OUT)infof("task save %s", header.cwp);
	}

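	// Loads one chunk from worldDb and pushes its layers to loadResQueue,
	// decoding the same layout writeChunk produces. If the chunk is not in
	// the database, or decoding fails, the position is forwarded to a gen
	// worker instead.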
	void readChunk()
	{
		taskTime.reset();
		taskTime.startTaskTiming("RD");
		bool doGen;

		ulong cwp = loadTaskQueue.popItem!ulong();

		try
		{
			readTime.startTaskTiming("getPerChunkValue");
			ubyte[] cborData = worldDb.getPerChunkValue(cwp);
			readTime.endTaskTiming();
			//scope(exit) worldDb.perChunkSelectStmt.reset();

			if (cborData !is null)
			{
				workTime.startTaskTiming("decode");
				ubyte numLayers = decodeCborSingle!ubyte(cborData);
				// TODO check numLayers <= ubyte.max
				loadResQueue.startMessage();
				loadResQueue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), cast(ubyte)numLayers, 0));
				foreach(_; 0..numLayers)
				{
					auto timestamp = decodeCborSingle!TimestampType(cborData);
					auto layerId = decodeCborSingle!ubyte(cborData);
					auto metadata = decodeCborSingle!ushort(cborData);
					auto type = decodeCborSingle!StorageType(cborData);

					if (type == StorageType.uniform)
					{
						ulong uniformData = decodeCborSingle!ulong(cborData);
						auto dataLength = decodeCborSingle!LayerDataLenType(cborData);
						loadResQueue.pushMessagePart(ChunkLayerItem(StorageType.uniform, layerId, dataLength, timestamp, uniformData, metadata));
					}
					else
					{
						import core.memory : GC;
						assert(type == StorageType.compressedArray);
						ubyte[] compactBlocks = decodeCborSingle!(ubyte[])(cborData);
						compactBlocks = compactBlocks.dup;
						LayerDataLenType dataLength = cast(LayerDataLenType)compactBlocks.length;
						ubyte* data = cast(ubyte*)compactBlocks.ptr;

						// Add a GC root to the data. The array is passed through
						// the shared queue as a raw pointer, so without a root the
						// GC could collect it while nothing else references it.
						GC.addRoot(data); // TODO remove when moved to non-GC allocator
						version(DBG_COMPR)infof("Load %s L %s C (%(%02x%))", ChunkWorldPos(cwp), compactBlocks.length, cast(ubyte[])compactBlocks);
						loadResQueue.pushMessagePart(ChunkLayerItem(StorageType.compressedArray, layerId, dataLength, timestamp, data, metadata));
					}
				}
				loadResQueue.endMessage();
				// if (cborData.length > 0) error; TODO
				workTime.endTaskTiming();
			}
			else doGen = true;
		}
		catch(Exception e) {
			infof("storage exception %s regenerating %s", e.to!string, ChunkWorldPos(cwp));
			doGen = true;
		}
		if (doGen) {
			workerControl.sendGenTask(cwp);
		}
		taskTime.endTaskTiming();
		taskTime.printTime();
		version(DBG_OUT)infof("task load %s", ChunkWorldPos(cwp));
	}

	MonoTime frameStart = MonoTime.currTime;
	size_t prevReceived = size_t.max;
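	// Main loop: sleep on the condition until another thread signals that work
	// is available, then drain the load queue inside a transaction that is only
	// read from (and therefore aborted), and the save queue inside a write
	// transaction that is committed.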
	while (*atomicLoad!(MemoryOrder.acq)(workerRunning))
	{
		synchronized (workAvaliableMutex)
		{
			(cast(Condition)workAvaliable).wait();
		}

		worldDb.beginTxn();
		while (!loadTaskQueue.empty)
		{
			readChunk();
			++numReceived;
		}
		worldDb.abortTxn();

		worldDb.beginTxn();
		while (!saveTaskQueue.empty)
		{
			auto type = saveTaskQueue.popItem!SaveItemType();
			final switch(type) {
				case SaveItemType.chunk:
					writeChunk();
					++numReceived;
					break;
				case SaveItemType.saveHandler:
					IoHandler ioHandler = saveTaskQueue.popItem!IoHandler();
					ioHandler(worldDb);
					break;
			}
		}
		worldDb.commitTxn();

		if (prevReceived != numReceived)
			version(DBG_OUT)infof("Storage worker running %s %s", numReceived, *atomicLoad(workerRunning));
		prevReceived = numReceived;

		auto now = MonoTime.currTime;
		auto dur = now - frameStart;
		if (dur > 3.seconds) {
			//infof("Storage update");
			frameStart = now;
		}
	}
	}
	catch(Throwable t)
	{
		infof("%s from storage worker", t.to!string);
		throw t;
	}
	version(DBG_OUT)infof("Storage worker stopped (%s, %s)", numReceived, *atomicLoad(workerRunning));
}