pio_posixnonb.c 7.54 KB
Newer Older
1
2
3
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif
Deike Kleberg's avatar
Deike Kleberg committed
4

5
6
7
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
Deike Kleberg's avatar
Deike Kleberg committed
8
#include <unistd.h>
Thomas Jahns's avatar
Thomas Jahns committed
9
#include <mpi.h>
10
11

#include "pio.h"
Deike Kleberg's avatar
Deike Kleberg committed
12
#include "pio_comm.h"
13
#include "pio_impl.h"
Deike Kleberg's avatar
Deike Kleberg committed
14
#include "pio_util.h"
15

16
17
#include "dmemory.h"

18
/* Printable names of the file operation commands, indexed by command
 * code; in this file they are only used for debug output. */
extern char * command2charP[6];
19

20
/* Delimiter set handed to strtok() when the header of an IO_Open_file
 * message is split into file name and buffer size. */
extern char *token;
21

22
/* Bookkeeping record of the writer for one open output file. */
typedef struct
{
  struct dBuffer *fb;   /* staging buffer: data is placed in fb->buffer
                         * before it is written to fp */
  FILE *fp;             /* stdio stream of the output file */
  int fileID;           /* id by which the record is looked up */
  int activeCollectors; /* collectors that have not yet sent
                         * IO_Close_file for this file */
  char name[];          /* file name, stored inline behind the record */
} bFiledataP;

31
static int
32
fileIDTest(void *a, void *fileID)
33
{
34
  return ((bFiledataP *)a)->fileID == (int)(intptr_t)fileID;
35
36
}

37
38
/***************************************************************/

39
static bFiledataP *
40
initBFiledataP(char *filename, size_t bs, int nc, int fileID)
41
42
43
{
  bFiledataP * bfp;

44
  xdebug ( "filename=%s, buffersize=%lu, ncollectors=%d", filename, bs, nc );
45

46
47
  bfp = xmalloc(sizeof (*bfp) + strlen(filename) + 1);
  strcpy(bfp->name, filename);
48
49

  if (( bfp->fp = fopen ( filename, "w" )) == NULL ) 
50
    xabort("Failed to open %s", bfp->name);
51
52
  int fd = fileno(bfp->fp);
  ftruncate(fd, (off_t)0);
53
  dbuffer_init(&bfp->fb, bs);
54

55
  bfp->activeCollectors = nc;
Deike Kleberg's avatar
Deike Kleberg committed
56

57
  bfp->fileID = fileID;
58

Deike Kleberg's avatar
Deike Kleberg committed
59
  xdebug ( "filename=%s, opened file, return", bfp->name );
60
61
62
63
64
65

  return bfp;
}

/***************************************************************/

66
67
static int
destroyBFiledataP(void *v)
68
69
70
71
{
  int iret = 0;
  bFiledataP *bfp = ( bFiledataP * ) v;

Deike Kleberg's avatar
Deike Kleberg committed
72
  xdebug ( "filename=%s, cleanup, in",  bfp->name );
73
74
75

  /* close file */
  if (( iret = fclose ( bfp->fp )) == EOF )
Thomas Jahns's avatar
Thomas Jahns committed
76
    xabort("Failed to close %s", bfp->name);
77
78
79
80
81

  /* file closed, cleanup */

  dbuffer_cleanup ( &( bfp->fb ));

82
  free(bfp);
83

84
  xdebug("%s", "cleaned up, return");
85
86
87
88
89
90

  return iret;
}

/***************************************************************/

91
92
static bool
compareNamesBP(void *v1, void *v2)
93
{
94
  bFiledataP *bfd1 = v1, *bfd2 = v2;
95

96
  return !strcmp(bfd1->name, bfd2->name);
97
98
99
100
}

/***************************************************************/

101
static void
102
writeP(bFiledataP *bfd, size_t amount)
103
{
104
  size_t written;
105

Deike Kleberg's avatar
Deike Kleberg committed
106
  xdebug ( "filename=%s, amount=%ld, in", bfd->name, amount );
107

108
  if ((written = fwrite(bfd->fb->buffer, 1, amount, bfd->fp )) != amount)
109
    xabort("did not succeed writing buffer in %s", bfd->name);
110

Deike Kleberg's avatar
Deike Kleberg committed
111
112
  xdebug ( "filename=%s, written=%ld, amount=%ld, return",
           bfd->name, written, amount );
113
114
115
}

/***************************************************************/
116
static void
117
elemCheck(void *q, void *nm)
118
{
119
120
  bFiledataP *bfd = q;
  const char *name = nm;
121

122
  if (!strcmp(name, bfd->name))
123
    xabort("Filename %s has already been added to the set\n", name);
124
}
125

126
127
void
pioWriterStdIO(void)
128
129
{
  bFiledataP *bfd; 
130
  listSet * bibBFiledataP;
131
  size_t amount, buffersize;
132
133
  char *messageBuffer = NULL;
  char *pMB, *filename, *temp;
Thomas Jahns's avatar
Thomas Jahns committed
134
  int messagesize, source, tag, id;
135
  struct fileOpTag rtag;
136
  MPI_Status status;
Deike Kleberg's avatar
Deike Kleberg committed
137
138
  MPI_Comm commNode = commInqCommNode ();
  int nProcsCollNode = commInqSizeNode () - commInqSizeColl ();
139
  bool * sentFinalize, doFinalize;
140

Deike Kleberg's avatar
Deike Kleberg committed
141
  xdebug ( "ncollectors=%d on this node", nProcsCollNode );
142

143
  bibBFiledataP = listSetNew(destroyBFiledataP, compareNamesBP);
144
  sentFinalize = xcalloc((size_t)nProcsCollNode, sizeof (sentFinalize[0]));
145
146
  
  for ( ;; )
147
148
    {  
        
149
150
      xmpiStat ( MPI_Probe ( MPI_ANY_SOURCE, MPI_ANY_TAG, commNode, 
                             &status ), &status );
151
152
      
      
153
154
      source = status.MPI_SOURCE;
      tag    = status.MPI_TAG;
155
      
156
      rtag = decodeFileOpTag(tag);
157
      
Deike Kleberg's avatar
Deike Kleberg committed
158
      xmpi ( MPI_Get_count ( &status, MPI_CHAR, &messagesize ));
159

160
      xdebug ( "RECEIVE MESSAGE FROM SOURCE=%d, ID=%d, COMMAND=%d ( %s ),"
161
162
               "MESSAGESIZE=%d", source, rtag.id, rtag.command,
               command2charP[rtag.command], messagesize);
163

164
      switch (rtag.command)
165
166
167
	{
      	case IO_Open_file:

168
169
	  messageBuffer
            = xmalloc((size_t)messagesize * sizeof (messageBuffer[0]));
170
    	  pMB = messageBuffer;
171

Deike Kleberg's avatar
Deike Kleberg committed
172
173
	  xmpi ( MPI_Recv ( messageBuffer, messagesize, MPI_CHAR, source, 
                            tag, commNode, &status ));
Deike Kleberg's avatar
Deike Kleberg committed
174

175
	  xdebug("%s", "after recv, in loop");
176
	  
177
178
179
	  filename = strtok ( pMB, token );
	  pMB += ( strlen ( filename ) + 1 );
	  temp =  strtok ( pMB, token );
180
          buffersize = (size_t)strtol(temp, NULL, 16);
181
	  pMB += ( strlen ( temp ) + 1 );
182
	  amount = (size_t)(messageBuffer + messagesize - pMB);
183
	  
184
	  xdebug("command %s, filename=%s, buffersize=%zu, amount=%zu",
185
                 command2charP[rtag.command], filename, buffersize, amount);
186
	  
187
	  
188
          if (!(bfd = listSetGet(bibBFiledataP, fileIDTest,
189
                               (void *)(intptr_t)rtag.id)))
190
	    {
191
	      listSetForeach(bibBFiledataP, elemCheck, filename);
192
	      bfd = initBFiledataP(filename, buffersize, nProcsCollNode,
193
                                   rtag.id);
194
	      
195
	      if ((id = listSetAdd(bibBFiledataP, bfd)) < 0)
196
                xabort("fileID=%d not unique", rtag.id);
197
              bfd->fileID = id;
198
199
	    }
	  else
Thomas Jahns's avatar
Thomas Jahns committed
200
	    if (strcmp(filename, bfd->name) != 0)
201
              xabort("filename is not consistent, fileID=%d", rtag.id);
202

203
	  memcpy(bfd->fb->buffer, pMB, amount);
204

205
	  writeP(bfd, amount);
206
207
208
209
210
211
	  
	  free ( messageBuffer );

	  break;

	case IO_Send_buffer:
212

213
          if (!(bfd = listSetGet(bibBFiledataP, fileIDTest,
214
                               (void *)(intptr_t)rtag.id)))
215
            xabort("fileID=%d is not in set", rtag.id );
216

217
	  amount = (size_t)messagesize;
218

219
220
	  xdebug("COMMAND %s, ID=%d, NAME=%s", command2charP[rtag.command],
                 rtag.id, bfd->name);
221

222
223
224
225
	  xmpi(MPI_Recv(bfd->fb->buffer, messagesize, MPI_CHAR, source, tag,
                        commNode, &status));

	  writeP(bfd, amount);
226
227
228
	  break;

	case IO_Close_file:
229

230
231
	  xdebug("COMMAND %s,  FILE%d, SOURCE%d",
                 command2charP[rtag.command], rtag.id, source);
232

233
          if (!(bfd = listSetGet(bibBFiledataP, fileIDTest,
234
                               (void *)(intptr_t)rtag.id)))
235
            xabort("fileID=%d is not in set", rtag.id);
236

237
          amount = (size_t)messagesize;
238

239
	  xdebug("COMMAND %s, ID=%d, NAME=%s, AMOUNT=%zu",
240
                 command2charP[rtag.command], rtag.id, bfd->name, amount);
241
242
243

	  xmpi(MPI_Recv(bfd->fb->buffer, messagesize, MPI_CHAR, source, tag,
                        commNode, &status));
244
245
246

	  writeP ( bfd, amount );

247
	  if ( ! --(bfd->activeCollectors))
248
	    {
249
	      xdebug("all are finished with file %d, delete node", rtag.id);
250
251
	      listSetRemove(bibBFiledataP, fileIDTest,
                            (void *)(intptr_t)rtag.id);
252
253
254
255
256
257
258
	    }
          break;
        case IO_Finalize:
          {
            int buffer = CDI_UNDEFID, collID;

            xmpi ( MPI_Recv ( &buffer, 1, MPI_INT, source, tag, commNode, &status ));
259
            
260
261
            sentFinalize[source] = true;
            doFinalize = true;
262
            
263
264
265
266
267
268
            for ( collID = 0; collID < nProcsCollNode; collID++ )
              if ( !sentFinalize[collID] ) 
                {
                  doFinalize = false;
                  break;
                }
269
            
270
271
            if ( doFinalize )
              {
272
273
                if (!listSetIsEmpty(bibBFiledataP))
                  xabort("set bibBfiledataP is not empty.");
274
275
                else
                  {
276
                    xdebug("%s", "all files are finished, destroy file set,"
277
                           " return");
278
                    listSetDelete(bibBFiledataP);
279
                  }
Thomas Jahns's avatar
Thomas Jahns committed
280
                free(sentFinalize);
281
282
283
284
285
286
                return;
              }
          }
          break;
        default:
          xabort ( "COMMAND NOT IMPLEMENTED" );
287
288
289
290
	}
    }
}

291
292
293
294
295
296
297
298
299
/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */