Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add unique ID to flow states #1888

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions docs/concepts/flows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class ExampleFlow(Flow):
@start()
def generate_city(self):
print("Starting flow")
# Each flow state automatically gets a unique ID
print(f"Flow State ID: {self.state['id']}")

response = completion(
model=self.model,
Expand All @@ -47,6 +49,8 @@ class ExampleFlow(Flow):
)

random_city = response["choices"][0]["message"]["content"]
# Store the city in our state
self.state["city"] = random_city
print(f"Random City: {random_city}")

return random_city
Expand All @@ -64,6 +68,8 @@ class ExampleFlow(Flow):
)

fun_fact = response["choices"][0]["message"]["content"]
# Store the fun fact in our state
self.state["fun_fact"] = fun_fact
return fun_fact


Expand All @@ -76,7 +82,15 @@ print(f"Generated fun fact: {result}")

In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.

When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console.
Each Flow instance automatically receives a unique identifier (UUID) in its state, which helps track and manage flow executions. The state can also store additional data (like the generated city and fun fact) that persists throughout the flow's execution.

When you run the Flow, it will:
1. Generate a unique ID for the flow state
2. Generate a random city and store it in the state
3. Generate a fun fact about that city and store it in the state
4. Print the results to the console

The state's unique ID and stored data can be useful for tracking flow executions and maintaining context between tasks.

**Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API.

Expand Down Expand Up @@ -207,14 +221,17 @@ allowing developers to choose the approach that best fits their application's ne

In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
Even with unstructured states, CrewAI Flows automatically generates and maintains a unique identifier (UUID) for each state instance.

```python Code
from crewai.flow.flow import Flow, listen, start

class UntructuredExampleFlow(Flow):
class UnstructuredExampleFlow(Flow):

@start()
def first_method(self):
# The state automatically includes an 'id' field
print(f"State ID: {self.state['id']}")
self.state.message = "Hello from structured flow"
self.state.counter = 0

Expand All @@ -231,10 +248,12 @@ class UntructuredExampleFlow(Flow):
print(f"State after third_method: {self.state}")


flow = UntructuredExampleFlow()
flow = UnstructuredExampleFlow()
flow.kickoff()
```

**Note:** The `id` field is automatically generated and preserved throughout the flow's execution. You don't need to manage or set it manually, and it will be maintained even when updating the state with new data.

**Key Points:**

- **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints.
Expand All @@ -245,12 +264,15 @@ flow.kickoff()
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.

Each state in CrewAI Flows automatically receives a unique identifier (UUID) to help track and manage state instances. This ID is automatically generated and managed by the Flow system.

```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
# Note: 'id' field is automatically added to all states
counter: int = 0
message: str = ""

Expand All @@ -259,6 +281,8 @@ class StructuredExampleFlow(Flow[ExampleState]):

@start()
def first_method(self):
# Access the auto-generated ID if needed
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"

@listen(first_method)
Expand Down Expand Up @@ -628,4 +652,4 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
></iframe>
74 changes: 61 additions & 13 deletions src/crewai/flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
Union,
cast,
)
from uuid import uuid4

from blinker import Signal
from pydantic import BaseModel, ValidationError
from pydantic import BaseModel, Field, ValidationError

from crewai.flow.flow_events import (
FlowFinishedEvent,
Expand All @@ -27,7 +28,12 @@
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry

T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])

class FlowState(BaseModel):
"""Base model for all flow states, ensuring each state has a unique ID."""
id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the flow state")

T = TypeVar("T", bound=Union[FlowState, Dict[str, Any]])


def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
Expand Down Expand Up @@ -377,14 +383,37 @@ def __init__(self) -> None:
self._methods[method_name] = getattr(self, method_name)

def _create_initial_state(self) -> T:
# Handle case where initial_state is None but we have a type parameter
if self.initial_state is None and hasattr(self, "_initial_state_T"):
return self._initial_state_T() # type: ignore
state_type = getattr(self, "_initial_state_T")
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
return state_type() # type: ignore
elif issubclass(state_type, BaseModel):
# Create a new type that includes the ID field
class StateWithId(state_type, FlowState): # type: ignore
pass
return StateWithId() # type: ignore

# Handle case where no initial state is provided
if self.initial_state is None:
return {} # type: ignore
elif isinstance(self.initial_state, type):
return self.initial_state()
else:
return self.initial_state
return {"id": str(uuid4())} # type: ignore

# Handle case where initial_state is a type (class)
if isinstance(self.initial_state, type):
if issubclass(self.initial_state, FlowState):
return self.initial_state() # type: ignore
elif issubclass(self.initial_state, BaseModel):
# Create a new type that includes the ID field
class StateWithId(self.initial_state, FlowState): # type: ignore
pass
return StateWithId() # type: ignore

# Handle dictionary case
if isinstance(self.initial_state, dict) and "id" not in self.initial_state:
self.initial_state["id"] = str(uuid4())

return self.initial_state # type: ignore

@property
def state(self) -> T:
Expand All @@ -396,10 +425,17 @@ def method_outputs(self) -> List[Any]:
return self._method_outputs

def _initialize_state(self, inputs: Dict[str, Any]) -> None:
if isinstance(self._state, BaseModel):
if isinstance(self._state, dict):
# Preserve the ID when updating unstructured state
current_id = self._state.get("id")
self._state.update(inputs)
if current_id:
self._state["id"] = current_id
elif "id" not in self._state:
self._state["id"] = str(uuid4())
elif isinstance(self._state, BaseModel):
# Structured state
try:

def create_model_with_extra_forbid(
base_model: Type[BaseModel],
) -> Type[BaseModel]:
Expand All @@ -409,16 +445,28 @@ class ModelWithExtraForbid(base_model): # type: ignore

return ModelWithExtraForbid

# Get current state as dict, preserving the ID if it exists
state_model = cast(BaseModel, self._state)
current_state = (
state_model.model_dump()
if hasattr(state_model, "model_dump")
else state_model.dict()
if hasattr(state_model, "dict")
else {
k: v
for k, v in state_model.__dict__.items()
if not k.startswith("_")
}
)

ModelWithExtraForbid = create_model_with_extra_forbid(
self._state.__class__
)
self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
T, ModelWithExtraForbid(**{**current_state, **inputs})
)
except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")

Expand Down
76 changes: 76 additions & 0 deletions tests/flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio

import pytest
from pydantic import BaseModel

from crewai.flow.flow import Flow, and_, listen, or_, router, start

Expand Down Expand Up @@ -265,6 +266,81 @@ def step_2(self):
assert flow.counter == 2


def test_flow_uuid_unstructured():
"""Test that unstructured (dictionary) flow states automatically get a UUID that persists."""
initial_id = None

class UUIDUnstructuredFlow(Flow):
@start()
def first_method(self):
nonlocal initial_id
# Verify ID is automatically generated
assert "id" in self.state
assert isinstance(self.state["id"], str)
# Store initial ID for comparison
initial_id = self.state["id"]
# Add some data to trigger state update
self.state["data"] = "example"

@listen(first_method)
def second_method(self):
# Ensure the ID persists after state updates
assert "id" in self.state
assert self.state["id"] == initial_id
# Update state again to verify ID preservation
self.state["more_data"] = "test"
assert self.state["id"] == initial_id

flow = UUIDUnstructuredFlow()
flow.kickoff()
# Verify ID persists after flow completion
assert flow.state["id"] == initial_id
# Verify UUID format (36 characters, including hyphens)
assert len(flow.state["id"]) == 36


def test_flow_uuid_structured():
"""Test that structured (Pydantic) flow states automatically get a UUID that persists."""
initial_id = None

class MyStructuredState(BaseModel):
counter: int = 0
message: str = "initial"

class UUIDStructuredFlow(Flow[MyStructuredState]):
@start()
def first_method(self):
nonlocal initial_id
# Verify ID is automatically generated and accessible as attribute
assert hasattr(self.state, "id")
assert isinstance(self.state.id, str)
# Store initial ID for comparison
initial_id = self.state.id
# Update some fields to trigger state changes
self.state.counter += 1
self.state.message = "updated"

@listen(first_method)
def second_method(self):
# Ensure the ID persists after state updates
assert hasattr(self.state, "id")
assert self.state.id == initial_id
# Update state again to verify ID preservation
self.state.counter += 1
self.state.message = "final"
assert self.state.id == initial_id

flow = UUIDStructuredFlow()
flow.kickoff()
# Verify ID persists after flow completion
assert flow.state.id == initial_id
# Verify UUID format (36 characters, including hyphens)
assert len(flow.state.id) == 36
# Verify other state fields were properly updated
assert flow.state.counter == 2
assert flow.state.message == "final"


def test_router_with_multiple_conditions():
"""Test a router that triggers when any of multiple steps complete (OR condition),
and another router that triggers only after all specified steps complete (AND condition).
Expand Down
Loading