Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: decouple job scheduler from 'ddl' and make it run/exit as owner changes #53548

Merged
merged 23 commits into from
May 30, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
change
D3Hunter committed May 24, 2024

Verified

This commit was signed with the committer’s verified signature.
pradyunsg Pradyun Gedam
commit b07a4b78799f5e328c5e37d2025e9d0e549e307b
3 changes: 1 addition & 2 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
@@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
tidb_util "github.com/pingcap/tidb/pkg/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/intest"
@@ -241,7 +240,7 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType, filter func(*model.J

func hasSysDB(job *model.Job) bool {
for _, info := range job.GetInvolvingSchemaInfo() {
if tidb_util.IsSysDB(info.Database) {
if tidbutil.IsSysDB(info.Database) {
return true
}
}
20 changes: 0 additions & 20 deletions pkg/owner/listener.go

This file was deleted.

8 changes: 7 additions & 1 deletion pkg/owner/manager.go
Original file line number Diff line number Diff line change
@@ -39,6 +39,12 @@ import (
"go.uber.org/zap"
)

// Listener is used to listen the ownerManager's owner state.
type Listener interface {
OnBecomeOwner()
OnRetireOwner()
}

// Manager is used to campaign the owner and manage the owner information.
type Manager interface {
// ID returns the ID of the manager.
@@ -61,7 +67,7 @@ type Manager interface {
RequireOwner(ctx context.Context) error
// CampaignCancel cancels one etcd campaign
CampaignCancel()
// SetListener sets the listener.
// SetListener sets the listener, set before start campaign.
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
SetListener(listener Listener)
}

21 changes: 19 additions & 2 deletions pkg/owner/manager_test.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"runtime"
"sync/atomic"
"testing"
"time"

@@ -78,6 +79,17 @@ func (ti *testInfo) Close(t *testing.T) {
ti.cluster.Terminate(t)
}

type listener struct {
val atomic.Bool
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

func (l *listener) OnBecomeOwner() {
l.val.Store(true)
}
func (l *listener) OnRetireOwner() {
l.val.Store(false)
}

func TestSingle(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
@@ -87,9 +99,13 @@ func TestSingle(t *testing.T) {
tInfo := newTestInfo(t)
client, d := tInfo.client, tInfo.ddl
defer tInfo.Close(t)
require.NoError(t, d.OwnerManager().CampaignOwner())
ownerManager := d.OwnerManager()
lis := &listener{}
ownerManager.SetListener(lis)
require.NoError(t, ownerManager.CampaignOwner())
isOwner := checkOwner(d, true)
require.True(t, isOwner)
require.True(t, lis.val.Load())

// test for newSession failed
ctx := context.Background()
@@ -105,9 +121,10 @@ func TestSingle(t *testing.T) {
require.True(t, isOwner)

// The test is used to exit campaign loop.
d.OwnerManager().Cancel()
ownerManager.Cancel()
isOwner = checkOwner(d, false)
require.False(t, isOwner)
require.False(t, lis.val.Load())

time.Sleep(200 * time.Millisecond)

4 changes: 3 additions & 1 deletion pkg/owner/mock.go
Original file line number Diff line number Diff line change
@@ -97,7 +97,9 @@ func (m *mockManager) toBeOwner() {
// RetireOwner implements Manager.RetireOwner interface.
func (m *mockManager) RetireOwner() {
util.MockGlobalStateEntry.OwnerKey(m.storeID, m.key).UnsetOwner(m.id)
m.listener.OnRetireOwner()
if m.listener != nil {
m.listener.OnRetireOwner()
}
}

// Cancel implements Manager.Cancel interface.