From 1c5b982d7b30d682f1ad81ea6174056ee05f7a03 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 05:06:39 +0100 Subject: [PATCH 01/24] refactor delegation --- openhands/controller/agent_controller.py | 145 +++++++++++++---------- 1 file changed, 84 insertions(+), 61 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 7899c2dcfab0..3d27f4fccad8 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -113,11 +113,14 @@ def __init__( self.agent = agent self.headless_mode = headless_mode - # subscribe to the event stream + # the event stream must be set before maybe subscribing to it self.event_stream = event_stream - self.event_stream.subscribe( - EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id - ) + + # subscribe to the event stream if this is not a delegate + if not is_delegate: + self.event_stream.subscribe( + EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id + ) # state from the previous session, state from a parent agent, or a fresh state self.set_initial_state( @@ -229,6 +232,8 @@ def should_step(self, event: Event) -> bool: if isinstance(event, Action): if isinstance(event, MessageAction) and event.source == EventSource.USER: return True + if isinstance(event, AgentDelegateAction): + return True return False if isinstance(event, Observation): if isinstance(event, NullObservation) or isinstance( @@ -244,6 +249,25 @@ def on_event(self, event: Event) -> None: Args: event (Event): The incoming event to process. """ + + # If we have a delegate that is not finished or errored, forward events to it + if self.delegate is not None: + delegate_state = self.delegate.get_agent_state() + if delegate_state not in ( + AgentState.FINISHED, + AgentState.ERROR, + AgentState.REJECTED, + ): + # Forward the event to delegate and skip parent processing + asyncio.get_event_loop().run_until_complete( + self.delegate._on_event(event) + ) + return + else: + # Delegate is done - unset delegate so parent can resume normal handling + self.delegate = None + self.delegateAction = None + asyncio.get_event_loop().run_until_complete(self._on_event(event)) async def _on_event(self, event: Event) -> None: @@ -316,6 +340,60 @@ async def _handle_observation(self, observation: Observation) -> None: elif isinstance(observation, ErrorObservation): if self.state.agent_state == AgentState.ERROR: self.state.metrics.merge(self.state.local_metrics) + elif isinstance(observation, AgentStateChangedObservation): + # if this is a delegate, check for stop conditions + if self.is_delegate: + delegate_state = observation.agent_state + self.log('debug', f'Delegate state: {delegate_state}') + + if delegate_state == AgentState.ERROR: + # update iteration that shall be shared across agents + self.state.iteration = self.delegate.state.iteration + + # emit AgentDelegateObservation to mark delegate termination due to error + delegate_outputs = ( + self.delegate.state.outputs if self.delegate.state else {} + ) + content = f'{self.delegate.agent.name} encountered an error during execution.' + + # close the delegate upon error + await self.delegate.close() + + # emit the delegate result observation + obs = AgentDelegateObservation( + outputs=delegate_outputs, content=content + ) + self.event_stream.add_event(obs, EventSource.AGENT) + + elif delegate_state in (AgentState.FINISHED, AgentState.REJECTED): + # update iteration that shall be shared across agents + self.state.iteration = self.delegate.state.iteration + + # retrieve delegate result + delegate_outputs = ( + self.delegate.state.outputs if self.delegate.state else {} + ) + + # close delegate controller: we must close the delegate controller before adding new events + await self.delegate.close() + + # resubscribe parent when delegate is finished + self.event_stream.subscribe( + EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id + ) + + # prepare delegate result observation + # TODO: replace this with AI-generated summary (#2395) + formatted_output = ', '.join( + f'{key}: {value}' for key, value in delegate_outputs.items() + ) + content = f'{self.delegate.agent.name} finishes task with {formatted_output}' + + # emit the delegate result observation + obs = AgentDelegateObservation( + outputs=delegate_outputs, content=content + ) + self.event_stream.add_event(obs, EventSource.AGENT) async def _handle_message_action(self, action: MessageAction) -> None: """Handles message actions from the event stream. @@ -491,7 +569,7 @@ async def start_delegate(self, action: AgentDelegateAction) -> None: f'start delegate, creating agent {delegate_agent.name} using LLM {llm}', ) - self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER, self.id) + # Create the delegate with is_delegate=True so it does NOT subscribe directly self.delegate = AgentController( sid=self.id + '-delegate', agent=delegate_agent, @@ -504,6 +582,7 @@ async def start_delegate(self, action: AgentDelegateAction) -> None: is_delegate=True, headless_mode=self.headless_mode, ) + await self.delegate.set_agent_state_to(AgentState.RUNNING) async def _step(self) -> None: @@ -614,63 +693,7 @@ async def _step(self) -> None: async def _delegate_step(self) -> None: """Executes a single step of the delegate agent.""" await self.delegate._step() # type: ignore[union-attr] - assert self.delegate is not None - delegate_state = self.delegate.get_agent_state() - self.log('debug', f'Delegate state: {delegate_state}') - if delegate_state == AgentState.ERROR: - # update iteration that shall be shared across agents - self.state.iteration = self.delegate.state.iteration - - # emit AgentDelegateObservation to mark delegate termination due to error - delegate_outputs = ( - self.delegate.state.outputs if self.delegate.state else {} - ) - content = ( - f'{self.delegate.agent.name} encountered an error during execution.' - ) - obs = AgentDelegateObservation(outputs=delegate_outputs, content=content) - self.event_stream.add_event(obs, EventSource.AGENT) - - # close the delegate upon error - await self.delegate.close() - - # resubscribe parent when delegate is finished - self.event_stream.subscribe( - EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id - ) - self.delegate = None - self.delegateAction = None - - elif delegate_state in (AgentState.FINISHED, AgentState.REJECTED): - self.log('debug', 'Delegate agent has finished execution') - # retrieve delegate result - outputs = self.delegate.state.outputs if self.delegate.state else {} - - # update iteration that shall be shared across agents - self.state.iteration = self.delegate.state.iteration - - # close delegate controller: we must close the delegate controller before adding new events - await self.delegate.close() - - # resubscribe parent when delegate is finished - self.event_stream.subscribe( - EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id - ) - - # update delegate result observation - # TODO: replace this with AI-generated summary (#2395) - formatted_output = ', '.join( - f'{key}: {value}' for key, value in outputs.items() - ) - content = ( - f'{self.delegate.agent.name} finishes task with {formatted_output}' - ) - obs = AgentDelegateObservation(outputs=outputs, content=content) - # clean up delegate status - self.delegate = None - self.delegateAction = None - self.event_stream.add_event(obs, EventSource.AGENT) return async def _handle_traffic_control( From 588b25d6d818b9f4d74ab118cc64710209de67ec Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 05:23:03 +0100 Subject: [PATCH 02/24] handle terminal states --- openhands/controller/agent_controller.py | 95 ++++++++++++------------ 1 file changed, 46 insertions(+), 49 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 3d27f4fccad8..245b195f797b 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -112,12 +112,13 @@ def __init__( self.id = sid self.agent = agent self.headless_mode = headless_mode + self.is_delegate = is_delegate # the event stream must be set before maybe subscribing to it self.event_stream = event_stream # subscribe to the event stream if this is not a delegate - if not is_delegate: + if not self.is_delegate: self.event_stream.subscribe( EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id ) @@ -264,7 +265,43 @@ def on_event(self, event: Event) -> None: ) return else: - # Delegate is done - unset delegate so parent can resume normal handling + if delegate_state in (AgentState.FINISHED, AgentState.REJECTED): + # retrieve delegate result + delegate_outputs = ( + self.delegate.state.outputs if self.delegate.state else {} + ) + + # prepare delegate result observation + # TODO: replace this with AI-generated summary (#2395) + formatted_output = ', '.join( + f'{key}: {value}' for key, value in delegate_outputs.items() + ) + content = f'{self.delegate.agent.name} finishes task with {formatted_output}' + + # emit the delegate result observation + obs = AgentDelegateObservation( + outputs=delegate_outputs, content=content + ) + self.event_stream.add_event(obs, EventSource.AGENT) + else: + # delegate state is ERROR + # emit AgentDelegateObservation with error content + delegate_outputs = ( + self.delegate.state.outputs if self.delegate.state else {} + ) + content = f'{self.delegate.agent.name} encountered an error during execution.' + + # emit the delegate result observation + obs = AgentDelegateObservation( + outputs=delegate_outputs, content=content + ) + self.event_stream.add_event(obs, EventSource.AGENT) + + # delegate is done + # update iteration that shall be shared across agents + self.state.iteration = self.delegate.state.iteration + + # unset delegate so parent can resume normal handling self.delegate = None self.delegateAction = None @@ -346,54 +383,14 @@ async def _handle_observation(self, observation: Observation) -> None: delegate_state = observation.agent_state self.log('debug', f'Delegate state: {delegate_state}') - if delegate_state == AgentState.ERROR: - # update iteration that shall be shared across agents - self.state.iteration = self.delegate.state.iteration - - # emit AgentDelegateObservation to mark delegate termination due to error - delegate_outputs = ( - self.delegate.state.outputs if self.delegate.state else {} - ) - content = f'{self.delegate.agent.name} encountered an error during execution.' - - # close the delegate upon error - await self.delegate.close() - - # emit the delegate result observation - obs = AgentDelegateObservation( - outputs=delegate_outputs, content=content - ) - self.event_stream.add_event(obs, EventSource.AGENT) - - elif delegate_state in (AgentState.FINISHED, AgentState.REJECTED): - # update iteration that shall be shared across agents - self.state.iteration = self.delegate.state.iteration - - # retrieve delegate result - delegate_outputs = ( - self.delegate.state.outputs if self.delegate.state else {} - ) - + if delegate_state in ( + AgentState.FINISHED, + AgentState.REJECTED, + AgentState.ERROR, + ): # close delegate controller: we must close the delegate controller before adding new events - await self.delegate.close() - - # resubscribe parent when delegate is finished - self.event_stream.subscribe( - EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id - ) - - # prepare delegate result observation - # TODO: replace this with AI-generated summary (#2395) - formatted_output = ', '.join( - f'{key}: {value}' for key, value in delegate_outputs.items() - ) - content = f'{self.delegate.agent.name} finishes task with {formatted_output}' - - # emit the delegate result observation - obs = AgentDelegateObservation( - outputs=delegate_outputs, content=content - ) - self.event_stream.add_event(obs, EventSource.AGENT) + # including its delegate observation + await self.close() async def _handle_message_action(self, action: MessageAction) -> None: """Handles message actions from the event stream. From f1a4ac4e2be71af643a71339f549933873556235 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 05:32:44 +0100 Subject: [PATCH 03/24] fix close --- openhands/controller/agent_controller.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 245b195f797b..f6a11297a066 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -265,6 +265,14 @@ def on_event(self, event: Event) -> None: ) return else: + # delegate is done + # update iteration that shall be shared across agents + self.state.iteration = self.delegate.state.iteration + + # close the delegate controller before adding new events + # then add the delegate observation + asyncio.get_event_loop().run_until_complete(self.delegate.close()) + if delegate_state in (AgentState.FINISHED, AgentState.REJECTED): # retrieve delegate result delegate_outputs = ( @@ -297,10 +305,6 @@ def on_event(self, event: Event) -> None: ) self.event_stream.add_event(obs, EventSource.AGENT) - # delegate is done - # update iteration that shall be shared across agents - self.state.iteration = self.delegate.state.iteration - # unset delegate so parent can resume normal handling self.delegate = None self.delegateAction = None @@ -383,15 +387,6 @@ async def _handle_observation(self, observation: Observation) -> None: delegate_state = observation.agent_state self.log('debug', f'Delegate state: {delegate_state}') - if delegate_state in ( - AgentState.FINISHED, - AgentState.REJECTED, - AgentState.ERROR, - ): - # close delegate controller: we must close the delegate controller before adding new events - # including its delegate observation - await self.close() - async def _handle_message_action(self, action: MessageAction) -> None: """Handles message actions from the event stream. From 382f94cd0bb0e10b51103feeb44b3e5449a0dbd5 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 05:38:22 +0100 Subject: [PATCH 04/24] refactor delegate end --- openhands/controller/agent_controller.py | 92 +++++++++++++----------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index f6a11297a066..f51945c66d5b 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -266,48 +266,7 @@ def on_event(self, event: Event) -> None: return else: # delegate is done - # update iteration that shall be shared across agents - self.state.iteration = self.delegate.state.iteration - - # close the delegate controller before adding new events - # then add the delegate observation - asyncio.get_event_loop().run_until_complete(self.delegate.close()) - - if delegate_state in (AgentState.FINISHED, AgentState.REJECTED): - # retrieve delegate result - delegate_outputs = ( - self.delegate.state.outputs if self.delegate.state else {} - ) - - # prepare delegate result observation - # TODO: replace this with AI-generated summary (#2395) - formatted_output = ', '.join( - f'{key}: {value}' for key, value in delegate_outputs.items() - ) - content = f'{self.delegate.agent.name} finishes task with {formatted_output}' - - # emit the delegate result observation - obs = AgentDelegateObservation( - outputs=delegate_outputs, content=content - ) - self.event_stream.add_event(obs, EventSource.AGENT) - else: - # delegate state is ERROR - # emit AgentDelegateObservation with error content - delegate_outputs = ( - self.delegate.state.outputs if self.delegate.state else {} - ) - content = f'{self.delegate.agent.name} encountered an error during execution.' - - # emit the delegate result observation - obs = AgentDelegateObservation( - outputs=delegate_outputs, content=content - ) - self.event_stream.add_event(obs, EventSource.AGENT) - - # unset delegate so parent can resume normal handling - self.delegate = None - self.delegateAction = None + self.end_delegate() asyncio.get_event_loop().run_until_complete(self._on_event(event)) @@ -577,6 +536,55 @@ async def start_delegate(self, action: AgentDelegateAction) -> None: await self.delegate.set_agent_state_to(AgentState.RUNNING) + def end_delegate(self) -> None: + if self.delegate is None: + return + + delegate_state = self.delegate.get_agent_state() + + # update iteration that shall be shared across agents + self.state.iteration = self.delegate.state.iteration + + # close the delegate controller before adding new events + # then add the delegate observation + asyncio.get_event_loop().run_until_complete(self.delegate.close()) + + if delegate_state in (AgentState.FINISHED, AgentState.REJECTED): + # retrieve delegate result + delegate_outputs = ( + self.delegate.state.outputs if self.delegate.state else {} + ) + + # prepare delegate result observation + # TODO: replace this with AI-generated summary (#2395) + formatted_output = ', '.join( + f'{key}: {value}' for key, value in delegate_outputs.items() + ) + content = ( + f'{self.delegate.agent.name} finishes task with {formatted_output}' + ) + + # emit the delegate result observation + obs = AgentDelegateObservation(outputs=delegate_outputs, content=content) + self.event_stream.add_event(obs, EventSource.AGENT) + else: + # delegate state is ERROR + # emit AgentDelegateObservation with error content + delegate_outputs = ( + self.delegate.state.outputs if self.delegate.state else {} + ) + content = ( + f'{self.delegate.agent.name} encountered an error during execution.' + ) + + # emit the delegate result observation + obs = AgentDelegateObservation(outputs=delegate_outputs, content=content) + self.event_stream.add_event(obs, EventSource.AGENT) + + # unset delegate so parent can resume normal handling + self.delegate = None + self.delegateAction = None + async def _step(self) -> None: """Executes a single step of the parent or delegate agent. Detects stuck agents and limits on the number of iterations and the task budget.""" if self.get_agent_state() != AgentState.RUNNING: From a6501b70ee62e34536cf1ddcd15ded70c12e54a9 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 08:09:51 +0100 Subject: [PATCH 05/24] debug --- openhands/controller/agent_controller.py | 8 ++++++++ openhands/core/main.py | 1 + openhands/events/stream.py | 8 ++++++-- openhands/runtime/base.py | 2 ++ 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index f51945c66d5b..b9650cf93fa2 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -233,6 +233,12 @@ def should_step(self, event: Event) -> bool: if isinstance(event, Action): if isinstance(event, MessageAction) and event.source == EventSource.USER: return True + if ( + isinstance(event, MessageAction) + and self.get_agent_state() != AgentState.AWAITING_USER_INPUT + ): + # TODO: this is fragile, but how else to check if eligible? + return True if isinstance(event, AgentDelegateAction): return True return False @@ -251,6 +257,8 @@ def on_event(self, event: Event) -> None: event (Event): The incoming event to process. """ + print(f'CONTROLLER{self.id}:on_event: {event.__class__.__name__}') + # If we have a delegate that is not finished or errored, forward events to it if self.delegate is not None: delegate_state = self.delegate.get_agent_state() diff --git a/openhands/core/main.py b/openhands/core/main.py index 65e328648358..065d81056889 100644 --- a/openhands/core/main.py +++ b/openhands/core/main.py @@ -129,6 +129,7 @@ async def run_controller( event_stream.add_event(initial_user_action, EventSource.USER) def on_event(event: Event): + print(f'MAIN:on_event: {event.__class__.__name__}') if isinstance(event, AgentStateChangedObservation): if event.agent_state == AgentState.AWAITING_USER_INPUT: if exit_on_message: diff --git a/openhands/events/stream.py b/openhands/events/stream.py index e58f90e79d9c..4a0bbac055e3 100644 --- a/openhands/events/stream.py +++ b/openhands/events/stream.py @@ -65,6 +65,7 @@ class EventStream: _queue: queue.Queue[Event] _queue_thread: threading.Thread _queue_loop: asyncio.AbstractEventLoop | None + _thread_pools: dict[str, dict[str, ThreadPoolExecutor]] _thread_loops: dict[str, dict[str, asyncio.AbstractEventLoop]] def __init__(self, sid: str, file_store: FileStore): @@ -72,8 +73,8 @@ def __init__(self, sid: str, file_store: FileStore): self.file_store = file_store self._stop_flag = threading.Event() self._queue: queue.Queue[Event] = queue.Queue() - self._thread_pools: dict[str, dict[str, ThreadPoolExecutor]] = {} - self._thread_loops: dict[str, dict[str, asyncio.AbstractEventLoop]] = {} + self._thread_pools = {} + self._thread_loops = {} self._queue_loop = None self._queue_thread = threading.Thread(target=self._run_queue_loop) self._queue_thread.daemon = True @@ -268,6 +269,7 @@ def add_event(self, event: Event, source: EventSource): data = event_to_dict(event) if event.id is not None: self.file_store.write(self._get_filename_for_id(event.id), json.dumps(data)) + print(f'EVENTSTREAM:add_event: {event.__class__.__name__}') self._queue.put(event) def _run_queue_loop(self): @@ -285,6 +287,8 @@ async def _process_queue(self): event = self._queue.get(timeout=0.1) except queue.Empty: continue + + # pass each event to each callback in order for key in sorted(self._subscribers.keys()): callbacks = self._subscribers[key] for callback_id in callbacks: diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index c079aae92fd1..75ae7c4369ae 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -176,6 +176,7 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None: ) def on_event(self, event: Event) -> None: + print(f'RUNTIME:on_event: {event.__class__.__name__}') if isinstance(event, Action): asyncio.get_event_loop().run_until_complete(self._handle_action(event)) @@ -184,6 +185,7 @@ async def _handle_action(self, event: Action) -> None: event.timeout = self.config.sandbox.timeout assert event.timeout is not None try: + print(f'ASYNC RUNTIME:on_event: {event.__class__.__name__}') observation: Observation = await call_sync_from_async( self.run_action, event ) From a1331b1d71d3c9496b40a507b38f7b9558d24bac Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 08:14:14 +0100 Subject: [PATCH 06/24] workflows --- .github/workflows/integration-runner.yml | 56 ++++++++++++++++++++++- evaluation/integration_tests/run_infer.py | 7 ++- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index 120572aa0cdd..6ff4f174625f 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -109,11 +109,59 @@ jobs: echo >> $GITHUB_ENV echo "EOF" >> $GITHUB_ENV + # ------------------------------------------------------------- + # Run DelegatorAgent tests for Haiku, limited to t01 and t02 + - name: Wait a little bit (again) + run: sleep 5 + + - name: Configure config.toml for testing DelegatorAgent (Haiku) + env: + LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + run: | + echo "[llm.eval]" > config.toml + echo "model = \"$LLM_MODEL\"" >> config.toml + echo "api_key = \"$LLM_API_KEY\"" >> config.toml + echo "base_url = \"$LLM_BASE_URL\"" >> config.toml + echo "temperature = 0.0" >> config.toml + + # ------------------------------------------------------------- + # Run DelegatorAgent tests for DeepSeek, limited to t01 and t02 + - name: Wait a little bit (again) + run: sleep 5 + + - name: Configure config.toml for testing DelegatorAgent (DeepSeek) + env: + LLM_MODEL: "litellm_proxy/deepseek-chat" + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + run: | + echo "[llm.eval]" > config.toml + echo "model = \"$LLM_MODEL\"" >> config.toml + echo "api_key = \"$LLM_API_KEY\"" >> config.toml + echo "base_url = \"$LLM_BASE_URL\"" >> config.toml + echo "temperature = 0.0" >> config.toml + + - name: Run integration test evaluation for DelegatorAgent (DeepSeek) + env: + SANDBOX_FORCE_REBUILD_RUNTIME: True + run: | + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_deepseek_run' + + # Find and export the delegator test results + REPORT_FILE_DELEGATOR_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) + echo "REPORT_FILE_DELEGATOR_DEEPSEEK: $REPORT_FILE_DELEGATOR_DEEPSEEK" + echo "INTEGRATION_TEST_REPORT_DELEGATOR_DEEPSEEK<> $GITHUB_ENV + cat $REPORT_FILE_DELEGATOR_DEEPSEEK >> $GITHUB_ENV + echo >> $GITHUB_ENV + echo "EOF" >> $GITHUB_ENV + - name: Create archive of evaluation outputs run: | TIMESTAMP=$(date +'%y-%m-%d-%H-%M') cd evaluation/evaluation_outputs/outputs # Change to the outputs directory - tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* # Only include the actual result directories + tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* integration_tests/DelegatorAgent/* # Only include the actual result directories - name: Upload evaluation results as artifact uses: actions/upload-artifact@v4 @@ -154,5 +202,11 @@ jobs: **Integration Tests Report (DeepSeek)** DeepSeek LLM Test Results: ${{ env.INTEGRATION_TEST_REPORT_DEEPSEEK }} + --- + **Integration Tests Report Delegator (Haiku)** + ${{ env.INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU }} + --- + **Integration Tests Report Delegator (DeepSeek)** + ${{ env.INTEGRATION_TEST_REPORT_DELEGATOR_DEEPSEEK }} --- Download testing outputs (includes both Haiku and DeepSeek results): [Download](${{ steps.upload_results_artifact.outputs.artifact-url }}) diff --git a/evaluation/integration_tests/run_infer.py b/evaluation/integration_tests/run_infer.py index fe85d23bf585..fe411c33becf 100644 --- a/evaluation/integration_tests/run_infer.py +++ b/evaluation/integration_tests/run_infer.py @@ -8,13 +8,15 @@ from evaluation.utils.shared import ( EvalMetadata, EvalOutput, - codeact_user_response, make_metadata, prepare_dataset, reset_logger_for_multiprocessing, run_evaluation, update_llm_config_for_completions_logging, ) +from evaluation.utils.shared import ( + codeact_user_response as fake_user_response, +) from openhands.controller.state.state import State from openhands.core.config import ( AgentConfig, @@ -31,7 +33,8 @@ from openhands.utils.async_utils import call_async_from_sync FAKE_RESPONSES = { - 'CodeActAgent': codeact_user_response, + 'CodeActAgent': fake_user_response, + 'DelegatorAgent': fake_user_response, } From ea3ab1d53cd1f392c2643a04bc01e74e92278727 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 16:42:07 +0100 Subject: [PATCH 07/24] add missing haiku section --- .github/workflows/integration-runner.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index 6ff4f174625f..f81e37471a43 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -126,6 +126,20 @@ jobs: echo "base_url = \"$LLM_BASE_URL\"" >> config.toml echo "temperature = 0.0" >> config.toml + - name: Run integration test evaluation for DelegatorAgent (Haiku) + env: + SANDBOX_FORCE_REBUILD_RUNTIME: True + run: | + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_haiku_run' + + # Find and export the delegator test results + REPORT_FILE_DELEGATOR_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) + echo "REPORT_FILE_DELEGATOR_HAIKU: $REPORT_FILE_DELEGATOR_HAIKU" + echo "INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU<> $GITHUB_ENV + cat $REPORT_FILE_DELEGATOR_HAIKU >> $GITHUB_ENV + echo >> $GITHUB_ENV + echo "EOF" >> $GITHUB_ENV + # ------------------------------------------------------------- # Run DelegatorAgent tests for DeepSeek, limited to t01 and t02 - name: Wait a little bit (again) From fca199aa6dd8aec10b82dfcd35cf1fcff39025fb Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 16:49:34 +0100 Subject: [PATCH 08/24] more clear trace --- openhands/controller/agent_controller.py | 5 ++++- openhands/core/main.py | 2 +- openhands/events/stream.py | 2 +- openhands/runtime/base.py | 10 ++++++---- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index b9650cf93fa2..16f532906141 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -257,7 +257,7 @@ def on_event(self, event: Event) -> None: event (Event): The incoming event to process. """ - print(f'CONTROLLER{self.id}:on_event: {event.__class__.__name__}') + print(f'CONTROLLER {self.id}:on_event: {event.__class__.__name__}({event.id})') # If we have a delegate that is not finished or errored, forward events to it if self.delegate is not None: @@ -279,6 +279,7 @@ def on_event(self, event: Event) -> None: asyncio.get_event_loop().run_until_complete(self._on_event(event)) async def _on_event(self, event: Event) -> None: + print(f'CONTROLLER {self.id}:_on_event: {event.__class__.__name__}({event.id})') if hasattr(event, 'hidden') and event.hidden: return @@ -545,6 +546,7 @@ async def start_delegate(self, action: AgentDelegateAction) -> None: await self.delegate.set_agent_state_to(AgentState.RUNNING) def end_delegate(self) -> None: + print(f'CONTROLLER {self.id}:end_delegate') if self.delegate is None: return @@ -595,6 +597,7 @@ def end_delegate(self) -> None: async def _step(self) -> None: """Executes a single step of the parent or delegate agent. Detects stuck agents and limits on the number of iterations and the task budget.""" + print(f'CONTROLLER {self.id}:_step') if self.get_agent_state() != AgentState.RUNNING: return diff --git a/openhands/core/main.py b/openhands/core/main.py index 065d81056889..5ff81ec20285 100644 --- a/openhands/core/main.py +++ b/openhands/core/main.py @@ -129,7 +129,7 @@ async def run_controller( event_stream.add_event(initial_user_action, EventSource.USER) def on_event(event: Event): - print(f'MAIN:on_event: {event.__class__.__name__}') + print(f'MAIN:on_event: {event.__class__.__name__}({event.id})') if isinstance(event, AgentStateChangedObservation): if event.agent_state == AgentState.AWAITING_USER_INPUT: if exit_on_message: diff --git a/openhands/events/stream.py b/openhands/events/stream.py index 4a0bbac055e3..7dff207cf9d3 100644 --- a/openhands/events/stream.py +++ b/openhands/events/stream.py @@ -269,7 +269,7 @@ def add_event(self, event: Event, source: EventSource): data = event_to_dict(event) if event.id is not None: self.file_store.write(self._get_filename_for_id(event.id), json.dumps(data)) - print(f'EVENTSTREAM:add_event: {event.__class__.__name__}') + print(f'EVENTSTREAM:add_event: {event.__class__.__name__}({event.id})') self._queue.put(event) def _run_queue_loop(self): diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index 75ae7c4369ae..583b468f35d0 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -125,7 +125,7 @@ def __init__( def setup_initial_env(self) -> None: if self.attach_to_existing: return - logger.debug(f'Adding env vars: {self.initial_env_vars}') + logger.debug(f'Adding env vars: {self.initial_env_vars.keys()}') self.add_env_vars(self.initial_env_vars) if self.config.sandbox.runtime_startup_env_vars: self.add_env_vars(self.config.sandbox.runtime_startup_env_vars) @@ -172,11 +172,11 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None: obs = self.run(CmdRunAction(cmd)) if not isinstance(obs, CmdOutputObservation) or obs.exit_code != 0: raise RuntimeError( - f'Failed to add env vars [{env_vars}] to environment: {obs.content}' + f'Failed to add env vars [{env_vars.keys()}] to environment: {obs.content}' ) def on_event(self, event: Event) -> None: - print(f'RUNTIME:on_event: {event.__class__.__name__}') + print(f'RUNTIME:on_event: {event.__class__.__name__}({event.id})') if isinstance(event, Action): asyncio.get_event_loop().run_until_complete(self._handle_action(event)) @@ -185,7 +185,9 @@ async def _handle_action(self, event: Action) -> None: event.timeout = self.config.sandbox.timeout assert event.timeout is not None try: - print(f'ASYNC RUNTIME:on_event: {event.__class__.__name__}') + print( + f'ASYNC RUNTIME:_handle_action: {event.__class__.__name__}({event.id})' + ) observation: Observation = await call_sync_from_async( self.run_action, event ) From 0393f9b3b75520c2e90ab19dc04507b2aac4e29d Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 17:34:58 +0100 Subject: [PATCH 09/24] fix delegate end --- openhands/controller/agent_controller.py | 1 + tests/unit/test_agent_controller.py | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 16f532906141..9abcb78278c7 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -275,6 +275,7 @@ def on_event(self, event: Event) -> None: else: # delegate is done self.end_delegate() + return asyncio.get_event_loop().run_until_complete(self._on_event(event)) diff --git a/tests/unit/test_agent_controller.py b/tests/unit/test_agent_controller.py index 72327570735b..81769c5ac369 100644 --- a/tests/unit/test_agent_controller.py +++ b/tests/unit/test_agent_controller.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from unittest.mock import AsyncMock, MagicMock, Mock from uuid import uuid4 @@ -272,9 +273,25 @@ async def test_delegate_step_different_states( mock_delegate._step = AsyncMock() mock_delegate.close = AsyncMock() - await controller._delegate_step() - - mock_delegate._step.assert_called_once() + def call_on_event_with_new_loop(): + """ + In this thread, create and set a fresh event loop, so that the run_until_complete() + calls inside controller.on_event(...) find a valid loop. + """ + loop_in_thread = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop_in_thread) + msg_action = MessageAction(content='Test message') + msg_action._source = EventSource.USER + controller.on_event(msg_action) + finally: + # If you like, you can close the loop afterward + loop_in_thread.close() + + loop = asyncio.get_running_loop() + with ThreadPoolExecutor() as executor: + future = loop.run_in_executor(executor, call_on_event_with_new_loop) + await future if delegate_state == AgentState.RUNNING: assert controller.delegate is not None From 84fb5c54764c988411a81ecebfeac6abb973b6e7 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 18:35:18 +0100 Subject: [PATCH 10/24] Update openhands/controller/agent_controller.py --- openhands/controller/agent_controller.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 9abcb78278c7..27cf46638271 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -350,11 +350,6 @@ async def _handle_observation(self, observation: Observation) -> None: elif isinstance(observation, ErrorObservation): if self.state.agent_state == AgentState.ERROR: self.state.metrics.merge(self.state.local_metrics) - elif isinstance(observation, AgentStateChangedObservation): - # if this is a delegate, check for stop conditions - if self.is_delegate: - delegate_state = observation.agent_state - self.log('debug', f'Delegate state: {delegate_state}') async def _handle_message_action(self, action: MessageAction) -> None: """Handles message actions from the event stream. From f62b2499aa390b3a86a3ec139e9a05f769225cc0 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 19:15:09 +0100 Subject: [PATCH 11/24] fix delegate too early --- openhands/controller/agent_controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 9abcb78278c7..ad6a197aa409 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -262,7 +262,7 @@ def on_event(self, event: Event) -> None: # If we have a delegate that is not finished or errored, forward events to it if self.delegate is not None: delegate_state = self.delegate.get_agent_state() - if delegate_state not in ( + if self.should_step(event) and delegate_state not in ( AgentState.FINISHED, AgentState.ERROR, AgentState.REJECTED, @@ -272,7 +272,7 @@ def on_event(self, event: Event) -> None: self.delegate._on_event(event) ) return - else: + elif self.should_step(event): # delegate is done self.end_delegate() return From 419f5687e2a90695f27c2c7249754de761b4e5a5 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Thu, 9 Jan 2025 22:43:03 +0100 Subject: [PATCH 12/24] fix message passing, flow --- openhands/agenthub/micro/agent.py | 7 ++++ openhands/controller/agent_controller.py | 46 ++++++++++++------------ openhands/runtime/base.py | 2 +- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/openhands/agenthub/micro/agent.py b/openhands/agenthub/micro/agent.py index a9b0825afd9d..7c51043236a2 100644 --- a/openhands/agenthub/micro/agent.py +++ b/openhands/agenthub/micro/agent.py @@ -50,6 +50,12 @@ def history_to_json(self, history: list[Event], max_events: int = 20, **kwargs): # history is in reverse order, let's fix it processed_history.reverse() + # everything starts with a message + # the first message is already in the prompt as the task + # so we don't need to include it in the history + if event_count < max_events: + processed_history.pop(0) + return json.dumps(processed_history, **kwargs) def __init__(self, llm: LLM, config: AgentConfig): @@ -62,6 +68,7 @@ def __init__(self, llm: LLM, config: AgentConfig): def step(self, state: State) -> Action: last_user_message, last_image_urls = state.get_current_user_intent() + print(f'MICROAGENT:step: {last_user_message}') prompt = self.prompt_template.render( state=state, instructions=instructions, diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 497f4c0f57c7..50265f8fdd5c 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -169,7 +169,11 @@ async def close(self) -> None: ) # unsubscribe from the event stream - self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER, self.id) + # only the root parent controller subscribes to the event stream + if not self.is_delegate: + self.event_stream.unsubscribe( + EventStreamSubscriber.AGENT_CONTROLLER, self.id + ) self._closed = True def log(self, level: str, message: str, extra: dict | None = None) -> None: @@ -230,6 +234,10 @@ async def _step_with_exception_handling(self): await self._react_to_exception(reported) def should_step(self, event: Event) -> bool: + # it might be the delegate's day in the sun + if self.delegate is not None: + return False + if isinstance(event, Action): if isinstance(event, MessageAction) and event.source == EventSource.USER: return True @@ -262,7 +270,7 @@ def on_event(self, event: Event) -> None: # If we have a delegate that is not finished or errored, forward events to it if self.delegate is not None: delegate_state = self.delegate.get_agent_state() - if self.should_step(event) and delegate_state not in ( + if delegate_state not in ( AgentState.FINISHED, AgentState.ERROR, AgentState.REJECTED, @@ -272,7 +280,7 @@ def on_event(self, event: Event) -> None: self.delegate._on_event(event) ) return - elif self.should_step(event): + else: # delegate is done self.end_delegate() return @@ -297,17 +305,22 @@ async def _on_event(self, event: Event) -> None: self.step() async def _handle_action(self, action: Action) -> None: - """Handles actions from the event stream. - - Args: - action (Action): The action to handle. - """ + """Handles an Action from the agent or delegate.""" if isinstance(action, ChangeAgentStateAction): await self.set_agent_state_to(action.agent_state) # type: ignore elif isinstance(action, MessageAction): await self._handle_message_action(action) elif isinstance(action, AgentDelegateAction): await self.start_delegate(action) + assert self.delegate is not None + # Post a MessageAction with the task for the delegate + if 'task' in action.inputs: + self.event_stream.add_event( + MessageAction(content='TASK: ' + action.inputs['task']), + EventSource.USER, + ) + await self.delegate.set_agent_state_to(AgentState.RUNNING) + return elif isinstance(action, AgentFinishAction): self.state.outputs = action.outputs @@ -470,6 +483,7 @@ async def set_agent_state_to(self, new_state: AgentState) -> None: self._pending_action._id = None # type: ignore[attr-defined] self.event_stream.add_event(self._pending_action, EventSource.AGENT) + print(f'CONTROLLER {self.id}:set_agent_state_to: {new_state}') self.state.agent_state = new_state self.event_stream.add_event( AgentStateChangedObservation('', self.state.agent_state), @@ -539,8 +553,6 @@ async def start_delegate(self, action: AgentDelegateAction) -> None: headless_mode=self.headless_mode, ) - await self.delegate.set_agent_state_to(AgentState.RUNNING) - def end_delegate(self) -> None: print(f'CONTROLLER {self.id}:end_delegate') if self.delegate is None: @@ -600,14 +612,6 @@ async def _step(self) -> None: if self._pending_action: return - if self.delegate is not None: - assert self.delegate != self - # TODO this conditional will always be false, because the parent controllers are unsubscribed - # remove if it's still useless when delegation is reworked - if self.delegate.get_agent_state() != AgentState.PAUSED: - await self._delegate_step() - return - self.log( 'info', f'LEVEL {self.state.delegate_level} LOCAL STEP {self.state.local_iteration} GLOBAL STEP {self.state.iteration}', @@ -697,12 +701,6 @@ async def _step(self) -> None: log_level = 'info' if LOG_ALL_EVENTS else 'debug' self.log(log_level, str(action), extra={'msg_type': 'ACTION'}) - async def _delegate_step(self) -> None: - """Executes a single step of the delegate agent.""" - await self.delegate._step() # type: ignore[union-attr] - - return - async def _handle_traffic_control( self, limit_type: str, current_value: float, max_value: float ) -> bool: diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index 583b468f35d0..f230874c41a5 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -176,7 +176,7 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None: ) def on_event(self, event: Event) -> None: - print(f'RUNTIME:on_event: {event.__class__.__name__}({event.id})') + print(f'RUNTIME:on_event: {event.__class__.__name__}({event.id})\n') if isinstance(event, Action): asyncio.get_event_loop().run_until_complete(self._handle_action(event)) From 75831fec55dd10937e2954d3800923f744f885c2 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Fri, 10 Jan 2025 00:23:24 +0100 Subject: [PATCH 13/24] tests --- openhands/events/stream.py | 2 +- tests/unit/test_agent_controller.py | 70 +---------- tests/unit/test_agent_delegation.py | 187 ++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 69 deletions(-) create mode 100644 tests/unit/test_agent_delegation.py diff --git a/openhands/events/stream.py b/openhands/events/stream.py index 7dff207cf9d3..ea0ffaa135b2 100644 --- a/openhands/events/stream.py +++ b/openhands/events/stream.py @@ -258,7 +258,7 @@ def unsubscribe(self, subscriber_id: EventStreamSubscriber, callback_id: str): def add_event(self, event: Event, source: EventSource): if hasattr(event, '_id') and event.id is not None: raise ValueError( - 'Event already has an ID. It was probably added back to the EventStream from inside a handler, trigging a loop.' + f'Event already has an ID:{event.id}. It was probably added back to the EventStream from inside a handler, triggering a loop.' ) with self._lock: event._id = self._cur_id # type: ignore [attr-defined] diff --git a/tests/unit/test_agent_controller.py b/tests/unit/test_agent_controller.py index 81769c5ac369..08226cd14683 100644 --- a/tests/unit/test_agent_controller.py +++ b/tests/unit/test_agent_controller.py @@ -1,6 +1,5 @@ import asyncio -from concurrent.futures import ThreadPoolExecutor -from unittest.mock import AsyncMock, MagicMock, Mock +from unittest.mock import AsyncMock, MagicMock from uuid import uuid4 import pytest @@ -131,7 +130,7 @@ async def test_react_to_exception(mock_agent, mock_event_stream, mock_status_cal @pytest.mark.asyncio -async def test_run_controller_with_fatal_error(mock_agent, mock_event_stream): +async def test_run_controller_with_fatal_error(): config = AppConfig() file_store = get_file_store(config.file_store, config.file_store_path) event_stream = EventStream(sid='test', file_store=file_store) @@ -240,71 +239,6 @@ def on_event(event: Event): assert state.last_error == 'AgentStuckInLoopError: Agent got stuck in a loop' -@pytest.mark.asyncio -@pytest.mark.parametrize( - 'delegate_state', - [ - AgentState.RUNNING, - AgentState.FINISHED, - AgentState.ERROR, - AgentState.REJECTED, - ], -) -async def test_delegate_step_different_states( - mock_agent, mock_event_stream, delegate_state -): - controller = AgentController( - agent=mock_agent, - event_stream=mock_event_stream, - max_iterations=10, - sid='test', - confirmation_mode=False, - headless_mode=True, - ) - - mock_delegate = AsyncMock() - controller.delegate = mock_delegate - - mock_delegate.state.iteration = 5 - mock_delegate.state.outputs = {'result': 'test'} - mock_delegate.agent.name = 'TestDelegate' - - mock_delegate.get_agent_state = Mock(return_value=delegate_state) - mock_delegate._step = AsyncMock() - mock_delegate.close = AsyncMock() - - def call_on_event_with_new_loop(): - """ - In this thread, create and set a fresh event loop, so that the run_until_complete() - calls inside controller.on_event(...) find a valid loop. - """ - loop_in_thread = asyncio.new_event_loop() - try: - asyncio.set_event_loop(loop_in_thread) - msg_action = MessageAction(content='Test message') - msg_action._source = EventSource.USER - controller.on_event(msg_action) - finally: - # If you like, you can close the loop afterward - loop_in_thread.close() - - loop = asyncio.get_running_loop() - with ThreadPoolExecutor() as executor: - future = loop.run_in_executor(executor, call_on_event_with_new_loop) - await future - - if delegate_state == AgentState.RUNNING: - assert controller.delegate is not None - assert controller.state.iteration == 0 - mock_delegate.close.assert_not_called() - else: - assert controller.delegate is None - assert controller.state.iteration == 5 - mock_delegate.close.assert_called_once() - - await controller.close() - - @pytest.mark.asyncio async def test_max_iterations_extension(mock_agent, mock_event_stream): # Test with headless_mode=False - should extend max_iterations diff --git a/tests/unit/test_agent_delegation.py b/tests/unit/test_agent_delegation.py new file mode 100644 index 000000000000..c5c4e63f1d1c --- /dev/null +++ b/tests/unit/test_agent_delegation.py @@ -0,0 +1,187 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import AsyncMock, MagicMock, Mock +from uuid import uuid4 + +import pytest + +from openhands.controller.agent import Agent +from openhands.controller.agent_controller import AgentController +from openhands.controller.state.state import State +from openhands.core.config import LLMConfig +from openhands.core.config.agent_config import AgentConfig +from openhands.core.schema import AgentState +from openhands.events import EventSource, EventStream +from openhands.events.action import ( + AgentDelegateAction, + AgentFinishAction, + MessageAction, +) +from openhands.llm.llm import LLM +from openhands.llm.metrics import Metrics +from openhands.storage.memory import InMemoryFileStore + + +@pytest.fixture +def mock_event_stream(): + """Creates an event stream in memory.""" + sid = f'test-{uuid4()}' + file_store = InMemoryFileStore({}) + return EventStream(sid=sid, file_store=file_store) + + +@pytest.fixture +def mock_parent_agent(): + """Creates a mock parent agent for testing delegation.""" + agent = MagicMock(spec=Agent) + agent.name = 'ParentAgent' + agent.llm = MagicMock(spec=LLM) + agent.llm.metrics = Metrics() + agent.llm.config = LLMConfig() + agent.config = AgentConfig() + return agent + + +@pytest.fixture +def mock_child_agent(): + """Creates a mock child agent for testing delegation.""" + agent = MagicMock(spec=Agent) + agent.name = 'ChildAgent' + agent.llm = MagicMock(spec=LLM) + agent.llm.metrics = Metrics() + agent.llm.config = LLMConfig() + agent.config = AgentConfig() + return agent + + +@pytest.mark.asyncio +async def test_delegation_flow(mock_parent_agent, mock_child_agent, mock_event_stream): + """ + Test that when the parent agent delegates to a child, the parent's delegate + is set, and once the child finishes, the parent is cleaned up properly. + """ + # Mock the agent class resolution so that AgentController can instantiate mock_child_agent + Agent.get_cls = Mock(return_value=lambda llm, config: mock_child_agent) + + # Create parent controller + parent_state = State(max_iterations=10) + parent_controller = AgentController( + agent=mock_parent_agent, + event_stream=mock_event_stream, + max_iterations=10, + sid='parent', + confirmation_mode=False, + headless_mode=True, + initial_state=parent_state, + ) + + # Setup a delegate action from the parent + delegate_action = AgentDelegateAction(agent='ChildAgent', inputs={'test': True}) + mock_parent_agent.step.return_value = delegate_action + + # Simulate a user message event to cause parent.step() to run + message_action = MessageAction(content='please delegate now') + message_action._source = EventSource.USER + await parent_controller._on_event(message_action) + + # Give time for the async step() to execute + await asyncio.sleep(1) + + # The parent should receive step() from that event + # Verify that a delegate agent controller is created + assert ( + parent_controller.delegate is not None + ), "Parent's delegate controller was not set." + + # The parent's iteration should have incremented + assert ( + parent_controller.state.iteration == 1 + ), 'Parent iteration should be incremented after step.' + + # Now simulate that the child increments local iteration and finishes its subtask + delegate_controller = parent_controller.delegate + delegate_controller.state.iteration = 5 # child had some steps + delegate_controller.state.outputs = {'delegate_result': 'done'} + + # The child is done, so we simulate it finishing: + child_finish_action = AgentFinishAction() + await delegate_controller._on_event(child_finish_action) + await asyncio.sleep(0.5) + + # Now the parent's delegate is None + assert ( + parent_controller.delegate is None + ), 'Parent delegate should be None after child finishes.' + + # Parent's global iteration is updated from the child + assert ( + parent_controller.state.iteration == 6 + ), "Parent iteration should be the child's iteration + 1 after child is done." + + # Cleanup + await parent_controller.close() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'delegate_state', + [ + AgentState.RUNNING, + AgentState.FINISHED, + AgentState.ERROR, + AgentState.REJECTED, + ], +) +async def test_delegate_step_different_states( + mock_parent_agent, mock_event_stream, delegate_state +): + """Ensure that delegate is closed or remains open based on the delegate's state.""" + controller = AgentController( + agent=mock_parent_agent, + event_stream=mock_event_stream, + max_iterations=10, + sid='test', + confirmation_mode=False, + headless_mode=True, + ) + + mock_delegate = AsyncMock() + controller.delegate = mock_delegate + + mock_delegate.state.iteration = 5 + mock_delegate.state.outputs = {'result': 'test'} + mock_delegate.agent.name = 'TestDelegate' + + mock_delegate.get_agent_state = Mock(return_value=delegate_state) + mock_delegate._step = AsyncMock() + mock_delegate.close = AsyncMock() + + def call_on_event_with_new_loop(): + """ + In this thread, create and set a fresh event loop, so that the run_until_complete() + calls inside controller.on_event(...) find a valid loop. + """ + loop_in_thread = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop_in_thread) + msg_action = MessageAction(content='Test message') + msg_action._source = EventSource.USER + controller.on_event(msg_action) + finally: + loop_in_thread.close() + + loop = asyncio.get_running_loop() + with ThreadPoolExecutor() as executor: + future = loop.run_in_executor(executor, call_on_event_with_new_loop) + await future + + if delegate_state == AgentState.RUNNING: + assert controller.delegate is not None + assert controller.state.iteration == 0 + mock_delegate.close.assert_not_called() + else: + assert controller.delegate is None + assert controller.state.iteration == 5 + mock_delegate.close.assert_called_once() + + await controller.close() From 8d7382cd8d88ba65f4d50f29c149bfc85a6d7b9a Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Fri, 10 Jan 2025 00:32:01 +0100 Subject: [PATCH 14/24] temporary - quicker run --- .github/workflows/integration-runner.yml | 121 ++++++++++++----------- 1 file changed, 62 insertions(+), 59 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index f81e37471a43..b99564b0cd0f 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -51,63 +51,64 @@ jobs: - name: Install Python dependencies using Poetry run: poetry install --without evaluation,llama-index - - name: Configure config.toml for testing with Haiku - env: - LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" - LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} - run: | - echo "[llm.eval]" > config.toml - echo "model = \"$LLM_MODEL\"" >> config.toml - echo "api_key = \"$LLM_API_KEY\"" >> config.toml - echo "base_url = \"$LLM_BASE_URL\"" >> config.toml - echo "temperature = 0.0" >> config.toml - - - name: Build environment - run: make build - - - name: Run integration test evaluation for Haiku - env: - SANDBOX_FORCE_REBUILD_RUNTIME: True - run: | - poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'haiku_run' - - # get integration tests report - REPORT_FILE_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) - echo "REPORT_FILE: $REPORT_FILE_HAIKU" - echo "INTEGRATION_TEST_REPORT_HAIKU<> $GITHUB_ENV - cat $REPORT_FILE_HAIKU >> $GITHUB_ENV - echo >> $GITHUB_ENV - echo "EOF" >> $GITHUB_ENV - - - name: Wait a little bit - run: sleep 10 - - - name: Configure config.toml for testing with DeepSeek - env: - LLM_MODEL: "litellm_proxy/deepseek-chat" - LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} - run: | - echo "[llm.eval]" > config.toml - echo "model = \"$LLM_MODEL\"" >> config.toml - echo "api_key = \"$LLM_API_KEY\"" >> config.toml - echo "base_url = \"$LLM_BASE_URL\"" >> config.toml - echo "temperature = 0.0" >> config.toml - - - name: Run integration test evaluation for DeepSeek - env: - SANDBOX_FORCE_REBUILD_RUNTIME: True - run: | - poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'deepseek_run' - - # get integration tests report - REPORT_FILE_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) - echo "REPORT_FILE: $REPORT_FILE_DEEPSEEK" - echo "INTEGRATION_TEST_REPORT_DEEPSEEK<> $GITHUB_ENV - cat $REPORT_FILE_DEEPSEEK >> $GITHUB_ENV - echo >> $GITHUB_ENV - echo "EOF" >> $GITHUB_ENV + # Commenting out CodeActAgent Haiku tests + # - name: Configure config.toml for testing with Haiku + # env: + # LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" + # LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + # LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + # run: | + # echo "[llm.eval]" > config.toml + # echo "model = \"$LLM_MODEL\"" >> config.toml + # echo "api_key = \"$LLM_API_KEY\"" >> config.toml + # echo "base_url = \"$LLM_BASE_URL\"" >> config.toml + # echo "temperature = 0.0" >> config.toml + + #- name: Build environment + # run: make build + # - name: Run integration test evaluation for Haiku + # env: + # SANDBOX_FORCE_REBUILD_RUNTIME: True + # run: | + # poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'haiku_run' + + # # get integration tests report + # REPORT_FILE_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) + # echo "REPORT_FILE: $REPORT_FILE_HAIKU" + # echo "INTEGRATION_TEST_REPORT_HAIKU<> $GITHUB_ENV + # cat $REPORT_FILE_HAIKU >> $GITHUB_ENV + # echo >> $GITHUB_ENV + # echo "EOF" >> $GITHUB_ENV + + # - name: Wait a little bit + # run: sleep 10 + + # Commenting out CodeActAgent DeepSeek tests + # - name: Configure config.toml for testing with DeepSeek + # env: + # LLM_MODEL: "litellm_proxy/deepseek-chat" + # LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + # LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + # run: | + # echo "[llm.eval]" > config.toml + # echo "model = \"$LLM_MODEL\"" >> config.toml + # echo "api_key = \"$LLM_API_KEY\"" >> config.toml + # echo "base_url = \"$LLM_BASE_URL\"" >> config.toml + # echo "temperature = 0.0" >> config.toml + + # - name: Run integration test evaluation for DeepSeek + # env: + # SANDBOX_FORCE_REBUILD_RUNTIME: True + # run: | + # poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'deepseek_run' + + # # get integration tests report + # REPORT_FILE_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) + # echo "REPORT_FILE: $REPORT_FILE_DEEPSEEK" + # echo "INTEGRATION_TEST_REPORT_DEEPSEEK<> $GITHUB_ENV + # cat $REPORT_FILE_DEEPSEEK >> $GITHUB_ENV + # echo >> $GITHUB_ENV + # echo "EOF" >> $GITHUB_ENV # ------------------------------------------------------------- # Run DelegatorAgent tests for Haiku, limited to t01 and t02 @@ -211,11 +212,13 @@ jobs: Commit: ${{ github.sha }} **Integration Tests Report (Haiku)** Haiku LLM Test Results: - ${{ env.INTEGRATION_TEST_REPORT_HAIKU }} + # uncomment me + #${{ env.INTEGRATION_TEST_REPORT_HAIKU }} --- **Integration Tests Report (DeepSeek)** DeepSeek LLM Test Results: - ${{ env.INTEGRATION_TEST_REPORT_DEEPSEEK }} + # uncomment me + #${{ env.INTEGRATION_TEST_REPORT_DEEPSEEK }} --- **Integration Tests Report Delegator (Haiku)** ${{ env.INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU }} From d07f225cdf47a6891f07e9c6592b74d8f61fae25 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Fri, 10 Jan 2025 00:55:23 +0100 Subject: [PATCH 15/24] temporary run faster --- .github/workflows/integration-runner.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index b99564b0cd0f..69cb08f5308f 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -176,7 +176,9 @@ jobs: run: | TIMESTAMP=$(date +'%y-%m-%d-%H-%M') cd evaluation/evaluation_outputs/outputs # Change to the outputs directory - tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* integration_tests/DelegatorAgent/* # Only include the actual result directories + # uncomment me + #tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* integration_tests/DelegatorAgent/* # Only include the actual result directories + tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/DelegatorAgent/* - name: Upload evaluation results as artifact uses: actions/upload-artifact@v4 From 1fb90ec34497975a9533f66fcb9a0d5d11961336 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Fri, 10 Jan 2025 03:17:56 +0100 Subject: [PATCH 16/24] fix results report --- .github/workflows/integration-runner.yml | 4 ++++ evaluation/integration_tests/run_infer.py | 20 +++++++++++++++---- .../integration_tests/scripts/run_infer.sh | 2 +- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index 69cb08f5308f..7ceae21d4479 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -57,6 +57,7 @@ jobs: # LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" # LLM_API_KEY: ${{ secrets.LLM_API_KEY }} # LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + # MAX_ITERATIONS: 10 # run: | # echo "[llm.eval]" > config.toml # echo "model = \"$LLM_MODEL\"" >> config.toml @@ -89,6 +90,7 @@ jobs: # LLM_MODEL: "litellm_proxy/deepseek-chat" # LLM_API_KEY: ${{ secrets.LLM_API_KEY }} # LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + # MAX_ITERATIONS: 10 # run: | # echo "[llm.eval]" > config.toml # echo "model = \"$LLM_MODEL\"" >> config.toml @@ -120,6 +122,7 @@ jobs: LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" LLM_API_KEY: ${{ secrets.LLM_API_KEY }} LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + MAX_ITERATIONS: 30 run: | echo "[llm.eval]" > config.toml echo "model = \"$LLM_MODEL\"" >> config.toml @@ -151,6 +154,7 @@ jobs: LLM_MODEL: "litellm_proxy/deepseek-chat" LLM_API_KEY: ${{ secrets.LLM_API_KEY }} LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + MAX_ITERATIONS: 30 run: | echo "[llm.eval]" > config.toml echo "model = \"$LLM_MODEL\"" >> config.toml diff --git a/evaluation/integration_tests/run_infer.py b/evaluation/integration_tests/run_infer.py index fe411c33becf..5036cc34b541 100644 --- a/evaluation/integration_tests/run_infer.py +++ b/evaluation/integration_tests/run_infer.py @@ -222,7 +222,7 @@ def load_integration_tests() -> pd.DataFrame: df = pd.read_json(output_file, lines=True, orient='records') - # record success and reason for failure for the final report + # record success and reason df['success'] = df['test_result'].apply(lambda x: x['success']) df['reason'] = df['test_result'].apply(lambda x: x['reason']) logger.info('-' * 100) @@ -237,15 +237,27 @@ def load_integration_tests() -> pd.DataFrame: logger.info('-' * 100) # record cost for each instance, with 3 decimal places - df['cost'] = df['metrics'].apply(lambda x: round(x['accumulated_cost'], 3)) + # we sum up all the "costs" from the metrics array + df['cost'] = df['metrics'].apply( + lambda m: round(sum(c['cost'] for c in m['costs']), 3) + if m and 'costs' in m + else 0.0 + ) + + # capture the top-level error if present, per instance + df['error_message'] = df['error'] + logger.info(f'Total cost: USD {df["cost"].sum():.2f}') report_file = os.path.join(metadata.eval_output_dir, 'report.md') with open(report_file, 'w') as f: f.write( - f'Success rate: {df["success"].mean():.2%} ({df["success"].sum()}/{len(df)})\n' + f'Success rate: {df["success"].mean():.2%}' + f' ({df["success"].sum()}/{len(df)})\n' ) f.write(f'\nTotal cost: USD {df["cost"].sum():.2f}\n') f.write( - df[['instance_id', 'success', 'reason', 'cost']].to_markdown(index=False) + df[ + ['instance_id', 'success', 'reason', 'cost', 'error_message'] + ].to_markdown(index=False) ) diff --git a/evaluation/integration_tests/scripts/run_infer.sh b/evaluation/integration_tests/scripts/run_infer.sh index 3ca1529359db..e5ae35e849d2 100755 --- a/evaluation/integration_tests/scripts/run_infer.sh +++ b/evaluation/integration_tests/scripts/run_infer.sh @@ -43,7 +43,7 @@ fi COMMAND="poetry run python evaluation/integration_tests/run_infer.py \ --agent-cls $AGENT \ --llm-config $MODEL_CONFIG \ - --max-iterations 10 \ + --max-iterations ${MAX_ITERATIONS:-10} \ --eval-num-workers $NUM_WORKERS \ --eval-note $EVAL_NOTE" From 0f00ea6ea3f034d3ac22a1bb03339280e3eaeb84 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Fri, 10 Jan 2025 03:44:09 +0100 Subject: [PATCH 17/24] fix iterations --- .github/workflows/integration-runner.yml | 8 ++++---- evaluation/integration_tests/run_infer.py | 2 +- evaluation/integration_tests/scripts/run_infer.sh | 5 +++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index 7ceae21d4479..8d9b69e022af 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -134,10 +134,10 @@ jobs: env: SANDBOX_FORCE_REBUILD_RUNTIME: True run: | - poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_haiku_run' + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' 30 $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_haiku_run' # Find and export the delegator test results - REPORT_FILE_DELEGATOR_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) + REPORT_FILE_DELEGATOR_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/*haiku*_maxiter_30_N* -name "report.md" -type f | head -n 1) echo "REPORT_FILE_DELEGATOR_HAIKU: $REPORT_FILE_DELEGATOR_HAIKU" echo "INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU<> $GITHUB_ENV cat $REPORT_FILE_DELEGATOR_HAIKU >> $GITHUB_ENV @@ -166,10 +166,10 @@ jobs: env: SANDBOX_FORCE_REBUILD_RUNTIME: True run: | - poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_deepseek_run' + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' 30 $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_deepseek_run' # Find and export the delegator test results - REPORT_FILE_DELEGATOR_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) + REPORT_FILE_DELEGATOR_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/deepseek*_maxiter_30_N* -name "report.md" -type f | head -n 1) echo "REPORT_FILE_DELEGATOR_DEEPSEEK: $REPORT_FILE_DELEGATOR_DEEPSEEK" echo "INTEGRATION_TEST_REPORT_DELEGATOR_DEEPSEEK<> $GITHUB_ENV cat $REPORT_FILE_DELEGATOR_DEEPSEEK >> $GITHUB_ENV diff --git a/evaluation/integration_tests/run_infer.py b/evaluation/integration_tests/run_infer.py index 5036cc34b541..b7018d0b04d1 100644 --- a/evaluation/integration_tests/run_infer.py +++ b/evaluation/integration_tests/run_infer.py @@ -245,7 +245,7 @@ def load_integration_tests() -> pd.DataFrame: ) # capture the top-level error if present, per instance - df['error_message'] = df['error'] + df['error_message'] = df.get('error', None) logger.info(f'Total cost: USD {df["cost"].sum():.2f}') diff --git a/evaluation/integration_tests/scripts/run_infer.sh b/evaluation/integration_tests/scripts/run_infer.sh index e5ae35e849d2..32702afa9013 100755 --- a/evaluation/integration_tests/scripts/run_infer.sh +++ b/evaluation/integration_tests/scripts/run_infer.sh @@ -7,8 +7,9 @@ MODEL_CONFIG=$1 COMMIT_HASH=$2 AGENT=$3 EVAL_LIMIT=$4 -NUM_WORKERS=$5 -EVAL_IDS=$6 +MAX_ITERATIONS=$5 +NUM_WORKERS=$6 +EVAL_IDS=$7 if [ -z "$NUM_WORKERS" ]; then NUM_WORKERS=1 From 2f83984de02aaecbc54672c4764be1d671627fb2 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 03:01:59 +0100 Subject: [PATCH 18/24] tweak, comments --- openhands/controller/agent_controller.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index 50265f8fdd5c..ec3c88ee03a0 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -264,7 +264,6 @@ def on_event(self, event: Event) -> None: Args: event (Event): The incoming event to process. """ - print(f'CONTROLLER {self.id}:on_event: {event.__class__.__name__}({event.id})') # If we have a delegate that is not finished or errored, forward events to it @@ -281,10 +280,11 @@ def on_event(self, event: Event) -> None: ) return else: - # delegate is done + # delegate is done or errored, so end it self.end_delegate() return + # continue parent processing only if there's no active delegate asyncio.get_event_loop().run_until_complete(self._on_event(event)) async def _on_event(self, event: Event) -> None: @@ -292,6 +292,9 @@ async def _on_event(self, event: Event) -> None: if hasattr(event, 'hidden') and event.hidden: return + # Give others a little chance + await asyncio.sleep(0.01) + # if the event is not filtered out, add it to the history if not any(isinstance(event, filter_type) for filter_type in self.filter_out): self.state.history.append(event) @@ -554,17 +557,19 @@ async def start_delegate(self, action: AgentDelegateAction) -> None: ) def end_delegate(self) -> None: + """Ends the currently active delegate (e.g., if it is finished or errored) + so that this controller can resume normal operation. + """ print(f'CONTROLLER {self.id}:end_delegate') if self.delegate is None: return delegate_state = self.delegate.get_agent_state() - # update iteration that shall be shared across agents + # update iteration that is shared across agents self.state.iteration = self.delegate.state.iteration # close the delegate controller before adding new events - # then add the delegate observation asyncio.get_event_loop().run_until_complete(self.delegate.close()) if delegate_state in (AgentState.FINISHED, AgentState.REJECTED): From 0f7401426dae22645859a21f0ff8211ce8b20775 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 03:05:31 +0100 Subject: [PATCH 19/24] clean up debugging --- openhands/agenthub/micro/agent.py | 1 - openhands/controller/agent_controller.py | 5 ----- openhands/core/main.py | 1 - openhands/events/stream.py | 1 - openhands/runtime/base.py | 1 - 5 files changed, 9 deletions(-) diff --git a/openhands/agenthub/micro/agent.py b/openhands/agenthub/micro/agent.py index 7c51043236a2..b0ec514a4d48 100644 --- a/openhands/agenthub/micro/agent.py +++ b/openhands/agenthub/micro/agent.py @@ -68,7 +68,6 @@ def __init__(self, llm: LLM, config: AgentConfig): def step(self, state: State) -> Action: last_user_message, last_image_urls = state.get_current_user_intent() - print(f'MICROAGENT:step: {last_user_message}') prompt = self.prompt_template.render( state=state, instructions=instructions, diff --git a/openhands/controller/agent_controller.py b/openhands/controller/agent_controller.py index ec3c88ee03a0..521968c88d12 100644 --- a/openhands/controller/agent_controller.py +++ b/openhands/controller/agent_controller.py @@ -264,7 +264,6 @@ def on_event(self, event: Event) -> None: Args: event (Event): The incoming event to process. """ - print(f'CONTROLLER {self.id}:on_event: {event.__class__.__name__}({event.id})') # If we have a delegate that is not finished or errored, forward events to it if self.delegate is not None: @@ -288,7 +287,6 @@ def on_event(self, event: Event) -> None: asyncio.get_event_loop().run_until_complete(self._on_event(event)) async def _on_event(self, event: Event) -> None: - print(f'CONTROLLER {self.id}:_on_event: {event.__class__.__name__}({event.id})') if hasattr(event, 'hidden') and event.hidden: return @@ -486,7 +484,6 @@ async def set_agent_state_to(self, new_state: AgentState) -> None: self._pending_action._id = None # type: ignore[attr-defined] self.event_stream.add_event(self._pending_action, EventSource.AGENT) - print(f'CONTROLLER {self.id}:set_agent_state_to: {new_state}') self.state.agent_state = new_state self.event_stream.add_event( AgentStateChangedObservation('', self.state.agent_state), @@ -560,7 +557,6 @@ def end_delegate(self) -> None: """Ends the currently active delegate (e.g., if it is finished or errored) so that this controller can resume normal operation. """ - print(f'CONTROLLER {self.id}:end_delegate') if self.delegate is None: return @@ -610,7 +606,6 @@ def end_delegate(self) -> None: async def _step(self) -> None: """Executes a single step of the parent or delegate agent. Detects stuck agents and limits on the number of iterations and the task budget.""" - print(f'CONTROLLER {self.id}:_step') if self.get_agent_state() != AgentState.RUNNING: return diff --git a/openhands/core/main.py b/openhands/core/main.py index 7664eeedfe6a..b27cac1e586d 100644 --- a/openhands/core/main.py +++ b/openhands/core/main.py @@ -129,7 +129,6 @@ async def run_controller( event_stream.add_event(initial_user_action, EventSource.USER) def on_event(event: Event): - print(f'MAIN:on_event: {event.__class__.__name__}({event.id})') if isinstance(event, AgentStateChangedObservation): if event.agent_state == AgentState.AWAITING_USER_INPUT: if exit_on_message: diff --git a/openhands/events/stream.py b/openhands/events/stream.py index 088bf0987fb2..2ef6047f24f6 100644 --- a/openhands/events/stream.py +++ b/openhands/events/stream.py @@ -269,7 +269,6 @@ def add_event(self, event: Event, source: EventSource): data = event_to_dict(event) if event.id is not None: self.file_store.write(self._get_filename_for_id(event.id), json.dumps(data)) - print(f'EVENTSTREAM:add_event: {event.__class__.__name__}({event.id})') self._queue.put(event) def _run_queue_loop(self): diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index 053856ecd195..cc5ca086e0d3 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -176,7 +176,6 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None: ) def on_event(self, event: Event) -> None: - print(f'RUNTIME:on_event: {event.__class__.__name__}({event.id})\n') if isinstance(event, Action): asyncio.get_event_loop().run_until_complete(self._handle_action(event)) From 0295753f4a310e917a269c8d9373480f51085b70 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 03:07:25 +0100 Subject: [PATCH 20/24] Revert "temporary run faster" This reverts commit d07f225cdf47a6891f07e9c6592b74d8f61fae25. --- .github/workflows/integration-runner.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index 8d9b69e022af..ea83bc3171a0 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -180,9 +180,7 @@ jobs: run: | TIMESTAMP=$(date +'%y-%m-%d-%H-%M') cd evaluation/evaluation_outputs/outputs # Change to the outputs directory - # uncomment me - #tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* integration_tests/DelegatorAgent/* # Only include the actual result directories - tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/DelegatorAgent/* + tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* integration_tests/DelegatorAgent/* # Only include the actual result directories - name: Upload evaluation results as artifact uses: actions/upload-artifact@v4 From 4db521a0ac04513f797ba7fe1e742a2c86cbcaea Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 03:09:11 +0100 Subject: [PATCH 21/24] Revert "temporary - quicker run" This reverts commit 8d7382cd8d88ba65f4d50f29c149bfc85a6d7b9a. --- .github/workflows/integration-runner.yml | 125 +++++++++++------------ 1 file changed, 61 insertions(+), 64 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index ea83bc3171a0..762e0c9de712 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -51,66 +51,65 @@ jobs: - name: Install Python dependencies using Poetry run: poetry install --without evaluation,llama-index - # Commenting out CodeActAgent Haiku tests - # - name: Configure config.toml for testing with Haiku - # env: - # LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" - # LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - # LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} - # MAX_ITERATIONS: 10 - # run: | - # echo "[llm.eval]" > config.toml - # echo "model = \"$LLM_MODEL\"" >> config.toml - # echo "api_key = \"$LLM_API_KEY\"" >> config.toml - # echo "base_url = \"$LLM_BASE_URL\"" >> config.toml - # echo "temperature = 0.0" >> config.toml - - #- name: Build environment - # run: make build - # - name: Run integration test evaluation for Haiku - # env: - # SANDBOX_FORCE_REBUILD_RUNTIME: True - # run: | - # poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'haiku_run' - - # # get integration tests report - # REPORT_FILE_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) - # echo "REPORT_FILE: $REPORT_FILE_HAIKU" - # echo "INTEGRATION_TEST_REPORT_HAIKU<> $GITHUB_ENV - # cat $REPORT_FILE_HAIKU >> $GITHUB_ENV - # echo >> $GITHUB_ENV - # echo "EOF" >> $GITHUB_ENV - - # - name: Wait a little bit - # run: sleep 10 - - # Commenting out CodeActAgent DeepSeek tests - # - name: Configure config.toml for testing with DeepSeek - # env: - # LLM_MODEL: "litellm_proxy/deepseek-chat" - # LLM_API_KEY: ${{ secrets.LLM_API_KEY }} - # LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} - # MAX_ITERATIONS: 10 - # run: | - # echo "[llm.eval]" > config.toml - # echo "model = \"$LLM_MODEL\"" >> config.toml - # echo "api_key = \"$LLM_API_KEY\"" >> config.toml - # echo "base_url = \"$LLM_BASE_URL\"" >> config.toml - # echo "temperature = 0.0" >> config.toml - - # - name: Run integration test evaluation for DeepSeek - # env: - # SANDBOX_FORCE_REBUILD_RUNTIME: True - # run: | - # poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'deepseek_run' - - # # get integration tests report - # REPORT_FILE_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) - # echo "REPORT_FILE: $REPORT_FILE_DEEPSEEK" - # echo "INTEGRATION_TEST_REPORT_DEEPSEEK<> $GITHUB_ENV - # cat $REPORT_FILE_DEEPSEEK >> $GITHUB_ENV - # echo >> $GITHUB_ENV - # echo "EOF" >> $GITHUB_ENV + - name: Configure config.toml for testing with Haiku + env: + LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022" + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + MAX_ITERATIONS: 10 + run: | + echo "[llm.eval]" > config.toml + echo "model = \"$LLM_MODEL\"" >> config.toml + echo "api_key = \"$LLM_API_KEY\"" >> config.toml + echo "base_url = \"$LLM_BASE_URL\"" >> config.toml + echo "temperature = 0.0" >> config.toml + + - name: Build environment + run: make build + + - name: Run integration test evaluation for Haiku + env: + SANDBOX_FORCE_REBUILD_RUNTIME: True + run: | + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'haiku_run' + + # get integration tests report + REPORT_FILE_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) + echo "REPORT_FILE: $REPORT_FILE_HAIKU" + echo "INTEGRATION_TEST_REPORT_HAIKU<> $GITHUB_ENV + cat $REPORT_FILE_HAIKU >> $GITHUB_ENV + echo >> $GITHUB_ENV + echo "EOF" >> $GITHUB_ENV + + - name: Wait a little bit + run: sleep 10 + + - name: Configure config.toml for testing with DeepSeek + env: + LLM_MODEL: "litellm_proxy/deepseek-chat" + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }} + MAX_ITERATIONS: 10 + run: | + echo "[llm.eval]" > config.toml + echo "model = \"$LLM_MODEL\"" >> config.toml + echo "api_key = \"$LLM_API_KEY\"" >> config.toml + echo "base_url = \"$LLM_BASE_URL\"" >> config.toml + echo "temperature = 0.0" >> config.toml + + - name: Run integration test evaluation for DeepSeek + env: + SANDBOX_FORCE_REBUILD_RUNTIME: True + run: | + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'deepseek_run' + + # get integration tests report + REPORT_FILE_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) + echo "REPORT_FILE: $REPORT_FILE_DEEPSEEK" + echo "INTEGRATION_TEST_REPORT_DEEPSEEK<> $GITHUB_ENV + cat $REPORT_FILE_DEEPSEEK >> $GITHUB_ENV + echo >> $GITHUB_ENV + echo "EOF" >> $GITHUB_ENV # ------------------------------------------------------------- # Run DelegatorAgent tests for Haiku, limited to t01 and t02 @@ -216,13 +215,11 @@ jobs: Commit: ${{ github.sha }} **Integration Tests Report (Haiku)** Haiku LLM Test Results: - # uncomment me - #${{ env.INTEGRATION_TEST_REPORT_HAIKU }} + ${{ env.INTEGRATION_TEST_REPORT_HAIKU }} --- **Integration Tests Report (DeepSeek)** DeepSeek LLM Test Results: - # uncomment me - #${{ env.INTEGRATION_TEST_REPORT_DEEPSEEK }} + ${{ env.INTEGRATION_TEST_REPORT_DEEPSEEK }} --- **Integration Tests Report Delegator (Haiku)** ${{ env.INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU }} From 2efc6cbd86148c235b8cf4fd817938db3175b223 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 03:24:13 +0100 Subject: [PATCH 22/24] fix parameter order --- .github/workflows/integration-runner.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration-runner.yml b/.github/workflows/integration-runner.yml index 762e0c9de712..9af41bbdecdb 100644 --- a/.github/workflows/integration-runner.yml +++ b/.github/workflows/integration-runner.yml @@ -71,7 +71,7 @@ jobs: env: SANDBOX_FORCE_REBUILD_RUNTIME: True run: | - poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'haiku_run' + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' 10 $N_PROCESSES '' 'haiku_run' # get integration tests report REPORT_FILE_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1) @@ -101,7 +101,7 @@ jobs: env: SANDBOX_FORCE_REBUILD_RUNTIME: True run: | - poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' $N_PROCESSES '' 'deepseek_run' + poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD CodeActAgent '' 10 $N_PROCESSES '' 'deepseek_run' # get integration tests report REPORT_FILE_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/CodeActAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1) From 2b1a2eb96897c811df1add85977797ff75857c20 Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 04:01:22 +0100 Subject: [PATCH 23/24] remove messing with history for now --- openhands/agenthub/micro/agent.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/openhands/agenthub/micro/agent.py b/openhands/agenthub/micro/agent.py index b0ec514a4d48..2c22e3840a51 100644 --- a/openhands/agenthub/micro/agent.py +++ b/openhands/agenthub/micro/agent.py @@ -52,9 +52,7 @@ def history_to_json(self, history: list[Event], max_events: int = 20, **kwargs): # everything starts with a message # the first message is already in the prompt as the task - # so we don't need to include it in the history - if event_count < max_events: - processed_history.pop(0) + # TODO: so we don't need to include it in the history return json.dumps(processed_history, **kwargs) From 3df4d307f6ca5e3f9f7a5c710a93ad181b07ec0e Mon Sep 17 00:00:00 2001 From: Engel Nyst Date: Wed, 15 Jan 2025 04:03:25 +0100 Subject: [PATCH 24/24] remove debug --- openhands/runtime/base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index cc5ca086e0d3..3cb6ca8f74c8 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -184,9 +184,6 @@ async def _handle_action(self, event: Action) -> None: event.timeout = self.config.sandbox.timeout assert event.timeout is not None try: - print( - f'ASYNC RUNTIME:_handle_action: {event.__class__.__name__}({event.id})' - ) observation: Observation = await call_sync_from_async( self.run_action, event )