Skip to content

Commit

Permalink
Add parameter to force HW Session Reinit (livepeer#2882)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored and eliteprox committed Feb 21, 2024
1 parent 39ac80f commit 51f77cf
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 150 deletions.
7 changes: 7 additions & 0 deletions core/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (lb *LoadBalancingTranscoder) Transcode(ctx context.Context, md *SegTransco
lb.mu.RUnlock()
if exists {
clog.V(common.DEBUG).Infof(ctx, "LB: Using existing transcode session for key=%s", session.key)
if md != nil && md.SegmentParameters != nil && md.SegmentParameters.ForceSessionReinit {
// Broadcaster requested HW session reinitialization
lb.mu.Lock()
session.transcoder.Stop()
session.transcoder = lb.newT(lb.leastLoaded())
lb.mu.Unlock()
}
} else {
var err error
if len(md.DetectorProfiles) > 0 {
Expand Down
16 changes: 12 additions & 4 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ func (s *StreamParameters) StreamID() string {
return string(s.ManifestID) + "/" + s.RtmpKey
}

type SegmentParameters struct {
type SegmentClip struct {
From time.Duration
To time.Duration
}

type SegmentParameters struct {
Clip *SegmentClip
ForceSessionReinit bool
}

type SegTranscodingMetadata struct {
ManifestID ManifestID
Fname string
Expand Down Expand Up @@ -135,9 +140,12 @@ func NetSegData(md *SegTranscodingMetadata) (*net.SegData, error) {
Profiles: []byte("invalid"),
}
if md.SegmentParameters != nil {
segData.SegmentParameters = &net.SegParameters{
From: uint64(md.SegmentParameters.From.Milliseconds()),
To: uint64(md.SegmentParameters.To.Milliseconds()),
segData.ForceSessionReinit = md.SegmentParameters.ForceSessionReinit
if md.SegmentParameters.Clip != nil {
segData.SegmentParameters = &net.SegParameters{
From: uint64(md.SegmentParameters.Clip.From.Milliseconds()),
To: uint64(md.SegmentParameters.Clip.To.Milliseconds()),
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profi
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
CalcSign: calcPHash,
}
if segPar != nil {
o.From = segPar.From
o.To = segPar.To
if segPar != nil && segPar.Clip != nil {
o.From = segPar.Clip.From
o.To = segPar.Clip.To
}
opts[i] = o
}
Expand Down
272 changes: 141 additions & 131 deletions net/lp_rpc.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions net/lp_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ message SegData {
// Transcoding parameters specific to this segment
SegParameters segment_parameters = 37;

// Force HW Session Reinit
bool ForceSessionReinit = 38;

// [EXPERIMENTAL]
// Detector profiles to use
repeated DetectorProfile detector_profiles = 36;
Expand Down
1 change: 0 additions & 1 deletion server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,6 @@ func transcodeSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSS
if seg, err = prepareForTranscoding(ctx, cxn, sess, seg, name); err != nil {
return nil, info, err
}
// cxn.sessManager.pushSegInFlight(sess, seg)
sess.pushSegInFlight(seg)
var res *ReceivedTranscodeResult
res, err = SubmitSegment(ctx, sess.Clone(), seg, segPar, nonce, calcPerceptualHash, verified)
Expand Down
16 changes: 10 additions & 6 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ type authWebhookResponse struct {
Name string `json:"name"`
} `json:"sceneClassification"`
} `json:"detection"`
VerificationFreq uint `json:"verificationFreq"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
VerificationFreq uint `json:"verificationFreq"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
ForceSessionReinit bool `json:"forceSessionReinit"`
}

func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) {
Expand Down Expand Up @@ -802,19 +803,22 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
if valMs, err := strconv.ParseUint(sliceToStr, 10, 64); err == nil {
sliceToDur = time.Duration(valMs) * time.Millisecond
}
var segPar *core.SegmentParameters
var segPar core.SegmentParameters
if sliceFromDur > 0 || sliceToDur > 0 {
if sliceFromDur > 0 && sliceToDur > 0 && sliceFromDur > sliceToDur {
httpErr := fmt.Sprintf(`Invalid slice config from=%s to=%s`, sliceFromDur, sliceToDur)
clog.Errorf(ctx, httpErr)
http.Error(w, httpErr, http.StatusBadRequest)
return
}
segPar = &core.SegmentParameters{
segPar.Clip = &core.SegmentClip{
From: sliceFromDur,
To: sliceToDur,
}
}
if authHeaderConfig != nil {
segPar.ForceSessionReinit = authHeaderConfig.ForceSessionReinit
}

now := time.Now()
if mid == "" {
Expand Down Expand Up @@ -901,7 +905,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}
}

cxn, err = s.registerConnection(ctx, st, vcodec, mediaFormat.PixFormat, segPar)
cxn, err = s.registerConnection(ctx, st, vcodec, mediaFormat.PixFormat, &segPar)
if err != nil {
st.Close()
if err != errAlreadyExists {
Expand Down Expand Up @@ -1010,7 +1014,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}()

// Do the transcoding!
urls, err := processSegment(ctx, cxn, seg, segPar)
urls, err := processSegment(ctx, cxn, seg, &segPar)
if err != nil {
status := http.StatusInternalServerError
if isNonRetryableError(err) {
Expand Down
4 changes: 2 additions & 2 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,8 +1997,8 @@ func TestPush_SlicePass(t *testing.T) {
require.NoError(err)
require.NotNil(md)
require.NotNil(md.SegmentParameters)
require.Equal(100*time.Millisecond, md.SegmentParameters.From)
require.Equal(200*time.Millisecond, md.SegmentParameters.To)
require.Equal(100*time.Millisecond, md.SegmentParameters.Clip.From)
require.Equal(200*time.Millisecond, md.SegmentParameters.Clip.To)
w.WriteHeader(http.StatusOK)
w.Write([]byte("test"))
})
Expand Down
7 changes: 4 additions & 3 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,10 @@ func coreSegMetadata(segData *net.SegData) (*core.SegTranscodingMetadata, error)
}
detectorProfs = append(detectorProfs, detectorProfile)
}
var segPar *core.SegmentParameters
var segPar core.SegmentParameters
segPar.ForceSessionReinit = segData.ForceSessionReinit
if segData.SegmentParameters != nil {
segPar = &core.SegmentParameters{
segPar.Clip = &core.SegmentClip{
From: time.Duration(segData.SegmentParameters.From) * time.Millisecond,
To: time.Duration(segData.SegmentParameters.To) * time.Millisecond,
}
Expand All @@ -556,6 +557,6 @@ func coreSegMetadata(segData *net.SegData) (*core.SegTranscodingMetadata, error)
DetectorEnabled: segData.DetectorEnabled,
DetectorProfiles: detectorProfs,
CalcPerceptualHash: segData.CalcPerceptualHash,
SegmentParameters: segPar,
SegmentParameters: &segPar,
}, nil
}

0 comments on commit 51f77cf

Please sign in to comment.