pio_posixfpguardsendrecv.c 14 KB
Newer Older
1
2
3
4
5
6
/*
   TODO:
   build in a check for the consistency of filename / file number pairs
   ( pioOpenFile member name, recv in tmpbuffer, if(!uniqueName(q,v,n))abort )
*/

7
8
9
10
11
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
12

13
#include <inttypes.h>
Deike Kleberg's avatar
Deike Kleberg committed
14
#include <stdio.h>
15
#include <stdbool.h>
Deike Kleberg's avatar
Deike Kleberg committed
16
17
18
19
#include <string.h>

#include "mpi.h"
#include "pio.h"
20
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
21
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
22
23
#include "pio_util.h"

Deike Kleberg's avatar
Deike Kleberg committed
24

25
extern char * command2charP[6];
Deike Kleberg's avatar
Deike Kleberg committed
26

27
28
extern long initial_buffersize;

Deike Kleberg's avatar
Deike Kleberg committed
29
30
31
32
33
extern double accumProbe;
extern double accumRecv;
extern double accumSend;
extern double accumSuspend;
extern double accumWait;
34
extern double accumWrite; 
Deike Kleberg's avatar
Deike Kleberg committed
35
36
37
38
39
40
41
42
43

/* Collector-side state of one open output file: the two output buffers,
   the stream and the command sent along with the next write request. */
typedef struct
{
  size_t size;               /* size handed to dbuffer_init for db1 and db2 */
  struct dBuffer *db1;       /* first output buffer */
  struct dBuffer *db2;       /* second output buffer */
  struct dBuffer *db;        /* buffer currently in use, either db1 or db2 */
  FILE *fp;                  /* stream opened on name */
  IO_Server_command command; /* command encoded into the next message tag */
  int tsID;                  /* time step ID last set for this file */
  int fileID;                /* ID compared by fileIDTestA */
  char name[];               /* NUL-terminated file name (flexible array) */
} aFiledataPF;
Deike Kleberg's avatar
Deike Kleberg committed
47

48
static int
49
fileIDTestA(void *a, void *fileID)
50
{
51
  return ((aFiledataPF *)a)->fileID == (int)(intptr_t)fileID;
52
53
}

Deike Kleberg's avatar
Deike Kleberg committed
54
55
56
57
58
/* Guard-side bookkeeping for one file: the next free offset and which
   sources still have the file open. */
typedef struct
{
  long offset;     /* offset handed out next; advanced by each amount received */
  bool finished;   /* true once every entry of nfinished is true */
  bool *nfinished; /* one flag per source; false while that source has the file open */
  int fileID;      /* ID compared by fileIDTestB and fileIDCmpB */
} bFiledataPF;
Deike Kleberg's avatar
Deike Kleberg committed
61

62
static int
63
fileIDTestB(void *a, void *fileID)
64
{
65
  return ((bFiledataPF *)a)->fileID == (int)(intptr_t)fileID;
66
67
68
}

static bool
69
fileIDCmpB(void *a, void *b)
70
{
71
  return ((bFiledataPF *)a)->fileID == ((bFiledataPF *)b)->fileID;
72
73
}

74
static listSet *bibAFiledataPF;
Deike Kleberg's avatar
Deike Kleberg committed
75
76
77

/***************************************************************/
  
78
static aFiledataPF *initAFiledataPF ( const char *key, size_t bs)
Deike Kleberg's avatar
Deike Kleberg committed
79
{
80
  aFiledataPF *afd;
Deike Kleberg's avatar
Deike Kleberg committed
81
  size_t len;
82
  int iret;
Deike Kleberg's avatar
Deike Kleberg committed
83

84
85
  len = strlen(key);
  afd = xcalloc(1, sizeof (*afd) + len + 1);
Thomas Jahns's avatar
Thomas Jahns committed
86
  strcpy(afd->name, key);
87
  afd->size = bs;
88
  afd->tsID = 0;
Deike Kleberg's avatar
Deike Kleberg committed
89
90
91

  /* init output buffer */

Deike Kleberg's avatar
Deike Kleberg committed
92
  xdebug ( " name=%s, init output buffer",  afd->name );
Deike Kleberg's avatar
Deike Kleberg committed
93
   
94
95
96
97
  iret = dbuffer_init ( &( afd->db1 ), afd->size );
  iret += dbuffer_init ( &( afd->db2 ), afd->size );

  if ( iret > 0 )
98
    xabort("dbuffer_init did not succeed");
99

100
  afd->db = afd->db1;
Deike Kleberg's avatar
Deike Kleberg committed
101
102

  /* open file */ 
Deike Kleberg's avatar
Deike Kleberg committed
103
  xdebug ( "name=%s, open file",  afd->name );
Deike Kleberg's avatar
Deike Kleberg committed
104

105
  if ( ( afd->fp = fopen ( afd->name, "w" )) == NULL ) 
Thomas Jahns's avatar
Thomas Jahns committed
106
    xabort("Failed to open %s", afd->name);
Deike Kleberg's avatar
Deike Kleberg committed
107

108
  afd->command = IO_Open_file;
Deike Kleberg's avatar
Deike Kleberg committed
109
  
110
  return afd;
Deike Kleberg's avatar
Deike Kleberg committed
111
112
}

Deike Kleberg's avatar
Deike Kleberg committed
113
/***************************************************************/
114
115
static bFiledataPF *
initBFiledataPF(int key, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
116
{
117
  bFiledataPF *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
118
119
  int i;

Deike Kleberg's avatar
Deike Kleberg committed
120
  bfd = ( bFiledataPF * ) xmalloc ( sizeof ( bFiledataPF ));
121
122
123
  memset ( bfd, 0, sizeof ( bFiledataPF ));
  bfd->offset = 0;
  bfd->finished = false;
Deike Kleberg's avatar
Deike Kleberg committed
124
  bfd->nfinished = ( bool * ) xmalloc ( nc * sizeof ( bool ));
Deike Kleberg's avatar
Deike Kleberg committed
125
126

  for ( i = 0; i < nc; i++ )
Deike Kleberg's avatar
Deike Kleberg committed
127
    *( bfd->nfinished + i ) = true;
Deike Kleberg's avatar
Deike Kleberg committed
128

129
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
130
131
}

Deike Kleberg's avatar
Deike Kleberg committed
132
133
/***************************************************************/

134
135
static int
destroyAFiledataPF(void *v)
Deike Kleberg's avatar
Deike Kleberg committed
136
137
{
  int iret = 0;
138
  aFiledataPF *afd = ( aFiledataPF * ) v;
Thomas Jahns's avatar
Thomas Jahns committed
139

Deike Kleberg's avatar
Deike Kleberg committed
140
  /* close file */
141
142
  xdebug("name=%s, close file", afd->name);
  if ((iret = fclose(afd->fp)) == EOF)
Thomas Jahns's avatar
Thomas Jahns committed
143
    xabort("Failed to close %s", afd->name);
Deike Kleberg's avatar
Deike Kleberg committed
144
145

  /* file closed, cleanup */
146
147
148
  xdebug("name=%s, file closed, cleanup ...",  afd->name);
  dbuffer_cleanup(&(afd->db1));
  dbuffer_cleanup(&(afd->db2));
Deike Kleberg's avatar
Deike Kleberg committed
149

150
  free(afd);
Deike Kleberg's avatar
Deike Kleberg committed
151
152
153
154

  return iret;
}

Deike Kleberg's avatar
Deike Kleberg committed
155
156
/***************************************************************/

157
158
static int
destroyBFiledataPF(void *v)
Deike Kleberg's avatar
Deike Kleberg committed
159
160
{
  int iret = 0;
161
  bFiledataPF *bfd = (bFiledataPF * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
162
  
163
164
  free ( bfd->nfinished );
  free ( bfd );
Deike Kleberg's avatar
Deike Kleberg committed
165
166
167
168

  return iret;
}

Deike Kleberg's avatar
Deike Kleberg committed
169
170
/***************************************************************/

171
172
static bool
compareNamesAPF(void *v1, void *v2)
Deike Kleberg's avatar
Deike Kleberg committed
173
{
174
  aFiledataPF *afd1, *afd2;
Deike Kleberg's avatar
Deike Kleberg committed
175
176
177
  size_t len;
  bool ret;

178
179
  afd1 = ( aFiledataPF * ) v1;
  afd2 = ( aFiledataPF * ) v2;
Deike Kleberg's avatar
Deike Kleberg committed
180

181
182
183
  len = strlen ( afd1->name );
  ret = ( len == strlen ( afd2->name ) && 
	  memcmp ( afd1->name, afd2->name, len ) == 0 );
Deike Kleberg's avatar
Deike Kleberg committed
184
185
186
187

  return ret;
}

Deike Kleberg's avatar
Deike Kleberg committed
188
189
/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
190
void fpgPOSIXFPGUARDSENDRECV ( void )
Deike Kleberg's avatar
Deike Kleberg committed
191
{
192
  int i, source, iret;
193
  struct fileOpTag rtag;
Deike Kleberg's avatar
Deike Kleberg committed
194
  MPI_Status status;
195
  bFiledataPF *bfd; 
196
  listSet *bibBFiledataPF;
Deike Kleberg's avatar
Deike Kleberg committed
197
  long amount;
198
  MPI_Comm commNode = commInqCommNode ();
Deike Kleberg's avatar
Deike Kleberg committed
199
  int nProcsCollNode =  commInqSizeNode () - commInqSizeColl ();
200
  bool * sentFinalize, doFinalize = false;
Deike Kleberg's avatar
Deike Kleberg committed
201

Deike Kleberg's avatar
Deike Kleberg committed
202
  xdebug ( "ncollectors=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
203
  
204
  bibBFiledataPF = listSetNew( destroyBFiledataPF, fileIDCmpB);
205
  sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
206

Deike Kleberg's avatar
Deike Kleberg committed
207
208
  for ( ;; )
    {
209
      xmpi ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
210
      source = status.MPI_SOURCE;
211
      rtag = decodeFileOpTag(status.MPI_TAG);
Deike Kleberg's avatar
Deike Kleberg committed
212
      
213
214
      xdebug("receive message from source=%d, id=%d, command=%d ( %s )",
             source, rtag.id, rtag.command, command2charP[rtag.command]);
Deike Kleberg's avatar
Deike Kleberg committed
215
      
216
      switch (rtag.command)
Deike Kleberg's avatar
Deike Kleberg committed
217
218
      	{
      	case IO_Open_file:
219

220
          if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB,
221
                               (void *)(intptr_t)rtag.id)))
Deike Kleberg's avatar
Deike Kleberg committed
222
	    {
223
	      bfd = initBFiledataPF(rtag.id, nProcsCollNode);
224

225
	      if ((iret = listSetAdd(bibBFiledataPF, bfd)) < 0)
226
		xabort("fileID=%d not unique", rtag.id);
Deike Kleberg's avatar
Deike Kleberg committed
227
228
	    }

Deike Kleberg's avatar
Deike Kleberg committed
229
230
	  *( bfd->nfinished + source ) = false;

231
232
          xdebug("id=%d, command=%d ( %s ), send offset=%ld", rtag.id,
                 rtag.command, command2charP[rtag.command], bfd->offset);
Deike Kleberg's avatar
Deike Kleberg committed
233
	  
Deike Kleberg's avatar
Deike Kleberg committed
234
235
	  xmpi ( MPI_Sendrecv ( &( bfd->offset ), 1, MPI_LONG, source,  status.MPI_TAG,
                                &amount, 1, MPI_LONG, source,  status.MPI_TAG,
236
                                commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
237

238
	  bfd->offset += amount; 
Deike Kleberg's avatar
Deike Kleberg committed
239
 
240
241
242
          xdebug("id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld",
                 rtag.id, rtag.command, command2charP[rtag.command], amount,
                 bfd->offset);
Deike Kleberg's avatar
Deike Kleberg committed
243
244
245
246

	  break;

	case IO_Set_fp:
247

248
          if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB,
249
                               (void *)(intptr_t)rtag.id)))
250
            xabort("fileId=%d not in set", rtag.id);
Deike Kleberg's avatar
Deike Kleberg committed
251

252
253
          xdebug("id=%d, command=%d ( %s ), send offset=%ld", rtag.id,
                 rtag.command, command2charP[rtag.command], bfd->offset);
Deike Kleberg's avatar
Deike Kleberg committed
254

Deike Kleberg's avatar
Deike Kleberg committed
255
256
	  xmpi ( MPI_Sendrecv ( &( bfd->offset ), 1, MPI_LONG, source,  status.MPI_TAG,
                                &amount, 1, MPI_LONG, source,  status.MPI_TAG,
257
                                commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
258

259
	  bfd->offset += amount;
Deike Kleberg's avatar
Deike Kleberg committed
260

261
262
263
          xdebug("id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld",
                 rtag.id, rtag.command, command2charP[rtag.command], amount,
                 bfd->offset);
Deike Kleberg's avatar
Deike Kleberg committed
264
265
266
267

	  break;

	case IO_Close_file:
268

269
          if (!(bfd = listSetGet(bibBFiledataPF, fileIDTestB,
270
                               (void *)(intptr_t)rtag.id)))
271
            xabort("fileId=%d not in set", rtag.id);
Deike Kleberg's avatar
Deike Kleberg committed
272

273
274
          xdebug("id=%d, command=%d ( %s )), send offset=%ld", rtag.id,
                 rtag.command, command2charP[rtag.command], bfd->offset);
Deike Kleberg's avatar
Deike Kleberg committed
275

Deike Kleberg's avatar
Deike Kleberg committed
276
277
	  xmpi ( MPI_Sendrecv ( &( bfd->offset ), 1, MPI_LONG, source,  status.MPI_TAG,
                                &amount, 1, MPI_LONG, source,  status.MPI_TAG,
278
                                commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
279

280
	  bfd->offset += amount;
Deike Kleberg's avatar
Deike Kleberg committed
281

282
283
284
          xdebug("id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld",
                 rtag.id, rtag.command, command2charP[rtag.command], amount,
                 bfd->offset);
Deike Kleberg's avatar
Deike Kleberg committed
285

286
287

	  bfd->nfinished[source] = true;  
288
	  bfd->finished          = true;
Deike Kleberg's avatar
Deike Kleberg committed
289
	  
Deike Kleberg's avatar
Deike Kleberg committed
290
	  for ( i = 0; i < nProcsCollNode; i++ )
291
	    if ( !( bfd->nfinished[i] ))
Deike Kleberg's avatar
Deike Kleberg committed
292
	      {
293
		bfd->finished = false;
Deike Kleberg's avatar
Deike Kleberg committed
294
295
		break;
	      }
296

297
	  if ( bfd->finished )
298
299
            listSetRemove(bibBFiledataPF, fileIDTestB,
                          (void *)(intptr_t)rtag.id);
300
          break;
Deike Kleberg's avatar
Deike Kleberg committed
301
302
        case IO_Finalize:
          {  
303
            int buffer = CDI_UNDEFID, collID; 
Deike Kleberg's avatar
Deike Kleberg committed
304
305
306

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, status.MPI_TAG,
                              commNode, &status ));
307
308
309
310
311
312
313
314
315
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
              if ( !sentFinalize[collID] ) 
                {
                  doFinalize = false;
                  break;
                }
            if ( doFinalize )
Deike Kleberg's avatar
Deike Kleberg committed
316
              {
317
318
                if (!listSetIsEmpty(bibBFiledataPF))
                  xabort("set bibBFiledataM not empty");
319
320
                else
                  {
321
322
                    xdebug("%s", "destroy set");
                    listSetDelete(bibBFiledataPF);
323
324
                  }
                return;
Deike Kleberg's avatar
Deike Kleberg committed
325
326
327
328
329
              }
          }
          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
330
331
332
333
334
335
	}
    }
}   

//*******************************************************

336
337
/* Write the current output buffer of afd to its file.

   Sends the number of buffered bytes to the special rank of the node and
   receives the file offset to write at, using a tag built from id and
   afd->command.  Then seeks to that offset, writes the buffer, resets it
   and switches to the other buffer.  Afterwards afd->command is
   IO_Set_fp.  Aborts via xabort if seeking fails or the write is short. */
static void
writePF(aFiledataPF *afd, int id)
{
  MPI_Status status;
  long offset;
  int specialRank = commInqSpecialRankNode ();
  MPI_Comm commNode = commInqCommNode ();

  /* send buffersize, recv offset */

  size_t amount = dbuffer_data_size ( afd->db );
  long amountL = ( long ) amount;
  int tag = encodeFileOpTag(id, afd->command);

  xmpi ( MPI_Sendrecv ( &amountL, 1, MPI_LONG, specialRank, tag,
                        &offset, 1, MPI_LONG, specialRank, tag,
                        commNode, &status ));
  xdebug ( "id=%d, command=%d, amount=%llu, send amountL=%ld, recv offset=%ld", 
           id, afd->command, (unsigned long long)amount,
           amountL, offset );

  /* write buffer */

  if ( fseek ( afd->fp, offset, SEEK_SET ) != 0 )
    xabort ( "did not succeed seeking fp" );

  size_t written = fwrite ( afd->db->buffer, sizeof ( char ), amount, afd->fp );
  if ( written != amount )
    xabort("fileId=%d, expect to write %zu byte, written %zu byte",
           id, amount, written);

  xdebug ( "written %llu bytes in file %d with offset %ld", 
           (unsigned long long) written,
           id, offset ); 

  /* change outputBuffer */

  dbuffer_reset ( afd->db );

  if ( afd->db != afd->db1 )
    {
      xdebug ( "id=%d, change to buffer 1 ...", id );
      afd->db =  afd->db1;
    }
  else
    {
      xdebug ( "id=%d, change to buffer 2 ...", id );
      afd->db =  afd->db2;
    }

  /* every request after the first one for a file only sets the offset */
  afd->command = IO_Set_fp;
}


395
396
/***************************************************************/

397
398
/* Advance the time step ID stored in afd to tsID.  The new ID has to be
   non-negative and exactly one larger than the stored one, and afd must
   not be NULL; otherwise the call aborts via xabort. */
static void
defTimestepPF(aFiledataPF *afd, int tsID)
{
  bool isNextStep = afd != NULL && tsID >= 0 && tsID == afd->tsID + 1;

  if ( !isNextStep )
    xabort ( " defTimestepPF() didn't succeed." );
  afd->tsID = tsID;
}


Deike Kleberg's avatar
Deike Kleberg committed
406
407
/***************************************************************/

408
409
410
static void
flushOp(void *a, void *tsID)
{
411
  writePF((aFiledataPF *)a, ((aFiledataPF *)a)->fileID);
412
413
414
415
  defTimestepPF((aFiledataPF *)a, (int)(intptr_t)tsID);
}


Deike Kleberg's avatar
Deike Kleberg committed
416
size_t fwPOSIXFPGUARDSENDRECV( int fileID, int tsID, const void *buffer, size_t len )
Deike Kleberg's avatar
Deike Kleberg committed
417
418
419
420
{
  int error = 0;
  int flush = 0;
  int filled = 0;
421
  aFiledataPF *afd;
422

423
  afd = listSetGet(bibAFiledataPF, fileIDTestA, (void *)(intptr_t)fileID);
Deike Kleberg's avatar
Deike Kleberg committed
424

425
  flush = ( tsID != afd->tsID ) ? 1 : 0;
Deike Kleberg's avatar
Deike Kleberg committed
426
427
428

  if ( flush == 1 ) 
    {
Deike Kleberg's avatar
Deike Kleberg committed
429
      xdebug ( "tsID = %d, flush buffer", tsID );
430
      listSetForeach(bibAFiledataPF, flushOp, (void *)(intptr_t)tsID);
431
      xmpi ( MPI_Barrier ( commInqCommColl ())); 
432
    }
Deike Kleberg's avatar
Deike Kleberg committed
433

434
  filled = dbuffer_push ( afd->db, ( unsigned char * ) buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
435

Deike Kleberg's avatar
Deike Kleberg committed
436
437
  xdebug ( "fileID = %d, tsID = %d, pushed data on buffer, filled = %d", 
           fileID, tsID, filled ); 
Deike Kleberg's avatar
Deike Kleberg committed
438
439
440
441
442
443
444

  if ( filled == 1 ) 
    {
      if ( flush )
	error = filled;
      else
	{
Deike Kleberg's avatar
Deike Kleberg committed
445
	  writePF ( afd, fileID );
Deike Kleberg's avatar
Deike Kleberg committed
446
     
447
	  error = dbuffer_push ( afd->db, ( unsigned char * ) buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
448
449
450
451
	}
    }
  
  if ( error == 1 )
Thomas Jahns's avatar
Thomas Jahns committed
452
    xabort("did not succeed filling output buffer, fileID=%d", fileID);
Deike Kleberg's avatar
Deike Kleberg committed
453
454
455
456
457
458
459
460
  
  return len;
}

/***************************************************************/

int fcPOSIXFPGUARDSENDRECV ( int id )
{
461
  aFiledataPF *afd;
Deike Kleberg's avatar
Deike Kleberg committed
462
463
  int iret;

464
  xdebug("write buffer, close file %d and cleanup", id);
Deike Kleberg's avatar
Deike Kleberg committed
465

466
  afd = listSetGet(bibAFiledataPF, fileIDTestA, (void *)(intptr_t)id);
Deike Kleberg's avatar
Deike Kleberg committed
467

468
  afd->command = IO_Close_file;
Deike Kleberg's avatar
Deike Kleberg committed
469

470
  writePF ( afd, id );
Deike Kleberg's avatar
Deike Kleberg committed
471

472
473
  /* remove file element */
  iret = listSetRemove(bibAFiledataPF, fileIDTestA, (void *)(intptr_t)id);
Deike Kleberg's avatar
Deike Kleberg committed
474
475
476
477
478

  return iret;
}

/***************************************************************/
479
480
static void
elemCheck(void *q, void *nm)
481
{
482
483
  aFiledataPF *afd = q;
  const char *name = nm;
484

485
  if (!strcmp(name, afd->name))
486
    xabort("Filename %s has already been added to set\n", name);
487
}
Deike Kleberg's avatar
Deike Kleberg committed
488
489
490

int fowPOSIXFPGUARDSENDRECV ( const char *filename )
{
491
  int root = 0, id;
492
  aFiledataPF *afd;
493
  static long buffersize = 0;
Deike Kleberg's avatar
Deike Kleberg committed
494
495

  /* broadcast buffersize to collectors */
496
497
498
499
500
501
  if (!buffersize)
    {
      if (commInqRankColl() == root)
	{
          xdebug("name=%s, broadcast buffersize to collectors ...",
                 filename);
Deike Kleberg's avatar
Deike Kleberg committed
502
	  if ( getenv( "BUFSIZE" ) != NULL )
503
504
505
	    buffersize = atol ( getenv ( "BUFSIZE" ));
	  if ( buffersize < initial_buffersize )
	    buffersize = initial_buffersize;
Deike Kleberg's avatar
Deike Kleberg committed
506
	}
507
      xmpi(MPI_Bcast(&buffersize, 1, MPI_LONG, root, commInqCommColl()));
508
    }
Deike Kleberg's avatar
Deike Kleberg committed
509

510
511
  /* init and add file element */
  listSetForeach(bibAFiledataPF, elemCheck, (void *)filename);
Deike Kleberg's avatar
Deike Kleberg committed
512

513
  afd = initAFiledataPF ( filename, buffersize );
514

515
  if ((id = listSetAdd(bibAFiledataPF, afd)) < 0)
Thomas Jahns's avatar
Thomas Jahns committed
516
    xabort("filename %s not unique", afd->name);
Deike Kleberg's avatar
Deike Kleberg committed
517

518
519
  xdebug("name=%s, init and add aFiledataPF, return id = %d",
         filename, id);
520
  return id;
Deike Kleberg's avatar
Deike Kleberg committed
521
522
523
524
}

/***************************************************************/

Thomas Jahns's avatar
Thomas Jahns committed
525
526
void
finalizePOSIXFPGUARDSENDRECV(void)
Deike Kleberg's avatar
Deike Kleberg committed
527
{
Thomas Jahns's avatar
Thomas Jahns committed
528
  int buffer = 0, tag = encodeFileOpTag(0, IO_Finalize);
529

Thomas Jahns's avatar
Thomas Jahns committed
530
531
  xmpi(MPI_Send(&buffer, 1, MPI_INT, commInqSpecialRankNode(),
                tag, commInqCommNode()));
Deike Kleberg's avatar
Deike Kleberg committed
532

533
534
  if (!listSetIsEmpty(bibAFiledataPF))
    xabort("set bibAFiledataM not empty");
Deike Kleberg's avatar
Deike Kleberg committed
535
536
  else
    {
537
538
      xdebug("%s", "destroy set");
      listSetDelete(bibAFiledataPF);
Deike Kleberg's avatar
Deike Kleberg committed
539
540
541
542
543
    }
}

/***************************************************************/

544
void initPOSIXFPGUARDSENDRECV ( void )
Deike Kleberg's avatar
Deike Kleberg committed
545
{
546
547
  if ( commInqSizeNode () < 2 ) 
    xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );
Deike Kleberg's avatar
Deike Kleberg committed
548
  
549
  if ( commInqRankNode () == commInqSpecialRankNode ()) 
Deike Kleberg's avatar
Deike Kleberg committed
550
    {
551
552
553
554
      commDefCommColl ( 0 );
      commSendNodeInfo ();
      commRecvNodeMap ();
      commDefCommsIO ();
Deike Kleberg's avatar
Deike Kleberg committed
555
      fpgPOSIXFPGUARDSENDRECV ();
Deike Kleberg's avatar
Deike Kleberg committed
556
557
    }
  else
558
559
560
561
562
    {
      commDefCommColl ( 1 );
      commSendNodeInfo ();
      commRecvNodeMap ();
      commDefCommsIO ();
563
      bibAFiledataPF = listSetNew( destroyAFiledataPF, compareNamesAPF );
564
    }
Deike Kleberg's avatar
Deike Kleberg committed
565
566
567
}

#endif
568
569
570
571
572
573
574
575
576
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */