diff --git a/integration/prome2e/ch_test.go b/integration/prome2e/ch_test.go index d4803a0e..7aa48b7b 100644 --- a/integration/prome2e/ch_test.go +++ b/integration/prome2e/ch_test.go @@ -70,7 +70,7 @@ func TestCH(t *testing.T) { t.Logf("Test tables prefix: %s", prefix) require.NoError(t, tables.Create(ctx, c)) - inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables}) + inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables, Sync: true}) require.NoError(t, err) querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{Tables: tables}) diff --git a/internal/chstorage/attributes.go b/internal/chstorage/attributes.go index 002a3267..1cd84b4f 100644 --- a/internal/chstorage/attributes.go +++ b/internal/chstorage/attributes.go @@ -2,11 +2,11 @@ package chstorage import ( "fmt" - "slices" "github.com/ClickHouse/ch-go/proto" "github.com/go-faster/errors" "go.opentelemetry.io/collector/pdata/pcommon" + "golang.org/x/exp/maps" "github.com/go-faster/oteldb/internal/otelstorage" ) @@ -19,7 +19,7 @@ type Attributes struct { type attributeCol struct { index *proto.ColBytes col *proto.ColLowCardinalityRaw - hashes []otelstorage.Hash + hashes map[otelstorage.Hash]int // values are filled up only when decoding. values []otelstorage.Attrs @@ -43,7 +43,7 @@ func (a *attributeCol) DecodeColumn(r *proto.Reader, rows int) error { if err != nil { return errors.Wrapf(err, "index value %d", i) } - a.hashes = append(a.hashes, m.Hash()) + a.hashes[m.Hash()] = i a.values = append(a.values, m) } return nil @@ -53,7 +53,7 @@ func (a *attributeCol) Reset() { a.col.Reset() a.index.Reset() a.values = a.values[:0] - a.hashes = a.hashes[:0] + maps.Clear(a.hashes) a.col.Key = proto.KeyUInt64 } @@ -64,10 +64,10 @@ func (a *attributeCol) EncodeColumn(b *proto.Buffer) { func (a *attributeCol) Append(v otelstorage.Attrs) { a.col.Key = proto.KeyUInt64 h := v.Hash() - idx := slices.Index(a.hashes, h) - if idx == -1 { + idx, ok := a.hashes[h] + if !ok { idx = len(a.hashes) - a.hashes = append(a.hashes, h) + a.hashes[h] = idx a.index.Append(encodeAttributes(v.AsMap())) } a.col.AppendKey(idx) @@ -108,7 +108,8 @@ func (a attributeCol) Row(i int) otelstorage.Attrs { func newAttributesColumn() proto.ColumnOf[otelstorage.Attrs] { ac := &attributeCol{ - index: new(proto.ColBytes), + index: new(proto.ColBytes), + hashes: map[otelstorage.Hash]int{}, } ac.col = &proto.ColLowCardinalityRaw{ Index: ac.index, diff --git a/internal/chstorage/inserter.go b/internal/chstorage/inserter.go index 880daf22..0100ac61 100644 --- a/internal/chstorage/inserter.go +++ b/internal/chstorage/inserter.go @@ -1,6 +1,9 @@ package chstorage import ( + "context" + + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" @@ -17,6 +20,8 @@ type Inserter struct { ch ClickhouseClient tables Tables + metricBatches chan pmetric.Metrics + insertedPoints metric.Int64Counter insertedSpans metric.Int64Counter insertedTags metric.Int64Counter @@ -34,6 +39,9 @@ type InserterOptions struct { MeterProvider metric.MeterProvider // TracerProvider provides OpenTelemetry tracer for this querier. TracerProvider trace.TracerProvider + + // Sync disables batching. + Sync bool } func (opts *InserterOptions) setDefaults() { @@ -87,7 +95,7 @@ func NewInserter(c ClickhouseClient, opts InserterOptions) (*Inserter, error) { return nil, errors.Wrap(err, "create inserts") } - return &Inserter{ + inserter := &Inserter{ ch: c, tables: opts.Tables, insertedSpans: insertedSpans, @@ -96,5 +104,18 @@ func NewInserter(c ClickhouseClient, opts InserterOptions) (*Inserter, error) { insertedPoints: insertedPoints, inserts: inserts, tracer: opts.TracerProvider.Tracer("chstorage.Inserter"), - }, nil + } + + if !opts.Sync { + inserter.metricBatches = make(chan pmetric.Metrics, 8) + ctx := context.Background() + for i := 0; i < 8; i++ { + go func() { + if err := inserter.saveMetricBatches(ctx); err != nil { + panic(err) + } + }() + } + } + return inserter, nil } diff --git a/internal/chstorage/inserter_metrics.go b/internal/chstorage/inserter_metrics.go index 19c450c0..899d3637 100644 --- a/internal/chstorage/inserter_metrics.go +++ b/internal/chstorage/inserter_metrics.go @@ -6,11 +6,11 @@ import ( "math" "slices" "strconv" - "sync" "time" "github.com/ClickHouse/ch-go" "github.com/ClickHouse/ch-go/proto" + "github.com/cenkalti/backoff/v4" "github.com/go-faster/errors" "github.com/go-faster/sdk/zctx" "github.com/prometheus/prometheus/model/labels" @@ -24,30 +24,14 @@ import ( "github.com/go-faster/oteldb/internal/otelstorage" ) -func getMetricsBatch() *metricsBatch { - v := metricsBatchPool.Get() - if v == nil { - return newMetricBatch() +func (i *Inserter) insertBatch(ctx context.Context, b *metricsBatch) error { + eb := backoff.NewExponentialBackOff() + bo := backoff.WithContext(eb, ctx) + fn := func() error { + return b.Insert(ctx, i.tables, i.ch) } - return v.(*metricsBatch) -} - -func putMetricsBatch(b *metricsBatch) { - b.Reset() - metricsBatchPool.Put(b) -} - -var metricsBatchPool sync.Pool - -// ConsumeMetrics inserts given metrics. -func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { - b := getMetricsBatch() - defer putMetricsBatch(b) - if err := i.mapMetrics(b, metrics); err != nil { - return errors.Wrap(err, "map metrics") - } - if err := b.Insert(ctx, i.tables, i.ch); err != nil { - return errors.Wrap(err, "send batch") + if err := backoff.Retry(fn, bo); err != nil { + return errors.Wrap(err, "insert batch") } i.inserts.Add(ctx, 1, metric.WithAttributes( @@ -55,9 +39,67 @@ func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) ), ) i.insertedPoints.Add(ctx, int64(b.points.value.Rows())) + b.Reset() return nil } +func (i *Inserter) saveMetricBatches(ctx context.Context) error { + b := newMetricBatch() + for { + select { + case batch, ok := <-i.metricBatches: + if !ok { + return nil + } + if err := b.mapMetrics(batch); err != nil { + return errors.Wrap(err, "map metrics") + } + if b.points.value.Rows() <= 100_000 { + continue + } + if err := i.insertBatch(ctx, b); err != nil { + return err + } + case <-time.After(time.Second): + if err := i.insertBatch(ctx, b); err != nil { + return err + } + case <-ctx.Done(): + func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = i.insertBatch(ctx, b) + }() + return ctx.Err() + } + } +} + +// ConsumeMetrics inserts given metrics. +func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { + if i.metricBatches == nil { + b := newMetricBatch() + if err := b.mapMetrics(metrics); err != nil { + return errors.Wrap(err, "map metrics") + } + if err := i.insertBatch(ctx, b); err != nil { + return errors.Wrap(err, "insert batch") + } + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case i.metricBatches <- metrics: + // Enqueue. + return nil + } +} + +func (b metricsBatch) Len() int { + return b.points.value.Rows() +} + type metricsBatch struct { points *pointColumns expHistograms *expHistogramColumns @@ -547,7 +589,7 @@ func (b *metricsBatch) addLabels(attrs *lazyAttributes) { }) } -func (i *Inserter) mapMetrics(b *metricsBatch, metrics pmetric.Metrics) error { +func (b *metricsBatch) mapMetrics(metrics pmetric.Metrics) error { resMetrics := metrics.ResourceMetrics() for i := 0; i < resMetrics.Len(); i++ { resMetric := resMetrics.At(i)