pio_server.c 16.2 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
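/** @file pio_server.c
 *  @brief Server side of the MPI parallel I/O: receive buffers, one-sided
 *         data retrieval and the IOServer message loop.
 */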
3
4
5
6
7
8
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
12
13
14
15
#include "pio_server.h"


#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "limits.h"
#include "cdi.h"
#include "pio.h"
#include "pio_comm.h"
#include "pio_rpc.h"
#include "pio_util.h"
#include "stream_int.h"
#include "resource_handle.h"
#include "vlist_var.h"
23
24

extern resOps streamOps;
25
extern void arrayDestroy ( void );
Deike Kleberg's avatar
Deike Kleberg committed
26

27
28
29
static struct
{
  size_t size;
30
  unsigned char *buffer;
31
32
} *rxWin = NULL;

Deike Kleberg's avatar
Deike Kleberg committed
33
unsigned char ** getBufferHead = NULL;
Thomas Jahns's avatar
Thomas Jahns committed
34
static MPI_Win getWin = MPI_WIN_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
35
MPI_Group        groupModel    = MPI_GROUP_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
36
37


Deike Kleberg's avatar
Deike Kleberg committed
38
39
/************************************************************************/

40
static
Deike Kleberg's avatar
Deike Kleberg committed
41
42
43
void serverWinCleanup ()
{
  int i;
44
  int nProcsCalc = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
45
  
Deike Kleberg's avatar
Deike Kleberg committed
46
47
48
  if ( getWin != MPI_WIN_NULL )
    xmpi ( MPI_Win_free ( &getWin ));
  
49
50
  if (rxWin)
    {
Deike Kleberg's avatar
Deike Kleberg committed
51
      for ( i = 0; i < nProcsCalc; i++ )
52
53
        free(rxWin[i].buffer);
      free(rxWin);
Deike Kleberg's avatar
Deike Kleberg committed
54
    }
Deike Kleberg's avatar
Deike Kleberg committed
55
  if ( getBufferHead ) free ( getBufferHead );
Deike Kleberg's avatar
Deike Kleberg committed
56
  
57
  xdebug("%s", "cleaned up mpi_win");
Deike Kleberg's avatar
Deike Kleberg committed
58
59
60
}
 
 /************************************************************************/
61

Deike Kleberg's avatar
Deike Kleberg committed
62
static 
Deike Kleberg's avatar
Deike Kleberg committed
63
  void collDefBufferSizes ()
Deike Kleberg's avatar
Deike Kleberg committed
64
{
65
  int nstreams, * streamIndexList, streamNo, vlistID, nvars, varID, iorank;
Deike Kleberg's avatar
Deike Kleberg committed
66
  int modelID, decoChunk, sumGetBufferSizes = 0;
67
  int rankGlob = commInqRankGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
68
  int nProcsModel = commInqNProcsModel ();
69
  int root = commInqRootGlob ();
Deike Kleberg's avatar
Deike Kleberg committed
70

71
  xassert(rxWin != NULL);
Deike Kleberg's avatar
Deike Kleberg committed
72

Deike Kleberg's avatar
Deike Kleberg committed
73
  nstreams = reshCountType ( &streamOps );
74
75
  streamIndexList = xmalloc ( nstreams * sizeof ( streamIndexList[0] ));
  reshGetResHListOfType ( nstreams, streamIndexList, &streamOps );
Deike Kleberg's avatar
Deike Kleberg committed
76
77
  for ( streamNo = 0; streamNo < nstreams; streamNo++ )
    {
78
      // space required for data
79
      vlistID = streamInqVlist ( streamIndexList[streamNo] );
Deike Kleberg's avatar
Deike Kleberg committed
80
81
82
83
      nvars = vlistNvars ( vlistID );
      for ( varID = 0; varID < nvars; varID++ )
        {
          iorank = vlistInqVarIOrank ( vlistID, varID );
Deike Kleberg's avatar
Deike Kleberg committed
84
          xassert ( iorank != CDI_UNDEFID );
Deike Kleberg's avatar
Deike Kleberg committed
85
86
          if ( iorank == rankGlob )
            {
Deike Kleberg's avatar
Deike Kleberg committed
87
              for ( modelID = 0; modelID < nProcsModel; modelID++ )
88
                {
89
                  decoChunk =  vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
90
                  xassert ( decoChunk > 0 );
91
92
                  rxWin[modelID].size += decoChunk * sizeof (double)
                    + winBufferOverheadChunk * sizeof (int);
93
                }
Deike Kleberg's avatar
Deike Kleberg committed
94
            }
95
    }
Deike Kleberg's avatar
Deike Kleberg committed
96
97
      // space required for the 3 function calls streamOpen, streamDefVlist, streamClose 
      // once per stream and timestep for all collprocs only on the modelproc root
98
99
      rxWin[root].size += 3 * winBufferOverheadFuncCall * sizeof (int)
        + 5 * sizeof (int) + MAXDATAFILENAME;
Deike Kleberg's avatar
Deike Kleberg committed
100
    }
101
  free ( streamIndexList );
Deike Kleberg's avatar
Deike Kleberg committed
102
103

  for ( modelID = 0; modelID < nProcsModel; modelID++ )
104
    {
105
106
      rxWin[modelID].size += winBufferOverhead * sizeof (int);
      sumGetBufferSizes += rxWin[modelID].size;
107
    }
Deike Kleberg's avatar
Deike Kleberg committed
108
  xassert ( sumGetBufferSizes <= MAXWINBUFFERSIZE );
109
  /* xprintArray ( "getBufferSize", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
110
}
111

Deike Kleberg's avatar
Deike Kleberg committed
112
 /************************************************************************/
113
114

static 
Deike Kleberg's avatar
Deike Kleberg committed
115
116
 void serverWinCreate ()
{ 
Deike Kleberg's avatar
Deike Kleberg committed
117
  int ranks[1], modelID;
118
  MPI_Comm commCalc = commInqCommCalc ();
Deike Kleberg's avatar
Deike Kleberg committed
119
  MPI_Group groupCalc;
120
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
121

Deike Kleberg's avatar
Deike Kleberg committed
122
  xmpi ( MPI_Win_create ( MPI_BOTTOM, 0, 1, MPI_INFO_NULL,
123
                          commCalc, &getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
124
125

  /* target group */
126
127
  ranks[0] = nProcsModel;
  xmpi ( MPI_Comm_group ( commCalc, &groupCalc ));
Deike Kleberg's avatar
Deike Kleberg committed
128
129
  xmpi ( MPI_Group_excl ( groupCalc, 1, ranks, &groupModel ));

130
  rxWin = xmalloc(nProcsModel * sizeof (rxWin[0]));
Deike Kleberg's avatar
Deike Kleberg committed
131
  collDefBufferSizes ();
132
  /* xprintArray ( "getBufferSizes", getBufferSize, nProcsModel, DATATYPE_INT ); */
Deike Kleberg's avatar
Deike Kleberg committed
133

Deike Kleberg's avatar
Deike Kleberg committed
134
135
136
  getBufferHead = xmalloc ( nProcsModel * sizeof ( getBufferHead[0] ));
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
137
138
      rxWin[modelID].buffer = xmalloc(rxWin[modelID].size);
      getBufferHead[modelID] = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
139
    }
Deike Kleberg's avatar
Deike Kleberg committed
140

141
  xdebug("%s", "created mpi_win, allocated getBuffer");
Deike Kleberg's avatar
Deike Kleberg committed
142
143
}

Deike Kleberg's avatar
Deike Kleberg committed
144
145
/************************************************************************/

146
static
Deike Kleberg's avatar
Deike Kleberg committed
147
148
  void getBufferGetFromEnd ( const char * caller, int line,  
                             int ID, void * argBuffer, size_t size )
149
{
Deike Kleberg's avatar
Deike Kleberg committed
150
151
152
153
154
155
  char text[1024];

  if ( getBufferHead == NULL ||
       argBuffer     == NULL ||
       size           < 0    ||
       ID             < 0    ||
156
       ID             >= commInqNProcsModel () ||
157
       getBufferHead[ID] - rxWin[ID].buffer + size > rxWin[ID].size)
Deike Kleberg's avatar
Deike Kleberg committed
158
    {
159
      sprintf ( text, "caller: %s, line %d, ID = %d, nProcsModel=%d, size = %d, "
160
                "getBufferHead[%d] = %d, rxWin[%d].size = %d",
161
                caller, line, ID, size, ID, commInqNProcsModel (), 
162
                getBufferHead[ID] - rxWin[ID].buffer,
163
                ID, rxWin[ID].size);
Deike Kleberg's avatar
Deike Kleberg committed
164
      xabort ( text );
165
    }  
Deike Kleberg's avatar
Deike Kleberg committed
166
167
168
169
170
171
  memcpy ( argBuffer, getBufferHead[ID], size );
  getBufferHead[ID] += size;
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
172
173
174
175
176
177
178
179
static
  void readFuncCall ( void )
{
  int funcID, tokenID;
  int root = commInqRootGlob ();

  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &funcID, sizeof ( funcID ));
Deike Kleberg's avatar
Deike Kleberg committed
180
  xassert ( funcID >= MINFUNCID && funcID <= MAXFUNCID );
Deike Kleberg's avatar
Deike Kleberg committed
181
182
183
  
  switch ( funcID )
    {
184
185
186
187
188
189
190
    case STREAMCLOSE:
      {
        int streamID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( streamID ));
        streamClose ( streamID );
191
192
193
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " closed stream", 
                 funcMap[funcID], streamID );
194
195
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
196
    case STREAMOPEN:
197
198
199
      {
        char * filename, * endname = "\0";
        size_t filenamesz;
200
        int filetype, streamID;
201
202
203

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filenamesz, sizeof ( filenamesz ));
Deike Kleberg's avatar
Deike Kleberg committed
204
        xassert ( filenamesz > 0 && filenamesz < MAXDATAFILENAME );
205
206
207
208
209
210
        filename = xmalloc ( filenamesz + 1 );
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, filename, filenamesz );
        memcpy ( filename + filenamesz, endname, 1 );
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &filetype, sizeof ( filetype ));
211
        xassert ( filetype >= MINFILETYPE && filetype <= MAXFILETYPE );
212
213
        streamID = streamOpenWrite ( filename, filetype );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, filenamesz=%d,"
214
                 " filename=%s, filetype=%d, OPENED STREAM %d", 
215
                 funcMap[funcID], filenamesz, filename, filetype, streamID );
216
        free ( filename );
217
      }
Deike Kleberg's avatar
Deike Kleberg committed
218
      break; 
219
220
221
222
223
224
225
226
227
228
229
230
231
232
    case STREAMDEFVLIST:
      {
        int streamID, vlistID;

        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &streamID, sizeof ( vlistID ));
        getBufferGetFromEnd ( __func__, __LINE__, 
                              root, &vlistID, sizeof ( vlistID ));
        streamDefVlist ( streamID, vlistID );
        xdebug ( "READ FUNCTION CALL FROM WIN:  %s, streamID=%d,"
                 " vlistID=%d, called streamDefVlist ().", 
                 funcMap[funcID], streamID, vlistID );
      }
      break;
Deike Kleberg's avatar
Deike Kleberg committed
233
    default:
234
      xabort ( "REMOTE FUNCTIONCALL NOT IMPLEMENTED!" );
Deike Kleberg's avatar
Deike Kleberg committed
235
236
237
    }
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
Deike Kleberg's avatar
Deike Kleberg committed
238
  xassert ( tokenID == SEPARATOR );
Deike Kleberg's avatar
Deike Kleberg committed
239
240
241
242
}

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
243
244
245
246
247
248
249
static
void readGetBuffers ( int tsID, int vdate, int vtime )
{
  int modelID;
  double * data = NULL, * dataHead = NULL;
  int streamID = CDI_UNDEFID, streamIDNew = CDI_UNDEFID;
  int tempID, varID, vlistID, taxisID;
250
  int size, offset, chunk, chunkSize;
Deike Kleberg's avatar
Deike Kleberg committed
251
  int tokenID, tokenID2;
252
253
254
  
  int nmiss = 0;
  char text[1024];
255
  int nProcsModel = commInqNProcsModel ();
Deike Kleberg's avatar
Deike Kleberg committed
256
  int root        = commInqRootGlob ();
257
  
258
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
259
260
261
  
  getBufferGetFromEnd ( __func__, __LINE__, 
                        root, &tokenID, sizeof ( tokenID ));
262

Deike Kleberg's avatar
Deike Kleberg committed
263
  while ( tokenID != END )
264
    {
Deike Kleberg's avatar
Deike Kleberg committed
265
      switch ( tokenID )
266
        {
267
        case DATATOKEN:
Deike Kleberg's avatar
Deike Kleberg committed
268
269
270
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &streamIDNew, sizeof ( streamID ));
          if ( streamIDNew != streamID )
271
            {
Deike Kleberg's avatar
Deike Kleberg committed
272
              streamID = streamIDNew;
273
274
              vlistID = streamInqVlist ( streamID );
              taxisID = vlistInqTaxis ( vlistID );
275
              taxisDefVdate ( taxisID, vdate );
276
277
278
              taxisDefVtime ( taxisID, vtime );
              streamDefTimestep ( streamID, tsID );
            }
Deike Kleberg's avatar
Deike Kleberg committed
279
280
          getBufferGetFromEnd ( __func__, __LINE__, 
                                root, &varID, sizeof ( varID ));
281
282
283
284
          size = vlistInqVarSize ( vlistID, varID );
          data = xmalloc ( size * sizeof ( double ));
          dataHead = data;
          
Deike Kleberg's avatar
Deike Kleberg committed
285
          for ( modelID = 0; modelID < nProcsModel; modelID++ )
286
            {
Deike Kleberg's avatar
Deike Kleberg committed
287
              if ( modelID != root )
288
                {
Deike Kleberg's avatar
Deike Kleberg committed
289
290
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
291
                  xassert ( tempID == DATATOKEN );
Deike Kleberg's avatar
Deike Kleberg committed
292
293
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
294
                  xassert ( tempID == streamID );
Deike Kleberg's avatar
Deike Kleberg committed
295
296
                  getBufferGetFromEnd ( __func__, __LINE__, 
                                        modelID, &tempID, sizeof ( tempID ));
Deike Kleberg's avatar
Deike Kleberg committed
297
                  xassert ( tempID == varID );
298
                }
299
              chunk  = vlistInqVarDecoChunk ( vlistID, varID, modelID );
Deike Kleberg's avatar
Deike Kleberg committed
300
301
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, dataHead, chunk * sizeof ( double ));
302
              dataHead += chunk;
Deike Kleberg's avatar
Deike Kleberg committed
303
304
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &nmiss   , sizeof ( nmiss ));
Deike Kleberg's avatar
Deike Kleberg committed
305
306
              getBufferGetFromEnd ( __func__, __LINE__, 
                                    modelID, &tokenID2, sizeof ( tokenID2 ));
Deike Kleberg's avatar
Deike Kleberg committed
307
              xassert ( tokenID2 == SEPARATOR );
308
309
            }
          
310
          streamWriteVar ( streamID, varID, data, nmiss );
311
          
312
313
314
315
316
317
318
319
320
          if ( ddebug > 2 )
            {
              sprintf ( text, "streamID=%d, var[%d], size=%d", streamID, varID, size );
              xprintArray ( text, data, size, DATATYPE_FLT );
            }
          
          free ( data );
          break;
        case FUNCCALL:
Deike Kleberg's avatar
Deike Kleberg committed
321
          readFuncCall ();
322
323
          break;
        default:
324
          xabort ( "BUFFER NOT READABLE!" );           
325
        }
Deike Kleberg's avatar
Deike Kleberg committed
326
327
      getBufferGetFromEnd ( __func__, __LINE__, 
                            root, &tokenID, sizeof ( tokenID ));
328
    }
329
  xdebug("%s", "RETURN");
330
331
332
333
} 

/************************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
334
335
336
337
338
339

static 
  void getFlushBuffer ( int modelID )
{
  int nProcsModel = commInqNProcsModel ();

Deike Kleberg's avatar
Deike Kleberg committed
340
341
  xassert ( modelID                >= 0           &&
            modelID                 < nProcsModel &&
342
            rxWin != NULL && rxWin[modelID].buffer != NULL &&
Deike Kleberg's avatar
Deike Kleberg committed
343
            getBufferHead          != NULL        &&
344
345
            rxWin[modelID].size > 0 &&
            rxWin[modelID].size <= MAXWINBUFFERSIZE );
346
347
  memset(rxWin[modelID].buffer, 0, rxWin[modelID].size);
  getBufferHead[modelID] = rxWin[modelID].buffer;
Deike Kleberg's avatar
Deike Kleberg committed
348
349
350
351
352
353
}


/************************************************************************/


354
static
355
void getData ( int tsID, int vdate, int vtime )
Deike Kleberg's avatar
Deike Kleberg committed
356
{
Deike Kleberg's avatar
Deike Kleberg committed
357
  int iAssert   = 0, modelID;
358
  char text[1024];
359
  int nProcsModel = commInqNProcsModel ();
Thomas Jahns's avatar
Thomas Jahns committed
360
361
  void *getWinBaseAddr;
  int attrFound;
362
          
363
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
364
365

  // todo put in correct lbs and ubs
Deike Kleberg's avatar
Deike Kleberg committed
366
  xassert ( tsID >= 0 && vdate >= 0 && vtime >= 0 );
367
  xmpi ( MPI_Win_start ( groupModel, iAssert, getWin )); 
Deike Kleberg's avatar
Deike Kleberg committed
368
369
370
  for ( modelID = 0; modelID < nProcsModel; modelID++ )
    {
      getFlushBuffer ( modelID );
Thomas Jahns's avatar
Thomas Jahns committed
371
372
      xmpi(MPI_Win_get_attr(getWin, MPI_WIN_BASE, &getWinBaseAddr, &attrFound));
      xassert(attrFound);
373
      xdebug("modelID=%d, nProcsModel=%d, rxWin[%d].size=%d,"
Thomas Jahns's avatar
Thomas Jahns committed
374
             " getWin=%p, sizeof(int)=%u",
375
             modelID, nProcsModel, modelID, rxWin[modelID].size,
Thomas Jahns's avatar
Thomas Jahns committed
376
             getWinBaseAddr, (unsigned)sizeof(int));
377
378
379
      xmpi(MPI_Get(rxWin[modelID].buffer, rxWin[modelID].size,
                   MPI_UNSIGNED_CHAR, modelID, 0,
                   rxWin[modelID].size, MPI_UNSIGNED_CHAR, getWin));
Deike Kleberg's avatar
Deike Kleberg committed
380
    }
381
  xmpi ( MPI_Win_complete ( getWin ));
Deike Kleberg's avatar
Deike Kleberg committed
382

383
  if ( ddebug > 2 )
Deike Kleberg's avatar
Deike Kleberg committed
384
    for ( modelID = 0; modelID < nProcsModel; modelID++ )
385
      {
386
        sprintf(text, "rxWin[%d].size=%d from PE%d rxWin[%d].buffer",
387
                modelID, rxWin[modelID].size, modelID, modelID);
388
        xprintArray(text, rxWin[modelID].buffer,
389
390
                    rxWin[modelID].size / sizeof (double),
                    DATATYPE_FLT);
391
      }
Deike Kleberg's avatar
Deike Kleberg committed
392
  readGetBuffers ( tsID, vdate, vtime );
393
          
394
  xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
395
}
Deike Kleberg's avatar
Deike Kleberg committed
396
397
398
399
400
401
402
403
404
405
406

/************************************************************************/

/**
  @brief is encapsulated in CDI library and run on I/O PEs.

  @param

  @return
*/

407
void IOServer ()
Deike Kleberg's avatar
Deike Kleberg committed
408
{
409
410
  int source, tag, * iBuffer, size, nProcsModel = commInqNProcsModel ();
  static int nfinished = 0;
Deike Kleberg's avatar
Deike Kleberg committed
411
  char * buffer;
412
413
  MPI_Comm commCalc;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
414

415
  xdebug("%s", "START");
Deike Kleberg's avatar
Deike Kleberg committed
416

417
  backendInit ();
418
419
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
    backendFinalize ();
420
421
  commCalc = commInqCommCalc ();

Deike Kleberg's avatar
Deike Kleberg committed
422
  for ( ;; )
423
    {
Deike Kleberg's avatar
Deike Kleberg committed
424
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commCalc, &status ));
425
      
Deike Kleberg's avatar
Deike Kleberg committed
426
427
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
428
      
Deike Kleberg's avatar
Deike Kleberg committed
429
      switch ( tag )
430
431
432
        {
        case FINALIZE: 
          xdebugMsg (  tag, source, nfinished );
433
          iBuffer = xmalloc ( sizeof ( int ));     
434
          xmpi ( MPI_Recv ( iBuffer, 1, MPI_INTEGER, source, 
435
                            tag, commCalc, &status ));
436
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"FINALIZE\"");
437
          free ( iBuffer );
438
          nfinished++;
Deike Kleberg's avatar
Deike Kleberg committed
439
          xdebug ( "nfinished=%d, nProcsModel=%d", 
440
441
442
                   nfinished, nProcsModel );
          
          if ( nfinished == nProcsModel )  
Deike Kleberg's avatar
Deike Kleberg committed
443
            {              
444
              {
Deike Kleberg's avatar
Deike Kleberg committed
445
                int nStreams = streamSize ();
446
                
Deike Kleberg's avatar
Deike Kleberg committed
447
448
449
450
451
452
                if ( nStreams > 0 )
                  {
                    int streamNo;
                    int * resHs;
                    
                    resHs       = xmalloc ( nStreams * sizeof ( resHs[0] ));
453
                    streamGetIndexList ( nStreams, resHs );
Deike Kleberg's avatar
Deike Kleberg committed
454
455
456
457
                    for ( streamNo = 0; streamNo < nStreams; streamNo++ )
                      streamClose ( resHs[streamNo] );
                    free ( resHs );
                  }
Deike Kleberg's avatar
Deike Kleberg committed
458
459
              }
              backendCleanup (); 
460
              serverWinCleanup ();                 
461
              /* listDestroy(); */
462
              xdebug("%s", "RETURN");
Deike Kleberg's avatar
Deike Kleberg committed
463
464
              return;
            }
465
	  
466
467
          break;
          
Deike Kleberg's avatar
Deike Kleberg committed
468
	case RESOURCES:
469
	  xdebugMsg (  tag, source, nfinished );
470
	  xmpi ( MPI_Get_count ( &status, MPI_CHAR, &size ));
471
	  buffer = xmalloc ( size * sizeof ( char ));
472
473
	  xmpi ( MPI_Recv ( buffer, size, MPI_PACKED, source,
                            tag, commCalc, &status ));
474
          xdebug("%s", "RECEIVED MESSAGE WITH TAG \"RESOURCES\"");
475
	  rpcUnpackResources ( buffer, size, commCalc );
476
          xdebug("%s", "");
Deike Kleberg's avatar
Deike Kleberg committed
477
	  free ( buffer );
Deike Kleberg's avatar
Deike Kleberg committed
478
          if ( ddebug > 0 && commInqRankGlob () == nProcsModel ) 
479
            reshListPrint ( "reshListIOServer" );
Deike Kleberg's avatar
Deike Kleberg committed
480
	  serverWinCreate ();
481
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
482

483
	case WRITETS:
484
	  xdebugMsg (  tag, source, nfinished );
485
          xmpi ( MPI_Get_count ( &status, MPI_INTEGER, &size ));     
Deike Kleberg's avatar
Deike Kleberg committed
486
          xassert ( size == timestepSize );
487
488
489
          iBuffer = xmalloc ( size * sizeof ( int ));
          xmpi ( MPI_Recv ( iBuffer, size, MPI_INTEGER, source,
                            tag, commCalc, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
490
491
492
          xdebug ( "RECEIVED MESSAGE WITH TAG \"WRITETS\": "
                   "tsID=%d, vdate=%d, vtime=%d, source=%d", 
                   iBuffer[0], iBuffer[1], iBuffer[2], source );
493
494
          
          getData ( iBuffer[0], iBuffer[1], iBuffer[2] );                
495
          free ( iBuffer );
Deike Kleberg's avatar
Deike Kleberg committed
496
	  break;
Deike Kleberg's avatar
Deike Kleberg committed
497

Deike Kleberg's avatar
Deike Kleberg committed
498
	default:
Deike Kleberg's avatar
Deike Kleberg committed
499
	  xabort ( "TAG NOT DEFINED!" );
500
	}
Deike Kleberg's avatar
Deike Kleberg committed
501
502
    }
}
503

504
#endif
505
506
507
508
509
510
511
512
513
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */