pio.c 8.51 KB
Newer Older
1
2
3
4
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
6
7
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Deike Kleberg's avatar
Deike Kleberg committed
8
#include <string.h>
9
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
10

Thomas Jahns's avatar
Thomas Jahns committed
11
#include "cdi.h"
12
13
14
#include "pio.h"
#include "pio_util.h"

15
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
16

Deike Kleberg's avatar
Deike Kleberg committed
17
18
#include "cdi.h"
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
19

Deike Kleberg's avatar
Deike Kleberg committed
20

Deike Kleberg's avatar
Deike Kleberg committed
21
22
#endif

23
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
24

25
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
26
27
                           "IO_Get_fp","IO_Set_fp",
                           "IO_Send_buffer", "IO_Finalize"};
Deike Kleberg's avatar
Deike Kleberg committed
28

29
long initial_buffersize = 16 * 1024 * 1024;
30
/*  4 KB <= x < 256 MB */
31
/* 16 * 1024 * 1024; */
32
/* 16 * 1024; */
33
/* 4 * 1024; */
Deike Kleberg's avatar
Deike Kleberg committed
34

Deike Kleberg's avatar
Deike Kleberg committed
35
int maxPtype = 4;
36
int maxNnodes  = 249;
37
int tagKey = 100;
38
int maxErrorString = 100;
39

Deike Kleberg's avatar
Deike Kleberg committed
40
41
42
43
44
45
46
47
48
49
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

pioInfo *pioinfo;

50
51
char *token = "%";

52
/***************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
53

Deike Kleberg's avatar
Deike Kleberg committed
54
55
int setTag ( int ID, int sc )
{
56
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
57
58
59
60
61
62
63
64
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
65
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
66
67
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
68
69
70
71
72
73

  return rtag;
}

/***************************************************************/

74
75
76
77
78
79
80
81
void ungetTag ( rTag *rtag )
{
  free ( rtag );
  return;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
82
83
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
Thomas Jahns's avatar
Thomas Jahns committed
84
  size_t iret = CDI_UNDEFID;
Deike Kleberg's avatar
Deike Kleberg committed
85
86
87

  switch ( pioinfo->type )
    {
88
89
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
90
      break;
Deike Kleberg's avatar
Deike Kleberg committed
91
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
92
93
94
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
95
#endif
96
97
98
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
99
100
101
102
103
104
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
      break;
105
106
107
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
108
109
110
111
112
113
114
115
116
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
Thomas Jahns's avatar
Thomas Jahns committed
117
  int iret = CDI_UNDEFID;
118

Deike Kleberg's avatar
Deike Kleberg committed
119
120
  switch ( pioinfo->type )
    {
121
122
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
123
      break;
Deike Kleberg's avatar
Deike Kleberg committed
124
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
125
126
127
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
128
#endif
129
130
131
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
132
133
134
135
136
137
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
138
139
140
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
141
142
143
144
145
146
147
148
149
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
Thomas Jahns's avatar
Thomas Jahns committed
150
  int iret = CDI_UNDEFID;
151

Deike Kleberg's avatar
Deike Kleberg committed
152
153
  switch ( pioinfo->type )
    {
154
155
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
156
      break;
Deike Kleberg's avatar
Deike Kleberg committed
157
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
158
159
160
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
161
#endif
162
163
164
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
165
166
167
168
169
170
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
171
172
173
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
174
    }
175

Deike Kleberg's avatar
Deike Kleberg committed
176
177
178
179
180
  return iret;
}

/***************************************************************/

181
182
static int cmpr ( const void *a, const void *b )
{
183
184
185
186
187
  return strcmp ( *( char ** ) a, *( char ** ) b);
}

/***************************************************************/

188
189
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
                          int *nnodes )
Deike Kleberg's avatar
Deike Kleberg committed
190
{
191
192
  int size, rank, len, npes_node, key, test, i, j;

Deike Kleberg's avatar
Deike Kleberg committed
193
  char *myHost, **allHosts, *allHosts0, *curr;
194
195
  char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];

196
197
  MPI_Comm_size ( commF2C, &size );
  MPI_Comm_rank ( commF2C, &rank );
198

Deike Kleberg's avatar
Deike Kleberg committed
199
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
200

Deike Kleberg's avatar
Deike Kleberg committed
201
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
202

Deike Kleberg's avatar
Deike Kleberg committed
203
  xmpi ( MPI_Get_processor_name ( myHost, &len ));
Deike Kleberg's avatar
Deike Kleberg committed
204

205
  if ( myHost[0] == '\0' ) xabort ( "did not succeed to set hostname" );
Deike Kleberg's avatar
Deike Kleberg committed
206

Deike Kleberg's avatar
Deike Kleberg committed
207
  if ( ddebug == MAXDEBUG )
208
209
210
    {
      strncpy ( hostname, myHost, len );
      hostname [ len ] = '\0';
Deike Kleberg's avatar
Deike Kleberg committed
211
      xdebug ( "myHost = %s\n", hostname );
212
    }
213

Deike Kleberg's avatar
Deike Kleberg committed
214
  allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
215
216
  allHosts[0] = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME *
                                     sizeof ( char ));
Deike Kleberg's avatar
Deike Kleberg committed
217
  allHosts0 = allHosts[0];
Deike Kleberg's avatar
Deike Kleberg committed
218

219
220
  for ( i = 1; i < size; i++ )
    allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
221

222
223
224
  MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
                            & ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME,
                            MPI_CHAR, commF2C );
225
226
227
228
229
230

  qsort ( allHosts, size, sizeof ( char * ), cmpr );

  *color = 0;
  i = 0;
  j = 0;
231

232
  while ( i < size )
233
234
    {
      curr = allHosts[i];
235
      j++;
236
      if (( test = strcmp ( myHost, curr )) == 0 ) *color = j;
237

238
      while ( ++i < size )
239
240
        if (( test = strcmp ( allHosts[i], curr )) != 0)
          break;
241
    }
Deike Kleberg's avatar
Deike Kleberg committed
242

243
  *nnodes = j;
244

245
  if ( *color == 0 ) xabort ( "Color is not set" );
246

247
  npes_node = size / ( * nnodes );
Deike Kleberg's avatar
Deike Kleberg committed
248
249
  key = rank % npes_node;

Deike Kleberg's avatar
Deike Kleberg committed
250
  xmpi ( MPI_Comm_split ( commF2C, *color, key, myComm));
251

Deike Kleberg's avatar
Deike Kleberg committed
252
  free ( allHosts0 );
253
254
  free ( allHosts );
  free ( myHost );
Deike Kleberg's avatar
Deike Kleberg committed
255

Deike Kleberg's avatar
Deike Kleberg committed
256
  if ( ddebug == MAXDEBUG )
257
258
259
260
    fprintf ( stderr,
              "pe%d in setPioCommunicator, color=%d, before return\n",
              rank, *color );

261
262
263
264
265
266
267
  return;
}

#endif

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
268
MPI_Comm bInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
269
{
Deike Kleberg's avatar
Deike Kleberg committed
270
  int size, rank;
271
  int collectingData = 1;
272
273
  *nnodes = 1;
  *color = 1;
274

Deike Kleberg's avatar
Deike Kleberg committed
275
  if ( ddebug == MAXDEBUG ) xdebug ();
Deike Kleberg's avatar
Deike Kleberg committed
276
#ifdef USE_MPI
277
278
  MPI_Comm_size ( comm, &size );
  MPI_Comm_rank ( comm, &rank );
279
280

  if ( ptype < 0 || ptype > maxPtype )
281
    xabort ( "PIOTYPE is no valid modus" );
Deike Kleberg's avatar
Deike Kleberg committed
282
283
284

#ifdef _SX
  if ( ptype ==  PIO_POSIX_ASYNCH )
285
    xabort ( "PIO_POSIX_ASYNCH does not work on SX" );
Deike Kleberg's avatar
Deike Kleberg committed
286
#endif
287

288
  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
289

Deike Kleberg's avatar
Deike Kleberg committed
290
291
  xdebug ();

292
  pioinfo->type = ptype;
Deike Kleberg's avatar
Deike Kleberg committed
293

294
  setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes );
295

296
  if (( pioinfo->type == PIO_NONE ) && ( *nnodes != size ))
297
    xabort ( "PIOTYPE, NNODES: not a valid combination" );
298

299
  pioinfo->color = *color;
Deike Kleberg's avatar
Deike Kleberg committed
300
301

  xdebug ();
Deike Kleberg's avatar
Deike Kleberg committed
302
303
  MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
  MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));
304

Deike Kleberg's avatar
Deike Kleberg committed
305
306
307
308
309
  xdebug ();

  // if ( ddebug == MAXDEBUG && pioinfo->rank == 0 )
  //fprintf ( stderr,
	      xdebug(
310
311
312
313
              "pe%d in bInit(), ptype=%d, initial_buffersize=%ld: "
              "after init pioinfo ...\n",
              pioinfo->rank, pioinfo->type, initial_buffersize );

314
  pioinfo->collectorComm = MPI_COMM_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
315

Deike Kleberg's avatar
Deike Kleberg committed
316
317
  xdebug ();

Deike Kleberg's avatar
Deike Kleberg committed
318
319
  switch ( pioinfo->type )
    {
320
321
    case PIO_NONE:
      MPI_Comm_dup ( pioinfo->comm, &( pioinfo->collectorComm ));
322
      collectingData = 1;
323
      break;
324
    case PIO_MPI_NONB:
325
      collectingData = initMPINONB ();
Deike Kleberg's avatar
Deike Kleberg committed
326
      break;
Deike Kleberg's avatar
Deike Kleberg committed
327
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
328
    case PIO_POSIX_ASYNCH:
329
      collectingData = initPOSIXASYNCH ();
Deike Kleberg's avatar
Deike Kleberg committed
330
      break;
Deike Kleberg's avatar
Deike Kleberg committed
331
#endif
332
    case PIO_POSIX_FPGUARD_SENDRECV:
333
      collectingData = initPOSIXFPGUARDSENDRECV ();
334
      break;
335
    case PIO_POSIX_FPGUARD_THREAD:
336
      collectingData = initPOSIXFPGUARDTHREAD ();
337
338
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
339
      collectingData = initPOSIXFPGUARDTHREADREFUSE ();
340
      break;
Deike Kleberg's avatar
Deike Kleberg committed
341
    case PIO_POSIX_NONB:
342
      collectingData = initPOSIXNONB ();
343
      break;
Deike Kleberg's avatar
Deike Kleberg committed
344
    }
345
#endif
346

Deike Kleberg's avatar
Deike Kleberg committed
347
  xdebug ("bInit out\n" );
348
349

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
350
  return pioinfo->collectorComm;
351
352
353
#else
  return 0;
#endif
Deike Kleberg's avatar
Deike Kleberg committed
354
355
356
357
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
358
void bFinalize ()
Deike Kleberg's avatar
Deike Kleberg committed
359
{
360
#ifdef USE_MPI
361
362
363
364
365
366
367
  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    }

368
  if ( pioinfo->collectorComm != MPI_COMM_NULL )
369
    MPI_Comm_free ( &( pioinfo->collectorComm ));
370
371

  if ( pioinfo->comm != MPI_COMM_NULL )
372
373
    MPI_Comm_free ( &( pioinfo->comm ));

Deike Kleberg's avatar
Deike Kleberg committed
374
  free ( pioinfo );
375
#endif
Deike Kleberg's avatar
Deike Kleberg committed
376
377
378

  return;
}
379
380
381
382
383
384
385
386
387
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */