datafusion/dataframe/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DataFrame`] API for building and executing query plans.

#[cfg(feature = "parquet")]
mod parquet;

use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::format_as_file_type;
use crate::datasource::file_format::json::JsonFormatFactory;
use crate::datasource::{
    provider_as_source, DefaultTableSource, MemTable, TableProvider,
};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
use crate::logical_expr::utils::find_window_exprs;
use crate::logical_expr::{
    col, ident, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
    LogicalPlanBuilderOptions, Partitioning, TableType,
};
use crate::physical_plan::{
    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
    ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::SessionContext;
use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
    exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
    DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions,
};
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{
    case,
    dml::InsertOp,
    expr::{Alias, ScalarFunction},
    is_null, lit,
    utils::COUNT_STAR_EXPANSION,
    ExplainOption, SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions::core::coalesce;
use datafusion_functions_aggregate::expr_fn::{
    avg, count, max, median, min, stddev, sum,
};

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_sql::TableReference;

/// Contains options that control how data is
/// written out from a DataFrame
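///
/// # Example
/// A minimal sketch of the builder-style API (the option values shown are
/// illustrative, not defaults), assuming the usual re-export paths:
/// ```
/// # use datafusion::dataframe::DataFrameWriteOptions;
/// # use datafusion::logical_expr::dml::InsertOp;
/// // overwrite existing data, write a single file, partition by `year`
/// let options = DataFrameWriteOptions::new()
///     .with_insert_operation(InsertOp::Overwrite)
///     .with_single_file_output(true)
///     .with_partition_by(vec!["year".to_string()]);
/// ```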
pub struct DataFrameWriteOptions {
    /// Controls how new data should be written to the table, determining whether
    /// to append, overwrite, or replace existing data.
    insert_op: InsertOp,
    /// Controls if all partitions should be coalesced into a single output file.
    /// Writing is generally slower when set to true.
    single_file_output: bool,
    /// Sets which columns should be used for hive-style partitioned writes by name.
    /// Can be set to empty vec![] for non-partitioned writes.
    partition_by: Vec<String>,
    /// Sets which columns should be used for sorting the output by name.
    /// Can be set to empty vec![] for non-sorted writes.
    sort_by: Vec<SortExpr>,
}

impl DataFrameWriteOptions {
    /// Create a new DataFrameWriteOptions with default values
    pub fn new() -> Self {
        DataFrameWriteOptions {
            insert_op: InsertOp::Append,
            single_file_output: false,
            partition_by: vec![],
            sort_by: vec![],
        }
    }

    /// Set the insert operation
    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
        self.insert_op = insert_op;
        self
    }

    /// Set the single_file_output value to true or false
    pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
        self.single_file_output = single_file_output;
        self
    }

    /// Sets the partition_by columns for output partitioning
    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
        self.partition_by = partition_by;
        self
    }

    /// Sets the sort_by columns for output sorting
    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
        self.sort_by = sort_by;
        self
    }
}

impl Default for DataFrameWriteOptions {
    fn default() -> Self {
        Self::new()
    }
}

/// Represents a logical set of rows with the same named columns.
///
/// Similar to a [Pandas DataFrame] or [Spark DataFrame], a DataFusion DataFrame
/// represents a 2 dimensional table of rows and columns.
///
/// The typical workflow using DataFrames looks like
///
/// 1. Create a DataFrame via methods on [SessionContext], such as [`read_csv`]
///    and [`read_parquet`].
///
/// 2. Build a desired calculation by calling methods such as [`filter`],
///    [`select`], [`aggregate`], and [`limit`]
///
/// 3. Execute into [`RecordBatch`]es by calling [`collect`]
///
/// A `DataFrame` is a wrapper around a [`LogicalPlan`] and the [`SessionState`]
/// required for execution.
///
/// DataFrames are "lazy" in the sense that most methods do not actually compute
/// anything, they just build up a plan. Calling [`collect`] executes the plan
/// using the same DataFusion planning and execution process used to execute SQL
/// and other queries.
///
/// [Pandas DataFrame]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
/// [Spark DataFrame]: https://spark.apache.org/docs/latest/sql-programming-guide.html
/// [`read_csv`]: SessionContext::read_csv
/// [`read_parquet`]: SessionContext::read_parquet
/// [`filter`]: DataFrame::filter
/// [`select`]: DataFrame::select
/// [`aggregate`]: DataFrame::aggregate
/// [`limit`]: DataFrame::limit
/// [`collect`]: DataFrame::collect
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::functions_aggregate::expr_fn::min;
/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// // Read the data from a csv file
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// // create a new dataframe that computes the equivalent of
/// // `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
/// let df = df.filter(col("a").lt_eq(col("b")))?
///     .aggregate(vec![col("a")], vec![min(col("b"))])?
///     .limit(0, Some(100))?;
/// // Perform the actual computation
/// let results = df.collect().await?;
///
/// // Create a new dataframe with in-memory data
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, true),
///     Field::new("name", DataType::Utf8, true),
/// ]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema),
///     vec![
///         Arc::new(Int32Array::from(vec![1, 2, 3])),
///         Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
///     ],
/// )?;
/// let df = ctx.read_batch(batch)?;
/// df.show().await?;
///
/// // Create a new dataframe with in-memory data using macro
/// let df = dataframe!(
///     "id" => [1, 2, 3],
///     "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct DataFrame {
    // Box the (large) SessionState to reduce the size of DataFrame on the stack
    session_state: Box<SessionState>,
    plan: LogicalPlan,
    // Whether projection ops must validate expressions. When set to false, this
    // flag enables an optimization in `with_column` and `with_column_renamed`:
    // the recursive work required to columnize and normalize expressions can be
    // skipped. Since these function calls are often chained or called many
    // times in dataframe operations, this can result in a significant
    // performance gain.
    //
    // This flag may only be set to false when the dataframe function call
    // results in the last operation being a
    // `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
    // `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
    // call. This requirement guarantees that the plan has had all columnization
    // and normalization applied to existing expressions, so only new expressions
    // will require that work. Any operation that updates the plan in any way
    // via anything other than a `project` call must set this to true.
    projection_requires_validation: bool,
}

impl DataFrame {
    /// Create a new `DataFrame` based on an existing `LogicalPlan`
    ///
    /// This is a low-level method and is not typically used by end users. See
    /// [`SessionContext::read_csv`] and other methods for creating a
    /// `DataFrame` from an existing datasource.
    pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self {
        Self {
            session_state: Box::new(session_state),
            plan,
            projection_requires_validation: true,
        }
    }

    /// Creates a logical expression from SQL query text.
    /// The expression is created and processed against the current schema.
    ///
    /// # Example: Parsing SQL queries
    /// ```
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion::prelude::*;
    /// # use datafusion_common::{DFSchema, Result};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// // datafusion will parse numbers as i64 first.
    /// let sql = "a > 1 and b in (1, 10)";
    /// let expected = col("a").gt(lit(1 as i64))
    ///     .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let expr = df.parse_sql_expr(sql)?;
    /// assert_eq!(expected, expr);
    /// # Ok(())
    /// # }
    /// ```
    pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
        let df_schema = self.schema();

        self.session_state.create_logical_expr(sql, df_schema)
    }

    /// Consume the DataFrame and produce a physical plan
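    ///
    /// # Example
    /// A minimal sketch, reusing the CSV sample from the other examples:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // the DataFrame is consumed; the physical plan can then be executed
    /// let physical_plan = df.create_physical_plan().await?;
    /// # Ok(())
    /// # }
    /// ```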
    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
        self.session_state.create_physical_plan(&self.plan).await
    }

    /// Select the specified columns by name, returning a new DataFrame that
    /// contains only those columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.select_columns(&["a", "b"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields = columns
            .iter()
            .map(|name| {
                self.plan
                    .schema()
                    .qualified_field_with_unqualified_name(name)
            })
            .collect::<Result<Vec<_>>>()?;
        let expr: Vec<Expr> = fields
            .into_iter()
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Project an arbitrary list of expression strings into a new `DataFrame`.
    /// Each string is parsed into a logical plan expression.
    ///
    /// The output `DataFrame` has one column for each element in `exprs`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df: DataFrame = df.select_exprs(&["a * b", "c"])?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame> {
        let expr_list = exprs
            .iter()
            .map(|e| self.parse_sql_expr(e))
            .collect::<Result<Vec<_>>>()?;

        self.select(expr_list)
    }

    /// Project arbitrary expressions (like SQL SELECT expressions) into a new
    /// `DataFrame`.
    ///
    /// The output `DataFrame` has one column for each element in `expr_list`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.select(vec![col("a"), col("b") * col("c")])?;
    /// let expected = vec![
    ///     "+---+-----------------------+",
    ///     "| a | ?table?.b * ?table?.c |",
    ///     "+---+-----------------------+",
    ///     "| 1 | 6                     |",
    ///     "+---+-----------------------+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn select(
        self,
        expr_list: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<DataFrame> {
        let expr_list: Vec<SelectExpr> =
            expr_list.into_iter().map(|e| e.into()).collect::<Vec<_>>();

        let expressions = expr_list.iter().filter_map(|e| match e {
            SelectExpr::Expression(expr) => Some(expr),
            _ => None,
        });

        let window_func_exprs = find_window_exprs(expressions);
        let plan = if window_func_exprs.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
        };

        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Returns a new DataFrame containing all columns except the specified columns.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // +----+----+----+
    /// // | a  | b  | c  |
    /// // +----+----+----+
    /// // | 1  | 2  | 3  |
    /// // +----+----+----+
    /// let df = df.drop_columns(&["a"])?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| b | c |",
    ///     "+---+---+",
    ///     "| 2 | 3 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> {
        let fields_to_drop = columns
            .iter()
            .map(|name| {
                self.plan
                    .schema()
                    .qualified_field_with_unqualified_name(name)
            })
            .filter(|r| r.is_ok())
            .collect::<Result<Vec<_>>>()?;
        let expr: Vec<Expr> = self
            .plan
            .schema()
            .fields()
            .into_iter()
            .enumerate()
            .map(|(idx, _)| self.plan.schema().qualified_field(idx))
            .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
            .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
            .collect();
        self.select(expr)
    }

    /// Expand multiple list/struct columns into a set of rows and new columns.
    ///
    /// See also: [`UnnestOptions`] documentation for the behavior of `unnest`
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// // expand into multiple rows if the column holds a json array; flatten
    /// // the field names if the column holds a nested structure
    /// let df = df.unnest_columns(&["b", "c", "d"])?;
    /// let expected = vec![
    ///     "+---+------+-------+-----+-----+",
    ///     "| a | b    | c     | d.e | d.f |",
    ///     "+---+------+-------+-----+-----+",
    ///     "| 1 | 2.0  | false | 1   | 2   |",
    ///     "| 1 | 1.3  | true  | 1   | 2   |",
    ///     "| 1 | -6.1 |       | 1   | 2   |",
    ///     "| 2 | 3.0  | false |     |     |",
    ///     "| 2 | 2.3  | true  |     |     |",
    ///     "| 2 | -7.1 |       |     |     |",
    ///     "+---+------+-------+-----+-----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame> {
        self.unnest_columns_with_options(columns, UnnestOptions::new())
    }

    /// Expand multiple list columns into a set of rows, with
    /// behavior controlled by [`UnnestOptions`].
    ///
    /// Please see the documentation on [`UnnestOptions`] for more
    /// details about the meaning of unnest.
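    ///
    /// # Example
    /// A minimal sketch, assuming the same `unnest.json` sample as above;
    /// `with_preserve_nulls(true)` keeps a row (with nulls) for empty or null
    /// lists instead of dropping it:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::UnnestOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
    /// let options = UnnestOptions::new().with_preserve_nulls(true);
    /// let df = df.unnest_columns_with_options(&["b"], options)?;
    /// # Ok(())
    /// # }
    /// ```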
    pub fn unnest_columns_with_options(
        self,
        columns: &[&str],
        options: UnnestOptions,
    ) -> Result<DataFrame> {
        let columns = columns.iter().map(|c| Column::from(*c)).collect();
        let plan = LogicalPlanBuilder::from(self.plan)
            .unnest_columns_with_options(columns, options)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a DataFrame with only rows for which `predicate` evaluates to
    /// `true`.
    ///
    /// Rows for which `predicate` evaluates to `false` or `null`
    /// are filtered out.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.filter(col("a").lt_eq(col("b")))?;
    /// // all rows where a <= b are returned
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn filter(self, predicate: Expr) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .filter(predicate)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` that aggregates the rows of the current
    /// `DataFrame`, first optionally grouping by the given expressions.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_aggregate::expr_fn::min;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    ///
    /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
    /// let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
    /// let expected1 = vec![
    ///     "+---+----------------+",
    ///     "| a | min(?table?.b) |",
    ///     "+---+----------------+",
    ///     "| 1 | 2              |",
    ///     "| 4 | 5              |",
    ///     "| 7 | 8              |",
    ///     "+---+----------------+"
    /// ];
    /// assert_batches_sorted_eq!(expected1, &df1.collect().await?);
    /// // The following use is the equivalent of "SELECT MIN(b)"
    /// let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
    /// let expected2 = vec![
    ///     "+----------------+",
    ///     "| min(?table?.b) |",
    ///     "+----------------+",
    ///     "| 2              |",
    ///     "+----------------+"
    /// ];
    /// # assert_batches_sorted_eq!(expected2, &df2.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn aggregate(
        self,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<DataFrame> {
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
        let aggr_expr_len = aggr_expr.len();
        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan = LogicalPlanBuilder::from(self.plan)
            .with_options(options)
            .aggregate(group_expr, aggr_expr)?
            .build()?;
        let plan = if is_grouping_set {
            let grouping_id_pos = plan.schema().fields().len() - 1 - aggr_expr_len;
            // For grouping sets we do a project to not expose the internal grouping id
            let exprs = plan
                .schema()
                .columns()
                .into_iter()
                .enumerate()
                .filter(|(idx, _)| *idx != grouping_id_pos)
                .map(|(_, column)| Expr::Column(column))
                .collect::<Vec<_>>();
            LogicalPlanBuilder::from(plan).project(exprs)?.build()?
        } else {
            plan
        };
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: !is_grouping_set,
        })
    }

    /// Return a new DataFrame that adds the result of evaluating one or more
    /// window functions ([`Expr::WindowFunction`]) to the existing columns
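    ///
    /// # Example
    /// A minimal sketch, assuming the `row_number` window function is
    /// available at `datafusion::functions_window::expr_fn`:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::functions_window::expr_fn::row_number;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // add a `row_number()` column alongside the existing columns
    /// let df = df.window(vec![row_number().alias("rn")])?;
    /// # Ok(())
    /// # }
    /// ```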
    pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .window(window_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Returns a new `DataFrame` with a limited number of rows.
    ///
    /// # Arguments
    /// `skip` - Number of rows to skip before fetching any row
    /// `fetch` - Maximum number of rows to return, after skipping `skip` rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.limit(1, Some(2))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .limit(skip, fetch)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone();
    /// let df = df.union(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names, preserving duplicate rows.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?.with_column("d", lit("77"))?;
    /// let df = df.union_by_name(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+----+",
    ///     "| a | b | c | d  |",
    ///     "+---+---+---+----+",
    ///     "| 1 | 2 | 3 |    |",
    ///     "| 1 | 2 | 3 | 77 |",
    ///     "+---+---+---+----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct union of two [`DataFrame`]s.
    ///
    /// The two [`DataFrame`]s must have exactly the same schema. Any duplicate
    /// rows are discarded.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone();
    /// let df = df.union_distinct(d2)?;
    /// // d2 is a duplicate of df, so those rows are deduplicated
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the union of two [`DataFrame`]s using column names with all duplicated rows removed.
    ///
    /// The two [`DataFrame`]s are combined using column names rather than position,
    /// filling missing columns with null.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = df.clone().select_columns(&["b", "c", "a"])?;
    /// let df = df.union_by_name_distinct(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn union_by_name_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .union_by_name_distinct(dataframe.plan)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` with all duplicated rows removed.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.distinct()?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct(self) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` with duplicated rows removed: one row is kept
    /// for each distinct value of the `DISTINCT ON` expressions (`on_expr`),
    /// chosen according to the optional sort expressions, and projected to
    /// `select_expr`.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///     // Return a single row (a, b) for each distinct value of a
    ///     .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
    /// let expected = vec![
    ///     "+---+---+",
    ///     "| a | b |",
    ///     "+---+---+",
    ///     "| 1 | 2 |",
    ///     "+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .distinct_on(on_expr, select_expr, sort_expr)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return a new `DataFrame` containing summary statistics for this
    /// `DataFrame`.
    ///
    /// Only numeric datatypes are summarized at the moment; nulls are returned
    /// for non-numeric datatypes. The output format is modeled after pandas.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use arrow::util::pretty;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
    /// let stat = df.describe().await?;
    /// # // some output columns are ignored
    /// let expected = vec![
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| describe   | c_custkey          | c_name             | c_address                          | c_nationkey        | c_phone         | c_acctbal          | c_mktsegment | c_comment                                                                                                |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
    ///     "| count      | 9.0                | 9                  | 9                                  | 9.0                | 9               | 9.0                | 9            | 9                                                                                                        |",
    ///     "| max        | 10.0               | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS         | 20.0               | 30-114-968-4951 | 9561.95            | MACHINERY    | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
    ///     "| mean       | 6.0                | null               | null                               | 9.88888888888889   | null            | 5153.2155555555555 | null         | null                                                                                                     |",
    ///     "| median     | 6.0                | null               | null                               | 8.0                | null            | 6819.74            | null         | null                                                                                                     |",
    ///     "| min        | 2.0                | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0                | 11-719-748-3364 | 121.65             | AUTOMOBILE   | deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov    |",
    ///     "| null_count | 0.0                | 0                  | 0                                  | 0.0                | 0               | 0.0                | 0            | 0                                                                                                        |",
    ///     "| std        | 2.7386127875258306 | null               | null                               | 7.2188026092359046 | null            | 3522.169804254585  | null         | null                                                                                                     |",
    ///     "+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"
    /// ];
    /// assert_batches_sorted_eq!(expected, &stat.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn describe(self) -> Result<Self> {
        // The aggregate functions currently supported
        let supported_describe_functions =
            vec!["count", "null_count", "mean", "std", "min", "max", "median"];

        let original_schema_fields = self.schema().fields().iter();

        // Define the describe column
        let mut describe_schemas = vec![Field::new("describe", DataType::Utf8, false)];
        describe_schemas.extend(original_schema_fields.clone().map(|field| {
            if field.data_type().is_numeric() {
                Field::new(field.name(), DataType::Float64, true)
            } else {
                Field::new(field.name(), DataType::Utf8, true)
            }
        }));

        // Collect record batches, one aggregation per describe function
        let describe_record_batch = vec![
            // count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| count(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // null_count aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .map(|f| {
                        sum(case(is_null(ident(f.name())))
                            .when(lit(true), lit(1))
                            .otherwise(lit(0))
                            .unwrap())
                        .alias(f.name())
                    })
                    .collect::<Vec<_>>(),
            ),
            // mean aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| avg(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // std aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| stddev(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // min aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| min(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // max aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| {
                        !matches!(f.data_type(), DataType::Binary | DataType::Boolean)
                    })
                    .map(|f| max(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
            // median aggregation
            self.clone().aggregate(
                vec![],
                original_schema_fields
                    .clone()
                    .filter(|f| f.data_type().is_numeric())
                    .map(|f| median(ident(f.name())).alias(f.name()))
                    .collect::<Vec<_>>(),
            ),
        ];

        // First column holds the function names
        let mut array_ref_vec: Vec<ArrayRef> = vec![Arc::new(StringArray::from(
            supported_describe_functions.clone(),
        ))];
        for field in original_schema_fields {
            let mut array_datas = vec![];
            for result in describe_record_batch.iter() {
                let array_ref = match result {
                    Ok(df) => {
                        let batches = df.clone().collect().await;
                        match batches {
                            Ok(batches)
                                if batches.len() == 1
                                    && batches[0]
                                        .column_by_name(field.name())
                                        .is_some() =>
                            {
                                let column =
                                    batches[0].column_by_name(field.name()).unwrap();

                                if column.data_type().is_null() {
                                    Arc::new(StringArray::from(vec!["null"]))
                                } else if field.data_type().is_numeric() {
                                    cast(column, &DataType::Float64)?
                                } else {
                                    cast(column, &DataType::Utf8)?
                                }
                            }
                            _ => Arc::new(StringArray::from(vec!["null"])),
                        }
                    }
                    // Handle the planning error raised when an aggregation has no
                    // expressions (e.g. the DataFrame has only boolean/binary
                    // columns); all other errors are returned to the caller
                    Err(err)
                        if err.to_string().contains(
                            "Error during planning: \
                            Aggregate requires at least one grouping \
                            or aggregate expression",
                        ) =>
                    {
                        Arc::new(StringArray::from(vec!["null"]))
                    }
                    Err(e) => return exec_err!("{}", e),
                };
                array_datas.push(array_ref);
            }
            array_ref_vec.push(concat(
                array_datas
                    .iter()
                    .map(|af| af.as_ref())
                    .collect::<Vec<_>>()
                    .as_slice(),
            )?);
        }

        let describe_record_batch =
            RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?;

        let provider = MemTable::try_new(
            describe_record_batch.schema(),
            vec![vec![describe_record_batch]],
        )?;

        let plan = LogicalPlanBuilder::scan(
            UNNAMED_TABLE,
            provider_as_source(Arc::new(provider)),
            None,
        )?
        .build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Apply a sort to the DataFrame using the provided expressions, each in
    /// the default direction (ascending, nulls last)
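    ///
    /// # Example
    /// A minimal sketch, equivalent to `ORDER BY a ASC NULLS LAST`:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.sort_by(vec![col("a")])?;
    /// # Ok(())
    /// # }
    /// ```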
    pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
        self.sort(
            expr.into_iter()
                .map(|e| e.sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

    /// Sort the DataFrame by the specified sorting expressions.
    ///
    /// Note that any expression can be turned into
    /// a sort expression by calling its [sort](Expr::sort) method.
    ///
    /// # Example
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.sort(vec![
    ///     col("a").sort(false, true), // a DESC, nulls first
    ///     col("b").sort(true, false), // b ASC, nulls last
    /// ])?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+",
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using explicitly specified
    /// columns and an optional filter expression.
    ///
    /// See [`join_on`](Self::join_on) for a more concise way to specify the
    /// join condition. Since DataFusion will automatically identify and
    /// optimize equality predicates there is no performance difference between
    /// this function and `join_on`
    ///
    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
    /// example below), which are then combined with the optional `filter`
    /// expression. If `left_cols` and `right_cols` contain ambiguous column
    /// references, they will be disambiguated by prioritizing the left relation
    /// for `left_cols` and the right relation for `right_cols`.
    ///
    /// Note that in case of outer join, the `filter` is applied to only matched rows.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2")])?;
    /// // Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
    /// // finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
    /// let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "| 1 | 2 | 3 | 1  | 2  | 3  |",
    ///     "+---+---+---+----+----+----+"
    /// ];
    /// assert_batches_sorted_eq!(expected, &join.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join(
        self,
        right: DataFrame,
        join_type: JoinType,
        left_cols: &[&str],
        right_cols: &[&str],
        filter: Option<Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join(
                right.plan,
                join_type,
                (left_cols.to_vec(), right_cols.to_vec()),
                filter,
            )?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Join this `DataFrame` with another `DataFrame` using the specified
    /// expressions.
    ///
    /// Note that DataFusion automatically optimizes joins, including
    /// identifying and optimizing equality predicates.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let left = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?;
    /// let right = ctx
    ///     .read_csv("tests/data/example.csv", CsvReadOptions::new())
    ///     .await?
    ///     .select(vec![
    ///         col("a").alias("a2"),
    ///         col("b").alias("b2"),
    ///         col("c").alias("c2"),
    ///     ])?;
    ///
    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
    /// // finding all pairs of rows from `left` and `right`
    /// // where `a != a2` and `b != b2`.
    /// let join_on = left.join_on(
    ///     right,
    ///     JoinType::Inner,
    ///     [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
    /// )?;
    /// let expected = vec![
    ///     "+---+---+---+----+----+----+",
    ///     "| a | b | c | a2 | b2 | c2 |",
    ///     "+---+---+---+----+----+----+",
    ///     "+---+---+---+----+----+----+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &join_on.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn join_on(
        self,
        right: DataFrame,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .join_on(right.plan, join_type, on_exprs)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Repartition a DataFrame based on a logical partitioning scheme.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df1.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan)
            .repartition(partitioning_scheme)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Return the total number of rows in this `DataFrame`.
    ///
    /// Note that this method will actually run a plan to calculate the count,
    /// which may be slow for large or complicated DataFrames.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let count = df.count().await?; // 1
    /// # assert_eq!(count, 1);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn count(self) -> Result<usize> {
        let rows = self
            .aggregate(
                vec![],
                vec![count(Expr::Literal(COUNT_STAR_EXPANSION, None))],
            )?
            .collect()
            .await?;
        let len = *rows
            .first()
            .and_then(|r| r.columns().first())
            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
            .and_then(|a| a.values().first())
            .ok_or(DataFusionError::Internal(
                "Unexpected output when collecting for count()".to_string(),
            ))? as usize;
        Ok(len)
    }

    /// Execute this `DataFrame` and buffer all resulting `RecordBatch`es into memory.
    ///
    /// Prior to calling `collect`, modifying a DataFrame simply updates a plan
    /// (no actual computation is performed). `collect` triggers the computation.
    ///
    /// See [`Self::execute_stream`] to execute a DataFrame without buffering.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.collect().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect(self) -> Result<Vec<RecordBatch>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect(plan, task_ctx).await
    }

    /// Execute the `DataFrame` and print the results to the console.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// df.show().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn show(self) -> Result<()> {
        println!("{}", self.to_string().await?);
        Ok(())
    }

    /// Execute the `DataFrame` and return a string representation of the results.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::execution::SessionStateBuilder;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let cfg = SessionConfig::new()
    ///     .set_str("datafusion.format.null", "no-value");
    /// let session_state = SessionStateBuilder::new()
    ///     .with_config(cfg)
    ///     .with_default_features()
    ///     .build();
    /// let ctx = SessionContext::new_with_state(session_state);
    /// let df = ctx.sql("select null as 'null-column'").await?;
    /// let result = df.to_string().await?;
    /// assert_eq!(result,
    /// "+-------------+
    /// | null-column |
    /// +-------------+
    /// | no-value    |
    /// +-------------+"
    /// );
    /// # Ok(())
    /// # }
    /// ```
    pub async fn to_string(self) -> Result<String> {
        let options = self.session_state.config().options().format.clone();
        let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;

        let results = self.collect().await?;
        Ok(
            pretty::pretty_format_batches_with_options(&results, &arrow_options)?
                .to_string(),
        )
    }

    /// Execute the `DataFrame` and print only the first `num` rows of the
    /// result to the console.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// df.show_limit(10).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn show_limit(self, num: usize) -> Result<()> {
        let results = self.limit(0, Some(num))?.collect().await?;
        Ok(pretty::print_batches(&results)?)
    }

    /// Return a new [`TaskContext`] which can be used to execute this DataFrame
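    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // the task context snapshots the session's state for execution
    /// let task_ctx = df.task_ctx();
    /// # Ok(())
    /// # }
    /// ```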
    pub fn task_ctx(&self) -> TaskContext {
        TaskContext::from(self.session_state.as_ref())
    }

    /// Executes this DataFrame and returns a stream over a single partition
    ///
    /// See [Self::collect] to buffer the `RecordBatch`es in memory.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let stream = df.execute_stream().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Aborting Execution
    ///
    /// Dropping the stream will abort the execution of the query, and free up
    /// any allocated resources
    pub async fn execute_stream(self) -> Result<SendableRecordBatchStream> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        execute_stream(plan, task_ctx)
    }

    /// Executes this DataFrame and collects all results into a vector of
    /// vectors of `RecordBatch`es, maintaining the input partitioning.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.collect_partitioned().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        collect_partitioned(plan, task_ctx).await
    }

    /// Executes this DataFrame and returns one stream per partition.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.execute_stream_partitioned().await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Aborting Execution
    ///
    /// Dropping the stream will abort the execution of the query, and free up
    /// any allocated resources
    pub async fn execute_stream_partitioned(
        self,
    ) -> Result<Vec<SendableRecordBatchStream>> {
        let task_ctx = Arc::new(self.task_ctx());
        let plan = self.create_physical_plan().await?;
        execute_stream_partitioned(plan, task_ctx)
    }

    /// Returns the `DFSchema` describing the output of this DataFrame.
    ///
    /// The output `DFSchema` contains information on the name, data type, and
    /// nullability for each column.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let schema = df.schema();
    /// # Ok(())
    /// # }
    /// ```
    pub fn schema(&self) -> &DFSchema {
        self.plan.schema()
    }

    /// Return a reference to the unoptimized [`LogicalPlan`] that comprises
    /// this DataFrame.
    ///
    /// See [`Self::into_unoptimized_plan`] for more details.
    pub fn logical_plan(&self) -> &LogicalPlan {
        &self.plan
    }

    /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
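    ///
    /// # Example
    /// A minimal sketch:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // take ownership of the plan and the session state snapshot
    /// let (state, plan) = df.into_parts();
    /// # Ok(())
    /// # }
    /// ```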
    pub fn into_parts(self) -> (SessionState, LogicalPlan) {
        (*self.session_state, self.plan)
    }

    /// Return the [`LogicalPlan`] represented by this DataFrame without running
    /// any optimizers
    ///
    /// Note: This method should not be used outside testing, as it loses the
    /// snapshot of the [`SessionState`] attached to this [`DataFrame`] and
    /// consequently subsequent operations may take place against a different
    /// state (e.g. a different value of `now()`)
    ///
    /// See [`Self::into_parts`] to retrieve the owned [`LogicalPlan`] and
    /// corresponding [`SessionState`].
    pub fn into_unoptimized_plan(self) -> LogicalPlan {
        self.plan
    }

    /// Return the optimized [`LogicalPlan`] represented by this DataFrame.
    ///
    /// Note: This method should not be used outside testing -- see
    /// [`Self::into_unoptimized_plan`] for more details.
    pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
        // Optimize the plan first for better UX
        self.session_state.optimize(&self.plan)
    }

    /// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
    /// as a table view using [`SessionContext::register_table`].
    ///
    /// Note: This discards the [`SessionState`] associated with this
    /// [`DataFrame`] in favour of the one passed to [`TableProvider::scan`]
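    ///
    /// # Example
    /// A minimal sketch that registers the DataFrame as a view and queries it
    /// with SQL (the view name `example_view` is illustrative):
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// ctx.register_table("example_view", df.into_view())?;
    /// let df = ctx.sql("SELECT a FROM example_view").await?;
    /// # Ok(())
    /// # }
    /// ```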
    pub fn into_view(self) -> Arc<dyn TableProvider> {
        Arc::new(DataFrameTableProvider { plan: self.plan })
    }

    /// Return a DataFrame with the explanation of its plan so far.
    ///
    /// If `analyze` is specified, the plan is run and metrics are reported.
    /// If `verbose` is true, additional details are printed out.
    /// The default format is the Indent format.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
        // Set the default format to Indent to keep the previous behavior
        let opts = ExplainOption::default()
            .with_verbose(verbose)
            .with_analyze(analyze);
        self.explain_with_options(opts)
    }

1627 /// Return a DataFrame with the explanation of its plan so far.
1628 ///
1629 /// `opt` is used to specify the options for the explain operation.
1630 /// Details of the options can be found in [`ExplainOption`].
1631 /// ```
1632 /// # use datafusion::prelude::*;
1633 /// # use datafusion::error::Result;
1634 /// # #[tokio::main]
1635 /// # async fn main() -> Result<()> {
1636 /// use datafusion_expr::{Explain, ExplainOption};
1637 /// let ctx = SessionContext::new();
1638 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1639 /// let batches = df.limit(0, Some(100))?.explain_with_options(ExplainOption::default().with_verbose(false).with_analyze(false))?.collect().await?;
1640 /// # Ok(())
1641 /// # }
1642 /// ```
1643 pub fn explain_with_options(
1644 self,
1645 explain_option: ExplainOption,
1646 ) -> Result<DataFrame> {
1647 if matches!(self.plan, LogicalPlan::Explain(_)) {
1648 return plan_err!("Nested EXPLAINs are not supported");
1649 }
1650 let plan = LogicalPlanBuilder::from(self.plan)
1651 .explain_option_format(explain_option)?
1652 .build()?;
1653 Ok(DataFrame {
1654 session_state: self.session_state,
1655 plan,
1656 projection_requires_validation: self.projection_requires_validation,
1657 })
1658 }
1659
1660 /// Return a `FunctionRegistry` used to plan udf's calls
1661 ///
1662 /// # Example
1663 /// ```
1664 /// # use datafusion::prelude::*;
1665 /// # use datafusion::error::Result;
1666 /// # #[tokio::main]
1667 /// # async fn main() -> Result<()> {
1668 /// let ctx = SessionContext::new();
1669 /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
1670 /// let f = df.registry();
1671 /// // use f.udf("name", vec![...]) to use the udf
1672 /// # Ok(())
1673 /// # }
1674 /// ```
1675 pub fn registry(&self) -> &dyn FunctionRegistry {
1676 self.session_state.as_ref()
1677 }

    /// Calculate the intersection of two [`DataFrame`]s; duplicate rows are
    /// preserved. The two [`DataFrame`]s must have exactly the same schema.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.intersect(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame> {
        let left_plan = self.plan;
        let right_plan = dataframe.plan;
        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct intersection of two [`DataFrame`]s; duplicate
    /// rows are eliminated. The two [`DataFrame`]s must have exactly the same
    /// schema.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let df = df.intersect_distinct(d2)?;
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 1 | 2 | 3 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &df.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn intersect_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let left_plan = self.plan;
        let right_plan = dataframe.plan;
        let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, false)?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the exception (set difference) of two [`DataFrame`]s;
    /// duplicate rows are preserved. The two [`DataFrame`]s must have exactly
    /// the same schema.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let result = df.except(d2)?;
    /// // these rows are in example_long.csv, but not in example.csv
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn except(self, dataframe: DataFrame) -> Result<DataFrame> {
        let left_plan = self.plan;
        let right_plan = dataframe.plan;
        let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Calculate the distinct exception (set difference) of two
    /// [`DataFrame`]s; duplicate rows are eliminated. The two [`DataFrame`]s
    /// must have exactly the same schema.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::assert_batches_sorted_eq;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
    /// let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let result = df.except_distinct(d2)?;
    /// // these rows are in example_long.csv, but not in example.csv
    /// let expected = vec![
    ///     "+---+---+---+",
    ///     "| a | b | c |",
    ///     "+---+---+---+",
    ///     "| 4 | 5 | 6 |",
    ///     "| 7 | 8 | 9 |",
    ///     "+---+---+---+"
    /// ];
    /// # assert_batches_sorted_eq!(expected, &result.collect().await?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn except_distinct(self, dataframe: DataFrame) -> Result<DataFrame> {
        let left_plan = self.plan;
        let right_plan = dataframe.plan;
        let plan = LogicalPlanBuilder::except(left_plan, right_plan, false)?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: true,
        })
    }

    /// Execute this `DataFrame` and write the results to `table_name`.
    ///
    /// Returns a single [RecordBatch] containing a single column and
    /// row representing the count of total rows written.
    ///
    /// Unlike most other `DataFrame` methods, this method executes eagerly.
    /// Data is written to the table using the [`TableProvider::insert_into`]
    /// method. This is the same underlying implementation used by SQL `INSERT
    /// INTO` statements.
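    ///
    /// # Example
    /// A minimal sketch, marked `no_run` because it assumes a writable table
    /// named "target" has already been registered with the context:
    /// ```no_run
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion::dataframe::DataFrameWriteOptions;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // appends the rows of `df` to the assumed pre-registered table "target"
    /// df.write_table("target", DataFrameWriteOptions::new()).await?;
    /// # Ok(())
    /// # }
    /// ```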
    pub async fn write_table(
        self,
        table_name: &str,
        write_options: DataFrameWriteOptions,
    ) -> Result<Vec<RecordBatch>, DataFusionError> {
        let plan = if write_options.sort_by.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::from(self.plan)
                .sort(write_options.sort_by)?
                .build()?
        };

        let table_ref: TableReference = table_name.into();
        let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
        let target = match table_schema.table(table_ref.table()).await? {
            Some(ref provider) => Ok(Arc::clone(provider)),
            _ => plan_err!("No table named '{table_name}'"),
        }?;

        let target = Arc::new(DefaultTableSource::new(target));

        let plan = LogicalPlanBuilder::insert_into(
            plan,
            table_ref,
            target,
            write_options.insert_op,
        )?
        .build()?;

        DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        }
        .collect()
        .await
    }

    /// Execute the `DataFrame` and write the results to CSV file(s).
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use std::fs;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// use datafusion::dataframe::DataFrameWriteOptions;
    /// let ctx = SessionContext::new();
    /// // Sort the data by column "b" and write it to a new location
    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///     .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
    ///     .write_csv(
    ///         "output.csv",
    ///         DataFrameWriteOptions::new(),
    ///         None, // can also specify CSV writing options here
    ///     ).await?;
    /// # fs::remove_file("output.csv")?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn write_csv(
        self,
        path: &str,
        options: DataFrameWriteOptions,
        writer_options: Option<CsvOptions>,
    ) -> Result<Vec<RecordBatch>, DataFusionError> {
        if options.insert_op != InsertOp::Append {
            return not_impl_err!(
                "{} is not implemented for DataFrame::write_csv.",
                options.insert_op
            );
        }

        let format = if let Some(csv_opts) = writer_options {
            Arc::new(CsvFormatFactory::new_with_options(csv_opts))
        } else {
            Arc::new(CsvFormatFactory::new())
        };

        let file_type = format_as_file_type(format);

        let plan = if options.sort_by.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::from(self.plan)
                .sort(options.sort_by)?
                .build()?
        };

        let plan = LogicalPlanBuilder::copy_to(
            plan,
            path.into(),
            file_type,
            HashMap::new(),
            options.partition_by,
        )?
        .build()?;

        DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        }
        .collect()
        .await
    }

    /// Execute the `DataFrame` and write the results to JSON file(s).
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use std::fs;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// use datafusion::dataframe::DataFrameWriteOptions;
    /// let ctx = SessionContext::new();
    /// // Sort the data by column "b" and write it to a new location
    /// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
    ///     .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
    ///     .write_json(
    ///         "output.json",
    ///         DataFrameWriteOptions::new(),
    ///         None
    ///     ).await?;
    /// # fs::remove_file("output.json")?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn write_json(
        self,
        path: &str,
        options: DataFrameWriteOptions,
        writer_options: Option<JsonOptions>,
    ) -> Result<Vec<RecordBatch>, DataFusionError> {
        if options.insert_op != InsertOp::Append {
            return not_impl_err!(
                "{} is not implemented for DataFrame::write_json.",
                options.insert_op
            );
        }

        let format = if let Some(json_opts) = writer_options {
            Arc::new(JsonFormatFactory::new_with_options(json_opts))
        } else {
            Arc::new(JsonFormatFactory::new())
        };

        let file_type = format_as_file_type(format);

        let plan = if options.sort_by.is_empty() {
            self.plan
        } else {
            LogicalPlanBuilder::from(self.plan)
                .sort(options.sort_by)?
                .build()?
        };

        let plan = LogicalPlanBuilder::copy_to(
            plan,
            path.into(),
            file_type,
            Default::default(),
            options.partition_by,
        )?
        .build()?;

        DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        }
        .collect()
        .await
    }

    /// Add or replace a column in the DataFrame.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.with_column("ab_sum", col("a") + col("b"))?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
        let window_func_exprs = find_window_exprs([&expr]);

        let (window_fn_str, plan) = if window_func_exprs.is_empty() {
            (None, self.plan)
        } else {
            (
                Some(window_func_exprs[0].to_string()),
                LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
            )
        };

        let mut col_exists = false;
        let new_column = expr.alias(name);
        // Rebuild the projection: replace an existing column with the same
        // name, skip the bare window expression (it is re-added under its
        // alias), and keep all other columns unchanged
        let mut fields: Vec<(Expr, bool)> = plan
            .schema()
            .iter()
            .filter_map(|(qualifier, field)| {
                if field.name() == name {
                    col_exists = true;
                    Some((new_column.clone(), true))
                } else {
                    let e = col(Column::from((qualifier, field)));
                    window_fn_str
                        .as_ref()
                        .filter(|s| *s == &e.to_string())
                        .is_none()
                        .then_some((e, self.projection_requires_validation))
                }
            })
            .collect();

        if !col_exists {
            fields.push((new_column, true));
        }

        let project_plan = LogicalPlanBuilder::from(plan)
            .project_with_validation(fields)?
            .build()?;

        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Rename one column by applying a new projection. This is a no-op if the
    /// column to be renamed does not exist.
    ///
    /// The method supports case-sensitive renaming by wrapping the column name
    /// in one of the following quote characters: `"`, `'`, or `` ` ``.
    ///
    /// Alternatively, setting the DataFusion configuration parameter
    /// `datafusion.sql_parser.enable_ident_normalization` to `false` enables
    /// case-sensitive renaming without wrapping the column name in quote
    /// characters.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.with_column_renamed("ab_sum", "total")?;
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_column_renamed(
        self,
        old_name: impl Into<String>,
        new_name: &str,
    ) -> Result<DataFrame> {
        let ident_opts = self
            .session_state
            .config_options()
            .sql_parser
            .enable_ident_normalization;
        let old_column: Column = if ident_opts {
            Column::from_qualified_name(old_name)
        } else {
            Column::from_qualified_name_ignore_case(old_name)
        };

        let (qualifier_rename, field_rename) =
            match self.plan.schema().qualified_field_from_column(&old_column) {
                Ok(qualifier_and_field) => qualifier_and_field,
                // no-op if field not found
                Err(DataFusionError::SchemaError(e, _))
                    if matches!(*e, SchemaError::FieldNotFound { .. }) =>
                {
                    return Ok(self);
                }
                Err(err) => return Err(err),
            };
        let projection = self
            .plan
            .schema()
            .iter()
            .map(|(qualifier, field)| {
                if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
                    (
                        col(Column::from((qualifier, field)))
                            .alias_qualified(qualifier.cloned(), new_name),
                        false,
                    )
                } else {
                    (col(Column::from((qualifier, field))), false)
                }
            })
            .collect::<Vec<_>>();
        let project_plan = LogicalPlanBuilder::from(self.plan)
            .project_with_validation(projection)?
            .build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan: project_plan,
            projection_requires_validation: false,
        })
    }

    /// Replace all parameters in the logical plan with the specified
    /// values, in preparation for execution.
    ///
    /// # Example
    ///
    /// ```
    /// use datafusion::prelude::*;
    /// # use datafusion::{error::Result, assert_batches_eq};
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # use datafusion_common::ScalarValue;
    /// let ctx = SessionContext::new();
    /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let results = ctx
    ///     .sql("SELECT a FROM example WHERE b = $1")
    ///     .await?
    ///     // replace $1 with value 2
    ///     .with_param_values(vec![
    ///         // value at index 0 --> $1
    ///         ScalarValue::from(2i64)
    ///     ])?
    ///     .collect()
    ///     .await?;
    /// assert_batches_eq!(
    ///     &[
    ///         "+---+",
    ///         "| a |",
    ///         "+---+",
    ///         "| 1 |",
    ///         "+---+",
    ///     ],
    ///     &results
    /// );
    /// // Note you can also provide named parameters
    /// let results = ctx
    ///     .sql("SELECT a FROM example WHERE b = $my_param")
    ///     .await?
    ///     // replace $my_param with value 2
    ///     // Note you can also use a HashMap as well
    ///     .with_param_values(vec![
    ///         ("my_param", ScalarValue::from(2i64))
    ///     ])?
    ///     .collect()
    ///     .await?;
    /// assert_batches_eq!(
    ///     &[
    ///         "+---+",
    ///         "| a |",
    ///         "+---+",
    ///         "| 1 |",
    ///         "+---+",
    ///     ],
    ///     &results
    /// );
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_param_values(self, query_values: impl Into<ParamValues>) -> Result<Self> {
        let plan = self.plan.with_param_values(query_values)?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Cache the DataFrame as a memory table.
    ///
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.cache().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn cache(self) -> Result<DataFrame> {
        let context = SessionContext::new_with_state((*self.session_state).clone());
        // The physical plan's schema is consistent with this DataFrame's output
        let plan = self.clone().create_physical_plan().await?;
        let schema = plan.schema();
        let task_ctx = Arc::new(self.task_ctx());
        let partitions = collect_partitioned(plan, task_ctx).await?;
        let mem_table = MemTable::try_new(schema, partitions)?;
        context.read_table(Arc::new(mem_table))
    }

    /// Apply an alias to the DataFrame.
    ///
    /// This method replaces the qualifiers of output columns with the given alias.
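    ///
    /// # Example
    /// A minimal sketch; after aliasing, columns can be referenced with the
    /// `t` qualifier:
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let df = df.alias("t")?.select(vec![col("t.a")])?;
    /// # Ok(())
    /// # }
    /// ```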
    pub fn alias(self, alias: &str) -> Result<DataFrame> {
        let plan = LogicalPlanBuilder::from(self.plan).alias(alias)?.build()?;
        Ok(DataFrame {
            session_state: self.session_state,
            plan,
            projection_requires_validation: self.projection_requires_validation,
        })
    }

    /// Fill null values in the specified columns with the given value.
    /// If no columns are specified (empty vector), the fill applies to all
    /// columns. A column is only filled if the value can be cast to its type.
    ///
    /// # Arguments
    /// * `value` - Value to fill nulls with
    /// * `columns` - List of column names to fill. If empty, fills all columns.
    ///
    /// # Example
    /// ```
    /// # use datafusion::prelude::*;
    /// # use datafusion::error::Result;
    /// # use datafusion_common::ScalarValue;
    /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    /// // Fill nulls in only columns "a" and "c":
    /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?;
    /// // Fill nulls across all columns:
    /// let df = df.fill_null(ScalarValue::from(0), vec![])?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn fill_null(
        &self,
        value: ScalarValue,
        columns: Vec<String>,
    ) -> Result<DataFrame> {
        let cols = if columns.is_empty() {
            self.logical_plan()
                .schema()
                .fields()
                .iter()
                .map(|f| f.as_ref().clone())
                .collect()
        } else {
            self.find_columns(&columns)?
        };

        // Create projections for each column
        let projections = self
            .logical_plan()
            .schema()
            .fields()
            .iter()
            .map(|field| {
                if cols.contains(field) {
                    // Try to cast the fill value to the column type. If the
                    // cast fails, fall back to the original column.
                    match value.clone().cast_to(field.data_type()) {
                        Ok(fill_value) => Expr::Alias(Alias {
                            expr: Box::new(Expr::ScalarFunction(ScalarFunction {
                                func: coalesce(),
                                args: vec![col(field.name()), lit(fill_value)],
                            })),
                            relation: None,
                            name: field.name().to_string(),
                            metadata: None,
                        }),
                        Err(_) => col(field.name()),
                    }
                } else {
                    col(field.name())
                }
            })
            .collect::<Vec<_>>();

        self.clone().select(projections)
    }

    // Helper to find columns from names
    fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
        let schema = self.logical_plan().schema();
        names
            .iter()
            .map(|name| {
                schema
                    .field_with_name(None, name)
                    .cloned()
                    .map_err(|_| plan_datafusion_err!("Column '{}' not found", name))
            })
            .collect()
    }

    /// Helper for creating a DataFrame.
    /// # Example
    /// ```
    /// use std::sync::Arc;
    /// use arrow::array::{ArrayRef, Int32Array, StringArray};
    /// use datafusion::prelude::DataFrame;
    /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
    /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
    /// // +----+------+
    /// // | id | name |
    /// // +----+------+
    /// // | 1  | foo  |
    /// // | 2  | bar  |
    /// // | 3  | baz  |
    /// // +----+------+
    /// ```
    pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result<Self> {
        let fields = columns
            .iter()
            .map(|(name, array)| Field::new(*name, array.data_type().clone(), true))
            .collect::<Vec<_>>();

        let arrays = columns
            .into_iter()
            .map(|(_, array)| array)
            .collect::<Vec<_>>();

        let schema = Arc::new(Schema::new(fields));
        let batch = RecordBatch::try_new(schema, arrays)?;
        let ctx = SessionContext::new();
        let df = ctx.read_batch(batch)?;
        Ok(df)
    }
}

/// Macro for creating a DataFrame.
/// # Example
/// ```
/// use datafusion::prelude::dataframe;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let df = dataframe!(
///     "id" => [1, 2, 3],
///     "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// // +----+------+
/// // | id | name |
/// // +----+------+
/// // | 1  | foo  |
/// // | 2  | bar  |
/// // | 3  | baz  |
/// // +----+------+
/// let df_empty = dataframe!()?; // empty DataFrame
/// assert_eq!(df_empty.schema().fields().len(), 0);
/// assert_eq!(df_empty.count().await?, 0);
/// # Ok(())
/// # }
/// ```
#[macro_export]
macro_rules! dataframe {
    () => {{
        use std::sync::Arc;

        use datafusion::prelude::SessionContext;
        use datafusion::arrow::array::RecordBatch;
        use datafusion::arrow::datatypes::Schema;

        let ctx = SessionContext::new();
        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
        ctx.read_batch(batch)
    }};

    ($($name:expr => $data:expr),+ $(,)?) => {{
        use datafusion::prelude::DataFrame;
        use datafusion::common::test_util::IntoArrayRef;

        let columns = vec![
            $(
                ($name, $data.into_array_ref()),
            )+
        ];

        DataFrame::from_columns(columns)
    }};
}

#[derive(Debug)]
struct DataFrameTableProvider {
    plan: LogicalPlan,
}

#[async_trait]
impl TableProvider for DataFrameTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
        Some(Cow::Borrowed(&self.plan))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        // A filter is added on the DataFrame when given
        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
    }

    fn schema(&self) -> SchemaRef {
        let schema: Schema = self.plan.schema().as_ref().into();
        Arc::new(schema)
    }

    fn table_type(&self) -> TableType {
        TableType::View
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut expr = LogicalPlanBuilder::from(self.plan.clone());
        // Add a filter when given
        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
        if let Some(filter) = filter {
            expr = expr.filter(filter)?
        }

        // Apply the projection when given
        if let Some(p) = projection {
            expr = expr.select(p.iter().copied())?
        }

        // Add a limit when given
        if let Some(l) = limit {
            expr = expr.limit(0, Some(l))?
        }
        let plan = expr.build()?;
        state.create_physical_plan(&plan).await
    }
}

// see tests in datafusion/core/tests/dataframe/mod.rs:2816