Skip to content

Commit

Permalink
refactor(tracestorage): create batch writer
Browse files Browse the repository at this point in the history
```
cpu: AMD Ryzen 9 5950X 16-Core Processor
                  │   old.txt    │               new.txt               │
                  │    sec/op    │   sec/op     vs base                │
InserterTraces-32   1386.1µ ± 1%   557.9µ ± 2%  -59.75% (p=0.000 n=15)

                  │    old.txt     │               new.txt                │
                  │      B/op      │     B/op      vs base                │
InserterTraces-32   1145.85Ki ± 0%   56.02Ki ± 0%  -95.11% (p=0.000 n=15)

                  │   old.txt   │               new.txt               │
                  │  allocs/op  │  allocs/op   vs base                │
InserterTraces-32   6.983k ± 0%   1.009k ± 0%  -85.55% (p=0.000 n=15)
```
  • Loading branch information
tdakkota committed Jul 19, 2024
1 parent 5ff9560 commit a18c67c
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 141 deletions.
148 changes: 110 additions & 38 deletions internal/chstorage/columns_traces.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package chstorage

import (
"sync"
"time"

"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/go-faster/oteldb/internal/chstorage/chsql"
"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/go-faster/oteldb/internal/traceql"
"github.com/go-faster/oteldb/internal/tracestorage"
"github.com/go-faster/oteldb/internal/xsync"
)

var (
spanColumnsPool = xsync.NewPool(newSpanColumns)
spanAttrsColumnsPool = xsync.NewPool(newSpanAttrsColumns)
)

type spanColumns struct {
Expand Down Expand Up @@ -38,10 +47,14 @@ type spanColumns struct {

events eventsColumns
links linksColumns

columns func() Columns
Input func() proto.Input
Body func(table string) string
}

func newSpanColumns() *spanColumns {
return &spanColumns{
c := &spanColumns{
name: new(proto.ColStr).LowCardinality(),
start: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
end: new(proto.ColDateTime64).WithPrecision(proto.PrecisionNano),
Expand All @@ -58,47 +71,53 @@ func newSpanColumns() *spanColumns {
scopeVersion: new(proto.ColStr).LowCardinality(),
scopeAttributes: NewAttributes(colScope),
}
c.columns = sync.OnceValue(func() Columns {
return MergeColumns(Columns{
{Name: "service_instance_id", Data: c.serviceInstanceID},
{Name: "service_name", Data: c.serviceName},
{Name: "service_namespace", Data: c.serviceNamespace},

{Name: "trace_id", Data: &c.traceID},
{Name: "span_id", Data: &c.spanID},
{Name: "trace_state", Data: &c.traceState},
{Name: "parent_span_id", Data: &c.parentSpanID},
{Name: "name", Data: c.name},
{Name: "kind", Data: proto.Wrap(&c.kind, kindDDL)},
{Name: "start", Data: c.start},
{Name: "end", Data: c.end},
{Name: "status_code", Data: &c.statusCode},
{Name: "status_message", Data: c.statusMessage},
{Name: "batch_id", Data: &c.batchID},

{Name: "scope_name", Data: c.scopeName},
{Name: "scope_version", Data: c.scopeVersion},

{Name: "events_timestamps", Data: c.events.timestamps},
{Name: "events_names", Data: c.events.names},
{Name: "events_attributes", Data: c.events.attributes},

{Name: "links_trace_ids", Data: c.links.traceIDs},
{Name: "links_span_ids", Data: c.links.spanIDs},
{Name: "links_tracestates", Data: c.links.tracestates},
{Name: "links_attributes", Data: c.links.attributes},
},
c.attributes.Columns(),
c.resource.Columns(),
c.scopeAttributes.Columns(),
)
})
c.Input = sync.OnceValue(func() proto.Input {
return c.columns().Input()
})
c.Body = xsync.KeyOnce(func(table string) string {
return c.Input().Into(table)
})
return c
}

func (c *spanColumns) columns() Columns {
return MergeColumns(Columns{
{Name: "service_instance_id", Data: c.serviceInstanceID},
{Name: "service_name", Data: c.serviceName},
{Name: "service_namespace", Data: c.serviceNamespace},

{Name: "trace_id", Data: &c.traceID},
{Name: "span_id", Data: &c.spanID},
{Name: "trace_state", Data: &c.traceState},
{Name: "parent_span_id", Data: &c.parentSpanID},
{Name: "name", Data: c.name},
{Name: "kind", Data: proto.Wrap(&c.kind, kindDDL)},
{Name: "start", Data: c.start},
{Name: "end", Data: c.end},
{Name: "status_code", Data: &c.statusCode},
{Name: "status_message", Data: c.statusMessage},
{Name: "batch_id", Data: &c.batchID},

{Name: "scope_name", Data: c.scopeName},
{Name: "scope_version", Data: c.scopeVersion},

{Name: "events_timestamps", Data: c.events.timestamps},
{Name: "events_names", Data: c.events.names},
{Name: "events_attributes", Data: c.events.attributes},

{Name: "links_trace_ids", Data: c.links.traceIDs},
{Name: "links_span_ids", Data: c.links.spanIDs},
{Name: "links_tracestates", Data: c.links.tracestates},
{Name: "links_attributes", Data: c.links.attributes},
},
c.attributes.Columns(),
c.resource.Columns(),
c.scopeAttributes.Columns(),
)
}

func (c *spanColumns) Input() proto.Input { return c.columns().Input() }
func (c *spanColumns) Result() proto.Results { return c.columns().Result() }
func (c *spanColumns) ChsqlResult() []chsql.ResultColumn { return c.columns().ChsqlResult() }
func (c *spanColumns) Reset() { c.columns().Reset() }

func (c *spanColumns) AddRow(s tracestorage.Span) {
c.traceID.Append(s.TraceID)
Expand Down Expand Up @@ -311,3 +330,56 @@ func (c *linksColumns) Row(row int) (links []tracestorage.Link, _ error) {
}
return links, nil
}

type spanAttrsColumns struct {
name *proto.ColLowCardinality[string]
value proto.ColStr
valueType proto.ColEnum8
scope proto.ColEnum8

columns func() Columns
Input func() proto.Input
Body func(table string) string
}

func newSpanAttrsColumns() *spanAttrsColumns {
c := &spanAttrsColumns{
name: new(proto.ColStr).LowCardinality(),
value: proto.ColStr{},
valueType: proto.ColEnum8{},
scope: proto.ColEnum8{},
}
c.columns = sync.OnceValue(func() Columns {
return Columns{
{Name: "name", Data: c.name},
{Name: "value", Data: &c.value},
{Name: "value_type", Data: proto.Wrap(&c.valueType, valueTypeDDL)},
{Name: "scope", Data: proto.Wrap(&c.scope, scopeTypeDDL)},
}
})
c.Input = sync.OnceValue(func() proto.Input {
return c.columns().Input()
})
c.Body = xsync.KeyOnce(func(table string) string {
return c.Input().Into(table)
})
return c
}

func (c *spanAttrsColumns) Result() proto.Results { return c.columns().Result() }
func (c *spanAttrsColumns) ChsqlResult() []chsql.ResultColumn { return c.columns().ChsqlResult() }
func (c *spanAttrsColumns) Reset() { c.columns().Reset() }

func (c *spanAttrsColumns) AddAttrs(scope traceql.AttributeScope, attrs otelstorage.Attrs) {
attrs.AsMap().Range(func(k string, v pcommon.Value) bool {
c.AddRow(tracestorage.TagFromAttribute(scope, k, v))
return true
})
}

func (c *spanAttrsColumns) AddRow(tag tracestorage.Tag) {
c.name.Append(tag.Name)
c.value.Append(tag.Value)
c.valueType.Append(proto.Enum8(tag.Type))
c.scope.Append(proto.Enum8(tag.Scope))
}
143 changes: 78 additions & 65 deletions internal/chstorage/inserter_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,95 +4,108 @@ import (
"context"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/zctx"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/traceql"
"github.com/go-faster/oteldb/internal/tracestorage"
"github.com/go-faster/oteldb/internal/xsync"
)

// InsertSpans inserts given spans.
func (i *Inserter) InsertSpans(ctx context.Context, spans []tracestorage.Span) (rerr error) {
table := i.tables.Spans
ctx, span := i.tracer.Start(ctx, "chstorage.traces.InsertSpans", trace.WithAttributes(
attribute.Int("chstorage.spans_count", len(spans)),
attribute.String("chstorage.table", table),
type spanWriter struct {
spans *spanColumns
attrs *spanAttrsColumns

inserter *Inserter
}

var _ tracestorage.SpanWriter = (*spanWriter)(nil)

// Add adds record to the batch.
func (w *spanWriter) Add(s tracestorage.Span) error {
w.spans.AddRow(s)
w.attrs.AddAttrs(traceql.ScopeSpan, s.Attrs)
w.attrs.AddAttrs(traceql.ScopeResource, s.ResourceAttrs)
w.attrs.AddAttrs(traceql.ScopeInstrumentation, s.ScopeAttrs)
return nil
}

// Submit sends batch.
func (w *spanWriter) Submit(ctx context.Context) error {
return w.inserter.submitTraces(ctx, w.spans, w.attrs)
}

// Close frees resources.
func (w *spanWriter) Close() error {
spanColumnsPool.Put(w.spans)
spanAttrsColumnsPool.Put(w.attrs)
return nil
}

var _ tracestorage.Inserter = (*Inserter)(nil)

// SpanWriter returns a new [tracestorage.SpanWriter]
func (i *Inserter) SpanWriter(ctx context.Context) (tracestorage.SpanWriter, error) {
return &spanWriter{
spans: xsync.GetReset(spanColumnsPool),
attrs: xsync.GetReset(spanAttrsColumnsPool),
inserter: i,
}, nil
}

// submitTraces inserts given traces.
func (i *Inserter) submitTraces(
ctx context.Context,
spans *spanColumns,
attrs *spanAttrsColumns,
) (rerr error) {
ctx, span := i.tracer.Start(ctx, "chstorage.traces.submitTraces", trace.WithAttributes(
attribute.Int("chstorage.spans_count", spans.spanID.Rows()),
))
defer func() {
if rerr != nil {
span.RecordError(rerr)
} else {
i.stats.InsertedSpans.Add(ctx, int64(len(spans)))
i.stats.InsertedSpans.Add(ctx, int64(spans.spanID.Rows()))
i.stats.InsertedTags.Add(ctx, int64(attrs.name.Rows()))

i.stats.Inserts.Add(ctx, 1,
metric.WithAttributes(
attribute.String("chstorage.table", table),
attribute.String("chstorage.signal", "traces"),
),
)
}
span.End()
}()

c := newSpanColumns()
for _, s := range spans {
c.AddRow(s)
}
input := c.Input()
return i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: input.Into(table),
Input: input,
})
}
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
ctx := grpCtx

// InsertTags insert given set of tags to the storage.
func (i *Inserter) InsertTags(ctx context.Context, tags map[tracestorage.Tag]struct{}) (rerr error) {
table := i.tables.Tags
ctx, span := i.tracer.Start(ctx, "chstorage.traces.InsertTags", trace.WithAttributes(
attribute.Int("chstorage.tags_count", len(tags)),
attribute.String("chstorage.table", table),
))
defer func() {
if rerr != nil {
span.RecordError(rerr)
} else {
i.stats.InsertedTags.Add(ctx, int64(len(tags)))
i.stats.Inserts.Add(ctx, 1,
metric.WithAttributes(
attribute.String("chstorage.table", table),
attribute.String("chstorage.signal", "tags"),
),
)
if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: spans.Body(i.tables.Spans),
Input: spans.Input(),
}); err != nil {
return errors.Wrap(err, "insert spans")
}
span.End()
}()
return nil
})
grp.Go(func() error {
ctx := grpCtx

var (
name = new(proto.ColStr).LowCardinality()
value proto.ColStr
valueType proto.ColEnum8
scopeType proto.ColEnum8
)

for tag := range tags {
name.Append(tag.Name)
value.Append(tag.Value)
valueType.Append(proto.Enum8(tag.Type))
scopeType.Append(proto.Enum8(tag.Scope))
}

input := proto.Input{
{Name: "name", Data: name},
{Name: "value", Data: value},
{Name: "value_type", Data: proto.Wrap(&valueType, valueTypeDDL)},
{Name: "scope", Data: proto.Wrap(&scopeType, scopeTypeDDL)},
}

return i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: input.Into(table),
Input: input,
if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: attrs.Body(i.tables.Tags),
Input: attrs.Input(),
}); err != nil {
return errors.Wrap(err, "insert tags")
}
return nil
})
return grp.Wait()
}
Loading

0 comments on commit a18c67c

Please sign in to comment.