Commit 7428c48e authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Introduce function to write scattered variable parts.

parent 8666603b
......@@ -41,5 +41,9 @@ void cdiPioRDMAProgress();
void streamWriteVarPart (int streamID, int varID,
const void *data, int nmiss,
Xt_idxlist partDesc);
void streamWriteScatteredVarPart(int streamID, int varID, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc);
#endif
......@@ -58,3 +58,14 @@
! TYPE(XT_IDXLIST)partDesc)
EXTERNAL streamWriteVarPart
! streamWriteScatteredVarPart
! (INTEGER streamID,
! INTEGER varID,
! CHOICE data,
! INTEGER numBlocks,
! INTEGER blocklengths,
! INTEGER displacements,
! INTEGER nmiss,
! TYPE(XT_IDXLIST)partDesc)
EXTERNAL streamWriteScatteredVarPart
......@@ -45,5 +45,10 @@ static void streamWriteVarPart_fwrap(int streamID, int varID, const void *data,
streamWriteVarPart(streamID, varID, data, nmiss, (*(Xt_idxlist *)partDesc));
}
FCALLSCSUB5 (streamWriteVarPart_fwrap, STREAMWRITEVARPART, streamwritevarpart, INT, INT, PVOID, INT, PVOID)
static void streamWriteScatteredVarPart_fwrap(int streamID, int varID, const void *data, int numBlocks, int blocklengths[], int displacements[], int nmiss, void *partDesc)
{
streamWriteScatteredVarPart(streamID, varID, data, numBlocks, blocklengths, displacements, nmiss, (*(Xt_idxlist *)partDesc));
}
FCALLSCSUB8 (streamWriteScatteredVarPart_fwrap, STREAMWRITESCATTEREDVARPART, streamwritescatteredvarpart, INT, INT, PVOID, INT, INTV, INTV, INT, PVOID)
#endif
......@@ -40,6 +40,7 @@ static int activeNamespace = 0;
{ .func = (void (*)()) cdiStreamWriteVar_ }, \
{ .func = (void (*)()) cdiStreamwriteVarChunk_ }, \
{ .func = (void (*)()) 0 }, \
{ .func = (void (*)()) 0 }, \
{ .func = (void (*)()) cdiStreamCloseDefaultDelegate }, \
{ .func = (void (*)()) cdiStreamDefTimestep_ }, \
{ .func = (void (*)()) cdiStreamSync_ }, \
......
......@@ -34,6 +34,7 @@ enum namespaceSwitch
NSSWITCH_STREAM_WRITE_VAR_,
NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
NSSWITCH_STREAM_WRITE_VAR_PART_,
NSSWITCH_STREAM_WRITE_SCATTERED_VAR_PART_,
NSSWITCH_STREAM_CLOSE_BACKEND,
NSSWITCH_STREAM_DEF_TIMESTEP_,
NSSWITCH_STREAM_SYNC,
......
......@@ -143,6 +143,30 @@ cdiPioClientStreamWriteVarPart(int streamID, int varID, const void *data,
}
}
static void
cdiPioClientStreamWriteScatteredVarPart(int streamID, int varID,
const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
switch (namespaceInqResStatus())
{
case STAGE_DEFINITION:
xabort("DEFINITION STAGE: PARALLEL WRITING NOT POSSIBLE.");
break;
case STAGE_TIMELOOP:
cdiPioBufferPartDataGather(streamID, varID, data, numBlocks,
blocklengths, displacements, nmiss, partDesc);
return;
case STAGE_CLEANUP:
xabort("CLEANUP STAGE: PARALLEL WRITING NOT POSSIBLE.");
break;
default:
xabort("INTERNAL ERROR");
}
}
#if defined HAVE_LIBNETCDF
static void
cdiPioCdfDefTimestepNOP(stream_t *streamptr, int tsID)
......@@ -234,6 +258,8 @@ cdiPioClientSetup(int *pioNamespace_, int *pioNamespace)
NSSW_FUNC(cdiPioClientStreamWriteVarChunk_));
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_PART_,
NSSW_FUNC(cdiPioClientStreamWriteVarPart));
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_SCATTERED_VAR_PART_,
NSSW_FUNC(cdiPioClientStreamWriteScatteredVarPart));
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND,
NSSW_FUNC(cdiPioClientStreamClose));
namespaceSwitchSet(NSSWITCH_STREAM_DEF_TIMESTEP_,
......
......@@ -642,6 +642,57 @@ pioBufferPartData(int streamID, int varID, const double *data,
nmiss, partDesc);
}
struct scatterGatherDesc
{
void *data;
const int *blocklengths, *displacements;
size_t elemSize;
unsigned numBlocks;
};
static void
scatterGatherPackFunc(void *dataDesc, void *buf, int size, int *pos,
void *context)
{
const struct scatterGatherDesc *p = dataDesc;
unsigned numBlocks = p->numBlocks;
const int *bls = p->blocklengths, *disps = p->displacements;
int pos_ = *pos;
unsigned char *dstBuf = buf + pos_, *bufEnd = (unsigned char *)buf + size;
size_t elemSize = p->elemSize;
const unsigned char *data = p->data;
for (unsigned j = 0; j < numBlocks; ++j)
{
int bl = bls[j];
if (bl > 0)
{
size_t bsize = (size_t)bl * elemSize;
xassert(dstBuf + bsize <= bufEnd);
memcpy(dstBuf, data + elemSize * disps[j], bsize);
dstBuf += bsize;
}
}
*pos = (int)(dstBuf - (unsigned char *)buf);
}
void
cdiPioBufferPartDataGather(int streamID, int varID, const double *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
xassert(numBlocks >= 0);
pioBufferPartData_(streamID, varID,
&(struct scatterGatherDesc)
{ .data = (void *)data, .blocklengths = blocklengths,
.displacements = displacements,
.elemSize = sizeof (data[0]), .numBlocks = numBlocks },
scatterGatherPackFunc,
nmiss, partDesc);
}
/************************************************************************/
void pioBufferFuncCall(struct winHeaderEntry header,
......@@ -929,6 +980,34 @@ streamWriteVarPart(int streamID, int varID, const void *data,
myStreamWriteVarPart(streamID, varID, data, nmiss, partDesc);
}
void
streamWriteScatteredVarPart(int streamID, int varID, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
if ( CDI_Debug ) Message("streamID = %d varID = %d", streamID, varID);
int chunk = xt_idxlist_get_num_indices(partDesc);
xassert(chunk == 0 || data);
void (*myStreamWriteScatteredVarPart)(int streamID, int varID,
const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
= (void (*)(int, int, const void *, int, const int[], const int[], int,
Xt_idxlist))
namespaceSwitchGet(NSSWITCH_STREAM_WRITE_SCATTERED_VAR_PART_).func;
if (!myStreamWriteScatteredVarPart)
xabort("local part writing is unsupported!");
myStreamWriteScatteredVarPart(streamID, varID, data,
numBlocks, blocklengths, displacements,
nmiss, partDesc);
}
#endif
......
......@@ -16,6 +16,12 @@
void
pioBufferPartData(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc);
void
cdiPioBufferPartDataGather(int streamID, int varID, const double *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc);
void pioBufferFuncCall(struct winHeaderEntry header,
const void *data, valPackFunc dataPackFunc);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment