Skip to content

Commit

Permalink
refactor: SchemaProvider::table can fail (#9307)
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum authored Feb 27, 2024
1 parent 85f7a8e commit 8f3d1ef
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 46 deletions.
43 changes: 24 additions & 19 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::object_storage::get_object_store;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::common::{plan_datafusion_err, DataFusionError};
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
Expand Down Expand Up @@ -145,16 +146,21 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.register_table(name, table)
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let inner_table = self.inner.table(name).await;
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return inner_table;
return Ok(inner_table);
}

// if the inner schema provider didn't have a table by
// that name, try to treat it as a listing table
let state = self.state.upgrade()?.read().clone();
let table_url = ListingTableUrl::parse(name).ok()?;
let state = self
.state
.upgrade()
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let table_url = ListingTableUrl::parse(name)?;
let url: &Url = table_url.as_ref();

// If the store is already registered for this URL then `get_store`
Expand All @@ -169,18 +175,20 @@ impl SchemaProvider for DynamicFileSchemaProvider {
let mut options = HashMap::new();
let store =
get_object_store(&state, &mut options, table_url.scheme(), url)
.await
.unwrap();
.await?;
state.runtime_env().register_object_store(url, store);
}
}

let config = ListingTableConfig::new(table_url)
.infer(&state)
.await
.ok()?;
let config = match ListingTableConfig::new(table_url).infer(&state).await {
Ok(cfg) => cfg,
Err(_) => {
// treat as non-existing
return Ok(None);
}
};

Some(Arc::new(ListingTable::try_new(config).ok()?))
Ok(Some(Arc::new(ListingTable::try_new(config)?)))
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand Down Expand Up @@ -227,7 +235,7 @@ mod tests {
let (ctx, schema) = setup_context();

// That's a non registered table so expecting None here
let table = schema.table(&location).await;
let table = schema.table(&location).await.unwrap();
assert!(table.is_none());

// It should still create an object store for the location in the SessionState
Expand All @@ -251,7 +259,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await;
let table = schema.table(&location).await.unwrap();
assert!(table.is_none());

let store = ctx
Expand All @@ -273,7 +281,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await;
let table = schema.table(&location).await.unwrap();
assert!(table.is_none());

let store = ctx
Expand All @@ -289,13 +297,10 @@ mod tests {
}

#[tokio::test]
#[should_panic]
async fn query_invalid_location_test() {
let location = "ts://file.parquet";
let (_ctx, schema) = setup_context();

// This will panic, we cannot prevent that because `schema.table`
// returns an Option
schema.table(location).await;
assert!(schema.table(location).await.is_err());
}
}
4 changes: 2 additions & 2 deletions datafusion-examples/examples/external_dependency/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ impl SchemaProvider for DirSchema {
tables.keys().cloned().collect::<Vec<_>>()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let tables = self.tables.read().unwrap();
tables.get(name).cloned()
Ok(tables.get(name).cloned())
}

fn table_exist(&self, name: &str) -> bool {
Expand Down
45 changes: 32 additions & 13 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::{any::Any, sync::Arc};

use arrow::{
Expand Down Expand Up @@ -78,7 +79,10 @@ struct InformationSchemaConfig {

impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
async fn make_tables(
&self,
builder: &mut InformationSchemaTablesBuilder,
) -> Result<(), DataFusionError> {
// create a mem table with the names of tables

for catalog_name in self.catalog_list.catalog_names() {
Expand All @@ -89,7 +93,7 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await {
if let Some(table) = schema.table(&table_name).await? {
builder.add_table(
&catalog_name,
&schema_name,
Expand Down Expand Up @@ -124,6 +128,8 @@ impl InformationSchemaConfig {
TableType::View,
);
}

Ok(())
}

async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
Expand All @@ -141,7 +147,10 @@ impl InformationSchemaConfig {
}
}

async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
async fn make_views(
&self,
builder: &mut InformationSchemaViewBuilder,
) -> Result<(), DataFusionError> {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

Expand All @@ -150,7 +159,7 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await {
if let Some(table) = schema.table(&table_name).await? {
builder.add_view(
&catalog_name,
&schema_name,
Expand All @@ -163,10 +172,15 @@ impl InformationSchemaConfig {
}
}
}

Ok(())
}

/// Construct the `information_schema.columns` virtual table
async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
async fn make_columns(
&self,
builder: &mut InformationSchemaColumnsBuilder,
) -> Result<(), DataFusionError> {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

Expand All @@ -175,7 +189,7 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await {
if let Some(table) = schema.table(&table_name).await? {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
Expand All @@ -193,6 +207,8 @@ impl InformationSchemaConfig {
}
}
}

Ok(())
}

/// Construct the `information_schema.df_settings` virtual table
Expand Down Expand Up @@ -223,7 +239,10 @@ impl SchemaProvider for InformationSchemaProvider {
]
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
let config = self.config.clone();
let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
Arc::new(InformationSchemaTables::new(config))
Expand All @@ -236,12 +255,12 @@ impl SchemaProvider for InformationSchemaProvider {
} else if name.eq_ignore_ascii_case("schemata") {
Arc::new(InformationSchemata::new(config))
} else {
return None;
return Ok(None);
};

Some(Arc::new(
Ok(Some(Arc::new(
StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(),
))
)))
}

fn table_exist(&self, name: &str) -> bool {
Expand Down Expand Up @@ -292,7 +311,7 @@ impl PartitionStream for InformationSchemaTables {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_tables(&mut builder).await;
config.make_tables(&mut builder).await?;
Ok(builder.finish())
}),
))
Expand Down Expand Up @@ -383,7 +402,7 @@ impl PartitionStream for InformationSchemaViews {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_views(&mut builder).await;
config.make_views(&mut builder).await?;
Ok(builder.finish())
}),
))
Expand Down Expand Up @@ -497,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_columns(&mut builder).await;
config.make_columns(&mut builder).await?;
Ok(builder.finish())
}),
))
Expand Down
10 changes: 7 additions & 3 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,16 @@ impl SchemaProvider for ListingSchemaProvider {
.collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.tables
async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self
.tables
.lock()
.expect("Can't lock tables")
.get(name)
.cloned()
.cloned())
}

fn register_table(
Expand Down
12 changes: 9 additions & 3 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ pub trait SchemaProvider: Sync + Send {

/// Retrieves a specific table from the schema by name, if it exists,
/// otherwise returns `None`.
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>;

/// If supported by the implementation, adds a new table named `name` to
/// this schema.
Expand Down Expand Up @@ -111,8 +114,11 @@ impl SchemaProvider for MemorySchemaProvider {
.collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.tables.get(name).map(|table| table.value().clone())
async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
Ok(self.tables.get(name).map(|table| table.value().clone()))
}

fn register_table(
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ impl SessionContext {
};

if let Some(schema) = maybe_schema {
if let Some(table_provider) = schema.table(&table).await {
if let Some(table_provider) = schema.table(&table).await? {
if table_provider.table_type() == table_type {
schema.deregister_table(&table)?;
return Ok(true);
Expand Down Expand Up @@ -1115,7 +1115,7 @@ impl SessionContext {
let table_ref = table_ref.into();
let table = table_ref.table().to_string();
let schema = self.state.read().schema_for_ref(table_ref)?;
match schema.table(&table).await {
match schema.table(&table).await? {
Some(ref provider) => Ok(Arc::clone(provider)),
_ => plan_err!("No table named '{table}'"),
}
Expand Down Expand Up @@ -1714,7 +1714,7 @@ impl SessionState {
let resolved = self.resolve_table_ref(&reference);
if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
if let Ok(schema) = self.schema_for_ref(resolved) {
if let Some(table) = schema.table(table).await {
if let Some(table) = schema.table(table).await? {
v.insert(provider_as_source(table));
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ impl DefaultPhysicalPlanner {
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name)?;
if let Some(provider) = schema.table(name).await {
if let Some(provider) = schema.table(name).await? {
let input_exec = self.create_initial_plan(input, session_state).await?;
provider.insert_into(session_state, input_exec, false).await
} else {
Expand All @@ -641,7 +641,7 @@ impl DefaultPhysicalPlanner {
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name)?;
if let Some(provider) = schema.table(name).await {
if let Some(provider) = schema.table(name).await? {
let input_exec = self.create_initial_plan(input, session_state).await?;
provider.insert_into(session_state, input_exec, true).await
} else {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/create_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn create_external_table_with_ddl() -> Result<()> {
let exists = schema.table_exist("dt");
assert!(exists, "Table should have been created!");

let table_schema = schema.table("dt").await.unwrap().schema();
let table_schema = schema.table("dt").await.unwrap().unwrap().schema();

assert_eq!(3, table_schema.fields().len());

Expand Down

0 comments on commit 8f3d1ef

Please sign in to comment.