Skip to content
Snippets Groups Projects
Commit 71c4f0b2 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

exchangers: Add buffered and synchronous mode to packed irecv_isend exchanger.

parent 8ada9747
No related branches found
No related tags found
1 merge request!31Draft: Attempt at fuller exchanger interface
......@@ -201,9 +201,11 @@ start_packed_transfer(unsigned char *buffer,
int nsend, int nrecv,
const struct Xt_msg_param *send_msgs,
const struct Xt_msg_param *recv_msgs,
MPI_Comm comm, int tag_offset,
Xt_exchanger_simple_base base,
MPI_Request *requests)
{
MPI_Comm comm = base->comm;
int tag_offset = base->tag_offset;
for (int i = 0; i < nrecv; ++i) {
int recv_size = (int)(buf_ofs[i+1] - buf_ofs[i]);
xt_mpi_call(MPI_Irecv(buffer + buf_ofs[i], recv_size, MPI_PACKED,
......@@ -211,6 +213,11 @@ start_packed_transfer(unsigned char *buffer,
tag_offset + xt_mpi_tag_exchange_msg, comm,
requests+i), comm);
}
typedef int (*ifp)(const void *buf, int count, MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm, MPI_Request *request);
int send_mode
= XT_CONFIG_FLAGS_GET_EXCH_SEND_COMM_MODE(base->config_flags);
ifp a_send = (ifp)xt_send_func_tab[0][send_mode];
for (int i = 0; i < nsend; ++i) {
int position = 0;
int buf_size = (int)(buf_ofs[send_start+i+1] - buf_ofs[send_start+i]);
......@@ -220,10 +227,10 @@ start_packed_transfer(unsigned char *buffer,
send_msgs[i].datatype,
buffer + buf_ofs[send_start+i], buf_size, &position,
comm), comm);
xt_mpi_call(MPI_Isend(buffer + buf_ofs[send_start+i], position, MPI_PACKED,
send_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm,
requests+nrecv+i), comm);
xt_mpi_call(a_send(buffer + buf_ofs[send_start+i], position, MPI_PACKED,
send_msgs[i].rank,
tag_offset + xt_mpi_tag_exchange_msg, comm,
requests+nrecv+i), comm);
}
}
......@@ -261,8 +268,7 @@ xt_exchanger_irecv_isend_packed_s_exchange(
start_packed_transfer(buffer, nrecv, buf_ofs,
src_data, nsend, nrecv,
send_msgs, recv_msgs,
base->comm, base->tag_offset,
requests);
base, requests);
xt_mpi_call(MPI_Waitall(nrecv + nsend, requests, MPI_STATUSES_IGNORE),
base->comm);
......@@ -326,8 +332,7 @@ xt_exchanger_irecv_isend_packed_s_exchange_omp(
buf_ofs+start_recv,
src_data, nsend_, nrecv_,
send_msgs+start_send, recv_msgs+start_recv,
base->comm, base->tag_offset,
requests+start_req);
base, requests+start_req);
xt_mpi_call(MPI_Waitall(nreq, requests+start_req, MPI_STATUSES_IGNORE),
base->comm);
......@@ -503,8 +508,7 @@ xt_exchanger_irecv_isend_packed_a_exchange(
nrecv, buf_ofs,
src_data, nsend, nrecv,
send_msgs, recv_msgs,
base->comm, base->tag_offset,
tmp_requests);
base, tmp_requests);
size_t *buf_ofs_ = header->buf_ofs;
for (int i = 0; i <= nrecv; ++i) {
......@@ -611,8 +615,7 @@ xt_exchanger_irecv_isend_packed_a_exchange_omp(
buf_ofs+start_recv,
src_data, nsend_, nrecv_,
send_msgs+start_send, recv_msgs+start_recv,
base->comm, base->tag_offset,
prequests+start_req);
base, prequests+start_req);
size_t *buf_ofs_ = header->buf_ofs;
for (int i = start_recv; i < end_recv+(end_recv==nrecv); ++i) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment