Commit c77bbae4 authored by Thomas Jahns's avatar Thomas Jahns 🤸
Browse files

Encapsulate structure of queue/set type.

parent 2cd54782
......@@ -27,23 +27,10 @@ struct dBuffer
unsigned char * buffer;
};
typedef struct listElem
{
void * val;
struct listElem * next;
} listElem_t;
typedef int ( * valDestroyFunction ) ( void * );
typedef bool (*eqPredicate)(void *, void *);
typedef struct
{
listElem_t * head;
listElem_t * tail;
valDestroyFunction valDestroy;
eqPredicate keyCompare;
int count;
} queue_t;
typedef struct queue_t queue_t;
typedef struct
{
......@@ -73,6 +60,9 @@ int queueDelNode(queue_t *, int (*predicate)(void *, void *),
void *data);
void *queueGet(queue_t *q, int (*predicate)(void *, void *), void *data);
typedef void (*elemOp)(void *elem, void *data);
void queueForeach(queue_t *q, elemOp func, void *data);
/* pio_mpinonb.c */
int fowMPINONB ( const char * );
int fcMPINONB ( int );
......
......@@ -195,7 +195,6 @@ size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
int flush = 0;
int filled = 0;
aFiledataM *of;
listElem_t *curr;
char errorString[maxErrorString];
MPI_Comm commNode = commInqCommNode ();
int rankNode = commInqRankNode ();
......@@ -204,14 +203,9 @@ size_t fwMPINONB ( int fileID, int tsID, const void *buffer, size_t len )
if ( flush == 1 )
{
xdebug3 ( "IOPE%d: tsID = %d, flush buffer",
xdebug3 ( "IOPE%d: tsID = %d, flush buffer",
rankNode, tsID );
curr = bibAFiledataM->head;
while ( curr )
{
writeMPINONB((aFiledataM *)curr->val);
curr = curr->next;
}
queueForeach(bibAFiledataM, (elemOp)writeMPINONB, NULL);
oldTsID = tsID;
xmpi ( MPI_Barrier ( commNode ));
}
......@@ -288,29 +282,14 @@ int fcMPINONB ( int fileID )
/***************************************************************/
// baustelle, in modi vereinheitlichen, kapseln in pio_queue.c
void queueCheck ( queue_t *q, const char *name )
static void
elemCheck(void *q, void *nm)
{
listElem_t *curr;
aFiledataM *afm;
size_t len;
char errorString[maxErrorString];
len = strlen ( name );
curr = q->head;
while ( curr )
{
afm = ( aFiledataM * ) curr->val;
if ( len == strlen ( afm->name ) &&
memcmp ( name, afm->name, len ) == 0 )
{
sprintf ( errorString, "Filename %s is already enqueued\n", name );
xabort ( errorString );
return;
}
curr = curr->next;
}
aFiledataM *afm = q;
const char *name = nm;
if (!strcmp(name, afm->name))
xabort("Filename %s is already enqueued\n", name);
}
......@@ -343,7 +322,7 @@ int fowMPINONB ( const char *filename )
xdebug("buffersize=%ld", buffersize);
queueCheck ( bibAFiledataM, filename );
queueForeach(bibAFiledataM, elemCheck, (void *)filename);
of = initAFiledataMPINONB ( filename, buffersize );
if ((id = queueAdd(bibAFiledataM, of)) < 0 )
......@@ -362,8 +341,8 @@ int fowMPINONB ( const char *filename )
void finalizeMPINONB(void)
{
if (queueIsEmpty(bibAFiledataM))
xabort("queue bibAFiledataM not empty")
if (!queueIsEmpty(bibAFiledataM))
xabort("queue bibAFiledataM not empty");
else
{
xdebug("%s", "cleanup queue");
......
......@@ -244,23 +244,14 @@ writePA(bFiledataPA *bfd, long amount)
}
/***************************************************************/
// TODO: unify in IOModi, encapsulate in pio_queue.c
static void
queueCheckPA(queue_t *q, char *name)
elemCheck(void *q, void *nm)
{
listElem_t *curr;
bFiledataPA *bfd;
curr = q->head;
while ( curr )
{
bfd = ( bFiledataPA * ) curr->val;
if (strcmp(name, bfd->name) == 0)
xabort("Filename %s is already enqueued\n", name );
bFiledataPA *bfd = q;
const char *name = nm;
curr = curr->next;
}
if (!strcmp(name, bfd->name))
xabort("Filename %s is already enqueued\n", name);
}
/***************************************************************/
......@@ -325,7 +316,7 @@ void pioWriterAIO(void)
if (!(bfd = queueGet(bibBFiledataPA, idxTest,
(void *)(intptr_t)rtag->id)))
{
queueCheckPA ( bibBFiledataPA, filename );
queueForeach(bibBFiledataPA, elemCheck, filename);
bfd = initBFiledataPA(filename, buffersize, nProcsCollNode);
if ((id = queueAdd(bibBFiledataPA, bfd)) < 0)
xabort("fileID=%d not unique", rtag->id);
......@@ -419,10 +410,8 @@ void pioWriterAIO(void)
}
if ( doFinalize )
{
if ( bibBFiledataPA->head != NULL )
{
xabort ( "Queue bibBfiledataP is not empty." );
}
if (!queueIsEmpty(bibBFiledataPA))
xabort("Queue bibBfiledataP is not empty.");
else
{
xdebug("%s", "all files are finished, destroy queue,"
......
......@@ -100,7 +100,7 @@ static aFiledataPF *initAFiledataPF ( const char *key, size_t bs)
iret += dbuffer_init ( &( afd->db2 ), afd->size );
if ( iret > 0 )
xabort ( "dbuffer_init did not succeed" );
xabort("dbuffer_init did not succeed");
afd->db = afd->db1;
......@@ -342,10 +342,8 @@ void fpgPOSIXFPGUARDSENDRECV ( void )
}
if ( doFinalize )
{
if ( bibBFiledataPF->head != NULL )
{
xabort ( "queue bibBFiledataM not empty" );
}
if (!queueIsEmpty(bibBFiledataPF))
xabort("queue bibBFiledataM not empty");
else
{
xdebug("%s", "cleanup queue");
......@@ -440,13 +438,20 @@ void defTimestepPF ( aFiledataPF *afd, int tsID)
/***************************************************************/
static void
flushOp(void *a, void *tsID)
{
writePF((aFiledataPF *)a, ((aFiledataPF *)a)->idx);
defTimestepPF((aFiledataPF *)a, (int)(intptr_t)tsID);
}
size_t fwPOSIXFPGUARDSENDRECV( int fileID, int tsID, const void *buffer, size_t len )
{
int error = 0;
int flush = 0;
int filled = 0;
aFiledataPF *afd;
listElem_t *curr;
char errorString[maxErrorString];
afd = queueGet(bibAFiledataPF, idxTestA, (void *)(intptr_t)fileID);
......@@ -456,13 +461,7 @@ size_t fwPOSIXFPGUARDSENDRECV( int fileID, int tsID, const void *buffer, size_t
if ( flush == 1 )
{
xdebug ( "tsID = %d, flush buffer", tsID );
curr = bibAFiledataPF->head;
while ( curr )
{
writePF((aFiledataPF *)curr->val, ((aFiledataPF *)curr->val)->idx);
defTimestepPF((aFiledataPF *)curr->val, tsID);
curr = curr->next;
}
queueForeach(bibAFiledataPF, flushOp, (void *)(intptr_t)tsID);
xmpi ( MPI_Barrier ( commInqCommColl ()));
}
......@@ -516,33 +515,15 @@ int fcPOSIXFPGUARDSENDRECV ( int id )
/***************************************************************/
// baustelle, in modi vereinheitlichen, kapseln in pio_queue.c
void queueCheckFP ( queue_t *q, const char *name )
static void
elemCheck(void *q, void *nm)
{
listElem_t *curr;
aFiledataPF *afd;
size_t len;
char errorString[maxErrorString];
len = strlen ( name );
curr = q->head;
while ( curr )
{
afd = ( aFiledataPF * ) curr->val;
if ( len == strlen ( afd->name ) &&
memcmp ( name, afd->name, len ) == 0 )
{
sprintf ( errorString, "Filename %s is already enqueued\n", name );
xabort ( errorString );
return;
}
curr = curr->next;
}
aFiledataPF *afd = q;
const char *name = nm;
return;
if (!strcmp(name, afd->name))
xabort("Filename %s is already enqueued\n", name);
}
//
int fowPOSIXFPGUARDSENDRECV ( const char *filename )
{
......@@ -573,7 +554,7 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
/* init and enqueue file element */
queueCheckFP ( bibAFiledataPF, filename );
queueForeach(bibAFiledataPF, elemCheck, (void *)filename);
afd = initAFiledataPF ( filename, buffersize );
......@@ -600,10 +581,8 @@ void finalizePOSIXFPGUARDSENDRECV ( void )
xmpi ( MPI_Send ( &buffer, 1, MPI_INT, specialRank, tag, commNode ));
if ( bibAFiledataPF->head != NULL )
{
xabort ( "queue bibAFiledataM not empty" );
}
if (!queueIsEmpty(bibAFiledataPF))
xabort("queue bibAFiledataM not empty");
else
{
xdebug("%s", "cleanup queue");
......
......@@ -137,28 +137,15 @@ writeP(bFiledataP *bfd, long amount)
// TODO: unify in IOModi, encapsulate in pio_queue.c
static void
queueCheckP(queue_t *q, char *name)
elemCheck(void *q, void *nm)
{
listElem_t *curr;
bFiledataP *bfd;
size_t len;
bFiledataP *bfd = q;
const char *name = nm;
len = strlen ( name );
curr = q->head;
while ( curr )
{
bfd = ( bFiledataP * ) curr->val;
if ( len == strlen ( bfd->name ) &&
memcmp ( name, bfd->name, len ) == 0 )
xabort("Filename %s is already enqueued\n", name);
curr = curr->next;
}
if (!strcmp(name, bfd->name))
xabort("Filename %s is already enqueued\n", name);
}
/***************************************************************/
void
pioWriterStdIO(void)
{
......@@ -227,7 +214,7 @@ pioWriterStdIO(void)
if (!(bfd = queueGet(bibBFiledataP, idxTest,
(void *)(intptr_t)rtag->id)))
{
queueCheckP(bibBFiledataP, filename);
queueForeach(bibBFiledataP, elemCheck, filename);
bfd = initBFiledataP(filename, buffersize, nProcsCollNode,
rtag->id);
......@@ -321,10 +308,8 @@ pioWriterStdIO(void)
if ( doFinalize )
{
if ( bibBFiledataP->head != NULL )
{
xabort ( "Queue bibBfiledataP is not empty." );
}
if (!queueIsEmpty(bibBFiledataP))
xabort("Queue bibBfiledataP is not empty.");
else
{
xdebug("%s", "all files are finished, destroy queue,"
......
......@@ -14,6 +14,19 @@
#include "pio_impl.h"
#include "pio_util.h"
struct cons
{
void * val;
struct cons * next;
};
struct queue_t {
struct cons *head, *tail;
valDestroyFunction valDestroy;
eqPredicate keyCompare;
int count;
};
queue_t *queueInit ( valDestroyFunction vD, eqPredicate kC )
{
queue_t *myq;
......@@ -31,7 +44,7 @@ queue_t *queueInit ( valDestroyFunction vD, eqPredicate kC )
void queueDestroy ( queue_t *q )
{
listElem_t *curr, *succ;
struct cons *curr, *succ;
if ( q->head )
{
......@@ -54,30 +67,33 @@ void queueDestroy ( queue_t *q )
int
queueAdd(queue_t *q, void *v)
{
listElem_t *newListElem_T, *curr;
struct cons *newCons;
for (curr = q->head; curr; curr = curr->next)
// ensure unique keys
if (q->keyCompare(v, curr->val))
return -1;
{
struct cons *p;
for (p = q->head; p; p = p->next)
// ensure unique keys
if (q->keyCompare(v, p->val))
return -1;
}
if (( newListElem_T = ( listElem_t * ) malloc (sizeof ( listElem_t ))) == NULL )
if ((newCons = malloc(sizeof(struct cons))) == NULL)
{
perror ( "pio_queue: queueAdd (): Not enough memory" );
/* FIXME: why not abort? */
return 1;
}
newListElem_T->val = v;
newListElem_T->next = NULL;
newCons->val = v;
newCons->next = NULL;
if ( q->tail != NULL)
q->tail->next = newListElem_T;
q->tail->next = newCons;
else
q->head = newListElem_T;
q->head = newCons;
q->tail = newListElem_T;
q->tail = newCons;
q->count ++;
return 0;
......@@ -87,12 +103,12 @@ int
queueDelNode(queue_t *q, int (*predicate)(void *, void *),
void *data)
{
listElem_t **p;
struct cons **p;
for (p = &q->head; *p; p = &(*p)->next)
if (predicate((*p)->val, data))
{
listElem_t *rem = *p;
struct cons *rem = *p;
if (rem == q->tail) q->tail = NULL;
int iret = q->valDestroy(rem->val);
*p = rem->next;
......@@ -105,7 +121,7 @@ queueDelNode(queue_t *q, int (*predicate)(void *, void *),
void *
queueGet(queue_t *q, int (*predicate)(void *, void *), void *data)
{
listElem_t *p;
struct cons *p;
xassert(q && predicate);
for (p = q->head; p; p = p->next)
if (predicate(p->val, data))
......@@ -120,6 +136,15 @@ queueIsEmpty(queue_t *q)
}
void
queueForeach(queue_t *q, void (*func)(void *elem, void *data), void *data)
{
struct cons *p;
for (p = q->head; p; p = p->next)
func(p->val, data);
}
#endif
/*
* Local Variables:
......
......@@ -150,6 +150,15 @@ defTimestep(remoteFileBuf *afd, int tsID)
afd->tsID = tsID;
}
static void
flushOp(void *b, void *tsID)
{
remoteFileBuf *fb = b;
sendP(fb, fb->idx);
defTimestep(fb, (int)(intptr_t)tsID);
}
size_t
pioSendWrite(int id, int tsID, const void *buffer, size_t len)
{
......@@ -157,7 +166,6 @@ pioSendWrite(int id, int tsID, const void *buffer, size_t len)
int flush = 0;
int filled;
remoteFileBuf *afd;
listElem_t *curr;
afd = queueGet(bibRemoteFileBuf, idxTest, (void *)(intptr_t)id);
......@@ -167,14 +175,7 @@ pioSendWrite(int id, int tsID, const void *buffer, size_t len)
{
xdebug ( "tsID = %d, flush buffer", tsID );
curr = bibRemoteFileBuf->head;
while (curr)
{
sendP((remoteFileBuf *)curr->val, ((remoteFileBuf *)curr->val)->idx);
defTimestep((remoteFileBuf *)curr->val, tsID);
curr = curr->next;
}
queueForeach(bibRemoteFileBuf, flushOp, (void *)(intptr_t)tsID);
{
double startTime;
......@@ -307,8 +308,8 @@ pioSendFinalize(void)
xmpi(MPI_Send(&buffer, 1, MPI_INT, specialRank, tag, commNode));
xdebug("%s", "SENT MESSAGE WITH TAG \"IO_FINALIZE\" TO SPECIAL PROCESS");
if (bibRemoteFileBuf->head != NULL)
xabort("Queue bibRemoteFileBuf not empty.")
if (!queueIsEmpty(bibRemoteFileBuf))
xabort("Queue bibRemoteFileBuf not empty.");
else
{
xdebug("%s", "cleanup queue");
......
......@@ -126,7 +126,7 @@ char * outTextComm ( MPI_Comm * );
void pcdiAbort (const char *, const char *, int, const char *, ... )
__attribute__((noreturn));
#define xabort(...) pcdiAbort (__FILE__, __func__, __LINE__, __VA_ARGS__ );
#define xabort(...) pcdiAbort(__FILE__, __func__, __LINE__, __VA_ARGS__ )
void * pcdiXmalloc ( size_t, const char *, const char *, int );
#define xmalloc(size) pcdiXmalloc ( size, __FILE__, __func__, __LINE__ )
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment