Commit a157f87a authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

Added function streamDefNumWorker().

parent be965313
#if defined (HAVE_CONFIG_H)
# include "../src/config.h"
#ifdef HAVE_CONFIG_H
#include "../src/config.h"
#endif
#include <inttypes.h>
......@@ -50,7 +50,7 @@ static const uint32_t HOST_ENDIANNESS_temp[1] = { UINT32_C(0x00030201) };
extern "C" {
#endif
void cdiDefTableID(int tableID);
#if defined (__cplusplus)
#ifdef __cplusplus
}
#endif
......@@ -79,10 +79,10 @@ void version(void)
const char *typenames[] = { "srv", "ext", "ieg", "grb", "grb2", "nc", "nc2", "nc4", "nc4c", "nc5"};
fprintf(stderr, "CDI version 1.9\n");
#if defined (COMPILER)
#ifdef COMPILER
fprintf(stderr, "C Compiler: %s\n", COMPILER);
#endif
#if defined (COMP_VERSION)
#ifdef COMP_VERSION
fprintf(stderr, "C version: %s\n", COMP_VERSION);
#endif
#ifdef SYSTEM_TYPE
......@@ -90,40 +90,40 @@ void version(void)
#endif
fprintf(stderr, "Features:");
#if defined(HAVE_CF_INTERFACE)
#ifdef HAVE_CF_INTERFACE
fprintf(stderr, " Fortran");
#endif
#if defined (HAVE_LIBPTHREAD)
#ifdef HAVE_LIBPTHREAD
fprintf(stderr, " PTHREADS");
#endif
#if defined (_OPENMP)
#ifdef _OPENMP
fprintf(stderr, " OpenMP");
#endif
#if defined(HAVE_NETCDF4)
#ifdef HAVE_NETCDF4
fprintf(stderr, " NC4");
#if defined(HAVE_NC4HDF5)
#ifdef HAVE_NC4HDF5
fprintf(stderr, "/HDF5");
#if defined(HAVE_NC4HDF5_THREADSAFE)
#ifdef HAVE_NC4HDF5_THREADSAFE
fprintf(stderr, "/threadsafe");
#endif
#endif
#endif
#if defined (HAVE_LIBNC_DAP)
#ifdef HAVE_LIBNC_DAP
fprintf(stderr, " OPeNDAP");
#endif
#if defined (HAVE_LIBSZ)
#ifdef HAVE_LIBSZ
fprintf(stderr, " SZ");
#endif
#if defined (HAVE_LIBJASPER)
#ifdef HAVE_LIBJASPER
fprintf(stderr, " JASPER");
#endif
#if defined (HAVE_LIBPROJ)
#ifdef HAVE_LIBPROJ
fprintf(stderr, " PROJ.4");
#endif
#if defined (HAVE_LIBDRMAA)
#ifdef HAVE_LIBDRMAA
fprintf(stderr, " DRMAA");
#endif
#if defined (HAVE_LIBCURL)
#ifdef HAVE_LIBCURL
fprintf(stderr, " CURL");
#endif
fprintf(stderr, "\n");
......@@ -153,6 +153,7 @@ void version(void)
1.7.0 11 Apr 2008 : added option -z zip for deflate compression of netCDF4 variables
1.7.1 1 Nov 2009 : added option -z jpeg for JPEG compression of GRIB2 records
1.7.2 14 Nov 2012 : added optional compression level -z zip[_1-9]
1.9.0 29 May 2019 : added option -i to set number of input worker
*/
}
......@@ -160,7 +161,6 @@ static
void usage(void)
{
const char *name;
int id;
fprintf(stderr, "usage : %s [Option] [ifile] [ofile]\n", Progname);
......@@ -171,7 +171,7 @@ void usage(void)
fprintf(stderr, " -s give short information if ofile is missing\n");
fprintf(stderr, " -t <table> Parameter table name/file\n");
fprintf(stderr, " Predefined tables: ");
for ( id = 0; id < tableInqNumber(); id++ )
for ( int id = 0; id < tableInqNumber(); id++ )
if ( (name = tableInqNamePtr(id)) )
fprintf(stderr, " %s", name);
fprintf(stderr, "\n");
......@@ -286,6 +286,7 @@ void printInfo(int64_t vdate, int vtime, char *varname, double level,
static const char *tunit2str(int tunits)
{
// clang-format off
if ( tunits == TUNIT_YEAR ) return ("years");
else if ( tunits == TUNIT_MONTH ) return ("months");
else if ( tunits == TUNIT_DAY ) return ("days");
......@@ -298,24 +299,27 @@ static const char *tunit2str(int tunits)
else if ( tunits == TUNIT_MINUTE ) return ("minutes");
else if ( tunits == TUNIT_SECOND ) return ("seconds");
else return ("unknown");
// clang-format on
}
static const char *calendar2str(int calendar)
{
// clang-format off
if ( calendar == CALENDAR_STANDARD ) return ("standard");
else if ( calendar == CALENDAR_PROLEPTIC ) return ("proleptic_gregorian");
else if ( calendar == CALENDAR_360DAYS ) return ("360_day");
else if ( calendar == CALENDAR_365DAYS ) return ("365_day");
else if ( calendar == CALENDAR_366DAYS ) return ("366_day");
else return ("unknown");
// clang-format on
}
static
void limit_string_length(char* string, size_t maxlen)
{
string[maxlen-1] = 0;
size_t len = strlen(string);
const size_t len = strlen(string);
if ( len > 10 )
{
......@@ -393,6 +397,7 @@ void printShortinfo(int streamID, int vlistID, int vardis)
else if ( tsteptype == TSTEP_ACCUM ) fprintf(stdout, "%-8s ", "accum");
else if ( tsteptype == TSTEP_RANGE ) fprintf(stdout, "%-8s ", "range");
else if ( tsteptype == TSTEP_DIFF ) fprintf(stdout, "%-8s ", "diff");
else if ( tsteptype == TSTEP_SUM ) fprintf(stdout, "%-8s ", "sum");
else fprintf(stdout, "%-8s ", "unknown");
if ( nsubtypes > 0 )
......@@ -523,34 +528,20 @@ void setDefaultDataType(char *datatypestr)
enum {D_UINT, D_INT, D_FLT, D_CPX};
int dtype = -1;
if ( *datatypestr == 'i' || *datatypestr == 'I' )
{
dtype = D_INT;
datatypestr++;
}
else if ( *datatypestr == 'u' || *datatypestr == 'U' )
{
dtype = D_UINT;
datatypestr++;
}
else if ( *datatypestr == 'f' || *datatypestr == 'F' )
{
dtype = D_FLT;
datatypestr++;
}
else if ( *datatypestr == 'c' || *datatypestr == 'C' )
{
dtype = D_CPX;
datatypestr++;
}
const int datatype = tolower(*datatypestr);
// clang-format off
if (datatype == 'i') { dtype = D_INT; datatypestr++; }
else if (datatype == 'u') { dtype = D_UINT; datatypestr++; }
else if (datatype == 'f') { dtype = D_FLT; datatypestr++; }
else if (datatype == 'c') { dtype = D_CPX; datatypestr++; }
else if (datatype == 'p') { datatypestr++; }
// clang-format on
if ( isdigit((int) *datatypestr) )
{
nbits = atoi(datatypestr);
if ( nbits < 10 )
datatypestr += 1;
else
datatypestr += 2;
datatypestr += 1;
if (nbits >= 10) datatypestr += 1;
if ( dtype == -1 )
{
......@@ -644,6 +635,7 @@ void setDefaultFileType(char *filetypestr)
{
char *ftstr = filetypestr;
// clang-format off
if ( memcmp(filetypestr, "grb2", 4) == 0 ) { ftstr += 4; DefaultFileType = CDI_FILETYPE_GRB2;}
else if ( memcmp(filetypestr, "grb1", 4) == 0 ) { ftstr += 4; DefaultFileType = CDI_FILETYPE_GRB; }
else if ( memcmp(filetypestr, "grb", 3) == 0 ) { ftstr += 3; DefaultFileType = CDI_FILETYPE_GRB; }
......@@ -662,6 +654,7 @@ void setDefaultFileType(char *filetypestr)
fprintf(stderr, "Available filetypes: grb, grb2, nc, nc2, nc4, nc4c, nc5, srv, ext and ieg\n");
exit(EXIT_FAILURE);
}
// clang-format on
if ( DefaultFileType != CDI_UNDEFID && *ftstr != 0 )
{
......@@ -704,7 +697,7 @@ int handle_error(int cdiErrno, const char *fmt, ...)
static
void defineCompress(const char *arg)
{
size_t len = strlen(arg);
const size_t len = strlen(arg);
if ( strncmp(arg, "szip", len) == 0 )
{
......@@ -747,6 +740,8 @@ int main(int argc, char *argv[])
int itableID = CDI_UNDEFID, otableID = CDI_UNDEFID;
int Info = 1;
int NoInfo = 0;
int numWorkerIn = 0;
int numWorkerOut = 0;
char varname[CDI_MAX_NAME];
char paramstr[32];
......@@ -754,67 +749,34 @@ int main(int argc, char *argv[])
if (Progname == 0) Progname = argv[0];
else Progname++;
while ( (c = getopt(argc, argv, "b:f:t:w:z:cdhlMmnqRrsvVxZ")) != EOF )
// clang-format off
while ( (c = getopt(argc, argv, "b:f:i:o:t:w:z:cdhlMmnqRrsvVxZ")) != EOF )
{
switch (c)
{
case 'b':
setDefaultDataType(optarg);
break;
case 'd':
Debug = 1;
break;
case 'f':
setDefaultFileType(optarg);
break;
case 'h':
usage();
exit (0);
case 'l':
Longinfo = 1;
break;
case 'M':
cdiDefGlobal("HAVE_MISSVAL", 1);
break;
case 'm':
Move = 1;
break;
case 'n':
Info = 0;
NoInfo = 1;
break;
case 'q':
/* Quiet = 1; */
break;
case 'R':
cdiDefGlobal("REGULARGRID", 1);
break;
case 'r':
Record = 1;
break;
case 's':
Shortinfo = 1;
break;
case 't':
rTable = optarg;
break;
case 'v':
Vardis = 1;
break;
case 'V':
Version = 1;
break;
case 'w':
wTable = optarg;
break;
case 'x':
datamode = SP_MODE;
break;
case 'z':
defineCompress(optarg);
break;
case 'b': setDefaultDataType(optarg); break;
case 'd': Debug = 1; break;
case 'f': setDefaultFileType(optarg); break;
case 'h': usage(); exit (0);
case 'i': numWorkerIn = atoi(optarg); break;
case 'o': numWorkerOut = atoi(optarg); break;
case 'l': Longinfo = 1; break;
case 'M': cdiDefGlobal("HAVE_MISSVAL", 1); break;
case 'm': Move = 1; break;
case 'n': Info = 0; NoInfo = 1; break;
case 'q': /* Quiet = 1; */ break;
case 'R': cdiDefGlobal("REGULARGRID", 1); break;
case 'r': Record = 1; break;
case 's': Shortinfo = 1; break;
case 't': rTable = optarg; break;
case 'v': Vardis = 1; break;
case 'V': Version = 1; break;
case 'w': wTable = optarg; break;
case 'x': datamode = SP_MODE; break;
case 'z': defineCompress(optarg); break;
}
}
// clang-format on
if ( optind < argc ) fname1 = argv[optind++];
if ( optind < argc ) fname2 = argv[optind++];
......@@ -840,12 +802,9 @@ int main(int argc, char *argv[])
if ( fname1 )
{
size_t nmiss;
int number;
size_t datasize = 0;
int streamID2 = CDI_UNDEFID;
int filetype;
int gridID, zaxisID;
int param;
int nrecs;
int levelID, levelsize;
int nts = 0;
......@@ -854,19 +813,20 @@ int main(int argc, char *argv[])
int taxisID2 = CDI_UNDEFID;
int vlistID2 = CDI_UNDEFID;
int streamID1 = streamOpenRead(fname1);
const int streamID1 = streamOpenRead(fname1);
if ( streamID1 < 0 ) return handle_error(streamID1, "Open failed on %s", fname1);
int vlistID1 = streamInqVlist(streamID1);
if ( numWorkerIn > 0 ) streamDefNumWorker(streamID1, numWorkerIn);
const int vlistID1 = streamInqVlist(streamID1);
if ( Longinfo ) vlistPrint(vlistID1);
int nvars = vlistNvars(vlistID1);
int taxisID1 = vlistInqTaxis(vlistID1);
int ntsteps = vlistNtsteps(vlistID1);
const int nvars = vlistNvars(vlistID1);
const int taxisID1 = vlistInqTaxis(vlistID1);
const int ntsteps = vlistNtsteps(vlistID1);
if ( Debug )
fprintf(stderr, "nvars = %d\nntsteps = %d\n", nvars, ntsteps);
if ( Debug ) fprintf(stderr, "nvars = %d\nntsteps = %d\n", nvars, ntsteps);
if ( fname2 )
{
......@@ -877,7 +837,7 @@ int main(int argc, char *argv[])
for ( varID = 0; varID < nvars; varID++)
{
gridID = vlistInqVarGrid(vlistID1, varID);
const int gridID = vlistInqVarGrid(vlistID1, varID);
gridsize = gridInqSize(gridID);
if ( gridsize > datasize ) datasize = gridsize;
if ( fname2 )
......@@ -899,6 +859,8 @@ int main(int argc, char *argv[])
if ( streamID2 < 0 )
return handle_error(streamID2, "Open failed on %s", fname2);
if ( numWorkerOut > 0 ) streamDefNumWorker(streamID2, numWorkerOut);
if ( DefaultByteorder != CDI_UNDEFID )
streamDefByteorder(streamID2, DefaultByteorder);
......@@ -938,8 +900,8 @@ int main(int argc, char *argv[])
taxisCopyTimestep(taxisID2, taxisID1);
streamDefTimestep(streamID2, tsID);
}
int64_t vdate = taxisInqVdate(taxisID1);
int vtime = taxisInqVtime(taxisID1);
const int64_t vdate = taxisInqVdate(taxisID1);
const int vtime = taxisInqVtime(taxisID1);
if ( Debug )
fprintf(stdout, "tsID = %d nrecs = %d date = %lld time = %d\n", tsID, nrecs, vdate, vtime);
......@@ -952,10 +914,10 @@ int main(int argc, char *argv[])
streamReadRecord(streamID1, data, &idum);
nmiss = idum;
number = vlistInqVarNumber(vlistID1, varID);
gridID = vlistInqVarGrid(vlistID1, varID);
zaxisID = vlistInqVarZaxis(vlistID1, varID);
param = vlistInqVarParam(vlistID1, varID);
const int number = vlistInqVarNumber(vlistID1, varID);
const int gridID = vlistInqVarGrid(vlistID1, varID);
const int zaxisID = vlistInqVarZaxis(vlistID1, varID);
const int param = vlistInqVarParam(vlistID1, varID);
cdiParamToString(param, paramstr, sizeof(paramstr));
......@@ -966,8 +928,8 @@ int main(int argc, char *argv[])
varID, param, gridID, zaxisID, levelID);
*/
gridsize = gridInqSize(gridID);
double level = zaxisInqLevels(zaxisID, NULL) ? zaxisInqLevel(zaxisID, levelID) : levelID+1;
double missval = vlistInqVarMissval(vlistID1, varID);
const double level = zaxisInqLevels(zaxisID, NULL) ? zaxisInqLevel(zaxisID, levelID) : levelID+1;
const double missval = vlistInqVarMissval(vlistID1, varID);
if ( Info )
printInfo(vdate, vtime, varname, level, gridsize, number, nmiss, missval, data, Vardis);
......@@ -988,10 +950,10 @@ int main(int argc, char *argv[])
{
if ( vlistInqVarTimetype(vlistID1, varID) == TIME_CONSTANT && tsID > 0 ) continue;
number = vlistInqVarNumber(vlistID1, varID);
gridID = vlistInqVarGrid(vlistID1, varID);
zaxisID = vlistInqVarZaxis(vlistID1, varID);
param = vlistInqVarParam(vlistID1, varID);
const int number = vlistInqVarNumber(vlistID1, varID);
const int gridID = vlistInqVarGrid(vlistID1, varID);
const int zaxisID = vlistInqVarZaxis(vlistID1, varID);
const int param = vlistInqVarParam(vlistID1, varID);
cdiParamToString(param, paramstr, sizeof(paramstr));
......@@ -1003,12 +965,12 @@ int main(int argc, char *argv[])
varID, param, gridID, zaxisID);
gridsize = gridInqSize(gridID);
double missval = vlistInqVarMissval(vlistID1, varID);
const double missval = vlistInqVarMissval(vlistID1, varID);
levelsize = zaxisInqSize(zaxisID);
for ( levelID = 0; levelID < levelsize; levelID++ )
{
double level = zaxisInqLevels(zaxisID, NULL) ? zaxisInqLevel(zaxisID, levelID) : levelID+1;
const double level = zaxisInqLevels(zaxisID, NULL) ? zaxisInqLevel(zaxisID, levelID) : levelID+1;
streamReadVarSlice(streamID1, varID, levelID, data, &idum);
nmiss = idum;
......
......@@ -311,54 +311,56 @@ int cdiEncodeTime(int hour, int minute, int second);
int cdiGetFiletype(const char *path, int *byteorder);
/* streamOpenRead: Open a dataset for reading */
// streamOpenRead: Open a dataset for reading
int streamOpenRead(const char *path);
/* streamOpenWrite: Create a new dataset */
// streamOpenWrite: Create a new dataset
int streamOpenWrite(const char *path, int filetype);
int streamOpenAppend(const char *path);
/* streamClose: Close an open dataset */
// streamClose: Close an open dataset
void streamClose(int streamID);
/* streamSync: Synchronize an Open Dataset to Disk */
// streamSync: Synchronize an Open Dataset to Disk
void streamSync(int streamID);
/* streamDefVlist: Define the Vlist for a stream */
void streamDefNumWorker(int streamID, int numWorker);
// streamDefVlist: Define the Vlist for a stream
void streamDefVlist(int streamID, int vlistID);
/* streamInqVlist: Get the Vlist of a stream */
// streamInqVlist: Get the Vlist of a stream
int streamInqVlist(int streamID);
/* streamInqFiletype: Get the filetype */
// streamInqFiletype: Get the filetype
int streamInqFiletype(int streamID);
/* streamDefByteorder: Define the byteorder */
// streamDefByteorder: Define the byteorder
void streamDefByteorder(int streamID, int byteorder);
/* streamInqByteorder: Get the byteorder */
// streamInqByteorder: Get the byteorder
int streamInqByteorder(int streamID);
/* streamDefCompType: Define compression type */
// streamDefCompType: Define compression type
void streamDefCompType(int streamID, int comptype);
/* streamInqCompType: Get compression type */
// streamInqCompType: Get compression type
int streamInqCompType(int streamID);
/* streamDefCompLevel: Define compression level */
// streamDefCompLevel: Define compression level
void streamDefCompLevel(int streamID, int complevel);
/* streamInqCompLevel: Get compression level */
// streamInqCompLevel: Get compression level
int streamInqCompLevel(int streamID);
/* streamDefTimestep: Define time step */
// streamDefTimestep: Define time step
int streamDefTimestep(int streamID, int tsID);
/* streamInqTimestep: Get time step */
// streamInqTimestep: Get time step
int streamInqTimestep(int streamID, int tsID);
/* PIO: query currently set timestep id */
// PIO: query currently set timestep id
int streamInqCurTimestepID(int streamID);
const char *streamFilename(int streamID);
......
......@@ -255,6 +255,7 @@ typedef struct {
int accesstype; // TYPE_REC or TYPE_VAR
int accessmode;
int filetype;
int numWorker;
int byteorder;
int fileID;
int filemode;
......
......@@ -231,6 +231,14 @@ int getByteswap(int byteorder)
return byteswap;
}
void streamDefNumWorker(int streamID, int numWorker)
{
stream_t *streamptr = stream_to_pointer(streamID);
streamptr->numWorker = numWorker;
}
/*
@Function streamDefByteorder
@Title Define the byte order
......@@ -857,6 +865,7 @@ void streamDefaultValue ( stream_t * streamptr )
streamptr->accesstype = CDI_UNDEFID;
streamptr->accessmode = 0;
streamptr->filetype = CDI_FILETYPE_UNDEF;
streamptr->numWorker = 0;
streamptr->byteorder = CDI_UNDEFID;
streamptr->fileID = 0;
streamptr->filemode = 0;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment