From b9b55d60d9f591689ab1a9fd190f51e4a2691ffe Mon Sep 17 00:00:00 2001 From: wangbo Date: Mon, 19 Sep 2022 07:21:16 +0000 Subject: [PATCH 1/2] chore(sharding): update test case Signed-off-by: wangbo --- pisa-proxy/Cargo.lock | 18 ++++- .../src/readwritesplitting/rule_match.rs | 6 +- .../src/readwritesplitting/static_rw.rs | 2 + .../strategy/src/sharding_rewrite/mod.rs | 75 +++++++------------ .../runtime/mysql/src/transaction_fsm.rs | 1 - 5 files changed, 52 insertions(+), 50 deletions(-) diff --git a/pisa-proxy/Cargo.lock b/pisa-proxy/Cargo.lock index 139a6d21..7b215b14 100644 --- a/pisa-proxy/Cargo.lock +++ b/pisa-proxy/Cargo.lock @@ -1700,6 +1700,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8bf247779e67a9082a4790b45e71ac7cfd1321331a5c856a74a9faebdab78d0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.8" @@ -2593,7 +2602,7 @@ checksum = "32d3ebd75ac2679c2af3a92246639f9fcc8a442ee420719cc4fe195b98dd5fa3" dependencies = [ "bytes", "heck 0.3.3", - "itertools", + "itertools 0.9.0", "log", "multimap", "petgraph", @@ -2610,7 +2619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" dependencies = [ "anyhow", - "itertools", + "itertools 0.9.0", "proc-macro2", "quote", "syn", @@ -3015,7 +3024,9 @@ dependencies = [ "endpoint", "error", "futures", + "indexmap", "iota", + "lazy_static", "loadbalance", "mysql_parser", "mysql_protocol", @@ -3575,6 +3586,7 @@ checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" name = "strategy" version = "0.1.0" dependencies = [ + "aho-corasick", "async-trait", "chrono", "conn_pool", @@ -3583,8 +3595,10 @@ dependencies = [ "error", "futures", "indexmap", + "itertools 0.10.4", "lazy_static", "loadbalance", + "mysql_parser", "mysql_protocol", "parking_lot 0.12.0", "regex", diff --git a/pisa-proxy/proxy/strategy/src/readwritesplitting/rule_match.rs b/pisa-proxy/proxy/strategy/src/readwritesplitting/rule_match.rs index 02d030d8..f47f26a4 100644 --- a/pisa-proxy/proxy/strategy/src/readwritesplitting/rule_match.rs +++ b/pisa-proxy/proxy/strategy/src/readwritesplitting/rule_match.rs @@ -371,6 +371,7 @@ impl RouteBalance for GenericRuleMatchInner { mod test { use endpoint::endpoint::Endpoint; use loadbalance::balance::*; + use indexmap::IndexMap; use super::RulesMatchBuilder; use crate::{config::*, readwritesplitting::ReadWriteEndpoint, RouteBalance, RouteInput}; @@ -384,6 +385,7 @@ mod test { regex: vec![String::from("^select")], target: TargetRole::Read, algorithm_name: AlgorithmName::Random, + node_group_name: vec![String::from("")], }), ReadWriteSplittingRule::Regex(RegexRule { name: String::from("t2"), @@ -391,6 +393,7 @@ mod test { regex: vec![String::from("^insert")], target: TargetRole::Read, algorithm_name: AlgorithmName::Random, + node_group_name: vec![String::from("")], }), ]; @@ -415,7 +418,8 @@ mod test { }], }; - let mut m = RulesMatchBuilder::build(rules, default_target, rw_endpoint); + let endpoint_group = IndexMap::new(); + let mut m = RulesMatchBuilder::build(rules, default_target, endpoint_group, rw_endpoint); let (b, target) = m.get(&RouteInput::Statement("insert")); let endpoint = b.next(); assert_eq!(target, TargetRole::Read); diff --git a/pisa-proxy/proxy/strategy/src/readwritesplitting/static_rw.rs b/pisa-proxy/proxy/strategy/src/readwritesplitting/static_rw.rs index db741424..aa117b80 100644 --- a/pisa-proxy/proxy/strategy/src/readwritesplitting/static_rw.rs +++ b/pisa-proxy/proxy/strategy/src/readwritesplitting/static_rw.rs @@ -78,6 +78,7 @@ mod test { regex: vec![String::from("^select")], target: TargetRole::Read, algorithm_name: AlgorithmName::Random, + node_group_name: vec![String::from("")], }), ReadWriteSplittingRule::Regex(RegexRule { name: String::from("t2"), @@ -85,6 +86,7 @@ mod test { regex: vec![String::from("^insert")], target: TargetRole::ReadWrite, algorithm_name: AlgorithmName::Random, + node_group_name: vec![String::from("")], }), ]; diff --git a/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs b/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs index 46cf5ff4..3dd20077 100644 --- a/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs +++ b/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs @@ -18,6 +18,7 @@ use std::vec; use endpoint::endpoint::Endpoint; use indexmap::IndexMap; +use crc32fast::Hasher; use mysql_parser::ast::{SqlStmt, Visitor, TableIdent}; use crate::{config::{Sharding, StrategyType, ShardingAlgorithmName}, rewrite::{ShardingRewriter, ShardingRewriteInput}, route::BoxError}; @@ -34,7 +35,6 @@ impl CalcShardingIdx for u64 { ShardingAlgorithmName::Mod => { Some(self.wrapping_rem(id)) }, - _ => None } } @@ -46,7 +46,6 @@ impl CalcShardingIdx for i64 { ShardingAlgorithmName::Mod => { Some(self.wrapping_rem(id) as u64) }, - _ => None } } @@ -57,8 +56,7 @@ impl CalcShardingIdx for f64 { match algo { ShardingAlgorithmName::Mod => { Some((self % id).round() as u64) - }, - + } _ => None } } @@ -81,7 +79,6 @@ pub struct DatabaseChange { pub struct ShardingRewriteOutput { pub changes: Vec, pub target_sql: String, - pub endpoint: Endpoint, pub data_source: DataSource, } @@ -112,6 +109,7 @@ impl ShardingRewrite { raw_sql: "".to_string(), endpoints, has_rw, + crc32_hash: Hasher::new(), } } @@ -219,8 +217,7 @@ impl ShardingRewrite { ShardingRewriteOutput { changes, target_sql, - endpoint: ep.take().unwrap().clone(), - data_source: DataSource::None, + data_source: DataSource::Endpoint(ep.take().unwrap().clone()), } ] ) @@ -267,19 +264,10 @@ impl ShardingRewrite { None => continue } }, - Err(e ) => return Err(e) } } - //for v in try_where { - // let v = v?; - // match v { - // Some((idx, num, _)) => wheres.push((idx, num)), - // None => continue - // } - //} - let expect_sum = wheres[0].1 as usize * wheres.len(); let sum: usize = wheres.iter().map(|x| x.1).sum::() as usize; @@ -325,8 +313,7 @@ impl ShardingRewrite { ShardingRewriteOutput { changes, target_sql: target_sql.to_string(), - endpoint: ep.take().unwrap().clone(), - data_source: DataSource::None + data_source: DataSource::Endpoint(ep.take().unwrap().clone()) } ] ) @@ -384,7 +371,8 @@ impl ShardingRewrite { ).flatten().collect::>() } - fn parse_where<'b>(meta: &'b WhereMeta, algo: &ShardingAlgorithmName, actual_nodes_length: u64, query_id: u8, sharding_column: &str) -> Result, BoxError> { + fn parse_where<'b>(meta: &'b WhereMeta, algo: &ShardingAlgorithmName, sharding_count: u64, query_id: u8, sharding_column: &str) -> Result, BoxError> { + match meta { WhereMeta::BinaryExpr { left, right } => { if left != sharding_column { @@ -394,17 +382,17 @@ impl ShardingRewrite { let num = match right { WhereMetaRightDataType::Num(val) => { let val = val.parse::()?; - val.calc(algo, actual_nodes_length) + val.calc(algo, sharding_count) }, WhereMetaRightDataType::SignedNum(val) => { let val = val.parse::()?; - val.calc(algo, actual_nodes_length as i64) + val.calc(algo, sharding_count as i64) }, WhereMetaRightDataType::FloatNum(val) => { let val = val.parse::()?; - val.calc(algo, actual_nodes_length as f64) + val.calc(algo, sharding_count as f64) } _ => return Ok(None) }; @@ -445,12 +433,11 @@ impl ShardingRewrite { offset = change.target.len() - change.span.len(); } - let endpoint = self.endpoints[group].clone(); + let ep = self.endpoints[group].clone(); output.push(ShardingRewriteOutput { changes: changes.into_iter().map(|x| RewriteChange::DatabaseChange(x)).collect(), target_sql, - endpoint, - data_source: DataSource::None, + data_source: DataSource::Endpoint(ep), }) } output @@ -488,12 +475,11 @@ impl ShardingRewrite { offset = change.target.len() - change.span.len(); } - let endpoint = self.endpoints[0].clone(); + let ep = self.endpoints[0].clone(); output.push(ShardingRewriteOutput { changes: changes.into_iter().map(|x| RewriteChange::DatabaseChange(x)).collect(), target_sql: target_sql.to_string(), - endpoint, - data_source: DataSource::None + data_source: DataSource::Endpoint(ep) }) } @@ -507,13 +493,12 @@ impl ShardingRewrite { if actual_node.len() == 0 { target.push_str(schema); target.push('.'); - target.push_str(&format!("{}000{}", &table.name, table_idx.to_string())); + target.push_str(&format!("{}{:05}", &table.name, table_idx)); } else { target.push_str(actual_node); target.push_str("."); target.push_str(&table.name); - } - //target.push(' '); + } target } @@ -541,8 +526,6 @@ impl ShardingRewriter for ShardingRewrite { } } - - #[cfg(test)] mod test { use endpoint::endpoint::Endpoint; @@ -637,7 +620,7 @@ mod test { let raw_sql = "SELECT idx from db.tshard where idx = 3"; let parser = Parser::new(); let mut ast = parser.parse(raw_sql).unwrap(); - let sr = ShardingRewrite::new(config.0.clone(), config.1.clone(), false); + let mut sr = ShardingRewrite::new(config.0.clone(), config.1.clone(), false); sr.set_raw_sql(raw_sql.to_string()); let meta = sr.get_meta(&mut ast[0]); @@ -645,7 +628,7 @@ mod test { assert_eq!(res[0].target_sql, "SELECT idx from ds1.tshard where idx = 3"); let raw_sql = "SELECT idx from db.tshard where idx = 3 and idx = (SELECT idx from db.tshard where idx = 3)"; - let sr = ShardingRewrite::new(config.0.clone(), config.1.clone(), false); + let mut sr = ShardingRewrite::new(config.0.clone(), config.1.clone(), false); sr.set_raw_sql(raw_sql.to_string()); let mut ast = parser.parse(raw_sql).unwrap(); let meta = sr.get_meta(&mut ast[0]); @@ -653,7 +636,7 @@ mod test { assert_eq!(res[0].target_sql, "SELECT idx from ds1.tshard where idx = 3 and idx = (SELECT idx from ds1.tshard where idx = 3)"); let raw_sql = "SELECT idx from db.tshard where idx = 3 and idx = (SELECT idx from db.tshard where idx = 4)"; - let sr = ShardingRewrite::new(config.0.clone(), config.1, false); + let mut sr = ShardingRewrite::new(config.0.clone(), config.1, false); sr.set_raw_sql(raw_sql.to_string()); let mut ast = parser.parse(raw_sql).unwrap(); let meta = sr.get_meta(&mut ast[0]); @@ -681,10 +664,10 @@ mod test { assert_eq!( res.into_iter().map(|x| x.target_sql).collect::>(), vec![ - "SELECT idx from db.tshard0000 where idx > 3", - "SELECT idx from db.tshard0001 where idx > 3", - "SELECT idx from db.tshard0002 where idx > 3", - "SELECT idx from db.tshard0003 where idx > 3", + "SELECT idx from db.tshard00000 where idx > 3", + "SELECT idx from db.tshard00001 where idx > 3", + "SELECT idx from db.tshard00002 where idx > 3", + "SELECT idx from db.tshard00003 where idx > 3", ], ); @@ -694,7 +677,7 @@ mod test { sr.set_raw_sql(raw_sql); let meta = sr.get_meta(&mut ast[0]); let res = sr.table_strategy(meta).unwrap(); - assert_eq!(res[0].target_sql, "SELECT idx from db.tshard0000 where idx = 4".to_string()); + assert_eq!(res[0].target_sql, "SELECT idx from db.tshard00000 where idx = 4".to_string()); let raw_sql = "SELECT idx from db.tshard where idx = 3 and idx = (SELECT idx from db.tshard where idx = 3)".to_string(); let mut ast = parser.parse(&raw_sql).unwrap(); @@ -702,7 +685,7 @@ mod test { sr.set_raw_sql(raw_sql); let meta = sr.get_meta(&mut ast[0]); let res = sr.table_strategy(meta).unwrap(); - assert_eq!(res[0].target_sql, "SELECT idx from db.tshard0003 where idx = 3 and idx = (SELECT idx from db.tshard0003 where idx = 3)".to_string()); + assert_eq!(res[0].target_sql, "SELECT idx from db.tshard00003 where idx = 3 and idx = (SELECT idx from db.tshard00003 where idx = 3)".to_string()); let raw_sql = "SELECT idx from db.tshard where idx = 3 and idx = (SELECT idx from db.tshard where idx = 4)".to_string(); let mut ast = parser.parse(&raw_sql).unwrap(); @@ -713,10 +696,10 @@ mod test { assert_eq!( res.into_iter().map(|x| x.target_sql).collect::>(), vec![ - "SELECT idx from db.tshard0000 where idx = 3 and idx = (SELECT idx from db.tshard0000 where idx = 4)", - "SELECT idx from db.tshard0001 where idx = 3 and idx = (SELECT idx from db.tshard0001 where idx = 4)", - "SELECT idx from db.tshard0002 where idx = 3 and idx = (SELECT idx from db.tshard0002 where idx = 4)", - "SELECT idx from db.tshard0003 where idx = 3 and idx = (SELECT idx from db.tshard0003 where idx = 4)", + "SELECT idx from db.tshard00000 where idx = 3 and idx = (SELECT idx from db.tshard00000 where idx = 4)", + "SELECT idx from db.tshard00001 where idx = 3 and idx = (SELECT idx from db.tshard00001 where idx = 4)", + "SELECT idx from db.tshard00002 where idx = 3 and idx = (SELECT idx from db.tshard00002 where idx = 4)", + "SELECT idx from db.tshard00003 where idx = 3 and idx = (SELECT idx from db.tshard00003 where idx = 4)", ], ); } diff --git a/pisa-proxy/runtime/mysql/src/transaction_fsm.rs b/pisa-proxy/runtime/mysql/src/transaction_fsm.rs index 2bf3ea43..70e9280d 100644 --- a/pisa-proxy/runtime/mysql/src/transaction_fsm.rs +++ b/pisa-proxy/runtime/mysql/src/transaction_fsm.rs @@ -98,7 +98,6 @@ pub fn query_rewrite( .map(|x| ShardingRewriteOutput { changes: vec![], target_sql: raw_sql.clone(), - endpoint: x.clone(), data_source: strategy::sharding_rewrite::DataSource::Endpoint(x.clone()), }) .collect::>() From cb082e6b5b48237acc85ab6f24754e1f896d8f0e Mon Sep 17 00:00:00 2001 From: wangbo Date: Mon, 19 Sep 2022 07:25:43 +0000 Subject: [PATCH 2/2] delete: delete unuse code Signed-off-by: wangbo --- pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs b/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs index 3dd20077..c3381f16 100644 --- a/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs +++ b/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs @@ -109,7 +109,6 @@ impl ShardingRewrite { raw_sql: "".to_string(), endpoints, has_rw, - crc32_hash: Hasher::new(), } }