From d913dcf934e5871682099f068fe47d1192a23dc0 Mon Sep 17 00:00:00 2001 From: Merve Taner Date: Wed, 12 Feb 2025 16:39:44 +0000 Subject: [PATCH] Manage index hinting for sharding data iterator query --- sharding/config.go | 44 ++++++++++ sharding/filter.go | 41 ++++++++- sharding/sharding.go | 45 ++++++---- sharding/test/copy_filter_test.go | 135 +++++++++++++++++++++++++++++- 4 files changed, 244 insertions(+), 21 deletions(-) diff --git a/sharding/config.go b/sharding/config.go index e244583b..21e58671 100644 --- a/sharding/config.go +++ b/sharding/config.go @@ -29,6 +29,9 @@ type Config struct { PrimaryKeyTables []string Throttle *ghostferry.LagThrottlerConfig + + // ShardedCopyFilterConfig is used to configure the sharded copy filter query. + ShardedCopyFilterConfig *ShardedCopyFilterConfig } func (c *Config) ValidateConfig() error { @@ -44,3 +47,44 @@ func (c *Config) ValidateConfig() error { return c.Config.ValidateConfig() } + +type ShardedCopyFilterConfig struct { + // Ghostferry requires an index to be present on the sharding key to improve the performance of the data iterator's query. + + // DisableIndexHint disables the named index hint on the sharding key. Defaults to false. + DisableIndexHint bool + + // `USE INDEX` is used by default as part of the index hint if DisableIndexHint is false. + // UseForceIndex replaces `USE INDEX` with `FORCE INDEX` on the query. Defaults to false. + // `USE INDEX` is taken as a suggestion and the optimizer can still choose a full table scan if it estimates that to be faster. + // `FORCE INDEX` acts like `USE INDEX` but treats a table scan as very expensive, so a scan is used only when the index cannot be used. + + // See https://dev.mysql.com/doc/refman/8.0/en/index-hints.html for more information on index hints. + UseForceIndex bool + + // IndexHintingPerTable has greatest specificity and takes precedence over the other options. + // If DisableIndexHint is true, UseForceIndex and IndexName options will be ignored.
+ // IndexName can be used to change the index selected by Ghostferry. + // + // example: + // IndexHintingPerTable: { + // "blog": { + // "users": { + // "DisableIndexHint": false, + // "UseForceIndex": true, + // "IndexName": "ix_users_some_id" + // } + // } + // } + IndexHintingPerTable map[string]map[string]IndexConfigPerTable +} + +// IndexConfigForTables returns the TableName => IndexConfig map configured for schemaName, or nil if none is configured. +func (c *ShardedCopyFilterConfig) IndexConfigForTables(schemaName string) map[string]IndexConfigPerTable { + tableConfig, found := c.IndexHintingPerTable[schemaName] + if !found { + return nil + } + + return tableConfig +} diff --git a/sharding/filter.go b/sharding/filter.go index 5e16dd11..0ec0364d 100644 --- a/sharding/filter.go +++ b/sharding/filter.go @@ -16,12 +16,22 @@ type JoinTable struct { TableName, JoinColumn string } +type IndexConfigPerTable struct { + DisableIndexHint bool + UseForceIndex bool + IndexName string +} + type ShardedCopyFilter struct { ShardingKey string ShardingValue interface{} JoinedTables map[string][]JoinTable PrimaryKeyTables map[string]struct{} + IndexConfigPerTable map[string]IndexConfigPerTable + DisableIndexHint bool + UseForceIndex bool + missingShardingKeyIndexLogged sync.Map } @@ -61,8 +71,19 @@ func (f *ShardedCopyFilter) BuildSelect(columns []string, table *ghostferry.Tabl // // i.e. load the primary keys first, then load the rest of the columns. - selectPaginationKeys := "SELECT " + quotedPaginationKey + " FROM " + quotedTable + " " + f.shardingKeyIndexHint(table) + - " WHERE " + quotedShardingKey + " = ? AND " + quotedPaginationKey + " > ?"
+ + DisableIndexHint := f.DisableIndexHint + + if tableSpecificIndexConfig, exists := f.IndexConfigPerTable[table.Name]; exists { + DisableIndexHint = tableSpecificIndexConfig.DisableIndexHint + } + + selectPaginationKeys := "SELECT " + quotedPaginationKey + " FROM " + quotedTable + + if !DisableIndexHint { + selectPaginationKeys += " " + f.shardingKeyIndexHint(table) + } + + selectPaginationKeys += " WHERE " + quotedShardingKey + " = ? AND " + quotedPaginationKey + " > ?" + " ORDER BY " + quotedPaginationKey + " LIMIT " + strconv.Itoa(int(batchSize)) return sq.Select(columns...). @@ -118,7 +139,21 @@ func (f *ShardedCopyFilter) BuildSelect(columns []string, table *ghostferry.Tabl } func (f *ShardedCopyFilter) shardingKeyIndexHint(table *ghostferry.TableSchema) string { - if indexName := f.shardingKeyIndexName(table); indexName != "" { + useForceIndex := f.UseForceIndex + indexName := f.shardingKeyIndexName(table) + + if tableSpecificIndexConfig, exists := f.IndexConfigPerTable[table.Name]; exists { + if tableSpecificIndexConfig.IndexName != "" { + indexName = tableSpecificIndexConfig.IndexName + } + useForceIndex = tableSpecificIndexConfig.UseForceIndex + } + + if indexName != "" { + if useForceIndex { + return "FORCE INDEX (`" + indexName + "`)" + } + return "USE INDEX (`" + indexName + "`)" } else { if _, logged := f.missingShardingKeyIndexLogged.Load(table.Name); !logged { diff --git a/sharding/sharding.go b/sharding/sharding.go index d2d174e0..2af27402 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -20,12 +20,7 @@ func NewFerry(config *Config) (*ShardingFerry, error) { config.DatabaseRewrites = map[string]string{config.SourceDB: config.TargetDB} - config.CopyFilter = &ShardedCopyFilter{ - ShardingKey: config.ShardingKey, - ShardingValue: config.ShardingValue, - JoinedTables: config.JoinedTables, - } - + config.CopyFilter = NewShardedCopyFilter(config) config.VerifierType = ghostferry.VerifierTypeInline var tableFilter ghostferry.TableFilter @@ 
-37,11 +32,11 @@ func NewFerry(config *Config) (*ShardingFerry, error) { } tableFilter = &ShardedTableFilter{ - ShardingKey: config.ShardingKey, - SourceShard: config.SourceDB, - JoinedTables: config.JoinedTables, - Type: IncludedTablesFilter, - Tables: included, + ShardingKey: config.ShardingKey, + SourceShard: config.SourceDB, + JoinedTables: config.JoinedTables, + Type: IncludedTablesFilter, + Tables: included, } } else { ignored, err := compileRegexps(config.IgnoredTables) @@ -50,11 +45,11 @@ func NewFerry(config *Config) (*ShardingFerry, error) { } tableFilter = &ShardedTableFilter{ - ShardingKey: config.ShardingKey, - SourceShard: config.SourceDB, - JoinedTables: config.JoinedTables, - Type: IgnoredTablesFilter, - Tables: ignored, + ShardingKey: config.ShardingKey, + SourceShard: config.SourceDB, + JoinedTables: config.JoinedTables, + Type: IgnoredTablesFilter, + Tables: ignored, } } @@ -87,6 +82,23 @@ func NewFerry(config *Config) (*ShardingFerry, error) { }, nil } +func NewShardedCopyFilter(config *Config) *ShardedCopyFilter { + if config.ShardedCopyFilterConfig == nil { + config.ShardedCopyFilterConfig = &ShardedCopyFilterConfig{ + IndexHintingPerTable: map[string]map[string]IndexConfigPerTable{}, + } + } + + return &ShardedCopyFilter{ + ShardingKey: config.ShardingKey, + ShardingValue: config.ShardingValue, + JoinedTables: config.JoinedTables, + DisableIndexHint: config.ShardedCopyFilterConfig.DisableIndexHint, + UseForceIndex: config.ShardedCopyFilterConfig.UseForceIndex, + IndexConfigPerTable: config.ShardedCopyFilterConfig.IndexConfigForTables(config.SourceDB), + } +} + func (r *ShardingFerry) Initialize() error { if r.config.RunFerryFromReplica { err := r.initializeWaitUntilReplicaIsCaughtUpToMasterConnection() @@ -125,7 +137,6 @@ func (r *ShardingFerry) Run() { r.AbortIfTargetDbNoLongerWriteable() - // The callback must ensure that all in-flight transactions are complete and // there will be no more writes to the database after it returns. 
diff --git a/sharding/test/copy_filter_test.go b/sharding/test/copy_filter_test.go index dd2b1d64..01bac6d7 100644 --- a/sharding/test/copy_filter_test.go +++ b/sharding/test/copy_filter_test.go @@ -20,7 +20,7 @@ type CopyFilterTestSuite struct { shardingValue int64 paginationKeyCursor uint64 - normalTable, joinedTable, primaryKeyTable *ghostferry.TableSchema + normalTable, normalTable2, joinedTable, primaryKeyTable *ghostferry.TableSchema filter *sharding.ShardedCopyFilter } @@ -46,6 +46,20 @@ func (t *CopyFilterTestSuite) SetupTest() { PaginationKeyColumn: &columns[0], } + columns = []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}, {Name: "more_data"}} + t.normalTable2 = &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "shard_1", + Name: "normaltable2", + Columns: columns, + PKColumns: []int{0}, + Indexes: []*schema.Index{ + {Name: "good_sharding_index", Columns: []string{"tenant_id", "id"}}, + }, + }, + PaginationKeyColumn: &columns[0], + } + columns = []schema.TableColumn{{Name: "joined_paginationKey"}} t.joinedTable = &ghostferry.TableSchema{ Table: &schema.Table{ @@ -116,6 +130,125 @@ func (t *CopyFilterTestSuite) TestFallsBackToIgnoredPrimaryIndex() { t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) } +func (t *CopyFilterTestSuite) TestRemovesIndexHint() { + t.filter.DisableIndexHint = true + selectBuilder, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err := selectBuilder.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? 
ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) +} + +func (t *CopyFilterTestSuite) TestUsesForceIndex() { + t.filter.UseForceIndex = true + selectBuilder, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err := selectBuilder.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` FORCE INDEX (`good_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) +} + +func (t *CopyFilterTestSuite) TestIgnoresForceIndexIfDisableIndexHintIsTrue() { + t.filter.DisableIndexHint = true + t.filter.UseForceIndex = true + selectBuilder, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err := selectBuilder.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? 
ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) +} + +func (t *CopyFilterTestSuite) TestHigherSpecifityOfIndexHintingPerTable() { + t.filter.IndexConfigPerTable = map[string]sharding.IndexConfigPerTable{ + "normaltable": { + DisableIndexHint: false, + UseForceIndex: true, + }, + } + + t.filter.DisableIndexHint = true + t.filter.UseForceIndex = false + + selectBuilder1, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err := selectBuilder1.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` FORCE INDEX (`good_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) + + selectBuilder2, err := t.filter.BuildSelect([]string{"*"}, t.normalTable2, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err = selectBuilder2.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable2` JOIN (SELECT `id` FROM `shard_1`.`normaltable2` WHERE `tenant_id` = ? AND `id` > ? 
ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) +} + +func (t *CopyFilterTestSuite) TestHigherSpecifityOfIndexHintingPerTable2() { + t.filter.IndexConfigPerTable = map[string]sharding.IndexConfigPerTable{ + "normaltable": { + DisableIndexHint: true, + }, + } + + t.filter.UseForceIndex = true + + selectBuilder1, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err := selectBuilder1.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) + + selectBuilder2, err := t.filter.BuildSelect([]string{"*"}, t.normalTable2, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err = selectBuilder2.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable2` JOIN (SELECT `id` FROM `shard_1`.`normaltable2` FORCE INDEX (`good_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? 
ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) +} + +func (t *CopyFilterTestSuite) TestHigherSpecifityOfIndexHintingPerTable3() { + t.filter.IndexConfigPerTable = map[string]sharding.IndexConfigPerTable{ + "normaltable": { + DisableIndexHint: true, + UseForceIndex: true, // will be ignored when DisableIndexHint is true + IndexName: "another_sharding_index", // will be ignored when DisableIndexHint is true + }, + "normaltable2": { + IndexName: "another_sharding_index", + }, + } + + t.filter.UseForceIndex = true + + selectBuilder1, err := t.filter.BuildSelect([]string{"*"}, t.normalTable, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err := selectBuilder1.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable` JOIN (SELECT `id` FROM `shard_1`.`normaltable` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) + + selectBuilder2, err := t.filter.BuildSelect([]string{"*"}, t.normalTable2, t.paginationKeyCursor, 1024) + t.Require().Nil(err) + + sql, args, err = selectBuilder2.ToSql() + t.Require().Nil(err) + t.Require().Equal("SELECT * FROM `shard_1`.`normaltable2` JOIN (SELECT `id` FROM `shard_1`.`normaltable2` USE INDEX (`another_sharding_index`) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 1024) AS `batch` USING(`id`)", sql) + t.Require().Equal([]interface{}{t.shardingValue, t.paginationKeyCursor}, args) +} + func (t *CopyFilterTestSuite) TestSelectsJoinedTables() { selectBuilder, err := t.filter.BuildSelect([]string{"*"}, t.joinedTable, t.paginationKeyCursor, 1024) t.Require().Nil(err)