pio.c 8.25 KB
Newer Older
1
#ifdef HAVE_CONFIG_H
Deike Kleberg's avatar
Deike Kleberg committed
2
#include "config.h"
3
4
#endif

Deike Kleberg's avatar
Deike Kleberg committed
5
6
7
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
Deike Kleberg's avatar
Deike Kleberg committed
8
#include <string.h>
9
#include <unistd.h>
Deike Kleberg's avatar
Deike Kleberg committed
10

Thomas Jahns's avatar
Thomas Jahns committed
11
#include "cdi.h"
12
13
14
#include "pio.h"
#include "pio_util.h"

15
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
16
17
#include "cdi.h"
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
18
19
#endif

20
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
21

22
char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
23
24
                           "IO_Get_fp","IO_Set_fp",
                           "IO_Send_buffer", "IO_Finalize"};
Deike Kleberg's avatar
Deike Kleberg committed
25

26
long initial_buffersize = 16 * 1024 * 1024;
27
/*  4 KB <= x < 256 MB */
28
/* 16 * 1024 * 1024; */
29
/* 16 * 1024; */
30
/* 4 * 1024; */
Deike Kleberg's avatar
Deike Kleberg committed
31

Deike Kleberg's avatar
Deike Kleberg committed
32
int maxPtype = 4;
33
int maxNnodes  = 249;
34
int tagKey = 100;
35
int maxErrorString = 100;
36

Deike Kleberg's avatar
Deike Kleberg committed
37
38
39
40
41
42
43
44
45
46
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

pioInfo *pioinfo;

47
48
char *token = "%";

49
/***************************************************************/
Deike Kleberg's avatar
Deike Kleberg committed
50

Deike Kleberg's avatar
Deike Kleberg committed
51
52
int setTag ( int ID, int sc )
{
53
  return ID * tagKey + sc;
Deike Kleberg's avatar
Deike Kleberg committed
54
55
56
57
58
59
60
61
}

/***************************************************************/

rTag * getTag ( int tag )
{
  rTag *rtag;

Deike Kleberg's avatar
Deike Kleberg committed
62
  rtag = ( rTag * ) xmalloc ( sizeof ( rTag ));
63
64
  rtag->id = tag / tagKey;
  rtag->command = tag % tagKey;
Deike Kleberg's avatar
Deike Kleberg committed
65
66
67
68
69
70

  return rtag;
}

/***************************************************************/

71
72
73
74
75
76
77
78
void ungetTag ( rTag *rtag )
{
  free ( rtag );
  return;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
79
size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
Deike Kleberg's avatar
Deike Kleberg committed
80
{
Thomas Jahns's avatar
Thomas Jahns committed
81
  size_t iret = CDI_UNDEFID;
Deike Kleberg's avatar
Deike Kleberg committed
82
83
84

  switch ( pioinfo->type )
    {
85
    case PIO_MPI_NONB:
Deike Kleberg's avatar
Deike Kleberg committed
86
      iret = fwMPINONB ( fileID, tsID, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
87
      break;
Deike Kleberg's avatar
Deike Kleberg committed
88
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
89
    case PIO_POSIX_ASYNCH:
Deike Kleberg's avatar
Deike Kleberg committed
90
      iret = fwPOSIXASYNCH ( fileID, tsID, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
91
      break;
Deike Kleberg's avatar
Deike Kleberg committed
92
#endif
93
    case PIO_POSIX_FPGUARD_SENDRECV:
Deike Kleberg's avatar
Deike Kleberg committed
94
      iret = fwPOSIXFPGUARDSENDRECV ( fileID, tsID, buffer, len );
95
      break;
96
    case PIO_POSIX_FPGUARD_THREAD:
Deike Kleberg's avatar
Deike Kleberg committed
97
      iret = fwPOSIXFPGUARDTHREAD ( fileID, tsID, buffer, len );
98
99
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
Deike Kleberg's avatar
Deike Kleberg committed
100
      iret = fwPOSIXFPGUARDTHREADREFUSE ( fileID, tsID, buffer, len );
101
      break;
102
    case PIO_POSIX_NONB:
Deike Kleberg's avatar
Deike Kleberg committed
103
      iret = fwPOSIXNONB ( fileID, tsID, buffer, len );
104
      break;
Deike Kleberg's avatar
Deike Kleberg committed
105
106
107
108
109
110
111
112
113
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
Thomas Jahns's avatar
Thomas Jahns committed
114
  int iret = CDI_UNDEFID;
Deike Kleberg's avatar
Deike Kleberg committed
115
116
  switch ( pioinfo->type )
    {
117
118
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
119
      break;
Deike Kleberg's avatar
Deike Kleberg committed
120
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
121
122
123
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
124
#endif
125
126
127
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
128
129
130
131
132
133
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fcPOSIXFPGUARDTHREAD ( id );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
      break;
134
135
136
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
137
138
139
140
141
142
143
144
145
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
Thomas Jahns's avatar
Thomas Jahns committed
146
  int iret = CDI_UNDEFID;
147

Deike Kleberg's avatar
Deike Kleberg committed
148
149
  switch ( pioinfo->type )
    {
150
151
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
152
      break;
Deike Kleberg's avatar
Deike Kleberg committed
153
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
154
155
156
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
157
#endif
158
159
160
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
161
162
163
164
165
166
    case PIO_POSIX_FPGUARD_THREAD:
      iret = fowPOSIXFPGUARDTHREAD ( filename );
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
      iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
      break;
167
168
169
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
170
    }
171

Deike Kleberg's avatar
Deike Kleberg committed
172
173
174
175
176
  return iret;
}

/***************************************************************/

177
178
static int cmpr ( const void *a, const void *b )
{
179
180
181
182
183
  return strcmp ( *( char ** ) a, *( char ** ) b);
}

/***************************************************************/

184
185
void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
                          int *nnodes )
Deike Kleberg's avatar
Deike Kleberg committed
186
{
187
188
  int size, rank, len, npes_node, key, test, i, j;

Deike Kleberg's avatar
Deike Kleberg committed
189
  char *myHost, **allHosts, *allHosts0, *curr;
190
191
  char hostname [ MPI_MAX_PROCESSOR_NAME + 1 ];

Deike Kleberg's avatar
Deike Kleberg committed
192
193
  xmpi ( MPI_Comm_size ( commF2C, &size ));
  xmpi ( MPI_Comm_rank ( commF2C, &rank ));
194

Deike Kleberg's avatar
Deike Kleberg committed
195
  myHost = ( char * ) xmalloc ( MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
196

Deike Kleberg's avatar
Deike Kleberg committed
197
  memset ( myHost, 0, MPI_MAX_PROCESSOR_NAME * sizeof ( char ));
198

Deike Kleberg's avatar
Deike Kleberg committed
199
  xmpi ( MPI_Get_processor_name ( myHost, &len ));
Deike Kleberg's avatar
Deike Kleberg committed
200

201
  if ( myHost[0] == '\0' ) xabort ( "did not succeed to set hostname" );
Deike Kleberg's avatar
Deike Kleberg committed
202

Deike Kleberg's avatar
Deike Kleberg committed
203
  if ( ddebug == MAXDEBUG )
204
205
206
    {
      strncpy ( hostname, myHost, len );
      hostname [ len ] = '\0';
207
      xdebug ( "myHost = %s", hostname );
208
    }
209

Deike Kleberg's avatar
Deike Kleberg committed
210
  allHosts = ( char ** ) xmalloc ( size * sizeof ( char * ));
211
212
  allHosts[0] = ( char * ) xmalloc ( size * MPI_MAX_PROCESSOR_NAME *
                                     sizeof ( char ));
Deike Kleberg's avatar
Deike Kleberg committed
213
  allHosts0 = allHosts[0];
Deike Kleberg's avatar
Deike Kleberg committed
214

215
216
  for ( i = 1; i < size; i++ )
    allHosts[i] = allHosts[0] + i * MPI_MAX_PROCESSOR_NAME;
217

218
219
220
  MPI_Allgather ( myHost, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
                            & ( allHosts[0][0] ), MPI_MAX_PROCESSOR_NAME,
                            MPI_CHAR, commF2C );
221
222
223
224
225
226

  qsort ( allHosts, size, sizeof ( char * ), cmpr );

  *color = 0;
  i = 0;
  j = 0;
227

228
  while ( i < size )
229
230
    {
      curr = allHosts[i];
231
      j++;
232
      if (( test = strcmp ( myHost, curr )) == 0 ) *color = j;
233

234
      while ( ++i < size )
235
236
        if (( test = strcmp ( allHosts[i], curr )) != 0)
          break;
237
    }
Deike Kleberg's avatar
Deike Kleberg committed
238

239
  *nnodes = j;
240

241
  if ( *color == 0 ) xabort ( "Color is not set" );
242

243
  npes_node = size / ( * nnodes );
Deike Kleberg's avatar
Deike Kleberg committed
244
245
  key = rank % npes_node;

Deike Kleberg's avatar
Deike Kleberg committed
246
  xmpi ( MPI_Comm_split ( commF2C, *color, key, myComm));
247

Deike Kleberg's avatar
Deike Kleberg committed
248
  free ( allHosts0 );
249
250
  free ( allHosts );
  free ( myHost );
Deike Kleberg's avatar
Deike Kleberg committed
251

252
  xdebug ( "color=%d", *color );
253

254
255
256
257
258
259
260
  return;
}

#endif

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
261
MPI_Comm bInit ( int ptype, MPI_Comm comm, int *color, int *nnodes )
262
{
Deike Kleberg's avatar
Deike Kleberg committed
263
  int size, rank;
264
  int collectingData = 1;
265
266
  *nnodes = 1;
  *color = 1;
267

Deike Kleberg's avatar
Deike Kleberg committed
268
#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
269
270
  xmpi ( MPI_Comm_size ( comm, &size ));
  xmpi ( MPI_Comm_rank ( comm, &rank ));
271
272

  if ( ptype < 0 || ptype > maxPtype )
273
    xabort ( "PIOTYPE is no valid modus" );
Deike Kleberg's avatar
Deike Kleberg committed
274
275
276

#ifdef _SX
  if ( ptype ==  PIO_POSIX_ASYNCH )
277
    xabort ( "PIO_POSIX_ASYNCH does not work on SX" );
Deike Kleberg's avatar
Deike Kleberg committed
278
#endif
279

280
  pioinfo = ( pioInfo * ) xmalloc ( sizeof ( pioInfo ));
281
282

  pioinfo->type = ptype;
Deike Kleberg's avatar
Deike Kleberg committed
283

284
  setPioCommunicator ( & ( pioinfo->comm ), comm, color, nnodes );
285

286
  if (( pioinfo->type == PIO_NONE ) && ( *nnodes != size ))
287
    xabort ( "PIOTYPE, NNODES: not a valid combination" );
288

289
  pioinfo->color = *color;
Deike Kleberg's avatar
Deike Kleberg committed
290

Deike Kleberg's avatar
Deike Kleberg committed
291
292
  xmpi ( MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank )));
  xmpi ( MPI_Comm_size ( pioinfo->comm, &( pioinfo->size )));
293

294
  xdebug( "IOPE%d in bInit(), ptype=%d, initial_buffersize=%ld: "
295
296
          "after init pioinfo ...",
          pioinfo->rank, pioinfo->type, initial_buffersize );
297

298
  pioinfo->collectorComm = MPI_COMM_NULL;
Deike Kleberg's avatar
Deike Kleberg committed
299

Deike Kleberg's avatar
Deike Kleberg committed
300
301
  switch ( pioinfo->type )
    {
302
    case PIO_NONE:
Deike Kleberg's avatar
Deike Kleberg committed
303
      xmpi ( MPI_Comm_dup ( pioinfo->comm, &( pioinfo->collectorComm )));
304
      collectingData = 1;
305
      break;
306
    case PIO_MPI_NONB:
307
      collectingData = initMPINONB ();
Deike Kleberg's avatar
Deike Kleberg committed
308
      break;
Deike Kleberg's avatar
Deike Kleberg committed
309
#ifndef _SX
Deike Kleberg's avatar
Deike Kleberg committed
310
    case PIO_POSIX_ASYNCH:
311
      collectingData = initPOSIXASYNCH ();
Deike Kleberg's avatar
Deike Kleberg committed
312
      break;
Deike Kleberg's avatar
Deike Kleberg committed
313
#endif
314
    case PIO_POSIX_FPGUARD_SENDRECV:
315
      collectingData = initPOSIXFPGUARDSENDRECV ();
316
      break;
317
    case PIO_POSIX_FPGUARD_THREAD:
318
      collectingData = initPOSIXFPGUARDTHREAD ();
319
320
      break;
    case PIO_POSIX_FPGUARD_THREAD_REFUSE:
321
      collectingData = initPOSIXFPGUARDTHREADREFUSE ();
322
      break;
Deike Kleberg's avatar
Deike Kleberg committed
323
    case PIO_POSIX_NONB:
324
      collectingData = initPOSIXNONB ();
325
      break;
Deike Kleberg's avatar
Deike Kleberg committed
326
    }
327
#endif
328
329

#ifdef USE_MPI
Deike Kleberg's avatar
Deike Kleberg committed
330
  return pioinfo->collectorComm;
331
332
333
#else
  return 0;
#endif
Deike Kleberg's avatar
Deike Kleberg committed
334
335
336
337
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
338
void bFinalize ()
Deike Kleberg's avatar
Deike Kleberg committed
339
{
340
#ifdef USE_MPI
341
342
343
344
345
346
347
  switch ( pioinfo->type )
    {
    case PIO_POSIX_FPGUARD_THREAD:
      finalizePOSIXFPGUARDTHREAD ();
      break;
    }

348
  if ( pioinfo->collectorComm != MPI_COMM_NULL )
Deike Kleberg's avatar
Deike Kleberg committed
349
    xmpi ( MPI_Comm_free ( &( pioinfo->collectorComm )));
350
351

  if ( pioinfo->comm != MPI_COMM_NULL )
Deike Kleberg's avatar
Deike Kleberg committed
352
    xmpi ( MPI_Comm_free ( &( pioinfo->comm )));
353

Deike Kleberg's avatar
Deike Kleberg committed
354
  free ( pioinfo );
355
#endif
Deike Kleberg's avatar
Deike Kleberg committed
356
}
357
358
359
360
361
362
363
364
365
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */