use std::sync::Arc;
use crate::datasource::physical_plan::plan_to_csv;
use super::super::options::{CsvReadOptions, ReadOptions};
use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
impl SessionContext {
    /// Builds a [`DataFrame`] over the CSV data found at `table_paths`.
    ///
    /// `options` carries the CSV reader settings. The work is forwarded
    /// unchanged to the generic `_read_type` helper on this context.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `_read_type` reports.
    pub async fn read_csv<P: DataFilePaths>(
        &self,
        table_paths: P,
        options: CsvReadOptions<'_>,
    ) -> Result<DataFrame> {
        let data_frame = self._read_type(table_paths, options).await;
        data_frame
    }

    /// Registers the CSV data at `table_path` as a listing table called
    /// `name` on this context.
    ///
    /// The listing options are derived from `options` together with a copy
    /// of this session's configuration. If `options` carries an explicit
    /// schema, an owned, reference-counted copy of it is handed to the
    /// listing table; otherwise no schema is passed along.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `register_listing_table`.
    pub async fn register_csv(
        &self,
        name: &str,
        table_path: &str,
        options: CsvReadOptions<'_>,
    ) -> Result<()> {
        let session_config = self.copied_config();
        let listing_options = options.to_listing_options(&session_config);
        // Clone the borrowed schema (if any) so the table can own it.
        let provided_schema = options.schema.cloned().map(Arc::new);

        self.register_listing_table(
            name,
            table_path,
            listing_options,
            provided_schema,
            // Trailing optional argument is intentionally left unset.
            None,
        )
        .await?;

        Ok(())
    }

    /// Runs `plan` and writes its output as CSV to `path`.
    ///
    /// This is a thin wrapper that hands this session's task context, the
    /// plan and the destination to `plan_to_csv`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `plan_to_csv`.
    pub async fn write_csv(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        path: impl AsRef<str>,
    ) -> Result<()> {
        let task_ctx = self.task_ctx();
        plan_to_csv(task_ctx, plan, path).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_batches_eq;
    use crate::test_util::{plan_and_collect, populate_csv_partitions};
    use async_trait::async_trait;
    use tempfile::TempDir;

    /// Registers a directory of CSV partitions whose files use a non-default
    /// extension and checks that an aggregate query over them still works.
    #[tokio::test]
    async fn query_csv_with_custom_partition_extension() -> Result<()> {
        let tmp_dir = TempDir::new()?;

        // The point of this test: the partition files do not end in `.csv`.
        let file_extension = ".tst";

        let ctx = SessionContext::new();
        let schema = populate_csv_partitions(&tmp_dir, 2, file_extension)?;
        ctx.register_csv(
            "test",
            tmp_dir.path().to_str().unwrap(),
            CsvReadOptions::new()
                .schema(&schema)
                .file_extension(file_extension),
        )
        .await?;
        let results =
            plan_and_collect(&ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test").await?;

        assert_eq!(results.len(), 1);
        // `assert_batches_eq!` compares against the pretty-printed table line
        // by line, so every row must be padded to the full column width.
        let expected = [
            "+--------------+--------------+----------+",
            "| SUM(test.c1) | SUM(test.c2) | COUNT(*) |",
            "+--------------+--------------+----------+",
            "| 10           | 110          | 20       |",
            "+--------------+--------------+----------+",
        ];
        assert_batches_eq!(expected, &results);

        Ok(())
    }

    // Compile-time check only: nothing below is ever invoked. It verifies that
    // `read_csv` can be awaited from inside an `#[async_trait]` method.
    #[async_trait]
    trait CallReadTrait {
        async fn call_read_csv(&self) -> DataFrame;
    }

    struct CallRead {}

    #[async_trait]
    impl CallReadTrait for CallRead {
        async fn call_read_csv(&self) -> DataFrame {
            let ctx = SessionContext::new();
            ctx.read_csv("dummy", CsvReadOptions::new()).await.unwrap()
        }
    }
}