use std::str::FromStr;
use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::parsers::CompressionTypeVariant::{self, *};
use datafusion_common::GetExt;
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
};
#[cfg(feature = "compression")]
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use object_store::buffered::BufWriter;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "compression")]
use xz2::read::XzDecoder;
#[cfg(feature = "compression")]
use zstd::Decoder as ZstdDecoder;
/// Readable/writable file compression type.
///
/// Thin wrapper around a [`CompressionTypeVariant`]; it is `Copy` and
/// compares by the wrapped variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileCompressionType {
/// The compression variant this value wraps.
variant: CompressionTypeVariant,
}
impl GetExt for FileCompressionType {
    /// Returns the file-name suffix (leading dot included) for this
    /// compression type; the uncompressed type yields an empty string.
    fn get_ext(&self) -> String {
        // Pick the static suffix first, allocate exactly once afterwards.
        let suffix: &'static str = match self.variant {
            UNCOMPRESSED => "",
            GZIP => ".gz",
            BZIP2 => ".bz2",
            XZ => ".xz",
            ZSTD => ".zst",
        };
        String::from(suffix)
    }
}
impl From<CompressionTypeVariant> for FileCompressionType {
fn from(t: CompressionTypeVariant) -> Self {
Self { variant: t }
}
}
impl From<FileCompressionType> for CompressionTypeVariant {
fn from(t: FileCompressionType) -> Self {
t.variant
}
}
impl FromStr for FileCompressionType {
    type Err = DataFusionError;

    /// Parses a compression type by delegating to
    /// [`CompressionTypeVariant::from_str`].
    ///
    /// # Errors
    /// Any parse failure of the underlying variant is reported as
    /// [`DataFusionError::NotImplemented`] naming the rejected input.
    fn from_str(s: &str) -> Result<Self> {
        match CompressionTypeVariant::from_str(s) {
            Ok(variant) => Ok(Self { variant }),
            // The underlying error is intentionally discarded and replaced.
            Err(_) => Err(DataFusionError::NotImplemented(format!(
                "Unknown FileCompressionType: {s}"
            ))),
        }
    }
}
impl FileCompressionType {
    /// Gzip compression.
    pub const GZIP: Self = Self { variant: GZIP };

    /// Bzip2 compression.
    pub const BZIP2: Self = Self { variant: BZIP2 };

    /// Xz compression.
    pub const XZ: Self = Self { variant: XZ };

    /// Zstandard compression.
    pub const ZSTD: Self = Self { variant: ZSTD };

    /// No compression.
    pub const UNCOMPRESSED: Self = Self {
        variant: UNCOMPRESSED,
    };

    /// Returns a reference to the wrapped [`CompressionTypeVariant`].
    pub fn get_variant(&self) -> &CompressionTypeVariant {
        &self.variant
    }

    /// Returns whether the wrapped variant reports itself as compressed
    /// (delegates to `CompressionTypeVariant::is_compressed`).
    pub const fn is_compressed(&self) -> bool {
        self.variant.is_compressed()
    }

    /// Given a stream of bytes, returns a stream whose bytes are encoded
    /// with this compression type. The uncompressed type passes the stream
    /// through unchanged.
    ///
    /// # Errors
    /// Returns [`DataFusionError::NotImplemented`] for any compressed
    /// variant when the `compression` feature is disabled. I/O errors raised
    /// while encoding surface as items of the returned stream.
    pub fn convert_to_compress_stream<'a>(
        &self,
        s: BoxStream<'a, Result<Bytes>>,
    ) -> Result<BoxStream<'a, Result<Bytes>>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(feature = "compression")]
            ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
                .map_err(DataFusionError::from)
                .boxed(),
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return Err(DataFusionError::NotImplemented(
                    "Compression feature is not enabled".to_owned(),
                ))
            }
            UNCOMPRESSED => s.boxed(),
        })
    }

    /// Wraps the given [`BufWriter`] in an async writer that encodes
    /// everything written to it with this compression type. The uncompressed
    /// type returns the writer itself, boxed.
    ///
    /// # Errors
    /// Returns [`DataFusionError::NotImplemented`] for any compressed
    /// variant when the `compression` feature is disabled.
    pub fn convert_async_writer(
        &self,
        w: BufWriter,
    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => Box::new(GzipEncoder::new(w)),
            #[cfg(feature = "compression")]
            BZIP2 => Box::new(BzEncoder::new(w)),
            #[cfg(feature = "compression")]
            XZ => Box::new(XzEncoder::new(w)),
            #[cfg(feature = "compression")]
            ZSTD => Box::new(ZstdEncoder::new(w)),
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return Err(DataFusionError::NotImplemented(
                    "Compression feature is not enabled".to_owned(),
                ))
            }
            UNCOMPRESSED => Box::new(w),
        })
    }

    /// Given a stream of bytes, returns a stream whose bytes are decoded
    /// with this compression type. The uncompressed type passes the stream
    /// through unchanged.
    ///
    /// Multi-member decoding is enabled on every decoder so that inputs made
    /// of several concatenated compressed members are decoded to the end
    /// instead of stopping after the first one. This mirrors the
    /// multi-stream readers (`MultiGzDecoder`, `MultiBzDecoder`,
    /// `new_multi_decoder`) that [`Self::convert_read`] uses.
    ///
    /// # Errors
    /// Returns [`DataFusionError::NotImplemented`] for any compressed
    /// variant when the `compression` feature is disabled. I/O errors raised
    /// while decoding surface as items of the returned stream.
    pub fn convert_stream<'a>(
        &self,
        s: BoxStream<'a, Result<Bytes>>,
    ) -> Result<BoxStream<'a, Result<Bytes>>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => {
                let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
                decoder.multiple_members(true);
                ReaderStream::new(decoder)
                    .map_err(DataFusionError::from)
                    .boxed()
            }
            #[cfg(feature = "compression")]
            BZIP2 => {
                let mut decoder = AsyncBzDecoder::new(StreamReader::new(s));
                decoder.multiple_members(true);
                ReaderStream::new(decoder)
                    .map_err(DataFusionError::from)
                    .boxed()
            }
            #[cfg(feature = "compression")]
            XZ => {
                let mut decoder = AsyncXzDecoder::new(StreamReader::new(s));
                decoder.multiple_members(true);
                ReaderStream::new(decoder)
                    .map_err(DataFusionError::from)
                    .boxed()
            }
            #[cfg(feature = "compression")]
            ZSTD => {
                // `AsyncZstdDecoer` is the alias exactly as spelled in this
                // file's `use` declarations.
                let mut decoder = AsyncZstdDecoer::new(StreamReader::new(s));
                decoder.multiple_members(true);
                ReaderStream::new(decoder)
                    .map_err(DataFusionError::from)
                    .boxed()
            }
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return Err(DataFusionError::NotImplemented(
                    "Compression feature is not enabled".to_owned(),
                ))
            }
            UNCOMPRESSED => s.boxed(),
        })
    }

    /// Given a blocking [`std::io::Read`], returns a reader that yields the
    /// data decoded with this compression type. The uncompressed type
    /// returns the reader itself, boxed.
    ///
    /// # Errors
    /// Returns [`DataFusionError::NotImplemented`] for any compressed
    /// variant when the `compression` feature is disabled, and
    /// [`DataFusionError::External`] if the zstd decoder cannot be created.
    pub fn convert_read<T: std::io::Read + Send + 'static>(
        &self,
        r: T,
    ) -> Result<Box<dyn std::io::Read + Send>> {
        Ok(match self.variant {
            #[cfg(feature = "compression")]
            GZIP => Box::new(MultiGzDecoder::new(r)),
            #[cfg(feature = "compression")]
            BZIP2 => Box::new(MultiBzDecoder::new(r)),
            #[cfg(feature = "compression")]
            XZ => Box::new(XzDecoder::new_multi_decoder(r)),
            #[cfg(feature = "compression")]
            ZSTD => {
                // Decoder construction is the only fallible step here.
                let decoder = ZstdDecoder::new(r)
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
                Box::new(decoder)
            }
            #[cfg(not(feature = "compression"))]
            GZIP | BZIP2 | XZ | ZSTD => {
                return Err(DataFusionError::NotImplemented(
                    "Compression feature is not enabled".to_owned(),
                ))
            }
            UNCOMPRESSED => Box::new(r),
        })
    }
}
/// Trait for file types that can produce an extension string for a given
/// [`FileCompressionType`].
pub trait FileTypeExt {
/// Takes a [`FileCompressionType`] and returns a `String`, or an error.
///
/// NOTE(review): judging by the name, the string is the file extension
/// combined with the compression suffix; implementations are not visible
/// in this section — confirm against them.
fn get_ext_with_compression(&self, c: FileCompressionType) -> Result<String>;
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::FileCompressionType;
    use datafusion_common::error::DataFusionError;

    /// Every accepted spelling parses to the expected compression type, and
    /// an unknown name is rejected with `NotImplemented`.
    #[test]
    fn from_str() {
        for (ext, compression_type) in [
            ("gz", FileCompressionType::GZIP),
            ("GZ", FileCompressionType::GZIP),
            ("gzip", FileCompressionType::GZIP),
            ("GZIP", FileCompressionType::GZIP),
            ("xz", FileCompressionType::XZ),
            ("XZ", FileCompressionType::XZ),
            ("bz2", FileCompressionType::BZIP2),
            ("BZ2", FileCompressionType::BZIP2),
            ("bzip2", FileCompressionType::BZIP2),
            ("BZIP2", FileCompressionType::BZIP2),
            ("zst", FileCompressionType::ZSTD),
            ("ZST", FileCompressionType::ZSTD),
            ("zstd", FileCompressionType::ZSTD),
            ("ZSTD", FileCompressionType::ZSTD),
            ("", FileCompressionType::UNCOMPRESSED),
        ] {
            assert_eq!(
                FileCompressionType::from_str(ext).unwrap(),
                compression_type
            );
        }

        assert!(matches!(
            FileCompressionType::from_str("Unknown"),
            Err(DataFusionError::NotImplemented(_))
        ));
    }

    /// A gzip input made of several concatenated members is decoded to the
    /// end by `convert_stream`.
    ///
    /// Gated on the `compression` feature: without it `convert_stream`
    /// returns `NotImplemented` for GZIP and this test could never pass.
    #[cfg(feature = "compression")]
    #[tokio::test]
    async fn test_bgzip_stream_decoding() -> Result<(), DataFusionError> {
        // Imported here so they are not flagged as unused when the test is
        // compiled out.
        use bytes::Bytes;
        use futures::{StreamExt, TryStreamExt};

        // Three back-to-back gzip members; the last one has an empty payload.
        #[rustfmt::skip]
        let data = [
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00,
            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00,
            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
            0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        let stream = futures::stream::iter(vec![Ok::<Bytes, DataFusionError>(
            Bytes::from(data.to_vec()),
        )]);
        let converted_stream =
            FileCompressionType::GZIP.convert_stream(stream.boxed())?;

        // Propagate decode errors instead of panicking inside the stream.
        let chunks: Vec<Bytes> = converted_stream.try_collect().await?;

        // The decoder is free to split its output across several chunks, so
        // stitch them together rather than assuming everything is in the
        // first one.
        let mut decoded = Vec::new();
        for chunk in &chunks {
            decoded.extend_from_slice(chunk);
        }

        let string_value = String::from_utf8_lossy(&decoded);
        assert_eq!(string_value, "42\n42\n");

        Ok(())
    }
}