Skip to content

Commit

Permalink
hlc: improve scalability of the HLC
Browse files Browse the repository at this point in the history
Previously HLC was calling Now() within a mutex.
This was a bottleneck in case multiple callers had to get time
simultaneously. To address this, this patch moves the calls to Now()
outside of the mutex but keeps in place the existing check.

Previosuly HLC was also calling Now() when receiving a hybrid timestamp
update. Calling Now() can be postponed until a client reads the HLC clock
if the client does not need to check the max offset. This is now fixed
so Now() is called only in case that the max offset has to be checked.

Release note: None
  • Loading branch information
darinpp committed Apr 10, 2020
1 parent edf8eca commit 38a4cbe
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 130 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Store) MergeRange(
// invariant that our clock is always greater than or equal to any
// timestamps in the timestamp cache. For a full discussion, see the comment
// on TestStoreRangeMergeTimestampCacheCausality.
_ = s.Clock().Update(freezeStart)
s.Clock().Update(freezeStart)
setTimestampCacheLowWaterMark(s.tsCache, &rightDesc, freezeStart)
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
Expand Down Expand Up @@ -93,17 +92,14 @@ func (s *Store) Send(

// Update our clock with the incoming request timestamp. This advances the
// local node's clock to a high water mark from all nodes with which it has
// interacted. We hold on to the resulting timestamp - we know that any
// write with a higher timestamp we run into later must have started after
// this point in (absolute) time.
var now hlc.Timestamp
// interacted.
if s.cfg.TestingKnobs.DisableMaxOffsetCheck {
now = s.cfg.Clock.Update(ba.Timestamp)
s.cfg.Clock.Update(ba.Timestamp)
} else {
// If the command appears to come from a node with a bad clock,
// reject it now before we reach that point.
var err error
if now, err = s.cfg.Clock.UpdateAndCheckMaxOffset(ba.Timestamp); err != nil {
if err = s.cfg.Clock.UpdateAndCheckMaxOffset(ctx, ba.Timestamp); err != nil {
return nil, roachpb.NewError(err)
}
}
Expand Down Expand Up @@ -144,6 +140,10 @@ func (s *Store) Send(
}
}

// We get the latest timestamp - we know that any
// write with a higher timestamp we run into later must
// have started after this point in (absolute) time.
now := s.cfg.Clock.Now()
if pErr != nil {
pErr.Now = now
} else {
Expand All @@ -160,7 +160,7 @@ func (s *Store) Send(
// this node, in which case the following is a no-op).
if _, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); !ok {
txnClone := ba.Txn.Clone()
txnClone.UpdateObservedTimestamp(ba.Replica.NodeID, now)
txnClone.UpdateObservedTimestamp(ba.Replica.NodeID, s.cfg.Clock.Now())
ba.Txn = txnClone
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ func (s *Server) startMonitoringForwardClockJumps(ctx context.Context) error {
})

if err := s.clock.StartMonitoringForwardClockJumps(
ctx,
forwardJumpCheckEnabled,
time.NewTicker,
nil, /* tick callback */
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func runPlanInsidePlan(
params.extendedEvalCtx.ExecCfg.LeaseHolderCache,
params.p.Txn(),
func(ts hlc.Timestamp) {
_ = params.extendedEvalCtx.ExecCfg.Clock.Update(ts)
params.extendedEvalCtx.ExecCfg.Clock.Update(ts)
},
params.p.extendedEvalCtx.Tracing,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func (sc *SchemaChanger) distBackfill(
sc.leaseHolderCache,
nil, /* txn - the flow does not run wholly in a txn */
func(ts hlc.Timestamp) {
_ = sc.clock.Update(ts)
sc.clock.Update(ts)
},
evalCtx.Tracing,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
ex.server.cfg.RangeDescriptorCache, ex.server.cfg.LeaseHolderCache,
planner.txn,
func(ts hlc.Timestamp) {
_ = ex.server.cfg.Clock.Update(ts)
ex.server.cfg.Clock.Update(ts)
},
&ex.sessionTracing,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats(
evalCtx.ExecCfg.LeaseHolderCache,
txn,
func(ts hlc.Timestamp) {
_ = evalCtx.ExecCfg.Clock.Update(ts)
evalCtx.ExecCfg.Clock.Update(ts)
},
evalCtx.Tracing,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
execCfg.LeaseHolderCache,
txn,
func(ts hlc.Timestamp) {
_ = execCfg.Clock.Update(ts)
execCfg.Clock.Update(ts)
},
p.ExtendedEvalContext().Tracing,
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/explain_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
execCfg.LeaseHolderCache,
params.p.txn,
func(ts hlc.Timestamp) {
_ = execCfg.Clock.Update(ts)
execCfg.Clock.Update(ts)
},
params.extendedEvalCtx.Tracing,
)
Expand Down Expand Up @@ -172,7 +172,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
execCfg.LeaseHolderCache,
newParams.p.txn,
func(ts hlc.Timestamp) {
_ = execCfg.Clock.Update(ts)
execCfg.Clock.Update(ts)
},
newParams.extendedEvalCtx.Tracing,
)
Expand Down Expand Up @@ -225,7 +225,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error {
execCfg.LeaseHolderCache,
params.p.txn,
func(ts hlc.Timestamp) {
_ = execCfg.Clock.Update(ts)
execCfg.Clock.Update(ts)
},
params.extendedEvalCtx.Tracing,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (sc *SchemaChanger) maybeBackfillCreateTableAs(
sc.execCfg.LeaseHolderCache,
txn,
func(ts hlc.Timestamp) {
_ = sc.clock.Update(ts)
sc.clock.Update(ts)
},
// Make a session tracing object on-the-fly. This is OK
// because it sets "enabled: false" and thus none of the
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/scrub.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func scrubRunDistSQL(
p.ExecCfg().LeaseHolderCache,
p.txn,
func(ts hlc.Timestamp) {
_ = p.ExecCfg().Clock.Update(ts)
p.ExecCfg().Clock.Update(ts)
},
p.extendedEvalCtx.Tracing,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dsp *DistSQLPlanner) Exec(
execCfg.LeaseHolderCache,
p.txn,
func(ts hlc.Timestamp) {
_ = execCfg.Clock.Update(ts)
execCfg.Clock.Update(ts)
},
p.ExtendedEvalContext().Tracing,
)
Expand Down
Loading

0 comments on commit 38a4cbe

Please sign in to comment.