Skip to content

Commit

Permalink
Query: Forward tenant information via StoreAPI (#6530)
Browse files Browse the repository at this point in the history
* Querier: Forward tenant information downstream

With this commit we attach tenant information to each query request and
forward it via the StoreAPI to any downstream Store Gateways and
Queriers.

We add the following command lines options which mimics the tenant
functionality in Receive. The options are currently hidden, as they
provide no real functionality yet. This will come in future steps.

--query.tenant-header
--query.default-tenant
--query.tenant-certificate

Signed-off-by: Jacob Baungard Hansen <[email protected]>

* Receive: Use CertificateField from Tenancy pkg

These consts are now defined in the Tenancy package, so we should use
those instead.

Signed-off-by: Jacob Baungard Hansen <[email protected]>

---------

Signed-off-by: Jacob Baungard Hansen <[email protected]>
  • Loading branch information
jacobbaungard authored Jul 20, 2023
1 parent a10de73 commit dce0794
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 54 deletions.
14 changes: 14 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
)
Expand Down Expand Up @@ -218,6 +219,10 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Float64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Float64List()

tenantHeader := cmd.Flag("query.tenant-header", "HTTP header to determine tenant.").Default(tenancy.DefaultTenantHeader).Hidden().String()
defaultTenant := cmd.Flag("query.default-tenant", "Name of the default tenant.").Default(tenancy.DefaultTenant).Hidden().String()
tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the query.tenant-header flag value to be ignored.").Default("").Hidden().Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName)

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

Expand Down Expand Up @@ -337,6 +342,9 @@ func registerQuery(app *extkingpin.App) {
*defaultEngine,
storeRateLimits,
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
*tenantCertField,
)
})
}
Expand Down Expand Up @@ -413,6 +421,9 @@ func runQuery(
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
tenantCertField string,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -747,6 +758,9 @@ func runQuery(
queryTelemetrySeriesQuantiles,
),
reg,
tenantHeader,
defaultTenant,
tenantCertField,
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(tenancy.DefaultTenantHeader).StringVar(&rc.tenantHeader)

cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName)
cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName)

cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(tenancy.DefaultTenant).StringVar(&rc.defaultTenantID)

Expand Down
50 changes: 48 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -161,6 +162,10 @@ type QueryAPI struct {
queryRangeHist prometheus.Histogram

seriesStatsAggregator seriesQueryPerformanceMetricsAggregator

tenantHeader string
defaultTenant string
tenantCertField string
}

type seriesQueryPerformanceMetricsAggregator interface {
Expand Down Expand Up @@ -196,6 +201,9 @@ func NewQueryAPI(
gate gate.Gate,
statsAggregator seriesQueryPerformanceMetricsAggregator,
reg *prometheus.Registry,
tenantHeader string,
defaultTenant string,
tenantCertField string,
) *QueryAPI {
if statsAggregator == nil {
statsAggregator = &store.NoopSeriesStatsAggregator{}
Expand Down Expand Up @@ -226,6 +234,9 @@ func NewQueryAPI(
defaultMetadataTimeRange: defaultMetadataTimeRange,
disableCORS: disableCORS,
seriesStatsAggregator: statsAggregator,
tenantHeader: tenantHeader,
defaultTenant: defaultTenant,
tenantCertField: tenantCertField,

queryRangeHist: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "thanos_query_range_requested_timespan_duration_seconds",
Expand Down Expand Up @@ -505,6 +516,13 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
lookbackDelta = lookbackDeltaFromReq
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()
Expand Down Expand Up @@ -665,6 +683,13 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
lookbackDelta = lookbackDeltaFromReq
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

// Record the query range requested.
qapi.queryRangeHist.Observe(end.Sub(start).Seconds())

Expand Down Expand Up @@ -770,6 +795,13 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, qapi.tenantCertField)
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
true,
nil,
Expand Down Expand Up @@ -866,6 +898,13 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr, func() {}
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "")
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
enableDedup,
replicaLabels,
Expand All @@ -876,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
Expand Down Expand Up @@ -926,6 +965,13 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

tenant, err := tenancy.GetTenantFromHTTP(r, qapi.tenantHeader, qapi.defaultTenant, "")
if err != nil {
apiErr = &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, apiErr, func() {}
}
ctx := context.WithValue(r.Context(), tenancy.TenantKey, tenant)

q, err := qapi.queryableCreate(
true,
nil,
Expand All @@ -936,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
true,
nil,
query.NoopSeriesStatsReporter,
).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ func TestQueryEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}

start := time.Unix(0, 0)
Expand Down Expand Up @@ -744,6 +746,8 @@ func TestMetadataEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}
apiWithLabelLookback := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Expand All @@ -759,6 +763,8 @@ func TestMetadataEndpoints(t *testing.T) {
Name: "query_range_hist",
}),
seriesStatsAggregator: &store.NoopSeriesStatsAggregator{},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}

var tests = []endpointTestCase{
Expand Down Expand Up @@ -1229,6 +1235,8 @@ func TestStoresEndpoint(t *testing.T) {
},
}
},
tenantHeader: "thanos-tenant",
defaultTenant: "default-tenant",
}
apiWithInvalidEndpoint := &QueryAPI{
endpointStatus: func() []query.EndpointStatus {
Expand Down
4 changes: 4 additions & 0 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -340,6 +341,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

// TODO(bwplotka): Use inprocess gRPC when we want to stream responses.
// Currently streaming won't help due to nature of the both PromQL engine which
Expand Down Expand Up @@ -419,6 +421,7 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
if err != nil {
Expand Down Expand Up @@ -452,6 +455,7 @@ func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War

// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)
ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey))

pbMatchers, err := storepb.PromMatchersToMatchers(matchers...)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ const (
labelError = "error"
)

// Allowed fields in client certificates.
const (
CertificateFieldOrganization = "organization"
CertificateFieldOrganizationalUnit = "organizationalUnit"
CertificateFieldCommonName = "commonName"
)

var (
// errConflict is returned whenever an operation fails due to any conflict-type error.
errConflict = errors.New("conflict")
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

Expand Down Expand Up @@ -1229,6 +1230,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer s.queryGate.Done()
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(srv.Context())
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for Series request", "tenant", tenant)

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -1478,6 +1485,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for LabelNames request", "tenant", tenant)

resHints := &hintspb.LabelNamesResponseHints{}

var reqBlockMatchers []*labels.Matcher
Expand Down Expand Up @@ -1666,6 +1679,12 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}

tenant, err := tenancy.GetTenantFromGRPCMetadata(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", err)
}
level.Debug(s.logger).Log("msg", "Tenant for LabelValues request", "tenant", tenant)

resHints := &hintspb.LabelValuesResponseHints{}

g, gctx := errgroup.WithContext(ctx)
Expand Down
Loading

0 comments on commit dce0794

Please sign in to comment.