pio_mpinonb.c 7.33 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

5
#include <inttypes.h>
6
7
8
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
9
#include <mpi.h>
10

11
#include "cdi.h"
12
#include "dmemory.h"
13
#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
14
#include "pio_comm.h"
15
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
17

18
19
/* Defined in another translation unit; fowMPINONB() uses it as the lower
 * bound for the per-file output buffer size. */
extern long initial_buffersize;

20
typedef struct
21
22
23
24
25
26
27
{
  size_t size;
  struct dBuffer *db1;
  struct dBuffer *db2;
  struct dBuffer *db;
  MPI_File fh;
  MPI_Request request;
28
  int fileID;
29
  int tsID;
30
  bool finished;
31
  char name[];
32
33
} aFiledataM;

34
/* Set of all currently open files (aFiledataM entries); created by
 * initMPINONB() and deleted by finalizeMPINONB(). */
static listSet *bibAFiledataM;
35

36
static int
37
fileIDTest(void *a, void *fileID)
38
{
39
  return ((aFiledataM *)a)->fileID == (int)(intptr_t)fileID;
40
41
}

Deike Kleberg's avatar
Deike Kleberg committed
42

43
44
45
46
/***************************************************************/

static aFiledataM *initAFiledataMPINONB ( const char *filename, size_t bs )
{
47
  aFiledataM *of = NULL;
Deike Kleberg's avatar
Deike Kleberg committed
48
  int iret;
Deike Kleberg's avatar
Deike Kleberg committed
49
  MPI_Comm commNode = commInqCommNode ();
50

Uwe Schulzweida's avatar
Uwe Schulzweida committed
51
  of = (aFiledataM*) xmalloc(sizeof (*of) + strlen(filename) + 1);
52

Thomas Jahns's avatar
Thomas Jahns committed
53
  strcpy(of->name, filename);
54
  of->size = bs;
55
56
  of->db1 = NULL;
  of->db2 = NULL;
57
58

  /* init output buffer */
Thomas Jahns's avatar
Thomas Jahns committed
59

Deike Kleberg's avatar
Deike Kleberg committed
60
  iret = dbuffer_init ( &( of->db1 ), of->size );
61
  iret += dbuffer_init ( &( of->db2 ), of->size );
62

63
  if ( iret > 0 ) xabort ( "dbuffer_init did not succeed" );
64

65
66
  of->db = of->db1;

67
68
69
70
71
  of->tsID = CDI_UNDEFID;

  /* open file */
  xmpi(MPI_File_open(commNode, of->name, MPI_MODE_CREATE|MPI_MODE_WRONLY,
                     MPI_INFO_NULL, &( of->fh )));
72
  of->request = MPI_REQUEST_NULL;
73
  of->finished = false;
Thomas Jahns's avatar
Thomas Jahns committed
74

75
76
77
78
79
  return of;
}

/***************************************************************/

Thomas Jahns's avatar
Thomas Jahns committed
80
81
static int
destroyAFiledataMPINONB(void *v)
82
83
84
85
{
  int iret = 0;
  aFiledataM *of;
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
86
  int rankNode = commInqRankNode ();
87
  MPI_Offset endpos;
88
89
90

  of = (aFiledataM * ) v;

Thomas Jahns's avatar
Thomas Jahns committed
91
92
  xdebug ( "IOPE%d: close file %d, name=\"%s\"",
           rankNode, of->fileID, of->name );
Thomas Jahns's avatar
Thomas Jahns committed
93

94
  /* close file */
Thomas Jahns's avatar
Thomas Jahns committed
95
  xmpi(MPI_Wait(&of->request, &status));
96
  xmpi(MPI_Barrier(commInqCommNode()));
97
98
  xmpi(MPI_File_get_position_shared(of->fh, &endpos));
  xmpi(MPI_File_set_size(of->fh, endpos));
99
100
101
  iret = MPI_File_close ( & ( of->fh ));

  /* file closed, cleanup */
Thomas Jahns's avatar
Thomas Jahns committed
102

103
104
105
106
107
  dbuffer_cleanup ( & ( of->db1 ));
  dbuffer_cleanup ( & ( of->db2 ));

  free ( of );

Thomas Jahns's avatar
Thomas Jahns committed
108
  xdebug ( "IOPE%d: closed file, cleaned up, return",
Deike Kleberg's avatar
Deike Kleberg committed
109
           rankNode );
110
111
112
113
114
115

  return iret == MPI_SUCCESS ? 0 : -1;
}

/***************************************************************/

Thomas Jahns's avatar
Thomas Jahns committed
116
117
static bool
compareNamesMPINONB(void *v1, void *v2)
118
{
Thomas Jahns's avatar
Thomas Jahns committed
119
120
  aFiledataM *afm1 = v1, *afm2 = v2;
  return !strcmp(afm1->name, afm2->name);
121
122
123
124
}

/***************************************************************/

Thomas Jahns's avatar
Thomas Jahns committed
125
126
/*
 * Start writing the active buffer and switch to the other one.
 *
 * of - state of the file to write to
 *
 * Does nothing when the active buffer holds no data.  Otherwise waits for
 * the previously started request, starts a non-blocking shared-pointer
 * write of the active buffer, resets that buffer and makes the other
 * buffer the active one.  The request started here is left pending in
 * of->request.
 */
static void
writeMPINONB(aFiledataM *of)
{
  int rankNode = commInqRankNode ();
  int fileID = of->fileID;

  /* number of bytes waiting in the active buffer */
  int nbytes = ( int ) dbuffer_data_size ( of->db );
  if ( nbytes == 0 )
    return;

  xdebug3 ( "IOPI%d: Write buffer, size %d bytes, in",
           rankNode, nbytes );

  /* the previous write must be complete before a new one is started */
  MPI_Status waitStatus;
  xmpi ( MPI_Wait ( &of->request, &waitStatus ));
  xmpi(MPI_File_iwrite_shared(of->fh, of->db->buffer, nbytes, MPI_UNSIGNED_CHAR,
                              &of->request));
  xdebug("%d bytes written for fileID=%d", nbytes, fileID);

  dbuffer_reset ( of->db );

  /* toggle the active buffer so new data goes to the other one */
  bool firstWasActive = ( of->db == of->db1 );
  if ( firstWasActive )
    {
      xdebug3 ( "IOPE%d: fileID=%d, change to buffer 2 ...",
                rankNode, fileID );
    }
  else
    {
      xdebug3 ( "IOPE%d: fileID=%d, change to buffer 1 ...",
                rankNode, fileID );
    }
  of->db = firstWasActive ? of->db2 : of->db1;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
169
size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
Thomas Jahns's avatar
Thomas Jahns committed
170
{
171
172
173
  int error = 0;
  int filled = 0;
  aFiledataM *of;
Deike Kleberg's avatar
Deike Kleberg committed
174
  int rankNode = commInqRankNode ();
175

176
  of = listSetGet(bibAFiledataM, fileIDTest, (void *)(intptr_t)fileID);
177
178
  xassert(of);

179
  bool flush = tsID != of->tsID;
180

181
  if (flush)
182
    {
183
184
      xdebug3("IOPE%d: tsID = %d, flush buffer", rankNode, tsID);
      writeMPINONB(of);
185
      of->tsID = tsID;
186
187
188
189
      MPI_Status status;
      xmpi(MPI_Wait(&(of->request), &status));
      xmpi(MPI_Barrier(commInqCommNode()));
    }
190
191
192

  filled = dbuffer_push ( of->db, ( unsigned char * ) buffer, len );

Deike Kleberg's avatar
Deike Kleberg committed
193
  xdebug3 ( "IOPE%d: fileID = %d, tsID = %d,"
Thomas Jahns's avatar
Thomas Jahns committed
194
195
           " pushed data on buffer, filled = %d",
           rankNode, fileID, tsID, filled );
196

Thomas Jahns's avatar
Thomas Jahns committed
197
  if ( filled == 1 )
198
199
    {
      if ( flush )
Thomas Jahns's avatar
Thomas Jahns committed
200
        error = filled;
201
      else
Thomas Jahns's avatar
Thomas Jahns committed
202
203
204
205
206
        {
          writeMPINONB(of);

          error = dbuffer_push ( of->db, ( unsigned char * ) buffer, len );
        }
207
    }
Thomas Jahns's avatar
Thomas Jahns committed
208

209
  if ( error == 1 )
Thomas Jahns's avatar
Thomas Jahns committed
210
211
    xabort("did not succeed filling output buffer, fileID=%d", fileID);

212
213
214
215
216
  return len;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
217
int fcMPINONB ( int fileID )
218
219
220
{
  aFiledataM *of;
  int iret;
Deike Kleberg's avatar
Deike Kleberg committed
221
222
  MPI_Comm commNode = commInqCommNode ();
  int rankNode = commInqRankNode ();
223

Thomas Jahns's avatar
Thomas Jahns committed
224
225
  xdebug("IOPE%d: write buffer, close file and cleanup, in %d",
         rankNode, fileID );
226

227
228
  if (!(of = listSetGet(bibAFiledataM, fileIDTest, (void *)(intptr_t)fileID)))
    xabort("listSet, fileID=%d not found", fileID);
229

Thomas Jahns's avatar
Thomas Jahns committed
230
  writeMPINONB(of);
231

232
  /* remove file element */
Thomas Jahns's avatar
Thomas Jahns committed
233
  int iret = listSetRemove(bibAFiledataM, fileIDTest, (void *)(intptr_t)fileID);
234
235
236
237
  return iret;
}

/***************************************************************/
238
239
static void
elemCheck(void *q, void *nm)
Deike Kleberg's avatar
Deike Kleberg committed
240
{
241
242
243
244
  aFiledataM *afm = q;
  const char *name = nm;

  if (!strcmp(name, afm->name))
245
    xabort("Filename %s has already been added to set\n", name);
Deike Kleberg's avatar
Deike Kleberg committed
246
}
247

248
249
250
251

int fowMPINONB ( const char *filename )
{
  static aFiledataM *of;
Thomas Jahns's avatar
Thomas Jahns committed
252
253
  static long buffersize = 0;
  int id, bcastRank = 0;
Deike Kleberg's avatar
Deike Kleberg committed
254
255
  MPI_Comm commNode = commInqCommNode ();
  int rankNode = commInqRankNode ();
256
257

  /* broadcast buffersize to collectors ( just once, for all files )*/
Thomas Jahns's avatar
Thomas Jahns committed
258

259
260
  if ( ! buffersize )
    {
Thomas Jahns's avatar
Thomas Jahns committed
261
262
263
        xdebug ( "IOPE%d: Broadcast buffersize to collectors ...",
                  rankNode );

Deike Kleberg's avatar
Deike Kleberg committed
264
      if  ( rankNode == bcastRank )
Thomas Jahns's avatar
Thomas Jahns committed
265
266
267
268
269
270
271
        {
          if ( getenv( "BUFSIZE" ) != NULL )
            buffersize = atol ( getenv ( "BUFSIZE" ));
          if ( buffersize < initial_buffersize )
            buffersize = initial_buffersize;
        }

Deike Kleberg's avatar
Deike Kleberg committed
272
      xmpi ( MPI_Bcast ( &buffersize, 1, MPI_LONG, bcastRank, commNode ));
273
274
    }

Thomas Jahns's avatar
Thomas Jahns committed
275
  xdebug("buffersize=%ld", buffersize);
276

277
  listSetForeach(bibAFiledataM, elemCheck, (void *)filename);
278
  of = initAFiledataMPINONB(filename, (size_t)buffersize);
Deike Kleberg's avatar
Deike Kleberg committed
279

280
  if ((id = listSetAdd(bibAFiledataM, of)) < 0 )
Thomas Jahns's avatar
Thomas Jahns committed
281
282
    xabort("filename %s not unique", of->name);

283
284
  xdebug("IOPE%d: name=%s, init and added aFiledataM, return id = %d",
         rankNode, filename, id);
285
  of->fileID = id;
286
287
288
289
290
  return id;
}

/***************************************************************/

291
void finalizeMPINONB(void)
292
{
293
294
  if (!listSetIsEmpty(bibAFiledataM))
    xabort("set bibAFiledataM not empty");
295
296
  else
    {
297
298
      xdebug("%s", "destroy set");
      listSetDelete(bibAFiledataM);
299
300
301
302
303
    }
}

/***************************************************************/

304
305
void
initMPINONB(void (*postCommSetupActions)(void))
306
{
Deike Kleberg's avatar
Deike Kleberg committed
307
  commDefCommColl ( 1 );
Deike Kleberg's avatar
Deike Kleberg committed
308
  commSendNodeInfo ();
Deike Kleberg's avatar
Deike Kleberg committed
309
310
  commRecvNodeMap ();
  commDefCommsIO ();
311
  postCommSetupActions();
312
  bibAFiledataM = listSetNew( destroyAFiledataMPINONB, compareNamesMPINONB );
Thomas Jahns's avatar
Thomas Jahns committed
313

Deike Kleberg's avatar
Deike Kleberg committed
314
  if ( bibAFiledataM == NULL )
Thomas Jahns's avatar
Thomas Jahns committed
315
    xabort ( "listSetNew did not succeed" );
316
317
}

318
319
320
321
322
323
324
325
326
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */