Skip to content

Commit

Permalink
feat: add basic codec support for binary le && binary unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
PureWhiteWu committed May 23, 2023
1 parent 814cf9d commit 6ade2c7
Show file tree
Hide file tree
Showing 9 changed files with 2,421 additions and 61 deletions.
5 changes: 3 additions & 2 deletions pilota-build/src/codegen/thrift/ty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,10 @@ impl ThriftBackend {
{{
let list_ident = {read_list_begin};
let mut val = Vec::with_capacity(list_ident.size);
for _ in 0..list_ident.size {{
val.push({read_el});
for i in 0..list_ident.size {{
unsafe {{ *val.get_unchecked_mut(i) = {read_el}; }}
}};
unsafe {{ val.set_len(list_ident.size); }}
{read_list_end};
val
}}
Expand Down
4 changes: 4 additions & 0 deletions pilota/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ rand = "0.8"
[[bench]]
name = "faststr"
harness = false

[[bench]]
name = "thrift_binary"
harness = false
261 changes: 261 additions & 0 deletions pilota/benches/thrift_binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
#![allow(clippy::redundant_clone)]

use bytes::BytesMut;
use criterion::{black_box, criterion_group, criterion_main};
use pilota::thrift::{TInputProtocol, TOutputProtocol};
use rand::{self, Rng};

fn binary_bench(c: &mut criterion::Criterion) {
let size = std::env::var("SIZE")
.unwrap_or("10000".to_string())
.parse()
.unwrap();
let mut group = c.benchmark_group("Bench Thrift Binary");
let mut v: Vec<i64> = Vec::with_capacity(size);
for _ in 0..size {
v.push(rand::thread_rng().gen());
}
let mut buf = BytesMut::new();

let mut p = pilota::thrift::binary::TBinaryProtocol::new(&mut buf, true);
for i in &v {
p.write_i64(*i).unwrap();
}
drop(p);
assert_eq!(buf.len(), 8 * size);

let mut buf_le = BytesMut::new();

let mut p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut buf_le, true);
for i in &v {
p.write_i64(*i).unwrap();
}
drop(p);
assert_eq!(buf_le.len(), 8 * size);

let b = buf_le.clone();
let mut v2: Vec<i64> = Vec::with_capacity(size);
let src = b.as_ptr();
let dst = v2.as_mut_ptr();
unsafe {
std::ptr::copy_nonoverlapping(src, dst as *mut u8, size * 8);
v2.set_len(size);
}
assert_eq!(v, v2);

group.bench_function("big endian decode vec i64", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be(b, size));
});
})
});

group.bench_function("big endian decode vec i64 unsafe", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be_unsafe(b, size));
});
})
});

group.bench_function("big endian decode vec i64 unsafe vec", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be_unsafe_vec(b, size));
});
})
});

group.bench_function("big endian decode vec i64 unsafe optimized", |b| {
b.iter(|| {
black_box({
let b = buf.clone();
black_box(read_be_unsafe_optimized(b, size));
});
})
});

group.bench_function("big endian encode vec i64", |b| {
b.iter(|| {
black_box({
let mut b = BytesMut::with_capacity(8 * size);
black_box(write_be(&mut b, &v, size));
});
})
});

group.bench_function("big endian encode vec i64 unsafe", |b| {
b.iter(|| {
black_box({
let mut b = BytesMut::with_capacity(8 * size);
black_box(write_be_unsafe(&mut b, &v, size));
});
})
});

group.bench_function("little endian decode vec i64", |b| {
b.iter(|| {
black_box({
let b = buf_le.clone();
black_box(read_le(b, size));
});
})
});
group.bench_function("little endian decode vec i64 unsafe optimized", |b| {
b.iter(|| {
black_box({
let b = buf_le.clone();
black_box(read_le_unsafe_optimized(b, size));
});
})
});
group.bench_function("little endian decode vec i64 optimized", |b| {
b.iter(|| {
black_box({
let b = buf_le.clone();
black_box(read_le_optimized(b, size));
});
})
});

group.bench_function("alloc vec", |b| {
b.iter(|| {
let mut b = buf_le.clone();
let _p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut b, true);
let _: Vec<i64> = black_box(Vec::with_capacity(size));
})
});

group.finish();
}

#[inline(never)]
fn read_be(mut b: BytesMut, size: usize) -> Vec<i64> {
let mut p = pilota::thrift::binary::TBinaryProtocol::new(&mut b, true);
let mut v = Vec::with_capacity(size);
for _ in 0..size {
v.push(p.read_i64().unwrap());
}
v
}

#[inline(never)]
fn read_be_unsafe(mut b: BytesMut, size: usize) -> Vec<i64> {
unsafe {
let s = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
let mut p = pilota::thrift::binary_unsafe::TBinaryProtocol::new(&mut b, s, true);
let mut v = Vec::with_capacity(size);
for _ in 0..size {
v.push(p.read_i64().unwrap());
}
v
}
}

#[inline(never)]
fn read_be_unsafe_vec(mut b: BytesMut, size: usize) -> Vec<i64> {
unsafe {
let s = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
let mut p = pilota::thrift::binary_unsafe::TBinaryProtocol::new(&mut b, s, true);
let mut v = Vec::with_capacity(size);
for i in 0..size {
*v.get_unchecked_mut(i) = p.read_i64().unwrap();
}
v
}
}

#[inline(never)]
fn read_be_unsafe_optimized(b: BytesMut, size: usize) -> Vec<i64> {
unsafe {
let buf: &[u8] = b.as_ref();
assert!(buf.len() >= size * 8);
let mut index = 0;

let mut v = Vec::with_capacity(size);
for i in 0..size {
*v.get_unchecked_mut(i) = i64::from_be_bytes(
buf.get_unchecked(index..index + 8)
.try_into()
.unwrap_unchecked(),
);
index += 8;
}
v.set_len(size);
v
}
}

#[inline(never)]
fn write_be(b: &mut BytesMut, v: &Vec<i64>, size: usize) {
let mut p = pilota::thrift::binary::TBinaryProtocol::new(b, true);
for el in v {
p.write_i64(*el).unwrap();
}
}

#[inline(never)]
fn write_be_unsafe(b: &mut BytesMut, v: &Vec<i64>, size: usize) {
unsafe {
let s = std::slice::from_raw_parts_mut(b.as_mut_ptr(), b.len());
let mut p = pilota::thrift::binary_unsafe::TBinaryProtocol::new(b, s, true);
for el in v {
p.write_i64(*el).unwrap();
}
}
}

#[inline(never)]
fn read_le(mut b: BytesMut, size: usize) -> Vec<i64> {
let mut p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut b, true);

let mut v = Vec::with_capacity(size);
for _ in 0..size {
v.push(p.read_i64().unwrap());
}
v
}

// cargo asm -p pilota --bench thrift_binary --native --full-name --keep-labels --simplify --rust
#[inline(never)]
fn read_le_unsafe_optimized(b: BytesMut, size: usize) -> Vec<i64> {
unsafe {
let buf: &[u8] = b.as_ref();
assert!(buf.len() >= size * 8);
let mut index = 0;

let mut v = Vec::with_capacity(size);
for i in 0..size {
*v.get_unchecked_mut(i) = i64::from_le_bytes(
buf.get_unchecked(index..index + 8)
.try_into()
.unwrap_unchecked(),
);
index += 8;
}
v.set_len(size);
v
}
}

#[inline(never)]
fn read_le_optimized(mut b: BytesMut, size: usize) -> Vec<i64> {
let _p = pilota::thrift::binary_le::TBinaryProtocol::new(&mut b, true);
let mut v: Vec<i64> = Vec::with_capacity(size);
let _ = black_box({
let src = b.as_ptr();
let dst = v.as_mut_ptr();
unsafe {
std::ptr::copy_nonoverlapping(src, dst as *mut u8, size * 8);
v.set_len(size);
}
});
v
}

criterion_group!(benches, binary_bench);
criterion_main!(benches);
36 changes: 7 additions & 29 deletions pilota/src/thrift/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{convert::TryInto, str};

use bytes::{Bytes, BytesMut};
use faststr::FastStr;
use lazy_static::__Deref;

use linkedbytes::LinkedBytes;
use tokio::io::{AsyncRead, AsyncReadExt};

Expand Down Expand Up @@ -355,16 +355,6 @@ impl TOutputProtocol for TBinaryProtocol<&mut BytesMut> {
Ok(())
}

#[inline]
fn reserve(&mut self, size: usize) {
self.trans.reserve(size)
}

#[inline]
fn buf_mut(&mut self) -> &mut BytesMut {
self.trans
}

#[inline]
fn write_bytes_vec(&mut self, b: &[u8]) -> Result<(), EncodeError> {
self.write_i32(b.len() as i32)?;
Expand Down Expand Up @@ -541,17 +531,6 @@ impl TOutputProtocol for TBinaryProtocol<&mut LinkedBytes> {
fn flush(&mut self) -> Result<(), EncodeError> {
Ok(())
}

#[inline]
fn reserve(&mut self, size: usize) {
self.trans.reserve(size)
}

#[inline]
fn buf_mut(&mut self) -> &mut LinkedBytes {
self.trans
}

#[inline]
fn write_bytes_vec(&mut self, b: &[u8]) -> Result<(), EncodeError> {
self.write_i32(b.len() as i32)?;
Expand Down Expand Up @@ -903,11 +882,15 @@ impl TInputProtocol for TBinaryProtocol<&mut BytesMut> {
#[inline]
fn read_faststr(&mut self) -> Result<FastStr, DecodeError> {
let len = self.trans.read_i32()? as usize;
let bytes = self.trans.split_to(len).freeze();
if len >= ZERO_COPY_THRESHOLD {
let bytes = self.trans.split_to(len).freeze();
unsafe { return Ok(FastStr::from_bytes_unchecked(bytes)) };
}
unsafe { Ok(FastStr::new(str::from_utf8_unchecked(bytes.deref()))) }
unsafe {
Ok(FastStr::new(str::from_utf8_unchecked(
self.trans.get(..len).unwrap(),
)))
}
}

#[inline]
Expand Down Expand Up @@ -952,11 +935,6 @@ impl TInputProtocol for TBinaryProtocol<&mut BytesMut> {
Ok(self.trans.read_u8()?)
}

#[inline]
fn buf_mut(&mut self) -> &mut Self::Buf {
self.trans
}

#[inline]
fn read_bytes_vec(&mut self) -> Result<Vec<u8>, DecodeError> {
let len = self.trans.read_i32()? as usize;
Expand Down
Loading

0 comments on commit 6ade2c7

Please sign in to comment.