Commit 7c702573 authored by Thomas Jahns

Add sharing of data for neighbor exchanger.

parent 864e3403
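
The commit lets all neighbor-alltoall exchangers that belong to the same team reference one block of identical setup data instead of each holding a private copy. A minimal conceptual sketch of that split, using simplified standalone types that mirror the fields introduced in the diff below (not the exact yaxt declarations):

/* Conceptual sketch only; the real declarations appear in the diff below.
 * Data that is identical for every exchanger of a team is kept once and
 * shared; each exchanger instance keeps only its per-message datatypes. */
#include <mpi.h>

struct team_share_sketch {
  int      *ranks;       /* send ranks followed by receive ranks          */
  int      *one_counts;  /* all ones, reused for sends and receives       */
  MPI_Aint *displs;      /* all zeros, reused for sends and receives      */
  MPI_Comm  nb_comm;     /* distributed graph (neighborhood) communicator */
  int       nmsg[2];     /* number of send and receive messages           */
};

struct exchanger_sketch {
  MPI_Datatype *datatypes;          /* per instance, one per message      */
  struct team_share_sketch *share;  /* shared block, or a private copy    */
};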
@@ -66,6 +66,7 @@
struct Xt_config_ xt_default_config = {
.exchanger_new = xt_exchanger_mix_isend_irecv_new,
.exchanger_team_share = NULL,
.idxv_cnv_size = CHEAP_VECTOR_SIZE,
.flags = 0,
};
@@ -67,6 +67,10 @@ struct Xt_config_ {
* constructor to use when creating the exchanger of a redist
*/
Xt_exchanger_new exchanger_new;
/**
* pointer to exchanger team share data
*/
void *exchanger_team_share;
/**
* automatically compress index lists of vector type at this size
* into another representation to save on computation/memory overall
@@ -89,6 +89,33 @@ xt_exchanger_neigh_alltoall_get_MPI_Datatype(Xt_exchanger exchanger,
enum xt_msg_direction direction,
bool do_dup);
struct xt_exchanger_neigh_alltoall_team_share
{
int *ranks;
int *one_counts;
MPI_Aint *displs;
MPI_Comm nb_comm;
int nmsg[2];
};
static void xt_exchanger_neigh_alltoall_team_share_default_init(void *share)
{
struct xt_exchanger_neigh_alltoall_team_share *team_share = share;
team_share->ranks = NULL;
team_share->one_counts = NULL;
team_share->displs = NULL;
team_share->nb_comm = MPI_COMM_NULL;
}
static void xt_exchanger_neigh_alltoall_team_share_destroy(void *share)
{
struct xt_exchanger_neigh_alltoall_team_share *team_share = share;
if (team_share->nb_comm != MPI_COMM_NULL)
xt_mpi_call(MPI_Comm_free(&team_share->nb_comm), Xt_default_comm);
free(team_share->one_counts);
free(team_share->displs);
free(team_share->ranks);
}
const struct xt_exchanger_vtable xt_exchanger_neigh_alltoall_vtable = {
.copy = xt_exchanger_neigh_alltoall_copy,
@@ -97,6 +124,10 @@ const struct xt_exchanger_vtable xt_exchanger_neigh_alltoall_vtable = {
.a_exchange = xt_exchanger_neigh_alltoall_a_exchange,
.get_msg_ranks = xt_exchanger_neigh_alltoall_get_msg_ranks,
.get_MPI_Datatype = xt_exchanger_neigh_alltoall_get_MPI_Datatype,
.team_share_size = sizeof (struct xt_exchanger_neigh_alltoall_team_share),
.team_share_default_init
= xt_exchanger_neigh_alltoall_team_share_default_init,
.team_share_destroy = xt_exchanger_neigh_alltoall_team_share_destroy,
};
typedef struct Xt_exchanger_neigh_alltoall_ * Xt_exchanger_neigh_alltoall;
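
The three new vtable entries describe the shared block to code that does not know its layout: how large it is, how to put a freshly allocated block into its default ("not yet set up") state, and how to release its resources. A hedged sketch of how a team-management layer might drive these hooks; the actual call sites are outside this commit, and team_share_create/team_share_delete are hypothetical helper names:

/* Hypothetical helpers, not part of this commit; they only illustrate how
 * the three new vtable fields fit together.  Assumes the internal
 * xt_exchanger vtable declaration is visible. */
#include <stdlib.h>

void *team_share_create(const struct xt_exchanger_vtable *vtab)
{
  void *share = malloc(vtab->team_share_size);  /* opaque storage */
  vtab->team_share_default_init(share);         /* mark as "not yet set up" */
  return share;  /* would be handed to exchangers via config->exchanger_team_share */
}

void team_share_delete(const struct xt_exchanger_vtable *vtab, void *share)
{
  vtab->team_share_destroy(share);  /* frees ranks, counts, displs, nb_comm */
  free(share);
}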
@@ -105,46 +136,48 @@ struct Xt_exchanger_neigh_alltoall_ {
const struct xt_exchanger_vtable * vtable;
int nmsg[2];
int tag_offset;
MPI_Comm comm;
int * ranks;
int * one_counts;
MPI_Aint * displs;
MPI_Datatype * datatypes;
struct xt_exchanger_neigh_alltoall_team_share *team_share,
team_share_[];
};
static Xt_exchanger_neigh_alltoall
xt_exchanger_neigh_alltoall_alloc(size_t nsend, size_t nrecv)
xt_exchanger_neigh_alltoall_alloc(size_t nsend, size_t nrecv,
void *exchanger_team_share)
{
size_t nmsg = nsend + nrecv;
size_t max_msgs = MAX(nsend, nrecv);
Xt_exchanger_neigh_alltoall exchanger = xmalloc(1 * sizeof(*exchanger));
exchanger->ranks = xmalloc(nmsg * sizeof(*(exchanger->ranks)));
exchanger->datatypes = xmalloc(nmsg * sizeof(*(exchanger->datatypes)));
exchanger->one_counts = xmalloc(max_msgs * sizeof(*(exchanger->one_counts)));
exchanger->displs = xmalloc(max_msgs * sizeof(*(exchanger->displs)));
bool need_team_share_alloc = exchanger_team_share == NULL;
Xt_exchanger_neigh_alltoall exchanger
= xmalloc(sizeof(*exchanger)
+ (need_team_share_alloc ? sizeof (*exchanger->team_share) : 0));
exchanger->datatypes = xmalloc(nmsg * sizeof(*exchanger->datatypes));
exchanger->vtable = &xt_exchanger_neigh_alltoall_vtable;
for (size_t i = 0; i < max_msgs; ++i) {
exchanger->one_counts[i] = 1;
exchanger->displs[i] = 0;
}
if (need_team_share_alloc) {
exchanger->team_share = exchanger->team_share_;
xt_exchanger_neigh_alltoall_team_share_default_init(exchanger->team_share_);
} else
exchanger->team_share = exchanger_team_share;
return exchanger;
}
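
When no shared block is passed in, the allocator above obtains the exchanger object and its private team share with a single allocation, using the flexible array member team_share_. A minimal sketch of that pattern with generic names (sketch only, not the yaxt code):

/* Sketch of the single-allocation pattern with generic names. */
#include <stdlib.h>

struct share_sketch { int placeholder; };

struct obj_sketch {
  struct share_sketch *share;      /* points at shared or private storage   */
  struct share_sketch  private_[]; /* flexible array member: 0 or 1 element */
};

struct obj_sketch *obj_sketch_new(struct share_sketch *shared)
{
  /* reserve room for the private copy only when nothing is shared */
  struct obj_sketch *o
    = malloc(sizeof *o + (shared ? 0 : sizeof *o->private_));
  o->share = shared ? shared : o->private_;
  return o;
}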
static void copy_from_redist_msgs(size_t n,
const struct Xt_redist_msg *restrict msgs,
int *restrict ranks,
MPI_Datatype *restrict datatypes,
MPI_Comm comm, bool dt_dup) {
static void copy_dt(size_t n,
const struct Xt_redist_msg *restrict msgs,
MPI_Datatype *restrict datatypes,
MPI_Comm comm, bool dt_dup) {
for (size_t i = 0; i < n; ++i) {
ranks[i] = msgs[i].rank;
for (size_t i = 0; i < n; ++i)
if (dt_dup)
xt_mpi_call(MPI_Type_dup(msgs[i].datatype, datatypes + i), comm);
else
datatypes[i] = msgs[i].datatype;
}
}
static void copy_ranks(size_t n,
const struct Xt_redist_msg *restrict msgs,
int *restrict ranks)
{
for (size_t i = 0; i < n; ++i)
ranks[i] = msgs[i].rank;
}
Xt_exchanger
@@ -157,7 +190,7 @@ xt_exchanger_neigh_alltoall_new(int nsend, int nrecv,
* be used on @a comm by any other part of the program during the
* lifetime of the created exchanger object
*/
(void)tag_offset;
int flag;
xt_mpi_call(MPI_Comm_test_inter(comm, &flag), comm);
if (flag)
@@ -167,32 +200,44 @@ xt_exchanger_neigh_alltoall_new(int nsend, int nrecv,
assert((nsend >= 0) & (nrecv >= 0));
Xt_exchanger_neigh_alltoall exchanger
= xt_exchanger_neigh_alltoall_alloc((size_t)nsend, (size_t)nrecv);
exchanger->tag_offset = tag_offset;
exchanger->nmsg[SEND] = nsend;
= xt_exchanger_neigh_alltoall_alloc((size_t)nsend, (size_t)nrecv,
config->exchanger_team_share);
bool dt_dup = !(config->flags & exch_no_dt_dup);
copy_from_redist_msgs((size_t)nsend, send_msgs, exchanger->ranks,
exchanger->datatypes, comm, dt_dup);
exchanger->nmsg[RECV] = nrecv;
copy_from_redist_msgs((size_t)nrecv, recv_msgs, exchanger->ranks + nsend,
exchanger->datatypes + nsend, comm, dt_dup);
enum { no_reorder = 0 }; // no reordering of ranks in new comm
copy_dt((size_t)nsend, send_msgs, exchanger->datatypes, comm, dt_dup);
copy_dt((size_t)nrecv, recv_msgs, exchanger->datatypes + nsend, comm, dt_dup);
struct xt_exchanger_neigh_alltoall_team_share *team_share
= exchanger->team_share;
if (team_share->nb_comm == MPI_COMM_NULL) {
size_t nmsg = (size_t)nsend + (size_t)nrecv;
size_t max_msgs = MAX((size_t)nsend, (size_t)nrecv);
team_share->nmsg[RECV] = nrecv;
team_share->nmsg[SEND] = nsend;
team_share->ranks = xmalloc(nmsg * sizeof(*team_share->ranks));
team_share->one_counts
= xmalloc(max_msgs * sizeof(*team_share->one_counts));
team_share->displs = xmalloc(max_msgs * sizeof(*team_share->displs));
copy_ranks((size_t)nsend, send_msgs, team_share->ranks);
copy_ranks((size_t)nrecv, recv_msgs, team_share->ranks+nsend);
for (size_t i = 0; i < max_msgs; ++i) {
team_share->one_counts[i] = 1;
team_share->displs[i] = 0;
}
enum { no_reorder = 0 }; // no reordering of ranks in new comm
#if __GNUC__ == 11
/* GCC 11 has no means to specify that the special value pointer
* MPI_UNWEIGHTED does not need to point to something of size > 0 */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstringop-overread"
#endif
xt_mpi_call(
MPI_Dist_graph_create_adjacent(
comm, nrecv, exchanger->ranks + nsend, MPI_UNWEIGHTED, nsend,
exchanger->ranks, MPI_UNWEIGHTED, MPI_INFO_NULL, no_reorder,
&(exchanger->comm)), comm);
xt_mpi_call(
MPI_Dist_graph_create_adjacent(
comm, nrecv, team_share->ranks + nsend, MPI_UNWEIGHTED, nsend,
team_share->ranks, MPI_UNWEIGHTED, MPI_INFO_NULL, no_reorder,
&team_share->nb_comm), comm);
#if __GNUC__ == 11
#pragma GCC diagnostic pop
#endif
}
return (Xt_exchanger)exchanger;
}
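
Note the guard on team_share->nb_comm in the constructor above: only the first exchanger of a team allocates the rank/count/displacement arrays and creates the neighborhood communicator; later exchangers created with the same config->exchanger_team_share reuse them. A reduced sketch of that one-time setup, reusing the sketch types from the first snippet and omitting error handling and the array filling:

/* One-time team setup guard; sketch only, simplified from the code above. */
void ensure_team_setup_sketch(struct team_share_sketch *ts, MPI_Comm comm,
                              int nsend, int nrecv)
{
  if (ts->nb_comm != MPI_COMM_NULL)
    return;                        /* another exchanger of the team did it */
  /* ... allocate and fill ts->ranks, ts->one_counts, ts->displs here ... */
  MPI_Dist_graph_create_adjacent(comm,
                                 nrecv, ts->ranks + nsend, MPI_UNWEIGHTED,
                                 nsend, ts->ranks, MPI_UNWEIGHTED,
                                 MPI_INFO_NULL, /* reorder = */ 0,
                                 &ts->nb_comm);
}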
@@ -200,21 +245,56 @@ static Xt_exchanger
xt_exchanger_neigh_alltoall_copy(Xt_exchanger exchanger,
MPI_Comm new_comm, int new_tag_offset)
{
(void)new_tag_offset;
Xt_exchanger_neigh_alltoall exchanger_na =
(Xt_exchanger_neigh_alltoall)exchanger;
size_t nsend = (size_t)(exchanger_na->nmsg[SEND]),
nrecv = (size_t)(exchanger_na->nmsg[RECV]),
nmsg = nsend + nrecv;
const struct xt_exchanger_neigh_alltoall_team_share
*team_share = exchanger_na->team_share;
size_t nsend = (size_t)team_share->nmsg[SEND],
nrecv = (size_t)team_share->nmsg[RECV];
Xt_exchanger_neigh_alltoall
exchanger_copy = xt_exchanger_neigh_alltoall_alloc(nsend, nrecv);
exchanger_copy->nmsg[SEND] = (int)nsend;
exchanger_copy->nmsg[RECV] = (int)nrecv;
exchanger_copy->tag_offset = new_tag_offset;
exchanger_copy->comm = new_comm;
memcpy(exchanger_copy->ranks, exchanger_na->ranks,
nmsg * sizeof(*(exchanger_copy->ranks)));
exchanger_copy = xt_exchanger_neigh_alltoall_alloc(nsend, nrecv, NULL);
struct xt_exchanger_neigh_alltoall_team_share
*team_share_copy = exchanger_copy->team_share;
size_t nmsg = nsend + nrecv;
size_t max_msgs = MAX(nsend, nrecv);
team_share_copy->nmsg[SEND] = (int)nsend;
team_share_copy->nmsg[RECV] = (int)nrecv;
team_share_copy->ranks = xmalloc(nmsg * sizeof(*team_share_copy->ranks));
team_share_copy->one_counts
= xmalloc(max_msgs * sizeof(*team_share_copy->one_counts));
team_share_copy->displs
= xmalloc(max_msgs * sizeof(*team_share_copy->displs));
memcpy(team_share_copy->ranks, team_share->ranks,
nmsg * sizeof(*team_share_copy->ranks));
for (size_t i = 0; i < max_msgs; ++i) {
team_share_copy->one_counts[i] = 1;
team_share_copy->displs[i] = 0;
}
#if defined MPICH_NUMVERSION && MPICH_NUMVERSION < 40000000
/* MPICH up to version 3.4.2 at least cannot do MPI_Comm_dup
* for topology communicators */
enum { no_reorder = 0 }; // no reordering of ranks in new comm
#if __GNUC__ == 11
/* GCC 11 has no means to specify that the special value pointer
* MPI_UNWEIGHTED does not need to point to something of size > 0 */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstringop-overread"
#endif
xt_mpi_call(
MPI_Dist_graph_create_adjacent(
new_comm, (int)nrecv, team_share_copy->ranks + nsend, MPI_UNWEIGHTED,
(int)nsend, team_share_copy->ranks, MPI_UNWEIGHTED, MPI_INFO_NULL,
no_reorder, &team_share_copy->nb_comm), new_comm);
#if __GNUC__ == 11
#pragma GCC diagnostic pop
#endif
#else
xt_mpi_call(MPI_Comm_dup(team_share->nb_comm, &team_share_copy->nb_comm),
new_comm);
#endif
for (size_t i = 0; i < nmsg; ++i)
xt_mpi_call(MPI_Type_dup(exchanger_na->datatypes[i],
exchanger_copy->datatypes + i), new_comm);
@@ -226,21 +306,18 @@ static void xt_exchanger_neigh_alltoall_delete(Xt_exchanger exchanger) {
Xt_exchanger_neigh_alltoall exchanger_na =
(Xt_exchanger_neigh_alltoall)exchanger;
struct xt_exchanger_neigh_alltoall_team_share
*team_share_na = exchanger_na->team_share;
size_t nmsg = (size_t)exchanger_na->nmsg[SEND]
+ (size_t)exchanger_na->nmsg[RECV];
MPI_Comm comm = exchanger_na->comm;
size_t nmsg = (size_t)team_share_na->nmsg[SEND]
+ (size_t)team_share_na->nmsg[RECV];
free(exchanger_na->ranks);
free(exchanger_na->one_counts);
free(exchanger_na->displs);
for (size_t i = 0; i < nmsg; ++i) {
MPI_Datatype *dt = exchanger_na->datatypes + i;
if (*dt != MPI_DATATYPE_NULL)
xt_mpi_call(MPI_Type_free(dt), comm);
}
for (size_t i = 0; i < nmsg; ++i)
xt_mpi_call(MPI_Type_free(exchanger_na->datatypes + i),
team_share_na->nb_comm);
free(exchanger_na->datatypes);
xt_mpi_call(MPI_Comm_free(&(exchanger_na->comm)), Xt_default_comm);
if (exchanger_na->team_share == exchanger_na->team_share_)
xt_exchanger_neigh_alltoall_team_share_destroy(exchanger_na->team_share_);
free(exchanger_na);
}
@@ -250,15 +327,17 @@ static void xt_exchanger_neigh_alltoall_s_exchange(Xt_exchanger exchanger,
Xt_exchanger_neigh_alltoall exchanger_na =
(Xt_exchanger_neigh_alltoall)exchanger;
struct xt_exchanger_neigh_alltoall_team_share
*team_share = exchanger_na->team_share;
xt_mpi_call(
MPI_Neighbor_alltoallw(src_data, exchanger_na->one_counts,
exchanger_na->displs, exchanger_na->datatypes,
dst_data, exchanger_na->one_counts,
exchanger_na->displs, exchanger_na->datatypes +
(size_t)(exchanger_na->nmsg[SEND]),
exchanger_na->comm),
exchanger_na->comm);
MPI_Neighbor_alltoallw(src_data, team_share->one_counts,
team_share->displs, exchanger_na->datatypes,
dst_data, team_share->one_counts,
team_share->displs, exchanger_na->datatypes +
(size_t)team_share->nmsg[SEND],
team_share->nb_comm),
team_share->nb_comm);
}
static void xt_exchanger_neigh_alltoall_a_exchange(Xt_exchanger exchanger,
@@ -268,19 +347,58 @@ static void xt_exchanger_neigh_alltoall_a_exchange(Xt_exchanger exchanger,
Xt_exchanger_neigh_alltoall exchanger_na =
(Xt_exchanger_neigh_alltoall)exchanger;
struct xt_exchanger_neigh_alltoall_team_share
*team_share = exchanger_na->team_share;
#if defined OMPI_MAJOR_VERSION && OMPI_MAJOR_VERSION == 4 \
&& OMPI_MINOR_VERSION == 0 && OMPI_RELEASE_VERSION == 2
/* ugly work-around: Open MPI retains pointers to arguments
* of MPI_Ineighbor_alltoallw, but the exchanger might have
* been destroyed by the time Open MPI starts to use it.
* Therefore, this work-around resizes the request object and
* stores copies of some arguments in the request object tail.
* Not pretty, I know, but Open MPI 4.0.x is too recent to ignore. */
struct Xt_request_msgs_ {
const struct Xt_request_vtable *vtable;
int n;
MPI_Comm comm;
MPI_Request requests[];
};
size_t nmsg = (size_t)team_share->nmsg[SEND] + (size_t)team_share->nmsg[RECV],
header_size = sizeof (struct Xt_request_msgs_),
body_size = sizeof (MPI_Datatype) * nmsg + sizeof (int) * nmsg,
dt_size = nmsg * sizeof (MPI_Datatype);
body_size = (body_size + sizeof (MPI_Datatype) - 1)
/ sizeof (MPI_Datatype) * sizeof (MPI_Datatype);
struct Xt_request_msgs_ *request_
= xrealloc(xt_request_msgs_new(1, (MPI_Request[]){ MPI_REQUEST_NULL},
team_share->nb_comm),
header_size + body_size + dt_size);
*request = (Xt_request)request_;
MPI_Datatype *dt_copy
= (MPI_Datatype *)(void *)((unsigned char *)request_ + header_size + body_size);
memcpy(dt_copy, exchanger_na->datatypes, dt_size);
xt_mpi_call(
MPI_Ineighbor_alltoallw(src_data, team_share->one_counts,
team_share->displs, dt_copy,
dst_data, team_share->one_counts,
team_share->displs, dt_copy +
(size_t)team_share->nmsg[SEND],
team_share->nb_comm, request_->requests),
team_share->nb_comm);
#else
MPI_Request tmp_request;
xt_mpi_call(
MPI_Ineighbor_alltoallw(src_data, exchanger_na->one_counts,
exchanger_na->displs, exchanger_na->datatypes,
dst_data, exchanger_na->one_counts,
exchanger_na->displs, exchanger_na->datatypes +
(size_t)(exchanger_na->nmsg[SEND]),
exchanger_na->comm, &tmp_request),
exchanger_na->comm);
*request = xt_request_msgs_new(1, &tmp_request, exchanger_na->comm);
MPI_Ineighbor_alltoallw(src_data, team_share->one_counts,
team_share->displs, exchanger_na->datatypes,
dst_data, team_share->one_counts,
team_share->displs, exchanger_na->datatypes +
(size_t)team_share->nmsg[SEND],
team_share->nb_comm, &tmp_request),
team_share->nb_comm);
*request = xt_request_msgs_new(1, &tmp_request, team_share->nb_comm);
#endif
}
static MPI_Datatype
@@ -291,16 +409,18 @@ xt_exchanger_neigh_alltoall_get_MPI_Datatype(Xt_exchanger exchanger,
{
Xt_exchanger_neigh_alltoall exchanger_na =
(Xt_exchanger_neigh_alltoall)exchanger;
size_t nsend = (size_t)exchanger_na->nmsg[SEND],
nmsg = (size_t)exchanger_na->nmsg[direction],
struct xt_exchanger_neigh_alltoall_team_share
*team_share = exchanger_na->team_share;
size_t nsend = (size_t)team_share->nmsg[SEND],
nmsg = (size_t)team_share->nmsg[direction],
ofs = direction == SEND ? 0 : nsend;
int *restrict ranks = exchanger_na->ranks + ofs;
int *restrict ranks = team_share->ranks + ofs;
MPI_Datatype datatype_copy = MPI_DATATYPE_NULL;
for (size_t i = 0; i < nmsg; ++i) {
if (ranks[i] == rank) {
if (do_dup)
xt_mpi_call(MPI_Type_dup(exchanger_na->datatypes[i+ofs], &datatype_copy),
exchanger_na->comm);
team_share->nb_comm);
else
datatype_copy = exchanger_na->datatypes[i+ofs];
break;
@@ -316,13 +436,15 @@ xt_exchanger_neigh_alltoall_get_msg_ranks(Xt_exchanger exchanger,
{
Xt_exchanger_neigh_alltoall exchanger_na =
(Xt_exchanger_neigh_alltoall)exchanger;
size_t nsend = (size_t)exchanger_na->nmsg[SEND],
nmsg = (size_t)exchanger_na->nmsg[direction],
struct xt_exchanger_neigh_alltoall_team_share
*team_share = exchanger_na->team_share;
size_t nsend = (size_t)team_share->nmsg[SEND],
nmsg = (size_t)team_share->nmsg[direction],
ofs = direction == SEND ? 0 : nsend;
int *ranks_ = *ranks;
if (!ranks_)
ranks_ = *ranks = xmalloc(nmsg * sizeof(**ranks));
memcpy(ranks_, exchanger_na->ranks + ofs, nmsg * sizeof(**ranks));
memcpy(ranks_, team_share->ranks + ofs, nmsg * sizeof(**ranks));
return (int)nmsg;
}