Skip to content

Commit

Permalink
Remove useless code and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Oct 29, 2024
1 parent bd24837 commit 0b9976d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 49 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,8 @@ lazy val flinkComponentsApi = (project in flink("components-api"))
name := "nussknacker-flink-components-api",
libraryDependencies ++= {
Seq(
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % Test
)
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,17 @@ import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation, T
import org.apache.flink.api.java.typeutils.TypeExtractor

import java.lang.reflect.Type
import java.sql.{Date => SqlDate, Time => SqlTime, Timestamp => SqlTimestamp}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util

object FlinkBaseTypeInfoRegister {

private case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])
private[typeinformation] case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])

private val baseTypes = List(
private[typeinformation] val baseTypes = List(
Base(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
Base(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
Base(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
Base(classOf[Instant], classOf[InstantTypeInfoFactory]),
Base(classOf[SqlDate], classOf[SqlDateTypeInfoFactory]),
Base(classOf[SqlTime], classOf[SqlTimeTypeInfoFactory]),
Base(classOf[SqlTimestamp], classOf[SqlTimestampTypeInfoFactory]),
)

def makeSureBaseTypesAreRegistered(): Unit =
Expand Down Expand Up @@ -64,44 +59,4 @@ object FlinkBaseTypeInfoRegister {

}

class InstantTypeInfoFactory extends TypeInfoFactory[Instant] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[Instant] =
Types.INSTANT

}

class SqlDateTypeInfoFactory extends TypeInfoFactory[SqlDate] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[SqlDate] =
Types.SQL_DATE

}

class SqlTimeTypeInfoFactory extends TypeInfoFactory[SqlTime] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[SqlTime] =
Types.SQL_TIME

}

class SqlTimestampTypeInfoFactory extends TypeInfoFactory[SqlTimestamp] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[SqlTimestamp] =
Types.SQL_TIMESTAMP

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.sql.{Date, Time, Timestamp}

class FlinkBaseTypeInfoRegisterTest extends AnyFunSuite with Matchers {

private val baseTypes: Map[Class[_], TypeInformation[_]] = Map(
classOf[LocalDate] -> Types.LOCAL_DATE,
classOf[LocalTime] -> Types.LOCAL_TIME,
classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME,
)

private val otherTypes: Map[Class[_], TypeInformation[_]] = Map(
classOf[Instant] -> Types.INSTANT,
classOf[Date] -> Types.SQL_DATE,
classOf[Time] -> Types.SQL_TIME,
classOf[Timestamp] -> Types.SQL_TIMESTAMP,
)

test("Looking for TypeInformation for a base klass should return a GenericTypeInfo") {
FlinkBaseTypeInfoRegister.baseTypes.foreach { base =>
val typeInfo = TypeInformation.of(base.klass)
typeInfo shouldBe new GenericTypeInfo(base.klass)
}
}

test("Looking for TypeInformation for a base klass with register should return a SpecificTypeInformation") {
FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered()

FlinkBaseTypeInfoRegister.baseTypes.foreach { base =>
val typeInfo = TypeInformation.of(base.klass)
Some(typeInfo) shouldBe baseTypes.get(base.klass)
}
}

test("Verify TypeInformation.of for other types") {
otherTypes.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe expected
}
}

}

0 comments on commit 0b9976d

Please sign in to comment.