Skip to content

Commit

Permalink
feat: add basic codec support for binary le && binary unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
PureWhiteWu committed May 23, 2023
1 parent 814cf9d commit bc99b7b
Show file tree
Hide file tree
Showing 10 changed files with 2,453 additions and 84 deletions.
7 changes: 4 additions & 3 deletions pilota-build/src/codegen/thrift/ty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,13 @@ impl ThriftBackend {
let read_el = self.codegen_decode_ty(helper, ty);
format! {
r#"
{{
unsafe {{
let list_ident = {read_list_begin};
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {{
val.push({read_el});
for i in 0..list_ident.size {{
*val.get_unchecked_mut(i) = {read_el};
}};
val.set_len(list_ident.size);
{read_list_end};
val
}}
Expand Down
50 changes: 28 additions & 22 deletions pilota-build/test_data/thrift/wrapper_arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,22 +616,24 @@ pub mod wrapper_arc {
id = Some(protocol.read_faststr()?);
}
Some(2) if field_ident.field_type == ::pilota::thrift::TType::List => {
name2 = Some({
name2 = Some(unsafe {
let list_ident = protocol.read_list_begin()?;
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {
val.push({
for i in 0..list_ident.size {
*val.get_unchecked_mut(i) = unsafe {
let list_ident = protocol.read_list_begin()?;
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {
val.push(::std::sync::Arc::new(
for i in 0..list_ident.size {
*val.get_unchecked_mut(i) = ::std::sync::Arc::new(
::pilota::thrift::Message::decode(protocol)?,
));
);
}
val.set_len(list_ident.size);
protocol.read_list_end()?;
val
});
};
}
val.set_len(list_ident.size);
protocol.read_list_end()?;
val
});
Expand All @@ -642,14 +644,15 @@ pub mod wrapper_arc {
let mut val =
::std::collections::HashMap::with_capacity(map_ident.size);
for _ in 0..map_ident.size {
val.insert(protocol.read_i32()?, {
val.insert(protocol.read_i32()?, unsafe {
let list_ident = protocol.read_list_begin()?;
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {
val.push(::std::sync::Arc::new(
for i in 0..list_ident.size {
*val.get_unchecked_mut(i) = ::std::sync::Arc::new(
::pilota::thrift::Message::decode(protocol)?,
));
);
}
val.set_len(list_ident.size);
protocol.read_list_end()?;
val
});
Expand Down Expand Up @@ -736,25 +739,27 @@ pub mod wrapper_arc {
id = Some(protocol.read_faststr().await?);
}
Some(2) if field_ident.field_type == ::pilota::thrift::TType::List => {
name2 = Some({
name2 = Some(unsafe {
let list_ident = protocol.read_list_begin().await?;
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {
val.push({
for i in 0..list_ident.size {
*val.get_unchecked_mut(i) = unsafe {
let list_ident = protocol.read_list_begin().await?;
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {
val.push(::std::sync::Arc::new(
for i in 0..list_ident.size {
*val.get_unchecked_mut(i) = ::std::sync::Arc::new(
::pilota::thrift::Message::decode_async(
protocol,
)
.await?,
));
);
}
val.set_len(list_ident.size);
protocol.read_list_end().await?;
val
});
};
}
val.set_len(list_ident.size);
protocol.read_list_end().await?;
val
});
Expand All @@ -765,17 +770,18 @@ pub mod wrapper_arc {
let mut val =
::std::collections::HashMap::with_capacity(map_ident.size);
for _ in 0..map_ident.size {
val.insert(protocol.read_i32().await?, {
val.insert(protocol.read_i32().await?, unsafe {
let list_ident = protocol.read_list_begin().await?;
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {
val.push(::std::sync::Arc::new(
for i in 0..list_ident.size {
*val.get_unchecked_mut(i) = ::std::sync::Arc::new(
::pilota::thrift::Message::decode_async(
protocol,
)
.await?,
));
);
}
val.set_len(list_ident.size);
protocol.read_list_end().await?;
val
});
Expand Down
4 changes: 4 additions & 0 deletions pilota/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ rand = "0.8"
[[bench]]
name = "faststr"
harness = false

[[bench]]
name = "thrift_binary"
harness = false
262 changes: 262 additions & 0 deletions pilota/benches/thrift_binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
#![allow(clippy::redundant_clone)]

use bytes::BytesMut;
use criterion::{black_box, criterion_group, criterion_main};
use pilota::thrift::{TInputProtocol, TOutputProtocol};
use rand::{self, Rng};

fn binary_bench(c: &mut criterion::Criterion) {
let size = std::env::var("SIZE")
.unwrap_or("10000".to_string())
.parse()
.unwrap();
let mut group = c.benchmark_group("Bench Thrift Binary");
let mut v: Vec<i64> = Vec::with_capacity(size);
for _ in 0..size {
v.push(rand::thread_rng().gen());
}
let mut buf = BytesMut::new();

let mut p = pilota::thrift::binary::TBinaryProtocol::new(&mut buf, true);
for i in &v {
p.write_i64(*i).unwrap();
}
drop(p);
assert_eq!(buf.len(), 8 * size);

let mut buf_le = BytesMut::new();

let mut p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut buf_le, true);
for i in &v {
p.write_i64(*i).unwrap();
}
drop(p);
assert_eq!(buf_le.len(), 8 * size);

let b = buf_le.clone();
let mut v2: Vec<i64> = Vec::with_capacity(size);
let src = b.as_ptr();
let dst = v2.as_mut_ptr();
unsafe {
std::ptr::copy_nonoverlapping(src, dst as *mut u8, size * 8);
v2.set_len(size);
}
assert_eq!(v, v2);

group.bench_function("big endian decode vec i64", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be(b, size));
});
})
});

group.bench_function("big endian decode vec i64 unsafe", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be_unsafe(b, size));
});
})
});

group.bench_function("big endian decode vec i64 unsafe vec", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be_unsafe_vec(b, size));
});
})
});

group.bench_function("big endian decode vec i64 unsafe optimized", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be_unsafe_optimized(b, size));
});
})
});

group.bench_function("big endian encode vec i64", |b| {
b.iter(|| {
black_box({
let mut b = BytesMut::with_capacity(8 * size);
black_box(write_be(&mut b, &v, size));
});
})
});

group.bench_function("big endian encode vec i64 unsafe", |b| {
b.iter(|| {
black_box({
let mut b = BytesMut::with_capacity(8 * size);
black_box(write_be_unsafe(&mut b, &v, size));
});
})
});

group.bench_function("little endian decode vec i64", |b| {
b.iter(|| {
black_box({
let b = buf_le.clone();
black_box(read_le(b, size));
});
})
});
group.bench_function("little endian decode vec i64 unsafe optimized", |b| {
b.iter(|| {
black_box({
let b = buf_le.clone();
black_box(read_le_unsafe_optimized(b, size));
});
})
});
group.bench_function("little endian decode vec i64 optimized", |b| {
b.iter(|| {
black_box({
let b = buf_le.clone();
black_box(read_le_optimized(b, size));
});
})
});

group.bench_function("alloc vec", |b| {
b.iter(|| {
let mut b = buf_le.clone();
let _p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut b, true);
let _: Vec<i64> = black_box(Vec::with_capacity(size));
})
});

group.finish();
}

/// Decodes `size` `i64` values from `b` with the `binary` protocol (used by
/// the "big endian decode vec i64" benchmark), pushing each one onto a
/// pre-sized vector.
///
/// Panics if any `read_i64` call returns an error.
#[inline(never)]
fn read_be(mut b: BytesMut, size: usize) -> Vec<i64> {
    let mut decoded = Vec::with_capacity(size);
    let mut protocol = pilota::thrift::binary::TBinaryProtocol::new(&mut b, true);
    for _ in 0..size {
        let value = protocol.read_i64().unwrap();
        decoded.push(value);
    }
    decoded
}

/// Decodes `size` `i64` values from `b` with the `binary_unsafe` protocol,
/// pushing each one onto a pre-sized vector.
///
/// Panics if any `read_i64` call returns an error.
#[inline(never)]
fn read_be_unsafe(mut b: BytesMut, size: usize) -> Vec<i64> {
    let mut decoded = Vec::with_capacity(size);
    unsafe {
        // NOTE(review): `raw` aliases the bytes owned by `b` while `&mut b`
        // is handed to the protocol as well — confirm `binary_unsafe` never
        // touches both views at the same time.
        let raw = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
        let mut protocol =
            pilota::thrift::binary_unsafe::TBinaryProtocol::new(&mut b, raw, true);
        for _ in 0..size {
            decoded.push(protocol.read_i64().unwrap());
        }
    }
    decoded
}

/// Decodes `size` `i64` values from `b` with the `binary_unsafe` protocol,
/// writing each value straight into the vector's spare capacity instead of
/// going through `push`.
///
/// The previous version indexed the zero-length vector with
/// `get_unchecked_mut` (out of bounds, hence undefined behaviour) and never
/// called `set_len`, so it always returned an empty vector.
///
/// Panics if any `read_i64` call returns an error.
#[inline(never)]
fn read_be_unsafe_vec(mut b: BytesMut, size: usize) -> Vec<i64> {
    let mut v: Vec<i64> = Vec::with_capacity(size);
    unsafe {
        // NOTE(review): `s` aliases the bytes owned by `b` while `&mut b` is
        // handed to the protocol as well — confirm `binary_unsafe` never
        // touches both views at the same time.
        let s = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
        let mut p = pilota::thrift::binary_unsafe::TBinaryProtocol::new(&mut b, s, true);
        let dst = v.as_mut_ptr();
        for i in 0..size {
            // SAFETY: `i < size <= v.capacity()`, so `dst.add(i)` stays inside
            // the allocation; `write` does not read or drop the
            // uninitialised slot.
            dst.add(i).write(p.read_i64().unwrap());
        }
        // SAFETY: the loop above initialised the first `size` elements.
        v.set_len(size);
    }
    v
}

/// Decodes `size` big-endian `i64` values directly from the bytes of `b`,
/// with unchecked slice access and raw writes into the output vector.
///
/// # Panics
///
/// Panics if `size * 8` overflows `usize` or if `b` holds fewer than
/// `size * 8` bytes.
#[inline(never)]
fn read_be_unsafe_optimized(b: BytesMut, size: usize) -> Vec<i64> {
    let buf: &[u8] = b.as_ref();
    let byte_len = size.checked_mul(8).expect("size * 8 overflows usize");
    assert!(buf.len() >= byte_len);

    let mut v: Vec<i64> = Vec::with_capacity(size);
    // Write through the raw pointer: indexing the zero-length vector with
    // `get_unchecked_mut` would be out of bounds.
    let dst = v.as_mut_ptr();
    for i in 0..size {
        let start = i * 8;
        // SAFETY: `start + 8 <= byte_len <= buf.len()` by the assert above,
        // so the range is in bounds and exactly 8 bytes long, which makes the
        // array conversion infallible. `i < size <= v.capacity()`, so
        // `dst.add(i)` is inside the allocation.
        unsafe {
            let chunk: [u8; 8] = buf
                .get_unchecked(start..start + 8)
                .try_into()
                .unwrap_unchecked();
            dst.add(i).write(i64::from_be_bytes(chunk));
        }
    }
    // SAFETY: the loop above initialised the first `size` elements.
    unsafe {
        v.set_len(size);
    }
    v
}

/// Encodes every element of `v` into `b` with the `binary` protocol.
///
/// `size` is the expected element count; it is only cross-checked against
/// `v.len()` in debug builds.
///
/// Panics if any `write_i64` call returns an error.
#[inline(never)]
fn write_be(b: &mut BytesMut, v: &[i64], size: usize) {
    debug_assert_eq!(v.len(), size);
    let mut p = pilota::thrift::binary::TBinaryProtocol::new(b, true);
    for el in v {
        p.write_i64(*el).unwrap();
    }
}

/// Encodes every element of `v` into `b` with the `binary_unsafe` protocol.
///
/// `size` is the expected element count; it is only cross-checked against
/// `v.len()` in debug builds.
///
/// Panics if any `write_i64` call returns an error.
#[inline(never)]
fn write_be_unsafe(b: &mut BytesMut, v: &[i64], size: usize) {
    debug_assert_eq!(v.len(), size);
    unsafe {
        // NOTE(review): the slice spans `b.len()` bytes, which is 0 for the
        // freshly allocated buffer the benchmark passes in, and it aliases
        // `b` while `b` is also handed to the protocol — confirm how
        // `binary_unsafe` uses this slice when writing.
        let s = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
        let mut p = pilota::thrift::binary_unsafe::TBinaryProtocol::new(b, s, true);
        for el in v {
            p.write_i64(*el).unwrap();
        }
    }
}

/// Decodes `size` `i64` values from `b` with the `binary_le` protocol (used
/// by the "little endian decode vec i64" benchmark), pushing each one onto a
/// pre-sized vector.
///
/// Panics if any `read_i64` call returns an error.
#[inline(never)]
fn read_le(mut b: BytesMut, size: usize) -> Vec<i64> {
    let mut decoded = Vec::with_capacity(size);
    let mut protocol = pilota::thrift::binary_le::TBinaryProtocol::new(&mut b, true);
    for _ in 0..size {
        let value = protocol.read_i64().unwrap();
        decoded.push(value);
    }
    decoded
}

// cargo asm -p pilota --bench thrift_binary --native --full-name --keep-labels
// --simplify --rust
/// Decodes `size` little-endian `i64` values directly from the bytes of `b`,
/// with unchecked slice access and raw writes into the output vector.
///
/// # Panics
///
/// Panics if `size * 8` overflows `usize` or if `b` holds fewer than
/// `size * 8` bytes.
#[inline(never)]
fn read_le_unsafe_optimized(b: BytesMut, size: usize) -> Vec<i64> {
    let buf: &[u8] = b.as_ref();
    let byte_len = size.checked_mul(8).expect("size * 8 overflows usize");
    assert!(buf.len() >= byte_len);

    let mut v: Vec<i64> = Vec::with_capacity(size);
    // Write through the raw pointer: indexing the zero-length vector with
    // `get_unchecked_mut` would be out of bounds.
    let dst = v.as_mut_ptr();
    for i in 0..size {
        let start = i * 8;
        // SAFETY: `start + 8 <= byte_len <= buf.len()` by the assert above,
        // so the range is in bounds and exactly 8 bytes long, which makes the
        // array conversion infallible. `i < size <= v.capacity()`, so
        // `dst.add(i)` is inside the allocation.
        unsafe {
            let chunk: [u8; 8] = buf
                .get_unchecked(start..start + 8)
                .try_into()
                .unwrap_unchecked();
            dst.add(i).write(i64::from_le_bytes(chunk));
        }
    }
    // SAFETY: the loop above initialised the first `size` elements.
    unsafe {
        v.set_len(size);
    }
    v
}

/// Decodes `size` little-endian `i64` values from `b` with a single bulk
/// byte copy into the output vector.
///
/// A `binary_le` protocol is still constructed (and left unused) so that its
/// setup cost is part of the measurement, as in the other decode benchmarks.
///
/// # Panics
///
/// Panics if `size * 8` overflows `usize` or if `b` holds fewer than
/// `size * 8` bytes.
#[inline(never)]
fn read_le_optimized(mut b: BytesMut, size: usize) -> Vec<i64> {
    let _p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut b, true);
    let byte_len = size.checked_mul(8).expect("size * 8 overflows usize");
    // Without this check a short buffer would be read out of bounds below.
    assert!(b.len() >= byte_len);

    let mut v: Vec<i64> = Vec::with_capacity(size);
    // SAFETY: `b` holds at least `byte_len` initialised bytes (asserted
    // above), `v` owns capacity for `size` i64 values (`byte_len` bytes), the
    // two allocations are distinct, and every bit pattern is a valid i64.
    unsafe {
        std::ptr::copy_nonoverlapping(b.as_ptr(), v.as_mut_ptr().cast::<u8>(), byte_len);
        v.set_len(size);
    }
    // The copied bytes are little-endian; only big-endian hosts need a swap.
    if cfg!(target_endian = "big") {
        for x in v.iter_mut() {
            *x = i64::from_le(*x);
        }
    }
    v
}

criterion_group!(benches, binary_bench);
criterion_main!(benches);
Loading

0 comments on commit bc99b7b

Please sign in to comment.