pio_server.c 15.6 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
/** @file pio_server.c
 */
3
4
5
6
7
8
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
12
13
14
15
#include "pio_server.h"


#include <stdlib.h>
#include <stdio.h>
#include "limits.h"
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
18
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
19
#include "pio_util.h"
20
21
#include "stream_int.h"
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
#include "vlist_var.h"
23
24

extern resOps streamOps;
25
extern void arrayDestroy ( void );
Deike Kleberg's avatar
Deike Kleberg committed
26

27
28
29
static struct
{
  size_t size;
30
  unsigned char *buffer, *head;
31
32
} *rxWin = NULL;

Thomas Jahns's avatar
Thomas Jahns committed
33
static MPI_Win getWin = MPI_WIN_NULL;
Thomas Jahns's avatar
Thomas Jahns committed
34
static MPI_Group groupModel = MPI_GROUP_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
35
36


Deike Kleberg's avatar
Deike Kleberg committed
37
38
/************************************************************************/

39
static
Deike Kleberg's avatar
Deike Kleberg committed
40
41
42
void serverWinCleanup ()
{
  int i;
43
  int nProcsCalc = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
44
  
Deike Kleberg's avatar
Deike Kleberg committed
45
46
47
  if ( getWin != MPI_WIN_NULL )
    xmpi ( MPI_Win_free ( &getWin ));
  
48
49
  if (rxWin)
    {
Deike Kleberg's avatar
Deike Kleberg committed
50
      for ( i = 0; i < nProcsCalc; i++ )
51
52
        free(rxWin[i].buffer);
      free(rxWin);
Deike Kleberg's avatar
Deike Kleberg committed
53
    }
54

55
  xdebug("%s", "cleaned up mpi_win");
Deike Kleberg's avatar
Deike Kleberg committed
56
57
58
}
 
 /************************************************************************/
59

Deike Kleberg's avatar
Deike Kleberg committed
60
static 
Deike Kleberg's avatar
Deike Kleberg committed
61
  void collDefBufferSizes ()
Deike Kleberg's avatar
Deike Kleberg committed
62
{
63
  int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
Deike Kleberg's avatar
Deike Kleberg committed
64
  int modelID, decoChunk, sumGetBufferSizes = 0;
65
  int rankGlob = commInqRankGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
66
  int nProcsModel = commInqNProcsModel ();
67
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
68

69
  xassert(rxWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
70

Deike Kleberg's avatar
Deike Kleberg committed
71
  nstreams = reshCountType ( &streamOps );
72
73
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
74
75
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
76
      // space required for data
77
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
78
79
80
81
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
          iorank = vlistInqVarIOrank ( vlistID, varID );
Deike Kleberg's avatar
Deike Kleberg committed
82
          xassert ( iorank != CDI_UNDEFID );
Deike Kleberg's avatar
Deike Kleberg committed
83
84
          if ( iorank == rankGlob )
            {
Deike Kleberg's avatar
Deike Kleberg committed
85
              for ( modelID = 0; modelID < nProcsModel; modelID++ )
86
                {
87
                  decoChunk =  vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
88
                  xassert ( decoChunk > 0 );
89
90
                  rxWin[modelID].size += decoChunk * sizeof (double)
                    + winBufferOverheadChunk * sizeof (int);
91
                }
Deike Kleberg's avatar
Deike Kleberg committed
92
            }
93
    }
Deike Kleberg's avatar
Deike Kleberg committed
94
95
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose 
      // once per stream and timestep for all collprocs only on the modelproc root
96
97
      rxWin[root].size += 3 * winBufferOverheadFuncCall * sizeof (int)
        + 5 * sizeof (int) + MAXDATAFILENAME;
Deike Kleberg's avatar
Deike Kleberg committed
98
    }
99
  free ( streamIndexList );
Deike Kleberg's avatar
Deike Kleberg committed
100
101

  for ( modelID = 0; modelID < nProcsModel; modelID++ )
102
    {
103
104
      rxWin[modelID].size += winBufferOverhead * sizeof (int);
      sumGetBufferSizes += rxWin[modelID].size;
105
    }
Deike Kleberg's avatar
Deike Kleberg committed
106
  xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
107
  /* xprintArray ( "getBufferSize", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
108
}
109

Deike Kleberg's avatar
Deike Kleberg committed
110
 /************************************************************************/
111
112

static 
Deike Kleberg's avatar
Deike Kleberg committed
113
114
 void serverWinCreate ()
{ 
Deike Kleberg's avatar
Deike Kleberg committed
115
  int ranks[1], modelID;
116
  MPI_Comm commCalc = commInqCommCalc ();
Deike Kleberg's avatar
Deike Kleberg committed
117
  MPI_Group groupCalc;
118
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
119

Deike Kleberg's avatar
Deike Kleberg committed
120
  xmpi ( MPI_Win_create ( MPI_BOTTOM, 0, 1, MPI_INFO_NULL,
121
                          commCalc, &getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
122
123

  /* target group */
124
125
  ranks[0] = nProcsModel;
  xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
Deike Kleberg's avatar
Deike Kleberg committed
126
127
  xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));

128
  rxWin = xmalloc(nProcsModel * sizeof (rxWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
129
  collDefBufferSizes ();
130
  /* xprintArray ( "getBufferSizes", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
131

Deike Kleberg's avatar
Deike Kleberg committed
132
133
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
134
      rxWin[modelID].buffer = xmalloc(rxWin[modelID].size);
135
      rxWin[modelID].head = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
136
    }
Deike Kleberg's avatar
Deike Kleberg committed
137

138
  xdebug("%s", "created mpi_win, allocated getBuffer");
Deike Kleberg's avatar
Deike Kleberg committed
139
140
}

Deike Kleberg's avatar
Deike Kleberg committed
141
142
/************************************************************************/

143
static
Deike Kleberg's avatar
Deike Kleberg committed
144
145
  void getBufferGetFromEnd ( const char * caller, int line,  
                             int ID, void * argBuffer, size_t size )
146
{
147
  if (rxWin == NULL ||
148
149
150
151
152
153
154
155
156
157
      argBuffer     == NULL ||
      size           < 0    ||
      ID             < 0    ||
      ID             >= commInqNProcsModel () ||
      rxWin[ID].head - rxWin[ID].buffer + size > rxWin[ID].size)
    xabort("caller: %s, line %d, ID = %d, nProcsModel=%d,"
           " size = %lu, rxWin[%d].head = %ld, rxWin[%d].size = %lu",
           caller, line, ID, (unsigned long)size, ID,
           commInqNProcsModel(), rxWin[ID].head - rxWin[ID].buffer,
           ID, (unsigned long)rxWin[ID].size);
158
159
  memcpy ( argBuffer, rxWin[ID].head, size );
  rxWin[ID].head += size;
Deike Kleberg's avatar
Deike Kleberg committed
160
161
162
163
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
164
165
166
167
168
169
170
171
static
  void readFuncCall ( void )
{
  int funcID, tokenID;
  int root = commInqRootGlob ();

  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &funcID, sizeof ( funcID ));
Deike Kleberg's avatar
Deike Kleberg committed
172
  xassert ( funcID >= MINFUNCID && funcID <= MAXFUNCID );
Deike Kleberg's avatar
Deike Kleberg committed
173
174
175
  
  switch ( funcID )
    {
176
177
178
179
180
181
182
    case STREAMCLOSE:
      {
        int streamID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( streamID ));
        streamClose ( streamID );
183
184
185
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " closed stream", 
                 funcMap[funcID], streamID );
186
187
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
188
    case STREAMOPEN:
189
      {
190
        char *filename;
191
        size_t filenamesz;
192
        int filetype, streamID;
193
194
195

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filenamesz, sizeof ( filenamesz ));
Deike Kleberg's avatar
Deike Kleberg committed
196
        xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
197
        filename = xmalloc(filenamesz + 1);
198
199
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, filename, filenamesz );
200
        filename[filenamesz] = '\0';
201
202
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filetype, sizeof ( filetype ));
203
        xassert ( filetype >= MINFILETYPE && filetype <= MAXFILETYPE );
204
        streamID = streamOpenWrite ( filename, filetype );
205
206
207
208
209
        xdebug("READ FUNCTION CALL FROM WIN:  %s, filenamesz=%zu,"
               " filename=%s, filetype=%d, OPENED STREAM %d",
               funcMap[funcID], filenamesz, filename,
               filetype, streamID);
        free(filename);
210
      }
Deike Kleberg's avatar
Deike Kleberg committed
211
      break; 
212
213
214
215
216
217
218
219
220
221
222
223
224
225
    case STREAMDEFVLIST:
      {
        int streamID, vlistID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( vlistID ));
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &vlistID, sizeof ( vlistID ));
        streamDefVlist ( streamID, vlistID );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " vlistID=%d, called streamDefVlist ().", 
                 funcMap[funcID], streamID, vlistID );
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
226
    default:
227
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
Deike Kleberg's avatar
Deike Kleberg committed
228
229
230
    }
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
Deike Kleberg's avatar
Deike Kleberg committed
231
  xassert ( tokenID == SEPARATOR );
Deike Kleberg's avatar
Deike Kleberg committed
232
233
234
235
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
236
237
238
239
240
241
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
  int modelID;
  double * data = NULL, * dataHead = NULL;
  int streamID = CDI_UNDEFID, streamIDNew = CDI_UNDEFID;
242
243
  int varID, vlistID = CDI_UNDEFID, taxisID;
  int size, chunk;
Deike Kleberg's avatar
Deike Kleberg committed
244
  int tokenID, tokenID2;
245
246
247
  
  int nmiss = 0;
  char text[1024];
248
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
249
  int root        = commInqRootGlob ();
250
  
251
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
252
253
254
  
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
255

Deike Kleberg's avatar
Deike Kleberg committed
256
  while ( tokenID != END )
257
    {
Deike Kleberg's avatar
Deike Kleberg committed
258
      switch ( tokenID )
259
        {
260
        case DATATOKEN:
Deike Kleberg's avatar
Deike Kleberg committed
261
262
263
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &streamIDNew, sizeof ( streamID ));
          if ( streamIDNew != streamID )
264
            {
Deike Kleberg's avatar
Deike Kleberg committed
265
              streamID = streamIDNew;
266
267
              vlistID = streamInqVlist ( streamID );
              taxisID = vlistInqTaxis ( vlistID );
268
              taxisDefVdate ( taxisID, vdate );
269
270
271
              taxisDefVtime ( taxisID, vtime );
              streamDefTimestep ( streamID, tsID );
            }
Thomas Jahns's avatar
Thomas Jahns committed
272
          getBufferGetFromEnd(__func__, __LINE__, root, &varID, sizeof (varID));
273
274
275
276
          size = vlistInqVarSize ( vlistID, varID );
          data = xmalloc ( size * sizeof ( double ));
          dataHead = data;
          
Deike Kleberg's avatar
Deike Kleberg committed
277
          for ( modelID = 0; modelID < nProcsModel; modelID++ )
278
            {
Deike Kleberg's avatar
Deike Kleberg committed
279
              if ( modelID != root )
280
                {
281
                  int tempID;
Deike Kleberg's avatar
Deike Kleberg committed
282
283
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
284
                  xassert ( tempID == DATATOKEN );
Deike Kleberg's avatar
Deike Kleberg committed
285
286
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
287
                  xassert ( tempID == streamID );
Deike Kleberg's avatar
Deike Kleberg committed
288
289
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
290
                  xassert ( tempID == varID );
291
                }
292
              chunk  = vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
293
294
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, dataHead, chunk * sizeof ( double ));
295
              dataHead += chunk;
Deike Kleberg's avatar
Deike Kleberg committed
296
297
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &nmiss   , sizeof ( nmiss ));
Deike Kleberg's avatar
Deike Kleberg committed
298
299
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &tokenID2, sizeof ( tokenID2 ));
Deike Kleberg's avatar
Deike Kleberg committed
300
              xassert ( tokenID2 == SEPARATOR );
301
302
            }
          
303
          streamWriteVar ( streamID, varID, data, nmiss );
304
          
305
306
307
308
309
310
311
312
313
          if ( ddebug > 2 )
            {
              sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
              xprintArray ( text, data, size, DATATYPE_FLT );
            }
          
          free ( data );
          break;
        case FUNCCALL:
Deike Kleberg's avatar
Deike Kleberg committed
314
          readFuncCall ();
315
316
          break;
        default:
317
          xabort ( "BUFFER NOT READABLE!" );           
318
        }
Deike Kleberg's avatar
Deike Kleberg committed
319
320
      getBufferGetFromEnd ( __func__, __LINE__, 
                            root, &tokenID, sizeof ( tokenID ));
321
    }
322
  xdebug("%s", "RETURN");
323
324
325
326
} 

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
327

Thomas Jahns's avatar
Thomas Jahns committed
328
329
static
void clearModelWinBuffer(int modelID)
Deike Kleberg's avatar
Deike Kleberg committed
330
331
332
{
  int nProcsModel = commInqNProcsModel ();

Deike Kleberg's avatar
Deike Kleberg committed
333
334
  xassert ( modelID                >= 0           &&
            modelID                 < nProcsModel &&
335
            rxWin != NULL && rxWin[modelID].buffer != NULL &&
336
337
            rxWin[modelID].size > 0 &&
            rxWin[modelID].size <= MAXWINBUFFERSIZE );
338
  memset(rxWin[modelID].buffer, 0, rxWin[modelID].size);
339
  rxWin[modelID].head = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
340
341
342
343
344
345
}


/************************************************************************/


346
static
Thomas Jahns's avatar
Thomas Jahns committed
347
void getTimeStepData ( int tsID, int vdate, int vtime )
Deike Kleberg's avatar
Deike Kleberg committed
348
{
349
  int modelID;
350
  char text[1024];
351
  int nProcsModel = commInqNProcsModel ();
Thomas Jahns's avatar
Thomas Jahns committed
352
353
  void *getWinBaseAddr;
  int attrFound;
354
          
355
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
356
357

  // todo put in correct lbs and ubs
Deike Kleberg's avatar
Deike Kleberg committed
358
  xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
359
  xmpi(MPI_Win_start(groupModel, 0, getWin));
360
361
  xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
  xassert(attrFound);
Deike Kleberg's avatar
Deike Kleberg committed
362
363
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
Thomas Jahns's avatar
Thomas Jahns committed
364
      clearModelWinBuffer(modelID);
365
      xdebug("modelID=%d, nProcsModel=%d, rxWin[%d].size=%zu,"
Thomas Jahns's avatar
Thomas Jahns committed
366
             " getWin=%p, sizeof(int)=%u",
367
             modelID, nProcsModel, modelID, rxWin[modelID].size,
Thomas Jahns's avatar
Thomas Jahns committed
368
             getWinBaseAddr, (unsigned)sizeof(int));
369
370
371
      xmpi(MPI_Get(rxWin[modelID].buffer, rxWin[modelID].size,
                   MPI_UNSIGNED_CHAR, modelID, 0,
                   rxWin[modelID].size, MPI_UNSIGNED_CHAR, getWin));
Deike Kleberg's avatar
Deike Kleberg committed
372
    }
373
  xmpi ( MPI_Win_complete ( getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
374

375
  if ( ddebug > 2 )
Deike Kleberg's avatar
Deike Kleberg committed
376
    for ( modelID = 0; modelID < nProcsModel; modelID++ )
377
      {
378
        sprintf(text, "rxWin[%d].size=%zu from PE%d rxWin[%d].buffer",
379
                modelID, rxWin[modelID].size, modelID, modelID);
380
        xprintArray(text, rxWin[modelID].buffer,
381
382
                    rxWin[modelID].size / sizeof (double),
                    DATATYPE_FLT);
383
      }
Deike Kleberg's avatar
Deike Kleberg committed
384
  readGetBuffers ( tsID, vdate, vtime );
385
          
386
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
387
}
Deike Kleberg's avatar
Deike Kleberg committed
388
389
390
391
392
393
394
395
396
397
398

/************************************************************************/

/**
  @brief is encapsulated in CDI library and run on I/O PEs.

  @param

  @return
*/

399
void IOServer ()
Deike Kleberg's avatar
Deike Kleberg committed
400
{
401
  int source, tag, size, nProcsModel=commInqNProcsModel();
402
  static int nfinished = 0;
Deike Kleberg's avatar
Deike Kleberg committed
403
  char * buffer;
404
405
  MPI_Comm commCalc;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
406

407
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
408

409
  backendInit ();
410
411
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
    backendFinalize ();
412
413
  commCalc = commInqCommCalc ();

Deike Kleberg's avatar
Deike Kleberg committed
414
  for ( ;; )
415
    {
Deike Kleberg's avatar
Deike Kleberg committed
416
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commCalc, &status ));
417
      
Deike Kleberg's avatar
Deike Kleberg committed
418
419
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
420
      
Deike Kleberg's avatar
Deike Kleberg committed
421
      switch ( tag )
422
        {
423
424
425
426
427
428
429
        case FINALIZE:
          {
            int i;
            xdebugMsg(tag, source, nfinished);
            xmpi(MPI_Recv(&i, 1, MPI_INTEGER, source,
                          tag, commCalc, &status));
          }
430
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"FINALIZE\"");
431
          nfinished++;
Thomas Jahns's avatar
Thomas Jahns committed
432
433
434
          xdebug("nfinished=%d, nProcsModel=%d", nfinished, nProcsModel);
          if ( nfinished == nProcsModel )
            {
435
              {
Deike Kleberg's avatar
Deike Kleberg committed
436
                int nStreams = streamSize ();
Thomas Jahns's avatar
Thomas Jahns committed
437

Deike Kleberg's avatar
Deike Kleberg committed
438
439
440
441
                if ( nStreams > 0 )
                  {
                    int streamNo;
                    int * resHs;
Thomas Jahns's avatar
Thomas Jahns committed
442

Deike Kleberg's avatar
Deike Kleberg committed
443
                    resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
444
                    streamGetIndexList ( nStreams, resHs );
Deike Kleberg's avatar
Deike Kleberg committed
445
446
447
448
                    for ( streamNo = 0; streamNo < nStreams; streamNo++ )
                      streamClose ( resHs[streamNo] );
                    free ( resHs );
                  }
Deike Kleberg's avatar
Deike Kleberg committed
449
              }
Thomas Jahns's avatar
Thomas Jahns committed
450
451
              backendCleanup();
              serverWinCleanup();
452
              /* listDestroy(); */
453
              xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
454
455
              return;
            }
456
	  
457
458
          break;
          
Deike Kleberg's avatar
Deike Kleberg committed
459
	case RESOURCES:
460
	  xdebugMsg (  tag, source, nfinished );
461
	  xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
Thomas Jahns's avatar
Thomas Jahns committed
462
	  buffer = xmalloc(size);
463
464
	  xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
                            tag, commCalc, &status ));
465
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"RESOURCES\"");
466
	  rpcUnpackResources ( buffer, size, commCalc );
467
          xdebug("%s", "");
Deike Kleberg's avatar
Deike Kleberg committed
468
	  free ( buffer );
Deike Kleberg's avatar
Deike Kleberg committed
469
          if ( ddebug > 0 && commInqRankGlob () == nProcsModel ) 
470
            reshListPrint ( "reshListIOServer" );
Deike Kleberg's avatar
Deike Kleberg committed
471
	  serverWinCreate ();
472
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
473

474
	case WRITETS:
475
476
477
478
479
480
481
482
483
          {
            int iBuffer[timestepSize];
            xdebugMsg(tag, source, nfinished);
            xmpi(MPI_Get_count(&status, MPI_INTEGER, &size));
            xassert(size == timestepSize);
            xmpi(MPI_Recv(iBuffer, size, MPI_INTEGER, source,
                          tag, commCalc, &status));
            xdebug("RECEIVED MESSAGE WITH TAG \"WRITETS\": "
                   "tsID=%d, vdate=%d, vtime=%d, source=%d",
Deike Kleberg's avatar
Deike Kleberg committed
484
                   iBuffer[0], iBuffer[1], iBuffer[2], source );
Thomas Jahns's avatar
Thomas Jahns committed
485
            getTimeStepData(iBuffer[0], iBuffer[1], iBuffer[2]);
486
          }
Deike Kleberg's avatar
Deike Kleberg committed
487
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
488

Deike Kleberg's avatar
Deike Kleberg committed
489
	default:
Deike Kleberg's avatar
Deike Kleberg committed
490
	  xabort ( "TAG NOT DEFINED!" );
491
	}
Deike Kleberg's avatar
Deike Kleberg committed
492
493
    }
}
494

495
#endif
496
497
498
499
500
501
502
503
504
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */