//! Common unit test utility methods.

#![allow(missing_docs)]

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;

use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;
use crate::datasource::physical_plan::CsvSource;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::test_util::{aggr_test_schema, arrow_test_data};

use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
#[cfg(feature = "compression")]
use datafusion_common::DataFusionError;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource_csv::partitioned_csv_config;
use object_store::local_unpartitioned_file;

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;

/// Returns a single-row "dual" table provider with columns `id` and `name`,
/// analogous to the DUAL table of some SQL dialects.
pub fn create_table_dual() -> Arc<dyn TableProvider> {
    let dual_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::<Schema>::clone(&dual_schema),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(array::StringArray::from(vec!["a"])),
        ],
    )
    .unwrap();
    let provider = MemTable::try_new(dual_schema, vec![vec![batch]]).unwrap();
    Arc::new(provider)
}
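
// A minimal usage sketch (added for illustration; the table name "dual" and
// the `SessionContext` wiring are assumptions, not part of the original
// helpers): register the provider and query it like Oracle's DUAL table.
#[cfg(test)]
mod create_table_dual_example {
    use super::*;
    use crate::prelude::SessionContext;

    #[tokio::test]
    async fn query_dual_table() -> Result<()> {
        let ctx = SessionContext::new();
        // Register the single-row provider under the name "dual"
        ctx.register_table("dual", create_table_dual())?;
        let batches = ctx.sql("SELECT id, name FROM dual").await?.collect().await?;
        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(rows, 1);
        Ok(())
    }
}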

/// Returns a [`DataSourceExec`] that scans "aggregate_test_100.csv", split
/// into `partitions` partition files under `work_dir`.
pub fn scan_partitioned_csv(
    partitions: usize,
    work_dir: &Path,
) -> Result<Arc<DataSourceExec>> {
    let schema = aggr_test_schema();
    let filename = "aggregate_test_100.csv";
    let path = format!("{}/csv", arrow_test_data());
    let file_groups = partitioned_file_groups(
        path.as_str(),
        filename,
        partitions,
        Arc::new(CsvFormat::default()),
        FileCompressionType::UNCOMPRESSED,
        work_dir,
    )?;
    // has_header = true, delimiter = ',', quote = '"' -- the test file is
    // comma-delimited, so the delimiter must be `b','`, not the quote char
    let source = Arc::new(CsvSource::new(true, b',', b'"'));
    let config =
        FileScanConfigBuilder::from(partitioned_csv_config(schema, file_groups, source))
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .build();
    Ok(DataSourceExec::from_data_source(config))
}
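
// A usage sketch (assumptions: the arrow-testing checkout is reachable via
// the ARROW_TEST_DATA environment variable that `arrow_test_data()` reads,
// and `tempfile` is available as a dev-dependency): build and run the exec.
#[cfg(test)]
mod scan_partitioned_csv_example {
    use super::*;
    use crate::physical_plan::collect;
    use crate::prelude::SessionContext;

    #[tokio::test]
    async fn scan_four_partitions() -> Result<()> {
        let work_dir = tempfile::tempdir().expect("create temp dir");
        let exec = scan_partitioned_csv(4, work_dir.path())?;
        // Execute the plan and count rows across all partitions
        let batches = collect(exec, SessionContext::new().task_ctx()).await?;
        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(rows, 100); // aggregate_test_100.csv has 100 data rows
        Ok(())
    }
}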

/// Splits `filename` (found under `path`) line-by-line into `partitions`
/// files under `work_dir`, optionally compressing each, and returns one
/// [`FileGroup`] per partition file.
pub fn partitioned_file_groups(
    path: &str,
    filename: &str,
    partitions: usize,
    file_format: Arc<dyn FileFormat>,
    file_compression_type: FileCompressionType,
    work_dir: &Path,
) -> Result<Vec<FileGroup>> {
    let path = format!("{path}/{filename}");

    let mut writers = vec![];
    let mut files = vec![];
    for i in 0..partitions {
        let filename = format!(
            "partition-{}{}",
            i,
            file_format
                .get_ext_with_compression(&file_compression_type)
                .unwrap()
        );
        let filename = work_dir.join(filename);

        let file = File::create(&filename).unwrap();

        let encoder: Box<dyn Write + Send> = match file_compression_type.to_owned() {
            FileCompressionType::UNCOMPRESSED => Box::new(file),
            #[cfg(feature = "compression")]
            FileCompressionType::GZIP => {
                Box::new(GzEncoder::new(file, GzCompression::default()))
            }
            #[cfg(feature = "compression")]
            FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
            #[cfg(feature = "compression")]
            FileCompressionType::ZSTD => {
                let encoder = ZstdEncoder::new(file, 0)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?
                    .auto_finish();
                Box::new(encoder)
            }
            #[cfg(feature = "compression")]
            FileCompressionType::BZIP2 => {
                Box::new(BzEncoder::new(file, BzCompression::default()))
            }
            #[cfg(not(feature = "compression"))]
            FileCompressionType::GZIP
            | FileCompressionType::BZIP2
            | FileCompressionType::XZ
            | FileCompressionType::ZSTD => {
                panic!("Compression is not supported in this build")
            }
        };

        let writer = BufWriter::new(encoder);
        writers.push(writer);
        files.push(filename);
    }

    let f = File::open(path)?;
    let f = BufReader::new(f);
    for (i, line) in f.lines().enumerate() {
        let line = line.unwrap();

        if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
            // write the CSV header to every partition
            for w in writers.iter_mut() {
                w.write_all(line.as_bytes()).unwrap();
                w.write_all(b"\n").unwrap();
            }
        } else {
            // round-robin the data lines across the partitions
            let partition = i % partitions;
            writers[partition].write_all(line.as_bytes()).unwrap();
            writers[partition].write_all(b"\n").unwrap();
        }
    }

    // Flush and drop the writers before building the `FileGroup`s: dropping
    // triggers `finish` on the compressing encoders, which writes each
    // file's trailing bytes.
    for mut w in writers.into_iter() {
        w.flush().unwrap();
    }

    Ok(files
        .into_iter()
        .map(|f| FileGroup::new(vec![local_unpartitioned_file(f).into()]))
        .collect::<Vec<_>>())
}
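
// A sketch of the compressed path (assumptions: the "compression" feature is
// enabled, arrow test data is available, and `tempfile` is a dev-dependency):
// the same helper can split the CSV into gzip-compressed partition files.
#[cfg(all(test, feature = "compression"))]
mod partitioned_file_groups_example {
    use super::*;

    #[test]
    fn split_csv_into_gzip_partitions() -> Result<()> {
        let work_dir = tempfile::tempdir().expect("create temp dir");
        let groups = partitioned_file_groups(
            &format!("{}/csv", arrow_test_data()),
            "aggregate_test_100.csv",
            3,
            Arc::new(CsvFormat::default()),
            FileCompressionType::GZIP,
            work_dir.path(),
        )?;
        // one file group per requested partition
        assert_eq!(groups.len(), 3);
        Ok(())
    }
}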

/// Asserts that the field names of `plan`'s output schema equal `expected`.
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
    let actual: Vec<String> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();
    assert_eq!(actual, expected);
}

/// Returns the column (field) names of `schema`.
pub fn columns(schema: &Schema) -> Vec<String> {
    schema.fields().iter().map(|f| f.name().clone()).collect()
}

/// Returns a table with a single nullable Int32 column `i` containing the
/// sequence `seq_start..=seq_end`.
pub fn table_with_sequence(
    seq_start: i32,
    seq_end: i32,
) -> Result<Arc<dyn TableProvider>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
    let partitions = vec![vec![RecordBatch::try_new(
        Arc::<Schema>::clone(&schema),
        vec![arr as ArrayRef],
    )?]];
    Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}
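
// A usage sketch (the table name "t" and the SQL wiring are illustrative):
// summing the generated sequence 1..=10 should yield 55.
#[cfg(test)]
mod table_with_sequence_example {
    use super::*;
    use crate::prelude::SessionContext;

    #[tokio::test]
    async fn sum_sequence() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.register_table("t", table_with_sequence(1, 10)?)?;
        let batches = ctx.sql("SELECT sum(i) FROM t").await?.collect().await?;
        let sums = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .expect("sum of Int32 is Int64");
        assert_eq!(sums.value(0), 55); // 1 + 2 + ... + 10
        Ok(())
    }
}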

/// Returns a [`RecordBatch`] with a single Int32 column `i` holding the
/// values `0..sz`.
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}
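
// Quick check sketch: `make_partition(5)` yields a batch with one Int32
// column "i" containing 0, 1, 2, 3, 4.
#[cfg(test)]
mod make_partition_example {
    use super::*;

    #[test]
    fn partition_values() {
        let batch = make_partition(5);
        assert_eq!(batch.num_rows(), 5);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32 column");
        let values: Vec<i32> = col.iter().flatten().collect();
        assert_eq!(values, vec![0, 1, 2, 3, 4]);
    }
}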

/// Returns a table with a single Decimal128(10, 3) column `c1`.
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
    let batch_decimal = make_decimal();
    let schema = batch_decimal.schema();
    let partitions = vec![vec![batch_decimal]];
    Arc::new(MemTable::try_new(schema, partitions).unwrap())
}

fn make_decimal() -> RecordBatch {
    let mut decimal_builder = Decimal128Builder::with_capacity(20);
    // 10 values 110000..110009 followed by the negations of 100000..100009;
    // these are raw (unscaled) values, so with precision 10 / scale 3 applied
    // below, e.g. 110000 represents 110.000
    for i in 110000..110010 {
        decimal_builder.append_value(i as i128);
    }
    for i in 100000..100010 {
        decimal_builder.append_value(-i as i128);
    }
    let array = decimal_builder
        .finish()
        .with_precision_and_scale(10, 3)
        .unwrap();
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}
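
// Shape check sketch for the decimal helpers: 20 rows of Decimal128 with
// precision 10 and scale 3.
#[cfg(test)]
mod decimal_table_example {
    use super::*;

    #[test]
    fn decimal_batch_shape() {
        let batch = make_decimal();
        assert_eq!(batch.num_rows(), 20);
        assert_eq!(
            batch.schema().field(0).data_type(),
            &DataType::Decimal128(10, 3)
        );
    }
}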

pub mod object_store;
pub mod variable;