use std::task::{Context, Poll};
use arrow::{
datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
};
use futures::Stream;
use crate::physical_plan::RecordBatchStream;
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
inner: std::sync::Arc<std::sync::Mutex<usize>>,
}
impl BatchIndex {
pub fn value(&self) -> usize {
let inner = self.inner.lock().unwrap();
*inner
}
pub fn incr(&self) {
let mut inner = self.inner.lock().unwrap();
*inner += 1;
}
}
#[derive(Debug, Default)]
pub(crate) struct TestStream {
data: Vec<RecordBatch>,
index: BatchIndex,
}
impl TestStream {
pub fn new(data: Vec<RecordBatch>) -> Self {
Self {
data,
..Default::default()
}
}
pub fn index(&self) -> BatchIndex {
self.index.clone()
}
}
impl Stream for TestStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let next_batch = self.index.value();
Poll::Ready(if next_batch < self.data.len() {
let next_batch = self.index.value();
self.index.incr();
Some(Ok(self.data[next_batch].clone()))
} else {
None
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.data.len(), Some(self.data.len()))
}
}
impl RecordBatchStream for TestStream {
fn schema(&self) -> SchemaRef {
self.data[0].schema()
}
}