//! Common unit test utilities for building tables, partitioned file groups, and execution plans.

use std::any::Any;
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::physical_plan::ExecutionPlan;
use crate::test::object_store::local_unpartitioned_file;
use crate::test_util::{aggr_test_schema, arrow_test_data};
use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
};
#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;
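/// Returns a single-partition [`MemTable`] with one row (`id = 1`, `name = "a"`),
/// loosely modeled on the SQL `DUAL` table.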
pub fn create_table_dual() -> Arc<dyn TableProvider> {
let dual_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
dual_schema.clone(),
vec![
Arc::new(array::Int32Array::from(vec![1])),
Arc::new(array::StringArray::from(vec!["a"])),
],
)
.unwrap();
let provider = MemTable::try_new(dual_schema, vec![vec![batch]]).unwrap();
Arc::new(provider)
}
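/// Builds a [`CsvExec`] that scans `aggregate_test_100.csv` from the arrow test data,
/// split into `partitions` uncompressed files under `work_dir`.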
pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<CsvExec>> {
let schema = aggr_test_schema();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv", arrow_test_data());
let file_groups = partitioned_file_groups(
path.as_str(),
filename,
partitions,
Arc::new(CsvFormat::default()),
FileCompressionType::UNCOMPRESSED,
work_dir,
)?;
let config = partitioned_csv_config(schema, file_groups);
Ok(Arc::new(
CsvExec::builder(config)
.with_has_header(true)
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
))
}
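/// Splits the file at `{path}/{filename}` into `partitions` files under `work_dir`
/// (line by line, round-robin, replicating the CSV header into every partition),
/// applies `file_compression_type` to each output, and returns one file group per partition.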
pub fn partitioned_file_groups(
path: &str,
filename: &str,
partitions: usize,
file_format: Arc<dyn FileFormat>,
file_compression_type: FileCompressionType,
work_dir: &Path,
) -> Result<Vec<Vec<PartitionedFile>>> {
let path = format!("{path}/{filename}");
let mut writers = vec![];
let mut files = vec![];
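    // Create one output file (and buffered writer) per partition.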
for i in 0..partitions {
let filename = format!(
"partition-{}{}",
i,
file_format
.get_ext_with_compression(&file_compression_type)
.unwrap()
);
let filename = work_dir.join(filename);
let file = File::create(&filename).unwrap();
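        // Wrap the file in the requested compression encoder; the compressed variants
        // panic at runtime unless the `compression` feature is enabled.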
let encoder: Box<dyn Write + Send> = match file_compression_type.to_owned() {
FileCompressionType::UNCOMPRESSED => Box::new(file),
#[cfg(feature = "compression")]
FileCompressionType::GZIP => {
Box::new(GzEncoder::new(file, GzCompression::default()))
}
#[cfg(feature = "compression")]
FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
#[cfg(feature = "compression")]
FileCompressionType::ZSTD => {
let encoder = ZstdEncoder::new(file, 0)
.map_err(|e| DataFusionError::External(Box::new(e)))?
.auto_finish();
Box::new(encoder)
}
#[cfg(feature = "compression")]
FileCompressionType::BZIP2 => {
Box::new(BzEncoder::new(file, BzCompression::default()))
}
#[cfg(not(feature = "compression"))]
FileCompressionType::GZIP
| FileCompressionType::BZIP2
| FileCompressionType::XZ
| FileCompressionType::ZSTD => {
panic!("Compression is not supported in this build")
}
};
let writer = BufWriter::new(encoder);
writers.push(writer);
files.push(filename);
}
let f = File::open(path)?;
let f = BufReader::new(f);
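    // Distribute input lines round-robin across the partitions; for CSV input the
    // header line is replicated into every partition.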
for (i, line) in f.lines().enumerate() {
let line = line.unwrap();
if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
for w in writers.iter_mut() {
w.write_all(line.as_bytes()).unwrap();
w.write_all(b"\n").unwrap();
}
} else {
let partition = i % partitions;
writers[partition].write_all(line.as_bytes()).unwrap();
writers[partition].write_all(b"\n").unwrap();
}
}
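    // Flush (and drop) each writer so any compression encoder finalizes its output
    // before the files are read back.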
for mut w in writers.into_iter() {
w.flush().unwrap();
}
Ok(files
.into_iter()
.map(|f| vec![local_unpartitioned_file(f).into()])
.collect::<Vec<_>>())
}
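/// Returns a [`FileScanConfig`] for the given file groups on the local object store.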
pub fn partitioned_csv_config(
schema: SchemaRef,
file_groups: Vec<Vec<PartitionedFile>>,
) -> FileScanConfig {
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_file_groups(file_groups)
}
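/// Asserts that the field names of `plan`'s schema match `expected`.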
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
let actual: Vec<String> = plan
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect();
assert_eq!(actual, expected);
}
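/// Returns the field (column) names of `schema`.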
pub fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}
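/// Returns a single-partition [`MemTable`] with one nullable `Int32` column `i`
/// containing the values `seq_start..=seq_end`.
///
/// A minimal usage sketch, assuming a `SessionContext` named `ctx` (not part of this module):
///
/// ```ignore
/// let provider = table_with_sequence(1, 5)?;
/// ctx.register_table("t", provider)?;
/// ```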
pub fn table_with_sequence(
seq_start: i32,
seq_end: i32,
) -> Result<Arc<dyn TableProvider>> {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
let partitions = vec![vec![RecordBatch::try_new(
schema.clone(),
vec![arr as ArrayRef],
)?]];
Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}
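/// Returns a [`RecordBatch`] with a single nullable `Int32` column `i` holding the
/// values `0..sz`.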
pub fn make_partition(sz: i32) -> RecordBatch {
let seq_start = 0;
let seq_end = sz;
let values = (seq_start..seq_end).collect::<Vec<_>>();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let arr = Arc::new(Int32Array::from(values));
let arr = arr as ArrayRef;
RecordBatch::try_new(schema, vec![arr]).unwrap()
}
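/// Returns a single-partition [`MemTable`] with one `Decimal128(10, 3)` column `c1`.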
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
let batch_decimal = make_decimal();
let schema = batch_decimal.schema();
let partitions = vec![vec![batch_decimal]];
Arc::new(MemTable::try_new(schema, partitions).unwrap())
}
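/// Builds a [`RecordBatch`] with a `Decimal128(10, 3)` column `c1` containing ten
/// positive and ten negative values.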
fn make_decimal() -> RecordBatch {
let mut decimal_builder = Decimal128Builder::with_capacity(20);
for i in 110000..110010 {
decimal_builder.append_value(i as i128);
}
for i in 100000..100010 {
decimal_builder.append_value(-i as i128);
}
let array = decimal_builder
.finish()
.with_precision_and_scale(10, 3)
.unwrap();
let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}
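/// Returns a [`CsvExec`] over a dummy file (`x`) on the `test://` object store with the
/// given output ordering; intended for planner and optimizer tests, not for execution.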
pub fn csv_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_output_ordering(vec![sort_exprs]),
)
.with_has_header(false)
.with_delimeter(0)
.with_quote(0)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}
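/// A [`PartitionStream`] that reports a schema but panics if executed; used to build
/// streaming plans for planner tests.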
pub(crate) struct TestStreamPartition {
pub schema: SchemaRef,
}
impl PartitionStream for TestStreamPartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
unreachable!()
}
}
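/// Returns a [`StreamingTableExec`] marked as infinite with the given output ordering,
/// backed by a [`TestStreamPartition`] that cannot be executed.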
pub fn stream_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(
StreamingTableExec::try_new(
schema.clone(),
vec![Arc::new(TestStreamPartition {
schema: schema.clone(),
}) as _],
None,
vec![sort_exprs],
true,
None,
)
.unwrap(),
)
}
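/// Returns a [`CsvExec`] over a dummy file (`file_path`) on the `test://` object store
/// with the given output ordering and a CSV header row.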
pub fn csv_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("file_path".to_string(), 100))
.with_output_ordering(vec![sort_exprs]),
)
.with_has_header(true)
.with_delimeter(0)
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}
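/// An [`ExecutionPlan`] that reports pre-computed [`Statistics`] and cannot be executed;
/// useful for testing statistics-driven optimizations.
///
/// A minimal construction sketch, assuming `Statistics::new_unknown` from `datafusion_common`:
///
/// ```ignore
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let exec = StatisticsExec::new(Statistics::new_unknown(&schema), schema);
/// ```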
#[derive(Debug, Clone)]
pub struct StatisticsExec {
stats: Statistics,
schema: Arc<Schema>,
cache: PlanProperties,
}
impl StatisticsExec {
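    /// Creates a `StatisticsExec` for the given statistics and schema.
    ///
    /// Panics if the number of column statistics does not match the number of schema fields.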
pub fn new(stats: Statistics, schema: Schema) -> Self {
assert_eq!(
stats.column_statistics.len(), schema.fields().len(),
"if defined, the column statistics vector length should be the number of fields"
);
let cache = Self::compute_properties(Arc::new(schema.clone()));
Self {
stats,
schema: Arc::new(schema),
cache,
}
}
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(2),
ExecutionMode::Bounded,
)
}
}
impl DisplayAs for StatisticsExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"StatisticsExec: col_count={}, row_count={:?}",
self.schema.fields().len(),
self.stats.num_rows,
)
}
}
}
}
impl ExecutionPlan for StatisticsExec {
fn name(&self) -> &'static str {
Self::static_name()
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!("This plan only serves for testing statistics")
}
fn statistics(&self) -> Result<Statistics> {
Ok(self.stats.clone())
}
}
/// Object store helpers for tests (e.g. `local_unpartitioned_file`).
pub mod object_store;
/// Variable provider helpers for tests.
pub mod variable;