72642: democluster: improve logging/tracing r=otan a=knz

All commits except the last three are from #72644.
(Reviewers: only review the last three commits.)

Informs #58938

72727: tracing: misc cleanups r=andreimatei a=andreimatei

See individual commits.

72738: kvserver: include Raft application on leaseholder in request trace  r=andreimatei a=andreimatei

Before this patch, the decoding and application of a Raft command were
not included in the recording of the request that generated the
respective command, even in the case where consensus is synchronous with
respect to the request (i.e. non-AsyncConsensus requests). This was
because, although we plumb tracing information below Raft, the span
under which Raft processing occurred was Fork()ed from the parent
span (i.e. the request evaluation's span). The reasons for this are
not good:

1) forking (as opposed to creating a regular child) was introduced in
   #39425. I'm not sure whether there was a particular reason for this
   decision. Perhaps there was fear at the time about the child
   outliving the parent - although I don't think it does because,
   in the case of async consensus, we fork from a parent which I think
   will outlive the child:
   https://github.com/cockroachdb/cockroach/blame/13669f9c9bd92a4c3b0378a558d7735f122c4e72/pkg/kv/kvserver/replica_raft.go#L157

   Regardless of the exact lifetimes, with time it has become clear
   that it's appropriate to create a child when it is expected to
   usually not outlive the parent, even if, on occasion, it can
   outlive it.

2) forked spans used to be included in the parent's recording until #59815.
   This explains why we regressed here - Raft application used to be
   included in recordings properly.

This patch makes the application span a regular child, so the Raft
application span is now included in the request trace.
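
For readers less familiar with the tracing package, here is a minimal,
hedged sketch of the two ways of deriving a span that are contrasted
above. The helper names mirror pkg/util/tracing, but treat the exact
signatures as assumptions rather than the authoritative API.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// applyWithTracing illustrates the child-vs-fork distinction. Only the
// child span's recording is folded into the parent (request) span.
func applyWithTracing(ctx context.Context) {
	// Regular child: included in the request trace. This is what the
	// patch switches Raft application to.
	childCtx, childSp := tracing.ChildSpan(ctx, "raft application")
	defer childSp.Finish()
	_ = childCtx

	// Forked span: meant for work that may outlive the parent. Since
	// #59815 its recording is no longer included in the parent's, which
	// is why application details went missing from request traces.
	forkCtx, forkSp := tracing.ForkSpan(ctx, "raft application (forked)")
	defer forkSp.Finish()
	_ = forkCtx
}
```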

Touches #70864. That issue asks for a new tracing feature but I think
what motivated it is the application info missing from query traces.
This patch is sufficient for that.

Release note (general change): Tracing transaction commits now includes
details about replication.

72896: server: don't mix Tracers in test tenant r=andreimatei a=andreimatei

Before this patch, TestServer.StartTenant() would share the Stopper
between the server and the tenant, while also giving the tenant a new
Tracer. The Stopper in question carries the server's Tracer, so this
sharing is problematic: the tenant ends up with two different Tracers
and uses either of them fairly arbitrarily at different points.
Intermingling two Tracers like this was never intended, and attempting
to create a child span with a different Tracer than the parent will
become illegal shortly. This shows that sharing Stoppers between
different "servers" is a bad idea. This patch stops the practice.
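
As a hedged sketch of the intended pairing - one Tracer per "server",
carried by that server's own Stopper - the following assumes the
stop.WithTracer option works roughly as shown; treat the option name
and signatures as assumptions, not as the actual fix:

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// newServerStopper pairs a server with exactly one Tracer by building its
// Stopper around that Tracer, instead of reusing another server's Stopper
// (whose Tracer would differ). Hypothetical helper for illustration only.
func newServerStopper() (*stop.Stopper, *tracing.Tracer) {
	tr := tracing.NewTracer()
	return stop.NewStopper(stop.WithTracer(tr)), tr
}
```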

The patch also removes the Tracer config option for test tenants: it
was unused, and supporting it in combination with the Stopper
configuration would require more sanity checks.

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
3 people committed Nov 18, 2021
5 parents ca8f8e4 + d3ce8d3 + d60e17a + f2bc7d9 + b09aacb commit 4d82c5f
Showing 17 changed files with 197 additions and 261 deletions.
1 change: 0 additions & 1 deletion pkg/base/test_server_args.go
@@ -242,7 +242,6 @@ type TestTenantArgs struct {
// Settings allows the caller to control the settings object used for the
// tenant cluster.
Settings *cluster.Settings
Tracer *tracing.Tracer

// AllowSettingClusterSettings, if true, allows the tenant to set in-memory
// cluster settings.
199 changes: 116 additions & 83 deletions pkg/cli/democluster/demo_cluster.go
@@ -226,41 +226,43 @@ func (c *transientCluster) Start(
latencyMapWaitChs := make([]chan struct{}, c.demoCtx.NumNodes)

// Step 1: create the first node.
{
phaseCtx := logtags.AddTag(ctx, "phase", 1)
c.infoLog(phaseCtx, "creating the first node")
phaseCtx := logtags.AddTag(ctx, "phase", 1)
if err := func(ctx context.Context) error {
c.infoLog(ctx, "creating the first node")

latencyMapWaitChs[0] = make(chan struct{})
firstRPCAddrReadyCh, err := c.createAndAddNode(phaseCtx, 0, latencyMapWaitChs[0], timeoutCh)
firstRPCAddrReadyCh, err := c.createAndAddNode(ctx, 0, latencyMapWaitChs[0], timeoutCh)
if err != nil {
return err
}
rpcAddrReadyChs[0] = firstRPCAddrReadyCh
return nil
}(phaseCtx); err != nil {
return err
}

// Step 2: start the first node asynchronously, then wait for RPC
// listen readiness or error.
{
phaseCtx := logtags.AddTag(ctx, "phase", 2)

c.infoLog(phaseCtx, "starting first node")
if err := c.startNodeAsync(phaseCtx, 0, errCh, timeoutCh); err != nil {
return err
}
c.infoLog(phaseCtx, "waiting for first node RPC address")
if err := c.waitForRPCAddrReadinessOrError(phaseCtx, 0, errCh, rpcAddrReadyChs, timeoutCh); err != nil {
phaseCtx = logtags.AddTag(ctx, "phase", 2)
if err := func(ctx context.Context) error {
c.infoLog(ctx, "starting first node")
if err := c.startNodeAsync(ctx, 0, errCh, timeoutCh); err != nil {
return err
}
c.infoLog(ctx, "waiting for first node RPC address")
return c.waitForRPCAddrReadinessOrError(ctx, 0, errCh, rpcAddrReadyChs, timeoutCh)
}(phaseCtx); err != nil {
return err
}

// Step 3: create the other nodes and start them asynchronously.
{
phaseCtx := logtags.AddTag(ctx, "phase", 3)
c.infoLog(phaseCtx, "creating other nodes")
phaseCtx = logtags.AddTag(ctx, "phase", 3)
if err := func(ctx context.Context) error {
c.infoLog(ctx, "creating other nodes")

for i := 1; i < c.demoCtx.NumNodes; i++ {
latencyMapWaitChs[i] = make(chan struct{})
rpcAddrReady, err := c.createAndAddNode(phaseCtx, i, latencyMapWaitChs[i], timeoutCh)
rpcAddrReady, err := c.createAndAddNode(ctx, i, latencyMapWaitChs[i], timeoutCh)
if err != nil {
return err
}
@@ -278,36 +280,40 @@ func (c *transientCluster) Start(

// Start the remaining nodes asynchronously.
for i := 1; i < c.demoCtx.NumNodes; i++ {
if err := c.startNodeAsync(phaseCtx, i, errCh, timeoutCh); err != nil {
if err := c.startNodeAsync(ctx, i, errCh, timeoutCh); err != nil {
return err
}
}
return nil
}(phaseCtx); err != nil {
return err
}

// Step 4: wait for all the nodes to know their RPC address,
// or for an error or premature shutdown.
{
phaseCtx := logtags.AddTag(ctx, "phase", 4)
c.infoLog(phaseCtx, "waiting for remaining nodes to get their RPC address")
phaseCtx = logtags.AddTag(ctx, "phase", 4)
if err := func(ctx context.Context) error {
c.infoLog(ctx, "waiting for remaining nodes to get their RPC address")

for i := 0; i < c.demoCtx.NumNodes; i++ {
if err := c.waitForRPCAddrReadinessOrError(phaseCtx, i, errCh, rpcAddrReadyChs, timeoutCh); err != nil {
if err := c.waitForRPCAddrReadinessOrError(ctx, i, errCh, rpcAddrReadyChs, timeoutCh); err != nil {
return err
}
}
return nil
}(phaseCtx); err != nil {
return err
}

// Step 5: optionally initialize the latency map, then let the servers
// proceed with their initialization.

{
phaseCtx := logtags.AddTag(ctx, "phase", 5)

phaseCtx = logtags.AddTag(ctx, "phase", 5)
if err := func(ctx context.Context) error {
// If latency simulation is requested, initialize the latency map.
if c.demoCtx.SimulateLatency {
// Now, all servers have been started enough to know their own RPC serving
// addresses, but nothing else. Assemble the artificial latency map.
c.infoLog(phaseCtx, "initializing latency map")
c.infoLog(ctx, "initializing latency map")
for i, serv := range c.servers {
latencyMap := serv.Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
srcLocality, ok := serv.Cfg.Locality.Find("region")
@@ -331,92 +337,115 @@
}
}
}
return nil
}(phaseCtx); err != nil {
return err
}

{
phaseCtx := logtags.AddTag(ctx, "phase", 6)

// Step 6: cluster initialization.
phaseCtx = logtags.AddTag(ctx, "phase", 6)
if err := func(ctx context.Context) error {
for i := 0; i < c.demoCtx.NumNodes; i++ {
c.infoLog(phaseCtx, "letting server %d initialize", i)
c.infoLog(ctx, "letting server %d initialize", i)
close(latencyMapWaitChs[i])
if err := c.waitForNodeIDReadiness(phaseCtx, i, errCh, timeoutCh); err != nil {
if err := c.waitForNodeIDReadiness(ctx, i, errCh, timeoutCh); err != nil {
return err
}
c.infoLog(phaseCtx, "node n%d initialized", c.servers[i].NodeID())
c.infoLog(ctx, "node n%d initialized", c.servers[i].NodeID())
}
return nil
}(phaseCtx); err != nil {
return err
}

{
phaseCtx := logtags.AddTag(ctx, "phase", 7)

// Step 7: wait for SQL to signal ready.
phaseCtx = logtags.AddTag(ctx, "phase", 7)
if err := func(ctx context.Context) error {
for i := 0; i < c.demoCtx.NumNodes; i++ {
c.infoLog(phaseCtx, "waiting for server %d SQL readiness", i)
if err := c.waitForSQLReadiness(phaseCtx, i, errCh, timeoutCh); err != nil {
c.infoLog(ctx, "waiting for server %d SQL readiness", i)
if err := c.waitForSQLReadiness(ctx, i, errCh, timeoutCh); err != nil {
return err
}
c.infoLog(phaseCtx, "node n%d ready", c.servers[i].NodeID())
c.infoLog(ctx, "node n%d ready", c.servers[i].NodeID())
}
return nil
}(phaseCtx); err != nil {
return err
}

const demoUsername = "demo"
demoPassword := genDemoPassword(demoUsername)

if c.demoCtx.Multitenant {
phaseCtx := logtags.AddTag(ctx, "phase", 8)
c.infoLog(phaseCtx, "starting tenant nodes")

c.tenantServers = make([]serverutils.TestTenantInterface, c.demoCtx.NumNodes)
for i := 0; i < c.demoCtx.NumNodes; i++ {
// Steal latency map from the neighboring server.
latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
c.infoLog(phaseCtx, "starting tenant node %d", i)
ts, err := c.servers[i].StartTenant(ctx, base.TestTenantArgs{
// We set the tenant ID to i+2, since tenant 0 is not a tenant, and
// tenant 1 is the system tenant.
TenantID: roachpb.MakeTenantID(uint64(i + 2)),
Stopper: c.stopper,
ForceInsecure: c.demoCtx.Insecure,
SSLCertsDir: c.demoDir,
Locality: c.demoCtx.Localities[i],
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{DisableLogTags: true},
Server: &server.TestingKnobs{
ContextTestingKnobs: rpc.ContextTestingKnobs{
ArtificialLatencyMap: latencyMap,
// Step 8: initialize tenant servers, if enabled.
phaseCtx = logtags.AddTag(ctx, "phase", 8)
if err := func(ctx context.Context) error {
if c.demoCtx.Multitenant {
c.infoLog(ctx, "starting tenant nodes")

c.tenantServers = make([]serverutils.TestTenantInterface, c.demoCtx.NumNodes)
for i := 0; i < c.demoCtx.NumNodes; i++ {
latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
c.infoLog(ctx, "starting tenant node %d", i)
ts, err := c.servers[i].StartTenant(ctx, base.TestTenantArgs{
// We set the tenant ID to i+2, since tenant 0 is not a tenant, and
// tenant 1 is the system tenant.
TenantID: roachpb.MakeTenantID(uint64(i + 2)),
Stopper: c.stopper,
ForceInsecure: c.demoCtx.Insecure,
SSLCertsDir: c.demoDir,
Locality: c.demoCtx.Localities[i],
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{DisableLogTags: true},
Server: &server.TestingKnobs{
ContextTestingKnobs: rpc.ContextTestingKnobs{
ArtificialLatencyMap: latencyMap,
},
},
},
},
})
if err != nil {
return err
}
c.tenantServers[i] = ts
c.infoLog(phaseCtx, "started tenant %d: %s", i, ts.SQLAddr())
if !c.demoCtx.Insecure {
// Set up the demo username and password on each tenant.
ie := ts.DistSQLServer().(*distsql.ServerImpl).ServerConfig.Executor
_, err = ie.Exec(ctx, "tenant-password", nil,
fmt.Sprintf("CREATE USER %s WITH PASSWORD %s", demoUsername, demoPassword))
})
if err != nil {
return err
}
_, err = ie.Exec(ctx, "tenant-grant", nil, fmt.Sprintf("GRANT admin TO %s", demoUsername))
if err != nil {
return err
c.tenantServers[i] = ts
c.infoLog(ctx, "started tenant %d: %s", i, ts.SQLAddr())

// Propagate the tenant server tags to the initialization
// context, so that the initialization messages below are
// properly annotated in traces.
ctx = ts.AnnotateCtx(ctx)

if !c.demoCtx.Insecure {
// Set up the demo username and password on each tenant.
ie := ts.DistSQLServer().(*distsql.ServerImpl).ServerConfig.Executor
_, err = ie.Exec(ctx, "tenant-password", nil,
fmt.Sprintf("CREATE USER %s WITH PASSWORD %s", demoUsername, demoPassword))
if err != nil {
return err
}
_, err = ie.Exec(ctx, "tenant-grant", nil, fmt.Sprintf("GRANT admin TO %s", demoUsername))
if err != nil {
return err
}
}
}
}
return nil
}(phaseCtx); err != nil {
return err
}

{
phaseCtx := logtags.AddTag(ctx, "phase", 9)

// Step 9: run SQL initialization.
phaseCtx = logtags.AddTag(ctx, "phase", 9)
if err := func(ctx context.Context) error {
// Run the SQL initialization. This takes care of setting up the
// initial replication factor for small clusters and creating the
// admin user.
c.infoLog(phaseCtx, "running initial SQL for demo cluster")
c.infoLog(ctx, "running initial SQL for demo cluster")
// Propagate the server log tags to the operations below, to include node ID etc.
server := c.firstServer.Server
ctx = server.AnnotateCtx(ctx)

if err := runInitialSQL(phaseCtx, c.firstServer.Server, c.demoCtx.NumNodes < 3, demoUsername, demoPassword); err != nil {
if err := runInitialSQL(ctx, server, c.demoCtx.NumNodes < 3, demoUsername, demoPassword); err != nil {
return err
}
if c.demoCtx.Insecure {
@@ -446,9 +475,13 @@ func (c *transientCluster) Start(
// We don't do this in (*server.Server).Start() because we don't want this
// overhead and possible interference in tests.
if !c.demoCtx.DisableTelemetry {
c.infoLog(phaseCtx, "starting telemetry")
c.firstServer.StartDiagnostics(phaseCtx)
c.infoLog(ctx, "starting telemetry")
c.firstServer.StartDiagnostics(ctx)
}

return nil
}(phaseCtx); err != nil {
return err
}
return nil
}
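
The recurring shape in the demo_cluster.go hunks above is worth calling
out: each startup phase tags the context with a "phase" number and runs
its body in a closure, so the phase-tagged context cannot leak into
later phases and errors unwind through a single return path. A minimal
sketch of the pattern follows; the phase bodies are placeholders, not
the real ones.

```go
package example

import (
	"context"

	"github.com/cockroachdb/logtags"
)

// runPhases shows the per-phase pattern used by (*transientCluster).Start:
// tag the context, run the phase in a closure, stop on the first error.
func runPhases(ctx context.Context) error {
	phases := []func(context.Context) error{
		func(ctx context.Context) error { return nil }, // e.g. create the first node
		func(ctx context.Context) error { return nil }, // e.g. start it, wait for its RPC address
	}
	for i, body := range phases {
		phaseCtx := logtags.AddTag(ctx, "phase", i+1)
		if err := body(phaseCtx); err != nil {
			return err
		}
	}
	return nil
}
```
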
18 changes: 14 additions & 4 deletions pkg/kv/kvserver/apply/cmd.go
@@ -24,6 +24,14 @@ type Command interface {
// that were locally proposed typically have a client waiting on a
// response, so there is additional urgency to apply them quickly.
IsLocal() bool
// Ctx returns the Context in which operations on this Command should be
// performed.
//
// A Command does the unusual thing of capturing a Context because commands
// are generally processed in batches, but different commands might want their
// events going to different places. In particular, commands that have been
// proposed locally get a tracing span tied to the local proposal.
Ctx() context.Context
// AckErrAndFinish signals that the application of the command has been
// rejected due to the provided error. It also relays this rejection of
// the command to its client if it was proposed locally. An error will
@@ -167,12 +175,13 @@ func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIter
// responsible for converting Commands into CheckedCommand. The function
// closes the provided iterator.
func mapCmdIter(
iter CommandIterator, fn func(Command) (CheckedCommand, error),
iter CommandIterator, fn func(context.Context, Command) (CheckedCommand, error),
) (CheckedCommandIterator, error) {
defer iter.Close()
ret := iter.NewCheckedList()
for iter.Valid() {
checked, err := fn(iter.Cur())
cur := iter.Cur()
checked, err := fn(cur.Ctx(), cur)
if err != nil {
ret.Close()
return nil, err
@@ -188,12 +197,13 @@ func mapCheckedCmdIter(
// is responsible for converting CheckedCommand into AppliedCommand. The
// function closes the provided iterator.
func mapCheckedCmdIter(
iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error),
iter CheckedCommandIterator, fn func(context.Context, CheckedCommand) (AppliedCommand, error),
) (AppliedCommandIterator, error) {
defer iter.Close()
ret := iter.NewAppliedList()
for iter.Valid() {
applied, err := fn(iter.CurChecked())
curChecked := iter.CurChecked()
applied, err := fn(curChecked.Ctx(), curChecked)
if err != nil {
ret.Close()
return nil, err
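
To illustrate what the new Ctx() accessor buys us, here is a hedged
sketch of a Command implementation; the type and field names are
hypothetical stand-ins, not the actual kvserver types. A locally
proposed command hands back the proposer's traced context, so events
emitted during application land in the originating request's trace.

```go
package example

import "context"

// replicatedCmd is a hypothetical Command implementation illustrating Ctx().
type replicatedCmd struct {
	// proposalCtx is set only for locally proposed commands; it carries the
	// tracing span of the request that proposed the command.
	proposalCtx context.Context
}

func (c *replicatedCmd) IsLocal() bool { return c.proposalCtx != nil }

// Ctx returns the context under which the command should be applied: the
// proposer's traced context for local commands, a background context
// otherwise.
func (c *replicatedCmd) Ctx() context.Context {
	if c.proposalCtx != nil {
		return c.proposalCtx
	}
	return context.Background()
}
```
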
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/apply/task.go
@@ -52,7 +52,7 @@ type StateMachine interface {
// an untimely crash. This means that applying these side-effects will
// typically update the in-memory representation of the state machine
// to the same state that it would be in if the process restarted.
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
ApplySideEffects(context.Context, CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects which will stop the task
@@ -67,7 +67,7 @@ var ErrRemoved = errors.New("replica removed")
type Batch interface {
// Stage inserts a Command into the Batch. In doing so, the Command is
// checked for rejection and a CheckedCommand is returned.
Stage(Command) (CheckedCommand, error)
Stage(context.Context, Command) (CheckedCommand, error)
// ApplyToStateMachine applies the persistent state transitions staged
// in the Batch to the StateMachine, atomically.
ApplyToStateMachine(context.Context) error
@@ -225,7 +225,7 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde
// want to retry the command instead of returning the error to the client.
return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error {
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
return cmd.AckSuccess(ctx)
return cmd.AckSuccess(cmd.Ctx())
}
return nil
})
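
With the signature changes above, batching code has a per-command
context available. A hedged sketch of how an implementation might use
it follows; the types are stand-ins, and log.Eventf is assumed to
record into the tracing span carried by ctx.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// checkedCmd is a stand-in for a checked command carrying its own context.
type checkedCmd struct{ ctx context.Context }

func (c *checkedCmd) Ctx() context.Context { return c.ctx }

// applySideEffects sketches using the per-command context: events recorded
// via ctx attach to the proposing request's trace for local commands.
func applySideEffects(ctx context.Context, c *checkedCmd) (*checkedCmd, error) {
	log.Eventf(ctx, "applying side effects")
	return c, nil
}
```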