Commit 3f2ebc0b authored by Cedrick Ansorge's avatar Cedrick Ansorge
Browse files

parallel version of sort_add

parent 0952ab18
......@@ -46,6 +46,10 @@
enum {REMAPCON, REMAPCON2, REMAPBIL, REMAPBIC, REMAPDIS, REMAPNN, REMAPLAF, REMAPSUM,
GENCON, GENCON2, GENBIL, GENBIC, GENDIS, GENNN, GENLAF, REMAPXXX};
enum {HEAP_SORT, MERGE_SORT};
#define SORT_MODE MERGE_SORT
static
void get_map_type(int operfunc, int *map_type, int *submap_type, int *remap_order)
{
......@@ -784,8 +788,37 @@ void *Remap(void *argument)
if ( remaps[r].vars.num_links != remaps[r].vars.max_links )
resize_remap_vars(&remaps[r].vars, remaps[r].vars.num_links-remaps[r].vars.max_links);
sort_add(remaps[r].vars.num_links, remaps[r].vars.num_wts,
remaps[r].vars.grid2_add, remaps[r].vars.grid1_add, remaps[r].vars.wts);
//sort_add(remaps[r].vars.num_links, remaps[r].vars.num_wts,
// remaps[r].vars.grid2_add, remaps[r].vars.grid1_add, remaps[r].vars.wts);
if ( SORT_MODE == MERGE_SORT )
{ /*
** use a combination of the old sort_add and a split and
** merge approach. The chunk size is determined by
** MERGE_SORT_LIMIT_SIZE in remaplib.c. OpenMP parallelism
** is supported
*/
int numThreads;
// printf("Calling sort_iter:\nnum_links: %li\nnum_wts: %li\n",
// remaps[r].vars.num_links,remaps[r].vars.num_wts);
#if defined (_OPENMP)
#pragma omp parallel
{
numThreads = omp_get_num_threads();
}
#else
numThreads = 1;
#endif
printf("using %i threads\n",numThreads);
sort_iter(remaps[r].vars.num_links, remaps[r].vars.num_wts,
remaps[r].vars.grid2_add, remaps[r].vars.grid1_add,
remaps[r].vars.wts,numThreads);
}
else if ( SORT_MODE == HEAP_SORT )
{ /* use a pure heap sort without any support of parallelism */
sort_add(remaps[r].vars.num_links, remaps[r].vars.num_wts,
remaps[r].vars.grid2_add, remaps[r].vars.grid1_add,
remaps[r].vars.wts);
}
if ( lwrite_remap ) goto WRITE_REMAP;
......
#if defined (_OPENMP)
#include <omp.h>
#endif
#define RESTR_TYPE int /* restrict data types: 0 -> double, float; 1 -> int */
......@@ -173,6 +176,8 @@ void reorder_links(remapvars_t *rv);
void sort_add(long num_links, long num_wts, int *restrict add1, int *restrict add2, double *restrict *restrict weights);
void sort_add_test(long num_links, long num_wts, int *restrict add1, int *restrict add2, double *restrict *restrict weights);
void sort_iter(long num_links, long num_wts, int *add1, int *add2, double **weights, int parent);
void merge_lists(int *nl, int *l11, int *l12, int *l21, int *l22, long *idx);
void write_remap_scrip(const char *interp_file, int map_type, int submap_type,
int remap_order, remapgrid_t rg, remapvars_t rv);
......
......@@ -46,6 +46,8 @@
2009-01-11 Uwe Schulzweida: OpenMP parallelization
*/
#if defined (HAVE_CONFIG_H)
# include "config.h"
#endif
......@@ -78,6 +80,8 @@ struct grid_layer
struct grid_layer *next;
};
enum TPARMODE { parallel, serial };
typedef struct grid_layer grid_layer_t;
typedef struct
......@@ -109,6 +113,13 @@ typedef struct
#define PIH HALF*PI
/* MERGE SORT DEFINES */
#define MERGE_SORT_CHUNKS 64
#define MERGE_SORT_LIMIT_SIZE 4096 //num_links/(MERGE_SORT_CHUNKS*omp_get_num_procs())
#define VERBOSE_PAR 0
#define VERBOSE 0
/* static double north_thresh = 1.45; */ /* threshold for coord transf. */
/* static double south_thresh =-2.00; */ /* threshold for coord transf. */
static double north_thresh = 2.00; /* threshold for coord transf. */
......@@ -6919,3 +6930,294 @@ void read_remap_scrip(const char *interp_file, int gridID1, int gridID2, int *ma
rv->links.dst_add = NULL;
rv->links.w_index = NULL;
} /* read_remap_scrip */
/* ********************************************************************************
** XXX XXX XXXXXXXXXX XXXXXXXXXX XXXXXXXXXXX XXXXXXXXXX
** XXXX XXXX XXXXXXXXXX XXXXXXXXXXX XXXXXXXXXXX XXXXXXXXXX
** XXXXX XXXXX XXX XXX XXX XXX XXX
** XXXXXX XXXXXX XXXXXXXXX XXXX XXXX XXX XXXXXXXX
** XXX XXX XXX XXXXXXXXX XXXXXXX XXX XXXX XXXXXXXX
** XXX X XXX XXX XXX XXXX XXX XXX XXX
** XXX XXX XXXXXXXXXX XXX XXXX XXXXXXXXXXXX XXXXXXXXXX
** XXX XXX XXXXXXXXXX XXX XXXX XXXXXXXXXX XXXXXXXXXX
**
**
** XXXXXXXXXXX XXXXXXXX XXXXXXXXXX XXXXXXXXXXXXX
** XXXXXXXXXXX XXX XXX XXXXXXXXXXX XXXXXXXXXXXXX
** XXX XXX XXX XXX XXX XXX
** XXXXXXXXXXX XXX XXX XXXX XXXX XXX
** XXXXXXXXXXX XXX XXX XXXXXXXX XXX
** XXX XXX XXX XXX XXXX XXX
** XXXXXXXXXXX XXX XXX XXX XXXX XXX
** XXXXXXXXXXX XXXXXXXX XXX XXXX XXX
********************************************************************************** */
int first_sort_iter_call = 1;
int numThreads = 1;
int par_depth;
void sort_par(int nsplit, long num_links, long num_wts, int *add1, int *add2,
double **weights, int parent)
{
/*
This routine is the core of merge-sort. It does the following
+ split the address-arrays into two segments,
+ sort each array seperately (this can be done in parallel as there
is no data dependency)
- the routine sort_iter, which is called for sorting the sub-arrays
EITHER calls this routine againg, which means, that the sub-arrays
are further split
OR it calls sort_add, which actually sorts the sublist sequentially
+ merge the sorted arrays together
For the merge step additional memory is needed as it cannot work in place.
Therefor, the merge sort algorith in this implementation uses at maximum
twice as much memory as the sequential sort_add.
Parameters:
-----------
int nsplit | (only 2 allowed) number of segments to split the data
long num_links | length of arrays add1 and add2 (MUST be of same length
int *add1 *add2 | arrays with addresses, that are used as sorting criteria (ascending)
double ** weights | weights for each address that have to be kept in the same order
as add1[] and add2[]
int parent | the parent of this sort_par. This parameter is used to find
the recursion depth and determine the actual position of the
sub-array within the original array
*/
static char func[] = "sort_par";
int nl[nsplit]; /* number of links in each sub-array */
int thread_id,thread_num; /* thread id and number of threads */
int who_am_i,depth,my_depth; /* current depth, depth of children and index
to be parent in next call to sort_par */
int num_procs;
int *add_srt, *add_end; /* arrays for start and end index of sub array */
int **add1s, **add2s; /* pointers to sub arrays for sort and merge step */
int *tmp; /* pointer to buffer for merging of address lists */
double *tmp2 = NULL; /* pointer to buffer for merging weight lists */
double **wgttmp = NULL; /* pointer to buffer for swap weights */
long *idx; /* index list to merge sub-arrays */
long i,n,m;
if ( nsplit != 2 )
{
fprintf(stderr,"Error: splitting into more than two subsegments not allowed\n"
" in this implementation of merge sort\n");
exit(-1);
}
add_srt = (int *) malloc (nsplit*sizeof(int));
add_end = (int *) malloc (nsplit*sizeof(int));
add1s = (int **) malloc (nsplit*sizeof(int*));
add2s = (int **) malloc (nsplit*sizeof(int*));
idx = ( long * ) malloc ((num_links)*sizeof(long));
/* SPLIT AND SORT THE DATA FRAGMENTS */
/*
for ( i=0; i<nsplit; i++)
{
add_srt[i]= i * num_links/nsplit;
add_end[i]= (i+1) * num_links/nsplit;
add1s[i] = &(add1[add_srt[i]]);
add2s[i] = &(add2[add_srt[i]]);
nl[i] = add_end[i]-add_srt[i];
}
*/
add_srt[0] = 0; add_srt[1] = num_links/nsplit;
add1s[0] = &add1[add_srt[0]]; add1s[1] = &add1[add_srt[1]];
add2s[0] = &add2[add_srt[0]]; add2s[1] = &add2[add_srt[1]];
nl[0] = num_links/nsplit; nl[1] = num_links-nl[0];
add_end[0] = nl[0]; add_end[1] = num_links;
num_procs = numThreads;
depth = (int) (log(parent)/log(2));
#if defined (_OPENMP)
/* Allow for nested parallelism */
if ( omp_in_parallel() && depth<par_depth )
{
omp_set_nested(1);
if ( omp_get_nested() == 0 )
printf("Warning: openMP implementation seems to not support\n"
" nested parallelism. Maximum of CPUs used \n"
" is 2 instead of %i.\n",num_procs);
}
#endif
// printf("I am %i nl[0] %i nl[1] %i\n",parent,nl[0],nl[1]);
// printf("add_srt[0] %i add_Srt[1] %i\n",add_srt[0],add_srt[1]);
// if ( 1 )
// printf("\n\nSplitting thread into %i!! (I AM %i) depth %i parallel_depth %i add_srt[0]%i add_srt[1] %i\n",
// nsplit,parent,depth,par_depth,add_srt[0],add_srt[1]);
#if defined (_OPENMP)
#pragma omp parallel for if(depth<par_depth) \
private(i,n,m,wgttmp,thread_id,who_am_i,my_depth) \
shared(weights,thread_num) num_threads(2)
#endif
for ( i=0; i < nsplit; i++ )
{
who_am_i = nsplit*parent+i;
my_depth = (int) (log(parent)/log(2))+1;
int pow2d = 1 << my_depth;
#if defined (_OPENMP)
// if ( 1 )
// printf("I am %i (parent %i), my_depth is: %i thread_num %i (%i) \n",
// who_am_i,parent,my_depth,omp_get_thread_num()+1,omp_get_num_threads());
#endif
wgttmp = malloc(num_wts*sizeof(double*));
for ( n=0; n<num_wts; n++ )
{
wgttmp[n] = malloc ( nl[i] * sizeof(double));
for ( m=0; m<nl[i]; m++ )
wgttmp[n][m] = weights[n][add_srt[i]+m];
}
sort_iter(nl[i], num_wts, add1s[i], add2s[i], wgttmp, who_am_i);
for ( n=0; n<num_wts; n++ )
{
for ( m=0; m<nl[i]; m++ )
weights[n][add_srt[i]+m] = wgttmp[n][m];
free(wgttmp[n]);
}
free(wgttmp);
}
/* ********************************* */
/* TO DO */
/* THIS BIT NEEDS TO BE PARALLELIZED */
/* ********************************* */
/* Idea I: one CPU merges top-down, the other one bottom-up */
/* ********************** */
merge_lists(nl,add1s[0],add2s[0],add1s[1],add2s[1], idx); /* MERGE THE SEGMENTS */
/* ********************** */
tmp = malloc ( num_links*sizeof(int) );
#pragma omp if ( depth < par_depth ) parallel for private(i) num_threads(2)
for ( i=0; i< num_links; i++ )
tmp[i] = add1[idx[i]];
#pragma omp if ( depth < par_depth ) parallel for private(i) num_threads(2)
for ( i=0; i< num_links; i++ )
{
add1[i] = tmp[i];
tmp[i] = add2[idx[i]];
}
#pragma omp if ( depth < par_depth ) parallel for private(i) num_threads(2)
for ( i=0; i<num_links; i++ )
add2[i] = tmp[i];
free(tmp);
tmp=NULL;
tmp2 = (double *) malloc ( num_links*num_wts*sizeof(double) );
#pragma omp if ( depth < par_depth ) parallel for private(i,n) num_threads(2)
for ( i=0; i<num_links; i++ )
for ( n = 0; n< num_wts; n++ )
tmp2[num_wts*i + n] = weights[n][idx[i]];
#pragma omp if ( depth < par_depth ) parallel for private(i,n) num_threads(2)
for ( i=0; i<num_links; i++ )
for ( n = 0; n< num_wts; n++ )
weights[n][i] = tmp2[num_wts*i+n];
free(tmp2);
tmp2 = NULL;
free(add1s);
free(add2s);
free(add_srt);
free(add_end);
free(idx);
return;
}
void sort_iter(long num_links, long num_wts, int *add1, int *add2, double **weights,int parent)
{
/*
This routine is an interface between the parallelized (merge-sort)
and the sequential sorting algorithm for addresses implemented in
the library.
It iterates 1 level into the binary tree if the single data chunks
to sort are larger than the maximum size prescribed. Otherwise, it
just sorts the chunk using the sort_add routine as implemented
originally.
Note, that even on a single CPU, the merge sort algorithm can be
considerably faster (up to about 30% for a reasonable chunk size)
*/
/* Parameters as in sort_par
additional parameters
int mod; (enum TPAR_MODE) determines wether tomake use of merge sort
int parent; !!! CAUTION !!!
+ determines level and position of data chunk within
the original heap (level = log_2(who_am_i)) if sort_iter(...) has not
been called before
+ determines number of threads to use on first call of sort_iter(...)
*/
if ( first_sort_iter_call )
{
first_sort_iter_call = 0;
numThreads = parent;
parent = 1;
par_depth = (int)(log(numThreads)/log(2));
}
if ( num_links > MERGE_SORT_LIMIT_SIZE )
{
sort_par(2,num_links,num_wts,add1,add2,weights,parent);
if ( VERBOSE ) printf (" finished iteration parent %i\n", parent);
return;
}
sort_add(num_links,num_wts,add1,add2,weights);
return;
}
void merge_lists(int *nl, int *l11, int *l12, int *l21, int *l22, long *idx)
{
/*
This routine writes to idx a list of indices relative to *l11 and *l12
--> l11, l12, and l21,l22 each need to be allocated in
a signle block of memory
The order is thus, that (I) l11[idx[i]]<l11[idx[i+1]]
OR (II) l11[idx[i]]==l11[idx[i+1]] && l21[idx[i]]<l21[idx[i+1]]
where 0 <= i < nl
*/
int i1=0, i2=0, i=0, ii;
const int n1=nl[0], n2=nl[1];
i=0;
while ( i2 < n2 && i1 < n1 )
{
if ( ( l11[i1] < l21[i2] ) ||
( l11[i1] == l21[i2] && l12[i1] < l22[i2] ) )
{ idx[i] = i1; i1++; }
else
{ idx[i] = n1+i2; i2++; }
i++;
}
for ( ii=i1; i1 < n1; ii++ ) {idx[i] = i1; i++; i1++; }
for ( ii=i2; i2 < n2; ii++ ) {idx[i] = n1+i2; i++; i2++; }
return;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment