/* pio.c */
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#ifndef NOMPI

#include "mpi.h"
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"

#endif

/* When true, the functions in this file print progress messages to stdout. */
bool ddebug = false;

#ifndef NOMPI

/* Printable names of six I/O commands.
   NOTE(review): presumably indexed by the command code carried in a
   message tag -- confirm against the command enum. */
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
			   "IO_Get_fp","IO_Set_fp",
			   "IO_Send_buffer", "IO_Finalize"};

/* Initial buffer size; reported in the pioInit debug message. */
long initial_buffersize = 16 * 1024 * 1024;
/*  4 KB <= x < 256 MB */
/* 16 * 1024 * 1024; */
/* 16 * 1024; */
/* 4 * 1024; */

/* Upper bounds pioInit accepts for its ptype and nnodes arguments. */
int maxPtype = 4;
int maxNnodes  = 249;

/* Radix used by setTag / getTag to pack an id and a command into one tag. */
int tagKey = 100;

/* NOTE(review): presumably timing accumulators maintained by the I/O
   back-ends; they are not touched in this file -- confirm. */
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

/* Process-wide I/O state; allocated in pioInit, released in pioFinalize. */
pioInfo *pioinfo;

/* NOTE(review): not used in this file; presumably a separator string
   consumed by the back-ends -- confirm. */
char *token = "%";

/*****************************************************************************/

/*
 * Report a failed MPI call on stderr.
 *
 *   line  source line of the call site (callers pass __LINE__)
 *   iret  return code of the MPI call
 *
 * Nothing is printed when iret equals MPI_SUCCESS.  The function only
 * reports the error; it neither aborts nor returns a status.
 */
void check_mpi ( int line, int iret )
{
  char msg[MPI_MAX_ERROR_STRING+1];
  int msgLen;

  if ( iret == MPI_SUCCESS )
    return;

  MPI_Error_string ( iret, msg, &msgLen );
  msg[msgLen] = '\0';
  fprintf ( stderr,"\nLine %8d MPI error %4d: %s\n\n", line, iret, msg );
}

/*****************************************************************************/

/*
 * Evaluate the result of an MPI call that also filled in a status.
 *
 *   line    source line of the call site, forwarded to check_mpi
 *   comm    communicator that is aborted on an unrecoverable error
 *   status  status object whose MPI_ERROR field is inspected
 *   iret    return code of the MPI call
 *
 * When iret is MPI_ERR_IN_STATUS the error stored in the status is
 * reported on stdout; any error other than MPI_SUCCESS or
 * MPI_ERR_PENDING aborts comm.  Every other value of iret is handed to
 * check_mpi.
 */
void check_mpi_status ( int line, MPI_Comm *comm, MPI_Status *status, int iret )
{
  char error_string[MPI_MAX_ERROR_STRING+1];
  int len;

  if ( iret != MPI_ERR_IN_STATUS )
    {
      check_mpi ( line, iret );
      return;
    }

  /* This banner used to sit between "switch" and the first case label,
     where it could never be executed. */
  fprintf ( stdout, "------- checking error in request ----------\n" );

  switch ( status->MPI_ERROR )
    {
    case MPI_SUCCESS :
      fprintf ( stdout, "-------- mpi_success -----------\n" );
      break;
    case MPI_ERR_PENDING:
      fprintf ( stdout, "-------- mpi_err_pending ----------\n");
      break;
    default:
      MPI_Error_string ( status->MPI_ERROR, error_string, &len );
      error_string[len] = '\0';
      fprintf ( stdout, "---------- error in request, error string: --------\n");
      /* Print the text verbatim: it must not be used as a format string,
	 because a '%' in the message would be interpreted by fprintf. */
      fputs ( error_string, stdout );
      MPI_Abort ( *comm, 1 );
      break;
    }
}

/***************************************************************/

/*
 * malloc wrapper that aborts the MPI job when memory is exhausted.
 *
 *   size  number of bytes requested
 *
 * Returns the pointer obtained from malloc.  A NULL result for a
 * non-zero request is reported on stderr and followed by
 * MPI_Abort ( MPI_COMM_WORLD, 1 ).  A request of zero bytes may return
 * NULL, which is not an error.
 */
void * xmalloc ( size_t size )
{
  void * value = malloc ( size );

  /* malloc ( 0 ) is allowed to return NULL, so only a NULL answer to a
     non-zero request counts as failure. */
  if ( value == NULL && size != 0 )
    {
      /* fprintf may overwrite errno; keep malloc's value for perror. */
      const int saved_errno = errno;

      fprintf ( stderr, "ERROR: malloc didn't succeed, memory exausted\n" );
      errno = saved_errno;
      perror ( "xmalloc" );
      MPI_Abort ( MPI_COMM_WORLD, 1 );
    }
  return value;
}

/***************************************************************/

int setTag ( int ID, int sc )
{
114
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
115
116
117
118
119
120
121
122
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
123
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
124
125
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
126
127
128
129
130
131

  return rtag;
}

/***************************************************************/

/* Release an rTag that was obtained from getTag. */
void ungetTag ( rTag *rtag )
{
  free ( rtag );
}

/***************************************************************/

size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
  size_t iret;

  switch ( pioinfo->type )
    {
146
147
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
148
      break;
Deike Kleberg's avatar
Deike Kleberg committed
149
150
151
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
152
153
154
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
155
156
157
158
159
160
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
      break;
161
162
163
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
164
165
166
167
168
169
170
171
172
173
174
175
176
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
  int iret;
  
  switch ( pioinfo->type )
    {
177
178
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
179
      break;
Deike Kleberg's avatar
Deike Kleberg committed
180
181
182
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
183
184
185
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
186
187
188
189
190
191
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
192
193
194
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
195
196
197
198
199
200
201
202
203
204
205
206
207
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
  int iret;
  
  switch ( pioinfo->type )
    {
208
209
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
210
      break;
Deike Kleberg's avatar
Deike Kleberg committed
211
212
213
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
214
215
216
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
217
218
219
220
221
222
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
223
224
225
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
226
227
228
229
230
231
232
    }
  
  return iret;
}

/***************************************************************/

/* qsort comparator for an array of C strings: each argument points to a
   char pointer, and the pointed-to strings are ordered with strcmp. */
static int cmpr ( const void *a, const void *b )
{
  const char *lhs = *( char * const * ) a;
  const char *rhs = *( char * const * ) b;

  return strcmp ( lhs, rhs );
}

/***************************************************************/

/*
 * Split commF2C into one communicator per host.
 *
 *   myComm    out: communicator containing the processes that share
 *             this process's host name
 *   commF2C   communicator to split
 *   color     out: 1-based position of this process's host name among
 *             the sorted distinct host names
 *   nnodes    number of distinct hosts to consider
 *   myComm2F  out: Fortran handle of *myComm
 *
 * All host names are gathered and sorted; every process whose name
 * equals the j-th distinct name gets color j.  The job is aborted when
 * no color in 1..nnodes could be assigned or when the handle conversion
 * yields 0.  MPI_ERRORS_RETURN is installed on the new communicator.
 */
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color, 
			  int nnodes, int * myComm2F )
{
  int size, rank, len = 0, npes_node, key, i, j;
  char *myHost, **allHosts, *allHosts0, *curr;

  check_mpi ( __LINE__, 
	    MPI_Comm_size ( commF2C, &size ));
  check_mpi ( __LINE__, 
	    MPI_Comm_rank ( commF2C, &rank ));

  /* Zero-filled, fixed-width name buffer: every rank contributes exactly
     MPI_MAX_PROCESSOR_NAME chars to the gather below. */
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));

  check_mpi ( __LINE__, 
	    MPI_Get_processor_name ( myHost, &len ));

  if ( ddebug )
    fprintf ( stdout, "pe%d: myHost = %.*s\n", rank, len, myHost );

  /* One contiguous block holds all names; allHosts[i] points at slot i.
     The base address is kept in allHosts0 because qsort permutes the
     pointer array, so allHosts[0] no longer identifies the block. */
  allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
  allHosts0 = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME * 
				   sizeof ( char ));
  for ( i = 0; i < size; i++ )
    allHosts[i] = allHosts0 + i * MPI_MAX_PROCESSOR_NAME;

  check_mpi ( __LINE__, 
	    MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 
			    allHosts0, MPI_MAX_PROCESSOR_NAME, 
			    MPI_CHAR, commF2C ));

  qsort ( allHosts, size, sizeof ( char * ), cmpr );

  /* Walk the sorted names one group of equal names at a time.  The
     "i < size" guard stops the walk when there are fewer distinct hosts
     than nnodes; without it allHosts[size] would be read. */
  *color = 0;
  i = 0;
  for ( j = 1; j <= nnodes && i < size; j++ )
    {
      curr = allHosts[i];
      if ( strcmp ( myHost, curr ) == 0 ) *color = j;
      do
	++i;
      while ( i < size && strcmp ( allHosts[i], curr ) == 0 );
    }

  if ( *color <= 0 || *color > nnodes )
    {
      fprintf ( stderr, "color not set correctly\n" );
      MPI_Abort ( commF2C, 1 );
    }

  /* size / nnodes is 0 when nnodes > size; fall back to the plain rank
     as ordering key instead of taking a remainder by zero. */
  npes_node = ( nnodes > 0 ) ? size / nnodes : 0;
  key = ( npes_node > 0 ) ? rank % npes_node : rank;

  check_mpi ( __LINE__, 
	    MPI_Comm_split ( commF2C, *color, key, myComm ));
  /* NOTE(review): MPI_Errhandler_set is deprecated in newer MPI versions;
     MPI_Comm_set_errhandler is the replacement -- confirm which one the
     target MPI library provides before switching. */
  check_mpi ( __LINE__, 
	    MPI_Errhandler_set ( *myComm, MPI_ERRORS_RETURN ));

  if (( *myComm2F = MPI_Comm_c2f ( *myComm )) == 0 )
    {
      fprintf ( stderr, "mpi_comm_c2f didn't succeed\n");
      MPI_Abort ( commF2C , 1 );
    }

  free ( allHosts0 );
  free ( allHosts );
  free ( myHost );

  if ( ddebug )
    fprintf ( stdout, 
	      "pe%d in setPioCommunicator, color=%d, before return\n", 
	      rank, *color );
}

#endif

/***************************************************************/

int pioInit ( int ptype, int commF, int *color, int nnodes, int *pioComm, int *ncollectors )
{

  int collectingData = 1;

#ifndef NOMPI

  MPI_Comm comm;
Deike Kleberg's avatar
Deike Kleberg committed
339
340
341
  int size, rank;
  
  comm = MPI_COMM_NULL;
342

Deike Kleberg's avatar
Deike Kleberg committed
343
  if (( comm = MPI_Comm_f2c (( MPI_Fint ) commF )) == NULL )
344
    {
Deike Kleberg's avatar
Deike Kleberg committed
345
346
347
348
      fprintf ( stderr, 
		"2. arg of pioInit is no valid Fortran handle "
		"to a mpi communicator\n");
      MPI_Abort ( comm, 1 );
349
350
    }

Deike Kleberg's avatar
Deike Kleberg committed
351
352
353
354
355
  check_mpi (__LINE__, MPI_Comm_size ( comm, &size ));

  check_mpi (__LINE__, MPI_Comm_rank ( comm, &rank ));

  if ( ptype < 0 || ptype > maxPtype )
356
    {
Deike Kleberg's avatar
Deike Kleberg committed
357
358
359
      if ( rank == 0 )
	fprintf ( stderr, "1. arg of pioInit is no valid modus\n");
      MPI_Abort ( comm, 1 );
360
361
362
363
    }

  if ( nnodes < 1 || nnodes > maxNnodes )
    {
Deike Kleberg's avatar
Deike Kleberg committed
364
365
366
      if ( rank == 0 )
	fprintf ( stderr, "4. arg of pioInit is no valid no for nnodes\n");
      MPI_Abort ( comm, 1 );
367
368
    }

Deike Kleberg's avatar
Deike Kleberg committed
369
370
371
  if ( ptype == PIO_NONE && 
       ( size != 1 || nnodes != 1 ))
    {   
372
      if ( rank == 0 )
Deike Kleberg's avatar
Deike Kleberg committed
373
374
375
376
377
378
	fprintf ( stderr, 
		  "\n\npioInit(): combination of "
		  "arg(1) PTYPE=%d, arg(4) NNODES=%d and npe=%d is not valid.\n"
		  "Possible modus is:\n"						
		  "ptype = PIO_NONE (:=0), \t\t nnodes = 1,\t\t  npe = 1\n\n",
		  ptype, nnodes, size );
379
380
381
      MPI_Abort ( comm, 1 );
    }

Deike Kleberg's avatar
Deike Kleberg committed
382
  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
383
384
385
386

  pioinfo->type = ptype; 
  pioinfo->collectorComm = MPI_COMM_NULL;
  setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes, pioComm );
387
  pioinfo->color = *color;
Deike Kleberg's avatar
Deike Kleberg committed
388
389
390
391
392
  MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
  MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));

  if ( ddebug && pioinfo->rank == 0 )
    fprintf ( stdout, 
Deike Kleberg's avatar
Deike Kleberg committed
393
394
	      "pe%d in pioDefPtype(), ptype=%d, initial_buffersize=%ld: "
	      "after init pioinfo ...\n", 
395
	      pioinfo->rank, pioinfo->type, initial_buffersize );
Deike Kleberg's avatar
Deike Kleberg committed
396
397
398

  switch ( pioinfo->type )
    {
399
400
    case PIO_MPI_NONB:
      collectingData = initMPINONB ( ncollectors );
Deike Kleberg's avatar
Deike Kleberg committed
401
      break;
Deike Kleberg's avatar
Deike Kleberg committed
402
403
404
    case PIO_POSIX_ASYNCH:
      collectingData = initPOSIXASYNCH ( ncollectors );
      break;
405
406
407
    case PIO_POSIX_FPGUARD_SENDRECV:
      collectingData = initPOSIXFPGUARDSENDRECV ( ncollectors );
      break;
408
409
410
411
412
413
    case PIO_POSIX_FPGUARD_THREAD:
      collectingData = initPOSIXFPGUARDTHREAD ( ncollectors );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      collectingData = initPOSIXFPGUARDTHREADREFUSE ( ncollectors );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
414
    case PIO_POSIX_NONB:      
415
416
      collectingData = initPOSIXNONB ( ncollectors );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
417
418
    }

419
#endif
Deike Kleberg's avatar
Deike Kleberg committed
420
421
422
423
  
  if ( ddebug )
    fprintf ( stdout, "pe in pioinit out\n" );
      
Deike Kleberg's avatar
Deike Kleberg committed
424
425
426
427
428
429
430
  return collectingData;
}

/***************************************************************/

void pioFinalize ()
{
431
#ifndef NOMPI
432
433
434
435
436
437
438
  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    }

439
440
441
442
443
444
  if ( pioinfo->collectorComm != MPI_COMM_NULL ) 
    MPI_Comm_free ( &( pioinfo->collectorComm ));
  
  if ( pioinfo->comm != MPI_COMM_NULL ) 
    MPI_Comm_free ( &( pioinfo->comm ));

Deike Kleberg's avatar
Deike Kleberg committed
445
  free ( pioinfo );
446
#endif
Deike Kleberg's avatar
Deike Kleberg committed
447
448
449
450

  return;
}