From 077b75f41e4737926632a3389481858c5e26d6c1 Mon Sep 17 00:00:00 2001
From: Fabian Wachsmann <k204210@l40243.lvt.dkrz.de>
Date: Thu, 27 Feb 2025 08:25:03 +0100
Subject: [PATCH] For all ngcs

---
 workshop/testslurm.sh           |  4 +-
 workshop/xpublish_references.py | 80 +++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 2 deletions(-)
 mode change 100644 => 100755 workshop/testslurm.sh
 create mode 100644 workshop/xpublish_references.py

diff --git a/workshop/testslurm.sh b/workshop/testslurm.sh
old mode 100644
new mode 100755
index 9a7434c..c56a35b
--- a/workshop/testslurm.sh
+++ b/workshop/testslurm.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 ### Batch Queuing System is SLURM
 #SBATCH --partition=shared
-#SBATCH --time=1:00:00 #168
+#SBATCH --time=168:00:00
 #SBATCH --mail-type=FAIL
 #SBATCH --account=bm0021
 #SBATCH --output=cloudify_%j.log
@@ -12,5 +12,5 @@
 echo $HOSTNAME
 #/scratch/k/k204210/temp/ngc4008_P1D_3.parq
 source activate /work/bm0021/conda-envs/cloudify
-python xpublish_references.py test $1
+python xpublish_references.py "$@"
 
diff --git a/workshop/xpublish_references.py b/workshop/xpublish_references.py
new file mode 100644
index 0000000..4cf579e
--- /dev/null
+++ b/workshop/xpublish_references.py
@@ -0,0 +1,80 @@
+# TLS private key and certificate files handed to collection.serve() in the __main__ block.
+ssl_keyfile="/work/bm0021/k204210/cloudify/workshop/key.pem"
+ssl_certfile="/work/bm0021/k204210/cloudify/workshop/cert.pem"
+
+from cloudify.plugins.stacer import *
+from cloudify.utils.daskhelper import *
+from cloudify.plugins.kerchunk import *
+import xarray as xr
+import xpublish as xp
+import asyncio
+#import nest_asyncio
+import sys
+import os
+import socket
+
+def is_port_free(port, host="localhost"):
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        return s.connect_ex((host, port)) != 0  # connect_ex returns 0 only if something accepted the connection, so nonzero means free
+
+def find_free_port(start=5000, end=5100, host="localhost"):
+    """Return the lowest port in start..end (inclusive) that is_port_free reports free, else None."""
+    # Lazy generator: probing stops at the first free port, just like an early-return loop.
+    free_ports = (p for p in range(start, end + 1) if is_port_free(p, host))
+    return next(free_ports, None)
+
+port = find_free_port(start=9000, end=9100)
+if port is None:
+    raise ValueError("Could not find a free port for service")
+
+# Storage options forwarded to fsspec.get_mapper() for every "reference::" source
+# opened in the __main__ block.
+SO = dict(
+    remote_protocol="slk",
+    remote_options=dict(slk_cache="/scratch/k/k202134/INTAKE_CACHE"),
+    lazy=True,
+    cache_size=0,
+)
+#nest_asyncio.apply()
+
+
+if __name__ == "__main__":  # This avoids infinite subprocess creation
+    # Each command-line argument is appended to "reference::/" and opened through fsspec.
+    # NOTE(review): `fsspec` is not imported by name in this file; presumably it is
+    # re-exported by one of the cloudify star imports -- confirm.
+    glob_inp = sys.argv[1:]
+
+    dsdict = {}
+    mapper_dict = {}
+    for g in glob_inp:
+        dsname = g.split('/')[-1]
+        if dsname in dsdict:
+            # Datasets are keyed by file name only; fail fast instead of silently replacing one.
+            raise ValueError(f"Duplicate dataset name {dsname!r} from input {g!r}")
+        source = "reference::/" + g
+        print(source)
+        fsmap = fsspec.get_mapper(source, **SO)
+        ds = xr.open_dataset(
+            fsmap,
+            engine="zarr",
+            chunks="auto",
+            consolidated=False,
+        )
+        mapper_dict[source] = fsmap
+        # Strip the encoding attached on open, then record the source string on the dataset.
+        ds = ds.drop_encoding()
+        ds.encoding["source"] = source
+        dsdict[dsname] = ds
+
+    kp = KerchunkPass()
+    kp.mapper_dict = mapper_dict
+
+    collection = xp.Rest(dsdict)
+    collection.register_plugin(Stac())
+    collection.register_plugin(kp)
+    collection.serve(
+        host="0.0.0.0",
+        port=port,
+        ssl_keyfile=ssl_keyfile,
+        ssl_certfile=ssl_certfile,
+    )
-- 
GitLab