Skip to content

Commit

Permalink
server: Enforce a minimum timeout for segment upload (#2541)
Browse files Browse the repository at this point in the history
* server: Enforce a minimum timeout for segment upload

* CHANGELOG: Change changelog

* server: Fix tests with new upload timeout logic
  • Loading branch information
victorges authored Jul 28, 2022
1 parent 4a06317 commit 2f306f6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

#### Broadcaster

- [#2541](https://github.com/livepeer/go-livepeer/pull/2541) Enforce a minimum
timeout for segment upload (@victorges)

#### Orchestrator

#### Transcoder
4 changes: 2 additions & 2 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ var SegHttpPushTimeoutMultiplier = 4.0
// SegUploadTimeoutMultiplier used in HTTP connection for uploading the segment
var SegUploadTimeoutMultiplier = 0.5

// SegmentUploadTimeout timeout used in HTTP connections for uploading the segment duration is not defined
var SegmentUploadTimeout = 2 * time.Second
// MinSegmentUploadTimeout defines the minimum timeout enforced for uploading a segment to orchestrators
var MinSegmentUploadTimeout = 2 * time.Second

// WebhookDiscoveryRefreshInterval defines for long the Webhook Discovery values should be cached
var WebhookDiscoveryRefreshInterval = 1 * time.Minute
Expand Down
4 changes: 2 additions & 2 deletions server/segment_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ func SubmitSegment(ctx context.Context, sess *BroadcastSession, seg *stream.HLSS
}
// timeout for the segment upload, until HTTP returns OK 200
uploadTimeout := time.Duration(common.SegUploadTimeoutMultiplier * seg.Duration * float64(time.Second))
if uploadTimeout <= 0 {
uploadTimeout = common.SegmentUploadTimeout
if uploadTimeout < common.MinSegmentUploadTimeout {
uploadTimeout = common.MinSegmentUploadTimeout
}

ctx, cancel := context.WithTimeout(clog.Clone(context.Background(), ctx), httpTimeout)
Expand Down
45 changes: 29 additions & 16 deletions server/segment_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1962,23 +1962,23 @@ func TestSubmitSegment_Timeout(t *testing.T) {
},
}
buf, err := proto.Marshal(tr)
assert.Nil(err)

headerTimeout := 0 * time.Millisecond
bodyTimeout := 0 * time.Millisecond
headerDelay := 0 * time.Millisecond
bodyDelay := 0 * time.Millisecond
ts, mux := stubTLSServer()
var lock sync.Mutex
defer ts.Close()
mux.HandleFunc("/segment", func(w http.ResponseWriter, r *http.Request) {
lock.Lock()
ht := headerTimeout
hd, bd := headerDelay, bodyDelay
lock.Unlock()
time.Sleep(ht)

time.Sleep(hd)
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
lock.Lock()
bt := bodyTimeout
lock.Unlock()
time.Sleep(bt)

time.Sleep(bd)
w.Write(buf)
})

Expand All @@ -1996,34 +1996,47 @@ func TestSubmitSegment_Timeout(t *testing.T) {
defer func() { common.HTTPTimeout = oldTimeout }()
common.HTTPTimeout = 0

oldUploadTimeout := common.MinSegmentUploadTimeout
defer func() { common.MinSegmentUploadTimeout = oldUploadTimeout }()
common.MinSegmentUploadTimeout = 100 * time.Millisecond

// time out header
lock.Lock()
headerTimeout = 500 * time.Millisecond
headerDelay = 500 * time.Millisecond
lock.Unlock()
_, err = SubmitSegment(context.Background(), sess, seg, nil, 0, false, true)
assert.NotNil(err)
assert.Contains(err.Error(), "header timeout")
assert.Contains(err.Error(), "context canceled")

// should enforce minimum segment upload timeout
lock.Lock()
headerDelay = 50 * time.Millisecond
lock.Unlock()
_, err = SubmitSegment(context.Background(), sess, seg, nil, 0, false, true)
assert.Nil(err)

// time out body
lock.Lock()
headerTimeout = 0
bodyTimeout = 500 * time.Millisecond
headerDelay = 0
bodyDelay = 500 * time.Millisecond
lock.Unlock()
_, err = SubmitSegment(context.TODO(), sess, seg, nil, 0, false, true)
assert.NotNil(err)
assert.Equal("body timeout: context deadline exceeded", err.Error())

// sanity check, again
lock.Lock()
bodyTimeout = 0
bodyDelay = 0
lock.Unlock()
_, err = SubmitSegment(context.TODO(), sess, seg, nil, 0, false, true)
assert.Nil(err)

// sanity check default timeouts with a bodyTimeout > seg.Duration
// sanity check default timeouts with a bodyDelay > seg.Duration
common.HTTPTimeout = 1 * time.Second
lock.Lock()
bodyTimeout = 500 * time.Millisecond
assert.Greater(common.HTTPTimeout.Milliseconds(), bodyTimeout.Milliseconds())
bodyDelay = 500 * time.Millisecond
assert.Greater(common.HTTPTimeout.Milliseconds(), bodyDelay.Milliseconds())
lock.Unlock()
seg.Duration = 0.0 // missing duration
_, err = SubmitSegment(context.TODO(), sess, seg, nil, 0, false, true)
Expand All @@ -2040,7 +2053,7 @@ func TestSubmitSegment_Timeout(t *testing.T) {
// ensure default timeout triggers
common.HTTPTimeout = 10 * time.Millisecond
lock.Lock()
assert.Greater(bodyTimeout.Milliseconds(), common.HTTPTimeout.Milliseconds())
assert.Greater(bodyDelay.Milliseconds(), common.HTTPTimeout.Milliseconds())
lock.Unlock()
_, err = SubmitSegment(context.TODO(), sess, seg, nil, 0, false, true)
assert.Equal("body timeout: context deadline exceeded", err.Error())
Expand Down

0 comments on commit 2f306f6

Please sign in to comment.