Deprecated: The each() function is deprecated. This message will be suppressed on further calls in /home/zhenxiangba/zhenxiangba.com/public_html/phproxy-improved-master/index.php on line 456
mod.rs - source
[go: Go Back, main page]

datafusion/dataframe/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DataFrame`] API for building and executing query plans.
19
20#[cfg(feature = "parquet")]
21mod parquet;
22
23use crate::arrow::record_batch::RecordBatch;
24use crate::arrow::util::pretty;
25use crate::datasource::file_format::csv::CsvFormatFactory;
26use crate::datasource::file_format::format_as_file_type;
27use crate::datasource::file_format::json::JsonFormatFactory;
28use crate::datasource::{
29    provider_as_source, DefaultTableSource, MemTable, TableProvider,
30};
31use crate::error::Result;
32use crate::execution::context::{SessionState, TaskContext};
33use crate::execution::FunctionRegistry;
34use crate::logical_expr::utils::find_window_exprs;
35use crate::logical_expr::{
36    col, ident, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
37    LogicalPlanBuilderOptions, Partitioning, TableType,
38};
39use crate::physical_plan::{
40    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
41    ExecutionPlan, SendableRecordBatchStream,
42};
43use crate::prelude::SessionContext;
44use std::any::Any;
45use std::borrow::Cow;
46use std::collections::HashMap;
47use std::sync::Arc;
48
49use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
50use arrow::compute::{cast, concat};
51use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52use datafusion_common::config::{CsvOptions, JsonOptions};
53use datafusion_common::{
54    exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
55    DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions,
56};
57use datafusion_expr::select_expr::SelectExpr;
58use datafusion_expr::{
59    case,
60    dml::InsertOp,
61    expr::{Alias, ScalarFunction},
62    is_null, lit,
63    utils::COUNT_STAR_EXPANSION,
64    ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
65};
66use datafusion_functions::core::coalesce;
67use datafusion_functions_aggregate::expr_fn::{
68    avg, count, max, median, min, stddev, sum,
69};
70
71use async_trait::async_trait;
72use datafusion_catalog::Session;
73use datafusion_sql::TableReference;
74
75/// Contains options that control how data is
76/// written out from a DataFrame
77pub struct DataFrameWriteOptions {
78    /// Controls how new data should be written to the table, determining whether
79    /// to append, overwrite, or replace existing data.
80    insert_op: InsertOp,
81    /// Controls if all partitions should be coalesced into a single output file
82    /// Generally will have slower performance when set to true.
83    single_file_output: bool,
84    /// Sets which columns should be used for hive-style partitioned writes by name.
85    /// Can be set to empty vec![] for non-partitioned writes.
86    partition_by: Vec<String>,
87    /// Sets which columns should be used for sorting the output by name.
88    /// Can be set to empty vec![] for non-sorted writes.
89    sort_by: Vec<SortExpr>,
90}
91
92impl DataFrameWriteOptions {
93    /// Create a new DataFrameWriteOptions with default values
94    pub fn new() -> Self {
95        DataFrameWriteOptions {
96            insert_op: InsertOp::Append,
97            single_file_output: false,
98            partition_by: vec![],
99            sort_by: vec![],
100        }
101    }
102
103    /// Set the insert operation
104    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
105        self.insert_op = insert_op;
106        self
107    }
108
109    /// Set the single_file_output value to true or false
110    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
111        self.single_file_output = single_file_output;
112        self
113    }
114
115    /// Sets the partition_by columns for output partitioning
116    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
117        self.partition_by = partition_by;
118        self
119    }
120
121    /// Sets the sort_by columns for output sorting
122    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
123        self.sort_by = sort_by;
124        self
125    }
126}
127
128impl Default for DataFrameWriteOptions {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134/// Represents a logical set of rows with the same named columns.
135///
136/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
137/// represents a 2 dimensional table of rows and columns.
138///
139/// The typical workflow using DataFrames looks like
140///
141/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
142///    and [`read_parquet`].
143///
144/// 2. Build a desired calculation by calling methods such as [`filter`],
145///    [`select`], [`aggregate`], and [`limit`]
146///
147/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
148///
149/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
150///    required for execution.
151///
152/// DataFrames are "lazy" in the sense that most methods do not actually compute
153/// anything, they just build up a plan. Calling [`collect`] executes the plan
154/// using the same DataFusion planning and execution process used to execute SQL
155/// and other queries.
156///
157/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
158/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
159/// [`read_csv`]: SessionContext::read_csv
160/// [`read_parquet`]: SessionContext::read_parquet
161/// [`filter`]: DataFrame::filter
162/// [`select`]: DataFrame::select
163/// [`aggregate`]: DataFrame::aggregate
164/// [`limit`]: DataFrame::limit
165/// [`collect`]: DataFrame::collect
166///
167/// # Example
168/// ```
169/// # use std::sync::Arc;
170/// # use datafusion::prelude::*;
171/// # use datafusion::error::Result;
172/// # use datafusion::functions_aggregate::expr_fn::min;
173/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
174/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
175/// # #[tokio::main]
176/// # async fn main() -> Result<()> {
177/// let ctx = SessionContext::new();
178/// // Read the data from a csv file
179/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
180/// // create a new dataframe that computes the equivalent of
181/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
182/// let df = df.filter(col("a").lt_eq(col("b")))?
183///            .aggregate(vec![col("a")], vec![min(col("b"))])?
184///            .limit(0, Some(100))?;
185/// // Perform the actual computation
186/// let results = df.collect();
187///
188/// // Create a new dataframe with in-memory data
189/// let schema = Schema::new(vec![
190///     Field::new("id", DataType::Int32, true),
191///     Field::new("name", DataType::Utf8, true),
192/// ]);
193/// let batch = RecordBatch::try_new(
194///     Arc::new(schema),
195///     vec![
196///         Arc::new(Int32Array::from(vec![1, 2, 3])),
197///         Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
198///     ],
199/// )?;
200/// let df = ctx.read_batch(batch)?;
201/// df.show().await?;
202///
203/// // Create a new dataframe with in-memory data using macro
204/// let df = dataframe!(
205///     "id" => [1, 2, 3],
206///     "name" => ["foo", "bar", "baz"]
207///  )?;
208/// df.show().await?;
209/// # Ok(())
210/// # }
211/// ```
212#[derive(Debug, Clone)]
213pub struct DataFrame {
214    // Box the (large) SessionState to reduce the size of DataFrame on the stack
215    session_state: Box<SessionState>,
216    plan: LogicalPlan,
217    // Whether projection ops can skip validation or not. This flag if false
218    // allows for an optimization in `with_column` and `with_column_renamed` functions
219    // where the recursive work required to columnize and normalize expressions can
220    // be skipped if set to false. Since these function calls are often chained or
221    // called many times in dataframe operations this can result in a significant
222    // performance gain.
223    //
224    // The conditions where this can be set to false is when the dataframe function
225    // call results in the last operation being a
226    // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
227    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
228    // call. This requirement guarantees that the plan has had all columnization
229    // and normalization applied to existing expressions and only new expressions
230    // will require that work. Any operation that update the plan in any way
231    // via anything other than a `project` call should set this to true.
232    projection_requires_validation: bool,
233}
234
235impl DataFrame {
236    /// Create a new `DataFrame ` based on an existing `LogicalPlan`
237    ///
238    /// This is a low-level method and is not typically used by end users. See
239    /// [`SessionContext::read_csv`] and other methods for creating a
240    /// `DataFrame` from an existing datasource.
241    pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
242        Self {
243            session_state: Box::new(session_state),
244            plan,
245            projection_requires_validation: true,
246        }
247    }
248
249    /// Creates logical expression from a SQL query text.
250    /// The expression is created and processed against the current schema.
251    ///
252    /// # Example: Parsing SQL queries
253    /// ```
254    /// # use arrow::datatypes::{DataType, Field, Schema};
255    /// # use datafusion::prelude::*;
256    /// # use datafusion_common::{DFSchema, Result};
257    /// # #[tokio::main]
258    /// # async fn main() -> Result<()> {
259    /// // datafusion will parse number as i64 first.
260    /// let sql = "a > 1 and b in (1, 10)";
261    /// let expected = col("a").gt(lit(1 as i64))
262    ///   .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
263    /// let ctx = SessionContext::new();
264    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
265    /// let expr = df.parse_sql_expr(sql)?;
266    /// assert_eq!(expected, expr);
267    /// # Ok(())
268    /// # }
269    /// ```
270    pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
271        let df_schema = self.schema();
272
273        self.session_state.create_logical_expr(sql, df_schema)
274    }
275
276    /// Consume the DataFrame and produce a physical plan
277    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
278        self.session_state.create_physical_plan(&self.plan).await
279    }
280
281    /// Filter the DataFrame by column. Returns a new DataFrame only containing the
282    /// specified columns.
283    ///
284    /// ```
285    /// # use datafusion::prelude::*;
286    /// # use datafusion::error::Result;
287    /// # use datafusion_common::assert_batches_sorted_eq;
288    /// # #[tokio::main]
289    /// # async fn main() -> Result<()> {
290    /// let ctx = SessionContext::new();
291    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
292    /// let df = df.select_columns(&["a", "b"])?;
293    /// let expected = vec![
294    ///     "+---+---+",
295    ///     "| a | b |",
296    ///     "+---+---+",
297    ///     "| 1 | 2 |",
298    ///     "+---+---+"
299    /// ];
300    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
305        let fields = columns
306            .iter()
307            .map(|name| {
308                self.plan
309                    .schema()
310                    .qualified_field_with_unqualified_name(name)
311            })
312            .collect::<Result<Vec<_>>>()?;
313        let expr: Vec<Expr> = fields
314            .into_iter()
315            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
316            .collect();
317        self.select(expr)
318    }
319    /// Project arbitrary list of expression strings into a new `DataFrame`.
320    /// Method will parse string expressions into logical plan expressions.
321    ///
322    /// The output `DataFrame` has one column for each element in `exprs`.
323    ///
324    /// # Example
325    /// ```
326    /// # use datafusion::prelude::*;
327    /// # use datafusion::error::Result;
328    /// # #[tokio::main]
329    /// # async fn main() -> Result<()> {
330    /// let ctx = SessionContext::new();
331    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
332    /// let df : DataFrame = df.select_exprs(&["a * b", "c"])?;
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame> {
337        let expr_list = exprs
338            .iter()
339            .map(|e| self.parse_sql_expr(e))
340            .collect::<Result<Vec<_>>>()?;
341
342        self.select(expr_list)
343    }
344
345    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
346    /// `DataFrame`.
347    ///
348    /// The output `DataFrame` has one column for each element in `expr_list`.
349    ///
350    /// # Example
351    /// ```
352    /// # use datafusion::prelude::*;
353    /// # use datafusion::error::Result;
354    /// # use datafusion_common::assert_batches_sorted_eq;
355    /// # #[tokio::main]
356    /// # async fn main() -> Result<()> {
357    /// let ctx = SessionContext::new();
358    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
359    /// let df = df.select(vec![col("a"), col("b") * col("c")])?;
360    /// let expected = vec![
361    ///     "+---+-----------------------+",
362    ///     "| a | ?table?.b * ?table?.c |",
363    ///     "+---+-----------------------+",
364    ///     "| 1 | 6                     |",
365    ///     "+---+-----------------------+"
366    /// ];
367    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
368    /// # Ok(())
369    /// # }
370    /// ```
371    pub fn select(
372        self,
373        expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
374    ) -> Result<DataFrame> {
375        let expr_list: Vec<SelectExpr> =
376            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();
377
378        let expressions = expr_list.iter().filter_map(|e| match e {
379            SelectExpr::Expression(expr) => Some(expr),
380            _ => None,
381        });
382
383        let window_func_exprs = find_window_exprs(expressions);
384        let plan = if window_func_exprs.is_empty() {
385            self.plan
386        } else {
387            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
388        };
389
390        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
391
392        Ok(DataFrame {
393            session_state: self.session_state,
394            plan: project_plan,
395            projection_requires_validation: false,
396        })
397    }
398
399    /// Returns a new DataFrame containing all columns except the specified columns.
400    ///
401    /// ```
402    /// # use datafusion::prelude::*;
403    /// # use datafusion::error::Result;
404    /// # use datafusion_common::assert_batches_sorted_eq;
405    /// # #[tokio::main]
406    /// # async fn main() -> Result<()> {
407    /// let ctx = SessionContext::new();
408    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
409    /// // +----+----+----+
410    /// // | a  | b  | c  |
411    /// // +----+----+----+
412    /// // | 1  | 2  | 3  |
413    /// // +----+----+----+
414    /// let df = df.drop_columns(&["a"])?;
415    /// let expected = vec![
416    ///     "+---+---+",
417    ///     "| b | c |",
418    ///     "+---+---+",
419    ///     "| 2 | 3 |",
420    ///     "+---+---+"
421    /// ];
422    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
423    /// # Ok(())
424    /// # }
425    /// ```
426    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
427        let fields_to_drop = columns
428            .iter()
429            .map(|name| {
430                self.plan
431                    .schema()
432                    .qualified_field_with_unqualified_name(name)
433            })
434            .filter(|r| r.is_ok())
435            .collect::<Result<Vec<_>>>()?;
436        let expr: Vec<Expr> = self
437            .plan
438            .schema()
439            .fields()
440            .into_iter()
441            .enumerate()
442            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
443            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
444            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
445            .collect();
446        self.select(expr)
447    }
448
449    /// Expand multiple list/struct columns into a set of rows and new columns.
450    ///
451    /// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
452    ///
453    /// # Example
454    /// ```
455    /// # use datafusion::prelude::*;
456    /// # use datafusion::error::Result;
457    /// # use datafusion_common::assert_batches_sorted_eq;
458    /// # #[tokio::main]
459    /// # async fn main() -> Result<()> {
460    /// let ctx = SessionContext::new();
461    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
462    /// // expand into multiple columns if it's json array, flatten field name if it's nested structure
463    /// let df = df.unnest_columns(&["b","c","d"])?;
464    /// let expected = vec![
465    ///     "+---+------+-------+-----+-----+",
466    ///     "| a | b    | c     | d.e | d.f |",
467    ///     "+---+------+-------+-----+-----+",
468    ///     "| 1 | 2.0  | false | 1   | 2   |",
469    ///     "| 1 | 1.3  | true  | 1   | 2   |",
470    ///     "| 1 | -6.1 |       | 1   | 2   |",
471    ///     "| 2 | 3.0  | false |     |     |",
472    ///     "| 2 | 2.3  | true  |     |     |",
473    ///     "| 2 | -7.1 |       |     |     |",
474    ///     "+---+------+-------+-----+-----+"
475    /// ];
476    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
477    /// # Ok(())
478    /// # }
479    /// ```
480    pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame> {
481        self.unnest_columns_with_options(columns, UnnestOptions::new())
482    }
483
484    /// Expand multiple list columns into a set of rows, with
485    /// behavior controlled by [`UnnestOptions`].
486    ///
487    /// Please see the documentation on [`UnnestOptions`] for more
488    /// details about the meaning of unnest.
489    pub fn unnest_columns_with_options(
490        self,
491        columns: &[&str],
492        options: UnnestOptions,
493    ) -> Result<DataFrame> {
494        let columns = columns.iter().map(|c| Column::from(*c)).collect();
495        let plan = LogicalPlanBuilder::from(self.plan)
496            .unnest_columns_with_options(columns, options)?
497            .build()?;
498        Ok(DataFrame {
499            session_state: self.session_state,
500            plan,
501            projection_requires_validation: true,
502        })
503    }
504
505    /// Return a DataFrame with only rows for which `predicate` evaluates to
506    /// `true`.
507    ///
508    /// Rows for which `predicate` evaluates to `false` or `null`
509    /// are filtered out.
510    ///
511    /// # Example
512    /// ```
513    /// # use datafusion::prelude::*;
514    /// # use datafusion::error::Result;
515    /// # use datafusion_common::assert_batches_sorted_eq;
516    /// # #[tokio::main]
517    /// # async fn main() -> Result<()> {
518    /// let ctx = SessionContext::new();
519    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
520    /// let df = df.filter(col("a").lt_eq(col("b")))?;
521    /// // all rows where a <= b are returned
522    /// let expected = vec![
523    ///     "+---+---+---+",
524    ///     "| a | b | c |",
525    ///     "+---+---+---+",
526    ///     "| 1 | 2 | 3 |",
527    ///     "| 4 | 5 | 6 |",
528    ///     "| 7 | 8 | 9 |",
529    ///     "+---+---+---+"
530    /// ];
531    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
532    /// # Ok(())
533    /// # }
534    /// ```
535    pub fn filter(self, predicate: Expr) -> Result<DataFrame> {
536        let plan = LogicalPlanBuilder::from(self.plan)
537            .filter(predicate)?
538            .build()?;
539        Ok(DataFrame {
540            session_state: self.session_state,
541            plan,
542            projection_requires_validation: true,
543        })
544    }
545
546    /// Return a new `DataFrame` that aggregates the rows of the current
547    /// `DataFrame`, first optionally grouping by the given expressions.
548    ///
549    /// # Example
550    /// ```
551    /// # use datafusion::prelude::*;
552    /// # use datafusion::error::Result;
553    /// # use datafusion::functions_aggregate::expr_fn::min;
554    /// # use datafusion_common::assert_batches_sorted_eq;
555    /// # #[tokio::main]
556    /// # async fn main() -> Result<()> {
557    /// let ctx = SessionContext::new();
558    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
559    ///
560    /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
561    /// let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
562    /// let expected1 = vec![
563    ///     "+---+----------------+",
564    ///     "| a | min(?table?.b) |",
565    ///     "+---+----------------+",
566    ///     "| 1 | 2              |",
567    ///     "| 4 | 5              |",
568    ///     "| 7 | 8              |",
569    ///     "+---+----------------+"
570    /// ];
571    /// assert_batches_sorted_eq!(expected1, &df1.collect().await?);
572    /// // The following use is the equivalent of "SELECT MIN(b)"
573    /// let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
574    /// let expected2 = vec![
575    ///     "+----------------+",
576    ///     "| min(?table?.b) |",
577    ///     "+----------------+",
578    ///     "| 2              |",
579    ///     "+----------------+"
580    /// ];
581    /// # assert_batches_sorted_eq!(expected2, &df2.collect().await?);
582    /// # Ok(())
583    /// # }
584    /// ```
585    pub fn aggregate(
586        self,
587        group_expr: Vec<Expr>,
588        aggr_expr: Vec<Expr>,
589    ) -> Result<DataFrame> {
590        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
591        let aggr_expr_len = aggr_expr.len();
592        let options =
593            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
594        let plan = LogicalPlanBuilder::from(self.plan)
595            .with_options(options)
596            .aggregate(group_expr, aggr_expr)?
597            .build()?;
598        let plan = if is_grouping_set {
599            let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
600            // For grouping sets we do a project to not expose the internal grouping id
601            let exprs = plan
602                .schema()
603                .columns()
604                .into_iter()
605                .enumerate()
606                .filter(|(idx, _)| *idx != grouping_id_pos)
607                .map(|(_, column)| Expr::Column(column))
608                .collect::<Vec<_>>();
609            LogicalPlanBuilder::from(plan).project(exprs)?.build()?
610        } else {
611            plan
612        };
613        Ok(DataFrame {
614            session_state: self.session_state,
615            plan,
616            projection_requires_validation: !is_grouping_set,
617        })
618    }
619
620    /// Return a new DataFrame that adds the result of evaluating one or more
621    /// window functions ([`Expr::WindowFunction`]) to the existing columns
622    pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
623        let plan = LogicalPlanBuilder::from(self.plan)
624            .window(window_exprs)?
625            .build()?;
626        Ok(DataFrame {
627            session_state: self.session_state,
628            plan,
629            projection_requires_validation: true,
630        })
631    }
632
633    /// Returns a new `DataFrame` with a limited number of rows.
634    ///
635    /// # Arguments
636    /// `skip` - Number of rows to skip before fetch any row
637    /// `fetch` - Maximum number of rows to return, after skipping `skip` rows.
638    ///
639    /// # Example
640    /// ```
641    /// # use datafusion::prelude::*;
642    /// # use datafusion::error::Result;
643    /// # use datafusion_common::assert_batches_sorted_eq;
644    /// # #[tokio::main]
645    /// # async fn main() -> Result<()> {
646    /// let ctx = SessionContext::new();
647    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
648    /// let df = df.limit(1, Some(2))?;
649    /// let expected = vec![
650    ///     "+---+---+---+",
651    ///     "| a | b | c |",
652    ///     "+---+---+---+",
653    ///     "| 4 | 5 | 6 |",
654    ///     "| 7 | 8 | 9 |",
655    ///     "+---+---+---+"
656    /// ];
657    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
658    /// # Ok(())
659    /// # }
660    /// ```
661    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame> {
662        let plan = LogicalPlanBuilder::from(self.plan)
663            .limit(skip, fetch)?
664            .build()?;
665        Ok(DataFrame {
666            session_state: self.session_state,
667            plan,
668            projection_requires_validation: self.projection_requires_validation,
669        })
670    }
671
672    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
673    ///
674    /// The two [`DataFrame`]s must have exactly the same schema
675    ///
676    /// # Example
677    /// ```
678    /// # use datafusion::prelude::*;
679    /// # use datafusion::error::Result;
680    /// # use datafusion_common::assert_batches_sorted_eq;
681    /// # #[tokio::main]
682    /// # async fn main() -> Result<()> {
683    /// let ctx = SessionContext::new();
684    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?   ;
685    /// let d2 = df.clone();
686    /// let df = df.union(d2)?;
687    /// let expected = vec![
688    ///     "+---+---+---+",
689    ///     "| a | b | c |",
690    ///     "+---+---+---+",
691    ///     "| 1 | 2 | 3 |",
692    ///     "| 1 | 2 | 3 |",
693    ///     "+---+---+---+"
694    /// ];
695    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
696    /// # Ok(())
697    /// # }
698    /// ```
699    pub fn union(self, dataframe: DataFrame) -> Result<DataFrame> {
700        let plan = LogicalPlanBuilder::from(self.plan)
701            .union(dataframe.plan)?
702            .build()?;
703        Ok(DataFrame {
704            session_state: self.session_state,
705            plan,
706            projection_requires_validation: true,
707        })
708    }
709
710    /// Calculate the union of two [`DataFrame`]s using column names, preserving duplicate rows.
711    ///
712    /// The two [`DataFrame`]s are combined using column names rather than position,
713    /// filling missing columns with null.
714    ///
715    ///
716    /// # Example
717    /// ```
718    /// # use datafusion::prelude::*;
719    /// # use datafusion::error::Result;
720    /// # use datafusion_common::assert_batches_sorted_eq;
721    /// # #[tokio::main]
722    /// # async fn main() -> Result<()> {
723    /// let ctx = SessionContext::new();
724    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
725    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?.with_column("d", lit("77"))?;
726    /// let df = df.union_by_name(d2)?;
727    /// let expected = vec![
728    ///     "+---+---+---+----+",
729    ///     "| a | b | c | d  |",
730    ///     "+---+---+---+----+",
731    ///     "| 1 | 2 | 3 |    |",
732    ///     "| 1 | 2 | 3 | 77 |",
733    ///     "+---+---+---+----+"
734    /// ];
735    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
736    /// # Ok(())
737    /// # }
738    /// ```
739    pub fn union_by_name(self, dataframe: DataFrame) -> Result<DataFrame> {
740        let plan = LogicalPlanBuilder::from(self.plan)
741            .union_by_name(dataframe.plan)?
742            .build()?;
743        Ok(DataFrame {
744            session_state: self.session_state,
745            plan,
746            projection_requires_validation: true,
747        })
748    }
749
750    /// Calculate the distinct union of two [`DataFrame`]s.
751    ///
752    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
753    /// rows are discarded.
754    ///
755    /// # Example
756    /// ```
757    /// # use datafusion::prelude::*;
758    /// # use datafusion::error::Result;
759    /// # use datafusion_common::assert_batches_sorted_eq;
760    /// # #[tokio::main]
761    /// # async fn main() -> Result<()> {
762    /// let ctx = SessionContext::new();
763    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
764    /// let d2 = df.clone();
765    /// let df = df.union_distinct(d2)?;
766    /// // df2 are duplicate of df
767    /// let expected = vec![
768    ///     "+---+---+---+",
769    ///     "| a | b | c |",
770    ///     "+---+---+---+",
771    ///     "| 1 | 2 | 3 |",
772    ///     "+---+---+---+"
773    /// ];
774    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
775    /// # Ok(())
776    /// # }
777    /// ```
778    pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
779        let plan = LogicalPlanBuilder::from(self.plan)
780            .union_distinct(dataframe.plan)?
781            .build()?;
782        Ok(DataFrame {
783            session_state: self.session_state,
784            plan,
785            projection_requires_validation: true,
786        })
787    }
788
789    /// Calculate the union of two [`DataFrame`]s using column names with all duplicated rows removed.
790    ///
791    /// The two [`DataFrame`]s are combined using column names rather than position,
792    /// filling missing columns with null.
793    ///
794    ///
795    /// # Example
796    /// ```
797    /// # use datafusion::prelude::*;
798    /// # use datafusion::error::Result;
799    /// # use datafusion_common::assert_batches_sorted_eq;
800    /// # #[tokio::main]
801    /// # async fn main() -> Result<()> {
802    /// let ctx = SessionContext::new();
803    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
804    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?;
805    /// let df = df.union_by_name_distinct(d2)?;
806    /// let expected = vec![
807    ///     "+---+---+---+",
808    ///     "| a | b | c |",
809    ///     "+---+---+---+",
810    ///     "| 1 | 2 | 3 |",
811    ///     "+---+---+---+"
812    /// ];
813    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
814    /// # Ok(())
815    /// # }
816    /// ```
817    pub fn union_by_name_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
818        let plan = LogicalPlanBuilder::from(self.plan)
819            .union_by_name_distinct(dataframe.plan)?
820            .build()?;
821        Ok(DataFrame {
822            session_state: self.session_state,
823            plan,
824            projection_requires_validation: true,
825        })
826    }
827
828    /// Return a new `DataFrame` with all duplicated rows removed.
829    ///
830    /// # Example
831    /// ```
832    /// # use datafusion::prelude::*;
833    /// # use datafusion::error::Result;
834    /// # use datafusion_common::assert_batches_sorted_eq;
835    /// # #[tokio::main]
836    /// # async fn main() -> Result<()> {
837    /// let ctx = SessionContext::new();
838    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
839    /// let df = df.distinct()?;
840    /// let expected = vec![
841    ///     "+---+---+---+",
842    ///     "| a | b | c |",
843    ///     "+---+---+---+",
844    ///     "| 1 | 2 | 3 |",
845    ///     "+---+---+---+"
846    /// ];
847    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
848    /// # Ok(())
849    /// # }
850    /// ```
851    pub fn distinct(self) -> Result<DataFrame> {
852        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
853        Ok(DataFrame {
854            session_state: self.session_state,
855            plan,
856            projection_requires_validation: true,
857        })
858    }
859
860    /// Return a new `DataFrame` with duplicated rows removed as per the specified expression list
861    /// according to the provided sorting expressions grouped by the `DISTINCT ON` clause
862    /// expressions.
863    ///
864    /// # Example
865    /// ```
866    /// # use datafusion::prelude::*;
867    /// # use datafusion::error::Result;
868    /// # use datafusion_common::assert_batches_sorted_eq;
869    /// # #[tokio::main]
870    /// # async fn main() -> Result<()> {
871    /// let ctx = SessionContext::new();
872    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
873    ///   // Return a single row (a, b) for each distinct value of a
874    ///   .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
875    /// let expected = vec![
876    ///     "+---+---+",
877    ///     "| a | b |",
878    ///     "+---+---+",
879    ///     "| 1 | 2 |",
880    ///     "+---+---+"
881    /// ];
882    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
883    /// # Ok(())
884    /// # }
885    /// ```
886    pub fn distinct_on(
887        self,
888        on_expr: Vec<Expr>,
889        select_expr: Vec<Expr>,
890        sort_expr: Option<Vec<SortExpr>>,
891    ) -> Result<DataFrame> {
892        let plan = LogicalPlanBuilder::from(self.plan)
893            .distinct_on(on_expr, select_expr, sort_expr)?
894            .build()?;
895        Ok(DataFrame {
896            session_state: self.session_state,
897            plan,
898            projection_requires_validation: true,
899        })
900    }
901
902    /// Return a new `DataFrame` that has statistics for a DataFrame.
903    ///
904    /// Only summarizes numeric datatypes at the moment and returns nulls for
905    /// non numeric datatypes. The output format is modeled after pandas
906    ///
907    /// # Example
908    /// ```
909    /// # use datafusion::prelude::*;
910    /// # use datafusion::error::Result;
911    /// # use arrow::util::pretty;
912    /// # use datafusion_common::assert_batches_sorted_eq;
913    /// # #[tokio::main]
914    /// # async fn main() -> Result<()> {
915    /// let ctx = SessionContext::new();
916    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
917    /// let stat = df.describe().await?;
918    /// # // some output column are ignored
919    /// let expected = vec![
920    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
921    ///     "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
922    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
923    ///     "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
924    ///     "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
925    ///     "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
926    ///     "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
927    ///     "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   |  deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov   |",
928    ///     "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
929    ///     "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
930    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
931    /// assert_batches_sorted_eq!(expected, &stat.collect().await?);
932    /// # Ok(())
933    /// # }
934    /// ```
935    pub async fn describe(self) -> Result<Self> {
936        //the functions now supported
937        let supported_describe_functions =
938            vec!["count", "null_count", "mean", "std", "min", "max", "median"];
939
940        let original_schema_fields = self.schema().fields().iter();
941
942        //define describe column
943        let mut describe_schemas = vec![Field::new("describe", DataType::Utf8, false)];
944        describe_schemas.extend(original_schema_fields.clone().map(|field| {
945            if field.data_type().is_numeric() {
946                Field::new(field.name(), DataType::Float64, true)
947            } else {
948                Field::new(field.name(), DataType::Utf8, true)
949            }
950        }));
951
952        //collect recordBatch
953        let describe_record_batch = vec![
954            // count aggregation
955            self.clone().aggregate(
956                vec![],
957                original_schema_fields
958                    .clone()
959                    .map(|f| count(ident(f.name())).alias(f.name()))
960                    .collect::<Vec<_>>(),
961            ),
962            // null_count aggregation
963            self.clone().aggregate(
964                vec![],
965                original_schema_fields
966                    .clone()
967                    .map(|f| {
968                        sum(case(is_null(ident(f.name())))
969                            .when(lit(true), lit(1))
970                            .otherwise(lit(0))
971                            .unwrap())
972                        .alias(f.name())
973                    })
974                    .collect::<Vec<_>>(),
975            ),
976            // mean aggregation
977            self.clone().aggregate(
978                vec![],
979                original_schema_fields
980                    .clone()
981                    .filter(|f| f.data_type().is_numeric())
982                    .map(|f| avg(ident(f.name())).alias(f.name()))
983                    .collect::<Vec<_>>(),
984            ),
985            // std aggregation
986            self.clone().aggregate(
987                vec![],
988                original_schema_fields
989                    .clone()
990                    .filter(|f| f.data_type().is_numeric())
991                    .map(|f| stddev(ident(f.name())).alias(f.name()))
992                    .collect::<Vec<_>>(),
993            ),
994            // min aggregation
995            self.clone().aggregate(
996                vec![],
997                original_schema_fields
998                    .clone()
999                    .filter(|f| {
1000                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
1001                    })
1002                    .map(|f| min(ident(f.name())).alias(f.name()))
1003                    .collect::<Vec<_>>(),
1004            ),
1005            // max aggregation
1006            self.clone().aggregate(
1007                vec![],
1008                original_schema_fields
1009                    .clone()
1010                    .filter(|f| {
1011                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
1012                    })
1013                    .map(|f| max(ident(f.name())).alias(f.name()))
1014                    .collect::<Vec<_>>(),
1015            ),
1016            // median aggregation
1017            self.clone().aggregate(
1018                vec![],
1019                original_schema_fields
1020                    .clone()
1021                    .filter(|f| f.data_type().is_numeric())
1022                    .map(|f| median(ident(f.name())).alias(f.name()))
1023                    .collect::<Vec<_>>(),
1024            ),
1025        ];
1026
1027        // first column with function names
1028        let mut array_ref_vec: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
1029            supported_describe_functions.clone(),
1030        ))];
1031        for field in original_schema_fields {
1032            let mut array_datas = vec![];
1033            for result in describe_record_batch.iter() {
1034                let array_ref = match result {
1035                    Ok(df) => {
1036                        let batches = df.clone().collect().await;
1037                        match batches {
1038                            Ok(batches)
1039                                if batches.len() == 1
1040                                    && batches[0]
1041                                        .column_by_name(field.name())
1042                                        .is_some() =>
1043                            {
1044                                let column =
1045                                    batches[0].column_by_name(field.name()).unwrap();
1046
1047                                if column.data_type().is_null() {
1048                                    Arc::new(StringArray::from(vec!["null"]))
1049                                } else if field.data_type().is_numeric() {
1050                                    cast(column, &DataType::Float64)?
1051                                } else {
1052                                    cast(column, &DataType::Utf8)?
1053                                }
1054                            }
1055                            _ => Arc::new(StringArray::from(vec!["null"])),
1056                        }
1057                    }
1058                    //Handling error when only boolean/binary column, and in other cases
1059                    Err(err)
1060                        if err.to_string().contains(
1061                            "Error during planning: \
1062                                            Aggregate requires at least one grouping \
1063                                            or aggregate expression",
1064                        ) =>
1065                    {
1066                        Arc::new(StringArray::from(vec!["null"]))
1067                    }
1068                    Err(e) => return exec_err!("{}", e),
1069                };
1070                array_datas.push(array_ref);
1071            }
1072            array_ref_vec.push(concat(
1073                array_datas
1074                    .iter()
1075                    .map(|af| af.as_ref())
1076                    .collect::<Vec<_>>()
1077                    .as_slice(),
1078            )?);
1079        }
1080
1081        let describe_record_batch =
1082            RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?;
1083
1084        let provider = MemTable::try_new(
1085            describe_record_batch.schema(),
1086            vec![vec![describe_record_batch]],
1087        )?;
1088
1089        let plan = LogicalPlanBuilder::scan(
1090            UNNAMED_TABLE,
1091            provider_as_source(Arc::new(provider)),
1092            None,
1093        )?
1094        .build()?;
1095
1096        Ok(DataFrame {
1097            session_state: self.session_state,
1098            plan,
1099            projection_requires_validation: self.projection_requires_validation,
1100        })
1101    }
1102
1103    /// Apply a sort by provided expressions with default direction
1104    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
1105        self.sort(
1106            expr.into_iter()
1107                .map(|e| e.sort(true, false))
1108                .collect::<Vec<SortExpr>>(),
1109        )
1110    }
1111
1112    /// Sort the DataFrame by the specified sorting expressions.
1113    ///
1114    /// Note that any expression can be turned into
1115    /// a sort expression by calling its [sort](Expr::sort) method.
1116    ///
1117    /// # Example
1118    ///
1119    /// ```
1120    /// # use datafusion::prelude::*;
1121    /// # use datafusion::error::Result;
1122    /// # use datafusion_common::assert_batches_sorted_eq;
1123    /// # #[tokio::main]
1124    /// # async fn main() -> Result<()> {
1125    /// let ctx = SessionContext::new();
1126    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1127    /// let df = df.sort(vec![
1128    ///   col("a").sort(false, true),   // a DESC, nulls first
1129    ///   col("b").sort(true, false), // b ASC, nulls last
1130    ///  ])?;
1131    /// let expected = vec![
1132    ///     "+---+---+---+",
1133    ///     "| a | b | c |",
1134    ///     "+---+---+---+",
1135    ///     "| 1 | 2 | 3 |",
1136    ///     "| 4 | 5 | 6 |",
1137    ///     "| 7 | 8 | 9 |",
1138    ///     "+---+---+---+",
1139    /// ];
1140    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1141    /// # Ok(())
1142    /// # }
1143    /// ```
1144    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
1145        let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
1146        Ok(DataFrame {
1147            session_state: self.session_state,
1148            plan,
1149            projection_requires_validation: self.projection_requires_validation,
1150        })
1151    }
1152
1153    /// Join this `DataFrame` with another `DataFrame` using explicitly specified
1154    /// columns and an optional filter expression.
1155    ///
1156    /// See [`join_on`](Self::join_on) for a more concise way to specify the
1157    /// join condition. Since DataFusion will automatically identify and
1158    /// optimize equality predicates there is no performance difference between
1159    /// this function and `join_on`
1160    ///
1161    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
1162    /// example below), which are then combined with the optional `filter`
1163    /// expression. If `left_cols` and `right_cols` contain ambiguous column
1164    /// references, they will be disambiguated by prioritizing the left relation
1165    /// for `left_cols` and the right relation for `right_cols`.
1166    ///
1167    /// Note that in case of outer join, the `filter` is applied to only matched rows.
1168    ///
1169    /// # Example
1170    /// ```
1171    /// # use datafusion::prelude::*;
1172    /// # use datafusion::error::Result;
1173    /// # use datafusion_common::assert_batches_sorted_eq;
1174    /// # #[tokio::main]
1175    /// # async fn main() -> Result<()> {
1176    /// let ctx = SessionContext::new();
1177    /// let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1178    /// let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1179    ///   .select(vec![
1180    ///     col("a").alias("a2"),
1181    ///     col("b").alias("b2"),
1182    ///     col("c").alias("c2")])?;
1183    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
1184    /// // finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
1185    /// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
1186    /// let expected = vec![
1187    ///     "+---+---+---+----+----+----+",
1188    ///     "| a | b | c | a2 | b2 | c2 |",
1189    ///     "+---+---+---+----+----+----+",
1190    ///     "| 1 | 2 | 3 | 1  | 2  | 3  |",
1191    ///     "+---+---+---+----+----+----+"
1192    /// ];
1193    /// assert_batches_sorted_eq!(expected, &join.collect().await?);
1194    /// # Ok(())
1195    /// # }
1196    /// ```
1197    ///
1198    pub fn join(
1199        self,
1200        right: DataFrame,
1201        join_type: JoinType,
1202        left_cols: &[&str],
1203        right_cols: &[&str],
1204        filter: Option<Expr>,
1205    ) -> Result<DataFrame> {
1206        let plan = LogicalPlanBuilder::from(self.plan)
1207            .join(
1208                right.plan,
1209                join_type,
1210                (left_cols.to_vec(), right_cols.to_vec()),
1211                filter,
1212            )?
1213            .build()?;
1214        Ok(DataFrame {
1215            session_state: self.session_state,
1216            plan,
1217            projection_requires_validation: true,
1218        })
1219    }
1220
1221    /// Join this `DataFrame` with another `DataFrame` using the specified
1222    /// expressions.
1223    ///
1224    /// Note that DataFusion automatically optimizes joins, including
1225    /// identifying and optimizing equality predicates.
1226    ///
1227    /// # Example
1228    /// ```
1229    /// # use datafusion::prelude::*;
1230    /// # use datafusion::error::Result;
1231    /// # use datafusion_common::assert_batches_sorted_eq;
1232    /// # #[tokio::main]
1233    /// # async fn main() -> Result<()> {
1234    /// let ctx = SessionContext::new();
1235    /// let left = ctx
1236    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1237    ///     .await?;
1238    /// let right = ctx
1239    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
1240    ///     .await?
1241    ///     .select(vec![
1242    ///         col("a").alias("a2"),
1243    ///         col("b").alias("b2"),
1244    ///         col("c").alias("c2"),
1245    ///     ])?;
1246    ///
1247    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
1248    /// // finding all pairs of rows from `left` and `right` where
1249    /// // where `a != a2` and `b != b2`.
1250    /// let join_on = left.join_on(
1251    ///     right,
1252    ///     JoinType::Inner,
1253    ///     [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
1254    /// )?;
1255    /// let expected = vec![
1256    ///     "+---+---+---+----+----+----+",
1257    ///     "| a | b | c | a2 | b2 | c2 |",
1258    ///     "+---+---+---+----+----+----+",
1259    ///     "+---+---+---+----+----+----+"
1260    /// ];
1261    /// # assert_batches_sorted_eq!(expected, &join_on.collect().await?);
1262    /// # Ok(())
1263    /// # }
1264    /// ```
1265    pub fn join_on(
1266        self,
1267        right: DataFrame,
1268        join_type: JoinType,
1269        on_exprs: impl IntoIterator<Item = Expr>,
1270    ) -> Result<DataFrame> {
1271        let plan = LogicalPlanBuilder::from(self.plan)
1272            .join_on(right.plan, join_type, on_exprs)?
1273            .build()?;
1274        Ok(DataFrame {
1275            session_state: self.session_state,
1276            plan,
1277            projection_requires_validation: true,
1278        })
1279    }
1280
1281    /// Repartition a DataFrame based on a logical partitioning scheme.
1282    ///
1283    /// # Example
1284    /// ```
1285    /// # use datafusion::prelude::*;
1286    /// # use datafusion::error::Result;
1287    /// # use datafusion_common::assert_batches_sorted_eq;
1288    /// # #[tokio::main]
1289    /// # async fn main() -> Result<()> {
1290    /// let ctx = SessionContext::new();
1291    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1292    /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
1293    /// let expected = vec![
1294    ///     "+---+---+---+",
1295    ///     "| a | b | c |",
1296    ///     "+---+---+---+",
1297    ///     "| 1 | 2 | 3 |",
1298    ///     "| 4 | 5 | 6 |",
1299    ///     "| 7 | 8 | 9 |",
1300    ///     "+---+---+---+"
1301    /// ];
1302    /// # assert_batches_sorted_eq!(expected, &df1.collect().await?);
1303    /// # Ok(())
1304    /// # }
1305    /// ```
1306    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame> {
1307        let plan = LogicalPlanBuilder::from(self.plan)
1308            .repartition(partitioning_scheme)?
1309            .build()?;
1310        Ok(DataFrame {
1311            session_state: self.session_state,
1312            plan,
1313            projection_requires_validation: true,
1314        })
1315    }
1316
1317    /// Return the total number of rows in this `DataFrame`.
1318    ///
1319    /// Note that this method will actually run a plan to calculate the count,
1320    /// which may be slow for large or complicated DataFrames.
1321    ///
1322    /// # Example
1323    /// ```
1324    /// # use datafusion::prelude::*;
1325    /// # use datafusion::error::Result;
1326    /// # #[tokio::main]
1327    /// # async fn main() -> Result<()> {
1328    /// let ctx = SessionContext::new();
1329    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1330    /// let count = df.count().await?; // 1
1331    /// # assert_eq!(count, 1);
1332    /// # Ok(())
1333    /// # }
1334    /// ```
1335    pub async fn count(self) -> Result<usize> {
1336        let rows = self
1337            .aggregate(
1338                vec![],
1339                vec![count(Expr::Literal(COUNT_STAR_EXPANSION, None))],
1340            )?
1341            .collect()
1342            .await?;
1343        let len = *rows
1344            .first()
1345            .and_then(|r| r.columns().first())
1346            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
1347            .and_then(|a| a.values().first())
1348            .ok_or(DataFusionError::Internal(
1349                "Unexpected output when collecting for count()".to_string(),
1350            ))? as usize;
1351        Ok(len)
1352    }
1353
1354    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es  into memory.
1355    ///
1356    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
1357    /// (no actual computation is performed). `collect` triggers the computation.
1358    ///
1359    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
1360    ///
1361    /// # Example
1362    /// ```
1363    /// # use datafusion::prelude::*;
1364    /// # use datafusion::error::Result;
1365    /// # #[tokio::main]
1366    /// # async fn main() -> Result<()> {
1367    /// let ctx = SessionContext::new();
1368    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1369    /// let batches = df.collect().await?;
1370    /// # Ok(())
1371    /// # }
1372    /// ```
1373    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
1374        let task_ctx = Arc::new(self.task_ctx());
1375        let plan = self.create_physical_plan().await?;
1376        collect(plan, task_ctx).await
1377    }
1378
1379    /// Execute the `DataFrame` and print the results to the console.
1380    ///
1381    /// # Example
1382    /// ```
1383    /// # use datafusion::prelude::*;
1384    /// # use datafusion::error::Result;
1385    /// # #[tokio::main]
1386    /// # async fn main() -> Result<()> {
1387    /// let ctx = SessionContext::new();
1388    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1389    /// df.show().await?;
1390    /// # Ok(())
1391    /// # }
1392    /// ```
1393    pub async fn show(self) -> Result<()> {
1394        println!("{}", self.to_string().await?);
1395        Ok(())
1396    }
1397
1398    /// Execute the `DataFrame` and return a string representation of the results.
1399    ///
1400    /// # Example
1401    /// ```
1402    /// # use datafusion::prelude::*;
1403    /// # use datafusion::error::Result;
1404    /// # use datafusion::execution::SessionStateBuilder;
1405    ///
1406    /// # #[tokio::main]
1407    /// # async fn main() -> Result<()> {
1408    /// let cfg = SessionConfig::new()
1409    ///     .set_str("datafusion.format.null", "no-value");
1410    /// let session_state = SessionStateBuilder::new()
1411    ///     .with_config(cfg)
1412    ///     .with_default_features()
1413    ///     .build();
1414    /// let ctx = SessionContext::new_with_state(session_state);
1415    /// let df = ctx.sql("select null as 'null-column'").await?;
1416    /// let result = df.to_string().await?;
1417    /// assert_eq!(result,
1418    /// "+-------------+
1419    /// | null-column |
1420    /// +-------------+
1421    /// | no-value    |
1422    /// +-------------+"
1423    /// );
1424    /// # Ok(())
1425    /// # }
1426    pub async fn to_string(self) -> Result<String> {
1427        let options = self.session_state.config().options().format.clone();
1428        let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;
1429
1430        let results = self.collect().await?;
1431        Ok(
1432            pretty::pretty_format_batches_with_options(&results, &arrow_options)?
1433                .to_string(),
1434        )
1435    }
1436
1437    /// Execute the `DataFrame` and print only the first `num` rows of the
1438    /// result to the console.
1439    ///
1440    /// # Example
1441    /// ```
1442    /// # use datafusion::prelude::*;
1443    /// # use datafusion::error::Result;
1444    /// # #[tokio::main]
1445    /// # async fn main() -> Result<()> {
1446    /// let ctx = SessionContext::new();
1447    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1448    /// df.show_limit(10).await?;
1449    /// # Ok(())
1450    /// # }
1451    /// ```
1452    pub async fn show_limit(self, num: usize) -> Result<()> {
1453        let results = self.limit(0, Some(num))?.collect().await?;
1454        Ok(pretty::print_batches(&results)?)
1455    }
1456
1457    /// Return a new [`TaskContext`] which would be used to execute this DataFrame
1458    pub fn task_ctx(&self) -> TaskContext {
1459        TaskContext::from(self.session_state.as_ref())
1460    }
1461
1462    /// Executes this DataFrame and returns a stream over a single partition
1463    ///
1464    /// See [Self::collect] to buffer the `RecordBatch`es in memory.
1465    ///
1466    /// # Example
1467    /// ```
1468    /// # use datafusion::prelude::*;
1469    /// # use datafusion::error::Result;
1470    /// # #[tokio::main]
1471    /// # async fn main() -> Result<()> {
1472    /// let ctx = SessionContext::new();
1473    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1474    /// let stream = df.execute_stream().await?;
1475    /// # Ok(())
1476    /// # }
1477    /// ```
1478    ///
1479    /// # Aborting Execution
1480    ///
1481    /// Dropping the stream will abort the execution of the query, and free up
1482    /// any allocated resources
1483    pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
1484        let task_ctx = Arc::new(self.task_ctx());
1485        let plan = self.create_physical_plan().await?;
1486        execute_stream(plan, task_ctx)
1487    }
1488
1489    /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
1490    /// maintaining the input partitioning.
1491    ///
1492    /// # Example
1493    /// ```
1494    /// # use datafusion::prelude::*;
1495    /// # use datafusion::error::Result;
1496    /// # #[tokio::main]
1497    /// # async fn main() -> Result<()> {
1498    /// let ctx = SessionContext::new();
1499    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1500    /// let batches = df.collect_partitioned().await?;
1501    /// # Ok(())
1502    /// # }
1503    /// ```
1504    pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>> {
1505        let task_ctx = Arc::new(self.task_ctx());
1506        let plan = self.create_physical_plan().await?;
1507        collect_partitioned(plan, task_ctx).await
1508    }
1509
1510    /// Executes this DataFrame and returns one stream per partition.
1511    ///
1512    /// # Example
1513    /// ```
1514    /// # use datafusion::prelude::*;
1515    /// # use datafusion::error::Result;
1516    /// # #[tokio::main]
1517    /// # async fn main() -> Result<()> {
1518    /// let ctx = SessionContext::new();
1519    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1520    /// let batches = df.execute_stream_partitioned().await?;
1521    /// # Ok(())
1522    /// # }
1523    /// ```
1524    /// # Aborting Execution
1525    ///
1526    /// Dropping the stream will abort the execution of the query, and free up
1527    /// any allocated resources
1528    pub async fn execute_stream_partitioned(
1529        self,
1530    ) -> Result<Vec<SendableRecordBatchStream>> {
1531        let task_ctx = Arc::new(self.task_ctx());
1532        let plan = self.create_physical_plan().await?;
1533        execute_stream_partitioned(plan, task_ctx)
1534    }
1535
1536    /// Returns the `DFSchema` describing the output of this DataFrame.
1537    ///
1538    /// The output `DFSchema` contains information on the name, data type, and
1539    /// nullability for each column.
1540    ///
1541    /// # Example
1542    /// ```
1543    /// # use datafusion::prelude::*;
1544    /// # use datafusion::error::Result;
1545    /// # #[tokio::main]
1546    /// # async fn main() -> Result<()> {
1547    /// let ctx = SessionContext::new();
1548    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1549    /// let schema = df.schema();
1550    /// # Ok(())
1551    /// # }
1552    /// ```
1553    pub fn schema(&self) -> &DFSchema {
1554        self.plan.schema()
1555    }
1556
1557    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
1558    /// this DataFrame.
1559    ///
1560    /// See [`Self::into_unoptimized_plan`] for more details.
1561    pub fn logical_plan(&self) -> &LogicalPlan {
1562        &self.plan
1563    }
1564
1565    /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
1566    pub fn into_parts(self) -> (SessionState, LogicalPlan) {
1567        (*self.session_state, self.plan)
1568    }
1569
1570    /// Return the [`LogicalPlan`] represented by this DataFrame without running
1571    /// any optimizers
1572    ///
1573    /// Note: This method should not be used outside testing, as it loses the
1574    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
1575    /// consequently subsequent operations may take place against a different
1576    /// state (e.g. a different value of `now()`)
1577    ///
1578    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
1579    /// corresponding [`SessionState`].
1580    pub fn into_unoptimized_plan(self) -> LogicalPlan {
1581        self.plan
1582    }
1583
1584    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
1585    ///
1586    /// Note: This method should not be used outside testing -- see
1587    /// [`Self::into_unoptimized_plan`] for more details.
1588    pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
1589        // Optimize the plan first for better UX
1590        self.session_state.optimize(&self.plan)
1591    }
1592
1593    /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
1594    /// as a table view using [`SessionContext::register_table`].
1595    ///
1596    /// Note: This discards the [`SessionState`] associated with this
1597    /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
1598    pub fn into_view(self) -> Arc<dyn TableProvider> {
1599        Arc::new(DataFrameTableProvider { plan: self.plan })
1600    }
1601
1602    /// Return a DataFrame with the explanation of its plan so far.
1603    ///
1604    /// if `analyze` is specified, runs the plan and reports metrics
1605    /// if `verbose` is true, prints out additional details.
1606    /// The default format is Indent format.
1607    ///
1608    /// ```
1609    /// # use datafusion::prelude::*;
1610    /// # use datafusion::error::Result;
1611    /// # #[tokio::main]
1612    /// # async fn main() -> Result<()> {
1613    /// let ctx = SessionContext::new();
1614    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1615    /// let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
1616    /// # Ok(())
1617    /// # }
1618    /// ```
1619    pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
1620        // Set the default format to Indent to keep the previous behavior
1621        let opts = ExplainOption::default()
1622            .with_verbose(verbose)
1623            .with_analyze(analyze);
1624        self.explain_with_options(opts)
1625    }
1626
1627    /// Return a DataFrame with the explanation of its plan so far.
1628    ///
1629    /// `opt` is used to specify the options for the explain operation.
1630    /// Details of the options can be found in [`ExplainOption`].
1631    /// ```
1632    /// # use datafusion::prelude::*;
1633    /// # use datafusion::error::Result;
1634    /// # #[tokio::main]
1635    /// # async fn main() -> Result<()> {
1636    /// use datafusion_expr::{Explain, ExplainOption};
1637    /// let ctx = SessionContext::new();
1638    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1639    /// let batches = df.limit(0, Some(100))?.explain_with_options(ExplainOption::default().with_verbose(false).with_analyze(false))?.collect().await?;
1640    /// # Ok(())
1641    /// # }
1642    /// ```
1643    pub fn explain_with_options(
1644        self,
1645        explain_option: ExplainOption,
1646    ) -> Result<DataFrame> {
1647        if matches!(self.plan, LogicalPlan::Explain(_)) {
1648            return plan_err!("Nested EXPLAINs are not supported");
1649        }
1650        let plan = LogicalPlanBuilder::from(self.plan)
1651            .explain_option_format(explain_option)?
1652            .build()?;
1653        Ok(DataFrame {
1654            session_state: self.session_state,
1655            plan,
1656            projection_requires_validation: self.projection_requires_validation,
1657        })
1658    }
1659
1660    /// Return a `FunctionRegistry` used to plan udf's calls
1661    ///
1662    /// # Example
1663    /// ```
1664    /// # use datafusion::prelude::*;
1665    /// # use datafusion::error::Result;
1666    /// # #[tokio::main]
1667    /// # async fn main() -> Result<()> {
1668    /// let ctx = SessionContext::new();
1669    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1670    /// let f = df.registry();
1671    /// // use f.udf("name", vec![...]) to use the udf
1672    /// # Ok(())
1673    /// # }
1674    /// ```
1675    pub fn registry(&self) -> &dyn FunctionRegistry {
1676        self.session_state.as_ref()
1677    }
1678
1679    /// Calculate the intersection of two [`DataFrame`]s.  The two [`DataFrame`]s must have exactly the same schema
1680    ///
1681    /// ```
1682    /// # use datafusion::prelude::*;
1683    /// # use datafusion::error::Result;
1684    /// # use datafusion_common::assert_batches_sorted_eq;
1685    /// # #[tokio::main]
1686    /// # async fn main() -> Result<()> {
1687    /// let ctx = SessionContext::new();
1688    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1689    /// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1690    /// let df = df.intersect(d2)?;
1691    /// let expected = vec![
1692    ///     "+---+---+---+",
1693    ///     "| a | b | c |",
1694    ///     "+---+---+---+",
1695    ///     "| 1 | 2 | 3 |",
1696    ///     "+---+---+---+"
1697    /// ];
1698    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1699    /// # Ok(())
1700    /// # }
1701    /// ```
1702    pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
1703        let left_plan = self.plan;
1704        let right_plan = dataframe.plan;
1705        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
1706        Ok(DataFrame {
1707            session_state: self.session_state,
1708            plan,
1709            projection_requires_validation: true,
1710        })
1711    }
1712
1713    /// Calculate the distinct intersection of two [`DataFrame`]s.  The two [`DataFrame`]s must have exactly the same schema
1714    ///
1715    /// ```
1716    /// # use datafusion::prelude::*;
1717    /// # use datafusion::error::Result;
1718    /// # use datafusion_common::assert_batches_sorted_eq;
1719    /// # #[tokio::main]
1720    /// # async fn main() -> Result<()> {
1721    /// let ctx = SessionContext::new();
1722    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1723    /// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1724    /// let df = df.intersect_distinct(d2)?;
1725    /// let expected = vec![
1726    ///     "+---+---+---+",
1727    ///     "| a | b | c |",
1728    ///     "+---+---+---+",
1729    ///     "| 1 | 2 | 3 |",
1730    ///     "+---+---+---+"
1731    /// ];
1732    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
1733    /// # Ok(())
1734    /// # }
1735    /// ```
1736    pub fn intersect_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1737        let left_plan = self.plan;
1738        let right_plan = dataframe.plan;
1739        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, false)?;
1740        Ok(DataFrame {
1741            session_state: self.session_state,
1742            plan,
1743            projection_requires_validation: true,
1744        })
1745    }
1746
1747    /// Calculate the exception of two [`DataFrame`]s.  The two [`DataFrame`]s must have exactly the same schema
1748    ///
1749    /// ```
1750    /// # use datafusion::prelude::*;
1751    /// # use datafusion::error::Result;
1752    /// # use datafusion_common::assert_batches_sorted_eq;
1753    /// # #[tokio::main]
1754    /// # async fn main() -> Result<()> {
1755    /// let ctx = SessionContext::new();
1756    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1757    /// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1758    /// let result = df.except(d2)?;
1759    /// // those columns are not in example.csv, but in example_long.csv
1760    /// let expected = vec![
1761    ///     "+---+---+---+",
1762    ///     "| a | b | c |",
1763    ///     "+---+---+---+",
1764    ///     "| 4 | 5 | 6 |",
1765    ///     "| 7 | 8 | 9 |",
1766    ///     "+---+---+---+"
1767    /// ];
1768    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1769    /// # Ok(())
1770    /// # }
1771    /// ```
1772    pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
1773        let left_plan = self.plan;
1774        let right_plan = dataframe.plan;
1775        let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
1776        Ok(DataFrame {
1777            session_state: self.session_state,
1778            plan,
1779            projection_requires_validation: true,
1780        })
1781    }
1782
1783    /// Calculate the distinct exception of two [`DataFrame`]s.  The two [`DataFrame`]s must have exactly the same schema
1784    ///
1785    /// ```
1786    /// # use datafusion::prelude::*;
1787    /// # use datafusion::error::Result;
1788    /// # use datafusion_common::assert_batches_sorted_eq;
1789    /// # #[tokio::main]
1790    /// # async fn main() -> Result<()> {
1791    /// let ctx = SessionContext::new();
1792    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
1793    /// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1794    /// let result = df.except_distinct(d2)?;
1795    /// // those columns are not in example.csv, but in example_long.csv
1796    /// let expected = vec![
1797    ///     "+---+---+---+",
1798    ///     "| a | b | c |",
1799    ///     "+---+---+---+",
1800    ///     "| 4 | 5 | 6 |",
1801    ///     "| 7 | 8 | 9 |",
1802    ///     "+---+---+---+"
1803    /// ];
1804    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
1805    /// # Ok(())
1806    /// # }
1807    /// ```
1808    pub fn except_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
1809        let left_plan = self.plan;
1810        let right_plan = dataframe.plan;
1811        let plan = LogicalPlanBuilder::except(left_plan, right_plan, false)?;
1812        Ok(DataFrame {
1813            session_state: self.session_state,
1814            plan,
1815            projection_requires_validation: true,
1816        })
1817    }
1818
1819    /// Execute this `DataFrame` and write the results to `table_name`.
1820    ///
1821    /// Returns a single [RecordBatch] containing a single column and
1822    /// row representing the count of total rows written.
1823    ///
1824    /// Unlike most other `DataFrame` methods, this method executes eagerly.
1825    /// Data is written to the table using the [`TableProvider::insert_into`]
1826    /// method. This is the same underlying implementation used by SQL `INSERT
1827    /// INTO` statements.
1828    pub async fn write_table(
1829        self,
1830        table_name: &str,
1831        write_options: DataFrameWriteOptions,
1832    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1833        let plan = if write_options.sort_by.is_empty() {
1834            self.plan
1835        } else {
1836            LogicalPlanBuilder::from(self.plan)
1837                .sort(write_options.sort_by)?
1838                .build()?
1839        };
1840
1841        let table_ref: TableReference = table_name.into();
1842        let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
1843        let target = match table_schema.table(table_ref.table()).await? {
1844            Some(ref provider) => Ok(Arc::clone(provider)),
1845            _ => plan_err!("No table named '{table_name}'"),
1846        }?;
1847
1848        let target = Arc::new(DefaultTableSource::new(target));
1849
1850        let plan = LogicalPlanBuilder::insert_into(
1851            plan,
1852            table_ref,
1853            target,
1854            write_options.insert_op,
1855        )?
1856        .build()?;
1857
1858        DataFrame {
1859            session_state: self.session_state,
1860            plan,
1861            projection_requires_validation: self.projection_requires_validation,
1862        }
1863        .collect()
1864        .await
1865    }
1866
1867    /// Execute the `DataFrame` and write the results to CSV file(s).
1868    ///
1869    /// # Example
1870    /// ```
1871    /// # use datafusion::prelude::*;
1872    /// # use datafusion::error::Result;
1873    /// # use std::fs;
1874    /// # #[tokio::main]
1875    /// # async fn main() -> Result<()> {
1876    /// use datafusion::dataframe::DataFrameWriteOptions;
1877    /// let ctx = SessionContext::new();
1878    /// // Sort the data by column "b" and write it to a new location
1879    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1880    ///   .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1881    ///   .write_csv(
1882    ///     "output.csv",
1883    ///     DataFrameWriteOptions::new(),
1884    ///     None, // can also specify CSV writing options here
1885    /// ).await?;
1886    /// # fs::remove_file("output.csv")?;
1887    /// # Ok(())
1888    /// # }
1889    /// ```
1890    pub async fn write_csv(
1891        self,
1892        path: &str,
1893        options: DataFrameWriteOptions,
1894        writer_options: Option<CsvOptions>,
1895    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1896        if options.insert_op != InsertOp::Append {
1897            return not_impl_err!(
1898                "{} is not implemented for DataFrame::write_csv.",
1899                options.insert_op
1900            );
1901        }
1902
1903        let format = if let Some(csv_opts) = writer_options {
1904            Arc::new(CsvFormatFactory::new_with_options(csv_opts))
1905        } else {
1906            Arc::new(CsvFormatFactory::new())
1907        };
1908
1909        let file_type = format_as_file_type(format);
1910
1911        let plan = if options.sort_by.is_empty() {
1912            self.plan
1913        } else {
1914            LogicalPlanBuilder::from(self.plan)
1915                .sort(options.sort_by)?
1916                .build()?
1917        };
1918
1919        let plan = LogicalPlanBuilder::copy_to(
1920            plan,
1921            path.into(),
1922            file_type,
1923            HashMap::new(),
1924            options.partition_by,
1925        )?
1926        .build()?;
1927
1928        DataFrame {
1929            session_state: self.session_state,
1930            plan,
1931            projection_requires_validation: self.projection_requires_validation,
1932        }
1933        .collect()
1934        .await
1935    }
1936
1937    /// Execute the `DataFrame` and write the results to JSON file(s).
1938    ///
1939    /// # Example
1940    /// ```
1941    /// # use datafusion::prelude::*;
1942    /// # use datafusion::error::Result;
1943    /// # use std::fs;
1944    /// # #[tokio::main]
1945    /// # async fn main() -> Result<()> {
1946    /// use datafusion::dataframe::DataFrameWriteOptions;
1947    /// let ctx = SessionContext::new();
1948    /// // Sort the data by column "b" and write it to a new location
1949    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
1950    ///   .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
1951    ///   .write_json(
1952    ///     "output.json",
1953    ///     DataFrameWriteOptions::new(),
1954    ///     None
1955    /// ).await?;
1956    /// # fs::remove_file("output.json")?;
1957    /// # Ok(())
1958    /// # }
1959    /// ```
1960    pub async fn write_json(
1961        self,
1962        path: &str,
1963        options: DataFrameWriteOptions,
1964        writer_options: Option<JsonOptions>,
1965    ) -> Result<Vec<RecordBatch>, DataFusionError> {
1966        if options.insert_op != InsertOp::Append {
1967            return not_impl_err!(
1968                "{} is not implemented for DataFrame::write_json.",
1969                options.insert_op
1970            );
1971        }
1972
1973        let format = if let Some(json_opts) = writer_options {
1974            Arc::new(JsonFormatFactory::new_with_options(json_opts))
1975        } else {
1976            Arc::new(JsonFormatFactory::new())
1977        };
1978
1979        let file_type = format_as_file_type(format);
1980
1981        let plan = if options.sort_by.is_empty() {
1982            self.plan
1983        } else {
1984            LogicalPlanBuilder::from(self.plan)
1985                .sort(options.sort_by)?
1986                .build()?
1987        };
1988
1989        let plan = LogicalPlanBuilder::copy_to(
1990            plan,
1991            path.into(),
1992            file_type,
1993            Default::default(),
1994            options.partition_by,
1995        )?
1996        .build()?;
1997
1998        DataFrame {
1999            session_state: self.session_state,
2000            plan,
2001            projection_requires_validation: self.projection_requires_validation,
2002        }
2003        .collect()
2004        .await
2005    }
2006
2007    /// Add or replace a column in the DataFrame.
2008    ///
2009    /// # Example
2010    /// ```
2011    /// # use datafusion::prelude::*;
2012    /// # use datafusion::error::Result;
2013    /// # #[tokio::main]
2014    /// # async fn main() -> Result<()> {
2015    /// let ctx = SessionContext::new();
2016    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2017    /// let df = df.with_column("ab_sum", col("a") + col("b"))?;
2018    /// # Ok(())
2019    /// # }
2020    /// ```
2021    pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
2022        let window_func_exprs = find_window_exprs([&expr]);
2023
2024        let (window_fn_str, plan) = if window_func_exprs.is_empty() {
2025            (None, self.plan)
2026        } else {
2027            (
2028                Some(window_func_exprs[0].to_string()),
2029                LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
2030            )
2031        };
2032
2033        let mut col_exists = false;
2034        let new_column = expr.alias(name);
2035        let mut fields: Vec<(Expr, bool)> = plan
2036            .schema()
2037            .iter()
2038            .filter_map(|(qualifier, field)| {
2039                if field.name() == name {
2040                    col_exists = true;
2041                    Some((new_column.clone(), true))
2042                } else {
2043                    let e = col(Column::from((qualifier, field)));
2044                    window_fn_str
2045                        .as_ref()
2046                        .filter(|s| *s == &e.to_string())
2047                        .is_none()
2048                        .then_some((e, self.projection_requires_validation))
2049                }
2050            })
2051            .collect();
2052
2053        if !col_exists {
2054            fields.push((new_column, true));
2055        }
2056
2057        let project_plan = LogicalPlanBuilder::from(plan)
2058            .project_with_validation(fields)?
2059            .build()?;
2060
2061        Ok(DataFrame {
2062            session_state: self.session_state,
2063            plan: project_plan,
2064            projection_requires_validation: false,
2065        })
2066    }
2067
2068    /// Rename one column by applying a new projection. This is a no-op if the column to be
2069    /// renamed does not exist.
2070    ///
2071    /// The method supports case sensitive rename with wrapping column name into one of following symbols (  "  or  '  or  `  )
2072    ///
2073    /// Alternatively setting DataFusion param `datafusion.sql_parser.enable_ident_normalization` to `false` will enable
2074    /// case sensitive rename without need to wrap column name into special symbols
2075    ///
2076    /// # Example
2077    /// ```
2078    /// # use datafusion::prelude::*;
2079    /// # use datafusion::error::Result;
2080    /// # #[tokio::main]
2081    /// # async fn main() -> Result<()> {
2082    /// let ctx = SessionContext::new();
2083    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2084    /// let df = df.with_column_renamed("ab_sum", "total")?;
2085    ///
2086    /// # Ok(())
2087    /// # }
2088    /// ```
2089    pub fn with_column_renamed(
2090        self,
2091        old_name: impl Into<String>,
2092        new_name: &str,
2093    ) -> Result<DataFrame> {
2094        let ident_opts = self
2095            .session_state
2096            .config_options()
2097            .sql_parser
2098            .enable_ident_normalization;
2099        let old_column: Column = if ident_opts {
2100            Column::from_qualified_name(old_name)
2101        } else {
2102            Column::from_qualified_name_ignore_case(old_name)
2103        };
2104
2105        let (qualifier_rename, field_rename) =
2106            match self.plan.schema().qualified_field_from_column(&old_column) {
2107                Ok(qualifier_and_field) => qualifier_and_field,
2108                // no-op if field not found
2109                Err(DataFusionError::SchemaError(e, _))
2110                    if matches!(*e, SchemaError::FieldNotFound { .. }) =>
2111                {
2112                    return Ok(self);
2113                }
2114                Err(err) => return Err(err),
2115            };
2116        let projection = self
2117            .plan
2118            .schema()
2119            .iter()
2120            .map(|(qualifier, field)| {
2121                if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
2122                    (
2123                        col(Column::from((qualifier, field)))
2124                            .alias_qualified(qualifier.cloned(), new_name),
2125                        false,
2126                    )
2127                } else {
2128                    (col(Column::from((qualifier, field))), false)
2129                }
2130            })
2131            .collect::<Vec<_>>();
2132        let project_plan = LogicalPlanBuilder::from(self.plan)
2133            .project_with_validation(projection)?
2134            .build()?;
2135        Ok(DataFrame {
2136            session_state: self.session_state,
2137            plan: project_plan,
2138            projection_requires_validation: false,
2139        })
2140    }
2141
2142    /// Replace all parameters in logical plan with the specified
2143    /// values, in preparation for execution.
2144    ///
2145    /// # Example
2146    ///
2147    /// ```
2148    /// use datafusion::prelude::*;
2149    /// # use datafusion::{error::Result, assert_batches_eq};
2150    /// # #[tokio::main]
2151    /// # async fn main() -> Result<()> {
2152    /// # use datafusion_common::ScalarValue;
2153    /// let ctx = SessionContext::new();
2154    /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
2155    /// let results = ctx
2156    ///   .sql("SELECT a FROM example WHERE b = $1")
2157    ///   .await?
2158    ///    // replace $1 with value 2
2159    ///   .with_param_values(vec![
2160    ///      // value at index 0 --> $1
2161    ///      ScalarValue::from(2i64)
2162    ///    ])?
2163    ///   .collect()
2164    ///   .await?;
2165    /// assert_batches_eq!(
2166    ///  &[
2167    ///    "+---+",
2168    ///    "| a |",
2169    ///    "+---+",
2170    ///    "| 1 |",
2171    ///    "+---+",
2172    ///  ],
2173    ///  &results
2174    /// );
2175    /// // Note you can also provide named parameters
2176    /// let results = ctx
2177    ///   .sql("SELECT a FROM example WHERE b = $my_param")
2178    ///   .await?
2179    ///    // replace $my_param with value 2
2180    ///    // Note you can also use a HashMap as well
2181    ///   .with_param_values(vec![
2182    ///       ("my_param", ScalarValue::from(2i64))
2183    ///    ])?
2184    ///   .collect()
2185    ///   .await?;
2186    /// assert_batches_eq!(
2187    ///  &[
2188    ///    "+---+",
2189    ///    "| a |",
2190    ///    "+---+",
2191    ///    "| 1 |",
2192    ///    "+---+",
2193    ///  ],
2194    ///  &results
2195    /// );
2196    /// # Ok(())
2197    /// # }
2198    /// ```
2199    pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
2200        let plan = self.plan.with_param_values(query_values)?;
2201        Ok(DataFrame {
2202            session_state: self.session_state,
2203            plan,
2204            projection_requires_validation: self.projection_requires_validation,
2205        })
2206    }
2207
2208    /// Cache DataFrame as a memory table.
2209    ///
2210    /// ```
2211    /// # use datafusion::prelude::*;
2212    /// # use datafusion::error::Result;
2213    /// # #[tokio::main]
2214    /// # async fn main() -> Result<()> {
2215    /// let ctx = SessionContext::new();
2216    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2217    /// let df = df.cache().await?;
2218    /// # Ok(())
2219    /// # }
2220    /// ```
2221    pub async fn cache(self) -> Result<DataFrame> {
2222        let context = SessionContext::new_with_state((*self.session_state).clone());
2223        // The schema is consistent with the output
2224        let plan = self.clone().create_physical_plan().await?;
2225        let schema = plan.schema();
2226        let task_ctx = Arc::new(self.task_ctx());
2227        let partitions = collect_partitioned(plan, task_ctx).await?;
2228        let mem_table = MemTable::try_new(schema, partitions)?;
2229        context.read_table(Arc::new(mem_table))
2230    }
2231
2232    /// Apply an alias to the DataFrame.
2233    ///
2234    /// This method replaces the qualifiers of output columns with the given alias.
2235    pub fn alias(self, alias: &str) -> Result<DataFrame> {
2236        let plan = LogicalPlanBuilder::from(self.plan).alias(alias)?.build()?;
2237        Ok(DataFrame {
2238            session_state: self.session_state,
2239            plan,
2240            projection_requires_validation: self.projection_requires_validation,
2241        })
2242    }
2243
2244    /// Fill null values in specified columns with a given value
2245    /// If no columns are specified (empty vector), applies to all columns
2246    /// Only fills if the value can be cast to the column's type
2247    ///
2248    /// # Arguments
2249    /// * `value` - Value to fill nulls with
2250    /// * `columns` - List of column names to fill. If empty, fills all columns.
2251    ///
2252    /// # Example
2253    /// ```
2254    /// # use datafusion::prelude::*;
2255    /// # use datafusion::error::Result;
2256    /// # use datafusion_common::ScalarValue;
2257    /// # #[tokio::main]
2258    /// # async fn main() -> Result<()> {
2259    /// let ctx = SessionContext::new();
2260    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
2261    /// // Fill nulls in only columns "a" and "c":
2262    /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
2263    /// // Fill nulls across all columns:
2264    /// let df = df.fill_null(ScalarValue::from(0), vec![])?;
2265    /// # Ok(())
2266    /// # }
2267    /// ```
2268    pub fn fill_null(
2269        &self,
2270        value: ScalarValue,
2271        columns: Vec<String>,
2272    ) -> Result<DataFrame> {
2273        let cols = if columns.is_empty() {
2274            self.logical_plan()
2275                .schema()
2276                .fields()
2277                .iter()
2278                .map(|f| f.as_ref().clone())
2279                .collect()
2280        } else {
2281            self.find_columns(&columns)?
2282        };
2283
2284        // Create projections for each column
2285        let projections = self
2286            .logical_plan()
2287            .schema()
2288            .fields()
2289            .iter()
2290            .map(|field| {
2291                if cols.contains(field) {
2292                    // Try to cast fill value to column type. If the cast fails, fallback to the original column.
2293                    match value.clone().cast_to(field.data_type()) {
2294                        Ok(fill_value) => Expr::Alias(Alias {
2295                            expr: Box::new(Expr::ScalarFunction(ScalarFunction {
2296                                func: coalesce(),
2297                                args: vec![col(field.name()), lit(fill_value)],
2298                            })),
2299                            relation: None,
2300                            name: field.name().to_string(),
2301                            metadata: None,
2302                        }),
2303                        Err(_) => col(field.name()),
2304                    }
2305                } else {
2306                    col(field.name())
2307                }
2308            })
2309            .collect::<Vec<_>>();
2310
2311        self.clone().select(projections)
2312    }
2313
2314    // Helper to find columns from names
2315    fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
2316        let schema = self.logical_plan().schema();
2317        names
2318            .iter()
2319            .map(|name| {
2320                schema
2321                    .field_with_name(None, name)
2322                    .cloned()
2323                    .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
2324            })
2325            .collect()
2326    }
2327
2328    /// Helper for creating DataFrame.
2329    /// # Example
2330    /// ```
2331    /// use std::sync::Arc;
2332    /// use arrow::array::{ArrayRef, Int32Array, StringArray};
2333    /// use datafusion::prelude::DataFrame;
2334    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
2335    /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
2336    /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
2337    /// // +----+------+,
2338    /// // | id | name |,
2339    /// // +----+------+,
2340    /// // | 1  | foo  |,
2341    /// // | 2  | bar  |,
2342    /// // | 3  | baz  |,
2343    /// // +----+------+,
2344    /// ```
2345    pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result<Self> {
2346        let fields = columns
2347            .iter()
2348            .map(|(name, array)| Field::new(*name, array.data_type().clone(), true))
2349            .collect::<Vec<_>>();
2350
2351        let arrays = columns
2352            .into_iter()
2353            .map(|(_, array)| array)
2354            .collect::<Vec<_>>();
2355
2356        let schema = Arc::new(Schema::new(fields));
2357        let batch = RecordBatch::try_new(schema, arrays)?;
2358        let ctx = SessionContext::new();
2359        let df = ctx.read_batch(batch)?;
2360        Ok(df)
2361    }
2362}
2363
2364/// Macro for creating DataFrame.
2365/// # Example
2366/// ```
2367/// use datafusion::prelude::dataframe;
2368/// # use datafusion::error::Result;
2369/// # #[tokio::main]
2370/// # async fn main() -> Result<()> {
2371/// let df = dataframe!(
2372///    "id" => [1, 2, 3],
2373///    "name" => ["foo", "bar", "baz"]
2374///  )?;
2375/// df.show().await?;
2376/// // +----+------+,
2377/// // | id | name |,
2378/// // +----+------+,
2379/// // | 1  | foo  |,
2380/// // | 2  | bar  |,
2381/// // | 3  | baz  |,
2382/// // +----+------+,
2383/// let df_empty = dataframe!()?; // empty DataFrame
2384/// assert_eq!(df_empty.schema().fields().len(), 0);
2385/// assert_eq!(df_empty.count().await?, 0);
2386/// # Ok(())
2387/// # }
2388/// ```
2389#[macro_export]
2390macro_rules! dataframe {
2391    () => {{
2392        use std::sync::Arc;
2393
2394        use datafusion::prelude::SessionContext;
2395        use datafusion::arrow::array::RecordBatch;
2396        use datafusion::arrow::datatypes::Schema;
2397
2398        let ctx = SessionContext::new();
2399        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
2400        ctx.read_batch(batch)
2401    }};
2402
2403    ($($name:expr => $data:expr),+ $(,)?) => {{
2404        use datafusion::prelude::DataFrame;
2405        use datafusion::common::test_util::IntoArrayRef;
2406
2407        let columns = vec![
2408            $(
2409                ($name, $data.into_array_ref()),
2410            )+
2411        ];
2412
2413        DataFrame::from_columns(columns)
2414    }};
2415}
2416
2417#[derive(Debug)]
2418struct DataFrameTableProvider {
2419    plan: LogicalPlan,
2420}
2421
2422#[async_trait]
2423impl TableProvider for DataFrameTableProvider {
2424    fn as_any(&self) -> &dyn Any {
2425        self
2426    }
2427
2428    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
2429        Some(Cow::Borrowed(&self.plan))
2430    }
2431
2432    fn supports_filters_pushdown(
2433        &self,
2434        filters: &[&Expr],
2435    ) -> Result<Vec<TableProviderFilterPushDown>> {
2436        // A filter is added on the DataFrame when given
2437        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2438    }
2439
2440    fn schema(&self) -> SchemaRef {
2441        let schema: Schema = self.plan.schema().as_ref().into();
2442        Arc::new(schema)
2443    }
2444
2445    fn table_type(&self) -> TableType {
2446        TableType::View
2447    }
2448
2449    async fn scan(
2450        &self,
2451        state: &dyn Session,
2452        projection: Option<&Vec<usize>>,
2453        filters: &[Expr],
2454        limit: Option<usize>,
2455    ) -> Result<Arc<dyn ExecutionPlan>> {
2456        let mut expr = LogicalPlanBuilder::from(self.plan.clone());
2457        // Add filter when given
2458        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
2459        if let Some(filter) = filter {
2460            expr = expr.filter(filter)?
2461        }
2462
2463        if let Some(p) = projection {
2464            expr = expr.select(p.iter().copied())?
2465        }
2466
2467        // add a limit if given
2468        if let Some(l) = limit {
2469            expr = expr.limit(0, Some(l))?
2470        }
2471        let plan = expr.build()?;
2472        state.create_physical_plan(&plan).await
2473    }
2474}
2475
2476// see tests in datafusion/core/tests/dataframe/mod.rs:2816