xt_exchanger_mix_isend_irecv.c 11.5 KB
Newer Older
Moritz Hanke's avatar
Moritz Hanke committed
1
2
3
/**
 * @file xt_exchanger_mix_isend_irecv.c
 *
Thomas Jahns's avatar
Thomas Jahns committed
4
 * @copyright Copyright  (C)  2016 Jörg Behrens <behrens@dkrz.de>
Moritz Hanke's avatar
Moritz Hanke committed
5
6
7
 *                                 Moritz Hanke <hanke@dkrz.de>
 *                                 Thomas Jahns <jahns@dkrz.de>
 *
Thomas Jahns's avatar
Thomas Jahns committed
8
 * @author Jörg Behrens <behrens@dkrz.de>
Moritz Hanke's avatar
Moritz Hanke committed
9
10
11
12
13
 *         Moritz Hanke <hanke@dkrz.de>
 *         Thomas Jahns <jahns@dkrz.de>
 */
/*
 * Keywords:
Thomas Jahns's avatar
Thomas Jahns committed
14
 * Maintainer: Jörg Behrens <behrens@dkrz.de>
Moritz Hanke's avatar
Moritz Hanke committed
15
16
 *             Moritz Hanke <hanke@dkrz.de>
 *             Thomas Jahns <jahns@dkrz.de>
Moritz Hanke's avatar
Moritz Hanke committed
17
 * URL: https://doc.redmine.dkrz.de/yaxt/html/
Moritz Hanke's avatar
Moritz Hanke committed
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are  permitted provided that the following conditions are
 * met:
 *
 * Redistributions of source code must retain the above copyright notice,
 * this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 *
 * Neither the name of the DKRZ GmbH nor the names of its contributors
 * may be used to endorse or promote products derived from this software
 * without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

50
#include <assert.h>
Moritz Hanke's avatar
Moritz Hanke committed
51
52
53
54
55
#include <mpi.h>

#include "core/core.h"
#include "core/ppm_xfuncs.h"
#include "xt/xt_mpi.h"
56
#include "xt/xt_request_msgs.h"
Thomas Jahns's avatar
Thomas Jahns committed
57
#include "xt_mpi_internal.h"
Moritz Hanke's avatar
Moritz Hanke committed
58
59
60
61
#include "xt_redist_internal.h"
#include "xt_exchanger.h"
#include "xt_exchanger_mix_isend_irecv.h"

62
63
64
static Xt_exchanger
xt_exchanger_mix_isend_irecv_copy(Xt_exchanger exchanger,
                                  MPI_Comm newComm, int new_tag_offset);
Moritz Hanke's avatar
Moritz Hanke committed
65
66
67
68
static void xt_exchanger_mix_isend_irecv_delete(Xt_exchanger exchanger);
static void xt_exchanger_mix_isend_irecv_s_exchange(Xt_exchanger exchanger,
                                                    const void * src_data,
                                                    void * dst_data);
69
70
71
static void xt_exchanger_mix_isend_irecv_a_exchange(
  Xt_exchanger exchanger, const void * src_data, void * dst_data,
  Xt_request *request);
72
73
74
static int
xt_exchanger_mix_isend_irecv_get_msg_ranks(Xt_exchanger exchanger,
                                           enum xt_msg_direction direction,
75
                                           int *restrict *ranks);
76

77
78
79
80
static MPI_Datatype
xt_exchanger_mix_isend_irecv_get_MPI_Datatype(Xt_exchanger exchanger,
                                              int rank,
                                              enum xt_msg_direction direction);
Moritz Hanke's avatar
Moritz Hanke committed
81
82

static const struct xt_exchanger_vtable exchanger_mix_isend_irecv_vtable = {
83
  .copy = xt_exchanger_mix_isend_irecv_copy,
Moritz Hanke's avatar
Moritz Hanke committed
84
85
  .delete = xt_exchanger_mix_isend_irecv_delete,
  .s_exchange = xt_exchanger_mix_isend_irecv_s_exchange,
86
  .a_exchange = xt_exchanger_mix_isend_irecv_a_exchange,
87
  .get_msg_ranks = xt_exchanger_mix_isend_irecv_get_msg_ranks,
88
  .get_MPI_Datatype = xt_exchanger_mix_isend_irecv_get_MPI_Datatype,
Moritz Hanke's avatar
Moritz Hanke committed
89
90
91
92
93
94
};

typedef struct Xt_exchanger_mix_isend_irecv_ * Xt_exchanger_mix_isend_irecv;

struct mix_msg {
  struct Xt_redist_msg data;
95
96
97
98
#if SIZEOF_MPI_DATATYPE == 2 * SIZEOF_INT
#  define MSG_DIR(msg) ((enum xt_msg_direction)((msg).data.padding))
#  define type data.padding
#else
99
  enum xt_msg_direction type;
100
101
#  define MSG_DIR(msg) ((msg).type)
#endif
Moritz Hanke's avatar
Moritz Hanke committed
102
103
104
105
106
107
};

struct Xt_exchanger_mix_isend_irecv_ {

  const struct xt_exchanger_vtable * vtable;

108
  int n, tag_offset;
Moritz Hanke's avatar
Moritz Hanke committed
109
  MPI_Comm comm;
110
  struct mix_msg msgs[];
Moritz Hanke's avatar
Moritz Hanke committed
111
112
};

113
114
115
116
117
118
119
120
121
122
123
124
static Xt_exchanger_mix_isend_irecv
xt_exchanger_mix_isend_irecv_alloc(size_t nmsg)
{
  Xt_exchanger_mix_isend_irecv exchanger;
  size_t header_size = sizeof (*exchanger),
    body_size = sizeof (struct mix_msg) * nmsg;
  exchanger = xmalloc(header_size + body_size);
  exchanger->n = (int)nmsg;
  exchanger->vtable = &exchanger_mix_isend_irecv_vtable;
  return exchanger;
}

Moritz Hanke's avatar
Moritz Hanke committed
125
126
Xt_exchanger
xt_exchanger_mix_isend_irecv_new(int nsend, int nrecv,
127
128
                                 const struct Xt_redist_msg *send_msgs,
                                 const struct Xt_redist_msg *recv_msgs,
129
                                 MPI_Comm comm, int tag_offset) {
Moritz Hanke's avatar
Moritz Hanke committed
130

131
132
  assert((nsend >= 0) & (nrecv >= 0));
  size_t nmsg = (size_t)nsend + (size_t)nrecv;
133
134
  Xt_exchanger_mix_isend_irecv exchanger
    = xt_exchanger_mix_isend_irecv_alloc(nmsg);
135
136
  exchanger->comm = comm;
  exchanger->tag_offset = tag_offset;
137
  struct mix_msg *restrict msgs = exchanger->msgs;
138
139
140
  xt_redist_msgs_strided_copy((size_t)nsend, send_msgs, sizeof (send_msgs[0]),
                              &(msgs[0].data), sizeof (msgs[0]), comm);
  for (size_t i = 0; i < (size_t)nsend; ++i)
141
    msgs[i].type = SEND;
142
143
144
  xt_redist_msgs_strided_copy((size_t)nrecv, recv_msgs, sizeof (recv_msgs[0]),
                              &(msgs[nsend].data), sizeof (msgs[0]), comm);
  for (size_t i = 0; i < (size_t)nrecv; ++i)
145
    msgs[i + (size_t)nsend].type = RECV;
Moritz Hanke's avatar
Moritz Hanke committed
146

147
  xt_exchanger_internal_optimize(nmsg, msgs, sizeof(*msgs), comm);
Moritz Hanke's avatar
Moritz Hanke committed
148

149
  for (size_t i = 1; i < nmsg; ++i) {
Moritz Hanke's avatar
Moritz Hanke committed
150

151
    if (msgs[i-1].data.rank == msgs[i].data.rank && MSG_DIR(msgs[i]) == SEND) {
Moritz Hanke's avatar
Moritz Hanke committed
152

153
154
155
      struct mix_msg temp = msgs[i-1];
      msgs[i-1] = msgs[i];
      msgs[i] = temp;
Moritz Hanke's avatar
Moritz Hanke committed
156
157
158
159
160
161
162
      i++;
    }
  }

  return (Xt_exchanger)exchanger;
}

163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
static Xt_exchanger
xt_exchanger_mix_isend_irecv_copy(Xt_exchanger exchanger,
                                  MPI_Comm new_comm, int new_tag_offset)
{
  Xt_exchanger_mix_isend_irecv exchanger_msr =
    (Xt_exchanger_mix_isend_irecv)exchanger;
  size_t nmsg = (size_t)exchanger_msr->n;
  Xt_exchanger_mix_isend_irecv exchanger_copy
    = xt_exchanger_mix_isend_irecv_alloc(nmsg);
  exchanger_copy->comm = new_comm;
  exchanger_copy->tag_offset = new_tag_offset;
  struct mix_msg *restrict new_msgs = exchanger_copy->msgs,
    *restrict orig_msgs = exchanger_msr->msgs;
  xt_redist_msgs_strided_copy(nmsg, &orig_msgs->data, sizeof (*orig_msgs),
                              &new_msgs->data, sizeof (*new_msgs),
                              new_comm);
  for (size_t i = 0; i < nmsg; ++i)
    new_msgs[i].type = orig_msgs[i].type;
  return (Xt_exchanger)exchanger_copy;
}

184
enum { max_on_stack_req = 16 };
185

Moritz Hanke's avatar
Moritz Hanke committed
186
187
188
189
190
static void xt_exchanger_mix_isend_irecv_delete(Xt_exchanger exchanger) {

  Xt_exchanger_mix_isend_irecv exchanger_msr =
    (Xt_exchanger_mix_isend_irecv)exchanger;

Thomas Jahns's avatar
Thomas Jahns committed
191
  size_t nmsg = (size_t)exchanger_msr->n;
192
  struct mix_msg *restrict msgs = exchanger_msr->msgs;
Moritz Hanke's avatar
Moritz Hanke committed
193

194
195
  xt_redist_msgs_strided_destruct(nmsg, &msgs[0].data, exchanger_msr->comm,
                                  sizeof (*msgs));
Moritz Hanke's avatar
Moritz Hanke committed
196
197
198
199
  free(exchanger_msr);
}

static void xt_exchanger_mix_isend_irecv_s_exchange(Xt_exchanger exchanger,
Thomas Jahns's avatar
Thomas Jahns committed
200
201
                                                    const void * src_data,
                                                    void * dst_data) {
Moritz Hanke's avatar
Moritz Hanke committed
202
203
204
205

  Xt_exchanger_mix_isend_irecv exchanger_msr =
    (Xt_exchanger_mix_isend_irecv)exchanger;

206
207
208
209
210
  if (exchanger_msr->n > 0) {
    size_t nmsg = (size_t)exchanger_msr->n;
    MPI_Comm comm = exchanger_msr->comm;
    struct mix_msg *restrict msgs = exchanger_msr->msgs;
    int tag_offset = exchanger_msr->tag_offset;
211
212
213
214
    MPI_Request req_buf[max_on_stack_req];
    MPI_Request *requests
      = nmsg <= max_on_stack_req
      ? req_buf : xmalloc(nmsg * sizeof (*requests));
Thomas Jahns's avatar
Thomas Jahns committed
215
216
217
    for (size_t i = 0; i < nmsg; ++i) {
      typedef int (*ifp)(void *buf, int count, MPI_Datatype datatype, int dest,
                         int tag, MPI_Comm comm, MPI_Request *request);
218
219
      ifp op = MSG_DIR(msgs[i]) == SEND ? (ifp)MPI_Isend : (ifp)MPI_Irecv;
      void *data = MSG_DIR(msgs[i]) == SEND ? (void *)src_data : dst_data;
Thomas Jahns's avatar
Thomas Jahns committed
220
221
222
223
224
      xt_mpi_call(op(data, 1, msgs[i].data.datatype,
                     msgs[i].data.rank,
                     tag_offset + xt_mpi_tag_exchange_msg,
                     comm, requests+i), comm);
    }
225
    xt_mpi_call(MPI_Waitall((int)nmsg, requests, MPI_STATUSES_IGNORE), comm);
226
227
    if (requests != req_buf)
      free(requests);
228
  }
Moritz Hanke's avatar
Moritz Hanke committed
229
}
230

231
232
233
static void xt_exchanger_mix_isend_irecv_a_exchange(
  Xt_exchanger exchanger, const void * src_data, void * dst_data,
  Xt_request *request) {
234
235
236
237
238
239
240
241
242
243
244

  Xt_exchanger_mix_isend_irecv exchanger_msr =
    (Xt_exchanger_mix_isend_irecv)exchanger;

  Xt_request requests = XT_REQUEST_NULL;

  if (exchanger_msr->n > 0) {
    size_t nmsg = (size_t)exchanger_msr->n;
    MPI_Comm comm = exchanger_msr->comm;
    struct mix_msg *restrict msgs = exchanger_msr->msgs;
    int tag_offset = exchanger_msr->tag_offset;
245
246
247
248
    MPI_Request req_buf[max_on_stack_req];
    MPI_Request *tmp_requests
      = nmsg <= max_on_stack_req
      ? req_buf : xmalloc(nmsg * sizeof (*tmp_requests));
249
250
251
    for (size_t i = 0; i < nmsg; ++i) {
      typedef int (*ifp)(void *buf, int count, MPI_Datatype datatype, int dest,
                         int tag, MPI_Comm comm, MPI_Request *request);
252
253
      ifp op = MSG_DIR(msgs[i]) == SEND ? (ifp)MPI_Isend : (ifp)MPI_Irecv;
      void *data = MSG_DIR(msgs[i]) == SEND ? (void *)src_data : dst_data;
254
255
256
257
258
259
      xt_mpi_call(op(data, 1, msgs[i].data.datatype,
                     msgs[i].data.rank,
                     tag_offset + xt_mpi_tag_exchange_msg,
                     comm, tmp_requests+i), comm);
    }
    requests = xt_request_msgs_new((int)nmsg, tmp_requests, comm);
260
261
    if (tmp_requests != req_buf)
      free(tmp_requests);
262
263
  }

264
  *request = requests;
265
266
}

267
268
269
static int
xt_exchanger_mix_isend_irecv_get_msg_ranks(Xt_exchanger exchanger,
                                           enum xt_msg_direction direction,
270
                                           int *restrict *ranks)
271
272
273
274
275
276
{
  Xt_exchanger_mix_isend_irecv exchanger_msr =
    (Xt_exchanger_mix_isend_irecv)exchanger;
  size_t nmsg = 0, nmsg_all = (size_t)exchanger_msr->n;
  const struct mix_msg *restrict msgs = exchanger_msr->msgs;
  for (size_t i = 0; i < nmsg_all; ++i)
277
    nmsg += MSG_DIR(msgs[i]) == direction;
278
279
  int *restrict ranks_ = *ranks = xmalloc(nmsg * sizeof (*ranks_));
  for (size_t i = 0, j = (size_t)-1; i < nmsg_all; ++i)
280
    if (MSG_DIR(msgs[i]) == direction)
281
282
283
284
      ranks_[++j] = msgs[i].data.rank;
  return (int)nmsg;
}

285
286
287
288
289
290
291
292
293
294
295
static MPI_Datatype
xt_exchanger_mix_isend_irecv_get_MPI_Datatype(Xt_exchanger exchanger,
                                              int rank,
                                              enum xt_msg_direction direction)
{
  Xt_exchanger_mix_isend_irecv exchanger_msr =
    (Xt_exchanger_mix_isend_irecv)exchanger;
  size_t nmsg = (size_t)exchanger_msr->n;
  struct mix_msg *restrict msgs = exchanger_msr->msgs;
  MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
  for (size_t i = 0; i < nmsg; ++i)
296
    if (MSG_DIR(msgs[i]) == direction && msgs[i].data.rank == rank) {
297
298
299
300
301
302
303
      xt_mpi_call(MPI_Type_dup(msgs[i].data.datatype, &datatype_copy),
                  exchanger_msr->comm);
      break;
    }
  return datatype_copy;
}

Thomas Jahns's avatar
Thomas Jahns committed
304
305
306
307
308
309
310
311
312
/*
 * Local Variables:
 * c-basic-offset: 2
 * coding: utf-8
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */