Skip to content
Snippets Groups Projects
Commit 5cce6dcf authored by Thomas Jahns
Browse files

exchangers: Use inheritance in xt_exchanger_irecv_isend_ddt_packed.

parent 3632475a
No related branches found
No related tags found
1 merge request: !31 "Draft: Attempt at fuller exchanger interface"
......@@ -198,6 +198,7 @@ if ENABLE_XT_DDT_EXCHANGER
libyaxt_c_la_SOURCES += \
xt_exchanger_irecv_isend_ddt_packed.c \
xt_exchanger_irecv_isend_ddt_packed.h \
xt_exchanger_irecv_isend_ddt_packed_internal.h \
xt_request_msgs_ddt_packed.c \
xt_gpu.h \
xt_gpu.c \
......
......@@ -69,6 +69,10 @@ typedef struct Xt_exchanger_thread_share_ *Xt_exchanger_thread_share;
/* Function pointer type for an exchange routine taking the exchanger,
 * the source data, the destination data and a pointer to an Xt_request;
 * xt_exchanger_irecv_isend_ddt_packed_a_exchange has this signature. */
typedef void (*xt_exchanger_a_exchange_func)(
Xt_exchanger, const void *, void *, Xt_request *request);
/* Function pointer type of the create_thread_share entry of
 * struct xt_exchanger_vtable: takes an exchanger and a configuration and
 * returns an Xt_exchanger_thread_share. */
typedef Xt_exchanger_thread_share (*xt_exchanger_create_thread_share_func)(
Xt_exchanger, Xt_config);
/* Function pointer type of the destroy_thread_share entry of
 * struct xt_exchanger_vtable: takes an exchanger and an
 * Xt_exchanger_thread_share (presumably one obtained from the matching
 * create_thread_share entry -- confirm against callers). */
typedef void (*xt_exchanger_destroy_thread_share_func)(
Xt_exchanger, Xt_exchanger_thread_share);
struct xt_exchanger_vtable {
Xt_exchanger (*copy)(Xt_exchanger, MPI_Comm, int);
......@@ -82,8 +86,8 @@ struct xt_exchanger_vtable {
void (*team_share_default_init)(void *share);
void (*team_share_destroy)(void *share);
size_t team_share_size;
Xt_exchanger_thread_share (*create_thread_share)(Xt_exchanger, Xt_config);
void (*destroy_thread_share)(Xt_exchanger, Xt_exchanger_thread_share);
xt_exchanger_create_thread_share_func create_thread_share;
xt_exchanger_destroy_thread_share_func destroy_thread_share;
Xt_exchanger_iter (*get_iterator)(Xt_exchanger exchanger, Xt_config config);
};
......
......@@ -53,7 +53,9 @@
#include "xt_mpi_internal.h"
#include "xt_redist_internal.h"
#include "xt_exchanger_irecv_isend_ddt_packed.h"
#include "xt_exchanger_irecv_isend_ddt_packed_internal.h"
#include "xt_exchanger_simple_base.h"
#include "xt_exchanger_simple_base_internal.h"
#include "xt_exchanger_internal.h"
#include "xt_ddt_internal.h"
......@@ -65,15 +67,45 @@
#pragma GCC diagnostic ignored "-Wstringop-overflow"
#endif
/* number of entries in xt_exchanger_irecv_isend_ddt_packed_vtable */
enum {
nvtab = 1,
};
/* Method table(s) of this exchanger. There is no static initializer;
 * entry 0 is filled in at run time by
 * xt_exchanger_irecv_isend_ddt_packed_init. */
struct xt_exchanger_vtable
xt_exchanger_irecv_isend_ddt_packed_vtable[nvtab];
static void
xt_exchanger_irecv_isend_ddt_packed_s_exchange(
const void *src_data, void *dst_data,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs, const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm) {
Xt_exchanger exchanger,
const void *src_data, void *dst_data);
XT_GPU_INSTR_PUSH(xt_exchanger_irecv_isend_ddt_packed_s_exchange);
/* Forward declaration: xt_exchanger_irecv_isend_ddt_packed_init stores the
 * address of this routine in the a_exchange slot of the vtable before the
 * definition appears further down in this file.
 *
 * exchanger  exchanger object (cast to Xt_exchanger_simple_base in the
 *            definition)
 * src_data   data to be packed and sent
 * dst_data   destination the received data is unpacked into
 * request    pointer to an Xt_request supplied by the caller
 */
static void
xt_exchanger_irecv_isend_ddt_packed_a_exchange(
  Xt_exchanger exchanger,
  const void *src_data,
  void *dst_data,
  Xt_request *request);
/* Set up the vtable of the DDT-packed irecv/isend exchanger: start from a
 * copy of the simple base exchanger's vtable and replace the two exchange
 * methods with the implementations from this file. */
void
xt_exchanger_irecv_isend_ddt_packed_init(void)
{
  /* index of the single vtable variant held in the array */
  enum { vtab_slot = 0 };

  /* inherit every method from the simple base exchanger first ... */
  xt_exchanger_irecv_isend_ddt_packed_vtable[vtab_slot]
    = xt_exchanger_simple_base_vtable;
  /* ... then override the asynchronous and synchronous exchange */
  xt_exchanger_irecv_isend_ddt_packed_vtable[vtab_slot].a_exchange
    = xt_exchanger_irecv_isend_ddt_packed_a_exchange;
  xt_exchanger_irecv_isend_ddt_packed_vtable[vtab_slot].s_exchange
    = xt_exchanger_irecv_isend_ddt_packed_s_exchange;
}
static void
xt_exchanger_irecv_isend_ddt_packed_s_exchange(
Xt_exchanger exchanger,
const void *src_data, void *dst_data)
{
XT_GPU_INSTR_PUSH(xt_exchanger_irecv_isend_ddt_packed_s_exchange);
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
enum { AUTO_ALLOC_SIZE = 32, };
MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
size_t *buffer_sizes, buffer_sizes_auto[AUTO_ALLOC_SIZE];
......@@ -92,6 +124,8 @@ xt_exchanger_irecv_isend_ddt_packed_s_exchange(
size_t recv_buffer_size = 0;
size_t send_buffer_size = 0;
const struct Xt_msg_param *send_msgs = base->msgs,
*recv_msgs = base->msgs + nsend;
for (int i = 0; i < nrecv; ++i)
recv_buffer_size +=
((buffer_sizes[i] =
......@@ -115,10 +149,10 @@ xt_exchanger_irecv_isend_ddt_packed_s_exchange(
size_t ofs = 0;
for (int i = 0; i < nrecv; ++i) {
int recv_size = (int)buffer_sizes[i];
xt_mpi_call(MPI_Irecv(recv_buffer + ofs, recv_size, MPI_BYTE,
recv_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm,
requests+i), comm);
xt_mpi_call(MPI_Irecv(
recv_buffer + ofs, recv_size, MPI_BYTE, recv_msgs[i].rank,
base->tag_offset + xt_mpi_tag_exchange_msg, base->comm,
requests+i), base->comm);
ofs += (size_t)recv_size;
}
......@@ -126,24 +160,24 @@ xt_exchanger_irecv_isend_ddt_packed_s_exchange(
for (int i = 0; i < nsend; ++i) {
size_t send_size = buffer_sizes[nrecv+i];
MPI_Aint extent, lb;
xt_mpi_call(MPI_Type_get_extent(send_msgs[i].datatype, &lb, &extent), comm);
xt_mpi_call(MPI_Type_get_extent(send_msgs[i].datatype, &lb, &extent), base->comm);
xt_ddt_pack_internal(
CAST_MPI_SEND_BUF(src_data), (size_t)send_msgs[i].count, ddts[nrecv+i],
send_buffer + ofs, extent, src_data_memtype);
xt_mpi_call(MPI_Isend(send_buffer + ofs, (int)send_size, MPI_BYTE,
send_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm,
requests+nrecv+i), comm);
base->tag_offset + xt_mpi_tag_exchange_msg, base->comm,
requests+nrecv+i), base->comm);
ofs += send_size;
}
xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), comm);
xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), base->comm);
ofs = 0;
for (int i = 0; i < nrecv; ++i) {
size_t recv_size = buffer_sizes[i];
MPI_Aint extent, lb;
xt_mpi_call(MPI_Type_get_extent(recv_msgs[i].datatype, &lb, &extent), comm);
xt_mpi_call(MPI_Type_get_extent(recv_msgs[i].datatype, &lb, &extent), base->comm);
xt_ddt_unpack_internal(
recv_buffer + ofs, (size_t)recv_msgs[i].count, ddts[i],
(unsigned char *)dst_data + recv_msgs[i].displacement,
......@@ -162,14 +196,16 @@ xt_exchanger_irecv_isend_ddt_packed_s_exchange(
}
static void
xt_exchanger_irecv_isend_ddt_packed_a_exchange(const void *src_data, void *dst_data,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm,
Xt_request *request) {
xt_exchanger_irecv_isend_ddt_packed_a_exchange(
Xt_exchanger exchanger,
const void *src_data,
void *dst_data,
Xt_request *request)
{
XT_GPU_INSTR_PUSH(xt_exchanger_irecv_isend_ddt_packed_a_exchange);
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
/* todo: manually inline xt_request_msgs_ddt_packed_new to save
* redundant allocations */
MPI_Request * tmp_requests =
......@@ -183,18 +219,21 @@ xt_exchanger_irecv_isend_ddt_packed_a_exchange(const void *src_data, void *dst_d
enum xt_memtype src_data_memtype = xt_gpu_get_memtype(src_data);
enum xt_memtype dst_data_memtype = xt_gpu_get_memtype(dst_data);
Xt_ddt * recv_ddts = xmalloc((size_t)nrecv * sizeof(*recv_ddts));
Xt_ddt *recv_ddts = xmalloc((size_t)nrecv * sizeof(*recv_ddts));
const struct Xt_msg_param *recv_msgs = base->msgs + nsend;
for (int i = 0; i < nrecv; ++i) {
recv_ddts[i] = xt_ddt_from_mpi_ddt(recv_msgs[i].datatype);
size_t buffer_size
= xt_ddt_get_pack_size_internal((size_t)recv_msgs[i].count, recv_ddts[i]);
buffers[i] = xt_gpu_malloc(buffer_size, dst_data_memtype);
xt_mpi_call(MPI_Irecv(buffers[i], (int)buffer_size, MPI_BYTE,
recv_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm,
tmp_requests+i), comm);
xt_mpi_call(MPI_Irecv(
buffers[i], (int)buffer_size, MPI_BYTE,
recv_msgs[i].rank,
base->tag_offset + xt_mpi_tag_exchange_msg, base->comm,
tmp_requests+i), base->comm);
}
const struct Xt_msg_param *send_msgs = base->msgs;
for (int i = 0; i < nsend; ++i) {
Xt_ddt send_ddt = xt_ddt_from_mpi_ddt(send_msgs[i].datatype);
size_t buffer_size
......@@ -203,25 +242,27 @@ xt_exchanger_irecv_isend_ddt_packed_a_exchange(const void *src_data, void *dst_d
//! \todo merge all packing kernels into single kernel call -> less overhead,
//! but not overlapping of packing and sending
MPI_Aint extent, lb;
xt_mpi_call(MPI_Type_get_extent(send_msgs[i].datatype, &lb, &extent), comm);
xt_mpi_call(MPI_Type_get_extent(send_msgs[i].datatype, &lb, &extent), base->comm);
xt_ddt_pack_internal(
src_data, (size_t)send_msgs[i].count, send_ddt, buffers[nrecv + i],
extent, src_data_memtype);
xt_mpi_call(MPI_Isend(buffers[nrecv + i], (int)buffer_size, MPI_BYTE,
send_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm,
tmp_requests+nrecv+i), comm);
xt_mpi_call(MPI_Isend(
buffers[nrecv + i], (int)buffer_size, MPI_BYTE,
send_msgs[i].rank,
base->tag_offset + xt_mpi_tag_exchange_msg, base->comm,
tmp_requests+nrecv+i), base->comm);
}
for (int i = 0; i < nrecv; ++i) {
MPI_Aint lb;
xt_mpi_call(MPI_Type_get_extent(recv_msgs[i].datatype, &lb, recv_extents+i), comm);
xt_mpi_call(MPI_Type_get_extent(recv_msgs[i].datatype, &lb, recv_extents+i),
base->comm);
recv_displacements[i] = recv_msgs[i].displacement;
recv_counts[i] = recv_msgs[i].count;
}
Xt_request requests =
xt_request_msgs_ddt_packed_new(
nrecv + nsend, tmp_requests, comm, nrecv, nsend,
nrecv + nsend, tmp_requests, base->comm, nrecv, nsend,
recv_ddts, recv_extents, recv_counts, buffers, buffers + nrecv, dst_data,
src_data_memtype, dst_data_memtype);
......@@ -247,14 +288,19 @@ xt_exchanger_irecv_isend_ddt_packed_new(int nsend, int nrecv,
* be used on @a comm by any other part of the program during the
* lifetime of the created exchanger object
*/
return
xt_exchanger_simple_base_new(nsend, nrecv, send_msgs, recv_msgs,
comm, tag_offset,
xt_exchanger_irecv_isend_ddt_packed_s_exchange,
xt_exchanger_irecv_isend_ddt_packed_a_exchange,
(xt_simple_create_thread_share_func)0,
(xt_simple_destroy_thread_share_func)0,
config);
Xt_exchanger_simple_base exchanger =
xt_exchanger_simple_base_alloc((size_t)nsend+(size_t)nrecv, 0);
xt_exchanger_simple_base_init(
exchanger, nsend, nrecv, send_msgs, recv_msgs,
comm, tag_offset,
(xt_simple_s_exchange_func)0,
(xt_simple_a_exchange_func)0,
(xt_simple_create_thread_share_func)0,
(xt_simple_destroy_thread_share_func)0,
config);
exchanger->vtable
= xt_exchanger_irecv_isend_ddt_packed_vtable;
return (Xt_exchanger)exchanger;
}
/*
......
......@@ -54,7 +54,6 @@
#include "xt/xt_core.h"
#include "xt/xt_config.h"
#include "xt_exchanger.h"
#include "xt_redist_internal.h"
/**
* Constructor for an exchanger using asynchronous send and recv, the data is
......
/**
* @file xt_exchanger_irecv_isend_ddt_packed_internal.h
*
* @copyright Copyright (C) 2022 Jörg Behrens <behrens@dkrz.de>
* Moritz Hanke <hanke@dkrz.de>
* Thomas Jahns <jahns@dkrz.de>
*
* @author Jörg Behrens <behrens@dkrz.de>
* Moritz Hanke <hanke@dkrz.de>
* Thomas Jahns <jahns@dkrz.de>
*/
/*
* Keywords:
* Maintainer: Jörg Behrens <behrens@dkrz.de>
* Moritz Hanke <hanke@dkrz.de>
* Thomas Jahns <jahns@dkrz.de>
* URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* Neither the name of the DKRZ GmbH nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef XT_EXCHANGER_IRECV_ISEND_DDT_PACKED_INTERNAL_H
#define XT_EXCHANGER_IRECV_ISEND_DDT_PACKED_INTERNAL_H
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "core/ppm_visibility.h"
#include "xt/xt_core.h"
#include "xt/xt_config.h"
#include "xt_exchanger.h"
/**
 * Method table(s) of the irecv/isend exchanger that packs its messages
 * with Xt_ddt. Entry 0 is populated by
 * xt_exchanger_irecv_isend_ddt_packed_init() and is the table installed in
 * exchangers created by xt_exchanger_irecv_isend_ddt_packed_new().
 */
PPM_DSO_INTERNAL extern struct xt_exchanger_vtable
xt_exchanger_irecv_isend_ddt_packed_vtable[];
/**
 * Fills in xt_exchanger_irecv_isend_ddt_packed_vtable: entry 0 becomes a
 * copy of xt_exchanger_simple_base_vtable whose s_exchange and a_exchange
 * members are replaced by the DDT-packed implementations.
 *
 * Called from xt_initialize() when XT_ENABLE_DDT_EXCHANGER is defined.
 * NOTE(review): the table has no static initializer, so this presumably
 * has to run before any exchanger using it performs an exchange -- confirm.
 */
PPM_DSO_INTERNAL void
xt_exchanger_irecv_isend_ddt_packed_init(void);
#endif // XT_EXCHANGER_IRECV_ISEND_DDT_PACKED_INTERNAL_H
/*
* Local Variables:
* c-basic-offset: 2
* coding: utf-8
* indent-tabs-mode: nil
* show-trailing-whitespace: t
* require-trailing-newline: t
* End:
*/
......@@ -58,6 +58,9 @@
#endif
#include "xt_exchanger_irecv_isend_packed.h"
#include "xt_exchanger_irecv_isend_ddt_packed.h"
#if XT_ENABLE_DDT_EXCHANGER
# include "xt_exchanger_irecv_isend_ddt_packed_internal.h"
#endif
#include "xt_exchanger_irecv_isend.h"
#include "xt_exchanger_irecv_isend_internal.h"
#include "xt_exchanger_irecv_send.h"
......@@ -77,7 +80,7 @@ xt_exchanger_new_get_vtable(Xt_exchanger_new exchanger_new)
vtab = &xt_exchanger_simple_base_vtable;
#if XT_ENABLE_DDT_EXCHANGER
else if (exchanger_new == xt_exchanger_irecv_isend_ddt_packed_new)
vtab = &xt_exchanger_simple_base_vtable;
vtab = &xt_exchanger_irecv_isend_ddt_pack_vtable;
#endif
#if XT_CAN_USE_MPI_NEIGHBOR_ALLTOALL
else if (exchanger_new == xt_exchanger_neigh_alltoall_new)
......
......@@ -59,6 +59,7 @@
#include "xt_exchanger.h"
#include "xt_exchanger_irecv_isend_internal.h"
#include "xt_exchanger_irecv_send_internal.h"
#include "xt_exchanger_irecv_isend_ddt_packed_internal.h"
#include "xt_mpi_internal.h"
#include "instr.h"
#include "xt_gpu.h"
......@@ -88,6 +89,7 @@ xt_initialize(MPI_Comm default_comm)
#endif
#ifdef XT_ENABLE_DDT_EXCHANGER
xt_gpu_init();
xt_exchanger_irecv_isend_ddt_packed_init();
#endif
xt_lib_state = xt_lib_initialized;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment