Skip to content
Snippets Groups Projects

Add caching to cmip6 ingestion

Merged Ghost User requested to merge cmip6-cache into main
All threads resolved!
1 file
+ 1
2
Compare changes
  • Side-by-side
  • Inline
+ 76
25
@@ -17,9 +17,11 @@ use drs::{
cmip6::{Cmip6, MIP_ERA},
cordex::Cordex,
};
use lazy_static::lazy_static;
use netcdf::AttrValue;
use thiserror::Error;
use tracing::{debug, error};
use tokio::sync::Mutex;
use tracing::{debug, error, trace};
pub use config_builder::ConfigBuilder;
use custom::Custom;
@@ -93,7 +95,11 @@ impl Specification {
/// Relative path here means it starts from the beginning of the DRS portion of the path. For example if a
/// dataset's `root_dir` is `/foo/bar` and a file within that dataset is `/foo/bar/cmip5/some/other/things.nc`
/// then `path` here would be `cmip5/some/other/things.nc`.
fn metadata_from_path(&self, root_dir: &Utf8Path, path: &Utf8Path) -> Result<Metadata, Error> {
async fn metadata_from_path(
&self,
root_dir: &Utf8Path,
path: &Utf8Path,
) -> Result<Metadata, Error> {
use drs::cmip5::Cmip5 as drs_cmip5;
use drs::cmip6::Cmip6 as drs_cmip6;
use drs::cordex::Cordex as drs_cordex;
@@ -103,7 +109,7 @@ impl Specification {
Cmip5 => drs_cmip5::from_esgf_path(path)?.into(),
Cmip6 => {
let cmip = drs_cmip6::from_path(path)?;
metadata_from_cmip6(cmip, root_dir)?
metadata_from_cmip6(cmip, root_dir).await?
}
Cordex => drs_cordex::from_path(path)?.into(),
Custom(c) => c.metadata_from_path(path)?.into(),
@@ -142,12 +148,15 @@ impl Dataset {
}
/// Processes a path according to this dataset's `drs_format` and returns the normalized metadata
pub fn metadata_from_path(&self, path: &Utf8Path) -> Result<Metadata, Error> {
pub async fn metadata_from_path(&self, path: &Utf8Path) -> Result<Metadata, Error> {
// in the normal flow of this code as currently written, it's shouldn't be possible for stripping the prefix to
// cause an error
let path = path.strip_prefix(&self.root_dir)?;
let mut metadata = self.drs_format.metadata_from_path(&self.root_dir, path)?;
let mut metadata = self
.drs_format
.metadata_from_path(&self.root_dir, path)
.await?;
// put the root back into the path so this contains the entire fully qualified path
metadata.path = self.root_dir.join(metadata.path);
Ok(metadata)
@@ -168,7 +177,7 @@ pub struct Config {
impl Config {
/// Creates a [`Metadata`] from the given path based on the available dataset
pub fn metadata_from_path<P: AsRef<Path>>(&self, path: &P) -> Result<Metadata, Error> {
pub async fn metadata_from_path<P: AsRef<Path>>(&self, path: &P) -> Result<Metadata, Error> {
let path = absolute(path.as_ref())?;
let path = Utf8PathBuf::try_from(path)?;
@@ -177,7 +186,7 @@ impl Config {
.ok_or_else(|| Error::InvalidPath("no matching dataset found".to_owned()))?;
debug!("found dataset {} for path \n{}", dataset.name(), path);
dataset.metadata_from_path(&path)
dataset.metadata_from_path(&path).await
}
/// Returns an iterator over the datasets available in the config.
@@ -379,14 +388,70 @@ pub enum ExtractMetadataError {
},
}
lazy_static! {
/// Holds a mapping of table_id => (frequency, realm) which are the values that we need that cmip6 doesn't carry
/// in its path data. We can get this information from the netcdf header attributes available in the files
/// themselves but opening them to inspect is extremely slow. So we build this cache as we process any cmip6 files
/// because any table_id should always have the same frequency and realm values.
///
/// I'm not sure if this is a property of CMIP6 or just a coincidence but seems to hold for all current possible
/// values in the controlled vocabulary. See [here](https://github.com/PCMDI/cmip6-cmor-tables) for how table_id
/// determines frequency and realm.
///
/// This is done as a global cache right now just because performance shouldn't be a large concern as we expect far
/// more reads than writes and this implementation is simpler than e.g. having the [`drs`] crate load and hold the
/// mapping values ahead of time.
static ref CMIP6_ATTR_CACHE: Mutex<HashMap<String, (String, String)>> =
Mutex::new(HashMap::new());
}
/// Transforms a [`Cmip6`] object into [`Metadata`]. This is handled differently from the others because CMIP6's path
/// does not contain all the data [`Metadata`] needs. To get the rest we need to either use mapping tables maintained
/// separately from the DRS spec or, as we do here, open the files up and pull the data from their attributes.
fn metadata_from_cmip6(cmip: Cmip6, root_dir: &Utf8Path) -> Result<Metadata, ExtractMetadataError> {
async fn metadata_from_cmip6(
cmip: Cmip6<'_>,
root_dir: &Utf8Path,
) -> Result<Metadata, ExtractMetadataError> {
let (frequency, realm) = {
let mut guard = CMIP6_ATTR_CACHE.lock().await;
if let Some((freq, realm)) = guard.get(cmip.metadata.table_id) {
(freq.clone(), realm.clone())
} else {
trace!("attributes not cached, opening file to extract");
let (frequency, realm) = get_cmip6_attrs(&root_dir.join(cmip.path))?;
guard.insert(
cmip.metadata.table_id.to_owned(),
(frequency.clone(), realm.clone()),
);
(frequency, realm)
}
};
let m = &cmip.metadata;
Ok(Metadata {
path: cmip.path.to_owned(),
activity: MIP_ERA.to_owned(),
product: m.activity_id.to_owned(),
institute: m.institution_id.to_owned(),
model: m.source_id.to_owned(),
experiment: m.experiment_id.to_owned(),
ensemble: m.member_id.variant_label.to_string(),
variable: m.variable_id.to_owned(),
version: Some(m.version.to_owned()),
mip_table: m.table_id.to_owned(),
modeling_realm: realm,
frequency,
})
}
fn get_cmip6_attrs(path: &Utf8Path) -> Result<(String, String), ExtractMetadataError> {
// this netcdf library has an internal global mutex meaning it doesn't play well with multithreaded code.
// It's safe but will slow things down. I think this will be mostly ok since data of one type will already be
// mostly single threaded but the mutex may not play nicely with tokio
let f = netcdf::open(root_dir.join(cmip.path))?;
// I'm not sure if sending this into a `spawn_blocking` would help because of the global mutex. I'll leave it as
// is until it's more clear if not doing so is a problem.
let f = netcdf::open(path)?;
let frequency_attr = f
.attribute("frequency")
@@ -406,7 +471,7 @@ fn metadata_from_cmip6(cmip: Cmip6, root_dir: &Utf8Path) -> Result<Metadata, Ext
let realm_attr = f
.attribute("realm")
.ok_or_else(|| ExtractMetadataError::MissingAttribute("table_id".to_owned()))?
.ok_or_else(|| ExtractMetadataError::MissingAttribute("realm".to_owned()))?
.value()?;
let realm = match realm_attr {
AttrValue::Str(s) => s,
@@ -419,21 +484,7 @@ fn metadata_from_cmip6(cmip: Cmip6, root_dir: &Utf8Path) -> Result<Metadata, Ext
}
};
let m = &cmip.metadata;
Ok(Metadata {
path: cmip.path.to_owned(),
activity: MIP_ERA.to_owned(),
product: m.activity_id.to_owned(),
institute: m.institution_id.to_owned(),
model: m.source_id.to_owned(),
experiment: m.experiment_id.to_owned(),
ensemble: m.member_id.variant_label.to_string(),
variable: m.variable_id.to_owned(),
version: Some(m.version.to_owned()),
mip_table: m.table_id.to_owned(),
modeling_realm: realm,
frequency,
})
Ok((frequency, realm))
}
/// Gets a printable type name for each attrvalue for error handling.
Loading