Skip to content

Commit

Permalink
fix(logqlengine): restore otel adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 24, 2024
1 parent e6dce74 commit 1c0e7ad
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 50 deletions.
2 changes: 1 addition & 1 deletion internal/chstorage/querier_logs_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ stageLoop:
sn.Line = line
// Replace original node with [InputNode], since we can execute filtering entirely in
// Clickhouse.
if skippedStages == 0 {
if skippedStages == 0 && !pn.EnableOTELAdapter {
return sn
}
return n
Expand Down
36 changes: 22 additions & 14 deletions internal/logql/logqlengine/engine_log_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func groupEntries(iter EntryIterator) (s lokiapi.Streams, _ error) {

// ProcessorNode implements [PipelineNode].
type ProcessorNode struct {
Input PipelineNode
Prefilter Processor
Selector logql.Selector
Pipeline []logql.PipelineStage
Input PipelineNode
Prefilter Processor
Selector logql.Selector
Pipeline []logql.PipelineStage
EnableOTELAdapter bool
}

var _ PipelineNode = (*ProcessorNode)(nil)
Expand All @@ -122,16 +123,17 @@ func (e *Engine) buildPipelineNode(ctx context.Context, sel logql.Selector, stag
return nil, errors.Wrap(err, "create input node")
}

if p := cond.prefilter; p == nil || p == NopProcessor && len(stages) == 0 {
if p := cond.prefilter; (p == nil || p == NopProcessor) && len(stages) == 0 && !e.otelAdapter {
// Empty processing pipeline, get data directly from storage.
return input, nil
}

return &ProcessorNode{
Input: input,
Prefilter: cond.prefilter,
Selector: sel,
Pipeline: stages,
Input: input,
Prefilter: cond.prefilter,
Selector: sel,
Pipeline: stages,
EnableOTELAdapter: e.otelAdapter,
}, nil
}

Expand All @@ -157,11 +159,12 @@ func (n *ProcessorNode) EvalPipeline(ctx context.Context, params EvalParams) (_
defer closeOnError(iter, &rerr)

return &entryIterator{
iter: iter,
prefilter: n.Prefilter,
pipeline: pipeline,
entries: 0,
limit: params.Limit,
iter: iter,
prefilter: n.Prefilter,
pipeline: pipeline,
entries: 0,
limit: params.Limit,
otelAdapter: n.EnableOTELAdapter,
}, nil
}

Expand All @@ -173,6 +176,8 @@ type entryIterator struct {

entries int
limit int
// TODO(tdakkota): what?
otelAdapter bool
}

func (i *entryIterator) Next(e *Entry) bool {
Expand All @@ -185,6 +190,9 @@ func (i *entryIterator) Next(e *Entry) bool {
ts = e.Timestamp
keep bool
)
if i.otelAdapter {
e.Line = LineFromEntry(*e)
}

e.Line, keep = i.prefilter.Process(ts, e.Line, e.Set)
if !keep {
Expand Down
56 changes: 21 additions & 35 deletions internal/logql/logqlengine/otel_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,37 @@ import (
"github.com/go-faster/jx"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logstorage"
)

// LineFromRecord returns a JSON line from a log record.
func LineFromRecord(record logstorage.Record) string {
// LineFromEntry returns a JSON line from a log record.
func LineFromEntry(entry Entry) string {
// Create JSON object from record.
e := &jx.Encoder{}
e.Obj(func(e *jx.Encoder) {
if record.Body != "" {
if entry.Line != "" {
e.Field(logstorage.LabelBody, func(e *jx.Encoder) {
e.Str(record.Body)
e.Str(entry.Line)
})
}
if m := record.Attrs.AsMap(); m != (pcommon.Map{}) {
record.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool {
e.Field(k, func(e *jx.Encoder) {
switch v.Type() {
case pcommon.ValueTypeStr:
e.Str(v.Str())
case pcommon.ValueTypeBool:
e.Bool(v.Bool())
case pcommon.ValueTypeInt:
e.Int64(v.Int())
case pcommon.ValueTypeDouble:
e.Float64(v.Double())
default:
// Fallback.
e.Str(v.AsString())
}
})
return true
entry.Set.Range(func(k logql.Label, v pcommon.Value) {
e.Field(string(k), func(e *jx.Encoder) {
switch v.Type() {
case pcommon.ValueTypeStr:
e.Str(v.Str())
case pcommon.ValueTypeBool:
e.Bool(v.Bool())
case pcommon.ValueTypeInt:
e.Int64(v.Int())
case pcommon.ValueTypeDouble:
e.Float64(v.Double())
default:
// Fallback.
e.Str(v.AsString())
}
})
}
// HACK: add trace_id, span_id so "trace to logs" metrics work.
// Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`.
if !record.TraceID.IsEmpty() {
e.Field(logstorage.LabelTraceID, func(e *jx.Encoder) {
e.Str(record.TraceID.Hex())
})
}
if !record.SpanID.IsEmpty() {
e.Field(logstorage.LabelSpanID, func(e *jx.Encoder) {
e.Str(record.SpanID.Hex())
})
}
})
})
return e.String()
}

0 comments on commit 1c0e7ad

Please sign in to comment.