-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: calibrate resource support tpch10 #47095
Changes from all commits
8fc8fa2
4a3323d
6911e06
d589b3e
015dc2e
26bbf2d
6a53d0f
d4c9ceb
2a6f4e4
ace33f5
bb8b5cd
ada8cdd
505a539
9619e49
6c9c338
585f99e
adec9eb
216cdd7
f86df62
fae0271
15e4a7d
09b47a4
9d069a2
7958a49
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ import ( | |
"github.com/pingcap/tidb/util/mathutil" | ||
"github.com/pingcap/tidb/util/sqlexec" | ||
"github.com/tikv/client-go/v2/oracle" | ||
resourceControlClient "github.com/tikv/pd/client/resource_group/controller" | ||
) | ||
|
||
var ( | ||
|
@@ -252,29 +253,40 @@ func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec | |
if err != nil { | ||
return err | ||
} | ||
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, startTs, endTs) | ||
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, startTs, endTs) | ||
if err1 != nil && err2 != nil { | ||
return err1 | ||
} | ||
req.AppendUint64(0, uint64(tidbQuota+tiflashQuota)) | ||
return nil | ||
} | ||
|
||
func (e *Executor) getTiDBQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) { | ||
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) | ||
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) | ||
|
||
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) | ||
if err != nil { | ||
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) | ||
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) | ||
} | ||
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) | ||
if err != nil { | ||
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) | ||
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) | ||
} | ||
rus, err := getRUPerSec(ctx, e.Ctx(), exec, startTime, endTime) | ||
if err != nil { | ||
return err | ||
return 0, err | ||
} | ||
tikvCPUs, err := getComponentCPUUsagePerSec(ctx, e.Ctx(), exec, "tikv", startTime, endTime) | ||
if err != nil { | ||
return err | ||
return 0, err | ||
} | ||
tidbCPUs, err := getComponentCPUUsagePerSec(ctx, e.Ctx(), exec, "tidb", startTime, endTime) | ||
if err != nil { | ||
return err | ||
return 0, err | ||
} | ||
|
||
failpoint.Inject("mockMetricsDataFilter", func() { | ||
ret := make([]*timePointValue, 0) | ||
for _, point := range tikvCPUs.vals { | ||
|
@@ -332,8 +344,16 @@ func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec | |
tidbCPUs.next() | ||
tikvCPUs.next() | ||
} | ||
quota, err := setupQuotas(quotas) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return quota, nil | ||
} | ||
|
||
func setupQuotas(quotas []float64) (float64, error) { | ||
if len(quotas) < 2 { | ||
return errLowUsage | ||
return 0, errLowUsage | ||
} | ||
sort.Slice(quotas, func(i, j int) bool { | ||
return quotas[i] > quotas[j] | ||
|
@@ -344,9 +364,46 @@ func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec | |
for i := lowerBound; i < upperBound; i++ { | ||
sum += quotas[i] | ||
} | ||
quota := sum / float64(upperBound-lowerBound) | ||
req.AppendUint64(0, uint64(quota)) | ||
return nil | ||
return sum / float64(upperBound-lowerBound), nil | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could `upperBound-lowerBound` be zero? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It cannot be zero. |
||
} | ||
|
||
// getTiFlashQuota estimates an RU quota for TiFlash from metrics sampled between startTs and endTs. | ||
// For each time point on which the RU series and the CPU series can be aligned, it divides the RU | ||
// value by the CPU value normalized by the total TiFlash logical cores, and hands the collected | ||
// samples to setupQuotas, whose result (and error) is returned as is. | ||
// It returns errNoCPUQuotaMetrics when the logical core count cannot be read, and the underlying | ||
// error when either metrics series cannot be loaded. | ||
func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) { | ||
// Render both bounds in the session's location; the strings are embedded in the metrics queries below. | ||
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) | ||
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) | ||
|
||
quotas := make([]float64, 0) | ||
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec) | ||
if err != nil { | ||
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) | ||
} | ||
tiflashCPUs, err := getTiFlashCPUUsagePerSec(ctx, e.Ctx(), exec, startTime, endTime) | ||
if err != nil { | ||
return 0, err | ||
} | ||
tiflashRUs, err := getTiFlashRUPerSec(ctx, e.Ctx(), exec, startTime, endTime) | ||
if err != nil { | ||
return 0, err | ||
} | ||
// Walk the two series in lockstep until either one is exhausted. | ||
for { | ||
if tiflashRUs.isEnd() || tiflashCPUs.isEnd() { | ||
break | ||
} | ||
// make time point match | ||
// The later of the two current timestamps is the target both cursors are moved to. | ||
maxTime := tiflashRUs.getTime() | ||
if tiflashCPUs.getTime().After(maxTime) { | ||
maxTime = tiflashCPUs.getTime() | ||
} | ||
// NOTE(review): advance is defined elsewhere; presumably it moves the cursor forward to maxTime | ||
// and reports whether it landed on a usable point — confirm. When either call reports false the | ||
// loop restarts so the end check and maxTime are re-evaluated. | ||
if !tiflashRUs.advance(maxTime) || !tiflashCPUs.advance(maxTime) { | ||
continue | ||
} | ||
// Despite its name, tiflashQuota is the CPU value divided by the total logical cores, | ||
// i.e. a usage ratio rather than a quota. | ||
tiflashQuota := tiflashCPUs.getValue() / totalTiFlashLogicalCores | ||
// Only samples whose ratio exceeds lowUsageThreshold contribute; the sample is RU divided by that ratio. | ||
if tiflashQuota > lowUsageThreshold { | ||
quotas = append(quotas, tiflashRUs.getValue()/tiflashQuota) | ||
} | ||
tiflashRUs.next() | ||
tiflashCPUs.next() | ||
} | ||
return setupQuotas(quotas) | ||
} | ||
|
||
func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { | ||
|
@@ -358,6 +415,10 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s | |
if resourceGroupCtl == nil { | ||
return errors.New("resource group controller is not initialized") | ||
} | ||
ruCfg := resourceGroupCtl.GetConfig() | ||
if e.WorkloadType == ast.TPCH10 { | ||
return staticCalibrateTpch10(ctx, req, exec, ruCfg) | ||
} | ||
|
||
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) | ||
if err != nil { | ||
|
@@ -380,7 +441,6 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s | |
if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota { | ||
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio | ||
} | ||
ruCfg := resourceGroupCtl.GetConfig() | ||
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) + | ||
float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms | ||
float64(ruCfg.ReadBytesCost)*float64(baseCost.readBytes) + | ||
|
@@ -391,6 +451,22 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s | |
return nil | ||
} | ||
|
||
// staticCalibrateTpch10 estimates the RU capacity for the TPCH10 workload from fixed per-core | ||
// costs: it computes the RU per logical core from ruCfg.CPUMsCost and ruCfg.ReadBytesCost, | ||
// multiplies it by the total TiFlash logical cores, and appends the result, converted to uint64, | ||
// to req at index 0. It returns the error from getTiFlashLogicalCores if the core count cannot be read. | ||
func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor, ruCfg *resourceControlClient.RUConfig) error { | ||
// TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored. | ||
// cpu usage: 105494.666484 / 20 / 20 = 263.74 | ||
// read bytes: 401799161689.0 / 20 / 20 = 1004497904.22 | ||
// NOTE(review): the meaning of the two divisors of 20 is not stated here — confirm (and record) what they represent. | ||
const cpuTimePerCPUPerSec float64 = 263.74 | ||
const readBytesPerCPUPerSec float64 = 1004497904.22 | ||
// RU attributed to one logical core: the CPU cost plus the read-bytes cost, each weighted by the RU config. | ||
// NOTE(review): cpuTimePerCPUPerSec is multiplied by CPUMsCost, so it is presumably in milliseconds — confirm. | ||
ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An additional question: Is the cost factor used by Tiflash to calculate RU obtained from PD? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, the number is by checking tiflash log(105494.666484 and 401799161689.0) |
||
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec) | ||
if err != nil { | ||
return err | ||
} | ||
quota := totalTiFlashLogicalCores * ruPerCPU | ||
req.AppendUint64(0, uint64(quota)) | ||
return nil | ||
} | ||
|
||
func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { | ||
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1" | ||
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota") | ||
|
@@ -401,6 +477,21 @@ func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecuto | |
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs") | ||
} | ||
|
||
// getTiFlashLogicalCores returns the summed value of METRICS_SCHEMA.tiflash_cpu_quota for the | ||
// most recent time point (rows are grouped by time, ordered newest first, limited to one). | ||
// The lookup and any error come from getNumberFromMetrics, which is called with the label "tiflash_cpu_quota". | ||
func getTiFlashLogicalCores(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { | ||
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tiflash_cpu_quota GROUP BY time ORDER BY time desc limit 1" | ||
return getNumberFromMetrics(ctx, exec, query, "tiflash_cpu_quota") | ||
} | ||
|
||
// getTiFlashRUPerSec loads the (time, value) rows of | ||
// METRICS_SCHEMA.tiflash_resource_manager_resource_unit whose time lies in [startTime, endTime], | ||
// ordered by time ascending, and returns them through getValuesFromMetrics. | ||
// startTime and endTime are interpolated verbatim into the SQL text, so callers must pass | ||
// already-formatted timestamps. | ||
// NOTE(review): unlike getTiFlashCPUUsagePerSec, this query does not aggregate with GROUP BY time; | ||
// presumably the metric yields a single row per time point — confirm. | ||
func getTiFlashRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) { | ||
query := fmt.Sprintf("SELECT time, value FROM METRICS_SCHEMA.tiflash_resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc", startTime, endTime) | ||
return getValuesFromMetrics(ctx, sctx, exec, query) | ||
} | ||
|
||
// getTiFlashCPUUsagePerSec loads, for each time point in [startTime, endTime], the sum of | ||
// METRICS_SCHEMA.tiflash_process_cpu_usage values restricted to job = 'tiflash', ordered by time | ||
// ascending, and returns the series through getValuesFromMetrics. | ||
// startTime and endTime are interpolated verbatim into the SQL text, so callers must pass | ||
// already-formatted timestamps. | ||
func getTiFlashCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) { | ||
query := fmt.Sprintf("SELECT time, sum(value) FROM METRICS_SCHEMA.tiflash_process_cpu_usage where time >= '%s' and time <= '%s' and job = 'tiflash' GROUP BY time ORDER BY time asc", startTime, endTime) | ||
return getValuesFromMetrics(ctx, sctx, exec, query) | ||
} | ||
|
||
type timePointValue struct { | ||
tp time.Time | ||
val float64 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to include the TiKV quota as well; does it make sense for it to be called `getTiDBQuota`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe `getTiDBAndTiKVQuota`?