Skip to content

Commit

Permalink
client: refactor intercepted client (#1557)
Browse files Browse the repository at this point in the history
 

Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan authored Jan 16, 2025
1 parent 405d444 commit e9d868c
Showing 1 changed file with 73 additions and 13 deletions.
86 changes: 73 additions & 13 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,49 @@ func NewInterceptedClient(client Client) Client {
return interceptedClient{client}
}

func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
// Build the resource control interceptor.
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req)
// Chain the interceptors if there are multiple interceptors.
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
if finalInterceptor != nil {
finalInterceptor = interceptor.ChainRPCInterceptors(finalInterceptor, it)
} else {
finalInterceptor = it
func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
var ruDetails *util.RUDetails

resourceGroupName, resourceControlInterceptor, reqInfo := getResourceControlInfo(ctx, req)
if resourceControlInterceptor != nil {
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
}
req.GetResourceControlContext().Penalty = penalty
// override request priority with resource group priority if it's not set.
// Get the priority at tikv side has some performance issue, so we pass it
// at client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
if req.GetResourceControlContext().OverridePriority == 0 {
req.GetResourceControlContext().OverridePriority = uint64(priority)
}

if val := ctx.Value(util.RUDetailsCtxKey); val != nil {
ruDetails = val.(*util.RUDetails)
ruDetails.Update(consumption, waitDuration)
}
}
if finalInterceptor != nil {
return finalInterceptor.Wrap(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {

if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor == nil {
resp, err = r.Client.SendRequest(ctx, addr, req, timeout)
} else {
resp, err = ctxInterceptor.Wrap(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
return r.Client.SendRequest(ctx, target, req, timeout)
})(addr, req)
}
return r.Client.SendRequest(ctx, addr, req, timeout)

if resourceControlInterceptor != nil && resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
if ruDetails != nil {
ruDetails.Update(consumption, waitDuration)
}
}

return resp, err
}

var (
Expand All @@ -67,9 +93,43 @@ var (
ResourceControlInterceptor atomic.Pointer[resourceControlClient.ResourceGroupKVInterceptor]
)

func getResourceControlInfo(ctx context.Context, req *tikvrpc.Request) (
string,
resourceControlClient.ResourceGroupKVInterceptor,
*resourcecontrol.RequestInfo,
) {
resourceGroupName := req.GetResourceControlContext().GetResourceGroupName()
if len(resourceGroupName) == 0 {
return "", nil, nil
}
if !ResourceControlSwitch.Load().(bool) {
return "", nil, nil
}
rcInterceptor := ResourceControlInterceptor.Load()
if rcInterceptor == nil {
return "", nil, nil
}
// bypass some internal requests and it's may influence user experience. For example, the
// request of `alter user password`, totally bypasses the resource control. it's not cost
// many resources, but it's may influence the user experience.
// If the resource group has background jobs, we should not record consumption and wait for it.
// Background jobs will record and report in tikv side.
resourceControlInterceptor := *rcInterceptor
if resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) {
return "", nil, nil
}
reqInfo := resourcecontrol.MakeRequestInfo(req)
if reqInfo.Bypass() {
return "", nil, nil
}
return resourceGroupName, resourceControlInterceptor, reqInfo
}

// buildResourceControlInterceptor builds a resource control interceptor with
// the given resource group name.
func buildResourceControlInterceptor(
//
// Deprecated: embedded in `interceptedClient.SendRequest` directly to reduce overhead.
func buildResourceControlInterceptor( //nolint:unused
ctx context.Context,
req *tikvrpc.Request,
) interceptor.RPCInterceptor {
Expand Down

0 comments on commit e9d868c

Please sign in to comment.