//! Common unit test utility methods

#![allow(missing_docs)]

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;

use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;

use crate::datasource::physical_plan::CsvSource;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::test_util::{aggr_test_schema, arrow_test_data};

use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_datasource::source::DataSourceExec;

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource_csv::partitioned_csv_config;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
use object_store::local_unpartitioned_file;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;

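/// Returns a single-row [`MemTable`] provider for a `dual`-style table with
/// columns `id: Int32` and `name: Utf8` and the one row `(1, "a")`.
///
/// A minimal usage sketch (hypothetical; assumes a `SessionContext` from this
/// crate is in scope):
///
/// ```ignore
/// let ctx = SessionContext::new();
/// ctx.register_table("dual", create_table_dual())?;
/// let df = ctx.sql("SELECT id, name FROM dual").await?;
/// ```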
pub fn create_table_dual() -> Arc<dyn TableProvider> {
    let dual_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::<Schema>::clone(&dual_schema),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(array::StringArray::from(vec!["a"])),
        ],
    )
    .unwrap();
    let provider = MemTable::try_new(dual_schema, vec![vec![batch]]).unwrap();
    Arc::new(provider)
}

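/// Returns a [`DataSourceExec`] that scans "aggregate_test_100.csv", split
/// into `partitions` partition files written under `work_dir`.
///
/// A minimal usage sketch (hypothetical; `tempfile::tempdir` stands in for
/// any scratch directory):
///
/// ```ignore
/// let work_dir = tempfile::tempdir()?;
/// let exec = scan_partitioned_csv(4, work_dir.path())?;
/// ```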
pub fn scan_partitioned_csv(
    partitions: usize,
    work_dir: &Path,
) -> Result<Arc<DataSourceExec>> {
    let schema = aggr_test_schema();
    let filename = "aggregate_test_100.csv";
    let path = format!("{}/csv", arrow_test_data());
    let file_groups = partitioned_file_groups(
        path.as_str(),
        filename,
        partitions,
        Arc::new(CsvFormat::default()),
        FileCompressionType::UNCOMPRESSED,
        work_dir,
    )?;
    // has_header = true, comma delimiter, double-quote quoting
    let source = Arc::new(CsvSource::new(true, b',', b'"'));
    let config =
        FileScanConfigBuilder::from(partitioned_csv_config(schema, file_groups, source))
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .build();
    Ok(DataSourceExec::from_data_source(config))
}

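/// Splits `filename` (found under `path`) line by line into `partitions`
/// files in `work_dir`, optionally compressing each one, and returns one
/// [`FileGroup`] per partition file. For CSV input the header line is
/// replicated into every partition.
///
/// A minimal usage sketch (hypothetical; `tempfile::tempdir` stands in for
/// any scratch directory):
///
/// ```ignore
/// let work_dir = tempfile::tempdir()?;
/// let groups = partitioned_file_groups(
///     &format!("{}/csv", arrow_test_data()),
///     "aggregate_test_100.csv",
///     4,
///     Arc::new(CsvFormat::default()),
///     FileCompressionType::GZIP,
///     work_dir.path(),
/// )?;
/// assert_eq!(groups.len(), 4);
/// ```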
pub fn partitioned_file_groups(
    path: &str,
    filename: &str,
    partitions: usize,
    file_format: Arc<dyn FileFormat>,
    file_compression_type: FileCompressionType,
    work_dir: &Path,
) -> Result<Vec<FileGroup>> {
    let path = format!("{path}/{filename}");

    let mut writers = vec![];
    let mut files = vec![];
    for i in 0..partitions {
        let filename = format!(
            "partition-{}{}",
            i,
            file_format
                .get_ext_with_compression(&file_compression_type)
                .unwrap()
        );
        let filename = work_dir.join(filename);

        let file = File::create(&filename).unwrap();

        // Wrap each partition file in the requested compression encoder
        let encoder: Box<dyn Write + Send> = match file_compression_type.to_owned() {
            FileCompressionType::UNCOMPRESSED => Box::new(file),
            #[cfg(feature = "compression")]
            FileCompressionType::GZIP => {
                Box::new(GzEncoder::new(file, GzCompression::default()))
            }
            #[cfg(feature = "compression")]
            FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
            #[cfg(feature = "compression")]
            FileCompressionType::ZSTD => {
                let encoder = ZstdEncoder::new(file, 0)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?
                    .auto_finish();
                Box::new(encoder)
            }
            #[cfg(feature = "compression")]
            FileCompressionType::BZIP2 => {
                Box::new(BzEncoder::new(file, BzCompression::default()))
            }
            #[cfg(not(feature = "compression"))]
            FileCompressionType::GZIP
            | FileCompressionType::BZIP2
            | FileCompressionType::XZ
            | FileCompressionType::ZSTD => {
                panic!("Compression is not supported in this build")
            }
        };

        let writer = BufWriter::new(encoder);
        writers.push(writer);
        files.push(filename);
    }

    let f = File::open(path)?;
    let f = BufReader::new(f);
    for (i, line) in f.lines().enumerate() {
        let line = line.unwrap();

        if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
            // write the CSV header out to every partition
            for w in writers.iter_mut() {
                w.write_all(line.as_bytes()).unwrap();
                w.write_all(b"\n").unwrap();
            }
        } else {
            // round-robin the data lines across the partitions
            let partition = i % partitions;
            writers[partition].write_all(line.as_bytes()).unwrap();
            writers[partition].write_all(b"\n").unwrap();
        }
    }

    // Flush and drop the writers before building the file groups below, as
    // dropping finishes the compression encoders, which writes trailing data
    for mut w in writers.into_iter() {
        w.flush().unwrap();
    }

    Ok(files
        .into_iter()
        .map(|f| FileGroup::new(vec![local_unpartitioned_file(f).into()]))
        .collect::<Vec<_>>())
}

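/// Asserts that the field names of `plan`'s output schema equal `expected`,
/// in order.
///
/// A minimal usage sketch (hypothetical plan):
///
/// ```ignore
/// let plan = ctx.sql("SELECT 1 AS a, 2 AS b").await?.into_optimized_plan()?;
/// assert_fields_eq(&plan, vec!["a", "b"]);
/// ```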
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
    let actual: Vec<String> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();
    assert_eq!(actual, expected);
}

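/// Returns the column (field) names of `schema`, in order.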
pub fn columns(schema: &Schema) -> Vec<String> {
    schema.fields().iter().map(|f| f.name().clone()).collect()
}

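/// Returns a [`TableProvider`] with a single nullable `Int32` column `i`
/// containing the inclusive sequence `seq_start..=seq_end` in one partition.
///
/// A minimal usage sketch (hypothetical registration):
///
/// ```ignore
/// ctx.register_table("t", table_with_sequence(1, 5)?)?;
/// // SELECT i FROM t  =>  1, 2, 3, 4, 5
/// ```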
pub fn table_with_sequence(
    seq_start: i32,
    seq_end: i32,
) -> Result<Arc<dyn TableProvider>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
    let partitions = vec![vec![RecordBatch::try_new(
        Arc::<Schema>::clone(&schema),
        vec![arr as ArrayRef],
    )?]];
    Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}

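/// Returns a [`RecordBatch`] with a single nullable `Int32` column `i`
/// holding the values `0..sz` (exclusive of `sz`).
///
/// A minimal sketch of the resulting batch:
///
/// ```ignore
/// let batch = make_partition(3);
/// assert_eq!(batch.num_rows(), 3); // i = 0, 1, 2
/// ```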
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

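/// Returns a [`TableProvider`] backed by the single `Decimal128(10, 3)`
/// batch produced by [`make_decimal`].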
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
    let batch_decimal = make_decimal();
    let schema = batch_decimal.schema();
    let partitions = vec![vec![batch_decimal]];
    Arc::new(MemTable::try_new(schema, partitions).unwrap())
}

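/// Returns a [`RecordBatch`] with one nullable `Decimal128(10, 3)` column
/// `c1`: ten values from 110.000 upward (unscaled `110000..110010`) followed
/// by ten values from -100.000 downward (unscaled negations of
/// `100000..100010`).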
fn make_decimal() -> RecordBatch {
    let mut decimal_builder = Decimal128Builder::with_capacity(20);
    for i in 110000..110010 {
        decimal_builder.append_value(i as i128);
    }
    for i in 100000..100010 {
        decimal_builder.append_value(-i as i128);
    }
    let array = decimal_builder
        .finish()
        .with_precision_and_scale(10, 3)
        .unwrap();
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

pub mod object_store;
pub mod variable;