pio.c 10.7 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
3
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Deike Kleberg's avatar
Deike Kleberg committed
4
#include <string.h>
5
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
6
7
8

#ifndef NOMPI

Deike Kleberg's avatar
Deike Kleberg committed
9
10
11
12
13
#include "mpi.h"
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"

Deike Kleberg's avatar
Deike Kleberg committed
14
15
#endif

16
bool ddebug = false;
Deike Kleberg's avatar
Deike Kleberg committed
17

Deike Kleberg's avatar
Deike Kleberg committed
18
19
#ifndef NOMPI

20
21
22
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
			   "IO_Get_fp","IO_Set_fp",
			   "IO_Send_buffer", "IO_Finalize"};
Deike Kleberg's avatar
Deike Kleberg committed
23

24
25
26
long initial_buffersize = 16 * 1024 * 1024;
/*  4 KB <= x < 256 MB */ 
/* 16 * 1024 * 1024; */
27
/* 16 * 1024; */
28
/* 4 * 1024; */
Deike Kleberg's avatar
Deike Kleberg committed
29

Deike Kleberg's avatar
Deike Kleberg committed
30
int maxPtype = 4;
31
int maxNnodes  = 249;
32
int tagKey = 100;
33
int maxErrorString = 100;
34

Deike Kleberg's avatar
Deike Kleberg committed
35
36
37
38
39
40
41
42
43
44
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

pioInfo *pioinfo;

45
46
char *token = "%";

Deike Kleberg's avatar
Deike Kleberg committed
47
48
/*****************************************************************************/

49
void errorPIO ( char *errorString, const char *filename, int line, int rank )
Deike Kleberg's avatar
Deike Kleberg committed
50
{
51
52
53
  fprintf ( stderr, "PIO ERROR, pe%d, %s, line %d, errorString: \"%s\"\n",
	    rank, filename, line, errorString );
  MPI_Abort ( MPI_COMM_WORLD, 1 );
54
55
56
57
58
59
}

/*****************************************************************************/

void checkMPI ( int iret, const char *filename, int line, int rank )
{
60
61
62
  char errorString1[MPI_MAX_ERROR_STRING + 1];
  char errorString2[MPI_MAX_ERROR_STRING + 1];
  int len, errorClass;   
Deike Kleberg's avatar
Deike Kleberg committed
63

64
  if ( iret != MPI_SUCCESS )
Deike Kleberg's avatar
Deike Kleberg committed
65
    {
66
67
68
69
70
71
72
73
74
75
76
77
78
      MPI_Error_class ( iret, &errorClass );
      MPI_Error_string ( errorClass, errorString1, &len );
      errorString1[len] = '\0';
      MPI_Error_string ( iret, errorString2, &len);
      errorString2[len] = '\0';
      
      fprintf ( stderr, "MPI ERROR, pe%d, %s, line %d,"
		"errorClass: \"%s\""
		"errorString: \"%s\"\n",
		rank, filename, line, 
		errorString1, errorString2);

      MPI_Abort ( MPI_COMM_WORLD, iret );
Deike Kleberg's avatar
Deike Kleberg committed
79
80
81
82
83
    }

  return;
}

84
85
/*****************************************************************************/

86
87
void checkMPIstat ( int iret, const char *filename, int line, int rank,  
			MPI_Status *status )
88
{
89
  char errorString[MPI_MAX_ERROR_STRING + 1];
90
91
92
93
94
95
96
97
98
99
100
101
102
103
  int len;

  if ( iret == MPI_ERR_IN_STATUS )
    {
      switch ( status->MPI_ERROR )
	{
	  fprintf ( stdout, "------- checking error in request ----------\n" );
	case MPI_SUCCESS :
	  fprintf ( stdout, "-------- mpi_success -----------\n" );
	  break;
	case MPI_ERR_PENDING:
	  fprintf ( stdout, "-------- mpi_err_pending ----------\n");
	  break;
	default:
104
105
	  MPI_Error_string ( status->MPI_ERROR, errorString, &len );
	  errorString[len] = '\0';
106
107
108
109
	  fprintf ( stderr,"MPI ERROR in request, pe%d, %s, line %d,"
		    "return value: %d, error_string: %s\n", 
		    rank, filename, line, iret, errorString ); 
	  MPI_Abort ( MPI_COMM_WORLD, iret );
110
111
112
	}
    }
  else 
113
    checkMPI ( iret, filename, line, rank );
114
115
116
117
  
  return;
}

Deike Kleberg's avatar
Deike Kleberg committed
118
119
120
121
122
123
124
/***************************************************************/     

void * xmalloc ( size_t size )
{
  void * value = malloc ( size );

  if ( value == NULL )
125
126
    errorPIO ( "malloc did not succeed", __FILE__, __LINE__, -1 );
  
Deike Kleberg's avatar
Deike Kleberg committed
127
128
129
130
  return value;
}

/***************************************************************/  
Deike Kleberg's avatar
Deike Kleberg committed
131

Deike Kleberg's avatar
Deike Kleberg committed
132
133
int setTag ( int ID, int sc )
{
134
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
135
136
137
138
139
140
141
142
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
143
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
144
145
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
146
147
148
149
150
151

  return rtag;
}

/***************************************************************/

152
153
154
155
156
157
158
159
void ungetTag ( rTag *rtag )
{
  free ( rtag );
  return;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
160
161
162
163
164
165
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
  size_t iret;

  switch ( pioinfo->type )
    {
166
167
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
168
      break;
Deike Kleberg's avatar
Deike Kleberg committed
169
170
171
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
172
173
174
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
175
176
177
178
179
180
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
      break;
181
182
183
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
184
185
186
187
188
189
190
191
192
193
194
195
196
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
  int iret;
  
  switch ( pioinfo->type )
    {
197
198
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
199
      break;
Deike Kleberg's avatar
Deike Kleberg committed
200
201
202
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
203
204
205
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
206
207
208
209
210
211
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
212
213
214
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
215
216
217
218
219
220
221
222
223
224
225
226
227
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
  int iret;
  
  switch ( pioinfo->type )
    {
228
229
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
230
      break;
Deike Kleberg's avatar
Deike Kleberg committed
231
232
233
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
234
235
236
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
237
238
239
240
241
242
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
243
244
245
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
246
247
248
249
250
251
252
    }
  
  return iret;
}

/***************************************************************/

253
254
255
256
257
258
259
260
static int cmpr ( const void *a, const void *b ) 
{ 
  return strcmp ( *( char ** ) a, *( char ** ) b);
}

/***************************************************************/

void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color, 
261
			  int *nnodes )
Deike Kleberg's avatar
Deike Kleberg committed
262
{
263
264
  int size, rank, len, npes_node, key, test, i, j;

Deike Kleberg's avatar
Deike Kleberg committed
265
  char *myHost, **allHosts, *allHosts0, *curr;
266
267
  char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];

268
269
  MPI_Comm_size ( commF2C, &size );
  MPI_Comm_rank ( commF2C, &rank );
270

Deike Kleberg's avatar
Deike Kleberg committed
271
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
272
  
Deike Kleberg's avatar
Deike Kleberg committed
273
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
274
275
  
  checkMPI (( MPI_Get_processor_name ( myHost, &len )), __FILE__, __LINE__, rank );
Deike Kleberg's avatar
Deike Kleberg committed
276

277
  if ( myHost[0] == '\0' ) 
278
    errorPIO ( "did not succeed to set hostname", __FILE__, __LINE__ , rank);
279
  
280
281
282
283
284
285
  if ( ddebug )
    {
      strncpy ( hostname, myHost, len );
      hostname [ len ] = '\0';
      fprintf ( stdout, "pe%d: myHost = %s\n", rank,  hostname );
    }
286
  
Deike Kleberg's avatar
Deike Kleberg committed
287
288
289
290
291
  
  allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
  allHosts[0] = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME * 
				     sizeof ( char ));
  allHosts0 = allHosts[0];
Deike Kleberg's avatar
Deike Kleberg committed
292

293
294
  for ( i = 1; i < size; i++ )
    allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
295
296

  MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 
Deike Kleberg's avatar
Deike Kleberg committed
297
			    & ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME, 
298
			    MPI_CHAR, commF2C );
299
300
301
302
303
304

  qsort ( allHosts, size, sizeof ( char * ), cmpr );

  *color = 0;
  i = 0;
  j = 0;
305

306
  while ( i < size )
307
308
    {
      curr = allHosts[i];
309
      j++;
310
      if (( test = strcmp ( myHost, curr )) == 0 ) *color = j;
311
312
313
314
     
      while ( ++i < size )
	if (( test = strcmp ( allHosts[i], curr )) != 0)
	  break;
315
    }
Deike Kleberg's avatar
Deike Kleberg committed
316

317
318
  *nnodes = j;
      
319
  if ( *color == 0 ) errorPIO ( "Color is not set", __FILE__, __LINE__, rank );
320
  
321
  npes_node = size / ( * nnodes );
Deike Kleberg's avatar
Deike Kleberg committed
322
323
  key = rank % npes_node;

Deike Kleberg's avatar
Deike Kleberg committed
324
325
  checkMPI ( MPI_Comm_split ( commF2C, *color, key, myComm),
	     __FILE__, __LINE__, rank );
326

Deike Kleberg's avatar
Deike Kleberg committed
327
  free ( allHosts0 );
328
329
  free ( allHosts );
  free ( myHost );
Deike Kleberg's avatar
Deike Kleberg committed
330
331
332
333
334

  if ( ddebug )
    fprintf ( stdout, 
	      "pe%d in setPioCommunicator, color=%d, before return\n", 
	      rank, *color );
335
336
337
338
339
340
341
342
  
  return;
}

#endif

/***************************************************************/

343
int pioInit ( int ptype, int commF, int *color, int *nnodes, int *pioCollComm2F )
344
345
346
{

  int collectingData = 1;
347
348
  *nnodes = 1;
  *color = 1;
349
  *pioCollComm2F = 0;
350
351
352
353

#ifndef NOMPI

  MPI_Comm comm;
Deike Kleberg's avatar
Deike Kleberg committed
354
355
356
  int size, rank;
  
  comm = MPI_COMM_NULL;
357

Deike Kleberg's avatar
Deike Kleberg committed
358
  if (( comm = MPI_Comm_f2c (( MPI_Fint ) commF )) == NULL )
359
360
    errorPIO ( "MPI_Comm_f2c didn't succeed", __FILE__, __LINE__, -1 );
  
361
362
363
  MPI_Comm_set_errhandler ( comm, MPI_ERRORS_RETURN );
  MPI_Comm_size ( comm, &size );
  MPI_Comm_rank ( comm, &rank );
364
  
365
366
  if ( ptype < 0 || ptype > maxPtype ) 
    errorPIO ( "PIOTYPE is no valid modus", __FILE__, __LINE__, rank );
367
  
368
  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
369
  
370
371
  pioinfo->type = ptype; 
  setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes );
372
  
373
374
  if (( pioinfo->type == PIO_NONE ) && ( *nnodes != size ))
    errorPIO ( "PIOTYPE, NNODES: not a valid combination", __FILE__, __LINE__, rank );
375
  
376
  pioinfo->color = *color;
Deike Kleberg's avatar
Deike Kleberg committed
377
378
  MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
  MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));
379
  
Deike Kleberg's avatar
Deike Kleberg committed
380
381
  if ( ddebug && pioinfo->rank == 0 )
    fprintf ( stdout, 
Deike Kleberg's avatar
Deike Kleberg committed
382
383
	      "pe%d in pioDefPtype(), ptype=%d, initial_buffersize=%ld: "
	      "after init pioinfo ...\n", 
384
	      pioinfo->rank, pioinfo->type, initial_buffersize );
385
  
386
  pioinfo->collectorComm = MPI_COMM_NULL;
387
  
Deike Kleberg's avatar
Deike Kleberg committed
388
389
  switch ( pioinfo->type )
    {
390
391
    case PIO_NONE:
      MPI_Comm_dup ( pioinfo->comm, &( pioinfo->collectorComm ));
392
      collectingData = 1;
393
      break;
394
    case PIO_MPI_NONB:
395
      collectingData = initMPINONB ();
Deike Kleberg's avatar
Deike Kleberg committed
396
      break;
Deike Kleberg's avatar
Deike Kleberg committed
397
    case PIO_POSIX_ASYNCH:
398
      collectingData = initPOSIXASYNCH ();
Deike Kleberg's avatar
Deike Kleberg committed
399
      break;
400
    case PIO_POSIX_FPGUARD_SENDRECV:
401
      collectingData = initPOSIXFPGUARDSENDRECV ();
402
      break;
403
    case PIO_POSIX_FPGUARD_THREAD:
404
      collectingData = initPOSIXFPGUARDTHREAD ();
405
406
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
407
      collectingData = initPOSIXFPGUARDTHREADREFUSE ();
408
      break;
Deike Kleberg's avatar
Deike Kleberg committed
409
    case PIO_POSIX_NONB:      
410
      collectingData = initPOSIXNONB ();
411
      break;
Deike Kleberg's avatar
Deike Kleberg committed
412
    }
413
  
414
415
  *pioCollComm2F  = 0;
  if (( *pioCollComm2F = MPI_Comm_c2f ( pioinfo->collectorComm )) == 0 )
416
417
    errorPIO ( "MPI_Comm_c2f didn't succeed\n", __FILE__, __LINE__ , rank);
  
418
#endif
Deike Kleberg's avatar
Deike Kleberg committed
419
420
421
  
  if ( ddebug )
    fprintf ( stdout, "pe in pioinit out\n" );
422
  
Deike Kleberg's avatar
Deike Kleberg committed
423
424
425
426
427
428
429
  return collectingData;
}

/***************************************************************/

void pioFinalize ()
{
430
#ifndef NOMPI
431
432
433
434
435
436
437
  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    }

438
439
440
441
442
443
  if ( pioinfo->collectorComm != MPI_COMM_NULL ) 
    MPI_Comm_free ( &( pioinfo->collectorComm ));
  
  if ( pioinfo->comm != MPI_COMM_NULL ) 
    MPI_Comm_free ( &( pioinfo->comm ));

Deike Kleberg's avatar
Deike Kleberg committed
444
  free ( pioinfo );
445
#endif
Deike Kleberg's avatar
Deike Kleberg committed
446
447
448
449

  return;
}