diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go index 737f13d03980..50abb799c34b 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go +++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go @@ -301,6 +301,13 @@ func TestTenantLogic_as_of( runLogicTest(t, "as_of") } +func TestTenantLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestTenantLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/cli/mt_start_sql.go b/pkg/cli/mt_start_sql.go index 91c8c8fc4817..cf7bd06a7635 100644 --- a/pkg/cli/mt_start_sql.go +++ b/pkg/cli/mt_start_sql.go @@ -87,7 +87,14 @@ func runStartSQL(cmd *cobra.Command, args []string) error { // always be non-nil, even if NewServer returns a nil pointer (and // an error). The code below is dependent on the interface // reference remaining nil in case of error. - s, err := server.NewTenantServer(ctx, stopper, serverCfg.BaseConfig, serverCfg.SQLConfig) + s, err := server.NewTenantServer( + ctx, + stopper, + serverCfg.BaseConfig, + serverCfg.SQLConfig, + nil, /* parentRecorder */ + nil, /* tenantNameContainer */ + ) if err != nil { return nil, err } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index dc94e8e5cabd..b2bf11d71c0d 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -33,7 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/geo/geos" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/security/clientsecopts" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" @@ -1106,7 +1106,7 @@ func reportServerInfo( nodeID := serverCfg.BaseConfig.IDContainer.Get() if serverCfg.SQLConfig.TenantID.IsSystem() { if initialStart { - if nodeID == kvserver.FirstNodeID { + if nodeID == kvstorage.FirstNodeID { buf.Printf("status:\tinitialized new cluster\n") } else { buf.Printf("status:\tinitialized new node, joined pre-existing cluster\n") diff --git a/pkg/kv/kvserver/kvstorage/init.go b/pkg/kv/kvserver/kvstorage/init.go index 9a7e96771148..af34e058a5f9 100644 --- a/pkg/kv/kvserver/kvstorage/init.go +++ b/pkg/kv/kvserver/kvstorage/init.go @@ -26,6 +26,123 @@ import ( "github.com/cockroachdb/errors" ) +// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster. +const FirstNodeID = roachpb.NodeID(1) + +// FirstStoreID is the StoreID assigned to the first store on the node with ID +// FirstNodeID. +const FirstStoreID = roachpb.StoreID(1) + +// InitEngine writes a new store ident to the underlying engine. To +// ensure that no crufty data already exists in the engine, it scans +// the engine contents before writing the new store ident. The engine +// should be completely empty save for a cluster version, which must +// already have been persisted to it. Returns an error if this is not +// the case. +func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error { + exIdent, err := ReadStoreIdent(ctx, eng) + if err == nil { + return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String()) + } + if !errors.HasType(err, (*NotBootstrappedError)(nil)) { + return err + } + + if err := checkCanInitializeEngine(ctx, eng); err != nil { + return errors.Wrap(err, "while trying to initialize engine") + } + + batch := eng.NewBatch() + if err := storage.MVCCPutProto( + ctx, + batch, + nil, + keys.StoreIdentKey(), + hlc.Timestamp{}, + hlc.ClockTimestamp{}, + nil, + &ident, + ); err != nil { + batch.Close() + return err + } + if err := batch.Commit(true /* sync */); err != nil { + return errors.Wrap(err, "persisting engine initialization data") + } + + return nil +} + +// checkCanInitializeEngine ensures that the engine is empty except for a +// cluster version, which must be present. +func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error { + // See if this is an already-bootstrapped store. + ident, err := ReadStoreIdent(ctx, eng) + if err == nil { + return errors.Errorf("engine already initialized as %s", ident.String()) + } else if !errors.HasType(err, (*NotBootstrappedError)(nil)) { + return errors.Wrap(err, "unable to read store ident") + } + // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a + // cluster version, cached settings and nothing else? Note that there is one + // cluster version key and many cached settings key, and the cluster version + // key precedes the cached settings. + // + // We use an EngineIterator to ensure that there are no keys that cannot be + // parsed as MVCCKeys (e.g. lock table keys) in the engine. + iter := eng.NewEngineIterator(storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + UpperBound: roachpb.KeyMax, + }) + defer iter.Close() + valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin}) + if !valid { + if err == nil { + return errors.New("no cluster version found on uninitialized engine") + } + return err + } + getMVCCKey := func() (storage.MVCCKey, error) { + if _, hasRange := iter.HasPointAndRange(); hasRange { + bounds, err := iter.EngineRangeBounds() + if err != nil { + return storage.MVCCKey{}, err + } + return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds) + } + var k storage.EngineKey + k, err = iter.EngineKey() + if err != nil { + return storage.MVCCKey{}, err + } + if !k.IsMVCCKey() { + return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k) + } + return k.ToMVCCKey() + } + var k storage.MVCCKey + if k, err = getMVCCKey(); err != nil { + return err + } + if !k.Key.Equal(keys.StoreClusterVersionKey()) { + return errors.New("no cluster version found on uninitialized engine") + } + valid, err = iter.NextEngineKey() + for valid { + // Only allowed to find cached cluster settings on an uninitialized + // engine. + if k, err = getMVCCKey(); err != nil { + return err + } + if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil { + return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String()) + } + // There may be more cached cluster settings, so continue iterating. + valid, err = iter.NextEngineKey() + } + return err +} + // IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such // as RaftHardStateKey, RangeTombstoneKey, and many others). Such keys could in // principle exist at any RangeID, and this helper efficiently discovers all the diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index cff7a26dcc3e..f3907716bd8f 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2698,76 +2698,6 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64, return hlcUpperBound, nil } -// checkCanInitializeEngine ensures that the engine is empty except for a -// cluster version, which must be present. -func checkCanInitializeEngine(ctx context.Context, eng storage.Engine) error { - // See if this is an already-bootstrapped store. - ident, err := kvstorage.ReadStoreIdent(ctx, eng) - if err == nil { - return errors.Errorf("engine already initialized as %s", ident.String()) - } else if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { - return errors.Wrap(err, "unable to read store ident") - } - // Engine is not bootstrapped yet (i.e. no StoreIdent). Does it contain a - // cluster version, cached settings and nothing else? Note that there is one - // cluster version key and many cached settings key, and the cluster version - // key precedes the cached settings. - // - // We use an EngineIterator to ensure that there are no keys that cannot be - // parsed as MVCCKeys (e.g. lock table keys) in the engine. - iter := eng.NewEngineIterator(storage.IterOptions{ - KeyTypes: storage.IterKeyTypePointsAndRanges, - UpperBound: roachpb.KeyMax, - }) - defer iter.Close() - valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: roachpb.KeyMin}) - if !valid { - if err == nil { - return errors.New("no cluster version found on uninitialized engine") - } - return err - } - getMVCCKey := func() (storage.MVCCKey, error) { - if _, hasRange := iter.HasPointAndRange(); hasRange { - bounds, err := iter.EngineRangeBounds() - if err != nil { - return storage.MVCCKey{}, err - } - return storage.MVCCKey{}, errors.Errorf("found mvcc range key: %s", bounds) - } - var k storage.EngineKey - k, err = iter.EngineKey() - if err != nil { - return storage.MVCCKey{}, err - } - if !k.IsMVCCKey() { - return storage.MVCCKey{}, errors.Errorf("found non-mvcc key: %s", k) - } - return k.ToMVCCKey() - } - var k storage.MVCCKey - if k, err = getMVCCKey(); err != nil { - return err - } - if !k.Key.Equal(keys.StoreClusterVersionKey()) { - return errors.New("no cluster version found on uninitialized engine") - } - valid, err = iter.NextEngineKey() - for valid { - // Only allowed to find cached cluster settings on an uninitialized - // engine. - if k, err = getMVCCKey(); err != nil { - return err - } - if _, err := keys.DecodeStoreCachedSettingsKey(k.Key); err != nil { - return errors.Errorf("engine cannot be bootstrapped, contains key:\n%s", k.String()) - } - // There may be more cached cluster settings, so continue iterating. - valid, err = iter.NextEngineKey() - } - return err -} - // GetReplica fetches a replica by Range ID. Returns an error if no replica is found. // // See also GetReplicaIfExists for a more perfomant version. diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index b1665c4d198e..bed46a0ba29b 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -23,56 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) -// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster. -const FirstNodeID = roachpb.NodeID(1) - -// FirstStoreID is the StoreID assigned to the first store on the node with ID -// FirstNodeID. -const FirstStoreID = roachpb.StoreID(1) - -// InitEngine writes a new store ident to the underlying engine. To -// ensure that no crufty data already exists in the engine, it scans -// the engine contents before writing the new store ident. The engine -// should be completely empty save for a cluster version, which must -// already have been persisted to it. Returns an error if this is not -// the case. -func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error { - exIdent, err := kvstorage.ReadStoreIdent(ctx, eng) - if err == nil { - return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String()) - } - if !errors.HasType(err, (*kvstorage.NotBootstrappedError)(nil)) { - return err - } - - if err := checkCanInitializeEngine(ctx, eng); err != nil { - return errors.Wrap(err, "while trying to initialize engine") - } - - batch := eng.NewBatch() - if err := storage.MVCCPutProto( - ctx, - batch, - nil, - keys.StoreIdentKey(), - hlc.Timestamp{}, - hlc.ClockTimestamp{}, - nil, - &ident, - ); err != nil { - batch.Close() - return err - } - if err := batch.Commit(true /* sync */); err != nil { - return errors.Wrap(err, "persisting engine initialization data") - } - - return nil -} - // WriteInitialClusterData writes initialization data to an engine. It creates // system ranges (filling in meta1 and meta2) and the default zone config. // @@ -113,9 +65,9 @@ func WriteInitialClusterData( // Initialize various sequence generators. var nodeIDVal, storeIDVal, rangeIDVal, livenessVal roachpb.Value - nodeIDVal.SetInt(int64(FirstNodeID)) + nodeIDVal.SetInt(int64(kvstorage.FirstNodeID)) // The caller will initialize the stores with ids FirstStoreID, ..., FirstStoreID+numStores-1. - storeIDVal.SetInt(int64(FirstStoreID) + int64(numStores) - 1) + storeIDVal.SetInt(int64(kvstorage.FirstStoreID) + int64(numStores) - 1) // The last range has id = len(splits) + 1 rangeIDVal.SetInt(int64(len(splits) + 1)) @@ -129,7 +81,7 @@ func WriteInitialClusterData( // // [1]: See `(*NodeLiveness).CreateLivenessRecord` and usages for where that happens. // [2]: See `(*NodeLiveness).Start` for where that happens. - livenessRecord := livenesspb.Liveness{NodeID: FirstNodeID, Epoch: 0} + livenessRecord := livenesspb.Liveness{NodeID: kvstorage.FirstNodeID, Epoch: 0} if err := livenessVal.SetProto(&livenessRecord); err != nil { return err } @@ -137,7 +89,7 @@ func WriteInitialClusterData( roachpb.KeyValue{Key: keys.NodeIDGenerator, Value: nodeIDVal}, roachpb.KeyValue{Key: keys.StoreIDGenerator, Value: storeIDVal}, roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal}, - roachpb.KeyValue{Key: keys.NodeLivenessKey(FirstNodeID), Value: livenessVal}) + roachpb.KeyValue{Key: keys.NodeLivenessKey(kvstorage.FirstNodeID), Value: livenessVal}) // firstRangeMS is going to accumulate the stats for the first range, as we // write the meta records for all the other ranges. @@ -182,8 +134,8 @@ func WriteInitialClusterData( const firstReplicaID = 1 replicas := []roachpb.ReplicaDescriptor{ { - NodeID: FirstNodeID, - StoreID: FirstStoreID, + NodeID: kvstorage.FirstNodeID, + StoreID: kvstorage.FirstStoreID, ReplicaID: firstReplicaID, }, } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 739fe928a024..7e9352a79200 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -264,7 +264,7 @@ func createTestStoreWithoutStart( cv = clusterversion.ClusterVersion{Version: opts.bootstrapVersion} } require.NoError(t, WriteClusterVersion(ctx, eng, cv)) - if err := InitEngine( + if err := kvstorage.InitEngine( ctx, eng, storeIdent, ); err != nil { t.Fatal(err) @@ -490,7 +490,7 @@ func TestInitializeEngineErrors(t *testing.T) { stopper.AddCloser(eng) // Bootstrap should fail if engine has no cluster version yet. - err := InitEngine(ctx, eng, testIdent) + err := kvstorage.InitEngine(ctx, eng, testIdent) require.ErrorContains(t, err, "no cluster version") require.NoError(t, WriteClusterVersion(ctx, eng, clusterversion.TestingClusterVersion)) @@ -507,7 +507,7 @@ func TestInitializeEngineErrors(t *testing.T) { require.ErrorIs(t, err, &kvstorage.NotBootstrappedError{}) // Bootstrap should fail on non-empty engine. - err = InitEngine(ctx, eng, testIdent) + err = kvstorage.InitEngine(ctx, eng, testIdent) require.ErrorContains(t, err, "cannot be bootstrapped") // Bootstrap should fail on MVCC range key in engine. @@ -517,7 +517,7 @@ func TestInitializeEngineErrors(t *testing.T) { Timestamp: hlc.MinTimestamp, }, storage.MVCCValue{})) - err = InitEngine(ctx, eng, testIdent) + err = kvstorage.InitEngine(ctx, eng, testIdent) require.ErrorContains(t, err, "found mvcc range key") } diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index 26f90ba619e1..580d251af6cc 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -53,6 +53,7 @@ go_library( "//pkg/util/interval", "//pkg/util/log", "//pkg/util/protoutil", + "//pkg/util/syncutil", "//pkg/util/timetz", "//pkg/util/uuid", "@com_github_cockroachdb_apd_v3//:apd", diff --git a/pkg/roachpb/tenant.go b/pkg/roachpb/tenant.go index 28e0d7501cca..e7e42a189256 100644 --- a/pkg/roachpb/tenant.go +++ b/pkg/roachpb/tenant.go @@ -17,6 +17,7 @@ import ( "strconv" "github.com/cockroachdb/cockroach/pkg/base/serverident" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -186,3 +187,30 @@ func (n TenantName) IsValid() error { } return nil } + +// TenantNameContainer is a shared object between the +// server controller and the tenant server that holds +// a reference to the current name of the tenant and +// updates it if needed. This facilitates some +// observability use cases where we need to tag data +// by tenant name. +type TenantNameContainer syncutil.AtomicString + +func NewTenantNameContainer(name TenantName) *TenantNameContainer { + t := &TenantNameContainer{} + t.Set(name) + return t +} + +func (c *TenantNameContainer) Set(name TenantName) { + (*syncutil.AtomicString)(c).Set(string(name)) +} + +func (c *TenantNameContainer) Get() TenantName { + return TenantName(c.String()) +} + +// String implements the fmt.Stringer interface. +func (c *TenantNameContainer) String() string { + return (*syncutil.AtomicString)(c).Get() +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 7bfcb1f7045d..9a811ced3f62 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -421,6 +421,7 @@ go_test( "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", diff --git a/pkg/server/init.go b/pkg/server/init.go index 2b345c22b92a..016a7e8cbd20 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -550,7 +550,7 @@ func (s *initServer) initializeFirstStoreAfterJoin( if err != nil { return nil, err } - if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil { + if err := kvstorage.InitEngine(ctx, firstEngine, sIdent); err != nil { return nil, err } diff --git a/pkg/server/initial_sql.go b/pkg/server/initial_sql.go index 61d379284318..45c8d7629060 100644 --- a/pkg/server/initial_sql.go +++ b/pkg/server/initial_sql.go @@ -14,7 +14,7 @@ import ( "context" "fmt" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -30,7 +30,7 @@ import ( func (s *Server) RunInitialSQL( ctx context.Context, startSingleNode bool, adminUser, adminPassword string, ) error { - newCluster := s.InitialStart() && s.NodeID() == kvserver.FirstNodeID + newCluster := s.InitialStart() && s.NodeID() == kvstorage.FirstNodeID if !newCluster { // The initial SQL code only runs the first time the cluster is initialized. return nil diff --git a/pkg/server/node.go b/pkg/server/node.go index 0dda9a6a1eec..a4da77f5e84f 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -319,13 +319,13 @@ func bootstrapCluster( sIdent := roachpb.StoreIdent{ ClusterID: clusterID, - NodeID: kvserver.FirstNodeID, - StoreID: kvserver.FirstStoreID + roachpb.StoreID(i), + NodeID: kvstorage.FirstNodeID, + StoreID: kvstorage.FirstStoreID + roachpb.StoreID(i), } // Initialize the engine backing the store with the store ident and cluster // version. - if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil { + if err := kvstorage.InitEngine(ctx, eng, sIdent); err != nil { return nil, err } @@ -654,7 +654,7 @@ func (n *Node) initializeAdditionalStores( StoreID: startID, } for _, eng := range engines { - if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil { + if err := kvstorage.InitEngine(ctx, eng, sIdent); err != nil { return err } diff --git a/pkg/server/node_tombstone_storage_test.go b/pkg/server/node_tombstone_storage_test.go index 9e9c3d75862a..384e184c554c 100644 --- a/pkg/server/node_tombstone_storage_test.go +++ b/pkg/server/node_tombstone_storage_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -41,7 +42,7 @@ func TestNodeTombstoneStorage(t *testing.T) { require.NoError(t, err) for i := range engs { require.NoError(t, kvserver.WriteClusterVersion(ctx, engs[i], clusterversion.TestingClusterVersion)) - require.NoError(t, kvserver.InitEngine(ctx, engs[i], roachpb.StoreIdent{ + require.NoError(t, kvstorage.InitEngine(ctx, engs[i], roachpb.StoreIdent{ ClusterID: id, NodeID: 1, StoreID: roachpb.StoreID(1 + i), diff --git a/pkg/server/server.go b/pkg/server/server.go index 89a9ecbfe656..fd7a4b96a61a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -79,6 +79,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire" _ "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" _ "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob" // register jobs declared outside of pkg/sql _ "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlschedule" // register schedules declared outside of pkg/sql @@ -719,7 +720,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { storeCfg.TestingKnobs = *storeTestingKnobs.(*kvserver.StoreTestingKnobs) } - recorder := status.NewMetricsRecorder(clock, nodeLiveness, rpcContext, g, st) + systemTenantNameContainer := roachpb.NewTenantNameContainer(catconstants.SystemTenantName) + + recorder := status.NewMetricsRecorder( + clock, + nodeLiveness, + rpcContext, + g, + st, + systemTenantNameContainer, + ) registry.AddMetricStruct(rpcContext.RemoteClocks.Metrics()) updates := &diagnostics.UpdateChecker{ @@ -999,7 +1009,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // Create a server controller. sc := newServerController(ctx, stopper, st, - lateBoundServer, &systemServerWrapper{server: lateBoundServer}) + lateBoundServer, &systemServerWrapper{server: lateBoundServer}, systemTenantNameContainer) // Create the debug API server. debugServer := debug.NewServer( diff --git a/pkg/server/server_controller.go b/pkg/server/server_controller.go index bfdb8fbab33d..dcb0ff9d3a5d 100644 --- a/pkg/server/server_controller.go +++ b/pkg/server/server_controller.go @@ -74,6 +74,15 @@ type serverEntry struct { // server is the actual server. server onDemandServer + // nameContainer holds a shared reference to the current + // name of the tenant within this serverEntry. If the + // tenant's name is updated, the `Set` method on + // nameContainer should be called in order to update + // any subscribers within the tenant. These are typically + // observability-related features that label data with + // the current tenant name. + nameContainer *roachpb.TenantNameContainer + // shouldStop indicates whether shutting down the controller // should cause this server to stop. This is true for all // servers except the one serving the system tenant. @@ -125,7 +134,7 @@ type tenantCreator interface { // can be checked with errors.Is. // // testArgs is used by tests to tweak the tenant server. - newTenant(ctx context.Context, tenantName roachpb.TenantName, index int, deregister func(), + newTenant(ctx context.Context, tenantNameContainer *roachpb.TenantNameContainer, index int, deregister func(), testArgs base.TestSharedProcessTenantArgs, ) (onDemandServer, error) } @@ -136,6 +145,7 @@ func newServerController( st *cluster.Settings, tenantCreator tenantCreator, systemServer onDemandServer, + systemTenantNameContainer *roachpb.TenantNameContainer, ) *serverController { c := &serverController{ st: st, @@ -144,8 +154,9 @@ func newServerController( } c.mu.servers = map[roachpb.TenantName]serverEntry{ catconstants.SystemTenantName: { - server: systemServer, - shouldStop: false, + server: systemServer, + shouldStop: false, + nameContainer: systemTenantNameContainer, }, } parentStopper.AddCloser(c) @@ -197,13 +208,15 @@ func (c *serverController) createServerLocked( // Server does not exist yet: instantiate and start it. c.mu.nextServerIdx++ idx := c.mu.nextServerIdx - s, err := c.tenantCreator.newTenant(ctx, tenantName, idx, deregisterFn, testArgs) + nameContainer := roachpb.NewTenantNameContainer(tenantName) + s, err := c.tenantCreator.newTenant(ctx, nameContainer, idx, deregisterFn, testArgs) if err != nil { return nil, err } c.mu.servers[tenantName] = serverEntry{ - server: s, - shouldStop: true, + server: s, + shouldStop: true, + nameContainer: nameContainer, } return s, nil } @@ -620,12 +633,12 @@ var _ tenantCreator = &Server{} // newTenant implements the tenantCreator interface. func (s *Server) newTenant( ctx context.Context, - tenantName roachpb.TenantName, + tenantNameContainer *roachpb.TenantNameContainer, index int, deregister func(), testArgs base.TestSharedProcessTenantArgs, ) (onDemandServer, error) { - tenantID, err := s.getTenantID(ctx, tenantName) + tenantID, err := s.getTenantID(ctx, tenantNameContainer.Get()) if err != nil { return nil, err } @@ -637,7 +650,7 @@ func (s *Server) newTenant( // Apply the TestTenantArgs, if any. baseCfg.TestingKnobs = testArgs.Knobs - tenantServer, err := s.startInMemoryTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper) + tenantServer, err := s.startInMemoryTenantServerInternal(ctx, baseCfg, sqlCfg, tenantStopper, tenantNameContainer) if err != nil { // Abandon any work done so far. tenantStopper.Stop(ctx) @@ -757,7 +770,11 @@ func (s *Server) makeSharedProcessTenantConfig( // Note that even if an error is returned, tasks might have been started with // the stopper, so the caller needs to Stop() it. func (s *Server) startInMemoryTenantServerInternal( - ctx context.Context, baseCfg BaseConfig, sqlCfg SQLConfig, stopper *stop.Stopper, + ctx context.Context, + baseCfg BaseConfig, + sqlCfg SQLConfig, + stopper *stop.Stopper, + tenantNameContainer *roachpb.TenantNameContainer, ) (*SQLServerWrapper, error) { ambientCtx := baseCfg.AmbientCtx stopper.SetTracer(baseCfg.Tracer) @@ -770,7 +787,7 @@ func (s *Server) startInMemoryTenantServerInternal( log.Infof(startCtx, "starting tenant server") // Now start the tenant proper. - tenantServer, err := NewTenantServer(startCtx, stopper, baseCfg, sqlCfg) + tenantServer, err := NewTenantServer(startCtx, stopper, baseCfg, sqlCfg, s.recorder, tenantNameContainer) if err != nil { return nil, err } diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel index e6981fd55727..eda39b39c5f9 100644 --- a/pkg/server/status/BUILD.bazel +++ b/pkg/server/status/BUILD.bazel @@ -132,11 +132,13 @@ go_test( "//pkg/base", "//pkg/build", "//pkg/roachpb", + "//pkg/rpc", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", "//pkg/server/status/statuspb", "//pkg/settings/cluster", + "//pkg/sql/sem/catconstants", "//pkg/testutils/serverutils", "//pkg/ts/tspb", "//pkg/util/hlc", @@ -149,6 +151,7 @@ go_test( "//pkg/util/timeutil", "@com_github_kr_pretty//:pretty", "@com_github_shirou_gopsutil_v3//net", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index d78e1ff4d221..1777b4fb3ab9 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -129,6 +129,8 @@ type MetricsRecorder struct { // independent. storeRegistries map[roachpb.StoreID]*metric.Registry stores map[roachpb.StoreID]storeMetrics + + tenantRecorders map[roachpb.TenantID]*MetricsRecorder } // prometheusExporter merges metrics into families and generates the @@ -140,6 +142,13 @@ type MetricsRecorder struct { // round-trip) that requires a mutex to be safe for concurrent usage. We // therefore give it its own mutex to avoid blocking other methods. writeSummaryMu syncutil.Mutex + + // tenantID is the tenantID of the tenant this recorder is attached to. + tenantID roachpb.TenantID + + // tenantNameContainer holds the tenant name of the tenant this recorder + // is attached to. It will be used to label metrics that are tenant-specific. + tenantNameContainer *roachpb.TenantNameContainer } // NewMetricsRecorder initializes a new MetricsRecorder object that uses the @@ -150,6 +159,7 @@ func NewMetricsRecorder( rpcContext *rpc.Context, gossip *gossip.Gossip, settings *cluster.Settings, + tenantNameContainer *roachpb.TenantNameContainer, ) *MetricsRecorder { mr := &MetricsRecorder{ HealthChecker: NewHealthChecker(trackedMetrics), @@ -160,11 +170,21 @@ func NewMetricsRecorder( } mr.mu.storeRegistries = make(map[roachpb.StoreID]*metric.Registry) mr.mu.stores = make(map[roachpb.StoreID]storeMetrics) + mr.mu.tenantRecorders = make(map[roachpb.TenantID]*MetricsRecorder) mr.prometheusExporter = metric.MakePrometheusExporter() mr.clock = clock + mr.tenantID = rpcContext.TenantID + mr.tenantNameContainer = tenantNameContainer return mr } +func (mr *MetricsRecorder) AddTenantRecorder(rec *MetricsRecorder) { + mr.mu.Lock() + defer mr.mu.Unlock() + + mr.mu.tenantRecorders[rec.tenantID] = rec +} + // AddNode adds the Registry from an initialized node, along with its descriptor // and start time. func (mr *MetricsRecorder) AddNode( @@ -193,6 +213,7 @@ func (mr *MetricsRecorder) AddNode( nodeIDGauge := metric.NewGauge(metadata) nodeIDGauge.Update(int64(desc.NodeID)) reg.AddMetric(nodeIDGauge) + reg.AddLabel("tenant", mr.tenantNameContainer) } // AddStore adds the Registry from the provided store as a store-level registry @@ -251,6 +272,9 @@ func (mr *MetricsRecorder) ScrapeIntoPrometheus(pm *metric.PrometheusExporter) { for _, reg := range mr.mu.storeRegistries { pm.ScrapeRegistry(reg, includeChildMetrics) } + for _, ten := range mr.mu.tenantRecorders { + ten.ScrapeIntoPrometheus(pm) + } } // PrintAsText writes the current metrics values as plain-text to the writer. diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index 25f27021437d..5823237f234c 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -11,6 +11,7 @@ package status import ( + "bytes" "context" "io" "os" @@ -23,8 +24,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/ts/tspb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -33,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/system" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) // byTimeAndName is a slice of tspb.TimeSeriesData. @@ -97,6 +101,97 @@ func (fs fakeStore) Registry() *metric.Registry { return fs.registry } +func TestMetricsRecorderTenants(t *testing.T) { + defer leaktest.AfterTest(t)() + nodeDesc := roachpb.NodeDescriptor{ + NodeID: roachpb.NodeID(1), + } + reg1 := metric.NewRegistry() + manual := timeutil.NewManualTime(timeutil.Unix(0, 100)) + st := cluster.MakeTestingClusterSettings() + rpcCtx := &rpc.Context{ + ContextOptions: rpc.ContextOptions{ + TenantID: roachpb.SystemTenantID, + }, + } + recorder := NewMetricsRecorder( + hlc.NewClock(manual, time.Nanosecond), + nil, + rpcCtx, + nil, + st, + roachpb.NewTenantNameContainer(catconstants.SystemTenantName), + ) + recorder.AddNode(reg1, nodeDesc, 50, "foo:26257", "foo:26258", "foo:5432") + + nodeDescTenant := roachpb.NodeDescriptor{ + NodeID: roachpb.NodeID(1), + } + reg2 := metric.NewRegistry() + stTenant := cluster.MakeTestingClusterSettings() + id, err := roachpb.MakeTenantID(123) + require.NoError(t, err) + rpcCtxTenant := &rpc.Context{ + ContextOptions: rpc.ContextOptions{ + TenantID: id, + }, + } + + appNameContainer := roachpb.NewTenantNameContainer("application") + recorderTenant := NewMetricsRecorder( + hlc.NewClock(manual, time.Nanosecond), + nil, + rpcCtxTenant, + nil, + stTenant, + appNameContainer, + ) + recorderTenant.AddNode(reg2, nodeDescTenant, 50, "foo:26257", "foo:26258", "foo:5432") + + g := metric.NewGauge(metric.Metadata{Name: "some_metric"}) + reg1.AddMetric(g) + g.Update(123) + + g2 := metric.NewGauge(metric.Metadata{Name: "some_metric"}) + reg2.AddMetric(g2) + g2.Update(456) + + recorder.AddTenantRecorder(recorderTenant) + + buf := bytes.NewBuffer([]byte{}) + err = recorder.PrintAsText(buf) + require.NoError(t, err) + + require.Contains(t, buf.String(), `some_metric{tenant="system"} 123`) + require.Contains(t, buf.String(), `some_metric{tenant="application"} 456`) + + bufTenant := bytes.NewBuffer([]byte{}) + err = recorderTenant.PrintAsText(bufTenant) + require.NoError(t, err) + + require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`) + require.Contains(t, bufTenant.String(), `some_metric{tenant="application"} 456`) + + // Update app name in container and ensure + // output changes accordingly. + appNameContainer.Set("application2") + + buf = bytes.NewBuffer([]byte{}) + err = recorder.PrintAsText(buf) + require.NoError(t, err) + + require.Contains(t, buf.String(), `some_metric{tenant="system"} 123`) + require.Contains(t, buf.String(), `some_metric{tenant="application2"} 456`) + + bufTenant = bytes.NewBuffer([]byte{}) + err = recorderTenant.PrintAsText(bufTenant) + require.NoError(t, err) + + require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`) + require.Contains(t, bufTenant.String(), `some_metric{tenant="application2"} 456`) + +} + // TestMetricsRecorder verifies that the metrics recorder properly formats the // statistics from various registries, both for Time Series and for Status // Summaries. @@ -143,7 +238,13 @@ func TestMetricsRecorder(t *testing.T) { } manual := timeutil.NewManualTime(timeutil.Unix(0, 100)) st := cluster.MakeTestingClusterSettings() - recorder := NewMetricsRecorder(hlc.NewClock(manual, time.Nanosecond), nil, nil, nil, st /* maxOffset */) + rpcCtx := &rpc.Context{ + ContextOptions: rpc.ContextOptions{ + TenantID: roachpb.SystemTenantID, + }, + } + + recorder := NewMetricsRecorder(hlc.NewClock(manual, time.Nanosecond), nil, rpcCtx, nil, st, roachpb.NewTenantNameContainer("")) recorder.AddStore(store1) recorder.AddStore(store2) recorder.AddNode(reg1, nodeDesc, 50, "foo:26257", "foo:26258", "foo:5432") diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 62211282ecff..28695fbc8016 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1310,20 +1310,20 @@ func TestStatusVarsTxnMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Contains(body, []byte("sql_txn_begin_count 1")) { - t.Errorf("expected `sql_txn_begin_count 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_txn_begin_count{tenant=\"system\"} 1")) { + t.Errorf("expected `sql_txn_begin_count{tenant=\"system\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_restart_savepoint_count 1")) { - t.Errorf("expected `sql_restart_savepoint_count 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_restart_savepoint_count{tenant=\"system\"} 1")) { + t.Errorf("expected `sql_restart_savepoint_count{tenant=\"system\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_restart_savepoint_release_count 1")) { - t.Errorf("expected `sql_restart_savepoint_release_count 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_restart_savepoint_release_count{tenant=\"system\"} 1")) { + t.Errorf("expected `sql_restart_savepoint_release_count{tenant=\"system\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_txn_commit_count 1")) { - t.Errorf("expected `sql_txn_commit_count 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_txn_commit_count{tenant=\"system\"} 1")) { + t.Errorf("expected `sql_txn_commit_count{tenant=\"system\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_txn_rollback_count 0")) { - t.Errorf("expected `sql_txn_rollback_count 0`, got: %s", body) + if !bytes.Contains(body, []byte("sql_txn_rollback_count{tenant=\"system\"} 0")) { + t.Errorf("expected `sql_txn_rollback_count{tenant=\"system\"} 0`, got: %s", body) } } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 8ef0d05d9283..805cb2dad77d 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -151,7 +151,12 @@ func (s *SQLServerWrapper) Drain( // The caller is responsible for listening to the server's ShutdownRequested() // channel and stopping cfg.stopper when signaled. func NewTenantServer( - ctx context.Context, stopper *stop.Stopper, baseCfg BaseConfig, sqlCfg SQLConfig, + ctx context.Context, + stopper *stop.Stopper, + baseCfg BaseConfig, + sqlCfg SQLConfig, + parentRecorder *status.MetricsRecorder, + tenantNameContainer *roachpb.TenantNameContainer, ) (*SQLServerWrapper, error) { // TODO(knz): Make the license application a per-server thing // instead of a global thing. @@ -164,7 +169,7 @@ func NewTenantServer( // for a tenant server. baseCfg.idProvider.SetTenant(sqlCfg.TenantID) - args, err := makeTenantSQLServerArgs(ctx, stopper, baseCfg, sqlCfg) + args, err := makeTenantSQLServerArgs(ctx, stopper, baseCfg, sqlCfg, parentRecorder, tenantNameContainer) if err != nil { return nil, err } @@ -785,7 +790,12 @@ func (s *SQLServerWrapper) ShutdownRequested() <-chan ShutdownRequest { } func makeTenantSQLServerArgs( - startupCtx context.Context, stopper *stop.Stopper, baseCfg BaseConfig, sqlCfg SQLConfig, + startupCtx context.Context, + stopper *stop.Stopper, + baseCfg BaseConfig, + sqlCfg SQLConfig, + parentRecorder *status.MetricsRecorder, + tenantNameContainer *roachpb.TenantNameContainer, ) (sqlServerArgs, error) { st := baseCfg.Settings @@ -929,7 +939,14 @@ func makeTenantSQLServerArgs( registry.AddMetricStruct(pp.Metrics()) protectedTSProvider = tenantProtectedTSProvider{Provider: pp, st: st} - recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st) + recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st, tenantNameContainer) + // Note: If the tenant is in-process, we attach this tenant's metric + // recorder to the parentRecorder held by the system tenant. This + // ensures that generated Prometheus metrics from the system tenant + // include metrics from this in-process tenant. + if parentRecorder != nil { + parentRecorder.AddTenantRecorder(recorder) + } runtime := status.NewRuntimeStatSampler(startupCtx, clock) registry.AddMetricStruct(runtime) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 770214c46400..c8d68a17f1ea 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -1073,6 +1073,8 @@ func (ts *TestServer) StartTenant( stopper, baseCfg, sqlCfg, + ts.recorder, + roachpb.NewTenantNameContainer(params.TenantName), ) if err != nil { return nil, err diff --git a/pkg/sql/logictest/testdata/logic_test/asyncpg b/pkg/sql/logictest/testdata/logic_test/asyncpg new file mode 100644 index 000000000000..2b2fe6b085f4 --- /dev/null +++ b/pkg/sql/logictest/testdata/logic_test/asyncpg @@ -0,0 +1,229 @@ +# Tests for queries made by asyncpg. + +# Regression test for #71908 and #80169. +query TTTTTTTTTTITTI rowsort +---------------------------------------------------------------------------------------- +WITH RECURSIVE + typeinfo_tree + ( + oid, ns, name, kind, basetype, elemtype, elemdelim, range_subtype, attrtypoids, attrnames, depth + ) + AS ( + SELECT + ti.oid, + ti.ns, + ti.name, + ti.kind, + ti.basetype, + ti.elemtype, + ti.elemdelim, + ti.range_subtype, + ti.attrtypoids, + ti.attrnames, + 0 + FROM + ( + SELECT + t.oid AS oid, + ns.nspname AS ns, + t.typname AS name, + t.typtype AS kind, + CASE + WHEN t.typtype = 'd' + THEN ( + WITH RECURSIVE + typebases (oid, depth) + AS ( + SELECT + t2.typbasetype AS oid, 0 AS depth + FROM + pg_type AS t2 + WHERE + t2.oid = t.oid + UNION ALL + SELECT + t2.typbasetype AS oid, + tb.depth + 1 AS depth + FROM + pg_type AS t2, typebases AS tb + WHERE + tb.oid = t2.oid AND t2.typbasetype != 0 + ) + SELECT + oid + FROM + typebases + ORDER BY + depth DESC + LIMIT + 1 + ) + ELSE NULL + END + AS basetype, + t.typelem AS elemtype, + elem_t.typdelim AS elemdelim, + range_t.rngsubtype AS range_subtype, + CASE + WHEN t.typtype = 'c' + THEN ( + SELECT + array_agg(ia.atttypid ORDER BY ia.attnum) + FROM + pg_attribute AS ia + INNER JOIN pg_class AS c ON ia.attrelid = c.oid + WHERE + ia.attnum > 0 + AND NOT ia.attisdropped + AND c.reltype = t.oid + ) + ELSE NULL + END + AS attrtypoids, + CASE + WHEN t.typtype = 'c' + THEN ( + SELECT + array_agg(ia.attname::STRING ORDER BY ia.attnum) + FROM + pg_attribute AS ia + INNER JOIN pg_class AS c ON ia.attrelid = c.oid + WHERE + ia.attnum > 0 + AND NOT ia.attisdropped + AND c.reltype = t.oid + ) + ELSE NULL + END + AS attrnames + FROM + pg_catalog.pg_type AS t + INNER JOIN pg_catalog.pg_namespace AS ns ON + ns.oid = t.typnamespace + LEFT JOIN pg_type AS elem_t ON + t.typlen = -1 + AND t.typelem != 0 + AND t.typelem = elem_t.oid + LEFT JOIN pg_range AS range_t ON t.oid = range_t.rngtypid + ) + AS ti + WHERE + ti.oid = ANY ARRAY[21, 23]::OID[] + UNION ALL + SELECT + ti.oid, + ti.ns, + ti.name, + ti.kind, + ti.basetype, + ti.elemtype, + ti.elemdelim, + ti.range_subtype, + ti.attrtypoids, + ti.attrnames, + tt.depth + 1 + FROM + ( + SELECT + t.oid AS oid, + ns.nspname AS ns, + t.typname AS name, + t.typtype AS kind, + CASE + WHEN t.typtype = 'd' + THEN ( + WITH RECURSIVE + typebases (oid, depth) + AS ( + SELECT + t2.typbasetype AS oid, 0 AS depth + FROM + pg_type AS t2 + WHERE + t2.oid = t.oid + UNION ALL + SELECT + t2.typbasetype AS oid, + tb.depth + 1 AS depth + FROM + pg_type AS t2, typebases AS tb + WHERE + tb.oid = t2.oid + AND t2.typbasetype != 0 + ) + SELECT + oid + FROM + typebases + ORDER BY + depth DESC + LIMIT + 1 + ) + ELSE NULL + END + AS basetype, + t.typelem AS elemtype, + elem_t.typdelim AS elemdelim, + range_t.rngsubtype AS range_subtype, + CASE + WHEN t.typtype = 'c' + THEN ( + SELECT + array_agg(ia.atttypid ORDER BY ia.attnum) + FROM + pg_attribute AS ia + INNER JOIN pg_class AS c ON ia.attrelid = c.oid + WHERE + ia.attnum > 0 + AND NOT ia.attisdropped + AND c.reltype = t.oid + ) + ELSE NULL + END + AS attrtypoids, + CASE + WHEN t.typtype = 'c' + THEN ( + SELECT + array_agg(ia.attname::STRING ORDER BY ia.attnum) + FROM + pg_attribute AS ia + INNER JOIN pg_class AS c ON ia.attrelid = c.oid + WHERE + ia.attnum > 0 + AND NOT ia.attisdropped + AND c.reltype = t.oid + ) + ELSE NULL + END + AS attrnames + FROM + pg_catalog.pg_type AS t + INNER JOIN pg_catalog.pg_namespace AS ns ON + ns.oid = t.typnamespace + LEFT JOIN pg_type AS elem_t ON + t.typlen = -1 + AND t.typelem != 0 + AND t.typelem = elem_t.oid + LEFT JOIN pg_range AS range_t ON t.oid = range_t.rngtypid + ) + AS ti, + typeinfo_tree AS tt + WHERE + (tt.elemtype IS NOT NULL AND ti.oid = tt.elemtype) + OR (tt.attrtypoids IS NOT NULL AND ti.oid = ANY tt.attrtypoids) + OR (tt.range_subtype IS NOT NULL AND ti.oid = tt.range_subtype) + ) +SELECT + DISTINCT *, + basetype::REGTYPE::STRING AS basetype_name, + elemtype::REGTYPE::STRING AS elemtype_name, + range_subtype::REGTYPE::STRING AS range_subtype_name +FROM + typeinfo_tree +ORDER BY + depth DESC +---- +21 pg_catalog int2 b NULL 0 NULL NULL NULL NULL 0 NULL - NULL +23 pg_catalog int4 b NULL 0 NULL NULL NULL NULL 0 NULL - NULL diff --git a/pkg/sql/logictest/testdata/logic_test/subquery b/pkg/sql/logictest/testdata/logic_test/subquery index 5fd63647833c..a862e3c841ad 100644 --- a/pkg/sql/logictest/testdata/logic_test/subquery +++ b/pkg/sql/logictest/testdata/logic_test/subquery @@ -444,6 +444,174 @@ query I SELECT col0 FROM tab4 WHERE (col0 <= 0 AND col4 <= 5.38) OR (col4 IN (SELECT col1 FROM tab4 WHERE col1 > 8.27)) AND (col3 <= 5 AND (col3 BETWEEN 7 AND 9)) ---- +subtest correlated + +statement ok +CREATE TABLE corr ( + k INT PRIMARY KEY, + i INT +) + +statement ok +INSERT INTO corr VALUES (1, 10), (2, 22), (3, 30), (4, 40), (5, 50) + +query II rowsort +SELECT * FROM corr +WHERE CASE WHEN k < 5 THEN k*10 = (SELECT i FROM corr tmp WHERE k = corr.k) END +---- +1 10 +3 30 +4 40 + +query III colnames,rowsort +SELECT k, i, CASE WHEN k > 1 THEN (SELECT i FROM corr tmp WHERE k = corr.k-1) END AS prev_i +FROM corr +---- +k i prev_i +1 10 NULL +2 22 10 +3 30 22 +4 40 30 +5 50 40 + +# A test similar to the previous showing that the physical ordering requested by +# the ORDER BY is respected when re-optimizing the subquery. +query IIR colnames,rowsort +SELECT k, i, + CASE WHEN k > 1 THEN (SELECT i/1 FROM corr tmp WHERE i < corr.i ORDER BY i DESC LIMIT 1) END prev_i +FROM corr +---- +k i prev_i +1 10 NULL +2 22 10 +3 30 22 +4 40 30 +5 50 40 + +# The same query as above, but as a prepared statement with placeholders in the +# subquery. +statement ok +PREPARE corr_s1(INT) AS +SELECT k, i, + CASE WHEN k > 1 THEN (SELECT i/$1 FROM corr tmp WHERE i < corr.i ORDER BY i DESC LIMIT $1) END prev_i +FROM corr + +query IIR colnames,rowsort +EXECUTE corr_s1(1) +---- +k i prev_i +1 10 NULL +2 22 10 +3 30 22 +4 40 30 +5 50 40 + +# A subquery with a star-expansion. +query IIR colnames,rowsort +SELECT k, i, + CASE WHEN k > 1 THEN ( + SELECT * FROM (VALUES (33::DECIMAL)) v(i) + UNION ALL + SELECT i/1 FROM corr tmp WHERE i < corr.i + ORDER BY i DESC LIMIT 1 + ) END prev_i +FROM corr +---- +k i prev_i +1 10 NULL +2 22 33 +3 30 33 +4 40 33 +5 50 40 + +# TODO(mgartner): Execute correlated EXISTS subqueries. +statement error could not decorrelate subquery +SELECT * FROM corr +WHERE CASE WHEN k < 5 THEN EXISTS (SELECT i FROM corr tmp WHERE i = corr.k*10) END + +# TODO(mgartner): Execute correlated ANY subqueries. +statement error could not decorrelate subquery +SELECT * FROM corr +WHERE CASE WHEN k < 5 THEN k*10 = ANY (SELECT i FROM corr tmp WHERE k <= corr.k) END + +# Correlated subqueries can reference outer with expressions. +query III colnames,rowsort +WITH w AS MATERIALIZED ( + (VALUES (1)) +) +SELECT k, i, + CASE WHEN k > 0 THEN (SELECT i+corr.i FROM corr tmp UNION ALL SELECT * FROM w LIMIT 1) END i_plus_first_i +FROM corr +---- +k i i_plus_first_i +1 10 20 +2 22 32 +3 30 40 +4 40 50 +5 50 60 + +# Uncorrelated subqueries within correlated subqueries can reference outer with +# expressions. +query III colnames,rowsort +WITH w AS MATERIALIZED ( + (VALUES (1)) +) +SELECT k, i, + CASE WHEN k > 0 THEN (SELECT i+corr.i FROM corr tmp WHERE k = (SELECT * FROM w)) END i_plus_first_i +FROM corr +---- +k i i_plus_first_i +1 10 20 +2 22 32 +3 30 40 +4 40 50 +5 50 60 + +# WITH within subquery that is shadowing outer WITH. +query III colnames,rowsort +WITH w(i) AS MATERIALIZED ( + (VALUES (1)) +) +SELECT k, i, + CASE WHEN k > 0 THEN ( + WITH w(i) AS MATERIALIZED ( + (VALUES (2)) + ) + SELECT * FROM w UNION ALL SELECT i+corr.i FROM corr tmp LIMIT 1 + ) END w +FROM corr +UNION ALL +SELECT NULL, NULL, i FROM w +---- +k i w +1 10 2 +2 22 2 +3 30 2 +4 40 2 +5 50 2 +NULL NULL 1 + +statement ok +CREATE TABLE corr2 (i INT); + +# A NOT MATERIALIZED CTE with a mutation. +# TODO(mgartner): Lift this restriction. +statement error could not decorrelate subquery +WITH tmp AS NOT MATERIALIZED (INSERT INTO corr2 VALUES (1) RETURNING i) +SELECT * FROM corr +WHERE CASE WHEN k < 5 THEN k+1 = (SELECT i FROM tmp WHERE i = corr.k) END + +# The statement above should perform the INSERT only once. +# TODO(mgartner): This should return 1 when the statement above can be executed +# successfully. +query I +SELECT count(*) FROM corr2 +---- +0 + + +subtest regressions + statement ok CREATE TABLE z (z INT PRIMARY KEY) diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index f75661af5ace..47766181a95b 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -261,6 +261,13 @@ func TestLogic_as_of( runLogicTest(t, "as_of") } +func TestLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index 64450445b8c3..7aed138e125b 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -261,6 +261,13 @@ func TestLogic_as_of( runLogicTest(t, "as_of") } +func TestLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index df09b0cd5d1e..dc2a2c990652 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -261,6 +261,13 @@ func TestLogic_as_of( runLogicTest(t, "as_of") } +func TestLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index a9cf11b1b661..76f32853792b 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -261,6 +261,13 @@ func TestLogic_as_of( runLogicTest(t, "as_of") } +func TestLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 2cc4660507a1..e8f3caf8aeae 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -261,6 +261,13 @@ func TestLogic_as_of( runLogicTest(t, "as_of") } +func TestLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index 189c94f2f5fc..3b8ae8265e1f 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -261,6 +261,13 @@ func TestLogic_as_of( runLogicTest(t, "as_of") } +func TestLogic_asyncpg( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "asyncpg") +} + func TestLogic_auto_span_config_reconciliation_job( t *testing.T, ) { diff --git a/pkg/sql/opt/exec/execbuilder/scalar.go b/pkg/sql/opt/exec/execbuilder/scalar.go index b2022205cea9..85ed373bc0a7 100644 --- a/pkg/sql/opt/exec/execbuilder/scalar.go +++ b/pkg/sql/opt/exec/execbuilder/scalar.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/opt/norm" + "github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical" "github.com/cockroachdb/cockroach/pkg/sql/opt/xform" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinsregistry" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" @@ -540,6 +541,8 @@ func (b *Builder) buildItem(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Ty func (b *Builder) buildAny(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.TypedExpr, error) { any := scalar.(*memo.AnyExpr) // We cannot execute correlated subqueries. + // TODO(mgartner): Plan correlated ANY subqueries using tree.RoutineExpr. + // See buildSubquery. if !any.Input.Relational().OuterCols.Empty() { return nil, b.decorrelationError() } @@ -581,6 +584,8 @@ func (b *Builder) buildExistsSubquery( ) (tree.TypedExpr, error) { exists := scalar.(*memo.ExistsExpr) // We cannot execute correlated subqueries. + // TODO(mgartner): Plan correlated EXISTS subqueries using tree.RoutineExpr. + // See buildSubquery. if !exists.Input.Relational().OuterCols.Empty() { return nil, b.decorrelationError() } @@ -610,16 +615,56 @@ func (b *Builder) buildSubquery( return nil, errors.Errorf("subquery input with multiple columns") } - // We cannot execute correlated subqueries. - // TODO(mgartner): We can execute correlated subqueries by making them - // routines, like we do below. - if !input.Relational().OuterCols.Empty() { - return nil, b.decorrelationError() + // Build correlated subqueries as lazily-evaluated routines. + if outerCols := input.Relational().OuterCols; !outerCols.Empty() { + // Routines do not yet support mutations. + // TODO(mgartner): Lift this restriction once routines support + // mutations. + if input.Relational().CanMutate { + return nil, b.decorrelationError() + } + + // The outer columns of the subquery become the parameters of the + // routine. + params := outerCols.ToList() + + // The outer columns of the subquery, as indexed columns, are the + // arguments of the routine. + // The arguments are indexed variables representing the outer columns. + args := make(tree.TypedExprs, len(params)) + for i := range args { + args[i] = b.indexedVar(ctx, b.mem.Metadata(), params[i]) + } + + // Create a single-element RelListExpr representing the subquery. + outputCol := input.Relational().OutputCols.SingleColumn() + aliasedCol := opt.AliasedColumn{ + Alias: b.mem.Metadata().ColumnMeta(outputCol).Alias, + ID: outputCol, + } + stmts := memo.RelListExpr{memo.RelRequiredPropsExpr{ + RelExpr: input, + PhysProps: &physical.Required{ + Presentation: physical.Presentation{aliasedCol}, + }, + }} + + // Create a tree.RoutinePlanFn that can plan the single statement + // representing the subquery. + planFn := b.buildRoutinePlanFn(params, stmts, true /* allowOuterWithRefs */) + return tree.NewTypedRoutineExpr( + "subquery", + args, + planFn, + 1, /* numStmts */ + subquery.Typ, + false, /* enableStepping */ + true, /* calledOnNullInput */ + ), nil } + // Build lazily-evaluated, uncorrelated subqueries as routines. if b.planLazySubqueries { - // Build lazily-evaluated subqueries as routines. - // // Note: We reuse the optimizer and memo from the original expression // because we don't need to optimize the subquery input any further. // It's already been fully optimized because it is uncorrelated and has @@ -629,11 +674,14 @@ func (b *Builder) buildSubquery( // once. We should cache their result to avoid all this overhead for // every invocation. inputRowCount := int64(input.Relational().Statistics().RowCountIfAvailable()) + withExprs := make([]builtWithExpr, len(b.withExprs)) + copy(withExprs, b.withExprs) planFn := func( ctx context.Context, ref tree.RoutineExecFactory, stmtIdx int, args tree.Datums, ) (tree.RoutinePlan, error) { ef := ref.(exec.Factory) eb := New(ctx, ef, b.optimizer, b.mem, b.catalog, input, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML) + eb.withExprs = withExprs eb.disableTelemetry = true eb.planLazySubqueries = true plan, err := eb.buildRelational(input) @@ -722,12 +770,37 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ } } + // Create a tree.RoutinePlanFn that can plan the statements in the UDF body. + // TODO(mgartner): Add support for WITH expressions inside UDF bodies. + planFn := b.buildRoutinePlanFn(udf.Params, udf.Body, false /* allowOuterWithRefs */) + + // Enable stepping for volatile functions so that statements within the UDF + // see mutations made by the invoking statement and by previous executed + // statements. + enableStepping := udf.Volatility == volatility.Volatile + + return tree.NewTypedRoutineExpr( + udf.Name, + args, + planFn, + len(udf.Body), + udf.Typ, + enableStepping, + udf.CalledOnNullInput, + ), nil +} + +// buildRoutinePlanFn returns a tree.RoutinePlanFn that can plan the statements +// in a routine that has one or more arguments. +func (b *Builder) buildRoutinePlanFn( + params opt.ColList, stmts memo.RelListExpr, allowOuterWithRefs bool, +) tree.RoutinePlanFn { // argOrd returns the ordinal of the argument within the arguments list that // can be substituted for each reference to the given function parameter // column. If the given column does not represent a function parameter, // ok=false is returned. argOrd := func(col opt.ColumnID) (ord int, ok bool) { - for i, param := range udf.Params { + for i, param := range params { if col == param { return i, true } @@ -735,8 +808,14 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ return 0, false } - // Create a tree.RoutinePlanFn that can plan the statements in the UDF body. - // We do this planning in a separate memo. We use an exec.Factory passed to + // We will pre-populate the withExprs of the new execbuilder. + var withExprs []builtWithExpr + if allowOuterWithRefs { + withExprs = make([]builtWithExpr, len(b.withExprs)) + copy(withExprs, b.withExprs) + } + + // Plan the statements in a separate memo. We use an exec.Factory passed to // the closure rather than b.factory to support executing plans that are // generated with explain.Factory. // @@ -750,32 +829,57 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ ) (tree.RoutinePlan, error) { o.Init(ctx, b.evalCtx, b.catalog) f := o.Factory() - stmt := udf.Body[stmtIdx] + stmt := stmts[stmtIdx] // Copy the expression into a new memo. Replace parameter references // with argument datums. + addedWithBindings := false var replaceFn norm.ReplaceFunc replaceFn = func(e opt.Expr) opt.Expr { - if v, ok := e.(*memo.VariableExpr); ok { - if ord, ok := argOrd(v.Col); ok { - return f.ConstructConstVal(args[ord], v.Typ) + switch t := e.(type) { + case *memo.VariableExpr: + if ord, ok := argOrd(t.Col); ok { + return f.ConstructConstVal(args[ord], t.Typ) } + + case *memo.WithScanExpr: + // Allow referring to "outer" With expressions, if + // allowOuterWithRefs is true. The bound expressions are not + // part of this Memo, but they are used only for their + // relational properties, which should be valid. + // + // We must add all With expressions to the metadata even if they + // aren't referred to directly because they might be referred to + // transitively through other With expressions. For example, if + // stmt refers to With expression &1, and &1 refers to With + // expression &2, we must include &2 in the metadata so that its + // relational properties are available. See #87733. + // + // We lazily add these With expressions to the metadata here + // because the call to Factory.CopyAndReplace below clears With + // expressions in the metadata. + if allowOuterWithRefs && !addedWithBindings { + b.mem.Metadata().ForEachWithBinding(func(id opt.WithID, expr opt.Expr) { + f.Metadata().AddWithBinding(id, expr) + }) + addedWithBindings = true + } + // Fall through. } return f.CopyAndReplaceDefault(e, replaceFn) } f.CopyAndReplace(stmt, stmt.PhysProps, replaceFn) // Optimize the memo. - newRightSide, err := o.Optimize() + optimizedExpr, err := o.Optimize() if err != nil { return nil, err } // Build the memo into a plan. - // TODO(mgartner): Add support for WITH expressions inside UDF bodies. - // TODO(mgartner): Add support for subqueries inside UDF bodies. ef := ref.(exec.Factory) - eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML) + eb := New(ctx, ef, &o, f.Memo(), b.catalog, optimizedExpr, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML) + eb.withExprs = withExprs eb.disableTelemetry = true eb.planLazySubqueries = true plan, err := eb.Build() @@ -785,26 +889,23 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ // inner expression. fmtFlags := memo.ExprFmtHideQualifications | memo.ExprFmtHideScalars | memo.ExprFmtHideTypes - explainOpt := o.FormatExpr(newRightSide, fmtFlags) + explainOpt := o.FormatExpr(optimizedExpr, fmtFlags) err = errors.WithDetailf(err, "routineExpr:\n%s", explainOpt) } return nil, err } + if len(eb.subqueries) > 0 { + return nil, expectedLazyRoutineError("subquery") + } + if len(eb.cascades) > 0 { + return nil, expectedLazyRoutineError("cascade") + } + if len(eb.checks) > 0 { + return nil, expectedLazyRoutineError("check") + } return plan, nil } - // Enable stepping for volatile functions so that statements within the UDF - // see mutations made by the invoking statement and by previous executed - // statements. - enableStepping := udf.Volatility == volatility.Volatile - return tree.NewTypedRoutineExpr( - udf.Name, - args, - planFn, - len(udf.Body), - udf.Typ, - enableStepping, - udf.CalledOnNullInput, - ), nil + return planFn } func expectedLazyRoutineError(typ string) error { diff --git a/pkg/sql/opt/exec/execbuilder/testdata/subquery b/pkg/sql/opt/exec/execbuilder/testdata/subquery index fecebf6fc26d..b62c35c720a6 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/subquery +++ b/pkg/sql/opt/exec/execbuilder/testdata/subquery @@ -451,3 +451,64 @@ vectorized: true estimated row count: 1,000 (missing stats) table: abc@abc_pkey spans: FULL SCAN + +statement ok +CREATE TABLE corr ( + k INT PRIMARY KEY, + i INT, + FAMILY (k, i) +); +INSERT INTO corr VALUES (1, 10), (2, 22), (3, 30), (4, 40), (5, 50) + +# Case where the subquery in a filter cannot be hoisted into an apply-join. +query T +EXPLAIN (VERBOSE) +SELECT * FROM corr +WHERE CASE WHEN k < 5 THEN k*10 = (SELECT i FROM corr tmp WHERE k = corr.k) END +---- +distribution: local +vectorized: true +· +• filter +│ columns: (k, i) +│ estimated row count: 333 (missing stats) +│ filter: CASE WHEN k < 5 THEN (k * 10) = subquery(k) ELSE CAST(NULL AS BOOL) END +│ +└── • scan + columns: (k, i) + estimated row count: 1,000 (missing stats) + table: corr@corr_pkey + spans: FULL SCAN + +# Case where the subquery in a projection cannot be hoisted into an apply-join. +query T +EXPLAIN (VERBOSE) +SELECT k, i, CASE WHEN k > 1 THEN (SELECT i FROM corr tmp WHERE k = corr.k-1) ELSE 0 END AS prev_i +FROM corr +---- +distribution: local +vectorized: true +· +• render +│ columns: (k, i, prev_i) +│ render prev_i: CASE WHEN k > 1 THEN subquery(k) ELSE 0 END +│ render k: k +│ render i: i +│ +└── • scan + columns: (k, i) + estimated row count: 1,000 (missing stats) + table: corr@corr_pkey + spans: FULL SCAN + +# Each invocation of the subquery is re-optimized, so the scans are constrained +# by constant values substituted for corr.k. +query T kvtrace +SELECT k, i, CASE WHEN k > 1 THEN (SELECT i FROM corr tmp WHERE k = corr.k-1) ELSE 0 END AS prev_i +FROM corr +---- +Scan /Table/110/{1-2} +Scan /Table/110/1/1/0 +Scan /Table/110/1/2/0 +Scan /Table/110/1/3/0 +Scan /Table/110/1/4/0 diff --git a/pkg/testutils/localtestcluster/BUILD.bazel b/pkg/testutils/localtestcluster/BUILD.bazel index 9921218f9843..d16746fc458d 100644 --- a/pkg/testutils/localtestcluster/BUILD.bazel +++ b/pkg/testutils/localtestcluster/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/closedts/sidetransport", + "//pkg/kv/kvserver/kvstorage", "//pkg/kv/kvserver/liveness", "//pkg/roachpb", "//pkg/rpc", diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index 3a64df49b7e0..7683056c3760 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -211,7 +212,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto if err := kvserver.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil { t.Fatalf("unable to write cluster version: %s", err) } - if err := kvserver.InitEngine( + if err := kvstorage.InitEngine( ctx, ltc.Eng, roachpb.StoreIdent{NodeID: nodeID, StoreID: 1}, ); err != nil { t.Fatalf("unable to start local test cluster: %s", err) diff --git a/pkg/util/metric/registry.go b/pkg/util/metric/registry.go index f7f1f89554ff..e8f5e903851d 100644 --- a/pkg/util/metric/registry.go +++ b/pkg/util/metric/registry.go @@ -30,8 +30,18 @@ import ( // when exported to prometheus. type Registry struct { syncutil.Mutex - labels []*prometheusgo.LabelPair + labels []labelPair tracked map[string]Iterable + + // computedLabels get filled in by getLabels(). + // We hold onto the slice to avoid a re-allocation every + // time the metrics get scraped. + computedLabels []*prometheusgo.LabelPair +} + +type labelPair struct { + name string + value interface{} } // Struct can be implemented by the types of members of a metric @@ -43,26 +53,28 @@ type Struct interface { // NewRegistry creates a new Registry. func NewRegistry() *Registry { return &Registry{ - labels: []*prometheusgo.LabelPair{}, - tracked: map[string]Iterable{}, + labels: []labelPair{}, + computedLabels: []*prometheusgo.LabelPair{}, + tracked: map[string]Iterable{}, } } // AddLabel adds a label/value pair for this registry. -func (r *Registry) AddLabel(name, value string) { +func (r *Registry) AddLabel(name string, value interface{}) { r.Lock() defer r.Unlock() - r.labels = append(r.labels, - &prometheusgo.LabelPair{ - Name: proto.String(exportedLabel(name)), - Value: proto.String(value), - }) + r.labels = append(r.labels, labelPair{name: exportedLabel(name), value: value}) + r.computedLabels = append(r.computedLabels, &prometheusgo.LabelPair{}) } func (r *Registry) getLabels() []*prometheusgo.LabelPair { r.Lock() defer r.Unlock() - return r.labels + for i, l := range r.labels { + r.computedLabels[i].Name = proto.String(l.name) + r.computedLabels[i].Value = proto.String(fmt.Sprint(l.value)) + } + return r.computedLabels } // AddMetric adds the passed-in metric to the registry.