diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala index 2a9408688a0..a7b024d47df 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala @@ -6,29 +6,55 @@ import org.apache.flink.api.java.typeutils.TypeExtractor import java.lang.reflect.Type import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util +import java.util.concurrent.atomic.AtomicBoolean +// This class contains registers TypeInfoFactory for commonly used classes in Nussknacker. +// It is a singleton as Flink's only contains a global registry for such purpose object FlinkTypeInfoRegistrar { - private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) + private val typeInfoRegistrationEnabled = new AtomicBoolean(true) - private val typesToRegister = List( + private val DisableFlinkTypeInfoRegistrationEnvVarName = "NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION" + + private case class RegistrationEntry[T](klass: Class[T], factoryClass: Class[_ <: TypeInfoFactory[T]]) + + private val typeInfoToRegister = List( RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), ) - def ensureBaseTypesAreRegistered(): Unit = - typesToRegister.foreach { base => - register(base) + def ensureTypeInfosAreRegistered(): Unit = { + // TypeInfo registration is available in Flink >= 1.19. For backward compatibility purpose we allow + // to disable this by either environment variable or programmatically + if (typeInfoRegistrationEnabled.get() && !typeInfoRegistrationDisabledByEnvVariable) { + typeInfoToRegister.foreach { entry => + register(entry) + } } + } + + private def typeInfoRegistrationDisabledByEnvVariable = { + Option(System.getenv(DisableFlinkTypeInfoRegistrationEnvVarName)).exists(_.toBoolean) + } - private def register(entry: RegistrationEntry[_, _ <: TypeInfoFactory[_]]): Unit = { + private def register(entry: RegistrationEntry[_]): Unit = { val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass)) if (opt.isEmpty) { TypeExtractor.registerFactory(entry.klass, entry.factoryClass) } } + // These methods are mainly for purpose of tests in nussknacker-flink-compatibility project + // It should be used in caution as it changes the global state + def enableFlinkTypeInfoRegistration(): Unit = { + typeInfoRegistrationEnabled.set(true) + } + + def disableFlinkTypeInfoRegistration(): Unit = { + typeInfoRegistrationEnabled.set(false) + } + class LocalDateTypeInfoFactory extends TypeInfoFactory[LocalDate] { override def createTypeInfo( diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 35a14eab36e..50ad362f158 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,7 +52,7 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { - FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered() val classloader = Thread.currentThread().getContextClassLoader ServiceLoader diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala index 71bc968124a..6514ba0c21d 100644 --- a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala +++ b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala @@ -42,7 +42,7 @@ class FlinkTypeInfoRegistrarTest extends AnyFunSuite with Matchers { } test("Looking for TypeInformation for a NU types with registrar should return a specific TypeInformation") { - FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered() nuTypesMapping.foreach { case (klass, expected) => val typeInfo = TypeInformation.of(klass) diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index e605dfc5a4d..d614af52f67 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { - FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse()