/**
Copyright: Copyright (c) 2014-2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.world.storage.storageworker;

import std.experimental.logger;
import std.conv : to;
import std.datetime : MonoTime, Duration, usecs, dur, seconds;
import core.atomic;
import core.sync.condition;

import cbor;

import voxelman.block.utils;
import voxelman.world.gen.generators;
import voxelman.world.gen.utils;
import voxelman.core.config;
import voxelman.utils.compression;
import voxelman.utils.worker;
import voxelman.world.storage.chunk;
import voxelman.world.storage.chunkprovider;
import voxelman.world.storage.coordinates;
import voxelman.world.worlddb;


/// Wall-clock stopwatch for a named task.
/// Measurers can be linked through `nested` and `next`; `reset` follows both links.
struct TimeMeasurer
{
	/// Measurer for a sub-task of this task (may be null).
	TimeMeasurer* nested;
	/// Following measurer in the chain (may be null).
	TimeMeasurer* next;
	/// Time stamp taken by the last `startTaskTiming` call.
	MonoTime startTime;
	/// Duration computed by the last `endTaskTiming` call.
	Duration takenTime;
	/// Name passed to the last `startTaskTiming` call.
	string taskName;
	/// Set by `endTaskTiming`, cleared by `reset`.
	bool wasRun = false;

	/// Clears `wasRun` and `takenTime` of this measurer and of every measurer
	/// reachable through `nested` and `next`.
	void reset()
	{
		wasRun = false;
		takenTime = Duration.zero;
		if (nested) nested.reset();
		if (next) next.reset();
	}

	/// Remembers the task name and the current monotonic time.
	void startTaskTiming(string name)
	{
		taskName = name;
		startTime = MonoTime.currTime;
	}

	/// Stores the time elapsed since `startTaskTiming` and marks the measurer as run.
	void endTaskTiming()
	{
		wasRun = true;
		takenTime = MonoTime.currTime - startTime;
	}

	/// Currently a no-op: the whole printing body is commented out.
	void printTime(bool isNested = false)
	{
		//int seconds; short msecs; short usecs;
		//takenTime.split!("seconds", "msecs", "usecs")(seconds, msecs, usecs);
		//if (msecs > 10 || seconds > 0 || isNested)
		//{
		//	if (wasRun)
		//		tracef("%s%s %s.%s,%ss", isNested?"  ":"", taskName, seconds, msecs, usecs);
		//	if (nested) nested.printTime(true);
		//	if (next) next.printTime(isNested);
		//}
	}
}

/// Distributes chunk generation tasks between generation workers.
struct GenWorkerControl
{
	/// Stores the worker slice and sizes `queueLengths` to match it.
	this(shared Worker[] genWorkers_)
	{
		genWorkers = genWorkers_;
		queueLengths.length = genWorkers.length;
	}

	shared Worker[] genWorkers;

	// last worker, we sent work to.
	size_t lastWorker;
	// last work item.
	ChunkWorldPos lastCwp;
	/// Pair of worker index and the task queue length sampled for it.
	static struct QLen {size_t i; size_t len;}
	/// Scratch array reused by `getWorker`; one entry per worker.
	QLen[] queueLengths;

	// returns worker with smallest queue.
	// NOTE(review): indexes queueLengths[0], so at least one worker is required.
	size_t getWorker()
	{
		import std.algorithm : sort;
		foreach(i; 0..genWorkers.length)
		{
			queueLengths[i].i = i;
			queueLengths[i].len = genWorkers[i].taskQueue.length;
		}
		sort!((a,b) => a.len < b.len)(queueLengths);// balance worker queues
		return queueLengths[0].i;
	}

	// Sends a generation task for chunk `cwp` to one of the workers.
	// A chunk with the same x and z as the previously sent chunk goes
	// to the same worker as that previous chunk.
	// There is thread local heightmap cache.
	void sendGenTask(ulong cwp)
	{
		auto _cwp = ChunkWorldPos(cwp);
		size_t workerIndex;
		// send task from the same chunk column
		// to the same worker to improve cache hit rate
		if (_cwp.x == lastCwp.x && _cwp.z == lastCwp.z)
		{
			workerIndex = lastWorker;
		}
		else
		{
			workerIndex = getWorker();
		}
		shared(Worker)* worker = &genWorkers[workerIndex];
		// Task layout: chunk position followed by the generator selected by cwp.w.
		worker.taskQueue.pushItem!ulong(cwp);
		worker.taskQueue.pushItem(generators[_cwp.w % $]);
		worker.notify();
		lastWorker = workerIndex;
		lastCwp = _cwp;
	}
}

//version = DBG_OUT;
//version = DBG_COMPR;

/// Thread function of the storage worker.
///
/// Each iteration waits on `workAvaliable`, then drains `loadTaskQueue`
/// (inside a transaction that is aborted afterwards) and `saveTaskQueue`
/// (inside a transaction that is committed afterwards).
/// The loop runs while the flag behind `workerRunning` is true.
/// `_worldDb` is closed when the function exits.
///
/// Params:
///   _worldDb = database handle; the `immutable` qualifier is cast away for use here.
///   workerRunning = flag that keeps the main loop alive; read atomically.
///   workAvaliableMutex = mutex held while waiting on `workAvaliable`.
///   workAvaliable = condition waited on at the start of every iteration.
///   loadResQueue = receives one message per loaded chunk:
///     `ChunkHeaderItem` followed by a `ChunkLayerItem` per layer.
///   saveResQueue = receives one message per saved chunk:
///     `ChunkHeaderItem` followed by a `ChunkLayerTimestampItem` per layer.
///   loadTaskQueue = source of `ulong` chunk positions to load.
///   saveTaskQueue = source of save tasks. Each starts with a `SaveItemType`,
///     followed either by a `ChunkHeaderItem` plus its `ChunkLayerItem`s, or by an `IoHandler`.
///   genWorkers = workers that receive a generation task when a chunk
///     is absent from the database or fails to decode.
void storageWorker(
			immutable WorldDb _worldDb,
			shared bool* workerRunning,
			shared Mutex workAvaliableMutex,
			shared Condition workAvaliable,
			shared SharedQueue* loadResQueue,
			shared SharedQueue* saveResQueue,
			shared SharedQueue* loadTaskQueue,
			shared SharedQueue* saveTaskQueue,
			shared Worker[] genWorkers,
			)
{
	version(DBG_OUT)infof("Storage worker started");
	infof("genWorkers.length %s", genWorkers.length);

	// Declared at function scope so that the final DBG_OUT log,
	// which sits after the try/catch, can still see it.
	uint numReceived;

	try
	{
		ubyte[] compressBuffer = new ubyte[](4096*16);
		ubyte[] buffer = new ubyte[](4096*16);
		WorldDb worldDb = cast(WorldDb)_worldDb;
		scope(exit) worldDb.close();

		TimeMeasurer taskTime;
		TimeMeasurer workTime;
		TimeMeasurer readTime;
		taskTime.nested = &readTime;
		readTime.next = &workTime;

		auto workerControl = GenWorkerControl(genWorkers);

		// Pops one chunk (header + layers) from saveTaskQueue, encodes it as CBOR
		// into `buffer`, stores it in the database and reports layer timestamps
		// through saveResQueue.
		void writeChunk()
		{
			taskTime.reset();
			taskTime.startTaskTiming("WR");

			ChunkHeaderItem header = saveTaskQueue.popItem!ChunkHeaderItem();

			saveResQueue.startMessage();
			saveResQueue.pushMessagePart(header);
			// NOTE(review): if encoding throws in the middle of the layer loop, the
			// remaining ChunkLayerItems of this chunk stay in saveTaskQueue - confirm
			// how the queue is expected to recover.
			try
			{
				size_t encodedSize = encodeCbor(buffer[], header.numLayers);

				foreach(_; 0..header.numLayers)
				{
					ChunkLayerItem layer = saveTaskQueue.popItem!ChunkLayerItem();

					encodedSize += encodeCbor(buffer[encodedSize..$], layer.timestamp);
					encodedSize += encodeCbor(buffer[encodedSize..$], layer.layerId);
					encodedSize += encodeCbor(buffer[encodedSize..$], layer.metadata);
					if (layer.type == StorageType.uniform)
					{
						encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.uniform);
						encodedSize += encodeCbor(buffer[encodedSize..$], layer.uniformData);
						encodedSize += encodeCbor(buffer[encodedSize..$], layer.dataLength);
					}
					else if (layer.type == StorageType.fullArray)
					{
						// Full arrays are compressed first and stored as compressedArray.
						encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray);
						ubyte[] compactBlocks = compressLayerData(layer.getArray!ubyte, compressBuffer);
						encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks);
						version(DBG_COMPR)infof("Store1 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks);
					}
					else if (layer.type == StorageType.compressedArray)
					{
						// Already compressed: stored as is.
						encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray);
						ubyte[] compactBlocks = layer.getArray!ubyte;
						encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks);
						version(DBG_COMPR)infof("Store2 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks);
					}

					saveResQueue.pushMessagePart(ChunkLayerTimestampItem(layer.timestamp, layer.layerId));
				}

				worldDb.putPerChunkValue(header.cwp.asUlong, buffer[0..encodedSize]);
			}
			catch(Exception e) errorf("storage exception %s", e.to!string);
			saveResQueue.endMessage();
			taskTime.endTaskTiming();
			taskTime.printTime();
			version(DBG_OUT)infof("task save %s", header.cwp);
		}

		// Pops one chunk position from loadTaskQueue and tries to load the chunk
		// from the database. On success the decoded layers are sent through
		// loadResQueue. If the chunk is absent or decoding throws, a generation
		// task is sent instead.
		void readChunk()
		{
			taskTime.reset();
			taskTime.startTaskTiming("RD");
			bool doGen;

			ulong cwp = loadTaskQueue.popItem!ulong();

			try
			{
				readTime.startTaskTiming("getPerChunkValue");
				ubyte[] cborData = worldDb.getPerChunkValue(cwp);
				readTime.endTaskTiming();
				//scope(exit) worldDb.perChunkSelectStmt.reset();

				if (cborData !is null)
				{
					workTime.startTaskTiming("decode");
					ubyte numLayers = decodeCborSingle!ubyte(cborData);
					// TODO check numLayers <= ubyte.max
					// NOTE(review): if decoding throws after this point, the message
					// started here is never ended before the gen task is sent - confirm
					// how SharedQueue handles an unfinished message.
					loadResQueue.startMessage();
					loadResQueue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), cast(ubyte)numLayers, 0));
					foreach(_; 0..numLayers)
					{
						auto timestamp = decodeCborSingle!TimestampType(cborData);
						auto layerId = decodeCborSingle!ubyte(cborData);
						auto metadata = decodeCborSingle!ushort(cborData);
						auto type = decodeCborSingle!StorageType(cborData);

						if (type == StorageType.uniform)
						{
							ulong uniformData = decodeCborSingle!ulong(cborData);
							auto dataLength = decodeCborSingle!LayerDataLenType(cborData);
							loadResQueue.pushMessagePart(ChunkLayerItem(StorageType.uniform, layerId, dataLength, timestamp, uniformData, metadata));
						}
						else
						{
							import core.memory : GC;
							assert(type == StorageType.compressedArray);
							ubyte[] compactBlocks = decodeCborSingle!(ubyte[])(cborData);
							// Copy the data so the layer does not reference the decode source.
							compactBlocks = compactBlocks.dup;
							LayerDataLenType dataLength = cast(LayerDataLenType)compactBlocks.length;
							ubyte* data = cast(ubyte*)compactBlocks.ptr;

							// Add root to data.
							// Data can be collected by GC if no-one is referencing it.
							// It is needed to pass array through shared queue.
							GC.addRoot(data); // TODO remove when moved to non-GC allocator
							version(DBG_COMPR)infof("Load %s L %s C (%(%02x%))", ChunkWorldPos(cwp), compactBlocks.length, cast(ubyte[])compactBlocks);
							loadResQueue.pushMessagePart(ChunkLayerItem(StorageType.compressedArray, layerId, dataLength, timestamp, data, metadata));
						}
					}
					loadResQueue.endMessage();
					// if (cborData.length > 0) error; TODO
					workTime.endTaskTiming();
				}
				else doGen = true;
			}
			catch(Exception e) {
				infof("storage exception %s regenerating %s", e.to!string, ChunkWorldPos(cwp));
				doGen = true;
			}
			if (doGen) {
				workerControl.sendGenTask(cwp);
			}
			taskTime.endTaskTiming();
			taskTime.printTime();
			version(DBG_OUT)infof("task load %s", ChunkWorldPos(cwp));
		}

		MonoTime frameStart = MonoTime.currTime;
		size_t prevReceived = size_t.max;
		// Atomically load the flag itself. Loading the pointer atomically and then
		// dereferencing it would read the shared bool with a plain load.
		while (atomicLoad!(MemoryOrder.acq)(*workerRunning))
		{
			// NOTE(review): the wait has no predicate, so a notification sent while
			// this thread is busy below is only picked up on the next notify -
			// confirm that the producer notifies periodically.
			synchronized (workAvaliableMutex)
			{
				(cast(Condition)workAvaliable).wait();
			}

			// Loads only read from the database, so the transaction is aborted.
			worldDb.beginTxn();
			while (!loadTaskQueue.empty)
			{
				readChunk();
				++numReceived;
			}
			worldDb.abortTxn();

			// Saves are grouped in one transaction and committed together.
			worldDb.beginTxn();
			while (!saveTaskQueue.empty)
			{
				auto type = saveTaskQueue.popItem!SaveItemType();
				final switch(type) {
					case SaveItemType.chunk:
						writeChunk();
						++numReceived;
						break;
					case SaveItemType.saveHandler:
						IoHandler ioHandler = saveTaskQueue.popItem!IoHandler();
						ioHandler(worldDb);
						break;
				}
			}
			worldDb.commitTxn();

			version(DBG_OUT)
			{
				if (prevReceived != numReceived)
					infof("Storage worker running %s %s", numReceived, atomicLoad(*workerRunning));
			}
			prevReceived = numReceived;

			auto now = MonoTime.currTime;
			auto elapsed = now - frameStart;
			if (elapsed > 3.seconds) {
				//infof("Storage update");
				frameStart = now;
			}
		}
	}
	catch(Throwable t)
	{
		// Log before rethrowing so that the failure is visible in the log.
		infof("%s from storage worker", t.to!string);
		throw t;
	}
	version(DBG_OUT)infof("Storage worker stopped (%s, %s)", numReceived, atomicLoad(*workerRunning));
}