Skip to content

Commit

Permalink
Some small bugfixes and enhancements (#1117)
Browse files Browse the repository at this point in the history
* Prevent race condition on streamreader when stopping raop streamer

* Handle prevent playback for airplay players

* enforce auth for airport express

* ensure player is powered before considering queue active

* clear previous player queue state
  • Loading branch information
marcelveldt authored Feb 26, 2024
1 parent a9074f1 commit 3869d35
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
3 changes: 2 additions & 1 deletion music_assistant/server/controllers/player_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,10 @@ def on_player_update(
queue.available = player.available
queue.items = len(self._queue_items[queue_id])
# determine if this queue is currently active for this player
queue.active = player.active_source == queue.queue_id
queue.active = player.powered and player.active_source == queue.queue_id
if not queue.active:
queue.state = PlayerState.IDLE
self._prev_states.pop(queue_id, None)
return
# update current item from player report
if queue.flow_mode:
Expand Down
60 changes: 50 additions & 10 deletions music_assistant/server/providers/airplay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None
# with the named pipe used to send commands
self.active_remote_id: str = str(randint(1000, 8000))
self.start_ntp: int | None = None # use as checksum
self.prevent_playback: bool = False
self._audio_buffer = asyncio.Queue(2)
self._log_reader_task: asyncio.Task | None = None
self._audio_reader_task: asyncio.Task | None = None
Expand All @@ -202,6 +203,9 @@ async def init_cliraop(self, start_ntp: int) -> None:
extra_args += ["-e"]
if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
extra_args += ["-a"]
if "airport" in mass_player.device_info.model.lower():
# enforce auth on airport express
extra_args += ["-auth"]
sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
if device_password := self.mass.config.get_raw_player_config_value(
player_id, CONF_PASSWORD, None
Expand Down Expand Up @@ -247,17 +251,23 @@ async def init_cliraop(self, start_ntp: int) -> None:
self._log_reader_task = asyncio.create_task(self._log_watcher())
self._audio_reader_task = asyncio.create_task(self._audio_reader())

async def stop(self):
async def stop(self, force=False):
"""Stop playback and cleanup."""
if not self.running:
return
await self.send_cli_command("ACTION=STOP")
self._stop_requested = True
# stop background tasks
if self._log_reader_task and not self._log_reader_task.done():
self._log_reader_task.cancel()
if force:
self._log_reader_task.cancel()
with suppress(asyncio.CancelledError):
await self._log_reader_task
if self._audio_reader_task and not self._audio_reader_task.done():
self._audio_reader_task.cancel()
if force:
self._audio_reader_task.cancel()
with suppress(asyncio.CancelledError):
await self._audio_reader_task

empty_queue(self._audio_buffer)
await asyncio.wait_for(self._cliraop_proc.communicate(), 30)
Expand Down Expand Up @@ -491,7 +501,7 @@ async def cmd_stop(self, player_id: str) -> None:

async def stop_player(airplay_player: AirPlayPlayer) -> None:
if airplay_player.active_stream:
await airplay_player.active_stream.stop()
await airplay_player.active_stream.stop(force=False)
mass_player = self.mass.players.get(airplay_player.player_id)
mass_player.state = PlayerState.IDLE
self.mass.players.update(airplay_player.player_id)
Expand Down Expand Up @@ -551,7 +561,7 @@ async def play_media(
existing_stream.cancel()
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
self.mass.create_task(airplay_player.active_stream.stop())
self.mass.create_task(airplay_player.active_stream.stop(force=True))
# start streaming the queue (pcm) audio in a background task
queue = self.mass.player_queues.get_active_queue(player_id)
self._stream_tasks[player_id] = asyncio.create_task(
Expand Down Expand Up @@ -588,7 +598,7 @@ async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) ->
async with asyncio.TaskGroup() as tg:
for airplay_player in self._get_sync_clients(player_id):
if airplay_player.active_stream and airplay_player.active_stream.running:
tg.create_task(airplay_player.active_stream.stop())
tg.create_task(airplay_player.active_stream.stop(force=True))
if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
# TODO: resample on the fly here ?
raise RuntimeError("Unsupported PCM format")
Expand Down Expand Up @@ -629,9 +639,6 @@ async def _stream_audio(

# setup Raop process for player and its sync childs
for airplay_player in self._get_sync_clients(player_id):
# make sure that existing stream is stopped
if airplay_player.active_stream:
await airplay_player.active_stream.stop()
airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
await airplay_player.active_stream.init_cliraop(start_ntp)
prev_metadata_checksum: str = ""
Expand Down Expand Up @@ -941,8 +948,17 @@ async def _handle_dacp_request( # noqa: PLR0915
volume = int(path.split("dmcp.volume=", 1)[-1])
if abs(volume - mass_player.volume_level) > 2:
self.mass.create_task(self.cmd_volume_set(player_id, volume))
elif "device-prevent-playback=1" in path:
# device switched to another source (or is powered off)
if active_stream := airplay_player.active_stream:
active_stream.prevent_playback = True
self.mass.create_task(self.monitor_prevent_playback(player_id))
elif "device-prevent-playback=0" in path:
# device reports that its ready for playback again
if active_stream := airplay_player.active_stream:
active_stream.prevent_playback = False
else:
self.logger.debug(
self.logger.info(
"Unknown DACP request for %s: %s",
airplay_player.discovery_info.name,
path,
Expand Down Expand Up @@ -1014,3 +1030,27 @@ async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None:
if not airplay_player.active_stream or not airplay_player.active_stream.running:
continue
await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n")

async def monitor_prevent_playback(self, player_id: str):
"""Monitor the prevent playback state of an airplay player."""
count = 0
if not (airplay_player := self._players.get(player_id)):
return
prev_ntp = airplay_player.active_stream.start_ntp
while count < 40:
count += 1
if not (airplay_player := self._players.get(player_id)):
return
if not airplay_player.active_stream:
return
if airplay_player.active_stream.start_ntp != prev_ntp:
# checksum
return
if not airplay_player.active_stream.prevent_playback:
return
await asyncio.sleep(0.25)

airplay_player.logger.info(
"Player has been in prevent playback mode for too long, powering off.",
)
await self.mass.players.cmd_power(airplay_player.player_id, False)

0 comments on commit 3869d35

Please sign in to comment.