-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[MM-53432] Improve recording start time accuracy #44
Changes from 4 commits
aa8a9c8
47a637c
3e8e4b8
cd99e44
35b0626
cd3e42a
64d6ac2
76ea034
5a6468b
faf8650
abd54a9
0e26ad1
369b9cc
aa6c6ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
ca-certificates=20230311 | ||
chromium=116.0.5845.140-1 | ||
chromium-driver=116.0.5845.140-1 | ||
chromium-sandbox=116.0.5845.140-1 | ||
chromium=117.0.5938.62-1 | ||
chromium-driver=117.0.5938.62-1 | ||
chromium-sandbox=117.0.5938.62-1 | ||
ffmpeg=7:6.0-6 | ||
fonts-recommended=1 | ||
pulseaudio=16.1+dfsg1-2+b1 | ||
wget=1.21.3-1+b2 | ||
wget=1.21.4-1+b1 | ||
xvfb=2:21.1.8-1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,21 @@ import ( | |
"context" | ||
"encoding/base64" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net" | ||
"os" | ||
"os/exec" | ||
"path/filepath" | ||
"strings" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"golang.org/x/time/rate" | ||
|
||
"github.com/mattermost/calls-recorder/cmd/recorder/config" | ||
|
||
"github.com/chromedp/cdproto/runtime" | ||
|
@@ -28,17 +34,32 @@ const ( | |
uploadRetryAttemptWaitTime = 5 * time.Second | ||
initPollInterval = 250 * time.Millisecond | ||
connCheckInterval = 1 * time.Second | ||
dataDir = "/data" | ||
) | ||
|
||
const ( | ||
transcoderStartTimeout = 5 * time.Second | ||
transcoderStatsPeriod = 100 * time.Millisecond | ||
transcoderProgressSocketPath = "/tmp/progress.sock" | ||
transcoderProgressBufferSize = 8192 | ||
transcoderProgressLogFreq = 2 * time.Second | ||
) | ||
|
||
type Recorder struct { | ||
cfg config.RecorderConfig | ||
|
||
// browser | ||
readyCh chan struct{} | ||
stopCh chan struct{} | ||
stoppedCh chan struct{} | ||
|
||
// display server | ||
displayServer *exec.Cmd | ||
transcoder *exec.Cmd | ||
|
||
// transcoder | ||
transcoder *exec.Cmd | ||
transcoderStartedCh chan struct{} | ||
transcoderStoppedCh chan struct{} | ||
|
||
outPath string | ||
} | ||
|
@@ -180,6 +201,13 @@ func (rec *Recorder) runBrowser(recURL string) error { | |
return nil | ||
} | ||
continue | ||
case <-rec.transcoderStartedCh: | ||
if err := chromedp.Run(ctx, | ||
chromedp.Evaluate("window.callsClient?.ws?.send('job_started');", nil), | ||
); err != nil { | ||
return fmt.Errorf("failed to send job started event: %w", err) | ||
} | ||
continue | ||
} | ||
break | ||
} | ||
|
@@ -200,7 +228,58 @@ func (rec *Recorder) runBrowser(recURL string) error { | |
} | ||
|
||
func (rec *Recorder) runTranscoder(dst string) error { | ||
args := fmt.Sprintf(`-y -thread_queue_size 4096 -f alsa -i default -r %d -thread_queue_size 4096 -f x11grab -draw_mouse 0 -s %dx%d -i :%d -c:v h264 -preset %s -vf format=yuv420p -b:v %dk -b:a %dk -movflags +faststart %s`, | ||
ln, err := net.Listen("unix", transcoderProgressSocketPath) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole method is pretty cool, I learned a lot. Thanks. |
||
if err != nil { | ||
return fmt.Errorf("failed to listen on progress socket: %w", err) | ||
} | ||
|
||
log.Printf("listening on progress socket at %s", ln.Addr()) | ||
|
||
startedCh := make(chan struct{}) | ||
go func() { | ||
defer func() { | ||
if err := ln.Close(); err != nil { | ||
log.Printf("failed to close listener: %s", err) | ||
} | ||
close(rec.transcoderStoppedCh) | ||
}() | ||
|
||
conn, err := ln.Accept() | ||
if err != nil { | ||
log.Printf("failed to accept connection on progress socket: %s", err) | ||
return | ||
} | ||
|
||
log.Printf("accepted connection on progress socket") | ||
|
||
var once sync.Once | ||
limiter := rate.NewLimiter(rate.Every(transcoderProgressLogFreq), 1) | ||
buf := make([]byte, transcoderProgressBufferSize) | ||
for { | ||
n, err := conn.Read(buf) | ||
if err != nil { | ||
Comment on lines
+269
to
+270
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if I'm reading this correctly, we wait until the transcoder fills up one progress buffer, then we send out the started event? Just wondering if that will cause a timing mismatch (transcoding started, but there's a delay in sending out the started ws event), or is the delay so little it won't matter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, so the logic here is trivial, And yes, there's always going to be some latency to account for but fortunately we are not looking for lip-sync accuracy here. Delay here should be a few milliseconds at most. |
||
if !errors.Is(err, io.EOF) { | ||
log.Printf("failed to read from conn: %s", err) | ||
} | ||
break | ||
} | ||
|
||
once.Do(func() { | ||
// We signal the browser side that transcoding has begun. | ||
rec.transcoderStartedCh <- struct{}{} | ||
|
||
close(startedCh) | ||
}) | ||
|
||
if limiter.Allow() { | ||
log.Printf("ffmpeg progress:\n%s\n", string(buf[:n])) | ||
} | ||
} | ||
}() | ||
|
||
args := fmt.Sprintf(`-nostats -stats_period %0.2f -progress unix://%s -y -thread_queue_size 4096 -f alsa -i default -r %d -thread_queue_size 4096 -f x11grab -draw_mouse 0 -s %dx%d -i :%d -c:v h264 -preset %s -vf format=yuv420p -b:v %dk -b:a %dk -movflags +faststart %s`, | ||
transcoderStatsPeriod.Seconds(), | ||
ln.Addr(), | ||
rec.cfg.FrameRate, | ||
rec.cfg.Width, | ||
rec.cfg.Height, | ||
|
@@ -213,11 +292,17 @@ func (rec *Recorder) runTranscoder(dst string) error { | |
|
||
cmd, err := runCmd("ffmpeg", args) | ||
if err != nil { | ||
log.Fatal(err) | ||
return fmt.Errorf("failed to run ffmpeg: %w", err) | ||
} | ||
|
||
rec.transcoder = cmd | ||
|
||
select { | ||
case <-startedCh: | ||
case <-time.After(transcoderStartTimeout): | ||
return fmt.Errorf("timed out waiting for transcoder to start") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -232,10 +317,12 @@ func NewRecorder(cfg config.RecorderConfig) (*Recorder, error) { | |
} | ||
|
||
return &Recorder{ | ||
cfg: cfg, | ||
readyCh: make(chan struct{}), | ||
stopCh: make(chan struct{}), | ||
stoppedCh: make(chan struct{}), | ||
cfg: cfg, | ||
readyCh: make(chan struct{}), | ||
stopCh: make(chan struct{}), | ||
stoppedCh: make(chan struct{}), | ||
transcoderStartedCh: make(chan struct{}), | ||
transcoderStoppedCh: make(chan struct{}), | ||
}, nil | ||
} | ||
|
||
|
@@ -271,33 +358,52 @@ func (rec *Recorder) Start() error { | |
log.Printf("browser connected, ready to record") | ||
|
||
filename := fmt.Sprintf("%s-%s.mp4", rec.cfg.CallID, time.Now().UTC().Format("2006-01-02-15_04_05")) | ||
rec.outPath = filepath.Join("/recs", filename) | ||
rec.outPath = filepath.Join(getDataDir(), filename) | ||
err = rec.runTranscoder(rec.outPath) | ||
if err != nil { | ||
return fmt.Errorf("failed to run transcoder: %s", err) | ||
} | ||
|
||
log.Printf("transcoder started at %v", time.Now().UnixMilli()) | ||
|
||
return nil | ||
} | ||
|
||
func (rec *Recorder) Stop() error { | ||
if err := rec.transcoder.Process.Signal(syscall.SIGTERM); err != nil { | ||
log.Printf("failed to send signal: %s", err.Error()) | ||
} | ||
if err := rec.transcoder.Wait(); err != nil { | ||
log.Printf("failed waiting for transcoder to exit: %s", err) | ||
if rec.transcoder != nil { | ||
log.Printf("stopping transcoder") | ||
if err := rec.transcoder.Process.Signal(syscall.SIGTERM); err != nil { | ||
log.Printf("failed to send signal: %s", err) | ||
} | ||
if err := rec.transcoder.Wait(); err != nil { | ||
log.Printf("failed waiting for transcoder to exit: %s", err) | ||
} | ||
<-rec.transcoderStoppedCh | ||
rec.transcoder = nil | ||
} | ||
|
||
close(rec.stopCh) | ||
|
||
var exitErr error | ||
select { | ||
case <-rec.stoppedCh: | ||
case <-time.After(stopTimeout): | ||
return fmt.Errorf("timed out waiting for stopped event") | ||
exitErr = fmt.Errorf("timed out waiting for stopped event") | ||
} | ||
|
||
if rec.displayServer != nil { | ||
log.Printf("stopping display server") | ||
if err := rec.displayServer.Process.Signal(syscall.SIGTERM); err != nil { | ||
log.Printf("failed to send signal: %s", err) | ||
} | ||
if err := rec.displayServer.Wait(); err != nil { | ||
log.Printf("failed waiting for display server to exit: %s", err) | ||
} | ||
rec.displayServer = nil | ||
} | ||
|
||
if err := rec.displayServer.Process.Kill(); err != nil { | ||
log.Printf("failed to stop display process: %s", err) | ||
if exitErr != nil { | ||
return exitErr | ||
} | ||
|
||
// TODO (MM-48546): implement better retry logic. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clever
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slightly too clever for my taste but it was the quickest way to achieve what I needed. I am actually thinking of redesigning this a bit and make a proper HTTP endpoint since we have another requirement to inform back the plugin side in case of pre-connection failures. Will think some more on this.