Skip to content
Snippets Groups Projects
Commit 11bfc103 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel:
Browse files

Extend CDI-PIO writing of data to float.

parent 6c4b06b1
No related branches found
No related tags found
No related merge requests found
......@@ -48,13 +48,23 @@ MPI_Comm cdiPioInit(MPI_Comm commSuper, int confResH, int *pioNamespace);
void pioWriteTimestep(void);
void cdiPioRDMAProgress(void);
void streamWriteVarPart (int streamID, int varID,
const void *data, int nmiss,
Xt_idxlist partDesc);
void streamWriteScatteredVarPart(int streamID, int varID, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc);
void streamWriteVarPart(int streamID, int varID,
const double *data, int nmiss,
Xt_idxlist partDesc);
void streamWriteVarPartF(int streamID, int varID,
const float *data, int nmiss,
Xt_idxlist partDesc);
void streamWriteScatteredVarPart(int streamID, int varID, const double *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc);
void streamWriteScatteredVarPartF(int streamID, int varID, const float *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc);
/* cdiPioCSRLastN: return role codes appropriate to use the last
\textit{nProcsIO} tasks as I/O servers */
int cdiPioCSRLastN(MPI_Comm commSuper, int IOMode, int nProcsIO);
......
......@@ -79,15 +79,23 @@
! streamWriteVarPart
! (INTEGER streamID,
! INTEGER varID,
! CHOICE data,
! DOUBLEPRECISION data,
! INTEGER nmiss,
! TYPE(XT_IDXLIST)partDesc)
EXTERNAL streamWriteVarPart
! streamWriteVarPartF
! (INTEGER streamID,
! INTEGER varID,
! REAL data,
! INTEGER nmiss,
! TYPE(XT_IDXLIST)partDesc)
EXTERNAL streamWriteVarPartF
! streamWriteScatteredVarPart
! (INTEGER streamID,
! INTEGER varID,
! CHOICE data,
! DOUBLEPRECISION data,
! INTEGER numBlocks,
! INTEGER blocklengths(*),
! INTEGER displacements(*),
......@@ -95,6 +103,17 @@
! TYPE(XT_IDXLIST)partDesc)
EXTERNAL streamWriteScatteredVarPart
! streamWriteScatteredVarPartF
! (INTEGER streamID,
! INTEGER varID,
! REAL data,
! INTEGER numBlocks,
! INTEGER blocklengths(*),
! INTEGER displacements(*),
! INTEGER nmiss,
! TYPE(XT_IDXLIST)partDesc)
EXTERNAL streamWriteScatteredVarPartF
INTEGER cdiPioCSRLastN
! (INTEGER commSuper,
! INTEGER IOMode,
......
......@@ -55,16 +55,26 @@ static int cdiPioInit_fwrap(int commSuper, int confResH, int *pioNamespace)
FCALLSCFUN3 (INT, cdiPioInit_fwrap, CDIPIOINIT, cdipioinit, INT, INT, PINT)
FCALLSCSUB0 (pioWriteTimestep, PIOWRITETIMESTEP, piowritetimestep)
FCALLSCSUB0 (cdiPioRDMAProgress, CDIPIORDMAPROGRESS, cdipiordmaprogress)
static void streamWriteVarPart_fwrap(int streamID, int varID, const void *data, int nmiss, void *partDesc)
static void streamWriteVarPart_fwrap(int streamID, int varID, double *data, int nmiss, void *partDesc)
{
streamWriteVarPart(streamID, varID, data, nmiss, (*(Xt_idxlist *)partDesc));
}
FCALLSCSUB5 (streamWriteVarPart_fwrap, STREAMWRITEVARPART, streamwritevarpart, INT, INT, PVOID, INT, PVOID)
static void streamWriteScatteredVarPart_fwrap(int streamID, int varID, const void *data, int numBlocks, const int blocklengths[], const int displacements[], int nmiss, void *partDesc)
FCALLSCSUB5 (streamWriteVarPart_fwrap, STREAMWRITEVARPART, streamwritevarpart, INT, INT, PDOUBLE, INT, PVOID)
static void streamWriteVarPartF_fwrap(int streamID, int varID, float *data, int nmiss, void *partDesc)
{
streamWriteVarPartF(streamID, varID, data, nmiss, (*(Xt_idxlist *)partDesc));
}
FCALLSCSUB5 (streamWriteVarPartF_fwrap, STREAMWRITEVARPARTF, streamwritevarpartf, INT, INT, PFLOAT, INT, PVOID)
static void streamWriteScatteredVarPart_fwrap(int streamID, int varID, double *data, int numBlocks, const int blocklengths[], const int displacements[], int nmiss, void *partDesc)
{
streamWriteScatteredVarPart(streamID, varID, data, numBlocks, blocklengths, displacements, nmiss, (*(Xt_idxlist *)partDesc));
}
FCALLSCSUB8 (streamWriteScatteredVarPart_fwrap, STREAMWRITESCATTEREDVARPART, streamwritescatteredvarpart, INT, INT, PVOID, INT, INTV, INTV, INT, PVOID)
FCALLSCSUB8 (streamWriteScatteredVarPart_fwrap, STREAMWRITESCATTEREDVARPART, streamwritescatteredvarpart, INT, INT, PDOUBLE, INT, INTV, INTV, INT, PVOID)
static void streamWriteScatteredVarPartF_fwrap(int streamID, int varID, float *data, int numBlocks, const int blocklengths[], const int displacements[], int nmiss, void *partDesc)
{
streamWriteScatteredVarPartF(streamID, varID, data, numBlocks, blocklengths, displacements, nmiss, (*(Xt_idxlist *)partDesc));
}
FCALLSCSUB8 (streamWriteScatteredVarPartF_fwrap, STREAMWRITESCATTEREDVARPARTF, streamwritescatteredvarpartf, INT, INT, PFLOAT, INT, INTV, INTV, INT, PVOID)
static int cdiPioCSRLastN_fwrap(int commSuper, int IOMode, int nProcsIO)
{
int v;
......
......@@ -156,9 +156,8 @@ cdiPioClientStreamWriteVarChunk_(int streamID, int varID, int memtype,
const int rect[][2],
const void *data, int nmiss)
{
/* todo: handle transmission of float data */
if (memtype != MEMTYPE_DOUBLE)
Error("Writing of non-double type data not implemented!");
if (memtype != MEMTYPE_DOUBLE && memtype != MEMTYPE_FLOAT)
Error("Writing of type data %d not implemented!", memtype);
int vlistID = streamInqVlist(streamID);
int size = vlistInqVarSize(vlistID, varID),
varShape[3];
......@@ -174,28 +173,10 @@ cdiPioClientStreamWriteVarChunk_(int streamID, int varID, int memtype,
xassert(varSize == size);
Xt_idxlist chunkDesc
= xt_idxsection_new(0, (int)ndims, varShapeXt, chunkShape, origin);
pioBufferPartData(streamID, varID, data, nmiss, chunkDesc);
cdiPioBufferPartData(streamID, varID, memtype, data, nmiss, chunkDesc);
xt_idxlist_delete(chunkDesc);
}
static void
cdiPioClientStreamWriteVarPart(int streamID, int varID, const void *data,
int nmiss, Xt_idxlist partDesc)
{
pioBufferPartData(streamID, varID, data, nmiss, partDesc);
}
static void
cdiPioClientStreamWriteScatteredVarPart(int streamID, int varID,
const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
cdiPioBufferPartDataGather(streamID, varID, data, numBlocks,
blocklengths, displacements, nmiss, partDesc);
}
#if defined HAVE_LIBNETCDF
static void
cdiPioCdfDefTimestepNOP(stream_t *streamptr, int tsID)
......@@ -288,9 +269,9 @@ cdiPioClientSetup(int *pioNamespace_, struct cdiPioConf *conf)
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_CHUNK_,
NSSW_FUNC(cdiPioClientStreamWriteVarChunk_));
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_VAR_PART_,
NSSW_FUNC(cdiPioClientStreamWriteVarPart));
NSSW_FUNC(cdiPioBufferPartData));
namespaceSwitchSet(NSSWITCH_STREAM_WRITE_SCATTERED_VAR_PART_,
NSSW_FUNC(cdiPioClientStreamWriteScatteredVarPart));
NSSW_FUNC(cdiPioBufferPartDataGather));
namespaceSwitchSet(NSSWITCH_STREAM_CLOSE_BACKEND,
NSSW_FUNC(cdiPioClientStreamClose));
namespaceSwitchSet(NSSWITCH_STREAM_DEF_TIMESTEP_,
......
......@@ -200,7 +200,7 @@ modelWinEnqueue(size_t streamIdx,
int offset = header.offset
= (int)roundUpToMultiple((size_t)(txWin[streamIdx].head
- txWin[streamIdx].buffer),
sizeof (double));
PIO_WIN_ALIGN);
MPI_Comm comm = cdiPioInqInterComm();
packFunc((void *)data, txWin[streamIdx].buffer, (int)txWin[streamIdx].size,
&offset, &comm);
......@@ -291,17 +291,19 @@ cdiPioRDMAProgress(void)
static void
pioBufferPartData_(int streamID, int varID,
const void *packData, valPackFunc packDataFunc,
int nmiss, Xt_idxlist partDesc)
cdiPioBufferPartData_(int streamID, int varID,
int memtype, const void *packData,
valPackFunc packDataFunc,
int nmiss, Xt_idxlist partDesc)
{
size_t streamIdx = indexOfID(&openStreams, streamID);
xassert(streamIdx != SIZE_MAX);
xassert(varID >= 0 && varID < streamInqNvars(streamID));
collWaitAll();
int dataHeaderID
= memtype == MEMTYPE_DOUBLE ? DATA_HEADER_DOUBLE : DATA_HEADER_FLOAT;
struct winHeaderEntry dataHeader
= { .id = DATA_HEADER_DOUBLE,
= { .id = dataHeaderID,
.specific.dataRecord = { varID, nmiss }, .offset = -1 };
modelWinEnqueue(streamIdx, dataHeader, packData, packDataFunc);
{
......@@ -315,16 +317,24 @@ pioBufferPartData_(int streamID, int varID,
txWin[streamIdx].refuseFuncCall = 1;
}
static inline size_t
memtype2ElemSize(int memtype)
{
return memtype == MEMTYPE_DOUBLE ? sizeof (double) : sizeof (float);
}
void
pioBufferPartData(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc)
cdiPioBufferPartData(int streamID, int varID,
int memtype, const void *data,
int nmiss, Xt_idxlist partDesc)
{
int chunk = xt_idxlist_get_num_indices(partDesc);
xassert(chunk <= INT_MAX);
pioBufferPartData_(streamID, varID,
&(struct memCpyDataDesc){data, (size_t)chunk * sizeof (data[0])},
memcpyPackFunc,
nmiss, partDesc);
size_t chunk = (size_t)(xt_idxlist_get_num_indices(partDesc));
size_t elemSize = memtype2ElemSize(memtype);
cdiPioBufferPartData_(streamID, varID, memtype,
&(struct memCpyDataDesc){data, chunk * elemSize},
memcpyPackFunc,
nmiss, partDesc);
}
struct scatterGatherDesc
......@@ -373,22 +383,23 @@ scatterGatherPackFunc(void *dataDesc, void *buf, int size, int *pos,
void
cdiPioBufferPartDataGather(int streamID, int varID, const double *data,
cdiPioBufferPartDataGather(int streamID, int varID,
int memtype, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
xassert(numBlocks >= 0);
pioBufferPartData_(streamID, varID,
&(struct scatterGatherDesc)
{ .data = (void *)data, .blocklengths = blocklengths,
.displacements = displacements,
.elemSize = sizeof (data[0]),
.numBlocks = (unsigned)numBlocks,
.numElems
= (unsigned)xt_idxlist_get_num_indices(partDesc) },
scatterGatherPackFunc,
nmiss, partDesc);
cdiPioBufferPartData_(streamID, varID, memtype,
&(struct scatterGatherDesc)
{ .data = (void *)data, .blocklengths = blocklengths,
.displacements = displacements,
.elemSize = memtype2ElemSize(memtype),
.numBlocks = (unsigned)numBlocks,
.numElems
= (unsigned)xt_idxlist_get_num_indices(partDesc) },
scatterGatherPackFunc,
nmiss, partDesc);
}
......@@ -758,31 +769,48 @@ void pioWriteTimestep(void)
xdebug("%s", "RETURN. messages sent, windows posted");
}
void
streamWriteVarPart(int streamID, int varID, const void *data,
int nmiss, Xt_idxlist partDesc)
static void
streamWriteVarPart_(int streamID, int varID, int memtype, const void *data,
int nmiss, Xt_idxlist partDesc)
{
if ( CDI_Debug ) Message("streamID = %d varID = %d", streamID, varID);
int chunk = xt_idxlist_get_num_indices(partDesc);
xassert(chunk == 0 || data);
void (*myStreamWriteVarPart)(int streamID, int varID, const void *data,
void (*myStreamWriteVarPart)(int streamID, int varID,
int memtype, const void *data,
int nmiss, Xt_idxlist partDesc)
= (void (*)(int, int, const void *, int, Xt_idxlist))
= (void (*)(int, int, int, const void *, int, Xt_idxlist))
namespaceSwitchGet(NSSWITCH_STREAM_WRITE_VAR_PART_).func;
if (!myStreamWriteVarPart)
xabort("local part writing is unsupported!");
myStreamWriteVarPart(streamID, varID, data, nmiss, partDesc);
myStreamWriteVarPart(streamID, varID, memtype, data, nmiss, partDesc);
}
void
streamWriteScatteredVarPart(int streamID, int varID, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
streamWriteVarPart(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc)
{
streamWriteVarPart_(streamID, varID, MEMTYPE_DOUBLE, data, nmiss, partDesc);
}
void
streamWriteVarPartF(int streamID, int varID, const float *data,
int nmiss, Xt_idxlist partDesc)
{
streamWriteVarPart_(streamID, varID, MEMTYPE_FLOAT, data, nmiss, partDesc);
}
static void
streamWriteScatteredVarPart_(int streamID, int varID,
int memtype, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
if ( CDI_Debug ) Message("streamID = %d varID = %d", streamID, varID);
......@@ -790,22 +818,45 @@ streamWriteScatteredVarPart(int streamID, int varID, const void *data,
xassert(chunk == 0 || data);
void (*myStreamWriteScatteredVarPart)(int streamID, int varID,
const void *data,
int memtype, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
= (void (*)(int, int, const void *, int, const int[], const int[], int,
= (void (*)(int, int, int, const void *, int, const int[], const int[], int,
Xt_idxlist))
namespaceSwitchGet(NSSWITCH_STREAM_WRITE_SCATTERED_VAR_PART_).func;
if (!myStreamWriteScatteredVarPart)
xabort("local part writing is unsupported!");
myStreamWriteScatteredVarPart(streamID, varID, data,
myStreamWriteScatteredVarPart(streamID, varID, memtype, data,
numBlocks, blocklengths, displacements,
nmiss, partDesc);
}
void
streamWriteScatteredVarPart(int streamID, int varID, const double *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
streamWriteScatteredVarPart_(streamID, varID, MEMTYPE_DOUBLE, data,
numBlocks, blocklengths, displacements,
nmiss, partDesc);
}
void
streamWriteScatteredVarPartF(int streamID, int varID, const float *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc)
{
streamWriteScatteredVarPart_(streamID, varID, MEMTYPE_FLOAT, data,
numBlocks, blocklengths, displacements,
nmiss, partDesc);
}
/*
* Local Variables:
* c-file-style: "Java"
......
......@@ -12,10 +12,11 @@
#include "pio_rpc.h"
void
pioBufferPartData(int streamID, int varID, const double *data,
int nmiss, Xt_idxlist partDesc);
cdiPioBufferPartData(int streamID, int varID, int memtype, const void *data,
int nmiss, Xt_idxlist partDesc);
void
cdiPioBufferPartDataGather(int streamID, int varID, const double *data,
cdiPioBufferPartDataGather(int streamID, int varID,
int memtype, const void *data,
int numBlocks, const int blocklengths[],
const int displacements[],
int nmiss, Xt_idxlist partDesc);
......
......@@ -26,6 +26,7 @@ enum collectorCommandTags {
enum
{
DATA_HEADER_FLOAT = 4,
DATA_HEADER_DOUBLE = 8,
numRPCFuncs = 1,
STREAMDEFTIMESTEP = -1,
......
This diff is collapsed.
......@@ -19,7 +19,7 @@ test_variation()
variation=$1
../libtool --mode=execute \
@MPI_LAUNCH@ \
-n ${mpi_task_num} ${tool_wrap} ./pio_write ${pio_write_args} ${variation}
-n ${mpi_task_num} ${tool_wrap} ./pio_write ${pio_write_args} -s 7 ${variation}
echo "checking example_0.$suffix" >&2
../libtool --mode=execute \
${tool_wrap} ./cksum_read example_0.${suffix} example_0.cksum
......
......@@ -68,11 +68,13 @@ modelRun(struct model_config setup, MPI_Comm comm)
int chunkSize, start;
Xt_idxlist partDesc;
#endif
bool useFloat;
} *varDesc;
int gridID, taxisID, vlistID, tsID, tfID = 0;
enum { nmiss = 0 };
double *levs;
double *var = NULL, *varslice = NULL;
float *varsliceF = NULL;
double mscale, mrscale;
time_t current_time;
int vdate = 19850101, vtime = 120000;
......@@ -80,7 +82,7 @@ modelRun(struct model_config setup, MPI_Comm comm)
char *filename = NULL;
int nlon = setup.nlon, nlat = setup.nlat;
size_t nVars = (size_t)setup.nvars;
size_t varslice_size = 0;
size_t varslice_size = 0, varsliceF_size = 0;
#if USE_MPI
int *chunks = NULL, *displs = NULL, comm_size = 1;
#endif
......@@ -144,15 +146,15 @@ modelRun(struct model_config setup, MPI_Comm comm)
zaxisDefLevels(varDesc[varIdx].zaxisID, levs);
}
if (setup.flags & PIO_WRITE_CONFIG_CREATE_UUID_FLAG)
{
unsigned char uuid[16];
if (rank == 0)
cdiCreateUUID(uuid);
{
unsigned char uuid[16];
if (rank == 0)
cdiCreateUUID(uuid);
#if USE_MPI
MPI_Bcast(uuid, CDI_UUID_SIZE, MPI_UNSIGNED_CHAR, 0, comm);
MPI_Bcast(uuid, CDI_UUID_SIZE, MPI_UNSIGNED_CHAR, 0, comm);
#endif
zaxisDefUUID(varDesc[varIdx].zaxisID, uuid);
}
zaxisDefUUID(varDesc[varIdx].zaxisID, uuid);
}
zaxisIDset:
varDesc[varIdx].id
= vlistDefVar(vlistID, gridID, varDesc[varIdx].zaxisID, TIME_VARIABLE);
......@@ -184,6 +186,7 @@ modelRun(struct model_config setup, MPI_Comm comm)
varDesc[varIdx].code = GRIB_USERDEF + (int)varIdx;
vlistDefVarCode(vlistID, varDesc[varIdx].id, varDesc[varIdx].code);
vlistDefVarDatatype(vlistID, varDesc[varIdx].id, setup.datatype);
varDesc[varIdx].useFloat = (bool)(random()&1);
}
taxisID = taxisCreate ( setup.taxistype );
......@@ -235,6 +238,20 @@ modelRun(struct model_config setup, MPI_Comm comm)
modelRegionCompute(varslice, (size_t)start, chunkSize,
varDesc[varIdx].nlev, nlat, nlon,
tsID, mscale, mrscale);
bool useFloat = varDesc[varIdx].useFloat;
if (useFloat)
{
if (varsliceF_size < chunkSize)
{
varsliceF
= (float *)Realloc(varsliceF,
chunkSize * sizeof (varsliceF[0]));
varsliceF_size = chunkSize;
}
for (size_t i = 0; i < chunkSize; ++i)
varslice[i] = varsliceF[i] = (float)varslice[i];
}
if (setup.flags & PIO_WRITE_CONFIG_CHECKSUM_FLAG)
{
#if USE_MPI
......@@ -267,10 +284,17 @@ modelRun(struct model_config setup, MPI_Comm comm)
}
#ifdef USE_MPI
streamWriteVarPart(streamID, varDesc[varIdx].id, varslice, nmiss,
varDesc[varIdx].partDesc);
if (useFloat)
streamWriteVarPartF(streamID, varDesc[varIdx].id, varsliceF,
nmiss, varDesc[varIdx].partDesc);
else
streamWriteVarPart(streamID, varDesc[varIdx].id, varslice,
nmiss, varDesc[varIdx].partDesc);
#else
streamWriteVar(streamID, varDesc[varIdx].id, varslice, nmiss);
if (useFloat)
streamWriteVarF(streamID, varDesc[varIdx].id, varsliceF, nmiss);
else
streamWriteVar(streamID, varDesc[varIdx].id, varslice, nmiss);
#endif
}
current_time += 86400;
......@@ -288,14 +312,14 @@ modelRun(struct model_config setup, MPI_Comm comm)
perror("failed to open table file");
exit(EXIT_FAILURE);
}
for (size_t i = 0; i < (size_t)nVars; ++i)
for (size_t varIdx = 0; varIdx < nVars; ++varIdx)
{
uint32_t cksum
= memcrc_finish(&varDesc[i].checksum_state,
(off_t)((varDesc[i].size * sizeof (var[0])
= memcrc_finish(&varDesc[varIdx].checksum_state,
(off_t)((varDesc[varIdx].size * sizeof (var[0])
+ sizeof (int) * 2)
* (size_t)setup.nts));
int code = vlistInqVarCode(vlistID, varDesc[i].id);
int code = vlistInqVarCode(vlistID, varDesc[varIdx].id);
if (fprintf(tablefp, "%08lx %d\n", (unsigned long)cksum,
code) < 0)
{
......@@ -314,6 +338,7 @@ modelRun(struct model_config setup, MPI_Comm comm)
pioEndTimestepping ();
#endif
Free(varslice);
Free(varsliceF);
vlistDestroy ( vlistID );
taxisDestroy ( taxisID );
for (size_t varIdx = 0; varIdx < nVars; varIdx++ )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment