1 /**
2 Copyright: Copyright (c) 2014-2016 Andrey Penechko.
3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4 Authors: Andrey Penechko.
5 */
6 module voxelman.storage.storageworker;
7 
8 import std.experimental.logger;
9 import std.conv : to;
10 
11 import cbor;
12 
13 import voxelman.core.config;
14 import voxelman.core.block;
15 import voxelman.core.chunkgen;
16 import voxelman.storage.chunk;
17 import voxelman.storage.chunkprovider;
18 import voxelman.storage.coordinates;
19 import voxelman.storage.regionstorage;
20 import voxelman.utils.rlecompression;
21 
22 private ubyte[4096*16] compressBuffer;
23 private ubyte[4096*16] buffer;
24 
25 void storageWorkerThread(Tid mainTid, string regionDir)
26 {
27 	RegionStorage regionStorage = RegionStorage(regionDir);
28 	shared(bool)* isRunning;
29 	bool isRunningLocal = true;
30 	receive( (shared(bool)* _isRunning){isRunning = _isRunning;} );
31 
32 	void writeChunk(ChunkWorldPos chunkPos, BlockData data, TimestampType timestamp)
33 	{
34 		if (regionStorage.isChunkOnDisk(chunkPos) &&
35 			timestamp <= regionStorage.chunkTimestamp(chunkPos)) return;
36 
37 		BlockData compressedData = data;
38 		compressedData.blocks = rleEncode(data.blocks, compressBuffer);
39 
40 		try
41 		{
42 			size_t encodedSize = encodeCborArray(buffer[], compressedData);
43 			regionStorage.writeChunk(chunkPos, buffer[0..encodedSize], timestamp);
44 		}
45 		catch(Exception e)
46 		{
47 			errorf("storage exception %s", e.to!string);
48 		}
49 	}
50 
51 	void doWrite(immutable(SaveSnapshotMessage)* message)
52 	{
53 		auto m = cast(SaveSnapshotMessage*)message;
54 		writeChunk(m.cwp, m.snapshot.blockData, m.snapshot.timestamp);
55 
56 		auto res = new SnapshotSavedMessage(m.cwp, m.snapshot);
57 		mainTid.send(cast(immutable(SnapshotSavedMessage)*)res);
58 	}
59 
60 	void readChunk(immutable(LoadSnapshotMessage)* message)
61 	{
62 		auto m = cast(LoadSnapshotMessage*)message;
63 		bool doGen = !regionStorage.isChunkOnDisk(m.cwp);
64 
65 		try
66 		if (!doGen) {
67 			TimestampType timestamp;
68 			ubyte[] cborData = regionStorage.readChunk(m.cwp, buffer[], timestamp);
69 
70 			if (cborData !is null) {
71 				BlockData compressedData = decodeCborSingle!BlockData(cborData);
72 				BlockData blockData = compressedData;
73 				blockData.blocks = rleDecode(compressedData.blocks, compressBuffer);
74 
75 				if (blockData.blocks.length > 0) {
76 					bool validLength = blockData.blocks.length == CHUNK_SIZE_CUBE;
77 					warningf(!validLength, "Wrong chunk data %s", m.cwp);
78 
79 					if (validLength) {
80 						m.blockBuffer[] = blockData.blocks;
81 						blockData.blocks = m.blockBuffer;
82 					}
83 				}
84 				else
85 					blockData.blocks = m.blockBuffer;
86 
87 				auto res = new SnapshotLoadedMessage(m.cwp, BlockDataSnapshot(blockData, timestamp));
88 				mainTid.send(cast(immutable(SnapshotLoadedMessage)*)res);
89 			}
90 			else
91 				doGen = true;
92 		}
93 		catch(Exception e) {
94 			infof("storage exception %s regenerating %s", e.to!string, m.cwp);
95 			doGen = true;
96 		}
97 
98 		if (doGen)
99 			m.genWorker.send(message);
100 	}
101 
102 	try
103 	{
104 		while (isRunningLocal)
105 		{
106 			receive(
107 				// read
108 				(immutable(LoadSnapshotMessage)* message)
109 				{
110 					if (!atomicLoad(*isRunning))
111 						return;
112 					readChunk(message);
113 				},
114 				// write
115 				(immutable(SaveSnapshotMessage)* message)
116 				{
117 					doWrite(message);
118 				},
119 				(Variant v)
120 				{
121 					isRunningLocal = false;
122 					regionStorage.clear();
123 				}
124 			);
125 		}
126 	}
127 	catch(Throwable t)
128 	{
129 		infof("%s from storage worker", t.to!string);
130 		throw t;
131 	}
132 }