use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use crate::common::ToDFSchema;
use crate::config::ConfigOptions;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::ParquetSource;
use crate::error::Result;
use crate::logical_expr::execution_props::ExecutionProps;
use crate::logical_expr::simplify::SimplifyContext;
use crate::optimizer::simplify_expressions::ExprSimplifier;
use crate::physical_expr::create_physical_expr;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
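
/// A Parquet file written to a local path from in-memory `RecordBatch`es,
/// together with the object store metadata needed to scan it in tests.
///
/// Usage sketch (assumes a temporary `path`, an existing `batch`, and a
/// `SessionContext` named `ctx`):
/// ```ignore
/// let props = WriterProperties::builder().build();
/// let test_file = TestParquetFile::try_new(path, props, vec![batch])?;
/// let plan = test_file.create_scan(&ctx, Some(col("a").gt(lit(1)))).await?;
/// ```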
pub struct TestParquetFile {
path: PathBuf,
schema: SchemaRef,
object_store_url: ObjectStoreUrl,
object_meta: ObjectMeta,
}
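
/// Options controlling how the Parquet file is scanned; each field mirrors the
/// corresponding `execution.parquet` setting in [`ConfigOptions`].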
#[derive(Debug, Clone, Copy)]
pub struct ParquetScanOptions {
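/// Push filter expressions down into the Parquet decoder so rows are filtered during decode.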
pub pushdown_filters: bool,
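/// Reorder pushed-down filter expressions heuristically to minimize evaluation cost.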
pub reorder_filters: bool,
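/// Use the Parquet page index, when present, to prune pages during the scan.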
pub enable_page_index: bool,
}
impl ParquetScanOptions {
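/// Returns a [`SessionConfig`] with these Parquet options applied, suitable for
/// e.g. `SessionContext::new_with_config(options.config())`.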
pub fn config(&self) -> SessionConfig {
let mut config = ConfigOptions::new();
config.execution.parquet.pushdown_filters = self.pushdown_filters;
config.execution.parquet.reorder_filters = self.reorder_filters;
config.execution.parquet.enable_page_index = self.enable_page_index;
config.into()
}
}
impl TestParquetFile {
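/// Writes `batches` to a new Parquet file at `path` using the given writer
/// properties, and captures the object store metadata needed to scan it later.
///
/// Panics if `batches` yields no record batches.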
pub fn try_new(
path: PathBuf,
props: WriterProperties,
batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<Self> {
let file = File::create(&path)?;
let mut batches = batches.into_iter();
let first_batch = batches.next().expect("need at least one record batch");
let schema = first_batch.schema();
let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;
writer.write(&first_batch)?;
let mut num_rows = first_batch.num_rows();
for batch in batches {
writer.write(&batch)?;
num_rows += batch.num_rows();
}
writer.close()?;
println!("Generated test dataset with {num_rows} rows");
let size = std::fs::metadata(&path)?.len();
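// `canonicalize` on Windows returns a verbatim path (e.g. `\\?\C:\...`);
// normalize the separators and strip the resulting `//?/` prefix so the
// path can be parsed as a URL below.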
let mut canonical_path = path.canonicalize()?;
if cfg!(target_os = "windows") {
canonical_path = canonical_path
.to_str()
.unwrap()
.replace("\\", "/")
.strip_prefix("//?/")
.unwrap()
.into();
}
let object_store_url =
ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
.object_store();
let object_meta = ObjectMeta {
location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
last_modified: Default::default(),
size,
e_tag: None,
version: None,
};
Ok(Self {
path,
schema,
object_store_url,
object_meta,
})
}
}
impl TestParquetFile {
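/// Returns an [`ExecutionPlan`] that scans this file. If `maybe_filter` is
/// provided, the predicate is pushed into the [`ParquetSource`] and the scan
/// is additionally wrapped in a [`FilterExec`].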
pub async fn create_scan(
&self,
ctx: &SessionContext,
maybe_filter: Option<Expr>,
) -> Result<Arc<dyn ExecutionPlan>> {
let parquet_options = ctx.copied_table_options().parquet;
let source = Arc::new(ParquetSource::new(parquet_options.clone()));
let scan_config_builder = FileScanConfigBuilder::new(
self.object_store_url.clone(),
Arc::clone(&self.schema),
source,
)
.with_file(PartitionedFile {
object_meta: self.object_meta.clone(),
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
});
let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;
let props = ExecutionProps::new();
let context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema));
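// With a filter: coerce and simplify it against the file schema, convert it
// to a physical expression, attach it to the Parquet source as a pushed-down
// predicate, and also apply it via a FilterExec above the scan.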
if let Some(filter) = maybe_filter {
let simplifier = ExprSimplifier::new(context);
let filter = simplifier.coerce(filter, &df_schema)?;
let physical_filter_expr =
create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
let source = Arc::new(
ParquetSource::new(parquet_options)
.with_predicate(Arc::clone(&physical_filter_expr)),
)
.with_schema(Arc::clone(&self.schema));
let config = scan_config_builder.with_source(source).build();
let parquet_exec = DataSourceExec::from_data_source(config);
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
Ok(exec)
} else {
let config = scan_config_builder.build();
Ok(DataSourceExec::from_data_source(config))
}
}
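
/// Recursively searches `plan` for a [`DataSourceExec`] backed by a
/// [`ParquetSource`] and returns its metrics, if found.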
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
if data_source_exec
.downcast_to_file_source::<ParquetSource>()
.is_some()
{
return data_source_exec.metrics();
}
}
for child in plan.children() {
if let Some(metrics) = Self::parquet_metrics(child) {
return Some(metrics);
}
}
None
}
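
/// Returns the schema of the Parquet file.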
pub fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
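
/// Returns the path to the Parquet file on disk.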
pub fn path(&self) -> &std::path::Path {
self.path.as_path()
}
}