Skip to content
Snippets Groups Projects

Ingest

Merged Ghost User requested to merge ingest into main
Compare and Show latest version
10 files
+ 179
187
Compare changes
  • Side-by-side
  • Inline
Files
10
+ 33
11
@@ -9,7 +9,7 @@ use camino::Utf8PathBuf;
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::Instant;
use tracing::{error, info, trace, warn};
use tracing::{error, info, instrument, trace, warn};
use walkdir::{DirEntry, WalkDir};
use crate::drs::Structure;
@@ -17,7 +17,7 @@ use crate::{
drs::{Config, Metadata},
solr::{Solr, SolrError},
};
use crate::{FILES_COLLECTION, LATEST_COLLECTION};
use crate::{DEFAULT_FILES_COLLECTION, DEFAULT_LATEST_COLLECTION};
/// Errors possible while ingesting data into solr
#[derive(Debug, Error)]
@@ -55,11 +55,16 @@ pub struct StructureReport {
/// Spawns 2 tasks per [`Structure`] in the DRS config where `root_dir` is contained in (i.e. `starts_with`) `data_dir`.
/// One task is the computation thread which processes the file names as they come in and it spawns the second task
/// which is a blocking thread that uses `walkdir` synchronously and feeds the paths it finds into a channel.
///
/// `files_collection` and `latest_collection` are optional. If `None`, then this will use Freva's defaults:
/// [`DEFAULT_FILES_COLLECTION`] and [`DEFAULT_LATEST_COLLECTION`], respectively.
pub async fn ingest<P: AsRef<Path>>(
solr: &Solr,
drs_conf: &Config,
data_dir: P,
batch_size: usize,
files_collection: Option<&str>,
latest_collection: Option<&str>,
) -> Result<IngestReport, IngestError> {
let mut handles = Vec::new();
let data_dir = tokio::fs::canonicalize(data_dir.as_ref()).await?;
@@ -68,10 +73,24 @@ pub async fn ingest<P: AsRef<Path>>(
for structure in drs_conf.structures() {
if structure.root().starts_with(&data_dir) {
// clone for each thread to capture without worrying about lifetimes
let solr = solr.clone();
let structure = structure.clone();
let files_collection = files_collection
.unwrap_or(DEFAULT_FILES_COLLECTION)
.to_owned();
let latest_collection = latest_collection
.unwrap_or(DEFAULT_LATEST_COLLECTION)
.to_owned();
handles.push(tokio::spawn(async move {
ingest_structure(solr, structure, batch_size).await
ingest_structure(
solr,
structure,
batch_size,
&files_collection,
&latest_collection,
)
.await
}));
}
}
@@ -105,11 +124,15 @@ pub async fn ingest<P: AsRef<Path>>(
/// This handles crawling the structure's `root_dir` and ingesting any files it finds there that are valid for the structure.
/// This inserts the files it finds into both files and latest collection within solr.
///
/// Invalid files, whatever the reason they're invalid, will be skipped.
/// Invalid files, whatever the reason they're invalid, will be skipped. Failing to send to solr will result in this
/// ending early with an error. Any files that were already sent to solr will not be cleaned up.
#[instrument(skip(solr, structure), fields(structure = structure.activity().as_str()))]
async fn ingest_structure(
solr: Solr,
structure: Structure,
batch_size: usize,
files_collection: &str,
latest_collection: &str,
) -> Result<StructureReport, IngestError> {
let mut file_buf = Vec::with_capacity(batch_size);
let mut latest_buf = Vec::with_capacity(batch_size);
@@ -125,10 +148,10 @@ async fn ingest_structure(
while let Some(entry) = entries.recv().await {
if file_buf.len() == batch_size {
sent += flush(&solr, FILES_COLLECTION, &mut file_buf).await?;
sent += flush(&solr, files_collection, &mut file_buf).await?;
}
if latest_buf.len() == batch_size {
flush(&solr, LATEST_COLLECTION, &mut latest_buf).await?;
flush(&solr, latest_collection, &mut latest_buf).await?;
}
let entry = match entry {
@@ -153,8 +176,7 @@ async fn ingest_structure(
Ok(p) => p,
Err(e) => {
warn!(
"{} not a valid drs file, has non UTF8 characters in path:\n{:?}",
entry.path().display(),
"not a valid drs file, has non UTF8 characters in path:\n{:?}",
e
);
skipped += 1;
@@ -165,7 +187,7 @@ async fn ingest_structure(
let f = match structure.metadata_from_path(&path) {
Ok(f) => f,
Err(e) => {
warn!("{} not a valid drs file, skipping:\n{:?}", path, e);
warn!("not a valid drs file, skipping:\n{:#?}", e);
skipped += 1;
continue;
}
@@ -198,10 +220,10 @@ async fn ingest_structure(
}
if !file_buf.is_empty() {
sent += flush(&solr, FILES_COLLECTION, &mut file_buf).await?;
sent += flush(&solr, files_collection, &mut file_buf).await?;
}
if !latest_buf.is_empty() {
flush(&solr, LATEST_COLLECTION, &mut latest_buf).await?;
flush(&solr, latest_collection, &mut latest_buf).await?;
}
Ok(StructureReport {
Loading