Commit 80df8788 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

Ensstat test task.

parent 13c96bff
......@@ -42,16 +42,17 @@ typedef struct
{
int streamID;
int vlistID;
int nmiss;
double missval;
double *array;
int nmiss[2];
double missval[2];
double *array[2];
} ens_file_t;
typedef struct
{
int varID;
int levelID;
int t;
int varID[2];
int levelID[2];
int vlistID1;
int streamID2;
int nfiles;
......@@ -73,16 +74,17 @@ void *ensstat_func(void *ensarg)
if ( CDO_task ) cdo_omp_set_num_threads(ompNumThreads);
ensstat_arg_t *arg = (ensstat_arg_t*) ensarg;
int t = arg->t;
int nfiles = arg->nfiles;
ens_file_t *ef = arg->ef;
field_type *field = arg->field;
bool lmiss = false;
for ( int fileID = 0; fileID < nfiles; fileID++ ) if ( ef[fileID].nmiss > 0 ) lmiss = true;
for ( int fileID = 0; fileID < nfiles; fileID++ ) if ( ef[fileID].nmiss[t] > 0 ) lmiss = true;
int gridID = vlistInqVarGrid(arg->vlistID1, arg->varID);
int gridID = vlistInqVarGrid(arg->vlistID1, arg->varID[t]);
int gridsize = gridInqSize(gridID);
double missval = vlistInqVarMissval(arg->vlistID1, arg->varID);
double missval = vlistInqVarMissval(arg->vlistID1, arg->varID[t]);
int nmiss = 0;
#if defined(_OPENMP)
......@@ -96,8 +98,8 @@ void *ensstat_func(void *ensarg)
field[ompthID].nmiss = 0;
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
field[ompthID].ptr[fileID] = ef[fileID].array[i];
if ( lmiss && DBL_IS_EQUAL(field[ompthID].ptr[fileID], ef[fileID].missval) )
field[ompthID].ptr[fileID] = ef[fileID].array[t][i];
if ( lmiss && DBL_IS_EQUAL(field[ompthID].ptr[fileID], ef[fileID].missval[t]) )
{
field[ompthID].ptr[fileID] = missval;
field[ompthID].nmiss++;
......@@ -117,19 +119,250 @@ void *ensstat_func(void *ensarg)
if ( arg->count_data ) arg->count2[i] = nfiles - field[ompthID].nmiss;
}
streamDefRecord(arg->streamID2, arg->varID, arg->levelID);
streamDefRecord(arg->streamID2, arg->varID[t], arg->levelID[t]);
streamWriteRecord(arg->streamID2, arg->array2, nmiss);
if ( arg->count_data )
{
streamDefRecord(arg->streamID2, arg->varID+arg->nvars, arg->levelID);
streamDefRecord(arg->streamID2, arg->varID[t]+arg->nvars, arg->levelID[t]);
streamWriteRecord(arg->streamID2, arg->count2, 0);
}
return NULL;
}
//#define TEST_TASK
#ifdef TEST_TASK
void *Ensstat(void *argument)
{
void *task = CDO_task ? cdo_task_new() : NULL;
ensstat_arg_t ensstat_arg;
int nrecs0;
cdoInitialize(argument);
cdoOperatorAdd("ensmin", func_min, 0, NULL);
cdoOperatorAdd("ensmax", func_max, 0, NULL);
cdoOperatorAdd("enssum", func_sum, 0, NULL);
cdoOperatorAdd("ensmean", func_mean, 0, NULL);
cdoOperatorAdd("ensavg", func_avg, 0, NULL);
cdoOperatorAdd("ensstd", func_std, 0, NULL);
cdoOperatorAdd("ensstd1", func_std1, 0, NULL);
cdoOperatorAdd("ensvar", func_var, 0, NULL);
cdoOperatorAdd("ensvar1", func_var1, 0, NULL);
cdoOperatorAdd("enspctl", func_pctl, 0, NULL);
int operatorID = cdoOperatorID();
int operfunc = cdoOperatorF1(operatorID);
bool lpctl = operfunc == func_pctl;
int argc = operatorArgc();
int nargc = argc;
double pn = 0;
if ( operfunc == func_pctl )
{
operatorInputArg("percentile number");
pn = parameter2int(operatorArgv()[0]);
percentile_check_number(pn);
argc--;
}
bool count_data = false;
if ( argc == 1 )
{
if ( strcmp("count", operatorArgv()[nargc-1]) == 0 ) count_data = true;
else cdoAbort("Unknown parameter: >%s<", operatorArgv()[nargc-1]);
}
int nfiles = cdoStreamCnt() - 1;
if ( cdoVerbose ) cdoPrint("Ensemble over %d files.", nfiles);
const char *ofilename = cdoStreamName(nfiles)->args;
if ( !cdoOverwriteMode && fileExists(ofilename) && !userFileOverwrite(ofilename) )
cdoAbort("Outputfile %s already exists!", ofilename);
ens_file_t *ef = (ens_file_t *) Malloc(nfiles*sizeof(ens_file_t));
field_type *field = (field_type *) Malloc(ompNumThreads*sizeof(field_type));
for ( int i = 0; i < ompNumThreads; i++ )
{
field_init(&field[i]);
field[i].size = nfiles;
field[i].ptr = (double*) Malloc(nfiles*sizeof(double));
}
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
ef[fileID].streamID = streamOpenRead(cdoStreamName(fileID));
ef[fileID].vlistID = streamInqVlist(ef[fileID].streamID);
}
/* check that the contents is always the same */
for ( int fileID = 1; fileID < nfiles; fileID++ )
vlistCompare(ef[0].vlistID, ef[fileID].vlistID, CMP_ALL);
int vlistID1 = ef[0].vlistID;
int vlistID2 = vlistDuplicate(vlistID1);
int taxisID1 = vlistInqTaxis(vlistID1);
int taxisID2 = taxisDuplicate(taxisID1);
vlistDefTaxis(vlistID2, taxisID2);
int gridsizemax = vlistGridsizeMax(vlistID1);
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
ef[fileID].array[0] = (double*) Malloc(gridsizemax*sizeof(double));
ef[fileID].array[1] = task ? (double*) Malloc(gridsizemax*sizeof(double)) : NULL;
}
double *array2 = (double *) Malloc(gridsizemax*sizeof(double));
int nvars = vlistNvars(vlistID2);
double *count2 = NULL;
if ( count_data )
{
count2 = (double *) Malloc(gridsizemax*sizeof(double));
for ( int varID = 0; varID < nvars; ++varID )
{
char name[CDI_MAX_NAME];
vlistInqVarName(vlistID2, varID, name);
strcat(name, "_count");
int gridID = vlistInqVarGrid(vlistID2, varID);
int zaxisID = vlistInqVarZaxis(vlistID2, varID);
int tsteptype = vlistInqVarTsteptype(vlistID2, varID);
int cvarID = vlistDefVar(vlistID2, gridID, zaxisID, tsteptype);
vlistDefVarName(vlistID2, cvarID, name);
vlistDefVarDatatype(vlistID2, cvarID, CDI_DATATYPE_INT16);
if ( cvarID != (varID+nvars) ) cdoAbort("Internal error, varIDs do not match!");
}
}
int streamID2 = streamOpenWrite(cdoStreamName(nfiles), cdoFiletype());
streamDefVlist(streamID2, vlistID2);
ensstat_arg.vlistID1 = vlistID1;
ensstat_arg.streamID2 = streamID2;
ensstat_arg.nfiles = nfiles;
ensstat_arg.array2 = array2;
ensstat_arg.count2 = count2;
ensstat_arg.field = field;
ensstat_arg.operfunc = operfunc;
ensstat_arg.pn = pn;
ensstat_arg.lpctl = lpctl;
ensstat_arg.count_data = count_data;
ensstat_arg.nvars = nvars;
ensstat_arg.t = 0;
bool lwarning = false;
bool lerror = false;
int t = 0;
int tsID = 0;
do
{
nrecs0 = streamInqTimestep(ef[0].streamID, tsID);
for ( int fileID = 1; fileID < nfiles; fileID++ )
{
int streamID = ef[fileID].streamID;
int nrecs = streamInqTimestep(streamID, tsID);
if ( nrecs != nrecs0 )
{
if ( nrecs == 0 )
{
lwarning = true;
cdoWarning("Inconsistent ensemble file, too few time steps in %s!", cdoStreamName(fileID)->args);
}
else if ( nrecs0 == 0 )
{
lwarning = true;
cdoWarning("Inconsistent ensemble file, too few time steps in %s!", cdoStreamName(0)->args);
}
else
{
lerror = true;
cdoWarning("Inconsistent ensemble file, number of records at time step %d of %s and %s differ!",
tsID+1, cdoStreamName(0)->args, cdoStreamName(fileID)->args);
}
goto CLEANUP;
}
}
if ( nrecs0 > 0 )
{
taxisCopyTimestep(taxisID2, taxisID1);
streamDefTimestep(streamID2, tsID);
}
for ( int recID = 0; recID < nrecs0; recID++ )
{
int varID = 0, levelID;
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
streamInqRecord(ef[fileID].streamID, &varID, &levelID);
ef[fileID].missval[t] = vlistInqVarMissval(ef[fileID].vlistID, varID);
}
//#pragma omp parallel for default(none) shared(ef, nfiles)
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
streamReadRecord(ef[fileID].streamID, ef[fileID].array[t], &ef[fileID].nmiss[t]);
}
ensstat_arg.ef = ef;
ensstat_arg.varID[t] = varID;
ensstat_arg.levelID[t] = levelID;
if ( task )
{
cdo_task_start(task, ensstat_func, &ensstat_arg);
cdo_task_wait(task);
t = !t;
}
else
{
ensstat_func(&ensstat_arg);
}
}
tsID++;
}
while ( nrecs0 > 0 );
CLEANUP:
if ( lwarning ) cdoWarning("Inconsistent ensemble, processed only the first %d timesteps!", tsID);
if ( lerror ) cdoAbort("Inconsistent ensemble, processed only the first %d timesteps!", tsID);
if ( task ) cdo_task_wait(task);
for ( int fileID = 0; fileID < nfiles; fileID++ )
streamClose(ef[fileID].streamID);
streamClose(streamID2);
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
if ( ef[fileID].array[0] ) Free(ef[fileID].array[0]);
if ( ef[fileID].array[1] ) Free(ef[fileID].array[1]);
}
if ( ef ) Free(ef);
if ( array2 ) Free(array2);
if ( count2 ) Free(count2);
for ( int i = 0; i < ompNumThreads; i++ ) if ( field[i].ptr ) Free(field[i].ptr);
if ( field ) Free(field);
if ( task ) cdo_task_delete(task);
cdoFinish();
return 0;
}
#else
void *Ensstat(void *argument)
{
void *task = CDO_task ? cdo_task_new() : NULL;
......@@ -211,7 +444,10 @@ void *Ensstat(void *argument)
int gridsizemax = vlistGridsizeMax(vlistID1);
for ( int fileID = 0; fileID < nfiles; fileID++ )
ef[fileID].array = (double*) Malloc(gridsizemax*sizeof(double));
{
ef[fileID].array[0] = (double*) Malloc(gridsizemax*sizeof(double));
ef[fileID].array[1] = task ? (double*) Malloc(gridsizemax*sizeof(double)) : NULL;
}
double *array2 = (double *) Malloc(gridsizemax*sizeof(double));
......@@ -250,9 +486,11 @@ void *Ensstat(void *argument)
ensstat_arg.lpctl = lpctl;
ensstat_arg.count_data = count_data;
ensstat_arg.nvars = nvars;
ensstat_arg.t = 0;
bool lwarning = false;
bool lerror = false;
int t = 0;
int tsID = 0;
do
{
......@@ -296,17 +534,17 @@ void *Ensstat(void *argument)
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
streamInqRecord(ef[fileID].streamID, &varID, &levelID);
ef[fileID].missval = vlistInqVarMissval(ef[fileID].vlistID, varID);
ef[fileID].missval[t] = vlistInqVarMissval(ef[fileID].vlistID, varID);
}
//#pragma omp parallel for default(none) shared(ef, nfiles)
for ( int fileID = 0; fileID < nfiles; fileID++ )
{
streamReadRecord(ef[fileID].streamID, ef[fileID].array, &ef[fileID].nmiss);
streamReadRecord(ef[fileID].streamID, ef[fileID].array[t], &ef[fileID].nmiss[t]);
}
ensstat_arg.ef = ef;
ensstat_arg.varID = varID;
ensstat_arg.levelID = levelID;
ensstat_arg.varID[t] = varID;
ensstat_arg.levelID[t] = levelID;
if ( task )
{
cdo_task_start(task, ensstat_func, &ensstat_arg);
......@@ -333,7 +571,10 @@ void *Ensstat(void *argument)
streamClose(streamID2);
for ( int fileID = 0; fileID < nfiles; fileID++ )
if ( ef[fileID].array ) Free(ef[fileID].array);
{
if ( ef[fileID].array[0] ) Free(ef[fileID].array[0]);
if ( ef[fileID].array[1] ) Free(ef[fileID].array[1]);
}
if ( ef ) Free(ef);
if ( array2 ) Free(array2);
......@@ -348,3 +589,4 @@ void *Ensstat(void *argument)
return 0;
}
#endif
......@@ -130,9 +130,11 @@ void *cdo_task_wait(void *task)
{
while (1)
{
if ( IDLE == task_info->state ) break;
pthread_cond_wait(&(task_info->boss_cond), &(task_info->boss_mtx));
if ( IDLE == task_info->state ) break;
// if ( IDLE == task_info->state ) break;
}
}
#endif
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment