use arrow::{
datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
};
use futures::{Stream, StreamExt};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use super::common::AbortOnDropSingle;
use super::{RecordBatchStream, SendableRecordBatchStream};
/// Adapter that exposes the receiving half of a tokio mpsc channel of
/// `ArrowResult<RecordBatch>` items as a [`RecordBatchStream`] with a
/// fixed schema.
///
/// Instances are built through [`RecordBatchReceiverStream::create`], which
/// also takes the `JoinHandle` of a task and stores it in `drop_helper`.
pub struct RecordBatchReceiverStream {
/// Schema reported by `RecordBatchStream::schema` for this stream.
schema: SchemaRef,
/// Channel receiver wrapped as a `Stream`; `poll_next` delegates to it.
inner: ReceiverStream<ArrowResult<RecordBatch>>,
// The field is never read: it is held only so that it is dropped together
// with the stream, hence the `dead_code` allowance.
// NOTE(review): judging by its name, `AbortOnDropSingle` presumably aborts
// the wrapped task when dropped -- confirm against `super::common`.
#[allow(dead_code)]
drop_helper: AbortOnDropSingle<()>,
}
impl RecordBatchReceiverStream {
pub fn create(
schema: &SchemaRef,
rx: tokio::sync::mpsc::Receiver<ArrowResult<RecordBatch>>,
join_handle: JoinHandle<()>,
) -> SendableRecordBatchStream {
let schema = schema.clone();
let inner = ReceiverStream::new(rx);
Box::pin(Self {
schema,
inner,
drop_helper: AbortOnDropSingle::new(join_handle),
})
}
}
impl Stream for RecordBatchReceiverStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl RecordBatchStream for RecordBatchReceiverStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}