/**
Copyright: Copyright (c) 2014-2017 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.storageworker;

import voxelman.log;
import voxelman.container.sharedhashset;
import std.conv : to;
import std.datetime : MonoTime, Duration, usecs, dur, seconds;
import core.atomic;
import core.sync.condition;

import cbor;

import voxelman.world.block;
import voxelman.core.config;
import voxelman.utils.compression;
import voxelman.utils.worker;
import voxelman.world.gen.generator : IGenerator;
import voxelman.world.gen.utils;
import voxelman.world.storage.chunk;
import voxelman.world.storage.chunkprovider;
import voxelman.world.storage.coordinates;
import voxelman.world.worlddb;


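// Simple intrusive timer used to measure storage tasks. `nested` points to a
// child measurement and `next` to a sibling; printing is currently disabled
// (see the commented-out body of printTime).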
struct TimeMeasurer
{
	TimeMeasurer* nested;
	TimeMeasurer* next;
	MonoTime startTime;
	Duration takenTime;
	string taskName;
	bool wasRun = false;

	void reset()
	{
		wasRun = false;
		takenTime = Duration.zero;
		if (nested) nested.reset();
		if (next) next.reset();
	}

	void startTaskTiming(string name)
	{
		taskName = name;
		startTime = MonoTime.currTime;
	}

	void endTaskTiming()
	{
		wasRun = true;
		takenTime = MonoTime.currTime - startTime;
	}

	void printTime(bool isNested = false)
	{
		//int seconds; short msecs; short usecs;
		//takenTime.split!("seconds", "msecs", "usecs")(seconds, msecs, usecs);
		//if (msecs > 10 || seconds > 0 || isNested)
		//{
		//	if (wasRun)
		//		tracef("%s%s %s.%s,%ss", isNested?"  ":"", taskName, seconds, msecs, usecs);
		//	if (nested) nested.printTime(true);
		//	if (next) next.printTime(isNested);
		//}
	}
}
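
// A minimal usage sketch of TimeMeasurer (not part of the original module).
// It builds the same nested/next chain that storageWorker sets up below;
// printTime() is currently a no-op because its body is commented out.
unittest
{
	TimeMeasurer total;
	TimeMeasurer read;
	TimeMeasurer work;
	total.nested = &read;
	read.next = &work;

	total.startTaskTiming("TOTAL");
	read.startTaskTiming("read");
	read.endTaskTiming();
	work.startTaskTiming("work");
	work.endTaskTiming();
	total.endTaskTiming();

	assert(total.wasRun && read.wasRun && work.wasRun);
	total.reset();
	assert(!total.wasRun && !read.wasRun && !work.wasRun);
}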

struct GenWorkerControl
{
	this(shared Worker[] genWorkers_)
	{
		genWorkers = genWorkers_;
		queueLengths.length = genWorkers.length;
	}

	shared Worker[] genWorkers;

	// Index of the last worker we sent work to.
	size_t lastWorker;
	// Position of the last chunk we sent for generation.
	ChunkWorldPos lastCwp;
	static struct QLen {size_t i; size_t len;}
	QLen[] queueLengths;

	// Returns the index of the worker with the shortest task queue.
	size_t getWorker()
	{
		import std.algorithm : sort;
		foreach(i; 0..genWorkers.length)
		{
			queueLengths[i].i = i;
			queueLengths[i].len = genWorkers[i].taskQueue.length;
		}
		sort!((a,b) => a.len < b.len)(queueLengths); // balance worker queues
		return queueLengths[0].i;
	}

	// Sends chunks with the same x and z to the same worker,
	// since each gen worker keeps a thread-local heightmap cache.
	void sendGenTask(TaskId taskId, ulong cwp, IGenerator generator)
	{
		auto _cwp = ChunkWorldPos(cwp);
		size_t workerIndex;
		// Send tasks from the same chunk column
		// to the same worker to improve the cache hit rate.
		if (_cwp.x == lastCwp.x && _cwp.z == lastCwp.z)
		{
			workerIndex = lastWorker;
		}
		else
		{
			workerIndex = getWorker();
		}
		shared(Worker)* worker = &genWorkers[workerIndex];
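		// Gen task message layout: TaskId, chunk position packed as ulong,
		// then the IGenerator to run.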
		worker.taskQueue.startMessage();
		worker.taskQueue.pushMessagePart(taskId);
		worker.taskQueue.pushMessagePart!ulong(cwp);
		worker.taskQueue.pushMessagePart(generator);
		worker.taskQueue.endMessage();
		worker.notify();
		lastWorker = workerIndex;
		lastCwp = _cwp;
	}
}

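// Replies to a load task with a single uniform layer (uniformData = 0),
// used when a chunk is not in the database and no generator is available.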
void sendEmptyChunk(TaskId taskId, shared SharedQueue* queue, ulong cwp)
{
	queue.startMessage();
	queue.pushMessagePart(taskId);
	queue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), 1/*numLayers*/, 0));
	queue.pushMessagePart(ChunkLayerItem(BLOCK_LAYER,
		BLOCKID_UNIFORM_FILL_BITS/*dataLength*/, 0/*timestamp*/, 0/*uniformData*/,
		SOLID_CHUNK_METADATA));
	queue.endMessage();
}

//version = DBG_OUT;
//version = DBG_COMPR;
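// Entry point of the storage worker thread. Owns the WorldDb handle, sleeps
// on the condition variable until notified, then serves chunk load requests
// (reading from the database, falling back to the gen workers or an empty
// chunk) and chunk save requests, batching database access in transactions.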
void storageWorker(
			immutable WorldDb _worldDb,
			shared bool* workerRunning,
			shared Mutex workAvaliableMutex,
			shared Condition workAvaliable,
			shared SharedQueue* loadResQueue,
			shared SharedQueue* saveResQueue,
			shared SharedQueue* loadTaskQueue,
			shared SharedQueue* saveTaskQueue,
			shared SharedHashSet!TaskId canceledTasks,
			shared Worker[] genWorkers,
			)
{
	version(DBG_OUT) infof("Storage worker started");
	version(DBG_OUT) infof("genWorkers.length %s", genWorkers.length);
	try
	{
	ubyte[] compressBuffer = new ubyte[](4096*16);
	ubyte[] buffer = new ubyte[](4096*16);
	WorldDb worldDb = cast(WorldDb)_worldDb;
	scope(exit) worldDb.close();

	TimeMeasurer taskTime;
	TimeMeasurer workTime;
	TimeMeasurer readTime;
	taskTime.nested = &readTime;
	readTime.next = &workTime;

	auto workerControl = GenWorkerControl(genWorkers);
	bool genEnabled = genWorkers.length > 0;

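	// Pops one chunk (header + layers) from the save queue and stores it
	// under its chunk key as CBOR: numLayers, then per layer timestamp,
	// layerId, metadata, StorageType and payload (uniform value + length,
	// or a compressed byte array). Echoes per-layer timestamps back on
	// saveResQueue.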
	void writeChunk()
	{
		taskTime.reset();
		taskTime.startTaskTiming("WR");

		ChunkHeaderItem header = saveTaskQueue.popItem!ChunkHeaderItem();

		saveResQueue.startMessage();
		saveResQueue.pushMessagePart(header);
		try
		{
			size_t encodedSize = encodeCbor(buffer[], header.numLayers);

			foreach(_; 0..header.numLayers)
			{
				ChunkLayerItem layer = saveTaskQueue.popItem!ChunkLayerItem();

				encodedSize += encodeCbor(buffer[encodedSize..$], layer.timestamp);
				encodedSize += encodeCbor(buffer[encodedSize..$], layer.layerId);
				encodedSize += encodeCbor(buffer[encodedSize..$], layer.metadata);
				if (layer.type == StorageType.uniform)
				{
					encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.uniform);
					encodedSize += encodeCbor(buffer[encodedSize..$], layer.uniformData);
					encodedSize += encodeCbor(buffer[encodedSize..$], layer.dataLength);
				}
				else if (layer.type == StorageType.fullArray)
				{
					encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray);
					ubyte[] compactBlocks = compressLayerData(layer.getArray!ubyte, compressBuffer);
					encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks);
					version(DBG_COMPR)infof("Store1 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks);
				}
				else if (layer.type == StorageType.compressedArray)
				{
					encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray);
					ubyte[] compactBlocks = layer.getArray!ubyte;
					encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks);
					version(DBG_COMPR)infof("Store2 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks);
				}

				saveResQueue.pushMessagePart(ChunkLayerTimestampItem(layer.timestamp, layer.layerId));
			}

			//infof("S write %s (%(%02x%)) %s", header.cwp, header.cwp.asUlong.formChunkKey, encodedSize);
			worldDb.put(header.cwp.asUlong.formChunkKey, buffer[0..encodedSize]);
		}
		catch(Exception e) errorf("storage exception %s", e.to!string);
		saveResQueue.endMessage();
		taskTime.endTaskTiming();
		taskTime.printTime();
		version(DBG_OUT)infof("task save %s", header.cwp);
	}

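	// Pops one load task, first checking canceledTasks; on cancellation a
	// zero-layer header with TASK_CANCELED_METADATA is returned. Otherwise
	// the chunk is read from the database and its layers are decoded and
	// pushed to loadResQueue; if the chunk is missing (or decoding throws),
	// it is forwarded to a gen worker, or an empty chunk is sent when no
	// generator is available.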
	void readChunk()
	{
		taskTime.reset();
		taskTime.startTaskTiming("RD");
		scope(exit) {
			taskTime.endTaskTiming();
			taskTime.printTime();
		}

		TaskId taskId = loadTaskQueue.popItem!TaskId();
		ulong cwp = loadTaskQueue.popItem!ulong();
		IGenerator generator = loadTaskQueue.popItem!IGenerator();

		if (canceledTasks[taskId])
		{
			loadResQueue.startMessage();
			loadResQueue.pushMessagePart(taskId);
			loadResQueue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), 0, TASK_CANCELED_METADATA));
			loadResQueue.endMessage();

			return;
		}

		bool doGen;
		try
		{
			readTime.startTaskTiming("get");
			ubyte[] cborData = worldDb.get(cwp.formChunkKey);
			readTime.endTaskTiming();
			//scope(exit) worldDb.perChunkSelectStmt.reset();
			//infof("S read %s %s %s", taskId, ChunkWorldPos(cwp), cborData.length);

			if (cborData !is null)
			{
				workTime.startTaskTiming("decode");
				//printCborStream(cborData[]);
				ubyte numLayers = decodeCborSingle!ubyte(cborData);
				// TODO check numLayers <= ubyte.max
				loadResQueue.startMessage();
				loadResQueue.pushMessagePart(taskId);
				loadResQueue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), cast(ubyte)numLayers, 0));
				foreach(_; 0..numLayers)
				{
					auto timestamp = decodeCborSingle!TimestampType(cborData);
					auto layerId = decodeCborSingle!ubyte(cborData);
					auto metadata = decodeCborSingle!ushort(cborData);
					auto type = decodeCborSingle!StorageType(cborData);

					if (type == StorageType.uniform)
					{
						ulong uniformData = decodeCborSingle!ulong(cborData);
						auto dataLength = decodeCborSingle!LayerDataLenType(cborData);
						loadResQueue.pushMessagePart(ChunkLayerItem(layerId, dataLength, timestamp, uniformData, metadata));
					}
					else
					{
						assert(type == StorageType.compressedArray);
						ubyte[] compactBlocks = decodeCborSingle!(ubyte[])(cborData);
						compactBlocks = allocLayerArray(compactBlocks);

						version(DBG_COMPR)infof("Load %s L %s C (%(%02x%))", ChunkWorldPos(cwp), compactBlocks.length, cast(ubyte[])compactBlocks);
						loadResQueue.pushMessagePart(ChunkLayerItem(StorageType.compressedArray, layerId, timestamp, compactBlocks, metadata));
					}
				}
				loadResQueue.endMessage();
				// if (cborData.length > 0) error; TODO
				workTime.endTaskTiming();
			}
			else doGen = true;
		}
		catch(Exception e) {
			infof("storage exception %s regenerating %s", e.to!string, ChunkWorldPos(cwp));
			doGen = true;
		}

		if (doGen) {
			if (genEnabled && generator) {
				workerControl.sendGenTask(taskId, cwp, generator);
			} else {
				sendEmptyChunk(taskId, loadResQueue, cwp);
			}
		}

		version(DBG_OUT)infof("task load %s", ChunkWorldPos(cwp));
	}

	uint numReceived;
	MonoTime frameStart = MonoTime.currTime;
	size_t prevReceived = size_t.max;
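	// Main loop: wait for a notification, then drain the load queue inside
	// a transaction that is aborted afterwards (nothing is written), and the
	// save queue inside a transaction that is committed.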
	while (atomicLoad!(MemoryOrder.acq)(*workerRunning))
	{
		synchronized (workAvaliableMutex)
		{
			(cast(Condition)workAvaliable).wait();
		}

		worldDb.beginTxn();
		while (!loadTaskQueue.empty)
		{
			readChunk();
			++numReceived;
		}
		worldDb.abortTxn();

		worldDb.beginTxn();
		while (!saveTaskQueue.empty)
		{
			auto type = saveTaskQueue.popItem!SaveItemType();
			final switch(type) {
				case SaveItemType.chunk:
					writeChunk();
					++numReceived;
					break;
				case SaveItemType.saveHandler:
					IoHandler ioHandler = saveTaskQueue.popItem!IoHandler();
					ioHandler(worldDb);
					break;
			}
		}
		worldDb.commitTxn();

		if (prevReceived != numReceived)
			version(DBG_OUT)infof("Storage worker running %s %s", numReceived, atomicLoad(*workerRunning));
		prevReceived = numReceived;

		auto now = MonoTime.currTime;
		auto dur = now - frameStart;
		if (dur > 3.seconds) {
			//infof("Storage update");
			frameStart = now;
		}
	}
	}
	catch(Throwable t)
	{
		infof("%s from storage worker", t.to!string);
		throw t;
	}
	version(DBG_OUT)infof("Storage worker stopped (%s, %s)", numReceived, atomicLoad(*workerRunning));
}