diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c46ca7c43e..1797fb471d 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1129,11 +1129,12 @@ func (h *Handler) RemoteWriteV2(ctx context.Context, r *storepb.WriteRequestV2) span, ctx := tracing.StartSpan(ctx, "receive_grpc") defer span.Finish() - req := tenantWreq{ - r.Tenant: r.Timeseries, + wreq := writev2pb.Request{ + Timeseries: r.Timeseries, + Symbols: r.Symbols, } - _, err := h.handleRequestV2(ctx, h.logger, uint64(r.Replica), writev2pb.NewSymbolTableFromSymbols(r.Symbols), req) + _, err := h.handleRequestV2(ctx, h.logger, uint64(r.Replica), &wreq, r.Tenant) if err != nil { level.Debug(h.logger).Log("msg", "failed to handle request", "err", err) } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index c131055199..0487f61f00 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -53,6 +53,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/store/storepb/writev2pb" "github.com/thanos-io/thanos/pkg/tenancy" ) @@ -111,6 +112,7 @@ func newFakeAppender(appendErr, commitErr, rollbackErr func() error) *fakeAppend } return &fakeAppender{ samples: make(map[storage.SeriesRef][]prompb.Sample), + exemplars: make(map[storage.SeriesRef][]exemplar.Exemplar), appendErr: appendErr, commitErr: commitErr, rollbackErr: rollbackErr, @@ -987,16 +989,33 @@ func (f *fakeRemoteWriteGRPCServer) RemoteWriteAsync(ctx context.Context, in *st cb(err) } +func (f *fakeRemoteWriteGRPCServer) RemoteWriteAsyncV2(ctx context.Context, in *storepb.WriteRequestV2, er endpointReplica, seriesIDs []int, responses chan writeResponse, cb func(error)) { + _, err := f.h.RemoteWriteV2(ctx, in) + responses <- writeResponse{ + er: er, + err: err, + seriesIDs: seriesIDs, + } + cb(err) +} + func (f *fakeRemoteWriteGRPCServer) Close() error { return nil } func BenchmarkHandlerReceiveHTTP(b *testing.B) { - benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b)) + // Switch between v1 and v2 by changing the argument. + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b), "v2") } func TestHandlerReceiveHTTP(t *testing.T) { t.Parallel() - benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t)) + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t), "v1") +} + +func TestHandlerReceiveHTTPRemoteWriteV2(t *testing.T) { + t.Parallel() + + benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(t), "v2") } // tsOverrideTenantStorage is storage that overrides timestamp to make it have consistent interval. @@ -1057,6 +1076,34 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt return snappy.Encode(nil, body) } +func serializeSeriesWithOneSampleV2(t testing.TB, series [][]labelpb.ZLabel) []byte { + r := &writev2pb.Request{ + Timeseries: make([]writev2pb.TimeSeries, 0, len(series)), + Symbols: make([]string, 0), + } + + buf := make([]uint32, 0, len(series)*2) + st := writev2pb.NewSymbolTable() + for _, s := range series { + refs := st.SymbolizeLabels(labelpb.ZLabelsToPromLabels(s), buf) + r.Timeseries = append(r.Timeseries, writev2pb.TimeSeries{ + LabelsRefs: refs, + Samples: []writev2pb.Sample{ + { + Value: math.MaxFloat64, + Timestamp: math.MinInt64, + }, + }, + }) + } + + r.Symbols = st.Symbols() + + body, err := proto.Marshal(r) + testutil.Ok(t, err) + return snappy.Encode(nil, body) +} + func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { series := make([]prompb.TimeSeries, numSeries) for i := 0; i < numSeries; i++ { @@ -1078,7 +1125,7 @@ func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { return series } -func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { +func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB, mode string) { dir := b.TempDir() handlers, _, closeFunc, err := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod, false) @@ -1113,13 +1160,18 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { testutil.Ok(b, m.Flush()) testutil.Ok(b, m.Open()) + serializerFn := serializeSeriesWithOneSample + if mode == "v2" { + serializerFn = serializeSeriesWithOneSampleV2 + } + for _, tcase := range []struct { name string writeRequest []byte }{ { name: "typical labels under 1KB, 500 of them", - writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + writeRequest: serializerFn(b, func() [][]labelpb.ZLabel { series := make([][]labelpb.ZLabel, 500) for s := 0; s < len(series); s++ { lbls := make([]labelpb.ZLabel, 10) @@ -1134,7 +1186,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { }, { name: "typical labels under 1KB, 5000 of them", - writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + writeRequest: serializerFn(b, func() [][]labelpb.ZLabel { series := make([][]labelpb.ZLabel, 5000) for s := 0; s < len(series); s++ { lbls := make([]labelpb.ZLabel, 10) @@ -1149,7 +1201,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { }, { name: "typical labels under 1KB, 20000 of them", - writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + writeRequest: serializerFn(b, func() [][]labelpb.ZLabel { series := make([][]labelpb.ZLabel, 20000) for s := 0; s < len(series); s++ { lbls := make([]labelpb.ZLabel, 10) @@ -1164,7 +1216,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { }, { name: "extremely large label value 10MB, 10 of them", - writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel { + writeRequest: serializerFn(b, func() [][]labelpb.ZLabel { series := make([][]labelpb.ZLabel, 10) for s := 0; s < len(series); s++ { lbl := &strings.Builder{} @@ -1202,7 +1254,13 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { b.ResetTimer() for i := 0; i < n; i++ { r := httptest.NewRecorder() - handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: io.NopCloser(bytes.NewReader(tcase.writeRequest))}) + req := &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: io.NopCloser(bytes.NewReader(tcase.writeRequest)), Header: http.Header{}} + if mode == "v2" { + req.Header.Add("Content-Type", contentTypeHeader(WriteProtoFullNameV2)) + req.Header.Add(versionHeader, version2HeaderValue) + req.Header.Add("Content-Encoding", string(SnappyBlockCompression)) + } + handler.receiveHTTP(r, req) testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) } }) @@ -1226,7 +1284,13 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { // First request should be fine, since we don't change timestamp, rest is wrong. r := httptest.NewRecorder() - handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: io.NopCloser(bytes.NewReader(tcase.writeRequest))}) + req := &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: io.NopCloser(bytes.NewReader(tcase.writeRequest)), Header: http.Header{}} + if mode == "v2" { + req.Header.Add("Content-Type", contentTypeHeader(WriteProtoFullNameV2)) + req.Header.Add(versionHeader, version2HeaderValue) + req.Header.Add("Content-Encoding", string(SnappyBlockCompression)) + } + handler.receiveHTTP(r, req) testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String()) b.Run("conflict errors", func(b testutil.TB) { @@ -1234,7 +1298,13 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { b.ResetTimer() for i := 0; i < n; i++ { r := httptest.NewRecorder() - handler.receiveHTTP(r, &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: io.NopCloser(bytes.NewReader(tcase.writeRequest))}) + req := &http.Request{ContentLength: int64(len(tcase.writeRequest)), Body: io.NopCloser(bytes.NewReader(tcase.writeRequest)), Header: http.Header{}} + if mode == "v2" { + req.Header.Add("Content-Type", contentTypeHeader(WriteProtoFullNameV2)) + req.Header.Add(versionHeader, version2HeaderValue) + req.Header.Add("Content-Encoding", string(SnappyBlockCompression)) + } + handler.receiveHTTP(r, req) testutil.Equals(b, http.StatusConflict, r.Code, "%v-%s", i, func() string { b, _ := io.ReadAll(r.Body) return string(b) @@ -1683,6 +1753,276 @@ func TestRelabel(t *testing.T) { } } +func TestRelabelV2(t *testing.T) { + t.Parallel() + + for _, tcase := range []struct { + name string + relabel []*relabel.Config + writeRequest writev2pb.Request + expectedWriteRequest writev2pb.Request + }{ + { + name: "empty relabel configs", + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, // References into symbols array + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + }, + { + name: "has relabel configs but no relabelling applied", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"zoo"}, + TargetLabel: "bar", + Regex: relabel.MustNewRegexp("bar"), + Action: relabel.Replace, + Replacement: "baz", + }, + }, + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + }, + { + name: "relabel rewrite existing labels", + relabel: []*relabel.Config{ + { + TargetLabel: "foo", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp(""), + Replacement: "test", + }, + { + TargetLabel: "__name__", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp(""), + Replacement: "foo", + }, + }, + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 2, 3}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "foo", "test"}, + }, + }, + { + name: "relabel drops label", + relabel: []*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric"}, + }, + }, + { + name: "relabel drops time series", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"foo"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("bar"), + }, + }, + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{}, + Symbols: []string{""}, + }, + }, + { + name: "relabel rewrite existing exemplar series labels", + relabel: []*relabel.Config{ + { + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("foo"), + }, + }, + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric"}, + }, + }, + { + name: "relabel drops exemplars", + relabel: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"foo"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("bar"), + }, + }, + writeRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Timestamp: 0, + Value: 1, + }, + }, + }, + }, + Symbols: []string{"", "__name__", "test_metric", "foo", "bar"}, + }, + expectedWriteRequest: writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{}, + Symbols: []string{""}, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + h := NewHandler(nil, &Options{ + RelabelConfigs: tcase.relabel, + }) + + st, twreq := h.relabelAndSplitTenant(&tcase.writeRequest, "default") + testutil.Equals(t, tcase.expectedWriteRequest.Symbols, st.Symbols()) + for i, ts := range tcase.expectedWriteRequest.Timeseries { + testutil.Equals(t, ts, twreq["default"][i]) + } + }) + } +} + func TestGetStatsLimitParameter(t *testing.T) { t.Parallel() @@ -1875,6 +2215,66 @@ func TestHandlerSplitTenantLabelLocalWrite(t *testing.T) { require.Equal(t, map[string]struct{}{"bar": {}, "foo": {}}, hr.seenTenants) } +func TestHandlerSplitTenantLabelLocalWriteV2(t *testing.T) { + const tenantIDLabelName = "thanos_tenant_id" + + appendable := &fakeAppendable{ + appender: newFakeAppender(nil, nil, nil), + } + + h := NewHandler(nil, &Options{ + Endpoint: "localhost", + SplitTenantLabelName: tenantIDLabelName, + ReceiverMode: RouterIngestor, + ReplicationFactor: 1, + ForwardTimeout: 1 * time.Second, + Writer: NewWriter( + log.NewNopLogger(), + newFakeTenantAppendable(appendable), + &WriterOptions{}, + ), + }) + + // initialize hashring with a single local endpoint matching the handler endpoint to force + // using local write + hashring, err := newSimpleHashring([]Endpoint{ + { + Address: h.options.Endpoint, + }, + }) + require.NoError(t, err) + hr := &hashringSeenTenants{Hashring: hashring} + h.Hashring(hr) + + response, err := h.RemoteWriteV2(context.Background(), &storepb.WriteRequestV2{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Value: 123.45, + Timestamp: time.Now().UnixMilli(), + }, + }, + }, + { + LabelsRefs: []uint32{2, 1, 3, 5}, + Samples: []writev2pb.Sample{ + { + Value: 124.45, + Timestamp: time.Now().UnixMilli(), + }, + }, + }, + }, + Symbols: []string{"", "a", "b", tenantIDLabelName, "bar", "foo"}, + }) + + require.NoError(t, err) + require.NotNil(t, response) + require.Equal(t, map[string]struct{}{"bar": {}, "foo": {}}, hr.seenTenants) +} + func TestHandlerFlippingHashrings(t *testing.T) { t.Parallel() @@ -1952,3 +2352,90 @@ func TestHandlerFlippingHashrings(t *testing.T) { cancel() wg.Wait() } + +func TestHandlerRemoteWriteV2(t *testing.T) { + t.Parallel() + + appendable := &fakeAppendable{ + appender: newFakeAppender(nil, nil, nil), + } + + h := NewHandler(nil, &Options{ + Endpoint: "localhost", + ReceiverMode: RouterIngestor, + ReplicationFactor: 1, + ForwardTimeout: 1 * time.Second, + Writer: NewWriter( + log.NewNopLogger(), + newFakeTenantAppendable(appendable), + &WriterOptions{}, + ), + }) + + hashring, err := newSimpleHashring([]Endpoint{ + { + Address: h.options.Endpoint, + }, + }) + require.NoError(t, err) + hr := &hashringSeenTenants{Hashring: hashring} + h.Hashring(hr) + + for _, tc := range []struct { + name string + request *writev2pb.Request + }{ + { + name: "simple timeseries with samples", + request: &writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{3, 4, 1, 2}, + Samples: []writev2pb.Sample{ + { + Value: 123.45, + Timestamp: time.Now().UnixMilli(), + }, + }, + }, + }, + Symbols: []string{"", "foo", "bar", "__name__", "test_metric"}, + }, + }, + { + name: "timeseries with exemplar", + request: &writev2pb.Request{ + Timeseries: []writev2pb.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4}, + Samples: []writev2pb.Sample{ + { + Value: 123.45, + Timestamp: time.Now().UnixMilli(), + }, + }, + Exemplars: []writev2pb.Exemplar{ + { + LabelsRefs: []uint32{2, 3}, + Value: 123.45, + Timestamp: time.Now().UnixMilli(), + }, + }, + }, + }, + Symbols: []string{"", "bar", "baz", "__name__", "test_exemplar"}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + response, err := h.RemoteWriteV2(context.Background(), &storepb.WriteRequestV2{ + Tenant: "test", + Timeseries: tc.request.Timeseries, + Symbols: tc.request.Symbols, + }) + + require.NoError(t, err) + require.NotNil(t, response) + }) + } +} diff --git a/pkg/receive/handler_writev2.go b/pkg/receive/handler_writev2.go index 1d7b2a4691..608d5f1eda 100644 --- a/pkg/receive/handler_writev2.go +++ b/pkg/receive/handler_writev2.go @@ -87,14 +87,8 @@ func (h *Handler) storeV2(ctx context.Context, tLogger log.Logger, w http.Respon return } - requestSymbolTable, tenantWreqs := h.relabelAndSplitTenant(&wreq, tenantHTTP) - if len(wreq.Timeseries) == 0 { - level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.") - return - } - responseStatusCode := http.StatusOK - tenantStats, err := h.handleRequestV2(ctx, tLogger, rep, requestSymbolTable, tenantWreqs) + tenantStats, err := h.handleRequestV2(ctx, tLogger, rep, &wreq, tenantHTTP) if err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error()) switch errors.Cause(err) { @@ -122,7 +116,13 @@ func (h *Handler) storeV2(ctx context.Context, tLogger log.Logger, w http.Respon } } -func (h *Handler) handleRequestV2(ctx context.Context, tLogger log.Logger, rep uint64, symbolTable *writev2pb.SymbolsTable, tenantWreqs tenantWreq) (tenantRequestStats, error) { +func (h *Handler) handleRequestV2(ctx context.Context, tLogger log.Logger, rep uint64, wreq *writev2pb.Request, tenantHTTP string) (tenantRequestStats, error) { + symbolTable, tenantWreqs := h.relabelAndSplitTenant(wreq, tenantHTTP) + if len(wreq.Timeseries) == 0 { + level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.") + return tenantRequestStats{}, nil + } + // This replica value is used to detect cycles in cyclic topologies. // A non-zero value indicates that the request has already been replicated by a previous receive instance. // For almost all users, this is only used in fully connected topologies of IngestorRouter instances. @@ -523,6 +523,7 @@ func (h *Handler) relabelAndSplitTenant(wreq *writev2pb.Request, tenantHTTP stri relabelledTenantWreqs := make(tenantWreq) b = labels.NewScratchBuilder(0) + buf := make([]uint32, 0, len(wreq.Symbols)*2) for tenant, timeseries := range tenantWreqs { relabelledTimeseries := make([]writev2pb.TimeSeries, 0) @@ -532,7 +533,7 @@ func (h *Handler) relabelAndSplitTenant(wreq *writev2pb.Request, tenantHTTP stri continue } - ts.LabelsRefs = st.SymbolizeLabels(lbls, nil) + ts.LabelsRefs = st.SymbolizeLabels(lbls, buf) ts.Metadata.HelpRef, ts.Metadata.UnitRef = st.SymbolizeMetadata(wreq.Symbols[ts.Metadata.HelpRef], wreq.Symbols[ts.Metadata.UnitRef]) relabelledTimeseries = append(relabelledTimeseries, ts) } diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index d321f77c83..b0dad8b337 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -248,7 +248,8 @@ func (r *Writer) WriteV2(ctx context.Context, tenantID string, symbolTable *writ } } - ref, _ = app.UpdateMetadata(ref, lset, t.ToMetadata(symbolTable.Symbols())) + _, err = app.UpdateMetadata(ref, lset, t.ToMetadata(symbolTable.Symbols())) + errorTracker.addMetadataError(err, tLogger) } errs := errorTracker.collectErrors(tLogger) diff --git a/pkg/receive/writer_errors.go b/pkg/receive/writer_errors.go index cabc626054..0ea9497d98 100644 --- a/pkg/receive/writer_errors.go +++ b/pkg/receive/writer_errors.go @@ -26,6 +26,8 @@ type writeErrorTracker struct { numExemplarsOutOfOrder int numExemplarsDuplicate int numExemplarsLabelLength int + + numMetadataErrors int } func (a *writeErrorTracker) addLabelsError(err error, lset *labelpb.ZLabelSet, logger log.Logger) { @@ -161,3 +163,11 @@ func (a *writeErrorTracker) collectErrors(tLogger log.Logger) writeErrors { } return errs } + +func (a *writeErrorTracker) addMetadataError(err error, tLogger log.Logger) { + if err == nil { + return + } + a.numMetadataErrors++ + level.Debug(tLogger).Log("msg", "Error on ingesting metadata", "err", err) +} diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 2db5e6a341..282ede8b40 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/store/storepb/writev2pb" "github.com/thanos-io/thanos/pkg/tenancy" ) @@ -77,12 +78,13 @@ func TestWriter(t *testing.T) { }, }, }, + // DesymbolizeLabels should sort the labels so this case will never fail. "should error out and skip series with out-of-order labels": { reqs: []*prompb.WriteRequest{ { Timeseries: []prompb.TimeSeries{ { - Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}), + Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "c", Value: "2"}), Samples: []prompb.Sample{{Value: 1, Timestamp: 10}}, }, }, @@ -103,6 +105,7 @@ func TestWriter(t *testing.T) { }, expectedErr: errors.Wrapf(labelpb.ErrDuplicateLabels, "add 1 series"), }, + // DesymbolizeLabels should sort the labels so this case will never fail. "should error out and skip series with out-of-order labels; accept series with valid labels": { reqs: []*prompb.WriteRequest{ { @@ -381,6 +384,69 @@ func TestWriter(t *testing.T) { assertWrittenData(t, app, testData.expectedIngested) }) + + t.Run("writev2_writer", func(t *testing.T) { + if strings.Contains(testName, "out-of-order labels") { + t.Skip("v2 writer out of order shouldn't happen") + } + logger, m, app := setupMultitsdb(t, testData.maxExemplars) + + w := NewWriter(logger, m, testData.opts) + v2timeseries := make([]writev2pb.TimeSeries, 0, len(testData.reqs)) + + for idx, req := range testData.reqs { + st := writev2pb.NewSymbolTable() + for _, ts := range req.Timeseries { + refs := st.SymbolizeLabels(labelpb.ZLabelsToPromLabels(ts.Labels), nil) + + samples := make([]writev2pb.Sample, 0, len(ts.Samples)) + for _, s := range ts.Samples { + samples = append(samples, writev2pb.Sample{ + Value: s.Value, + Timestamp: s.Timestamp, + }) + } + + exemplars := make([]writev2pb.Exemplar, 0, len(ts.Exemplars)) + for _, e := range ts.Exemplars { + exemplars = append(exemplars, writev2pb.Exemplar{ + LabelsRefs: st.SymbolizeLabels(labelpb.ZLabelsToPromLabels(e.Labels), nil), + Value: e.Value, + Timestamp: e.Timestamp, + }) + } + + histograms := make([]writev2pb.Histogram, 0, len(ts.Histograms)) + for _, h := range ts.Histograms { + if h.IsFloatHistogram() { + histograms = append(histograms, writev2pb.FromFloatHistogram(h.GetTimestamp(), prompb.FloatHistogramProtoToFloatHistogram(h))) + } else { + histograms = append(histograms, writev2pb.FromIntHistogram(h.GetTimestamp(), prompb.HistogramProtoToHistogram(h))) + } + } + + v2timeseries = append(v2timeseries, writev2pb.TimeSeries{ + LabelsRefs: refs, + Samples: samples, + Exemplars: exemplars, + Histograms: histograms, + }) + } + + err := w.WriteV2(context.Background(), tenancy.DefaultTenant, st, v2timeseries) + + // We expect no error on any request except the last one + // which may error (and in that case we assert on it). + if testData.expectedErr == nil || idx < len(testData.reqs)-1 { + testutil.Ok(t, err) + } else { + testutil.NotOk(t, err) + testutil.Equals(t, testData.expectedErr.Error(), err.Error()) + } + } + + assertWrittenData(t, app, testData.expectedIngested) + }) }) } } @@ -504,6 +570,46 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr Timeseries: timeSeries, } + st := writev2pb.NewSymbolTable() + v2timeseries := make([]writev2pb.TimeSeries, 0, len(wreq.Timeseries)) + + for _, ts := range wreq.Timeseries { + refs := st.SymbolizeLabels(labelpb.ZLabelsToPromLabels(ts.Labels), nil) + + samples := make([]writev2pb.Sample, 0, len(ts.Samples)) + for _, s := range ts.Samples { + samples = append(samples, writev2pb.Sample{ + Value: s.Value, + Timestamp: s.Timestamp, + }) + } + + exemplars := make([]writev2pb.Exemplar, 0, len(ts.Exemplars)) + for _, e := range ts.Exemplars { + exemplars = append(exemplars, writev2pb.Exemplar{ + LabelsRefs: st.SymbolizeLabels(labelpb.ZLabelsToPromLabels(e.Labels), nil), + Value: e.Value, + Timestamp: e.Timestamp, + }) + } + + histograms := make([]writev2pb.Histogram, 0, len(ts.Histograms)) + for _, h := range ts.Histograms { + if h.IsFloatHistogram() { + histograms = append(histograms, writev2pb.FromFloatHistogram(h.GetTimestamp(), prompb.FloatHistogramProtoToFloatHistogram(h))) + } else { + histograms = append(histograms, writev2pb.FromIntHistogram(h.GetTimestamp(), prompb.HistogramProtoToHistogram(h))) + } + } + + v2timeseries = append(v2timeseries, writev2pb.TimeSeries{ + LabelsRefs: refs, + Samples: samples, + Exemplars: exemplars, + Histograms: histograms, + }) + } + b.Run("without interning", func(b *testing.B) { w := NewWriter(logger, m, &WriterOptions{Intern: false}) @@ -526,6 +632,27 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr } }) + b.Run("with interning remote write v2", func(b *testing.B) { + w := NewWriter(logger, m, &WriterOptions{Intern: true}) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + testutil.Ok(b, w.WriteV2(ctx, "foo", st, v2timeseries)) + } + }) + + b.Run("without interning remote write v2", func(b *testing.B) { + w := NewWriter(logger, m, &WriterOptions{Intern: false}) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + testutil.Ok(b, w.WriteV2(ctx, "foo", st, v2timeseries)) + } + }) } // generateLabelsAndSeries generates time series for benchmark with specified number of labels. diff --git a/pkg/store/storepb/writev2pb/codec.go b/pkg/store/storepb/writev2pb/codec.go index 617e04a5b7..3c125cec98 100644 --- a/pkg/store/storepb/writev2pb/codec.go +++ b/pkg/store/storepb/writev2pb/codec.go @@ -14,10 +14,7 @@ package writev2pb import ( - "strings" - "github.com/prometheus/common/model" - "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -32,22 +29,51 @@ func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels. // ToMetadata return model metadata from timeseries' remote metadata. func (m TimeSeries) ToMetadata(symbols []string) metadata.Metadata { - mt := strings.ToLower(m.Metadata.Type.String()) + typ := model.MetricTypeUnknown + switch m.Metadata.Type { + case Metadata_METRIC_TYPE_COUNTER: + typ = model.MetricTypeCounter + case Metadata_METRIC_TYPE_GAUGE: + typ = model.MetricTypeGauge + case Metadata_METRIC_TYPE_HISTOGRAM: + typ = model.MetricTypeHistogram + case Metadata_METRIC_TYPE_GAUGEHISTOGRAM: + typ = model.MetricTypeGaugeHistogram + case Metadata_METRIC_TYPE_SUMMARY: + typ = model.MetricTypeSummary + case Metadata_METRIC_TYPE_INFO: + typ = model.MetricTypeInfo + case Metadata_METRIC_TYPE_STATESET: + typ = model.MetricTypeStateset + } return metadata.Metadata{ - Type: model.MetricType(mt), + Type: typ, Unit: symbols[m.Metadata.UnitRef], Help: symbols[m.Metadata.HelpRef], } } -// FromMetadataType transforms a Prometheus metricType into writev2 metricType. Since the former is a string we need to transform it to an enum. +// FromMetadataType transforms a Prometheus metricType into writev2 metricType. +// Since the former is a string we need to transform it to an enum. func FromMetadataType(t model.MetricType) Metadata_MetricType { - mt := strings.ToUpper(string(t)) - v, ok := prompb.MetricMetadata_MetricType_value[mt] - if !ok { + switch t { + case model.MetricTypeCounter: + return Metadata_METRIC_TYPE_COUNTER + case model.MetricTypeGauge: + return Metadata_METRIC_TYPE_GAUGE + case model.MetricTypeHistogram: + return Metadata_METRIC_TYPE_HISTOGRAM + case model.MetricTypeGaugeHistogram: + return Metadata_METRIC_TYPE_GAUGEHISTOGRAM + case model.MetricTypeSummary: + return Metadata_METRIC_TYPE_SUMMARY + case model.MetricTypeInfo: + return Metadata_METRIC_TYPE_INFO + case model.MetricTypeStateset: + return Metadata_METRIC_TYPE_STATESET + default: return Metadata_METRIC_TYPE_UNSPECIFIED } - return Metadata_MetricType(v) } // IsFloatHistogram returns true if the histogram is float. @@ -177,11 +203,11 @@ func spansToSpansProto(s []histogram.Span) []BucketSpan { } func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar { - timestamp := m.GetTimestamp() + timestamp := m.Timestamp return exemplar.Exemplar{ - Labels: DesymbolizeLabels(b, m.GetLabelsRefs(), symbols), - Value: m.GetValue(), + Labels: DesymbolizeLabels(b, m.LabelsRefs, symbols), + Value: m.Value, Ts: timestamp, HasTs: timestamp != 0, } diff --git a/pkg/store/storepb/writev2pb/codec_test.go b/pkg/store/storepb/writev2pb/codec_test.go new file mode 100644 index 0000000000..a7cc1d5593 --- /dev/null +++ b/pkg/store/storepb/writev2pb/codec_test.go @@ -0,0 +1,297 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package writev2pb + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/prompb" +) + +func TestToLabels(t *testing.T) { + expected := labels.FromStrings("__name__", "metric1", "foo", "bar") + + t.Run("v1", func(t *testing.T) { + ts := prompb.TimeSeries{Labels: []prompb.Label{{Name: "__name__", Value: "metric1"}, {Name: "foo", Value: "bar"}}} + b := labels.NewScratchBuilder(2) + require.Equal(t, expected, ts.ToLabels(&b, nil)) + require.Equal(t, ts.Labels, prompb.FromLabels(expected, nil)) + require.Equal(t, ts.Labels, prompb.FromLabels(expected, ts.Labels)) + }) + t.Run("v2", func(t *testing.T) { + v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"} + ts := TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}} + b := labels.NewScratchBuilder(2) + require.Equal(t, expected, ts.ToLabels(&b, v2Symbols)) + // No need for FromLabels in our prod code as we use symbol table to do so. + }) +} + +func TestFromMetadataType(t *testing.T) { + for _, tc := range []struct { + desc string + input model.MetricType + expectedV1 prompb.MetricMetadata_MetricType + expectedV2 Metadata_MetricType + }{ + { + desc: "with a single-word metric", + input: model.MetricTypeCounter, + expectedV1: prompb.MetricMetadata_COUNTER, + expectedV2: Metadata_METRIC_TYPE_COUNTER, + }, + { + desc: "with a two-word metric", + input: model.MetricTypeStateset, + expectedV1: prompb.MetricMetadata_STATESET, + expectedV2: Metadata_METRIC_TYPE_STATESET, + }, + { + desc: "with an unknown metric", + input: "not-known", + expectedV1: prompb.MetricMetadata_UNKNOWN, + expectedV2: Metadata_METRIC_TYPE_UNSPECIFIED, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Run("v1", func(t *testing.T) { + require.Equal(t, tc.expectedV1, prompb.FromMetadataType(tc.input)) + }) + t.Run("v2", func(t *testing.T) { + require.Equal(t, tc.expectedV2, FromMetadataType(tc.input)) + }) + }) + } +} + +func TestToMetadata(t *testing.T) { + sym := NewSymbolTable() + + for _, tc := range []struct { + input Metadata + expected metadata.Metadata + }{ + { + input: Metadata{}, + expected: metadata.Metadata{ + Type: model.MetricTypeUnknown, + }, + }, + { + input: Metadata{ + Type: 12414, // Unknown. + }, + expected: metadata.Metadata{ + Type: model.MetricTypeUnknown, + }, + }, + { + input: Metadata{ + Type: Metadata_METRIC_TYPE_COUNTER, + HelpRef: sym.Symbolize("help1"), + UnitRef: sym.Symbolize("unit1"), + }, + expected: metadata.Metadata{ + Type: model.MetricTypeCounter, + Help: "help1", + Unit: "unit1", + }, + }, + { + input: Metadata{ + Type: Metadata_METRIC_TYPE_STATESET, + HelpRef: sym.Symbolize("help2"), + }, + expected: metadata.Metadata{ + Type: model.MetricTypeStateset, + Help: "help2", + }, + }, + } { + t.Run("", func(t *testing.T) { + ts := TimeSeries{Metadata: tc.input} + require.Equal(t, tc.expected, ts.ToMetadata(sym.Symbols())) + }) + } +} + +func TestToHistogram_Empty(t *testing.T) { + t.Run("v1", func(t *testing.T) { + require.NotNilf(t, prompb.Histogram{}.ToIntHistogram(), "") + require.NotNilf(t, prompb.Histogram{}.ToFloatHistogram(), "") + }) + t.Run("v2", func(t *testing.T) { + require.NotNilf(t, Histogram{}.ToIntHistogram(), "") + require.NotNilf(t, Histogram{}.ToFloatHistogram(), "") + }) +} + +// NOTE(bwplotka): This is technically not a valid histogram, but it represents +// important cases to test when copying or converting to/from int/float histograms. +func testIntHistogram() histogram.Histogram { + return histogram.Histogram{ + CounterResetHint: histogram.GaugeType, + Schema: 1, + Count: 19, + Sum: 2.7, + ZeroThreshold: 1e-128, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 5}, + {Offset: 1, Length: 0}, + {Offset: 0, Length: 1}, + }, + NegativeBuckets: []int64{1, 2, -2, 1, -1, 0}, + CustomValues: []float64{21421, 523}, + } +} + +// NOTE(bwplotka): This is technically not a valid histogram, but it represents +// important cases to test when copying or converting to/from int/float histograms. +func testFloatHistogram() histogram.FloatHistogram { + return histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 1, + Count: 19, + Sum: 2.7, + ZeroThreshold: 1e-128, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 4}, + {Offset: 0, Length: 0}, + {Offset: 0, Length: 3}, + }, + PositiveBuckets: []float64{1, 3, 1, 2, 1, 1, 1}, + NegativeSpans: []histogram.Span{ + {Offset: 0, Length: 5}, + {Offset: 1, Length: 0}, + {Offset: 0, Length: 1}, + }, + NegativeBuckets: []float64{1, 3, 1, 2, 1, 1}, + CustomValues: []float64{21421, 523}, + } +} + +func TestFromIntToFloatOrIntHistogram(t *testing.T) { + t.Run("v1", func(t *testing.T) { + // v1 does not support nhcb. + testIntHistWithoutNHCB := testIntHistogram() + testIntHistWithoutNHCB.CustomValues = nil + testFloatHistWithoutNHCB := testFloatHistogram() + testFloatHistWithoutNHCB.CustomValues = nil + + h := prompb.FromIntHistogram(123, &testIntHistWithoutNHCB) + require.False(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Equal(t, testIntHistWithoutNHCB, *h.ToIntHistogram()) + require.Equal(t, testFloatHistWithoutNHCB, *h.ToFloatHistogram()) + }) + t.Run("v2", func(t *testing.T) { + testIntHist := testIntHistogram() + testFloatHist := testFloatHistogram() + + h := FromIntHistogram(123, &testIntHist) + require.False(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Equal(t, testIntHist, *h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) + }) +} + +func TestFromFloatToFloatHistogram(t *testing.T) { + t.Run("v1", func(t *testing.T) { + // v1 does not support nhcb. + testFloatHistWithoutNHCB := testFloatHistogram() + testFloatHistWithoutNHCB.CustomValues = nil + + h := prompb.FromFloatHistogram(123, &testFloatHistWithoutNHCB) + require.True(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Nil(t, h.ToIntHistogram()) + require.Equal(t, testFloatHistWithoutNHCB, *h.ToFloatHistogram()) + }) + t.Run("v2", func(t *testing.T) { + testFloatHist := testFloatHistogram() + + h := FromFloatHistogram(123, &testFloatHist) + require.True(t, h.IsFloatHistogram()) + require.Equal(t, int64(123), h.Timestamp) + require.Nil(t, h.ToIntHistogram()) + require.Equal(t, testFloatHist, *h.ToFloatHistogram()) + }) +} + +func TestFromIntOrFloatHistogram_ResetHint(t *testing.T) { + for _, tc := range []struct { + input histogram.CounterResetHint + expectedV1 prompb.Histogram_ResetHint + expectedV2 Histogram_ResetHint + }{ + { + input: histogram.UnknownCounterReset, + expectedV1: prompb.Histogram_UNKNOWN, + expectedV2: Histogram_RESET_HINT_UNSPECIFIED, + }, + { + input: histogram.CounterReset, + expectedV1: prompb.Histogram_YES, + expectedV2: Histogram_RESET_HINT_YES, + }, + { + input: histogram.NotCounterReset, + expectedV1: prompb.Histogram_NO, + expectedV2: Histogram_RESET_HINT_NO, + }, + { + input: histogram.GaugeType, + expectedV1: prompb.Histogram_GAUGE, + expectedV2: Histogram_RESET_HINT_GAUGE, + }, + } { + t.Run("", func(t *testing.T) { + t.Run("v1", func(t *testing.T) { + h := testIntHistogram() + h.CounterResetHint = tc.input + got := prompb.FromIntHistogram(1337, &h) + require.Equal(t, tc.expectedV1, got.GetResetHint()) + + fh := testFloatHistogram() + fh.CounterResetHint = tc.input + got2 := prompb.FromFloatHistogram(1337, &fh) + require.Equal(t, tc.expectedV1, got2.GetResetHint()) + }) + t.Run("v2", func(t *testing.T) { + h := testIntHistogram() + h.CounterResetHint = tc.input + got := FromIntHistogram(1337, &h) + require.Equal(t, tc.expectedV2, got.GetResetHint()) + + fh := testFloatHistogram() + fh.CounterResetHint = tc.input + got2 := FromFloatHistogram(1337, &fh) + require.Equal(t, tc.expectedV2, got2.GetResetHint()) + }) + }) + } +} diff --git a/pkg/store/storepb/writev2pb/symbols.go b/pkg/store/storepb/writev2pb/symbols.go index 11186c27ca..1f2a1d49ce 100644 --- a/pkg/store/storepb/writev2pb/symbols.go +++ b/pkg/store/storepb/writev2pb/symbols.go @@ -11,6 +11,7 @@ type SymbolsTable struct { } // NewSymbolTable returns a symbol table. +// The first element of the symbols table is always an empty string so index will be 1-based. func NewSymbolTable() *SymbolsTable { return &SymbolsTable{ // Empty string is required as a first element. @@ -42,16 +43,18 @@ func (t *SymbolsTable) Symbolize(str string) uint32 { // SymbolizeLabels symbolize Prometheus labels. func (t *SymbolsTable) SymbolizeLabels(lbls labels.Labels, buf []uint32) []uint32 { - result := buf[:0] + buf = buf[:0] + if cap(buf) < len(lbls)*2 { + buf = make([]uint32, 0, len(lbls)*2) + } for _, lbl := range lbls { - off := t.Symbolize(lbl.Name) - result = append(result, off) - off = t.Symbolize(lbl.Value) - result = append(result, off) + buf = append(buf, + t.Symbolize(lbl.Name), + t.Symbolize(lbl.Value)) } - return result + return buf } // SymbolizeMetadata symbolizes metadata help and unit text. @@ -81,7 +84,9 @@ func (t *SymbolsTable) Reset() { func DesymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels { b.Reset() for i := 0; i < len(labelRefs); i += 2 { - b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]]) + name := symbols[labelRefs[i]] + value := symbols[labelRefs[i+1]] + b.Add(name, value) } b.Sort() return b.Labels() diff --git a/pkg/store/storepb/writev2pb/symbols_test.go b/pkg/store/storepb/writev2pb/symbols_test.go new file mode 100644 index 0000000000..8b4c56cace --- /dev/null +++ b/pkg/store/storepb/writev2pb/symbols_test.go @@ -0,0 +1,60 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package writev2pb + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/model/labels" +) + +func TestSymbolsTable(t *testing.T) { + s := NewSymbolTable() + require.Equal(t, []string{""}, s.Symbols(), "required empty reference does not exist") + require.Equal(t, uint32(0), s.Symbolize("")) + require.Equal(t, []string{""}, s.Symbols()) + + require.Equal(t, uint32(1), s.Symbolize("abc")) + require.Equal(t, []string{"", "abc"}, s.Symbols()) + + require.Equal(t, uint32(2), s.Symbolize("__name__")) + require.Equal(t, []string{"", "abc", "__name__"}, s.Symbols()) + + require.Equal(t, uint32(3), s.Symbolize("foo")) + require.Equal(t, []string{"", "abc", "__name__", "foo"}, s.Symbols()) + + s.Reset() + require.Equal(t, []string{""}, s.Symbols(), "required empty reference does not exist") + require.Equal(t, uint32(0), s.Symbolize("")) + + require.Equal(t, uint32(1), s.Symbolize("__name__")) + require.Equal(t, []string{"", "__name__"}, s.Symbols()) + + require.Equal(t, uint32(2), s.Symbolize("abc")) + require.Equal(t, []string{"", "__name__", "abc"}, s.Symbols()) + + ls := labels.FromStrings("__name__", "qwer", "zxcv", "1234") + encoded := s.SymbolizeLabels(ls, nil) + require.Equal(t, []uint32{1, 3, 4, 5}, encoded) + b := labels.NewScratchBuilder(len(encoded)) + decoded := DesymbolizeLabels(&b, encoded, s.Symbols()) + require.Equal(t, ls, decoded) + + // Different buf. + ls = labels.FromStrings("__name__", "qwer", "zxcv2222", "1234") + encoded = s.SymbolizeLabels(ls, []uint32{1, 3, 4, 5}) + require.Equal(t, []uint32{1, 3, 6, 5}, encoded) +}