Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1656 upload timeouts #2122

Merged
merged 8 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- \#2103 Suspend sessions that did not pass p-hash verification (@leszko)
- \#2110 Transparently support HTTP/2 for segment requests while allowing HTTP/1 via GODEBUG runtime flags (@yondonfu)
- \#2124 Do not retry transcoding if HTTP client closed/canceled the connection (@leszko)
- \#2122 Add the upload segment timeout to improve failing fast (@leszko)

#### Orchestrator

Expand Down
3 changes: 3 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
// HTTPTimeout timeout used in HTTP connections between nodes
var HTTPTimeout = 8 * time.Second

// SegmentUploadTimeout timeout used in HTTP connections for uploading the segment
var SegmentUploadTimeout = 2 * time.Second

// Max Segment Duration
var MaxDuration = (5 * time.Minute)

Expand Down
42 changes: 35 additions & 7 deletions server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const segmentHeader = "Livepeer-Segment"

const pixelEstimateMultiplier = 1.02

const segUploadTimeoutMultiplier = 0.5
const segHttpPushTimeoutMultiplier = 4.0

var errSegEncoding = errors.New("ErrorSegEncoding")
var errSegSig = errors.New("ErrSegSig")
var errFormat = errors.New("unrecognized profile output format")
Expand Down Expand Up @@ -434,16 +437,24 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64,
return nil, err
}

// timeout for the whole HTTP call: segment upload, transcoding, reading response
httpTimeout := common.HTTPTimeout
// set a minimum timeout to accommodate transport / processing overhead
dur := common.HTTPTimeout
paddedDur := 4.0 * seg.Duration // use a multiplier of 4 for now
if paddedDur > dur.Seconds() {
dur = time.Duration(paddedDur * float64(time.Second))
paddedDur := segHttpPushTimeoutMultiplier * seg.Duration
if paddedDur > httpTimeout.Seconds() {
httpTimeout = time.Duration(paddedDur * float64(time.Second))
}
// timeout for the segment upload, until HTTP returns OK 200
uploadTimeout := time.Duration(segUploadTimeoutMultiplier * seg.Duration * float64(time.Second))
if uploadTimeout <= 0 {
victorges marked this conversation as resolved.
Show resolved Hide resolved
uploadTimeout = common.SegmentUploadTimeout
}
ctx, cancel := context.WithTimeout(context.Background(), dur)

ctx, cancel := context.WithTimeout(context.Background(), httpTimeout)
defer cancel()

ti := sess.OrchestratorInfo

req, err := http.NewRequestWithContext(ctx, "POST", ti.Transcoder+"/segment", bytes.NewBuffer(data))
if err != nil {
glog.Errorf("Could not generate transcode request to orch=%s", ti.Transcoder)
Expand All @@ -463,9 +474,9 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64,
req.Header.Set("Content-Type", "video/MP2T")
}

glog.Infof("Submitting segment nonce=%d manifestID=%s sessionID=%s seqNo=%d bytes=%v orch=%s timeout=%s", nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, len(data), ti.Transcoder, dur)
glog.Infof("Submitting segment nonce=%d manifestID=%s sessionID=%s seqNo=%d bytes=%v orch=%s timeout=%s", nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, len(data), ti.Transcoder, httpTimeout)
start := time.Now()
resp, err := httpClient.Do(req)
resp, err := sendReqWithTimeout(req, uploadTimeout)
uploadDur := time.Since(start)
if err != nil {
glog.Errorf("Unable to submit segment orch=%v nonce=%d manifestID=%s sessionID=%s seqNo=%d orch=%s err=%v", ti.Transcoder, nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, ti.Transcoder, err)
Expand Down Expand Up @@ -816,3 +827,20 @@ func validatePrice(sess *BroadcastSession) error {
}
return nil
}

func sendReqWithTimeout(req *http.Request, timeout time.Duration) (*http.Response, error) {
ctx, cancel := context.WithCancel(req.Context())
timeouter := time.AfterFunc(timeout, cancel)

req = req.WithContext(ctx)
resp, err := httpClient.Do(req)
if timeouter.Stop() {
return resp, err
}
// timeout has already fired and cancelled the request
if err != nil {
return nil, err
}
resp.Body.Close()
return nil, context.DeadlineExceeded
}
28 changes: 26 additions & 2 deletions server/segment_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"errors"
Expand Down Expand Up @@ -2000,8 +2001,7 @@ func TestSubmitSegment_Timeout(t *testing.T) {
headerTimeout = 100 * time.Millisecond
lock.Unlock()
_, err = SubmitSegment(sess, seg, 0, false, true)
assert.Contains(err.Error(), "header timeout")
assert.Contains(err.Error(), "context deadline exceeded")
assert.Contains(err.Error(), "context canceled")

// time out body
lock.Lock()
Expand Down Expand Up @@ -2261,6 +2261,30 @@ func TestSubmitSegment_Success(t *testing.T) {
balance.AssertCalled(t, "Credit", ratMatcher(change))
}

func TestSendReqWithTimeout(t *testing.T) {
assert := assert.New(t)

var wg sync.WaitGroup
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wg.Wait()
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
req, _ := http.NewRequestWithContext(context.Background(), "POST", server.URL, nil)

// no timeout
resp, err := sendReqWithTimeout(req, 5*time.Second)
assert.NoError(err)
assert.Equal(200, resp.StatusCode)

// timeout
wg.Add(1)
resp, err = sendReqWithTimeout(req, time.Millisecond)
wg.Done()
assert.Nil(resp)
assert.ErrorIs(err, context.Canceled)
}

func stubTLSServer() (*httptest.Server, *http.ServeMux) {
mux := http.NewServeMux()

Expand Down