1 /**
2 Copyright: Copyright (c) 2015-2016 Andrey Penechko.
3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4 Authors: Andrey Penechko.
5 */
6 module voxelman.world.storage.chunkprovider;
7 
8 import std.experimental.logger;
9 import core.sync.condition;
10 import core.atomic;
11 import core.thread : Thread;
12 
13 import voxelman.math;
14 
15 import voxelman.block.utils : BlockInfoTable;
16 import voxelman.core.config;
17 import voxelman.utils.worker;
18 import voxelman.world.gen.utils;
19 import voxelman.world.gen.worker;
20 import voxelman.world.storage.chunk;
21 import voxelman.world.storage.coordinates;
22 import voxelman.world.storage.storageworker;
23 import voxelman.world.worlddb : WorldDb;
24 
25 enum saveUnmodifiedChunks = false;
26 
27 /// Used to pass data to chunkmanager's onSnapshotLoaded.
28 struct LoadedChunkData
29 {
30 	private shared(SharedQueue)* queue;
31 	ChunkHeaderItem getHeader()
32 	{
33 		assert(queue.length >= ChunkHeaderItem.sizeof);
34 		ChunkHeaderItem header;
35 		queue.popItem(header);
36 		assert(queue.length >= ChunkLayerItem.sizeof * header.numLayers);
37 		return header;
38 	}
39 	ChunkLayerItem getLayer()
40 	{
41 		ChunkLayerItem layer;
42 		queue.popItem(layer);
43 		if (layer.type != StorageType.uniform)
44 		{
45 			// Remove root, added on chunk load and gen.
46 			// Data can be collected by GC if no-one is referencing it.
47 			import core.memory : GC;
48 			GC.removeRoot(layer.dataPtr); // TODO remove when moved to non-GC allocator
49 		}
50 		return layer;
51 	}
52 }
53 
54 /// Used to pass data to chunkmanager's onSnapshotLoaded.
55 struct SavedChunkData
56 {
57 	private shared(SharedQueue)* queue;
58 	ChunkHeaderItem getHeader()
59 	{
60 		assert(queue.length >= 2);
61 		ChunkHeaderItem header;
62 		queue.popItem(header);
63 		assert(queue.length >= ChunkLayerItem.sizeof/8 * header.numLayers);
64 		return header;
65 	}
66 	ChunkLayerTimestampItem getLayerTimestamp()
67 	{
68 		ChunkLayerTimestampItem layer;
69 		queue.popItem(layer);
70 		return layer;
71 	}
72 }
73 
74 alias IoHandler = void delegate(WorldDb);
75 
76 enum SaveItemType : ubyte {
77 	chunk,
78 	saveHandler
79 }
80 
81 //version = DBG_OUT;
82 struct ChunkProvider
83 {
84 	private Thread storeWorker;
85 	private shared bool workerRunning = true;
86 	private shared bool workerStopped = false;
87 
88 	size_t numReceived;
89 
90 	Mutex workAvaliableMutex;
91 	Condition workAvaliable;
92 	shared SharedQueue loadResQueue;
93 	shared SharedQueue saveResQueue;
94 	shared SharedQueue loadTaskQueue;
95 	shared SharedQueue saveTaskQueue;
96 
97 	shared Worker[] genWorkers;
98 
99 	void delegate(LoadedChunkData loadedChunk, bool needsSave) onChunkLoadedHandler;
100 	void delegate(SavedChunkData savedChunk) onChunkSavedHandler;
101 
102 	size_t loadQueueSpaceAvaliable() @property const {
103 		ptrdiff_t space = cast(ptrdiff_t)loadTaskQueue.capacity - loadTaskQueue.length;
104 		return space >= 0 ? space : 0;
105 	}
106 
107 	void notify()
108 	{
109 		synchronized (workAvaliableMutex)
110 		{
111 			workAvaliable.notify();
112 		}
113 	}
114 
115 	void init(WorldDb worldDb, uint numGenWorkers, BlockInfoTable blocks)
116 	{
117 		import std.algorithm.comparison : clamp;
118 		numGenWorkers = clamp(numGenWorkers, 1, 16);
119 		genWorkers.length = numGenWorkers;
120 		foreach(i; 0..numGenWorkers)
121 		{
122 			genWorkers[i].alloc("GEN_W", QUEUE_LENGTH);
123 			genWorkers[i].thread = cast(shared)spawnWorker(&chunkGenWorkerThread, &genWorkers[i], blocks);
124 		}
125 
126 		workAvaliableMutex = new Mutex;
127 		workAvaliable = new Condition(workAvaliableMutex);
128 		loadResQueue.alloc("loadResQ", QUEUE_LENGTH);
129 		saveResQueue.alloc("saveResQ", QUEUE_LENGTH);
130 		loadTaskQueue.alloc("loadTaskQ", QUEUE_LENGTH);
131 		saveTaskQueue.alloc("saveTaskQ", QUEUE_LENGTH);
132 		storeWorker = spawnWorker(
133 			&storageWorker, cast(immutable)worldDb,
134 			&workerRunning,
135 			cast(shared)workAvaliableMutex, cast(shared)workAvaliable,
136 			&loadResQueue, &saveResQueue, &loadTaskQueue, &saveTaskQueue,
137 			genWorkers);
138 	}
139 
140 	void stop() {
141 		bool queuesEmpty() {
142 			return loadResQueue.empty && saveResQueue.empty && loadTaskQueue.empty && saveTaskQueue.empty;
143 		}
144 		bool allWorkersStopped() {
145 			bool stopped = !storeWorker.isRunning;
146 			foreach(ref w; genWorkers) stopped = stopped && w.isStopped;
147 			return stopped;
148 		}
149 
150 		while (!queuesEmpty()) {
151 			update();
152 		}
153 
154 		atomicStore!(MemoryOrder.rel)(workerRunning, false);
155 		notify();
156 		foreach(ref w; genWorkers) w.stop();
157 
158 		while (!allWorkersStopped())
159 		{
160 			Thread.yield();
161 		}
162 
163 		free();
164 	}
165 
166 	private void free() {
167 		loadResQueue.free();
168 		saveResQueue.free();
169 		loadTaskQueue.free();
170 		saveTaskQueue.free();
171 		foreach(ref w; genWorkers) {
172 			w.free();
173 		}
174 	}
175 
176 	size_t prevReceived = size_t.max;
177 	void update() {
178 		//infof("%s %s %s %s", loadResQueue.length, saveResQueue.length,
179 		//		loadTaskQueue.length, saveTaskQueue.length);
180 		while(loadResQueue.length > 0)
181 		{
182 			onChunkLoadedHandler(LoadedChunkData(&loadResQueue), false);
183 			++numReceived;
184 		}
185 		while(!saveResQueue.empty)
186 		{
187 			//infof("Save res received");
188 			onChunkSavedHandler(SavedChunkData(&saveResQueue));
189 			++numReceived;
190 		}
191 		foreach(ref w; genWorkers)
192 		{
193 			while(!w.resultQueue.empty)
194 			{
195 				//infof("Save res received");
196 				onChunkLoadedHandler(LoadedChunkData(&w.resultQueue), saveUnmodifiedChunks);
197 				++numReceived;
198 			}
199 		}
200 
201 		if (prevReceived != numReceived)
202 			version(DBG_OUT)infof("ChunkProvider running %s", numReceived);
203 		prevReceived = numReceived;
204 	}
205 
206 	void loadChunk(ChunkWorldPos cwp)
207 	{
208 		loadTaskQueue.pushItem!ulong(cwp.asUlong);
209 		notify();
210 	}
211 
212 	// sends a delegate to IO thread
213 	void pushSaveHandler(IoHandler ioHandler)
214 	{
215 		saveTaskQueue.startMessage();
216 		saveTaskQueue.pushMessagePart(SaveItemType.saveHandler);
217 		saveTaskQueue.pushMessagePart(ioHandler);
218 		saveTaskQueue.endMessage();
219 		notify();
220 	}
221 
222 	size_t startChunkSave()
223 	{
224 		saveTaskQueue.startMessage();
225 		saveTaskQueue.pushMessagePart(SaveItemType.chunk);
226 		size_t headerPos = saveTaskQueue.skipMessageItem!ChunkHeaderItem();
227 		return headerPos;
228 	}
229 	void pushLayer(ChunkLayerItem layer)
230 	{
231 		saveTaskQueue.pushMessagePart(layer);
232 	}
233 	void endChunkSave(size_t headerPos, ChunkHeaderItem header)
234 	{
235 		saveTaskQueue.setItem(header, headerPos);
236 		saveTaskQueue.endMessage();
237 		notify();
238 	}
239 }