Skip to content

Commit

Permalink
[NU-1848] Work around for broken compatibility with Flink < 1.19: env…
Browse files Browse the repository at this point in the history
…ironment variable allowing to disable Flink TypeInfos
  • Loading branch information
arkadius committed Nov 6, 2024
1 parent 062fdaf commit ea72f05
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import java.util

object FlinkTypeInfoRegistrar {

private val DisableFlinkTypesRegistrationEnvVarName = "NU_DISABLE_FLINK_TYPE_INFOS_REGISTRATION"

private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])

private val typesToRegister = List(
Expand All @@ -17,10 +19,13 @@ object FlinkTypeInfoRegistrar {
RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
)

def ensureBaseTypesAreRegistered(): Unit =
typesToRegister.foreach { base =>
register(base)
def ensureBaseTypesAreRegistered(): Unit = {
if (!Option(System.getenv(DisableFlinkTypesRegistrationEnvVarName)).exists(java.lang.Boolean.parseBoolean)) {
typesToRegister.foreach { base =>
register(base)
}
}
}

private def register(entry: RegistrationEntry[_, _ <: TypeInfoFactory[_]]): Unit = {
val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass))
Expand Down

0 comments on commit ea72f05

Please sign in to comment.