Commit d8339ccd authored by Nils Brüggemann

tools/pyic_tave.py: Merge with master

parents a970efc7 847803c7
@@ -31,6 +31,7 @@ from dask.distributed import Client, progress
from distributed.scheduler import logger
import socket
#from ipdb import set_trace as mybreak
from time import sleep
# ## Preliminaries
@@ -75,10 +76,16 @@ def main():
help='Time records in between averaging will be done.')
parser.add_argument('--vars', type=str, default=None,
help='Variables which are averaged')
parser.add_argument('--dask', type=str, default=None,
help='Specify how das is going to be used.')
parser.add_argument('--groupby', type=str, default=None,
help='Specify a groupby operation.')
parser.add_argument('--dask', type=str, default=None,
help='Specify how dask is going to be used.')
parser.add_argument('--ndasknodes', type=int, default=2,
help='Specify number of nodes to use for dask slurm cluster')
parser.add_argument('--username', type=str, default=None,
help='Specify username on Levante')
parser.add_argument('--accountname', type=str, default=None,
help='Specify DKRZ project account name')
# -
if debug_jupyter:
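As a usage note for the options added above, a hypothetical invocation could look like the following (file pattern, variable list, node count and the DKRZ user/project IDs are placeholders, not values taken from this commit):

    python pyic_tave.py /path/to/data_*.nc --vars to,so --dask slurm --ndasknodes 4 --username <dkrz-user> --accountname <dkrz-project>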
@@ -105,7 +112,6 @@ def main():
return strings
fpath_data = iopts.fpath_data[0]
ave_vars = decipher_list(iopts.vars)
time_sel = decipher_list(iopts.time_sel)
time_isel = decipher_list(iopts.time_isel)
@@ -115,15 +121,17 @@ def main():
# ### Setting up a dask cluster
nodask = False
username = 'm300602'
account_name = 'mh0033'
dask_tmp_dir = '/work/{account_name}/{username}/dask_tmp'
#username = 'm300602'
#account_name = 'mh0033'
username = iopts.username
account_name = iopts.accountname
dask_tmp_dir = f'/work/{account_name}/{username}/dask_tmp'
if not iopts.dask:
print('!!! Warning: No --dask option specified, continue without dask.!!!')
elif iopts.dask=='simple_client':
if __name__=="__main__":
dask.config.config.get('distributed').get('dashboard').update({'link':'{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})
dask.config.config.get('distributed').get('dashboard').update({'link':f'{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})
client = Client(n_workers=10, threads_per_worker=1, memory_limit='20GB')
client.run(numcodecs.register_codec, gribscan.RawGribCodec, "gribscan.rawgrib")
elif iopts.dask=='local':
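For context, the 'simple_client' branch above boils down to the following self-contained pattern: a local dask Client whose worker processes each register the gribscan GRIB codec so gribscan-indexed data can be decoded on the workers (worker count and memory limit simply mirror the values in the diff):

    import numcodecs
    import gribscan
    from dask.distributed import Client

    if __name__ == "__main__":
        # local cluster: 10 single-threaded workers with 20 GB memory each
        client = Client(n_workers=10, threads_per_worker=1, memory_limit='20GB')
        # make the raw GRIB codec available in every worker process
        client.run(numcodecs.register_codec, gribscan.RawGribCodec, "gribscan.rawgrib")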
@@ -143,14 +151,18 @@ def main():
port = client.scheduler_info()['services']['dashboard']
logger.info(f"ssh -L 8788:localhost:{port} -J {username}@levante.dkrz.de {username}@{host}")
elif iopts.dask=='slurm':
if not (iopts.username and iopts.accountname):
print('Warning: username and project account need to be specified')
from dask_jobqueue import SLURMCluster
queue = 'compute' # Name of the partition we want to use
job_name = 'pyic_dask' # Job name that is submitted via sbatch
memory = "100GiB" # Max memory per node that is going to be used - this depends on the partition
cores = 24 # Max number of cores per task that are reserved - also partition dependent
processes = 6 # split job per node into so many processes (should be roughly sqrt(cores))
walltime = '8:00:00' # Walltime - also partition dependent
cluster = SLURMCluster(memory=memory,
cores=cores,
processes=processes,
project=account_name,
walltime=walltime,
queue=queue,
@@ -159,15 +171,19 @@
local_directory=dask_tmp_dir,
job_extra=[f'-J {job_name}',
f'-D {dask_tmp_dir}',
f'--begin=now',
#f'--begin=now', # does not work -- workers do not connect...?
f'--output={dask_tmp_dir}/LOG_cluster.%j.o',
f'--output={dask_tmp_dir}/LOG_cluster.%j.o'
],
interface='ib0')
cluster.scale(jobs=2)
cluster.scale(jobs=iopts.ndasknodes)
client = Client(cluster)
if iopts.verbose:
print(cluster.job_script())
nworkers = processes * iopts.ndasknodes
while ((client.status == "running") and (len(client.scheduler_info()["workers"]) < nworkers)):
sleep(1.0)
print("jobs running, starting computation")
else:
print(f'!!! Warning: Unknown dask method: {iopts.dask}. Continuing without a dask cluster. !!!')
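The SLURM branch is the core of this commit: the number of SLURM jobs now comes from --ndasknodes, and the script blocks until all dask workers have connected before starting the computation. Below is a condensed, self-contained sketch of that pattern, written against the older dask_jobqueue interface used in the diff (project= rather than account=), with placeholder partition, account and scratch-directory values:

    from time import sleep
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster

    ndasknodes = 4        # would come from iopts.ndasknodes
    processes = 6         # dask worker processes per SLURM job

    cluster = SLURMCluster(queue='compute',                # SLURM partition (placeholder)
                           project='xx0000',               # DKRZ project account (placeholder)
                           memory='100GiB',
                           cores=24,
                           processes=processes,
                           walltime='8:00:00',
                           local_directory='/work/xx0000/user/dask_tmp',  # placeholder scratch dir
                           interface='ib0')
    cluster.scale(jobs=ndasknodes)    # one SLURM job per requested node
    client = Client(cluster)

    # block until every expected worker has registered with the scheduler
    nworkers = processes * ndasknodes
    while client.status == "running" and len(client.scheduler_info()["workers"]) < nworkers:
        sleep(1.0)
    print("jobs running, starting computation")

    # ... perform the time averaging ...

    client.shutdown()     # tear down the SLURM jobs when finished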
@@ -182,9 +198,9 @@ def main():
nodask = True
# ## Loading the data
flist = iopts.fpath_data
flist.sort()
if iopts.verbose:
print("\n".join(flist))
@@ -271,6 +287,8 @@ def main():
progress(x)
print('All done!')
if iopts.dask=="slurm":
client.shutdown()
return
if __name__=='__main__':