From bd248375a3704a9bcdfde82a9c5c7477622cf67b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Mon, 28 Oct 2024 15:15:31 +0100 Subject: [PATCH 1/4] Improvement: FlinkBaseTypeInfoRegister mechanism --- docs/Changelog.md | 4 +- docs/MigrationGuide.md | 6 +- .../FlinkBaseTypeInfoRegister.scala | 107 ++++++++++++++++++ .../TypeInformationDetection.scala | 2 + .../process/ExecutionConfigPreparer.scala | 2 + ...gResultAwareTypeInformationDetection.scala | 23 ---- 6 files changed, 119 insertions(+), 25 deletions(-) create mode 100644 engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala diff --git a/docs/Changelog.md b/docs/Changelog.md index 0d5cb549075..964186e08fe 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -77,7 +77,9 @@ * [#6988](https://github.com/TouK/nussknacker/pull/6988) Remove unused API classes: `MultiMap`, `TimestampedEvictableStateFunction` * [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open. * [#6979](https://github.com/TouK/nussknacker/pull/6979) Introduces an activities panel that provides information about all system activities. -* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization +* Performance optimization: + * [#7058](https://github.com/TouK/nussknacker/pull/7058) Add missing Flink TypeInformation for better serialization + * [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism ## 1.17 diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 18e00b7c811..9e28f781664 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -75,10 +75,14 @@ To see the biggest differences please consult the [changelog](Changelog.md). want to keep using Flink pre-1.19 with current Nussknacker, please refer to compatibility providing plugins in https://github.com/TouK/nussknacker-flink-compatibility. -* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization +* Performance optimization: + * [#7058](https://github.com/TouK/nussknacker/pull/7058) Add missing Flink TypeInformation for better serialization * In case of using base (bounded and unbounded) Flink components state will be probably not compatible * `FlinkCustomNodeContext.typeInformationDetection` has been removed, please use `TypeInformationDetection.instance` instead * `FlinkCustomNodeContext.forCustomContext` has been removed, please use `TypeInformationDetection.instance.forValueWithContext` instead + * [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism + * In case of using types: java.time.LocalDate, java.time.LocalTime, java.time.LocalDateTime, java.time.Instant, + java.sql.Date, java.sql.Time, java.sql.Timestamp with CaseClassTypeInfo mechanism, state probably will be lost ### Configuration changes * [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig` which replaces removed `{ type: "versions-panel" }` `{ type: "comments-panel" }` and `{ type: "attachments-panel" }` diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala new file mode 100644 index 00000000000..11da275612e --- /dev/null +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala @@ -0,0 +1,107 @@ +package pl.touk.nussknacker.engine.flink.api.typeinformation + +import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation, Types} +import org.apache.flink.api.java.typeutils.TypeExtractor + +import java.lang.reflect.Type +import java.sql.{Date => SqlDate, Time => SqlTime, Timestamp => SqlTimestamp} +import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import java.util + +object FlinkBaseTypeInfoRegister { + + private case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) + + private val baseTypes = List( + Base(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), + Base(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), + Base(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), + Base(classOf[Instant], classOf[InstantTypeInfoFactory]), + Base(classOf[SqlDate], classOf[SqlDateTypeInfoFactory]), + Base(classOf[SqlTime], classOf[SqlTimeTypeInfoFactory]), + Base(classOf[SqlTimestamp], classOf[SqlTimestampTypeInfoFactory]), + ) + + def makeSureBaseTypesAreRegistered(): Unit = + baseTypes.foreach { base => + register(base) + } + + private def register(base: Base[_, _ <: TypeInfoFactory[_]]): Unit = { + val opt = Option(TypeExtractor.getTypeInfoFactory(base.klass)) + if (opt.isEmpty) { + TypeExtractor.registerFactory(base.klass, base.factoryClass) + } + } + + class LocalDateTypeInfoFactory extends TypeInfoFactory[LocalDate] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[LocalDate] = + Types.LOCAL_DATE + + } + + class LocalTimeTypeInfoFactory extends TypeInfoFactory[LocalTime] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[LocalTime] = + Types.LOCAL_TIME + + } + + class LocalDateTimeTypeInfoFactory extends TypeInfoFactory[LocalDateTime] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[LocalDateTime] = + Types.LOCAL_DATE_TIME + + } + + class InstantTypeInfoFactory extends TypeInfoFactory[Instant] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[Instant] = + Types.INSTANT + + } + + class SqlDateTypeInfoFactory extends TypeInfoFactory[SqlDate] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[SqlDate] = + Types.SQL_DATE + + } + + class SqlTimeTypeInfoFactory extends TypeInfoFactory[SqlTime] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[SqlTime] = + Types.SQL_TIME + + } + + class SqlTimestampTypeInfoFactory extends TypeInfoFactory[SqlTimestamp] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[SqlTimestamp] = + Types.SQL_TIMESTAMP + + } + +} diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 126b8c8f305..9e7fabe879a 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,6 +52,8 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { + FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() + val classloader = Thread.currentThread().getContextClassLoader ServiceLoader .load(classOf[TypeInformationDetection], classloader) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index 75e95832428..30d57828379 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegister import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters} import pl.touk.nussknacker.engine.process.util.Serializers @@ -100,6 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { + FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse() diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala index baadb2baa59..396b668266d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala @@ -29,27 +29,6 @@ import pl.touk.nussknacker.engine.util.Implicits._ */ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection { - private val registeredTypeInfos: Map[TypedClass, TypeInformation[_]] = Map( - Typed.typedClass[String] -> Types.STRING, - Typed.typedClass[Boolean] -> Types.BOOLEAN, - Typed.typedClass[Byte] -> Types.BYTE, - Typed.typedClass[Short] -> Types.SHORT, - Typed.typedClass[Integer] -> Types.INT, - Typed.typedClass[Long] -> Types.LONG, - Typed.typedClass[Float] -> Types.FLOAT, - Typed.typedClass[Double] -> Types.DOUBLE, - Typed.typedClass[Character] -> Types.CHAR, - Typed.typedClass[java.math.BigDecimal] -> Types.BIG_DEC, - Typed.typedClass[java.math.BigInteger] -> Types.BIG_INT, - Typed.typedClass[java.time.LocalDate] -> Types.LOCAL_DATE, - Typed.typedClass[java.time.LocalTime] -> Types.LOCAL_TIME, - Typed.typedClass[java.time.LocalDateTime] -> Types.LOCAL_DATE_TIME, - Typed.typedClass[java.time.Instant] -> Types.INSTANT, - Typed.typedClass[java.sql.Date] -> Types.SQL_DATE, - Typed.typedClass[java.sql.Time] -> Types.SQL_TIME, - Typed.typedClass[java.sql.Timestamp] -> Types.SQL_TIMESTAMP, - ) - def forContext(validationContext: ValidationContext): TypeInformation[Context] = { val variables = forType( Typed.record(validationContext.localVariables, Typed.typedClass[Map[String, AnyRef]]) @@ -82,8 +61,6 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection // We generally don't use scala Maps in our runtime, but it is useful for some internal type infos: TODO move it somewhere else case a: TypedObjectTypingResult if a.runtimeObjType.klass == classOf[Map[String, _]] => createScalaMapTypeInformation(a) - case a: SingleTypingResult if registeredTypeInfos.contains(a.runtimeObjType) => - registeredTypeInfos(a.runtimeObjType) // TODO: scala case classes are not handled nicely here... CaseClassTypeInfo is created only via macro, here Kryo is used case a: SingleTypingResult if a.runtimeObjType.params.isEmpty => TypeInformation.of(a.runtimeObjType.klass) From 466b053badbfcd51def4f37d7562383493f9ab0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Tue, 29 Oct 2024 11:11:56 +0100 Subject: [PATCH 2/4] Remove useless code and add tests --- build.sbt | 3 +- docs/MigrationGuide.md | 3 +- .../FlinkBaseTypeInfoRegister.scala | 51 ++----------------- .../FlinkBaseTypeInfoRegisterTest.scala | 49 ++++++++++++++++++ 4 files changed, 55 insertions(+), 51 deletions(-) create mode 100644 engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala diff --git a/build.sbt b/build.sbt index d83e32a5ed6..8301ba4a58a 100644 --- a/build.sbt +++ b/build.sbt @@ -1643,7 +1643,8 @@ lazy val flinkComponentsApi = (project in flink("components-api")) name := "nussknacker-flink-components-api", libraryDependencies ++= { Seq( - "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, + "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, + "org.scalatest" %% "scalatest" % scalaTestV % Test ) } ) diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 9e28f781664..e036cdc24ac 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -81,8 +81,7 @@ To see the biggest differences please consult the [changelog](Changelog.md). * `FlinkCustomNodeContext.typeInformationDetection` has been removed, please use `TypeInformationDetection.instance` instead * `FlinkCustomNodeContext.forCustomContext` has been removed, please use `TypeInformationDetection.instance.forValueWithContext` instead * [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism - * In case of using types: java.time.LocalDate, java.time.LocalTime, java.time.LocalDateTime, java.time.Instant, - java.sql.Date, java.sql.Time, java.sql.Timestamp with CaseClassTypeInfo mechanism, state probably will be lost + * In case of using types: java.time.LocalDate, java.time.LocalTime, java.time.LocalDateTime with CaseClassTypeInfo mechanism, state probably will be lost ### Configuration changes * [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig` which replaces removed `{ type: "versions-panel" }` `{ type: "comments-panel" }` and `{ type: "attachments-panel" }` diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala index 11da275612e..7ecc963f091 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala @@ -4,22 +4,17 @@ import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation, T import org.apache.flink.api.java.typeutils.TypeExtractor import java.lang.reflect.Type -import java.sql.{Date => SqlDate, Time => SqlTime, Timestamp => SqlTimestamp} -import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util object FlinkBaseTypeInfoRegister { - private case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) + private[typeinformation] case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) - private val baseTypes = List( + private[typeinformation] val baseTypes = List( Base(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), Base(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), Base(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), - Base(classOf[Instant], classOf[InstantTypeInfoFactory]), - Base(classOf[SqlDate], classOf[SqlDateTypeInfoFactory]), - Base(classOf[SqlTime], classOf[SqlTimeTypeInfoFactory]), - Base(classOf[SqlTimestamp], classOf[SqlTimestampTypeInfoFactory]), ) def makeSureBaseTypesAreRegistered(): Unit = @@ -64,44 +59,4 @@ object FlinkBaseTypeInfoRegister { } - class InstantTypeInfoFactory extends TypeInfoFactory[Instant] { - - override def createTypeInfo( - t: Type, - genericParameters: util.Map[String, TypeInformation[_]] - ): TypeInformation[Instant] = - Types.INSTANT - - } - - class SqlDateTypeInfoFactory extends TypeInfoFactory[SqlDate] { - - override def createTypeInfo( - t: Type, - genericParameters: util.Map[String, TypeInformation[_]] - ): TypeInformation[SqlDate] = - Types.SQL_DATE - - } - - class SqlTimeTypeInfoFactory extends TypeInfoFactory[SqlTime] { - - override def createTypeInfo( - t: Type, - genericParameters: util.Map[String, TypeInformation[_]] - ): TypeInformation[SqlTime] = - Types.SQL_TIME - - } - - class SqlTimestampTypeInfoFactory extends TypeInfoFactory[SqlTimestamp] { - - override def createTypeInfo( - t: Type, - genericParameters: util.Map[String, TypeInformation[_]] - ): TypeInformation[SqlTimestamp] = - Types.SQL_TIMESTAMP - - } - } diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala new file mode 100644 index 00000000000..baac871624e --- /dev/null +++ b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala @@ -0,0 +1,49 @@ +package pl.touk.nussknacker.engine.flink.api.typeinformation + +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.java.typeutils.GenericTypeInfo +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import java.sql.{Date, Time, Timestamp} + +class FlinkBaseTypeInfoRegisterTest extends AnyFunSuite with Matchers { + + private val reqByNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + classOf[LocalDate] -> Types.LOCAL_DATE, + classOf[LocalTime] -> Types.LOCAL_TIME, + classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME, + ) + + private val otherTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + classOf[Instant] -> Types.INSTANT, + classOf[Date] -> Types.SQL_DATE, + classOf[Time] -> Types.SQL_TIME, + classOf[Timestamp] -> Types.SQL_TIMESTAMP, + ) + + test("Looking for TypeInformation for a base klass should return a GenericTypeInfo") { + FlinkBaseTypeInfoRegister.baseTypes.foreach { base => + val typeInfo = TypeInformation.of(base.klass) + typeInfo shouldBe new GenericTypeInfo(base.klass) + } + } + + test("Looking for TypeInformation for a base klass with register should return a SpecificTypeInformation") { + FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() + + FlinkBaseTypeInfoRegister.baseTypes.foreach { base => + val typeInfo = TypeInformation.of(base.klass) + Some(typeInfo) shouldBe reqByNuTypesMapping.get(base.klass) + } + } + + test("Verify TypeInformation.of for other types") { + otherTypesMapping.foreach { case (klass, expected) => + val typeInfo = TypeInformation.of(klass) + typeInfo shouldBe expected + } + } + +} From 6071698fedf27ed8985fed11f643f9add01004f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Tue, 29 Oct 2024 12:16:34 +0100 Subject: [PATCH 3/4] Changes to review --- ...scala => FlinkBaseTypeInfoRegistrar.scala} | 20 +++---- .../TypeInformationDetection.scala | 2 +- .../FlinkBaseTypeInfoRegisterTest.scala | 49 --------------- .../FlinkBaseTypeInfoRegistrarTest.scala | 60 +++++++++++++++++++ .../process/ExecutionConfigPreparer.scala | 4 +- 5 files changed, 73 insertions(+), 62 deletions(-) rename engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/{FlinkBaseTypeInfoRegister.scala => FlinkBaseTypeInfoRegistrar.scala} (63%) delete mode 100644 engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala create mode 100644 engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala similarity index 63% rename from engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala rename to engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala index 7ecc963f091..37ecbaa1076 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala @@ -7,25 +7,25 @@ import java.lang.reflect.Type import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util -object FlinkBaseTypeInfoRegister { +object FlinkBaseTypeInfoRegistrar { - private[typeinformation] case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) + private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) - private[typeinformation] val baseTypes = List( - Base(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), - Base(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), - Base(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), + private val baseTypes = List( + RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), + RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), + RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), ) - def makeSureBaseTypesAreRegistered(): Unit = + def ensureBaseTypesAreRegistered(): Unit = baseTypes.foreach { base => register(base) } - private def register(base: Base[_, _ <: TypeInfoFactory[_]]): Unit = { - val opt = Option(TypeExtractor.getTypeInfoFactory(base.klass)) + private def register(entry: RegistrationEntry[_, _ <: TypeInfoFactory[_]]): Unit = { + val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass)) if (opt.isEmpty) { - TypeExtractor.registerFactory(base.klass, base.factoryClass) + TypeExtractor.registerFactory(entry.klass, entry.factoryClass) } } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 9e7fabe879a..2e7fbef74ea 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,7 +52,7 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { - FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() + FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() val classloader = Thread.currentThread().getContextClassLoader ServiceLoader diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala deleted file mode 100644 index baac871624e..00000000000 --- a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegisterTest.scala +++ /dev/null @@ -1,49 +0,0 @@ -package pl.touk.nussknacker.engine.flink.api.typeinformation - -import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} -import org.apache.flink.api.java.typeutils.GenericTypeInfo -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers - -import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} -import java.sql.{Date, Time, Timestamp} - -class FlinkBaseTypeInfoRegisterTest extends AnyFunSuite with Matchers { - - private val reqByNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( - classOf[LocalDate] -> Types.LOCAL_DATE, - classOf[LocalTime] -> Types.LOCAL_TIME, - classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME, - ) - - private val otherTypesMapping: Map[Class[_], TypeInformation[_]] = Map( - classOf[Instant] -> Types.INSTANT, - classOf[Date] -> Types.SQL_DATE, - classOf[Time] -> Types.SQL_TIME, - classOf[Timestamp] -> Types.SQL_TIMESTAMP, - ) - - test("Looking for TypeInformation for a base klass should return a GenericTypeInfo") { - FlinkBaseTypeInfoRegister.baseTypes.foreach { base => - val typeInfo = TypeInformation.of(base.klass) - typeInfo shouldBe new GenericTypeInfo(base.klass) - } - } - - test("Looking for TypeInformation for a base klass with register should return a SpecificTypeInformation") { - FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() - - FlinkBaseTypeInfoRegister.baseTypes.foreach { base => - val typeInfo = TypeInformation.of(base.klass) - Some(typeInfo) shouldBe reqByNuTypesMapping.get(base.klass) - } - } - - test("Verify TypeInformation.of for other types") { - otherTypesMapping.foreach { case (klass, expected) => - val typeInfo = TypeInformation.of(klass) - typeInfo shouldBe expected - } - } - -} diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala new file mode 100644 index 00000000000..692f9adac32 --- /dev/null +++ b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala @@ -0,0 +1,60 @@ +package pl.touk.nussknacker.engine.flink.api.typeinformation + +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.java.typeutils.GenericTypeInfo +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import java.sql.{Date, Time, Timestamp} + +class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers { + + private val baseNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + classOf[LocalDate] -> Types.LOCAL_DATE, + classOf[LocalTime] -> Types.LOCAL_TIME, + classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME, + ) + + private val baseFlinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + classOf[String] -> Types.STRING, + classOf[Boolean] -> Types.BOOLEAN, + classOf[Byte] -> Types.BYTE, + classOf[Short] -> Types.SHORT, + classOf[Integer] -> Types.INT, + classOf[Long] -> Types.LONG, + classOf[Float] -> Types.FLOAT, + classOf[Double] -> Types.DOUBLE, + classOf[Character] -> Types.CHAR, + classOf[java.math.BigDecimal] -> Types.BIG_DEC, + classOf[java.math.BigInteger] -> Types.BIG_INT, + classOf[Instant] -> Types.INSTANT, + classOf[Date] -> Types.SQL_DATE, + classOf[Time] -> Types.SQL_TIME, + classOf[Timestamp] -> Types.SQL_TIMESTAMP, + ) + + test("Looking for TypeInformation for a NU base class should return a GenericTypeInfo") { + baseNuTypesMapping.foreach { case (klass, _) => + val typeInfo = TypeInformation.of(klass) + typeInfo shouldBe new GenericTypeInfo(klass) + } + } + + test("Looking for TypeInformation for a NU base class with registrar should return a specific TypeInformation") { + FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + + baseNuTypesMapping.foreach { case (klass, expected) => + val typeInfo = TypeInformation.of(klass) + typeInfo shouldBe expected + } + } + + test("Make sure that the other types have specific TypeInformation") { + baseFlinkTypesMapping.foreach { case (klass, expected) => + val typeInfo = TypeInformation.of(klass) + typeInfo shouldBe expected + } + } + +} diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index 30d57828379..8682353951a 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegister +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegistrar import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters} import pl.touk.nussknacker.engine.process.util.Serializers @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { - FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() + FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse() From 2360e166b41c1149b0935f10075201276fa134ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Cio=C5=82ecki?= Date: Tue, 29 Oct 2024 12:30:50 +0100 Subject: [PATCH 4/4] Renaming --- ...trar.scala => FlinkTypeInfoRegistrar.scala} | 6 +++--- .../TypeInformationDetection.scala | 2 +- ....scala => FlinkTypeInfoRegistrarTest.scala} | 18 +++++++++--------- .../process/ExecutionConfigPreparer.scala | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) rename engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/{FlinkBaseTypeInfoRegistrar.scala => FlinkTypeInfoRegistrar.scala} (94%) rename engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/{FlinkBaseTypeInfoRegistrarTest.scala => FlinkTypeInfoRegistrarTest.scala} (71%) diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala similarity index 94% rename from engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala rename to engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala index 37ecbaa1076..2a9408688a0 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala @@ -7,18 +7,18 @@ import java.lang.reflect.Type import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util -object FlinkBaseTypeInfoRegistrar { +object FlinkTypeInfoRegistrar { private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) - private val baseTypes = List( + private val typesToRegister = List( RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), ) def ensureBaseTypesAreRegistered(): Unit = - baseTypes.foreach { base => + typesToRegister.foreach { base => register(base) } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 2e7fbef74ea..35a14eab36e 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,7 +52,7 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { - FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() val classloader = Thread.currentThread().getContextClassLoader ServiceLoader diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala similarity index 71% rename from engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala rename to engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala index 692f9adac32..71bc968124a 100644 --- a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala +++ b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala @@ -8,15 +8,15 @@ import org.scalatest.matchers.should.Matchers import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} import java.sql.{Date, Time, Timestamp} -class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers { +class FlinkTypeInfoRegistrarTest extends AnyFunSuite with Matchers { - private val baseNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + private val nuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( classOf[LocalDate] -> Types.LOCAL_DATE, classOf[LocalTime] -> Types.LOCAL_TIME, classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME, ) - private val baseFlinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + private val flinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map( classOf[String] -> Types.STRING, classOf[Boolean] -> Types.BOOLEAN, classOf[Byte] -> Types.BYTE, @@ -34,24 +34,24 @@ class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers { classOf[Timestamp] -> Types.SQL_TIMESTAMP, ) - test("Looking for TypeInformation for a NU base class should return a GenericTypeInfo") { - baseNuTypesMapping.foreach { case (klass, _) => + test("Looking for TypeInformation for a NU types should return a GenericTypeInfo") { + nuTypesMapping.foreach { case (klass, _) => val typeInfo = TypeInformation.of(klass) typeInfo shouldBe new GenericTypeInfo(klass) } } - test("Looking for TypeInformation for a NU base class with registrar should return a specific TypeInformation") { - FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + test("Looking for TypeInformation for a NU types with registrar should return a specific TypeInformation") { + FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() - baseNuTypesMapping.foreach { case (klass, expected) => + nuTypesMapping.foreach { case (klass, expected) => val typeInfo = TypeInformation.of(klass) typeInfo shouldBe expected } } test("Make sure that the other types have specific TypeInformation") { - baseFlinkTypesMapping.foreach { case (klass, expected) => + flinkTypesMapping.foreach { case (klass, expected) => val typeInfo = TypeInformation.of(klass) typeInfo shouldBe expected } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index 8682353951a..e605dfc5a4d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegistrar +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters} import pl.touk.nussknacker.engine.process.util.Serializers @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { - FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse()