Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
mpim-sw
libcdi
Commits
16ba2827
Commit
16ba2827
authored
Oct 06, 2010
by
Deike Kleberg
Browse files
some things fixed
parent
9768c4be
Changes
11
Hide whitespace changes
Inline
Side-by-side
pioExamples/cdi_write_more_nodes.F90
View file @
16ba2827
...
...
@@ -90,7 +90,6 @@ prbl: DO i = 1,nProblems
load
(
currIndex
)
=
load
(
currIndex
)
+
sortedProblems
(
i
,
1
)
ENDDO
prbl
!write(*,*)
!write(*,'(1X,F9.2)') mean
!write(*,*)
...
...
@@ -102,7 +101,6 @@ ENDDO prbl
!write(*,*)
!WRITE(*,'(1X,10I4)') problemMapping
CONTAINS
RECURSIVE
SUBROUTINE
iQsort
(
A
)
...
...
@@ -167,7 +165,7 @@ END MODULE loadbalancing
INTEGER
,
PARAMETER
::
NLON
=
384
! Number of longitudes 384
INTEGER
,
PARAMETER
::
NLAT
=
192
! Number of latitudes 192
INTEGER
,
PARAMETER
::
NLEV
=
96
! Number of levels 96
INTEGER
,
PARAMETER
::
NTIME
=
10
! Number of time steps 124
INTEGER
,
PARAMETER
::
NTIME
=
10
! Number of time steps 124
INTEGER
,
PARAMETER
::
NFILES
=
10
! Number of files 7
INTEGER
,
PARAMETER
::
SIZESSTREAMS
(
NFILES
)
=
(/
35
,
35
,
35
,
35
,
35
,
&
35
,
35
,
35
,
35
,
35
/)
...
...
@@ -183,20 +181,23 @@ END MODULE loadbalancing
INTEGER
,
ALLOCATABLE
::
lstreamfirst
(
:
),
lstreamlast
(
:
)
INTEGER
,
ALLOCATABLE
::
varIDs
(
:,
:
)
INTEGER
::
pioComm
,
pioComm_NODE
,
rank
,
error
,
status
#ifndef NOMPI
INTEGER
::
pioComm
,
pioComm_NODE
,
rank
,
error
INTEGER
::
ncollectors
CHARACTER
(
len
=
8
)
::
name
#endif
INTEGER
::
status
INTEGER
::
gridID
,
zaxisID
,
taxisID
INTEGER
::
collectingData
,
ncollectors
INTEGER
::
collectingData
INTEGER
::
sID
,
tsID
,
i
,
j
,
nmiss
INTEGER
::
PTYPE
INTEGER
::
NNODES
,
NFILES_NODE
INTEGER
::
mapping
(
NFILES
),
mycolor
INTEGER
,
ALLOCATABLE
::
colors
(
:
)
INTEGER
,
ALLOCATABLE
::
SIZESSTREAMS_NODE
(
:
)
CHARACTER
::
arg
CHARACTER
(
len
=
8
),
ALLOCATABLE
::
FILENAMES_NODE
(
:
)
CHARACTER
(
len
=
8
)
::
name
REAL
(
dp
)
::
startTime
,
stopTime
REAL
(
dp
)
::
accumOpen
,
accumClose
,
accumWrite
...
...
@@ -249,7 +250,6 @@ END MODULE loadbalancing
CALL
initCoords
()
#ifndef NOMPI
ALLOCATE
(
colors
(
NNODES
),
stat
=
status
)
collectingData
=
pioInit
(
PTYPE
,
pioComm
,
mycolor
,
NNODES
,
&
pioComm_NODE
,
ncollectors
)
...
...
@@ -461,8 +461,6 @@ END MODULE loadbalancing
1
IF
(
ddebug
==
1
)
THEN
WRITE
(
*
,
*
)
'MAIN: pe'
,
rank
,
'in main() at label "finish"'
END
IF
DEALLOCATE
(
colors
,
stat
=
status
)
IF
(
rank
==
0
)
THEN
CALL
MPI_REDUCE
(
MPI_IN_PLACE
,
accumOpen
,
1
,
MPI_DOUBLE_PRECISION
,
&
...
...
@@ -505,11 +503,12 @@ END MODULE loadbalancing
CONTAINS
!********************************************
#ifndef NOMPI
SUBROUTINE
readArgs
(
argMode
,
argNodes
)
INTEGER
,
INTENT
(
OUT
)
::
argMode
,
argNodes
INTEGER
::
numarg
,
inum
,
length
CHARACTER
::
arg
numarg
=
command_argument_count
()
...
...
@@ -545,6 +544,7 @@ CONTAINS
CALL
MPI_ABORT
(
pioComm
,
1
,
error
)
END
SUBROUTINE
readArgs
#endif
!********************************************
...
...
@@ -586,7 +586,7 @@ CONTAINS
END
SUBROUTINE
initCoords
!********************************************
#ifndef NOMPI
SUBROUTINE
initLocalstream
(
npes
,
sizestream
,
ifirst
,
ilast
)
INTEGER
,
INTENT
(
IN
)
::
npes
,
sizestream
...
...
@@ -616,5 +616,5 @@ CONTAINS
END
IF
END
SUBROUTINE
initLocalstream
#endif
END
src/cdi.h
View file @
16ba2827
...
...
@@ -183,11 +183,11 @@ extern "C" {
/*#ifndef NOMPI*/
#define PIO_NONE 0
#define PIO_MPI_NONB 1
#define PIO_POSIX_
ASYNCH
2
#define PIO_POSIX_
FPGUARD_SENDRECV
3
#define PIO_POSIX_FPGUARD_
THREAD
4
#define PIO_POSIX_
NONB
2
#define PIO_POSIX_
ASYNCH
3
#define PIO_POSIX_FPGUARD_
SENDRECV
4
#define PIO_POSIX_FPGUARD_THREAD_REFUSE 5
#define PIO_POSIX_
NONB
6
#define PIO_POSIX_
FPGUARD_THREAD
6
int
pioInit
(
int
,
int
,
int
*
,
int
,
int
*
,
int
*
);
void
pioFinalize
(
void
);
...
...
src/cdi.inc
View file @
16ba2827
...
...
@@ -4,7 +4,7 @@
!
!
Author
:
!
-------
!
Uwe
Schulzweida
,
MPI
-
MET
,
Hamburg
,
August
2010
!
Uwe
Schulzweida
,
MPI
-
MET
,
Hamburg
,
October
2010
!
INTEGER
CDI_UNDEFID
...
...
@@ -309,16 +309,16 @@
PARAMETER
(
PIO_NONE
=
0
)
INTEGER
PIO_MPI_NONB
PARAMETER
(
PIO_MPI_NONB
=
1
)
INTEGER
PIO_POSIX_NONB
PARAMETER
(
PIO_POSIX_NONB
=
2
)
INTEGER
PIO_POSIX_ASYNCH
PARAMETER
(
PIO_POSIX_ASYNCH
=
2
)
PARAMETER
(
PIO_POSIX_ASYNCH
=
3
)
INTEGER
PIO_POSIX_FPGUARD_SENDRECV
PARAMETER
(
PIO_POSIX_FPGUARD_SENDRECV
=
3
)
INTEGER
PIO_POSIX_FPGUARD_THREAD
PARAMETER
(
PIO_POSIX_FPGUARD_THREAD
=
4
)
PARAMETER
(
PIO_POSIX_FPGUARD_SENDRECV
=
4
)
INTEGER
PIO_POSIX_FPGUARD_THREAD_REFUSE
PARAMETER
(
PIO_POSIX_FPGUARD_THREAD_REFUSE
=
5
)
INTEGER
PIO_POSIX_
NONB
PARAMETER
(
PIO_POSIX_
NONB
=
6
)
INTEGER
PIO_POSIX_
FPGUARD_THREAD
PARAMETER
(
PIO_POSIX_
FPGUARD_THREAD
=
6
)
INTEGER
pioInit
!
(
INTEGER
,
!
INTEGER
,
...
...
src/pio.c
View file @
16ba2827
#ifndef NOMPI
#include
<stdbool.h>
#include
<stdio.h>
#include
<stdlib.h>
#include
<string.h>
#include
<unistd.h>
#ifndef NOMPI
#include
"mpi.h"
#include
"cdi.h"
#include
"pio.h"
#include
"pio_impl.h"
#endif
bool
ddebug
=
false
;
#ifndef NOMPI
char
*
command2charP
[
6
]
=
{
"IO_Open_file"
,
"IO_Close_file"
,
"IO_Get_fp"
,
"IO_Set_fp"
,
"IO_Send_buffer"
,
"IO_Finalize"
};
...
...
@@ -22,7 +27,7 @@ long initial_buffersize = 16 * 1024 * 1024;
/* 16 * 1024; */
/* 4 * 1024; */
int
maxPtype
=
6
;
int
maxPtype
=
4
;
int
maxNnodes
=
249
;
int
tagKey
=
100
;
...
...
@@ -87,7 +92,22 @@ void check_mpi_status ( int line, MPI_Comm *comm, MPI_Status *status, int iret )
return
;
}
/***************************************************************/
/***************************************************************/
void
*
xmalloc
(
size_t
size
)
{
void
*
value
=
malloc
(
size
);
if
(
value
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: malloc didn't succeed, memory exausted
\n
"
);
perror
(
"xmalloc"
);
MPI_Abort
(
MPI_COMM_WORLD
,
1
);
}
return
value
;
}
/***************************************************************/
int
setTag
(
int
ID
,
int
sc
)
{
...
...
@@ -100,7 +120,7 @@ rTag * getTag ( int tag )
{
rTag
*
rtag
;
rtag
=
(
rTag
*
)
malloc
(
sizeof
(
rTag
));
rtag
=
(
rTag
*
)
x
malloc
(
sizeof
(
rTag
));
rtag
->
id
=
tag
/
tagKey
;
rtag
->
command
=
tag
%
tagKey
;
...
...
@@ -222,15 +242,20 @@ void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
{
int
size
,
rank
,
len
,
npes_node
,
key
,
test
,
i
,
j
;
char
*
myHost
,
**
allHosts
,
*
curr
;
char
*
myHost
,
**
allHosts
,
*
allHosts0
,
*
curr
;
char
hostname
[
MPI_MAX_PROCESSOR_NAME
+
1
];
MPI_Comm_size
(
commF2C
,
&
size
);
MPI_Comm_rank
(
commF2C
,
&
rank
);
check_mpi
(
__LINE__
,
MPI_Comm_size
(
commF2C
,
&
size
));
check_mpi
(
__LINE__
,
MPI_Comm_rank
(
commF2C
,
&
rank
));
myHost
=
(
char
*
)
malloc
(
(
MPI_MAX_PROCESSOR_NAME
+
1
)
*
sizeof
(
char
));
myHost
=
(
char
*
)
x
malloc
(
MPI_MAX_PROCESSOR_NAME
*
sizeof
(
char
));
MPI_Get_processor_name
(
myHost
,
&
len
);
memset
(
myHost
,
0
,
MPI_MAX_PROCESSOR_NAME
*
sizeof
(
char
));
check_mpi
(
__LINE__
,
MPI_Get_processor_name
(
myHost
,
&
len
));
if
(
ddebug
)
{
...
...
@@ -239,33 +264,21 @@ void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
fprintf
(
stdout
,
"pe%d: myHost = %s
\n
"
,
rank
,
hostname
);
}
allHosts
=
(
char
**
)
xmalloc
(
size
*
sizeof
(
char
*
));
allHosts
[
0
]
=
(
char
*
)
xmalloc
(
size
*
MPI_MAX_PROCESSOR_NAME
*
sizeof
(
char
));
allHosts0
=
allHosts
[
0
];
allHosts
=
(
char
**
)
malloc
(
size
*
sizeof
(
char
*
));
allHosts
[
0
]
=
(
char
*
)
malloc
(
size
*
MPI_MAX_PROCESSOR_NAME
*
sizeof
(
char
));
for
(
i
=
1
;
i
<
size
;
i
++
)
allHosts
[
i
]
=
allHosts
[
0
]
+
i
*
MPI_MAX_PROCESSOR_NAME
;
MPI_Allgather
(
myHost
,
MPI_MAX_PROCESSOR_NAME
,
MPI_CHAR
,
&
(
allHosts
[
0
][
0
]
),
MPI_MAX_PROCESSOR_NAME
,
MPI_CHAR
,
commF2C
);
if
(
ddebug
)
for
(
i
=
0
;
i
<
size
;
i
++
)
{
strcpy
(
hostname
,
&
(
allHosts
[
i
][
0
]
));
fprintf
(
stdout
,
"pe%d: before qsort, allHosts[%d][0] = %s
\n
"
,
rank
,
i
,
hostname
);
}
check_mpi
(
__LINE__
,
MPI_Allgather
(
myHost
,
MPI_MAX_PROCESSOR_NAME
,
MPI_CHAR
,
&
(
allHosts
[
0
][
0
]
),
MPI_MAX_PROCESSOR_NAME
,
MPI_CHAR
,
commF2C
));
qsort
(
allHosts
,
size
,
sizeof
(
char
*
),
cmpr
);
if
(
ddebug
)
for
(
i
=
0
;
i
<
size
;
i
++
)
{
strcpy
(
hostname
,
&
(
allHosts
[
i
][
0
]
));
fprintf
(
stdout
,
"pe%d: after qsort, allHosts[%d][0] = %s
\n
"
,
rank
,
i
,
hostname
);
}
*
color
=
0
;
i
=
0
;
...
...
@@ -287,28 +300,26 @@ void setPioCommunicator ( MPI_Comm *myComm, MPI_Comm commF2C, int *color,
npes_node
=
size
/
nnodes
;
key
=
rank
%
npes_node
;
MPI_Comm_split
(
commF2C
,
*
color
,
key
,
myComm
);
MPI_Errhandler_set
(
*
myComm
,
MPI_ERRORS_RETURN
);
if
(
ddebug
)
fprintf
(
stdout
,
"pe%d, after mpi_comm_split, color=%d
\n
"
,
rank
,
*
color
);
check_mpi
(
__LINE__
,
MPI_Comm_split
(
commF2C
,
*
color
,
key
,
myComm
));
check_mpi
(
__LINE__
,
MPI_Errhandler_set
(
*
myComm
,
MPI_ERRORS_RETURN
));
*
myComm2F
=
0
;
*
myComm2F
=
MPI_COMM_NULL
;
if
((
*
myComm2F
=
MPI_Comm_c2f
(
*
myComm
))
==
0
)
{
fprintf
(
stderr
,
"mpi_comm_c2f didn't succeed
\n
"
);
MPI_Abort
(
commF2C
,
1
);
}
if
(
ddebug
)
fprintf
(
stdout
,
"pe%d, after mpi_comm_c2f, color=%d
\n
"
,
rank
,
*
color
);
free
(
allHosts
[
0
]
);
free
(
allHosts0
);
free
(
allHosts
);
free
(
myHost
);
if
(
ddebug
)
fprintf
(
stdout
,
"pe%d in setPioCommunicator, color=%d, before return
\n
"
,
rank
,
*
color
);
return
;
}
...
...
@@ -321,53 +332,54 @@ int pioInit ( int ptype, int commF, int *color, int nnodes, int *pioComm, int *n
{
int
collectingData
=
1
;
int
rank
;
#ifndef NOMPI
MPI_Comm
comm
;
int
size
,
rank
;
comm
=
MPI_COMM_NULL
;
if
(
ptype
<
0
||
ptype
>
maxPtype
)
if
(
(
comm
=
MPI_Comm_f2c
((
MPI_Fint
)
commF
))
==
NULL
)
{
fprintf
(
stderr
,
"1. arg of pioInit is no valid modus
\n
"
);
MPI_Abort
(
MPI_COMM_WORLD
,
1
);
fprintf
(
stderr
,
"2. arg of pioInit is no valid Fortran handle "
"to a mpi communicator
\n
"
);
MPI_Abort
(
comm
,
1
);
}
if
((
comm
=
MPI_Comm_f2c
((
MPI_Fint
)
commF
))
==
NULL
)
check_mpi
(
__LINE__
,
MPI_Comm_size
(
comm
,
&
size
));
check_mpi
(
__LINE__
,
MPI_Comm_rank
(
comm
,
&
rank
));
if
(
ptype
<
0
||
ptype
>
maxPtype
)
{
fprintf
(
stderr
,
"2. arg of pioInit is no valid Fortran handle to a mpi communicator
\n
"
);
MPI_Abort
(
MPI_COMM_WORLD
,
1
);
if
(
rank
==
0
)
fprintf
(
stderr
,
"1. arg of pioInit is no valid modus
\n
"
);
MPI_Abort
(
comm
,
1
);
}
if
(
nnodes
<
1
||
nnodes
>
maxNnodes
)
{
fprintf
(
stderr
,
"4. arg of pioInit is no valid no for nnodes
\n
"
);
MPI_Abort
(
MPI_COMM_WORLD
,
1
);
if
(
rank
==
0
)
fprintf
(
stderr
,
"4. arg of pioInit is no valid no for nnodes
\n
"
);
MPI_Abort
(
comm
,
1
);
}
if
(
!
((
ptype
==
PIO_NONE
&&
nnodes
==
1
)
||
(
ptype
==
PIO_MPI_NONB
)
||
(
ptype
==
PIO_POSIX_FPGUARD_SENDRECV
&&
nnodes
==
1
)
||
(
ptype
==
PIO_POSIX_NONB
&&
nnodes
==
1
)))
{
MPI_Comm_rank
(
comm
,
&
rank
);
if
(
ptype
==
PIO_NONE
&&
(
size
!=
1
||
nnodes
!=
1
))
{
if
(
rank
==
0
)
fprintf
(
stderr
,
"
\n\n
pioInit(): arg(1) PTYPE=%d and arg(4) NNODES=%d are not valid. \
\n
Possible combinations are:\
\n
ptype = PIO_NONE (:=0),
\t\t
nnodes = 1\
\n
ptype = PIO_MPI_NONB (:=1)\
\n
ptype = PIO_POSIX_FPGUARD_SENDRECV (:=3),
\t
nnodes = 1\
\n
ptype = PIO_POSIX_NONB (:=6),
\t\t
nnodes = 1
\n\n\n
"
,
ptype
,
nnodes
);
fprintf
(
stderr
,
"
\n\n
pioInit(): combination of "
"arg(1) PTYPE=%d, arg(4) NNODES=%d and npe=%d is not valid.
\n
"
"Possible modus is:
\n
"
"ptype = PIO_NONE (:=0),
\t\t
nnodes = 1,
\t\t
npe = 1
\n\n
"
,
ptype
,
nnodes
,
size
);
MPI_Abort
(
comm
,
1
);
}
if
((
pioinfo
=
(
pioInfo
*
)
malloc
(
sizeof
(
pioInfo
)))
==
NULL
)
{
fprintf
(
stderr
,
"malloc of pioInfo didn't succeed
\n
"
);
MPI_Abort
(
MPI_COMM_WORLD
,
1
);
}
pioinfo
=
(
pioInfo
*
)
xmalloc
(
sizeof
(
pioInfo
));
pioinfo
->
type
=
ptype
;
pioinfo
->
collectorComm
=
MPI_COMM_NULL
;
...
...
@@ -376,18 +388,10 @@ int pioInit ( int ptype, int commF, int *color, int nnodes, int *pioComm, int *n
MPI_Comm_rank
(
pioinfo
->
comm
,
&
(
pioinfo
->
rank
));
MPI_Comm_size
(
pioinfo
->
comm
,
&
(
pioinfo
->
size
));
if
(
pioinfo
->
type
==
PIO_NONE
&&
pioinfo
->
size
!=
1
)
{
fprintf
(
stderr
,
"PTYPE should be set to a parallel I/O type or npe should be 1."
);
fprintf
(
stderr
,
"PTYPE = %d, npe = %d
\n
"
,
pioinfo
->
type
,
pioinfo
->
size
);
MPI_Abort
(
pioinfo
->
comm
,
1
);
}
if
(
ddebug
&&
pioinfo
->
rank
==
0
)
fprintf
(
stdout
,
"pe%d in pioDefPtype(), ptype=%d, initial_buffersize=%ld: init pioinfo ...
\n
"
,
"pe%d in pioDefPtype(), ptype=%d, initial_buffersize=%ld: "
"after init pioinfo ...
\n
"
,
pioinfo
->
rank
,
pioinfo
->
type
,
initial_buffersize
);
switch
(
pioinfo
->
type
)
...
...
@@ -407,13 +411,16 @@ int pioInit ( int ptype, int commF, int *color, int nnodes, int *pioComm, int *n
case
PIO_POSIX_FPGUARD_THREAD_REFUSE
:
collectingData
=
initPOSIXFPGUARDTHREADREFUSE
(
ncollectors
);
break
;
case
PIO_POSIX_NONB
:
case
PIO_POSIX_NONB
:
collectingData
=
initPOSIXNONB
(
ncollectors
);
break
;
}
#endif
if
(
ddebug
)
fprintf
(
stdout
,
"pe in pioinit out
\n
"
);
return
collectingData
;
}
...
...
src/pio_mpinonb.c
View file @
16ba2827
...
...
@@ -49,11 +49,11 @@ static aFiledataM *initAFiledataMPINONB ( const char *filename, size_t bs )
fprintf
(
stdout
,
"pe%d initAFiledataMPINONB (): filename=%s, buffersize=%zu, in
\n
"
,
pioinfo
->
rank
,
filename
,
bs
);
of
=
(
aFiledataM
*
)
malloc
(
sizeof
(
aFiledataM
));
of
=
(
aFiledataM
*
)
x
malloc
(
sizeof
(
aFiledataM
));
memset
(
of
,
0
,
sizeof
(
aFiledataM
));
len
=
strlen
(
filename
);
of
->
name
=
(
char
*
)
malloc
((
len
)
*
sizeof
(
char
));
of
->
name
=
(
char
*
)
x
malloc
((
len
)
*
sizeof
(
char
));
strcpy
(
of
->
name
,
filename
);
of
->
size
=
bs
;
...
...
src/pio_posixasynch.c
View file @
16ba2827
...
...
@@ -37,8 +37,6 @@ extern pioInfo *pioinfo;
extern
char
*
token
;
static
MPI_Request
requestA
=
MPI_REQUEST_NULL
;
typedef
struct
{
char
*
name
;
...
...
@@ -47,6 +45,7 @@ typedef struct
struct
dBuffer
*
db2
;
struct
dBuffer
*
db
;
IO_Server_command
command
;
MPI_Request
request
;
}
aFiledataPA
;
typedef
struct
...
...
@@ -77,14 +76,14 @@ static aFiledataPA *initAFiledataPA ( const char *filename, size_t bs )
int
status
;
if
(
ddebug
)
fprintf
(
stdout
,
"pe%d initAFiledataPA (): filename=%s, buffersize=%
d
, in
\n
"
,
fprintf
(
stdout
,
"pe%d initAFiledataPA (): filename=%s, buffersize=%
zu
, in
\n
"
,
pioinfo
->
rank
,
filename
,
bs
);
afd
=
(
aFiledataPA
*
)
malloc
(
sizeof
(
aFiledataPA
));
afd
=
(
aFiledataPA
*
)
x
malloc
(
sizeof
(
aFiledataPA
));
memset
(
afd
,
0
,
sizeof
(
aFiledataPA
));
len
=
strlen
(
filename
);
afd
->
name
=
(
char
*
)
malloc
((
len
)
*
sizeof
(
char
));
afd
->
name
=
(
char
*
)
x
malloc
((
len
)
*
sizeof
(
char
));
strcpy
(
afd
->
name
,
filename
);
afd
->
size
=
bs
;
...
...
@@ -107,6 +106,7 @@ static aFiledataPA *initAFiledataPA ( const char *filename, size_t bs )
afd
->
db
=
afd
->
db1
;
afd
->
command
=
IO_Open_file
;
afd
->
request
=
MPI_REQUEST_NULL
;
if
(
ddebug
)
fprintf
(
stdout
,
"pe%d initAFiledataPA (), enqueued name=%s, return
\n
"
,
...
...
@@ -123,13 +123,13 @@ static bFiledataPA *initBFiledataPA ( char *filename,
int
i
;
if
(
ddebug
)
fprintf
(
stdout
,
"### pe%d initBFiledataPA (): filename=%s, buffersize=%
d
, ncollectors=%d, nPrefetchStreams=%d, in
\n
"
,
fprintf
(
stdout
,
"### pe%d initBFiledataPA (): filename=%s, buffersize=%
zu
, ncollectors=%d, nPrefetchStreams=%d, in
\n
"
,
pioinfo
->
rank
,
filename
,
bs
,
nc
,
nPrefStreams
);
bfd
=
(
bFiledataPA
*
)
malloc
(
sizeof
(
bFiledataPA
));
bfd
=
(
bFiledataPA
*
)
x
malloc
(
sizeof
(
bFiledataPA
));
memset
(
bfd
,
0
,
sizeof
(
bFiledataPA
));
bfd
->
name
=
(
char
*
)
malloc
((
strlen
(
filename
))
*
sizeof
(
char
));
bfd
->
name
=
(
char
*
)
x
malloc
((
strlen
(
filename
))
*
sizeof
(
char
));
strcpy
(
bfd
->
name
,
filename
);
bfd
->
size
=
bs
;
...
...
@@ -142,11 +142,7 @@ static bFiledataPA *initBFiledataPA ( char *filename,
dbuffer_init
(
&
(
bfd
->
fb
),
(
size_t
)(
nPrefStreams
*
bfd
->
size
));
if
((
bfd
->
ctrlBlks
=
(
struct
aiocb
*
)
malloc
(
nPrefStreams
*
sizeof
(
struct
aiocb
)))
==
NULL
)
{
fprintf
(
stderr
,
"Cannot allocate AIO control blocks
\n
"
);
MPI_Abort
(
pioinfo
->
comm
,
1
);
}
bfd
->
ctrlBlks
=
(
struct
aiocb
*
)
xmalloc
(
nPrefStreams
*
sizeof
(
struct
aiocb
));
for
(
i
=
0
;
i
<
nPrefStreams
;
i
++
)
{
...
...
@@ -160,7 +156,7 @@ static bFiledataPA *initBFiledataPA ( char *filename,
bfd
->
prefIndex
=
0
;
bfd
->
offset
=
0
;
bfd
->
finished
=
false
;
bfd
->
nfinished
=
(
bool
*
)
malloc
(
nc
*
sizeof
(
bool
));
bfd
->
nfinished
=
(
bool
*
)
x
malloc
(
nc
*
sizeof
(
bool
));
for
(
i
=
0
;
i
<
nc
;
i
++
)
*
(
bfd
->
nfinished
+
i
)
=
true
;
...
...
@@ -179,10 +175,13 @@ int destroyAFiledataPA ( void *v )
{
int
iret
=
0
;
aFiledataPA
*
afd
=
(
aFiledataPA
*
)
v
;
MPI_Status
status
;
if
(
ddebug
)
fprintf
(
stdout
,
"pe%d destroyAFiledataPA(): name=%s, cleanup, in
\n
"
,
pioinfo
->
rank
,
afd
->
name
);
MPI_Wait
(
&
(
afd
->
request
),
&
status
);
dbuffer_cleanup
(
&
(
afd
->
db1
));
dbuffer_cleanup
(
&
(
afd
->
db2
));
...
...
@@ -314,7 +313,7 @@ void writePA ( bFiledataPA *bfd, long amount )
bfd
->
ctrlBlks
[
bfd
->
currOpIndex
].
aio_offset
=
bfd
->
offset
;
if
(
ddebug
)
fprintf
(
stdout
,
"### pe%d writePA before aio_write(), file %s, aio_nbytes=%
d
, aio_offset=%
ld
\n
"
,
fprintf
(
stdout
,
"### pe%d writePA before aio_write(), file %s, aio_nbytes=%
zu
, aio_offset=%
zu
\n
"
,
pioinfo
->
rank
,
bfd
->
name
,
bfd
->
ctrlBlks
[
bfd
->
currOpIndex
].
aio_nbytes
,
bfd
->
ctrlBlks
[
bfd
->
currOpIndex
].
aio_offset
);
...
...
@@ -327,7 +326,7 @@ void writePA ( bFiledataPA *bfd, long amount )
accumWrite
+=
(
MPI_Wtime
()
-
startTime
);
if
(
ddebug
)
fprintf
(
stdout
,
"### pe%d writePA after aio_write(), file %s, aio_nbytes=%
d
, aio_offset=%
ld
, iret=aio_write()=%d
\n
"
,
fprintf
(
stdout
,
"### pe%d writePA after aio_write(), file %s, aio_nbytes=%
zu
, aio_offset=%
zu
, iret=aio_write()=%d
\n
"
,
pioinfo
->
rank
,
bfd
->
name
,
bfd
->
ctrlBlks
[
bfd
->
currOpIndex
].
aio_nbytes
,
bfd
->
ctrlBlks
[
bfd
->
currOpIndex
].
aio_offset
,
iret
);
...
...
@@ -388,7 +387,8 @@ void pwPOSIXASYNCH ( int ncollectors )
queue
*
bibBFiledataPA
;
long
amount
,
buffersize
;
char
*
messageBuffer
,
*
pMB
,
*
filename
,
*
temp
;
int
messagesize
,
source
,
tag
,
i
,
len
,
id
;
int
messagesize
,
source
,
tag
,
i
,
id
;
size_t
len
;
rTag
*
rtag
;
MPI_Status
status
;
...
...
@@ -399,12 +399,12 @@ void pwPOSIXASYNCH ( int ncollectors )
bibBFiledataPA
=
queueInit
(
destroyBFiledataPA
,
compareNamesBPA
);
for
(
;;
)
{
free
(
messageBuffer
);
{
MPI_Probe
(
MPI_ANY_SOURCE
,
MPI_ANY_TAG
,
pioinfo
->
comm
,
&
status
);
source
=
status
.
MPI_SOURCE
;
tag
=
status
.
MPI_TAG
;
rtag
=
getTag
(
tag
);
rtag
=
getTag
(
tag
);
MPI_Get_count
(
&
status
,
MPI_CHAR
,
&
messagesize
);
...
...
@@ -417,7 +417,15 @@ void pwPOSIXASYNCH ( int ncollectors )
switch
(
rtag
->
command
)
{
case
IO_Open_file
:
messageBuffer
=
(
char
*
)
malloc
(
messagesize
*
sizeof
(
char
));
//messageBuffer = ( char *) xmalloc ( messagesize * sizeof ( char ));
if
((
messageBuffer
=
(
char
*
)
malloc
(
messagesize
*
sizeof
(
char
)))
==
NULL
)
{
perror
(
"### malloc of messageBuffer failed"
);
MPI_Abort
(
pioinfo
->
comm
,
1
);
}
pMB
=
messageBuffer
;
MPI_Recv
(
messageBuffer
,
messagesize
,
MPI_CHAR
,
source
,
...
...
@@ -432,7 +440,7 @@ void pwPOSIXASYNCH ( int ncollectors )
if
(
ddebug
)
fprintf
(
stdout
,
"### pe%d pwPOSIXASYNCH () case %s, filename=%s, buffersize=%d, amount=%d
\n
"
,
"### pe%d pwPOSIXASYNCH () case %s, filename=%s, buffersize=%
l
d, amount=%
l
d
\n
"
,
pioinfo
->
rank
,
command2charP
[
rtag
->
command
],
filename
,