use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray, NullArray, UInt64Array};
use arrow::datatypes::{FieldRef, Schema, SchemaRef};

use crate::error::DataFusionError;
use crate::stats::Precision;
use crate::{Column, ColumnStatistics, ScalarValue, Statistics};
/// A source of pruning statistics for a set of "containers" (e.g. files, row
/// groups, or partitions). Each method returns one row per container so that
/// predicates can be evaluated without reading the underlying data.
pub trait PruningStatistics {
    /// Return the minimum value of `column`, one row per container.
    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
    /// Return the maximum value of `column`, one row per container.
    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
    /// Return the number of containers described by these statistics.
    fn num_containers(&self) -> usize;
    /// Return the null count of `column`, one row per container.
    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
    /// Return the row count of `column`, one row per container.
    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
    /// Return, per container, `true` if the values of `column` are all
    /// contained in `values`, `false` if none of them are, and `null` when
    /// neither is known.
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray>;
}
/// Prune based on partition values: each container corresponds to one
/// partition, whose single value per column is both its minimum and maximum.
#[derive(Clone)]
pub struct PartitionPruningStatistics {
    /// One array per partition column, with one row per container.
    partition_values: Vec<ArrayRef>,
    /// Stored explicitly so the count survives even with no partition columns.
    num_containers: usize,
    /// Schema of the partition columns.
    partition_schema: SchemaRef,
}
impl PartitionPruningStatistics {
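    /// Create a new instance from one `Vec<ScalarValue>` per container, with
    /// the values in each row ordered to match `partition_fields`.
    ///
    /// A minimal usage sketch (import paths assume this module lives in
    /// `datafusion_common`; adjust as needed):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field};
    /// use datafusion_common::pruning::{PartitionPruningStatistics, PruningStatistics};
    /// use datafusion_common::ScalarValue;
    ///
    /// // Two containers, each with a single partition column "year".
    /// let values = vec![
    ///     vec![ScalarValue::from(2023i32)],
    ///     vec![ScalarValue::from(2024i32)],
    /// ];
    /// let fields = vec![Arc::new(Field::new("year", DataType::Int32, false))];
    /// let stats = PartitionPruningStatistics::try_new(values, fields).unwrap();
    /// assert_eq!(stats.num_containers(), 2);
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the values of any column cannot be converted into
    /// a single Arrow array.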
pub fn try_new(
partition_values: Vec<Vec<ScalarValue>>,
partition_fields: Vec<FieldRef>,
) -> Result<Self, DataFusionError> {
let num_containers = partition_values.len();
let partition_schema = Arc::new(Schema::new(partition_fields));
        // Transpose the per-container rows into one Vec of values per column.
        let mut partition_values_by_column =
            vec![
                Vec::with_capacity(partition_values.len());
                partition_schema.fields().len()
            ];
        for partition_value in partition_values {
            for (i, value) in partition_value.into_iter().enumerate() {
                partition_values_by_column[i].push(value);
            }
        }
Ok(Self {
partition_values: partition_values_by_column
.into_iter()
.map(|v| {
                    if v.is_empty() {
                        // No containers to describe: use an empty placeholder.
                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
} else {
ScalarValue::iter_to_array(v)
}
})
.collect::<Result<Vec<_>, _>>()?,
num_containers,
partition_schema,
})
}
}
impl PruningStatistics for PartitionPruningStatistics {
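    /// The single partition value per container is reported as the minimum.
    /// Returns `None` for unknown columns or all-null value arrays, since
    /// those carry no pruning information.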
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let index = self.partition_schema.index_of(column.name()).ok()?;
self.partition_values.get(index).and_then(|v| {
if v.is_empty() || v.null_count() == v.len() {
None
} else {
Some(Arc::clone(v))
}
})
}
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        // Each container has exactly one partition value, so the minimum and
        // maximum are the same array.
        self.min_values(column)
    }
fn num_containers(&self) -> usize {
self.num_containers
}
    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        // Null counts are not tracked for partition values.
        None
    }
    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        // Row counts are not tracked for partition values.
        None
    }
    fn contained(
        &self,
        column: &Column,
        values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        let index = self.partition_schema.index_of(column.name()).ok()?;
        let array = self.partition_values.get(index)?;
        // Each container holds a single partition value, which is contained
        // in `values` iff it equals *any* member of the set, so the per-value
        // equality results are combined with OR.
        let boolean_array = values.iter().try_fold(None, |acc, v| {
            let arrow_value = v.to_scalar().ok()?;
            let eq_result =
                arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
            match acc {
                None => Some(Some(eq_result)),
                Some(acc_array) => {
                    arrow::compute::kernels::boolean::or(&acc_array, &eq_result)
                        .map(Some)
                        .ok()
                }
            }
        })??;
        // An empty or all-null result carries no pruning information.
        if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len()
        {
            None
        } else {
            Some(boolean_array)
        }
    }
}
/// Prune based on full [`Statistics`], one per container (e.g. one per file).
#[derive(Clone)]
pub struct PrunableStatistics {
    /// Statistics for each container.
    statistics: Vec<Arc<Statistics>>,
    /// The schema of the data described by `statistics`.
    schema: SchemaRef,
}
impl PrunableStatistics {
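    /// Create a new instance from one [`Statistics`] per container. `schema`
    /// must describe the data the statistics were computed over.
    ///
    /// A minimal sketch using the builder methods exercised in the tests
    /// below (import paths assume `datafusion_common`):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_common::pruning::PrunableStatistics;
    /// use datafusion_common::stats::Precision;
    /// use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    /// // One container whose column "a" spans [0, 10] over 100 rows.
    /// let statistics = vec![Arc::new(
    ///     Statistics::default()
    ///         .add_column_statistics(
    ///             ColumnStatistics::new_unknown()
    ///                 .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
    ///                 .with_max_value(Precision::Exact(ScalarValue::from(10i32))),
    ///         )
    ///         .with_num_rows(Precision::Exact(100)),
    /// )];
    /// let pruning_stats = PrunableStatistics::new(statistics, schema);
    /// ```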
pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
Self { statistics, schema }
}
    /// Collect the statistic selected by `get_stat` for `column` across all
    /// containers. Inexact or missing entries become nulls in the result;
    /// returns `None` unless at least one container has an exact value.
    fn get_exact_column_statistics(
        &self,
        column: &Column,
        get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
    ) -> Option<ArrayRef> {
        let index = self.schema.index_of(column.name()).ok()?;
        let mut has_value = false;
        match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
            s.column_statistics
                .get(index)
                .and_then(|stat| {
                    if let Precision::Exact(value) = get_stat(stat) {
                        has_value = true;
                        Some(value.clone())
                    } else {
                        None
                    }
                })
                .unwrap_or(ScalarValue::Null)
        })) {
            // Only report statistics if at least one container had an exact value.
            Ok(array) => has_value.then_some(array),
            Err(_) => {
                log::warn!(
                    "Failed to convert statistics values to array for column {}",
                    column.name()
                );
                None
            }
        }
    }
}
impl PruningStatistics for PrunableStatistics {
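    /// Report the exact minimum value per container; inexact or missing
    /// statistics become nulls in the returned array.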
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
self.get_exact_column_statistics(column, |stat| &stat.min_value)
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
self.get_exact_column_statistics(column, |stat| &stat.max_value)
}
fn num_containers(&self) -> usize {
self.statistics.len()
}
    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
        let index = self.schema.index_of(column.name()).ok()?;
        // Only report null counts if at least one container has an exact
        // count; containers without one become nulls in the result.
        if self.statistics.iter().any(|s| {
            s.column_statistics
                .get(index)
                .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
        }) {
Some(Arc::new(
self.statistics
.iter()
.map(|s| {
s.column_statistics.get(index).and_then(|stat| {
if let Precision::Exact(null_count) = &stat.null_count {
u64::try_from(*null_count).ok()
} else {
None
}
})
})
.collect::<UInt64Array>(),
))
} else {
None
}
}
    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
        // Row counts are a property of the whole container, but they are only
        // reported for columns that exist in the schema.
        if self.schema.index_of(column.name()).is_err() {
            return None;
        }
if self
.statistics
.iter()
.any(|s| s.num_rows.is_exact().unwrap_or(false))
{
Some(Arc::new(
self.statistics
.iter()
.map(|s| {
if let Precision::Exact(row_count) = &s.num_rows {
u64::try_from(*row_count).ok()
} else {
None
}
})
.collect::<UInt64Array>(),
))
} else {
None
}
}
    fn contained(
        &self,
        _column: &Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        // Set containment cannot be derived from min/max style statistics.
        None
    }
}
/// Combine multiple [`PruningStatistics`] sources over the same containers,
/// trying each source in order and returning the first non-`None` answer.
pub struct CompositePruningStatistics {
    /// The underlying sources, in priority order.
    pub statistics: Vec<Box<dyn PruningStatistics>>,
}
impl CompositePruningStatistics {
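    /// Create a new instance from the given sources. Sources are consulted in
    /// order, so earlier entries take priority when more than one can answer
    /// the same question.
    ///
    /// A minimal sketch combining partition and file statistics; the
    /// `partition_stats` and `file_stats` values are assumed to have been
    /// built as in the examples above and to describe the same containers:
    ///
    /// ```rust,ignore
    /// let composite = CompositePruningStatistics::new(vec![
    ///     Box::new(partition_stats),
    ///     Box::new(file_stats),
    /// ]);
    /// // Partition columns are answered by the first source, file columns
    /// // by the second.
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `statistics` is empty or if the sources disagree on the
    /// number of containers.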
pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
assert!(!statistics.is_empty());
let num_containers = statistics[0].num_containers();
for stats in &statistics {
assert_eq!(num_containers, stats.num_containers());
}
Self { statistics }
}
}
impl PruningStatistics for CompositePruningStatistics {
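    /// Return the first non-`None` answer from the sources, in priority order.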
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.min_values(column) {
return Some(array);
}
}
None
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.max_values(column) {
return Some(array);
}
}
None
}
fn num_containers(&self) -> usize {
self.statistics[0].num_containers()
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.null_counts(column) {
return Some(array);
}
}
None
}
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.row_counts(column) {
return Some(array);
}
}
None
}
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
for stats in &self.statistics {
if let Some(array) = stats.contained(column, values) {
return Some(array);
}
}
None
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field};

    use super::*;
    use crate::{
        cast::{as_int32_array, as_uint64_array},
        ColumnStatistics,
    };
#[test]
fn test_partition_pruning_statistics() {
let partition_values = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
let min_values_a =
as_int32_array(&partition_stats.min_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(1), Some(3)];
assert_eq!(min_values_a, expected_values_a);
let max_values_a =
as_int32_array(&partition_stats.max_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(1), Some(3)];
assert_eq!(max_values_a, expected_values_a);
let min_values_b =
as_int32_array(&partition_stats.min_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(2), Some(4)];
assert_eq!(min_values_b, expected_values_b);
let max_values_b =
as_int32_array(&partition_stats.max_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(2), Some(4)];
assert_eq!(max_values_b, expected_values_b);
let values = HashSet::from([ScalarValue::from(1i32)]);
let contained_a = partition_stats.contained(&column_a, &values).unwrap();
let expected_contained_a = BooleanArray::from(vec![true, false]);
assert_eq!(contained_a, expected_contained_a);
let contained_b = partition_stats.contained(&column_b, &values).unwrap();
let expected_contained_b = BooleanArray::from(vec![false, false]);
assert_eq!(contained_b, expected_contained_b);
assert_eq!(partition_stats.num_containers(), 2);
}
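    // A sketch of the multi-value `contained` case: a partition value is in
    // the set iff it matches *any* member, so with the set {1, 4} and
    // partition values [1, 3] the expected result is [true, false].
    #[test]
    fn test_partition_pruning_statistics_contained_multiple_values() {
        let partition_values = vec![
            vec![ScalarValue::from(1i32)],
            vec![ScalarValue::from(3i32)],
        ];
        let partition_fields = vec![Arc::new(Field::new("a", DataType::Int32, false))];
        let partition_stats =
            PartitionPruningStatistics::try_new(partition_values, partition_fields)
                .unwrap();
        let column_a = Column::new_unqualified("a");
        let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(4i32)]);
        let contained = partition_stats.contained(&column_a, &values).unwrap();
        assert_eq!(contained, BooleanArray::from(vec![true, false]));
    }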
#[test]
fn test_partition_pruning_statistics_empty() {
let partition_values = vec![];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
assert!(partition_stats.min_values(&column_a).is_none());
assert!(partition_stats.max_values(&column_a).is_none());
assert!(partition_stats.min_values(&column_b).is_none());
assert!(partition_stats.max_values(&column_b).is_none());
let values = HashSet::from([ScalarValue::from(1i32)]);
assert!(partition_stats.contained(&column_a, &values).is_none());
}
#[test]
fn test_statistics_pruning_statistics() {
let statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(0i32)))
.with_max_value(Precision::Exact(ScalarValue::from(100i32)))
.with_null_count(Precision::Exact(0)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(100i32)))
.with_max_value(Precision::Exact(ScalarValue::from(200i32)))
.with_null_count(Precision::Exact(5)),
)
.with_num_rows(Precision::Exact(100)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(50i32)))
.with_max_value(Precision::Exact(ScalarValue::from(300i32)))
.with_null_count(Precision::Exact(10)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(200i32)))
.with_max_value(Precision::Exact(ScalarValue::from(400i32)))
.with_null_count(Precision::Exact(0)),
)
.with_num_rows(Precision::Exact(200)),
),
];
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let pruning_stats = PrunableStatistics::new(statistics, schema);
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(0), Some(50)];
assert_eq!(min_values_a, expected_values_a);
let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(100), Some(300)];
assert_eq!(max_values_a, expected_values_a);
let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(100), Some(200)];
assert_eq!(min_values_b, expected_values_b);
let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(200), Some(400)];
assert_eq!(max_values_b, expected_values_b);
let null_counts_a =
as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts_a = vec![Some(0), Some(10)];
assert_eq!(null_counts_a, expected_null_counts_a);
let null_counts_b =
as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts_b = vec![Some(5), Some(0)];
assert_eq!(null_counts_b, expected_null_counts_b);
let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_a = vec![Some(100), Some(200)];
assert_eq!(row_counts_a, expected_row_counts_a);
let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_b = vec![Some(100), Some(200)];
assert_eq!(row_counts_b, expected_row_counts_b);
let values = HashSet::from([ScalarValue::from(0i32)]);
assert!(pruning_stats.contained(&column_a, &values).is_none());
assert!(pruning_stats.contained(&column_b, &values).is_none());
assert_eq!(pruning_stats.num_containers(), 2);
let column_c = Column::new_unqualified("c");
assert!(pruning_stats.min_values(&column_c).is_none());
assert!(pruning_stats.max_values(&column_c).is_none());
assert!(pruning_stats.null_counts(&column_c).is_none());
let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_c = vec![Some(100), Some(200)];
assert_eq!(row_counts_c, expected_row_counts_c);
assert!(pruning_stats.contained(&column_c, &values).is_none());
let column_d = Column::new_unqualified("d");
assert!(pruning_stats.min_values(&column_d).is_none());
assert!(pruning_stats.max_values(&column_d).is_none());
assert!(pruning_stats.null_counts(&column_d).is_none());
assert!(pruning_stats.row_counts(&column_d).is_none());
assert!(pruning_stats.contained(&column_d, &values).is_none());
}
#[test]
fn test_statistics_pruning_statistics_empty() {
let statistics = vec![];
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let pruning_stats = PrunableStatistics::new(statistics, schema);
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
assert!(pruning_stats.min_values(&column_a).is_none());
assert!(pruning_stats.max_values(&column_a).is_none());
assert!(pruning_stats.min_values(&column_b).is_none());
assert!(pruning_stats.max_values(&column_b).is_none());
assert!(pruning_stats.null_counts(&column_a).is_none());
assert!(pruning_stats.null_counts(&column_b).is_none());
assert!(pruning_stats.row_counts(&column_a).is_none());
assert!(pruning_stats.row_counts(&column_b).is_none());
let values = HashSet::from([ScalarValue::from(1i32)]);
assert!(pruning_stats.contained(&column_a, &values).is_none());
}
#[test]
fn test_composite_pruning_statistics_partition_and_file() {
let partition_values = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
];
let partition_fields = vec![
Arc::new(Field::new("part_a", DataType::Int32, false)),
Arc::new(Field::new("part_b", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();
let file_statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(100i32)))
.with_max_value(Precision::Exact(ScalarValue::from(200i32)))
.with_null_count(Precision::Exact(0)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(300i32)))
.with_max_value(Precision::Exact(ScalarValue::from(400i32)))
.with_null_count(Precision::Exact(5)),
)
.with_num_rows(Precision::Exact(100)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(500i32)))
.with_max_value(Precision::Exact(ScalarValue::from(600i32)))
.with_null_count(Precision::Exact(10)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(700i32)))
.with_max_value(Precision::Exact(ScalarValue::from(800i32)))
.with_null_count(Precision::Exact(0)),
)
.with_num_rows(Precision::Exact(200)),
),
];
let file_schema = Arc::new(Schema::new(vec![
Field::new("col_x", DataType::Int32, false),
Field::new("col_y", DataType::Int32, false),
]));
let file_stats = PrunableStatistics::new(file_statistics, file_schema);
let composite_stats = CompositePruningStatistics::new(vec![
Box::new(partition_stats),
Box::new(file_stats),
]);
let part_a = Column::new_unqualified("part_a");
let part_b = Column::new_unqualified("part_b");
let col_x = Column::new_unqualified("col_x");
let col_y = Column::new_unqualified("col_y");
let min_values_part_a =
as_int32_array(&composite_stats.min_values(&part_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_part_a = vec![Some(1), Some(2)];
assert_eq!(min_values_part_a, expected_values_part_a);
let max_values_part_a =
as_int32_array(&composite_stats.max_values(&part_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(max_values_part_a, expected_values_part_a);
let min_values_part_b =
as_int32_array(&composite_stats.min_values(&part_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_part_b = vec![Some(10), Some(20)];
assert_eq!(min_values_part_b, expected_values_part_b);
let min_values_col_x =
as_int32_array(&composite_stats.min_values(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_col_x = vec![Some(100), Some(500)];
assert_eq!(min_values_col_x, expected_values_col_x);
let max_values_col_x =
as_int32_array(&composite_stats.max_values(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_max_values_col_x = vec![Some(200), Some(600)];
assert_eq!(max_values_col_x, expected_max_values_col_x);
let min_values_col_y =
as_int32_array(&composite_stats.min_values(&col_y).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_col_y = vec![Some(300), Some(700)];
assert_eq!(min_values_col_y, expected_values_col_y);
assert!(composite_stats.null_counts(&part_a).is_none());
assert!(composite_stats.null_counts(&part_b).is_none());
let null_counts_col_x =
as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts_col_x = vec![Some(0), Some(10)];
assert_eq!(null_counts_col_x, expected_null_counts_col_x);
assert!(composite_stats.row_counts(&part_a).is_none());
let row_counts_col_x =
as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(100), Some(200)];
assert_eq!(row_counts_col_x, expected_row_counts);
let values = HashSet::from([ScalarValue::from(1i32)]);
let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
let expected_contained_part_a = BooleanArray::from(vec![true, false]);
assert_eq!(contained_part_a, expected_contained_part_a);
assert!(composite_stats.contained(&col_x, &values).is_none());
let non_existent = Column::new_unqualified("non_existent");
assert!(composite_stats.min_values(&non_existent).is_none());
assert!(composite_stats.max_values(&non_existent).is_none());
assert!(composite_stats.null_counts(&non_existent).is_none());
assert!(composite_stats.row_counts(&non_existent).is_none());
assert!(composite_stats.contained(&non_existent, &values).is_none());
assert_eq!(composite_stats.num_containers(), 2);
}
#[test]
fn test_composite_pruning_statistics_priority() {
let first_statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(100i32)))
.with_max_value(Precision::Exact(ScalarValue::from(200i32)))
.with_null_count(Precision::Exact(0)),
)
.with_num_rows(Precision::Exact(100)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(300i32)))
.with_max_value(Precision::Exact(ScalarValue::from(400i32)))
.with_null_count(Precision::Exact(5)),
)
.with_num_rows(Precision::Exact(200)),
),
];
let first_schema = Arc::new(Schema::new(vec![Field::new(
"col_a",
DataType::Int32,
false,
)]));
let first_stats = PrunableStatistics::new(first_statistics, first_schema);
let second_statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
.with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
.with_null_count(Precision::Exact(10)),
)
.with_num_rows(Precision::Exact(1000)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
.with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
.with_null_count(Precision::Exact(20)),
)
.with_num_rows(Precision::Exact(2000)),
),
];
let second_schema = Arc::new(Schema::new(vec![Field::new(
"col_a",
DataType::Int32,
false,
)]));
let second_stats = PrunableStatistics::new(second_statistics, second_schema);
let composite_stats = CompositePruningStatistics::new(vec![
Box::new(first_stats.clone()),
Box::new(second_stats.clone()),
]);
let col_a = Column::new_unqualified("col_a");
let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_min_values = vec![Some(100), Some(300)];
assert_eq!(min_values, expected_min_values);
let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_max_values = vec![Some(200), Some(400)];
assert_eq!(max_values, expected_max_values);
let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts = vec![Some(0), Some(5)];
assert_eq!(null_counts, expected_null_counts);
let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(100), Some(200)];
assert_eq!(row_counts, expected_row_counts);
let composite_stats_reversed = CompositePruningStatistics::new(vec![
Box::new(second_stats.clone()),
Box::new(first_stats.clone()),
]);
let min_values =
as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_min_values = vec![Some(1000), Some(3000)];
assert_eq!(min_values, expected_min_values);
let max_values =
as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_max_values = vec![Some(2000), Some(4000)];
assert_eq!(max_values, expected_max_values);
let null_counts =
as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts = vec![Some(10), Some(20)];
assert_eq!(null_counts, expected_null_counts);
let row_counts =
as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(1000), Some(2000)];
assert_eq!(row_counts, expected_row_counts);
}
#[test]
fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
let result = std::panic::catch_unwind(|| {
CompositePruningStatistics::new(vec![]);
});
assert!(result.is_err());
let result = std::panic::catch_unwind(|| {
let partition_values_1 = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
];
let partition_fields_1 = vec![
Arc::new(Field::new("part_a", DataType::Int32, false)),
Arc::new(Field::new("part_b", DataType::Int32, false)),
];
let partition_stats_1 = PartitionPruningStatistics::try_new(
partition_values_1,
partition_fields_1,
)
.unwrap();
let partition_values_2 = vec![
vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
];
let partition_fields_2 = vec![
Arc::new(Field::new("part_x", DataType::Int32, false)),
Arc::new(Field::new("part_y", DataType::Int32, false)),
];
let partition_stats_2 = PartitionPruningStatistics::try_new(
partition_values_2,
partition_fields_2,
)
.unwrap();
CompositePruningStatistics::new(vec![
Box::new(partition_stats_1),
Box::new(partition_stats_2),
]);
});
assert!(result.is_err());
}
}