Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
L
libcdi
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
mpim-sw
libcdi
Commits
e2395682
Commit
e2395682
authored
8 years ago
by
Thomas Jahns
Committed by
Sergey Kosukhin
2 years ago
Browse files
Options
Downloads
Patches
Plain Diff
Use reliable file IDs in client part of PIO shared file pointer pwrite method.
parent
b279b2a8
No related branches found
Branches containing commit
No related tags found
Tags containing commit
2 merge requests
!34
Version 2.2.0
,
!13
Consolidation with CDI-PIO (develop)
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/pio_posixfpguardsendrecv.c
+59
-61
59 additions, 61 deletions
src/pio_posixfpguardsendrecv.c
with
59 additions
and
61 deletions
src/pio_posixfpguardsendrecv.c
+
59
−
61
View file @
e2395682
...
...
@@ -45,30 +45,22 @@ struct mwFileBuf
{
char
*
name
;
struct
dBuffer
*
db
;
int
fd
;
int
fd
,
tsID
;
enum
IO_Server_command
command
;
int
tsID
,
fileID
;
};
static
int
lookupMwFBFileID
(
void
*
a
,
void
*
fileID
)
{
return
((
struct
mwFileBuf
*
)
a
)
->
fileID
==
(
int
)
(
intptr_t
)
fileID
;
}
static
listSet
*
openedFiles
;
static
struct
mwFileBuf
*
openFiles
;
static
unsigned
openFilesSize
,
openFilesFill
;
/***************************************************************/
static
struct
mwFileBuf
*
newMultiwriterFileBuf
(
const
char
*
key
,
size_t
bs
)
static
void
newMultiwriterFileBuf
(
struct
mwFileBuf
*
afd
,
const
char
*
filename
,
size_t
bs
)
{
size_t
len
=
strlen
(
key
);
struct
mwFileBuf
*
afd
=
Calloc
(
1
,
sizeof
(
*
afd
));
{
size_t
nameSize
=
len
+
1
;
size_t
nameSize
=
strlen
(
filename
)
+
1
;
char
*
name
=
afd
->
name
=
Malloc
(
nameSize
);
memcpy
(
afd
->
name
,
key
,
nameSize
);
memcpy
(
name
,
filename
,
nameSize
);
}
afd
->
tsID
=
0
;
...
...
@@ -86,14 +78,12 @@ newMultiwriterFileBuf(const char *key, size_t bs)
if
((
afd
->
fd
=
open
(
afd
->
name
,
O_CREAT
|
O_WRONLY
,
0666
))
==
-
1
)
xabort
(
"Failed to open %s"
,
afd
->
name
);
afd
->
command
=
IO_Open_file
;
return
afd
;
}
static
int
deleteMultiwriterFileBuf
(
void
*
v
)
deleteMultiwriterFileBuf
(
struct
mwFileBuf
*
afd
)
{
int
iret
=
0
;
struct
mwFileBuf
*
afd
=
(
struct
mwFileBuf
*
)
v
;
int
iret
;
/* close file */
xdebug
(
"name=%s, close file"
,
afd
->
name
);
...
...
@@ -104,19 +94,10 @@ deleteMultiwriterFileBuf(void *v)
dbuffer_cleanup
(
&
afd
->
db
);
Free
(
afd
->
name
);
afd
->
name
=
NULL
;
Free
(
afd
);
return
iret
;
}
static
bool
compareNamesAPF
(
void
*
v1
,
void
*
v2
)
{
struct
mwFileBuf
*
afd1
=
v1
,
*
afd2
=
v2
;
return
!
strcmp
(
afd1
->
name
,
afd2
->
name
);
}
//*******************************************************
#ifndef HAVE_PWRITE
static
ssize_t
...
...
@@ -137,26 +118,26 @@ writePF(struct mwFileBuf *afd)
ssize_t
written
;
MPI_Status
status
;
/* FIXME: pretend there's only one special rank for now */
int
file
_id
=
afd
->
f
ile
ID
;
int
file
ID
=
(
int
)
(
afd
-
openF
ile
s
)
;
int
specialRank
=
commInqSizePio
()
-
1
;
MPI_Comm
commPio
=
commInqCommPio
();
/* send buffersize, recv offset */
size_t
amount
=
dbuffer_data_size
(
afd
->
db
);
long
query
[
msgNumWords
]
=
{
file
_id
,
afd
->
command
,
(
long
)
amount
};
long
query
[
msgNumWords
]
=
{
file
ID
,
afd
->
command
,
(
long
)
amount
};
long
offset
;
xmpiStat
(
MPI_Sendrecv
(
query
,
msgNumWords
,
MPI_LONG
,
specialRank
,
collGuardTag
,
&
offset
,
1
,
MPI_LONG
,
specialRank
,
collGuardTag
,
commPio
,
&
status
),
&
status
);
xdebug
(
"id=%d, command=%d, amount=%zu, sent amount=%ld, recv offset=%ld"
,
file
_id
,
afd
->
command
,
amount
,
(
long
)
amount
,
offset
);
xdebug
(
"id=%d, command=%d, amount=%zu, sent amount=%ld, recv offset=%ld"
,
file
ID
,
afd
->
command
,
amount
,
(
long
)
amount
,
offset
);
bool
doTruncate
=
offset
<
0
;
offset
=
labs
(
offset
)
-
(
offset
<
0
);
/* write buffer */
if
((
written
=
pwrite
(
afd
->
fd
,
afd
->
db
->
buffer
,
amount
,
offset
))
!=
(
ssize_t
)
amount
)
xabort
(
"fileId=%d, expect to write %zu byte, written %zu byte"
,
file
_id
,
amount
,
written
);
xdebug
(
"written %zu bytes in file %d with offset %ld"
,
written
,
file
_id
,
offset
);
xabort
(
"fileId=%d, expect to write %zu byte, written %zu byte"
,
file
ID
,
amount
,
written
);
xdebug
(
"written %zu bytes in file %d with offset %ld"
,
written
,
file
ID
,
offset
);
if
(
doTruncate
)
ftruncate
(
afd
->
fd
,
offset
+
(
off_t
)
amount
);
/* change outputBuffer */
dbuffer_reset
(
afd
->
db
);
...
...
@@ -185,8 +166,8 @@ flushOp(struct mwFileBuf *a, int tsID)
static
size_t
fwPOSIXFPGUARDSENDRECV
(
int
fileID
,
const
void
*
buffer
,
size_t
len
,
int
tsID
)
{
int
error
=
0
;
struct
mwFileBuf
*
afd
=
listSetGet
(
open
ed
Files
,
lookupMwFBFileID
,
(
void
*
)
(
intptr_t
)
fileID
)
;
assert
(
fileID
>=
0
&&
(
size_t
)
fileID
<
openFilesSize
&&
openFiles
[
fileID
].
name
)
;
struct
mwFileBuf
*
afd
=
openFiles
+
fileID
;
bool
flush
=
tsID
!=
afd
->
tsID
;
...
...
@@ -201,6 +182,7 @@ fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
xdebug
(
"fileID = %d, tsID = %d, pushed data on buffer, filled = %d"
,
fileID
,
tsID
,
filled
);
int
error
=
0
;
if
(
filled
==
1
)
{
if
(
flush
)
...
...
@@ -221,32 +203,25 @@ fwPOSIXFPGUARDSENDRECV(int fileID, const void *buffer, size_t len, int tsID)
/***************************************************************/
static
int
fcPOSIXFPGUARDSENDRECV
(
int
id
)
fcPOSIXFPGUARDSENDRECV
(
int
fileID
)
{
xdebug
(
"write buffer, close file %d and cleanup"
,
id
);
assert
(
fileID
>=
0
&&
(
size_t
)
fileID
<
openFilesSize
&&
openFiles
[
fileID
].
name
);
struct
mwFileBuf
*
afd
=
openFiles
+
fileID
;
struct
mwFileBuf
*
afd
=
listSetGet
(
openedFiles
,
lo
okupMwFBFileID
,
(
void
*
)
(
intptr_t
)
id
);
xdebug
(
"write buffer
,
c
lo
se file %d and cleanup"
,
fileID
);
afd
->
command
=
IO_Close_file
;
writePF
(
afd
);
/* remove file element */
int
iret
=
listSetRemove
(
openedFiles
,
lookupMwFBFileID
,
(
void
*
)
(
intptr_t
)
i
d
);
int
iret
=
deleteMultiwriterFileBuf
(
af
d
);
/* make sure the file is closed on all collectors before proceeding */
xmpi
(
MPI_Barrier
(
commInqCommColl
()));
return
iret
;
}
/***************************************************************/
static
void
lookupMwFBFilename
(
void
*
q
,
void
*
nm
)
{
struct
mwFileBuf
*
afd
=
q
;
const
char
*
name
=
nm
;
if
(
!
strcmp
(
name
,
afd
->
name
))
xabort
(
"Filename %s has already been added to set
\n
"
,
name
);
}
static
int
fowPOSIXFPGUARDSENDRECV
(
const
char
*
filename
,
const
char
*
mode
)
...
...
@@ -255,23 +230,50 @@ fowPOSIXFPGUARDSENDRECV(const char *filename, const char *mode)
struct
cdiPioConf
*
conf
=
cdiPioGetConf
();
listSetForeach
(
openedFiles
,
lookupMwFBFilename
,
(
void
*
)
filename
);
/* init and add file element */
struct
mwFileBuf
*
afd
=
newMultiwriterFileBuf
(
filename
,
conf
->
writeAggBufLim
);
for
(
size_t
i
=
0
;
i
<
openFilesSize
;
++
i
)
if
(
openFiles
[
i
].
name
&&
!
strcmp
(
openFiles
[
i
].
name
,
filename
))
{
Warning
(
"filename %s is already open!"
" CDI-PIO does not support concurrent access"
" through different filehandles."
,
filename
);
return
CDI_EINVAL
;
}
size_t
fileID
=
SIZE_MAX
;
if
(
openFilesSize
==
openFilesFill
)
{
fileID
=
openFilesSize
;
if
(
openFilesSize
==
(
size_t
)
INT_MAX
+
1
)
return
CDI_ELIMIT
;
openFilesSize
=
openFilesSize
?
openFilesSize
*
2
:
4
;
if
(
openFilesSize
>
(
size_t
)
INT_MAX
+
1
)
openFilesSize
=
(
size_t
)
INT_MAX
+
1
;
openFiles
=
Realloc
(
openFiles
,
sizeof
(
*
openFiles
)
*
openFilesSize
);
for
(
size_t
i
=
fileID
;
i
<
openFilesSize
;
++
i
)
openFiles
[
i
].
name
=
NULL
;
}
else
{
for
(
size_t
i
=
0
;
i
<
openFilesSize
;
++
i
)
if
(
openFiles
[
i
].
name
==
NULL
)
{
fileID
=
i
;
break
;
}
}
struct
mwFileBuf
*
afd
=
openFiles
+
fileID
;
newMultiwriterFileBuf
(
afd
,
filename
,
conf
->
writeAggBufLim
);
int
file_id
;
if
((
afd
->
fileID
=
file_id
=
listSetAdd
(
openedFiles
,
afd
))
<
0
)
xabort
(
"filename %s not unique"
,
afd
->
name
);
xdebug
(
"name=%s, init and add struct mwFileBuf, return id = %d"
,
filename
,
file_id
);
xdebug
(
"name=%s, init and add struct mwFileBuf, return id = %zu"
,
filename
,
fileID
);
long
offset
;
int
specialRank
=
commInqSpecialRank
();
MPI_Status
status
;
MPI_Comm
commPio
=
commInqCommPio
();
long
query
[
msgNumWords
]
=
{
file
_id
,
afd
->
command
,
0L
};
long
query
[
msgNumWords
]
=
{
(
int
)
file
ID
,
afd
->
command
,
0L
};
xmpiStat
(
MPI_Sendrecv
(
query
,
msgNumWords
,
MPI_LONG
,
specialRank
,
collGuardTag
,
&
offset
,
1
,
MPI_LONG
,
specialRank
,
collGuardTag
,
commPio
,
&
status
),
&
status
);
afd
->
command
=
IO_Set_fp
;
return
file
_id
;
return
(
int
)
file
ID
;
}
/***************************************************************/
...
...
@@ -282,13 +284,10 @@ finalizePOSIXFPGUARDSENDRECV(void)
long
query
[
msgNumWords
]
=
{
0
,
IO_Finalize
,
0L
};
xmpi
(
MPI_Send
(
query
,
msgNumWords
,
MPI_LONG
,
commInqSpecialRank
(),
collGuardTag
,
commInqCommPio
()));
if
(
!
listSetIsEmpty
(
open
ed
Files
)
)
if
(
openFiles
Fill
)
xabort
(
"files still open at CDI-PIO finalization"
);
else
{
xdebug
(
"%s"
,
"destroy set"
);
listSetDelete
(
openedFiles
);
}
Free
(
openFiles
);
}
/***************************************************************/
...
...
@@ -310,7 +309,6 @@ initPOSIXFPGUARDSENDRECV(void)
namespaceSwitchSet
(
NSSWITCH_FILE_CLOSE
,
NSSW_FUNC
(
fcPOSIXFPGUARDSENDRECV
));
namespaceSwitchSet
(
NSSWITCH_FILE_WRITE
,
NSSW_FUNC
(
fwPOSIXFPGUARDSENDRECV
));
namespaceSwitchSet
(
cdiPioExtraNSKeys
[
cdiPioEKFileWritingFinalize
],
NSSW_FUNC
(
finalizePOSIXFPGUARDSENDRECV
));
openedFiles
=
listSetNew
(
deleteMultiwriterFileBuf
,
compareNamesAPF
);
}
}
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment