Skip to content

Commit

Permalink
Remove useless code and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lciolecki committed Oct 29, 2024
1 parent bd24837 commit 466b053
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 51 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,8 @@ lazy val flinkComponentsApi = (project in flink("components-api"))
name := "nussknacker-flink-components-api",
libraryDependencies ++= {
Seq(
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % Test
)
}
)
Expand Down
3 changes: 1 addition & 2 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* `FlinkCustomNodeContext.typeInformationDetection` has been removed, please use `TypeInformationDetection.instance` instead
* `FlinkCustomNodeContext.forCustomContext` has been removed, please use `TypeInformationDetection.instance.forValueWithContext` instead
* [#7097](https://github.com/TouK/nussknacker/pull/7097) Flink base types registration mechanism
* In case of using types: java.time.LocalDate, java.time.LocalTime, java.time.LocalDateTime, java.time.Instant,
java.sql.Date, java.sql.Time, java.sql.Timestamp with CaseClassTypeInfo mechanism, state probably will be lost
* In case of using types: java.time.LocalDate, java.time.LocalTime, java.time.LocalDateTime with CaseClassTypeInfo mechanism, state probably will be lost

### Configuration changes
* [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig` which replaces removed `{ type: "versions-panel" }` `{ type: "comments-panel" }` and `{ type: "attachments-panel" }`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,17 @@ import org.apache.flink.api.common.typeinfo.{TypeInfoFactory, TypeInformation, T
import org.apache.flink.api.java.typeutils.TypeExtractor

import java.lang.reflect.Type
import java.sql.{Date => SqlDate, Time => SqlTime, Timestamp => SqlTimestamp}
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.util

object FlinkBaseTypeInfoRegister {

private case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])
private[typeinformation] case class Base[T, K <: TypeInfoFactory[T]](klass: Class[T], factoryClass: Class[K])

private val baseTypes = List(
private[typeinformation] val baseTypes = List(
Base(classOf[LocalDate], classOf[LocalDateTypeInfoFactory]),
Base(classOf[LocalTime], classOf[LocalTimeTypeInfoFactory]),
Base(classOf[LocalDateTime], classOf[LocalDateTimeTypeInfoFactory]),
Base(classOf[Instant], classOf[InstantTypeInfoFactory]),
Base(classOf[SqlDate], classOf[SqlDateTypeInfoFactory]),
Base(classOf[SqlTime], classOf[SqlTimeTypeInfoFactory]),
Base(classOf[SqlTimestamp], classOf[SqlTimestampTypeInfoFactory]),
)

def makeSureBaseTypesAreRegistered(): Unit =
Expand Down Expand Up @@ -64,44 +59,4 @@ object FlinkBaseTypeInfoRegister {

}

class InstantTypeInfoFactory extends TypeInfoFactory[Instant] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[Instant] =
Types.INSTANT

}

class SqlDateTypeInfoFactory extends TypeInfoFactory[SqlDate] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[SqlDate] =
Types.SQL_DATE

}

class SqlTimeTypeInfoFactory extends TypeInfoFactory[SqlTime] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[SqlTime] =
Types.SQL_TIME

}

class SqlTimestampTypeInfoFactory extends TypeInfoFactory[SqlTimestamp] {

override def createTypeInfo(
t: Type,
genericParameters: util.Map[String, TypeInformation[_]]
): TypeInformation[SqlTimestamp] =
Types.SQL_TIMESTAMP

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import java.sql.{Date, Time, Timestamp}

class FlinkBaseTypeInfoRegisterTest extends AnyFunSuite with Matchers {

private val reqByNuTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
classOf[LocalDate] -> Types.LOCAL_DATE,
classOf[LocalTime] -> Types.LOCAL_TIME,
classOf[LocalDateTime] -> Types.LOCAL_DATE_TIME,
)

private val otherTypesMapping: Map[Class[_], TypeInformation[_]] = Map(
classOf[Instant] -> Types.INSTANT,
classOf[Date] -> Types.SQL_DATE,
classOf[Time] -> Types.SQL_TIME,
classOf[Timestamp] -> Types.SQL_TIMESTAMP,
)

test("Looking for TypeInformation for a base klass should return a GenericTypeInfo") {
FlinkBaseTypeInfoRegister.baseTypes.foreach { base =>
val typeInfo = TypeInformation.of(base.klass)
typeInfo shouldBe new GenericTypeInfo(base.klass)
}
}

test("Looking for TypeInformation for a base klass with register should return a SpecificTypeInformation") {
FlinkBaseTypeInfoRegister.makeSureBaseTypesAreRegistered()

FlinkBaseTypeInfoRegister.baseTypes.foreach { base =>
val typeInfo = TypeInformation.of(base.klass)
Some(typeInfo) shouldBe reqByNuTypesMapping.get(base.klass)
}
}

test("Verify TypeInformation.of for other types") {
otherTypesMapping.foreach { case (klass, expected) =>
val typeInfo = TypeInformation.of(klass)
typeInfo shouldBe expected
}
}

}

0 comments on commit 466b053

Please sign in to comment.