/**
Copyright: Copyright (c) 2015-2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.chunkprovider;

import std.experimental.logger;
import core.sync.condition;
import core.atomic;
import core.thread : Thread;

import voxelman.math;

import voxelman.block.utils : BlockInfoTable;
import voxelman.core.config;
import voxelman.utils.worker;
import voxelman.world.gen.utils;
import voxelman.world.gen.worker;
import voxelman.world.storage.chunk;
import voxelman.world.storage.coordinates;
import voxelman.world.storage.storageworker;
import voxelman.world.worlddb : WorldDb;

/// Passed as the `needsSave` argument of the chunk-loaded handler for
/// chunks that arrive from the generator workers' result queues.
enum saveUnmodifiedChunks = false;

/// Used to pass data to chunkmanager's onSnapshotLoaded.
/// Thin reader over a result queue: the receiver first pops the header,
/// then pops the layers one by one.
struct LoadedChunkData
{
	private shared(SharedQueue)* queue;

	/// Pops the next ChunkHeaderItem from the queue and returns it.
	/// The asserts check the queue length before the pop and again after it,
	/// against the number of layers announced by the header.
	// NOTE(review): these asserts compare queue.length against byte sizes,
	// while SavedChunkData uses `2` and `sizeof/8` -- confirm which unit
	// SharedQueue.length actually reports.
	ChunkHeaderItem getHeader()
	{
		assert(queue.length >= ChunkHeaderItem.sizeof);

		ChunkHeaderItem result;
		queue.popItem(result);

		assert(queue.length >= ChunkLayerItem.sizeof * result.numLayers);
		return result;
	}

	/// Pops the next ChunkLayerItem from the queue and returns it.
	/// For every non-uniform layer the GC root of its data pointer is removed.
	ChunkLayerItem getLayer()
	{
		import core.memory : GC;

		ChunkLayerItem item;
		queue.popItem(item);

		// Uniform layers need no GC bookkeeping.
		if (item.type == StorageType.uniform)
			return item;

		// Remove root, added on chunk load and gen.
		// Data can be collected by GC if no-one is referencing it.
		GC.removeRoot(item.dataPtr); // TODO remove when moved to non-GC allocator
		return item;
	}
}

/// Used to pass data about a saved chunk to the save-result handler
/// (see ChunkProvider.onChunkSavedHandler).
struct SavedChunkData
{
	private shared(SharedQueue)* queue;

	// Pops the next ChunkHeaderItem from the queue and returns it.
	// The asserts check the queue length before and after the pop.
	// NOTE(review): the second assert is sized by ChunkLayerItem although
	// getLayerTimestamp pops ChunkLayerTimestampItem -- confirm intended.
	ChunkHeaderItem getHeader()
	{
		assert(queue.length >= 2);

		ChunkHeaderItem result;
		queue.popItem(result);

		assert(queue.length >= ChunkLayerItem.sizeof/8 * result.numLayers);
		return result;
	}

	// Pops the next ChunkLayerTimestampItem from the queue and returns it.
	ChunkLayerTimestampItem getLayerTimestamp()
	{
		ChunkLayerTimestampItem item;
		queue.popItem(item);
		return item;
	}
}

/// Delegate that can be queued on the save task queue (see pushSaveHandler).
alias IoHandler = void delegate(WorldDb);

/// Tag pushed as the first part of every save task queue message.
enum SaveItemType : ubyte {
	chunk,
	saveHandler
}

//version = DBG_OUT;
/// Owns the load/save task and result queues, the storage worker thread and
/// the generator workers, and forwards received results to the handlers.
struct ChunkProvider
{
	private Thread storeWorker;
	private shared bool workerRunning = true;
	private shared bool workerStopped = false; // not referenced inside this struct

	/// Total number of results handed to the handlers by update().
	size_t numReceived;

	Mutex workAvaliableMutex;
	Condition workAvaliable;
	shared SharedQueue loadResQueue;
	shared SharedQueue saveResQueue;
	shared SharedQueue loadTaskQueue;
	shared SharedQueue saveTaskQueue;

	shared Worker[] genWorkers;

	void delegate(LoadedChunkData loadedChunk, bool needsSave) onChunkLoadedHandler;
	void delegate(SavedChunkData savedChunk) onChunkSavedHandler;

	/// Capacity of the load task queue minus its current length,
	/// clamped so that it never goes below zero.
	size_t loadQueueSpaceAvaliable() @property const {
		ptrdiff_t freeSpace = cast(ptrdiff_t)loadTaskQueue.capacity - loadTaskQueue.length;
		if (freeSpace < 0)
			return 0;
		return freeSpace;
	}

	/// Signals the workAvaliable condition while holding its mutex.
	void notify()
	{
		synchronized (workAvaliableMutex)
			workAvaliable.notify();
	}

	/// Allocates and spawns the generator workers (count clamped to 1..16),
	/// creates the mutex/condition pair, allocates the four queues and
	/// spawns the storage worker thread.
	void init(WorldDb worldDb, uint numGenWorkers, BlockInfoTable blocks)
	{
		import std.algorithm.comparison : clamp;

		numGenWorkers = clamp(numGenWorkers, 1, 16);
		genWorkers.length = numGenWorkers;
		foreach(ref genWorker; genWorkers)
		{
			genWorker.alloc("GEN_W", QUEUE_LENGTH);
			genWorker.thread = cast(shared)spawnWorker(
				&chunkGenWorkerThread, &genWorker, blocks);
		}

		workAvaliableMutex = new Mutex;
		workAvaliable = new Condition(workAvaliableMutex);

		loadResQueue.alloc("loadResQ", QUEUE_LENGTH);
		saveResQueue.alloc("saveResQ", QUEUE_LENGTH);
		loadTaskQueue.alloc("loadTaskQ", QUEUE_LENGTH);
		saveTaskQueue.alloc("saveTaskQ", QUEUE_LENGTH);

		storeWorker = spawnWorker(
			&storageWorker,
			cast(immutable)worldDb,
			&workerRunning,
			cast(shared)workAvaliableMutex,
			cast(shared)workAvaliable,
			&loadResQueue,
			&saveResQueue,
			&loadTaskQueue,
			&saveTaskQueue,
			genWorkers);
	}

	/// Keeps calling update() until all four queues are empty, then clears
	/// the running flag, wakes the storage worker, stops the generator
	/// workers, waits until every worker has stopped and frees the queues.
	void stop() {
		// True while any of the four queues still holds something.
		bool hasPendingItems() {
			return !loadResQueue.empty
				|| !saveResQueue.empty
				|| !loadTaskQueue.empty
				|| !saveTaskQueue.empty;
		}

		// True once the storage thread and all generator workers are done.
		bool everyWorkerHalted() {
			if (storeWorker.isRunning)
				return false;
			foreach(ref genWorker; genWorkers)
			{
				if (!genWorker.isStopped)
					return false;
			}
			return true;
		}

		while (hasPendingItems())
			update();

		atomicStore!(MemoryOrder.rel)(workerRunning, false);
		notify();
		foreach(ref genWorker; genWorkers)
			genWorker.stop();

		while (!everyWorkerHalted())
			Thread.yield();

		free();
	}

	/// Frees the four queues and every generator worker.
	private void free() {
		loadResQueue.free();
		saveResQueue.free();
		loadTaskQueue.free();
		saveTaskQueue.free();
		foreach(ref genWorker; genWorkers)
			genWorker.free();
	}

	/// Value of numReceived at the end of the previous update() call.
	size_t prevReceived = size_t.max;

	/// Hands every pending result to its handler: first load results, then
	/// save results, then the results of each generator worker.
	void update() {
		//infof("%s %s %s %s", loadResQueue.length, saveResQueue.length,
		//	loadTaskQueue.length, saveTaskQueue.length);

		// Chunks coming from the load result queue are reported with needsSave = false.
		while(loadResQueue.length > 0)
		{
			onChunkLoadedHandler(LoadedChunkData(&loadResQueue), false);
			numReceived += 1;
		}

		while(!saveResQueue.empty)
		{
			//infof("Save res received");
			onChunkSavedHandler(SavedChunkData(&saveResQueue));
			numReceived += 1;
		}

		// Generated chunks are reported with needsSave = saveUnmodifiedChunks.
		foreach(ref genWorker; genWorkers)
		{
			while(!genWorker.resultQueue.empty)
			{
				//infof("Gen res received");
				onChunkLoadedHandler(LoadedChunkData(&genWorker.resultQueue), saveUnmodifiedChunks);
				numReceived += 1;
			}
		}

		// Progress is only logged when DBG_OUT is enabled.
		version(DBG_OUT)
		{
			if (prevReceived != numReceived)
				infof("ChunkProvider running %s", numReceived);
		}
		prevReceived = numReceived;
	}

	/// Pushes the chunk position onto the load task queue and wakes the worker.
	void loadChunk(ChunkWorldPos cwp)
	{
		loadTaskQueue.pushItem!ulong(cwp.asUlong);
		notify();
	}

	// sends a delegate to IO thread
	void pushSaveHandler(IoHandler ioHandler)
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.saveHandler);
		saveTaskQueue.pushMessagePart(ioHandler);
		saveTaskQueue.endMessage();
		notify();
	}

	/// Starts a chunk save message and reserves a slot for the header.
	/// Returns: the position to pass to endChunkSave as headerPos.
	size_t startChunkSave()
	{
		saveTaskQueue.startMessage();
		saveTaskQueue.pushMessagePart(SaveItemType.chunk);
		return saveTaskQueue.skipMessageItem!ChunkHeaderItem();
	}

	/// Appends one layer to the chunk save message in progress.
	void pushLayer(ChunkLayerItem layer)
	{
		saveTaskQueue.pushMessagePart(layer);
	}

	/// Writes the header at headerPos, ends the message and wakes the worker.
	void endChunkSave(size_t headerPos, ChunkHeaderItem header)
	{
		saveTaskQueue.setItem(header, headerPos);
		saveTaskQueue.endMessage();
		notify();
	}
}