/**
Copyright: Copyright (c) 2015-2017 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.chunkprovider;

import voxelman.log;
import core.sync.condition;
import core.atomic;
import core.thread : Thread;
import std.string : format;

import voxelman.math;
import voxelman.container.sharedhashset;

import voxelman.world.block : BlockInfoTable;
import voxelman.core.config;
import voxelman.utils.worker;
import voxelman.world.gen.generator : IGenerator;
import voxelman.world.gen.utils;
import voxelman.world.gen.worker;
import voxelman.world.storage;
import voxelman.world.worlddb : WorldDb;

/// Passed as the `needsSave` flag for chunks received from the generator
/// workers' result queues (see ChunkProvider.update).
enum saveUnmodifiedChunks = false;

/// Used to pass data to chunkmanager's onSnapshotLoaded.
/// Holds one chunk header followed by up to MAX_CHUNK_LAYERS layer items,
/// exactly as they are popped from a result queue.
struct LoadedChunkData
{
	private ChunkHeaderItem header;
	// Fixed-size storage; only the first header.numLayers entries are valid.
	private ChunkLayerItem[MAX_CHUNK_LAYERS] _layers;

	/// Position of the chunk, taken from the header.
	ChunkWorldPos cwp() { return header.cwp; }

	/// Slice of the valid layers (the first header.numLayers items).
	ChunkLayerItem[] layers() { return _layers[0..header.numLayers]; }

	/// Pops a ChunkHeaderItem and then header.numLayers ChunkLayerItem's
	/// from `queue` and returns them as a single value.
	/// The queue content is only validated through asserts.
	static LoadedChunkData getFromQueue(shared SharedQueue* queue) {
		LoadedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		assert(queue.length >= ChunkLayerItem.sizeof * data.header.numLayers);
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS,
			format("%s <= %s", data.header.numLayers, MAX_CHUNK_LAYERS));

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerItem;
		}
		return data;
	}
}

/// Used to pass data about a saved chunk to chunkmanager (delivered through onChunkSavedHandler).
/// Holds one chunk header followed by up to MAX_CHUNK_LAYERS layer timestamp
/// items, exactly as they are popped from the save result queue.
struct SavedChunkData
{
	private ChunkHeaderItem header;
	// Fixed-size storage; only the first header.numLayers entries are valid.
	private ChunkLayerTimestampItem[MAX_CHUNK_LAYERS] _layers;

	/// Position of the chunk, taken from the header.
	ChunkWorldPos cwp() { return header.cwp; }

	/// Slice of the valid timestamp items (the first header.numLayers items).
	ChunkLayerTimestampItem[] layers() { return _layers[0..header.numLayers]; }

	/// Pops a ChunkHeaderItem and then header.numLayers
	/// ChunkLayerTimestampItem's from `queue` and returns them as a single value.
	/// The queue content is only validated through asserts.
	static SavedChunkData getFromQueue(shared SharedQueue* queue) {
		SavedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		assert(queue.length >= ChunkLayerTimestampItem.sizeof * data.header.numLayers);
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS);

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerTimestampItem;
		}
		return data;
	}
}

/// Delegate pushed through the save task queue by pushSaveHandler.
alias IoHandler = void delegate(WorldDb);
/// Identifier given to every load request (see ChunkProvider.loadChunk).
alias TaskId = uint;

/// Tag written as the first part of every message in the save task queue.
enum SaveItemType : ubyte {
	chunk,
	saveHandler
}

/// Values compared against ChunkHeaderItem.metadata of received chunks.
enum TASK_OK_METADATA = 0;
enum TASK_CANCELED_METADATA = 1;

//version = DBG_OUT;

/// Front end for chunk loading and saving. Requests are pushed into task
/// queues, and results are read back from result queues in update() and
/// forwarded to the onChunkLoadedHandler / onChunkSavedHandler delegates.
struct ChunkProvider
{
	private Thread storeWorker;
	// Cleared in stop(); a pointer to it is handed to the storage worker in init().
	private shared bool workerRunning = true;
	private shared bool workerStopped = false;

	// metrics
	size_t totalReceived;
	size_t numWastedLoads;
	size_t numSuccessfulCancelations;

	private TaskId nextTaskId;

	Mutex workAvaliableMutex;
	Condition workAvaliable;
	shared SharedQueue loadResQueue;
	shared SharedQueue saveResQueue;
	shared SharedQueue loadTaskQueue;
	shared SharedQueue saveTaskQueue;

	// Ids of canceled load tasks; shared with the workers spawned in init().
	shared SharedHashSet!TaskId canceledTasks;
	// Latest load task id issued for each chunk position.
	TaskId[ChunkWorldPos] chunkTasks;

	shared Worker[] genWorkers;

	void delegate(ChunkWorldPos cwp, ChunkLayerItem[] layers, bool needsSave) onChunkLoadedHandler;
	void delegate(ChunkWorldPos cwp, ChunkLayerTimestampItem[] timestamps) onChunkSavedHandler;
	IGenerator delegate(DimensionId dimensionId) generatorGetter;

	/// Difference between capacity and length of the load task queue,
	/// clamped to zero.
	size_t loadQueueSpaceAvaliable() @property const {
		ptrdiff_t space = cast(ptrdiff_t)loadTaskQueue.capacity - loadTaskQueue.length;
		return space >= 0 ? space : 0;
	}

	/// Signals the workAvaliable condition while holding its mutex.
	void notify()
	{
		synchronized (workAvaliableMutex)
		{
			workAvaliable.notify();
		}
	}

	/// Allocates the queues and the canceled task set, then spawns the
	/// generator workers and the storage worker.
	/// Params:
	///   worldDb = database passed (cast to immutable) to the storage worker
	///   numGenWorkers = requested number of generator workers, clamped to 0..16
	///   blocks = block info table passed to every generator worker
	void init(WorldDb worldDb, uint numGenWorkers, BlockInfoTable blocks)
	{
		canceledTasks = cast(shared) new SharedHashSet!TaskId;

		import std.algorithm.comparison : clamp;
		numGenWorkers = clamp(numGenWorkers, 0, 16);
		genWorkers.length = numGenWorkers;
		foreach(i; 0..numGenWorkers)
		{
			genWorkers[i].alloc(0, "GEN_W", QUEUE_LENGTH);
			genWorkers[i].thread = cast(shared)spawnWorker(&chunkGenWorkerThread, &genWorkers[i], canceledTasks, blocks);
		}

		workAvaliableMutex = new Mutex;
		workAvaliable = new Condition(workAvaliableMutex);
		loadResQueue.alloc("loadResQ", QUEUE_LENGTH);
		saveResQueue.alloc("saveResQ", QUEUE_LENGTH);
		loadTaskQueue.alloc("loadTaskQ", QUEUE_LENGTH);
		saveTaskQueue.alloc("saveTaskQ", QUEUE_LENGTH);
		storeWorker = spawnWorker(
			&storageWorker, cast(immutable)worldDb,
			&workerRunning,
			cast(shared)workAvaliableMutex, cast(shared)workAvaliable,
			&loadResQueue, &saveResQueue, &loadTaskQueue, &saveTaskQueue,
			canceledTasks,
			genWorkers);
	}

	/// Processes results until all four queues are empty, then tells the
	/// workers to stop, waits until they have stopped and frees the queues.
	void stop() {
		bool queuesEmpty() {
			return loadResQueue.empty && saveResQueue.empty && loadTaskQueue.empty && saveTaskQueue.empty;
		}
		bool allWorkersStopped() {
			bool stopped = !storeWorker.isRunning;
			foreach(ref w; genWorkers) stopped = stopped && w.isStopped;
			return stopped;
		}

		while (!queuesEmpty()) {
			// Signal the condition again while tasks are still pending.
			if(!saveTaskQueue.empty || !loadTaskQueue.empty)
			{
				notify();
			}
			update();
		}

		atomicStore!(MemoryOrder.rel)(workerRunning, false);
		// Signal once more after clearing the running flag.
		notify();
		foreach(ref w; genWorkers) w.stop();

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	/// Frees the four queues and every generator worker.
	private void free() {
		loadResQueue.free();
		saveResQueue.free();
		loadTaskQueue.free();
		saveTaskQueue.free();
		foreach(ref w; genWorkers) {
			w.free();
		}
	}

	// Value of totalReceived at the end of the previous update() call;
	// only used for DBG_OUT logging.
	size_t prevReceived = size_t.max;

	/// Drains the load result queue, the save result queue and the result
	/// queue of every generator worker, forwarding each item to its handler.
	void update() {
		while(loadResQueue.length > 0)
		{
			receiveChunk(&loadResQueue, false);
		}
		while(!saveResQueue.empty)
		{
			auto data = SavedChunkData.getFromQueue(&saveResQueue);
			onChunkSavedHandler(data.cwp, data.layers);
		}
		foreach(ref w; genWorkers)
		{
			while(!w.resultQueue.empty)
			{
				receiveChunk(&w.resultQueue, saveUnmodifiedChunks);
			}
		}

		// Log only when the counter changed since the previous update.
		version(DBG_OUT)
		{
			if (prevReceived != totalReceived)
				infof("ChunkProvider running %s", totalReceived);
		}
		prevReceived = totalReceived;
	}

	/// Issues a new load task for `cwp`: records its id as the latest task
	/// for that position, pushes (id, position, generator) to the load task
	/// queue and signals the condition.
	void loadChunk(ChunkWorldPos cwp)
	{
		IGenerator generator = generatorGetter(cwp.dimension);

		TaskId id = nextTaskId++;
		chunkTasks[cwp] = id;

		loadTaskQueue.startMessage();
		loadTaskQueue.pushMessagePart!TaskId(id);
		loadTaskQueue.pushMessagePart!ulong(cwp.asUlong);
		loadTaskQueue.pushMessagePart!IGenerator(generator);
		loadTaskQueue.endMessage();

		notify();
	}

	/// Marks the latest load task of `cwp` as canceled and forgets it.
	/// `cwp` must have a pending task: indexing an associative array with a
	/// missing key raises a RangeError.
	void cancelLoad(ChunkWorldPos cwp)
	{
		TaskId tid = chunkTasks[cwp];
		canceledTasks.put(tid);
		chunkTasks.remove(cwp);
	}

	/// Pops one task id and one LoadedChunkData from `queue`.
	/// The result is forwarded to onChunkLoadedHandler only when its metadata
	/// is TASK_OK_METADATA and its id is the latest one recorded for the
	/// position; otherwise its layer arrays are freed.
	/// Params:
	///   queue = result queue to read from
	///   needsSave = forwarded unchanged to onChunkLoadedHandler
	void receiveChunk(shared(SharedQueue)* queue, bool needsSave)
	{
		TaskId loadedTaskId = queue.popItem!TaskId();

		auto data = LoadedChunkData.getFromQueue(queue);

		bool isFinalResult = false;
		// data is not marked as canceled
		if (data.header.metadata == TASK_OK_METADATA)
		{
			// data is for latest task -> send to chunk manager
			if (auto latestTaskId = data.cwp in chunkTasks)
			{
				if (loadedTaskId == *latestTaskId)
				{
					isFinalResult = true;
				}
			}
		}

		if (isFinalResult)
		{
			//assert(!canceledTasks[loadedTaskId]);
			onChunkLoadedHandler(data.cwp, data.layers, needsSave);
			chunkTasks.remove(data.cwp);
		}
		else
		{
			//assert(canceledTasks[loadedTaskId]);
			// update metrics
			if (data.header.metadata == TASK_OK_METADATA)
				++numWastedLoads;
			else
				++numSuccessfulCancelations;

			// data is for canceled request -> free arrays
			foreach(ref layer; data.layers)
				freeLayerArray(layer);
			canceledTasks.remove(loadedTaskId);
		}

		++totalReceived;
	}

	/// Sends a delegate to the IO thread through the save task queue.
	void pushSaveHandler(IoHandler ioHandler)
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.saveHandler);
		saveTaskQueue.pushMessagePart(ioHandler);
		saveTaskQueue.endMessage();
		notify();
	}

	/// Begins a chunk save message in the save task queue.
	/// Returns: the position returned by skipMessageItem for the header,
	/// to be passed to endChunkSave.
	size_t startChunkSave()
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.chunk);
		// NOTE(review): presumably reserves room for the header, which is
		// written later by endChunkSave -- confirm against SharedQueue.
		size_t headerPos = saveTaskQueue.skipMessageItem!ChunkHeaderItem();
		return headerPos;
	}

	/// Appends one layer to the chunk save message started by startChunkSave.
	void pushLayer(ChunkLayerItem layer)
	{
		saveTaskQueue.pushMessagePart(layer);
	}

	/// Writes `header` at `headerPos`, ends the chunk save message and
	/// signals the condition.
	void endChunkSave(size_t headerPos, ChunkHeaderItem header)
	{
		saveTaskQueue.setItem(header, headerPos);
		saveTaskQueue.endMessage();
		notify();
	}
}