pio.c 10.9 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
6
7
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Deike Kleberg's avatar
Deike Kleberg committed
8
#include <string.h>
9
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
10

11
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
12

Deike Kleberg's avatar
Deike Kleberg committed
13
14
15
16
17
#include "mpi.h"
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"

Deike Kleberg's avatar
Deike Kleberg committed
18
19
#endif

20
bool ddebug = false;
Deike Kleberg's avatar
Deike Kleberg committed
21

22
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
23

24
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
25
26
                           "IO_Get_fp","IO_Set_fp",
                           "IO_Send_buffer", "IO_Finalize"};
Deike Kleberg's avatar
Deike Kleberg committed
27

28
long initial_buffersize = 16 * 1024 * 1024;
29
/*  4 KB <= x < 256 MB */
30
/* 16 * 1024 * 1024; */
31
/* 16 * 1024; */
32
/* 4 * 1024; */
Deike Kleberg's avatar
Deike Kleberg committed
33

Deike Kleberg's avatar
Deike Kleberg committed
34
int maxPtype = 4;
35
int maxNnodes  = 249;
36
int tagKey = 100;
37
int maxErrorString = 100;
38

Deike Kleberg's avatar
Deike Kleberg committed
39
40
41
42
43
44
45
46
47
48
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

pioInfo *pioinfo;

49
50
char *token = "%";

Deike Kleberg's avatar
Deike Kleberg committed
51
52
/*****************************************************************************/

53
void errorPIO ( char *errorString, const char *filename, int line, int rank )
Deike Kleberg's avatar
Deike Kleberg committed
54
{
55
  fprintf ( stderr, "PIO ERROR, pe%d, %s, line %d, errorString: \"%s\"\n",
56
            rank, filename, line, errorString );
57
  MPI_Abort ( MPI_COMM_WORLD, 1 );
58
59
60
61
62
63
}

/*****************************************************************************/

void checkMPI ( int iret, const char *filename, int line, int rank )
{
64
65
  char errorString1[MPI_MAX_ERROR_STRING + 1];
  char errorString2[MPI_MAX_ERROR_STRING + 1];
66
  int len, errorClass;
Deike Kleberg's avatar
Deike Kleberg committed
67

68
  if ( iret != MPI_SUCCESS )
Deike Kleberg's avatar
Deike Kleberg committed
69
    {
70
71
72
73
74
      MPI_Error_class ( iret, &errorClass );
      MPI_Error_string ( errorClass, errorString1, &len );
      errorString1[len] = '\0';
      MPI_Error_string ( iret, errorString2, &len);
      errorString2[len] = '\0';
75

76
      fprintf ( stderr, "MPI ERROR, pe%d, %s, line %d,"
77
78
79
80
                "errorClass: \"%s\""
                "errorString: \"%s\"\n",
                rank, filename, line,
                errorString1, errorString2);
81
82

      MPI_Abort ( MPI_COMM_WORLD, iret );
Deike Kleberg's avatar
Deike Kleberg committed
83
84
85
86
87
    }

  return;
}

88
89
/*****************************************************************************/

90
91
void checkMPIstat ( int iret, const char *filename, int line, int rank,
                        MPI_Status *status )
92
{
93
  char errorString[MPI_MAX_ERROR_STRING + 1];
94
95
96
97
98
  int len;

  if ( iret == MPI_ERR_IN_STATUS )
    {
      switch ( status->MPI_ERROR )
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
        {
          fprintf ( stderr, "------- checking error in request ----------\n" );
        case MPI_SUCCESS :
          fprintf ( stderr, "-------- mpi_success -----------\n" );
          break;
        case MPI_ERR_PENDING:
          fprintf ( stderr, "-------- mpi_err_pending ----------\n");
          break;
        default:
          MPI_Error_string ( status->MPI_ERROR, errorString, &len );
          errorString[len] = '\0';
          fprintf ( stderr,"MPI ERROR in request, pe%d, %s, line %d,"
                    "return value: %d, error_string: %s\n",
                    rank, filename, line, iret, errorString );
          MPI_Abort ( MPI_COMM_WORLD, iret );
        }
115
    }
116
  else
117
    checkMPI ( iret, filename, line, rank );
118

119
120
121
  return;
}

122
/***************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
123
124
125
126
127
128

void * xmalloc ( size_t size )
{
  void * value = malloc ( size );

  if ( value == NULL )
129
    errorPIO ( "malloc did not succeed", __FILE__, __LINE__, -1 );
130

Deike Kleberg's avatar
Deike Kleberg committed
131
132
133
  return value;
}

134
/***************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
135

Deike Kleberg's avatar
Deike Kleberg committed
136
137
int setTag ( int ID, int sc )
{
138
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
139
140
141
142
143
144
145
146
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
147
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
148
149
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
150
151
152
153
154
155

  return rtag;
}

/***************************************************************/

156
157
158
159
160
161
162
163
void ungetTag ( rTag *rtag )
{
  free ( rtag );
  return;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
164
165
166
167
168
169
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
  size_t iret;

  switch ( pioinfo->type )
    {
170
171
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
172
      break;
Deike Kleberg's avatar
Deike Kleberg committed
173
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
174
175
176
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
177
#endif
178
179
180
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
181
182
183
184
185
186
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
      break;
187
188
189
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
190
191
192
193
194
195
196
197
198
199
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
  int iret;
200

Deike Kleberg's avatar
Deike Kleberg committed
201
202
  switch ( pioinfo->type )
    {
203
204
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
205
      break;
Deike Kleberg's avatar
Deike Kleberg committed
206
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
207
208
209
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
210
#endif
211
212
213
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
214
215
216
217
218
219
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
220
221
222
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
223
224
225
226
227
228
229
230
231
232
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
  int iret;
233

Deike Kleberg's avatar
Deike Kleberg committed
234
235
  switch ( pioinfo->type )
    {
236
237
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
238
      break;
Deike Kleberg's avatar
Deike Kleberg committed
239
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
240
241
242
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
243
#endif
244
245
246
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
247
248
249
250
251
252
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
253
254
255
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
256
    }
257

Deike Kleberg's avatar
Deike Kleberg committed
258
259
260
261
262
  return iret;
}

/***************************************************************/

263
264
static int cmpr ( const void *a, const void *b )
{
265
266
267
268
269
  return strcmp ( *( char ** ) a, *( char ** ) b);
}

/***************************************************************/

270
271
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
                          int *nnodes )
Deike Kleberg's avatar
Deike Kleberg committed
272
{
273
274
  int size, rank, len, npes_node, key, test, i, j;

Deike Kleberg's avatar
Deike Kleberg committed
275
  char *myHost, **allHosts, *allHosts0, *curr;
276
277
  char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];

278
279
  MPI_Comm_size ( commF2C, &size );
  MPI_Comm_rank ( commF2C, &rank );
280

Deike Kleberg's avatar
Deike Kleberg committed
281
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
282

Deike Kleberg's avatar
Deike Kleberg committed
283
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
284

285
  checkMPI (( MPI_Get_processor_name ( myHost, &len )), __FILE__, __LINE__, rank );
Deike Kleberg's avatar
Deike Kleberg committed
286

287
  if ( myHost[0] == '\0' )
288
    errorPIO ( "did not succeed to set hostname", __FILE__, __LINE__ , rank);
Deike Kleberg's avatar
Deike Kleberg committed
289

290
291
292
293
  if ( ddebug )
    {
      strncpy ( hostname, myHost, len );
      hostname [ len ] = '\0';
Deike Kleberg's avatar
Deike Kleberg committed
294
      fprintf ( stderr, "pe%d: myHost = %s\n", rank,  hostname );
295
    }
296

Deike Kleberg's avatar
Deike Kleberg committed
297
  allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
298
299
  allHosts[0] = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME *
                                     sizeof ( char ));
Deike Kleberg's avatar
Deike Kleberg committed
300
  allHosts0 = allHosts[0];
Deike Kleberg's avatar
Deike Kleberg committed
301

302
303
  for ( i = 1; i < size; i++ )
    allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
304

305
306
307
  MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
                            & ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME,
                            MPI_CHAR, commF2C );
308
309
310
311
312
313

  qsort ( allHosts, size, sizeof ( char * ), cmpr );

  *color = 0;
  i = 0;
  j = 0;
314

315
  while ( i < size )
316
317
    {
      curr = allHosts[i];
318
      j++;
319
      if (( test = strcmp ( myHost, curr )) == 0 ) *color = j;
320

321
      while ( ++i < size )
322
323
        if (( test = strcmp ( allHosts[i], curr )) != 0)
          break;
324
    }
Deike Kleberg's avatar
Deike Kleberg committed
325

326
  *nnodes = j;
327

328
  if ( *color == 0 ) errorPIO ( "Color is not set", __FILE__, __LINE__, rank );
329

330
  npes_node = size / ( * nnodes );
Deike Kleberg's avatar
Deike Kleberg committed
331
332
  key = rank % npes_node;

Deike Kleberg's avatar
Deike Kleberg committed
333
  checkMPI ( MPI_Comm_split ( commF2C, *color, key, myComm),
334
             __FILE__, __LINE__, rank );
335

Deike Kleberg's avatar
Deike Kleberg committed
336
  free ( allHosts0 );
337
338
  free ( allHosts );
  free ( myHost );
Deike Kleberg's avatar
Deike Kleberg committed
339
340

  if ( ddebug )
341
342
343
344
    fprintf ( stderr,
              "pe%d in setPioCommunicator, color=%d, before return\n",
              rank, *color );

345
346
347
348
349
350
351
  return;
}

#endif

/***************************************************************/

352
353
int pioInit ( int ptype, MPI_Comm comm, int *color, int *nnodes,
              MPI_Comm *pioCollComm )
354
{
Deike Kleberg's avatar
Deike Kleberg committed
355
  int size, rank;
356
  int collectingData = 1;
357
358
  *nnodes = 1;
  *color = 1;
Deike Kleberg's avatar
Deike Kleberg committed
359

360
361
362
  MPI_Comm_set_errhandler ( comm, MPI_ERRORS_RETURN );
  MPI_Comm_size ( comm, &size );
  MPI_Comm_rank ( comm, &rank );
363
364

  if ( ptype < 0 || ptype > maxPtype )
365
    errorPIO ( "PIOTYPE is no valid modus", __FILE__, __LINE__, rank );
Deike Kleberg's avatar
Deike Kleberg committed
366
367
368
369
370

#ifdef _SX
  if ( ptype ==  PIO_POSIX_ASYNCH )
    errorPIO ( "PIO_POSIX_ASYNCH does not work on SX", __FILE__, __LINE__, rank );
#endif
371

372
  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
373
374

  pioinfo->type = ptype;
Deike Kleberg's avatar
Deike Kleberg committed
375

376
  setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes );
377

378
379
  if (( pioinfo->type == PIO_NONE ) && ( *nnodes != size ))
    errorPIO ( "PIOTYPE, NNODES: not a valid combination", __FILE__, __LINE__, rank );
380

381
  pioinfo->color = *color;
Deike Kleberg's avatar
Deike Kleberg committed
382
383
  MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
  MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));
384

Deike Kleberg's avatar
Deike Kleberg committed
385
  if ( ddebug && pioinfo->rank == 0 )
386
387
388
389
390
    fprintf ( stderr,
              "pe%d in pioInit(), ptype=%d, initial_buffersize=%ld: "
              "after init pioinfo ...\n",
              pioinfo->rank, pioinfo->type, initial_buffersize );

391
  pioinfo->collectorComm = MPI_COMM_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
392

Deike Kleberg's avatar
Deike Kleberg committed
393
394
  switch ( pioinfo->type )
    {
395
396
    case PIO_NONE:
      MPI_Comm_dup ( pioinfo->comm, &( pioinfo->collectorComm ));
397
      collectingData = 1;
398
      break;
399
    case PIO_MPI_NONB:
400
      collectingData = initMPINONB ();
Deike Kleberg's avatar
Deike Kleberg committed
401
      break;
Deike Kleberg's avatar
Deike Kleberg committed
402
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
403
    case PIO_POSIX_ASYNCH:
404
      collectingData = initPOSIXASYNCH ();
Deike Kleberg's avatar
Deike Kleberg committed
405
      break;
Deike Kleberg's avatar
Deike Kleberg committed
406
#endif
407
    case PIO_POSIX_FPGUARD_SENDRECV:
408
      collectingData = initPOSIXFPGUARDSENDRECV ();
409
      break;
410
    case PIO_POSIX_FPGUARD_THREAD:
411
      collectingData = initPOSIXFPGUARDTHREAD ();
412
413
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
414
      collectingData = initPOSIXFPGUARDTHREADREFUSE ();
415
      break;
Deike Kleberg's avatar
Deike Kleberg committed
416
    case PIO_POSIX_NONB:
417
      collectingData = initPOSIXNONB ();
418
      break;
Deike Kleberg's avatar
Deike Kleberg committed
419
    }
420
421
422

  *pioCollComm = pioinfo->collectorComm;

Deike Kleberg's avatar
Deike Kleberg committed
423
  if ( ddebug )
Deike Kleberg's avatar
Deike Kleberg committed
424
    fprintf ( stderr, "pe in pioinit out\n" );
425

Deike Kleberg's avatar
Deike Kleberg committed
426
427
428
429
430
431
432
  return collectingData;
}

/***************************************************************/

void pioFinalize ()
{
433
#ifdef USE_MPI
434
435
436
437
438
439
440
  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    }

441
  if ( pioinfo->collectorComm != MPI_COMM_NULL )
442
    MPI_Comm_free ( &( pioinfo->collectorComm ));
443
444

  if ( pioinfo->comm != MPI_COMM_NULL )
445
446
    MPI_Comm_free ( &( pioinfo->comm ));

Deike Kleberg's avatar
Deike Kleberg committed
447
  free ( pioinfo );
448
#endif
Deike Kleberg's avatar
Deike Kleberg committed
449
450
451

  return;
}