pio.c 10.6 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
6
7
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Deike Kleberg's avatar
Deike Kleberg committed
8
#include <string.h>
9
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
10

11
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
12

Deike Kleberg's avatar
Deike Kleberg committed
13
14
15
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17

Deike Kleberg's avatar
Deike Kleberg committed
18

Deike Kleberg's avatar
Deike Kleberg committed
19
20
#endif

21
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
22

23
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
24
25
			   "IO_Get_fp","IO_Set_fp",
			   "IO_Send_buffer", "IO_Finalize"};
Deike Kleberg's avatar
Deike Kleberg committed
26

27
long initial_buffersize = 16 * 1024 * 1024;
28
/*  4 KB <= x < 256 MB */ 
29
/* 16 * 1024 * 1024; */
30
/* 16 * 1024; */
31
/* 4 * 1024; */
Deike Kleberg's avatar
Deike Kleberg committed
32

Deike Kleberg's avatar
Deike Kleberg committed
33
int maxPtype = 4;
34
int maxNnodes  = 249;
35
int tagKey = 100;
36
int maxErrorString = 100;
37

Deike Kleberg's avatar
Deike Kleberg committed
38
39
40
41
42
43
44
45
46
47
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

pioInfo *pioinfo;

48
49
char *token = "%";

Deike Kleberg's avatar
Deike Kleberg committed
50
51
/*****************************************************************************/

52
void errorPIO ( char *errorString, const char *filename, int line, int rank )
Deike Kleberg's avatar
Deike Kleberg committed
53
{
54
  fprintf ( stderr, "PIO ERROR, pe%d, %s, line %d, errorString: \"%s\"\n",
55
	    rank, filename, line, errorString );
56
  MPI_Abort ( MPI_COMM_WORLD, 1 );
57
58
59
60
61
62
}

/*****************************************************************************/

void checkMPI ( int iret, const char *filename, int line, int rank )
{
63
64
  char errorString1[MPI_MAX_ERROR_STRING + 1];
  char errorString2[MPI_MAX_ERROR_STRING + 1];
65
  int len, errorClass;   
Deike Kleberg's avatar
Deike Kleberg committed
66

67
  if ( iret != MPI_SUCCESS )
Deike Kleberg's avatar
Deike Kleberg committed
68
    {
69
70
71
72
73
      MPI_Error_class ( iret, &errorClass );
      MPI_Error_string ( errorClass, errorString1, &len );
      errorString1[len] = '\0';
      MPI_Error_string ( iret, errorString2, &len);
      errorString2[len] = '\0';
74
      
75
      fprintf ( stderr, "MPI ERROR, pe%d, %s, line %d,"
76
77
78
79
		"errorClass: \"%s\""
		"errorString: \"%s\"\n",
		rank, filename, line, 
		errorString1, errorString2);
80
81

      MPI_Abort ( MPI_COMM_WORLD, iret );
Deike Kleberg's avatar
Deike Kleberg committed
82
83
84
85
86
    }

  return;
}

87
88
/*****************************************************************************/

89
90
void checkMPIstat ( int iret, const char *filename, int line, int rank,  
			MPI_Status *status )
91
{
92
  char errorString[MPI_MAX_ERROR_STRING + 1];
93
94
95
96
97
  int len;

  if ( iret == MPI_ERR_IN_STATUS )
    {
      switch ( status->MPI_ERROR )
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
	{
	  fprintf ( stderr, "------- checking error in request ----------\n" );
	case MPI_SUCCESS :
	  fprintf ( stderr, "-------- mpi_success -----------\n" );
	  break;
	case MPI_ERR_PENDING:
	  fprintf ( stderr, "-------- mpi_err_pending ----------\n");
	  break;
	default:
	  MPI_Error_string ( status->MPI_ERROR, errorString, &len );
	  errorString[len] = '\0';
	  fprintf ( stderr,"MPI ERROR in request, pe%d, %s, line %d,"
		    "return value: %d, error_string: %s\n", 
		    rank, filename, line, iret, errorString ); 
	  MPI_Abort ( MPI_COMM_WORLD, iret );
	}
114
    }
115
  else 
116
    checkMPI ( iret, filename, line, rank );
117
  
118
119
120
  return;
}

121
/***************************************************************/     
Deike Kleberg's avatar
Deike Kleberg committed
122
123
124
125
126
127

void * xmalloc ( size_t size )
{
  void * value = malloc ( size );

  if ( value == NULL )
128
    errorPIO ( "malloc did not succeed", __FILE__, __LINE__, -1 );
129
  
Deike Kleberg's avatar
Deike Kleberg committed
130
131
132
  return value;
}

133
/***************************************************************/  
Deike Kleberg's avatar
Deike Kleberg committed
134

Deike Kleberg's avatar
Deike Kleberg committed
135
136
int setTag ( int ID, int sc )
{
137
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
138
139
140
141
142
143
144
145
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
146
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
147
148
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
149
150
151
152
153
154

  return rtag;
}

/***************************************************************/

155
156
157
158
159
160
161
162
void ungetTag ( rTag *rtag )
{
  free ( rtag );
  return;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
163
164
165
166
167
168
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
  size_t iret;

  switch ( pioinfo->type )
    {
169
170
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
171
      break;
Deike Kleberg's avatar
Deike Kleberg committed
172
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
173
174
175
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
176
#endif
177
178
179
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
180
181
182
183
184
185
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
      break;
186
187
188
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
189
190
191
192
193
194
195
196
197
198
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
  int iret;
199
  
Deike Kleberg's avatar
Deike Kleberg committed
200
201
  switch ( pioinfo->type )
    {
202
203
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
204
      break;
Deike Kleberg's avatar
Deike Kleberg committed
205
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
206
207
208
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
209
#endif
210
211
212
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
213
214
215
216
217
218
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
219
220
221
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
222
223
224
225
226
227
228
229
230
231
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
  int iret;
232
  
Deike Kleberg's avatar
Deike Kleberg committed
233
234
  switch ( pioinfo->type )
    {
235
236
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
237
      break;
Deike Kleberg's avatar
Deike Kleberg committed
238
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
239
240
241
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
242
#endif
243
244
245
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
246
247
248
249
250
251
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
252
253
254
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
255
    }
256
  
Deike Kleberg's avatar
Deike Kleberg committed
257
258
259
260
261
  return iret;
}

/***************************************************************/

262
263
static int cmpr ( const void *a, const void *b ) 
{ 
264
265
266
267
268
  return strcmp ( *( char ** ) a, *( char ** ) b);
}

/***************************************************************/

269
270
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color, 
			  int *nnodes )
Deike Kleberg's avatar
Deike Kleberg committed
271
{
272
273
  int size, rank, len, npes_node, key, test, i, j;

Deike Kleberg's avatar
Deike Kleberg committed
274
  char *myHost, **allHosts, *allHosts0, *curr;
275
276
  char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];

277
278
  MPI_Comm_size ( commF2C, &size );
  MPI_Comm_rank ( commF2C, &rank );
279

Deike Kleberg's avatar
Deike Kleberg committed
280
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
281
  
Deike Kleberg's avatar
Deike Kleberg committed
282
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
283
  
284
  checkMPI (( MPI_Get_processor_name ( myHost, &len )), __FILE__, __LINE__, rank );
Deike Kleberg's avatar
Deike Kleberg committed
285

286
  if ( myHost[0] == '\0' ) 
287
    errorPIO ( "did not succeed to set hostname", __FILE__, __LINE__ , rank);
Deike Kleberg's avatar
Deike Kleberg committed
288

Deike Kleberg's avatar
Deike Kleberg committed
289
  if ( ddebug == MAXDEBUG )
290
291
292
    {
      strncpy ( hostname, myHost, len );
      hostname [ len ] = '\0';
Deike Kleberg's avatar
Deike Kleberg committed
293
      fprintf ( stderr, "pe%d: myHost = %s\n", rank,  hostname );
294
    }
295
  
Deike Kleberg's avatar
Deike Kleberg committed
296
  allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
297
298
  allHosts[0] = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME * 
				     sizeof ( char ));
Deike Kleberg's avatar
Deike Kleberg committed
299
  allHosts0 = allHosts[0];
Deike Kleberg's avatar
Deike Kleberg committed
300

301
302
  for ( i = 1; i < size; i++ )
    allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
303

304
305
306
  MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 
			    & ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME, 
			    MPI_CHAR, commF2C );
307
308
309
310
311
312

  qsort ( allHosts, size, sizeof ( char * ), cmpr );

  *color = 0;
  i = 0;
  j = 0;
313

314
  while ( i < size )
315
316
    {
      curr = allHosts[i];
317
      j++;
318
      if (( test = strcmp ( myHost, curr )) == 0 ) *color = j;
319
     
320
      while ( ++i < size )
321
322
	if (( test = strcmp ( allHosts[i], curr )) != 0)
	  break;
323
    }
Deike Kleberg's avatar
Deike Kleberg committed
324

325
  *nnodes = j;
326
      
327
  if ( *color == 0 ) errorPIO ( "Color is not set", __FILE__, __LINE__, rank );
328
  
329
  npes_node = size / ( * nnodes );
Deike Kleberg's avatar
Deike Kleberg committed
330
331
  key = rank % npes_node;

Deike Kleberg's avatar
Deike Kleberg committed
332
  checkMPI ( MPI_Comm_split ( commF2C, *color, key, myComm),
333
	     __FILE__, __LINE__, rank );
334

Deike Kleberg's avatar
Deike Kleberg committed
335
  free ( allHosts0 );
336
337
  free ( allHosts );
  free ( myHost );
Deike Kleberg's avatar
Deike Kleberg committed
338

Deike Kleberg's avatar
Deike Kleberg committed
339
  if ( ddebug == MAXDEBUG )
340
341
342
343
    fprintf ( stderr, 
	      "pe%d in setPioCommunicator, color=%d, before return\n", 
	      rank, *color );
  
344
345
346
347
348
349
350
  return;
}

#endif

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
351
MPI_Comm bInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
352
{
Deike Kleberg's avatar
Deike Kleberg committed
353
  char fnName[] = "bInit()";
Deike Kleberg's avatar
Deike Kleberg committed
354
  int size, rank;
355
  int collectingData = 1;
356
357
  *nnodes = 1;
  *color = 1;
358

Deike Kleberg's avatar
Deike Kleberg committed
359
  if ( ddebug == MAXDEBUG )
Deike Kleberg's avatar
Deike Kleberg committed
360
    myDebug ( __FILE__, fnName, __LINE__ );
Deike Kleberg's avatar
Deike Kleberg committed
361

Deike Kleberg's avatar
Deike Kleberg committed
362
#ifdef USE_MPI
363
364
  MPI_Comm_size ( comm, &size );
  MPI_Comm_rank ( comm, &rank );
365
366
  
  if ( ptype < 0 || ptype > maxPtype ) 
367
    errorPIO ( "PIOTYPE is no valid modus", __FILE__, __LINE__, rank );
Deike Kleberg's avatar
Deike Kleberg committed
368
369
370
371
372

#ifdef _SX
  if ( ptype ==  PIO_POSIX_ASYNCH )
    errorPIO ( "PIO_POSIX_ASYNCH does not work on SX", __FILE__, __LINE__, rank );
#endif
373
  
374
  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
375
376
  
  pioinfo->type = ptype; 
Deike Kleberg's avatar
Deike Kleberg committed
377

378
  setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes );
379
  
380
381
  if (( pioinfo->type == PIO_NONE ) && ( *nnodes != size ))
    errorPIO ( "PIOTYPE, NNODES: not a valid combination", __FILE__, __LINE__, rank );
382
  
383
  pioinfo->color = *color;
Deike Kleberg's avatar
Deike Kleberg committed
384
385
  MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
  MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));
386
  
Deike Kleberg's avatar
Deike Kleberg committed
387
  if ( ddebug == MAXDEBUG && pioinfo->rank == 0 )
388
    fprintf ( stderr, 
Deike Kleberg's avatar
Deike Kleberg committed
389
	      "pe%d in bInit(), ptype=%d, initial_buffersize=%ld: "
390
391
392
	      "after init pioinfo ...\n", 
	      pioinfo->rank, pioinfo->type, initial_buffersize );
  
393
  pioinfo->collectorComm = MPI_COMM_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
394

Deike Kleberg's avatar
Deike Kleberg committed
395
396
  switch ( pioinfo->type )
    {
397
398
    case PIO_NONE:
      MPI_Comm_dup ( pioinfo->comm, &( pioinfo->collectorComm ));
399
      collectingData = 1;
400
      break;
401
    case PIO_MPI_NONB:
402
      collectingData = initMPINONB ();
Deike Kleberg's avatar
Deike Kleberg committed
403
      break;
Deike Kleberg's avatar
Deike Kleberg committed
404
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
405
    case PIO_POSIX_ASYNCH:
406
      collectingData = initPOSIXASYNCH ();
Deike Kleberg's avatar
Deike Kleberg committed
407
      break;
Deike Kleberg's avatar
Deike Kleberg committed
408
#endif
409
    case PIO_POSIX_FPGUARD_SENDRECV:
410
      collectingData = initPOSIXFPGUARDSENDRECV ();
411
      break;
412
    case PIO_POSIX_FPGUARD_THREAD:
413
      collectingData = initPOSIXFPGUARDTHREAD ();
414
415
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
416
      collectingData = initPOSIXFPGUARDTHREADREFUSE ();
417
      break;
Deike Kleberg's avatar
Deike Kleberg committed
418
    case PIO_POSIX_NONB:
419
      collectingData = initPOSIXNONB ();
420
      break;
Deike Kleberg's avatar
Deike Kleberg committed
421
    }
422
423
#endif
  
Deike Kleberg's avatar
Deike Kleberg committed
424
  if ( ddebug == MAXDEBUG )
Deike Kleberg's avatar
Deike Kleberg committed
425
    fprintf ( stderr, "pe in bInit out\n" );
426
  
Deike Kleberg's avatar
Deike Kleberg committed
427
  return pioinfo->collectorComm;
Deike Kleberg's avatar
Deike Kleberg committed
428
429
430
431
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
432
void bFinalize ()
Deike Kleberg's avatar
Deike Kleberg committed
433
{
434
#ifdef USE_MPI
435
436
437
438
439
440
441
  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    }

442
  if ( pioinfo->collectorComm != MPI_COMM_NULL ) 
443
    MPI_Comm_free ( &( pioinfo->collectorComm ));
444
445
  
  if ( pioinfo->comm != MPI_COMM_NULL ) 
446
447
    MPI_Comm_free ( &( pioinfo->comm ));

Deike Kleberg's avatar
Deike Kleberg committed
448
  free ( pioinfo );
449
#endif
Deike Kleberg's avatar
Deike Kleberg committed
450
451
452

  return;
}
453