Commit bec9c226 authored by Ralf Mueller's avatar Ralf Mueller
Browse files

[cdo-mapReduce] basic version which can create bounds for subset s of lonlat and icon grid

parent eaaf2476
......@@ -97,11 +97,13 @@ void *MapReduce(void *argument)
int datatype = DATATYPE_INT16;
dtlist_type *dtlist = dtlist_new();
double missval1, missval2;
double *arrayIn = NULL;
field_t ***vars = NULL;
cdoInitialize(argument);
int streamID1 = streamOpenRead(cdoStreamName(0));
int streamID2 = streamOpenWrite(cdoStreamName(1), cdoFiletype());
int vlistID1 = streamInqVlist(streamID1);
int nvars = vlistNvars(vlistID1);
int vlistID2 = vlistDuplicate(vlistID1);
......@@ -114,7 +116,8 @@ void *MapReduce(void *argument)
/* search the number of relevant locations */
tsID = 0; nrecs = 0;
double *inputMaskField = (double*) Malloc(inputGridSize*sizeof(double));
streamReadRecord(streamID1, inputMaskField, &nmiss);
int maskStreamID = streamOpenRead(file_argument_new(operatorArgv()[0]));
streamReadRecord(maskStreamID, inputMaskField, &nmiss);
minmaxval(inputGridSize, inputMaskField, NULL,&missval1, &missval2);
cdoPrint("min: %g | max: %g ",missval1, missval2);
/* count points {{{*/
......@@ -138,6 +141,7 @@ void *MapReduce(void *argument)
}
if (cdoDebug) for (int l = 0; l < maskSize; l++) cdoPrint("maskIndexList[%d] = %d",l,maskIndexList[l]);
streamClose(maskStreamID);
/* }}} */
/* create unstructured output grid */
......@@ -151,12 +155,9 @@ void *MapReduce(void *argument)
vlistDefTaxis(vlistID2, taxisID2);
tsID = 0;
while ( (nrecs = streamInqTimestep(streamID1, tsID)) )
{
tsID++;
}
/* copy the mask to target grid */
streamID2 = streamOpenWrite(cdoStreamName(1), cdoFiletype());
streamDefVlist(streamID2, vlistID2);
vlistChangeGridIndex(vlistID2, 0, outputGridID);
int tsteptype = TSTEP_CONSTANT;
int zaxisID = zaxisCreate(ZAXIS_SURFACE, 1);
......@@ -170,12 +171,39 @@ void *MapReduce(void *argument)
double *values = (double *)Malloc(maskSize*sizeof(double));
for (int i = 0; i < maskSize; i++) values[i] = 1.0;
int streamID2 = streamOpenWrite(cdoStreamName(1), cdoFiletype());
streamDefVlist(streamID2, vlistID2);
streamDefRecord(streamID2, 0, 0);
streamWriteRecord(streamID2, values, 0);
/* loop over all data fields */
double *arrayOut = (double *)Malloc(maskSize*sizeof(double));
tsID = 0;
while ( (nrecs = streamInqTimestep(streamID1, tsID)) )
{
taxisCopyTimestep(taxisID2, taxisID1);
for ( recID = 0; recID < nrecs; recID++ )
{
streamInqRecord(streamID1, &varID, &levelID);
streamReadRecord(streamID1, arrayIn, &nmiss);
gridID = vlistInqVarGrid(vlistID1, varID);
/* skip if size of type do not match with mask grid */
if (inputGridType == gridInqType(gridID) && inputGridSize == gridInqSize(gridID))
{
for (int i = 0; i < maskSize; i++)
arrayOut[i] = arrayIn[maskIndexList[i]];
streamDefRecord(streamID2, varID, levelID);
streamWriteRecord(streamID2, arrayOut, 0);
}
}
tsID++;
}
streamClose(streamID2);
streamClose(streamID1);
......
......@@ -750,8 +750,8 @@ static const char *MapReduceHelp[] = {
" This module contains operators for data reduction based on a user defined mask.",
"",
"OPERATORS",
" reduce Reduce input file variables for locations, where mask is not zero",
" Performs a transformation to an unstructured grid",
" reduce Reduce input file variables for locations, where mask is non-zero.",
" Performs a transformation to an unstructured grid including coordinate bounds",
" unreduce Unpack input fields to given grid description",
" Blowup the input fields towards given grid without changing the values",
"",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment