1 /** 2 Copyright: Copyright (c) 2014-2017 Andrey Penechko. 3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0). 4 Authors: Andrey Penechko. 5 */ 6 module voxelman.world.storage.storageworker; 7 8 import voxelman.log; 9 import voxelman.container.sharedhashset; 10 import std.conv : to; 11 import std.datetime : MonoTime, Duration, usecs, dur, seconds; 12 import core.atomic; 13 import core.sync.condition; 14 15 import cbor; 16 17 import voxelman.world.block; 18 import voxelman.core.config; 19 import voxelman.utils.compression; 20 import voxelman.utils.worker; 21 import voxelman.world.gen.generator : IGenerator; 22 import voxelman.world.gen.utils; 23 import voxelman.world.storage.chunk; 24 import voxelman.world.storage.chunkprovider; 25 import voxelman.world.storage.coordinates; 26 import voxelman.world.worlddb; 27 28 29 struct TimeMeasurer 30 { 31 TimeMeasurer* nested; 32 TimeMeasurer* next; 33 MonoTime startTime; 34 Duration takenTime; 35 string taskName; 36 bool wasRun = false; 37 38 void reset() 39 { 40 wasRun = false; 41 takenTime = Duration.zero; 42 if (nested) nested.reset(); 43 if (next) next.reset(); 44 } 45 46 void startTaskTiming(string name) 47 { 48 taskName = name; 49 startTime = MonoTime.currTime; 50 } 51 52 void endTaskTiming() 53 { 54 wasRun = true; 55 takenTime = MonoTime.currTime - startTime; 56 } 57 58 void printTime(bool isNested = false) 59 { 60 //int seconds; short msecs; short usecs; 61 //takenTime.split!("seconds", "msecs", "usecs")(seconds, msecs, usecs); 62 //if (msecs > 10 || seconds > 0 || isNested) 63 //{ 64 // if (wasRun) 65 // tracef("%s%s %s.%s,%ss", isNested?" ":"", taskName, seconds, msecs, usecs); 66 // if (nested) nested.printTime(true); 67 // if (next) next.printTime(isNested); 68 //} 69 } 70 } 71 72 struct GenWorkerControl 73 { 74 this(shared Worker[] genWorkers_) 75 { 76 genWorkers = genWorkers_; 77 queueLengths.length = genWorkers.length; 78 } 79 80 shared Worker[] genWorkers; 81 82 // last worker, we sent work to. 83 size_t lastWorker; 84 // last work item. 85 ChunkWorldPos lastCwp; 86 static struct QLen {size_t i; size_t len;} 87 QLen[] queueLengths; 88 89 // returns worker with smallest queue. 90 size_t getWorker() 91 { 92 import std.algorithm : sort; 93 foreach(i; 0..genWorkers.length) 94 { 95 queueLengths[i].i = i; 96 queueLengths[i].len = genWorkers[i].taskQueue.length; 97 } 98 sort!((a,b) => a.len < b.len)(queueLengths);// balance worker queues 99 return queueLengths[0].i; 100 } 101 102 // Sends chunks with the same x and z to the same worker. 103 // There is thread local heightmap cache. 104 void sendGenTask(TaskId taskId, ulong cwp, IGenerator generator) 105 { 106 auto _cwp = ChunkWorldPos(cwp); 107 size_t workerIndex; 108 // send task from the same chunk column 109 // to the same worker to improve cache hit rate 110 if (_cwp.x == lastCwp.x && _cwp.z == lastCwp.z) 111 { 112 workerIndex = lastWorker; 113 } 114 else 115 { 116 workerIndex = getWorker(); 117 } 118 shared(Worker)* worker = &genWorkers[workerIndex]; 119 worker.taskQueue.startMessage(); 120 worker.taskQueue.pushMessagePart(taskId); 121 worker.taskQueue.pushMessagePart!ulong(cwp); 122 worker.taskQueue.pushMessagePart(generator); 123 worker.taskQueue.endMessage(); 124 worker.notify(); 125 lastWorker = workerIndex; 126 lastCwp = _cwp; 127 } 128 } 129 130 void sendEmptyChunk(TaskId taskId, shared SharedQueue* queue, ulong cwp) 131 { 132 queue.startMessage(); 133 queue.pushMessagePart(taskId); 134 queue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), 1/*numLayers*/, 0)); 135 queue.pushMessagePart(ChunkLayerItem(BLOCK_LAYER, 136 BLOCKID_UNIFORM_FILL_BITS/*dataLength*/, 0/*timestamp*/, 0/*uniformData*/, 137 SOLID_CHUNK_METADATA)); 138 queue.endMessage(); 139 } 140 141 //version = DBG_OUT; 142 //version = DBG_COMPR; 143 void storageWorker( 144 immutable WorldDb _worldDb, 145 shared bool* workerRunning, 146 shared Mutex workAvaliableMutex, 147 shared Condition workAvaliable, 148 shared SharedQueue* loadResQueue, 149 shared SharedQueue* saveResQueue, 150 shared SharedQueue* loadTaskQueue, 151 shared SharedQueue* saveTaskQueue, 152 shared SharedHashSet!TaskId canceledTasks, 153 shared Worker[] genWorkers, 154 ) 155 { 156 version(DBG_OUT) infof("Storage worker started"); 157 version(DBG_OUT) infof("genWorkers.length %s", genWorkers.length); 158 try 159 { 160 ubyte[] compressBuffer = new ubyte[](4096*16); 161 ubyte[] buffer = new ubyte[](4096*16); 162 WorldDb worldDb = cast(WorldDb)_worldDb; 163 scope(exit) worldDb.close(); 164 165 TimeMeasurer taskTime; 166 TimeMeasurer workTime; 167 TimeMeasurer readTime; 168 taskTime.nested = &readTime; 169 readTime.next = &workTime; 170 171 auto workerControl = GenWorkerControl(genWorkers); 172 bool genEnabled = genWorkers.length > 0; 173 174 void writeChunk() 175 { 176 taskTime.reset(); 177 taskTime.startTaskTiming("WR"); 178 179 ChunkHeaderItem header = saveTaskQueue.popItem!ChunkHeaderItem(); 180 181 saveResQueue.startMessage(); 182 saveResQueue.pushMessagePart(header); 183 try 184 { 185 size_t encodedSize = encodeCbor(buffer[], header.numLayers); 186 187 foreach(_; 0..header.numLayers) 188 { 189 ChunkLayerItem layer = saveTaskQueue.popItem!ChunkLayerItem(); 190 191 encodedSize += encodeCbor(buffer[encodedSize..$], layer.timestamp); 192 encodedSize += encodeCbor(buffer[encodedSize..$], layer.layerId); 193 encodedSize += encodeCbor(buffer[encodedSize..$], layer.metadata); 194 if (layer.type == StorageType.uniform) 195 { 196 encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.uniform); 197 encodedSize += encodeCbor(buffer[encodedSize..$], layer.uniformData); 198 encodedSize += encodeCbor(buffer[encodedSize..$], layer.dataLength); 199 } 200 else if (layer.type == StorageType.fullArray) 201 { 202 encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray); 203 ubyte[] compactBlocks = compressLayerData(layer.getArray!ubyte, compressBuffer); 204 encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks); 205 version(DBG_COMPR)infof("Store1 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks); 206 } 207 else if (layer.type == StorageType.compressedArray) 208 { 209 encodedSize += encodeCbor(buffer[encodedSize..$], StorageType.compressedArray); 210 ubyte[] compactBlocks = layer.getArray!ubyte; 211 encodedSize += encodeCbor(buffer[encodedSize..$], compactBlocks); 212 version(DBG_COMPR)infof("Store2 %s %s %s\n(%(%02x%))", header.cwp, compactBlocks.ptr, compactBlocks.length, cast(ubyte[])compactBlocks); 213 } 214 215 saveResQueue.pushMessagePart(ChunkLayerTimestampItem(layer.timestamp, layer.layerId)); 216 } 217 218 //infof("S write %s (%(%02x%)) %s", header.cwp, header.cwp.asUlong.formChunkKey, encodedSize); 219 worldDb.put(header.cwp.asUlong.formChunkKey, buffer[0..encodedSize]); 220 } 221 catch(Exception e) errorf("storage exception %s", e.to!string); 222 saveResQueue.endMessage(); 223 taskTime.endTaskTiming(); 224 taskTime.printTime(); 225 version(DBG_OUT)infof("task save %s", header.cwp); 226 } 227 228 void readChunk() 229 { 230 taskTime.reset(); 231 taskTime.startTaskTiming("RD"); 232 scope(exit) { 233 taskTime.endTaskTiming(); 234 taskTime.printTime(); 235 } 236 237 TaskId taskId = loadTaskQueue.popItem!TaskId(); 238 ulong cwp = loadTaskQueue.popItem!ulong(); 239 IGenerator generator = loadTaskQueue.popItem!IGenerator(); 240 241 if (canceledTasks[taskId]) 242 { 243 loadResQueue.startMessage(); 244 loadResQueue.pushMessagePart(taskId); 245 loadResQueue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), 0, TASK_CANCELED_METADATA)); 246 loadResQueue.endMessage(); 247 248 return; 249 } 250 251 bool doGen; 252 try 253 { 254 readTime.startTaskTiming("get"); 255 ubyte[] cborData = worldDb.get(cwp.formChunkKey); 256 readTime.endTaskTiming(); 257 //scope(exit) worldDb.perChunkSelectStmt.reset(); 258 //infof("S read %s %s %s", taskId, ChunkWorldPos(cwp), cborData.length); 259 260 if (cborData !is null) 261 { 262 workTime.startTaskTiming("decode"); 263 //printCborStream(cborData[]); 264 ubyte numLayers = decodeCborSingle!ubyte(cborData); 265 // TODO check numLayers <= ubyte.max 266 loadResQueue.startMessage(); 267 loadResQueue.pushMessagePart(taskId); 268 loadResQueue.pushMessagePart(ChunkHeaderItem(ChunkWorldPos(cwp), cast(ubyte)numLayers, 0)); 269 foreach(_; 0..numLayers) 270 { 271 auto timestamp = decodeCborSingle!TimestampType(cborData); 272 auto layerId = decodeCborSingle!ubyte(cborData); 273 auto metadata = decodeCborSingle!ushort(cborData); 274 auto type = decodeCborSingle!StorageType(cborData); 275 276 if (type == StorageType.uniform) 277 { 278 ulong uniformData = decodeCborSingle!ulong(cborData); 279 auto dataLength = decodeCborSingle!LayerDataLenType(cborData); 280 loadResQueue.pushMessagePart(ChunkLayerItem(layerId, dataLength, timestamp, uniformData, metadata)); 281 } 282 else 283 { 284 assert(type == StorageType.compressedArray); 285 ubyte[] compactBlocks = decodeCborSingle!(ubyte[])(cborData); 286 compactBlocks = allocLayerArray(compactBlocks); 287 288 version(DBG_COMPR)infof("Load %s L %s C (%(%02x%))", ChunkWorldPos(cwp), compactBlocks.length, cast(ubyte[])compactBlocks); 289 loadResQueue.pushMessagePart(ChunkLayerItem(StorageType.compressedArray, layerId, timestamp, compactBlocks, metadata)); 290 } 291 } 292 loadResQueue.endMessage(); 293 // if (cborData.length > 0) error; TODO 294 workTime.endTaskTiming(); 295 } 296 else doGen = true; 297 } 298 catch(Exception e) { 299 infof("storage exception %s regenerating %s", e.to!string, ChunkWorldPos(cwp)); 300 doGen = true; 301 } 302 303 if (doGen) { 304 if (genEnabled && generator) { 305 workerControl.sendGenTask(taskId, cwp, generator); 306 } else { 307 sendEmptyChunk(taskId, loadResQueue, cwp); 308 } 309 } 310 311 version(DBG_OUT)infof("task load %s", ChunkWorldPos(cwp)); 312 } 313 314 uint numReceived; 315 MonoTime frameStart = MonoTime.currTime; 316 size_t prevReceived = size_t.max; 317 while (*atomicLoad!(MemoryOrder.acq)(workerRunning)) 318 { 319 synchronized (workAvaliableMutex) 320 { 321 (cast(Condition)workAvaliable).wait(); 322 } 323 324 worldDb.beginTxn(); 325 while (!loadTaskQueue.empty) 326 { 327 readChunk(); 328 ++numReceived; 329 } 330 worldDb.abortTxn(); 331 332 worldDb.beginTxn(); 333 while (!saveTaskQueue.empty) 334 { 335 auto type = saveTaskQueue.popItem!SaveItemType(); 336 final switch(type) { 337 case SaveItemType.chunk: 338 writeChunk(); 339 ++numReceived; 340 break; 341 case SaveItemType.saveHandler: 342 IoHandler ioHandler = saveTaskQueue.popItem!IoHandler(); 343 ioHandler(worldDb); 344 break; 345 } 346 } 347 worldDb.commitTxn(); 348 349 if (prevReceived != numReceived) 350 version(DBG_OUT)infof("Storage worker running %s %s", numReceived, *atomicLoad(workerRunning)); 351 prevReceived = numReceived; 352 353 auto now = MonoTime.currTime; 354 auto dur = now - frameStart; 355 if (dur > 3.seconds) { 356 //infof("Storage update"); 357 frameStart = now; 358 } 359 } 360 } 361 catch(Throwable t) 362 { 363 infof("%s from storage worker", t.to!string); 364 throw t; 365 } 366 version(DBG_OUT)infof("Storage worker stopped (%s, %s)", numReceived, *atomicLoad(workerRunning)); 367 }