Skip to content

Commit

Permalink
table: introduce RowIDShardGenerator and ReservedRowIDAlloc to al…
Browse files Browse the repository at this point in the history
…loc auto row id (#54789)

ref #54397
  • Loading branch information
lcwangchao authored Jul 24, 2024
1 parent e6e8f7f commit 47179ae
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 41 deletions.
7 changes: 4 additions & 3 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3526,15 +3526,16 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
// Non-clustered table / not unique _tidb_rowid for the whole table
// Generate new _tidb_rowid if exists.
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
if reserved, ok := w.tblCtx.GetReservedRowIDAlloc(); ok && reserved.Exhausted() {
// TODO: Which autoid allocator to use?
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
// Keep using the original table's allocator
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.tblCtx, w.reorgedTbl, ids)
var baseRowID, maxRowID int64
baseRowID, maxRowID, err = tables.AllocHandleIDs(w.ctx, w.tblCtx, w.reorgedTbl, ids)
if err != nil {
return false, errors.Trace(err)
}
reserved.Reset(baseRowID, maxRowID)
}
recordID, err := tables.AllocHandle(w.ctx, w.tblCtx, w.reorgedTbl)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F
if err != nil {
return 0, err
}
currentShard := e.Ctx().GetSessionVars().GetCurrentShard(1)
currentShard := e.Ctx().GetSessionVars().GetRowIDShardGenerator().GetCurrentShard(1)
return shardFmt.Compose(currentShard, autoRandomID), nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ go_test(
],
embed = [":stmtctx"],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/errctx",
"//pkg/kv",
Expand Down
34 changes: 30 additions & 4 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,33 @@ func (rf *ReferenceCount) UnFreeze() {
atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference)
}

// ReservedRowIDAlloc is used to reserve autoID for the auto_increment column.
type ReservedRowIDAlloc struct {
base int64
max int64
}

// Reset resets the base and max of reserved rowIDs.
func (r *ReservedRowIDAlloc) Reset(base int64, max int64) {
r.base = base
r.max = max
}

// Consume consumes a reserved rowID.
// If the second return value is false, it means the reserved rowID is exhausted.
func (r *ReservedRowIDAlloc) Consume() (int64, bool) {
if r.base < r.max {
r.base++
return r.base, true
}
return 0, false
}

// Exhausted returns whether the reserved rowID is exhausted.
func (r *ReservedRowIDAlloc) Exhausted() bool {
return r.base >= r.max
}

// StatementContext contains variables for a statement.
// It should be reset before executing a statement.
type StatementContext struct {
Expand Down Expand Up @@ -223,8 +250,8 @@ type StatementContext struct {
// InsertID is the given insert ID of an auto_increment column.
InsertID uint64

BaseRowID int64
MaxRowID int64
// ReservedRowIDAlloc is used to alloc auto ID from the reserved IDs.
ReservedRowIDAlloc ReservedRowIDAlloc

// Copied from SessionVars.TimeZone.
Priority mysql.PriorityEnum
Expand Down Expand Up @@ -972,8 +999,7 @@ func (sc *StatementContext) resetMuForRetry() {
// ResetForRetry resets the changed states during execution.
func (sc *StatementContext) ResetForRetry() {
sc.resetMuForRetry()
sc.MaxRowID = 0
sc.BaseRowID = 0
sc.ReservedRowIDAlloc.Reset(0, 0)
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
sc.TaskID = AllocateTaskID()
Expand Down
26 changes: 26 additions & 0 deletions pkg/sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,32 @@ func TestErrCtx(t *testing.T) {
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
}

func TestReservedRowIDAlloc(t *testing.T) {
var reserved stmtctx.ReservedRowIDAlloc
// no reserved by default
require.True(t, reserved.Exhausted())
id, ok := reserved.Consume()
require.False(t, ok)
require.Equal(t, int64(0), id)
// reset some ids
reserved.Reset(12, 15)
require.False(t, reserved.Exhausted())
id, ok = reserved.Consume()
require.True(t, ok)
require.Equal(t, int64(13), id)
id, ok = reserved.Consume()
require.True(t, ok)
require.Equal(t, int64(14), id)
id, ok = reserved.Consume()
require.True(t, ok)
require.Equal(t, int64(15), id)
// exhausted
require.True(t, reserved.Exhausted())
id, ok = reserved.Consume()
require.False(t, ok)
require.Equal(t, int64(0), id)
}

func BenchmarkErrCtx(b *testing.B) {
sc := stmtctx.NewStmtCtx()

Expand Down
72 changes: 53 additions & 19 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/kvcache"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/memory"
Expand Down Expand Up @@ -203,11 +204,6 @@ type TxnCtxNoNeedToRestore struct {
StartTS uint64
StaleReadTs uint64

// ShardStep indicates the max size of continuous rowid shard in one transaction.
ShardStep int
shardRemain int
currentShard int64

// unchangedKeys is used to store the unchanged keys that needs to lock for pessimistic transaction.
unchangedKeys map[string]struct{}

Expand Down Expand Up @@ -269,24 +265,62 @@ type SavepointRecord struct {
TxnCtxSavepoint TxnCtxNeedToRestore
}

// GetCurrentShard returns the shard for the next `count` IDs.
func (s *SessionVars) GetCurrentShard(count int) int64 {
tc := s.TxnCtx
if s.shardRand == nil {
s.shardRand = rand.New(rand.NewSource(int64(tc.StartTS))) // #nosec G404
// RowIDShardGenerator is used to generate shard for row id.
type RowIDShardGenerator struct {
// shardRand is used for generated rand shard
shardRand *rand.Rand
// shardStep indicates the max size of continuous rowid shard in one transaction.
shardStep int
shardRemain int
currentShard int64
}

// NewRowIDShardGenerator creates a new RowIDShardGenerator.
func NewRowIDShardGenerator(shardRand *rand.Rand, step int) *RowIDShardGenerator {
intest.AssertNotNil(shardRand)
return &RowIDShardGenerator{
shardRand: shardRand,
shardStep: step,
}
if tc.shardRemain <= 0 {
tc.updateShard(s.shardRand)
tc.shardRemain = tc.ShardStep
}

// SetShardStep sets the step of shard
func (s *RowIDShardGenerator) SetShardStep(step int) {
s.shardStep = step
s.shardRemain = 0
}

// GetShardStep returns the shard step
func (s *RowIDShardGenerator) GetShardStep() int {
return s.shardStep
}

// GetCurrentShard returns the shard for the next `count` IDs.
func (s *RowIDShardGenerator) GetCurrentShard(count int) int64 {
if s.shardRemain <= 0 {
s.updateShard(s.shardRand)
s.shardRemain = s.GetShardStep()
}
tc.shardRemain -= count
return tc.currentShard
s.shardRemain -= count
return s.currentShard
}

func (tc *TransactionContext) updateShard(shardRand *rand.Rand) {
func (s *RowIDShardGenerator) updateShard(shardRand *rand.Rand) {
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], shardRand.Uint64())
tc.currentShard = int64(murmur3.Sum32(buf[:]))
s.currentShard = int64(murmur3.Sum32(buf[:]))
}

// GetRowIDShardGenerator shard row id generator
func (s *SessionVars) GetRowIDShardGenerator() *RowIDShardGenerator {
if s.shardGenerator != nil {
return s.shardGenerator
}

intest.Assert(s.TxnCtx.StartTS > 0)
r := rand.New(rand.NewSource(int64(s.TxnCtx.StartTS))) // #nosec G404
s.shardGenerator = NewRowIDShardGenerator(r, int(s.ShardAllocateStep))
return s.shardGenerator
}

// AddUnchangedKeyForLock adds an unchanged key for pessimistic lock.
Expand Down Expand Up @@ -1514,8 +1548,8 @@ type SessionVars struct {
// StoreBatchSize indicates the batch size limit of store batch, set this field to 0 to disable store batch.
StoreBatchSize int

// shardRand is used by TxnCtx, for the GetCurrentShard() method.
shardRand *rand.Rand
// shardGenerator indicates to generate shard for row id.
shardGenerator *RowIDShardGenerator

// Resource group name
// NOTE: all statement relate operation should use StmtCtx.ResourceGroupName instead.
Expand Down
25 changes: 25 additions & 0 deletions pkg/sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package variable_test

import (
"context"
"math/rand"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -602,3 +603,27 @@ func TestMapDeltaCols(t *testing.T) {
}
}
}

func TestRowIDShardGenerator(t *testing.T) {
g := variable.NewRowIDShardGenerator(rand.New(rand.NewSource(12345)), 128) // #nosec G404)
// default settings
require.Equal(t, 128, g.GetShardStep())
shard := g.GetCurrentShard(127)
require.Equal(t, int64(3535546008), shard)
require.Equal(t, shard, g.GetCurrentShard(1))
// reset alloc step
g.SetShardStep(5)
require.Equal(t, 5, g.GetShardStep())
// generate shard in step
shard = g.GetCurrentShard(1)
require.Equal(t, int64(1371624976), shard)
require.Equal(t, shard, g.GetCurrentShard(1))
require.Equal(t, shard, g.GetCurrentShard(1))
require.Equal(t, shard, g.GetCurrentShard(2))
// generate shard in next step
shard = g.GetCurrentShard(1)
require.Equal(t, int64(895725277), shard)
// set step will reset clear remain
g.SetShardStep(5)
require.NotEqual(t, shard, g.GetCurrentShard(1))
}
2 changes: 1 addition & 1 deletion pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
CreateTime: time.Now(),
InfoSchema: p.infoSchema,
ShardStep: int(sessVars.ShardAllocateStep),
TxnScope: sessVars.CheckAndGetTxnScope(),
},
}
Expand Down Expand Up @@ -295,6 +294,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
sessVars := p.sctx.GetSessionVars()
sessVars.TxnCtxMu.Lock()
sessVars.TxnCtx.StartTS = txn.StartTS()
sessVars.GetRowIDShardGenerator().SetShardStep(int(sessVars.ShardAllocateStep))
sessVars.TxnCtxMu.Unlock()
if sessVars.MemDBFootprint != nil {
sessVars.MemDBFootprint.Detach()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sessiontxn/isolation/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (a *txnAssert[T]) Check(t testing.TB) {
require.Equal(t, a.isolation, txnCtx.Isolation)
require.Equal(t, a.isolation != "", txnCtx.IsPessimistic)
require.Equal(t, sessVars.CheckAndGetTxnScope(), txnCtx.TxnScope)
require.Equal(t, sessVars.ShardAllocateStep, int64(txnCtx.ShardStep))
require.Equal(t, sessVars.ShardAllocateStep, int64(sessVars.GetRowIDShardGenerator().GetShardStep()))
require.False(t, txnCtx.IsStaleness)
require.GreaterOrEqual(t, txnCtx.CreateTime.UnixNano(), a.minStartTime.UnixNano())
require.Equal(t, a.inTxn, sessVars.InTxn())
Expand Down
2 changes: 1 addition & 1 deletion pkg/sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error {
InfoSchema: is,
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(sessVars.ShardAllocateStep),
IsStaleness: true,
TxnScope: txnScope,
},
}
sessVars.GetRowIDShardGenerator().SetShardStep(int(sessVars.ShardAllocateStep))
sessVars.TxnCtxMu.Unlock()

if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, is); interceptor != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/table/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/tablecodec",
"//pkg/types",
Expand Down
5 changes: 5 additions & 0 deletions pkg/table/context/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tidb/pkg/util/tableutil"
Expand Down Expand Up @@ -79,6 +80,10 @@ type MutateContext interface {
// which is a buffer for table related structures that aims to reuse memory and
// saves allocation.
GetMutateBuffers() *MutateBuffers
// GetRowIDShardGenerator returns the `RowIDShardGenerator` object to shard rows.
GetRowIDShardGenerator() *variable.RowIDShardGenerator
// GetReservedRowIDAlloc returns the `ReservedRowIDAlloc` object to allocate row id from reservation.
GetReservedRowIDAlloc() (*stmtctx.ReservedRowIDAlloc, bool)
// GetBinlogSupport returns a `BinlogSupport` if the context supports it.
// If the context does not support binlog, the second return value will be false.
GetBinlogSupport() (BinlogSupport, bool)
Expand Down
2 changes: 2 additions & 0 deletions pkg/table/contextimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ go_library(
"//pkg/expression/context",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/table/context",
"//pkg/util/intest",
"//pkg/util/tableutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
Expand Down
20 changes: 20 additions & 0 deletions pkg/table/contextimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table/context"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tipb/go-binlog"
)
Expand Down Expand Up @@ -90,6 +92,24 @@ func (ctx *TableContextImpl) GetMutateBuffers() *context.MutateBuffers {
return ctx.mutateBuffers
}

// GetRowIDShardGenerator implements the MutateContext interface.
func (ctx *TableContextImpl) GetRowIDShardGenerator() *variable.RowIDShardGenerator {
return ctx.vars().GetRowIDShardGenerator()
}

// GetReservedRowIDAlloc implements the MutateContext interface.
func (ctx *TableContextImpl) GetReservedRowIDAlloc() (*stmtctx.ReservedRowIDAlloc, bool) {
if sc := ctx.vars().StmtCtx; sc != nil {
return &sc.ReservedRowIDAlloc, true
}
// `StmtCtx` should not be nil in the `variable.SessionVars`.
// We just put an assertion that will panic only if in test here.
// In production code, here returns (nil, false) to make code safe
// because some old code checks `StmtCtx != nil` but we don't know why.
intest.Assert(false, "SessionVars.StmtCtx should not be nil")
return nil, false
}

// GetBinlogSupport implements the MutateContext interface.
func (ctx *TableContextImpl) GetBinlogSupport() (context.BinlogSupport, bool) {
failpoint.Inject("forceWriteBinlog", func() {
Expand Down
7 changes: 7 additions & 0 deletions pkg/table/contextimpl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ func TestMutateContextImplFields(t *testing.T) {
require.Equal(t, sctx.GetSessionVars().IsRowLevelChecksumEnabled(), cfg.IsRowLevelChecksumEnabled)
// mutate buffers
require.NotNil(t, ctx.GetMutateBuffers())
// RowIDShardGenerator
sctx.GetSessionVars().TxnCtx.StartTS = 123
require.Same(t, sctx.GetSessionVars().GetRowIDShardGenerator(), ctx.GetRowIDShardGenerator())
// ReservedRowIDAlloc
reserved, ok := ctx.GetReservedRowIDAlloc()
require.True(t, ok)
require.Same(t, &sctx.GetSessionVars().StmtCtx.ReservedRowIDAlloc, reserved)
// statistics support
txnCtx := sctx.GetSessionVars().TxnCtx
txnCtx.TableDeltaMap = make(map[int64]variable.TableDelta)
Expand Down
Loading

0 comments on commit 47179ae

Please sign in to comment.