Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegation fixes #6165

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion .github/workflows/integration-runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,73 @@ jobs:
echo >> $GITHUB_ENV
echo "EOF" >> $GITHUB_ENV

# -------------------------------------------------------------
# Run DelegatorAgent tests for Haiku, limited to t01 and t02
- name: Wait a little bit (again)
run: sleep 5

- name: Configure config.toml for testing DelegatorAgent (Haiku)
env:
LLM_MODEL: "litellm_proxy/claude-3-5-haiku-20241022"
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }}
run: |
echo "[llm.eval]" > config.toml
echo "model = \"$LLM_MODEL\"" >> config.toml
echo "api_key = \"$LLM_API_KEY\"" >> config.toml
echo "base_url = \"$LLM_BASE_URL\"" >> config.toml
echo "temperature = 0.0" >> config.toml

- name: Run integration test evaluation for DelegatorAgent (Haiku)
env:
SANDBOX_FORCE_REBUILD_RUNTIME: True
run: |
poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_haiku_run'

# Find and export the delegator test results
REPORT_FILE_DELEGATOR_HAIKU=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/*haiku*_maxiter_10_N* -name "report.md" -type f | head -n 1)
echo "REPORT_FILE_DELEGATOR_HAIKU: $REPORT_FILE_DELEGATOR_HAIKU"
echo "INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU<<EOF" >> $GITHUB_ENV
cat $REPORT_FILE_DELEGATOR_HAIKU >> $GITHUB_ENV
echo >> $GITHUB_ENV
echo "EOF" >> $GITHUB_ENV

# -------------------------------------------------------------
# Run DelegatorAgent tests for DeepSeek, limited to t01 and t02
- name: Wait a little bit (again)
run: sleep 5

- name: Configure config.toml for testing DelegatorAgent (DeepSeek)
env:
LLM_MODEL: "litellm_proxy/deepseek-chat"
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_BASE_URL: ${{ secrets.LLM_BASE_URL }}
run: |
echo "[llm.eval]" > config.toml
echo "model = \"$LLM_MODEL\"" >> config.toml
echo "api_key = \"$LLM_API_KEY\"" >> config.toml
echo "base_url = \"$LLM_BASE_URL\"" >> config.toml
echo "temperature = 0.0" >> config.toml

- name: Run integration test evaluation for DelegatorAgent (DeepSeek)
enyst marked this conversation as resolved.
Show resolved Hide resolved
env:
SANDBOX_FORCE_REBUILD_RUNTIME: True
run: |
poetry run ./evaluation/integration_tests/scripts/run_infer.sh llm.eval HEAD DelegatorAgent '' $N_PROCESSES "t01_fix_simple_typo,t02_add_bash_hello" 'delegator_deepseek_run'

# Find and export the delegator test results
REPORT_FILE_DELEGATOR_DEEPSEEK=$(find evaluation/evaluation_outputs/outputs/integration_tests/DelegatorAgent/deepseek*_maxiter_10_N* -name "report.md" -type f | head -n 1)
echo "REPORT_FILE_DELEGATOR_DEEPSEEK: $REPORT_FILE_DELEGATOR_DEEPSEEK"
echo "INTEGRATION_TEST_REPORT_DELEGATOR_DEEPSEEK<<EOF" >> $GITHUB_ENV
cat $REPORT_FILE_DELEGATOR_DEEPSEEK >> $GITHUB_ENV
echo >> $GITHUB_ENV
echo "EOF" >> $GITHUB_ENV

- name: Create archive of evaluation outputs
run: |
TIMESTAMP=$(date +'%y-%m-%d-%H-%M')
cd evaluation/evaluation_outputs/outputs # Change to the outputs directory
tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* # Only include the actual result directories
tar -czvf ../../../integration_tests_${TIMESTAMP}.tar.gz integration_tests/CodeActAgent/* integration_tests/DelegatorAgent/* # Only include the actual result directories

- name: Upload evaluation results as artifact
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -154,5 +216,11 @@ jobs:
**Integration Tests Report (DeepSeek)**
DeepSeek LLM Test Results:
${{ env.INTEGRATION_TEST_REPORT_DEEPSEEK }}
---
**Integration Tests Report Delegator (Haiku)**
${{ env.INTEGRATION_TEST_REPORT_DELEGATOR_HAIKU }}
---
**Integration Tests Report Delegator (DeepSeek)**
${{ env.INTEGRATION_TEST_REPORT_DELEGATOR_DEEPSEEK }}
---
Download testing outputs (includes both Haiku and DeepSeek results): [Download](${{ steps.upload_results_artifact.outputs.artifact-url }})
7 changes: 5 additions & 2 deletions evaluation/integration_tests/run_infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from evaluation.utils.shared import (
EvalMetadata,
EvalOutput,
codeact_user_response,
make_metadata,
prepare_dataset,
reset_logger_for_multiprocessing,
run_evaluation,
update_llm_config_for_completions_logging,
)
from evaluation.utils.shared import (
codeact_user_response as fake_user_response,
)
from openhands.controller.state.state import State
from openhands.core.config import (
AgentConfig,
Expand All @@ -31,7 +33,8 @@
from openhands.utils.async_utils import call_async_from_sync

FAKE_RESPONSES = {
'CodeActAgent': codeact_user_response,
'CodeActAgent': fake_user_response,
'DelegatorAgent': fake_user_response,
}


Expand Down
157 changes: 96 additions & 61 deletions openhands/controller/agent_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ def __init__(
self.id = sid
self.agent = agent
self.headless_mode = headless_mode
self.is_delegate = is_delegate

# subscribe to the event stream
# the event stream must be set before maybe subscribing to it
self.event_stream = event_stream
self.event_stream.subscribe(
EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id
)

# subscribe to the event stream if this is not a delegate
if not self.is_delegate:
self.event_stream.subscribe(
EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id
)

# state from the previous session, state from a parent agent, or a fresh state
self.set_initial_state(
Expand Down Expand Up @@ -229,6 +233,14 @@ def should_step(self, event: Event) -> bool:
if isinstance(event, Action):
if isinstance(event, MessageAction) and event.source == EventSource.USER:
return True
if (
isinstance(event, MessageAction)
and self.get_agent_state() != AgentState.AWAITING_USER_INPUT
):
# TODO: this is fragile, but how else to check if eligible?
enyst marked this conversation as resolved.
Show resolved Hide resolved
return True
if isinstance(event, AgentDelegateAction):
return True
return False
if isinstance(event, Observation):
if isinstance(event, NullObservation) or isinstance(
Expand All @@ -244,9 +256,31 @@ def on_event(self, event: Event) -> None:
Args:
event (Event): The incoming event to process.
"""

print(f'CONTROLLER {self.id}:on_event: {event.__class__.__name__}({event.id})')

# If we have a delegate that is not finished or errored, forward events to it
if self.delegate is not None:
delegate_state = self.delegate.get_agent_state()
if delegate_state not in (
AgentState.FINISHED,
AgentState.ERROR,
AgentState.REJECTED,
):
# Forward the event to delegate and skip parent processing
asyncio.get_event_loop().run_until_complete(
self.delegate._on_event(event)
)
return
else:
# delegate is done
self.end_delegate()
return

asyncio.get_event_loop().run_until_complete(self._on_event(event))

async def _on_event(self, event: Event) -> None:
print(f'CONTROLLER {self.id}:_on_event: {event.__class__.__name__}({event.id})')
if hasattr(event, 'hidden') and event.hidden:
return

Expand Down Expand Up @@ -316,6 +350,11 @@ async def _handle_observation(self, observation: Observation) -> None:
elif isinstance(observation, ErrorObservation):
if self.state.agent_state == AgentState.ERROR:
self.state.metrics.merge(self.state.local_metrics)
elif isinstance(observation, AgentStateChangedObservation):
# if this is a delegate, check for stop conditions
if self.is_delegate:
delegate_state = observation.agent_state
self.log('debug', f'Delegate state: {delegate_state}')
enyst marked this conversation as resolved.
Show resolved Hide resolved

async def _handle_message_action(self, action: MessageAction) -> None:
"""Handles message actions from the event stream.
Expand Down Expand Up @@ -491,7 +530,7 @@ async def start_delegate(self, action: AgentDelegateAction) -> None:
f'start delegate, creating agent {delegate_agent.name} using LLM {llm}',
)

self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER, self.id)
# Create the delegate with is_delegate=True so it does NOT subscribe directly
self.delegate = AgentController(
sid=self.id + '-delegate',
agent=delegate_agent,
Expand All @@ -504,10 +543,62 @@ async def start_delegate(self, action: AgentDelegateAction) -> None:
is_delegate=True,
headless_mode=self.headless_mode,
)

await self.delegate.set_agent_state_to(AgentState.RUNNING)

def end_delegate(self) -> None:
print(f'CONTROLLER {self.id}:end_delegate')
if self.delegate is None:
return

delegate_state = self.delegate.get_agent_state()

# update iteration that shall be shared across agents
self.state.iteration = self.delegate.state.iteration

# close the delegate controller before adding new events
# then add the delegate observation
asyncio.get_event_loop().run_until_complete(self.delegate.close())

if delegate_state in (AgentState.FINISHED, AgentState.REJECTED):
# retrieve delegate result
delegate_outputs = (
self.delegate.state.outputs if self.delegate.state else {}
)

# prepare delegate result observation
# TODO: replace this with AI-generated summary (#2395)
formatted_output = ', '.join(
f'{key}: {value}' for key, value in delegate_outputs.items()
)
content = (
f'{self.delegate.agent.name} finishes task with {formatted_output}'
)

# emit the delegate result observation
obs = AgentDelegateObservation(outputs=delegate_outputs, content=content)
self.event_stream.add_event(obs, EventSource.AGENT)
else:
# delegate state is ERROR
# emit AgentDelegateObservation with error content
delegate_outputs = (
self.delegate.state.outputs if self.delegate.state else {}
)
content = (
f'{self.delegate.agent.name} encountered an error during execution.'
)

# emit the delegate result observation
obs = AgentDelegateObservation(outputs=delegate_outputs, content=content)
self.event_stream.add_event(obs, EventSource.AGENT)

# unset delegate so parent can resume normal handling
self.delegate = None
self.delegateAction = None

async def _step(self) -> None:
"""Executes a single step of the parent or delegate agent. Detects stuck agents and limits on the number of iterations and the task budget."""
print(f'CONTROLLER {self.id}:_step')
if self.get_agent_state() != AgentState.RUNNING:
return

Expand Down Expand Up @@ -614,63 +705,7 @@ async def _step(self) -> None:
async def _delegate_step(self) -> None:
"""Executes a single step of the delegate agent."""
await self.delegate._step() # type: ignore[union-attr]
assert self.delegate is not None
delegate_state = self.delegate.get_agent_state()
self.log('debug', f'Delegate state: {delegate_state}')
if delegate_state == AgentState.ERROR:
# update iteration that shall be shared across agents
self.state.iteration = self.delegate.state.iteration

# emit AgentDelegateObservation to mark delegate termination due to error
delegate_outputs = (
self.delegate.state.outputs if self.delegate.state else {}
)
content = (
f'{self.delegate.agent.name} encountered an error during execution.'
)
obs = AgentDelegateObservation(outputs=delegate_outputs, content=content)
self.event_stream.add_event(obs, EventSource.AGENT)

# close the delegate upon error
await self.delegate.close()

# resubscribe parent when delegate is finished
self.event_stream.subscribe(
EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id
)
self.delegate = None
self.delegateAction = None

elif delegate_state in (AgentState.FINISHED, AgentState.REJECTED):
self.log('debug', 'Delegate agent has finished execution')
# retrieve delegate result
outputs = self.delegate.state.outputs if self.delegate.state else {}

# update iteration that shall be shared across agents
self.state.iteration = self.delegate.state.iteration

# close delegate controller: we must close the delegate controller before adding new events
await self.delegate.close()

# resubscribe parent when delegate is finished
self.event_stream.subscribe(
EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id
)

# update delegate result observation
# TODO: replace this with AI-generated summary (#2395)
formatted_output = ', '.join(
f'{key}: {value}' for key, value in outputs.items()
)
content = (
f'{self.delegate.agent.name} finishes task with {formatted_output}'
)
obs = AgentDelegateObservation(outputs=outputs, content=content)

# clean up delegate status
self.delegate = None
self.delegateAction = None
self.event_stream.add_event(obs, EventSource.AGENT)
return

async def _handle_traffic_control(
Expand Down
1 change: 1 addition & 0 deletions openhands/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def run_controller(
event_stream.add_event(initial_user_action, EventSource.USER)

def on_event(event: Event):
print(f'MAIN:on_event: {event.__class__.__name__}({event.id})')
if isinstance(event, AgentStateChangedObservation):
if event.agent_state == AgentState.AWAITING_USER_INPUT:
if exit_on_message:
Expand Down
8 changes: 6 additions & 2 deletions openhands/events/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ class EventStream:
_queue: queue.Queue[Event]
_queue_thread: threading.Thread
_queue_loop: asyncio.AbstractEventLoop | None
_thread_pools: dict[str, dict[str, ThreadPoolExecutor]]
_thread_loops: dict[str, dict[str, asyncio.AbstractEventLoop]]

def __init__(self, sid: str, file_store: FileStore):
self.sid = sid
self.file_store = file_store
self._stop_flag = threading.Event()
self._queue: queue.Queue[Event] = queue.Queue()
self._thread_pools: dict[str, dict[str, ThreadPoolExecutor]] = {}
self._thread_loops: dict[str, dict[str, asyncio.AbstractEventLoop]] = {}
self._thread_pools = {}
self._thread_loops = {}
self._queue_loop = None
self._queue_thread = threading.Thread(target=self._run_queue_loop)
self._queue_thread.daemon = True
Expand Down Expand Up @@ -268,6 +269,7 @@ def add_event(self, event: Event, source: EventSource):
data = event_to_dict(event)
if event.id is not None:
self.file_store.write(self._get_filename_for_id(event.id), json.dumps(data))
print(f'EVENTSTREAM:add_event: {event.__class__.__name__}({event.id})')
self._queue.put(event)

def _run_queue_loop(self):
Expand All @@ -285,6 +287,8 @@ async def _process_queue(self):
event = self._queue.get(timeout=0.1)
except queue.Empty:
continue

# pass each event to each callback in order
for key in sorted(self._subscribers.keys()):
callbacks = self._subscribers[key]
for callback_id in callbacks:
Expand Down
Loading
Loading