Commit c1aa9a53 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Refactor "queue" insert and search functions.

* The "index" is now a member of the "enqueued" struct.
* The search operation uses a predicate passed as function pointer.
parent afcdf32d
......@@ -29,20 +29,19 @@ struct dBuffer
typedef struct listElem
{
int idx;
void * val;
struct listElem * next;
} listElem_t;
typedef int ( * valDestroyFunction ) ( void * );
typedef bool ( * keyCompareFunction ) ( void *, void * );
typedef bool (*eqPredicate)(void *, void *);
typedef struct
typedef struct
{
listElem_t * head;
listElem_t * tail;
valDestroyFunction valDestroy;
keyCompareFunction keyCompare;
eqPredicate keyCompare;
int count;
} queue_t;
......@@ -66,11 +65,12 @@ void dbuffer_cleanup ( struct dBuffer ** );
size_t dbuffer_free ( struct dBuffer * );
/* pio_queue.c */
queue_t * queueInit ( valDestroyFunction, keyCompareFunction );
queue_t *queueInit(valDestroyFunction, eqPredicate);
void queueDestroy ( queue_t * );
int queuePush ( queue_t *, void *, int, ... );
void * queueIdx2val ( queue_t *, int );
int queueDelNode ( queue_t *, int );
int queuePush(queue_t *, void *);
int queueDelNode(queue_t *, int (*predicate)(void *, void *),
void *data);
void *queueGet(queue_t *q, int (*predicate)(void *, void *), void *data);
/* pio_mpinonb.c */
int fowMPINONB ( const char * );
......
......@@ -4,6 +4,7 @@
#ifdef USE_MPI
#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
......@@ -34,11 +35,18 @@ typedef struct
struct dBuffer *db;
MPI_File fh;
MPI_Request request;
int idx;
bool finished;
} aFiledataM;
static queue_t *bibAFiledataM;
static int
idxTest(void *a, void *idx)
{
return ((aFiledataM *)a)->idx == (int)(intptr_t)idx;
}
/***************************************************************/
......@@ -200,14 +208,15 @@ size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
curr = bibAFiledataM->head;
while ( curr )
{
writeMPINONB (( aFiledataM *) curr->val, curr->idx );
writeMPINONB ((aFiledataM *) curr->val,
((aFiledataM *)curr->val)->idx);
curr = curr->next;
}
oldTsID = tsID;
xmpi ( MPI_Barrier ( commNode ));
}
of = ( aFiledataM * ) queueIdx2val ( bibAFiledataM, fileID );
of = queueGet(bibAFiledataM, idxTest, (void *)(intptr_t)fileID);
filled = dbuffer_push ( of->db, ( unsigned char * ) buffer, len );
xdebug3 ( "IOPE%d: fileID = %d, tsID = %d,"
......@@ -249,7 +258,7 @@ int fcMPINONB ( int fileID )
xdebug("IOPE%d: write buffer, close file and cleanup, in %d",
rankNode, fileID );
if ( ! ( of = ( aFiledataM * ) queueIdx2val ( bibAFiledataM, fileID )))
if (!(of = queueGet(bibAFiledataM, idxTest, (void *)(intptr_t)fileID)))
{
sprintf ( errorString, "pio_queue, fileID=%d not found", fileID );
xabort ( errorString );
......@@ -259,7 +268,7 @@ int fcMPINONB ( int fileID )
/* dequeue file element */
iret = queueDelNode ( bibAFiledataM, fileID );
iret = queueDelNode(bibAFiledataM, idxTest, (void *)(intptr_t)fileID);
/* timer output */
......@@ -337,7 +346,7 @@ int fowMPINONB ( const char *filename )
queueCheck ( bibAFiledataM, filename );
of = initAFiledataMPINONB ( filename, buffersize );
if (( id = queuePush ( bibAFiledataM, of, 0 )) < 0 )
if ((id = queuePush(bibAFiledataM, of)) < 0 )
{
sprintf ( errorString, "filename %s not unique", of->name );
xabort ( errorString );
......
......@@ -39,7 +39,7 @@ extern double accumWait;
extern double accumWrite;
typedef struct
typedef struct
{
char *name;
size_t size;
......@@ -52,8 +52,15 @@ typedef struct
int prefIndex;
bool finished;
bool *nfinished;
int idx;
} bFiledataPA;
static int
idxTest(void *a, void *idx)
{
return ((bFiledataPA *)a)->idx == (int)(intptr_t)idx;
}
int nPrefStreams = 4;
/***************************************************************/
......@@ -313,17 +320,14 @@ void pioWriterAIO(void)
xdebug ( "command %s, filename=%s, buffersize=%ld, amount=%ld",
command2charP [ rtag->command ], filename,
buffersize, amount );
buffersize, amount );
if ( !(bfd = ( bFiledataPA * ) queueIdx2val ( bibBFiledataPA,
rtag->id )))
if (!(bfd = queueGet(bibBFiledataPA, idxTest,
(void *)(intptr_t)rtag->id)))
{
queueCheckPA ( bibBFiledataPA, filename );
bfd = ( bFiledataPA * ) initBFiledataPA ( filename,
buffersize,
nProcsCollNode );
if (( id = queuePush ( bibBFiledataPA, bfd, 1, rtag->id )) != rtag->id )
queueCheckPA ( bibBFiledataPA, filename );
bfd = initBFiledataPA(filename, buffersize, nProcsCollNode);
if ((id = queuePush(bibBFiledataPA, bfd)) < 0)
xabort("fileID=%d not unique", rtag->id);
}
else
......@@ -344,7 +348,8 @@ void pioWriterAIO(void)
case IO_Send_buffer:
if (!(bfd = (bFiledataPA *)queueIdx2val(bibBFiledataPA, rtag->id)))
if (!(bfd = queueGet(bibBFiledataPA, idxTest,
(void *)(intptr_t)rtag->id)))
xabort("fileID=%d is not in queue", rtag->id);
amount = messagesize;
......@@ -364,7 +369,8 @@ void pioWriterAIO(void)
case IO_Close_file:
if (!(bfd = (bFiledataPA *)queueIdx2val(bibBFiledataPA, rtag->id)))
if (!(bfd = queueGet(bibBFiledataPA, idxTest,
(void *)(intptr_t)rtag->id)))
xabort("fileID=%d is not in queue", rtag->id );
amount = messagesize;
......@@ -394,7 +400,8 @@ void pioWriterAIO(void)
if ( bfd->finished )
{
xdebug ( "all are finished with file %d, delete node", rtag->id );
queueDelNode ( bibBFiledataPA, rtag->id );
queueDelNode(bibBFiledataPA, idxTest,
(void *)(intptr_t)rtag->id);
}
break;
case IO_Finalize:
......
......@@ -10,6 +10,7 @@
#ifdef USE_MPI
#include <inttypes.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
......@@ -43,15 +44,35 @@ typedef struct
struct dBuffer *db;
FILE *fp;
IO_Server_command command;
int idx;
} aFiledataPF;
static int
idxTestA(void *a, void *idx)
{
return ((aFiledataPF *)a)->idx == (int)(intptr_t)idx;
}
typedef struct
{
long offset;
bool finished;
bool *nfinished;
int idx;
} bFiledataPF;
static int
idxTestB(void *a, void *idx)
{
return ((bFiledataPF *)a)->idx == (int)(intptr_t)idx;
}
static bool
idxCmpB(void *a, void *b)
{
return ((bFiledataPF *)a)->idx == ((bFiledataPF *)b)->idx;
}
static queue_t *bibAFiledataPF;
/***************************************************************/
......@@ -195,7 +216,7 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
xdebug ( "ncollectors=%d on this node", nProcsCollNode );
bibBFiledataPF = queueInit ( destroyBFiledataPF, NULL );
bibBFiledataPF = queueInit ( destroyBFiledataPF, idxCmpB);
sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize[0] ));
for ( ;; )
......@@ -211,13 +232,14 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
switch ( rtag->command )
{
case IO_Open_file:
if ( !(bfd = ( bFiledataPF * ) queueIdx2val ( bibBFiledataPF,
rtag->id )))
if (!(bfd = queueGet(bibBFiledataPF, idxTestB,
(void *)(intptr_t)rtag->id)))
{
bfd = ( bFiledataPF * ) initBFiledataPF ( rtag->id,
nProcsCollNode );
if (( iret = queuePush ( bibBFiledataPF, bfd, 1, rtag->id )) != rtag->id )
if ((iret = queuePush(bibBFiledataPF, bfd)) < 0)
{
sprintf ( errorString, "fileID=%d not unique", rtag->id );
xabort ( errorString);
......@@ -243,9 +265,9 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
break;
case IO_Set_fp:
if ( ! ( bfd = ( bFiledataPF * ) queueIdx2val ( bibBFiledataPF,
rtag->id )))
if (!(bfd = queueGet(bibBFiledataPF, idxTestB,
(void *)(intptr_t)rtag->id)))
{
sprintf ( errorString, "fileId=%d not in queue", rtag->id );
xabort ( errorString );
......@@ -268,9 +290,9 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
break;
case IO_Close_file:
if ( ! ( bfd = ( bFiledataPF * ) queueIdx2val ( bibBFiledataPF,
rtag->id )))
if (!(bfd = queueGet(bibBFiledataPF, idxTestB,
(void *)(intptr_t)rtag->id)))
{
sprintf ( errorString, "fileId=%d not in queue", rtag->id );
xabort ( errorString );
......@@ -300,9 +322,9 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
bfd->finished = false;
break;
}
if ( bfd->finished )
queueDelNode ( bibBFiledataPF, rtag->id );
queueDelNode(bibBFiledataPF, idxTestB, (void *)(intptr_t)rtag->id);
break;
case IO_Finalize:
{
......@@ -427,7 +449,7 @@ size_t fwPOSIXFPGUARDSENDRECV( int fileID, int tsID, const void *buffer, size_t
listElem_t *curr;
char errorString[maxErrorString];
afd = ( aFiledataPF * ) queueIdx2val ( bibAFiledataPF, fileID );
afd = queueGet(bibAFiledataPF, idxTestA, (void *)(intptr_t)fileID);
flush = ( tsID != afd->tsID ) ? 1 : 0;
......@@ -437,8 +459,8 @@ size_t fwPOSIXFPGUARDSENDRECV( int fileID, int tsID, const void *buffer, size_t
curr = bibAFiledataPF->head;
while ( curr )
{
writePF (( aFiledataPF *) curr->val, curr->idx );
defTimestepPF (( aFiledataPF * ) curr->val, tsID );
writePF((aFiledataPF *)curr->val, ((aFiledataPF *)curr->val)->idx);
defTimestepPF((aFiledataPF *)curr->val, tsID);
curr = curr->next;
}
xmpi ( MPI_Barrier ( commInqCommColl ()));
......@@ -479,7 +501,7 @@ int fcPOSIXFPGUARDSENDRECV ( int id )
xdebug("write buffer, close file %d and cleanup", id);
afd = ( aFiledataPF * ) queueIdx2val ( bibAFiledataPF, id );
afd = queueGet(bibAFiledataPF, idxTestA, (void *)(intptr_t)id);
afd->command = IO_Close_file;
......@@ -487,7 +509,7 @@ int fcPOSIXFPGUARDSENDRECV ( int id )
/* dequeue file element */
iret = queueDelNode ( bibAFiledataPF, id );
iret = queueDelNode(bibAFiledataPF, idxTestA, (void *)(intptr_t)id);
return iret;
}
......@@ -555,8 +577,8 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
afd = initAFiledataPF ( filename, buffersize );
if (( id = queuePush ( bibAFiledataPF, afd, 0 )) < 0 )
{
if ((id = queuePush(bibAFiledataPF, afd)) < 0)
{
sprintf ( errorString, "filename %s not unique", afd->name );
xabort ( errorString );
}
......
......@@ -35,12 +35,19 @@ typedef struct
FILE *fp;
bool finished;
bool *nfinished;
int idx;
} bFiledataP;
static int
idxTest(void *a, void *idx)
{
return ((bFiledataP *)a)->idx == (int)(intptr_t)idx;
}
/***************************************************************/
static bFiledataP *
initBFiledataP(char *filename, size_t bs, int nc)
initBFiledataP(char *filename, size_t bs, int nc, int idx)
{
bFiledataP * bfp;
int i;
......@@ -64,6 +71,8 @@ initBFiledataP(char *filename, size_t bs, int nc)
for ( i = 0; i < nc; i++ )
bfp->nfinished[i] = false;
bfp->idx = idx;
xdebug ( "filename=%s, opened file, return", bfp->name );
return bfp;
......@@ -167,7 +176,7 @@ pioWriterStdIO(void)
xdebug ( "ncollectors=%d on this node", nProcsCollNode );
bibBFiledataP = queueInit ( destroyBFiledataP, compareNamesBP );
bibBFiledataP = queueInit(destroyBFiledataP, compareNamesBP);
sentFinalize = xmalloc ( nProcsCollNode * sizeof ( sentFinalize ));
for ( ;; )
......@@ -215,13 +224,14 @@ pioWriterStdIO(void)
buffersize, amount);
if ( !(bfd = ( bFiledataP * ) queueIdx2val ( bibBFiledataP,
rtag->id )))
if (!(bfd = queueGet(bibBFiledataP, idxTest,
(void *)(intptr_t)rtag->id)))
{
queueCheckP ( bibBFiledataP, filename );
bfd = ( bFiledataP * ) initBFiledataP ( filename, buffersize, nProcsCollNode );
queueCheckP(bibBFiledataP, filename);
bfd = initBFiledataP(filename, buffersize, nProcsCollNode,
rtag->id);
if (( id = queuePush ( bibBFiledataP, bfd, 1, rtag->id )) != rtag->id )
if ((id = queuePush(bibBFiledataP, bfd)) < 0)
xabort("fileID=%d not unique", rtag->id);
}
else
......@@ -238,7 +248,8 @@ pioWriterStdIO(void)
case IO_Send_buffer:
if (!(bfd = (bFiledataP *)queueIdx2val(bibBFiledataP, rtag->id)))
if (!(bfd = queueGet(bibBFiledataP, idxTest,
(void *)(intptr_t)rtag->id)))
xabort("fileID=%d is not in queue", rtag->id );
amount = messagesize;
......@@ -259,7 +270,8 @@ pioWriterStdIO(void)
xdebug ( "COMMAND %s, FILE%d, SOURCE%d",
command2charP [ rtag->command ], rtag->id, source );
if (!(bfd = (bFiledataP *)queueIdx2val(bibBFiledataP, rtag->id)))
if (!(bfd = queueGet(bibBFiledataP, idxTest,
(void *)(intptr_t)rtag->id)))
xabort("fileID=%d is not in queue", rtag->id);
amount = messagesize;
......@@ -288,7 +300,7 @@ pioWriterStdIO(void)
if ( bfd->finished )
{
xdebug ( "all are finished with file %d, delete node", rtag->id );
queueDelNode ( bibBFiledataP, rtag->id );
queueDelNode(bibBFiledataP, idxTest, (void *)(intptr_t)rtag->id);
}
break;
case IO_Finalize:
......
......@@ -14,7 +14,7 @@
#include "pio_impl.h"
#include "pio_util.h"
queue_t *queueInit ( valDestroyFunction vD, keyCompareFunction kC )
queue_t *queueInit ( valDestroyFunction vD, eqPredicate kC )
{
queue_t *myq;
......@@ -51,31 +51,15 @@ void queueDestroy ( queue_t *q )
return;
}
int queuePush ( queue_t *q, void *v, int noCollector, ... )
int
queuePush(queue_t *q, void *v)
{
listElem_t *newListElem_T, *curr;
va_list ap;
int id = CDI_UNDEFID;
va_start(ap, noCollector);
if ( noCollector )
id = va_arg( ap, int );
va_end(ap);
// ensure unique key values
curr = q->head;
while ( curr )
{
if (( !noCollector && ( *( q->keyCompare )) ( v, curr->val )) ||
( noCollector && ( id == curr->idx )))
return -1;
curr = curr->next;
}
for (curr = q->head; curr; curr = curr->next)
// ensure unique keys
if (q->keyCompare(v, curr->val))
return -1;
if (( newListElem_T = ( listElem_t * ) malloc (sizeof ( listElem_t ))) == NULL )
{
......@@ -84,7 +68,6 @@ int queuePush ( queue_t *q, void *v, int noCollector, ... )
return 1;
}
newListElem_T->idx = noCollector ? id : q->count;
newListElem_T->val = v;
newListElem_T->next = NULL;
......@@ -97,65 +80,40 @@ int queuePush ( queue_t *q, void *v, int noCollector, ... )
q->tail = newListElem_T;
q->count ++;
return newListElem_T->idx;
return 0;
}
void *queueIdx2val ( queue_t *q, int id )
int
queueDelNode(queue_t *q, int (*predicate)(void *, void *),
void *data)
{
listElem_t *curr;
curr = q->head;
while ( curr )
{
if ( curr->idx == id ) break;
curr = curr->next;
}
return curr ? curr->val : NULL;
listElem_t **p;
for (p = &q->head; *p; p = &(*p)->next)
if (predicate((*p)->val, data))
{
listElem_t *rem = *p;
if (rem == q->tail) q->tail = NULL;
int iret = q->valDestroy(rem->val);
*p = rem->next;
free(rem);
return iret;
}
return -1;
}
int queueDelNode ( queue_t *q, int id ) // error
void *
queueGet(queue_t *q, int (*predicate)(void *, void *), void *data)
{
listElem_t *pred, *curr;
if ( !q->head ) return -1;
pred = q->head;
curr = pred->next;
if ( pred->idx == id )
{
int iret;
if ( pred == q->tail ) q->tail = NULL;
q->head = curr;
iret = ( *( q->valDestroy )) ( pred->val );
free ( pred );
return iret;
}
while ( curr )
{
if ( curr->idx == id ) break;
pred = curr;
curr = curr->next;
}
if ( curr )
{
int iret;
if ( curr == q->tail ) q->tail = pred;
pred->next = curr->next;
iret = ( *( q->valDestroy )) ( curr->val );
free ( curr );
return iret;
}
return -1;
listElem_t *p;
xassert(q && predicate);
for (p = q->head; p; p = p->next)
if (predicate(p->val, data))
return p->val;
return NULL;
}
#endif
/*
* Local Variables:
......
......@@ -4,6 +4,7 @@
#ifdef USE_MPI
#include <inttypes.h>
#include <stdlib.h>
#include "pio_comm.h"
......@@ -25,10 +26,17 @@ typedef struct
struct dBuffer *db;
IO_Server_command command;
MPI_Request request;
int idx;
} remoteFileBuf;
static queue_t * bibRemoteFileBuf;
static int
idxTest(void *a, void *idx)
{
return ((remoteFileBuf *)a)->idx == (int)(intptr_t)idx;
}
static remoteFileBuf *
initRemoteFileBuf(const char *filename, size_t bs)
{
......@@ -151,7 +159,7 @@ pioSendWrite(int id, int tsID, const void *buffer, size_t len)
remoteFileBuf *afd;
listElem_t *curr;
afd = (remoteFileBuf *)queueIdx2val(bibRemoteFileBuf, id);
afd = queueGet(bibRemoteFileBuf, idxTest, (void *)(intptr_t)id);
flush = ( tsID != afd->tsID ) ? 1 : 0;
......@@ -163,7 +171,7 @@ pioSendWrite(int id, int tsID, const void *buffer, size_t len)
while (curr)
{
sendP((remoteFileBuf *)curr->val, curr->idx);
sendP((remoteFileBuf *)curr->val, ((remoteFileBuf *)curr->val)->idx);
defTimestep((remoteFileBuf *)curr->val, tsID);
curr = curr->next;
}
......@@ -210,14 +218,14 @@ pioSendClose(int id)
xdebug ( "fileID %d: send buffer, close file and cleanup",id );