1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use std::rc::Rc;
use super::super::datasources::common::*;
use super::super::errors::*;
use super::super::exec::*;
use arrow::datatypes::Schema;
/// Relation that wraps another relation and applies a row limit to the
/// batches produced by scanning it.
pub struct LimitRelation {
/// Schema handed back by `schema()`.
schema: Rc<Schema>,
/// Wrapped relation whose `scan()` output is limited.
input: Box<SimpleRelation>,
/// Row budget applied while scanning `input`.
limit: usize,
}
impl LimitRelation {
pub fn new(schema: Rc<Schema>, input: Box<SimpleRelation>, limit: usize) -> Self {
LimitRelation {
schema,
input,
limit,
}
}
}
impl SimpleRelation for LimitRelation {
fn scan<'a>(&'a mut self) -> Box<Iterator<Item = Result<Rc<RecordBatch>>> + 'a> {
let mut count: usize = 0;
let limit = self.limit;
Box::new(self.input.scan().map(move |batch| match batch {
Ok(ref b) => {
if count + b.num_rows() < limit {
count += b.num_rows();
Ok(b.clone())
} else {
let n = b.num_rows().min(limit - count);
count += n;
let new_batch: Rc<RecordBatch> = Rc::new(DefaultRecordBatch {
schema: b.schema().clone(),
data: b.columns().clone(),
row_count: n,
});
Ok(new_batch)
}
}
Err(e) => Err(e),
}))
}
fn schema<'a>(&'a self) -> &'a Schema {
self.schema.as_ref()
}
}