1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use crate::error::Result;
use crate::logicalplan::LogicalPlan;
use std::sync::Arc;
/// A table expressed through a fluent, composable API.
///
/// Every transformation takes `&self` and hands back a new table behind an
/// [`Arc`] instead of mutating the receiver, so calls can be chained. The
/// accumulated operations can then be retrieved as a [`LogicalPlan`].
pub trait Table {
    /// Returns a table restricted to the named `columns`.
    ///
    /// # Errors
    ///
    /// Implementations return an error when the projection cannot be built
    /// (presumably, e.g., when a column name cannot be resolved — the exact
    /// conditions are implementation-defined).
    fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn Table>>;

    /// Returns a table limited to `n` rows.
    ///
    /// # Errors
    ///
    /// Implementations return an error when the limit cannot be applied
    /// (conditions are implementation-defined).
    fn limit(&self, n: usize) -> Result<Arc<dyn Table>>;

    /// Returns the logical plan describing this table and every operation
    /// applied to it so far.
    fn to_logical_plan(&self) -> Arc<LogicalPlan>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::context::ExecutionContext;
    use arrow::datatypes::*;
    use std::env;

    /// Walks the fluent `Table` API end to end: look up a registered CSV
    /// table, project three columns, apply a limit, and compare the Debug
    /// rendering of the resulting logical plan against the expected text.
    #[test]
    fn demonstrate_api_usage() {
        let mut ctx = ExecutionContext::new();
        register_aggregate_csv(&mut ctx);

        let table = ctx.table("aggregate_test_100").unwrap();
        let projected = table.select_columns(vec!["c1", "c2", "c11"]).unwrap();
        let limited = projected.limit(10).unwrap();

        let plan = limited.to_logical_plan();
        let expected = "Limit: UInt32(10)\n Projection: #0, #1, #10\n TableScan: aggregate_test_100 projection=None";
        assert_eq!(expected, format!("{:?}", plan));
    }

    /// Registers `$ARROW_TEST_DATA/csv/aggregate_test_100.csv` in `ctx` under
    /// the table name `aggregate_test_100`, using [`aggr_test_schema`].
    /// The trailing `true` passed to `register_csv` is presumably the
    /// has-header flag — confirm against `ExecutionContext::register_csv`.
    ///
    /// # Panics
    ///
    /// Panics if the `ARROW_TEST_DATA` environment variable is not set.
    fn register_aggregate_csv(ctx: &mut ExecutionContext) {
        let schema = aggr_test_schema();
        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
        let csv_path = format!("{}/csv/aggregate_test_100.csv", testdata);
        ctx.register_csv("aggregate_test_100", &csv_path, &schema, true);
    }

    /// Builds the 13-column schema of `aggregate_test_100.csv`; every field
    /// is created with `nullable = false`.
    fn aggr_test_schema() -> Arc<Schema> {
        let columns = vec![
            ("c1", DataType::Utf8),
            ("c2", DataType::UInt32),
            ("c3", DataType::Int8),
            ("c4", DataType::Int16),
            ("c5", DataType::Int32),
            ("c6", DataType::Int64),
            ("c7", DataType::UInt8),
            ("c8", DataType::UInt16),
            ("c9", DataType::UInt32),
            ("c10", DataType::UInt64),
            ("c11", DataType::Float32),
            ("c12", DataType::Float64),
            ("c13", DataType::Utf8),
        ];
        let fields: Vec<Field> = columns
            .into_iter()
            .map(|(name, data_type)| Field::new(name, data_type, false))
            .collect();
        Arc::new(Schema::new(fields))
    }
}