Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
mpim-sw
libcdi
Commits
5f5ee7b6
Commit
5f5ee7b6
authored
Oct 08, 2011
by
Deike Kleberg
Browse files
IO mode PIO_POSIXNONB running.
parent
3cf89a20
Changes
8
Hide whitespace changes
Inline
Side-by-side
pioExamples/collectData.c
View file @
5f5ee7b6
...
...
@@ -25,8 +25,9 @@ static int nlev[nVars] = {1,1,5,5,2};
#define maxlev 5
#define ntsteps 3
int
IOModus
=
PIO_POSIX_FPGUARD_SENDRECV
;
//
int IOModus = PIO_POSIX_FPGUARD_SENDRECV;
//int IOModus = PIO_MPI_NONB;
int
IOModus
=
PIO_POSIX_NONB
;
void
modelRun
()
...
...
src/Makefile.am
View file @
5f5ee7b6
...
...
@@ -62,6 +62,7 @@ libcdi_la_SOURCES = \
pio_interface.h
\
pio_mpinonb.c
\
pio_posixfpguardsendrecv.c
\
pio_posixnonb.c
\
pio_queue.c
\
pio_rpc.c
\
pio_rpc.h
\
...
...
src/Makefile.in
View file @
5f5ee7b6
...
...
@@ -83,8 +83,8 @@ am_libcdi_la_OBJECTS = basetime.lo binary.lo calendar.lo cdf.lo \
gaussgrid.lo gribapi.lo grid.lo ieglib.lo institution.lo
\
model.lo namespace.lo pio.lo pio_comm.lo pio_dbuffer.lo
\
pio_interface.lo pio_mpinonb.lo pio_posixfpguardsendrecv.lo
\
pio_queue.lo pio_rpc.lo pio_server.lo
pio_util.lo
\
resource_handle.lo servicelib.lo stream_cdf.lo
\
pio_posixnonb.lo
pio_queue.lo pio_rpc.lo pio_server.lo
\
pio_util.lo
resource_handle.lo servicelib.lo stream_cdf.lo
\
stream_cgribex.lo stream_ext.lo stream_grb.lo
\
stream_gribapi.lo stream_history.lo stream_ieg.lo
\
stream_int.lo stream_record.lo stream_srv.lo stream_var.lo
\
...
...
@@ -332,6 +332,7 @@ libcdi_la_SOURCES = \
pio_interface.h
\
pio_mpinonb.c
\
pio_posixfpguardsendrecv.c
\
pio_posixnonb.c
\
pio_queue.c
\
pio_rpc.c
\
pio_rpc.h
\
...
...
@@ -513,6 +514,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_interface.Plo@am__quote@
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_mpinonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Plo@am__quote@
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_posixnonb.Plo@am__quote@
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_queue.Plo@am__quote@
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_rpc.Plo@am__quote@
@AMDEP_TRUE@@am__include@
@am__quote@./$(DEPDIR)/pio_server.Plo@am__quote@
...
...
src/pio.c
View file @
5f5ee7b6
...
...
@@ -107,11 +107,9 @@ size_t pioFileWrite ( int fileID, int tsID, const void *buffer, size_t len )
iret = fwPOSIXFPGUARDTHREADREFUSE ( fileID, tsID, buffer, len );
break;
*/
/*
case
PIO_POSIX_NONB
:
iret
=
fwPOSIXNONB
(
fileID
,
tsID
,
buffer
,
len
);
break
;
*/
}
return
iret
;
...
...
@@ -149,11 +147,9 @@ int pioFileClose ( int id )
iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
break;
*/
/*
case
PIO_POSIX_NONB
:
iret
=
fcPOSIXNONB
(
id
);
break
;
*/
}
return
iret
;
...
...
@@ -192,11 +188,9 @@ int pioFileOpenW ( const char *filename )
iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
break;
*/
/*
case
PIO_POSIX_NONB
:
iret
=
fowPOSIXNONB
(
filename
);
break
;
*/
}
return
iret
;
...
...
@@ -246,11 +240,9 @@ void backendInit ( void )
collectingData = initPOSIXFPGUARDTHREADREFUSE ();
break;
*/
/*
case
PIO_POSIX_NONB
:
collectingData =
initPOSIXNONB ();
initPOSIXNONB
();
break
;
*/
}
#endif
}
...
...
@@ -260,9 +252,7 @@ void backendInit ( void )
void
backendFinalize
(
void
)
{
#ifdef USE_MPI
//xdebug ("#############################");
commDestroy
();
//xdebug ("#############################");
MPI_Finalize
();
exit
(
0
);
#endif
...
...
src/pio_comm.c
View file @
5f5ee7b6
...
...
@@ -686,8 +686,8 @@ void commPrint ( FILE * fp )
if
(
pioinfo2
->
commsIO
!=
NULL
)
{
fprintf
(
fp
,
"#
\n
"
);
for
(
i
=
0
;
i
<
pioinfo2
->
nProcs
IO
;
i
++
)
fprintf
(
fp
,
"# commsIO[%d]
= %d
\n
"
,
i
,
pioinfo2
->
commsIO
[
i
]
);
for
(
i
=
0
;
i
<
pioinfo2
->
nProcs
Coll
;
i
++
)
fprintf
(
fp
,
"# commsIO[%d] = %d
\n
"
,
i
,
pioinfo2
->
commsIO
[
i
]
);
}
else
fprintf
(
fp
,
"# commsIO = NULL
\n
"
);
...
...
src/pio_impl.h
View file @
5f5ee7b6
...
...
@@ -87,7 +87,7 @@ size_t fwPOSIXASYNCH( int, int, const void *, size_t );
int
initPOSIXASYNCH
(
void
);
#endif
/* pio_posixfpguardsendrecv.c */
void
fpgPOSIXFPGUARDSENDRECV
(
int
);
void
fpgPOSIXFPGUARDSENDRECV
(
void
);
int
fowPOSIXFPGUARDSENDRECV
(
const
char
*
);
int
fcPOSIXFPGUARDSENDRECV
(
int
);
size_t
fwPOSIXFPGUARDSENDRECV
(
int
,
int
,
const
void
*
,
size_t
);
...
...
@@ -110,11 +110,11 @@ int initPOSIXFPGUARDTHREADREFUSE ( void );
void
finalizePOSIXFPGUARDTHREADREFUSE
(
void
);
/* pio_posixnonb.c */
void
pwPOSIXNONB
(
int
);
int
fowPOSIXNONB
(
const
char
*
);
int
fcPOSIXNONB
(
int
);
size_t
fwPOSIXNONB
(
int
,
int
,
const
void
*
,
size_t
);
int
initPOSIXNONB
(
void
);
void
pwPOSIXNONB
(
void
);
int
fowPOSIXNONB
(
const
char
*
);
int
fcPOSIXNONB
(
int
);
size_t
fwPOSIXNONB
(
int
,
int
,
const
void
*
,
size_t
);
void
initPOSIXNONB
(
void
);
#endif
#endif
...
...
src/pio_posixfpguardsendrecv.c
View file @
5f5ee7b6
...
...
@@ -179,7 +179,7 @@ bool compareNamesAPF ( void *v1, void *v2 )
/***************************************************************/
void
fpgPOSIXFPGUARDSENDRECV
(
int
ncollectors
)
void
fpgPOSIXFPGUARDSENDRECV
(
void
)
{
int
i
,
source
,
iret
;
tag_t
*
rtag
;
...
...
@@ -189,13 +189,15 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
long
amount
;
char
errorString
[
maxErrorString
];
MPI_Comm
commNode
=
commInqCommNode
();
int
nProcsCollNode
=
commInqSizeNode
()
-
commInqSizeColl
();
xdebug
(
"ncollectors=%d
"
,
ncollectors
);
xdebug
(
"ncollectors=%d
on this node"
,
nProcsCollNode
);
bibBFiledataPF
=
queueInit
(
destroyBFiledataPF
,
NULL
);
for
(
;;
)
{
xdebug
();
xmpi
(
MPI_Probe
(
MPI_ANY_SOURCE
,
MPI_ANY_TAG
,
commNode
,
&
status
));
source
=
status
.
MPI_SOURCE
;
rtag
=
getTag
(
status
.
MPI_TAG
);
...
...
@@ -211,7 +213,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
rtag
->
id
)))
{
bfd
=
(
bFiledataPF
*
)
initBFiledataPF
(
rtag
->
id
,
n
collectors
);
n
ProcsCollNode
);
if
((
iret
=
queuePush
(
bibBFiledataPF
,
bfd
,
1
,
rtag
->
id
))
!=
rtag
->
id
)
{
...
...
@@ -290,7 +292,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
bfd
->
nfinished
[
source
]
=
true
;
bfd
->
finished
=
true
;
for
(
i
=
0
;
i
<
n
collectors
;
i
++
)
for
(
i
=
0
;
i
<
n
ProcsCollNode
;
i
++
)
if
(
!
(
bfd
->
nfinished
[
i
]
))
{
bfd
->
finished
=
false
;
...
...
@@ -538,8 +540,6 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
void
initPOSIXFPGUARDSENDRECV
(
void
)
{
int
ncollectors
;
if
(
commInqSizeNode
()
<
2
)
xabort
(
"usage: #pes/#nodes >= 2"
);
if
(
commInqRankNode
()
==
commInqSpecialRankNode
())
...
...
@@ -548,9 +548,7 @@ void initPOSIXFPGUARDSENDRECV ( void )
commSendNodeInfo
();
commRecvNodeMap
();
commDefCommsIO
();
ncollectors
=
commInqSizeNode
()
-
commInqSizeColl
();
xdebug
(
"Call fpGuard(), ncollectors = %d ..."
,
ncollectors
);
fpgPOSIXFPGUARDSENDRECV
(
ncollectors
);
fpgPOSIXFPGUARDSENDRECV
();
}
else
{
...
...
src/pio_posixnonb.c
View file @
5f5ee7b6
//pio_posixnonb.c:693: warning: pointer targets in passing argument 2 of ‘strncpy’ differ in signedness
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#ifdef USE_MPI
#include
<stdbool.h>
...
...
@@ -14,6 +11,7 @@
#include
"mpi.h"
#include
"pio.h"
#include
"pio_comm.h"
#include
"pio_impl.h"
#include
"pio_util.h"
...
...
@@ -31,8 +29,6 @@ extern double accumSuspend;
extern
double
accumWait
;
extern
double
accumWrite
;
extern
pioInfo
*
pioinfo
;
typedef
struct
{
...
...
@@ -69,23 +65,18 @@ static aFiledataP *initAFiledataP ( const char *filename, size_t bs )
size_t
len
;
int
iret
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d initFiledataAP (): filename=%s, buffersize=%lu, in
\n
"
,
pioinfo
->
rank
,
filename
,
bs
);
xdebug
(
"filename=%s, buffersize=%lu, in"
,
filename
,
bs
);
afp
=
(
aFiledataP
*
)
xmalloc
(
sizeof
(
aFiledataP
));
memset
(
afp
,
0
,
sizeof
(
aFiledataP
));
afp
=
xmalloc
(
sizeof
(
aFiledataP
));
len
=
strlen
(
filename
);
afp
->
name
=
(
char
*
)
xmalloc
((
len
+
1
)
*
sizeof
(
char
));
afp
->
name
=
xmalloc
((
len
+
1
)
*
sizeof
(
afp
->
name
[
0
]
));
strcpy
(
afp
->
name
,
filename
);
afp
->
size
=
bs
;
/* init output buffer */
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d initOpenFileCollector(): name=%s, init output buffer
\n
"
,
pioinfo
->
rank
,
afp
->
name
);
xdebug
(
"filename=%s, init output buffer"
,
afp
->
name
);
iret
=
dbuffer_init
(
&
(
afp
->
db1
),
afp
->
size
);
iret
+=
dbuffer_init
(
&
(
afp
->
db2
),
afp
->
size
);
...
...
@@ -97,9 +88,7 @@ static aFiledataP *initAFiledataP ( const char *filename, size_t bs )
afp
->
command
=
IO_Open_file
;
afp
->
request
=
MPI_REQUEST_NULL
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d initFiledataAP (), enqueued name=%s, return
\n
"
,
pioinfo
->
rank
,
afp
->
name
);
xdebug
(
"enqueued name=%s, return"
,
afp
->
name
);
return
afp
;
}
...
...
@@ -113,14 +102,11 @@ static bFiledataP * initBFiledataP ( char *filename,
int
i
;
char
errorString
[
maxErrorString
];
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d initFiledataBP (): filename=%s, buffersize=%lu, ncollectors=%d, in
\n
"
,
pioinfo
->
rank
,
filename
,
bs
,
nc
);
xdebug
(
"filename=%s, buffersize=%lu, ncollectors=%d, in"
,
filename
,
bs
,
nc
);
bfp
=
(
bFiledataP
*
)
xmalloc
(
sizeof
(
bFiledataP
));
memset
(
bfp
,
0
,
sizeof
(
bFiledataP
));
bfp
=
xmalloc
(
sizeof
(
bFiledataP
));
bfp
->
name
=
(
char
*
)
xmalloc
((
strlen
(
filename
)
+
1
)
*
sizeof
(
char
));
bfp
->
name
=
xmalloc
((
strlen
(
filename
)
+
1
)
*
sizeof
(
char
));
strcpy
(
bfp
->
name
,
filename
);
bfp
->
size
=
bs
;
...
...
@@ -133,15 +119,13 @@ static bFiledataP * initBFiledataP ( char *filename,
dbuffer_init
(
&
(
bfp
->
fb
),
(
size_t
)(
bfp
->
size
));
bfp
->
finished
=
false
;
bfp
->
nfinished
=
(
bool
*
)
xmalloc
(
nc
*
sizeof
(
b
ool
));
bfp
->
nfinished
=
xmalloc
(
nc
*
sizeof
(
b
fp
->
nfinished
[
0
]
));
for
(
i
=
0
;
i
<
nc
;
i
++
)
*
(
bfp
->
nfinished
+
i
)
=
true
;
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d initFiledataBP (), name=%s, opened file, return
\n
"
,
pioinfo
->
rank
,
bfp
->
name
);
xdebug
(
"filename=%s, opened file, return"
,
bfp
->
name
);
return
bfp
;
}
...
...
@@ -154,9 +138,7 @@ int destroyAFiledataP ( void *v )
aFiledataP
*
afp
=
(
aFiledataP
*
)
v
;
MPI_Status
status
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d destroyAFiledataP(): name=%s, cleanup, in
\n
"
,
pioinfo
->
rank
,
afp
->
name
);
xdebug
(
"filename=%s, cleanup, in"
,
afp
->
name
);
xmpiStat
(
MPI_Wait
(
&
(
afp
->
request
),
&
status
),
&
status
);
...
...
@@ -166,9 +148,7 @@ int destroyAFiledataP ( void *v )
free
(
afp
->
name
);
free
(
afp
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d return from destroyAFiledataP(), cleaned up, return
\n
"
,
pioinfo
->
rank
);
xdebug
(
"cleaned up, return"
);
return
iret
;
}
...
...
@@ -181,9 +161,7 @@ int destroyBFiledataP ( void *v )
bFiledataP
*
bfp
=
(
bFiledataP
*
)
v
;
char
errorString
[
maxErrorString
];
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d destroyBFiledataP(): name=%s, cleanup, in
\n
"
,
pioinfo
->
rank
,
bfp
->
name
);
xdebug
(
"filename=%s, cleanup, in"
,
bfp
->
name
);
/* close file */
...
...
@@ -201,9 +179,7 @@ int destroyBFiledataP ( void *v )
free
(
bfp
->
nfinished
);
free
(
bfp
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d return from destroyBFiledataP(), cleaned up, return
\n
"
,
pioinfo
->
rank
);
xdebug
(
"cleaned up, return"
);
return
iret
;
}
...
...
@@ -251,9 +227,7 @@ void writeP ( bFiledataP *bfd, long amount )
long
written
;
char
errorString
[
maxErrorString
];
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d writeP, name=%s, amount=%ld, in
\n
"
,
pioinfo
->
rank
,
bfd
->
name
,
amount
);
xdebug
(
"filename=%s, amount=%ld, in"
,
bfd
->
name
,
amount
);
if
((
written
=
fwrite
(
bfd
->
fb
->
buffer
,
sizeof
(
char
),
amount
,
bfd
->
fp
))
!=
amount
)
...
...
@@ -262,9 +236,8 @@ void writeP ( bFiledataP *bfd, long amount )
xabort
(
errorString
);
}
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d writeP (), name=%s, written=%ld, amount=%ld, return
\n
"
,
pioinfo
->
rank
,
bfd
->
name
,
written
,
amount
);
xdebug
(
"filename=%s, written=%ld, amount=%ld, return"
,
bfd
->
name
,
written
,
amount
);
return
;
}
...
...
@@ -299,7 +272,7 @@ void queueCheckP ( queue_t *q, char *name )
}
//
void
pwPOSIXNONB
(
int
ncollectors
)
void
pwPOSIXNONB
(
void
)
{
bFiledataP
*
bfd
;
queue_t
*
bibBFiledataP
;
...
...
@@ -309,18 +282,18 @@ void pwPOSIXNONB ( int ncollectors )
int
messagesize
,
source
,
tag
,
i
,
len
,
id
;
tag_t
*
rtag
=
NULL
;
MPI_Status
status
;
MPI_Comm
commNode
=
commInqCommNode
();
char
errorString
[
maxErrorString
];
int
nProcsCollNode
=
commInqSizeNode
()
-
commInqSizeColl
();
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d pwPOSIXNONB (), ncollectors=%d, in
\n
"
,
pioinfo
->
rank
,
ncollectors
);
xdebug
(
"ncollectors=%d on this node"
,
nProcsCollNode
);
bibBFiledataP
=
queueInit
(
destroyBFiledataP
,
compareNamesBP
);
for
(
;;
)
{
MPI_Probe
(
MPI_ANY_SOURCE
,
MPI_ANY_TAG
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Probe
(
MPI_ANY_SOURCE
,
MPI_ANY_TAG
,
comm
Node
,
&
status
)
);
source
=
status
.
MPI_SOURCE
;
tag
=
status
.
MPI_TAG
;
...
...
@@ -329,13 +302,12 @@ void pwPOSIXNONB ( int ncollectors )
if
((
rtag
=
getTag
(
tag
))
==
NULL
)
xabort
(
"getTag () failed"
);
MPI_Get_count
(
&
status
,
MPI_CHAR
,
&
messagesize
);
xmpi
(
MPI_Get_count
(
&
status
,
MPI_CHAR
,
&
messagesize
)
);
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d pwPOSIXNONB (): receive message from source=%d, id=%d, command=%d ( %s ), messagesize=%d
\n
"
,
pioinfo
->
rank
,
source
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
messagesize
);
xdebug
(
"receive message from source=%d, id=%d, command=%d ( %s ),"
"messagesize=%d"
,
source
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
messagesize
);
switch
(
rtag
->
command
)
{
...
...
@@ -344,32 +316,29 @@ void pwPOSIXNONB ( int ncollectors )
messageBuffer
=
(
char
*
)
xmalloc
(
messagesize
*
sizeof
(
char
));
pMB
=
messageBuffer
;
MPI_Recv
(
messageBuffer
,
messagesize
,
MPI_CHAR
,
source
,
tag
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Recv
(
messageBuffer
,
messagesize
,
MPI_CHAR
,
source
,
tag
,
commNode
,
&
status
)
);
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d pwPOSIXNONB (), after recv, in loop
\n
"
,
pioinfo
->
rank
);
xdebug
(
"after recv, in loop"
);
filename
=
strtok
(
pMB
,
token
);
pMB
+=
(
strlen
(
filename
)
+
1
);
temp
=
strtok
(
pMB
,
token
);
buffersize
=
strtol
(
temp
,
NULL
,
16
);
buffersize
=
strtol
(
temp
,
NULL
,
16
);
pMB
+=
(
strlen
(
temp
)
+
1
);
amount
=
(
long
)
(
messageBuffer
+
messagesize
-
pMB
);
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d pwPOSIXNONB () case %s, filename=%s, buffersize=%ld, amount=%ld
\n
"
,
pioinfo
->
rank
,
command2charP
[
rtag
->
command
],
filename
,
buffersize
,
amount
);
xdebug
(
"command %s, filename=%s, "
"buffersize=%ld, amount=%ld"
,
command2charP
[
rtag
->
command
],
filename
,
buffersize
,
amount
);
if
(
!
(
bfd
=
(
bFiledataP
*
)
queueIdx2val
(
bibBFiledataP
,
rtag
->
id
)))
{
queueCheckP
(
bibBFiledataP
,
filename
);
bfd
=
(
bFiledataP
*
)
initBFiledataP
(
filename
,
buffersize
,
n
collectors
);
bfd
=
(
bFiledataP
*
)
initBFiledataP
(
filename
,
buffersize
,
n
ProcsCollNode
);
if
((
id
=
queuePush
(
bibBFiledataP
,
bfd
,
1
,
rtag
->
id
))
!=
rtag
->
id
)
{
...
...
@@ -406,13 +375,12 @@ void pwPOSIXNONB ( int ncollectors )
amount
=
messagesize
;
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d, command: %s, id=%d, name=%s
\n
"
,
pioinfo
->
rank
,
command2charP
[
rtag
->
command
],
rtag
->
id
,
bfd
->
name
);
xdebug
(
"command %s, id=%d, name=%s"
,
command2charP
[
rtag
->
command
],
rtag
->
id
,
bfd
->
name
);
MPI_Recv
(
bfd
->
fb
->
buffer
,
amount
,
MPI_CHAR
,
source
,
tag
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Recv
(
bfd
->
fb
->
buffer
,
amount
,
MPI_CHAR
,
source
,
tag
,
commNode
,
&
status
)
);
writeP
(
bfd
,
amount
);
...
...
@@ -420,11 +388,9 @@ void pwPOSIXNONB ( int ncollectors )
case
IO_Close_file
:
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d, case %s, file%d, source%d
\n
"
,
pioinfo
->
rank
,
command2charP
[
rtag
->
command
],
rtag
->
id
,
source
);
xdebug
(
"command %s, file%d, source%d"
,
command2charP
[
rtag
->
command
],
rtag
->
id
,
source
);
if
(
!
(
bfd
=
(
bFiledataP
*
)
queueIdx2val
(
bibBFiledataP
,
rtag
->
id
)))
...
...
@@ -435,20 +401,19 @@ void pwPOSIXNONB ( int ncollectors )
amount
=
messagesize
;
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d, command %s, id=%d, name=%s, amount=%ld
\n
"
,
pioinfo
->
rank
,
command2charP
[
rtag
->
command
],
rtag
->
id
,
bfd
->
name
,
amount
);
xdebug
(
"command %s, id=%d, name=%s, amount=%ld"
,
command2charP
[
rtag
->
command
],
rtag
->
id
,
bfd
->
name
,
amount
);
MPI_Recv
(
bfd
->
fb
->
buffer
,
amount
,
MPI_CHAR
,
source
,
tag
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Recv
(
bfd
->
fb
->
buffer
,
amount
,
MPI_CHAR
,
source
,
tag
,
commNode
,
&
status
)
);
writeP
(
bfd
,
amount
);
*
(
bfd
->
nfinished
+
source
)
=
true
;
bfd
->
finished
=
true
;
for
(
i
=
0
;
i
<
n
collectors
;
i
++
)
for
(
i
=
0
;
i
<
n
ProcsCollNode
;
i
++
)
{
if
(
*
(
bfd
->
nfinished
+
i
)
==
false
)
{
...
...
@@ -459,15 +424,11 @@ void pwPOSIXNONB ( int ncollectors )
if
(
bfd
->
finished
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d: all are finished with file %d, delete node
\n
"
,
pioinfo
->
rank
,
rtag
->
id
);
xdebug
(
"all are finished with file %d, delete node"
,
rtag
->
id
);
queueDelNode
(
bibBFiledataP
,
rtag
->
id
);
if
(
!
bibBFiledataP
->
head
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"### pe%d: all files are finished, destroy queue, return
\n
"
,
pioinfo
->
rank
);
xdebug
(
"all files are finished, destroy queue, return"
);
queueDestroy
(
bibBFiledataP
);
ungetTag
(
rtag
);
return
;
...
...
@@ -490,22 +451,19 @@ void sendP ( aFiledataP *afd, int id )
amount
=
dbuffer_data_size
(
afd
->
db
);
tag
=
setTag
(
id
,
afd
->
command
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d sendP (): send buffer for %s,"
" size: %llu bytes, command=%s, in
\n
"
,
pioinfo
->
rank
,
afd
->
name
,
(
unsigned
long
long
)
amount
,
command2charP
[
afd
->
command
]
);
xdebug
(
"send buffer for %s,"
" size: %llu bytes, command=%s, in"
,
afd
->
name
,
(
unsigned
long
long
)
amount
,
command2charP
[
afd
->
command
]
);
if
(
ddebug
)
startTime
=
MPI_Wtime
();
xmpi
Stat
(
MPI_Wait
(
&
(
afd
->
request
),
&
status
)
,
&
status
);
xmpi
(
MPI_Wait
(
&
(
afd
->
request
),
&
status
));
if
(
ddebug
)
accumWait
+=
(
MPI_Wtime
()
-
startTime
);
if
(
ddebug
)
accumWait
+=
(
MPI_Wtime
()
-
startTime
);
MPI_Isend
(
afd
->
db
->
buffer
,
amount
,
MPI_CHAR
,
pioinfo
->
s
pecialRank
,
tag
,
pioinfo
->
comm
,
&
(
afd
->
request
));
xmpi
(
MPI_Isend
(
afd
->
db
->
buffer
,
amount
,
MPI_CHAR
,
commInqS
pecialRank
Node
()
,
tag
,
commInqCommNode
()
,
&
(
afd
->
request
)
));
/* change outputBuffer */
...
...
@@ -513,16 +471,12 @@ void sendP ( aFiledataP *afd, int id )
if
(
afd
->
db
==
afd
->
db1
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d sendP (): Change to buffer 2 ...
\n
"
,
pioinfo
->
rank
);
xdebug
(
"Change to buffer 2 ..."
);
afd
->
db
=
afd
->
db2
;
}
else
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d sendP (): Change to buffer 1 ...
\n
"
,
pioinfo
->
rank
);
xdebug
(
"Change to buffer 1 ..."
);
afd
->
db
=
afd
->
db1
;
}
...
...
@@ -547,9 +501,7 @@ size_t fwPOSIXNONB ( int id, int tsId, const void *buffer, size_t len )
if
(
flush
==
1
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d fileWriter (): tsId = %d, flush buffer
\n
"
,
pioinfo
->
rank
,
tsId
);
xdebug
(
"tsId = %d, flush buffer"
,
tsId
);
curr
=
bibAFiledataP
->
head
;
...
...
@@ -560,16 +512,14 @@ size_t fwPOSIXNONB ( int id, int tsId, const void *buffer, size_t len )
}
oldTsId
=
tsId
;
MPI_Barrier
(
pioinfo
->
collectorComm
);
xmpi
(
MPI_Barrier
(
commInqCommColl
())
);
}
afd
=
(
aFiledataP
*
)
queueIdx2val
(
bibAFiledataP
,
id
);
filled
=
dbuffer_push
(
afd
->
db
,
(
unsigned
char
*
)
buffer
,
len
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d fileWrite (): id = %d, tsId = %d, pushed %lu byte data on buffer, filled = %d
\n
"
,
pioinfo
->
rank
,
id
,
tsId
,
len
,
filled
);
xdebug
(
"id = %d, tsId = %d, pushed %lu byte data on buffer, filled = %d"
,
id
,
tsId
,
len
,
filled
);
if
(
filled
==
1
)
{
...
...
@@ -598,11 +548,9 @@ int fcPOSIXNONB ( int id )
{
double
accumWaitMax
;
aFiledataP
*
afd
;
int
iret
;
int
iret
,
root
=
0
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d fileClose (%d ): send buffer, close file and cleanup
\n
"
,
pioinfo
->
rank
,
id
);
xdebug
(
"fileID %d: send buffer, close file and cleanup"
,
id
);
afd
=
(
aFiledataP
*