Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameter to force HW Session Reinit #2882

Merged
merged 4 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (lb *LoadBalancingTranscoder) Transcode(ctx context.Context, md *SegTransco
lb.mu.RUnlock()
if exists {
clog.V(common.DEBUG).Infof(ctx, "LB: Using existing transcode session for key=%s", session.key)
if md != nil && md.SegmentParameters != nil && md.SegmentParameters.ForceSessionReinit {
// Broadcaster requested HW session reinitialization
lb.mu.Lock()
session.transcoder.Stop()
session.transcoder = lb.newT(lb.leastLoaded())
lb.mu.Unlock()
}
} else {
var err error
if len(md.DetectorProfiles) > 0 {
Expand Down
16 changes: 12 additions & 4 deletions core/streamdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ func (s *StreamParameters) StreamID() string {
return string(s.ManifestID) + "/" + s.RtmpKey
}

type SegmentParameters struct {
type SegmentClip struct {
From time.Duration
To time.Duration
}

type SegmentParameters struct {
Clip *SegmentClip
ForceSessionReinit bool
}

type SegTranscodingMetadata struct {
ManifestID ManifestID
Fname string
Expand Down Expand Up @@ -135,9 +140,12 @@ func NetSegData(md *SegTranscodingMetadata) (*net.SegData, error) {
Profiles: []byte("invalid"),
}
if md.SegmentParameters != nil {
segData.SegmentParameters = &net.SegParameters{
From: uint64(md.SegmentParameters.From.Milliseconds()),
To: uint64(md.SegmentParameters.To.Milliseconds()),
segData.ForceSessionReinit = md.SegmentParameters.ForceSessionReinit
if md.SegmentParameters.Clip != nil {
segData.SegmentParameters = &net.SegParameters{
From: uint64(md.SegmentParameters.Clip.From.Milliseconds()),
To: uint64(md.SegmentParameters.Clip.To.Milliseconds()),
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ func profilesToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, profi
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
CalcSign: calcPHash,
}
if segPar != nil {
o.From = segPar.From
o.To = segPar.To
if segPar != nil && segPar.Clip != nil {
o.From = segPar.Clip.From
o.To = segPar.Clip.To
}
opts[i] = o
}
Expand Down
272 changes: 141 additions & 131 deletions net/lp_rpc.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions net/lp_rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ message SegData {
// Transcoding parameters specific to this segment
SegParameters segment_parameters = 37;

// Force HW Session Reinit
bool ForceSessionReinit = 38;

// [EXPERIMENTAL]
// Detector profiles to use
repeated DetectorProfile detector_profiles = 36;
Expand Down
1 change: 0 additions & 1 deletion server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,6 @@ func transcodeSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSS
if seg, err = prepareForTranscoding(ctx, cxn, sess, seg, name); err != nil {
return nil, info, err
}
// cxn.sessManager.pushSegInFlight(sess, seg)
sess.pushSegInFlight(seg)
var res *ReceivedTranscodeResult
res, err = SubmitSegment(ctx, sess.Clone(), seg, segPar, nonce, calcPerceptualHash, verified)
Expand Down
16 changes: 10 additions & 6 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ type authWebhookResponse struct {
Name string `json:"name"`
} `json:"sceneClassification"`
} `json:"detection"`
VerificationFreq uint `json:"verificationFreq"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
VerificationFreq uint `json:"verificationFreq"`
TimeoutMultiplier int `json:"timeoutMultiplier"`
ForceSessionReinit bool `json:"forceSessionReinit"`
}

func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bool, transcodingOptions string) (*LivepeerServer, error) {
Expand Down Expand Up @@ -801,19 +802,22 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
if valMs, err := strconv.ParseUint(sliceToStr, 10, 64); err == nil {
sliceToDur = time.Duration(valMs) * time.Millisecond
}
var segPar *core.SegmentParameters
var segPar core.SegmentParameters
if sliceFromDur > 0 || sliceToDur > 0 {
if sliceFromDur > 0 && sliceToDur > 0 && sliceFromDur > sliceToDur {
httpErr := fmt.Sprintf(`Invalid slice config from=%s to=%s`, sliceFromDur, sliceToDur)
clog.Errorf(ctx, httpErr)
http.Error(w, httpErr, http.StatusBadRequest)
return
}
segPar = &core.SegmentParameters{
segPar.Clip = &core.SegmentClip{
From: sliceFromDur,
To: sliceToDur,
}
}
if authHeaderConfig != nil {
segPar.ForceSessionReinit = authHeaderConfig.ForceSessionReinit
}

now := time.Now()
if mid == "" {
Expand Down Expand Up @@ -900,7 +904,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}
}

cxn, err = s.registerConnection(ctx, st, vcodec, mediaFormat.PixFormat, segPar)
cxn, err = s.registerConnection(ctx, st, vcodec, mediaFormat.PixFormat, &segPar)
if err != nil {
st.Close()
if err != errAlreadyExists {
Expand Down Expand Up @@ -1009,7 +1013,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
}()

// Do the transcoding!
urls, err := processSegment(ctx, cxn, seg, segPar)
urls, err := processSegment(ctx, cxn, seg, &segPar)
if err != nil {
status := http.StatusInternalServerError
if isNonRetryableError(err) {
Expand Down
4 changes: 2 additions & 2 deletions server/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,8 +1997,8 @@ func TestPush_SlicePass(t *testing.T) {
require.NoError(err)
require.NotNil(md)
require.NotNil(md.SegmentParameters)
require.Equal(100*time.Millisecond, md.SegmentParameters.From)
require.Equal(200*time.Millisecond, md.SegmentParameters.To)
require.Equal(100*time.Millisecond, md.SegmentParameters.Clip.From)
require.Equal(200*time.Millisecond, md.SegmentParameters.Clip.To)
w.WriteHeader(http.StatusOK)
w.Write([]byte("test"))
})
Expand Down
7 changes: 4 additions & 3 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,10 @@ func coreSegMetadata(segData *net.SegData) (*core.SegTranscodingMetadata, error)
}
detectorProfs = append(detectorProfs, detectorProfile)
}
var segPar *core.SegmentParameters
var segPar core.SegmentParameters
segPar.ForceSessionReinit = segData.ForceSessionReinit
if segData.SegmentParameters != nil {
segPar = &core.SegmentParameters{
segPar.Clip = &core.SegmentClip{
From: time.Duration(segData.SegmentParameters.From) * time.Millisecond,
To: time.Duration(segData.SegmentParameters.To) * time.Millisecond,
}
Expand All @@ -556,6 +557,6 @@ func coreSegMetadata(segData *net.SegData) (*core.SegTranscodingMetadata, error)
DetectorEnabled: segData.DetectorEnabled,
DetectorProfiles: detectorProfs,
CalcPerceptualHash: segData.CalcPerceptualHash,
SegmentParameters: segPar,
SegmentParameters: &segPar,
}, nil
}