tasklets.hh
Go to the documentation of this file.
1// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2// vi: set et ts=4 sw=4 sts=4:
3/*
4 This file is part of the Open Porous Media project (OPM).
5
6 OPM is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 2 of the License, or
9 (at your option) any later version.
10
11 OPM is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with OPM. If not, see <http://www.gnu.org/licenses/>.
18
19 Consult the COPYING file in the top-level source directory of this
20 module for the precise wording of the license and the list of
21 copyright holders.
22*/
27#ifndef EWOMS_TASKLETS_HH
28#define EWOMS_TASKLETS_HH
29
30#include <stdexcept>
31#include <cassert>
32#include <thread>
33#include <queue>
34#include <mutex>
35#include <iostream>
36#include <condition_variable>
37
38namespace Opm {
39
46{
47public:
48 TaskletInterface(int refCount = 1)
49 : referenceCount_(refCount)
50 {}
51 virtual ~TaskletInterface() {}
52 virtual void run() = 0;
53 virtual bool isEndMarker () const { return false; }
54
56 { -- referenceCount_; }
57
58 int referenceCount() const
59 { return referenceCount_; }
60
61private:
62 int referenceCount_;
63};
64
69template <class Fn>
71{
72public:
74 FunctionRunnerTasklet(int numInvocations, const Fn& fn)
75 : TaskletInterface(numInvocations)
76 , fn_(fn)
77 {}
78 void run() override
79 { fn_(); }
80
81private:
82 const Fn& fn_;
83};
84
85class TaskletRunner;
86
// This class stores the thread-local static attributes of the TaskletRunner class. They
// cannot be put directly into TaskletRunner: defining static members of a non-template
// class in a header causes multiple-definition errors at link time as soon as the header
// is included from more than one translation unit.
91template <class Dummy = void>
93{
94 static thread_local TaskletRunner* taskletRunner_;
95 static thread_local int workerThreadIndex_;
96};
97
98template <class Dummy>
100
101template <class Dummy>
103
111{
113 class BarrierTasklet : public TaskletInterface
114 {
115 public:
116 BarrierTasklet(unsigned numWorkers)
117 : TaskletInterface(/*refCount=*/numWorkers)
118 {
119 numWorkers_ = numWorkers;
120 numWaiting_ = 0;
121 }
122
123 void run()
124 { wait(); }
125
126 void wait()
127 {
128 std::unique_lock<std::mutex> lock(barrierMutex_);
129
130 numWaiting_ += 1;
131 if (numWaiting_ >= numWorkers_ + 1) {
132 lock.unlock();
133 barrierCondition_.notify_all();
134 }
135 else {
136 const auto& areAllWaiting =
137 [this]() -> bool
138 { return this->numWaiting_ >= this->numWorkers_ + 1; };
139
140 barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
141 }
142 }
143
144 private:
145 unsigned numWorkers_;
146 unsigned numWaiting_;
147
148 std::condition_variable barrierCondition_;
149 std::mutex barrierMutex_;
150 };
151
154 class TerminateThreadTasklet : public TaskletInterface
155 {
156 public:
157 void run()
158 { }
159
160 bool isEndMarker() const
161 { return true; }
162 };
163
164public:
165 // prohibit copying of tasklet runners
166 TaskletRunner(const TaskletRunner&) = delete;
167
174 TaskletRunner(unsigned numWorkers)
175 {
176 threads_.resize(numWorkers);
177 for (unsigned i = 0; i < numWorkers; ++i)
178 // create a worker thread
179 threads_[i].reset(new std::thread(startWorkerThread_, this, i));
180 }
181
190 {
191 if (threads_.size() > 0) {
192 // dispatch a tasklet which will terminate the worker thread
193 dispatch(std::make_shared<TerminateThreadTasklet>());
194
195 // wait until all worker threads have terminated
196 for (auto& thread : threads_)
197 thread->join();
198 }
199 }
200
207 {
209 return -1;
211 }
212
217 { return threads_.size(); }
218
224 void dispatch(std::shared_ptr<TaskletInterface> tasklet)
225 {
226 if (threads_.empty()) {
227 // run the tasklet immediately in synchronous mode.
228 while (tasklet->referenceCount() > 0) {
229 tasklet->dereference();
230 try {
231 tasklet->run();
232 }
233 catch (const std::exception& e) {
234 std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
235 }
236 catch (...) {
237 std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n";
238 }
239 }
240 }
241 else {
242 // lock mutex for the tasklet queue to make sure that nobody messes with the
243 // task queue
244 taskletQueueMutex_.lock();
245
246 // add the tasklet to the queue
247 taskletQueue_.push(tasklet);
248
249 taskletQueueMutex_.unlock();
250
251 workAvailableCondition_.notify_all();
252 }
253 }
254
258 template <class Fn>
259 std::shared_ptr<FunctionRunnerTasklet<Fn> > dispatchFunction(Fn &fn, int numInvocations=1)
260 {
261 using Tasklet = FunctionRunnerTasklet<Fn>;
262 auto tasklet = std::make_shared<Tasklet>(numInvocations, fn);
263 this->dispatch(tasklet);
264 return tasklet;
265 }
266
270 void barrier()
271 {
272 unsigned numWorkers = threads_.size();
273 if (numWorkers == 0)
274 // nothing needs to be done to implement a barrier in synchronous mode
275 return;
276
277 // dispatch a barrier tasklet and wait until it has been run by the worker thread
278 auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
279 dispatch(barrierTasklet);
280
281 barrierTasklet->wait();
282 }
283
284protected:
285 // main function of the worker thread
286 static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex)
287 {
290
291 taskletRunner->run_();
292 }
293
295 void run_()
296 {
297 while (true) {
298 // wait until tasklets have been pushed to the queue. first we need to lock
299 // mutex for access to taskletQueue_
300 std::unique_lock<std::mutex> lock(taskletQueueMutex_);
301
302 const auto& workIsAvailable =
303 [this]() -> bool
304 { return !taskletQueue_.empty(); };
305
306 if (!workIsAvailable())
307 workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable);
308
309 // remove tasklet from queue
310 std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
311
312 // if tasklet is an end marker, terminate the thread and DO NOT remove the
313 // tasklet.
314 if (tasklet->isEndMarker()) {
315 if(taskletQueue_.size() > 1)
316 throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
317 taskletQueueMutex_.unlock();
318 return;
319 }
320
321 tasklet->dereference();
322 if (tasklet->referenceCount() == 0)
323 // remove tasklets from the queue as soon as their reference count
324 // reaches zero, i.e. the tasklet has been run often enough.
325 taskletQueue_.pop();
326 lock.unlock();
327
328 // execute tasklet
329 try {
330 tasklet->run();
331 }
332 catch (const std::exception& e) {
333 std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
334 }
335 catch (...) {
336 std::cerr << "ERROR: Uncaught exception when running tasklet. Trying to continue.\n";
337 }
338 }
339 }
340
341 std::vector<std::unique_ptr<std::thread> > threads_;
342 std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
344 std::condition_variable workAvailableCondition_;
345};
346
347} // end namespace Opm
348#endif
A simple tasklet that runs a function that returns void and does not take any arguments a given number of times.
Definition: tasklets.hh:71
FunctionRunnerTasklet(int numInvocations, const Fn &fn)
Definition: tasklets.hh:74
FunctionRunnerTasklet(const FunctionRunnerTasklet &)=default
void run() override
Definition: tasklets.hh:78
The base class for tasklets.
Definition: tasklets.hh:46
virtual bool isEndMarker() const
Definition: tasklets.hh:53
TaskletInterface(int refCount=1)
Definition: tasklets.hh:48
int referenceCount() const
Definition: tasklets.hh:58
void dereference()
Definition: tasklets.hh:55
virtual ~TaskletInterface()
Definition: tasklets.hh:51
virtual void run()=0
Handles where a given tasklet is run.
Definition: tasklets.hh:111
int workerThreadIndex() const
Returns the index of the current worker thread.
Definition: tasklets.hh:206
int numWorkerThreads() const
Returns the number of worker threads for the tasklet runner.
Definition: tasklets.hh:216
void run_()
do the work until the queue received an end tasklet
Definition: tasklets.hh:295
std::mutex taskletQueueMutex_
Definition: tasklets.hh:343
~TaskletRunner()
Destructor.
Definition: tasklets.hh:189
std::condition_variable workAvailableCondition_
Definition: tasklets.hh:344
void barrier()
Make sure that all tasklets have been completed after this method has been called.
Definition: tasklets.hh:270
std::shared_ptr< FunctionRunnerTasklet< Fn > > dispatchFunction(Fn &fn, int numInvocations=1)
Convenience method to construct a new function runner tasklet and dispatch it immediately.
Definition: tasklets.hh:259
void dispatch(std::shared_ptr< TaskletInterface > tasklet)
Add a new tasklet.
Definition: tasklets.hh:224
TaskletRunner(unsigned numWorkers)
Creates a tasklet runner with numWorkers underlying threads for doing work.
Definition: tasklets.hh:174
static void startWorkerThread_(TaskletRunner *taskletRunner, int workerThreadIndex)
Definition: tasklets.hh:286
std::vector< std::unique_ptr< std::thread > > threads_
Definition: tasklets.hh:341
TaskletRunner(const TaskletRunner &)=delete
std::queue< std::shared_ptr< TaskletInterface > > taskletQueue_
Definition: tasklets.hh:342
void reset()
Definition: parametersystem.hh:898
Definition: blackoilboundaryratevector.hh:37
Definition: tasklets.hh:93
static thread_local TaskletRunner * taskletRunner_
Definition: tasklets.hh:94
static thread_local int workerThreadIndex_
Definition: tasklets.hh:95