Skip to content

Commit

Permalink
perf: batch inserts in background
Browse files Browse the repository at this point in the history
This is unsafe, but fast.
Should use some kind of WAL.

Top after optimization:
- (*metricsBatch).mapMetrics, 40%
- prometheusremotewrite.FromTimeSeries, 33%
- runtime.mallocgc, 25%
- pcommon.Map.Range, 15%

So we can optimize mapMetrics and FromTimeSeries further
to improve ingestion speed more.
  • Loading branch information
ernado committed Jan 14, 2024
1 parent 6d69639 commit b15bdd7
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 36 deletions.
2 changes: 1 addition & 1 deletion integration/prome2e/ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCH(t *testing.T) {
t.Logf("Test tables prefix: %s", prefix)
require.NoError(t, tables.Create(ctx, c))

inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables})
inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables, Sync: true})
require.NoError(t, err)

querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{Tables: tables})
Expand Down
17 changes: 9 additions & 8 deletions internal/chstorage/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package chstorage

import (
"fmt"
"slices"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/otelstorage"
)
Expand All @@ -19,7 +19,7 @@ type Attributes struct {
type attributeCol struct {
index *proto.ColBytes
col *proto.ColLowCardinalityRaw
hashes []otelstorage.Hash
hashes map[otelstorage.Hash]int

// values are filled up only when decoding.
values []otelstorage.Attrs
Expand All @@ -43,7 +43,7 @@ func (a *attributeCol) DecodeColumn(r *proto.Reader, rows int) error {
if err != nil {
return errors.Wrapf(err, "index value %d", i)
}
a.hashes = append(a.hashes, m.Hash())
a.hashes[m.Hash()] = i
a.values = append(a.values, m)
}
return nil
Expand All @@ -53,7 +53,7 @@ func (a *attributeCol) Reset() {
a.col.Reset()
a.index.Reset()
a.values = a.values[:0]
a.hashes = a.hashes[:0]
maps.Clear(a.hashes)
a.col.Key = proto.KeyUInt64
}

Expand All @@ -64,10 +64,10 @@ func (a *attributeCol) EncodeColumn(b *proto.Buffer) {
func (a *attributeCol) Append(v otelstorage.Attrs) {
a.col.Key = proto.KeyUInt64
h := v.Hash()
idx := slices.Index(a.hashes, h)
if idx == -1 {
idx, ok := a.hashes[h]
if !ok {
idx = len(a.hashes)
a.hashes = append(a.hashes, h)
a.hashes[h] = idx
a.index.Append(encodeAttributes(v.AsMap()))
}
a.col.AppendKey(idx)
Expand Down Expand Up @@ -108,7 +108,8 @@ func (a attributeCol) Row(i int) otelstorage.Attrs {

func newAttributesColumn() proto.ColumnOf[otelstorage.Attrs] {
ac := &attributeCol{
index: new(proto.ColBytes),
index: new(proto.ColBytes),
hashes: map[otelstorage.Hash]int{},
}
ac.col = &proto.ColLowCardinalityRaw{
Index: ac.index,
Expand Down
25 changes: 23 additions & 2 deletions internal/chstorage/inserter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package chstorage

import (
"context"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand All @@ -17,6 +20,8 @@ type Inserter struct {
ch ClickhouseClient
tables Tables

metricBatches chan pmetric.Metrics

insertedPoints metric.Int64Counter
insertedSpans metric.Int64Counter
insertedTags metric.Int64Counter
Expand All @@ -34,6 +39,9 @@ type InserterOptions struct {
MeterProvider metric.MeterProvider
// TracerProvider provides OpenTelemetry tracer for this querier.
TracerProvider trace.TracerProvider

// Sync disables batching.
Sync bool
}

func (opts *InserterOptions) setDefaults() {
Expand Down Expand Up @@ -87,7 +95,7 @@ func NewInserter(c ClickhouseClient, opts InserterOptions) (*Inserter, error) {
return nil, errors.Wrap(err, "create inserts")
}

return &Inserter{
inserter := &Inserter{
ch: c,
tables: opts.Tables,
insertedSpans: insertedSpans,
Expand All @@ -96,5 +104,18 @@ func NewInserter(c ClickhouseClient, opts InserterOptions) (*Inserter, error) {
insertedPoints: insertedPoints,
inserts: inserts,
tracer: opts.TracerProvider.Tracer("chstorage.Inserter"),
}, nil
}

if !opts.Sync {
inserter.metricBatches = make(chan pmetric.Metrics, 8)
ctx := context.Background()
for i := 0; i < 8; i++ {
go func() {
if err := inserter.saveMetricBatches(ctx); err != nil {
panic(err)
}
}()
}
}
return inserter, nil
}
92 changes: 67 additions & 25 deletions internal/chstorage/inserter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"math"
"slices"
"strconv"
"sync"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/zctx"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -24,40 +24,82 @@ import (
"github.com/go-faster/oteldb/internal/otelstorage"
)

func getMetricsBatch() *metricsBatch {
v := metricsBatchPool.Get()
if v == nil {
return newMetricBatch()
func (i *Inserter) insertBatch(ctx context.Context, b *metricsBatch) error {
eb := backoff.NewExponentialBackOff()
bo := backoff.WithContext(eb, ctx)
fn := func() error {
return b.Insert(ctx, i.tables, i.ch)
}
return v.(*metricsBatch)
}

func putMetricsBatch(b *metricsBatch) {
b.Reset()
metricsBatchPool.Put(b)
}

var metricsBatchPool sync.Pool

// ConsumeMetrics inserts given metrics.
func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
b := getMetricsBatch()
defer putMetricsBatch(b)
if err := i.mapMetrics(b, metrics); err != nil {
return errors.Wrap(err, "map metrics")
}
if err := b.Insert(ctx, i.tables, i.ch); err != nil {
return errors.Wrap(err, "send batch")
if err := backoff.Retry(fn, bo); err != nil {
return errors.Wrap(err, "insert batch")
}
i.inserts.Add(ctx, 1,
metric.WithAttributes(
attribute.String("chstorage.signal", "metrics"),
),
)
i.insertedPoints.Add(ctx, int64(b.points.value.Rows()))
b.Reset()
return nil
}

func (i *Inserter) saveMetricBatches(ctx context.Context) error {
b := newMetricBatch()
for {
select {
case batch, ok := <-i.metricBatches:
if !ok {
return nil
}
if err := b.mapMetrics(batch); err != nil {
return errors.Wrap(err, "map metrics")
}
if b.points.value.Rows() <= 100_000 {
continue
}
if err := i.insertBatch(ctx, b); err != nil {
return err
}
case <-time.After(time.Second):
if err := i.insertBatch(ctx, b); err != nil {
return err
}
case <-ctx.Done():
func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = i.insertBatch(ctx, b)
}()
return ctx.Err()
}
}
}

// ConsumeMetrics inserts given metrics.
func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
if i.metricBatches == nil {
b := newMetricBatch()
if err := b.mapMetrics(metrics); err != nil {
return errors.Wrap(err, "map metrics")
}
if err := i.insertBatch(ctx, b); err != nil {
return errors.Wrap(err, "insert batch")
}
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case i.metricBatches <- metrics:
// Enqueue.
return nil
}
}

func (b metricsBatch) Len() int {
return b.points.value.Rows()
}

type metricsBatch struct {
points *pointColumns
expHistograms *expHistogramColumns
Expand Down Expand Up @@ -547,7 +589,7 @@ func (b *metricsBatch) addLabels(attrs *lazyAttributes) {
})
}

func (i *Inserter) mapMetrics(b *metricsBatch, metrics pmetric.Metrics) error {
func (b *metricsBatch) mapMetrics(metrics pmetric.Metrics) error {
resMetrics := metrics.ResourceMetrics()
for i := 0; i < resMetrics.Len(); i++ {
resMetric := resMetrics.At(i)
Expand Down

0 comments on commit b15bdd7

Please sign in to comment.