1 /**
2 Copyright: Copyright (c) 2013-2018 Andrey Penechko.
3 License: $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0).
4 Authors: Andrey Penechko.
5 */
6 module voxelman.thread.worker;
7 
8 import std.concurrency : spawn, Tid;
9 import std..string : format;
10 import core.atomic;
11 import core.sync.semaphore;
12 import core.sync.condition;
13 import core.sync.mutex;
14 import core.thread : Thread;
15 
16 public import voxelman.thread.sharedqueue;
17 
/// Default `capacity` argument of `Worker.alloc` (2^20).
enum QUEUE_LENGTH = 1 << 20;
/// Half of `QUEUE_LENGTH`. Not referenced anywhere else in this file.
enum MAX_LOAD_QUEUE_LENGTH = QUEUE_LENGTH >> 1;
20 
/// State shared between an owner thread and one worker thread: a task queue,
/// a result queue, a run flag and a semaphore used to wake the worker.
///
/// `alloc` must be called before `stop`, `notify` or `waitForNotify`, because
/// those methods use the semaphore that `alloc` creates.
shared struct Worker
{
	/// Thread associated with this worker. Stays null until somebody assigns it.
	Thread thread;
	/// Run flag: cleared by `stop` and `signalStopped`, read by `needsToRun`.
	bool running = true;
	SharedQueue taskQueue;
	SharedQueue resultQueue;
	/// Index that was passed to `alloc`.
	size_t groupIndex;
	// also notified on stop
	Semaphore workAvaliable;

	// for owner

	/// Allocates both queues with the given `capacity`, creates the semaphore
	/// and stores `groupIndex`.
	///
	/// Params:
	///   groupIndex = value stored in the `groupIndex` field
	///   debugName  = prefix of the queue names ("<debugName>_task", "<debugName>_res")
	///   capacity   = forwarded to `SharedQueue.alloc` for both queues
	void alloc(size_t groupIndex = 0, string debugName = "W", size_t capacity = QUEUE_LENGTH) shared {
		taskQueue.alloc(format("%s_task", debugName), capacity);
		resultQueue.alloc(format("%s_res", debugName), capacity);
		workAvaliable = cast(shared) new Semaphore();
		this.groupIndex = groupIndex;
	}

	/// Clears the run flag, then posts the semaphore so that a worker blocked
	/// in `waitForNotify` wakes up and can observe the cleared flag.
	void stop() shared {
		signalStopped();
		notify();
	}

	/// Posts the semaphore once.
	void notify() shared {
		(cast(Semaphore)workAvaliable).notify();
	}

	/// Clears the run flag without posting the semaphore.
	void signalStopped() shared {
		atomicStore(running, false);
	}

	/// Returns: true while the attached thread is running.
	/// A worker that has no thread attached yet reports false instead of
	/// dereferencing a null reference.
	bool isRunning() shared @property {
		auto attached = cast(Thread)thread;
		return attached !is null && attached.isRunning;
	}

	/// Returns: true when the attached thread is not running, or when no
	/// thread has been attached yet.
	bool isStopped() shared @property const {
		// The cast drops `shared` and `const`; Thread.isRunning is not a const method.
		auto attached = cast(Thread)thread;
		return attached is null || !attached.isRunning;
	}

	/// Frees both queues.
	void free() shared {
		taskQueue.free();
		resultQueue.free();
	}

	// for worker

	/// Blocks on the semaphore until it is posted by `notify` or `stop`.
	void waitForNotify() shared const {
		(cast(Semaphore)workAvaliable).wait();
	}

	/// Returns: the current value of the run flag (acquire load).
	bool needsToRun() shared @property {
		return atomicLoad!(MemoryOrder.acq)(running);
	}

	/// Returns: true when both the task queue and the result queue are empty.
	bool queuesEmpty() shared @property const {
		return taskQueue.empty && resultQueue.empty;
	}
}
78 
/// Runs `fn(args)` on a newly created daemon thread.
///
/// Params:
///   fn   = callable invoked on the new thread
///   args = arguments forwarded to `fn`
/// Returns: the started thread.
Thread spawnWorker(F, T...)(F fn, T args)
{
	void exec()
	{
		fn( args );
	}
	auto t = new Thread(&exec);
	// Mark the thread as a daemon before it starts, so it never runs as a
	// non-daemon, not even briefly.
	t.isDaemon = true;
	t.start();
	return t;
}
90 
/// A fixed-size set of `Worker`s, each with its own thread created by
/// `spawnWorker`.
shared struct WorkerGroup
{
	import std.traits : ParameterTypeTuple;

	private bool _areWorkersStarted;
	// The three fields below are not used by any method of this struct.
	private size_t _nextWorker;
	private uint _numWorkers;
	private bool _areWorkersRunning;

	Worker[] workers;
	size_t numWorkers;

	/// Allocates `_numWorkers` workers and starts one thread per worker that
	/// runs `fn(&worker, args)`. Does nothing if the group was already started.
	///
	/// Params:
	///   _numWorkers = number of workers to create (the parameter shadows the
	///                 private field of the same name)
	///   fn          = thread function; receives a pointer to its `Worker`
	///                 followed by `args`
	///   args        = extra arguments forwarded to `fn`
	void startWorkers(F, T...)(size_t _numWorkers, F fn, T args) shared
	{
		if (_areWorkersStarted) return;

		numWorkers = _numWorkers;
		workers.length = numWorkers;

		foreach(i, ref worker; workers)
		{
			worker.alloc(i);
			// The thread is already running when `worker.thread` gets assigned.
			worker.thread = cast(shared)spawnWorker(fn, &worker, args);
		}

		_areWorkersStarted = true;
	}

	/// Picks the worker whose task queue is currently the shortest, to balance
	/// the worker queues.
	///
	/// Returns: pointer to that worker, or null when the group has no workers
	/// (for example before `startWorkers` was called).
	shared(Worker)* nextWorker() shared @property
	{
		immutable size_t count = numWorkers;
		if (count == 0) return null;

		// Linear minimum search; the first worker wins ties.
		size_t bestIndex = 0;
		size_t bestLength = workers[0].taskQueue.length;
		foreach(i; 1 .. count)
		{
			size_t len = workers[i].taskQueue.length;
			if (len < bestLength)
			{
				bestIndex = i;
				bestLength = len;
			}
		}
		return &workers[bestIndex];
	}

	/// Returns: true when every worker either has both queues empty or is
	/// stopped. An empty group yields true.
	bool queuesEmpty()
	{
		foreach(ref worker; workers)
		{
			if (!(worker.queuesEmpty || worker.isStopped)) return false;
		}
		return true;
	}

	/// Returns: true when every worker reports `isStopped`. An empty group
	/// yields true.
	bool allWorkersStopped()
	{
		foreach(ref worker; workers)
		{
			if (!worker.isStopped) return false;
		}
		return true;
	}

	/// Asks every worker to stop, yields the calling thread until all workers
	/// report stopped, then frees their queues.
	void stop() shared
	{
		foreach(ref worker; workers)
		{
			worker.stop();
		}

		// Spin (yielding) until all worker threads have finished.
		while (!allWorkersStopped())
		{
			Thread.yield();
		}

		free();
	}

	/// Frees the queues of every worker.
	void free() shared
	{
		foreach(ref w; workers)
		{
			w.free();
		}
	}
}