feat: support pg_database for DBeaver. #5362

Merged · 1 commit · Jan 15, 2025
11 changes: 11 additions & 0 deletions src/catalog/src/system_schema/pg_catalog.rs
@@ -14,6 +14,7 @@

mod pg_catalog_memory_table;
mod pg_class;
mod pg_database;
mod pg_namespace;
mod table_names;

@@ -26,6 +27,7 @@ use lazy_static::lazy_static;
use paste::paste;
use pg_catalog_memory_table::get_schema_columns;
use pg_class::PGClass;
use pg_database::PGDatabase;
use pg_namespace::PGNamespace;
use session::context::{Channel, QueryContext};
use table::TableRef;
@@ -113,6 +115,10 @@ impl PGCatalogProvider {
PG_CLASS.to_string(),
self.build_table(PG_CLASS).expect(PG_CLASS),
);
tables.insert(
PG_DATABASE.to_string(),
self.build_table(PG_DATABASE).expect(PG_DATABASE),
);
self.tables = tables;
}
}
@@ -135,6 +141,11 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
table_names::PG_DATABASE => Some(Arc::new(PGDatabase::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
_ => None,
}
}
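The provider change follows the existing two-step registration pattern: add the table to the name-to-table map in `build_tables`, and add a constructor arm to the `system_table` match. Below is a minimal, self-contained sketch of that pattern with simplified stand-in types (not the actual GreptimeDB traits):

use std::collections::HashMap;
use std::sync::Arc;

const PG_DATABASE: &str = "pg_database";

// Stand-in for the real SystemTable trait object.
trait SystemTable {}
struct PgDatabaseTable;
impl SystemTable for PgDatabaseTable {}

struct PgCatalogProvider {
    tables: HashMap<String, Arc<dyn SystemTable>>,
}

impl PgCatalogProvider {
    // Mirrors SystemSchemaProviderInner::system_table: one match arm per table.
    fn system_table(&self, name: &str) -> Option<Arc<dyn SystemTable>> {
        match name {
            PG_DATABASE => Some(Arc::new(PgDatabaseTable)),
            _ => None,
        }
    }

    // Mirrors build_tables: every table must also be inserted into the map,
    // which is why the diff touches two places.
    fn build_tables(&mut self) {
        let mut tables: HashMap<String, Arc<dyn SystemTable>> = HashMap::new();
        tables.insert(
            PG_DATABASE.to_string(),
            self.system_table(PG_DATABASE).expect(PG_DATABASE),
        );
        self.tables = tables;
    }
}

fn main() {
    let mut provider = PgCatalogProvider { tables: HashMap::new() };
    provider.build_tables();
    assert!(provider.tables.contains_key(PG_DATABASE));
}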
214 changes: 214 additions & 0 deletions src/catalog/src/system_schema/pg_catalog/pg_database.rs
@@ -0,0 +1,214 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_DATABASE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use super::pg_namespace::oid_map::PGNamespaceOidMapRef;
use super::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

// === column name ===
pub const DATNAME: &str = "datname";

/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;

/// The `pg_catalog.pg_database` table implementation.
pub(super) struct PGDatabase {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

// Workaround to convert schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}

impl PGDatabase {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
namespace_oid_map,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(DATNAME),
]))
}

fn builder(&self) -> PGCDatabaseBuilder {
PGCDatabaseBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
)
}
}

impl DfPartitionStream for PGDatabase {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_database(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}

impl SystemTable for PGDatabase {
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_DATABASE_TABLE_ID
}

fn table_name(&self) -> &'static str {
PG_DATABASE
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(
&self,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_database(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

/// Builds the `pg_catalog.pg_database` table row by row.
/// `oid` is derived from the schema name as a workaround since we don't have a numeric schema id.
/// `datname` is the schema name.
struct PGCDatabaseBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,

oid: UInt32VectorBuilder,
datname: StringVectorBuilder,
}

impl PGCDatabaseBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,

oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
datname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

async fn make_database(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager
.schema_names(&catalog_name, query_ctx())
.await?
{
self.add_database(&predicates, &schema_name);
}
self.finish()
}

fn add_database(&mut self, predicates: &Predicates, schema_name: &str) {
let oid = self.namespace_oid_map.get_oid(schema_name);
let row: [(&str, &Value); 2] = [
(OID_COLUMN_NAME, &Value::from(oid)),
(DATNAME, &Value::from(schema_name)),
];

if !predicates.eval(&row) {
return;
}

self.oid.push(Some(oid));
self.datname.push(Some(schema_name));
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> =
vec![Arc::new(self.oid.finish()), Arc::new(self.datname.finish())];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
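For context on the predicate handling above: each schema in the catalog becomes one `(oid, datname)` row, and filters pushed down through the scan request (for example `WHERE datname = 'public'`) are evaluated per row before the row is appended to the vector builders. A simplified, self-contained sketch of that filtering, using a hypothetical equality-only predicate in place of the real `Predicates` type:

// Hypothetical stand-in for Predicates: a single equality filter on one column.
struct EqPredicate {
    column: &'static str,
    value: &'static str,
}

impl EqPredicate {
    // Mirrors Predicates::eval(&row): a row is kept only if it matches.
    fn eval(&self, row: &[(&str, String)]) -> bool {
        row.iter()
            .any(|(col, val)| *col == self.column && val.as_str() == self.value)
    }
}

fn main() {
    let predicate = EqPredicate { column: "datname", value: "public" };
    // Each schema name becomes one row; the oid here is a fake stand-in for
    // the namespace oid map, which derives a stable numeric id from the name.
    let schemas = ["public", "information_schema", "pg_catalog"];
    for (fake_oid, name) in schemas.iter().enumerate() {
        let row = [("oid", fake_oid.to_string()), ("datname", name.to_string())];
        if predicate.eval(&row) {
            println!("kept: oid={fake_oid}, datname={name}");
        }
    }
}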
6 changes: 5 additions & 1 deletion src/catalog/src/system_schema/pg_catalog/table_names.rs
@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub const PG_DATABASE: &str = "pg_databases";
// https://www.postgresql.org/docs/current/catalog-pg-database.html
pub const PG_DATABASE: &str = "pg_database";
// https://www.postgresql.org/docs/current/catalog-pg-namespace.html
pub const PG_NAMESPACE: &str = "pg_namespace";
// https://www.postgresql.org/docs/current/catalog-pg-class.html
pub const PG_CLASS: &str = "pg_class";
// https://www.postgresql.org/docs/current/catalog-pg-type.html
pub const PG_TYPE: &str = "pg_type";
1 change: 1 addition & 0 deletions src/common/catalog/src/consts.rs
@@ -109,6 +109,7 @@ pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256;
pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257;
pub const PG_CATALOG_PG_NAMESPACE_TABLE_ID: u32 = 258;
pub const PG_CATALOG_PG_DATABASE_TABLE_ID: u32 = 259;

// ----- End of pg_catalog tables -----

14 changes: 13 additions & 1 deletion src/servers/src/postgres/fixtures.rs
@@ -119,7 +119,15 @@ static LIMIT_CAST_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new("(?i)(LIMIT\\s+\\d+)::bigint").unwrap());
pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> {
//TODO(sunng87): remove this when we upgrade datafusion to 43 or newer
LIMIT_CAST_PATTERN.replace_all(query, "$1")
let query = LIMIT_CAST_PATTERN.replace_all(query, "$1");
// DBeaver-specific rewrites for SQL that DataFusion cannot plan:
// `SELECT db.oid,db.*` projects `oid` twice, so alias the explicit column.
// TODO: add more rewrites here as needed
query
.replace(
"SELECT db.oid,db.* FROM pg_catalog.pg_database db",
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
)
.into()
}

#[cfg(test)]
@@ -215,5 +223,9 @@ mod test {

assert_eq!("SELECT * FROM number LIMIT 1", rewrite_sql(sql));
assert_eq!("SELECT * FROM number limit 1", rewrite_sql(sql2));
assert_eq!(
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
rewrite_sql("SELECT db.oid,db.* FROM pg_catalog.pg_database db")
);
}
}
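A standalone sketch of how the rewrite list could grow (the TODO above), assuming plain string matching remains sufficient; the `REWRITES` table and `rewrite` helper are hypothetical, not part of this PR:

// (pattern, replacement) pairs applied in order; only one entry so far.
const REWRITES: &[(&str, &str)] = &[(
    "SELECT db.oid,db.* FROM pg_catalog.pg_database db",
    "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
)];

fn rewrite(query: &str) -> String {
    let mut q = query.to_string();
    for &(from, to) in REWRITES {
        q = q.replace(from, to);
    }
    q
}

fn main() {
    let q = "SELECT db.oid,db.* FROM pg_catalog.pg_database db";
    // Without the alias, `oid` appears twice in the projection and the
    // planner refuses the statement.
    assert_eq!(
        rewrite(q),
        "SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db"
    );
}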
13 changes: 13 additions & 0 deletions tests/cases/standalone/common/system/pg_catalog.result
@@ -74,6 +74,10 @@ select * from pg_catalog.pg_type;

Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_type

select * from pg_catalog.pg_database;

Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.pg_catalog.pg_database

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_type order by oid;

@@ -102,6 +106,15 @@ select * from pg_catalog.pg_type order by oid;
| 20 | List | -1 |
+-----+-----------+--------+

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_database where datname = 'public';

+------------+---------+
| oid | datname |
+------------+---------+
| 3927743705 | public |
+------------+---------+

-- \d
-- SQLNESS PROTOCOL POSTGRES
SELECT n.nspname as "Schema",
4 changes: 4 additions & 0 deletions tests/cases/standalone/common/system/pg_catalog.sql
@@ -28,10 +28,14 @@ select current_schema();
select * from pg_catalog.pg_class;
select * from pg_catalog.pg_namespace;
select * from pg_catalog.pg_type;
select * from pg_catalog.pg_database;

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_type order by oid;

-- SQLNESS PROTOCOL POSTGRES
select * from pg_catalog.pg_database where datname = 'public';

-- \d
-- SQLNESS PROTOCOL POSTGRES
SELECT n.nspname as "Schema",