Skip to content

Commit

Permalink
refactor: Improve goroutine synchronization and error handling in aud…
Browse files Browse the repository at this point in the history
…io capture

- Update WaitGroup management in audio capture and analysis routines
- Simplify goroutine startup and synchronization patterns
- Add more explicit logging for audio capture and FFmpeg process management
- Remove redundant WaitGroup increments in goroutine calls
  • Loading branch information
tphakala committed Feb 16, 2025
1 parent 2c8989e commit e945214
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 20 deletions.
13 changes: 8 additions & 5 deletions internal/analysis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ func startAudioCapture(wg *sync.WaitGroup, settings *conf.Settings, quitChan, re
// startClipCleanupMonitor initializes and starts the clip cleanup monitoring routine in a new goroutine.
func startClipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}, dataStore datastore.Interface) {
wg.Add(1)
go clipCleanupMonitor(wg, quitChan, dataStore)
go func() {
defer wg.Done()
clipCleanupMonitor(quitChan, dataStore)
}()
}

// startWeatherPolling initializes and starts the weather polling routine in a new goroutine.
Expand Down Expand Up @@ -260,9 +263,7 @@ func closeDataStore(store datastore.Interface) {
}

// ClipCleanupMonitor monitors the database and deletes clips that meet the retention policy.
func clipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}, dataStore datastore.Interface) {
defer wg.Done() // Ensure that the WaitGroup is marked as done after the function exits

func clipCleanupMonitor(quitChan chan struct{}, dataStore datastore.Interface) {
// Create a ticker that triggers every five minutes to perform cleanup
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop() // Ensure the ticker is stopped to prevent leaks
Expand Down Expand Up @@ -331,7 +332,9 @@ func initBirdImageCache(ds datastore.Interface, metrics *telemetry.Metrics) *ima
// Mark this species as being initialized
birdImageCache.Initializing.Store(species.ScientificName, struct{}{})
go func(name string) {
defer wg.Done()
defer func() {
wg.Done()
}()
defer birdImageCache.Initializing.Delete(name) // Remove initialization mark when done
sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore
Expand Down
5 changes: 4 additions & 1 deletion internal/myaudio/analysis_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ func AnalysisBufferMonitor(wg *sync.WaitGroup, bn *birdnet.BirdNET, quitChan cha
// preRecordingTime is the time to subtract from the current time to get the start time of the detection
const preRecordingTime = -5000 * time.Millisecond

defer wg.Done()
wg.Add(1)
defer func() {
wg.Done()
}()

// Creating a ticker that ticks every 100ms
ticker := time.NewTicker(pollInterval)
Expand Down
19 changes: 10 additions & 9 deletions internal/myaudio/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ func ReconfigureRTSPStreams(settings *conf.Settings, wg *sync.WaitGroup, quitCha
}

// New stream, start it
wg.Add(1)
activeStreams.Store(url, true)
go CaptureAudioRTSP(url, settings.Realtime.RTSP.Transport, wg, quitChan, restartChan, audioLevelChan)
}
Expand Down Expand Up @@ -316,7 +315,6 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart
continue
}

wg.Add(1)
activeStreams.Store(url, true)
go CaptureAudioRTSP(url, settings.Realtime.RTSP.Transport, wg, quitChan, restartChan, audioLevelChan)
}
Expand All @@ -343,7 +341,6 @@ func CaptureAudio(settings *conf.Settings, wg *sync.WaitGroup, quitChan, restart
}

// Device audio capture
wg.Add(1)
go captureAudioMalgo(settings, selectedSource, wg, quitChan, restartChan, audioLevelChan)
}
}
Expand Down Expand Up @@ -425,7 +422,7 @@ func ValidateAudioDevice(settings *conf.Settings) error {
settings.Realtime.Audio.Source = ""
return fmt.Errorf("failed to initialize audio context: %w", err)
}
defer malgoCtx.Uninit() //nolint:errcheck
defer malgoCtx.Uninit() //nolint:errcheck // We handle errors in the caller

// Get list of capture devices
infos, err := malgoCtx.Devices(malgo.Capture)
Expand Down Expand Up @@ -481,7 +478,7 @@ func selectCaptureSource(settings *conf.Settings) (captureSource, error) {
if err != nil {
return captureSource{}, fmt.Errorf("audio context initialization failed: %w", err)
}
defer malgoCtx.Uninit() //nolint:errcheck
defer malgoCtx.Uninit() //nolint:errcheck // We handle errors in the caller

// Get list of capture sources
infos, err := malgoCtx.Devices(malgo.Capture)
Expand Down Expand Up @@ -543,7 +540,10 @@ func hexToASCII(hexStr string) (string, error) {
}

func captureAudioMalgo(settings *conf.Settings, source captureSource, wg *sync.WaitGroup, quitChan, restartChan chan struct{}, audioLevelChan chan AudioLevelData) {
defer wg.Done() // Ensure this is called when the goroutine exits
wg.Add(1)
defer func() {
wg.Done()
}()

if settings.Debug {
fmt.Println("Initializing context")
Expand Down Expand Up @@ -625,6 +625,7 @@ func captureAudioMalgo(settings *conf.Settings, source captureSource, wg *sync.W
go func() {
select {
case <-quitChan:
log.Printf("🛑 DEBUG: Quit signal received, do not attempt to restart")
// Quit signal has been received, do not attempt to restart
return
case <-time.After(100 * time.Millisecond):
Expand Down Expand Up @@ -687,9 +688,9 @@ func captureAudioMalgo(settings *conf.Settings, source captureSource, wg *sync.W
select {
case <-quitChan:
// QuitChannel was closed, clean up and return.
if settings.Debug {
fmt.Println("🛑 Stopping audio capture due to quit signal.")
}
//if settings.Debug {
fmt.Println("🛑 Stopping audio capture due to quit signal.")
//}
time.Sleep(100 * time.Millisecond)
return
case <-restartChan:
Expand Down
7 changes: 2 additions & 5 deletions internal/myaudio/ffmpeg_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (p *FFmpegProcess) Cleanup(url string) {

select {
case <-done:
log.Printf("🛑 FFmpeg process for RTSP source %s stopped normally", url)
// Process finished normally
case <-time.After(10 * time.Second):
// Timeout occurred, forcefully kill the process
Expand Down Expand Up @@ -364,7 +365,7 @@ func startFFmpeg(ctx context.Context, config FFmpegConfig) (*FFmpegProcess, erro
}

// Log the FFmpeg command for debugging purposes
log.Println("⬆️ Starting FFmpeg with command:", cmd.String())
log.Printf("⬆️ Starting FFmpeg with command: %s", cmd.String())

// Start the FFmpeg process
if err := cmd.Start(); err != nil {
Expand Down Expand Up @@ -474,7 +475,6 @@ func manageFfmpegLifecycle(ctx context.Context, config FFmpegConfig, restartChan
select {
case <-ctx.Done():
// Context cancelled, stop the FFmpeg process
log.Printf("🔴 Context cancelled, stopping FFmpeg for RTSP source %s.", config.URL)
process.Cleanup(config.URL)
return ctx.Err()

Expand Down Expand Up @@ -555,9 +555,6 @@ func getExitCode(err error) int {

// CaptureAudioRTSP is the main function for capturing audio from an RTSP stream
func CaptureAudioRTSP(url, transport string, wg *sync.WaitGroup, quitChan <-chan struct{}, restartChan chan struct{}, audioLevelChan chan AudioLevelData) {
// Ensure the WaitGroup is decremented when the function exits
defer wg.Done()

// Return with error if FFmpeg path is not set
if conf.GetFfmpegBinaryName() == "" {
log.Printf("❌ FFmpeg is not available, cannot capture audio from RTSP source %s.", url)
Expand Down

0 comments on commit e945214

Please sign in to comment.