/**
Copyright: Copyright (c) 2013-2018 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.thread.worker;

import std.concurrency : spawn, Tid;
import std.string : format;
import core.atomic;
import core.sync.semaphore;
import core.sync.condition;
import core.sync.mutex;
import core.thread : Thread;

public import voxelman.thread.sharedqueue;

/// Default `capacity` handed to both queues by `Worker.alloc`.
enum QUEUE_LENGTH = 1024*1024*1;
/// Half of `QUEUE_LENGTH`. Not referenced inside this module.
enum MAX_LOAD_QUEUE_LENGTH = QUEUE_LENGTH / 2;

/// State shared between an owner thread and a single worker thread:
/// a task queue, a result queue, a run flag and a wake-up semaphore.
shared struct Worker
{
	/// Thread executing the worker function. Stays `null` until the owner assigns it.
	Thread thread;
	/// Run flag. Cleared by `stop` and `signalStopped`, read through `needsToRun`.
	bool running = true;
	SharedQueue taskQueue;
	SharedQueue resultQueue;
	/// Index passed to `alloc` (`WorkerGroup.startWorkers` passes the position in `workers`).
	size_t groupIndex;
	// also notified on stop
	Semaphore workAvaliable;

	// for owner

	/// Allocates both queues and the semaphore.
	/// Params:
	///   groupIndex = value stored in `this.groupIndex`
	///   debugName = prefix for the queue names ("<debugName>_task" and "<debugName>_res")
	///   capacity = capacity forwarded to `SharedQueue.alloc` for both queues
	void alloc(size_t groupIndex = 0, string debugName = "W", size_t capacity = QUEUE_LENGTH) shared {
		taskQueue.alloc(format("%s_task", debugName), capacity);
		resultQueue.alloc(format("%s_res", debugName), capacity);
		workAvaliable = cast(shared) new Semaphore();
		this.groupIndex = groupIndex;
	}

	/// Clears the run flag and notifies the semaphore so that a worker blocked
	/// in `waitForNotify` wakes up and can observe the cleared flag.
	void stop() shared {
		atomicStore(running, false);
		(cast(Semaphore)workAvaliable).notify();
	}

	/// Notifies the semaphore once, releasing one pending or future `waitForNotify`.
	void notify() shared {
		(cast(Semaphore)workAvaliable).notify();
	}

	/// Clears the run flag without touching the semaphore.
	void signalStopped() shared {
		atomicStore(running, false);
	}

	/// Returns: `true` while the worker thread exists and is running.
	/// A worker whose `thread` has not been assigned yet reports `false`
	/// instead of dereferencing `null`.
	bool isRunning() shared @property {
		auto t = cast(Thread)thread;
		return t !is null && t.isRunning;
	}

	/// Returns: `true` when the worker thread is not running, which includes
	/// the case where `thread` has not been assigned yet.
	bool isStopped() shared @property const {
		// `Thread.isRunning` is not callable on a shared/const reference, hence the cast.
		auto t = cast(Thread)thread;
		return t is null || !t.isRunning;
	}

	/// Frees both queues.
	void free() shared {
		taskQueue.free();
		resultQueue.free();
	}

	// for worker

	/// Blocks on the semaphore until `notify` or `stop` is called.
	void waitForNotify() shared const {
		(cast(Semaphore)workAvaliable).wait();
	}

	/// Returns: the current value of the run flag (acquire load).
	bool needsToRun() shared @property {
		return atomicLoad!(MemoryOrder.acq)(running);
	}

	/// Returns: `true` when both the task queue and the result queue are empty.
	bool queuesEmpty() shared @property const {
		return taskQueue.empty && resultQueue.empty;
	}
}

/// Creates and starts a daemon thread that calls `fn(args)`.
/// Params:
///   fn = callable executed on the new thread
///   args = arguments forwarded to `fn`
/// Returns: the started thread.
Thread spawnWorker(F, T...)(F fn, T args)
{
	void exec()
	{
		fn( args );
	}
	auto t = new Thread(&exec);
	// Set the daemon flag before starting, so the thread is never observable
	// as a non-daemon thread (which the runtime would wait for at exit).
	t.isDaemon = true;
	t.start();
	return t;
}

/// A set of workers that all run the same function, plus selection of the
/// least loaded worker and group-wide shutdown.
shared struct WorkerGroup
{
	import std.traits : ParameterTypeTuple;

	private bool _areWorkersStarted;
	private size_t _nextWorker;
	private uint _numWorkers;
	private bool _areWorkersRunning;

	Worker[] workers;
	size_t numWorkers;

	/// Allocates `_numWorkers` workers and spawns one thread per worker.
	/// Each thread runs `fn(&worker, args)`. Does nothing if the group was already started.
	/// Params:
	///   _numWorkers = number of workers to create
	///   fn = worker function; receives a pointer to its `Worker` followed by `args`
	///   args = extra arguments forwarded to `fn`
	void startWorkers(F, T...)(size_t _numWorkers, F fn, T args) shared
	{
		if (_areWorkersStarted) return;

		numWorkers = _numWorkers;
		workers.length = numWorkers;

		foreach(i, ref worker; workers)
		{
			worker.alloc(i);
			worker.thread = cast(shared)spawnWorker(fn, &worker, args);
		}

		_areWorkersStarted = true;
	}

	/// Picks the worker whose task queue is currently the shortest
	/// (balances worker queues). Ties go to the lowest index.
	/// Returns: pointer to the chosen worker, or `null` when the group has no workers.
	shared(Worker)* nextWorker() shared @property
	{
		if (workers.length == 0) return null;

		// Single read-only pass; no scratch storage and no sorting required
		// to find a minimum.
		size_t best = 0;
		size_t bestLen = workers[0].taskQueue.length;
		foreach(i; 1..workers.length)
		{
			immutable size_t len = workers[i].taskQueue.length;
			if (len < bestLen)
			{
				best = i;
				bestLen = len;
			}
		}
		return &workers[best];
	}

	/// Returns: `true` when every worker either has both queues empty or is stopped.
	bool queuesEmpty()
	{
		foreach(ref worker; workers)
		{
			if (!(worker.queuesEmpty || worker.isStopped)) return false;
		}
		return true;
	}

	/// Returns: `true` when no worker thread is running (also `true` for an empty group).
	bool allWorkersStopped()
	{
		foreach(ref worker; workers)
		{
			if (!worker.isStopped) return false;
		}
		return true;
	}

	/// Asks every worker to stop, yields until all worker threads have
	/// finished, then frees the queues.
	/// Spins for as long as any worker thread keeps running.
	void stop() shared
	{
		foreach(ref worker; workers)
		{
			worker.stop();
		}

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	/// Frees the queues of every worker.
	void free() shared
	{
		foreach(ref w; workers)
		{
			w.free();
		}
	}
}