/**
Copyright: Copyright (c) 2015-2018 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.chunk.chunkprovider;

import voxelman.log;
import core.sync.condition;
import core.sync.mutex : Mutex;
import core.atomic;
import core.thread : Thread;
import std.string : format;

import voxelman.math;
import voxelman.container.sharedhashset;

import voxelman.world.block : BlockInfoTable;
import voxelman.core.config;
import voxelman.thread.worker;
import voxelman.world.gen.generator : IGenerator;
import voxelman.world.gen.utils;
import voxelman.world.gen.worker;
import voxelman.world.storage;
import voxelman.world.worlddb : WorldDb;


/// Used to pass data to chunkmanager's onSnapshotLoaded.
struct LoadedChunkData
{
	private ChunkHeaderItem header;
	private ChunkLayerItem[MAX_CHUNK_LAYERS] _layers;

	ChunkWorldPos cwp() { return header.cwp; }
	ChunkLayerItem[] layers() { return _layers[0..header.numLayers]; }

	static LoadedChunkData getFromQueue(shared SharedQueue* queue) {
		LoadedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		assert(queue.length >= ChunkLayerItem.sizeof * data.header.numLayers);
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS,
			format("%s <= %s", data.header.numLayers, MAX_CHUNK_LAYERS));

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerItem;
		}
		return data;
	}
}
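
// Sketch of the expected result-message layout (not part of the original module,
// mirrored from what receiveChunk/getFromQueue pop above). A producer on a worker
// thread would push roughly this, using the SharedQueue calls seen elsewhere in
// this file; the ChunkHeaderItem field order is an assumption based on its use in
// saveChunk() and receiveChunk():
//
//   resultQueue.startMessage();
//   resultQueue.pushMessagePart!TaskId(taskId);
//   resultQueue.pushMessagePart(ChunkHeaderItem(cwp, numLayers, metadata));
//   foreach(layer; layers) resultQueue.pushMessagePart(layer);
//   resultQueue.endMessage();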

/// Used to pass data from the save result queue to onSnapshotSaved.
struct SavedChunkData
{
	private ChunkHeaderItem header;
	private ChunkLayerTimestampItem[MAX_CHUNK_LAYERS] _layers;

	ChunkWorldPos cwp() { return header.cwp; }
	ChunkLayerTimestampItem[] layers() { return _layers[0..header.numLayers]; }

	static SavedChunkData getFromQueue(shared SharedQueue* queue) {
		SavedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		assert(queue.length >= ChunkLayerTimestampItem.sizeof * data.header.numLayers);
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS);

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerTimestampItem;
		}
		return data;
	}
}

alias IoHandler = void delegate(WorldDb);
alias TaskId = uint;

enum SaveItemType : ubyte {
	chunk,
	saveHandler
}

enum TASK_OK_METADATA = 0;
enum TASK_CANCELED_METADATA = 1;
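
// Judging by receiveChunk() below, result producers are expected to put one of
// these values into ChunkHeaderItem.metadata, so the provider can tell a
// completed load apart from one that was canceled while in flight.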

//version = DBG_OUT;
final class ChunkProvider
{
	private Thread storeWorker;
	private shared bool workerRunning = true;
	private shared bool workerStopped = false;

	// metrics
	size_t totalReceived;
	size_t numWastedLoads;
	size_t numSuccessfulCancelations;

	private TaskId nextTaskId;

	Mutex workAvaliableMutex;
	Condition workAvaliable;
	shared SharedQueue loadResQueue;
	shared SharedQueue saveResQueue;
	shared SharedQueue loadTaskQueue;
	shared SharedQueue saveTaskQueue;

	shared SharedHashSet!TaskId canceledTasks;
	TaskId[ChunkWorldPos] chunkTasks;
	bool saveUnmodifiedChunks;

	shared Worker[] genWorkers;

	ChunkManager chunkManager;
	void delegate(ChunkWorldPos cwp, ChunkLayerItem[] layers, bool needsSave) onChunkLoadedHandler;
	void delegate(ChunkWorldPos cwp, ChunkLayerTimestampItem[] timestamps) onChunkSavedHandler;
	IGenerator delegate(DimensionId dimensionId) generatorGetter;

	size_t loadQueueSpaceAvaliable() @property const {
		ptrdiff_t space = cast(ptrdiff_t)loadTaskQueue.capacity - loadTaskQueue.length;
		return space >= 0 ? space : 0;
	}
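
	// Sketch of intended use (hypothetical caller, not part of this module):
	// a chunk-observer manager could throttle its load requests so it never
	// pushes more messages than the task queue can hold, e.g.
	//
	//   size_t budget = provider.loadQueueSpaceAvaliable;
	//   foreach(cwp; chunksToLoad) {
	//       if (budget == 0) break;
	//       provider.loadChunk(cwp);
	//       --budget;
	//   }
	//
	// The consumer shown here is an assumption; only the property is defined in this file.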

	void notify()
	{
		synchronized (workAvaliableMutex)
		{
			workAvaliable.notify();
		}
	}

	this(ChunkManager chunkManager) {
		this.chunkManager = chunkManager;
	}

	void init(WorldDb worldDb, uint numGenWorkers, BlockInfoTable blocks, bool saveUnmodifiedChunks)
	{
		canceledTasks = cast(shared) new SharedHashSet!TaskId;
		this.saveUnmodifiedChunks = saveUnmodifiedChunks;

		import std.algorithm.comparison : clamp;
		numGenWorkers = clamp(numGenWorkers, 0, 16);
		genWorkers.length = numGenWorkers;
		foreach(i; 0..numGenWorkers)
		{
			genWorkers[i].alloc(0, "GEN_W", QUEUE_LENGTH);
			genWorkers[i].thread = cast(shared)spawnWorker(&chunkGenWorkerThread, &genWorkers[i], canceledTasks, blocks);
		}

		workAvaliableMutex = new Mutex;
		workAvaliable = new Condition(workAvaliableMutex);
		loadResQueue.alloc("loadResQ", QUEUE_LENGTH);
		saveResQueue.alloc("saveResQ", QUEUE_LENGTH);
		loadTaskQueue.alloc("loadTaskQ", QUEUE_LENGTH);
		saveTaskQueue.alloc("saveTaskQ", QUEUE_LENGTH);
		storeWorker = spawnWorker(
			&storageWorker, cast(immutable)worldDb,
			&workerRunning,
			cast(shared)workAvaliableMutex, cast(shared)workAvaliable,
			&loadResQueue, &saveResQueue, &loadTaskQueue, &saveTaskQueue,
			canceledTasks,
			genWorkers);
	}
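
	// Rough lifecycle sketch, inferred from this module alone. The handler wiring
	// shown is an assumption, not the real voxelman plugin setup:
	//
	//   auto provider = new ChunkProvider(chunkManager);
	//   provider.onChunkLoadedHandler = &chunkManager.onSnapshotLoaded; // assumed signature
	//   provider.onChunkSavedHandler = (cwp, stamps) {};                // placeholder
	//   provider.generatorGetter = dim => someGenerator;                // placeholder
	//   provider.init(worldDb, numGenWorkers, blocks, saveUnmodifiedChunks);
	//   // every tick:                      provider.update();
	//   // after committing snapshots:      provider.save();
	//   // on shutdown:                     provider.stop();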

	void stop() {
		bool queuesEmpty() {
			return loadResQueue.empty && saveResQueue.empty && loadTaskQueue.empty && saveTaskQueue.empty;
		}
		bool allWorkersStopped() {
			bool stopped = !storeWorker.isRunning;
			foreach(ref w; genWorkers) stopped = stopped && w.isStopped;
			return stopped;
		}

		while (!queuesEmpty()) {
			if(!saveTaskQueue.empty || !loadTaskQueue.empty)
			{
				notify();
			}
			update();
		}

		atomicStore!(MemoryOrder.rel)(workerRunning, false);
		notify();
		foreach(ref w; genWorkers) w.stop();

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	private void free() {
		loadResQueue.free();
		saveResQueue.free();
		loadTaskQueue.free();
		saveTaskQueue.free();
		foreach(ref w; genWorkers) {
			w.free();
		}
	}

	size_t prevReceived = size_t.max;
	void update() {
		while(loadResQueue.length > 0)
		{
			receiveChunk(&loadResQueue, false);
		}
		while(!saveResQueue.empty)
		{
			auto data = SavedChunkData.getFromQueue(&saveResQueue);
			onSnapshotSaved(data.cwp, data.layers);
		}
		foreach(ref w; genWorkers)
		{
			while(!w.resultQueue.empty)
			{
				receiveChunk(&w.resultQueue, saveUnmodifiedChunks);
			}
		}

		if (prevReceived != totalReceived)
			version(DBG_OUT)infof("ChunkProvider running %s", totalReceived);
		prevReceived = totalReceived;
	}

	void loadChunk(ChunkWorldPos cwp)
	{
		IGenerator generator = generatorGetter(cwp.dimension);

		TaskId id = nextTaskId++;
		chunkTasks[cwp] = id;

		loadTaskQueue.startMessage();
		loadTaskQueue.pushMessagePart!TaskId(id);
		loadTaskQueue.pushMessagePart!ulong(cwp.asUlong);
		loadTaskQueue.pushMessagePart!IGenerator(generator);
		loadTaskQueue.endMessage();

		notify();
	}
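
	// The storage worker presumably pops the task message in the same order it was
	// pushed above: TaskId, packed ChunkWorldPos, then the IGenerator to fall back
	// to when the chunk is not found in the database. Consumer sketch (assumed; the
	// actual storageWorker implementation lives in another module):
	//
	//   TaskId id = loadTaskQueue.popItem!TaskId();
	//   auto cwp = ChunkWorldPos(loadTaskQueue.popItem!ulong()); // assumed ctor from ulong
	//   IGenerator gen = loadTaskQueue.popItem!IGenerator();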

	void cancelLoad(ChunkWorldPos cwp)
	{
		TaskId tid = chunkTasks[cwp];
		canceledTasks.put(tid);
		chunkTasks.remove(cwp);
	}

	void receiveChunk(shared(SharedQueue)* queue, bool needsSave)
	{
		TaskId loadedTaskId = queue.popItem!TaskId();

		auto data = LoadedChunkData.getFromQueue(queue);

		bool isFinalResult = false;
		// data is not marked as canceled
		if (data.header.metadata == TASK_OK_METADATA)
		{
			// data is for latest task -> send to chunk manager
			if (auto latestTaskId = data.cwp in chunkTasks)
			{
				if (loadedTaskId == *latestTaskId)
				{
					isFinalResult = true;
				}
			}
		}

		if (isFinalResult)
		{
			//assert(!canceledTasks[loadedTaskId]);
			onChunkLoadedHandler(data.cwp, data.layers, needsSave);
			chunkTasks.remove(data.cwp);
		}
		else
		{
			//assert(canceledTasks[loadedTaskId]);
			// update metrics
			if (data.header.metadata == TASK_OK_METADATA)
				++numWastedLoads;
			else
				++numSuccessfulCancelations;

			// data is for canceled request -> free arrays
			foreach(ref layer; data.layers)
				freeLayerArray(layer);
			canceledTasks.remove(loadedTaskId);
		}

		++totalReceived;
	}

	// sends a delegate to IO thread
	void pushSaveHandler(IoHandler ioHandler)
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.saveHandler);
		saveTaskQueue.pushMessagePart(ioHandler);
		saveTaskQueue.endMessage();
		notify();
	}

	/// Saves all modified chunks.
	/// Modified chunks are those that were committed.
	/// Call this right after a commit.
	void save() {
		foreach(cwp; chunkManager.getModifiedChunks()) {
			saveChunk(cwp);
		}
		chunkManager.clearModifiedChunks();
	}

	void saveChunk(ChunkWorldPos cwp) {
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.chunk);
		// Reserve space for the header; it is filled in via setItem below,
		// once the number of pushed layers is known.
		size_t headerPos = saveTaskQueue.skipMessageItem!ChunkHeaderItem();

		uint numChunkLayers;
		foreach(ChunkLayerItem layerItem; chunkManager.iterateChunkSnapshotsAddUsers(cwp)) {
			saveTaskQueue.pushMessagePart(layerItem);
			++numChunkLayers;
		}

		// Write the header into the reserved slot, then complete the message.
		saveTaskQueue.setItem(ChunkHeaderItem(cwp, numChunkLayers), headerPos);
		saveTaskQueue.endMessage();

		notify();
	}

	private void onSnapshotSaved(ChunkWorldPos cwp, ChunkLayerTimestampItem[] timestamps) {
		foreach(item; timestamps) {
			chunkManager.removeSnapshotUser(cwp, item.timestamp, item.layerId);
		}
	}
}