Commit 47cc9f6f authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

add option -e exp to test DRMAA

parent d56d90dd
......@@ -272,6 +272,7 @@ src/history.c -text
src/institution.c -text
src/interpol.c -text
src/interpol.h -text
src/job.c -text
src/julian.c -text
src/legendre.c -text
src/list.c -text
......
......@@ -4,6 +4,7 @@
* using CDI library version 1.0.2
* set alias gradsdes to gradsdes2
* rename gradsdes to gradsdes1
* add option -e exp to test DRMAA
* add option -z szip to compress GRIB records with SZIP
* use DBL_IS_EQUAL to compare floating point
* remapbil, remapbic: improvement and speedup for regional lonlat grids
......
......@@ -114,6 +114,7 @@ cdo_SOURCES = Arith.c \
history.c \
institution.c \
interpol.c \
job.c \
modules.c \
namelist.c \
normal.c \
......
......@@ -194,6 +194,7 @@ cdo_SOURCES = Arith.c \
history.c \
institution.c \
interpol.c \
job.c \
modules.c \
namelist.c \
normal.c \
......@@ -287,14 +288,14 @@ am_cdo_OBJECTS = Arith.$(OBJEXT) Arithc.$(OBJEXT) Arithdays.$(OBJEXT) \
field.$(OBJEXT) fieldc.$(OBJEXT) field2.$(OBJEXT) \
fieldmer.$(OBJEXT) fieldzon.$(OBJEXT) grid.$(OBJEXT) \
history.$(OBJEXT) institution.$(OBJEXT) interpol.$(OBJEXT) \
modules.$(OBJEXT) namelist.$(OBJEXT) normal.$(OBJEXT) \
pipe.$(OBJEXT) process.$(OBJEXT) remaplib.$(OBJEXT) \
timer.$(OBJEXT) realtime.$(OBJEXT) pstream.$(OBJEXT) \
table.$(OBJEXT) userlog.$(OBJEXT) util.$(OBJEXT) \
legendre.$(OBJEXT) fourier.$(OBJEXT) specspace.$(OBJEXT) \
readline.$(OBJEXT) julian.$(OBJEXT) vinterp.$(OBJEXT) \
zaxis.$(OBJEXT) pthread_debug.$(OBJEXT) color.$(OBJEXT) \
list.$(OBJEXT)
job.$(OBJEXT) modules.$(OBJEXT) namelist.$(OBJEXT) \
normal.$(OBJEXT) pipe.$(OBJEXT) process.$(OBJEXT) \
remaplib.$(OBJEXT) timer.$(OBJEXT) realtime.$(OBJEXT) \
pstream.$(OBJEXT) table.$(OBJEXT) userlog.$(OBJEXT) \
util.$(OBJEXT) legendre.$(OBJEXT) fourier.$(OBJEXT) \
specspace.$(OBJEXT) readline.$(OBJEXT) julian.$(OBJEXT) \
vinterp.$(OBJEXT) zaxis.$(OBJEXT) pthread_debug.$(OBJEXT) \
color.$(OBJEXT) list.$(OBJEXT)
cdo_OBJECTS = $(am_cdo_OBJECTS)
cdo_DEPENDENCIES =
cdo_LDFLAGS =
......@@ -361,11 +362,12 @@ am__depfiles_maybe = depfiles
@AMDEP_TRUE@ ./$(DEPDIR)/fieldzon.Po ./$(DEPDIR)/fourier.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/grid.Po ./$(DEPDIR)/history.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/institution.Po ./$(DEPDIR)/interpol.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/julian.Po ./$(DEPDIR)/legendre.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/list.Po ./$(DEPDIR)/modules.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/namelist.Po ./$(DEPDIR)/normal.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/pipe.Po ./$(DEPDIR)/process.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/pstream.Po ./$(DEPDIR)/pthread_debug.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/job.Po ./$(DEPDIR)/julian.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/legendre.Po ./$(DEPDIR)/list.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/modules.Po ./$(DEPDIR)/namelist.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/normal.Po ./$(DEPDIR)/pipe.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/process.Po ./$(DEPDIR)/pstream.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/pthread_debug.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/readline.Po ./$(DEPDIR)/realtime.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/remaplib.Po ./$(DEPDIR)/specspace.Po \
@AMDEP_TRUE@ ./$(DEPDIR)/table.Po ./$(DEPDIR)/timer.Po \
......@@ -553,6 +555,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/history.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/institution.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/interpol.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/job.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/julian.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/legendre.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/list.Po@am__quote@
......
......@@ -72,6 +72,9 @@ int cdoTimer = FALSE;
int cdoVerbose = FALSE;
int cdoDebug = 0;
int cdoExpMode = -1;
char *cdoExpName = NULL;
int timer_total, timer_read, timer_write;
int timer_remap, timer_remap_con, timer_remap_con2, timer_remap_con3;
......@@ -177,7 +180,7 @@ static void usage(void)
}
static void cdoPrintHelp(char *phelp[], char *xoperator)
static void cdoPrintHelp(char *phelp[]/*, char *xoperator*/)
{
if ( phelp == NULL )
printf("No help available for this operator!\n");
......@@ -513,7 +516,7 @@ int main(int argc, char *argv[])
if ( noff ) setDefaultFileType(Progname+noff, 0);
while ( (c = cdoGetopt(argc, argv, "f:b:p:g:i:l:m:t:D:z:aBdhRrsTVvZ")) != -1 )
while ( (c = cdoGetopt(argc, argv, "f:b:e:p:g:i:l:m:t:D:z:aBdhRrsTVvZ")) != -1 )
{
switch (c)
{
......@@ -533,6 +536,18 @@ int main(int argc, char *argv[])
Debug = 1;
DebugLevel = atoi(cdoOptarg);
break;
case 'e':
{
char host[1024];
gethostname(host, sizeof(host));
cdoExpName = cdoOptarg;
/* printf("host: %s %s\n", host, cdoExpName); */
if ( strcmp(host, cdoExpName) == 0 )
cdoExpMode = CDO_EXP_REMOTE;
else
cdoExpMode = CDO_EXP_LOCAL;
break;
}
case 'f':
setDefaultFileType(cdoOptarg, 1);
break;
......@@ -730,36 +745,84 @@ int main(int argc, char *argv[])
if ( lstop ) return (status);
if ( cdoTimer )
if ( cdoDefaultTableID != CDI_UNDEFID ) cdiDefTableID(cdoDefaultTableID);
operatorName = getOperatorName(operatorArg);
if ( Help )
{
timer_total = timer_new("total");
timer_read = timer_new("read");
timer_write = timer_new("write");
timer_remap = timer_new("remap");
timer_remap_con = timer_new("remap con");
timer_remap_con2 = timer_new("remap con2");
timer_remap_con3 = timer_new("remap con3");
cdoPrintHelp(operatorHelp(operatorName)/*, operatorName*/);
}
else if ( cdoExpMode == CDO_EXP_LOCAL )
{
char commandline[65536];
int i;
char jobname[1024];
char jobfilename[1024];
FILE *jobfilep;
commandline[0] = 0;
strcat(commandline, "/pf/m/m214003/cdt/work/cdo/src/cdo ");
for ( i = 1; i < argc; i++ )
{
strcat(commandline, argv[i]);
strcat(commandline, " ");
}
/* printf("command: >%s<\n", commandline);*/
if ( cdoTimer ) timer_start(timer_total);
jobfilename[0] = 0;
strcat(jobfilename, "cdojob.sh");
if ( cdoDefaultTableID != CDI_UNDEFID ) cdiDefTableID(cdoDefaultTableID);
jobfilep = fopen(jobfilename, "w");
operatorName = getOperatorName(operatorArg);
if ( jobfilep == NULL )
{
fprintf(stderr, "Open failed on %s\n", jobfilename);
perror(jobfilename);
exit(EXIT_FAILURE);
}
if ( Help )
cdoPrintHelp(operatorHelp(operatorName), operatorName);
fprintf(jobfilep, "#! /bin/bash\n");
fprintf(jobfilep, "uname -s\n");
fprintf(jobfilep, "setenv LD_LIBRARY_PATH /opt/gridware/sge/lib/lx24-x86:$LD_LIBRARY_PATH\n");
fprintf(jobfilep, "%s\n", commandline);
fclose(jobfilep);
sprintf(jobname, "cdo_%s", cdoExpName);
job_submit(cdoExpName, jobfilename, jobname);
sprintf(commandline, "rm -r %s\n", jobfilename);
system(commandline);
}
else
operatorModule(operatorName)(argument);
{
if ( cdoTimer )
{
timer_total = timer_new("total");
timer_read = timer_new("read");
timer_write = timer_new("write");
timer_remap = timer_new("remap");
timer_remap_con = timer_new("remap con");
timer_remap_con2 = timer_new("remap con2");
timer_remap_con3 = timer_new("remap con3");
}
if ( cdoTimer ) timer_start(timer_total);
operatorModule(operatorName)(argument);
if ( cdoTimer ) timer_stop(timer_total);
if ( cdoTimer ) timer_report();
}
if ( argument ) free(argument);
/* problems with alias!!! if ( operatorName ) free(operatorName); */
/* malloc_stats(); */
if ( cdoTimer ) timer_stop(timer_total);
if ( cdoTimer ) timer_report();
return (status);
}
......@@ -58,6 +58,9 @@ char *strdup(const char *s);
#define M_LN10 2.30258509299404568402 /* log_e 10 */
#endif
#define CDO_EXP_LOCAL 1
#define CDO_EXP_REMOTE 2
void strtolower(char *str);
void print_pthread_info(void);
......@@ -93,4 +96,6 @@ void userlog(const char *prompt, double cputime);
void nospec(int vlistID);
void gridWrite(FILE *fp, int gridID);
void job_submit(const char *expname, const char *jobfilename, const char *jobname);
#endif /* _CDO_INT_H */
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include "cdo.h"
#if defined (HAVE_LIBDRMAA)
# include "drmaa.h"
#endif
#if defined (HAVE_LIBDRMAA)
static drmaa_job_template_t *create_job_template(const char *expname, const char *jobfilename, const char *jobname)
{
static char func[] = "create_job_template";
drmaa_job_template_t *job = NULL;
char error[DRMAA_ERROR_STRING_BUFFER];
char name[DRMAA_ATTR_BUFFER], value[DRMAA_ATTR_BUFFER];
char attr[1024];
long size;
char *dir, *ptr;
drmaa_attr_names_t *job_attributes;
int drmaa_errno;
char host[1024];
char *output_path;
int len, len1, len2;
/* determine hostname */
gethostname(host, 255);
/* determine current path */
size = pathconf(".", _PC_PATH_MAX);
if ((dir = (char *)malloc((size_t)size)) != NULL) {
ptr = getcwd(dir, (size_t)size);
}
/* generate DRMAA conform output path */
len1 = strlen(host);
len2 = strlen(dir);
len = len1+len2+1;
output_path = (char *) malloc(len*sizeof(char));
strcpy(output_path, host);
strcat(output_path, ":");
strcat(output_path, dir);
/* need to allow chdir on execution host, not thread save! */
setenv("SGE_DRMAA_ALLOW_CWD", "yes", 1);
/* allocate job template */
if (drmaa_allocate_job_template(&job, NULL, 0) != DRMAA_ERRNO_SUCCESS)
return NULL;
/* the job's name */
drmaa_set_attribute(job, DRMAA_JOB_NAME, jobname, NULL, 0);
/* the job to be run */
drmaa_set_attribute(job, DRMAA_REMOTE_COMMAND, jobfilename, NULL, 0);
/* submit state */
drmaa_set_attribute(job, DRMAA_JS_STATE, "drmaa_active", NULL, 0);
/* working directory on execution host */
drmaa_set_attribute(job, DRMAA_WD, dir, NULL, 0);
/* path for output */
drmaa_set_attribute(job, DRMAA_OUTPUT_PATH, output_path, NULL, 0);
/* join output/error file */
drmaa_set_attribute(job, DRMAA_JOIN_FILES, "y", NULL, 0);
/* transfer files */
drmaa_set_attribute(job, DRMAA_TRANSFER_FILES, "ieo", NULL, 0);
/* some native SGE commands necessary */
sprintf(attr, "-cwd -b n -q %s.q", expname);
drmaa_set_attribute(job, DRMAA_NATIVE_SPECIFICATION, attr, NULL, 0);
/* print out job attributes */
drmaa_get_attribute_names (&job_attributes, error, DRMAA_ERROR_STRING_BUFFER);
if ( cdoVerbose )
while ((drmaa_errno = drmaa_get_next_attr_name(job_attributes, name, DRMAA_ATTR_BUFFER)) == DRMAA_ERRNO_SUCCESS) {
drmaa_get_attribute (job, name, value, DRMAA_ATTR_BUFFER, error, DRMAA_ERROR_STRING_BUFFER);
fprintf (stderr, "name: %-25s \t %s\n", name, value);
}
free(dir);
return job;
}
#endif
#if defined (HAVE_LIBDRMAA)
static int drmaa_submit(const char *expname, const char *jobfilename, const char *jobname)
{
char status[DRMAA_ERROR_STRING_BUFFER];
char jobid[DRMAA_JOBNAME_BUFFER], jobout[DRMAA_JOBNAME_BUFFER];
int drmaa_errno, stat;
drmaa_job_template_t *job;
int aborted, exited, signaled, exit_status;
drmaa_attr_values_t *rusage = NULL;
char usage[DRMAA_ERROR_STRING_BUFFER];
if (drmaa_init(NULL, status, sizeof(status)-1) != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_init() failed: %s\n", status);
return 1;
}
/* submit some sequential jobs */
if (!(job = create_job_template(expname, jobfilename, jobname))) {
fprintf(stderr, "create_job_template() failed\n");
return 1;
}
while ((drmaa_errno = drmaa_run_job(jobid, sizeof(jobid)-1, job, status,
sizeof(status)-1)) == DRMAA_ERRNO_DRM_COMMUNICATION_FAILURE) {
fprintf(stderr, "drmaa_run_job() failed - retry: %s\n", status);
sleep(1);
}
if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_run_job() failed: %s\n", status);
return 1;
}
fprintf(stderr, "job %s submitted.\n", jobid);
drmaa_delete_job_template(job, NULL, 0);
/* wait for job */
drmaa_errno = drmaa_wait(jobid, jobout, sizeof(jobout)-1,
&stat, DRMAA_TIMEOUT_WAIT_FOREVER, &rusage, status, sizeof(status)-1);
if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_wait(%s) failed: %s\n", jobout, status);
return 1;
}
/*
* report how job finished
*/
drmaa_wifaborted(&aborted, stat, NULL, 0);
if (aborted) {
printf("job %s never ran\n", jobid);
} else {
drmaa_wifexited(&exited, stat, NULL, 0);
if (exited) {
drmaa_wexitstatus(&exit_status, stat, NULL, 0);
printf("job %s finished regularly with exit status %d\n",
jobid, exit_status);
} else {
drmaa_wifsignaled(&signaled, stat, NULL, 0);
if (signaled) {
char termsig[DRMAA_SIGNAL_BUFFER+1];
drmaa_wtermsig(termsig, DRMAA_SIGNAL_BUFFER, stat, NULL, 0);
printf("job %s finished due to signal %s\n", jobid, termsig);
} else
printf("job %s finished with unclear conditions\n", jobid);
}
}
if ( cdoVerbose )
{
fprintf(stderr, "Job usage:\n");
while (drmaa_get_next_attr_value (rusage, usage, DRMAA_ERROR_STRING_BUFFER) == DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, " %s\n", usage);
}
}
drmaa_release_attr_values (rusage);
if (drmaa_exit(status, sizeof(status)-1) != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_exit() failed: %s\n", status);
return 1;
}
{
char commandline[1024];
sprintf(commandline, "cat %s.o%s | grep -v tty | grep -v cannot | grep -v resize | grep -v shell\n", jobname, jobid);
system(commandline);
sprintf(commandline, "rm -f %s.o%s\n", jobname, jobid);
system(commandline);
}
return 0;
}
#endif
void job_submit(const char *expname, const char *jobfilename, const char *jobname)
{
#if defined (HAVE_LIBDRMAA)
int status;
status = drmaa_submit(expname, jobfilename, jobname);
#else
fprintf(stderr, "DRMAA library not available!\n");
#endif
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment