pio_posixasynch.c 19.7 KB
Newer Older
1
2
3
4
/*
   TODO
   README: the special-rank PE shuts down once all output files are closed.
*/
5
6
7
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
8

9
10

#ifdef USE_MPI
11
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
12
13
14
15

#include <stdbool.h>
#include <stdio.h>
#include <string.h>
16
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
17
18
19
#include <errno.h>

#include <aio.h>
20
21
#include <sys/types.h>
#include <sys/stat.h>
Deike Kleberg's avatar
Deike Kleberg committed
22
23
24
25
#include <fcntl.h>

#include "mpi.h"
#include "pio.h"
26
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
28
29
#include "pio_util.h"

30
/* Printable names of the IO_Server_command values, indexed by command;
   in this file used only for debug output. */
extern char * command2charP[6];

/* Lower bound for the per-file buffer size (BUFSIZE may only raise it). */
extern long initial_buffersize;

/* Separator between the IO_Open_file header fields; only its first byte
   is put on the buffer. */
extern char *token;

/* Timing accumulators defined elsewhere.  This file updates accumSuspend,
   accumWait and accumWrite, and only when ddebug is set. */
extern double accumProbe;
extern double accumRecv;
extern double accumSend;
extern double accumSuspend;
extern double accumWait;
extern double accumWrite;


typedef struct 
{
  char *name;
  size_t size;
47
  int tsID;
Deike Kleberg's avatar
Deike Kleberg committed
48
49
50
51
  struct dBuffer *db1;
  struct dBuffer *db2;
  struct dBuffer *db;
  IO_Server_command command;
Deike Kleberg's avatar
Deike Kleberg committed
52
  MPI_Request request;
53
} aFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
54
55
56
57
58
59
60
61
62
63
64
65
66
67

/* Writer-side (special rank) state of one output file. */
typedef struct
{
  char *name;              /* owned copy of the file name */
  size_t size;             /* bytes per buffer slot */
  struct dBuffer *fb;      /* nPrefStreams consecutive slots of size bytes */
  int handle;              /* POSIX file descriptor */
  struct aiocb *ctrlBlks;  /* one AIO control block per slot */
  off_t offset;            /* file offset of the next write */
  int currOpIndex;         /* slot holding the block being written */
  int nextOpIndex;         /* slot to use for the next block */
  int prefIndex;           /* number of writes currently in flight */
  bool finished;           /* set once every collector closed the file */
  bool *nfinished;         /* per-collector "sent IO_Close_file" flags */
} bFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
69

Deike Kleberg's avatar
Deike Kleberg committed
70
/* Collector side: one aFiledataPA record per file opened on this rank. */
static queue_t *bibAFiledataPA;

/* Number of asynchronous write slots per file on the writer side. */
int nPrefStreams = 4;
Deike Kleberg's avatar
Deike Kleberg committed
73
74
75

/***************************************************************/

76
77
static aFiledataPA *
initAFiledataPA(const char *filename, size_t bs)
Deike Kleberg's avatar
Deike Kleberg committed
78
{
79
  aFiledataPA *afd;
Deike Kleberg's avatar
Deike Kleberg committed
80
  size_t len;
81
  int iret;
Deike Kleberg's avatar
Deike Kleberg committed
82

83
  xdebug ( "filename=%s, buffersize=%zu, in", filename, bs );
Deike Kleberg's avatar
Deike Kleberg committed
84

85
  afd = xmalloc ( sizeof ( aFiledataPA ));
86
87

  len = strlen ( filename );
Thomas Jahns's avatar
Thomas Jahns committed
88
  afd->name = xmalloc (( len + 1) * sizeof ( afd->name[0] ));
89
90
  strcpy ( afd->name, filename );
  afd->size = bs;
91
  afd->tsID = 0;
92
  
Deike Kleberg's avatar
Deike Kleberg committed
93
  /* init output buffer */
94
95
  
  xdebug ( "filename=%s, init output buffer",  afd->name );
96
97
98
99

  iret = dbuffer_init ( &( afd->db1 ), afd->size );
  iret += dbuffer_init ( &( afd->db2 ), afd->size );

100
  if ( iret > 0 ) xabort ( "dbuffer_init did not succeed" );
101

102
  afd->db = afd->db1;
Deike Kleberg's avatar
Deike Kleberg committed
103

104
  afd->command = IO_Open_file;
Deike Kleberg's avatar
Deike Kleberg committed
105
  afd->request = MPI_REQUEST_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
106

107
  xdebug ( "enqueued name=%s, return", afd->name );
Deike Kleberg's avatar
Deike Kleberg committed
108
  
109
  return afd;
Deike Kleberg's avatar
Deike Kleberg committed
110
}
111

Deike Kleberg's avatar
Deike Kleberg committed
112
113
/***************************************************************/ 

114
115
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
116
{
117
  bFiledataPA *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
118
119
  int i;

120
121
  xdebug ( "filename=%s, buffersize=%zu, ncollectors=%d, nPrefetchStreams=%d",
           filename, bs, nc, nPrefStreams );
Deike Kleberg's avatar
Deike Kleberg committed
122

123
  bfd = xmalloc ( sizeof ( bfd[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
124

Thomas Jahns's avatar
Thomas Jahns committed
125
  bfd->name = xmalloc (( strlen ( filename ) + 1) * sizeof ( bfd->name[0] ));
126
127
  strcpy ( bfd->name, filename );
  bfd->size = bs;
Deike Kleberg's avatar
Deike Kleberg committed
128

129
  if (( bfd->handle = open ( bfd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
130
    xabort("Failed to open %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
131

132
  dbuffer_init ( &( bfd->fb ), ( size_t )( nPrefStreams * bfd->size ));
Deike Kleberg's avatar
Deike Kleberg committed
133

134
  bfd->ctrlBlks = xcalloc(nPrefStreams, sizeof (bfd->ctrlBlks[0]));
Deike Kleberg's avatar
Deike Kleberg committed
135
136
137

  for ( i = 0; i < nPrefStreams; i++ )
    {
138
139
140
141
      bfd->ctrlBlks[i].aio_fildes     = bfd->handle;
      bfd->ctrlBlks[i].aio_buf        = bfd->fb->buffer + ( i * bfd->size );
      bfd->ctrlBlks[i].aio_reqprio    = 0;
      bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;   
Deike Kleberg's avatar
Deike Kleberg committed
142
143
    }
  
144
145
146
147
  bfd->nextOpIndex = 0;
  bfd->prefIndex = 0; 
  bfd->offset = 0;
  bfd->finished = false;
148
  bfd->nfinished = xmalloc ( nc * sizeof ( bfd->nfinished[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
149

Deike Kleberg's avatar
Deike Kleberg committed
150
  for ( i = 0; i < nc; i++ )
151
    bfd->nfinished[i] = false;
Deike Kleberg's avatar
Deike Kleberg committed
152

153
  xdebug ( "filename=%s, opened file, return", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
154

155
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
156
157
158
159
}

/***************************************************************/

160
161
static int
destroyAFiledataPA(void *v)
Deike Kleberg's avatar
Deike Kleberg committed
162
{
163
  aFiledataPA *afd = (aFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
164
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
165

166
  xdebug ( "filename=%s, cleanup, in", afd->name );
Deike Kleberg's avatar
Deike Kleberg committed
167

168
  xmpiStat ( MPI_Wait ( &( afd->request ), &status ), &status );
Deike Kleberg's avatar
Deike Kleberg committed
169
  
170
171
  dbuffer_cleanup ( &( afd->db1 ));
  dbuffer_cleanup ( &( afd->db2 ));
Deike Kleberg's avatar
Deike Kleberg committed
172

173
174
  free ( afd->name );
  free ( afd );
Deike Kleberg's avatar
Deike Kleberg committed
175

176
  xdebug("%s", "cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
177

178
  return 0;
Deike Kleberg's avatar
Deike Kleberg committed
179
180
181
182
}

/***************************************************************/

183
184
static int
destroyBFiledataPA ( void *v )
Deike Kleberg's avatar
Deike Kleberg committed
185
{
186
  bFiledataPA *bfd = (bFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
187
188
  const struct aiocb *ccBP[1];
  int iret = 0;
Thomas Jahns's avatar
Thomas Jahns committed
189
  ssize_t ssiret;
190
191
  int nextFinishOp = (bfd->nextOpIndex - bfd->prefIndex + nPrefStreams)
    % nPrefStreams;
192
  double startTime;
Deike Kleberg's avatar
Deike Kleberg committed
193

Thomas Jahns's avatar
Thomas Jahns committed
194
  xdebug ( "filename=%s, cleanup and close file", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
195
196
197

  /* close file */

198
  for (; bfd->prefIndex > 0 ; bfd->prefIndex --)
Deike Kleberg's avatar
Deike Kleberg committed
199
    {
Thomas Jahns's avatar
Thomas Jahns committed
200
      xdebug("file: %s, prefIndex=%d", bfd->name, (int)bfd->prefIndex);
201
      ccBP[0] = ( bfd->ctrlBlks + nextFinishOp );
Deike Kleberg's avatar
Deike Kleberg committed
202
203
204
205
206
207
208

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
209
	  if ( iret < 0 && errno != EINTR ) xabort ( "aio_suspend () failed" );
210
	}
211
      while ( iret != 0 );
Deike Kleberg's avatar
Deike Kleberg committed
212
213
214

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);
215
216
217

      iret = aio_error(bfd->ctrlBlks + nextFinishOp);
      if (( ssiret = aio_return ( bfd->ctrlBlks + nextFinishOp )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
218
	xabort("aio_return () failed: %s", strerror(iret));
219
220

      nextFinishOp = ( nextFinishOp + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
221
222
    }

223
  if (( iret = close ( bfd->handle )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
224
    xabort("failed to close %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
225
226
227

  /* file closed, cleanup */

228
  dbuffer_cleanup ( &( bfd->fb ));
Deike Kleberg's avatar
Deike Kleberg committed
229

230
231
232
233
  free ( bfd->nfinished );
  free ( bfd->ctrlBlks );
  free ( bfd->name );
  free ( bfd );
Deike Kleberg's avatar
Deike Kleberg committed
234

235
  xdebug("%s", "closed file and cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
236
237
238
239
240
241

  return iret;
}

/***************************************************************/

242
243
static bool
compareNamesAPA(void *v1, void *v2)
Deike Kleberg's avatar
Deike Kleberg committed
244
{
245
  aFiledataPA *afd1 = v1, *afd2 = v2;
Deike Kleberg's avatar
Deike Kleberg committed
246

247
  return !strcmp(afd1->name, afd2->name);
Deike Kleberg's avatar
Deike Kleberg committed
248
249
250
251
}

/***************************************************************/

252
253
static bool
compareNamesBPA(void *v1, void *v2)
254
{
255
  bFiledataPA *bfd1 = v1, *bfd2 = v2;
256

257
  return !strcmp(bfd1->name, bfd2->name);
258
259
260
261
}

/***************************************************************/

262
263
/* Submit the AMOUNT bytes in slot currOpIndex as an asynchronous write at
   the file's current offset, then advance the offset.  When every one of
   the nPrefStreams slots has a write in flight, block until the write in
   slot nextOpIndex (the oldest, and the next slot the caller will refill)
   has completed.  Aborts on any failure, including a short write. */
static void
writePA(bFiledataPA *bfd, long amount)
{
  const struct aiocb *ccBP[1];
  struct aiocb *cb = bfd->ctrlBlks + bfd->currOpIndex;
  int iret;
  double startTime = 0.0;

  xdebug ( "file %s, in", bfd->name );

  cb->aio_nbytes = ( size_t ) amount;
  cb->aio_offset = bfd->offset;

  /* off_t is signed and may be wider than long: print it as long long */
  xdebug ( " before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld",
           bfd->name, cb->aio_nbytes, ( long long ) cb->aio_offset );

  if ( ddebug ) startTime = MPI_Wtime ();

  iret = aio_write ( cb );

  if ( ddebug ) accumWrite += ( MPI_Wtime () - startTime);

  xdebug ( "after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld,"
           "iret=aio_write()=%d",
           bfd->name, cb->aio_nbytes, ( long long ) cb->aio_offset, iret );

  if ( iret == -1 )
    xabort ( "did not succeed writing buffer: %s", strerror ( errno ));

  xdebug ( "buffer written to %s",  bfd->name );

  bfd->offset += ( off_t ) amount;
  bfd->prefIndex ++;

  if ( bfd->prefIndex >= nPrefStreams )
    {
      struct aiocb *oldest = bfd->ctrlBlks + bfd->nextOpIndex;
      ssize_t written;
      int err;

      ccBP[0] = oldest;

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
	  if ( iret < 0 && errno != EINTR )
	    xabort ( "aio_suspend () failed: %s", strerror ( errno ));
	} while ( iret != 0 );

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);

      /* keep the full ssize_t result: an int could truncate large writes */
      err = aio_error ( oldest );
      if (( written = aio_return ( oldest )) == -1 )
	xabort ( "aio_return () failed: %s", strerror ( err ));
      if (( size_t ) written != oldest->aio_nbytes )
        xabort ( "short write to %s: %zd of %zu bytes",
                 bfd->name, written, oldest->aio_nbytes );

      bfd->prefIndex --;
    }

  xdebug ( "filename=%s, prefIndex=%d, return", bfd->name, bfd->prefIndex );
}

/***************************************************************/
326
327

// TODO: unify in IOModi, encapsulate in pio_queue.c
328
329
/* Abort if queue Q already holds a bFiledataPA record for file NAME. */
static void
queueCheckPA(queue_t *q, char *name)
{
  listElem_t *node;

  for ( node = q->head; node != NULL; node = node->next )
    {
      const bFiledataPA *entry = ( const bFiledataPA * ) node->val;

      if ( !strcmp ( name, entry->name ))
        xabort("Filename %s is already enqueued\n", name );
    }
}
Deike Kleberg's avatar
Deike Kleberg committed
344

345
346
347
/***************************************************************/

void pwPOSIXASYNCH ( void )
Deike Kleberg's avatar
Deike Kleberg committed
348
{
349
  bFiledataPA *bfd; 
Deike Kleberg's avatar
Deike Kleberg committed
350
  queue_t * bibBFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
351
352
  long amount, buffersize;
  char *messageBuffer, *pMB, *filename, *temp;
Deike Kleberg's avatar
Deike Kleberg committed
353
  int messagesize, source, tag, i, id;
Deike Kleberg's avatar
Deike Kleberg committed
354
  tag_t *rtag;
Deike Kleberg's avatar
Deike Kleberg committed
355
  MPI_Status status;
356
357
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
358
  bool * sentFinalize, doFinalize;
Deike Kleberg's avatar
Deike Kleberg committed
359

360
  xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
361
 
362
  bibBFiledataPA = queueInit ( destroyBFiledataPA, compareNamesBPA );
363
  sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
Deike Kleberg's avatar
Deike Kleberg committed
364
365
  
  for ( ;; )
366
367
368
369
    {   
      xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, 
                             &status ), &status );

Deike Kleberg's avatar
Deike Kleberg committed
370
371
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
Deike Kleberg's avatar
Deike Kleberg committed
372
      rtag   = getTag ( tag );
Deike Kleberg's avatar
Deike Kleberg committed
373

374
      xmpi ( MPI_Get_count ( &status, MPI_CHAR, &messagesize ));
Deike Kleberg's avatar
Deike Kleberg committed
375

376
377
378
379
      xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ), "
               "messagesize=%d",
               source, rtag->id, rtag->command, 
               command2charP [ rtag->command ], messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
380
381
382
383

      switch ( rtag->command )
	{
      	case IO_Open_file:
Deike Kleberg's avatar
Deike Kleberg committed
384

385
386
	  messageBuffer = ( char *) xmalloc ( messagesize * 
                                              sizeof ( messageBuffer[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
387
388
	  pMB = messageBuffer;

389
390
	  xmpi ( MPI_Recv ( messageBuffer, messagesize, MPI_CHAR, source, 
                            tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
391
392
393
394
395
396
397
398

	  filename = strtok ( pMB, token );
	  pMB += ( strlen ( filename ) + 1 );
	  temp =  strtok ( pMB, token );
	  buffersize =  strtol ( temp, NULL, 16 );
	  pMB += ( strlen ( temp ) + 1 );
	  amount = ( long ) ( messageBuffer + messagesize - pMB );

399
400
401
	  xdebug ( "command  %s, filename=%s, buffersize=%ld, amount=%ld",
                   command2charP [ rtag->command ], filename, 
                   buffersize, amount ); 
Deike Kleberg's avatar
Deike Kleberg committed
402
403

	  
404
	  if ( !(bfd = ( bFiledataPA * ) queueIdx2val ( bibBFiledataPA, 
Deike Kleberg's avatar
Deike Kleberg committed
405
406
							     rtag->id )))
	    {
407
	      queueCheckPA (  bibBFiledataPA, filename );
408
	      bfd = ( bFiledataPA * ) initBFiledataPA ( filename, 
Deike Kleberg's avatar
Deike Kleberg committed
409
								 buffersize,  
410
							       nProcsCollNode );
411
	      if (( id = queuePush ( bibBFiledataPA, bfd, 1, rtag->id )) != rtag->id )
Thomas Jahns's avatar
Thomas Jahns committed
412
                xabort("fileID=%d not unique", rtag->id);
Deike Kleberg's avatar
Deike Kleberg committed
413
	    }
414
	  else
Thomas Jahns's avatar
Thomas Jahns committed
415
416
	    if (strcmp(filename, bfd->name) != 0)
              xabort("filename is not consistent, fileID=%d", rtag->id);
Deike Kleberg's avatar
Deike Kleberg committed
417

418
419
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
420

Thomas Jahns's avatar
Thomas Jahns committed
421
	  memcpy ( bfd->fb->buffer + ( bfd->currOpIndex * bfd->size ),
422
                   pMB, ( int ) amount );
Deike Kleberg's avatar
Deike Kleberg committed
423

424
	  writePA ( bfd, amount );
Thomas Jahns's avatar
Thomas Jahns committed
425

Deike Kleberg's avatar
Deike Kleberg committed
426
427
428
429
430
	  free ( messageBuffer );

	  break;

	case IO_Send_buffer:
431
432
433
434

	  if (!(bfd = (bFiledataPA *)queueIdx2val(bibBFiledataPA, rtag->id)))
            xabort("fileID=%d is not in queue", rtag->id);

Deike Kleberg's avatar
Deike Kleberg committed
435
436
	  amount = messagesize;

437
438
	  xdebug ( "command: %s, id=%d, name=%s", 
                   command2charP [ rtag->command ], rtag->id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
439

440
441
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
442
	  
443
444
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                        amount, MPI_CHAR, source, tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
445

446
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
447
448
449
450
	  
	  break;

	case IO_Close_file:
451
452
453
454

	  if (!(bfd = (bFiledataPA *)queueIdx2val(bibBFiledataPA, rtag->id)))
            xabort("fileID=%d is not in queue", rtag->id );

Deike Kleberg's avatar
Deike Kleberg committed
455
456
	  amount = messagesize;

457
	  xdebug ( " command %s, id=%d, name=%s", 
458
                   command2charP [ rtag->command ], rtag->id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
459

460
	  bfd->currOpIndex = bfd->nextOpIndex;
Deike Kleberg's avatar
Deike Kleberg committed
461

462
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
463

464
	  MPI_Recv (  bfd->fb->buffer + ( bfd->currOpIndex * bfd->size ), 
465
		      amount, MPI_CHAR, source,  tag, commNode, &status );
Deike Kleberg's avatar
Deike Kleberg committed
466

467
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
468

469
	  bfd->nfinished[source] = true;
470
	  bfd->finished          = true;
Deike Kleberg's avatar
Deike Kleberg committed
471
	  
472
	  for ( i = 0; i < nProcsCollNode; i++ )
473
	    if ( !( bfd->nfinished[i] ))
Deike Kleberg's avatar
Deike Kleberg committed
474
	      {
475
		bfd->finished = false;
Deike Kleberg's avatar
Deike Kleberg committed
476
477
478
		break;
	      }
	  
479
	  if ( bfd->finished )
Deike Kleberg's avatar
Deike Kleberg committed
480
	    {
481
              xdebug ( "all are finished with file %d, delete node", rtag->id );
482
	      queueDelNode ( bibBFiledataPA, rtag->id );
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
	    }
          break;
        case IO_Finalize:
          {
            int buffer = CDI_UNDEFID, collID;

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
              if ( !sentFinalize[collID] ) 
                {
                  doFinalize = false;
                  break;
                }
            if ( doFinalize )
              {
                if ( bibBFiledataPA->head != NULL )
                  {
                    xabort ( "Queue bibBfiledataP is not empty." );
                  }
                else
                  {
506
507
                    xdebug("%s", "all files are finished, destroy queue,"
                           " return");
508
509
510
511
512
513
514
515
516
517
                    queueDestroy ( bibBFiledataPA );
                  }
                if ( rtag ) ungetTag ( rtag );
                return;
              }
          }

          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
518
519
520
521
522
523
	}
    }
}


/***************************************************************/
Thomas Jahns's avatar
Thomas Jahns committed
524
/* send buffer to writer and swap buffer for filling */
525
526
/* Send the active buffer of AFD to the writer (special rank) and switch to
   the other buffer for filling.  The tag combines ID with afd->command.
   The previous non-blocking send of this file is completed first because
   it used the buffer that becomes active here.  Afterwards afd->command is
   IO_Send_buffer. */
static void
sendPA(aFiledataPA *afd, int id)
{
  int tag;
  long amount;
  MPI_Status status;
  double startTime = 0.0;

  amount = ( long ) dbuffer_data_size ( afd->db );
  tag = setTag ( id, afd->command );

  /* amount is a long: %ld, not %zu */
  xdebug ( "send buffer for %s, size: %ld bytes, command=%s, in",
           afd->name, amount, command2charP [ afd->command ] );

  if ( ddebug ) startTime = MPI_Wtime ();

  xmpiStat ( MPI_Wait ( &( afd->request ), &status ), &status );

  if ( ddebug ) accumWait +=  ( MPI_Wtime () - startTime );

  xmpi(MPI_Issend(afd->db->buffer, amount, MPI_CHAR, commInqSpecialRankNode(),
                  tag, commInqCommNode(), &afd->request));

  /* change outputBuffer */
  dbuffer_reset ( afd->db );

  if ( afd->db == afd->db1 )
    {
      xdebug("%s", "Change to buffer 2 ...");
      afd->db =  afd->db2;
    }
  else
    {
      xdebug("%s", "Change to buffer 1 ...");
      afd->db =  afd->db1;
    }

  afd->command = IO_Send_buffer;
}

 
/***************************************************************/ 

571
572
/* Move AFD to timestep TSID, which must directly follow its current one.
   Aborts on a NULL record or a skipped/repeated timestep. */
static void
defTimestepPA(aFiledataPA * afd, int tsID)
{
  bool consecutive = afd != NULL && tsID == afd->tsID + 1;

  if ( !consecutive )
    xabort ( " defTimestepPA () didn't succeed." );
  afd->tsID = tsID;
}
 
Deike Kleberg's avatar
Deike Kleberg committed
579
580
581

/***************************************************************/

582
size_t fwPOSIXASYNCH( int id, int tsID, const void *buffer, size_t len )
Deike Kleberg's avatar
Deike Kleberg committed
583
584
585
586
{
  int error = 0;
  int flush = 0;
  int filled = 0;
587
  aFiledataPA *afd;
Deike Kleberg's avatar
Deike Kleberg committed
588
  listElem_t *curr;
Deike Kleberg's avatar
Deike Kleberg committed
589

590
591
  afd = ( aFiledataPA * ) queueIdx2val ( bibAFiledataPA, id );
  flush = ( tsID != afd->tsID ) ? 1 : 0;
Deike Kleberg's avatar
Deike Kleberg committed
592
593
  if ( flush == 1 ) 
    {
Thomas Jahns's avatar
Thomas Jahns committed
594
      xdebug ( "fileWriter (): tsID = %d, flush buffer", tsID );
Deike Kleberg's avatar
Deike Kleberg committed
595

596
      curr = bibAFiledataPA->head;
Deike Kleberg's avatar
Deike Kleberg committed
597
598
599

      while ( curr ) 
	{
600
	  sendPA (( aFiledataPA *) curr->val, curr->idx );
601
          defTimestepPA (( aFiledataPA * ) curr->val, tsID );
Deike Kleberg's avatar
Deike Kleberg committed
602
603
	  curr = curr->next;
	}
604
605
606
607
608
609
610
611

      {
        double startTime;
        MPI_Status status;
        if (ddebug) startTime = MPI_Wtime();
        xmpiStat(MPI_Wait(&(afd->request), &status), &status);
        if (ddebug) accumWait +=  MPI_Wtime() - startTime;
      }
612
      xmpi ( MPI_Barrier ( commInqCommColl ())); 
Deike Kleberg's avatar
Deike Kleberg committed
613
614
    }                                          

615
  filled = dbuffer_push ( afd->db, ( unsigned char * ) buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
616

617
618
  xdebug ( "id = %d, tsID = %d, pushed %zu byte data on buffer, filled = %d", 
           id, tsID, len, filled ); 
Deike Kleberg's avatar
Deike Kleberg committed
619
620
621
622
623
624
625

  if ( filled == 1 ) 
    {
      if ( flush )
	error = filled;
      else
	{
626
	  sendPA ( afd, id );
Deike Kleberg's avatar
Deike Kleberg committed
627
     
628
	  error = dbuffer_push ( afd->db, ( unsigned char * )buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
629
630
	}
    }
631

Deike Kleberg's avatar
Deike Kleberg committed
632
  if ( error == 1 )
633
634
    xabort("did not succeed filling output buffer, id=%d", id);

Deike Kleberg's avatar
Deike Kleberg committed
635
636
637
638
639
640
641
642
  return len;
}

/***************************************************************/

int fcPOSIXASYNCH ( int id )
{
  double accumWaitMax;
643
  aFiledataPA *afd;
644
  int iret, root = 0;
Deike Kleberg's avatar
Deike Kleberg committed
645

646
  xdebug ( "fileID %d: send buffer, close file and cleanup", id );
Deike Kleberg's avatar
Deike Kleberg committed
647

648
  afd = ( aFiledataPA * ) queueIdx2val ( bibAFiledataPA, id );
Deike Kleberg's avatar
Deike Kleberg committed
649

650
  afd->command = IO_Close_file;
Deike Kleberg's avatar
Deike Kleberg committed
651

652
  sendPA ( afd, id );
Deike Kleberg's avatar
Deike Kleberg committed
653
654
655

  /* dequeue file element */

656
  iret = queueDelNode ( bibAFiledataPA, id );
Deike Kleberg's avatar
Deike Kleberg committed
657
658
659
660
661

  /* timer output */

  if ( ddebug )
    {
662
663
664
665
666
      xmpi ( MPI_Reduce ( &accumWait, &accumWaitMax, 
                          1, MPI_DOUBLE, MPI_MAX, root, commInqCommColl ()));
      xdebug ( "Wait time %15.10lf s", accumWait );
      if ( commInqRankColl () == root )
	xdebug ( "Max wait time %15.10lf s", accumWaitMax );
Deike Kleberg's avatar
Deike Kleberg committed
667
668
669
670
671
672
673
674
675
    }
  
  return iret;
}

/***************************************************************/

int fowPOSIXASYNCH ( const char *filename )
{
676
677
  static aFiledataPA *afd;
  static long buffersize = 0; 
678
  int root = 0, iret, id, messageLength = 32;
Deike Kleberg's avatar
Deike Kleberg committed
679
680
681
  char message[messageLength];

  /* broadcast buffersize to collectors */
682
             
Deike Kleberg's avatar
Deike Kleberg committed
683
684
685
  if ( !buffersize )
    {
      
686
      if  ( commInqRankColl () == root )
Deike Kleberg's avatar
Deike Kleberg committed
687
688
689
	{ 
	  if ( getenv( "BUFSIZE" ) != NULL )
	    buffersize = atol ( getenv ( "BUFSIZE" ));
690
691
	  if ( buffersize < initial_buffersize )
	    buffersize = initial_buffersize;
692
693
          xdebug("filename=%s, broadcast buffersize=%ld to collectors ...",
                 filename, buffersize);
Deike Kleberg's avatar
Deike Kleberg committed
694
	}
695
      xmpi ( MPI_Bcast ( &buffersize, 1, MPI_LONG, root, commInqCommColl ()));
Deike Kleberg's avatar
Deike Kleberg committed
696
697
    }

698
699
700
  /* init and enqueue aFiledataPA */

  afd = initAFiledataPA ( filename, buffersize );
701
702
  if ((id = queuePush ( bibAFiledataPA, afd, 0)) < 0)
    xabort("filename %s is not unique", afd->name);
Deike Kleberg's avatar
Deike Kleberg committed
703

704
705
  xdebug ( "filename=%s, init and enqueued aFiledataPA, return id = %d", 
           filename, id );
Deike Kleberg's avatar
Deike Kleberg committed
706
707
708

  /* put filename, id and buffersize on buffer */ 

709
710
  iret = dbuffer_push ( afd->db, ( unsigned char *) filename, 
                        strlen ( filename ));
711
  xassert(iret == 0);
712
  iret = dbuffer_push ( afd->db,  ( unsigned char *) token, 1);  
713
  xassert(iret == 0);
Deike Kleberg's avatar
Deike Kleberg committed
714
  sprintf ( message,"%lX", buffersize); 
715
716
  iret = dbuffer_push ( afd->db,  ( unsigned char *) message, 
                        strlen ( message ));
717
  xassert(iret == 0);
718
  iret = dbuffer_push ( afd->db,  ( unsigned char *) token, 1);
719
  xassert(iret == 0);
720
721
722
723
724
725

  if ( ddebug )
    {
      size_t l = strlen(filename) + strlen(message) + 2;
      char *temp = xmalloc(l + 1);
      strncpy(temp, (char *)afd->db->buffer, l);
726
727
728
      temp[l] = '\0';
      xdebug("filename=%s, put Open file message on buffer:\n%s,\t return",
             filename, temp);
729
730
731
      free(temp);
    }

Deike Kleberg's avatar
Deike Kleberg committed
732
733
734
735
736
  return id;
}

/***************************************************************/

737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
/* Tell the writer (special rank) that this collector is done, then destroy
   the collector's file queue.  Aborts if files are still open. */
void finalizePOSIXASYNCH ( void )
{
  int buffer = 0, tag, specialRank = commInqSpecialRankNode ();
  MPI_Comm commNode = commInqCommNode ();

  tag = setTag ( 0, IO_Finalize );

  xmpi ( MPI_Send ( &buffer, 1, MPI_INT, specialRank, tag, commNode ));

  if ( bibAFiledataPA->head != NULL )
    {
      xabort ( "Queue bibAFiledataP not empty." );
    }
  else
    {
      xdebug("%s", "cleanup queue");
      queueDestroy ( bibAFiledataPA );
      /* do not leave the file-scope pointer dangling after destruction */
      bibAFiledataPA = NULL;
    }
}

/***************************************************************/

759
void initPOSIXASYNCH ( void )  
Deike Kleberg's avatar
Deike Kleberg committed
760
{
761
762
  if ( commInqSizeNode () < 2 ) 
    xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );
763

764
  if ( nPrefStreams < 1 ) xabort ( "USAGE: # PREFETCH STREAMS >= 1" );
Deike Kleberg's avatar
Deike Kleberg committed
765

766
  if ( commInqRankNode () == commInqSpecialRankNode ())
Deike Kleberg's avatar
Deike Kleberg committed
767
    {
768
769
770
771
772
      commDefCommColl ( 0 );
      commSendNodeInfo ();
      commRecvNodeMap ();
      commDefCommsIO ();
      pwPOSIXASYNCH ();
Deike Kleberg's avatar
Deike Kleberg committed
773
774
    }
  else
775
776
777
778
779
780
781
782
    {
      commDefCommColl ( 1 );
      commSendNodeInfo ();
      commRecvNodeMap ();
      commDefCommsIO ();
      bibAFiledataPA = queueInit ( destroyAFiledataPA, 
                                   compareNamesAPA );
    }
Deike Kleberg's avatar
Deike Kleberg committed
783
784
785
}

#endif
786
#endif
787
788
789
790
791
792
793
794
795
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */