pio.c 4.29 KB
Newer Older
Deike Kleberg's avatar
Deike Kleberg committed
1
2
3
4
5
6
7
8
9
10
11
12
#ifndef NOMPI

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#include "mpi.h"

#include "cdi.h"
#include "pio.h"
#include "pio_impl.h"

13
/* When true, pioInit() prints a one-line trace of the chosen settings on
   rank 0.  Other uses, if any, are outside the code visible here. */
bool ddebug = false;
Deike Kleberg's avatar
Deike Kleberg committed
14

15
16
17
18
/* Initial buffer size; the default is 16 * 1024 * 1024.
   The original note gives the valid range as 4 KB <= x < 256 MB
   (presumably bytes -- TODO confirm against the code that allocates
   the buffer, which is not in this file section). */
long initial_buffersize = 16 * 1024 * 1024;
/* Values tried previously: 16 * 1024 * 1024 and 4 * 1024. */
Deike Kleberg's avatar
Deike Kleberg committed
19
20
21
22
23
24
25
26
27
28
29

/* Timing bookkeeping.  None of the functions visible in this file read or
   update these; presumably the I/O backends accumulate the time spent in
   the respective phase here -- TODO confirm against the backends. */
double startTime;
double accumProbe   = 0.0;
double accumRecv    = 0.0;
double accumSend    = 0.0;
double accumSuspend = 0.0;
double accumWait    = 0.0;
double accumWrite   = 0.0;

/* Global parallel I/O state: allocated and filled (type, comm, rank, size)
   by pioInit(), released by pioFinalize(). */
pioInfo *pioinfo;

30
31
32
33
/* Shared MPI request handle, starting out as the null request.
   Not used by the functions visible here -- presumably owned by one of
   the backends; TODO confirm. */
MPI_Request request = MPI_REQUEST_NULL;

/* Single-character string "%".  Its role is not visible in this file
   section (presumably a separator used when packing messages -- confirm).
   NOTE(review): points at a string literal, so it must never be written
   through. */
char *token = "%";

Deike Kleberg's avatar
Deike Kleberg committed
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/*****************************************************************************/

/**
 * Report a failed MPI call on stderr.
 *
 * Does nothing when iret equals MPI_SUCCESS.  Otherwise the error code is
 * translated with MPI_Error_string() and printed together with the
 * caller-supplied line number.  The function only reports; it neither
 * aborts nor returns a status.
 *
 * @param line  number printed in the "Line" field of the message
 *              (presumably the caller's __LINE__ -- confirm against callers)
 * @param iret  return code of the MPI call being checked
 */
void check_mpi ( int line, int iret )
{
  char error_string[MPI_MAX_ERROR_STRING+1];
  int len = 0;

  if ( iret == MPI_SUCCESS )
    return;

  /* MPI_Error_string() can itself fail (e.g. for an invalid error code).
     In that case neither len nor the buffer may be trusted, so never index
     the buffer with an unchecked length. */
  if ( MPI_Error_string ( iret, error_string, &len ) != MPI_SUCCESS
       || len < 0 || len > MPI_MAX_ERROR_STRING )
    len = 0;

  error_string[len] = '\0';

  fprintf ( stderr,"\nLine %8d MPI error %4d: %s\n\n", line, iret,
            len > 0 ? error_string : "unknown MPI error" );
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Combine an id and a command code into a single integer tag.
 *
 * The id is scaled by 1000 and the command code is added, so the two parts
 * can be recovered by dividing by 1000 (quotient = id, remainder = sc) as
 * long as 0 <= sc < 1000.
 *
 * @param ID  identifier stored in the upper part of the tag
 * @param sc  command code stored in the lower part of the tag
 * @return ID * 1000 + sc
 */
int setTag ( int ID, int sc )
{
  const int idStride = 1000;
  int tag = ID * idStride;

  tag += sc;

  return tag;
}

/***************************************************************/

/**
 * Split a combined tag into its id and command parts.
 *
 * Reverses the encoding used by setTag() (id * 1000 + command): the
 * quotient of tag by 1000 is stored in ->id, the remainder in ->command.
 *
 * @param tag  combined tag value
 * @return a newly malloc'ed rTag owned by the caller, who must free() it;
 *         NULL if the allocation fails.
 */
rTag * getTag ( int tag )
{
  rTag *rtag = malloc ( sizeof *rtag );

  /* Do not write through a failed allocation; let the caller decide. */
  if ( rtag == NULL )
    return NULL;

  rtag->id = tag / 1000;
  rtag->command = tag % 1000;

  return rtag;
}

/***************************************************************/

Deike Kleberg's avatar
Deike Kleberg committed
74
75
76
77
78
79
size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
{
  size_t iret;

  switch ( pioinfo->type )
    {
80
81
    case PIO_MPI_NONB:
      iret = fwMPINONB ( id, tsId, buffer, len );
Deike Kleberg's avatar
Deike Kleberg committed
82
      break;
Deike Kleberg's avatar
Deike Kleberg committed
83
84
85
    case PIO_POSIX_ASYNCH:
      iret = fwPOSIXASYNCH ( id, tsId, buffer, len );
      break;
86
87
88
89
90
91
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
      break;
    case PIO_POSIX_NONB:
      iret = fwPOSIXNONB ( id, tsId, buffer, len );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
92
93
94
95
96
97
98
99
100
101
102
103
104
    }

  return iret;
}

/***************************************************************/

int pioFileClose ( int id )
{
  int iret;
  
  switch ( pioinfo->type )
    {
105
106
    case PIO_MPI_NONB:
      iret = fcMPINONB ( id );
Deike Kleberg's avatar
Deike Kleberg committed
107
      break;
Deike Kleberg's avatar
Deike Kleberg committed
108
109
110
    case PIO_POSIX_ASYNCH:
      iret = fcPOSIXASYNCH ( id );
      break;
111
112
113
114
115
116
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fcPOSIXFPGUARDSENDRECV ( id );
      break;
    case PIO_POSIX_NONB:
      iret = fcPOSIXNONB ( id );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
117
118
119
120
121
122
123
124
125
126
127
128
129
    }

  return iret;
}

/***************************************************************/

int pioFileOpenW ( const char *filename )
{
  int iret;
  
  switch ( pioinfo->type )
    {
130
131
    case PIO_MPI_NONB:
      iret = fowMPINONB ( filename );
Deike Kleberg's avatar
Deike Kleberg committed
132
      break;
Deike Kleberg's avatar
Deike Kleberg committed
133
134
135
    case PIO_POSIX_ASYNCH:
      iret = fowPOSIXASYNCH ( filename );
      break;
136
137
138
139
140
141
    case PIO_POSIX_FPGUARD_SENDRECV:
      iret = fowPOSIXFPGUARDSENDRECV ( filename );
      break;
    case PIO_POSIX_NONB:
      iret = fowPOSIXNONB ( filename );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
142
143
144
145
146
147
148
149
150
    }
  
  return iret;
}

/***************************************************************/

int pioInit ( int ptype, int comm, int *ncollectors )
{
Deike Kleberg's avatar
Deike Kleberg committed
151
  int collectingData = 1;
Deike Kleberg's avatar
Deike Kleberg committed
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
  int size;

  pioinfo = ( pioInfo * ) malloc ( sizeof ( pioInfo ));

  pioinfo->type = ptype;
  pioinfo->comm  = ( MPI_Comm ) comm;
  MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
  MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));

  if ( pioinfo->type == PIO_NONE && pioinfo->size != 1 )
    {
      fprintf ( stderr, 
		"PTYPE should be set to a parallel I/O type or npe should be 1." );
      fprintf ( stderr, "PTYPE = %d, npe = %d\n", 
		pioinfo->type , pioinfo->size );
      MPI_Abort ( pioinfo->comm, 1 );
    }

  if ( ddebug && pioinfo->rank == 0 )
    fprintf ( stdout, 
172
173
	      "pe%d in pioDefPtype(), ptype=%d, initial_buffersize=%ld: init pioinfo ...\n", 
	      pioinfo->rank, pioinfo->type, initial_buffersize );
Deike Kleberg's avatar
Deike Kleberg committed
174
175
176

  switch ( pioinfo->type )
    {
177
178
    case PIO_MPI_NONB:
      collectingData = initMPINONB ( ncollectors );
Deike Kleberg's avatar
Deike Kleberg committed
179
      break;
Deike Kleberg's avatar
Deike Kleberg committed
180
181
182
    case PIO_POSIX_ASYNCH:
      collectingData = initPOSIXASYNCH ( ncollectors );
      break;
183
184
185
186
187
188
    case PIO_POSIX_FPGUARD_SENDRECV:
      collectingData = initPOSIXFPGUARDSENDRECV ( ncollectors );
      break;
    case PIO_POSIX_NONB:
      collectingData = initPOSIXNONB ( ncollectors );
      break;
Deike Kleberg's avatar
Deike Kleberg committed
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
    }

  return collectingData;
}

/***************************************************************/

/**
 * Release the global parallel I/O state allocated by pioInit().
 *
 * pioinfo is reset to NULL after it is freed, so calling this function
 * again is harmless and a stale pointer cannot be freed twice.
 */
void pioFinalize ( void )
{
  free ( pioinfo );
  pioinfo = NULL;
}

#endif
204