Skip to content

Commit

Permalink
[NU-1848] Work around for broken compatibility with Flink < 1.19: env…
Browse files Browse the repository at this point in the history
…ironment variable allowing to disable Flink TypeInfos (#7128)

* [NU-1848] Work around for broken compatibility with Flink < 1.19: environment variable allowing to disable Flink TypeInfos

* Ability to disable type info registration programmatically
  • Loading branch information
arkadius authored Nov 6, 2024
1 parent 062fdaf commit 1f1b395
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,55 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
import java.lang.reflect.Type
import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util
import java.util.concurrent.atomic.AtomicBoolean

// This class contains registers TypeInfoFactory for commonly used classes in Nussknacker.
// It is a singleton as Flink's only contains a global registry for such purpose
object FlinkTypeInfoRegistrar {

private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])
private val typeInfoRegistrationEnabled = new AtomicBoolean(true)

private val typesToRegister = List(
private val DisableFlinkTypeInfoRegistrationEnvVarName = "NU_DISABLE_FLINK_TYPE_INFO_REGISTRATION"

private case class RegistrationEntry[T](klass: Class[T], factoryClass: Class[_ <: TypeInfoFactory[T]])

private val typeInfoToRegister = List(
RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
)

def ensureBaseTypesAreRegistered(): Unit =
typesToRegister.foreach { base =>
register(base)
def ensureTypeInfosAreRegistered(): Unit = {
// TypeInfo registration is available in Flink >= 1.19. For backward compatibility purpose we allow
// to disable this by either environment variable or programmatically
if (typeInfoRegistrationEnabled.get() && !typeInfoRegistrationDisabledByEnvVariable) {
typeInfoToRegister.foreach { entry =>
register(entry)
}
}
}

private def typeInfoRegistrationDisabledByEnvVariable = {
Option(System.getenv(DisableFlinkTypeInfoRegistrationEnvVarName)).exists(_.toBoolean)
}

private def register(entry: RegistrationEntry[_, _ <: TypeInfoFactory[_]]): Unit = {
private def register(entry: RegistrationEntry[_]): Unit = {
val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass))
if (opt.isEmpty) {
TypeExtractor.registerFactory(entry.klass, entry.factoryClass)
}
}

// These methods are mainly for purpose of tests in nussknacker-flink-compatibility project
// It should be used in caution as it changes the global state
def enableFlinkTypeInfoRegistration(): Unit = {
typeInfoRegistrationEnabled.set(true)
}

def disableFlinkTypeInfoRegistration(): Unit = {
typeInfoRegistrationEnabled.set(false)
}

class LocalDateTypeInfoFactory extends TypeInfoFactory[LocalDate] {

override def createTypeInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object TypeInformationDetection {
// We use SPI to provide implementation of TypeInformationDetection because we don't want to make
// implementation classes available in flink-components-api module.
val instance: TypeInformationDetection = {
FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered()
FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered()

val classloader = Thread.currentThread().getContextClassLoader
ServiceLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class FlinkTypeInfoRegistrarTest extends AnyFunSuite with Matchers {
}

test("Looking for TypeInformation for a NU types with registrar should return a specific TypeInformation") {
FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered()
FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered()

nuTypesMapping.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging {
override def prepareExecutionConfig(
config: ExecutionConfig
)(jobData: JobData, deploymentData: DeploymentData): Unit = {
FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered()
FlinkTypeInfoRegistrar.ensureTypeInfosAreRegistered()
Serializers.registerSerializers(modelData, config)
if (enableObjectReuse) {
config.enableObjectReuse()
Expand Down

0 comments on commit 1f1b395

Please sign in to comment.