Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MM-53432] Improve recording start time accuracy #44

Merged
merged 14 commits into from
Nov 21, 2023
Merged
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ docker pull mattermost/calls-recorder:latest
### Run the container

```
docker run --network=host --name calls-recorder -e "SITE_URL=http://127.0.0.1:8065/" -e "AUTH_TOKEN=ohqd1phqtt8m3gsfg8j5ymymqy" -e "CALL_ID=9c86b3q57fgfpqr8jq3b9yjweh" -e "THREAD_ID=e4pdmi6rqpn7pp9sity9hiza3r" -e "DEV_MODE=true" -v calls-recorder-volume:/recs mattermost/calls-recorder
docker run --network=host --name calls-recorder -e "SITE_URL=http://127.0.0.1:8065/" -e "AUTH_TOKEN=ohqd1phqtt8m3gsfg8j5ymymqy" -e "CALL_ID=9c86b3q57fgfpqr8jq3b9yjweh" -e "THREAD_ID=e4pdmi6rqpn7pp9sity9hiza3r" -e "DEV_MODE=true" -v calls-recorder-volume:/data mattermost/calls-recorder
```

> **_Note_**
Expand Down
2 changes: 1 addition & 1 deletion build/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ trap 'kill ${!}; term_handler' SIGTERM
RECORDER_USER=calls

# Give permission to write recording files.
chown -R $RECORDER_USER:$RECORDER_USER /recs
chown -R $RECORDER_USER:$RECORDER_USER /data
# Give permissions to home directory so that Chromium can create any
# necessary files and directories.
chown -R $RECORDER_USER:$RECORDER_USER /home/$RECORDER_USER
Expand Down
8 changes: 4 additions & 4 deletions build/pkgs_list
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
ca-certificates=20230311
chromium=116.0.5845.140-1
chromium-driver=116.0.5845.140-1
chromium-sandbox=116.0.5845.140-1
chromium=117.0.5938.62-1
chromium-driver=117.0.5938.62-1
chromium-sandbox=117.0.5938.62-1
ffmpeg=7:6.0-6
fonts-recommended=1
pulseaudio=16.1+dfsg1-2+b1
wget=1.21.3-1+b2
wget=1.21.4-1+b1
xvfb=2:21.1.8-1
8 changes: 8 additions & 0 deletions cmd/recorder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (cfg *RecorderConfig) SetDefaults() {
}

func (cfg RecorderConfig) ToEnv() []string {
if cfg == (RecorderConfig{}) {
return nil
}

return []string{
fmt.Sprintf("SITE_URL=%s", cfg.SiteURL),
fmt.Sprintf("CALL_ID=%s", cfg.CallID),
Expand All @@ -192,6 +196,10 @@ func (cfg RecorderConfig) ToEnv() []string {
}

func (cfg RecorderConfig) ToMap() map[string]any {
if cfg == (RecorderConfig{}) {
return nil
}

return map[string]any{
"site_url": cfg.SiteURL,
"call_id": cfg.CallID,
Expand Down
9 changes: 7 additions & 2 deletions cmd/recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ func main() {
log.Fatalf("failed to create recorder: %s", err)
}

log.Printf("starting recordinig")
log.Printf("starting recording")

if err := recorder.Start(); err != nil {
log.Fatalf("failed to start recording: %s", err)
log.Printf("failed to start recording: %s", err)
// cleaning up
if err := recorder.Stop(); err != nil {
log.Printf("failed to stop recorder: %s", err)
}
os.Exit(1)
}

log.Printf("recording has started")
Expand Down
138 changes: 122 additions & 16 deletions cmd/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"golang.org/x/time/rate"

"github.com/mattermost/calls-recorder/cmd/recorder/config"

"github.com/chromedp/cdproto/runtime"
Expand All @@ -28,17 +34,32 @@ const (
uploadRetryAttemptWaitTime = 5 * time.Second
initPollInterval = 250 * time.Millisecond
connCheckInterval = 1 * time.Second
dataDir = "/data"
)

const (
transcoderStartTimeout = 5 * time.Second
transcoderStatsPeriod = 100 * time.Millisecond
transcoderProgressSocketPath = "/tmp/progress.sock"
transcoderProgressBufferSize = 8192
transcoderProgressLogFreq = 2 * time.Second
)

type Recorder struct {
cfg config.RecorderConfig

// browser
readyCh chan struct{}
stopCh chan struct{}
stoppedCh chan struct{}

// display server
displayServer *exec.Cmd
transcoder *exec.Cmd

// transcoder
transcoder *exec.Cmd
transcoderStartedCh chan struct{}
transcoderStoppedCh chan struct{}

outPath string
}
Expand Down Expand Up @@ -180,6 +201,13 @@ func (rec *Recorder) runBrowser(recURL string) error {
return nil
}
continue
case <-rec.transcoderStartedCh:
if err := chromedp.Run(ctx,
chromedp.Evaluate("window.callsClient?.ws?.send('job_started');", nil),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly too clever for my taste but it was the quickest way to achieve what I needed. I am actually thinking of redesigning this a bit and make a proper HTTP endpoint since we have another requirement to inform back the plugin side in case of pre-connection failures. Will think some more on this.

); err != nil {
return fmt.Errorf("failed to send job started event: %w", err)
}
continue
}
break
}
Expand All @@ -200,7 +228,58 @@ func (rec *Recorder) runBrowser(recURL string) error {
}

func (rec *Recorder) runTranscoder(dst string) error {
args := fmt.Sprintf(`-y -thread_queue_size 4096 -f alsa -i default -r %d -thread_queue_size 4096 -f x11grab -draw_mouse 0 -s %dx%d -i :%d -c:v h264 -preset %s -vf format=yuv420p -b:v %dk -b:a %dk -movflags +faststart %s`,
ln, err := net.Listen("unix", transcoderProgressSocketPath)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole method is pretty cool, I learned a lot. Thanks.

if err != nil {
return fmt.Errorf("failed to listen on progress socket: %w", err)
}

log.Printf("listening on progress socket at %s", ln.Addr())

startedCh := make(chan struct{})
go func() {
defer func() {
if err := ln.Close(); err != nil {
log.Printf("failed to close listener: %s", err)
}
close(rec.transcoderStoppedCh)
}()

conn, err := ln.Accept()
if err != nil {
log.Printf("failed to accept connection on progress socket: %s", err)
return
}

log.Printf("accepted connection on progress socket")

var once sync.Once
limiter := rate.NewLimiter(rate.Every(transcoderProgressLogFreq), 1)
buf := make([]byte, transcoderProgressBufferSize)
for {
n, err := conn.Read(buf)
if err != nil {
Comment on lines +269 to +270
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I'm reading this correctly, we wait until the transcoder fills up one progress buffer, then we send out the started event? Just wondering if that will cause a timing mismatch (transcoding started, but there's a delay in sending out the started ws event), or is the delay so little it won't matter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so the logic here is trivial, ffmpeg will output one progress line per processed video frame so we act as soon as we receive the first one. To be more accurate we could parse the actual output but so far I don't think it's needed.

And yes, there's always going to be some latency to account for but fortunately we are not looking for lip-sync accuracy here. Delay here should be a few milliseconds at most.

if !errors.Is(err, io.EOF) {
log.Printf("failed to read from conn: %s", err)
}
break
}

once.Do(func() {
// We signal the browser side that transcoding has begun.
rec.transcoderStartedCh <- struct{}{}

close(startedCh)
})

if limiter.Allow() {
log.Printf("ffmpeg progress:\n%s\n", string(buf[:n]))
}
}
}()

args := fmt.Sprintf(`-nostats -stats_period %0.2f -progress unix://%s -y -thread_queue_size 4096 -f alsa -i default -r %d -thread_queue_size 4096 -f x11grab -draw_mouse 0 -s %dx%d -i :%d -c:v h264 -preset %s -vf format=yuv420p -b:v %dk -b:a %dk -movflags +faststart %s`,
transcoderStatsPeriod.Seconds(),
ln.Addr(),
rec.cfg.FrameRate,
rec.cfg.Width,
rec.cfg.Height,
Expand All @@ -213,11 +292,17 @@ func (rec *Recorder) runTranscoder(dst string) error {

cmd, err := runCmd("ffmpeg", args)
if err != nil {
log.Fatal(err)
return fmt.Errorf("failed to run ffmpeg: %w", err)
}

rec.transcoder = cmd

select {
case <-startedCh:
case <-time.After(transcoderStartTimeout):
return fmt.Errorf("timed out waiting for transcoder to start")
}

return nil
}

Expand All @@ -232,10 +317,12 @@ func NewRecorder(cfg config.RecorderConfig) (*Recorder, error) {
}

return &Recorder{
cfg: cfg,
readyCh: make(chan struct{}),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
cfg: cfg,
readyCh: make(chan struct{}),
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
transcoderStartedCh: make(chan struct{}),
transcoderStoppedCh: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -271,33 +358,52 @@ func (rec *Recorder) Start() error {
log.Printf("browser connected, ready to record")

filename := fmt.Sprintf("%s-%s.mp4", rec.cfg.CallID, time.Now().UTC().Format("2006-01-02-15_04_05"))
rec.outPath = filepath.Join("/recs", filename)
rec.outPath = filepath.Join(getDataDir(), filename)
err = rec.runTranscoder(rec.outPath)
if err != nil {
return fmt.Errorf("failed to run transcoder: %s", err)
}

log.Printf("transcoder started at %v", time.Now().UnixMilli())

return nil
}

func (rec *Recorder) Stop() error {
if err := rec.transcoder.Process.Signal(syscall.SIGTERM); err != nil {
log.Printf("failed to send signal: %s", err.Error())
}
if err := rec.transcoder.Wait(); err != nil {
log.Printf("failed waiting for transcoder to exit: %s", err)
if rec.transcoder != nil {
log.Printf("stopping transcoder")
if err := rec.transcoder.Process.Signal(syscall.SIGTERM); err != nil {
log.Printf("failed to send signal: %s", err)
}
if err := rec.transcoder.Wait(); err != nil {
log.Printf("failed waiting for transcoder to exit: %s", err)
}
<-rec.transcoderStoppedCh
rec.transcoder = nil
}

close(rec.stopCh)

var exitErr error
select {
case <-rec.stoppedCh:
case <-time.After(stopTimeout):
return fmt.Errorf("timed out waiting for stopped event")
exitErr = fmt.Errorf("timed out waiting for stopped event")
}

if rec.displayServer != nil {
log.Printf("stopping display server")
if err := rec.displayServer.Process.Signal(syscall.SIGTERM); err != nil {
log.Printf("failed to send signal: %s", err)
}
if err := rec.displayServer.Wait(); err != nil {
log.Printf("failed waiting for display server to exit: %s", err)
}
rec.displayServer = nil
}

if err := rec.displayServer.Process.Kill(); err != nil {
log.Printf("failed to stop display process: %s", err)
if exitErr != nil {
return exitErr
}

// TODO (MM-48546): implement better retry logic.
Expand Down
8 changes: 8 additions & 0 deletions cmd/recorder/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"os"
"regexp"
)

Expand All @@ -11,3 +12,10 @@ var (
func sanitizeConsoleLog(str string) string {
return icePasswordRE.ReplaceAllString(str, "ice-pwd:XXX")
}

func getDataDir() string {
if dir := os.Getenv("DATA_DIR"); dir != "" {
return dir
}
return dataDir
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/chromedp/chromedp v0.9.2
github.com/mattermost/mattermost/server/public v0.0.0-20230613002302-62a3ee8adcb5
github.com/stretchr/testify v1.8.2
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down