dune-spgrid  2.7
messagebuffer.hh
Go to the documentation of this file.
1 #ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
2 #define DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
3 
4 #include <cassert>
5 #include <cstddef>
6 #include <cstdlib>
7 #include <cstring>
8 #include <utility>
9 #include <vector>
10 
11 
12 #include <dune/common/parallel/communication.hh>
13 #include <dune/common/parallel/mpicommunication.hh>
14 
15 namespace Dune
16 {
17 
18  // SPBasicPackedMessageWriteBuffer
19  // -------------------------------
20 
22  {
24 
25  public:
27 
28  SPBasicPackedMessageWriteBuffer ( const This & ) = delete;
29 
31  : buffer_( other.buffer_ ),
32  position_( other.position_ ), capacity_( other.capacity_ )
33  {
34  other.initialize();
35  }
36 
38 
39  This &operator= ( const This & ) = delete;
40 
41  This &operator= ( This &&other )
42  {
43  buffer_ = other.buffer_;
44  position_ = other.position_;
45  capacity_ = other.capacity_;
46  other.initialize();
47  return *this;
48  }
49 
50  template< class T >
51  void write ( const T &value )
52  {
53  reserve( position_ + sizeof( T ) );
54  std::memcpy( static_cast< char * >( buffer_ ) + position_, &value, sizeof( T ) );
55  position_ += sizeof( T );
56  }
57 
58  std::size_t position () const { return position_; }
59 
60  protected:
61  void initialize () { buffer_ = nullptr; position_ = 0; capacity_ = 0; }
62 
63  void reserve ( std::size_t size )
64  {
65  if( size <= capacity_ )
66  return;
67 
68  std::size_t capacity = std::max( size, 2*capacity_ );
69  void *buffer = std::realloc( buffer_, capacity );
70  if( !buffer )
71  {
72  capacity = capacity_ + size;
73  buffer = std::realloc( buffer_, capacity );
74  if( !buffer )
75  DUNE_THROW( OutOfMemoryError, "Cannot allocate sufficiently large buffer." );
76  }
77  buffer_ = buffer;
78  capacity_ = capacity;
79  }
80 
81  void *buffer_;
82  std::size_t position_, capacity_;
83  };
84 
85 
86 
87  // SPPackedMessageWriteBuffer
88  // --------------------------
89 
90  template< class CollectiveCommunication >
92 
93  template< class C >
94  class SPPackedMessageWriteBuffer< CollectiveCommunication< C > >
96  {
99 
100  public:
101  explicit SPPackedMessageWriteBuffer ( const CollectiveCommunication< C > &comm ) {}
102 
103  void send ( int rank, int tag ) {}
104  void wait () {}
105  };
106 
107 #if HAVE_MPI
108  template<>
109  class SPPackedMessageWriteBuffer< CollectiveCommunication< MPI_Comm > >
111  {
114 
115  public:
116  explicit SPPackedMessageWriteBuffer ( const CollectiveCommunication< MPI_Comm > &comm ) : comm_( comm ) {}
117 
// Start a non-blocking send (MPI_Isend) of the first position_ bytes of
// buffer_ to process `rank` with message tag `tag` on comm_. The request
// handle is stored in request_, which wait() passes to MPI_Wait.
// NOTE(review): position_ is a std::size_t handed to MPI's int count --
// presumably messages stay below INT_MAX bytes; confirm.
118  void send ( int rank, int tag )
119  {
120  MPI_Isend( buffer_, position_, MPI_PACKED, rank, tag, comm_, &request_ );
121  }
122 
123  void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
124 
125  protected:
126  MPI_Comm comm_;
127  MPI_Request request_;
128  };
129 #endif // #if HAVE_MPI
130 
131 
132 
133  // SPBasicPackedMessageReadBuffer
134  // ------------------------------
135 
137  {
139 
140  public:
142 
143  SPBasicPackedMessageReadBuffer ( const This & ) = delete;
144 
146  : buffer_( other.buffer_ ),
147  position_( other.position_ ), size_( other.size_ )
148  {
149  other.initialize();
150  }
151 
153 
154  This &operator= ( const This & ) = delete;
155 
156  This &operator= ( This &&other )
157  {
158  buffer_ = other.buffer_;
159  position_ = other.position_;
160  size_ = other.size_;
161  other.initialize();
162  return *this;
163  }
164 
165  template< class T >
166  void read ( T &value )
167  {
168  if( position_ + sizeof( T ) <= size_ )
169  {
170  std::memcpy( static_cast< void * >( &value ), static_cast< char * >( buffer_ ) + position_, sizeof( T ) );
171  position_ += sizeof( T );
172  }
173  else
174  DUNE_THROW( IOError, "Cannot read beyond the buffer's end." );
175  }
176 
177  std::size_t position () const { return position_; }
178 
179  protected:
180  void initialize () { buffer_ = nullptr; position_ = 0; size_ = 0; }
181 
182  void reset ( std::size_t size )
183  {
184  std::free( buffer_ );
185  initialize();
186  if( size == 0 )
187  return;
188  buffer_ = std::malloc( size );
189  if( !buffer_ )
190  DUNE_THROW( OutOfMemoryError, "Cannot allocate sufficiently large buffer." );
191  size_ = size;
192  }
193 
194  void *buffer_;
195  std::size_t position_, size_;
196  };
197 
198 
199 
200  // SPPackedMessageReadBuffer
201  // -------------------------
202 
203  template< class CollectiveCommunication >
205 
206  template< class C >
207  class SPPackedMessageReadBuffer< CollectiveCommunication< C > >
209  {
212 
213  public:
214  explicit SPPackedMessageReadBuffer ( const CollectiveCommunication< C > &comm ) {}
215 
// Serial variant of receive: unconditionally throws an IOError; none of
// the arguments is used.
// NOTE(review): the second parameter is spelled `rag`; the sibling
// overloads call it `tag`, so this is presumably a typo -- confirm.
216  void receive ( int rank, int rag, std::size_t size )
217  {
218  DUNE_THROW( IOError, "Nothing to receive in a serial communication." );
219  }
220 
221  void receive ( int rank, int tag ) { receive( rank, tag, 0 ); }
222  void receive ( int tag ) { receive( 0, tag, 0 ); }
223 
224  int rank () const { return 0 ; }
225 
226  void wait () {}
227 
228  friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
229  {
230  return readBuffers.end();
231  }
232  };
233 
234 #if HAVE_MPI
235  template<>
236  class SPPackedMessageReadBuffer< CollectiveCommunication< MPI_Comm > >
238  {
241 
242  public:
243  SPPackedMessageReadBuffer ( const CollectiveCommunication< MPI_Comm > &comm ) : comm_( comm ) {}
244 
// Post a non-blocking receive (MPI_Irecv) for a message of `size` bytes
// from process `rank` with message tag `tag` on comm_.
// The source rank is remembered in rank_, reset( size ) is called before
// the receive is posted, and the request handle is stored in request_
// for wait() / waitAny().
// NOTE(review): size_ is a std::size_t handed to MPI's int count --
// presumably messages stay below INT_MAX bytes; confirm.
245  void receive ( int rank, int tag, std::size_t size )
246  {
247  rank_ = rank;
248  reset( size );
249  MPI_Irecv( buffer_, size_, MPI_BYTE, rank, tag, comm_, &request_ );
250  }
251 
252  void receive ( int rank, int tag )
253  {
254  MPI_Status status;
255  MPI_Probe( rank, tag, comm_, &status );
256  int count;
257  MPI_Get_count( &status, MPI_BYTE, &count );
258  receive( status.MPI_SOURCE, tag, count );
259  }
260 
261  void receive ( int tag ) { receive( MPI_ANY_SOURCE, tag ); }
262 
263  int rank () const { return rank_; }
264 
265  void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
266 
267  friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
268  {
269  const std::size_t numBuffers = readBuffers.size();
270  std::vector< MPI_Request > requests( numBuffers );
271  for( std::size_t i = 0; i < numBuffers; ++i )
272  requests[ i ] = readBuffers[ i ].request_;
273 
274  int index = MPI_UNDEFINED;
275  MPI_Waitany( numBuffers, requests.data(), &index, MPI_STATUS_IGNORE );
276  if( index == MPI_UNDEFINED )
277  return readBuffers.end();
278 
279  readBuffers[ index ].request_ = requests[ index ];
280  return readBuffers.begin() + index;
281  }
282 
283  protected:
284  int rank_;
285  MPI_Comm comm_;
286  MPI_Request request_;
287  };
288 #endif // #if HAVE_MPI
289 
290 } // namespace Dune
291 
292 #endif // #ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
Definition: iostream.hh:7
Definition: messagebuffer.hh:22
void reserve(std::size_t size)
Definition: messagebuffer.hh:63
void initialize()
Definition: messagebuffer.hh:61
void * buffer_
Definition: messagebuffer.hh:81
SPBasicPackedMessageWriteBuffer(This &&other)
Definition: messagebuffer.hh:30
std::size_t capacity_
Definition: messagebuffer.hh:82
std::size_t position() const
Definition: messagebuffer.hh:58
This & operator=(const This &)=delete
SPBasicPackedMessageWriteBuffer(const This &)=delete
SPBasicPackedMessageWriteBuffer()
Definition: messagebuffer.hh:26
std::size_t position_
Definition: messagebuffer.hh:82
void write(const T &value)
Definition: messagebuffer.hh:51
~SPBasicPackedMessageWriteBuffer()
Definition: messagebuffer.hh:37
Definition: messagebuffer.hh:91
SPPackedMessageWriteBuffer(const CollectiveCommunication< C > &comm)
Definition: messagebuffer.hh:101
void send(int rank, int tag)
Definition: messagebuffer.hh:103
void send(int rank, int tag)
Definition: messagebuffer.hh:118
SPPackedMessageWriteBuffer(const CollectiveCommunication< MPI_Comm > &comm)
Definition: messagebuffer.hh:116
Definition: messagebuffer.hh:137
void reset(std::size_t size)
Definition: messagebuffer.hh:182
SPBasicPackedMessageReadBuffer(const This &)=delete
std::size_t size_
Definition: messagebuffer.hh:195
void * buffer_
Definition: messagebuffer.hh:194
This & operator=(const This &)=delete
std::size_t position_
Definition: messagebuffer.hh:195
SPBasicPackedMessageReadBuffer()
Definition: messagebuffer.hh:141
std::size_t position() const
Definition: messagebuffer.hh:177
void initialize()
Definition: messagebuffer.hh:180
~SPBasicPackedMessageReadBuffer()
Definition: messagebuffer.hh:152
SPBasicPackedMessageReadBuffer(This &&other)
Definition: messagebuffer.hh:145
void read(T &value)
Definition: messagebuffer.hh:166
Definition: messagebuffer.hh:204
int rank() const
Definition: messagebuffer.hh:224
void receive(int rank, int rag, std::size_t size)
Definition: messagebuffer.hh:216
SPPackedMessageReadBuffer(const CollectiveCommunication< C > &comm)
Definition: messagebuffer.hh:214
void receive(int tag)
Definition: messagebuffer.hh:222
friend std::vector< This >::iterator waitAny(std::vector< This > &readBuffers)
Definition: messagebuffer.hh:228
void receive(int rank, int tag)
Definition: messagebuffer.hh:221
SPPackedMessageReadBuffer(const CollectiveCommunication< MPI_Comm > &comm)
Definition: messagebuffer.hh:243
void receive(int tag)
Definition: messagebuffer.hh:261
friend std::vector< This >::iterator waitAny(std::vector< This > &readBuffers)
Definition: messagebuffer.hh:267
void receive(int rank, int tag)
Definition: messagebuffer.hh:252
void receive(int rank, int tag, std::size_t size)
Definition: messagebuffer.hh:245