Skip to content

Commit

Permalink
Control scene classification with T command line args (#2686)
Browse files Browse the repository at this point in the history
* Control scene classification with T command line args, log detection results on B, even if not requested for stream

* Changelog
  • Loading branch information
cyberj0g authored Dec 13, 2022
1 parent 7ea2d42 commit 1115630
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#### Orchestrator

#### Transcoder
- \#2686 Control non-stream specific scene classification with command line args

### Bug Fixes 🐞

Expand Down
3 changes: 2 additions & 1 deletion cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.Netint = flag.String("netint", *cfg.Netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
cfg.TestTranscoder = flag.Bool("testTranscoder", *cfg.TestTranscoder, "Test Nvidia GPU transcoding at startup")
cfg.SceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.SceneClassificationModelPath, "Path to scene classification model")
cfg.DetectContent = flag.Bool("detectContent", *cfg.DetectContent, "Set to true to enable content type detection")
cfg.DetectContent = flag.Bool("detectContent", *cfg.DetectContent, "Enables content type detection capability and automatic detection. If not specified, transcoder won't advertise corresponding capabilities and receive such jobs.")
cfg.DetectionSampleRate = flag.Uint("detectionSampleRate", *cfg.DetectionSampleRate, "Run content detection automatically on every nth frame of each segment, independently of requested stream transcoding configuration.")

// Onchain:
cfg.EthAcctAddr = flag.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address")
Expand Down
10 changes: 10 additions & 0 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common/math"
"io/ioutil"
"math/big"
"net"
Expand Down Expand Up @@ -87,6 +88,7 @@ type LivepeerConfig struct {
TestTranscoder *bool
SceneClassificationModelPath *string
DetectContent *bool
DetectionSampleRate *uint
EthAcctAddr *string
EthPassword *string
EthKeystorePath *string
Expand Down Expand Up @@ -153,6 +155,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultNetint := ""
defaultTestTranscoder := true
defaultDetectContent := false
defaultDetectionSampleRate := uint(math.MaxUint32)
defaultSceneClassificationModelPath := "tasmodel.pb"

// Onchain:
Expand Down Expand Up @@ -231,6 +234,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
TestTranscoder: &defaultTestTranscoder,
SceneClassificationModelPath: &defaultSceneClassificationModelPath,
DetectContent: &defaultDetectContent,
DetectionSampleRate: &defaultDetectionSampleRate,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -295,6 +299,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

if *cfg.DetectionSampleRate <= 0 {
glog.Fatal("-detectionSampleRate must be greater than zero")
return
}

blockPollingTime := time.Duration(*cfg.BlockPollingInterval) * time.Second

type NetworkConfig struct {
Expand Down Expand Up @@ -432,6 +441,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if accel == ffmpeg.Nvidia && *cfg.DetectContent {
if _, err := os.Stat(*cfg.SceneClassificationModelPath); err == nil {
detectorProfile := ffmpeg.DSceneAdultSoccer
detectorProfile.SampleRate = *cfg.DetectionSampleRate
detectorProfile.ModelPath = *cfg.SceneClassificationModelPath
core.DetectorProfile = &detectorProfile
for _, d := range devices {
Expand Down
7 changes: 5 additions & 2 deletions core/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ func (lb *LoadBalancingTranscoder) createSession(ctx context.Context, md *SegTra
// Acquire transcode session. Map to job id + assigned transcoder
key := job + "_" + transcoder
costEstimate := calculateCost(md.Profiles)

// create the transcoder - with AI capabilities, if required by local or stream configuration
var lpmsSession TranscoderSession
if md.DetectorEnabled {
setEffectiveDetectorConfig(md)
if md.DetectorEnabled && len(md.DetectorProfiles) == 1 {
var err error
lpmsSession, err = lb.newDetectorT(DetectorProfile, transcoder)
lpmsSession, err = lb.newDetectorT(md.DetectorProfiles[0], transcoder)
if err != nil {
return nil, err
}
Expand Down
70 changes: 33 additions & 37 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -41,6 +42,36 @@ func NewUnrecoverableError(err error) UnrecoverableError {

var WorkDir string

func setEffectiveDetectorConfig(md *SegTranscodingMetadata) {
aiEnabled := DetectorProfile != nil
actualProfile := DetectorProfile
if aiEnabled {
presetSampleRate := DetectorProfile.(*ffmpeg.SceneClassificationProfile).SampleRate
if md.DetectorEnabled && len(md.DetectorProfiles) == 1 {
actualProfile = md.DetectorProfiles[0]
requestedSampleRate := actualProfile.(*ffmpeg.SceneClassificationProfile).SampleRate
// 0 is not a valid value
if requestedSampleRate == 0 {
requestedSampleRate = math.MaxUint32
}
actualProfile.(*ffmpeg.SceneClassificationProfile).SampleRate = uint(math.Min(float64(presetSampleRate),
float64(requestedSampleRate)))
// copy other fields from default AI capability, as we don't yet support custom ones
actualProfile.(*ffmpeg.SceneClassificationProfile).ModelPath = DetectorProfile.(*ffmpeg.SceneClassificationProfile).ModelPath
actualProfile.(*ffmpeg.SceneClassificationProfile).Input = DetectorProfile.(*ffmpeg.SceneClassificationProfile).Input
actualProfile.(*ffmpeg.SceneClassificationProfile).Output = DetectorProfile.(*ffmpeg.SceneClassificationProfile).Output
actualProfile.(*ffmpeg.SceneClassificationProfile).Classes = DetectorProfile.(*ffmpeg.SceneClassificationProfile).Classes
}
}
if actualProfile != nil && actualProfile.(*ffmpeg.SceneClassificationProfile).SampleRate < math.MaxUint32 {
md.DetectorProfiles = []ffmpeg.DetectorProfile{actualProfile}
md.DetectorEnabled = true
} else {
md.DetectorProfiles = []ffmpeg.DetectorProfile{}
md.DetectorEnabled = false
}
}

func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)
Expand All @@ -50,6 +81,7 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
Fname: md.Fname,
Accel: ffmpeg.Software,
}
setEffectiveDetectorConfig(md)
profiles := md.Profiles
opts := profilesToTranscodeOptions(lt.workDir, ffmpeg.Software, profiles, md.CalcPerceptualHash, md.SegmentParameters)
if md.DetectorEnabled {
Expand Down Expand Up @@ -141,6 +173,7 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
Device: nv.device,
}
profiles := md.Profiles
setEffectiveDetectorConfig(md)
out := profilesToTranscodeOptions(WorkDir, ffmpeg.Nvidia, profiles, md.CalcPerceptualHash, md.SegmentParameters)
if md.DetectorEnabled {
out = append(out, detectorsToTranscodeOptions(WorkDir, ffmpeg.Nvidia, md.DetectorProfiles)...)
Expand Down Expand Up @@ -354,43 +387,6 @@ func TestSoftwareTranscoderCapabilities(tmpdir string) (caps []Capability, fatal
return caps, fatalError
}

func TestNetintTranscoder(devices []string) error {
buf, _ := os.ReadFile("core/test.ts")
b := bytes.NewReader(buf)
z, err := gzip.NewReader(b)
if err != nil {
return err
}
mp4testSeg, err := ioutil.ReadAll(z)
z.Close()
if err != nil {
return err
}
fname := filepath.Join(WorkDir, "testseg.tempfile")
err = ioutil.WriteFile(fname, mp4testSeg, 0644)
if err != nil {
return err
}
defer os.Remove(fname)
for _, device := range devices {
t1 := NewNetintTranscoder(device)
// "145x1" is the minimal resolution that succeeds on Windows, so use "145x145"
p := ffmpeg.VideoProfile{Resolution: "145x145", Bitrate: "1k", Format: ffmpeg.FormatMP4}
md := &SegTranscodingMetadata{Fname: fname, Profiles: []ffmpeg.VideoProfile{p, p, p, p}}
td, err := t1.Transcode(context.Background(), md)

t1.Stop()
if err != nil {
return err
}
if len(td.Segments) == 0 || td.Pixels == 0 {
return errors.New("Empty transcoded segment")
}
}

return nil
}

func GetTranscoderFactoryByAccel(acceleration ffmpeg.Acceleration) (func(device string) TranscoderSession, func(detector ffmpeg.DetectorProfile, gpu string) (TranscoderSession, error), error) {
switch acceleration {
case ffmpeg.Nvidia:
Expand Down
26 changes: 22 additions & 4 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,9 +1013,30 @@ func transcodeSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSS
}
return nil, info, err
}
// log all received detection results
if monitor.Enabled {
if len(res.Detections) > 0 {
for _, detection := range res.Detections {
switch x := detection.Value.(type) {
case *net.DetectData_SceneClassification:
probs := x.SceneClassification.ClassProbs
for id, prob := range probs {
className := "unknown"
for name, lookupId := range ffmpeg.DetectorClassIDLookup {
if id == uint32(lookupId) {
className = name
break
}
}
monitor.SegSceneClassificationResult(ctx, seg.SeqNo, className, prob)
}
}
}
}
}
// [EXPERIMENTAL] send content detection results to callback webhook
// for now use detection only in common path
if DetectionWebhookURL != nil && len(res.Detections) > 0 {
if DetectionWebhookURL != nil {
clog.V(common.DEBUG).Infof(ctx, "Got detection result %v", res.Detections)
if monitor.Enabled {
monitor.SegSceneClassificationDone(ctx, seg.SeqNo)
Expand All @@ -1035,9 +1056,6 @@ func transcodeSegment(ctx context.Context, cxn *rtmpConnection, seg *stream.HLSS
Name: name,
Probability: prob,
})
if monitor.Enabled {
monitor.SegSceneClassificationResult(ctx, seg.SeqNo, name, prob)
}
}
}
}
Expand Down

0 comments on commit 1115630

Please sign in to comment.