1 /** 2 Copyright: Copyright (c) 2013-2017 Andrey Penechko. 3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0). 4 Authors: Andrey Penechko. 5 */ 6 module voxelman.utils.worker; 7 8 import std.concurrency : spawn, Tid; 9 import std..string : format; 10 import core.atomic; 11 import core.sync.semaphore; 12 import core.sync.condition; 13 import core.sync.mutex; 14 import core.thread : Thread; 15 16 import voxelman.core.config : QUEUE_LENGTH; 17 public import voxelman.utils.sharedqueue; 18 19 20 shared struct Worker 21 { 22 Thread thread; 23 bool running = true; 24 SharedQueue taskQueue; 25 SharedQueue resultQueue; 26 size_t groupIndex; 27 // also notified on stop 28 Semaphore workAvaliable; 29 30 // for owner 31 void alloc(size_t groupIndex = 0, string debugName = "W", size_t capacity = QUEUE_LENGTH) shared { 32 taskQueue.alloc(format("%s_task", debugName), capacity); 33 resultQueue.alloc(format("%s_res", debugName), capacity); 34 workAvaliable = cast(shared) new Semaphore(); 35 this.groupIndex = groupIndex; 36 } 37 38 void stop() shared { 39 atomicStore(running, false); 40 (cast(Semaphore)workAvaliable).notify(); 41 } 42 43 void notify() shared { 44 (cast(Semaphore)workAvaliable).notify(); 45 } 46 47 void signalStopped() shared { 48 atomicStore(running, false); 49 } 50 51 bool isRunning() shared @property { 52 return (cast(Thread)thread).isRunning; 53 } 54 55 bool isStopped() shared @property const { 56 return !(cast(Thread)thread).isRunning; 57 } 58 59 void free() shared { 60 taskQueue.free(); 61 resultQueue.free(); 62 } 63 64 // for worker 65 void waitForNotify() shared const { 66 (cast(Semaphore)workAvaliable).wait(); 67 } 68 69 bool needsToRun() shared @property { 70 return atomicLoad!(MemoryOrder.acq)(running); 71 } 72 73 bool queuesEmpty() shared @property const { 74 return taskQueue.empty && resultQueue.empty; 75 } 76 } 77 78 Thread spawnWorker(F, T...)(F fn, T args) 79 { 80 void exec() 81 { 82 fn( args ); 83 } 84 auto t = new Thread(&exec); 85 t.start(); 86 t.isDaemon = true; 87 return t; 88 } 89 90 shared struct WorkerGroup 91 { 92 import std.traits : ParameterTypeTuple; 93 94 private bool _areWorkersStarted; 95 private size_t _nextWorker; 96 private uint _numWorkers; 97 private bool _areWorkersRunning; 98 99 Worker[] workers; 100 size_t numWorkers; 101 102 void startWorkers(F, T...)(size_t _numWorkers, F fn, T args) shared 103 { 104 if (_areWorkersStarted) return; 105 106 numWorkers = _numWorkers; 107 queueLengths.length = numWorkers; 108 workers.length = numWorkers; 109 110 foreach(i, ref worker; workers) 111 { 112 worker.alloc(i); 113 worker.thread = cast(shared)spawnWorker(fn, &worker, args); 114 } 115 116 _areWorkersStarted = true; 117 } 118 119 private static struct QLen {size_t i; size_t len;} 120 private QLen[] queueLengths; 121 122 shared(Worker)* nextWorker() shared @property 123 { 124 import std.algorithm : sort; 125 foreach(i; 0..numWorkers) 126 { 127 queueLengths[i].i = i; 128 queueLengths[i].len = workers[i].taskQueue.length; 129 } 130 sort!((a,b) => a.len < b.len)(queueLengths);// balance worker queues 131 //_nextWorker = (_nextWorker + 1) % numWorkers; // round robin 132 return &workers[queueLengths[0].i]; 133 } 134 135 bool queuesEmpty() 136 { 137 bool empty = true; 138 foreach(ref worker; workers) empty = empty && (worker.queuesEmpty || worker.isStopped); 139 return empty; 140 } 141 142 bool allWorkersStopped() 143 { 144 bool stopped = true; 145 foreach(ref worker; workers) stopped = stopped && worker.isStopped; 146 return stopped; 147 } 148 149 void stop() shared 150 { 151 foreach(ref worker; workers) 152 { 153 worker.stop(); 154 } 155 156 while (!allWorkersStopped()) 157 { 158 Thread.yield(); 159 } 160 161 free(); 162 } 163 164 void free() shared 165 { 166 foreach(ref w; workers) 167 { 168 w.free(); 169 } 170 } 171 }