/**
Copyright: Copyright (c) 2014-2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.storage.storageworker;

import core.atomic : atomicLoad;
import std.concurrency : Tid, receive, send;
import std.conv : to;
import std.experimental.logger;
import std.variant : Variant;

import cbor;

import voxelman.core.config;
import voxelman.core.block;
import voxelman.core.chunkgen;
import voxelman.storage.chunk;
import voxelman.storage.chunkprovider;
import voxelman.storage.coordinates;
import voxelman.storage.regionstorage;
import voxelman.utils.rlecompression;

/// Scratch buffer handed to `rleEncode` / `rleDecode` as their output storage.
private ubyte[4096*16] compressBuffer;
/// Scratch buffer used for CBOR encoding and for reading chunk data from region storage.
private ubyte[4096*16] buffer;

/**
Entry point of the storage worker thread.

First waits for a `shared(bool)*` "is running" flag, then processes messages
until a message of any other type arrives:
$(UL
	$(LI `immutable(LoadSnapshotMessage)*` - tries to load the chunk from region
		storage and sends a `SnapshotLoadedMessage` to `mainTid`; when the chunk
		is missing, unreadable or malformed the request is forwarded to
		`message.genWorker` instead. Ignored when the running flag is false.)
	$(LI `immutable(SaveSnapshotMessage)*` - writes the chunk to region storage
		and sends a `SnapshotSavedMessage` to `mainTid`.)
	$(LI anything else - stops the loop and calls `regionStorage.clear()`.)
)

Params:
	mainTid = thread that receives `SnapshotLoadedMessage` / `SnapshotSavedMessage`.
	regionDir = directory passed to `RegionStorage`.

Throws: any `Throwable` escaping the message loop is logged and rethrown.
*/
void storageWorkerThread(Tid mainTid, string regionDir)
{
	RegionStorage regionStorage = RegionStorage(regionDir);
	shared(bool)* isRunning;
	bool isRunningLocal = true;

	// The first message must deliver the shared running flag.
	receive( (shared(bool)* _isRunning){isRunning = _isRunning;} );

	// Compresses and stores a chunk, unless the copy already on disk
	// has the same or a newer timestamp.
	void writeChunk(ChunkWorldPos chunkPos, BlockData data, TimestampType timestamp)
	{
		if (regionStorage.isChunkOnDisk(chunkPos) &&
			timestamp <= regionStorage.chunkTimestamp(chunkPos)) return;

		BlockData compressedData = data;
		compressedData.blocks = rleEncode(data.blocks, compressBuffer);

		try
		{
			size_t encodedSize = encodeCborArray(buffer[], compressedData);
			regionStorage.writeChunk(chunkPos, buffer[0..encodedSize], timestamp);
		}
		catch(Exception e)
		{
			errorf("storage exception %s", e.to!string);
		}
	}

	// Handles a save request and acknowledges it to the main thread.
	// NOTE(review): the acknowledgement is sent even when writeChunk logged
	// a storage exception - confirm that the receiver does not rely on it
	// as proof that the data reached the disk.
	void doWrite(immutable(SaveSnapshotMessage)* message)
	{
		// Messages travel as immutable pointers; cast back to access the fields.
		auto m = cast(SaveSnapshotMessage*)message;
		writeChunk(m.cwp, m.snapshot.blockData, m.snapshot.timestamp);

		auto res = new SnapshotSavedMessage(m.cwp, m.snapshot);
		mainTid.send(cast(immutable(SnapshotSavedMessage)*)res);
	}

	// Handles a load request: replies with the stored chunk or, when it cannot
	// be loaded, forwards the request to the generator worker.
	void readChunk(immutable(LoadSnapshotMessage)* message)
	{
		auto m = cast(LoadSnapshotMessage*)message;
		bool doGen = !regionStorage.isChunkOnDisk(m.cwp);

		try
		{
			if (!doGen)
			{
				TimestampType timestamp;
				ubyte[] cborData = regionStorage.readChunk(m.cwp, buffer[], timestamp);

				if (cborData is null)
				{
					doGen = true;
				}
				else
				{
					BlockData compressedData = decodeCborSingle!BlockData(cborData);
					BlockData blockData = compressedData;
					blockData.blocks = rleDecode(compressedData.blocks, compressBuffer);

					// An empty block array is accepted as is; a non-empty one
					// must cover the whole chunk.
					immutable size_t numBlocks = blockData.blocks.length;
					immutable bool validLength = numBlocks == 0 || numBlocks == CHUNK_SIZE_CUBE;
					warningf(!validLength, "Wrong chunk data %s", m.cwp);

					if (validLength)
					{
						if (numBlocks > 0)
							m.blockBuffer[] = blockData.blocks;
						// Always hand the request's own buffer back with the snapshot.
						blockData.blocks = m.blockBuffer;

						auto res = new SnapshotLoadedMessage(m.cwp, BlockDataSnapshot(blockData, timestamp));
						mainTid.send(cast(immutable(SnapshotLoadedMessage)*)res);
					}
					else
					{
						// Malformed data: never send a slice of the reusable
						// scratch buffer to another thread. Regenerate instead,
						// the same way decode exceptions are handled below.
						doGen = true;
					}
				}
			}
		}
		catch(Exception e)
		{
			infof("storage exception %s regenerating %s", e.to!string, m.cwp);
			doGen = true;
		}

		if (doGen)
			m.genWorker.send(message);
	}

	try
	{
		while (isRunningLocal)
		{
			receive(
				// read
				(immutable(LoadSnapshotMessage)* message)
				{
					// Pending load requests are dropped once shutdown was requested.
					if (!atomicLoad(*isRunning))
						return;
					readChunk(message);
				},
				// write
				(immutable(SaveSnapshotMessage)* message)
				{
					doWrite(message);
				},
				// any other message stops the worker
				(Variant v)
				{
					isRunningLocal = false;
					regionStorage.clear();
				}
			);
		}
	}
	catch(Throwable t)
	{
		infof("%s from storage worker", t.to!string);
		throw t;
	}
}