diff --git a/kopf/_core/reactor/running.py b/kopf/_core/reactor/running.py
index 96186ef8..164ac2f0 100644
--- a/kopf/_core/reactor/running.py
+++ b/kopf/_core/reactor/running.py
@@ -114,7 +114,7 @@ async def operator(
     It is efficiently `spawn_tasks` + `run_tasks` with some safety.
     """
     existing_tasks = await aiotasks.all_tasks()
-    operator_tasks = await spawn_tasks(
+    system_tasks, root_tasks = await spawn_tasks(
         lifecycle=lifecycle,
         indexers=indexers,
         registry=registry,
@@ -135,7 +135,7 @@ async def operator(
         memo=memo,
         _command=_command,
     )
-    await run_tasks(operator_tasks, ignored=existing_tasks)
+    await run_tasks(system_tasks, root_tasks, ignored=existing_tasks)
 
 
 async def spawn_tasks(
@@ -196,7 +196,8 @@ async def spawn_tasks(
     signal_flag: aiotasks.Future = asyncio.Future()
     started_flag: asyncio.Event = asyncio.Event()
     operator_paused = aiotoggles.ToggleSet(any)
-    tasks: MutableSequence[aiotasks.Task] = []
+    root_tasks: MutableSequence[aiotasks.Task] = []
+    system_tasks: MutableSequence[aiotasks.Task] = []
 
     # Map kwargs into the settings object.
     settings.peering.clusterwide = clusterwide
@@ -219,20 +220,21 @@ async def spawn_tasks(
     posting.settings_var.set(settings)
 
     # A few common background forever-running infrastructural tasks (irregular root tasks).
-    tasks.append(asyncio.create_task(
+    system_tasks.append(aiotasks.create_task(
         name="stop-flag checker",
         coro=_stop_flag_checker(
             signal_flag=signal_flag,
             stop_flag=stop_flag)))
-    tasks.append(asyncio.create_task(
+    system_tasks.append(aiotasks.create_task(
         name="ultimate termination",
         coro=_ultimate_termination(
             settings=settings,
             stop_flag=stop_flag)))
-    tasks.append(asyncio.create_task(
+    system_tasks.append(aiotasks.create_task(
         name="startup/cleanup activities",
         coro=_startup_cleanup_activities(
-            root_tasks=tasks,  # used as a "live" view, populated later.
+            system_tasks=system_tasks,  # used as a "live" view, populated later.
+            root_tasks=root_tasks,  # used as a "live" view, populated later.
             ready_flag=ready_flag,
             started_flag=started_flag,
             registry=registry,
@@ -242,7 +244,7 @@ async def spawn_tasks(
             memo=memo)))  # to purge & finalize the caches in the end.
 
     # Kill all the daemons gracefully when the operator exits (so that they are not "hung").
-    tasks.append(aiotasks.create_guarded_task(
+    system_tasks.append(aiotasks.create_guarded_task(
         name="daemon killer", flag=started_flag, logger=logger,
         coro=daemons.daemon_killer(
             settings=settings,
@@ -250,7 +252,7 @@ async def spawn_tasks(
             operator_paused=operator_paused)))
 
     # Keeping the credentials fresh and valid via the authentication handlers on demand.
-    tasks.append(aiotasks.create_guarded_task(
+    system_tasks.append(aiotasks.create_guarded_task(
         name="credentials retriever", flag=started_flag, logger=logger,
         coro=activities.authenticator(
             registry=registry,
@@ -261,7 +263,7 @@ async def spawn_tasks(
 
     # K8s-event posting. Events are queued in-memory and posted in the background.
     # NB: currently, it is a global task, but can be made per-resource or per-object.
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="poster of events", flag=started_flag, logger=logger,
         coro=posting.poster(
             settings=settings,
@@ -270,7 +272,7 @@ async def spawn_tasks(
 
     # Liveness probing -- so that Kubernetes would know that the operator is alive.
     if liveness_endpoint:
-        tasks.append(aiotasks.create_guarded_task(
+        root_tasks.append(aiotasks.create_guarded_task(
             name="health reporter", flag=started_flag, logger=logger,
             coro=probing.health_reporter(
                 registry=registry,
@@ -282,19 +284,19 @@ async def spawn_tasks(
     # Admission webhooks run as either a server or a tunnel or a fixed config.
     # The webhook manager automatically adjusts the cluster configuration at runtime.
     container: aiovalues.Container[reviews.WebhookClientConfig] = aiovalues.Container()
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="admission insights chain", flag=started_flag, logger=logger,
         coro=aiobindings.condition_chain(
             source=insights.revised, target=container.changed)))
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="admission validating configuration manager", flag=started_flag, logger=logger,
         coro=admission.validating_configuration_manager(
             container=container, settings=settings, registry=registry, insights=insights)))
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="admission mutating configuration manager", flag=started_flag, logger=logger,
         coro=admission.mutating_configuration_manager(
             container=container, settings=settings, registry=registry, insights=insights)))
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="admission webhook server", flag=started_flag, logger=logger,
         coro=admission.admission_webhook_server(
             container=container, settings=settings, registry=registry, insights=insights,
@@ -305,13 +307,13 @@ async def spawn_tasks(
 
     # Permanent observation of what resource kinds and namespaces are available in the cluster.
     # Spawn and cancel dimensional tasks as they come and go; dimensions = resources x namespaces.
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="resource observer", flag=started_flag, logger=logger,
         coro=observation.resource_observer(
             insights=insights,
             registry=registry,
             settings=settings)))
-    tasks.append(aiotasks.create_guarded_task(
+    root_tasks.append(aiotasks.create_guarded_task(
         name="namespace observer", flag=started_flag, logger=logger,
         coro=observation.namespace_observer(
             clusterwide=clusterwide,
@@ -322,11 +324,11 @@ async def spawn_tasks(
     # Explicit command is a hack for the CLI to run coroutines in an operator-like environment.
     # If not specified, then use the normal resource processing. It is not exposed publicly (yet).
     if _command is not None:
-        tasks.append(aiotasks.create_guarded_task(
+        root_tasks.append(aiotasks.create_guarded_task(
             name="the command", flag=started_flag, logger=logger, finishable=True,
             coro=_command))
     else:
-        tasks.append(aiotasks.create_guarded_task(
+        root_tasks.append(aiotasks.create_guarded_task(
             name="multidimensional multitasker", flag=started_flag, logger=logger,
             coro=orchestration.ochestrator(
                 settings=settings,
@@ -357,10 +359,11 @@ async def spawn_tasks(
     else:
         logger.warning("OS signals are ignored: running not in the main thread.")
 
-    return tasks
+    return system_tasks, root_tasks
 
 
 async def run_tasks(
+        system_tasks: Collection[aiotasks.Task],
         root_tasks: Collection[aiotasks.Task],
         *,
         ignored: Collection[aiotasks.Task] = frozenset(),
@@ -391,16 +394,25 @@ async def run_tasks(
     # If the operator is cancelled, propagate the cancellation to all the sub-tasks.
     # There is no graceful period: cancel as soon as possible, but allow them to finish.
     try:
-        root_done, root_pending = await aiotasks.wait(root_tasks, return_when=asyncio.FIRST_COMPLETED)
+        # Await both the system and the root tasks together until the first one exits.
+        tasks = [*system_tasks, *root_tasks]
+        tasks_done, tasks_pending = await aiotasks.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
     except asyncio.CancelledError:
+        # First, stop the regular root tasks.
         await aiotasks.stop(root_tasks, title="Root", logger=logger, cancelled=True, interval=10)
+        # Then, stop the system tasks on which the root tasks depend.
+        await aiotasks.stop(system_tasks, title="System", logger=logger, cancelled=True, interval=10)
         hung_tasks = await aiotasks.all_tasks(ignored=ignored)
         await aiotasks.stop(hung_tasks, title="Hung", logger=logger, cancelled=True, interval=1)
         raise
 
     # If the operator is intact, but one of the root tasks has exited (successfully or not),
     # cancel all the remaining root tasks, and gracefully exit other spawned sub-tasks.
+    root_pending = [task for task in tasks_pending if task not in system_tasks]
     root_cancelled, _ = await aiotasks.stop(root_pending, title="Root", logger=logger)
+    system_pending = [task for task in tasks_pending if task not in root_tasks]
+    system_cancelled, _ = await aiotasks.stop(system_pending, title="System", logger=logger)
+    tasks_cancelled = root_cancelled | system_cancelled
 
     # After the root tasks are all gone, cancel any spawned sub-tasks (e.g. handlers).
     # If the operator is cancelled, propagate the cancellation to all the sub-tasks.
@@ -416,7 +428,7 @@ async def run_tasks(
     hung_cancelled, _ = await aiotasks.stop(hung_pending, title="Hung", logger=logger, interval=1)
 
     # If succeeded or if cancellation is silenced, re-raise from failed tasks (if any).
-    await aiotasks.reraise(root_done | root_cancelled | hung_done | hung_cancelled)
+    await aiotasks.reraise(tasks_done | tasks_cancelled | hung_done | hung_cancelled)
 
 
 async def _stop_flag_checker(
@@ -477,6 +489,7 @@ async def _ultimate_termination(
 
 
 async def _startup_cleanup_activities(
+        system_tasks: Sequence[aiotasks.Task],  # mutated externally!
         root_tasks: Sequence[aiotasks.Task],  # mutated externally!
         ready_flag: Optional[aioadapters.Flag],
         started_flag: asyncio.Event,
@@ -528,8 +541,10 @@ async def _startup_cleanup_activities(
     # Beware: on explicit operator cancellation, there is no graceful period at all.
     try:
         current_task = asyncio.current_task()
-        awaited_tasks = {task for task in root_tasks if task is not current_task}
-        await aiotasks.wait(awaited_tasks)
+        awaited_root_tasks = {task for task in root_tasks if task is not current_task}
+        await aiotasks.wait(awaited_root_tasks)
+        awaited_system_tasks = {task for task in system_tasks if task is not current_task}
+        await aiotasks.wait(awaited_system_tasks)
     except asyncio.CancelledError:
         logger.warning("Cleanup activity is not executed at all due to cancellation.")
         raise
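
With this change, spawn_tasks() returns two task collections instead of a single one, and run_tasks() accepts them as two positional arguments: the infrastructural "system" tasks and the regular "root" tasks. A minimal sketch of how an embedding caller would adapt, assuming these helpers remain re-exported at the package level as kopf.spawn_tasks() and kopf.run_tasks() and that all other keyword arguments keep their defaults:

    import asyncio
    import kopf

    async def main() -> None:
        # The split return value: infrastructural (system) tasks and regular root tasks.
        system_tasks, root_tasks = await kopf.spawn_tasks(clusterwide=True)
        # Both collections are awaited together; on exit, the root tasks are
        # stopped before the system tasks they depend on.
        await kopf.run_tasks(system_tasks, root_tasks)

    asyncio.run(main())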