pio_record_send.c 7.62 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "cdipio.h"
#include "pio_comm.h"
#include "pio_impl.h"
#include "pio_util.h"
#include "dmemory.h"
13
14
15
16
17
18
19
20
21
22
23

extern char *command2charP[];
extern char *token;
extern long initial_buffersize;

typedef struct
{
  size_t size;
  struct dBuffer *db1;
  struct dBuffer *db2;
  struct dBuffer *db;
Thomas Jahns's avatar
Thomas Jahns committed
24
  enum IO_Server_command command;
25
  MPI_Request request;
26
  int tsID, fileID;
27
  char name[];
28
} remoteFileBuf;
29

30
/* Set holding one remoteFileBuf per file opened through pioSendOpen.
 * It is created in pioSendInitialize on collector processes only and
 * deleted again in pioSendFinalize. */
static listSet * bibRemoteFileBuf;
31

32
static int
33
fileIDTest(void *a, void *fileID)
34
{
35
  return ((remoteFileBuf *)a)->fileID == (int)(intptr_t)fileID;
36
37
}

38
39
static remoteFileBuf *
initRemoteFileBuf(const char *filename, size_t bs)
40
{
41
  remoteFileBuf *afp;
42
43
44
45
46
  size_t len;
  int iret;

  xdebug ( "filename=%s, buffersize=%zu, in", filename, bs );

47
48
49
  len = strlen(filename);
  afp = xmalloc(sizeof (remoteFileBuf) + len + 1);
  strcpy(afp->name, filename);
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
  afp->size = bs;
  afp->tsID = 0;

  /* init output buffer */

  xdebug ( "filename=%s, init output buffer",  afp->name );

  iret = dbuffer_init ( &( afp->db1 ), afp->size );
  iret += dbuffer_init ( &( afp->db2 ), afp->size );

  if ( iret > 0 ) xabort ( "dbuffer_init did not succeed" );

  afp->db = afp->db1;

  afp->command = IO_Open_file;
  afp->request = MPI_REQUEST_NULL;

67
  xdebug ( "added name=%s, return", afp->name );
68
69
70
71
  return afp;
}

static int
72
destroyRemoteFileBuf(void *v)
73
{
74
  remoteFileBuf *afp = ( remoteFileBuf * ) v;
75
76
77
78
79
80
81
82
  MPI_Status status;

  xdebug ( "filename=%s, cleanup, in", afp->name );

  xmpiStat(MPI_Wait(&afp->request, &status), &status);
  dbuffer_cleanup(&afp->db1);
  dbuffer_cleanup(&afp->db2);

83
  free(afp);
84
85
86
87
88
89
90
91
92

  xdebug("%s", "cleaned up, return");

  return 0;
}

static bool
compareNames(void *v1, void *v2)
{
93
  remoteFileBuf *afd1 = v1, *afd2 = v2;
94
95
96
97
98
99
100

  return !strcmp(afd1->name, afd2->name);
}

/***************************************************************/
/* send buffer to writer and swap buffer for filling */
static void
101
sendP(remoteFileBuf *afd, int id)
102
103
104
105
106
107
{
  int tag;
  size_t amount;
  MPI_Status status;

  amount = dbuffer_data_size ( afd->db );
108
  tag = encodeFileOpTag(id, afd->command);
109
110
111
112
113
114

  xdebug("send buffer for %s, size: %zu bytes, command=%s, in",
         afd->name, amount, command2charP[afd->command]);

  xmpiStat(MPI_Wait(&(afd->request), &status), &status);

115
  /* FIXME: amount > INT_MAX unhandled */
116
117
118
  xmpi(MPI_Issend(afd->db->buffer, (int)amount, MPI_UNSIGNED_CHAR,
                  commInqSpecialRankNode(), tag, commInqCommNode(),
                  &afd->request));
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137

  /* change outputBuffer */
  dbuffer_reset ( afd->db );
  if ( afd->db == afd->db1 )
    {
      xdebug("%s", "Change to buffer 2 ...");
      afd->db =  afd->db2;
    }
  else
    {
      xdebug("%s", "Change to buffer 1 ...");
      afd->db =  afd->db1;
    }
  afd->command = IO_Send_buffer;

  return;
}

static void
138
defTimestep(remoteFileBuf *afd, int tsID)
139
140
141
142
143
144
{
  if ( afd == NULL || tsID != afd->tsID + 1 )
    xabort ( " defTimestepPA () didn't succeed." );
  afd->tsID = tsID;
}

145
static void
146
flushOp(remoteFileBuf *fb, int tsID)
147
{
148
  sendP(fb, fb->fileID);
149
150
151
152
  defTimestep(fb, (int)(intptr_t)tsID);
}


153
154
155
156
157
158
size_t
pioSendWrite(int id, int tsID, const void *buffer, size_t len)
{
  int error = 0;
  int flush = 0;
  int filled;
159
  remoteFileBuf *afd;
160

161
  afd = listSetGet(bibRemoteFileBuf, fileIDTest, (void *)(intptr_t)id);
162

163
  flush = tsID != afd->tsID;
164

165
  if ( flush )
166
    {
167
      xdebug("tsID = %d, flush buffer for fileID=%d", tsID, afd->fileID);
168

169
      flushOp(afd, tsID);
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
      {
        MPI_Status status;
        xmpiStat(MPI_Wait(&(afd->request), &status), &status);
      }
      xmpi(MPI_Barrier(commInqCommColl()));
    }

  filled = dbuffer_push(afd->db, buffer, len);

  xdebug ( "id = %d, tsID = %d, pushed %lu byte data on buffer, filled = %d",
           id, tsID, len, filled);

  if (filled == 1)
    {
      if ( flush )
        error = filled;
      else
        {
          sendP(afd, id);
          error = dbuffer_push(afd->db, buffer, len);
        }
    }

  if ( error == 1 )
    xabort("did not succeed filling output buffer, id=%d", id);

  return len;
}


int
pioSendClose(int id)
{
203
  remoteFileBuf *afd;
204
205
  xdebug ( "fileID %d: send buffer, close file and cleanup",id );

206
  afd = listSetGet(bibRemoteFileBuf, fileIDTest, (void *)(intptr_t)id);
207
208
209
210

  afd->command = IO_Close_file;

  sendP(afd, id);
211
212
213
214
  /* wait for other collectors to also close the file
   * this prevents one process from re-using the file ID before
   * another has sent the close */
  xmpi(MPI_Barrier(commInqCommColl()));
215

216
  /* remove file element */
Thomas Jahns's avatar
Thomas Jahns committed
217
  int iret = listSetRemove(bibRemoteFileBuf, fileIDTest, (void *)(intptr_t)id);
218
219
220
221
222
223
  return iret;
}

int
pioSendOpen(const char *filename)
{
224
  remoteFileBuf *afd;
225
  static long buffersize = 0;
226
227
228
229
  int root = 0, id, iret;
  enum {
    messageLength = 32,
  };
230
  char message[messageLength];
231
  MPI_Comm commCollectors = commInqCommColl();
232
233
234
235
236
237
238
239
240
241
242
243
244

  /* broadcast buffersize to collectors */
  if (!buffersize)
    {
      if  (commInqRankColl() == root)
	{
	  if (getenv("BUFSIZE") != NULL)
	    buffersize = atol(getenv("BUFSIZE"));
	  if (buffersize < initial_buffersize)
	    buffersize = initial_buffersize;
          xdebug("filename=%s, broadcast buffersize=%ld to collectors ...",
                 filename, buffersize);
	}
245
      xmpi(MPI_Bcast(&buffersize, 1, MPI_LONG, root, commCollectors));
246
247
    }

248
  /* init and add remoteFileBuf */
249
  afd = initRemoteFileBuf(filename, (size_t)buffersize);
250
  if ((id = listSetAdd(bibRemoteFileBuf, afd)) < 0)
251
    xabort("filename %s is not unique", afd->name);
252
  afd->fileID = id;
253

254
  xdebug("filename=%s, init and added remoteFileBuf, return id = %d",
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
         filename, id);

  /* put filename, id and buffersize on buffer */
  iret = dbuffer_push ( afd->db, filename, strlen ( filename ));
  xassert(iret == 0);
  iret = dbuffer_push ( afd->db, token, 1);
  xassert(iret == 0);
  sprintf ( message,"%lX", buffersize);
  iret = dbuffer_push ( afd->db, message, strlen ( message ));
  xassert(iret == 0);
  iret = dbuffer_push ( afd->db, token, 1);
  xassert(iret == 0);

  if ( ddebug )
    {
      size_t l = strlen(filename) + strlen(message) + 2;
      char *temp = xmalloc(l + 1);
      strncpy(temp, (char *)afd->db->buffer, l);
      temp[l] = '\0';
      xdebug("filename=%s, put Open file message on buffer:\n%s,\t return",
             filename, temp);
      free(temp);
    }
278
  sendP(afd, afd->fileID);
279
  xmpi(MPI_Barrier(commCollectors));
280
281
282
283
284
285
286
287
288
  return id;
}

void
pioSendFinalize(void)
{
  int buffer = 0, tag, specialRank = commInqSpecialRankNode ();
  MPI_Comm commNode = commInqCommNode ();

289
  tag = encodeFileOpTag(0, IO_Finalize);
290
291
292
293

  xmpi(MPI_Send(&buffer, 1, MPI_INT, specialRank, tag, commNode));
  xdebug("%s", "SENT MESSAGE WITH TAG \"IO_FINALIZE\" TO SPECIAL PROCESS");

294
295
  if (!listSetIsEmpty(bibRemoteFileBuf))
    xabort("set bibRemoteFileBuf not empty.");
296
297
  else
    {
298
299
      xdebug("%s", "destroy set");
      listSetDelete(bibRemoteFileBuf);
300
301
302
303
    }
}

void
304
pioSendInitialize(void (*postCommSetupActions)(void))
305
306
307
308
309
{
  if (commInqSizeNode() < 2)
    xabort ( "USAGE: # IO PROCESSES ON A PHYSICAL NODE >= 2" );


310
311
312
313
314
  int isCollector = commInqRankNode () != commInqSpecialRankNode ();
  commDefCommColl(isCollector);
  commSendNodeInfo();
  commRecvNodeMap();
  commDefCommsIO();
315
  postCommSetupActions();
316
317
318
319
320
321
322
323
324
325
  if (!isCollector)
    switch ( commInqIOMode ())
      {
      case PIO_WRITER:
        pioWriterStdIO();
        break;
      case PIO_ASYNCH:
        pioWriterAIO();
        break;
      }
326
  else
327
    bibRemoteFileBuf = listSetNew(destroyRemoteFileBuf, compareNames);
328
329
330
331
332
333
334
335
336
337
338
}

/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */