/**
Copyright: Copyright (c) 2013-2016 Andrey Penechko.
License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Andrey Penechko.
*/
module voxelman.utils.worker;

import std.concurrency : spawn, Tid;
import std.string : format;
import core.atomic;
import core.sync.semaphore;
import core.sync.condition;
import core.sync.mutex;
import core.thread : Thread;

import voxelman.core.config : QUEUE_LENGTH;
public import voxelman.utils.sharedqueue;


/// Per-thread bookkeeping shared between the owner of a worker thread and the
/// worker thread itself: the thread handle, a `running` flag, a task queue,
/// a result queue and a semaphore used to wake the worker up.
///
/// Methods in the "for owner" section are meant to be called by the code that
/// created the worker, methods in the "for worker" section by the thread
/// function.
shared struct Worker
{
	/// Handle of the worker thread. Stays `null` until the owner assigns it.
	Thread thread;
	/// Cleared by `stop`/`signalStopped`; read by the worker via `needsToRun`.
	bool running = true;
	SharedQueue taskQueue;
	SharedQueue resultQueue;
	// also notified on stop
	// (the misspelling is part of the public field name and is kept on purpose)
	Semaphore workAvaliable;

	// for owner

	/// Allocates both queues and creates the wake-up semaphore.
	/// Params:
	///   debugName = prefix for the queue names ("<debugName>_task" and "<debugName>_res")
	///   capacity = capacity forwarded to `SharedQueue.alloc` for both queues
	void alloc(string debugName = "W", size_t capacity = QUEUE_LENGTH) shared {
		taskQueue.alloc(format("%s_task", debugName), capacity);
		resultQueue.alloc(format("%s_res", debugName), capacity);
		workAvaliable = cast(shared) new Semaphore();
	}

	/// Clears the `running` flag and wakes the worker so that it can observe the change.
	void stop() shared {
		signalStopped();
		notify();
	}

	/// Wakes the worker up by notifying the semaphore once.
	void notify() shared {
		(cast(Semaphore)workAvaliable).notify();
	}

	/// Clears the `running` flag without waking the worker.
	void signalStopped() shared {
		atomicStore(running, false);
	}

	/// Returns: `true` if a thread has been assigned and it is currently running.
	bool isRunning() shared @property {
		auto t = cast(Thread)thread;
		// No thread assigned yet: report "not running" instead of dereferencing null.
		return t !is null && t.isRunning;
	}

	/// Returns: `true` if no thread has been assigned or the assigned thread is not running.
	bool isStopped() shared @property const {
		auto t = cast(Thread)thread;
		return t is null || !t.isRunning;
	}

	/// Frees both queues.
	void free() shared {
		taskQueue.free();
		resultQueue.free();
	}

	// for worker

	/// Blocks on the semaphore until `notify` or `stop` is called.
	void waitForNotify() shared const {
		(cast(Semaphore)workAvaliable).wait();
	}

	/// Returns: `true` while the `running` flag has not been cleared.
	bool needsToRun() shared @property {
		return atomicLoad!(MemoryOrder.acq)(running);
	}

	/// Returns: `true` if both the task queue and the result queue report `empty`.
	bool queuesEmpty() shared @property const {
		return taskQueue.empty && resultQueue.empty;
	}
}

/// Creates and starts a daemon thread that calls `fn(args)`.
/// Params:
///   fn = callable executed on the new thread
///   args = arguments forwarded to `fn`
/// Returns: the started thread.
Thread spawnWorker(F, T...)(F fn, T args)
{
	void exec()
	{
		fn( args );
	}
	auto t = new Thread(&exec);
	// Mark the thread as daemon before it starts running, so the flag is already
	// in place for the whole lifetime of the thread. Per the core.thread docs the
	// runtime does not wait for daemon threads on shutdown.
	t.isDaemon = true;
	t.start();
	return t;
}

/// Owns a set of `Worker`s, starts one thread per worker and hands out the
/// worker with the shortest task queue.
shared struct WorkerGroup
{
	import std.traits : ParameterTypeTuple;

	private bool _areWorkersStarted;
	private size_t _nextWorker;
	private uint _numWorkers;
	private bool _areWorkersRunning;

	Worker[] workers;
	size_t numWorkers;

	/// Allocates the workers and starts one thread per worker.
	/// Does nothing if the workers are already started.
	/// Params:
	///   _numWorkers = requested number of workers, clamped to the range 1..16
	///   fn = thread function; it is called as `fn(&worker, args)`
	///   args = extra arguments forwarded to `fn` after the worker pointer
	void startWorkers(F, T...)(size_t _numWorkers, F fn, T args) shared
	{
		import std.algorithm.comparison : clamp;

		if (_areWorkersStarted) return;

		numWorkers = clamp(_numWorkers, 1, 16);
		// The array is sized once, before any `&worker` is taken, so the
		// pointers given to the threads stay valid.
		workers.length = numWorkers;

		foreach(ref worker; workers)
		{
			worker.alloc();
			worker.thread = cast(shared)spawnWorker(fn, &worker, args);
		}

		_areWorkersStarted = true;
	}

	/// Picks the worker whose task queue is currently the shortest
	/// (the first such worker on ties) to balance worker queues.
	/// Returns: pointer to the chosen worker, or `null` if there are no workers.
	shared(Worker)* nextWorker() shared @property
	{
		if (workers.length == 0) return null;

		size_t bestIndex = 0;
		size_t bestLength = workers[0].taskQueue.length;
		foreach(i; 1..workers.length)
		{
			size_t len = workers[i].taskQueue.length;
			if (len < bestLength)
			{
				bestIndex = i;
				bestLength = len;
			}
		}
		//_nextWorker = (_nextWorker + 1) % numWorkers; // round robin
		return &workers[bestIndex];
	}

	/// Returns: `true` if the queues of every worker are empty (also when there are no workers).
	bool queuesEmpty()
	{
		foreach(ref worker; workers)
		{
			if (!worker.queuesEmpty) return false;
		}
		return true;
	}

	/// Returns: `true` if every worker reports `isStopped` (also when there are no workers).
	bool allWorkersStopped()
	{
		foreach(ref worker; workers)
		{
			if (!worker.isStopped) return false;
		}
		return true;
	}

	/// Asks every worker to stop, waits (yielding) until all worker threads have
	/// finished, frees the queues and resets the group so that `startWorkers`
	/// can be called again.
	void stop() shared
	{
		foreach(ref worker; workers)
		{
			worker.stop();
		}

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();

		// Drop the stopped workers: their queues are freed and their `running`
		// flags are cleared, so they must not be handed out by `nextWorker`,
		// freed a second time, or reused by a later `startWorkers`.
		workers = null;
		numWorkers = 0;
		_areWorkersStarted = false;
	}

	/// Frees the queues of every worker. Does not stop the worker threads.
	void free() shared
	{
		foreach(ref w; workers)
		{
			w.free();
		}
	}
}