Skip to content

Commit

Permalink
Fix: error response handling for splitter and switch nodes (kserve#3116)
Browse files Browse the repository at this point in the history
* Refactoring common code to a function

Signed-off-by: Rachit Chauhan <[email protected]>

* Added e2e test cases for splitter nodes

Signed-off-by: Rachit Chauhan <[email protected]>

* Fixed linting errors

Signed-off-by: Rachit Chauhan <[email protected]>

* Minor refactoring for error message

Signed-off-by: Rachit Chauhan <[email protected]>

---------

Signed-off-by: Rachit Chauhan <[email protected]>
  • Loading branch information
rachitchauhan43 authored Sep 20, 2023
1 parent 4142342 commit 8231072
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 23 deletions.
50 changes: 27 additions & 23 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,40 +104,44 @@ type EnsembleStepOutput struct {
StepStatusCode int
}

// handleSplitterORSwitchNode executes the single step selected by a Splitter
// or Switch node and returns the step's response body and status code.
//
// If step execution itself fails, the error is returned with a 500 status.
// If the step completes with a non-success status and is a Hard dependency,
// the failure is logged; the step's status code and body are returned either
// way, so the downstream error response propagates to the client unchanged.
func handleSplitterORSwitchNode(route *v1alpha1.InferenceStep, graph v1alpha1.InferenceGraphSpec, input []byte, headers http.Header) ([]byte, int, error) {
	var statusCode int
	var responseBytes []byte
	var err error

	// Distinguish nested-node steps from direct service-URL steps for logging.
	stepType := "serviceUrl"
	if route.NodeName != "" {
		stepType = "node"
	}
	log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)

	if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil {
		return nil, http.StatusInternalServerError, err
	}

	if route.Dependency == v1alpha1.Hard && !isSuccessFul(statusCode) {
		log.Info("This step is a hard dependency and it is unsuccessful", "stepName", route.StepName, "statusCode", statusCode)
	}
	return responseBytes, statusCode, nil
}

func routeStep(nodeName string, graph v1alpha1.InferenceGraphSpec, input []byte, headers http.Header) ([]byte, int, error) {
defer timeTrack(time.Now(), "node", nodeName)
currentNode := graph.Nodes[nodeName]

if currentNode.RouterType == v1alpha1.Splitter {
return executeStep(pickupRoute(currentNode.Steps), graph, input, headers)
route := pickupRoute(currentNode.Steps)
return handleSplitterORSwitchNode(route, graph, input, headers)
}
if currentNode.RouterType == v1alpha1.Switch {
var statusCode int
var responseBytes []byte
var err error
route := pickupRouteByCondition(input, currentNode.Steps)
if route == nil {
errorMessage := "None of the routes matched with the switch condition"
err = errors.New(errorMessage)
log.Error(err, errorMessage)
return nil, 404, errors.New(errorMessage)
}
stepType := "serviceUrl"
if route.NodeName != "" {
stepType = "node"
return nil, 404, err
}
log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)
if responseBytes, statusCode, err = executeStep(route, graph, input, headers); err != nil {
return nil, 500, err
}

if route.Dependency == v1alpha1.Hard {
if !isSuccessFul(statusCode) {
log.Info("This step is a hard dependency and it is unsuccessful", "stepName", route.StepName, "statusCode", statusCode)
// Stop the execution right away if step is hard dep and is unsuccessful
return responseBytes, statusCode, nil
}
}
return responseBytes, statusCode, nil
return handleSplitterORSwitchNode(route, graph, input, headers)
}
if currentNode.RouterType == v1alpha1.Ensemble {
ensembleRes := make([]chan EnsembleStepOutput, len(currentNode.Steps))
Expand Down Expand Up @@ -221,12 +225,12 @@ func routeStep(nodeName string, graph v1alpha1.InferenceGraphSpec, input []byte,
return nil, 500, err
}
/*
Only if a step is a hard dependency, we will check for its success.
Only if a step is a hard dependency, we will check for its success.
*/
if step.Dependency == v1alpha1.Hard {
if !isSuccessFul(statusCode) {
log.Info("This step is a hard dependency and it is unsuccessful", "stepName", step.StepName, "statusCode", statusCode)
// Stop the execution right away if step is hard dep and is unsuccessful
// Stop the execution of sequence right away if step is a hard dependency and is unsuccessful
return responseBytes, statusCode, nil
}
}
Expand Down
22 changes: 22 additions & 0 deletions test/e2e/graph/test-resources/ig_test_switch_scenario_10.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# E2E resource for scenario 10: the root Sequence calls node1 (a Splitter)
# as a HARD dependency; node1 always routes to the error service, so its
# 404 must stop the sequence before rootStep2 runs.
apiVersion: serving.kserve.io/v1alpha1
kind: InferenceGraph
metadata:
  name: {{ graph_name }}
spec:
  nodes:
    root:
      routerType: Sequence
      steps:
        - name: "rootStep1"
          nodeName: node1
          # Hard: a non-success status from node1 aborts the sequence.
          dependency: Hard
        - name: "rootStep2"
          # Should never be reached in this scenario.
          serviceName: {{ success_200_isvc_id }}
    node1:
      routerType: Splitter
      steps:
        - name: "node1Step1"
          # weight 100 means this step is always picked by the splitter.
          weight: 100
          serviceName: {{ error_404_isvc_id }}
          # NOTE(review): 'condition' is a Switch-router field; presumably a
          # leftover here since this node is a Splitter — verify it is ignored.
          condition: "[@this].#(decision_picker==ERROR)"
          dependency: Hard
21 changes: 21 additions & 0 deletions test/e2e/graph/test-resources/ig_test_switch_scenario_9.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# E2E resource for scenario 9: the root Sequence calls node1 (a Splitter)
# as a SOFT dependency; node1 always routes to the error service, but the
# sequence continues to rootStep2 and the graph returns its success response.
apiVersion: serving.kserve.io/v1alpha1
kind: InferenceGraph
metadata:
  name: {{ graph_name }}
spec:
  nodes:
    root:
      routerType: Sequence
      steps:
        - name: "rootStep1"
          nodeName: node1
          dependency: Soft # This is the default setting but setting it explicitly for more clarity
        - name: "rootStep2"
          # Reached despite node1's 404, because rootStep1 is a soft dependency.
          serviceName: {{ success_200_isvc_id }}
    node1:
      routerType: Splitter
      steps:
        - name: "node1Step1"
          # weight 100 means this step is always picked by the splitter.
          weight: 100
          serviceName: {{ error_404_isvc_id }}
          # NOTE(review): 'condition' is a Switch-router field; presumably a
          # leftover here since this node is a Splitter — verify it is ignored.
          condition: "[@this].#(decision_picker==ERROR)"
121 changes: 121 additions & 0 deletions test/e2e/graph/test_inference_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,3 +741,124 @@ def test_ig_scenario8():
kserve_client.delete_inference_graph(graph_name, KSERVE_TEST_NAMESPACE)
kserve_client.delete(success_isvc_name, KSERVE_TEST_NAMESPACE)
kserve_client.delete(error_isvc_name, KSERVE_TEST_NAMESPACE)


@pytest.mark.graph
@pytest.mark.kourier
def test_ig_scenario9():
    """
    Scenario: Splitter graph where a match would happen for error node and then error would return but IG will continue
    execution and call the next step in the flow as error step will be a soft dependency.
    Expectation: IG will return response of success_isvc.
    """
    logging.info("Starting test test_ig_scenario9")
    run_suffix = str(uuid.uuid4())[1:6]
    success_isvc_name, error_isvc_name, success_isvc, error_isvc = setup_isvcs_for_test(
        run_suffix
    )
    logging.info(f"success_isvc_name is {success_isvc_name}")
    logging.info(f"error_isvc_name is {error_isvc_name}")

    client = KServeClient(config_file=os.environ.get("KUBECONFIG", "~/.kube/config"))
    for isvc in (success_isvc, error_isvc):
        client.create(isvc)

    # Create graph
    graph_name = f"splitter-graph-{run_suffix}"

    # Because we run from test/e2e location in run-e2e-tests.sh
    deployment_yaml_path = os.path.join(
        IG_TEST_RESOURCES_BASE_LOCATION, "ig_test_switch_scenario_9.yaml"
    )

    # Render the graph manifest from its YAML template.
    with open(deployment_yaml_path, "r") as stream:
        rendered = Template(stream.read()).render(
            {
                "graph_name": graph_name,
                "error_404_isvc_id": error_isvc_name,
                "success_200_isvc_id": success_isvc_name,
            }
        )
    create_ig_using_custom_object_api(yaml.safe_load(rendered))

    client.wait_isvc_ready(success_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
    client.wait_isvc_ready(error_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
    client.wait_ig_ready(graph_name, namespace=KSERVE_TEST_NAMESPACE)

    # The error step is a soft dependency, so the graph falls through to the
    # success service and its payload is what the graph returns.
    response = predict_ig(
        graph_name,
        os.path.join(IG_TEST_RESOURCES_BASE_LOCATION, "iris_input.json"),
    )
    assert response == {"message": "SUCCESS"}

    client.delete_inference_graph(graph_name, KSERVE_TEST_NAMESPACE)
    client.delete(success_isvc_name, KSERVE_TEST_NAMESPACE)
    client.delete(error_isvc_name, KSERVE_TEST_NAMESPACE)


@pytest.mark.graph
@pytest.mark.kourier
def test_ig_scenario10():
    """
    Scenario: Splitter graph where a match would happen for the error node; the error is returned and IG will NOT
    continue execution to the next step in the flow, because the error step is a HARD dependency.
    Expectation: IG will return the 404 error response of error_isvc; success_isvc is never reached.
    """
    logging.info("Starting test test_ig_scenario10")
    suffix = str(uuid.uuid4())[1:6]
    success_isvc_name, error_isvc_name, success_isvc, error_isvc = setup_isvcs_for_test(
        suffix
    )
    logging.info(f"success_isvc_name is {success_isvc_name}")
    logging.info(f"error_isvc_name is {error_isvc_name}")

    kserve_client = KServeClient(
        config_file=os.environ.get("KUBECONFIG", "~/.kube/config")
    )
    kserve_client.create(success_isvc)
    kserve_client.create(error_isvc)

    # Create graph
    graph_name = "-".join(["splitter-graph", suffix])

    # Because we run from test/e2e location in run-e2e-tests.sh
    deployment_yaml_path = os.path.join(
        IG_TEST_RESOURCES_BASE_LOCATION, "ig_test_switch_scenario_10.yaml"
    )

    # Read the YAML template and render it with this run's resource names.
    with open(deployment_yaml_path, "r") as stream:
        file_content = stream.read()
        resource_template = Template(file_content)
        substitutions = {
            "graph_name": graph_name,
            "error_404_isvc_id": error_isvc_name,
            "success_200_isvc_id": success_isvc_name,
        }
        resource_body_after_rendering = yaml.safe_load(
            resource_template.render(substitutions)
        )
    create_ig_using_custom_object_api(resource_body_after_rendering)

    kserve_client.wait_isvc_ready(success_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
    kserve_client.wait_isvc_ready(error_isvc_name, namespace=KSERVE_TEST_NAMESPACE)
    kserve_client.wait_ig_ready(graph_name, namespace=KSERVE_TEST_NAMESPACE)

    # The error step is a hard dependency, so the graph must surface the 404
    # from error_isvc instead of continuing to the success step.
    with pytest.raises(HTTPError) as exc_info:
        predict_ig(
            graph_name,
            os.path.join(IG_TEST_RESOURCES_BASE_LOCATION, "iris_input.json"),
        )

    assert exc_info.value.response.json() == {"detail": "Intentional 404 code"}
    assert exc_info.value.response.status_code == 404

    kserve_client.delete_inference_graph(graph_name, KSERVE_TEST_NAMESPACE)
    kserve_client.delete(success_isvc_name, KSERVE_TEST_NAMESPACE)
    kserve_client.delete(error_isvc_name, KSERVE_TEST_NAMESPACE)

0 comments on commit 8231072

Please sign in to comment.