pio_server.c 15.9 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
/** @file ioServer.c
*/
3
4
5
6
7
8
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
12
13
14
15
#include "pio_server.h"


#include <stdlib.h>
#include <stdio.h>
#include "limits.h"
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
18
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
19
#include "pio_util.h"
20
21
#include "stream_int.h"
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
#include "vlist_var.h"
23
24

extern resOps streamOps;
25
extern void arrayDestroy ( void );
Deike Kleberg's avatar
Deike Kleberg committed
26

27
28
29
static struct
{
  size_t size;
30
  unsigned char *buffer, *head;
31
32
} *rxWin = NULL;

Thomas Jahns's avatar
Thomas Jahns committed
33
static MPI_Win getWin = MPI_WIN_NULL;
Thomas Jahns's avatar
Thomas Jahns committed
34
static MPI_Group groupModel = MPI_GROUP_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
35
36


Deike Kleberg's avatar
Deike Kleberg committed
37
38
/************************************************************************/

39
static
Deike Kleberg's avatar
Deike Kleberg committed
40
41
42
void serverWinCleanup ()
{
  int i;
43
  int nProcsCalc = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
44
  
Deike Kleberg's avatar
Deike Kleberg committed
45
46
47
  if ( getWin != MPI_WIN_NULL )
    xmpi ( MPI_Win_free ( &getWin ));
  
48
49
  if (rxWin)
    {
Deike Kleberg's avatar
Deike Kleberg committed
50
      for ( i = 0; i < nProcsCalc; i++ )
51
52
        free(rxWin[i].buffer);
      free(rxWin);
Deike Kleberg's avatar
Deike Kleberg committed
53
    }
54

55
  xdebug("%s", "cleaned up mpi_win");
Deike Kleberg's avatar
Deike Kleberg committed
56
57
58
}
 
 /************************************************************************/
59

Deike Kleberg's avatar
Deike Kleberg committed
60
static 
Deike Kleberg's avatar
Deike Kleberg committed
61
  void collDefBufferSizes ()
Deike Kleberg's avatar
Deike Kleberg committed
62
{
63
  int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
Deike Kleberg's avatar
Deike Kleberg committed
64
  int modelID, decoChunk, sumGetBufferSizes = 0;
65
  int rankGlob = commInqRankGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
66
  int nProcsModel = commInqNProcsModel ();
67
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
68

69
  xassert(rxWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
70

Deike Kleberg's avatar
Deike Kleberg committed
71
  nstreams = reshCountType ( &streamOps );
72
73
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
74
75
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
76
      // space required for data
77
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
78
79
80
81
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
          iorank = vlistInqVarIOrank ( vlistID, varID );
Deike Kleberg's avatar
Deike Kleberg committed
82
          xassert ( iorank != CDI_UNDEFID );
Deike Kleberg's avatar
Deike Kleberg committed
83
84
          if ( iorank == rankGlob )
            {
Deike Kleberg's avatar
Deike Kleberg committed
85
              for ( modelID = 0; modelID < nProcsModel; modelID++ )
86
                {
87
                  decoChunk =  vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
88
                  xassert ( decoChunk > 0 );
89
90
                  rxWin[modelID].size += decoChunk * sizeof (double)
                    + winBufferOverheadChunk * sizeof (int);
91
                }
Deike Kleberg's avatar
Deike Kleberg committed
92
            }
93
    }
Deike Kleberg's avatar
Deike Kleberg committed
94
95
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose 
      // once per stream and timestep for all collprocs only on the modelproc root
96
97
      rxWin[root].size += 3 * winBufferOverheadFuncCall * sizeof (int)
        + 5 * sizeof (int) + MAXDATAFILENAME;
Deike Kleberg's avatar
Deike Kleberg committed
98
    }
99
  free ( streamIndexList );
Deike Kleberg's avatar
Deike Kleberg committed
100
101

  for ( modelID = 0; modelID < nProcsModel; modelID++ )
102
    {
103
104
      rxWin[modelID].size += winBufferOverhead * sizeof (int);
      sumGetBufferSizes += rxWin[modelID].size;
105
    }
Deike Kleberg's avatar
Deike Kleberg committed
106
  xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
107
  /* xprintArray ( "getBufferSize", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
108
}
109

Deike Kleberg's avatar
Deike Kleberg committed
110
 /************************************************************************/
111
112

static 
Deike Kleberg's avatar
Deike Kleberg committed
113
114
 void serverWinCreate ()
{ 
Deike Kleberg's avatar
Deike Kleberg committed
115
  int ranks[1], modelID;
116
  MPI_Comm commCalc = commInqCommCalc ();
Deike Kleberg's avatar
Deike Kleberg committed
117
  MPI_Group groupCalc;
118
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
119

Deike Kleberg's avatar
Deike Kleberg committed
120
  xmpi ( MPI_Win_create ( MPI_BOTTOM, 0, 1, MPI_INFO_NULL,
121
                          commCalc, &getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
122
123

  /* target group */
124
125
  ranks[0] = nProcsModel;
  xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
Deike Kleberg's avatar
Deike Kleberg committed
126
127
  xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));

128
  rxWin = xmalloc(nProcsModel * sizeof (rxWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
129
  collDefBufferSizes ();
130
  /* xprintArray ( "getBufferSizes", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
131

Deike Kleberg's avatar
Deike Kleberg committed
132
133
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
134
      rxWin[modelID].buffer = xmalloc(rxWin[modelID].size);
135
      rxWin[modelID].head = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
136
    }
Deike Kleberg's avatar
Deike Kleberg committed
137

138
  xdebug("%s", "created mpi_win, allocated getBuffer");
Deike Kleberg's avatar
Deike Kleberg committed
139
140
}

Deike Kleberg's avatar
Deike Kleberg committed
141
142
/************************************************************************/

143
static
Deike Kleberg's avatar
Deike Kleberg committed
144
145
  void getBufferGetFromEnd ( const char * caller, int line,  
                             int ID, void * argBuffer, size_t size )
146
{
147
  if (rxWin == NULL ||
148
149
150
151
152
153
154
155
156
157
      argBuffer     == NULL ||
      size           < 0    ||
      ID             < 0    ||
      ID             >= commInqNProcsModel () ||
      rxWin[ID].head - rxWin[ID].buffer + size > rxWin[ID].size)
    xabort("caller: %s, line %d, ID = %d, nProcsModel=%d,"
           " size = %lu, rxWin[%d].head = %ld, rxWin[%d].size = %lu",
           caller, line, ID, (unsigned long)size, ID,
           commInqNProcsModel(), rxWin[ID].head - rxWin[ID].buffer,
           ID, (unsigned long)rxWin[ID].size);
158
159
  memcpy ( argBuffer, rxWin[ID].head, size );
  rxWin[ID].head += size;
Deike Kleberg's avatar
Deike Kleberg committed
160
161
162
163
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
164
165
166
167
168
169
170
171
static
  void readFuncCall ( void )
{
  int funcID, tokenID;
  int root = commInqRootGlob ();

  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &funcID, sizeof ( funcID ));
Deike Kleberg's avatar
Deike Kleberg committed
172
  xassert ( funcID >= MINFUNCID && funcID <= MAXFUNCID );
Deike Kleberg's avatar
Deike Kleberg committed
173
174
175
  
  switch ( funcID )
    {
176
177
178
179
180
181
182
    case STREAMCLOSE:
      {
        int streamID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( streamID ));
        streamClose ( streamID );
183
184
185
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " closed stream", 
                 funcMap[funcID], streamID );
186
187
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
188
    case STREAMOPEN:
189
      {
190
        char *filename;
191
        size_t filenamesz;
192
        int filetype, streamID;
193
194
195

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filenamesz, sizeof ( filenamesz ));
Deike Kleberg's avatar
Deike Kleberg committed
196
        xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
197
        filename = xmalloc(filenamesz + 1);
198
199
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, filename, filenamesz );
200
        filename[filenamesz] = '\0';
201
202
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filetype, sizeof ( filetype ));
203
        xassert ( filetype >= MINFILETYPE && filetype <= MAXFILETYPE );
204
205
        streamID = streamOpenWrite ( filename, filetype );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, filenamesz=%d,"
206
                 " filename=%s, filetype=%d, OPENED STREAM %d", 
207
                 funcMap[funcID], filenamesz, filename, filetype, streamID );
208
        free ( filename );
209
      }
Deike Kleberg's avatar
Deike Kleberg committed
210
      break; 
211
212
213
214
215
216
217
218
219
220
221
222
223
224
    case STREAMDEFVLIST:
      {
        int streamID, vlistID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( vlistID ));
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &vlistID, sizeof ( vlistID ));
        streamDefVlist ( streamID, vlistID );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " vlistID=%d, called streamDefVlist ().", 
                 funcMap[funcID], streamID, vlistID );
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
225
    default:
226
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
Deike Kleberg's avatar
Deike Kleberg committed
227
228
229
    }
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
Deike Kleberg's avatar
Deike Kleberg committed
230
  xassert ( tokenID == SEPARATOR );
Deike Kleberg's avatar
Deike Kleberg committed
231
232
233
234
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
235
236
237
238
239
240
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
  int modelID;
  double * data = NULL, * dataHead = NULL;
  int streamID = CDI_UNDEFID, streamIDNew = CDI_UNDEFID;
241
  int varID, vlistID, taxisID;
242
  int size, offset, chunk, chunkSize;
Deike Kleberg's avatar
Deike Kleberg committed
243
  int tokenID, tokenID2;
244
245
246
  
  int nmiss = 0;
  char text[1024];
247
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
248
  int root        = commInqRootGlob ();
249
  
250
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
251
252
253
  
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
254

Deike Kleberg's avatar
Deike Kleberg committed
255
  while ( tokenID != END )
256
    {
Deike Kleberg's avatar
Deike Kleberg committed
257
      switch ( tokenID )
258
        {
259
        case DATATOKEN:
Deike Kleberg's avatar
Deike Kleberg committed
260
261
262
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &streamIDNew, sizeof ( streamID ));
          if ( streamIDNew != streamID )
263
            {
Deike Kleberg's avatar
Deike Kleberg committed
264
              streamID = streamIDNew;
265
266
              vlistID = streamInqVlist ( streamID );
              taxisID = vlistInqTaxis ( vlistID );
267
              taxisDefVdate ( taxisID, vdate );
268
269
270
              taxisDefVtime ( taxisID, vtime );
              streamDefTimestep ( streamID, tsID );
            }
Deike Kleberg's avatar
Deike Kleberg committed
271
272
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &varID, sizeof ( varID ));
273
274
275
276
          size = vlistInqVarSize ( vlistID, varID );
          data = xmalloc ( size * sizeof ( double ));
          dataHead = data;
          
Deike Kleberg's avatar
Deike Kleberg committed
277
          for ( modelID = 0; modelID < nProcsModel; modelID++ )
278
            {
Deike Kleberg's avatar
Deike Kleberg committed
279
              if ( modelID != root )
280
                {
281
                  int tempID;
Deike Kleberg's avatar
Deike Kleberg committed
282
283
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
284
                  xassert ( tempID == DATATOKEN );
Deike Kleberg's avatar
Deike Kleberg committed
285
286
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
287
                  xassert ( tempID == streamID );
Deike Kleberg's avatar
Deike Kleberg committed
288
289
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
290
                  xassert ( tempID == varID );
291
                }
292
              chunk  = vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
293
294
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, dataHead, chunk * sizeof ( double ));
295
              dataHead += chunk;
Deike Kleberg's avatar
Deike Kleberg committed
296
297
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &nmiss   , sizeof ( nmiss ));
Deike Kleberg's avatar
Deike Kleberg committed
298
299
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &tokenID2, sizeof ( tokenID2 ));
Deike Kleberg's avatar
Deike Kleberg committed
300
              xassert ( tokenID2 == SEPARATOR );
301
302
            }
          
303
          streamWriteVar ( streamID, varID, data, nmiss );
304
          
305
306
307
308
309
310
311
312
313
          if ( ddebug > 2 )
            {
              sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
              xprintArray ( text, data, size, DATATYPE_FLT );
            }
          
          free ( data );
          break;
        case FUNCCALL:
Deike Kleberg's avatar
Deike Kleberg committed
314
          readFuncCall ();
315
316
          break;
        default:
317
          xabort ( "BUFFER NOT READABLE!" );           
318
        }
Deike Kleberg's avatar
Deike Kleberg committed
319
320
      getBufferGetFromEnd ( __func__, __LINE__, 
                            root, &tokenID, sizeof ( tokenID ));
321
    }
322
  xdebug("%s", "RETURN");
323
324
325
326
} 

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
327
328
329
330
331
332

static 
  void getFlushBuffer ( int modelID )
{
  int nProcsModel = commInqNProcsModel ();

Deike Kleberg's avatar
Deike Kleberg committed
333
334
  xassert ( modelID                >= 0           &&
            modelID                 < nProcsModel &&
335
            rxWin != NULL && rxWin[modelID].buffer != NULL &&
336
337
            rxWin[modelID].size > 0 &&
            rxWin[modelID].size <= MAXWINBUFFERSIZE );
338
  memset(rxWin[modelID].buffer, 0, rxWin[modelID].size);
339
  rxWin[modelID].head = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
340
341
342
343
344
345
}


/************************************************************************/


346
static
347
void getData ( int tsID, int vdate, int vtime )
Deike Kleberg's avatar
Deike Kleberg committed
348
{
Deike Kleberg's avatar
Deike Kleberg committed
349
  int iAssert   = 0, modelID;
350
  char text[1024];
351
  int nProcsModel = commInqNProcsModel ();
Thomas Jahns's avatar
Thomas Jahns committed
352
353
  void *getWinBaseAddr;
  int attrFound;
354
          
355
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
356
357

  // todo put in correct lbs and ubs
Deike Kleberg's avatar
Deike Kleberg committed
358
  xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
359
  xmpi ( MPI_Win_start ( groupModel, iAssert, getWin )); 
360
361
  xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
  xassert(attrFound);
Deike Kleberg's avatar
Deike Kleberg committed
362
363
364
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
      getFlushBuffer ( modelID );
365
      xdebug("modelID=%d, nProcsModel=%d, rxWin[%d].size=%d,"
Thomas Jahns's avatar
Thomas Jahns committed
366
             " getWin=%p, sizeof(int)=%u",
367
             modelID, nProcsModel, modelID, rxWin[modelID].size,
Thomas Jahns's avatar
Thomas Jahns committed
368
             getWinBaseAddr, (unsigned)sizeof(int));
369
370
371
      xmpi(MPI_Get(rxWin[modelID].buffer, rxWin[modelID].size,
                   MPI_UNSIGNED_CHAR, modelID, 0,
                   rxWin[modelID].size, MPI_UNSIGNED_CHAR, getWin));
Deike Kleberg's avatar
Deike Kleberg committed
372
    }
373
  xmpi ( MPI_Win_complete ( getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
374

375
  if ( ddebug > 2 )
Deike Kleberg's avatar
Deike Kleberg committed
376
    for ( modelID = 0; modelID < nProcsModel; modelID++ )
377
      {
378
        sprintf(text, "rxWin[%d].size=%d from PE%d rxWin[%d].buffer",
379
                modelID, rxWin[modelID].size, modelID, modelID);
380
        xprintArray(text, rxWin[modelID].buffer,
381
382
                    rxWin[modelID].size / sizeof (double),
                    DATATYPE_FLT);
383
      }
Deike Kleberg's avatar
Deike Kleberg committed
384
  readGetBuffers ( tsID, vdate, vtime );
385
          
386
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
387
}
Deike Kleberg's avatar
Deike Kleberg committed
388
389
390
391
392
393
394
395
396
397
398

/************************************************************************/

/**
  @brief is encapsulated in CDI library and run on I/O PEs.

  @param

  @return
*/

399
void IOServer ()
Deike Kleberg's avatar
Deike Kleberg committed
400
{
401
402
  int source, tag, * iBuffer, size, nProcsModel = commInqNProcsModel ();
  static int nfinished = 0;
Deike Kleberg's avatar
Deike Kleberg committed
403
  char * buffer;
404
405
  MPI_Comm commCalc;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
406

407
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
408

409
  backendInit ();
410
411
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
    backendFinalize ();
412
413
  commCalc = commInqCommCalc ();

Deike Kleberg's avatar
Deike Kleberg committed
414
  for ( ;; )
415
    {
Deike Kleberg's avatar
Deike Kleberg committed
416
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commCalc, &status ));
417
      
Deike Kleberg's avatar
Deike Kleberg committed
418
419
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
420
      
Deike Kleberg's avatar
Deike Kleberg committed
421
      switch ( tag )
422
423
424
        {
        case FINALIZE: 
          xdebugMsg (  tag, source, nfinished );
425
          iBuffer = xmalloc ( sizeof ( int ));     
426
          xmpi ( MPI_Recv ( iBuffer, 1, MPI_INTEGER, source, 
427
                            tag, commCalc, &status ));
428
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"FINALIZE\"");
429
          free ( iBuffer );
430
          nfinished++;
Deike Kleberg's avatar
Deike Kleberg committed
431
          xdebug ( "nfinished=%d, nProcsModel=%d", 
432
433
434
                   nfinished, nProcsModel );
          
          if ( nfinished == nProcsModel )  
Deike Kleberg's avatar
Deike Kleberg committed
435
            {              
436
              {
Deike Kleberg's avatar
Deike Kleberg committed
437
                int nStreams = streamSize ();
438
                
Deike Kleberg's avatar
Deike Kleberg committed
439
440
441
442
443
444
                if ( nStreams > 0 )
                  {
                    int streamNo;
                    int * resHs;
                    
                    resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
445
                    streamGetIndexList ( nStreams, resHs );
Deike Kleberg's avatar
Deike Kleberg committed
446
447
448
449
                    for ( streamNo = 0; streamNo < nStreams; streamNo++ )
                      streamClose ( resHs[streamNo] );
                    free ( resHs );
                  }
Deike Kleberg's avatar
Deike Kleberg committed
450
451
              }
              backendCleanup (); 
452
              serverWinCleanup ();                 
453
              /* listDestroy(); */
454
              xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
455
456
              return;
            }
457
	  
458
459
          break;
          
Deike Kleberg's avatar
Deike Kleberg committed
460
	case RESOURCES:
461
	  xdebugMsg (  tag, source, nfinished );
462
	  xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
463
	  buffer = xmalloc ( size * sizeof ( char ));
464
465
	  xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
                            tag, commCalc, &status ));
466
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"RESOURCES\"");
467
	  rpcUnpackResources ( buffer, size, commCalc );
468
          xdebug("%s", "");
Deike Kleberg's avatar
Deike Kleberg committed
469
	  free ( buffer );
Deike Kleberg's avatar
Deike Kleberg committed
470
          if ( ddebug > 0 && commInqRankGlob () == nProcsModel ) 
471
            reshListPrint ( "reshListIOServer" );
Deike Kleberg's avatar
Deike Kleberg committed
472
	  serverWinCreate ();
473
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
474

475
	case WRITETS:
476
	  xdebugMsg (  tag, source, nfinished );
477
          xmpi ( MPI_Get_count ( &status, MPI_INTEGER, &size ));     
Deike Kleberg's avatar
Deike Kleberg committed
478
          xassert ( size == timestepSize );
479
480
481
          iBuffer = xmalloc ( size * sizeof ( int ));
          xmpi ( MPI_Recv ( iBuffer, size, MPI_INTEGER, source,
                            tag, commCalc, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
482
483
484
          xdebug ( "RECEIVED MESSAGE WITH TAG \"WRITETS\": "
                   "tsID=%d, vdate=%d, vtime=%d, source=%d", 
                   iBuffer[0], iBuffer[1], iBuffer[2], source );
485
486
          
          getData ( iBuffer[0], iBuffer[1], iBuffer[2] );                
487
          free ( iBuffer );
Deike Kleberg's avatar
Deike Kleberg committed
488
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
489

Deike Kleberg's avatar
Deike Kleberg committed
490
	default:
Deike Kleberg's avatar
Deike Kleberg committed
491
	  xabort ( "TAG NOT DEFINED!" );
492
	}
Deike Kleberg's avatar
Deike Kleberg committed
493
494
    }
}
495

496
#endif
497
498
499
500
501
502
503
504
505
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */