Skip to content

Commit

Permalink
Reat rewrite9 (#304)
Browse files Browse the repository at this point in the history
* feat(pisa-proxy, rewrite): WIP

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, rewrite): WIP add rewrite

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, rewrite): WIP: add rewrtite

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, rewrite): WIP: add rewrite, complete database sharding
stratey

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, strategy): fix unit test failed for rewritesplitting
Signed-off-by: xuanyuan300 <[email protected]>

* chore(pisa-proxy, rewrite): WIP: remove comments
Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, rewrite): WIP: fix unit test failed
Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: update config, strategy

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: update strategy, update RouteInput

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): rewriter integrated to reqContext

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: add route_sharding, update server

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: add executor

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: add stmt_cache, prepare_sharding_inner,
executor

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: add shard_execute_executor, update
stmt_cache

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: add shard_query_inner

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: update get_conn for fsm, update
sharding routestrategy

Signed-off-by: xuanyuan300 <[email protected]>

* fix(pisa-proxy, sharding): fix get_conn in fsma, add check shading
config whether is valid

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP:  add shard_conn_cache in fsm

Signed-off-by: xuanyuan300 <[email protected]>

* feat(pisa-proxy, sharding): WIP: update stmt_cache
Signed-off-by: xuanyuan300 <[email protected]>

* chore(pisa-proxy, sharding): WIP: remove unused imports
Signed-off-by: xuanyuan300 <[email protected]>

Signed-off-by: xuanyuan300 <[email protected]>
  • Loading branch information
xuanyuan300 authored Sep 19, 2022
1 parent dcc1fcb commit bc9c113
Show file tree
Hide file tree
Showing 29 changed files with 2,662 additions and 323 deletions.
16 changes: 10 additions & 6 deletions pisa-proxy/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pisa-proxy/app/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ api = { path = "../api" }
clap = { version = "3.2.12", features = ["env"] }
plugin = { path = "../../plugin" }
proxy = { path = "../../proxy" }
strategy = { path = "../../proxy/strategy" }
reqwest = { version = "0.11.10", features = ["blocking", "json"] }
runtime_mysql = { path = "../../runtime/mysql" }
serde = { version = "1.0.133", default-features = false, features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions pisa-proxy/app/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{env, fs::File, io::prelude::*};
use api::config::Admin;
use clap::{value_parser, Arg, Command};
use proxy::proxy::{MySQLNode, MySQLNodes, ProxiesConfig, ProxyConfig};
use strategy::config::NodeGroup;
use serde::{Deserialize, Serialize};
use tracing::trace;

Expand Down Expand Up @@ -243,6 +244,7 @@ pub struct PisaProxyConfig {
pub admin: Admin,
pub proxy: Option<ProxiesConfig>,
pub mysql: Option<MySQLNodes>,
pub nodegroup: Option<NodeGroup>,
pub shardingsphere_proxy: Option<MySQLNodes>,
pub version: Option<String>,
}
Expand Down
2 changes: 1 addition & 1 deletion pisa-proxy/app/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ impl ProxyFactory for PisaProxyFactory {
match kind {
ProxyKind::MySQL => Box::new(runtime_mysql::mysql::MySQLProxy {
proxy_config: config,
// mysql_nodes: self.pisa_config.mysql_nodes.clone(),
mysql_nodes: self.pisa_config.get_mysql().to_vec(),
node_group: self.pisa_config.nodegroup.clone(),
pisa_version: self.pisa_config.get_version().to_string(),
// mysql_nodes: self.pisa_config.mysql.as_ref().unwrap().node.as_ref().unwrap().to_vec(),
// pisa_version: self.pisa_config.version.as_ref().unwrap().to_string(),
Expand Down
15 changes: 10 additions & 5 deletions pisa-proxy/parser/mysql/src/ast/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ pub enum Value {

Num {
span: Span,
// todo add parse i64 string
value: String,
signed: bool,
},

HexNum {
Expand All @@ -907,8 +907,8 @@ pub enum Value {

FloatNum {
span: Span,
// todo add parse f64 string
value: String,
signed: bool,
},

BinNum {
Expand Down Expand Up @@ -963,12 +963,17 @@ impl Value {
var_ident
}

Self::Num { span: _, value } => (*value).to_string(),
Self::Num { span: _, signed, value } | Self::FloatNum { span:_, value, signed }=> {
let mut value = (*value).to_string();
if *signed {
value.insert(0, '-');
}

value
}

Self::HexNum { span: _, value } => (*value).to_string(),

Self::FloatNum { span: _, value } => (*value).to_string(),

Self::BinNum { span: _, value } => (*value).to_string(),

Self::True => "true".to_string(),
Expand Down
59 changes: 43 additions & 16 deletions pisa-proxy/parser/mysql/src/ast/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2714,21 +2714,15 @@ impl ValOrVals {
pub struct InsertVals {
pub span: Span,
pub val_ident: ValOrVals,
pub values: Vec<Vec<Expr>>,
pub values: Vec<RowValue>,
}

impl InsertVals {
pub fn format(&self) -> String {
let mut values = Vec::with_capacity(3);

for v in &self.values {
let row = vec![
"(".to_string(),
v.iter().map(|x| x.format()).collect::<Vec<String>>().join(","),
")".to_string(),
];

values.push(row.join(" "))
values.push(v.format())
}

vec![self.val_ident.format(), values.join(",")].join(" ")
Expand All @@ -2740,20 +2734,53 @@ impl Visitor for InsertVals {
let mut new_values = Vec::with_capacity(self.values.len());

for v in self.values.iter_mut() {
let mut sub_new_exprs = Vec::with_capacity(v.len());
for vv in v.iter_mut() {
let mut node = Node::Expr(vv);
tf.trans(&mut node);
let mut node = Node::RowValue(v);
tf.trans(&mut node);
let new_node = node.into_row_value().unwrap().visit(tf);
new_values.push(new_node);
}

let new_node = node.into_expr().unwrap().visit(tf);
sub_new_exprs.push(new_node);
}
self.values = new_values;

self.clone()
}
}

#[derive(Debug, Clone)]
pub struct RowValue {
pub span: Span,
pub values: Vec<Expr>
}

new_values.push(sub_new_exprs);
impl RowValue {
fn format(&self) -> String {
let values = vec![
"(".to_string(),
self.values.iter().map(|x| x.format()).collect::<Vec<String>>().join(","),
")".to_string(),
];

values.join(" ")
}
}


impl Visitor for RowValue {
fn visit<T: Transformer>(&mut self, tf: &mut T) -> Self {
let mut new_values = Vec::with_capacity(self.values.len());

for v in self.values.iter_mut() {
let mut node = Node::Expr(v);
tf.trans(&mut node);

let new_node = node.into_expr().unwrap().visit(tf);
new_values.push(new_node);
}

self.values = new_values;

tf.complete(&mut Node::RowValue(self));

self.clone()
}
}
Expand Down
20 changes: 16 additions & 4 deletions pisa-proxy/parser/mysql/src/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -3704,10 +3704,13 @@ opt_equal -> Option<&'input str>:
| equal { Some($1) }
;

row_value -> Vec<Expr>:
row_value -> RowValue:
'(' opt_values ')'
{
$2
RowValue {
span: $span,
values: $2,
}
}
;

Expand Down Expand Up @@ -4488,20 +4491,23 @@ NUM_literal -> Value:
Value::Num {
span: $span,
value: $1,
signed: false,
}
}
| 'DECIMAL_NUM'
{
Value::Num {
span: $span,
value: String::from($lexer.span_str($1.as_ref().unwrap().span())),
signed: false,
}
}
| 'FLOAT_NUM'
{
Value::FloatNum {
span: $span,
value: String::from($lexer.span_str($1.as_ref().unwrap().span())),
signed: false,
}
}
;
Expand Down Expand Up @@ -4724,7 +4730,13 @@ signed_literal -> Value:
}
| '-' NUM_literal
{
$2
match $2 {
Value::Num { span, value, signed:_ } => {
Value::Num { span, value, signed: true}
},

_ => unreachable!()
}
}
;

Expand Down Expand Up @@ -5607,7 +5619,7 @@ value_or_values -> ValOrVals:
| VALUES { ValOrVals::Values }
;
values_list -> Vec<Vec<Expr>>:
values_list -> Vec<RowValue>:
values_list ',' row_value
{
$1.push($3);
Expand Down
66 changes: 65 additions & 1 deletion pisa-proxy/protocol/mysql/src/client/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
};

use bytes::{Buf, BufMut, BytesMut};
use futures::Stream;
use futures::{Stream, stream::Fuse};
use pin_project::pin_project;
use protocol_codegen::mysql_codec_convert;
use tokio::io::Interest;
Expand Down Expand Up @@ -98,6 +98,70 @@ impl ClientCodec {
}
}


#[pin_project]
pub struct MergeStream<S>
where
S: Stream + std::marker::Unpin,
{
pub inner: Vec<Fuse<S>>,
idx: usize,
buf: Vec<Option<S::Item>>,
length: usize
}

impl<S> MergeStream<S>
where
S: Stream + std::marker::Unpin,
{
pub fn new(inner: Vec<Fuse<S>>, length: usize) -> Self {
MergeStream {
inner,
idx: 0,
buf: Vec::with_capacity(length),
length,
}
}
}

impl<S> Stream for MergeStream<S>
where
S: Stream + std::marker::Unpin,
<S as Stream>::Item: std::fmt::Debug
{
type Item = Vec<Option<S::Item>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.project();
loop {
let s = unsafe {
Pin::new_unchecked(me.inner.get_unchecked_mut(*me.idx))
};
match s.poll_next(cx) {
Poll::Ready(data) => {
*me.idx += 1;
me.buf.push(data);
},

Poll::Pending => return Poll::Pending,
};

if *me.idx == *me.length {
*me.idx = 0;
break
}

}

if me.buf.is_empty() || me.buf.iter().all(|x| x.is_none()) {
return Poll::Ready(None);
} else {
return Poll::Ready(Some(std::mem::replace(me.buf, Vec::with_capacity(*me.length))))
}

}
}


#[derive(Debug)]
#[pin_project]
pub struct ResultsetStream<'a> {
Expand Down
Loading

0 comments on commit bc9c113

Please sign in to comment.