Skip to content

Commit

Permalink
Improvement: FlinkBaseTypeInfoRegister mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Oct 29, 2024
1 parent c3904d2 commit df625b6
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 25 deletions.
4 changes: 3 additions & 1 deletion docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
* [#6988](https://github.com/TouK/nussknacker/pull/6988) Remove unused API classes: `MultiMap`, `TimestampedEvictableStateFunction`
* [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open.
* [#6979](https://github.com/TouK/nussknacker/pull/6979) Introduces an activities panel that provides information about all system activities.
* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization
* Performance optimization:
* [#7058](https://github.com/TouK/nussknacker/pull/7058) Add missing Flink TypeInformation for better serialization
* [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism

## 1.17

Expand Down
6 changes: 5 additions & 1 deletion docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@ To see the biggest differences please consult the [changelog](Changelog.md).
want to keep using Flink pre-1.19 with current Nussknacker, please refer to compatibility providing plugins in
https://github.com/TouK/nussknacker-flink-compatibility.

* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization
* Performance optimization:
* [#7058](https://github.com/TouK/nussknacker/pull/7058) Add missing Flink TypeInformation for better serialization
* If you use the base (bounded and unbounded) Flink components, their state will probably not be compatible
* `FlinkCustomNodeContext.typeInformationDetection` has been removed, please use `TypeInformationDetection.instance` instead
* `FlinkCustomNodeContext.forCustomContext` has been removed, please use `TypeInformationDetection.instance.forValueWithContext` instead
* [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism
* If you use any of the types `java.time.LocalDate`, `java.time.LocalTime`, `java.time.LocalDateTime`, `java.time.Instant`,
`java.sql.Date`, `java.sql.Time`, `java.sql.Timestamp` with the `CaseClassTypeInfo` mechanism, state will probably be lost

### Configuration changes
* [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig` which replaces removed `{ type: "versions-panel" }` `{ type: "comments-panel" }` and `{ type: "attachments-panel" }`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation, Types}
import org.apache.flink.api.java.typeutils.TypeExtractor

import java.lang.reflect.Type
import java.sql.{Date => SqlDate, Time => SqlTime, Timestamp => SqlTimestamp}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.util

/**
 * Registers Flink [[TypeInfoFactory]] implementations for a fixed set of `java.time` and
 * `java.sql` classes, so that Flink's [[TypeExtractor]] resolves those classes to the matching
 * predefined `Types.*` type information.
 *
 * Call [[makeSureBaseTypesAreRegistered]] before type information is extracted for these classes.
 */
object FlinkBaseTypeInfoRegister {

  /**
   * Pairs a class with the factory that produces its type information.
   *
   * The factory instance is only used to obtain its runtime class in a type-safe way; the class
   * (not the instance) is what gets handed to `TypeExtractor.registerFactory`.
   */
  private final case class Base[T](klass: Class[T], factory: TypeInfoFactory[T]) {
    val factoryClass: Class[_ <: TypeInfoFactory[T]] = factory.getClass
  }

  // All classes for which a factory is registered by this object.
  private val baseTypes = List(
    Base(classOf[LocalDate], new LocalDateTypeFactory),
    Base(classOf[LocalTime], new LocalTimeTypeFactory),
    Base(classOf[LocalDateTime], new LocalDateTimeTypeFactory),
    Base(classOf[Instant], new InstantTypeFactory),
    Base(classOf[SqlDate], new SqlDateTypeFactory),
    Base(classOf[SqlTime], new SqlTimeTypeFactory),
    Base(classOf[SqlTimestamp], new SqlTimestampTypeFactory),
  )

  /**
   * Registers a factory for every base type that does not have one yet. Safe to call repeatedly.
   *
   * The "is a factory present?" check and the registration in [[register]] are two separate calls
   * into `TypeExtractor`. This method can be reached from more than one entry point, so the whole
   * loop runs under this object's monitor: concurrent callers cannot both pass the check and then
   * both try to register the same class (Flink's `registerFactory` is expected to reject a
   * duplicate registration).
   *
   * NOTE: the lock only serializes callers of this method; it does not protect against other code
   * registering factories directly through `TypeExtractor`.
   */
  def makeSureBaseTypesAreRegistered(): Unit = synchronized {
    baseTypes.foreach { base =>
      register(base)
    }
  }

  /**
   * Registers the factory of `base` unless `TypeExtractor` already reports a factory for its class.
   * Must be called while holding this object's monitor.
   */
  private def register(base: Base[_]): Unit = {
    // getTypeInfoFactory is a Java API; Option(...) turns a null result into None.
    val alreadyRegistered = Option(TypeExtractor.getTypeInfoFactory(base.klass)).isDefined
    if (!alreadyRegistered) {
      TypeExtractor.registerFactory(base.klass, base.factoryClass)
    }
  }

  /** Factory returning `Types.LOCAL_DATE` for `java.time.LocalDate`. */
  class LocalDateTypeFactory extends TypeInfoFactory[LocalDate] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[LocalDate] =
      Types.LOCAL_DATE

  }

  /** Factory returning `Types.LOCAL_TIME` for `java.time.LocalTime`. */
  class LocalTimeTypeFactory extends TypeInfoFactory[LocalTime] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[LocalTime] =
      Types.LOCAL_TIME

  }

  /** Factory returning `Types.LOCAL_DATE_TIME` for `java.time.LocalDateTime`. */
  class LocalDateTimeTypeFactory extends TypeInfoFactory[LocalDateTime] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[LocalDateTime] =
      Types.LOCAL_DATE_TIME

  }

  /** Factory returning `Types.INSTANT` for `java.time.Instant`. */
  class InstantTypeFactory extends TypeInfoFactory[Instant] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[Instant] =
      Types.INSTANT

  }

  /** Factory returning `Types.SQL_DATE` for `java.sql.Date`. */
  class SqlDateTypeFactory extends TypeInfoFactory[SqlDate] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[SqlDate] =
      Types.SQL_DATE

  }

  /** Factory returning `Types.SQL_TIME` for `java.sql.Time`. */
  class SqlTimeTypeFactory extends TypeInfoFactory[SqlTime] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[SqlTime] =
      Types.SQL_TIME

  }

  /** Factory returning `Types.SQL_TIMESTAMP` for `java.sql.Timestamp`. */
  class SqlTimestampTypeFactory extends TypeInfoFactory[SqlTimestamp] {

    override def createTypeInfo(
        t: Type,
        genericParameters: util.Map[String, TypeInformation[_]]
    ): TypeInformation[SqlTimestamp] =
      Types.SQL_TIMESTAMP

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ object TypeInformationDetection {
// We use SPI to provide implementation of TypeInformationDetection because we don't want to make
// implementation classes available in flink-components-api module.
val instance: TypeInformationDetection = {
FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered()

val classloader = Thread.currentThread().getContextClassLoader
ServiceLoader
.load(classOf[TypeInformationDetection], classloader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegister
import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters}
import pl.touk.nussknacker.engine.process.util.Serializers

Expand Down Expand Up @@ -100,6 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging {
override def prepareExecutionConfig(
config: ExecutionConfig
)(jobData: JobData, deploymentData: DeploymentData): Unit = {
FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered()
Serializers.registerSerializers(modelData, config)
if (enableObjectReuse) {
config.enableObjectReuse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,6 @@ import pl.touk.nussknacker.engine.util.Implicits._
*/
class TypingResultAwareTypeInformationDetection extends TypeInformationDetection {

private val registeredTypeInfos: Map[TypedClass, TypeInformation[_]] = Map(
Typed.typedClass[String] -> Types.STRING,
Typed.typedClass[Boolean] -> Types.BOOLEAN,
Typed.typedClass[Byte] -> Types.BYTE,
Typed.typedClass[Short] -> Types.SHORT,
Typed.typedClass[Integer] -> Types.INT,
Typed.typedClass[Long] -> Types.LONG,
Typed.typedClass[Float] -> Types.FLOAT,
Typed.typedClass[Double] -> Types.DOUBLE,
Typed.typedClass[Character] -> Types.CHAR,
Typed.typedClass[java.math.BigDecimal] -> Types.BIG_DEC,
Typed.typedClass[java.math.BigInteger] -> Types.BIG_INT,
Typed.typedClass[java.time.LocalDate] -> Types.LOCAL_DATE,
Typed.typedClass[java.time.LocalTime] -> Types.LOCAL_TIME,
Typed.typedClass[java.time.LocalDateTime] -> Types.LOCAL_DATE_TIME,
Typed.typedClass[java.time.Instant] -> Types.INSTANT,
Typed.typedClass[java.sql.Date] -> Types.SQL_DATE,
Typed.typedClass[java.sql.Time] -> Types.SQL_TIME,
Typed.typedClass[java.sql.Timestamp] -> Types.SQL_TIMESTAMP,
)

def forContext(validationContext: ValidationContext): TypeInformation[Context] = {
val variables = forType(
Typed.record(validationContext.localVariables, Typed.typedClass[Map[String, AnyRef]])
Expand Down Expand Up @@ -82,8 +61,6 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection
// We generally don't use scala Maps in our runtime, but it is useful for some internal type infos: TODO move it somewhere else
case a: TypedObjectTypingResult if a.runtimeObjType.klass == classOf[Map[String, _]] =>
createScalaMapTypeInformation(a)
case a: SingleTypingResult if registeredTypeInfos.contains(a.runtimeObjType) =>
registeredTypeInfos(a.runtimeObjType)
// TODO: scala case classes are not handled nicely here... CaseClassTypeInfo is created only via macro, here Kryo is used
case a: SingleTypingResult if a.runtimeObjType.params.isEmpty =>
TypeInformation.of(a.runtimeObjType.klass)
Expand Down

0 comments on commit df625b6

Please sign in to comment.