Commit 9b8ac929 authored by Thomas Jahns, committed by Sergey Kosukhin

Add distributed grid capability for parallel output.

parent c2d903db
Part of merge requests !34 (Version 2.2.0) and !13 (Consolidation with CDI-PIO (develop)).
......@@ -227,6 +227,24 @@ main(int argc, char **argv)
PKG_CHECK_MODULES([YAXT],[yaxt],
[AC_DEFINE([HAVE_YAXT],,[yaxt library is available])],
[AC_MSG_FAILURE([Required yaxt library unavailable.])])
dnl parallel netCDF support still requires ScalES-PPM and YAXT to
dnl re-arrange the data when running with more than one collector
PKG_CHECK_MODULES([PPM_CORE],[scales-ppm-core],
[enable_ppm=yes
AC_DEFINE([HAVE_PPM_CORE],,
[ScalES PPM C core library is available])
saved_CPPFLAGS=$CPPFLAGS
CPPFLAGS="$CPPFLAGS $PPM_CORE_CFLAGS"
AC_CHECK_HEADERS(
[ppm/dist_array.h],
[HAVE_PPM_DIST_ARRAY=yes],,
[AC_INCLUDES_DEFAULT])
AC_SUBST([HAVE_PPM_DIST_ARRAY])
AM_SUBST_NOTMAKE([HAVE_PPM_DIST_ARRAY])
CPPFLAGS=$saved_CPPFLAGS
],
[enable_ppm=no])
AS_IF([test x"$ENABLE_NC4" = xyes],
[AC_CHECK_HEADERS([netcdf_par.h],
[AC_CHECK_DECL([MPI_Bcast],
......@@ -240,14 +258,6 @@ main(int argc, char **argv)
@%:@ifdef HAVE_NETCDF_H
@%:@include <netcdf.h>
@%:@endif])
dnl parallel netCDF support still requires ScalES-PPM and YAXT to
dnl re-arrange the data when running with more than one collector
PKG_CHECK_MODULES([PPM_CORE],[scales-ppm-core],
[enable_ppm=yes
AC_DEFINE([HAVE_PPM_CORE],,
[ScalES PPM C core library is available])
],
[enable_ppm=no])
dnl if not both scales-ppm and yaxt are available, netcdf can only be
dnl used in serial mode
AS_IF([test x$enable_ppm != xyes],
......
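The checks above only define preprocessor symbols; the C sources touched by this commit consume them as compile-time guards. As a minimal sketch (illustrative, not part of the commit): HAVE_PPM_CORE comes from the AC_DEFINE above, and HAVE_PPM_DIST_ARRAY_H is what AC_CHECK_HEADERS defines when ppm/dist_array.h is found.

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_PPM_CORE            /* scales-ppm-core was found by pkg-config */
#include <ppm/ppm.h>
#endif
#ifdef HAVE_PPM_DIST_ARRAY_H    /* distributed multi-array feature is usable */
#include <ppm/dist_array.h>
#endif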
......@@ -143,3 +143,9 @@ for "first", ranks 0-NUMSERVERS-1 become I/O servers and in the case
Use this option to generate UUID for the created grids/zaxis data structures.
This feature is off by default.
-quse-dist-grid[=(true|false)], -qno-use-dist-grid[=(true|false)]
If true, generate a distributed grid on the I/O clients and servers. This
feature is only available when CDI was built with a PPM library including
the distributed multi-array feature.
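For illustration, the option is recorded in the writer configuration via the PIO_WRITE_CONFIG_USE_DIST_GRID_FLAG bit introduced further down in this commit. A hypothetical helper that queries the parsed setting might look like the following sketch; the header name and the function itself are assumptions, only the flag and guard macros come from this diff.

#include "pio_write.h"   /* assumed header providing struct model_config
                          * and the PIO_WRITE_CONFIG_* flag enum */

/* hypothetical helper: nonzero if -quse-dist-grid was requested and the
 * distributed-grid feature was compiled in */
static inline int
useDistGrid(const struct model_config *setup)
{
#if defined USE_MPI && defined HAVE_PPM_DIST_ARRAY_H
  return (setup->flags & PIO_WRITE_CONFIG_USE_DIST_GRID_FLAG) != 0;
#else
  (void) setup;
  return 0;
#endif
}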
......@@ -229,6 +229,7 @@ libcdipio_la_SOURCES = \
pio_conf.c \
pio_conf.h \
pio_dbuffer.c \
pio_dist_grid.c \
pio_id_set.h \
pio_impl.h \
pio_interface.c \
......
......@@ -144,6 +144,20 @@ void cdiPioConfSetStripeConversion(int confResH, int doStripify);
* to the xmap constructor? */
int cdiPioConfGetStripeConversion(int confResH);
/* cdiPioDistGridCreate: create a grid data structure where the
* per-coordinate data is distributed over the client tasks.
* chunk_decomposition (the xy_decomposition_optional argument) specifies
* how to distribute the data of each of the following arrays:
* x-values, y-values, x-bounds, y-bounds, area, mask and gme mask
*
* For 2D grids (all gridtypes except GRID_UNSTRUCTURED), four values
* specify the start and size of each dimension, where e.g.
* chunk_decomposition[1][0] would specify the part size of the
* y-dimension and chunk_decomposition[0][0] the start index of the
* x-dimension. For unstructured grids only the x-dimension applies.
*/
int cdiPioDistGridCreate(int gridtype, int size, int xsize, int ysize, int nvertex, const int xy_decomposition_optional[][2],
Xt_idxlist partDesc2D, Xt_idxlist partDescX, Xt_idxlist partDescY);
// End of fortran interface
//FINT_OFF <--- don't change or remove this line!!!
......
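A minimal client-side sketch of a call to cdiPioDistGridCreate, assuming a lon-lat grid split into contiguous latitude bands, one band per client. The [start, size] layout of the decomposition entries, nvertex = 2 for the bounds, the interpretation of the partDesc* lists as the global indices owned by the calling rank, and the lifetime of the index lists after the call are all assumptions; xt_idxvec_new() and xt_idxlist_delete() are standard YAXT constructors/destructors.

#include <stdlib.h>
#include <yaxt.h>
#include "cdi.h"
#include "cdipio.h"

/* Sketch only: create a distributed lon-lat grid of nlon * nlat cells,
 * where client rank 'rank' of 'nranks' owns a contiguous band of latitudes. */
static int
createDistLonLat(int nlon, int nlat, int rank, int nranks)
{
  int yStart = (int) ((long long) nlat * rank / nranks);
  int yEnd = (int) ((long long) nlat * (rank + 1) / nranks), yChunk = yEnd - yStart;
  /* assumed layout: decomposition[dim][0] = start, decomposition[dim][1] = size */
  int chunk[2][2] = { { 0, nlon }, { yStart, yChunk } };

  /* global indices of the owned 2D cells, of the x-coordinate values
   * (all of them for a latitude band) and of the owned y-coordinate values */
  Xt_int *idx2D = malloc((size_t) nlon * (size_t) yChunk * sizeof (*idx2D));
  Xt_int *idxX = malloc((size_t) nlon * sizeof (*idxX));
  Xt_int *idxY = malloc((size_t) yChunk * sizeof (*idxY));
  for (int j = 0; j < yChunk; ++j)
    for (int i = 0; i < nlon; ++i)
      idx2D[(size_t) j * (size_t) nlon + (size_t) i] = (Xt_int) (yStart + j) * nlon + i;
  for (int i = 0; i < nlon; ++i) idxX[i] = (Xt_int) i;
  for (int j = 0; j < yChunk; ++j) idxY[j] = (Xt_int) (yStart + j);
  Xt_idxlist partDesc2D = xt_idxvec_new(idx2D, nlon * yChunk);
  Xt_idxlist partDescX = xt_idxvec_new(idxX, nlon);
  Xt_idxlist partDescY = xt_idxvec_new(idxY, yChunk);

  int gridID = cdiPioDistGridCreate(GRID_LONLAT, nlon * nlat, nlon, nlat,
                                    /* nvertex, assumed 2 bounds per direction */ 2,
                                    chunk, partDesc2D, partDescX, partDescY);

  /* assumption: the caller no longer needs the index lists afterwards */
  xt_idxlist_delete(partDesc2D);
  xt_idxlist_delete(partDescX);
  xt_idxlist_delete(partDescY);
  free(idx2D); free(idxX); free(idxY);
  return gridID;
}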
......@@ -15,6 +15,7 @@
#include "cdipio.h"
#include "pio.h"
#include "pio_client.h"
#include "pio_dist_grid.h"
#include "pio_comm.h"
#include "pio_interface.h"
#include "pio_rpc.h"
......@@ -59,7 +60,12 @@ cdiPioClientStreamOpen(const char *filename, char filemode, int filetype, stream
Free(msgBuffer);
}
else
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN, comm, MPI_STATUS_IGNORE));
{
#ifdef HAVE_PPM_DIST_ARRAY_H
cdiPioDistGridPackAssist();
#endif
xmpi(MPI_Recv(&fileID, 1, MPI_INT, collRank, STREAMOPEN, comm, MPI_STATUS_IGNORE));
}
if (fileID >= 0)
{
streamptr->filetype = filetype;
......@@ -97,6 +103,10 @@ cdiPioClientStreamDefVlist_(int streamID, int vlistID)
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, STREAMDEFVLIST, comm));
Free(msgBuffer);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
struct collSpec cspec = { .numClients = numClients, .numServers = numColl, .sendRPCData = sendRPCData };
cdiPioClientStreamWinCreate(streamID, &cspec);
}
......@@ -194,6 +204,10 @@ cdiPioClientStreamClose(stream_t *streamptr, int recordBufIsToBeDeleted)
xmpi(MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, STREAMCLOSE, comm));
Free(msgBuffer);
}
#ifdef HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
cdiPioClientStreamWinDestroy(streamID);
}
......
This diff is collapsed.
#ifndef PIO_DIST_GRID_H
#define PIO_DIST_GRID_H
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_PPM_DIST_ARRAY_H
void cdiPioDistGridPackAssist(void);
void cdiPioDistGridUnpack(char *unpackBuffer, int unpackBufferSize, int *unpackBufferPos, int originNamespace, void *context,
int force_id);
void cdiPioDistGridFinalizeOnce(int namespace);
#endif
#endif
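The pio_client.c hunks above establish the call pattern for cdiPioDistGridPackAssist(): the client that packs and sends an RPC message which may reference a distributed grid does so as before, while clients that do not send enter the assist call, presumably so the packing rank can access coordinate data distributed across all clients. A condensed sketch of that pattern follows; the wrapper function and its parameters are invented for illustration.

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <mpi.h>
#include "pio_dist_grid.h"

/* Condensed from the pio_client.c pattern in this commit: the sending
 * client transmits the packed message; the others assist so that
 * distributed grid data can be read during packing. */
static void
sendOrAssist(int thisClientSends, void *msgBuffer, int msgBufPos, int collRank, int tag, MPI_Comm comm)
{
  if (thisClientSends)
    MPI_Send(msgBuffer, msgBufPos, MPI_PACKED, collRank, tag, comm);
#ifdef HAVE_PPM_DIST_ARRAY_H
  else
    cdiPioDistGridPackAssist();
#endif
}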
......@@ -12,6 +12,10 @@
#include <mpi.h>
#include <yaxt.h>
#ifdef HAVE_PPM_CORE
#include <ppm/ppm.h>
#endif
#include "cdi.h"
#include "cdipio.h"
#include "cdi_int.h"
......@@ -20,6 +24,7 @@
#include "pio.h"
#include "pio_client.h"
#include "pio_conf.h"
#include "pio_dist_grid.h"
#include "pio_id_set.h"
#include "pio_impl.h"
#include "pio_serialize.h"
......@@ -29,6 +34,7 @@
#include "pio_server.h"
#include "pio_util.h"
#include "resource_handle.h"
#include "resource_unpack.h"
#include "vlist.h"
#include "vlist_var.h"
......@@ -365,6 +371,9 @@ cdiPioNoPostCommSetup(void)
/*****************************************************************************/
static int xtInitByCDI = 0;
#ifdef HAVE_PPM_CORE
static int ppmInitByCDI = 0;
#endif
/* pioInit definition must currently compile even in non-MPI configurations */
/**
......@@ -422,6 +431,9 @@ cdiPioInit(MPI_Comm commGlob, int confResH, int *pioNamespace)
IOMode == PIO_ASYNCH)
xabort("CDI-PIO Output method PIO_ASYNCH is unsupported on this system!");
#ifdef HAVE_PPM_CORE
if ((ppmInitByCDI = (!PPM_initialized() || PPM_finalized()))) PPM_initialize(&commGlob, NULL, NULL);
#endif
if ((xtInitByCDI = (!xt_initialized() || xt_finalized()))) xt_initialize(commGlob);
int nProcsIO = cdiPioCommInit(commGlob, IOMode, conf->clientServerRole);
int sizeGlob = commInqSizeGlob();
......@@ -442,6 +454,11 @@ cdiPioInit(MPI_Comm commGlob, int confResH, int *pioNamespace)
reshRemove(confResH, &cdiPioConfOps);
if (cdiPioExtraNSKeys[cdiPioEKXmapNew] == 0) cdiPioExtraNSKeys[cdiPioEKXmapNew] = cdiNamespaceSwitchNewKey();
#ifdef HAVE_PPM_DIST_ARRAY_H
reshDistGridUnpack = cdiPioDistGridUnpack;
#endif
if (commInqIsProcIO())
{
int prevNamespace = namespaceGetActive(), serverNamespace = namespaceNew();
......@@ -456,11 +473,17 @@ cdiPioInit(MPI_Comm commGlob, int confResH, int *pioNamespace)
void (*cdiPioFileWritingFinalize)(void) = namespaceSwitchGet(cdiPioExtraNSKeys[cdiPioEKFileWritingFinalize]).func;
cdiPioFileWritingFinalize();
}
cdiPioCommFinalize();
if (xtInitByCDI) xt_finalize();
namespaceSetActive(prevNamespace);
reshReplace(confResH, conf, &cdiPioConfOps);
namespaceDelete(serverNamespace);
#ifdef HAVE_PPM_DIST_ARRAY_H
if (commInqRankColl() >= 0) cdiPioDistGridFinalizeOnce(serverNamespace);
#endif
cdiPioCommFinalize();
if (xtInitByCDI) xt_finalize();
#ifdef HAVE_PPM_CORE
if (ppmInitByCDI) PPM_finalize();
#endif
return MPI_COMM_NULL;
}
else
......@@ -518,6 +541,10 @@ pioEndDef(void)
xmpi(MPI_Send(buffer, bufferSize, MPI_PACKED, collRank, RESOURCES, comm));
reshPackBufferDestroy(&buffer);
}
#if HAVE_PPM_DIST_ARRAY_H
else
cdiPioDistGridPackAssist();
#endif
}
/************************************************************************/
......@@ -541,8 +568,11 @@ pioFinalize(void)
/* namespace is unchanged on I/O servers */
int pioNamespace = namespaceGetActive();
if (pioNamespace == 0) return;
reshListDestruct(pioNamespace);
#ifdef HAVE_PPM_DIST_ARRAY_H
cdiPioDistGridFinalizeOnce(pioNamespace);
#endif
namespaceDelete(pioNamespace);
int clientRank = commInqRankModel(), numClients = cdiPioCommInqSizeClients(), numColl = commInqSizeColl(),
collRank = cdiPioCollRank(clientRank, numClients, numColl);
......@@ -553,6 +583,9 @@ pioFinalize(void)
cdiPioCommFinalize();
idSetDestroy(&openStreams);
if (xtInitByCDI) xt_finalize();
#ifdef HAVE_PPM_CORE
if (ppmInitByCDI) PPM_finalize();
#endif
xdebug("%s", "RETURN");
}
......
......@@ -20,6 +20,7 @@ enum collectorCommandTags
STREAMDEFVLIST,
WRITETS,
BLOCK_XFER,
DIST_DATA_AGG,
};
#define MAXWINBUFFERSIZE ((size_t) 2048 * 1024 * 1024)
......
......@@ -12,6 +12,9 @@
#include "taxis.h"
#include "zaxis.h"
void (*reshDistGridUnpack)(char *unpackBuffer, int unpackBufferSize, int *unpackBufferPos, int originNamespace, void *context,
int force_id);
/*****************************************************************************/
int
......@@ -46,6 +49,7 @@ reshUnpackResources(char *unpackBuffer, int unpackBufferSize, void *context)
++numAssociations;
break;
case VLIST: vlistUnpack(unpackBuffer, unpackBufferSize, &unpackBufferPos, originNamespace, context, 1); break;
case DIST_GRID: reshDistGridUnpack(unpackBuffer, unpackBufferSize, &unpackBufferPos, originNamespace, context, 1); break;
case RESH_DELETE:
serializeUnpack(unpackBuffer, unpackBufferSize, &unpackBufferPos, &resH, 1, CDI_DATATYPE_INT, context);
resH = namespaceAdaptKey(resH, originNamespace);
......
......@@ -14,6 +14,7 @@ enum
MODEL = 5,
STREAM = 6,
VLIST = 7,
DIST_GRID = 8,
RESH_DELETE,
START = 55555555,
END = 99999999
......@@ -21,6 +22,8 @@ enum
int reshUnpackResources(char *unpackBuffer, int unpackBufferSize, void *context);
extern void (*reshDistGridUnpack)(char *unpackBuffer, int unpackBufferSize, int *unpackBufferPos, int originNamespace,
void *context, int force_id);
#endif
/*
......
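The reshDistGridUnpack function pointer gives the core resource-unpacking code a way to dispatch DIST_GRID entries without a hard dependency on the PIO distributed-grid module; cdiPioInit installs the actual implementation (see the pio.c hunk above). A minimal sketch of that registration, using only names that appear in this diff (the wrapper function itself is invented for illustration):

#include "resource_unpack.h"
#include "pio_dist_grid.h"

/* Sketch of the hook registration performed in cdiPioInit(): in serial
 * builds the pointer stays unset and DIST_GRID entries never occur. */
static void
installDistGridUnpackHook(void)
{
#ifdef HAVE_PPM_DIST_ARRAY_H
  reshDistGridUnpack = cdiPioDistGridUnpack;
#endif
}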
......@@ -17,6 +17,9 @@
#include <mpi.h>
#include <yaxt.h>
#endif
#ifdef HAVE_PPM_CORE
#include <ppm/ppm.h>
#endif
#include "cdi.h"
#include "dmemory.h"
......@@ -142,7 +145,8 @@ parse_long_option(struct model_config *restrict setup, int pioConfHandle, pioRol
(void) pioRoleAssign;
#endif
static const char cacheRedistStr[] = "no-cache-redists", pioRoleSchemeOptionStr[] = "pio-role-scheme",
curvilinearGridOptionStr[] = "no-create-curvilinear-grid", uuidCreateOptionStr[] = "no-create-uuid";
curvilinearGridOptionStr[] = "no-create-curvilinear-grid", uuidCreateOptionStr[] = "no-create-uuid",
useDistGridOptionStr[] = "no-use-dist-grid";
struct boolOptionParse bop;
if ((bop = parseBooleanLongOption(sizeof(cacheRedistStr), cacheRedistStr, str)).matched)
{
......@@ -172,6 +176,20 @@ parse_long_option(struct model_config *restrict setup, int pioConfHandle, pioRol
invalidOptionDie("long option %s needs argument\n", pioRoleSchemeOptionStr);
#else
invalidOptionDie("CDI-PIO option -q%s ignored in non-MPI mode\n", pioRoleSchemeOptionStr);
#endif
}
else if ((bop = parseBooleanLongOption(sizeof(useDistGridOptionStr), useDistGridOptionStr, str)).matched)
{
#if defined USE_MPI && defined HAVE_PPM_DIST_ARRAY_H
setup->flags = (setup->flags & ~PIO_WRITE_CONFIG_USE_DIST_GRID_FLAG) | (bop.value << PIO_WRITE_CONFIG_USE_DIST_GRID_BIT);
#else
invalidOptionDie("CDI-PIO option -q%s feature unavailable %s\n", useDistGridOptionStr + (bop.invert ? 0 : 3),
#ifndef USE_MPI
"in non-MPI mode"
#else
"without PPM feature distributed multi-array"
#endif
);
#endif
}
else if ((bop = parseBooleanLongOption(sizeof(uuidCreateOptionStr), uuidCreateOptionStr, str)).matched)
......
......@@ -18,9 +18,11 @@ enum
PIO_WRITE_CONFIG_CHECKSUM_BIT = 0,
PIO_WRITE_CONFIG_CREATE_UUID_BIT,
PIO_WRITE_CONFIG_CREATE_CURVILINEAR_GRID_BIT,
PIO_WRITE_CONFIG_USE_DIST_GRID_BIT,
PIO_WRITE_CONFIG_CHECKSUM_FLAG = 1 << PIO_WRITE_CONFIG_CHECKSUM_BIT,
PIO_WRITE_CONFIG_CREATE_UUID_FLAG = 1 << PIO_WRITE_CONFIG_CREATE_UUID_BIT,
PIO_WRITE_CONFIG_CREATE_CURVILINEAR_GRID_FLAG = 1 << PIO_WRITE_CONFIG_CREATE_CURVILINEAR_GRID_BIT,
PIO_WRITE_CONFIG_USE_DIST_GRID_FLAG = 1 << PIO_WRITE_CONFIG_USE_DIST_GRID_BIT,
};
struct model_config
......