From 5d615027c2769615c775145fc37ac0acf4556b6b Mon Sep 17 00:00:00 2001
From: Uwe Schulzweida <uwe.schulzweida@mpimet.mpg.de>
Date: Wed, 9 Nov 2022 13:59:05 +0100
Subject: [PATCH] async_worker: removed synchronization at end of timesteps

---
 ChangeLog      |   4 ++
 src/cdi_int.h  |   2 +-
 src/grb_read.c | 120 ++++++++++++++++++++++++++++++++++---------------
 src/stream.c   |  36 ++++++++-------
 4 files changed, 108 insertions(+), 54 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index 879960a14..18314437f 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+2022-11-09  Uwe Schulzweida
+
+	* async_worker: removed synchronization at end of timesteps
+
 2022-11-05  Uwe Schulzweida
 
 	* NetCDF output: added compression support for data on GRID_GENERIC
diff --git a/src/cdi_int.h b/src/cdi_int.h
index 2b8884a72..cdaff5bf4 100644
--- a/src/cdi_int.h
+++ b/src/cdi_int.h
@@ -298,7 +298,7 @@ typedef struct
   void *gribContainers;
 
   int numWorker;
-  int nextRecID;
+  int nextGlobalRecId;
   int cachedTsID;
   void *jobs;
   void *jobManager;
diff --git a/src/grb_read.c b/src/grb_read.c
index 632abd3df..50aad1856 100644
--- a/src/grb_read.c
+++ b/src/grb_read.c
@@ -105,7 +105,7 @@ grib1_unzip_record(void *gribbuffer, size_t *gribsize)
 
 typedef struct DecodeArgs
 {
-  int recID, *outZip, filetype, memtype, unreduced;
+  int recID, tsID, *outZip, filetype, memtype, unreduced;
   void *cgribexp, *gribbuffer, *data;
   size_t recsize, gridsize, nmiss;
   double missval;
@@ -122,10 +122,9 @@ grb_decode_record(void *untypedArgs)
 }
 
 static DecodeArgs
-grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos)
+grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memtype, void *gribbuffer, void *data, bool resetFilePos)
 {
   const int vlistID = streamptr->vlistID;
-  const int tsID = streamptr->curTsID;  // FIXME: This should be looked up from the given recID
   const int varID = streamptr->tsteps[tsID].records[recID].varID;
   size_t recsize = streamptr->tsteps[tsID].records[recID].size;
 
@@ -163,6 +162,7 @@ grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer,
 
   return (DecodeArgs){
     .recID = recID,
+    .tsID = tsID,
     .outZip = &streamptr->tsteps[tsID].records[recID].zip,
     .filetype = streamptr->filetype,
     .memtype = memtype,
@@ -180,7 +180,7 @@ grb_read_raw_data(stream_t *streamptr, int recID, int memtype, void *gribbuffer,
 static size_t
 grb_read_and_decode_record(stream_t *streamptr, int recID, int memtype, void *data, bool resetFilePos)
 {
-  DecodeArgs args = grb_read_raw_data(streamptr, recID, memtype, streamptr->record->buffer, data, resetFilePos);
+  DecodeArgs args = grb_read_raw_data(streamptr, streamptr->curTsID, recID, memtype, streamptr->record->buffer, data, resetFilePos);
   grb_decode_record(&args);
   return args.nmiss;
 }
@@ -192,9 +192,9 @@ typedef struct JobDescriptor
 } JobDescriptor;
 
 static void
-JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int recID, int memtype, bool resetFilePos)
+JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptor *me, stream_t *streamptr, int tsID, int recID, int memtype)
 {
-  me->args = grb_read_raw_data(streamptr, recID, memtype, NULL, NULL, resetFilePos);
+  me->args = grb_read_raw_data(streamptr, tsID, recID, memtype, NULL, NULL, false);
   me->job = AsyncWorker_requestWork(jobManager, grb_decode_record, &me->args);
   if (!me->job) xabort("error while trying to send job to worker thread");
 }
@@ -209,70 +209,116 @@ JobDescriptor_finishJob(AsyncManager *jobManager, JobDescriptor *me, void *data,
   Free(me->args.gribbuffer);
   Free(me->args.data);
   me->args.recID = -1;  // mark as inactive
+  me->args.tsID = -1;  // mark as inactive
+}
+/*
+static long
+get_global_recId(stream_t *streamptr, int tsID, int recID)
+{
+  const tsteps_t *tsteps = streamptr->tsteps;
+  long globalRecId = recID;
+  if (tsID > 0) globalRecId += tsteps[0].nrecs;
+  if (tsID > 1) globalRecId += tsteps[1].nrecs * (tsID - 1);
+  return globalRecId;
+}
+*/
+static long
+get_max_global_recs(stream_t *streamptr)
+{
+  long maxGlobalRecs = -1;
+  const long numSteps = streamptr->ntsteps;
+  if (numSteps > 0)
+    {
+      const tsteps_t *tsteps = streamptr->tsteps;
+      maxGlobalRecs = tsteps[0].nrecs;
+      if (numSteps > 1) maxGlobalRecs += tsteps[1].nrecs * (numSteps - 1);
+    }
+  return maxGlobalRecs;
 }
 
 static void
-grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, size_t *nmiss, bool resetFilePos)
+get_local_step_and_recId(stream_t *streamptr, long globalRecId, int *tsID, int *recID)
+{
+  int localTsId = 0;
+  const long numSteps = streamptr->ntsteps;
+  const tsteps_t *tsteps = streamptr->tsteps;
+  if (numSteps > 0 && globalRecId >= tsteps[0].nrecs)
+    {
+      localTsId++;
+      globalRecId -= tsteps[0].nrecs;
+    }
+  while (globalRecId >= tsteps[1].nrecs)
+    {
+      localTsId++;
+      globalRecId -= tsteps[1].nrecs;
+    }
+
+  *tsID = localTsId;
+  *recID = globalRecId;
+}
+
+static void
+read_next_record(AsyncManager *jobManager, JobDescriptor *jd, stream_t *streamptr, int memtype)
+{
+  int tsId = -1, recId = -1;
+  get_local_step_and_recId(streamptr, streamptr->nextGlobalRecId, &tsId, &recId);
+  const int xRecId = streamptr->tsteps[tsId].recIDs[recId];
+  JobDescriptor_startJob(jobManager, jd, streamptr, tsId, xRecId, memtype);
+  streamptr->nextGlobalRecId++;
+}
+
+static void
+grb_read_next_record(stream_t *streamptr, int recID, int memtype, void *data, size_t *nmiss)
 {
   bool jobFound = false;
 
   int workerCount = streamptr->numWorker;
   if (workerCount > 0)
     {
-      if (workerCount > streamptr->tsteps[0].nrecs) workerCount = streamptr->tsteps[0].nrecs;
+      const int tsID = streamptr->curTsID;
+      const long maxGlobalRecs = get_max_global_recs(streamptr);
+      //const long globalRecId = get_global_recId(streamptr, streamptr->curTsID, recID);
+      if (maxGlobalRecs == -1) xabort("Internal error: number of timesteps missing!");
+
+      if (workerCount > maxGlobalRecs) workerCount = maxGlobalRecs;
 
       AsyncManager *jobManager = (AsyncManager *) streamptr->jobManager;
       JobDescriptor *jobs = (JobDescriptor *) streamptr->jobs;
 
       // if this is the first call, init and start worker threads
-      tsteps_t *timestep = &streamptr->tsteps[streamptr->curTsID];
 
       if (!jobs)
         {
           jobs = (JobDescriptor *) malloc(workerCount * sizeof(*jobs));
           streamptr->jobs = jobs;
           for (int i = 0; i < workerCount; i++) jobs[i].args.recID = -1;
+          for (int i = 0; i < workerCount; i++) jobs[i].args.tsID = -1;
           if (AsyncWorker_init(&jobManager, workerCount)) xabort("error while trying to start worker threads");
           streamptr->jobManager = jobManager;
-        }
-
-      if (recID == 0) streamptr->nextRecID = 0;
-      if (recID == 0)
-        streamptr->cachedTsID = streamptr->curTsID;  // no active workers -> we may start processing records of a new timestep
 
-      if (streamptr->cachedTsID == streamptr->curTsID)
-        {
           // Start as many new jobs as possible.
-          for (int i = 0; streamptr->nextRecID < timestep->nrecs && i < workerCount; i++)
+          for (int i = 0; streamptr->nextGlobalRecId < maxGlobalRecs && i < workerCount; i++)
             {
               JobDescriptor *jd = &jobs[i];
-              if (jd->args.recID < 0)
-                {
-                  JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype,
-                                         resetFilePos);
-                }
+              if (jd->args.recID < 0 && jd->args.tsID < 0) read_next_record(jobManager, jd, streamptr, memtype);
             }
+        }
 
-          // search for a job descriptor with the given recID, and use its results if it exists
-          for (int i = 0; !jobFound && i < workerCount; i++)
+      // search for a job descriptor with the given tsID and recID, and use its results if it exists
+      for (int i = 0; !jobFound && i < workerCount; i++)
+        {
+          JobDescriptor *jd = &jobs[i];
+          if (jd->args.recID == recID && jd->args.tsID == tsID)
             {
-              JobDescriptor *jd = &jobs[i];
-              if (jd->args.recID == recID)
-                {
-                  jobFound = true;
-                  JobDescriptor_finishJob(jobManager, jd, data, nmiss);
-                  if (streamptr->nextRecID < timestep->nrecs)
-                    {
-                      JobDescriptor_startJob(jobManager, jd, streamptr, timestep->recIDs[streamptr->nextRecID++], memtype,
-                                             resetFilePos);
-                    }
-                }
+              jobFound = true;
+              JobDescriptor_finishJob(jobManager, jd, data, nmiss);
+              if (streamptr->nextGlobalRecId < maxGlobalRecs) read_next_record(jobManager, jd, streamptr, memtype);
             }
         }
     }
 
   // perform the work synchronously if we didn't start a job for it yet
-  if (!jobFound) *nmiss = grb_read_and_decode_record(streamptr, recID, memtype, data, resetFilePos);
+  if (!jobFound) *nmiss = grb_read_and_decode_record(streamptr, recID, memtype, data, false);
 }
 
 void
@@ -282,7 +328,7 @@ grb_read_record(stream_t *streamptr, int memtype, void *data, size_t *nmiss)
   const int vrecID = streamptr->tsteps[tsID].curRecID;
   const int recID = streamptr->tsteps[tsID].recIDs[vrecID];
 
-  grb_read_next_record(streamptr, recID, memtype, data, nmiss, false);
+  grb_read_next_record(streamptr, recID, memtype, data, nmiss);
 }
 
 void
diff --git a/src/stream.c b/src/stream.c
index a6df254a1..0731812c7 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -199,6 +199,21 @@ streamDefMaxSteps(int streamID, int maxSteps)
     }
 }
 
+static long
+cdiInqTimeSize(int streamID)
+{
+  stream_t *streamptr = stream_to_pointer(streamID);
+
+  long ntsteps = streamptr->ntsteps;
+  if (ntsteps == (long) CDI_UNDEFID)
+    {
+      int tsID = 0;
+      while (streamInqTimestep(streamID, tsID++)) ntsteps = streamptr->ntsteps;
+    }
+
+  return ntsteps;
+}
+
 void
 streamDefNumWorker(int streamID, int numWorker)
 {
@@ -206,6 +221,10 @@ streamDefNumWorker(int streamID, int numWorker)
     {
       stream_t *streamptr = stream_to_pointer(streamID);
       streamptr->numWorker = numWorker;
+
+      const int filetype = streamptr->filetype;
+      const bool filetypeIsGrib = (filetype == CDI_FILETYPE_GRB || filetype == CDI_FILETYPE_GRB2);
+      if (streamptr->filemode == 'r' && filetypeIsGrib) (void)cdiInqTimeSize(streamID);
     }
 }
 
@@ -320,21 +339,6 @@ streamFilename(int streamID)
   return streamptr->filename;
 }
 
-static long
-cdiInqTimeSize(int streamID)
-{
-  stream_t *streamptr = stream_to_pointer(streamID);
-
-  long ntsteps = streamptr->ntsteps;
-  if (ntsteps == (long) CDI_UNDEFID)
-    {
-      int tsID = 0;
-      while (streamInqTimestep(streamID, tsID++)) ntsteps = streamptr->ntsteps;
-    }
-
-  return ntsteps;
-}
-
 static int
 cdiInqContents(stream_t *streamptr)
 {
@@ -955,7 +959,7 @@ streamDefaultValue(stream_t *streamptr)
   streamptr->gribContainers = NULL;
 
   streamptr->numWorker = 0;
-  streamptr->nextRecID = 0;
+  streamptr->nextGlobalRecId = 0;
   streamptr->cachedTsID = -1;
   streamptr->jobs = NULL;
   streamptr->jobManager = NULL;
-- 
GitLab