diff --git a/go.mod b/go.mod index 2df6a0d298..9e71892c77 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 google.golang.org/protobuf v1.27.1 - k8s.io/api v0.0.0-20220331140502-02c2207317b5 + k8s.io/api v0.0.0-20220402025220-2de699698342 k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444 k8s.io/klog/v2 v2.60.1 k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 @@ -44,6 +44,6 @@ require ( ) replace ( - k8s.io/api => k8s.io/api v0.0.0-20220331140502-02c2207317b5 + k8s.io/api => k8s.io/api v0.0.0-20220402025220-2de699698342 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444 ) diff --git a/go.sum b/go.sum index f986516f2d..a1ac9bf501 100644 --- a/go.sum +++ b/go.sum @@ -628,8 +628,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.0.0-20220331140502-02c2207317b5 h1:shLc1jkM9dNz8zPSm2YeE/XOpp1UP36AZOt5DjspI+0= -k8s.io/api v0.0.0-20220331140502-02c2207317b5/go.mod h1:69QWTzqWVlGn0rU+x3dmk3WAsUQHmeQwIBWMbK1ZEyE= +k8s.io/api v0.0.0-20220402025220-2de699698342 h1:xFpsdy7RmF2niTyB76yUyPubqMa5m9/L9sswkdyhONo= +k8s.io/api v0.0.0-20220402025220-2de699698342/go.mod h1:69QWTzqWVlGn0rU+x3dmk3WAsUQHmeQwIBWMbK1ZEyE= k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444 h1:whQmS3GtF822OUer+LPJnMFKn6kPfuJOCM/3xUuATIY= k8s.io/apimachinery v0.0.0-20220330050810-6550efdb7444/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= diff --git a/rest/request.go b/rest/request.go index e39b6b9dce..ab9348d9cc 100644 --- a/rest/request.go +++ b/rest/request.go @@ -614,15 +614,14 @@ func (r *Request) Watch(ctx context.Context) (watch.Interface, error) { } url := r.URL().String() for { - req, err := r.newHTTPRequest(ctx) - if err != nil { - return nil, err - } - if err := r.retry.Before(ctx, r); err != nil { return nil, r.retry.WrapPreviousError(err) } + req, err := r.newHTTPRequest(ctx) + if err != nil { + return nil, err + } resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) r.retry.After(ctx, r, resp, err) @@ -722,6 +721,10 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { url := r.URL().String() for { + if err := r.retry.Before(ctx, r); err != nil { + return nil, err + } + req, err := r.newHTTPRequest(ctx) if err != nil { return nil, err @@ -729,11 +732,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { if r.body != nil { req.Body = ioutil.NopCloser(r.body) } - - if err := r.retry.Before(ctx, r); err != nil { - return nil, err - } - resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) r.retry.After(ctx, r, resp, err) @@ -859,14 +857,13 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp // Right now we make about ten retry attempts if we get a Retry-After response. for { + if err := r.retry.Before(ctx, r); err != nil { + return r.retry.WrapPreviousError(err) + } req, err := r.newHTTPRequest(ctx) if err != nil { return err } - - if err := r.retry.Before(ctx, r); err != nil { - return r.retry.WrapPreviousError(err) - } resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown. diff --git a/rest/request_test.go b/rest/request_test.go index b942885756..8c3c650820 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -938,7 +938,7 @@ func TestRequestWatch(t *testing.T) { }, Err: true, ErrFn: func(err error) bool { - return apierrors.IsInternalError(err) + return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF") }, }, { @@ -954,7 +954,10 @@ func TestRequestWatch(t *testing.T) { serverReturns: []responseErr{ {response: nil, err: io.EOF}, }, - Empty: true, + Err: true, + ErrFn: func(err error) bool { + return !apierrors.IsInternalError(err) + }, }, { name: "max retries 2, server always returns a response with Retry-After header", @@ -1130,7 +1133,7 @@ func TestRequestStream(t *testing.T) { }, Err: true, ErrFn: func(err error) bool { - return apierrors.IsInternalError(err) + return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF") }, }, { @@ -1371,8 +1374,6 @@ func (b *testBackoffManager) Sleep(d time.Duration) { } func TestCheckRetryClosesBody(t *testing.T) { - // unblock CI until http://issue.k8s.io/108906 is resolved in 1.24 - t.Skip("http://issue.k8s.io/108906") count := 0 ch := make(chan struct{}) testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -2435,6 +2436,7 @@ func TestRequestWithRetry(t *testing.T) { body io.Reader serverReturns responseErr errExpected error + errContains string transformFuncInvokedExpected int roundTripInvokedExpected int }{ @@ -2451,7 +2453,7 @@ func TestRequestWithRetry(t *testing.T) { body: &readSeeker{err: io.EOF}, serverReturns: responseErr{response: retryAfterResponse(), err: nil}, errExpected: nil, - transformFuncInvokedExpected: 1, + transformFuncInvokedExpected: 0, roundTripInvokedExpected: 1, }, { @@ -2474,7 +2476,7 @@ func TestRequestWithRetry(t *testing.T) { name: "server returns retryable err, request body Seek returns error, retry aborted", body: &readSeeker{err: io.EOF}, serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF}, - errExpected: io.ErrUnexpectedEOF, + errContains: "failed to reset the request body while retrying a request: EOF", transformFuncInvokedExpected: 0, roundTripInvokedExpected: 1, }, @@ -2517,8 +2519,15 @@ func TestRequestWithRetry(t *testing.T) { if test.transformFuncInvokedExpected != transformFuncInvoked { t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked) } - if test.errExpected != unWrap(err) { - t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err)) + switch { + case test.errExpected != nil: + if test.errExpected != unWrap(err) { + t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err)) + } + case len(test.errContains) > 0: + if !strings.Contains(err.Error(), test.errContains) { + t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error()) + } } }) } @@ -3531,3 +3540,103 @@ func TestTransportConcurrency(t *testing.T) { }) } } + +// TODO: see if we can consolidate the other trackers into one. +type requestBodyTracker struct { + io.ReadSeeker + f func(string) +} + +func (t *requestBodyTracker) Read(p []byte) (int, error) { + t.f("Request.Body.Read") + return t.ReadSeeker.Read(p) +} + +func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) { + t.f("Request.Body.Seek") + return t.ReadSeeker.Seek(offset, whence) +} + +type responseBodyTracker struct { + io.ReadCloser + f func(string) +} + +func (t *responseBodyTracker) Read(p []byte) (int, error) { + t.f("Response.Body.Read") + return t.ReadCloser.Read(p) +} + +func (t *responseBodyTracker) Close() error { + t.f("Response.Body.Close") + return t.ReadCloser.Close() +} + +type recorder struct { + order []string +} + +func (r *recorder) record(call string) { + r.order = append(r.order, call) +} + +func TestRequestBodyResetOrder(t *testing.T) { + recorder := &recorder{} + respBodyTracker := &responseBodyTracker{ + ReadCloser: nil, // the server will fill it + f: recorder.record, + } + + var attempts int + client := clientForFunc(func(req *http.Request) (*http.Response, error) { + defer func() { + attempts++ + }() + + // read the request body. + ioutil.ReadAll(req.Body) + + // first attempt, we send a retry-after + if attempts == 0 { + resp := retryAfterResponse() + respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{})) + resp.Body = respBodyTracker + return resp, nil + } + + return &http.Response{StatusCode: http.StatusOK}, nil + }) + + reqBodyTracker := &requestBodyTracker{ + ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most. + f: recorder.record, + } + req := &Request{ + verb: "POST", + body: reqBodyTracker, + c: &RESTClient{ + content: defaultContentConfig(), + Client: client, + }, + backoff: &noSleepBackOff{}, + retry: &withRetry{maxRetries: 1}, + } + + req.Do(context.Background()) + + expected := []string{ + // 1st attempt: the server handler reads the request body + "Request.Body.Read", + // the server sends a retry-after, client reads the + // response body, and closes it + "Response.Body.Read", + "Response.Body.Close", + // client retry logic seeks to the beginning of the request body + "Request.Body.Seek", + // 2nd attempt: the server reads the request body + "Request.Body.Read", + } + if !reflect.DeepEqual(expected, recorder.order) { + t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order)) + } +} diff --git a/rest/with_retry.go b/rest/with_retry.go index e729ad1ec4..3082959d18 100644 --- a/rest/with_retry.go +++ b/rest/with_retry.go @@ -78,8 +78,12 @@ type WithRetry interface { IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool // Before should be invoked prior to each attempt, including - // the first one. if an error is returned, the request - // should be aborted immediately. + // the first one. If an error is returned, the request should + // be aborted immediately. + // + // Before may also be additionally responsible for preparing + // the request for the next retry, namely in terms of resetting + // the request body in case it has been read. Before(ctx context.Context, r *Request) error // After should be invoked immediately after an attempt is made. @@ -194,46 +198,18 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq * r.retryAfter.Wait = time.Duration(seconds) * time.Second r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err) - if err := r.prepareForNextRetry(ctx, restReq); err != nil { - klog.V(4).Infof("Could not retry request - %v", err) - return false - } - return true } -// prepareForNextRetry is responsible for carrying out operations that need -// to be completed before the next retry is initiated: -// - if the request context is already canceled there is no need to -// retry, the function will return ctx.Err(). -// - we need to seek to the beginning of the request body before we -// initiate the next retry, the function should return an error if -// it fails to do so. -func (r *withRetry) prepareForNextRetry(ctx context.Context, request *Request) error { - if ctx.Err() != nil { - return ctx.Err() - } - - // Ensure the response body is fully read and closed before - // we reconnect, so that we reuse the same TCP connection. - if seeker, ok := request.body.(io.Seeker); ok && request.body != nil { - if _, err := seeker.Seek(0, 0); err != nil { - return fmt.Errorf("can't Seek() back to beginning of body for %T", request) - } - } - - klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String()) - return nil -} - func (r *withRetry) Before(ctx context.Context, request *Request) error { + // If the request context is already canceled there + // is no need to retry. if ctx.Err() != nil { r.trackPreviousError(ctx.Err()) return ctx.Err() } url := request.URL() - // r.retryAfter represents the retry after parameters calculated // from the (response, err) tuple from the last attempt, so 'Before' // can apply these retry after parameters prior to the next attempt. @@ -245,6 +221,18 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error { return nil } + // At this point we've made atleast one attempt, post which the response + // body should have been fully read and closed in order for it to be safe + // to reset the request body before we reconnect, in order for us to reuse + // the same TCP connection. + if seeker, ok := request.body.(io.Seeker); ok && request.body != nil { + if _, err := seeker.Seek(0, io.SeekStart); err != nil { + err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err) + r.trackPreviousError(err) + return err + } + } + // if we are here, we have made attempt(s) al least once before. if request.backoff != nil { // TODO(tkashem) with default set to use exponential backoff @@ -263,6 +251,7 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error { return err } + klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String()) return nil }