1 /**
2 Copyright: Copyright (c) 2013-2016 Andrey Penechko.
3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4 Authors: Andrey Penechko.
5 */
6 module voxelman.utils.worker;
7 
8 import std.concurrency : spawn, Tid;
9 import std.string : format;
10 import core.atomic;
11 import core.sync.semaphore;
12 import core.sync.condition;
13 import core.sync.mutex;
14 import core.thread : Thread;
15 
16 import voxelman.core.config : QUEUE_LENGTH;
17 public import voxelman.utils.sharedqueue;
18 
19 
/// State shared between one worker thread and the thread that owns it.
///
/// The methods are grouped by intended caller ("for owner" / "for worker").
/// `running` is accessed with atomics; `workAvaliable` is the semaphore the
/// worker waits on.
shared struct Worker
{
	/// Thread object for this worker. Null until the owner assigns it.
	Thread thread;
	/// Cleared by stop() and signalStopped(); read through needsToRun.
	bool running = true;
	/// Queues allocated by alloc() and released by free().
	/// NOTE(review): presumably owner -> worker tasks and worker -> owner
	/// results; confirm against SharedQueue users.
	SharedQueue taskQueue;
	SharedQueue resultQueue; /// ditto
	/// Semaphore the worker waits on; also notified on stop.
	Semaphore workAvaliable;

	// for owner

	/// Allocates both queues with the given capacity and creates the semaphore.
	/// Params:
	///   debugName = prefix of the queue names ("<debugName>_task", "<debugName>_res")
	///   capacity  = capacity passed to each queue's alloc
	void alloc(string debugName = "W", size_t capacity = QUEUE_LENGTH) shared {
		taskQueue.alloc(format("%s_task", debugName), capacity);
		resultQueue.alloc(format("%s_res", debugName), capacity);
		workAvaliable = cast(shared) new Semaphore();
	}

	/// Clears the running flag and wakes the worker so it can observe the change.
	void stop() shared {
		atomicStore(running, false);
		notify();
	}

	/// Signals the semaphore once, releasing one waitForNotify() call.
	void notify() shared {
		(cast(Semaphore)workAvaliable).notify();
	}

	/// Clears the running flag without signalling the semaphore.
	void signalStopped() shared {
		atomicStore(running, false);
	}

	/// True while the worker's thread is running.
	/// Returns false when no thread has been assigned yet.
	bool isRunning() shared @property {
		auto t = cast(Thread)thread;
		// `thread` is null until the owner stores the spawned thread.
		return t !is null && t.isRunning;
	}

	/// Negation of isRunning: true when the thread is not running or
	/// has not been assigned yet.
	bool isStopped() shared @property const {
		auto t = cast(Thread)thread;
		return t is null || !t.isRunning;
	}

	/// Releases both queues.
	void free() shared {
		taskQueue.free();
		resultQueue.free();
	}

	// for worker

	/// Blocks on the semaphore until notify() or stop() signals it.
	void waitForNotify() shared const {
		(cast(Semaphore)workAvaliable).wait();
	}

	/// Acquire-load of the running flag; false once stop() or
	/// signalStopped() has been called.
	bool needsToRun() shared @property {
		return atomicLoad!(MemoryOrder.acq)(running);
	}

	/// True when both the task queue and the result queue report empty.
	bool queuesEmpty() shared @property const {
		return taskQueue.empty && resultQueue.empty;
	}
}
75 
/// Starts a daemon thread that calls `fn(args)` and returns its Thread object.
///
/// Params:
///   fn   = callable invoked on the new thread
///   args = arguments forwarded to `fn`
/// Returns: the already started thread.
Thread spawnWorker(F, T...)(F fn, T args)
{
	void exec()
	{
		fn( args );
	}
	auto t = new Thread(&exec);
	// Mark as daemon before starting so the thread never runs as a
	// non-daemon thread, not even briefly.
	t.isDaemon = true;
	t.start();
	return t;
}
87 
/// A fixed set of Worker instances, each driven by its own thread.
///
/// startWorkers() creates the workers, nextWorker picks the one with the
/// shortest task queue, and stop() shuts all of them down and frees their queues.
shared struct WorkerGroup
{
	import std.traits : ParameterTypeTuple;

	// Set once by startWorkers(); makes repeated calls no-ops.
	private bool _areWorkersStarted;
	// Not used by the code in this struct; _nextWorker belongs to the
	// disabled round-robin strategy.
	private size_t _nextWorker;
	private uint _numWorkers;
	private bool _areWorkersRunning;

	/// Workers owned by this group; its length equals numWorkers after startWorkers().
	Worker[] workers;
	/// Number of workers actually started (0 before startWorkers()).
	size_t numWorkers;

	/// Allocates the workers and starts one thread per worker.
	///
	/// Does nothing if the group was already started.
	/// Params:
	///   _numWorkers = requested worker count, clamped to the range 1..16
	///   fn          = thread function, called as fn(&worker, args)
	///   args        = extra arguments forwarded to fn
	void startWorkers(F, T...)(size_t _numWorkers, F fn, T args) shared
	{
		import std.algorithm.comparison : clamp;

		if (_areWorkersStarted) return;

		numWorkers = clamp(_numWorkers, 1, 16);
		workers.length = numWorkers;

		foreach(ref worker; workers)
		{
			worker.alloc();
			// The thread is already running when its handle is stored here.
			worker.thread = cast(shared)spawnWorker(fn, &worker, args);
		}

		_areWorkersStarted = true;
	}

	/// Returns the worker whose task queue is currently shortest, so that
	/// new work balances across workers. Ties go to the lowest index.
	/// Returns: null when no workers have been started.
	shared(Worker)* nextWorker() shared @property
	{
		if (numWorkers == 0) return null;

		// Linear scan for the minimum; no scratch storage is needed.
		size_t bestIndex = 0;
		size_t bestLength = workers[0].taskQueue.length;
		foreach(i; 1..numWorkers)
		{
			size_t len = workers[i].taskQueue.length;
			if (len < bestLength)
			{
				bestIndex = i;
				bestLength = len;
			}
		}
		//_nextWorker = (_nextWorker + 1) % numWorkers; // round robin
		return &workers[bestIndex];
	}

	/// True when every worker reports both of its queues empty
	/// (also true when there are no workers).
	bool queuesEmpty()
	{
		foreach(ref worker; workers)
		{
			if (!worker.queuesEmpty) return false;
		}
		return true;
	}

	/// True when every worker reports isStopped
	/// (also true when there are no workers).
	bool allWorkersStopped()
	{
		foreach(ref worker; workers)
		{
			if (!worker.isStopped) return false;
		}
		return true;
	}

	/// Asks every worker to stop, spins (yielding) until all of them
	/// report stopped, then frees their queues.
	void stop() shared
	{
		foreach(ref worker; workers)
		{
			worker.stop();
		}

		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	/// Frees the queues of every worker. Called by stop().
	void free() shared
	{
		foreach(ref w; workers)
		{
			w.free();
		}
	}
}