Skip to content

Commit

Permalink
feat(chstorage): non low-cardinality attrs
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Dec 2, 2024
1 parent 9b52dc8 commit 2713c95
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 31 deletions.
8 changes: 0 additions & 8 deletions internal/chstorage/_golden/col_attr.hex

This file was deleted.

Binary file removed internal/chstorage/_golden/col_attr.raw
Binary file not shown.
119 changes: 102 additions & 17 deletions internal/chstorage/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,69 @@ type Attributes struct {
Value proto.ColumnOf[otelstorage.Attrs]
}

type attributeCol struct {
type jsonAttrCol struct {
col *proto.ColStr

// values are filled up only when decoding.
values []otelstorage.Attrs
}

func (a jsonAttrCol) Type() proto.ColumnType {
return proto.ColumnTypeString
}

func (a jsonAttrCol) Rows() int {
return a.col.Rows()
}

func (a *jsonAttrCol) DecodeColumn(r *proto.Reader, rows int) error {
if err := a.col.DecodeColumn(r, rows); err != nil {
return errors.Wrap(err, "col")
}
for i := 0; i < a.col.Rows(); i++ {
v := a.col.RowBytes(i)
m, err := decodeAttributes(v)
if err != nil {
return errors.Wrapf(err, "index value %d", i)
}
a.values = append(a.values, m)
}
return nil
}

func (a *jsonAttrCol) Reset() {
a.col.Reset()
a.values = a.values[:0]
}

func (a *jsonAttrCol) EncodeColumn(b *proto.Buffer) {
a.col.EncodeColumn(b)
}

func (a *jsonAttrCol) WriteColumn(w *proto.Writer) {
a.col.WriteColumn(w)
}

func (a *jsonAttrCol) Append(v otelstorage.Attrs) {
e := jx.GetEncoder()
defer jx.PutEncoder(e)
encodeMap(e, v.AsMap())

// Append will copy passed bytes.
a.col.AppendBytes(e.Bytes())
}

func (a *jsonAttrCol) AppendArr(v []otelstorage.Attrs) {
for _, m := range v {
a.Append(m)
}
}

func (a jsonAttrCol) Row(i int) otelstorage.Attrs {
return a.values[i]
}

type jsonLowCardinalityAttrCol struct {
index *proto.ColBytes
col *proto.ColLowCardinalityRaw
hashes map[otelstorage.Hash]int
Expand All @@ -25,15 +87,15 @@ type attributeCol struct {
values []otelstorage.Attrs
}

func (a attributeCol) Type() proto.ColumnType {
func (a jsonLowCardinalityAttrCol) Type() proto.ColumnType {
return proto.ColumnTypeLowCardinality.Sub(proto.ColumnTypeString)
}

func (a attributeCol) Rows() int {
func (a jsonLowCardinalityAttrCol) Rows() int {
return a.col.Rows()
}

func (a *attributeCol) DecodeColumn(r *proto.Reader, rows int) error {
func (a *jsonLowCardinalityAttrCol) DecodeColumn(r *proto.Reader, rows int) error {
if err := a.col.DecodeColumn(r, rows); err != nil {
return errors.Wrap(err, "col")
}
Expand All @@ -49,23 +111,23 @@ func (a *attributeCol) DecodeColumn(r *proto.Reader, rows int) error {
return nil
}

func (a *attributeCol) Reset() {
func (a *jsonLowCardinalityAttrCol) Reset() {
a.col.Reset()
a.index.Reset()
a.values = a.values[:0]
maps.Clear(a.hashes)
a.col.Key = proto.KeyUInt64
}

func (a *attributeCol) EncodeColumn(b *proto.Buffer) {
func (a *jsonLowCardinalityAttrCol) EncodeColumn(b *proto.Buffer) {
a.col.EncodeColumn(b)
}

func (a *attributeCol) WriteColumn(w *proto.Writer) {
func (a *jsonLowCardinalityAttrCol) WriteColumn(w *proto.Writer) {
a.col.WriteColumn(w)
}

func (a *attributeCol) Append(v otelstorage.Attrs) {
func (a *jsonLowCardinalityAttrCol) Append(v otelstorage.Attrs) {
a.col.Key = proto.KeyUInt64
h := v.Hash()
idx, ok := a.hashes[h]
Expand All @@ -83,21 +145,21 @@ func (a *attributeCol) Append(v otelstorage.Attrs) {
a.col.AppendKey(idx)
}

func (a *attributeCol) AppendArr(v []otelstorage.Attrs) {
func (a *jsonLowCardinalityAttrCol) AppendArr(v []otelstorage.Attrs) {
for _, m := range v {
a.Append(m)
}
}

func (a *attributeCol) DecodeState(r *proto.Reader) error {
func (a *jsonLowCardinalityAttrCol) DecodeState(r *proto.Reader) error {
return a.col.DecodeState(r)
}

func (a *attributeCol) EncodeState(b *proto.Buffer) {
func (a *jsonLowCardinalityAttrCol) EncodeState(b *proto.Buffer) {
a.col.EncodeState(b)
}

func (a attributeCol) rowIdx(i int) int {
func (a jsonLowCardinalityAttrCol) rowIdx(i int) int {
switch a.col.Key {
case proto.KeyUInt8:
return int(a.col.Keys8[i])
Expand All @@ -112,12 +174,17 @@ func (a attributeCol) rowIdx(i int) int {
}
}

func (a attributeCol) Row(i int) otelstorage.Attrs {
func (a jsonLowCardinalityAttrCol) Row(i int) otelstorage.Attrs {
return a.values[a.rowIdx(i)]
}

func newAttributesColumn() proto.ColumnOf[otelstorage.Attrs] {
ac := &attributeCol{
func newAttributesColumn(opt attributesOptions) proto.ColumnOf[otelstorage.Attrs] {
if !opt.LowCardinality {
return &jsonAttrCol{
col: new(proto.ColStr),
}
}
ac := &jsonLowCardinalityAttrCol{
index: new(proto.ColBytes),
hashes: map[otelstorage.Hash]int{},
}
Expand All @@ -128,11 +195,29 @@ func newAttributesColumn() proto.ColumnOf[otelstorage.Attrs] {
return ac
}

type attributesOptions struct {
LowCardinality bool
}

type AttributesOption func(*attributesOptions)

func WithLowCardinality(v bool) AttributesOption {
return func(o *attributesOptions) {
o.LowCardinality = v
}
}

// NewAttributes constructs a new Attributes storage representation.
func NewAttributes(name string) *Attributes {
func NewAttributes(name string, opts ...AttributesOption) *Attributes {
o := attributesOptions{
LowCardinality: true,
}
for _, opt := range opts {
opt(&o)
}
return &Attributes{
Name: name,
Value: newAttributesColumn(),
Value: newAttributesColumn(o),
}
}

Expand Down
57 changes: 53 additions & 4 deletions internal/chstorage/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"github.com/go-faster/oteldb/internal/otelstorage"
)

func Test_attributeCol(t *testing.T) {
func Test_jsonLowCardinalityAttrCol(t *testing.T) {
var hashes []otelstorage.Hash
col := newAttributesColumn()
col := newAttributesColumn(attributesOptions{LowCardinality: true})

for _, s := range []string{
"foo",
Expand All @@ -44,12 +44,12 @@ func Test_attributeCol(t *testing.T) {
var buf proto.Buffer
col.EncodeColumn(&buf)
t.Run("Golden", func(t *testing.T) {
gold.Bytes(t, buf.Buf, "col_attr")
gold.Bytes(t, buf.Buf, "col_json_lc_attr")
})
t.Run("Ok", func(t *testing.T) {
br := bytes.NewReader(buf.Buf)
r := proto.NewReader(br)
dec := newAttributesColumn()
dec := newAttributesColumn(attributesOptions{LowCardinality: true})
require.NoError(t, dec.DecodeColumn(r, rows))

var gotHashes []otelstorage.Hash
Expand All @@ -64,6 +64,55 @@ func Test_attributeCol(t *testing.T) {
})
}

func Test_jsonAttrCol(t *testing.T) {
var hashes []otelstorage.Hash
col := newAttributesColumn(attributesOptions{LowCardinality: false})

for _, s := range []string{
"foo",
"foo",
"bar",
"foo",
"baz",
} {
m := pcommon.NewMap()
m.PutStr("v", s)
v := otelstorage.Attrs(m)
col.Append(v)
hashes = append(hashes, v.Hash())
}
for j := 0; j < 3; j++ {
m := pcommon.NewMap()
v := otelstorage.Attrs(m)
col.Append(v)
hashes = append(hashes, v.Hash())
}

rows := col.Rows()

var buf proto.Buffer
col.EncodeColumn(&buf)
t.Run("Golden", func(t *testing.T) {
gold.Bytes(t, buf.Buf, "col_attr_json")
})
t.Run("Ok", func(t *testing.T) {
br := bytes.NewReader(buf.Buf)
r := proto.NewReader(br)
dec := newAttributesColumn(attributesOptions{LowCardinality: false})
require.NoError(t, dec.DecodeColumn(r, rows))

var gotHashes []otelstorage.Hash
for i := 0; i < dec.Rows(); i++ {
gotHashes = append(gotHashes, dec.Row(i).Hash())
}
require.Equal(t, hashes, gotHashes)
require.Equal(t, rows, dec.Rows())
dec.Reset()
require.Equal(t, 0, dec.Rows())
require.Equal(t, proto.ColumnTypeString, dec.Type())
})
}

func testMap() pcommon.Map {
m := pcommon.NewMap()
m.PutStr("net.transport", "ip_tcp")
Expand Down
2 changes: 1 addition & 1 deletion internal/chstorage/columns_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newLogColumns() *logColumns {
serviceNamespace: new(proto.ColStr).LowCardinality(),
timestamp: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
severityText: new(proto.ColStr).LowCardinality(),
attributes: NewAttributes(colAttrs),
attributes: NewAttributes(colAttrs, WithLowCardinality(false)),
resource: NewAttributes(colResource),
scopeName: new(proto.ColStr).LowCardinality(),
scopeVersion: new(proto.ColStr).LowCardinality(),
Expand Down
2 changes: 1 addition & 1 deletion internal/chstorage/schema_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
body String CODEC(ZSTD(1)), -- string or json object
attribute LowCardinality(String) CODEC(ZSTD(1)),
attribute String CODEC(ZSTD(1)),
resource LowCardinality(String) CODEC(ZSTD(1)),
scope LowCardinality(String) CODEC(ZSTD(1)),
Expand Down

0 comments on commit 2713c95

Please sign in to comment.