-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
disttask: init capacity and check concurrency using cpu count of managed node #49875
Changes from all commits
dffd3f5
64cbdac
b927b56
9b1edd0
6532be7
10079f7
9c2c2e1
e983841
60d81f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
// Copyright 2023 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package proto | ||
|
||
// ManagedNode is a TiDB node that is managed by the framework. | ||
type ManagedNode struct { | ||
// ID identifies the node, see GenerateExecID. It is named "host" in the meta table. | ||
ID string | ||
// Role is the role of the node, either "" or "background". | ||
// All managed nodes should have the same role. | ||
Role string | ||
// CPUCount is the CPU count of the node. | ||
CPUCount int | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,12 @@ import ( | |
"context" | ||
"slices" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/pingcap/tidb/pkg/disttask/framework/proto" | ||
"github.com/pingcap/tidb/pkg/util/cpu" | ||
"github.com/pingcap/tidb/pkg/util/logutil" | ||
"go.uber.org/zap" | ||
) | ||
|
||
type taskStripes struct { | ||
|
@@ -47,10 +50,7 @@ type taskStripes struct { | |
// quota to subtask, but subtask can determine what to conform. | ||
type slotManager struct { | ||
// Capacity is the total number of slots and stripes. | ||
// TODO: we assume that all nodes managed by dist framework are isomorphic, | ||
// but dist owner might run on normal node where the capacity might not be | ||
// able to run any task. | ||
Comment on lines
-50
to
-52
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add some comments of isomorphic There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see comments in slotManager |
||
capacity int | ||
capacity atomic.Int32 | ||
|
||
mu sync.RWMutex | ||
// represents the number of stripes reserved by task, when we reserve by the | ||
|
@@ -75,12 +75,16 @@ type slotManager struct { | |
|
||
// newSlotManager creates a new slotManager with empty task/slot maps and a | ||
// capacity initialized from the CPU count of the current node. | ||
func newSlotManager() *slotManager { | ||
return &slotManager{ | ||
capacity: cpu.GetCPUCount(), | ||
s := &slotManager{ | ||
task2Index: make(map[int64]int), | ||
reservedSlots: make(map[string]int), | ||
usedSlots: make(map[string]int), | ||
} | ||
// This node might not be a managed node of the framework, but we still | ||
// initialize capacity with the CPU count of this node; it will be updated | ||
// when the node manager starts. | ||
s.updateCapacity(cpu.GetCPUCount()) | ||
return s | ||
} | ||
|
||
// Update updates the used slots on each node. | ||
|
@@ -96,7 +100,7 @@ func (sm *slotManager) update(ctx context.Context, taskMgr TaskManager) error { | |
} | ||
newUsedSlots := make(map[string]int, len(nodes)) | ||
for _, node := range nodes { | ||
newUsedSlots[node] = slotsOnNodes[node] | ||
newUsedSlots[node.ID] = slotsOnNodes[node.ID] | ||
} | ||
sm.mu.Lock() | ||
defer sm.mu.Unlock() | ||
|
@@ -111,6 +115,7 @@ func (sm *slotManager) update(ctx context.Context, taskMgr TaskManager) error { | |
// are enough resources, or return true on resource shortage when some task | ||
// scheduled subtasks. | ||
func (sm *slotManager) canReserve(task *proto.Task) (execID string, ok bool) { | ||
capacity := int(sm.capacity.Load()) | ||
sm.mu.RLock() | ||
defer sm.mu.RUnlock() | ||
if len(sm.usedSlots) == 0 { | ||
|
@@ -125,12 +130,12 @@ func (sm *slotManager) canReserve(task *proto.Task) (execID string, ok bool) { | |
} | ||
reservedForHigherPriority += s.stripes | ||
} | ||
if task.Concurrency+reservedForHigherPriority <= sm.capacity { | ||
if task.Concurrency+reservedForHigherPriority <= capacity { | ||
return "", true | ||
} | ||
|
||
for id, count := range sm.usedSlots { | ||
if count+sm.reservedSlots[id]+task.Concurrency <= sm.capacity { | ||
if count+sm.reservedSlots[id]+task.Concurrency <= capacity { | ||
return id, true | ||
} | ||
} | ||
|
@@ -178,3 +183,16 @@ func (sm *slotManager) unReserve(task *proto.Task, execID string) { | |
} | ||
} | ||
} | ||
|
||
// updateCapacity sets the slot capacity to cpuCount and logs the change. | ||
// Non-positive values and values equal to the current capacity are ignored. | ||
// NOTE(review): managed nodes are assumed to be isomorphic, so the CPU count | ||
// of any one of them can be passed here — confirm against callers. | ||
func (sm *slotManager) updateCapacity(cpuCount int) { | ||
old := sm.capacity.Load() | ||
// skip invalid (non-positive) counts and no-op updates. | ||
if cpuCount > 0 && cpuCount != int(old) { | ||
sm.capacity.Store(int32(cpuCount)) | ||
// a zero capacity is treated as "not initialized yet", so the first | ||
// store is logged as an initialization rather than an update. | ||
if old == 0 { | ||
logutil.BgLogger().Info("initialize slot capacity", zap.Int("capacity", cpuCount)) | ||
} else { | ||
logutil.BgLogger().Info("update slot capacity", | ||
zap.Int("old", int(old)), zap.Int("new", cpuCount)) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cpuCount will be set to that of the last node; is this expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We assume the nodes are isomorphic, so it makes no difference whether the first or the last one is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can add a comment explaining this.