Skip to content

Commit

Permalink
FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory imp…
Browse files Browse the repository at this point in the history
…rovements
  • Loading branch information
lciolecki committed Nov 16, 2024
1 parent 6607e1d commit 91c1d8c
Showing 1 changed file with 9 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,19 @@ import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
import pl.touk.nussknacker.engine.schemedkafka.flink.typeinfo.ConsumerRecordTypeInfo
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClientFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.{
AbstractSchemaRegistryBasedDeserializerFactory,
KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory,
SchemaRegistryBasedDeserializerFactory
}
import pl.touk.nussknacker.engine.schemedkafka.serialization.KafkaSchemaBasedKeyValueDeserializationSchemaFactory

import scala.reflect.ClassTag

class FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
protected val schemaRegistryClientFactory: SchemaRegistryClientFactory,
protected val deserializerFactory: SchemaRegistryBasedDeserializerFactory
) extends KafkaSchemaBasedKeyValueDeserializationSchemaFactory
with AbstractSchemaRegistryBasedDeserializerFactory {

override protected def createKeyDeserializer[K: ClassTag](
schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
kafkaConfig: KafkaConfig
): Deserializer[K] =
createDeserializer[K](kafkaConfig, schemaDataOpt, isKey = true)

override protected def createValueDeserializer[V: ClassTag](
schemaDataOpt: Option[RuntimeSchemaData[ParsedSchema]],
kafkaConfig: KafkaConfig
): Deserializer[V] =
createDeserializer[V](kafkaConfig, schemaDataOpt, isKey = false)
schemaRegistryClientFactory: SchemaRegistryClientFactory,
deserializerFactory: SchemaRegistryBasedDeserializerFactory
) extends KafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
schemaRegistryClientFactory,
deserializerFactory
) {

override def create[K: ClassTag, V: ClassTag](
kafkaConfig: KafkaConfig,
Expand All @@ -55,11 +44,9 @@ class FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
override protected lazy val valueDeserializer: Deserializer[V] =
createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig)

@transient
private lazy val typeInformationDetector = TypeInformationDetection.instance

@transient
private val keyTypeInfo: TypeInformation[K] = {
private lazy val keyTypeInfo: TypeInformation[K] = {
if (kafkaConfig.useStringForKey) {
Types.STRING.asInstanceOf[TypeInformation[K]]
} else {
Expand All @@ -68,8 +55,7 @@ class FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory(
}
}

@transient
private val valueTypeInfo: TypeInformation[V] =
private lazy val valueTypeInfo: TypeInformation[V] =
// TODO: Creating TypeInformation for Avro / Json Schema is difficult because of schema evolution, therefore we rely on Kryo, e.g. serializer for GenericRecordWithSchemaId
typeInformationDetector.forClass[V]

Expand Down

0 comments on commit 91c1d8c

Please sign in to comment.