opm-simulators
MPISerializer.hpp
1 /*
2  This file is part of the Open Porous Media project (OPM).
3 
4  OPM is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 2 of the License, or
7  (at your option) any later version.
8 
9  OPM is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with OPM. If not, see <http://www.gnu.org/licenses/>.
16 
17  Consult the COPYING file in the top-level source directory of this
18  module for the precise wording of the license and the list of
19  copyright holders.
20 */
21 #ifndef MPI_SERIALIZER_HPP
22 #define MPI_SERIALIZER_HPP
23 
#include <opm/common/utility/Serializer.hpp>
#include <opm/simulators/utils/MPIPacker.hpp>
#include <opm/simulators/utils/ParallelCommunication.hpp>

#include <cstddef>
#include <limits>
#include <stdexcept>
#include <utility>
27 
28 namespace Opm::Parallel {
29 
//! \brief Explicit wrapper type for the root rank of a broadcast.
//! \details Wrapping the root argument in its own type avoids mistakes in
//!          calls to MpiSerializer::broadcast(), where a bare int could be
//!          confused with one of the data arguments.
struct RootRank {
    int value; //!< Rank of the root process
};
36 
38 class MpiSerializer : public Serializer<Mpi::Packer> {
39 public:
40  explicit MpiSerializer(Parallel::Communication comm)
41  : Serializer<Mpi::Packer>(m_packer)
42  , m_packer(comm)
43  , m_comm(comm)
44  {}
45 
46  template<typename... Args>
47  void broadcast(RootRank rootrank, Args&&... args)
48  {
49  if (m_comm.size() == 1)
50  return;
51 
52  const int root = rootrank.value;
53  if (m_comm.rank() == root) {
54  try {
55  this->pack(std::forward<Args>(args)...);
56  m_comm.broadcast(&m_packSize, 1, root);
57  broadcast_chunked(root);
58  } catch (...) {
59  m_packSize = std::numeric_limits<size_t>::max();
60  m_comm.broadcast(&m_packSize, 1, root);
61  throw;
62  }
63  } else {
64  m_comm.broadcast(&m_packSize, 1, root);
65  if (m_packSize == std::numeric_limits<size_t>::max()) {
66  throw std::runtime_error("Error detected in parallel serialization");
67  }
68  m_buffer.resize(m_packSize);
69  broadcast_chunked(root);
70  this->unpack(std::forward<Args>(args)...);
71  }
72  }
73 
74 
81  template<class T>
82  void append(T& data, int root = 0)
83  {
84  if (m_comm.size() == 1)
85  return;
86 
87  T tmp;
88  T& bcast = m_comm.rank() == root ? data : tmp;
89  broadcast(RootRank{root}, bcast);
90 
91  if (m_comm.rank() != root)
92  data.append(tmp);
93  }
94 
95 private:
96  void broadcast_chunked(int root) {
97  const int maxChunkSize = std::numeric_limits<int>::max();
98  std::size_t remainingSize = m_packSize;
99  std::size_t pos = 0;
100  while (remainingSize > maxChunkSize) {
101  m_comm.broadcast(m_buffer.data()+pos, maxChunkSize, root);
102  pos += maxChunkSize;
103  remainingSize -= maxChunkSize;
104  }
105  m_comm.broadcast(m_buffer.data()+pos, static_cast<int>(remainingSize), root);
106  }
107 
108  const Mpi::Packer m_packer;
109  Parallel::Communication m_comm;
110 };
111 
112 }
113 
114 #endif
Definition: MPISerializer.hpp:28
Class for serializing and broadcasting data using MPI.
Definition: MPISerializer.hpp:38
void append(T &data, int root=0)
Serialize and broadcast on root process, de-serialize and append on others.
Definition: MPISerializer.hpp:82
Avoid mistakes in calls to broadcast() by wrapping the root argument in an explicit type.
Definition: MPISerializer.hpp:32