From bf1cef8fd843847d32de4e49fd9eef59811a4ef2 Mon Sep 17 00:00:00 2001 From: Jai Luthra Date: Mon, 15 Nov 2021 22:10:02 +0530 Subject: [PATCH 1/2] cmd/livepeer_bench: add option to repeat benchmark --- cmd/livepeer_bench/livepeer_bench.go | 220 ++++++++++++++------------- 1 file changed, 113 insertions(+), 107 deletions(-) diff --git a/cmd/livepeer_bench/livepeer_bench.go b/cmd/livepeer_bench/livepeer_bench.go index d942bf358f..843e3df2cd 100755 --- a/cmd/livepeer_bench/livepeer_bench.go +++ b/cmd/livepeer_bench/livepeer_bench.go @@ -32,6 +32,7 @@ func main() { in := flag.String("in", "", "Input m3u8 manifest file") live := flag.Bool("live", true, "Simulate live stream") concurrentSessions := flag.Int("concurrentSessions", 1, "# of concurrent transcode sessions") + repeat := flag.Int("repeat", 1, "# of times benchmark will be repeated") segs := flag.Int("segs", 0, "Maximum # of segments to transcode (default all)") transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9,P720p30fps16x9", "Transcoding options for broadcast job, or path to json config") nvidia := flag.String("nvidia", "", "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)") @@ -92,6 +93,9 @@ func main() { if *detectionFreq > 0 { data = append(data, []string{"Content Detection (segment_freq,frame_sample_rate)", fmt.Sprintf("%v,%v", *detectionFreq, *detectionSampleRate)}) } + if *repeat > 1 { + data = append(data, []string{"Repeat Times", fmt.Sprintf("%v", *repeat)}) + } table.SetAlignment(tablewriter.ALIGN_LEFT) table.SetCenterSeparator("*") @@ -130,122 +134,124 @@ func main() { fmt.Println("timestamp,session,segment,seg_dur,transcode_time") } - segCount := 0 - realTimeSegCount := 0 - srcDur := 0.0 var mu sync.Mutex - transcodeDur := 0.0 - for i := 0; i < *concurrentSessions; i++ { - wg.Add(1) - go func(k int, wg *sync.WaitGroup) { - var tc *ffmpeg.Transcoder - if *detectionFreq > 0 { - t := time.Now() - tc, err = ffmpeg.NewTranscoderWithDetector(detectionOpts.Detector, devices[k%len(devices)]) - end := time.Now() - fmt.Printf("InitDetectorSession time %0.4v for session %v\n", end.Sub(t).Seconds(), i) - if err != nil { - glog.Fatalf("Could not initialize detector") - } - } else { - tc = ffmpeg.NewTranscoder() - } - for j, v := range pl.Segments { - iterStart := time.Now() - if *segs > 0 && j >= *segs { - break - } - if v == nil { - continue - } - u := path.Join(dir, v.URI) - in := &ffmpeg.TranscodeOptionsIn{ - Fname: u, - Accel: accel, - } - if ffmpeg.Software != accel { - in.Device = devices[k%len(devices)] + for repeatCounter := 0; repeatCounter < *repeat; repeatCounter++ { + segCount := 0 + realTimeSegCount := 0 + srcDur := 0.0 + transcodeDur := 0.0 + for i := 0; i < *concurrentSessions; i++ { + wg.Add(1) + go func(k int, wg *sync.WaitGroup) { + var tc *ffmpeg.Transcoder + if *detectionFreq > 0 { + t := time.Now() + tc, err = ffmpeg.NewTranscoderWithDetector(detectionOpts.Detector, devices[k%len(devices)]) + end := time.Now() + fmt.Printf("InitDetectorSession time %0.4v for session %v\n", end.Sub(t).Seconds(), i) + if err != nil { + glog.Fatalf("Could not initialize detector") + } + } else { + tc = ffmpeg.NewTranscoder() } - profs2opts := func(profs []ffmpeg.VideoProfile) []ffmpeg.TranscodeOptions { - opts := []ffmpeg.TranscodeOptions{} - for n, p := range profs { - oname := "" - muxer := "" - if *outPrefix != "" { - oname = fmt.Sprintf("%s_%s_%d_%d_%d.ts", *outPrefix, p.Name, n, k, j) - muxer = "mpegts" - } else { - oname = "-" - muxer = "null" + for j, v := range pl.Segments { + iterStart := time.Now() + if *segs > 0 && j >= *segs { + break + } + if v == nil { + continue + } + u := path.Join(dir, v.URI) + in := &ffmpeg.TranscodeOptionsIn{ + Fname: u, + Accel: accel, + } + if ffmpeg.Software != accel { + in.Device = devices[k%len(devices)] + } + profs2opts := func(profs []ffmpeg.VideoProfile) []ffmpeg.TranscodeOptions { + opts := []ffmpeg.TranscodeOptions{} + for n, p := range profs { + oname := "" + muxer := "" + if *outPrefix != "" { + oname = fmt.Sprintf("%s_%s_%d_%d_%d.ts", *outPrefix, p.Name, n, k, j) + muxer = "mpegts" + } else { + oname = "-" + muxer = "null" + } + o := ffmpeg.TranscodeOptions{ + Oname: oname, + Profile: p, + Accel: accel, + AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"}, + Muxer: ffmpeg.ComponentOptions{Name: muxer}, + CalcSign: *sign, + } + opts = append(opts, o) } - o := ffmpeg.TranscodeOptions{ - Oname: oname, - Profile: p, - Accel: accel, - AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"}, - Muxer: ffmpeg.ComponentOptions{Name: muxer}, - CalcSign: *sign, + // add detector profile if freq > 0 + if *detectionFreq > 0 && j%*detectionFreq == 0 { + opts = append(opts, detectionOpts) } - opts = append(opts, o) + return opts + } + out := profs2opts(profiles) + t := time.Now() + res, err := tc.Transcode(in, out) + end := time.Now() + if err != nil { + glog.Fatalf("Transcoding failed for session %d segment %d: %v", k, j, err) } - // add detector profile if freq > 0 if *detectionFreq > 0 && j%*detectionFreq == 0 { - opts = append(opts, detectionOpts) + fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[len(res.Encoded)-1].DetectData) + } else { + fmt.Printf("%s,%d,%d,%0.4v,%0.4v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds()) + } + segTxDur := end.Sub(t).Seconds() + mu.Lock() + transcodeDur += segTxDur + srcDur += v.Duration + segCount++ + if segTxDur <= v.Duration { + realTimeSegCount += 1 + } + mu.Unlock() + iterEnd := time.Now() + segDur := time.Duration(v.Duration * float64(time.Second)) + if *live { + time.Sleep(segDur - iterEnd.Sub(iterStart)) } - return opts - } - out := profs2opts(profiles) - t := time.Now() - res, err := tc.Transcode(in, out) - end := time.Now() - if err != nil { - glog.Fatalf("Transcoding failed for session %d segment %d: %v", k, j, err) - } - if *detectionFreq > 0 && j%*detectionFreq == 0 { - fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[len(res.Encoded)-1].DetectData) - } else { - fmt.Printf("%s,%d,%d,%0.4v,%0.4v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds()) - } - segTxDur := end.Sub(t).Seconds() - mu.Lock() - transcodeDur += segTxDur - srcDur += v.Duration - segCount++ - if segTxDur <= v.Duration { - realTimeSegCount += 1 - } - mu.Unlock() - iterEnd := time.Now() - segDur := time.Duration(v.Duration * float64(time.Second)) - if *live { - time.Sleep(segDur - iterEnd.Sub(iterStart)) } - } - tc.StopTranscoder() - wg.Done() - }(i, &wg) - time.Sleep(*concurrentSessionDelay) // wait for at least one segment before moving on to the next session - } - wg.Wait() - if segCount == 0 || srcDur == 0.0 { - glog.Fatal("Input manifest has no segments or total duration is 0s") - } - statsTable := tablewriter.NewWriter(os.Stderr) - stats := [][]string{ - {"Concurrent Sessions", fmt.Sprintf("%v", *concurrentSessions)}, - {"Total Segs Transcoded", fmt.Sprintf("%v", segCount)}, - {"Real-Time Segs Transcoded", fmt.Sprintf("%v", realTimeSegCount)}, - {"* Real-Time Segs Ratio *", fmt.Sprintf("%0.4v", float64(realTimeSegCount)/float64(segCount))}, - {"Total Source Duration", fmt.Sprintf("%vs", srcDur)}, - {"Total Transcoding Duration", fmt.Sprintf("%vs", transcodeDur)}, - {"* Real-Time Duration Ratio *", fmt.Sprintf("%0.4v", transcodeDur/srcDur)}, - } + tc.StopTranscoder() + wg.Done() + }(i, &wg) + time.Sleep(*concurrentSessionDelay) // wait for at least one segment before moving on to the next session + } + wg.Wait() + if segCount == 0 || srcDur == 0.0 { + glog.Fatal("Input manifest has no segments or total duration is 0s") + } + statsTable := tablewriter.NewWriter(os.Stderr) + stats := [][]string{ + {"Concurrent Sessions", fmt.Sprintf("%v", *concurrentSessions)}, + {"Total Segs Transcoded", fmt.Sprintf("%v", segCount)}, + {"Real-Time Segs Transcoded", fmt.Sprintf("%v", realTimeSegCount)}, + {"* Real-Time Segs Ratio *", fmt.Sprintf("%0.4v", float64(realTimeSegCount)/float64(segCount))}, + {"Total Source Duration", fmt.Sprintf("%vs", srcDur)}, + {"Total Transcoding Duration", fmt.Sprintf("%vs", transcodeDur)}, + {"* Real-Time Duration Ratio *", fmt.Sprintf("%0.4v", transcodeDur/srcDur)}, + } - statsTable.SetAlignment(tablewriter.ALIGN_LEFT) - statsTable.SetCenterSeparator("*") - statsTable.SetColumnSeparator("|") - statsTable.AppendBulk(stats) - statsTable.Render() + statsTable.SetAlignment(tablewriter.ALIGN_LEFT) + statsTable.SetCenterSeparator("*") + statsTable.SetColumnSeparator("|") + statsTable.AppendBulk(stats) + statsTable.Render() + } } func parseVideoProfiles(inp string) []ffmpeg.VideoProfile { From a0aec8f45cc0038acd9055cfb40f1b98783643ff Mon Sep 17 00:00:00 2001 From: Jai Luthra Date: Mon, 22 Nov 2021 21:19:39 +0530 Subject: [PATCH 2/2] CHANGELOG_PENDING: Add entry for #2114 --- CHANGELOG_PENDING.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index f5217c2229..1055a8cbdd 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -8,6 +8,8 @@ #### General +- \#2114 Add option to repeat the benchmarking process (@jailuthra) + #### Broadcaster - \#2086 Add support for fast verification (@jailuthra @darkdragon)