Commit 879b4d4e authored by Moritz Hanke's avatar Moritz Hanke
Browse files

adds new redistribution type: xt_redist_collection_static

parent 48b0b3d3
......@@ -65,23 +65,25 @@ AS_IF([test x$MPI_LAUNCH = xtrue],
AC_CONFIG_HEADER([include/config.h])
AC_CONFIG_FILES([tests/test_redist_p2p_parallel_run \
tests/test_redist_collection_parallel_run \
tests/test_xmap_all2all_parallel_run \
tests/test_xmap_dist_dir_parallel_run \
tests/test_idxvec_run \
tests/test_idxlist_collection_run \
tests/test_idxsection_run \
tests/test_idxstripes_run \
tests/test_redist_collection_run \
tests/test_redist_p2p_run \
tests/test_xmap_all2all_run \
tests/test_xmap_dist_dir_run \
tests/test_handles_run \
tests/test_ut_run \
tests/test_perf_run \
tests/test_perf_stripes_run \
tests/test_sort_run \
tests/test_yaxt_run \
AC_CONFIG_FILES([tests/test_redist_p2p_parallel_run \
tests/test_redist_collection_parallel_run \
tests/test_redist_collection_static_parallel_run \
tests/test_xmap_all2all_parallel_run \
tests/test_xmap_dist_dir_parallel_run \
tests/test_idxvec_run \
tests/test_idxlist_collection_run \
tests/test_idxsection_run \
tests/test_idxstripes_run \
tests/test_redist_collection_run \
tests/test_redist_collection_static_run \
tests/test_redist_p2p_run \
tests/test_xmap_dist_dir_run \
tests/test_xmap_all2all_run \
tests/test_handles_run \
tests/test_ut_run \
tests/test_perf_run \
tests/test_perf_stripes_run \
tests/test_sort_run \
tests/test_yaxt_run \
util/serialrun],[chmod a+x "$ac_file"])
AC_OUTPUT([Makefile src/Makefile tests/Makefile])
......@@ -22,6 +22,8 @@ libyaxt_a_SOURCES = \
xt_redist.h \
xt_redist_collection.c \
xt_redist_collection.h \
xt_redist_collection_static.c \
xt_redist_collection_static.h \
xt_redist_p2p.c \
xt_redist_p2p.h \
xt_xmap.c \
......
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
......@@ -70,28 +71,28 @@ static void get_MPI_datatypes(struct redist_collection_msg ** msgs, int * nmsgs,
int comm_size;
mpi_err_handler(MPI_Comm_size(comm, &comm_size), comm);
MPI_Datatype datatypes[num_redists];
for (int i = 0; i < comm_size; ++i) {
for (unsigned j = 0; j < num_redists; ++j) {
int flag = 0;
for (unsigned j = 0; j < num_redists; ++j)
flag |= (datatypes[j] = get_MPI_datatype(redists[j], i)) != MPI_DATATYPE_NULL;
if (get_MPI_datatype(redists[j], i) != MPI_DATATYPE_NULL) {
if (flag) {
ENSURE_ARRAY_SIZE(*msgs, msgs_array_size, *nmsgs+1);
(*msgs)[*nmsgs].rank = i;
(*msgs)[*nmsgs].datatypes = xmalloc(num_redists *
sizeof(*((*msgs)[*nmsgs].datatypes)));
memcpy((*msgs)[*nmsgs].datatypes, datatypes, num_redists * sizeof(*datatypes));
for (unsigned k = 0; k < DATATYPE_CACHE_SIZE; ++k)
(*msgs)[*nmsgs].datatype_cache[k] = MPI_DATATYPE_NULL;
for (unsigned j = 0; j < num_redists; ++j)
(*msgs)[*nmsgs].datatypes[j] = get_MPI_datatype(redists[j], i);
++*nmsgs;
break;
}
}
}
......
#include <stdlib.h>
#include <mpi.h>
#include "core/core.h"
#include "core/ppm_xfuncs.h"
#include "xt_mpi.h"
#include "xt_redist_collection_static.h"
#include "ensure_array_size.h"
#include "xt_redist.h"
static void
redist_collection_static_delete(Xt_redist redist);
static void
redist_collection_static_s_exchange(Xt_redist redist, void **src_data,
unsigned num_src_arrays, void **dst_data,
unsigned num_dst_arrays);
static void
redist_collection_static_s_exchange1(Xt_redist redist, void *src_data, void *dst_data);
static MPI_Datatype
redist_collection_static_get_send_MPI_Datatype(Xt_redist redist, int rank);
static MPI_Datatype
redist_collection_static_get_recv_MPI_Datatype(Xt_redist redist, int rank);
static const struct xt_redist_vtable redist_collection_static_vtable = {
.delete = redist_collection_static_delete,
.s_exchange = redist_collection_static_s_exchange,
.s_exchange1 = redist_collection_static_s_exchange1,
.get_send_MPI_Datatype = redist_collection_static_get_send_MPI_Datatype,
.get_recv_MPI_Datatype = redist_collection_static_get_recv_MPI_Datatype
};
struct redist_collection_static_msg {
int rank;
MPI_Datatype datatype;
};
struct Xt_redist_collection_static {
const struct xt_redist_vtable *vtable;
int ndst, nsrc;
struct redist_collection_static_msg * send_msgs;
struct redist_collection_static_msg * recv_msgs;
MPI_Comm comm;
};
static MPI_Datatype
generate_datatype(MPI_Aint * displacements, int * block_lengths,
unsigned num_redists, MPI_Datatype * datatypes,
MPI_Comm comm) {
MPI_Datatype datatype;
int num_datatypes = 0;
for (int i = 0; i < num_redists; ++i)
if (datatypes[i] != MPI_DATATYPE_NULL)
++num_datatypes;
MPI_Datatype * datatypes_;
MPI_Aint * displacements_;
if (num_datatypes != num_redists) {
datatypes_ = xmalloc(num_datatypes * sizeof(*datatypes_));
displacements_ = xmalloc(num_datatypes * sizeof(*displacements));
num_datatypes = 0;
for (int i = 0; i < num_redists; ++i) {
if (datatypes[i] != MPI_DATATYPE_NULL) {
datatypes_[num_datatypes] = datatypes[i];
displacements_[num_datatypes] = displacements[i];
++num_datatypes;
}
}
} else {
datatypes_ = datatypes;
displacements_ = displacements;
}
mpi_err_handler(MPI_Type_create_struct(num_datatypes, block_lengths, displacements_,
datatypes_, &datatype), comm);
mpi_err_handler(MPI_Type_commit(&datatype), comm);
if (num_datatypes != num_redists) {
free(datatypes_);
free(displacements_);
}
return datatype;
}
static void generate_msg_infos(struct redist_collection_static_msg ** msgs, int * nmsgs,
MPI_Aint * displacements, Xt_redist * redists,
unsigned num_redists, MPI_Comm comm,
MPI_Datatype (*get_MPI_datatype)(Xt_redist,int)) {
int msgs_array_size = 0;
int comm_size;
mpi_err_handler(MPI_Comm_size(comm, &comm_size), comm);
int block_lengths[num_redists];
MPI_Datatype datatypes[num_redists];
int flag;
for (unsigned i = 0; i < num_redists; ++i)
block_lengths[i] = 1;
for (int i = 0; i < comm_size; ++i) {
for (unsigned j = 0; j < num_redists; ++j)
datatypes[j] = get_MPI_datatype(redists[j], i);
for (unsigned j = 0; j < num_redists; ++j) {
if (datatypes[j] != MPI_DATATYPE_NULL) {
ENSURE_ARRAY_SIZE(*msgs, msgs_array_size, *nmsgs+1);
(*msgs)[*nmsgs].rank = i;
(*msgs)[*nmsgs].datatype = generate_datatype(displacements, block_lengths,
num_redists, datatypes, comm);
++*nmsgs;
break;
}
}
for (unsigned j = 0; j < num_redists; ++j)
if (datatypes[j] != MPI_DATATYPE_NULL)
mpi_err_handler(MPI_Type_free(datatypes+j), comm);
}
if (*nmsgs > 0)
*msgs = xrealloc(*msgs, *nmsgs * sizeof(**msgs));
}
Xt_redist xt_redist_collection_static_new(Xt_redist * redists, unsigned num_redists,
MPI_Aint * src_displacements,
MPI_Aint * dst_displacements, MPI_Comm comm) {
struct Xt_redist_collection_static * redist_coll;
redist_coll = xmalloc(1 * sizeof(*redist_coll));
redist_coll->vtable = &redist_collection_static_vtable;
redist_coll->ndst = 0;
redist_coll->nsrc = 0;
redist_coll->send_msgs = NULL;
redist_coll->recv_msgs = NULL;
mpi_err_handler(MPI_Comm_dup(comm, &(redist_coll->comm)), comm);
generate_msg_infos(&(redist_coll->send_msgs), &(redist_coll->nsrc),
src_displacements, redists, num_redists, redist_coll->comm,
xt_redist_get_send_MPI_Datatype);
generate_msg_infos(&(redist_coll->recv_msgs), &(redist_coll->ndst),
dst_displacements, redists, num_redists, redist_coll->comm,
xt_redist_get_recv_MPI_Datatype);
return (Xt_redist)redist_coll;
}
static void
redist_collection_static_s_exchange(Xt_redist redist, void **src_data,
unsigned num_src_arrays, void **dst_data,
unsigned num_dst_arrays) {
struct Xt_redist_collection_static * redist_coll;
redist_coll = (struct Xt_redist_collection_static *)redist;
Xt_abort(redist_coll->comm, "ERROR: s_exchange is not implemented for"
" this xt_redist type (xt_redist_collection_static)", __FILE__, __LINE__);
}
static void
redist_collection_static_delete(Xt_redist redist) {
struct Xt_redist_collection_static * redist_coll;
redist_coll = (struct Xt_redist_collection_static *)redist;
int i;
for (i = 0; i < redist_coll->nsrc; ++i)
MPI_Type_free(&(redist_coll->send_msgs[i].datatype));
free(redist_coll->send_msgs);
for (i = 0; i < redist_coll->ndst; ++i)
MPI_Type_free(&(redist_coll->recv_msgs[i].datatype));
free(redist_coll->recv_msgs);
mpi_err_handler(MPI_Comm_free(&(redist_coll->comm)), MPI_COMM_WORLD);
free(redist_coll);
}
static MPI_Datatype
redist_collection_static_get_send_MPI_Datatype(Xt_redist redist, int rank) {
struct Xt_redist_collection_static * redist_coll;
redist_coll = (struct Xt_redist_collection_static *)redist;
MPI_Datatype datatype_copy;
datatype_copy = MPI_DATATYPE_NULL;
for (int i = 0; i < redist_coll->nsrc; ++i)
if (redist_coll->send_msgs[i].rank == rank) {
mpi_err_handler(MPI_Type_dup(redist_coll->send_msgs[i].datatype, &datatype_copy),
redist_coll->comm);
break;
}
return datatype_copy;
}
static MPI_Datatype
redist_collection_static_get_recv_MPI_Datatype(Xt_redist redist, int rank) {
struct Xt_redist_collection_static * redist_coll;
redist_coll = (struct Xt_redist_collection_static *)redist;
MPI_Datatype datatype_copy;
datatype_copy = MPI_DATATYPE_NULL;
for (int i = 0; i < redist_coll->ndst; ++i)
if (redist_coll->recv_msgs[i].rank == rank) {
mpi_err_handler(MPI_Type_dup(redist_coll->recv_msgs[i].datatype, &datatype_copy),
redist_coll->comm);
break;
}
return datatype_copy;
}
static void
redist_collection_static_s_exchange1(Xt_redist redist, void *src_data, void *dst_data) {
struct Xt_redist_collection_static * redist_coll;
redist_coll = (struct Xt_redist_collection_static *) redist;
MPI_Request * recv_requests;
recv_requests = xmalloc(redist_coll->ndst * sizeof(*recv_requests));
for (int i = 0; i < redist_coll->ndst; ++i)
mpi_err_handler(MPI_Irecv(dst_data, 1, redist_coll->recv_msgs[i].datatype,
redist_coll->recv_msgs[i].rank, 0, redist_coll->comm,
recv_requests+i), redist_coll->comm);
for (int i = 0; i < redist_coll->nsrc; ++i)
mpi_err_handler(MPI_Send(src_data, 1, redist_coll->send_msgs[i].datatype,
redist_coll->send_msgs[i].rank, 0, redist_coll->comm),
redist_coll->comm);
mpi_err_handler(MPI_Waitall(redist_coll->ndst, recv_requests,
MPI_STATUSES_IGNORE), redist_coll->comm);
free(recv_requests);
}
#ifndef XT_REDIST_COLLECTION_H
#define XT_REDIST_COLLECTION_H
#include "xt_redist.h"
/**
* constructor for a redistribution collection that is comprised
* of multiple other redistributions
* @param[in] redists redistributions
* @param[in] num_redists number of redistributions
* @param[in] src_displacements array of displacements of the source
* input arrays for the exchange
* @param[in] dst_displacements array of displacements of the destination
* input arrays for the exchange
* @param[in] comm MPI communicator
*
* \remarks all redistributions need to be based on the same
* MPI communicator
*/
Xt_redist xt_redist_collection_static_new(Xt_redist * redists, unsigned num_redists,
MPI_Aint * src_displacements,
MPI_Aint * dst_displacements, MPI_Comm comm);
#endif // XT_REDIST_COLLECTION_H
......@@ -6,6 +6,8 @@ noinst_PROGRAMS = \
test_idxstripes \
test_redist_collection \
test_redist_collection_parallel \
test_redist_collection_static \
test_redist_collection_static_parallel \
test_redist_p2p \
test_redist_p2p_parallel \
test_xmap_all2all \
......@@ -25,6 +27,8 @@ test_idxsection_SOURCES = test_idxsection.c tests.h test_idxlist.h
test_idxstripes_SOURCES = test_idxstripes.c tests.h test_idxlist.h
test_redist_collection_SOURCES = test_redist_collection.c test.h
test_redist_collection_parallel_SOURCES = test_redist_collection_parallel.c test.h
test_redist_collection_static_SOURCES = test_redist_collection_static.c test.h
test_redist_collection_static_parallel_SOURCES = test_redist_collection_static_parallel.c test.h
test_redist_p2p_SOURCES = test_redist_p2p.c test.h
test_redist_p2p_parallel_SOURCES = test_redist_p2p_parallel.c test.h
test_xmap_all2all_SOURCES = test_xmap_all2all.c tests.h
......@@ -52,6 +56,8 @@ TESTS = \
test_redist_p2p_run \
test_redist_collection_run \
test_redist_collection_parallel_run \
test_redist_collection_static_run \
test_redist_collection_static_parallel_run \
test_redist_p2p_parallel_run \
test_xmap_all2all_run \
test_xmap_all2all_parallel_run \
......
#include <string.h>
#include <mpi.h>
#include "tests.h"
#include "yaxt.h"
#include "xt_idxlist.h"
#include "xt_idxvec.h"
#include "xt_redist_p2p.h"
#include "xt_redist_collection_static.h"
#include "xt_mpi.h"
#include "xt_xmap_all2all.h"
int main(void) {
// init mpi
mpi_err_handler(MPI_Init(NULL, NULL), MPI_COMM_WORLD);
Xt_initialize(MPI_COMM_WORLD);
{ // general test with one redist
// set up data
Xt_idx src_index_list[] = {1,2,3,4,5};
unsigned src_num_indices = sizeof(src_index_list) / sizeof(src_index_list[0]);
xt_idxlist src_idxlist;
src_idxlist = xt_idxvec_new(src_index_list, src_num_indices);
Xt_idx dst_index_list[] = {1,3,5};
unsigned dst_num_indices = sizeof(dst_index_list) / sizeof(dst_index_list[0]);
xt_idxlist dst_idxlist;
dst_idxlist = xt_idxvec_new(dst_index_list, dst_num_indices);
Xt_xmap xmap;
xmap = xt_xmap_all2all_new(src_idxlist, dst_idxlist, MPI_COMM_WORLD);
xt_idxlist_delete(src_idxlist);
xt_idxlist_delete(dst_idxlist);
Xt_redist redist;
redist = xt_redist_p2p_new(xmap, MPI_DOUBLE);
xt_xmap_delete(xmap);
// generate redist_collection
Xt_redist redist_coll;
MPI_Aint src_displacements[1] = {0};
MPI_Aint dst_displacements[1] = {0};
redist_coll = xt_redist_collection_static_new(&redist, 1, src_displacements,
dst_displacements, MPI_COMM_WORLD);
xt_redist_delete(redist);
// test exchange
double src_data[] = {1,2,3,4,5};
double dst_data[] = {-1,-1,-1};
xt_redist_s_exchange1(redist_coll, (void*)src_data, (void*)dst_data);
double ref_dst_data[] = {1,3,5};
unsigned i;
for (i = 0; i < sizeof(dst_data) / sizeof(dst_data[0]); ++i)
if (ref_dst_data[i] != dst_data[i])
PUT_ERR("error in xt_redist_s_exchange\n");
// clean up
xt_redist_delete(redist_coll);
}
{ // test with one redist used three times (two exchanges)
// set up data
Xt_idx src_index_list[] = {1,2,3,4,5};
unsigned src_num_indices = sizeof(src_index_list) / sizeof(src_index_list[0]);
xt_idxlist src_idxlist;
src_idxlist = xt_idxvec_new(src_index_list, src_num_indices);
Xt_idx dst_index_list[] = {1,3,5};
unsigned dst_num_indices = sizeof(dst_index_list) / sizeof(dst_index_list[0]);
xt_idxlist dst_idxlist;
dst_idxlist = xt_idxvec_new(dst_index_list, dst_num_indices);
Xt_xmap xmap;
xmap = xt_xmap_all2all_new(src_idxlist, dst_idxlist, MPI_COMM_WORLD);
xt_idxlist_delete(src_idxlist);
xt_idxlist_delete(dst_idxlist);
Xt_redist redist;
redist = xt_redist_p2p_new(xmap, MPI_DOUBLE);
xt_xmap_delete(xmap);
// generate redist_collection
Xt_redist redist_coll;
Xt_redist redists[3] = {redist, redist, redist};
double src_data[3][5];
double dst_data[3][3];
MPI_Aint src_displacements[3] = {0,(src_data[1]-src_data[0])*sizeof(double),
(src_data[2]-src_data[0])*sizeof(double)};
MPI_Aint dst_displacements[3] = {0,(dst_data[1]-dst_data[0])*sizeof(double),
(dst_data[2]-dst_data[0])*sizeof(double)};
redist_coll = xt_redist_collection_static_new(redists, 3, src_displacements,
dst_displacements, MPI_COMM_WORLD);
xt_redist_delete(redist);
// test exchange
{
double src_data[3][5] = {{1,2,3,4,5},{6,7,8,9,10},{11,12,13,14,15}};
double dst_data[3][3] = {{-1,-1,-1},{-1,-1,-1},{-1,-1,-1}};
xt_redist_s_exchange1(redist_coll, (void*)src_data, (void*)dst_data);
double ref_dst_data[3][3] = {{1,3,5},{6,8,10},{11,13,15}};
unsigned i, j;
for (i = 0; i < 3; ++i)
for (j = 0; j < 3; ++j)
if (ref_dst_data[i][j] != dst_data[i][j])
PUT_ERR("error in xt_redist_s_exchange\n");
}
{
double src_data[3][5] = {{1,2,3,4,5},{6,7,8,9,10},{11,12,13,14,15}};
double dst_data[3][3] = {{-1,-1,-1},{-1,-1,-1},{-1,-1,-1}};
xt_redist_s_exchange1(redist_coll, (void*)src_data, (void*)dst_data);
double ref_dst_data[3][3] = {{1,3,5},{6,8,10},{11,13,15}};
unsigned i, j;
for (i = 0; i < 3; ++i)
for (j = 0; j < 3; ++j)
if (ref_dst_data[i][j] != dst_data[i][j])
PUT_ERR("error in xt_redist_s_exchange\n");
}
// clean up
xt_redist_delete(redist_coll);
}
MPI_Finalize();
return TEST_EXIT_CODE;
}
#include <stdlib.h>
#include <mpi.h>
#include "tests.h"
#include "yaxt.h"
#include "xt_idxvec.h"
#include "xt_idxsection.h"
#include "xt_redist_p2p.h"
#include "xt_redist_collection_static.h"
#include "xt_idxstripes.h"
#include "xt_idxlist_collection.h"
#include "xt_idxlist.h"
#include "xt_mpi.h"
#include "xt_xmap_all2all.h"
inline int imin(int a, int b) {
return (((a)<(b))?(a):(b));
}
inline int imax(int a, int b) {
return (((a)>(b))?(a):(b));
}