Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Elmacioro committed Nov 18, 2024
1 parent 5c8acdf commit f4ac49f
Showing 1 changed file with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.apache.flink.runtime.client.JobExecutionException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfterEach, Inside, OptionValues}
import pl.touk.nussknacker.engine.api.{CirceUtil, DisplayJsonWithEncoder}
import pl.touk.nussknacker.engine.api.{CirceUtil, DisplayJsonWithEncoder, FragmentSpecificData, MetaData}
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.test.{ScenarioTestData, ScenarioTestJsonRecord}
import pl.touk.nussknacker.engine.build.{GraphBuilder, ScenarioBuilder}
Expand All @@ -16,11 +16,20 @@ import pl.touk.nussknacker.engine.flink.test.{
RecordingExceptionConsumer,
RecordingExceptionConsumerProvider
}
import pl.touk.nussknacker.engine.graph.node.Case
import pl.touk.nussknacker.engine.graph.node.{Case, FragmentInputDefinition, FragmentOutputDefinition}
import pl.touk.nussknacker.engine.process.helpers.SampleNodes._
import pl.touk.nussknacker.engine.testmode.TestProcess._
import pl.touk.nussknacker.engine.util.ThreadUtils
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.parameter.{ParameterName, ParameterValueCompileTimeValidation}
import pl.touk.nussknacker.engine.canonicalgraph.canonicalnode.FlatNode
import pl.touk.nussknacker.engine.compile.FragmentResolver
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition.{FragmentClazzRef, FragmentParameter}
import pl.touk.nussknacker.engine.process.runner.FlinkTestMainSpec.{
fragmentWithValidationName,
processWithFragmentParameterValidation
}

import java.util.{Date, UUID}
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -654,6 +663,23 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor
variable(List(ComponentUseCase.TestRuntime, ComponentUseCase.TestRuntime))
)
}

"should not throw exception when process fragment has parameter validation defined" in {
val scenario = ScenarioBuilder
.streaming("scenario1")
.source(sourceNodeId, "input")
.fragmentOneOut("sub", fragmentWithValidationName, "output", "fragmentResult", "param" -> "'asd'".spel)
.emptySink("out", "valueMonitor", "Value" -> "1".spel)

val resolved = FragmentResolver(List(processWithFragmentParameterValidation)).resolve(scenario)

val results = runFlinkTest(
resolved.valueOr { _ => throw new IllegalArgumentException("Won't happen") },
ScenarioTestData(List(ScenarioTestJsonRecord(sourceNodeId, Json.fromString("0|1|2|3|4|5|6")))),
useIOMonadInInterpreter
)
results.exceptions.length shouldBe 0
}
}

private def createTestRecord(
Expand Down Expand Up @@ -710,3 +736,31 @@ class FlinkTestMainSpec extends AnyWordSpec with Matchers with Inside with Befor
}

}

object FlinkTestMainSpec {
private val fragmentWithValidationName = "fragmentWithValidation"

private val processWithFragmentParameterValidation: CanonicalProcess = {
val fragmentParamName = ParameterName("param")
val fragmentParam = FragmentParameter(fragmentParamName, FragmentClazzRef[String]).copy(
valueCompileTimeValidation = Some(
ParameterValueCompileTimeValidation(
validationExpression = Expression.spel("true"),
validationFailedMessage = Some("param validation failed")
)
)
)

CanonicalProcess(
MetaData(fragmentWithValidationName, FragmentSpecificData()),
List(
FlatNode(
FragmentInputDefinition("start", List(fragmentParam))
),
FlatNode(FragmentOutputDefinition("out1", "output", List.empty))
),
List.empty
)
}

}

0 comments on commit f4ac49f

Please sign in to comment.