From cf5c1f37ab75c3ba07518a702bc13cd91d9dcf12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Thu, 2 Jan 2025 15:57:58 +0100 Subject: [PATCH] feat(core): EmbeddedFlow task Adds an EmbeddedFlow that allow to embed subflow tasks into a parent tasks. Fixes #6518 --- .../kestra/core/runners/ExecutableUtils.java | 50 +++-- .../kestra/plugin/core/flow/EmbeddedFlow.java | 209 ++++++++++++++++++ .../plugin/core/flow/EmbeddedFlowTest.java | 35 +++ .../resources/flows/valids/embedded-flow.yaml | 8 + .../flows/valids/embedded-parent.yaml | 8 + .../AbstractJdbcFlowRepository.java | 2 +- 6 files changed, 292 insertions(+), 20 deletions(-) create mode 100644 core/src/main/java/io/kestra/plugin/core/flow/EmbeddedFlow.java create mode 100644 core/src/test/java/io/kestra/plugin/core/flow/EmbeddedFlowTest.java create mode 100644 core/src/test/resources/flows/valids/embedded-flow.yaml create mode 100644 core/src/test/resources/flows/valids/embedded-parent.yaml diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java index 336c0c785c3..5bc9ee127ba 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -65,28 +65,11 @@ public static > SubflowExecution subflowEx boolean inheritLabels, Property scheduleDate ) throws IllegalVariableEvaluationException { + String tenantId = currentExecution.getTenantId(); String subflowNamespace = runContext.render(currentTask.subflowId().namespace()); String subflowId = runContext.render(currentTask.subflowId().flowId()); Optional subflowRevision = currentTask.subflowId().revision(); - - io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromTask( - currentExecution.getTenantId(), - subflowNamespace, - subflowId, - subflowRevision, - currentExecution.getTenantId(), - currentFlow.getNamespace(), - currentFlow.getId() - ) - .orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision '" + subflowRevision.orElse(0) + "'")); - - if (flow.isDisabled()) { - throw new IllegalStateException("Cannot execute a flow which is disabled"); - } - - if (flow instanceof FlowWithException fwe) { - throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException()); - } + Flow flow = getSubflow(tenantId, subflowNamespace, subflowId, subflowRevision, flowExecutorInterface, currentFlow); List