Skip to content

Commit

Permalink
[task #8917] Implement information_schema.schemata
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <[email protected]>
  • Loading branch information
Tangruilin committed Jan 27, 2024
1 parent ed24539 commit b38a158
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 2 deletions.
133 changes: 131 additions & 2 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
&[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA];

/// Implements the `information_schema` virtual schema and tables
///
Expand Down Expand Up @@ -115,6 +117,27 @@ impl InformationSchemaConfig {
DF_SETTINGS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
SCHEMATA,
TableType::View,
);
}
}

async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
if let Some(schema) = catalog.schema(&schema_name) {
let schema_owner = schema.owner_name();
builder.add_schemata(&catalog_name, &schema_name, schema_owner);
}
}
}
}
}

Expand Down Expand Up @@ -196,6 +219,7 @@ impl SchemaProvider for InformationSchemaProvider {
VIEWS.to_string(),
COLUMNS.to_string(),
DF_SETTINGS.to_string(),
SCHEMATA.to_string(),
]
}

Expand All @@ -209,6 +233,8 @@ impl SchemaProvider for InformationSchemaProvider {
Arc::new(InformationSchemaViews::new(config))
} else if name.eq_ignore_ascii_case("df_settings") {
Arc::new(InformationSchemaDfSettings::new(config))
} else if name.eq_ignore_ascii_case("schemata") {
Arc::new(InformationSchemata::new(config))
} else {
return None;
};
Expand All @@ -219,7 +245,10 @@ impl SchemaProvider for InformationSchemaProvider {
}

fn table_exist(&self, name: &str) -> bool {
matches!(name.to_ascii_lowercase().as_str(), TABLES | VIEWS | COLUMNS)
matches!(
name.to_ascii_lowercase().as_str(),
TABLES | VIEWS | COLUMNS | SCHEMATA
)
}
}

Expand Down Expand Up @@ -617,6 +646,106 @@ impl InformationSchemaColumnsBuilder {
}
}

struct InformationSchemata {
schema: SchemaRef,
config: InformationSchemaConfig,
}

impl InformationSchemata {
fn new(config: InformationSchemaConfig) -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, false),
Field::new("schema_name", DataType::Utf8, false),
Field::new("schema_owner", DataType::Utf8, true),
Field::new("default_character_set_catalog", DataType::Utf8, true),
Field::new("default_character_set_schema", DataType::Utf8, true),
Field::new("default_character_set_name", DataType::Utf8, true),
Field::new("sql_path", DataType::Utf8, true),
]));
Self { schema, config }
}

fn builder(&self) -> InformationSchemataBuilder {
InformationSchemataBuilder {
schema: self.schema.clone(),
catalog_name: StringBuilder::new(),
schema_name: StringBuilder::new(),
schema_owner: StringBuilder::new(),
default_character_set_catalog: StringBuilder::new(),
default_character_set_schema: StringBuilder::new(),
default_character_set_name: StringBuilder::new(),
sql_path: StringBuilder::new(),
}
}
}

struct InformationSchemataBuilder {
schema: SchemaRef,
catalog_name: StringBuilder,
schema_name: StringBuilder,
schema_owner: StringBuilder,
default_character_set_catalog: StringBuilder,
default_character_set_schema: StringBuilder,
default_character_set_name: StringBuilder,
sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
fn add_schemata(
&mut self,
catalog_name: &str,
schema_name: &str,
schema_owner: Option<&str>,
) {
self.catalog_name.append_value(catalog_name);
self.schema_name.append_value(schema_name);
match schema_owner {
Some(owner) => self.schema_owner.append_value(owner),
None => self.schema_owner.append_null(),
}
// refer to https://www.postgresql.org/docs/current/infoschema-schemata.html, these rows are Applies to a feature not available
self.default_character_set_catalog.append_null();
self.default_character_set_schema.append_null();
self.default_character_set_name.append_null();
self.sql_path.append_null();
}

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(self.catalog_name.finish()),
Arc::new(self.schema_name.finish()),
Arc::new(self.schema_owner.finish()),
Arc::new(self.default_character_set_catalog.finish()),
Arc::new(self.default_character_set_schema.finish()),
Arc::new(self.default_character_set_name.finish()),
Arc::new(self.sql_path.finish()),
],
)
.unwrap()
}
}

impl PartitionStream for InformationSchemata {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_schemata(&mut builder).await;
Ok(builder.finish())
}),
))
}
}

struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ use crate::error::{DataFusionError, Result};
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait SchemaProvider: Sync + Send {
/// Returns the owner of the Schema, default is None
fn owner_name(&self) -> Option<&str> {
None
}

/// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a
/// specific implementation.
fn as_any(&self) -> &dyn Any;
Expand Down
17 changes: 17 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
statement error DataFusion error: Error during planning: table 'datafusion.information_schema.tables' not found
SELECT * from information_schema.tables

# Verify the information schema does not exit by default
statement error DataFusion error: Error during planning: table 'datafusion.information_schema.schemata' not found
SELECT * from information_schema.schemata

statement error DataFusion error: Error during planning: SHOW \[VARIABLE\] is not supported unless information_schema is enabled
show all

Expand All @@ -35,9 +39,16 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

# Verify the information schema now does exist and is empty
query TTTTTTT rowsort
SELECT * from information_schema.schemata;
----
datafusion public NULL NULL NULL NULL NULL

# Disable information_schema and verify it now errors again
statement ok
set datafusion.catalog.information_schema = false
Expand Down Expand Up @@ -66,6 +77,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion public t BASE TABLE
Expand All @@ -79,6 +91,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion public t BASE TABLE
Expand All @@ -89,6 +102,7 @@ SELECT * from information_schema.tables WHERE tables.table_schema='information_s
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand All @@ -97,6 +111,7 @@ SELECT * from information_schema.tables WHERE information_schema.tables.table_sc
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand All @@ -105,6 +120,7 @@ SELECT * from information_schema.tables WHERE datafusion.information_schema.tabl
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand Down Expand Up @@ -391,6 +407,7 @@ SHOW TABLES
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW

Expand Down Expand Up @@ -79,16 +80,19 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
my_catalog information_schema columns VIEW
my_catalog information_schema df_settings VIEW
my_catalog information_schema schemata VIEW
my_catalog information_schema tables VIEW
my_catalog information_schema views VIEW
my_catalog my_schema t1 BASE TABLE
my_catalog my_schema t2 BASE TABLE
my_other_catalog information_schema columns VIEW
my_other_catalog information_schema df_settings VIEW
my_other_catalog information_schema schemata VIEW
my_other_catalog information_schema tables VIEW
my_other_catalog information_schema views VIEW
my_other_catalog my_other_schema t3 BASE TABLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ SELECT * from information_schema.tables;
----
datafusion information_schema columns VIEW
datafusion information_schema df_settings VIEW
datafusion information_schema schemata VIEW
datafusion information_schema tables VIEW
datafusion information_schema views VIEW
datafusion public physical BASE TABLE
Expand Down

0 comments on commit b38a158

Please sign in to comment.