Skip to content

Commit

Permalink
refactor streaming a bit more
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Mar 20, 2024
1 parent 55563d1 commit 9a5339a
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 284 deletions.
292 changes: 121 additions & 171 deletions music_assistant/server/controllers/streams.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion music_assistant/server/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ def get_ffmpeg_args(
extra_args: list[str],
input_path: str = "-",
output_path: str = "-",
loglevel: str = "info",
) -> list[str]:
"""Collect all args to send to the ffmpeg process."""
ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
Expand All @@ -955,7 +956,7 @@ def get_ffmpeg_args(
"ffmpeg",
"-hide_banner",
"-loglevel",
"info",
loglevel,
"-ignore_unknown",
"-protocol_whitelist",
"file,http,https,tcp,tls,crypto,pipe,data,fd",
Expand Down
101 changes: 50 additions & 51 deletions music_assistant/server/providers/airplay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import platform
import socket
import time
from collections.abc import AsyncGenerator
from contextlib import suppress
from dataclasses import dataclass
from random import randint, randrange
Expand Down Expand Up @@ -52,6 +51,7 @@
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.controllers.streams import QueueStreamJob
from music_assistant.server.models import ProviderInstanceType

DOMAIN = "airplay"
Expand Down Expand Up @@ -105,6 +105,10 @@
CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
FALLBACK_VOLUME = 20

AIRPLAY_PCM_FORMAT = AudioFormat(
content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
)


async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
Expand Down Expand Up @@ -174,20 +178,20 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
return None


class AirplayStreamJob:
class AirplayStream:
"""Object that holds the details of a stream job."""

def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
"""Initialize AirplayStreamJob."""
"""Initialize AirplayStream."""
self.prov = prov
self.mass = prov.mass
self.airplay_player = airplay_player
# always generate a new active remote id to prevent race conditions
# with the named pipe used to send commands
# with the named pipe used to send audio
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
self._audio_iterator: AsyncGenerator[bytes, None] | None = None
self.stream_job: QueueStreamJob | None = None
self._log_reader_task: asyncio.Task | None = None
self._audio_reader_task: asyncio.Task | None = None
self._cliraop_proc: AsyncProcess | None = None
Expand All @@ -202,10 +206,10 @@ def running(self) -> bool:
and self._cliraop_proc.returncode is None
)

async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None:
async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None:
"""Initialize CLIRaop process for a player."""
self.start_ntp = start_ntp
self._audio_iterator = audio_iterator
self.stream_job = stream_job
extra_args = []
player_id = self.airplay_player.player_id
mass_player = self.mass.players.get(player_id)
Expand All @@ -216,7 +220,6 @@ async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None
for prop in ("et", "md", "am", "pk", "pw"):
if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
extra_args += [f"-{prop}", prop_value]

sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
if device_password := self.mass.config.get_raw_player_config_value(
player_id, CONF_PASSWORD, None
Expand All @@ -227,7 +230,7 @@ async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None
elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
extra_args += ["-debug", "10"]

args = [
cliraop_args = [
self.prov.cliraop_bin,
"-ntpstart",
str(start_ntp),
Expand All @@ -243,14 +246,18 @@ async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None
"-activeremote",
self.active_remote_id,
"-udn",
str(self.airplay_player.discovery_info.name),
self.airplay_player.discovery_info.name,
self.airplay_player.address,
"-",
]
if platform.system() == "Darwin":
os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"

self._cliraop_proc = AsyncProcess(
args, enable_stdin=True, enable_stdout=False, enable_stderr=True
cliraop_args,
enable_stdin=True,
enable_stdout=False,
enable_stderr=True,
)
await self._cliraop_proc.start()
self._log_reader_task = asyncio.create_task(self._log_watcher())
Expand Down Expand Up @@ -279,7 +286,7 @@ async def _stop() -> None:

async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
if not (self._cliraop_proc and self._cliraop_proc.returncode is None):
if not self._cliraop_proc or self._cliraop_proc.closed:
return

named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108
Expand Down Expand Up @@ -345,7 +352,7 @@ async def _log_watcher(self) -> None:
self.mass.players.update(airplay_player.player_id)

async def _audio_reader(self) -> None:
"""Read audio chunks and send them to the cliraop process."""
"""Send audio chunks to the cliraop process."""
logger = self.airplay_player.logger
mass_player = self.mass.players.get(self.airplay_player.player_id, True)
queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
Expand All @@ -356,7 +363,10 @@ async def _audio_reader(self) -> None:
)
prev_metadata_checksum: str = ""
prev_progress_report: float = 0
async for chunk in self._audio_iterator:

async for chunk in self.stream_job.iter_player_audio(
self.airplay_player.player_id, AIRPLAY_PCM_FORMAT
):
if self._stop_requested:
return
await self._cliraop_proc.write(chunk)
Expand Down Expand Up @@ -442,7 +452,7 @@ class AirPlayPlayer:
discovery_info: AsyncServiceInfo
address: str
logger: logging.Logger
active_stream: AirplayStreamJob | None = None
active_stream: AirplayStream | None = None


class AirplayProvider(PlayerProvider):
Expand Down Expand Up @@ -597,31 +607,30 @@ async def play_media(
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
await airplay_player.active_stream.stop(wait=False)
pcm_format = AudioFormat(
content_type=ContentType.PCM_S16LE,
sample_rate=44100,
bit_depth=16,
channels=2,
)
if queue_item.media_type == MediaType.ANNOUNCEMENT:
# stream announcement url directly
stream_job = None

if queue_item.queue_id.startswith(UGP_PREFIX):
# special case: we got forwarded a request from the UGP
# use the existing stream job that was already created by UGP
stream_job = self.mass.streams.stream_jobs[queue_item.queue_id]
else:
# create a new (multi client) flow stream
# note that in case of an existing streamjob created by the UGP, it will return
# the existing job here
stream_job = await self.mass.streams.create_stream_job(
queue_item.queue_id,
queue_item,
pcm_bit_depth=16,
pcm_sample_rate=44100,
if queue_item.media_type == MediaType.ANNOUNCEMENT:
# stream announcement url directly
audio_source = get_media_stream(
self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT
)
else:
queue = self.mass.player_queues.get(queue_item.queue_id)
audio_source = self.mass.streams.get_flow_stream(
queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT
)
stream_job = self.mass.streams.create_stream_job(
queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT
)

# Python is not suitable for realtime audio streaming.
# So, I've decided to go the fancy route here. I've created a small binary
# written in C based on libraop to do the actual timestamped playback.
# the raw pcm audio is fed to the stdin of this cliraop binary and we can
# send some commands over a named pipe.
# Python is not suitable for realtime audio streaming so we do the actual streaming
# of (RAOP) audio using a small executable written in C based on libraop to do the actual
# timestamped playback. The raw pcm audio is fed to a named pipe, read by the executable
# and we can send some ingteractie commands to the process stdin.

# get current ntp before we start
_, stdout = await check_output(f"{self.cliraop_bin} -ntp")
Expand All @@ -630,20 +639,10 @@ async def play_media(
# setup Raop process for player and its sync childs
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
if queue_item.media_type == MediaType.ANNOUNCEMENT:
# stream announcement url directly
audio_iterator = get_media_stream(
self.mass, queue_item.streamdetails, pcm_format=pcm_format
)
else:
stream_job.expected_players.add(airplay_player.player_id)
audio_iterator = stream_job.subscribe(
player_id=airplay_player.player_id,
output_format=pcm_format,
)
airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator))
if stream_job and not queue_item.queue_id.startswith(UGP_PREFIX):
stream_job.expected_players.add(airplay_player.player_id)
airplay_player.active_stream = AirplayStream(self, airplay_player)
tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job))
if not queue_item.queue_id.startswith(UGP_PREFIX):
stream_job.start()

async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
Expand Down
Binary file not shown.
Binary file not shown.
Binary file modified music_assistant/server/providers/airplay/bin/cliraop-macos-arm64
Binary file not shown.
4 changes: 2 additions & 2 deletions music_assistant/server/providers/chromecast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def play_media(
use_flow_mode = await self.mass.config.get_player_config_value(
player_id, CONF_FLOW_MODE
) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE)
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
Expand All @@ -268,7 +268,7 @@ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -
url = self.mass.streams.get_command_url(queue_item, "next")
queue_item = None
else:
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
Expand Down
4 changes: 2 additions & 2 deletions music_assistant/server/providers/dlna/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async def play_media(
"""Handle PLAY MEDIA on given player."""
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
Expand Down Expand Up @@ -383,7 +383,7 @@ async def play_media(
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
"""Handle enqueuing of the next queue item on the player."""
dlna_player = self.dlnaplayers[player_id]
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
Expand Down
2 changes: 1 addition & 1 deletion music_assistant/server/providers/fully_kiosk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async def play_media(
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
Expand Down
4 changes: 2 additions & 2 deletions music_assistant/server/providers/hass_players/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async def play_media(
"""Handle PLAY MEDIA on given player."""
use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
Expand Down Expand Up @@ -288,7 +288,7 @@ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -
This will NOT be called if the end of the queue is reached (and repeat disabled).
This will NOT be called if the player is using flow mode to playback the queue.
"""
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.FLAC,
Expand Down
36 changes: 26 additions & 10 deletions music_assistant/server/providers/slimproto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
RepeatMode,
)
from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.constants import (
CONF_CROSSFADE,
Expand All @@ -55,6 +56,7 @@
VERBOSE_LOG_LEVEL,
)
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.server.providers.ugp import UGP_PREFIX

if TYPE_CHECKING:
from aioslimproto.models import SlimEvent
Expand Down Expand Up @@ -337,17 +339,30 @@ async def play_media(
if player.synced_to:
msg = "A synced player cannot receive play commands directly"
raise RuntimeError(msg)
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)

if player.group_childs:
# player has sync members, we need to start a multi slimplayer stream job
stream_job = await self.mass.streams.create_stream_job(
# player has sync members, we need to start a (multi-player) stream job
# to make sure that all clients receive the exact same audio
pcm_format = AudioFormat(
content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
)
queue = self.mass.player_queues.get(queue_item.queue_id)
stream_job = self.mass.streams.create_stream_job(
queue_id=queue_item.queue_id,
start_queue_item=queue_item,
pcm_audio_source=self.mass.streams.get_flow_stream(
queue,
start_queue_item=queue_item,
pcm_format=pcm_format,
),
pcm_format=pcm_format,
)
# forward command to player and any connected sync members
sync_clients = self._get_sync_clients(player_id)
async with asyncio.TaskGroup() as tg:
for slimplayer in sync_clients:
enforce_mp3 = await self.mass.config.get_player_config_value(
slimplayer.player_id, CONF_ENFORCE_MP3
)
tg.create_task(
self._handle_play_url(
slimplayer,
Expand All @@ -360,18 +375,19 @@ async def play_media(
auto_play=False,
)
)
if queue_item.queue_item_id != "flow":
if not queue_item.queue_id.startswith(UGP_PREFIX):
stream_job.start()
else:
# regular, single player playback
slimplayer = self.slimproto.get_player(player_id)
if not slimplayer:
return
url = await self.mass.streams.resolve_stream_url(
enforce_mp3 = await self.mass.config.get_player_config_value(
player_id, CONF_ENFORCE_MP3
)
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
# for now just hardcode flac as we assume that every (modern)
# slimproto based player can handle that just fine
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
flow_mode=False,
)
Expand All @@ -388,7 +404,7 @@ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -
if not (slimplayer := self.slimproto.get_player(player_id)):
return
enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
url = await self.mass.streams.resolve_stream_url(
url = self.mass.streams.resolve_stream_url(
player_id,
queue_item=queue_item,
output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC,
Expand Down Expand Up @@ -717,7 +733,7 @@ def _handle_client_sync(self, slimplayer: SlimClient) -> None:
sync_playpoints = self._sync_playpoints[slimplayer.player_id]

active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id)
stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id)
stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id)
if not stream_job:
# should not happen, but just in case
return
Expand Down
Loading

0 comments on commit 9a5339a

Please sign in to comment.