pio_posixasynch.c 11.1 KB
Newer Older
1
2
3
4
/*
   TODO
   README: the specialRank PE shuts down once all output files are closed
*/
5
6
7
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
8

9
10

#ifdef USE_MPI
11
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
12

Thomas Jahns's avatar
Thomas Jahns committed
13
14
15
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
Deike Kleberg's avatar
Deike Kleberg committed
16
17
18
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
19
20
#include <sys/types.h>
#include <sys/stat.h>
Thomas Jahns's avatar
Thomas Jahns committed
21
22
23
#include <unistd.h>

#include <mpi.h>
Deike Kleberg's avatar
Deike Kleberg committed
24
25

#include "pio.h"
26
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
28
#include "pio_util.h"
29
#include "dmemory.h"
Deike Kleberg's avatar
Deike Kleberg committed
30

31
/* Printable names of the file-operation commands, indexed by command code;
   this file uses the table for debug output only. */
extern char * command2charP[6];
Deike Kleberg's avatar
Deike Kleberg committed
32

33
34
/* Delimiter set handed to strtok() when the header of an IO_Open_file
   message is split into file name and buffer size. */
extern char *token;

Deike Kleberg's avatar
Deike Kleberg committed
35
36
37
38
/* Accumulated MPI_Wtime() spans spent waiting in aio_suspend(); only
   updated while ddebug is set. */
extern double accumSuspend;
/* Accumulated MPI_Wtime() spans spent inside aio_write() calls; only
   updated while ddebug is set. */
extern double accumWrite;


39
/*
 * Writer-side bookkeeping for one output file that is written through
 * POSIX asynchronous I/O.  The record is allocated with extra room behind
 * it so that the file name can live in the trailing flexible array.
 */
typedef struct
{
  struct dBuffer *fb;      /* buffer object; fb->buffer is carved into one
                              slot per control block */
  struct aiocb *ctrlBlks;  /* array of nPrefStreams AIO control blocks */
  off_t offset;            /* file offset of the next write; the file is
                              truncated to this length on close */
  int currOpIndex;         /* control block used by the write being issued */
  int nextOpIndex;         /* control block the following write will use */
  int prefIndex;           /* number of issued writes not yet reaped */
  int activeCollectors;    /* decremented per IO_Close_file message; the
                              record is removed when it reaches zero */
  int handle;              /* file descriptor returned by open() */
  int fileID;              /* id the record is looked up by */
  char name[];             /* NUL-terminated file name */
} bFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
51

52
static int
53
fileIDTest(void *a, void *fileID)
54
{
55
  return ((bFiledataPA *)a)->fileID == (int)(intptr_t)fileID;
56
57
}

58
/* Number of AIO control blocks (and buffer slots) set up per file, i.e. the
   ring size for writes in flight; pioWriterAIO() aborts if it is < 1. */
int nPrefStreams = 4;
Deike Kleberg's avatar
Deike Kleberg committed
59
60
61

/***************************************************************/

62
63
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
64
{
65
  bFiledataPA *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
66
67
  int i;

68
69
  xdebug ( "filename=%s, buffersize=%zu, ncollectors=%d, nPrefetchStreams=%d",
           filename, bs, nc, nPrefStreams );
Deike Kleberg's avatar
Deike Kleberg committed
70

71
72
  bfd = xmalloc( sizeof (*bfd) + strlen(filename) + 1);
  strcpy(bfd->name, filename);
Deike Kleberg's avatar
Deike Kleberg committed
73

74
  if (( bfd->handle = open ( bfd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
75
    xabort("Failed to open %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
76

77
  dbuffer_init(&(bfd->fb), (size_t)(nPrefStreams * bs));
Deike Kleberg's avatar
Deike Kleberg committed
78

79
  bfd->ctrlBlks = xcalloc(nPrefStreams, sizeof (bfd->ctrlBlks[0]));
Deike Kleberg's avatar
Deike Kleberg committed
80
81
82

  for ( i = 0; i < nPrefStreams; i++ )
    {
83
      bfd->ctrlBlks[i].aio_fildes     = bfd->handle;
84
      bfd->ctrlBlks[i].aio_buf = bfd->fb->buffer + i * bs;
85
86
      bfd->ctrlBlks[i].aio_reqprio    = 0;
      bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;   
Deike Kleberg's avatar
Deike Kleberg committed
87
88
    }
  
89
90
91
  bfd->nextOpIndex = 0;
  bfd->prefIndex = 0; 
  bfd->offset = 0;
92
  bfd->activeCollectors = nc;
Deike Kleberg's avatar
Deike Kleberg committed
93

94
  xdebug ( "filename=%s, opened file, return", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
95

96
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
97
98
99
100
}

/***************************************************************/

101
102
static int
destroyBFiledataPA ( void *v )
Deike Kleberg's avatar
Deike Kleberg committed
103
{
104
  bFiledataPA *bfd = (bFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
105
106
  const struct aiocb *ccBP[1];
  int iret = 0;
Thomas Jahns's avatar
Thomas Jahns committed
107
  ssize_t ssiret;
108
109
  int nextFinishOp = (bfd->nextOpIndex - bfd->prefIndex + nPrefStreams)
    % nPrefStreams;
110
  double startTime;
Deike Kleberg's avatar
Deike Kleberg committed
111

Thomas Jahns's avatar
Thomas Jahns committed
112
  xdebug ( "filename=%s, cleanup and close file", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
113
114
115

  /* close file */

116
  for (; bfd->prefIndex > 0 ; --(bfd->prefIndex))
Deike Kleberg's avatar
Deike Kleberg committed
117
    {
Thomas Jahns's avatar
Thomas Jahns committed
118
      xdebug("file: %s, prefIndex=%d", bfd->name, (int)bfd->prefIndex);
119
      ccBP[0] = ( bfd->ctrlBlks + nextFinishOp );
Deike Kleberg's avatar
Deike Kleberg committed
120
121
122
123
124
125
126

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
127
	  if ( iret < 0 && errno != EINTR ) xabort ( "aio_suspend () failed" );
128
	}
129
      while ( iret != 0 );
Deike Kleberg's avatar
Deike Kleberg committed
130
131
132

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);
133
134
135

      iret = aio_error(bfd->ctrlBlks + nextFinishOp);
      if (( ssiret = aio_return ( bfd->ctrlBlks + nextFinishOp )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
136
	xabort("aio_return () failed: %s", strerror(iret));
137
138

      nextFinishOp = ( nextFinishOp + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
139
140
    }

141
142
  if ((iret = ftruncate(bfd->handle, bfd->offset)) == -1)
    xabort("failed to truncate file %s: %s", bfd->name, strerror(errno));
143
  if (( iret = close ( bfd->handle )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
144
    xabort("failed to close %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
145
146
147

  /* file closed, cleanup */

148
  dbuffer_cleanup ( &( bfd->fb ));
Deike Kleberg's avatar
Deike Kleberg committed
149

150
  free(bfd->ctrlBlks);
151
  free(bfd);
Deike Kleberg's avatar
Deike Kleberg committed
152

153
  xdebug("%s", "closed file and cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
154
155
156
157
158
159

  return iret;
}

/***************************************************************/

160
161
static bool
compareNamesBPA(void *v1, void *v2)
162
{
163
  bFiledataPA *bfd1 = v1, *bfd2 = v2;
164

165
  return !strcmp(bfd1->name, bfd2->name);
166
167
168
169
}

/***************************************************************/

170
171
/*
 * Issue one asynchronous write for the slot selected by bfd->currOpIndex.
 *
 * bfd     file record; the data must already be in the slot's buffer
 * amount  number of bytes to write from that buffer
 *
 * The write is queued at the current file offset and the offset is advanced
 * by amount.  When this makes nPrefStreams writes outstanding, the function
 * blocks until the write on bfd->nextOpIndex has finished and reaps it.
 * Every failing AIO call aborts via xabort().
 */
static void
writePA(bFiledataPA *bfd, long amount)
{
  struct aiocb *issued = bfd->ctrlBlks + bfd->currOpIndex;
  double startTime = 0.0;
  int iret;

  xdebug ( "file %s, in", bfd->name );

  issued->aio_nbytes = (size_t)amount;
  issued->aio_offset = bfd->offset;

  /* off_t has no printf length modifier of its own, hence the cast */
  xdebug ( " before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld",
           bfd->name, issued->aio_nbytes, (long long)issued->aio_offset );

  if ( ddebug ) startTime = MPI_Wtime ();

  iret = aio_write ( issued );

  if ( ddebug ) accumWrite += ( MPI_Wtime () - startTime);

  xdebug ( "after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%lld,"
           "iret=aio_write()=%d",
           bfd->name, issued->aio_nbytes, (long long)issued->aio_offset,
           iret );

  if ( iret == -1 )
    {
      xabort ( "did not succeed writing buffer" );
    }
  else
    xdebug ( "buffer written to %s",  bfd->name );

  bfd->offset += ( off_t ) amount;
  bfd->prefIndex ++;

  if ( bfd->prefIndex >= nPrefStreams )
    {
      /* every slot is busy: wait for the one the next write will reuse */
      struct aiocb *reaped = bfd->ctrlBlks + bfd->nextOpIndex;
      const struct aiocb *ccBP[1] = { reaped };

      if ( ddebug )
        startTime = MPI_Wtime ();

      /* an interrupted wait is simply retried */
      do
        {
          iret = aio_suspend ( ccBP, 1, NULL );
          if ( iret < 0 && errno != EINTR )
            xabort ( "aio_suspend () failed" );
        } while ( iret != 0 );

      if ( ddebug )
        accumSuspend += ( MPI_Wtime () - startTime);

      /* fetch the error status first so it can be reported on failure;
         the ssize_t result of aio_return() is compared without narrowing */
      iret = aio_error ( reaped );
      if ( aio_return ( reaped ) == -1 )
        xabort ( "aio_return () failed: %s", strerror(iret) );

      bfd->prefIndex --;
    }

  xdebug ( "filename=%s, prefIndex=%d, return", bfd->name, bfd->prefIndex );
}

/***************************************************************/
234
static void
235
elemCheck(void *q, void *nm)
236
{
237
238
  bFiledataPA *bfd = q;
  const char *name = nm;
239

240
  if (!strcmp(name, bfd->name))
241
    xabort("Filename %s has already been inserted\n", name);
242
}
Deike Kleberg's avatar
Deike Kleberg committed
243

244
245
/***************************************************************/

246
void pioWriterAIO(void)
Deike Kleberg's avatar
Deike Kleberg committed
247
{
248
  bFiledataPA *bfd; 
249
  listSet * bibBFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
250
251
  long amount, buffersize;
  char *messageBuffer, *pMB, *filename, *temp;
Thomas Jahns's avatar
Thomas Jahns committed
252
  int messagesize, source, tag, id;
253
  struct fileOpTag rtag;
Deike Kleberg's avatar
Deike Kleberg committed
254
  MPI_Status status;
255
256
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
257
  bool * sentFinalize, doFinalize;
Deike Kleberg's avatar
Deike Kleberg committed
258

259
  if ( nPrefStreams < 1 ) xabort("USAGE: # PREFETCH STREAMS >= 1");
260
  xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
261
 
262
  bibBFiledataPA = listSetNew(destroyBFiledataPA, compareNamesBPA);
263
  sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
Deike Kleberg's avatar
Deike Kleberg committed
264
265
  
  for ( ;; )
266
267
268
269
    {   
      xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, 
                             &status ), &status );

Deike Kleberg's avatar
Deike Kleberg committed
270
271
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
272
      rtag = decodeFileOpTag(tag);
Deike Kleberg's avatar
Deike Kleberg committed
273

274
      xmpi ( MPI_Get_count ( &status, MPI_CHAR, &messagesize ));
Deike Kleberg's avatar
Deike Kleberg committed
275

276
      xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ), "
277
278
               "messagesize=%d", source, rtag.id, rtag.command,
               command2charP[rtag.command], messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
279

280
      switch (rtag.command)
Deike Kleberg's avatar
Deike Kleberg committed
281
282
	{
      	case IO_Open_file:
Deike Kleberg's avatar
Deike Kleberg committed
283

284
285
	  messageBuffer = ( char *) xmalloc ( messagesize * 
                                              sizeof ( messageBuffer[0] ));
Deike Kleberg's avatar
Deike Kleberg committed
286
287
	  pMB = messageBuffer;

288
289
	  xmpi ( MPI_Recv ( messageBuffer, messagesize, MPI_CHAR, source, 
                            tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
290
291
292
293
294
295
296
297

	  filename = strtok ( pMB, token );
	  pMB += ( strlen ( filename ) + 1 );
	  temp =  strtok ( pMB, token );
	  buffersize =  strtol ( temp, NULL, 16 );
	  pMB += ( strlen ( temp ) + 1 );
	  amount = ( long ) ( messageBuffer + messagesize - pMB );

298
299
	  xdebug("command  %s, filename=%s, buffersize=%ld, amount=%ld",
                 command2charP[rtag.command], filename, buffersize, amount);
Deike Kleberg's avatar
Deike Kleberg committed
300

301
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
302
                               (void *)(intptr_t)rtag.id)))
Deike Kleberg's avatar
Deike Kleberg committed
303
	    {
304
              listSetForeach(bibBFiledataPA, elemCheck, filename);
305
	      bfd = initBFiledataPA(filename, buffersize, nProcsCollNode);
306
              if ((id = listSetAdd(bibBFiledataPA, bfd)) < 0)
307
                xabort("fileID=%d not unique", rtag.id);
308
              bfd->fileID = id;
Deike Kleberg's avatar
Deike Kleberg committed
309
	    }
310
	  else
Thomas Jahns's avatar
Thomas Jahns committed
311
	    if (strcmp(filename, bfd->name) != 0)
312
              xabort("filename is not consistent, fileID=%d", rtag.id);
Deike Kleberg's avatar
Deike Kleberg committed
313

314
315
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
316

317
318
319
          xassert(amount >= 0);
	  memcpy((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                 pMB, (size_t)amount);
Deike Kleberg's avatar
Deike Kleberg committed
320

321
	  writePA ( bfd, amount );
Thomas Jahns's avatar
Thomas Jahns committed
322

Deike Kleberg's avatar
Deike Kleberg committed
323
324
325
326
327
	  free ( messageBuffer );

	  break;

	case IO_Send_buffer:
328

329
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
330
                               (void *)(intptr_t)rtag.id)))
331
            xabort("fileID=%d is not in set", rtag.id);
332

Deike Kleberg's avatar
Deike Kleberg committed
333
334
	  amount = messagesize;

335
336
	  xdebug("command: %s, id=%d, name=%s",
                 command2charP[rtag.command], rtag.id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
337

338
339
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
340
	  
341
342
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                        amount, MPI_CHAR, source, tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
343

344
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
345
346
347
348
	  
	  break;

	case IO_Close_file:
349

350
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
351
                               (void *)(intptr_t)rtag.id)))
352
            xabort("fileID=%d is not in set", rtag.id);
353

Deike Kleberg's avatar
Deike Kleberg committed
354
355
	  amount = messagesize;

356
357
	  xdebug(" command %s, id=%d, name=%s",
                 command2charP[rtag.command], rtag.id, bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
358

359
	  bfd->currOpIndex = bfd->nextOpIndex;
Deike Kleberg's avatar
Deike Kleberg committed
360

361
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
362

363
364
	  MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                   amount, MPI_CHAR, source, tag, commNode, &status);
Deike Kleberg's avatar
Deike Kleberg committed
365

366
	  writePA ( bfd, amount );
Deike Kleberg's avatar
Deike Kleberg committed
367

368
	  if ( ! --(bfd->activeCollectors))
Deike Kleberg's avatar
Deike Kleberg committed
369
	    {
370
              xdebug ( "all are finished with file %d, delete node", rtag.id);
371
372
              listSetRemove(bibBFiledataPA, fileIDTest,
                            (void *)(intptr_t)rtag.id);
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
	    }
          break;
        case IO_Finalize:
          {
            int buffer = CDI_UNDEFID, collID;

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
              if ( !sentFinalize[collID] ) 
                {
                  doFinalize = false;
                  break;
                }
            if ( doFinalize )
              {
390
391
                if (!listSetIsEmpty(bibBFiledataPA))
                  xabort("Set bibBfiledataP is not empty.");
392
393
                else
                  {
394
                    xdebug("%s", "all files are finished, destroy set,"
395
                           " return");
396
                    listSetDelete(bibBFiledataPA);
397
398
399
400
401
402
403
404
                  }
                return;
              }
          }

          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
405
406
407
408
409
410
411
412
	}
    }
}



/***************************************************************/

413
414
/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
415
416

#endif
417
#endif
418
419
420
421
422
423
424
425
426
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */