/**
Copyright: Copyright (c) 2015-2018 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.chunk.chunkprovider;

import voxelman.log;
import core.sync.condition;
import core.atomic;
import core.thread : Thread;
import std.string : format;

import voxelman.math;
import voxelman.container.sharedhashset;

import voxelman.world.block : BlockInfoTable;
import voxelman.core.config;
import voxelman.thread.worker;
import voxelman.world.gen.generator : IGenerator;
import voxelman.world.gen.utils;
import voxelman.world.gen.worker;
import voxelman.world.storage;
import voxelman.world.worlddb : WorldDb;


/// Header and layers of one loaded chunk, as popped from a result queue.
/// ChunkProvider.receiveChunk reads it and forwards the position and layers
/// to onChunkLoadedHandler.
struct LoadedChunkData
{
	private ChunkHeaderItem header;
	// Fixed-size storage; only the first header.numLayers entries are filled.
	private ChunkLayerItem[MAX_CHUNK_LAYERS] _layers;

	/// Position of the chunk, taken from the header.
	ChunkWorldPos cwp() { return header.cwp; }

	/// Slice over the filled part of the layer storage.
	/// The slice points into this struct, so it is only valid while the struct is alive.
	ChunkLayerItem[] layers() { return _layers[0..header.numLayers]; }

	/// Pops one ChunkHeaderItem followed by header.numLayers ChunkLayerItem values
	/// from `queue` and returns them as a LoadedChunkData.
	/// Queue length and layer count are checked with asserts only.
	static LoadedChunkData getFromQueue(shared SharedQueue* queue) {
		LoadedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		// Check the layer count before using it: it sizes both the length check
		// below and the writes into the fixed-size _layers array.
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS,
			format("%s <= %s", data.header.numLayers, MAX_CHUNK_LAYERS));
		assert(queue.length >= ChunkLayerItem.sizeof * data.header.numLayers);

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerItem;
		}
		return data;
	}
}

/// Header and layer timestamps of one saved chunk, as popped from the save result queue.
/// ChunkProvider.update passes them to onSnapshotSaved.
struct SavedChunkData
{
	private ChunkHeaderItem header;
	// Fixed-size storage; only the first header.numLayers entries are filled.
	private ChunkLayerTimestampItem[MAX_CHUNK_LAYERS] _layers;

	/// Position of the chunk, taken from the header.
	ChunkWorldPos cwp() { return header.cwp; }

	/// Slice over the filled part of the timestamp storage.
	/// The slice points into this struct, so it is only valid while the struct is alive.
	ChunkLayerTimestampItem[] layers() { return _layers[0..header.numLayers]; }

	/// Pops one ChunkHeaderItem followed by header.numLayers ChunkLayerTimestampItem
	/// values from `queue` and returns them as a SavedChunkData.
	/// Queue length and layer count are checked with asserts only.
	static SavedChunkData getFromQueue(shared SharedQueue* queue) {
		SavedChunkData data;
		assert(queue.length >= ChunkHeaderItem.sizeof);
		data.header = queue.popItem!ChunkHeaderItem;

		// Check the layer count before using it: it sizes both the length check
		// below and the writes into the fixed-size _layers array.
		assert(data.header.numLayers <= MAX_CHUNK_LAYERS);
		assert(queue.length >= ChunkLayerTimestampItem.sizeof * data.header.numLayers);

		foreach(i; 0..data.header.numLayers)
		{
			data._layers[i] = queue.popItem!ChunkLayerTimestampItem;
		}
		return data;
	}
}

/// Delegate that receives the world database; pushed onto the save task queue
/// by ChunkProvider.pushSaveHandler.
alias IoHandler = void delegate(WorldDb);
/// Identifier assigned to each load request by ChunkProvider.loadChunk.
alias TaskId = uint;

/// Tag written as the first part of every message in the save task queue.
enum SaveItemType : ubyte {
	chunk,
	saveHandler
}

/// Header metadata values that receiveChunk uses to tell a normal result from a canceled one.
enum TASK_OK_METADATA = 0;
enum TASK_CANCELED_METADATA = 1;

//version = DBG_OUT;
/// Owns the load/save queues and the worker threads, and moves chunk data
/// between them and the ChunkManager.
///
/// Typical use as visible in this module: construct, set the handler delegates,
/// call init(), then call update() repeatedly; call stop() to shut down.
final class ChunkProvider
{
	private Thread storeWorker;
	// Set to false in stop(); a pointer to it is handed to the storage worker in init().
	private shared bool workerRunning = true;
	// NOTE(review): not read or written anywhere in this module.
	private shared bool workerStopped = false;

	// metrics
	size_t totalReceived;              // every result processed by receiveChunk
	size_t numWastedLoads;             // results with OK metadata that were no longer wanted
	size_t numSuccessfulCancelations;  // results that came back marked as canceled

	private TaskId nextTaskId;

	Mutex workAvaliableMutex;
	Condition workAvaliable;
	shared SharedQueue loadResQueue;
	shared SharedQueue saveResQueue;
	shared SharedQueue loadTaskQueue;
	shared SharedQueue saveTaskQueue;

	// Ids of tasks canceled through cancelLoad; shared with the workers in init().
	shared SharedHashSet!TaskId canceledTasks;
	// Latest outstanding load task per chunk position.
	TaskId[ChunkWorldPos] chunkTasks;
	// Passed as `needsSave` for results coming from the generator workers (see update()).
	bool saveUnmodifiedChunks;

	shared Worker[] genWorkers;

	ChunkManager chunkManager;
	void delegate(ChunkWorldPos cwp, ChunkLayerItem[] layers, bool needsSave) onChunkLoadedHandler;
	void delegate(ChunkWorldPos cwp, ChunkLayerTimestampItem[] timestamps) onChunkSavedHandler;
	IGenerator delegate(DimensionId dimensionId) generatorGetter;

	/// Difference between the load task queue's capacity and its current length,
	/// clamped to zero.
	size_t loadQueueSpaceAvaliable() @property const {
		ptrdiff_t space = cast(ptrdiff_t)loadTaskQueue.capacity - loadTaskQueue.length;
		return space >= 0 ? space : 0;
	}

	/// Signals the workAvaliable condition while holding its mutex.
	void notify()
	{
		synchronized (workAvaliableMutex)
		{
			workAvaliable.notify();
		}
	}

	/// Stores the chunk manager; no threads or queues are created until init().
	this(ChunkManager chunkManager) {
		this.chunkManager = chunkManager;
	}

	/// Allocates the queues and spawns the generator workers and the storage worker.
	/// Params:
	///   worldDb = database handed to the storage worker
	///   numGenWorkers = requested number of generator workers, clamped to 0..16
	///   blocks = block info table handed to every generator worker
	///   saveUnmodifiedChunks = stored and later used as `needsSave` for generated chunks
	void init(WorldDb worldDb, uint numGenWorkers, BlockInfoTable blocks, bool saveUnmodifiedChunks)
	{
		canceledTasks = cast(shared) new SharedHashSet!TaskId;
		this.saveUnmodifiedChunks = saveUnmodifiedChunks;

		import std.algorithm.comparison : clamp;
		numGenWorkers = clamp(numGenWorkers, 0, 16);
		genWorkers.length = numGenWorkers;
		foreach(i; 0..numGenWorkers)
		{
			genWorkers[i].alloc(0, "GEN_W", QUEUE_LENGTH);
			genWorkers[i].thread = cast(shared)spawnWorker(&chunkGenWorkerThread, &genWorkers[i], canceledTasks, blocks);
		}

		workAvaliableMutex = new Mutex;
		workAvaliable = new Condition(workAvaliableMutex);
		loadResQueue.alloc("loadResQ", QUEUE_LENGTH);
		saveResQueue.alloc("saveResQ", QUEUE_LENGTH);
		loadTaskQueue.alloc("loadTaskQ", QUEUE_LENGTH);
		saveTaskQueue.alloc("saveTaskQ", QUEUE_LENGTH);
		storeWorker = spawnWorker(
			&storageWorker, cast(immutable)worldDb,
			&workerRunning,
			cast(shared)workAvaliableMutex, cast(shared)workAvaliable,
			&loadResQueue, &saveResQueue, &loadTaskQueue, &saveTaskQueue,
			canceledTasks,
			genWorkers);
	}

	/// Drains all four queues, asks every worker to stop, waits until they have
	/// stopped and then frees the queues and workers.
	void stop() {
		bool queuesEmpty() {
			return loadResQueue.empty && saveResQueue.empty && loadTaskQueue.empty && saveTaskQueue.empty;
		}
		bool allWorkersStopped() {
			bool stopped = !storeWorker.isRunning;
			foreach(ref w; genWorkers) stopped = stopped && w.isStopped;
			return stopped;
		}

		while (!queuesEmpty()) {
			// Tasks are still pending: signal the condition again so they get picked up.
			if(!saveTaskQueue.empty || !loadTaskQueue.empty)
			{
				notify();
			}
			update();
			// Give the workers CPU time instead of spinning; mirrors the wait loop below.
			Thread.yield();
		}

		atomicStore!(MemoryOrder.rel)(workerRunning, false);
		notify();
		foreach(ref w; genWorkers) w.stop();

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	/// Frees the four queues and every generator worker.
	private void free() {
		loadResQueue.free();
		saveResQueue.free();
		loadTaskQueue.free();
		saveTaskQueue.free();
		foreach(ref w; genWorkers) {
			w.free();
		}
	}

	// Value of totalReceived at the end of the previous update(); only used for debug output.
	size_t prevReceived = size_t.max;

	/// Processes every pending item in the load result queue, the save result
	/// queue and each generator worker's result queue.
	void update() {
		// Results from the load result queue are passed on with needsSave = false.
		while(loadResQueue.length > 0)
		{
			receiveChunk(&loadResQueue, false);
		}
		while(!saveResQueue.empty)
		{
			auto data = SavedChunkData.getFromQueue(&saveResQueue);
			onSnapshotSaved(data.cwp, data.layers);
		}
		// Results from generator workers use the configured saveUnmodifiedChunks flag.
		foreach(ref w; genWorkers)
		{
			while(!w.resultQueue.empty)
			{
				receiveChunk(&w.resultQueue, saveUnmodifiedChunks);
			}
		}

		// Debug output only; the assignment below always runs.
		version(DBG_OUT)
		{
			if (prevReceived != totalReceived)
				infof("ChunkProvider running %s", totalReceived);
		}
		prevReceived = totalReceived;
	}

	/// Queues a load request for `cwp` under a fresh task id and signals the condition.
	/// The id replaces any id previously tracked for the same position.
	void loadChunk(ChunkWorldPos cwp)
	{
		IGenerator generator = generatorGetter(cwp.dimension);

		TaskId id = nextTaskId++;
		chunkTasks[cwp] = id;

		loadTaskQueue.startMessage();
		loadTaskQueue.pushMessagePart!TaskId(id);
		loadTaskQueue.pushMessagePart!ulong(cwp.asUlong);
		loadTaskQueue.pushMessagePart!IGenerator(generator);
		loadTaskQueue.endMessage();

		notify();
	}

	/// Marks the load task tracked for `cwp` as canceled and stops tracking it.
	/// Does nothing when no task is tracked for `cwp`.
	void cancelLoad(ChunkWorldPos cwp)
	{
		// Indexing the associative array directly would throw RangeError when the
		// position is not tracked, so look the entry up first.
		auto trackedId = cwp in chunkTasks;
		if (trackedId is null) return;

		canceledTasks.put(*trackedId);
		chunkTasks.remove(cwp);
	}

	/// Pops one task id and one LoadedChunkData from `queue`.
	/// The data is passed to onChunkLoadedHandler only when its metadata is
	/// TASK_OK_METADATA and its task id is the one currently tracked for the
	/// chunk; otherwise its layer arrays are freed and the metrics are updated.
	void receiveChunk(shared(SharedQueue)* queue, bool needsSave)
	{
		TaskId loadedTaskId = queue.popItem!TaskId();

		auto data = LoadedChunkData.getFromQueue(queue);

		bool isFinalResult = false;
		// data is not marked as canceled
		if (data.header.metadata == TASK_OK_METADATA)
		{
			// data is for latest task -> send to chunk manager
			if (auto latestTaskId = data.cwp in chunkTasks)
			{
				if (loadedTaskId == *latestTaskId)
				{
					isFinalResult = true;
				}
			}
		}

		if (isFinalResult)
		{
			//assert(!canceledTasks[loadedTaskId]);
			onChunkLoadedHandler(data.cwp, data.layers, needsSave);
			chunkTasks.remove(data.cwp);
		}
		else
		{
			//assert(canceledTasks[loadedTaskId]);
			// update metrics
			if (data.header.metadata == TASK_OK_METADATA)
				++numWastedLoads;
			else
				++numSuccessfulCancelations;

			// data is for canceled request -> free arrays
			foreach(ref layer; data.layers)
				freeLayerArray(layer);
			canceledTasks.remove(loadedTaskId);
		}

		++totalReceived;
	}

	/// Sends a delegate to the IO thread: pushes it onto the save task queue
	/// tagged as SaveItemType.saveHandler and signals the condition.
	void pushSaveHandler(IoHandler ioHandler)
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.saveHandler);
		saveTaskQueue.pushMessagePart(ioHandler);
		saveTaskQueue.endMessage();
		notify();
	}

	/// Performs save of all modified chunks.
	/// Modified chunks are those that were committed.
	/// Perform save right after commit.
	void save() {
		foreach(cwp; chunkManager.getModifiedChunks()) {
			saveChunk(cwp);
		}
		chunkManager.clearModifiedChunks();
	}

	/// Queues one save message for `cwp`: a SaveItemType.chunk tag, a header and
	/// one item per layer yielded by the chunk manager; then signals the condition.
	void saveChunk(ChunkWorldPos cwp) {
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.chunk);
		// The layer count is not known yet: skip the header slot and fill it in below.
		size_t headerPos = saveTaskQueue.skipMessageItem!ChunkHeaderItem();

		uint numChunkLayers;
		// NOTE(review): presumably registers a snapshot user per layer, which
		// onSnapshotSaved later removes - confirm in ChunkManager.
		foreach(ChunkLayerItem layerItem; chunkManager.iterateChunkSnapshotsAddUsers(cwp)) {
			saveTaskQueue.pushMessagePart(layerItem);
			++numChunkLayers;
		}

		saveTaskQueue.setItem(ChunkHeaderItem(cwp, numChunkLayers), headerPos);
		saveTaskQueue.endMessage();

		notify();
	}

	/// Removes one snapshot user from the chunk manager for every saved layer timestamp.
	private void onSnapshotSaved(ChunkWorldPos cwp, ChunkLayerTimestampItem[] timestamps) {
		foreach(item; timestamps) {
			chunkManager.removeSnapshotUser(cwp, item.timestamp, item.layerId);
		}
	}
}