Skip to content
Snippets Groups Projects
Commit 231b0bbd authored by Uwe Schulzweida's avatar Uwe Schulzweida
Browse files

DRMAA update

parent 92561c8c
No related branches found
No related tags found
No related merge requests found
......@@ -758,10 +758,15 @@ int main(int argc, char *argv[])
{
char commandline[65536];
int i;
int status;
char jobname[1024];
char jobfilename[1024];
FILE *jobfilep;
int ftpget(const char *target, const char *source);
char ftp_url[4096];
char ftpfile[1024];
char tmppath[] = "/localA/m214003/tmp/";
char tmppath2[] = "/scratch/localA/m214003/tmp/";
FILE *jobfilep, *ftpfilep;
int ftpget(int flag, const char *url, const char *path, const char *target, const char *source);
commandline[0] = 0;
strcat(commandline, "/pf/m/m214003/cdt/work/cdo/build/GRID_AMD64/src/cdo ");
......@@ -785,13 +790,13 @@ int main(int argc, char *argv[])
}
fprintf(jobfilep, "#! /bin/bash\n");
fprintf(jobfilep, "uname -s\n");
fprintf(jobfilep, "pwd\n");
fprintf(jobfilep, "ls -l /pf/m/m214003/tmp\n");
fprintf(jobfilep, "cd /scratch/small/m214/m214003/tmp\n");
fprintf(jobfilep, "cd /pf/m/m214003/tmp\n");
fprintf(jobfilep, "#uname -s\n");
fprintf(jobfilep, "#pwd\n");
fprintf(jobfilep, "#ls -l %s\n", tmppath2);
fprintf(jobfilep, "rm -f %s*\n", tmppath2);
fprintf(jobfilep, "cd %s\n", tmppath2);
fprintf(jobfilep, "rm -f %s\n", cdojobfiles);
fprintf(jobfilep, "echo $LD_LIBRARY_PATH\n");
fprintf(jobfilep, "#echo $LD_LIBRARY_PATH\n");
fprintf(jobfilep, "setenv LD_LIBRARY_PATH /opt/gridware/sge/lib/lx24-amd64:$LD_LIBRARY_PATH\n");
fprintf(jobfilep, "%s\n", commandline);
......@@ -801,10 +806,29 @@ int main(int argc, char *argv[])
job_submit(cdoExpName, jobfilename, jobname);
sprintf(commandline, "rm -r %s\n", jobfilename);
sprintf(commandline, "rm -f %s\n", jobfilename);
system(commandline);
ftpget(cdojobfiles, cdojobfiles);
sprintf(commandline, "rm -f %s\n", cdojobfiles);
system(commandline);
sprintf(ftp_url, "ftp://%s.zmaw.de", cdoExpName);
status = ftpget(0, ftp_url, tmppath, cdojobfiles, cdojobfiles);
if ( status == 0 )
{
ftpfilep = fopen(cdojobfiles, "r");
if ( ftpfilep )
{
while ( fscanf(ftpfilep, "%s\n", ftpfile) == 1 )
{
ftpget(1, ftp_url, tmppath, ftpfile, ftpfile);
}
fclose(ftpfilep);
}
}
}
else
{
......
......@@ -3,6 +3,10 @@
#include <string.h>
#include <stdlib.h>
#include <sys/types.h> /* fstat */
#include <sys/stat.h>
#include <unistd.h>
#include "cdo.h"
#if defined (HAVE_LIBDRMAA)
......@@ -40,9 +44,10 @@ static drmaa_job_template_t *create_job_template(const char *expname, const char
/* determine current path */
size = pathconf(".", _PC_PATH_MAX);
if ((dir = (char *)malloc((size_t)size)) != NULL) {
ptr = getcwd(dir, (size_t)size);
}
if ( (dir = (char *)malloc((size_t)size)) != NULL )
{
ptr = getcwd(dir, (size_t)size);
}
/* generate DRMAA conform output path */
......@@ -80,7 +85,7 @@ static drmaa_job_template_t *create_job_template(const char *expname, const char
drmaa_set_attribute(job, DRMAA_OUTPUT_PATH, output_path, NULL, 0);
/* join output/error file */
drmaa_set_attribute(job, DRMAA_JOIN_FILES, "y", NULL, 0);
drmaa_set_attribute(job, DRMAA_JOIN_FILES, "n", NULL, 0);
/* transfer files */
drmaa_set_attribute(job, DRMAA_TRANSFER_FILES, "ieo", NULL, 0);
......@@ -110,41 +115,94 @@ static drmaa_job_template_t *create_job_template(const char *expname, const char
static int drmaa_submit(const char *expname, const char *jobfilename, const char *jobname)
{
char status[DRMAA_ERROR_STRING_BUFFER];
char jobid[DRMAA_JOBNAME_BUFFER], jobout[DRMAA_JOBNAME_BUFFER];
int drmaa_errno, stat;
drmaa_job_template_t *job;
int aborted, exited, signaled, exit_status;
drmaa_attr_values_t *rusage = NULL;
char usage[DRMAA_ERROR_STRING_BUFFER];
int stdout_is_tty = 0;
int errnum;
if (drmaa_init(NULL, status, sizeof(status)-1) != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_init() failed: %s\n", status);
return 1;
{ /* check character device on stdout */
struct stat statbuf;
fstat(1, &statbuf);
if ( S_ISCHR(statbuf.st_mode) ) stdout_is_tty = 1;
}
if ( drmaa_init(NULL, status, sizeof(status)-1) != DRMAA_ERRNO_SUCCESS )
{
fprintf(stderr, "drmaa_init() failed: %s\n", status);
return 1;
}
/* submit some sequential jobs */
if (!(job = create_job_template(expname, jobfilename, jobname))) {
fprintf(stderr, "create_job_template() failed\n");
return 1;
}
if ( !(job = create_job_template(expname, jobfilename, jobname)) )
{
fprintf(stderr, "create_job_template() failed\n");
return 1;
}
while ((drmaa_errno = drmaa_run_job(jobid, sizeof(jobid)-1, job, status,
sizeof(status)-1)) == DRMAA_ERRNO_DRM_COMMUNICATION_FAILURE) {
fprintf(stderr, "drmaa_run_job() failed - retry: %s\n", status);
sleep(1);
}
if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_run_job() failed: %s\n", status);
return 1;
}
fprintf(stderr, "job %s submitted.\n", jobid);
while ( (drmaa_errno = drmaa_run_job(jobid, sizeof(jobid)-1, job, status, sizeof(status)-1))
== DRMAA_ERRNO_DRM_COMMUNICATION_FAILURE )
{
fprintf(stderr, "drmaa_run_job() failed - retry: %s\n", status);
sleep(1);
}
if ( drmaa_errno != DRMAA_ERRNO_SUCCESS )
{
fprintf(stderr, "drmaa_run_job() failed: %s\n", status);
return 1;
}
if ( stdout_is_tty )
{
fprintf(stdout, "%s job %s ", expname, jobid);
fprintf(stdout, "submitted");
fflush(stdout);
}
if ( stdout_is_tty )
{
while ( 1 )
{
sleep (1);
errnum = drmaa_job_ps(jobid, &stat, status, DRMAA_ERROR_STRING_BUFFER);
if ( errnum != DRMAA_ERRNO_SUCCESS ) break;
if ( stat == DRMAA_PS_QUEUED_ACTIVE ||
stat == DRMAA_PS_SYSTEM_ON_HOLD ||
stat == DRMAA_PS_USER_ON_HOLD ||
stat == DRMAA_PS_USER_SYSTEM_ON_HOLD )
{
fprintf(stdout, "\b\b\b\b\b\b\b\b\bqueued ");
fflush(stdout);
}
else
break;
}
while ( 1 )
{
sleep (1);
errnum = drmaa_job_ps(jobid, &stat, status, DRMAA_ERROR_STRING_BUFFER);
if ( errnum != DRMAA_ERRNO_SUCCESS ) break;
if ( stat == DRMAA_PS_RUNNING )
{
fprintf(stdout, "\b\b\b\b\b\b\b\b\brunning ");
fflush(stdout);
}
else
break;
}
}
drmaa_delete_job_template(job, NULL, 0);
......@@ -153,34 +211,49 @@ static int drmaa_submit(const char *expname, const char *jobfilename, const char
drmaa_errno = drmaa_wait(jobid, jobout, sizeof(jobout)-1,
&stat, DRMAA_TIMEOUT_WAIT_FOREVER, &rusage, status, sizeof(status)-1);
if (drmaa_errno != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_wait(%s) failed: %s\n", jobout, status);
return 1;
}
if ( drmaa_errno != DRMAA_ERRNO_SUCCESS )
{
fprintf(stderr, "drmaa_wait(%s) failed: %s\n", jobout, status);
return 1;
}
/*
* report how job finished
*/
drmaa_wifaborted(&aborted, stat, NULL, 0);
if (aborted) {
printf("job %s never ran\n", jobid);
} else {
drmaa_wifexited(&exited, stat, NULL, 0);
if (exited) {
drmaa_wexitstatus(&exit_status, stat, NULL, 0);
printf("job %s finished regularly with exit status %d\n",
jobid, exit_status);
} else {
drmaa_wifsignaled(&signaled, stat, NULL, 0);
if (signaled) {
char termsig[DRMAA_SIGNAL_BUFFER+1];
drmaa_wtermsig(termsig, DRMAA_SIGNAL_BUFFER, stat, NULL, 0);
printf("job %s finished due to signal %s\n", jobid, termsig);
} else
printf("job %s finished with unclear conditions\n", jobid);
if ( aborted )
{
fprintf(stderr, "job %s never ran\n", jobid);
}
}
else
{
drmaa_wifexited(&exited, stat, NULL, 0);
if ( exited )
{
drmaa_wexitstatus(&exit_status, stat, NULL, 0);
if ( stdout_is_tty )
{
fprintf(stdout, "\b\b\b\b\b\b\b\b\bfinished\n");
}
if ( exit_status )
fprintf(stdout, "%s job %s exit status %d\n", expname, jobid, exit_status);
}
else
{
drmaa_wifsignaled(&signaled, stat, NULL, 0);
if ( signaled )
{
char termsig[DRMAA_SIGNAL_BUFFER+1];
drmaa_wtermsig(termsig, DRMAA_SIGNAL_BUFFER, stat, NULL, 0);
fprintf(stderr, "job %s finished due to signal %s\n", jobid, termsig);
}
else
fprintf(stderr, "job %s finished with unclear conditions\n", jobid);
}
}
if ( stdout_is_tty ) fprintf(stdout, "\n");
if ( cdoVerbose )
{
......@@ -193,19 +266,22 @@ static int drmaa_submit(const char *expname, const char *jobfilename, const char
drmaa_release_attr_values (rusage);
if (drmaa_exit(status, sizeof(status)-1) != DRMAA_ERRNO_SUCCESS) {
fprintf(stderr, "drmaa_exit() failed: %s\n", status);
return 1;
}
if ( drmaa_exit(status, sizeof(status)-1) != DRMAA_ERRNO_SUCCESS )
{
fprintf(stderr, "drmaa_exit() failed: %s\n", status);
return 1;
}
{
char commandline[1024];
sprintf(commandline, "cat %s.o%s | grep -v tty | grep -v cannot | grep -v resize | grep -v shell\n", jobname, jobid);
sprintf(commandline, "cat %s.o%s | grep -v tty | grep -v shell\n", jobname, jobid);
system(commandline);
sprintf(commandline, "cat %s.e%s | grep -v cannot | grep -v resize | grep -v rm\n", jobname, jobid);
system(commandline);
sprintf(commandline, "rm -f %s.o%s\n", jobname, jobid);
sprintf(commandline, "rm -f %s.o%s %s.e%s\n", jobname, jobid, jobname, jobid);
system(commandline);
}
......@@ -238,6 +314,7 @@ struct FtpFile {
FILE *stream;
};
int my_fwrite(void *buffer, size_t size, size_t nmemb, void *stream)
{
struct FtpFile *out=(struct FtpFile *)stream;
......@@ -250,18 +327,60 @@ int my_fwrite(void *buffer, size_t size, size_t nmemb, void *stream)
}
int ftpget(const char *target, const char *source)
int my_progress_func(int *stdout_is_tty,
double t, /* dltotal */
double d, /* dlnow */
double ultotal,
double ulnow)
{
if ( *stdout_is_tty )
{
fprintf(stdout, "\b\b\b\b\b%4d%%", (int) (d*100/t));
fflush(stdout);
}
return 0;
}
int ftpget(int flag, const char *url, const char *path, const char *target, const char *source)
{
#if defined (HAVE_LIBCURL)
CURL *curl;
CURLcode res;
struct curl_slist* commands = NULL ;
struct FtpFile ftpfile={
NULL, /* name to store the file as if succesful */
NULL
};
char filename[8196] = "ftp://qftps.zmaw.de/pf/m/m214003/tmp/";
char filename[8196];
char ftpcommand[1024];
char errorbuffer[CURL_ERROR_SIZE];
int status = 0;
int stdout_is_tty = 0;
char prompt[1024];
{ /* check character device on stdout */
struct stat statbuf;
fstat(1, &statbuf);
if ( S_ISCHR(statbuf.st_mode) ) stdout_is_tty = 1;
}
strcat(filename, source);
sprintf(filename, "%s%s", path, source);
sprintf(ftpcommand, "PWD");
commands = curl_slist_append(commands, ftpcommand) ;
sprintf(ftpcommand, "DELE %s\n", filename);
commands = curl_slist_append(commands, ftpcommand) ;
sprintf(filename, "%s%s%s", url, path, source);
if ( flag )
{
sprintf(prompt, "Download %-40s ", filename);
fprintf(stdout, "%s ", prompt);
}
ftpfile.filename = target;
......@@ -269,46 +388,73 @@ int ftpget(const char *target, const char *source)
curl = curl_easy_init();
if(curl) {
curl_easy_setopt(curl, CURLOPT_NETRC, CURL_NETRC_REQUIRED);
if ( curl )
{
curl_easy_setopt(curl, CURLOPT_NETRC, CURL_NETRC_REQUIRED);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
if ( cdoVerbose )
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
else
curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
curl_easy_setopt(curl, CURLOPT_FTP_SSL, CURLFTPSSL_CONTROL);
curl_easy_setopt(curl, CURLOPT_FTPSSLAUTH, CURLFTPAUTH_TLS);
if ( flag )
{
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, FALSE);
curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, my_progress_func);
curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, &stdout_is_tty);
}
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
curl_easy_setopt(curl, CURLOPT_FTP_SSL, CURLFTPSSL_CONTROL);
curl_easy_setopt(curl, CURLOPT_FTPSSLAUTH, CURLFTPAUTH_TLS);
curl_easy_setopt(curl, CURLOPT_SSLKEYPASSWD, "");
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
curl_easy_setopt(curl, CURLOPT_URL, filename);
curl_easy_setopt(curl, CURLOPT_SSLKEYPASSWD, "");
/* define callback */
curl_easy_setopt(curl, CURLOPT_URL, filename);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, my_fwrite);
/* define callback */
/* set a pointer to struct to pass to the callback */
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, my_fwrite);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ftpfile);
/* set a pointer to struct to pass to the callback */
res = curl_easy_perform(curl);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ftpfile);
curl_easy_cleanup(curl);
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errorbuffer);
if(CURLE_OK != res) {
fprintf(stderr, "curl told us %d\n", res);
}
curl_easy_setopt(curl, CURLOPT_POSTQUOTE, commands);
}
res = curl_easy_perform(curl);
if(ftpfile.stream)
fclose(ftpfile.stream);
curl_slist_free_all(commands);
curl_easy_cleanup(curl);
if ( CURLE_OK != res )
{
if ( flag )
{
/* fprintf(stderr, "curl told us %d\n", res); */
fprintf(stderr, "%s\n", errorbuffer);
}
status = -2;
}
}
else
{
status = -1;
}
if ( ftpfile.stream ) fclose(ftpfile.stream);
curl_global_cleanup();
if ( flag ) fprintf(stdout, "\n");
#else
fprintf(stderr, "CURL library not available!\n");
#endif
return 0;
return (status);
}
......@@ -494,8 +494,23 @@ int pstreamOpenRead(const char *argument)
else
{
len = strlen(argument);
filename = (char *) malloc(len+1);
strcpy(filename, argument);
if ( cdoExpMode == CDO_EXP_REMOTE )
{
char datapath[] = "/scratch/localA/m214003/data/";
len += strlen(datapath);
filename = (char *) malloc(len+1);
strcpy(filename, datapath);
strcat(filename, argument);
}
else
{
filename = (char *) malloc(len+1);
strcpy(filename, argument);
}
}
if ( PSTREAM_Debug ) Message(func, "file %s", filename);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment