diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala similarity index 94% rename from engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala rename to engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala index 37ecbaa1076..2a9408688a0 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrar.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrar.scala @@ -7,18 +7,18 @@ import java.lang.reflect.Type import java.time.{LocalDate, LocalDateTime, LocalTime} import java.util -object FlinkBaseTypeInfoRegistrar { +object FlinkTypeInfoRegistrar { private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K]) - private val baseTypes = List( + private val typesToRegister = List( RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]), RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]), RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]), ) def ensureBaseTypesAreRegistered(): Unit = - baseTypes.foreach { base => + typesToRegister.foreach { base => register(base) } diff --git a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 2e7fbef74ea..35a14eab36e 100644 --- a/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -52,7 +52,7 @@ object TypeInformationDetection { // We use SPI to provide implementation of TypeInformationDetection because we don't want to make // implementation classes available in flink-components-api module. val instance: TypeInformationDetection = { - FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() val classloader = Thread.currentThread().getContextClassLoader ServiceLoader diff --git a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala similarity index 71% rename from engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala rename to engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala index 692f9adac32..71bc968124a 100644 --- a/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkBaseTypeInfoRegistrarTest.scala +++ b/engine/flink/components-api/src/test/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/FlinkTypeInfoRegistrarTest.scala @@ -8,15 +8,15 @@ import org.scalatest.matchers.should.Matchers import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} import java.sql.{Date, Time, Timestamp} -class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers { +class FlinkTypeInfoRegistrarTest extends AnyFunSuite with Matchers { - private val baseNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + private val nuTypesMapping: Map[Class[_], TypeInformation[_]] = Map( classOf[LocalDate] -> Types.LOCAL_DATE, classOf[LocalTime] -> Types.LOCAL_TIME, classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME, ) - private val baseFlinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map( + private val flinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map( classOf[String] -> Types.STRING, classOf[Boolean] -> Types.BOOLEAN, classOf[Byte] -> Types.BYTE, @@ -34,24 +34,24 @@ class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers { classOf[Timestamp] -> Types.SQL_TIMESTAMP, ) - test("Looking for TypeInformation for a NU base class should return a GenericTypeInfo") { - baseNuTypesMapping.foreach { case (klass, _) => + test("Looking for TypeInformation for a NU types should return a GenericTypeInfo") { + nuTypesMapping.foreach { case (klass, _) => val typeInfo = TypeInformation.of(klass) typeInfo shouldBe new GenericTypeInfo(klass) } } - test("Looking for TypeInformation for a NU base class with registrar should return a specific TypeInformation") { - FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + test("Looking for TypeInformation for a NU types with registrar should return a specific TypeInformation") { + FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() - baseNuTypesMapping.foreach { case (klass, expected) => + nuTypesMapping.foreach { case (klass, expected) => val typeInfo = TypeInformation.of(klass) typeInfo shouldBe expected } } test("Make sure that the other types have specific TypeInformation") { - baseFlinkTypesMapping.foreach { case (klass, expected) => + flinkTypesMapping.foreach { case (klass, expected) => val typeInfo = TypeInformation.of(klass) typeInfo shouldBe expected } diff --git a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala index 8682353951a..e605dfc5a4d 100644 --- a/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala +++ b/engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/ExecutionConfigPreparer.scala @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.ModelData import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion} import pl.touk.nussknacker.engine.deployment.DeploymentData -import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegistrar +import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters} import pl.touk.nussknacker.engine.process.util.Serializers @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging { override def prepareExecutionConfig( config: ExecutionConfig )(jobData: JobData, deploymentData: DeploymentData): Unit = { - FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered() + FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered() Serializers.registerSerializers(modelData, config) if (enableObjectReuse) { config.enableObjectReuse()