Skip to content

Commit

Permalink
Improvements: Add missing Flink TypeInformation for proper serializat…
Browse files Browse the repository at this point in the history
…ion (#7058)
  • Loading branch information
lciolecki authored Oct 28, 2024
1 parent 1b454d2 commit a935294
Show file tree
Hide file tree
Showing 22 changed files with 381 additions and 109 deletions.
1 change: 1 addition & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
* [#6988](https://github.com/TouK/nussknacker/pull/6988) Remove unused API classes: `MultiMap`, `TimestampedEvictableStateFunction`
* [#7000](https://github.com/TouK/nussknacker/pull/7000) Show all possible options for dictionary editor on open.
* [#6979](https://github.com/TouK/nussknacker/pull/6979) Introduces an activities panel that provides information about all system activities.
* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization

## 1.17

Expand Down
5 changes: 5 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#6952](https://github.com/TouK/nussknacker/pull/6952) Improvement: TypeInformation support for scala.Option:
If you used CaseClassTypeInfoFactory with case classes that contain the Option type, the state won't be restored after the upgrade.

* [#7058](https://github.com/TouK/nussknacker/pull/7058) Performance optimization: Add missing Flink TypeInformation for better serialization
* If you use the base (bounded and unbounded) Flink components, the state will probably not be compatible
* `FlinkCustomNodeContext.typeInformationDetection` has been removed, please use `TypeInformationDetection.instance` instead
* `FlinkCustomNodeContext.forCustomContext` has been removed, please use `TypeInformationDetection.instance.forValueWithContext` instead

### Configuration changes
* [#6979](https://github.com/TouK/nussknacker/pull/6979) Add `type: "activities-panel"` to the `processToolbarConfig`, which replaces the deprecated `{ type: "versions-panel" }`, `{ type: "comments-panel" }` and `{ type: "attachments-panel" }`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext

object DataStreamImplicits {

implicit class DataStreamExtension[T](stream: DataStream[T]) {
/**
 * Extension methods for a stream of [[Context]] elements.
 */
implicit class DataStreamWithContextExtension(stream: DataStream[Context]) {

/**
 * Flat-maps the stream with the function obtained from
 * `ctx.lazyParameterHelper.lazyMapFunction(lazyParam)`, producing `ValueWithContext[T]` elements.
 *
 * The output TypeInformation is passed explicitly and is derived from `lazyParam.returnType`
 * via `ctx.valueWithContextInfo.forType`, rather than being left for Flink to infer.
 *
 * @param lazyParam lazy parameter whose `returnType` determines the value type information
 * @param ctx       node context supplying the lazy parameter helper and type information
 * @return the operator created by the underlying `flatMap` call
 */
def flatMap[T <: AnyRef](
lazyParam: LazyParameter[T]
)(implicit ctx: FlinkCustomNodeContext): SingleOutputStreamOperator[ValueWithContext[T]] =
stream
.flatMap(
ctx.lazyParameterHelper.lazyMapFunction(lazyParam),
ctx.valueWithContextInfo.forType[T](lazyParam.returnType)
)

}

implicit class DataStreamExtension[T <: AnyRef](stream: DataStream[T]) {

@silent("deprecated")
def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package pl.touk.nussknacker.engine.flink.api.process

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.component.NodeDeploymentData
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
import pl.touk.nussknacker.engine.api.typed.typing.{TypingResult, Unknown}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
import pl.touk.nussknacker.engine.api.{Context, JobData, MetaData, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.NkGlobalParameters
import pl.touk.nussknacker.engine.flink.api.exception.ExceptionHandler
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection

import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

@silent("deprecated")
case class FlinkCustomNodeContext(
jobData: JobData,
// TODO: it can be used in state recovery - make sure that it won't change during renaming of nodes on gui
Expand All @@ -26,8 +25,6 @@ case class FlinkCustomNodeContext(
exceptionHandlerPreparer: RuntimeContext => ExceptionHandler,
globalParameters: Option[NkGlobalParameters],
validationContext: Either[ValidationContext, Map[String, ValidationContext]],
@deprecated("TypeInformationDetection.instance should be used instead", "1.17.0")
typeInformationDetection: TypeInformationDetection,
componentUseCase: ComponentUseCase,
nodeDeploymentData: Option[NodeDeploymentData]
) {
Expand All @@ -36,17 +33,14 @@ case class FlinkCustomNodeContext(
lazy val contextTypeInfo: TypeInformation[Context] =
TypeInformationDetection.instance.forContext(asOneOutputContext)

lazy val valueWithContextInfo = new valueWithContextInfo
lazy val valueWithContextInfo = new ValueWithContextInfo

class valueWithContextInfo {
class ValueWithContextInfo {

@deprecated("TypeInformationDetection.instance.forValueWithContext should be used instead", "1.17.0")
def forCustomContext[T](ctx: ValidationContext, value: TypeInformation[T]): TypeInformation[ValueWithContext[T]] =
TypeInformationDetection.instance.forValueWithContext(ctx, value)
lazy val forUnknown: TypeInformation[ValueWithContext[AnyRef]] = forType[AnyRef](Unknown)

@deprecated("TypeInformationDetection.instance.forValueWithContext should be used instead", "1.17.0")
def forCustomContext[T](ctx: ValidationContext, value: TypingResult): TypeInformation[ValueWithContext[T]] =
TypeInformationDetection.instance.forValueWithContext(ctx, value)
/**
 * Type information for a `ValueWithContext[T]` whose value type information is taken from
 * `TypeInformationDetection.instance.forNull[T]`.
 */
def forNull[T]: TypeInformation[ValueWithContext[T]] =
forType(TypeInformationDetection.instance.forNull[T])

def forBranch[T](key: String, value: TypingResult): TypeInformation[ValueWithContext[T]] =
forBranch(key, TypeInformationDetection.instance.forType[T](value))
Expand All @@ -60,7 +54,12 @@ case class FlinkCustomNodeContext(
def forType[T](value: TypeInformation[T]): TypeInformation[ValueWithContext[T]] =
TypeInformationDetection.instance.forValueWithContext(asOneOutputContext, value)

lazy val forUnknown: TypeInformation[ValueWithContext[AnyRef]] = forType[AnyRef](Unknown)
/**
 * Type information for a `ValueWithContext[T]` whose value is described by the given class.
 *
 * The class is converted to a typing result with `Typed.typedClass` and passed to `forType`.
 * Note that `klass` is a `Class[_]`, so it is not tied to `T` by the compiler.
 */
def forClass[T](klass: Class[_]): TypeInformation[ValueWithContext[T]] =
forType[T](Typed.typedClass(klass))

/**
 * Type information for a `ValueWithContext[T]` where the value type information is obtained
 * from `TypeInformationDetection.instance.forClass[T]` using the implicit `ClassTag`.
 */
def forClass[T: ClassTag]: TypeInformation[ValueWithContext[T]] =
forType(TypeInformationDetection.instance.forClass[T])

}

def branchValidationContext(branchId: String): ValidationContext = asJoinContext.getOrElse(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.api.{Context, ValueWithContext}
import pl.touk.nussknacker.engine.util.Implicits.RichStringList

import java.net.URLClassLoader
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._
import scala.reflect.{ClassTag, classTag}

/**
* This is a trait that allows providing more detailed TypeInformation when the ValidationContext is known;
* by default, generic Flink mechanisms are used.
*/
trait TypeInformationDetection extends Serializable {

// Flink doesn't have any special null serializer, so we use String TypeInfo:
// this delegates to forType with the typing result for String (Typed.typedClass[String]).
def forNull[T]: TypeInformation[T] = forType[T](Typed.typedClass[String])

def forContext(validationContext: ValidationContext): TypeInformation[Context]

def forValueWithContext[T](
Expand All @@ -29,6 +33,14 @@ trait TypeInformationDetection extends Serializable {
value: TypeInformation[T]
): TypeInformation[ValueWithContext[T]]

/**
 * Type information for `T`, resolved from the implicit `ClassTag`.
 *
 * Takes the tag's runtime class and delegates to the `forClass(Class[T])` overload.
 */
def forClass[T: ClassTag]: TypeInformation[T] = {
// runtimeClass is returned as Class[_]; it is cast to Class[T] to call the overload below
val klass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
forClass(klass)
}

/**
 * Type information for the given class: the class is converted to a typing result with
 * `Typed.typedClass` and passed to `forType`.
 */
def forClass[T](klass: Class[T]): TypeInformation[T] =
forType[T](Typed.typedClass(klass))

def forType[T](typingResult: TypingResult): TypeInformation[T]

def priority: Int
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.touk.nussknacker.engine.flink.util

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{DataStream, KeyedStream, SingleOutputStreamOperator}
import pl.touk.nussknacker.engine.api.{Context, LazyParameter, ValueWithContext}
import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
Expand All @@ -16,12 +15,13 @@ object richflink {

def groupBy(
groupBy: LazyParameter[CharSequence]
)(implicit ctx: FlinkCustomNodeContext): KeyedStream[ValueWithContext[String], String] = {
val typeInfo = ctx.valueWithContextInfo.forType(TypeInformation.of(classOf[String]))
)(implicit ctx: FlinkCustomNodeContext): KeyedStream[ValueWithContext[String], String] =
dataStream
.flatMap(new StringKeyOnlyMapper(ctx.lazyParameterHelper, groupBy), typeInfo)
.flatMap(
new StringKeyOnlyMapper(ctx.lazyParameterHelper, groupBy),
ctx.valueWithContextInfo.forClass[String]
)
.keyBy((k: ValueWithContext[String]) => k.value)
}

def groupByWithValue[T <: AnyRef: TypeTag](groupBy: LazyParameter[CharSequence], value: LazyParameter[T])(
implicit ctx: FlinkCustomNodeContext
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
package pl.touk.nussknacker.engine.flink.util.timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions._
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collector
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler

import java.util

class TimestampAssignmentHelper[T: TypeInformation](timestampAssigner: TimestampWatermarkHandler[TimestampedValue[T]]) {

def assignWatermarks(stream: DataStream[T]): DataStream[T] = {
val valueTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]

val timestampAssignmentHelperType: TypeInformation[TimestampedValue[T]] = Types.POJO(
classOf[TimestampedValue[T]],
util.Map.of(
"timestamp",
Types.LONG,
"hasTimestamp", // TODO: Should we mark this field?
Types.BOOLEAN,
"value",
valueTypeInfo
)
)

val timestampedStream = stream
.process((value: T, ctx: ProcessFunction[T, TimestampedValue[T]]#Context, out: Collector[TimestampedValue[T]]) =>
out.collect(new TimestampedValue(value, ctx.timestamp()))
.process(
(value: T, ctx: ProcessFunction[T, TimestampedValue[T]]#Context, out: Collector[TimestampedValue[T]]) =>
out.collect(new TimestampedValue(value, ctx.timestamp())),
timestampAssignmentHelperType
)

val withTimestampAssigner = timestampAssigner.assignTimestampAndWatermarks(timestampedStream)
withTimestampAssigner.map((tv: TimestampedValue[T]) => tv.getValue)
timestampAssigner
.assignTimestampAndWatermarks(timestampedStream)
.map(
(tv: TimestampedValue[T]) => tv.getValue,
valueTypeInfo
)
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.engine.flink.util.transformer

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.common.typeinfo.TypeInfo
import org.scalatest.Inside
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
Expand All @@ -11,6 +12,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.compile.ProcessValidator
import pl.touk.nussknacker.engine.flink.api.typeinfo.caseclass.CaseClassTypeInfoFactory
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.source.EmitWatermarkAfterEachElementCollectionSource
import pl.touk.nussknacker.engine.process.helpers.ConfigCreatorWithCollectingListener
Expand Down Expand Up @@ -129,6 +131,11 @@ class ForEachTransformerSpec extends AnyFunSuite with FlinkSpec with Matchers wi

}

// Companion holding the type-info factory referenced by the @TypeInfo annotation on TestRecord.
object TestRecord {
// Factory based on CaseClassTypeInfoFactory, specialised for TestRecord.
class TypeInfoFactory extends CaseClassTypeInfoFactory[TestRecord]
}

// Test record; the @TypeInfo annotation points Flink at TestRecord.TypeInfoFactory for its type information.
@TypeInfo(classOf[TestRecord.TypeInfoFactory])
case class TestRecord(id: String = "1", timeHours: Int = 0, eId: Int = 1, str: String = "a") {
// timeHours converted to milliseconds (3600 seconds per hour * 1000 ms per second); 3600L keeps the arithmetic in Long
def timestamp: Long = timeHours * 3600L * 1000
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pl.touk.nussknacker.engine.flink.util.transformer

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
Expand All @@ -21,6 +20,7 @@ import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{
StandardTimestampWatermarkHandler,
TimestampWatermarkHandler
}
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection
import pl.touk.nussknacker.engine.util.TimestampUtils.supportedTypeToMillis

import java.time.Duration
Expand All @@ -45,6 +45,8 @@ class PeriodicSourceFactory(timestampAssigner: TimestampWatermarkHandler[AnyRef]
extends SourceFactory
with UnboundedStreamComponent {

import pl.touk.nussknacker.engine.flink.api.datastream.DataStreamImplicits._

@silent("deprecated")
@MethodToInvoke
def create(
Expand All @@ -55,32 +57,29 @@ class PeriodicSourceFactory(timestampAssigner: TimestampWatermarkHandler[AnyRef]
): Source = {
new FlinkSource with ReturningType {

override def contextStream(
env: StreamExecutionEnvironment,
flinkNodeContext: FlinkCustomNodeContext
): DataStream[Context] = {

override def contextStream(env: StreamExecutionEnvironment, ctx: FlinkCustomNodeContext): DataStream[Context] = {
val count = Option(nullableCount).map(_.toInt).getOrElse(1)
val processName = flinkNodeContext.metaData.name
val processName = ctx.metaData.name
val stream = env
.addSource(new PeriodicFunction(period))
.map(_ => Context(processName.value))
.flatMap(flinkNodeContext.lazyParameterHelper.lazyMapFunction(value))
.flatMap { (v: ValueWithContext[AnyRef], c: Collector[AnyRef]) =>
1.to(count).map(_ => v.value).foreach(c.collect)
}
.returns(TypeInformation.of(classOf[AnyRef]))
.flatMap(value)(ctx)
.flatMap(
(value: ValueWithContext[AnyRef], out: Collector[AnyRef]) =>
1.to(count).map(_ => value.value).foreach(out.collect),
TypeInformationDetection.instance.forType[AnyRef](value.returnType)
)

val rawSourceWithTimestamp = timestampAssigner.assignTimestampAndWatermarks(stream)

rawSourceWithTimestamp
.map(
new FlinkContextInitializingFunction[AnyRef](
new BasicContextInitializer[AnyRef](Unknown),
flinkNodeContext.nodeId,
flinkNodeContext.convertToEngineRuntimeContext
ctx.nodeId,
ctx.convertToEngineRuntimeContext
),
flinkNodeContext.contextTypeInfo
ctx.contextTypeInfo
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,20 @@ object TransformStateTransformer extends CustomStreamTransformer with ExplicitUi
ContextTransformation
.definedBy(_.withVariable(OutputVar.customNode(variableName), newValue.returnType))
.implementedBy(
FlinkCustomStreamTransformation { (stream, nodeContext) =>
implicit val nctx: FlinkCustomNodeContext = nodeContext
FlinkCustomStreamTransformation { (stream, ctx) =>
implicit val nctx: FlinkCustomNodeContext = ctx
setUidToNodeIdIfNeed(
nodeContext,
ctx,
stream
.groupBy(groupBy)
.process(
new TransformStateFunction[String](
nodeContext.lazyParameterHelper,
ctx.lazyParameterHelper,
transformWhen,
newValue,
stateTimeoutSeconds.seconds
)
),
ctx.valueWithContextInfo.forClass[AnyRef](classOf[String])
)
)
}
Expand Down
Loading

0 comments on commit a935294

Please sign in to comment.