Skip to content
Snippets Groups Projects
Commit 8ada9747 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

exchangers: Add buffered and synchronous mode to irecv_send exchanger.

parent af08fcc1
No related branches found
No related tags found
1 merge request!31Draft: Attempt at fuller exchanger interface
......@@ -169,10 +169,13 @@ xt_config_get_exchange_new_by_comm(Xt_config config, MPI_Comm comm);
(((flags) >> xt_exch_send_comm_mode_bit_ofs) & 3U))
#define XT_CONFIG_GET_EXCH_SEND_COMM_MODE(config) \
(XT_CONFIG_FLAGS_GET_EXCH_SEND_COMM_MODE((config)->flags))
#define XT_CONFIG_FLAGS_SET_EXCH_SEND_COMM_MODE(flags, v) \
((flags) & ~(uint32_t)xt_exch_send_comm_mode_mask) \
| (uint32_t)(((v)&3) << xt_exch_send_comm_mode_bit_ofs)
#define XT_CONFIG_SET_EXCH_SEND_COMM_MODE(config, v) \
do { (config)->flags \
= ((config)->flags & ~(uint32_t)xt_exch_send_comm_mode_mask) \
| (uint32_t)(((v)&3) << xt_exch_send_comm_mode_bit_ofs); } \
do { Xt_config c = (config); \
c->flags \
= XT_CONFIG_FLAGS_SET_EXCH_SEND_COMM_MODE(c->flags, (v)); } \
while (0)
#define XT_CONFIG_BUCKET_DESTROY(config, bucket_gen_state) \
......
......@@ -52,6 +52,11 @@
#include <omp.h>
#endif
#if defined OMPI_MAJOR_VERSION && \
OMPI_MAJOR_VERSION == 4 && OMPI_MINOR_VERSION == 0 && OMPI_RELEASE_VERSION <= 1
# include <pthread.h>
# include <stdio.h>
#endif
#include "core/ppm_xfuncs.h"
#include "xt/xt_mpi.h"
#include "xt_exchanger.h"
......@@ -137,12 +142,14 @@ xt_exchanger_irecv_send_init(void)
static void
xt_exchanger_irecv_send_s_exchange__(const void *src_data, void *dst_data,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm,
MPI_Request *recv_request)
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
Xt_exchanger_simple_base base,
MPI_Request *recv_request)
{
int tag_offset = base->tag_offset;
MPI_Comm comm = base->comm;
for (int i = 0; i < nrecv; ++i) {
void *dst_data_ = (unsigned char *)dst_data + recv_msgs[i].displacement;
xt_mpi_call(MPI_Irecv(dst_data_, recv_msgs[i].count, recv_msgs[i].datatype,
......@@ -150,13 +157,18 @@ xt_exchanger_irecv_send_s_exchange__(const void *src_data, void *dst_data,
tag_offset + xt_mpi_tag_exchange_msg, comm,
recv_request+i), comm);
}
typedef int (*ifp)(const void *buf, int count, MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm);
int send_mode
= XT_CONFIG_FLAGS_GET_EXCH_SEND_COMM_MODE(base->config_flags);
ifp send_op = (ifp)xt_send_func_tab[1][send_mode];
for (int i = 0; i < nsend; ++i) {
const void *src_data_
= (const unsigned char *)src_data + send_msgs[i].displacement;
xt_mpi_call(MPI_Send(CAST_MPI_SEND_BUF(src_data_), send_msgs[i].count,
send_msgs[i].datatype,
send_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm),
xt_mpi_call(send_op(CAST_MPI_SEND_BUF(src_data_), send_msgs[i].count,
send_msgs[i].datatype,
send_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm),
comm);
}
xt_mpi_call(MPI_Waitall(nrecv, recv_request, MPI_STATUSES_IGNORE), comm);
......@@ -176,7 +188,7 @@ xt_exchanger_irecv_send_s_exchange(
src_data, dst_data, nsend, nrecv,
xt_exchanger_simple_base_get_send_msg(base),
xt_exchanger_simple_base_get_recv_msg(base),
base->tag_offset, base->comm, requests);
base, requests);
free(requests);
}
......@@ -187,7 +199,7 @@ xt_exchanger_irecv_send_s_exchange_mt(const void *src_data, void *dst_data,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
int tag_offset, MPI_Comm comm,
Xt_exchanger_simple_base base,
Xt_exchanger_thread_share shared_req)
{
MPI_Request *recv_request
......@@ -203,7 +215,7 @@ xt_exchanger_irecv_send_s_exchange_mt(const void *src_data, void *dst_data,
nsend_, nrecv_,
send_msgs+start_send,
recv_msgs+start_recv,
tag_offset, comm,
base,
recv_request+start_recv);
}
......@@ -225,7 +237,7 @@ xt_exchanger_irecv_send_s_exchange_omp(
src_data, dst_data, nsend, nrecv,
xt_exchanger_simple_base_get_send_msg(base),
xt_exchanger_simple_base_get_recv_msg(base),
base->tag_offset, base->comm, shared_req);
base, shared_req);
free(shared_req);
}
......@@ -251,6 +263,23 @@ xt_exchanger_irecv_send_destroy_thread_share(
free(thread_share);
}
#if defined OMPI_MAJOR_VERSION && \
OMPI_MAJOR_VERSION == 4 && OMPI_MINOR_VERSION == 0 && OMPI_RELEASE_VERSION <= 1
static void
xt_ompi_ssend_diag(void)
{
fputs("warning: with OpenMPI 4.0 and 4.0.1 a deadlock may occur\n"
"when MPI_Ssend is called and the destination rank also does after\n"
"posting a matching MPI_Irecv.\n"
"For this reason, the requested combination of a synchronous mode,\n"
"blocking exchanger is not fulfilled and standard mode will be used.\n"
"*********************************\n"
"PLEASE CONSIDER UPDATING YOUR MPI INSTALLATION, IF POSSIBLE\n"
"*********************************\n",
stderr);
}
#endif
Xt_exchanger
xt_exchanger_irecv_send_new(int nsend, int nrecv,
const struct Xt_msg_param * send_msgs,
......@@ -267,6 +296,28 @@ xt_exchanger_irecv_send_new(int nsend, int nrecv,
xt_exchanger_simple_base_init(
exchanger, nsend, nrecv, send_msgs, recv_msgs,
comm, tag_offset, config);
#if defined OMPI_MAJOR_VERSION && \
OMPI_MAJOR_VERSION == 4 && OMPI_MINOR_VERSION == 0 && OMPI_RELEASE_VERSION <= 1
/*
* OpenMPI 4.0 and 4.0.1 seem to have a bug in not making progress
* when MPI_Ssend is called and the destination rank also does after
* posting a matching MPI_Irecv. Hence prohibit creation of
* synchronous exchanger here.
*/
int config_flags = config->flags, send_mode
= XT_CONFIG_FLAGS_GET_EXCH_SEND_COMM_MODE(config_flags);
if (send_mode == xt_exch_send_synchronous) {
exchanger->config_flags
= XT_CONFIG_FLAGS_SET_EXCH_SEND_COMM_MODE(config_flags,
xt_exch_send_standard);
int rank;
xt_mpi_call(MPI_Comm_rank(comm, &rank), comm);
if (rank == 0) {
static pthread_once_t ssend_diag_control = PTHREAD_ONCE_INIT;
pthread_once(&ssend_diag_control, xt_ompi_ssend_diag);
}
}
#endif
exchanger->vtable = xt_exchanger_irecv_send_vtable+mthread_mode;
return (Xt_exchanger)exchanger;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment