datafusion/dataframe/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataFrame`] API for building and executing query plans.

#[cfg(feature = "parquet")]
mod parquet;

use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::format_as_file_type;
use crate::datasource::file_format::json::JsonFormatFactory;
use crate::datasource::{
    provider_as_source, DefaultTableSource, MemTable, TableProvider,
};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
    col, ident, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
    LogicalPlanBuilderOptions, Partitioning, TableType,
};
use crate::physical_plan::{
    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
    ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
    DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions,
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
    case,
    dml::InsertOp,
    expr::{Alias, ScalarFunction},
    is_null, lit,
    utils::COUNT_STAR_EXPANSION,
    SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions::core::coalesce;
use datafusion_functions_aggregate::expr_fn::{
    avg, count, max, median, min, stddev, sum,
};

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_sql::TableReference;

/// Contains options that control how data is
/// written out from a `DataFrame`.
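///
/// # Example
/// A minimal sketch of the builder-style usage (the `"year"` and `"id"`
/// columns here are hypothetical):
/// ```
/// # use datafusion::dataframe::DataFrameWriteOptions;
/// # use datafusion::logical_expr::dml::InsertOp;
/// # use datafusion::prelude::col;
/// let options = DataFrameWriteOptions::new()
///     .with_insert_operation(InsertOp::Overwrite)
///     .with_single_file_output(true)
///     // hypothetical partition and sort columns
///     .with_partition_by(vec!["year".to_string()])
///     .with_sort_by(vec![col("id").sort(true, false)]);
/// ```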
pub struct DataFrameWriteOptions {
    /// Controls how new data should be written to the table, determining whether
    /// to append, overwrite, or replace existing data.
    insert_op: InsertOp,
    /// Controls if all partitions should be coalesced into a single output file.
    /// Writing a single file is generally slower.
    single_file_output: bool,
    /// Sets which columns should be used for hive-style partitioned writes by name.
    /// Can be set to an empty `vec![]` for non-partitioned writes.
    partition_by: Vec<String>,
    /// Sets which columns should be used for sorting the output by name.
    /// Can be set to an empty `vec![]` for non-sorted writes.
    sort_by: Vec<SortExpr>,
}

impl DataFrameWriteOptions {
    /// Create a new DataFrameWriteOptions with default values
    pub fn new() -> Self {
        DataFrameWriteOptions {
            insert_op: InsertOp::Append,
            single_file_output: false,
            partition_by: vec![],
            sort_by: vec![],
        }
    }

    /// Set the insert operation
    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
        self.insert_op = insert_op;
        self
    }

    /// Set whether the output should be coalesced into a single file
    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
        self.single_file_output = single_file_output;
        self
    }

    /// Sets the partition_by columns for output partitioning
    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
        self.partition_by = partition_by;
        self
    }

    /// Sets the sort_by columns for output sorting
    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
        self.sort_by = sort_by;
        self
    }
}

impl Default for DataFrameWriteOptions {
    fn default() -> Self {
        Self::new()
    }
}

/// Represents a logical set of rows with the same named columns.
///
/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
/// represents a 2 dimensional table of rows and columns.
///
/// The typical workflow using DataFrames looks like
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
///    and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
///    [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
/// using the same DataFusion planning and execution process used to execute SQL
/// and other queries.
///
/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
/// [`read_csv`]: SessionContext::read_csv
/// [`read_parquet`]: SessionContext::read_parquet
/// [`filter`]: DataFrame::filter
/// [`select`]: DataFrame::select
/// [`aggregate`]: DataFrame::aggregate
/// [`limit`]: DataFrame::limit
/// [`collect`]: DataFrame::collect
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::functions_aggregate::expr_fn::min;
/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // Read the data from a csv file
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // create a new dataframe that computes the equivalent of
/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
/// let df = df.filter(col("a").lt_eq(col("b")))?
///     .aggregate(vec![col("a")], vec![min(col("b"))])?
///     .limit(0, Some(100))?;
/// // Perform the actual computation
/// let results = df.collect().await?;
///
/// // Create a new dataframe with in-memory data
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, true),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2, 3])),
///         Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
///     ],
/// )?;
/// let df = ctx.read_batch(batch)?;
/// df.show().await?;
///
/// // Create a new dataframe with in-memory data using macro
/// let df = dataframe!(
///     "id" => [1, 2, 3],
///     "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
    // Box the (large) SessionState to reduce the size of DataFrame on the stack
    session_state: Box<SessionState>,
    plan: LogicalPlan,
    // Whether projection ops must validate expressions. When this flag is false,
    // `with_column` and `with_column_renamed` can skip the recursive work required
    // to columnize and normalize expressions. Since these functions are often
    // chained or called many times in dataframe operations, skipping that work
    // can result in a significant performance gain.
    //
    // This flag may only be set to false when the dataframe function call results
    // in the last operation being a
    // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
    // call. This requirement guarantees that the plan has had all columnization
    // and normalization applied to existing expressions and only new expressions
    // will require that work. Any operation that updates the plan in any way
    // other than a `project` call should set this to true.
    projection_requires_validation: bool,
}

impl DataFrame {
    /// Create a new `DataFrame` based on an existing `LogicalPlan`
    ///
    /// This is a low-level method and is not typically used by end users. See
    /// [`SessionContext::read_csv`] and other methods for creating a
    /// `DataFrame` from an existing datasource.
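    ///
    /// # Example
    /// A minimal sketch: split an existing `DataFrame` into its parts with
    /// [`Self::into_parts`] and reassemble it:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let (state, plan) = df.into_parts();
    /// let df = DataFrame::new(state, plan);
    /// # Ok(())
    /// # }
    /// ```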
    pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
        Self {
            session_state: Box::new(session_state),
            plan,
            projection_requires_validation: true,
        }
    }

    /// Creates a logical expression from a SQL query text.
    /// The expression is created and processed against the current schema.
    ///
    /// # Example: Parsing SQL queries
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion::prelude::*;
    /// # use datafusion_common::{DFSchema, Result};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// // DataFusion parses numbers as i64 first.
    /// let sql = "a > 1 and b in (1, 10)";
    /// let expected = col("a").gt(lit(1 as i64))
    ///     .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let expr = df.parse_sql_expr(sql)?;
    /// assert_eq!(expected, expr);
    /// # Ok(())
    /// # }
    /// ```
    pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
        let df_schema = self.schema();

        self.session_state.create_logical_expr(sql, df_schema)
    }

    /// Consume the DataFrame and produce a physical plan
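    ///
    /// # Example
    /// A minimal sketch (reusing the `tests/data/example.csv` file from the
    /// other examples in this module):
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let physical_plan = df.create_physical_plan().await?;
    /// # Ok(())
    /// # }
    /// ```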
    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.session_state.create_physical_plan(&self.plan).await
    }

    /// Select columns by name, returning a new `DataFrame` containing only the
    /// specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.select_columns(&["a", "b"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields = columns
            .iter()
            .map(|name| {
                self.plan
                    .schema()
                    .qualified_field_with_unqualified_name(name)
            })
            .collect::<Result<Vec<_>>>()?;
        let expr: Vec<Expr> = fields
            .into_iter()
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Project an arbitrary list of expression strings into a new `DataFrame`.
    /// The strings are parsed into logical plan expressions against the
    /// current schema.
    ///
    /// The output `DataFrame` has one column for each element in `exprs`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df: DataFrame = df.select_exprs(&["a * b", "c"])?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame> {
        let expr_list = exprs
            .iter()
            .map(|e| self.parse_sql_expr(e))
            .collect::<Result<Vec<_>>>()?;

        self.select(expr_list)
    }

    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
    /// `DataFrame`.
    ///
    /// The output `DataFrame` has one column for each element in `expr_list`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.select(vec![col("a"), col("b") * col("c")])?;
    /// let expected = vec![
    ///     "+---+-----------------------+",
    ///     "| a | ?table?.b * ?table?.c |",
    ///     "+---+-----------------------+",
    ///     "| 1 | 6                     |",
    ///     "+---+-----------------------+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select(
        self,
        expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<DataFrame> {
        let expr_list: Vec<SelectExpr> =
            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();

        let expressions = expr_list
            .iter()
            .filter_map(|e| match e {
                SelectExpr::Expression(expr) => Some(expr.clone()),
                _ => None,
            })
            .collect::<Vec<_>>();

        let window_func_exprs = find_window_exprs(&expressions);
        let plan = if window_func_exprs.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
        };

        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Returns a new DataFrame containing all columns except the specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // +----+----+----+
    /// // | a  | b  | c  |
    /// // +----+----+----+
    /// // | 1  | 2  | 3  |
    /// // +----+----+----+
    /// let df = df.drop_columns(&["a"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| b | c |",
    ///     "+---+---+",
    ///     "| 2 | 3 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields_to_drop = columns
            .iter()
            .map(|name| {
                self.plan
                    .schema()
                    .qualified_field_with_unqualified_name(name)
            })
            .filter(|r| r.is_ok())
            .collect::<Result<Vec<_>>>()?;
        let expr: Vec<Expr> = self
            .plan
            .schema()
            .fields()
            .into_iter()
            .enumerate()
            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Expand multiple list/struct columns into a set of rows and new columns.
    ///
    /// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // expand list columns into multiple rows, and flatten the field names
    /// // of nested struct columns
    /// let df = df.unnest_columns(&["b", "c", "d"])?;
    /// let expected = vec![
    ///     "+---+------+-------+-----+-----+",
    ///     "| a | b    | c     | d.e | d.f |",
    ///     "+---+------+-------+-----+-----+",
    ///     "| 1 | 2.0  | false | 1   | 2   |",
    ///     "| 1 | 1.3  | true  | 1   | 2   |",
    ///     "| 1 | -6.1 |       | 1   | 2   |",
    ///     "| 2 | 3.0  | false |     |     |",
    ///     "| 2 | 2.3  | true  |     |     |",
    ///     "| 2 | -7.1 |       |     |     |",
    ///     "+---+------+-------+-----+-----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame> {
        self.unnest_columns_with_options(columns, UnnestOptions::new())
    }

    /// Expand multiple list columns into a set of rows, with
    /// behavior controlled by [`UnnestOptions`].
    ///
    /// Please see the documentation on [`UnnestOptions`] for more
    /// details about the meaning of unnest.
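    ///
    /// # Example
    /// A minimal sketch: unnest a single list column while keeping rows whose
    /// list value is null (assuming the same `tests/data/unnest.json` file
    /// used in [`Self::unnest_columns`]):
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::UnnestOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// let options = UnnestOptions::new().with_preserve_nulls(true);
    /// let df = df.unnest_columns_with_options(&["b"], options)?;
    /// # Ok(())
    /// # }
    /// ```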
    pub fn unnest_columns_with_options(
        self,
        columns: &[&str],
        options: UnnestOptions,
    ) -> Result<DataFrame> {
        let columns = columns.iter().map(|c| Column::from(*c)).collect();
        let plan = LogicalPlanBuilder::from(self.plan)
            .unnest_columns_with_options(columns, options)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a DataFrame with only rows for which `predicate` evaluates to
    /// `true`.
    ///
    /// Rows for which `predicate` evaluates to `false` or `null`
    /// are filtered out.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.filter(col("a").lt_eq(col("b")))?;
    /// // all rows where a <= b are returned
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn filter(self, predicate: Expr) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .filter(predicate)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that aggregates the rows of the current
    /// `DataFrame`, first optionally grouping by the given expressions.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_aggregate::expr_fn::min;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    ///
    /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
    /// let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
    /// let expected1 = vec![
    ///     "+---+----------------+",
    ///     "| a | min(?table?.b) |",
    ///     "+---+----------------+",
    ///     "| 1 | 2              |",
    ///     "| 4 | 5              |",
    ///     "| 7 | 8              |",
    ///     "+---+----------------+"
    /// ];
    /// assert_batches_sorted_eq!(expected1, &df1.collect().await?);
    /// // The following use is the equivalent of "SELECT MIN(b)"
    /// let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
    /// let expected2 = vec![
    ///     "+----------------+",
    ///     "| min(?table?.b) |",
    ///     "+----------------+",
    ///     "| 2              |",
    ///     "+----------------+"
    /// ];
    /// # assert_batches_sorted_eq!(expected2, &df2.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn aggregate(
        self,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<DataFrame> {
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
        let aggr_expr_len = aggr_expr.len();
        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan = LogicalPlanBuilder::from(self.plan)
            .with_options(options)
            .aggregate(group_expr, aggr_expr)?
            .build()?;
        let plan = if is_grouping_set {
            let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
            // For grouping sets we do a project to not expose the internal grouping id
            let exprs = plan
                .schema()
                .columns()
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| *idx != grouping_id_pos)
                .map(|(_, column)| Expr::Column(column))
                .collect::<Vec<_>>();
            LogicalPlanBuilder::from(plan).project(exprs)?.build()?
        } else {
            plan
        };
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: !is_grouping_set,
        })
    }

    /// Return a new DataFrame that adds the result of evaluating one or more
    /// window functions ([`Expr::WindowFunction`]) to the existing columns
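    ///
    /// # Example
    /// A minimal sketch using the `row_number` window function (assuming the
    /// default `datafusion::functions_window` feature is enabled):
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_window::expr_fn::row_number;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // add a `row_number()` column to the existing columns
    /// let df = df.window(vec![row_number()])?;
    /// # Ok(())
    /// # }
    /// ```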
    pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .window(window_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Returns a new `DataFrame` with a limited number of rows.
    ///
    /// # Arguments
    /// `skip` - Number of rows to skip before fetching any rows
    /// `fetch` - Maximum number of rows to return, after skipping `skip` rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.limit(1, Some(2))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .limit(skip, fetch)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone();
    /// let df = df.union(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?.with_column("d", lit("77"))?;
    /// let df = df.union_by_name(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+----+",
    ///     "| a | b | c | d  |",
    ///     "+---+---+---+----+",
    ///     "| 1 | 2 | 3 |    |",
    ///     "| 1 | 2 | 3 | 77 |",
    ///     "+---+---+---+----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct union of two [`DataFrame`]s.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
    /// rows are discarded.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone();
    /// let df = df.union_distinct(d2)?;
    /// // d2 is a duplicate of df, so the distinct union removes the repeated rows
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names with all duplicated rows removed.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?;
    /// let df = df.union_by_name_distinct(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` with all duplicated rows removed.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.distinct()?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct(self) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that keeps one row for each distinct value of
    /// the `DISTINCT ON` expressions (`on_expr`), choosing that row according
    /// to the optional sorting expressions and projecting `select_expr`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///     // Return a single row (a, b) for each distinct value of a
    ///     .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .distinct_on(on_expr, select_expr, sort_expr)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that has statistics for a DataFrame.
    ///
    /// Only summarizes numeric datatypes at the moment and returns nulls for
    /// non-numeric datatypes. The output format is modeled after pandas.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use arrow::util::pretty;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
    /// let stat = df.describe().await?;
    /// # // some output columns are ignored
    /// let expected = vec![
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
    ///     "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
    ///     "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
    ///     "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
    ///     "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   | deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov    |",
    ///     "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
    ///     "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
    /// assert_batches_sorted_eq!(expected, &stat.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn describe(self) -> Result<Self> {
        // The describe functions currently supported
        let supported_describe_functions =
            vec!["count", "null_count", "mean", "std", "min", "max", "median"];

        let original_schema_fields = self.schema().fields().iter();

        // Define the output schema: a leading "describe" column plus one column per input field
        let mut describe_schemas = vec![Field::new("describe", DataType::Utf8, false)];
        describe_schemas.extend(original_schema_fields.clone().map(|field| {
            if field.data_type().is_numeric() {
                Field::new(field.name(), DataType::Float64, true)
            } else {
                Field::new(field.name(), DataType::Utf8, true)
            }
        }));

        // Collect one aggregated DataFrame per describe function
        let describe_record_batch = vec![
            // count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| count(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // null_count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| {
                        sum(case(is_null(ident(f.name())))
                            .when(lit(true), lit(1))
                            .otherwise(lit(0))
                            .unwrap())
                        .alias(f.name())
                    })
                    .collect::<Vec<_>>(),
            ),
            // mean aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| avg(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // std aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| stddev(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // min aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| min(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // max aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| max(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // median aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| median(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
        ];

        // The first column holds the describe function names
        let mut array_ref_vec: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
            supported_describe_functions.clone(),
        ))];
        for field in original_schema_fields {
            let mut array_datas = vec![];
            for result in describe_record_batch.iter() {
                let array_ref = match result {
                    Ok(df) => {
                        let batches = df.clone().collect().await;
                        match batches {
                            Ok(batches)
                                if batches.len() == 1
                                    && batches[0]
                                        .column_by_name(field.name())
                                        .is_some() =>
                            {
                                let column =
                                    batches[0].column_by_name(field.name()).unwrap();

                                if column.data_type().is_null() {
                                    Arc::new(StringArray::from(vec!["null"]))
                                } else if field.data_type().is_numeric() {
                                    cast(column, &DataType::Float64)?
                                } else {
                                    cast(column, &DataType::Utf8)?
                                }
                            }
                            _ => Arc::new(StringArray::from(vec!["null"])),
                        }
                    }
                    // An aggregation that applies to no column (e.g. a DataFrame with
                    // only boolean/binary columns) fails planning; report "null"
                    Err(err)
                        if err.to_string().contains(
                            "Error during planning: \
                            Aggregate requires at least one grouping \
                            or aggregate expression",
                        ) =>
                    {
                        Arc::new(StringArray::from(vec!["null"]))
                    }
                    Err(e) => return exec_err!("{}", e),
                };
                array_datas.push(array_ref);
            }
            array_ref_vec.push(concat(
                array_datas
                    .iter()
                    .map(|af| af.as_ref())
                    .collect::<Vec<_>>()
                    .as_slice(),
            )?);
        }

        let describe_record_batch =
            RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?;

        let provider = MemTable::try_new(
            describe_record_batch.schema(),
            vec![vec![describe_record_batch]],
        )?;

        let plan = LogicalPlanBuilder::scan(
            UNNAMED_TABLE,
            provider_as_source(Arc::new(provider)),
            None,
        )?
        .build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Apply a sort by the provided expressions with the default direction
    /// (ascending, nulls last)
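    ///
    /// # Example
    /// A minimal sketch, equivalent to `ORDER BY a, b` in SQL:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.sort_by(vec![col("a"), col("b")])?;
    /// # Ok(())
    /// # }
    /// ```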
    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
        self.sort(
            expr.into_iter()
                .map(|e| e.sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

    /// Sort the DataFrame by the specified sorting expressions.
    ///
    /// Note that any expression can be turned into
    /// a sort expression by calling its [sort](Expr::sort) method.
    ///
    /// # Example
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.sort(vec![
    ///     col("a").sort(false, true), // a DESC, nulls first
    ///     col("b").sort(true, false), // b ASC, nulls last
    /// ])?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using explicitly specified
    /// columns and an optional filter expression.
    ///
    /// See [`join_on`](Self::join_on) for a more concise way to specify the
    /// join condition. Since DataFusion will automatically identify and
    /// optimize equality predicates there is no performance difference between
    /// this function and `join_on`.
    ///
    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
    /// example below), which are then combined with the optional `filter`
    /// expression. If `left_cols` and `right_cols` contain ambiguous column
    /// references, they will be disambiguated by prioritizing the left relation
    /// for `left_cols` and the right relation for `right_cols`.
    ///
    /// Note that in the case of an outer join, the `filter` is applied only to
    /// matched rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2")])?;
    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
    /// // finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
    /// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "| 1 | 2 | 3 | 1  | 2  | 3  |",
    ///     "+---+---+---+----+----+----+"
    /// ];
    /// assert_batches_sorted_eq!(expected, &join.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join(
        self,
        right: DataFrame,
        join_type: JoinType,
        left_cols: &[&str],
        right_cols: &[&str],
        filter: Option<Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join(
                right.plan,
                join_type,
                (left_cols.to_vec(), right_cols.to_vec()),
                filter,
            )?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using the specified
    /// expressions.
    ///
    /// Note that DataFusion automatically optimizes joins, including
    /// identifying and optimizing equality predicates.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    ///
    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
    /// // finding all pairs of rows from `left` and `right`
    /// // where `a != a2` and `b != b2`.
    /// let join_on = left.join_on(
    ///     right,
    ///     JoinType::Inner,
    ///     [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
    /// )?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "+---+---+---+----+----+----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &join_on.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join_on(
        self,
        right: DataFrame,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join_on(right.plan, join_type, on_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Repartition a DataFrame based on a logical partitioning scheme.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df1.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .repartition(partitioning_scheme)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return the total number of rows in this `DataFrame`.
    ///
    /// Note that this method will actually run a plan to calculate the count,
    /// which may be slow for large or complicated DataFrames.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let count = df.count().await?; // 1
    /// # assert_eq!(count, 1);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count(self) -> Result<usize> {
        let rows = self
            .aggregate(
                vec![],
                vec![count(Expr::Literal(COUNT_STAR_EXPANSION, None))],
            )?
            .collect()
            .await?;
        let len = *rows
            .first()
            .and_then(|r| r.columns().first())
            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
            .and_then(|a| a.values().first())
            .ok_or(DataFusionError::Internal(
                "Unexpected output when collecting for count()".to_string(),
            ))? as usize;
        Ok(len)
    }

    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es into memory.
    ///
    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
    /// (no actual computation is performed). `collect` triggers the computation.
    ///
    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.collect().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect(plan, task_ctx).await
    }

    /// Execute the `DataFrame` and print the results to the console.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// df.show().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn show(self) -> Result<()> {
        println!("{}", self.to_string().await?);
        Ok(())
    }

    /// Execute the `DataFrame` and return a string representation of the results.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::execution::SessionStateBuilder;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let cfg = SessionConfig::new()
    ///     .set_str("datafusion.format.null", "no-value");
    /// let session_state = SessionStateBuilder::new()
    ///     .with_config(cfg)
    ///     .with_default_features()
    ///     .build();
    /// let ctx = SessionContext::new_with_state(session_state);
    /// let df = ctx.sql("select null as 'null-column'").await?;
    /// let result = df.to_string().await?;
    /// assert_eq!(result,
    /// "+-------------+
    /// | null-column |
    /// +-------------+
    /// | no-value    |
    /// +-------------+"
    /// );
    /// # Ok(())
    /// # }
    /// ```
    pub async fn to_string(self) -> Result<String> {
        let options = self.session_state.config().options().format.clone();
        let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;

        let results = self.collect().await?;
        Ok(
            pretty::pretty_format_batches_with_options(&results, &arrow_options)?
                .to_string(),
        )
    }

    /// Execute the `DataFrame` and print only the first `num` rows of the
    /// result to the console.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// df.show_limit(10).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn show_limit(self, num: usize) -> Result<()> {
        let results = self.limit(0, Some(num))?.collect().await?;
        Ok(pretty::print_batches(&results)?)
    }

    /// Return a new [`TaskContext`] that can be used to execute this DataFrame
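    ///
    /// # Example
    /// A minimal sketch: drive a physical plan manually, which is what
    /// [`Self::collect`] does internally:
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::physical_plan::collect;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let task_ctx = Arc::new(df.task_ctx());
    /// let plan = df.create_physical_plan().await?;
    /// let batches = collect(plan, task_ctx).await?;
    /// # Ok(())
    /// # }
    /// ```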
    pub fn task_ctx(&self) -> TaskContext {
        TaskContext::from(self.session_state.as_ref())
    }

    /// Executes this DataFrame and returns a stream over a single partition
    ///
    /// See [Self::collect] to buffer the `RecordBatch`es in memory.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let stream = df.execute_stream().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Aborting Execution
    ///
    /// Dropping the stream will abort the execution of the query, and free up
    /// any allocated resources
    pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        execute_stream(plan, task_ctx)
    }

    /// Executes this DataFrame and collects all results into a vector of vectors
    /// of `RecordBatch`es, maintaining the input partitioning.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.collect_partitioned().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect_partitioned(plan, task_ctx).await
    }

    /// Executes this DataFrame and returns one stream per partition.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.execute_stream_partitioned().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Aborting Execution
    ///
    /// Dropping the stream will abort the execution of the query, and free up
    /// any allocated resources
    pub async fn execute_stream_partitioned(
        self,
    ) -> Result<Vec<SendableRecordBatchStream>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        execute_stream_partitioned(plan, task_ctx)
    }

    /// Returns the `DFSchema` describing the output of this DataFrame.
    ///
    /// The output `DFSchema` contains information on the name, data type, and
    /// nullability for each column.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let schema = df.schema();
    /// # Ok(())
    /// # }
    /// ```
    pub fn schema(&self) -> &DFSchema {
        self.plan.schema()
    }

    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
    /// this DataFrame.
    ///
    /// See [`Self::into_unoptimized_plan`] for more details.
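    ///
    /// # Example
    /// A minimal sketch: inspect the plan without consuming the `DataFrame`:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// println!("{}", df.logical_plan().display_indent());
    /// # Ok(())
    /// # }
    /// ```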
    pub fn logical_plan(&self) -> &LogicalPlan {
        &self.plan
    }

    /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
    pub fn into_parts(self) -> (SessionState, LogicalPlan) {
        (*self.session_state, self.plan)
    }

    /// Return the [`LogicalPlan`] represented by this DataFrame without running
    /// any optimizers
    ///
    /// Note: This method should not be used outside testing, as it loses the
    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
    /// consequently subsequent operations may take place against a different
    /// state (e.g. a different value of `now()`)
    ///
    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
    /// corresponding [`SessionState`].
    pub fn into_unoptimized_plan(self) -> LogicalPlan {
        self.plan
    }

    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
    ///
    /// Note: This method should not be used outside testing -- see
    /// [`Self::into_unoptimized_plan`] for more details.
    pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
        // Optimize the plan first for better UX
        self.session_state.optimize(&self.plan)
    }

    /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
    /// as a table view using [`SessionContext::register_table`].
    ///
    /// Note: This discards the [`SessionState`] associated with this
    /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
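    ///
    /// # Example
    /// A minimal sketch: register the view and query it via SQL (the name
    /// `my_view` is arbitrary):
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// ctx.register_table("my_view", df.into_view())?;
    /// let results = ctx.sql("SELECT * FROM my_view").await?.collect().await?;
    /// # Ok(())
    /// # }
    /// ```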
1601 pub fn into_view(self) -> Arc<dyn TableProvider> {
1602 Arc::new(DataFrameTableProvider { plan: self.plan })
1603 }
1604
1605 /// Return a DataFrame with the explanation of its plan so far.
1606 ///
1607 /// if `analyze` is specified, runs the plan and reports metrics
1608 ///
1609 /// ```
1610 /// # use datafusion::prelude::*;
1611 /// # use datafusion::error::Result;
1612 /// # #[tokio::main]
1613 /// # async fn main() -> Result<()> {
1614 /// let ctx = SessionContext::new();
1615 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1616 /// let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
1617 /// # Ok(())
1618 /// # }
1619 /// ```
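///
/// A sketch with `analyze` enabled, which actually executes the plan and
/// reports runtime metrics alongside it:
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // `EXPLAIN ANALYZE`: run the query and report metrics
/// let batches = df.explain(false, true)?.collect().await?;
/// # Ok(())
/// # }
/// ```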
1620 pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
1621 if matches!(self.plan, LogicalPlan::Explain(_)) {
1622 return plan_err!("Nested EXPLAINs are not supported");
1623 }
1624 let plan = LogicalPlanBuilder::from(self.plan)
1625 .explain(verbose, analyze)?
1626 .build()?;
1627 Ok(DataFrame {
1628 session_state: self.session_state,
1629 plan,
1630 projection_requires_validation: self.projection_requires_validation,
1631 })
1632 }
1633
1634 /// Return a [`FunctionRegistry`] used to plan UDF (user-defined function) calls.
1635 ///
1636 /// # Example
1637 /// ```
1638 /// # use datafusion::prelude::*;
1639 /// # use datafusion::error::Result;
1640 /// # #[tokio::main]
1641 /// # async fn main() -> Result<()> {
1642 /// let ctx = SessionContext::new();
1643 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1644 /// let f = df.registry();
1645 /// // use f.udf("name") to look up a UDF, then udf.call(vec![...]) to invoke it
1646 /// # Ok(())
1647 /// # }
1648 /// ```
1649 pub fn registry(&self) -> &dyn FunctionRegistry {
1650 self.session_state.as_ref()
1651 }
1652
1653 /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema.
1654 ///
1655 /// ```
1656 /// # use datafusion::prelude::*;
1657 /// # use datafusion::error::Result;
1658 /// # use datafusion_common::assert_batches_sorted_eq;
1659 /// # #[tokio::main]
1660 /// # async fn main() -> Result<()> {
1661 /// let ctx = SessionContext::new();
1662 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1663 /// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1664 /// let df = df.intersect(d2)?;
1665 /// let expected = vec![
1666 /// "+---+---+---+",
1667 /// "| a | b | c |",
1668 /// "+---+---+---+",
1669 /// "| 1 | 2 | 3 |",
1670 /// "+---+---+---+"
1671 /// ];
1672 /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1673 /// # Ok(())
1674 /// # }
1675 /// ```
1676 pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
1677 let left_plan = self.plan;
1678 let right_plan = dataframe.plan;
1679 let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
1680 Ok(DataFrame {
1681 session_state: self.session_state,
1682 plan,
1683 projection_requires_validation: true,
1684 })
1685 }
1686
1687 /// Calculate the set difference (`EXCEPT`) of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema.
1688 ///
1689 /// ```
1690 /// # use datafusion::prelude::*;
1691 /// # use datafusion::error::Result;
1692 /// # use datafusion_common::assert_batches_sorted_eq;
1693 /// # #[tokio::main]
1694 /// # async fn main() -> Result<()> {
1695 /// let ctx = SessionContext::new();
1696 /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1697 /// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1698 /// let result = df.except(d2)?;
1699 /// // these rows are in example_long.csv, but not in example.csv
1700 /// let expected = vec![
1701 /// "+---+---+---+",
1702 /// "| a | b | c |",
1703 /// "+---+---+---+",
1704 /// "| 4 | 5 | 6 |",
1705 /// "| 7 | 8 | 9 |",
1706 /// "+---+---+---+"
1707 /// ];
1708 /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1709 /// # Ok(())
1710 /// # }
1711 /// ```
1712 pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
1713 let left_plan = self.plan;
1714 let right_plan = dataframe.plan;
1715 let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
1716 Ok(DataFrame {
1717 session_state: self.session_state,
1718 plan,
1719 projection_requires_validation: true,
1720 })
1721 }
1722
1723 /// Execute this `DataFrame` and write the results to `table_name`.
1724 ///
1725 /// Returns a single [`RecordBatch`] containing a single column and
1726 /// row representing the count of total rows written.
1727 ///
1728 /// Unlike most other `DataFrame` methods, this method executes eagerly.
1729 /// Data is written to the table using the [`TableProvider::insert_into`]
1730 /// method. This is the same underlying implementation used by SQL `INSERT
1731 /// INTO` statements.
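///
/// # Example
///
/// A minimal sketch, assuming a table named `target` was already registered
/// with an insertable provider (here a hypothetical in-memory table created
/// via DDL; `BIGINT` columns chosen to match the CSV's inferred `Int64` types):
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// use datafusion::dataframe::DataFrameWriteOptions;
/// let ctx = SessionContext::new();
/// ctx.sql("CREATE TABLE target (a BIGINT, b BIGINT, c BIGINT)").await?;
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // returns a single RecordBatch with the count of rows written
/// let result = df.write_table("target", DataFrameWriteOptions::new()).await?;
/// # Ok(())
/// # }
/// ```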
1732 pub async fn write_table(
1733 self,
1734 table_name: &str,
1735 write_options: DataFrameWriteOptions,
1736 ) -> Result<Vec<RecordBatch>, DataFusionError> {
1737 let plan = if write_options.sort_by.is_empty() {
1738 self.plan
1739 } else {
1740 LogicalPlanBuilder::from(self.plan)
1741 .sort(write_options.sort_by)?
1742 .build()?
1743 };
1744
1745 let table_ref: TableReference = table_name.into();
1746 let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
1747 let target = match table_schema.table(table_ref.table()).await? {
1748 Some(ref provider) => Ok(Arc::clone(provider)),
1749 _ => plan_err!("No table named '{table_name}'"),
1750 }?;
1751
1752 let target = Arc::new(DefaultTableSource::new(target));
1753
1754 let plan = LogicalPlanBuilder::insert_into(
1755 plan,
1756 table_ref,
1757 target,
1758 write_options.insert_op,
1759 )?
1760 .build()?;
1761
1762 DataFrame {
1763 session_state: self.session_state,
1764 plan,
1765 projection_requires_validation: self.projection_requires_validation,
1766 }
1767 .collect()
1768 .await
1769 }
1770
1771 /// Execute the `DataFrame` and write the results to CSV file(s).
1772 ///
1773 /// # Example
1774 /// ```
1775 /// # use datafusion::prelude::*;
1776 /// # use datafusion::error::Result;
1777 /// # use std::fs;
1778 /// # #[tokio::main]
1779 /// # async fn main() -> Result<()> {
1780 /// use datafusion::dataframe::DataFrameWriteOptions;
1781 /// let ctx = SessionContext::new();
1782 /// // Sort the data by column "b" and write it to a new location
1783 /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1784 /// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1785 /// .write_csv(
1786 /// "output.csv",
1787 /// DataFrameWriteOptions::new(),
1788 /// None, // can also specify CSV writing options here
1789 /// ).await?;
1790 /// # fs::remove_file("output.csv")?;
1791 /// # Ok(())
1792 /// # }
1793 /// ```
1794 pub async fn write_csv(
1795 self,
1796 path: &str,
1797 options: DataFrameWriteOptions,
1798 writer_options: Option<CsvOptions>,
1799 ) -> Result<Vec<RecordBatch>, DataFusionError> {
1800 if options.insert_op != InsertOp::Append {
1801 return not_impl_err!(
1802 "{} is not implemented for DataFrame::write_csv.",
1803 options.insert_op
1804 );
1805 }
1806
1807 let format = if let Some(csv_opts) = writer_options {
1808 Arc::new(CsvFormatFactory::new_with_options(csv_opts))
1809 } else {
1810 Arc::new(CsvFormatFactory::new())
1811 };
1812
1813 let file_type = format_as_file_type(format);
1814
1815 let plan = if options.sort_by.is_empty() {
1816 self.plan
1817 } else {
1818 LogicalPlanBuilder::from(self.plan)
1819 .sort(options.sort_by)?
1820 .build()?
1821 };
1822
1823 let plan = LogicalPlanBuilder::copy_to(
1824 plan,
1825 path.into(),
1826 file_type,
1827 HashMap::new(),
1828 options.partition_by,
1829 )?
1830 .build()?;
1831
1832 DataFrame {
1833 session_state: self.session_state,
1834 plan,
1835 projection_requires_validation: self.projection_requires_validation,
1836 }
1837 .collect()
1838 .await
1839 }
1840
1841 /// Execute the `DataFrame` and write the results to JSON file(s).
1842 ///
1843 /// # Example
1844 /// ```
1845 /// # use datafusion::prelude::*;
1846 /// # use datafusion::error::Result;
1847 /// # use std::fs;
1848 /// # #[tokio::main]
1849 /// # async fn main() -> Result<()> {
1850 /// use datafusion::dataframe::DataFrameWriteOptions;
1851 /// let ctx = SessionContext::new();
1852 /// // Sort the data by column "b" and write it to a new location
1853 /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1854 /// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1855 /// .write_json(
1856 /// "output.json",
1857 /// DataFrameWriteOptions::new(),
1858 /// None
1859 /// ).await?;
1860 /// # fs::remove_file("output.json")?;
1861 /// # Ok(())
1862 /// # }
1863 /// ```
1864 pub async fn write_json(
1865 self,
1866 path: &str,
1867 options: DataFrameWriteOptions,
1868 writer_options: Option<JsonOptions>,
1869 ) -> Result<Vec<RecordBatch>, DataFusionError> {
1870 if options.insert_op != InsertOp::Append {
1871 return not_impl_err!(
1872 "{} is not implemented for DataFrame::write_json.",
1873 options.insert_op
1874 );
1875 }
1876
1877 let format = if let Some(json_opts) = writer_options {
1878 Arc::new(JsonFormatFactory::new_with_options(json_opts))
1879 } else {
1880 Arc::new(JsonFormatFactory::new())
1881 };
1882
1883 let file_type = format_as_file_type(format);
1884
1885 let plan = if options.sort_by.is_empty() {
1886 self.plan
1887 } else {
1888 LogicalPlanBuilder::from(self.plan)
1889 .sort(options.sort_by)?
1890 .build()?
1891 };
1892
1893 let plan = LogicalPlanBuilder::copy_to(
1894 plan,
1895 path.into(),
1896 file_type,
1897 Default::default(),
1898 options.partition_by,
1899 )?
1900 .build()?;
1901
1902 DataFrame {
1903 session_state: self.session_state,
1904 plan,
1905 projection_requires_validation: self.projection_requires_validation,
1906 }
1907 .collect()
1908 .await
1909 }
1910
1911 /// Add or replace a column in the DataFrame.
1912 ///
1913 /// # Example
1914 /// ```
1915 /// # use datafusion::prelude::*;
1916 /// # use datafusion::error::Result;
1917 /// # #[tokio::main]
1918 /// # async fn main() -> Result<()> {
1919 /// let ctx = SessionContext::new();
1920 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1921 /// let df = df.with_column("ab_sum", col("a") + col("b"))?;
1922 /// # Ok(())
1923 /// # }
1924 /// ```
1925 pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
1926 let window_func_exprs = find_window_exprs(std::slice::from_ref(&expr));
1927
1928 let (window_fn_str, plan) = if window_func_exprs.is_empty() {
1929 (None, self.plan)
1930 } else {
1931 (
1932 Some(window_func_exprs[0].to_string()),
1933 LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
1934 )
1935 };
1936
1937 let mut col_exists = false;
1938 let new_column = expr.alias(name);
1939 let mut fields: Vec<(Expr, bool)> = plan
1940 .schema()
1941 .iter()
1942 .filter_map(|(qualifier, field)| {
1943 if field.name() == name {
1944 col_exists = true;
1945 Some((new_column.clone(), true))
1946 } else {
1947 let e = col(Column::from((qualifier, field)));
1948 window_fn_str
1949 .as_ref()
1950 .filter(|s| *s == &e.to_string())
1951 .is_none()
1952 .then_some((e, self.projection_requires_validation))
1953 }
1954 })
1955 .collect();
1956
1957 if !col_exists {
1958 fields.push((new_column, true));
1959 }
1960
1961 let project_plan = LogicalPlanBuilder::from(plan)
1962 .project_with_validation(fields)?
1963 .build()?;
1964
1965 Ok(DataFrame {
1966 session_state: self.session_state,
1967 plan: project_plan,
1968 projection_requires_validation: false,
1969 })
1970 }
1971
1972 /// Rename one column by applying a new projection. This is a no-op if the column to be
1973 /// renamed does not exist.
1974 ///
1975 /// The method supports case-sensitive renames by wrapping the column name in one of the following quote symbols: ( " or ' or ` ).
1976 ///
1977 /// Alternatively, setting the DataFusion config `datafusion.sql_parser.enable_ident_normalization` to `false` enables
1978 /// case-sensitive renames without the need to wrap the column name in special symbols.
1979 ///
1980 /// # Example
1981 /// ```
1982 /// # use datafusion::prelude::*;
1983 /// # use datafusion::error::Result;
1984 /// # #[tokio::main]
1985 /// # async fn main() -> Result<()> {
1986 /// let ctx = SessionContext::new();
1987 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1988 /// let df = df.with_column_renamed("ab_sum", "total")?;
1989 ///
1990 /// # Ok(())
1991 /// # }
1992 /// ```
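///
/// A sketch of a case-sensitive rename; wrapping the name in double quotes
/// preserves its case (a no-op here, since example.csv has no column named
/// `Total`):
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let df = df.with_column_renamed("\"Total\"", "total")?;
/// # Ok(())
/// # }
/// ```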
1993 pub fn with_column_renamed(
1994 self,
1995 old_name: impl Into<String>,
1996 new_name: &str,
1997 ) -> Result<DataFrame> {
1998 let ident_opts = self
1999 .session_state
2000 .config_options()
2001 .sql_parser
2002 .enable_ident_normalization;
2003 let old_column: Column = if ident_opts {
2004 Column::from_qualified_name(old_name)
2005 } else {
2006 Column::from_qualified_name_ignore_case(old_name)
2007 };
2008
2009 let (qualifier_rename, field_rename) =
2010 match self.plan.schema().qualified_field_from_column(&old_column) {
2011 Ok(qualifier_and_field) => qualifier_and_field,
2012 // no-op if field not found
2013 Err(DataFusionError::SchemaError(
2014 SchemaError::FieldNotFound { .. },
2015 _,
2016 )) => return Ok(self),
2017 Err(err) => return Err(err),
2018 };
2019 let projection = self
2020 .plan
2021 .schema()
2022 .iter()
2023 .map(|(qualifier, field)| {
2024 if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
2025 (
2026 col(Column::from((qualifier, field)))
2027 .alias_qualified(qualifier.cloned(), new_name),
2028 false,
2029 )
2030 } else {
2031 (col(Column::from((qualifier, field))), false)
2032 }
2033 })
2034 .collect::<Vec<_>>();
2035 let project_plan = LogicalPlanBuilder::from(self.plan)
2036 .project_with_validation(projection)?
2037 .build()?;
2038 Ok(DataFrame {
2039 session_state: self.session_state,
2040 plan: project_plan,
2041 projection_requires_validation: false,
2042 })
2043 }
2044
2045 /// Replace all parameters in the logical plan with the specified
2046 /// values, in preparation for execution.
2047 ///
2048 /// # Example
2049 ///
2050 /// ```
2051 /// use datafusion::prelude::*;
2052 /// # use datafusion::{error::Result, assert_batches_eq};
2053 /// # #[tokio::main]
2054 /// # async fn main() -> Result<()> {
2055 /// # use datafusion_common::ScalarValue;
2056 /// let ctx = SessionContext::new();
2057 /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
2058 /// let results = ctx
2059 /// .sql("SELECT a FROM example WHERE b = $1")
2060 /// .await?
2061 /// // replace $1 with value 2
2062 /// .with_param_values(vec![
2063 /// // value at index 0 --> $1
2064 /// ScalarValue::from(2i64)
2065 /// ])?
2066 /// .collect()
2067 /// .await?;
2068 /// assert_batches_eq!(
2069 /// &[
2070 /// "+---+",
2071 /// "| a |",
2072 /// "+---+",
2073 /// "| 1 |",
2074 /// "+---+",
2075 /// ],
2076 /// &results
2077 /// );
2078 /// // Note you can also provide named parameters
2079 /// let results = ctx
2080 /// .sql("SELECT a FROM example WHERE b = $my_param")
2081 /// .await?
2082 /// // replace $my_param with value 2
2083 /// // Note you can also use a HashMap as well
2084 /// .with_param_values(vec![
2085 /// ("my_param", ScalarValue::from(2i64))
2086 /// ])?
2087 /// .collect()
2088 /// .await?;
2089 /// assert_batches_eq!(
2090 /// &[
2091 /// "+---+",
2092 /// "| a |",
2093 /// "+---+",
2094 /// "| 1 |",
2095 /// "+---+",
2096 /// ],
2097 /// &results
2098 /// );
2099 /// # Ok(())
2100 /// # }
2101 /// ```
2102 pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
2103 let plan = self.plan.with_param_values(query_values)?;
2104 Ok(DataFrame {
2105 session_state: self.session_state,
2106 plan,
2107 projection_requires_validation: self.projection_requires_validation,
2108 })
2109 }
2110
2111 /// Cache this DataFrame as a memory table, executing the plan eagerly.
2112 ///
2113 /// ```
2114 /// # use datafusion::prelude::*;
2115 /// # use datafusion::error::Result;
2116 /// # #[tokio::main]
2117 /// # async fn main() -> Result<()> {
2118 /// let ctx = SessionContext::new();
2119 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2120 /// let df = df.cache().await?;
2121 /// # Ok(())
2122 /// # }
2123 /// ```
2124 pub async fn cache(self) -> Result<DataFrame> {
2125 let context = SessionContext::new_with_state((*self.session_state).clone());
2126 // The physical plan's schema is consistent with the collected output
2127 let plan = self.clone().create_physical_plan().await?;
2128 let schema = plan.schema();
2129 let task_ctx = Arc::new(self.task_ctx());
2130 let partitions = collect_partitioned(plan, task_ctx).await?;
2131 let mem_table = MemTable::try_new(schema, partitions)?;
2132 context.read_table(Arc::new(mem_table))
2133 }
2134
2135 /// Apply an alias to the DataFrame.
2136 ///
2137 /// This method replaces the qualifiers of output columns with the given alias.
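///
/// # Example
///
/// A minimal sketch (the alias `"t"` is arbitrary):
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let df = df.alias("t")?;
/// // output columns are now qualified, e.g. `t.a`
/// let df = df.select(vec![col("t.a")])?;
/// # Ok(())
/// # }
/// ```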
2138 pub fn alias(self, alias: &str) -> Result<DataFrame> {
2139 let plan = LogicalPlanBuilder::from(self.plan).alias(alias)?.build()?;
2140 Ok(DataFrame {
2141 session_state: self.session_state,
2142 plan,
2143 projection_requires_validation: self.projection_requires_validation,
2144 })
2145 }
2146
2147 /// Fill null values in the specified columns with the given value.
2148 /// If no columns are specified (empty vector), all columns are filled.
2149 /// A column is only filled if the value can be cast to its type.
2150 ///
2151 /// # Arguments
2152 /// * `value` - Value to fill nulls with
2153 /// * `columns` - List of column names to fill. If empty, fills all columns.
2154 ///
2155 /// # Example
2156 /// ```
2157 /// # use datafusion::prelude::*;
2158 /// # use datafusion::error::Result;
2159 /// # use datafusion_common::ScalarValue;
2160 /// # #[tokio::main]
2161 /// # async fn main() -> Result<()> {
2162 /// let ctx = SessionContext::new();
2163 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2164 /// // Fill nulls in only columns "a" and "c":
2165 /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
2166 /// // Fill nulls across all columns:
2167 /// let df = df.fill_null(ScalarValue::from(0), vec![])?;
2168 /// # Ok(())
2169 /// # }
2170 /// ```
2171 pub fn fill_null(
2172 &self,
2173 value: ScalarValue,
2174 columns: Vec<String>,
2175 ) -> Result<DataFrame> {
2176 let cols = if columns.is_empty() {
2177 self.logical_plan()
2178 .schema()
2179 .fields()
2180 .iter()
2181 .map(|f| f.as_ref().clone())
2182 .collect()
2183 } else {
2184 self.find_columns(&columns)?
2185 };
2186
2187 // Create projections for each column
2188 let projections = self
2189 .logical_plan()
2190 .schema()
2191 .fields()
2192 .iter()
2193 .map(|field| {
2194 if cols.contains(field) {
2195 // Try to cast the fill value to the column type. If the cast fails, fall back to the original column.
2196 match value.clone().cast_to(field.data_type()) {
2197 Ok(fill_value) => Expr::Alias(Alias {
2198 expr: Box::new(Expr::ScalarFunction(ScalarFunction {
2199 func: coalesce(),
2200 args: vec![col(field.name()), lit(fill_value)],
2201 })),
2202 relation: None,
2203 name: field.name().to_string(),
2204 metadata: None,
2205 }),
2206 Err(_) => col(field.name()),
2207 }
2208 } else {
2209 col(field.name())
2210 }
2211 })
2212 .collect::<Vec<_>>();
2213
2214 self.clone().select(projections)
2215 }
2216
2217 // Helper to find columns from names
2218 fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
2219 let schema = self.logical_plan().schema();
2220 names
2221 .iter()
2222 .map(|name| {
2223 schema
2224 .field_with_name(None, name)
2225 .cloned()
2226 .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
2227 })
2228 .collect()
2229 }
2230
2231 /// Helper for creating a [`DataFrame`] from (column name, array) pairs.
2232 /// # Example
2233 /// ```
2234 /// use std::sync::Arc;
2235 /// use arrow::array::{ArrayRef, Int32Array, StringArray};
2236 /// use datafusion::prelude::DataFrame;
2237 /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
2238 /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
2239 /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
2240 /// // +----+------+
2241 /// // | id | name |
2242 /// // +----+------+
2243 /// // | 1  | foo  |
2244 /// // | 2  | bar  |
2245 /// // | 3  | baz  |
2246 /// // +----+------+
2247 /// ```
2248 pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result<Self> {
2249 let fields = columns
2250 .iter()
2251 .map(|(name, array)| Field::new(*name, array.data_type().clone(), true))
2252 .collect::<Vec<_>>();
2253
2254 let arrays = columns
2255 .into_iter()
2256 .map(|(_, array)| array)
2257 .collect::<Vec<_>>();
2258
2259 let schema = Arc::new(Schema::new(fields));
2260 let batch = RecordBatch::try_new(schema, arrays)?;
2261 let ctx = SessionContext::new();
2262 let df = ctx.read_batch(batch)?;
2263 Ok(df)
2264 }
2265}
2266
2267/// Macro for creating a [`DataFrame`] from column name and data pairs.
2268/// # Example
2269/// ```
2270/// use datafusion::prelude::dataframe;
2271/// # use datafusion::error::Result;
2272/// # #[tokio::main]
2273/// # async fn main() -> Result<()> {
2274/// let df = dataframe!(
2275/// "id" => [1, 2, 3],
2276/// "name" => ["foo", "bar", "baz"]
2277/// )?;
2278/// df.show().await?;
2279/// // +----+------+
2280/// // | id | name |
2281/// // +----+------+
2282/// // | 1  | foo  |
2283/// // | 2  | bar  |
2284/// // | 3  | baz  |
2285/// // +----+------+
2286/// let df_empty = dataframe!()?; // empty DataFrame
2287/// assert_eq!(df_empty.schema().fields().len(), 0);
2288/// assert_eq!(df_empty.count().await?, 0);
2289/// # Ok(())
2290/// # }
2291/// ```
2292#[macro_export]
2293macro_rules! dataframe {
2294 () => {{
2295 use std::sync::Arc;
2296
2297 use datafusion::prelude::SessionContext;
2298 use datafusion::arrow::array::RecordBatch;
2299 use datafusion::arrow::datatypes::Schema;
2300
2301 let ctx = SessionContext::new();
2302 let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
2303 ctx.read_batch(batch)
2304 }};
2305
2306 ($($name:expr => $data:expr),+ $(,)?) => {{
2307 use datafusion::prelude::DataFrame;
2308 use datafusion::common::test_util::IntoArrayRef;
2309
2310 let columns = vec![
2311 $(
2312 ($name, $data.into_array_ref()),
2313 )+
2314 ];
2315
2316 DataFrame::from_columns(columns)
2317 }};
2318}
2319
2320#[derive(Debug)]
2321struct DataFrameTableProvider {
2322 plan: LogicalPlan,
2323}
2324
2325#[async_trait]
2326impl TableProvider for DataFrameTableProvider {
2327 fn as_any(&self) -> &dyn Any {
2328 self
2329 }
2330
2331 fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
2332 Some(Cow::Borrowed(&self.plan))
2333 }
2334
2335 fn supports_filters_pushdown(
2336 &self,
2337 filters: &[&Expr],
2338 ) -> Result<Vec<TableProviderFilterPushDown>> {
2339 // Filters are handled exactly: `scan` applies all of them to the plan
2340 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2341 }
2342
2343 fn schema(&self) -> SchemaRef {
2344 let schema: Schema = self.plan.schema().as_ref().into();
2345 Arc::new(schema)
2346 }
2347
2348 fn table_type(&self) -> TableType {
2349 TableType::View
2350 }
2351
2352 async fn scan(
2353 &self,
2354 state: &dyn Session,
2355 projection: Option<&Vec<usize>>,
2356 filters: &[Expr],
2357 limit: Option<usize>,
2358 ) -> Result<Arc<dyn ExecutionPlan>> {
2359 let mut expr = LogicalPlanBuilder::from(self.plan.clone());
2360 // Add filter when given
2361 let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
2362 if let Some(filter) = filter {
2363 expr = expr.filter(filter)?
2364 }
2365
2366 if let Some(p) = projection {
2367 expr = expr.select(p.iter().copied())?
2368 }
2369
2370 // Add a limit if given
2371 if let Some(l) = limit {
2372 expr = expr.limit(0, Some(l))?
2373 }
2374 let plan = expr.build()?;
2375 state.create_physical_plan(&plan).await
2376 }
2377}
2378
2379// see tests in datafusion/core/tests/dataframe/mod.rs:2816