pio_posixasynch.c 11 KB
Newer Older
1
2
3
4
/*
   TODO
   README: the specialRank PE shuts down once all output files are closed.
*/
5
6
7
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
8

9

10
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
11

Thomas Jahns's avatar
Thomas Jahns committed
12
13
14
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
Deike Kleberg's avatar
Deike Kleberg committed
15
16
17
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
18
19
#include <sys/types.h>
#include <sys/stat.h>
Thomas Jahns's avatar
Thomas Jahns committed
20
21
22
#include <unistd.h>

#include <mpi.h>
Deike Kleberg's avatar
Deike Kleberg committed
23
24

#include "pio.h"
25
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
26
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_util.h"
28
#include "dmemory.h"
Deike Kleberg's avatar
Deike Kleberg committed
29

30
extern char * command2charP[6];
Deike Kleberg's avatar
Deike Kleberg committed
31

32
33
extern char *token;

Deike Kleberg's avatar
Deike Kleberg committed
34
35
36
37
extern double accumSuspend;
extern double accumWrite;


38
/* Per-file bookkeeping of the POSIX AIO writer; one instance per open file. */
typedef struct
{
  struct dBuffer *fb;      /* backing buffer, set up via dbuffer_init() */
  struct aiocb *ctrlBlks;  /* array of nPrefStreams AIO control blocks */
  off_t offset;            /* file offset at which the next write starts */
  int currOpIndex;         /* control block used by the operation being issued */
  int nextOpIndex;         /* control block the following operation will use */
  int prefIndex;           /* writes submitted but not yet reaped by aio_return() */
  int activeCollectors;    /* counted down on every IO_Close_file message */
  int handle;              /* file descriptor obtained from open() */
  int fileID;              /* id handed out by listSetAdd() */
  char name[];             /* file name, stored in the flexible array member */
} bFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
50

51
static int
52
fileIDTest(void *a, void *fileID)
53
{
54
  return ((bFiledataPA *)a)->fileID == (int)(intptr_t)fileID;
55
56
}

57
/* Number of AIO control blocks, and equally sized buffer slices, kept per
   file; pioWriterAIO() aborts if this is smaller than 1. */
static int nPrefStreams = 4;
Deike Kleberg's avatar
Deike Kleberg committed
58
59
60

/***************************************************************/

61
62
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
63
{
64
  bFiledataPA *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
65

66
67
  xdebug ( "filename=%s, buffersize=%zu, ncollectors=%d, nPrefetchStreams=%d",
           filename, bs, nc, nPrefStreams );
Deike Kleberg's avatar
Deike Kleberg committed
68

Uwe Schulzweida's avatar
Uwe Schulzweida committed
69
  bfd = (bFiledataPA*) xmalloc( sizeof (*bfd) + strlen(filename) + 1);
70
  strcpy(bfd->name, filename);
Deike Kleberg's avatar
Deike Kleberg committed
71

72
  if (( bfd->handle = open ( bfd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
73
    xabort("Failed to open %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
74

75
  dbuffer_init(&(bfd->fb), (size_t)nPrefStreams * bs);
Deike Kleberg's avatar
Deike Kleberg committed
76

77
  bfd->ctrlBlks = xcalloc((size_t)nPrefStreams, sizeof (bfd->ctrlBlks[0]));
Deike Kleberg's avatar
Deike Kleberg committed
78

79
  for (int i = 0; i < nPrefStreams; i++ )
Deike Kleberg's avatar
Deike Kleberg committed
80
    {
81
      bfd->ctrlBlks[i].aio_fildes     = bfd->handle;
82
      bfd->ctrlBlks[i].aio_buf = bfd->fb->buffer + (size_t)i * bs;
83
84
      bfd->ctrlBlks[i].aio_reqprio    = 0;
      bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;   
Deike Kleberg's avatar
Deike Kleberg committed
85
86
    }
  
87
88
89
  bfd->nextOpIndex = 0;
  bfd->prefIndex = 0; 
  bfd->offset = 0;
90
  bfd->activeCollectors = nc;
Deike Kleberg's avatar
Deike Kleberg committed
91

92
  xdebug ( "filename=%s, opened file, return", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
93

94
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
95
96
97
98
}

/***************************************************************/

99
100
static int
destroyBFiledataPA ( void *v )
Deike Kleberg's avatar
Deike Kleberg committed
101
{
102
  bFiledataPA *bfd = (bFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
103
104
  const struct aiocb *ccBP[1];
  int iret = 0;
Thomas Jahns's avatar
Thomas Jahns committed
105
  ssize_t ssiret;
106
107
  int nextFinishOp = (bfd->nextOpIndex - bfd->prefIndex + nPrefStreams)
    % nPrefStreams;
108
  double startTime;
Deike Kleberg's avatar
Deike Kleberg committed
109

Thomas Jahns's avatar
Thomas Jahns committed
110
  xdebug ( "filename=%s, cleanup and close file", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
111
112
113

  /* close file */

114
  for (; bfd->prefIndex > 0 ; --(bfd->prefIndex))
Deike Kleberg's avatar
Deike Kleberg committed
115
    {
Thomas Jahns's avatar
Thomas Jahns committed
116
      xdebug("file: %s, prefIndex=%d", bfd->name, (int)bfd->prefIndex);
117
      ccBP[0] = ( bfd->ctrlBlks + nextFinishOp );
Deike Kleberg's avatar
Deike Kleberg committed
118
119
120
121
122
123
124

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
125
	  if ( iret < 0 && errno != EINTR ) xabort ( "aio_suspend () failed" );
126
	}
127
      while ( iret != 0 );
Deike Kleberg's avatar
Deike Kleberg committed
128
129
130

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);
131
132
133

      iret = aio_error(bfd->ctrlBlks + nextFinishOp);
      if (( ssiret = aio_return ( bfd->ctrlBlks + nextFinishOp )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
134
	xabort("aio_return () failed: %s", strerror(iret));
135
136

      nextFinishOp = ( nextFinishOp + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
137
138
    }

139
140
  if ((iret = ftruncate(bfd->handle, bfd->offset)) == -1)
    xabort("failed to truncate file %s: %s", bfd->name, strerror(errno));
141
  if (( iret = close ( bfd->handle )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
142
    xabort("failed to close %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
143
144
145

  /* file closed, cleanup */

146
  dbuffer_cleanup ( &( bfd->fb ));
Deike Kleberg's avatar
Deike Kleberg committed
147

148
  free(bfd->ctrlBlks);
149
  free(bfd);
Deike Kleberg's avatar
Deike Kleberg committed
150

151
  xdebug("%s", "closed file and cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
152
153
154
155
156
157

  return iret;
}

/***************************************************************/

158
159
static bool
compareNamesBPA(void *v1, void *v2)
160
{
161
  bFiledataPA *bfd1 = v1, *bfd2 = v2;
162

163
  return !strcmp(bfd1->name, bfd2->name);
164
165
166
167
}

/***************************************************************/

168
static void
169
writePA(bFiledataPA *bfd, size_t amount)
Deike Kleberg's avatar
Deike Kleberg committed
170
171
{
  const struct aiocb *ccBP[1];
172
  ssize_t iret;
173
  double startTime;
Deike Kleberg's avatar
Deike Kleberg committed
174

175
  xdebug ( "file %s, in", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
176
  
177
178
  bfd->ctrlBlks[bfd->currOpIndex].aio_nbytes = amount;
  bfd->ctrlBlks[bfd->currOpIndex].aio_offset = bfd->offset;
Deike Kleberg's avatar
Deike Kleberg committed
179

180
181
182
  xdebug ( " before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%zu",
           bfd->name, bfd->ctrlBlks[bfd->currOpIndex].aio_nbytes,
           bfd->ctrlBlks[bfd->currOpIndex].aio_offset );
Deike Kleberg's avatar
Deike Kleberg committed
183

184
  if ( ddebug ) startTime = MPI_Wtime ();
Deike Kleberg's avatar
Deike Kleberg committed
185

186
  iret = aio_write ( bfd->ctrlBlks + bfd->currOpIndex );
Deike Kleberg's avatar
Deike Kleberg committed
187

188
  if ( ddebug ) accumWrite += ( MPI_Wtime () - startTime);
Deike Kleberg's avatar
Deike Kleberg committed
189

190
191
192
  xdebug ( "after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%zu,"
           "iret=aio_write()=%d",
           bfd->name, bfd->ctrlBlks[bfd->currOpIndex].aio_nbytes,
193
           bfd->ctrlBlks[bfd->currOpIndex].aio_offset, (int)iret );
194
195
196
197
198
   
  if ( iret == -1 ) 
    {
      xabort ( "did not succeed writing buffer" );
    }
199
200
201
  else
    xdebug ( "buffer written to %s",  bfd->name );
     
202
  bfd->offset += (off_t)amount;
203
  bfd->prefIndex ++;
Deike Kleberg's avatar
Deike Kleberg committed
204

205
  if ( bfd->prefIndex >= nPrefStreams ) 
Deike Kleberg's avatar
Deike Kleberg committed
206
    {
207
      ccBP[0] = ( bfd->ctrlBlks + bfd->nextOpIndex );
Deike Kleberg's avatar
Deike Kleberg committed
208
209
210
211
212
213
214
215

      if ( ddebug )
	startTime = MPI_Wtime ();

      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
	  if ( iret < 0 && errno != EINTR )
216
	    xabort ( "aio_suspend () failed" );
Deike Kleberg's avatar
Deike Kleberg committed
217
218
219
220
221
	} while ( iret != 0 );

      if ( ddebug )
	accumSuspend += ( MPI_Wtime () - startTime);
	      
222
      if (( iret = aio_return ( bfd->ctrlBlks + bfd->nextOpIndex )) == -1 ) 
223
	xabort ( "aio_return () failed" );
Deike Kleberg's avatar
Deike Kleberg committed
224

225
      bfd->prefIndex --;
Deike Kleberg's avatar
Deike Kleberg committed
226
227
    }

228
  xdebug ( "filename=%s, prefIndex=%d, return", bfd->name, bfd->prefIndex );
Deike Kleberg's avatar
Deike Kleberg committed
229
230
231
}

/***************************************************************/
232
static void
233
elemCheck(void *q, void *nm)
234
{
235
236
  bFiledataPA *bfd = q;
  const char *name = nm;
237

238
  if (!strcmp(name, bfd->name))
239
    xabort("Filename %s has already been inserted\n", name);
240
}
Deike Kleberg's avatar
Deike Kleberg committed
241

242
243
/***************************************************************/

244
void pioWriterAIO(void)
Deike Kleberg's avatar
Deike Kleberg committed
245
{
246
  bFiledataPA *bfd; 
247
  listSet * bibBFiledataPA;
248
249
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
250
  bool * sentFinalize, doFinalize;
Deike Kleberg's avatar
Deike Kleberg committed
251

252
  if ( nPrefStreams < 1 ) xabort("USAGE: # PREFETCH STREAMS >= 1");
253
  xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
254
 
255
  bibBFiledataPA = listSetNew(destroyBFiledataPA, compareNamesBPA);
256
  sentFinalize = xmalloc((size_t)nProcsCollNode * sizeof (sentFinalize[0]));
Deike Kleberg's avatar
Deike Kleberg committed
257
258
  
  for ( ;; )
259
260
261
262
    {
      MPI_Status status;
      xmpiStat(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, commNode,
                         &status ), &status);
263

264
265
266
      int source = status.MPI_SOURCE;
      int tag = status.MPI_TAG;
      struct fileOpTag rtag = decodeFileOpTag(tag);
Deike Kleberg's avatar
Deike Kleberg committed
267

268
269
      int messagesize;
      xmpi (MPI_Get_count(&status, MPI_CHAR, &messagesize));
Deike Kleberg's avatar
Deike Kleberg committed
270

271
      xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ), "
272
273
               "messagesize=%d", source, rtag.id, rtag.command,
               command2charP[rtag.command], messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
274

275
      switch (rtag.command)
Deike Kleberg's avatar
Deike Kleberg committed
276
277
	{
      	case IO_Open_file:
278
279
280
281
          {
            char *messageBuffer = xmalloc((size_t)messagesize
                                          * sizeof (messageBuffer[0]));
            char *pMB = messageBuffer;
Deike Kleberg's avatar
Deike Kleberg committed
282

283
284
            xmpi(MPI_Recv(messageBuffer, messagesize, MPI_CHAR, source,
                          tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
285

286
287
288
289
290
291
            char *filename = strtok(pMB, token);
            pMB += (strlen(filename) + 1);
            char *temp = strtok(pMB, token);
            long buffersize =  strtol(temp, NULL, 16);
            pMB += (strlen(temp) + 1);
            size_t amount = (size_t)(messageBuffer + messagesize - pMB);
Deike Kleberg's avatar
Deike Kleberg committed
292

293
294
            xdebug("command  %s, filename=%s, buffersize=%ld, amount=%zd",
                   command2charP[rtag.command], filename, buffersize, amount);
Deike Kleberg's avatar
Deike Kleberg committed
295

296
297
298
299
300
301
302
303
304
305
306
307
308
            if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
                                   (void *)(intptr_t)rtag.id)))
              {
                listSetForeach(bibBFiledataPA, elemCheck, filename);
                bfd = initBFiledataPA(filename, (size_t)buffersize, nProcsCollNode);
                int id;
                if ((id = listSetAdd(bibBFiledataPA, bfd)) < 0)
                  xabort("fileID=%d not unique", rtag.id);
                bfd->fileID = id;
              }
            else
              if (strcmp(filename, bfd->name) != 0)
                xabort("filename is not consistent, fileID=%d", rtag.id);
Deike Kleberg's avatar
Deike Kleberg committed
309

310
311
            bfd->currOpIndex = bfd->nextOpIndex;
            bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
312

313
314
            memcpy((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                   pMB, (size_t)amount);
Thomas Jahns's avatar
Thomas Jahns committed
315

316
            writePA(bfd, amount);
Deike Kleberg's avatar
Deike Kleberg committed
317

318
319
            free(messageBuffer);
          }
Deike Kleberg's avatar
Deike Kleberg committed
320
321
322
	  break;

	case IO_Send_buffer:
323

324
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
325
                               (void *)(intptr_t)rtag.id)))
326
            xabort("fileID=%d is not in set", rtag.id);
327

328
329
	  xdebug("command: %s, id=%d, name=%s",
                 command2charP[rtag.command], rtag.id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
330

331
332
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
333

334
335
336
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                        messagesize, MPI_CHAR, source, tag, commNode, &status ));
	  writePA(bfd, (size_t)messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
337
338
339
	  break;

	case IO_Close_file:
340

341
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
342
                               (void *)(intptr_t)rtag.id)))
343
            xabort("fileID=%d is not in set", rtag.id);
344

345
346
	  xdebug(" command %s, id=%d, name=%s",
                 command2charP[rtag.command], rtag.id, bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
347

348
	  bfd->currOpIndex = bfd->nextOpIndex;
Deike Kleberg's avatar
Deike Kleberg committed
349

350
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
351

352
	  MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
353
                   messagesize, MPI_CHAR, source, tag, commNode, &status);
Deike Kleberg's avatar
Deike Kleberg committed
354

355
	  writePA(bfd, (size_t)messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
356

357
	  if ( ! --(bfd->activeCollectors))
Deike Kleberg's avatar
Deike Kleberg committed
358
	    {
359
              xdebug ( "all are finished with file %d, delete node", rtag.id);
360
361
              listSetRemove(bibBFiledataPA, fileIDTest,
                            (void *)(intptr_t)rtag.id);
362
363
364
365
	    }
          break;
        case IO_Finalize:
          {
366
            int buffer, collID;
367
368
369
370
371

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
372
              doFinalize &= sentFinalize[collID];
373
374
            if ( doFinalize )
              {
375
376
                if (!listSetIsEmpty(bibBFiledataPA))
                  xabort("Set bibBfiledataP is not empty.");
377
378
                else
                  {
379
                    xdebug("%s", "all files are finished, destroy set,"
380
                           " return");
381
                    listSetDelete(bibBFiledataPA);
382
383
384
385
386
387
388
389
                  }
                return;
              }
          }

          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
390
391
392
393
394
395
	}
    }
}



396
397
/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
398

399
#endif
400
401
402
403
404
405
406
407
408
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */