Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

table: make AddRecordOpt/UpdateRecordOpt/CreateIdxOpt immutable #55325

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions pkg/table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,46 +32,54 @@ type IndexIterator interface {
// CreateIdxOpt contains the options will be used when creating an index.
type CreateIdxOpt struct {
commonMutateOpt
IgnoreAssertion bool
FromBackFill bool
ignoreAssertion bool
fromBackFill bool
}

// NewCreateIdxOpt creates a new CreateIdxOpt.
func NewCreateIdxOpt(opts ...CreateIdxOption) *CreateIdxOpt {
opt := &CreateIdxOpt{}
for _, o := range opts {
o.ApplyCreateIdxOpt(opt)
o.applyCreateIdxOpt(opt)
}
return opt
}

// IgnoreAssertion indicates whether to ignore assertion.
func (opt *CreateIdxOpt) IgnoreAssertion() bool {
return opt.ignoreAssertion
}

// FromBackFill indicates whether the index is created by DDL backfill worker.
func (opt *CreateIdxOpt) FromBackFill() bool {
return opt.fromBackFill
}

// CreateIdxOption is defined for the Create() method of the Index interface.
type CreateIdxOption interface {
ApplyCreateIdxOpt(*CreateIdxOpt)
applyCreateIdxOpt(*CreateIdxOpt)
}

// CreateIdxOptFunc is defined for the Create() method of Index interface.
// Here is a blog post about how to use this pattern:
// https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type CreateIdxOptFunc func(*CreateIdxOpt)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also delete CreateIdxOptFunc to avoid some operations like WithIgnoreAssertion(createIdxOpt) to modify the internal state of CreateIdxOpt.

type withIgnoreAssertion struct{}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CreateIdxOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
f(opt)
func (withIgnoreAssertion) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.ignoreAssertion = true
}

// WithIgnoreAssertion uses to indicate the process can ignore assertion.
var WithIgnoreAssertion CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.IgnoreAssertion = true
var WithIgnoreAssertion CreateIdxOption = withIgnoreAssertion{}

type fromBackfill struct{}

func (fromBackfill) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.fromBackFill = true
}

// FromBackfill indicates that the index is created by DDL backfill worker.
// In the backfill-merge process, the index KVs from DML will be redirected to
// the temp index. On the other hand, the index KVs from DDL backfill worker should
// never be redirected to the temp index.
var FromBackfill CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.FromBackFill = true
}
var FromBackfill CreateIdxOption = fromBackfill{}

// Index is the interface for index data on KV store.
type Index interface {
Expand Down
77 changes: 51 additions & 26 deletions pkg/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,52 +121,77 @@ type RecordIterFunc func(h kv.Handle, rec []types.Datum, cols []*Column) (more b

// commonMutateOpt is the common options for mutating a table.
type commonMutateOpt struct {
Ctx context.Context
DupKeyCheck DupKeyCheckMode
ctx context.Context
dupKeyCheck DupKeyCheckMode
}

// Ctx returns the go context in the option
func (opt *commonMutateOpt) Ctx() context.Context {
return opt.ctx
}

// DupKeyCheck returns the DupKeyCheckMode in the option
func (opt *commonMutateOpt) DupKeyCheck() DupKeyCheckMode {
return opt.dupKeyCheck
}

// AddRecordOpt contains the options will be used when adding a record.
type AddRecordOpt struct {
commonMutateOpt
IsUpdate bool
ReserveAutoID int
isUpdate bool
reserveAutoID int
}

// NewAddRecordOpt creates a new AddRecordOpt with options.
func NewAddRecordOpt(opts ...AddRecordOption) *AddRecordOpt {
opt := &AddRecordOpt{}
for _, o := range opts {
o.ApplyAddRecordOpt(opt)
o.applyAddRecordOpt(opt)
}
return opt
}

// IsUpdate indicates whether the `AddRecord` operation is in an update statement.
func (opt *AddRecordOpt) IsUpdate() bool {
return opt.isUpdate
}

// ReserveAutoID indicates the auto id count that should be reserved.
func (opt *AddRecordOpt) ReserveAutoID() int {
return opt.reserveAutoID
}

// GetCreateIdxOpt creates a CreateIdxOpt.
func (opt *AddRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {
return &CreateIdxOpt{commonMutateOpt: opt.commonMutateOpt}
}

// AddRecordOption is defined for the AddRecord() method of the Table interface.
type AddRecordOption interface {
ApplyAddRecordOpt(*AddRecordOpt)
applyAddRecordOpt(*AddRecordOpt)
}

// UpdateRecordOpt contains the options will be used when updating a record.
type UpdateRecordOpt struct {
commonMutateOpt
// SkipWriteUntouchedIndices is an option to skip write untouched indices when updating a record.
SkipWriteUntouchedIndices bool
// skipWriteUntouchedIndices is an option to skip write untouched indices when updating a record.
skipWriteUntouchedIndices bool
}

// NewUpdateRecordOpt creates a new UpdateRecordOpt with options.
func NewUpdateRecordOpt(opts ...UpdateRecordOption) *UpdateRecordOpt {
opt := &UpdateRecordOpt{}
for _, o := range opts {
o.ApplyUpdateRecordOpt(opt)
o.applyUpdateRecordOpt(opt)
}
return opt
}

// SkipWriteUntouchedIndices indicates whether to skip write untouched indices when updating a record.
func (opt *UpdateRecordOpt) SkipWriteUntouchedIndices() bool {
return opt.skipWriteUntouchedIndices
}

// GetAddRecordOpt creates a AddRecordOpt.
func (opt *UpdateRecordOpt) GetAddRecordOpt() *AddRecordOpt {
return &AddRecordOpt{commonMutateOpt: opt.commonMutateOpt}
Expand All @@ -179,57 +204,57 @@ func (opt *UpdateRecordOpt) GetCreateIdxOpt() *CreateIdxOpt {

// UpdateRecordOption is defined for the UpdateRecord() method of the Table interface.
type UpdateRecordOption interface {
ApplyUpdateRecordOpt(*UpdateRecordOpt)
applyUpdateRecordOpt(*UpdateRecordOpt)
}

// CommonMutateOptFunc is a function to provide common options for mutating a table.
type CommonMutateOptFunc func(*commonMutateOpt)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (f CommonMutateOptFunc) ApplyAddRecordOpt(opt *AddRecordOpt) {
func (f CommonMutateOptFunc) applyAddRecordOpt(opt *AddRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (f CommonMutateOptFunc) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
func (f CommonMutateOptFunc) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
f(&opt.commonMutateOpt)
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (f CommonMutateOptFunc) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
func (f CommonMutateOptFunc) applyCreateIdxOpt(opt *CreateIdxOpt) {
f(&opt.commonMutateOpt)
}

// WithCtx returns a CommonMutateOptFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CommonMutateOptFunc {
return func(opt *commonMutateOpt) {
opt.Ctx = ctx
opt.ctx = ctx
}
}

// WithReserveAutoIDHint tells the AddRecord operation to reserve a batch of auto ID in the stmtctx.
type WithReserveAutoIDHint int

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (n WithReserveAutoIDHint) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.ReserveAutoID = int(n)
func (n WithReserveAutoIDHint) applyAddRecordOpt(opt *AddRecordOpt) {
opt.reserveAutoID = int(n)
}

// IsUpdate is a defined value for AddRecordOptFunc.
var IsUpdate AddRecordOption = isUpdate{}

type isUpdate struct{}

func (i isUpdate) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.IsUpdate = true
func (i isUpdate) applyAddRecordOpt(opt *AddRecordOpt) {
opt.isUpdate = true
}

// skipWriteUntouchedIndices implements UpdateRecordOption.
type skipWriteUntouchedIndices struct{}

func (skipWriteUntouchedIndices) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.SkipWriteUntouchedIndices = true
func (skipWriteUntouchedIndices) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.skipWriteUntouchedIndices = true
}

// SkipWriteUntouchedIndices is an option to skip write untouched options when updating a record.
Expand Down Expand Up @@ -257,18 +282,18 @@ const (
)

// ApplyAddRecordOpt implements the AddRecordOption interface.
func (m DupKeyCheckMode) ApplyAddRecordOpt(opt *AddRecordOpt) {
opt.DupKeyCheck = m
func (m DupKeyCheckMode) applyAddRecordOpt(opt *AddRecordOpt) {
opt.dupKeyCheck = m
}

// ApplyUpdateRecordOpt implements the UpdateRecordOption interface.
func (m DupKeyCheckMode) ApplyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.DupKeyCheck = m
func (m DupKeyCheckMode) applyUpdateRecordOpt(opt *UpdateRecordOpt) {
opt.dupKeyCheck = m
}

// ApplyCreateIdxOpt implements the CreateIdxOption interface.
func (m DupKeyCheckMode) ApplyCreateIdxOpt(opt *CreateIdxOpt) {
opt.DupKeyCheck = m
func (m DupKeyCheckMode) applyCreateIdxOpt(opt *CreateIdxOpt) {
opt.dupKeyCheck = m
}

type columnAPI interface {
Expand Down
20 changes: 10 additions & 10 deletions pkg/table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,29 @@ func TestOptions(t *testing.T) {
// NewAddRecordOpt with options
addOpt = NewAddRecordOpt(WithCtx(ctx), IsUpdate, WithReserveAutoIDHint(12))
require.Equal(t, AddRecordOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IsUpdate: true,
ReserveAutoID: 12,
commonMutateOpt: commonMutateOpt{ctx: ctx},
isUpdate: true,
reserveAutoID: 12,
}, *addOpt)
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(addOpt.GetCreateIdxOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *(addOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt without option
updateOpt := NewUpdateRecordOpt()
require.Equal(t, UpdateRecordOpt{}, *updateOpt)
require.Equal(t, AddRecordOpt{}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{}, *(updateOpt.GetCreateIdxOpt()))
// NewUpdateRecordOpt with options
updateOpt = NewUpdateRecordOpt(WithCtx(ctx))
require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *updateOpt)
require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{Ctx: ctx}}, *(updateOpt.GetCreateIdxOpt()))
require.Equal(t, UpdateRecordOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *updateOpt)
require.Equal(t, AddRecordOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *(updateOpt.GetAddRecordOpt()))
require.Equal(t, CreateIdxOpt{commonMutateOpt: commonMutateOpt{ctx: ctx}}, *(updateOpt.GetCreateIdxOpt()))
// NewCreateIdxOpt without option
createIdxOpt := NewCreateIdxOpt()
require.Equal(t, CreateIdxOpt{}, *createIdxOpt)
// NewCreateIdxOpt with options
createIdxOpt = NewCreateIdxOpt(WithCtx(ctx), WithIgnoreAssertion, FromBackfill)
require.Equal(t, CreateIdxOpt{
commonMutateOpt: commonMutateOpt{Ctx: ctx},
IgnoreAssertion: true,
FromBackFill: true,
commonMutateOpt: commonMutateOpt{ctx: ctx},
ignoreAssertion: true,
fromBackFill: true,
}, *createIdxOpt)
}
14 changes: 7 additions & 7 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
txn.CacheTableInfo(c.phyTblID, c.tblInfo)
}
indexedValues := c.getIndexedValue(indexedValue)
ctx := opt.Ctx
ctx := opt.Ctx()
if ctx != nil {
var r tracing.Region
r, ctx = tracing.StartRegionEx(ctx, "index.Create")
Expand All @@ -178,7 +178,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
vars := sctx.GetSessionVars()
writeBufs := sctx.GetMutateBuffers().GetWriteStmtBufs()
skipCheck := opt.DupKeyCheck == table.DupKeyCheckSkip
skipCheck := opt.DupKeyCheck() == table.DupKeyCheckSkip
evalCtx := sctx.GetExprCtx().GetEvalCtx()
loc, ec := evalCtx.Location(), evalCtx.ErrCtx()
for _, value := range indexedValues {
Expand All @@ -192,7 +192,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
keyVer byte
keyIsTempIdxKey bool
)
if !opt.FromBackFill {
if !opt.FromBackFill() {
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
key, tempKey = tempKey, nil
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
return nil, err
}

ignoreAssertion := opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic
ignoreAssertion := opt.IgnoreAssertion() || c.idxInfo.State != model.StatePublic

if !distinct || skipCheck || untouched {
val := idxVal
Expand All @@ -272,7 +272,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
}
if !ignoreAssertion && !untouched {
if opt.DupKeyCheck == table.DupKeyCheckLazy && !txn.IsPessimistic() {
if opt.DupKeyCheck() == table.DupKeyCheckLazy && !txn.IsPessimistic() {
err = txn.SetAssertion(key, kv.SetAssertUnknown)
} else {
err = txn.SetAssertion(key, kv.SetAssertNotExist)
Expand All @@ -288,7 +288,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
if c.tblInfo.TempTableType != model.TempTableNone {
// Always check key for temporary table because it does not write to TiKV
value, err = txn.Get(ctx, key)
} else if opt.DupKeyCheck == table.DupKeyCheckLazy && !keyIsTempIdxKey {
} else if opt.DupKeyCheck() == table.DupKeyCheckLazy && !keyIsTempIdxKey {
// For temp index keys, we can't get the temp value from memory buffer, even if the lazy check is enabled.
// Otherwise, it may cause the temp index value to be overwritten, leading to data inconsistency.
value, err = txn.GetMemBuffer().GetLocal(ctx, key)
Expand All @@ -308,7 +308,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
// The index key value is not found or deleted.
if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) {
val := idxVal
lazyCheck := opt.DupKeyCheck == table.DupKeyCheckLazy && err != nil
lazyCheck := opt.DupKeyCheck() == table.DupKeyCheckLazy && err != nil
if keyIsTempIdxKey {
tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true}
val = tempVal.Encode(value)
Expand Down
Loading