pio_posixasynch.c 11.5 KB
Newer Older
1
2
3
4
/*
   TODO
   README: the special-rank PE shuts down when all output files are closed.
*/
5
6
7
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
8

9
10

#ifdef USE_MPI
11
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
12
13
14
15

#include <stdbool.h>
#include <stdio.h>
#include <string.h>
16
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
17
18
19
#include <errno.h>

#include <aio.h>
20
21
#include <sys/types.h>
#include <sys/stat.h>
Deike Kleberg's avatar
Deike Kleberg committed
22
23
24
25
#include <fcntl.h>

#include "mpi.h"
#include "pio.h"
26
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
28
29
#include "pio_util.h"

30
extern char * command2charP[6];
Deike Kleberg's avatar
Deike Kleberg committed
31

32
33
extern char *token;

Deike Kleberg's avatar
Deike Kleberg committed
34
35
36
37
38
39
40
41
extern double accumProbe;
extern double accumRecv;
extern double accumSend;
extern double accumSuspend;
extern double accumWait;
extern double accumWrite;


42
/* Per-file state of the POSIX-AIO writer.  One entry exists for every open
   output file; entries are kept in a queue and looked up by idx. */
typedef struct
{
  char *name;              /* private copy of the file name (freed on destroy) */
  size_t size;             /* size in bytes of one write block */
  struct dBuffer *fb;      /* buffer holding nPrefStreams blocks of size bytes */
  int handle;              /* file descriptor returned by open() */
  struct aiocb *ctrlBlks;  /* one AIO control block per block of fb */
  off_t offset;            /* file offset at which the next write starts */
  int currOpIndex;         /* control block used by the current write */
  int nextOpIndex;         /* control block the next write will use */
  int prefIndex;           /* number of submitted writes not yet reaped */
  bool finished;           /* true once every collector has closed the file */
  bool *nfinished;         /* per-collector "closed this file" flags */
  int idx;                 /* file ID compared against by idxTest */
} bFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
57

58
59
60
61
62
63
static int
idxTest(void *a, void *idx)
{
  return ((bFiledataPA *)a)->idx == (int)(intptr_t)idx;
}

64
/* Number of AIO control blocks (and write blocks) allocated per file; writePA
   lets up to this many aio_write() requests be outstanding for one file before
   it waits for the oldest.  pioWriterAIO aborts if this is below 1. */
int nPrefStreams = 4;
Deike Kleberg's avatar
Deike Kleberg committed
65
66
67

/***************************************************************/

68
69
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
70
{
71
  bFiledataPA *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
72
73
  int i;

74
75
  xdebug ( "filename=%s, buffersize=%zu, ncollectors=%d, nPrefetchStreams=%d",
           filename, bs, nc, nPrefStreams );
Deike Kleberg's avatar
Deike Kleberg committed
76

77
  bfd = xmalloc ( sizeof ( bfd[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
78

Thomas Jahns's avatar
Thomas Jahns committed
79
  bfd->name = xmalloc (( strlen ( filename ) + 1) * sizeof ( bfd->name[0] ));
80
81
  strcpy ( bfd->name, filename );
  bfd->size = bs;
Deike Kleberg's avatar
Deike Kleberg committed
82

83
  if (( bfd->handle = open ( bfd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
84
    xabort("Failed to open %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
85

86
  dbuffer_init ( &( bfd->fb ), ( size_t )( nPrefStreams * bfd->size ));
Deike Kleberg's avatar
Deike Kleberg committed
87

88
  bfd->ctrlBlks = xcalloc(nPrefStreams, sizeof (bfd->ctrlBlks[0]));
Deike Kleberg's avatar
Deike Kleberg committed
89
90
91

  for ( i = 0; i < nPrefStreams; i++ )
    {
92
93
94
95
      bfd->ctrlBlks[i].aio_fildes     = bfd->handle;
      bfd->ctrlBlks[i].aio_buf        = bfd->fb->buffer + ( i * bfd->size );
      bfd->ctrlBlks[i].aio_reqprio    = 0;
      bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;   
Deike Kleberg's avatar
Deike Kleberg committed
96
97
    }
  
98
99
100
101
  bfd->nextOpIndex = 0;
  bfd->prefIndex = 0; 
  bfd->offset = 0;
  bfd->finished = false;
102
  bfd->nfinished = xmalloc ( nc * sizeof ( bfd->nfinished[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
103

Deike Kleberg's avatar
Deike Kleberg committed
104
  for ( i = 0; i < nc; i++ )
105
    bfd->nfinished[i] = false;
Deike Kleberg's avatar
Deike Kleberg committed
106

107
  xdebug ( "filename=%s, opened file, return", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
108

109
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
110
111
112
113
}

/***************************************************************/

114
115
static int
destroyBFiledataPA ( void *v )
Deike Kleberg's avatar
Deike Kleberg committed
116
{
117
  bFiledataPA *bfd = (bFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
118
119
  const struct aiocb *ccBP[1];
  int iret = 0;
Thomas Jahns's avatar
Thomas Jahns committed
120
  ssize_t ssiret;
121
122
  int nextFinishOp = (bfd->nextOpIndex - bfd->prefIndex + nPrefStreams)
    % nPrefStreams;
123
  double startTime;
Deike Kleberg's avatar
Deike Kleberg committed
124

Thomas Jahns's avatar
Thomas Jahns committed
125
  xdebug ( "filename=%s, cleanup and close file", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
126
127
128

  /* close file */

129
  for (; bfd->prefIndex > 0 ; bfd->prefIndex --)
Deike Kleberg's avatar
Deike Kleberg committed
130
    {
Thomas Jahns's avatar
Thomas Jahns committed
131
      xdebug("file: %s, prefIndex=%d", bfd->name, (int)bfd->prefIndex);
132
      ccBP[0] = ( bfd->ctrlBlks + nextFinishOp );
Deike Kleberg's avatar
Deike Kleberg committed
133
134
135
136
137
138
139

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
140
	  if ( iret < 0 && errno != EINTR ) xabort ( "aio_suspend () failed" );
141
	}
142
      while ( iret != 0 );
Deike Kleberg's avatar
Deike Kleberg committed
143
144
145

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);
146
147
148

      iret = aio_error(bfd->ctrlBlks + nextFinishOp);
      if (( ssiret = aio_return ( bfd->ctrlBlks + nextFinishOp )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
149
	xabort("aio_return () failed: %s", strerror(iret));
150
151

      nextFinishOp = ( nextFinishOp + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
152
153
    }

154
  if (( iret = close ( bfd->handle )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
155
    xabort("failed to close %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
156
157
158

  /* file closed, cleanup */

159
  dbuffer_cleanup ( &( bfd->fb ));
Deike Kleberg's avatar
Deike Kleberg committed
160

161
162
163
164
  free ( bfd->nfinished );
  free ( bfd->ctrlBlks );
  free ( bfd->name );
  free ( bfd );
Deike Kleberg's avatar
Deike Kleberg committed
165

166
  xdebug("%s", "closed file and cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
167
168
169
170
171
172

  return iret;
}

/***************************************************************/

173
174
static bool
compareNamesBPA(void *v1, void *v2)
175
{
176
  bFiledataPA *bfd1 = v1, *bfd2 = v2;
177

178
  return !strcmp(bfd1->name, bfd2->name);
179
180
181
182
}

/***************************************************************/

183
184
/* Submit an asynchronous write of `amount` bytes from control block
 * bfd->currOpIndex (whose buffer the caller has already filled) at the
 * current file offset, and advance the offset.  If that leaves nPrefStreams
 * writes outstanding, block until the oldest one (the control block at
 * bfd->nextOpIndex) completes, so that the next call may reuse its buffer.
 * Any failure, including a short write, aborts via xabort. */
static void
writePA(bFiledataPA *bfd, long amount)
{
  const struct aiocb *ccBP[1];
  struct aiocb *cur = bfd->ctrlBlks + bfd->currOpIndex;
  int iret, writeErrno;
  double startTime = 0.0;

  xdebug ( "file %s, in", bfd->name );

  cur->aio_nbytes = amount;
  cur->aio_offset = bfd->offset;

  /* off_t has no portable length modifier; print it as long long */
  xdebug ( " before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld",
           bfd->name, cur->aio_nbytes, ( long long ) cur->aio_offset );

  if ( ddebug ) startTime = MPI_Wtime ();

  iret = aio_write ( cur );
  /* save errno before timing/debug output can clobber it */
  writeErrno = errno;

  if ( ddebug ) accumWrite += ( MPI_Wtime () - startTime);

  xdebug ( "after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld,"
           "iret=aio_write()=%d",
           bfd->name, cur->aio_nbytes, ( long long ) cur->aio_offset, iret );

  if ( iret == -1 )
    {
      xabort ( "did not succeed writing buffer: %s", strerror ( writeErrno ));
    }
  else
    xdebug ( "buffer written to %s",  bfd->name );

  bfd->offset += ( off_t ) amount;
  bfd->prefIndex ++;

  if ( bfd->prefIndex >= nPrefStreams )
    {
      /* all blocks are busy: reap the oldest request, which occupies the
         slot the next write will use */
      struct aiocb *oldest = bfd->ctrlBlks + bfd->nextOpIndex;
      ssize_t written;
      int aioErr;

      ccBP[0] = oldest;

      if ( ddebug )
        startTime = MPI_Wtime ();

      /* retry when interrupted by a signal */
      do
        {
          iret = aio_suspend ( ccBP, 1, NULL );
          if ( iret < 0 && errno != EINTR )
            xabort ( "aio_suspend () failed" );
        } while ( iret != 0 );

      if ( ddebug )
        accumSuspend += ( MPI_Wtime () - startTime);

      /* the error status must be fetched before aio_return() releases it */
      aioErr = aio_error ( oldest );
      if (( written = aio_return ( oldest )) == -1 )
        xabort ( "aio_return () failed: %s", strerror ( aioErr ));
      if (( size_t ) written != oldest->aio_nbytes )
        xabort ( "short write to %s: %zd of %zu bytes", bfd->name,
                 written, oldest->aio_nbytes );

      bfd->prefIndex --;
    }

  xdebug ( "filename=%s, prefIndex=%d, return", bfd->name, bfd->prefIndex );
}

/***************************************************************/
247
static void
248
elemCheck(void *q, void *nm)
249
{
250
251
  bFiledataPA *bfd = q;
  const char *name = nm;
252

253
254
  if (!strcmp(name, bfd->name))
    xabort("Filename %s is already enqueued\n", name);
255
}
Deike Kleberg's avatar
Deike Kleberg committed
256

257
258
/***************************************************************/

259
void pioWriterAIO(void)
Deike Kleberg's avatar
Deike Kleberg committed
260
{
261
  bFiledataPA *bfd; 
Deike Kleberg's avatar
Deike Kleberg committed
262
  queue_t * bibBFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
263
264
  long amount, buffersize;
  char *messageBuffer, *pMB, *filename, *temp;
Deike Kleberg's avatar
Deike Kleberg committed
265
  int messagesize, source, tag, i, id;
Deike Kleberg's avatar
Deike Kleberg committed
266
  tag_t *rtag;
Deike Kleberg's avatar
Deike Kleberg committed
267
  MPI_Status status;
268
269
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
270
  bool * sentFinalize, doFinalize;
Deike Kleberg's avatar
Deike Kleberg committed
271

272
  if ( nPrefStreams < 1 ) xabort("USAGE: # PREFETCH STREAMS >= 1");
273
  xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
274
 
275
  bibBFiledataPA = queueInit ( destroyBFiledataPA, compareNamesBPA );
276
  sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
Deike Kleberg's avatar
Deike Kleberg committed
277
278
  
  for ( ;; )
279
280
281
282
    {   
      xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, 
                             &status ), &status );

Deike Kleberg's avatar
Deike Kleberg committed
283
284
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
Deike Kleberg's avatar
Deike Kleberg committed
285
      rtag   = getTag ( tag );
Deike Kleberg's avatar
Deike Kleberg committed
286

287
      xmpi ( MPI_Get_count ( &status, MPI_CHAR, &messagesize ));
Deike Kleberg's avatar
Deike Kleberg committed
288

289
290
291
292
      xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ), "
               "messagesize=%d",
               source, rtag->id, rtag->command, 
               command2charP [ rtag->command ], messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
293
294
295
296

      switch ( rtag->command )
	{
      	case IO_Open_file:
Deike Kleberg's avatar
Deike Kleberg committed
297

298
299
	  messageBuffer = ( char *) xmalloc ( messagesize * 
                                              sizeof ( messageBuffer[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
300
301
	  pMB = messageBuffer;

302
303
	  xmpi ( MPI_Recv ( messageBuffer, messagesize, MPI_CHAR, source, 
                            tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
304
305
306
307
308
309
310
311

	  filename = strtok ( pMB, token );
	  pMB += ( strlen ( filename ) + 1 );
	  temp =  strtok ( pMB, token );
	  buffersize =  strtol ( temp, NULL, 16 );
	  pMB += ( strlen ( temp ) + 1 );
	  amount = ( long ) ( messageBuffer + messagesize - pMB );

312
313
	  xdebug ( "command  %s, filename=%s, buffersize=%ld, amount=%ld",
                   command2charP [ rtag->command ], filename, 
314
                   buffersize, amount );
Deike Kleberg's avatar
Deike Kleberg committed
315

316
317
          if (!(bfd = queueGet(bibBFiledataPA, idxTest,
                               (void *)(intptr_t)rtag->id)))
Deike Kleberg's avatar
Deike Kleberg committed
318
	    {
319
              queueForeach(bibBFiledataPA, elemCheck, filename);
320
	      bfd = initBFiledataPA(filename, buffersize, nProcsCollNode);
Thomas Jahns's avatar
Thomas Jahns committed
321
              if ((id = queueAdd(bibBFiledataPA, bfd)) < 0)
Thomas Jahns's avatar
Thomas Jahns committed
322
                xabort("fileID=%d not unique", rtag->id);
Deike Kleberg's avatar
Deike Kleberg committed
323
	    }
324
	  else
Thomas Jahns's avatar
Thomas Jahns committed
325
326
	    if (strcmp(filename, bfd->name) != 0)
              xabort("filename is not consistent, fileID=%d", rtag->id);
Deike Kleberg's avatar
Deike Kleberg committed
327

328
329
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
330

Thomas Jahns's avatar
Thomas Jahns committed
331
	  memcpy ( bfd->fb->buffer + ( bfd->currOpIndex * bfd->size ),
332
                   pMB, ( int ) amount );
Deike Kleberg's avatar
Deike Kleberg committed
333

334
	  writePA ( bfd, amount );
Thomas Jahns's avatar
Thomas Jahns committed
335

Deike Kleberg's avatar
Deike Kleberg committed
336
337
338
339
340
	  free ( messageBuffer );

	  break;

	case IO_Send_buffer:
341

342
343
          if (!(bfd = queueGet(bibBFiledataPA, idxTest,
                               (void *)(intptr_t)rtag->id)))
344
345
            xabort("fileID=%d is not in queue", rtag->id);

Deike Kleberg's avatar
Deike Kleberg committed
346
347
	  amount = messagesize;

348
349
	  xdebug ( "command: %s, id=%d, name=%s", 
                   command2charP [ rtag->command ], rtag->id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
350

351
352
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
353
	  
354
355
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                        amount, MPI_CHAR, source, tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
356

357
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
358
359
360
361
	  
	  break;

	case IO_Close_file:
362

363
364
          if (!(bfd = queueGet(bibBFiledataPA, idxTest,
                               (void *)(intptr_t)rtag->id)))
365
366
            xabort("fileID=%d is not in queue", rtag->id );

Deike Kleberg's avatar
Deike Kleberg committed
367
368
	  amount = messagesize;

369
	  xdebug ( " command %s, id=%d, name=%s", 
370
                   command2charP [ rtag->command ], rtag->id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
371

372
	  bfd->currOpIndex = bfd->nextOpIndex;
Deike Kleberg's avatar
Deike Kleberg committed
373

374
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
375

376
	  MPI_Recv (  bfd->fb->buffer + ( bfd->currOpIndex * bfd->size ), 
377
		      amount, MPI_CHAR, source,  tag, commNode, &status );
Deike Kleberg's avatar
Deike Kleberg committed
378

379
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
380

381
	  bfd->nfinished[source] = true;
382
	  bfd->finished          = true;
Deike Kleberg's avatar
Deike Kleberg committed
383
	  
384
	  for ( i = 0; i < nProcsCollNode; i++ )
385
	    if ( !( bfd->nfinished[i] ))
Deike Kleberg's avatar
Deike Kleberg committed
386
	      {
387
		bfd->finished = false;
Deike Kleberg's avatar
Deike Kleberg committed
388
389
390
		break;
	      }
	  
391
	  if ( bfd->finished )
Deike Kleberg's avatar
Deike Kleberg committed
392
	    {
393
              xdebug ( "all are finished with file %d, delete node", rtag->id );
394
395
              queueDelNode(bibBFiledataPA, idxTest,
                           (void *)(intptr_t)rtag->id);
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
	    }
          break;
        case IO_Finalize:
          {
            int buffer = CDI_UNDEFID, collID;

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
              if ( !sentFinalize[collID] ) 
                {
                  doFinalize = false;
                  break;
                }
            if ( doFinalize )
              {
413
414
                if (!queueIsEmpty(bibBFiledataPA))
                  xabort("Queue bibBfiledataP is not empty.");
415
416
                else
                  {
417
418
                    xdebug("%s", "all files are finished, destroy queue,"
                           " return");
419
420
421
422
423
424
425
426
427
428
                    queueDestroy ( bibBFiledataPA );
                  }
                if ( rtag ) ungetTag ( rtag );
                return;
              }
          }

          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
429
430
431
432
433
434
435
436
	}
    }
}



/***************************************************************/

437
438
/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
439
440

#endif
441
#endif
442
443
444
445
446
447
448
449
450
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */