Commit 16a19288 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

added option -p for parallel read

parent 2c6f82f9
......@@ -96,10 +96,12 @@ void *cdoReadTimestep(void *rarg)
for ( int recID = 0; recID < nrecs; ++recID )
{
streamInqRecord(streamID, &varID, &levelID);
if ( CDO_Memtype == MEMTYPE_FLOAT )
streamReadRecordF(streamID, input_vars[varID][levelID].ptr2, &nmiss);
else
streamReadRecord(streamID, input_vars[varID][levelID].ptr2, &nmiss);
input_vars[varID][levelID].nmiss2 = nmiss;
}
......@@ -111,13 +113,22 @@ void *cdoReadTimestep(void *rarg)
static
void cdoUpdateVars(int nvars, int vlistID, field_t **vars)
{
void *tmp = NULL;
for ( int varID = 0; varID < nvars; varID++ )
{
int nlevels = zaxisInqSize(vlistInqVarZaxis(vlistID, varID));
for ( int levelID = 0; levelID < nlevels; levelID++ )
{
double *tmp = vars[varID][levelID].ptr;
vars[varID][levelID].ptr = vars[varID][levelID].ptr2;
if ( CDO_Memtype == MEMTYPE_FLOAT )
{
tmp = vars[varID][levelID].ptrf;
vars[varID][levelID].ptrf = vars[varID][levelID].ptr2;
}
else
{
tmp = vars[varID][levelID].ptr;
vars[varID][levelID].ptr = vars[varID][levelID].ptr2;
}
vars[varID][levelID].ptr2 = tmp;
vars[varID][levelID].nmiss = vars[varID][levelID].nmiss2;
}
......@@ -259,7 +270,9 @@ void *XTimstat(void *argument)
gridsize = vlistGridsizeMax(vlistID1);
if ( vlistNumber(vlistID1) != CDI_REAL ) gridsize *= 2;
field_t **input_vars = field_malloc(vlistID1, FIELD_PTR | FIELD_PTR2);
int FIELD_MEMTYPE = 0;
if ( CDO_Memtype == MEMTYPE_FLOAT ) FIELD_MEMTYPE = FIELD_FLT;
field_t **input_vars = field_malloc(vlistID1, FIELD_PTR | FIELD_PTR2 | FIELD_MEMTYPE);
field_t **vars1 = field_malloc(vlistID1, FIELD_PTR);
field_t **samp1 = field_malloc(vlistID1, FIELD_NONE);
field_t **vars2 = NULL;
......@@ -269,7 +282,7 @@ void *XTimstat(void *argument)
readarg.streamID = streamID1;
readarg.vars = input_vars;
int lparallelread = TRUE;
int lparallelread = CDO_Parallel_Read;
int ltsfirst = TRUE;
void *read_task = NULL;
void *readresult = NULL;
......@@ -308,17 +321,8 @@ void *XTimstat(void *argument)
if ( ltsfirst || lparallelread == FALSE )
{
ltsfirst = FALSE;
if ( lparallelread )
{
cdo_task_start(read_task, cdoReadTimestep, &readarg);
readresult = cdo_task_wait(read_task);
}
else
{
readresult = cdoReadTimestep(&readarg);
}
}
readresult = cdoReadTimestep(&readarg);
}
else
{
readresult = cdo_task_wait(read_task);
......
......@@ -1057,7 +1057,7 @@ int parse_options_long(int argc, char *argv[])
lgridsearchradius = 0;
lremap_genweights = 0;
c = cdo_getopt_long(argc, argv, "f:b:e:P:p:g:i:k:l:m:n:t:D:z:aBCcdhLMOQRrsSTuVvWXZ", opt_long, NULL);
c = cdo_getopt_long(argc, argv, "f:b:e:P:g:i:k:l:m:n:t:D:z:aBCcdhLMOpQRrsSTuVvWXZ", opt_long, NULL);
if ( c == -1 ) break;
switch (c)
......@@ -1191,8 +1191,7 @@ int parse_options_long(int argc, char *argv[])
numThreads = atoi(CDO_optarg);
break;
case 'p':
fprintf(stderr, "CDO option -p is not available anymore, please use -b <bits>!\n");
//setDefaultDataTypeByte(CDO_optarg);
CDO_Parallel_Read = TRUE;
break;
case 'Q':
cdiDefGlobal("SORTNAME", TRUE);
......
......@@ -25,7 +25,8 @@ enum field_flag {
FIELD_PTR = 2,
FIELD_WGT = 4,
FIELD_PTR2 = 8,
FIELD_ALL = FIELD_PTR | FIELD_WGT
FIELD_FLT = 16,
FIELD_ALL = FIELD_PTR | FIELD_WGT
};
......@@ -61,10 +62,12 @@ typedef struct {
int nsamp;
int nmiss;
int nmiss2;
int memtype;
double missval;
double *weight;
double *ptr;
double *ptr2;
float *ptrf;
void *ptr2;
}
field_t;
......
......@@ -53,6 +53,7 @@ void farcpy(field_t *field1, field_t field2)
const int nmiss2 = field2.nmiss;
const double missval2 = field2.missval;
double *array2 = field2.ptr;
float *array2f = field2.ptrf;
if ( nwpv != 2 ) nwpv = 1;
......@@ -61,8 +62,10 @@ void farcpy(field_t *field1, field_t field2)
if ( len != (size_t) (nwpv*gridInqSize(grid2)) )
cdoAbort("Fields have different gridsize (%s)", __func__);
for ( i = 0; i < len; i++ )
array1[i] = array2[i];
if ( field2.memtype == MEMTYPE_FLOAT )
for ( i = 0; i < len; i++ ) array1[i] = array2f[i];
else
for ( i = 0; i < len; i++ ) array1[i] = array2[i];
}
static
......@@ -155,6 +158,7 @@ void farsum(field_t *field1, field_t field2)
const int nmiss2 = field2.nmiss;
const double missval2 = field2.missval;
double *array2 = field2.ptr;
float *array2f = field2.ptrf;
if ( nwpv != 2 ) nwpv = 1;
......@@ -180,7 +184,10 @@ void farsum(field_t *field1, field_t field2)
}
else
{
arradd(len, array1, array2);
if ( field2.memtype == MEMTYPE_FLOAT )
for ( size_t i = 0; i < len; i++ ) array1[i] += array2f[i];
else
arradd(len, array1, array2);
}
}
......
......@@ -3,6 +3,7 @@
#include <cdi.h>
#include <cdo.h>
#include <cdo_int.h>
#include "dmemory.h"
#include "field.h"
#include "util.h"
......@@ -40,6 +41,7 @@ field_t **field_allocate(int vlistID, int ptype, int init)
field[varID][levelID].nsamp = 0;
field[varID][levelID].nmiss = 0;
field[varID][levelID].nmiss2 = 0;
if ( ptype & FIELD_FLT ) field[varID][levelID].memtype = MEMTYPE_FLOAT;
field[varID][levelID].missval = missval;
field[varID][levelID].ptr = NULL;
field[varID][levelID].ptr2 = NULL;
......@@ -47,15 +49,31 @@ field_t **field_allocate(int vlistID, int ptype, int init)
if ( ptype & FIELD_PTR )
{
field[varID][levelID].ptr = (double*) Malloc(nwpv*gridsize*sizeof(double));
if ( init ) memset(field[varID][levelID].ptr, 0, nwpv*gridsize*sizeof(double));
}
if ( ptype & FIELD_FLT )
{
field[varID][levelID].ptrf = (float*) Malloc(nwpv*gridsize*sizeof(float));
if ( init ) memset(field[varID][levelID].ptrf, 0, nwpv*gridsize*sizeof(float));
}
else
{
field[varID][levelID].ptr = (double*) Malloc(nwpv*gridsize*sizeof(double));
if ( init ) memset(field[varID][levelID].ptr, 0, nwpv*gridsize*sizeof(double));
}
}
if ( ptype & FIELD_PTR2 )
{
field[varID][levelID].ptr2 = (double*) Malloc(nwpv*gridsize*sizeof(double));
if ( init ) memset(field[varID][levelID].ptr2, 0, nwpv*gridsize*sizeof(double));
}
if ( ptype & FIELD_FLT )
{
field[varID][levelID].ptr2 = Malloc(nwpv*gridsize*sizeof(float));
if ( init ) memset(field[varID][levelID].ptr2, 0, nwpv*gridsize*sizeof(float));
}
else
{
field[varID][levelID].ptr2 = Malloc(nwpv*gridsize*sizeof(double));
if ( init ) memset(field[varID][levelID].ptr2, 0, nwpv*gridsize*sizeof(double));
}
}
if ( ptype & FIELD_WGT )
{
......@@ -90,6 +108,7 @@ void field_free(field_t **field, int vlistID)
for ( int levelID = 0; levelID < nlevel; ++levelID )
{
if ( field[varID][levelID].ptr ) Free(field[varID][levelID].ptr);
if ( field[varID][levelID].ptrf ) Free(field[varID][levelID].ptrf);
if ( field[varID][levelID].ptr2 ) Free(field[varID][levelID].ptr2);
if ( field[varID][levelID].weight ) Free(field[varID][levelID].weight);
}
......
......@@ -86,6 +86,7 @@ int CDO_CMOR_Mode = FALSE;
int cdoDiag = FALSE;
int CDO_Memtype = MEMTYPE_DOUBLE;
int CDO_Parallel_Read = FALSE;
int CDO_Reduce_Dim = FALSE;
int CDO_Append_History = TRUE;
......
......@@ -46,6 +46,7 @@ extern char *Progname;
extern char *cdoGridSearchDir;
extern int CDO_Reduce_Dim;
extern int CDO_Memtype;
extern int CDO_Parallel_Read;
extern int CDO_Append_History;
extern int CDO_Reset_History;
extern int timer_read, timer_write; // refactor: both pstream.c and CDIread.c CDIwrite.c defined in cdo.c
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment