Skip to content

Commit

Permalink
Manage index hinting for sharding data iterator query
Browse files Browse the repository at this point in the history
  • Loading branch information
mtaner committed Feb 12, 2025
1 parent 4d3db2f commit d913dcf
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 21 deletions.
44 changes: 44 additions & 0 deletions sharding/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Config struct {
PrimaryKeyTables []string

Throttle *ghostferry.LagThrottlerConfig

// ShardedCopyFilterConfig is used to configure the sharded copy filter query.
ShardedCopyFilterConfig *ShardedCopyFilterConfig
}

func (c *Config) ValidateConfig() error {
Expand All @@ -44,3 +47,44 @@ func (c *Config) ValidateConfig() error {

return c.Config.ValidateConfig()
}

// ShardedCopyFilterConfig configures the index hint that the sharded copy
// filter adds to the data iterator's query.
//
// Ghostferry requires an index to be present on the sharding key to improve the
// performance of the data iterator's query.
type ShardedCopyFilterConfig struct {
// DisableIndexHint disables the named index hint on the sharding key, so the
// query is issued without any `USE INDEX` / `FORCE INDEX` clause.
// Defaults to false.
DisableIndexHint bool

// UseForceIndex replaces `USE INDEX` with `FORCE INDEX` in the index hint.
// Defaults to false, in which case `USE INDEX` is emitted. It has no effect
// when DisableIndexHint is true, because no hint is emitted at all.
//
// `USE INDEX` is taken as a suggestion: the optimizer can still choose a full
// table scan if it estimates that to be faster. `FORCE INDEX` makes the
// optimizer use the named index instead of considering a table scan.
//
// See https://dev.mysql.com/doc/refman/8.0/en/index-hints.html for more information on index hints.
UseForceIndex bool

// IndexHintingPerTable holds per-table settings, keyed by schema name and then
// by table name. It has the greatest specificity: when a table has an entry,
// that entry's DisableIndexHint and UseForceIndex are used instead of the
// options above, even when they are left at their zero value (false).
// If the entry's DisableIndexHint is true, its UseForceIndex and IndexName are ignored.
// A non-empty IndexName changes the index selected by Ghostferry.
//
// example:
// IndexHintingPerTable: {
// "blog": {
// "users": {
// "DisableIndexHint": false,
// "UseForceIndex": true,
// "IndexName": "ix_users_some_id"
// }
// }
// }
IndexHintingPerTable map[string]map[string]IndexConfigPerTable
}

// IndexConfigForTables returns the per-table index hint settings
// (TableName => IndexConfigPerTable) registered for schemaName in
// IndexHintingPerTable (SchemaName => TableName => IndexConfigPerTable).
//
// It returns nil when schemaName has no entry. A nil receiver is treated as an
// empty configuration and also yields nil, so callers holding an unset
// *ShardedCopyFilterConfig do not panic.
func (c *ShardedCopyFilterConfig) IndexConfigForTables(schemaName string) map[string]IndexConfigPerTable {
	if c == nil {
		return nil
	}

	// Indexing a nil map or a missing key yields the zero value, which for a
	// map-typed element is a nil map, so no explicit "found" check is needed.
	return c.IndexHintingPerTable[schemaName]
}
41 changes: 38 additions & 3 deletions sharding/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@ type JoinTable struct {
TableName, JoinColumn string
}

// IndexConfigPerTable holds the index hint settings for a single table. When a
// table has an entry, these values are used instead of the filter-wide
// DisableIndexHint / UseForceIndex settings.
type IndexConfigPerTable struct {
// DisableIndexHint, when true, omits the index hint from the table's query;
// UseForceIndex and IndexName are then irrelevant.
DisableIndexHint bool
// UseForceIndex selects `FORCE INDEX` instead of `USE INDEX` for the hint.
UseForceIndex bool
// IndexName, when non-empty, is used in the hint instead of the index that
// Ghostferry would otherwise pick for the sharding key.
IndexName string
}

// ShardedCopyFilter carries the sharding key/value and the index hint settings
// that BuildSelect uses when constructing the copy query for a table.
type ShardedCopyFilter struct {
// ShardingKey and ShardingValue identify the shard being copied.
// NOTE(review): presumably the column name and the value bound to it in the
// generated WHERE clause — confirm against BuildSelect.
ShardingKey string
ShardingValue interface{}
// NOTE(review): presumably keyed by table name, listing the tables/columns a
// table is joined through to reach the sharding key — confirm against BuildSelect.
JoinedTables map[string][]JoinTable
// NOTE(review): a set of table names; how it is consulted is not visible here.
PrimaryKeyTables map[string]struct{}

// IndexConfigPerTable holds per-table index hint settings, keyed by table
// name. An entry takes precedence over DisableIndexHint and UseForceIndex below.
IndexConfigPerTable map[string]IndexConfigPerTable
// DisableIndexHint omits the index hint for tables without a per-table entry.
DisableIndexHint bool
// UseForceIndex emits `FORCE INDEX` instead of `USE INDEX` for tables without
// a per-table entry.
UseForceIndex bool

// missingShardingKeyIndexLogged is keyed by table name and consulted when no
// index name is available for the hint.
// NOTE(review): presumably ensures the missing-index message is logged only
// once per table — confirm.
missingShardingKeyIndexLogged sync.Map
}

Expand Down Expand Up @@ -61,8 +71,19 @@ func (f *ShardedCopyFilter) BuildSelect(columns []string, table *ghostferry.Tabl
//
// i.e. load the primary keys first, then load the rest of the columns.

selectPaginationKeys := "SELECT " + quotedPaginationKey + " FROM " + quotedTable + " " + f.shardingKeyIndexHint(table) +
" WHERE " + quotedShardingKey + " = ? AND " + quotedPaginationKey + " > ?" +
DisableIndexHint := f.DisableIndexHint

if tableSpecificIndexConfig, exists := f.IndexConfigPerTable[table.Name]; exists {
DisableIndexHint = tableSpecificIndexConfig.DisableIndexHint
}

selectPaginationKeys := "SELECT " + quotedPaginationKey + " FROM " + quotedTable

if !DisableIndexHint {
selectPaginationKeys += " " + f.shardingKeyIndexHint(table)
}

selectPaginationKeys += " WHERE " + quotedShardingKey + " = ? AND " + quotedPaginationKey + " > ?" +
" ORDER BY " + quotedPaginationKey + " LIMIT " + strconv.Itoa(int(batchSize))

return sq.Select(columns...).
Expand Down Expand Up @@ -118,7 +139,21 @@ func (f *ShardedCopyFilter) BuildSelect(columns []string, table *ghostferry.Tabl
}

func (f *ShardedCopyFilter) shardingKeyIndexHint(table *ghostferry.TableSchema) string {
if indexName := f.shardingKeyIndexName(table); indexName != "" {
useForceIndex := f.UseForceIndex
indexName := f.shardingKeyIndexName(table)

if tableSpecificIndexConfig, exists := f.IndexConfigPerTable[table.Name]; exists {
if tableSpecificIndexConfig.IndexName != "" {
indexName = tableSpecificIndexConfig.IndexName
}
useForceIndex = tableSpecificIndexConfig.UseForceIndex
}

if indexName != "" {
if useForceIndex {
return "FORCE INDEX (`" + indexName + "`)"
}

return "USE INDEX (`" + indexName + "`)"
} else {
if _, logged := f.missingShardingKeyIndexLogged.Load(table.Name); !logged {
Expand Down
45 changes: 28 additions & 17 deletions sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ func NewFerry(config *Config) (*ShardingFerry, error) {

config.DatabaseRewrites = map[string]string{config.SourceDB: config.TargetDB}

config.CopyFilter = &ShardedCopyFilter{
ShardingKey: config.ShardingKey,
ShardingValue: config.ShardingValue,
JoinedTables: config.JoinedTables,
}

config.CopyFilter = NewShardedCopyFilter(config)
config.VerifierType = ghostferry.VerifierTypeInline

var tableFilter ghostferry.TableFilter
Expand All @@ -37,11 +32,11 @@ func NewFerry(config *Config) (*ShardingFerry, error) {
}

tableFilter = &ShardedTableFilter{
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
Type: IncludedTablesFilter,
Tables: included,
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
Type: IncludedTablesFilter,
Tables: included,
}
} else {
ignored, err := compileRegexps(config.IgnoredTables)
Expand All @@ -50,11 +45,11 @@ func NewFerry(config *Config) (*ShardingFerry, error) {
}

tableFilter = &ShardedTableFilter{
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
Type: IgnoredTablesFilter,
Tables: ignored,
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
Type: IgnoredTablesFilter,
Tables: ignored,
}
}

Expand Down Expand Up @@ -87,6 +82,23 @@ func NewFerry(config *Config) (*ShardingFerry, error) {
}, nil
}

// NewShardedCopyFilter builds the ShardedCopyFilter described by config.
//
// When config.ShardedCopyFilterConfig is nil it is replaced, on config itself,
// with a configuration that has an empty IndexHintingPerTable map, so the
// filter gets the default index hint behaviour. Per-table index settings are
// looked up for config.SourceDB. PrimaryKeyTables is not populated here.
func NewShardedCopyFilter(config *Config) *ShardedCopyFilter {
	filterConfig := config.ShardedCopyFilterConfig
	if filterConfig == nil {
		filterConfig = &ShardedCopyFilterConfig{
			IndexHintingPerTable: make(map[string]map[string]IndexConfigPerTable),
		}
		// Store the default back on the config, as callers may read it later.
		config.ShardedCopyFilterConfig = filterConfig
	}

	filter := new(ShardedCopyFilter)
	filter.ShardingKey = config.ShardingKey
	filter.ShardingValue = config.ShardingValue
	filter.JoinedTables = config.JoinedTables
	filter.DisableIndexHint = filterConfig.DisableIndexHint
	filter.UseForceIndex = filterConfig.UseForceIndex
	filter.IndexConfigPerTable = filterConfig.IndexConfigForTables(config.SourceDB)

	return filter
}

func (r *ShardingFerry) Initialize() error {
if r.config.RunFerryFromReplica {
err := r.initializeWaitUntilReplicaIsCaughtUpToMasterConnection()
Expand Down Expand Up @@ -125,7 +137,6 @@ func (r *ShardingFerry) Run() {

r.AbortIfTargetDbNoLongerWriteable()


// The callback must ensure that all in-flight transactions are complete and
// there will be no more writes to the database after it returns.

Expand Down
135 changes: 134 additions & 1 deletion sharding/test/copy_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type CopyFilterTestSuite struct {
shardingValue int64
paginationKeyCursor uint64

normalTable, joinedTable, primaryKeyTable *ghostferry.TableSchema
normalTable, normalTable2, joinedTable, primaryKeyTable *ghostferry.TableSchema

filter *sharding.ShardedCopyFilter
}
Expand All @@ -46,6 +46,20 @@ func (t *CopyFilterTestSuite) SetupTest() {
PaginationKeyColumn: &columns[0],
}

columns = []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}, {Name: "more_data"}}
t.normalTable2 = &ghostferry.TableSchema{
Table: &schema.Table{
Schema: "shard_1",
Name: "normaltable2",
Columns: columns,
PKColumns: []int{0},
Indexes: []*schema.Index{
{Name: "good_sharding_index", Columns: []string{"tenant_id", "id"}},
},
},
PaginationKeyColumn: &columns[0],
}

columns = []schema.TableColumn{{Name: "joined_paginationKey"}}
t.joinedTable = &ghostferry.TableSchema{
Table: &schema.Table{
Expand Down Expand Up @@ -116,6 +130,125 @@ func (t *CopyFilterTestSuite) TestFallsBackToIgnoredPrimaryIndex() {
t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args)
}

// TestRemovesIndexHint checks that the filter-wide DisableIndexHint flag drops
// the index hint from the pagination-key subquery.
func (t *CopyFilterTestSuite) TestRemovesIndexHint() {
	const expectedSQL = "SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)"

	t.filter.DisableIndexHint = true

	builder, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024)
	t.Require().Nil(err)

	query, params, err := builder.ToSql()
	t.Require().Nil(err)
	t.Require().Equal(expectedSQL, query)
	t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, params)
}

// TestUsesForceIndex checks that the filter-wide UseForceIndex flag turns the
// default `USE INDEX` hint into `FORCE INDEX` on the detected sharding index.
func (t *CopyFilterTestSuite) TestUsesForceIndex() {
	const expectedSQL = "SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` FORCE INDEX (`good_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)"

	t.filter.UseForceIndex = true

	builder, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024)
	t.Require().Nil(err)

	query, params, err := builder.ToSql()
	t.Require().Nil(err)
	t.Require().Equal(expectedSQL, query)
	t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, params)
}

// TestIgnoresForceIndexIfDisableIndexHintIsTrue checks that DisableIndexHint
// wins over UseForceIndex: with both set, no index hint is emitted.
func (t *CopyFilterTestSuite) TestIgnoresForceIndexIfDisableIndexHintIsTrue() {
	const expectedSQL = "SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)"

	t.filter.UseForceIndex = true
	t.filter.DisableIndexHint = true

	builder, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024)
	t.Require().Nil(err)

	query, params, err := builder.ToSql()
	t.Require().Nil(err)
	t.Require().Equal(expectedSQL, query)
	t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, params)
}

// TestHigherSpecifityOfIndexHintingPerTable checks that a per-table entry
// overrides the filter-wide settings: "normaltable" re-enables the hint and
// forces the index, while "normaltable2" (no entry) follows the filter-wide
// DisableIndexHint and gets no hint.
func (t *CopyFilterTestSuite) TestHigherSpecifityOfIndexHintingPerTable() {
	t.filter.DisableIndexHint = true
	t.filter.UseForceIndex = false

	t.filter.IndexConfigPerTable = map[string]sharding.IndexConfigPerTable{
		"normaltable": {
			DisableIndexHint: false,
			UseForceIndex:    true,
		},
	}

	cases := []struct {
		table       *ghostferry.TableSchema
		expectedSQL string
	}{
		{
			table:       t.normalTable,
			expectedSQL: "SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` FORCE INDEX (`good_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)",
		},
		{
			table:       t.normalTable2,
			expectedSQL: "SELECT * FROM `shard_1`.`normaltable2` JOIN (SELECT `id` FROM `shard_1`.`normaltable2` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)",
		},
	}

	for _, tc := range cases {
		builder, err := t.filter.BuildSelect([]string{"*"}, tc.table, t.paginationKeyCursor, 1024)
		t.Require().Nil(err)

		query, params, err := builder.ToSql()
		t.Require().Nil(err)
		t.Require().Equal(tc.expectedSQL, query)
		t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, params)
	}
}

// TestHigherSpecifityOfIndexHintingPerTable2 checks that a per-table
// DisableIndexHint removes the hint for that table only: "normaltable" gets no
// hint, while "normaltable2" (no entry) follows the filter-wide UseForceIndex.
func (t *CopyFilterTestSuite) TestHigherSpecifityOfIndexHintingPerTable2() {
	t.filter.UseForceIndex = true

	t.filter.IndexConfigPerTable = map[string]sharding.IndexConfigPerTable{
		"normaltable": {
			DisableIndexHint: true,
		},
	}

	cases := []struct {
		table       *ghostferry.TableSchema
		expectedSQL string
	}{
		{
			table:       t.normalTable,
			expectedSQL: "SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)",
		},
		{
			table:       t.normalTable2,
			expectedSQL: "SELECT * FROM `shard_1`.`normaltable2` JOIN (SELECT `id` FROM `shard_1`.`normaltable2` FORCE INDEX (`good_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)",
		},
	}

	for _, tc := range cases {
		builder, err := t.filter.BuildSelect([]string{"*"}, tc.table, t.paginationKeyCursor, 1024)
		t.Require().Nil(err)

		query, params, err := builder.ToSql()
		t.Require().Nil(err)
		t.Require().Equal(tc.expectedSQL, query)
		t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, params)
	}
}

// TestHigherSpecifityOfIndexHintingPerTable3 checks two per-table behaviours:
// a per-table DisableIndexHint makes that entry's UseForceIndex and IndexName
// irrelevant, and an entry that only sets IndexName swaps the index name while
// its zero-valued UseForceIndex still overrides the filter-wide setting, so
// `USE INDEX` is emitted.
func (t *CopyFilterTestSuite) TestHigherSpecifityOfIndexHintingPerTable3() {
	t.filter.UseForceIndex = true

	t.filter.IndexConfigPerTable = map[string]sharding.IndexConfigPerTable{
		"normaltable": {
			DisableIndexHint: true,
			UseForceIndex:    true,                     // will be ignored when DisableIndexHint is true
			IndexName:        "another_sharding_index", // will be ignored when DisableIndexHint is true
		},
		"normaltable2": {
			IndexName: "another_sharding_index",
		},
	}

	cases := []struct {
		table       *ghostferry.TableSchema
		expectedSQL string
	}{
		{
			table:       t.normalTable,
			expectedSQL: "SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)",
		},
		{
			table:       t.normalTable2,
			expectedSQL: "SELECT * FROM `shard_1`.`normaltable2` JOIN (SELECT `id` FROM `shard_1`.`normaltable2` USE INDEX (`another_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)",
		},
	}

	for _, tc := range cases {
		builder, err := t.filter.BuildSelect([]string{"*"}, tc.table, t.paginationKeyCursor, 1024)
		t.Require().Nil(err)

		query, params, err := builder.ToSql()
		t.Require().Nil(err)
		t.Require().Equal(tc.expectedSQL, query)
		t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, params)
	}
}

func (t *CopyFilterTestSuite) TestSelectsJoinedTables() {
selectBuilder, err := t.filter.BuildSelect([]string{"*"}, t.joinedTable, t.paginationKeyCursor, 1024)
t.Require().Nil(err)
Expand Down

0 comments on commit d913dcf

Please sign in to comment.