Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Netint hardware support #2348

Merged
merged 10 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Features ⚒

#### General
- \#2348 Support Netint transcoding hardware (@cyberj0g)
- \#2289 Add timeouts to ETH client (@leszko)
- \#2282 Add checksums and gpg signature support with binary releases. (@hjpotter92)
- \#2344 Use T.TempDir to create temporary test directory (@Juneezee)
Expand Down
40 changes: 33 additions & 7 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type LivepeerConfig struct {
maxSessions *int
currentManifest *bool
nvidia *string
netint *string
testTranscoder *bool
sceneClassificationModelPath *string
ethAcctAddr *string
Expand Down Expand Up @@ -225,6 +226,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultMaxSessions := 10
defaultCurrentManifest := false
defaultNvidia := ""
defaultNetint := ""
defaultTestTranscoder := true
defaultSceneClassificationModelPath := ""

Expand Down Expand Up @@ -288,6 +290,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
maxSessions: &defaultMaxSessions,
currentManifest: &defaultCurrentManifest,
nvidia: &defaultNvidia,
netint: &defaultNetint,
testTranscoder: &defaultTestTranscoder,
sceneClassificationModelPath: &defaultSceneClassificationModelPath,

Expand Down Expand Up @@ -357,6 +360,7 @@ func parseLivepeerConfig() LivepeerConfig {
cfg.maxSessions = flag.Int("maxSessions", *cfg.maxSessions, "Maximum number of concurrent transcoding sessions for Orchestrator, maximum number or RTMP streams for Broadcaster, or maximum capacity for transcoder")
cfg.currentManifest = flag.Bool("currentManifest", *cfg.currentManifest, "Expose the currently active ManifestID as \"/stream/current.m3u8\"")
cfg.nvidia = flag.String("nvidia", *cfg.nvidia, "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
cfg.netint = flag.String("netint", *cfg.netint, "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
cfg.testTranscoder = flag.Bool("testTranscoder", *cfg.testTranscoder, "Test Nvidia GPU transcoding at startup")
cfg.sceneClassificationModelPath = flag.String("sceneClassificationModelPath", *cfg.sceneClassificationModelPath, "Path to scene classification model")

Expand Down Expand Up @@ -446,6 +450,11 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
return
}

if *cfg.netint != "" && *cfg.nvidia != "" {
glog.Fatal("both -netint and -nvidia arguments specified, this is not supported")
return
}

blockPollingTime := time.Duration(*cfg.blockPollingInterval) * time.Second

type NetworkConfig struct {
Expand Down Expand Up @@ -540,22 +549,39 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
var transcoderCaps []core.Capability
if *cfg.transcoder {
core.WorkDir = *cfg.datadir
accel := ffmpeg.Software
var devicesStr string
if *cfg.nvidia != "" {
accel = ffmpeg.Nvidia
devicesStr = *cfg.nvidia
}
if *cfg.netint != "" {
accel = ffmpeg.Netint
devicesStr = *cfg.netint
}
if accel != ffmpeg.Software {
accelName := ffmpeg.AccelerationNameLookup[accel]
tf, dtf, err := core.GetTranscoderFactoryByAccel(accel)
if err != nil {
glog.Fatalf("Error unsupported acceleration: %v", err)
}
// Get a list of device ids
devices, err := common.ParseNvidiaDevices(*cfg.nvidia)
devices, err := common.ParseAccelDevices(devicesStr, accel)
glog.Infof("%v devices: %v", accelName, devices)
if err != nil {
glog.Fatalf("Error while parsing '-nvidia %v' flag: %v", *cfg.nvidia, err)
glog.Fatalf("Error while parsing '-%v %v' flag: %v", strings.ToLower(accelName), devices, err)
}
glog.Infof("Transcoding on these Nvidia GPUs: %v", devices)
// Test transcoding with nvidia
glog.Infof("Transcoding on these %v devices: %v", accelName, devices)
// Test transcoding with specified device
if *cfg.testTranscoder {
transcoderCaps, err = core.TestTranscoderCapabilities(devices)
transcoderCaps, err = core.TestTranscoderCapabilities(devices, tf)
if err != nil {
glog.Fatal(err)
return
}
}
// FIXME: Short-term hack to pre-load the detection models on every device
if *cfg.sceneClassificationModelPath != "" {
if accel == ffmpeg.Nvidia && *cfg.sceneClassificationModelPath != "" {
detectorProfile := ffmpeg.DSceneAdultSoccer
detectorProfile.ModelPath = *cfg.sceneClassificationModelPath
core.DetectorProfile = &detectorProfile
Expand All @@ -568,7 +594,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}
}
// Initialize LB transcoder
n.Transcoder = core.NewLoadBalancingTranscoder(devices, core.NewNvidiaTranscoder, core.NewNvidiaTranscoderWithDetector)
n.Transcoder = core.NewLoadBalancingTranscoder(devices, tf, dtf)
} else {
// for local software mode, enable all capabilities
transcoderCaps = append(core.DefaultCapabilities(), core.OptionalCapabilities()...)
Expand Down
29 changes: 26 additions & 3 deletions cmd/livepeer_bench/livepeer_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func main() {
concurrentSessions := flag.Int("concurrentSessions", 1, "# of concurrent transcode sessions")
repeat := flag.Int("repeat", 1, "# of times benchmark will be repeated")
segs := flag.Int("segs", 0, "Maximum # of segments to transcode (default all)")
log := flag.Int("log", 3, "Log level (1 fatal, 2 error, 3 warning, 4 info, 5 verbose, 6 debug, 7 trace, default warning)")
transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9,P720p30fps16x9", "Transcoding options for broadcast job, or path to json config")
nvidia := flag.String("nvidia", "", "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
netint := flag.String("netint", "", "Comma-separated list of NetInt device GUIDs (or \"all\" for all available devices)")
outPrefix := flag.String("outPrefix", "", "Output segments' prefix (no segments are generated by default)")
detectionFreq := flag.Int("detectionFreq", 0, "Run content-detection on every nth segment. No detection occurs for default frequency of 0.")
detectionSampleRate := flag.Uint("detectionSampleRate", 1, "Run content-detection on every nth frame of a particular segment, if detectionFreq > 0.")
Expand Down Expand Up @@ -68,11 +70,24 @@ func main() {
if *nvidia != "" {
var err error
accel = ffmpeg.Nvidia
devices, err = common.ParseNvidiaDevices(*nvidia)
devices, err = common.ParseAccelDevices(*nvidia, accel)
if err != nil {
glog.Fatalf("Error while parsing '-nvidia %v' flag: %v", *nvidia, err)
}
}

if *netint != "" {
var err error
accel = ffmpeg.Netint
devices, err = common.ParseAccelDevices(*netint, accel)
if err != nil {
glog.Fatalf("Error while parsing '-netint %v' flag: %v", *netint, err)
}
}

glog.Infof("log level is: %d", ffmpeg.LogLevel(*log*8))
ffmpeg.InitFFmpegWithLogLevel(ffmpeg.LogLevel(*log * 8))

var wg sync.WaitGroup
dir := path.Dir(*in)

Expand All @@ -88,6 +103,13 @@ func main() {
if accel == ffmpeg.Nvidia && len(devices) > 0 {
data = append(data, []string{"Nvidia GPU IDs", fmt.Sprintf("%v", strings.Join(devices, ","))})
}

if accel == ffmpeg.Netint && len(devices) > 0 {
data = append(data, []string{"Netint GUIDs", fmt.Sprintf("%v", strings.Join(devices, ","))})
}

fmt.Printf("data %s \n", data)

if *detectionFreq > 0 {
data = append(data, []string{"Content Detection (segment_freq,frame_sample_rate)", fmt.Sprintf("%v,%v", *detectionFreq, *detectionSampleRate)})
}
Expand Down Expand Up @@ -129,7 +151,7 @@ func main() {
}
fmt.Println("timestamp,session,segment,seg_dur,transcode_time,detect_data")
} else {
fmt.Println("timestamp,session,segment,seg_dur,transcode_time")
fmt.Println("timestamp,session,segment,seg_dur,transcode_time,frames")
}

var mu sync.Mutex
Expand Down Expand Up @@ -168,6 +190,7 @@ func main() {
}
if ffmpeg.Software != accel {
in.Device = devices[k%len(devices)]
fmt.Printf("in.Device %s \n", in.Device)
}
profs2opts := func(profs []ffmpeg.VideoProfile) []ffmpeg.TranscodeOptions {
opts := []ffmpeg.TranscodeOptions{}
Expand Down Expand Up @@ -207,7 +230,7 @@ func main() {
if *detectionFreq > 0 && j%*detectionFreq == 0 {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[len(res.Encoded)-1].DetectData)
} else {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds())
fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[0].Frames)
}
segTxDur := end.Sub(t).Seconds()
mu.Lock()
Expand Down
38 changes: 38 additions & 0 deletions cmd/livepeer_bench/transcodingOptions-netint.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"name": "240p0",
"fps": 0,
"bitrate": 250000,
"width": 426,
"height": 240,
"gop": "1",
"encoder": "HEVC"
},
{
"name": "360p0",
"fps": 0,
"bitrate": 800000,
"width": 640,
"height": 360,
"gop": "1",
"encoder": "HEVC"
},
{
"name": "480p0",
"fps": 0,
"bitrate": 1600000,
"width": 854,
"height": 480,
"gop": "1",
"encoder": "HEVC"
},
{
"name": "720p0",
"fps": 0,
"bitrate": 3000000,
"width": 1280,
"height": 720,
"gop": "1",
"encoder": "HEVC"
}
]
12 changes: 8 additions & 4 deletions cmd/livepeer_bench/transcodingOptions.json
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,34 @@
"bitrate": 250000,
"width": 426,
"height": 240,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
},
{
"name": "360p0",
"fps": 0,
"bitrate": 800000,
"width": 640,
"height": 360,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
},
{
"name": "480p0",
"fps": 0,
"bitrate": 1600000,
"width": 854,
"height": 480,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
},
{
"name": "720p0",
"fps": 0,
"bitrate": 3000000,
"width": 1280,
"height": 720,
"profile": "h264constrainedhigh"
"profile": "h264constrainedhigh",
"gop": "1"
}
]
6 changes: 3 additions & 3 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,9 @@ func detectNvidiaDevices() ([]string, error) {
return devices, nil
}

func ParseNvidiaDevices(nvidia string) ([]string, error) {
if nvidia == "all" {
func ParseAccelDevices(devices string, acceleration ffmpeg.Acceleration) ([]string, error) {
if acceleration == ffmpeg.Nvidia && devices == "all" {
return detectNvidiaDevices()
}
return strings.Split(nvidia, ","), nil
return strings.Split(devices, ","), nil
}
5 changes: 5 additions & 0 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ var CapabilityNameLookup = map[Capability]string{
Capability_VP9_Decode: "VP9 decode",
Capability_VP8_Encode: "VP8 encode",
Capability_VP9_Encode: "VP9 encode",
Capability_H264_Decode_444_8bit: "H264 Decode YUV444 8-bit",
Capability_H264_Decode_422_8bit: "H264 Decode YUV422 8-bit",
Capability_H264_Decode_444_10bit: "H264 Decode YUV444 10-bit",
Capability_H264_Decode_422_10bit: "H264 Decode YUV422 10-bit",
Capability_H264_Decode_420_10bit: "H264 Decode YUV420 10-bit",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ func TestCapability_TranscoderCapabilities(t *testing.T) {
defer cleanup()

// nvidia test
devices, err := common.ParseNvidiaDevices("all")
devices, err := common.ParseAccelDevices("all", ffmpeg.Nvidia)
devicesAvailable := err == nil && len(devices) > 0
if devicesAvailable {
nvidiaCaps, err := TestTranscoderCapabilities(devices)
nvidiaCaps, err := TestTranscoderCapabilities(devices, NewNvidiaTranscoder)
assert.Nil(t, err)
assert.False(t, InArray(Capability_H264_Decode_444_8bit, nvidiaCaps), "Nvidia device should not support decode of 444_8bit")
assert.False(t, InArray(Capability_H264_Decode_422_8bit, nvidiaCaps), "Nvidia device should not support decode of 422_8bit")
Expand Down
Loading