opm-grid
p2pcommunicator.hh
1 /*
2  Copyright 2015 IRIS AS
3 
4  This file is part of the Open Porous Media project (OPM).
5 
6  OPM is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  OPM is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with OPM. If not, see <http://www.gnu.org/licenses/>.
18 */
19 #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
20 #define DUNE_COMMUNICATOR_HEADER_INCLUDED
21 
22 #include <cassert>
23 #include <algorithm>
24 #include <vector>
25 #include <set>
26 #include <map>
27 
28 #include <dune/common/parallel/mpihelper.hh>
29 #include <dune/common/parallel/communication.hh>
30 
31 // the following implementation is only available in case MPI is available
32 #if HAVE_MPI
33 #include <dune/common/parallel/mpicommunication.hh>
34 #endif
35 
36 namespace Dune
37 {
39  {
40  typedef std::vector< char > BufferType;
41 
42  mutable BufferType buffer_;
43  const double factor_;
44  mutable size_t pos_;
45 public:
48  explicit SimpleMessageBuffer( const double factor = 1.1 )
49  : buffer_(), factor_( factor )
50  {
52  }
53 
55  void clear() { buffer_.clear(); resetReadPosition(); }
57  void resetReadPosition() { pos_ = 0 ; }
59  size_t size() const { return buffer_.size(); }
60 
62  void reserve( const size_t size )
63  {
64  buffer_.reserve( size );
65  }
66 
68  void resize( const size_t size )
69  {
70  buffer_.resize( size );
71  }
72 
74  template <class T>
75  void write( const T& value )
76  {
77  // union to access bytes in value
78  const size_t tsize = sizeof( T );
79  size_t pos = buffer_.size();
80  const size_t sizeNeeded = pos + tsize ;
81  // reserve with some 10% overestimation
82  if( buffer_.capacity() < sizeNeeded )
83  {
84  reserve( size_t(factor_ * sizeNeeded) ) ;
85  }
86  // resize to size need to store value
87  buffer_.resize( sizeNeeded );
88  // copy value to buffer
89  std::copy_n( reinterpret_cast<const char *> (&value), tsize, buffer_.data()+pos );
90  }
91 
92  void write( const std::string& str)
93  {
94  int size = str.size();
95  write(size);
96  for (int k = 0; k < size; ++k) {
97  write(str[k]);
98  }
99  }
100 
102  template <class T>
103  void read( T& value ) const
104  {
105  // read bytes from stream and store in value
106  const size_t tsize = sizeof( T );
107  assert( pos_ + tsize <= buffer_.size() );
108  std::copy_n( buffer_.data()+pos_, tsize, reinterpret_cast<char *> (&value) );
109  pos_ += tsize;
110  }
111 
112  void read( std::string& str) const
113  {
114  int size = 0;
115  read(size);
116  str.resize(size);
117  for (int k = 0; k < size; ++k) {
118  read(str[k]);
119  }
120  }
121 
123  std::pair< char* , int > buffer() const
124  {
125  return std::make_pair( buffer_.data(), int(buffer_.size()) );
126  }
127  };
128 
130  template < class MsgBuffer >
131  class Point2PointCommunicator : public Dune::Communication< MPIHelper::MPICommunicator >
132  {
133  public:
135  typedef MPIHelper::MPICommunicator MPICommunicator ;
136 
138  typedef MsgBuffer MessageBufferType ;
139 
140  protected:
141  using BaseType = Dune::Communication<MPICommunicator>;
143 
144  // starting message tag
145  static const int messagetag = 234;
146 
147  typedef std::map< int, int > linkage_t;
148  typedef std::vector< int > vector_t;
149 
150  linkage_t sendLinkage_ ;
151  linkage_t recvLinkage_ ;
152 
153  vector_t sendDest_ ;
154  vector_t recvSource_ ;
155 
156  mutable vector_t _recvBufferSizes;
157  mutable bool _recvBufferSizesComputed;
158 
159  public :
160  using BaseType :: rank;
161  using BaseType :: size;
162 
163  /* \brief data handle interface that needs to be implemented for use with some of
164  * the exchange methods */
166  {
167  protected:
168  DataHandleInterface () {}
169  public:
170  virtual ~DataHandleInterface () {}
171  virtual void pack( const int link, MessageBufferType& os ) = 0 ;
172  virtual void unpack( const int link, MessageBufferType& os ) = 0 ;
173  // should contain work that could be done between send and receive
174  virtual void localComputation () {}
175  };
176 
177  public:
179  explicit Point2PointCommunicator( const MPICommunicator& mpiComm = MPIHelper::getCommunicator() )
180  : BaseType( mpiComm ) { removeLinkage(); }
181 
183  explicit Point2PointCommunicator( const BaseType& comm ) : BaseType( comm ) { removeLinkage(); }
184 
185 
187  inline void insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks );
188 
190  inline int sendLinks () const { return sendLinkage_.size(); }
191 
193  inline int recvLinks () const { return recvLinkage_.size(); }
194 
196  const vector_t& recvBufferSizes() const { return _recvBufferSizes; }
197 
199  inline int sendLink (const int rank) const
200  {
201  assert (sendLinkage_.end () != sendLinkage_.find (rank)) ;
202  return (* sendLinkage_.find (rank)).second ;
203  }
204 
206  inline int recvLink (const int rank) const
207  {
208  assert (recvLinkage_.end () != recvLinkage_.find (rank)) ;
209  return (* recvLinkage_.find (rank)).second ;
210  }
211 
213  const std::vector< int > &sendDest () const { return sendDest_; }
215  const std::vector< int > &recvSource () const { return recvSource_; }
216 
218  inline void removeLinkage () ;
219 
221  virtual std::vector< MessageBufferType > exchange (const std::vector< MessageBufferType > &) const;
222 
224  virtual void exchange ( DataHandleInterface& ) const;
225 
229  virtual void exchangeCached ( DataHandleInterface& ) const;
230 
231  protected:
232  inline void computeDestinations( const linkage_t& linkage, vector_t& dest );
233 
234  // return new tag number for the exchange messages
235  int getMessageTag(const unsigned int increment) const
236  {
237  const int retTag = this->tag_;
238  this->generateNextMessageTag(increment);
239  return retTag;
240  }
241 
242  // return new tag number for the exchange messages
243  int getMessageTag() const
244  {
245  return this->getMessageTag(1u);
246  }
247 
248  private:
249  mutable int tag_{messagetag + 2};
250 
251  void generateNextMessageTag(const unsigned int increment) const
252  {
253  this->tag_ += increment;
254 
255  // Reset to initial value if next tag exceeds MPI standard's maximum
256  // message tag guarantee of 2^15-1.
257  if (this->tag_ >= (1 << 15) - 1)
258  {
259  this->tag_ = messagetag + 2;
260  }
261  }
262  };
263 
264 } // namespace Dune
265 
266 // include inline implementation
267 #include "p2pcommunicator_impl.hh"
268 
269 #endif // #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
SimpleMessageBuffer(const double factor=1.1)
constructor taking memory reserve estimation factor (default is 1.1, i.e.
Definition: p2pcommunicator.hh:48
virtual void exchangeCached(DataHandleInterface &) const
exchange data with peers, handle defines pack and unpack of data, if receive buffers are known from p...
Definition: p2pcommunicator_impl.hh:620
std::pair< char *, int > buffer() const
return pointer to buffer and size for use with MPI functions
Definition: p2pcommunicator.hh:123
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
void read(T &value) const
read value from buffer, value must implement the operator= correctly (i.e.
Definition: p2pcommunicator.hh:103
void resize(const size_t size)
resize buffer to &#39;size&#39; entries
Definition: p2pcommunicator.hh:68
void write(const T &value)
write value to buffer, value must implement the operator= correctly (i.e.
Definition: p2pcommunicator.hh:75
Point2PointCommunicator(const MPICommunicator &mpiComm=MPIHelper::getCommunicator())
constructor taking mpi communicator
Definition: p2pcommunicator.hh:179
The namespace Dune is the main namespace for all Dune code.
Definition: CartesianIndexMapper.hpp:9
int recvLink(const int rank) const
return recv link number for a given recv rank number
Definition: p2pcommunicator.hh:206
const std::vector< int > & sendDest() const
return vector containing all process numbers we will send to
Definition: p2pcommunicator.hh:213
size_t size() const
return size of buffer
Definition: p2pcommunicator.hh:59
MsgBuffer MessageBufferType
type of message buffer used
Definition: p2pcommunicator.hh:138
void clear()
clear the buffer
Definition: p2pcommunicator.hh:55
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:600
Definition: p2pcommunicator.hh:165
int recvLinks() const
return number of processes we will receive data from
Definition: p2pcommunicator.hh:193
int sendLinks() const
return number of processes we will send data to
Definition: p2pcommunicator.hh:190
Point-2-Point communicator for exchange messages between processes.
Definition: p2pcommunicator.hh:131
MPIHelper::MPICommunicator MPICommunicator
type of MPI communicator, either MPI_Comm or NoComm as defined in MPIHelper
Definition: p2pcommunicator.hh:135
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set os ranks to send to and a set of ranks to receive from ...
Definition: p2pcommunicator_impl.hh:59
const std::vector< int > & recvSource() const
return vector containing all process numbers we will receive from
Definition: p2pcommunicator.hh:215
Definition: p2pcommunicator.hh:38
void resetReadPosition()
reset read position of buffer to beginning
Definition: p2pcommunicator.hh:57
void reserve(const size_t size)
reserve memory for &#39;size&#39; entries
Definition: p2pcommunicator.hh:62
const vector_t & recvBufferSizes() const
return vector containing possible recv buffer sizes
Definition: p2pcommunicator.hh:196
int sendLink(const int rank) const
return send link number for a given send rank number
Definition: p2pcommunicator.hh:199
Point2PointCommunicator(const BaseType &comm)
constructor taking collective communication
Definition: p2pcommunicator.hh:183