
Ingest different DRS formats

Merged Ghost User requested to merge drs-specs into main
4 unresolved threads · 3 files · +35 -21
@@ -35,6 +35,10 @@ pub enum IngestError {
     /// Error canonicalizing data dir path
     #[error("error canonicalizing data-dir, may not exist")]
     DataDirCanonicalization(#[from] io::Error),
+    /// Given data dir was not contained in any known dataset
+    #[error("Given data-dir was not contained in any known dataset")]
+    InvalidDataDir,
 }

 #[derive(Debug)]
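For reference, a minimal standalone sketch of how the new variant behaves, assuming `IngestError` derives `thiserror::Error` (consistent with the `#[error(...)]` attributes above); the `main` function is purely illustrative:

use std::io;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum IngestError {
    /// Error canonicalizing data dir path
    #[error("error canonicalizing data-dir, may not exist")]
    DataDirCanonicalization(#[from] io::Error),
    /// Given data dir was not contained in any known dataset
    #[error("Given data-dir was not contained in any known dataset")]
    InvalidDataDir,
}

fn main() {
    // The #[error] attribute generates the Display impl, so the new
    // variant formats as its message string:
    println!("{}", IngestError::InvalidDataDir);
}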
@@ -59,10 +63,10 @@ pub struct DatasetReport {
 ///
 /// `files_collection` and `latest_collection` are optional. If `None`, then this will use Freva's defaults:
 /// [`DEFAULT_FILES_COLLECTION`] and [`DEFAULT_LATEST_COLLECTION`], respectively.
-pub async fn ingest<P: AsRef<Path>>(
+pub async fn ingest(
     solr: &Solr,
     drs_conf: &Config,
-    data_dir: &Option<P>,
+    data_dir: &Option<&Path>,
     batch_size: usize,
     files_collection: Option<&str>,
     latest_collection: Option<&str>,
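A hedged note on the signature change: with the old generic `P: AsRef<Path>`, passing `None` forces the caller to annotate the type, since `P` cannot be inferred from `None` alone; the concrete `&Option<&Path>` removes that friction. A minimal sketch (the function names here are illustrative, not from this codebase):

use std::path::Path;

fn old_style<P: AsRef<Path>>(_data_dir: &Option<P>) {}
fn new_style(_data_dir: &Option<&Path>) {}

fn main() {
    // Old signature: a bare `old_style(&None)` would not compile;
    // the caller has to name `P` explicitly.
    old_style(&None::<&Path>);
    // New signature: plain `None` works with no annotation.
    new_style(&None);
}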
@@ -71,25 +75,26 @@ pub async fn ingest<P: AsRef<Path>>(
     let start_time = Instant::now();
-    for dataset in drs_conf.datasets() {
-        // If data_dir is set, for each dataset check if data_dir is within this dataset;
-        // if not, don't spawn an ingestion routine (i.e. skip this iteration of the loop).
-        // This repackages it into an Option because `ingest_dataset` has to check again for lifetime/borrowing reasons;
-        // would be nice to figure out how to not need that.
-        let data_dir = if let Some(p) = &data_dir {
-            let p = tokio::fs::canonicalize(p).await?;
-            if !p.starts_with(dataset.root()) {
-                trace!(
-                    "dataset {} does not include specified `data_dir`",
-                    dataset.name()
-                );
-                continue;
-            }
-            Some(p)
-        } else {
-            None
-        };
+    // immediately canonicalize data-dir if given to ensure we always operate on the fully qualified form
+    let data_dir = match data_dir {
+        Some(p) => Some(tokio::fs::canonicalize(p).await?),
+        None => None,
+    };
+    // If we were not given a data-dir, ingest all datasets.
+    // If we were given a data-dir, look for the dataset that contains it and only ingest from that dataset.
+    let datasets: Vec<&Dataset> = match data_dir.as_ref() {
+        None => drs_conf.datasets().collect(),
+        Some(p) => {
+            let dataset = drs_conf.datasets().find(|d| p.starts_with(d.root()));
+            match dataset {
+                None => return Err(IngestError::InvalidDataDir),
+                Some(d) => vec![d],
+            }
+        }
+    };
+    for dataset in datasets.into_iter() {
         // clone for each thread to capture without worrying about lifetimes
         let solr = solr.clone();
         let dataset = dataset.clone();
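The dataset lookup above hinges on `Path::starts_with`, which compares whole path components rather than string prefixes; canonicalizing first means symlinked or relative inputs still share a prefix with the dataset root. A standalone sketch with synthetic paths (the roots here are made up, and `dataset.root()` is stood in for by a plain `Path`):

use std::path::Path;

fn main() {
    let root = Path::new("/arch/cmip6");
    // A data-dir inside the dataset shares the root as a component prefix:
    assert!(Path::new("/arch/cmip6/model-a/run1").starts_with(root));
    // A sibling tree does not; in the new code this yields InvalidDataDir:
    assert!(!Path::new("/arch/cordex/model-a").starts_with(root));
    // Comparison is per component, not per character:
    assert!(!Path::new("/arch/cmip6-extra").starts_with(root));
}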
@@ -99,6 +104,7 @@ pub async fn ingest<P: AsRef<Path>>(
         let latest_collection = latest_collection
             .unwrap_or(DEFAULT_LATEST_COLLECTION)
             .to_owned();
+        let data_dir = data_dir.as_ref().cloned();
         handles.push(tokio::spawn(async move {
             ingest_dataset(
                 solr,
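The added `data_dir.as_ref().cloned()` follows the same clone-per-task pattern as the surrounding lines: each spawned task receives its own owned copy, so the `async move` block captures no borrows. A minimal standalone sketch of the pattern (the values and task count are placeholders; tokio is assumed as in the MR):

use std::path::PathBuf;

#[tokio::main]
async fn main() {
    let data_dir: Option<PathBuf> = Some(PathBuf::from("/arch/cmip6"));
    let mut handles = Vec::new();
    for i in 0..3 {
        // Owned copy moves into the task; the original stays usable here.
        let data_dir = data_dir.clone();
        handles.push(tokio::spawn(async move {
            println!("task {i}: data_dir = {data_dir:?}");
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}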