diff --git a/cmd/router/main.go b/cmd/router/main.go
index 24a96ff339..d7733aac3c 100644
--- a/cmd/router/main.go
+++ b/cmd/router/main.go
@@ -104,40 +104,44 @@ type EnsembleStepOutput struct {
 	StepStatusCode int
 }
 
+// handleSplitterOrSwitchNode executes the step chosen for a Splitter or Switch node and, when the step is a hard dependency that failed, logs that before returning the step's response and status code.
+func handleSplitterOrSwitchNode(route *v1alpha1.InferenceStep, graph v1alpha1.InferenceGraphSpec, input []byte, headers http.Header) ([]byte, int, error) {
+	var statusCode int
+	var responseBytes []byte
+	var err error
+	stepType := "serviceUrl"
+	if route.NodeName != "" {
+		stepType = "node"
+	}
+	log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)
+	if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil {
+		return nil, 500, err
+	}
+
+	if route.Dependency == v1alpha1.Hard && !isSuccessFul(statusCode) {
+		log.Info("This step is a hard dependency and it is unsuccessful", "stepName", route.StepName, "statusCode", statusCode)
+	}
+	return responseBytes, statusCode, nil
+}
+
 func routeStep(nodeName string, graph v1alpha1.InferenceGraphSpec, input []byte, headers http.Header) ([]byte, int, error) {
 	defer timeTrack(time.Now(), "node", nodeName)
 	currentNode := graph.Nodes[nodeName]
 
 	if currentNode.RouterType == v1alpha1.Splitter {
-		return executeStep(pickupRoute(currentNode.Steps), graph, input, headers)
+		route := pickupRoute(currentNode.Steps)
+		return handleSplitterOrSwitchNode(route, graph, input, headers)
 	}
 	if currentNode.RouterType == v1alpha1.Switch {
-		var statusCode int
-		var responseBytes []byte
 		var err error
 		route := pickupRouteByCondition(input, currentNode.Steps)
 		if route == nil {
 			errorMessage := "None of the routes matched with the switch condition"
+			err = errors.New(errorMessage)
 			log.Error(err, errorMessage)
-			return nil, 404, errors.New(errorMessage)
-		}
-		stepType := "serviceUrl"
-		if route.NodeName != "" {
-			stepType = "node"
+			return nil, 404, err
 		}
-		log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)
-		if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil {
-			return nil, 500, err
-		}
-
-		if route.Dependency == v1alpha1.Hard {
-			if !isSuccessFul(statusCode) {
-				log.Info("This step is a hard dependency and it is unsuccessful", "stepName", route.StepName, "statusCode", statusCode)
-				// Stop the execution right away if step is hard dep and is unsuccessful
-				return responseBytes, statusCode, nil
-			}
-		}
-		return responseBytes, statusCode, nil
+		return handleSplitterOrSwitchNode(route, graph, input, headers)
 	}
 	if currentNode.RouterType == v1alpha1.Ensemble {
 		ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps))
@@ -221,12 +225,12 @@ func routeStep(nodeName string, graph v1alpha1.InferenceGraphSpec, input []byte,
 			return nil, 500, err
 		}
 		/*
-		Only if a step is a hard dependency, we will check for its success.
+			Only if a step is a hard dependency, we will check for its success.
 		*/
 		if step.Dependency == v1alpha1.Hard {
 			if !isSuccessFul(statusCode) {
 				log.Info("This step is a hard dependency and it is unsuccessful", "stepName", step.StepName, "statusCode", statusCode)
-				// Stop the execution right away if step is hard dep and is unsuccessful
+				// Stop the execution of sequence right away if step is a hard dependency and is unsuccessful
 				return responseBytes, statusCode, nil
 			}
 		}
diff --git a/test/e2e/graph/test-resources/ig_test_switch_scenario_10.yaml b/test/e2e/graph/test-resources/ig_test_switch_scenario_10.yaml
new file mode 100644
index 0000000000..86beb59905
--- /dev/null
+++ b/test/e2e/graph/test-resources/ig_test_switch_scenario_10.yaml
@@ -0,0 +1,22 @@
+apiVersion: serving.kserve.io/v1alpha1
+kind: InferenceGraph
+metadata:
+  name: {{ graph_name }}
+spec:
+  nodes:
+    root:
+      routerType: Sequence
+      steps:
+      - name: "rootStep1"
+        nodeName: node1
+        dependency: Hard
+      - name: "rootStep2"
+        serviceName: {{ success_200_isvc_id }}
+    node1:
+      routerType: Splitter
+      steps:
+      - name: "node1Step1"
+        weight: 100
+        serviceName: {{ error_404_isvc_id }}
+        condition: "[@this].#(decision_picker==ERROR)"
+        dependency: Hard
\ No newline at end of file
diff --git a/test/e2e/graph/test-resources/ig_test_switch_scenario_9.yaml b/test/e2e/graph/test-resources/ig_test_switch_scenario_9.yaml
new file mode 100644
index 0000000000..e74f4c4f77
--- /dev/null
+++ b/test/e2e/graph/test-resources/ig_test_switch_scenario_9.yaml
@@ -0,0 +1,21 @@
+apiVersion: serving.kserve.io/v1alpha1
+kind: InferenceGraph
+metadata:
+  name: {{ graph_name }}
+spec:
+  nodes:
+    root:
+      routerType: Sequence
+      steps:
+      - name: "rootStep1"
+        nodeName: node1
+        dependency: Soft # This is the default setting but setting it explicitly for more clarity
+      - name: "rootStep2"
+        serviceName: {{ success_200_isvc_id }}
+    node1:
+      routerType: Splitter
+      steps:
+      - name: "node1Step1"
+        weight: 100
+        serviceName: {{ error_404_isvc_id }}
+        condition: "[@this].#(decision_picker==ERROR)"
\ No newline at end of file
diff --git a/test/e2e/graph/test_inference_graph.py b/test/e2e/graph/test_inference_graph.py
index fdafcd688b..ae915126a2 100644
--- a/test/e2e/graph/test_inference_graph.py
+++ b/test/e2e/graph/test_inference_graph.py
@@ -741,3 +741,124 @@ def test_ig_scenario8():
     kserve_client.delete_inference_graph(graph_name, KSERVE_TEST_NAMESPACE)
     kserve_client.delete(success_isvc_name, KSERVE_TEST_NAMESPACE)
     kserve_client.delete(error_isvc_name, KSERVE_TEST_NAMESPACE)
+
+
+@pytest.mark.graph
+@pytest.mark.kourier
+def test_ig_scenario9():
+    """
+    Scenario: Splitter graph where a match would happen for error node and then error would return but IG will continue
+    execution and call the next step in the flow as error step will be a soft dependency.
+    Expectation: IG will return response of success_isvc.
+    """
+    logging.info("Starting test test_ig_scenario9")
+    suffix = str(uuid.uuid4())[1:6]
+    success_isvc_name, error_isvc_name, success_isvc, error_isvc = setup_isvcs_for_test(
+        suffix
+    )
+    logging.info(f"success_isvc_name is {success_isvc_name}")
+    logging.info(f"error_isvc_name is {error_isvc_name}")
+
+    kserve_client = KServeClient(
+        config_file=os.environ.get("KUBECONFIG", "~/.kube/config")
+    )
+    kserve_client.create(success_isvc)
+    kserve_client.create(error_isvc)
+
+    # Create graph
+    graph_name = "-".join(["splitter-graph", suffix])
+
+    # Because we run from test/e2e location in run-e2e-tests.sh
+    deployment_yaml_path = os.path.join(
+        IG_TEST_RESOURCES_BASE_LOCATION, "ig_test_switch_scenario_9.yaml"
+    )
+
+    # Read YAML file
+    with open(deployment_yaml_path, "r") as stream:
+        file_content = stream.read()
+    resource_template = Template(file_content)
+    substitutions = {
+        "graph_name": graph_name,
+        "error_404_isvc_id": error_isvc_name,
+        "success_200_isvc_id": success_isvc_name,
+    }
+    resource_body_after_rendering = yaml.safe_load(
+        resource_template.render(substitutions)
+    )
+    create_ig_using_custom_object_api(resource_body_after_rendering)
+
+    kserve_client.wait_isvc_ready(success_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
+    kserve_client.wait_isvc_ready(error_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
+    kserve_client.wait_ig_ready(graph_name, namespace=KSERVE_TEST_NAMESPACE)
+
+    response = predict_ig(
+        graph_name,
+        os.path.join(IG_TEST_RESOURCES_BASE_LOCATION, "iris_input.json"),
+    )
+    assert response == {"message": "SUCCESS"}
+
+    kserve_client.delete_inference_graph(graph_name, KSERVE_TEST_NAMESPACE)
+    kserve_client.delete(success_isvc_name, KSERVE_TEST_NAMESPACE)
+    kserve_client.delete(error_isvc_name, KSERVE_TEST_NAMESPACE)
+
+
+@pytest.mark.graph
+@pytest.mark.kourier
+def test_ig_scenario10():
+    """
+    Scenario: Splitter graph where a match would happen for error node and then error would return and IG will NOT
+    continue execution and call the next step in the flow as error step will be a HARD dependency.
+    Expectation: IG will return the 404 error response of error_isvc.
+    """
+    logging.info("Starting test test_ig_scenario10")
+    suffix = str(uuid.uuid4())[1:6]
+    success_isvc_name, error_isvc_name, success_isvc, error_isvc = setup_isvcs_for_test(
+        suffix
+    )
+    logging.info(f"success_isvc_name is {success_isvc_name}")
+    logging.info(f"error_isvc_name is {error_isvc_name}")
+
+    kserve_client = KServeClient(
+        config_file=os.environ.get("KUBECONFIG", "~/.kube/config")
+    )
+    kserve_client.create(success_isvc)
+    kserve_client.create(error_isvc)
+
+    # Create graph
+    graph_name = "-".join(["splitter-graph", suffix])
+
+    # Because we run from test/e2e location in run-e2e-tests.sh
+    deployment_yaml_path = os.path.join(
+        IG_TEST_RESOURCES_BASE_LOCATION, "ig_test_switch_scenario_10.yaml"
+    )
+
+    # Read YAML file
+    with open(deployment_yaml_path, "r") as stream:
+        file_content = stream.read()
+    resource_template = Template(file_content)
+    substitutions = {
+        "graph_name": graph_name,
+        "error_404_isvc_id": error_isvc_name,
+        "success_200_isvc_id": success_isvc_name,
+    }
+    resource_body_after_rendering = yaml.safe_load(
+        resource_template.render(substitutions)
+    )
+    create_ig_using_custom_object_api(resource_body_after_rendering)
+
+    kserve_client.wait_isvc_ready(success_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
+    kserve_client.wait_isvc_ready(error_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
+    kserve_client.wait_ig_ready(graph_name, namespace=KSERVE_TEST_NAMESPACE)
+
+    with pytest.raises(HTTPError) as exc_info:
+        predict_ig(
+            graph_name,
+            os.path.join(IG_TEST_RESOURCES_BASE_LOCATION, "iris_input.json"),
+        )
+
+    assert exc_info.value.response.json() == {"detail": "Intentional 404 code"}
+    assert exc_info.value.response.status_code == 404
+
+    kserve_client.delete_inference_graph(graph_name, KSERVE_TEST_NAMESPACE)
+    kserve_client.delete(success_isvc_name, KSERVE_TEST_NAMESPACE)
+    kserve_client.delete(error_isvc_name, KSERVE_TEST_NAMESPACE)