Fix for testing mechanism for table sources: use the full model classpath instead of only flinkTable.jar (#6950)

* Fix attempt for the testing mechanism used for Iceberg: use the model classloader instead of a single hardcoded jar
* Collect designer logs in e2e tests
* Fix NussknackerVersion parsing after the semver dependency bump
arkadius authored Sep 27, 2024
1 parent 396d508 commit f06ea95
Showing 7 changed files with 76 additions and 49 deletions.
@@ -131,7 +131,11 @@ trait ComponentProvider {
 
 object NussknackerVersion {
 
-  val current: NussknackerVersion = NussknackerVersion(new Semver(BuildInfo.version))
+  val current: NussknackerVersion = parse(BuildInfo.version)
+
+  def parse(versionString: String): NussknackerVersion = {
+    NussknackerVersion(Semver.coerce(versionString))
+  }
 
 }

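The `parse` change works because strict SemVer parsing rejects `_` in pre-release identifiers (the spec only allows alphanumerics and hyphens), while `coerce` extracts the first version-like core and discards the rest. A minimal sketch of the difference, assuming the semver4j 5.x library (`org.semver4j.Semver`) that the diff's `Semver` refers to:

```scala
import org.semver4j.Semver

import scala.util.Try

object CoerceVsStrictParse extends App {
  val snapshotVersion =
    "1.18.0-preview_testing-mechanism-iceberg-fix-2024-09-26-20745-9048b0f0a-SNAPSHOT"

  // Strict parsing throws: '_' is not a legal character in a SemVer
  // pre-release identifier, so snapshot versions with underscores fail.
  println(Try(new Semver(snapshotVersion)).isFailure) // true

  // coerce salvages the numeric core and drops pre-release/build metadata
  // (note: it can return null when nothing version-like is found).
  val coerced = Semver.coerce(snapshotVersion)
  println(coerced.getVersion) // 1.18.0
}
```

This matches the behavior asserted in the new test below: major/minor/patch are recovered and the pre-release and build parts come back empty.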
@@ -0,0 +1,19 @@
+package pl.touk.nussknacker.engine.api.component
+
+import org.scalatest.funsuite.AnyFunSuiteLike
+import org.scalatest.matchers.should.Matchers
+import scala.jdk.CollectionConverters._
+
+class NussknackerVersionTest extends AnyFunSuiteLike with Matchers {
+
+  test("should allow to use underscores in nussknacker version") {
+    val version =
+      NussknackerVersion.parse("1.18.0-preview_testing-mechanism-iceberg-fix-2024-09-26-20745-9048b0f0a-SNAPSHOT")
+    version.value.getMajor shouldBe 1
+    version.value.getMinor shouldBe 18
+    version.value.getPatch shouldBe 0
+    version.value.getPreRelease.asScala shouldBe empty
+    version.value.getBuild.asScala shouldBe empty
+  }
+
+}
docs/Changelog.md — 5 changes: 2 additions & 3 deletions
@@ -17,10 +17,11 @@
 * [#6734](https://github.com/TouK/nussknacker/pull/6734) Tables from external catalogs are now refreshed automatically
   when entering into node form. Please be aware that changes in `tableDefinition.sql` are not refreshed.
   To do this, use `/app/processingtype/reload` API
-* [#6741](https://github.com/TouK/nussknacker/pull/6741) Added `catalogConfiguration` configuration option allowing to set up catalog
+* [#6741](https://github.com/TouK/nussknacker/pull/6741) [#6886](https://github.com/TouK/nussknacker/pull/6886) Added `catalogConfiguration` configuration option allowing to set up catalog
   directly in config instead of by `tableDefinition.sql`
 * [#6741](https://github.com/TouK/nussknacker/pull/6741) (Breaking change) Fully qualified table paths are used instead of table names
   in table source and sink components in `Table` parameter
+* [#6950](https://github.com/TouK/nussknacker/pull/6950) Fix for testing mechanism for table sources: use the full model classpath instead of only flinkTable.jar
 * [#6716](https://github.com/TouK/nussknacker/pull/6716) Fix type hints for #COLLECTION.merge function.
 * [#6695](https://github.com/TouK/nussknacker/pull/6695) From now on, arrays on UI are visible as lists but in the
   background they are stored as-is and SpeL converts them to lists at runtime.
@@ -47,8 +48,6 @@
   some types of unallowed expressions.
 * [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions
   - shorter message in logs
-* [#6886](https://github.com/TouK/nussknacker/pull/6886) Fix for "Illegal table name:$nuCatalog" error when using Apache Iceberg catalog.
-  Internal Nussknacker catalog is now named `_nu_catalog`
 * [#6766](https://github.com/TouK/nussknacker/pull/6766) Scenario labels support - you can assign labels to scenarios and use them to filter the scenario list
 * [#6176](https://github.com/TouK/nussknacker/pull/6176) Update most dependencies to latest versions, most important ones:
   * Jackson 2.15.4 -> 2.17.2
@@ -7,15 +7,15 @@ import org.apache.commons.io.FileUtils
 import org.apache.flink.configuration.{Configuration, CoreOptions, PipelineOptions}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.Expressions.$
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.catalog.ObjectIdentifier
 import org.apache.flink.types.Row
 import pl.touk.nussknacker.engine.api.test.{TestData, TestRecord}
 import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition
 import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition._
-import pl.touk.nussknacker.engine.util.ThreadUtils
 
+import java.net.URLClassLoader
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, StandardOpenOption}
 import java.util.UUID
@@ -24,18 +24,17 @@ import scala.util.{Failure, Success, Try, Using}
 
 object FlinkMiniClusterTableOperations extends LazyLogging {
 
-  def parseTestRecords(records: List[TestRecord], schema: Schema): List[Row] =
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      implicit val env: StreamTableEnvironment = MiniClusterEnvBuilder.buildStreamTableEnv
-      val (inputTablePath, inputTableName) = createTempFileTable(schema)
-      val parsedRecords = Try {
-        writeRecordsToFile(inputTablePath, records)
-        val inputTable = env.from(s"`$inputTableName`")
-        env.toDataStream(inputTable).executeAndCollect().asScala.toList
-      }
-      cleanup(inputTablePath)
-      parsedRecords.get
+  def parseTestRecords(records: List[TestRecord], schema: Schema): List[Row] = {
+    implicit val env: StreamTableEnvironment = MiniClusterEnvBuilder.buildStreamTableEnv
+    val (inputTablePath, inputTableName) = createTempFileTable(schema)
+    val parsedRecords = Try {
+      writeRecordsToFile(inputTablePath, records)
+      val inputTable = env.from(s"`$inputTableName`")
+      env.toDataStream(inputTable).executeAndCollect().asScala.toList
+    }
+    cleanup(inputTablePath)
+    parsedRecords.get
   }
 
   def generateLiveTestData(
     limit: Int,
@@ -61,21 +60,18 @@
     limit: Int,
     schema: Schema,
     buildSourceTable: TableEnvironment => Table
-  ): TestData =
-    // setting context classloader because Flink in multiple places relies on it and without this temporary override it doesnt have
-    // the necessary classes
-    ThreadUtils.withThisAsContextClassLoader(getClass.getClassLoader) {
-      implicit val env: TableEnvironment = MiniClusterEnvBuilder.buildTableEnv
-      val sourceTable = buildSourceTable(env)
-      val (outputFilePath, outputTableName) = createTempFileTable(schema)
-      val generatedRows = Try {
-        insertDataAndAwait(sourceTable, outputTableName, limit)
-        readRecordsFromFilesUnderPath(outputFilePath)
-      }
-      cleanup(outputFilePath)
-      val rows = generatedRows.get
-      TestData(rows.map(TestRecord(_)))
+  ): TestData = {
+    implicit val env: TableEnvironment = MiniClusterEnvBuilder.buildTableEnv
+    val sourceTable = buildSourceTable(env)
+    val (outputFilePath, outputTableName) = createTempFileTable(schema)
+    val generatedRows = Try {
+      insertDataAndAwait(sourceTable, outputTableName, limit)
+      readRecordsFromFilesUnderPath(outputFilePath)
+    }
+    cleanup(outputFilePath)
+    val rows = generatedRows.get
+    TestData(rows.map(TestRecord(_)))
   }
 
   private def writeRecordsToFile(path: Path, records: List[TestRecord]): Unit = {
     val jsonRecords: List[String] = records.map(_.json.noSpaces)
@@ -165,27 +161,28 @@
 
   private object MiniClusterEnvBuilder {
 
-    // TODO: how to get path of jar cleaner? Through config?
-    private val classPathUrlsForMiniClusterTestingEnv = List(
-      "components/flink-table/flinkTable.jar"
-    ).map(Path.of(_).toUri.toURL)
-
-    private val streamEnvConfig = {
+    private lazy val streamEnvConfig = {
       val conf = new Configuration()
 
       // parent-first - otherwise linkage error (loader constraint violation, a different class with the same name was
      // previously loaded by 'app') for class 'org.apache.commons.math3.random.RandomDataGenerator'
       conf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first")
 
-      // without this, on Flink taskmanager level the classloader is basically empty
-      conf.set(
-        PipelineOptions.CLASSPATHS,
-        classPathUrlsForMiniClusterTestingEnv.map(_.toString).asJava
-      )
+      // Here is a hidden assumption that getClass.getClassLoader is the model classloader and another hidden assumption that the model classloader has all necessary jars (including connectors)
+      // TODO: we should explicitly pass model classloader + we should split model classloader into libs that are only for
+      //       testing mechanism purpose (in the real deployment, they are already available in Flink), for example table connectors
+      Thread.currentThread().getContextClassLoader match {
+        case url: URLClassLoader =>
+          conf.set(PipelineOptions.CLASSPATHS, url.getURLs.toList.map(_.toString).asJava)
+        case _ =>
+          logger.warn(
+            "Context classloader is not a URLClassLoader. Probably data generation invocation wasn't wrapped with ModelData.withThisAsContextClassLoader. MiniCluster classpath setup will be skipped."
+          )
+      }
       conf.set(CoreOptions.DEFAULT_PARALLELISM, Int.box(1))
     }
 
-    private val tableEnvConfig = EnvironmentSettings.newInstance().withConfiguration(streamEnvConfig).build()
+    private lazy val tableEnvConfig = EnvironmentSettings.newInstance().withConfiguration(streamEnvConfig).build()
 
     def buildTableEnv: TableEnvironment = TableEnvironment.create(tableEnvConfig)
 
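For the classpath forwarding above to work, the caller must first install the model classloader as the thread's context classloader. A hedged sketch of that call-site contract (the wrapper object and jar path are illustrative; `ThreadUtils.withThisAsContextClassLoader` is the same helper this diff uses in `TestDataPreparer`):

```scala
import pl.touk.nussknacker.engine.util.ThreadUtils

import java.net.URLClassLoader
import java.nio.file.Path

object MiniClusterClasspathSketch {

  def main(args: Array[String]): Unit = {
    // Hypothetical model classpath: in a real deployment this holds the whole
    // model's jars (including table connectors), not just flinkTable.jar.
    val modelClassLoader = new URLClassLoader(
      Array(Path.of("components/flink-table/flinkTable.jar").toUri.toURL),
      getClass.getClassLoader
    )

    ThreadUtils.withThisAsContextClassLoader(modelClassLoader) {
      // Inside this block Thread.currentThread().getContextClassLoader is a
      // URLClassLoader, so MiniClusterEnvBuilder can copy its URLs into
      // PipelineOptions.CLASSPATHS instead of the previously hardcoded jar.
    }
  }

}
```

The two hunks below apply exactly this pattern: `ModelDataTestInfoProvider` wraps test data generation in `modelData.withThisAsContextClassLoader`, and `TestDataPreparer` wraps record parsing the same way.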
@@ -134,11 +134,13 @@ class ModelDataTestInfoProvider(modelData: ModelData) extends TestInfoProvider w
   }
 
   private def generateTestData(generators: NonEmptyList[(NodeId, TestDataGenerator)], size: Int) = {
-    val sourceTestDataList = generators.map { case (sourceId, testDataGenerator) =>
-      val sourceTestRecords = testDataGenerator.generateTestData(size).testRecords
-      sourceTestRecords.map(testRecord => ScenarioTestJsonRecord(sourceId, testRecord))
+    modelData.withThisAsContextClassLoader {
+      val sourceTestDataList = generators.map { case (sourceId, testDataGenerator) =>
+        val sourceTestRecords = testDataGenerator.generateTestData(size).testRecords
+        sourceTestRecords.map(testRecord => ScenarioTestJsonRecord(sourceId, testRecord))
+      }
+      ListUtil.mergeLists(sourceTestDataList.toList, size)
     }
-    ListUtil.mergeLists(sourceTestDataList.toList, size)
   }
 
   override def prepareTestData(
@@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.definition.clazz.ClassDefinitionSet
 import pl.touk.nussknacker.engine.definition.globalvariables.ExpressionConfigDefinition
 import pl.touk.nussknacker.engine.expression.ExpressionEvaluator
 import pl.touk.nussknacker.engine.graph.expression.Expression
+import pl.touk.nussknacker.engine.util.ThreadUtils
 import pl.touk.nussknacker.engine.variables.GlobalVariablesPreparer
 
 class TestDataPreparer(
@@ -50,7 +51,11 @@
       case Nil => List.empty
       case _ =>
         source match {
-          case s: SourceTestSupport[T @unchecked] => s.testRecordParser.parse(jsonRecordList.map(_.record))
+          case s: SourceTestSupport[T @unchecked] =>
+            val parser = s.testRecordParser
+            ThreadUtils.withThisAsContextClassLoader(classloader) {
+              parser.parse(jsonRecordList.map(_.record))
+            }
           case other =>
             throw new IllegalArgumentException(
               s"Source ${other.getClass} cannot be stubbed - it doesn't provide test data parser"
@@ -27,7 +27,8 @@ class DockerBasedInstallationExampleNuEnvironment(
       "NUSSKNACKER_VERSION" -> nussknackerImageVersion
     ),
     logConsumers = Seq(
-      ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger))
+      ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger)),
+      ServiceLogConsumer("designer", new Slf4jLogConsumer(slf4jLogger)),
     ),
     waitingFor = Some(
       WaitingForService(
