19#ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
20#define DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED
27 template <
class MsgBuffer>
38 _recvBufferSizes.clear();
39 _recvBufferSizesComputed = false ;
42 template <
class MsgBuffer>
47 typedef linkage_t::const_iterator const_iterator ;
48 dest.resize( linkage.size() );
49 const const_iterator linkageEnd = linkage.end ();
50 for( const_iterator i = linkage.begin (); i != linkageEnd; ++i )
52 dest[ (*i).second ] = (*i).first;
56 template <
class MsgBuffer>
59 insertRequest(
const std::set< int >& sendLinks,
const std::set< int >& recvLinks )
64 const int me_rank = rank ();
67 typedef std::map< int, int >::iterator iterator ;
68 typedef std::set< int >::const_iterator const_iterator;
70 const iterator sendEnd = sendLinkage_.end ();
71 const iterator recvEnd = recvLinkage_.end ();
72 const const_iterator sendLinksEnd = sendLinks.end ();
75 for (const_iterator i = sendLinks.begin (); i != sendLinksEnd; ++i )
77 const int rank = (*i);
79 if( rank != me_rank && (sendLinkage_.find ( rank ) == sendEnd ) )
81 sendLinkage_.insert( std::make_pair( rank, sendLink++) );
85 const const_iterator recvLinksEnd = recvLinks.end ();
86 for (const_iterator i = recvLinks.begin (); i != recvLinksEnd; ++i )
88 const int rank = (*i);
90 if( rank != me_rank && (recvLinkage_.find ( rank ) == recvEnd ) )
92 recvLinkage_.insert( std::make_pair( rank, recvLink++) );
98 computeDestinations( sendLinkage_, sendDest_ );
101 computeDestinations( recvLinkage_, recvSource_ );
113#define MY_INT_TEST int test =
118 template <
class P2PCommunicator >
121 typedef P2PCommunicator P2PCommunicatorType ;
122 const P2PCommunicatorType& _p2pCommunicator;
124 const int _sendLinks;
125 const int _recvLinks;
128 MPI_Request* _sendRequest;
129 MPI_Request* _recvRequest;
131 const bool _recvBufferSizesKnown;
138 MPI_Request* createSendRequest()
const
140 return ( _sendLinks > 0 ) ?
new MPI_Request [ _sendLinks ] : 0;
145 MPI_Request* createRecvRequest(
const bool recvBufferSizesKnown )
const
147 return ( _recvLinks > 0 && recvBufferSizesKnown ) ?
new MPI_Request [ _recvLinks ] : 0;
151 MPI_Comm mpiCommunicator()
const {
return static_cast< MPI_Comm
> (_p2pCommunicator); }
159 const bool recvBufferSizesKnown =
false )
160 : _p2pCommunicator( p2pComm ),
161 _sendLinks( _p2pCommunicator.sendLinks() ),
162 _recvLinks( _p2pCommunicator.recvLinks() ),
164 _sendRequest( createSendRequest() ),
165 _recvRequest( createRecvRequest( recvBufferSizesKnown ) ),
166 _recvBufferSizesKnown( recvBufferSizesKnown ),
172 assert ( mytag == _p2pCommunicator.max( mytag ) );
178 const std::vector< MessageBufferType > & sendBuffers )
179 : _p2pCommunicator( p2pComm ),
180 _sendLinks( _p2pCommunicator.sendLinks() ),
181 _recvLinks( _p2pCommunicator.recvLinks() ),
183 _sendRequest( createSendRequest() ),
184 _recvRequest( createRecvRequest( false ) ),
185 _recvBufferSizesKnown( false ),
191 assert ( mytag == _p2pCommunicator.max( mytag ) );
194 assert ( _sendLinks ==
int( sendBuffers.size() ) );
205 delete [] _sendRequest;
211 delete [] _recvRequest;
217 void send(
const std::vector< MessageBufferType > & sendBuffers ) {
sendImpl( sendBuffers ); }
225 void sendImpl(
const std::vector< MessageBufferType > & sendBuffers )
228 MPI_Comm comm = mpiCommunicator();
231 const std::vector< int >& sendDest = _p2pCommunicator.sendDest();
234 for (
int link = 0; link < _sendLinks; ++link)
236 sendLink( sendDest[ link ], _tag, sendBuffers[ link ], _sendRequest[ link ], comm );
240 _needToSend = false ;
247 std::vector< MessageBufferType > recvBuffer( _recvLinks );
256 if( (_recvLinks + _sendLinks) == 0 )
return;
259 MPI_Comm comm = mpiCommunicator();
262 const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
265 const bool useFirstStreamOnly = (recvBuffers.size() == 1) ;
268 std::vector< bool > linkNotReceived( _recvLinks,
true );
272 while( numReceived < _recvLinks )
275 for (
int link = 0; link < _recvLinks; ++link )
278 if( linkNotReceived[ link ] )
281 MessageBufferType& recvBuffer = useFirstStreamOnly ? recvBuffers[ 0 ] : recvBuffers[ link ];
288 if( dataHandle ) dataHandle->unpack( link, recvBuffer );
291 linkNotReceived[ link ] = false ;
304 MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
305 assert (test == MPI_SUCCESS);
313 if( _recvLinks == 0 )
return;
316 std::vector< bool > linkNotReceived( _recvLinks,
true );
320 while( numReceived < _recvLinks )
323 for (
int link = 0; link < _recvLinks; ++link )
326 if( linkNotReceived[ link ] )
328 assert( _recvRequest );
333 dataHandle.unpack( link, recvBuffers[ link ] );
336 linkNotReceived[ link ] = false ;
348 MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
349 assert (test == MPI_SUCCESS);
354 void send( std::vector< MessageBufferType >& sendBuffers,
357 std::vector< MessageBufferType > recvBuffers;
358 send( sendBuffers, recvBuffers, dataHandle );
362 void send( std::vector< MessageBufferType >& sendBuffer,
363 std::vector< MessageBufferType >& recvBuffer,
369 MPI_Comm comm = mpiCommunicator();
372 const std::vector< int >& sendDest = _p2pCommunicator.sendDest();
375 for (
int link = 0; link < _sendLinks; ++link)
378 dataHandle.pack( link, sendBuffer[ link ] );
381 sendLink( sendDest[ link ], _tag, sendBuffer[ link ], _sendRequest[ link ], comm );
385 _needToSend = false ;
389 if( _recvBufferSizesKnown )
392 MPI_Comm comm = mpiCommunicator();
394 recvBuffer.resize( _recvLinks );
397 const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
398 const std::vector< int >& recvBufferSizes = _p2pCommunicator.recvBufferSizes();
401 for (
int link = 0; link < _recvLinks; ++link)
404 const int bufferSize = recvBufferSizes[ link ];
407 assert( _recvRequest );
408 assert( &_recvRequest[ link ] );
409 postReceive( recvSource[ link ], _tag, bufferSize, recvBuffer[ link ], _recvRequest[ link ], comm );
418 dataHandle.localComputation() ;
421 std::vector< MessageBufferType > recvBuffer( 1 );
429 const int recvLinks = _p2pCommunicator.recvLinks();
431 if( (recvLinks + _sendLinks) == 0 )
return;
436 std::vector< MessageBufferType > sendBuffers ;
437 std::vector< MessageBufferType > recvBuffers ;
443 sendBuffers.resize( _sendLinks );
446 send( sendBuffers, recvBuffers, dataHandle );
450 if( _recvBufferSizesKnown )
461 std::pair< char*, int >
buffer = msgBuffer.buffer();
464 assert (test == MPI_SUCCESS);
469 void postReceive(
const int source,
const int tag,
const int bufferSize,
473 msgBuffer.resize( bufferSize );
475 msgBuffer.resetReadPosition();
478 std::pair< char*, int >
buffer = msgBuffer.buffer();
483 assert (test == MPI_SUCCESS);
502 MPI_Test( &request, &received,
513 int checkBufferSize = -1;
514 MPI_Get_count ( & status, MPI_BYTE, &checkBufferSize );
515 if( checkBufferSize !=
int(
buffer.size()) )
516 std::cout <<
"Buffer sizes don't match: " << checkBufferSize <<
" " <<
buffer.size() << std::endl;
517 assert( checkBufferSize ==
int(
buffer.size()) );
520 return bool(received);
537 MPI_Iprobe( source, tag, comm, &available, &status );
543 assert ( source == status.MPI_SOURCE );
550 MY_INT_TEST MPI_Get_count ( &status, MPI_BYTE, &bufferSize );
551 assert (test == MPI_SUCCESS);
555 recvBuffer.resize( bufferSize );
557 recvBuffer.resetReadPosition();
560 std::pair< char*, int >
buffer = recvBuffer.buffer();
565 assert (test == MPI_SUCCESS);
579 template <
class MsgBuffer>
588 assert( _recvBufferSizes.empty () );
591 nonBlockingExchange.
exchange( handle );
596 template <
class MsgBuffer>
597 inline std::vector< MsgBuffer >
599 exchange(
const std::vector< MessageBufferType > & in )
const
613 template <
class MsgBuffer>
623 if( ! _recvBufferSizesComputed )
625 const int nSendLinks = sendLinks();
626 std::vector< MsgBuffer > buffers( nSendLinks );
628 for(
int link=0; link<nSendLinks; ++link )
630 handle.pack( link, buffers[ link ] );
633 buffers = exchange( buffers );
634 const int nRecvLinks = recvLinks();
636 for(
int link=0; link<nRecvLinks; ++link )
638 handle.unpack( link, buffers[ link ] );
641 _recvBufferSizes.resize( nRecvLinks );
642 for(
int link=0; link<nRecvLinks; ++link )
644 _recvBufferSizes[ link ] = buffers[ link ].size();
646 _recvBufferSizesComputed = true ;
651 nonBlockingExchange.
exchange( handle );
mover::MoveBuffer< typename DataHandle::DataType > & buffer
Definition: CpGridData.hpp:1103
Definition: p2pcommunicator_impl.hh:120
void receive(DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:415
std::vector< MessageBufferType > receive()
Definition: p2pcommunicator_impl.hh:218
P2PCommunicatorType::DataHandleInterface DataHandleInterface
Definition: p2pcommunicator_impl.hh:154
bool probeAndReceive(MPI_Comm &comm, const int source, const int tag, MessageBufferType &recvBuffer)
Definition: p2pcommunicator_impl.hh:524
bool receivedMessage(MPI_Request &request, MessageBufferType &buffer)
Definition: p2pcommunicator_impl.hh:488
P2PCommunicatorType::MessageBufferType MessageBufferType
Definition: p2pcommunicator_impl.hh:155
void send(std::vector< MessageBufferType > &sendBuffers, DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:354
void send(const std::vector< MessageBufferType > &sendBuffers)
Definition: p2pcommunicator_impl.hh:217
void sendImpl(const std::vector< MessageBufferType > &sendBuffers)
Definition: p2pcommunicator_impl.hh:225
NonBlockingExchangeImplementation(const P2PCommunicatorType &p2pComm, const int tag, const std::vector< MessageBufferType > &sendBuffers)
Definition: p2pcommunicator_impl.hh:176
void send(std::vector< MessageBufferType > &sendBuffer, std::vector< MessageBufferType > &recvBuffer, DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:362
void postReceive(const int source, const int tag, const int bufferSize, MessageBufferType &msgBuffer, MPI_Request &request, MPI_Comm &comm)
Definition: p2pcommunicator_impl.hh:469
int sendLink(const int dest, const int tag, const MessageBufferType &msgBuffer, MPI_Request &request, MPI_Comm &comm)
Definition: p2pcommunicator_impl.hh:457
NonBlockingExchangeImplementation(const P2PCommunicatorType &p2pComm, const int tag, const bool recvBufferSizesKnown=false)
Definition: p2pcommunicator_impl.hh:157
std::vector< MessageBufferType > receiveImpl()
Definition: p2pcommunicator_impl.hh:244
void unpackRecvBufferSizeKnown(std::vector< MessageBufferType > &recvBuffers, DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:310
~NonBlockingExchangeImplementation()
Definition: p2pcommunicator_impl.hh:201
void exchange(DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:427
void receiveImpl(std::vector< MessageBufferType > &recvBuffers, DataHandleInterface *dataHandle=0)
Definition: p2pcommunicator_impl.hh:253
Definition: p2pcommunicator.hh:166
Point-2-Point communicator for exchange messages between processes.
Definition: p2pcommunicator.hh:132
std::map< int, int > linkage_t
Definition: p2pcommunicator.hh:147
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:599
std::vector< int > vector_t
Definition: p2pcommunicator.hh:148
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set os ranks to send to and a set of ranks to receive from
Definition: p2pcommunicator_impl.hh:59
void computeDestinations(const linkage_t &linkage, vector_t &dest)
Definition: p2pcommunicator_impl.hh:45
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
The namespace Dune is the main namespace for all Dune code.
Definition: common/CartesianIndexMapper.hpp:10
#define MY_INT_TEST
Definition: p2pcommunicator_impl.hh:113