use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::common::ToDFSchema;
use crate::config::ConfigOptions;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::error::Result;
use crate::logical_expr::execution_props::ExecutionProps;
use crate::logical_expr::simplify::SimplifyContext;
use crate::optimizer::simplify_expressions::ExprSimplifier;
use crate::physical_expr::create_physical_expr;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};
use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
/// A parquet file written to a local temporary path, together with the
/// metadata captured at write time (schema, object store URL and
/// `ObjectMeta`) that is needed to later build `ParquetExec` scans over it.
pub struct TestParquetFile {
// On-disk location the file was written to (as passed to `try_new`)
path: PathBuf,
// Schema of the written batches (taken from the first batch)
schema: SchemaRef,
// Object store URL derived from the canonicalized file path
object_store_url: ObjectStoreUrl,
// Location/size metadata describing the written file
object_meta: ObjectMeta,
}
/// Options that control how parquet files are scanned; convertible into a
/// `SessionConfig` via [`ParquetScanOptions::config`].
#[derive(Debug, Clone, Copy)]
pub struct ParquetScanOptions {
/// Mirrors `execution.parquet.pushdown_filters`
pub pushdown_filters: bool,
/// Mirrors `execution.parquet.reorder_filters`
pub reorder_filters: bool,
/// Mirrors `execution.parquet.enable_page_index`
pub enable_page_index: bool,
}
impl ParquetScanOptions {
pub fn config(&self) -> SessionConfig {
let mut config = ConfigOptions::new();
config.execution.parquet.pushdown_filters = self.pushdown_filters;
config.execution.parquet.reorder_filters = self.reorder_filters;
config.execution.parquet.enable_page_index = self.enable_page_index;
config.into()
}
}
impl TestParquetFile {
    /// Write `batches` to a parquet file at `path` using the given writer
    /// `props`, and capture the metadata (schema, object store URL,
    /// `ObjectMeta`) needed to later build scans over the file.
    ///
    /// All batches are written with the schema of the first batch.
    ///
    /// # Errors
    /// Returns an error if the file cannot be created or written, or if the
    /// path cannot be canonicalized / parsed into an object store location.
    ///
    /// # Panics
    /// Panics if `batches` yields no batches: at least one batch is required
    /// to determine the schema.
    pub fn try_new(
        path: PathBuf,
        props: WriterProperties,
        batches: impl IntoIterator<Item = RecordBatch>,
    ) -> Result<Self> {
        // This constructor returns `Result`, so propagate I/O and parquet
        // writer errors with `?` instead of panicking via `unwrap()`.
        let file = File::create(&path)?;

        let mut batches = batches.into_iter();
        let first_batch = batches.next().expect("need at least one record batch");
        let schema = first_batch.schema();

        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;

        writer.write(&first_batch)?;
        let mut num_rows = first_batch.num_rows();
        for batch in batches {
            writer.write(&batch)?;
            num_rows += batch.num_rows();
        }
        writer.close()?;

        println!("Generated test dataset with {num_rows} rows");

        // Canonicalize so the object store URL and location are absolute and
        // independent of the current working directory.
        let size = std::fs::metadata(&path)?.len() as usize;
        let canonical_path = path.canonicalize()?;

        let object_store_url =
            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
                .object_store();

        let object_meta = ObjectMeta {
            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
            last_modified: Default::default(),
            size,
            e_tag: None,
            version: None,
        };

        Ok(Self {
            path,
            schema,
            object_store_url,
            object_meta,
        })
    }
}
impl TestParquetFile {
    /// Create a `ParquetExec` that scans this file, optionally wrapped in a
    /// `FilterExec` when `maybe_filter` is provided.
    ///
    /// When a filter is given, it is coerced against the file schema,
    /// converted to a physical expression, and supplied to the parquet reader
    /// as a predicate (so pushdown/pruning can be exercised) in addition to
    /// being enforced by the surrounding `FilterExec`.
    pub async fn create_scan(
        &self,
        ctx: &SessionContext,
        maybe_filter: Option<Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let scan_config =
            FileScanConfig::new(self.object_store_url.clone(), self.schema.clone())
                .with_file(PartitionedFile {
                    object_meta: self.object_meta.clone(),
                    partition_values: vec![],
                    range: None,
                    statistics: None,
                    extensions: None,
                });

        let parquet_options = ctx.copied_table_options().parquet;
        if let Some(filter) = maybe_filter {
            // Simplification/coercion machinery is only needed when there is
            // a filter, so build it inside this branch.
            let df_schema = self.schema.clone().to_dfschema_ref()?;
            let props = ExecutionProps::new();
            let context = SimplifyContext::new(&props).with_schema(df_schema.clone());
            let simplifier = ExprSimplifier::new(context);
            let filter = simplifier.coerce(filter, &df_schema)?;

            // Reuse the same `ExecutionProps` rather than constructing a
            // second default instance.
            let physical_filter_expr = create_physical_expr(&filter, &df_schema, &props)?;

            let parquet_exec =
                ParquetExecBuilder::new_with_options(scan_config, parquet_options)
                    .with_predicate(physical_filter_expr.clone())
                    .build_arc();

            let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
            Ok(exec)
        } else {
            Ok(
                ParquetExecBuilder::new_with_options(scan_config, parquet_options)
                    .build_arc(),
            )
        }
    }

    /// Recursively search `plan` (depth-first) for a `ParquetExec` and return
    /// its metrics, or `None` if the plan contains no `ParquetExec`.
    pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
        if let Some(parquet) = plan.as_any().downcast_ref::<ParquetExec>() {
            return parquet.metrics();
        }

        for child in plan.children() {
            if let Some(metrics) = Self::parquet_metrics(child) {
                return Some(metrics);
            }
        }
        None
    }

    /// The schema this file was written with
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// The on-disk path of the file
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}