Skip to content

Commit

Permalink
Fix playback get stuck when there is a stream error on a single track (
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Sep 8, 2024
1 parent 656f23f commit fbd9909
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 177 deletions.
3 changes: 3 additions & 0 deletions music_assistant/common/models/streamdetails.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class StreamDetails(DataClassDictMixin):
seconds_streamed: float | None = None
target_loudness: float | None = None
bypass_loudness_normalization: bool = False
strip_silence_begin: bool = False
strip_silence_end: bool = False
stream_error: bool | None = None

def __str__(self) -> str:
"""Return pretty printable string of object."""
Expand Down
16 changes: 15 additions & 1 deletion music_assistant/server/controllers/player_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@
from music_assistant.common.models.player_queue import PlayerQueue
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_FLOW_MODE, FALLBACK_DURATION, MASS_LOGO_ONLINE
from music_assistant.constants import (
CONF_CROSSFADE,
CONF_FLOW_MODE,
FALLBACK_DURATION,
MASS_LOGO_ONLINE,
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.audio import get_stream_details
from music_assistant.server.models.core_controller import CoreController
Expand Down Expand Up @@ -767,6 +772,10 @@ async def play_index(
queue_item.streamdetails = await get_stream_details(
self.mass, queue_item, seek_position=seek_position, fade_in=fade_in
)
# allow stripping silence from the end of the track if crossfade is enabled
# this will allow for smoother crossfades
if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
queue_item.streamdetails.strip_silence_end = True
# send play_media request to player
# NOTE that we debounce this a bit to account for someone hitting the next button
# like a madman. This will prevent the player from being overloaded with requests.
Expand Down Expand Up @@ -1029,6 +1038,11 @@ async def preload_next_item(
# maximum quality of thumbs
if queue_item.media_item:
queue_item.media_item = await self.mass.music.get_item_by_uri(queue_item.uri)
# allow stripping silence from the begin/end of the track if crossfade is enabled
# this will allow for (much) smoother crossfades
if await self.mass.config.get_player_config_value(queue_id, CONF_CROSSFADE):
queue_item.streamdetails.strip_silence_end = True
queue_item.streamdetails.strip_silence_begin = True
# we're all set, this is our next item
next_item = queue_item
break
Expand Down
132 changes: 30 additions & 102 deletions music_assistant/server/controllers/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from __future__ import annotations

import asyncio
import os
import time
import urllib.parse
Expand Down Expand Up @@ -47,19 +46,17 @@
)
from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
FFMpeg,
check_audio_support,
crossfade_pcm_parts,
get_chunksize,
get_ffmpeg_stream,
get_hls_radio_stream,
get_hls_substream,
get_icy_radio_stream,
get_media_stream,
get_player_filter_params,
get_silence,
get_stream_details,
parse_loudnorm,
strip_silence,
)
from music_assistant.server.helpers.util import get_ips
from music_assistant.server.helpers.webserver import Webserver
Expand Down Expand Up @@ -336,7 +333,8 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.Response:

# all checks passed, start streaming!
self.logger.debug(
"Start serving audio stream for QueueItem %s to %s",
"Start serving audio stream for QueueItem %s (%s) to %s",
queue_item.name,
queue_item.uri,
queue.display_name,
)
Expand All @@ -347,6 +345,7 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.Response:
bit_depth=queue_item.streamdetails.audio_format.bit_depth,
channels=2,
)
chunk_num = 0
async for chunk in get_ffmpeg_stream(
audio_input=self.get_media_stream(
streamdetails=queue_item.streamdetails,
Expand All @@ -358,8 +357,16 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.Response:
):
try:
await resp.write(chunk)
chunk_num += 1
except (BrokenPipeError, ConnectionResetError, ConnectionError):
break
if queue_item.streamdetails.stream_error:
self.logger.error(
"Error streaming QueueItem %s (%s) to %s",
queue_item.name,
queue_item.uri,
queue.display_name,
)
if queue.stream_finished is not None:
queue.stream_finished = True
return resp
Expand Down Expand Up @@ -586,7 +593,6 @@ async def get_flow_stream(
use_crossfade,
)
total_bytes_sent = 0
started = time.time()

while True:
# get (next) queue item to stream
Expand Down Expand Up @@ -629,14 +635,7 @@ async def get_flow_stream(
pcm_format=pcm_format,
):
# buffer size needs to be big enough to include the crossfade part
# allow it to be a bit smaller when playback just starts
if not use_crossfade:
req_buffer_size = pcm_sample_size * 2
elif (time.time() - started) > 120:
# additional 5 seconds to strip silence from last part
req_buffer_size = crossfade_size + pcm_sample_size * 5
else:
req_buffer_size = crossfade_size
req_buffer_size = pcm_sample_size * 2 if not use_crossfade else crossfade_size

# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
Expand All @@ -647,14 +646,6 @@ async def get_flow_stream(

#### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
if last_fadeout_part:
# strip silence from last part
buffer = await strip_silence(
self.mass,
buffer,
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
reverse=False,
)
# perform crossfade
fadein_part = buffer[:crossfade_size]
remaining_bytes = buffer[crossfade_size:]
Expand Down Expand Up @@ -690,14 +681,6 @@ async def get_flow_stream(
bytes_written += len(last_fadeout_part)
last_fadeout_part = b""
if use_crossfade:
# strip silence from last part
buffer = await strip_silence(
self.mass,
buffer,
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
reverse=True,
)
# if crossfade is enabled, save fadeout part to pickup for next track
last_fadeout_part = buffer[-crossfade_size:]
remaining_bytes = buffer[:-crossfade_size]
Expand Down Expand Up @@ -772,7 +755,6 @@ async def get_media_stream(
pcm_format: AudioFormat,
) -> AsyncGenerator[tuple[bool, bytes], None]:
"""Get the audio stream for the given streamdetails as raw pcm chunks."""
logger = self.logger.getChild("media_stream")
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio:
streamdetails.seek_position = 0
Expand Down Expand Up @@ -810,8 +792,6 @@ async def get_media_stream(
)
filter_rule += ":print_format=json"
filter_params.append(filter_rule)
if streamdetails.fade_in:
filter_params.append("afade=type=in:start_time=0:duration=3")
if streamdetails.stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
streamdetails,
Expand All @@ -833,8 +813,14 @@ async def get_media_stream(
extra_input_args += ["-decryption_key", streamdetails.decryption_key]
else:
audio_source = streamdetails.path
if streamdetails.seek_position:
extra_input_args += ["-ss", str(int(streamdetails.seek_position))]

# handle seek support
if (
streamdetails.seek_position
and streamdetails.media_type != MediaType.RADIO
and streamdetails.stream_type != StreamType.CUSTOM
):
extra_input_args += ["-ss", str(int(streamdetails.seek_position))]

if streamdetails.media_type == MediaType.RADIO:
# pad some silence before the radio stream starts to create some headroom
Expand All @@ -843,73 +829,15 @@ async def get_media_stream(
async for chunk in get_silence(2, pcm_format):
yield chunk

logger.debug("start media stream for: %s", streamdetails.uri)
bytes_sent = 0
finished = False
try:
async with FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
output_format=pcm_format,
filter_params=filter_params,
extra_input_args=extra_input_args,
collect_log_history=True,
logger=logger,
) as ffmpeg_proc:
async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
bytes_sent += len(chunk)
yield chunk
# del chunk
finished = True
finally:
if "ffmpeg_proc" not in locals():
# edge case: ffmpeg process was not yet started
return # noqa: B012
if finished and not ffmpeg_proc.closed:
await asyncio.wait_for(ffmpeg_proc.wait(), 60)
elif not ffmpeg_proc.closed:
await ffmpeg_proc.close()

# try to determine how many seconds we've streamed
seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
logger.debug(
"stream %s (with code %s) for %s - seconds streamed: %s",
"finished" if finished else "aborted",
ffmpeg_proc.returncode,
streamdetails.uri,
seconds_streamed,
)
streamdetails.seconds_streamed = seconds_streamed
# store accurate duration
if finished and not streamdetails.seek_position and seconds_streamed:
streamdetails.duration = seconds_streamed

# parse loudnorm data if we have that collected
if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
if finished or (seconds_streamed >= required_seconds):
logger.debug(
"Loudness measurement for %s: %s",
streamdetails.uri,
loudness_details,
)
streamdetails.loudness = loudness_details
self.mass.create_task(
self.mass.music.set_track_loudness(
streamdetails.item_id, streamdetails.provider, loudness_details
)
)
# report playback
if finished or seconds_streamed > 30:
self.mass.create_task(
self.mass.music.mark_item_played(
streamdetails.media_type,
streamdetails.item_id,
streamdetails.provider,
)
)
if music_prov := self.mass.get_provider(streamdetails.provider):
self.mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))
async for chunk in get_media_stream(
self.mass,
streamdetails=streamdetails,
pcm_format=pcm_format,
audio_source=audio_source,
filter_params=filter_params,
extra_input_args=extra_input_args,
):
yield chunk

def _log_request(self, request: web.Request) -> None:
"""Log request."""
Expand Down
Loading

0 comments on commit fbd9909

Please sign in to comment.