
datafusion/test/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Common unit test utility methods

#![allow(missing_docs)]

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::path::Path;
use std::sync::Arc;

use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::FileFormat;

use crate::datasource::physical_plan::CsvSource;
use crate::datasource::{MemTable, TableProvider};
use crate::error::Result;
use crate::logical_expr::LogicalPlan;
use crate::test_util::{aggr_test_schema, arrow_test_data};

use arrow::array::{self, Array, ArrayRef, Decimal128Builder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::DataFusionError;
use datafusion_datasource::source::DataSourceExec;

#[cfg(feature = "compression")]
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource_csv::partitioned_csv_config;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
use object_store::local_unpartitioned_file;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;

/// Returns a single-row "dual" table (id = 1, name = "a") backed by a [`MemTable`]
pub fn create_table_dual() -> Arc<dyn TableProvider> {
    let dual_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::<Schema>::clone(&dual_schema),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(array::StringArray::from(vec!["a"])),
        ],
    )
    .unwrap();
    let provider = MemTable::try_new(dual_schema, vec![vec![batch]]).unwrap();
    Arc::new(provider)
}

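// Usage sketch (added for illustration; not part of the upstream file):
// registering the "dual" table with a `SessionContext` and querying it.
// Assumes `tokio` is available as a dev-dependency for the async test runtime.
#[cfg(test)]
#[tokio::test]
async fn create_table_dual_sketch() -> Result<()> {
    let ctx = crate::prelude::SessionContext::new();
    ctx.register_table("dual", create_table_dual())?;
    let batches = ctx.sql("SELECT id, name FROM dual").await?.collect().await?;
    // The dual table holds exactly one row: (1, "a")
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
    Ok(())
}
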
/// Returns a [`DataSourceExec`] that scans "aggregate_test_100.csv" with `partitions` partitions
pub fn scan_partitioned_csv(
    partitions: usize,
    work_dir: &Path,
) -> Result<Arc<DataSourceExec>> {
    let schema = aggr_test_schema();
    let filename = "aggregate_test_100.csv";
    let path = format!("{}/csv", arrow_test_data());
    let file_groups = partitioned_file_groups(
        path.as_str(),
        filename,
        partitions,
        Arc::new(CsvFormat::default()),
        FileCompressionType::UNCOMPRESSED,
        work_dir,
    )?;
    let source = Arc::new(CsvSource::new(true, b'"', b'"'));
    let config =
        FileScanConfigBuilder::from(partitioned_csv_config(schema, file_groups, source))
            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
            .build();
    Ok(DataSourceExec::from_data_source(config))
}

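// Usage sketch (added for illustration; not part of the upstream file):
// splitting the test CSV into four partitions and executing the scan.
// Assumes `tempfile` and `tokio` as dev-dependencies, and that the
// ARROW_TEST_DATA environment variable points at the arrow-testing data
// directory (otherwise `arrow_test_data()` panics).
#[cfg(test)]
#[tokio::test]
async fn scan_partitioned_csv_sketch() -> Result<()> {
    let work_dir = tempfile::TempDir::new()?;
    let plan = scan_partitioned_csv(4, work_dir.path())?;
    let ctx = crate::prelude::SessionContext::new();
    let batches = crate::physical_plan::collect(plan, ctx.task_ctx()).await?;
    // aggregate_test_100.csv contains 100 data rows, spread round-robin
    // across the four partition files
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 100);
    Ok(())
}
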
/// Returns file groups [`Vec<FileGroup>`] for scanning `partitions` partitions of `filename`
pub fn partitioned_file_groups(
    path: &str,
    filename: &str,
    partitions: usize,
    file_format: Arc<dyn FileFormat>,
    file_compression_type: FileCompressionType,
    work_dir: &Path,
) -> Result<Vec<FileGroup>> {
    let path = format!("{path}/{filename}");

    let mut writers = vec![];
    let mut files = vec![];
    for i in 0..partitions {
        let filename = format!(
            "partition-{}{}",
            i,
            file_format
                .get_ext_with_compression(&file_compression_type)
                .unwrap()
        );
        let filename = work_dir.join(filename);

        let file = File::create(&filename).unwrap();

        let encoder: Box<dyn Write + Send> = match file_compression_type.to_owned() {
            FileCompressionType::UNCOMPRESSED => Box::new(file),
            #[cfg(feature = "compression")]
            FileCompressionType::GZIP => {
                Box::new(GzEncoder::new(file, GzCompression::default()))
            }
            #[cfg(feature = "compression")]
            FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
            #[cfg(feature = "compression")]
            FileCompressionType::ZSTD => {
                let encoder = ZstdEncoder::new(file, 0)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?
                    .auto_finish();
                Box::new(encoder)
            }
            #[cfg(feature = "compression")]
            FileCompressionType::BZIP2 => {
                Box::new(BzEncoder::new(file, BzCompression::default()))
            }
            #[cfg(not(feature = "compression"))]
            FileCompressionType::GZIP
            | FileCompressionType::BZIP2
            | FileCompressionType::XZ
            | FileCompressionType::ZSTD => {
                panic!("Compression is not supported in this build")
            }
        };

        let writer = BufWriter::new(encoder);
        writers.push(writer);
        files.push(filename);
    }

    let f = File::open(path)?;
    let f = BufReader::new(f);
    for (i, line) in f.lines().enumerate() {
        let line = line.unwrap();

        if i == 0 && file_format.get_ext() == CsvFormat::default().get_ext() {
            // write the CSV header to every partition
            for w in writers.iter_mut() {
                w.write_all(line.as_bytes()).unwrap();
                w.write_all(b"\n").unwrap();
            }
        } else {
            // assign each data line to a single partition, round-robin
            let partition = i % partitions;
            writers[partition].write_all(line.as_bytes()).unwrap();
            writers[partition].write_all(b"\n").unwrap();
        }
    }

    // The writers must be flushed and dropped before creating the ObjectMeta
    // below, as dropping triggers `finish` for ZstdEncoder/BzEncoder, which
    // writes additional trailing data
    for mut w in writers.into_iter() {
        w.flush().unwrap();
    }

    Ok(files
        .into_iter()
        .map(|f| FileGroup::new(vec![local_unpartitioned_file(f).into()]))
        .collect::<Vec<_>>())
}

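// Usage sketch (added for illustration; not part of the upstream file):
// round-robin partitioning of a small, hypothetical CSV file. Assumes
// `tempfile` as a dev-dependency.
#[cfg(test)]
#[test]
fn partitioned_file_groups_sketch() -> Result<()> {
    let src_dir = tempfile::TempDir::new()?;
    let work_dir = tempfile::TempDir::new()?;
    std::fs::write(src_dir.path().join("tiny.csv"), "c1,c2\n1,a\n2,b\n3,c\n")?;
    let groups = partitioned_file_groups(
        src_dir.path().to_str().unwrap(),
        "tiny.csv",
        2,
        Arc::new(CsvFormat::default()),
        FileCompressionType::UNCOMPRESSED,
        work_dir.path(),
    )?;
    // One file group per requested partition; the header is replicated into
    // each partition file while data lines are distributed round-robin
    assert_eq!(groups.len(), 2);
    Ok(())
}
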
/// Asserts that the output schema of `plan` has exactly the field names in `expected`
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
    let actual: Vec<String> = plan
        .schema()
        .fields()
        .iter()
        .map(|f| f.name().clone())
        .collect();
    assert_eq!(actual, expected);
}

/// Returns the column names on the schema
pub fn columns(schema: &Schema) -> Vec<String> {
    schema.fields().iter().map(|f| f.name().clone()).collect()
}

/// Return a new table provider containing a single Int32 column with
/// values from `seq_start` to `seq_end` (inclusive)
pub fn table_with_sequence(
    seq_start: i32,
    seq_end: i32,
) -> Result<Arc<dyn TableProvider>> {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
    let partitions = vec![vec![RecordBatch::try_new(
        Arc::<Schema>::clone(&schema),
        vec![arr as ArrayRef],
    )?]];
    Ok(Arc::new(MemTable::try_new(schema, partitions)?))
}

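// Usage sketch (added for illustration; not part of the upstream file):
// the sequence bounds are inclusive, so `table_with_sequence(1, 5)` yields
// five rows. Assumes `tokio` as a dev-dependency.
#[cfg(test)]
#[tokio::test]
async fn table_with_sequence_sketch() -> Result<()> {
    let ctx = crate::prelude::SessionContext::new();
    ctx.register_table("t", table_with_sequence(1, 5)?)?;
    let batches = ctx.sql("SELECT i FROM t").await?.collect().await?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 5);
    Ok(())
}
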
/// Return a RecordBatch with a single Int32 array with values (0..sz)
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

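// Usage sketch (added for illustration; not part of the upstream file):
// the range is half-open, so `make_partition(3)` yields rows 0, 1, 2.
#[cfg(test)]
#[test]
fn make_partition_sketch() {
    let batch = make_partition(3);
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(columns(&batch.schema()), vec!["i"]);
}
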
/// Return a new table provider exposing a single Decimal128 column
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
    let batch_decimal = make_decimal();
    let schema = batch_decimal.schema();
    let partitions = vec![vec![batch_decimal]];
    Arc::new(MemTable::try_new(schema, partitions).unwrap())
}

fn make_decimal() -> RecordBatch {
    let mut decimal_builder = Decimal128Builder::with_capacity(20);
    for i in 110000..110010 {
        decimal_builder.append_value(i as i128);
    }
    for i in 100000..100010 {
        decimal_builder.append_value(-i as i128);
    }
    let array = decimal_builder
        .finish()
        .with_precision_and_scale(10, 3)
        .unwrap();
    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

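// Usage sketch (added for illustration; not part of the upstream file):
// the builder produces 20 values (10 positive, 10 negative) with
// precision 10 and scale 3.
#[cfg(test)]
#[test]
fn make_decimal_sketch() {
    let batch = make_decimal();
    assert_eq!(batch.num_rows(), 20);
    assert_eq!(
        batch.schema().field(0).data_type(),
        &DataType::Decimal128(10, 3)
    );
}
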
pub mod object_store;
pub mod variable;