diff --git a/docs/Changelog.md b/docs/Changelog.md index 0d5cb549075..964186e08fe 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -77,7 +77,9 @@ * [#6988](https://github.com/TouK/nussknacker/pull/6988) Remove unused API classes: `MultiMap`, `TimestampedEvictableStateFunction` * [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open. * [#6979](https://github.com/TouK/nussknacker/pull/6979) Introduces an activities panel that provides information about all system activities. -* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization +* Performance optimization: + * [#7058](https://github.com/TouK/nussknacker/pull/7058) Add missing Flink TypeInformation for better serialization + * [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism ## 1.17 diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 18e00b7c811..9e28f781664 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -75,10 +75,14 @@ To see the biggest differences please consult the [changelog](Changelog.md). want to keep using Flink pre-1.19 with current Nussknacker, please refer to compatibility providing plugins in https://github.com/TouK/nussknacker-flink-compatibility. -* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization +* Performance optimization: + * [#7058](https://github.com/TouK/nussknacker/pull/7058) Add missing Flink TypeInformation for better serialization * In case of using base (bounded and unbounded) Flink components state will be probably not compatible * `FlinkCustomNodeContext.typeInformationDetection` has been removed, please use `TypeInformationDetection.instance` instead * `FlinkCustomNodeContext.forCustomContext` has been removed, please use `TypeInformationDetection.instance.forValueWithContext` instead + * [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism + * In case of using types: java.time.LocalDate, java.time.LocalTime, java.time.LocalDateTime, java.time.Instant, + java.sql.Date, java.sql.Time, java.sql.Timestamp with CaseClassTypeInfo mechanism, state probably will be lost ### Configuration changes * [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig` which replaces removed `{ type: "versions-panel" }` `{ type: "comments-panel" }` and `{ type: "attachments-panel" }` diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala new file mode 100644 index 00000000000..19aeaebc758 --- /dev/null +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegister.scala @@ -0,0 +1,109 @@ +package pl.touk.nussknacker.engine.flink.api.typeinformation + +import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation, Types} +import org.apache.flink.api.java.typeutils.TypeExtractor + +import java.lang.reflect.Type +import java.sql.{Date => SqlDate, Time => SqlTime, Timestamp => SqlTimestamp} +import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} +import java.util + +object FlinkBaseTypeInfoRegister { + + private case class Base[T](klass: Class[T], factory: TypeInfoFactory[T]) { + lazy val factoryClass: Class[_ <: TypeInfoFactory[T]] = factory.getClass + } + + private val baseTypes = List( + Base(classOf[LocalDate], new LocalDateTypeFactory), + Base(classOf[LocalTime], new LocalTimeTypeFactory), + Base(classOf[LocalDateTime], new LocalDateTimeTypeFactory), + Base(classOf[Instant], new InstantTypeFactory), + Base(classOf[SqlDate], new SqlDateTypeFactory), + Base(classOf[SqlTime], new SqlTimeTypeFactory), + Base(classOf[SqlTimestamp], new SqlTimestampTypeFactory), + ) + + def makeSureBaseTypesAreRegistered(): Unit = + baseTypes.foreach { base => + register(base) + } + + private def register(base: Base[_]): Unit = { + val opt = Option(TypeExtractor.getTypeInfoFactory(base.klass)) + if (opt.isEmpty) { + TypeExtractor.registerFactory(base.klass, base.factoryClass) + } + } + + class LocalDateTypeFactory extends TypeInfoFactory[LocalDate] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[LocalDate] = + Types.LOCAL_DATE + + } + + class LocalTimeTypeFactory extends TypeInfoFactory[LocalTime] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[LocalTime] = + Types.LOCAL_TIME + + } + + class LocalDateTimeTypeFactory extends TypeInfoFactory[LocalDateTime] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[LocalDateTime] = + Types.LOCAL_DATE_TIME + + } + + class InstantTypeFactory extends TypeInfoFactory[Instant] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[Instant] = + Types.INSTANT + + } + + class SqlDateTypeFactory extends TypeInfoFactory[SqlDate] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[SqlDate] = + Types.SQL_DATE + + } + + class SqlTimeTypeFactory extends TypeInfoFactory[SqlTime] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[SqlTime] = + Types.SQL_TIME + + } + + class SqlTimestampTypeFactory extends TypeInfoFactory[SqlTimestamp] { + + override def createTypeInfo( + t: Type, + genericParameters: util.Map[String, TypeInformation[_]] + ): TypeInformation[SqlTimestamp] = + Types.SQL_TIMESTAMP + + } + +} diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 126b8c8f305..9e7fabe879a 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,6 +52,8 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { + FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() + val classloader = Thread.currentThread().getContextClassLoader ServiceLoader .load(classOf[TypeInformationDetection], classloader) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index 75e95832428..30d57828379 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegister import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters} import pl.touk.nussknacker.engine.process.util.Serializers @@ -100,6 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { + FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse() diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala index baadb2baa59..396b668266d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala @@ -29,27 +29,6 @@ import pl.touk.nussknacker.engine.util.Implicits._ */ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection { - private val registeredTypeInfos: Map[TypedClass, TypeInformation[_]] = Map( - Typed.typedClass[String] -> Types.STRING, - Typed.typedClass[Boolean] -> Types.BOOLEAN, - Typed.typedClass[Byte] -> Types.BYTE, - Typed.typedClass[Short] -> Types.SHORT, - Typed.typedClass[Integer] -> Types.INT, - Typed.typedClass[Long] -> Types.LONG, - Typed.typedClass[Float] -> Types.FLOAT, - Typed.typedClass[Double] -> Types.DOUBLE, - Typed.typedClass[Character] -> Types.CHAR, - Typed.typedClass[java.math.BigDecimal] -> Types.BIG_DEC, - Typed.typedClass[java.math.BigInteger] -> Types.BIG_INT, - Typed.typedClass[java.time.LocalDate] -> Types.LOCAL_DATE, - Typed.typedClass[java.time.LocalTime] -> Types.LOCAL_TIME, - Typed.typedClass[java.time.LocalDateTime] -> Types.LOCAL_DATE_TIME, - Typed.typedClass[java.time.Instant] -> Types.INSTANT, - Typed.typedClass[java.sql.Date] -> Types.SQL_DATE, - Typed.typedClass[java.sql.Time] -> Types.SQL_TIME, - Typed.typedClass[java.sql.Timestamp] -> Types.SQL_TIMESTAMP, - ) - def forContext(validationContext: ValidationContext): TypeInformation[Context] = { val variables = forType( Typed.record(validationContext.localVariables, Typed.typedClass[Map[String, AnyRef]]) @@ -82,8 +61,6 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection // We generally don't use scala Maps in our runtime, but it is useful for some internal type infos: TODO move it somewhere else case a: TypedObjectTypingResult if a.runtimeObjType.klass == classOf[Map[String, _]] => createScalaMapTypeInformation(a) - case a: SingleTypingResult if registeredTypeInfos.contains(a.runtimeObjType) => - registeredTypeInfos(a.runtimeObjType) // TODO: scala case classes are not handled nicely here... CaseClassTypeInfo is created only via macro, here Kryo is used case a: SingleTypingResult if a.runtimeObjType.params.isEmpty => TypeInformation.of(a.runtimeObjType.klass) diff --git a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala index af15bbcf529..8fb211bbce9 100644 --- a/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala +++ b/engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala @@ -23,6 +23,7 @@ import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.typed.typing.Typed import pl.touk.nussknacker.engine.api.{Context, ValueWithContext} import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.ScalaCaseClassSerializer +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegister import pl.touk.nussknacker.engine.flink.serialization.FlinkTypeInformationSerializationMixin import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject._ import pl.touk.nussknacker.engine.process.typeinformation.testTypedObject.CustomTypedObject