1 /**
2 Copyright: Copyright (c) 2013-2017 Andrey Penechko.
3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4 Authors: Andrey Penechko.
5 */
6 module voxelman.utils.worker;
7 
8 import std.concurrency : spawn, Tid;
9 import std..string : format;
10 import core.atomic;
11 import core.sync.semaphore;
12 import core.sync.condition;
13 import core.sync.mutex;
14 import core.thread : Thread;
15 
16 import voxelman.core.config : QUEUE_LENGTH;
17 public import voxelman.utils.sharedqueue;
18 
19 
20 shared struct Worker
21 {
22 	Thread thread;
23 	bool running = true;
24 	SharedQueue taskQueue;
25 	SharedQueue resultQueue;
26 	size_t groupIndex;
27 	// also notified on stop
28 	Semaphore workAvaliable;
29 
30 	// for owner
31 	void alloc(size_t groupIndex = 0, string debugName = "W", size_t capacity = QUEUE_LENGTH) shared {
32 		taskQueue.alloc(format("%s_task", debugName), capacity);
33 		resultQueue.alloc(format("%s_res", debugName), capacity);
34 		workAvaliable = cast(shared) new Semaphore();
35 		this.groupIndex = groupIndex;
36 	}
37 
38 	void stop() shared {
39 		atomicStore(running, false);
40 		(cast(Semaphore)workAvaliable).notify();
41 	}
42 
43 	void notify() shared {
44 		(cast(Semaphore)workAvaliable).notify();
45 	}
46 
47 	void signalStopped() shared {
48 		atomicStore(running, false);
49 	}
50 
51 	bool isRunning() shared @property {
52 		return (cast(Thread)thread).isRunning;
53 	}
54 
55 	bool isStopped() shared @property const {
56 		return !(cast(Thread)thread).isRunning;
57 	}
58 
59 	void free() shared {
60 		taskQueue.free();
61 		resultQueue.free();
62 	}
63 
64 	// for worker
65 	void waitForNotify() shared const {
66 		(cast(Semaphore)workAvaliable).wait();
67 	}
68 
69 	bool needsToRun() shared @property {
70 		return atomicLoad!(MemoryOrder.acq)(running);
71 	}
72 
73 	bool queuesEmpty() shared @property const {
74 		return taskQueue.empty && resultQueue.empty;
75 	}
76 }
77 
78 Thread spawnWorker(F, T...)(F fn, T args)
79 {
80 	void exec()
81 	{
82 		fn( args );
83 	}
84 	auto t = new Thread(&exec);
85 	t.start();
86 	t.isDaemon = true;
87 	return t;
88 }
89 
90 shared struct WorkerGroup
91 {
92 	import std.traits : ParameterTypeTuple;
93 
94 	private bool _areWorkersStarted;
95 	private size_t _nextWorker;
96 	private uint _numWorkers;
97 	private bool _areWorkersRunning;
98 
99 	Worker[] workers;
100 	size_t numWorkers;
101 
102 	void startWorkers(F, T...)(size_t _numWorkers, F fn, T args) shared
103 	{
104 		if (_areWorkersStarted) return;
105 
106 		numWorkers = _numWorkers;
107 		queueLengths.length = numWorkers;
108 		workers.length = numWorkers;
109 
110 		foreach(i, ref worker; workers)
111 		{
112 			worker.alloc(i);
113 			worker.thread = cast(shared)spawnWorker(fn, &worker, args);
114 		}
115 
116 		_areWorkersStarted = true;
117 	}
118 
119 	private static struct QLen {size_t i; size_t len;}
120 	private QLen[] queueLengths;
121 
122 	shared(Worker)* nextWorker() shared @property
123 	{
124 		import std.algorithm : sort;
125 		foreach(i; 0..numWorkers)
126 		{
127 			queueLengths[i].i = i;
128 			queueLengths[i].len = workers[i].taskQueue.length;
129 		}
130 		sort!((a,b) => a.len < b.len)(queueLengths);// balance worker queues
131 		//_nextWorker = (_nextWorker + 1) % numWorkers; // round robin
132 		return &workers[queueLengths[0].i];
133 	}
134 
135 	bool queuesEmpty()
136 	{
137 		bool empty = true;
138 		foreach(ref worker; workers) empty = empty && (worker.queuesEmpty || worker.isStopped);
139 		return empty;
140 	}
141 
142 	bool allWorkersStopped()
143 	{
144 		bool stopped = true;
145 		foreach(ref worker; workers) stopped = stopped && worker.isStopped;
146 		return stopped;
147 	}
148 
149 	void stop() shared
150 	{
151 		foreach(ref worker; workers)
152 		{
153 			worker.stop();
154 		}
155 
156 		while (!allWorkersStopped())
157 		{
158 			Thread.yield();
159 		}
160 
161 		free();
162 	}
163 
164 	void free() shared
165 	{
166 		foreach(ref w; workers)
167 		{
168 			w.free();
169 		}
170 	}
171 }