Skip to content

Commit

Permalink
refactor: proxy sql read
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 16, 2023
1 parent 0c2dc8e commit c33c221
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 404 deletions.
2 changes: 1 addition & 1 deletion integration_tests/cases/common/dummy/select_1.result
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Boolean(false),

SELECT NOT(1);

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: SELECT NOT(1);. Caused by: Failed to execute select, err:Failed to execute logical plan, err:Failed to collect record batch stream, err:Stream error, msg:convert from arrow record batch, err:Internal error: NOT 'Literal { value: Int64(1) }' can't be evaluated because the expression's type is Int64, not boolean or NULL. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute select, err:Failed to execute logical plan, err:Failed to collect record batch stream, err:Stream error, msg:convert from arrow record batch, err:Internal error: NOT 'Literal { value: Int64(1) }' can't be evaluated because the expression's type is Int64, not boolean or NULL. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker" })

SELECT TRUE;

Expand Down
8 changes: 4 additions & 4 deletions integration_tests/cases/env/cluster/ddl/create_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ affected_rows: 0

CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: true, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: true, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })

CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })

create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false');

Expand All @@ -67,11 +67,11 @@ Int32(4),

create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })

create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." })

create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Expand Down
6 changes: 3 additions & 3 deletions integration_tests/cases/env/local/ddl/create_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ affected_rows: 0

CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." })

create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false');

Expand All @@ -67,11 +67,11 @@ Int32(4),

create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." })

create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." })
Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." })

create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;

Expand Down
43 changes: 4 additions & 39 deletions proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use common_types::{
};
use common_util::error::BoxError;
use http::StatusCode;
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
use interpreters::interpreter::Output;
use log::info;
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use query_frontend::{
Expand Down Expand Up @@ -116,44 +116,9 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
msg: "Query is blocked",
})?;

// Execute in interpreter
let interpreter_ctx = InterpreterContext::builder(request_id, deadline)
// Use current ctx's catalog and schema as default catalog and schema
.default_catalog_and_schema(catalog.to_string(), schema)
.build();
let interpreter_factory = Factory::new(
self.instance.query_executor.clone(),
self.instance.catalog_manager.clone(),
self.instance.table_engine.clone(),
self.instance.table_manipulator.clone(),
);
let interpreter = interpreter_factory
.create(interpreter_ctx, plan)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to create interpreter",
})?;

let output = if let Some(deadline) = deadline {
tokio::time::timeout_at(
tokio::time::Instant::from_std(deadline),
interpreter.execute(),
)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::REQUEST_TIMEOUT,
msg: "Query timeout",
})?
} else {
interpreter.execute().await
}
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to execute interpreter",
})?;
let output = self
.execute_plan(request_id, catalog, &schema, plan, deadline)
.await?;

let resp = convert_output(output, column_name)
.box_err()
Expand Down
Loading

0 comments on commit c33c221

Please sign in to comment.