Skip to content

Commit

Permalink
disttask: remove scheduler/dispatcher id from logs and refinement (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Sep 26, 2023
1 parent 7052cee commit e70f6e4
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 32 deletions.
15 changes: 8 additions & 7 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package dispatcher

import (
"context"
"fmt"
"math/rand"
"time"

Expand Down Expand Up @@ -79,10 +78,11 @@ type Dispatcher interface {
// BaseDispatcher is the base struct for Dispatcher.
// Each task type embeds this struct and implements the Extension interface.
type BaseDispatcher struct {
ctx context.Context
taskMgr *storage.TaskManager
Task *proto.Task
logCtx context.Context
ctx context.Context
taskMgr *storage.TaskManager
Task *proto.Task
logCtx context.Context
// serverID, its value is ip:port now.
serverID string
// when RegisterDispatcherFactory, the factory MUST initialize this field.
Extension
Expand All @@ -104,12 +104,13 @@ var MockOwnerChange func()

// NewBaseDispatcher creates a new BaseDispatcher.
func NewBaseDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) *BaseDispatcher {
logPrefix := fmt.Sprintf("task_id: %d, task_type: %s, server_id: %s", task.ID, task.Type, serverID)
logCtx := logutil.WithFields(context.Background(), zap.Int64("task-id", task.ID),
zap.String("task-type", task.Type))
return &BaseDispatcher{
ctx: ctx,
taskMgr: taskMgr,
Task: task,
logCtx: logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
logCtx: logCtx,
serverID: serverID,
liveNodes: nil,
liveNodeFetchInterval: DefaultLiveNodesCheckInterval,
Expand Down
13 changes: 7 additions & 6 deletions disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,13 @@ func (dm *Manager) clearRunningTasks() {
// Manager schedules and monitors tasks.
// The scheduling task number is limited by size of gPool.
type Manager struct {
ctx context.Context
cancel context.CancelFunc
taskMgr *storage.TaskManager
wg tidbutil.WaitGroupWrapper
gPool *spool.Pool
inited bool
ctx context.Context
cancel context.CancelFunc
taskMgr *storage.TaskManager
wg tidbutil.WaitGroupWrapper
gPool *spool.Pool
inited bool
// serverID, its value is ip:port now.
serverID string

finishCh chan struct{}
Expand Down
60 changes: 51 additions & 9 deletions disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,58 @@ import (
)

// task state machine
// 1. succeed: pending -> running -> succeed
// 2. failed: pending -> running -> reverting -> reverted/revert_failed, pending -> failed
// 3. canceled: pending -> running -> cancelling -> reverting -> reverted/revert_failed
// 3. pause/resume: pending -> running -> pausing -> paused -> running
//
// subtask state machine
// 1. succeed/failed: pending -> running -> succeed/failed
// 2. canceled: pending -> running -> canceled
// 3. rollback: revert_pending -> reverting -> reverted/revert_failed
// 4. pause/resume: pending -> running -> paused -> running
// ┌──────────────────────────────┐
// │ ┌───────┐ ┌──┴───┐
// │ ┌────────►│pausing├──────►│paused│
// │ │ └───────┘ └──────┘
// ▼ │
// ┌───────┐ ┌───┴───┐ ┌────────┐
// │pending├────►│running├────►│succeed │
// └──┬────┘ └───┬───┘ └────────┘
// ▼ │ ┌──────────┐
// ┌──────┐ ├────────►│cancelling│
// │failed│ │ └────┬─────┘
// └──────┘ │ ▼
// │ ┌─────────┐ ┌────────┐
// └────────►│reverting├────►│reverted│
// └────┬────┘ └────────┘
// │ ┌─────────────┐
// └─────────►│revert_failed│
// └─────────────┘
// 1. succeed: pending -> running -> succeed
// 2. failed: pending -> running -> reverting -> reverted/revert_failed, pending -> failed
// 3. canceled: pending -> running -> cancelling -> reverting -> reverted/revert_failed
// 4. pause/resume: pending -> running -> pausing -> paused -> running
//
// subtask state machine for normal subtask:
//
// ┌──────────────┐
// │ ┌───┴──┐
// │ ┌───────►│paused│
// ▼ │ └──────┘
// ┌───────┐ ┌───┴───┐ ┌───────┐
// │pending├───►│running├───►│succeed│
// └───────┘ └───┬───┘ └───────┘
// │ ┌──────┐
// ├───────►│failed│
// │ └──────┘
// │ ┌────────┐
// └───────►│canceled│
// └────────┘
//
// for reverting subtask:
//
// ┌──────────────┐ ┌─────────┐ ┌─────────┐
// │revert_pending├───►│reverting├──►│ reverted│
// └──────────────┘ └────┬────┘ └─────────┘
// │ ┌─────────────┐
// └────────►│revert_failed│
// └─────────────┘
// 1. succeed/failed: pending -> running -> succeed/failed
// 2. canceled: pending -> running -> canceled
// 3. rollback: revert_pending -> reverting -> reverted/revert_failed
// 4. pause/resume: pending -> running -> paused -> running
const (
TaskStatePending = "pending"
TaskStateRunning = "running"
Expand Down
12 changes: 6 additions & 6 deletions disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
m := &Manager{
id: id,
taskTable: taskTable,
logCtx: logutil.WithKeyValue(context.Background(), "dist_task_manager", id),
logCtx: logutil.WithFields(context.Background()),
newPool: b.newPool,
}
m.ctx, m.cancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
if !exist {
continue
}
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Any("task_id", task.ID))
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))
m.addHandlingTask(task.ID)
t := task
err = m.schedulerPool.Run(func() {
Expand All @@ -222,7 +222,7 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Any("task_id", task.ID))
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
cancel()
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (m *Manager) cancelAllRunningTasks() {
m.mu.RLock()
defer m.mu.RUnlock()
for id, cancel := range m.mu.handlingTasks {
logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Any("task_id", id))
logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Int64("task-id", id))
if cancel != nil {
cancel()
}
Expand Down Expand Up @@ -282,7 +282,7 @@ var testContexts sync.Map

// onRunnableTask handles a runnable task.
func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
logutil.Logger(m.logCtx).Info("onRunnableTask", zap.Int64("task_id", task.ID), zap.String("type", task.Type))
logutil.Logger(m.logCtx).Info("onRunnableTask", zap.Int64("task-id", task.ID), zap.String("type", task.Type))
// runCtx is only used in scheduler.Run; it is canceled in m.fetchAndFastCancelTasks.
factory := getSchedulerFactory(task.Type)
if factory == nil {
Expand Down Expand Up @@ -320,7 +320,7 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
}
if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting {
logutil.Logger(m.logCtx).Info("onRunnableTask exit",
zap.Int64("task_id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State))
zap.Int64("task-id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State))
return
}
if exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRevertPending); err != nil {
Expand Down
6 changes: 2 additions & 4 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package scheduler

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -60,12 +59,11 @@ type BaseScheduler struct {

// NewBaseScheduler creates a new BaseScheduler.
func NewBaseScheduler(_ context.Context, id string, taskID int64, taskTable TaskTable) *BaseScheduler {
logPrefix := fmt.Sprintf("id: %s, task_id: %d", id, taskID)
schedulerImpl := &BaseScheduler{
id: id,
taskID: taskID,
taskTable: taskTable,
logCtx: logutil.WithKeyValue(context.Background(), "scheduler", logPrefix),
logCtx: logutil.WithFields(context.Background(), zap.Int64("task-id", taskID)),
}
return schedulerImpl
}
Expand Down Expand Up @@ -424,7 +422,7 @@ func (s *BaseScheduler) onError(err error) {
if err == nil {
return
}
err = errors.WithStack(err)
err = errors.Trace(err)
logutil.Logger(s.logCtx).Error("onError", zap.Error(err))
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit e70f6e4

Please sign in to comment.