Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat add sharding #304

Merged
merged 20 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4a466e9
feat(pisa-proxy, rewrite): WIP
xuanyuan300 Sep 13, 2022
af88b48
feat(pisa-proxy, rewrite): WIP add rewrite
xuanyuan300 Sep 14, 2022
3c8c444
feat(pisa-proxy, rewrite): WIP: add rewrtite
xuanyuan300 Sep 15, 2022
7eb522c
feat(pisa-proxy, rewrite): WIP: add rewrite, complete database sharding
xuanyuan300 Sep 15, 2022
0765e83
fix(pisa-proxy, strategy): fix unit test failed for rewritesplitting
xuanyuan300 Sep 15, 2022
db53f8f
chore(pisa-proxy, rewrite): WIP: remove comments
xuanyuan300 Sep 15, 2022
63cb229
feat(pisa-proxy, rewrite): WIP: fix unit test failed
xuanyuan300 Sep 15, 2022
3b8bad9
feat(pisa-proxy, sharding): WIP: update config, strategy
xuanyuan300 Sep 16, 2022
18bd6ac
feat(pisa-proxy, sharding): WIP: update strategy, update RouteInput
xuanyuan300 Sep 16, 2022
3e90e02
feat(pisa-proxy, sharding): rewriter integrated to reqContext
xuanyuan300 Sep 17, 2022
71b4b7d
feat(pisa-proxy, sharding): WIP: add route_sharding, update server
xuanyuan300 Sep 17, 2022
d80ba17
feat(pisa-proxy, sharding): WIP: add executor
xuanyuan300 Sep 18, 2022
fb95242
feat(pisa-proxy, sharding): WIP: add stmt_cache, prepare_sharding_inner,
xuanyuan300 Sep 18, 2022
ff33bfc
feat(pisa-proxy, sharding): WIP: add shard_execute_executor, update
xuanyuan300 Sep 18, 2022
09f3fcb
feat(pisa-proxy, sharding): WIP: add shard_query_inner
xuanyuan300 Sep 18, 2022
64adb9e
feat(pisa-proxy, sharding): WIP: update get_conn for fsm, update
xuanyuan300 Sep 18, 2022
5b4e4cc
fix(pisa-proxy, sharding): fix get_conn in fsma, add check shading
xuanyuan300 Sep 18, 2022
9668e19
feat(pisa-proxy, sharding): WIP: add shard_conn_cache in fsm
xuanyuan300 Sep 18, 2022
470fcd4
feat(pisa-proxy, sharding): WIP: update stmt_cache
xuanyuan300 Sep 19, 2022
66f5b53
chore(pisa-proxy, sharding): WIP: remove unused imports
xuanyuan300 Sep 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions pisa-proxy/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pisa-proxy/app/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ api = { path = "../api" }
clap = { version = "3.2.12", features = ["env"] }
plugin = { path = "../../plugin" }
proxy = { path = "../../proxy" }
strategy = { path = "../../proxy/strategy" }
reqwest = { version = "0.11.10", features = ["blocking", "json"] }
runtime_mysql = { path = "../../runtime/mysql" }
serde = { version = "1.0.133", default-features = false, features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions pisa-proxy/app/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{env, fs::File, io::prelude::*};
use api::config::Admin;
use clap::{value_parser, Arg, Command};
use proxy::proxy::{MySQLNode, MySQLNodes, ProxiesConfig, ProxyConfig};
use strategy::config::NodeGroup;
use serde::{Deserialize, Serialize};
use tracing::trace;

Expand Down Expand Up @@ -243,6 +244,7 @@ pub struct PisaProxyConfig {
pub admin: Admin,
pub proxy: Option<ProxiesConfig>,
pub mysql: Option<MySQLNodes>,
pub nodegroup: Option<NodeGroup>,
pub shardingsphere_proxy: Option<MySQLNodes>,
pub version: Option<String>,
}
Expand Down
2 changes: 1 addition & 1 deletion pisa-proxy/app/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ impl ProxyFactory for PisaProxyFactory {
match kind {
ProxyKind::MySQL => Box::new(runtime_mysql::mysql::MySQLProxy {
proxy_config: config,
// mysql_nodes: self.pisa_config.mysql_nodes.clone(),
mysql_nodes: self.pisa_config.get_mysql().to_vec(),
node_group: self.pisa_config.nodegroup.clone(),
pisa_version: self.pisa_config.get_version().to_string(),
// mysql_nodes: self.pisa_config.mysql.as_ref().unwrap().node.as_ref().unwrap().to_vec(),
// pisa_version: self.pisa_config.version.as_ref().unwrap().to_string(),
Expand Down
15 changes: 10 additions & 5 deletions pisa-proxy/parser/mysql/src/ast/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ pub enum Value {

Num {
span: Span,
// todo add parse i64 string
value: String,
signed: bool,
},

HexNum {
Expand All @@ -907,8 +907,8 @@ pub enum Value {

FloatNum {
span: Span,
// todo add parse f64 string
value: String,
signed: bool,
},

BinNum {
Expand Down Expand Up @@ -963,12 +963,17 @@ impl Value {
var_ident
}

Self::Num { span: _, value } => (*value).to_string(),
Self::Num { span: _, signed, value } | Self::FloatNum { span:_, value, signed }=> {
let mut value = (*value).to_string();
if *signed {
value.insert(0, '-');
}

value
}

Self::HexNum { span: _, value } => (*value).to_string(),

Self::FloatNum { span: _, value } => (*value).to_string(),

Self::BinNum { span: _, value } => (*value).to_string(),

Self::True => "true".to_string(),
Expand Down
59 changes: 43 additions & 16 deletions pisa-proxy/parser/mysql/src/ast/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2714,21 +2714,15 @@ impl ValOrVals {
pub struct InsertVals {
pub span: Span,
pub val_ident: ValOrVals,
pub values: Vec<Vec<Expr>>,
pub values: Vec<RowValue>,
}

impl InsertVals {
pub fn format(&self) -> String {
let mut values = Vec::with_capacity(3);

for v in &self.values {
let row = vec![
"(".to_string(),
v.iter().map(|x| x.format()).collect::<Vec<String>>().join(","),
")".to_string(),
];

values.push(row.join(" "))
values.push(v.format())
}

vec![self.val_ident.format(), values.join(",")].join(" ")
Expand All @@ -2740,20 +2734,53 @@ impl Visitor for InsertVals {
let mut new_values = Vec::with_capacity(self.values.len());

for v in self.values.iter_mut() {
let mut sub_new_exprs = Vec::with_capacity(v.len());
for vv in v.iter_mut() {
let mut node = Node::Expr(vv);
tf.trans(&mut node);
let mut node = Node::RowValue(v);
tf.trans(&mut node);
let new_node = node.into_row_value().unwrap().visit(tf);
new_values.push(new_node);
}

let new_node = node.into_expr().unwrap().visit(tf);
sub_new_exprs.push(new_node);
}
self.values = new_values;

self.clone()
}
}

#[derive(Debug, Clone)]
pub struct RowValue {
pub span: Span,
pub values: Vec<Expr>
}

new_values.push(sub_new_exprs);
impl RowValue {
fn format(&self) -> String {
let values = vec![
"(".to_string(),
self.values.iter().map(|x| x.format()).collect::<Vec<String>>().join(","),
")".to_string(),
];

values.join(" ")
}
}


impl Visitor for RowValue {
fn visit<T: Transformer>(&mut self, tf: &mut T) -> Self {
let mut new_values = Vec::with_capacity(self.values.len());

for v in self.values.iter_mut() {
let mut node = Node::Expr(v);
tf.trans(&mut node);

let new_node = node.into_expr().unwrap().visit(tf);
new_values.push(new_node);
}

self.values = new_values;

tf.complete(&mut Node::RowValue(self));

self.clone()
}
}
Expand Down
20 changes: 16 additions & 4 deletions pisa-proxy/parser/mysql/src/grammar.y
Original file line number Diff line number Diff line change
Expand Up @@ -3704,10 +3704,13 @@ opt_equal -> Option<&'input str>:
| equal { Some($1) }
;

row_value -> Vec<Expr>:
row_value -> RowValue:
'(' opt_values ')'
{
$2
RowValue {
span: $span,
values: $2,
}
}
;

Expand Down Expand Up @@ -4488,20 +4491,23 @@ NUM_literal -> Value:
Value::Num {
span: $span,
value: $1,
signed: false,
}
}
| 'DECIMAL_NUM'
{
Value::Num {
span: $span,
value: String::from($lexer.span_str($1.as_ref().unwrap().span())),
signed: false,
}
}
| 'FLOAT_NUM'
{
Value::FloatNum {
span: $span,
value: String::from($lexer.span_str($1.as_ref().unwrap().span())),
signed: false,
}
}
;
Expand Down Expand Up @@ -4724,7 +4730,13 @@ signed_literal -> Value:
}
| '-' NUM_literal
{
$2
match $2 {
Value::Num { span, value, signed:_ } => {
Value::Num { span, value, signed: true}
},

_ => unreachable!()
}
}
;

Expand Down Expand Up @@ -5607,7 +5619,7 @@ value_or_values -> ValOrVals:
| VALUES { ValOrVals::Values }
;

values_list -> Vec<Vec<Expr>>:
values_list -> Vec<RowValue>:
values_list ',' row_value
{
$1.push($3);
Expand Down
66 changes: 65 additions & 1 deletion pisa-proxy/protocol/mysql/src/client/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
};

use bytes::{Buf, BufMut, BytesMut};
use futures::Stream;
use futures::{Stream, stream::Fuse};
use pin_project::pin_project;
use protocol_codegen::mysql_codec_convert;
use tokio::io::Interest;
Expand Down Expand Up @@ -98,6 +98,70 @@ impl ClientCodec {
}
}


#[pin_project]
pub struct MergeStream<S>
where
S: Stream + std::marker::Unpin,
{
pub inner: Vec<Fuse<S>>,
idx: usize,
buf: Vec<Option<S::Item>>,
length: usize
}

impl<S> MergeStream<S>
where
S: Stream + std::marker::Unpin,
{
pub fn new(inner: Vec<Fuse<S>>, length: usize) -> Self {
MergeStream {
inner,
idx: 0,
buf: Vec::with_capacity(length),
length,
}
}
}

impl<S> Stream for MergeStream<S>
where
S: Stream + std::marker::Unpin,
<S as Stream>::Item: std::fmt::Debug
{
type Item = Vec<Option<S::Item>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.project();
loop {
let s = unsafe {
Pin::new_unchecked(me.inner.get_unchecked_mut(*me.idx))
};
match s.poll_next(cx) {
Poll::Ready(data) => {
*me.idx += 1;
me.buf.push(data);
},

Poll::Pending => return Poll::Pending,
};

if *me.idx == *me.length {
*me.idx = 0;
break
}

}

if me.buf.is_empty() || me.buf.iter().all(|x| x.is_none()) {
return Poll::Ready(None);
} else {
return Poll::Ready(Some(std::mem::replace(me.buf, Vec::with_capacity(*me.length))))
}

}
}


#[derive(Debug)]
#[pin_project]
pub struct ResultsetStream<'a> {
Expand Down
Loading