pio.c 8.33 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
6
7
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Deike Kleberg's avatar
Deike Kleberg committed
8
#include <string.h>
9
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
10

11
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
12

Deike Kleberg's avatar
Deike Kleberg committed
13
14
15
#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
16
#include "pio_util.h"
Deike Kleberg's avatar
Deike Kleberg committed
17

Deike Kleberg's avatar
Deike Kleberg committed
18

Deike Kleberg's avatar
Deike Kleberg committed
19
20
#endif

21
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
22

23
/* Printable names of the six I/O commands.
 * NOTE(review): presumably indexed by the command code that getTag ()
 * extracts from a message tag -- confirm against the command enum. */
char * command2charP[6] = {
  "IO_Open_file",
  "IO_Close_file",
  "IO_Get_fp",
  "IO_Set_fp",
  "IO_Send_buffer",
  "IO_Finalize"
};
Deike Kleberg's avatar
Deike Kleberg committed
26

27
/* Initial buffer size in bytes (16 MiB).  In this file it is only printed
 * in the bInit () debug output.
 * NOTE(review): presumably the start size of the I/O write buffers used by
 * the backends -- confirm. */
long initial_buffersize = 16 * 1024 * 1024;
28
/*  4 KB <= x < 256 MB */ 
29
/* 16 * 1024 * 1024; */
30
/* 16 * 1024; */
31
/* 4 * 1024; */
Deike Kleberg's avatar
Deike Kleberg committed
32

Deike Kleberg's avatar
Deike Kleberg committed
33
/* Largest I/O mode number accepted by bInit (); larger (or negative)
 * values of ptype make bInit () abort. */
int maxPtype = 4;
34
/* NOTE(review): presumably an upper limit for the number of nodes; it is
 * not referenced in this file -- confirm against its users. */
int maxNnodes  = 249;
35
/* Radix used to pack an ID and a command code into one integer tag:
 * setTag () computes ID * tagKey + sc, getTag () splits it again with
 * '/' and '%'. */
int tagKey = 100;
36
/* NOTE(review): presumably the maximum length of an error message buffer;
 * it is not referenced in this file -- confirm against its users. */
int maxErrorString = 100;
37

Deike Kleberg's avatar
Deike Kleberg committed
38
39
40
41
42
43
44
45
46
47
/* Timing bookkeeping.  None of these variables is read or written in this
 * file.
 * NOTE(review): presumably a start timestamp and per-phase accumulated
 * times (probe, receive, send, suspend, wait, write) maintained by the
 * I/O backends -- confirm. */
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

/* Global parallel I/O state: allocated and filled in bInit (), released in
 * bFinalize ().  Its type field selects the backend in pioFileWrite (),
 * pioFileClose () and pioFileOpenW (). */
pioInfo *pioinfo;

48
49
/* NOTE(review): presumably a separator character used when composing or
 * parsing messages; it is not referenced in this file -- confirm. */
char *token = "%";

50
/***************************************************************/  
Deike Kleberg's avatar
Deike Kleberg committed
51

Deike Kleberg's avatar
Deike Kleberg committed
52
53
int setTag ( int ID, int sc )
{
54
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
55
56
57
58
59
60
61
62
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
63
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
64
65
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
66
67
68
69
70
71

  return rtag;
}

/***************************************************************/

72
73
74
75
76
77
78
79
/*
 * Release an rTag obtained from getTag ().
 *
 * rtag: pointer to free; the memory is handed to free ().
 */
void ungetTag ( rTag *rtag )
{
  free ( rtag );
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
80
81
82
83
84
85
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
  size_t iret;

  switch ( pioinfo->type )
    {
86
87
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
88
      break;
Deike Kleberg's avatar
Deike Kleberg committed
89
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
90
91
92
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
93
#endif
94
95
96
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
97
98
99
100
101
102
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
      break;
103
104
105
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
106
107
108
109
110
111
112
113
114
115
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
  int iret;
116
  
Deike Kleberg's avatar
Deike Kleberg committed
117
118
  switch ( pioinfo->type )
    {
119
120
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
121
      break;
Deike Kleberg's avatar
Deike Kleberg committed
122
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
123
124
125
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
126
#endif
127
128
129
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
130
131
132
133
134
135
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
136
137
138
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
139
140
141
142
143
144
145
146
147
148
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
  int iret;
149
  
Deike Kleberg's avatar
Deike Kleberg committed
150
151
  switch ( pioinfo->type )
    {
152
153
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
154
      break;
Deike Kleberg's avatar
Deike Kleberg committed
155
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
156
157
158
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
159
#endif
160
161
162
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
163
164
165
166
167
168
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
169
170
171
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
172
    }
173
  
Deike Kleberg's avatar
Deike Kleberg committed
174
175
176
177
178
  return iret;
}

/***************************************************************/

179
180
/*
 * qsort () comparison callback for an array of C string pointers.
 *
 * a, b: pointers to the array elements, i.e. pointers to char pointers.
 *
 * Returns the strcmp () result of the two strings.
 */
static int cmpr ( const void *a, const void *b )
{
  const char *const *lhs = a;
  const char *const *rhs = b;

  return strcmp ( *lhs, *rhs );
}

/***************************************************************/

186
187
/*
 * Split commF2C into one communicator per host name.
 *
 * Every process gathers the processor names of all processes in commF2C,
 * sorts them and numbers the distinct names 1, 2, ... in sorted order.
 *
 * myComm:  out, communicator produced by MPI_Comm_split () for this process.
 * commF2C: communicator to be split.
 * color:   out, 1-based number of this process' host name among the sorted
 *          distinct names; used as the split color.
 * nnodes:  out, number of distinct host names.
 *
 * The split key is rank % ( size / nnodes ).  Aborts via pcdiAbort () if
 * the processor name is empty or the own name is not found in the
 * gathered list.
 */
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color, 
			  int *nnodes )
{
  int commSize, commRank, nameLen, pesPerNode, splitKey, idx, nodeCount;
  char *myHost, **hostTable, *hostBlock;
  const char *groupHost;
  char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];

  MPI_Comm_size ( commF2C, &commSize );
  MPI_Comm_rank ( commF2C, &commRank );

  /* Zero-filled name buffer, so the bytes behind the name are defined
     when the full buffer is sent in the allgather below. */
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
  xmpi ( MPI_Get_processor_name ( myHost, &nameLen ));

  if ( myHost[0] == '\0' )
    pcdiAbort ( "did not succeed to set hostname", __FILE__, __LINE__ , commRank );

  if ( ddebug == MAXDEBUG )
    {
      strncpy ( hostname, myHost, nameLen );
      hostname [ nameLen ] = '\0';
      fprintf ( stderr, "pe%d: myHost = %s\n", commRank,  hostname );
    }

  /* One contiguous block holds all names; hostTable[i] points at the
     i-th slot.  hostBlock remembers the block start, because sorting
     permutes the entries of hostTable. */
  hostTable = ( char ** ) xmalloc ( commSize * sizeof ( char * ));
  hostBlock = ( char * ) xmalloc ( commSize * MPI_MAX_PROCESSOR_NAME * 
				   sizeof ( char ));
  hostTable[0] = hostBlock;
  for ( idx = 1; idx < commSize; idx++ )
    hostTable[idx] = hostBlock + idx * MPI_MAX_PROCESSOR_NAME;

  MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 
		  hostBlock, MPI_MAX_PROCESSOR_NAME, 
		  MPI_CHAR, commF2C );

  qsort ( hostTable, commSize, sizeof ( char * ), cmpr );

  /* Walk over the runs of equal names in the sorted table. */
  *color = 0;
  nodeCount = 0;
  idx = 0;
  while ( idx < commSize )
    {
      groupHost = hostTable[idx];
      nodeCount++;
      if ( strcmp ( myHost, groupHost ) == 0 )
	*color = nodeCount;

      /* Skip the remaining entries of the current run. */
      do
	idx++;
      while ( idx < commSize && strcmp ( hostTable[idx], groupHost ) == 0 );
    }

  *nnodes = nodeCount;

  if ( *color == 0 ) pcdiAbort ( "Color is not set", __FILE__, __LINE__, commRank );

  pesPerNode = commSize / ( *nnodes );
  splitKey = commRank % pesPerNode;

  xmpi ( MPI_Comm_split ( commF2C, *color, splitKey, myComm ));

  free ( hostBlock );
  free ( hostTable );
  free ( myHost );

  if ( ddebug == MAXDEBUG )
    fprintf ( stderr, 
	      "pe%d in setPioCommunicator, color=%d, before return\n", 
	      commRank, *color );
}

#endif

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
267
/*
 * Initialize the global parallel I/O state (pioinfo) for I/O mode ptype.
 *
 * ptype:  I/O mode; must lie in [0, maxPtype], otherwise the call aborts.
 * comm:   communicator handed to setPioCommunicator () for splitting.
 * color:  out, preset to 1, then set by setPioCommunicator ().
 * nnodes: out, preset to 1, then set by setPioCommunicator ().
 *
 * Returns pioinfo->collectorComm.  It is reset to MPI_COMM_NULL before the
 * mode-specific initialization; for PIO_NONE it becomes a duplicate of
 * pioinfo->comm.
 * NOTE(review): the other modes presumably set it inside their init
 * function -- confirm.
 *
 * The return statement sits outside the USE_MPI guard although pioinfo is
 * only set up inside it.
 */
MPI_Comm bInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
{
  char fnName[] = "bInit()";
  int commSize, commRank;
  /* Result of the mode-specific init call; not read afterwards. */
  int collecting = 1;

  *nnodes = 1;
  *color = 1;

  if ( ddebug == MAXDEBUG )
    myDebug ( __FILE__, fnName, __LINE__ );

#ifdef USE_MPI
  MPI_Comm_size ( comm, &commSize );
  MPI_Comm_rank ( comm, &commRank );

  if ( ptype < 0 || ptype > maxPtype )
    pcdiAbort ( "PIOTYPE is no valid modus", __FILE__, __LINE__, commRank );

#ifdef _SX
  if ( ptype ==  PIO_POSIX_ASYNCH )
    pcdiAbort ( "PIO_POSIX_ASYNCH does not work on SX", __FILE__, __LINE__, commRank );
#endif

  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
  pioinfo->type = ptype;

  setPioCommunicator ( &pioinfo->comm, comm, color, nnodes );

  if ( pioinfo->type == PIO_NONE && *nnodes != commSize )
    pcdiAbort ( "PIOTYPE, NNODES: not a valid combination", __FILE__, __LINE__, commRank );

  pioinfo->color = *color;
  MPI_Comm_rank ( pioinfo->comm, &pioinfo->rank );
  MPI_Comm_size ( pioinfo->comm, &pioinfo->size );

  if ( ddebug == MAXDEBUG && pioinfo->rank == 0 )
    fprintf ( stderr, 
	      "pe%d in bInit(), ptype=%d, initial_buffersize=%ld: "
	      "after init pioinfo ...\n", 
	      pioinfo->rank, pioinfo->type, initial_buffersize );

  pioinfo->collectorComm = MPI_COMM_NULL;

  /* Mode-specific initialization. */
  switch ( pioinfo->type )
    {
    case PIO_NONE:
      MPI_Comm_dup ( pioinfo->comm, &pioinfo->collectorComm );
      collecting = 1;
      break;
    case PIO_MPI_NONB:
      collecting = initMPINONB ();
      break;
#ifndef _SX
    case PIO_POSIX_ASYNCH:
      collecting = initPOSIXASYNCH ();
      break;
#endif
    case PIO_POSIX_FPGUARD_SENDRECV:
      collecting = initPOSIXFPGUARDSENDRECV ();
      break;
    case PIO_POSIX_FPGUARD_THREAD:
      collecting = initPOSIXFPGUARDTHREAD ();
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      collecting = initPOSIXFPGUARDTHREADREFUSE ();
      break;
    case PIO_POSIX_NONB:
      collecting = initPOSIXNONB ();
      break;
    }
#endif

  if ( ddebug == MAXDEBUG )
    fprintf ( stderr, "pe in bInit out\n" );

  return pioinfo->collectorComm;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
348
/*
 * Release the global parallel I/O state created by bInit ().
 *
 * For PIO_POSIX_FPGUARD_THREAD the backend finalizer is called first.
 * Then the collector communicator and the split communicator are freed if
 * they are not MPI_COMM_NULL, and pioinfo itself is freed.
 *
 * The call does nothing when pioinfo is NULL (bInit () not called, or
 * bFinalize () already called).  pioinfo is reset to NULL afterwards so
 * that no dangling pointer to the freed state remains.
 */
void bFinalize ()
{
#ifdef USE_MPI
  if ( pioinfo == NULL )
    return;

  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    default:
      /* The other modes have no finalizer here. */
      break;
    }

  if ( pioinfo->collectorComm != MPI_COMM_NULL ) 
    MPI_Comm_free ( &( pioinfo->collectorComm ));

  if ( pioinfo->comm != MPI_COMM_NULL ) 
    MPI_Comm_free ( &( pioinfo->comm ));

  free ( pioinfo );
  pioinfo = NULL;
#endif

  return;
}
369