Skip to content

Commit

Permalink
Fix high memory and cpu usage from analyze jobs (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Mar 21, 2024
1 parent fa207d3 commit c99f837
Showing 1 changed file with 2 additions and 62 deletions.
64 changes: 2 additions & 62 deletions music_assistant/server/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from music_assistant.server import MusicAssistant

LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.audio")
analyze_jobs: set[str] = set()
# pylint:disable=consider-using-f-string,too-many-locals,too-many-statements


Expand Down Expand Up @@ -180,60 +179,6 @@ async def strip_silence(
return stripped_data


async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) -> None:
"""Analyze track audio to calculate EBU R128 loudness."""
if streamdetails.uri in analyze_jobs:
return
if len(analyze_jobs) >= 5:
LOGGER.debug("Skip analyzing EBU R128 loudness: max number of jobs reached")
return
try:
analyze_jobs.add(streamdetails.uri)
item_name = f"{streamdetails.provider}/{streamdetails.item_id}"
LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name)
# calculate EBU R128 integrated loudness with ffmpeg
ffmpeg_args = get_ffmpeg_args(
input_format=streamdetails.audio_format,
output_format=streamdetails.audio_format,
filter_params=["loudnorm=print_format=json"],
extra_args=["-t", "600"], # limit to 10 minutes to prevent OOM
input_path=streamdetails.direct or "-",
output_path="NULL",
)
async with AsyncProcess(
ffmpeg_args,
enable_stdin=streamdetails.direct is None,
enable_stdout=False,
enable_stderr=True,
) as ffmpeg_proc:
if streamdetails.direct is None:
music_prov = mass.get_provider(streamdetails.provider)
chunk_count = 0
async for audio_chunk in music_prov.get_audio_stream(streamdetails):
chunk_count += 1
await ffmpeg_proc.write(audio_chunk)
if chunk_count == 600:
# safety guard: max (more or less) 10 minutes of audio may be analyzed!
break
await ffmpeg_proc.write_eof()

_, stderr = await ffmpeg_proc.communicate()
if loudness_details := _parse_loudnorm(stderr):
LOGGER.debug("Loudness measurement for %s: %s", item_name, loudness_details)
streamdetails.loudness = loudness_details
await mass.music.set_track_loudness(
streamdetails.item_id, streamdetails.provider, loudness_details
)
else:
LOGGER.warning(
"Could not determine EBU R128 loudness of %s - %s",
item_name,
stderr.decode() or "received empty value",
)
finally:
analyze_jobs.discard(streamdetails.uri)


async def get_stream_details(
mass: MusicAssistant,
queue_item: QueueItem,
Expand Down Expand Up @@ -531,8 +476,8 @@ async def writer() -> None:
finished = False
elif loudness_details := _parse_loudnorm(stderr):
logger.log(VERBOSE_LOG_LEVEL, stderr.decode())
required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60
if finished or seconds_streamed >= required_seconds:
required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
if finished or (seconds_streamed >= required_seconds):
LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details)
streamdetails.loudness = loudness_details
await mass.music.set_track_loudness(
Expand All @@ -551,11 +496,6 @@ async def writer() -> None:
if music_prov := mass.get_provider(streamdetails.provider):
mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed))

if not streamdetails.loudness:
# send loudness analyze job to background worker
# note that we only do this if a track was at least been partially played
mass.create_task(analyze_loudness(mass, streamdetails))


async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]:
"""
Expand Down

0 comments on commit c99f837

Please sign in to comment.