Commit b5c58689 authored by Deike Kleberg's avatar Deike Kleberg
Browse files

threaded versions of fpguard added

parent 81d5928d
......@@ -118,6 +118,7 @@ m4/acx_sl_mod_suffix.m4 -text
pioExamples/README -text
pioExamples/cdi_write_f.F90 -text
pioExamples/cdi_write_more_nodes.F90 -text
pioExamples/cdi_write_more_nodes.job -text
pioExamples/cdi_write_parallel.job -text
pioExamples/cdi_write_serial.job -text
src/Makefile.am -text
......@@ -175,6 +176,8 @@ src/pio_impl.h -text
src/pio_mpinonb.c -text
src/pio_posixasynch.c -text
src/pio_posixfpguardsendrecv.c -text
src/pio_posixfpguardthread.c -text
src/pio_posixfpguardthreadrefuse.c -text
src/pio_posixnonb.c -text
src/pio_queue.c -text
src/service.h -text
......
......@@ -38,11 +38,13 @@ status quo:
ptypes available:
PIO_NONE 0
PIO_MPI_NONB 1
PIO_POSIX_ASYNCH 2
PIO_POSIX_FPGUARD_SENDRECV 3
PIO_POSIX_NONB 4
PIO_NONE 0
PIO_MPI_NONB 1
PIO_POSIX_ASYNCH 2
PIO_POSIX_FPGUARD_SENDRECV 3
PIO_POSIX_FPGUARD_THREAD 4
PIO_POSIX_FPGUARD_THREAD_REFUSE 5
PIO_POSIX_NONB 6
initial_buffer_size = 16 * 1024 * 1024 bytes
......
......@@ -156,7 +156,7 @@ END MODULE loadbalancing
INTEGER, PARAMETER :: dp = SELECTED_REAL_KIND(12,307)
INTEGER, PARAMETER :: nPtypes = 4
INTEGER, PARAMETER :: nPtypes = 6
INTEGER, PARAMETER :: MAXNODES = 249
INTEGER, PARAMETER :: ddebug = 0
......@@ -193,10 +193,13 @@ END MODULE loadbalancing
REAL(dp) :: startTime, stopTime
REAL(dp) :: accumOpen, accumClose, accumWrite
INTEGER REQUIRED, PROVIDED, IERROR
#ifndef NOMPI
CALL MPI_INIT ( error )
!CALL MPI_INIT ( error )29
CALL MPI_INIT_THREAD(REQUIRED, PROVIDED, IERROR)
CALL MPI_COMM_DUP ( MPI_COMM_WORLD, pioComm, error )
CALL MPI_COMM_RANK ( MPI_COMM_WORLD, rank, error )
......
#! /bin/bash
#-----------------------------------------------------------------------------
# @ shell = /bin/bash
# @ job_type = parallel
# @ node_usage = not_shared
# @ network.MPI = sn_all,not_shared,us
# @ rset = rset_mcm_affinity
# @ mcm_affinity_options = mcm_distribute
# @ node = 4
# @ tasks_per_node = 16
# @ resources = ConsumableMemory(3000mb)
# @ task_affinity = core
# @ wall_clock_limit = 00:10:00
# @ job_name = cdi_write_more_nodes
# @ output = $(job_name).o$(jobid)
# @ error = $(output)
# @ notification = error
# @ account_no = bk0521
# @ queue
#-----------------------------------------------------------------------------
#
# . /usr/lpp/ppe.hpct/env_sh
#
# export PROFILE_BY_CALLSITE=YES
# export TRACE_ALL_EVENTS=NO
# export OUTPUT_ALL_TASKS=YES
# export TRACEBACK_LEVEL=2
#
# MP_SINGLE_THREAD=NO is MANDATORY: MPI-IO requires it!
#
export MP_SINGLE_THREAD=NO
#
# General options
#
export MEMORY_AFFINITY=MC
export MP_SHARED_MEMORY=YES
export MP_BUFFER_MEM=32M,256M
export MP_LABELIO=yes
export MP_RDMA_MTU=4k
export MP_FIFO_MTU=4k
export MP_SHM_ATTACH_THRESH=500000
#
# Critical parameters that affect MPI-IO performance
#
export MP_RFIFO_SIZE=16M # without BULK_XFER this should be set to the maximum of 16M
export MP_USE_BULK_XFER=YES # gives a large boost for transfer sizes >= 1M
export MP_BULK_MIN_MSG_SIZE=128k # restrict BULK_XFER to messages larger than 128 kB
export MP_S_IOAGENT_CNT=1 # 0.5x to 1x the total number of nodes seems to be OK
export MP_IO_BUFFER_SIZE=128M # 128M for 1-4 IO agents; reduce for more than 4 agents
#
#-----------------------------------------------------------------------------
#
# Working directory for the run. "mkdir -p" succeeds when the directory
# already exists and also creates missing parent directories. Abort the job
# if the directory cannot be created or entered, so that poe is never
# started from an unexpected location.
workdir=/scratch/m/m300050/cdi_pio
mkdir -p "$workdir" || exit 1
#
cd "$workdir" || exit 1
poe /pf/m/m300050/cdi_pio/cdi-pio/examples/cdi_write_more_nodes 4 4
#
#-----------------------------------------------------------------------------
exit
#-----------------------------------------------------------------------------
......@@ -100,6 +100,8 @@ libcdi_a_SOURCES = \
pio_queue.c \
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixfpguardthread.c \
pio_posixfpguardthreadrefuse.c \
pio_posixnonb.c \
pio.h \
pio_impl.h
......
......@@ -79,7 +79,9 @@ am_libcdi_a_OBJECTS = cdiFortran.$(OBJEXT) cdi_error.$(OBJEXT) \
swap.$(OBJEXT) cgribexlib.$(OBJEXT) gribapi.$(OBJEXT) \
pio.$(OBJEXT) pio_dbuffer.$(OBJEXT) pio_mpinonb.$(OBJEXT) \
pio_queue.$(OBJEXT) pio_posixasynch.$(OBJEXT) \
pio_posixfpguardsendrecv.$(OBJEXT) pio_posixnonb.$(OBJEXT)
pio_posixfpguardsendrecv.$(OBJEXT) \
pio_posixfpguardthread.$(OBJEXT) \
pio_posixfpguardthreadrefuse.$(OBJEXT) pio_posixnonb.$(OBJEXT)
libcdi_a_OBJECTS = $(am_libcdi_a_OBJECTS)
DEFAULT_INCLUDES = -I.@am__isrc@
depcomp = $(SHELL) $(top_srcdir)/config/depcomp
......@@ -310,6 +312,8 @@ libcdi_a_SOURCES = \
pio_queue.c \
pio_posixasynch.c\
pio_posixfpguardsendrecv.c \
pio_posixfpguardthread.c \
pio_posixfpguardthreadrefuse.c \
pio_posixnonb.c \
pio.h \
pio_impl.h
......@@ -432,6 +436,8 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_mpinonb.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixasynch.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardsendrecv.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardthread.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixfpguardthreadrefuse.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_posixnonb.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pio_queue.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/servicelib.Po@am__quote@
......
......@@ -181,11 +181,13 @@ extern "C" {
/* parallel io types, A: asynchronous, B: blocking */
#ifndef NOMPI
#define PIO_NONE 0
#define PIO_MPI_NONB 1
#define PIO_POSIX_ASYNCH 2
#define PIO_POSIX_FPGUARD_SENDRECV 3
#define PIO_POSIX_NONB 4
#define PIO_NONE 0
#define PIO_MPI_NONB 1
#define PIO_POSIX_ASYNCH 2
#define PIO_POSIX_FPGUARD_SENDRECV 3
#define PIO_POSIX_FPGUARD_THREAD 4
#define PIO_POSIX_FPGUARD_THREAD_REFUSE 5
#define PIO_POSIX_NONB 6
int pioInit ( int, int, int*, int*, int, int*, int* );
void pioFinalize ( void );
......
......@@ -18,14 +18,17 @@ const char *token_NODE = "p";
long initial_buffersize = 16 * 1024 * 1024;
/* 4 KB <= x < 256 MB */
/* 16 * 1024 * 1024; */
/* 256 * 1024; */
/* 16 * 1024; */
/* 4 * 1024; */
int tagKey = 100;
double startTime;
double accumProbe = 0.0;
double accumRecv = 0.0;
double accumSend = 0.0;
double accumSuspend = 0.0;
double accumWait = 0.0;
double accumWrite = 0.0;
......@@ -57,7 +60,7 @@ void check_mpi ( int line, int iret )
int setTag ( int ID, int sc )
{
return ID * 1000 + sc;
return ID * tagKey + sc;
}
/***************************************************************/
......@@ -67,8 +70,8 @@ rTag * getTag ( int tag )
rTag *rtag;
rtag = ( rTag * ) malloc ( sizeof ( rTag ));
rtag->id = tag / 1000;
rtag->command = tag % 1000;
rtag->id = tag / tagKey;
rtag->command = tag % tagKey;
return rtag;
}
......@@ -90,6 +93,12 @@ size_t pioFileWrite ( int id, int tsId, const void *buffer, size_t len )
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fwPOSIXFPGUARDSENDRECV ( id, tsId, buffer, len );
break;
case PIO_POSIX_FPGUARD_THREAD:
iret = fwPOSIXFPGUARDTHREAD ( id, tsId, buffer, len );
break;
case PIO_POSIX_FPGUARD_THREAD_REFUSE:
iret = fwPOSIXFPGUARDTHREADREFUSE ( id, tsId, buffer, len );
break;
case PIO_POSIX_NONB:
iret = fwPOSIXNONB ( id, tsId, buffer, len );
break;
......@@ -115,6 +124,12 @@ int pioFileClose ( int id )
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fcPOSIXFPGUARDSENDRECV ( id );
break;
case PIO_POSIX_FPGUARD_THREAD:
iret = fcPOSIXFPGUARDTHREAD ( id );
break;
case PIO_POSIX_FPGUARD_THREAD_REFUSE:
iret = fcPOSIXFPGUARDTHREADREFUSE ( id );
break;
case PIO_POSIX_NONB:
iret = fcPOSIXNONB ( id );
break;
......@@ -140,6 +155,12 @@ int pioFileOpenW ( const char *filename )
case PIO_POSIX_FPGUARD_SENDRECV:
iret = fowPOSIXFPGUARDSENDRECV ( filename );
break;
case PIO_POSIX_FPGUARD_THREAD:
iret = fowPOSIXFPGUARDTHREAD ( filename );
break;
case PIO_POSIX_FPGUARD_THREAD_REFUSE:
iret = fowPOSIXFPGUARDTHREADREFUSE ( filename );
break;
case PIO_POSIX_NONB:
iret = fowPOSIXNONB ( filename );
break;
......@@ -174,6 +195,7 @@ int pioInit ( int ptype, int comm, int *color, int *colors, int nnodes, int *pio
MPI_Comm_split (( MPI_Comm ) comm, *color, key, ( MPI_Comm *) pioComm );
pioinfo->comm = ( MPI_Comm ) *pioComm;
pioinfo->color = *color;
MPI_Comm_rank ( pioinfo->comm, &( pioinfo->rank ));
MPI_Comm_size ( pioinfo->comm, &( pioinfo->size ));
......@@ -226,6 +248,12 @@ int pioInit ( int ptype, int comm, int *color, int *colors, int nnodes, int *pio
case PIO_POSIX_FPGUARD_SENDRECV:
collectingData = initPOSIXFPGUARDSENDRECV ( ncollectors );
break;
case PIO_POSIX_FPGUARD_THREAD:
collectingData = initPOSIXFPGUARDTHREAD ( ncollectors );
break;
case PIO_POSIX_FPGUARD_THREAD_REFUSE:
collectingData = initPOSIXFPGUARDTHREADREFUSE ( ncollectors );
break;
case PIO_POSIX_NONB:
collectingData = initPOSIXNONB ( ncollectors );
break;
......@@ -241,6 +269,13 @@ int pioInit ( int ptype, int comm, int *color, int *colors, int nnodes, int *pio
void pioFinalize ()
{
switch ( pioinfo->type )
{
case PIO_POSIX_FPGUARD_THREAD:
finalizePOSIXFPGUARDTHREAD ();
break;
}
free ( pioinfo );
return;
......
......@@ -8,6 +8,7 @@
typedef struct{
int type;
MPI_Comm comm;
int color;
int size;
int rank;
int specialRank;
......
......@@ -12,12 +12,13 @@ typedef enum
IO_Close_file,
IO_Get_fp,
IO_Set_fp,
IO_Send_buffer
IO_Send_buffer,
IO_Finalize
} IO_Server_command;
const char * command2charP[5] = {"IO_Open_file", "IO_Close_file",
const char * command2charP[6] = {"IO_Open_file", "IO_Close_file",
"IO_Get_fp","IO_Set_fp",
"IO_Send_buffer"};
"IO_Send_buffer", "IO_Finalize"};
struct dBuffer
{
......@@ -92,6 +93,22 @@ int fcPOSIXFPGUARDSENDRECV ( int );
size_t fwPOSIXFPGUARDSENDRECV ( int, int, const void *, size_t );
int initPOSIXFPGUARDSENDRECV ( int * );
/* pio_posixfpguardthread.c */
void * fpgPOSIXFPGUARDTHREAD ( void * );
int fowPOSIXFPGUARDTHREAD ( const char * );
int fcPOSIXFPGUARDTHREAD ( int );
size_t fwPOSIXFPGUARDTHREAD ( int, int, const void *, size_t );
int initPOSIXFPGUARDTHREAD ( int * );
void finalizePOSIXFPGUARDTHREAD ( void );
/* pio_posixfpguardthreadrefuse.c */
void * fpgPOSIXFPGUARDTHREADREFUSE ( void * );
int fowPOSIXFPGUARDTHREADREFUSE ( const char * );
int fcPOSIXFPGUARDTHREADREFUSE ( int );
size_t fwPOSIXFPGUARDTHREADREFUSE ( int, int, const void *, size_t );
int initPOSIXFPGUARDTHREADREFUSE ( int * );
void finalizePOSIXFPGUARDTHREADREFUSE ( void );
/* pio_posixnonb.c */
void pwPOSIXNONB ( int );
int fowPOSIXNONB ( const char * );
......
......@@ -6,8 +6,8 @@
#ifndef NOMPI
#include <stdbool.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include "mpi.h"
......@@ -308,7 +308,8 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
pioinfo->rank, rtag->id, rtag->command,
command2charP [ rtag->command ], amount, bfd->offset );
bfd->nfinished[source] = true;
bfd->nfinished[source] = true;
bfd->finished = true;
for ( i = 0; i < ncollectors; i++ )
......@@ -326,7 +327,7 @@ void fpgPOSIXFPGUARDSENDRECV ( int ncollectors )
queueDestroy ( bibBFiledataPF );
return;
}
}
}
}
}
}
......@@ -466,7 +467,7 @@ int fcPOSIXFPGUARDSENDRECV ( int id )
int iret;
if ( ddebug )
fprintf ( stdout, "pe%d in fileClose (%id ): write buffer, close file and cleanup\n",
fprintf ( stdout, "pe%d in fileClose (%d ): write buffer, close file and cleanup, --- holle 0\n",
pioinfo->rank, id );
afd = ( aFiledataPF * ) queueIdx2val ( bibAFiledataPF, id );
......@@ -528,6 +529,7 @@ int fowPOSIXFPGUARDSENDRECV ( const char *filename )
/* init and enqueue file element */
afd = initAFiledataPF ( filename, buffersize );
if (( id = queuePush ( bibAFiledataPF, afd, 0 )) < 0 )
{
fprintf ( stderr, "Filename %s not unique\n", afd->name );
......@@ -559,7 +561,7 @@ int initPOSIXFPGUARDSENDRECV ( int *ncollectors )
if ( ddebug )
fprintf ( stdout,
"pe%d in initPOSIXFPGUARDSENDREC()\n", pioinfo->rank );
"pe%d in initPOSIXFPGUARDSENDRECV()\n", pioinfo->rank );
*ncollectors = pioinfo->size - 1;
pioinfo->specialRank = pioinfo->size - 1;
......
/*
todo
build in a check for the consistency of filename / file-number pairs
( pioOpenFile member name, recv in tmpbuffer, if(!uniqueName(q,v,n))abort )
*/
#ifndef NOMPI
#include <fcntl.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mpi.h"
#include "pio.h"
#include "pio_impl.h"
extern bool ddebug;
extern double startTime;
extern double accumProbe;
extern double accumRecv;
extern double accumSend;
extern double accumSuspend;
extern double accumWait;
extern double accumWrite;
extern long initial_buffersize;
extern pioInfo *pioinfo;
/* Per-file record built by initAFiledataPFT() and released by
   destroyAFiledataPFT(). */
typedef struct
{
char *name; /* heap copy of the file name passed to initAFiledataPFT() */
size_t size; /* size passed to dbuffer_init() for both db1 and db2 */
struct dBuffer *db1; /* first output buffer */
struct dBuffer *db2; /* second output buffer */
struct dBuffer *db; /* set to db1 at init; presumably the buffer currently in use -- TODO confirm */
int fd; /* descriptor returned by open() for name */
IO_Server_command command; /* set to IO_Open_file at init */
} aFiledataPFT;
/* Per-file record built by initBFiledataPFT() and released by
   destroyBFiledataPFT(). */
typedef struct
{
long offset; /* starts at 0; presumably the file offset advanced through rmwOffset() -- TODO confirm */
bool finished; /* starts as false */
bool *nfinished; /* heap array of nc flags, all initialised to true; nc is presumably the number of collectors -- TODO confirm */
} bFiledataPFT;
/* Presumably the queue of aFiledataPFT records -- TODO confirm against its users. */
static queue *bibAFiledataPFT;
/* NOTE(review): the following objects have external linkage and generic
   names; bibBFiledataPF is also used in the send/recv variant of this
   module. Confirm that no other translation unit defines the same symbols,
   or consider making them static. */
queue *bibBFiledataPF;
pthread_t thread;
pthread_attr_t attr;
/* Locked in rmwOffset() around the read-modify-write of the offset. */
pthread_mutex_t offsetMutex;
/***************************************************************/
/*
 * Allocate and initialise the record for one output file.
 *
 * key : NUL-terminated file name; a private copy is stored in the record.
 * bs  : size passed to dbuffer_init() for each of the two output buffers.
 *
 * The file is opened for writing (created if missing) and the record's
 * command is set to IO_Open_file. Any failure (allocation, buffer
 * initialisation, open) is reported on stderr and ends in MPI_Abort().
 *
 * Returns the new record; it is released with destroyAFiledataPFT().
 */
static aFiledataPFT *initAFiledataPFT( const char *key, size_t bs)
{
  aFiledataPFT *afd;
  size_t len;
  int status;

  afd = ( aFiledataPFT * ) malloc ( sizeof ( aFiledataPFT ));
  if ( afd == NULL )
    {
      fprintf ( stderr, "Failed to allocate file data for %s\n", key );
      MPI_Abort ( pioinfo->comm, 1 );
    }
  memset ( afd, 0, sizeof ( aFiledataPFT ));

  /* strcpy() also copies the terminating NUL, so reserve len + 1 bytes */
  len = strlen ( key );
  afd->name = ( char * ) malloc (( len + 1 ) * sizeof ( char ));
  if ( afd->name == NULL )
    {
      fprintf ( stderr, "Failed to allocate file name for %s\n", key );
      MPI_Abort ( pioinfo->comm, 1 );
    }
  strcpy ( afd->name, key );
  afd->size = bs;

  /* init output buffer */
  if ( ddebug )
    fprintf ( stdout, "pe%d in initAFiledataPFT(): name=%s, init output buffer\n",
              pioinfo->rank, afd->name );

  if (( status = dbuffer_init ( &( afd->db1 ), afd->size )) == 1 )
    {
      fprintf ( stderr, "dbuffer_init outputBuffer1: %d\n", status );
      MPI_Abort ( pioinfo->comm, 1 );
    }
  if (( status = dbuffer_init ( &( afd->db2 ), afd->size )) == 1)
    {
      fprintf ( stderr, "dbuffer_init outputBuffer2: %d\n", status );
      MPI_Abort ( pioinfo->comm, 1 );
    }
  /* start with the first of the two buffers */
  afd->db = afd->db1;

  /* open file */
  if ( ddebug )
    fprintf ( stdout, "pe%d in initAFiledataPFT(): name=%s, open file\n",
              pioinfo->rank, afd->name );

  if ( ( afd->fd = open ( afd->name, O_CREAT | O_WRONLY, 0666 )) == -1 )
    {
      fprintf ( stderr, "Failed to open %s:\n", afd->name );
      perror ( "System error message" );
      MPI_Abort ( pioinfo->comm, 1 );
    }

  if ( ddebug )
    fprintf ( stdout, "pe%d in initAFiledataPFT() has opened file %s\n",
              pioinfo->rank, afd->name );

  afd->command = IO_Open_file;
  return afd;
}
/***************************************************************/
/*
 * Build a fresh bFiledataPFT record: offset 0, finished == false and an
 * array of nc flags that all start out as true.
 *
 * key : not used by this function.
 * nc  : number of entries in the nfinished array.
 *
 * Returns the new record; it is released with destroyBFiledataPFT().
 */
bFiledataPFT * initBFiledataPFT ( int key, int nc )
{
  bFiledataPFT *record;
  bool *flag, *flagsEnd;

  /* zero-filled allocation of the record itself */
  record = ( bFiledataPFT * ) calloc ( 1, sizeof ( bFiledataPFT ));
  record->offset = 0;
  record->finished = false;

  record->nfinished = ( bool * ) malloc ( nc * sizeof ( bool ));
  flagsEnd = record->nfinished + nc;
  for ( flag = record->nfinished; flag < flagsEnd; flag++ )
    *flag = true;

  return record;
}
/***************************************************************/
/*
 * Close the file of an aFiledataPFT record and release the record.
 *
 * v : pointer to the aFiledataPFT record (passed as void *).
 *
 * A failing close() is reported on stderr and ends in MPI_Abort().
 * Afterwards both output buffers, the stored file name and the record
 * itself are freed.
 *
 * Returns the result of close(). Note that close() is declared in
 * <unistd.h>, which this file must include.
 */
int destroyAFiledataPFT ( void *v )
{
  aFiledataPFT *afd = ( aFiledataPFT * ) v;
  int iret;

  /* close file */
  if ( ddebug )
    fprintf ( stdout, "pe%d in destroyAFiledataPFT(): name=%s, close file\n",
              pioinfo->rank, afd->name );

  iret = close ( afd->fd );
  if ( iret == -1 )
    {
      fprintf ( stderr, "Failed to close file %s:\n", afd->name );
      perror ( "System error message" );
      MPI_Abort ( pioinfo->comm, 1 );
    }

  /* file closed, cleanup */
  if ( ddebug )
    fprintf ( stdout, "pe%d in destroyAFiledataPFT(): name=%s, file closed, cleanup ...\n",
              pioinfo->rank, afd->name );

  dbuffer_cleanup ( &( afd->db1 ));
  dbuffer_cleanup ( &( afd->db2 ));
  free ( afd->name );
  free ( afd );

  return iret;
}
/***************************************************************/
/*
 * Release a bFiledataPFT record together with its nfinished array.
 *
 * v : pointer to the bFiledataPFT record (passed as void *).
 *
 * Always returns 0.
 */
int destroyBFiledataPFT ( void *v )
{
  bFiledataPFT *record = ( bFiledataPFT * ) v;

  /* the flag array is owned by the record, so it goes first */
  free ( record->nfinished );
  free ( record );

  return 0;
}
/***************************************************************/
bool compareNamesAPFT ( void *v1, void *v2 )
{
aFiledataPFT *afd1, *afd2;
size_t len;
bool ret;
afd1 = ( aFiledataPFT * ) v1;
afd2 = ( aFiledataPFT * ) v2;
len = strlen ( afd1->name );
ret = ( len == strlen ( afd2->name ) &&
memcmp ( afd1->name, afd2->name, len ) == 0 );
return ret;
}
/***************************************************************/
/*
 * Read-modify-write of an offset under offsetMutex.
 *
 * bfdOffset : offset to advance; incremented by amount.
 * amount    : number added to *bfdOffset.
 * offsetNew : receives the value of *bfdOffset after the increment.
 *
 * Returns the value *bfdOffset had before the increment.
 */
long rmwOffset ( long *bfdOffset, long amount, long *offsetNew )
{
  long offsetOld;

  /* "%ld" is the conversion for long; "L" only applies to long double */
  if ( ddebug )
    fprintf ( stdout, "pe%d in rmwOffset, amount=%ld\n",
              pioinfo->rank, amount );

  /* the lock makes fetching the old value and storing the new one atomic
     with respect to other callers of this function */
  pthread_mutex_lock ( &offsetMutex );
  offsetOld = *bfdOffset;
  *bfdOffset += amount;
  *offsetNew = *bfdOffset;
  pthread_mutex_unlock ( &offsetMutex );

  if ( ddebug )
    fprintf ( stdout, "pe%d in rmwOffset, offsetOld=%ld, offsetNew=%ld\n",
              pioinfo->rank, offsetOld, *offsetNew );

  return offsetOld;
}
/***************************************************************/
void communicateOffset ( MPI_Status *statusF, long *bfdOffset )
{
long amount, offsetOld, offsetNew;
int source, tag;
rTag *rtagF;
source = statusF->MPI_SOURCE;
tag = statusF->MPI_TAG;
rtagF = getTag ( tag );