diff --git a/internal/asyncwriter/async_writer.go b/internal/asyncwriter/async_writer.go deleted file mode 100644 index 2c80f0d3032..00000000000 --- a/internal/asyncwriter/async_writer.go +++ /dev/null @@ -1,76 +0,0 @@ -// Package asyncwriter contains an asynchronous writer. -package asyncwriter - -import ( - "fmt" - - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" - - "github.com/bluenviron/mediamtx/internal/logger" -) - -// Writer is an asynchronous writer. -type Writer struct { - writeErrLogger logger.Writer - buffer *ringbuffer.RingBuffer - - // out - err chan error -} - -// New allocates a Writer. -func New( - queueSize int, - parent logger.Writer, -) *Writer { - buffer, _ := ringbuffer.New(uint64(queueSize)) - - return &Writer{ - writeErrLogger: logger.NewLimitedLogger(parent), - buffer: buffer, - err: make(chan error), - } -} - -// Start starts the writer routine. -func (w *Writer) Start() { - go w.run() -} - -// Stop stops the writer routine. -func (w *Writer) Stop() { - w.buffer.Close() - <-w.err -} - -// Error returns whenever there's an error. -func (w *Writer) Error() chan error { - return w.err -} - -func (w *Writer) run() { - w.err <- w.runInner() - close(w.err) -} - -func (w *Writer) runInner() error { - for { - cb, ok := w.buffer.Pull() - if !ok { - return fmt.Errorf("terminated") - } - - err := cb.(func() error)() - if err != nil { - return err - } - } -} - -// Push appends an element to the queue. -func (w *Writer) Push(cb func() error) { - ok := w.buffer.Push(cb) - if !ok { - w.writeErrLogger.Log(logger.Warn, "write queue is full") - } -} diff --git a/internal/asyncwriter/async_writer_test.go b/internal/asyncwriter/async_writer_test.go deleted file mode 100644 index 139dd44a593..00000000000 --- a/internal/asyncwriter/async_writer_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package asyncwriter - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestAsyncWriter(t *testing.T) { - w := New(512, nil) - - w.Start() - defer w.Stop() - - w.Push(func() error { - return fmt.Errorf("testerror") - }) - - err := <-w.Error() - require.EqualError(t, err, "testerror") -} diff --git a/internal/core/core.go b/internal/core/core.go index d55a8d8ee84..c23f774ff92 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -440,7 +440,6 @@ func (p *Core) createResources(initial bool) error { Address: p.conf.RTMPAddress, ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, - WriteQueueSize: p.conf.WriteQueueSize, IsTLS: false, ServerCert: "", ServerKey: "", @@ -471,7 +470,6 @@ func (p *Core) createResources(initial bool) error { Address: p.conf.RTMPSAddress, ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, - WriteQueueSize: p.conf.WriteQueueSize, IsTLS: true, ServerCert: p.conf.RTMPServerCert, ServerKey: p.conf.RTMPServerKey, @@ -511,7 +509,6 @@ func (p *Core) createResources(initial bool) error { SegmentMaxSize: p.conf.HLSSegmentMaxSize, Directory: p.conf.HLSDirectory, ReadTimeout: p.conf.ReadTimeout, - WriteQueueSize: p.conf.WriteQueueSize, MuxerCloseAfter: p.conf.HLSMuxerCloseAfter, PathManager: p.pathManager, Parent: p, @@ -539,7 +536,6 @@ func (p *Core) createResources(initial bool) error { AllowOrigin: p.conf.WebRTCAllowOrigin, TrustedProxies: p.conf.WebRTCTrustedProxies, ReadTimeout: p.conf.ReadTimeout, - WriteQueueSize: p.conf.WriteQueueSize, LocalUDPAddress: p.conf.WebRTCLocalUDPAddress, LocalTCPAddress: p.conf.WebRTCLocalTCPAddress, IPsFromInterfaces: p.conf.WebRTCIPsFromInterfaces, @@ -570,7 +566,6 @@ func (p *Core) createResources(initial bool) error { RTSPAddress: p.conf.RTSPAddress, ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, - WriteQueueSize: p.conf.WriteQueueSize, UDPMaxPayloadSize: p.conf.UDPMaxPayloadSize, RunOnConnect: p.conf.RunOnConnect, RunOnConnectRestart: p.conf.RunOnConnectRestart, @@ -755,7 +750,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RTMPAddress != p.conf.RTMPAddress || newConf.ReadTimeout != p.conf.ReadTimeout || newConf.WriteTimeout != p.conf.WriteTimeout || - newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.RTSPAddress != p.conf.RTSPAddress || newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || @@ -770,7 +764,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RTMPSAddress != p.conf.RTMPSAddress || newConf.ReadTimeout != p.conf.ReadTimeout || newConf.WriteTimeout != p.conf.WriteTimeout || - newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.RTMPServerCert != p.conf.RTMPServerCert || newConf.RTMPServerKey != p.conf.RTMPServerKey || newConf.RTSPAddress != p.conf.RTSPAddress || @@ -797,7 +790,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.HLSSegmentMaxSize != p.conf.HLSSegmentMaxSize || newConf.HLSDirectory != p.conf.HLSDirectory || newConf.ReadTimeout != p.conf.ReadTimeout || - newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.HLSMuxerCloseAfter != p.conf.HLSMuxerCloseAfter || closePathManager || closeMetrics || @@ -812,7 +804,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.WebRTCAllowOrigin != p.conf.WebRTCAllowOrigin || !reflect.DeepEqual(newConf.WebRTCTrustedProxies, p.conf.WebRTCTrustedProxies) || newConf.ReadTimeout != p.conf.ReadTimeout || - newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.WebRTCLocalUDPAddress != p.conf.WebRTCLocalUDPAddress || newConf.WebRTCLocalTCPAddress != p.conf.WebRTCLocalTCPAddress || newConf.WebRTCIPsFromInterfaces != p.conf.WebRTCIPsFromInterfaces || @@ -831,7 +822,6 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.RTSPAddress != p.conf.RTSPAddress || newConf.ReadTimeout != p.conf.ReadTimeout || newConf.WriteTimeout != p.conf.WriteTimeout || - newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || diff --git a/internal/core/path.go b/internal/core/path.go index 254122dbc0a..4eeae3a4a15 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -711,6 +711,7 @@ func (pa *path) onDemandPublisherStop(reason string) { func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error { var err error pa.stream, err = stream.New( + pa.writeQueueSize, pa.udpMaxPayloadSize, desc, allocateEncoder, @@ -777,7 +778,6 @@ func (pa *path) setNotReady() { func (pa *path) startRecording() { pa.recorder = &recorder.Recorder{ - WriteQueueSize: pa.writeQueueSize, PathFormat: pa.conf.RecordPath, Format: pa.conf.RecordFormat, PartDuration: time.Duration(pa.conf.RecordPartDuration), diff --git a/internal/protocols/hls/from_stream.go b/internal/protocols/hls/from_stream.go index 3195c517910..f08facb3f5e 100644 --- a/internal/protocols/hls/from_stream.go +++ b/internal/protocols/hls/from_stream.go @@ -7,8 +7,8 @@ import ( "github.com/bluenviron/gohlslib/v2" "github.com/bluenviron/gohlslib/v2/pkg/codecs" + "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" @@ -19,195 +19,217 @@ var ErrNoSupportedCodecs = errors.New( "the stream doesn't contain any supported codec, which are currently AV1, VP9, H265, H264, Opus, MPEG-4 Audio") func setupVideoTrack( - stream *stream.Stream, - writer *asyncwriter.Writer, + strea *stream.Stream, + reader stream.Reader, muxer *gohlslib.Muxer, setuppedFormats map[format.Format]struct{}, ) { + addTrack := func( + media *description.Media, + forma format.Format, + track *gohlslib.Track, + readFunc stream.ReadFunc, + ) { + muxer.Tracks = append(muxer.Tracks, track) + setuppedFormats[forma] = struct{}{} + strea.AddReader(reader, media, forma, readFunc) + } + var videoFormatAV1 *format.AV1 - videoMedia := stream.Desc().FindFormat(&videoFormatAV1) + videoMedia := strea.Desc().FindFormat(&videoFormatAV1) if videoFormatAV1 != nil { - track := &gohlslib.Track{ - Codec: &codecs.AV1{}, - } - muxer.Tracks = append(muxer.Tracks, track) - setuppedFormats[videoFormatAV1] = struct{}{} + track := &gohlslib.Track{Codec: &codecs.AV1{}} - stream.AddReader(writer, videoMedia, videoFormatAV1, func(u unit.Unit) error { - tunit := u.(*unit.AV1) + addTrack( + videoMedia, + videoFormatAV1, + track, + func(u unit.Unit) error { + tunit := u.(*unit.AV1) - if tunit.TU == nil { - return nil - } + if tunit.TU == nil { + return nil + } - err := muxer.WriteAV1(track, tunit.NTP, tunit.PTS, tunit.TU) - if err != nil { - return fmt.Errorf("muxer error: %w", err) - } + err := muxer.WriteAV1(track, tunit.NTP, tunit.PTS, tunit.TU) + if err != nil { + return fmt.Errorf("muxer error: %w", err) + } - return nil - }) + return nil + }) return } var videoFormatVP9 *format.VP9 - videoMedia = stream.Desc().FindFormat(&videoFormatVP9) + videoMedia = strea.Desc().FindFormat(&videoFormatVP9) if videoFormatVP9 != nil { - track := &gohlslib.Track{ - Codec: &codecs.VP9{}, - } - muxer.Tracks = append(muxer.Tracks, track) - setuppedFormats[videoFormatVP9] = struct{}{} + track := &gohlslib.Track{Codec: &codecs.VP9{}} - stream.AddReader(writer, videoMedia, videoFormatVP9, func(u unit.Unit) error { - tunit := u.(*unit.VP9) + addTrack( + videoMedia, + videoFormatVP9, + track, + func(u unit.Unit) error { + tunit := u.(*unit.VP9) - if tunit.Frame == nil { - return nil - } + if tunit.Frame == nil { + return nil + } - err := muxer.WriteVP9(track, tunit.NTP, tunit.PTS, tunit.Frame) - if err != nil { - return fmt.Errorf("muxer error: %w", err) - } + err := muxer.WriteVP9(track, tunit.NTP, tunit.PTS, tunit.Frame) + if err != nil { + return fmt.Errorf("muxer error: %w", err) + } - return nil - }) + return nil + }) return } var videoFormatH265 *format.H265 - videoMedia = stream.Desc().FindFormat(&videoFormatH265) + videoMedia = strea.Desc().FindFormat(&videoFormatH265) if videoFormatH265 != nil { vps, sps, pps := videoFormatH265.SafeParams() - track := &gohlslib.Track{ - Codec: &codecs.H265{ - VPS: vps, - SPS: sps, - PPS: pps, - }, - } - muxer.Tracks = append(muxer.Tracks, track) - setuppedFormats[videoFormatH265] = struct{}{} + track := &gohlslib.Track{Codec: &codecs.H265{ + VPS: vps, + SPS: sps, + PPS: pps, + }} + + addTrack( + videoMedia, + videoFormatH265, + track, + func(u unit.Unit) error { + tunit := u.(*unit.H265) + + if tunit.AU == nil { + return nil + } - stream.AddReader(writer, videoMedia, videoFormatH265, func(u unit.Unit) error { - tunit := u.(*unit.H265) + err := muxer.WriteH265(track, tunit.NTP, tunit.PTS, tunit.AU) + if err != nil { + return fmt.Errorf("muxer error: %w", err) + } - if tunit.AU == nil { return nil - } - - err := muxer.WriteH265(track, tunit.NTP, tunit.PTS, tunit.AU) - if err != nil { - return fmt.Errorf("muxer error: %w", err) - } - - return nil - }) + }) return } var videoFormatH264 *format.H264 - videoMedia = stream.Desc().FindFormat(&videoFormatH264) + videoMedia = strea.Desc().FindFormat(&videoFormatH264) if videoFormatH264 != nil { sps, pps := videoFormatH264.SafeParams() - track := &gohlslib.Track{ - Codec: &codecs.H264{ - SPS: sps, - PPS: pps, - }, - } - muxer.Tracks = append(muxer.Tracks, track) - setuppedFormats[videoFormatH264] = struct{}{} + track := &gohlslib.Track{Codec: &codecs.H264{ + SPS: sps, + PPS: pps, + }} + + addTrack( + videoMedia, + videoFormatH264, + track, + func(u unit.Unit) error { + tunit := u.(*unit.H264) + + if tunit.AU == nil { + return nil + } - stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error { - tunit := u.(*unit.H264) + err := muxer.WriteH264(track, tunit.NTP, tunit.PTS, tunit.AU) + if err != nil { + return fmt.Errorf("muxer error: %w", err) + } - if tunit.AU == nil { return nil - } - - err := muxer.WriteH264(track, tunit.NTP, tunit.PTS, tunit.AU) - if err != nil { - return fmt.Errorf("muxer error: %w", err) - } - - return nil - }) + }) return } } func setupAudioTracks( - stream *stream.Stream, - writer *asyncwriter.Writer, + strea *stream.Stream, + reader stream.Reader, muxer *gohlslib.Muxer, setuppedFormats map[format.Format]struct{}, ) { - for _, media := range stream.Desc().Medias { + addTrack := func( + medi *description.Media, + forma format.Format, + track *gohlslib.Track, + readFunc stream.ReadFunc, + ) { + muxer.Tracks = append(muxer.Tracks, track) + setuppedFormats[forma] = struct{}{} + strea.AddReader(reader, medi, forma, readFunc) + } + + for _, media := range strea.Desc().Medias { for _, forma := range media.Formats { switch forma := forma.(type) { case *format.Opus: - track := &gohlslib.Track{ - Codec: &codecs.Opus{ - ChannelCount: forma.ChannelCount, - }, - } - muxer.Tracks = append(muxer.Tracks, track) - setuppedFormats[forma] = struct{}{} - - stream.AddReader(writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.Opus) - - err := muxer.WriteOpus( - track, - tunit.NTP, - tunit.PTS, - tunit.Packets) - if err != nil { - return fmt.Errorf("muxer error: %w", err) - } - - return nil - }) - - case *format.MPEG4Audio: - co := forma.GetConfig() - if co != nil { - track := &gohlslib.Track{ - Codec: &codecs.MPEG4Audio{ - Config: *co, - }, - } - muxer.Tracks = append(muxer.Tracks, track) - setuppedFormats[forma] = struct{}{} - - stream.AddReader(writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - - if tunit.AUs == nil { - return nil - } - - err := muxer.WriteMPEG4Audio( + track := &gohlslib.Track{Codec: &codecs.Opus{ + ChannelCount: forma.ChannelCount, + }} + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.Opus) + + err := muxer.WriteOpus( track, tunit.NTP, tunit.PTS, - tunit.AUs) + tunit.Packets) if err != nil { return fmt.Errorf("muxer error: %w", err) } return nil }) + + case *format.MPEG4Audio: + co := forma.GetConfig() + if co != nil { + track := &gohlslib.Track{Codec: &codecs.MPEG4Audio{ + Config: *co, + }} + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + + if tunit.AUs == nil { + return nil + } + + err := muxer.WriteMPEG4Audio( + track, + tunit.NTP, + tunit.PTS, + tunit.AUs) + if err != nil { + return fmt.Errorf("muxer error: %w", err) + } + + return nil + }) } } } @@ -217,22 +239,21 @@ func setupAudioTracks( // FromStream maps a MediaMTX stream to a HLS muxer. func FromStream( stream *stream.Stream, - writer *asyncwriter.Writer, + reader stream.Reader, muxer *gohlslib.Muxer, - l logger.Writer, ) error { setuppedFormats := make(map[format.Format]struct{}) setupVideoTrack( stream, - writer, + reader, muxer, setuppedFormats, ) setupAudioTracks( stream, - writer, + reader, muxer, setuppedFormats, ) @@ -245,7 +266,7 @@ func FromStream( for _, media := range stream.Desc().Medias { for _, forma := range media.Formats { if _, ok := setuppedFormats[forma]; !ok { - l.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) + reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) } n++ } diff --git a/internal/protocols/hls/from_stream_test.go b/internal/protocols/hls/from_stream_test.go index 5941508d28e..a04b968ad62 100644 --- a/internal/protocols/hls/from_stream_test.go +++ b/internal/protocols/hls/from_stream_test.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gohlslib/v2" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/test" @@ -16,6 +15,7 @@ import ( func TestFromStreamNoSupportedCodecs(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{{ Type: description.MediaTypeVideo, @@ -26,20 +26,19 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - l := test.Logger(func(logger.Level, string, ...interface{}) { t.Error("should not happen") }) m := &gohlslib.Muxer{} - err = FromStream(stream, writer, m, l) + err = FromStream(stream, l, m) require.Equal(t, ErrNoSupportedCodecs, err) } func TestFromStreamSkipUnsupportedTracks(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{ { @@ -60,8 +59,6 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - m := &gohlslib.Muxer{} n := 0 @@ -77,7 +74,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { n++ }) - err = FromStream(stream, writer, m, l) + err = FromStream(stream, l, m) require.NoError(t, err) require.Equal(t, 2, n) } diff --git a/internal/protocols/mpegts/from_stream.go b/internal/protocols/mpegts/from_stream.go index b55dc517674..7e343129081 100644 --- a/internal/protocols/mpegts/from_stream.go +++ b/internal/protocols/mpegts/from_stream.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/pkg/codecs/ac3" "github.com/bluenviron/mediacommon/pkg/codecs/h264" @@ -12,7 +13,6 @@ import ( mcmpegts "github.com/bluenviron/mediacommon/pkg/formats/mpegts" srt "github.com/datarhei/gosrt" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" @@ -24,232 +24,263 @@ func durationGoToMPEGTS(v time.Duration) int64 { // FromStream maps a MediaMTX stream to a MPEG-TS writer. func FromStream( - stream *stream.Stream, - writer *asyncwriter.Writer, + strea *stream.Stream, + reader stream.Reader, bw *bufio.Writer, sconn srt.Conn, writeTimeout time.Duration, - l logger.Writer, ) error { var w *mcmpegts.Writer var tracks []*mcmpegts.Track setuppedFormats := make(map[format.Format]struct{}) - addTrack := func(forma format.Format, codec mcmpegts.Codec) *mcmpegts.Track { - track := &mcmpegts.Track{ - Codec: codec, - } + addTrack := func( + media *description.Media, + forma format.Format, + track *mcmpegts.Track, + readFunc stream.ReadFunc, + ) { tracks = append(tracks, track) setuppedFormats[forma] = struct{}{} - return track + strea.AddReader(reader, media, forma, readFunc) } - for _, medi := range stream.Desc().Medias { - for _, forma := range medi.Formats { + for _, media := range strea.Desc().Medias { + for _, forma := range media.Formats { switch forma := forma.(type) { case *format.H265: //nolint:dupl - track := addTrack(forma, &mcmpegts.CodecH265{}) + track := &mcmpegts.Track{Codec: &mcmpegts.CodecH265{}} var dtsExtractor *h265.DTSExtractor - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } - randomAccess := h265.IsRandomAccess(tunit.AU) + randomAccess := h265.IsRandomAccess(tunit.AU) - if dtsExtractor == nil { - if !randomAccess { - return nil + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() + } + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err } - dtsExtractor = h265.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err = (*w).WriteH265(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) - if err != nil { - return err - } - return bw.Flush() - }) + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err = (*w).WriteH265(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + if err != nil { + return err + } + return bw.Flush() + }) case *format.H264: //nolint:dupl - track := addTrack(forma, &mcmpegts.CodecH264{}) + track := &mcmpegts.Track{Codec: &mcmpegts.CodecH264{}} var dtsExtractor *h264.DTSExtractor - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } - idrPresent := h264.IDRPresent(tunit.AU) + idrPresent := h264.IDRPresent(tunit.AU) - if dtsExtractor == nil { - if !idrPresent { - return nil + if dtsExtractor == nil { + if !idrPresent { + return nil + } + dtsExtractor = h264.NewDTSExtractor() } - dtsExtractor = h264.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err = (*w).WriteH264(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) - if err != nil { - return err - } - return bw.Flush() - }) + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err = (*w).WriteH264(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) + if err != nil { + return err + } + return bw.Flush() + }) case *format.MPEG4Video: - track := addTrack(forma, &mcmpegts.CodecMPEG4Video{}) + track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4Video{}} firstReceived := false var lastPTS time.Duration - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Video) - if tunit.Frame == nil { - return nil - } - - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) - if err != nil { - return err - } - return bw.Flush() - }) + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + if err != nil { + return err + } + return bw.Flush() + }) case *format.MPEG1Video: - track := addTrack(forma, &mcmpegts.CodecMPEG1Video{}) + track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG1Video{}} firstReceived := false var lastPTS time.Duration - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Video) - if tunit.Frame == nil { - return nil - } - - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) - if err != nil { - return err - } - return bw.Flush() - }) + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + if err != nil { + return err + } + return bw.Flush() + }) case *format.Opus: - track := addTrack(forma, &mcmpegts.CodecOpus{ + track := &mcmpegts.Track{Codec: &mcmpegts.CodecOpus{ ChannelCount: forma.ChannelCount, - }) - - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.Opus) - if tunit.Packets == nil { - return nil - } - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) - if err != nil { - return err - } - return bw.Flush() - }) + }} + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + if err != nil { + return err + } + return bw.Flush() + }) case *format.MPEG4Audio: co := forma.GetConfig() - if co == nil { - return fmt.Errorf("MPEG-4 audio tracks without explicit configuration are not supported") + if co != nil { + track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4Audio{ + Config: *co, + }} + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + if err != nil { + return err + } + return bw.Flush() + }) } - track := addTrack(forma, &mcmpegts.CodecMPEG4Audio{ - Config: *co, - }) - - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { - return nil - } - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) - if err != nil { - return err - } - return bw.Flush() - }) - case *format.MPEG1Audio: - track := addTrack(forma, &mcmpegts.CodecMPEG1Audio{}) - - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Audio) - if tunit.Frames == nil { - return nil - } - - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) - if err != nil { - return err - } - return bw.Flush() - }) + track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG1Audio{}} + + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } + + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + if err != nil { + return err + } + return bw.Flush() + }) case *format.AC3: - track := addTrack(forma, &mcmpegts.CodecAC3{}) + track := &mcmpegts.Track{Codec: &mcmpegts.CodecAC3{}} sampleRate := time.Duration(forma.SampleRate) - stream.AddReader(writer, medi, forma, func(u unit.Unit) error { - tunit := u.(*unit.AC3) - if tunit.Frames == nil { - return nil - } + addTrack( + media, + forma, + track, + func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } - for i, frame := range tunit.Frames { - framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* - time.Second/sampleRate + for i, frame := range tunit.Frames { + framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* + time.Second/sampleRate - sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteAC3(track, durationGoToMPEGTS(framePTS), frame) - if err != nil { - return err + sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + if err != nil { + return err + } } - } - return bw.Flush() - }) + return bw.Flush() + }) } } } @@ -259,10 +290,10 @@ func FromStream( } n := 1 - for _, medi := range stream.Desc().Medias { + for _, medi := range strea.Desc().Medias { for _, forma := range medi.Formats { if _, ok := setuppedFormats[forma]; !ok { - l.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) + reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) } n++ } diff --git a/internal/protocols/mpegts/from_stream_test.go b/internal/protocols/mpegts/from_stream_test.go index 32547807e6e..90595a70b78 100644 --- a/internal/protocols/mpegts/from_stream_test.go +++ b/internal/protocols/mpegts/from_stream_test.go @@ -6,7 +6,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/test" @@ -15,6 +14,7 @@ import ( func TestFromStreamNoSupportedCodecs(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{{ Type: description.MediaTypeVideo, @@ -25,18 +25,17 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - l := test.Logger(func(logger.Level, string, ...interface{}) { t.Error("should not happen") }) - err = FromStream(stream, writer, nil, nil, 0, l) + err = FromStream(stream, l, nil, nil, 0) require.Equal(t, errNoSupportedCodecs, err) } func TestFromStreamSkipUnsupportedTracks(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{ { @@ -53,8 +52,6 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - n := 0 l := test.Logger(func(l logger.Level, format string, args ...interface{}) { @@ -65,7 +62,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { n++ }) - err = FromStream(stream, writer, nil, nil, 0, l) + err = FromStream(stream, l, nil, nil, 0) require.NoError(t, err) require.Equal(t, 1, n) } diff --git a/internal/protocols/rtmp/from_stream.go b/internal/protocols/rtmp/from_stream.go index ed68214b2cd..e3414bd5f9b 100644 --- a/internal/protocols/rtmp/from_stream.go +++ b/internal/protocols/rtmp/from_stream.go @@ -10,7 +10,6 @@ import ( "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" @@ -20,69 +19,73 @@ var errNoSupportedCodecsFrom = errors.New( "the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio") func setupVideo( - stream *stream.Stream, - writer *asyncwriter.Writer, + strea *stream.Stream, + reader stream.Reader, w **Writer, nconn net.Conn, writeTimeout time.Duration, ) format.Format { var videoFormatH264 *format.H264 - videoMedia := stream.Desc().FindFormat(&videoFormatH264) + videoMedia := strea.Desc().FindFormat(&videoFormatH264) if videoFormatH264 != nil { var videoDTSExtractor *h264.DTSExtractor - stream.AddReader(writer, videoMedia, videoFormatH264, func(u unit.Unit) error { - tunit := u.(*unit.H264) - - if tunit.AU == nil { - return nil - } - - idrPresent := false - nonIDRPresent := false - - for _, nalu := range tunit.AU { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeIDR: - idrPresent = true - - case h264.NALUTypeNonIDR: - nonIDRPresent = true - } - } + strea.AddReader( + reader, + videoMedia, + videoFormatH264, + func(u unit.Unit) error { + tunit := u.(*unit.H264) - var dts time.Duration - - // wait until we receive an IDR - if videoDTSExtractor == nil { - if !idrPresent { + if tunit.AU == nil { return nil } - videoDTSExtractor = h264.NewDTSExtractor() + idrPresent := false + nonIDRPresent := false - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - } else { - if !idrPresent && !nonIDRPresent { - return nil + for _, nalu := range tunit.AU { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeIDR: + idrPresent = true + + case h264.NALUTypeNonIDR: + nonIDRPresent = true + } } - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err + var dts time.Duration + + // wait until we receive an IDR + if videoDTSExtractor == nil { + if !idrPresent { + return nil + } + + videoDTSExtractor = h264.NewDTSExtractor() + + var err error + dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + } else { + if !idrPresent && !nonIDRPresent { + return nil + } + + var err error + dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } } - } - nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU) - }) + nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU) + }) return videoFormatH264 } @@ -91,73 +94,81 @@ func setupVideo( } func setupAudio( - stream *stream.Stream, - writer *asyncwriter.Writer, + strea *stream.Stream, + reader stream.Reader, w **Writer, nconn net.Conn, writeTimeout time.Duration, ) format.Format { var audioFormatMPEG4Audio *format.MPEG4Audio - audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Audio) + audioMedia := strea.Desc().FindFormat(&audioFormatMPEG4Audio) if audioMedia != nil { - stream.AddReader(writer, audioMedia, audioFormatMPEG4Audio, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - - if tunit.AUs == nil { - return nil - } + strea.AddReader( + reader, + audioMedia, + audioFormatMPEG4Audio, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + + if tunit.AUs == nil { + return nil + } - for i, au := range tunit.AUs { - nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG4Audio( - tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()), - au, - ) - if err != nil { - return err + for i, au := range tunit.AUs { + nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := (*w).WriteMPEG4Audio( + tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()), + au, + ) + if err != nil { + return err + } } - } - return nil - }) + return nil + }) return audioFormatMPEG4Audio } var audioFormatMPEG1 *format.MPEG1Audio - audioMedia = stream.Desc().FindFormat(&audioFormatMPEG1) + audioMedia = strea.Desc().FindFormat(&audioFormatMPEG1) if audioMedia != nil { - stream.AddReader(writer, audioMedia, audioFormatMPEG1, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Audio) - - pts := tunit.PTS - - for _, frame := range tunit.Frames { - var h mpeg1audio.FrameHeader - err := h.Unmarshal(frame) - if err != nil { - return err + strea.AddReader( + reader, + audioMedia, + audioFormatMPEG1, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + + pts := tunit.PTS + + for _, frame := range tunit.Frames { + var h mpeg1audio.FrameHeader + err := h.Unmarshal(frame) + if err != nil { + return err + } + + if !(!h.MPEG2 && h.Layer == 3) { + return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio") + } + + nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err = (*w).WriteMPEG1Audio(pts, &h, frame) + if err != nil { + return err + } + + pts += time.Duration(h.SampleCount()) * + time.Second / time.Duration(h.SampleRate) } - if !(!h.MPEG2 && h.Layer == 3) { - return fmt.Errorf("RTMP only supports MPEG-1 layer 3 audio") - } - - nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err = (*w).WriteMPEG1Audio(pts, &h, frame) - if err != nil { - return err - } - - pts += time.Duration(h.SampleCount()) * - time.Second / time.Duration(h.SampleRate) - } - - return nil - }) + return nil + }) return audioFormatMPEG1 } @@ -168,17 +179,16 @@ func setupAudio( // FromStream maps a MediaMTX stream to a RTMP stream. func FromStream( stream *stream.Stream, - writer *asyncwriter.Writer, + reader stream.Reader, conn *Conn, nconn net.Conn, writeTimeout time.Duration, - l logger.Writer, ) error { var w *Writer videoFormat := setupVideo( stream, - writer, + reader, &w, nconn, writeTimeout, @@ -186,7 +196,7 @@ func FromStream( audioFormat := setupAudio( stream, - writer, + reader, &w, nconn, writeTimeout, @@ -206,7 +216,7 @@ func FromStream( for _, media := range stream.Desc().Medias { for _, forma := range media.Formats { if forma != videoFormat && forma != audioFormat { - l.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) + reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) } n++ } diff --git a/internal/protocols/rtmp/from_stream_test.go b/internal/protocols/rtmp/from_stream_test.go index 1e9daa5014e..0c23b0e5d6a 100644 --- a/internal/protocols/rtmp/from_stream_test.go +++ b/internal/protocols/rtmp/from_stream_test.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/protocols/rtmp/bytecounter" "github.com/bluenviron/mediamtx/internal/protocols/rtmp/message" @@ -18,6 +17,7 @@ import ( func TestFromStreamNoSupportedCodecs(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{{ Type: description.MediaTypeVideo, @@ -28,18 +28,17 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - l := test.Logger(func(logger.Level, string, ...interface{}) { t.Error("should not happen") }) - err = FromStream(stream, writer, nil, nil, 0, l) + err = FromStream(stream, l, nil, nil, 0) require.Equal(t, errNoSupportedCodecsFrom, err) } func TestFromStreamSkipUnsupportedTracks(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{ { @@ -60,8 +59,6 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - n := 0 l := test.Logger(func(l logger.Level, format string, args ...interface{}) { @@ -79,7 +76,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { bc := bytecounter.NewReadWriter(&buf) conn := &Conn{mrw: message.NewReadWriter(&buf, bc, false)} - err = FromStream(stream, writer, conn, nil, 0, l) + err = FromStream(stream, l, conn, nil, 0) require.NoError(t, err) require.Equal(t, 2, n) } diff --git a/internal/protocols/webrtc/from_stream.go b/internal/protocols/webrtc/from_stream.go index 8d859646cdc..7411eb730db 100644 --- a/internal/protocols/webrtc/from_stream.go +++ b/internal/protocols/webrtc/from_stream.go @@ -13,7 +13,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" "github.com/bluenviron/mediacommon/pkg/codecs/g711" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/unit" @@ -42,7 +41,7 @@ func randUint32() (uint32, error) { func setupVideoTrack( stream *stream.Stream, - writer *asyncwriter.Writer, + reader stream.Reader, pc *PeerConnection, ) (format.Format, error) { var av1Format *format.AV1 @@ -66,25 +65,29 @@ func setupVideoTrack( return nil, err } - stream.AddReader(writer, media, av1Format, func(u unit.Unit) error { - tunit := u.(*unit.AV1) + stream.AddReader( + reader, + media, + av1Format, + func(u unit.Unit) error { + tunit := u.(*unit.AV1) - if tunit.TU == nil { - return nil - } + if tunit.TU == nil { + return nil + } - packets, err := encoder.Encode(tunit.TU) - if err != nil { - return nil //nolint:nilerr - } + packets, err := encoder.Encode(tunit.TU) + if err != nil { + return nil //nolint:nilerr + } - for _, pkt := range packets { - pkt.Timestamp += tunit.RTPPackets[0].Timestamp - track.WriteRTP(pkt) //nolint:errcheck - } + for _, pkt := range packets { + pkt.Timestamp += tunit.RTPPackets[0].Timestamp + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return av1Format, nil } @@ -112,25 +115,29 @@ func setupVideoTrack( return nil, err } - stream.AddReader(writer, media, vp9Format, func(u unit.Unit) error { - tunit := u.(*unit.VP9) + stream.AddReader( + reader, + media, + vp9Format, + func(u unit.Unit) error { + tunit := u.(*unit.VP9) - if tunit.Frame == nil { - return nil - } + if tunit.Frame == nil { + return nil + } - packets, err := encoder.Encode(tunit.Frame) - if err != nil { - return nil //nolint:nilerr - } + packets, err := encoder.Encode(tunit.Frame) + if err != nil { + return nil //nolint:nilerr + } - for _, pkt := range packets { - pkt.Timestamp += tunit.RTPPackets[0].Timestamp - track.WriteRTP(pkt) //nolint:errcheck - } + for _, pkt := range packets { + pkt.Timestamp += tunit.RTPPackets[0].Timestamp + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return vp9Format, nil } @@ -156,25 +163,29 @@ func setupVideoTrack( return nil, err } - stream.AddReader(writer, media, vp8Format, func(u unit.Unit) error { - tunit := u.(*unit.VP8) + stream.AddReader( + reader, + media, + vp8Format, + func(u unit.Unit) error { + tunit := u.(*unit.VP8) - if tunit.Frame == nil { - return nil - } + if tunit.Frame == nil { + return nil + } - packets, err := encoder.Encode(tunit.Frame) - if err != nil { - return nil //nolint:nilerr - } + packets, err := encoder.Encode(tunit.Frame) + if err != nil { + return nil //nolint:nilerr + } - for _, pkt := range packets { - pkt.Timestamp += tunit.RTPPackets[0].Timestamp - track.WriteRTP(pkt) //nolint:errcheck - } + for _, pkt := range packets { + pkt.Timestamp += tunit.RTPPackets[0].Timestamp + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return vp8Format, nil } @@ -204,32 +215,36 @@ func setupVideoTrack( firstReceived := false var lastPTS time.Duration - stream.AddReader(writer, media, h264Format, func(u unit.Unit) error { - tunit := u.(*unit.H264) + stream.AddReader( + reader, + media, + h264Format, + func(u unit.Unit) error { + tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } + if tunit.AU == nil { + return nil + } - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames") - } - lastPTS = tunit.PTS + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames") + } + lastPTS = tunit.PTS - packets, err := encoder.Encode(tunit.AU) - if err != nil { - return nil //nolint:nilerr - } + packets, err := encoder.Encode(tunit.AU) + if err != nil { + return nil //nolint:nilerr + } - for _, pkt := range packets { - pkt.Timestamp += tunit.RTPPackets[0].Timestamp - track.WriteRTP(pkt) //nolint:errcheck - } + for _, pkt := range packets { + pkt.Timestamp += tunit.RTPPackets[0].Timestamp + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return h264Format, nil } @@ -239,7 +254,7 @@ func setupVideoTrack( func setupAudioTrack( stream *stream.Stream, - writer *asyncwriter.Writer, + reader stream.Reader, pc *PeerConnection, ) (format.Format, error) { var opusFormat *format.Opus @@ -280,13 +295,17 @@ func setupAudioTrack( } pc.OutgoingTracks = append(pc.OutgoingTracks, track) - stream.AddReader(writer, media, opusFormat, func(u unit.Unit) error { - for _, pkt := range u.GetRTPPackets() { - track.WriteRTP(pkt) //nolint:errcheck - } + stream.AddReader( + reader, + media, + opusFormat, + func(u unit.Unit) error { + for _, pkt := range u.GetRTPPackets() { + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return opusFormat, nil } @@ -303,13 +322,17 @@ func setupAudioTrack( } pc.OutgoingTracks = append(pc.OutgoingTracks, track) - stream.AddReader(writer, media, g722Format, func(u unit.Unit) error { - for _, pkt := range u.GetRTPPackets() { - track.WriteRTP(pkt) //nolint:errcheck - } + stream.AddReader( + reader, + media, + g722Format, + func(u unit.Unit) error { + for _, pkt := range u.GetRTPPackets() { + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return g722Format, nil } @@ -378,18 +401,22 @@ func setupAudioTrack( return nil, err } - stream.AddReader(writer, media, g711Format, func(u unit.Unit) error { - for _, pkt := range u.GetRTPPackets() { - // recompute timestamp from scratch. - // Chrome requires a precise timestamp that FFmpeg doesn't provide. - pkt.Timestamp = curTimestamp - curTimestamp += uint32(len(pkt.Payload)) / uint32(g711Format.ChannelCount) - - track.WriteRTP(pkt) //nolint:errcheck - } + stream.AddReader( + reader, + media, + g711Format, + func(u unit.Unit) error { + for _, pkt := range u.GetRTPPackets() { + // recompute timestamp from scratch. + // Chrome requires a precise timestamp that FFmpeg doesn't provide. + pkt.Timestamp = curTimestamp + curTimestamp += uint32(len(pkt.Payload)) / uint32(g711Format.ChannelCount) + + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) } else { encoder := &rtplpcm.Encoder{ PayloadType: 96, @@ -407,36 +434,40 @@ func setupAudioTrack( return nil, err } - stream.AddReader(writer, media, g711Format, func(u unit.Unit) error { - tunit := u.(*unit.G711) + stream.AddReader( + reader, + media, + g711Format, + func(u unit.Unit) error { + tunit := u.(*unit.G711) - if tunit.Samples == nil { - return nil - } + if tunit.Samples == nil { + return nil + } - var lpcmSamples []byte - if g711Format.MULaw { - lpcmSamples = g711.DecodeMulaw(tunit.Samples) - } else { - lpcmSamples = g711.DecodeAlaw(tunit.Samples) - } + var lpcmSamples []byte + if g711Format.MULaw { + lpcmSamples = g711.DecodeMulaw(tunit.Samples) + } else { + lpcmSamples = g711.DecodeAlaw(tunit.Samples) + } - packets, err := encoder.Encode(lpcmSamples) - if err != nil { - return nil //nolint:nilerr - } + packets, err := encoder.Encode(lpcmSamples) + if err != nil { + return nil //nolint:nilerr + } - for _, pkt := range packets { - // recompute timestamp from scratch. - // Chrome requires a precise timestamp that FFmpeg doesn't provide. - pkt.Timestamp = curTimestamp - curTimestamp += uint32(len(pkt.Payload)) / 2 / uint32(g711Format.ChannelCount) + for _, pkt := range packets { + // recompute timestamp from scratch. + // Chrome requires a precise timestamp that FFmpeg doesn't provide. + pkt.Timestamp = curTimestamp + curTimestamp += uint32(len(pkt.Payload)) / 2 / uint32(g711Format.ChannelCount) - track.WriteRTP(pkt) //nolint:errcheck - } + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) } return g711Format, nil @@ -486,29 +517,33 @@ func setupAudioTrack( return nil, err } - stream.AddReader(writer, media, lpcmFormat, func(u unit.Unit) error { - tunit := u.(*unit.LPCM) + stream.AddReader( + reader, + media, + lpcmFormat, + func(u unit.Unit) error { + tunit := u.(*unit.LPCM) - if tunit.Samples == nil { - return nil - } + if tunit.Samples == nil { + return nil + } - packets, err := encoder.Encode(tunit.Samples) - if err != nil { - return nil //nolint:nilerr - } + packets, err := encoder.Encode(tunit.Samples) + if err != nil { + return nil //nolint:nilerr + } - for _, pkt := range packets { - // recompute timestamp from scratch. - // Chrome requires a precise timestamp that FFmpeg doesn't provide. - pkt.Timestamp = curTimestamp - curTimestamp += uint32(len(pkt.Payload)) / 2 / uint32(lpcmFormat.ChannelCount) + for _, pkt := range packets { + // recompute timestamp from scratch. + // Chrome requires a precise timestamp that FFmpeg doesn't provide. + pkt.Timestamp = curTimestamp + curTimestamp += uint32(len(pkt.Payload)) / 2 / uint32(lpcmFormat.ChannelCount) - track.WriteRTP(pkt) //nolint:errcheck - } + track.WriteRTP(pkt) //nolint:errcheck + } - return nil - }) + return nil + }) return lpcmFormat, nil } @@ -519,16 +554,15 @@ func setupAudioTrack( // FromStream maps a MediaMTX stream to a WebRTC connection func FromStream( stream *stream.Stream, - writer *asyncwriter.Writer, + reader stream.Reader, pc *PeerConnection, - l logger.Writer, ) error { - videoFormat, err := setupVideoTrack(stream, writer, pc) + videoFormat, err := setupVideoTrack(stream, reader, pc) if err != nil { return err } - audioFormat, err := setupAudioTrack(stream, writer, pc) + audioFormat, err := setupAudioTrack(stream, reader, pc) if err != nil { return err } @@ -541,7 +575,7 @@ func FromStream( for _, media := range stream.Desc().Medias { for _, forma := range media.Formats { if forma != videoFormat && forma != audioFormat { - l.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) + reader.Log(logger.Warn, "skipping track %d (%s)", n, forma.Codec()) } n++ } diff --git a/internal/protocols/webrtc/from_stream_test.go b/internal/protocols/webrtc/from_stream_test.go index b3751919471..25c73a9e8a6 100644 --- a/internal/protocols/webrtc/from_stream_test.go +++ b/internal/protocols/webrtc/from_stream_test.go @@ -6,7 +6,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/test" @@ -15,6 +14,7 @@ import ( func TestFromStreamNoSupportedCodecs(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{{ Type: description.MediaTypeVideo, @@ -25,18 +25,17 @@ func TestFromStreamNoSupportedCodecs(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - l := test.Logger(func(logger.Level, string, ...interface{}) { t.Error("should not happen") }) - err = FromStream(stream, writer, nil, l) + err = FromStream(stream, l, nil) require.Equal(t, errNoSupportedCodecsFrom, err) } func TestFromStreamSkipUnsupportedTracks(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{Medias: []*description.Media{ { @@ -53,8 +52,6 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { ) require.NoError(t, err) - writer := asyncwriter.New(0, nil) - n := 0 l := test.Logger(func(l logger.Level, format string, args ...interface{}) { @@ -67,7 +64,7 @@ func TestFromStreamSkipUnsupportedTracks(t *testing.T) { pc := &PeerConnection{} - err = FromStream(stream, writer, pc, l) + err = FromStream(stream, l, pc) require.NoError(t, err) require.Equal(t, 1, n) } @@ -79,6 +76,7 @@ func TestFromStream(t *testing.T) { } t.Run(ca.name, func(t *testing.T) { stream, err := stream.New( + 512, 1460, &description.Session{ Medias: []*description.Media{{ @@ -91,11 +89,9 @@ func TestFromStream(t *testing.T) { require.NoError(t, err) defer stream.Close() - writer := asyncwriter.New(0, nil) - pc := &PeerConnection{} - err = FromStream(stream, writer, pc, nil) + err = FromStream(stream, nil, pc) require.NoError(t, err) require.Equal(t, ca.webrtcCaps, pc.OutgoingTracks[0].Caps) diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index ee45c059347..c152c4ec087 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -100,7 +100,7 @@ func jpegExtractSize(image []byte) (int, int, error) { } type formatFMP4 struct { - ai *agentInstance + ai *recorderInstance tracks []*formatFMP4Track hasVideo bool @@ -153,50 +153,54 @@ func (f *formatFMP4) initialize() { firstReceived := false - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.AV1) - if tunit.TU == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.AV1) + if tunit.TU == nil { + return nil + } - randomAccess := false + randomAccess := false - for _, obu := range tunit.TU { - var h av1.OBUHeader - err := h.Unmarshal(obu) - if err != nil { - return err - } + for _, obu := range tunit.TU { + var h av1.OBUHeader + err := h.Unmarshal(obu) + if err != nil { + return err + } - if h.Type == av1.OBUTypeSequenceHeader { - if !bytes.Equal(codec.SequenceHeader, obu) { - codec.SequenceHeader = obu - updateCodecs() + if h.Type == av1.OBUTypeSequenceHeader { + if !bytes.Equal(codec.SequenceHeader, obu) { + codec.SequenceHeader = obu + updateCodecs() + } + randomAccess = true } - randomAccess = true } - } - if !firstReceived { - if !randomAccess { - return nil + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true } - firstReceived = true - } - sampl, err := fmp4.NewPartSampleAV1( - randomAccess, - tunit.TU) - if err != nil { - return err - } + sampl, err := fmp4.NewPartSampleAV1( + randomAccess, + tunit.TU) + if err != nil { + return err + } - return track.write(&sample{ - PartSample: sampl, - dts: tunit.PTS, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: sampl, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.VP9: codec := &fmp4.CodecVP9{ @@ -211,65 +215,69 @@ func (f *formatFMP4) initialize() { firstReceived := false - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.VP9) - if tunit.Frame == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.VP9) + if tunit.Frame == nil { + return nil + } - var h vp9.Header - err := h.Unmarshal(tunit.Frame) - if err != nil { - return err - } + var h vp9.Header + err := h.Unmarshal(tunit.Frame) + if err != nil { + return err + } - randomAccess := false + randomAccess := false - if !h.NonKeyFrame { - randomAccess = true + if !h.NonKeyFrame { + randomAccess = true - if w := h.Width(); codec.Width != w { - codec.Width = w - updateCodecs() - } - if h := h.Width(); codec.Height != h { - codec.Height = h - updateCodecs() - } - if codec.Profile != h.Profile { - codec.Profile = h.Profile - updateCodecs() - } - if codec.BitDepth != h.ColorConfig.BitDepth { - codec.BitDepth = h.ColorConfig.BitDepth - updateCodecs() - } - if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c { - codec.ChromaSubsampling = c - updateCodecs() - } - if codec.ColorRange != h.ColorConfig.ColorRange { - codec.ColorRange = h.ColorConfig.ColorRange - updateCodecs() + if w := h.Width(); codec.Width != w { + codec.Width = w + updateCodecs() + } + if h := h.Width(); codec.Height != h { + codec.Height = h + updateCodecs() + } + if codec.Profile != h.Profile { + codec.Profile = h.Profile + updateCodecs() + } + if codec.BitDepth != h.ColorConfig.BitDepth { + codec.BitDepth = h.ColorConfig.BitDepth + updateCodecs() + } + if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c { + codec.ChromaSubsampling = c + updateCodecs() + } + if codec.ColorRange != h.ColorConfig.ColorRange { + codec.ColorRange = h.ColorConfig.ColorRange + updateCodecs() + } } - } - if !firstReceived { - if !randomAccess { - return nil + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true } - firstReceived = true - } - return track.write(&sample{ - PartSample: &fmp4.PartSample{ - IsNonSyncSample: !randomAccess, - Payload: tunit.Frame, - }, - dts: tunit.PTS, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: &fmp4.PartSample{ + IsNonSyncSample: !randomAccess, + Payload: tunit.Frame, + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.VP8: // TODO @@ -292,67 +300,71 @@ func (f *formatFMP4) initialize() { var dtsExtractor *h265.DTSExtractor - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } - randomAccess := false + randomAccess := false - for _, nalu := range tunit.AU { - typ := h265.NALUType((nalu[0] >> 1) & 0b111111) + for _, nalu := range tunit.AU { + typ := h265.NALUType((nalu[0] >> 1) & 0b111111) - switch typ { - case h265.NALUType_VPS_NUT: - if !bytes.Equal(codec.VPS, nalu) { - codec.VPS = nalu - updateCodecs() - } + switch typ { + case h265.NALUType_VPS_NUT: + if !bytes.Equal(codec.VPS, nalu) { + codec.VPS = nalu + updateCodecs() + } - case h265.NALUType_SPS_NUT: - if !bytes.Equal(codec.SPS, nalu) { - codec.SPS = nalu - updateCodecs() - } + case h265.NALUType_SPS_NUT: + if !bytes.Equal(codec.SPS, nalu) { + codec.SPS = nalu + updateCodecs() + } - case h265.NALUType_PPS_NUT: - if !bytes.Equal(codec.PPS, nalu) { - codec.PPS = nalu - updateCodecs() - } + case h265.NALUType_PPS_NUT: + if !bytes.Equal(codec.PPS, nalu) { + codec.PPS = nalu + updateCodecs() + } - case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT: - randomAccess = true + case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT: + randomAccess = true + } } - } - if dtsExtractor == nil { - if !randomAccess { - return nil + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() } - dtsExtractor = h265.NewDTSExtractor() - } - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } - sampl, err := fmp4.NewPartSampleH26x( - int32(durationGoToMp4(tunit.PTS-dts, 90000)), - randomAccess, - tunit.AU) - if err != nil { - return err - } + sampl, err := fmp4.NewPartSampleH26x( + int32(durationGoToMp4(tunit.PTS-dts, 90000)), + randomAccess, + tunit.AU) + if err != nil { + return err + } - return track.write(&sample{ - PartSample: sampl, - dts: dts, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: sampl, + dts: dts, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.H264: sps, pps := forma.SafeParams() @@ -370,60 +382,64 @@ func (f *formatFMP4) initialize() { var dtsExtractor *h264.DTSExtractor - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } - - randomAccess := false + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } - for _, nalu := range tunit.AU { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeSPS: - if !bytes.Equal(codec.SPS, nalu) { - codec.SPS = nalu - updateCodecs() + randomAccess := false + + for _, nalu := range tunit.AU { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + if !bytes.Equal(codec.SPS, nalu) { + codec.SPS = nalu + updateCodecs() + } + + case h264.NALUTypePPS: + if !bytes.Equal(codec.PPS, nalu) { + codec.PPS = nalu + updateCodecs() + } + + case h264.NALUTypeIDR: + randomAccess = true } + } - case h264.NALUTypePPS: - if !bytes.Equal(codec.PPS, nalu) { - codec.PPS = nalu - updateCodecs() + if dtsExtractor == nil { + if !randomAccess { + return nil } - - case h264.NALUTypeIDR: - randomAccess = true + dtsExtractor = h264.NewDTSExtractor() } - } - if dtsExtractor == nil { - if !randomAccess { - return nil + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err } - dtsExtractor = h264.NewDTSExtractor() - } - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - sampl, err := fmp4.NewPartSampleH26x( - int32(durationGoToMp4(tunit.PTS-dts, 90000)), - randomAccess, - tunit.AU) - if err != nil { - return err - } + sampl, err := fmp4.NewPartSampleH26x( + int32(durationGoToMp4(tunit.PTS-dts, 90000)), + randomAccess, + tunit.AU) + if err != nil { + return err + } - return track.write(&sample{ - PartSample: sampl, - dts: dts, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: sampl, + dts: dts, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.MPEG4Video: config := forma.SafeParams() @@ -440,45 +456,49 @@ func (f *formatFMP4) initialize() { firstReceived := false var lastPTS time.Duration - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Video) - if tunit.Frame == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } - randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { - end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - if end >= 0 { - config := tunit.Frame[:end+4] + if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) { + end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + if end >= 0 { + config := tunit.Frame[:end+4] - if !bytes.Equal(codec.Config, config) { - codec.Config = config - updateCodecs() + if !bytes.Equal(codec.Config, config) { + codec.Config = config + updateCodecs() + } } } - } - if !firstReceived { - if !randomAccess { - return nil + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") } - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - return track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Frame, - IsNonSyncSample: !randomAccess, - }, - dts: tunit.PTS, - ntp: tunit.NTP, + lastPTS = tunit.PTS + + return track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + IsNonSyncSample: !randomAccess, + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.MPEG1Video: codec := &fmp4.CodecMPEG1Video{ @@ -489,45 +509,49 @@ func (f *formatFMP4) initialize() { firstReceived := false var lastPTS time.Duration - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Video) - if tunit.Frame == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } - randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) - if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) { - end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8}) - if end >= 0 { - config := tunit.Frame[:end+4] + if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) { + end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8}) + if end >= 0 { + config := tunit.Frame[:end+4] - if !bytes.Equal(codec.Config, config) { - codec.Config = config - updateCodecs() + if !bytes.Equal(codec.Config, config) { + codec.Config = config + updateCodecs() + } } } - } - if !firstReceived { - if !randomAccess { - return nil + if !firstReceived { + if !randomAccess { + return nil + } + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") } - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - return track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Frame, - IsNonSyncSample: !randomAccess, - }, - dts: tunit.PTS, - ntp: tunit.NTP, + lastPTS = tunit.PTS + + return track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + IsNonSyncSample: !randomAccess, + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.MJPEG: codec := &fmp4.CodecMJPEG{ @@ -538,31 +562,35 @@ func (f *formatFMP4) initialize() { parsed := false - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MJPEG) - if tunit.Frame == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MJPEG) + if tunit.Frame == nil { + return nil + } - if !parsed { - parsed = true - width, height, err := jpegExtractSize(tunit.Frame) - if err != nil { - return err + if !parsed { + parsed = true + width, height, err := jpegExtractSize(tunit.Frame) + if err != nil { + return err + } + codec.Width = width + codec.Height = height + updateCodecs() } - codec.Width = width - codec.Height = height - updateCodecs() - } - return track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Frame, - }, - dts: tunit.PTS, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Frame, + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.Opus: codec := &fmp4.CodecOpus{ @@ -570,31 +598,35 @@ func (f *formatFMP4) initialize() { } track := addTrack(forma, codec) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.Opus) - if tunit.Packets == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } - var dt time.Duration + var dt time.Duration - for _, packet := range tunit.Packets { - err := track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: packet, - }, - dts: tunit.PTS + dt, - ntp: tunit.NTP.Add(dt), - }) - if err != nil { - return err - } + for _, packet := range tunit.Packets { + err := track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: packet, + }, + dts: tunit.PTS + dt, + ntp: tunit.NTP.Add(dt), + }) + if err != nil { + return err + } - dt += opus.PacketDuration(packet) - } + dt += opus.PacketDuration(packet) + } - return nil - }) + return nil + }) case *rtspformat.MPEG4Audio: co := forma.GetConfig() @@ -606,30 +638,34 @@ func (f *formatFMP4) initialize() { sampleRate := time.Duration(forma.ClockRate()) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { - return nil - } - - for i, au := range tunit.AUs { - dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit * - time.Second / sampleRate + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } - err := track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: au, - }, - dts: tunit.PTS + dt, - ntp: tunit.NTP.Add(dt), - }) - if err != nil { - return err + for i, au := range tunit.AUs { + dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit * + time.Second / sampleRate + + err := track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: au, + }, + dts: tunit.PTS + dt, + ntp: tunit.NTP.Add(dt), + }) + if err != nil { + return err + } } - } - return nil - }) + return nil + }) } case *rtspformat.MPEG1Audio: @@ -641,45 +677,49 @@ func (f *formatFMP4) initialize() { parsed := false - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Audio) - if tunit.Frames == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { + return nil + } - var dt time.Duration + var dt time.Duration - for _, frame := range tunit.Frames { - var h mpeg1audio.FrameHeader - err := h.Unmarshal(frame) - if err != nil { - return err - } + for _, frame := range tunit.Frames { + var h mpeg1audio.FrameHeader + err := h.Unmarshal(frame) + if err != nil { + return err + } - if !parsed { - parsed = true - codec.SampleRate = h.SampleRate - codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode) - updateCodecs() - } + if !parsed { + parsed = true + codec.SampleRate = h.SampleRate + codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode) + updateCodecs() + } - err = track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: frame, - }, - dts: tunit.PTS + tunit.PTS, - ntp: tunit.NTP, - }) - if err != nil { - return err - } + err = track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: frame, + }, + dts: tunit.PTS + tunit.PTS, + ntp: tunit.NTP, + }) + if err != nil { + return err + } - dt += time.Duration(h.SampleCount()) * - time.Second / time.Duration(h.SampleRate) - } + dt += time.Duration(h.SampleCount()) * + time.Second / time.Duration(h.SampleRate) + } - return nil - }) + return nil + }) case *rtspformat.AC3: codec := &fmp4.CodecAC3{ @@ -696,55 +736,59 @@ func (f *formatFMP4) initialize() { parsed := false - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.AC3) - if tunit.Frames == nil { - return nil - } - - for i, frame := range tunit.Frames { - var syncInfo ac3.SyncInfo - err := syncInfo.Unmarshal(frame) - if err != nil { - return fmt.Errorf("invalid AC-3 frame: %w", err) + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil } - var bsi ac3.BSI - err = bsi.Unmarshal(frame[5:]) - if err != nil { - return fmt.Errorf("invalid AC-3 frame: %w", err) - } + for i, frame := range tunit.Frames { + var syncInfo ac3.SyncInfo + err := syncInfo.Unmarshal(frame) + if err != nil { + return fmt.Errorf("invalid AC-3 frame: %w", err) + } - if !parsed { - parsed = true - codec.SampleRate = syncInfo.SampleRate() - codec.ChannelCount = bsi.ChannelCount() - codec.Fscod = syncInfo.Fscod - codec.Bsid = bsi.Bsid - codec.Bsmod = bsi.Bsmod - codec.Acmod = bsi.Acmod - codec.LfeOn = bsi.LfeOn - codec.BitRateCode = syncInfo.Frmsizecod >> 1 - updateCodecs() - } + var bsi ac3.BSI + err = bsi.Unmarshal(frame[5:]) + if err != nil { + return fmt.Errorf("invalid AC-3 frame: %w", err) + } - dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) * - time.Second / time.Duration(codec.SampleRate) + if !parsed { + parsed = true + codec.SampleRate = syncInfo.SampleRate() + codec.ChannelCount = bsi.ChannelCount() + codec.Fscod = syncInfo.Fscod + codec.Bsid = bsi.Bsid + codec.Bsmod = bsi.Bsmod + codec.Acmod = bsi.Acmod + codec.LfeOn = bsi.LfeOn + codec.BitRateCode = syncInfo.Frmsizecod >> 1 + updateCodecs() + } - err = track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: frame, - }, - dts: tunit.PTS + dt, - ntp: tunit.NTP.Add(dt), - }) - if err != nil { - return err + dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) * + time.Second / time.Duration(codec.SampleRate) + + err = track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: frame, + }, + dts: tunit.PTS + dt, + ntp: tunit.NTP.Add(dt), + }) + if err != nil { + return err + } } - } - return nil - }) + return nil + }) case *rtspformat.G722: // TODO @@ -758,27 +802,31 @@ func (f *formatFMP4) initialize() { } track := addTrack(forma, codec) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.G711) - if tunit.Samples == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.G711) + if tunit.Samples == nil { + return nil + } - var out []byte - if forma.MULaw { - out = g711.DecodeMulaw(tunit.Samples) - } else { - out = g711.DecodeAlaw(tunit.Samples) - } + var out []byte + if forma.MULaw { + out = g711.DecodeMulaw(tunit.Samples) + } else { + out = g711.DecodeAlaw(tunit.Samples) + } - return track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: out, - }, - dts: tunit.PTS, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: out, + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) case *rtspformat.LPCM: codec := &fmp4.CodecLPCM{ @@ -789,20 +837,24 @@ func (f *formatFMP4) initialize() { } track := addTrack(forma, codec) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.LPCM) - if tunit.Samples == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.LPCM) + if tunit.Samples == nil { + return nil + } - return track.write(&sample{ - PartSample: &fmp4.PartSample{ - Payload: tunit.Samples, - }, - dts: tunit.PTS, - ntp: tunit.NTP, + return track.write(&sample{ + PartSample: &fmp4.PartSample{ + Payload: tunit.Samples, + }, + dts: tunit.PTS, + ntp: tunit.NTP, + }) }) - }) } } } diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index f0c6d8bf69d..aa201c62589 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -40,7 +40,7 @@ func (d *dynamicWriter) setTarget(w io.Writer) { } type formatMPEGTS struct { - ai *agentInstance + ai *recorderInstance dw *dynamicWriter bw *bufio.Writer @@ -73,72 +73,80 @@ func (f *formatMPEGTS) initialize() { var dtsExtractor *h265.DTSExtractor - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.H265) - if tunit.AU == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.H265) + if tunit.AU == nil { + return nil + } - randomAccess := h265.IsRandomAccess(tunit.AU) + randomAccess := h265.IsRandomAccess(tunit.AU) - if dtsExtractor == nil { - if !randomAccess { - return nil + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h265.NewDTSExtractor() } - dtsExtractor = h265.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - return f.write( - dts, - tunit.NTP, - true, - randomAccess, - func() error { - return f.mw.WriteH265(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) - }, - ) - }) + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + return f.write( + dts, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteH265(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + }, + ) + }) case *rtspformat.H264: //nolint:dupl track := addTrack(forma, &mpegts.CodecH264{}) var dtsExtractor *h264.DTSExtractor - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.H264) - if tunit.AU == nil { - return nil - } + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.H264) + if tunit.AU == nil { + return nil + } - randomAccess := h264.IDRPresent(tunit.AU) + randomAccess := h264.IDRPresent(tunit.AU) - if dtsExtractor == nil { - if !randomAccess { - return nil + if dtsExtractor == nil { + if !randomAccess { + return nil + } + dtsExtractor = h264.NewDTSExtractor() } - dtsExtractor = h264.NewDTSExtractor() - } - - dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - - return f.write( - dts, - tunit.NTP, - true, - randomAccess, - func() error { - return f.mw.WriteH264(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) - }, - ) - }) + + dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err + } + + return f.write( + dts, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteH264(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + }, + ) + }) case *rtspformat.MPEG4Video: track := addTrack(forma, &mpegts.CodecMPEG4Video{}) @@ -146,31 +154,35 @@ func (f *formatMPEGTS) initialize() { firstReceived := false var lastPTS time.Duration - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Video) - if tunit.Frame == nil { - return nil - } - - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) - - return f.write( - tunit.PTS, - tunit.NTP, - true, - randomAccess, - func() error { - return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) - }, - ) - }) + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) + + return f.write( + tunit.PTS, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + }, + ) + }) case *rtspformat.MPEG1Video: track := addTrack(forma, &mpegts.CodecMPEG1Video{}) @@ -178,53 +190,61 @@ func (f *formatMPEGTS) initialize() { firstReceived := false var lastPTS time.Duration - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Video) - if tunit.Frame == nil { - return nil - } - - if !firstReceived { - firstReceived = true - } else if tunit.PTS < lastPTS { - return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") - } - lastPTS = tunit.PTS - - randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) - - return f.write( - tunit.PTS, - tunit.NTP, - true, - randomAccess, - func() error { - return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) - }, - ) - }) + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Video) + if tunit.Frame == nil { + return nil + } + + if !firstReceived { + firstReceived = true + } else if tunit.PTS < lastPTS { + return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)") + } + lastPTS = tunit.PTS + + randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) + + return f.write( + tunit.PTS, + tunit.NTP, + true, + randomAccess, + func() error { + return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + }, + ) + }) case *rtspformat.Opus: track := addTrack(forma, &mpegts.CodecOpus{ ChannelCount: forma.ChannelCount, }) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.Opus) - if tunit.Packets == nil { - return nil - } - - return f.write( - tunit.PTS, - tunit.NTP, - false, - true, - func() error { - return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) - }, - ) - }) + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.Opus) + if tunit.Packets == nil { + return nil + } + + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + }, + ) + }) case *rtspformat.MPEG4Audio: co := forma.GetConfig() @@ -235,9 +255,38 @@ func (f *formatMPEGTS) initialize() { Config: *co, }) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG4Audio) - if tunit.AUs == nil { + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG4Audio) + if tunit.AUs == nil { + return nil + } + + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + }, + ) + }) + } + + case *rtspformat.MPEG1Audio: + track := addTrack(forma, &mpegts.CodecMPEG1Audio{}) + + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.MPEG1Audio) + if tunit.Frames == nil { return nil } @@ -247,63 +296,46 @@ func (f *formatMPEGTS) initialize() { false, true, func() error { - return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) }, ) }) - } - - case *rtspformat.MPEG1Audio: - track := addTrack(forma, &mpegts.CodecMPEG1Audio{}) - - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.MPEG1Audio) - if tunit.Frames == nil { - return nil - } - - return f.write( - tunit.PTS, - tunit.NTP, - false, - true, - func() error { - return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) - }, - ) - }) case *rtspformat.AC3: track := addTrack(forma, &mpegts.CodecAC3{}) sampleRate := time.Duration(forma.SampleRate) - f.ai.agent.Stream.AddReader(f.ai.writer, media, forma, func(u unit.Unit) error { - tunit := u.(*unit.AC3) - if tunit.Frames == nil { - return nil - } - - return f.write( - tunit.PTS, - tunit.NTP, - false, - true, - func() error { - for i, frame := range tunit.Frames { - framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* - time.Second/sampleRate - - err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) - if err != nil { - return err + f.ai.agent.Stream.AddReader( + f.ai, + media, + forma, + func(u unit.Unit) error { + tunit := u.(*unit.AC3) + if tunit.Frames == nil { + return nil + } + + return f.write( + tunit.PTS, + tunit.NTP, + false, + true, + func() error { + for i, frame := range tunit.Frames { + framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* + time.Second/sampleRate + + err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + if err != nil { + return err + } } - } - return nil - }, - ) - }) + return nil + }, + ) + }) } } } diff --git a/internal/recorder/recoder_instance.go b/internal/recorder/recoder_instance.go index a24e5b96fb8..e0261e43172 100644 --- a/internal/recorder/recoder_instance.go +++ b/internal/recorder/recoder_instance.go @@ -6,7 +6,6 @@ import ( "github.com/bluenviron/mediacommon/pkg/formats/fmp4" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/recordstore" @@ -18,11 +17,10 @@ type sample struct { ntp time.Time } -type agentInstance struct { +type recorderInstance struct { agent *Recorder pathFormat string - writer *asyncwriter.Writer format format terminate chan struct{} @@ -30,11 +28,11 @@ type agentInstance struct { } // Log implements logger.Writer. -func (ai *agentInstance) Log(level logger.Level, format string, args ...interface{}) { +func (ai *recorderInstance) Log(level logger.Level, format string, args ...interface{}) { ai.agent.Log(level, format, args...) } -func (ai *agentInstance) initialize() { +func (ai *recorderInstance) initialize() { ai.pathFormat = ai.agent.PathFormat ai.pathFormat = recordstore.PathAddExtension( @@ -45,8 +43,6 @@ func (ai *agentInstance) initialize() { ai.terminate = make(chan struct{}) ai.done = make(chan struct{}) - ai.writer = asyncwriter.New(ai.agent.WriteQueueSize, ai.agent) - switch ai.agent.Format { case conf.RecordFormatMPEGTS: ai.format = &formatMPEGTS{ @@ -61,28 +57,27 @@ func (ai *agentInstance) initialize() { ai.format.initialize() } + ai.agent.Stream.StartReader(ai) + go ai.run() } -func (ai *agentInstance) close() { +func (ai *recorderInstance) close() { close(ai.terminate) <-ai.done } -func (ai *agentInstance) run() { +func (ai *recorderInstance) run() { defer close(ai.done) - ai.writer.Start() - select { - case err := <-ai.writer.Error(): + case err := <-ai.agent.Stream.ReaderError(ai): ai.Log(logger.Error, err.Error()) - ai.agent.Stream.RemoveReader(ai.writer) case <-ai.terminate: - ai.agent.Stream.RemoveReader(ai.writer) - ai.writer.Stop() } + ai.agent.Stream.RemoveReader(ai) + ai.format.close() } diff --git a/internal/recorder/recorder.go b/internal/recorder/recorder.go index 0700b2febe9..cc595e1bbd4 100644 --- a/internal/recorder/recorder.go +++ b/internal/recorder/recorder.go @@ -17,7 +17,6 @@ type OnSegmentCompleteFunc = func(path string, duration time.Duration) // Recorder writes recordings to disk. type Recorder struct { - WriteQueueSize int PathFormat string Format conf.RecordFormat PartDuration time.Duration @@ -30,7 +29,7 @@ type Recorder struct { restartPause time.Duration - currentInstance *agentInstance + currentInstance *recorderInstance terminate chan struct{} done chan struct{} @@ -53,7 +52,7 @@ func (w *Recorder) Initialize() { w.terminate = make(chan struct{}) w.done = make(chan struct{}) - w.currentInstance = &agentInstance{ + w.currentInstance = &recorderInstance{ agent: w, } w.currentInstance.initialize() @@ -91,7 +90,7 @@ func (w *Recorder) run() { return } - w.currentInstance = &agentInstance{ + w.currentInstance = &recorderInstance{ agent: w, } w.currentInstance.initialize() diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index 1581db3eb88..c3fb654b65d 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -125,6 +125,7 @@ func TestRecorder(t *testing.T) { for _, ca := range []string{"fmp4", "mpegts"} { t.Run(ca, func(t *testing.T) { stream, err := stream.New( + 512, 1460, desc, true, @@ -159,7 +160,6 @@ func TestRecorder(t *testing.T) { n := 0 w := &Recorder{ - WriteQueueSize: 1024, PathFormat: recordPath, Format: f, PartDuration: 100 * time.Millisecond, @@ -338,6 +338,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) { }} stream, err := stream.New( + 512, 1460, desc, true, @@ -353,7 +354,6 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) { recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f") w := &Recorder{ - WriteQueueSize: 1024, PathFormat: recordPath, Format: conf.RecordFormatFMP4, PartDuration: 100 * time.Millisecond, @@ -425,6 +425,7 @@ func TestRecorderSkipTracks(t *testing.T) { }} stream, err := stream.New( + 512, 1460, desc, true, @@ -457,7 +458,6 @@ func TestRecorderSkipTracks(t *testing.T) { } w := &Recorder{ - WriteQueueSize: 1024, PathFormat: recordPath, Format: fo, PartDuration: 100 * time.Millisecond, diff --git a/internal/servers/hls/muxer.go b/internal/servers/hls/muxer.go index 59f125ee7df..c445321b412 100644 --- a/internal/servers/hls/muxer.go +++ b/internal/servers/hls/muxer.go @@ -54,7 +54,6 @@ type muxer struct { partDuration conf.StringDuration segmentMaxSize conf.StringSize directory string - writeQueueSize int closeAfter conf.StringDuration wg *sync.WaitGroup pathName string @@ -147,7 +146,6 @@ func (m *muxer) runInner() error { partDuration: m.partDuration, segmentMaxSize: m.segmentMaxSize, directory: m.directory, - writeQueueSize: m.writeQueueSize, pathName: m.pathName, stream: stream, bytesSent: m.bytesSent, @@ -205,7 +203,6 @@ func (m *muxer) runInner() error { partDuration: m.partDuration, segmentMaxSize: m.segmentMaxSize, directory: m.directory, - writeQueueSize: m.writeQueueSize, pathName: m.pathName, stream: stream, bytesSent: m.bytesSent, diff --git a/internal/servers/hls/muxer_instance.go b/internal/servers/hls/muxer_instance.go index aeb9de0fc33..b37d207a313 100644 --- a/internal/servers/hls/muxer_instance.go +++ b/internal/servers/hls/muxer_instance.go @@ -6,7 +6,6 @@ import ( "time" "github.com/bluenviron/gohlslib/v2" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" @@ -22,19 +21,15 @@ type muxerInstance struct { partDuration conf.StringDuration segmentMaxSize conf.StringSize directory string - writeQueueSize int pathName string stream *stream.Stream bytesSent *uint64 parent logger.Writer - writer *asyncwriter.Writer hmuxer *gohlslib.Muxer } func (mi *muxerInstance) initialize() error { - mi.writer = asyncwriter.New(mi.writeQueueSize, mi) - var muxerDirectory string if mi.directory != "" { muxerDirectory = filepath.Join(mi.directory, mi.pathName) @@ -53,22 +48,21 @@ func (mi *muxerInstance) initialize() error { }, } - err := hls.FromStream(mi.stream, mi.writer, mi.hmuxer, mi) + err := hls.FromStream(mi.stream, mi, mi.hmuxer) if err != nil { - mi.stream.RemoveReader(mi.writer) return err } err = mi.hmuxer.Start() if err != nil { - mi.stream.RemoveReader(mi.writer) + mi.stream.RemoveReader(mi) return err } mi.Log(logger.Info, "is converting into HLS, %s", - defs.FormatsInfo(mi.stream.FormatsForReader(mi.writer))) + defs.FormatsInfo(mi.stream.ReaderFormats(mi))) - mi.writer.Start() + mi.stream.StartReader(mi) return nil } @@ -79,16 +73,15 @@ func (mi *muxerInstance) Log(level logger.Level, format string, args ...interfac } func (mi *muxerInstance) close() { - mi.writer.Stop() + mi.stream.RemoveReader(mi) mi.hmuxer.Close() - mi.stream.RemoveReader(mi.writer) if mi.hmuxer.Directory != "" { os.Remove(mi.hmuxer.Directory) } } func (mi *muxerInstance) errorChan() chan error { - return mi.writer.Error() + return mi.stream.ReaderError(mi) } func (mi *muxerInstance) handleRequest(ctx *gin.Context) { diff --git a/internal/servers/hls/server.go b/internal/servers/hls/server.go index fcaae395101..7b6e530085c 100644 --- a/internal/servers/hls/server.go +++ b/internal/servers/hls/server.go @@ -74,7 +74,6 @@ type Server struct { SegmentMaxSize conf.StringSize Directory string ReadTimeout conf.StringDuration - WriteQueueSize int MuxerCloseAfter conf.StringDuration PathManager serverPathManager Parent serverParent @@ -227,7 +226,6 @@ func (s *Server) createMuxer(pathName string, remoteAddr string, query string) * partDuration: s.PartDuration, segmentMaxSize: s.SegmentMaxSize, directory: s.Directory, - writeQueueSize: s.WriteQueueSize, wg: &s.wg, pathName: pathName, pathManager: s.PathManager, diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index fa9bfd97a25..ba8cca80a36 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -131,7 +131,6 @@ func TestServerNotFound(t *testing.T) { TrustedProxies: conf.IPNetworks{}, Directory: "", ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, PathManager: pm, Parent: test.NilLogger, } @@ -171,6 +170,7 @@ func TestServerRead(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} str, err := stream.New( + 512, 1460, desc, true, @@ -206,7 +206,6 @@ func TestServerRead(t *testing.T) { TrustedProxies: conf.IPNetworks{}, Directory: "", ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, PathManager: pm, Parent: test.NilLogger, } @@ -241,6 +240,7 @@ func TestServerRead(t *testing.T) { err = c.Start() require.NoError(t, err) + defer func() { <-c.Wait() }() defer c.Close() @@ -266,6 +266,7 @@ func TestServerRead(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} str, err := stream.New( + 512, 1460, desc, true, @@ -301,7 +302,6 @@ func TestServerRead(t *testing.T) { TrustedProxies: conf.IPNetworks{}, Directory: "", ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, PathManager: pm, Parent: test.NilLogger, } @@ -363,6 +363,7 @@ func TestServerReadAuthorizationHeader(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} str, err := stream.New( + 512, 1460, desc, true, @@ -395,7 +396,6 @@ func TestServerReadAuthorizationHeader(t *testing.T) { TrustedProxies: conf.IPNetworks{}, Directory: "", ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, PathManager: pm, Parent: test.NilLogger, } @@ -463,6 +463,7 @@ func TestDirectory(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} str, err := stream.New( + 512, 1460, desc, true, @@ -491,7 +492,6 @@ func TestDirectory(t *testing.T) { TrustedProxies: conf.IPNetworks{}, Directory: filepath.Join(dir, "mydir"), ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, PathManager: pm, Parent: test.NilLogger, } diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index 16c33c44bdb..b9c9f0d3fc2 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -13,7 +13,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/google/uuid" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -45,7 +44,6 @@ type conn struct { rtspAddress string readTimeout conf.StringDuration writeTimeout conf.StringDuration - writeQueueSize int runOnConnect string runOnConnectRestart bool runOnDisconnect string @@ -187,16 +185,13 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error { c.query = rawQuery c.mutex.Unlock() - writer := asyncwriter.New(c.writeQueueSize, c) - defer stream.RemoveReader(writer) - - err = rtmp.FromStream(stream, writer, conn, c.nconn, time.Duration(c.writeTimeout), c) + err = rtmp.FromStream(stream, c, conn, c.nconn, time.Duration(c.writeTimeout)) if err != nil { return err } c.Log(logger.Info, "is reading from path '%s', %s", - path.Name(), defs.FormatsInfo(stream.FormatsForReader(writer))) + path.Name(), defs.FormatsInfo(stream.ReaderFormats(c))) onUnreadHook := hooks.OnRead(hooks.OnReadParams{ Logger: c, @@ -211,14 +206,14 @@ func (c *conn) runRead(conn *rtmp.Conn, u *url.URL) error { // disable read deadline c.nconn.SetReadDeadline(time.Time{}) - writer.Start() - defer writer.Stop() + stream.StartReader(c) + defer stream.RemoveReader(c) select { case <-c.ctx.Done(): return fmt.Errorf("terminated") - case err := <-writer.Error(): + case err := <-stream.ReaderError(c): return err } } diff --git a/internal/servers/rtmp/server.go b/internal/servers/rtmp/server.go index 574f4ded61e..8d954bcbd10 100644 --- a/internal/servers/rtmp/server.go +++ b/internal/servers/rtmp/server.go @@ -66,7 +66,6 @@ type Server struct { Address string ReadTimeout conf.StringDuration WriteTimeout conf.StringDuration - WriteQueueSize int IsTLS bool ServerCert string ServerKey string @@ -178,7 +177,6 @@ outer: rtspAddress: s.RTSPAddress, readTimeout: s.ReadTimeout, writeTimeout: s.WriteTimeout, - writeQueueSize: s.WriteQueueSize, runOnConnect: s.RunOnConnect, runOnConnectRestart: s.RunOnConnectRestart, runOnDisconnect: s.RunOnDisconnect, diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go index 007691e19cc..78c0292e659 100644 --- a/internal/servers/rtmp/server_test.go +++ b/internal/servers/rtmp/server_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -41,6 +40,7 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { var err error p.stream, err = stream.New( + 512, 1460, req.Desc, true, @@ -110,7 +110,6 @@ func TestServerPublish(t *testing.T) { Address: "127.0.0.1:1935", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, IsTLS: encrypt == "tls", ServerCert: serverCertFpath, ServerKey: serverKeyFpath, @@ -146,11 +145,12 @@ func TestServerPublish(t *testing.T) { <-path.streamCreated - aw := asyncwriter.New(512, test.NilLogger) - recv := make(chan struct{}) - path.stream.AddReader(aw, + reader := test.NilLogger + + path.stream.AddReader( + reader, path.stream.Desc().Medias[0], path.stream.Desc().Medias[0].Formats[0], func(u unit.Unit) error { @@ -168,9 +168,10 @@ func TestServerPublish(t *testing.T) { }) require.NoError(t, err) - aw.Start() + path.stream.StartReader(reader) + defer path.stream.RemoveReader(reader) + <-recv - aw.Stop() }) } } @@ -197,6 +198,7 @@ func TestServerRead(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( + 512, 1460, desc, true, @@ -212,7 +214,6 @@ func TestServerRead(t *testing.T) { Address: "127.0.0.1:1935", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, IsTLS: encrypt == "tls", ServerCert: serverCertFpath, ServerKey: serverKeyFpath, diff --git a/internal/servers/rtsp/server_test.go b/internal/servers/rtsp/server_test.go index 78f65fa5a33..7f54a4bc44d 100644 --- a/internal/servers/rtsp/server_test.go +++ b/internal/servers/rtsp/server_test.go @@ -9,7 +9,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" @@ -40,6 +39,7 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { var err error p.stream, err = stream.New( + 512, 1460, req.Desc, true, @@ -132,11 +132,12 @@ func TestServerPublish(t *testing.T) { <-path.streamCreated - aw := asyncwriter.New(512, test.NilLogger) + reader := test.NilLogger recv := make(chan struct{}) - path.stream.AddReader(aw, + path.stream.AddReader( + reader, path.stream.Desc().Medias[0], path.stream.Desc().Medias[0].Formats[0], func(u unit.Unit) error { @@ -162,15 +163,17 @@ func TestServerPublish(t *testing.T) { }) require.NoError(t, err) - aw.Start() + path.stream.StartReader(reader) + defer path.stream.RemoveReader(reader) + <-recv - aw.Stop() } func TestServerRead(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( + 512, 1460, desc, true, diff --git a/internal/servers/srt/conn.go b/internal/servers/srt/conn.go index 1beb849c2a9..3c062975ee9 100644 --- a/internal/servers/srt/conn.go +++ b/internal/servers/srt/conn.go @@ -14,7 +14,6 @@ import ( srt "github.com/datarhei/gosrt" "github.com/google/uuid" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -54,7 +53,6 @@ type conn struct { rtspAddress string readTimeout conf.StringDuration writeTimeout conf.StringDuration - writeQueueSize int udpMaxPayloadSize int connReq srt.ConnRequest runOnConnect string @@ -284,18 +282,15 @@ func (c *conn) runRead(streamID *streamID) error { c.sconn = sconn c.mutex.Unlock() - writer := asyncwriter.New(c.writeQueueSize, c) - defer stream.RemoveReader(writer) - bw := bufio.NewWriterSize(sconn, srtMaxPayloadSize(c.udpMaxPayloadSize)) - err = mpegts.FromStream(stream, writer, bw, sconn, time.Duration(c.writeTimeout), c) + err = mpegts.FromStream(stream, c, bw, sconn, time.Duration(c.writeTimeout)) if err != nil { return err } c.Log(logger.Info, "is reading from path '%s', %s", - path.Name(), defs.FormatsInfo(stream.FormatsForReader(writer))) + path.Name(), defs.FormatsInfo(stream.ReaderFormats(c))) onUnreadHook := hooks.OnRead(hooks.OnReadParams{ Logger: c, @@ -310,14 +305,14 @@ func (c *conn) runRead(streamID *streamID) error { // disable read deadline sconn.SetReadDeadline(time.Time{}) - writer.Start() - defer writer.Stop() + stream.StartReader(c) + defer stream.RemoveReader(c) select { case <-c.ctx.Done(): return fmt.Errorf("terminated") - case err = <-writer.Error(): + case err = <-stream.ReaderError(c): return err } } diff --git a/internal/servers/srt/server.go b/internal/servers/srt/server.go index 33b26f295c5..e5a2e6b5cc3 100644 --- a/internal/servers/srt/server.go +++ b/internal/servers/srt/server.go @@ -69,7 +69,6 @@ type Server struct { RTSPAddress string ReadTimeout conf.StringDuration WriteTimeout conf.StringDuration - WriteQueueSize int UDPMaxPayloadSize int RunOnConnect string RunOnConnectRestart bool @@ -158,7 +157,6 @@ outer: rtspAddress: s.RTSPAddress, readTimeout: s.ReadTimeout, writeTimeout: s.WriteTimeout, - writeQueueSize: s.WriteQueueSize, udpMaxPayloadSize: s.UDPMaxPayloadSize, connReq: req, runOnConnect: s.RunOnConnect, diff --git a/internal/servers/srt/server_test.go b/internal/servers/srt/server_test.go index 8e0f957311b..21bfea8291c 100644 --- a/internal/servers/srt/server_test.go +++ b/internal/servers/srt/server_test.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/mediacommon/pkg/formats/mpegts" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" @@ -39,6 +38,7 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { var err error p.stream, err = stream.New( + 512, 1460, req.Desc, true, @@ -93,7 +93,6 @@ func TestServerPublish(t *testing.T) { RTSPAddress: "", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, UDPMaxPayloadSize: 1472, RunOnConnect: "", RunOnConnectRestart: false, @@ -139,11 +138,12 @@ func TestServerPublish(t *testing.T) { <-path.streamCreated - aw := asyncwriter.New(512, test.NilLogger) + reader := test.NilLogger recv := make(chan struct{}) - path.stream.AddReader(aw, + path.stream.AddReader( + reader, path.stream.Desc().Medias[0], path.stream.Desc().Medias[0].Formats[0], func(u unit.Unit) error { @@ -164,9 +164,10 @@ func TestServerPublish(t *testing.T) { err = bw.Flush() require.NoError(t, err) - aw.Start() + path.stream.StartReader(reader) + defer path.stream.RemoveReader(reader) + <-recv - aw.Stop() } func TestServerRead(t *testing.T) { @@ -176,6 +177,7 @@ func TestServerRead(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} stream, err := stream.New( + 512, 1460, desc, true, @@ -192,7 +194,6 @@ func TestServerRead(t *testing.T) { RTSPAddress: "", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, UDPMaxPayloadSize: 1472, RunOnConnect: "", RunOnConnectRestart: false, diff --git a/internal/servers/webrtc/server.go b/internal/servers/webrtc/server.go index 1634250904f..a04fb3fdd62 100644 --- a/internal/servers/webrtc/server.go +++ b/internal/servers/webrtc/server.go @@ -184,7 +184,6 @@ type Server struct { AllowOrigin string TrustedProxies conf.IPNetworks ReadTimeout conf.StringDuration - WriteQueueSize int LocalUDPAddress string LocalTCPAddress string IPsFromInterfaces bool @@ -312,7 +311,6 @@ outer: case req := <-s.chNewSession: sx := &session{ parentCtx: s.ctx, - writeQueueSize: s.WriteQueueSize, ipsFromInterfaces: s.IPsFromInterfaces, ipsFromInterfacesList: s.IPsFromInterfacesList, additionalHosts: s.AdditionalHosts, diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index 73bde323229..f421d0c947a 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -12,7 +12,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" @@ -55,6 +54,7 @@ func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { var err error p.stream, err = stream.New( + 512, 1460, req.Desc, true, @@ -111,7 +111,6 @@ func initializeTestServer(t *testing.T) *Server { AllowOrigin: "*", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, @@ -198,7 +197,6 @@ func TestServerOptionsICEServer(t *testing.T) { AllowOrigin: "", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, @@ -271,7 +269,6 @@ func TestServerPublish(t *testing.T) { AllowOrigin: "", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, @@ -328,11 +325,12 @@ func TestServerPublish(t *testing.T) { <-path.streamCreated - aw := asyncwriter.New(512, test.NilLogger) + reader := test.NilLogger recv := make(chan struct{}) - path.stream.AddReader(aw, + path.stream.AddReader( + reader, path.stream.Desc().Medias[0], path.stream.Desc().Medias[0].Formats[0], func(u unit.Unit) error { @@ -363,9 +361,10 @@ func TestServerPublish(t *testing.T) { }) require.NoError(t, err) - aw.Start() + path.stream.StartReader(reader) + defer path.stream.RemoveReader(reader) + <-recv - aw.Stop() } func TestServerRead(t *testing.T) { @@ -522,6 +521,7 @@ func TestServerRead(t *testing.T) { desc := &description.Session{Medias: ca.medias} str, err := stream.New( + 512, 1460, desc, reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}), @@ -554,7 +554,6 @@ func TestServerRead(t *testing.T) { AllowOrigin: "", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, @@ -637,6 +636,7 @@ func TestServerReadAuthorizationBearerJWT(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} str, err := stream.New( + 512, 1460, desc, true, @@ -665,7 +665,6 @@ func TestServerReadAuthorizationBearerJWT(t *testing.T) { AllowOrigin: "", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, @@ -714,6 +713,7 @@ func TestServerReadAuthorizationUserPass(t *testing.T) { desc := &description.Session{Medias: []*description.Media{test.MediaH264}} str, err := stream.New( + 512, 1460, desc, true, @@ -744,7 +744,6 @@ func TestServerReadAuthorizationUserPass(t *testing.T) { AllowOrigin: "", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, @@ -809,7 +808,6 @@ func TestServerReadNotFound(t *testing.T) { AllowOrigin: "", TrustedProxies: conf.IPNetworks{}, ReadTimeout: conf.StringDuration(10 * time.Second), - WriteQueueSize: 512, LocalUDPAddress: "127.0.0.1:8887", LocalTCPAddress: "127.0.0.1:8887", IPsFromInterfaces: true, diff --git a/internal/servers/webrtc/session.go b/internal/servers/webrtc/session.go index 1b99e0afb51..6bee1909407 100644 --- a/internal/servers/webrtc/session.go +++ b/internal/servers/webrtc/session.go @@ -16,7 +16,6 @@ import ( "github.com/pion/sdp/v3" pwebrtc "github.com/pion/webrtc/v3" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/auth" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/externalcmd" @@ -35,7 +34,6 @@ func whipOffer(body []byte) *pwebrtc.SessionDescription { type session struct { parentCtx context.Context - writeQueueSize int ipsFromInterfaces bool ipsFromInterfacesList []string additionalHosts []string @@ -73,6 +71,7 @@ func (s *session) initialize() { s.Log(logger.Info, "created by %s", s.req.remoteAddr) s.wg.Add(1) + go s.run() } @@ -283,9 +282,6 @@ func (s *session) runRead() (int, error) { return http.StatusInternalServerError, err } - writer := asyncwriter.New(s.writeQueueSize, s) - defer stream.RemoveReader(writer) - pc := &webrtc.PeerConnection{ ICEServers: iceServers, HandshakeTimeout: s.parent.HandshakeTimeout, @@ -299,13 +295,14 @@ func (s *session) runRead() (int, error) { Log: s, } - err = webrtc.FromStream(stream, writer, pc, s) + err = webrtc.FromStream(stream, s, pc) if err != nil { return http.StatusBadRequest, err } err = pc.Start() if err != nil { + stream.RemoveReader(s) return http.StatusBadRequest, err } defer pc.Close() @@ -314,6 +311,7 @@ func (s *session) runRead() (int, error) { answer, err := pc.CreateFullAnswer(s.ctx, offer) if err != nil { + stream.RemoveReader(s) return http.StatusBadRequest, err } @@ -323,6 +321,7 @@ func (s *session) runRead() (int, error) { err = pc.WaitUntilConnected(s.ctx) if err != nil { + stream.RemoveReader(s) return 0, err } @@ -331,7 +330,7 @@ func (s *session) runRead() (int, error) { s.mutex.Unlock() s.Log(logger.Info, "is reading from path '%s', %s", - path.Name(), defs.FormatsInfo(stream.FormatsForReader(writer))) + path.Name(), defs.FormatsInfo(stream.ReaderFormats(s))) onUnreadHook := hooks.OnRead(hooks.OnReadParams{ Logger: s, @@ -343,14 +342,14 @@ func (s *session) runRead() (int, error) { }) defer onUnreadHook() - writer.Start() - defer writer.Stop() + stream.StartReader(s) + defer stream.RemoveReader(s) select { case <-pc.Disconnected(): return 0, fmt.Errorf("peer connection closed") - case err := <-writer.Error(): + case err := <-stream.ReaderError(s): return 0, err case <-s.ctx.Done(): diff --git a/internal/stream/stream.go b/internal/stream/stream.go index e92bffeb4ae..9d654f57e22 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -11,45 +11,54 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" ) +// Reader is a stream reader. +type Reader interface { + logger.Writer +} + // ReadFunc is the callback passed to AddReader(). type ReadFunc func(unit.Unit) error // Stream is a media stream. // It stores tracks, readers and allows to write data to readers. type Stream struct { - desc *description.Session + writeQueueSize int + desc *description.Session bytesReceived *uint64 bytesSent *uint64 - smedias map[*description.Media]*streamMedia + streamMedias map[*description.Media]*streamMedia mutex sync.RWMutex rtspStream *gortsplib.ServerStream rtspsStream *gortsplib.ServerStream + streamReaders map[Reader]*streamReader } // New allocates a Stream. func New( + writeQueueSize int, udpMaxPayloadSize int, desc *description.Session, generateRTPPackets bool, decodeErrLogger logger.Writer, ) (*Stream, error) { s := &Stream{ - desc: desc, - bytesReceived: new(uint64), - bytesSent: new(uint64), + writeQueueSize: writeQueueSize, + desc: desc, + bytesReceived: new(uint64), + bytesSent: new(uint64), } - s.smedias = make(map[*description.Media]*streamMedia) + s.streamMedias = make(map[*description.Media]*streamMedia) + s.streamReaders = make(map[Reader]*streamReader) for _, media := range desc.Medias { var err error - s.smedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, decodeErrLogger) + s.streamMedias[media], err = newStreamMedia(udpMaxPayloadSize, media, generateRTPPackets, decodeErrLogger) if err != nil { return nil, err } @@ -116,37 +125,82 @@ func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream { } // AddReader adds a reader. -func (s *Stream) AddReader(r *asyncwriter.Writer, medi *description.Media, forma format.Format, cb ReadFunc) { +// Used by all protocols except RTSP. +func (s *Stream) AddReader(reader Reader, medi *description.Media, forma format.Format, cb ReadFunc) { s.mutex.Lock() defer s.mutex.Unlock() - sm := s.smedias[medi] + sr, ok := s.streamReaders[reader] + if !ok { + sr = &streamReader{ + queueSize: s.writeQueueSize, + parent: reader, + } + sr.initialize() + + s.streamReaders[reader] = sr + } + + sm := s.streamMedias[medi] sf := sm.formats[forma] - sf.addReader(r, cb) + sf.addReader(sr, cb) } // RemoveReader removes a reader. -func (s *Stream) RemoveReader(r *asyncwriter.Writer) { +// Used by all protocols except RTSP. +func (s *Stream) RemoveReader(reader Reader) { s.mutex.Lock() defer s.mutex.Unlock() - for _, sm := range s.smedias { + sr := s.streamReaders[reader] + + for _, sm := range s.streamMedias { for _, sf := range sm.formats { - sf.removeReader(r) + sf.removeReader(sr) } } + + delete(s.streamReaders, reader) + + sr.stop() } -// FormatsForReader returns all formats that a reader is reading. -func (s *Stream) FormatsForReader(r *asyncwriter.Writer) []format.Format { +// StartReader starts a reader. +// Used by all protocols except RTSP. +func (s *Stream) StartReader(reader Reader) { s.mutex.Lock() defer s.mutex.Unlock() + sr := s.streamReaders[reader] + + sr.start() + + for _, sm := range s.streamMedias { + for _, sf := range sm.formats { + sf.startReader(sr) + } + } +} + +// ReaderError returns whenever there's an error. +func (s *Stream) ReaderError(reader Reader) chan error { + sr := s.streamReaders[reader] + return sr.error() +} + +// ReaderFormats returns all formats that a reader is reading. +func (s *Stream) ReaderFormats(reader Reader) []format.Format { + s.mutex.RLock() + defer s.mutex.RUnlock() + + sr := s.streamReaders[reader] var formats []format.Format - for _, sm := range s.smedias { + for _, sm := range s.streamMedias { for forma, sf := range sm.formats { - if _, ok := sf.readers[r]; ok { + if _, ok := sf.pausedReaders[sr]; ok { + formats = append(formats, forma) + } else if _, ok := sf.runningReaders[sr]; ok { formats = append(formats, forma) } } @@ -157,7 +211,7 @@ func (s *Stream) FormatsForReader(r *asyncwriter.Writer) []format.Format { // WriteUnit writes a Unit. func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) { - sm := s.smedias[medi] + sm := s.streamMedias[medi] sf := sm.formats[forma] s.mutex.RLock() @@ -174,7 +228,7 @@ func (s *Stream) WriteRTPPacket( ntp time.Time, pts time.Duration, ) { - sm := s.smedias[medi] + sm := s.streamMedias[medi] sf := sm.formats[forma] s.mutex.RLock() diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 84c665a95d3..301308750e5 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -8,7 +8,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/pion/rtp" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/formatprocessor" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/unit" @@ -23,37 +22,43 @@ func unitSize(u unit.Unit) uint64 { } type streamFormat struct { - decodeErrLogger logger.Writer - proc formatprocessor.Processor - readers map[*asyncwriter.Writer]ReadFunc + udpMaxPayloadSize int + format format.Format + generateRTPPackets bool + decodeErrLogger logger.Writer + + proc formatprocessor.Processor + pausedReaders map[*streamReader]ReadFunc + runningReaders map[*streamReader]ReadFunc } -func newStreamFormat( - udpMaxPayloadSize int, - forma format.Format, - generateRTPPackets bool, - decodeErrLogger logger.Writer, -) (*streamFormat, error) { - proc, err := formatprocessor.New(udpMaxPayloadSize, forma, generateRTPPackets) +func (sf *streamFormat) initialize() error { + sf.pausedReaders = make(map[*streamReader]ReadFunc) + sf.runningReaders = make(map[*streamReader]ReadFunc) + + var err error + sf.proc, err = formatprocessor.New(sf.udpMaxPayloadSize, sf.format, sf.generateRTPPackets) if err != nil { - return nil, err + return err } - sf := &streamFormat{ - decodeErrLogger: decodeErrLogger, - proc: proc, - readers: make(map[*asyncwriter.Writer]ReadFunc), - } + return nil +} - return sf, nil +func (sf *streamFormat) addReader(sr *streamReader, cb ReadFunc) { + sf.pausedReaders[sr] = cb } -func (sf *streamFormat) addReader(r *asyncwriter.Writer, cb ReadFunc) { - sf.readers[r] = cb +func (sf *streamFormat) removeReader(sr *streamReader) { + delete(sf.pausedReaders, sr) + delete(sf.runningReaders, sr) } -func (sf *streamFormat) removeReader(r *asyncwriter.Writer) { - delete(sf.readers, r) +func (sf *streamFormat) startReader(sr *streamReader) { + if cb, ok := sf.pausedReaders[sr]; ok { + delete(sf.pausedReaders, sr) + sf.runningReaders[sr] = cb + } } func (sf *streamFormat) writeUnit(s *Stream, medi *description.Media, u unit.Unit) { @@ -73,7 +78,7 @@ func (sf *streamFormat) writeRTPPacket( ntp time.Time, pts time.Duration, ) { - hasNonRTSPReaders := len(sf.readers) > 0 + hasNonRTSPReaders := len(sf.pausedReaders) > 0 || len(sf.runningReaders) > 0 u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders) if err != nil { @@ -101,9 +106,9 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni } } - for writer, cb := range sf.readers { + for sr, cb := range sf.runningReaders { ccb := cb - writer.Push(func() error { + sr.push(func() error { atomic.AddUint64(s.bytesSent, size) return ccb(u) }) diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index bd366ce06db..f406968384e 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -21,11 +21,17 @@ func newStreamMedia(udpMaxPayloadSize int, } for _, forma := range medi.Formats { - var err error - sm.formats[forma], err = newStreamFormat(udpMaxPayloadSize, forma, generateRTPPackets, decodeErrLogger) + sf := &streamFormat{ + udpMaxPayloadSize: udpMaxPayloadSize, + format: forma, + generateRTPPackets: generateRTPPackets, + decodeErrLogger: decodeErrLogger, + } + err := sf.initialize() if err != nil { return nil, err } + sm.formats[forma] = sf } return sm, nil diff --git a/internal/stream/stream_reader.go b/internal/stream/stream_reader.go new file mode 100644 index 00000000000..a2e896f1339 --- /dev/null +++ b/internal/stream/stream_reader.go @@ -0,0 +1,69 @@ +package stream + +import ( + "fmt" + + "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" + "github.com/bluenviron/mediamtx/internal/logger" +) + +type streamReader struct { + queueSize int + parent logger.Writer + + writeErrLogger logger.Writer + buffer *ringbuffer.RingBuffer + started bool + + // out + err chan error +} + +func (w *streamReader) initialize() { + w.writeErrLogger = logger.NewLimitedLogger(w.parent) + buffer, _ := ringbuffer.New(uint64(w.queueSize)) + w.buffer = buffer + w.err = make(chan error) +} + +func (w *streamReader) start() { + w.started = true + go w.run() +} + +func (w *streamReader) stop() { + w.buffer.Close() + if w.started { + <-w.err + } +} + +func (w *streamReader) error() chan error { + return w.err +} + +func (w *streamReader) run() { + w.err <- w.runInner() + close(w.err) +} + +func (w *streamReader) runInner() error { + for { + cb, ok := w.buffer.Pull() + if !ok { + return fmt.Errorf("terminated") + } + + err := cb.(func() error)() + if err != nil { + return err + } + } +} + +func (w *streamReader) push(cb func() error) { + ok := w.buffer.Push(cb) + if !ok { + w.writeErrLogger.Log(logger.Warn, "write queue is full") + } +} diff --git a/internal/test/logger.go b/internal/test/logger.go index 39df66a8531..77c4003c728 100644 --- a/internal/test/logger.go +++ b/internal/test/logger.go @@ -10,10 +10,15 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { // NilLogger is a logger to /dev/null var NilLogger logger.Writer = &nilLogger{} -// Logger is a test logger. -type Logger func(logger.Level, string, ...interface{}) +type testLogger struct { + cb func(level logger.Level, format string, args ...interface{}) +} + +func (l *testLogger) Log(level logger.Level, format string, args ...interface{}) { + l.cb(level, format, args...) +} -// Log implements logger.Writer. -func (l Logger) Log(level logger.Level, format string, args ...interface{}) { - l(level, format, args...) +// Logger returns a test logger. +func Logger(cb func(logger.Level, string, ...interface{})) logger.Writer { + return &testLogger{cb: cb} } diff --git a/internal/test/source_tester.go b/internal/test/source_tester.go index 92d6e4d38de..03d59a4b424 100644 --- a/internal/test/source_tester.go +++ b/internal/test/source_tester.go @@ -4,7 +4,6 @@ package test import ( "context" - "github.com/bluenviron/mediamtx/internal/asyncwriter" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/defs" "github.com/bluenviron/mediamtx/internal/logger" @@ -17,7 +16,7 @@ type SourceTester struct { ctx context.Context ctxCancel func() stream *stream.Stream - writer *asyncwriter.Writer + reader stream.Reader Unit chan unit.Unit done chan struct{} @@ -55,8 +54,7 @@ func NewSourceTester( // Close closes the tester. func (t *SourceTester) Close() { t.ctxCancel() - t.writer.Stop() - t.stream.Close() + t.stream.RemoveReader(t.reader) <-t.done } @@ -67,20 +65,22 @@ func (t *SourceTester) Log(_ logger.Level, _ string, _ ...interface{}) { // SetReady implements StaticSourceParent. func (t *SourceTester) SetReady(req defs.PathSourceStaticSetReadyReq) defs.PathSourceStaticSetReadyRes { t.stream, _ = stream.New( + 512, 1460, req.Desc, req.GenerateRTPPackets, t, ) - t.writer = asyncwriter.New(2048, t) + t.reader = NilLogger - t.stream.AddReader(t.writer, req.Desc.Medias[0], req.Desc.Medias[0].Formats[0], func(u unit.Unit) error { + t.stream.AddReader(t.reader, req.Desc.Medias[0], req.Desc.Medias[0].Formats[0], func(u unit.Unit) error { t.Unit <- u close(t.Unit) return nil }) - t.writer.Start() + + t.stream.StartReader(t.reader) return defs.PathSourceStaticSetReadyRes{ Stream: t.stream,