pio_server.c 15.9 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
/** @file pio_server.c
*/
3
4
5
6
7
8
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
12
13
14
15
#include "pio_server.h"


#include <stdlib.h>
#include <stdio.h>
#include "limits.h"
#include "cdi.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
17
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
18
#include "pio_rpc.h"
Deike Kleberg's avatar
Deike Kleberg committed
19
#include "pio_util.h"
20
21
#include "stream_int.h"
#include "resource_handle.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
#include "vlist_var.h"
23
24

extern resOps streamOps;
25
extern void arrayDestroy ( void );
Deike Kleberg's avatar
Deike Kleberg committed
26

27
28
29
static struct
{
  size_t size;
30
  unsigned char *buffer, *head;
31
32
} *rxWin = NULL;

Thomas Jahns's avatar
Thomas Jahns committed
33
static MPI_Win getWin = MPI_WIN_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
34
MPI_Group        groupModel    = MPI_GROUP_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
35
36


Deike Kleberg's avatar
Deike Kleberg committed
37
38
/************************************************************************/

39
static
Deike Kleberg's avatar
Deike Kleberg committed
40
41
42
void serverWinCleanup ()
{
  int i;
43
  int nProcsCalc = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
44
  
Deike Kleberg's avatar
Deike Kleberg committed
45
46
47
  if ( getWin != MPI_WIN_NULL )
    xmpi ( MPI_Win_free ( &getWin ));
  
48
49
  if (rxWin)
    {
Deike Kleberg's avatar
Deike Kleberg committed
50
      for ( i = 0; i < nProcsCalc; i++ )
51
52
        free(rxWin[i].buffer);
      free(rxWin);
Deike Kleberg's avatar
Deike Kleberg committed
53
    }
54

55
  xdebug("%s", "cleaned up mpi_win");
Deike Kleberg's avatar
Deike Kleberg committed
56
57
58
}
 
 /************************************************************************/
59

Deike Kleberg's avatar
Deike Kleberg committed
60
static 
Deike Kleberg's avatar
Deike Kleberg committed
61
  void collDefBufferSizes ()
Deike Kleberg's avatar
Deike Kleberg committed
62
{
63
  int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
Deike Kleberg's avatar
Deike Kleberg committed
64
  int modelID, decoChunk, sumGetBufferSizes = 0;
65
  int rankGlob = commInqRankGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
66
  int nProcsModel = commInqNProcsModel ();
67
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
68

69
  xassert(rxWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
70

Deike Kleberg's avatar
Deike Kleberg committed
71
  nstreams = reshCountType ( &streamOps );
72
73
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
74
75
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
76
      // space required for data
77
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
78
79
80
81
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
          iorank = vlistInqVarIOrank ( vlistID, varID );
Deike Kleberg's avatar
Deike Kleberg committed
82
          xassert ( iorank != CDI_UNDEFID );
Deike Kleberg's avatar
Deike Kleberg committed
83
84
          if ( iorank == rankGlob )
            {
Deike Kleberg's avatar
Deike Kleberg committed
85
              for ( modelID = 0; modelID < nProcsModel; modelID++ )
86
                {
87
                  decoChunk =  vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
88
                  xassert ( decoChunk > 0 );
89
90
                  rxWin[modelID].size += decoChunk * sizeof (double)
                    + winBufferOverheadChunk * sizeof (int);
91
                }
Deike Kleberg's avatar
Deike Kleberg committed
92
            }
93
    }
Deike Kleberg's avatar
Deike Kleberg committed
94
95
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose 
      // once per stream and timestep for all collprocs only on the modelproc root
96
97
      rxWin[root].size += 3 * winBufferOverheadFuncCall * sizeof (int)
        + 5 * sizeof (int) + MAXDATAFILENAME;
Deike Kleberg's avatar
Deike Kleberg committed
98
    }
99
  free ( streamIndexList );
Deike Kleberg's avatar
Deike Kleberg committed
100
101

  for ( modelID = 0; modelID < nProcsModel; modelID++ )
102
    {
103
104
      rxWin[modelID].size += winBufferOverhead * sizeof (int);
      sumGetBufferSizes += rxWin[modelID].size;
105
    }
Deike Kleberg's avatar
Deike Kleberg committed
106
  xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
107
  /* xprintArray ( "getBufferSize", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
108
}
109

Deike Kleberg's avatar
Deike Kleberg committed
110
 /************************************************************************/
111
112

static 
Deike Kleberg's avatar
Deike Kleberg committed
113
114
 void serverWinCreate ()
{ 
Deike Kleberg's avatar
Deike Kleberg committed
115
  int ranks[1], modelID;
116
  MPI_Comm commCalc = commInqCommCalc ();
Deike Kleberg's avatar
Deike Kleberg committed
117
  MPI_Group groupCalc;
118
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
119

Deike Kleberg's avatar
Deike Kleberg committed
120
  xmpi ( MPI_Win_create ( MPI_BOTTOM, 0, 1, MPI_INFO_NULL,
121
                          commCalc, &getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
122
123

  /* target group */
124
125
  ranks[0] = nProcsModel;
  xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
Deike Kleberg's avatar
Deike Kleberg committed
126
127
  xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));

128
  rxWin = xmalloc(nProcsModel * sizeof (rxWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
129
  collDefBufferSizes ();
130
  /* xprintArray ( "getBufferSizes", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
131

Deike Kleberg's avatar
Deike Kleberg committed
132
133
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
134
      rxWin[modelID].buffer = xmalloc(rxWin[modelID].size);
135
      rxWin[modelID].head = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
136
    }
Deike Kleberg's avatar
Deike Kleberg committed
137

138
  xdebug("%s", "created mpi_win, allocated getBuffer");
Deike Kleberg's avatar
Deike Kleberg committed
139
140
}

Deike Kleberg's avatar
Deike Kleberg committed
141
142
/************************************************************************/

143
static
Deike Kleberg's avatar
Deike Kleberg committed
144
145
  void getBufferGetFromEnd ( const char * caller, int line,  
                             int ID, void * argBuffer, size_t size )
146
{
Deike Kleberg's avatar
Deike Kleberg committed
147
148
  char text[1024];

149
  if (rxWin == NULL ||
Deike Kleberg's avatar
Deike Kleberg committed
150
151
152
       argBuffer     == NULL ||
       size           < 0    ||
       ID             < 0    ||
153
       ID             >= commInqNProcsModel () ||
154
       rxWin[ID].head - rxWin[ID].buffer + size > rxWin[ID].size)
Deike Kleberg's avatar
Deike Kleberg committed
155
    {
156
      sprintf ( text, "caller: %s, line %d, ID = %d, nProcsModel=%d, size = %d, "
157
                "rxWin[%d].head = %d, rxWin[%d].size = %d",
158
                caller, line, ID, size, ID, commInqNProcsModel (), 
159
                rxWin[ID].head - rxWin[ID].buffer,
160
                ID, rxWin[ID].size);
Deike Kleberg's avatar
Deike Kleberg committed
161
      xabort ( text );
162
    }  
163
164
  memcpy ( argBuffer, rxWin[ID].head, size );
  rxWin[ID].head += size;
Deike Kleberg's avatar
Deike Kleberg committed
165
166
167
168
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
169
170
171
172
173
174
175
176
static
  void readFuncCall ( void )
{
  int funcID, tokenID;
  int root = commInqRootGlob ();

  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &funcID, sizeof ( funcID ));
Deike Kleberg's avatar
Deike Kleberg committed
177
  xassert ( funcID >= MINFUNCID && funcID <= MAXFUNCID );
Deike Kleberg's avatar
Deike Kleberg committed
178
179
180
  
  switch ( funcID )
    {
181
182
183
184
185
186
187
    case STREAMCLOSE:
      {
        int streamID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( streamID ));
        streamClose ( streamID );
188
189
190
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " closed stream", 
                 funcMap[funcID], streamID );
191
192
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
193
    case STREAMOPEN:
194
195
196
      {
        char * filename, * endname = "\0";
        size_t filenamesz;
197
        int filetype, streamID;
198
199
200

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filenamesz, sizeof ( filenamesz ));
Deike Kleberg's avatar
Deike Kleberg committed
201
        xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
202
203
204
205
206
207
        filename = xmalloc ( filenamesz + 1 );
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, filename, filenamesz );
        memcpy ( filename + filenamesz, endname, 1 );
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filetype, sizeof ( filetype ));
208
        xassert ( filetype >= MINFILETYPE && filetype <= MAXFILETYPE );
209
210
        streamID = streamOpenWrite ( filename, filetype );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, filenamesz=%d,"
211
                 " filename=%s, filetype=%d, OPENED STREAM %d", 
212
                 funcMap[funcID], filenamesz, filename, filetype, streamID );
213
        free ( filename );
214
      }
Deike Kleberg's avatar
Deike Kleberg committed
215
      break; 
216
217
218
219
220
221
222
223
224
225
226
227
228
229
    case STREAMDEFVLIST:
      {
        int streamID, vlistID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( vlistID ));
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &vlistID, sizeof ( vlistID ));
        streamDefVlist ( streamID, vlistID );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " vlistID=%d, called streamDefVlist ().", 
                 funcMap[funcID], streamID, vlistID );
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
230
    default:
231
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
Deike Kleberg's avatar
Deike Kleberg committed
232
233
234
    }
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
Deike Kleberg's avatar
Deike Kleberg committed
235
  xassert ( tokenID == SEPARATOR );
Deike Kleberg's avatar
Deike Kleberg committed
236
237
238
239
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
240
241
242
243
244
245
246
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
  int modelID;
  double * data = NULL, * dataHead = NULL;
  int streamID = CDI_UNDEFID, streamIDNew = CDI_UNDEFID;
  int tempID, varID, vlistID, taxisID;
247
  int size, offset, chunk, chunkSize;
Deike Kleberg's avatar
Deike Kleberg committed
248
  int tokenID, tokenID2;
249
250
251
  
  int nmiss = 0;
  char text[1024];
252
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
253
  int root        = commInqRootGlob ();
254
  
255
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
256
257
258
  
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
259

Deike Kleberg's avatar
Deike Kleberg committed
260
  while ( tokenID != END )
261
    {
Deike Kleberg's avatar
Deike Kleberg committed
262
      switch ( tokenID )
263
        {
264
        case DATATOKEN:
Deike Kleberg's avatar
Deike Kleberg committed
265
266
267
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &streamIDNew, sizeof ( streamID ));
          if ( streamIDNew != streamID )
268
            {
Deike Kleberg's avatar
Deike Kleberg committed
269
              streamID = streamIDNew;
270
271
              vlistID = streamInqVlist ( streamID );
              taxisID = vlistInqTaxis ( vlistID );
272
              taxisDefVdate ( taxisID, vdate );
273
274
275
              taxisDefVtime ( taxisID, vtime );
              streamDefTimestep ( streamID, tsID );
            }
Deike Kleberg's avatar
Deike Kleberg committed
276
277
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &varID, sizeof ( varID ));
278
279
280
281
          size = vlistInqVarSize ( vlistID, varID );
          data = xmalloc ( size * sizeof ( double ));
          dataHead = data;
          
Deike Kleberg's avatar
Deike Kleberg committed
282
          for ( modelID = 0; modelID < nProcsModel; modelID++ )
283
            {
Deike Kleberg's avatar
Deike Kleberg committed
284
              if ( modelID != root )
285
                {
Deike Kleberg's avatar
Deike Kleberg committed
286
287
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
288
                  xassert ( tempID == DATATOKEN );
Deike Kleberg's avatar
Deike Kleberg committed
289
290
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
291
                  xassert ( tempID == streamID );
Deike Kleberg's avatar
Deike Kleberg committed
292
293
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
294
                  xassert ( tempID == varID );
295
                }
296
              chunk  = vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
297
298
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, dataHead, chunk * sizeof ( double ));
299
              dataHead += chunk;
Deike Kleberg's avatar
Deike Kleberg committed
300
301
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &nmiss   , sizeof ( nmiss ));
Deike Kleberg's avatar
Deike Kleberg committed
302
303
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &tokenID2, sizeof ( tokenID2 ));
Deike Kleberg's avatar
Deike Kleberg committed
304
              xassert ( tokenID2 == SEPARATOR );
305
306
            }
          
307
          streamWriteVar ( streamID, varID, data, nmiss );
308
          
309
310
311
312
313
314
315
316
317
          if ( ddebug > 2 )
            {
              sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
              xprintArray ( text, data, size, DATATYPE_FLT );
            }
          
          free ( data );
          break;
        case FUNCCALL:
Deike Kleberg's avatar
Deike Kleberg committed
318
          readFuncCall ();
319
320
          break;
        default:
321
          xabort ( "BUFFER NOT READABLE!" );           
322
        }
Deike Kleberg's avatar
Deike Kleberg committed
323
324
      getBufferGetFromEnd ( __func__, __LINE__, 
                            root, &tokenID, sizeof ( tokenID ));
325
    }
326
  xdebug("%s", "RETURN");
327
328
329
330
} 

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
331
332
333
334
335
336

static 
  void getFlushBuffer ( int modelID )
{
  int nProcsModel = commInqNProcsModel ();

Deike Kleberg's avatar
Deike Kleberg committed
337
338
  xassert ( modelID                >= 0           &&
            modelID                 < nProcsModel &&
339
            rxWin != NULL && rxWin[modelID].buffer != NULL &&
340
341
            rxWin[modelID].size > 0 &&
            rxWin[modelID].size <= MAXWINBUFFERSIZE );
342
  memset(rxWin[modelID].buffer, 0, rxWin[modelID].size);
343
  rxWin[modelID].head = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
344
345
346
347
348
349
}


/************************************************************************/


350
static
351
void getData ( int tsID, int vdate, int vtime )
Deike Kleberg's avatar
Deike Kleberg committed
352
{
Deike Kleberg's avatar
Deike Kleberg committed
353
  int iAssert   = 0, modelID;
354
  char text[1024];
355
  int nProcsModel = commInqNProcsModel ();
Thomas Jahns's avatar
Thomas Jahns committed
356
357
  void *getWinBaseAddr;
  int attrFound;
358
          
359
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
360
361

  // todo put in correct lbs and ubs
Deike Kleberg's avatar
Deike Kleberg committed
362
  xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
363
  xmpi ( MPI_Win_start ( groupModel, iAssert, getWin )); 
Deike Kleberg's avatar
Deike Kleberg committed
364
365
366
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
      getFlushBuffer ( modelID );
Thomas Jahns's avatar
Thomas Jahns committed
367
368
      xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
      xassert(attrFound);
369
      xdebug("modelID=%d, nProcsModel=%d, rxWin[%d].size=%d,"
Thomas Jahns's avatar
Thomas Jahns committed
370
             " getWin=%p, sizeof(int)=%u",
371
             modelID, nProcsModel, modelID, rxWin[modelID].size,
Thomas Jahns's avatar
Thomas Jahns committed
372
             getWinBaseAddr, (unsigned)sizeof(int));
373
374
375
      xmpi(MPI_Get(rxWin[modelID].buffer, rxWin[modelID].size,
                   MPI_UNSIGNED_CHAR, modelID, 0,
                   rxWin[modelID].size, MPI_UNSIGNED_CHAR, getWin));
Deike Kleberg's avatar
Deike Kleberg committed
376
    }
377
  xmpi ( MPI_Win_complete ( getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
378

379
  if ( ddebug > 2 )
Deike Kleberg's avatar
Deike Kleberg committed
380
    for ( modelID = 0; modelID < nProcsModel; modelID++ )
381
      {
382
        sprintf(text, "rxWin[%d].size=%d from PE%d rxWin[%d].buffer",
383
                modelID, rxWin[modelID].size, modelID, modelID);
384
        xprintArray(text, rxWin[modelID].buffer,
385
386
                    rxWin[modelID].size / sizeof (double),
                    DATATYPE_FLT);
387
      }
Deike Kleberg's avatar
Deike Kleberg committed
388
  readGetBuffers ( tsID, vdate, vtime );
389
          
390
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
391
}
Deike Kleberg's avatar
Deike Kleberg committed
392
393
394
395
396
397
398
399
400
401
402

/************************************************************************/

/**
  @brief is encapsulated in CDI library and run on I/O PEs.

  @param

  @return
*/

403
void IOServer ()
Deike Kleberg's avatar
Deike Kleberg committed
404
{
405
406
  int source, tag, * iBuffer, size, nProcsModel = commInqNProcsModel ();
  static int nfinished = 0;
Deike Kleberg's avatar
Deike Kleberg committed
407
  char * buffer;
408
409
  MPI_Comm commCalc;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
410

411
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
412

413
  backendInit ();
414
415
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
    backendFinalize ();
416
417
  commCalc = commInqCommCalc ();

Deike Kleberg's avatar
Deike Kleberg committed
418
  for ( ;; )
419
    {
Deike Kleberg's avatar
Deike Kleberg committed
420
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commCalc, &status ));
421
      
Deike Kleberg's avatar
Deike Kleberg committed
422
423
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
424
      
Deike Kleberg's avatar
Deike Kleberg committed
425
      switch ( tag )
426
427
428
        {
        case FINALIZE: 
          xdebugMsg (  tag, source, nfinished );
429
          iBuffer = xmalloc ( sizeof ( int ));     
430
          xmpi ( MPI_Recv ( iBuffer, 1, MPI_INTEGER, source, 
431
                            tag, commCalc, &status ));
432
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"FINALIZE\"");
433
          free ( iBuffer );
434
          nfinished++;
Deike Kleberg's avatar
Deike Kleberg committed
435
          xdebug ( "nfinished=%d, nProcsModel=%d", 
436
437
438
                   nfinished, nProcsModel );
          
          if ( nfinished == nProcsModel )  
Deike Kleberg's avatar
Deike Kleberg committed
439
            {              
440
              {
Deike Kleberg's avatar
Deike Kleberg committed
441
                int nStreams = streamSize ();
442
                
Deike Kleberg's avatar
Deike Kleberg committed
443
444
445
446
447
448
                if ( nStreams > 0 )
                  {
                    int streamNo;
                    int * resHs;
                    
                    resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
449
                    streamGetIndexList ( nStreams, resHs );
Deike Kleberg's avatar
Deike Kleberg committed
450
451
452
453
                    for ( streamNo = 0; streamNo < nStreams; streamNo++ )
                      streamClose ( resHs[streamNo] );
                    free ( resHs );
                  }
Deike Kleberg's avatar
Deike Kleberg committed
454
455
              }
              backendCleanup (); 
456
              serverWinCleanup ();                 
457
              /* listDestroy(); */
458
              xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
459
460
              return;
            }
461
	  
462
463
          break;
          
Deike Kleberg's avatar
Deike Kleberg committed
464
	case RESOURCES:
465
	  xdebugMsg (  tag, source, nfinished );
466
	  xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
467
	  buffer = xmalloc ( size * sizeof ( char ));
468
469
	  xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
                            tag, commCalc, &status ));
470
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"RESOURCES\"");
471
	  rpcUnpackResources ( buffer, size, commCalc );
472
          xdebug("%s", "");
Deike Kleberg's avatar
Deike Kleberg committed
473
	  free ( buffer );
Deike Kleberg's avatar
Deike Kleberg committed
474
          if ( ddebug > 0 && commInqRankGlob () == nProcsModel ) 
475
            reshListPrint ( "reshListIOServer" );
Deike Kleberg's avatar
Deike Kleberg committed
476
	  serverWinCreate ();
477
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
478

479
	case WRITETS:
480
	  xdebugMsg (  tag, source, nfinished );
481
          xmpi ( MPI_Get_count ( &status, MPI_INTEGER, &size ));     
Deike Kleberg's avatar
Deike Kleberg committed
482
          xassert ( size == timestepSize );
483
484
485
          iBuffer = xmalloc ( size * sizeof ( int ));
          xmpi ( MPI_Recv ( iBuffer, size, MPI_INTEGER, source,
                            tag, commCalc, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
486
487
488
          xdebug ( "RECEIVED MESSAGE WITH TAG \"WRITETS\": "
                   "tsID=%d, vdate=%d, vtime=%d, source=%d", 
                   iBuffer[0], iBuffer[1], iBuffer[2], source );
489
490
          
          getData ( iBuffer[0], iBuffer[1], iBuffer[2] );                
491
          free ( iBuffer );
Deike Kleberg's avatar
Deike Kleberg committed
492
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
493

Deike Kleberg's avatar
Deike Kleberg committed
494
	default:
Deike Kleberg's avatar
Deike Kleberg committed
495
	  xabort ( "TAG NOT DEFINED!" );
496
	}
Deike Kleberg's avatar
Deike Kleberg committed
497
498
    }
}
499

500
#endif
501
502
503
504
505
506
507
508
509
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */