use std::sync::Arc;
use crate::datasource::physical_plan::parquet::plan_to_parquet;
use parquet::file::properties::WriterProperties;
use super::super::options::{ParquetReadOptions, ReadOptions};
use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
impl SessionContext {
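/// Creates a [`DataFrame`] for reading a Parquet data source.
///
/// `table_paths` accepts anything implementing [`DataFilePaths`]: a single
/// file, a directory, a glob pattern, or a collection of paths.
///
/// A minimal usage sketch (the file path is hypothetical):
///
/// ```no_run
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx
///     .read_parquet("data/example.parquet", ParquetReadOptions::default())
///     .await?;
/// let batches = df.collect().await?;
/// # Ok(())
/// # }
/// ```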
pub async fn read_parquet<P: DataFilePaths>(
&self,
table_paths: P,
options: ParquetReadOptions<'_>,
) -> Result<DataFrame> {
self._read_type(table_paths, options).await
}
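/// Registers a Parquet file (or a path matching multiple files) as a named
/// table that can be referenced from SQL statements executed against this
/// context.
///
/// A minimal usage sketch (the table name and path are hypothetical):
///
/// ```no_run
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// ctx.register_parquet("example", "data/example.parquet", ParquetReadOptions::default())
///     .await?;
/// let df = ctx.sql("SELECT * FROM example").await?;
/// # Ok(())
/// # }
/// ```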
pub async fn register_parquet(
&self,
name: &str,
table_path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
let listing_options = options.to_listing_options(&self.state.read().config);
self.register_listing_table(
name,
table_path,
listing_options,
options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;
Ok(())
}
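/// Executes `plan` and writes the results as Parquet under `path`, one file
/// per output partition, using `writer_properties` for the writer
/// configuration when provided (writer defaults otherwise).
///
/// A minimal usage sketch (the input and output paths are hypothetical):
///
/// ```no_run
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx
///     .read_parquet("data/example.parquet", ParquetReadOptions::default())
///     .await?;
/// let plan = df.create_physical_plan().await?;
/// ctx.write_parquet(plan, "out_dir", None).await?;
/// # Ok(())
/// # }
/// ```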
pub async fn write_parquet(
&self,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await
}
}
#[cfg(test)]
mod tests {
use async_trait::async_trait;
use crate::arrow::array::{Float32Array, Int32Array};
use crate::arrow::datatypes::{DataType, Field, Schema};
use crate::arrow::record_batch::RecordBatch;
use crate::dataframe::DataFrameWriteOptions;
use crate::parquet::basic::Compression;
use crate::test_util::parquet_test_data;
use tempfile::tempdir;
use super::*;
#[tokio::test]
async fn read_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.read_parquet(
format!("{}/alltypes_plain*.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;
let results = df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 10);
Ok(())
}
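// Regression test for resolving glob paths that contain parent-directory
// components (`..`); see https://github.com/apache/arrow-datafusion/issues/2465.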
#[tokio::test]
async fn read_with_glob_path_issue_2465() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.read_parquet(
format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;
let results = df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 10);
Ok(())
}
#[tokio::test]
async fn read_from_registered_table_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_parquet(
"test",
&format!("{}/alltypes_plain*.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;
let df = ctx.sql("SELECT * FROM test").await?;
let results = df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 10);
Ok(())
}
#[tokio::test]
async fn read_from_different_file_extension() -> Result<()> {
let ctx = SessionContext::new();
let sep = std::path::MAIN_SEPARATOR.to_string();
let write_df = ctx.read_batch(RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("purchase_id", DataType::Int32, false),
Field::new("price", DataType::Float32, false),
Field::new("quantity", DataType::Int32, false),
])),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
],
)?)?;
let temp_dir = tempdir()?;
let temp_dir_path = temp_dir.path();
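// Each destination exercises a different extension scenario:
// - path1: a plain `.parquet` file
// - path2: a non-default `.snappy` extension
// - path3: a `.parquet` extension following `.snappy` in the name
// - path4: a directory whose name ends in `.parquet`
// - path5: a file inside a directory whose name contains `..`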
let path1 = temp_dir_path
.join("output1.parquet")
.to_str()
.unwrap()
.to_string();
let path2 = temp_dir_path
.join("output2.parquet.snappy")
.to_str()
.unwrap()
.to_string();
let path3 = temp_dir_path
.join("output3.parquet.snappy.parquet")
.to_str()
.unwrap()
.to_string();
let path4 = temp_dir_path
.join("output4.parquet".to_owned() + &sep)
.to_str()
.unwrap()
.to_string();
let path5 = temp_dir_path
.join("bbb..bbb")
.join("filename.parquet")
.to_str()
.unwrap()
.to_string();
let dir = temp_dir_path
.join("bbb..bbb".to_owned() + &sep)
.to_str()
.unwrap()
.to_string();
std::fs::create_dir(dir).expect("create dir failed");
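// Write the same batch to each file target with SNAPPY compression.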
for path in [&path1, &path2, &path3, &path5] {
write_df
.clone()
.write_parquet(
path,
DataFrameWriteOptions::new().with_single_file_output(true),
Some(
WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build(),
),
)
.await?;
}
let read_df = ctx
.read_parquet(
&path1,
ParquetReadOptions::default(),
)
.await?;
let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);
let read_df = ctx
.read_parquet(
&path2,
ParquetReadOptions {
file_extension: "snappy",
..Default::default()
},
)
.await?;
let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);
let read_df = ctx
.read_parquet(
&path2,
ParquetReadOptions::default(),
)
.await;
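// Reading `path2` with the default `.parquet` extension must fail; the
// expected error message embeds the URL form of the rejected path.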
let binding = DataFilePaths::to_urls(&path2).unwrap();
let expected_path = binding[0].as_str();
assert_eq!(
read_df.unwrap_err().strip_backtrace(),
format!("Execution error: File path '{}' does not match the expected extension '.parquet'", expected_path)
);
let read_df = ctx
.read_parquet(
&path3,
ParquetReadOptions::default(),
)
.await?;
let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);
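// `path4` is an empty directory whose name ends in `.parquet`; listing it
// succeeds and yields zero rows.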
std::fs::create_dir(&path4)?;
let read_df = ctx
.read_parquet(
&path4,
ParquetReadOptions::default(),
)
.await?;
let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 0);
let read_df = ctx
.read_parquet(
&path5,
ParquetReadOptions::default(),
)
.await?;
let results = read_df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(total_rows, 5);
Ok(())
}
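// Compile-time check that `read_parquet` can be awaited from an
// `#[async_trait]` method, i.e. that the returned future is `Send`.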
#[async_trait]
trait CallReadTrait {
async fn call_read_parquet(&self) -> DataFrame;
}
struct CallRead {}
#[async_trait]
impl CallReadTrait for CallRead {
async fn call_read_parquet(&self) -> DataFrame {
let ctx = SessionContext::new();
ctx.read_parquet("dummy", ParquetReadOptions::default())
.await
.unwrap()
}
}
}