Skip to content
Snippets Groups Projects
Commit ffe4cde3 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

exchangers: Use inheritance in xt_exchanger_irecv_isend_packed.

parent 5cce6dcf
No related branches found
No related tags found
1 merge request!31Draft: Attempt at fuller exchanger interface
......@@ -190,6 +190,7 @@ libyaxt_c_la_SOURCES = \
xt_exchanger_irecv_isend_internal.h \
xt_exchanger_irecv_isend_packed.c \
xt_exchanger_irecv_isend_packed.h \
xt_exchanger_irecv_isend_packed_internal.h \
xt_exchanger_neigh_alltoall.h \
xt_exchanger_mix_isend_irecv.c \
xt_exchanger_mix_isend_irecv.h
......
......@@ -54,6 +54,7 @@
#include "xt/xt_core.h"
#include "xt/xt_config.h"
#include "xt_exchanger.h"
#include "xt_exchanger_internal.h"
PPM_DSO_INTERNAL extern struct xt_exchanger_vtable
xt_exchanger_irecv_isend_ddt_packed_vtable[];
......
......@@ -61,7 +61,82 @@
#include "xt_redist_internal.h"
#include "xt_exchanger_internal.h"
#include "xt_exchanger_irecv_isend_packed.h"
#include "xt_exchanger_irecv_isend_packed_internal.h"
#include "xt_exchanger_simple_base.h"
#include "xt_exchanger_simple_base_internal.h"
enum {
#ifdef _OPENMP
nvtab = 2,
#else
nvtab = 1,
#endif
};
struct xt_exchanger_vtable
xt_exchanger_irecv_isend_packed_vtable[nvtab];
static void
xt_exchanger_irecv_isend_packed_s_exchange(
Xt_exchanger exchanger,
const void *src_data, void *dst_data);
static void
xt_exchanger_irecv_isend_packed_a_exchange(
Xt_exchanger exchanger,
const void *src_data,
void *dst_data,
Xt_request *request);
#ifdef _OPENMP
static void
xt_exchanger_irecv_isend_packed_s_exchange_omp(
Xt_exchanger exchanger,
const void *src_data, void *dst_data);
static void
xt_exchanger_irecv_isend_packed_a_exchange_omp(
Xt_exchanger exchanger,
const void *src_data,
void *dst_data,
Xt_request *request);
#endif
static Xt_exchanger_thread_share
xt_exchanger_irecv_isend_packed_create_thread_share(
Xt_exchanger exchanger, Xt_config config);
static void
xt_exchanger_irecv_isend_packed_destroy_thread_share(
Xt_exchanger exchanger, Xt_exchanger_thread_share thread_share);
void
xt_exchanger_irecv_isend_packed_init(void)
{
xt_exchanger_irecv_isend_packed_vtable[0]
= xt_exchanger_simple_base_vtable;
xt_exchanger_irecv_isend_packed_vtable[0].s_exchange
= xt_exchanger_irecv_isend_packed_s_exchange;
xt_exchanger_irecv_isend_packed_vtable[0].a_exchange
= xt_exchanger_irecv_isend_packed_a_exchange;
xt_exchanger_irecv_isend_packed_vtable[0].create_thread_share
= xt_exchanger_irecv_isend_packed_create_thread_share;
xt_exchanger_irecv_isend_packed_vtable[0].destroy_thread_share
= xt_exchanger_irecv_isend_packed_destroy_thread_share;
#ifdef _OPENMP
xt_exchanger_irecv_isend_packed_vtable[1]
= xt_exchanger_simple_base_vtable;
xt_exchanger_irecv_isend_packed_vtable[1].s_exchange
= xt_exchanger_irecv_isend_packed_s_exchange_omp;
xt_exchanger_irecv_isend_packed_vtable[1].a_exchange
= xt_exchanger_irecv_isend_packed_a_exchange_omp;
xt_exchanger_irecv_isend_packed_vtable[1].create_thread_share
= xt_exchanger_irecv_isend_packed_create_thread_share;
xt_exchanger_irecv_isend_packed_vtable[1].destroy_thread_share
= xt_exchanger_irecv_isend_packed_destroy_thread_share;
#endif
}
/* unfortunately GCC 11 to 13 cannot handle the literal constants used for
* MPI_STATUSES_IGNORE by MPICH */
......@@ -156,11 +231,13 @@ enum { AUTO_ALLOC_SIZE = 32, };
static void
xt_exchanger_irecv_isend_packed_s_exchange(
const void *src_data, void *dst_data,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs, const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm)
Xt_exchanger exchanger,
const void *src_data, void *dst_data)
{
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
......@@ -173,24 +250,28 @@ xt_exchanger_irecv_isend_packed_s_exchange(
+ (num_tx+1) * sizeof (*buf_ofs));
buf_ofs = (void *)(requests + (num_tx+(num_tx&1)));
}
const struct Xt_msg_param *send_msgs = base->msgs,
*recv_msgs = base->msgs + nsend;
size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
send_msgs, recv_msgs, comm);
send_msgs, recv_msgs, base->comm);
unsigned char *buffer = xmalloc(buffer_size);
start_packed_transfer(buffer, nrecv, buf_ofs,
src_data, nsend, nrecv,
send_msgs, recv_msgs,
comm, tag_offset,
base->comm, base->tag_offset,
requests);
xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE), comm);
xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE),
base->comm);
for (int i = 0; i < nrecv; ++i) {
int position = 0, recv_size = (int)(buf_ofs[i+1]-buf_ofs[i]);
xt_mpi_call(MPI_Unpack(buffer + buf_ofs[i], recv_size, &position,
(unsigned char *)dst_data + recv_msgs[i].displacement,
recv_msgs[i].count, recv_msgs[i].datatype, comm), comm);
recv_msgs[i].count, recv_msgs[i].datatype,
base->comm), base->comm);
}
free(buffer);
......@@ -201,11 +282,13 @@ xt_exchanger_irecv_isend_packed_s_exchange(
#ifdef _OPENMP
static void
xt_exchanger_irecv_isend_packed_s_exchange_omp(
const void *src_data, void *dst_data,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs, const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm)
Xt_exchanger exchanger,
const void *src_data, void *dst_data)
{
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
MPI_Request *requests, requests_auto[AUTO_ALLOC_SIZE];
size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
......@@ -214,12 +297,16 @@ xt_exchanger_irecv_isend_packed_s_exchange_omp(
requests = requests_auto;
buf_ofs = buf_ofs_auto;
} else {
requests = xmalloc((num_tx+(num_tx&1)) * sizeof (*requests) + (num_tx+1) * sizeof (*buf_ofs));
requests = xmalloc((num_tx+(num_tx&1))
* sizeof (*requests) + (num_tx+1) * sizeof (*buf_ofs));
buf_ofs = (size_t *)(requests + (num_tx+(num_tx&1)));
}
const struct Xt_msg_param *send_msgs = base->msgs,
*recv_msgs = base->msgs + nsend;
size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
send_msgs, recv_msgs, comm);
send_msgs, recv_msgs, base->comm);
unsigned char *buffer = xmalloc(buffer_size);
#pragma omp parallel
......@@ -237,17 +324,19 @@ xt_exchanger_irecv_isend_packed_s_exchange_omp(
buf_ofs+start_recv,
src_data, nsend_, nrecv_,
send_msgs+start_send, recv_msgs+start_recv,
comm, tag_offset,
base->comm, base->tag_offset,
requests+start_req);
xt_mpi_call(MPI_Waitall(nreq, requests+start_req, MPI_STATUSES_IGNORE),
comm);
base->comm);
for (int i = start_recv; i < end_recv; ++i) {
int position = 0, recv_size = (int)(buf_ofs[i+1]-buf_ofs[i]);
xt_mpi_call(MPI_Unpack(buffer + buf_ofs[i], recv_size, &position,
(unsigned char *)dst_data + recv_msgs[i].displacement,
recv_msgs[i].count, recv_msgs[i].datatype, comm),
comm);
(unsigned char *)dst_data
+ recv_msgs[i].displacement,
recv_msgs[i].count, recv_msgs[i].datatype,
base->comm),
base->comm);
}
}
......@@ -373,10 +462,13 @@ finalize_packed_a_exchange(Xt_request request, void *buf)
static void
xt_exchanger_irecv_isend_packed_a_exchange(
const void *src_data, void *dst_data, int nsend, int nrecv,
const struct Xt_msg_param *send_msgs, const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm, Xt_request *request)
Xt_exchanger exchanger,
const void *src_data, void *dst_data, Xt_request *request)
{
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
size_t num_tx = (size_t)nrecv + (size_t)nsend;
......@@ -385,13 +477,15 @@ xt_exchanger_irecv_isend_packed_a_exchange(
else
buf_ofs = xmalloc((num_tx+1) * sizeof (*buf_ofs));
const struct Xt_msg_param *send_msgs = base->msgs,
*recv_msgs = base->msgs + nsend;
size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
send_msgs, recv_msgs, comm);
send_msgs, recv_msgs, base->comm);
size_t inventory_size = get_inventory_size((size_t)nrecv);
struct Xt_config_ conf = xt_default_config;
xt_config_set_redist_mthread_mode(&conf, XT_MT_NONE);
Xt_request requests
= xt_request_msgs_ebuf_alloc(nrecv + nsend, comm,
= xt_request_msgs_ebuf_alloc(nrecv + nsend, base->comm,
inventory_size + buffer_size, &conf);
xt_request_msgs_ebuf_set_finalizer(requests, finalize_packed_a_exchange);
......@@ -406,7 +500,7 @@ xt_exchanger_irecv_isend_packed_a_exchange(
nrecv, buf_ofs,
src_data, nsend, nrecv,
send_msgs, recv_msgs,
comm, tag_offset,
base->comm, base->tag_offset,
tmp_requests);
size_t *buf_ofs_ = header->buf_ofs;
......@@ -427,7 +521,7 @@ xt_exchanger_irecv_isend_packed_a_exchange(
}
MPI_Datatype *datatypes = get_inventory_datatypes(header);
for (int i = 0; i < nrecv; ++i)
xt_mpi_call(MPI_Type_dup(recv_msgs[i].datatype, datatypes+i), comm);
xt_mpi_call(MPI_Type_dup(recv_msgs[i].datatype, datatypes+i), base->comm);
*request = requests;
if (num_tx > AUTO_ALLOC_SIZE)
......@@ -462,15 +556,14 @@ finalize_packed_a_exchange_mt(Xt_request request, void *buf)
static void
xt_exchanger_irecv_isend_packed_a_exchange_omp(
Xt_exchanger exchanger,
const void *src_data,
void *dst_data,
int nsend,
int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm,
Xt_request *request)
{
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
size_t *buf_ofs, buf_ofs_auto[AUTO_ALLOC_SIZE+1];
size_t num_tx = (size_t)nrecv + (size_t)nsend;
......@@ -479,13 +572,15 @@ xt_exchanger_irecv_isend_packed_a_exchange_omp(
else
buf_ofs = xmalloc((num_tx+1) * sizeof (*buf_ofs));
size_t buffer_size = get_buffer_offsets(buf_ofs, nsend, nrecv,
send_msgs, recv_msgs, comm);
const struct Xt_msg_param *send_msgs = base->msgs,
*recv_msgs = base->msgs + nsend;
size_t buffer_size = get_buffer_offsets(
buf_ofs, nsend, nrecv, send_msgs, recv_msgs, base->comm);
size_t inventory_size = get_inventory_size((size_t)nrecv);
struct Xt_config_ conf = xt_default_config;
xt_config_set_redist_mthread_mode(&conf, XT_MT_OPENMP);
Xt_request requests
= xt_request_msgs_ebuf_alloc(nrecv + nsend, comm,
= xt_request_msgs_ebuf_alloc(nrecv + nsend, base->comm,
inventory_size + buffer_size, &conf);
xt_request_msgs_ebuf_set_finalizer(
requests, finalize_packed_a_exchange_mt);
......@@ -512,7 +607,7 @@ xt_exchanger_irecv_isend_packed_a_exchange_omp(
buf_ofs+start_recv,
src_data, nsend_, nrecv_,
send_msgs+start_send, recv_msgs+start_recv,
comm, tag_offset,
base->comm, base->tag_offset,
prequests+start_req);
size_t *buf_ofs_ = header->buf_ofs;
......@@ -534,7 +629,8 @@ xt_exchanger_irecv_isend_packed_a_exchange_omp(
}
MPI_Datatype *datatypes = get_inventory_datatypes_(header, (size_t)nrecv);
for (int i = start_recv; i < end_recv; ++i)
xt_mpi_call(MPI_Type_dup(recv_msgs[i].datatype, datatypes+i), comm);
xt_mpi_call(MPI_Type_dup(recv_msgs[i].datatype, datatypes+i),
base->comm);
}
*request = requests;
......@@ -545,16 +641,18 @@ xt_exchanger_irecv_isend_packed_a_exchange_omp(
static Xt_exchanger_thread_share
xt_exchanger_irecv_isend_packed_create_thread_share(
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
MPI_Comm comm, Xt_config config)
Xt_exchanger exchanger, Xt_config config)
{
Xt_exchanger_simple_base base = (Xt_exchanger_simple_base)exchanger;
int nsend = base->nmsg[xt_direction_send],
nrecv = base->nmsg[xt_direction_recv];
const struct Xt_msg_param *send_msgs = base->msgs,
*recv_msgs = base->msgs + nsend;
size_t buf_size
= get_buffer_size(nsend, nrecv, send_msgs, recv_msgs, comm);
= get_buffer_size(nsend, nrecv, send_msgs, recv_msgs, base->comm);
size_t inventory_size = get_inventory_size((size_t)nrecv);
Xt_request shared_req
= xt_request_msgs_ebuf_alloc(nsend+nrecv, comm,
= xt_request_msgs_ebuf_alloc(nsend+nrecv, base->comm,
inventory_size + buf_size, config);
return (Xt_exchanger_thread_share)shared_req;
}
......@@ -579,33 +677,21 @@ xt_exchanger_irecv_isend_packed_new(int nsend, int nrecv,
* be used on @a comm by any other part of the program during the
* lifetime of the created exchanger object
*/
static const xt_simple_s_exchange_func
s_exch_by_mthread_mode[] = {
xt_exchanger_irecv_isend_packed_s_exchange,
#ifdef _OPENMP
xt_exchanger_irecv_isend_packed_s_exchange_omp,
#else
(xt_simple_s_exchange_func)0,
#endif
};
static const xt_simple_a_exchange_func
a_exch_by_mthread_mode[] = {
xt_exchanger_irecv_isend_packed_a_exchange,
#ifdef _OPENMP
xt_exchanger_irecv_isend_packed_a_exchange_omp,
#else
(xt_simple_a_exchange_func)0,
#endif
};
int mthread_mode = xt_config_get_redist_mthread_mode(config);
return xt_exchanger_simple_base_new(
Xt_exchanger_simple_base exchanger =
xt_exchanger_simple_base_alloc((size_t)nsend+(size_t)nrecv, 0);
xt_exchanger_simple_base_init(
exchanger,
nsend, nrecv, send_msgs, recv_msgs,
comm, tag_offset,
s_exch_by_mthread_mode[mthread_mode],
a_exch_by_mthread_mode[mthread_mode],
xt_exchanger_irecv_isend_packed_create_thread_share,
xt_exchanger_irecv_isend_packed_destroy_thread_share,
(xt_simple_s_exchange_func)0,
(xt_simple_a_exchange_func)0,
(xt_simple_create_thread_share_func)0,
(xt_simple_destroy_thread_share_func)0,
config);
int mthread_mode = xt_config_get_redist_mthread_mode(config);
exchanger->vtable
= xt_exchanger_irecv_isend_packed_vtable + mthread_mode;
return (Xt_exchanger)exchanger;
}
/*
......
/**
* @file xt_exchanger_irecv_isend_packed_internal.h
*
* @copyright Copyright (C) 2022 Jörg Behrens <behrens@dkrz.de>
* Moritz Hanke <hanke@dkrz.de>
* Thomas Jahns <jahns@dkrz.de>
*
* @author Jörg Behrens <behrens@dkrz.de>
* Moritz Hanke <hanke@dkrz.de>
* Thomas Jahns <jahns@dkrz.de>
*/
/*
* Keywords:
* Maintainer: Jörg Behrens <behrens@dkrz.de>
* Moritz Hanke <hanke@dkrz.de>
* Thomas Jahns <jahns@dkrz.de>
* URL: https://dkrz-sw.gitlab-pages.dkrz.de/yaxt/
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* Neither the name of the DKRZ GmbH nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef XT_EXCHANGER_IRECV_ISEND_PACKED_INTERNAL_H
#define XT_EXCHANGER_IRECV_ISEND_PACKED_INTERNAL_H
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "core/ppm_visibility.h"
#include "xt/xt_core.h"
#include "xt/xt_config.h"
#include "xt_exchanger.h"
#include "xt_exchanger_internal.h"
PPM_DSO_INTERNAL extern struct xt_exchanger_vtable
xt_exchanger_irecv_isend_packed_vtable[];
PPM_DSO_INTERNAL void
xt_exchanger_irecv_isend_packed_init(void);
#endif // XT_EXCHANGER_IRECV_ISEND_PACKED_INTERNAL_H
/*
* Local Variables:
* c-basic-offset: 2
* coding: utf-8
* indent-tabs-mode: nil
* show-trailing-whitespace: t
* require-trailing-newline: t
* End:
*/
......@@ -57,6 +57,7 @@
# include "xt_exchanger_neigh_alltoall.h"
#endif
#include "xt_exchanger_irecv_isend_packed.h"
#include "xt_exchanger_irecv_isend_packed_internal.h"
#include "xt_exchanger_irecv_isend_ddt_packed.h"
#if XT_ENABLE_DDT_EXCHANGER
# include "xt_exchanger_irecv_isend_ddt_packed_internal.h"
......@@ -77,10 +78,10 @@ xt_exchanger_new_get_vtable(Xt_exchanger_new exchanger_new)
else if (exchanger_new == xt_exchanger_irecv_send_new)
vtab = xt_exchanger_irecv_send_vtable;
else if (exchanger_new == xt_exchanger_irecv_isend_packed_new)
vtab = &xt_exchanger_simple_base_vtable;
vtab = xt_exchanger_irecv_isend_packed_vtable;
#if XT_ENABLE_DDT_EXCHANGER
else if (exchanger_new == xt_exchanger_irecv_isend_ddt_packed_new)
vtab = &xt_exchanger_irecv_isend_ddt_pack_vtable;
vtab = xt_exchanger_irecv_isend_ddt_packed_vtable;
#endif
#if XT_CAN_USE_MPI_NEIGHBOR_ALLTOALL
else if (exchanger_new == xt_exchanger_neigh_alltoall_new)
......
......@@ -58,6 +58,7 @@
#include "xt_idxempty_internal.h"
#include "xt_exchanger.h"
#include "xt_exchanger_irecv_isend_internal.h"
#include "xt_exchanger_irecv_isend_packed_internal.h"
#include "xt_exchanger_irecv_send_internal.h"
#include "xt_exchanger_irecv_isend_ddt_packed_internal.h"
#include "xt_mpi_internal.h"
......@@ -77,6 +78,7 @@ xt_initialize(MPI_Comm default_comm)
xt_config_defaults_init();
xt_exchanger_irecv_isend_init();
xt_exchanger_irecv_send_init();
xt_exchanger_irecv_isend_packed_init();
xt_idxempty_init();
xt_idxstripes_initialize();
xt_idxsection_initialize();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment