use std::sync::Arc;
use arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datafusion_common::{
pruning::{
CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics,
PruningStatistics,
},
Result,
};
use datafusion_datasource::PartitionedFile;
use datafusion_physical_expr_common::physical_expr::{snapshot_generation, PhysicalExpr};
use datafusion_physical_plan::metrics::Count;
use itertools::Itertools;
use log::debug;
use crate::build_pruning_predicate;
/// Decides whether a single [`PartitionedFile`] can be skipped for a predicate,
/// using the file's partition values and, when present, its statistics.
pub struct FilePruner {
/// Value returned by `snapshot_generation` for `predicate` at the most recent
/// `should_prune` call; `None` until `should_prune` has been called once.
predicate_generation: Option<u64>,
/// Expression from which a pruning predicate is built on each evaluation.
predicate: Arc<dyn PhysicalExpr>,
/// Schema used for pruning: the fields of the logical file schema followed by
/// the partition fields, carrying the logical file schema's metadata.
pruning_schema: Arc<Schema>,
/// File under consideration; its `partition_values` and optional `statistics`
/// are what the pruning predicate is evaluated against.
file: PartitionedFile,
/// Partition fields supplied together with `file.partition_values` when the
/// partition pruning statistics are built.
partition_fields: Vec<FieldRef>,
/// Counter passed to `build_pruning_predicate` and also incremented when
/// evaluating the pruning predicate returns an error.
predicate_creation_errors: Count,
}
impl FilePruner {
    /// Creates a pruner for a single `file`.
    ///
    /// The pruning schema is assembled here: every field of
    /// `logical_file_schema` first, then every entry of `partition_fields`,
    /// with the metadata of `logical_file_schema` attached.
    ///
    /// No pruning predicate is built at this point; that happens in
    /// [`Self::should_prune`]. Nothing in this constructor currently produces
    /// an error, so it always returns `Ok`.
    pub fn new(
        predicate: Arc<dyn PhysicalExpr>,
        logical_file_schema: &SchemaRef,
        partition_fields: Vec<FieldRef>,
        file: PartitionedFile,
        predicate_creation_errors: Count,
    ) -> Result<Self> {
        // File columns come first, partition columns are appended after them.
        let combined_fields = logical_file_schema
            .fields()
            .iter()
            .chain(partition_fields.iter())
            .cloned()
            .collect_vec();
        let file_metadata = logical_file_schema.metadata().clone();
        let pruning_schema =
            Arc::new(Schema::new(combined_fields).with_metadata(file_metadata));

        Ok(Self {
            predicate_generation: None,
            predicate,
            pruning_schema,
            file,
            partition_fields,
            predicate_creation_errors,
        })
    }

    /// Reports whether the file can be skipped for the current predicate.
    ///
    /// Returns `Ok(true)` only when the pruning predicate, evaluated against
    /// the file's partition values (combined with the file statistics when
    /// they are present), yields no `true` entry.
    ///
    /// Returns `Ok(false)` when:
    /// - `snapshot_generation` reports the same generation that was recorded
    ///   by the previous call, so nothing is re-evaluated;
    /// - `build_pruning_predicate` yields `None`;
    /// - evaluating the pruning predicate fails (the error is logged at debug
    ///   level and `predicate_creation_errors` is incremented);
    /// - the evaluation result contains a `true` entry.
    ///
    /// # Errors
    ///
    /// Propagates the error from `PartitionPruningStatistics::try_new`.
    ///
    /// # Panics
    ///
    /// Panics if the pruning predicate returns anything other than exactly
    /// one value, since exactly one container (this file) is supplied.
    pub fn should_prune(&mut self) -> Result<bool> {
        // Nothing to do if the predicate generation matches the one recorded
        // by the previous call; otherwise remember the new generation.
        let latest_generation = snapshot_generation(&self.predicate);
        if self.predicate_generation == Some(latest_generation) {
            return Ok(false);
        }
        self.predicate_generation = Some(latest_generation);

        let Some(pruning_predicate) = build_pruning_predicate(
            Arc::clone(&self.predicate),
            &self.pruning_schema,
            &self.predicate_creation_errors,
        ) else {
            return Ok(false);
        };

        // Partition values are always consulted; this file is the only container.
        let partition_stats: Box<dyn PruningStatistics> =
            Box::new(PartitionPruningStatistics::try_new(
                vec![self.file.partition_values.clone()],
                self.partition_fields.clone(),
            )?);

        // Layer the file statistics on top when the file carries them.
        let pruning: Box<dyn PruningStatistics> = match &self.file.statistics {
            Some(stats) => {
                let file_stats: Box<dyn PruningStatistics> =
                    Box::new(PrunableStatistics::new(
                        vec![Arc::clone(stats)],
                        Arc::clone(&self.pruning_schema),
                    ));
                Box::new(CompositePruningStatistics::new(vec![
                    partition_stats,
                    file_stats,
                ])) as Box<dyn PruningStatistics>
            }
            None => partition_stats,
        };

        match pruning_predicate.prune(pruning.as_ref()) {
            Ok(values) => {
                assert!(values.len() == 1);
                // The file is prunable only if no entry says "keep".
                Ok(!values.contains(&true))
            }
            Err(e) => {
                // Best effort: a failed evaluation means we cannot prune.
                debug!("Ignoring error building pruning predicate for file: {e}");
                self.predicate_creation_errors.add(1);
                Ok(false)
            }
        }
    }
}