Skip to content
Snippets Groups Projects
Commit 62169775 authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

No commit message

No commit message
parent 04b940dc
No related branches found
No related tags found
No related merge requests found
......@@ -33,6 +33,7 @@ typedef struct {
int array_size;
int recID, nrecs;
read_arg_t read_arg;
int shutdown;
int used;
work_t *work;
#if defined (HAVE_LIBPTHREAD)
......@@ -52,10 +53,6 @@ typedef struct {
int recID, nrecs;
read_arg_t read_arg;
iothread_t *iothread;
#if defined (HAVE_LIBPTHREAD)
pthread_t thrID;
pthread_attr_t attr;
#endif
}
par_io_t;
......@@ -73,47 +70,55 @@ void readRecord(void *arg)
nmiss = read_arg->nmiss;
array = read_arg->array;
/* fprintf(stderr, "streamInqRecord: streamID = %d\n", streamID); */
streamInqRecord(streamID, varID, levelID);
fprintf(stderr, "readRecord: streamID = %d %d %d\n", streamID, *varID, *levelID);
streamReadRecord(streamID, array, nmiss);
/* fprintf(stderr, "readRecord: varID %d levelID %d\n", *varID, *levelID); */
// return (NULL);
}
#if defined (HAVE_LIBPTHREAD)
/* This function is the work function of the thread */
void *do_work(void *p)
{
iothread_t *iothread = (iothread_t *) p;
work_t *work; //The q element
while ( 1 )
{
pthread_mutex_lock(&(iothread->lock)); //get the q lock.
pthread_mutex_lock(&(iothread->lock));
while ( iothread->used == 0 ) // if the size is 0 then wait.
while ( iothread->used == 0 )
{
if ( iothread->shutdown )
{
pthread_mutex_unlock(&(iothread->lock));
// pthread_exit(NULL);
return (NULL);
}
// wait until the condition says its no emtpy and give up the lock.
pthread_mutex_unlock(&(iothread->lock)); //get the lock.
pthread_mutex_unlock(&(iothread->lock));
pthread_cond_wait(&(iothread->not_empty), &(iothread->lock));
if ( iothread->shutdown )
{
pthread_mutex_unlock(&(iothread->lock));
// pthread_exit(NULL);
return (NULL);
}
}
work = iothread->work;
(iothread->work->routine) (iothread->work->arg); //actually do work.
free(iothread->work);
iothread->used = 0;
// the q is empty again, now signal that its empty.
// now signal that its empty.
pthread_cond_signal(&(iothread->empty));
pthread_mutex_unlock(&(iothread->lock));
fprintf(stderr, "do_work\n");
(work->routine) (work->arg); //actually do work.
free(work);
}
}
#endif
#if defined (HAVE_LIBPTHREAD)
iothread_t *create_iothread()
{
iothread_t *iothread = NULL;
......@@ -147,6 +152,9 @@ iothread_t *create_iothread()
pthread_attr_init(&(iothread->attr));
pthread_attr_setdetachstate(&(iothread->attr), PTHREAD_CREATE_JOINABLE);
iothread->shutdown = 0;
iothread->used = 0;
//make thread
if ( pthread_create(&(iothread->thrID), &(iothread->attr), do_work, (void *)iothread) )
{
......@@ -156,7 +164,26 @@ iothread_t *create_iothread()
return (iothread);
}
#endif
#if defined (HAVE_LIBPTHREAD)
void check_iothread(iothread_t *iothread)
{
pthread_mutex_lock(&(iothread->lock));
while ( iothread->used == 1 )
{
// wait until the condition says its emtpy and give up the lock.
pthread_mutex_unlock(&(iothread->lock)); //get the lock.
pthread_cond_wait(&(iothread->empty), &(iothread->lock));
}
pthread_mutex_unlock(&(iothread->lock));
}
#endif
#if defined (HAVE_LIBPTHREAD)
typedef void (*dispatch_fn)(void *);
void dispatch(iothread_t *iothread, dispatch_fn dispatch_to_here, void *arg)
......@@ -179,34 +206,46 @@ void dispatch(iothread_t *iothread, dispatch_fn dispatch_to_here, void *arg)
if ( iothread->used == 0 )
{
iothread->work = work;
pthread_cond_signal(&(iothread->not_empty)); //I am not empty.
pthread_cond_signal(&(iothread->not_empty));
}
iothread->used++;
iothread->used = 1;
pthread_mutex_unlock(&(iothread->lock));
}
#endif
#if defined (HAVE_LIBPTHREAD)
void destroy_iothread(iothread_t *iothread)
{
int status;
pthread_mutex_lock(&(iothread->lock));
iothread->shutdown = 1;
pthread_cond_signal(&(iothread->not_empty));
pthread_mutex_unlock(&(iothread->lock));
status = pthread_join(iothread->thrID, NULL);
if ( status > 0 )
{
fprintf(stderr, "pthread_join error!\n");
return;
}
pthread_mutex_destroy(&(iothread->lock));
pthread_cond_destroy(&(iothread->empty));
pthread_cond_destroy(&(iothread->not_empty));
pthread_attr_destroy(&(iothread->attr));
return;
//free(iothread);
}
#endif
void stream_read_record_par(int streamID, int *varID, int *levelID, double *array, int *nmiss, par_io_t *parIO)
{
int lpario = 0;
int recID = 0, nrecs = 0;
#if defined (HAVE_LIBPTHREAD)
pthread_t thrID;
/* pthread_attr_t attr; */
int rval;
#endif
#if defined (HAVE_LIBPTHREAD)
if ( parIO )
......@@ -214,7 +253,6 @@ void stream_read_record_par(int streamID, int *varID, int *levelID, double *arra
lpario = 1;
recID = parIO->recID;
nrecs = parIO->nrecs;
thrID = parIO->thrID;
}
#endif
......@@ -232,48 +270,34 @@ void stream_read_record_par(int streamID, int *varID, int *levelID, double *arra
#if defined (HAVE_LIBPTHREAD)
else
{
/* fprintf(stderr, "parIO1: %ld streamID %d %d %d\n", (long)thrID, streamID, recID, nrecs); */
/*
rval = pthread_join(thrID, NULL);
if ( rval != 0 )
{
fprintf(stderr, "pthread_join failed!\n");
exit(-1);
}
*/
check_iothread(parIO->iothread);
*varID = parIO->varID;
*levelID = parIO->levelID;
*nmiss = parIO->nmiss;
/* fprintf(stderr, "parIO2: %ld streamID %d %d %d\n", (long)thrID, streamID, *varID, *levelID); */
memcpy(array, parIO->array, parIO->array_size*sizeof(double));
}
if ( lpario && nrecs > 1 )
{
read_arg_t *read_arg = &(parIO->read_arg);
if ( (recID+1) < nrecs )
{
if ( recID == 0 )
{
pthread_attr_init(&parIO->attr);
pthread_attr_setdetachstate(&parIO->attr, PTHREAD_CREATE_JOINABLE);
read_arg_t *read_arg = &(parIO->read_arg);
parIO->iothread = create_iothread();
}
if ( recID == 0 ) parIO->iothread = create_iothread();
read_arg->streamID = streamID;
read_arg->varID = &parIO->varID;
read_arg->levelID = &parIO->levelID;
read_arg->nmiss = &parIO->nmiss;
read_arg->array = parIO->array;
sleep(1);
dispatch(parIO->iothread, readRecord, read_arg);
}
else
{
destroy_iothread(parIO->iothread);
pthread_attr_destroy(&parIO->attr);
}
}
#endif
......@@ -291,7 +315,7 @@ int main(int argc, char *argv[])
{
int taxisID, vlistID, varID, levelID, streamID, tsID;
int nmiss, vdate, vtime;
int nrecs, recID;
int nrecs, recID, code;
int gridsize, i;
double *data;
double fmin, fmax, fmean;
......@@ -344,10 +368,12 @@ int main(int argc, char *argv[])
#else
stream_read_record(streamID, &varID, &levelID, data, &nmiss);
#endif
code = vlistInqVarCode(vlistID, varID);
gridsize = gridInqSize(vlistInqVarGrid(vlistID, varID));
fmin = 1.e33;
fmax = -1.e33;
fmean = 0;
// for ( int j = 0; j < 4; ++j )
for ( i = 0; i < gridsize; ++i )
{
if ( data[i] < fmin ) fmin = data[i];
......@@ -355,14 +381,14 @@ int main(int argc, char *argv[])
fmean += data[i];
}
fmean /= gridsize;
fprintf(stderr, "%3d %3d %d %g %g %g\n", varID, levelID, gridsize, fmin, fmean, fmax);
fprintf(stdout, "%3d %3d %3d %d %g %g %g\n",
code, varID, levelID, gridsize, fmin, fmean, fmax);
}
tsID++;
}
//printf("%3d %3d %d %g %g %g\n", varID, levelID, gridsize, fmin, fmean, fmax);
/* Close the input stream */
/* Close the input stream */
streamClose(streamID);
free(data);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment