diff --git a/workshop/testslurm.sh b/workshop/testslurm.sh
old mode 100644
new mode 100755
index 9a7434c2e9f3331ec563e03fa030036d3adefc3d..c56a35b72fbf9a5399051b632c84c3e6345f83a7
--- a/workshop/testslurm.sh
+++ b/workshop/testslurm.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 ### Batch Queuing System is SLURM
 #SBATCH --partition=shared
-#SBATCH --time=1:00:00 #168
+#SBATCH --time=168:00:00
 #SBATCH --mail-type=FAIL
 #SBATCH --account=bm0021
 #SBATCH --output=cloudify_%j.log
@@ -12,5 +12,5 @@
 echo $HOSTNAME
 #/scratch/k/k204210/temp/ngc4008_P1D_3.parq
 source activate /work/bm0021/conda-envs/cloudify
-python xpublish_references.py test $1
+python xpublish_references.py "$@"
 
diff --git a/workshop/xpublish_references.py b/workshop/xpublish_references.py
new file mode 100644
index 0000000000000000000000000000000000000000..4cf579e72b66be9178345182be754ff5e0279f88
--- /dev/null
+++ b/workshop/xpublish_references.py
@@ -0,0 +1,83 @@
+
+ssl_keyfile = "/work/bm0021/k204210/cloudify/workshop/key.pem"
+ssl_certfile = "/work/bm0021/k204210/cloudify/workshop/cert.pem"
+
+from cloudify.plugins.stacer import *
+from cloudify.utils.daskhelper import *
+from cloudify.plugins.kerchunk import *
+import fsspec  # needed for fsspec.get_mapper below
+import xarray as xr
+import xpublish as xp
+import asyncio
+#import nest_asyncio
+import sys
+import os
+import socket
+
+def is_port_free(port, host="localhost"):
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        return s.connect_ex((host, port)) != 0  # Returns True if the port is free
+
+def find_free_port(start=5000, end=5100, host="localhost"):
+    for port in range(start, end + 1):
+        if is_port_free(port, host):
+            return port
+    return None  # No free ports found
+
+port = find_free_port(9000, 9100)
+if not port:
+    raise ValueError("Could not find a free port for service")
+
+# Storage options for the kerchunk reference filesystems
+SO = dict(
+    remote_protocol="slk",
+    remote_options=dict(
+        slk_cache="/scratch/k/k202134/INTAKE_CACHE"
+    ),
+    lazy=True,
+    cache_size=0
+)
+#nest_asyncio.apply()
+
+
+if __name__ == "__main__":  # This avoids infinite subprocess creation
+    #import dask
+    #zarrcluster = asyncio.get_event_loop().run_until_complete(get_dask_cluster())
+    #os.environ["ZARR_ADDRESS"]=zarrcluster.scheduler._address
+
+    glob_inp = sys.argv[1:]
+
+    # Open one dataset per reference file, keyed by its basename
+    dsdict = {}
+    mapper_dict = {}
+    for g in glob_inp:
+        dsname = g.split('/')[-1]
+        source = "reference::/" + g
+        print(source)
+        fsmap = fsspec.get_mapper(
+            source,
+            **SO
+        )
+        ds = xr.open_dataset(
+            fsmap,
+            engine="zarr",
+            chunks="auto",
+            consolidated=False
+        )
+        mapper_dict[source] = fsmap
+        ds = ds.drop_encoding()
+        ds.encoding["source"] = source
+        dsdict[dsname] = ds
+
+    kp = KerchunkPass()
+    kp.mapper_dict = mapper_dict
+
+    collection = xp.Rest(dsdict)
+    collection.register_plugin(Stac())
+    collection.register_plugin(kp)
+    collection.serve(
+        host="0.0.0.0",
+        port=port,
+        ssl_keyfile=ssl_keyfile,
+        ssl_certfile=ssl_certfile
+    )
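
Usage sketch (paths are placeholders): testslurm.sh now forwards all of its command-line arguments, and xpublish_references.py publishes one dataset per kerchunk reference file it receives, keyed by the file's basename, so several reference files can be served by a single job:

    sbatch workshop/testslurm.sh /path/to/refs_a.json /path/to/refs_b.json

The serving node is echoed into cloudify_<jobid>.log; the port is the first free one found in the 9000-9100 range.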
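
Assuming the standard xpublish dataset-listing endpoint is active on the server (and using -k in case the workshop certificate is self-signed), a quick smoke test from a client might look like:

    curl -k https://<node>:<port>/datasets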