Skip to content
Snippets Groups Projects
Commit 0ed83c09 authored by Thomas Jahns's avatar Thomas Jahns :cartwheel: Committed by Sergey Kosukhin
Browse files

Extend callbacks to streamClose Post action.

parent 91736fc5
No related branches found
No related tags found
No related merge requests found
......@@ -136,13 +136,38 @@ void cdiPioConfSetCSRole(int confResH, int CSRole);
int cdiPioConfGetCSRole(int confResH);
/* cdiPioConfSetPostCommSetupActions: set function to be called after
* setup of client/server communications of configuration object */
* setup of client/server communications of configuration object.
* Deprecated: use cdiPioConfSetCallBackActions with
* trigger == CDIPIO_CALLBACK_POSTCOMMSETUP in new programs */
void cdiPioConfSetPostCommSetupActions(int confResH, void (*postCommSetupActions)(void));
/* cdiPioConfGetPostCommSetupActions: get function to be called after
* setup of client/server communications from configuration object */
* setup of client/server communications from configuration object.
* Deprecated: use cdiPioConfGetCallBackActions with
* trigger == CDIPIO_CALLBACK_POSTCOMMSETUP in new programs. */
void (*cdiPioConfGetPostCommSetupActions(int confResH))(void);
/* CDIPIO_CALLBACK_POSTCOMMSETUP: trigger number of the hook called
* after communication has been established. This is the same hook
* previously setup with cdiPioConfSetPostCommSetupActions, takes no
* argument */
#define CDIPIO_CALLBACK_POSTCOMMSETUP 0
/* CDIPIO_CALLBACK_POSTSTREAMCLOSE: trigger number for callback
* invoked after each streamClose on the collector side.
* Accepts the streamID as int parameter, i.e. use INTEGER, VALUE and
* BIND(C) on Fortran side
*/
#define CDIPIO_CALLBACK_POSTSTREAMCLOSE 1
/* cdiPioConfSetCallBack: set function to be called at
* indicated trigger of configuration object, action will be cast to
* the appropriate type as indicated for the respective trigger */
void cdiPioConfSetCallBackActions(int confResH, int trigger, void (*action)(void));
/* cdiPioConfGetCallBack: query function to be called at
* indicated trigger of configuration object */
void (*cdiPioConfGetCallBackActions(int confResH, int trigger))(void);
/* cdiPioConfSetLargePageAlign should block buffer be aligned to
* large pages instead of normal pages? */
void cdiPioConfSetLargePageAlign(int confResH, int largePageAlign);
......
......@@ -109,8 +109,10 @@ static int
cdiPioConfCompareP(void *p1, void *p2)
{
struct cdiPioConf *a = p1, *b = p2;
bool callBackDifference = false;
for (size_t i = 0; i < CDIPIO_NUM_CALLBACKS; ++i) callBackDifference |= (a->callbacks[i] != b->callbacks[i]);
return (a->IOMode != b->IOMode) | (a->clientServerRole != b->clientServerRole) | (a->partInflate != b->partInflate)
| (a->postCommSetupActions != b->postCommSetupActions);
| callBackDifference;
}
static void
......@@ -140,7 +142,8 @@ cdiPioConfPrintP(void *cdiPioConfPtr, FILE *fp)
"callback after setup of communication = %p\n",
cdiPioConfPtr, iomodeStr, CSRoleStr, conf->partInflate, conf->largePageAlign ? "en" : "dis",
(size_t) conf->recordAggBufLimMB * 1024 * 1024, conf->writeAggBufLim, conf->stripify ? "en" : "dis",
conf->cacheRedists ? "en" : "dis", conf->cacheXmaps ? "en" : "dis", (void *) conf->postCommSetupActions);
conf->cacheRedists ? "en" : "dis", conf->cacheXmaps ? "en" : "dis",
(void *) conf->callbacks[CDIPIO_CALLBACK_POSTCOMMSETUP]);
}
static inline size_t
......@@ -169,7 +172,7 @@ cdiPioConfCreate(void)
conf->IOMode = PIO_NONE;
conf->clientServerRole = PIO_ROLE_CLIENT;
conf->partInflate = 1.1f;
conf->postCommSetupActions = cdiPioNoPostCommSetup;
for (size_t i = 0; i < CDIPIO_NUM_CALLBACKS; ++i) conf->callbacks[i] = cdiPioNoPostCommSetup;
conf->largePageAlign = false;
conf->cacheRedists = conf->cacheXmaps = true;
conf->recordAggBufLimMB = 128;
......@@ -236,16 +239,39 @@ cdiPioConfGetCSRole(int confResH)
}
void
cdiPioConfSetPostCommSetupActions(int confResH, void (*postCommSetupActions)(void))
cdiPioConfSetCallBackActions(int confResH, int trigger, void (*callback)(void))
{
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
conf->postCommSetupActions = postCommSetupActions;
if (trigger >= 0 && trigger < CDIPIO_NUM_CALLBACKS)
{
conf->callbacks[trigger] = callback;
}
else
Error("invalid trigger callback query: %d", trigger);
}
void (*cdiPioConfGetPostCommSetupActions(int confResH))(void)
void (*cdiPioConfGetCallBackActions(int confResH, int trigger))(void)
{
void (*callback)(void) = (void (*)(void)) 0;
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
return conf->postCommSetupActions;
if (trigger >= 0 && trigger < CDIPIO_NUM_CALLBACKS)
{
callback = conf->callbacks[trigger];
}
else
Error("invalid trigger callback query: %d", trigger);
return callback;
}
void
cdiPioConfSetPostCommSetupActions(int confResH, void (*postCommSetupActions)(void))
{
cdiPioConfSetCallBackActions(confResH, CDIPIO_CALLBACK_POSTCOMMSETUP, postCommSetupActions);
}
void (*cdiPioConfGetPostCommSetupActions(int confResH))(void)
{
return cdiPioConfGetCallBackActions(confResH, CDIPIO_CALLBACK_POSTCOMMSETUP);
}
void
......@@ -385,3 +411,13 @@ cdiPioConfGetStripeConversion(int confResH)
struct cdiPioConf *conf = reshGetVal(confResH, &cdiPioConfOps);
return conf->stripify;
}
/*
* Local Variables:
* c-file-style: "Java"
* c-basic-offset: 2
* indent-tabs-mode: nil
* show-trailing-whitespace: t
* require-trailing-newline: t
* End:
*/
......@@ -14,6 +14,11 @@
typedef Xt_xmap (*xmap_new_func_ptr)(Xt_idxlist src_idxlist, Xt_idxlist dst_idxlist, MPI_Comm comm);
enum
{
CDIPIO_NUM_CALLBACKS = 2,
};
/*
* cdiPioConf is meant to be internal to the library and not to be
* used directly if possible, CDI-PIO users should rely on the
......@@ -26,7 +31,7 @@ struct cdiPioConf
float partInflate;
unsigned recordAggBufLimMB;
size_t writeAggBufLim;
void (*postCommSetupActions)(void);
void (*callbacks[CDIPIO_NUM_CALLBACKS])(void);
xmap_new_func_ptr xmap_new;
int maxPathLen;
unsigned short aioQueueDepth;
......
......@@ -497,7 +497,7 @@ cdiPioInit(MPI_Comm commGlob, int confResH, int *pioNamespace)
namespaceSetActive(serverNamespace);
namespaceSwitchSet(cdiPioExtraNSKeys[cdiPioEKConf], NSSW_DATA(conf));
cdiPioSerializeSetMPI();
conf->postCommSetupActions();
conf->callbacks[CDIPIO_CALLBACK_POSTCOMMSETUP]();
cdiPioFileWritingInit();
if (commInqRankColl() >= 0)
{
......
......@@ -1400,11 +1400,11 @@ cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
int fileID = streamptr->fileID;
int filetype = streamptr->filetype;
int vlistID = streamptr->vlistID;
const struct cdiPioConf *conf = cdiPioGetConf();
if (vlistID != CDI_UNDEFID)
{
size_t maxNumStreamWrites = getMaxNumStreamWrites(streamptr);
neededDstIdxlistCacheSize -= maxNumStreamWrites;
const struct cdiPioConf *conf = cdiPioGetConf();
if (conf->cacheXmaps) neededXmapCacheSize -= maxNumStreamWrites;
}
if (fileID == CDI_UNDEFID)
......@@ -1442,6 +1442,8 @@ cdiPioServerStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
removeID(&openStreams, streamID);
removeID(&openFiles, fileID);
}
void (*streamCloseCallBack)(int streamID) = (void (*)(int)) conf->callbacks[CDIPIO_CALLBACK_POSTSTREAMCLOSE];
streamCloseCallBack(streamptr->self);
}
#ifdef HAVE_LIBNETCDF
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment