diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala index dea96b429cc..58a9efcac3f 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/runner/FlinkTestMainSpec.scala @@ -6,7 +6,7 @@ import org.apache.flink.runtime.client.JobExecutionException import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfterEach, Inside, OptionValues} -import pl.touk.nussknacker.engine.api.{CirceUtil, DisplayJsonWithEncoder} +import pl.touk.nussknacker.engine.api.{CirceUtil, DisplayJsonWithEncoder, FragmentSpecificData, MetaData} import pl.touk.nussknacker.engine.api.process.ComponentUseCase import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord} import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder} @@ -16,11 +16,20 @@ import pl.touk.nussknacker.engine.flink.test.{ RecordingExceptionConsumer, RecordingExceptionConsumerProvider } -import pl.touk.nussknacker.engine.graph.node.Case +import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition} import pl.touk.nussknacker.engine.process.helpers.SampleNodes._ import pl.touk.nussknacker.engine.testmode.TestProcess._ import pl.touk.nussknacker.engine.util.ThreadUtils import pl.touk.nussknacker.engine.ModelData +import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ParameterValueCompileTimeValidation} +import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode +import pl.touk.nussknacker.engine.compile.FragmentResolver +import pl.touk.nussknacker.engine.graph.expression.Expression +import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter} +import pl.touk.nussknacker.engine.process.runner.FlinkTestMainSpec.{ + fragmentWithValidationName, + processWithFragmentParameterValidation +} import java.util.{Date, UUID} import scala.concurrent.ExecutionContext.Implicits.global @@ -654,6 +663,23 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime)) ) } + + "should not throw exception when process fragment has parameter validation defined" in { + val scenario = ScenarioBuilder + .streaming("scenario1") + .source(sourceNodeId, "input") + .fragmentOneOut("sub", fragmentWithValidationName, "output", "fragmentResult", "param" -> "'asd'".spel) + .emptySink("out", "valueMonitor", "Value" -> "1".spel) + + val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario) + + val results = runFlinkTest( + resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") }, + ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))), + useIOMonadInInterpreter + ) + results.exceptions.length shouldBe 0 + } } private def createTestRecord( @@ -710,3 +736,31 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor } } + +object FlinkTestMainSpec { + private val fragmentWithValidationName = "fragmentWithValidation" + + private val processWithFragmentParameterValidation: CanonicalProcess = { + val fragmentParamName = ParameterName("param") + val fragmentParam = FragmentParameter(fragmentParamName, FragmentClazzRef[String]).copy( + valueCompileTimeValidation = Some( + ParameterValueCompileTimeValidation( + validationExpression = Expression.spel("true"), + validationFailedMessage = Some("param validation failed") + ) + ) + ) + + CanonicalProcess( + MetaData(fragmentWithValidationName, FragmentSpecificData()), + List( + FlatNode( + FragmentInputDefinition("start", List(fragmentParam)) + ), + FlatNode(FragmentOutputDefinition("out1", "output", List.empty)) + ), + List.empty + ) + } + +}