Skip to content
Snippets Groups Projects
Commit fa39136d authored by Swantje Bastin's avatar Swantje Bastin
Browse files

pyic_tave.py: Enabled use of dask slurm cluster.

parent 466ff331
No related branches found
No related tags found
No related merge requests found
......@@ -30,6 +30,7 @@ import sys
from dask.distributed import Client, progress
from distributed.scheduler import logger
import socket
from time import sleep
# ## Preliminaries
......@@ -75,7 +76,13 @@ def main():
parser.add_argument('--vars', type=str, default=None,
help='Variables which are averaged')
parser.add_argument('--dask', type=str, default=None,
help='Specify how das is going to be used.')
help='Specify how dask is going to be used.')
parser.add_argument('--ndasknodes', type=int, default=2,
help='Specify number of nodes to use for dask slurm cluster')
parser.add_argument('--username', type=str, default=None,
help='Specify username on Levante')
parser.add_argument('--accountname', type=str, default=None,
help='Specify DKRZ project account name')
# -
if debug_jupyter:
......@@ -102,7 +109,7 @@ def main():
return strings
fpath_data = iopts.fpath_data[0]
fpath_data = iopts.fpath_data #[0]
ave_vars = decipher_list(iopts.vars)
time_sel = decipher_list(iopts.time_sel)
time_isel = decipher_list(iopts.time_isel)
......@@ -112,15 +119,17 @@ def main():
# ### Setting up a dask cluster
nodask = False
username = 'm300602'
account_name = 'mh0033'
dask_tmp_dir = '/work/{account_name}/{username}/dask_tmp'
#username = 'm300602'
#account_name = 'mh0033'
username = iopts.username
account_name = iopts.accountname
dask_tmp_dir = f'/work/{account_name}/{username}/dask_tmp'
if not iopts.dask:
print('!!! Warning: No --dask option specified, continue without dask.!!!')
elif iopts.dask=='simple_client':
if __name__=="__main__":
dask.config.config.get('distributed').get('dashboard').update({'link':'{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})
dask.config.config.get('distributed').get('dashboard').update({'link':f'{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})
client = Client(n_workers=10, threads_per_worker=1, memory_limit='20GB')
client.run(numcodecs.register_codec, gribscan.RawGribCodec, "gribscan.rawgrib")
elif iopts.dask=='local':
......@@ -140,14 +149,18 @@ def main():
port = client.scheduler_info()['services']['dashboard']
logger.info(f"ssh -L 8788:localhost:{port} -J {username}@levante.dkrz.de {username}@{host}")
elif iopts.dask=='slurm':
if not (iopts.username and iopts.accountname):
print('Warning: username and project account need to be specified')
from dask_jobqueue import SLURMCluster
queue = 'compute' # Name of the partition we want to use
job_name = 'pyic_dask' # Job name that is submitted via sbatch
memory = "100GiB" # Max memory per node that is going to be used - this depends on the partition
cores = 24 # Max number of cores per task that are reserved - also partition dependend
processes = 6 # split job per node into so many processes (should be roughly sqrt(cores))
walltime = '8:00:00' # Walltime - also partition dependent
cluster = SLURMCluster(memory=memory,
cores=cores,
processes=processes,
project=account_name,
walltime=walltime,
queue=queue,
......@@ -156,15 +169,19 @@ def main():
local_directory=dask_tmp_dir,
job_extra=[f'-J {job_name}',
f'-D {dask_tmp_dir}',
f'--begin=now',
#f'--begin=now', # does not work -- workers do not connect...?
f'--output={dask_tmp_dir}/LOG_cluster.%j.o',
f'--output={dask_tmp_dir}/LOG_cluster.%j.o'
],
interface='ib0')
cluster.scale(jobs=2)
cluster.scale(jobs=iopts.ndasknodes)
client = Client(cluster)
if iopts.verbose:
print(cluster.job_script())
nworkers = processes * iopts.ndasknodes
while ((client.status == "running") and (len(client.scheduler_info()["workers"]) < nworkers)):
sleep(1.0)
print("jobs running, starting computation")
else:
print(f'!!! Warning: Unknown dask method: {iopts.dask}. Continuing without a dask cluster. !!!')
......@@ -179,8 +196,9 @@ def main():
nodask = True
# ## Loading the data
flist = glob.glob(fpath_data)
#flist = glob.glob(fpath_data)
flist = fpath_data # shell already expanded argument into list of files
flist.sort()
if iopts.verbose:
print("\n".join(flist))
......@@ -263,6 +281,8 @@ def main():
progress(x)
print('All done!')
if iopts.dask=="slurm":
client.shutdown()
return
if __name__=='__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment