/**
Copyright: Copyright (c) 2015-2017 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.chunkprovider;

import voxelman.log;
import core.sync.condition;
import core.sync.mutex : Mutex;
import core.atomic;
import core.thread : Thread;
import std.string : format;

import voxelman.math;
import voxelman.container.sharedhashset;

import voxelman.world.block : BlockInfoTable;
import voxelman.core.config;
import voxelman.utils.worker;
import voxelman.world.gen.generator : IGenerator;
import voxelman.world.gen.utils;
import voxelman.world.gen.worker;
import voxelman.world.storage;
import voxelman.world.worlddb : WorldDb;

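/// When true, chunks that come straight from the generator (and were never
/// modified) are also marked for saving; see update(), where generator
/// results are received with needsSave = saveUnmodifiedChunks.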
enum saveUnmodifiedChunks = false;

/// Used to pass data to chunkmanager's onSnapshotLoaded.
struct LoadedChunkData
{
	private ChunkHeaderItem header;
	private ChunkLayerItem[MAX_CHUNK_LAYERS] _layers;

	ChunkWorldPos cwp() { return header.cwp; }
	ChunkLayerItem[] layers() { return _layers[0..header.numLayers]; }

	static LoadedChunkData getFromQueue(shared SharedQueue* queue) {
		LoadedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		assert(queue.length >= ChunkLayerItem.sizeof * data.header.numLayers);
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS,
			format("%s <= %s", data.header.numLayers, MAX_CHUNK_LAYERS));

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerItem;
		}
		return data;
	}
}

/// Used to pass data to chunkmanager's onSnapshotSaved.
struct SavedChunkData
{
	private ChunkHeaderItem header;
	private ChunkLayerTimestampItem[MAX_CHUNK_LAYERS] _layers;

	ChunkWorldPos cwp() { return header.cwp; }
	ChunkLayerTimestampItem[] layers() { return _layers[0..header.numLayers]; }

	static SavedChunkData getFromQueue(shared SharedQueue* queue) {
		SavedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		assert(queue.length >= ChunkLayerTimestampItem.sizeof * data.header.numLayers);
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS);

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerTimestampItem;
		}
		return data;
	}
}
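
/*
Both readers above expect one complete message per chunk: a ChunkHeaderItem
followed by exactly header.numLayers layer items. On the load/gen result
queues a TaskId precedes the header (see receiveChunk); the save result queue
carries no TaskId. A minimal producer sketch for the load/gen layout, where
`taskId`, `header` and `layers` are hypothetical caller-side values and the
startMessage/endMessage bracketing is what makes the read side safe:
---
queue.startMessage();
queue.pushMessagePart!TaskId(taskId);
queue.pushMessagePart(header); // ChunkHeaderItem with numLayers set
foreach(layer; layers)
	queue.pushMessagePart(layer);
queue.endMessage();
---
*/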

alias IoHandler = void delegate(WorldDb);
alias TaskId = uint;

enum SaveItemType : ubyte {
	chunk,
	saveHandler
}

enum TASK_OK_METADATA = 0;
enum TASK_CANCELED_METADATA = 1;

//version = DBG_OUT;
struct ChunkProvider
{
	private Thread storeWorker;
	private shared bool workerRunning = true;
	private shared bool workerStopped = false;

	// metrics
	size_t totalReceived;
	size_t numWastedLoads;
	size_t numSuccessfulCancelations;

	private TaskId nextTaskId;

	Mutex workAvaliableMutex;
	Condition workAvaliable;
	shared SharedQueue loadResQueue;
	shared SharedQueue saveResQueue;
	shared SharedQueue loadTaskQueue;
	shared SharedQueue saveTaskQueue;

	shared SharedHashSet!TaskId canceledTasks;
	TaskId[ChunkWorldPos] chunkTasks;

	shared Worker[] genWorkers;

	void delegate(ChunkWorldPos cwp, ChunkLayerItem[] layers, bool needsSave) onChunkLoadedHandler;
	void delegate(ChunkWorldPos cwp, ChunkLayerTimestampItem[] timestamps) onChunkSavedHandler;
	IGenerator delegate(DimensionId dimensionId) generatorGetter;

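	/// Free slots in the load task queue; callers can use this to throttle loadChunk calls.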
	size_t loadQueueSpaceAvaliable() @property const {
		ptrdiff_t space = cast(ptrdiff_t)loadTaskQueue.capacity - loadTaskQueue.length;
		return space >= 0 ? space : 0;
	}

	void notify()
	{
		synchronized (workAvaliableMutex)
		{
			workAvaliable.notify();
		}
	}

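	/// Allocates the queues, then spawns the generator workers and the storage
	/// worker. A lifecycle sketch (handler wiring is up to the caller;
	/// `chunkManager` and `getGenerator` are hypothetical):
	/// ---
	/// ChunkProvider provider;
	/// provider.onChunkLoadedHandler = &chunkManager.onChunkLoaded;
	/// provider.onChunkSavedHandler = &chunkManager.onChunkSaved;
	/// provider.generatorGetter = &getGenerator;
	/// provider.init(worldDb, numGenWorkers, blocks);
	/// // each tick:
	/// provider.update();
	/// // on shutdown:
	/// provider.stop();
	/// ---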
	void init(WorldDb worldDb, uint numGenWorkers, BlockInfoTable blocks)
	{
		canceledTasks = cast(shared) new SharedHashSet!TaskId;

		import std.algorithm.comparison : clamp;
		numGenWorkers = clamp(numGenWorkers, 0, 16);
		genWorkers.length = numGenWorkers;
		foreach(i; 0..numGenWorkers)
		{
			genWorkers[i].alloc(0, "GEN_W", QUEUE_LENGTH);
			genWorkers[i].thread = cast(shared)spawnWorker(&chunkGenWorkerThread, &genWorkers[i], canceledTasks, blocks);
		}

		workAvaliableMutex = new Mutex;
		workAvaliable = new Condition(workAvaliableMutex);
		loadResQueue.alloc("loadResQ", QUEUE_LENGTH);
		saveResQueue.alloc("saveResQ", QUEUE_LENGTH);
		loadTaskQueue.alloc("loadTaskQ", QUEUE_LENGTH);
		saveTaskQueue.alloc("saveTaskQ", QUEUE_LENGTH);
		storeWorker = spawnWorker(
			&storageWorker, cast(immutable)worldDb,
			&workerRunning,
			cast(shared)workAvaliableMutex, cast(shared)workAvaliable,
			&loadResQueue, &saveResQueue, &loadTaskQueue, &saveTaskQueue,
			canceledTasks,
			genWorkers);
	}

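	/// Drains all queues, then signals the workers to stop and spins until
	/// both the storage worker and the generator workers have exited.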
	void stop() {
		bool queuesEmpty() {
			return loadResQueue.empty && saveResQueue.empty && loadTaskQueue.empty && saveTaskQueue.empty;
		}
		bool allWorkersStopped() {
			bool stopped = !storeWorker.isRunning;
			foreach(ref w; genWorkers) stopped = stopped && w.isStopped;
			return stopped;
		}

		while (!queuesEmpty()) {
			if (!saveTaskQueue.empty || !loadTaskQueue.empty)
			{
				notify();
			}
			update();
		}

		atomicStore!(MemoryOrder.rel)(workerRunning, false);
		notify();
		foreach(ref w; genWorkers) w.stop();

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	private void free() {
		loadResQueue.free();
		saveResQueue.free();
		loadTaskQueue.free();
		saveTaskQueue.free();
		foreach(ref w; genWorkers) {
			w.free();
		}
	}

	size_t prevReceived = size_t.max;
	void update() {
		while(loadResQueue.length > 0)
		{
			receiveChunk(&loadResQueue, false);
		}
		while(!saveResQueue.empty)
		{
			auto data = SavedChunkData.getFromQueue(&saveResQueue);
			onChunkSavedHandler(data.cwp, data.layers);
		}
		foreach(ref w; genWorkers)
		{
			while(!w.resultQueue.empty)
			{
				receiveChunk(&w.resultQueue, saveUnmodifiedChunks);
			}
		}

		if (prevReceived != totalReceived)
			version(DBG_OUT) infof("ChunkProvider running %s", totalReceived);
		prevReceived = totalReceived;
	}

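	/// Enqueues a load task for the chunk at cwp; the result arrives later via
	/// update()/receiveChunk. Each task gets a fresh TaskId, so a canceled or
	/// superseded load can be detected and discarded when its result arrives.
	/// A cancelation sketch:
	/// ---
	/// provider.loadChunk(cwp);
	/// // ... the chunk went out of view before the result arrived:
	/// provider.cancelLoad(cwp);
	/// ---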
	void loadChunk(ChunkWorldPos cwp)
	{
		IGenerator generator = generatorGetter(cwp.dimension);

		TaskId id = nextTaskId++;
		chunkTasks[cwp] = id;

		loadTaskQueue.startMessage();
		loadTaskQueue.pushMessagePart!TaskId(id);
		loadTaskQueue.pushMessagePart!ulong(cwp.asUlong);
		loadTaskQueue.pushMessagePart!IGenerator(generator);
		loadTaskQueue.endMessage();

		notify();
	}

	void cancelLoad(ChunkWorldPos cwp)
	{
		TaskId tid = chunkTasks[cwp];
		canceledTasks.put(tid);
		chunkTasks.remove(cwp);
	}

	void receiveChunk(shared(SharedQueue)* queue, bool needsSave)
	{
		TaskId loadedTaskId = queue.popItem!TaskId();

		auto data = LoadedChunkData.getFromQueue(queue);

		bool isFinalResult = false;
		// data is not marked as canceled
		if (data.header.metadata == TASK_OK_METADATA)
		{
			// data is for the latest task -> send to chunk manager
			if (auto latestTaskId = data.cwp in chunkTasks)
			{
				if (loadedTaskId == *latestTaskId)
				{
					isFinalResult = true;
				}
			}
		}

		if (isFinalResult)
		{
			//assert(!canceledTasks[loadedTaskId]);
			onChunkLoadedHandler(data.cwp, data.layers, needsSave);
			chunkTasks.remove(data.cwp);
		}
		else
		{
			//assert(canceledTasks[loadedTaskId]);
			// update metrics
			if (data.header.metadata == TASK_OK_METADATA)
				++numWastedLoads;
			else
				++numSuccessfulCancelations;

			// data is for a canceled request -> free arrays
			foreach(ref layer; data.layers)
				freeLayerArray(layer);
			canceledTasks.remove(loadedTaskId);
		}

		++totalReceived;
	}

	// Sends a delegate to be executed on the IO (storage) thread.
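	// A sketch (the delegate runs on the storage thread while it owns the db):
	//   provider.pushSaveHandler((WorldDb db) {
	//       // custom IO against the world database
	//   });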
	void pushSaveHandler(IoHandler ioHandler)
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.saveHandler);
		saveTaskQueue.pushMessagePart(ioHandler);
		saveTaskQueue.endMessage();
		notify();
	}

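	/// Chunk saving is a three-phase protocol: startChunkSave reserves space
	/// for the header, pushLayer appends each layer, and endChunkSave patches
	/// the header in once the layer count is known. A usage sketch, where
	/// `layersToSave` is hypothetical caller-side data:
	/// ---
	/// size_t headerPos = provider.startChunkSave();
	/// uint numLayers;
	/// foreach(layer; layersToSave) {
	/// 	provider.pushLayer(layer);
	/// 	++numLayers;
	/// }
	/// provider.endChunkSave(headerPos, ChunkHeaderItem(cwp, numLayers));
	/// ---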
	size_t startChunkSave()
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.chunk);
		size_t headerPos = saveTaskQueue.skipMessageItem!ChunkHeaderItem();
		return headerPos;
	}
	void pushLayer(ChunkLayerItem layer)
	{
		saveTaskQueue.pushMessagePart(layer);
	}
	void endChunkSave(size_t headerPos, ChunkHeaderItem header)
	{
		saveTaskQueue.setItem(header, headerPos);
		saveTaskQueue.endMessage();
		notify();
	}
}