VariableSizeCommunicator.hpp
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3
4// This file is copied from file dune/common/variablesizecommunicator.hh
5// of dune-common as of commit 30e78443884f7730229e6c59b14406051d71c032.
6// It is needed as we need an important bugfix (commit 30e78443884)
7// that is not included in DUNE
8// versions below 2.8. We have moved the classes to the Opm namespace.
9// The file's main author is Markus Blatt. Others have only contributed
10// minor fixes proposed by compilers, and codespell to prevent unused warnings
11// and variables or improved documentation.
12/*
13Copyright 2013-2018 Dr. Blatt - HPC-Simulation-Software & Services
14Copyright 2013-2017 Statoil ASA
15*/
16#ifndef OPM_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
17#define OPM_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
18
19#if HAVE_MPI
20
21#include <algorithm>
22#include <cassert>
23#include <cstddef>
24#include <functional>
25#include <map>
26#include <memory>
27#include <utility>
28#include <vector>
29#include <algorithm>
30
31#include <mpi.h>
32
33#include <dune/common/parallel/interface.hh>
34#include <dune/common/parallel/mpitraits.hh>
35#include <dune/common/unused.hh>
36
49namespace Opm
50{
51
52namespace
53{
58template<class T, class Allocator=std::allocator<T> >
59class MessageBuffer
60{
61public:
66 explicit MessageBuffer(int size)
67 : buffer_(new T[size]), size_(size), position_(0)
68 {}
73 explicit MessageBuffer(const MessageBuffer& o)
74 : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
75 {
76 }
78 ~MessageBuffer()
79 {
80 delete[] buffer_;
81 }
86 void write(const T& data)
87 {
88 assert(position_<size_);
89 buffer_[position_++]=data;
90 }
91
96 void read(T& data)
97 {
98 assert(position_<size_);
99 data=buffer_[position_++];
100 }
101
107 void reset()
108 {
109 position_=0;
110 }
111
116 bool finished()
117 {
118 return position_==size_;
119 }
120
126 bool hasSpaceForItems(int noItems)
127 {
128 return position_+noItems<=size_;
129 }
134 std::size_t size() const
135 {
136 return size_;
137 }
138 std::size_t position() const
139 {
140 return position_;
141 }
146 operator T*()
147 {
148 return buffer_;
149 }
150
151private:
155 T* buffer_;
159 std::size_t size_;
163 std::size_t position_;
164};
165
166using Dune::InterfaceInformation;
167using Dune::Interface;
168
172class InterfaceTracker
173{
174public:
180 InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
181 bool allocateSizes=false)
182 : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
183 {
184 if(allocateSizes)
185 {
186 sizes_.resize(info.size());
187 }
188 }
189
193 void moveToNextIndex()
194 {
195 index_++;
196 assert(index_<=interface_.size());
197 skipZeroIndices();
198 }
203 void increment(std::size_t i)
204 {
205 index_+=i;
206 assert(index_<=interface_.size());
207 }
212 bool finished() const
213 {
214 return index_==interface_.size();
215 }
216
217 void skipZeroIndices()
218 {
219 // skip indices with size zero!
220 while(sizes_.size() && index_!=interface_.size() &&!size())
221 ++index_;
222 }
223
228 std::size_t index() const
229 {
230 return interface_[index_];
231 }
235 std::size_t size() const
236 {
237 assert(sizes_.size());
238 return sizes_[index_];
239 }
243 std::size_t* getSizesPointer()
244 {
245 return &sizes_[0];
246 }
251 bool empty() const
252 {
253 return !interface_.size();
254 }
255
260 std::size_t indicesLeft() const
261 {
262 return interface_.size()-index_;
263 }
267 std::size_t fixedSize;
271 int rank() const
272 {
273 return rank_;
274 }
278 std::size_t offset() const
279 {
280 return index_;
281 }
282private:
284 int rank_;
286 std::size_t index_;
288 InterfaceInformation interface_;
289 std::vector<std::size_t> sizes_;
290};
291
292
293} // end unnamed namespace
294
332template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
334{
335public:
340 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
341 std::less<int>,
342 typename Allocator::template rebind<std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > >::other> InterfaceMap;
343
344#ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
351 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
352 : maxBufferSize_(32768), interface_(&inf)
353 {
354 MPI_Comm_dup(comm, &communicator_);
355 }
360 VariableSizeCommunicator(const Interface& inf)
361 : maxBufferSize_(32768), interface_(&inf.interfaces())
362 {
363 MPI_Comm_dup(inf.communicator(), &communicator_);
364 }
365#else
372 VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
373 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
374 interface_(&inf)
375 {
376 MPI_Comm_dup(comm, &communicator_);
377 }
382 VariableSizeCommunicator(const Interface& inf)
383 : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
384 interface_(&inf.interfaces())
385 {
386 MPI_Comm_dup(inf.communicator(), &communicator_);
387 }
388#endif
395 VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
396 : maxBufferSize_(max_buffer_size), interface_(&inf)
397 {
398 MPI_Comm_dup(comm, &communicator_);
399 }
400
406 VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
407 : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
408 {
409 MPI_Comm_dup(inf.communicator(), &communicator_);
410 }
411
413 {
414 MPI_Comm_free(&communicator_);
415 }
416
417
437 template<class DataHandle>
438 void forward(DataHandle& handle)
439 {
440 communicate<true>(handle);
441 }
442
462 template<class DataHandle>
463 void backward(DataHandle& handle)
464 {
465 communicate<false>(handle);
466 }
467
468private:
469 template<bool FORWARD, class DataHandle>
470 void communicateSizes(DataHandle& handle,
471 std::vector<InterfaceTracker>& recv_trackers);
472
479 template<bool forward,class DataHandle>
480 void communicate(DataHandle& handle);
490 template<bool FORWARD, class DataHandle>
491 void setupInterfaceTrackers(DataHandle& handle,
492 std::vector<InterfaceTracker>& send_trackers,
493 std::vector<InterfaceTracker>& recv_trackers);
501 template<bool FORWARD, class DataHandle>
502 [[maybe_unused]] void communicateFixedSize(DataHandle& handle);
510 template<bool FORWARD, class DataHandle>
511 void communicateVariableSize(DataHandle& handle);
518 std::size_t maxBufferSize_;
526 const InterfaceMap* interface_;
532 MPI_Comm communicator_;
533};
534
536namespace
537{
541template<class DataHandle>
542class SizeDataHandle
543{
544public:
545 typedef std::size_t DataType;
546
547 SizeDataHandle(DataHandle& data,
548 std::vector<InterfaceTracker>& trackers)
549 : data_(data), trackers_(trackers), index_()
550 {}
551 bool fixedsize()
552 {
553 return true;
554 }
555 std::size_t size(std::size_t i)
556 {
557 DUNE_UNUSED_PARAMETER(i);
558 return 1;
559 }
560 template<class B>
561 void gather(B& buf, int i)
562 {
563 buf.write(data_.size(i));
564 }
565 void setReceivingIndex(std::size_t i)
566 {
567 index_=i;
568 }
569 std::size_t* getSizesPointer()
570 {
571 return trackers_[index_].getSizesPointer();
572 }
573
574private:
575 DataHandle& data_;
576 std::vector<InterfaceTracker>& trackers_;
577 int index_;
578};
579
580template<class T>
581void setReceivingIndex(T&, int)
582{}
583
584template<class T>
585void setReceivingIndex(SizeDataHandle<T>& t, int i)
586{
587 t.setReceivingIndex(i);
588}
589
590
596template<bool FORWARD>
597struct InterfaceInformationChooser
598{
602 static const InterfaceInformation&
603 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
604 {
605 return info.first;
606 }
607
611 static const InterfaceInformation&
612 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
613 {
614 return info.second;
615 }
616};
617
618template<>
619struct InterfaceInformationChooser<false>
620{
621 static const InterfaceInformation&
622 getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
623 {
624 return info.second;
625 }
626
627 static const InterfaceInformation&
628 getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
629 {
630 return info.first;
631 }
632};
633
639template<class DataHandle>
640struct PackEntries
641{
642
643 int operator()(DataHandle& handle, InterfaceTracker& tracker,
644 MessageBuffer<typename DataHandle::DataType>& buffer,
645 int i) const
646 {
647 DUNE_UNUSED_PARAMETER(i);
648 return operator()(handle,tracker,buffer);
649 }
650
658 int operator()(DataHandle& handle, InterfaceTracker& tracker,
659 MessageBuffer<typename DataHandle::DataType>& buffer) const
660 {
661 if(tracker.fixedSize) // fixed size if variable is >0!
662 {
663
664 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
665 for(std::size_t i=0; i< noIndices; ++i)
666 {
667 handle.gather(buffer, tracker.index());
668 tracker.moveToNextIndex();
669 }
670 return noIndices*tracker.fixedSize;
671 }
672 else
673 {
674 int packed=0;
675 tracker.skipZeroIndices();
676 while(!tracker.finished())
677 if(buffer.hasSpaceForItems(handle.size(tracker.index())))
678 {
679 assert(std::size_t(packed) == buffer.position());
680 handle.gather(buffer, tracker.index());
681 packed+=handle.size(tracker.index());
682 assert(std::size_t(packed) <= buffer.size());
683 assert(std::size_t(packed) == buffer.position());
684 tracker.moveToNextIndex();
685 }
686 else
687 break;
688 return packed;
689 }
690 }
691};
692
698template<class DataHandle>
699struct UnpackEntries{
700
708 bool operator()(DataHandle& handle, InterfaceTracker& tracker,
709 MessageBuffer<typename DataHandle::DataType>& buffer,
710 int count=0)
711 {
712 if(tracker.fixedSize) // fixed size if variable is >0!
713 {
714 std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
715
716 for(std::size_t i=0; i< noIndices; ++i)
717 {
718 handle.scatter(buffer, tracker.index(), tracker.fixedSize);
719 tracker.moveToNextIndex();
720 }
721 return tracker.finished();
722 }
723 else
724 {
725 assert(count);
726 for(int unpacked=0;unpacked<count;)
727 {
728 assert(!tracker.finished());
729 assert(buffer.hasSpaceForItems(tracker.size()));
730 handle.scatter(buffer, tracker.index(), tracker.size());
731 unpacked+=tracker.size();
732 tracker.moveToNextIndex();
733 }
734 return tracker.finished();
735 }
736 }
737};
738
739
743template<class DataHandle>
744struct UnpackSizeEntries{
745
753 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
754 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
755 {
756 std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
757 std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
758 handle.getSizesPointer()+tracker.offset());
759 tracker.increment(noIndices);
760 return noIndices;
761 }
762 bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
763 MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
764 {
765 return operator()(handle,tracker,buffer);
766 }
767};
768
776[[maybe_unused]] void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
777 std::vector<MPI_Request>& send_requests,
778 std::vector<InterfaceTracker>& recv_trackers,
779 std::vector<MPI_Request>& recv_requests,
780 MPI_Comm communicator)
781{
782 typedef std::vector<InterfaceTracker>::iterator TIter;
783 std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
784
785 for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
786 ++iter, ++mIter)
787 {
788 MPI_Irecv(&(iter->fixedSize), 1, Dune::MPITraits<std::size_t>::getType(),
789 iter->rank(), 933881, communicator, &(*mIter));
790 }
791
792 // Send our size to all neighbours using non-blocking synchronous communication.
793 std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
794 for(TIter iter=send_trackers.begin(), end=send_trackers.end();
795 iter!=end;
796 ++iter, ++mIter1)
797 {
798 MPI_Issend(&(iter->fixedSize), 1, Dune::MPITraits<std::size_t>::getType(),
799 iter->rank(), 933881, communicator, &(*mIter1));
800 }
801}
802
803
808template<class DataHandle>
809struct SetupSendRequest{
810 void operator()(DataHandle& handle,
811 InterfaceTracker& tracker,
812 MessageBuffer<typename DataHandle::DataType>& buffer,
813 MPI_Request& request,
814 MPI_Comm comm) const
815 {
816 buffer.reset();
817 int size=PackEntries<DataHandle>()(handle, tracker, buffer);
818 // Skip indices of zero size.
819 while(!tracker.finished() && !handle.size(tracker.index()))
820 tracker.moveToNextIndex();
821 if(size)
822 {
823 assert(std::size_t(size) <= buffer.size());
824 MPI_Issend(buffer, size, Dune::MPITraits<typename DataHandle::DataType>::getType(),
825 tracker.rank(), 933399, comm, &request);
826 }
827 }
828};
829
830
835template<class DataHandle>
836struct SetupRecvRequest{
837 void operator()(DataHandle& /*handle*/,
838 InterfaceTracker& tracker,
839 MessageBuffer<typename DataHandle::DataType>& buffer,
840 MPI_Request& request,
841 MPI_Comm comm) const
842 {
843 buffer.reset();
844 if(tracker.indicesLeft())
845 MPI_Irecv(buffer, buffer.size(), Dune::MPITraits<typename DataHandle::DataType>::getType(),
846 tracker.rank(), 933399, comm, &request);
847 }
848};
849
853template<class DataHandle>
854struct NullPackUnpackFunctor
855{
856 int operator()(DataHandle&, InterfaceTracker&,
857 MessageBuffer<typename DataHandle::DataType>&, int)
858 {
859 return 0;
860 }
861 int operator()(DataHandle&, InterfaceTracker&,
862 MessageBuffer<typename DataHandle::DataType>&)
863 {
864 return 0;
865 }
866};
867
882template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
883std::size_t checkAndContinue(DataHandle& handle,
884 std::vector<InterfaceTracker>& trackers,
885 std::vector<MPI_Request>& requests,
886 std::vector<MPI_Request>& requests2,
887 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
888 MPI_Comm comm,
889 BufferFunctor buffer_func,
890 CommunicationFunctor comm_func,
891 bool valid=true,
892 bool getCount=false)
893{
894 std::size_t size=requests.size();
895 std::vector<MPI_Status> statuses(size);
896 int no_completed;
897 std::vector<int> indices(size, -1); // the indices for which the communication finished.
898
899 MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
900 indices.resize(no_completed);
901 for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
902 index!=end; ++index)
903 {
904 InterfaceTracker& tracker=trackers[*index];
905 setReceivingIndex(handle, *index);
906 if(getCount)
907 {
908 // Get the number of entries received
909 int count;
910 MPI_Get_count(&(statuses[index-indices.begin()]),
911 Dune::MPITraits<typename DataHandle::DataType>::getType(),
912 &count);
913 // Communication completed, we can reuse the buffers, e.g. unpack or repack
914 buffer_func(handle, tracker, buffers[*index], count);
915 }else
916 buffer_func(handle, tracker, buffers[*index]);
917 tracker.skipZeroIndices();
918 if(!tracker.finished()){
919 // Maybe start another communication.
920 comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
921 tracker.skipZeroIndices();
922 if(valid)
923 --no_completed; // communication not finished, decrement counter for finished ones.
924 }
925 }
926 return no_completed;
927
928}
929
939template<class DataHandle>
940std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
941 std::vector<InterfaceTracker>& trackers,
942 std::vector<MPI_Request>& size_requests,
943 std::vector<MPI_Request>& data_requests,
944 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
945 MPI_Comm comm)
946{
947 return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
948 NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
949}
950
959template<class DataHandle>
960std::size_t checkSendAndContinueSending(DataHandle& handle,
961 std::vector<InterfaceTracker>& trackers,
962 std::vector<MPI_Request>& requests,
963 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
964 MPI_Comm comm)
965{
966 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
967 NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
968}
969
978template<class DataHandle>
979std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
980 std::vector<InterfaceTracker>& trackers,
981 std::vector<MPI_Request>& requests,
982 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
983 MPI_Comm comm)
984{
985 return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
986 UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
987 true, !handle.fixedsize());
988}
989
990
991[[maybe_unused]] bool validRecvRequests(const std::vector<MPI_Request> reqs)
992{
993 for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
994 i!=end; ++i)
995 if(*i!=MPI_REQUEST_NULL)
996 return true;
997 return false;
998}
999
1010template<class DataHandle, class Functor>
1011std::size_t setupRequests(DataHandle& handle,
1012 std::vector<InterfaceTracker>& trackers,
1013 std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1014 std::vector<MPI_Request>& requests,
1015 const Functor& setupFunctor,
1016 MPI_Comm communicator)
1017{
1018 typedef typename std::vector<InterfaceTracker>::iterator TIter;
1019 typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1020 biter=buffers.begin();
1021 typename std::vector<MPI_Request>::iterator riter=requests.begin();
1022 std::size_t complete=0;
1023 for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1024 {
1025 setupFunctor(handle, *titer, *biter, *riter, communicator);
1026 complete+=titer->finished();
1027 }
1028 return complete;
1029}
1030} // end unnamed namespace
1031
1032template<class Allocator>
1033template<bool FORWARD, class DataHandle>
1034void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1035 std::vector<InterfaceTracker>& send_trackers,
1036 std::vector<InterfaceTracker>& recv_trackers)
1037{
1038 if(interface_->size()==0)
1039 return;
1040 send_trackers.reserve(interface_->size());
1041 recv_trackers.reserve(interface_->size());
1042
1043 int fixedsize=0;
1044 if(handle.fixedsize())
1045 ++fixedsize;
1046
1047
1048 typedef typename InterfaceMap::const_iterator IIter;
1049 for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1050 {
1051
1052 if(handle.fixedsize() && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1053 fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1054 assert(!handle.fixedsize()||fixedsize>0);
1055 send_trackers.push_back(InterfaceTracker(inf->first,
1056 InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1057 recv_trackers.push_back(InterfaceTracker(inf->first,
1058 InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1059 }
1060}
1061
1062template<class Allocator>
1063template<bool FORWARD, class DataHandle>
1064void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1065{
1066 std::vector<MPI_Request> size_send_req(interface_->size());
1067 std::vector<MPI_Request> size_recv_req(interface_->size());
1068
1069 std::vector<InterfaceTracker> send_trackers;
1070 std::vector<InterfaceTracker> recv_trackers;
1071 setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1072 sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1073
1074 std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1075 std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1076 typedef typename DataHandle::DataType DataType;
1077 std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1078 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1079
1080
1081 setupRequests(handle, send_trackers, send_buffers, data_send_req,
1082 SetupSendRequest<DataHandle>(), communicator_);
1083
1084 std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1085 no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1086
1087 // Skip empty interfaces.
1088 typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1089 for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1090 if(i->empty())
1091 --no_to_recv;
1092 for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1093 if(i->empty())
1094 --no_to_send;
1095
1096 while(no_size_to_recv+no_to_send+no_to_recv)
1097 {
1098 // Receive the fixedsize and setup receives accordingly
1099 if(no_size_to_recv)
1100 no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1101 data_recv_req, recv_buffers,
1102 communicator_);
1103
1104 // Check send completion and initiate other necessary sends
1105 if(no_to_send)
1106 no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1107 send_buffers, communicator_);
1108 if(validRecvRequests(data_recv_req))
1109 // Receive data and setup new unblocking receives if necessary
1110 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1111 recv_buffers, communicator_);
1112 }
1113
1114 // Wait for completion of sending the size.
1115 //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1116 MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1117
1118}
1119
1120template<class Allocator>
1121template<bool FORWARD, class DataHandle>
1122void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1123 std::vector<InterfaceTracker>& data_recv_trackers)
1124{
1125 std::vector<InterfaceTracker> send_trackers;
1126 std::vector<InterfaceTracker> recv_trackers;
1127 std::size_t size = interface_->size();
1128 std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1129 std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1130 std::vector<MessageBuffer<std::size_t> >
1131 send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1132 recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1133 SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1134 setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1135 setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1136 SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1137 setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1138 SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1139
1140 // Count valid requests that we have to wait for.
1141 auto valid_req_func =
1142 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1143
1144 auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1145 valid_req_func);
1146 auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1147 valid_req_func);
1148
1149 while(size_to_send+size_to_recv)
1150 {
1151 if(size_to_send)
1152 size_to_send -=
1153 checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1154 send_buffers, communicator_);
1155 if(size_to_recv)
1156 // Could have done this using checkSendAndContinueSending
1157 // But the call below is more efficient as UnpackSizeEntries
1158 // uses std::copy.
1159 size_to_recv -=
1160 checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1161 recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1162 SetupRecvRequest<SizeDataHandle<DataHandle> >());
1163 }
1164}
1165
1166template<class Allocator>
1167template<bool FORWARD, class DataHandle>
1168void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1169{
1170
1171 std::vector<InterfaceTracker> send_trackers;
1172 std::vector<InterfaceTracker> recv_trackers;
1173 setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1174
1175 std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1176 std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1177 typedef typename DataHandle::DataType DataType;
1178 std::vector<MessageBuffer<DataType> >
1179 send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1180 recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1181
1182 communicateSizes<FORWARD>(handle, recv_trackers);
1183 // Setup requests for sending and receiving.
1184 setupRequests(handle, send_trackers, send_buffers, send_requests,
1185 SetupSendRequest<DataHandle>(), communicator_);
1186 setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1187 SetupRecvRequest<DataHandle>(), communicator_);
1188
1189 // Determine number of valid requests.
1190 auto valid_req_func =
1191 [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1192
1193 auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1194 valid_req_func);
1195 auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1196 valid_req_func);
1197 while(no_to_send+no_to_recv)
1198 {
1199 // Check send completion and initiate other necessary sends
1200 if(no_to_send)
1201 no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1202 send_buffers, communicator_);
1203 if(no_to_recv)
1204 // Receive data and setup new unblocking receives if necessary
1205 no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1206 recv_buffers, communicator_);
1207 }
1208}
1209
1210template<class Allocator>
1211template<bool FORWARD, class DataHandle>
1212void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1213{
1214 if( interface_->size() == 0)
1215 // Simply return as otherwise we will index an empty container
1216 // either for MPI_Wait_all or MPI_Test_some.
1217 return;
1218
1219 if(handle.fixedsize())
1220 communicateFixedSize<FORWARD>(handle);
1221 else
1222 communicateVariableSize<FORWARD>(handle);
1223}
1224} // end namespace Dune
1225
1226#endif // HAVE_MPI
1227
1228#endif
DataHandle & data
Definition: CpGridData.hpp:1085
mover::MoveBuffer< typename DataHandle::DataType > & buffer
Definition: CpGridData.hpp:1103
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: VariableSizeCommunicator.hpp:267
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: VariableSizeCommunicator.hpp:334
void backward(DataHandle &handle)
Communicate backwards.
Definition: VariableSizeCommunicator.hpp:463
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: VariableSizeCommunicator.hpp:406
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: VariableSizeCommunicator.hpp:395
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename Allocator::template rebind< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > >::other > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: VariableSizeCommunicator.hpp:342
void forward(DataHandle &handle)
Communicate forward.
Definition: VariableSizeCommunicator.hpp:438
~VariableSizeCommunicator()
Definition: VariableSizeCommunicator.hpp:412
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: VariableSizeCommunicator.hpp:360
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: VariableSizeCommunicator.hpp:351
T * increment(T *cc, int i, int dim)
Increment an iterator over an array that reresents a dense row-major matrix with dims columns.
Definition: GridHelpers.hpp:354
Holds the implementation of the CpGrid as a pimple.
Definition: CellQuadrature.hpp:26