tasklets.hh
Go to the documentation of this file.
1// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2// vi: set et ts=4 sw=4 sts=4:
3/*
4 This file is part of the Open Porous Media project (OPM).
5
6 OPM is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 2 of the License, or
9 (at your option) any later version.
10
11 OPM is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with OPM. If not, see <http://www.gnu.org/licenses/>.
18
19 Consult the COPYING file in the top-level source directory of this
20 module for the precise wording of the license and the list of
21 copyright holders.
22*/
27#ifndef EWOMS_TASKLETS_HH
28#define EWOMS_TASKLETS_HH
29
30#include <atomic>
31#include <stdexcept>
32#include <cassert>
33#include <thread>
34#include <queue>
35#include <mutex>
36#include <iostream>
37#include <condition_variable>
38
39namespace Opm {
40
47{
48public:
49 TaskletInterface(int refCount = 1)
50 : referenceCount_(refCount)
51 {}
52 virtual ~TaskletInterface() {}
53 virtual void run() = 0;
54 virtual bool isEndMarker () const { return false; }
55
57 { -- referenceCount_; }
58
59 int referenceCount() const
60 { return referenceCount_; }
61
62private:
63 int referenceCount_;
64};
65
70template <class Fn>
72{
73public:
75 FunctionRunnerTasklet(int numInvocations, const Fn& fn)
76 : TaskletInterface(numInvocations)
77 , fn_(fn)
78 {}
79 void run() override
80 { fn_(); }
81
82private:
83 const Fn& fn_;
84};
85
86class TaskletRunner;
87
88// this class stores the thread local static attributes for the TaskletRunner class. we
89// cannot put them directly into TaskletRunner because defining static members for
90// non-template classes in headers leads the linker to choke in case multiple compile
91// units are used.
92template <class Dummy = void>
94{
95 static thread_local TaskletRunner* taskletRunner_;
96 static thread_local int workerThreadIndex_;
97};
98
99template <class Dummy>
101
102template <class Dummy>
104
112{
114 class BarrierTasklet : public TaskletInterface
115 {
116 public:
117 BarrierTasklet(unsigned numWorkers)
118 : TaskletInterface(/*refCount=*/numWorkers)
119 {
120 numWorkers_ = numWorkers;
121 numWaiting_ = 0;
122 }
123
124 void run()
125 { wait(); }
126
127 void wait()
128 {
129 std::unique_lock<std::mutex> lock(barrierMutex_);
130
131 numWaiting_ += 1;
132 if (numWaiting_ >= numWorkers_ + 1) {
133 lock.unlock();
134 barrierCondition_.notify_all();
135 }
136 else {
137 const auto& areAllWaiting =
138 [this]() -> bool
139 { return this->numWaiting_ >= this->numWorkers_ + 1; };
140
141 barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
142 }
143 }
144
145 private:
146 unsigned numWorkers_;
147 unsigned numWaiting_;
148
149 std::condition_variable barrierCondition_;
150 std::mutex barrierMutex_;
151 };
152
155 class TerminateThreadTasklet : public TaskletInterface
156 {
157 public:
158 void run()
159 { }
160
161 bool isEndMarker() const
162 { return true; }
163 };
164
165public:
166 // prohibit copying of tasklet runners
167 TaskletRunner(const TaskletRunner&) = delete;
168
175 TaskletRunner(unsigned numWorkers)
176 {
177 threads_.resize(numWorkers);
178 for (unsigned i = 0; i < numWorkers; ++i)
179 // create a worker thread
180 threads_[i].reset(new std::thread(startWorkerThread_, this, i));
181 }
182
191 {
192 if (threads_.size() > 0) {
193 // dispatch a tasklet which will terminate the worker thread
194 dispatch(std::make_shared<TerminateThreadTasklet>());
195
196 // wait until all worker threads have terminated
197 for (auto& thread : threads_)
198 thread->join();
199 }
200 }
201
202 bool failure() const
203 {
204 return this->failureFlag_.load(std::memory_order_relaxed);
205 }
206
213 {
215 return -1;
217 }
218
223 { return threads_.size(); }
224
230 void dispatch(std::shared_ptr<TaskletInterface> tasklet)
231 {
232 if (threads_.empty()) {
233 // run the tasklet immediately in synchronous mode.
234 while (tasklet->referenceCount() > 0) {
235 tasklet->dereference();
236 try {
237 tasklet->run();
238 }
239 catch (const std::exception& e) {
240 std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
241 failureFlag_.store(true, std::memory_order_relaxed);
242 }
243 catch (...) {
244 std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n";
245 failureFlag_.store(true, std::memory_order_relaxed);
246 }
247 }
248 }
249 else {
250 // lock mutex for the tasklet queue to make sure that nobody messes with the
251 // task queue
252 taskletQueueMutex_.lock();
253
254 // add the tasklet to the queue
255 taskletQueue_.push(tasklet);
256
257 taskletQueueMutex_.unlock();
258
259 workAvailableCondition_.notify_all();
260 }
261 }
262
266 template <class Fn>
267 std::shared_ptr<FunctionRunnerTasklet<Fn> > dispatchFunction(Fn &fn, int numInvocations=1)
268 {
269 using Tasklet = FunctionRunnerTasklet<Fn>;
270 auto tasklet = std::make_shared<Tasklet>(numInvocations, fn);
271 this->dispatch(tasklet);
272 return tasklet;
273 }
274
278 void barrier()
279 {
280 unsigned numWorkers = threads_.size();
281 if (numWorkers == 0)
282 // nothing needs to be done to implement a barrier in synchronous mode
283 return;
284
285 // dispatch a barrier tasklet and wait until it has been run by the worker thread
286 auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
287 dispatch(barrierTasklet);
288
289 barrierTasklet->wait();
290 }
291private:
292 // Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails.
293 // This flag is checked before new tasklets run or get dispatched and in case it is true, the
294 // thread execution will be stopped / no new tasklets will be started and the program will abort.
295 // To set the flag and load the flag, we use std::memory_order_relaxed.
296 // Atomic operations tagged memory_order_relaxed are not synchronization operations; they do not
297 // impose an order among concurrent memory accesses. They guarantee atomicity and modification order
298 // consistency. This is the right choice for the setting here, since it is enough to broadcast failure
299 // before new run or get dispatched.
300 std::atomic<bool> failureFlag_ = false;
301
302protected:
303 // main function of the worker thread
304 static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex)
305 {
308
309 taskletRunner->run_();
310 }
311
313 void run_()
314 {
315 while (true) {
316
317 // wait until tasklets have been pushed to the queue. first we need to lock
318 // mutex for access to taskletQueue_
319 std::unique_lock<std::mutex> lock(taskletQueueMutex_);
320
321 const auto& workIsAvailable =
322 [this]() -> bool
323 { return !taskletQueue_.empty(); };
324
325 if (!workIsAvailable())
326 workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable);
327
328 // remove tasklet from queue
329 std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
330
331 // if tasklet is an end marker, terminate the thread and DO NOT remove the
332 // tasklet.
333 if (tasklet->isEndMarker()) {
334 if(taskletQueue_.size() > 1)
335 throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
336 taskletQueueMutex_.unlock();
337 return;
338 }
339
340 tasklet->dereference();
341 if (tasklet->referenceCount() == 0)
342 // remove tasklets from the queue as soon as their reference count
343 // reaches zero, i.e. the tasklet has been run often enough.
344 taskletQueue_.pop();
345 lock.unlock();
346
347 // execute tasklet
348 try {
349 tasklet->run();
350 }
351 catch (const std::exception& e) {
352 std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n";
353 failureFlag_.store(true, std::memory_order_relaxed);
354 }
355 catch (...) {
356 std::cerr << "ERROR: Uncaught exception when running tasklet.\n";
357 failureFlag_.store(true, std::memory_order_relaxed);
358 }
359 }
360 }
361
362 std::vector<std::unique_ptr<std::thread> > threads_;
363 std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
365 std::condition_variable workAvailableCondition_;
366};
367
368} // end namespace Opm
369#endif
A simple tasklet that runs a function that returns void and does not take any arguments a given numbe...
Definition: tasklets.hh:72
FunctionRunnerTasklet(int numInvocations, const Fn &fn)
Definition: tasklets.hh:75
FunctionRunnerTasklet(const FunctionRunnerTasklet &)=default
void run() override
Definition: tasklets.hh:79
The base class for tasklets.
Definition: tasklets.hh:47
virtual bool isEndMarker() const
Definition: tasklets.hh:54
TaskletInterface(int refCount=1)
Definition: tasklets.hh:49
int referenceCount() const
Definition: tasklets.hh:59
void dereference()
Definition: tasklets.hh:56
virtual ~TaskletInterface()
Definition: tasklets.hh:52
virtual void run()=0
Handles where a given tasklet is run.
Definition: tasklets.hh:112
int workerThreadIndex() const
Returns the index of the current worker thread.
Definition: tasklets.hh:212
bool failure() const
Definition: tasklets.hh:202
int numWorkerThreads() const
Returns the number of worker threads for the tasklet runner.
Definition: tasklets.hh:222
void run_()
do the work until the queue received an end tasklet
Definition: tasklets.hh:313
std::mutex taskletQueueMutex_
Definition: tasklets.hh:364
~TaskletRunner()
Destructor.
Definition: tasklets.hh:190
std::condition_variable workAvailableCondition_
Definition: tasklets.hh:365
void barrier()
Make sure that all tasklets have been completed after this method has been called.
Definition: tasklets.hh:278
std::shared_ptr< FunctionRunnerTasklet< Fn > > dispatchFunction(Fn &fn, int numInvocations=1)
Convenience method to construct a new function runner tasklet and dispatch it immediately.
Definition: tasklets.hh:267
void dispatch(std::shared_ptr< TaskletInterface > tasklet)
Add a new tasklet.
Definition: tasklets.hh:230
TaskletRunner(unsigned numWorkers)
Creates a tasklet runner with numWorkers underling threads for doing work.
Definition: tasklets.hh:175
static void startWorkerThread_(TaskletRunner *taskletRunner, int workerThreadIndex)
Definition: tasklets.hh:304
std::vector< std::unique_ptr< std::thread > > threads_
Definition: tasklets.hh:362
TaskletRunner(const TaskletRunner &)=delete
std::queue< std::shared_ptr< TaskletInterface > > taskletQueue_
Definition: tasklets.hh:363
void reset()
Definition: parametersystem.hh:936
Definition: blackoilboundaryratevector.hh:37
Definition: tasklets.hh:94
static thread_local TaskletRunner * taskletRunner_
Definition: tasklets.hh:95
static thread_local int workerThreadIndex_
Definition: tasklets.hh:96