diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 107daddb70..ac2701c469 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -20,6 +20,7 @@ - \#2103 Suspend sessions that did not pass p-hash verification (@leszko) - \#2110 Transparently support HTTP/2 for segment requests while allowing HTTP/1 via GODEBUG runtime flags (@yondonfu) - \#2124 Do not retry transcoding if HTTP client closed/canceled the connection (@leszko) +- \#2122 Add the upload segment timeout to improve failing fast (@leszko) #### Orchestrator diff --git a/common/util.go b/common/util.go index c6a432eeb3..b7b43ac500 100644 --- a/common/util.go +++ b/common/util.go @@ -29,6 +29,9 @@ import ( // HTTPTimeout timeout used in HTTP connections between nodes var HTTPTimeout = 8 * time.Second +// SegmentUploadTimeout timeout used in HTTP connections for uploading the segment +var SegmentUploadTimeout = 2 * time.Second + // Max Segment Duration var MaxDuration = (5 * time.Minute) diff --git a/server/segment_rpc.go b/server/segment_rpc.go index d89eee8ec3..c7c0ec4023 100644 --- a/server/segment_rpc.go +++ b/server/segment_rpc.go @@ -36,6 +36,9 @@ const segmentHeader = "Livepeer-Segment" const pixelEstimateMultiplier = 1.02 +const segUploadTimeoutMultiplier = 0.5 +const segHttpPushTimeoutMultiplier = 4.0 + var errSegEncoding = errors.New("ErrorSegEncoding") var errSegSig = errors.New("ErrSegSig") var errFormat = errors.New("unrecognized profile output format") @@ -434,16 +437,24 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64, return nil, err } + // timeout for the whole HTTP call: segment upload, transcoding, reading response + httpTimeout := common.HTTPTimeout // set a minimum timeout to accommodate transport / processing overhead - dur := common.HTTPTimeout - paddedDur := 4.0 * seg.Duration // use a multiplier of 4 for now - if paddedDur > dur.Seconds() { - dur = time.Duration(paddedDur * float64(time.Second)) + paddedDur := segHttpPushTimeoutMultiplier * seg.Duration + if paddedDur > httpTimeout.Seconds() { + httpTimeout = time.Duration(paddedDur * float64(time.Second)) + } + // timeout for the segment upload, until HTTP returns OK 200 + uploadTimeout := time.Duration(segUploadTimeoutMultiplier * seg.Duration * float64(time.Second)) + if uploadTimeout <= 0 { + uploadTimeout = common.SegmentUploadTimeout } - ctx, cancel := context.WithTimeout(context.Background(), dur) + + ctx, cancel := context.WithTimeout(context.Background(), httpTimeout) defer cancel() ti := sess.OrchestratorInfo + req, err := http.NewRequestWithContext(ctx, "POST", ti.Transcoder+"/segment", bytes.NewBuffer(data)) if err != nil { glog.Errorf("Could not generate transcode request to orch=%s", ti.Transcoder) @@ -463,9 +474,9 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64, req.Header.Set("Content-Type", "video/MP2T") } - glog.Infof("Submitting segment nonce=%d manifestID=%s sessionID=%s seqNo=%d bytes=%v orch=%s timeout=%s", nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, len(data), ti.Transcoder, dur) + glog.Infof("Submitting segment nonce=%d manifestID=%s sessionID=%s seqNo=%d bytes=%v orch=%s timeout=%s", nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, len(data), ti.Transcoder, httpTimeout) start := time.Now() - resp, err := httpClient.Do(req) + resp, err := sendReqWithTimeout(req, uploadTimeout) uploadDur := time.Since(start) if err != nil { glog.Errorf("Unable to submit segment orch=%v nonce=%d manifestID=%s sessionID=%s seqNo=%d orch=%s err=%v", ti.Transcoder, nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, ti.Transcoder, err) @@ -816,3 +827,20 @@ func validatePrice(sess *BroadcastSession) error { } return nil } + +func sendReqWithTimeout(req *http.Request, timeout time.Duration) (*http.Response, error) { + ctx, cancel := context.WithCancel(req.Context()) + timeouter := time.AfterFunc(timeout, cancel) + + req = req.WithContext(ctx) + resp, err := httpClient.Do(req) + if timeouter.Stop() { + return resp, err + } + // timeout has already fired and cancelled the request + if err != nil { + return nil, err + } + resp.Body.Close() + return nil, context.DeadlineExceeded +} diff --git a/server/segment_rpc_test.go b/server/segment_rpc_test.go index caf879906d..59142bf74f 100644 --- a/server/segment_rpc_test.go +++ b/server/segment_rpc_test.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "context" "crypto/tls" "encoding/base64" "errors" @@ -2000,8 +2001,7 @@ func TestSubmitSegment_Timeout(t *testing.T) { headerTimeout = 100 * time.Millisecond lock.Unlock() _, err = SubmitSegment(sess, seg, 0, false, true) - assert.Contains(err.Error(), "header timeout") - assert.Contains(err.Error(), "context deadline exceeded") + assert.Contains(err.Error(), "context canceled") // time out body lock.Lock() @@ -2261,6 +2261,30 @@ func TestSubmitSegment_Success(t *testing.T) { balance.AssertCalled(t, "Credit", ratMatcher(change)) } +func TestSendReqWithTimeout(t *testing.T) { + assert := assert.New(t) + + var wg sync.WaitGroup + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wg.Wait() + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + req, _ := http.NewRequestWithContext(context.Background(), "POST", server.URL, nil) + + // no timeout + resp, err := sendReqWithTimeout(req, 5*time.Second) + assert.NoError(err) + assert.Equal(200, resp.StatusCode) + + // timeout + wg.Add(1) + resp, err = sendReqWithTimeout(req, time.Millisecond) + wg.Done() + assert.Nil(resp) + assert.ErrorIs(err, context.Canceled) +} + func stubTLSServer() (*httptest.Server, *http.ServeMux) { mux := http.NewServeMux()