diff --git a/rasa/core/policies/flow_policy.py b/rasa/core/policies/flow_policy.py index 3e47fe7b6a94..50a8e70579b1 100644 --- a/rasa/core/policies/flow_policy.py +++ b/rasa/core/policies/flow_policy.py @@ -186,9 +186,10 @@ def find_startable_flow( The predicted action and the events to run. """ for flow in flows.underlying_flows: - if not flow.steps: + first_step = flow.start_step() + if not first_step: continue - first_step = flow.steps[0] + if isinstance( first_step, IntentFlowStep ) and first_step.intent == tracker.latest_message.intent.get( diff --git a/rasa/shared/core/flows/flow.py b/rasa/shared/core/flows/flow.py index 4a2a5b841cc4..712dd32ed6c5 100644 --- a/rasa/shared/core/flows/flow.py +++ b/rasa/shared/core/flows/flow.py @@ -1,11 +1,73 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Protocol, Text +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Protocol, Set, Text +from rasa.shared.exceptions import RasaException import rasa.shared.utils.io +class UnreachableFlowStepException(RasaException): + """Raised when a flow step is unreachable.""" + + def __init__(self, step: FlowStep, flow: Flow) -> None: + """Initializes the exception.""" + self.step = step + self.flow = flow + + def __str__(self) -> Text: + """Return a string representation of the exception.""" + return ( + f"Step '{self.step.id}' in flow '{self.flow.id}' can not be reached from the start step. " + f"Please make sure that all steps can be reached from the start step, e.g. by " + f"checking that another step points to this step." + ) + + +class UnresolvedFlowStepIdException(RasaException): + """Raised when a flow step is referenced but it's id can not be resolved.""" + + def __init__( + self, step_id: Text, flow: Flow, referenced_from: Optional[FlowStep] + ) -> None: + """Initializes the exception.""" + self.step_id = step_id + self.flow = flow + self.referenced_from = referenced_from + + def __str__(self) -> Text: + """Return a string representation of the exception.""" + if self.referenced_from: + exception_message = ( + f"Step with id '{self.step_id}' could not be resolved. " + f"'Step '{self.referenced_from.id}' in flow '{self.flow.id}' " + f"referenced this step but it does not exist. " + ) + else: + exception_message = ( + f"Step '{self.step_id}' in flow '{self.flow.id}' can not be resolved. " + ) + + return exception_message + ( + f"Please make sure that the step is defined in the same flow." + ) + + +class UnresolvedFlowException(RasaException): + """Raised when a flow is referenced but it's id can not be resolved.""" + + def __init__(self, flow_id: Text) -> None: + """Initializes the exception.""" + self.flow_id = flow_id + + def __str__(self) -> Text: + """Return a string representation of the exception.""" + return ( + f"Flow '{self.flow_id}' can not be resolved. " + f"Please make sure that the flow is defined." + ) + + class FlowsList: """Represents the configuration of a list of flow. @@ -80,25 +142,27 @@ def flow_by_id(self, id: Optional[Text]) -> Optional[Flow]: else: return None - def steps_of_flow(self, flow_id: Text) -> List[FlowStep]: - """Return all the steps of the flow.""" - flow = self.flow_by_id(flow_id) - if flow: - return flow.steps - return [] - def first_step(self, flow_id: Text) -> Optional[FlowStep]: """Return the first step of the flow.""" - steps = self.steps_of_flow(flow_id) - return steps[0] if steps else None + flow = self.flow_by_id(flow_id) + return flow.start_step() if flow else None def step_by_id(self, step_id: Text, flow_id: Text) -> FlowStep: """Return the step with the given id.""" - for step in self.steps_of_flow(flow_id): - if step.id == step_id: - return step - else: - raise ValueError(f"Step with id '{step_id}' not found.") + flow = self.flow_by_id(flow_id) + if not flow: + raise UnresolvedFlowException(flow_id) + + step = flow.step_for_id(step_id) + if not step: + raise UnresolvedFlowStepIdException(step_id, flow) + + return step + + def validate(self) -> None: + """Validate the flows.""" + for flow in self.underlying_flows: + flow.validate() @dataclass @@ -110,6 +174,7 @@ class Flow: description: Optional[Text] """The description of the flow.""" steps: List[FlowStep] + """The steps of the flow.""" @staticmethod def from_json(flow_id: Text, flow_config: Dict[Text, Any]) -> Flow: @@ -142,6 +207,61 @@ def as_json(self) -> Dict[Text, Any]: "steps": [step.as_json() for step in self.steps], } + def validate(self) -> None: + """Validates the flow configuration. + + This ensures that the flow semantically makes sense. E.g. it + checks: + -""" + + self._validate_all_next_ids_are_availble_steps() + self._validate_all_steps_can_be_reached() + + def _validate_all_next_ids_are_availble_steps(self) -> None: + """Validates that all next links point to existing steps.""" + + available_steps = {step.id for step in self.steps} + for step in self.steps: + for link in step.next.links: + if link.target not in available_steps: + raise UnresolvedFlowStepIdException(link.target, self, step) + + def _validate_all_steps_can_be_reached(self) -> None: + """Validates that all steps can be reached from the start step.""" + + def _reachable_steps( + step: Optional[FlowStep], reached_steps: Set[Text] + ) -> Set[FlowStep]: + """Validates that the given step can be reached from the start step.""" + if step is None or step.id in reached_steps: + return reached_steps + + reached_steps.add(step.id) + for link in step.next.links: + reached_steps = _reachable_steps( + self.step_for_id(link.target), reached_steps + ) + return reached_steps + + reached_steps = _reachable_steps(self.start_step(), set()) + + for step in self.steps: + if step.id not in reached_steps: + raise UnreachableFlowStepException(step, self) + + def step_for_id(self, step_id: Text) -> Optional[FlowStep]: + """Returns the step with the given id.""" + for step in self.steps: + if step.id == step_id: + return step + return None + + def start_step(self) -> Optional[FlowStep]: + """Returns the start step of this flow.""" + if len(self.steps) == 0: + return None + return self.steps[0] + def step_from_json(flow_step_config: Dict[Text, Any]) -> FlowStep: """Used to read flow steps from parsed YAML. diff --git a/rasa/shared/core/flows/yaml_flows_io.py b/rasa/shared/core/flows/yaml_flows_io.py index 899a91e30030..2ae963ae1a71 100644 --- a/rasa/shared/core/flows/yaml_flows_io.py +++ b/rasa/shared/core/flows/yaml_flows_io.py @@ -56,7 +56,10 @@ def read_from_string(cls, string: Text, skip_validation: bool = False) -> FlowsL yaml_content = rasa.shared.utils.io.read_yaml(string) - return FlowsList.from_json(yaml_content.get(KEY_FLOWS, {})) + flows = FlowsList.from_json(yaml_content.get(KEY_FLOWS, {})) + if not skip_validation: + flows.validate() + return flows class YamlFlowsWriter: