From 8dd57cda78fa439762e64dc936596529a6abe595 Mon Sep 17 00:00:00 2001 From: wangbo Date: Mon, 28 Nov 2022 03:47:45 +0000 Subject: [PATCH 1/4] fix(sharding): fix min max error Signed-off-by: wangbo --- pisa-proxy/protocol/mysql/src/row.rs | 1 + .../runtime/mysql/src/server/executor.rs | 27 +++++++------------ 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/pisa-proxy/protocol/mysql/src/row.rs b/pisa-proxy/protocol/mysql/src/row.rs index 9918b4d5..8beddd9a 100644 --- a/pisa-proxy/protocol/mysql/src/row.rs +++ b/pisa-proxy/protocol/mysql/src/row.rs @@ -34,6 +34,7 @@ pub enum RowDataTyp> { Binary(RowDataBinary), } +#[derive(Clone, Debug)] pub struct RowPartData { pub data: Box<[u8]>, pub start_idx: usize, diff --git a/pisa-proxy/runtime/mysql/src/server/executor.rs b/pisa-proxy/runtime/mysql/src/server/executor.rs index 01ab07de..f9b75b2d 100644 --- a/pisa-proxy/runtime/mysql/src/server/executor.rs +++ b/pisa-proxy/runtime/mysql/src/server/executor.rs @@ -209,17 +209,10 @@ where while let Some(chunk) = stream.next().await { let mut chunk = chunk .into_par_iter().map(|x| x.unwrap()).collect::, _>>().map_err(ErrorKind::from)?; - - for i in chunk.iter() { - println!("or min_max {:?}", &i[..]); - } let ro = &req.rewrite_outputs[0]; Self::handle_min_max(ro, &mut chunk, row_data.clone(), is_binary)?; - for i in chunk.iter() { - println!("min_max {:?}", &i[..]); - } - + let avg_change = ro.changes.iter().find_map(|x| { if let RewriteChange::AvgChange(change) = x { Some(change) @@ -233,7 +226,6 @@ where let sum_field = avg.target.get(AVG_SUM).unwrap(); let (count_data, sum_data): (Vec<_>, Vec<_>) = chunk.par_iter().map(|x| -> Result<(u64, u64), Error> { - println!("xxx {:?}", &x[..]); let mut row_data = row_data.clone(); row_data.with_buf(&x[4..]); let count = decode_with_name::<&[u8], u64>(&mut row_data, &count_field, is_binary).map_err(|e| ErrorKind::Runtime(e))?.unwrap_or_else(|| 0); @@ -254,7 +246,6 @@ where let count: u64 = count_data.par_iter().sum(); let sum: u64 = sum_data.par_iter().sum(); - println!("count {:?}, sum {:?}", count, sum); chunk.par_iter_mut().for_each(|x| { let mut row_data = row_data.clone(); @@ -301,10 +292,8 @@ where let count_sum = chunk.par_iter().map(|x| { let mut row_data = row_data.clone(); row_data.with_buf(&x[4..]); - println!("count x {:?}", &x[4..]); decode_with_name::<&[u8], u64>(&mut row_data, &count_field.name, is_binary).unwrap().unwrap() }).sum::(); - println!("count_sun {:?}", count_sum); let chunk_data = &chunk[0]; let mut row_data = row_data.clone(); @@ -338,7 +327,7 @@ where } } - if chunk.par_iter().min() == chunk.par_iter().max() { + if chunk.par_iter().map(|x| &x[4..]).min() == chunk.par_iter().map(|x| &x[4..]).max() { let _ = req .framed .codec_mut() @@ -347,7 +336,6 @@ where } for row in chunk.iter() { - println!("end row {:?}", &row[..]); let _ = req .framed .codec_mut() @@ -367,7 +355,6 @@ where let (a, b) = get_min_max_value(&mut row_data, is_binary, &mmf.name, a, b); b.cmp(&a) }); - } FieldWrapFunc::Min => { chunk.par_sort_unstable_by(|a, b| { @@ -381,11 +368,16 @@ where } let chunk_data = &chunk[0]; + let ori_row_data = row_data.clone(); let mut row_data = row_data.clone(); row_data.with_buf(&chunk_data[4..]); let row_part_data = row_data.get_row_data_with_name(&mmf.name).map_err(|e| ErrorKind::Runtime(e))?.unwrap(); chunk.par_iter_mut().for_each(|x| { - row_data_cut_merge(x, &row_part_data, |data: &mut BytesMut| { + let mut row_data = ori_row_data.clone(); + row_data.with_buf(&x[4..]); + let ori_row_part_data = row_data.get_row_data_with_name(&mmf.name).unwrap().unwrap(); + + row_data_cut_merge(x, &ori_row_part_data, |data: &mut BytesMut| { if is_binary { data.extend_from_slice(&row_part_data.data); } else { @@ -650,9 +642,8 @@ where F: FnOnce(&mut BytesMut) { let mut data = ori_data.split_off(4); let mut data_remain = data.split_off(row_part_data.start_idx); - f(&mut data); - + let _ = data_remain.split_to(row_part_data.part_encode_length + row_part_data.part_data_length); data.extend_from_slice(&data_remain); ori_data.extend_from_slice(&data); From 0691b88c9b202dfa2850ba843806c3dbde307b00 Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 29 Nov 2022 10:38:35 +0000 Subject: [PATCH 2/4] feat(sharding): support sum Signed-off-by: wangbo --- pisa-proxy/protocol/mysql/src/row.rs | 3 +- .../strategy/src/sharding_rewrite/meta.rs | 10 ++++ .../strategy/src/sharding_rewrite/mod.rs | 2 +- .../runtime/mysql/src/server/executor.rs | 53 ++++++++++++++++++- 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/pisa-proxy/protocol/mysql/src/row.rs b/pisa-proxy/protocol/mysql/src/row.rs index 8beddd9a..6adcd8b6 100644 --- a/pisa-proxy/protocol/mysql/src/row.rs +++ b/pisa-proxy/protocol/mysql/src/row.rs @@ -28,7 +28,7 @@ pub trait RowData> { fn get_row_data_with_name(&mut self, name: &str) -> value::Result; } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum RowDataTyp> { Text(RowDataText), Binary(RowDataBinary), @@ -239,7 +239,6 @@ impl> RowData for RowDataBinary { // Need to add packet header and null_map to returnd data let raw_data = &self.buf.as_ref()[start_pos + pos as usize..(start_pos + pos as usize + length as usize)]; - println!("eeeeeeeeeeeee {:?}", &raw_data[..]); return Ok(Some( RowPartData { data: raw_data.into(), diff --git a/pisa-proxy/proxy/strategy/src/sharding_rewrite/meta.rs b/pisa-proxy/proxy/strategy/src/sharding_rewrite/meta.rs index e8a4f319..1b15ab62 100644 --- a/pisa-proxy/proxy/strategy/src/sharding_rewrite/meta.rs +++ b/pisa-proxy/proxy/strategy/src/sharding_rewrite/meta.rs @@ -39,6 +39,7 @@ pub enum FieldWrapFunc { Min, Max, Count, + Sum, None, } @@ -48,6 +49,7 @@ impl AsRef for FieldWrapFunc { Self::Max => "max", Self::Min => "min", Self::Count => "count", + Self::Sum => "sum", Self::None => "none", } } @@ -378,6 +380,14 @@ impl Transformer for RewriteMetaData { ); } + AggFuncName::Sum => { + self.state = ScanState::FieldWrapFunc( + item.span, + FieldWrapFunc::Sum, + item.alias_name.clone(), + ); + } + _ => {} } return false; diff --git a/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs b/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs index 67a2d8ca..cfb3e598 100644 --- a/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs +++ b/pisa-proxy/proxy/strategy/src/sharding_rewrite/mod.rs @@ -1378,7 +1378,7 @@ impl ShardingRewrite { .iter() .filter_map(|f| { if let FieldMeta::Ident(meta) = f { - let is_match = matches!(meta.wrap_func, FieldWrapFunc::Max| FieldWrapFunc::Min |FieldWrapFunc::Count); + let is_match = matches!(meta.wrap_func, FieldWrapFunc::Max| FieldWrapFunc::Min |FieldWrapFunc::Count| FieldWrapFunc::Sum); return is_match.then(|| meta.clone()) } else { None diff --git a/pisa-proxy/runtime/mysql/src/server/executor.rs b/pisa-proxy/runtime/mysql/src/server/executor.rs index baa5c4e9..b375aef8 100644 --- a/pisa-proxy/runtime/mysql/src/server/executor.rs +++ b/pisa-proxy/runtime/mysql/src/server/executor.rs @@ -215,6 +215,7 @@ where .map_err(ErrorKind::from)?; let ro = &req.rewrite_outputs; + Self::handle_min_max(ro, &mut chunk, row_data.clone(), is_binary)?; let avg_change = get_avg_change(&ro.results[0].changes); @@ -257,10 +258,9 @@ where .par_iter() .cloned() .unzip(); - + let count: u64 = count_data.par_iter().sum(); let sum: u64 = sum_data.par_iter().sum(); - chunk.par_iter_mut().for_each(|x| { let mut row_data = row_data.clone(); row_data.with_buf(&x[4..]); @@ -410,6 +410,54 @@ where a.cmp(&b) }); } + FieldWrapFunc::Sum => { + let sum_data: Vec<_> = chunk + .par_iter() + .map(|x| -> Result { + let mut row_data = row_data.clone(); + row_data.with_buf(&x[4..]); + let sum = if is_binary { + let sum = decode_with_name::<&[u8], String>( + &mut row_data, + &agg.name, + is_binary, + ).unwrap(); + if let Some(sum) = sum { + sum.parse::().map_err(|e| ErrorKind::Runtime(e.into()))? + } else { + 0 + } + } else { + decode_with_name::<&[u8], u64>(&mut row_data, &agg.name, is_binary) + .map_err(|e| ErrorKind::Runtime(e))? + .unwrap_or_else(|| 0) + }; + + Ok(sum) + }) + .collect::, _>>().unwrap(); + + let sum: u64 = sum_data.par_iter().sum(); + chunk.par_iter_mut().for_each(|x| { + let mut row_data = row_data.clone(); + row_data.with_buf(&x[4..]); + let sum_data = + row_data.get_row_data_with_name(&agg.name).unwrap().unwrap(); + let part_data = RowPartData { + data: vec![].into(), + start_idx: sum_data.start_idx, + part_encode_length: sum_data.part_encode_length, + part_data_length: sum_data.part_data_length, + }; + + let sum = format!("{:.4}", sum as u64); + + row_data_cut_merge(x, &part_data, |data: &mut BytesMut| { + data.put_lenc_int(sum.len() as u64, false); + data.extend_from_slice(sum.as_bytes()); + }); + }); + } _ => {} } @@ -709,5 +757,6 @@ fn get_min_max_value<'a>( row_data.with_buf(&b[4..]); let b = decode_with_name::<&[u8], u64>(row_data, name, is_binary).unwrap().unwrap(); + (a, b) } From 61b8088fed579367ebf706097088c0a95181a3fb Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 30 Nov 2022 09:32:07 +0000 Subject: [PATCH 3/4] chore(sharding): fix sum prepare Signed-off-by: wangbo --- pisa-proxy/runtime/mysql/src/server/executor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pisa-proxy/runtime/mysql/src/server/executor.rs b/pisa-proxy/runtime/mysql/src/server/executor.rs index 8a58cd7e..635ec409 100644 --- a/pisa-proxy/runtime/mysql/src/server/executor.rs +++ b/pisa-proxy/runtime/mysql/src/server/executor.rs @@ -461,6 +461,10 @@ where _ => {} } + if chunk.par_iter().map(|x| &x[4..]).min() == chunk.par_iter().map(|x| &x[4..]).max() { + return Ok(()) + } + let chunk_data = &chunk[0]; let ori_row_data = row_data.clone(); let mut row_data = row_data.clone(); From bad0a84e6fe9d20ed2418b5e17fc52e666042a12 Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 30 Nov 2022 09:34:31 +0000 Subject: [PATCH 4/4] chore(sharding): fix sum prepare Signed-off-by: wangbo --- pisa-proxy/runtime/mysql/src/server/executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pisa-proxy/runtime/mysql/src/server/executor.rs b/pisa-proxy/runtime/mysql/src/server/executor.rs index 635ec409..6ee854fb 100644 --- a/pisa-proxy/runtime/mysql/src/server/executor.rs +++ b/pisa-proxy/runtime/mysql/src/server/executor.rs @@ -258,7 +258,7 @@ where .par_iter() .cloned() .unzip(); - + let count: u64 = count_data.par_iter().sum(); let sum: u64 = sum_data.par_iter().sum(); chunk.par_iter_mut().for_each(|x| {