Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
94670: metrics: add tenant name to _status/vars output r=knz a=dhartunian

This commit adds a `tenant` prometheus label to each metrics output via the
`_status/vars` HTTP endpoint. The label is populated with either "system" or
the name of the tenant generating the metric. When queried from the system
tenant's HTTP port or handler, the result will now also contain metrics from
all in-process tenants on the same node.

When initializing a tenant, an optional "parent" metrics recorder is passed
down allowing the tenant's recorder to be registered with the parent. When we
query the parent it iterates over all child recorders (much like we already do
for stores) and outputs all of their metrics as well.

Example:
```
sql_txn_commit_count{tenant="system"} 0
sql_txn_commit_count{tenant="demo-tenant"} 0
```

Resolves: #94663
Epic: CRDB-18798

Release note (ops change): Metrics output via `_status/vars` now contain
`tenant` labels allowing the user to distinguish between metrics emitted by
the system tenant vs other app tenants identified by their IDs.

Co-authored-by: Alex Barganier <[email protected]>
Co-authored-by: Aaditya Sondhi <[email protected]>

95234: sql: evaluate correlated subqueries as routines r=mgartner a=mgartner

#### opt: create tree.Routine planning closure in helper function

The logic for creating a planning closure for a `tree.Routine` has been
moved to a helper function so that it can be reused in future commits.

Release note: None

#### sql: evaluate correlated subqueries as routines

Previously, the optimizer would error in rare cases when it was unable
to hoist correlated subqueries into apply-joins. Now, scalar, correlated
subqueries that aren't hoisted are executed successfully. There is
remaining work to apply the same method in this commit to `EXISTS` and
`<op> ANY` subqueries.

Hoisting correlated subqueries is not possible when a conditional
expression, like a `CASE`, wraps a subquery that is not leak-proof. One
of the effects of hoisting a subquery is that the subquery will be
unconditionally evaluated. For leak-proof subqueries, the worst case is
that unnecessary computation is performed. For non-leak-proof
subqueries, errors could originate from the subquery when it should have
never been evaluated because the corresponding conditional expression
was never true. So, in order to support these cases, we must be able to
execute a correlated subquery.

A correlated subquery can be thought of as a relational expression with
parameters that need to be filled in with constant value arguments for
each invocation. It is essentially a user-defined function with a single
statement in the function body. So, the `tree.RoutineExpr` machinery
that powers UDFs is easily repurposed to facilitate evaluation of
correlated subqueries.

Fixes #71908
Fixes #73573
Fixes #80169

Release note (sql change): Some queries which previously resulted in the
error "could not decorrelate subquery" now succeed.


95432: kvserver,kvstorage: move InitEngine r=pavelkalinnikov a=tbg

I'm working on a datadriven test for `kvstorage` and I need to be able
to initialize the engine there.

There's more that should move, but I'm taking it one step at a time.
As we build up more and more sophisticated datadriven tests in
`kvstorage` we can move whatever we need when we need it.

PS: I looked at the unit test being modified by the move and it can't
yet be moved to kvstorage - it also bootstraps the initial ranges, etc,
requiring a lot more code movement until we can move it.

100% mechanical movement via Goland.

Touches #93310.

Epic: CRDB-220
Release note: None


Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
4 people committed Jan 18, 2023
4 parents 1d2ba22 + 0cfecf3 + 61233f0 + f73bcf7 commit 85b40c7
Show file tree
Hide file tree
Showing 35 changed files with 1,043 additions and 210 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion pkg/cli/mt_start_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
// always be non-nil, even if NewServer returns a nil pointer (and
// an error). The code below is dependent on the interface
// reference remaining nil in case of error.
s, err := server.NewTenantServer(ctx, stopper, serverCfg.BaseConfig, serverCfg.SQLConfig)
s, err := server.NewTenantServer(
ctx,
stopper,
serverCfg.BaseConfig,
serverCfg.SQLConfig,
nil, /* parentRecorder */
nil, /* tenantNameContainer */
)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/geo/geos"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/security/clientsecopts"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func reportServerInfo(
nodeID := serverCfg.BaseConfig.IDContainer.Get()
if serverCfg.SQLConfig.TenantID.IsSystem() {
if initialStart {
if nodeID == kvserver.FirstNodeID {
if nodeID == kvstorage.FirstNodeID {
buf.Printf("status:\tinitialized new cluster\n")
} else {
buf.Printf("status:\tinitialized new node, joined pre-existing cluster\n")
Expand Down
117 changes: 117 additions & 0 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,123 @@ import (
"github.com/cockroachdb/errors"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
const FirstNodeID = roachpb.NodeID(1)

// FirstStoreID is the StoreID assigned to the first store on the node with ID
// FirstNodeID.
const FirstStoreID = roachpb.StoreID(1)

// InitEngine writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
// should be completely empty save for a cluster version, which must
// already have been persisted to it. Returns an error if this is not
// the case.
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
}
if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
return err
}

if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize engine")
}

batch := eng.NewBatch()
if err := storage.MVCCPutProto(
ctx,
batch,
nil,
keys.StoreIdentKey(),
hlc.Timestamp{},
hlc.ClockTimestamp{},
nil,
&ident,
); err != nil {
batch.Close()
return err
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting engine initialization data")
}

return nil
}

// checkCanInitializeEngine ensures that the engine is empty except for a
// cluster version, which must be present.
func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
// See if this is an already-bootstrapped store.
ident, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine already initialized as %s", ident.String())
} else if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
return errors.Wrap(err, "unable to read store ident")
}
// Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a
// cluster version, cached settings and nothing else? Note that there is one
// cluster version key and many cached settings key, and the cluster version
// key precedes the cached settings.
//
// We use an EngineIterator to ensure that there are no keys that cannot be
// parsed as MVCCKeys (e.g. lock table keys) in the engine.
iter := eng.NewEngineIterator(storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: roachpb.KeyMax,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
if !valid {
if err == nil {
return errors.New("no cluster version found on uninitialized engine")
}
return err
}
getMVCCKey := func() (storage.MVCCKey, error) {
if _, hasRange := iter.HasPointAndRange(); hasRange {
bounds, err := iter.EngineRangeBounds()
if err != nil {
return storage.MVCCKey{}, err
}
return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds)
}
var k storage.EngineKey
k, err = iter.EngineKey()
if err != nil {
return storage.MVCCKey{}, err
}
if !k.IsMVCCKey() {
return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k)
}
return k.ToMVCCKey()
}
var k storage.MVCCKey
if k, err = getMVCCKey(); err != nil {
return err
}
if !k.Key.Equal(keys.StoreClusterVersionKey()) {
return errors.New("no cluster version found on uninitialized engine")
}
valid, err = iter.NextEngineKey()
for valid {
// Only allowed to find cached cluster settings on an uninitialized
// engine.
if k, err = getMVCCKey(); err != nil {
return err
}
if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil {
return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String())
}
// There may be more cached cluster settings, so continue iterating.
valid, err = iter.NextEngineKey()
}
return err
}

// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such
// as RaftHardStateKey, RangeTombstoneKey, and many others). Such keys could in
// principle exist at any RangeID, and this helper efficiently discovers all the
Expand Down
70 changes: 0 additions & 70 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2698,76 +2698,6 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64,
return hlcUpperBound, nil
}

// checkCanInitializeEngine ensures that the engine is empty except for a
// cluster version, which must be present.
func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error {
// See if this is an already-bootstrapped store.
ident, err := kvstorage.ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine already initialized as %s", ident.String())
} else if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) {
return errors.Wrap(err, "unable to read store ident")
}
// Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a
// cluster version, cached settings and nothing else? Note that there is one
// cluster version key and many cached settings key, and the cluster version
// key precedes the cached settings.
//
// We use an EngineIterator to ensure that there are no keys that cannot be
// parsed as MVCCKeys (e.g. lock table keys) in the engine.
iter := eng.NewEngineIterator(storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: roachpb.KeyMax,
})
defer iter.Close()
valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin})
if !valid {
if err == nil {
return errors.New("no cluster version found on uninitialized engine")
}
return err
}
getMVCCKey := func() (storage.MVCCKey, error) {
if _, hasRange := iter.HasPointAndRange(); hasRange {
bounds, err := iter.EngineRangeBounds()
if err != nil {
return storage.MVCCKey{}, err
}
return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds)
}
var k storage.EngineKey
k, err = iter.EngineKey()
if err != nil {
return storage.MVCCKey{}, err
}
if !k.IsMVCCKey() {
return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k)
}
return k.ToMVCCKey()
}
var k storage.MVCCKey
if k, err = getMVCCKey(); err != nil {
return err
}
if !k.Key.Equal(keys.StoreClusterVersionKey()) {
return errors.New("no cluster version found on uninitialized engine")
}
valid, err = iter.NextEngineKey()
for valid {
// Only allowed to find cached cluster settings on an uninitialized
// engine.
if k, err = getMVCCKey(); err != nil {
return err
}
if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil {
return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String())
}
// There may be more cached cluster settings, so continue iterating.
valid, err = iter.NextEngineKey()
}
return err
}

// GetReplica fetches a replica by Range ID. Returns an error if no replica is found.
//
// See also GetReplicaIfExists for a more perfomant version.
Expand Down
60 changes: 6 additions & 54 deletions pkg/kv/kvserver/store_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,56 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
const FirstNodeID = roachpb.NodeID(1)

// FirstStoreID is the StoreID assigned to the first store on the node with ID
// FirstNodeID.
const FirstStoreID = roachpb.StoreID(1)

// InitEngine writes a new store ident to the underlying engine. To
// ensure that no crufty data already exists in the engine, it scans
// the engine contents before writing the new store ident. The engine
// should be completely empty save for a cluster version, which must
// already have been persisted to it. Returns an error if this is not
// the case.
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := kvstorage.ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
}
if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) {
return err
}

if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize engine")
}

batch := eng.NewBatch()
if err := storage.MVCCPutProto(
ctx,
batch,
nil,
keys.StoreIdentKey(),
hlc.Timestamp{},
hlc.ClockTimestamp{},
nil,
&ident,
); err != nil {
batch.Close()
return err
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting engine initialization data")
}

return nil
}

// WriteInitialClusterData writes initialization data to an engine. It creates
// system ranges (filling in meta1 and meta2) and the default zone config.
//
Expand Down Expand Up @@ -113,9 +65,9 @@ func WriteInitialClusterData(
// Initialize various sequence generators.
var nodeIDVal, storeIDVal, rangeIDVal, livenessVal roachpb.Value

nodeIDVal.SetInt(int64(FirstNodeID))
nodeIDVal.SetInt(int64(kvstorage.FirstNodeID))
// The caller will initialize the stores with ids FirstStoreID, ..., FirstStoreID+numStores-1.
storeIDVal.SetInt(int64(FirstStoreID) + int64(numStores) - 1)
storeIDVal.SetInt(int64(kvstorage.FirstStoreID) + int64(numStores) - 1)
// The last range has id = len(splits) + 1
rangeIDVal.SetInt(int64(len(splits) + 1))

Expand All @@ -129,15 +81,15 @@ func WriteInitialClusterData(
//
// [1]: See `(*NodeLiveness).CreateLivenessRecord` and usages for where that happens.
// [2]: See `(*NodeLiveness).Start` for where that happens.
livenessRecord := livenesspb.Liveness{NodeID: FirstNodeID, Epoch: 0}
livenessRecord := livenesspb.Liveness{NodeID: kvstorage.FirstNodeID, Epoch: 0}
if err := livenessVal.SetProto(&livenessRecord); err != nil {
return err
}
initialValues = append(initialValues,
roachpb.KeyValue{Key: keys.NodeIDGenerator, Value: nodeIDVal},
roachpb.KeyValue{Key: keys.StoreIDGenerator, Value: storeIDVal},
roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal},
roachpb.KeyValue{Key: keys.NodeLivenessKey(FirstNodeID), Value: livenessVal})
roachpb.KeyValue{Key: keys.NodeLivenessKey(kvstorage.FirstNodeID), Value: livenessVal})

// firstRangeMS is going to accumulate the stats for the first range, as we
// write the meta records for all the other ranges.
Expand Down Expand Up @@ -182,8 +134,8 @@ func WriteInitialClusterData(
const firstReplicaID = 1
replicas := []roachpb.ReplicaDescriptor{
{
NodeID: FirstNodeID,
StoreID: FirstStoreID,
NodeID: kvstorage.FirstNodeID,
StoreID: kvstorage.FirstStoreID,
ReplicaID: firstReplicaID,
},
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func createTestStoreWithoutStart(
cv = clusterversion.ClusterVersion{Version: opts.bootstrapVersion}
}
require.NoError(t, WriteClusterVersion(ctx, eng, cv))
if err := InitEngine(
if err := kvstorage.InitEngine(
ctx, eng, storeIdent,
); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestInitializeEngineErrors(t *testing.T) {
stopper.AddCloser(eng)

// Bootstrap should fail if engine has no cluster version yet.
err := InitEngine(ctx, eng, testIdent)
err := kvstorage.InitEngine(ctx, eng, testIdent)
require.ErrorContains(t, err, "no cluster version")

require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion))
Expand All @@ -507,7 +507,7 @@ func TestInitializeEngineErrors(t *testing.T) {
require.ErrorIs(t, err, &kvstorage.NotBootstrappedError{})

// Bootstrap should fail on non-empty engine.
err = InitEngine(ctx, eng, testIdent)
err = kvstorage.InitEngine(ctx, eng, testIdent)
require.ErrorContains(t, err, "cannot be bootstrapped")

// Bootstrap should fail on MVCC range key in engine.
Expand All @@ -517,7 +517,7 @@ func TestInitializeEngineErrors(t *testing.T) {
Timestamp: hlc.MinTimestamp,
}, storage.MVCCValue{}))

err = InitEngine(ctx, eng, testIdent)
err = kvstorage.InitEngine(ctx, eng, testIdent)
require.ErrorContains(t, err, "found mvcc range key")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
"//pkg/util/interval",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/timetz",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
Expand Down
Loading

0 comments on commit 85b40c7

Please sign in to comment.