Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support columar encoding for datums #1158

Merged
merged 10 commits into from
Aug 21, 2023
103 changes: 96 additions & 7 deletions common_types/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,11 @@ pub enum DatumView<'a> {
}

impl<'a> DatumView<'a> {
#[inline]
pub fn is_null(&self) -> bool {
matches!(self, DatumView::Null)
}

/// Return the kind of datum
pub fn kind(&self) -> DatumKind {
match self {
Expand Down Expand Up @@ -1170,13 +1175,6 @@ impl<'a> DatumView<'a> {
}
}

pub fn as_str(&self) -> Option<&str> {
match self {
DatumView::String(v) => Some(v),
_ => None,
}
}

pub fn to_datum(&self) -> Datum {
match self {
DatumView::Null => Datum::Null,
Expand All @@ -1198,6 +1196,97 @@ impl<'a> DatumView<'a> {
DatumView::Time(v) => Datum::Time(*v),
}
}

pub fn as_i8(&self) -> Option<i8> {
match self {
DatumView::Int8(v) => Some(*v),
_ => None,
}
}

pub fn as_i16(&self) -> Option<i16> {
match self {
DatumView::Int16(v) => Some(*v),
_ => None,
}
}

pub fn as_i32(&self) -> Option<i32> {
match self {
DatumView::Int32(v) => Some(*v),
_ => None,
}
}

pub fn as_i64(&self) -> Option<i64> {
match self {
DatumView::Int64(v) => Some(*v),
_ => None,
}
}

pub fn as_u8(&self) -> Option<u8> {
match self {
DatumView::UInt8(v) => Some(*v),
_ => None,
}
}

pub fn as_u16(&self) -> Option<u16> {
match self {
DatumView::UInt16(v) => Some(*v),
_ => None,
}
}

pub fn as_u32(&self) -> Option<u32> {
match self {
DatumView::UInt32(v) => Some(*v),
_ => None,
}
}

pub fn as_u64(&self) -> Option<u64> {
match self {
DatumView::UInt64(v) => Some(*v),
_ => None,
}
}

pub fn as_timestamp(&self) -> Option<Timestamp> {
match self {
DatumView::Timestamp(v) => Some(*v),
_ => None,
}
}

pub fn as_f64(&self) -> Option<f64> {
match self {
DatumView::Double(v) => Some(*v),
_ => None,
}
}

pub fn as_f32(&self) -> Option<f32> {
match self {
DatumView::Float(v) => Some(*v),
_ => None,
}
}

pub fn into_str(self) -> Option<&'a str> {
match self {
DatumView::String(v) => Some(v),
_ => None,
}
}

pub fn into_bytes(self) -> Option<&'a [u8]> {
match self {
DatumView::Varbinary(v) => Some(v),
_ => None,
}
}
}

impl<'a> std::hash::Hash for DatumView<'a> {
Expand Down
2 changes: 1 addition & 1 deletion common_types/src/row/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
time::Timestamp,
};

pub(crate) mod bitset;
pub mod bitset;
pub mod contiguous;

#[derive(Debug, Snafu)]
Expand Down
70 changes: 70 additions & 0 deletions components/codec/src/columnar/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes_ext::{Buf, BufMut, Bytes};
use snafu::ResultExt;

use crate::{
columnar::{
Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, Varint,
},
varint,
};

impl<'a> ValuesEncoder<&'a [u8]> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = &'a [u8]>,
{
for v in values {
debug_assert!(v.len() < u32::MAX as usize);

varint::encode_uvarint(buf, v.len() as u64).context(Varint)?;
buf.put_slice(v);
}

Ok(())
}

fn estimated_encoded_size<I>(&self, values: I) -> usize
where
I: Iterator<Item = &'a [u8]>,
{
let mut total_bytes = 0;
for v in values {
// The length of `v` should be ensured to be smaller than [u32::MAX], that is to
// say, at most 5 bytes will be used when do varint encoding over a u32 number.
total_bytes += 5 + v.len();
}
total_bytes
}
}

impl ValuesDecoder<Bytes> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(Bytes) -> Result<()>,
{
while buf.remaining() > 0 {
let str_len = varint::decode_uvarint(buf).context(Varint)? as usize;
let v = &buf.chunk()[..str_len];
f(Bytes::copy_from_slice(v))?;
buf.advance(str_len);
}

Ok(())
}
}
46 changes: 46 additions & 0 deletions components/codec/src/columnar/float.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes_ext::{Buf, BufMut};

use crate::columnar::{Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl};

impl ValuesEncoder<f64> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = f64>,
{
for v in values {
buf.put_f64(v);
}

Ok(())
}
}

impl ValuesDecoder<f64> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(f64) -> Result<()>,
{
while buf.remaining() > 0 {
let v = buf.get_f64();
f(v)?;
}

Ok(())
}
}
131 changes: 131 additions & 0 deletions components/codec/src/columnar/int.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes_ext::{Buf, BufMut};
use snafu::ResultExt;

use crate::{
columnar::{
Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl, Varint,
},
varint,
};

/// The max number of the bytes used to store a varint encoding u64/i64.
const MAX_NUM_BYTES_OF_64VARINT: usize = 10;

impl ValuesEncoder<i32> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = i32>,
{
for v in values {
buf.put_i32(v);
}

Ok(())
}
}

impl ValuesDecoder<i32> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(i32) -> Result<()>,
{
while buf.remaining() > 0 {
let v = buf.get_i32();
f(v)?;
}

Ok(())
}
}

impl ValuesEncoder<i64> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = i64>,
{
for v in values {
varint::encode_varint(buf, v).context(Varint)?;
}

Ok(())
}

fn estimated_encoded_size<I>(&self, values: I) -> usize
where
I: Iterator<Item = i64>,
{
let (lower, higher) = values.size_hint();
let num = lower.max(higher.unwrap_or_default());
num * MAX_NUM_BYTES_OF_64VARINT
}
}

impl ValuesDecoder<i64> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(i64) -> Result<()>,
{
while buf.remaining() > 0 {
let v = varint::decode_varint(buf).context(Varint)?;
f(v)?;
}

Ok(())
}
}

impl ValuesEncoder<u64> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = u64>,
{
for v in values {
varint::encode_uvarint(buf, v).context(Varint)?;
}

Ok(())
}

fn estimated_encoded_size<I>(&self, values: I) -> usize
where
I: Iterator<Item = u64>,
{
let (lower, higher) = values.size_hint();
let num = lower.max(higher.unwrap_or_default());
num * MAX_NUM_BYTES_OF_64VARINT
}
}

impl ValuesDecoder<u64> for ValuesDecoderImpl {
fn decode<B, F>(&self, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(u64) -> Result<()>,
{
while buf.remaining() > 0 {
let v = varint::decode_uvarint(buf).context(Varint)?;
f(v)?;
}

Ok(())
}
}
Loading