pio_posixasynch.c 10.7 KB
Newer Older
1
2
3
4
/* 
   todo  
   README: specialRank Pe closes down, when all output files are closed    
*/
5
6
7
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
8

9

10
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
11

Thomas Jahns's avatar
Thomas Jahns committed
12
13
14
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
Deike Kleberg's avatar
Deike Kleberg committed
15
16
17
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
18
19
#include <sys/types.h>
#include <sys/stat.h>
Thomas Jahns's avatar
Thomas Jahns committed
20
21
22
#include <unistd.h>

#include <mpi.h>
Deike Kleberg's avatar
Deike Kleberg committed
23
24

#include "pio.h"
25
#include "pio_comm.h"
Deike Kleberg's avatar
Deike Kleberg committed
26
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
27
#include "pio_util.h"
28
#include "dmemory.h"
Deike Kleberg's avatar
Deike Kleberg committed
29

30
extern char * command2charP[6];
Deike Kleberg's avatar
Deike Kleberg committed
31

32
33
extern char *token;

Deike Kleberg's avatar
Deike Kleberg committed
34

35
typedef struct
Deike Kleberg's avatar
Deike Kleberg committed
36
37
38
39
40
41
42
{
  struct dBuffer *fb;
  struct aiocb *ctrlBlks;
  off_t offset;
  int currOpIndex;
  int nextOpIndex;
  int prefIndex;
43
  int activeCollectors;
44
  int handle, fileID;
45
  char name[];
46
} bFiledataPA;
Deike Kleberg's avatar
Deike Kleberg committed
47

48
static int
49
fileIDTest(void *a, void *fileID)
50
{
51
  return ((bFiledataPA *)a)->fileID == (int)(intptr_t)fileID;
52
53
}

54
static int nPrefStreams = 4;
Deike Kleberg's avatar
Deike Kleberg committed
55
56
57

/***************************************************************/

58
59
static bFiledataPA *
initBFiledataPA(char *filename, size_t bs, int nc)
Deike Kleberg's avatar
Deike Kleberg committed
60
{
61
  bFiledataPA *bfd;
Deike Kleberg's avatar
Deike Kleberg committed
62

63
64
  xdebug ( "filename=%s, buffersize=%zu, ncollectors=%d, nPrefetchStreams=%d",
           filename, bs, nc, nPrefStreams );
Deike Kleberg's avatar
Deike Kleberg committed
65

Uwe Schulzweida's avatar
Uwe Schulzweida committed
66
  bfd = (bFiledataPA*) xmalloc( sizeof (*bfd) + strlen(filename) + 1);
67
  strcpy(bfd->name, filename);
Deike Kleberg's avatar
Deike Kleberg committed
68

69
  if (( bfd->handle = open ( bfd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
70
    xabort("Failed to open %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
71

72
  dbuffer_init(&(bfd->fb), (size_t)nPrefStreams * bs);
Deike Kleberg's avatar
Deike Kleberg committed
73

74
  bfd->ctrlBlks = xcalloc((size_t)nPrefStreams, sizeof (bfd->ctrlBlks[0]));
Deike Kleberg's avatar
Deike Kleberg committed
75

76
  for (int i = 0; i < nPrefStreams; i++ )
Deike Kleberg's avatar
Deike Kleberg committed
77
    {
78
      bfd->ctrlBlks[i].aio_fildes     = bfd->handle;
79
      bfd->ctrlBlks[i].aio_buf = bfd->fb->buffer + (size_t)i * bs;
80
81
      bfd->ctrlBlks[i].aio_reqprio    = 0;
      bfd->ctrlBlks[i].aio_sigevent.sigev_notify = SIGEV_NONE;   
Deike Kleberg's avatar
Deike Kleberg committed
82
83
    }
  
84
85
86
  bfd->nextOpIndex = 0;
  bfd->prefIndex = 0; 
  bfd->offset = 0;
87
  bfd->activeCollectors = nc;
Deike Kleberg's avatar
Deike Kleberg committed
88

89
  xdebug ( "filename=%s, opened file, return", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
90

91
  return bfd;
Deike Kleberg's avatar
Deike Kleberg committed
92
93
94
95
}

/***************************************************************/

96
97
static int
destroyBFiledataPA ( void *v )
Deike Kleberg's avatar
Deike Kleberg committed
98
{
99
  bFiledataPA *bfd = (bFiledataPA * ) v;
Deike Kleberg's avatar
Deike Kleberg committed
100
101
  const struct aiocb *ccBP[1];
  int iret = 0;
Thomas Jahns's avatar
Thomas Jahns committed
102
  ssize_t ssiret;
103
104
  int nextFinishOp = (bfd->nextOpIndex - bfd->prefIndex + nPrefStreams)
    % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
105

Thomas Jahns's avatar
Thomas Jahns committed
106
  xdebug ( "filename=%s, cleanup and close file", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
107
108
109

  /* close file */

110
  for (; bfd->prefIndex > 0 ; --(bfd->prefIndex))
Deike Kleberg's avatar
Deike Kleberg committed
111
    {
Thomas Jahns's avatar
Thomas Jahns committed
112
      xdebug("file: %s, prefIndex=%d", bfd->name, (int)bfd->prefIndex);
113
      ccBP[0] = ( bfd->ctrlBlks + nextFinishOp );
Deike Kleberg's avatar
Deike Kleberg committed
114
115
116
      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
117
	  if ( iret < 0 && errno != EINTR ) xabort ( "aio_suspend () failed" );
118
	}
119
      while ( iret != 0 );
Deike Kleberg's avatar
Deike Kleberg committed
120

121
122
      iret = aio_error(bfd->ctrlBlks + nextFinishOp);
      if (( ssiret = aio_return ( bfd->ctrlBlks + nextFinishOp )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
123
	xabort("aio_return () failed: %s", strerror(iret));
124
125

      nextFinishOp = ( nextFinishOp + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
126
127
    }

128
129
  if ((iret = ftruncate(bfd->handle, bfd->offset)) == -1)
    xabort("failed to truncate file %s: %s", bfd->name, strerror(errno));
130
  if (( iret = close ( bfd->handle )) == -1 )
Thomas Jahns's avatar
Thomas Jahns committed
131
    xabort("failed to close %s", bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
132
133
134

  /* file closed, cleanup */

135
  dbuffer_cleanup ( &( bfd->fb ));
Deike Kleberg's avatar
Deike Kleberg committed
136

137
  free(bfd->ctrlBlks);
138
  free(bfd);
Deike Kleberg's avatar
Deike Kleberg committed
139

140
  xdebug("%s", "closed file and cleaned up, return");
Deike Kleberg's avatar
Deike Kleberg committed
141
142
143
144
145
146

  return iret;
}

/***************************************************************/

147
148
static bool
compareNamesBPA(void *v1, void *v2)
149
{
150
  bFiledataPA *bfd1 = v1, *bfd2 = v2;
151

152
  return !strcmp(bfd1->name, bfd2->name);
153
154
155
156
}

/***************************************************************/

157
static void
158
writePA(bFiledataPA *bfd, size_t amount)
Deike Kleberg's avatar
Deike Kleberg committed
159
160
{
  const struct aiocb *ccBP[1];
161
  ssize_t iret;
Deike Kleberg's avatar
Deike Kleberg committed
162

163
  xdebug ( "file %s, in", bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
164
  
165
166
  bfd->ctrlBlks[bfd->currOpIndex].aio_nbytes = amount;
  bfd->ctrlBlks[bfd->currOpIndex].aio_offset = bfd->offset;
Deike Kleberg's avatar
Deike Kleberg committed
167

168
169
170
  xdebug ( " before aio_write(), file %s, aio_nbytes=%zu, aio_offset=%zu",
           bfd->name, bfd->ctrlBlks[bfd->currOpIndex].aio_nbytes,
           bfd->ctrlBlks[bfd->currOpIndex].aio_offset );
Deike Kleberg's avatar
Deike Kleberg committed
171

172
  iret = aio_write ( bfd->ctrlBlks + bfd->currOpIndex );
Deike Kleberg's avatar
Deike Kleberg committed
173

174
175
176
  xdebug ( "after aio_write(), file %s, aio_nbytes=%zu, aio_offset=%zu,"
           "iret=aio_write()=%d",
           bfd->name, bfd->ctrlBlks[bfd->currOpIndex].aio_nbytes,
177
           bfd->ctrlBlks[bfd->currOpIndex].aio_offset, (int)iret );
178
179
180
181
182
   
  if ( iret == -1 ) 
    {
      xabort ( "did not succeed writing buffer" );
    }
183
184
185
  else
    xdebug ( "buffer written to %s",  bfd->name );
     
186
  bfd->offset += (off_t)amount;
187
  bfd->prefIndex ++;
Deike Kleberg's avatar
Deike Kleberg committed
188

189
  if ( bfd->prefIndex >= nPrefStreams ) 
Deike Kleberg's avatar
Deike Kleberg committed
190
    {
191
      ccBP[0] = ( bfd->ctrlBlks + bfd->nextOpIndex );
Deike Kleberg's avatar
Deike Kleberg committed
192
193
194
195
      do
	{
	  iret = aio_suspend ( ccBP, 1, NULL );
	  if ( iret < 0 && errno != EINTR )
196
	    xabort ( "aio_suspend () failed" );
Deike Kleberg's avatar
Deike Kleberg committed
197
198
	} while ( iret != 0 );

Thomas Jahns's avatar
Thomas Jahns committed
199
      if (( iret = aio_return ( bfd->ctrlBlks + bfd->nextOpIndex )) == -1 )
200
	xabort ( "aio_return () failed" );
Deike Kleberg's avatar
Deike Kleberg committed
201

202
      bfd->prefIndex --;
Deike Kleberg's avatar
Deike Kleberg committed
203
204
    }

205
  xdebug ( "filename=%s, prefIndex=%d, return", bfd->name, bfd->prefIndex );
Deike Kleberg's avatar
Deike Kleberg committed
206
207
208
}

/***************************************************************/
209
static void
210
elemCheck(void *q, void *nm)
211
{
212
213
  bFiledataPA *bfd = q;
  const char *name = nm;
214

215
  if (!strcmp(name, bfd->name))
216
    xabort("Filename %s has already been inserted\n", name);
217
}
Deike Kleberg's avatar
Deike Kleberg committed
218

219
220
/***************************************************************/

221
void pioWriterAIO(void)
Deike Kleberg's avatar
Deike Kleberg committed
222
{
223
  bFiledataPA *bfd; 
224
  listSet * bibBFiledataPA;
225
226
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
227
  bool * sentFinalize, doFinalize;
Deike Kleberg's avatar
Deike Kleberg committed
228

229
  if ( nPrefStreams < 1 ) xabort("USAGE: # PREFETCH STREAMS >= 1");
230
  xdebug ( "nProcsCollNode=%d on this node", nProcsCollNode );
Deike Kleberg's avatar
Deike Kleberg committed
231
 
232
  bibBFiledataPA = listSetNew(destroyBFiledataPA, compareNamesBPA);
233
  sentFinalize = xcalloc((size_t)nProcsCollNode, sizeof (sentFinalize[0]));
Deike Kleberg's avatar
Deike Kleberg committed
234
235
  
  for ( ;; )
236
237
238
239
    {
      MPI_Status status;
      xmpiStat(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, commNode,
                         &status ), &status);
240

241
242
243
      int source = status.MPI_SOURCE;
      int tag = status.MPI_TAG;
      struct fileOpTag rtag = decodeFileOpTag(tag);
Deike Kleberg's avatar
Deike Kleberg committed
244

245
      int messagesize;
Thomas Jahns's avatar
Thomas Jahns committed
246
      xmpi(MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &messagesize));
Deike Kleberg's avatar
Deike Kleberg committed
247

248
      xdebug ( "receive message from source=%d, id=%d, command=%d ( %s ), "
249
250
               "messagesize=%d", source, rtag.id, rtag.command,
               command2charP[rtag.command], messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
251

252
      switch (rtag.command)
Deike Kleberg's avatar
Deike Kleberg committed
253
254
	{
      	case IO_Open_file:
255
256
257
258
          {
            char *messageBuffer = xmalloc((size_t)messagesize
                                          * sizeof (messageBuffer[0]));
            char *pMB = messageBuffer;
Deike Kleberg's avatar
Deike Kleberg committed
259

Thomas Jahns's avatar
Thomas Jahns committed
260
            xmpi(MPI_Recv(messageBuffer, messagesize, MPI_UNSIGNED_CHAR, source,
261
                          tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
262

263
264
265
266
267
268
            char *filename = strtok(pMB, token);
            pMB += (strlen(filename) + 1);
            char *temp = strtok(pMB, token);
            long buffersize =  strtol(temp, NULL, 16);
            pMB += (strlen(temp) + 1);
            size_t amount = (size_t)(messageBuffer + messagesize - pMB);
Deike Kleberg's avatar
Deike Kleberg committed
269

270
271
            xdebug("command  %s, filename=%s, buffersize=%ld, amount=%zd",
                   command2charP[rtag.command], filename, buffersize, amount);
Deike Kleberg's avatar
Deike Kleberg committed
272

273
274
275
276
277
278
279
280
281
282
283
284
285
            if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
                                   (void *)(intptr_t)rtag.id)))
              {
                listSetForeach(bibBFiledataPA, elemCheck, filename);
                bfd = initBFiledataPA(filename, (size_t)buffersize, nProcsCollNode);
                int id;
                if ((id = listSetAdd(bibBFiledataPA, bfd)) < 0)
                  xabort("fileID=%d not unique", rtag.id);
                bfd->fileID = id;
              }
            else
              if (strcmp(filename, bfd->name) != 0)
                xabort("filename is not consistent, fileID=%d", rtag.id);
Deike Kleberg's avatar
Deike Kleberg committed
286

287
288
            bfd->currOpIndex = bfd->nextOpIndex;
            bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
289

290
291
            memcpy((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                   pMB, (size_t)amount);
Thomas Jahns's avatar
Thomas Jahns committed
292

293
            writePA(bfd, amount);
Deike Kleberg's avatar
Deike Kleberg committed
294

295
296
            free(messageBuffer);
          }
Deike Kleberg's avatar
Deike Kleberg committed
297
298
299
	  break;

	case IO_Send_buffer:
300

301
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
302
                               (void *)(intptr_t)rtag.id)))
303
            xabort("fileID=%d is not in set", rtag.id);
304

305
306
	  xdebug("command: %s, id=%d, name=%s",
                 command2charP[rtag.command], rtag.id, bfd->name );
Deike Kleberg's avatar
Deike Kleberg committed
307

308
309
	  bfd->currOpIndex = bfd->nextOpIndex;
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
310

311
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
312
313
                        messagesize, MPI_UNSIGNED_CHAR, source, tag, commNode,
                        &status));
314
	  writePA(bfd, (size_t)messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
315
316
317
	  break;

	case IO_Close_file:
318

319
          if (!(bfd = listSetGet(bibBFiledataPA, fileIDTest,
320
                               (void *)(intptr_t)rtag.id)))
321
            xabort("fileID=%d is not in set", rtag.id);
322

323
324
	  xdebug(" command %s, id=%d, name=%s",
                 command2charP[rtag.command], rtag.id, bfd->name);
Deike Kleberg's avatar
Deike Kleberg committed
325

326
	  bfd->currOpIndex = bfd->nextOpIndex;
Deike Kleberg's avatar
Deike Kleberg committed
327

328
	  bfd->nextOpIndex = ( bfd->nextOpIndex + 1 ) % nPrefStreams;
Deike Kleberg's avatar
Deike Kleberg committed
329

Thomas Jahns's avatar
Thomas Jahns committed
330
331
332
	  xmpi(MPI_Recv((void *)bfd->ctrlBlks[bfd->currOpIndex].aio_buf,
                        messagesize, MPI_UNSIGNED_CHAR,
                        source, tag, commNode, &status));
Deike Kleberg's avatar
Deike Kleberg committed
333

334
	  writePA(bfd, (size_t)messagesize);
Deike Kleberg's avatar
Deike Kleberg committed
335

336
	  if ( ! --(bfd->activeCollectors))
Deike Kleberg's avatar
Deike Kleberg committed
337
	    {
338
              xdebug ( "all are finished with file %d, delete node", rtag.id);
339
340
              listSetRemove(bibBFiledataPA, fileIDTest,
                            (void *)(intptr_t)rtag.id);
341
342
343
344
	    }
          break;
        case IO_Finalize:
          {
345
            int buffer, collID;
346
347
348
349
350

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
            sentFinalize[source] = true;
            doFinalize = true;
            for ( collID = 0; collID < nProcsCollNode; collID++ )
351
              doFinalize &= sentFinalize[collID];
352
353
            if ( doFinalize )
              {
354
355
                if (!listSetIsEmpty(bibBFiledataPA))
                  xabort("Set bibBfiledataP is not empty.");
356
357
                else
                  {
358
                    xdebug("%s", "all files are finished, destroy set,"
359
                           " return");
360
                    listSetDelete(bibBFiledataPA);
361
                  }
Thomas Jahns's avatar
Thomas Jahns committed
362
                free(sentFinalize);
363
364
365
366
367
368
369
                return;
              }
          }

          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
Deike Kleberg's avatar
Deike Kleberg committed
370
371
372
373
374
375
	}
    }
}



376
377
/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
378

379
#endif
380
381
382
383
384
385
386
387
388
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */