Skip to content

Commit

Permalink
Changes to review
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Oct 29, 2024
1 parent 466b053 commit 6071698
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import java.lang.reflect.Type
import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util

object FlinkBaseTypeInfoRegister {
object FlinkBaseTypeInfoRegistrar {

private[typeinformation] case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])
private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])

private[typeinformation] val baseTypes = List(
Base(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
Base(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
Base(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
private val baseTypes = List(
RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
)

def makeSureBaseTypesAreRegistered(): Unit =
def ensureBaseTypesAreRegistered(): Unit =
baseTypes.foreach { base =>
register(base)
}

private def register(base: Base[_, _ <: TypeInfoFactory[_]]): Unit = {
val opt = Option(TypeExtractor.getTypeInfoFactory(base.klass))
private def register(entry: RegistrationEntry[_, _ <: TypeInfoFactory[_]]): Unit = {
val opt = Option(TypeExtractor.getTypeInfoFactory(entry.klass))
if (opt.isEmpty) {
TypeExtractor.registerFactory(base.klass, base.factoryClass)
TypeExtractor.registerFactory(entry.klass, entry.factoryClass)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object TypeInformationDetection {
// We use SPI to provide implementation of TypeInformationDetection because we don't want to make
// implementation classes available in flink-components-api module.
val instance: TypeInformationDetection = {
FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered()
FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered()

val classloader = Thread.currentThread().getContextClassLoader
ServiceLoader
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.sql.{Date, Time, Timestamp}

class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers {

private val baseNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
classOf[LocalDate] -> Types.LOCAL_DATE,
classOf[LocalTime] -> Types.LOCAL_TIME,
classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME,
)

private val baseFlinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
classOf[String] -> Types.STRING,
classOf[Boolean] -> Types.BOOLEAN,
classOf[Byte] -> Types.BYTE,
classOf[Short] -> Types.SHORT,
classOf[Integer] -> Types.INT,
classOf[Long] -> Types.LONG,
classOf[Float] -> Types.FLOAT,
classOf[Double] -> Types.DOUBLE,
classOf[Character] -> Types.CHAR,
classOf[java.math.BigDecimal] -> Types.BIG_DEC,
classOf[java.math.BigInteger] -> Types.BIG_INT,
classOf[Instant] -> Types.INSTANT,
classOf[Date] -> Types.SQL_DATE,
classOf[Time] -> Types.SQL_TIME,
classOf[Timestamp] -> Types.SQL_TIMESTAMP,
)

test("Looking for TypeInformation for a NU base class should return a GenericTypeInfo") {
baseNuTypesMapping.foreach { case (klass, _) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe new GenericTypeInfo(klass)
}
}

test("Looking for TypeInformation for a NU base class with registrar should return a specific TypeInformation") {
FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered()

baseNuTypesMapping.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe expected
}
}

test("Make sure that the other types have specific TypeInformation") {
baseFlinkTypesMapping.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe expected
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegister
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegistrar
import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters}
import pl.touk.nussknacker.engine.process.util.Serializers

Expand Down Expand Up @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging {
override def prepareExecutionConfig(
config: ExecutionConfig
)(jobData: JobData, deploymentData: DeploymentData): Unit = {
FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered()
FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered()
Serializers.registerSerializers(modelData, config)
if (enableObjectReuse) {
config.enableObjectReuse()
Expand Down

0 comments on commit 6071698

Please sign in to comment.