Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
mpim-sw
libcdi
Commits
cace7a29
Commit
cace7a29
authored
Oct 06, 2011
by
Deike Kleberg
Browse files
Minor beautifications.
parent
7891f01c
Changes
9
Hide whitespace changes
Inline
Side-by-side
pioExamples/collectData.c
View file @
cace7a29
...
...
@@ -27,6 +27,8 @@ static int nlev[nVars] = {1,1,5,5,2};
#define maxlev 5
#define ntsteps 3
int
IOModus
=
PIO_POSIX_FPGUARD_SENDRECV
;
void
modelRun
()
{
...
...
@@ -144,7 +146,7 @@ int main (int argc, char *argv[])
if
(
nProcsIO
<=
0
||
nProcsIO
>=
sizeGlob
)
xabort
(
"bad distribution of tasks on PEs"
);
commModel
=
pioInit_c
(
commGlob
,
nProcsIO
,
P
IO
_MPI_NONB
);
commModel
=
pioInit_c
(
commGlob
,
nProcsIO
,
IO
Modus
);
#endif
modelRun
();
...
...
src/pio.c
View file @
cace7a29
...
...
@@ -79,31 +79,31 @@ void ungetTag ( rTag *rtag )
/***************************************************************/
size_t
pioFileWrite
(
int
id
,
int
tsI
d
,
const
void
*
buffer
,
size_t
len
)
size_t
pioFileWrite
(
int
fileID
,
int
tsI
D
,
const
void
*
buffer
,
size_t
len
)
{
size_t
iret
=
CDI_UNDEFID
;
switch
(
pioinfo
->
type
)
{
case
PIO_MPI_NONB
:
iret
=
fwMPINONB
(
id
,
tsI
d
,
buffer
,
len
);
iret
=
fwMPINONB
(
fileID
,
tsI
D
,
buffer
,
len
);
break
;
#ifndef _SX
case
PIO_POSIX_ASYNCH
:
iret
=
fwPOSIXASYNCH
(
id
,
tsI
d
,
buffer
,
len
);
iret
=
fwPOSIXASYNCH
(
fileID
,
tsI
D
,
buffer
,
len
);
break
;
#endif
case
PIO_POSIX_FPGUARD_SENDRECV
:
iret
=
fwPOSIXFPGUARDSENDRECV
(
id
,
tsI
d
,
buffer
,
len
);
iret
=
fwPOSIXFPGUARDSENDRECV
(
fileID
,
tsI
D
,
buffer
,
len
);
break
;
case
PIO_POSIX_FPGUARD_THREAD
:
iret
=
fwPOSIXFPGUARDTHREAD
(
id
,
tsI
d
,
buffer
,
len
);
iret
=
fwPOSIXFPGUARDTHREAD
(
fileID
,
tsI
D
,
buffer
,
len
);
break
;
case
PIO_POSIX_FPGUARD_THREAD_REFUSE
:
iret
=
fwPOSIXFPGUARDTHREADREFUSE
(
id
,
tsI
d
,
buffer
,
len
);
iret
=
fwPOSIXFPGUARDTHREADREFUSE
(
fileID
,
tsI
D
,
buffer
,
len
);
break
;
case
PIO_POSIX_NONB
:
iret
=
fwPOSIXNONB
(
id
,
tsI
d
,
buffer
,
len
);
iret
=
fwPOSIXNONB
(
fileID
,
tsI
D
,
buffer
,
len
);
break
;
}
...
...
src/pio_interface.c
View file @
cace7a29
...
...
@@ -165,17 +165,13 @@ void mapProblems ( int problemSizes[], int * problemMapping, int nProblems,
problemMapping
[
dummy
[
i
]
]
=
writerIdx
;
buckets
[
writerIdx
]
+=
*
ip
[
i
];
}
if
(
ddebug
>
2
)
{
xprintArray
(
"problemSizes = "
,
problemSizes
,
nProblems
,
DATATYPE_INT
);
xprintArray
(
"vector of indices, qsort of problemSizes"
,
dummy
,
nProblems
,
DATATYPE_INT
);
xprintArray
(
"problemMapping"
,
problemMapping
,
nProblems
,
DATATYPE_INT
);
xprintArray
(
"meanBucket"
,
meanBucket
,
nWriter
,
DATATYPE_FLT
);
xprintArray
(
"actual buckets"
,
buckets
,
nWriter
,
DATATYPE_INT
);
}
xprintArray3
(
"problemSizes = "
,
problemSizes
,
nProblems
,
DATATYPE_INT
);
xprintArray3
(
"vector of indices, qsort of problemSizes"
,
dummy
,
nProblems
,
DATATYPE_INT
);
xprintArray3
(
"problemMapping"
,
problemMapping
,
nProblems
,
DATATYPE_INT
);
xprintArray3
(
"meanBucket"
,
meanBucket
,
nWriter
,
DATATYPE_FLT
);
xprintArray3
(
"actual buckets"
,
buckets
,
nWriter
,
DATATYPE_INT
);
}
/****************************************************/
...
...
@@ -283,14 +279,12 @@ void varMapGen ( int * vSizes, int * sSizes, int * varMapping,
if
(
ddebug
)
{
xprintArray
(
"varMapping"
,
varMapping
,
nVars
,
DATATYPE_INT
);
for
(
i
=
0
;
i
<
nProcsIO
;
i
++
)
buckets
[
i
]
=
0
;
for
(
i
=
0
;
i
<
nVars
;
i
++
)
buckets
[
*
(
varMapping
+
i
)
-
nProcsCalc
]
+=
*
(
vSizes
+
i
);
xprintArray
(
"buckets"
,
buckets
,
nProcsIO
,
DATATYPE_INT
);
}
}
#endif
...
...
src/pio_mpinonb.c
View file @
cace7a29
...
...
@@ -142,7 +142,7 @@ bool compareNamesMPINONB ( void *v1, void *v2 )
/***************************************************************/
void
writeMPINONB
(
aFiledataM
*
of
,
int
id
)
void
writeMPINONB
(
aFiledataM
*
of
,
int
fileID
)
{
int
amount
;
MPI_Status
status
;
...
...
@@ -159,7 +159,7 @@ void writeMPINONB ( aFiledataM *of, int id )
xmpi
(
MPI_Wait
(
&
(
of
->
request
),
&
status
));
xmpi
(
MPI_File_iwrite_shared
(
of
->
fh
,
of
->
db
->
buffer
,
amount
,
MPI_CHAR
,
&
(
of
->
request
)));
xdebug
(
"%d bytes written for
id
=%d"
,
amount
,
id
);
xdebug
(
"%d bytes written for
fileID
=%d"
,
amount
,
fileID
);
/* change outputBuffer */
...
...
@@ -167,14 +167,14 @@ void writeMPINONB ( aFiledataM *of, int id )
if
(
of
->
db
==
of
->
db1
)
{
xdebug3
(
"IOPE%d:
id
=%d, change to buffer 2 ..."
,
pioinfo
->
rank
,
id
);
xdebug3
(
"IOPE%d:
fileID
=%d, change to buffer 2 ..."
,
pioinfo
->
rank
,
fileID
);
of
->
db
=
of
->
db2
;
}
else
{
xdebug3
(
"IOPE%d:
id
=%d, change to buffer 1 ..."
,
pioinfo
->
rank
,
id
);
xdebug3
(
"IOPE%d:
fileID
=%d, change to buffer 1 ..."
,
pioinfo
->
rank
,
fileID
);
of
->
db
=
of
->
db1
;
}
...
...
@@ -183,9 +183,9 @@ void writeMPINONB ( aFiledataM *of, int id )
/***************************************************************/
size_t
fwMPINONB
(
int
id
,
int
tsI
d
,
const
void
*
buffer
,
size_t
len
)
size_t
fwMPINONB
(
int
fileID
,
int
tsI
D
,
const
void
*
buffer
,
size_t
len
)
{
static
int
oldTsI
d
=
0
;
static
int
oldTsI
D
=
0
;
int
error
=
0
;
int
flush
=
0
;
int
filled
=
0
;
...
...
@@ -193,28 +193,28 @@ size_t fwMPINONB ( int id, int tsId, const void *buffer, size_t len )
node
*
curr
;
char
errorString
[
maxErrorString
];
flush
=
(
tsI
d
!=
oldTsI
d
)
?
1
:
0
;
flush
=
(
tsI
D
!=
oldTsI
D
)
?
1
:
0
;
if
(
flush
==
1
)
{
xdebug3
(
"IOPE%d: tsI
d
= %d, flush buffer"
,
pioinfo
->
rank
,
tsI
d
);
xdebug3
(
"IOPE%d: tsI
D
= %d, flush buffer"
,
pioinfo
->
rank
,
tsI
D
);
curr
=
bibAFiledataM
->
head
;
while
(
curr
)
{
writeMPINONB
((
aFiledataM
*
)
curr
->
val
,
curr
->
idx
);
curr
=
curr
->
next
;
}
oldTsI
d
=
tsI
d
;
oldTsI
D
=
tsI
D
;
MPI_Barrier
(
pioinfo
->
comm
);
}
of
=
(
aFiledataM
*
)
queueIdx2val
(
bibAFiledataM
,
id
);
of
=
(
aFiledataM
*
)
queueIdx2val
(
bibAFiledataM
,
fileID
);
filled
=
dbuffer_push
(
of
->
db
,
(
unsigned
char
*
)
buffer
,
len
);
xdebug3
(
"IOPE%d:
id
= %d, tsI
d
= %d,"
xdebug3
(
"IOPE%d:
fileID
= %d, tsI
D
= %d,"
" pushed data on buffer, filled = %d"
,
pioinfo
->
rank
,
id
,
tsI
d
,
filled
);
pioinfo
->
rank
,
fileID
,
tsI
D
,
filled
);
if
(
filled
==
1
)
{
...
...
@@ -222,7 +222,7 @@ size_t fwMPINONB ( int id, int tsId, const void *buffer, size_t len )
error
=
filled
;
else
{
writeMPINONB
(
of
,
id
);
writeMPINONB
(
of
,
fileID
);
error
=
dbuffer_push
(
of
->
db
,
(
unsigned
char
*
)
buffer
,
len
);
}
...
...
@@ -230,7 +230,7 @@ size_t fwMPINONB ( int id, int tsId, const void *buffer, size_t len )
if
(
error
==
1
)
{
sprintf
(
errorString
,
"did not succeed filling output buffer,
id=%d"
,
id
);
sprintf
(
errorString
,
"did not succeed filling output buffer,
fileID=%d"
,
fileID
);
xabort
(
errorString
);
}
...
...
@@ -239,7 +239,7 @@ size_t fwMPINONB ( int id, int tsId, const void *buffer, size_t len )
/***************************************************************/
int
fcMPINONB
(
int
id
)
int
fcMPINONB
(
int
fileID
)
{
aFiledataM
*
of
;
int
iret
;
...
...
@@ -247,19 +247,19 @@ int fcMPINONB ( int id )
char
errorString
[
maxErrorString
];
xdebug
(
"IOPE%d: write buffer, close file and cleanup, in"
,
pioinfo
->
rank
,
id
);
pioinfo
->
rank
,
fileID
);
if
(
!
(
of
=
(
aFiledataM
*
)
queueIdx2val
(
bibAFiledataM
,
id
)))
if
(
!
(
of
=
(
aFiledataM
*
)
queueIdx2val
(
bibAFiledataM
,
fileID
)))
{
sprintf
(
errorString
,
"pio_queue,
id
=%d not found"
,
id
);
sprintf
(
errorString
,
"pio_queue,
fileID
=%d not found"
,
fileID
);
xabort
(
errorString
);
}
writeMPINONB
(
of
,
id
);
writeMPINONB
(
of
,
fileID
);
/* dequeue file element */
iret
=
queueDelNode
(
bibAFiledataM
,
id
);
iret
=
queueDelNode
(
bibAFiledataM
,
fileID
);
if
(
!
bibAFiledataM
->
head
)
{
...
...
src/pio_posixfpguardsendrecv.c
View file @
cace7a29
...
...
@@ -73,9 +73,7 @@ static aFiledataPF *initAFiledataPF ( const char *key, size_t bs)
/* init output buffer */
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in initAFiledataPF(): name=%s, init output buffer
\n
"
,
pioinfo
->
rank
,
afd
->
name
);
xdebug
(
" name=%s, init output buffer"
,
afd
->
name
);
iret
=
dbuffer_init
(
&
(
afd
->
db1
),
afd
->
size
);
iret
+=
dbuffer_init
(
&
(
afd
->
db2
),
afd
->
size
);
...
...
@@ -86,10 +84,7 @@ static aFiledataPF *initAFiledataPF ( const char *key, size_t bs)
afd
->
db
=
afd
->
db1
;
/* open file */
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in initAFiledataPF(): name=%s, open file
\n
"
,
pioinfo
->
rank
,
afd
->
name
);
xdebug
(
"name=%s, open file"
,
afd
->
name
);
if
(
(
afd
->
fp
=
fopen
(
afd
->
name
,
"w"
))
==
NULL
)
{
...
...
@@ -131,9 +126,7 @@ int destroyAFiledataPF ( void *v )
/* close file */
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in destroyAFiledataPF(): name=%s, close file
\n
"
,
pioinfo
->
rank
,
afd
->
name
);
xdebug
(
"name=%s, close file"
,
afd
->
name
);
if
((
iret
=
fclose
(
afd
->
fp
))
==
EOF
)
{
...
...
@@ -143,9 +136,7 @@ int destroyAFiledataPF ( void *v )
/* file closed, cleanup */
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in destroyAFiledataPF(): name=%s, file closed, cleanup ...
\n
"
,
pioinfo
->
rank
,
afd
->
name
);
xdebug
(
"name=%s, file closed, cleanup ..."
,
afd
->
name
);
dbuffer_cleanup
(
&
(
afd
->
db1
));
dbuffer_cleanup
(
&
(
afd
->
db2
));
...
...
@@ -199,23 +190,20 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
long
amount
;
char
errorString
[
maxErrorString
];
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpGuard(), ncollectors=%d
\n
"
,
pioinfo
->
rank
,
ncollectors
);
xdebug
(
"ncollectors=%d"
,
ncollectors
);
bibBFiledataPF
=
queueInit
(
destroyBFiledataPF
,
NULL
);
for
(
;;
)
{
MPI_Probe
(
MPI_ANY_SOURCE
,
MPI_ANY_TAG
,
pioinfo
->
comm
,
&
status
);
xdebug
();
xmpi
(
MPI_Probe
(
MPI_ANY_SOURCE
,
MPI_ANY_TAG
,
pioinfo
->
comm
,
&
status
));
source
=
status
.
MPI_SOURCE
;
rtag
=
getTag
(
status
.
MPI_TAG
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpGuard (): receive message from source=%d, id=%d, command=%d ( %s )
\n
"
,
pioinfo
->
rank
,
source
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
]);
xdebug
(
"receive message from source=%d, id=%d, command=%d ( %s )"
,
source
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
]);
switch
(
rtag
->
command
)
{
...
...
@@ -235,22 +223,19 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
*
(
bfd
->
nfinished
+
source
)
=
false
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpGuard(), id=%d, command=%d ( %s ), send offset=%ld
\n
"
,
pioinfo
->
rank
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
bfd
->
offset
);
xdebug
(
"id=%d, command=%d ( %s ), send offset=%ld"
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
bfd
->
offset
);
MPI_Sendrecv
(
&
(
bfd
->
offset
),
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
&
amount
,
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Sendrecv
(
&
(
bfd
->
offset
),
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
&
amount
,
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
pioinfo
->
comm
,
&
status
)
);
bfd
->
offset
+=
amount
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpguard(), id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld
\n
"
,
pioinfo
->
rank
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
amount
,
bfd
->
offset
);
xdebug
(
"id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld"
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
amount
,
bfd
->
offset
);
break
;
...
...
@@ -263,22 +248,19 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
xabort
(
errorString
);
}
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpGuard(), id=%d, command=%d ( %s ), send offset=%ld
\n
"
,
pioinfo
->
rank
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
bfd
->
offset
);
xdebug
(
"id=%d, command=%d ( %s ), send offset=%ld"
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
bfd
->
offset
);
MPI_Sendrecv
(
&
(
bfd
->
offset
),
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
&
amount
,
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Sendrecv
(
&
(
bfd
->
offset
),
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
&
amount
,
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
pioinfo
->
comm
,
&
status
)
);
bfd
->
offset
+=
amount
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpguard(), id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld
\n
"
,
pioinfo
->
rank
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
amount
,
bfd
->
offset
);
xdebug
(
"id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld"
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
amount
,
bfd
->
offset
);
break
;
...
...
@@ -291,22 +273,19 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
xabort
(
errorString
);
}
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpGuard(): id=%d, command=%d ( %s )), send offset=%ld
\n
"
,
pioinfo
->
rank
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
bfd
->
offset
);
xdebug
(
"id=%d, command=%d ( %s )), send offset=%ld"
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
bfd
->
offset
);
MPI_Sendrecv
(
&
(
bfd
->
offset
),
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
&
amount
,
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
pioinfo
->
comm
,
&
status
);
xmpi
(
MPI_Sendrecv
(
&
(
bfd
->
offset
),
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
&
amount
,
1
,
MPI_LONG
,
source
,
status
.
MPI_TAG
,
pioinfo
->
comm
,
&
status
)
);
bfd
->
offset
+=
amount
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fpguard(), id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld
\n
"
,
pioinfo
->
rank
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
amount
,
bfd
->
offset
);
xdebug
(
"id=%d, command=%d ( %s ), recv amount=%ld, set offset=%ld"
,
rtag
->
id
,
rtag
->
command
,
command2charP
[
rtag
->
command
],
amount
,
bfd
->
offset
);
bfd
->
nfinished
[
source
]
=
true
;
...
...
@@ -350,15 +329,13 @@ void writePF ( aFiledataPF *afd, int id )
tag
=
setTag
(
id
,
afd
->
command
);
MPI_Sendrecv
(
&
amountL
,
1
,
MPI_LONG
,
pioinfo
->
specialRank
,
tag
,
&
offset
,
1
,
MPI_LONG
,
pioinfo
->
specialRank
,
tag
,
pioinfo
->
comm
,
&
status
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in writePF (), id=%d, command=%d, amount=%llu, send amountL=%ld, recv offset=%ld
\n
"
,
pioinfo
->
rank
,
id
,
afd
->
command
,
(
unsigned
long
long
)
amount
,
amountL
,
offset
);
xmpi
(
MPI_Sendrecv
(
&
amountL
,
1
,
MPI_LONG
,
pioinfo
->
specialRank
,
tag
,
&
offset
,
1
,
MPI_LONG
,
pioinfo
->
specialRank
,
tag
,
pioinfo
->
comm
,
&
status
));
xdebug
(
"id=%d, command=%d, amount=%llu, send amountL=%ld, recv offset=%ld"
,
id
,
afd
->
command
,
(
unsigned
long
long
)
amount
,
amountL
,
offset
);
/* write buffer */
...
...
@@ -375,12 +352,10 @@ void writePF ( aFiledataPF *afd, int id )
(
unsigned
long
long
)
written
);
xabort
(
errorString
);
}
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in writePF (), written %llu bytes in file %d with offset %ld
\n
"
,
pioinfo
->
rank
,
(
unsigned
long
long
)
written
,
id
,
offset
);
xdebug
(
"written %llu bytes in file %d with offset %ld"
,
(
unsigned
long
long
)
written
,
id
,
offset
);
/* change outputBuffer */
...
...
@@ -388,16 +363,12 @@ void writePF ( aFiledataPF *afd, int id )
if
(
afd
->
db
==
afd
->
db1
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in writePF (): id=%d, change to buffer 2 ...
\n
"
,
pioinfo
->
rank
,
id
);
xdebug
(
"id=%d, change to buffer 2 ..."
,
id
);
afd
->
db
=
afd
->
db2
;
}
else
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in writePF (): id=%d, change to buffer 1 ...
\n
"
,
pioinfo
->
rank
,
id
);
xdebug
(
"id=%d, change to buffer 1 ..."
,
id
);
afd
->
db
=
afd
->
db1
;
}
...
...
@@ -407,9 +378,9 @@ void writePF ( aFiledataPF *afd, int id )
/***************************************************************/
size_t
fwPOSIXFPGUARDSENDRECV
(
int
id
,
int
tsI
d
,
const
void
*
buffer
,
size_t
len
)
size_t
fwPOSIXFPGUARDSENDRECV
(
int
fileID
,
int
tsI
D
,
const
void
*
buffer
,
size_t
len
)
{
static
int
oldTsI
d
=
0
;
static
int
oldTsI
D
=
0
;
int
error
=
0
;
int
flush
=
0
;
int
filled
=
0
;
...
...
@@ -417,29 +388,26 @@ size_t fwPOSIXFPGUARDSENDRECV( int id, int tsId, const void *buffer, size_t len
node
*
curr
;
char
errorString
[
maxErrorString
];
flush
=
(
tsI
d
!=
oldTsI
d
)
?
1
:
0
;
flush
=
(
tsI
D
!=
oldTsI
D
)
?
1
:
0
;
if
(
flush
==
1
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fileWriter (): tsId = %d, flush buffer
\n
"
,
pioinfo
->
rank
,
tsId
);
xdebug
(
"tsID = %d, flush buffer"
,
tsID
);
curr
=
bibAFiledataPF
->
head
;
while
(
curr
)
{
writePF
((
aFiledataPF
*
)
curr
->
val
,
curr
->
idx
);
curr
=
curr
->
next
;
}
oldTsI
d
=
tsI
d
;
MPI_Barrier
(
pioinfo
->
collectorComm
);
oldTsI
D
=
tsI
D
;
xmpi
(
MPI_Barrier
(
pioinfo
->
collectorComm
)
);
}
afd
=
(
aFiledataPF
*
)
queueIdx2val
(
bibAFiledataPF
,
id
);
afd
=
(
aFiledataPF
*
)
queueIdx2val
(
bibAFiledataPF
,
fileID
);
filled
=
dbuffer_push
(
afd
->
db
,
(
unsigned
char
*
)
buffer
,
len
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fileWriter (): id = %d, tsId = %d, pushed data on buffer, filled = %d
\n
"
,
pioinfo
->
rank
,
id
,
tsId
,
filled
);
xdebug
(
"fileID = %d, tsID = %d, pushed data on buffer, filled = %d"
,
fileID
,
tsID
,
filled
);
if
(
filled
==
1
)
{
...
...
@@ -447,7 +415,7 @@ size_t fwPOSIXFPGUARDSENDRECV( int id, int tsId, const void *buffer, size_t len
error
=
filled
;
else
{
writePF
(
afd
,
id
);
writePF
(
afd
,
fileID
);
error
=
dbuffer_push
(
afd
->
db
,
(
unsigned
char
*
)
buffer
,
len
);
}
...
...
@@ -455,7 +423,7 @@ size_t fwPOSIXFPGUARDSENDRECV( int id, int tsId, const void *buffer, size_t len
if
(
error
==
1
)
{
sprintf
(
errorString
,
"did not succeed filling output buffer,
id=%d"
,
id
);
sprintf
(
errorString
,
"did not succeed filling output buffer,
fileID=%d"
,
fileID
);
xabort
(
errorString
);
}
...
...
@@ -469,9 +437,7 @@ int fcPOSIXFPGUARDSENDRECV ( int id )
aFiledataPF
*
afd
;
int
iret
;
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fileClose (%d ): write buffer, close file and cleanup
\n
"
,
pioinfo
->
rank
,
id
);
xdebug
(
"write buffer, close file and cleanup"
,
id
);
afd
=
(
aFiledataPF
*
)
queueIdx2val
(
bibAFiledataPF
,
id
);
...
...
@@ -485,9 +451,7 @@ int fcPOSIXFPGUARDSENDRECV ( int id )
if
(
!
bibAFiledataPF
->
head
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in fileClose (): cleanup queue
\n
"
,
pioinfo
->
rank
);
xdebug
(
"cleanup queue"
);
queueDestroy
(
bibAFiledataPF
);
}
...
...
@@ -538,9 +502,8 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
if
(
firstOpen
)
{
if
(
ddebug
&&
pioinfo
->
rank
==
0
)
fprintf
(
stderr
,
"pe%d in pioFileOpenW(): name=%s, broadcast buffersize to collectors ...
\n
"
,
pioinfo
->
rank
,
filename
);
xdebug
(
"name=%s, broadcast buffersize to collectors ..."
,
filename
);
bcastRank
=
0
;
bcastComm
=
pioinfo
->
collectorComm
;
...
...
@@ -553,7 +516,7 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
buffersize
=
initial_buffersize
;
}
MPI_Bcast
(
&
buffersize
,
1
,
MPI_LONG
,
0
,
bcastComm
);
xmpi
(
MPI_Bcast
(
&
buffersize
,
1
,
MPI_LONG
,
0
,
bcastComm
)
);
firstOpen
=
false
;
}
...
...
@@ -570,10 +533,8 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
xabort
(
errorString
);
}
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in pioFileOpenW(): name=%s, init and enqueued aFiledataPF, return id = %d
\n
"
,
pioinfo
->
rank
,
filename
,
id
);
xdebug
(
"name=%s, init and enqueued aFiledataPF, return id = %d"
,
filename
,
id
);
return
id
;
}
...
...
@@ -587,26 +548,21 @@ int initPOSIXFPGUARDSENDRECV ()
int
ioRanks
[
1
];
if
(
pioinfo
->
size
<
2
)
xabort
(
"usage: #pes/#nodes >= 2"
);
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in initPOSIXFPGUARDSENDRECV()
\n
"
,
pioinfo
->
rank
);
xdebug
();
ncollectors
=
pioinfo
->
size
-
1
;
pioinfo
->
specialRank
=
pioinfo
->
size
-
1
;
ioRanks
[
0
]
=
pioinfo
->
specialRank
;
MPI_Comm_group
(
pioinfo
->
comm
,
&
streamGroup
);
MPI_Group_excl
(
streamGroup
,
1
,
ioRanks
,
&
collectorGroup
);
MPI_Comm_create
(
pioinfo
->
comm
,
collectorGroup
,
&
pioinfo
->
collectorComm
);
xmpi
(
MPI_Comm_group
(
pioinfo
->
comm
,
&
streamGroup
)
);
xmpi
(
MPI_Group_excl
(
streamGroup
,
1
,
ioRanks
,
&
collectorGroup
)
);
xmpi
(
MPI_Comm_create
(
pioinfo
->
comm
,
collectorGroup
,
&
pioinfo
->
collectorComm
)
);
if
(
pioinfo
->
rank
==
pioinfo
->
specialRank
)
{
if
(
ddebug
)
fprintf
(
stderr
,
"pe%d in initPOSIXFPGUARDSENDREC(): Call fpGuard(), ncollectors = %d ...
\n
"
,
pioinfo
->
rank
,
ncollectors
);
xdebug
(
"Call fpGuard(), ncollectors = %d ..."
,
ncollectors
);
fpgPOSIXFPGUARDSENDRECV
(
ncollectors
);
collectingData
=
0
;
...
...
src/pio_queue.c
View file @
cace7a29
...
...
@@ -12,16 +12,13 @@