Skip to content

Commit

Permalink
cmd, common: netint hardware support (#2348)
Browse files Browse the repository at this point in the history
* Netint hardware support

* Refactor livepeer_bench

* add names for pixel format capabilities, go fmt

* Remove unused function, fix tests

* Check for netint and nvidia args combination, update lpms version

* Apply Netint changes

* Remove unused variable from livepeer_bench

* Go fmt

* Changelog
  • Loading branch information
cyberj0g authored Apr 25, 2022
1 parent f2ec58d commit 249d9eb
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Features ⚒

#### General
- \#2348 Support Netint transcoding hardware (@cyberj0g)
- \#2289 Add timeouts to ETH client (@leszko)
- \#2282 Add checksums and gpg signature support with binary releases. (@hjpotter92)
- \#2344 Use T.TempDir to create temporary test directory (@Juneezee)
Expand Down
40 changes: 33 additions & 7 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type LivepeerConfig struct {
maxSessions *int
currentManifest *bool
nvidia *string
netint *string
testTranscoder *bool
sceneClassificationModelPath *string
ethAcctAddr *string
Expand Down Expand Up @@ -225,6 +226,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxSessions := 10
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
defaultTestTranscoder := true
defaultSceneClassificationModelPath := ""

Expand Down Expand Up @@ -288,6 +290,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
maxSessions: &defaultMaxSessions,
currentManifest: &defaultCurrentManifest,
nvidia: &defaultNvidia,
netint: &defaultNetint,
testTranscoder: &defaultTestTranscoder,
sceneClassificationModelPath: &defaultSceneClassificationModelPath,

Expand Down Expand Up @@ -357,6 +360,7 @@ func parseLivepeerConfig() LivepeerConfig {
cfg.maxSessions = flag.Int("maxSessions", *cfg.maxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator, maximum number or RTMP streams for Broadcaster, or maximum capacity for transcoder")
cfg.currentManifest = flag.Bool("currentManifest", *cfg.currentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
cfg.nvidia = flag.String("nvidia", *cfg.nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
cfg.netint = flag.String("netint", *cfg.netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
cfg.testTranscoder = flag.Bool("testTranscoder", *cfg.testTranscoder, "Test Nvidia GPU transcoding at startup")
cfg.sceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.sceneClassificationModelPath, "Path to scene classification model")

Expand Down Expand Up @@ -446,6 +450,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

if *cfg.netint != "" && *cfg.nvidia != "" {
glog.Fatal("both -netint and -nvidia arguments specified, this is not supported")
return
}

blockPollingTime := time.Duration(*cfg.blockPollingInterval) * time.Second

type NetworkConfig struct {
Expand Down Expand Up @@ -540,22 +549,39 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
var transcoderCaps []core.Capability
if *cfg.transcoder {
core.WorkDir = *cfg.datadir
accel := ffmpeg.Software
var devicesStr string
if *cfg.nvidia != "" {
accel = ffmpeg.Nvidia
devicesStr = *cfg.nvidia
}
if *cfg.netint != "" {
accel = ffmpeg.Netint
devicesStr = *cfg.netint
}
if accel != ffmpeg.Software {
accelName := ffmpeg.AccelerationNameLookup[accel]
tf, dtf, err := core.GetTranscoderFactoryByAccel(accel)
if err != nil {
glog.Fatalf("Error unsupported acceleration: %v", err)
}
// Get a list of device ids
devices, err := common.ParseNvidiaDevices(*cfg.nvidia)
devices, err := common.ParseAccelDevices(devicesStr, accel)
glog.Infof("%v devices: %v", accelName, devices)
if err != nil {
glog.Fatalf("Error while parsing '-nvidia %v' flag: %v", *cfg.nvidia, err)
glog.Fatalf("Error while parsing '-%v %v' flag: %v", strings.ToLower(accelName), devices, err)
}
glog.Infof("Transcoding on these Nvidia GPUs: %v", devices)
// Test transcoding with nvidia
glog.Infof("Transcoding on these %v devices: %v", accelName, devices)
// Test transcoding with specified device
if *cfg.testTranscoder {
transcoderCaps, err = core.TestTranscoderCapabilities(devices)
transcoderCaps, err = core.TestTranscoderCapabilities(devices, tf)
if err != nil {
glog.Fatal(err)
return
}
}
// FIXME: Short-term hack to pre-load the detection models on every device
if *cfg.sceneClassificationModelPath != "" {
if accel == ffmpeg.Nvidia && *cfg.sceneClassificationModelPath != "" {
detectorProfile := ffmpeg.DSceneAdultSoccer
detectorProfile.ModelPath = *cfg.sceneClassificationModelPath
core.DetectorProfile = &detectorProfile
Expand All @@ -568,7 +594,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}
// Initialize LB transcoder
n.Transcoder = core.NewLoadBalancingTranscoder(devices, core.NewNvidiaTranscoder, core.NewNvidiaTranscoderWithDetector)
n.Transcoder = core.NewLoadBalancingTranscoder(devices, tf, dtf)
} else {
// for local software mode, enable all capabilities
transcoderCaps = append(core.DefaultCapabilities(), core.OptionalCapabilities()...)
Expand Down
29 changes: 26 additions & 3 deletions cmd/livepeer_bench/livepeer_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func main() {
concurrentSessions := flag.Int("concurrentSessions", 1, "# of concurrent transcode sessions")
repeat := flag.Int("repeat", 1, "# of times benchmark will be repeated")
segs := flag.Int("segs", 0, "Maximum # of segments to transcode (default all)")
log := flag.Int("log", 3, "Log level (1 fatal, 2 error, 3 warning, 4 info, 5 verbose, 6 debug, 7 trace, default warning)")
transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9,P720p30fps16x9", "Transcoding options for broadcast job, or path to json config")
nvidia := flag.String("nvidia", "", "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
netint := flag.String("netint", "", "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
outPrefix := flag.String("outPrefix", "", "Output segments' prefix (no segments are generated by default)")
detectionFreq := flag.Int("detectionFreq", 0, "Run content-detection on every nth segment. No detection occurs for default frequency of 0.")
detectionSampleRate := flag.Uint("detectionSampleRate", 1, "Run content-detection on every nth frame of a particular segment, if detectionFreq > 0.")
Expand Down Expand Up @@ -68,11 +70,24 @@ func main() {
if *nvidia != "" {
var err error
accel = ffmpeg.Nvidia
devices, err = common.ParseNvidiaDevices(*nvidia)
devices, err = common.ParseAccelDevices(*nvidia, accel)
if err != nil {
glog.Fatalf("Error while parsing '-nvidia %v' flag: %v", *nvidia, err)
}
}

if *netint != "" {
var err error
accel = ffmpeg.Netint
devices, err = common.ParseAccelDevices(*netint, accel)
if err != nil {
glog.Fatalf("Error while parsing '-netint %v' flag: %v", *netint, err)
}
}

glog.Infof("log level is: %d", ffmpeg.LogLevel(*log*8))
ffmpeg.InitFFmpegWithLogLevel(ffmpeg.LogLevel(*log * 8))

var wg sync.WaitGroup
dir := path.Dir(*in)

Expand All @@ -88,6 +103,13 @@ func main() {
if accel == ffmpeg.Nvidia && len(devices) > 0 {
data = append(data, []string{"Nvidia GPU IDs", fmt.Sprintf("%v", strings.Join(devices, ","))})
}

if accel == ffmpeg.Netint && len(devices) > 0 {
data = append(data, []string{"Netint GUIDs", fmt.Sprintf("%v", strings.Join(devices, ","))})
}

fmt.Printf("data %s \n", data)

if *detectionFreq > 0 {
data = append(data, []string{"Content Detection (segment_freq,frame_sample_rate)", fmt.Sprintf("%v,%v", *detectionFreq, *detectionSampleRate)})
}
Expand Down Expand Up @@ -129,7 +151,7 @@ func main() {
}
fmt.Println("timestamp,session,segment,seg_dur,transcode_time,detect_data")
} else {
fmt.Println("timestamp,session,segment,seg_dur,transcode_time")
fmt.Println("timestamp,session,segment,seg_dur,transcode_time,frames")
}

var mu sync.Mutex
Expand Down Expand Up @@ -168,6 +190,7 @@ func main() {
}
if ffmpeg.Software != accel {
in.Device = devices[k%len(devices)]
fmt.Printf("in.Device %s \n", in.Device)
}
profs2opts := func(profs []ffmpeg.VideoProfile) []ffmpeg.TranscodeOptions {
opts := []ffmpeg.TranscodeOptions{}
Expand Down Expand Up @@ -207,7 +230,7 @@ func main() {
if *detectionFreq > 0 && j%*detectionFreq == 0 {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[len(res.Encoded)-1].DetectData)
} else {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds())
fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[0].Frames)
}
segTxDur := end.Sub(t).Seconds()
mu.Lock()
Expand Down
38 changes: 38 additions & 0 deletions cmd/livepeer_bench/transcodingOptions-netint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"name": "240p0",
"fps": 0,
"bitrate": 250000,
"width": 426,
"height": 240,
"gop": "1",
"encoder": "HEVC"
},
{
"name": "360p0",
"fps": 0,
"bitrate": 800000,
"width": 640,
"height": 360,
"gop": "1",
"encoder": "HEVC"
},
{
"name": "480p0",
"fps": 0,
"bitrate": 1600000,
"width": 854,
"height": 480,
"gop": "1",
"encoder": "HEVC"
},
{
"name": "720p0",
"fps": 0,
"bitrate": 3000000,
"width": 1280,
"height": 720,
"gop": "1",
"encoder": "HEVC"
}
]
12 changes: 8 additions & 4 deletions cmd/livepeer_bench/transcodingOptions.json
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,34 @@
"bitrate": 250000,
"width": 426,
"height": 240,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
},
{
"name": "360p0",
"fps": 0,
"bitrate": 800000,
"width": 640,
"height": 360,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
},
{
"name": "480p0",
"fps": 0,
"bitrate": 1600000,
"width": 854,
"height": 480,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
},
{
"name": "720p0",
"fps": 0,
"bitrate": 3000000,
"width": 1280,
"height": 720,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
}
]
6 changes: 3 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,9 @@ func detectNvidiaDevices() ([]string, error) {
return devices, nil
}

func ParseNvidiaDevices(nvidia string) ([]string, error) {
if nvidia == "all" {
func ParseAccelDevices(devices string, acceleration ffmpeg.Acceleration) ([]string, error) {
if acceleration == ffmpeg.Nvidia && devices == "all" {
return detectNvidiaDevices()
}
return strings.Split(nvidia, ","), nil
return strings.Split(devices, ","), nil
}
5 changes: 5 additions & 0 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ var CapabilityNameLookup = map[Capability]string{
Capability_VP9_Decode: "VP9 decode",
Capability_VP8_Encode: "VP8 encode",
Capability_VP9_Encode: "VP9 encode",
Capability_H264_Decode_444_8bit: "H264 Decode YUV444 8-bit",
Capability_H264_Decode_422_8bit: "H264 Decode YUV422 8-bit",
Capability_H264_Decode_444_10bit: "H264 Decode YUV444 10-bit",
Capability_H264_Decode_422_10bit: "H264 Decode YUV422 10-bit",
Capability_H264_Decode_420_10bit: "H264 Decode YUV420 10-bit",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ func TestCapability_TranscoderCapabilities(t *testing.T) {
defer cleanup()

// nvidia test
devices, err := common.ParseNvidiaDevices("all")
devices, err := common.ParseAccelDevices("all", ffmpeg.Nvidia)
devicesAvailable := err == nil && len(devices) > 0
if devicesAvailable {
nvidiaCaps, err := TestTranscoderCapabilities(devices)
nvidiaCaps, err := TestTranscoderCapabilities(devices, NewNvidiaTranscoder)
assert.Nil(t, err)
assert.False(t, InArray(Capability_H264_Decode_444_8bit, nvidiaCaps), "Nvidia device should not support decode of 444_8bit")
assert.False(t, InArray(Capability_H264_Decode_422_8bit, nvidiaCaps), "Nvidia device should not support decode of 422_8bit")
Expand Down
Loading

0 comments on commit 249d9eb

Please sign in to comment.