Skip to content

Commit

Permalink
feat(chstorage): offload Series
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed May 31, 2024
1 parent 96619b4 commit a78de4d
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 53 deletions.
4 changes: 4 additions & 0 deletions internal/chstorage/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func attrKeys(name string) string {
return fmt.Sprintf("JSONExtractKeys(%s)", name)
}

func attrStringMap(name string) string {
return fmt.Sprintf("JSONExtract(%s, 'Map(String, String)')", name)
}

// Columns returns a slice of Columns for this attribute set.
func (a *Attributes) Columns() Columns {
return Columns{
Expand Down
136 changes: 136 additions & 0 deletions internal/chstorage/querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ import (
"context"
"fmt"
"slices"
"strings"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/go-faster/sdk/zctx"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/go-faster/oteldb/internal/xattribute"
)

var (
Expand Down Expand Up @@ -253,3 +257,135 @@ func (q *Querier) getMaterializedLabelColumn(labelName string) (column string, i
return "", false
}
}

// Series returns all available log series.
func (q *Querier) Series(ctx context.Context, opts logstorage.SeriesOptions) (result logstorage.Series, rerr error) {
table := q.tables.Logs

ctx, span := q.tracer.Start(ctx, "chstorage.logs.Series",
trace.WithAttributes(
attribute.Int64("chstorage.range.start", opts.Start.UnixNano()),
attribute.Int64("chstorage.range.end", opts.Start.UnixNano()),
xattribute.StringerSlice("chstorage.selectors", opts.Selectors),

attribute.String("chstorage.table", table),
),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
}
span.End()
}()

var materializedStringMap strings.Builder
{
materializedStringMap.WriteString("map(")
for i, label := range []string{
logstorage.LabelTraceID,
logstorage.LabelSpanID,
logstorage.LabelSeverity,
logstorage.LabelBody,
logstorage.LabelServiceName,
logstorage.LabelServiceInstanceID,
logstorage.LabelServiceNamespace,
} {
if i != 0 {
materializedStringMap.WriteByte(',')
}
materializedStringMap.WriteString(singleQuoted(label))
materializedStringMap.WriteByte(',')
expr, _ := q.getMaterializedLabelColumn(label)
if label == logstorage.LabelSeverity {
expr = fmt.Sprintf("toString(%s)", expr)
}
materializedStringMap.WriteString(expr)
}
materializedStringMap.WriteByte(')')
}

var query strings.Builder
fmt.Fprintf(&query, `SELECT DISTINCT
mapConcat(%s, %s, %s, %s) as series
FROM %s
WHERE (toUnixTimestamp64Nano(timestamp) >= %d AND toUnixTimestamp64Nano(timestamp) <= %d)`,
&materializedStringMap,
attrStringMap(colAttrs),
attrStringMap(colResource),
attrStringMap(colScope),
table,
opts.Start.UnixNano(), opts.End.UnixNano(),
)

if sels := opts.Selectors; len(sels) > 0 {
// Gather all labels for mapping fetch.
labels := make([]string, 0, len(sels))
for _, sel := range sels {
for _, m := range sel.Matchers {
labels = append(labels, string(m.Label))
}
}
mapping, err := q.getLabelMapping(ctx, labels)
if err != nil {
return nil, errors.Wrap(err, "get label mapping")
}

query.WriteString(" AND (false")
for _, sel := range sels {
query.WriteString(" OR (true")
if err := writeLabelMatchers(&query, sel.Matchers, mapping); err != nil {
return nil, err
}
query.WriteByte(')')
}
query.WriteByte(')')
}
query.WriteString(" LIMIT 1000")

var (
queryStartTime = time.Now()

series = proto.NewMap(
new(proto.ColStr),
new(proto.ColStr),
)
)
if err := q.ch.Do(ctx, ch.Query{
Body: query.String(),
Result: proto.Results{
{Name: "series", Data: series},
},
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < series.Rows(); i++ {
s := make(map[string]string)
forEachColMap(series, i, func(k, v string) {
s[otelstorage.KeyToLabel(k)] = v
})
result = append(result, s)
}
return nil
},
}); err != nil {
return nil, err
}

q.clickhouseRequestHistogram.Record(ctx, time.Since(queryStartTime).Seconds(),
metric.WithAttributes(
attribute.String("chstorage.query_type", "Series"),
attribute.String("chstorage.table", table),
attribute.String("chstorage.signal", "logs"),
),
)
return result, nil
}

func forEachColMap[K comparable, V any](c *proto.ColMap[K, V], row int, cb func(K, V)) {
var start int
end := int(c.Offsets[row])
if row > 0 {
start = int(c.Offsets[row-1])
}
for idx := start; idx < end; idx++ {
cb(c.Keys.Row(idx), c.Values.Row(idx))
}
}
74 changes: 41 additions & 33 deletions internal/chstorage/querier_logs_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,51 @@ func (q logQueryPredicates) write(
) error {
fmt.Fprintf(query, "(toUnixTimestamp64Nano(timestamp) >= %d AND toUnixTimestamp64Nano(timestamp) <= %d)",
q.Start.UnixNano(), q.End.UnixNano())
for _, m := range q.Labels {
if err := writeLabelMatchers(query, q.Labels, mapping); err != nil {
return err
}

for _, m := range q.Line {
switch m.Op {
case logql.OpEq, logql.OpRe:
query.WriteString(" AND (")
case logql.OpNotEq, logql.OpNotRe:
query.WriteString(" AND NOT (")
default:
return errors.Errorf("unexpected op %q", m.Op)
}

switch m.Op {
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(query, "positionUTF8(body, %s) > 0", singleQuoted(m.By.Value))
{
// HACK: check for special case of hex-encoded trace_id and span_id.
// Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`.
// TODO(ernado): also handle regex?
encoded := strings.ToLower(m.By.Value)
v, _ := hex.DecodeString(encoded)
switch len(v) {
case len(otelstorage.TraceID{}):
fmt.Fprintf(query, " OR trace_id = unhex(%s)", singleQuoted(encoded))
case len(otelstorage.SpanID{}):
fmt.Fprintf(query, " OR span_id = unhex(%s)", singleQuoted(encoded))
}
}
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(query, "match(body, %s)", singleQuoted(m.By.Value))
}
query.WriteByte(')')
}
return nil
}

func writeLabelMatchers(query *strings.Builder, labels []logql.LabelMatcher, mapping map[string]string) error {
for _, m := range labels {
labelName := string(m.Label)
if key, ok := mapping[labelName]; ok {
labelName = key
}

switch m.Op {
case logql.OpEq, logql.OpRe:
query.WriteString(" AND (")
Expand Down Expand Up @@ -449,37 +489,5 @@ func (q logQueryPredicates) write(
}
query.WriteByte(')')
}

for _, m := range q.Line {
switch m.Op {
case logql.OpEq, logql.OpRe:
query.WriteString(" AND (")
case logql.OpNotEq, logql.OpNotRe:
query.WriteString(" AND NOT (")
default:
return errors.Errorf("unexpected op %q", m.Op)
}

switch m.Op {
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(query, "positionUTF8(body, %s) > 0", singleQuoted(m.By.Value))
{
// HACK: check for special case of hex-encoded trace_id and span_id.
// Like `{http_method=~".+"} |= "af36000000000000c517000000000003"`.
// TODO(ernado): also handle regex?
encoded := strings.ToLower(m.By.Value)
v, _ := hex.DecodeString(encoded)
switch len(v) {
case len(otelstorage.TraceID{}):
fmt.Fprintf(query, " OR trace_id = unhex(%s)", singleQuoted(encoded))
case len(otelstorage.SpanID{}):
fmt.Fprintf(query, " OR span_id = unhex(%s)", singleQuoted(encoded))
}
}
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(query, "match(body, %s)", singleQuoted(m.By.Value))
}
query.WriteByte(')')
}
return nil
}
20 changes: 19 additions & 1 deletion internal/logstorage/logstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package logstorage

import (
"context"
"time"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/otelstorage"
)

Expand All @@ -14,9 +16,11 @@ type Querier interface {
LabelNames(ctx context.Context, opts LabelsOptions) ([]string, error)
// LabelValues returns all available label values for given label.
LabelValues(ctx context.Context, labelName string, opts LabelsOptions) (iterators.Iterator[Label], error)
// Series returns all available log series.
Series(ctx context.Context, opts SeriesOptions) (Series, error)
}

// LabelsOptions defines options for Labels and LabelValues methods.
// LabelsOptions defines options for [Querier.LabelNames] and [Querier.LabelValues] methods.
type LabelsOptions struct {
// Start defines time range for search.
//
Expand All @@ -28,6 +32,20 @@ type LabelsOptions struct {
End otelstorage.Timestamp
}

// SeriesOptions defines options for [Querier.Series] method.
type SeriesOptions struct {
// Start defines time range for search.
//
// Querier ignores parameter, if it is zero.
Start time.Time
// End defines time range for search.
//
// Querier ignores parameter, if it is zero.
End time.Time
// Selectors defines a list of matchers to filter series.
Selectors []logql.Selector
}

// Inserter is a log storage insert interface.
type Inserter interface {
// InsertRecords inserts given records.
Expand Down
3 changes: 3 additions & 0 deletions internal/logstorage/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ type Label struct {
Name string `json:"name"`
Value string `json:"value"`
}

// Series defines a list of series.
type Series []map[string]string
42 changes: 23 additions & 19 deletions internal/lokihandler/lokihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lokihandler

import (
"context"
"fmt"
"net/http"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/go-faster/sdk/zctx"

"github.com/go-faster/oteldb/internal/iterators"
"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/logql/logqlengine/logqlerrors"
"github.com/go-faster/oteldb/internal/logstorage"
Expand Down Expand Up @@ -227,30 +229,32 @@ func (h *LokiAPI) Series(ctx context.Context, params lokiapi.SeriesParams) (*lok
return nil, validationErr(err, "parse time range")
}

out := make([]lokiapi.MapsDataItem, 0, len(params.Match))
for _, q := range params.Match {
// TODO(ernado): offload
data, err := h.eval(ctx, q, logqlengine.EvalParams{
Start: start,
End: end,
Direction: logqlengine.DirectionBackward,
Limit: 1_000,
})
selectors := make([]logql.Selector, len(params.Match))
for i, m := range params.Match {
selectors[i], err = logql.ParseSelector(m, logql.ParseOptions{})
if err != nil {
return nil, executionErr(err, "evaluate series query")
}
if streams, ok := data.GetStreamsResult(); ok {
for _, stream := range streams.Result {
if labels, ok := stream.Stream.Get(); ok {
// TODO(ernado): should be MapsDataItem 1:1 match?
out = append(out, lokiapi.MapsDataItem(labels))
}
}
return nil, validationErr(err, fmt.Sprintf("invalid match[%d]", i))
}
}

series, err := h.q.Series(ctx, logstorage.SeriesOptions{
Start: start,
End: end,
Selectors: selectors,
})
if err != nil {
return nil, executionErr(err, "get series")
}

// FIXME(tdakkota): copying slice only because generated type is named.
result := make([]lokiapi.MapsDataItem, len(series))
for i, s := range series {
result[i] = s
}

return &lokiapi.Maps{
Status: "success",
Data: out,
Data: result,
}, nil
}

Expand Down

0 comments on commit a78de4d

Please sign in to comment.