p2pcommunicator.hh
Go to the documentation of this file.
1/*
2 Copyright 2015 IRIS AS
3
4 This file is part of the Open Porous Media project (OPM).
5
6 OPM is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 OPM is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with OPM. If not, see <http://www.gnu.org/licenses/>.
18*/
19#ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
20#define DUNE_COMMUNICATOR_HEADER_INCLUDED
21
22#include <cassert>
23#include <algorithm>
24#include <vector>
25#include <set>
26#include <map>
27
28#include <dune/common/parallel/mpihelper.hh>
29#include <dune/common/parallel/communication.hh>
30
31// the following implementation is only available in case MPI is available
32#if HAVE_MPI
33#include <dune/common/parallel/mpicommunication.hh>
34#endif
35
36namespace Dune
37{
39 {
40 typedef std::vector< char > BufferType;
41
42 mutable BufferType buffer_;
43 const double factor_;
44 mutable size_t pos_;
45public:
48 SimpleMessageBuffer( const double factor = 1.1 )
49 : buffer_(), factor_( factor )
50 {
52 }
53
55 void clear() { buffer_.clear(); resetReadPosition(); }
57 void resetReadPosition() { pos_ = 0 ; }
59 size_t size() const { return buffer_.size(); }
60
62 void reserve( const size_t size )
63 {
64 buffer_.reserve( size );
65 }
66
68 void resize( const size_t size )
69 {
70 buffer_.resize( size );
71 }
72
74 template <class T>
75 void write( const T& value )
76 {
77 // union to access bytes in value
78 const size_t tsize = sizeof( T );
79 size_t pos = buffer_.size();
80 const size_t sizeNeeded = pos + tsize ;
81 // reserve with some 10% overestimation
82 if( buffer_.capacity() < sizeNeeded )
83 {
84 reserve( size_t(factor_ * sizeNeeded) ) ;
85 }
86 // resize to size need to store value
87 buffer_.resize( sizeNeeded );
88 // copy value to buffer
89 std::copy_n( reinterpret_cast<const char *> (&value), tsize, buffer_.data()+pos );
90 }
91
92 void write( const std::string& str)
93 {
94 int size = str.size();
95 write(size);
96 for (int k = 0; k < size; ++k) {
97 write(str[k]);
98 }
99 }
100
102 template <class T>
103 void read( T& value ) const
104 {
105 // read bytes from stream and store in value
106 const size_t tsize = sizeof( T );
107 assert( pos_ + tsize <= buffer_.size() );
108 std::copy_n( buffer_.data()+pos_, tsize, reinterpret_cast<char *> (&value) );
109 pos_ += tsize;
110 }
111
112 void read( std::string& str) const
113 {
114 int size = 0;
115 read(size);
116 str.resize(size);
117 for (int k = 0; k < size; ++k) {
118 read(str[k]);
119 }
120 }
121
123 std::pair< char* , int > buffer() const
124 {
125 return std::make_pair( buffer_.data(), int(buffer_.size()) );
126 }
127 };
128
130 template < class MsgBuffer >
131 class Point2PointCommunicator : public CollectiveCommunication< MPIHelper::MPICommunicator >
132 {
133 public:
135 typedef MPIHelper::MPICommunicator MPICommunicator ;
136
138 typedef MsgBuffer MessageBufferType ;
139
140 protected:
141 using BaseType = Dune::Communication<MPICommunicator>;
143
144 // starting message tag
145 static const int messagetag = 234;
146
147 typedef std::map< int, int > linkage_t;
148 typedef std::vector< int > vector_t;
149
152
155
158
159 public :
160 using BaseType :: rank;
161 using BaseType :: size;
162
163 /* \brief data handle interface that needs to be implemented for use with some of
164 * the exchange methods */
166 {
167 protected:
169 public:
171 virtual void pack( const int link, MessageBufferType& os ) = 0 ;
172 virtual void unpack( const int link, MessageBufferType& os ) = 0 ;
173 // should contain work that could be done between send and receive
174 virtual void localComputation () {}
175 };
176
177 public:
179 Point2PointCommunicator( const MPICommunicator& mpiComm = MPIHelper::getCommunicator() )
180 : BaseType( mpiComm ) { removeLinkage(); }
181
184
185
187 inline void insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks );
188
190 inline int sendLinks () const { return sendLinkage_.size(); }
191
193 inline int recvLinks () const { return recvLinkage_.size(); }
194
196 const vector_t& recvBufferSizes() const { return _recvBufferSizes; }
197
199 inline int sendLink (const int rank) const
200 {
201 assert (sendLinkage_.end () != sendLinkage_.find (rank)) ;
202 return (* sendLinkage_.find (rank)).second ;
203 }
204
206 inline int recvLink (const int rank) const
207 {
208 assert (recvLinkage_.end () != recvLinkage_.find (rank)) ;
209 return (* recvLinkage_.find (rank)).second ;
210 }
211
213 const std::vector< int > &sendDest () const { return sendDest_; }
215 const std::vector< int > &recvSource () const { return recvSource_; }
216
218 inline void removeLinkage () ;
219
221 virtual std::vector< MessageBufferType > exchange (const std::vector< MessageBufferType > &) const;
222
224 virtual void exchange ( DataHandleInterface& ) const;
225
229 virtual void exchangeCached ( DataHandleInterface& ) const;
230
231 protected:
232 inline void computeDestinations( const linkage_t& linkage, vector_t& dest );
233
234 // return new tag number for the exchange messages
235 int getMessageTag(const unsigned int increment) const
236 {
237 const int retTag = this->tag_;
238 this->generateNextMessageTag(increment);
239 return retTag;
240 }
241
242 // return new tag number for the exchange messages
243 int getMessageTag() const
244 {
245 return this->getMessageTag(1u);
246 }
247
248 private:
249 mutable int tag_{messagetag + 2};
250
251 void generateNextMessageTag(const unsigned int increment) const
252 {
253 this->tag_ += increment;
254
255 // Reset to initial value if next tag exceeds MPI standard's maximum
256 // message tag guarantee of 2^15-1.
257 if (this->tag_ >= (1 << 15) - 1)
258 {
259 this->tag_ = messagetag + 2;
260 }
261 }
262 };
263
264} // namespace Dune
265
266// include inline implementation
268
269#endif // #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
Dune::Point2PointCommunicator::DataHandleInterface
Definition: p2pcommunicator.hh:166
virtual void unpack(const int link, MessageBufferType &os)=0
virtual ~DataHandleInterface()
Definition: p2pcommunicator.hh:170
virtual void pack(const int link, MessageBufferType &os)=0
DataHandleInterface()
Definition: p2pcommunicator.hh:168
virtual void localComputation()
Definition: p2pcommunicator.hh:174
Point-2-Point communicator for exchange messages between processes.
Definition: p2pcommunicator.hh:132
MsgBuffer MessageBufferType
type of message buffer used
Definition: p2pcommunicator.hh:138
linkage_t sendLinkage_
Definition: p2pcommunicator.hh:150
Point2PointCommunicator< MessageBufferType > ThisType
Definition: p2pcommunicator.hh:142
int recvLinks() const
return number of processes we will receive data from
Definition: p2pcommunicator.hh:193
int recvLink(const int rank) const
return recv link number for a given recv rank number
Definition: p2pcommunicator.hh:206
int getMessageTag() const
Definition: p2pcommunicator.hh:243
Dune::Communication< MPICommunicator > BaseType
Definition: p2pcommunicator.hh:141
bool _recvBufferSizesComputed
Definition: p2pcommunicator.hh:157
const std::vector< int > & sendDest() const
return vector containing all process numbers we will send to
Definition: p2pcommunicator.hh:213
const std::vector< int > & recvSource() const
return vector containing all process numbers we will receive from
Definition: p2pcommunicator.hh:215
MPIHelper::MPICommunicator MPICommunicator
type of MPI communicator, either MPI_Comm or NoComm as defined in MPIHelper
Definition: p2pcommunicator.hh:135
std::map< int, int > linkage_t
Definition: p2pcommunicator.hh:147
int sendLink(const int rank) const
return send link number for a given send rank number
Definition: p2pcommunicator.hh:199
virtual void exchangeCached(DataHandleInterface &) const
exchange data with peers, handle defines pack and unpack of data, if receive buffers are known from p...
Definition: p2pcommunicator_impl.hh:617
vector_t sendDest_
Definition: p2pcommunicator.hh:153
int sendLinks() const
return number of processes we will send data to
Definition: p2pcommunicator.hh:190
vector_t recvSource_
Definition: p2pcommunicator.hh:154
linkage_t recvLinkage_
Definition: p2pcommunicator.hh:151
vector_t _recvBufferSizes
Definition: p2pcommunicator.hh:156
const vector_t & recvBufferSizes() const
return vector containing possible recv buffer sizes
Definition: p2pcommunicator.hh:196
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:599
Point2PointCommunicator(const MPICommunicator &mpiComm=MPIHelper::getCommunicator())
constructor taking mpi communicator
Definition: p2pcommunicator.hh:179
static const int messagetag
Definition: p2pcommunicator.hh:145
int getMessageTag(const unsigned int increment) const
Definition: p2pcommunicator.hh:235
std::vector< int > vector_t
Definition: p2pcommunicator.hh:148
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from
Definition: p2pcommunicator_impl.hh:59
void computeDestinations(const linkage_t &linkage, vector_t &dest)
Definition: p2pcommunicator_impl.hh:45
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
Point2PointCommunicator(const BaseType &comm)
constructor taking collective communication
Definition: p2pcommunicator.hh:183
Dune::SimpleMessageBuffer
Definition: p2pcommunicator.hh:39
void write(const std::string &str)
Definition: p2pcommunicator.hh:92
void resetReadPosition()
reset read position of buffer to beginning
Definition: p2pcommunicator.hh:57
void read(std::string &str) const
Definition: p2pcommunicator.hh:112
size_t size() const
return size of buffer
Definition: p2pcommunicator.hh:59
std::pair< char *, int > buffer() const
return pointer to buffer and size for use with MPI functions
Definition: p2pcommunicator.hh:123
void clear()
clear the buffer
Definition: p2pcommunicator.hh:55
void reserve(const size_t size)
reserve memory for 'size' entries
Definition: p2pcommunicator.hh:62
void resize(const size_t size)
resize buffer to 'size' entries
Definition: p2pcommunicator.hh:68
SimpleMessageBuffer(const double factor=1.1)
constructor taking memory reserve estimation factor (default is 1.1, i.e. 10% over estimation )
Definition: p2pcommunicator.hh:48
void read(T &value) const
read value from buffer, value must implement the operator= correctly (i.e. no internal pointers etc....
Definition: p2pcommunicator.hh:103
void write(const T &value)
write value to buffer, value must implement the operator= correctly (i.e. no internal pointers etc....
Definition: p2pcommunicator.hh:75
The namespace Dune is the main namespace for all Dune code.
Definition: common/CartesianIndexMapper.hpp:10
T * increment(T *cc, int i, int dim)
Increment an iterator over an array that represents a dense row-major matrix with dims columns.
Definition: GridHelpers.hpp:354