Commit 302b63a1 authored by Uwe Schulzweida's avatar Uwe Schulzweida

No commit message

No commit message
parent 2ec08b61
......@@ -406,6 +406,8 @@ src/normal.c -text
src/nth_element.c -text
src/nth_element.h -text
src/operator_help.h -text
src/par_io.c -text
src/par_io.h -text
src/percentiles.c -text
src/percentiles.h -text
src/pipe.c -text
......
......@@ -2,7 +2,7 @@
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2008 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
Copyright (C) 2003-2009 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
......@@ -27,6 +27,7 @@
#include "cdi.h"
#include "cdo.h"
#include "cdo_int.h"
#include "par_io.h"
#include "pstream.h"
void vlistDefVarTime(int vlistID, int varID, int timeID);
......@@ -47,6 +48,7 @@ void *Copy(void *argument)
int taxisID1, taxisID2 = CDI_UNDEFID;
int ntsteps;
double *array = NULL;
par_io_t parIO;
cdoInitialize(argument);
......@@ -93,6 +95,11 @@ void *Copy(void *argument)
gridsize = vlistGridsizeMax(vlistID1);
array = (double *) malloc(gridsize*sizeof(double));
if ( cdoParIO )
{
parIO.array = (double *) malloc(gridsize*sizeof(double));
parIO.array_size = gridsize;
}
}
else
{
......@@ -108,17 +115,28 @@ void *Copy(void *argument)
streamDefTimestep(streamID2, tsID2);
for ( recID = 0; recID < nrecs; recID++ )
{
streamInqRecord(streamID1, &varID, &levelID);
streamDefRecord(streamID2, varID, levelID);
{
if ( lcopy && operatorID == SELALL )
{
streamInqRecord(streamID1, &varID, &levelID);
streamDefRecord(streamID2, varID, levelID);
streamCopyRecord(streamID2, streamID1);
}
else
{
streamReadRecord(streamID1, array, &nmiss);
if ( cdoParIO )
{
parIO.recID = recID; parIO.nrecs = nrecs;
parReadRecord(streamID1, &varID, &levelID, array, &nmiss, &parIO);
fprintf(stderr, "2 streamID %d varID %d levelID %d\n", streamID1, varID, levelID);
}
else
{
streamInqRecord(streamID1, &varID, &levelID);
streamReadRecord(streamID1, array, &nmiss);
}
streamDefRecord(streamID2, varID, levelID);
streamWriteRecord(streamID2, array, nmiss);
}
}
......
......@@ -187,6 +187,8 @@ cdo_SOURCES = Arith.c \
julian.c \
vinterp.c \
zaxis.c \
par_io.c \
par_io.h \
pthread_debug.c \
pthread_debug.h \
color.c \
......
......@@ -107,10 +107,11 @@ am_cdo_OBJECTS = Arith.$(OBJEXT) Arithc.$(OBJEXT) Arithdays.$(OBJEXT) \
table.$(OBJEXT) userlog.$(OBJEXT) util.$(OBJEXT) \
legendre.$(OBJEXT) fourier.$(OBJEXT) specspace.$(OBJEXT) \
readline.$(OBJEXT) julian.$(OBJEXT) vinterp.$(OBJEXT) \
zaxis.$(OBJEXT) pthread_debug.$(OBJEXT) color.$(OBJEXT) \
list.$(OBJEXT) percentiles.$(OBJEXT) nth_element.$(OBJEXT) \
ecacore.$(OBJEXT) ecautil.$(OBJEXT) EcaIndices.$(OBJEXT) \
Hi.$(OBJEXT) Wct.$(OBJEXT) statistic.$(OBJEXT)
zaxis.$(OBJEXT) par_io.$(OBJEXT) pthread_debug.$(OBJEXT) \
color.$(OBJEXT) list.$(OBJEXT) percentiles.$(OBJEXT) \
nth_element.$(OBJEXT) ecacore.$(OBJEXT) ecautil.$(OBJEXT) \
EcaIndices.$(OBJEXT) Hi.$(OBJEXT) Wct.$(OBJEXT) \
statistic.$(OBJEXT)
cdo_OBJECTS = $(am_cdo_OBJECTS)
cdo_LDADD = $(LDADD)
am_cdotest_OBJECTS = cdilib.$(OBJEXT) cdotest.$(OBJEXT)
......@@ -411,6 +412,8 @@ cdo_SOURCES = Arith.c \
julian.c \
vinterp.c \
zaxis.c \
par_io.c \
par_io.h \
pthread_debug.c \
pthread_debug.h \
color.c \
......@@ -730,6 +733,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/namelist.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/normal.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/nth_element.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/par_io.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/percentiles.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pipe.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/process.Po@am__quote@
......
......@@ -2,7 +2,7 @@
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2006 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
Copyright (C) 2003-2009 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
......
......@@ -88,6 +88,7 @@ int cdoVerbose = FALSE;
int cdoDebug = 0;
int cdoCompress = FALSE;
int cdoInteractive = FALSE;
int cdoParIO = FALSE;
int cdoExpMode = -1;
char *cdoExpName = NULL;
......@@ -607,7 +608,7 @@ int main(int argc, char *argv[])
cdoHaveNC4 = have_netCDF4();
while ( (c = cdoGetopt(argc, argv, "f:b:e:P:p:g:i:l:m:t:D:z:aBdhRrsTuVvZ")) != -1 )
while ( (c = cdoGetopt(argc, argv, "f:b:e:P:p:g:i:l:m:t:D:z:aBdhMRrsTuVvZ")) != -1 )
{
switch (c)
{
......@@ -662,6 +663,9 @@ int main(int argc, char *argv[])
case 'm':
cdiDefMissval(atof(cdoOptarg));
break;
case 'M': /* multi threaded I/O */
cdoParIO = TRUE;
break;
case 'P':
numThreads = atoi(cdoOptarg);
break;
......
......@@ -2,7 +2,7 @@
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2006 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
Copyright (C) 2003-2009 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
......@@ -59,6 +59,7 @@ extern int cdoVerbose;
extern int cdoDebug;
extern int cdoCompress;
extern int cdoInteractive;
extern int cdoParIO;
extern int cdoZtype;
extern int cdoZlevel;
......
......@@ -2,7 +2,7 @@
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2008 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
Copyright (C) 2003-2009 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
......
......@@ -2,7 +2,7 @@
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2008 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
Copyright (C) 2003-2009 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
......
#if defined (HAVE_CONFIG_H)
# include "config.h"
#endif
#if defined (HAVE_LIBPTHREAD)
# include <pthread.h>
#endif
#include <string.h> /* memcpy */
#include "cdo.h"
#include "par_io.h"
#include "pstream.h"
typedef struct {
int streamID;
int *varID, *levelID, *nmiss;
double *array;
}
READ_ARG;
void *readRecord(void *arg)
{
int streamID;
int *varID, *levelID, *nmiss;
double *array;
READ_ARG *read_arg = (READ_ARG *) arg;
streamID = read_arg->streamID;
varID = read_arg->varID;
levelID = read_arg->levelID;
nmiss = read_arg->nmiss;
array = read_arg->array;
streamInqRecord(streamID, varID, levelID);
streamReadRecord(streamID, array, nmiss);
fprintf(stderr, "1 varID %d levelID %d\n", *varID, *levelID);
return (NULL);
}
void parReadRecord(int streamID, int *varID, int *levelID, double *array, int *nmiss, par_io_t *parIO)
{
int lpario = FALSE;
READ_ARG read_arg;
void *statusp;
int recID = 0, nrecs = 0;
#if defined (HAVE_LIBPTHREAD)
pthread_t thrID;
/* pthread_attr_t attr; */
int rval;
#endif
read_arg.streamID = streamID;
read_arg.varID = varID;
read_arg.levelID = levelID;
read_arg.nmiss = nmiss;
read_arg.array = array;
#if defined (HAVE_LIBPTHREAD)
if ( parIO )
{
lpario = TRUE;
recID = parIO->recID;
nrecs = parIO->nrecs;
thrID = parIO->thrID;
}
#endif
if ( recID == 0 || lpario == FALSE )
{
statusp = readRecord(&read_arg);
}
#if defined (HAVE_LIBPTHREAD)
else
{
fprintf(stderr, "parIO: %ld streamID %d %d %d\n", (long)thrID, streamID, recID, nrecs);
rval = pthread_join(thrID, &statusp);
if ( rval != 0 ) cdoAbort("pthread_join failed!");
/*
if ( *(int *)statusp < 0 )
cdoAbort("pthread_join failed! (status = %d)", *(int *)statusp);
*/
*varID = parIO->varID;
*levelID = parIO->levelID;
*nmiss = parIO->nmiss;
memcpy(array, parIO->array, parIO->array_size*sizeof(double));
}
if ( lpario && nrecs > 1 )
{
if ( (recID+1) < nrecs )
{
if ( recID == 0 )
{
pthread_attr_init(&parIO->attr);
pthread_attr_setdetachstate(&parIO->attr, PTHREAD_CREATE_JOINABLE);
}
read_arg.streamID = streamID;
read_arg.varID = &parIO->varID;
read_arg.levelID = &parIO->levelID;
read_arg.nmiss = &parIO->nmiss;
read_arg.array = parIO->array;
rval = pthread_create(&thrID, &parIO->attr, readRecord, &read_arg);
if ( rval != 0 ) cdoAbort("pthread_create failed!");
fprintf(stderr, "thrID = %ld\n", (long) thrID);
parIO->thrID = thrID;
}
else
pthread_attr_destroy(&parIO->attr);
}
#endif
}
#ifndef _PAR_IO_H
#define _PAR_IO_H
#if defined (HAVE_CONFIG_H)
# include "config.h"
#endif
#if defined (HAVE_LIBPTHREAD)
# include <pthread.h>
#endif
typedef struct {
int varID, levelID, nmiss;
double *array;
int array_size;
int recID, nrecs;
#if defined (HAVE_LIBPTHREAD)
pthread_t thrID;
pthread_attr_t attr;
#endif
}
par_io_t;
void parReadRecord(int streamID, int *varID, int *levelID, double *array, int *nmiss, par_io_t *parIO);
#endif /* _PAR_IO_H */
......@@ -2,7 +2,7 @@
This file is part of CDO. CDO is a collection of Operators to
manipulate and analyse Climate model Data.
Copyright (C) 2003-2006 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
Copyright (C) 2003-2009 Uwe Schulzweida, Uwe.Schulzweida@zmaw.de
See COPYING file for copying and redistribution conditions.
This program is free software; you can redistribute it and/or modify
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment