Skip to content

Commit

Permalink
Renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Oct 29, 2024
1 parent 6071698 commit 2360e16
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import java.lang.reflect.Type
import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util

object FlinkBaseTypeInfoRegistrar {
object FlinkTypeInfoRegistrar {

private case class RegistrationEntry[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])

private val baseTypes = List(
private val typesToRegister = List(
RegistrationEntry(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
RegistrationEntry(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
RegistrationEntry(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
)

def ensureBaseTypesAreRegistered(): Unit =
baseTypes.foreach { base =>
typesToRegister.foreach { base =>
register(base)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object TypeInformationDetection {
// We use SPI to provide implementation of TypeInformationDetection because we don't want to make
// implementation classes available in flink-components-api module.
val instance: TypeInformationDetection = {
FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered()
FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered()

val classloader = Thread.currentThread().getContextClassLoader
ServiceLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import org.scalatest.matchers.should.Matchers
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.sql.{Date, Time, Timestamp}

class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers {
class FlinkTypeInfoRegistrarTest extends AnyFunSuite with Matchers {

private val baseNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
private val nuTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
classOf[LocalDate] -> Types.LOCAL_DATE,
classOf[LocalTime] -> Types.LOCAL_TIME,
classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME,
)

private val baseFlinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
private val flinkTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
classOf[String] -> Types.STRING,
classOf[Boolean] -> Types.BOOLEAN,
classOf[Byte] -> Types.BYTE,
Expand All @@ -34,24 +34,24 @@ class FlinkBaseTypeInfoRegistrarTest extends AnyFunSuite with Matchers {
classOf[Timestamp] -> Types.SQL_TIMESTAMP,
)

test("Looking for TypeInformation for a NU base class should return a GenericTypeInfo") {
baseNuTypesMapping.foreach { case (klass, _) =>
test("Looking for TypeInformation for a NU types should return a GenericTypeInfo") {
nuTypesMapping.foreach { case (klass, _) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe new GenericTypeInfo(klass)
}
}

test("Looking for TypeInformation for a NU base class with registrar should return a specific TypeInformation") {
FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered()
test("Looking for TypeInformation for a NU types with registrar should return a specific TypeInformation") {
FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered()

baseNuTypesMapping.foreach { case (klass, expected) =>
nuTypesMapping.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe expected
}
}

test("Make sure that the other types have specific TypeInformation") {
baseFlinkTypesMapping.foreach { case (klass, expected) =>
flinkTypesMapping.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe expected
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
import pl.touk.nussknacker.engine.api.{JobData, ProcessVersion}
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkBaseTypeInfoRegistrar
import pl.touk.nussknacker.engine.flink.api.typeinformation.FlinkTypeInfoRegistrar
import pl.touk.nussknacker.engine.flink.api.{NamespaceMetricsTags, NkGlobalParameters}
import pl.touk.nussknacker.engine.process.util.Serializers

Expand Down Expand Up @@ -101,7 +101,7 @@ object ExecutionConfigPreparer extends LazyLogging {
override def prepareExecutionConfig(
config: ExecutionConfig
)(jobData: JobData, deploymentData: DeploymentData): Unit = {
FlinkBaseTypeInfoRegistrar.ensureBaseTypesAreRegistered()
FlinkTypeInfoRegistrar.ensureBaseTypesAreRegistered()
Serializers.registerSerializers(modelData, config)
if (enableObjectReuse) {
config.enableObjectReuse()
Expand Down

0 comments on commit 2360e16

Please sign in to comment.