Skip to content

Commit

Permalink
fix(chstorage): remove async metric batching
Browse files Browse the repository at this point in the history
Removing for better stability.
This will negatively affect performance,
but problem should be solved on different level.
  • Loading branch information
ernado committed Apr 21, 2024
1 parent 3fb2973 commit 63f1c76
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 70 deletions.
2 changes: 1 addition & 1 deletion integration/prome2e/ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCH(t *testing.T) {
t.Logf("Test tables prefix: %s", prefix)
require.NoError(t, tables.Create(ctx, c))

inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables, Sync: true})
inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{Tables: tables})
require.NoError(t, err)

querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{Tables: tables})
Expand Down
22 changes: 0 additions & 22 deletions internal/chstorage/inserter.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package chstorage

import (
"context"
"runtime"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand All @@ -21,8 +17,6 @@ type Inserter struct {
ch ClickhouseClient
tables Tables

metricBatches chan pmetric.Metrics

insertedPoints metric.Int64Counter
insertedSpans metric.Int64Counter
insertedTags metric.Int64Counter
Expand All @@ -40,9 +34,6 @@ type InserterOptions struct {
MeterProvider metric.MeterProvider
// TracerProvider provides OpenTelemetry tracer for this querier.
TracerProvider trace.TracerProvider

// Sync disables batching.
Sync bool
}

func (opts *InserterOptions) setDefaults() {
Expand Down Expand Up @@ -106,18 +97,5 @@ func NewInserter(c ClickhouseClient, opts InserterOptions) (*Inserter, error) {
inserts: inserts,
tracer: opts.TracerProvider.Tracer("chstorage.Inserter"),
}

if !opts.Sync {
jobs := runtime.GOMAXPROCS(0)
inserter.metricBatches = make(chan pmetric.Metrics, jobs)
ctx := context.Background()
for i := 0; i < jobs; i++ {
go func() {
if err := inserter.saveMetricBatches(ctx); err != nil {
panic(err)
}
}()
}
}
return inserter, nil
}
53 changes: 6 additions & 47 deletions internal/chstorage/inserter_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,57 +43,16 @@ func (i *Inserter) insertBatch(ctx context.Context, b *metricsBatch) error {
return nil
}

func (i *Inserter) saveMetricBatches(ctx context.Context) error {
b := newMetricBatch()
for {
select {
case batch, ok := <-i.metricBatches:
if !ok {
return nil
}
if err := b.mapMetrics(batch); err != nil {
return errors.Wrap(err, "map metrics")
}
if b.points.value.Rows() <= 100_000 {
continue
}
if err := i.insertBatch(ctx, b); err != nil {
return err
}
case <-time.After(time.Second):
if err := i.insertBatch(ctx, b); err != nil {
return err
}
case <-ctx.Done():
func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = i.insertBatch(ctx, b)
}()
return ctx.Err()
}
}
}

// ConsumeMetrics inserts given metrics.
func (i *Inserter) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
if i.metricBatches == nil {
b := newMetricBatch()
if err := b.mapMetrics(metrics); err != nil {
return errors.Wrap(err, "map metrics")
}
if err := i.insertBatch(ctx, b); err != nil {
return errors.Wrap(err, "insert batch")
}
return nil
b := newMetricBatch()
if err := b.mapMetrics(metrics); err != nil {
return errors.Wrap(err, "map metrics")
}
select {
case <-ctx.Done():
return ctx.Err()
case i.metricBatches <- metrics:
// Enqueue.
return nil
if err := i.insertBatch(ctx, b); err != nil {
return errors.Wrap(err, "insert batch")
}
return nil
}

func (b metricsBatch) Len() int {
Expand Down

0 comments on commit 63f1c76

Please sign in to comment.