diff --git a/freva/src/drs/ingest.rs b/freva/src/drs/ingest.rs index e35716fe50dfb2cf1b16f9f7f06072c7d16294a7..7b9ce9783fcaae275007146decf1e806bdef3f38 100644 --- a/freva/src/drs/ingest.rs +++ b/freva/src/drs/ingest.rs @@ -220,7 +220,7 @@ async fn ingest_dataset( } }; - let f = match dataset.metadata_from_path(&path) { + let f = match dataset.metadata_from_path(&path).await { Ok(f) => f, Err(e) => { warn!("{} not a valid drs file, skipping:\n{:?}", path, e); diff --git a/freva/src/drs/metadata.rs b/freva/src/drs/metadata.rs index d538979f4000d6240e0cccadda65ddc895dbf542..2f6bbe5a9e06c5a9475622d18554061ced474c56 100644 --- a/freva/src/drs/metadata.rs +++ b/freva/src/drs/metadata.rs @@ -4,7 +4,6 @@ mod config_builder; pub mod custom; use core::fmt; -use std::sync::Mutex; use std::{ collections::HashMap, fmt::Formatter, @@ -21,6 +20,7 @@ use drs::{ use lazy_static::lazy_static; use netcdf::AttrValue; use thiserror::Error; +use tokio::sync::Mutex; use tracing::{debug, error, trace}; pub use config_builder::ConfigBuilder; @@ -95,7 +95,11 @@ impl Specification { /// Relative path here means it starts from the beginning of the DRS portion of the path. For example if a /// dataset's `root_dir` is `/foo/bar` and a file within that dataset is `/foo/bar/cmip5/some/other/things.nc` /// then `path` here would be `cmip5/some/other/things.nc`. - fn metadata_from_path(&self, root_dir: &Utf8Path, path: &Utf8Path) -> Result<Metadata, Error> { + async fn metadata_from_path( + &self, + root_dir: &Utf8Path, + path: &Utf8Path, + ) -> Result<Metadata, Error> { use drs::cmip5::Cmip5 as drs_cmip5; use drs::cmip6::Cmip6 as drs_cmip6; use drs::cordex::Cordex as drs_cordex; @@ -105,7 +109,7 @@ impl Specification { Cmip5 => drs_cmip5::from_esgf_path(path)?.into(), Cmip6 => { let cmip = drs_cmip6::from_path(path)?; - metadata_from_cmip6(cmip, root_dir)? + metadata_from_cmip6(cmip, root_dir).await? } Cordex => drs_cordex::from_path(path)?.into(), Custom(c) => c.metadata_from_path(path)?.into(), @@ -144,12 +148,15 @@ impl Dataset { } /// Processes a path according to this dataset's `drs_format` and returns the normalized metadata - pub fn metadata_from_path(&self, path: &Utf8Path) -> Result<Metadata, Error> { + pub async fn metadata_from_path(&self, path: &Utf8Path) -> Result<Metadata, Error> { // in the normal flow of this code as currently written, it's shouldn't be possible for stripping the prefix to // cause an error let path = path.strip_prefix(&self.root_dir)?; - let mut metadata = self.drs_format.metadata_from_path(&self.root_dir, path)?; + let mut metadata = self + .drs_format + .metadata_from_path(&self.root_dir, path) + .await?; // put the root back into the path so this contains the entire fully qualified path metadata.path = self.root_dir.join(metadata.path); Ok(metadata) @@ -170,7 +177,7 @@ pub struct Config { impl Config { /// Creates a [`Metadata`] from the given path based on the available dataset - pub fn metadata_from_path<P: AsRef<Path>>(&self, path: &P) -> Result<Metadata, Error> { + pub async fn metadata_from_path<P: AsRef<Path>>(&self, path: &P) -> Result<Metadata, Error> { let path = absolute(path.as_ref())?; let path = Utf8PathBuf::try_from(path)?; @@ -179,7 +186,7 @@ impl Config { .ok_or_else(|| Error::InvalidPath("no matching dataset found".to_owned()))?; debug!("found dataset {} for path \n{}", dataset.name(), path); - dataset.metadata_from_path(&path) + dataset.metadata_from_path(&path).await } /// Returns an iterator over the datasets available in the config. @@ -402,9 +409,12 @@ lazy_static! { /// Transforms a [`Cmip6`] object into [`Metadata`]. This is handled differently from the others because CMIP6's path /// does not contain all the data [`Metadata`] needs. To get the rest we need to either use mapping tables maintained /// separately from the DRS spec or, as we do here, open the files up and pull the data from their attributes. -fn metadata_from_cmip6(cmip: Cmip6, root_dir: &Utf8Path) -> Result<Metadata, ExtractMetadataError> { +async fn metadata_from_cmip6( + cmip: Cmip6<'_>, + root_dir: &Utf8Path, +) -> Result<Metadata, ExtractMetadataError> { let (frequency, realm) = { - let mut guard = CMIP6_ATTR_CACHE.lock().unwrap(); + let mut guard = CMIP6_ATTR_CACHE.lock().await; if let Some((freq, realm)) = guard.get(cmip.metadata.table_id) { (freq.clone(), realm.clone()) } else { @@ -440,6 +450,8 @@ fn get_cmip6_attrs(path: &Utf8Path) -> Result<(String, String), ExtractMetadataE // this netcdf library has an internal global mutex meaning it doesn't play well with multithreaded code. // It's safe but will slow things down. I think this will be mostly ok since data of one type will already be // mostly single threaded but the mutex may not play nicely with tokio + // I'm not sure if sending this into a `spawn_blocking` would help because of the global mutex. I'll leave it as + // is until it's more clear if not doing so is a problem. let f = netcdf::open(path)?; let frequency_attr = f diff --git a/freva/src/drs/search.rs b/freva/src/drs/search.rs index fa3fd1cea1e8fd8258f22abf23e8b231ba3d3ed7..0fe56b8f1d20af0d59733bbf06768604c4d8f1cc 100644 --- a/freva/src/drs/search.rs +++ b/freva/src/drs/search.rs @@ -36,7 +36,7 @@ pub async fn search(drs_conf: &Config, solr: &Solr) -> Result<Vec<Metadata>, Err for d in docs.iter() { println!("{}", d.file); - let file = drs_conf.metadata_from_path(&d.file)?; + let file = drs_conf.metadata_from_path(&d.file).await?; files.push(file); } start += docs.len();