Skip to content
Snippets Groups Projects
Commit e67ec395 authored by Brian Lewis's avatar Brian Lewis
Browse files

Merge branch 'fix-latest-version' into 'main'

Fix latest version

See merge request !9
parents aeed9ea6 72ebe2b4
No related branches found
Tags 0.5.0
1 merge request!9Fix latest version
Pipeline #22242 passed
//! Handles inserting DRS file metadata into Solr
use std::collections::HashSet;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, path::Path};
use camino::Utf8PathBuf;
use thiserror::Error;
......@@ -167,13 +168,17 @@ async fn ingest_dataset(
let mut sent = 0;
let mut skipped = 0;
let mut latest_versions: HashMap<String, String> = HashMap::new();
let mut latest_versions: HashSet<String> = HashSet::new();
let ingest_dir = match ingest_dir {
Some(p) => p,
None => dataset.root().as_std_path().to_owned(),
};
let walkdir = WalkDir::new(ingest_dir).follow_links(true);
let walkdir = WalkDir::new(ingest_dir)
.follow_links(true)
// this sort_by is _extremely important_. The current way we track the latest version of the file relies on
// it receiving the latest version as the first of the files with the same unversioned identifier
.sort_by(|a, b| a.file_name().cmp(b.file_name()).reverse());
let mut entries = walkdir_channel(walkdir);
let start_time = Instant::now();
......@@ -230,20 +235,16 @@ async fn ingest_dataset(
};
let f = Arc::new(f);
// given the reverse sorting above, we know the first (versioned) file we see with a new unversioned
// identifier will be the latest since all other elements are assumed to be identical so on the first
// time we've seen a file with a particular id, we add it as the latest and don't for any others
if let Some(version) = f.version() {
let id = f.to_unversioned_identifier();
match latest_versions.get_mut(&id) {
Some(latest) => {
if version.as_str() >= latest.as_str() {
*latest = version.to_owned();
latest_buf.push(f.clone());
}
}
None => {
latest_versions.insert(id, version.to_owned());
latest_buf.push(f.clone());
}
if !latest_versions.contains(&id) {
trace!("found new unversioned id {id}, submitting version {version} to latest");
latest_versions.insert(id);
latest_buf.push(f.clone());
}
} else {
// not versioned, always add to latest
......
use freva::drs::metadata::ConfigBuilder;
use freva::drs::Config;
use freva::solr::Solr;
use std::sync::Once;
use wiremock::MockServer;
use std::{collections::HashMap, sync::Once};
use wiremock::{Match, MockServer};
static INIT: Once = Once::new();
......@@ -54,3 +54,9 @@ pub fn test_config() -> Config {
let builder: ConfigBuilder = toml::from_str(&string).unwrap();
builder.build().unwrap()
}
pub fn owned_string_map(m: &[(&str, &str)]) -> HashMap<String, String> {
m.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
use std::{collections::HashMap, path::Path};
use std::{
collections::HashMap,
path::{Path, PathBuf},
};
use camino::Utf8PathBuf;
use wiremock::{
matchers::{method, path},
Mock, ResponseTemplate,
Mock, Request, ResponseTemplate,
};
use crate::common::owned_string_map;
mod common;
#[tokio::test]
......@@ -32,7 +37,7 @@ async fn test_ingest() {
let expected_results: HashMap<&str, (u32, u32)> = HashMap::from_iter(
[
("observations", (24, 0)),
("cmip5_name", (10, 0)),
("cmip5_name", (11, 0)),
("cmip6_name", (6, 0)),
]
.into_iter(),
......@@ -98,5 +103,106 @@ async fn test_ingest_subset() {
);
let cmip5_results = &report.datasets[0];
assert_eq!(4, cmip5_results.sent);
assert_eq!(5, cmip5_results.sent);
}
#[tokio::test]
async fn test_latest_version() {
common::log_init();
let config_dir = Utf8PathBuf::try_from(
PathBuf::from(common::REPO_ROOT)
.join(".docker")
.canonicalize()
.expect("error canonicalizing config dir"),
)
.expect("config dir contains non utf8 character");
let data_dir =
config_dir.join("data/cmip5/cmip5/output2/INM/inmcm4/esmrcp85/mon/land/Lmon/r1i1p1");
let drs_config = common::test_config();
// this is organized in the order they will be sent to solr (i.e. reverse lexicographical order by the file key)
// both the matchers rely on this to be the case
let expected = vec![owned_string_map(&[
("file", data_dir.join("v20110323/residualFrac/residualFrac_Lmon_inmcm4_esmrcp85_r1i1p1_200601-210012.nc").as_str()),
("dataset", "cmip5_name"),
("product", "output2"),
("project", "cmip5"),
("institute", "INM"),
("model", "inmcm4"),
("experiment", "esmrcp85"),
("time_frequency", "mon"),
("realm", "land"),
("ensemble", "r1i1p1"),
("cmor_table", "Lmon"),
("variable", "residualFrac"),
("version", "v20110323"),
("time", "[2006-01 TO 2100-12]"),
]), owned_string_map(&[
("file", data_dir.join("v20100323/residualFrac/residualFrac_Lmon_inmcm4_esmrcp85_r1i1p1_200601-210012.nc").as_str()),
("dataset", "cmip5_name"),
("product", "output2"),
("project", "cmip5"),
("institute", "INM"),
("model", "inmcm4"),
("experiment", "esmrcp85"),
("time_frequency", "mon"),
("realm", "land"),
("ensemble", "r1i1p1"),
("cmor_table", "Lmon"),
("variable", "residualFrac"),
("version", "v20100323"),
("time", "[2006-01 TO 2100-12]"),
])];
// these custom matcher functions are required because the default json matchers will match the json string that's
// received and hashmaps don't iterate in a predictable order so can't be relied on to produce the exact same string
// every run. Instead we deserialize them back into hashmaps which have order independent equality
// clone for the closure
let expected_match_clone = expected.clone();
let files_matcher = move |request: &Request| {
let actual: Vec<HashMap<String, String>> = serde_json::from_slice(&request.body).expect("");
actual == expected_match_clone
};
let latest_expected = expected[0].clone();
let latest_matcher = move |request: &Request| {
let actual: Vec<HashMap<String, String>> = serde_json::from_slice(&request.body).expect("");
actual[0] == latest_expected
};
let (mock, solr) = common::solr_server().await;
Mock::given(method("POST"))
.and(path("/solr/files/update/json/docs"))
.and(files_matcher)
.respond_with(ResponseTemplate::new(200))
.mount(&mock)
.await;
Mock::given(method("POST"))
.and(path("/solr/latest/update/json/docs"))
.and(latest_matcher)
.respond_with(ResponseTemplate::new(200))
.mount(&mock)
.await;
let result = freva::drs::ingest(
&solr,
&drs_config,
&Some(data_dir.as_std_path()),
10,
None,
None,
)
.await;
assert!(result.is_ok());
let report = result.unwrap();
assert_eq!(
1,
report.datasets.len(),
"unexpected number of dataset reports"
);
let cmip5_results = &report.datasets[0];
assert_eq!(2, cmip5_results.sent);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment