From 86ac808477ab66e3abf6fd5e2aa770d4437b1a42 Mon Sep 17 00:00:00 2001 From: baojinri Date: Mon, 13 May 2024 14:46:50 +0800 Subject: [PATCH 1/8] impl basic auth --- src/proxy/src/auth/auth_with_file.rs | 77 ++++++++++++++++++++++ src/proxy/src/auth/mod.rs | 76 +++++++++++++++++++++ src/proxy/src/context.rs | 18 +++++ src/proxy/src/forward.rs | 25 ++++++- src/proxy/src/grpc/prom_query.rs | 15 +++++ src/proxy/src/grpc/route.rs | 55 ++++++++++++---- src/proxy/src/grpc/sql_query.rs | 30 +++++++++ src/proxy/src/http/prom.rs | 34 ++++++---- src/proxy/src/http/sql.rs | 7 +- src/proxy/src/influxdb/mod.rs | 2 +- src/proxy/src/lib.rs | 20 +++++- src/proxy/src/opentsdb/mod.rs | 2 +- src/proxy/src/read.rs | 2 + src/proxy/src/write.rs | 16 +++++ src/server/src/config.rs | 6 +- src/server/src/consts.rs | 2 - src/server/src/grpc/storage_service/mod.rs | 67 ++++++++++++++++--- src/server/src/http.rs | 11 +++- src/server/src/mysql/worker.rs | 3 +- src/server/src/postgresql/handler.rs | 3 +- src/server/src/server.rs | 23 ++++++- 21 files changed, 447 insertions(+), 47 deletions(-) create mode 100644 src/proxy/src/auth/auth_with_file.rs create mode 100644 src/proxy/src/auth/mod.rs diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/auth_with_file.rs new file mode 100644 index 0000000000..ba2074215c --- /dev/null +++ b/src/proxy/src/auth/auth_with_file.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The proxy module provides features such as forwarding and authentication, +//! adapts to different protocols. + +use std::{collections::HashMap, fs::File, io, io::BufRead, path::Path}; + +use snafu::ResultExt; + +use crate::auth::{Auth, FileNotExisted, OpenFile, ReadLine, Result, ADMIN_TENANT}; + +pub struct AuthWithFile { + file_path: String, + users: HashMap, +} + +impl AuthWithFile { + pub fn new(file_path: String) -> Self { + Self { + file_path, + users: HashMap::new(), + } + } +} + +impl Auth for AuthWithFile { + /// Load credential from file + fn load_credential(&mut self) -> Result<()> { + let path = Path::new(&self.file_path); + if !path.exists() { + return FileNotExisted { + path: self.file_path.clone(), + } + .fail(); + } + + let file = File::open(path).context(OpenFile)?; + let reader = io::BufReader::new(file); + + for line in reader.lines() { + let line = line.context(ReadLine)?; + if let Some((value, key)) = line.split_once(':') { + self.users.insert(key.to_string(), value.to_string()); + } + } + + Ok(()) + } + + fn identify(&self, tenant: Option, token: Option) -> bool { + if let Some(tenant) = tenant { + if tenant == ADMIN_TENANT { + return true; + } + } + + match token { + Some(token) => self.users.contains_key(&token), + None => false, + } + } +} diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs new file mode 100644 index 0000000000..78552c818a --- /dev/null +++ b/src/proxy/src/auth/mod.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The proxy module provides features such as forwarding and authentication, +//! adapts to different protocols. + +use std::sync::{Arc, Mutex}; + +use macros::define_result; +use serde::{Deserialize, Serialize}; +use snafu::Snafu; + +pub mod auth_with_file; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to open file, err:{}.", source))] + OpenFile { source: std::io::Error }, + + #[snafu(display("Failed to read line, err:{}.", source))] + ReadLine { source: std::io::Error }, + + #[snafu(display("File not existed, file path:{}", path))] + FileNotExisted { path: String }, +} + +define_result!(Error); + +pub type AuthRef = Arc>; + +/// Header of tenant name +pub const TENANT_HEADER: &str = "x-horaedb-access-tenant"; +/// Header of tenant name +pub const TENANT_TOKEN_HEADER: &str = "x-horaedb-access-token"; + +/// Admin tenant name +pub const ADMIN_TENANT: &str = "admin"; + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct Config { + pub enable: bool, + pub auth_type: String, + pub source: String, +} + +pub trait Auth: Send + Sync { + fn load_credential(&mut self) -> Result<()>; + fn identify(&self, tenant: Option, token: Option) -> bool; +} + +#[derive(Default)] +pub struct AuthBase; + +impl Auth for AuthBase { + fn load_credential(&mut self) -> Result<()> { + Ok(()) + } + + fn identify(&self, _tenant: Option, _token: Option) -> bool { + true + } +} diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs index cb2fcb6763..fb2c752f58 100644 --- a/src/proxy/src/context.rs +++ b/src/proxy/src/context.rs @@ -56,6 +56,10 @@ pub struct RequestContext { pub timeout: Option, /// Request id pub request_id: RequestId, + /// Tenant + pub tenant: Option, + /// Access token + pub access_token: Option, } impl RequestContext { @@ -69,6 +73,8 @@ pub struct Builder { catalog: String, schema: String, timeout: Option, + tenant: Option, + access_token: Option, } impl Builder { @@ -87,6 +93,16 @@ impl Builder { self } + pub fn tenant(mut self, tenant: Option) -> Self { + self.tenant = tenant; + self + } + + pub fn access_token(mut self, access_token: Option) -> Self { + self.access_token = access_token; + self + } + pub fn build(self) -> Result { ensure!(!self.catalog.is_empty(), MissingCatalog); ensure!(!self.schema.is_empty(), MissingSchema); @@ -96,6 +112,8 @@ impl Builder { schema: self.schema, timeout: self.timeout, request_id: RequestId::next_id(), + tenant: self.tenant, + access_token: self.access_token, }) } } diff --git a/src/proxy/src/forward.rs b/src/proxy/src/forward.rs index 0d8a856f79..bc73ed5ff4 100644 --- a/src/proxy/src/forward.rs +++ b/src/proxy/src/forward.rs @@ -37,7 +37,10 @@ use tonic::{ transport::{self, Channel}, }; -use crate::FORWARDED_FROM; +use crate::{ + auth::{TENANT_HEADER, TENANT_TOKEN_HEADER}, + FORWARDED_FROM, +}; #[derive(Debug, Snafu)] pub enum Error { @@ -206,6 +209,8 @@ pub struct ForwardRequest { pub table: String, pub req: tonic::Request, pub forwarded_from: Option, + pub tenant: Option, + pub access_token: Option, } impl Forwarder { @@ -283,6 +288,8 @@ impl Forwarder { table, req, forwarded_from, + tenant, + access_token, } = forward_req; let req_pb = RouteRequestPb { @@ -309,7 +316,7 @@ impl Forwarder { } }; - self.forward_with_endpoint(endpoint, req, forwarded_from, do_rpc) + self.forward_with_endpoint(endpoint, req, forwarded_from, tenant, access_token, do_rpc) .await } @@ -318,6 +325,8 @@ impl Forwarder { endpoint: Endpoint, mut req: tonic::Request, forwarded_from: Option, + tenant: Option, + access_token: Option, do_rpc: F, ) -> Result> where @@ -351,6 +360,16 @@ impl Forwarder { self.local_endpoint.to_string().parse().unwrap(), ); + if let Some(tenant) = tenant { + req.metadata_mut() + .insert(TENANT_HEADER, tenant.parse().unwrap()); + } + + if let Some(access_token) = access_token { + req.metadata_mut() + .insert(TENANT_TOKEN_HEADER, access_token.parse().unwrap()); + } + let client = self.get_or_create_client(&endpoint).await?; match do_rpc(client, req, &endpoint).await { Err(e) => { @@ -503,6 +522,8 @@ mod tests { table: table.to_string(), req: query_request.into_request(), forwarded_from: None, + tenant: None, + access_token: None, } }; diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs index 673b6131a5..0a189eb920 100644 --- a/src/proxy/src/grpc/prom_query.rs +++ b/src/proxy/src/grpc/prom_query.rs @@ -81,6 +81,21 @@ impl Proxy { msg: "Missing context", code: StatusCode::BAD_REQUEST, })?; + + // Check if the tenant is authorized to access the database. + if !self + .auth + .lock() + .unwrap() + .identify(ctx.tenant.clone(), ctx.access_token) + { + return ErrNoCause { + msg: format!("tenant: {:?} unauthorized", ctx.tenant), + code: StatusCode::UNAUTHORIZED, + } + .fail(); + } + let schema = req_ctx.database; let catalog = self.instance.catalog_manager.default_catalog_name(); diff --git a/src/proxy/src/grpc/route.rs b/src/proxy/src/grpc/route.rs index 0955cec2bd..1ee085bed1 100644 --- a/src/proxy/src/grpc/route.rs +++ b/src/proxy/src/grpc/route.rs @@ -16,30 +16,59 @@ // under the License. use horaedbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse}; +use http::StatusCode; use router::RouteRequest; -use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy}; +use crate::{ + error, + error::{ErrNoCause, Result}, + metrics::GRPC_HANDLER_COUNTER_VEC, + Context, Proxy, +}; impl Proxy { - pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse { + pub async fn handle_route(&self, ctx: Context, req: RouteRequestPb) -> RouteResponse { let request = RouteRequest::new(req, true); - let routes = self.route(request).await; - let mut resp = RouteResponse::default(); - match routes { + match self.handle_route_internal(ctx, request).await { + Ok(v) => { + GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc(); + v + } Err(e) => { - GRPC_HANDLER_COUNTER_VEC.route_failed.inc(); - error!("Failed to handle route, err:{e}"); - resp.header = Some(error::build_err_header(e)); + GRPC_HANDLER_COUNTER_VEC.route_failed.inc(); + RouteResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + } } - Ok(v) => { - GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc(); + } + } - resp.header = Some(error::build_ok_header()); - resp.routes = v; + async fn handle_route_internal( + &self, + ctx: Context, + req: RouteRequest, + ) -> Result { + // Check if the tenant is authorized to access the database. + if !self + .auth + .lock() + .unwrap() + .identify(ctx.tenant.clone(), ctx.access_token.clone()) + { + return ErrNoCause { + msg: format!("tenant: {:?} unauthorized", ctx.tenant), + code: StatusCode::UNAUTHORIZED, } + .fail(); } - resp + + let routes = self.route(req).await?; + Ok(RouteResponse { + header: Some(error::build_ok_header()), + routes, + }) } } diff --git a/src/proxy/src/grpc/sql_query.rs b/src/proxy/src/grpc/sql_query.rs index 4a2a5d8080..cc2332ced5 100644 --- a/src/proxy/src/grpc/sql_query.rs +++ b/src/proxy/src/grpc/sql_query.rs @@ -87,6 +87,20 @@ impl Proxy { .fail(); } + // Check if the tenant is authorized to access the database. + if !self + .auth + .lock() + .unwrap() + .identify(ctx.tenant.clone(), ctx.access_token.clone()) + { + return ErrNoCause { + msg: format!("tenant: {:?} unauthorized", ctx.tenant), + code: StatusCode::UNAUTHORIZED, + } + .fail(); + } + let req_context = req.context.as_ref().unwrap(); let schema = &req_context.database; @@ -156,6 +170,20 @@ impl Proxy { .fail(); } + // Check if the tenant is authorized to access the database. + if !self + .auth + .lock() + .unwrap() + .identify(ctx.tenant.clone(), ctx.access_token.clone()) + { + return ErrNoCause { + msg: format!("tenant: {:?} unauthorized", ctx.tenant), + code: StatusCode::UNAUTHORIZED, + } + .fail(); + } + let req_context = req.context.as_ref().unwrap(); let schema = &req_context.database; let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await { @@ -227,6 +255,8 @@ impl Proxy { table: req.tables[0].clone(), req: req.clone().into_request(), forwarded_from: ctx.forwarded_from.clone(), + tenant: ctx.tenant.clone(), + access_token: ctx.access_token.clone(), }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/src/proxy/src/http/prom.rs b/src/proxy/src/http/prom.rs index c113b3e705..43ea5b3c8e 100644 --- a/src/proxy/src/http/prom.rs +++ b/src/proxy/src/http/prom.rs @@ -19,11 +19,7 @@ //! It converts write request to gRPC write request, and //! translates query request to SQL for execution. -use std::{ - collections::HashMap, - result::Result as StdResult, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, result::Result as StdResult, time::Instant}; use async_trait::async_trait; use catalog::consts::DEFAULT_CATALOG; @@ -83,7 +79,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let ctx = ProxyContext::new(ctx.timeout, None); + let ctx = ProxyContext::new(ctx.timeout, None, ctx.tenant, ctx.access_token); match self.handle_write_internal(ctx, table_request).await { Ok(result) => { @@ -120,6 +116,20 @@ impl Proxy { metric: String, query: Query, ) -> Result { + // Check if the tenant is authorized to access the database. + if !self + .auth + .lock() + .unwrap() + .identify(ctx.tenant.clone(), ctx.access_token.clone()) + { + return ErrNoCause { + msg: format!("tenant: {:?} unauthorized", ctx.tenant), + code: StatusCode::UNAUTHORIZED, + } + .fail(); + } + let request_id = &ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); @@ -178,14 +188,14 @@ impl Proxy { /// another HoraeDB instance. pub async fn handle_prom_grpc_query( &self, - timeout: Option, + ctx: ProxyContext, req: PrometheusRemoteQueryRequest, ) -> Result { - let ctx = req.context.context(ErrNoCause { + let req_ctx = req.context.context(ErrNoCause { code: StatusCode::BAD_REQUEST, msg: "request context is missing", })?; - let database = ctx.database.to_string(); + let database = req_ctx.database.to_string(); let query = Query::decode(req.query.as_ref()) .box_err() .context(Internal { @@ -193,7 +203,9 @@ impl Proxy { })?; let metric = find_metric(&query.matchers)?; let builder = RequestContext::builder() - .timeout(timeout) + .timeout(ctx.timeout) + .tenant(ctx.tenant) + .access_token(ctx.access_token) .schema(database) // TODO: support different catalog .catalog(DEFAULT_CATALOG.to_string()); @@ -235,7 +247,7 @@ impl RemoteStorage for Proxy { query: query.encode_to_vec(), }; if let Some(resp) = self - .maybe_forward_prom_remote_query(metric.clone(), remote_req) + .maybe_forward_prom_remote_query(ctx, metric.clone(), remote_req) .await .map_err(|e| { error!("Forward prom remote query failed, err:{e}"); diff --git a/src/proxy/src/http/sql.rs b/src/proxy/src/http/sql.rs index 1b1fffdc96..a2c96959ae 100644 --- a/src/proxy/src/http/sql.rs +++ b/src/proxy/src/http/sql.rs @@ -50,7 +50,12 @@ impl Proxy { req: Request, ) -> Result { let schema = &ctx.schema; - let ctx = Context::new(ctx.timeout, None); + let ctx = Context::new( + ctx.timeout, + None, + ctx.tenant.clone(), + ctx.access_token.clone(), + ); let query_res = self .handle_sql( diff --git a/src/proxy/src/influxdb/mod.rs b/src/proxy/src/influxdb/mod.rs index 8c8346dfc6..49b79668ab 100644 --- a/src/proxy/src/influxdb/mod.rs +++ b/src/proxy/src/influxdb/mod.rs @@ -81,7 +81,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context::new(ctx.timeout, None); + let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token); match self .handle_write_internal(proxy_context, table_request) diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs index de07fc2df4..a78722b2d4 100644 --- a/src/proxy/src/lib.rs +++ b/src/proxy/src/lib.rs @@ -20,6 +20,7 @@ #![feature(trait_alias)] +pub mod auth; pub mod context; pub mod error; mod error_util; @@ -80,6 +81,8 @@ use table_engine::{ use tonic::{transport::Channel, IntoRequest}; use crate::{ + auth::AuthRef, + context::RequestContext, error::{ErrNoCause, ErrWithCause, Error, Internal, Result}, forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef}, hotspot::HotspotRecorder, @@ -105,6 +108,7 @@ impl Default for SubTableAccessPerm { } pub struct Proxy { + auth: AuthRef, router: Arc, forwarder: ForwarderRef, instance: InstanceRef, @@ -122,6 +126,7 @@ pub struct Proxy { impl Proxy { #[allow(clippy::too_many_arguments)] pub fn new( + auth: AuthRef, router: Arc, instance: InstanceRef, forward_config: forward::Config, @@ -143,6 +148,7 @@ impl Proxy { )); Self { + auth, router, instance, forwarder, @@ -168,6 +174,7 @@ impl Proxy { async fn maybe_forward_prom_remote_query( &self, + ctx: &RequestContext, metric: String, req: PrometheusRemoteQueryRequest, ) -> Result>> { @@ -177,6 +184,8 @@ impl Proxy { table: metric, req: req.into_request(), forwarded_from: None, + tenant: ctx.tenant.clone(), + access_token: ctx.access_token.clone(), }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, @@ -536,14 +545,23 @@ pub struct Context { request_id: RequestId, timeout: Option, forwarded_from: Option, + tenant: Option, + access_token: Option, } impl Context { - pub fn new(timeout: Option, forwarded_from: Option) -> Self { + pub fn new( + timeout: Option, + forwarded_from: Option, + tenant: Option, + access_token: Option, + ) -> Self { Self { request_id: RequestId::next_id(), timeout, forwarded_from, + tenant, + access_token, } } } diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs index aae4a4a2d6..9d5ed02f81 100644 --- a/src/proxy/src/opentsdb/mod.rs +++ b/src/proxy/src/opentsdb/mod.rs @@ -69,7 +69,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context::new(ctx.timeout, None); + let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token); match self .handle_write_internal(proxy_context, table_request) diff --git a/src/proxy/src/read.rs b/src/proxy/src/read.rs index a34875d13b..ea7eb39bb9 100644 --- a/src/proxy/src/read.rs +++ b/src/proxy/src/read.rs @@ -316,6 +316,8 @@ impl Proxy { table: table_name.unwrap(), req: sql_request.into_request(), forwarded_from: ctx.forwarded_from, + tenant: ctx.tenant, + access_token: ctx.access_token, }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs index 1e5ae3564a..24e8e71498 100644 --- a/src/proxy/src/write.rs +++ b/src/proxy/src/write.rs @@ -87,6 +87,20 @@ impl Proxy { ctx: Context, req: WriteRequest, ) -> Result { + // Check if the tenant is authorized to access the database. + if !self + .auth + .lock() + .unwrap() + .identify(ctx.tenant.clone(), ctx.access_token.clone()) + { + return ErrNoCause { + msg: format!("tenant: {:?} unauthorized", ctx.tenant), + code: StatusCode::UNAUTHORIZED, + } + .fail(); + } + let write_context = req.context.clone(); let resp = if self.cluster_with_meta { self.handle_write_with_meta(ctx, req).await? @@ -453,6 +467,8 @@ impl Proxy { endpoint, tonic::Request::new(table_write_request), ctx.forwarded_from, + ctx.tenant, + ctx.access_token, do_write, ) .await; diff --git a/src/server/src/config.rs b/src/server/src/config.rs index 054b15329d..b07cef9595 100644 --- a/src/server/src/config.rs +++ b/src/server/src/config.rs @@ -25,7 +25,7 @@ use std::{ use cluster::config::SchemaConfig; use common_types::schema::TIMESTAMP_COLUMN; use meta_client::types::ShardId; -use proxy::{forward, hotspot, SubTableAccessPerm}; +use proxy::{auth, forward, hotspot, SubTableAccessPerm}; use router::{ endpoint::Endpoint, rule_based::{ClusterView, RuleList}, @@ -141,6 +141,9 @@ pub struct ServerConfig { /// The minimum length of the response body to compress. pub resp_compress_min_length: ReadableSize, + /// Auth config + pub auth: auth::Config, + /// Config for forwarding pub forward: forward::Config, @@ -178,6 +181,7 @@ impl Default for ServerConfig { http_max_body_size: ReadableSize::mb(64), grpc_server_cq_count: 20, resp_compress_min_length: ReadableSize::mb(4), + auth: auth::Config::default(), forward: forward::Config::default(), auto_create_table: true, default_schema_config: Default::default(), diff --git a/src/server/src/consts.rs b/src/server/src/consts.rs index b7b9831002..680e607833 100644 --- a/src/server/src/consts.rs +++ b/src/server/src/consts.rs @@ -21,8 +21,6 @@ pub const CATALOG_HEADER: &str = "x-horaedb-catalog"; /// Header of schema name pub const SCHEMA_HEADER: &str = "x-horaedb-schema"; -/// Header of tenant name -pub const TENANT_HEADER: &str = "x-horaedb-access-tenant"; /// Header of content encoding type pub const CONTENT_ENCODING_HEADER: &str = "content-encoding"; diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs index 142c5e23ed..1230b80d6b 100644 --- a/src/server/src/grpc/storage_service/mod.rs +++ b/src/server/src/grpc/storage_service/mod.rs @@ -35,7 +35,7 @@ use horaedbproto::{ }, }; use http::StatusCode; -use proxy::{Context, Proxy, FORWARDED_FROM}; +use proxy::{auth::TENANT_TOKEN_HEADER, Context, Proxy, FORWARDED_FROM}; use table_engine::engine::EngineRuntimes; use time_ext::InstantExt; @@ -148,7 +148,12 @@ impl StorageService for StorageServiceImpl { ) -> Result, tonic::Status> { let begin_instant = Instant::now(); let proxy = self.proxy.clone(); - let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); let stream = self.stream_sql_query_internal(ctx, proxy, req).await; @@ -166,13 +171,30 @@ fn get_forwarded_from(req: &tonic::Request) -> Option { .map(|value| value.to_str().unwrap().to_string()) } +fn get_tenant(req: &tonic::Request) -> Option { + req.metadata() + .get(TENANT_TOKEN_HEADER) + .map(|value| value.to_str().unwrap().to_string()) +} + +fn get_access_token(req: &tonic::Request) -> Option { + req.metadata() + .get(TENANT_TOKEN_HEADER) + .map(|value| value.to_str().unwrap().to_string()) +} + // TODO: Use macros to simplify duplicate code impl StorageServiceImpl { async fn route_internal( &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -199,7 +221,12 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -236,7 +263,12 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); let proxy = self.proxy.clone(); let join_handle = self @@ -262,11 +294,18 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); + let req = req.into_inner(); let proxy = self.proxy.clone(); - let timeout = self.timeout; + let join_handle = self.runtimes.read_runtime.spawn(async move { - match proxy.handle_prom_grpc_query(timeout, req).await { + match proxy.handle_prom_grpc_query(ctx, req).await { Ok(v) => v, Err(e) => PrometheusRemoteQueryResponse { header: Some(error::build_err_header( @@ -295,7 +334,12 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -331,7 +375,12 @@ impl StorageServiceImpl { &self, req: tonic::Request>, ) -> Result, tonic::Status> { - let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let ctx = Context::new( + self.timeout, + get_forwarded_from(&req), + get_tenant(&req), + get_access_token(&req), + ); let mut stream = req.into_inner(); let proxy = self.proxy.clone(); diff --git a/src/server/src/http.rs b/src/server/src/http.rs index 95ce7a1889..c0200db288 100644 --- a/src/server/src/http.rs +++ b/src/server/src/http.rs @@ -37,6 +37,7 @@ use macros::define_result; use profile::Profiler; use prom_remote_api::web; use proxy::{ + auth::{ADMIN_TENANT, TENANT_HEADER, TENANT_TOKEN_HEADER}, context::RequestContext, handlers::{self}, http::sql::{convert_output, Request}, @@ -732,9 +733,13 @@ impl Service { header::optional::(consts::CATALOG_HEADER) .and(header::optional::(consts::SCHEMA_HEADER)) - .and(header::optional::(consts::TENANT_HEADER)) + .and(header::optional::(TENANT_HEADER)) + .and(header::optional::(TENANT_TOKEN_HEADER)) .and_then( - move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| { + move |catalog: Option<_>, + schema: Option<_>, + _tenant: Option<_>, + access_token: Option<_>| { // Clone the captured variables let default_catalog = default_catalog.clone(); let schema = schema.unwrap_or_else(|| default_schema.clone()); @@ -743,6 +748,8 @@ impl Service { .catalog(catalog.unwrap_or(default_catalog)) .schema(schema) .timeout(timeout) + .tenant(Some(ADMIN_TENANT.to_string())) + .access_token(access_token) .build() .context(CreateContext) .map_err(reject::custom) diff --git a/src/server/src/mysql/worker.rs b/src/server/src/mysql/worker.rs index b25e756bc5..37d9f676ab 100644 --- a/src/server/src/mysql/worker.rs +++ b/src/server/src/mysql/worker.rs @@ -23,7 +23,7 @@ use logger::{error, info}; use opensrv_mysql::{ AsyncMysqlShim, ErrorKind, InitWriter, QueryResultWriter, StatementMetaWriter, }; -use proxy::{context::RequestContext, http::sql::Request, Proxy}; +use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy}; use snafu::ResultExt; use crate::{ @@ -152,6 +152,7 @@ where .catalog(session.catalog().to_string()) .schema(session.schema().to_string()) .timeout(self.timeout) + .tenant(Some(ADMIN_TENANT.to_string())) .build() .context(CreateContext) } diff --git a/src/server/src/postgresql/handler.rs b/src/server/src/postgresql/handler.rs index feecbca8f7..4ed80e795c 100644 --- a/src/server/src/postgresql/handler.rs +++ b/src/server/src/postgresql/handler.rs @@ -31,7 +31,7 @@ use pgwire::{ }, error::{PgWireError, PgWireResult}, }; -use proxy::{context::RequestContext, http::sql::Request, Proxy}; +use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy}; use snafu::ResultExt; use crate::postgresql::error::{CreateContext, Result}; @@ -89,6 +89,7 @@ impl PostgresqlHandler { .catalog(default_catalog) .schema(default_schema) .timeout(self.timeout) + .tenant(Some(ADMIN_TENANT.to_string())) .build() .context(CreateContext) } diff --git a/src/server/src/server.rs b/src/server/src/server.rs index ddc151c809..7da3f276ba 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -17,7 +17,7 @@ //! Server -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use catalog::manager::ManagerRef; use cluster::ClusterRef; @@ -29,6 +29,7 @@ use macros::define_result; use notifier::notifier::RequestNotifiers; use partition_table_engine::PartitionTableEngine; use proxy::{ + auth::{auth_with_file::AuthWithFile, AuthBase, AuthRef}, hotspot::HotspotRecorder, instance::{DynamicConfig, Instance, InstanceRef}, limiter::Limiter, @@ -135,6 +136,9 @@ pub enum Error { #[snafu(display("Failed to build query engine, err:{source}"))] BuildQueryEngine { source: query_engine::error::Error }, + + #[snafu(display("Failed to load auth credential, err:{source}"))] + LoadCredential { source: proxy::auth::Error }, } define_result!(Error); @@ -451,7 +455,24 @@ impl Builder { .enable .then(|| Arc::new(RequestNotifiers::default())); + // Build auth + let auth: AuthRef = + if self.server_config.auth.enable && self.server_config.auth.auth_type == "file" { + Arc::new(Mutex::new(AuthWithFile::new( + self.server_config.auth.source.clone(), + ))) + } else { + Arc::new(Mutex::new(AuthBase)) + }; + + // Load auth credential + auth.lock() + .unwrap() + .load_credential() + .context(LoadCredential)?; + let proxy = Arc::new(Proxy::new( + auth, router.clone(), instance.clone(), self.server_config.forward, From 94bb684733218d64f9acdd6dd1de5332c9af3538 Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 14 May 2024 12:29:33 +0800 Subject: [PATCH 2/8] use interceptor to auth --- Cargo.lock | 1 + src/proxy/Cargo.toml | 1 + src/proxy/src/auth/auth_with_file.rs | 60 +++++++++++++++------- src/proxy/src/auth/mod.rs | 31 ++--------- src/proxy/src/context.rs | 21 +++----- src/proxy/src/forward.rs | 28 +++------- src/proxy/src/grpc/prom_query.rs | 14 ----- src/proxy/src/grpc/route.rs | 55 +++++--------------- src/proxy/src/grpc/sql_query.rs | 31 +---------- src/proxy/src/http/prom.rs | 19 +------ src/proxy/src/http/sql.rs | 7 +-- src/proxy/src/influxdb/mod.rs | 2 +- src/proxy/src/lib.rs | 16 ++---- src/proxy/src/opentsdb/mod.rs | 2 +- src/proxy/src/read.rs | 3 +- src/proxy/src/write.rs | 17 +----- src/server/src/consts.rs | 2 + src/server/src/grpc/mod.rs | 18 +++++-- src/server/src/grpc/storage_service/mod.rs | 33 ++++-------- src/server/src/http.rs | 11 +--- src/server/src/mysql/worker.rs | 3 +- src/server/src/postgresql/handler.rs | 3 +- src/server/src/server.rs | 26 ++++------ 23 files changed, 127 insertions(+), 277 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8e331b0f7..a0c6dd8bfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5428,6 +5428,7 @@ dependencies = [ "arrow 49.0.0", "arrow_ext", "async-trait", + "base64 0.13.1", "bytes", "catalog", "clru", diff --git a/src/proxy/Cargo.toml b/src/proxy/Cargo.toml index 1f66b131da..c6831b3682 100644 --- a/src/proxy/Cargo.toml +++ b/src/proxy/Cargo.toml @@ -34,6 +34,7 @@ workspace = true arrow = { workspace = true } arrow_ext = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } catalog = { workspace = true } clru = { workspace = true } diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/auth_with_file.rs index ba2074215c..fe977f19a5 100644 --- a/src/proxy/src/auth/auth_with_file.rs +++ b/src/proxy/src/auth/auth_with_file.rs @@ -18,29 +18,35 @@ //! The proxy module provides features such as forwarding and authentication, //! adapts to different protocols. -use std::{collections::HashMap, fs::File, io, io::BufRead, path::Path}; +use std::{collections::HashSet, fs::File, io, io::BufRead, path::Path}; +use base64::encode; use snafu::ResultExt; +use tonic::service::Interceptor; -use crate::auth::{Auth, FileNotExisted, OpenFile, ReadLine, Result, ADMIN_TENANT}; +use crate::auth::{FileNotExisted, OpenFile, ReadLine, Result, AUTHORIZATION}; +#[derive(Debug, Clone, Default)] pub struct AuthWithFile { + enable: bool, file_path: String, - users: HashMap, + auth: HashSet, } impl AuthWithFile { - pub fn new(file_path: String) -> Self { + pub fn new(enable: bool, file_path: String) -> Self { Self { + enable, file_path, - users: HashMap::new(), + auth: HashSet::new(), } } -} -impl Auth for AuthWithFile { - /// Load credential from file - fn load_credential(&mut self) -> Result<()> { + pub fn load_credential(&mut self) -> Result<()> { + if !self.enable { + return Ok(()); + } + let path = Path::new(&self.file_path); if !path.exists() { return FileNotExisted { @@ -54,24 +60,40 @@ impl Auth for AuthWithFile { for line in reader.lines() { let line = line.context(ReadLine)?; - if let Some((value, key)) = line.split_once(':') { - self.users.insert(key.to_string(), value.to_string()); - } + let mut buf = Vec::with_capacity(line.len() + 1); + buf.extend_from_slice(line.as_bytes()); + let auth = encode(&buf); + self.auth.insert(format!("Basic {}", auth)); } Ok(()) } - fn identify(&self, tenant: Option, token: Option) -> bool { - if let Some(tenant) = tenant { - if tenant == ADMIN_TENANT { - return true; - } + fn identify(&self, authorization: Option) -> bool { + if !self.enable { + return true; } - match token { - Some(token) => self.users.contains_key(&token), + match authorization { + Some(auth) => self.auth.contains(&auth), None => false, } } } + +impl Interceptor for AuthWithFile { + fn call( + &mut self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status> { + let metadata = request.metadata(); + let authorization = metadata + .get(AUTHORIZATION) + .map(|v| v.to_str().unwrap().to_string()); + if self.identify(authorization) { + Ok(request) + } else { + Err(tonic::Status::unauthenticated("unauthenticated")) + } + } +} diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs index 78552c818a..735e0a3287 100644 --- a/src/proxy/src/auth/mod.rs +++ b/src/proxy/src/auth/mod.rs @@ -18,8 +18,6 @@ //! The proxy module provides features such as forwarding and authentication, //! adapts to different protocols. -use std::sync::{Arc, Mutex}; - use macros::define_result; use serde::{Deserialize, Serialize}; use snafu::Snafu; @@ -40,15 +38,10 @@ pub enum Error { define_result!(Error); -pub type AuthRef = Arc>; - -/// Header of tenant name -pub const TENANT_HEADER: &str = "x-horaedb-access-tenant"; -/// Header of tenant name -pub const TENANT_TOKEN_HEADER: &str = "x-horaedb-access-token"; +/// Header of authorization +pub const AUTHORIZATION: &str = "authorization"; -/// Admin tenant name -pub const ADMIN_TENANT: &str = "admin"; +pub const DEFAULT_AUTH_TYPE: &str = "file"; #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct Config { @@ -56,21 +49,3 @@ pub struct Config { pub auth_type: String, pub source: String, } - -pub trait Auth: Send + Sync { - fn load_credential(&mut self) -> Result<()>; - fn identify(&self, tenant: Option, token: Option) -> bool; -} - -#[derive(Default)] -pub struct AuthBase; - -impl Auth for AuthBase { - fn load_credential(&mut self) -> Result<()> { - Ok(()) - } - - fn identify(&self, _tenant: Option, _token: Option) -> bool { - true - } -} diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs index fb2c752f58..b4452609cc 100644 --- a/src/proxy/src/context.rs +++ b/src/proxy/src/context.rs @@ -56,10 +56,8 @@ pub struct RequestContext { pub timeout: Option, /// Request id pub request_id: RequestId, - /// Tenant - pub tenant: Option, - /// Access token - pub access_token: Option, + /// authorization + pub authorization: Option, } impl RequestContext { @@ -73,8 +71,7 @@ pub struct Builder { catalog: String, schema: String, timeout: Option, - tenant: Option, - access_token: Option, + authorization: Option, } impl Builder { @@ -93,13 +90,8 @@ impl Builder { self } - pub fn tenant(mut self, tenant: Option) -> Self { - self.tenant = tenant; - self - } - - pub fn access_token(mut self, access_token: Option) -> Self { - self.access_token = access_token; + pub fn authorization(mut self, tenant: Option) -> Self { + self.authorization = tenant; self } @@ -112,8 +104,7 @@ impl Builder { schema: self.schema, timeout: self.timeout, request_id: RequestId::next_id(), - tenant: self.tenant, - access_token: self.access_token, + authorization: self.authorization, }) } } diff --git a/src/proxy/src/forward.rs b/src/proxy/src/forward.rs index bc73ed5ff4..93d0dae96b 100644 --- a/src/proxy/src/forward.rs +++ b/src/proxy/src/forward.rs @@ -37,10 +37,7 @@ use tonic::{ transport::{self, Channel}, }; -use crate::{ - auth::{TENANT_HEADER, TENANT_TOKEN_HEADER}, - FORWARDED_FROM, -}; +use crate::{auth::AUTHORIZATION, FORWARDED_FROM}; #[derive(Debug, Snafu)] pub enum Error { @@ -209,8 +206,7 @@ pub struct ForwardRequest { pub table: String, pub req: tonic::Request, pub forwarded_from: Option, - pub tenant: Option, - pub access_token: Option, + pub authorization: Option, } impl Forwarder { @@ -288,8 +284,7 @@ impl Forwarder { table, req, forwarded_from, - tenant, - access_token, + authorization, } = forward_req; let req_pb = RouteRequestPb { @@ -316,7 +311,7 @@ impl Forwarder { } }; - self.forward_with_endpoint(endpoint, req, forwarded_from, tenant, access_token, do_rpc) + self.forward_with_endpoint(endpoint, req, forwarded_from, authorization, do_rpc) .await } @@ -325,8 +320,7 @@ impl Forwarder { endpoint: Endpoint, mut req: tonic::Request, forwarded_from: Option, - tenant: Option, - access_token: Option, + authorization: Option, do_rpc: F, ) -> Result> where @@ -360,14 +354,9 @@ impl Forwarder { self.local_endpoint.to_string().parse().unwrap(), ); - if let Some(tenant) = tenant { - req.metadata_mut() - .insert(TENANT_HEADER, tenant.parse().unwrap()); - } - - if let Some(access_token) = access_token { + if let Some(authorization) = authorization { req.metadata_mut() - .insert(TENANT_TOKEN_HEADER, access_token.parse().unwrap()); + .insert(AUTHORIZATION, authorization.parse().unwrap()); } let client = self.get_or_create_client(&endpoint).await?; @@ -522,8 +511,7 @@ mod tests { table: table.to_string(), req: query_request.into_request(), forwarded_from: None, - tenant: None, - access_token: None, + authorization: None, } }; diff --git a/src/proxy/src/grpc/prom_query.rs b/src/proxy/src/grpc/prom_query.rs index 0a189eb920..e596f61a1d 100644 --- a/src/proxy/src/grpc/prom_query.rs +++ b/src/proxy/src/grpc/prom_query.rs @@ -82,20 +82,6 @@ impl Proxy { code: StatusCode::BAD_REQUEST, })?; - // Check if the tenant is authorized to access the database. - if !self - .auth - .lock() - .unwrap() - .identify(ctx.tenant.clone(), ctx.access_token) - { - return ErrNoCause { - msg: format!("tenant: {:?} unauthorized", ctx.tenant), - code: StatusCode::UNAUTHORIZED, - } - .fail(); - } - let schema = req_ctx.database; let catalog = self.instance.catalog_manager.default_catalog_name(); diff --git a/src/proxy/src/grpc/route.rs b/src/proxy/src/grpc/route.rs index 1ee085bed1..0955cec2bd 100644 --- a/src/proxy/src/grpc/route.rs +++ b/src/proxy/src/grpc/route.rs @@ -16,59 +16,30 @@ // under the License. use horaedbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse}; -use http::StatusCode; use router::RouteRequest; -use crate::{ - error, - error::{ErrNoCause, Result}, - metrics::GRPC_HANDLER_COUNTER_VEC, - Context, Proxy, -}; +use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy}; impl Proxy { - pub async fn handle_route(&self, ctx: Context, req: RouteRequestPb) -> RouteResponse { + pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse { let request = RouteRequest::new(req, true); + let routes = self.route(request).await; - match self.handle_route_internal(ctx, request).await { - Ok(v) => { - GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc(); - v - } + let mut resp = RouteResponse::default(); + match routes { Err(e) => { - error!("Failed to handle route, err:{e}"); GRPC_HANDLER_COUNTER_VEC.route_failed.inc(); - RouteResponse { - header: Some(error::build_err_header(e)), - ..Default::default() - } + + error!("Failed to handle route, err:{e}"); + resp.header = Some(error::build_err_header(e)); } - } - } + Ok(v) => { + GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc(); - async fn handle_route_internal( - &self, - ctx: Context, - req: RouteRequest, - ) -> Result { - // Check if the tenant is authorized to access the database. - if !self - .auth - .lock() - .unwrap() - .identify(ctx.tenant.clone(), ctx.access_token.clone()) - { - return ErrNoCause { - msg: format!("tenant: {:?} unauthorized", ctx.tenant), - code: StatusCode::UNAUTHORIZED, + resp.header = Some(error::build_ok_header()); + resp.routes = v; } - .fail(); } - - let routes = self.route(req).await?; - Ok(RouteResponse { - header: Some(error::build_ok_header()), - routes, - }) + resp } } diff --git a/src/proxy/src/grpc/sql_query.rs b/src/proxy/src/grpc/sql_query.rs index cc2332ced5..16c6201703 100644 --- a/src/proxy/src/grpc/sql_query.rs +++ b/src/proxy/src/grpc/sql_query.rs @@ -87,20 +87,6 @@ impl Proxy { .fail(); } - // Check if the tenant is authorized to access the database. - if !self - .auth - .lock() - .unwrap() - .identify(ctx.tenant.clone(), ctx.access_token.clone()) - { - return ErrNoCause { - msg: format!("tenant: {:?} unauthorized", ctx.tenant), - code: StatusCode::UNAUTHORIZED, - } - .fail(); - } - let req_context = req.context.as_ref().unwrap(); let schema = &req_context.database; @@ -170,20 +156,6 @@ impl Proxy { .fail(); } - // Check if the tenant is authorized to access the database. - if !self - .auth - .lock() - .unwrap() - .identify(ctx.tenant.clone(), ctx.access_token.clone()) - { - return ErrNoCause { - msg: format!("tenant: {:?} unauthorized", ctx.tenant), - code: StatusCode::UNAUTHORIZED, - } - .fail(); - } - let req_context = req.context.as_ref().unwrap(); let schema = &req_context.database; let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await { @@ -255,8 +227,7 @@ impl Proxy { table: req.tables[0].clone(), req: req.clone().into_request(), forwarded_from: ctx.forwarded_from.clone(), - tenant: ctx.tenant.clone(), - access_token: ctx.access_token.clone(), + authorization: ctx.authorization.clone(), }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/src/proxy/src/http/prom.rs b/src/proxy/src/http/prom.rs index 43ea5b3c8e..847da23d0a 100644 --- a/src/proxy/src/http/prom.rs +++ b/src/proxy/src/http/prom.rs @@ -79,7 +79,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let ctx = ProxyContext::new(ctx.timeout, None, ctx.tenant, ctx.access_token); + let ctx = ProxyContext::new(ctx.timeout, None, ctx.authorization); match self.handle_write_internal(ctx, table_request).await { Ok(result) => { @@ -116,20 +116,6 @@ impl Proxy { metric: String, query: Query, ) -> Result { - // Check if the tenant is authorized to access the database. - if !self - .auth - .lock() - .unwrap() - .identify(ctx.tenant.clone(), ctx.access_token.clone()) - { - return ErrNoCause { - msg: format!("tenant: {:?} unauthorized", ctx.tenant), - code: StatusCode::UNAUTHORIZED, - } - .fail(); - } - let request_id = &ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); @@ -204,8 +190,7 @@ impl Proxy { let metric = find_metric(&query.matchers)?; let builder = RequestContext::builder() .timeout(ctx.timeout) - .tenant(ctx.tenant) - .access_token(ctx.access_token) + .authorization(ctx.authorization) .schema(database) // TODO: support different catalog .catalog(DEFAULT_CATALOG.to_string()); diff --git a/src/proxy/src/http/sql.rs b/src/proxy/src/http/sql.rs index a2c96959ae..8f61039eac 100644 --- a/src/proxy/src/http/sql.rs +++ b/src/proxy/src/http/sql.rs @@ -50,12 +50,7 @@ impl Proxy { req: Request, ) -> Result { let schema = &ctx.schema; - let ctx = Context::new( - ctx.timeout, - None, - ctx.tenant.clone(), - ctx.access_token.clone(), - ); + let ctx = Context::new(ctx.timeout, None, ctx.authorization.clone()); let query_res = self .handle_sql( diff --git a/src/proxy/src/influxdb/mod.rs b/src/proxy/src/influxdb/mod.rs index 49b79668ab..3e24478684 100644 --- a/src/proxy/src/influxdb/mod.rs +++ b/src/proxy/src/influxdb/mod.rs @@ -81,7 +81,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token); + let proxy_context = Context::new(ctx.timeout, None, ctx.authorization); match self .handle_write_internal(proxy_context, table_request) diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs index a78722b2d4..f2c36b13c7 100644 --- a/src/proxy/src/lib.rs +++ b/src/proxy/src/lib.rs @@ -81,7 +81,6 @@ use table_engine::{ use tonic::{transport::Channel, IntoRequest}; use crate::{ - auth::AuthRef, context::RequestContext, error::{ErrNoCause, ErrWithCause, Error, Internal, Result}, forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef}, @@ -108,7 +107,6 @@ impl Default for SubTableAccessPerm { } pub struct Proxy { - auth: AuthRef, router: Arc, forwarder: ForwarderRef, instance: InstanceRef, @@ -126,7 +124,6 @@ pub struct Proxy { impl Proxy { #[allow(clippy::too_many_arguments)] pub fn new( - auth: AuthRef, router: Arc, instance: InstanceRef, forward_config: forward::Config, @@ -148,7 +145,6 @@ impl Proxy { )); Self { - auth, router, instance, forwarder, @@ -184,8 +180,7 @@ impl Proxy { table: metric, req: req.into_request(), forwarded_from: None, - tenant: ctx.tenant.clone(), - access_token: ctx.access_token.clone(), + authorization: ctx.authorization.clone(), }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, @@ -545,23 +540,20 @@ pub struct Context { request_id: RequestId, timeout: Option, forwarded_from: Option, - tenant: Option, - access_token: Option, + authorization: Option, } impl Context { pub fn new( timeout: Option, forwarded_from: Option, - tenant: Option, - access_token: Option, + authorization: Option, ) -> Self { Self { request_id: RequestId::next_id(), timeout, forwarded_from, - tenant, - access_token, + authorization, } } } diff --git a/src/proxy/src/opentsdb/mod.rs b/src/proxy/src/opentsdb/mod.rs index 9d5ed02f81..80affd0392 100644 --- a/src/proxy/src/opentsdb/mod.rs +++ b/src/proxy/src/opentsdb/mod.rs @@ -69,7 +69,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context::new(ctx.timeout, None, ctx.tenant, ctx.access_token); + let proxy_context = Context::new(ctx.timeout, None, ctx.authorization); match self .handle_write_internal(proxy_context, table_request) diff --git a/src/proxy/src/read.rs b/src/proxy/src/read.rs index ea7eb39bb9..67e1473d71 100644 --- a/src/proxy/src/read.rs +++ b/src/proxy/src/read.rs @@ -316,8 +316,7 @@ impl Proxy { table: table_name.unwrap(), req: sql_request.into_request(), forwarded_from: ctx.forwarded_from, - tenant: ctx.tenant, - access_token: ctx.access_token, + authorization: ctx.authorization, }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs index 24e8e71498..fd8e54bd5b 100644 --- a/src/proxy/src/write.rs +++ b/src/proxy/src/write.rs @@ -87,20 +87,6 @@ impl Proxy { ctx: Context, req: WriteRequest, ) -> Result { - // Check if the tenant is authorized to access the database. - if !self - .auth - .lock() - .unwrap() - .identify(ctx.tenant.clone(), ctx.access_token.clone()) - { - return ErrNoCause { - msg: format!("tenant: {:?} unauthorized", ctx.tenant), - code: StatusCode::UNAUTHORIZED, - } - .fail(); - } - let write_context = req.context.clone(); let resp = if self.cluster_with_meta { self.handle_write_with_meta(ctx, req).await? @@ -467,8 +453,7 @@ impl Proxy { endpoint, tonic::Request::new(table_write_request), ctx.forwarded_from, - ctx.tenant, - ctx.access_token, + ctx.authorization, do_write, ) .await; diff --git a/src/server/src/consts.rs b/src/server/src/consts.rs index 680e607833..b7b9831002 100644 --- a/src/server/src/consts.rs +++ b/src/server/src/consts.rs @@ -21,6 +21,8 @@ pub const CATALOG_HEADER: &str = "x-horaedb-catalog"; /// Header of schema name pub const SCHEMA_HEADER: &str = "x-horaedb-schema"; +/// Header of tenant name +pub const TENANT_HEADER: &str = "x-horaedb-access-tenant"; /// Header of content encoding type pub const CONTENT_ENCODING_HEADER: &str = "content-encoding"; diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs index 1c53f205dc..66c45893a9 100644 --- a/src/server/src/grpc/mod.rs +++ b/src/server/src/grpc/mod.rs @@ -37,6 +37,7 @@ use logger::{info, warn}; use macros::define_result; use notifier::notifier::RequestNotifiers; use proxy::{ + auth::auth_with_file::AuthWithFile, forward, hotspot::HotspotRecorder, instance::InstanceRef, @@ -47,7 +48,7 @@ use runtime::{JoinHandle, Runtime}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Sender}; -use tonic::transport::Server; +use tonic::{codegen::InterceptedService, transport::Server}; use wal::manager::OpenedWals; use self::remote_engine_service::QueryDedup; @@ -113,6 +114,9 @@ pub enum Error { #[snafu(display("Missing HotspotRecorder.\nBacktrace:\n{}", backtrace))] MissingHotspotRecorder { backtrace: Backtrace }, + #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))] + MissingAuth { backtrace: Backtrace }, + #[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))] ParseCatalogName { source: std::string::FromUtf8Error, @@ -158,7 +162,7 @@ define_result!(Error); /// Rpc services manages all grpc services of the server. pub struct RpcServices { serve_addr: SocketAddr, - rpc_server: StorageServiceServer, + rpc_server: InterceptedService, AuthWithFile>, meta_rpc_server: Option>, remote_engine_server: RemoteEngineServiceServer, runtime: Arc, @@ -212,6 +216,7 @@ impl RpcServices { } pub struct Builder { + auth: Option, endpoint: String, timeout: Option, runtimes: Option>, @@ -226,6 +231,7 @@ pub struct Builder { impl Builder { pub fn new() -> Self { Self { + auth: None, endpoint: "0.0.0.0:8381".to_string(), timeout: None, runtimes: None, @@ -238,6 +244,11 @@ impl Builder { } } + pub fn auth(mut self, auth: AuthWithFile) -> Self { + self.auth = Some(auth); + self + } + pub fn endpoint(mut self, endpoint: String) -> Self { self.endpoint = endpoint; self @@ -287,6 +298,7 @@ impl Builder { impl Builder { pub fn build(self) -> Result { + let auth = self.auth.context(MissingAuth)?; let runtimes = self.runtimes.context(MissingRuntimes)?; let instance = self.instance.context(MissingInstance)?; let opened_wals = self.opened_wals.context(MissingWals)?; @@ -330,7 +342,7 @@ impl Builder { runtimes, timeout: self.timeout, }; - let rpc_server = StorageServiceServer::new(storage_service); + let rpc_server = StorageServiceServer::with_interceptor(storage_service, auth); let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?; diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs index 1230b80d6b..228c1742c9 100644 --- a/src/server/src/grpc/storage_service/mod.rs +++ b/src/server/src/grpc/storage_service/mod.rs @@ -35,7 +35,7 @@ use horaedbproto::{ }, }; use http::StatusCode; -use proxy::{auth::TENANT_TOKEN_HEADER, Context, Proxy, FORWARDED_FROM}; +use proxy::{auth::AUTHORIZATION, Context, Proxy, FORWARDED_FROM}; use table_engine::engine::EngineRuntimes; use time_ext::InstantExt; @@ -151,8 +151,7 @@ impl StorageService for StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let stream = self.stream_sql_query_internal(ctx, proxy, req).await; @@ -171,15 +170,9 @@ fn get_forwarded_from(req: &tonic::Request) -> Option { .map(|value| value.to_str().unwrap().to_string()) } -fn get_tenant(req: &tonic::Request) -> Option { +fn get_authorization(req: &tonic::Request) -> Option { req.metadata() - .get(TENANT_TOKEN_HEADER) - .map(|value| value.to_str().unwrap().to_string()) -} - -fn get_access_token(req: &tonic::Request) -> Option { - req.metadata() - .get(TENANT_TOKEN_HEADER) + .get(AUTHORIZATION) .map(|value| value.to_str().unwrap().to_string()) } @@ -192,8 +185,7 @@ impl StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -224,8 +216,7 @@ impl StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let req = req.into_inner(); @@ -266,8 +257,7 @@ impl StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let proxy = self.proxy.clone(); @@ -297,8 +287,7 @@ impl StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let req = req.into_inner(); @@ -337,8 +326,7 @@ impl StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let req = req.into_inner(); @@ -378,8 +366,7 @@ impl StorageServiceImpl { let ctx = Context::new( self.timeout, get_forwarded_from(&req), - get_tenant(&req), - get_access_token(&req), + get_authorization(&req), ); let mut stream = req.into_inner(); let proxy = self.proxy.clone(); diff --git a/src/server/src/http.rs b/src/server/src/http.rs index c0200db288..95ce7a1889 100644 --- a/src/server/src/http.rs +++ b/src/server/src/http.rs @@ -37,7 +37,6 @@ use macros::define_result; use profile::Profiler; use prom_remote_api::web; use proxy::{ - auth::{ADMIN_TENANT, TENANT_HEADER, TENANT_TOKEN_HEADER}, context::RequestContext, handlers::{self}, http::sql::{convert_output, Request}, @@ -733,13 +732,9 @@ impl Service { header::optional::(consts::CATALOG_HEADER) .and(header::optional::(consts::SCHEMA_HEADER)) - .and(header::optional::(TENANT_HEADER)) - .and(header::optional::(TENANT_TOKEN_HEADER)) + .and(header::optional::(consts::TENANT_HEADER)) .and_then( - move |catalog: Option<_>, - schema: Option<_>, - _tenant: Option<_>, - access_token: Option<_>| { + move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| { // Clone the captured variables let default_catalog = default_catalog.clone(); let schema = schema.unwrap_or_else(|| default_schema.clone()); @@ -748,8 +743,6 @@ impl Service { .catalog(catalog.unwrap_or(default_catalog)) .schema(schema) .timeout(timeout) - .tenant(Some(ADMIN_TENANT.to_string())) - .access_token(access_token) .build() .context(CreateContext) .map_err(reject::custom) diff --git a/src/server/src/mysql/worker.rs b/src/server/src/mysql/worker.rs index 37d9f676ab..b25e756bc5 100644 --- a/src/server/src/mysql/worker.rs +++ b/src/server/src/mysql/worker.rs @@ -23,7 +23,7 @@ use logger::{error, info}; use opensrv_mysql::{ AsyncMysqlShim, ErrorKind, InitWriter, QueryResultWriter, StatementMetaWriter, }; -use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy}; +use proxy::{context::RequestContext, http::sql::Request, Proxy}; use snafu::ResultExt; use crate::{ @@ -152,7 +152,6 @@ where .catalog(session.catalog().to_string()) .schema(session.schema().to_string()) .timeout(self.timeout) - .tenant(Some(ADMIN_TENANT.to_string())) .build() .context(CreateContext) } diff --git a/src/server/src/postgresql/handler.rs b/src/server/src/postgresql/handler.rs index 4ed80e795c..feecbca8f7 100644 --- a/src/server/src/postgresql/handler.rs +++ b/src/server/src/postgresql/handler.rs @@ -31,7 +31,7 @@ use pgwire::{ }, error::{PgWireError, PgWireResult}, }; -use proxy::{auth::ADMIN_TENANT, context::RequestContext, http::sql::Request, Proxy}; +use proxy::{context::RequestContext, http::sql::Request, Proxy}; use snafu::ResultExt; use crate::postgresql::error::{CreateContext, Result}; @@ -89,7 +89,6 @@ impl PostgresqlHandler { .catalog(default_catalog) .schema(default_schema) .timeout(self.timeout) - .tenant(Some(ADMIN_TENANT.to_string())) .build() .context(CreateContext) } diff --git a/src/server/src/server.rs b/src/server/src/server.rs index 7da3f276ba..9c3383f7d1 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -17,7 +17,7 @@ //! Server -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use catalog::manager::ManagerRef; use cluster::ClusterRef; @@ -29,7 +29,7 @@ use macros::define_result; use notifier::notifier::RequestNotifiers; use partition_table_engine::PartitionTableEngine; use proxy::{ - auth::{auth_with_file::AuthWithFile, AuthBase, AuthRef}, + auth::{auth_with_file::AuthWithFile, DEFAULT_AUTH_TYPE}, hotspot::HotspotRecorder, instance::{DynamicConfig, Instance, InstanceRef}, limiter::Limiter, @@ -456,23 +456,18 @@ impl Builder { .then(|| Arc::new(RequestNotifiers::default())); // Build auth - let auth: AuthRef = - if self.server_config.auth.enable && self.server_config.auth.auth_type == "file" { - Arc::new(Mutex::new(AuthWithFile::new( - self.server_config.auth.source.clone(), - ))) - } else { - Arc::new(Mutex::new(AuthBase)) - }; + let mut auth = if self.server_config.auth.enable + && self.server_config.auth.auth_type == DEFAULT_AUTH_TYPE + { + AuthWithFile::new(true, self.server_config.auth.source.clone()) + } else { + AuthWithFile::default() + }; // Load auth credential - auth.lock() - .unwrap() - .load_credential() - .context(LoadCredential)?; + auth.load_credential().context(LoadCredential)?; let proxy = Arc::new(Proxy::new( - auth, router.clone(), instance.clone(), self.server_config.forward, @@ -521,6 +516,7 @@ impl Builder { .context(BuildPostgresqlService)?; let rpc_services = grpc::Builder::new() + .auth(auth) .endpoint(grpc_endpoint.to_string()) .runtimes(engine_runtimes) .instance(instance.clone()) From 48e87902e085cef1733070f1dde8957ad616e2e8 Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 14 May 2024 19:26:25 +0800 Subject: [PATCH 3/8] auth in http --- src/proxy/src/auth/auth_with_file.rs | 2 +- src/proxy/src/context.rs | 4 ++-- src/server/src/http.rs | 32 +++++++++++++++++++++++++++- src/server/src/server.rs | 1 + 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/auth_with_file.rs index fe977f19a5..1652a829fd 100644 --- a/src/proxy/src/auth/auth_with_file.rs +++ b/src/proxy/src/auth/auth_with_file.rs @@ -69,7 +69,7 @@ impl AuthWithFile { Ok(()) } - fn identify(&self, authorization: Option) -> bool { + pub fn identify(&self, authorization: Option) -> bool { if !self.enable { return true; } diff --git a/src/proxy/src/context.rs b/src/proxy/src/context.rs index b4452609cc..02aa48c9b2 100644 --- a/src/proxy/src/context.rs +++ b/src/proxy/src/context.rs @@ -90,8 +90,8 @@ impl Builder { self } - pub fn authorization(mut self, tenant: Option) -> Self { - self.authorization = tenant; + pub fn authorization(mut self, authorization: Option) -> Self { + self.authorization = authorization; self } diff --git a/src/server/src/http.rs b/src/server/src/http.rs index 95ce7a1889..35a12926bb 100644 --- a/src/server/src/http.rs +++ b/src/server/src/http.rs @@ -37,6 +37,7 @@ use macros::define_result; use profile::Profiler; use prom_remote_api::web; use proxy::{ + auth::{auth_with_file::AuthWithFile, AUTHORIZATION}, context::RequestContext, handlers::{self}, http::sql::{convert_output, Request}, @@ -92,6 +93,9 @@ pub enum Error { #[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))] MissingProxy { backtrace: Backtrace }, + #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))] + MissingAuth { backtrace: Backtrace }, + #[snafu(display( "Fail to do heap profiling, err:{}.\nBacktrace:\n{}", source, @@ -148,6 +152,9 @@ pub enum Error { #[snafu(display("Querying shards is only supported in cluster mode"))] QueryShards {}, + + #[snafu(display("unauthenticated.\nBacktrace:\n{}", backtrace))] + UnAuthenticated { backtrace: Backtrace }, } define_result!(Error); @@ -176,6 +183,7 @@ impl TryFrom<&str> for ContentEncodingType { /// Endpoints beginning with /debug are for internal use, and may subject to /// breaking changes. pub struct Service { + auth: AuthWithFile, // In cluster mode, cluster is valid, while in stand-alone mode, cluster is None cluster: Option, proxy: Arc, @@ -729,16 +737,27 @@ impl Service { .default_schema_name() .to_string(); let timeout = self.config.timeout; + let auth = self.auth.clone(); header::optional::(consts::CATALOG_HEADER) .and(header::optional::(consts::SCHEMA_HEADER)) .and(header::optional::(consts::TENANT_HEADER)) + .and(header::optional::(AUTHORIZATION)) .and_then( - move |catalog: Option<_>, schema: Option<_>, _tenant: Option<_>| { + move |catalog: Option<_>, + schema: Option<_>, + _tenant: Option<_>, + authorization: Option<_>| { // Clone the captured variables let default_catalog = default_catalog.clone(); let schema = schema.unwrap_or_else(|| default_schema.clone()); + let auth = auth.clone(); + async move { + if !auth.identify(authorization) { + return UnAuthenticated.fail().map_err(reject::custom); + } + RequestContext::builder() .catalog(catalog.unwrap_or(default_catalog)) .schema(schema) @@ -800,6 +819,7 @@ impl Service { /// Service builder pub struct Builder { + auth: Option, config: HttpConfig, engine_runtimes: Option>, log_runtime: Option>, @@ -812,6 +832,7 @@ pub struct Builder { impl Builder { pub fn new(config: HttpConfig) -> Self { Self { + auth: None, config, engine_runtimes: None, log_runtime: None, @@ -822,6 +843,11 @@ impl Builder { } } + pub fn auth(mut self, auth: AuthWithFile) -> Self { + self.auth = Some(auth); + self + } + pub fn engine_runtimes(mut self, engine_runtimes: Arc) -> Self { self.engine_runtimes = Some(engine_runtimes); self @@ -860,12 +886,14 @@ impl Builder { let log_runtime = self.log_runtime.context(MissingLogRuntime)?; let config_content = self.config_content.context(MissingInstance)?; let proxy = self.proxy.context(MissingProxy)?; + let auth = self.auth.context(MissingAuth)?; let cluster = self.cluster; let opened_wals = self.opened_wals.context(MissingWal)?; let (tx, rx) = oneshot::channel(); let service = Service { + auth, cluster, proxy, engine_runtimes, @@ -908,6 +936,7 @@ fn error_to_status_code(err: &Error) -> StatusCode { | Error::MissingInstance { .. } | Error::MissingSchemaConfigProvider { .. } | Error::MissingProxy { .. } + | Error::MissingAuth { .. } | Error::ParseIpAddr { .. } | Error::ProfileHeap { .. } | Error::ProfileCPU { .. } @@ -919,6 +948,7 @@ fn error_to_status_code(err: &Error) -> StatusCode { | Error::QueryShards { .. } => StatusCode::BAD_REQUEST, Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR, Error::QueryMaybeExceedTTL { .. } => StatusCode::OK, + Error::UnAuthenticated { .. } => StatusCode::UNAUTHORIZED, } } diff --git a/src/server/src/server.rs b/src/server/src/server.rs index 9c3383f7d1..62d996c0a0 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -484,6 +484,7 @@ impl Builder { )); let http_service = http::Builder::new(http_config) + .auth(auth.clone()) .engine_runtimes(engine_runtimes.clone()) .log_runtime(log_runtime) .config_content(config_content) From aad84faa97b3b9cf65ef7e32cdafb1fed558775c Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 14 May 2024 23:19:36 +0800 Subject: [PATCH 4/8] fix for cr --- src/proxy/src/auth/mod.rs | 30 +++++-------------- .../auth/{auth_with_file.rs => with_file.rs} | 21 ++++++++----- src/proxy/src/lib.rs | 8 +++++ src/server/src/grpc/mod.rs | 2 +- src/server/src/http.rs | 22 +++----------- src/server/src/server.rs | 14 ++++----- 6 files changed, 41 insertions(+), 56 deletions(-) rename src/proxy/src/auth/{auth_with_file.rs => with_file.rs} (85%) diff --git a/src/proxy/src/auth/mod.rs b/src/proxy/src/auth/mod.rs index 735e0a3287..b0b5269123 100644 --- a/src/proxy/src/auth/mod.rs +++ b/src/proxy/src/auth/mod.rs @@ -15,37 +15,23 @@ // specific language governing permissions and limitations // under the License. -//! The proxy module provides features such as forwarding and authentication, -//! adapts to different protocols. - -use macros::define_result; use serde::{Deserialize, Serialize}; -use snafu::Snafu; - -pub mod auth_with_file; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Failed to open file, err:{}.", source))] - OpenFile { source: std::io::Error }, - - #[snafu(display("Failed to read line, err:{}.", source))] - ReadLine { source: std::io::Error }, - #[snafu(display("File not existed, file path:{}", path))] - FileNotExisted { path: String }, -} - -define_result!(Error); +pub mod with_file; /// Header of authorization pub const AUTHORIZATION: &str = "authorization"; -pub const DEFAULT_AUTH_TYPE: &str = "file"; +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub enum AuthType { + #[default] + #[serde(rename = "file")] + File, +} #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct Config { pub enable: bool, - pub auth_type: String, + pub auth_type: AuthType, pub source: String, } diff --git a/src/proxy/src/auth/auth_with_file.rs b/src/proxy/src/auth/with_file.rs similarity index 85% rename from src/proxy/src/auth/auth_with_file.rs rename to src/proxy/src/auth/with_file.rs index 1652a829fd..96067dfb81 100644 --- a/src/proxy/src/auth/auth_with_file.rs +++ b/src/proxy/src/auth/with_file.rs @@ -15,16 +15,17 @@ // specific language governing permissions and limitations // under the License. -//! The proxy module provides features such as forwarding and authentication, -//! adapts to different protocols. - use std::{collections::HashSet, fs::File, io, io::BufRead, path::Path}; use base64::encode; +use generic_error::BoxError; use snafu::ResultExt; use tonic::service::Interceptor; -use crate::auth::{FileNotExisted, OpenFile, ReadLine, Result, AUTHORIZATION}; +use crate::{ + auth::AUTHORIZATION, + error::{Internal, InternalNoCause, Result}, +}; #[derive(Debug, Clone, Default)] pub struct AuthWithFile { @@ -49,17 +50,21 @@ impl AuthWithFile { let path = Path::new(&self.file_path); if !path.exists() { - return FileNotExisted { - path: self.file_path.clone(), + return InternalNoCause { + msg: format!("file not existed: {:?}", path), } .fail(); } - let file = File::open(path).context(OpenFile)?; + let file = File::open(path).box_err().context(Internal { + msg: "failed to open file", + })?; let reader = io::BufReader::new(file); for line in reader.lines() { - let line = line.context(ReadLine)?; + let line = line.box_err().context(Internal { + msg: "failed to read line", + })?; let mut buf = Vec::with_capacity(line.len() + 1); buf.extend_from_slice(line.as_bytes()); let auth = encode(&buf); diff --git a/src/proxy/src/lib.rs b/src/proxy/src/lib.rs index f2c36b13c7..5488164032 100644 --- a/src/proxy/src/lib.rs +++ b/src/proxy/src/lib.rs @@ -81,6 +81,7 @@ use table_engine::{ use tonic::{transport::Channel, IntoRequest}; use crate::{ + auth::with_file::AuthWithFile, context::RequestContext, error::{ErrNoCause, ErrWithCause, Error, Internal, Result}, forward::{ForwardRequest, ForwardResult, Forwarder, ForwarderRef}, @@ -107,6 +108,7 @@ impl Default for SubTableAccessPerm { } pub struct Proxy { + auth: AuthWithFile, router: Arc, forwarder: ForwarderRef, instance: InstanceRef, @@ -124,6 +126,7 @@ pub struct Proxy { impl Proxy { #[allow(clippy::too_many_arguments)] pub fn new( + auth: AuthWithFile, router: Arc, instance: InstanceRef, forward_config: forward::Config, @@ -145,6 +148,7 @@ impl Proxy { )); Self { + auth, router, instance, forwarder, @@ -533,6 +537,10 @@ impl Proxy { }) } } + + pub fn check_auth(&self, authorization: Option) -> bool { + self.auth.identify(authorization) + } } #[derive(Clone, Debug)] diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs index 66c45893a9..7b02a3a2a2 100644 --- a/src/server/src/grpc/mod.rs +++ b/src/server/src/grpc/mod.rs @@ -37,7 +37,7 @@ use logger::{info, warn}; use macros::define_result; use notifier::notifier::RequestNotifiers; use proxy::{ - auth::auth_with_file::AuthWithFile, + auth::with_file::AuthWithFile, forward, hotspot::HotspotRecorder, instance::InstanceRef, diff --git a/src/server/src/http.rs b/src/server/src/http.rs index 35a12926bb..ef23e0dfc4 100644 --- a/src/server/src/http.rs +++ b/src/server/src/http.rs @@ -37,7 +37,7 @@ use macros::define_result; use profile::Profiler; use prom_remote_api::web; use proxy::{ - auth::{auth_with_file::AuthWithFile, AUTHORIZATION}, + auth::AUTHORIZATION, context::RequestContext, handlers::{self}, http::sql::{convert_output, Request}, @@ -93,9 +93,6 @@ pub enum Error { #[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))] MissingProxy { backtrace: Backtrace }, - #[snafu(display("Missing auth.\nBacktrace:\n{}", backtrace))] - MissingAuth { backtrace: Backtrace }, - #[snafu(display( "Fail to do heap profiling, err:{}.\nBacktrace:\n{}", source, @@ -183,7 +180,6 @@ impl TryFrom<&str> for ContentEncodingType { /// Endpoints beginning with /debug are for internal use, and may subject to /// breaking changes. pub struct Service { - auth: AuthWithFile, // In cluster mode, cluster is valid, while in stand-alone mode, cluster is None cluster: Option, proxy: Arc, @@ -737,7 +733,7 @@ impl Service { .default_schema_name() .to_string(); let timeout = self.config.timeout; - let auth = self.auth.clone(); + let proxy = self.proxy.clone(); header::optional::(consts::CATALOG_HEADER) .and(header::optional::(consts::SCHEMA_HEADER)) @@ -751,10 +747,10 @@ impl Service { // Clone the captured variables let default_catalog = default_catalog.clone(); let schema = schema.unwrap_or_else(|| default_schema.clone()); - let auth = auth.clone(); + let proxy = proxy.clone(); async move { - if !auth.identify(authorization) { + if !proxy.check_auth(authorization) { return UnAuthenticated.fail().map_err(reject::custom); } @@ -819,7 +815,6 @@ impl Service { /// Service builder pub struct Builder { - auth: Option, config: HttpConfig, engine_runtimes: Option>, log_runtime: Option>, @@ -832,7 +827,6 @@ pub struct Builder { impl Builder { pub fn new(config: HttpConfig) -> Self { Self { - auth: None, config, engine_runtimes: None, log_runtime: None, @@ -843,11 +837,6 @@ impl Builder { } } - pub fn auth(mut self, auth: AuthWithFile) -> Self { - self.auth = Some(auth); - self - } - pub fn engine_runtimes(mut self, engine_runtimes: Arc) -> Self { self.engine_runtimes = Some(engine_runtimes); self @@ -886,14 +875,12 @@ impl Builder { let log_runtime = self.log_runtime.context(MissingLogRuntime)?; let config_content = self.config_content.context(MissingInstance)?; let proxy = self.proxy.context(MissingProxy)?; - let auth = self.auth.context(MissingAuth)?; let cluster = self.cluster; let opened_wals = self.opened_wals.context(MissingWal)?; let (tx, rx) = oneshot::channel(); let service = Service { - auth, cluster, proxy, engine_runtimes, @@ -936,7 +923,6 @@ fn error_to_status_code(err: &Error) -> StatusCode { | Error::MissingInstance { .. } | Error::MissingSchemaConfigProvider { .. } | Error::MissingProxy { .. } - | Error::MissingAuth { .. } | Error::ParseIpAddr { .. } | Error::ProfileHeap { .. } | Error::ProfileCPU { .. } diff --git a/src/server/src/server.rs b/src/server/src/server.rs index 62d996c0a0..f7cd72ec7b 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -29,7 +29,7 @@ use macros::define_result; use notifier::notifier::RequestNotifiers; use partition_table_engine::PartitionTableEngine; use proxy::{ - auth::{auth_with_file::AuthWithFile, DEFAULT_AUTH_TYPE}, + auth::{with_file::AuthWithFile, AuthType}, hotspot::HotspotRecorder, instance::{DynamicConfig, Instance, InstanceRef}, limiter::Limiter, @@ -138,7 +138,7 @@ pub enum Error { BuildQueryEngine { source: query_engine::error::Error }, #[snafu(display("Failed to load auth credential, err:{source}"))] - LoadCredential { source: proxy::auth::Error }, + LoadCredential { source: proxy::error::Error }, } define_result!(Error); @@ -456,10 +456,10 @@ impl Builder { .then(|| Arc::new(RequestNotifiers::default())); // Build auth - let mut auth = if self.server_config.auth.enable - && self.server_config.auth.auth_type == DEFAULT_AUTH_TYPE - { - AuthWithFile::new(true, self.server_config.auth.source.clone()) + let mut auth = if self.server_config.auth.enable { + match self.server_config.auth.auth_type { + AuthType::File => AuthWithFile::new(true, self.server_config.auth.source.clone()), + } } else { AuthWithFile::default() }; @@ -468,6 +468,7 @@ impl Builder { auth.load_credential().context(LoadCredential)?; let proxy = Arc::new(Proxy::new( + auth.clone(), router.clone(), instance.clone(), self.server_config.forward, @@ -484,7 +485,6 @@ impl Builder { )); let http_service = http::Builder::new(http_config) - .auth(auth.clone()) .engine_runtimes(engine_runtimes.clone()) .log_runtime(log_runtime) .config_content(config_content) From cbb75eccb3b72c28f949930f75d2f777cdb3e9b2 Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 14 May 2024 23:24:13 +0800 Subject: [PATCH 5/8] modify buf len --- src/proxy/src/auth/with_file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs index 96067dfb81..869c772c75 100644 --- a/src/proxy/src/auth/with_file.rs +++ b/src/proxy/src/auth/with_file.rs @@ -65,7 +65,7 @@ impl AuthWithFile { let line = line.box_err().context(Internal { msg: "failed to read line", })?; - let mut buf = Vec::with_capacity(line.len() + 1); + let mut buf = Vec::with_capacity(line.len()); buf.extend_from_slice(line.as_bytes()); let auth = encode(&buf); self.auth.insert(format!("Basic {}", auth)); From f169ee38322bb33b04c31624cc65b7f59a76ac06 Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 14 May 2024 23:34:59 +0800 Subject: [PATCH 6/8] add authorization to RequestContext --- src/server/src/http.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/src/http.rs b/src/server/src/http.rs index ef23e0dfc4..da74ee8472 100644 --- a/src/server/src/http.rs +++ b/src/server/src/http.rs @@ -750,7 +750,7 @@ impl Service { let proxy = proxy.clone(); async move { - if !proxy.check_auth(authorization) { + if !proxy.check_auth(authorization.clone()) { return UnAuthenticated.fail().map_err(reject::custom); } @@ -758,6 +758,7 @@ impl Service { .catalog(catalog.unwrap_or(default_catalog)) .schema(schema) .timeout(timeout) + .authorization(authorization) .build() .context(CreateContext) .map_err(reject::custom) From 6c58696e07e9042e4fa02771cad99931b33b70fd Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 15 May 2024 11:12:28 +0800 Subject: [PATCH 7/8] remove unwrap --- src/proxy/src/auth/with_file.rs | 13 +++++++++---- src/server/src/grpc/storage_service/mod.rs | 8 +------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs index 869c772c75..a38e669dc1 100644 --- a/src/proxy/src/auth/with_file.rs +++ b/src/proxy/src/auth/with_file.rs @@ -74,6 +74,7 @@ impl AuthWithFile { Ok(()) } + // TODO: currently we only support basic auth pub fn identify(&self, authorization: Option) -> bool { if !self.enable { return true; @@ -86,15 +87,19 @@ impl AuthWithFile { } } +pub fn get_authorization(req: &tonic::Request) -> Option { + req.metadata() + .get(AUTHORIZATION) + .and_then(|value| value.to_str().ok().map(String::from)) +} + impl Interceptor for AuthWithFile { fn call( &mut self, request: tonic::Request<()>, ) -> std::result::Result, tonic::Status> { - let metadata = request.metadata(); - let authorization = metadata - .get(AUTHORIZATION) - .map(|v| v.to_str().unwrap().to_string()); + // TODO: extract username from request + let authorization = get_authorization(&request); if self.identify(authorization) { Ok(request) } else { diff --git a/src/server/src/grpc/storage_service/mod.rs b/src/server/src/grpc/storage_service/mod.rs index 228c1742c9..9cf1fa2237 100644 --- a/src/server/src/grpc/storage_service/mod.rs +++ b/src/server/src/grpc/storage_service/mod.rs @@ -35,7 +35,7 @@ use horaedbproto::{ }, }; use http::StatusCode; -use proxy::{auth::AUTHORIZATION, Context, Proxy, FORWARDED_FROM}; +use proxy::{auth::with_file::get_authorization, Context, Proxy, FORWARDED_FROM}; use table_engine::engine::EngineRuntimes; use time_ext::InstantExt; @@ -170,12 +170,6 @@ fn get_forwarded_from(req: &tonic::Request) -> Option { .map(|value| value.to_str().unwrap().to_string()) } -fn get_authorization(req: &tonic::Request) -> Option { - req.metadata() - .get(AUTHORIZATION) - .map(|value| value.to_str().unwrap().to_string()) -} - // TODO: Use macros to simplify duplicate code impl StorageServiceImpl { async fn route_internal( From 970ef2abe5cecccaffe65d3c4a8c91bae3a17bab Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 15 May 2024 16:44:34 +0800 Subject: [PATCH 8/8] decode auth --- src/proxy/src/auth/with_file.rs | 51 +++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/src/proxy/src/auth/with_file.rs b/src/proxy/src/auth/with_file.rs index a38e669dc1..116005cee4 100644 --- a/src/proxy/src/auth/with_file.rs +++ b/src/proxy/src/auth/with_file.rs @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashSet, fs::File, io, io::BufRead, path::Path}; +use std::{ + collections::HashMap, + fs::File, + io::{self, BufRead}, + path::Path, +}; -use base64::encode; use generic_error::BoxError; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tonic::service::Interceptor; use crate::{ @@ -31,7 +35,8 @@ use crate::{ pub struct AuthWithFile { enable: bool, file_path: String, - auth: HashSet, + // name -> password + users: HashMap, } impl AuthWithFile { @@ -39,10 +44,11 @@ impl AuthWithFile { Self { enable, file_path, - auth: HashSet::new(), + users: HashMap::new(), } } + // Load a csv format config pub fn load_credential(&mut self) -> Result<()> { if !self.enable { return Ok(()); @@ -65,23 +71,44 @@ impl AuthWithFile { let line = line.box_err().context(Internal { msg: "failed to read line", })?; - let mut buf = Vec::with_capacity(line.len()); - buf.extend_from_slice(line.as_bytes()); - let auth = encode(&buf); - self.auth.insert(format!("Basic {}", auth)); + let (username, password) = line.split_once(',').with_context(|| InternalNoCause { + msg: format!("invalid line: {:?}", line), + })?; + self.users + .insert(username.to_string(), password.to_string()); } Ok(()) } // TODO: currently we only support basic auth - pub fn identify(&self, authorization: Option) -> bool { + // This function should return Result + pub fn identify(&self, input: Option) -> bool { if !self.enable { return true; } - match authorization { - Some(auth) => self.auth.contains(&auth), + let input = match input { + Some(v) => v, + None => return false, + }; + let input = match input.split_once("Basic ") { + Some((_, encoded)) => match base64::decode(encoded) { + Ok(v) => v, + Err(_e) => return false, + }, + None => return false, + }; + let input = match std::str::from_utf8(&input) { + Ok(v) => v, + Err(_e) => return false, + }; + match input.split_once(':') { + Some((user, pass)) => self + .users + .get(user) + .map(|expected| expected == pass) + .unwrap_or_default(), None => false, } }