19#ifndef DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED 
   20#define DUNE_POINT2POINTCOMMUNICATOR_IMPL_HEADER_INCLUDED 
   27  template <
class MsgBuffer>
 
   38    _recvBufferSizes.clear();
 
   39    _recvBufferSizesComputed = false ;
 
   42  template <
class MsgBuffer>
 
   47    typedef linkage_t::const_iterator const_iterator ;
 
   48    dest.resize( linkage.size() );
 
   49    const const_iterator linkageEnd = linkage.end ();
 
   50    for( const_iterator i = linkage.begin (); i != linkageEnd; ++i )
 
   52      dest[ (*i).second ] = (*i).first;
 
   56  template <
class MsgBuffer>
 
   59  insertRequest( 
const std::set< int >& sendLinks, 
const std::set< int >& recvLinks )
 
   64    const int me_rank = rank ();
 
   67      typedef std::map< int, int >::iterator iterator ;
 
   68      typedef std::set< int >::const_iterator const_iterator;
 
   70      const iterator sendEnd = sendLinkage_.end ();
 
   71      const iterator recvEnd = recvLinkage_.end ();
 
   72      const const_iterator sendLinksEnd = sendLinks.end ();
 
   75      for (const_iterator i = sendLinks.begin (); i != sendLinksEnd; ++i )
 
   77        const int rank = (*i);
 
   79        if( rank != me_rank && (sendLinkage_.find ( rank ) == sendEnd ) )
 
   81          sendLinkage_.insert( std::make_pair( rank, sendLink++) );
 
   85      const const_iterator recvLinksEnd = recvLinks.end ();
 
   86      for (const_iterator i = recvLinks.begin (); i != recvLinksEnd; ++i )
 
   88        const int rank = (*i);
 
   90        if( rank != me_rank && (recvLinkage_.find ( rank ) == recvEnd ) )
 
   92          recvLinkage_.insert( std::make_pair( rank, recvLink++) );
 
   98    computeDestinations( sendLinkage_, sendDest_ );
 
  101    computeDestinations( recvLinkage_, recvSource_ );
 
  113#define MY_INT_TEST int test = 
  118  template < 
class P2PCommunicator >
 
  121    typedef P2PCommunicator  P2PCommunicatorType ;
 
  122    const P2PCommunicatorType& _p2pCommunicator;
 
  124    const int _sendLinks;
 
  125    const int _recvLinks;
 
  128    MPI_Request* _sendRequest;
 
  129    MPI_Request* _recvRequest;
 
  131    const bool _recvBufferSizesKnown;
 
  138    MPI_Request* createSendRequest()
 const 
  140      return ( _sendLinks > 0 ) ? 
new MPI_Request [ _sendLinks ] : 0;
 
  145    MPI_Request* createRecvRequest( 
const bool recvBufferSizesKnown )
 const 
  147      return ( _recvLinks > 0 && recvBufferSizesKnown ) ? 
new MPI_Request [ _recvLinks ] : 0;
 
  151    MPI_Comm mpiCommunicator()
 const { 
return static_cast< MPI_Comm 
> (_p2pCommunicator); }
 
  159                                       const bool recvBufferSizesKnown = 
false )
 
  160      : _p2pCommunicator( p2pComm ),
 
  161        _sendLinks( _p2pCommunicator.sendLinks() ),
 
  162        _recvLinks( _p2pCommunicator.recvLinks() ),
 
  164        _sendRequest( createSendRequest() ),
 
  165        _recvRequest( createRecvRequest( recvBufferSizesKnown ) ),
 
  166        _recvBufferSizesKnown( recvBufferSizesKnown ),
 
  172      assert ( mytag == _p2pCommunicator.max( mytag ) );
 
  178                                       const std::vector< MessageBufferType > & sendBuffers )
 
  179      : _p2pCommunicator( p2pComm ),
 
  180        _sendLinks( _p2pCommunicator.sendLinks() ),
 
  181        _recvLinks( _p2pCommunicator.recvLinks() ),
 
  183        _sendRequest( createSendRequest() ),
 
  184        _recvRequest( createRecvRequest( false ) ),
 
  185        _recvBufferSizesKnown( false ),
 
  191      assert ( mytag == _p2pCommunicator.max( mytag ) );
 
  194      assert ( _sendLinks == 
int( sendBuffers.size() ) );
 
  205        delete [] _sendRequest;
 
  211        delete [] _recvRequest;
 
  217    void send( 
const std::vector< MessageBufferType > & sendBuffers ) { 
sendImpl( sendBuffers ); }
 
  225    void sendImpl( 
const std::vector< MessageBufferType > & sendBuffers )
 
  228      MPI_Comm comm = mpiCommunicator();
 
  231      const std::vector< int >& sendDest = _p2pCommunicator.sendDest();
 
  234      for (
int link = 0; link < _sendLinks; ++link)
 
  236        sendLink( sendDest[ link ], _tag, sendBuffers[ link ], _sendRequest[ link ], comm );
 
  240      _needToSend = false ;
 
  247      std::vector< MessageBufferType > recvBuffer( _recvLinks );
 
  256      if( (_recvLinks + _sendLinks) == 0 ) 
return;
 
  259      MPI_Comm comm = mpiCommunicator();
 
  262      const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
 
  265      const bool useFirstStreamOnly = (recvBuffers.size() == 1) ;
 
  268      std::vector< bool > linkNotReceived( _recvLinks, 
true );
 
  272      while( numReceived < _recvLinks )
 
  275        for (
int link = 0; link < _recvLinks; ++link )
 
  278          if( linkNotReceived[ link ] )
 
  281            MessageBufferType& recvBuffer = useFirstStreamOnly ? recvBuffers[ 0 ] : recvBuffers[ link ];
 
  288              if( dataHandle ) dataHandle->unpack( link, recvBuffer );
 
  291              linkNotReceived[ link ] = false ;
 
  304        MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
 
  305        assert (test == MPI_SUCCESS);
 
  313      if( _recvLinks == 0 ) 
return;
 
  316      std::vector< bool > linkNotReceived( _recvLinks, 
true );
 
  320      while( numReceived < _recvLinks )
 
  323        for (
int link = 0; link < _recvLinks; ++link )
 
  326          if( linkNotReceived[ link ] )
 
  328            assert( _recvRequest );
 
  333              dataHandle.unpack( link, recvBuffers[ link ] );
 
  336              linkNotReceived[ link ] = false ;
 
  348        MY_INT_TEST MPI_Waitall ( _sendLinks, _sendRequest, MPI_STATUSES_IGNORE);
 
  349        assert (test == MPI_SUCCESS);
 
  354    void send( std::vector< MessageBufferType >& sendBuffers,
 
  357      std::vector< MessageBufferType > recvBuffers;
 
  358      send( sendBuffers, recvBuffers, dataHandle );
 
  362    void send( std::vector< MessageBufferType >& sendBuffer,
 
  363               std::vector< MessageBufferType >& recvBuffer,
 
  369        MPI_Comm comm = mpiCommunicator();
 
  372        const std::vector< int >& sendDest = _p2pCommunicator.sendDest();
 
  375        for (
int link = 0; link < _sendLinks; ++link)
 
  378          dataHandle.pack( link, sendBuffer[ link ] );
 
  381          sendLink( sendDest[ link ], _tag, sendBuffer[ link ], _sendRequest[ link ], comm );
 
  385        _needToSend = false ;
 
  389      if( _recvBufferSizesKnown )
 
  392        MPI_Comm comm = mpiCommunicator();
 
  394        recvBuffer.resize( _recvLinks );
 
  397        const std::vector< int >& recvSource = _p2pCommunicator.recvSource();
 
  398        const std::vector< int >& recvBufferSizes = _p2pCommunicator.recvBufferSizes();
 
  401        for (
int link = 0; link < _recvLinks; ++link)
 
  404          const int bufferSize = recvBufferSizes[ link ];
 
  407          assert( _recvRequest );
 
  408          assert( &_recvRequest[ link ] );
 
  409          postReceive( recvSource[ link ], _tag, bufferSize, recvBuffer[ link ], _recvRequest[ link ], comm );
 
  418      dataHandle.localComputation() ;
 
  421      std::vector< MessageBufferType > recvBuffer( 1 );
 
  429      const int recvLinks = _p2pCommunicator.recvLinks();
 
  431      if( (recvLinks + _sendLinks) == 0 ) 
return;
 
  436      std::vector< MessageBufferType > recvBuffers ;
 
  438      std::vector< MessageBufferType > sendBuffers;
 
  444        sendBuffers.resize(_sendLinks);
 
  447        send( sendBuffers, recvBuffers, dataHandle );
 
  451      if( _recvBufferSizesKnown )
 
  462      std::pair< char*, int > 
buffer = msgBuffer.buffer();
 
  465      assert (test == MPI_SUCCESS);
 
  470    void postReceive( 
const int source, 
const int tag, 
const int bufferSize,
 
  474      msgBuffer.resize( bufferSize );
 
  476      msgBuffer.resetReadPosition();
 
  479      std::pair< char*, int > 
buffer = msgBuffer.buffer();
 
  484        assert (test == MPI_SUCCESS);
 
  503      MPI_Test( &request, &received,
 
  514        int checkBufferSize = -1;
 
  515        MPI_Get_count ( & status, MPI_BYTE, &checkBufferSize );
 
  516        if( checkBufferSize != 
int(
buffer.size()) )
 
  517          std::cout << 
"Buffer sizes don't match: "  << checkBufferSize << 
" " << 
buffer.size() << std::endl;
 
  518        assert( checkBufferSize == 
int(
buffer.size()) );
 
  521      return bool(received);
 
  538      MPI_Iprobe( source, tag, comm, &available, &status );
 
  544        assert ( source == status.MPI_SOURCE );
 
  551          MY_INT_TEST MPI_Get_count ( &status, MPI_BYTE, &bufferSize );
 
  552          assert (test == MPI_SUCCESS);
 
  556        recvBuffer.resize( bufferSize );
 
  558        recvBuffer.resetReadPosition();
 
  561        std::pair< char*, int > 
buffer = recvBuffer.buffer();
 
  566          assert (test == MPI_SUCCESS);
 
  580  template <
class MsgBuffer>
 
  589    assert( _recvBufferSizes.empty () );
 
  592    nonBlockingExchange.
exchange( handle );
 
  597  template <
class MsgBuffer>
 
  598  inline std::vector< MsgBuffer >
 
  600  exchange( 
const std::vector< MessageBufferType > & in )
 const 
  614  template <
class MsgBuffer>
 
  624    if( ! _recvBufferSizesComputed )
 
  626      const int nSendLinks = sendLinks();
 
  627      std::vector< MsgBuffer > buffers( nSendLinks );
 
  629      for( 
int link=0; link<nSendLinks; ++link )
 
  631        handle.pack( link, buffers[ link ] );
 
  634      buffers = exchange( buffers );
 
  635      const int nRecvLinks = recvLinks();
 
  637      for( 
int link=0; link<nRecvLinks; ++link )
 
  639        handle.unpack( link, buffers[ link ] );
 
  642      _recvBufferSizes.resize( nRecvLinks );
 
  643      for( 
int link=0; link<nRecvLinks; ++link )
 
  645        _recvBufferSizes[ link ] = buffers[ link ].size();
 
  647      _recvBufferSizesComputed = true ;
 
  652      nonBlockingExchange.
exchange( handle );
 
mover::MoveBuffer< typename DataHandle::DataType > & buffer
Definition: CpGridData.hpp:1183
 
Definition: p2pcommunicator_impl.hh:120
 
void receive(DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:415
 
std::vector< MessageBufferType > receive()
Definition: p2pcommunicator_impl.hh:218
 
P2PCommunicatorType::DataHandleInterface DataHandleInterface
Definition: p2pcommunicator_impl.hh:154
 
bool probeAndReceive(MPI_Comm &comm, const int source, const int tag, MessageBufferType &recvBuffer)
Definition: p2pcommunicator_impl.hh:525
 
bool receivedMessage(MPI_Request &request, MessageBufferType &buffer)
Definition: p2pcommunicator_impl.hh:489
 
P2PCommunicatorType::MessageBufferType MessageBufferType
Definition: p2pcommunicator_impl.hh:155
 
void send(std::vector< MessageBufferType > &sendBuffers, DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:354
 
void send(const std::vector< MessageBufferType > &sendBuffers)
Definition: p2pcommunicator_impl.hh:217
 
void sendImpl(const std::vector< MessageBufferType > &sendBuffers)
Definition: p2pcommunicator_impl.hh:225
 
NonBlockingExchangeImplementation(const P2PCommunicatorType &p2pComm, const int tag, const std::vector< MessageBufferType > &sendBuffers)
Definition: p2pcommunicator_impl.hh:176
 
void send(std::vector< MessageBufferType > &sendBuffer, std::vector< MessageBufferType > &recvBuffer, DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:362
 
void postReceive(const int source, const int tag, const int bufferSize, MessageBufferType &msgBuffer, MPI_Request &request, MPI_Comm &comm)
Definition: p2pcommunicator_impl.hh:470
 
int sendLink(const int dest, const int tag, const MessageBufferType &msgBuffer, MPI_Request &request, MPI_Comm &comm)
Definition: p2pcommunicator_impl.hh:458
 
NonBlockingExchangeImplementation(const P2PCommunicatorType &p2pComm, const int tag, const bool recvBufferSizesKnown=false)
Definition: p2pcommunicator_impl.hh:157
 
std::vector< MessageBufferType > receiveImpl()
Definition: p2pcommunicator_impl.hh:244
 
void unpackRecvBufferSizeKnown(std::vector< MessageBufferType > &recvBuffers, DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:310
 
~NonBlockingExchangeImplementation()
Definition: p2pcommunicator_impl.hh:201
 
void exchange(DataHandleInterface &dataHandle)
Definition: p2pcommunicator_impl.hh:427
 
void receiveImpl(std::vector< MessageBufferType > &recvBuffers, DataHandleInterface *dataHandle=0)
Definition: p2pcommunicator_impl.hh:253
 
Definition: p2pcommunicator.hh:166
 
Point-2-Point communicator for exchanging messages between processes.
Definition: p2pcommunicator.hh:132
 
std::map< int, int > linkage_t
Definition: p2pcommunicator.hh:147
 
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:600
 
std::vector< int > vector_t
Definition: p2pcommunicator.hh:148
 
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from
Definition: p2pcommunicator_impl.hh:59
 
void computeDestinations(const linkage_t &linkage, vector_t &dest)
Definition: p2pcommunicator_impl.hh:45
 
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
 
The namespace Dune is the main namespace for all Dune code.
Definition: common/CartesianIndexMapper.hpp:10
 
#define MY_INT_TEST
Definition: p2pcommunicator_impl.hh:113