//! Common unit test utility methods.

#![allow(missing_docs)]

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::CsvSource;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::test::object_store::local_unpartitioned_file;
use crate::test_util::{aggr_test_schema, arrow_test_data};
use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;
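
/// Returns a single-row, two-column `dual`-style table (`id: Int32`,
/// `name: Utf8`) backed by a [`MemTable`], useful as a trivial scan target.
///
/// A usage sketch (illustrative only; assumes a `SessionContext` named `ctx`):
///
/// ```ignore
/// ctx.register_table("dual", create_table_dual())?;
/// let df = ctx.sql("SELECT id, name FROM dual").await?;
/// ```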
pub fn create_table_dual() -> Arc<dyn TableProvider> {
    let dual_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::<Schema>::clone(&dual_schema),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(array::StringArray::from(vec!["a"])),
        ],
    )
    .unwrap();
    let provider = MemTable::try_new(dual_schema, vec![vec![batch]]).unwrap();
    Arc::new(provider)
}
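
/// Splits `aggregate_test_100.csv` from the Arrow test data into `partitions`
/// uncompressed CSV files under `work_dir` and returns a [`DataSourceExec`]
/// that scans them.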
pub fn scan_partitioned_csv(
    partitions: usize,
    work_dir: &Path,
) -> Result<Arc<DataSourceExec>> {
    let schema = aggr_test_schema();
    let filename = "aggregate_test_100.csv";
    let path = format!("{}/csv", arrow_test_data());
    let file_groups = partitioned_file_groups(
        path.as_str(),
        filename,
        partitions,
        Arc::new(CsvFormat::default()),
        FileCompressionType::UNCOMPRESSED,
        work_dir,
    )?;
    // `CsvSource::new(has_header, delimiter, quote)`: the test file is
    // comma-delimited and quoted with `"`.
    let source = Arc::new(CsvSource::new(true, b',', b'"'));
    let config = partitioned_csv_config(schema, file_groups, source)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
    Ok(config.build())
}
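
/// Splits the file at `{path}/{filename}` into `partitions` files named
/// `partition-{i}{ext}` under `work_dir`, distributing data lines
/// round-robin. For CSV input the header line is replicated into every
/// partition. Returns one single-file group per partition.
///
/// A sketch of a typical call (illustrative only; `tmp_dir` is assumed to be
/// a `tempfile::TempDir`):
///
/// ```ignore
/// let groups = partitioned_file_groups(
///     "/path/to/testdata",
///     "aggregate_test_100.csv",
///     4,
///     Arc::new(CsvFormat::default()),
///     FileCompressionType::GZIP, // requires the "compression" feature
///     tmp_dir.path(),
/// )?;
/// assert_eq!(groups.len(), 4);
/// ```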
pub fn partitioned_file_groups(
    path: &str,
    filename: &str,
    partitions: usize,
    file_format: Arc<dyn FileFormat>,
    file_compression_type: FileCompressionType,
    work_dir: &Path,
) -> Result<Vec<Vec<PartitionedFile>>> {
    let path = format!("{path}/{filename}");

    let mut writers = vec![];
    let mut files = vec![];
    for i in 0..partitions {
        let filename = format!(
            "partition-{}{}",
            i,
            file_format
                .get_ext_with_compression(&file_compression_type)
                .unwrap()
        );
        let filename = work_dir.join(filename);
        let file = File::create(&filename).unwrap();

        // Wrap the output file in the requested compression encoder.
        let encoder: Box<dyn Write + Send> = match file_compression_type.to_owned() {
            FileCompressionType::UNCOMPRESSED => Box::new(file),
            #[cfg(feature = "compression")]
            FileCompressionType::GZIP => {
                Box::new(GzEncoder::new(file, GzCompression::default()))
            }
            #[cfg(feature = "compression")]
            FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
            #[cfg(feature = "compression")]
            FileCompressionType::ZSTD => {
                let encoder = ZstdEncoder::new(file, 0)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?
                    .auto_finish();
                Box::new(encoder)
            }
            #[cfg(feature = "compression")]
            FileCompressionType::BZIP2 => {
                Box::new(BzEncoder::new(file, BzCompression::default()))
            }
            #[cfg(not(feature = "compression"))]
            FileCompressionType::GZIP
            | FileCompressionType::BZIP2
            | FileCompressionType::XZ
            | FileCompressionType::ZSTD => {
                panic!("Compression is not supported in this build")
            }
        };

        let writer = BufWriter::new(encoder);
        writers.push(writer);
        files.push(filename);
    }

    let f = File::open(path)?;
    let f = BufReader::new(f);
    for (i, line) in f.lines().enumerate() {
        let line = line.unwrap();

        if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
            // Replicate the CSV header line into every partition.
            for w in writers.iter_mut() {
                w.write_all(line.as_bytes()).unwrap();
                w.write_all(b"\n").unwrap();
            }
        } else {
            // Distribute data lines round-robin across the partitions.
            let partition = i % partitions;
            writers[partition].write_all(line.as_bytes()).unwrap();
            writers[partition].write_all(b"\n").unwrap();
        }
    }

    // Flush every writer; compressed encoders finish their streams on drop.
    for mut w in writers.into_iter() {
        w.flush().unwrap();
    }

    Ok(files
        .into_iter()
        .map(|f| vec![local_unpartitioned_file(f).into()])
        .collect::<Vec<_>>())
}
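
/// Returns a [`FileScanConfig`] over the local filesystem object store for
/// the given schema, file groups, and file source.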
pub fn partitioned_csv_config(
    schema: SchemaRef,
    file_groups: Vec<Vec<PartitionedFile>>,
    file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
        .with_file_groups(file_groups)
}
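
/// Asserts that the field names of `plan`'s output schema equal `expected`.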
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
    let actual: Vec<String> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();
    assert_eq!(actual, expected);
}
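
/// Returns the field (column) names of `schema`.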
pub fn columns(schema: &Schema) -> Vec<String> {
    schema.fields().iter().map(|f| f.name().clone()).collect()
}
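
/// Returns a single-partition [`MemTable`] with one nullable `Int32` column
/// `i` containing the inclusive sequence `seq_start..=seq_end`.
///
/// A usage sketch (illustrative only; assumes a `SessionContext` named `ctx`):
///
/// ```ignore
/// ctx.register_table("t", table_with_sequence(1, 5)?)?;
/// // SELECT i FROM t => 1, 2, 3, 4, 5
/// ```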
pub fn table_with_sequence(
    seq_start: i32,
    seq_end: i32,
) -> Result<Arc<dyn TableProvider>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
    let partitions = vec![vec![RecordBatch::try_new(
        Arc::<Schema>::clone(&schema),
        vec![arr as ArrayRef],
    )?]];
    Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}
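
/// Returns a [`RecordBatch`] with a single nullable `Int32` column `i`
/// containing the values `0..sz`.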
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}
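
/// Returns a [`MemTable`] with a single `Decimal128(10, 3)` column `c1`;
/// see [`make_decimal`] for the values.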
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
    let batch_decimal = make_decimal();
    let schema = batch_decimal.schema();
    let partitions = vec![vec![batch_decimal]];
    Arc::new(MemTable::try_new(schema, partitions).unwrap())
}
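
/// Builds a [`RecordBatch`] with one `Decimal128(10, 3)` column `c1`: ten
/// ascending positive values (raw `110000..110010`, i.e. `110.000` to
/// `110.009` at scale 3) followed by ten descending negative values
/// (raw `-100000` down to `-100009`).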
fn make_decimal() -> RecordBatch {
    let mut decimal_builder = Decimal128Builder::with_capacity(20);
    for i in 110000..110010 {
        decimal_builder.append_value(i as i128);
    }
    for i in 100000..100010 {
        decimal_builder.append_value(-i as i128);
    }
    let array = decimal_builder
        .finish()
        .with_precision_and_scale(10, 3)
        .unwrap();
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

/// Object store helpers for tests.
pub mod object_store;
/// Variable provider helpers for tests.
pub mod variable;