Skip to content

Commit

Permalink
refactor(logstorage): create batch writer
Browse files Browse the repository at this point in the history
```
cpu: AMD Ryzen 9 5950X 16-Core Processor
                │   old.txt    │              new.txt               │
                │    sec/op    │   sec/op     vs base               │
InserterLogs-32   2.915m ± 12%   3.112m ± 6%  +6.76% (p=0.001 n=15)

                │   old.txt    │               new.txt                │
                │     B/op     │     B/op      vs base                │
InserterLogs-32   1.709Mi ± 0%   1.163Mi ± 0%  -31.98% (p=0.000 n=15)

                │   old.txt   │               new.txt               │
                │  allocs/op  │  allocs/op   vs base                │
InserterLogs-32   23.17k ± 0%   16.40k ± 0%  -29.23% (p=0.000 n=15)
```
  • Loading branch information
tdakkota committed Jul 19, 2024
1 parent b65379e commit 102fd87
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 142 deletions.
3 changes: 2 additions & 1 deletion internal/chstorage/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type Inserter struct {

stats struct {
// Logs.
InsertedRecords metric.Int64Counter `name:"logs.inserted_records" description:"Number of inserted log records"`
InsertedRecords metric.Int64Counter `name:"logs.inserted_records" description:"Number of inserted log records"`
InsertedLogLabels metric.Int64Counter `name:"logs.inserted_log_labels" description:"Number of inserted log labels"`
// Metrics.
InsertedPoints metric.Int64Counter `name:"metrics.inserted_points" description:"Number of inserted points"`
InsertedHistograms metric.Int64Counter `name:"metrics.inserted_histograms" description:"Number of inserted exponential (native) histograms"`
Expand Down
132 changes: 69 additions & 63 deletions internal/chstorage/inserter_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,63 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
)

func (i *Inserter) mapRecords(c *logColumns, records []logstorage.Record) {
for _, r := range records {
c.AddRow(r)
}
// recordWriter accumulates a batch of log records for an Inserter.
// It is the type asserted to implement logstorage.RecordWriter.
type recordWriter struct {
// logs receives one row per record passed to Add.
logs *logColumns
// attrs receives the record, resource and scope attributes of every record passed to Add.
attrs *logAttrMapColumns
// inserter is the Inserter whose submitLogs method is called by Submit.
inserter *Inserter
}

// InsertRecords inserts given records.
func (i *Inserter) InsertRecords(ctx context.Context, records []logstorage.Record) (rerr error) {
var _ logstorage.RecordWriter = (*recordWriter)(nil)

// Add puts the record into the batch.
//
// The record is added as a row to the log columns, and its record,
// resource and scope attributes are added to the attribute columns,
// in that order. The returned error is always nil.
func (w *recordWriter) Add(record logstorage.Record) error {
	logs, attrs := w.logs, w.attrs

	logs.AddRow(record)

	attrs.AddAttrs(record.Attrs)
	attrs.AddAttrs(record.ResourceAttrs)
	attrs.AddAttrs(record.ScopeAttrs)

	return nil
}

// Submit sends the batch by handing the accumulated log and attribute
// columns to the owning Inserter's submitLogs, and returns its error.
func (w *recordWriter) Submit(ctx context.Context) error {
	ins := w.inserter
	return ins.submitLogs(ctx, w.logs, w.attrs)
}

// Close frees resources.
//
// This implementation does nothing and always returns nil.
func (w *recordWriter) Close() error {
	return nil
}

var _ logstorage.Inserter = (*Inserter)(nil)

// RecordWriter returns a new [logstorage.RecordWriter] bound to this Inserter.
//
// The writer gets its own log columns (newLogColumns) and attribute columns
// (newLogAttrMapColumns). ctx is not used here, and the returned error is
// always nil.
func (i *Inserter) RecordWriter(ctx context.Context) (logstorage.RecordWriter, error) {
	w := &recordWriter{inserter: i}
	w.logs = newLogColumns()
	w.attrs = newLogAttrMapColumns()
	return w, nil
}

func (i *Inserter) submitLogs(ctx context.Context, logs *logColumns, attrs *logAttrMapColumns) (rerr error) {
table := i.tables.Logs
ctx, span := i.tracer.Start(ctx, "chstorage.logs.InsertRecords", trace.WithAttributes(
attribute.Int("chstorage.records_count", len(records)),
ctx, span := i.tracer.Start(ctx, "chstorage.logs.submitLogs", trace.WithAttributes(
attribute.Int("chstorage.records_count", logs.body.Rows()),
attribute.Int("chstorage.attrs_count", attrs.name.Rows()),
attribute.String("chstorage.table", table),
))
defer func() {
if rerr != nil {
span.RecordError(rerr)
} else {
i.stats.InsertedRecords.Add(ctx, int64(len(records)))
i.stats.InsertedRecords.Add(ctx, int64(logs.body.Rows()))
i.stats.InsertedLogLabels.Add(ctx, int64(attrs.name.Rows()))

i.stats.Inserts.Add(ctx, 1,
metric.WithAttributes(
attribute.String("chstorage.table", table),
Expand All @@ -42,60 +76,32 @@ func (i *Inserter) InsertRecords(ctx context.Context, records []logstorage.Recor
span.End()
}()

logs := newLogColumns()
i.mapRecords(logs, records)

if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: logs.Input().Into(table),
Input: logs.Input(),
}); err != nil {
return errors.Wrap(err, "insert records")
}
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
ctx := grpCtx

attrs := newLogAttrMapColumns()
for _, record := range records {
attrs.AddAttrs(record.Attrs)
attrs.AddAttrs(record.ResourceAttrs)
attrs.AddAttrs(record.ScopeAttrs)
}
if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: attrs.Input().Into(i.tables.LogAttrs),
Input: attrs.Input(),
}); err != nil {
return errors.Wrap(err, "insert labels")
}

return nil
}

// InsertLogLabels inserts given set of labels to the storage.
func (i *Inserter) InsertLogLabels(ctx context.Context, set map[logstorage.Label]struct{}) (rerr error) {
table := i.tables.LogAttrs
ctx, span := i.tracer.Start(ctx, "chstorage.logs.InsertLogLabels", trace.WithAttributes(
attribute.Int("chstorage.labels_count", len(set)),
attribute.String("chstorage.table", table),
))
defer func() {
if rerr != nil {
span.RecordError(rerr)
input := logs.Input()
if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: input.Into(table),
Input: input,
}); err != nil {
return errors.Wrap(err, "insert records")
}
span.End()
}()
return nil
})
grp.Go(func() error {
ctx := grpCtx

attrs := newLogAttrMapColumns()
for label := range set {
name := otelstorage.KeyToLabel(label.Name)
attrs.AddRow(name, label.Name)
}
if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: attrs.Input().Into(table),
Input: attrs.Input(),
}); err != nil {
return errors.Wrap(err, "insert labels")
}

return nil
input := attrs.Input()
if err := i.ch.Do(ctx, ch.Query{
Logger: zctx.From(ctx).Named("ch"),
Body: input.Into(i.tables.LogAttrs),
Input: input,
}); err != nil {
return errors.Wrap(err, "insert labels")
}
return nil
})
return grp.Wait()
}
146 changes: 72 additions & 74 deletions internal/logstorage/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,102 +24,100 @@ func NewConsumer(i Inserter) *Consumer {

// ConsumeLogs implements otelreceiver.Consumer.
func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
labels := map[Label]struct{}{}
addLabels := func(attrs pcommon.Map) {
attrs.Range(func(k string, v pcommon.Value) bool {
switch t := v.Type(); t {
case pcommon.ValueTypeMap, pcommon.ValueTypeSlice:
default:
labels[Label{k, v.AsString()}] = struct{}{}
}
return true
})
w, err := c.inserter.RecordWriter(ctx)
if err != nil {
return errors.Wrap(err, "create record writer")
}
defer func() {
_ = w.Close()
}()

var (
insertBatch []Record
resLogs = logs.ResourceLogs()
)
resLogs := logs.ResourceLogs()
for i := 0; i < resLogs.Len(); i++ {
resLog := resLogs.At(i)
res := resLog.Resource()
addLabels(res.Attributes())

scopeLogs := resLog.ScopeLogs()
for i := 0; i < scopeLogs.Len(); i++ {
scopeLog := scopeLogs.At(i)
scope := scopeLog.Scope()
addLabels(scope.Attributes())

records := scopeLog.LogRecords()
for i := 0; i < records.Len(); i++ {
record := records.At(i)
insertBatch = append(insertBatch, NewRecordFromOTEL(res, scope, record))
addLabels(record.Attributes())
parsed := c.parseRecord(NewRecordFromOTEL(res, scope, record))
if err := w.Add(parsed); err != nil {
return errors.Wrap(err, "write record")
}
}
}
}

// Parse logs.
for i, record := range insertBatch {
if record.Attrs.IsZero() || record.ResourceAttrs.IsZero() {
continue
}
if err := w.Submit(ctx); err != nil {
return errors.Wrap(err, "submit log records")
}
return nil
}

// Assuming filelog.
// Should contain "log" attribute.
attrs := record.Attrs.AsMap()
const logMessageKey = "log"
v, ok := attrs.Get(logMessageKey)
if !ok || v.Type() != pcommon.ValueTypeStr {
continue
}
for _, parser := range []logparser.Parser{
logparser.GenericJSONParser{},
// Disable for lots of false-positive detections // logparser.LogFmtParser{},
} {
if !parser.Detect(v.Str()) {
continue
}
data := []byte(v.Str())
line, err := parser.Parse(data)
if err != nil {
continue
}
attrs.PutStr("logparser.type", parser.String())
attrs.Remove(logMessageKey)
if !line.Attrs.IsZero() {
line.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool {
target := attrs.PutEmpty(k)
v.CopyTo(target)
return true
})
}
record.Body = line.Body
if line.Timestamp != 0 {
record.Timestamp = line.Timestamp
}
if line.SeverityNumber != 0 {
record.SeverityNumber = line.SeverityNumber
record.SeverityText = line.SeverityText
}
if !line.SpanID.IsEmpty() {
record.SpanID = line.SpanID
}
if !line.TraceID.IsEmpty() {
record.TraceID = line.TraceID
}
func (c *Consumer) parseRecord(record Record) Record {
// NOTE(tdakkota): otelcol filelog receiver sends entries
// with zero value timestamp
// Probably, this is the wrong way to handle it.
// Probably, we should not accept records if both timestamps are zero.
ts := record.Timestamp
if ts == 0 {
ts = record.ObservedTimestamp
}
record.Timestamp = ts

insertBatch[i] = record
break
}
if record.Attrs.IsZero() || record.ResourceAttrs.IsZero() {
return record
}

if err := c.inserter.InsertRecords(ctx, insertBatch); err != nil {
return errors.Wrap(err, "insert log records")
// Assuming filelog.
// Should contain "log" attribute.
attrs := record.Attrs.AsMap()
const logMessageKey = "log"
v, ok := attrs.Get(logMessageKey)
if !ok || v.Type() != pcommon.ValueTypeStr {
return record
}
if err := c.inserter.InsertLogLabels(ctx, labels); err != nil {
return errors.Wrap(err, "insert labels")
for _, parser := range []logparser.Parser{
logparser.GenericJSONParser{},
// Disable for lots of false-positive detections // logparser.LogFmtParser{},
} {
if !parser.Detect(v.Str()) {
continue
}
data := []byte(v.Str())
line, err := parser.Parse(data)
if err != nil {
continue
}
attrs.PutStr("logparser.type", parser.String())
attrs.Remove(logMessageKey)
if !line.Attrs.IsZero() {
line.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool {
target := attrs.PutEmpty(k)
v.CopyTo(target)
return true
})
}
record.Body = line.Body
if line.Timestamp != 0 {
record.Timestamp = line.Timestamp
}
if line.SeverityNumber != 0 {
record.SeverityNumber = line.SeverityNumber
record.SeverityText = line.SeverityText
}
if !line.SpanID.IsEmpty() {
record.SpanID = line.SpanID
}
if !line.TraceID.IsEmpty() {
record.TraceID = line.TraceID
}
break
}
return nil
return record
}
18 changes: 14 additions & 4 deletions internal/logstorage/logstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,18 @@ type SeriesOptions struct {

// Inserter is a log storage insert interface.
type Inserter interface {
// InsertRecords inserts given records.
InsertRecords(ctx context.Context, records []Record) error
// InsertLogLabels insert given set of labels to the storage.
InsertLogLabels(ctx context.Context, labels map[Label]struct{}) error
// RecordWriter creates a new batch.
RecordWriter(ctx context.Context) (RecordWriter, error)
}

// RecordWriter represents a log record batch.
//
// Records are collected with Add and sent with Submit.
type RecordWriter interface {
// Add adds record to the batch.
Add(record Record) error
// Submit sends the batch.
Submit(ctx context.Context) error
// Close frees resources.
//
// Callers should call [RecordWriter.Close] regardless of whether they called [RecordWriter.Submit] or not.
Close() error
}

0 comments on commit 102fd87

Please sign in to comment.