Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/livepeer_bench: add option to repeat benchmark #2114

Merged
merged 2 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#### General

- \#2114 Add option to repeat the benchmarking process (@jailuthra)

#### Broadcaster

- \#2086 Add support for fast verification (@jailuthra @darkdragon)
Expand Down
220 changes: 113 additions & 107 deletions cmd/livepeer_bench/livepeer_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func main() {
in := flag.String("in", "", "Input m3u8 manifest file")
live := flag.Bool("live", true, "Simulate live stream")
concurrentSessions := flag.Int("concurrentSessions", 1, "# of concurrent transcode sessions")
repeat := flag.Int("repeat", 1, "# of times benchmark will be repeated")
segs := flag.Int("segs", 0, "Maximum # of segments to transcode (default all)")
transcodingOptions := flag.String("transcodingOptions", "P240p30fps16x9,P360p30fps16x9,P720p30fps16x9", "Transcoding options for broadcast job, or path to json config")
nvidia := flag.String("nvidia", "", "Comma-separated list of Nvidia GPU device IDs (or \"all\" for all available devices)")
Expand Down Expand Up @@ -92,6 +93,9 @@ func main() {
if *detectionFreq > 0 {
data = append(data, []string{"Content Detection (segment_freq,frame_sample_rate)", fmt.Sprintf("%v,%v", *detectionFreq, *detectionSampleRate)})
}
if *repeat > 1 {
data = append(data, []string{"Repeat Times", fmt.Sprintf("%v", *repeat)})
}

table.SetAlignment(tablewriter.ALIGN_LEFT)
table.SetCenterSeparator("*")
Expand Down Expand Up @@ -130,122 +134,124 @@ func main() {
fmt.Println("timestamp,session,segment,seg_dur,transcode_time")
}

segCount := 0
realTimeSegCount := 0
srcDur := 0.0
var mu sync.Mutex
transcodeDur := 0.0
for i := 0; i < *concurrentSessions; i++ {
wg.Add(1)
go func(k int, wg *sync.WaitGroup) {
var tc *ffmpeg.Transcoder
if *detectionFreq > 0 {
t := time.Now()
tc, err = ffmpeg.NewTranscoderWithDetector(detectionOpts.Detector, devices[k%len(devices)])
end := time.Now()
fmt.Printf("InitDetectorSession time %0.4v for session %v\n", end.Sub(t).Seconds(), i)
if err != nil {
glog.Fatalf("Could not initialize detector")
}
} else {
tc = ffmpeg.NewTranscoder()
}
for j, v := range pl.Segments {
iterStart := time.Now()
if *segs > 0 && j >= *segs {
break
}
if v == nil {
continue
}
u := path.Join(dir, v.URI)
in := &ffmpeg.TranscodeOptionsIn{
Fname: u,
Accel: accel,
}
if ffmpeg.Software != accel {
in.Device = devices[k%len(devices)]
for repeatCounter := 0; repeatCounter < *repeat; repeatCounter++ {
segCount := 0
realTimeSegCount := 0
srcDur := 0.0
transcodeDur := 0.0
for i := 0; i < *concurrentSessions; i++ {
wg.Add(1)
go func(k int, wg *sync.WaitGroup) {
var tc *ffmpeg.Transcoder
if *detectionFreq > 0 {
t := time.Now()
tc, err = ffmpeg.NewTranscoderWithDetector(detectionOpts.Detector, devices[k%len(devices)])
end := time.Now()
fmt.Printf("InitDetectorSession time %0.4v for session %v\n", end.Sub(t).Seconds(), i)
if err != nil {
glog.Fatalf("Could not initialize detector")
}
} else {
tc = ffmpeg.NewTranscoder()
}
profs2opts := func(profs []ffmpeg.VideoProfile) []ffmpeg.TranscodeOptions {
opts := []ffmpeg.TranscodeOptions{}
for n, p := range profs {
oname := ""
muxer := ""
if *outPrefix != "" {
oname = fmt.Sprintf("%s_%s_%d_%d_%d.ts", *outPrefix, p.Name, n, k, j)
muxer = "mpegts"
} else {
oname = "-"
muxer = "null"
for j, v := range pl.Segments {
iterStart := time.Now()
if *segs > 0 && j >= *segs {
break
}
if v == nil {
continue
}
u := path.Join(dir, v.URI)
in := &ffmpeg.TranscodeOptionsIn{
Fname: u,
Accel: accel,
}
if ffmpeg.Software != accel {
in.Device = devices[k%len(devices)]
}
profs2opts := func(profs []ffmpeg.VideoProfile) []ffmpeg.TranscodeOptions {
opts := []ffmpeg.TranscodeOptions{}
for n, p := range profs {
oname := ""
muxer := ""
if *outPrefix != "" {
oname = fmt.Sprintf("%s_%s_%d_%d_%d.ts", *outPrefix, p.Name, n, k, j)
muxer = "mpegts"
} else {
oname = "-"
muxer = "null"
}
o := ffmpeg.TranscodeOptions{
Oname: oname,
Profile: p,
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: muxer},
CalcSign: *sign,
}
opts = append(opts, o)
}
o := ffmpeg.TranscodeOptions{
Oname: oname,
Profile: p,
Accel: accel,
AudioEncoder: ffmpeg.ComponentOptions{Name: "copy"},
Muxer: ffmpeg.ComponentOptions{Name: muxer},
CalcSign: *sign,
// add detector profile if freq > 0
if *detectionFreq > 0 && j%*detectionFreq == 0 {
opts = append(opts, detectionOpts)
}
opts = append(opts, o)
return opts
}
out := profs2opts(profiles)
t := time.Now()
res, err := tc.Transcode(in, out)
end := time.Now()
if err != nil {
glog.Fatalf("Transcoding failed for session %d segment %d: %v", k, j, err)
}
// add detector profile if freq > 0
if *detectionFreq > 0 && j%*detectionFreq == 0 {
opts = append(opts, detectionOpts)
fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[len(res.Encoded)-1].DetectData)
} else {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds())
}
segTxDur := end.Sub(t).Seconds()
mu.Lock()
transcodeDur += segTxDur
srcDur += v.Duration
segCount++
if segTxDur <= v.Duration {
realTimeSegCount += 1
}
mu.Unlock()
iterEnd := time.Now()
segDur := time.Duration(v.Duration * float64(time.Second))
if *live {
time.Sleep(segDur - iterEnd.Sub(iterStart))
}
return opts
}
out := profs2opts(profiles)
t := time.Now()
res, err := tc.Transcode(in, out)
end := time.Now()
if err != nil {
glog.Fatalf("Transcoding failed for session %d segment %d: %v", k, j, err)
}
if *detectionFreq > 0 && j%*detectionFreq == 0 {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v,%v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds(), res.Encoded[len(res.Encoded)-1].DetectData)
} else {
fmt.Printf("%s,%d,%d,%0.4v,%0.4v\n", end.Format("2006-01-02 15:04:05.9999"), k, j, v.Duration, end.Sub(t).Seconds())
}
segTxDur := end.Sub(t).Seconds()
mu.Lock()
transcodeDur += segTxDur
srcDur += v.Duration
segCount++
if segTxDur <= v.Duration {
realTimeSegCount += 1
}
mu.Unlock()
iterEnd := time.Now()
segDur := time.Duration(v.Duration * float64(time.Second))
if *live {
time.Sleep(segDur - iterEnd.Sub(iterStart))
}
}
tc.StopTranscoder()
wg.Done()
}(i, &wg)
time.Sleep(*concurrentSessionDelay) // wait for at least one segment before moving on to the next session
}
wg.Wait()
if segCount == 0 || srcDur == 0.0 {
glog.Fatal("Input manifest has no segments or total duration is 0s")
}
statsTable := tablewriter.NewWriter(os.Stderr)
stats := [][]string{
{"Concurrent Sessions", fmt.Sprintf("%v", *concurrentSessions)},
{"Total Segs Transcoded", fmt.Sprintf("%v", segCount)},
{"Real-Time Segs Transcoded", fmt.Sprintf("%v", realTimeSegCount)},
{"* Real-Time Segs Ratio *", fmt.Sprintf("%0.4v", float64(realTimeSegCount)/float64(segCount))},
{"Total Source Duration", fmt.Sprintf("%vs", srcDur)},
{"Total Transcoding Duration", fmt.Sprintf("%vs", transcodeDur)},
{"* Real-Time Duration Ratio *", fmt.Sprintf("%0.4v", transcodeDur/srcDur)},
}
tc.StopTranscoder()
wg.Done()
}(i, &wg)
time.Sleep(*concurrentSessionDelay) // wait for at least one segment before moving on to the next session
}
wg.Wait()
if segCount == 0 || srcDur == 0.0 {
glog.Fatal("Input manifest has no segments or total duration is 0s")
}
statsTable := tablewriter.NewWriter(os.Stderr)
stats := [][]string{
{"Concurrent Sessions", fmt.Sprintf("%v", *concurrentSessions)},
{"Total Segs Transcoded", fmt.Sprintf("%v", segCount)},
{"Real-Time Segs Transcoded", fmt.Sprintf("%v", realTimeSegCount)},
{"* Real-Time Segs Ratio *", fmt.Sprintf("%0.4v", float64(realTimeSegCount)/float64(segCount))},
{"Total Source Duration", fmt.Sprintf("%vs", srcDur)},
{"Total Transcoding Duration", fmt.Sprintf("%vs", transcodeDur)},
{"* Real-Time Duration Ratio *", fmt.Sprintf("%0.4v", transcodeDur/srcDur)},
}

statsTable.SetAlignment(tablewriter.ALIGN_LEFT)
statsTable.SetCenterSeparator("*")
statsTable.SetColumnSeparator("|")
statsTable.AppendBulk(stats)
statsTable.Render()
statsTable.SetAlignment(tablewriter.ALIGN_LEFT)
statsTable.SetCenterSeparator("*")
statsTable.SetColumnSeparator("|")
statsTable.AppendBulk(stats)
statsTable.Render()
}
}

func parseVideoProfiles(inp string) []ffmpeg.VideoProfile {
Expand Down