fix: do not recycle the intermediate commands slice of cluster DoMulti when network errors (#706)

Signed-off-by: Rueian <[email protected]>
rueian authored Dec 19, 2024
1 parent 9ff4c60 commit f55f3aa
Showing 1 changed file with 16 additions and 9 deletions.
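What the fix addresses: `doretry` and `doretrycache` hand their pooled `retry`/`retrycache` containers back to an object pool, but the old condition recycled them whenever the context was still alive (`ctx.Err() == nil`), even if some responses had failed with network errors and the container's commands slice had already been re-queued for another attempt. A later `Get` could then hand the same slice to a different batch while the retry still read from it. The commit makes the result functions report whether the batch was clean and recycles only in that case. Below is a minimal, self-contained sketch of the hazard; `batch`, `batchPool`, and `dispatch` are hypothetical stand-ins, not rueidis' actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// batch stands in for rueidis' pooled *retry container, whose cIndexes and
// commands slices may still be referenced by an in-flight retry round.
type batch struct{ commands []string }

var batchPool = sync.Pool{New: func() any { return new(batch) }}

// dispatch sketches the decision this commit changes: the old code put the
// batch back whenever the context was alive, even after a network error had
// queued b.commands for another retry round; the fix keeps it alive instead.
func dispatch(b *batch, networkErr bool) {
	if networkErr {
		// b.commands is still needed by the retry path, so returning b to
		// the pool here would let another goroutine overwrite the slice
		// mid-retry: a use-after-Put data race.
		fmt.Println("network error: keep", b.commands, "alive for retry")
		return
	}
	batchPool.Put(b) // safe: every response completed without a transport error
}

func main() {
	b := batchPool.Get().(*batch)
	b.commands = append(b.commands[:0], "GET k1", "GET k2")
	dispatch(b, true) // simulate a connection drop mid-pipeline
}
```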
cluster.go: 16 additions & 9 deletions
```diff
@@ -645,8 +645,10 @@ func (c *clusterClient) pickMulti(ctx context.Context, multi []Completed) (*conn
 
 func (c *clusterClient) doresultfn(
 	ctx context.Context, results *redisresults, retries *connretry, mu *sync.Mutex, cc conn, cIndexes []int, commands []Completed, resps []RedisResult, attempts int,
-) {
+) (clean bool) {
+	clean = true
 	for i, resp := range resps {
+		clean = clean && resp.NonRedisError() == nil
 		ii := cIndexes[i]
 		cm := commands[i]
 		results.s[ii] = resp
@@ -658,7 +660,6 @@ func (c *clusterClient) doresultfn(
 			if !c.retry || !cm.IsReadOnly() {
 				continue
 			}
-
 			retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error())
 		} else {
 			nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
@@ -685,22 +686,24 @@
 			mu.Unlock()
 		}
 	}
+	return clean
 }
 
 func (c *clusterClient) doretry(
 	ctx context.Context, cc conn, results *redisresults, retries *connretry, re *retry, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
 ) {
+	clean := true
 	if len(re.commands) != 0 {
 		resps := cc.DoMulti(ctx, re.commands...)
-		c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
 		resultsp.Put(resps)
 	}
 	if len(re.cAskings) != 0 {
 		resps := askingMulti(cc, ctx, re.cAskings)
-		c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts)
+		clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
 		resultsp.Put(resps)
 	}
-	if ctx.Err() == nil {
+	if clean {
 		retryp.Put(re)
 	}
 	wg.Done()
```
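One subtlety in `doretry` above: the accumulation is written `clean = c.doresultfn(...) && clean`, with the call on the left of `&&`. Go's `&&` short-circuits, so the reversed form `clean = clean && c.doresultfn(...)` would skip processing the ASKING batch entirely whenever the first batch was dirty, and its results would never be recorded. A runnable illustration, with a hypothetical `step` standing in for `doresultfn`:

```go
package main

import "fmt"

// step stands in for doresultfn: it must always run for its side effects
// (recording results, scheduling retries) and reports whether its batch
// finished without a transport error.
func step(name string, ok bool) bool {
	fmt.Println("processed", name)
	return ok
}

func main() {
	// As in the commit: the call sits on the LEFT of &&, so it executes
	// even when the previous batch was already dirty.
	clean := step("commands", false)
	clean = step("cAskings", true) && clean
	fmt.Println("clean:", clean) // false -> the retry struct is not recycled

	// Reversed, && would short-circuit and never process "cAskings":
	//   clean = clean && step("cAskings", true)
}
```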
```diff
@@ -928,8 +931,10 @@ func (c *clusterClient) pickMultiCache(ctx context.Context, multi []CacheableTTL
 
 func (c *clusterClient) resultcachefn(
 	ctx context.Context, results *redisresults, retries *connretrycache, mu *sync.Mutex, cc conn, cIndexes []int, commands []CacheableTTL, resps []RedisResult, attempts int,
-) {
+) (clean bool) {
+	clean = true
 	for i, resp := range resps {
+		clean = clean && resp.NonRedisError() == nil
 		ii := cIndexes[i]
 		cm := commands[i]
 		results.s[ii] = resp
@@ -968,22 +973,24 @@ func (c *clusterClient) resultcachefn(
 			mu.Unlock()
 		}
 	}
+	return clean
 }
 
 func (c *clusterClient) doretrycache(
 	ctx context.Context, cc conn, results *redisresults, retries *connretrycache, re *retrycache, mu *sync.Mutex, wg *sync.WaitGroup, attempts int,
 ) {
+	clean := true
 	if len(re.commands) != 0 {
 		resps := cc.DoMultiCache(ctx, re.commands...)
-		c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
+		clean = c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
 		resultsp.Put(resps)
 	}
 	if len(re.cAskings) != 0 {
 		resps := askingMultiCache(cc, ctx, re.cAskings)
-		c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts)
+		clean = c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
 		resultsp.Put(resps)
 	}
-	if ctx.Err() == nil {
+	if clean {
 		retrycachep.Put(re)
 	}
 	wg.Done()
```
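Both variants hinge on the same test, `resp.NonRedisError() == nil`: a Redis reply error (say, WRONGTYPE) arrived over a healthy connection and the exchange is final, while a non-Redis error is transport-level and means the command was re-queued and still points into the container's slices. A sketch of that classification, using hypothetical `result` and `cleanBatch` types rather than rueidis' API:

```go
package main

import (
	"errors"
	"fmt"
)

// result mimics the distinction rueidis draws with NonRedisError(): a reply
// error comes from the server over a healthy connection, while a transport
// error means the exchange never completed and a retry is pending.
type result struct {
	replyErr     error // e.g. WRONGTYPE: the exchange completed
	transportErr error // e.g. connection reset: the command was re-queued
}

// cleanBatch reports whether the whole batch may be recycled: true only if
// no response carried a transport-level error.
func cleanBatch(resps []result) bool {
	clean := true
	for _, r := range resps {
		clean = clean && r.transportErr == nil
	}
	return clean
}

func main() {
	fmt.Println(cleanBatch([]result{{replyErr: errors.New("WRONGTYPE")}}))      // true
	fmt.Println(cleanBatch([]result{{transportErr: errors.New("conn reset")}})) // false
}
```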
