//! Utility functions to make testing DataFusion-based crates easier.

#[cfg(feature = "parquet")]
pub mod parquet;

pub mod csv;

use std::any::Any;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::catalog::{TableProvider, TableProviderFactory};
use crate::dataframe::DataFrame;
use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use crate::datasource::{empty::EmptyTable, provider_as_source};
use crate::error::Result;
use crate::execution::{RecordBatchStream, SendableRecordBatchStream};
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{CsvReadOptions, SessionContext};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::TableReference;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use futures::Stream;
use tempfile::TempDir;

#[cfg(feature = "parquet")]
pub use datafusion_common::test_util::parquet_test_data;
pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
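/// Scan an empty data source; mainly used in tests.
///
/// Builds a [`LogicalPlanBuilder`] over an [`EmptyTable`] with the given
/// schema and optional projection. A minimal usage sketch (illustrative only,
/// not compiled as a doc test):
///
/// ```ignore
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let plan = scan_empty(Some("t"), &schema, None)?.build()?;
/// ```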
pub fn scan_empty(
name: Option<&str>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema));
let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE));
LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
}
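/// Scan an empty data source that reports the given number of partitions;
/// mainly used in tests.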
pub fn scan_empty_with_partitions(
name: Option<&str>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
partitions: usize,
) -> Result<LogicalPlanBuilder> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema).with_partitions(partitions));
let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE));
LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
}
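/// Returns the schema (`c1`..`c13`) used by the `aggregate_test_100.csv`
/// test file; column `c1` additionally carries test metadata.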
pub fn aggr_test_schema() -> SchemaRef {
let mut f1 = Field::new("c1", DataType::Utf8, false);
f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
let schema = Schema::new(vec![
f1,
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
Field::new("c5", DataType::Int32, false),
Field::new("c6", DataType::Int64, false),
Field::new("c7", DataType::UInt8, false),
Field::new("c8", DataType::UInt16, false),
Field::new("c9", DataType::UInt32, false),
Field::new("c10", DataType::UInt64, false),
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]);
Arc::new(schema)
}
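/// Registers `aggregate_test_100.csv` from the arrow test data directory
/// under `table_name` in the given [`SessionContext`].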
pub async fn register_aggregate_csv(
ctx: &SessionContext,
table_name: &str,
) -> Result<()> {
let schema = aggr_test_schema();
let testdata = arrow_test_data();
ctx.register_csv(
table_name,
&format!("{testdata}/csv/aggregate_test_100.csv"),
CsvReadOptions::new().schema(schema.as_ref()),
)
.await?;
Ok(())
}
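/// Creates a fresh [`SessionContext`], registers `aggregate_test_100.csv`
/// under `name`, and returns it as a [`DataFrame`].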
pub async fn test_table_with_name(name: &str) -> Result<DataFrame> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx, name).await?;
ctx.table(name).await
}
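/// Shorthand for [`test_table_with_name`] with the name `"aggregate_test_100"`.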
pub async fn test_table() -> Result<DataFrame> {
test_table_with_name("aggregate_test_100").await
}
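/// Plans and executes `sql` against `ctx` and collects all resulting batches
/// into memory.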
pub async fn plan_and_collect(
ctx: &SessionContext,
sql: &str,
) -> Result<Vec<RecordBatch>> {
ctx.sql(sql).await?.collect().await
}
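/// Generates `partition_count` header-less CSV files named
/// `partition-<n>.<file_extension>` in `tmp_dir` and returns the schema of the
/// generated data (`c1: UInt32, c2: UInt64, c3: Boolean`).
///
/// A minimal usage sketch (illustrative only, not compiled as a doc test):
///
/// ```ignore
/// let tmp_dir = TempDir::new()?;
/// let schema = populate_csv_partitions(&tmp_dir, 4, "csv")?;
/// // tmp_dir now contains partition-0.csv .. partition-3.csv, each with
/// // eleven rows matching `schema`.
/// ```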
pub fn populate_csv_partitions(
tmp_dir: &TempDir,
partition_count: usize,
file_extension: &str,
) -> Result<SchemaRef> {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
Field::new("c3", DataType::Boolean, false),
]));
for partition in 0..partition_count {
let filename = format!("partition-{partition}.{file_extension}");
let file_path = tmp_dir.path().join(filename);
let mut file = File::create(file_path)?;
for i in 0..=10 {
let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
file.write_all(data.as_bytes())?;
}
}
Ok(schema)
}
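/// A [`TableProviderFactory`] for tests that turns a `CREATE EXTERNAL TABLE`
/// command into a [`TestTableProvider`].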
#[derive(Default, Debug)]
pub struct TestTableFactory {}
#[async_trait]
impl TableProviderFactory for TestTableFactory {
async fn create(
&self,
_: &dyn Session,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
Ok(Arc::new(TestTableProvider {
url: cmd.location.to_string(),
schema: Arc::new(cmd.schema.as_ref().into()),
}))
}
}
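/// A [`TableProvider`] stub for tests that records the table location and
/// schema; `table_type` and `scan` are intentionally unimplemented.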
#[derive(Debug)]
pub struct TestTableProvider {
    /// URL of table files or folder
    pub url: String,
    /// Test table schema
    pub schema: SchemaRef,
}
#[async_trait]
impl TableProvider for TestTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn table_type(&self) -> TableType {
unimplemented!("TestTableProvider is a stub for testing.")
}
async fn scan(
&self,
_state: &dyn Session,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!("TestTableProvider is a stub for testing.")
}
}
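/// Registers the file at `file_path` as an unbounded [`StreamTable`] named
/// `table_name`, with the provided sort order.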
pub fn register_unbounded_file_with_ordering(
ctx: &SessionContext,
schema: SchemaRef,
file_path: &Path,
table_name: &str,
file_sort_order: Vec<Vec<SortExpr>>,
) -> Result<()> {
let source = FileStreamProvider::new_file(schema, file_path.into());
let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);
ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
Ok(())
}
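/// Creates a [`SendableRecordBatchStream`] that yields `record_batch` exactly
/// `limit` times and then terminates.
///
/// A minimal usage sketch (illustrative only, not compiled as a doc test):
///
/// ```ignore
/// let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
/// let stream = bounded_stream(batch, 3); // yields the empty batch 3 times
/// ```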
pub fn bounded_stream(
record_batch: RecordBatch,
limit: usize,
) -> SendableRecordBatchStream {
Box::pin(BoundedStream {
record_batch,
count: 0,
limit,
})
}
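/// A [`RecordBatchStream`] that repeatedly yields clones of a single
/// [`RecordBatch`] until `limit` is reached.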
struct BoundedStream {
record_batch: RecordBatch,
count: usize,
limit: usize,
}
impl Stream for BoundedStream {
    type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.count >= self.limit {
Poll::Ready(None)
} else {
self.count += 1;
Poll::Ready(Some(Ok(self.record_batch.clone())))
}
}
}
impl RecordBatchStream for BoundedStream {
fn schema(&self) -> SchemaRef {
self.record_batch.schema()
}
}