opm-grid
p2pcommunicator_impl.hh
1 /*
2  Copyright 2015 IRIS AS
3 
4  This file is part of the Open Porous Media project (OPM).
5 
6  OPM is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  OPM is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with OPM. If not, see <http://www.gnu.org/licenses/>.
18 */
19 #ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
20 #define DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
21 
22 #include <iostream>
23 
24 namespace Dune
25 {
26 
27  template <class MsgBuffer>
28  inline void
31  {
32  sendLinkage_.clear();
33  recvLinkage_.clear();
34  sendDest_.clear();
35  recvSource_.clear();
36 
37  // clear previously stored buffer sizes
38  _recvBufferSizes.clear();
39  _recvBufferSizesComputed = false ;
40  }
41 
42  template <class MsgBuffer>
43  inline void
45  computeDestinations( const linkage_t& linkage, vector_t& dest )
46  {
47  typedef linkage_t::const_iterator const_iterator ;
48  dest.resize( linkage.size() );
49  const const_iterator linkageEnd = linkage.end ();
50  for( const_iterator i = linkage.begin (); i != linkageEnd; ++i )
51  {
52  dest[ (*i).second ] = (*i).first;
53  }
54  }
55 
56  template <class MsgBuffer>
57  inline void
59  insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks )
60  {
61  // remove old linkage
62  removeLinkage();
63 
64  const int me_rank = rank ();
65 
66  {
67  typedef std::map< int, int >::iterator iterator ;
68  typedef std::set< int >::const_iterator const_iterator;
69 
70  const iterator sendEnd = sendLinkage_.end ();
71  const iterator recvEnd = recvLinkage_.end ();
72  const const_iterator sendLinksEnd = sendLinks.end ();
73  int sendLink = 0 ;
74  int recvLink = 0 ;
75  for (const_iterator i = sendLinks.begin (); i != sendLinksEnd; ++i )
76  {
77  const int rank = (*i);
78  // if rank was not inserted, insert with current link number
79  if( rank != me_rank && (sendLinkage_.find ( rank ) == sendEnd ) )
80  {
81  sendLinkage_.insert( std::make_pair( rank, sendLink++) );
82  }
83  }
84 
85  const const_iterator recvLinksEnd = recvLinks.end ();
86  for (const_iterator i = recvLinks.begin (); i != recvLinksEnd; ++i )
87  {
88  const int rank = (*i);
89  // if rank was not inserted, insert with current link number
90  if( rank != me_rank && (recvLinkage_.find ( rank ) == recvEnd ) )
91  {
92  recvLinkage_.insert( std::make_pair( rank, recvLink++) );
93  }
94  }
95  }
96 
97  // compute send destinations
98  computeDestinations( sendLinkage_, sendDest_ );
99 
100  // compute send destinations
101  computeDestinations( recvLinkage_, recvSource_ );
102  }
103 
104 
// non-blocking communication object
// this class is defined here since it contains MPI information
#if HAVE_MPI

// Performs one exchange over the linkage stored in a point-to-point
// communicator using non-blocking MPI sends.
//
// Two receive modes exist:
//  - receive buffer sizes unknown (default): incoming messages are detected
//    with MPI_Iprobe and then received with a blocking MPI_Recv;
//  - receive buffer sizes known: receives are posted up front with MPI_Irecv
//    using the sizes provided by the communicator and polled with MPI_Test.
//
// All request handles start out as MPI_REQUEST_NULL, so waiting on or
// testing a request that was never posted is well defined.
template < class P2PCommunicator >
class NonBlockingExchangeImplementation
{
  typedef P2PCommunicator P2PCommunicatorType ;
  typedef std::vector< MPI_Request > RequestVectorType ;

  const P2PCommunicatorType& _p2pCommunicator;

  const int _sendLinks;
  const int _recvLinks;
  const int _tag;

  // one request per send link (empty if there are no send links)
  RequestVectorType _sendRequest;
  // one request per recv link (empty unless receive buffer sizes are known)
  RequestVectorType _recvRequest;

  const bool _recvBufferSizesKnown;
  bool _needToSend ;

  // no copying
  NonBlockingExchangeImplementation( const NonBlockingExchangeImplementation& );
  NonBlockingExchangeImplementation& operator= ( const NonBlockingExchangeImplementation& );

  // return vector of 'count' null requests (empty vector if count <= 0)
  static RequestVectorType createRequests( const int count )
  {
    typedef RequestVectorType::size_type size_type ;
    const size_type size = ( count > 0 ) ? size_type( count ) : size_type( 0 );
    return RequestVectorType( size, MPI_REQUEST_NULL );
  }

  // check the return code of an MPI call (debug mode only),
  // the MPI call itself is always executed since it is the argument
  static void checkMPI( const int errorCode )
  {
    assert ( errorCode == MPI_SUCCESS );
    // avoid warning about unused parameter when NDEBUG is defined
    (void) errorCode ;
  }

  // make sure every process has the same tag (debug mode only,
  // uses the communicator's max which all processes have to call)
  void checkTag() const
  {
#ifndef NDEBUG
    int mytag = _tag ;
    assert ( mytag == _p2pCommunicator.max( mytag ) );
#endif
  }

  // wait until all posted sends are finished, the send buffers
  // must not be destroyed or modified before this has happened
  void waitForSendRequests()
  {
    if( ! _sendRequest.empty() )
    {
      checkMPI( MPI_Waitall ( _sendLinks, &_sendRequest[ 0 ], MPI_STATUSES_IGNORE ) );
    }
  }

  // call cast operator on CollectiveCommunication to retreive MPI_Comm
  MPI_Comm mpiCommunicator() const { return static_cast< MPI_Comm > (_p2pCommunicator); }

public:
  typedef typename P2PCommunicatorType :: DataHandleInterface DataHandleInterface;
  typedef typename P2PCommunicatorType :: MessageBufferType   MessageBufferType;

  // Create exchange object, nothing is sent yet.
  //
  // \param p2pComm               communicator holding the linkage
  // \param tag                   message tag, has to be the same on all processes
  // \param recvBufferSizesKnown  if true, receives are posted with the buffer
  //                              sizes stored in the communicator
  NonBlockingExchangeImplementation( const P2PCommunicatorType& p2pComm,
                                     const int tag,
                                     const bool recvBufferSizesKnown = false )
    : _p2pCommunicator( p2pComm ),
      _sendLinks( _p2pCommunicator.sendLinks() ),
      _recvLinks( _p2pCommunicator.recvLinks() ),
      _tag( tag ),
      _sendRequest( createRequests( _sendLinks ) ),
      _recvRequest( createRequests( recvBufferSizesKnown ? _recvLinks : 0 ) ),
      _recvBufferSizesKnown( recvBufferSizesKnown ),
      _needToSend( true )
  {
    checkTag();
  }

  // Create exchange object and immediately send the given buffers.
  //
  // \param p2pComm      communicator holding the linkage
  // \param tag          message tag, has to be the same on all processes
  // \param sendBuffers  one buffer per send link; has to stay alive and
  //                     unmodified until the data has been received
  NonBlockingExchangeImplementation( const P2PCommunicatorType& p2pComm,
                                     const int tag,
                                     const std::vector< MessageBufferType > & sendBuffers )
    : _p2pCommunicator( p2pComm ),
      _sendLinks( _p2pCommunicator.sendLinks() ),
      _recvLinks( _p2pCommunicator.recvLinks() ),
      _tag( tag ),
      _sendRequest( createRequests( _sendLinks ) ),
      _recvRequest( createRequests( 0 ) ),
      _recvBufferSizesKnown( false ),
      _needToSend( false )
  {
    checkTag();

    assert ( _sendLinks == int( sendBuffers.size() ) );
    sendImpl( sendBuffers );
  }

  // interface methods

  // the request vectors release their memory themselves
  ~NonBlockingExchangeImplementation() {}

  // virtual methods
  void send( const std::vector< MessageBufferType > & sendBuffers ) { sendImpl( sendBuffers ); }
  std::vector< MessageBufferType > receive() { return receiveImpl(); }

  // implementation

  // send data implementation, posts one non-blocking send per send link
  void sendImpl( const std::vector< MessageBufferType > & sendBuffers )
  {
    // there has to be one buffer for each send link
    assert ( int( sendBuffers.size() ) >= _sendLinks );

    // get mpi communicator
    MPI_Comm comm = mpiCommunicator();

    // get vector with destinations
    const std::vector< int >& sendDest = _p2pCommunicator.sendDest();

    // send data
    for (int link = 0; link < _sendLinks; ++link)
    {
      sendLink( sendDest[ link ], _tag, sendBuffers[ link ], _sendRequest[ link ], comm );
    }

    // set send info
    _needToSend = false ;
  }

  // receive data without buffer given, returns one buffer per recv link
  std::vector< MessageBufferType > receiveImpl ()
  {
    // create vector of empty streams
    std::vector< MessageBufferType > recvBuffer( _recvLinks );
    receiveImpl( recvBuffer );
    return recvBuffer;
  }

  // Receive data implementation with given buffers.
  //
  // \param recvBuffers  either one buffer per recv link or exactly one
  //                     buffer which is then reused for every link
  // \param dataHandle   if given, unpack is called for each received link
  //
  // Polls all links until every message has arrived and afterwards waits
  // for the own sends to finish.
  void receiveImpl ( std::vector< MessageBufferType >& recvBuffers, DataHandleInterface* dataHandle = 0)
  {
    // do nothing if number of links is zero
    if( (_recvLinks + _sendLinks) == 0 ) return;

    // get mpi communicator
    MPI_Comm comm = mpiCommunicator();

    // get vector with destinations
    const std::vector< int >& recvSource = _p2pCommunicator.recvSource();

    // check whether out vector has more than one stream
    const bool useFirstStreamOnly = (recvBuffers.size() == 1) ;

    // otherwise there has to be one buffer for each recv link
    assert ( useFirstStreamOnly || int( recvBuffers.size() ) >= _recvLinks );

    // flag vector holding information about received links
    std::vector< bool > linkNotReceived( _recvLinks, true );

    // count number of received messages
    int numReceived = 0;
    while( numReceived < _recvLinks )
    {
      // check for all links messages
      for (int link = 0; link < _recvLinks; ++link )
      {
        // if message was already received, nothing to do for this link
        if( ! linkNotReceived[ link ] ) continue ;

        // get appropriate object stream
        MessageBufferType& recvBuffer = useFirstStreamOnly ? recvBuffers[ 0 ] : recvBuffers[ link ];

        // check whether a message was completely received
        // if message was received the unpack data
        if( probeAndReceive( comm, recvSource[ link ], _tag, recvBuffer ) )
        {
          // if data handle was given do unpack
          if( dataHandle ) dataHandle->unpack( link, recvBuffer );

          // mark link as received
          linkNotReceived[ link ] = false ;

          // increase number of received messages
          ++ numReceived;
        }
      }
    }

    // wait until all own sends are finished
    waitForSendRequests();
  }

  // Receive data implementation for the case that receives have been
  // posted before (receive buffer sizes known).
  //
  // \param recvBuffers  the buffers the receives have been posted for
  // \param dataHandle   unpack is called for each received link
  void unpackRecvBufferSizeKnown( std::vector< MessageBufferType >& recvBuffers, DataHandleInterface& dataHandle )
  {
    // there has to be one buffer for each recv link
    assert ( int( recvBuffers.size() ) >= _recvLinks );

    // flag vector holding information about received links
    std::vector< bool > linkNotReceived( _recvLinks, true );

    // count number of received messages
    // (loop is skipped if number of recv links is zero)
    int numReceived = 0;
    while( numReceived < _recvLinks )
    {
      // check for all links messages
      for (int link = 0; link < _recvLinks; ++link )
      {
        // if message was already received, nothing to do for this link
        if( ! linkNotReceived[ link ] ) continue ;

        assert( ! _recvRequest.empty() );
        // check whether message was received, and if unpack data
        if( receivedMessage( _recvRequest[ link ], recvBuffers[ link ] ) )
        {
          // unpack received data
          dataHandle.unpack( link, recvBuffers[ link ] );

          // mark link as received
          linkNotReceived[ link ] = false ;
          // increase number of received messages
          ++ numReceived;
        }
      }
    }

    // wait until all own sends are finished, this has to be done
    // even if there is nothing to receive since the send buffers
    // are destroyed by the caller afterwards
    waitForSendRequests();
  }

  // Pack and send data without posting receives.
  //
  // NOTE: must not be used when receive buffer sizes are known, since the
  // receives would be posted into buffers that are destroyed when this
  // method returns. Use the three argument version in that case.
  void send( std::vector< MessageBufferType >& sendBuffers,
             DataHandleInterface& dataHandle )
  {
    assert ( ! _recvBufferSizesKnown );
    std::vector< MessageBufferType > recvBuffers;
    send( sendBuffers, recvBuffers, dataHandle );
  }

  // Pack and send data and, if receive buffer sizes are known,
  // post the receives.
  //
  // \param sendBuffer  one buffer per send link, filled by dataHandle.pack
  // \param recvBuffer  resized to the number of recv links if receives are
  //                    posted; has to stay alive until data was unpacked
  // \param dataHandle  used to pack the data to be sent
  void send( std::vector< MessageBufferType >& sendBuffer,
             std::vector< MessageBufferType >& recvBuffer,
             DataHandleInterface& dataHandle )
  {
    if( _needToSend )
    {
      // there has to be one buffer for each send link
      assert ( int( sendBuffer.size() ) >= _sendLinks );

      // get mpi communicator
      MPI_Comm comm = mpiCommunicator();

      // get vector with destinations
      const std::vector< int >& sendDest = _p2pCommunicator.sendDest();

      for (int link = 0; link < _sendLinks; ++link)
      {
        // pack data
        dataHandle.pack( link, sendBuffer[ link ] );

        // send data
        sendLink( sendDest[ link ], _tag, sendBuffer[ link ], _sendRequest[ link ], comm );
      }

      // set send info
      _needToSend = false ;
    }

    // post receives if receive buffer sizes are known
    if( _recvBufferSizesKnown )
    {
      // get mpi communicator
      MPI_Comm comm = mpiCommunicator();

      recvBuffer.resize( _recvLinks );

      // get vector with sources and the buffer sizes to expect
      const std::vector< int >& recvSource      = _p2pCommunicator.recvSource();
      const std::vector< int >& recvBufferSizes = _p2pCommunicator.recvBufferSizes();

      for (int link = 0; link < _recvLinks; ++link)
      {
        const int bufferSize = recvBufferSizes[ link ];

        // post receive for this link
        assert( ! _recvRequest.empty() );
        postReceive( recvSource[ link ], _tag, bufferSize, recvBuffer[ link ], _recvRequest[ link ], comm );
      }
    }
  }

  // receive data (buffer sizes unknown) and unpack using the data handle
  void receive( DataHandleInterface& dataHandle )
  {
    // do work that can be done between send and receive
    dataHandle.localComputation() ;

    // create receive message buffer, one buffer is reused for all links
    std::vector< MessageBufferType > recvBuffer( 1 );
    // receive data
    receiveImpl( recvBuffer, &dataHandle );
  }

  // complete exchange: pack and send (if not done yet), receive and unpack
  void exchange( DataHandleInterface& dataHandle )
  {
    // do nothing if number of links is zero
    if( (_recvLinks + _sendLinks) == 0 ) return;

    // send message buffers, we need several because of the
    // non-blocking send routines, send might not be finished
    // when we start receiving
    std::vector< MessageBufferType > recvBuffers ;
    // note: due to non-blocking we cannot limit scope
    std::vector< MessageBufferType > sendBuffers;

    // if data was not sent yet, do it now
    if( _needToSend )
    {
      // resize message buffer vector
      sendBuffers.resize(_sendLinks);

      // send data
      send( sendBuffers, recvBuffers, dataHandle );
    }

    // now receive data
    if( _recvBufferSizesKnown )
      unpackRecvBufferSizeKnown( recvBuffers, dataHandle );
    else
      receive( dataHandle );
  }

protected:
  // post non-blocking send of msgBuffer to dest, returns number of bytes sent
  int sendLink( const int dest, const int tag,
                const MessageBufferType& msgBuffer, MPI_Request& request, MPI_Comm& comm )
  {
    // buffer = point to mem and size
    std::pair< char*, int > buffer = msgBuffer.buffer();

    checkMPI( MPI_Isend ( buffer.first, buffer.second, MPI_BYTE, dest, tag, comm, &request ) );

    return buffer.second;
  }

  // resize msgBuffer to bufferSize and post non-blocking receive from source
  void postReceive( const int source, const int tag, const int bufferSize,
                    MessageBufferType& msgBuffer, MPI_Request& request, MPI_Comm& comm )
  {
    // reserve memory for receive buffer
    msgBuffer.resize( bufferSize );
    // reset read position
    msgBuffer.resetReadPosition();

    // get buffer and size
    std::pair< char*, int > buffer = msgBuffer.buffer();

    // MPI receive (non-blocking)
    checkMPI( MPI_Irecv ( buffer.first, buffer.second, MPI_BYTE, source, tag, comm, & request) );
  }

  // test whether the receive belonging to request has finished,
  // in debug mode the received size is compared to the buffer size
  bool receivedMessage( MPI_Request& request, MessageBufferType& buffer )
  {
    // msg received, 0 or 1
    int received = 0;

#ifndef NDEBUG
    // for checking whether the buffer size is correct
    MPI_Status status ;
    MPI_Test( &request, &received, &status );

    if( received )
    {
      int checkBufferSize = -1;
      MPI_Get_count ( & status, MPI_BYTE, &checkBufferSize );
      if( checkBufferSize != int(buffer.size()) )
        std::cout << "Buffer sizes don't match: " << checkBufferSize << " " << buffer.size() << std::endl;
      assert( checkBufferSize == int(buffer.size()) );
    }
#else
    // buffer is only needed for the size check in debug mode
    (void) buffer ;
    // ignore status in non-debug mode for performance reasons
    MPI_Test( &request, &received, MPI_STATUS_IGNORE );
#endif
    return bool(received);
  }

  // check whether a message from source is available and if so receive it
  // into recvBuffer, returns true if a message was received
  bool probeAndReceive( MPI_Comm& comm,
                        const int source,
                        const int tag,
                        MessageBufferType& recvBuffer )
  {
    // corresponding MPI status
    MPI_Status status;

    // msg available, 0 or 1
    // available does not mean already received
    int available = 0;

    // check for any message with tag (nonblocking)
    MPI_Iprobe( source, tag, comm, &available, &status );

    // not yet received
    if( ! available ) return false ;

    // this should be the same, otherwise we got an error
    assert ( source == status.MPI_SOURCE );

    // get length of message
    int bufferSize = -1;
    checkMPI( MPI_Get_count ( &status, MPI_BYTE, &bufferSize ) );

    // reserve memory
    recvBuffer.resize( bufferSize );
    // reset read position for unpack
    recvBuffer.resetReadPosition();

    // get buffer
    std::pair< char*, int > buffer = recvBuffer.buffer();

    // MPI receive (blocking)
    checkMPI( MPI_Recv ( buffer.first, buffer.second, MPI_BYTE, status.MPI_SOURCE, tag, comm, & status) );

    return true ; // received
  }
}; // end NonBlockingExchangeImplementation

#endif // #if HAVE_MPI
577 
578  // --exchange
579 
580  template <class MsgBuffer>
581  inline void
582  Point2PointCommunicator< MsgBuffer >::
583 #if HAVE_MPI
584  exchange( DataHandleInterface& handle) const
585 #else
586  exchange( DataHandleInterface&) const
587 #endif
588  {
589  assert( _recvBufferSizes.empty () );
590 #if HAVE_MPI
591  NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *this, getMessageTag() );
592  nonBlockingExchange.exchange( handle );
593 #endif
594  }
595 
596  // --exchange
597  template <class MsgBuffer>
598  inline std::vector< MsgBuffer >
600  exchange( const std::vector< MessageBufferType > & in ) const
601  {
602 #if HAVE_MPI
603  // note: for the non-blocking exchange the message tag
604  // should be different each time to avoid MPI problems
605  NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *this, getMessageTag(), in );
606  return nonBlockingExchange.receiveImpl();
607 #else
608  // don nothing when MPI is not found
609  return in;
610 #endif
611  }
612 
613  // --exchange
614  template <class MsgBuffer>
615  inline void
617 #if HAVE_MPI
618  exchangeCached( DataHandleInterface& handle ) const
619 #else
620  exchangeCached( DataHandleInterface&) const
621 #endif
622  {
623 #if HAVE_MPI
624  if( ! _recvBufferSizesComputed )
625  {
626  const int nSendLinks = sendLinks();
627  std::vector< MsgBuffer > buffers( nSendLinks );
628  // pack all data
629  for( int link=0; link<nSendLinks; ++link )
630  {
631  handle.pack( link, buffers[ link ] );
632  }
633  // exchange data
634  buffers = exchange( buffers );
635  const int nRecvLinks = recvLinks();
636  // unpack all data
637  for( int link=0; link<nRecvLinks; ++link )
638  {
639  handle.unpack( link, buffers[ link ] );
640  }
641  // store receive buffer sizes
642  _recvBufferSizes.resize( nRecvLinks );
643  for( int link=0; link<nRecvLinks; ++link )
644  {
645  _recvBufferSizes[ link ] = buffers[ link ].size();
646  }
647  _recvBufferSizesComputed = true ;
648  }
649  else
650  {
651  NonBlockingExchangeImplementation< ThisType > nonBlockingExchange( *this, getMessageTag(), _recvBufferSizesComputed );
652  nonBlockingExchange.exchange( handle );
653  }
654 #endif
655  }
656 
657 } // namespace Dune
658 #endif // #ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
The namespace Dune is the main namespace for all Dune code.
Definition: CartesianIndexMapper.hpp:9
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:600
Definition: p2pcommunicator.hh:165
Point-2-Point communicator for exchange messages between processes.
Definition: p2pcommunicator.hh:131
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from ...
Definition: p2pcommunicator_impl.hh:59