//! Common unit-test utility methods.

#![allow(missing_docs)]

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;

use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::CsvSource;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::test::object_store::local_unpartitioned_file;
use crate::test_util::{aggr_test_schema, arrow_test_data};

use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;

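/// Returns an in-memory [`TableProvider`] holding the classic one-row
/// "dual" table with columns `id: Int32` and `name: Utf8`.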
pub fn create_table_dual() -> Arc<dyn TableProvider> {
    let dual_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::<Schema>::clone(&dual_schema),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(array::StringArray::from(vec!["a"])),
        ],
    )
    .unwrap();
    let provider = MemTable::try_new(dual_schema, vec![vec![batch]]).unwrap();
    Arc::new(provider)
}

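/// Returns a [`DataSourceExec`] that scans `aggregate_test_100.csv` from the
/// arrow test data, split into `partitions` files under `work_dir`.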
pub fn scan_partitioned_csv(
    partitions: usize,
    work_dir: &Path,
) -> Result<Arc<DataSourceExec>> {
    let schema = aggr_test_schema();
    let filename = "aggregate_test_100.csv";
    let path = format!("{}/csv", arrow_test_data());
    let file_groups = partitioned_file_groups(
        path.as_str(),
        filename,
        partitions,
        Arc::new(CsvFormat::default()),
        FileCompressionType::UNCOMPRESSED,
        work_dir,
    )?;
    // has_header, delimiter, quote (the test file is comma-delimited)
    let source = Arc::new(CsvSource::new(true, b',', b'"'));
    let config = partitioned_csv_config(schema, file_groups, source)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
    Ok(config.build())
}

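/// Splits the file at `path`/`filename` into `partitions` files in `work_dir`,
/// distributing lines round-robin (for CSV input the header line is replicated
/// to every partition) and compressing each file with `file_compression_type`.
/// Returns one single-file group per partition.
///
/// A minimal usage sketch (not compiled here; assumes a hypothetical
/// `tempfile::TempDir` named `tmp_dir`):
///
/// ```ignore
/// // split the aggregate test CSV into 4 gzip-compressed partitions
/// let groups = partitioned_file_groups(
///     &format!("{}/csv", arrow_test_data()),
///     "aggregate_test_100.csv",
///     4,
///     Arc::new(CsvFormat::default()),
///     FileCompressionType::GZIP,
///     tmp_dir.path(),
/// )?;
/// ```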
pub fn partitioned_file_groups(
    path: &str,
    filename: &str,
    partitions: usize,
    file_format: Arc<dyn FileFormat>,
    file_compression_type: FileCompressionType,
    work_dir: &Path,
) -> Result<Vec<Vec<PartitionedFile>>> {
    let path = format!("{path}/{filename}");

    let mut writers = vec![];
    let mut files = vec![];
    for i in 0..partitions {
        let filename = format!(
            "partition-{}{}",
            i,
            file_format
                .get_ext_with_compression(&file_compression_type)
                .unwrap()
        );
        let filename = work_dir.join(filename);

        let file = File::create(&filename).unwrap();

        // Wrap the output file in the requested compression encoder
        let encoder: Box<dyn Write + Send> = match file_compression_type.to_owned() {
            FileCompressionType::UNCOMPRESSED => Box::new(file),
            #[cfg(feature = "compression")]
            FileCompressionType::GZIP => {
                Box::new(GzEncoder::new(file, GzCompression::default()))
            }
            #[cfg(feature = "compression")]
            FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
            #[cfg(feature = "compression")]
            FileCompressionType::ZSTD => {
                let encoder = ZstdEncoder::new(file, 0)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?
                    .auto_finish();
                Box::new(encoder)
            }
            #[cfg(feature = "compression")]
            FileCompressionType::BZIP2 => {
                Box::new(BzEncoder::new(file, BzCompression::default()))
            }
            #[cfg(not(feature = "compression"))]
            FileCompressionType::GZIP
            | FileCompressionType::BZIP2
            | FileCompressionType::XZ
            | FileCompressionType::ZSTD => {
                panic!("Compression is not supported in this build")
            }
        };

        let writer = BufWriter::new(encoder);
        writers.push(writer);
        files.push(filename);
    }

    let f = File::open(path)?;
    let f = BufReader::new(f);
    for (i, line) in f.lines().enumerate() {
        let line = line.unwrap();

        if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
            // replicate the CSV header to every partition
            for w in writers.iter_mut() {
                w.write_all(line.as_bytes()).unwrap();
                w.write_all(b"\n").unwrap();
            }
        } else {
            // round-robin the remaining lines across the partitions
            let partition = i % partitions;
            writers[partition].write_all(line.as_bytes()).unwrap();
            writers[partition].write_all(b"\n").unwrap();
        }
    }

    // Flush and drop each writer so the compression encoders finalize their
    // output before the files are stat'ed below.
    for mut w in writers.into_iter() {
        w.flush().unwrap();
    }

    Ok(files
        .into_iter()
        .map(|f| vec![local_unpartitioned_file(f).into()])
        .collect::<Vec<_>>())
}

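/// Returns a [`FileScanConfig`] for the given `file_groups`, rooted at the
/// local filesystem object store.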
pub fn partitioned_csv_config(
    schema: SchemaRef,
    file_groups: Vec<Vec<PartitionedFile>>,
    file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source)
        .with_file_groups(file_groups)
}

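/// Asserts that the field names of `plan`'s output schema equal `expected`.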
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
    let actual: Vec<String> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();
    assert_eq!(actual, expected);
}

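/// Returns the column names of `schema`.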
pub fn columns(schema: &Schema) -> Vec<String> {
    schema.fields().iter().map(|f| f.name().clone()).collect()
}

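/// Returns a new in-memory table with a single nullable `Int32` column `i`
/// holding the inclusive sequence `seq_start..=seq_end`.
///
/// A minimal usage sketch (not compiled here; assumes a
/// [`SessionContext`](crate::prelude::SessionContext) is available):
///
/// ```ignore
/// let ctx = SessionContext::new();
/// // register a table "t" whose column `i` holds 1, 2, 3
/// ctx.register_table("t", table_with_sequence(1, 3)?)?;
/// ```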
pub fn table_with_sequence(
    seq_start: i32,
    seq_end: i32,
) -> Result<Arc<dyn TableProvider>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
    let partitions = vec![vec![RecordBatch::try_new(
        Arc::<Schema>::clone(&schema),
        vec![arr as ArrayRef],
    )?]];
    Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}

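/// Returns a [`RecordBatch`] with a single nullable `Int32` column `i`
/// containing the values `0..sz`.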
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

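/// Returns an in-memory table with a single `Decimal128(10, 3)` column `c1`
/// (see [`make_decimal`] for the values).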
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
    let batch_decimal = make_decimal();
    let schema = batch_decimal.schema();
    let partitions = vec![vec![batch_decimal]];
    Arc::new(MemTable::try_new(schema, partitions).unwrap())
}

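/// Returns a single-column `Decimal128(10, 3)` [`RecordBatch`] with ten
/// positive values (110.000 through 110.009) followed by ten negative values
/// (-100.000 through -100.009).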
fn make_decimal() -> RecordBatch {
    let mut decimal_builder = Decimal128Builder::with_capacity(20);
    for i in 110000..110010 {
        decimal_builder.append_value(i as i128);
    }
    for i in 100000..100010 {
        decimal_builder.append_value(-i as i128);
    }
    let array = decimal_builder
        .finish()
        .with_precision_and_scale(10, 3)
        .unwrap();
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

pub mod object_store;
pub mod variable;