Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
mpim-sw
cdo
Commits
def526d5
Commit
def526d5
authored
Mar 21, 2016
by
Ralf Mueller
Browse files
[cdo-mapReduce] first working version, tested with lonlat, gme and icon grid
parent
c482567d
Changes
1
Hide whitespace changes
Inline
Side-by-side
src/MapReduce.c
View file @
def526d5
...
...
@@ -19,8 +19,7 @@
This module contains the following operators:
Pack reduce
Pack unreduce
*/
*/
#if defined(_OPENMP)
# include <omp.h>
...
...
@@ -31,9 +30,27 @@
#include
<cdi.h>
#include
"cdo.h"
#include
"cdo_int.h"
#include
"pstream.h"
#include
"grid.h"
void
read_first_record
(
char
*
filename
,
int
gridSize
,
double
*
field
)
{
int
nmiss
,
varID
,
levelID
;
double
*
missval1
,
*
missval2
;
int
streamID
=
streamOpenRead
(
filename
);
int
nrecs
=
streamInqTimestep
(
streamID
,
0
);
streamInqRecord
(
streamID
,
&
varID
,
&
levelID
);
streamReadRecord
(
streamID
,
field
,
&
nmiss
);
/* minmaxval(gridSize, field, NULL,&missval1, &missval2);
cdoPrint("min: %g | max: %g ",missval1, missval2);
*/
/*for (int l = 0; l < maskSize; l++) cdoPrint("maskIndexList[%d] = %d",l,maskIndexList[l]);
*/
streamClose
(
streamID
);
/* }}} */
}
#include
"pstream.h"
int
matrix2vector
(
int
i
,
int
j
,
int
Ni
,
int
Nj
)
{
return
i
*
Nj
+
j
;
...
...
@@ -76,7 +93,7 @@ void collectLocations(double *maskField, int gridSize, double falseVal, int *mas
k
+=
1
;
}
}
printf
(
" k =%d "
,
k
);
printf
(
" k =%d "
,
k
);
}
/*
...
...
@@ -88,7 +105,7 @@ void *MapReduce(void *argument)
int
gridsize
;
int
nrecs
;
int
tsID
;
int
gridID
,
varID
,
levelID
,
recID
;
int
varID
,
levelID
,
recID
;
int
i
;
int
nts
;
int
nalloc
=
0
;
...
...
@@ -97,8 +114,6 @@ void *MapReduce(void *argument)
int
datatype
=
DATATYPE_INT16
;
dtlist_type
*
dtlist
=
dtlist_new
();
double
missval1
,
missval2
;
double
*
arrayIn
=
NULL
;
field_t
***
vars
=
NULL
;
cdoInitialize
(
argument
);
...
...
@@ -111,35 +126,28 @@ void *MapReduce(void *argument)
/* search the number of relevant locations */
tsID
=
0
;
nrecs
=
0
;
double
*
inputMaskField
=
(
double
*
)
Malloc
(
inputGridSize
*
sizeof
(
double
));
int
maskStreamID
=
streamOpenRead
(
file_argument_new
(
operatorArgv
()[
0
]));
streamReadRecord
(
maskStreamID
,
inputMaskField
,
&
nmiss
);
minmaxval
(
inputGridSize
,
inputMaskField
,
NULL
,
&
missval1
,
&
missval2
);
cdoPrint
(
"min: %g | max: %g "
,
missval1
,
missval2
);
read_first_record
(
operatorArgv
()[
0
],
inputGridSize
,
inputMaskField
);
/* count points {{{*/
int
maskSize
=
countMask
(
inputMaskField
,
inputGridSize
,
0
.
0
);
cdoPrint
(
"maskSize = %d"
,
maskSize
);
/* }}} */
/* collect the original coordinates */
int
*
maskIndexList
=
(
int
*
)
Malloc
(
maskSize
*
sizeof
(
int
));
for
(
int
m
=
0
;
m
<
maskSize
;
m
++
)
maskIndexList
[
m
]
=
-
1
;
/* create an index list of relevant points {{{ */
int
k
=
0
;
for
(
int
i
=
0
;
i
<
inputGridSize
;
i
++
)
{
if
(
!
DBL_IS_EQUAL
(
inputMaskField
[
i
],
0
.
0
))
{
if
(
cdoDebug
)
printf
(
"found at:%d -"
,
i
);
maskIndexList
[
k
]
=
i
;
k
+=
1
;
}
}
int
*
maskIndexList
=
(
int
*
)
Malloc
(
maskSize
*
sizeof
(
int
));
for
(
int
m
=
0
;
m
<
maskSize
;
m
++
)
maskIndexList
[
m
]
=
-
1
;
/* create an index list of relevant points {{{ */
int
k
=
0
;
for
(
int
i
=
0
;
i
<
inputGridSize
;
i
++
)
{
if
(
!
DBL_IS_EQUAL
(
inputMaskField
[
i
],
0
.
0
))
{
if
(
cdoDebug
)
printf
(
"found at:%d -"
,
i
);
maskIndexList
[
k
]
=
i
;
k
+=
1
;
}
}
/*for (int l = 0; l < maskSize; l++) cdoPrint("maskIndexList[%d] = %d",l,maskIndexList[l]);
*/
streamClose
(
maskStreamID
);
/* }}} */
/* create unstructured output grid */
int
outputGridID
=
gridToUnstructuredSelecton
(
inputGridID
,
TRUE
,
maskSize
,
maskIndexList
);
...
...
@@ -148,63 +156,69 @@ void *MapReduce(void *argument)
/* copy time axis */
int
streamID1
=
streamOpenRead
(
cdoStreamName
(
0
));
int
vlistID1
=
streamInqVlist
(
streamID1
);
int
nvars
=
vlistNvars
(
vlistID1
);
int
*
vars
=
(
int
*
)
Malloc
(
nvars
*
sizeof
(
int
));
int
taxisID1
=
vlistInqTaxis
(
vlistID1
);
int
vlistID2
=
vlistDuplicate
(
vlistID1
);
vlistClearFlag
(
vlistID1
);
for
(
varID
=
0
;
varID
<
nvars
;
varID
++
)
{
vars
[
varID
]
=
FALSE
;
int
gridID
=
vlistInqVarGrid
(
vlistID1
,
varID
);
if
(
inputGridType
==
gridInqType
(
gridID
)
&&
inputGridSize
==
gridInqSize
(
gridID
))
{
vars
[
varID
]
=
TRUE
;
int
zaxisID
=
vlistInqVarZaxis
(
vlistID1
,
varID
);
int
nlevs
=
zaxisInqSize
(
zaxisID
);
for
(
int
levID
=
0
;
levID
<
nlevs
;
levID
++
)
{
vlistDefFlag
(
vlistID1
,
varID
,
levID
,
TRUE
);
}
}
}
int
vlistID2
=
vlistCreate
();
vlistCopyFlag
(
vlistID2
,
vlistID1
);
int
taxisID2
=
taxisDuplicate
(
taxisID1
);
vlistDefTaxis
(
vlistID2
,
taxisID2
);
/* copy the mask to target grid */
int
ngrids
=
vlistNgrids
(
vlistID
1
);
int
ngrids
=
vlistNgrids
(
vlistID
2
);
for
(
int
index
=
0
;
index
<
ngrids
;
index
++
)
vlistChangeGridIndex
(
vlistID2
,
index
,
outputGridID
);
/* {{{
varID = vlistDefVar(vlistID2, outputGridID, zaxisID, tsteptype);
vlistDefVarName(vlistID2 , varID , "mask");
vlistDefVarStdname(vlistID2 , varID , "grid_mask");
vlistDefVarLongname(vlistID2, varID , "mask");
vlistDefVarUnits(vlistID2 , varID , "");
double *values = (double *)Malloc(maskSize*sizeof(double));
for (int i = 0; i < maskSize; i++) values[i] = 1.0;
streamDefRecord(streamID2, 0, 0);
streamWriteRecord(streamID2, values, 0);
}}} */
int
streamID2
=
streamOpenWrite
(
cdoStreamName
(
1
),
cdoFiletype
());
streamDefVlist
(
streamID2
,
vlistID2
);
/* loop over all data fields */
double
*
arrayIn
=
(
double
*
)
Malloc
(
inputGridSize
*
sizeof
(
double
));
double
*
arrayOut
=
(
double
*
)
Malloc
(
maskSize
*
sizeof
(
double
));
tsID
=
0
;
int
tsID2
=
0
;
while
(
(
nrecs
=
streamInqTimestep
(
streamID1
,
tsID
))
)
{
taxisCopyTimestep
(
taxisID2
,
taxisID1
);
streamDefTimestep
(
streamID2
,
tsID
2
);
streamDefTimestep
(
streamID2
,
tsID
);
for
(
recID
=
0
;
recID
<
nrecs
;
recID
++
)
{
streamInqRecord
(
streamID1
,
&
varID
,
&
levelID
);
cdoPrint
(
"aaaaaa"
);
streamReadRecord
(
streamID1
,
arrayIn
,
&
nmiss
);
cdoPrint
(
"bbbbbb"
);
if
(
TRUE
==
vars
[
varID
])
{
int
varID2
=
vlistFindVar
(
vlistID2
,
varID
);
int
levelID2
=
vlistFindLevel
(
vlistID2
,
varID
,
levelID
);
gridID
=
vlistInqVarGrid
(
vlistID1
,
varID
);
cdoPrint
(
"aaaaaa"
);
streamReadRecord
(
streamID1
,
arrayIn
,
&
nmiss
);
cdoPrint
(
"bbbbbb"
);
/* skip if size of type do not match with mask grid */
if
(
inputGridType
==
gridInqType
(
gridID
)
&&
inputGridSize
==
gridInqSize
(
gridID
))
{
for
(
int
i
=
0
;
i
<
maskSize
;
i
++
)
arrayOut
[
i
]
=
arrayIn
[
maskIndexList
[
i
]];
minmaxval
(
maskSize
,
arrayOut
,
NULL
,
&
missval1
,
&
missval2
);
cdoPrint
(
"min: %g | max: %g "
,
missval1
,
missval2
);
streamDefRecord
(
streamID2
,
varID
,
levelID
);
streamDefRecord
(
streamID2
,
varID2
,
levelID2
);
streamWriteRecord
(
streamID2
,
arrayOut
,
0
);
}
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment