use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use std::sync::Arc;
use url::Url;
/// A URL that identifies an object store by scheme and authority only,
/// e.g. `s3://bucket` or `file://`.
///
/// [`ObjectStoreUrl::parse`] rejects anything after the authority other than
/// a single `/`, and normalizes the path to `/`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ObjectStoreUrl {
    url: Url,
}
impl ObjectStoreUrl {
    /// Parses `s` into an [`ObjectStoreUrl`].
    ///
    /// # Errors
    ///
    /// * [`DataFusionError::External`] if `s` is not a valid URL.
    /// * [`DataFusionError::Execution`] if `s` has a path, query or fragment
    ///   beyond an optional single `/` after the authority.
    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
        let mut url = match Url::parse(s.as_ref()) {
            Ok(url) => url,
            Err(e) => return Err(DataFusionError::External(Box::new(e))),
        };

        // Everything that follows `scheme://authority`: path, query and fragment.
        let tail = &url[url::Position::BeforePath..];
        let is_bare_authority = tail.is_empty() || tail == "/";
        if !is_bare_authority {
            return Err(DataFusionError::Execution(format!(
                "ObjectStoreUrl must only contain scheme and authority, got: {tail}"
            )));
        }

        // Canonicalize the path so `s3://bucket` and `s3://bucket/` end up identical.
        url.set_path("/");
        Ok(Self { url })
    }

    /// Returns the URL of the local filesystem store (`file:///`).
    pub fn local_filesystem() -> Self {
        // `file://` has no path beyond the root, so parsing it cannot fail.
        Self::parse("file://").unwrap()
    }

    /// Returns the serialized URL as a string slice.
    pub fn as_str(&self) -> &str {
        self.url.as_str()
    }
}
impl AsRef<str> for ObjectStoreUrl {
    /// Borrows the serialized URL text.
    fn as_ref(&self) -> &str {
        self.url.as_str()
    }
}
impl AsRef<Url> for ObjectStoreUrl {
    /// Borrows the underlying parsed [`Url`].
    fn as_ref(&self) -> &Url {
        &self.url
    }
}
impl std::fmt::Display for ObjectStoreUrl {
    /// Formats as the serialized URL text. Width and alignment flags from the
    /// formatter are honored, exactly as for a plain `&str`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let text: &str = self.as_str();
        std::fmt::Display::fmt(text, f)
    }
}
/// Supplies an [`ObjectStore`] for a URL that has no store registered in an
/// [`ObjectStoreRegistry`].
///
/// The `Send + Sync + 'static` bounds let an implementation be held by the
/// registry as an `Arc<dyn ObjectStoreProvider>`.
pub trait ObjectStoreProvider: Send + Sync + 'static {
    /// Returns the object store that should serve `url`.
    ///
    /// The registry caches the returned store under `url`'s scheme and
    /// authority. An `Err` is propagated unchanged to the caller of
    /// [`ObjectStoreRegistry::get_by_url`].
    fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
}
/// Maps `scheme://authority` keys to the [`ObjectStore`] that serves them.
/// An optional [`ObjectStoreProvider`] can act as a fallback for unknown keys.
pub struct ObjectStoreRegistry {
    /// Registered stores, keyed by `scheme://authority` (e.g. `file://`).
    object_stores: DashMap<String, Arc<dyn ObjectStore>>,
    /// Consulted when a URL has no registered store; `None` disables the fallback.
    provider: Option<Arc<dyn ObjectStoreProvider>>,
}
impl std::fmt::Debug for ObjectStoreRegistry {
    /// Lists only the registered keys; the stores and provider are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let schemes: Vec<String> = self
            .object_stores
            .iter()
            .map(|entry| entry.key().to_owned())
            .collect();
        f.debug_struct("ObjectStoreRegistry")
            .field("schemes", &schemes)
            .finish()
    }
}
impl Default for ObjectStoreRegistry {
    /// Equivalent to [`ObjectStoreRegistry::new`].
    fn default() -> Self {
        ObjectStoreRegistry::new()
    }
}
impl ObjectStoreRegistry {
    /// Creates a registry with no fallback provider.
    ///
    /// The only pre-registered store is the local filesystem, under `file://`.
    pub fn new() -> Self {
        ObjectStoreRegistry::new_with_provider(None)
    }

    /// Creates a registry with the local filesystem pre-registered under `file://`.
    ///
    /// When `provider` is `Some`, lookups for unregistered URLs are delegated to it,
    /// and the result is cached (see [`ObjectStoreRegistry::get_by_url`]).
    pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
        object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
        Self {
            object_stores,
            provider,
        }
    }

    /// Registers `store` under the key `{scheme}://{host}`.
    ///
    /// Returns the store previously registered under that key, if any.
    pub fn register_store(
        &self,
        scheme: impl AsRef<str>,
        host: impl AsRef<str>,
        store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
        self.object_stores.insert(s, store)
    }

    /// Returns the store registered for `url`'s scheme and authority.
    ///
    /// On a miss, the provider (if configured) is asked for a store, and that
    /// store is cached. If the key was registered in the meantime, either by a
    /// concurrent lookup or by [`ObjectStoreRegistry::register_store`], the
    /// already-registered store is kept and returned instead. This way every
    /// caller observes the same instance and explicit registrations are never
    /// overwritten.
    ///
    /// # Errors
    ///
    /// * Any error returned by the provider.
    /// * [`DataFusionError::Internal`] if no store is registered and no provider
    ///   is configured.
    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
        let url = url.as_ref();
        // Everything before the path, i.e. `scheme://[userinfo@]host[:port]`.
        let key = &url[url::Position::BeforeScheme..url::Position::BeforePath];

        // Clone the Arc out so the map's read guard is dropped at the end of this
        // statement, before the write below.
        let cached = self
            .object_stores
            .get(key)
            .map(|entry| Arc::clone(entry.value()));
        if let Some(store) = cached {
            return Ok(store);
        }

        match &self.provider {
            Some(provider) => {
                // No map guard is held here, so a slow provider does not block
                // other lookups.
                let created = provider.get_by_url(url)?;
                // Keep the first store registered under `key` instead of
                // blindly overwriting it.
                let store = Arc::clone(
                    self.object_stores
                        .entry(key.to_owned())
                        .or_insert(created)
                        .value(),
                );
                Ok(store)
            }
            None => Err(DataFusionError::Internal(format!(
                "No suitable object store found for {url}"
            ))),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_object_store_url() {
        // Inputs that parse successfully, paired with their normalized form.
        let accepted = [
            ("file://", "file:///"),
            ("s3://bucket", "s3://bucket/"),
            (
                "s3://username:password@host:123",
                "s3://username:password@host:123/",
            ),
        ];
        for &(input, expected) in accepted.iter() {
            let parsed = ObjectStoreUrl::parse(input).unwrap();
            assert_eq!(parsed.as_str(), expected);
        }

        // Inputs that must be rejected, paired with the exact error message.
        let rejected = [
            ("s3://bucket:invalid", "External error: invalid port number"),
            (
                "s3://bucket?",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?",
            ),
            (
                "s3://bucket?foo=bar",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar",
            ),
            (
                "s3://host:123/foo",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo",
            ),
            (
                "s3://username:password@host:123/foo",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo",
            ),
        ];
        for &(input, expected) in rejected.iter() {
            let err = ObjectStoreUrl::parse(input).unwrap_err();
            assert_eq!(err.to_string(), expected);
        }
    }
}