pio_posixasynch.c 11.5 KB
Newer Older
1
2
3
4
/*
   TODO
   README: the specialRank PE shuts down once all output files are closed.
*/
5
6
7
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
8

9
10

#ifdef USE_MPI
11
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
12
13
14
15

#include <stdbool.h>
#include <stdio.h>
#include <string.h>
16
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
17
18
19
#include <errno.h>

#include <aio.h>
20
21
#include <sys/types.h>
#include <sys/stat.h>
Deike Kleberg's avatar
Deike Kleberg committed
22
23
24
25
#include <fcntl.h>

#include "mpi.h"
#include "pio.h"
26
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
28
29
#include "pio_util.h"

30
/* Printable names of the I/O commands, indexed by the command decoded
   from a message tag; used only for debug output here. */
extern char *command2charP[6];

/* Set of delimiter characters separating the header fields of an
   IO_Open_file message (consumed with strtok()). */
extern char *token;

/* Timing accumulators fed with MPI_Wtime() differences. In this file only
   accumSuspend and accumWrite are updated, and only when ddebug is set. */
extern double accumProbe, accumRecv, accumSend,
  accumSuspend, accumWait, accumWrite;


/* Per-file state of the POSIX asynchronous-I/O writer. */
typedef struct
{
  char *name;                   /* file name, private copy */
  size_t size;                  /* bytes per buffer slot */
  struct dBuffer *fb;           /* backing store of nPrefStreams slots */
  int handle;                   /* file descriptor from open() */
  struct aiocb *ctrlBlks;       /* one AIO control block per slot */
  off_t offset;                 /* file offset of the next write */
  int currOpIndex, nextOpIndex; /* slot being written / slot to use next */
  int prefIndex;                /* writes issued but not yet reaped */
  bool finished;                /* every collector has closed the file */
  bool *nfinished;              /* per-collector "closed" flags */
} bFiledataPA;

/* Number of buffer slots (concurrent AIO writes) per file; the writer
   loop rejects values below 1. */
int nPrefStreams = 4;
Deike Kleberg's avatar
Deike Kleberg committed
58
59
60

/***************************************************************/

61
62
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
63
{
64
  bFiledataPA *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
65
66
  int i;

67
68
  xdebug ( "filename=%s, buffersize=%zu, ncollectors=%d, nPrefetchStreams=%d",
           filename, bs, nc, nPrefStreams );
Deike Kleberg's avatar
Deike Kleberg committed
69

70
  bfd = xmalloc ( sizeof ( bfd[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
71

Thomas Jahns's avatar
Thomas Jahns committed
72
  bfd->name = xmalloc (( strlen ( filename ) + 1) * sizeof ( bfd->name[0] ));
73
74
  strcpy ( bfd->name, filename );
  bfd->size = bs;
Deike Kleberg's avatar
Deike Kleberg committed
75

76
  if (( bfd->handle = open ( bfd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
77
    xabort("Failed to open %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
78

79
  dbuffer_init ( &( bfd->fb ), ( size_t )( nPrefStreams * bfd->size ));
Deike Kleberg's avatar
Deike Kleberg committed
80

81
  bfd->ctrlBlks = xcalloc(nPrefStreams, sizeof (bfd->ctrlBlks[0]));
Deike Kleberg's avatar
Deike Kleberg committed
82
83
84

  for ( i = 0; i < nPrefStreams; i++ )
    {
85
86
87
88
      bfd->ctrlBlks[i].aio_fildes     = bfd->handle;
      bfd->ctrlBlks[i].aio_buf        = bfd->fb->buffer + ( i * bfd->size );
      bfd->ctrlBlks[i].aio_reqprio    = 0;
      bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;   
Deike Kleberg's avatar
Deike Kleberg committed
89
90
    }
  
91
92
93
94
  bfd->nextOpIndex = 0;
  bfd->prefIndex = 0; 
  bfd->offset = 0;
  bfd->finished = false;
95
  bfd->nfinished = xmalloc ( nc * sizeof ( bfd->nfinished[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
96

Deike Kleberg's avatar
Deike Kleberg committed
97
  for ( i = 0; i < nc; i++ )
98
    bfd->nfinished[i] = false;
Deike Kleberg's avatar
Deike Kleberg committed
99

100
  xdebug ( "filename=%s, opened file, return", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
101

102
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
103
104
105
106
}

/***************************************************************/

107
108
static int
destroyBFiledataPA ( void *v )
Deike Kleberg's avatar
Deike Kleberg committed
109
{
110
  bFiledataPA *bfd = (bFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
111
112
  const struct aiocb *ccBP[1];
  int iret = 0;
Thomas Jahns's avatar
Thomas Jahns committed
113
  ssize_t ssiret;
114
115
  int nextFinishOp = (bfd->nextOpIndex - bfd->prefIndex + nPrefStreams)
    % nPrefStreams;
116
  double startTime;
Deike Kleberg's avatar
Deike Kleberg committed
117

Thomas Jahns's avatar
Thomas Jahns committed
118
  xdebug ( "filename=%s, cleanup and close file", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
119
120
121

  /* close file */

122
  for (; bfd->prefIndex > 0 ; bfd->prefIndex --)
Deike Kleberg's avatar
Deike Kleberg committed
123
    {
Thomas Jahns's avatar
Thomas Jahns committed
124
      xdebug("file: %s, prefIndex=%d", bfd->name, (int)bfd->prefIndex);
125
      ccBP[0] = ( bfd->ctrlBlks + nextFinishOp );
Deike Kleberg's avatar
Deike Kleberg committed
126
127
128
129
130
131
132

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
133
	  if ( iret < 0 && errno != EINTR ) xabort ( "aio_suspend () failed" );
134
	}
135
      while ( iret != 0 );
Deike Kleberg's avatar
Deike Kleberg committed
136
137
138

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);
139
140
141

      iret = aio_error(bfd->ctrlBlks + nextFinishOp);
      if (( ssiret = aio_return ( bfd->ctrlBlks + nextFinishOp )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
142
	xabort("aio_return () failed: %s", strerror(iret));
143
144

      nextFinishOp = ( nextFinishOp + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
145
146
    }

147
  if (( iret = close ( bfd->handle )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
148
    xabort("failed to close %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
149
150
151

  /* file closed, cleanup */

152
  dbuffer_cleanup ( &( bfd->fb ));
Deike Kleberg's avatar
Deike Kleberg committed
153

154
155
156
157
  free ( bfd->nfinished );
  free ( bfd->ctrlBlks );
  free ( bfd->name );
  free ( bfd );
Deike Kleberg's avatar
Deike Kleberg committed
158

159
  xdebug("%s", "closed file and cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
160
161
162
163
164
165

  return iret;
}

/***************************************************************/

166
167
static bool
compareNamesBPA(void *v1, void *v2)
168
{
169
  bFiledataPA *bfd1 = v1, *bfd2 = v2;
170

171
  return !strcmp(bfd1->name, bfd2->name);
172
173
174
175
}

/***************************************************************/

176
177
/*
 * Start an asynchronous write of the first AMOUNT bytes of slot
 * bfd->currOpIndex at the current file offset, then advance the offset.
 * If that leaves all nPrefStreams slots in flight, block until the oldest
 * write (slot bfd->nextOpIndex, since the caller advanced the indices
 * before filling the slot) has completed, so that slot can be refilled.
 *
 * bfd:    file record whose current slot already holds the data.
 * amount: number of bytes to write (non-negative, at most bfd->size).
 * Any AIO failure or short write aborts via xabort().
 */
static void
writePA(bFiledataPA *bfd, long amount)
{
  struct aiocb *cb = bfd->ctrlBlks + bfd->currOpIndex;
  const struct aiocb *ccBP[1];
  int iret, writeErrno;
  ssize_t written;
  double startTime = 0.0;

  xdebug ( "file %s, in", bfd->name );

  cb->aio_nbytes = ( size_t ) amount;
  cb->aio_offset = bfd->offset;

  /* aio_offset is an off_t, which %zu does not match: print as long long */
  xdebug ( " before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld",
           bfd->name, cb->aio_nbytes, ( long long ) cb->aio_offset );

  if ( ddebug ) startTime = MPI_Wtime ();

  iret = aio_write ( cb );
  /* capture errno before the timing/debug calls below can overwrite it */
  writeErrno = errno;

  if ( ddebug ) accumWrite += ( MPI_Wtime () - startTime);

  xdebug ( "after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld,"
           "iret=aio_write()=%d",
           bfd->name, cb->aio_nbytes, ( long long ) cb->aio_offset, iret );

  if ( iret == -1 )
    xabort ( "did not succeed writing buffer: %s", strerror ( writeErrno ));
  else
    xdebug ( "buffer written to %s",  bfd->name );

  bfd->offset += ( off_t ) amount;
  bfd->prefIndex ++;

  /* every slot is in flight: reap the oldest write before returning */
  if ( bfd->prefIndex >= nPrefStreams )
    {
      struct aiocb *oldest = bfd->ctrlBlks + bfd->nextOpIndex;

      ccBP[0] = oldest;

      if ( ddebug )
        startTime = MPI_Wtime ();

      /* aio_suspend() can be interrupted by a signal: retry on EINTR */
      do
        {
          iret = aio_suspend ( ccBP, 1, NULL );
          if ( iret < 0 && errno != EINTR )
            xabort ( "aio_suspend () failed" );
        }
      while ( iret != 0 );

      if ( ddebug )
        accumSuspend += ( MPI_Wtime () - startTime);

      iret = aio_error ( oldest );
      if (( written = aio_return ( oldest )) == -1 )
        xabort ( "aio_return () failed: %s", strerror ( iret ));
      /* a partial write would silently lose data */
      if (( size_t ) written != oldest->aio_nbytes )
        xabort ( "short write to %s: %zd of %zu bytes",
                 bfd->name, written, oldest->aio_nbytes );

      bfd->prefIndex --;
    }

  xdebug ( "filename=%s, prefIndex=%d, return", bfd->name, bfd->prefIndex );
}

/***************************************************************/
240
241

// TODO: unify in IOModi, encapsulate in pio_queue.c
242
243
/* Abort if queue Q already holds a file record named NAME. */
static void
queueCheckPA(queue_t *q, char *name)
{
  listElem_t *node;

  for ( node = q->head; node != NULL; node = node->next )
    {
      const bFiledataPA *entry = ( const bFiledataPA * ) node->val;

      if ( !strcmp ( name, entry->name ))
        xabort("Filename %s is already enqueued\n", name );
    }
}
Deike Kleberg's avatar
Deike Kleberg committed
258

259
260
/***************************************************************/

261
void pioWriterAIO(void)
Deike Kleberg's avatar
Deike Kleberg committed
262
{
263
  bFiledataPA *bfd; 
Deike Kleberg's avatar
Deike Kleberg committed
264
  queue_t * bibBFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
265
266
  long amount, buffersize;
  char *messageBuffer, *pMB, *filename, *temp;
Deike Kleberg's avatar
Deike Kleberg committed
267
  int messagesize, source, tag, i, id;
Deike Kleberg's avatar
Deike Kleberg committed
268
  tag_t *rtag;
Deike Kleberg's avatar
Deike Kleberg committed
269
  MPI_Status status;
270
271
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
272
  bool * sentFinalize, doFinalize;
Deike Kleberg's avatar
Deike Kleberg committed
273

274
  if ( nPrefStreams < 1 ) xabort("USAGE: # PREFETCH STREAMS >= 1");
275
  xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
276
 
277
  bibBFiledataPA = queueInit ( destroyBFiledataPA, compareNamesBPA );
278
  sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
Deike Kleberg's avatar
Deike Kleberg committed
279
280
  
  for ( ;; )
281
282
283
284
    {   
      xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, 
                             &status ), &status );

Deike Kleberg's avatar
Deike Kleberg committed
285
286
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
Deike Kleberg's avatar
Deike Kleberg committed
287
      rtag   = getTag ( tag );
Deike Kleberg's avatar
Deike Kleberg committed
288

289
      xmpi ( MPI_Get_count ( &status, MPI_CHAR, &messagesize ));
Deike Kleberg's avatar
Deike Kleberg committed
290

291
292
293
294
      xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ), "
               "messagesize=%d",
               source, rtag->id, rtag->command, 
               command2charP [ rtag->command ], messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
295
296
297
298

      switch ( rtag->command )
	{
      	case IO_Open_file:
Deike Kleberg's avatar
Deike Kleberg committed
299

300
301
	  messageBuffer = ( char *) xmalloc ( messagesize * 
                                              sizeof ( messageBuffer[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
302
303
	  pMB = messageBuffer;

304
305
	  xmpi ( MPI_Recv ( messageBuffer, messagesize, MPI_CHAR, source, 
                            tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
306
307
308
309
310
311
312
313

	  filename = strtok ( pMB, token );
	  pMB += ( strlen ( filename ) + 1 );
	  temp =  strtok ( pMB, token );
	  buffersize =  strtol ( temp, NULL, 16 );
	  pMB += ( strlen ( temp ) + 1 );
	  amount = ( long ) ( messageBuffer + messagesize - pMB );

314
315
316
	  xdebug ( "command  %s, filename=%s, buffersize=%ld, amount=%ld",
                   command2charP [ rtag->command ], filename, 
                   buffersize, amount ); 
Deike Kleberg's avatar
Deike Kleberg committed
317
318

	  
319
	  if ( !(bfd = ( bFiledataPA * ) queueIdx2val ( bibBFiledataPA, 
Deike Kleberg's avatar
Deike Kleberg committed
320
321
							     rtag->id )))
	    {
322
	      queueCheckPA (  bibBFiledataPA, filename );
323
	      bfd = ( bFiledataPA * ) initBFiledataPA ( filename, 
Deike Kleberg's avatar
Deike Kleberg committed
324
								 buffersize,  
325
							       nProcsCollNode );
326
	      if (( id = queuePush ( bibBFiledataPA, bfd, 1, rtag->id )) != rtag->id )
Thomas Jahns's avatar
Thomas Jahns committed
327
                xabort("fileID=%d not unique", rtag->id);
Deike Kleberg's avatar
Deike Kleberg committed
328
	    }
329
	  else
Thomas Jahns's avatar
Thomas Jahns committed
330
331
	    if (strcmp(filename, bfd->name) != 0)
              xabort("filename is not consistent, fileID=%d", rtag->id);
Deike Kleberg's avatar
Deike Kleberg committed
332

333
334
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
335

Thomas Jahns's avatar
Thomas Jahns committed
336
	  memcpy ( bfd->fb->buffer + ( bfd->currOpIndex * bfd->size ),
337
                   pMB, ( int ) amount );
Deike Kleberg's avatar
Deike Kleberg committed
338

339
	  writePA ( bfd, amount );
Thomas Jahns's avatar
Thomas Jahns committed
340

Deike Kleberg's avatar
Deike Kleberg committed
341
342
343
344
345
	  free ( messageBuffer );

	  break;

	case IO_Send_buffer:
346
347
348
349

	  if (!(bfd = (bFiledataPA *)queueIdx2val(bibBFiledataPA, rtag->id)))
            xabort("fileID=%d is not in queue", rtag->id);

Deike Kleberg's avatar
Deike Kleberg committed
350
351
	  amount = messagesize;

352
353
	  xdebug ( "command: %s, id=%d, name=%s", 
                   command2charP [ rtag->command ], rtag->id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
354

355
356
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
357
	  
358
359
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                        amount, MPI_CHAR, source, tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
360

361
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
362
363
364
365
	  
	  break;

	case IO_Close_file:
366
367
368
369

	  if (!(bfd = (bFiledataPA *)queueIdx2val(bibBFiledataPA, rtag->id)))
            xabort("fileID=%d is not in queue", rtag->id );

Deike Kleberg's avatar
Deike Kleberg committed
370
371
	  amount = messagesize;

372
	  xdebug ( " command %s, id=%d, name=%s", 
373
                   command2charP [ rtag->command ], rtag->id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
374

375
	  bfd->currOpIndex = bfd->nextOpIndex;
Deike Kleberg's avatar
Deike Kleberg committed
376

377
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
378

379
	  MPI_Recv (  bfd->fb->buffer + ( bfd->currOpIndex * bfd->size ), 
380
		      amount, MPI_CHAR, source,  tag, commNode, &status );
Deike Kleberg's avatar
Deike Kleberg committed
381

382
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
383

384
	  bfd->nfinished[source] = true;
385
	  bfd->finished          = true;
Deike Kleberg's avatar
Deike Kleberg committed
386
	  
387
	  for ( i = 0; i < nProcsCollNode; i++ )
388
	    if ( !( bfd->nfinished[i] ))
Deike Kleberg's avatar
Deike Kleberg committed
389
	      {
390
		bfd->finished = false;
Deike Kleberg's avatar
Deike Kleberg committed
391
392
393
		break;
	      }
	  
394
	  if ( bfd->finished )
Deike Kleberg's avatar
Deike Kleberg committed
395
	    {
396
              xdebug ( "all are finished with file %d, delete node", rtag->id );
397
	      queueDelNode ( bibBFiledataPA, rtag->id );
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
	    }
          break;
        case IO_Finalize:
          {
            int buffer = CDI_UNDEFID, collID;

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
              if ( !sentFinalize[collID] ) 
                {
                  doFinalize = false;
                  break;
                }
            if ( doFinalize )
              {
                if ( bibBFiledataPA->head != NULL )
                  {
                    xabort ( "Queue bibBfiledataP is not empty." );
                  }
                else
                  {
421
422
                    xdebug("%s", "all files are finished, destroy queue,"
                           " return");
423
424
425
426
427
428
429
430
431
432
                    queueDestroy ( bibBFiledataPA );
                  }
                if ( rtag ) ungetTag ( rtag );
                return;
              }
          }

          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
433
434
435
436
437
438
439
440
	}
    }
}



/***************************************************************/

441
442
/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
443
444

#endif
445
#endif
446
447
448
449
450
451
452
453
454
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */