fix: handle multiple task outputs correctly in conditional tasks #1937

Open · wants to merge 7 commits into main
src/crewai/crew.py · 31 additions, 8 deletions
@@ -378,6 +378,22 @@ def validate_end_with_at_most_one_async_task(self):

return self

+    @model_validator(mode="after")
+    def validate_must_have_non_conditional_task(self) -> "Crew":
+        """Ensure that a crew has at least one non-conditional task."""
+        if not self.tasks:
+            return self
+        non_conditional_count = sum(
+            1 for task in self.tasks if not isinstance(task, ConditionalTask)
+        )
+        if non_conditional_count == 0:
+            raise PydanticCustomError(
+                "only_conditional_tasks",
+                "Crew must include at least one non-conditional task",
+                {},
+            )
+        return self

@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -437,6 +453,8 @@ def validate_context_no_future_tasks(self):
)
return self

+
+
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -739,6 +757,7 @@ def _execute_tasks(
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
+                    task_outputs.append(skipped_task_output)
continue

if task.async_execution:
@@ -762,7 +781,7 @@
context=context,
tools=tools_for_task,
)
-                task_outputs = [task_output]
+                task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)

@@ -783,7 +802,7 @@ def _handle_conditional_task(
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()

-        previous_output = task_outputs[task_index - 1] if task_outputs else None
+        previous_output = task_outputs[-1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -905,11 +924,15 @@ def _process_task_result(self, task: Task, output: TaskOutput) -> None:
)

def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
-        if len(task_outputs) != 1:
-            raise ValueError(
-                "Something went wrong. Kickoff should return only one task output."
-            )
-        final_task_output = task_outputs[0]
+        if not task_outputs:
+            raise ValueError("No task outputs available to create crew output.")
+
+        # Filter out empty outputs and get the last valid one as the main output
+        valid_outputs = [t for t in task_outputs if t.raw]
+        if not valid_outputs:
+            raise ValueError("No valid task outputs available to create crew output.")
+        final_task_output = valid_outputs[-1]

final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -918,7 +941,7 @@ def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
-            tasks_output=[task.output for task in self.tasks if task.output],
+            tasks_output=task_outputs,
token_usage=token_usage,
)

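For context, here is a minimal, self-contained sketch of the output-collection semantics the diff above moves to: every task contributes an entry to the running list of outputs (a skipped conditional task contributes an empty placeholder), and the crew's final output is the last non-empty entry. The Output, collect_outputs, and final_output names below are illustrative stand-ins, not crewai's actual API.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Output:
    raw: str

def collect_outputs(results: List[Optional[str]]) -> List[Output]:
    # None models a skipped conditional task; it still occupies a slot.
    return [Output(raw=r if r is not None else "") for r in results]

def final_output(outputs: List[Output]) -> Output:
    # Mirrors the new _create_crew_output: drop empty outputs, then take
    # the last valid one as the crew's main output.
    valid = [o for o in outputs if o.raw]
    if not valid:
        raise ValueError("No valid task outputs available.")
    return valid[-1]

outputs = collect_outputs(["First success output", None, "Third task executed"])
assert len(outputs) == 3  # the skipped task keeps its slot
assert final_output(outputs).raw == "Third task executed"
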
tests/crew_test.py · 222 additions, 0 deletions

@@ -48,6 +48,39 @@
)


def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True

conditional1 = ConditionalTask(
description="Conditional task 1",
expected_output="Output 1",
agent=researcher,
condition=condition_func,
)
conditional2 = ConditionalTask(
description="Conditional task 2",
expected_output="Output 2",
agent=researcher,
condition=condition_func,
)
conditional3 = ConditionalTask(
description="Conditional task 3",
expected_output="Output 3",
agent=researcher,
condition=condition_func,
)

with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
tasks=[conditional1, conditional2, conditional3],
)

def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -1989,6 +2022,195 @@ def cache_func(args, result):
assert result.raw == "3"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_uses_last_output():
"""Test that conditional tasks use the last task output for condition evaluation."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()

def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()

conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)

conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_succeeds,
)

crew = Crew(
agents=[researcher, writer],
tasks=[task1, conditional_task1, conditional_task2],
)

# Mock outputs for tasks
mock_first = TaskOutput(
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
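    # Not wired into the mocks below: the crew itself produces the empty
    # output for a skipped conditional task; this object only documents
    # the expected shape of that placeholder.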
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)

# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]

with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()

# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked

# Verify outputs collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third conditional executed (condition mocked to succeed)

@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)

def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()

def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()

task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)

task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_always_met,
)

crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)

# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
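    # Not wired into the mocks below: the crew generates the empty output
    # for the skipped conditional task itself; this object only documents
    # its expected shape.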
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)

# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]

with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()

# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked

# Verify task output collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed

Collaborator review comment on this test:

This test is basically the exact same as the one above it, so we don't really need it.

Instead, let's change this test to verify that the result we get from crew.kickoff() properly contains all the task outputs. For example, if we have 3 tasks where 1 is normal and 2 are conditional, and 1 of those conditional tasks is never triggered, the task outputs from crew.kickoff() should only have 2 task outputs. (A sketch of such a revised test follows this test below.)

@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)

def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()

def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()

task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)

task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)

crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)

# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)

# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
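
A rough sketch of the revised test the reviewer asks for above. Its assertions assume the reviewer's proposed semantics, in which a skipped conditional task contributes no entry at all to tasks_output; note that this differs from the placeholder behavior the diff currently implements (an empty output is appended for skipped tasks), so the test name and the expected count of 2 are illustrative of the reviewer's expectation, not of this PR as written.

@pytest.mark.vcr(filter_headers=["authorization"])
def test_skipped_conditional_task_excluded_from_outputs():
    """Sketch: a never-triggered conditional task should not appear in tasks_output."""
    normal = Task(
        description="Normal task",
        expected_output="Normal output",
        agent=researcher,
    )
    never_runs = ConditionalTask(
        description="Conditional task that is never triggered",
        expected_output="Skipped output",
        agent=writer,
        condition=lambda output: False,  # condition is never met
    )
    always_runs = ConditionalTask(
        description="Conditional task that always triggers",
        expected_output="Conditional output",
        agent=writer,
        condition=lambda output: True,  # condition is always met
    )
    crew = Crew(agents=[researcher, writer], tasks=[normal, never_runs, always_runs])

    mock_output = TaskOutput(
        description="Mock output",
        raw="Success output",
        agent=researcher.role,
    )
    with patch.object(Task, "execute_sync", return_value=mock_output) as mock_execute:
        result = crew.kickoff()

    assert mock_execute.call_count == 2  # the skipped conditional never executes
    # Reviewer's expectation: only the two executed tasks appear in the result.
    assert len(result.tasks_output) == 2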

@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch