Skip to content

Commit

Permalink
server: Add timeout for the segment upload (#2122)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Dec 6, 2021
1 parent e698fd0 commit 6a72cc3
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- \#2103 Suspend sessions that did not pass p-hash verification (@leszko)
- \#2110 Transparently support HTTP/2 for segment requests while allowing HTTP/1 via GODEBUG runtime flags (@yondonfu)
- \#2124 Do not retry transcoding if HTTP client closed/canceled the connection (@leszko)
- \#2122 Add the upload segment timeout to improve failing fast (@leszko)

#### Orchestrator

Expand Down
3 changes: 3 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
// HTTPTimeout timeout used in HTTP connections between nodes
var HTTPTimeout = 8 * time.Second

// SegmentUploadTimeout timeout used in HTTP connections for uploading the segment
var SegmentUploadTimeout = 2 * time.Second

// Max Segment Duration
var MaxDuration = (5 * time.Minute)

Expand Down
42 changes: 35 additions & 7 deletions server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const segmentHeader = "Livepeer-Segment"

const pixelEstimateMultiplier = 1.02

const segUploadTimeoutMultiplier = 0.5
const segHttpPushTimeoutMultiplier = 4.0

var errSegEncoding = errors.New("ErrorSegEncoding")
var errSegSig = errors.New("ErrSegSig")
var errFormat = errors.New("unrecognized profile output format")
Expand Down Expand Up @@ -434,16 +437,24 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64,
return nil, err
}

// timeout for the whole HTTP call: segment upload, transcoding, reading response
httpTimeout := common.HTTPTimeout
// set a minimum timeout to accommodate transport / processing overhead
dur := common.HTTPTimeout
paddedDur := 4.0 * seg.Duration // use a multiplier of 4 for now
if paddedDur > dur.Seconds() {
dur = time.Duration(paddedDur * float64(time.Second))
paddedDur := segHttpPushTimeoutMultiplier * seg.Duration
if paddedDur > httpTimeout.Seconds() {
httpTimeout = time.Duration(paddedDur * float64(time.Second))
}
// timeout for the segment upload, until HTTP returns OK 200
uploadTimeout := time.Duration(segUploadTimeoutMultiplier * seg.Duration * float64(time.Second))
if uploadTimeout <= 0 {
uploadTimeout = common.SegmentUploadTimeout
}
ctx, cancel := context.WithTimeout(context.Background(), dur)

ctx, cancel := context.WithTimeout(context.Background(), httpTimeout)
defer cancel()

ti := sess.OrchestratorInfo

req, err := http.NewRequestWithContext(ctx, "POST", ti.Transcoder+"/segment", bytes.NewBuffer(data))
if err != nil {
glog.Errorf("Could not generate transcode request to orch=%s", ti.Transcoder)
Expand All @@ -463,9 +474,9 @@ func SubmitSegment(sess *BroadcastSession, seg *stream.HLSSegment, nonce uint64,
req.Header.Set("Content-Type", "video/MP2T")
}

glog.Infof("Submitting segment nonce=%d manifestID=%s sessionID=%s seqNo=%d bytes=%v orch=%s timeout=%s", nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, len(data), ti.Transcoder, dur)
glog.Infof("Submitting segment nonce=%d manifestID=%s sessionID=%s seqNo=%d bytes=%v orch=%s timeout=%s", nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, len(data), ti.Transcoder, httpTimeout)
start := time.Now()
resp, err := httpClient.Do(req)
resp, err := sendReqWithTimeout(req, uploadTimeout)
uploadDur := time.Since(start)
if err != nil {
glog.Errorf("Unable to submit segment orch=%v nonce=%d manifestID=%s sessionID=%s seqNo=%d orch=%s err=%v", ti.Transcoder, nonce, params.ManifestID, sess.OrchestratorInfo.AuthToken.SessionId, seg.SeqNo, ti.Transcoder, err)
Expand Down Expand Up @@ -816,3 +827,20 @@ func validatePrice(sess *BroadcastSession) error {
}
return nil
}

func sendReqWithTimeout(req *http.Request, timeout time.Duration) (*http.Response, error) {
ctx, cancel := context.WithCancel(req.Context())
timeouter := time.AfterFunc(timeout, cancel)

req = req.WithContext(ctx)
resp, err := httpClient.Do(req)
if timeouter.Stop() {
return resp, err
}
// timeout has already fired and cancelled the request
if err != nil {
return nil, err
}
resp.Body.Close()
return nil, context.DeadlineExceeded
}
28 changes: 26 additions & 2 deletions server/segment_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"errors"
Expand Down Expand Up @@ -2000,8 +2001,7 @@ func TestSubmitSegment_Timeout(t *testing.T) {
headerTimeout = 100 * time.Millisecond
lock.Unlock()
_, err = SubmitSegment(sess, seg, 0, false, true)
assert.Contains(err.Error(), "header timeout")
assert.Contains(err.Error(), "context deadline exceeded")
assert.Contains(err.Error(), "context canceled")

// time out body
lock.Lock()
Expand Down Expand Up @@ -2261,6 +2261,30 @@ func TestSubmitSegment_Success(t *testing.T) {
balance.AssertCalled(t, "Credit", ratMatcher(change))
}

func TestSendReqWithTimeout(t *testing.T) {
assert := assert.New(t)

var wg sync.WaitGroup
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wg.Wait()
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
req, _ := http.NewRequestWithContext(context.Background(), "POST", server.URL, nil)

// no timeout
resp, err := sendReqWithTimeout(req, 5*time.Second)
assert.NoError(err)
assert.Equal(200, resp.StatusCode)

// timeout
wg.Add(1)
resp, err = sendReqWithTimeout(req, time.Millisecond)
wg.Done()
assert.Nil(resp)
assert.ErrorIs(err, context.Canceled)
}

func stubTLSServer() (*httptest.Server, *http.ServeMux) {
mux := http.NewServeMux()

Expand Down

0 comments on commit 6a72cc3

Please sign in to comment.